File indexing completed on 2025-06-29 22:58:03
0001 #include "FWCore/Framework/interface/GlobalSchedule.h"
0002 #include "FWCore/Framework/interface/maker/ModuleMaker.h"
0003 #include "FWCore/Framework/src/TriggerResultInserter.h"
0004 #include "FWCore/Framework/src/PathStatusInserter.h"
0005 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0006 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0007 #include "FWCore/Framework/interface/ModuleRegistry.h"
0008 #include "FWCore/Framework/interface/ModuleRegistryUtilities.h"
0009
0010 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0011 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0012 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0013 #include "FWCore/ParameterSet/interface/Registry.h"
0014 #include "FWCore/Utilities/interface/Algorithms.h"
0015 #include "FWCore/Utilities/interface/Exception.h"
0016 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0017 #include "FWCore/Utilities/interface/ConvertException.h"
0018 #include "FWCore/Utilities/interface/make_sentry.h"
0019
0020 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0021 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0022
0023 #include <algorithm>
0024 #include <cassert>
0025 #include <cstdlib>
0026 #include <functional>
0027 #include <map>
0028 #include <sstream>
0029
0030 namespace edm {
0031 GlobalSchedule::GlobalSchedule(
0032 std::shared_ptr<TriggerResultInserter> inserter,
0033 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0034 std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0035 std::shared_ptr<ModuleRegistry> modReg,
0036 std::vector<std::string> const& iModulesToUse,
0037 ParameterSet& proc_pset,
0038 SignallingProductRegistryFiller& pregistry,
0039 PreallocationConfiguration const& prealloc,
0040 ExceptionToActionTable const& actions,
0041 std::shared_ptr<ActivityRegistry> areg,
0042 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0043 ProcessContext const* processContext)
0044 : actReg_(areg),
0045 processContext_(processContext),
0046 numberOfConcurrentLumis_(prealloc.numberOfLuminosityBlocks()),
0047 numberOfConcurrentRuns_(prealloc.numberOfRuns()) {
0048 unsigned int nManagers =
0049 prealloc.numberOfLuminosityBlocks() + prealloc.numberOfRuns() + numberOfConcurrentProcessBlocks_;
0050 workerManagers_.reserve(nManagers);
0051 for (unsigned int i = 0; i < nManagers; ++i) {
0052 workerManagers_.emplace_back(modReg, areg, actions);
0053 }
0054 for (auto const& moduleLabel : iModulesToUse) {
0055 bool isTracked;
0056 ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
0057 if (modpset != nullptr) {
0058
0059 assert(isTracked);
0060
0061
0062 for (auto& wm : workerManagers_) {
0063 (void)wm.getWorker(*modpset, pregistry, &prealloc, processConfiguration, moduleLabel);
0064 }
0065 }
0066 }
0067 if (inserter) {
0068 for (auto& wm : workerManagers_) {
0069 (void)wm.getWorkerForModule(*inserter);
0070 }
0071 }
0072
0073 for (auto& pathStatusInserter : pathStatusInserters) {
0074 std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
0075
0076 for (auto& wm : workerManagers_) {
0077 (void)wm.getWorkerForModule(*inserterPtr);
0078 }
0079 }
0080
0081 for (auto& endPathStatusInserter : endPathStatusInserters) {
0082 std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
0083 for (auto& wm : workerManagers_) {
0084 (void)wm.getWorkerForModule(*inserterPtr);
0085 }
0086 }
0087
0088 }
0089
0090 void GlobalSchedule::beginJob(ModuleRegistry& modReg) {
0091 constexpr static char const* const globalContext = "Processing begin Job";
0092
0093 GlobalContext gc(GlobalContext::Transition::kBeginJob, processContext_);
0094 std::exception_ptr exceptionPtr;
0095 try {
0096 convertException::wrap([this]() { actReg_->preBeginJobSignal_(*processContext_); });
0097 } catch (cms::Exception& ex) {
0098 ex.addContext("Handling pre signal, likely in a service function");
0099 ex.addContext(globalContext);
0100 exceptionPtr = std::current_exception();
0101 }
0102 if (not exceptionPtr) {
0103 try {
0104 runBeginJobForModules(gc, modReg, *actReg_, beginJobFailedForModule_);
0105 } catch (cms::Exception& ex) {
0106 ex.addContext(globalContext);
0107 exceptionPtr = std::current_exception();
0108 }
0109 }
0110 try {
0111 convertException::wrap([this]() { actReg_->postBeginJobSignal_(); });
0112 } catch (cms::Exception& ex) {
0113 if (!exceptionPtr) {
0114 ex.addContext("Handling post signal, likely in a service function");
0115 ex.addContext(globalContext);
0116 exceptionPtr = std::current_exception();
0117 }
0118 }
0119 if (exceptionPtr) {
0120 std::rethrow_exception(exceptionPtr);
0121 }
0122 }
0123
0124 void GlobalSchedule::endJob(ExceptionCollector& collector, ModuleRegistry& modReg) {
0125 constexpr static char const* const context = "Processing end Job";
0126 GlobalContext gc(GlobalContext::Transition::kEndJob, processContext_);
0127 std::exception_ptr exceptionPtr;
0128 try {
0129 convertException::wrap([this]() { actReg_->preEndJobSignal_(); });
0130 } catch (cms::Exception& ex) {
0131 ex.addContext("Handling pre signal, likely in a service function");
0132 ex.addContext(context);
0133 exceptionPtr = std::current_exception();
0134 }
0135 if (not exceptionPtr) {
0136 runEndJobForModules(gc, modReg, *actReg_, collector, beginJobFailedForModule_);
0137 }
0138
0139 try {
0140 convertException::wrap([this]() { actReg_->postEndJobSignal_(); });
0141 } catch (cms::Exception& ex) {
0142 if (!exceptionPtr) {
0143 ex.addContext("Handling post signal, likely in a service function");
0144 ex.addContext(context);
0145 exceptionPtr = std::current_exception();
0146 }
0147 }
0148 if (exceptionPtr) {
0149 collector.call([&exceptionPtr]() { std::rethrow_exception(exceptionPtr); });
0150 }
0151 }
0152
0153 void GlobalSchedule::replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel) {
0154 Worker* found = nullptr;
0155 for (auto& wm : workerManagers_) {
0156 for (auto const& worker : wm.allWorkers()) {
0157 if (worker->description()->moduleLabel() == iLabel) {
0158 found = worker;
0159 break;
0160 }
0161 }
0162 if (nullptr == found) {
0163 return;
0164 }
0165 iMod->replaceModuleFor(found);
0166 }
0167 auto sentry = make_sentry(
0168 iMod, [&](auto const* mod) { beginJobFailedForModule_.emplace_back(mod->moduleDescription().id()); });
0169 iMod->beginJob();
0170 sentry.release();
0171 }
0172
0173 void GlobalSchedule::deleteModule(std::string const& iLabel) {
0174 for (auto& wm : workerManagers_) {
0175 wm.deleteModuleIfExists(iLabel);
0176 }
0177 }
0178
0179 std::vector<ModuleDescription const*> GlobalSchedule::getAllModuleDescriptions() const {
0180 std::vector<ModuleDescription const*> result;
0181 result.reserve(allWorkers().size());
0182
0183 for (auto const& worker : allWorkers()) {
0184 ModuleDescription const* p = worker->description();
0185 result.push_back(p);
0186 }
0187 return result;
0188 }
0189
0190 void GlobalSchedule::handleException(GlobalContext const* globalContext,
0191 ServiceWeakToken const& weakToken,
0192 bool cleaningUpAfterException,
0193 std::exception_ptr& excpt) {
0194
0195 try {
0196 convertException::wrap([&excpt]() { std::rethrow_exception(excpt); });
0197 } catch (cms::Exception& ex) {
0198 std::ostringstream ost;
0199
0200
0201 if (ex.context().empty()) {
0202 exceptionContext(ost, *globalContext);
0203 }
0204 ServiceRegistry::Operate op(weakToken.lock());
0205 addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
0206 excpt = std::current_exception();
0207 }
0208
0209
0210 CMS_SA_ALLOW try {
0211 if (actReg_) {
0212 ServiceRegistry::Operate op(weakToken.lock());
0213 actReg_->preGlobalEarlyTerminationSignal_(*globalContext, TerminationOrigin::ExceptionFromThisContext);
0214 }
0215 } catch (...) {
0216 }
0217 }
0218
0219 }