File indexing completed on 2024-06-04 04:34:53
0001 #ifndef FWCore_Framework_WorkerManager_h
0002 #define FWCore_Framework_WorkerManager_h
0003
0004 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0005 #include "FWCore/Framework/interface/Frameworkfwd.h"
0006 #include "FWCore/Framework/interface/UnscheduledCallProducer.h"
0007 #include "FWCore/Framework/interface/WorkerRegistry.h"
0008 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0009 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0010 #include "FWCore/Utilities/interface/StreamID.h"
0011
0012 #include <memory>
0013 #include <set>
0014 #include <string>
0015 #include <utility>
0016 #include <vector>
0017
0018 namespace edm {
0019 class ExceptionCollector;
0020 class ExceptionToActionTable;
0021 class ModuleRegistry;
0022 class ModuleTypeResolverMaker;
0023 class PreallocationConfiguration;
0024 class Worker;
0025 namespace eventsetup {
0026 class ESRecordsToProductResolverIndices;
0027 }
0028
0029 class WorkerManager {
0030 public:
0031 typedef std::vector<Worker*> AllWorkers;
0032
0033 WorkerManager(std::shared_ptr<ActivityRegistry> actReg,
0034 ExceptionToActionTable const& actions,
0035 ModuleTypeResolverMaker const* typeResolverMaker);
0036 WorkerManager(WorkerManager&&) = default;
0037
0038 WorkerManager(std::shared_ptr<ModuleRegistry> modReg,
0039 std::shared_ptr<ActivityRegistry> actReg,
0040 ExceptionToActionTable const& actions);
0041
0042 void deleteModuleIfExists(std::string const& moduleLabel);
0043
0044 void addToUnscheduledWorkers(ParameterSet& pset,
0045 ProductRegistry& preg,
0046 PreallocationConfiguration const* prealloc,
0047 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0048 std::string label,
0049 std::set<std::string>& unscheduledLabels,
0050 std::vector<std::string>& shouldBeUsedLabels);
0051
0052 template <typename T, typename U>
0053 void processOneOccurrenceAsync(WaitingTaskHolder,
0054 typename T::TransitionInfoType&,
0055 ServiceToken const&,
0056 StreamID,
0057 typename T::Context const* topContext,
0058 U const* context) noexcept;
0059
0060 template <typename T>
0061 void processAccumulatorsAsync(WaitingTaskHolder,
0062 typename T::TransitionInfoType const&,
0063 ServiceToken const&,
0064 StreamID,
0065 ParentContext const&,
0066 typename T::Context const*);
0067
0068 void setupResolvers(Principal& principal);
0069 void setupOnDemandSystem(EventTransitionInfo const&);
0070
0071 void beginJob(ProductRegistry const& iRegistry,
0072 eventsetup::ESRecordsToProductResolverIndices const&,
0073 ProcessBlockHelperBase const&);
0074 void endJob();
0075 void endJob(ExceptionCollector& collector);
0076
0077 void beginStream(StreamID iID, StreamContext& streamContext);
0078 void endStream(StreamID iID, StreamContext& streamContext);
0079
0080 AllWorkers const& allWorkers() const { return allWorkers_; }
0081 AllWorkers const& unscheduledWorkers() const { return unscheduled_.workers(); }
0082
0083 void addToAllWorkers(Worker* w);
0084
0085 ExceptionToActionTable const& actionTable() const { return *actionTable_; }
0086
0087 Worker* getWorker(ParameterSet& pset,
0088 ProductRegistry& preg,
0089 PreallocationConfiguration const* prealloc,
0090 std::shared_ptr<ProcessConfiguration const> processConfiguration,
0091 std::string const& label);
0092
0093 void resetAll();
0094
0095 private:
0096 WorkerRegistry workerReg_;
0097 ExceptionToActionTable const* actionTable_;
0098 AllWorkers allWorkers_;
0099 UnscheduledCallProducer unscheduled_;
0100 void const* lastSetupEventPrincipal_;
0101 };
0102
0103 template <typename T, typename U>
0104 void WorkerManager::processOneOccurrenceAsync(WaitingTaskHolder task,
0105 typename T::TransitionInfoType& info,
0106 ServiceToken const& token,
0107 StreamID streamID,
0108 typename T::Context const* topContext,
0109 U const* context) noexcept {
0110 static_assert(!T::isEvent_);
0111
0112
0113
0114 for (auto it = allWorkers_.rbegin(), itEnd = allWorkers_.rend(); it != itEnd; ++it) {
0115 Worker* worker = *it;
0116
0117 ParentContext parentContext(context);
0118
0119
0120
0121
0122
0123
0124
0125
0126
0127
0128 worker->doWorkNoPrefetchingAsync<T>(task, info, token, streamID, parentContext, topContext);
0129 }
0130 }
0131
0132 template <typename T>
0133 void WorkerManager::processAccumulatorsAsync(WaitingTaskHolder task,
0134 typename T::TransitionInfoType const& info,
0135 ServiceToken const& token,
0136 StreamID streamID,
0137 ParentContext const& parentContext,
0138 typename T::Context const* context) {
0139 unscheduled_.runAccumulatorsAsync<T>(std::move(task), info, token, streamID, parentContext, context);
0140 }
0141 }
0142
0143 #endif