File indexing completed on 2025-04-13 22:49:47
0001 #ifndef FWCore_Framework_GlobalSchedule_h
0002 #define FWCore_Framework_GlobalSchedule_h
0003
0004 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0005 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0006 #include "FWCore/Framework/interface/EventPrincipal.h"
0007 #include "FWCore/Framework/interface/ExceptionActions.h"
0008 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0009 #include "FWCore/Framework/interface/Frameworkfwd.h"
0010 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0011 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0012 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0013 #include "FWCore/Framework/interface/RunPrincipal.h"
0014 #include "FWCore/Framework/interface/WorkerManager.h"
0015 #include "FWCore/Framework/interface/maker/Worker.h"
0016 #include "FWCore/Framework/interface/WorkerRegistry.h"
0017 #include "FWCore/Framework/interface/SignallingProductRegistryFiller.h"
0018 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0019 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0020 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0021 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0022 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0023 #include "FWCore/Utilities/interface/Algorithms.h"
0024 #include "FWCore/Utilities/interface/BranchType.h"
0025 #include "FWCore/Utilities/interface/ConvertException.h"
0026 #include "FWCore/Utilities/interface/Exception.h"
0027 #include "FWCore/Utilities/interface/StreamID.h"
0028 #include "FWCore/Utilities/interface/propagate_const.h"
0029 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0030 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0031
0032 #include <exception>
0033 #include <map>
0034 #include <memory>
0035 #include <set>
0036 #include <string>
0037 #include <vector>
0038 #include <sstream>
0039 #include "boost/range/adaptor/reversed.hpp"
0040
0041 namespace edm {
0042
0043 class ExceptionCollector;
0044 class PreallocationConfiguration;
0045 class ModuleRegistry;
0046 class TriggerResultInserter;
0047 class PathStatusInserter;
0048 class EndPathStatusInserter;
0049
0050 class GlobalSchedule {
0051 public:
0052 typedef std::vector<std::string> vstring;
0053 typedef std::vector<Worker*> AllWorkers;
0054 typedef std::shared_ptr<Worker> WorkerPtr;
0055 typedef std::vector<Worker*> Workers;
0056
0057 GlobalSchedule(std::shared_ptr<TriggerResultInserter> inserter,
0058 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0059 std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0060 std::shared_ptr<ModuleRegistry> modReg,
0061 std::vector<std::string> const& modulesToUse,
0062 ParameterSet& proc_pset,
0063 SignallingProductRegistryFiller& pregistry,
0064 PreallocationConfiguration const& prealloc,
0065 ExceptionToActionTable const& actions,
0066 std::shared_ptr<ActivityRegistry> areg,
0067 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0068 ProcessContext const* processContext);
0069 GlobalSchedule(GlobalSchedule const&) = delete;
0070
0071 template <typename T>
0072 void processOneGlobalAsync(WaitingTaskHolder holder,
0073 typename T::TransitionInfoType&,
0074 ServiceToken const& token,
0075 bool cleaningUpAfterException = false);
0076
0077 void beginJob(ProductRegistry const&,
0078 eventsetup::ESRecordsToProductResolverIndices const&,
0079 ProcessBlockHelperBase const&,
0080 ProcessContext const&);
0081 void endJob(ExceptionCollector& collector);
0082
0083
0084
0085
0086
0087
0088
0089 std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0090
0091
0092 bool terminate() const;
0093
0094
0095 void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
0096
0097
0098 void deleteModule(std::string const& iLabel);
0099
0100
0101 AllWorkers const& allWorkers() const { return workerManagers_[0].allWorkers(); }
0102
0103 void releaseMemoryPostLookupSignal();
0104
0105 private:
0106
0107 ExceptionToActionTable const& actionTable() const { return workerManagers_[0].actionTable(); }
0108
0109 template <typename T>
0110 void preScheduleSignal(GlobalContext const*, ServiceToken const&);
0111
0112 template <typename T>
0113 void postScheduleSignal(GlobalContext const*, ServiceWeakToken const&, std::exception_ptr&);
0114
0115 void handleException(GlobalContext const*,
0116 ServiceWeakToken const&,
0117 bool cleaningUpAfterException,
0118 std::exception_ptr&);
0119
0120 std::vector<WorkerManager> workerManagers_;
0121 std::shared_ptr<ActivityRegistry> actReg_;
0122 std::vector<edm::propagate_const<WorkerPtr>> extraWorkers_;
0123 ProcessContext const* processContext_;
0124
0125
0126
0127
0128 unsigned int numberOfConcurrentLumis_;
0129 unsigned int numberOfConcurrentRuns_;
0130 static constexpr unsigned int numberOfConcurrentProcessBlocks_ = 1;
0131 static constexpr unsigned int numberOfConcurrentJobs_ = 1;
0132 };
0133
0134 template <typename T>
0135 void GlobalSchedule::processOneGlobalAsync(WaitingTaskHolder iHolder,
0136 typename T::TransitionInfoType& transitionInfo,
0137 ServiceToken const& token,
0138 bool cleaningUpAfterException) {
0139 auto const& principal = transitionInfo.principal();
0140
0141
0142 CMS_SA_ALLOW try {
0143
0144 auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(principal, processContext_));
0145
0146 ServiceWeakToken weakToken = token;
0147 auto doneTask = make_waiting_task(
0148 [this, iHolder, cleaningUpAfterException, globalContext, weakToken](std::exception_ptr const* iPtr) mutable {
0149 std::exception_ptr excpt;
0150 if (iPtr) {
0151 excpt = *iPtr;
0152
0153 handleException(globalContext.get(), weakToken, cleaningUpAfterException, excpt);
0154 }
0155 postScheduleSignal<T>(globalContext.get(), weakToken, excpt);
0156 iHolder.doneWaiting(excpt);
0157 });
0158
0159
0160 WaitingTaskHolder holdForLoop(*iHolder.group(), doneTask);
0161
0162 CMS_SA_ALLOW try {
0163 preScheduleSignal<T>(globalContext.get(), token);
0164
0165 unsigned int managerIndex = principal.index();
0166 if constexpr (T::branchType_ == InRun) {
0167 managerIndex += numberOfConcurrentLumis_;
0168 } else if constexpr (T::branchType_ == InProcess) {
0169 managerIndex += (numberOfConcurrentLumis_ + numberOfConcurrentRuns_);
0170 }
0171 WorkerManager& workerManager = workerManagers_[managerIndex];
0172 workerManager.resetAll();
0173
0174 ParentContext parentContext(globalContext.get());
0175
0176
0177 workerManager.setupResolvers(transitionInfo.principal());
0178
0179 auto& aw = workerManager.allWorkers();
0180 for (Worker* worker : boost::adaptors::reverse(aw)) {
0181 worker->doWorkAsync<T>(
0182 holdForLoop, transitionInfo, token, StreamID::invalidStreamID(), parentContext, globalContext.get());
0183 }
0184 } catch (...) {
0185 holdForLoop.doneWaiting(std::current_exception());
0186 }
0187 } catch (...) {
0188 iHolder.doneWaiting(std::current_exception());
0189 }
0190 }
0191
0192 template <typename T>
0193 void GlobalSchedule::preScheduleSignal(GlobalContext const* globalContext, ServiceToken const& token) {
0194 if (actReg_) {
0195 try {
0196 ServiceRegistry::Operate op(token);
0197 convertException::wrap([this, globalContext]() { T::preScheduleSignal(actReg_.get(), globalContext); });
0198 } catch (cms::Exception& ex) {
0199 exceptionContext(ex, *globalContext, "Handling pre signal, likely in a service function");
0200 throw;
0201 }
0202 }
0203 }
0204
0205 template <typename T>
0206 void GlobalSchedule::postScheduleSignal(GlobalContext const* globalContext,
0207 ServiceWeakToken const& weakToken,
0208 std::exception_ptr& excpt) {
0209 if (actReg_) {
0210 try {
0211 convertException::wrap([this, &weakToken, globalContext]() {
0212 ServiceRegistry::Operate op(weakToken.lock());
0213 T::postScheduleSignal(actReg_.get(), globalContext);
0214 });
0215 } catch (cms::Exception& ex) {
0216 if (not excpt) {
0217 exceptionContext(ex, *globalContext, "Handling post signal, likely in a service function");
0218 excpt = std::current_exception();
0219 }
0220 }
0221 }
0222 }
0223
0224 }
0225
0226 #endif