Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-03-13 02:31:54

0001 #include "FWCore/Framework/interface/WorkerManager.h"
0002 #include "UnscheduledConfigurator.h"
0003 
0004 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0005 #include "DataFormats/Provenance/interface/ProductResolverIndexHelper.h"
0006 #include "FWCore/Framework/interface/maker/Worker.h"
0007 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0008 #include "FWCore/Utilities/interface/Algorithms.h"
0009 #include "FWCore/Utilities/interface/ConvertException.h"
0010 #include "FWCore/Utilities/interface/Exception.h"
0011 #include "FWCore/Utilities/interface/ExceptionCollector.h"
0012 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0013 
0014 #include <exception>
0015 #include <functional>
0016 
0017 static const std::string kFilterType("EDFilter");
0018 static const std::string kProducerType("EDProducer");
0019 
0020 namespace edm {
0021   // -----------------------------
0022 
0023   WorkerManager::WorkerManager(std::shared_ptr<ActivityRegistry> areg,
0024                                ExceptionToActionTable const& actions,
0025                                ModuleTypeResolverMaker const* typeResolverMaker)
0026       : workerReg_(areg, typeResolverMaker),
0027         actionTable_(&actions),
0028         allWorkers_(),
0029         unscheduled_(*areg),
0030         lastSetupEventPrincipal_(nullptr) {}  // WorkerManager::WorkerManager
0031 
0032   WorkerManager::WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
0033                                std::shared_ptr<ActivityRegistry> areg,
0034                                ExceptionToActionTable const& actions)
0035       : workerReg_(areg, modReg),
0036         actionTable_(&actions),
0037         allWorkers_(),
0038         unscheduled_(*areg),
0039         lastSetupEventPrincipal_(nullptr) {}  // WorkerManager::WorkerManager
0040 
0041   void WorkerManager::deleteModuleIfExists(std::string const& moduleLabel) {
0042     auto worker = workerReg_.get(moduleLabel);
0043     if (worker != nullptr) {
0044       auto eraseBeg = std::remove(allWorkers_.begin(), allWorkers_.end(), worker);
0045       allWorkers_.erase(eraseBeg, allWorkers_.end());
0046       unscheduled_.removeWorker(worker);
0047       workerReg_.deleteModule(moduleLabel);
0048     }
0049   }
0050 
0051   Worker* WorkerManager::getWorker(ParameterSet& pset,
0052                                    SignallingProductRegistryFiller& preg,
0053                                    PreallocationConfiguration const* prealloc,
0054                                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0055                                    std::string const& label) {
0056     WorkerParams params(&pset, preg, prealloc, processConfiguration, *actionTable_);
0057     return workerReg_.getWorker(params, label);
0058   }
0059 
0060   void WorkerManager::addToUnscheduledWorkers(ParameterSet& pset,
0061                                               SignallingProductRegistryFiller& preg,
0062                                               PreallocationConfiguration const* prealloc,
0063                                               std::shared_ptr<ProcessConfiguration const> processConfiguration,
0064                                               std::string label,
0065                                               std::set<std::string>& unscheduledLabels,
0066                                               std::vector<std::string>& shouldBeUsedLabels) {
0067     //Need to
0068     // 1) create worker
0069     // 2) if it is a WorkerT<EDProducer>, add it to our list
0070     auto modType = pset.getParameter<std::string>("@module_edm_type");
0071     if (modType == kProducerType || modType == kFilterType) {
0072       Worker* newWorker = getWorker(pset, preg, prealloc, processConfiguration, label);
0073       assert(newWorker->moduleType() == Worker::kProducer || newWorker->moduleType() == Worker::kFilter);
0074       unscheduledLabels.insert(label);
0075       unscheduled_.addWorker(newWorker);
0076       //add to list so it gets reset each new event
0077       addToAllWorkers(newWorker);
0078     } else {
0079       shouldBeUsedLabels.push_back(label);
0080     }
0081   }
0082 
0083   void WorkerManager::releaseMemoryPostLookupSignal() {
0084     for (auto& worker : allWorkers_) {
0085       worker->releaseMemoryPostLookupSignal();
0086     }
0087   }
0088 
0089   void WorkerManager::beginJob(ProductRegistry const& iRegistry,
0090                                eventsetup::ESRecordsToProductResolverIndices const& iESIndices,
0091                                ProcessBlockHelperBase const& processBlockHelperBase,
0092                                GlobalContext const& globalContext) {
0093     std::exception_ptr exceptionPtr;
0094     CMS_SA_ALLOW try {
0095       auto const processBlockLookup = iRegistry.productLookup(InProcess);
0096       auto const runLookup = iRegistry.productLookup(InRun);
0097       auto const lumiLookup = iRegistry.productLookup(InLumi);
0098       auto const eventLookup = iRegistry.productLookup(InEvent);
0099       if (!allWorkers_.empty()) {
0100         auto const& processName = allWorkers_[0]->description()->processName();
0101         auto processBlockModuleToIndicies = processBlockLookup->indiciesForModulesInProcess(processName);
0102         auto runModuleToIndicies = runLookup->indiciesForModulesInProcess(processName);
0103         auto lumiModuleToIndicies = lumiLookup->indiciesForModulesInProcess(processName);
0104         auto eventModuleToIndicies = eventLookup->indiciesForModulesInProcess(processName);
0105         for (auto& worker : allWorkers_) {
0106           worker->updateLookup(InProcess, *processBlockLookup);
0107           worker->updateLookup(InRun, *runLookup);
0108           worker->updateLookup(InLumi, *lumiLookup);
0109           worker->updateLookup(InEvent, *eventLookup);
0110           worker->updateLookup(iESIndices);
0111           worker->resolvePutIndicies(InProcess, processBlockModuleToIndicies);
0112           worker->resolvePutIndicies(InRun, runModuleToIndicies);
0113           worker->resolvePutIndicies(InLumi, lumiModuleToIndicies);
0114           worker->resolvePutIndicies(InEvent, eventModuleToIndicies);
0115           worker->selectInputProcessBlocks(iRegistry, processBlockHelperBase);
0116         }
0117       }
0118     } catch (...) {
0119       exceptionPtr = std::current_exception();
0120     }
0121 
0122     for (auto& worker : allWorkers_) {
0123       CMS_SA_ALLOW try { worker->beginJob(globalContext); } catch (...) {
0124         if (!exceptionPtr) {
0125           exceptionPtr = std::current_exception();
0126         }
0127       }
0128     }
0129     if (exceptionPtr) {
0130       std::rethrow_exception(exceptionPtr);
0131     }
0132   }
0133 
0134   void WorkerManager::endJob(ExceptionCollector& collector, GlobalContext const& globalContext) {
0135     for (auto& worker : allWorkers_) {
0136       try {
0137         convertException::wrap([&worker, &globalContext]() { worker->endJob(globalContext); });
0138       } catch (cms::Exception const& ex) {
0139         collector.addException(ex);
0140       }
0141     }
0142   }
0143 
0144   void WorkerManager::beginStream(StreamID streamID, StreamContext const& streamContext) {
0145     std::exception_ptr exceptionPtr;
0146     for (auto& worker : allWorkers_) {
0147       CMS_SA_ALLOW try { worker->beginStream(streamID, streamContext); } catch (...) {
0148         if (!exceptionPtr) {
0149           exceptionPtr = std::current_exception();
0150         }
0151       }
0152     }
0153     if (exceptionPtr) {
0154       std::rethrow_exception(exceptionPtr);
0155     }
0156   }
0157 
0158   void WorkerManager::endStream(StreamID streamID,
0159                                 StreamContext const& streamContext,
0160                                 ExceptionCollector& collector,
0161                                 std::mutex& collectorMutex) noexcept {
0162     for (auto& worker : allWorkers_) {
0163       CMS_SA_ALLOW try { worker->endStream(streamID, streamContext); } catch (...) {
0164         std::exception_ptr exceptionPtr = std::current_exception();
0165         std::lock_guard<std::mutex> collectorLock(collectorMutex);
0166         collector.call([&exceptionPtr]() { std::rethrow_exception(exceptionPtr); });
0167       }
0168     }
0169   }
0170 
0171   void WorkerManager::resetAll() { for_all(allWorkers_, std::bind(&Worker::reset, std::placeholders::_1)); }
0172 
0173   void WorkerManager::addToAllWorkers(Worker* w) {
0174     if (!search_all(allWorkers_, w)) {
0175       allWorkers_.push_back(w);
0176     }
0177   }
0178 
0179   void WorkerManager::setupResolvers(Principal& ep) {
0180     this->resetAll();
0181     if (&ep != lastSetupEventPrincipal_) {
0182       UnscheduledConfigurator config(allWorkers_.begin(), allWorkers_.end(), &(unscheduled_.auxiliary()));
0183       ep.setupUnscheduled(config);
0184       lastSetupEventPrincipal_ = &ep;
0185     }
0186   }
0187 
0188   void WorkerManager::setupOnDemandSystem(EventTransitionInfo const& info) {
0189     unscheduled_.setEventTransitionInfo(info);
0190   }
0191 
0192 }  // namespace edm