Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-06-03 00:58:58

0001 #ifndef FWCore_Framework_Worker_h
0002 #define FWCore_Framework_Worker_h
0003 
0004 /*----------------------------------------------------------------------
0005 
0006 Worker: this is a basic scheduling unit - an abstract base class to
0007 something that is really a producer or filter.
0008 
0009 A worker will not actually call through to the module unless it is
0010 in a Ready state.  After a module is actually run, the state will not
0011 be Ready.  The Ready state can only be reestablished by doing a reset().
0012 
0013 Pre/post module signals are posted only in the Ready state.
0014 
0015 Execution statistics are kept here.
0016 
0017 If a module has thrown an exception during execution, that exception
0018 will be rethrown if the worker is entered again and the state is not Ready.
0019 In other words, execution results (status) are cached and reused until
0020 the worker is reset().
0021 
0022 ----------------------------------------------------------------------*/
0023 
0024 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0025 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0026 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0027 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0028 #include "FWCore/Framework/interface/maker/WorkerParams.h"
0029 #include "FWCore/Framework/interface/ExceptionActions.h"
0030 #include "FWCore/Framework/interface/ModuleContextSentry.h"
0031 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0032 #include "FWCore/Framework/interface/ProductResolverIndexAndSkipBit.h"
0033 #include "FWCore/Concurrency/interface/WaitingTask.h"
0034 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0035 #include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
0036 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0037 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0038 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0039 #include "FWCore/ServiceRegistry/interface/ConsumesInfo.h"
0040 #include "FWCore/ServiceRegistry/interface/InternalContext.h"
0041 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0042 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0043 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0044 #include "FWCore/ServiceRegistry/interface/PlaceInPathContext.h"
0045 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0046 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0047 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0048 #include "FWCore/Concurrency/interface/FunctorTask.h"
0049 #include "FWCore/Utilities/interface/Exception.h"
0050 #include "FWCore/Utilities/interface/ConvertException.h"
0051 #include "FWCore/Utilities/interface/BranchType.h"
0052 #include "FWCore/Utilities/interface/ProductResolverIndex.h"
0053 #include "FWCore/Utilities/interface/StreamID.h"
0054 #include "FWCore/Utilities/interface/propagate_const.h"
0055 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0056 #include "FWCore/Utilities/interface/ESIndices.h"
0057 #include "FWCore/Utilities/interface/Transition.h"
0058 
0059 #include "FWCore/Framework/interface/Frameworkfwd.h"
0060 
#include <array>
#include <atomic>
#include <cassert>
#include <exception>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
0071 
0072 namespace edm {
0073   class EventPrincipal;
0074   class EventSetupImpl;
0075   class EarlyDeleteHelper;
0076   class ModuleProcessName;
0077   class ProductResolverIndexHelper;
0078   class ProductResolverIndexAndSkipBit;
0079   class StreamID;
0080   class StreamContext;
0081   class ProductRegistry;
0082   class ThinnedAssociationsHelper;
0083 
0084   namespace workerhelper {
0085     template <typename O>
0086     class CallImpl;
0087   }
0088   namespace eventsetup {
0089     class ESRecordsToProxyIndices;
0090   }
0091 
0092   class Worker {
0093   public:
0094     enum State { Ready, Pass, Fail, Exception };
0095     enum Types { kAnalyzer, kFilter, kProducer, kOutputModule };
0096     enum ConcurrencyTypes { kGlobal, kLimited, kOne, kStream, kLegacy };
0097     struct TaskQueueAdaptor {
0098       SerialTaskQueueChain* serial_ = nullptr;
0099       LimitedTaskQueue* limited_ = nullptr;
0100 
0101       TaskQueueAdaptor() = default;
0102       TaskQueueAdaptor(SerialTaskQueueChain* iChain) : serial_(iChain) {}
0103       TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
0104 
0105       operator bool() { return serial_ != nullptr or limited_ != nullptr; }
0106 
0107       template <class F>
0108       void push(oneapi::tbb::task_group& iG, F&& iF) {
0109         if (serial_) {
0110           serial_->push(iG, iF);
0111         } else {
0112           limited_->push(iG, iF);
0113         }
0114       }
0115     };
0116 
0117     Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
0118     virtual ~Worker();
0119 
0120     Worker(Worker const&) = delete;             // Disallow copying and moving
0121     Worker& operator=(Worker const&) = delete;  // Disallow copying and moving
0122 
0123     void clearModule() {
0124       moduleValid_ = false;
0125       doClearModule();
0126     }
0127 
0128     virtual bool wantsProcessBlocks() const = 0;
0129     virtual bool wantsInputProcessBlocks() const = 0;
0130     virtual bool wantsGlobalRuns() const = 0;
0131     virtual bool wantsGlobalLuminosityBlocks() const = 0;
0132     virtual bool wantsStreamRuns() const = 0;
0133     virtual bool wantsStreamLuminosityBlocks() const = 0;
0134 
0135     virtual SerialTaskQueue* globalRunsQueue() = 0;
0136     virtual SerialTaskQueue* globalLuminosityBlocksQueue() = 0;
0137 
0138     void prePrefetchSelectionAsync(
0139         oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, EventPrincipal const*);
0140 
0141     void prePrefetchSelectionAsync(
0142         oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, void const*) {
0143       assert(false);
0144     }
0145 
0146     template <typename T>
0147     void doWorkAsync(WaitingTaskHolder,
0148                      typename T::TransitionInfoType const&,
0149                      ServiceToken const&,
0150                      StreamID,
0151                      ParentContext const&,
0152                      typename T::Context const*);
0153 
0154     template <typename T>
0155     void doWorkNoPrefetchingAsync(WaitingTaskHolder,
0156                                   typename T::TransitionInfoType const&,
0157                                   ServiceToken const&,
0158                                   StreamID,
0159                                   ParentContext const&,
0160                                   typename T::Context const*);
0161 
0162     template <typename T>
0163     std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
0164                                          StreamID,
0165                                          ParentContext const&,
0166                                          typename T::Context const*);
0167 
0168     void callWhenDoneAsync(WaitingTaskHolder task) { waitingTasks_.add(std::move(task)); }
0169     void skipOnPath(EventPrincipal const& iEvent);
0170     void beginJob();
0171     void endJob();
0172     void beginStream(StreamID id, StreamContext& streamContext);
0173     void endStream(StreamID id, StreamContext& streamContext);
0174     void respondToOpenInputFile(FileBlock const& fb) { implRespondToOpenInputFile(fb); }
0175     void respondToCloseInputFile(FileBlock const& fb) { implRespondToCloseInputFile(fb); }
0176     void respondToCloseOutputFile() { implRespondToCloseOutputFile(); }
0177     void registerThinnedAssociations(ProductRegistry const& registry, ThinnedAssociationsHelper& helper);
0178 
0179     void reset() {
0180       cached_exception_ = std::exception_ptr();
0181       state_ = Ready;
0182       waitingTasks_.reset();
0183       workStarted_ = false;
0184       numberOfPathsLeftToRun_ = numberOfPathsOn_;
0185     }
0186 
0187     void postDoEvent(EventPrincipal const&);
0188 
0189     ModuleDescription const* description() const {
0190       if (moduleValid_) {
0191         return moduleCallingContext_.moduleDescription();
0192       }
0193       return nullptr;
0194     }
0195     ///The signals are required to live longer than the last call to 'doWork'
0196     /// this was done to improve performance based on profiling
0197     void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
0198 
0199     void setEarlyDeleteHelper(EarlyDeleteHelper* iHelper);
0200 
0201     //Used to make EDGetToken work
0202     virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const&) = 0;
0203     virtual void updateLookup(eventsetup::ESRecordsToProxyIndices const&) = 0;
0204     virtual void selectInputProcessBlocks(ProductRegistry const&, ProcessBlockHelperBase const&) = 0;
0205     virtual void resolvePutIndicies(
0206         BranchType iBranchType,
0207         std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const&
0208             iIndicies) = 0;
0209 
0210     virtual void modulesWhoseProductsAreConsumed(
0211         std::array<std::vector<ModuleDescription const*>*, NumBranchTypes>& modules,
0212         std::vector<ModuleProcessName>& modulesInPreviousProcesses,
0213         ProductRegistry const& preg,
0214         std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
0215 
0216     virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
0217 
0218     virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
0219 
0220     virtual Types moduleType() const = 0;
0221     virtual ConcurrencyTypes moduleConcurrencyType() const = 0;
0222 
0223     void clearCounters() {
0224       timesRun_.store(0, std::memory_order_release);
0225       timesVisited_.store(0, std::memory_order_release);
0226       timesPassed_.store(0, std::memory_order_release);
0227       timesFailed_.store(0, std::memory_order_release);
0228       timesExcept_.store(0, std::memory_order_release);
0229     }
0230 
0231     void addedToPath() { ++numberOfPathsOn_; }
0232     //NOTE: calling state() is done to force synchronization across threads
0233     int timesRun() const { return timesRun_.load(std::memory_order_acquire); }
0234     int timesVisited() const { return timesVisited_.load(std::memory_order_acquire); }
0235     int timesPassed() const { return timesPassed_.load(std::memory_order_acquire); }
0236     int timesFailed() const { return timesFailed_.load(std::memory_order_acquire); }
0237     int timesExcept() const { return timesExcept_.load(std::memory_order_acquire); }
0238     State state() const { return state_; }
0239 
0240     int timesPass() const { return timesPassed(); }  // for backward compatibility only - to be removed soon
0241 
0242     virtual bool hasAccumulator() const = 0;
0243 
0244   protected:
0245     template <typename O>
0246     friend class workerhelper::CallImpl;
0247 
0248     virtual void doClearModule() = 0;
0249 
0250     virtual std::string workerType() const = 0;
0251     virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;
0252 
0253     virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0254     virtual bool implNeedToRunSelection() const = 0;
0255 
0256     virtual void implDoAcquire(EventTransitionInfo const&,
0257                                ModuleCallingContext const*,
0258                                WaitingTaskWithArenaHolder&) = 0;
0259 
0260     virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const&, ModuleCallingContext const*) = 0;
0261     virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0262     virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0263     virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0264     virtual bool implDoBegin(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0265     virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0266     virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0267     virtual bool implDoEnd(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0268     virtual bool implDoBegin(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0269     virtual bool implDoStreamBegin(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0270     virtual bool implDoStreamEnd(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0271     virtual bool implDoEnd(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0272     virtual void implBeginJob() = 0;
0273     virtual void implEndJob() = 0;
0274     virtual void implBeginStream(StreamID) = 0;
0275     virtual void implEndStream(StreamID) = 0;
0276 
0277     void resetModuleDescription(ModuleDescription const*);
0278 
0279     ActivityRegistry* activityRegistry() { return actReg_.get(); }
0280 
0281   private:
0282     template <typename T>
0283     bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
0284 
0285     virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0286     virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0287 
0288     virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
0289 
0290     virtual std::vector<ESProxyIndex> const& esItemsToGetFrom(Transition) const = 0;
0291     virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
0292     virtual std::vector<ProductResolverIndex> const& itemsShouldPutInEvent() const = 0;
0293 
0294     virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask,
0295                                               ModuleCallingContext const& moduleCallingContext,
0296                                               Principal const& iPrincipal) const = 0;
0297 
0298     virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
0299     virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
0300     virtual void implRespondToCloseOutputFile() = 0;
0301 
0302     virtual void implRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) = 0;
0303 
0304     virtual TaskQueueAdaptor serializeRunModule() = 0;
0305 
0306     bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const& parentContext, bool isEvent) const;
0307 
0308     template <bool IS_EVENT>
0309     bool setPassed() {
0310       if (IS_EVENT) {
0311         timesPassed_.fetch_add(1, std::memory_order_relaxed);
0312       }
0313       state_ = Pass;
0314       return true;
0315     }
0316 
0317     template <bool IS_EVENT>
0318     bool setFailed() {
0319       if (IS_EVENT) {
0320         timesFailed_.fetch_add(1, std::memory_order_relaxed);
0321       }
0322       state_ = Fail;
0323       return false;
0324     }
0325 
0326     template <bool IS_EVENT>
0327     std::exception_ptr setException(std::exception_ptr iException) {
0328       if (IS_EVENT) {
0329         timesExcept_.fetch_add(1, std::memory_order_relaxed);
0330       }
0331       cached_exception_ = iException;  // propagate_const<T> has no reset() function
0332       state_ = Exception;
0333       return cached_exception_;
0334     }
0335 
0336     template <typename T>
0337     void prefetchAsync(WaitingTaskHolder,
0338                        ServiceToken const&,
0339                        ParentContext const&,
0340                        typename T::TransitionInfoType const&,
0341                        Transition);
0342 
0343     void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const&, Transition, ServiceToken const&);
0344     void edPrefetchAsync(WaitingTaskHolder, ServiceToken const&, Principal const&) const;
0345 
0346     bool needsESPrefetching(Transition iTrans) const noexcept {
0347       return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
0348     }
0349 
0350     void emitPostModuleEventPrefetchingSignal() {
0351       actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0352     }
0353 
0354     virtual bool hasAcquire() const = 0;
0355 
0356     template <typename T>
0357     std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const*,
0358                                                    typename T::TransitionInfoType const&,
0359                                                    StreamID,
0360                                                    ParentContext const&,
0361                                                    typename T::Context const*);
0362 
0363     void runAcquire(EventTransitionInfo const&, ParentContext const&, WaitingTaskWithArenaHolder&);
0364 
0365     void runAcquireAfterAsyncPrefetch(std::exception_ptr const*,
0366                                       EventTransitionInfo const&,
0367                                       ParentContext const&,
0368                                       WaitingTaskWithArenaHolder);
0369 
0370     std::exception_ptr handleExternalWorkException(std::exception_ptr const* iEPtr, ParentContext const& parentContext);
0371 
0372     template <typename T>
0373     class RunModuleTask : public WaitingTask {
0374     public:
0375       RunModuleTask(Worker* worker,
0376                     typename T::TransitionInfoType const& transitionInfo,
0377                     ServiceToken const& token,
0378                     StreamID streamID,
0379                     ParentContext const& parentContext,
0380                     typename T::Context const* context,
0381                     oneapi::tbb::task_group* iGroup)
0382           : m_worker(worker),
0383             m_transitionInfo(transitionInfo),
0384             m_streamID(streamID),
0385             m_parentContext(parentContext),
0386             m_context(context),
0387             m_serviceToken(token),
0388             m_group(iGroup) {}
0389 
0390       struct EnableQueueGuard {
0391         SerialTaskQueue* queue_;
0392         EnableQueueGuard(SerialTaskQueue* iQueue) : queue_{iQueue} {}
0393         EnableQueueGuard(EnableQueueGuard const&) = delete;
0394         EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
0395         EnableQueueGuard& operator=(EnableQueueGuard&&) = delete;
0396         EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
0397         ~EnableQueueGuard() {
0398           if (queue_) {
0399             queue_->resume();
0400           }
0401         }
0402       };
0403 
0404       void execute() final {
0405         //Need to make the services available early so other services can see them
0406         ServiceRegistry::Operate guard(m_serviceToken.lock());
0407 
0408         //incase the emit causes an exception, we need a memory location
0409         // to hold the exception_ptr
0410         std::exception_ptr temp_excptr;
0411         auto excptr = exceptionPtr();
0412         if constexpr (T::isEvent_) {
0413           if (!m_worker->hasAcquire()) {
0414             // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskList
0415             CMS_SA_ALLOW try {
0416               //pre was called in prefetchAsync
0417               m_worker->emitPostModuleEventPrefetchingSignal();
0418             } catch (...) {
0419               temp_excptr = std::current_exception();
0420               if (not excptr) {
0421                 excptr = &temp_excptr;
0422               }
0423             }
0424           }
0425         }
0426 
0427         if (not excptr) {
0428           if (auto queue = m_worker->serializeRunModule()) {
0429             auto f = [worker = m_worker,
0430                       info = m_transitionInfo,
0431                       streamID = m_streamID,
0432                       parentContext = m_parentContext,
0433                       sContext = m_context,
0434                       serviceToken = m_serviceToken]() {
0435               //Need to make the services available
0436               ServiceRegistry::Operate operateRunModule(serviceToken.lock());
0437 
0438               //If needed, we pause the queue in begin transition and resume it
0439               // at the end transition. This can guarantee that the module
0440               // only processes one run or lumi at a time
0441               EnableQueueGuard enableQueueGuard{workerhelper::CallImpl<T>::enableGlobalQueue(worker)};
0442               std::exception_ptr* ptr = nullptr;
0443               worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
0444             };
0445             //keep another global transition from running if necessary
0446             auto gQueue = workerhelper::CallImpl<T>::pauseGlobalQueue(m_worker);
0447             if (gQueue) {
0448               gQueue->push(*m_group, [queue, gQueue, f, group = m_group]() mutable {
0449                 gQueue->pause();
0450                 queue.push(*group, std::move(f));
0451               });
0452             } else {
0453               queue.push(*m_group, std::move(f));
0454             }
0455             return;
0456           }
0457         }
0458 
0459         m_worker->runModuleAfterAsyncPrefetch<T>(excptr, m_transitionInfo, m_streamID, m_parentContext, m_context);
0460       }
0461 
0462     private:
0463       Worker* m_worker;
0464       typename T::TransitionInfoType m_transitionInfo;
0465       StreamID m_streamID;
0466       ParentContext const m_parentContext;
0467       typename T::Context const* m_context;
0468       ServiceWeakToken m_serviceToken;
0469       oneapi::tbb::task_group* m_group;
0470     };
0471 
0472     // AcquireTask is only used for the Event case, but we define
0473     // it as a template so all cases will compile.
0474     // DUMMY exists to work around the C++ Standard prohibition on
0475     // fully specializing templates nested in other classes.
0476     template <typename T, typename DUMMY = void>
0477     class AcquireTask : public WaitingTask {
0478     public:
0479       AcquireTask(Worker*,
0480                   typename T::TransitionInfoType const&,
0481                   ServiceToken const&,
0482                   ParentContext const&,
0483                   WaitingTaskWithArenaHolder) {}
0484       void execute() final {}
0485     };
0486 
0487     template <typename DUMMY>
0488     class AcquireTask<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>, DUMMY> : public WaitingTask {
0489     public:
0490       AcquireTask(Worker* worker,
0491                   EventTransitionInfo const& eventTransitionInfo,
0492                   ServiceToken const& token,
0493                   ParentContext const& parentContext,
0494                   WaitingTaskWithArenaHolder holder)
0495           : m_worker(worker),
0496             m_eventTransitionInfo(eventTransitionInfo),
0497             m_parentContext(parentContext),
0498             m_holder(std::move(holder)),
0499             m_serviceToken(token) {}
0500 
0501       void execute() final {
0502         //Need to make the services available early so other services can see them
0503         ServiceRegistry::Operate guard(m_serviceToken.lock());
0504 
0505         //incase the emit causes an exception, we need a memory location
0506         // to hold the exception_ptr
0507         std::exception_ptr temp_excptr;
0508         auto excptr = exceptionPtr();
0509         // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskWithArenaHolder
0510         CMS_SA_ALLOW try {
0511           //pre was called in prefetchAsync
0512           m_worker->emitPostModuleEventPrefetchingSignal();
0513         } catch (...) {
0514           temp_excptr = std::current_exception();
0515           if (not excptr) {
0516             excptr = &temp_excptr;
0517           }
0518         }
0519 
0520         if (not excptr) {
0521           if (auto queue = m_worker->serializeRunModule()) {
0522             queue.push(*m_holder.group(),
0523                        [worker = m_worker,
0524                         info = m_eventTransitionInfo,
0525                         parentContext = m_parentContext,
0526                         serviceToken = m_serviceToken,
0527                         holder = m_holder]() {
0528                          //Need to make the services available
0529                          ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
0530 
0531                          std::exception_ptr* ptr = nullptr;
0532                          worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
0533                        });
0534             return;
0535           }
0536         }
0537 
0538         m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
0539       }
0540 
0541     private:
0542       Worker* m_worker;
0543       EventTransitionInfo m_eventTransitionInfo;
0544       ParentContext const m_parentContext;
0545       WaitingTaskWithArenaHolder m_holder;
0546       ServiceWeakToken m_serviceToken;
0547     };
0548 
0549     // This class does nothing unless there is an exception originating
0550     // in an "External Worker". In that case, it handles converting the
0551     // exception to a CMS exception and adding context to the exception.
0552     class HandleExternalWorkExceptionTask : public WaitingTask {
0553     public:
0554       HandleExternalWorkExceptionTask(Worker* worker,
0555                                       oneapi::tbb::task_group* group,
0556                                       WaitingTask* runModuleTask,
0557                                       ParentContext const& parentContext);
0558 
0559       void execute() final;
0560 
0561     private:
0562       Worker* m_worker;
0563       WaitingTask* m_runModuleTask;
0564       oneapi::tbb::task_group* m_group;
0565       ParentContext const m_parentContext;
0566     };
0567 
0568     std::atomic<int> timesRun_;
0569     std::atomic<int> timesVisited_;
0570     std::atomic<int> timesPassed_;
0571     std::atomic<int> timesFailed_;
0572     std::atomic<int> timesExcept_;
0573     std::atomic<State> state_;
0574     int numberOfPathsOn_;
0575     std::atomic<int> numberOfPathsLeftToRun_;
0576 
0577     ModuleCallingContext moduleCallingContext_;
0578 
0579     ExceptionToActionTable const* actions_;                         // memory assumed to be managed elsewhere
0580     CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_;  // if state is 'exception'
0581 
0582     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0583 
0584     edm::propagate_const<EarlyDeleteHelper*> earlyDeleteHelper_;
0585 
0586     edm::WaitingTaskList waitingTasks_;
0587     std::atomic<bool> workStarted_;
0588     bool ranAcquireWithoutException_;
0589     bool moduleValid_ = true;
0590   };
0591 
0592   namespace {
0593     template <typename T>
0594     class ModuleSignalSentry {
0595     public:
0596       ModuleSignalSentry(ActivityRegistry* a,
0597                          typename T::Context const* context,
0598                          ModuleCallingContext const* moduleCallingContext)
0599           : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
0600         if (a_)
0601           T::preModuleSignal(a_, context, moduleCallingContext_);
0602       }
0603 
0604       ~ModuleSignalSentry() {
0605         if (a_)
0606           T::postModuleSignal(a_, context_, moduleCallingContext_);
0607       }
0608 
0609     private:
0610       ActivityRegistry* a_;  // We do not use propagate_const because the registry itself is mutable.
0611       typename T::Context const* context_;
0612       ModuleCallingContext const* moduleCallingContext_;
0613     };
0614 
0615   }  // namespace
0616 
0617   namespace workerhelper {
0618     template <>
0619     class CallImpl<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>> {
0620     public:
0621       typedef OccurrenceTraits<EventPrincipal, BranchActionStreamBegin> Arg;
0622       static bool call(Worker* iWorker,
0623                        StreamID,
0624                        EventTransitionInfo const& info,
0625                        ActivityRegistry* /* actReg */,
0626                        ModuleCallingContext const* mcc,
0627                        Arg::Context const* /* context*/) {
0628         //Signal sentry is handled by the module
0629         return iWorker->implDo(info, mcc);
0630       }
0631       static void esPrefetchAsync(Worker* worker,
0632                                   WaitingTaskHolder waitingTask,
0633                                   ServiceToken const& token,
0634                                   EventTransitionInfo const& info,
0635                                   Transition transition) {
0636         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0637       }
0638       static bool wantsTransition(Worker const* iWorker) { return true; }
0639       static bool needToRunSelection(Worker const* iWorker) { return iWorker->implNeedToRunSelection(); }
0640 
0641       static SerialTaskQueue* pauseGlobalQueue(Worker*) { return nullptr; }
0642       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0643     };
0644 
0645     template <>
0646     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>> {
0647     public:
0648       typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Arg;
0649       static bool call(Worker* iWorker,
0650                        StreamID,
0651                        RunTransitionInfo const& info,
0652                        ActivityRegistry* actReg,
0653                        ModuleCallingContext const* mcc,
0654                        Arg::Context const* context) {
0655         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0656         return iWorker->implDoBegin(info, mcc);
0657       }
0658       static void esPrefetchAsync(Worker* worker,
0659                                   WaitingTaskHolder waitingTask,
0660                                   ServiceToken const& token,
0661                                   RunTransitionInfo const& info,
0662                                   Transition transition) {
0663         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0664       }
0665       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
0666       static bool needToRunSelection(Worker const* iWorker) { return false; }
0667       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
0668       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0669     };
0670     template <>
0671     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>> {
0672     public:
0673       typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Arg;
0674       static bool call(Worker* iWorker,
0675                        StreamID id,
0676                        RunTransitionInfo const& info,
0677                        ActivityRegistry* actReg,
0678                        ModuleCallingContext const* mcc,
0679                        Arg::Context const* context) {
0680         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0681         return iWorker->implDoStreamBegin(id, info, mcc);
0682       }
0683       static void esPrefetchAsync(Worker* worker,
0684                                   WaitingTaskHolder waitingTask,
0685                                   ServiceToken const& token,
0686                                   RunTransitionInfo const& info,
0687                                   Transition transition) {
0688         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0689       }
0690       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
0691       static bool needToRunSelection(Worker const* iWorker) { return false; }
0692       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0693       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0694     };
0695     template <>
0696     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>> {
0697     public:
0698       typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Arg;
0699       static bool call(Worker* iWorker,
0700                        StreamID,
0701                        RunTransitionInfo const& info,
0702                        ActivityRegistry* actReg,
0703                        ModuleCallingContext const* mcc,
0704                        Arg::Context const* context) {
0705         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0706         return iWorker->implDoEnd(info, mcc);
0707       }
0708       static void esPrefetchAsync(Worker* worker,
0709                                   WaitingTaskHolder waitingTask,
0710                                   ServiceToken const& token,
0711                                   RunTransitionInfo const& info,
0712                                   Transition transition) {
0713         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0714       }
0715       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
0716       static bool needToRunSelection(Worker const* iWorker) { return false; }
0717       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0718       static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
0719     };
0720     template <>
0721     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>> {
0722     public:
0723       typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Arg;
0724       static bool call(Worker* iWorker,
0725                        StreamID id,
0726                        RunTransitionInfo const& info,
0727                        ActivityRegistry* actReg,
0728                        ModuleCallingContext const* mcc,
0729                        Arg::Context const* context) {
0730         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0731         return iWorker->implDoStreamEnd(id, info, mcc);
0732       }
0733       static void esPrefetchAsync(Worker* worker,
0734                                   WaitingTaskHolder waitingTask,
0735                                   ServiceToken const& token,
0736                                   RunTransitionInfo const& info,
0737                                   Transition transition) {
0738         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0739       }
0740       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
0741       static bool needToRunSelection(Worker const* iWorker) { return false; }
0742       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0743       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0744     };
0745 
0746     template <>
0747     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>> {
0748     public:
0749       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0750       static bool call(Worker* iWorker,
0751                        StreamID,
0752                        LumiTransitionInfo const& info,
0753                        ActivityRegistry* actReg,
0754                        ModuleCallingContext const* mcc,
0755                        Arg::Context const* context) {
0756         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0757         return iWorker->implDoBegin(info, mcc);
0758       }
0759       static void esPrefetchAsync(Worker* worker,
0760                                   WaitingTaskHolder waitingTask,
0761                                   ServiceToken const& token,
0762                                   LumiTransitionInfo const& info,
0763                                   Transition transition) {
0764         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0765       }
0766       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
0767       static bool needToRunSelection(Worker const* iWorker) { return false; }
0768       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
0769       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0770     };
0771     template <>
0772     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>> {
0773     public:
0774       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0775       static bool call(Worker* iWorker,
0776                        StreamID id,
0777                        LumiTransitionInfo const& info,
0778                        ActivityRegistry* actReg,
0779                        ModuleCallingContext const* mcc,
0780                        Arg::Context const* context) {
0781         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0782         return iWorker->implDoStreamBegin(id, info, mcc);
0783       }
0784       static void esPrefetchAsync(Worker* worker,
0785                                   WaitingTaskHolder waitingTask,
0786                                   ServiceToken const& token,
0787                                   LumiTransitionInfo const& info,
0788                                   Transition transition) {
0789         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0790       }
0791       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
0792       static bool needToRunSelection(Worker const* iWorker) { return false; }
0793       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0794       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0795     };
0796 
0797     template <>
0798     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>> {
0799     public:
0800       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0801       static bool call(Worker* iWorker,
0802                        StreamID,
0803                        LumiTransitionInfo const& info,
0804                        ActivityRegistry* actReg,
0805                        ModuleCallingContext const* mcc,
0806                        Arg::Context const* context) {
0807         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0808         return iWorker->implDoEnd(info, mcc);
0809       }
0810       static void esPrefetchAsync(Worker* worker,
0811                                   WaitingTaskHolder waitingTask,
0812                                   ServiceToken const& token,
0813                                   LumiTransitionInfo const& info,
0814                                   Transition transition) {
0815         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0816       }
0817       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
0818       static bool needToRunSelection(Worker const* iWorker) { return false; }
0819       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0820       static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
0821     };
0822     template <>
0823     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>> {
0824     public:
0825       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0826       static bool call(Worker* iWorker,
0827                        StreamID id,
0828                        LumiTransitionInfo const& info,
0829                        ActivityRegistry* actReg,
0830                        ModuleCallingContext const* mcc,
0831                        Arg::Context const* context) {
0832         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0833         return iWorker->implDoStreamEnd(id, info, mcc);
0834       }
0835       static void esPrefetchAsync(Worker* worker,
0836                                   WaitingTaskHolder waitingTask,
0837                                   ServiceToken const& token,
0838                                   LumiTransitionInfo const& info,
0839                                   Transition transition) {
0840         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0841       }
0842       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
0843       static bool needToRunSelection(Worker const* iWorker) { return false; }
0844       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0845       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0846     };
0847     template <>
0848     class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>> {
0849     public:
0850       using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0851       static bool call(Worker* iWorker,
0852                        StreamID,
0853                        ProcessBlockTransitionInfo const& info,
0854                        ActivityRegistry* actReg,
0855                        ModuleCallingContext const* mcc,
0856                        Arg::Context const* context) {
0857         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0858         return iWorker->implDoBeginProcessBlock(info.principal(), mcc);
0859       }
0860       static void esPrefetchAsync(
0861           Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) {}
0862       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
0863       static bool needToRunSelection(Worker const* iWorker) { return false; }
0864       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0865       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0866     };
0867     template <>
0868     class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>> {
0869     public:
0870       using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
0871       static bool call(Worker* iWorker,
0872                        StreamID,
0873                        ProcessBlockTransitionInfo const& info,
0874                        ActivityRegistry* actReg,
0875                        ModuleCallingContext const* mcc,
0876                        Arg::Context const* context) {
0877         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0878         return iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
0879       }
0880       static void esPrefetchAsync(
0881           Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) {}
0882       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsInputProcessBlocks(); }
0883       static bool needToRunSelection(Worker const* iWorker) { return false; }
0884       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0885       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0886     };
0887     template <>
0888     class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>> {
0889     public:
0890       using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
0891       static bool call(Worker* iWorker,
0892                        StreamID,
0893                        ProcessBlockTransitionInfo const& info,
0894                        ActivityRegistry* actReg,
0895                        ModuleCallingContext const* mcc,
0896                        Arg::Context const* context) {
0897         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0898         return iWorker->implDoEndProcessBlock(info.principal(), mcc);
0899       }
0900       static void esPrefetchAsync(
0901           Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) {}
0902       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
0903       static bool needToRunSelection(Worker const* iWorker) { return false; }
0904       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0905       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0906     };
0907   }  // namespace workerhelper
0908 
0909   template <typename T>
0910   void Worker::prefetchAsync(WaitingTaskHolder iTask,
0911                              ServiceToken const& token,
0912                              ParentContext const& parentContext,
0913                              typename T::TransitionInfoType const& transitionInfo,
0914                              Transition iTransition) {
0915     Principal const& principal = transitionInfo.principal();
0916 
0917     moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
0918 
0919     if (principal.branchType() == InEvent) {
0920       actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0921     }
0922 
0923     workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
0924     edPrefetchAsync(iTask, token, principal);
0925 
0926     if (principal.branchType() == InEvent) {
0927       preActionBeforeRunEventAsync(iTask, moduleCallingContext_, principal);
0928     }
0929   }
0930 
0931   template <typename T>
0932   void Worker::doWorkAsync(WaitingTaskHolder task,
0933                            typename T::TransitionInfoType const& transitionInfo,
0934                            ServiceToken const& token,
0935                            StreamID streamID,
0936                            ParentContext const& parentContext,
0937                            typename T::Context const* context) {
0938     if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
0939       return;
0940     }
0941 
0942     //Need to check workStarted_ before adding to waitingTasks_
0943     bool expected = false;
0944     bool workStarted = workStarted_.compare_exchange_strong(expected, true);
0945 
0946     waitingTasks_.add(task);
0947     if constexpr (T::isEvent_) {
0948       timesVisited_.fetch_add(1, std::memory_order_relaxed);
0949     }
0950 
0951     if (workStarted) {
0952       moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
0953 
0954       //if have TriggerResults based selection we want to reject the event before doing prefetching
0955       if (workerhelper::CallImpl<T>::needToRunSelection(this)) {
0956         //We need to run the selection in a different task so that
0957         // we can prefetch the data needed for the selection
0958         WaitingTask* moduleTask =
0959             new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
0960 
0961         //make sure the task is either run or destroyed
0962         struct DestroyTask {
0963           DestroyTask(edm::WaitingTask* iTask) : m_task(iTask) {}
0964 
0965           ~DestroyTask() {
0966             auto p = m_task.exchange(nullptr);
0967             if (p) {
0968               TaskSentry s{p};
0969             }
0970           }
0971 
0972           edm::WaitingTask* release() { return m_task.exchange(nullptr); }
0973 
0974         private:
0975           std::atomic<edm::WaitingTask*> m_task;
0976         };
0977         if constexpr (T::isEvent_) {
0978           if (hasAcquire()) {
0979             auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
0980             ServiceWeakToken weakToken = token;
0981             auto* group = task.group();
0982             moduleTask = make_waiting_task(
0983                 [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
0984                   WaitingTaskWithArenaHolder runTaskHolder(
0985                       *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
0986                   AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
0987                   t.execute();
0988                 });
0989           }
0990         }
0991         auto* group = task.group();
0992         auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
0993         ServiceWeakToken weakToken = token;
0994         auto selectionTask =
0995             make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
0996                                   std::exception_ptr const*) mutable {
0997               ServiceRegistry::Operate guard(weakToken.lock());
0998               prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
0999                                weakToken.lock(),
1000                                parentContext,
1001                                info,
1002                                T::transition_);
1003             });
1004         prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1005       } else {
1006         WaitingTask* moduleTask =
1007             new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1008         auto group = task.group();
1009         if constexpr (T::isEvent_) {
1010           if (hasAcquire()) {
1011             WaitingTaskWithArenaHolder runTaskHolder(
1012                 *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1013             moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1014           }
1015         }
1016         prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1017       }
1018     }
1019   }
1020 
1021   template <typename T>
1022   std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
1023                                                          typename T::TransitionInfoType const& transitionInfo,
1024                                                          StreamID streamID,
1025                                                          ParentContext const& parentContext,
1026                                                          typename T::Context const* context) {
1027     std::exception_ptr exceptionPtr;
1028     if (iEPtr) {
1029       assert(*iEPtr);
1030       if (shouldRethrowException(*iEPtr, parentContext, T::isEvent_)) {
1031         exceptionPtr = *iEPtr;
1032         setException<T::isEvent_>(exceptionPtr);
1033       } else {
1034         setPassed<T::isEvent_>();
1035       }
1036       moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
1037     } else {
1038       // Caught exception is propagated via WaitingTaskList
1039       CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1040         exceptionPtr = std::current_exception();
1041       }
1042     }
1043     waitingTasks_.doneWaiting(exceptionPtr);
1044     return exceptionPtr;
1045   }
1046 
1047   template <typename T>
1048   void Worker::doWorkNoPrefetchingAsync(WaitingTaskHolder task,
1049                                         typename T::TransitionInfoType const& transitionInfo,
1050                                         ServiceToken const& serviceToken,
1051                                         StreamID streamID,
1052                                         ParentContext const& parentContext,
1053                                         typename T::Context const* context) {
1054     if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1055       return;
1056     }
1057 
1058     //Need to check workStarted_ before adding to waitingTasks_
1059     bool expected = false;
1060     auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1061 
1062     waitingTasks_.add(task);
1063     if (workStarted) {
1064       ServiceWeakToken weakToken = serviceToken;
1065       auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1066         std::exception_ptr exceptionPtr;
1067         // Caught exception is propagated via WaitingTaskList
1068         CMS_SA_ALLOW try {
1069           //Need to make the services available
1070           ServiceRegistry::Operate guard(weakToken.lock());
1071 
1072           this->runModule<T>(info, streamID, parentContext, context);
1073         } catch (...) {
1074           exceptionPtr = std::current_exception();
1075         }
1076         this->waitingTasks_.doneWaiting(exceptionPtr);
1077       };
1078 
1079       if (needsESPrefetching(T::transition_)) {
1080         auto group = task.group();
1081         auto afterPrefetch =
1082             edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1083               if (iExcept) {
1084                 this->waitingTasks_.doneWaiting(*iExcept);
1085               } else {
1086                 if (auto queue = this->serializeRunModule()) {
1087                   queue.push(*group, toDo);
1088                 } else {
1089                   group->run(toDo);
1090                 }
1091               }
1092             });
1093         esPrefetchAsync(
1094             WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1095       } else {
1096         auto group = task.group();
1097         if (auto queue = this->serializeRunModule()) {
1098           queue.push(*group, toDo);
1099         } else {
1100           group->run(toDo);
1101         }
1102       }
1103     }
1104   }
1105 
1106   template <typename T>
1107   bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1108                          StreamID streamID,
1109                          ParentContext const& parentContext,
1110                          typename T::Context const* context) {
1111     //unscheduled producers should advance this
1112     //if (T::isEvent_) {
1113     //  ++timesVisited_;
1114     //}
1115     ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1116     if constexpr (T::isEvent_) {
1117       timesRun_.fetch_add(1, std::memory_order_relaxed);
1118     }
1119 
1120     bool rc = true;
1121     try {
1122       convertException::wrap([&]() {
1123         rc = workerhelper::CallImpl<T>::call(
1124             this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1125 
1126         if (rc) {
1127           setPassed<T::isEvent_>();
1128         } else {
1129           setFailed<T::isEvent_>();
1130         }
1131       });
1132     } catch (cms::Exception& ex) {
1133       edm::exceptionContext(ex, moduleCallingContext_);
1134       if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_)) {
1135         assert(not cached_exception_);
1136         setException<T::isEvent_>(std::current_exception());
1137         std::rethrow_exception(cached_exception_);
1138       } else {
1139         rc = setPassed<T::isEvent_>();
1140       }
1141     }
1142 
1143     return rc;
1144   }
1145 
1146   template <typename T>
1147   std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1148                                                StreamID streamID,
1149                                                ParentContext const& parentContext,
1150                                                typename T::Context const* context) {
1151     timesVisited_.fetch_add(1, std::memory_order_relaxed);
1152     std::exception_ptr const* prefetchingException = nullptr;  // null because there was no prefetching to do
1153     return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1154   }
1155 }  // namespace edm
1156 #endif