Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-06-20 01:53:17

0001 #ifndef FWCore_Framework_Worker_h
0002 #define FWCore_Framework_Worker_h
0003 
0004 /*----------------------------------------------------------------------
0005 
0006 Worker: this is a basic scheduling unit - an abstract base class to
0007 something that is really a producer or filter.
0008 
0009 A worker will not actually call through to the module unless it is
0010 in a Ready state.  After a module is actually run, the state will not
0011 be Ready.  The Ready state can only be reestablished by doing a reset().
0012 
0013 Pre/post module signals are posted only in the Ready state.
0014 
0015 Execution statistics are kept here.
0016 
0017 If a module has thrown an exception during execution, that exception
0018 will be rethrown if the worker is entered again and the state is not Ready.
0019 In other words, execution results (status) are cached and reused until
0020 the worker is reset().
0021 
0022 ----------------------------------------------------------------------*/
0023 
0024 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0025 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0026 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0027 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0028 #include "FWCore/Framework/interface/maker/WorkerParams.h"
0029 #include "FWCore/Framework/interface/ExceptionActions.h"
0030 #include "FWCore/Framework/interface/ModuleContextSentry.h"
0031 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0032 #include "FWCore/Framework/interface/ProductResolverIndexAndSkipBit.h"
0033 #include "FWCore/Framework/interface/ModuleConsumesMinimalESInfo.h"
0034 #include "FWCore/Concurrency/interface/WaitingTask.h"
0035 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0036 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0037 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0038 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0039 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0040 #include "FWCore/ServiceRegistry/interface/InternalContext.h"
0041 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0042 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0043 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0044 #include "FWCore/ServiceRegistry/interface/PlaceInPathContext.h"
0045 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0046 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0047 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0048 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0049 #include "FWCore/Concurrency/interface/FunctorTask.h"
0050 #include "FWCore/Utilities/interface/Exception.h"
0051 #include "FWCore/Utilities/interface/ConvertException.h"
0052 #include "FWCore/Utilities/interface/BranchType.h"
0053 #include "FWCore/Utilities/interface/ProductResolverIndex.h"
0054 #include "FWCore/Utilities/interface/StreamID.h"
0055 #include "FWCore/Utilities/interface/propagate_const.h"
0056 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0057 #include "FWCore/Utilities/interface/ESIndices.h"
0058 #include "FWCore/Utilities/interface/Transition.h"
0059 
0060 #include "FWCore/Framework/interface/Frameworkfwd.h"
0061 
0062 #include <array>
0063 #include <atomic>
0064 #include <cassert>
0065 #include <map>
0066 #include <memory>
0067 #include <sstream>
0068 #include <string>
0069 #include <vector>
0070 #include <exception>
0071 #include <unordered_map>
0072 
0073 namespace edm {
0074   class EventPrincipal;
0075   class EventSetupImpl;
0076   class EarlyDeleteHelper;
0077   class ProductResolverIndexHelper;
0078   class ProductResolverIndexAndSkipBit;
0079   class ProductRegistry;
0080   class ThinnedAssociationsHelper;
0081 
0082   namespace workerhelper {
0083     template <typename O>
0084     class CallImpl;
0085   }
0086   namespace eventsetup {
0087     struct ComponentDescription;
0088     class ESRecordsToProductResolverIndices;
0089   }  // namespace eventsetup
0090 
0091   class Worker {
0092   public:
0093     enum State { Ready, Pass, Fail, Exception };
0094     enum Types { kAnalyzer, kFilter, kProducer, kOutputModule };
0095     enum ConcurrencyTypes { kGlobal, kLimited, kOne, kStream };
0096     struct TaskQueueAdaptor {
0097       SerialTaskQueueChain* serial_ = nullptr;
0098       LimitedTaskQueue* limited_ = nullptr;
0099 
0100       TaskQueueAdaptor() = default;
0101       TaskQueueAdaptor(SerialTaskQueueChain* iChain) : serial_(iChain) {}
0102       TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
0103 
0104       operator bool() { return serial_ != nullptr or limited_ != nullptr; }
0105 
0106       template <class F>
0107       void push(oneapi::tbb::task_group& iG, F&& iF) {
0108         if (serial_) {
0109           serial_->push(iG, iF);
0110         } else {
0111           limited_->push(iG, iF);
0112         }
0113       }
0114     };
0115 
0116     Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
0117     virtual ~Worker();
0118 
0119     Worker(Worker const&) = delete;             // Disallow copying and moving
0120     Worker& operator=(Worker const&) = delete;  // Disallow copying and moving
0121 
0122     void clearModule() {
0123       moduleValid_ = false;
0124       doClearModule();
0125     }
0126 
0127     virtual bool wantsProcessBlocks() const noexcept = 0;
0128     virtual bool wantsInputProcessBlocks() const noexcept = 0;
0129     virtual bool wantsGlobalRuns() const noexcept = 0;
0130     virtual bool wantsGlobalLuminosityBlocks() const noexcept = 0;
0131     virtual bool wantsStreamRuns() const noexcept = 0;
0132     virtual bool wantsStreamLuminosityBlocks() const noexcept = 0;
0133 
0134     //returns non-nullptr if the module can only process one Run at a time
0135     virtual SerialTaskQueue* globalRunsQueue() = 0;
0136     //returns non-nullptr if the module can only process one LuminosityBlock at a time
0137     virtual SerialTaskQueue* globalLuminosityBlocksQueue() = 0;
0138 
0139     //Run only for OutputModules. Causes TriggerResults information to be used to decide if module should run.
0140     void prePrefetchSelectionAsync(oneapi::tbb::task_group&,
0141                                    WaitingTask* task,
0142                                    ServiceToken const&,
0143                                    StreamID stream,
0144                                    EventPrincipal const*) noexcept;
0145 
0146     void prePrefetchSelectionAsync(
0147         oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, void const*) noexcept {
0148       assert(false);
0149     }
0150 
0151     template <typename T>
0152     void doWorkAsync(WaitingTaskHolder,
0153                      typename T::TransitionInfoType const&,
0154                      ServiceToken const&,
0155                      StreamID,
0156                      ParentContext const&,
0157                      typename T::Context const*) noexcept;
0158 
0159     template <typename T>
0160     void doWorkNoPrefetchingAsync(WaitingTaskHolder,
0161                                   typename T::TransitionInfoType const&,
0162                                   ServiceToken const&,
0163                                   StreamID,
0164                                   ParentContext const&,
0165                                   typename T::Context const*) noexcept;
0166 
0167     template <typename T>
0168     std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
0169                                          StreamID,
0170                                          ParentContext const&,
0171                                          typename T::Context const*) noexcept;
0172 
0173     virtual size_t transformIndex(edm::ProductDescription const&) const noexcept = 0;
0174     void doTransformAsync(WaitingTaskHolder,
0175                           size_t iTransformIndex,
0176                           EventPrincipal const&,
0177                           ServiceToken const&,
0178                           StreamID,
0179                           ModuleCallingContext const&,
0180                           StreamContext const*) noexcept;
0181 
0182     void callWhenDoneAsync(WaitingTaskHolder task) { waitingTasks_.add(std::move(task)); }
0183     // Called if filter earlier in the path has failed.
0184     void skipOnPath(EventPrincipal const& iEvent);
0185     void beginJob(GlobalContext const&);
0186     void endJob(GlobalContext const&);
0187     void beginStream(StreamID, StreamContext const&);
0188     void endStream(StreamID, StreamContext const&);
0189     void respondToOpenInputFile(FileBlock const& fb) { implRespondToOpenInputFile(fb); }
0190     void respondToCloseInputFile(FileBlock const& fb) { implRespondToCloseInputFile(fb); }
0191     void respondToCloseOutputFile() { implRespondToCloseOutputFile(); }
0192 
0193     void reset() {
0194       cached_exception_ = std::exception_ptr();
0195       state_ = Ready;
0196       waitingTasks_.reset();
0197       workStarted_ = false;
0198       numberOfPathsLeftToRun_ = numberOfPathsOn_;
0199     }
0200 
0201     void postDoEvent(EventPrincipal const&);
0202 
0203     ModuleDescription const* description() const noexcept {
0204       if (moduleValid_) {
0205         return moduleCallingContext_.moduleDescription();
0206       }
0207       return nullptr;
0208     }
0209     ///The signals are required to live longer than the last call to 'doWork'
0210     /// this was done to improve performance based on profiling
0211     void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
0212 
0213     void setEarlyDeleteHelper(EarlyDeleteHelper* iHelper);
0214 
0215     virtual std::vector<ModuleConsumesInfo> moduleConsumesInfos() const = 0;
0216     virtual std::vector<ModuleConsumesMinimalESInfo> moduleConsumesMinimalESInfos() const = 0;
0217 
0218     virtual Types moduleType() const = 0;
0219     virtual ConcurrencyTypes moduleConcurrencyType() const = 0;
0220 
0221     void clearCounters() noexcept {
0222       timesRun_.store(0, std::memory_order_release);
0223       timesVisited_.store(0, std::memory_order_release);
0224       timesPassed_.store(0, std::memory_order_release);
0225       timesFailed_.store(0, std::memory_order_release);
0226       timesExcept_.store(0, std::memory_order_release);
0227     }
0228 
0229     void addedToPath() noexcept { ++numberOfPathsOn_; }
0230     //NOTE: calling state() is done to force synchronization across threads
0231     int timesRun() const noexcept { return timesRun_.load(std::memory_order_acquire); }
0232     int timesVisited() const noexcept { return timesVisited_.load(std::memory_order_acquire); }
0233     int timesPassed() const noexcept { return timesPassed_.load(std::memory_order_acquire); }
0234     int timesFailed() const noexcept { return timesFailed_.load(std::memory_order_acquire); }
0235     int timesExcept() const noexcept { return timesExcept_.load(std::memory_order_acquire); }
0236     State state() const noexcept { return state_; }
0237 
0238     int timesPass() const noexcept { return timesPassed(); }  // for backward compatibility only - to be removed soon
0239 
0240     virtual bool hasAccumulator() const noexcept = 0;
0241 
0242     virtual bool matchesBaseClassPointer(void const* iPtr) const noexcept = 0;
0243     // Used in PuttableProductResolver
0244     edm::WaitingTaskList& waitingTaskList() noexcept { return waitingTasks_; }
0245 
0246   protected:
0247     template <typename O>
0248     friend class workerhelper::CallImpl;
0249 
0250     virtual void doClearModule() = 0;
0251 
0252     virtual std::string workerType() const = 0;
0253     virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;
0254 
0255     virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0256     virtual bool implNeedToRunSelection() const noexcept = 0;
0257 
0258     virtual void implDoAcquire(EventTransitionInfo const&, ModuleCallingContext const*, WaitingTaskHolder&&) = 0;
0259 
0260     virtual void implDoTransformAsync(WaitingTaskHolder,
0261                                       size_t iTransformIndex,
0262                                       EventPrincipal const&,
0263                                       ParentContext const&,
0264                                       ServiceWeakToken const&) noexcept = 0;
0265     virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const noexcept = 0;
0266 
0267     virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const&, ModuleCallingContext const*) = 0;
0268     virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0269     virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0270     virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0271     virtual bool implDoBegin(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0272     virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0273     virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0274     virtual bool implDoEnd(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0275     virtual bool implDoBegin(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0276     virtual bool implDoStreamBegin(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0277     virtual bool implDoStreamEnd(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0278     virtual bool implDoEnd(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0279     virtual void implBeginJob() = 0;
0280     virtual void implEndJob() = 0;
0281     virtual void implBeginStream(StreamID) = 0;
0282     virtual void implEndStream(StreamID) = 0;
0283 
0284     void resetModuleDescription(ModuleDescription const*);
0285 
0286     ActivityRegistry* activityRegistry() { return actReg_.get(); }
0287 
0288   private:
0289     template <typename T>
0290     bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
0291 
0292     virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0293     virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0294 
0295     virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
0296 
0297     virtual std::vector<ESResolverIndex> const& esItemsToGetFrom(Transition) const = 0;
0298     virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
0299 
0300     virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask,
0301                                               ModuleCallingContext const& moduleCallingContext,
0302                                               Principal const& iPrincipal) const noexcept = 0;
0303 
0304     virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
0305     virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
0306     virtual void implRespondToCloseOutputFile() = 0;
0307 
0308     virtual TaskQueueAdaptor serializeRunModule() = 0;
0309 
0310     bool shouldRethrowException(std::exception_ptr iPtr,
0311                                 ParentContext const& parentContext,
0312                                 bool isEvent,
0313                                 bool isTryToContinue) const noexcept;
0314     void checkForShouldTryToContinue(ModuleDescription const&);
0315 
0316     template <bool IS_EVENT>
0317     bool setPassed() {
0318       if (IS_EVENT) {
0319         timesPassed_.fetch_add(1, std::memory_order_relaxed);
0320       }
0321       state_ = Pass;
0322       return true;
0323     }
0324 
0325     template <bool IS_EVENT>
0326     bool setFailed() {
0327       if (IS_EVENT) {
0328         timesFailed_.fetch_add(1, std::memory_order_relaxed);
0329       }
0330       state_ = Fail;
0331       return false;
0332     }
0333 
0334     template <bool IS_EVENT>
0335     std::exception_ptr setException(std::exception_ptr iException) {
0336       if (IS_EVENT) {
0337         timesExcept_.fetch_add(1, std::memory_order_relaxed);
0338       }
0339       cached_exception_ = iException;  // propagate_const<T> has no reset() function
0340       state_ = Exception;
0341       return cached_exception_;
0342     }
0343 
0344     template <typename T>
0345     void prefetchAsync(WaitingTaskHolder,
0346                        ServiceToken const&,
0347                        ParentContext const&,
0348                        typename T::TransitionInfoType const&,
0349                        Transition) noexcept;
0350 
0351     void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const&, Transition, ServiceToken const&) noexcept;
0352     void edPrefetchAsync(WaitingTaskHolder, ServiceToken const&, Principal const&) const noexcept;
0353 
0354     bool needsESPrefetching(Transition iTrans) const noexcept {
0355       return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
0356     }
0357 
0358     void emitPostModuleEventPrefetchingSignal() {
0359       actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0360     }
0361 
0362     void emitPostModuleStreamPrefetchingSignal() {
0363       actReg_->postModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),
0364                                                        moduleCallingContext_);
0365     }
0366 
0367     void emitPostModuleGlobalPrefetchingSignal() {
0368       actReg_->postModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(),
0369                                                        moduleCallingContext_);
0370     }
0371 
0372     virtual bool hasAcquire() const noexcept = 0;
0373 
0374     template <typename T>
0375     std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr,
0376                                                    typename T::TransitionInfoType const&,
0377                                                    StreamID,
0378                                                    ParentContext const&,
0379                                                    typename T::Context const*) noexcept;
0380 
0381     // runAcquire() must take a copy of WaitingTaskHolder
0382     // see comment in runAcquireAfterAsyncPrefetch() definition
0383     void runAcquire(EventTransitionInfo const&, ParentContext const&, WaitingTaskHolder);
0384 
0385     void runAcquireAfterAsyncPrefetch(std::exception_ptr,
0386                                       EventTransitionInfo const&,
0387                                       ParentContext const&,
0388                                       WaitingTaskHolder) noexcept;
0389 
0390     std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr,
0391                                                    ParentContext const& parentContext) noexcept;
0392 
0393     template <typename T>
0394     class RunModuleTask : public WaitingTask {
0395     public:
0396       RunModuleTask(Worker* worker,
0397                     typename T::TransitionInfoType const& transitionInfo,
0398                     ServiceToken const& token,
0399                     StreamID streamID,
0400                     ParentContext const& parentContext,
0401                     typename T::Context const* context,
0402                     oneapi::tbb::task_group* iGroup) noexcept
0403           : m_worker(worker),
0404             m_transitionInfo(transitionInfo),
0405             m_streamID(streamID),
0406             m_parentContext(parentContext),
0407             m_context(context),
0408             m_serviceToken(token),
0409             m_group(iGroup) {}
0410 
0411       struct EnableQueueGuard {
0412         SerialTaskQueue* queue_;
0413         EnableQueueGuard(SerialTaskQueue* iQueue) : queue_{iQueue} {}
0414         EnableQueueGuard(EnableQueueGuard const&) = delete;
0415         EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
0416         EnableQueueGuard& operator=(EnableQueueGuard&&) = delete;
0417         EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
0418         ~EnableQueueGuard() {
0419           if (queue_) {
0420             queue_->resume();
0421           }
0422         }
0423       };
0424 
0425       void execute() final {
0426         //Need to make the services available early so other services can see them
0427         ServiceRegistry::Operate guard(m_serviceToken.lock());
0428 
0429         //incase the emit causes an exception, we need a memory location
0430         // to hold the exception_ptr
0431         std::exception_ptr temp_excptr;
0432         auto excptr = exceptionPtr();
0433         if constexpr (T::isEvent_) {
0434           if (!m_worker->hasAcquire()) {
0435             // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskList
0436             CMS_SA_ALLOW try {
0437               //pre was called in prefetchAsync
0438               m_worker->emitPostModuleEventPrefetchingSignal();
0439             } catch (...) {
0440               temp_excptr = std::current_exception();
0441               if (not excptr) {
0442                 excptr = temp_excptr;
0443               }
0444             }
0445           }
0446         } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
0447           m_worker->emitPostModuleStreamPrefetchingSignal();
0448         } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
0449           m_worker->emitPostModuleGlobalPrefetchingSignal();
0450         }
0451 
0452         if (not excptr) {
0453           if (auto queue = m_worker->serializeRunModule()) {
0454             auto f = [worker = m_worker,
0455                       info = m_transitionInfo,
0456                       streamID = m_streamID,
0457                       parentContext = m_parentContext,
0458                       sContext = m_context,
0459                       serviceToken = m_serviceToken]() {
0460               //Need to make the services available
0461               ServiceRegistry::Operate operateRunModule(serviceToken.lock());
0462 
0463               //If needed, we pause the queue in begin transition and resume it
0464               // at the end transition. This can guarantee that the module
0465               // only processes one run or lumi at a time
0466               EnableQueueGuard enableQueueGuard{workerhelper::CallImpl<T>::enableGlobalQueue(worker)};
0467               std::exception_ptr ptr;
0468               worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
0469             };
0470             //keep another global transition from running if necessary
0471             auto gQueue = workerhelper::CallImpl<T>::pauseGlobalQueue(m_worker);
0472             if (gQueue) {
0473               gQueue->push(*m_group, [queue, gQueue, f, group = m_group]() mutable {
0474                 gQueue->pause();
0475                 queue.push(*group, std::move(f));
0476               });
0477             } else {
0478               queue.push(*m_group, std::move(f));
0479             }
0480             return;
0481           }
0482         }
0483 
0484         m_worker->runModuleAfterAsyncPrefetch<T>(excptr, m_transitionInfo, m_streamID, m_parentContext, m_context);
0485       }
0486 
0487     private:
0488       Worker* m_worker;
0489       typename T::TransitionInfoType m_transitionInfo;
0490       StreamID m_streamID;
0491       ParentContext const m_parentContext;
0492       typename T::Context const* m_context;
0493       ServiceWeakToken m_serviceToken;
0494       oneapi::tbb::task_group* m_group;
0495     };
0496 
0497     // AcquireTask is only used for the Event case, but we define
0498     // it as a template so all cases will compile.
0499     // DUMMY exists to work around the C++ Standard prohibition on
0500     // fully specializing templates nested in other classes.
0501     template <typename T, typename DUMMY = void>
0502     class AcquireTask : public WaitingTask {
0503     public:
0504       AcquireTask(Worker*,
0505                   typename T::TransitionInfoType const&,
0506                   ServiceToken const&,
0507                   ParentContext const&,
0508                   WaitingTaskHolder) noexcept {}
0509       void execute() final {}
0510     };
0511 
0512     template <typename DUMMY>
0513     class AcquireTask<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>, DUMMY> : public WaitingTask {
0514     public:
0515       AcquireTask(Worker* worker,
0516                   EventTransitionInfo const& eventTransitionInfo,
0517                   ServiceToken const& token,
0518                   ParentContext const& parentContext,
0519                   WaitingTaskHolder holder) noexcept
0520           : m_worker(worker),
0521             m_eventTransitionInfo(eventTransitionInfo),
0522             m_parentContext(parentContext),
0523             m_holder(std::move(holder)),
0524             m_serviceToken(token) {}
0525 
0526       void execute() final {
0527         //Need to make the services available early so other services can see them
0528         ServiceRegistry::Operate guard(m_serviceToken.lock());
0529 
0530         //incase the emit causes an exception, we need a memory location
0531         // to hold the exception_ptr
0532         std::exception_ptr temp_excptr;
0533         auto excptr = exceptionPtr();
0534         // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskHolder
0535         CMS_SA_ALLOW try {
0536           //pre was called in prefetchAsync
0537           m_worker->emitPostModuleEventPrefetchingSignal();
0538         } catch (...) {
0539           temp_excptr = std::current_exception();
0540           if (not excptr) {
0541             excptr = temp_excptr;
0542           }
0543         }
0544 
0545         if (not excptr) {
0546           if (auto queue = m_worker->serializeRunModule()) {
0547             queue.push(*m_holder.group(),
0548                        [worker = m_worker,
0549                         info = m_eventTransitionInfo,
0550                         parentContext = m_parentContext,
0551                         serviceToken = m_serviceToken,
0552                         holder = std::move(m_holder)]() mutable {
0553                          //Need to make the services available
0554                          ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
0555 
0556                          std::exception_ptr ptr;
0557                          worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, std::move(holder));
0558                        });
0559             return;
0560           }
0561         }
0562 
0563         m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
0564       }
0565 
0566     private:
0567       Worker* m_worker;
0568       EventTransitionInfo m_eventTransitionInfo;
0569       ParentContext const m_parentContext;
0570       WaitingTaskHolder m_holder;
0571       ServiceWeakToken m_serviceToken;
0572     };
0573 
0574     // This class does nothing unless there is an exception originating
0575     // in an "External Worker". In that case, it handles converting the
0576     // exception to a CMS exception and adding context to the exception.
0577     class HandleExternalWorkExceptionTask : public WaitingTask {
0578     public:
0579       HandleExternalWorkExceptionTask(Worker* worker,
0580                                       oneapi::tbb::task_group* group,
0581                                       WaitingTask* runModuleTask,
0582                                       ParentContext const& parentContext) noexcept;
0583 
0584       void execute() final;
0585 
0586     private:
0587       Worker* m_worker;
0588       WaitingTask* m_runModuleTask;
0589       oneapi::tbb::task_group* m_group;
0590       ParentContext const m_parentContext;
0591     };
0592 
0593     std::atomic<int> timesRun_;
0594     std::atomic<int> timesVisited_;
0595     std::atomic<int> timesPassed_;
0596     std::atomic<int> timesFailed_;
0597     std::atomic<int> timesExcept_;
0598     std::atomic<State> state_;
0599     int numberOfPathsOn_;
0600     std::atomic<int> numberOfPathsLeftToRun_;
0601 
0602     ModuleCallingContext moduleCallingContext_;
0603 
0604     ExceptionToActionTable const* actions_;                         // memory assumed to be managed elsewhere
0605     CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_;  // if state is 'exception'
0606 
0607     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0608 
0609     edm::propagate_const<EarlyDeleteHelper*> earlyDeleteHelper_;
0610 
0611     edm::WaitingTaskList waitingTasks_;
0612     std::atomic<bool> workStarted_;
0613     bool ranAcquireWithoutException_;
0614     bool moduleValid_ = true;
0615     bool shouldTryToContinue_ = false;
0616     bool beginSucceeded_ = false;
0617   };
0618 
0619   namespace {
0620     template <typename T>
0621     class ModuleSignalSentry {
0622     public:
0623       ModuleSignalSentry(ActivityRegistry* a,
0624                          typename T::Context const* context,
0625                          ModuleCallingContext const* moduleCallingContext)
0626           : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {}
0627 
0628       ~ModuleSignalSentry() {
0629         // This destructor does nothing unless we are unwinding the
0630         // the stack from an earlier exception (a_ will be null if we are
0631         // are not). We want to report the earlier exception and ignore any
0632         // addition exceptions from the post module signal.
0633         CMS_SA_ALLOW try {
0634           if (a_) {
0635             T::postModuleSignal(a_, context_, moduleCallingContext_);
0636           }
0637         } catch (...) {
0638         }
0639       }
0640       void preModuleSignal() {
0641         if (a_) {
0642           try {
0643             convertException::wrap([this]() { T::preModuleSignal(a_, context_, moduleCallingContext_); });
0644           } catch (cms::Exception& ex) {
0645             ex.addContext("Handling pre module signal, likely in a service function immediately before module method");
0646             throw;
0647           }
0648         }
0649       }
0650       void postModuleSignal() {
0651         if (a_) {
0652           auto temp = a_;
0653           // Setting a_ to null informs the destructor that the signal
0654           // was already run and that it should do nothing.
0655           a_ = nullptr;
0656           try {
0657             convertException::wrap([this, temp]() { T::postModuleSignal(temp, context_, moduleCallingContext_); });
0658           } catch (cms::Exception& ex) {
0659             ex.addContext("Handling post module signal, likely in a service function immediately after module method");
0660             throw;
0661           }
0662         }
0663       }
0664 
0665     private:
0666       ActivityRegistry* a_;  // We do not use propagate_const because the registry itself is mutable.
0667       typename T::Context const* context_;
0668       ModuleCallingContext const* moduleCallingContext_;
0669     };
0670 
0671   }  // namespace
0672 
0673   namespace workerhelper {
0674     template <>
0675     class CallImpl<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>> {
0676     public:
0677       typedef OccurrenceTraits<EventPrincipal, BranchActionStreamBegin> Arg;
0678       static bool call(Worker* iWorker,
0679                        StreamID,
0680                        EventTransitionInfo const& info,
0681                        ActivityRegistry* /* actReg */,
0682                        ModuleCallingContext const* mcc,
0683                        Arg::Context const* /* context*/) {
0684         //Signal sentry is handled by the module
0685         return iWorker->implDo(info, mcc);
0686       }
0687       static void esPrefetchAsync(Worker* worker,
0688                                   WaitingTaskHolder waitingTask,
0689                                   ServiceToken const& token,
0690                                   EventTransitionInfo const& info,
0691                                   Transition transition) noexcept {
0692         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0693       }
0694       static bool wantsTransition(Worker const* iWorker) noexcept { return true; }
0695       static bool needToRunSelection(Worker const* iWorker) noexcept { return iWorker->implNeedToRunSelection(); }
0696 
0697       static SerialTaskQueue* pauseGlobalQueue(Worker*) noexcept { return nullptr; }
0698       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0699     };
0700 
0701     template <>
0702     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>> {
0703     public:
0704       typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Arg;
0705       static bool call(Worker* iWorker,
0706                        StreamID,
0707                        RunTransitionInfo const& info,
0708                        ActivityRegistry* actReg,
0709                        ModuleCallingContext const* mcc,
0710                        Arg::Context const* context) {
0711         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0712         // If preModuleSignal() throws, implDoBegin() is not called, and the
0713         // cpp destructor calls postModuleSignal (ignoring additional exceptions)
0714         cpp.preModuleSignal();
0715         // If implDoBegin() throws, the cpp destructor calls postModuleSignal
0716         // (ignoring additional exceptions)
0717         auto returnValue = iWorker->implDoBegin(info, mcc);
0718         // If postModuleSignal() throws, the exception will propagate to the framework
0719         cpp.postModuleSignal();
0720         iWorker->beginSucceeded_ = true;
0721         return returnValue;
0722       }
0723       static void esPrefetchAsync(Worker* worker,
0724                                   WaitingTaskHolder waitingTask,
0725                                   ServiceToken const& token,
0726                                   RunTransitionInfo const& info,
0727                                   Transition transition) noexcept {
0728         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0729       }
0730       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalRuns(); }
0731       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0732       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return iWorker->globalRunsQueue(); }
0733       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0734     };
0735     template <>
0736     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>> {
0737     public:
0738       typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Arg;
0739       static bool call(Worker* iWorker,
0740                        StreamID id,
0741                        RunTransitionInfo const& info,
0742                        ActivityRegistry* actReg,
0743                        ModuleCallingContext const* mcc,
0744                        Arg::Context const* context) {
0745         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0746         cpp.preModuleSignal();
0747         auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
0748         cpp.postModuleSignal();
0749         iWorker->beginSucceeded_ = true;
0750         return returnValue;
0751       }
0752       static void esPrefetchAsync(Worker* worker,
0753                                   WaitingTaskHolder waitingTask,
0754                                   ServiceToken const& token,
0755                                   RunTransitionInfo const& info,
0756                                   Transition transition) noexcept {
0757         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0758       }
0759       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamRuns(); }
0760       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0761       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0762       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0763     };
0764     template <>
0765     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>> {
0766     public:
0767       typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Arg;
0768       static bool call(Worker* iWorker,
0769                        StreamID,
0770                        RunTransitionInfo const& info,
0771                        ActivityRegistry* actReg,
0772                        ModuleCallingContext const* mcc,
0773                        Arg::Context const* context) {
0774         if (iWorker->beginSucceeded_) {
0775           iWorker->beginSucceeded_ = false;
0776 
0777           ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0778           cpp.preModuleSignal();
0779           auto returnValue = iWorker->implDoEnd(info, mcc);
0780           cpp.postModuleSignal();
0781           return returnValue;
0782         }
0783         return true;
0784       }
0785       static void esPrefetchAsync(Worker* worker,
0786                                   WaitingTaskHolder waitingTask,
0787                                   ServiceToken const& token,
0788                                   RunTransitionInfo const& info,
0789                                   Transition transition) noexcept {
0790         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0791       }
0792       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalRuns(); }
0793       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0794       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0795       static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) noexcept { return iWorker->globalRunsQueue(); }
0796     };
0797     template <>
0798     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>> {
0799     public:
0800       typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Arg;
0801       static bool call(Worker* iWorker,
0802                        StreamID id,
0803                        RunTransitionInfo const& info,
0804                        ActivityRegistry* actReg,
0805                        ModuleCallingContext const* mcc,
0806                        Arg::Context const* context) {
0807         if (iWorker->beginSucceeded_) {
0808           iWorker->beginSucceeded_ = false;
0809 
0810           ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0811           cpp.preModuleSignal();
0812           auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
0813           cpp.postModuleSignal();
0814           return returnValue;
0815         }
0816         return true;
0817       }
0818       static void esPrefetchAsync(Worker* worker,
0819                                   WaitingTaskHolder waitingTask,
0820                                   ServiceToken const& token,
0821                                   RunTransitionInfo const& info,
0822                                   Transition transition) noexcept {
0823         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0824       }
0825       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamRuns(); }
0826       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0827       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0828       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0829     };
0830 
0831     template <>
0832     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>> {
0833     public:
0834       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0835       static bool call(Worker* iWorker,
0836                        StreamID,
0837                        LumiTransitionInfo const& info,
0838                        ActivityRegistry* actReg,
0839                        ModuleCallingContext const* mcc,
0840                        Arg::Context const* context) {
0841         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0842         cpp.preModuleSignal();
0843         auto returnValue = iWorker->implDoBegin(info, mcc);
0844         cpp.postModuleSignal();
0845         iWorker->beginSucceeded_ = true;
0846         return returnValue;
0847       }
0848       static void esPrefetchAsync(Worker* worker,
0849                                   WaitingTaskHolder waitingTask,
0850                                   ServiceToken const& token,
0851                                   LumiTransitionInfo const& info,
0852                                   Transition transition) noexcept {
0853         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0854       }
0855       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalLuminosityBlocks(); }
0856       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0857       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept {
0858         return iWorker->globalLuminosityBlocksQueue();
0859       }
0860       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0861     };
0862     template <>
0863     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>> {
0864     public:
0865       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0866       static bool call(Worker* iWorker,
0867                        StreamID id,
0868                        LumiTransitionInfo const& info,
0869                        ActivityRegistry* actReg,
0870                        ModuleCallingContext const* mcc,
0871                        Arg::Context const* context) {
0872         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0873         cpp.preModuleSignal();
0874         auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
0875         cpp.postModuleSignal();
0876         iWorker->beginSucceeded_ = true;
0877         return returnValue;
0878       }
0879       static void esPrefetchAsync(Worker* worker,
0880                                   WaitingTaskHolder waitingTask,
0881                                   ServiceToken const& token,
0882                                   LumiTransitionInfo const& info,
0883                                   Transition transition) noexcept {
0884         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0885       }
0886       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamLuminosityBlocks(); }
0887       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0888       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0889       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0890     };
0891 
0892     template <>
0893     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>> {
0894     public:
0895       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0896       static bool call(Worker* iWorker,
0897                        StreamID,
0898                        LumiTransitionInfo const& info,
0899                        ActivityRegistry* actReg,
0900                        ModuleCallingContext const* mcc,
0901                        Arg::Context const* context) {
0902         if (iWorker->beginSucceeded_) {
0903           iWorker->beginSucceeded_ = false;
0904 
0905           ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0906           cpp.preModuleSignal();
0907           auto returnValue = iWorker->implDoEnd(info, mcc);
0908           cpp.postModuleSignal();
0909           return returnValue;
0910         }
0911         return true;
0912       }
0913       static void esPrefetchAsync(Worker* worker,
0914                                   WaitingTaskHolder waitingTask,
0915                                   ServiceToken const& token,
0916                                   LumiTransitionInfo const& info,
0917                                   Transition transition) noexcept {
0918         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0919       }
0920       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalLuminosityBlocks(); }
0921       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0922       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0923       static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) noexcept {
0924         return iWorker->globalLuminosityBlocksQueue();
0925       }
0926     };
0927     template <>
0928     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>> {
0929     public:
0930       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0931       static bool call(Worker* iWorker,
0932                        StreamID id,
0933                        LumiTransitionInfo const& info,
0934                        ActivityRegistry* actReg,
0935                        ModuleCallingContext const* mcc,
0936                        Arg::Context const* context) {
0937         if (iWorker->beginSucceeded_) {
0938           iWorker->beginSucceeded_ = false;
0939 
0940           ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0941           cpp.preModuleSignal();
0942           auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
0943           cpp.postModuleSignal();
0944           return returnValue;
0945         }
0946         return true;
0947       }
0948       static void esPrefetchAsync(Worker* worker,
0949                                   WaitingTaskHolder waitingTask,
0950                                   ServiceToken const& token,
0951                                   LumiTransitionInfo const& info,
0952                                   Transition transition) noexcept {
0953         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0954       }
0955       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamLuminosityBlocks(); }
0956       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0957       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0958       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0959     };
0960     template <>
0961     class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>> {
0962     public:
0963       using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0964       static bool call(Worker* iWorker,
0965                        StreamID,
0966                        ProcessBlockTransitionInfo const& info,
0967                        ActivityRegistry* actReg,
0968                        ModuleCallingContext const* mcc,
0969                        Arg::Context const* context) {
0970         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0971         cpp.preModuleSignal();
0972         auto returnValue = iWorker->implDoBeginProcessBlock(info.principal(), mcc);
0973         cpp.postModuleSignal();
0974         iWorker->beginSucceeded_ = true;
0975         return returnValue;
0976       }
0977       static void esPrefetchAsync(
0978           Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
0979       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsProcessBlocks(); }
0980       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0981       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0982       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0983     };
0984     template <>
0985     class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>> {
0986     public:
0987       using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
0988       static bool call(Worker* iWorker,
0989                        StreamID,
0990                        ProcessBlockTransitionInfo const& info,
0991                        ActivityRegistry* actReg,
0992                        ModuleCallingContext const* mcc,
0993                        Arg::Context const* context) {
0994         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0995         cpp.preModuleSignal();
0996         auto returnValue = iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
0997         cpp.postModuleSignal();
0998         return returnValue;
0999       }
1000       static void esPrefetchAsync(
1001           Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
1002       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsInputProcessBlocks(); }
1003       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
1004       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
1005       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
1006     };
1007     template <>
1008     class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>> {
1009     public:
1010       using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1011       static bool call(Worker* iWorker,
1012                        StreamID,
1013                        ProcessBlockTransitionInfo const& info,
1014                        ActivityRegistry* actReg,
1015                        ModuleCallingContext const* mcc,
1016                        Arg::Context const* context) {
1017         if (iWorker->beginSucceeded_) {
1018           iWorker->beginSucceeded_ = false;
1019 
1020           ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
1021           cpp.preModuleSignal();
1022           auto returnValue = iWorker->implDoEndProcessBlock(info.principal(), mcc);
1023           cpp.postModuleSignal();
1024           return returnValue;
1025         }
1026         return true;
1027       }
1028       static void esPrefetchAsync(
1029           Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
1030       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsProcessBlocks(); }
1031       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
1032       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
1033       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
1034     };
1035   }  // namespace workerhelper
1036 
1037   template <typename T>
1038   void Worker::prefetchAsync(WaitingTaskHolder iTask,
1039                              ServiceToken const& token,
1040                              ParentContext const& parentContext,
1041                              typename T::TransitionInfoType const& transitionInfo,
1042                              Transition iTransition) noexcept {
1043     Principal const& principal = transitionInfo.principal();
1044 
1045     moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1046 
1047     if constexpr (T::isEvent_) {
1048       actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1049     } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
1050       actReg_->preModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1051     } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
1052       actReg_->preModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(), moduleCallingContext_);
1053     }
1054 
1055     workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
1056     edPrefetchAsync(iTask, token, principal);
1057 
1058     if (principal.branchType() == InEvent) {
1059       preActionBeforeRunEventAsync(iTask, moduleCallingContext_, principal);
1060     }
1061   }
1062 
1063   template <typename T>
1064   void Worker::doWorkAsync(WaitingTaskHolder task,
1065                            typename T::TransitionInfoType const& transitionInfo,
1066                            ServiceToken const& token,
1067                            StreamID streamID,
1068                            ParentContext const& parentContext,
1069                            typename T::Context const* context) noexcept {
1070     if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1071       return;
1072     }
1073 
1074     //Need to check workStarted_ before adding to waitingTasks_
1075     bool expected = false;
1076     bool workStarted = workStarted_.compare_exchange_strong(expected, true);
1077 
1078     waitingTasks_.add(task);
1079     if constexpr (T::isEvent_) {
1080       timesVisited_.fetch_add(1, std::memory_order_relaxed);
1081     }
1082 
1083     if (workStarted) {
1084       moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1085 
1086       //if have TriggerResults based selection we want to reject the event before doing prefetching
1087       if (workerhelper::CallImpl<T>::needToRunSelection(this)) {
1088         //We need to run the selection in a different task so that
1089         // we can prefetch the data needed for the selection
1090         WaitingTask* moduleTask =
1091             new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1092 
1093         //make sure the task is either run or destroyed
1094         struct DestroyTask {
1095           DestroyTask(edm::WaitingTask* iTask) noexcept : m_task(iTask) {}
1096 
1097           ~DestroyTask() noexcept {
1098             auto p = m_task.exchange(nullptr);
1099             if (p) {
1100               TaskSentry s{p};
1101             }
1102           }
1103 
1104           edm::WaitingTask* release() noexcept { return m_task.exchange(nullptr); }
1105 
1106         private:
1107           std::atomic<edm::WaitingTask*> m_task;
1108         };
1109         if constexpr (T::isEvent_) {
1110           if (hasAcquire()) {
1111             auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1112             ServiceWeakToken weakToken = token;
1113             auto* group = task.group();
1114             moduleTask = make_waiting_task(
1115                 [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
1116                   WaitingTaskHolder runTaskHolder(
1117                       *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
1118                   AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1119                   t.execute();
1120                 });
1121           }
1122         }
1123         auto* group = task.group();
1124         auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1125         ServiceWeakToken weakToken = token;
1126         auto selectionTask =
1127             make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
1128                                   std::exception_ptr const*) mutable {
1129               ServiceRegistry::Operate guard(weakToken.lock());
1130               prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
1131                                weakToken.lock(),
1132                                parentContext,
1133                                info,
1134                                T::transition_);
1135             });
1136         prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1137       } else {
1138         WaitingTask* moduleTask =
1139             new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1140         auto group = task.group();
1141         if constexpr (T::isEvent_) {
1142           if (hasAcquire()) {
1143             WaitingTaskHolder runTaskHolder(
1144                 *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1145             moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1146           }
1147         }
1148         prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1149       }
1150     }
1151   }
1152 
1153   template <typename T>
1154   std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr iEPtr,
1155                                                          typename T::TransitionInfoType const& transitionInfo,
1156                                                          StreamID streamID,
1157                                                          ParentContext const& parentContext,
1158                                                          typename T::Context const* context) noexcept {
1159     std::exception_ptr exceptionPtr;
1160     bool shouldRun = true;
1161     if (iEPtr) {
1162       if (shouldRethrowException(iEPtr, parentContext, T::isEvent_, shouldTryToContinue_)) {
1163         exceptionPtr = iEPtr;
1164         setException<T::isEvent_>(exceptionPtr);
1165         shouldRun = false;
1166       } else {
1167         if (not shouldTryToContinue_) {
1168           setPassed<T::isEvent_>();
1169           shouldRun = false;
1170         }
1171       }
1172     }
1173     if (shouldRun) {
1174       // Caught exception is propagated via WaitingTaskList
1175       CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1176         exceptionPtr = std::current_exception();
1177       }
1178     } else {
1179       moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
1180     }
1181     waitingTasks_.doneWaiting(exceptionPtr);
1182     return exceptionPtr;
1183   }
1184 
1185   template <typename T>
1186   void Worker::doWorkNoPrefetchingAsync(WaitingTaskHolder task,
1187                                         typename T::TransitionInfoType const& transitionInfo,
1188                                         ServiceToken const& serviceToken,
1189                                         StreamID streamID,
1190                                         ParentContext const& parentContext,
1191                                         typename T::Context const* context) noexcept {
1192     if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1193       return;
1194     }
1195 
1196     //Need to check workStarted_ before adding to waitingTasks_
1197     bool expected = false;
1198     auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1199 
1200     waitingTasks_.add(task);
1201     if (workStarted) {
1202       ServiceWeakToken weakToken = serviceToken;
1203       auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1204         std::exception_ptr exceptionPtr;
1205         // Caught exception is propagated via WaitingTaskList
1206         CMS_SA_ALLOW try {
1207           //Need to make the services available
1208           ServiceRegistry::Operate guard(weakToken.lock());
1209 
1210           this->runModule<T>(info, streamID, parentContext, context);
1211         } catch (...) {
1212           exceptionPtr = std::current_exception();
1213         }
1214         this->waitingTasks_.doneWaiting(exceptionPtr);
1215       };
1216 
1217       if (needsESPrefetching(T::transition_)) {
1218         auto group = task.group();
1219         auto afterPrefetch =
1220             edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1221               if (iExcept) {
1222                 this->waitingTasks_.doneWaiting(*iExcept);
1223               } else {
1224                 if (auto queue = this->serializeRunModule()) {
1225                   queue.push(*group, toDo);
1226                 } else {
1227                   group->run(toDo);
1228                 }
1229               }
1230             });
1231         moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1232         esPrefetchAsync(
1233             WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1234       } else {
1235         auto group = task.group();
1236         if (auto queue = this->serializeRunModule()) {
1237           queue.push(*group, toDo);
1238         } else {
1239           group->run(toDo);
1240         }
1241       }
1242     }
1243   }
1244 
1245   template <typename T>
1246   bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1247                          StreamID streamID,
1248                          ParentContext const& parentContext,
1249                          typename T::Context const* context) {
1250     //unscheduled producers should advance this
1251     //if (T::isEvent_) {
1252     //  ++timesVisited_;
1253     //}
1254     ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1255     if constexpr (T::isEvent_) {
1256       timesRun_.fetch_add(1, std::memory_order_relaxed);
1257     }
1258 
1259     bool rc = true;
1260     try {
1261       convertException::wrap([&]() {
1262         rc = workerhelper::CallImpl<T>::call(
1263             this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1264 
1265         if (rc) {
1266           setPassed<T::isEvent_>();
1267         } else {
1268           setFailed<T::isEvent_>();
1269         }
1270       });
1271     } catch (cms::Exception& ex) {
1272       edm::exceptionContext(ex, moduleCallingContext_);
1273       if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, shouldTryToContinue_)) {
1274         assert(not cached_exception_);
1275         setException<T::isEvent_>(std::current_exception());
1276         std::rethrow_exception(cached_exception_);
1277       } else {
1278         rc = setPassed<T::isEvent_>();
1279       }
1280     }
1281 
1282     return rc;
1283   }
1284 
1285   template <typename T>
1286   std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1287                                                StreamID streamID,
1288                                                ParentContext const& parentContext,
1289                                                typename T::Context const* context) noexcept {
1290     timesVisited_.fetch_add(1, std::memory_order_relaxed);
1291     std::exception_ptr prefetchingException;  // null because there was no prefetching to do
1292     return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1293   }
1294 }  // namespace edm
1295 #endif