File indexing completed on 2024-07-28 22:48:23
0001 #include "FWCore/Framework/interface/GlobalSchedule.h"
0002 #include "FWCore/Framework/interface/maker/WorkerMaker.h"
0003 #include "FWCore/Framework/src/TriggerResultInserter.h"
0004 #include "FWCore/Framework/src/PathStatusInserter.h"
0005 #include "FWCore/Framework/src/EndPathStatusInserter.h"
0006 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0007
0008 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0009 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0010 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0011 #include "FWCore/ParameterSet/interface/Registry.h"
0012 #include "FWCore/Utilities/interface/Algorithms.h"
0013 #include "FWCore/Utilities/interface/Exception.h"
0014 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0015
0016 #include <algorithm>
0017 #include <cassert>
0018 #include <cstdlib>
0019 #include <functional>
0020 #include <map>
0021 #include <sstream>
0022
0023 namespace edm {
0024 GlobalSchedule::GlobalSchedule(
0025 std::shared_ptr<TriggerResultInserter> inserter,
0026 std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0027 std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0028 std::shared_ptr<ModuleRegistry> modReg,
0029 std::vector<std::string> const& iModulesToUse,
0030 ParameterSet& proc_pset,
0031 ProductRegistry& pregistry,
0032 PreallocationConfiguration const& prealloc,
0033 ExceptionToActionTable const& actions,
0034 std::shared_ptr<ActivityRegistry> areg,
0035 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0036 ProcessContext const* processContext)
0037 : actReg_(areg),
0038 processContext_(processContext),
0039 numberOfConcurrentLumis_(prealloc.numberOfLuminosityBlocks()),
0040 numberOfConcurrentRuns_(prealloc.numberOfRuns()) {
0041 unsigned int nManagers = prealloc.numberOfLuminosityBlocks() + prealloc.numberOfRuns() +
0042 numberOfConcurrentProcessBlocks_ + numberOfConcurrentJobs_;
0043 workerManagers_.reserve(nManagers);
0044 for (unsigned int i = 0; i < nManagers; ++i) {
0045 workerManagers_.emplace_back(modReg, areg, actions);
0046 }
0047 for (auto const& moduleLabel : iModulesToUse) {
0048 bool isTracked;
0049 ParameterSet* modpset = proc_pset.getPSetForUpdate(moduleLabel, isTracked);
0050 if (modpset != nullptr) {
0051
0052 assert(isTracked);
0053
0054
0055 for (auto& wm : workerManagers_) {
0056 wm.addToAllWorkers(wm.getWorker(*modpset, pregistry, &prealloc, processConfiguration, moduleLabel));
0057 }
0058 }
0059 }
0060 if (inserter) {
0061 inserter->doPreallocate(prealloc);
0062 for (auto& wm : workerManagers_) {
0063 auto results_inserter = WorkerPtr(new edm::WorkerT<TriggerResultInserter::ModuleType>(
0064 inserter, inserter->moduleDescription(), &actions));
0065 results_inserter->setActivityRegistry(actReg_);
0066 wm.addToAllWorkers(results_inserter.get());
0067 extraWorkers_.emplace_back(std::move(results_inserter));
0068 }
0069 }
0070
0071 for (auto& pathStatusInserter : pathStatusInserters) {
0072 std::shared_ptr<PathStatusInserter> inserterPtr = get_underlying(pathStatusInserter);
0073 inserterPtr->doPreallocate(prealloc);
0074
0075 for (auto& wm : workerManagers_) {
0076 WorkerPtr workerPtr(
0077 new edm::WorkerT<PathStatusInserter::ModuleType>(inserterPtr, inserterPtr->moduleDescription(), &actions));
0078 workerPtr->setActivityRegistry(actReg_);
0079 wm.addToAllWorkers(workerPtr.get());
0080 extraWorkers_.emplace_back(std::move(workerPtr));
0081 }
0082 }
0083
0084 for (auto& endPathStatusInserter : endPathStatusInserters) {
0085 std::shared_ptr<EndPathStatusInserter> inserterPtr = get_underlying(endPathStatusInserter);
0086 inserterPtr->doPreallocate(prealloc);
0087 for (auto& wm : workerManagers_) {
0088 WorkerPtr workerPtr(new edm::WorkerT<EndPathStatusInserter::ModuleType>(
0089 inserterPtr, inserterPtr->moduleDescription(), &actions));
0090 workerPtr->setActivityRegistry(actReg_);
0091 wm.addToAllWorkers(workerPtr.get());
0092 extraWorkers_.emplace_back(std::move(workerPtr));
0093 }
0094 }
0095
0096 }
0097
0098 void GlobalSchedule::beginJob(ProductRegistry const& iRegistry,
0099 eventsetup::ESRecordsToProductResolverIndices const& iESIndices,
0100 ProcessBlockHelperBase const& processBlockHelperBase,
0101 PathsAndConsumesOfModulesBase const& pathsAndConsumesOfModules,
0102 ProcessContext const& processContext) {
0103 GlobalContext globalContext(GlobalContext::Transition::kBeginJob, processContext_);
0104 unsigned int const managerIndex =
0105 numberOfConcurrentLumis_ + numberOfConcurrentRuns_ + numberOfConcurrentProcessBlocks_;
0106
0107 std::exception_ptr exceptionPtr;
0108 CMS_SA_ALLOW try {
0109 try {
0110 convertException::wrap([this, &pathsAndConsumesOfModules, &processContext]() {
0111 actReg_->preBeginJobSignal_(pathsAndConsumesOfModules, processContext);
0112 });
0113 } catch (cms::Exception& ex) {
0114 exceptionContext(ex, globalContext, "Handling pre signal, likely in a service function");
0115 throw;
0116 }
0117 workerManagers_[managerIndex].beginJob(iRegistry, iESIndices, processBlockHelperBase, globalContext);
0118 } catch (...) {
0119 exceptionPtr = std::current_exception();
0120 }
0121
0122 try {
0123 convertException::wrap([this]() { actReg_->postBeginJobSignal_(); });
0124 } catch (cms::Exception& ex) {
0125 if (!exceptionPtr) {
0126 exceptionContext(ex, globalContext, "Handling post signal, likely in a service function");
0127 exceptionPtr = std::current_exception();
0128 }
0129 }
0130 if (exceptionPtr) {
0131 std::rethrow_exception(exceptionPtr);
0132 }
0133 }
0134
0135 void GlobalSchedule::endJob(ExceptionCollector& collector) {
0136 GlobalContext globalContext(GlobalContext::Transition::kEndJob, processContext_);
0137 unsigned int const managerIndex =
0138 numberOfConcurrentLumis_ + numberOfConcurrentRuns_ + numberOfConcurrentProcessBlocks_;
0139
0140 std::exception_ptr exceptionPtr;
0141 CMS_SA_ALLOW try {
0142 try {
0143 convertException::wrap([this]() { actReg_->preEndJobSignal_(); });
0144 } catch (cms::Exception& ex) {
0145 exceptionContext(ex, globalContext, "Handling pre signal, likely in a service function");
0146 throw;
0147 }
0148 workerManagers_[managerIndex].endJob(collector, globalContext);
0149 } catch (...) {
0150 exceptionPtr = std::current_exception();
0151 }
0152
0153 try {
0154 convertException::wrap([this]() { actReg_->postEndJobSignal_(); });
0155 } catch (cms::Exception& ex) {
0156 if (!exceptionPtr) {
0157 exceptionContext(ex, globalContext, "Handling post signal, likely in a service function");
0158 exceptionPtr = std::current_exception();
0159 }
0160 }
0161 if (exceptionPtr) {
0162 collector.call([&exceptionPtr]() { std::rethrow_exception(exceptionPtr); });
0163 }
0164 }
0165
0166 void GlobalSchedule::replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel) {
0167 Worker* found = nullptr;
0168 unsigned int const jobManagerIndex =
0169 numberOfConcurrentLumis_ + numberOfConcurrentRuns_ + numberOfConcurrentProcessBlocks_;
0170 unsigned int managerIndex = 0;
0171 for (auto& wm : workerManagers_) {
0172 for (auto const& worker : wm.allWorkers()) {
0173 if (worker->description()->moduleLabel() == iLabel) {
0174 found = worker;
0175 break;
0176 }
0177 }
0178 if (nullptr == found) {
0179 return;
0180 }
0181
0182 iMod->replaceModuleFor(found);
0183 if (managerIndex == jobManagerIndex) {
0184 GlobalContext globalContext(GlobalContext::Transition::kBeginJob, processContext_);
0185 found->beginJob(globalContext);
0186 }
0187 ++managerIndex;
0188 }
0189 }
0190
0191 void GlobalSchedule::deleteModule(std::string const& iLabel) {
0192 for (auto& wm : workerManagers_) {
0193 wm.deleteModuleIfExists(iLabel);
0194 }
0195 }
0196
0197 std::vector<ModuleDescription const*> GlobalSchedule::getAllModuleDescriptions() const {
0198 std::vector<ModuleDescription const*> result;
0199 result.reserve(allWorkers().size());
0200
0201 for (auto const& worker : allWorkers()) {
0202 ModuleDescription const* p = worker->description();
0203 result.push_back(p);
0204 }
0205 return result;
0206 }
0207
0208 void GlobalSchedule::handleException(GlobalContext const* globalContext,
0209 ServiceWeakToken const& weakToken,
0210 bool cleaningUpAfterException,
0211 std::exception_ptr& excpt) {
0212
0213 try {
0214 convertException::wrap([&excpt]() { std::rethrow_exception(excpt); });
0215 } catch (cms::Exception& ex) {
0216 std::ostringstream ost;
0217
0218
0219 if (ex.context().empty()) {
0220 exceptionContext(ost, *globalContext);
0221 }
0222 ServiceRegistry::Operate op(weakToken.lock());
0223 addContextAndPrintException(ost.str().c_str(), ex, cleaningUpAfterException);
0224 excpt = std::current_exception();
0225 }
0226
0227
0228 CMS_SA_ALLOW try {
0229 if (actReg_) {
0230 ServiceRegistry::Operate op(weakToken.lock());
0231 actReg_->preGlobalEarlyTerminationSignal_(*globalContext, TerminationOrigin::ExceptionFromThisContext);
0232 }
0233 } catch (...) {
0234 }
0235 }
0236
0237 }