File indexing completed on 2025-02-14 03:16:31
0001 #ifndef FWCore_Framework_WorkerManager_h
0002 #define FWCore_Framework_WorkerManager_h
0003
0004 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0005 #include "FWCore/Framework/interface/Frameworkfwd.h"
0006 #include "FWCore/Framework/interface/UnscheduledCallProducer.h"
0007 #include "FWCore/Framework/interface/WorkerRegistry.h"
0008 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0009 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0010 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0011 #include "FWCore/Utilities/interface/StreamID.h"
0012
0013 #include <memory>
0014 #include <mutex>
0015 #include <set>
0016 #include <string>
0017 #include <utility>
0018 #include <vector>
0019
0020 namespace edm {
0021 class ExceptionCollector;
0022 class ExceptionToActionTable;
0023 class ModuleRegistry;
0024 class ModuleTypeResolverMaker;
0025 class PreallocationConfiguration;
0026 class Worker;
0027 namespace eventsetup {
0028 class ESRecordsToProductResolverIndices;
0029 }
0030
0031 class WorkerManager {
0032 public:
0033 typedef std::vector<Worker*> AllWorkers;
0034
0035 WorkerManager(std::shared_ptr<ActivityRegistry> actReg,
0036 ExceptionToActionTable const& actions,
0037 ModuleTypeResolverMaker const* typeResolverMaker);
0038 WorkerManager(WorkerManager&&) = default;
0039
0040 WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
0041 std::shared_ptr<ActivityRegistry> actReg,
0042 ExceptionToActionTable const& actions);
0043
0044 void deleteModuleIfExists(std::string const& moduleLabel);
0045
0046 void addToUnscheduledWorkers(ParameterSet& pset,
0047 SignallingProductRegistry& preg,
0048 PreallocationConfiguration const* prealloc,
0049 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0050 std::string label,
0051 std::set<std::string>& unscheduledLabels,
0052 std::vector<std::string>& shouldBeUsedLabels);
0053
0054 template <typename T, typename U>
0055 void processOneOccurrenceAsync(WaitingTaskHolder,
0056 typename T::TransitionInfoType&,
0057 ServiceToken const&,
0058 StreamID,
0059 typename T::Context const* topContext,
0060 U const* context) noexcept;
0061
0062 template <typename T>
0063 void processAccumulatorsAsync(WaitingTaskHolder,
0064 typename T::TransitionInfoType const&,
0065 ServiceToken const&,
0066 StreamID,
0067 ParentContext const&,
0068 typename T::Context const*);
0069
0070 void setupResolvers(Principal& principal);
0071 void setupOnDemandSystem(EventTransitionInfo const&);
0072
0073 void beginJob(ProductRegistry const& iRegistry,
0074 eventsetup::ESRecordsToProductResolverIndices const&,
0075 ProcessBlockHelperBase const&,
0076 GlobalContext const&);
0077 void endJob(ExceptionCollector&, GlobalContext const&);
0078
0079 void beginStream(StreamID, StreamContext const&);
0080 void endStream(StreamID, StreamContext const&, ExceptionCollector&, std::mutex& collectorMutex) noexcept;
0081
0082 AllWorkers const& allWorkers() const { return allWorkers_; }
0083 AllWorkers const& unscheduledWorkers() const { return unscheduled_.workers(); }
0084
0085 void addToAllWorkers(Worker* w);
0086
0087 ExceptionToActionTable const& actionTable() const { return *actionTable_; }
0088
0089 Worker* getWorker(ParameterSet& pset,
0090 SignallingProductRegistry& preg,
0091 PreallocationConfiguration const* prealloc,
0092 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0093 std::string const& label);
0094
0095 void resetAll();
0096
0097 void releaseMemoryPostLookupSignal();
0098
0099 private:
0100 WorkerRegistry workerReg_;
0101 ExceptionToActionTable const* actionTable_;
0102 AllWorkers allWorkers_;
0103 UnscheduledCallProducer unscheduled_;
0104 void const* lastSetupEventPrincipal_;
0105 };
0106
0107 template <typename T, typename U>
0108 void WorkerManager::processOneOccurrenceAsync(WaitingTaskHolder task,
0109 typename T::TransitionInfoType& info,
0110 ServiceToken const& token,
0111 StreamID streamID,
0112 typename T::Context const* topContext,
0113 U const* context) noexcept {
0114 static_assert(!T::isEvent_);
0115
0116
0117
0118 for (auto it = allWorkers_.rbegin(), itEnd = allWorkers_.rend(); it != itEnd; ++it) {
0119 Worker* worker = *it;
0120
0121 ParentContext parentContext(context);
0122
0123
0124
0125
0126
0127
0128
0129
0130
0131
0132 worker->doWorkNoPrefetchingAsync<T>(task, info, token, streamID, parentContext, topContext);
0133 }
0134 }
0135
0136 template <typename T>
0137 void WorkerManager::processAccumulatorsAsync(WaitingTaskHolder task,
0138 typename T::TransitionInfoType const& info,
0139 ServiceToken const& token,
0140 StreamID streamID,
0141 ParentContext const& parentContext,
0142 typename T::Context const* context) {
0143 unscheduled_.runAccumulatorsAsync<T>(std::move(task), info, token, streamID, parentContext, context);
0144 }
0145 }
0146
0147 #endif