File indexing completed on 2024-06-04 04:34:55
0001 #include "FWCore/Framework/interface/WorkerManager.h"
0002 #include "UnscheduledConfigurator.h"
0003
0004 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0005 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0006 #include "FWCore/Framework/interface/maker/Worker.h"
0007 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0008 #include "FWCore/Utilities/interface/Algorithms.h"
0009 #include "FWCore/Utilities/interface/ConvertException.h"
0010 #include "FWCore/Utilities/interface/Exception.h"
0011 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0012
0013 #include <functional>
0014
0015 static const std::string kFilterType("EDFilter");
0016 static const std::string kProducerType("EDProducer");
0017
0018 namespace edm {
0019
0020
0021 WorkerManager::WorkerManager(std::shared_ptr<ActivityRegistry> areg,
0022 ExceptionToActionTable const& actions,
0023 ModuleTypeResolverMaker const* typeResolverMaker)
0024 : workerReg_(areg, typeResolverMaker),
0025 actionTable_(&actions),
0026 allWorkers_(),
0027 unscheduled_(*areg),
0028 lastSetupEventPrincipal_(nullptr) {}
0029
0030 WorkerManager::WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
0031 std::shared_ptr<ActivityRegistry> areg,
0032 ExceptionToActionTable const& actions)
0033 : workerReg_(areg, modReg),
0034 actionTable_(&actions),
0035 allWorkers_(),
0036 unscheduled_(*areg),
0037 lastSetupEventPrincipal_(nullptr) {}
0038
0039 void WorkerManager::deleteModuleIfExists(std::string const& moduleLabel) {
0040 auto worker = workerReg_.get(moduleLabel);
0041 if (worker != nullptr) {
0042 auto eraseBeg = std::remove(allWorkers_.begin(), allWorkers_.end(), worker);
0043 allWorkers_.erase(eraseBeg, allWorkers_.end());
0044 unscheduled_.removeWorker(worker);
0045 workerReg_.deleteModule(moduleLabel);
0046 }
0047 }
0048
0049 Worker* WorkerManager::getWorker(ParameterSet& pset,
0050 ProductRegistry& preg,
0051 PreallocationConfiguration const* prealloc,
0052 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0053 std::string const& label) {
0054 WorkerParams params(&pset, preg, prealloc, processConfiguration, *actionTable_);
0055 return workerReg_.getWorker(params, label);
0056 }
0057
0058 void WorkerManager::addToUnscheduledWorkers(ParameterSet& pset,
0059 ProductRegistry& preg,
0060 PreallocationConfiguration const* prealloc,
0061 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0062 std::string label,
0063 std::set<std::string>& unscheduledLabels,
0064 std::vector<std::string>& shouldBeUsedLabels) {
0065
0066
0067
0068 auto modType = pset.getParameter<std::string>("@module_edm_type");
0069 if (modType == kProducerType || modType == kFilterType) {
0070 Worker* newWorker = getWorker(pset, preg, prealloc, processConfiguration, label);
0071 assert(newWorker->moduleType() == Worker::kProducer || newWorker->moduleType() == Worker::kFilter);
0072 unscheduledLabels.insert(label);
0073 unscheduled_.addWorker(newWorker);
0074
0075 addToAllWorkers(newWorker);
0076 } else {
0077 shouldBeUsedLabels.push_back(label);
0078 }
0079 }
0080
0081 void WorkerManager::endJob() {
0082 for (auto& worker : allWorkers_) {
0083 worker->endJob();
0084 }
0085 }
0086
0087 void WorkerManager::endJob(ExceptionCollector& collector) {
0088 for (auto& worker : allWorkers_) {
0089 try {
0090 convertException::wrap([&]() { worker->endJob(); });
0091 } catch (cms::Exception const& ex) {
0092 collector.addException(ex);
0093 }
0094 }
0095 }
0096
0097 void WorkerManager::beginJob(ProductRegistry const& iRegistry,
0098 eventsetup::ESRecordsToProductResolverIndices const& iESIndices,
0099 ProcessBlockHelperBase const& processBlockHelperBase) {
0100 auto const processBlockLookup = iRegistry.productLookup(InProcess);
0101 auto const runLookup = iRegistry.productLookup(InRun);
0102 auto const lumiLookup = iRegistry.productLookup(InLumi);
0103 auto const eventLookup = iRegistry.productLookup(InEvent);
0104 if (!allWorkers_.empty()) {
0105 auto const& processName = allWorkers_[0]->description()->processName();
0106 auto processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
0107 auto runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
0108 auto lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
0109 auto eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
0110 for (auto& worker : allWorkers_) {
0111 worker->updateLookup(InProcess, *processBlockLookup);
0112 worker->updateLookup(InRun, *runLookup);
0113 worker->updateLookup(InLumi, *lumiLookup);
0114 worker->updateLookup(InEvent, *eventLookup);
0115 worker->updateLookup(iESIndices);
0116 worker->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
0117 worker->resolvePutIndicies(InRun, runModuleToIndicies);
0118 worker->resolvePutIndicies(InLumi, lumiModuleToIndicies);
0119 worker->resolvePutIndicies(InEvent, eventModuleToIndicies);
0120 worker->selectInputProcessBlocks(iRegistry, processBlockHelperBase);
0121 }
0122
0123 for_all(allWorkers_, std::bind(&Worker::beginJob, std::placeholders::_1));
0124 }
0125 }
0126
0127 void WorkerManager::beginStream(StreamID iID, StreamContext& streamContext) {
0128 for (auto& worker : allWorkers_) {
0129 worker->beginStream(iID, streamContext);
0130 }
0131 }
0132
0133 void WorkerManager::endStream(StreamID iID, StreamContext& streamContext) {
0134 for (auto& worker : allWorkers_) {
0135 worker->endStream(iID, streamContext);
0136 }
0137 }
0138
0139 void WorkerManager::resetAll() { for_all(allWorkers_, std::bind(&Worker::reset, std::placeholders::_1)); }
0140
0141 void WorkerManager::addToAllWorkers(Worker* w) {
0142 if (!search_all(allWorkers_, w)) {
0143 allWorkers_.push_back(w);
0144 }
0145 }
0146
0147 void WorkerManager::setupResolvers(Principal& ep) {
0148 this->resetAll();
0149 if (&ep != lastSetupEventPrincipal_) {
0150 UnscheduledConfigurator config(allWorkers_.begin(), allWorkers_.end(), &(unscheduled_.auxiliary()));
0151 ep.setupUnscheduled(config);
0152 lastSetupEventPrincipal_ = &ep;
0153 }
0154 }
0155
0156 void WorkerManager::setupOnDemandSystem(EventTransitionInfo const& info) {
0157 unscheduled_.setEventTransitionInfo(info);
0158 }
0159
0160 }