Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-05-11 03:34:14

0001 /*----------------------------------------------------------------------
0002 ----------------------------------------------------------------------*/
0003 
0004 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
0005 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0006 #include "FWCore/Framework/interface/RunPrincipal.h"
0007 #include "FWCore/Framework/interface/ModuleContextSentry.h"
0008 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0009 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0010 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0011 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0012 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0013 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0014 #include "FWCore/Concurrency/interface/FunctorTask.h"
0015 #include "FWCore/Utilities/interface/LuminosityBlockIndex.h"
0016 #include "FWCore/Utilities/interface/make_sentry.h"
0017 
0018 #include "FWCore/Framework/interface/OutputModuleCommunicatorT.h"
0019 
0020 #include "FWCore/Framework/interface/global/OutputModuleBase.h"
0021 #include "FWCore/Framework/interface/one/OutputModuleBase.h"
0022 #include "FWCore/Framework/interface/limited/OutputModuleBase.h"
0023 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0024 
0025 namespace {
0026 
0027   template <typename F>
0028   void async(edm::one::OutputModuleBase& iMod, oneapi::tbb::task_group& iGroup, F&& iFunc) {
0029     iMod.sharedResourcesAcquirer().serialQueueChain().push(iGroup, std::move(iFunc));
0030   }
0031 
0032   template <typename F>
0033   void async(edm::limited::OutputModuleBase& iMod, oneapi::tbb::task_group& iGroup, F&& iFunc) {
0034     iMod.queue().push(iGroup, std::move(iFunc));
0035   }
0036 
0037   template <typename F>
0038   void async(edm::global::OutputModuleBase&, oneapi::tbb::task_group& iGroup, F iFunc) {
0039     //NOTE, need the functor since group can not run a 'mutable' lambda
0040     auto t = edm::make_functor_task(iFunc);
0041     iGroup.run([t]() {
0042       edm::TaskSentry s(t);
0043       t->execute();
0044     });
0045   }
0046 }  // namespace
0047 
0048 namespace edm {
0049 
0050   template <typename T>
0051   void OutputModuleCommunicatorT<T>::closeFile() {
0052     module().doCloseFile();
0053   }
0054 
0055   template <typename T>
0056   bool OutputModuleCommunicatorT<T>::shouldWeCloseFile() const {
0057     return module().shouldWeCloseFile();
0058   }
0059 
0060   template <typename T>
0061   void OutputModuleCommunicatorT<T>::openFile(edm::FileBlock const& fb) {
0062     module().doOpenFile(fb);
0063   }
0064 
0065   template <typename T>
0066   void OutputModuleCommunicatorT<T>::writeProcessBlockAsync(WaitingTaskHolder iTask,
0067                                                             ProcessBlockPrincipal const& processBlockPrincipal,
0068                                                             ProcessContext const* processContext,
0069                                                             ActivityRegistry* activityRegistry) noexcept {
0070     auto token = ServiceRegistry::instance().presentToken();
0071     GlobalContext globalContext(GlobalContext::Transition::kWriteProcessBlock,
0072                                 LuminosityBlockID(),
0073                                 RunIndex::invalidRunIndex(),
0074                                 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
0075                                 Timestamp::invalidTimestamp(),
0076                                 processContext);
0077     auto t = [&mod = module(),
0078               &processBlockPrincipal,
0079               globalContext,
0080               token,
0081               desc = &description(),
0082               activityRegistry,
0083               iTask]() mutable {
0084       std::exception_ptr ex;
0085       // Caught exception is propagated via WaitingTaskHolder
0086       CMS_SA_ALLOW try {
0087         ServiceRegistry::Operate op(token);
0088         ParentContext parentContext(&globalContext);
0089         ModuleCallingContext mcc(desc);
0090         ModuleContextSentry moduleContextSentry(&mcc, parentContext);
0091         activityRegistry->preModuleWriteProcessBlockSignal_(globalContext, mcc);
0092         auto sentry(make_sentry(activityRegistry, [&globalContext, &mcc](ActivityRegistry* ar) {
0093           ar->postModuleWriteProcessBlockSignal_(globalContext, mcc);
0094         }));
0095         mod.doWriteProcessBlock(processBlockPrincipal, &mcc);
0096       } catch (...) {
0097         ex = std::current_exception();
0098       }
0099       iTask.doneWaiting(ex);
0100     };
0101     async(module(), *iTask.group(), std::move(t));
0102   }
0103 
0104   template <typename T>
0105   void OutputModuleCommunicatorT<T>::writeRunAsync(
0106       WaitingTaskHolder iTask,
0107       edm::RunPrincipal const& rp,
0108       ProcessContext const* processContext,
0109       ActivityRegistry* activityRegistry,
0110       MergeableRunProductMetadata const* mergeableRunProductMetadata) noexcept {
0111     auto token = ServiceRegistry::instance().presentToken();
0112     GlobalContext globalContext(GlobalContext::Transition::kWriteRun,
0113                                 LuminosityBlockID(rp.run(), 0),
0114                                 rp.index(),
0115                                 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
0116                                 rp.endTime(),
0117                                 processContext);
0118     auto t = [&mod = module(),
0119               &rp,
0120               globalContext,
0121               token,
0122               desc = &description(),
0123               activityRegistry,
0124               mergeableRunProductMetadata,
0125               iTask]() mutable {
0126       std::exception_ptr ex;
0127       // Caught exception is propagated via WaitingTaskHolder
0128       CMS_SA_ALLOW try {
0129         ServiceRegistry::Operate op(token);
0130         ParentContext parentContext(&globalContext);
0131         ModuleCallingContext mcc(desc);
0132         ModuleContextSentry moduleContextSentry(&mcc, parentContext);
0133         activityRegistry->preModuleWriteRunSignal_(globalContext, mcc);
0134         auto sentry(make_sentry(activityRegistry, [&globalContext, &mcc](ActivityRegistry* ar) {
0135           ar->postModuleWriteRunSignal_(globalContext, mcc);
0136         }));
0137         mod.doWriteRun(rp, &mcc, mergeableRunProductMetadata);
0138       } catch (...) {
0139         ex = std::current_exception();
0140       }
0141       iTask.doneWaiting(ex);
0142     };
0143     async(module(), *iTask.group(), std::move(t));
0144   }
0145 
0146   template <typename T>
0147   void OutputModuleCommunicatorT<T>::writeLumiAsync(WaitingTaskHolder iTask,
0148                                                     edm::LuminosityBlockPrincipal const& lbp,
0149                                                     ProcessContext const* processContext,
0150                                                     ActivityRegistry* activityRegistry) noexcept {
0151     auto token = ServiceRegistry::instance().presentToken();
0152     GlobalContext globalContext(GlobalContext::Transition::kWriteLuminosityBlock,
0153                                 lbp.id(),
0154                                 lbp.runPrincipal().index(),
0155                                 lbp.index(),
0156                                 lbp.beginTime(),
0157                                 processContext);
0158     auto t = [&mod = module(), &lbp, activityRegistry, token, globalContext, desc = &description(), iTask]() mutable {
0159       std::exception_ptr ex;
0160       // Caught exception is propagated via WaitingTaskHolder
0161       CMS_SA_ALLOW try {
0162         ServiceRegistry::Operate op(token);
0163 
0164         ParentContext parentContext(&globalContext);
0165         ModuleCallingContext mcc(desc);
0166         ModuleContextSentry moduleContextSentry(&mcc, parentContext);
0167         activityRegistry->preModuleWriteLumiSignal_(globalContext, mcc);
0168         auto sentry(make_sentry(activityRegistry, [&globalContext, &mcc](ActivityRegistry* ar) {
0169           ar->postModuleWriteLumiSignal_(globalContext, mcc);
0170         }));
0171         mod.doWriteLuminosityBlock(lbp, &mcc);
0172       } catch (...) {
0173         ex = std::current_exception();
0174       }
0175       iTask.doneWaiting(ex);
0176     };
0177     async(module(), *iTask.group(), std::move(t));
0178   }
0179 
0180   template <typename T>
0181   bool OutputModuleCommunicatorT<T>::wantAllEvents() const {
0182     return module().wantAllEvents();
0183   }
0184 
0185   template <typename T>
0186   bool OutputModuleCommunicatorT<T>::limitReached() const {
0187     return module().limitReached();
0188   }
0189 
0190   template <typename T>
0191   void OutputModuleCommunicatorT<T>::configure(OutputModuleDescription const& desc) {
0192     module().configure(desc);
0193   }
0194 
0195   template <typename T>
0196   edm::SelectedProductsForBranchType const& OutputModuleCommunicatorT<T>::keptProducts() const {
0197     return module().keptProducts();
0198   }
0199 
0200   template <typename T>
0201   void OutputModuleCommunicatorT<T>::selectProducts(edm::ProductRegistry const& preg,
0202                                                     ThinnedAssociationsHelper const& helper,
0203                                                     ProcessBlockHelperBase const& processBlockHelper) {
0204     module().selectProducts(preg, helper, processBlockHelper);
0205   }
0206 
0207   template <typename T>
0208   void OutputModuleCommunicatorT<T>::setEventSelectionInfo(
0209       std::map<std::string, std::vector<std::pair<std::string, int>>> const& outputModulePathPositions,
0210       bool anyProductProduced) {
0211     module().setEventSelectionInfo(outputModulePathPositions, anyProductProduced);
0212   }
0213 
0214   template <typename T>
0215   ModuleDescription const& OutputModuleCommunicatorT<T>::description() const {
0216     return module().description();
0217   }
0218 
0219   namespace impl {
0220     std::unique_ptr<edm::OutputModuleCommunicator> createCommunicatorIfNeeded(void*) {
0221       return std::unique_ptr<edm::OutputModuleCommunicator>{};
0222     }
0223     std::unique_ptr<edm::OutputModuleCommunicator> createCommunicatorIfNeeded(::edm::global::OutputModuleBase* iMod) {
0224       return std::make_unique<OutputModuleCommunicatorT<edm::global::OutputModuleBase>>(iMod);
0225     }
0226     std::unique_ptr<edm::OutputModuleCommunicator> createCommunicatorIfNeeded(::edm::one::OutputModuleBase* iMod) {
0227       return std::make_unique<OutputModuleCommunicatorT<edm::one::OutputModuleBase>>(iMod);
0228     }
0229     std::unique_ptr<edm::OutputModuleCommunicator> createCommunicatorIfNeeded(::edm::limited::OutputModuleBase* iMod) {
0230       return std::make_unique<OutputModuleCommunicatorT<edm::limited::OutputModuleBase>>(iMod);
0231     }
0232   }  // namespace impl
0233 }  // namespace edm
0234 
0235 namespace edm {
0236   template class OutputModuleCommunicatorT<one::OutputModuleBase>;
0237   template class OutputModuleCommunicatorT<global::OutputModuleBase>;
0238   template class OutputModuleCommunicatorT<limited::OutputModuleBase>;
0239 }  // namespace edm