Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-06-04 04:34:53

0001 #ifndef FWCore_Framework_WorkerManager_h
0002 #define FWCore_Framework_WorkerManager_h
0003 
0004 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0005 #include "FWCore/Framework/interface/Frameworkfwd.h"
0006 #include "FWCore/Framework/interface/UnscheduledCallProducer.h"
0007 #include "FWCore/Framework/interface/WorkerRegistry.h"
0008 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0009 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0010 #include "FWCore/Utilities/interface/StreamID.h"
0011 
0012 #include <memory>
0013 #include <set>
0014 #include <string>
0015 #include <utility>
0016 #include <vector>
0017 
0018 namespace edm {
0019   class ExceptionCollector;
0020   class ExceptionToActionTable;
0021   class ModuleRegistry;
0022   class ModuleTypeResolverMaker;
0023   class PreallocationConfiguration;
0024   class Worker;
0025   namespace eventsetup {
0026     class ESRecordsToProductResolverIndices;
0027   }
0028 
0029   class WorkerManager {
0030   public:
0031     typedef std::vector<Worker*> AllWorkers;
0032 
0033     WorkerManager(std::shared_ptr<ActivityRegistry> actReg,
0034                   ExceptionToActionTable const& actions,
0035                   ModuleTypeResolverMaker const* typeResolverMaker);
0036     WorkerManager(WorkerManager&&) = default;
0037 
0038     WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
0039                   std::shared_ptr<ActivityRegistry> actReg,
0040                   ExceptionToActionTable const& actions);
0041 
0042     void deleteModuleIfExists(std::string const& moduleLabel);
0043 
0044     void addToUnscheduledWorkers(ParameterSet& pset,
0045                                  ProductRegistry& preg,
0046                                  PreallocationConfiguration const* prealloc,
0047                                  std::shared_ptr<ProcessConfiguration const> processConfiguration,
0048                                  std::string label,
0049                                  std::set<std::string>& unscheduledLabels,
0050                                  std::vector<std::string>& shouldBeUsedLabels);
0051 
0052     template <typename T, typename U>
0053     void processOneOccurrenceAsync(WaitingTaskHolder,
0054                                    typename T::TransitionInfoType&,
0055                                    ServiceToken const&,
0056                                    StreamID,
0057                                    typename T::Context const* topContext,
0058                                    U const* context) noexcept;
0059 
0060     template <typename T>
0061     void processAccumulatorsAsync(WaitingTaskHolder,
0062                                   typename T::TransitionInfoType const&,
0063                                   ServiceToken const&,
0064                                   StreamID,
0065                                   ParentContext const&,
0066                                   typename T::Context const*);
0067 
0068     void setupResolvers(Principal& principal);
0069     void setupOnDemandSystem(EventTransitionInfo const&);
0070 
0071     void beginJob(ProductRegistry const& iRegistry,
0072                   eventsetup::ESRecordsToProductResolverIndices const&,
0073                   ProcessBlockHelperBase const&);
0074     void endJob();
0075     void endJob(ExceptionCollector& collector);
0076 
0077     void beginStream(StreamID iID, StreamContext& streamContext);
0078     void endStream(StreamID iID, StreamContext& streamContext);
0079 
0080     AllWorkers const& allWorkers() const { return allWorkers_; }
0081     AllWorkers const& unscheduledWorkers() const { return unscheduled_.workers(); }
0082 
0083     void addToAllWorkers(Worker* w);
0084 
0085     ExceptionToActionTable const& actionTable() const { return *actionTable_; }
0086 
0087     Worker* getWorker(ParameterSet& pset,
0088                       ProductRegistry& preg,
0089                       PreallocationConfiguration const* prealloc,
0090                       std::shared_ptr<ProcessConfiguration const> processConfiguration,
0091                       std::string const& label);
0092 
0093     void resetAll();
0094 
0095   private:
0096     WorkerRegistry workerReg_;
0097     ExceptionToActionTable const* actionTable_;
0098     AllWorkers allWorkers_;
0099     UnscheduledCallProducer unscheduled_;
0100     void const* lastSetupEventPrincipal_;
0101   };
0102 
0103   template <typename T, typename U>
0104   void WorkerManager::processOneOccurrenceAsync(WaitingTaskHolder task,
0105                                                 typename T::TransitionInfoType& info,
0106                                                 ServiceToken const& token,
0107                                                 StreamID streamID,
0108                                                 typename T::Context const* topContext,
0109                                                 U const* context) noexcept {
0110     static_assert(!T::isEvent_);
0111 
0112     // Spawn them in reverse order. At least in the single threaded case that makes
0113     // them run in forward order (and more likely to with multiple threads).
0114     for (auto it = allWorkers_.rbegin(), itEnd = allWorkers_.rend(); it != itEnd; ++it) {
0115       Worker* worker = *it;
0116 
0117       ParentContext parentContext(context);
0118 
0119       // We do not need to run prefetching here because this only handles
0120       // stream begin/end transitions for runs and lumis. There are no products
0121       // put into the runs or lumis in stream transitions, so there can be
0122       // no data dependencies which require prefetching. Prefetching is
0123       // needed for global transitions, but they are run elsewhere.
0124       // (One exception, the SecondaryEventProvider (used for mixing) sends
0125       // global begin/end run/lumi transitions through here. They shouldn't
0126       // need prefetching either and for some years nothing has been using
0127       // that part of the code anyway...)
0128       worker->doWorkNoPrefetchingAsync<T>(task, info, token, streamID, parentContext, topContext);
0129     }
0130   }
0131 
0132   template <typename T>
0133   void WorkerManager::processAccumulatorsAsync(WaitingTaskHolder task,
0134                                                typename T::TransitionInfoType const& info,
0135                                                ServiceToken const& token,
0136                                                StreamID streamID,
0137                                                ParentContext const& parentContext,
0138                                                typename T::Context const* context) {
0139     unscheduled_.runAccumulatorsAsync<T>(std::move(task), info, token, streamID, parentContext, context);
0140   }
0141 }  // namespace edm
0142 
0143 #endif