File indexing completed on 2025-03-13 02:31:54
0001 #include "FWCore/Framework/interface/WorkerManager.h"
0002 #include "UnscheduledConfigurator.h"
0003
0004 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0005 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0006 #include "FWCore/Framework/interface/maker/Worker.h"
0007 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0008 #include "FWCore/Utilities/interface/Algorithms.h"
0009 #include "FWCore/Utilities/interface/ConvertException.h"
0010 #include "FWCore/Utilities/interface/Exception.h"
0011 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0012 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0013
0014 #include <exception>
0015 #include <functional>
0016
// Values of the "@module_edm_type" parameter that WorkerManager::addToUnscheduledWorkers
// compares against when deciding whether a module gets an unscheduled worker.
static std::string const kFilterType = "EDFilter";
static std::string const kProducerType = "EDProducer";
0019
0020 namespace edm {
0021
0022
0023 WorkerManager::WorkerManager(std::shared_ptr<ActivityRegistry> areg,
0024 ExceptionToActionTable const& actions,
0025 ModuleTypeResolverMaker const* typeResolverMaker)
0026 : workerReg_(areg, typeResolverMaker),
0027 actionTable_(&actions),
0028 allWorkers_(),
0029 unscheduled_(*areg),
0030 lastSetupEventPrincipal_(nullptr) {}
0031
0032 WorkerManager::WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
0033 std::shared_ptr<ActivityRegistry> areg,
0034 ExceptionToActionTable const& actions)
0035 : workerReg_(areg, modReg),
0036 actionTable_(&actions),
0037 allWorkers_(),
0038 unscheduled_(*areg),
0039 lastSetupEventPrincipal_(nullptr) {}
0040
0041 void WorkerManager::deleteModuleIfExists(std::string const& moduleLabel) {
0042 auto worker = workerReg_.get(moduleLabel);
0043 if (worker != nullptr) {
0044 auto eraseBeg = std::remove(allWorkers_.begin(), allWorkers_.end(), worker);
0045 allWorkers_.erase(eraseBeg, allWorkers_.end());
0046 unscheduled_.removeWorker(worker);
0047 workerReg_.deleteModule(moduleLabel);
0048 }
0049 }
0050
0051 Worker* WorkerManager::getWorker(ParameterSet& pset,
0052 SignallingProductRegistryFiller& preg,
0053 PreallocationConfiguration const* prealloc,
0054 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0055 std::string const& label) {
0056 WorkerParams params(&pset, preg, prealloc, processConfiguration, *actionTable_);
0057 return workerReg_.getWorker(params, label);
0058 }
0059
0060 void WorkerManager::addToUnscheduledWorkers(ParameterSet& pset,
0061 SignallingProductRegistryFiller& preg,
0062 PreallocationConfiguration const* prealloc,
0063 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0064 std::string label,
0065 std::set<std::string>& unscheduledLabels,
0066 std::vector<std::string>& shouldBeUsedLabels) {
0067
0068
0069
0070 auto modType = pset.getParameter<std::string>("@module_edm_type");
0071 if (modType == kProducerType || modType == kFilterType) {
0072 Worker* newWorker = getWorker(pset, preg, prealloc, processConfiguration, label);
0073 assert(newWorker->moduleType() == Worker::kProducer || newWorker->moduleType() == Worker::kFilter);
0074 unscheduledLabels.insert(label);
0075 unscheduled_.addWorker(newWorker);
0076
0077 addToAllWorkers(newWorker);
0078 } else {
0079 shouldBeUsedLabels.push_back(label);
0080 }
0081 }
0082
0083 void WorkerManager::releaseMemoryPostLookupSignal() {
0084 for (auto& worker : allWorkers_) {
0085 worker->releaseMemoryPostLookupSignal();
0086 }
0087 }
0088
0089 void WorkerManager::beginJob(ProductRegistry const& iRegistry,
0090 eventsetup::ESRecordsToProductResolverIndices const& iESIndices,
0091 ProcessBlockHelperBase const& processBlockHelperBase,
0092 GlobalContext const& globalContext) {
0093 std::exception_ptr exceptionPtr;
0094 CMS_SA_ALLOW try {
0095 auto const processBlockLookup = iRegistry.productLookup(InProcess);
0096 auto const runLookup = iRegistry.productLookup(InRun);
0097 auto const lumiLookup = iRegistry.productLookup(InLumi);
0098 auto const eventLookup = iRegistry.productLookup(InEvent);
0099 if (!allWorkers_.empty()) {
0100 auto const& processName = allWorkers_[0]->description()->processName();
0101 auto processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
0102 auto runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
0103 auto lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
0104 auto eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
0105 for (auto& worker : allWorkers_) {
0106 worker->updateLookup(InProcess, *processBlockLookup);
0107 worker->updateLookup(InRun, *runLookup);
0108 worker->updateLookup(InLumi, *lumiLookup);
0109 worker->updateLookup(InEvent, *eventLookup);
0110 worker->updateLookup(iESIndices);
0111 worker->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
0112 worker->resolvePutIndicies(InRun, runModuleToIndicies);
0113 worker->resolvePutIndicies(InLumi, lumiModuleToIndicies);
0114 worker->resolvePutIndicies(InEvent, eventModuleToIndicies);
0115 worker->selectInputProcessBlocks(iRegistry, processBlockHelperBase);
0116 }
0117 }
0118 } catch (...) {
0119 exceptionPtr = std::current_exception();
0120 }
0121
0122 for (auto& worker : allWorkers_) {
0123 CMS_SA_ALLOW try { worker->beginJob(globalContext); } catch (...) {
0124 if (!exceptionPtr) {
0125 exceptionPtr = std::current_exception();
0126 }
0127 }
0128 }
0129 if (exceptionPtr) {
0130 std::rethrow_exception(exceptionPtr);
0131 }
0132 }
0133
0134 void WorkerManager::endJob(ExceptionCollector& collector, GlobalContext const& globalContext) {
0135 for (auto& worker : allWorkers_) {
0136 try {
0137 convertException::wrap([&worker, &globalContext]() { worker->endJob(globalContext); });
0138 } catch (cms::Exception const& ex) {
0139 collector.addException(ex);
0140 }
0141 }
0142 }
0143
0144 void WorkerManager::beginStream(StreamID streamID, StreamContext const& streamContext) {
0145 std::exception_ptr exceptionPtr;
0146 for (auto& worker : allWorkers_) {
0147 CMS_SA_ALLOW try { worker->beginStream(streamID, streamContext); } catch (...) {
0148 if (!exceptionPtr) {
0149 exceptionPtr = std::current_exception();
0150 }
0151 }
0152 }
0153 if (exceptionPtr) {
0154 std::rethrow_exception(exceptionPtr);
0155 }
0156 }
0157
0158 void WorkerManager::endStream(StreamID streamID,
0159 StreamContext const& streamContext,
0160 ExceptionCollector& collector,
0161 std::mutex& collectorMutex) noexcept {
0162 for (auto& worker : allWorkers_) {
0163 CMS_SA_ALLOW try { worker->endStream(streamID, streamContext); } catch (...) {
0164 std::exception_ptr exceptionPtr = std::current_exception();
0165 std::lock_guard<std::mutex> collectorLock(collectorMutex);
0166 collector.call([&exceptionPtr]() { std::rethrow_exception(exceptionPtr); });
0167 }
0168 }
0169 }
0170
0171 void WorkerManager::resetAll() { for_all(allWorkers_, std::bind(&Worker::reset, std::placeholders::_1)); }
0172
0173 void WorkerManager::addToAllWorkers(Worker* w) {
0174 if (!search_all(allWorkers_, w)) {
0175 allWorkers_.push_back(w);
0176 }
0177 }
0178
0179 void WorkerManager::setupResolvers(Principal& ep) {
0180 this->resetAll();
0181 if (&ep != lastSetupEventPrincipal_) {
0182 UnscheduledConfigurator config(allWorkers_.begin(), allWorkers_.end(), &(unscheduled_.auxiliary()));
0183 ep.setupUnscheduled(config);
0184 lastSetupEventPrincipal_ = &ep;
0185 }
0186 }
0187
0188 void WorkerManager::setupOnDemandSystem(EventTransitionInfo const& info) {
0189 unscheduled_.setEventTransitionInfo(info);
0190 }
0191
0192 }