Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-10-25 09:56:59

0001 #include "FWCore/Concurrency/interface/include_first_syncWait.h"
0002 #include "Mixing/Base/src/SecondaryEventProvider.h"
0003 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0004 #include "FWCore/Framework/interface/ExceptionActions.h"
0005 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0006 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0007 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0008 #include "FWCore/Utilities/interface/StreamID.h"
0009 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0010 #include "oneapi/tbb/task_arena.h"
0011 
0012 namespace {
0013   template <typename T, typename U>
0014   void processOneOccurrence(edm::WorkerManager& manager,
0015                             typename T::TransitionInfoType& info,
0016                             edm::StreamID streamID,
0017                             typename T::Context const* topContext,
0018                             U const* context,
0019                             bool cleaningUpAfterException = false) {
0020     manager.resetAll();
0021 
0022     if (manager.allWorkers().empty())
0023       return;
0024 
0025     auto token = edm::ServiceRegistry::instance().presentToken();
0026     //we need the arena to guarantee that the syncWait will return to this thread
0027     // and not cause this callstack to possibly be moved to a new thread
0028     tbb::task_arena localArena{tbb::this_task_arena::max_concurrency()};
0029     std::exception_ptr exceptPtr = localArena.execute([&]() {
0030       return edm::syncWait([&](edm::WaitingTaskHolder&& iHolder) {
0031         manager.processOneOccurrenceAsync<T, U>(std::move(iHolder), info, token, streamID, topContext, context);
0032       });
0033     });
0034 
0035     if (exceptPtr) {
0036       try {
0037         edm::convertException::wrap([&]() { std::rethrow_exception(exceptPtr); });
0038       } catch (cms::Exception& ex) {
0039         if (ex.context().empty()) {
0040           edm::addContextAndPrintException("Calling SecondaryEventProvider", ex, cleaningUpAfterException);
0041         } else {
0042           edm::addContextAndPrintException("", ex, cleaningUpAfterException);
0043         }
0044         throw;
0045       }
0046     }
0047   }
0048 }  // namespace
0049 
0050 namespace edm {
0051   SecondaryEventProvider::SecondaryEventProvider(std::vector<ParameterSet>& psets,
0052                                                  ProductRegistry& preg,
0053                                                  std::shared_ptr<ProcessConfiguration> processConfiguration)
0054       : exceptionToActionTable_(new ExceptionToActionTable),
0055         // no type resolver for modules in SecondaryEventProvider for now
0056         workerManager_(std::make_shared<ActivityRegistry>(), *exceptionToActionTable_, nullptr) {
0057     std::vector<std::string> shouldBeUsedLabels;
0058     std::set<std::string> unscheduledLabels;
0059     const PreallocationConfiguration preallocConfig;
0060     for (auto& pset : psets) {
0061       std::string label = pset.getParameter<std::string>("@module_label");
0062       workerManager_.addToUnscheduledWorkers(
0063           pset, preg, &preallocConfig, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
0064     }
0065     if (!unscheduledLabels.empty()) {
0066       preg.setUnscheduledProducts(unscheduledLabels);
0067     }
0068   }  // SecondaryEventProvider::SecondaryEventProvider
0069 
0070   void SecondaryEventProvider::beginJob(ProductRegistry const& iRegistry,
0071                                         eventsetup::ESRecordsToProductResolverIndices const& iIndices) {
0072     ProcessBlockHelper dummyProcessBlockHelper;
0073     workerManager_.beginJob(iRegistry, iIndices, dummyProcessBlockHelper);
0074   }
0075 
0076   //NOTE: When the Stream interfaces are propagated to the modules, this code must be updated
0077   // to also send the stream based transitions
0078   void SecondaryEventProvider::beginRun(RunPrincipal& run,
0079                                         const EventSetupImpl& setup,
0080                                         ModuleCallingContext const* mcc,
0081                                         StreamContext& sContext) {
0082     RunTransitionInfo info(run, setup);
0083     processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>>(
0084         workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
0085     processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>>(
0086         workerManager_, info, sContext.streamID(), &sContext, mcc);
0087   }
0088 
0089   void SecondaryEventProvider::beginLuminosityBlock(LuminosityBlockPrincipal& lumi,
0090                                                     const EventSetupImpl& setup,
0091                                                     ModuleCallingContext const* mcc,
0092                                                     StreamContext& sContext) {
0093     LumiTransitionInfo info(lumi, setup);
0094     processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>>(
0095         workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
0096     processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>>(
0097         workerManager_, info, sContext.streamID(), &sContext, mcc);
0098   }
0099 
0100   void SecondaryEventProvider::endRun(RunPrincipal& run,
0101                                       const EventSetupImpl& setup,
0102                                       ModuleCallingContext const* mcc,
0103                                       StreamContext& sContext) {
0104     RunTransitionInfo info(run, setup);
0105     processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>>(
0106         workerManager_, info, sContext.streamID(), &sContext, mcc);
0107     processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>>(
0108         workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
0109   }
0110 
0111   void SecondaryEventProvider::endLuminosityBlock(LuminosityBlockPrincipal& lumi,
0112                                                   const EventSetupImpl& setup,
0113                                                   ModuleCallingContext const* mcc,
0114                                                   StreamContext& sContext) {
0115     LumiTransitionInfo info(lumi, setup);
0116     processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>>(
0117         workerManager_, info, sContext.streamID(), &sContext, mcc);
0118     processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>>(
0119         workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
0120   }
0121 
0122   void SecondaryEventProvider::setupPileUpEvent(EventPrincipal& ep,
0123                                                 const EventSetupImpl& setup,
0124                                                 StreamContext& sContext) {
0125     workerManager_.setupResolvers(ep);
0126     EventTransitionInfo info(ep, setup);
0127     workerManager_.setupOnDemandSystem(info);
0128 
0129     if (workerManager_.unscheduledWorkers().empty()) {
0130       return;
0131     }
0132     auto token = edm::ServiceRegistry::instance().presentToken();
0133     //we need the arena to guarantee that the syncWait will return to this thread
0134     // and not cause this callstack to possibly be moved to a new thread
0135     ParentContext pc(&sContext);
0136     std::exception_ptr exceptPtr = tbb::this_task_arena::isolate([&]() {
0137       return edm::syncWait([&](edm::WaitingTaskHolder&& iHolder) {
0138         for (auto& worker : workerManager_.unscheduledWorkers()) {
0139           worker->doWorkAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
0140               iHolder, info, token, sContext.streamID(), pc, &sContext);
0141         }
0142       });
0143     });
0144     if (exceptPtr) {
0145       try {
0146         edm::convertException::wrap([&]() { std::rethrow_exception(exceptPtr); });
0147       } catch (cms::Exception& ex) {
0148         if (ex.context().empty()) {
0149           edm::addContextAndPrintException("Calling SecondaryEventProvider", ex, false);
0150         } else {
0151           edm::addContextAndPrintException("", ex, false);
0152         }
0153         throw;
0154       }
0155     }
0156   }
0157 
0158   void SecondaryEventProvider::beginStream(edm::StreamID iID, StreamContext& sContext) {
0159     workerManager_.beginStream(iID, sContext);
0160   }
0161 
0162   void SecondaryEventProvider::endStream(edm::StreamID iID, StreamContext& sContext) {
0163     workerManager_.endStream(iID, sContext);
0164   }
0165 }  // namespace edm