Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-02-14 03:16:31

0001 #ifndef FWCore_Framework_WorkerManager_h
0002 #define FWCore_Framework_WorkerManager_h
0003 
0004 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0005 #include "FWCore/Framework/interface/Frameworkfwd.h"
0006 #include "FWCore/Framework/interface/UnscheduledCallProducer.h"
0007 #include "FWCore/Framework/interface/WorkerRegistry.h"
0008 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0009 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0010 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0011 #include "FWCore/Utilities/interface/StreamID.h"
0012 
0013 #include <memory>
0014 #include <mutex>
0015 #include <set>
0016 #include <string>
0017 #include <utility>
0018 #include <vector>
0019 
0020 namespace edm {
0021   class ExceptionCollector;
0022   class ExceptionToActionTable;
0023   class ModuleRegistry;
0024   class ModuleTypeResolverMaker;
0025   class PreallocationConfiguration;
0026   class Worker;
0027   namespace eventsetup {
0028     class ESRecordsToProductResolverIndices;
0029   }
0030 
0031   class WorkerManager {
0032   public:
0033     typedef std::vector<Worker*> AllWorkers;
0034 
0035     WorkerManager(std::shared_ptr<ActivityRegistry> actReg,
0036                   ExceptionToActionTable const& actions,
0037                   ModuleTypeResolverMaker const* typeResolverMaker);
0038     WorkerManager(WorkerManager&&) = default;
0039 
0040     WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
0041                   std::shared_ptr<ActivityRegistry> actReg,
0042                   ExceptionToActionTable const& actions);
0043 
0044     void deleteModuleIfExists(std::string const& moduleLabel);
0045 
0046     void addToUnscheduledWorkers(ParameterSet& pset,
0047                                  SignallingProductRegistry& preg,
0048                                  PreallocationConfiguration const* prealloc,
0049                                  std::shared_ptr<ProcessConfiguration const> processConfiguration,
0050                                  std::string label,
0051                                  std::set<std::string>& unscheduledLabels,
0052                                  std::vector<std::string>& shouldBeUsedLabels);
0053 
0054     template <typename T, typename U>
0055     void processOneOccurrenceAsync(WaitingTaskHolder,
0056                                    typename T::TransitionInfoType&,
0057                                    ServiceToken const&,
0058                                    StreamID,
0059                                    typename T::Context const* topContext,
0060                                    U const* context) noexcept;
0061 
0062     template <typename T>
0063     void processAccumulatorsAsync(WaitingTaskHolder,
0064                                   typename T::TransitionInfoType const&,
0065                                   ServiceToken const&,
0066                                   StreamID,
0067                                   ParentContext const&,
0068                                   typename T::Context const*);
0069 
0070     void setupResolvers(Principal& principal);
0071     void setupOnDemandSystem(EventTransitionInfo const&);
0072 
0073     void beginJob(ProductRegistry const& iRegistry,
0074                   eventsetup::ESRecordsToProductResolverIndices const&,
0075                   ProcessBlockHelperBase const&,
0076                   GlobalContext const&);
0077     void endJob(ExceptionCollector&, GlobalContext const&);
0078 
0079     void beginStream(StreamID, StreamContext const&);
0080     void endStream(StreamID, StreamContext const&, ExceptionCollector&, std::mutex& collectorMutex) noexcept;
0081 
0082     AllWorkers const& allWorkers() const { return allWorkers_; }
0083     AllWorkers const& unscheduledWorkers() const { return unscheduled_.workers(); }
0084 
0085     void addToAllWorkers(Worker* w);
0086 
0087     ExceptionToActionTable const& actionTable() const { return *actionTable_; }
0088 
0089     Worker* getWorker(ParameterSet& pset,
0090                       SignallingProductRegistry& preg,
0091                       PreallocationConfiguration const* prealloc,
0092                       std::shared_ptr<ProcessConfiguration const> processConfiguration,
0093                       std::string const& label);
0094 
0095     void resetAll();
0096 
0097     void releaseMemoryPostLookupSignal();
0098 
0099   private:
0100     WorkerRegistry workerReg_;
0101     ExceptionToActionTable const* actionTable_;
0102     AllWorkers allWorkers_;
0103     UnscheduledCallProducer unscheduled_;
0104     void const* lastSetupEventPrincipal_;
0105   };
0106 
0107   template <typename T, typename U>
0108   void WorkerManager::processOneOccurrenceAsync(WaitingTaskHolder task,
0109                                                 typename T::TransitionInfoType& info,
0110                                                 ServiceToken const& token,
0111                                                 StreamID streamID,
0112                                                 typename T::Context const* topContext,
0113                                                 U const* context) noexcept {
0114     static_assert(!T::isEvent_);
0115 
0116     // Spawn them in reverse order. At least in the single threaded case that makes
0117     // them run in forward order (and more likely to with multiple threads).
0118     for (auto it = allWorkers_.rbegin(), itEnd = allWorkers_.rend(); it != itEnd; ++it) {
0119       Worker* worker = *it;
0120 
0121       ParentContext parentContext(context);
0122 
0123       // We do not need to run prefetching here because this only handles
0124       // stream begin/end transitions for runs and lumis. There are no products
0125       // put into the runs or lumis in stream transitions, so there can be
0126       // no data dependencies which require prefetching. Prefetching is
0127       // needed for global transitions, but they are run elsewhere.
0128       // (One exception, the SecondaryEventProvider (used for mixing) sends
0129       // global begin/end run/lumi transitions through here. They shouldn't
0130       // need prefetching either and for some years nothing has been using
0131       // that part of the code anyway...)
0132       worker->doWorkNoPrefetchingAsync<T>(task, info, token, streamID, parentContext, topContext);
0133     }
0134   }
0135 
0136   template <typename T>
0137   void WorkerManager::processAccumulatorsAsync(WaitingTaskHolder task,
0138                                                typename T::TransitionInfoType const& info,
0139                                                ServiceToken const& token,
0140                                                StreamID streamID,
0141                                                ParentContext const& parentContext,
0142                                                typename T::Context const* context) {
0143     unscheduled_.runAccumulatorsAsync<T>(std::move(task), info, token, streamID, parentContext, context);
0144   }
0145 }  // namespace edm
0146 
0147 #endif