File indexing completed on 2024-06-04 04:34:52
0001 #ifndef FWCore_Framework_GlobalSchedule_h
0002 #define FWCore_Framework_GlobalSchedule_h
0003
0004 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0005 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0006 #include "FWCore/Framework/interface/EventPrincipal.h"
0007 #include "FWCore/Framework/interface/ExceptionActions.h"
0008 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0009 #include "FWCore/Framework/interface/Frameworkfwd.h"
0010 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0011 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0012 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0013 #include "FWCore/Framework/interface/RunPrincipal.h"
0014 #include "FWCore/Framework/interface/WorkerManager.h"
0015 #include "FWCore/Framework/interface/maker/Worker.h"
0016 #include "FWCore/Framework/interface/WorkerRegistry.h"
0017 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0018 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0019 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0020 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0021 #include "FWCore/Utilities/interface/Algorithms.h"
0022 #include "FWCore/Utilities/interface/BranchType.h"
0023 #include "FWCore/Utilities/interface/ConvertException.h"
0024 #include "FWCore/Utilities/interface/Exception.h"
0025 #include "FWCore/Utilities/interface/StreamID.h"
0026 #include "FWCore/Utilities/interface/propagate_const.h"
0027 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0028 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0029
0030 #include <exception>
0031 #include <map>
0032 #include <memory>
0033 #include <set>
0034 #include <string>
0035 #include <vector>
0036 #include <sstream>
0037 #include "boost/range/adaptor/reversed.hpp"
0038
0039 namespace edm {
0040
0041 class ActivityRegistry;
0042 class ExceptionCollector;
0043 class ProcessContext;
0044 class PreallocationConfiguration;
0045 class ModuleRegistry;
0046 class TriggerResultInserter;
0047 class PathStatusInserter;
0048 class EndPathStatusInserter;
0049
0050 class GlobalSchedule {
0051 public:
0052 typedef std::vector<std::string> vstring;
0053 typedef std::vector<Worker*> AllWorkers;
0054 typedef std::shared_ptr<Worker> WorkerPtr;
0055 typedef std::vector<Worker*> Workers;
0056
0057 GlobalSchedule(std::shared_ptr<TriggerResultInserter> inserter,
0058 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0059 std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0060 std::shared_ptr<ModuleRegistry> modReg,
0061 std::vector<std::string> const& modulesToUse,
0062 ParameterSet& proc_pset,
0063 ProductRegistry& pregistry,
0064 PreallocationConfiguration const& prealloc,
0065 ExceptionToActionTable const& actions,
0066 std::shared_ptr<ActivityRegistry> areg,
0067 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0068 ProcessContext const* processContext);
0069 GlobalSchedule(GlobalSchedule const&) = delete;
0070
0071 template <typename T>
0072 void processOneGlobalAsync(WaitingTaskHolder holder,
0073 typename T::TransitionInfoType&,
0074 ServiceToken const& token,
0075 bool cleaningUpAfterException = false);
0076
0077 void beginJob(ProductRegistry const&,
0078 eventsetup::ESRecordsToProductResolverIndices const&,
0079 ProcessBlockHelperBase const&);
0080 void endJob(ExceptionCollector& collector);
0081
0082
0083
0084
0085
0086
0087
0088 std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0089
0090
0091 bool terminate() const;
0092
0093
0094 void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
0095
0096
0097 void deleteModule(std::string const& iLabel);
0098
0099
0100 AllWorkers const& allWorkers() const { return workerManagers_[0].allWorkers(); }
0101
0102 private:
0103
0104 ExceptionToActionTable const& actionTable() const { return workerManagers_[0].actionTable(); }
0105
0106 template <typename T>
0107 void preScheduleSignal(GlobalContext const*, ServiceToken const&);
0108
0109 template <typename T>
0110 void postScheduleSignal(GlobalContext const*, ServiceWeakToken const&, std::exception_ptr&);
0111
0112 void handleException(GlobalContext const*,
0113 ServiceWeakToken const&,
0114 bool cleaningUpAfterException,
0115 std::exception_ptr&);
0116
0117 std::vector<WorkerManager> workerManagers_;
0118 std::shared_ptr<ActivityRegistry> actReg_;
0119 std::vector<edm::propagate_const<WorkerPtr>> extraWorkers_;
0120 ProcessContext const* processContext_;
0121 unsigned int numberOfConcurrentLumis_;
0122 };
0123
0124 template <typename T>
0125 void GlobalSchedule::processOneGlobalAsync(WaitingTaskHolder iHolder,
0126 typename T::TransitionInfoType& transitionInfo,
0127 ServiceToken const& token,
0128 bool cleaningUpAfterException) {
0129 auto const& principal = transitionInfo.principal();
0130
0131
0132 CMS_SA_ALLOW try {
0133
0134 auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(principal, processContext_));
0135
0136 ServiceWeakToken weakToken = token;
0137 auto doneTask = make_waiting_task(
0138 [this, iHolder, cleaningUpAfterException, globalContext, weakToken](std::exception_ptr const* iPtr) mutable {
0139 std::exception_ptr excpt;
0140 if (iPtr) {
0141 excpt = *iPtr;
0142
0143 handleException(globalContext.get(), weakToken, cleaningUpAfterException, excpt);
0144 }
0145 postScheduleSignal<T>(globalContext.get(), weakToken, excpt);
0146 iHolder.doneWaiting(excpt);
0147 });
0148
0149
0150 WaitingTaskHolder holdForLoop(*iHolder.group(), doneTask);
0151
0152 CMS_SA_ALLOW try {
0153 preScheduleSignal<T>(globalContext.get(), token);
0154
0155 unsigned int managerIndex = principal.index();
0156 if constexpr (T::branchType_ == InRun) {
0157 managerIndex += numberOfConcurrentLumis_;
0158 }
0159 WorkerManager& workerManager = workerManagers_[managerIndex];
0160 workerManager.resetAll();
0161
0162 ParentContext parentContext(globalContext.get());
0163
0164
0165 workerManager.setupResolvers(transitionInfo.principal());
0166
0167 auto& aw = workerManager.allWorkers();
0168 for (Worker* worker : boost::adaptors::reverse(aw)) {
0169 worker->doWorkAsync<T>(
0170 holdForLoop, transitionInfo, token, StreamID::invalidStreamID(), parentContext, globalContext.get());
0171 }
0172 } catch (...) {
0173 holdForLoop.doneWaiting(std::current_exception());
0174 }
0175 } catch (...) {
0176 iHolder.doneWaiting(std::current_exception());
0177 }
0178 }
0179
0180 template <typename T>
0181 void GlobalSchedule::preScheduleSignal(GlobalContext const* globalContext, ServiceToken const& token) {
0182 if (actReg_) {
0183 try {
0184 ServiceRegistry::Operate op(token);
0185 convertException::wrap([this, globalContext]() { T::preScheduleSignal(actReg_.get(), globalContext); });
0186 } catch (cms::Exception& ex) {
0187 std::ostringstream ost;
0188 ex.addContext("Handling pre signal, likely in a service function");
0189 exceptionContext(ost, *globalContext);
0190 ex.addContext(ost.str());
0191 throw;
0192 }
0193 }
0194 }
0195
0196 template <typename T>
0197 void GlobalSchedule::postScheduleSignal(GlobalContext const* globalContext,
0198 ServiceWeakToken const& weakToken,
0199 std::exception_ptr& excpt) {
0200 if (actReg_) {
0201 try {
0202 convertException::wrap([this, &weakToken, globalContext]() {
0203 ServiceRegistry::Operate op(weakToken.lock());
0204 T::postScheduleSignal(actReg_.get(), globalContext);
0205 });
0206 } catch (cms::Exception& ex) {
0207 if (not excpt) {
0208 std::ostringstream ost;
0209 ex.addContext("Handling post signal, likely in a service function");
0210 exceptionContext(ost, *globalContext);
0211 ex.addContext(ost.str());
0212 excpt = std::current_exception();
0213 }
0214 }
0215 }
0216 }
0217
0218 }
0219
0220 #endif