Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-03-13 02:32:12

0001 #include "FWCore/Concurrency/interface/include_first_syncWait.h"
0002 #include "Mixing/Base/src/SecondaryEventProvider.h"
0003 #include "FWCore/Common/interface/ProcessBlockHelper.h"
0004 #include "FWCore/Framework/interface/ExceptionActions.h"
0005 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0006 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0007 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0008 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0009 #include "FWCore/Utilities/interface/StreamID.h"
0010 #include "FWCore/Framework/interface/SignallingProductRegistryFiller.h"
0011 #include "oneapi/tbb/task_arena.h"
0012 
0013 #include <mutex>
0014 
0015 namespace {
0016   template <typename T, typename U>
0017   void processOneOccurrence(edm::WorkerManager& manager,
0018                             typename T::TransitionInfoType& info,
0019                             edm::StreamID streamID,
0020                             typename T::Context const* topContext,
0021                             U const* context,
0022                             bool cleaningUpAfterException = false) {
0023     manager.resetAll();
0024 
0025     if (manager.allWorkers().empty())
0026       return;
0027 
0028     auto token = edm::ServiceRegistry::instance().presentToken();
0029     //we need the arena to guarantee that the syncWait will return to this thread
0030     // and not cause this callstack to possibly be moved to a new thread
0031     tbb::task_arena localArena{tbb::this_task_arena::max_concurrency()};
0032     std::exception_ptr exceptPtr = localArena.execute([&]() {
0033       return edm::syncWait([&](edm::WaitingTaskHolder&& iHolder) {
0034         manager.processOneOccurrenceAsync<T, U>(std::move(iHolder), info, token, streamID, topContext, context);
0035       });
0036     });
0037 
0038     if (exceptPtr) {
0039       try {
0040         edm::convertException::wrap([&]() { std::rethrow_exception(exceptPtr); });
0041       } catch (cms::Exception& ex) {
0042         if (ex.context().empty()) {
0043           edm::addContextAndPrintException("Calling SecondaryEventProvider", ex, cleaningUpAfterException);
0044         } else {
0045           edm::addContextAndPrintException("", ex, cleaningUpAfterException);
0046         }
0047         throw;
0048       }
0049     }
0050   }
0051 }  // namespace
0052 
0053 namespace edm {
0054   SecondaryEventProvider::SecondaryEventProvider(std::vector<ParameterSet>& psets,
0055                                                  SignallingProductRegistryFiller& preg,
0056                                                  std::shared_ptr<ProcessConfiguration> processConfiguration)
0057       : exceptionToActionTable_(new ExceptionToActionTable),
0058         // no type resolver for modules in SecondaryEventProvider for now
0059         workerManager_(std::make_shared<ActivityRegistry>(), *exceptionToActionTable_, nullptr) {
0060     std::vector<std::string> shouldBeUsedLabels;
0061     std::set<std::string> unscheduledLabels;
0062     const PreallocationConfiguration preallocConfig;
0063     for (auto& pset : psets) {
0064       std::string label = pset.getParameter<std::string>("@module_label");
0065       workerManager_.addToUnscheduledWorkers(
0066           pset, preg, &preallocConfig, processConfiguration, label, unscheduledLabels, shouldBeUsedLabels);
0067     }
0068     if (!unscheduledLabels.empty()) {
0069       preg.setUnscheduledProducts(unscheduledLabels);
0070     }
0071   }  // SecondaryEventProvider::SecondaryEventProvider
0072 
0073   void SecondaryEventProvider::beginJob(ProductRegistry const& iRegistry,
0074                                         eventsetup::ESRecordsToProductResolverIndices const& iIndices,
0075                                         GlobalContext const& globalContext) {
0076     ProcessBlockHelper dummyProcessBlockHelper;
0077     workerManager_.beginJob(iRegistry, iIndices, dummyProcessBlockHelper, globalContext);
0078   }
0079 
0080   //NOTE: When the Stream interfaces are propagated to the modules, this code must be updated
0081   // to also send the stream based transitions
0082   void SecondaryEventProvider::beginRun(RunPrincipal& run,
0083                                         const EventSetupImpl& setup,
0084                                         ModuleCallingContext const* mcc,
0085                                         StreamContext& sContext) {
0086     RunTransitionInfo info(run, setup);
0087     processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>>(
0088         workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
0089     processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>>(
0090         workerManager_, info, sContext.streamID(), &sContext, mcc);
0091   }
0092 
0093   void SecondaryEventProvider::beginLuminosityBlock(LuminosityBlockPrincipal& lumi,
0094                                                     const EventSetupImpl& setup,
0095                                                     ModuleCallingContext const* mcc,
0096                                                     StreamContext& sContext) {
0097     LumiTransitionInfo info(lumi, setup);
0098     processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>>(
0099         workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
0100     processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>>(
0101         workerManager_, info, sContext.streamID(), &sContext, mcc);
0102   }
0103 
0104   void SecondaryEventProvider::endRun(RunPrincipal& run,
0105                                       const EventSetupImpl& setup,
0106                                       ModuleCallingContext const* mcc,
0107                                       StreamContext& sContext) {
0108     RunTransitionInfo info(run, setup);
0109     processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>>(
0110         workerManager_, info, sContext.streamID(), &sContext, mcc);
0111     processOneOccurrence<OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>>(
0112         workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
0113   }
0114 
0115   void SecondaryEventProvider::endLuminosityBlock(LuminosityBlockPrincipal& lumi,
0116                                                   const EventSetupImpl& setup,
0117                                                   ModuleCallingContext const* mcc,
0118                                                   StreamContext& sContext) {
0119     LumiTransitionInfo info(lumi, setup);
0120     processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>>(
0121         workerManager_, info, sContext.streamID(), &sContext, mcc);
0122     processOneOccurrence<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>>(
0123         workerManager_, info, StreamID::invalidStreamID(), nullptr, mcc);
0124   }
0125 
0126   void SecondaryEventProvider::setupPileUpEvent(EventPrincipal& ep,
0127                                                 const EventSetupImpl& setup,
0128                                                 StreamContext& sContext) {
0129     workerManager_.setupResolvers(ep);
0130     EventTransitionInfo info(ep, setup);
0131     workerManager_.setupOnDemandSystem(info);
0132 
0133     if (workerManager_.unscheduledWorkers().empty()) {
0134       return;
0135     }
0136     auto token = edm::ServiceRegistry::instance().presentToken();
0137     //we need the arena to guarantee that the syncWait will return to this thread
0138     // and not cause this callstack to possibly be moved to a new thread
0139     ParentContext pc(&sContext);
0140     std::exception_ptr exceptPtr = tbb::this_task_arena::isolate([&]() {
0141       return edm::syncWait([&](edm::WaitingTaskHolder&& iHolder) {
0142         for (auto& worker : workerManager_.unscheduledWorkers()) {
0143           worker->doWorkAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
0144               iHolder, info, token, sContext.streamID(), pc, &sContext);
0145         }
0146       });
0147     });
0148     if (exceptPtr) {
0149       try {
0150         edm::convertException::wrap([&]() { std::rethrow_exception(exceptPtr); });
0151       } catch (cms::Exception& ex) {
0152         if (ex.context().empty()) {
0153           edm::addContextAndPrintException("Calling SecondaryEventProvider", ex, false);
0154         } else {
0155           edm::addContextAndPrintException("", ex, false);
0156         }
0157         throw;
0158       }
0159     }
0160   }
0161 
0162   void SecondaryEventProvider::beginStream(edm::StreamID iID, StreamContext const& sContext) {
0163     workerManager_.beginStream(iID, sContext);
0164   }
0165 
0166   void SecondaryEventProvider::endStream(edm::StreamID iID,
0167                                          StreamContext const& sContext,
0168                                          ExceptionCollector& exceptionCollector) {
0169     // In this context the mutex is not needed because these things are not
0170     // executing concurrently but in general the WorkerManager needs one.
0171     std::mutex exceptionCollectorMutex;
0172     workerManager_.endStream(iID, sContext, exceptionCollector, exceptionCollectorMutex);
0173   }
0174 }  // namespace edm