Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-10-25 09:47:00

0001 /*----------------------------------------------------------------------
0002 ----------------------------------------------------------------------*/
0003 
0004 #include "DataFormats/Provenance/interface/LuminosityBlockID.h"
0005 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0006 #include "FWCore/Framework/interface/RunPrincipal.h"
0007 #include "FWCore/Framework/interface/ModuleContextSentry.h"
0008 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0009 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0010 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0011 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0012 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0013 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0014 #include "FWCore/Concurrency/interface/FunctorTask.h"
0015 #include "FWCore/Utilities/interface/LuminosityBlockIndex.h"
0016 #include "FWCore/Utilities/interface/make_sentry.h"
0017 
0018 #include "FWCore/Framework/interface/OutputModuleCommunicatorT.h"
0019 
0020 #include "FWCore/Framework/interface/global/OutputModuleBase.h"
0021 #include "FWCore/Framework/interface/one/OutputModuleBase.h"
0022 #include "FWCore/Framework/interface/limited/OutputModuleBase.h"
0023 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0024 
0025 namespace {
0026 
0027   template <typename F>
0028   void async(edm::one::OutputModuleBase& iMod, oneapi::tbb::task_group& iGroup, F&& iFunc) {
0029     iMod.sharedResourcesAcquirer().serialQueueChain().push(iGroup, std::move(iFunc));
0030   }
0031 
0032   template <typename F>
0033   void async(edm::limited::OutputModuleBase& iMod, oneapi::tbb::task_group& iGroup, F&& iFunc) {
0034     iMod.queue().push(iGroup, std::move(iFunc));
0035   }
0036 
0037   template <typename F>
0038   void async(edm::global::OutputModuleBase&, oneapi::tbb::task_group& iGroup, F iFunc) {
0039     //NOTE, need the functor since group can not run a 'mutable' lambda
0040     auto t = edm::make_functor_task(iFunc);
0041     iGroup.run([t]() {
0042       edm::TaskSentry s(t);
0043       t->execute();
0044     });
0045   }
0046 }  // namespace
0047 
0048 namespace edm {
0049 
0050   template <typename T>
0051   void OutputModuleCommunicatorT<T>::closeFile() {
0052     module().doCloseFile();
0053   }
0054 
0055   template <typename T>
0056   bool OutputModuleCommunicatorT<T>::shouldWeCloseFile() const {
0057     return module().shouldWeCloseFile();
0058   }
0059 
0060   template <typename T>
0061   void OutputModuleCommunicatorT<T>::openFile(edm::FileBlock const& fb) {
0062     module().doOpenFile(fb);
0063   }
0064 
0065   template <typename T>
0066   void OutputModuleCommunicatorT<T>::writeProcessBlockAsync(WaitingTaskHolder iTask,
0067                                                             ProcessBlockPrincipal const& processBlockPrincipal,
0068                                                             ProcessContext const* processContext,
0069                                                             ActivityRegistry* activityRegistry) {
0070     auto token = ServiceRegistry::instance().presentToken();
0071     GlobalContext globalContext(GlobalContext::Transition::kWriteProcessBlock,
0072                                 LuminosityBlockID(),
0073                                 RunIndex::invalidRunIndex(),
0074                                 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
0075                                 Timestamp::invalidTimestamp(),
0076                                 processContext);
0077     auto t = [&mod = module(),
0078               &processBlockPrincipal,
0079               globalContext,
0080               token,
0081               desc = &description(),
0082               activityRegistry,
0083               iTask]() mutable {
0084       std::exception_ptr ex;
0085       // Caught exception is propagated via WaitingTaskHolder
0086       CMS_SA_ALLOW try {
0087         ServiceRegistry::Operate op(token);
0088         ParentContext parentContext(&globalContext);
0089         ModuleCallingContext mcc(desc);
0090         ModuleContextSentry moduleContextSentry(&mcc, parentContext);
0091         activityRegistry->preModuleWriteProcessBlockSignal_(globalContext, mcc);
0092         auto sentry(make_sentry(activityRegistry, [&globalContext, &mcc](ActivityRegistry* ar) {
0093           ar->postModuleWriteProcessBlockSignal_(globalContext, mcc);
0094         }));
0095         mod.doWriteProcessBlock(processBlockPrincipal, &mcc);
0096       } catch (...) {
0097         ex = std::current_exception();
0098       }
0099       iTask.doneWaiting(ex);
0100     };
0101     async(module(), *iTask.group(), std::move(t));
0102   }
0103 
0104   template <typename T>
0105   void OutputModuleCommunicatorT<T>::writeRunAsync(WaitingTaskHolder iTask,
0106                                                    edm::RunPrincipal const& rp,
0107                                                    ProcessContext const* processContext,
0108                                                    ActivityRegistry* activityRegistry,
0109                                                    MergeableRunProductMetadata const* mergeableRunProductMetadata) {
0110     auto token = ServiceRegistry::instance().presentToken();
0111     GlobalContext globalContext(GlobalContext::Transition::kWriteRun,
0112                                 LuminosityBlockID(rp.run(), 0),
0113                                 rp.index(),
0114                                 LuminosityBlockIndex::invalidLuminosityBlockIndex(),
0115                                 rp.endTime(),
0116                                 processContext);
0117     auto t = [&mod = module(),
0118               &rp,
0119               globalContext,
0120               token,
0121               desc = &description(),
0122               activityRegistry,
0123               mergeableRunProductMetadata,
0124               iTask]() mutable {
0125       std::exception_ptr ex;
0126       // Caught exception is propagated via WaitingTaskHolder
0127       CMS_SA_ALLOW try {
0128         ServiceRegistry::Operate op(token);
0129         ParentContext parentContext(&globalContext);
0130         ModuleCallingContext mcc(desc);
0131         ModuleContextSentry moduleContextSentry(&mcc, parentContext);
0132         activityRegistry->preModuleWriteRunSignal_(globalContext, mcc);
0133         auto sentry(make_sentry(activityRegistry, [&globalContext, &mcc](ActivityRegistry* ar) {
0134           ar->postModuleWriteRunSignal_(globalContext, mcc);
0135         }));
0136         mod.doWriteRun(rp, &mcc, mergeableRunProductMetadata);
0137       } catch (...) {
0138         ex = std::current_exception();
0139       }
0140       iTask.doneWaiting(ex);
0141     };
0142     async(module(), *iTask.group(), std::move(t));
0143   }
0144 
0145   template <typename T>
0146   void OutputModuleCommunicatorT<T>::writeLumiAsync(WaitingTaskHolder iTask,
0147                                                     edm::LuminosityBlockPrincipal const& lbp,
0148                                                     ProcessContext const* processContext,
0149                                                     ActivityRegistry* activityRegistry) {
0150     auto token = ServiceRegistry::instance().presentToken();
0151     GlobalContext globalContext(GlobalContext::Transition::kWriteLuminosityBlock,
0152                                 lbp.id(),
0153                                 lbp.runPrincipal().index(),
0154                                 lbp.index(),
0155                                 lbp.beginTime(),
0156                                 processContext);
0157     auto t = [&mod = module(), &lbp, activityRegistry, token, globalContext, desc = &description(), iTask]() mutable {
0158       std::exception_ptr ex;
0159       // Caught exception is propagated via WaitingTaskHolder
0160       CMS_SA_ALLOW try {
0161         ServiceRegistry::Operate op(token);
0162 
0163         ParentContext parentContext(&globalContext);
0164         ModuleCallingContext mcc(desc);
0165         ModuleContextSentry moduleContextSentry(&mcc, parentContext);
0166         activityRegistry->preModuleWriteLumiSignal_(globalContext, mcc);
0167         auto sentry(make_sentry(activityRegistry, [&globalContext, &mcc](ActivityRegistry* ar) {
0168           ar->postModuleWriteLumiSignal_(globalContext, mcc);
0169         }));
0170         mod.doWriteLuminosityBlock(lbp, &mcc);
0171       } catch (...) {
0172         ex = std::current_exception();
0173       }
0174       iTask.doneWaiting(ex);
0175     };
0176     async(module(), *iTask.group(), std::move(t));
0177   }
0178 
0179   template <typename T>
0180   bool OutputModuleCommunicatorT<T>::wantAllEvents() const {
0181     return module().wantAllEvents();
0182   }
0183 
0184   template <typename T>
0185   bool OutputModuleCommunicatorT<T>::limitReached() const {
0186     return module().limitReached();
0187   }
0188 
0189   template <typename T>
0190   void OutputModuleCommunicatorT<T>::configure(OutputModuleDescription const& desc) {
0191     module().configure(desc);
0192   }
0193 
0194   template <typename T>
0195   edm::SelectedProductsForBranchType const& OutputModuleCommunicatorT<T>::keptProducts() const {
0196     return module().keptProducts();
0197   }
0198 
0199   template <typename T>
0200   void OutputModuleCommunicatorT<T>::selectProducts(edm::ProductRegistry const& preg,
0201                                                     ThinnedAssociationsHelper const& helper,
0202                                                     ProcessBlockHelperBase const& processBlockHelper) {
0203     module().selectProducts(preg, helper, processBlockHelper);
0204   }
0205 
0206   template <typename T>
0207   void OutputModuleCommunicatorT<T>::setEventSelectionInfo(
0208       std::map<std::string, std::vector<std::pair<std::string, int>>> const& outputModulePathPositions,
0209       bool anyProductProduced) {
0210     module().setEventSelectionInfo(outputModulePathPositions, anyProductProduced);
0211   }
0212 
0213   template <typename T>
0214   ModuleDescription const& OutputModuleCommunicatorT<T>::description() const {
0215     return module().description();
0216   }
0217 
0218   namespace impl {
0219     std::unique_ptr<edm::OutputModuleCommunicator> createCommunicatorIfNeeded(void*) {
0220       return std::unique_ptr<edm::OutputModuleCommunicator>{};
0221     }
0222     std::unique_ptr<edm::OutputModuleCommunicator> createCommunicatorIfNeeded(::edm::global::OutputModuleBase* iMod) {
0223       return std::make_unique<OutputModuleCommunicatorT<edm::global::OutputModuleBase>>(iMod);
0224     }
0225     std::unique_ptr<edm::OutputModuleCommunicator> createCommunicatorIfNeeded(::edm::one::OutputModuleBase* iMod) {
0226       return std::make_unique<OutputModuleCommunicatorT<edm::one::OutputModuleBase>>(iMod);
0227     }
0228     std::unique_ptr<edm::OutputModuleCommunicator> createCommunicatorIfNeeded(::edm::limited::OutputModuleBase* iMod) {
0229       return std::make_unique<OutputModuleCommunicatorT<edm::limited::OutputModuleBase>>(iMod);
0230     }
0231   }  // namespace impl
0232 }  // namespace edm
0233 
0234 namespace edm {
0235   template class OutputModuleCommunicatorT<one::OutputModuleBase>;
0236   template class OutputModuleCommunicatorT<global::OutputModuleBase>;
0237   template class OutputModuleCommunicatorT<limited::OutputModuleBase>;
0238 }  // namespace edm