Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-06-04 04:34:55

0001 #include "FWCore/Framework/interface/WorkerManager.h"
0002 #include "UnscheduledConfigurator.h"
0003 
0004 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0005 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0006 #include "FWCore/Framework/interface/maker/Worker.h"
0007 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0008 #include "FWCore/Utilities/interface/Algorithms.h"
0009 #include "FWCore/Utilities/interface/ConvertException.h"
0010 #include "FWCore/Utilities/interface/Exception.h"
0011 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0012 
0013 #include <functional>
0014 
namespace {
  // Values of the "@module_edm_type" parameter that are compared against
  // when deciding whether a module may be added as an unscheduled worker.
  const std::string kFilterType{"EDFilter"};
  const std::string kProducerType{"EDProducer"};
}  // namespace
0017 
0018 namespace edm {
0019   // -----------------------------
0020 
0021   WorkerManager::WorkerManager(std::shared_ptr<ActivityRegistry> areg,
0022                                ExceptionToActionTable const& actions,
0023                                ModuleTypeResolverMaker const* typeResolverMaker)
0024       : workerReg_(areg, typeResolverMaker),
0025         actionTable_(&actions),
0026         allWorkers_(),
0027         unscheduled_(*areg),
0028         lastSetupEventPrincipal_(nullptr) {}  // WorkerManager::WorkerManager
0029 
0030   WorkerManager::WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
0031                                std::shared_ptr<ActivityRegistry> areg,
0032                                ExceptionToActionTable const& actions)
0033       : workerReg_(areg, modReg),
0034         actionTable_(&actions),
0035         allWorkers_(),
0036         unscheduled_(*areg),
0037         lastSetupEventPrincipal_(nullptr) {}  // WorkerManager::WorkerManager
0038 
0039   void WorkerManager::deleteModuleIfExists(std::string const& moduleLabel) {
0040     auto worker = workerReg_.get(moduleLabel);
0041     if (worker != nullptr) {
0042       auto eraseBeg = std::remove(allWorkers_.begin(), allWorkers_.end(), worker);
0043       allWorkers_.erase(eraseBeg, allWorkers_.end());
0044       unscheduled_.removeWorker(worker);
0045       workerReg_.deleteModule(moduleLabel);
0046     }
0047   }
0048 
0049   Worker* WorkerManager::getWorker(ParameterSet& pset,
0050                                    ProductRegistry& preg,
0051                                    PreallocationConfiguration const* prealloc,
0052                                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0053                                    std::string const& label) {
0054     WorkerParams params(&pset, preg, prealloc, processConfiguration, *actionTable_);
0055     return workerReg_.getWorker(params, label);
0056   }
0057 
0058   void WorkerManager::addToUnscheduledWorkers(ParameterSet& pset,
0059                                               ProductRegistry& preg,
0060                                               PreallocationConfiguration const* prealloc,
0061                                               std::shared_ptr<ProcessConfiguration const> processConfiguration,
0062                                               std::string label,
0063                                               std::set<std::string>& unscheduledLabels,
0064                                               std::vector<std::string>& shouldBeUsedLabels) {
0065     //Need to
0066     // 1) create worker
0067     // 2) if it is a WorkerT<EDProducer>, add it to our list
0068     auto modType = pset.getParameter<std::string>("@module_edm_type");
0069     if (modType == kProducerType || modType == kFilterType) {
0070       Worker* newWorker = getWorker(pset, preg, prealloc, processConfiguration, label);
0071       assert(newWorker->moduleType() == Worker::kProducer || newWorker->moduleType() == Worker::kFilter);
0072       unscheduledLabels.insert(label);
0073       unscheduled_.addWorker(newWorker);
0074       //add to list so it gets reset each new event
0075       addToAllWorkers(newWorker);
0076     } else {
0077       shouldBeUsedLabels.push_back(label);
0078     }
0079   }
0080 
0081   void WorkerManager::endJob() {
0082     for (auto& worker : allWorkers_) {
0083       worker->endJob();
0084     }
0085   }
0086 
0087   void WorkerManager::endJob(ExceptionCollector& collector) {
0088     for (auto& worker : allWorkers_) {
0089       try {
0090         convertException::wrap([&]() { worker->endJob(); });
0091       } catch (cms::Exception const& ex) {
0092         collector.addException(ex);
0093       }
0094     }
0095   }
0096 
0097   void WorkerManager::beginJob(ProductRegistry const& iRegistry,
0098                                eventsetup::ESRecordsToProductResolverIndices const& iESIndices,
0099                                ProcessBlockHelperBase const& processBlockHelperBase) {
0100     auto const processBlockLookup = iRegistry.productLookup(InProcess);
0101     auto const runLookup = iRegistry.productLookup(InRun);
0102     auto const lumiLookup = iRegistry.productLookup(InLumi);
0103     auto const eventLookup = iRegistry.productLookup(InEvent);
0104     if (!allWorkers_.empty()) {
0105       auto const& processName = allWorkers_[0]->description()->processName();
0106       auto processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
0107       auto runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
0108       auto lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
0109       auto eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
0110       for (auto& worker : allWorkers_) {
0111         worker->updateLookup(InProcess, *processBlockLookup);
0112         worker->updateLookup(InRun, *runLookup);
0113         worker->updateLookup(InLumi, *lumiLookup);
0114         worker->updateLookup(InEvent, *eventLookup);
0115         worker->updateLookup(iESIndices);
0116         worker->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
0117         worker->resolvePutIndicies(InRun, runModuleToIndicies);
0118         worker->resolvePutIndicies(InLumi, lumiModuleToIndicies);
0119         worker->resolvePutIndicies(InEvent, eventModuleToIndicies);
0120         worker->selectInputProcessBlocks(iRegistry, processBlockHelperBase);
0121       }
0122 
0123       for_all(allWorkers_, std::bind(&Worker::beginJob, std::placeholders::_1));
0124     }
0125   }
0126 
0127   void WorkerManager::beginStream(StreamID iID, StreamContext& streamContext) {
0128     for (auto& worker : allWorkers_) {
0129       worker->beginStream(iID, streamContext);
0130     }
0131   }
0132 
0133   void WorkerManager::endStream(StreamID iID, StreamContext& streamContext) {
0134     for (auto& worker : allWorkers_) {
0135       worker->endStream(iID, streamContext);
0136     }
0137   }
0138 
0139   void WorkerManager::resetAll() { for_all(allWorkers_, std::bind(&Worker::reset, std::placeholders::_1)); }
0140 
0141   void WorkerManager::addToAllWorkers(Worker* w) {
0142     if (!search_all(allWorkers_, w)) {
0143       allWorkers_.push_back(w);
0144     }
0145   }
0146 
0147   void WorkerManager::setupResolvers(Principal& ep) {
0148     this->resetAll();
0149     if (&ep != lastSetupEventPrincipal_) {
0150       UnscheduledConfigurator config(allWorkers_.begin(), allWorkers_.end(), &(unscheduled_.auxiliary()));
0151       ep.setupUnscheduled(config);
0152       lastSetupEventPrincipal_ = &ep;
0153     }
0154   }
0155 
0156   void WorkerManager::setupOnDemandSystem(EventTransitionInfo const& info) {
0157     unscheduled_.setEventTransitionInfo(info);
0158   }
0159 
0160 }  // namespace edm