File indexing completed on 2025-04-13 22:49:49
0001 #include "FWCore/Framework/interface/GlobalSchedule.h"
0002 #include "FWCore/Framework/interface/maker/WorkerMaker.h"
0003 #include "FWCore/Framework/src/TriggerResultInserter.h"
0004 #include "FWCore/Framework/src/PathStatusInserter.h"
0005 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0006 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0007
0008 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0009 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0010 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0011 #include "FWCore/ParameterSet/interface/Registry.h"
0012 #include "FWCore/Utilities/interface/Algorithms.h"
0013 #include "FWCore/Utilities/interface/Exception.h"
0014 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0015
0016 #include <algorithm>
0017 #include <cassert>
0018 #include <cstdlib>
0019 #include <functional>
0020 #include <map>
0021 #include <sstream>
0022
0023 namespace edm {
0024 GlobalSchedule::GlobalSchedule(
0025 std::shared_ptr<TriggerResultInserter> inserter,
0026 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0027 std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0028 std::shared_ptr<ModuleRegistry> modReg,
0029 std::vector<std::string> const& iModulesToUse,
0030 ParameterSet& proc_pset,
0031 SignallingProductRegistryFiller& pregistry,
0032 PreallocationConfiguration const& prealloc,
0033 ExceptionToActionTable const& actions,
0034 std::shared_ptr<ActivityRegistry> areg,
0035 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0036 ProcessContext const* processContext)
0037 : actReg_(areg),
0038 processContext_(processContext),
0039 numberOfConcurrentLumis_(prealloc.numberOfLuminosityBlocks()),
0040 numberOfConcurrentRuns_(prealloc.numberOfRuns()) {
0041 unsigned int nManagers = prealloc.numberOfLuminosityBlocks() + prealloc.numberOfRuns() +
0042 numberOfConcurrentProcessBlocks_ + numberOfConcurrentJobs_;
0043 workerManagers_.reserve(nManagers);
0044 for (unsigned int i = 0; i < nManagers; ++i) {
0045 workerManagers_.emplace_back(modReg, areg, actions);
0046 }
0047 for (auto const& moduleLabel : iModulesToUse) {
0048 bool isTracked;
0049 ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
0050 if (modpset != nullptr) {
0051
0052 assert(isTracked);
0053
0054
0055 for (auto& wm : workerManagers_) {
0056 wm.addToAllWorkers(wm.getWorker(*modpset, pregistry, &prealloc, processConfiguration, moduleLabel));
0057 }
0058 }
0059 }
0060 if (inserter) {
0061 inserter->doPreallocate(prealloc);
0062 for (auto& wm : workerManagers_) {
0063 auto results_inserter = WorkerPtr(new edm::WorkerT<TriggerResultInserter::ModuleType>(
0064 inserter, inserter->moduleDescription(), &actions));
0065 results_inserter->setActivityRegistry(actReg_);
0066 wm.addToAllWorkers(results_inserter.get());
0067 extraWorkers_.emplace_back(std::move(results_inserter));
0068 }
0069 }
0070
0071 for (auto& pathStatusInserter : pathStatusInserters) {
0072 std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
0073 inserterPtr->doPreallocate(prealloc);
0074
0075 for (auto& wm : workerManagers_) {
0076 WorkerPtr workerPtr(
0077 new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
0078 workerPtr->setActivityRegistry(actReg_);
0079 wm.addToAllWorkers(workerPtr.get());
0080 extraWorkers_.emplace_back(std::move(workerPtr));
0081 }
0082 }
0083
0084 for (auto& endPathStatusInserter : endPathStatusInserters) {
0085 std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
0086 inserterPtr->doPreallocate(prealloc);
0087 for (auto& wm : workerManagers_) {
0088 WorkerPtr workerPtr(new edm::WorkerT<EndPathStatusInserter::ModuleType>(
0089 inserterPtr, inserterPtr->moduleDescription(), &actions));
0090 workerPtr->setActivityRegistry(actReg_);
0091 wm.addToAllWorkers(workerPtr.get());
0092 extraWorkers_.emplace_back(std::move(workerPtr));
0093 }
0094 }
0095
0096 }
0097
0098 void GlobalSchedule::beginJob(ProductRegistry const& iRegistry,
0099 eventsetup::ESRecordsToProductResolverIndices const& iESIndices,
0100 ProcessBlockHelperBase const& processBlockHelperBase,
0101 ProcessContext const& processContext) {
0102 GlobalContext globalContext(GlobalContext::Transition::kBeginJob, processContext_);
0103 unsigned int const managerIndex =
0104 numberOfConcurrentLumis_ + numberOfConcurrentRuns_ + numberOfConcurrentProcessBlocks_;
0105
0106 std::exception_ptr exceptionPtr;
0107 CMS_SA_ALLOW try {
0108 try {
0109 convertException::wrap([this, &processContext]() { actReg_->preBeginJobSignal_(processContext); });
0110 } catch (cms::Exception& ex) {
0111 exceptionContext(ex, globalContext, "Handling pre signal, likely in a service function");
0112 throw;
0113 }
0114 workerManagers_[managerIndex].beginJob(iRegistry, iESIndices, processBlockHelperBase, globalContext);
0115 } catch (...) {
0116 exceptionPtr = std::current_exception();
0117 }
0118
0119 try {
0120 convertException::wrap([this]() { actReg_->postBeginJobSignal_(); });
0121 } catch (cms::Exception& ex) {
0122 if (!exceptionPtr) {
0123 exceptionContext(ex, globalContext, "Handling post signal, likely in a service function");
0124 exceptionPtr = std::current_exception();
0125 }
0126 }
0127 if (exceptionPtr) {
0128 std::rethrow_exception(exceptionPtr);
0129 }
0130 }
0131
0132 void GlobalSchedule::endJob(ExceptionCollector& collector) {
0133 GlobalContext globalContext(GlobalContext::Transition::kEndJob, processContext_);
0134 unsigned int const managerIndex =
0135 numberOfConcurrentLumis_ + numberOfConcurrentRuns_ + numberOfConcurrentProcessBlocks_;
0136
0137 std::exception_ptr exceptionPtr;
0138 CMS_SA_ALLOW try {
0139 try {
0140 convertException::wrap([this]() { actReg_->preEndJobSignal_(); });
0141 } catch (cms::Exception& ex) {
0142 exceptionContext(ex, globalContext, "Handling pre signal, likely in a service function");
0143 throw;
0144 }
0145 workerManagers_[managerIndex].endJob(collector, globalContext);
0146 } catch (...) {
0147 exceptionPtr = std::current_exception();
0148 }
0149
0150 try {
0151 convertException::wrap([this]() { actReg_->postEndJobSignal_(); });
0152 } catch (cms::Exception& ex) {
0153 if (!exceptionPtr) {
0154 exceptionContext(ex, globalContext, "Handling post signal, likely in a service function");
0155 exceptionPtr = std::current_exception();
0156 }
0157 }
0158 if (exceptionPtr) {
0159 collector.call([&exceptionPtr]() { std::rethrow_exception(exceptionPtr); });
0160 }
0161 }
0162
0163 void GlobalSchedule::replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel) {
0164 Worker* found = nullptr;
0165 unsigned int const jobManagerIndex =
0166 numberOfConcurrentLumis_ + numberOfConcurrentRuns_ + numberOfConcurrentProcessBlocks_;
0167 unsigned int managerIndex = 0;
0168 for (auto& wm : workerManagers_) {
0169 for (auto const& worker : wm.allWorkers()) {
0170 if (worker->description()->moduleLabel() == iLabel) {
0171 found = worker;
0172 break;
0173 }
0174 }
0175 if (nullptr == found) {
0176 return;
0177 }
0178
0179 iMod->replaceModuleFor(found);
0180 if (managerIndex == jobManagerIndex) {
0181 GlobalContext globalContext(GlobalContext::Transition::kBeginJob, processContext_);
0182 found->beginJob(globalContext);
0183 }
0184 ++managerIndex;
0185 }
0186 }
0187
0188 void GlobalSchedule::deleteModule(std::string const& iLabel) {
0189 for (auto& wm : workerManagers_) {
0190 wm.deleteModuleIfExists(iLabel);
0191 }
0192 }
0193
0194 void GlobalSchedule::releaseMemoryPostLookupSignal() {
0195 unsigned int const managerIndex =
0196 numberOfConcurrentLumis_ + numberOfConcurrentRuns_ + numberOfConcurrentProcessBlocks_;
0197 workerManagers_[managerIndex].releaseMemoryPostLookupSignal();
0198 }
0199
0200 std::vector<ModuleDescription const*> GlobalSchedule::getAllModuleDescriptions() const {
0201 std::vector<ModuleDescription const*> result;
0202 result.reserve(allWorkers().size());
0203
0204 for (auto const& worker : allWorkers()) {
0205 ModuleDescription const* p = worker->description();
0206 result.push_back(p);
0207 }
0208 return result;
0209 }
0210
0211 void GlobalSchedule::handleException(GlobalContext const* globalContext,
0212 ServiceWeakToken const& weakToken,
0213 bool cleaningUpAfterException,
0214 std::exception_ptr& excpt) {
0215
0216 try {
0217 convertException::wrap([&excpt]() { std::rethrow_exception(excpt); });
0218 } catch (cms::Exception& ex) {
0219 std::ostringstream ost;
0220
0221
0222 if (ex.context().empty()) {
0223 exceptionContext(ost, *globalContext);
0224 }
0225 ServiceRegistry::Operate op(weakToken.lock());
0226 addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
0227 excpt = std::current_exception();
0228 }
0229
0230
0231 CMS_SA_ALLOW try {
0232 if (actReg_) {
0233 ServiceRegistry::Operate op(weakToken.lock());
0234 actReg_->preGlobalEarlyTerminationSignal_(*globalContext, TerminationOrigin::ExceptionFromThisContext);
0235 }
0236 } catch (...) {
0237 }
0238 }
0239
0240 }