File indexing completed on 2023-01-11 16:27:17
0001 #include "FWCore/Framework/interface/WorkerManager.h"
0002 #include "UnscheduledConfigurator.h"
0003
0004 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0005 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0006 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0007 #include "FWCore/Utilities/interface/Algorithms.h"
0008 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0009 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0010
// Values of the "@module_edm_type" parameter that identify modules which may
// be registered as unscheduled workers (see addToUnscheduledWorkers).
static const std::string kFilterType{"EDFilter"};
static const std::string kProducerType{"EDProducer"};
0013
0014 namespace edm {
0015
0016
0017 WorkerManager::WorkerManager(std::shared_ptr<ActivityRegistry> areg,
0018 ExceptionToActionTable const& actions,
0019 ModuleTypeResolverMaker const* typeResolverMaker)
0020 : workerReg_(areg, typeResolverMaker),
0021 actionTable_(&actions),
0022 allWorkers_(),
0023 unscheduled_(*areg),
0024 lastSetupEventPrincipal_(nullptr) {}
0025
0026 WorkerManager::WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
0027 std::shared_ptr<ActivityRegistry> areg,
0028 ExceptionToActionTable const& actions)
0029 : workerReg_(areg, modReg),
0030 actionTable_(&actions),
0031 allWorkers_(),
0032 unscheduled_(*areg),
0033 lastSetupEventPrincipal_(nullptr) {}
0034
0035 void WorkerManager::deleteModuleIfExists(std::string const& moduleLabel) {
0036 auto worker = workerReg_.get(moduleLabel);
0037 if (worker != nullptr) {
0038 auto eraseBeg = std::remove(allWorkers_.begin(), allWorkers_.end(), worker);
0039 allWorkers_.erase(eraseBeg, allWorkers_.end());
0040 unscheduled_.removeWorker(worker);
0041 workerReg_.deleteModule(moduleLabel);
0042 }
0043 }
0044
0045 Worker* WorkerManager::getWorker(ParameterSet& pset,
0046 ProductRegistry& preg,
0047 PreallocationConfiguration const* prealloc,
0048 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0049 std::string const& label) {
0050 WorkerParams params(&pset, preg, prealloc, processConfiguration, *actionTable_);
0051 return workerReg_.getWorker(params, label);
0052 }
0053
0054 void WorkerManager::addToUnscheduledWorkers(ParameterSet& pset,
0055 ProductRegistry& preg,
0056 PreallocationConfiguration const* prealloc,
0057 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0058 std::string label,
0059 std::set<std::string>& unscheduledLabels,
0060 std::vector<std::string>& shouldBeUsedLabels) {
0061
0062
0063
0064 auto modType = pset.getParameter<std::string>("@module_edm_type");
0065 if (modType == kProducerType || modType == kFilterType) {
0066 Worker* newWorker = getWorker(pset, preg, prealloc, processConfiguration, label);
0067 assert(newWorker->moduleType() == Worker::kProducer || newWorker->moduleType() == Worker::kFilter);
0068 unscheduledLabels.insert(label);
0069 unscheduled_.addWorker(newWorker);
0070
0071 addToAllWorkers(newWorker);
0072 } else {
0073 shouldBeUsedLabels.push_back(label);
0074 }
0075 }
0076
0077 void WorkerManager::endJob() {
0078 for (auto& worker : allWorkers_) {
0079 worker->endJob();
0080 }
0081 }
0082
0083 void WorkerManager::endJob(ExceptionCollector& collector) {
0084 for (auto& worker : allWorkers_) {
0085 try {
0086 convertException::wrap([&]() { worker->endJob(); });
0087 } catch (cms::Exception const& ex) {
0088 collector.addException(ex);
0089 }
0090 }
0091 }
0092
0093 void WorkerManager::beginJob(ProductRegistry const& iRegistry,
0094 eventsetup::ESRecordsToProxyIndices const& iESIndices,
0095 ProcessBlockHelperBase const& processBlockHelperBase) {
0096 auto const processBlockLookup = iRegistry.productLookup(InProcess);
0097 auto const runLookup = iRegistry.productLookup(InRun);
0098 auto const lumiLookup = iRegistry.productLookup(InLumi);
0099 auto const eventLookup = iRegistry.productLookup(InEvent);
0100 if (!allWorkers_.empty()) {
0101 auto const& processName = allWorkers_[0]->description()->processName();
0102 auto processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
0103 auto runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
0104 auto lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
0105 auto eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
0106 for (auto& worker : allWorkers_) {
0107 worker->updateLookup(InProcess, *processBlockLookup);
0108 worker->updateLookup(InRun, *runLookup);
0109 worker->updateLookup(InLumi, *lumiLookup);
0110 worker->updateLookup(InEvent, *eventLookup);
0111 worker->updateLookup(iESIndices);
0112 worker->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
0113 worker->resolvePutIndicies(InRun, runModuleToIndicies);
0114 worker->resolvePutIndicies(InLumi, lumiModuleToIndicies);
0115 worker->resolvePutIndicies(InEvent, eventModuleToIndicies);
0116 worker->selectInputProcessBlocks(iRegistry, processBlockHelperBase);
0117 }
0118
0119 for_all(allWorkers_, std::bind(&Worker::beginJob, std::placeholders::_1));
0120 }
0121 }
0122
0123 void WorkerManager::beginStream(StreamID iID, StreamContext& streamContext) {
0124 for (auto& worker : allWorkers_) {
0125 worker->beginStream(iID, streamContext);
0126 }
0127 }
0128
0129 void WorkerManager::endStream(StreamID iID, StreamContext& streamContext) {
0130 for (auto& worker : allWorkers_) {
0131 worker->endStream(iID, streamContext);
0132 }
0133 }
0134
0135 void WorkerManager::resetAll() { for_all(allWorkers_, std::bind(&Worker::reset, std::placeholders::_1)); }
0136
0137 void WorkerManager::addToAllWorkers(Worker* w) {
0138 if (!search_all(allWorkers_, w)) {
0139 allWorkers_.push_back(w);
0140 }
0141 }
0142
0143 void WorkerManager::setupResolvers(Principal& ep) {
0144 this->resetAll();
0145 if (&ep != lastSetupEventPrincipal_) {
0146 UnscheduledConfigurator config(allWorkers_.begin(), allWorkers_.end(), &(unscheduled_.auxiliary()));
0147 ep.setupUnscheduled(config);
0148 lastSetupEventPrincipal_ = &ep;
0149 }
0150 }
0151
0152 void WorkerManager::setupOnDemandSystem(EventTransitionInfo const& info) {
0153 unscheduled_.setEventTransitionInfo(info);
0154 }
0155
0156 }