Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-10-18 04:13:16

0001 #ifndef FWCore_Framework_Worker_h
0002 #define FWCore_Framework_Worker_h
0003 
0004 /*----------------------------------------------------------------------
0005 
0006 Worker: this is a basic scheduling unit - an abstract base class to
0007 something that is really a producer or filter.
0008 
0009 A worker will not actually call through to the module unless it is
0010 in a Ready state.  After a module is actually run, the state will not
0011 be Ready.  The Ready state can only be reestablished by doing a reset().
0012 
0013 Pre/post module signals are posted only in the Ready state.
0014 
0015 Execution statistics are kept here.
0016 
0017 If a module has thrown an exception during execution, that exception
0018 will be rethrown if the worker is entered again and the state is not Ready.
0019 In other words, execution results (status) are cached and reused until
0020 the worker is reset().
0021 
0022 ----------------------------------------------------------------------*/
0023 
0024 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0025 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0026 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0027 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0028 #include "FWCore/Framework/interface/maker/WorkerParams.h"
0029 #include "FWCore/Framework/interface/ExceptionActions.h"
0030 #include "FWCore/Framework/interface/ModuleContextSentry.h"
0031 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0032 #include "FWCore/Framework/interface/ProductResolverIndexAndSkipBit.h"
0033 #include "FWCore/Concurrency/interface/WaitingTask.h"
0034 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0035 #include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
0036 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0037 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0038 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0039 #include "FWCore/ServiceRegistry/interface/ConsumesInfo.h"
0040 #include "FWCore/ServiceRegistry/interface/InternalContext.h"
0041 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0042 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0043 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0044 #include "FWCore/ServiceRegistry/interface/PlaceInPathContext.h"
0045 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0046 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0047 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0048 #include "FWCore/Concurrency/interface/FunctorTask.h"
0049 #include "FWCore/Utilities/interface/Exception.h"
0050 #include "FWCore/Utilities/interface/ConvertException.h"
0051 #include "FWCore/Utilities/interface/BranchType.h"
0052 #include "FWCore/Utilities/interface/ProductResolverIndex.h"
0053 #include "FWCore/Utilities/interface/StreamID.h"
0054 #include "FWCore/Utilities/interface/propagate_const.h"
0055 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0056 #include "FWCore/Utilities/interface/ESIndices.h"
0057 #include "FWCore/Utilities/interface/Transition.h"
0058 
0059 #include "FWCore/Framework/interface/Frameworkfwd.h"
0060 
0061 #include <array>
0062 #include <atomic>
0063 #include <cassert>
0064 #include <map>
0065 #include <memory>
0066 #include <sstream>
0067 #include <string>
0068 #include <vector>
0069 #include <exception>
0070 #include <unordered_map>
0071 
0072 namespace edm {
0073   class EventPrincipal;
0074   class EventSetupImpl;
0075   class EarlyDeleteHelper;
0076   class ModuleProcessName;
0077   class ProductResolverIndexHelper;
0078   class ProductResolverIndexAndSkipBit;
0079   class StreamID;
0080   class StreamContext;
0081   class ProductRegistry;
0082   class ThinnedAssociationsHelper;
0083 
0084   namespace workerhelper {
0085     template <typename O>
0086     class CallImpl;
0087   }
0088   namespace eventsetup {
0089     class ESRecordsToProxyIndices;
0090   }
0091 
0092   class Worker {
0093   public:
0094     enum State { Ready, Pass, Fail, Exception };
0095     enum Types { kAnalyzer, kFilter, kProducer, kOutputModule };
0096     enum ConcurrencyTypes { kGlobal, kLimited, kOne, kStream, kLegacy };
0097     struct TaskQueueAdaptor {
0098       SerialTaskQueueChain* serial_ = nullptr;
0099       LimitedTaskQueue* limited_ = nullptr;
0100 
0101       TaskQueueAdaptor() = default;
0102       TaskQueueAdaptor(SerialTaskQueueChain* iChain) : serial_(iChain) {}
0103       TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
0104 
0105       operator bool() { return serial_ != nullptr or limited_ != nullptr; }
0106 
0107       template <class F>
0108       void push(oneapi::tbb::task_group& iG, F&& iF) {
0109         if (serial_) {
0110           serial_->push(iG, iF);
0111         } else {
0112           limited_->push(iG, iF);
0113         }
0114       }
0115     };
0116 
0117     Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
0118     virtual ~Worker();
0119 
0120     Worker(Worker const&) = delete;             // Disallow copying and moving
0121     Worker& operator=(Worker const&) = delete;  // Disallow copying and moving
0122 
0123     void clearModule() {
0124       moduleValid_ = false;
0125       doClearModule();
0126     }
0127 
0128     virtual bool wantsProcessBlocks() const = 0;
0129     virtual bool wantsInputProcessBlocks() const = 0;
0130     virtual bool wantsGlobalRuns() const = 0;
0131     virtual bool wantsGlobalLuminosityBlocks() const = 0;
0132     virtual bool wantsStreamRuns() const = 0;
0133     virtual bool wantsStreamLuminosityBlocks() const = 0;
0134 
0135     virtual SerialTaskQueue* globalRunsQueue() = 0;
0136     virtual SerialTaskQueue* globalLuminosityBlocksQueue() = 0;
0137 
0138     void prePrefetchSelectionAsync(
0139         oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, EventPrincipal const*);
0140 
0141     void prePrefetchSelectionAsync(
0142         oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, void const*) {
0143       assert(false);
0144     }
0145 
0146     template <typename T>
0147     void doWorkAsync(WaitingTaskHolder,
0148                      typename T::TransitionInfoType const&,
0149                      ServiceToken const&,
0150                      StreamID,
0151                      ParentContext const&,
0152                      typename T::Context const*);
0153 
0154     template <typename T>
0155     void doWorkNoPrefetchingAsync(WaitingTaskHolder,
0156                                   typename T::TransitionInfoType const&,
0157                                   ServiceToken const&,
0158                                   StreamID,
0159                                   ParentContext const&,
0160                                   typename T::Context const*);
0161 
0162     template <typename T>
0163     std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
0164                                          StreamID,
0165                                          ParentContext const&,
0166                                          typename T::Context const*);
0167 
0168     virtual size_t transformIndex(edm::BranchDescription const&) const = 0;
0169     void doTransformAsync(WaitingTaskHolder,
0170                           size_t iTransformIndex,
0171                           EventPrincipal const&,
0172                           ServiceToken const&,
0173                           StreamID,
0174                           ModuleCallingContext const&,
0175                           StreamContext const*);
0176 
0177     void callWhenDoneAsync(WaitingTaskHolder task) { waitingTasks_.add(std::move(task)); }
0178     void skipOnPath(EventPrincipal const& iEvent);
0179     void beginJob();
0180     void endJob();
0181     void beginStream(StreamID id, StreamContext& streamContext);
0182     void endStream(StreamID id, StreamContext& streamContext);
0183     void respondToOpenInputFile(FileBlock const& fb) { implRespondToOpenInputFile(fb); }
0184     void respondToCloseInputFile(FileBlock const& fb) { implRespondToCloseInputFile(fb); }
0185     void respondToCloseOutputFile() { implRespondToCloseOutputFile(); }
0186     void registerThinnedAssociations(ProductRegistry const& registry, ThinnedAssociationsHelper& helper);
0187 
0188     void reset() {
0189       cached_exception_ = std::exception_ptr();
0190       state_ = Ready;
0191       waitingTasks_.reset();
0192       workStarted_ = false;
0193       numberOfPathsLeftToRun_ = numberOfPathsOn_;
0194     }
0195 
0196     void postDoEvent(EventPrincipal const&);
0197 
0198     ModuleDescription const* description() const {
0199       if (moduleValid_) {
0200         return moduleCallingContext_.moduleDescription();
0201       }
0202       return nullptr;
0203     }
0204     ///The signals are required to live longer than the last call to 'doWork'
0205     /// this was done to improve performance based on profiling
0206     void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
0207 
0208     void setEarlyDeleteHelper(EarlyDeleteHelper* iHelper);
0209 
0210     //Used to make EDGetToken work
0211     virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const&) = 0;
0212     virtual void updateLookup(eventsetup::ESRecordsToProxyIndices const&) = 0;
0213     virtual void selectInputProcessBlocks(ProductRegistry const&, ProcessBlockHelperBase const&) = 0;
0214     virtual void resolvePutIndicies(
0215         BranchType iBranchType,
0216         std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const&
0217             iIndicies) = 0;
0218 
0219     virtual void modulesWhoseProductsAreConsumed(
0220         std::array<std::vector<ModuleDescription const*>*, NumBranchTypes>& modules,
0221         std::vector<ModuleProcessName>& modulesInPreviousProcesses,
0222         ProductRegistry const& preg,
0223         std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
0224 
0225     virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
0226 
0227     virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
0228 
0229     virtual Types moduleType() const = 0;
0230     virtual ConcurrencyTypes moduleConcurrencyType() const = 0;
0231 
0232     void clearCounters() {
0233       timesRun_.store(0, std::memory_order_release);
0234       timesVisited_.store(0, std::memory_order_release);
0235       timesPassed_.store(0, std::memory_order_release);
0236       timesFailed_.store(0, std::memory_order_release);
0237       timesExcept_.store(0, std::memory_order_release);
0238     }
0239 
0240     void addedToPath() { ++numberOfPathsOn_; }
0241     //NOTE: calling state() is done to force synchronization across threads
0242     int timesRun() const { return timesRun_.load(std::memory_order_acquire); }
0243     int timesVisited() const { return timesVisited_.load(std::memory_order_acquire); }
0244     int timesPassed() const { return timesPassed_.load(std::memory_order_acquire); }
0245     int timesFailed() const { return timesFailed_.load(std::memory_order_acquire); }
0246     int timesExcept() const { return timesExcept_.load(std::memory_order_acquire); }
0247     State state() const { return state_; }
0248 
0249     int timesPass() const { return timesPassed(); }  // for backward compatibility only - to be removed soon
0250 
0251     virtual bool hasAccumulator() const = 0;
0252 
0253     // Used in PuttableProductResolver
0254     edm::WaitingTaskList& waitingTaskList() { return waitingTasks_; }
0255 
0256   protected:
0257     template <typename O>
0258     friend class workerhelper::CallImpl;
0259 
0260     virtual void doClearModule() = 0;
0261 
0262     virtual std::string workerType() const = 0;
0263     virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;
0264 
0265     virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0266     virtual bool implNeedToRunSelection() const = 0;
0267 
0268     virtual void implDoAcquire(EventTransitionInfo const&,
0269                                ModuleCallingContext const*,
0270                                WaitingTaskWithArenaHolder&) = 0;
0271 
0272     virtual void implDoTransformAsync(WaitingTaskHolder,
0273                                       size_t iTransformIndex,
0274                                       EventPrincipal const&,
0275                                       ParentContext const&,
0276                                       ServiceWeakToken const&) = 0;
0277     virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const = 0;
0278 
0279     virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const&, ModuleCallingContext const*) = 0;
0280     virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0281     virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0282     virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0283     virtual bool implDoBegin(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0284     virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0285     virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0286     virtual bool implDoEnd(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0287     virtual bool implDoBegin(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0288     virtual bool implDoStreamBegin(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0289     virtual bool implDoStreamEnd(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0290     virtual bool implDoEnd(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0291     virtual void implBeginJob() = 0;
0292     virtual void implEndJob() = 0;
0293     virtual void implBeginStream(StreamID) = 0;
0294     virtual void implEndStream(StreamID) = 0;
0295 
0296     void resetModuleDescription(ModuleDescription const*);
0297 
0298     ActivityRegistry* activityRegistry() { return actReg_.get(); }
0299 
0300   private:
0301     template <typename T>
0302     bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
0303 
0304     virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0305     virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0306 
0307     virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
0308 
0309     virtual std::vector<ESProxyIndex> const& esItemsToGetFrom(Transition) const = 0;
0310     virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
0311 
0312     virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask,
0313                                               ModuleCallingContext const& moduleCallingContext,
0314                                               Principal const& iPrincipal) const = 0;
0315 
0316     virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
0317     virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
0318     virtual void implRespondToCloseOutputFile() = 0;
0319 
0320     virtual void implRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) = 0;
0321 
0322     virtual TaskQueueAdaptor serializeRunModule() = 0;
0323 
0324     bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const& parentContext, bool isEvent) const;
0325 
0326     template <bool IS_EVENT>
0327     bool setPassed() {
0328       if (IS_EVENT) {
0329         timesPassed_.fetch_add(1, std::memory_order_relaxed);
0330       }
0331       state_ = Pass;
0332       return true;
0333     }
0334 
0335     template <bool IS_EVENT>
0336     bool setFailed() {
0337       if (IS_EVENT) {
0338         timesFailed_.fetch_add(1, std::memory_order_relaxed);
0339       }
0340       state_ = Fail;
0341       return false;
0342     }
0343 
0344     template <bool IS_EVENT>
0345     std::exception_ptr setException(std::exception_ptr iException) {
0346       if (IS_EVENT) {
0347         timesExcept_.fetch_add(1, std::memory_order_relaxed);
0348       }
0349       cached_exception_ = iException;  // propagate_const<T> has no reset() function
0350       state_ = Exception;
0351       return cached_exception_;
0352     }
0353 
0354     template <typename T>
0355     void prefetchAsync(WaitingTaskHolder,
0356                        ServiceToken const&,
0357                        ParentContext const&,
0358                        typename T::TransitionInfoType const&,
0359                        Transition);
0360 
0361     void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const&, Transition, ServiceToken const&);
0362     void edPrefetchAsync(WaitingTaskHolder, ServiceToken const&, Principal const&) const;
0363 
0364     bool needsESPrefetching(Transition iTrans) const noexcept {
0365       return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
0366     }
0367 
0368     void emitPostModuleEventPrefetchingSignal() {
0369       actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0370     }
0371 
0372     void emitPostModuleStreamPrefetchingSignal() {
0373       actReg_->postModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),
0374                                                        moduleCallingContext_);
0375     }
0376 
0377     void emitPostModuleGlobalPrefetchingSignal() {
0378       actReg_->postModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(),
0379                                                        moduleCallingContext_);
0380     }
0381 
0382     virtual bool hasAcquire() const = 0;
0383 
0384     template <typename T>
0385     std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr,
0386                                                    typename T::TransitionInfoType const&,
0387                                                    StreamID,
0388                                                    ParentContext const&,
0389                                                    typename T::Context const*);
0390 
0391     void runAcquire(EventTransitionInfo const&, ParentContext const&, WaitingTaskWithArenaHolder&);
0392 
0393     void runAcquireAfterAsyncPrefetch(std::exception_ptr,
0394                                       EventTransitionInfo const&,
0395                                       ParentContext const&,
0396                                       WaitingTaskWithArenaHolder);
0397 
0398     std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const& parentContext);
0399 
0400     template <typename T>
0401     class RunModuleTask : public WaitingTask {
0402     public:
0403       RunModuleTask(Worker* worker,
0404                     typename T::TransitionInfoType const& transitionInfo,
0405                     ServiceToken const& token,
0406                     StreamID streamID,
0407                     ParentContext const& parentContext,
0408                     typename T::Context const* context,
0409                     oneapi::tbb::task_group* iGroup)
0410           : m_worker(worker),
0411             m_transitionInfo(transitionInfo),
0412             m_streamID(streamID),
0413             m_parentContext(parentContext),
0414             m_context(context),
0415             m_serviceToken(token),
0416             m_group(iGroup) {}
0417 
0418       struct EnableQueueGuard {
0419         SerialTaskQueue* queue_;
0420         EnableQueueGuard(SerialTaskQueue* iQueue) : queue_{iQueue} {}
0421         EnableQueueGuard(EnableQueueGuard const&) = delete;
0422         EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
0423         EnableQueueGuard& operator=(EnableQueueGuard&&) = delete;
0424         EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
0425         ~EnableQueueGuard() {
0426           if (queue_) {
0427             queue_->resume();
0428           }
0429         }
0430       };
0431 
0432       void execute() final {
0433         //Need to make the services available early so other services can see them
0434         ServiceRegistry::Operate guard(m_serviceToken.lock());
0435 
0436         //incase the emit causes an exception, we need a memory location
0437         // to hold the exception_ptr
0438         std::exception_ptr temp_excptr;
0439         auto excptr = exceptionPtr();
0440         if constexpr (T::isEvent_) {
0441           if (!m_worker->hasAcquire()) {
0442             // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskList
0443             CMS_SA_ALLOW try {
0444               //pre was called in prefetchAsync
0445               m_worker->emitPostModuleEventPrefetchingSignal();
0446             } catch (...) {
0447               temp_excptr = std::current_exception();
0448               if (not excptr) {
0449                 excptr = temp_excptr;
0450               }
0451             }
0452           }
0453         } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
0454           m_worker->emitPostModuleStreamPrefetchingSignal();
0455         } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
0456           m_worker->emitPostModuleGlobalPrefetchingSignal();
0457         }
0458 
0459         if (not excptr) {
0460           if (auto queue = m_worker->serializeRunModule()) {
0461             auto f = [worker = m_worker,
0462                       info = m_transitionInfo,
0463                       streamID = m_streamID,
0464                       parentContext = m_parentContext,
0465                       sContext = m_context,
0466                       serviceToken = m_serviceToken]() {
0467               //Need to make the services available
0468               ServiceRegistry::Operate operateRunModule(serviceToken.lock());
0469 
0470               //If needed, we pause the queue in begin transition and resume it
0471               // at the end transition. This can guarantee that the module
0472               // only processes one run or lumi at a time
0473               EnableQueueGuard enableQueueGuard{workerhelper::CallImpl<T>::enableGlobalQueue(worker)};
0474               std::exception_ptr ptr;
0475               worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
0476             };
0477             //keep another global transition from running if necessary
0478             auto gQueue = workerhelper::CallImpl<T>::pauseGlobalQueue(m_worker);
0479             if (gQueue) {
0480               gQueue->push(*m_group, [queue, gQueue, f, group = m_group]() mutable {
0481                 gQueue->pause();
0482                 queue.push(*group, std::move(f));
0483               });
0484             } else {
0485               queue.push(*m_group, std::move(f));
0486             }
0487             return;
0488           }
0489         }
0490 
0491         m_worker->runModuleAfterAsyncPrefetch<T>(excptr, m_transitionInfo, m_streamID, m_parentContext, m_context);
0492       }
0493 
0494     private:
0495       Worker* m_worker;
0496       typename T::TransitionInfoType m_transitionInfo;
0497       StreamID m_streamID;
0498       ParentContext const m_parentContext;
0499       typename T::Context const* m_context;
0500       ServiceWeakToken m_serviceToken;
0501       oneapi::tbb::task_group* m_group;
0502     };
0503 
0504     // AcquireTask is only used for the Event case, but we define
0505     // it as a template so all cases will compile.
0506     // DUMMY exists to work around the C++ Standard prohibition on
0507     // fully specializing templates nested in other classes.
0508     template <typename T, typename DUMMY = void>
0509     class AcquireTask : public WaitingTask {
0510     public:
0511       AcquireTask(Worker*,
0512                   typename T::TransitionInfoType const&,
0513                   ServiceToken const&,
0514                   ParentContext const&,
0515                   WaitingTaskWithArenaHolder) {}
0516       void execute() final {}
0517     };
0518 
0519     template <typename DUMMY>
0520     class AcquireTask<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>, DUMMY> : public WaitingTask {
0521     public:
0522       AcquireTask(Worker* worker,
0523                   EventTransitionInfo const& eventTransitionInfo,
0524                   ServiceToken const& token,
0525                   ParentContext const& parentContext,
0526                   WaitingTaskWithArenaHolder holder)
0527           : m_worker(worker),
0528             m_eventTransitionInfo(eventTransitionInfo),
0529             m_parentContext(parentContext),
0530             m_holder(std::move(holder)),
0531             m_serviceToken(token) {}
0532 
0533       void execute() final {
0534         //Need to make the services available early so other services can see them
0535         ServiceRegistry::Operate guard(m_serviceToken.lock());
0536 
0537         //incase the emit causes an exception, we need a memory location
0538         // to hold the exception_ptr
0539         std::exception_ptr temp_excptr;
0540         auto excptr = exceptionPtr();
0541         // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskWithArenaHolder
0542         CMS_SA_ALLOW try {
0543           //pre was called in prefetchAsync
0544           m_worker->emitPostModuleEventPrefetchingSignal();
0545         } catch (...) {
0546           temp_excptr = std::current_exception();
0547           if (not excptr) {
0548             excptr = temp_excptr;
0549           }
0550         }
0551 
0552         if (not excptr) {
0553           if (auto queue = m_worker->serializeRunModule()) {
0554             queue.push(*m_holder.group(),
0555                        [worker = m_worker,
0556                         info = m_eventTransitionInfo,
0557                         parentContext = m_parentContext,
0558                         serviceToken = m_serviceToken,
0559                         holder = m_holder]() {
0560                          //Need to make the services available
0561                          ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
0562 
0563                          std::exception_ptr ptr;
0564                          worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
0565                        });
0566             return;
0567           }
0568         }
0569 
0570         m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
0571       }
0572 
0573     private:
0574       Worker* m_worker;
0575       EventTransitionInfo m_eventTransitionInfo;
0576       ParentContext const m_parentContext;
0577       WaitingTaskWithArenaHolder m_holder;
0578       ServiceWeakToken m_serviceToken;
0579     };
0580 
0581     // This class does nothing unless there is an exception originating
0582     // in an "External Worker". In that case, it handles converting the
0583     // exception to a CMS exception and adding context to the exception.
0584     class HandleExternalWorkExceptionTask : public WaitingTask {
0585     public:
0586       HandleExternalWorkExceptionTask(Worker* worker,
0587                                       oneapi::tbb::task_group* group,
0588                                       WaitingTask* runModuleTask,
0589                                       ParentContext const& parentContext);
0590 
0591       void execute() final;
0592 
0593     private:
0594       Worker* m_worker;
0595       WaitingTask* m_runModuleTask;
0596       oneapi::tbb::task_group* m_group;
0597       ParentContext const m_parentContext;
0598     };
0599 
0600     std::atomic<int> timesRun_;
0601     std::atomic<int> timesVisited_;
0602     std::atomic<int> timesPassed_;
0603     std::atomic<int> timesFailed_;
0604     std::atomic<int> timesExcept_;
0605     std::atomic<State> state_;
0606     int numberOfPathsOn_;
0607     std::atomic<int> numberOfPathsLeftToRun_;
0608 
0609     ModuleCallingContext moduleCallingContext_;
0610 
0611     ExceptionToActionTable const* actions_;                         // memory assumed to be managed elsewhere
0612     CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_;  // if state is 'exception'
0613 
0614     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0615 
0616     edm::propagate_const<EarlyDeleteHelper*> earlyDeleteHelper_;
0617 
0618     edm::WaitingTaskList waitingTasks_;
0619     std::atomic<bool> workStarted_;
0620     bool ranAcquireWithoutException_;
0621     bool moduleValid_ = true;
0622   };
0623 
0624   namespace {
0625     template <typename T>
0626     class ModuleSignalSentry {
0627     public:
0628       ModuleSignalSentry(ActivityRegistry* a,
0629                          typename T::Context const* context,
0630                          ModuleCallingContext const* moduleCallingContext)
0631           : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
0632         if (a_)
0633           T::preModuleSignal(a_, context, moduleCallingContext_);
0634       }
0635 
0636       ~ModuleSignalSentry() {
0637         if (a_)
0638           T::postModuleSignal(a_, context_, moduleCallingContext_);
0639       }
0640 
0641     private:
0642       ActivityRegistry* a_;  // We do not use propagate_const because the registry itself is mutable.
0643       typename T::Context const* context_;
0644       ModuleCallingContext const* moduleCallingContext_;
0645     };
0646 
0647   }  // namespace
0648 
0649   namespace workerhelper {
0650     template <>
0651     class CallImpl<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>> {
0652     public:
0653       typedef OccurrenceTraits<EventPrincipal, BranchActionStreamBegin> Arg;
0654       static bool call(Worker* iWorker,
0655                        StreamID,
0656                        EventTransitionInfo const& info,
0657                        ActivityRegistry* /* actReg */,
0658                        ModuleCallingContext const* mcc,
0659                        Arg::Context const* /* context*/) {
0660         //Signal sentry is handled by the module
0661         return iWorker->implDo(info, mcc);
0662       }
0663       static void esPrefetchAsync(Worker* worker,
0664                                   WaitingTaskHolder waitingTask,
0665                                   ServiceToken const& token,
0666                                   EventTransitionInfo const& info,
0667                                   Transition transition) {
0668         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0669       }
0670       static bool wantsTransition(Worker const* iWorker) { return true; }
0671       static bool needToRunSelection(Worker const* iWorker) { return iWorker->implNeedToRunSelection(); }
0672 
0673       static SerialTaskQueue* pauseGlobalQueue(Worker*) { return nullptr; }
0674       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0675     };
0676 
0677     template <>
0678     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>> {
0679     public:
0680       typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Arg;
0681       static bool call(Worker* iWorker,
0682                        StreamID,
0683                        RunTransitionInfo const& info,
0684                        ActivityRegistry* actReg,
0685                        ModuleCallingContext const* mcc,
0686                        Arg::Context const* context) {
0687         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0688         return iWorker->implDoBegin(info, mcc);
0689       }
0690       static void esPrefetchAsync(Worker* worker,
0691                                   WaitingTaskHolder waitingTask,
0692                                   ServiceToken const& token,
0693                                   RunTransitionInfo const& info,
0694                                   Transition transition) {
0695         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0696       }
0697       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
0698       static bool needToRunSelection(Worker const* iWorker) { return false; }
0699       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
0700       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0701     };
0702     template <>
0703     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>> {
0704     public:
0705       typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Arg;
0706       static bool call(Worker* iWorker,
0707                        StreamID id,
0708                        RunTransitionInfo const& info,
0709                        ActivityRegistry* actReg,
0710                        ModuleCallingContext const* mcc,
0711                        Arg::Context const* context) {
0712         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0713         return iWorker->implDoStreamBegin(id, info, mcc);
0714       }
0715       static void esPrefetchAsync(Worker* worker,
0716                                   WaitingTaskHolder waitingTask,
0717                                   ServiceToken const& token,
0718                                   RunTransitionInfo const& info,
0719                                   Transition transition) {
0720         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0721       }
0722       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
0723       static bool needToRunSelection(Worker const* iWorker) { return false; }
0724       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0725       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0726     };
0727     template <>
0728     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>> {
0729     public:
0730       typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Arg;
0731       static bool call(Worker* iWorker,
0732                        StreamID,
0733                        RunTransitionInfo const& info,
0734                        ActivityRegistry* actReg,
0735                        ModuleCallingContext const* mcc,
0736                        Arg::Context const* context) {
0737         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0738         return iWorker->implDoEnd(info, mcc);
0739       }
0740       static void esPrefetchAsync(Worker* worker,
0741                                   WaitingTaskHolder waitingTask,
0742                                   ServiceToken const& token,
0743                                   RunTransitionInfo const& info,
0744                                   Transition transition) {
0745         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0746       }
0747       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
0748       static bool needToRunSelection(Worker const* iWorker) { return false; }
0749       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0750       static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
0751     };
0752     template <>
0753     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>> {
0754     public:
0755       typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Arg;
0756       static bool call(Worker* iWorker,
0757                        StreamID id,
0758                        RunTransitionInfo const& info,
0759                        ActivityRegistry* actReg,
0760                        ModuleCallingContext const* mcc,
0761                        Arg::Context const* context) {
0762         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0763         return iWorker->implDoStreamEnd(id, info, mcc);
0764       }
0765       static void esPrefetchAsync(Worker* worker,
0766                                   WaitingTaskHolder waitingTask,
0767                                   ServiceToken const& token,
0768                                   RunTransitionInfo const& info,
0769                                   Transition transition) {
0770         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0771       }
0772       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
0773       static bool needToRunSelection(Worker const* iWorker) { return false; }
0774       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0775       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0776     };
0777 
0778     template <>
0779     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>> {
0780     public:
0781       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0782       static bool call(Worker* iWorker,
0783                        StreamID,
0784                        LumiTransitionInfo const& info,
0785                        ActivityRegistry* actReg,
0786                        ModuleCallingContext const* mcc,
0787                        Arg::Context const* context) {
0788         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0789         return iWorker->implDoBegin(info, mcc);
0790       }
0791       static void esPrefetchAsync(Worker* worker,
0792                                   WaitingTaskHolder waitingTask,
0793                                   ServiceToken const& token,
0794                                   LumiTransitionInfo const& info,
0795                                   Transition transition) {
0796         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0797       }
0798       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
0799       static bool needToRunSelection(Worker const* iWorker) { return false; }
0800       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
0801       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0802     };
0803     template <>
0804     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>> {
0805     public:
0806       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0807       static bool call(Worker* iWorker,
0808                        StreamID id,
0809                        LumiTransitionInfo const& info,
0810                        ActivityRegistry* actReg,
0811                        ModuleCallingContext const* mcc,
0812                        Arg::Context const* context) {
0813         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0814         return iWorker->implDoStreamBegin(id, info, mcc);
0815       }
0816       static void esPrefetchAsync(Worker* worker,
0817                                   WaitingTaskHolder waitingTask,
0818                                   ServiceToken const& token,
0819                                   LumiTransitionInfo const& info,
0820                                   Transition transition) {
0821         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0822       }
0823       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
0824       static bool needToRunSelection(Worker const* iWorker) { return false; }
0825       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0826       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0827     };
0828 
0829     template <>
0830     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>> {
0831     public:
0832       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0833       static bool call(Worker* iWorker,
0834                        StreamID,
0835                        LumiTransitionInfo const& info,
0836                        ActivityRegistry* actReg,
0837                        ModuleCallingContext const* mcc,
0838                        Arg::Context const* context) {
0839         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0840         return iWorker->implDoEnd(info, mcc);
0841       }
0842       static void esPrefetchAsync(Worker* worker,
0843                                   WaitingTaskHolder waitingTask,
0844                                   ServiceToken const& token,
0845                                   LumiTransitionInfo const& info,
0846                                   Transition transition) {
0847         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0848       }
0849       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
0850       static bool needToRunSelection(Worker const* iWorker) { return false; }
0851       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0852       static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
0853     };
0854     template <>
0855     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>> {
0856     public:
0857       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0858       static bool call(Worker* iWorker,
0859                        StreamID id,
0860                        LumiTransitionInfo const& info,
0861                        ActivityRegistry* actReg,
0862                        ModuleCallingContext const* mcc,
0863                        Arg::Context const* context) {
0864         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0865         return iWorker->implDoStreamEnd(id, info, mcc);
0866       }
0867       static void esPrefetchAsync(Worker* worker,
0868                                   WaitingTaskHolder waitingTask,
0869                                   ServiceToken const& token,
0870                                   LumiTransitionInfo const& info,
0871                                   Transition transition) {
0872         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0873       }
0874       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
0875       static bool needToRunSelection(Worker const* iWorker) { return false; }
0876       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0877       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0878     };
0879     template <>
0880     class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>> {
0881     public:
0882       using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0883       static bool call(Worker* iWorker,
0884                        StreamID,
0885                        ProcessBlockTransitionInfo const& info,
0886                        ActivityRegistry* actReg,
0887                        ModuleCallingContext const* mcc,
0888                        Arg::Context const* context) {
0889         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0890         return iWorker->implDoBeginProcessBlock(info.principal(), mcc);
0891       }
0892       static void esPrefetchAsync(
0893           Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) {}
0894       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
0895       static bool needToRunSelection(Worker const* iWorker) { return false; }
0896       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0897       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0898     };
0899     template <>
0900     class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>> {
0901     public:
0902       using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
0903       static bool call(Worker* iWorker,
0904                        StreamID,
0905                        ProcessBlockTransitionInfo const& info,
0906                        ActivityRegistry* actReg,
0907                        ModuleCallingContext const* mcc,
0908                        Arg::Context const* context) {
0909         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0910         return iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
0911       }
0912       static void esPrefetchAsync(
0913           Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) {}
0914       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsInputProcessBlocks(); }
0915       static bool needToRunSelection(Worker const* iWorker) { return false; }
0916       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0917       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0918     };
0919     template <>
0920     class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>> {
0921     public:
0922       using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
0923       static bool call(Worker* iWorker,
0924                        StreamID,
0925                        ProcessBlockTransitionInfo const& info,
0926                        ActivityRegistry* actReg,
0927                        ModuleCallingContext const* mcc,
0928                        Arg::Context const* context) {
0929         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0930         return iWorker->implDoEndProcessBlock(info.principal(), mcc);
0931       }
0932       static void esPrefetchAsync(
0933           Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) {}
0934       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
0935       static bool needToRunSelection(Worker const* iWorker) { return false; }
0936       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0937       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0938     };
0939   }  // namespace workerhelper
0940 
0941   template <typename T>
0942   void Worker::prefetchAsync(WaitingTaskHolder iTask,
0943                              ServiceToken const& token,
0944                              ParentContext const& parentContext,
0945                              typename T::TransitionInfoType const& transitionInfo,
0946                              Transition iTransition) {
0947     Principal const& principal = transitionInfo.principal();
0948 
0949     moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
0950 
0951     if constexpr (T::isEvent_) {
0952       actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0953     } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
0954       actReg_->preModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0955     } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
0956       actReg_->preModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(), moduleCallingContext_);
0957     }
0958 
0959     workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
0960     edPrefetchAsync(iTask, token, principal);
0961 
0962     if (principal.branchType() == InEvent) {
0963       preActionBeforeRunEventAsync(iTask, moduleCallingContext_, principal);
0964     }
0965   }
0966 
0967   template <typename T>
0968   void Worker::doWorkAsync(WaitingTaskHolder task,
0969                            typename T::TransitionInfoType const& transitionInfo,
0970                            ServiceToken const& token,
0971                            StreamID streamID,
0972                            ParentContext const& parentContext,
0973                            typename T::Context const* context) {
0974     if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
0975       return;
0976     }
0977 
0978     //Need to check workStarted_ before adding to waitingTasks_
0979     bool expected = false;
0980     bool workStarted = workStarted_.compare_exchange_strong(expected, true);
0981 
0982     waitingTasks_.add(task);
0983     if constexpr (T::isEvent_) {
0984       timesVisited_.fetch_add(1, std::memory_order_relaxed);
0985     }
0986 
0987     if (workStarted) {
0988       moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
0989 
0990       //if have TriggerResults based selection we want to reject the event before doing prefetching
0991       if (workerhelper::CallImpl<T>::needToRunSelection(this)) {
0992         //We need to run the selection in a different task so that
0993         // we can prefetch the data needed for the selection
0994         WaitingTask* moduleTask =
0995             new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
0996 
0997         //make sure the task is either run or destroyed
0998         struct DestroyTask {
0999           DestroyTask(edm::WaitingTask* iTask) : m_task(iTask) {}
1000 
1001           ~DestroyTask() {
1002             auto p = m_task.exchange(nullptr);
1003             if (p) {
1004               TaskSentry s{p};
1005             }
1006           }
1007 
1008           edm::WaitingTask* release() { return m_task.exchange(nullptr); }
1009 
1010         private:
1011           std::atomic<edm::WaitingTask*> m_task;
1012         };
1013         if constexpr (T::isEvent_) {
1014           if (hasAcquire()) {
1015             auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1016             ServiceWeakToken weakToken = token;
1017             auto* group = task.group();
1018             moduleTask = make_waiting_task(
1019                 [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
1020                   WaitingTaskWithArenaHolder runTaskHolder(
1021                       *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
1022                   AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1023                   t.execute();
1024                 });
1025           }
1026         }
1027         auto* group = task.group();
1028         auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1029         ServiceWeakToken weakToken = token;
1030         auto selectionTask =
1031             make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
1032                                   std::exception_ptr const*) mutable {
1033               ServiceRegistry::Operate guard(weakToken.lock());
1034               prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
1035                                weakToken.lock(),
1036                                parentContext,
1037                                info,
1038                                T::transition_);
1039             });
1040         prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1041       } else {
1042         WaitingTask* moduleTask =
1043             new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1044         auto group = task.group();
1045         if constexpr (T::isEvent_) {
1046           if (hasAcquire()) {
1047             WaitingTaskWithArenaHolder runTaskHolder(
1048                 *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1049             moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1050           }
1051         }
1052         prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1053       }
1054     }
1055   }
1056 
1057   template <typename T>
1058   std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr iEPtr,
1059                                                          typename T::TransitionInfoType const& transitionInfo,
1060                                                          StreamID streamID,
1061                                                          ParentContext const& parentContext,
1062                                                          typename T::Context const* context) {
1063     std::exception_ptr exceptionPtr;
1064     if (iEPtr) {
1065       if (shouldRethrowException(iEPtr, parentContext, T::isEvent_)) {
1066         exceptionPtr = iEPtr;
1067         setException<T::isEvent_>(exceptionPtr);
1068       } else {
1069         setPassed<T::isEvent_>();
1070       }
1071       moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
1072     } else {
1073       // Caught exception is propagated via WaitingTaskList
1074       CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1075         exceptionPtr = std::current_exception();
1076       }
1077     }
1078     waitingTasks_.doneWaiting(exceptionPtr);
1079     return exceptionPtr;
1080   }
1081 
1082   template <typename T>
1083   void Worker::doWorkNoPrefetchingAsync(WaitingTaskHolder task,
1084                                         typename T::TransitionInfoType const& transitionInfo,
1085                                         ServiceToken const& serviceToken,
1086                                         StreamID streamID,
1087                                         ParentContext const& parentContext,
1088                                         typename T::Context const* context) {
1089     if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1090       return;
1091     }
1092 
1093     //Need to check workStarted_ before adding to waitingTasks_
1094     bool expected = false;
1095     auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1096 
1097     waitingTasks_.add(task);
1098     if (workStarted) {
1099       ServiceWeakToken weakToken = serviceToken;
1100       auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1101         std::exception_ptr exceptionPtr;
1102         // Caught exception is propagated via WaitingTaskList
1103         CMS_SA_ALLOW try {
1104           //Need to make the services available
1105           ServiceRegistry::Operate guard(weakToken.lock());
1106 
1107           this->runModule<T>(info, streamID, parentContext, context);
1108         } catch (...) {
1109           exceptionPtr = std::current_exception();
1110         }
1111         this->waitingTasks_.doneWaiting(exceptionPtr);
1112       };
1113 
1114       if (needsESPrefetching(T::transition_)) {
1115         auto group = task.group();
1116         auto afterPrefetch =
1117             edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1118               if (iExcept) {
1119                 this->waitingTasks_.doneWaiting(*iExcept);
1120               } else {
1121                 if (auto queue = this->serializeRunModule()) {
1122                   queue.push(*group, toDo);
1123                 } else {
1124                   group->run(toDo);
1125                 }
1126               }
1127             });
1128         esPrefetchAsync(
1129             WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1130       } else {
1131         auto group = task.group();
1132         if (auto queue = this->serializeRunModule()) {
1133           queue.push(*group, toDo);
1134         } else {
1135           group->run(toDo);
1136         }
1137       }
1138     }
1139   }
1140 
1141   template <typename T>
1142   bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1143                          StreamID streamID,
1144                          ParentContext const& parentContext,
1145                          typename T::Context const* context) {
1146     //unscheduled producers should advance this
1147     //if (T::isEvent_) {
1148     //  ++timesVisited_;
1149     //}
1150     ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1151     if constexpr (T::isEvent_) {
1152       timesRun_.fetch_add(1, std::memory_order_relaxed);
1153     }
1154 
1155     bool rc = true;
1156     try {
1157       convertException::wrap([&]() {
1158         rc = workerhelper::CallImpl<T>::call(
1159             this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1160 
1161         if (rc) {
1162           setPassed<T::isEvent_>();
1163         } else {
1164           setFailed<T::isEvent_>();
1165         }
1166       });
1167     } catch (cms::Exception& ex) {
1168       edm::exceptionContext(ex, moduleCallingContext_);
1169       if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_)) {
1170         assert(not cached_exception_);
1171         setException<T::isEvent_>(std::current_exception());
1172         std::rethrow_exception(cached_exception_);
1173       } else {
1174         rc = setPassed<T::isEvent_>();
1175       }
1176     }
1177 
1178     return rc;
1179   }
1180 
1181   template <typename T>
1182   std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1183                                                StreamID streamID,
1184                                                ParentContext const& parentContext,
1185                                                typename T::Context const* context) {
1186     timesVisited_.fetch_add(1, std::memory_order_relaxed);
1187     std::exception_ptr prefetchingException;  // null because there was no prefetching to do
1188     return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1189   }
1190 }  // namespace edm
1191 #endif