Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-06-29 23:01:06

0001 #include "FWCore/Concurrency/interface/include_first_syncWait.h"
0002 #include "Mixing/Base/src/SecondaryEventProvider.h"
0003 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0004 #include "FWCore/Framework/interface/ExceptionActions.h"
0005 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0006 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0007 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0008 #include "FWCore/Framework/interface/SignallingProductRegistryFiller.h"
0009 #include "FWCore/Framework/interface/ModuleRegistry.h"
0010 #include "FWCore/Framework/interface/ModuleRegistryUtilities.h"
0011 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0012 #include "FWCore/Utilities/interface/StreamID.h"
0013 #include "FWCore/Utilities/interface/make_sentry.h"
0014 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0015 #include "oneapi/tbb/task_arena.h"
0016 
0017 #include <mutex>
0018 
0019 namespace {
0020   template <typename T, typename U>
0021   void processOneOccurrence(edm::WorkerManager& manager,
0022                             typename T::TransitionInfoType& info,
0023                             edm::StreamID streamID,
0024                             typename T::Context const* topContext,
0025                             U const* context,
0026                             bool cleaningUpAfterException = false) {
0027     manager.resetAll();
0028 
0029     if (manager.allWorkers().empty())
0030       return;
0031 
0032     auto token = edm::ServiceRegistry::instance().presentToken();
0033     //we need the arena to guarantee that the syncWait will return to this thread
0034     // and not cause this callstack to possibly be moved to a new thread
0035     tbb::task_arena localArena{tbb::this_task_arena::max_concurrency()};
0036     std::exception_ptr exceptPtr = localArena.execute([&]() {
0037       return edm::syncWait([&](edm::WaitingTaskHolder&& iHolder) {
0038         manager.processOneOccurrenceAsync<T, U>(std::move(iHolder), info, token, streamID, topContext, context);
0039       });
0040     });
0041 
0042     if (exceptPtr) {
0043       try {
0044         edm::convertException::wrap([&]() { std::rethrow_exception(exceptPtr); });
0045       } catch (cms::Exception& ex) {
0046         if (ex.context().empty()) {
0047           edm::addContextAndPrintException("Calling SecondaryEventProvider", ex, cleaningUpAfterException);
0048         } else {
0049           edm::addContextAndPrintException("", ex, cleaningUpAfterException);
0050         }
0051         throw;
0052       }
0053     }
0054   }
0055 }  // namespace
0056 
0057 namespace edm {
0058   SecondaryEventProvider::SecondaryEventProvider(std::vector<ParameterSet>& psets,
0059                                                  SignallingProductRegistryFiller& preg,
0060                                                  std::shared_ptr<ProcessConfiguration> processConfiguration)
0061       : exceptionToActionTable_(new ExceptionToActionTable),
0062         moduleRegistry_(std::make_shared<ModuleRegistry>(nullptr)),
0063         activityRegistry_(std::make_shared<ActivityRegistry>()),
0064         // no type resolver for modules in SecondaryEventProvider for now
0065         workerManager_(moduleRegistry_, activityRegistry_, *exceptionToActionTable_) {
0066     std::vector<std::string> shouldBeUsedLabels;
0067     std::set<std::string> unscheduledLabels;
0068     const PreallocationConfiguration preallocConfig;
0069     for (auto& pset : psets) {
0070       std::string label = pset.getParameter<std::string>("@module_label");
0071       workerManager_.addToUnscheduledWorkers(
0072           pset, preg, &preallocConfig, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
0073     }
0074     if (!unscheduledLabels.empty()) {
0075       preg.setUnscheduledProducts(unscheduledLabels);
0076     }
0077   }  // SecondaryEventProvider::SecondaryEventProvider
0078 
0079   void SecondaryEventProvider::beginJob(ProductRegistry const& iRegistry,
0080                                         eventsetup::ESRecordsToProductResolverIndices const& iIndices,
0081                                         GlobalContext const& globalContext) {
0082     ProcessBlockHelper dummyProcessBlockHelper;
0083     finishModulesInitialization(*moduleRegistry_,
0084                                 iRegistry,
0085                                 iIndices,
0086                                 dummyProcessBlockHelper,
0087                                 globalContext.processContext()->processConfiguration()->processName());
0088     runBeginJobForModules(globalContext, *moduleRegistry_, *activityRegistry_, modulesThatFailed_);
0089   }
0090 
0091   //NOTE: When the Stream interfaces are propagated to the modules, this code must be updated
0092   // to also send the stream based transitions
0093   void SecondaryEventProvider::beginRun(RunPrincipal& run,
0094                                         const EventSetupImpl& setup,
0095                                         ModuleCallingContext const* mcc,
0096                                         StreamContext& sContext) {
0097     RunTransitionInfo info(run, setup);
0098     processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>>(
0099         workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
0100     processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>>(
0101         workerManager_, info, sContext.streamID(), &sContext, mcc);
0102   }
0103 
0104   void SecondaryEventProvider::beginLuminosityBlock(LuminosityBlockPrincipal& lumi,
0105                                                     const EventSetupImpl& setup,
0106                                                     ModuleCallingContext const* mcc,
0107                                                     StreamContext& sContext) {
0108     LumiTransitionInfo info(lumi, setup);
0109     processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>>(
0110         workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
0111     processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>>(
0112         workerManager_, info, sContext.streamID(), &sContext, mcc);
0113   }
0114 
0115   void SecondaryEventProvider::endRun(RunPrincipal& run,
0116                                       const EventSetupImpl& setup,
0117                                       ModuleCallingContext const* mcc,
0118                                       StreamContext& sContext) {
0119     RunTransitionInfo info(run, setup);
0120     processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>>(
0121         workerManager_, info, sContext.streamID(), &sContext, mcc);
0122     processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>>(
0123         workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
0124   }
0125 
0126   void SecondaryEventProvider::endLuminosityBlock(LuminosityBlockPrincipal& lumi,
0127                                                   const EventSetupImpl& setup,
0128                                                   ModuleCallingContext const* mcc,
0129                                                   StreamContext& sContext) {
0130     LumiTransitionInfo info(lumi, setup);
0131     processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>>(
0132         workerManager_, info, sContext.streamID(), &sContext, mcc);
0133     processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>>(
0134         workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
0135   }
0136 
0137   void SecondaryEventProvider::setupPileUpEvent(EventPrincipal& ep,
0138                                                 const EventSetupImpl& setup,
0139                                                 StreamContext& sContext) {
0140     workerManager_.setupResolvers(ep);
0141     EventTransitionInfo info(ep, setup);
0142     workerManager_.setupOnDemandSystem(info);
0143 
0144     if (workerManager_.unscheduledWorkers().empty()) {
0145       return;
0146     }
0147     auto token = edm::ServiceRegistry::instance().presentToken();
0148     //we need the arena to guarantee that the syncWait will return to this thread
0149     // and not cause this callstack to possibly be moved to a new thread
0150     ParentContext pc(&sContext);
0151     std::exception_ptr exceptPtr = tbb::this_task_arena::isolate([&]() {
0152       return edm::syncWait([&](edm::WaitingTaskHolder&& iHolder) {
0153         for (auto& worker : workerManager_.unscheduledWorkers()) {
0154           worker->doWorkAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
0155               iHolder, info, token, sContext.streamID(), pc, &sContext);
0156         }
0157       });
0158     });
0159     if (exceptPtr) {
0160       try {
0161         edm::convertException::wrap([&]() { std::rethrow_exception(exceptPtr); });
0162       } catch (cms::Exception& ex) {
0163         if (ex.context().empty()) {
0164           edm::addContextAndPrintException("Calling SecondaryEventProvider", ex, false);
0165         } else {
0166           edm::addContextAndPrintException("", ex, false);
0167         }
0168         throw;
0169       }
0170     }
0171   }
0172 
0173   void SecondaryEventProvider::beginStream(edm::StreamID iID, StreamContext const& sContext) {
0174     //can reuse modulesThatFailed_ since we can't get here of failed in beginRun
0175     runBeginStreamForModules(sContext, *moduleRegistry_, *activityRegistry_, modulesThatFailed_);
0176   }
0177 
0178   void SecondaryEventProvider::endStream(edm::StreamID iID,
0179                                          StreamContext const& sContext,
0180                                          ExceptionCollector& exceptionCollector) {
0181     // In this context the mutex is not needed because these things are not
0182     // executing concurrently but in general the WorkerManager needs one.
0183     std::mutex exceptionCollectorMutex;
0184     //modulesThatFailed_ gets used in endJob and we can only get here if endJob succeeded
0185     auto sentry = make_sentry(&modulesThatFailed_, [](auto* failed) { failed->clear(); });
0186     runEndStreamForModules(
0187         sContext, *moduleRegistry_, *activityRegistry_, exceptionCollector, exceptionCollectorMutex, modulesThatFailed_);
0188   }
0189 
0190   void SecondaryEventProvider::endJob(ExceptionCollector& exceptionCollector, GlobalContext const& globalContext) {
0191     runEndJobForModules(globalContext, *moduleRegistry_, *activityRegistry_, exceptionCollector, modulesThatFailed_);
0192   }
0193 
0194 }  // namespace edm