Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-01-11 16:27:17

0001 #include "FWCore/Framework/interface/WorkerManager.h"
0002 #include "UnscheduledConfigurator.h"
0003 
0004 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0005 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0006 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0007 #include "FWCore/Utilities/interface/Algorithms.h"
0008 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0009 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0010 
// Values of the "@module_edm_type" parameter that are compared against when
// deciding whether a module can be added as an unscheduled worker.
namespace {
  const std::string kFilterType{"EDFilter"};
  const std::string kProducerType{"EDProducer"};
}  // namespace
0013 
0014 namespace edm {
0015   // -----------------------------
0016 
0017   WorkerManager::WorkerManager(std::shared_ptr<ActivityRegistry> areg,
0018                                ExceptionToActionTable const& actions,
0019                                ModuleTypeResolverMaker const* typeResolverMaker)
0020       : workerReg_(areg, typeResolverMaker),
0021         actionTable_(&actions),
0022         allWorkers_(),
0023         unscheduled_(*areg),
0024         lastSetupEventPrincipal_(nullptr) {}  // WorkerManager::WorkerManager
0025 
0026   WorkerManager::WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
0027                                std::shared_ptr<ActivityRegistry> areg,
0028                                ExceptionToActionTable const& actions)
0029       : workerReg_(areg, modReg),
0030         actionTable_(&actions),
0031         allWorkers_(),
0032         unscheduled_(*areg),
0033         lastSetupEventPrincipal_(nullptr) {}  // WorkerManager::WorkerManager
0034 
0035   void WorkerManager::deleteModuleIfExists(std::string const& moduleLabel) {
0036     auto worker = workerReg_.get(moduleLabel);
0037     if (worker != nullptr) {
0038       auto eraseBeg = std::remove(allWorkers_.begin(), allWorkers_.end(), worker);
0039       allWorkers_.erase(eraseBeg, allWorkers_.end());
0040       unscheduled_.removeWorker(worker);
0041       workerReg_.deleteModule(moduleLabel);
0042     }
0043   }
0044 
0045   Worker* WorkerManager::getWorker(ParameterSet& pset,
0046                                    ProductRegistry& preg,
0047                                    PreallocationConfiguration const* prealloc,
0048                                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0049                                    std::string const& label) {
0050     WorkerParams params(&pset, preg, prealloc, processConfiguration, *actionTable_);
0051     return workerReg_.getWorker(params, label);
0052   }
0053 
0054   void WorkerManager::addToUnscheduledWorkers(ParameterSet& pset,
0055                                               ProductRegistry& preg,
0056                                               PreallocationConfiguration const* prealloc,
0057                                               std::shared_ptr<ProcessConfiguration const> processConfiguration,
0058                                               std::string label,
0059                                               std::set<std::string>& unscheduledLabels,
0060                                               std::vector<std::string>& shouldBeUsedLabels) {
0061     //Need to
0062     // 1) create worker
0063     // 2) if it is a WorkerT<EDProducer>, add it to our list
0064     auto modType = pset.getParameter<std::string>("@module_edm_type");
0065     if (modType == kProducerType || modType == kFilterType) {
0066       Worker* newWorker = getWorker(pset, preg, prealloc, processConfiguration, label);
0067       assert(newWorker->moduleType() == Worker::kProducer || newWorker->moduleType() == Worker::kFilter);
0068       unscheduledLabels.insert(label);
0069       unscheduled_.addWorker(newWorker);
0070       //add to list so it gets reset each new event
0071       addToAllWorkers(newWorker);
0072     } else {
0073       shouldBeUsedLabels.push_back(label);
0074     }
0075   }
0076 
0077   void WorkerManager::endJob() {
0078     for (auto& worker : allWorkers_) {
0079       worker->endJob();
0080     }
0081   }
0082 
0083   void WorkerManager::endJob(ExceptionCollector& collector) {
0084     for (auto& worker : allWorkers_) {
0085       try {
0086         convertException::wrap([&]() { worker->endJob(); });
0087       } catch (cms::Exception const& ex) {
0088         collector.addException(ex);
0089       }
0090     }
0091   }
0092 
0093   void WorkerManager::beginJob(ProductRegistry const& iRegistry,
0094                                eventsetup::ESRecordsToProxyIndices const& iESIndices,
0095                                ProcessBlockHelperBase const& processBlockHelperBase) {
0096     auto const processBlockLookup = iRegistry.productLookup(InProcess);
0097     auto const runLookup = iRegistry.productLookup(InRun);
0098     auto const lumiLookup = iRegistry.productLookup(InLumi);
0099     auto const eventLookup = iRegistry.productLookup(InEvent);
0100     if (!allWorkers_.empty()) {
0101       auto const& processName = allWorkers_[0]->description()->processName();
0102       auto processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
0103       auto runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
0104       auto lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
0105       auto eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
0106       for (auto& worker : allWorkers_) {
0107         worker->updateLookup(InProcess, *processBlockLookup);
0108         worker->updateLookup(InRun, *runLookup);
0109         worker->updateLookup(InLumi, *lumiLookup);
0110         worker->updateLookup(InEvent, *eventLookup);
0111         worker->updateLookup(iESIndices);
0112         worker->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
0113         worker->resolvePutIndicies(InRun, runModuleToIndicies);
0114         worker->resolvePutIndicies(InLumi, lumiModuleToIndicies);
0115         worker->resolvePutIndicies(InEvent, eventModuleToIndicies);
0116         worker->selectInputProcessBlocks(iRegistry, processBlockHelperBase);
0117       }
0118 
0119       for_all(allWorkers_, std::bind(&Worker::beginJob, std::placeholders::_1));
0120     }
0121   }
0122 
0123   void WorkerManager::beginStream(StreamID iID, StreamContext& streamContext) {
0124     for (auto& worker : allWorkers_) {
0125       worker->beginStream(iID, streamContext);
0126     }
0127   }
0128 
0129   void WorkerManager::endStream(StreamID iID, StreamContext& streamContext) {
0130     for (auto& worker : allWorkers_) {
0131       worker->endStream(iID, streamContext);
0132     }
0133   }
0134 
0135   void WorkerManager::resetAll() { for_all(allWorkers_, std::bind(&Worker::reset, std::placeholders::_1)); }
0136 
0137   void WorkerManager::addToAllWorkers(Worker* w) {
0138     if (!search_all(allWorkers_, w)) {
0139       allWorkers_.push_back(w);
0140     }
0141   }
0142 
0143   void WorkerManager::setupResolvers(Principal& ep) {
0144     this->resetAll();
0145     if (&ep != lastSetupEventPrincipal_) {
0146       UnscheduledConfigurator config(allWorkers_.begin(), allWorkers_.end(), &(unscheduled_.auxiliary()));
0147       ep.setupUnscheduled(config);
0148       lastSetupEventPrincipal_ = &ep;
0149     }
0150   }
0151 
0152   void WorkerManager::setupOnDemandSystem(EventTransitionInfo const& info) {
0153     unscheduled_.setEventTransitionInfo(info);
0154   }
0155 
0156 }  // namespace edm