File indexing completed on 2022-10-13 03:37:56
0001 #ifndef FWCore_Framework_GlobalSchedule_h
0002 #define FWCore_Framework_GlobalSchedule_h
0003
0004
0005
0006
0007 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0008 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0009 #include "FWCore/Framework/interface/EventPrincipal.h"
0010 #include "FWCore/Framework/interface/ExceptionActions.h"
0011 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0012 #include "FWCore/Framework/interface/Frameworkfwd.h"
0013 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0014 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0015 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0016 #include "FWCore/Framework/interface/RunPrincipal.h"
0017 #include "FWCore/Framework/interface/WorkerManager.h"
0018 #include "FWCore/Framework/interface/maker/Worker.h"
0019 #include "FWCore/Framework/interface/WorkerRegistry.h"
0020 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0021 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0022 #include "FWCore/Utilities/interface/Algorithms.h"
0023 #include "FWCore/Utilities/interface/BranchType.h"
0024 #include "FWCore/Utilities/interface/ConvertException.h"
0025 #include "FWCore/Utilities/interface/Exception.h"
0026 #include "FWCore/Utilities/interface/StreamID.h"
0027 #include "FWCore/Utilities/interface/propagate_const.h"
0028 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0029 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0030
0031 #include <map>
0032 #include <memory>
0033 #include <set>
0034 #include <string>
0035 #include <vector>
0036 #include <sstream>
0037 #include "boost/range/adaptor/reversed.hpp"
0038
0039 namespace edm {
0040
0041 namespace {
0042 template <typename T>
0043 class GlobalScheduleSignalSentry {
0044 public:
0045 GlobalScheduleSignalSentry(ActivityRegistry* a, typename T::Context const* context)
0046 : a_(a), context_(context), allowThrow_(false) {
0047 if (a_)
0048 T::preScheduleSignal(a_, context_);
0049 }
0050 ~GlobalScheduleSignalSentry() noexcept(false) {
0051
0052 CMS_SA_ALLOW try {
0053 if (a_)
0054 T::postScheduleSignal(a_, context_);
0055 } catch (...) {
0056 if (allowThrow_) {
0057 throw;
0058 }
0059 }
0060 }
0061
0062 void allowThrow() { allowThrow_ = true; }
0063
0064 private:
0065
0066 ActivityRegistry* a_;
0067 typename T::Context const* context_;
0068 bool allowThrow_;
0069 };
0070 }
0071
0072 class ActivityRegistry;
0073 class ExceptionCollector;
0074 class ProcessContext;
0075 class PreallocationConfiguration;
0076 class ModuleRegistry;
0077 class TriggerResultInserter;
0078 class PathStatusInserter;
0079 class EndPathStatusInserter;
0080
0081 class GlobalSchedule {
0082 public:
0083 typedef std::vector<std::string> vstring;
0084 typedef std::vector<Worker*> AllWorkers;
0085 typedef std::shared_ptr<Worker> WorkerPtr;
0086 typedef std::vector<Worker*> Workers;
0087
0088 GlobalSchedule(std::shared_ptr<TriggerResultInserter> inserter,
0089 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0090 std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0091 std::shared_ptr<ModuleRegistry> modReg,
0092 std::vector<std::string> const& modulesToUse,
0093 ParameterSet& proc_pset,
0094 ProductRegistry& pregistry,
0095 PreallocationConfiguration const& prealloc,
0096 ExceptionToActionTable const& actions,
0097 std::shared_ptr<ActivityRegistry> areg,
0098 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0099 ProcessContext const* processContext);
0100 GlobalSchedule(GlobalSchedule const&) = delete;
0101
0102 template <typename T>
0103 void processOneGlobalAsync(WaitingTaskHolder holder,
0104 typename T::TransitionInfoType&,
0105 ServiceToken const& token,
0106 bool cleaningUpAfterException = false);
0107
0108 void beginJob(ProductRegistry const&, eventsetup::ESRecordsToProxyIndices const&, ProcessBlockHelperBase const&);
0109 void endJob(ExceptionCollector& collector);
0110
0111
0112
0113
0114
0115
0116
0117 std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0118
0119
0120 bool terminate() const;
0121
0122
0123 void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
0124
0125
0126 void deleteModule(std::string const& iLabel);
0127
0128
0129 AllWorkers const& allWorkers() const { return workerManagers_[0].allWorkers(); }
0130
0131 private:
0132
0133
0134
0135
0136 class SendTerminationSignalIfException {
0137 public:
0138 SendTerminationSignalIfException(edm::ActivityRegistry* iReg, edm::GlobalContext const* iContext)
0139 : reg_(iReg), context_(iContext) {}
0140 ~SendTerminationSignalIfException() {
0141 if (reg_) {
0142 reg_->preGlobalEarlyTerminationSignal_(*context_, TerminationOrigin::ExceptionFromThisContext);
0143 }
0144 }
0145 void completedSuccessfully() { reg_ = nullptr; }
0146
0147 private:
0148 edm::ActivityRegistry* reg_;
0149 GlobalContext const* context_;
0150 };
0151
0152
0153 ExceptionToActionTable const& actionTable() const { return workerManagers_[0].actionTable(); }
0154
0155 std::vector<WorkerManager> workerManagers_;
0156 std::shared_ptr<ActivityRegistry> actReg_;
0157 std::vector<edm::propagate_const<WorkerPtr>> extraWorkers_;
0158 ProcessContext const* processContext_;
0159 unsigned int numberOfConcurrentLumis_;
0160 };
0161
0162 template <typename T>
0163 void GlobalSchedule::processOneGlobalAsync(WaitingTaskHolder iHolder,
0164 typename T::TransitionInfoType& transitionInfo,
0165 ServiceToken const& token,
0166 bool cleaningUpAfterException) {
0167 auto const& principal = transitionInfo.principal();
0168
0169
0170 CMS_SA_ALLOW try {
0171
0172 auto globalContext = std::make_shared<GlobalContext>(T::makeGlobalContext(principal, processContext_));
0173
0174 if (actReg_) {
0175
0176 ServiceRegistry::Operate op(token);
0177 T::preScheduleSignal(actReg_.get(), globalContext.get());
0178 }
0179
0180 ServiceWeakToken weakToken = token;
0181 auto doneTask = make_waiting_task(
0182 [this, iHolder, cleaningUpAfterException, globalContext, weakToken](std::exception_ptr const* iPtr) mutable {
0183 std::exception_ptr excpt;
0184 if (iPtr) {
0185 excpt = *iPtr;
0186
0187 try {
0188 convertException::wrap([&]() { std::rethrow_exception(excpt); });
0189 } catch (cms::Exception& ex) {
0190
0191 std::ostringstream ost;
0192 if (ex.context().empty()) {
0193 ost << "Processing " << T::transitionName() << " ";
0194 }
0195 ServiceRegistry::Operate op(weakToken.lock());
0196 addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
0197 excpt = std::current_exception();
0198 }
0199 if (actReg_) {
0200 ServiceRegistry::Operate op(weakToken.lock());
0201 actReg_->preGlobalEarlyTerminationSignal_(*globalContext, TerminationOrigin::ExceptionFromThisContext);
0202 }
0203 }
0204 if (actReg_) {
0205
0206 CMS_SA_ALLOW try {
0207 ServiceRegistry::Operate op(weakToken.lock());
0208 T::postScheduleSignal(actReg_.get(), globalContext.get());
0209 } catch (...) {
0210 if (not excpt) {
0211 excpt = std::current_exception();
0212 }
0213 }
0214 }
0215 iHolder.doneWaiting(excpt);
0216 });
0217 unsigned int managerIndex = principal.index();
0218 if constexpr (T::branchType_ == InRun) {
0219 managerIndex += numberOfConcurrentLumis_;
0220 }
0221 WorkerManager& workerManager = workerManagers_[managerIndex];
0222 workerManager.resetAll();
0223
0224 ParentContext parentContext(globalContext.get());
0225
0226
0227 workerManager.setupResolvers(transitionInfo.principal());
0228
0229
0230 WaitingTaskHolder holdForLoop(*iHolder.group(), doneTask);
0231 auto& aw = workerManager.allWorkers();
0232 for (Worker* worker : boost::adaptors::reverse(aw)) {
0233 worker->doWorkAsync<T>(
0234 holdForLoop, transitionInfo, token, StreamID::invalidStreamID(), parentContext, globalContext.get());
0235 }
0236 } catch (...) {
0237 iHolder.doneWaiting(std::current_exception());
0238 }
0239 }
0240 }
0241
0242 #endif