File indexing completed on 2025-03-13 02:32:12
0001 #include "FWCore/Concurrency/interface/include_first_syncWait.h"
0002 #include "Mixing/Base/src/SecondaryEventProvider.h"
0003 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0004 #include "FWCore/Framework/interface/ExceptionActions.h"
0005 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0006 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0007 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0008 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0009 #include "FWCore/Utilities/interface/StreamID.h"
0010 #include "FWCore/Framework/interface/SignallingProductRegistryFiller.h"
0011 #include "oneapi/tbb/task_arena.h"
0012
0013 #include <mutex>
0014
0015 namespace {
0016 template <typename T, typename U>
0017 void processOneOccurrence(edm::WorkerManager& manager,
0018 typename T::TransitionInfoType& info,
0019 edm::StreamID streamID,
0020 typename T::Context const* topContext,
0021 U const* context,
0022 bool cleaningUpAfterException = false) {
0023 manager.resetAll();
0024
0025 if (manager.allWorkers().empty())
0026 return;
0027
0028 auto token = edm::ServiceRegistry::instance().presentToken();
0029
0030
0031 tbb::task_arena localArena{tbb::this_task_arena::max_concurrency()};
0032 std::exception_ptr exceptPtr = localArena.execute([&]() {
0033 return edm::syncWait([&](edm::WaitingTaskHolder&& iHolder) {
0034 manager.processOneOccurrenceAsync<T, U>(std::move(iHolder), info, token, streamID, topContext, context);
0035 });
0036 });
0037
0038 if (exceptPtr) {
0039 try {
0040 edm::convertException::wrap([&]() { std::rethrow_exception(exceptPtr); });
0041 } catch (cms::Exception& ex) {
0042 if (ex.context().empty()) {
0043 edm::addContextAndPrintException("Calling SecondaryEventProvider", ex, cleaningUpAfterException);
0044 } else {
0045 edm::addContextAndPrintException("", ex, cleaningUpAfterException);
0046 }
0047 throw;
0048 }
0049 }
0050 }
0051 }
0052
0053 namespace edm {
0054 SecondaryEventProvider::SecondaryEventProvider(std::vector<ParameterSet>& psets,
0055 SignallingProductRegistryFiller& preg,
0056 std::shared_ptr<ProcessConfiguration> processConfiguration)
0057 : exceptionToActionTable_(new ExceptionToActionTable),
0058
0059 workerManager_(std::make_shared<ActivityRegistry>(), *exceptionToActionTable_, nullptr) {
0060 std::vector<std::string> shouldBeUsedLabels;
0061 std::set<std::string> unscheduledLabels;
0062 const PreallocationConfiguration preallocConfig;
0063 for (auto& pset : psets) {
0064 std::string label = pset.getParameter<std::string>("@module_label");
0065 workerManager_.addToUnscheduledWorkers(
0066 pset, preg, &preallocConfig, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
0067 }
0068 if (!unscheduledLabels.empty()) {
0069 preg.setUnscheduledProducts(unscheduledLabels);
0070 }
0071 }
0072
0073 void SecondaryEventProvider::beginJob(ProductRegistry const& iRegistry,
0074 eventsetup::ESRecordsToProductResolverIndices const& iIndices,
0075 GlobalContext const& globalContext) {
0076 ProcessBlockHelper dummyProcessBlockHelper;
0077 workerManager_.beginJob(iRegistry, iIndices, dummyProcessBlockHelper, globalContext);
0078 }
0079
0080
0081
0082 void SecondaryEventProvider::beginRun(RunPrincipal& run,
0083 const EventSetupImpl& setup,
0084 ModuleCallingContext const* mcc,
0085 StreamContext& sContext) {
0086 RunTransitionInfo info(run, setup);
0087 processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>>(
0088 workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
0089 processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>>(
0090 workerManager_, info, sContext.streamID(), &sContext, mcc);
0091 }
0092
0093 void SecondaryEventProvider::beginLuminosityBlock(LuminosityBlockPrincipal& lumi,
0094 const EventSetupImpl& setup,
0095 ModuleCallingContext const* mcc,
0096 StreamContext& sContext) {
0097 LumiTransitionInfo info(lumi, setup);
0098 processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>>(
0099 workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
0100 processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>>(
0101 workerManager_, info, sContext.streamID(), &sContext, mcc);
0102 }
0103
0104 void SecondaryEventProvider::endRun(RunPrincipal& run,
0105 const EventSetupImpl& setup,
0106 ModuleCallingContext const* mcc,
0107 StreamContext& sContext) {
0108 RunTransitionInfo info(run, setup);
0109 processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>>(
0110 workerManager_, info, sContext.streamID(), &sContext, mcc);
0111 processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>>(
0112 workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
0113 }
0114
0115 void SecondaryEventProvider::endLuminosityBlock(LuminosityBlockPrincipal& lumi,
0116 const EventSetupImpl& setup,
0117 ModuleCallingContext const* mcc,
0118 StreamContext& sContext) {
0119 LumiTransitionInfo info(lumi, setup);
0120 processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>>(
0121 workerManager_, info, sContext.streamID(), &sContext, mcc);
0122 processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>>(
0123 workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
0124 }
0125
0126 void SecondaryEventProvider::setupPileUpEvent(EventPrincipal& ep,
0127 const EventSetupImpl& setup,
0128 StreamContext& sContext) {
0129 workerManager_.setupResolvers(ep);
0130 EventTransitionInfo info(ep, setup);
0131 workerManager_.setupOnDemandSystem(info);
0132
0133 if (workerManager_.unscheduledWorkers().empty()) {
0134 return;
0135 }
0136 auto token = edm::ServiceRegistry::instance().presentToken();
0137
0138
0139 ParentContext pc(&sContext);
0140 std::exception_ptr exceptPtr = tbb::this_task_arena::isolate([&]() {
0141 return edm::syncWait([&](edm::WaitingTaskHolder&& iHolder) {
0142 for (auto& worker : workerManager_.unscheduledWorkers()) {
0143 worker->doWorkAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
0144 iHolder, info, token, sContext.streamID(), pc, &sContext);
0145 }
0146 });
0147 });
0148 if (exceptPtr) {
0149 try {
0150 edm::convertException::wrap([&]() { std::rethrow_exception(exceptPtr); });
0151 } catch (cms::Exception& ex) {
0152 if (ex.context().empty()) {
0153 edm::addContextAndPrintException("Calling SecondaryEventProvider", ex, false);
0154 } else {
0155 edm::addContextAndPrintException("", ex, false);
0156 }
0157 throw;
0158 }
0159 }
0160 }
0161
0162 void SecondaryEventProvider::beginStream(edm::StreamID iID, StreamContext const& sContext) {
0163 workerManager_.beginStream(iID, sContext);
0164 }
0165
0166 void SecondaryEventProvider::endStream(edm::StreamID iID,
0167 StreamContext const& sContext,
0168 ExceptionCollector& exceptionCollector) {
0169
0170
0171 std::mutex exceptionCollectorMutex;
0172 workerManager_.endStream(iID, sContext, exceptionCollector, exceptionCollectorMutex);
0173 }
0174 }