Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-06-29 22:58:02

0001 #ifndef FWCore_Framework_WorkerManager_h
0002 #define FWCore_Framework_WorkerManager_h
0003 
#include "FWCore/Common/interface/FWCoreCommonFwd.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/UnscheduledCallProducer.h"
#include "FWCore/Framework/interface/WorkerRegistry.h"
#include "FWCore/ServiceRegistry/interface/ParentContext.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
#include "FWCore/Utilities/interface/StreamID.h"

#include <cassert>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <utility>
#include <vector>
0019 
0020 namespace edm {
0021   class ExceptionCollector;
0022   class ExceptionToActionTable;
0023   class ModuleRegistry;
0024   class ModuleTypeResolverMaker;
0025   class PreallocationConfiguration;
0026   class Worker;
0027   namespace eventsetup {
0028     class ESRecordsToProductResolverIndices;
0029   }
0030 
0031   class WorkerManager {
0032   public:
0033     typedef std::vector<Worker*> AllWorkers;
0034 
0035     WorkerManager(WorkerManager&&) = default;
0036 
0037     WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
0038                   std::shared_ptr<ActivityRegistry> actReg,
0039                   ExceptionToActionTable const& actions);
0040 
0041     void deleteModuleIfExists(std::string const& moduleLabel);
0042 
0043     void addToUnscheduledWorkers(ParameterSet& pset,
0044                                  SignallingProductRegistryFiller& preg,
0045                                  PreallocationConfiguration const* prealloc,
0046                                  std::shared_ptr<ProcessConfiguration const> processConfiguration,
0047                                  std::string label,
0048                                  std::set<std::string>& unscheduledLabels,
0049                                  std::vector<std::string>& shouldBeUsedLabels);
0050 
0051     template <typename T, typename U>
0052     void processOneOccurrenceAsync(WaitingTaskHolder,
0053                                    typename T::TransitionInfoType&,
0054                                    ServiceToken const&,
0055                                    StreamID,
0056                                    typename T::Context const* topContext,
0057                                    U const* context) noexcept;
0058 
0059     template <typename T>
0060     void processAccumulatorsAsync(WaitingTaskHolder,
0061                                   typename T::TransitionInfoType const&,
0062                                   ServiceToken const&,
0063                                   StreamID,
0064                                   ParentContext const&,
0065                                   typename T::Context const*);
0066 
0067     void setupResolvers(Principal& principal);
0068     void setupOnDemandSystem(EventTransitionInfo const&);
0069 
0070     AllWorkers const& allWorkers() const { return allWorkers_; }
0071     AllWorkers const& unscheduledWorkers() const { return unscheduled_.workers(); }
0072 
0073     void addToAllWorkers(Worker* w);
0074 
0075     ExceptionToActionTable const& actionTable() const { return *actionTable_; }
0076 
0077     Worker* getWorker(ParameterSet& pset,
0078                       SignallingProductRegistryFiller& preg,
0079                       PreallocationConfiguration const* prealloc,
0080                       std::shared_ptr<ProcessConfiguration const> processConfiguration,
0081                       std::string const& label,
0082                       bool addToAllWorkers = true);
0083 
0084     template <typename T>
0085     Worker* getWorkerForModule(T const& module) {
0086       auto* worker = getWorkerForExistingModule(module.moduleDescription().moduleLabel());
0087       assert(worker != nullptr);
0088       assert(worker->matchesBaseClassPointer(static_cast<typename T::ModuleType const*>(&module)));
0089       return worker;
0090     }
0091     void resetAll();
0092 
0093   private:
0094     Worker* getWorkerForExistingModule(std::string const& label);
0095 
0096     WorkerRegistry workerReg_;
0097     ExceptionToActionTable const* actionTable_;
0098     AllWorkers allWorkers_;
0099     UnscheduledCallProducer unscheduled_;
0100     void const* lastSetupEventPrincipal_;
0101   };
0102 
0103   template <typename T, typename U>
0104   void WorkerManager::processOneOccurrenceAsync(WaitingTaskHolder task,
0105                                                 typename T::TransitionInfoType& info,
0106                                                 ServiceToken const& token,
0107                                                 StreamID streamID,
0108                                                 typename T::Context const* topContext,
0109                                                 U const* context) noexcept {
0110     static_assert(!T::isEvent_);
0111 
0112     // Spawn them in reverse order. At least in the single threaded case that makes
0113     // them run in forward order (and more likely to with multiple threads).
0114     for (auto it = allWorkers_.rbegin(), itEnd = allWorkers_.rend(); it != itEnd; ++it) {
0115       Worker* worker = *it;
0116 
0117       ParentContext parentContext(context);
0118 
0119       // We do not need to run prefetching here because this only handles
0120       // stream begin/end transitions for runs and lumis. There are no products
0121       // put into the runs or lumis in stream transitions, so there can be
0122       // no data dependencies which require prefetching. Prefetching is
0123       // needed for global transitions, but they are run elsewhere.
0124       // (One exception, the SecondaryEventProvider (used for mixing) sends
0125       // global begin/end run/lumi transitions through here. They shouldn't
0126       // need prefetching either and for some years nothing has been using
0127       // that part of the code anyway...)
0128       worker->doWorkNoPrefetchingAsync<T>(task, info, token, streamID, parentContext, topContext);
0129     }
0130   }
0131 
0132   template <typename T>
0133   void WorkerManager::processAccumulatorsAsync(WaitingTaskHolder task,
0134                                                typename T::TransitionInfoType const& info,
0135                                                ServiceToken const& token,
0136                                                StreamID streamID,
0137                                                ParentContext const& parentContext,
0138                                                typename T::Context const* context) {
0139     unscheduled_.runAccumulatorsAsync<T>(std::move(task), info, token, streamID, parentContext, context);
0140   }
0141 }  // namespace edm
0142 
0143 #endif