File indexing completed on 2025-06-29 22:58:02
0001 #ifndef FWCore_Framework_WorkerManager_h
0002 #define FWCore_Framework_WorkerManager_h
0003
0004 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0005 #include "FWCore/Framework/interface/Frameworkfwd.h"
0006 #include "FWCore/Framework/interface/UnscheduledCallProducer.h"
0007 #include "FWCore/Framework/interface/WorkerRegistry.h"
0008 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0009 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0010 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0011 #include "FWCore/Utilities/interface/StreamID.h"
0012
0013 #include <memory>
0014 #include <mutex>
0015 #include <set>
0016 #include <string>
0017 #include <utility>
0018 #include <vector>
0019
0020 namespace edm {
0021 class ExceptionCollector;
0022 class ExceptionToActionTable;
0023 class ModuleRegistry;
0024 class ModuleTypeResolverMaker;
0025 class PreallocationConfiguration;
0026 class Worker;
0027 namespace eventsetup {
0028 class ESRecordsToProductResolverIndices;
0029 }
0030
0031 class WorkerManager {
0032 public:
0033 typedef std::vector<Worker*> AllWorkers;
0034
0035 WorkerManager(WorkerManager&&) = default;
0036
0037 WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
0038 std::shared_ptr<ActivityRegistry> actReg,
0039 ExceptionToActionTable const& actions);
0040
0041 void deleteModuleIfExists(std::string const& moduleLabel);
0042
0043 void addToUnscheduledWorkers(ParameterSet& pset,
0044 SignallingProductRegistryFiller& preg,
0045 PreallocationConfiguration const* prealloc,
0046 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0047 std::string label,
0048 std::set<std::string>& unscheduledLabels,
0049 std::vector<std::string>& shouldBeUsedLabels);
0050
0051 template <typename T, typename U>
0052 void processOneOccurrenceAsync(WaitingTaskHolder,
0053 typename T::TransitionInfoType&,
0054 ServiceToken const&,
0055 StreamID,
0056 typename T::Context const* topContext,
0057 U const* context) noexcept;
0058
0059 template <typename T>
0060 void processAccumulatorsAsync(WaitingTaskHolder,
0061 typename T::TransitionInfoType const&,
0062 ServiceToken const&,
0063 StreamID,
0064 ParentContext const&,
0065 typename T::Context const*);
0066
0067 void setupResolvers(Principal& principal);
0068 void setupOnDemandSystem(EventTransitionInfo const&);
0069
0070 AllWorkers const& allWorkers() const { return allWorkers_; }
0071 AllWorkers const& unscheduledWorkers() const { return unscheduled_.workers(); }
0072
0073 void addToAllWorkers(Worker* w);
0074
0075 ExceptionToActionTable const& actionTable() const { return *actionTable_; }
0076
0077 Worker* getWorker(ParameterSet& pset,
0078 SignallingProductRegistryFiller& preg,
0079 PreallocationConfiguration const* prealloc,
0080 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0081 std::string const& label,
0082 bool addToAllWorkers = true);
0083
0084 template <typename T>
0085 Worker* getWorkerForModule(T const& module) {
0086 auto* worker = getWorkerForExistingModule(module.moduleDescription().moduleLabel());
0087 assert(worker != nullptr);
0088 assert(worker->matchesBaseClassPointer(static_cast<typename T::ModuleType const*>(&module)));
0089 return worker;
0090 }
0091 void resetAll();
0092
0093 private:
0094 Worker* getWorkerForExistingModule(std::string const& label);
0095
0096 WorkerRegistry workerReg_;
0097 ExceptionToActionTable const* actionTable_;
0098 AllWorkers allWorkers_;
0099 UnscheduledCallProducer unscheduled_;
0100 void const* lastSetupEventPrincipal_;
0101 };
0102
0103 template <typename T, typename U>
0104 void WorkerManager::processOneOccurrenceAsync(WaitingTaskHolder task,
0105 typename T::TransitionInfoType& info,
0106 ServiceToken const& token,
0107 StreamID streamID,
0108 typename T::Context const* topContext,
0109 U const* context) noexcept {
0110 static_assert(!T::isEvent_);
0111
0112
0113
0114 for (auto it = allWorkers_.rbegin(), itEnd = allWorkers_.rend(); it != itEnd; ++it) {
0115 Worker* worker = *it;
0116
0117 ParentContext parentContext(context);
0118
0119
0120
0121
0122
0123
0124
0125
0126
0127
0128 worker->doWorkNoPrefetchingAsync<T>(task, info, token, streamID, parentContext, topContext);
0129 }
0130 }
0131
0132 template <typename T>
0133 void WorkerManager::processAccumulatorsAsync(WaitingTaskHolder task,
0134 typename T::TransitionInfoType const& info,
0135 ServiceToken const& token,
0136 StreamID streamID,
0137 ParentContext const& parentContext,
0138 typename T::Context const* context) {
0139 unscheduled_.runAccumulatorsAsync<T>(std::move(task), info, token, streamID, parentContext, context);
0140 }
0141 }
0142
0143 #endif