Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-06-04 04:34:52

0001 #ifndef FWCore_Framework_Worker_h
0002 #define FWCore_Framework_Worker_h
0003 
0004 /*----------------------------------------------------------------------
0005 
0006 Worker: this is a basic scheduling unit - an abstract base class to
0007 something that is really a producer or filter.
0008 
0009 A worker will not actually call through to the module unless it is
0010 in a Ready state.  After a module is actually run, the state will not
0011 be Ready.  The Ready state can only be reestablished by doing a reset().
0012 
0013 Pre/post module signals are posted only in the Ready state.
0014 
0015 Execution statistics are kept here.
0016 
0017 If a module has thrown an exception during execution, that exception
0018 will be rethrown if the worker is entered again and the state is not Ready.
0019 In other words, execution results (status) are cached and reused until
0020 the worker is reset().
0021 
0022 ----------------------------------------------------------------------*/
0023 
0024 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0025 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0026 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0027 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0028 #include "FWCore/Framework/interface/maker/WorkerParams.h"
0029 #include "FWCore/Framework/interface/ExceptionActions.h"
0030 #include "FWCore/Framework/interface/ModuleContextSentry.h"
0031 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0032 #include "FWCore/Framework/interface/ProductResolverIndexAndSkipBit.h"
0033 #include "FWCore/Concurrency/interface/WaitingTask.h"
0034 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0035 #include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
0036 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0037 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0038 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0039 #include "FWCore/ServiceRegistry/interface/ConsumesInfo.h"
0040 #include "FWCore/ServiceRegistry/interface/InternalContext.h"
0041 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0042 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0043 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0044 #include "FWCore/ServiceRegistry/interface/PlaceInPathContext.h"
0045 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0046 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0047 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0048 #include "FWCore/Concurrency/interface/FunctorTask.h"
0049 #include "FWCore/Utilities/interface/Exception.h"
0050 #include "FWCore/Utilities/interface/ConvertException.h"
0051 #include "FWCore/Utilities/interface/BranchType.h"
0052 #include "FWCore/Utilities/interface/ProductResolverIndex.h"
0053 #include "FWCore/Utilities/interface/StreamID.h"
0054 #include "FWCore/Utilities/interface/propagate_const.h"
0055 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0056 #include "FWCore/Utilities/interface/ESIndices.h"
0057 #include "FWCore/Utilities/interface/Transition.h"
0058 
0059 #include "FWCore/Framework/interface/Frameworkfwd.h"
0060 
0061 #include <array>
0062 #include <atomic>
0063 #include <cassert>
0064 #include <map>
0065 #include <memory>
0066 #include <sstream>
0067 #include <string>
0068 #include <vector>
0069 #include <exception>
0070 #include <unordered_map>
0071 
0072 namespace edm {
0073   class EventPrincipal;
0074   class EventSetupImpl;
0075   class EarlyDeleteHelper;
0076   class ModuleProcessName;
0077   class ProductResolverIndexHelper;
0078   class ProductResolverIndexAndSkipBit;
0079   class StreamID;
0080   class StreamContext;
0081   class ProductRegistry;
0082   class ThinnedAssociationsHelper;
0083 
0084   namespace workerhelper {
0085     template <typename O>
0086     class CallImpl;
0087   }
0088   namespace eventsetup {
0089     class ESRecordsToProductResolverIndices;
0090   }
0091 
0092   class Worker {
0093   public:
0094     enum State { Ready, Pass, Fail, Exception };  ///< Cached outcome of the worker: reset() restores Ready; setPassed/setFailed/setException store the others.
0095     enum Types { kAnalyzer, kFilter, kProducer, kOutputModule };  ///< Kind of module, as reported by moduleType().
0096     enum ConcurrencyTypes { kGlobal, kLimited, kOne, kStream };  ///< Concurrency flavour, as reported by moduleConcurrencyType().
0097     struct TaskQueueAdaptor {
           // Non-owning handle to the queue (if any) through which a module's work
           // has to be serialized. Each converting constructor sets exactly one of
           // the two pointers; a default-constructed adaptor holds no queue and
           // converts to false.
0098       SerialTaskQueueChain* serial_ = nullptr;
0099       LimitedTaskQueue* limited_ = nullptr;
0100
0101       TaskQueueAdaptor() = default;
           // The converting constructors are intentionally implicit so that a queue
           // pointer can be returned directly where a TaskQueueAdaptor is expected.
0102       TaskQueueAdaptor(SerialTaskQueueChain* iChain) : serial_(iChain) {}
0103       TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
0104
           // True when a queue is attached. Const-qualified so that const adaptors
           // can be tested as well; it reads the two pointers and nothing else.
0105       operator bool() const noexcept { return serial_ != nullptr or limited_ != nullptr; }
0106
           // Hands iF to the attached queue, running it within task group iG.
           // iF is a forwarding reference, so it is forwarded: an rvalue functor
           // (e.g. push(group, std::move(f))) is moved into the queue instead of
           // being copied, while an lvalue functor is still copied as before.
           // Precondition: the adaptor holds a queue (operator bool is true).
0107       template <class F>
0108       void push(oneapi::tbb::task_group& iG, F&& iF) {
0109         if (serial_) {
0110           serial_->push(iG, std::forward<F>(iF));
0111         } else {
               assert(limited_ != nullptr);
0112           limited_->push(iG, std::forward<F>(iF));
0113         }
0114       }
0115     };
0116 
0117     Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
0118     virtual ~Worker();
0119
         // Workers are not copyable; declaring the copy operations also suppresses
         // the implicitly generated move operations.
0120     Worker(Worker const&) = delete;             // Disallow copying and moving
0121     Worker& operator=(Worker const&) = delete;  // Disallow copying and moving
0122
         /// Marks the module as no longer valid (description() returns nullptr from
         /// then on) and invokes the subclass hook doClearModule().
0123     void clearModule() {
0124       moduleValid_ = false;
0125       doClearModule();
0126     }
0127
         // Queries, answered by the concrete worker, about which kinds of
         // transitions the wrapped module takes part in.
0128     virtual bool wantsProcessBlocks() const noexcept = 0;
0129     virtual bool wantsInputProcessBlocks() const noexcept = 0;
0130     virtual bool wantsGlobalRuns() const noexcept = 0;
0131     virtual bool wantsGlobalLuminosityBlocks() const noexcept = 0;
0132     virtual bool wantsStreamRuns() const noexcept = 0;
0133     virtual bool wantsStreamLuminosityBlocks() const noexcept = 0;
0134
         // Queues supplied by the concrete worker for global run / luminosity-block
         // transitions. NOTE(review): presumably nullptr when the module needs no
         // such queue (RunModuleTask checks the pointer it obtains) - confirm in the
         // implementations.
0135     virtual SerialTaskQueue* globalRunsQueue() = 0;
0136     virtual SerialTaskQueue* globalLuminosityBlocksQueue() = 0;
0137 
0138     void prePrefetchSelectionAsync(oneapi::tbb::task_group&,  // overload taking an EventPrincipal; defined outside this header excerpt
0139                                    WaitingTask* task,
0140                                    ServiceToken const&,
0141                                    StreamID stream,
0142                                    EventPrincipal const*) noexcept;
0143
         // Fallback overload for any other pointer (it converts to void const*).
         // It must never actually run: it asserts unconditionally.
         // NOTE(review): presumably present only so that templated callers compile
         // for non-event principals - confirm against the callers.
0144     void prePrefetchSelectionAsync(
0145         oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, void const*) noexcept {
0146       assert(false);
0147     }
0148 
0149     template <typename T>  // T supplies TransitionInfoType and Context (presumably an OccurrenceTraits specialization - confirm)
0150     void doWorkAsync(WaitingTaskHolder,
0151                      typename T::TransitionInfoType const&,
0152                      ServiceToken const&,
0153                      StreamID,
0154                      ParentContext const&,
0155                      typename T::Context const*) noexcept;
0156
         // Same parameter list as doWorkAsync. NOTE(review): by its name this variant
         // skips the prefetching step; the definition is outside this excerpt.
0157     template <typename T>
0158     void doWorkNoPrefetchingAsync(WaitingTaskHolder,
0159                                   typename T::TransitionInfoType const&,
0160                                   ServiceToken const&,
0161                                   StreamID,
0162                                   ParentContext const&,
0163                                   typename T::Context const*) noexcept;
0164
         // Takes no WaitingTaskHolder; any failure is reported to the caller through
         // the returned std::exception_ptr (the function itself is noexcept).
0165     template <typename T>
0166     std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
0167                                          StreamID,
0168                                          ParentContext const&,
0169                                          typename T::Context const*) noexcept;
0170
         // transformIndex() is provided by the concrete worker; the size_t index is
         // the value doTransformAsync() expects as iTransformIndex.
0171     virtual size_t transformIndex(edm::BranchDescription const&) const noexcept = 0;
0172     void doTransformAsync(WaitingTaskHolder,
0173                           size_t iTransformIndex,
0174                           EventPrincipal const&,
0175                           ServiceToken const&,
0176                           StreamID,
0177                           ModuleCallingContext const&,
0178                           StreamContext const*) noexcept;
0179 
0180     void callWhenDoneAsync(WaitingTaskHolder task) { waitingTasks_.add(std::move(task)); }  // adds `task` to this worker's waiting-task list
0181     void skipOnPath(EventPrincipal const& iEvent);
0182     void beginJob();
0183     void endJob();
0184     void beginStream(StreamID id, StreamContext& streamContext);
0185     void endStream(StreamID id, StreamContext& streamContext);
         // The respondTo* entry points forward directly to the matching private
         // impl* virtual function implemented by the concrete worker.
0186     void respondToOpenInputFile(FileBlock const& fb) { implRespondToOpenInputFile(fb); }
0187     void respondToCloseInputFile(FileBlock const& fb) { implRespondToCloseInputFile(fb); }
0188     void respondToCloseOutputFile() { implRespondToCloseOutputFile(); }
0189     void registerThinnedAssociations(ProductRegistry const& registry, ThinnedAssociationsHelper& helper);
0190 
0191     void reset() {
           // Re-arms the worker for the next occurrence: drops any cached exception,
           // sets the state back to Ready, resets the waiting-task list, clears the
           // work-started flag and restores the number of paths still to run to the
           // number of paths this worker was added to (see addedToPath()).
0192       cached_exception_ = std::exception_ptr();
0193       state_ = Ready;
0194       waitingTasks_.reset();
0195       workStarted_ = false;
0196       numberOfPathsLeftToRun_ = numberOfPathsOn_;
0197     }
0198
0199     void postDoEvent(EventPrincipal const&);
0200 
0201     ModuleDescription const* description() const noexcept {
           // The description is taken from the module calling context; once
           // clearModule() has invalidated the module, nullptr is returned instead.
0202       return moduleValid_ ? moduleCallingContext_.moduleDescription() : nullptr;
0206     }
0207     /// The signals are required to live longer than the last call to 'doWork'.
0208     /// This was done to improve performance, based on profiling.
0209     void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
0210
0211     void setEarlyDeleteHelper(EarlyDeleteHelper* iHelper);
0212
0213     // Used to make EDGetToken work
0214     virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const&) = 0;
0215     virtual void updateLookup(eventsetup::ESRecordsToProductResolverIndices const&) = 0;
0216     virtual void selectInputProcessBlocks(ProductRegistry const&, ProcessBlockHelperBase const&) = 0;
         // iIndicies is keyed by std::string and maps to (TypeID const*, const char*,
         // ProductResolverIndex) tuples. NOTE(review): the meaning of the key and of
         // the const char* element is not visible in this header - confirm.
0217     virtual void resolvePutIndicies(
0218         BranchType iBranchType,
0219         std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const&
0220             iIndicies) = 0;
0221
         // `modules` holds one vector pointer per branch type (NumBranchTypes entries).
0222     virtual void modulesWhoseProductsAreConsumed(
0223         std::array<std::vector<ModuleDescription const*>*, NumBranchTypes>& modules,
0224         std::vector<ModuleProcessName>& modulesInPreviousProcesses,
0225         ProductRegistry const& preg,
0226         std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
0227
0228     virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
0229
0230     virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
0231
         // Classification of the wrapped module, using the Types and
         // ConcurrencyTypes enums declared at the top of this class.
0232     virtual Types moduleType() const = 0;
0233     virtual ConcurrencyTypes moduleConcurrencyType() const = 0;
0234 
0235     void clearCounters() noexcept {
           // Zero every execution statistic, in the same order as before
           // (run, visited, passed, failed, except), each with release ordering.
0236       for (std::atomic<int>* counter : {&timesRun_, &timesVisited_, &timesPassed_, &timesFailed_, &timesExcept_}) {
0237         counter->store(0, std::memory_order_release);
0238       }
0241     }
0242 
0243     void addedToPath() noexcept { ++numberOfPathsOn_; }  // counts one more path containing this worker; reset() copies the total into numberOfPathsLeftToRun_
0244     //NOTE: calling state() is done to force synchronization across threads
         // NOTE(review): the accessors below read the counters with acquire loads,
         // matching the release stores in clearCounters(); state() reads an atomic.
0245     int timesRun() const noexcept { return timesRun_.load(std::memory_order_acquire); }
0246     int timesVisited() const noexcept { return timesVisited_.load(std::memory_order_acquire); }
0247     int timesPassed() const noexcept { return timesPassed_.load(std::memory_order_acquire); }
0248     int timesFailed() const noexcept { return timesFailed_.load(std::memory_order_acquire); }
0249     int timesExcept() const noexcept { return timesExcept_.load(std::memory_order_acquire); }
0250     State state() const noexcept { return state_; }
0251
0252     int timesPass() const noexcept { return timesPassed(); }  // for backward compatibility only - to be removed soon
0253
0254     virtual bool hasAccumulator() const noexcept = 0;
0255
0256     // Used in PuttableProductResolver
         // Returns a mutable reference to the list that callWhenDoneAsync() adds to.
0257     edm::WaitingTaskList& waitingTaskList() noexcept { return waitingTasks_; }
0258 
0259   protected:
0260     template <typename O>
0261     friend class workerhelper::CallImpl;  // CallImpl specializations call the protected impl* hooks below (e.g. implDo)
0262
         // Subclass hook invoked by clearModule() after the module is marked invalid.
0263     virtual void doClearModule() = 0;
0264
0265     virtual std::string workerType() const = 0;
         // Event hook; its bool result is what CallImpl<...>::call returns for events.
0266     virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;
0267
0268     virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0269     virtual bool implNeedToRunSelection() const noexcept = 0;
0270
0271     virtual void implDoAcquire(EventTransitionInfo const&,
0272                                ModuleCallingContext const*,
0273                                WaitingTaskWithArenaHolder&) = 0;
0274
0275     virtual void implDoTransformAsync(WaitingTaskHolder,
0276                                       size_t iTransformIndex,
0277                                       EventPrincipal const&,
0278                                       ParentContext const&,
0279                                       ServiceWeakToken const&) noexcept = 0;
0280     virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const noexcept = 0;
0281
         // Per-transition hooks implemented by the concrete worker. The Begin/End
         // and StreamBegin/StreamEnd hooks are overloaded on the transition-info
         // type (RunTransitionInfo vs LumiTransitionInfo).
0282     virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const&, ModuleCallingContext const*) = 0;
0283     virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0284     virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0285     virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0286     virtual bool implDoBegin(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0287     virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0288     virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0289     virtual bool implDoEnd(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0290     virtual bool implDoBegin(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0291     virtual bool implDoStreamBegin(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0292     virtual bool implDoStreamEnd(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0293     virtual bool implDoEnd(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0294     virtual void implBeginJob() = 0;
0295     virtual void implEndJob() = 0;
0296     virtual void implBeginStream(StreamID) = 0;
0297     virtual void implEndStream(StreamID) = 0;
0298
0299     void resetModuleDescription(ModuleDescription const*);
0300
         // Non-owning access to the registry held in actReg_ (nullptr if none is set).
0301     ActivityRegistry* activityRegistry() { return actReg_.get(); }
0302 
0303   private:
0304     template <typename T>
0305     bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
0306
         // Data-dependency queries answered by the concrete worker, per branch type
         // (event data) or per transition (EventSetup data).
0307     virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0308     virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0309
0310     virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
0311
         // esItemsToGetFrom() is what needsESPrefetching() inspects.
0312     virtual std::vector<ESResolverIndex> const& esItemsToGetFrom(Transition) const = 0;
0313     virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
0314
0315     virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask,
0316                                               ModuleCallingContext const& moduleCallingContext,
0317                                               Principal const& iPrincipal) const noexcept = 0;
0318
         // Targets of the public respondTo* forwarding functions.
0319     virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
0320     virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
0321     virtual void implRespondToCloseOutputFile() = 0;
0322
0323     virtual void implRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) = 0;
0324
         // Returns the queue through which module execution must be pushed; an
         // adaptor that converts to false means the module is run directly (see
         // RunModuleTask::execute and AcquireTask::execute).
0325     virtual TaskQueueAdaptor serializeRunModule() = 0;
0326
0327     bool shouldRethrowException(std::exception_ptr iPtr,
0328                                 ParentContext const& parentContext,
0329                                 bool isEvent,
0330                                 bool isTryToContinue) const noexcept;
0331     void checkForShouldTryToContinue(ModuleDescription const&);
0332 
0333     template <bool IS_EVENT>
0334     bool setPassed() {
           // Only event transitions contribute to the pass statistic; the branch is
           // resolved at compile time.
0335       if constexpr (IS_EVENT) {
0336         timesPassed_.fetch_add(1, std::memory_order_relaxed);
0337       }
           // Record the outcome and report success to the caller.
0338       state_ = Pass;
0339       return true;
0340     }
0341 
0342     template <bool IS_EVENT>
0343     bool setFailed() {
           // Only event transitions contribute to the failure statistic; the branch
           // is resolved at compile time.
0344       if constexpr (IS_EVENT) {
0345         timesFailed_.fetch_add(1, std::memory_order_relaxed);
0346       }
           // Record the outcome and report failure to the caller.
0347       state_ = Fail;
0348       return false;
0349     }
0350 
0351     template <bool IS_EVENT>
0352     std::exception_ptr setException(std::exception_ptr iException) {
           // Only event transitions contribute to the exception statistic; the
           // branch is resolved at compile time.
0353       if constexpr (IS_EVENT) {
0354         timesExcept_.fetch_add(1, std::memory_order_relaxed);
0355       }
           // Cache the exception before publishing the Exception state, because
           // cached_exception_ is declared as guarded by state_.
0356       cached_exception_ = iException;
0357       state_ = Exception;
0358       return cached_exception_;
0359     }
0360 
0361     template <typename T>  // T supplies TransitionInfoType, as for doWorkAsync
0362     void prefetchAsync(WaitingTaskHolder,
0363                        ServiceToken const&,
0364                        ParentContext const&,
0365                        typename T::TransitionInfoType const&,
0366                        Transition) noexcept;
0367
         // Separate entry points for EventSetup data (es) and for data held by a
         // Principal (ed); both are defined outside this header excerpt.
0368     void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const&, Transition, ServiceToken const&) noexcept;
0369     void edPrefetchAsync(WaitingTaskHolder, ServiceToken const&, Principal const&) const noexcept;
0370 
0371     bool needsESPrefetching(Transition iTrans) const noexcept {
           // Transitions at or beyond NumberOfEventSetupTransitions are never looked
           // up (short-circuit); for the others, prefetching is needed exactly when
           // the module declared at least one EventSetup item for that transition.
0372       return iTrans < edm::Transition::NumberOfEventSetupTransitions and not esItemsToGetFrom(iTrans).empty();
0373     }
0374 
0375     void emitPostModuleEventPrefetchingSignal() {
           // Emits the post-prefetching signal for events, passing the stream context
           // taken from this worker's module calling context.
0376       actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0377     }
0378
0379     void emitPostModuleStreamPrefetchingSignal() {
           // Stream-transition counterpart; also keyed on the stream context.
0380       actReg_->postModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),
0381                                                        moduleCallingContext_);
0382     }
0383
0384     void emitPostModuleGlobalPrefetchingSignal() {
           // Global-transition counterpart; uses the global context instead.
0385       actReg_->postModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(),
0386                                                        moduleCallingContext_);
0387     }
0388 
0389     virtual bool hasAcquire() const noexcept = 0;  // when true, RunModuleTask leaves the event post-prefetch signal to AcquireTask
0390
         // Called by RunModuleTask once prefetching is done. The first argument is
         // the exception (possibly empty) collected during prefetching.
0391     template <typename T>
0392     std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr,
0393                                                    typename T::TransitionInfoType const&,
0394                                                    StreamID,
0395                                                    ParentContext const&,
0396                                                    typename T::Context const*) noexcept;
0397
0398     void runAcquire(EventTransitionInfo const&, ParentContext const&, WaitingTaskWithArenaHolder&);
0399
         // Called by AcquireTask once prefetching is done. The first argument is
         // the exception (possibly empty) collected during prefetching.
0400     void runAcquireAfterAsyncPrefetch(std::exception_ptr,
0401                                       EventTransitionInfo const&,
0402                                       ParentContext const&,
0403                                       WaitingTaskWithArenaHolder) noexcept;
0404
0405     std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr,
0406                                                    ParentContext const& parentContext) noexcept;
0407 
0408     template <typename T>
0409     class RunModuleTask : public WaitingTask {
           // Task executed after prefetching for a worker has finished. It emits the
           // matching post-prefetching signal and then runs the module through
           // Worker::runModuleAfterAsyncPrefetch<T>(), either directly or, when the
           // worker asks for serialization, from within the worker's task queue.
           // The task stores copies of the transition info, stream ID and parent
           // context, plus a weak service token; the worker, context and task group
           // are kept as raw (non-owning) pointers.
0410     public:
0411       RunModuleTask(Worker* worker,
0412                     typename T::TransitionInfoType const& transitionInfo,
0413                     ServiceToken const& token,
0414                     StreamID streamID,
0415                     ParentContext const& parentContext,
0416                     typename T::Context const* context,
0417                     oneapi::tbb::task_group* iGroup) noexcept
0418           : m_worker(worker),
0419             m_transitionInfo(transitionInfo),
0420             m_streamID(streamID),
0421             m_parentContext(parentContext),
0422             m_context(context),
0423             m_serviceToken(token),
0424             m_group(iGroup) {}
0425
           // Scope guard that calls resume() on the given queue when destroyed.
           // A null queue, or a guard that has been moved from, does nothing.
0426       struct EnableQueueGuard {
0427         SerialTaskQueue* queue_;
0428         EnableQueueGuard(SerialTaskQueue* iQueue) : queue_{iQueue} {}
0429         EnableQueueGuard(EnableQueueGuard const&) = delete;
0430         EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
0431         EnableQueueGuard& operator=(EnableQueueGuard&&) = delete;
0432         EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
0433         ~EnableQueueGuard() {
0434           if (queue_) {
0435             queue_->resume();
0436           }
0437         }
0438       };
0439
0440       void execute() final {
0441         //Need to make the services available early so other services can see them
0442         ServiceRegistry::Operate guard(m_serviceToken.lock());
0443
0444         //In case the emit causes an exception, we need a memory location
0445         // to hold the exception_ptr
0446         std::exception_ptr temp_excptr;
             // Start from whatever exception was recorded on this task (if any).
0447         auto excptr = exceptionPtr();
0448         if constexpr (T::isEvent_) {
               // For events the signal is emitted here only when the worker has no
               // acquire step.
0449           if (!m_worker->hasAcquire()) {
0450             // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskList
0451             CMS_SA_ALLOW try {
0452               //pre was called in prefetchAsync
0453               m_worker->emitPostModuleEventPrefetchingSignal();
0454             } catch (...) {
0455               temp_excptr = std::current_exception();
                   // An exception already recorded takes precedence over one thrown by the signal.
0456               if (not excptr) {
0457                 excptr = temp_excptr;
0458               }
0459             }
0460           }
0461         } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
               // NOTE(review): unlike the event branch, this emit (and the global one
               // below) is not wrapped in try/catch, so an exception thrown by a
               // signal slot would escape execute() - confirm this is intended.
0462           m_worker->emitPostModuleStreamPrefetchingSignal();
0463         } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
0464           m_worker->emitPostModuleGlobalPrefetchingSignal();
0465         }
0466
             // Serialization is only considered when there is no exception to report;
             // otherwise control falls through to the direct call at the bottom.
0467         if (not excptr) {
0468           if (auto queue = m_worker->serializeRunModule()) {
                 // The closure captures everything by value so it can outlive this task.
0469             auto f = [worker = m_worker,
0470                       info = m_transitionInfo,
0471                       streamID = m_streamID,
0472                       parentContext = m_parentContext,
0473                       sContext = m_context,
0474                       serviceToken = m_serviceToken]() {
0475               //Need to make the services available
0476               ServiceRegistry::Operate operateRunModule(serviceToken.lock());
0477
0478               //If needed, we pause the queue in begin transition and resume it
0479               // at the end transition. This can guarantee that the module
0480               // only processes one run or lumi at a time
0481               EnableQueueGuard enableQueueGuard{workerhelper::CallImpl<T>::enableGlobalQueue(worker)};
                   // Reached only when no exception was pending, so an empty pointer is passed.
0482               std::exception_ptr ptr;
0483               worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
0484             };
0485             //keep another global transition from running if necessary
0486             auto gQueue = workerhelper::CallImpl<T>::pauseGlobalQueue(m_worker);
0487             if (gQueue) {
                   // From inside the global queue: pause it, then hand the real work
                   // to the module's serialization queue.
0488               gQueue->push(*m_group, [queue, gQueue, f, group = m_group]() mutable {
0489                 gQueue->pause();
0490                 queue.push(*group, std::move(f));
0491               });
0492             } else {
0493               queue.push(*m_group, std::move(f));
0494             }
                 // The module now runs from the queue, not from this call.
0495             return;
0496           }
0497         }
0498
             // No serialization requested, or an exception is pending: call directly.
0499         m_worker->runModuleAfterAsyncPrefetch<T>(excptr, m_transitionInfo, m_streamID, m_parentContext, m_context);
0500       }
0501
0502     private:
0503       Worker* m_worker;
0504       typename T::TransitionInfoType m_transitionInfo;
0505       StreamID m_streamID;
0506       ParentContext const m_parentContext;
0507       typename T::Context const* m_context;
0508       ServiceWeakToken m_serviceToken;
0509       oneapi::tbb::task_group* m_group;
0510     };
0511 
0512     // AcquireTask is only used for the Event case, but we define
0513     // it as a template so all cases will compile.
0514     // DUMMY exists to work around the C++ Standard prohibition on
0515     // fully specializing templates nested in other classes.
         // This primary template is the do-nothing version used for every
         // non-event T: its constructor ignores its arguments and execute() is empty.
0516     template <typename T, typename DUMMY = void>
0517     class AcquireTask : public WaitingTask {
0518     public:
0519       AcquireTask(Worker*,
0520                   typename T::TransitionInfoType const&,
0521                   ServiceToken const&,
0522                   ParentContext const&,
0523                   WaitingTaskWithArenaHolder) noexcept {}
0524       void execute() final {}
0525     };
0526 
0527     template <typename DUMMY>
0528     class AcquireTask<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>, DUMMY> : public WaitingTask {
           // Event specialization: executed after prefetching, it emits the event
           // post-prefetching signal and then calls
           // Worker::runAcquireAfterAsyncPrefetch(), either directly or from the
           // worker's serialization queue. The transition info, parent context and
           // holder are stored by value; the service token is stored as a weak token.
0529     public:
0530       AcquireTask(Worker* worker,
0531                   EventTransitionInfo const& eventTransitionInfo,
0532                   ServiceToken const& token,
0533                   ParentContext const& parentContext,
0534                   WaitingTaskWithArenaHolder holder) noexcept
0535           : m_worker(worker),
0536             m_eventTransitionInfo(eventTransitionInfo),
0537             m_parentContext(parentContext),
0538             m_holder(std::move(holder)),
0539             m_serviceToken(token) {}
0540
0541       void execute() final {
0542         //Need to make the services available early so other services can see them
0543         ServiceRegistry::Operate guard(m_serviceToken.lock());
0544
0545         //In case the emit causes an exception, we need a memory location
0546         // to hold the exception_ptr
0547         std::exception_ptr temp_excptr;
             // Start from whatever exception was recorded on this task (if any).
0548         auto excptr = exceptionPtr();
0549         // Caught exception is passed to Worker::runAcquireAfterAsyncPrefetch() (see the call at the end of this function)
0550         CMS_SA_ALLOW try {
0551           //pre was called in prefetchAsync
0552           m_worker->emitPostModuleEventPrefetchingSignal();
0553         } catch (...) {
0554           temp_excptr = std::current_exception();
               // An exception already recorded takes precedence over one thrown by the signal.
0555           if (not excptr) {
0556             excptr = temp_excptr;
0557           }
0558         }
0559
             // Serialization is only considered when there is no exception to report.
0560         if (not excptr) {
0561           if (auto queue = m_worker->serializeRunModule()) {
                 // The closure captures copies (including a copy of the holder) so it
                 // can run after this task has finished.
0562             queue.push(*m_holder.group(),
0563                        [worker = m_worker,
0564                         info = m_eventTransitionInfo,
0565                         parentContext = m_parentContext,
0566                         serviceToken = m_serviceToken,
0567                         holder = m_holder]() {
0568                          //Need to make the services available
0569                          ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
0570
                              // Reached only when no exception was pending, so an empty pointer is passed.
0571                          std::exception_ptr ptr;
0572                          worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
0573                        });
0574             return;
0575           }
0576         }
0577
             // No serialization requested, or an exception is pending: call directly,
             // giving up this task's holder.
0578         m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
0579       }
0580
0581     private:
0582       Worker* m_worker;
0583       EventTransitionInfo m_eventTransitionInfo;
0584       ParentContext const m_parentContext;
0585       WaitingTaskWithArenaHolder m_holder;
0586       ServiceWeakToken m_serviceToken;
0587     };
0588 
0589     // This class does nothing unless there is an exception originating
0590     // in an "External Worker". In that case, it handles converting the
0591     // exception to a CMS exception and adding context to the exception.
         // Only the declaration is given here; the constructor and execute() are
         // defined outside this header excerpt.
0592     class HandleExternalWorkExceptionTask : public WaitingTask {
0593     public:
0594       HandleExternalWorkExceptionTask(Worker* worker,
0595                                       oneapi::tbb::task_group* group,
0596                                       WaitingTask* runModuleTask,
0597                                       ParentContext const& parentContext) noexcept;
0598
0599       void execute() final;
0600
0601     private:
           // NOTE(review): the members are declared in a different order
           // (worker, runModuleTask, group) than the constructor parameters
           // (worker, group, runModuleTask); keep the out-of-line initializer list
           // in declaration order.
0602       Worker* m_worker;
0603       WaitingTask* m_runModuleTask;
0604       oneapi::tbb::task_group* m_group;
0605       ParentContext const m_parentContext;
0606     };
0607 
0608     std::atomic<int> timesRun_;  // execution statistics; all five counters are zeroed by clearCounters()
0609     std::atomic<int> timesVisited_;
0610     std::atomic<int> timesPassed_;  // incremented by setPassed<true>()
0611     std::atomic<int> timesFailed_;  // incremented by setFailed<true>()
0612     std::atomic<int> timesExcept_;  // incremented by setException<true>()
0613     std::atomic<State> state_;      // Ready after reset(); otherwise the cached outcome
0614     int numberOfPathsOn_;           // incremented by addedToPath()
0615     std::atomic<int> numberOfPathsLeftToRun_;  // restored to numberOfPathsOn_ by reset()
0616
0617     ModuleCallingContext moduleCallingContext_;
0618
0619     ExceptionToActionTable const* actions_;                         // memory assumed to be managed elsewhere
0620     CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_;  // if state is 'exception'
0621
0622     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0623
0624     edm::propagate_const<EarlyDeleteHelper*> earlyDeleteHelper_;
0625
0626     edm::WaitingTaskList waitingTasks_;  // filled by callWhenDoneAsync(), reset by reset()
0627     std::atomic<bool> workStarted_;      // cleared by reset()
         // NOTE(review): unlike the three flags below, this one has no in-class
         // initializer; presumably the constructor sets it - confirm.
0628     bool ranAcquireWithoutException_;
0629     bool moduleValid_ = true;  // cleared by clearModule(); gates description()
0630     bool shouldTryToContinue_ = false;
0631     bool beginSucceeded_ = false;
0632   };
0633 
0634   // ModuleSignalSentry is declared directly in namespace edm rather than in an
       // unnamed namespace: inside a header, an unnamed namespace gives every
       // translation unit its own distinct type, which makes inline and template
       // code that uses it differ between translation units. Name lookup for
       // existing users (ModuleSignalSentry / edm::ModuleSignalSentry) is unchanged.
       //
       // The sentry emits T::preModuleSignal / T::postModuleSignal around a module
       // call. If postModuleSignal() was never called explicitly (typically because
       // an exception is propagating), the destructor emits the post signal and
       // swallows anything that signal throws.
0635   template <typename T>
0636   class ModuleSignalSentry {
0637   public:
         // All three pointers are stored as given and are not owned. A null
         // registry turns every operation of the sentry into a no-op.
0638     ModuleSignalSentry(ActivityRegistry* a,
0639                        typename T::Context const* context,
0640                        ModuleCallingContext const* moduleCallingContext)
0641         : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {}
0642
         // Not copyable: a copy would share a_ and emit the post signal a second
         // time from its own destructor.
         ModuleSignalSentry(ModuleSignalSentry const&) = delete;
         ModuleSignalSentry& operator=(ModuleSignalSentry const&) = delete;

0643     ~ModuleSignalSentry() {
0644       // This destructor does nothing if postModuleSignal() has already run
0645       // (a_ is null in that case). Otherwise we are normally unwinding the
0646       // stack from an earlier exception: we want to report that earlier
0647       // exception and ignore any additional exception from the post module signal.
0648       CMS_SA_ALLOW try {
0649         if (a_) {
0650           T::postModuleSignal(a_, context_, moduleCallingContext_);
0651         }
0652       } catch (...) {
0653       }
0654     }
         // Emits the pre-module signal. Exceptions are converted through
         // convertException::wrap, annotated with context, and rethrown.
0655     void preModuleSignal() {
0656       if (a_) {
0657         try {
0658           convertException::wrap([this]() { T::preModuleSignal(a_, context_, moduleCallingContext_); });
0659         } catch (cms::Exception& ex) {
0660           ex.addContext("Handling pre module signal, likely in a service function immediately before module method");
0661           throw;
0662         }
0663       }
0664     }
         // Emits the post-module signal at most once. Exceptions are converted
         // through convertException::wrap, annotated with context, and rethrown.
0665     void postModuleSignal() {
0666       if (a_) {
0667         auto temp = a_;
0668         // Setting a_ to null informs the destructor that the signal
0669         // was already run and that it should do nothing.
0670         a_ = nullptr;
0671         try {
0672           convertException::wrap([this, temp]() { T::postModuleSignal(temp, context_, moduleCallingContext_); });
0673         } catch (cms::Exception& ex) {
0674           ex.addContext("Handling post module signal, likely in a service function immediately after module method");
0675           throw;
0676         }
0677       }
0678     }
0679
0680   private:
0681     ActivityRegistry* a_;  // We do not use propagate_const because the registry itself is mutable.
0682     typename T::Context const* context_;
0683     ModuleCallingContext const* moduleCallingContext_;
0684   };
0685
0687 
0688   namespace workerhelper {
0689     template <>
         // Event transition: forwards to Worker::implDo(). No ModuleSignalSentry is
         // created here and the ActivityRegistry/context arguments are unused.
0690     class CallImpl<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>> {
0691     public:
0692       using Arg = OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>;
0693       static bool call(Worker* iWorker,
0694                        StreamID,
0695                        EventTransitionInfo const& info,
0696                        ActivityRegistry* /* actReg */,
0697                        ModuleCallingContext const* mcc,
0698                        Arg::Context const* /* context */) {
0699         // The signal sentry is handled by the module, so none is set up here.
0700         return iWorker->implDo(info, mcc);
0701       }
0702       static void esPrefetchAsync(Worker* worker,
0703                                   WaitingTaskHolder waitingTask,
0704                                   ServiceToken const& token,
0705                                   EventTransitionInfo const& info,
0706                                   Transition transition) noexcept {
0707         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0708       }
           // Always wanted; selection is delegated to the worker.
0709       static bool wantsTransition(Worker const* iWorker) noexcept { return true; }
0710       static bool needToRunSelection(Worker const* iWorker) noexcept { return iWorker->implNeedToRunSelection(); }
0711 
           // No global queue is paused or enabled for this transition.
0712       static SerialTaskQueue* pauseGlobalQueue(Worker*) noexcept { return nullptr; }
0713       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0714     };
0715 
0716     template <>
         // Global begin-run: brackets Worker::implDoBegin() with the pre/post module
         // signals and sets beginSucceeded_ once both the call and the post signal
         // completed without throwing.
0717     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>> {
0718     public:
0719       using Arg = OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>;
0720       static bool call(Worker* iWorker,
0721                        StreamID,
0722                        RunTransitionInfo const& info,
0723                        ActivityRegistry* actReg,
0724                        ModuleCallingContext const* mcc,
0725                        Arg::Context const* context) {
0726         ModuleSignalSentry<Arg> sentry(actReg, context, mcc);
0727         // Should preModuleSignal() throw, implDoBegin() is skipped and the sentry
0728         // destructor emits the post module signal (ignoring additional exceptions).
0729         sentry.preModuleSignal();
0730         // Should implDoBegin() throw, the sentry destructor likewise emits the
0731         // post module signal (ignoring additional exceptions).
0732         auto result = iWorker->implDoBegin(info, mcc);
0733         // An exception thrown by postModuleSignal() propagates to the framework.
0734         sentry.postModuleSignal();
0735         iWorker->beginSucceeded_ = true;
0736         return result;
0737       }
0738       static void esPrefetchAsync(Worker* worker,
0739                                   WaitingTaskHolder waitingTask,
0740                                   ServiceToken const& token,
0741                                   RunTransitionInfo const& info,
0742                                   Transition transition) noexcept {
0743         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0744       }
0745       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalRuns(); }
0746       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
           // The worker's global-runs queue is reported for pausing at begin-run.
0747       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return iWorker->globalRunsQueue(); }
0748       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0749     };
0750     template <>
         // Stream begin-run: brackets Worker::implDoStreamBegin() with the pre/post
         // module signals and sets beginSucceeded_ when nothing threw.
0751     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>> {
0752     public:
0753       using Arg = OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>;
0754       static bool call(Worker* iWorker,
0755                        StreamID id,
0756                        RunTransitionInfo const& info,
0757                        ActivityRegistry* actReg,
0758                        ModuleCallingContext const* mcc,
0759                        Arg::Context const* context) {
0760         ModuleSignalSentry<Arg> sentry(actReg, context, mcc);
0761         sentry.preModuleSignal();
0762         auto result = iWorker->implDoStreamBegin(id, info, mcc);
0763         sentry.postModuleSignal();
0764         iWorker->beginSucceeded_ = true;
0765         return result;
0766       }
0767       static void esPrefetchAsync(Worker* worker,
0768                                   WaitingTaskHolder waitingTask,
0769                                   ServiceToken const& token,
0770                                   RunTransitionInfo const& info,
0771                                   Transition transition) noexcept {
0772         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0773       }
0774       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamRuns(); }
0775       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
           // Stream transitions neither pause nor enable a global queue.
0776       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0777       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0778     };
0779     template <>
         // Global end-run: calls Worker::implDoEnd() between the pre/post module
         // signals, but only when beginSucceeded_ is set; the flag is cleared before
         // the call. When the flag is not set the module is skipped and true is returned.
0780     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>> {
0781     public:
0782       using Arg = OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>;
0783       static bool call(Worker* iWorker,
0784                        StreamID,
0785                        RunTransitionInfo const& info,
0786                        ActivityRegistry* actReg,
0787                        ModuleCallingContext const* mcc,
0788                        Arg::Context const* context) {
0789         if (not iWorker->beginSucceeded_) {
0790           return true;
0791         }
0792         iWorker->beginSucceeded_ = false;
0793 
0794         ModuleSignalSentry<Arg> sentry(actReg, context, mcc);
0795         sentry.preModuleSignal();
0796         auto result = iWorker->implDoEnd(info, mcc);
0797         sentry.postModuleSignal();
0798         return result;
0799       }
0800       static void esPrefetchAsync(Worker* worker,
0801                                   WaitingTaskHolder waitingTask,
0802                                   ServiceToken const& token,
0803                                   RunTransitionInfo const& info,
0804                                   Transition transition) noexcept {
0805         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0806       }
0807       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalRuns(); }
0808       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0809       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
           // The worker's global-runs queue is reported for enabling at end-run.
0810       static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) noexcept { return iWorker->globalRunsQueue(); }
0811     };
0812     template <>
         // Stream end-run: calls Worker::implDoStreamEnd() between the pre/post module
         // signals, but only when beginSucceeded_ is set; the flag is cleared before
         // the call. When the flag is not set the module is skipped and true is returned.
0813     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>> {
0814     public:
0815       using Arg = OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>;
0816       static bool call(Worker* iWorker,
0817                        StreamID id,
0818                        RunTransitionInfo const& info,
0819                        ActivityRegistry* actReg,
0820                        ModuleCallingContext const* mcc,
0821                        Arg::Context const* context) {
0822         if (not iWorker->beginSucceeded_) {
0823           return true;
0824         }
0825         iWorker->beginSucceeded_ = false;
0826 
0827         ModuleSignalSentry<Arg> sentry(actReg, context, mcc);
0828         sentry.preModuleSignal();
0829         auto result = iWorker->implDoStreamEnd(id, info, mcc);
0830         sentry.postModuleSignal();
0831         return result;
0832       }
0833       static void esPrefetchAsync(Worker* worker,
0834                                   WaitingTaskHolder waitingTask,
0835                                   ServiceToken const& token,
0836                                   RunTransitionInfo const& info,
0837                                   Transition transition) noexcept {
0838         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0839       }
0840       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamRuns(); }
0841       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
           // Stream transitions neither pause nor enable a global queue.
0842       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0843       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0844     };
0845 
0846     template <>
         // Global begin-lumi: brackets Worker::implDoBegin() with the pre/post module
         // signals and sets beginSucceeded_ when nothing threw.
0847     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>> {
0848     public:
0849       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0850       static bool call(Worker* iWorker,
0851                        StreamID,
0852                        LumiTransitionInfo const& info,
0853                        ActivityRegistry* actReg,
0854                        ModuleCallingContext const* mcc,
0855                        Arg::Context const* context) {
0856         ModuleSignalSentry<Arg> sentry(actReg, context, mcc);
0857         sentry.preModuleSignal();
0858         auto result = iWorker->implDoBegin(info, mcc);
0859         sentry.postModuleSignal();
0860         iWorker->beginSucceeded_ = true;
0861         return result;
0862       }
0863       static void esPrefetchAsync(Worker* worker,
0864                                   WaitingTaskHolder waitingTask,
0865                                   ServiceToken const& token,
0866                                   LumiTransitionInfo const& info,
0867                                   Transition transition) noexcept {
0868         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0869       }
0870       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalLuminosityBlocks(); }
0871       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
           // The worker's global-lumi queue is reported for pausing at begin-lumi.
0872       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept {
0873         return iWorker->globalLuminosityBlocksQueue();
0874       }
0875       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0876     };
0877     template <>
         // Stream begin-lumi: brackets Worker::implDoStreamBegin() with the pre/post
         // module signals and sets beginSucceeded_ when nothing threw.
0878     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>> {
0879     public:
0880       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0881       static bool call(Worker* iWorker,
0882                        StreamID id,
0883                        LumiTransitionInfo const& info,
0884                        ActivityRegistry* actReg,
0885                        ModuleCallingContext const* mcc,
0886                        Arg::Context const* context) {
0887         ModuleSignalSentry<Arg> sentry(actReg, context, mcc);
0888         sentry.preModuleSignal();
0889         auto result = iWorker->implDoStreamBegin(id, info, mcc);
0890         sentry.postModuleSignal();
0891         iWorker->beginSucceeded_ = true;
0892         return result;
0893       }
0894       static void esPrefetchAsync(Worker* worker,
0895                                   WaitingTaskHolder waitingTask,
0896                                   ServiceToken const& token,
0897                                   LumiTransitionInfo const& info,
0898                                   Transition transition) noexcept {
0899         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0900       }
0901       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamLuminosityBlocks(); }
0902       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
           // Stream transitions neither pause nor enable a global queue.
0903       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0904       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0905     };
0906 
0907     template <>
         // Global end-lumi: calls Worker::implDoEnd() between the pre/post module
         // signals, but only when beginSucceeded_ is set; the flag is cleared before
         // the call. When the flag is not set the module is skipped and true is returned.
0908     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>> {
0909     public:
0910       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0911       static bool call(Worker* iWorker,
0912                        StreamID,
0913                        LumiTransitionInfo const& info,
0914                        ActivityRegistry* actReg,
0915                        ModuleCallingContext const* mcc,
0916                        Arg::Context const* context) {
0917         if (not iWorker->beginSucceeded_) {
0918           return true;
0919         }
0920         iWorker->beginSucceeded_ = false;
0921 
0922         ModuleSignalSentry<Arg> sentry(actReg, context, mcc);
0923         sentry.preModuleSignal();
0924         auto result = iWorker->implDoEnd(info, mcc);
0925         sentry.postModuleSignal();
0926         return result;
0927       }
0928       static void esPrefetchAsync(Worker* worker,
0929                                   WaitingTaskHolder waitingTask,
0930                                   ServiceToken const& token,
0931                                   LumiTransitionInfo const& info,
0932                                   Transition transition) noexcept {
0933         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0934       }
0935       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalLuminosityBlocks(); }
0936       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0937       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
           // The worker's global-lumi queue is reported for enabling at end-lumi.
0938       static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) noexcept {
0939         return iWorker->globalLuminosityBlocksQueue();
0940       }
0941     };
0942     template <>
         // Stream end-lumi: calls Worker::implDoStreamEnd() between the pre/post module
         // signals, but only when beginSucceeded_ is set; the flag is cleared before
         // the call. When the flag is not set the module is skipped and true is returned.
0943     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>> {
0944     public:
0945       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0946       static bool call(Worker* iWorker,
0947                        StreamID id,
0948                        LumiTransitionInfo const& info,
0949                        ActivityRegistry* actReg,
0950                        ModuleCallingContext const* mcc,
0951                        Arg::Context const* context) {
0952         if (not iWorker->beginSucceeded_) {
0953           return true;
0954         }
0955         iWorker->beginSucceeded_ = false;
0956 
0957         ModuleSignalSentry<Arg> sentry(actReg, context, mcc);
0958         sentry.preModuleSignal();
0959         auto result = iWorker->implDoStreamEnd(id, info, mcc);
0960         sentry.postModuleSignal();
0961         return result;
0962       }
0963       static void esPrefetchAsync(Worker* worker,
0964                                   WaitingTaskHolder waitingTask,
0965                                   ServiceToken const& token,
0966                                   LumiTransitionInfo const& info,
0967                                   Transition transition) noexcept {
0968         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0969       }
0970       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamLuminosityBlocks(); }
0971       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
           // Stream transitions neither pause nor enable a global queue.
0972       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0973       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0974     };
0975     template <>
         // Begin process block: brackets Worker::implDoBeginProcessBlock() with the
         // pre/post module signals. beginSucceeded_ is not touched and
         // esPrefetchAsync() is a no-op for this transition.
0976     class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>> {
0977     public:
0978       using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0979       static bool call(Worker* iWorker,
0980                        StreamID,
0981                        ProcessBlockTransitionInfo const& info,
0982                        ActivityRegistry* actReg,
0983                        ModuleCallingContext const* mcc,
0984                        Arg::Context const* context) {
0985         ModuleSignalSentry<Arg> sentry(actReg, context, mcc);
0986         sentry.preModuleSignal();
0987         auto result = iWorker->implDoBeginProcessBlock(info.principal(), mcc);
0988         sentry.postModuleSignal();
0989         return result;
0990       }
0991       static void esPrefetchAsync(
0992           Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
0993       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsProcessBlocks(); }
0994       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
           // No global queue is paused or enabled for process blocks.
0995       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0996       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0997     };
0998     template <>
         // Input process block: brackets Worker::implDoAccessInputProcessBlock() with
         // the pre/post module signals. esPrefetchAsync() is a no-op for this
         // transition.
0999     class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>> {
1000     public:
1001       using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
1002       static bool call(Worker* iWorker,
1003                        StreamID,
1004                        ProcessBlockTransitionInfo const& info,
1005                        ActivityRegistry* actReg,
1006                        ModuleCallingContext const* mcc,
1007                        Arg::Context const* context) {
1008         ModuleSignalSentry<Arg> sentry(actReg, context, mcc);
1009         sentry.preModuleSignal();
1010         auto result = iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
1011         sentry.postModuleSignal();
1012         return result;
1013       }
1014       static void esPrefetchAsync(
1015           Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
1016       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsInputProcessBlocks(); }
1017       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
           // No global queue is paused or enabled for process blocks.
1018       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
1019       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
1020     };
1021     template <>
         // End process block: brackets Worker::implDoEndProcessBlock() with the
         // pre/post module signals. Unlike the run/lumi end transitions there is no
         // beginSucceeded_ check here, and esPrefetchAsync() is a no-op.
1022     class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>> {
1023     public:
1024       using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1025       static bool call(Worker* iWorker,
1026                        StreamID,
1027                        ProcessBlockTransitionInfo const& info,
1028                        ActivityRegistry* actReg,
1029                        ModuleCallingContext const* mcc,
1030                        Arg::Context const* context) {
1031         ModuleSignalSentry<Arg> sentry(actReg, context, mcc);
1032         sentry.preModuleSignal();
1033         auto result = iWorker->implDoEndProcessBlock(info.principal(), mcc);
1034         sentry.postModuleSignal();
1035         return result;
1036       }
1037       static void esPrefetchAsync(
1038           Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
1039       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsProcessBlocks(); }
1040       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
           // No global queue is paused or enabled for process blocks.
1041       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
1042       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
1043     };
1044   }  // namespace workerhelper
1045 
1046   template <typename T>
       // Puts the module calling context into the kPrefetching state, emits the
       // matching pre-prefetching signal, then starts EventSetup prefetching (via
       // CallImpl<T>) followed by edPrefetchAsync(), all with iTask as the waiting
       // task. For principals whose branch type is InEvent,
       // preActionBeforeRunEventAsync() is called last.
1047   void Worker::prefetchAsync(WaitingTaskHolder iTask,
1048                              ServiceToken const& token,
1049                              ParentContext const& parentContext,
1050                              typename T::TransitionInfoType const& transitionInfo,
1051                              Transition iTransition) noexcept {
1052     Principal const& thePrincipal = transitionInfo.principal();
1053 
1054     moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1055 
         // Exactly one signal is chosen at compile time: event, stream or global.
1056     if constexpr (T::isEvent_) {
1057       actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1058     } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
1059       actReg_->preModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1060     } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
1061       actReg_->preModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(), moduleCallingContext_);
1062     }
1063 
1064     workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
1065     edPrefetchAsync(iTask, token, thePrincipal);
1066 
1067     if (thePrincipal.branchType() == InEvent) {
1068       preActionBeforeRunEventAsync(iTask, moduleCallingContext_, thePrincipal);
1069     }
1070   }
1071 
1072   template <typename T>
       // Asynchronous entry point for transition T.
       //
       // Returns immediately (without registering `task`) when CallImpl<T> reports
       // that the worker does not want this transition. Otherwise `task` is added to
       // waitingTasks_, and the one call that flips workStarted_ from false to true
       // builds the task chain: optional selection, prefetching, optional acquire,
       // then the RunModuleTask. Later calls only add their task to waitingTasks_.
1073   void Worker::doWorkAsync(WaitingTaskHolder task,
1074                            typename T::TransitionInfoType const& transitionInfo,
1075                            ServiceToken const& token,
1076                            StreamID streamID,
1077                            ParentContext const& parentContext,
1078                            typename T::Context const* context) noexcept {
1079     if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1080       return;
1081     }
1082 
1083     //Need to check workStarted_ before adding to waitingTasks_
         // workStarted is true only for the caller that changed workStarted_ from
         // false to true, i.e. the caller responsible for launching the work.
1084     bool expected = false;
1085     bool workStarted = workStarted_.compare_exchange_strong(expected, true);
1086 
1087     waitingTasks_.add(task);
         // The visit counter is only maintained for event transitions.
1088     if constexpr (T::isEvent_) {
1089       timesVisited_.fetch_add(1, std::memory_order_relaxed);
1090     }
1091 
1092     if (workStarted) {
1093       moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1094 
1095       //if have TriggerResults based selection we want to reject the event before doing prefetching
1096       if (workerhelper::CallImpl<T>::needToRunSelection(this)) {
1097         //We need to run the selection in a different task so that
1098         // we can prefetch the data needed for the selection
1099         WaitingTask* moduleTask =
1100             new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1101 
1102         //make sure the task is either run or destroyed
             // Holder for a raw task pointer. release() hands the pointer out once;
             // if it was never released, the destructor passes it to a TaskSentry
             // (presumably disposing of the task -- TaskSentry is defined elsewhere).
1103         struct DestroyTask {
1104           DestroyTask(edm::WaitingTask* iTask) noexcept : m_task(iTask) {}
1105 
1106           ~DestroyTask() noexcept {
1107             auto p = m_task.exchange(nullptr);
1108             if (p) {
1109               TaskSentry s{p};
1110             }
1111           }
1112 
1113           edm::WaitingTask* release() noexcept { return m_task.exchange(nullptr); }
1114 
1115         private:
1116           std::atomic<edm::WaitingTask*> m_task;
1117         };
1118         if constexpr (T::isEvent_) {
1119           if (hasAcquire()) {
                 // With an acquire step, moduleTask is replaced by a task that runs
                 // AcquireTask<T> first; the original run task is handed on through
                 // HandleExternalWorkExceptionTask.
                 // NOTE(review): iExcept is not inspected in this lambda -- confirm
                 // that a failure of the preceding step is handled elsewhere.
1120             auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1121             ServiceWeakToken weakToken = token;
1122             auto* group = task.group();
1123             moduleTask = make_waiting_task(
1124                 [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
1125                   WaitingTaskWithArenaHolder runTaskHolder(
1126                       *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
1127                   AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1128                   t.execute();
1129                 });
1130           }
1131         }
             // selectionTask starts prefetching with moduleTask as the waiting task;
             // it is handed to prePrefetchSelectionAsync() below.
1132         auto* group = task.group();
1133         auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1134         ServiceWeakToken weakToken = token;
1135         auto selectionTask =
1136             make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
1137                                   std::exception_ptr const*) mutable {
1138               ServiceRegistry::Operate guard(weakToken.lock());
1139               prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
1140                                weakToken.lock(),
1141                                parentContext,
1142                                info,
1143                                T::transition_);
1144             });
1145         prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1146       } else {
             // No selection needed: prefetch directly, with the run task (or, for
             // modules with an acquire step, an AcquireTask wrapping it) waiting.
1147         WaitingTask* moduleTask =
1148             new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1149         auto group = task.group();
1150         if constexpr (T::isEvent_) {
1151           if (hasAcquire()) {
1152             WaitingTaskWithArenaHolder runTaskHolder(
1153                 *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1154             moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1155           }
1156         }
1157         prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1158       }
1159     }
1160   }
1161 
1162   template <typename T>
       // Called once prefetching has finished. iEPtr carries a prefetching failure,
       // if any. Depending on shouldRethrowException() and shouldTryToContinue_ the
       // module is either run or skipped; in every case waitingTasks_ is told that
       // the work is done, and the resulting exception (possibly null) is returned.
1163   std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr iEPtr,
1164                                                          typename T::TransitionInfoType const& transitionInfo,
1165                                                          StreamID streamID,
1166                                                          ParentContext const& parentContext,
1167                                                          typename T::Context const* context) noexcept {
1168     std::exception_ptr outcome;
1169     bool runTheModule = true;
1170     if (iEPtr) {
1171       if (shouldRethrowException(iEPtr, parentContext, T::isEvent_, shouldTryToContinue_)) {
             // The prefetching failure becomes this worker's result.
1172         outcome = iEPtr;
1173         setException<T::isEvent_>(outcome);
1174         runTheModule = false;
1175       } else {
             // The failure is not propagated: unless the worker was asked to try to
             // continue, it is marked as passed without running the module.
1176         if (not shouldTryToContinue_) {
1177           setPassed<T::isEvent_>();
1178           runTheModule = false;
1179         }
1180       }
1181     }
1182     if (runTheModule) {
1183       // An exception caught here is handed on through the WaitingTaskList.
1184       CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1185         outcome = std::current_exception();
1186       }
1187     } else {
1188       moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
1189     }
1190     waitingTasks_.doneWaiting(outcome);
1191     return outcome;
1192   }
1193 
1194   template <typename T>
       // Variant of doWorkAsync() that runs no data (ED) prefetching. `task` is added
       // to waitingTasks_, and the one call that flips workStarted_ from false to true
       // schedules runModule<T>(): after EventSetup prefetching when
       // needsESPrefetching() says so, otherwise right away. The work goes through the
       // queue returned by serializeRunModule() when there is one, else through the
       // task group. Returns immediately when the worker does not want the transition.
       // NOTE(review): unlike doWorkAsync(), timesVisited_ is not incremented here --
       // confirm this is intended.
1195   void Worker::doWorkNoPrefetchingAsync(WaitingTaskHolder task,
1196                                         typename T::TransitionInfoType const& transitionInfo,
1197                                         ServiceToken const& serviceToken,
1198                                         StreamID streamID,
1199                                         ParentContext const& parentContext,
1200                                         typename T::Context const* context) noexcept {
1201     if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1202       return;
1203     }
1204 
1205     //Need to check workStarted_ before adding to waitingTasks_
1206     bool expected = false;
1207     auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1208 
1209     waitingTasks_.add(task);
1210     if (workStarted) {
1211       ServiceWeakToken weakToken = serviceToken;
           // toDo runs the module with the services made available and then reports
           // the result (a null or caught exception) to everything in waitingTasks_.
           // The transition info is copied into the closure (info = transitionInfo).
1212       auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1213         std::exception_ptr exceptionPtr;
1214         // Caught exception is propagated via WaitingTaskList
1215         CMS_SA_ALLOW try {
1216           //Need to make the services available
1217           ServiceRegistry::Operate guard(weakToken.lock());
1218 
1219           this->runModule<T>(info, streamID, parentContext, context);
1220         } catch (...) {
1221           exceptionPtr = std::current_exception();
1222         }
1223         this->waitingTasks_.doneWaiting(exceptionPtr);
1224       };
1225 
1226       if (needsESPrefetching(T::transition_)) {
             // afterPrefetch runs once EventSetup prefetching is done: a prefetching
             // exception is reported straight to waitingTasks_ and the module is not
             // run; otherwise toDo is scheduled.
1227         auto group = task.group();
1228         auto afterPrefetch =
1229             edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1230               if (iExcept) {
1231                 this->waitingTasks_.doneWaiting(*iExcept);
1232               } else {
1233                 if (auto queue = this->serializeRunModule()) {
1234                   queue.push(*group, toDo);
1235                 } else {
1236                   group->run(toDo);
1237                 }
1238               }
1239             });
1240         moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1241         esPrefetchAsync(
1242             WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1243       } else {
             // Nothing to prefetch: schedule toDo immediately.
1244         auto group = task.group();
1245         if (auto queue = this->serializeRunModule()) {
1246           queue.push(*group, toDo);
1247         } else {
1248           group->run(toDo);
1249         }
1250       }
1251     }
1252   }
1253 
1254   template <typename T>
       // Runs the module for transition T through CallImpl<T>::call() and records the
       // outcome with setPassed()/setFailed(). A cms::Exception gets the module
       // calling context attached; it is then either stored with setException() and
       // rethrown, or -- when shouldRethrowException() says no -- replaced by the
       // value of setPassed(). Returns the accept/reject result.
1255   bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1256                          StreamID streamID,
1257                          ParentContext const& parentContext,
1258                          typename T::Context const* context) {
1259     // Note kept from the original code ("unscheduled producers should advance
1260     // this"); the counter update it refers to is disabled:
1261     //   if (T::isEvent_) { ++timesVisited_; }
1262     // so timesVisited_ is not touched in this function.
1263     ModuleContextSentry contextGuard(&moduleCallingContext_, parentContext);
1264     if constexpr (T::isEvent_) {
1265       timesRun_.fetch_add(1, std::memory_order_relaxed);
1266     }
1267 
1268     bool accepted = true;
1269     try {
1270       convertException::wrap([&]() {
1271         accepted = workerhelper::CallImpl<T>::call(
1272             this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1273 
1274         if (accepted) {
1275           setPassed<T::isEvent_>();
1276         } else {
1277           setFailed<T::isEvent_>();
1278         }
1279       });
1280     } catch (cms::Exception& ex) {
1281       edm::exceptionContext(ex, moduleCallingContext_);
1282       if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, shouldTryToContinue_)) {
             // No exception may be cached yet; setException() presumably fills
             // cached_exception_, which is what gets rethrown.
1283         assert(not cached_exception_);
1284         setException<T::isEvent_>(std::current_exception());
1285         std::rethrow_exception(cached_exception_);
1286       } else {
1287         accepted = setPassed<T::isEvent_>();
1288       }
1289     }
1290 
1291     return accepted;
1292   }
1293 
1294   template <typename T>
       // Runs the module without any prefetching step: bumps timesVisited_ and then
       // delegates to runModuleAfterAsyncPrefetch(). The counter is incremented for
       // every T here, whereas doWorkAsync() only does so for event transitions.
1295   std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1296                                                StreamID streamID,
1297                                                ParentContext const& parentContext,
1298                                                typename T::Context const* context) noexcept {
1299     timesVisited_.fetch_add(1, std::memory_order_relaxed);
1300     // There was no prefetching to do, so a null exception_ptr is passed on.
1301     return runModuleAfterAsyncPrefetch<T>(std::exception_ptr{}, transitionInfo, streamID, parentContext, context);
1302   }
1303 }  // namespace edm
1304 #endif