File indexing completed on 2024-07-28 22:48:22
0001 #ifndef FWCore_Framework_GlobalSchedule_h
0002 #define FWCore_Framework_GlobalSchedule_h
0003
0004 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0005 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0006 #include "FWCore/Framework/interface/EventPrincipal.h"
0007 #include "FWCore/Framework/interface/ExceptionActions.h"
0008 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0009 #include "FWCore/Framework/interface/Frameworkfwd.h"
0010 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0011 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0012 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0013 #include "FWCore/Framework/interface/RunPrincipal.h"
0014 #include "FWCore/Framework/interface/WorkerManager.h"
0015 #include "FWCore/Framework/interface/maker/Worker.h"
0016 #include "FWCore/Framework/interface/WorkerRegistry.h"
0017 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0018 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0019 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0020 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0021 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0022 #include "FWCore/Utilities/interface/Algorithms.h"
0023 #include "FWCore/Utilities/interface/BranchType.h"
0024 #include "FWCore/Utilities/interface/ConvertException.h"
0025 #include "FWCore/Utilities/interface/Exception.h"
0026 #include "FWCore/Utilities/interface/StreamID.h"
0027 #include "FWCore/Utilities/interface/propagate_const.h"
0028 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0029 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0030
0031 #include <exception>
0032 #include <map>
0033 #include <memory>
0034 #include <set>
0035 #include <string>
0036 #include <vector>
0037 #include <sstream>
0038 #include "boost/range/adaptor/reversed.hpp"
0039
0040 namespace edm {
0041
0042 class ExceptionCollector;
0043 class PreallocationConfiguration;
0044 class ModuleRegistry;
0045 class TriggerResultInserter;
0046 class PathStatusInserter;
0047 class EndPathStatusInserter;
0048
0049 class GlobalSchedule {
0050 public:
0051 typedef std::vector<std::string> vstring;
0052 typedef std::vector<Worker*> AllWorkers;
0053 typedef std::shared_ptr<Worker> WorkerPtr;
0054 typedef std::vector<Worker*> Workers;
0055
0056 GlobalSchedule(std::shared_ptr<TriggerResultInserter> inserter,
0057 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0058 std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0059 std::shared_ptr<ModuleRegistry> modReg,
0060 std::vector<std::string> const& modulesToUse,
0061 ParameterSet& proc_pset,
0062 ProductRegistry& pregistry,
0063 PreallocationConfiguration const& prealloc,
0064 ExceptionToActionTable const& actions,
0065 std::shared_ptr<ActivityRegistry> areg,
0066 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0067 ProcessContext const* processContext);
0068 GlobalSchedule(GlobalSchedule const&) = delete;
0069
0070 template <typename T>
0071 void processOneGlobalAsync(WaitingTaskHolder holder,
0072 typename T::TransitionInfoType&,
0073 ServiceToken const& token,
0074 bool cleaningUpAfterException = false);
0075
0076 void beginJob(ProductRegistry const&,
0077 eventsetup::ESRecordsToProductResolverIndices const&,
0078 ProcessBlockHelperBase const&,
0079 PathsAndConsumesOfModulesBase const&,
0080 ProcessContext const&);
0081 void endJob(ExceptionCollector& collector);
0082
0083
0084
0085
0086
0087
0088
0089 std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0090
0091
0092 bool terminate() const;
0093
0094
0095 void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
0096
0097
0098 void deleteModule(std::string const& iLabel);
0099
0100
0101 AllWorkers const& allWorkers() const { return workerManagers_[0].allWorkers(); }
0102
0103 private:
0104
0105 ExceptionToActionTable const& actionTable() const { return workerManagers_[0].actionTable(); }
0106
0107 template <typename T>
0108 void preScheduleSignal(GlobalContext const*, ServiceToken const&);
0109
0110 template <typename T>
0111 void postScheduleSignal(GlobalContext const*, ServiceWeakToken const&, std::exception_ptr&);
0112
0113 void handleException(GlobalContext const*,
0114 ServiceWeakToken const&,
0115 bool cleaningUpAfterException,
0116 std::exception_ptr&);
0117
0118 std::vector<WorkerManager> workerManagers_;
0119 std::shared_ptr<ActivityRegistry> actReg_;
0120 std::vector<edm::propagate_const<WorkerPtr>> extraWorkers_;
0121 ProcessContext const* processContext_;
0122
0123
0124
0125
0126 unsigned int numberOfConcurrentLumis_;
0127 unsigned int numberOfConcurrentRuns_;
0128 static constexpr unsigned int numberOfConcurrentProcessBlocks_ = 1;
0129 static constexpr unsigned int numberOfConcurrentJobs_ = 1;
0130 };
0131
0132 template <typename T>
0133 void GlobalSchedule::processOneGlobalAsync(WaitingTaskHolder iHolder,
0134 typename T::TransitionInfoType& transitionInfo,
0135 ServiceToken const& token,
0136 bool cleaningUpAfterException) {
0137 auto const& principal = transitionInfo.principal();
0138
0139
0140 CMS_SA_ALLOW try {
0141
0142 auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(principal, processContext_));
0143
0144 ServiceWeakToken weakToken = token;
0145 auto doneTask = make_waiting_task(
0146 [this, iHolder, cleaningUpAfterException, globalContext, weakToken](std::exception_ptr const* iPtr) mutable {
0147 std::exception_ptr excpt;
0148 if (iPtr) {
0149 excpt = *iPtr;
0150
0151 handleException(globalContext.get(), weakToken, cleaningUpAfterException, excpt);
0152 }
0153 postScheduleSignal<T>(globalContext.get(), weakToken, excpt);
0154 iHolder.doneWaiting(excpt);
0155 });
0156
0157
0158 WaitingTaskHolder holdForLoop(*iHolder.group(), doneTask);
0159
0160 CMS_SA_ALLOW try {
0161 preScheduleSignal<T>(globalContext.get(), token);
0162
0163 unsigned int managerIndex = principal.index();
0164 if constexpr (T::branchType_ == InRun) {
0165 managerIndex += numberOfConcurrentLumis_;
0166 } else if constexpr (T::branchType_ == InProcess) {
0167 managerIndex += (numberOfConcurrentLumis_ + numberOfConcurrentRuns_);
0168 }
0169 WorkerManager& workerManager = workerManagers_[managerIndex];
0170 workerManager.resetAll();
0171
0172 ParentContext parentContext(globalContext.get());
0173
0174
0175 workerManager.setupResolvers(transitionInfo.principal());
0176
0177 auto& aw = workerManager.allWorkers();
0178 for (Worker* worker : boost::adaptors::reverse(aw)) {
0179 worker->doWorkAsync<T>(
0180 holdForLoop, transitionInfo, token, StreamID::invalidStreamID(), parentContext, globalContext.get());
0181 }
0182 } catch (...) {
0183 holdForLoop.doneWaiting(std::current_exception());
0184 }
0185 } catch (...) {
0186 iHolder.doneWaiting(std::current_exception());
0187 }
0188 }
0189
0190 template <typename T>
0191 void GlobalSchedule::preScheduleSignal(GlobalContext const* globalContext, ServiceToken const& token) {
0192 if (actReg_) {
0193 try {
0194 ServiceRegistry::Operate op(token);
0195 convertException::wrap([this, globalContext]() { T::preScheduleSignal(actReg_.get(), globalContext); });
0196 } catch (cms::Exception& ex) {
0197 exceptionContext(ex, *globalContext, "Handling pre signal, likely in a service function");
0198 throw;
0199 }
0200 }
0201 }
0202
0203 template <typename T>
0204 void GlobalSchedule::postScheduleSignal(GlobalContext const* globalContext,
0205 ServiceWeakToken const& weakToken,
0206 std::exception_ptr& excpt) {
0207 if (actReg_) {
0208 try {
0209 convertException::wrap([this, &weakToken, globalContext]() {
0210 ServiceRegistry::Operate op(weakToken.lock());
0211 T::postScheduleSignal(actReg_.get(), globalContext);
0212 });
0213 } catch (cms::Exception& ex) {
0214 if (not excpt) {
0215 exceptionContext(ex, *globalContext, "Handling post signal, likely in a service function");
0216 excpt = std::current_exception();
0217 }
0218 }
0219 }
0220 }
0221
0222 }
0223
0224 #endif