Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-07-19 01:43:52

0001 #include "FWCore/Framework/interface/WorkerManager.h"
0002 #include "UnscheduledConfigurator.h"
0003 
0004 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0005 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0006 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0007 #include "FWCore/Utilities/interface/Algorithms.h"
0008 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0009 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0010 
// Values of the "@module_edm_type" parameter that identify modules eligible
// to be run unscheduled (see WorkerManager::addToUnscheduledWorkers).
namespace {
  const std::string kFilterType = "EDFilter";
  const std::string kProducerType = "EDProducer";
}  // namespace
0013 
0014 namespace edm {
0015   // -----------------------------
0016 
0017   WorkerManager::WorkerManager(std::shared_ptr<ActivityRegistry> areg, ExceptionToActionTable const& actions)
0018       : workerReg_(areg),
0019         actionTable_(&actions),
0020         allWorkers_(),
0021         unscheduled_(*areg),
0022         lastSetupEventPrincipal_(nullptr) {}  // WorkerManager::WorkerManager
0023 
0024   WorkerManager::WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
0025                                std::shared_ptr<ActivityRegistry> areg,
0026                                ExceptionToActionTable const& actions)
0027       : workerReg_(areg, modReg),
0028         actionTable_(&actions),
0029         allWorkers_(),
0030         unscheduled_(*areg),
0031         lastSetupEventPrincipal_(nullptr) {}  // WorkerManager::WorkerManager
0032 
0033   void WorkerManager::deleteModuleIfExists(std::string const& moduleLabel) {
0034     auto worker = workerReg_.get(moduleLabel);
0035     if (worker != nullptr) {
0036       auto eraseBeg = std::remove(allWorkers_.begin(), allWorkers_.end(), worker);
0037       allWorkers_.erase(eraseBeg, allWorkers_.end());
0038       unscheduled_.removeWorker(worker);
0039       workerReg_.deleteModule(moduleLabel);
0040     }
0041   }
0042 
0043   Worker* WorkerManager::getWorker(ParameterSet& pset,
0044                                    ProductRegistry& preg,
0045                                    PreallocationConfiguration const* prealloc,
0046                                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0047                                    std::string const& label) {
0048     WorkerParams params(&pset, preg, prealloc, processConfiguration, *actionTable_);
0049     return workerReg_.getWorker(params, label);
0050   }
0051 
0052   void WorkerManager::addToUnscheduledWorkers(ParameterSet& pset,
0053                                               ProductRegistry& preg,
0054                                               PreallocationConfiguration const* prealloc,
0055                                               std::shared_ptr<ProcessConfiguration> processConfiguration,
0056                                               std::string label,
0057                                               std::set<std::string>& unscheduledLabels,
0058                                               std::vector<std::string>& shouldBeUsedLabels) {
0059     //Need to
0060     // 1) create worker
0061     // 2) if it is a WorkerT<EDProducer>, add it to our list
0062     auto modType = pset.getParameter<std::string>("@module_edm_type");
0063     if (modType == kProducerType || modType == kFilterType) {
0064       Worker* newWorker = getWorker(pset, preg, prealloc, processConfiguration, label);
0065       assert(newWorker->moduleType() == Worker::kProducer || newWorker->moduleType() == Worker::kFilter);
0066       unscheduledLabels.insert(label);
0067       unscheduled_.addWorker(newWorker);
0068       //add to list so it gets reset each new event
0069       addToAllWorkers(newWorker);
0070     } else {
0071       shouldBeUsedLabels.push_back(label);
0072     }
0073   }
0074 
0075   void WorkerManager::endJob() {
0076     for (auto& worker : allWorkers_) {
0077       worker->endJob();
0078     }
0079   }
0080 
0081   void WorkerManager::endJob(ExceptionCollector& collector) {
0082     for (auto& worker : allWorkers_) {
0083       try {
0084         convertException::wrap([&]() { worker->endJob(); });
0085       } catch (cms::Exception const& ex) {
0086         collector.addException(ex);
0087       }
0088     }
0089   }
0090 
0091   void WorkerManager::beginJob(ProductRegistry const& iRegistry,
0092                                eventsetup::ESRecordsToProxyIndices const& iESIndices,
0093                                ProcessBlockHelperBase const& processBlockHelperBase) {
0094     auto const processBlockLookup = iRegistry.productLookup(InProcess);
0095     auto const runLookup = iRegistry.productLookup(InRun);
0096     auto const lumiLookup = iRegistry.productLookup(InLumi);
0097     auto const eventLookup = iRegistry.productLookup(InEvent);
0098     if (!allWorkers_.empty()) {
0099       auto const& processName = allWorkers_[0]->description()->processName();
0100       auto processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
0101       auto runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
0102       auto lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
0103       auto eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
0104       for (auto& worker : allWorkers_) {
0105         worker->updateLookup(InProcess, *processBlockLookup);
0106         worker->updateLookup(InRun, *runLookup);
0107         worker->updateLookup(InLumi, *lumiLookup);
0108         worker->updateLookup(InEvent, *eventLookup);
0109         worker->updateLookup(iESIndices);
0110         worker->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
0111         worker->resolvePutIndicies(InRun, runModuleToIndicies);
0112         worker->resolvePutIndicies(InLumi, lumiModuleToIndicies);
0113         worker->resolvePutIndicies(InEvent, eventModuleToIndicies);
0114         worker->selectInputProcessBlocks(iRegistry, processBlockHelperBase);
0115       }
0116 
0117       for_all(allWorkers_, std::bind(&Worker::beginJob, std::placeholders::_1));
0118     }
0119   }
0120 
0121   void WorkerManager::beginStream(StreamID iID, StreamContext& streamContext) {
0122     for (auto& worker : allWorkers_) {
0123       worker->beginStream(iID, streamContext);
0124     }
0125   }
0126 
0127   void WorkerManager::endStream(StreamID iID, StreamContext& streamContext) {
0128     for (auto& worker : allWorkers_) {
0129       worker->endStream(iID, streamContext);
0130     }
0131   }
0132 
0133   void WorkerManager::resetAll() { for_all(allWorkers_, std::bind(&Worker::reset, std::placeholders::_1)); }
0134 
0135   void WorkerManager::addToAllWorkers(Worker* w) {
0136     if (!search_all(allWorkers_, w)) {
0137       allWorkers_.push_back(w);
0138     }
0139   }
0140 
0141   void WorkerManager::setupResolvers(Principal& ep) {
0142     this->resetAll();
0143     if (&ep != lastSetupEventPrincipal_) {
0144       UnscheduledConfigurator config(allWorkers_.begin(), allWorkers_.end(), &(unscheduled_.auxiliary()));
0145       ep.setupUnscheduled(config);
0146       lastSetupEventPrincipal_ = &ep;
0147     }
0148   }
0149 
0150   void WorkerManager::setupOnDemandSystem(EventTransitionInfo const& info) {
0151     unscheduled_.setEventTransitionInfo(info);
0152   }
0153 
0154 }  // namespace edm