File indexing completed on 2025-06-29 23:01:06
0001 #include "FWCore/Concurrency/interface/include_first_syncWait.h"
0002 #include "Mixing/Base/src/SecondaryEventProvider.h"
0003 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0004 #include "FWCore/Framework/interface/ExceptionActions.h"
0005 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0006 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0007 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0008 #include "FWCore/Framework/interface/SignallingProductRegistryFiller.h"
0009 #include "FWCore/Framework/interface/ModuleRegistry.h"
0010 #include "FWCore/Framework/interface/ModuleRegistryUtilities.h"
0011 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0012 #include "FWCore/Utilities/interface/StreamID.h"
0013 #include "FWCore/Utilities/interface/make_sentry.h"
0014 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0015 #include "oneapi/tbb/task_arena.h"
0016
0017 #include <mutex>
0018
0019 namespace {
0020 template <typename T, typename U>
0021 void processOneOccurrence(edm::WorkerManager& manager,
0022 typename T::TransitionInfoType& info,
0023 edm::StreamID streamID,
0024 typename T::Context const* topContext,
0025 U const* context,
0026 bool cleaningUpAfterException = false) {
0027 manager.resetAll();
0028
0029 if (manager.allWorkers().empty())
0030 return;
0031
0032 auto token = edm::ServiceRegistry::instance().presentToken();
0033
0034
0035 tbb::task_arena localArena{tbb::this_task_arena::max_concurrency()};
0036 std::exception_ptr exceptPtr = localArena.execute([&]() {
0037 return edm::syncWait([&](edm::WaitingTaskHolder&& iHolder) {
0038 manager.processOneOccurrenceAsync<T, U>(std::move(iHolder), info, token, streamID, topContext, context);
0039 });
0040 });
0041
0042 if (exceptPtr) {
0043 try {
0044 edm::convertException::wrap([&]() { std::rethrow_exception(exceptPtr); });
0045 } catch (cms::Exception& ex) {
0046 if (ex.context().empty()) {
0047 edm::addContextAndPrintException("Calling SecondaryEventProvider", ex, cleaningUpAfterException);
0048 } else {
0049 edm::addContextAndPrintException("", ex, cleaningUpAfterException);
0050 }
0051 throw;
0052 }
0053 }
0054 }
0055 }
0056
0057 namespace edm {
0058 SecondaryEventProvider::SecondaryEventProvider(std::vector<ParameterSet>& psets,
0059 SignallingProductRegistryFiller& preg,
0060 std::shared_ptr<ProcessConfiguration> processConfiguration)
0061 : exceptionToActionTable_(new ExceptionToActionTable),
0062 moduleRegistry_(std::make_shared<ModuleRegistry>(nullptr)),
0063 activityRegistry_(std::make_shared<ActivityRegistry>()),
0064
0065 workerManager_(moduleRegistry_, activityRegistry_, *exceptionToActionTable_) {
0066 std::vector<std::string> shouldBeUsedLabels;
0067 std::set<std::string> unscheduledLabels;
0068 const PreallocationConfiguration preallocConfig;
0069 for (auto& pset : psets) {
0070 std::string label = pset.getParameter<std::string>("@module_label");
0071 workerManager_.addToUnscheduledWorkers(
0072 pset, preg, &preallocConfig, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
0073 }
0074 if (!unscheduledLabels.empty()) {
0075 preg.setUnscheduledProducts(unscheduledLabels);
0076 }
0077 }
0078
0079 void SecondaryEventProvider::beginJob(ProductRegistry const& iRegistry,
0080 eventsetup::ESRecordsToProductResolverIndices const& iIndices,
0081 GlobalContext const& globalContext) {
0082 ProcessBlockHelper dummyProcessBlockHelper;
0083 finishModulesInitialization(*moduleRegistry_,
0084 iRegistry,
0085 iIndices,
0086 dummyProcessBlockHelper,
0087 globalContext.processContext()->processConfiguration()->processName());
0088 runBeginJobForModules(globalContext, *moduleRegistry_, *activityRegistry_, modulesThatFailed_);
0089 }
0090
0091
0092
0093 void SecondaryEventProvider::beginRun(RunPrincipal& run,
0094 const EventSetupImpl& setup,
0095 ModuleCallingContext const* mcc,
0096 StreamContext& sContext) {
0097 RunTransitionInfo info(run, setup);
0098 processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>>(
0099 workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
0100 processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>>(
0101 workerManager_, info, sContext.streamID(), &sContext, mcc);
0102 }
0103
0104 void SecondaryEventProvider::beginLuminosityBlock(LuminosityBlockPrincipal& lumi,
0105 const EventSetupImpl& setup,
0106 ModuleCallingContext const* mcc,
0107 StreamContext& sContext) {
0108 LumiTransitionInfo info(lumi, setup);
0109 processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>>(
0110 workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
0111 processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>>(
0112 workerManager_, info, sContext.streamID(), &sContext, mcc);
0113 }
0114
0115 void SecondaryEventProvider::endRun(RunPrincipal& run,
0116 const EventSetupImpl& setup,
0117 ModuleCallingContext const* mcc,
0118 StreamContext& sContext) {
0119 RunTransitionInfo info(run, setup);
0120 processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>>(
0121 workerManager_, info, sContext.streamID(), &sContext, mcc);
0122 processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>>(
0123 workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
0124 }
0125
0126 void SecondaryEventProvider::endLuminosityBlock(LuminosityBlockPrincipal& lumi,
0127 const EventSetupImpl& setup,
0128 ModuleCallingContext const* mcc,
0129 StreamContext& sContext) {
0130 LumiTransitionInfo info(lumi, setup);
0131 processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>>(
0132 workerManager_, info, sContext.streamID(), &sContext, mcc);
0133 processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>>(
0134 workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
0135 }
0136
0137 void SecondaryEventProvider::setupPileUpEvent(EventPrincipal& ep,
0138 const EventSetupImpl& setup,
0139 StreamContext& sContext) {
0140 workerManager_.setupResolvers(ep);
0141 EventTransitionInfo info(ep, setup);
0142 workerManager_.setupOnDemandSystem(info);
0143
0144 if (workerManager_.unscheduledWorkers().empty()) {
0145 return;
0146 }
0147 auto token = edm::ServiceRegistry::instance().presentToken();
0148
0149
0150 ParentContext pc(&sContext);
0151 std::exception_ptr exceptPtr = tbb::this_task_arena::isolate([&]() {
0152 return edm::syncWait([&](edm::WaitingTaskHolder&& iHolder) {
0153 for (auto& worker : workerManager_.unscheduledWorkers()) {
0154 worker->doWorkAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
0155 iHolder, info, token, sContext.streamID(), pc, &sContext);
0156 }
0157 });
0158 });
0159 if (exceptPtr) {
0160 try {
0161 edm::convertException::wrap([&]() { std::rethrow_exception(exceptPtr); });
0162 } catch (cms::Exception& ex) {
0163 if (ex.context().empty()) {
0164 edm::addContextAndPrintException("Calling SecondaryEventProvider", ex, false);
0165 } else {
0166 edm::addContextAndPrintException("", ex, false);
0167 }
0168 throw;
0169 }
0170 }
0171 }
0172
0173 void SecondaryEventProvider::beginStream(edm::StreamID iID, StreamContext const& sContext) {
0174
0175 runBeginStreamForModules(sContext, *moduleRegistry_, *activityRegistry_, modulesThatFailed_);
0176 }
0177
0178 void SecondaryEventProvider::endStream(edm::StreamID iID,
0179 StreamContext const& sContext,
0180 ExceptionCollector& exceptionCollector) {
0181
0182
0183 std::mutex exceptionCollectorMutex;
0184
0185 auto sentry = make_sentry(&modulesThatFailed_, [](auto* failed) { failed->clear(); });
0186 runEndStreamForModules(
0187 sContext, *moduleRegistry_, *activityRegistry_, exceptionCollector, exceptionCollectorMutex, modulesThatFailed_);
0188 }
0189
0190 void SecondaryEventProvider::endJob(ExceptionCollector& exceptionCollector, GlobalContext const& globalContext) {
0191 runEndJobForModules(globalContext, *moduleRegistry_, *activityRegistry_, exceptionCollector, modulesThatFailed_);
0192 }
0193
0194 }