Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-06-29 22:58:01

0001 #ifndef FWCore_Framework_Worker_h
0002 #define FWCore_Framework_Worker_h
0003 
0004 /*----------------------------------------------------------------------
0005 
0006 Worker: this is a basic scheduling unit - an abstract base class to
0007 something that is really a producer or filter.
0008 
0009 A worker will not actually call through to the module unless it is
0010 in a Ready state.  After a module is actually run, the state will not
0011 be Ready.  The Ready state can only be reestablished by doing a reset().
0012 
0013 Pre/post module signals are posted only in the Ready state.
0014 
0015 Execution statistics are kept here.
0016 
0017 If a module has thrown an exception during execution, that exception
0018 will be rethrown if the worker is entered again and the state is not Ready.
0019 In other words, execution results (status) are cached and reused until
0020 the worker is reset().
0021 
0022 ----------------------------------------------------------------------*/
0023 
0024 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0025 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0026 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0027 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0028 #include "FWCore/Framework/interface/maker/WorkerParams.h"
0029 #include "FWCore/Framework/interface/maker/ModuleSignalSentry.h"
0030 #include "FWCore/Framework/interface/ExceptionActions.h"
0031 #include "FWCore/Framework/interface/ModuleContextSentry.h"
0032 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0033 #include "FWCore/Framework/interface/ProductResolverIndexAndSkipBit.h"
0034 #include "FWCore/Framework/interface/ModuleConsumesMinimalESInfo.h"
0035 #include "FWCore/Concurrency/interface/WaitingTask.h"
0036 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0037 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0038 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0039 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0040 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0041 #include "FWCore/ServiceRegistry/interface/InternalContext.h"
0042 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0043 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0044 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0045 #include "FWCore/ServiceRegistry/interface/PlaceInPathContext.h"
0046 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0047 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0048 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0049 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0050 #include "FWCore/Concurrency/interface/FunctorTask.h"
0051 #include "FWCore/Utilities/interface/Exception.h"
0052 #include "FWCore/Utilities/interface/ConvertException.h"
0053 #include "FWCore/Utilities/interface/BranchType.h"
0054 #include "FWCore/Utilities/interface/ProductResolverIndex.h"
0055 #include "FWCore/Utilities/interface/StreamID.h"
0056 #include "FWCore/Utilities/interface/propagate_const.h"
0057 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0058 #include "FWCore/Utilities/interface/ESIndices.h"
0059 #include "FWCore/Utilities/interface/Transition.h"
0060 
0061 #include "FWCore/Framework/interface/Frameworkfwd.h"
0062 
0063 #include <array>
0064 #include <atomic>
0065 #include <cassert>
0066 #include <map>
0067 #include <memory>
0068 #include <sstream>
0069 #include <string>
0070 #include <vector>
0071 #include <exception>
0072 #include <unordered_map>
0073 
0074 namespace edm {
0075   class EventPrincipal;
0076   class EventSetupImpl;
0077   class EarlyDeleteHelper;
0078   class ProductResolverIndexHelper;
0079   class ProductResolverIndexAndSkipBit;
0080   class ProductRegistry;
0081   class ThinnedAssociationsHelper;
0082 
0083   namespace workerhelper {
0084     template <typename O>
0085     class CallImpl;
0086   }
0087   namespace eventsetup {
0088     struct ComponentDescription;
0089     class ESRecordsToProductResolverIndices;
0090   }  // namespace eventsetup
0091 
0092   class Worker {
0093   public:
0094     enum State { Ready, Pass, Fail, Exception };
0095     enum Types { kAnalyzer, kFilter, kProducer, kOutputModule };
0096     enum ConcurrencyTypes { kGlobal, kLimited, kOne, kStream };
0097     struct TaskQueueAdaptor {
0098       SerialTaskQueueChain* serial_ = nullptr;
0099       LimitedTaskQueue* limited_ = nullptr;
0100 
0101       TaskQueueAdaptor() = default;
0102       TaskQueueAdaptor(SerialTaskQueueChain* iChain) : serial_(iChain) {}
0103       TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
0104 
0105       operator bool() { return serial_ != nullptr or limited_ != nullptr; }
0106 
0107       template <class F>
0108       void push(oneapi::tbb::task_group& iG, F&& iF) {
0109         if (serial_) {
0110           serial_->push(iG, iF);
0111         } else {
0112           limited_->push(iG, iF);
0113         }
0114       }
0115     };
0116 
0117     Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
0118     virtual ~Worker();
0119 
0120     Worker(Worker const&) = delete;             // Disallow copying and moving
0121     Worker& operator=(Worker const&) = delete;  // Disallow copying and moving
0122 
0123     void clearModule() {
0124       moduleValid_ = false;
0125       doClearModule();
0126     }
0127 
0128     virtual bool wantsProcessBlocks() const noexcept = 0;
0129     virtual bool wantsInputProcessBlocks() const noexcept = 0;
0130     virtual bool wantsGlobalRuns() const noexcept = 0;
0131     virtual bool wantsGlobalLuminosityBlocks() const noexcept = 0;
0132     virtual bool wantsStreamRuns() const noexcept = 0;
0133     virtual bool wantsStreamLuminosityBlocks() const noexcept = 0;
0134 
0135     //returns non-nullptr if the module can only process one Run at a time
0136     virtual SerialTaskQueue* globalRunsQueue() = 0;
0137     //returns non-nullptr if the module can only process one LuminosityBlock at a time
0138     virtual SerialTaskQueue* globalLuminosityBlocksQueue() = 0;
0139 
0140     //Run only for OutputModules. Causes TriggerResults information to be used to decide if module should run.
0141     void prePrefetchSelectionAsync(oneapi::tbb::task_group&,
0142                                    WaitingTask* task,
0143                                    ServiceToken const&,
0144                                    StreamID stream,
0145                                    EventPrincipal const*) noexcept;
0146 
0147     void prePrefetchSelectionAsync(
0148         oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, void const*) noexcept {
0149       assert(false);
0150     }
0151 
0152     template <typename T>
0153     void doWorkAsync(WaitingTaskHolder,
0154                      typename T::TransitionInfoType const&,
0155                      ServiceToken const&,
0156                      StreamID,
0157                      ParentContext const&,
0158                      typename T::Context const*) noexcept;
0159 
0160     template <typename T>
0161     void doWorkNoPrefetchingAsync(WaitingTaskHolder,
0162                                   typename T::TransitionInfoType const&,
0163                                   ServiceToken const&,
0164                                   StreamID,
0165                                   ParentContext const&,
0166                                   typename T::Context const*) noexcept;
0167 
0168     template <typename T>
0169     std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
0170                                          StreamID,
0171                                          ParentContext const&,
0172                                          typename T::Context const*) noexcept;
0173 
0174     virtual size_t transformIndex(edm::ProductDescription const&) const noexcept = 0;
0175     void doTransformAsync(WaitingTaskHolder,
0176                           size_t iTransformIndex,
0177                           EventPrincipal const&,
0178                           ServiceToken const&,
0179                           StreamID,
0180                           ModuleCallingContext const&,
0181                           StreamContext const*) noexcept;
0182 
0183     void callWhenDoneAsync(WaitingTaskHolder task) { waitingTasks_.add(std::move(task)); }
0184     // Called if filter earlier in the path has failed.
0185     void skipOnPath(EventPrincipal const& iEvent);
0186 
0187     void respondToOpenInputFile(FileBlock const& fb) { implRespondToOpenInputFile(fb); }
0188     void respondToCloseInputFile(FileBlock const& fb) { implRespondToCloseInputFile(fb); }
0189     void respondToCloseOutputFile() { implRespondToCloseOutputFile(); }
0190 
0191     void reset() {
0192       cached_exception_ = std::exception_ptr();
0193       state_ = Ready;
0194       waitingTasks_.reset();
0195       workStarted_ = false;
0196       numberOfPathsLeftToRun_ = numberOfPathsOn_;
0197     }
0198 
0199     void postDoEvent(EventPrincipal const&);
0200 
0201     ModuleDescription const* description() const noexcept {
0202       if (moduleValid_) {
0203         return moduleCallingContext_.moduleDescription();
0204       }
0205       return nullptr;
0206     }
0207     ///The signals are required to live longer than the last call to 'doWork'
0208     /// this was done to improve performance based on profiling
0209     void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
0210 
0211     void setEarlyDeleteHelper(EarlyDeleteHelper* iHelper);
0212 
0213     virtual std::vector<ModuleConsumesInfo> moduleConsumesInfos() const = 0;
0214     virtual std::vector<ModuleConsumesMinimalESInfo> moduleConsumesMinimalESInfos() const = 0;
0215 
0216     virtual Types moduleType() const = 0;
0217     virtual ConcurrencyTypes moduleConcurrencyType() const = 0;
0218 
0219     void clearCounters() noexcept {
0220       timesRun_.store(0, std::memory_order_release);
0221       timesVisited_.store(0, std::memory_order_release);
0222       timesPassed_.store(0, std::memory_order_release);
0223       timesFailed_.store(0, std::memory_order_release);
0224       timesExcept_.store(0, std::memory_order_release);
0225     }
0226 
0227     void addedToPath() noexcept { ++numberOfPathsOn_; }
0228     //NOTE: calling state() is done to force synchronization across threads
0229     int timesRun() const noexcept { return timesRun_.load(std::memory_order_acquire); }
0230     int timesVisited() const noexcept { return timesVisited_.load(std::memory_order_acquire); }
0231     int timesPassed() const noexcept { return timesPassed_.load(std::memory_order_acquire); }
0232     int timesFailed() const noexcept { return timesFailed_.load(std::memory_order_acquire); }
0233     int timesExcept() const noexcept { return timesExcept_.load(std::memory_order_acquire); }
0234     State state() const noexcept { return state_; }
0235 
0236     int timesPass() const noexcept { return timesPassed(); }  // for backward compatibility only - to be removed soon
0237 
0238     virtual bool hasAccumulator() const noexcept = 0;
0239 
0240     virtual bool matchesBaseClassPointer(void const* iPtr) const noexcept = 0;
0241     // Used in PuttableProductResolver
0242     edm::WaitingTaskList& waitingTaskList() noexcept { return waitingTasks_; }
0243 
0244   protected:
0245     template <typename O>
0246     friend class workerhelper::CallImpl;
0247 
0248     virtual void doClearModule() = 0;
0249 
0250     virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;
0251 
0252     virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0253     virtual bool implNeedToRunSelection() const noexcept = 0;
0254 
0255     virtual void implDoAcquire(EventTransitionInfo const&, ModuleCallingContext const*, WaitingTaskHolder&&) = 0;
0256 
0257     virtual void implDoTransformAsync(WaitingTaskHolder,
0258                                       size_t iTransformIndex,
0259                                       EventPrincipal const&,
0260                                       ParentContext const&,
0261                                       ServiceWeakToken const&) noexcept = 0;
0262     virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const noexcept = 0;
0263 
0264     virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const&, ModuleCallingContext const*) = 0;
0265     virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0266     virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0267     virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0268     virtual bool implDoBegin(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0269     virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0270     virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0271     virtual bool implDoEnd(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0272     virtual bool implDoBegin(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0273     virtual bool implDoStreamBegin(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0274     virtual bool implDoStreamEnd(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0275     virtual bool implDoEnd(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0276 
0277     void resetModuleDescription(ModuleDescription const*);
0278 
0279     ActivityRegistry* activityRegistry() { return actReg_.get(); }
0280 
0281   private:
0282     template <typename T>
0283     bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
0284 
0285     virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0286     virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0287 
0288     virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
0289 
0290     virtual std::vector<ESResolverIndex> const& esItemsToGetFrom(Transition) const = 0;
0291     virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
0292 
0293     virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask,
0294                                               ModuleCallingContext const& moduleCallingContext,
0295                                               Principal const& iPrincipal) const noexcept = 0;
0296 
0297     virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
0298     virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
0299     virtual void implRespondToCloseOutputFile() = 0;
0300 
0301     virtual TaskQueueAdaptor serializeRunModule() = 0;
0302 
0303     bool shouldRethrowException(std::exception_ptr iPtr,
0304                                 ParentContext const& parentContext,
0305                                 bool isEvent,
0306                                 bool isTryToContinue) const noexcept;
0307     void checkForShouldTryToContinue(ModuleDescription const&);
0308 
0309     template <bool IS_EVENT>
0310     bool setPassed() {
0311       if (IS_EVENT) {
0312         timesPassed_.fetch_add(1, std::memory_order_relaxed);
0313       }
0314       state_ = Pass;
0315       return true;
0316     }
0317 
0318     template <bool IS_EVENT>
0319     bool setFailed() {
0320       if (IS_EVENT) {
0321         timesFailed_.fetch_add(1, std::memory_order_relaxed);
0322       }
0323       state_ = Fail;
0324       return false;
0325     }
0326 
0327     template <bool IS_EVENT>
0328     std::exception_ptr setException(std::exception_ptr iException) {
0329       if (IS_EVENT) {
0330         timesExcept_.fetch_add(1, std::memory_order_relaxed);
0331       }
0332       cached_exception_ = iException;  // propagate_const<T> has no reset() function
0333       state_ = Exception;
0334       return cached_exception_;
0335     }
0336 
0337     template <typename T>
0338     void prefetchAsync(WaitingTaskHolder,
0339                        ServiceToken const&,
0340                        ParentContext const&,
0341                        typename T::TransitionInfoType const&,
0342                        Transition) noexcept;
0343 
0344     void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const&, Transition, ServiceToken const&) noexcept;
0345     void edPrefetchAsync(WaitingTaskHolder, ServiceToken const&, Principal const&) const noexcept;
0346 
0347     bool needsESPrefetching(Transition iTrans) const noexcept {
0348       return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
0349     }
0350 
0351     void emitPostModuleEventPrefetchingSignal() {
0352       actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0353     }
0354 
0355     void emitPostModuleStreamPrefetchingSignal() {
0356       actReg_->postModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),
0357                                                        moduleCallingContext_);
0358     }
0359 
0360     void emitPostModuleGlobalPrefetchingSignal() {
0361       actReg_->postModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(),
0362                                                        moduleCallingContext_);
0363     }
0364 
0365     virtual bool hasAcquire() const noexcept = 0;
0366 
0367     template <typename T>
0368     std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr,
0369                                                    typename T::TransitionInfoType const&,
0370                                                    StreamID,
0371                                                    ParentContext const&,
0372                                                    typename T::Context const*) noexcept;
0373 
0374     // runAcquire() must take a copy of WaitingTaskHolder
0375     // see comment in runAcquireAfterAsyncPrefetch() definition
0376     void runAcquire(EventTransitionInfo const&, ParentContext const&, WaitingTaskHolder);
0377 
0378     void runAcquireAfterAsyncPrefetch(std::exception_ptr,
0379                                       EventTransitionInfo const&,
0380                                       ParentContext const&,
0381                                       WaitingTaskHolder) noexcept;
0382 
0383     std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr,
0384                                                    ParentContext const& parentContext) noexcept;
0385 
0386     template <typename T>
0387     class RunModuleTask : public WaitingTask {
0388     public:
0389       RunModuleTask(Worker* worker,
0390                     typename T::TransitionInfoType const& transitionInfo,
0391                     ServiceToken const& token,
0392                     StreamID streamID,
0393                     ParentContext const& parentContext,
0394                     typename T::Context const* context,
0395                     oneapi::tbb::task_group* iGroup) noexcept
0396           : m_worker(worker),
0397             m_transitionInfo(transitionInfo),
0398             m_streamID(streamID),
0399             m_parentContext(parentContext),
0400             m_context(context),
0401             m_serviceToken(token),
0402             m_group(iGroup) {}
0403 
0404       struct EnableQueueGuard {
0405         SerialTaskQueue* queue_;
0406         EnableQueueGuard(SerialTaskQueue* iQueue) : queue_{iQueue} {}
0407         EnableQueueGuard(EnableQueueGuard const&) = delete;
0408         EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
0409         EnableQueueGuard& operator=(EnableQueueGuard&&) = delete;
0410         EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
0411         ~EnableQueueGuard() {
0412           if (queue_) {
0413             queue_->resume();
0414           }
0415         }
0416       };
0417 
0418       void execute() final {
0419         //Need to make the services available early so other services can see them
0420         ServiceRegistry::Operate guard(m_serviceToken.lock());
0421 
0422         //incase the emit causes an exception, we need a memory location
0423         // to hold the exception_ptr
0424         std::exception_ptr temp_excptr;
0425         auto excptr = exceptionPtr();
0426         if constexpr (T::isEvent_) {
0427           if (!m_worker->hasAcquire()) {
0428             // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskList
0429             CMS_SA_ALLOW try {
0430               //pre was called in prefetchAsync
0431               m_worker->emitPostModuleEventPrefetchingSignal();
0432             } catch (...) {
0433               temp_excptr = std::current_exception();
0434               if (not excptr) {
0435                 excptr = temp_excptr;
0436               }
0437             }
0438           }
0439         } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
0440           m_worker->emitPostModuleStreamPrefetchingSignal();
0441         } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
0442           m_worker->emitPostModuleGlobalPrefetchingSignal();
0443         }
0444 
0445         if (not excptr) {
0446           if (auto queue = m_worker->serializeRunModule()) {
0447             auto f = [worker = m_worker,
0448                       info = m_transitionInfo,
0449                       streamID = m_streamID,
0450                       parentContext = m_parentContext,
0451                       sContext = m_context,
0452                       serviceToken = m_serviceToken]() {
0453               //Need to make the services available
0454               ServiceRegistry::Operate operateRunModule(serviceToken.lock());
0455 
0456               //If needed, we pause the queue in begin transition and resume it
0457               // at the end transition. This can guarantee that the module
0458               // only processes one run or lumi at a time
0459               EnableQueueGuard enableQueueGuard{workerhelper::CallImpl<T>::enableGlobalQueue(worker)};
0460               std::exception_ptr ptr;
0461               worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
0462             };
0463             //keep another global transition from running if necessary
0464             auto gQueue = workerhelper::CallImpl<T>::pauseGlobalQueue(m_worker);
0465             if (gQueue) {
0466               gQueue->push(*m_group, [queue, gQueue, f, group = m_group]() mutable {
0467                 gQueue->pause();
0468                 queue.push(*group, std::move(f));
0469               });
0470             } else {
0471               queue.push(*m_group, std::move(f));
0472             }
0473             return;
0474           }
0475         }
0476 
0477         m_worker->runModuleAfterAsyncPrefetch<T>(excptr, m_transitionInfo, m_streamID, m_parentContext, m_context);
0478       }
0479 
0480     private:
0481       Worker* m_worker;
0482       typename T::TransitionInfoType m_transitionInfo;
0483       StreamID m_streamID;
0484       ParentContext const m_parentContext;
0485       typename T::Context const* m_context;
0486       ServiceWeakToken m_serviceToken;
0487       oneapi::tbb::task_group* m_group;
0488     };
0489 
0490     // AcquireTask is only used for the Event case, but we define
0491     // it as a template so all cases will compile.
0492     // DUMMY exists to work around the C++ Standard prohibition on
0493     // fully specializing templates nested in other classes.
0494     template <typename T, typename DUMMY = void>
0495     class AcquireTask : public WaitingTask {
0496     public:
0497       AcquireTask(Worker*,
0498                   typename T::TransitionInfoType const&,
0499                   ServiceToken const&,
0500                   ParentContext const&,
0501                   WaitingTaskHolder) noexcept {}
0502       void execute() final {}
0503     };
0504 
0505     template <typename DUMMY>
0506     class AcquireTask<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>, DUMMY> : public WaitingTask {
0507     public:
0508       AcquireTask(Worker* worker,
0509                   EventTransitionInfo const& eventTransitionInfo,
0510                   ServiceToken const& token,
0511                   ParentContext const& parentContext,
0512                   WaitingTaskHolder holder) noexcept
0513           : m_worker(worker),
0514             m_eventTransitionInfo(eventTransitionInfo),
0515             m_parentContext(parentContext),
0516             m_holder(std::move(holder)),
0517             m_serviceToken(token) {}
0518 
0519       void execute() final {
0520         //Need to make the services available early so other services can see them
0521         ServiceRegistry::Operate guard(m_serviceToken.lock());
0522 
0523         //incase the emit causes an exception, we need a memory location
0524         // to hold the exception_ptr
0525         std::exception_ptr temp_excptr;
0526         auto excptr = exceptionPtr();
0527         // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskHolder
0528         CMS_SA_ALLOW try {
0529           //pre was called in prefetchAsync
0530           m_worker->emitPostModuleEventPrefetchingSignal();
0531         } catch (...) {
0532           temp_excptr = std::current_exception();
0533           if (not excptr) {
0534             excptr = temp_excptr;
0535           }
0536         }
0537 
0538         if (not excptr) {
0539           if (auto queue = m_worker->serializeRunModule()) {
0540             queue.push(*m_holder.group(),
0541                        [worker = m_worker,
0542                         info = m_eventTransitionInfo,
0543                         parentContext = m_parentContext,
0544                         serviceToken = m_serviceToken,
0545                         holder = std::move(m_holder)]() mutable {
0546                          //Need to make the services available
0547                          ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
0548 
0549                          std::exception_ptr ptr;
0550                          worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, std::move(holder));
0551                        });
0552             return;
0553           }
0554         }
0555 
0556         m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
0557       }
0558 
0559     private:
0560       Worker* m_worker;
0561       EventTransitionInfo m_eventTransitionInfo;
0562       ParentContext const m_parentContext;
0563       WaitingTaskHolder m_holder;
0564       ServiceWeakToken m_serviceToken;
0565     };
0566 
0567     // This class does nothing unless there is an exception originating
0568     // in an "External Worker". In that case, it handles converting the
0569     // exception to a CMS exception and adding context to the exception.
0570     class HandleExternalWorkExceptionTask : public WaitingTask {
0571     public:
0572       HandleExternalWorkExceptionTask(Worker* worker,
0573                                       oneapi::tbb::task_group* group,
0574                                       WaitingTask* runModuleTask,
0575                                       ParentContext const& parentContext) noexcept;
0576 
0577       void execute() final;
0578 
0579     private:
0580       Worker* m_worker;
0581       WaitingTask* m_runModuleTask;
0582       oneapi::tbb::task_group* m_group;
0583       ParentContext const m_parentContext;
0584     };
0585 
0586     std::atomic<int> timesRun_;
0587     std::atomic<int> timesVisited_;
0588     std::atomic<int> timesPassed_;
0589     std::atomic<int> timesFailed_;
0590     std::atomic<int> timesExcept_;
0591     std::atomic<State> state_;
0592     int numberOfPathsOn_;
0593     std::atomic<int> numberOfPathsLeftToRun_;
0594 
0595     ModuleCallingContext moduleCallingContext_;
0596 
0597     ExceptionToActionTable const* actions_;                         // memory assumed to be managed elsewhere
0598     CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_;  // if state is 'exception'
0599 
0600     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0601 
0602     edm::propagate_const<EarlyDeleteHelper*> earlyDeleteHelper_;
0603 
0604     edm::WaitingTaskList waitingTasks_;
0605     std::atomic<bool> workStarted_;
0606     bool ranAcquireWithoutException_;
0607     bool moduleValid_ = true;
0608     bool shouldTryToContinue_ = false;
0609     bool beginSucceeded_ = false;
0610   };
0611   namespace workerhelper {
0612     template <>
0613     class CallImpl<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>> {
0614     public:
0615       typedef OccurrenceTraits<EventPrincipal, BranchActionStreamBegin> Arg;
0616       static bool call(Worker* iWorker,
0617                        StreamID,
0618                        EventTransitionInfo const& info,
0619                        ActivityRegistry* /* actReg */,
0620                        ModuleCallingContext const* mcc,
0621                        Arg::Context const* /* context*/) {
0622         //Signal sentry is handled by the module
0623         return iWorker->implDo(info, mcc);
0624       }
0625       static void esPrefetchAsync(Worker* worker,
0626                                   WaitingTaskHolder waitingTask,
0627                                   ServiceToken const& token,
0628                                   EventTransitionInfo const& info,
0629                                   Transition transition) noexcept {
0630         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0631       }
0632       static bool wantsTransition(Worker const* iWorker) noexcept { return true; }
0633       static bool needToRunSelection(Worker const* iWorker) noexcept { return iWorker->implNeedToRunSelection(); }
0634 
0635       static SerialTaskQueue* pauseGlobalQueue(Worker*) noexcept { return nullptr; }
0636       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0637     };
0638 
0639     template <>
0640     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>> {
0641     public:
0642       typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Arg;
0643       static bool call(Worker* iWorker,
0644                        StreamID,
0645                        RunTransitionInfo const& info,
0646                        ActivityRegistry* actReg,
0647                        ModuleCallingContext const* mcc,
0648                        Arg::Context const* context) {
0649         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0650         // If preModuleSignal() throws, implDoBegin() is not called, and the
0651         // cpp destructor calls postModuleSignal (ignoring additional exceptions)
0652         cpp.preModuleSignal();
0653         // If implDoBegin() throws, the cpp destructor calls postModuleSignal
0654         // (ignoring additional exceptions)
0655         auto returnValue = iWorker->implDoBegin(info, mcc);
0656         // If postModuleSignal() throws, the exception will propagate to the framework
0657         cpp.postModuleSignal();
0658         iWorker->beginSucceeded_ = true;
0659         return returnValue;
0660       }
0661       static void esPrefetchAsync(Worker* worker,
0662                                   WaitingTaskHolder waitingTask,
0663                                   ServiceToken const& token,
0664                                   RunTransitionInfo const& info,
0665                                   Transition transition) noexcept {
0666         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0667       }
0668       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalRuns(); }
0669       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0670       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return iWorker->globalRunsQueue(); }
0671       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0672     };
0673     template <>
0674     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>> {
0675     public:
0676       typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Arg;
0677       static bool call(Worker* iWorker,
0678                        StreamID id,
0679                        RunTransitionInfo const& info,
0680                        ActivityRegistry* actReg,
0681                        ModuleCallingContext const* mcc,
0682                        Arg::Context const* context) {
0683         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0684         cpp.preModuleSignal();
0685         auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
0686         cpp.postModuleSignal();
0687         iWorker->beginSucceeded_ = true;
0688         return returnValue;
0689       }
0690       static void esPrefetchAsync(Worker* worker,
0691                                   WaitingTaskHolder waitingTask,
0692                                   ServiceToken const& token,
0693                                   RunTransitionInfo const& info,
0694                                   Transition transition) noexcept {
0695         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0696       }
0697       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamRuns(); }
0698       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0699       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0700       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0701     };
0702     template <>
0703     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>> {
0704     public:
0705       typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Arg;
0706       static bool call(Worker* iWorker,
0707                        StreamID,
0708                        RunTransitionInfo const& info,
0709                        ActivityRegistry* actReg,
0710                        ModuleCallingContext const* mcc,
0711                        Arg::Context const* context) {
0712         if (iWorker->beginSucceeded_) {
0713           iWorker->beginSucceeded_ = false;
0714 
0715           ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0716           cpp.preModuleSignal();
0717           auto returnValue = iWorker->implDoEnd(info, mcc);
0718           cpp.postModuleSignal();
0719           return returnValue;
0720         }
0721         return true;
0722       }
0723       static void esPrefetchAsync(Worker* worker,
0724                                   WaitingTaskHolder waitingTask,
0725                                   ServiceToken const& token,
0726                                   RunTransitionInfo const& info,
0727                                   Transition transition) noexcept {
0728         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0729       }
0730       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalRuns(); }
0731       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0732       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0733       static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) noexcept { return iWorker->globalRunsQueue(); }
0734     };
0735     template <>
0736     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>> {
0737     public:
0738       typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Arg;
0739       static bool call(Worker* iWorker,
0740                        StreamID id,
0741                        RunTransitionInfo const& info,
0742                        ActivityRegistry* actReg,
0743                        ModuleCallingContext const* mcc,
0744                        Arg::Context const* context) {
0745         if (iWorker->beginSucceeded_) {
0746           iWorker->beginSucceeded_ = false;
0747 
0748           ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0749           cpp.preModuleSignal();
0750           auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
0751           cpp.postModuleSignal();
0752           return returnValue;
0753         }
0754         return true;
0755       }
0756       static void esPrefetchAsync(Worker* worker,
0757                                   WaitingTaskHolder waitingTask,
0758                                   ServiceToken const& token,
0759                                   RunTransitionInfo const& info,
0760                                   Transition transition) noexcept {
0761         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0762       }
0763       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamRuns(); }
0764       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0765       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0766       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0767     };
0768 
0769     template <>
0770     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>> {
0771     public:
0772       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0773       static bool call(Worker* iWorker,
0774                        StreamID,
0775                        LumiTransitionInfo const& info,
0776                        ActivityRegistry* actReg,
0777                        ModuleCallingContext const* mcc,
0778                        Arg::Context const* context) {
0779         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0780         cpp.preModuleSignal();
0781         auto returnValue = iWorker->implDoBegin(info, mcc);
0782         cpp.postModuleSignal();
0783         iWorker->beginSucceeded_ = true;
0784         return returnValue;
0785       }
0786       static void esPrefetchAsync(Worker* worker,
0787                                   WaitingTaskHolder waitingTask,
0788                                   ServiceToken const& token,
0789                                   LumiTransitionInfo const& info,
0790                                   Transition transition) noexcept {
0791         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0792       }
0793       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalLuminosityBlocks(); }
0794       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0795       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept {
0796         return iWorker->globalLuminosityBlocksQueue();
0797       }
0798       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0799     };
0800     template <>
0801     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>> {
0802     public:
0803       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0804       static bool call(Worker* iWorker,
0805                        StreamID id,
0806                        LumiTransitionInfo const& info,
0807                        ActivityRegistry* actReg,
0808                        ModuleCallingContext const* mcc,
0809                        Arg::Context const* context) {
0810         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0811         cpp.preModuleSignal();
0812         auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
0813         cpp.postModuleSignal();
0814         iWorker->beginSucceeded_ = true;
0815         return returnValue;
0816       }
0817       static void esPrefetchAsync(Worker* worker,
0818                                   WaitingTaskHolder waitingTask,
0819                                   ServiceToken const& token,
0820                                   LumiTransitionInfo const& info,
0821                                   Transition transition) noexcept {
0822         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0823       }
0824       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamLuminosityBlocks(); }
0825       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0826       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0827       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0828     };
0829 
0830     template <>
0831     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>> {
0832     public:
0833       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0834       static bool call(Worker* iWorker,
0835                        StreamID,
0836                        LumiTransitionInfo const& info,
0837                        ActivityRegistry* actReg,
0838                        ModuleCallingContext const* mcc,
0839                        Arg::Context const* context) {
0840         if (iWorker->beginSucceeded_) {
0841           iWorker->beginSucceeded_ = false;
0842 
0843           ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0844           cpp.preModuleSignal();
0845           auto returnValue = iWorker->implDoEnd(info, mcc);
0846           cpp.postModuleSignal();
0847           return returnValue;
0848         }
0849         return true;
0850       }
0851       static void esPrefetchAsync(Worker* worker,
0852                                   WaitingTaskHolder waitingTask,
0853                                   ServiceToken const& token,
0854                                   LumiTransitionInfo const& info,
0855                                   Transition transition) noexcept {
0856         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0857       }
0858       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalLuminosityBlocks(); }
0859       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0860       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0861       static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) noexcept {
0862         return iWorker->globalLuminosityBlocksQueue();
0863       }
0864     };
0865     template <>
0866     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>> {
0867     public:
0868       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0869       static bool call(Worker* iWorker,
0870                        StreamID id,
0871                        LumiTransitionInfo const& info,
0872                        ActivityRegistry* actReg,
0873                        ModuleCallingContext const* mcc,
0874                        Arg::Context const* context) {
0875         if (iWorker->beginSucceeded_) {
0876           iWorker->beginSucceeded_ = false;
0877 
0878           ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0879           cpp.preModuleSignal();
0880           auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
0881           cpp.postModuleSignal();
0882           return returnValue;
0883         }
0884         return true;
0885       }
0886       static void esPrefetchAsync(Worker* worker,
0887                                   WaitingTaskHolder waitingTask,
0888                                   ServiceToken const& token,
0889                                   LumiTransitionInfo const& info,
0890                                   Transition transition) noexcept {
0891         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0892       }
0893       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamLuminosityBlocks(); }
0894       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0895       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0896       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0897     };
0898     template <>
0899     class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>> {
0900     public:
0901       using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0902       static bool call(Worker* iWorker,
0903                        StreamID,
0904                        ProcessBlockTransitionInfo const& info,
0905                        ActivityRegistry* actReg,
0906                        ModuleCallingContext const* mcc,
0907                        Arg::Context const* context) {
0908         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0909         cpp.preModuleSignal();
0910         auto returnValue = iWorker->implDoBeginProcessBlock(info.principal(), mcc);
0911         cpp.postModuleSignal();
0912         iWorker->beginSucceeded_ = true;
0913         return returnValue;
0914       }
0915       static void esPrefetchAsync(
0916           Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
0917       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsProcessBlocks(); }
0918       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0919       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0920       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0921     };
0922     template <>
0923     class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>> {
0924     public:
0925       using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
0926       static bool call(Worker* iWorker,
0927                        StreamID,
0928                        ProcessBlockTransitionInfo const& info,
0929                        ActivityRegistry* actReg,
0930                        ModuleCallingContext const* mcc,
0931                        Arg::Context const* context) {
0932         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0933         cpp.preModuleSignal();
0934         auto returnValue = iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
0935         cpp.postModuleSignal();
0936         return returnValue;
0937       }
0938       static void esPrefetchAsync(
0939           Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
0940       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsInputProcessBlocks(); }
0941       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0942       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0943       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0944     };
0945     template <>
0946     class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>> {
0947     public:
0948       using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
0949       static bool call(Worker* iWorker,
0950                        StreamID,
0951                        ProcessBlockTransitionInfo const& info,
0952                        ActivityRegistry* actReg,
0953                        ModuleCallingContext const* mcc,
0954                        Arg::Context const* context) {
0955         if (iWorker->beginSucceeded_) {
0956           iWorker->beginSucceeded_ = false;
0957 
0958           ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0959           cpp.preModuleSignal();
0960           auto returnValue = iWorker->implDoEndProcessBlock(info.principal(), mcc);
0961           cpp.postModuleSignal();
0962           return returnValue;
0963         }
0964         return true;
0965       }
0966       static void esPrefetchAsync(
0967           Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
0968       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsProcessBlocks(); }
0969       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0970       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0971       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0972     };
0973   }  // namespace workerhelper
0974 
0975   template <typename T>
0976   void Worker::prefetchAsync(WaitingTaskHolder iTask,
0977                              ServiceToken const& token,
0978                              ParentContext const& parentContext,
0979                              typename T::TransitionInfoType const& transitionInfo,
0980                              Transition iTransition) noexcept {
0981     Principal const& principal = transitionInfo.principal();
0982 
0983     moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
0984 
0985     if constexpr (T::isEvent_) {
0986       actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0987     } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
0988       actReg_->preModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0989     } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
0990       actReg_->preModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(), moduleCallingContext_);
0991     }
0992 
0993     workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
0994     edPrefetchAsync(iTask, token, principal);
0995 
0996     if (principal.branchType() == InEvent) {
0997       preActionBeforeRunEventAsync(iTask, moduleCallingContext_, principal);
0998     }
0999   }
1000 
1001   template <typename T>
1002   void Worker::doWorkAsync(WaitingTaskHolder task,
1003                            typename T::TransitionInfoType const& transitionInfo,
1004                            ServiceToken const& token,
1005                            StreamID streamID,
1006                            ParentContext const& parentContext,
1007                            typename T::Context const* context) noexcept {
1008     if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1009       return;
1010     }
1011 
1012     //Need to check workStarted_ before adding to waitingTasks_
1013     bool expected = false;
1014     bool workStarted = workStarted_.compare_exchange_strong(expected, true);
1015 
1016     waitingTasks_.add(task);
1017     if constexpr (T::isEvent_) {
1018       timesVisited_.fetch_add(1, std::memory_order_relaxed);
1019     }
1020 
1021     if (workStarted) {
1022       moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1023 
1024       //if have TriggerResults based selection we want to reject the event before doing prefetching
1025       if (workerhelper::CallImpl<T>::needToRunSelection(this)) {
1026         //We need to run the selection in a different task so that
1027         // we can prefetch the data needed for the selection
1028         WaitingTask* moduleTask =
1029             new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1030 
1031         //make sure the task is either run or destroyed
1032         struct DestroyTask {
1033           DestroyTask(edm::WaitingTask* iTask) noexcept : m_task(iTask) {}
1034 
1035           ~DestroyTask() noexcept {
1036             auto p = m_task.exchange(nullptr);
1037             if (p) {
1038               TaskSentry s{p};
1039             }
1040           }
1041 
1042           edm::WaitingTask* release() noexcept { return m_task.exchange(nullptr); }
1043 
1044         private:
1045           std::atomic<edm::WaitingTask*> m_task;
1046         };
1047         if constexpr (T::isEvent_) {
1048           if (hasAcquire()) {
1049             auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1050             ServiceWeakToken weakToken = token;
1051             auto* group = task.group();
1052             moduleTask = make_waiting_task(
1053                 [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
1054                   WaitingTaskHolder runTaskHolder(
1055                       *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
1056                   AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1057                   t.execute();
1058                 });
1059           }
1060         }
1061         auto* group = task.group();
1062         auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1063         ServiceWeakToken weakToken = token;
1064         auto selectionTask =
1065             make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
1066                                   std::exception_ptr const*) mutable {
1067               ServiceRegistry::Operate guard(weakToken.lock());
1068               prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
1069                                weakToken.lock(),
1070                                parentContext,
1071                                info,
1072                                T::transition_);
1073             });
1074         prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1075       } else {
1076         WaitingTask* moduleTask =
1077             new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1078         auto group = task.group();
1079         if constexpr (T::isEvent_) {
1080           if (hasAcquire()) {
1081             WaitingTaskHolder runTaskHolder(
1082                 *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1083             moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1084           }
1085         }
1086         prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1087       }
1088     }
1089   }
1090 
1091   template <typename T>
1092   std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr iEPtr,
1093                                                          typename T::TransitionInfoType const& transitionInfo,
1094                                                          StreamID streamID,
1095                                                          ParentContext const& parentContext,
1096                                                          typename T::Context const* context) noexcept {
1097     std::exception_ptr exceptionPtr;
1098     bool shouldRun = true;
1099     if (iEPtr) {
1100       if (shouldRethrowException(iEPtr, parentContext, T::isEvent_, shouldTryToContinue_)) {
1101         exceptionPtr = iEPtr;
1102         setException<T::isEvent_>(exceptionPtr);
1103         shouldRun = false;
1104       } else {
1105         if (not shouldTryToContinue_) {
1106           setPassed<T::isEvent_>();
1107           shouldRun = false;
1108         }
1109       }
1110     }
1111     if (shouldRun) {
1112       // Caught exception is propagated via WaitingTaskList
1113       CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1114         exceptionPtr = std::current_exception();
1115       }
1116     } else {
1117       moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
1118     }
1119     waitingTasks_.doneWaiting(exceptionPtr);
1120     return exceptionPtr;
1121   }
1122 
1123   template <typename T>
1124   void Worker::doWorkNoPrefetchingAsync(WaitingTaskHolder task,
1125                                         typename T::TransitionInfoType const& transitionInfo,
1126                                         ServiceToken const& serviceToken,
1127                                         StreamID streamID,
1128                                         ParentContext const& parentContext,
1129                                         typename T::Context const* context) noexcept {
1130     if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1131       return;
1132     }
1133 
1134     //Need to check workStarted_ before adding to waitingTasks_
1135     bool expected = false;
1136     auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1137 
1138     waitingTasks_.add(task);
1139     if (workStarted) {
1140       ServiceWeakToken weakToken = serviceToken;
1141       auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1142         std::exception_ptr exceptionPtr;
1143         // Caught exception is propagated via WaitingTaskList
1144         CMS_SA_ALLOW try {
1145           //Need to make the services available
1146           ServiceRegistry::Operate guard(weakToken.lock());
1147 
1148           this->runModule<T>(info, streamID, parentContext, context);
1149         } catch (...) {
1150           exceptionPtr = std::current_exception();
1151         }
1152         this->waitingTasks_.doneWaiting(exceptionPtr);
1153       };
1154 
1155       if (needsESPrefetching(T::transition_)) {
1156         auto group = task.group();
1157         auto afterPrefetch =
1158             edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1159               if (iExcept) {
1160                 this->waitingTasks_.doneWaiting(*iExcept);
1161               } else {
1162                 if (auto queue = this->serializeRunModule()) {
1163                   queue.push(*group, toDo);
1164                 } else {
1165                   group->run(toDo);
1166                 }
1167               }
1168             });
1169         moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1170         esPrefetchAsync(
1171             WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1172       } else {
1173         auto group = task.group();
1174         if (auto queue = this->serializeRunModule()) {
1175           queue.push(*group, toDo);
1176         } else {
1177           group->run(toDo);
1178         }
1179       }
1180     }
1181   }
1182 
1183   template <typename T>
1184   bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1185                          StreamID streamID,
1186                          ParentContext const& parentContext,
1187                          typename T::Context const* context) {
1188     //unscheduled producers should advance this
1189     //if (T::isEvent_) {
1190     //  ++timesVisited_;
1191     //}
1192     ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1193     if constexpr (T::isEvent_) {
1194       timesRun_.fetch_add(1, std::memory_order_relaxed);
1195     }
1196 
1197     bool rc = true;
1198     try {
1199       convertException::wrap([&]() {
1200         rc = workerhelper::CallImpl<T>::call(
1201             this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1202 
1203         if (rc) {
1204           setPassed<T::isEvent_>();
1205         } else {
1206           setFailed<T::isEvent_>();
1207         }
1208       });
1209     } catch (cms::Exception& ex) {
1210       edm::exceptionContext(ex, moduleCallingContext_);
1211       if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, shouldTryToContinue_)) {
1212         assert(not cached_exception_);
1213         setException<T::isEvent_>(std::current_exception());
1214         std::rethrow_exception(cached_exception_);
1215       } else {
1216         rc = setPassed<T::isEvent_>();
1217       }
1218     }
1219 
1220     return rc;
1221   }
1222 
1223   template <typename T>
1224   std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1225                                                StreamID streamID,
1226                                                ParentContext const& parentContext,
1227                                                typename T::Context const* context) noexcept {
1228     timesVisited_.fetch_add(1, std::memory_order_relaxed);
1229     std::exception_ptr prefetchingException;  // null because there was no prefetching to do
1230     return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1231   }
1232 }  // namespace edm
1233 #endif