Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-07-28 22:48:21

0001 #ifndef FWCore_Framework_Worker_h
0002 #define FWCore_Framework_Worker_h
0003 
0004 /*----------------------------------------------------------------------
0005 
0006 Worker: this is a basic scheduling unit - an abstract base class to
0007 something that is really a producer or filter.
0008 
0009 A worker will not actually call through to the module unless it is
0010 in a Ready state.  After a module is actually run, the state will not
0011 be Ready.  The Ready state can only be reestablished by doing a reset().
0012 
0013 Pre/post module signals are posted only in the Ready state.
0014 
0015 Execution statistics are kept here.
0016 
0017 If a module has thrown an exception during execution, that exception
0018 will be rethrown if the worker is entered again and the state is not Ready.
0019 In other words, execution results (status) are cached and reused until
0020 the worker is reset().
0021 
0022 ----------------------------------------------------------------------*/
0023 
0024 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0025 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0026 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0027 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0028 #include "FWCore/Framework/interface/maker/WorkerParams.h"
0029 #include "FWCore/Framework/interface/ExceptionActions.h"
0030 #include "FWCore/Framework/interface/ModuleContextSentry.h"
0031 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0032 #include "FWCore/Framework/interface/ProductResolverIndexAndSkipBit.h"
0033 #include "FWCore/Concurrency/interface/WaitingTask.h"
0034 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0035 #include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
0036 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0037 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0038 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0039 #include "FWCore/ServiceRegistry/interface/ConsumesInfo.h"
0040 #include "FWCore/ServiceRegistry/interface/InternalContext.h"
0041 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0042 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0043 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0044 #include "FWCore/ServiceRegistry/interface/PlaceInPathContext.h"
0045 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0046 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0047 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0048 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0049 #include "FWCore/Concurrency/interface/FunctorTask.h"
0050 #include "FWCore/Utilities/interface/Exception.h"
0051 #include "FWCore/Utilities/interface/ConvertException.h"
0052 #include "FWCore/Utilities/interface/BranchType.h"
0053 #include "FWCore/Utilities/interface/ProductResolverIndex.h"
0054 #include "FWCore/Utilities/interface/StreamID.h"
0055 #include "FWCore/Utilities/interface/propagate_const.h"
0056 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0057 #include "FWCore/Utilities/interface/ESIndices.h"
0058 #include "FWCore/Utilities/interface/Transition.h"
0059 
0060 #include "FWCore/Framework/interface/Frameworkfwd.h"
0061 
0062 #include <array>
0063 #include <atomic>
0064 #include <cassert>
0065 #include <map>
0066 #include <memory>
0067 #include <sstream>
0068 #include <string>
0069 #include <vector>
0070 #include <exception>
0071 #include <unordered_map>
0072 
0073 namespace edm {
0074   class EventPrincipal;
0075   class EventSetupImpl;
0076   class EarlyDeleteHelper;
0077   class ModuleProcessName;
0078   class ProductResolverIndexHelper;
0079   class ProductResolverIndexAndSkipBit;
0080   class ProductRegistry;
0081   class ThinnedAssociationsHelper;
0082 
0083   namespace workerhelper {
0084     template <typename O>
0085     class CallImpl;
0086   }
0087   namespace eventsetup {
0088     class ESRecordsToProductResolverIndices;
0089   }
0090 
0091   class Worker {
0092   public:
0093     enum State { Ready, Pass, Fail, Exception };
0094     enum Types { kAnalyzer, kFilter, kProducer, kOutputModule };
0095     enum ConcurrencyTypes { kGlobal, kLimited, kOne, kStream };
0096     struct TaskQueueAdaptor {
0097       SerialTaskQueueChain* serial_ = nullptr;
0098       LimitedTaskQueue* limited_ = nullptr;
0099 
0100       TaskQueueAdaptor() = default;
0101       TaskQueueAdaptor(SerialTaskQueueChain* iChain) : serial_(iChain) {}
0102       TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
0103 
0104       operator bool() { return serial_ != nullptr or limited_ != nullptr; }
0105 
0106       template <class F>
0107       void push(oneapi::tbb::task_group& iG, F&& iF) {
0108         if (serial_) {
0109           serial_->push(iG, iF);
0110         } else {
0111           limited_->push(iG, iF);
0112         }
0113       }
0114     };
0115 
0116     Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
0117     virtual ~Worker();
0118 
0119     Worker(Worker const&) = delete;             // Disallow copying and moving
0120     Worker& operator=(Worker const&) = delete;  // Disallow copying and moving
0121 
0122     void clearModule() {
0123       moduleValid_ = false;
0124       doClearModule();
0125     }
0126 
0127     virtual bool wantsProcessBlocks() const noexcept = 0;
0128     virtual bool wantsInputProcessBlocks() const noexcept = 0;
0129     virtual bool wantsGlobalRuns() const noexcept = 0;
0130     virtual bool wantsGlobalLuminosityBlocks() const noexcept = 0;
0131     virtual bool wantsStreamRuns() const noexcept = 0;
0132     virtual bool wantsStreamLuminosityBlocks() const noexcept = 0;
0133 
0134     virtual SerialTaskQueue* globalRunsQueue() = 0;
0135     virtual SerialTaskQueue* globalLuminosityBlocksQueue() = 0;
0136 
0137     void prePrefetchSelectionAsync(oneapi::tbb::task_group&,
0138                                    WaitingTask* task,
0139                                    ServiceToken const&,
0140                                    StreamID stream,
0141                                    EventPrincipal const*) noexcept;
0142 
0143     void prePrefetchSelectionAsync(
0144         oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, void const*) noexcept {
0145       assert(false);
0146     }
0147 
0148     template <typename T>
0149     void doWorkAsync(WaitingTaskHolder,
0150                      typename T::TransitionInfoType const&,
0151                      ServiceToken const&,
0152                      StreamID,
0153                      ParentContext const&,
0154                      typename T::Context const*) noexcept;
0155 
0156     template <typename T>
0157     void doWorkNoPrefetchingAsync(WaitingTaskHolder,
0158                                   typename T::TransitionInfoType const&,
0159                                   ServiceToken const&,
0160                                   StreamID,
0161                                   ParentContext const&,
0162                                   typename T::Context const*) noexcept;
0163 
0164     template <typename T>
0165     std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
0166                                          StreamID,
0167                                          ParentContext const&,
0168                                          typename T::Context const*) noexcept;
0169 
0170     virtual size_t transformIndex(edm::BranchDescription const&) const noexcept = 0;
0171     void doTransformAsync(WaitingTaskHolder,
0172                           size_t iTransformIndex,
0173                           EventPrincipal const&,
0174                           ServiceToken const&,
0175                           StreamID,
0176                           ModuleCallingContext const&,
0177                           StreamContext const*) noexcept;
0178 
0179     void callWhenDoneAsync(WaitingTaskHolder task) { waitingTasks_.add(std::move(task)); }
0180     void skipOnPath(EventPrincipal const& iEvent);
0181     void beginJob(GlobalContext const&);
0182     void endJob(GlobalContext const&);
0183     void beginStream(StreamID, StreamContext const&);
0184     void endStream(StreamID, StreamContext const&);
0185     void respondToOpenInputFile(FileBlock const& fb) { implRespondToOpenInputFile(fb); }
0186     void respondToCloseInputFile(FileBlock const& fb) { implRespondToCloseInputFile(fb); }
0187     void respondToCloseOutputFile() { implRespondToCloseOutputFile(); }
0188     void registerThinnedAssociations(ProductRegistry const& registry, ThinnedAssociationsHelper& helper);
0189 
0190     void reset() {
0191       cached_exception_ = std::exception_ptr();
0192       state_ = Ready;
0193       waitingTasks_.reset();
0194       workStarted_ = false;
0195       numberOfPathsLeftToRun_ = numberOfPathsOn_;
0196     }
0197 
0198     void postDoEvent(EventPrincipal const&);
0199 
0200     ModuleDescription const* description() const noexcept {
0201       if (moduleValid_) {
0202         return moduleCallingContext_.moduleDescription();
0203       }
0204       return nullptr;
0205     }
0206     ///The signals are required to live longer than the last call to 'doWork'
0207     /// this was done to improve performance based on profiling
0208     void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
0209 
0210     void setEarlyDeleteHelper(EarlyDeleteHelper* iHelper);
0211 
0212     //Used to make EDGetToken work
0213     virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const&) = 0;
0214     virtual void updateLookup(eventsetup::ESRecordsToProductResolverIndices const&) = 0;
0215     virtual void selectInputProcessBlocks(ProductRegistry const&, ProcessBlockHelperBase const&) = 0;
0216     virtual void resolvePutIndicies(
0217         BranchType iBranchType,
0218         std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const&
0219             iIndicies) = 0;
0220 
0221     virtual void modulesWhoseProductsAreConsumed(
0222         std::array<std::vector<ModuleDescription const*>*, NumBranchTypes>& modules,
0223         std::vector<ModuleProcessName>& modulesInPreviousProcesses,
0224         ProductRegistry const& preg,
0225         std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
0226 
0227     virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
0228 
0229     virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
0230 
0231     virtual Types moduleType() const = 0;
0232     virtual ConcurrencyTypes moduleConcurrencyType() const = 0;
0233 
0234     void clearCounters() noexcept {
0235       timesRun_.store(0, std::memory_order_release);
0236       timesVisited_.store(0, std::memory_order_release);
0237       timesPassed_.store(0, std::memory_order_release);
0238       timesFailed_.store(0, std::memory_order_release);
0239       timesExcept_.store(0, std::memory_order_release);
0240     }
0241 
0242     void addedToPath() noexcept { ++numberOfPathsOn_; }
0243     //NOTE: calling state() is done to force synchronization across threads
0244     int timesRun() const noexcept { return timesRun_.load(std::memory_order_acquire); }
0245     int timesVisited() const noexcept { return timesVisited_.load(std::memory_order_acquire); }
0246     int timesPassed() const noexcept { return timesPassed_.load(std::memory_order_acquire); }
0247     int timesFailed() const noexcept { return timesFailed_.load(std::memory_order_acquire); }
0248     int timesExcept() const noexcept { return timesExcept_.load(std::memory_order_acquire); }
0249     State state() const noexcept { return state_; }
0250 
0251     int timesPass() const noexcept { return timesPassed(); }  // for backward compatibility only - to be removed soon
0252 
0253     virtual bool hasAccumulator() const noexcept = 0;
0254 
0255     // Used in PuttableProductResolver
0256     edm::WaitingTaskList& waitingTaskList() noexcept { return waitingTasks_; }
0257 
0258   protected:
0259     template <typename O>
0260     friend class workerhelper::CallImpl;
0261 
0262     virtual void doClearModule() = 0;
0263 
0264     virtual std::string workerType() const = 0;
0265     virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;
0266 
0267     virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0268     virtual bool implNeedToRunSelection() const noexcept = 0;
0269 
0270     virtual void implDoAcquire(EventTransitionInfo const&,
0271                                ModuleCallingContext const*,
0272                                WaitingTaskWithArenaHolder&) = 0;
0273 
0274     virtual void implDoTransformAsync(WaitingTaskHolder,
0275                                       size_t iTransformIndex,
0276                                       EventPrincipal const&,
0277                                       ParentContext const&,
0278                                       ServiceWeakToken const&) noexcept = 0;
0279     virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const noexcept = 0;
0280 
0281     virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const&, ModuleCallingContext const*) = 0;
0282     virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0283     virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0284     virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0285     virtual bool implDoBegin(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0286     virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0287     virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0288     virtual bool implDoEnd(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0289     virtual bool implDoBegin(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0290     virtual bool implDoStreamBegin(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0291     virtual bool implDoStreamEnd(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0292     virtual bool implDoEnd(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0293     virtual void implBeginJob() = 0;
0294     virtual void implEndJob() = 0;
0295     virtual void implBeginStream(StreamID) = 0;
0296     virtual void implEndStream(StreamID) = 0;
0297 
0298     void resetModuleDescription(ModuleDescription const*);
0299 
0300     ActivityRegistry* activityRegistry() { return actReg_.get(); }
0301 
0302   private:
0303     template <typename T>
0304     bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
0305 
0306     virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0307     virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0308 
0309     virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
0310 
0311     virtual std::vector<ESResolverIndex> const& esItemsToGetFrom(Transition) const = 0;
0312     virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
0313 
0314     virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask,
0315                                               ModuleCallingContext const& moduleCallingContext,
0316                                               Principal const& iPrincipal) const noexcept = 0;
0317 
0318     virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
0319     virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
0320     virtual void implRespondToCloseOutputFile() = 0;
0321 
0322     virtual void implRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) = 0;
0323 
0324     virtual TaskQueueAdaptor serializeRunModule() = 0;
0325 
0326     bool shouldRethrowException(std::exception_ptr iPtr,
0327                                 ParentContext const& parentContext,
0328                                 bool isEvent,
0329                                 bool isTryToContinue) const noexcept;
0330     void checkForShouldTryToContinue(ModuleDescription const&);
0331 
0332     template <bool IS_EVENT>
0333     bool setPassed() {
0334       if (IS_EVENT) {
0335         timesPassed_.fetch_add(1, std::memory_order_relaxed);
0336       }
0337       state_ = Pass;
0338       return true;
0339     }
0340 
0341     template <bool IS_EVENT>
0342     bool setFailed() {
0343       if (IS_EVENT) {
0344         timesFailed_.fetch_add(1, std::memory_order_relaxed);
0345       }
0346       state_ = Fail;
0347       return false;
0348     }
0349 
0350     template <bool IS_EVENT>
0351     std::exception_ptr setException(std::exception_ptr iException) {
0352       if (IS_EVENT) {
0353         timesExcept_.fetch_add(1, std::memory_order_relaxed);
0354       }
0355       cached_exception_ = iException;  // propagate_const<T> has no reset() function
0356       state_ = Exception;
0357       return cached_exception_;
0358     }
0359 
0360     template <typename T>
0361     void prefetchAsync(WaitingTaskHolder,
0362                        ServiceToken const&,
0363                        ParentContext const&,
0364                        typename T::TransitionInfoType const&,
0365                        Transition) noexcept;
0366 
0367     void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const&, Transition, ServiceToken const&) noexcept;
0368     void edPrefetchAsync(WaitingTaskHolder, ServiceToken const&, Principal const&) const noexcept;
0369 
0370     bool needsESPrefetching(Transition iTrans) const noexcept {
0371       return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
0372     }
0373 
0374     void emitPostModuleEventPrefetchingSignal() {
0375       actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0376     }
0377 
0378     void emitPostModuleStreamPrefetchingSignal() {
0379       actReg_->postModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),
0380                                                        moduleCallingContext_);
0381     }
0382 
0383     void emitPostModuleGlobalPrefetchingSignal() {
0384       actReg_->postModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(),
0385                                                        moduleCallingContext_);
0386     }
0387 
0388     virtual bool hasAcquire() const noexcept = 0;
0389 
0390     template <typename T>
0391     std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr,
0392                                                    typename T::TransitionInfoType const&,
0393                                                    StreamID,
0394                                                    ParentContext const&,
0395                                                    typename T::Context const*) noexcept;
0396 
0397     void runAcquire(EventTransitionInfo const&, ParentContext const&, WaitingTaskWithArenaHolder&);
0398 
0399     void runAcquireAfterAsyncPrefetch(std::exception_ptr,
0400                                       EventTransitionInfo const&,
0401                                       ParentContext const&,
0402                                       WaitingTaskWithArenaHolder) noexcept;
0403 
0404     std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr,
0405                                                    ParentContext const& parentContext) noexcept;
0406 
0407     template <typename T>
0408     class RunModuleTask : public WaitingTask {
0409     public:
0410       RunModuleTask(Worker* worker,
0411                     typename T::TransitionInfoType const& transitionInfo,
0412                     ServiceToken const& token,
0413                     StreamID streamID,
0414                     ParentContext const& parentContext,
0415                     typename T::Context const* context,
0416                     oneapi::tbb::task_group* iGroup) noexcept
0417           : m_worker(worker),
0418             m_transitionInfo(transitionInfo),
0419             m_streamID(streamID),
0420             m_parentContext(parentContext),
0421             m_context(context),
0422             m_serviceToken(token),
0423             m_group(iGroup) {}
0424 
0425       struct EnableQueueGuard {
0426         SerialTaskQueue* queue_;
0427         EnableQueueGuard(SerialTaskQueue* iQueue) : queue_{iQueue} {}
0428         EnableQueueGuard(EnableQueueGuard const&) = delete;
0429         EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
0430         EnableQueueGuard& operator=(EnableQueueGuard&&) = delete;
0431         EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
0432         ~EnableQueueGuard() {
0433           if (queue_) {
0434             queue_->resume();
0435           }
0436         }
0437       };
0438 
0439       void execute() final {
0440         //Need to make the services available early so other services can see them
0441         ServiceRegistry::Operate guard(m_serviceToken.lock());
0442 
0443         //incase the emit causes an exception, we need a memory location
0444         // to hold the exception_ptr
0445         std::exception_ptr temp_excptr;
0446         auto excptr = exceptionPtr();
0447         if constexpr (T::isEvent_) {
0448           if (!m_worker->hasAcquire()) {
0449             // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskList
0450             CMS_SA_ALLOW try {
0451               //pre was called in prefetchAsync
0452               m_worker->emitPostModuleEventPrefetchingSignal();
0453             } catch (...) {
0454               temp_excptr = std::current_exception();
0455               if (not excptr) {
0456                 excptr = temp_excptr;
0457               }
0458             }
0459           }
0460         } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
0461           m_worker->emitPostModuleStreamPrefetchingSignal();
0462         } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
0463           m_worker->emitPostModuleGlobalPrefetchingSignal();
0464         }
0465 
0466         if (not excptr) {
0467           if (auto queue = m_worker->serializeRunModule()) {
0468             auto f = [worker = m_worker,
0469                       info = m_transitionInfo,
0470                       streamID = m_streamID,
0471                       parentContext = m_parentContext,
0472                       sContext = m_context,
0473                       serviceToken = m_serviceToken]() {
0474               //Need to make the services available
0475               ServiceRegistry::Operate operateRunModule(serviceToken.lock());
0476 
0477               //If needed, we pause the queue in begin transition and resume it
0478               // at the end transition. This can guarantee that the module
0479               // only processes one run or lumi at a time
0480               EnableQueueGuard enableQueueGuard{workerhelper::CallImpl<T>::enableGlobalQueue(worker)};
0481               std::exception_ptr ptr;
0482               worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
0483             };
0484             //keep another global transition from running if necessary
0485             auto gQueue = workerhelper::CallImpl<T>::pauseGlobalQueue(m_worker);
0486             if (gQueue) {
0487               gQueue->push(*m_group, [queue, gQueue, f, group = m_group]() mutable {
0488                 gQueue->pause();
0489                 queue.push(*group, std::move(f));
0490               });
0491             } else {
0492               queue.push(*m_group, std::move(f));
0493             }
0494             return;
0495           }
0496         }
0497 
0498         m_worker->runModuleAfterAsyncPrefetch<T>(excptr, m_transitionInfo, m_streamID, m_parentContext, m_context);
0499       }
0500 
0501     private:
0502       Worker* m_worker;
0503       typename T::TransitionInfoType m_transitionInfo;
0504       StreamID m_streamID;
0505       ParentContext const m_parentContext;
0506       typename T::Context const* m_context;
0507       ServiceWeakToken m_serviceToken;
0508       oneapi::tbb::task_group* m_group;
0509     };
0510 
0511     // AcquireTask is only used for the Event case, but we define
0512     // it as a template so all cases will compile.
0513     // DUMMY exists to work around the C++ Standard prohibition on
0514     // fully specializing templates nested in other classes.
0515     template <typename T, typename DUMMY = void>
0516     class AcquireTask : public WaitingTask {
0517     public:
0518       AcquireTask(Worker*,
0519                   typename T::TransitionInfoType const&,
0520                   ServiceToken const&,
0521                   ParentContext const&,
0522                   WaitingTaskWithArenaHolder) noexcept {}
0523       void execute() final {}
0524     };
0525 
0526     template <typename DUMMY>
0527     class AcquireTask<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>, DUMMY> : public WaitingTask {
0528     public:
0529       AcquireTask(Worker* worker,
0530                   EventTransitionInfo const& eventTransitionInfo,
0531                   ServiceToken const& token,
0532                   ParentContext const& parentContext,
0533                   WaitingTaskWithArenaHolder holder) noexcept
0534           : m_worker(worker),
0535             m_eventTransitionInfo(eventTransitionInfo),
0536             m_parentContext(parentContext),
0537             m_holder(std::move(holder)),
0538             m_serviceToken(token) {}
0539 
0540       void execute() final {
0541         //Need to make the services available early so other services can see them
0542         ServiceRegistry::Operate guard(m_serviceToken.lock());
0543 
0544         //incase the emit causes an exception, we need a memory location
0545         // to hold the exception_ptr
0546         std::exception_ptr temp_excptr;
0547         auto excptr = exceptionPtr();
0548         // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskWithArenaHolder
0549         CMS_SA_ALLOW try {
0550           //pre was called in prefetchAsync
0551           m_worker->emitPostModuleEventPrefetchingSignal();
0552         } catch (...) {
0553           temp_excptr = std::current_exception();
0554           if (not excptr) {
0555             excptr = temp_excptr;
0556           }
0557         }
0558 
0559         if (not excptr) {
0560           if (auto queue = m_worker->serializeRunModule()) {
0561             queue.push(*m_holder.group(),
0562                        [worker = m_worker,
0563                         info = m_eventTransitionInfo,
0564                         parentContext = m_parentContext,
0565                         serviceToken = m_serviceToken,
0566                         holder = m_holder]() {
0567                          //Need to make the services available
0568                          ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
0569 
0570                          std::exception_ptr ptr;
0571                          worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
0572                        });
0573             return;
0574           }
0575         }
0576 
0577         m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
0578       }
0579 
0580     private:
0581       Worker* m_worker;
0582       EventTransitionInfo m_eventTransitionInfo;
0583       ParentContext const m_parentContext;
0584       WaitingTaskWithArenaHolder m_holder;
0585       ServiceWeakToken m_serviceToken;
0586     };
0587 
0588     // This class does nothing unless there is an exception originating
0589     // in an "External Worker". In that case, it handles converting the
0590     // exception to a CMS exception and adding context to the exception.
0591     class HandleExternalWorkExceptionTask : public WaitingTask {
0592     public:
0593       HandleExternalWorkExceptionTask(Worker* worker,
0594                                       oneapi::tbb::task_group* group,
0595                                       WaitingTask* runModuleTask,
0596                                       ParentContext const& parentContext) noexcept;
0597 
0598       void execute() final;
0599 
0600     private:
0601       Worker* m_worker;
0602       WaitingTask* m_runModuleTask;
0603       oneapi::tbb::task_group* m_group;
0604       ParentContext const m_parentContext;
0605     };
0606 
0607     std::atomic<int> timesRun_;
0608     std::atomic<int> timesVisited_;
0609     std::atomic<int> timesPassed_;
0610     std::atomic<int> timesFailed_;
0611     std::atomic<int> timesExcept_;
0612     std::atomic<State> state_;
0613     int numberOfPathsOn_;
0614     std::atomic<int> numberOfPathsLeftToRun_;
0615 
0616     ModuleCallingContext moduleCallingContext_;
0617 
0618     ExceptionToActionTable const* actions_;                         // memory assumed to be managed elsewhere
0619     CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_;  // if state is 'exception'
0620 
0621     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0622 
0623     edm::propagate_const<EarlyDeleteHelper*> earlyDeleteHelper_;
0624 
0625     edm::WaitingTaskList waitingTasks_;
0626     std::atomic<bool> workStarted_;
0627     bool ranAcquireWithoutException_;
0628     bool moduleValid_ = true;
0629     bool shouldTryToContinue_ = false;
0630     bool beginSucceeded_ = false;
0631   };
0632 
0633   namespace {
0634     template <typename T>
0635     class ModuleSignalSentry {
0636     public:
0637       ModuleSignalSentry(ActivityRegistry* a,
0638                          typename T::Context const* context,
0639                          ModuleCallingContext const* moduleCallingContext)
0640           : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {}
0641 
0642       ~ModuleSignalSentry() {
0643         // This destructor does nothing unless we are unwinding the
0644         // the stack from an earlier exception (a_ will be null if we are
0645         // are not). We want to report the earlier exception and ignore any
0646         // addition exceptions from the post module signal.
0647         CMS_SA_ALLOW try {
0648           if (a_) {
0649             T::postModuleSignal(a_, context_, moduleCallingContext_);
0650           }
0651         } catch (...) {
0652         }
0653       }
0654       void preModuleSignal() {
0655         if (a_) {
0656           try {
0657             convertException::wrap([this]() { T::preModuleSignal(a_, context_, moduleCallingContext_); });
0658           } catch (cms::Exception& ex) {
0659             ex.addContext("Handling pre module signal, likely in a service function immediately before module method");
0660             throw;
0661           }
0662         }
0663       }
0664       void postModuleSignal() {
0665         if (a_) {
0666           auto temp = a_;
0667           // Setting a_ to null informs the destructor that the signal
0668           // was already run and that it should do nothing.
0669           a_ = nullptr;
0670           try {
0671             convertException::wrap([this, temp]() { T::postModuleSignal(temp, context_, moduleCallingContext_); });
0672           } catch (cms::Exception& ex) {
0673             ex.addContext("Handling post module signal, likely in a service function immediately after module method");
0674             throw;
0675           }
0676         }
0677       }
0678 
0679     private:
0680       ActivityRegistry* a_;  // We do not use propagate_const because the registry itself is mutable.
0681       typename T::Context const* context_;
0682       ModuleCallingContext const* moduleCallingContext_;
0683     };
0684 
0685   }  // namespace
0686 
0687   namespace workerhelper {
0688     template <>
0689     class CallImpl<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>> {
0690     public:
0691       typedef OccurrenceTraits<EventPrincipal, BranchActionStreamBegin> Arg;
0692       static bool call(Worker* iWorker,
0693                        StreamID,
0694                        EventTransitionInfo const& info,
0695                        ActivityRegistry* /* actReg */,
0696                        ModuleCallingContext const* mcc,
0697                        Arg::Context const* /* context*/) {
0698         //Signal sentry is handled by the module
0699         return iWorker->implDo(info, mcc);
0700       }
0701       static void esPrefetchAsync(Worker* worker,
0702                                   WaitingTaskHolder waitingTask,
0703                                   ServiceToken const& token,
0704                                   EventTransitionInfo const& info,
0705                                   Transition transition) noexcept {
0706         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0707       }
0708       static bool wantsTransition(Worker const* iWorker) noexcept { return true; }
0709       static bool needToRunSelection(Worker const* iWorker) noexcept { return iWorker->implNeedToRunSelection(); }
0710 
0711       static SerialTaskQueue* pauseGlobalQueue(Worker*) noexcept { return nullptr; }
0712       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0713     };
0714 
0715     template <>
0716     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>> {
0717     public:
0718       typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Arg;
0719       static bool call(Worker* iWorker,
0720                        StreamID,
0721                        RunTransitionInfo const& info,
0722                        ActivityRegistry* actReg,
0723                        ModuleCallingContext const* mcc,
0724                        Arg::Context const* context) {
0725         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0726         // If preModuleSignal() throws, implDoBegin() is not called, and the
0727         // cpp destructor calls postModuleSignal (ignoring additional exceptions)
0728         cpp.preModuleSignal();
0729         // If implDoBegin() throws, the cpp destructor calls postModuleSignal
0730         // (ignoring additional exceptions)
0731         auto returnValue = iWorker->implDoBegin(info, mcc);
0732         // If postModuleSignal() throws, the exception will propagate to the framework
0733         cpp.postModuleSignal();
0734         iWorker->beginSucceeded_ = true;
0735         return returnValue;
0736       }
0737       static void esPrefetchAsync(Worker* worker,
0738                                   WaitingTaskHolder waitingTask,
0739                                   ServiceToken const& token,
0740                                   RunTransitionInfo const& info,
0741                                   Transition transition) noexcept {
0742         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0743       }
0744       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalRuns(); }
0745       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0746       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return iWorker->globalRunsQueue(); }
0747       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0748     };
0749     template <>
0750     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>> {
0751     public:
0752       typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Arg;
0753       static bool call(Worker* iWorker,
0754                        StreamID id,
0755                        RunTransitionInfo const& info,
0756                        ActivityRegistry* actReg,
0757                        ModuleCallingContext const* mcc,
0758                        Arg::Context const* context) {
0759         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0760         cpp.preModuleSignal();
0761         auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
0762         cpp.postModuleSignal();
0763         iWorker->beginSucceeded_ = true;
0764         return returnValue;
0765       }
0766       static void esPrefetchAsync(Worker* worker,
0767                                   WaitingTaskHolder waitingTask,
0768                                   ServiceToken const& token,
0769                                   RunTransitionInfo const& info,
0770                                   Transition transition) noexcept {
0771         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0772       }
0773       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamRuns(); }
0774       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0775       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0776       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0777     };
0778     template <>
0779     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>> {
0780     public:
0781       typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Arg;
0782       static bool call(Worker* iWorker,
0783                        StreamID,
0784                        RunTransitionInfo const& info,
0785                        ActivityRegistry* actReg,
0786                        ModuleCallingContext const* mcc,
0787                        Arg::Context const* context) {
0788         if (iWorker->beginSucceeded_) {
0789           iWorker->beginSucceeded_ = false;
0790 
0791           ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0792           cpp.preModuleSignal();
0793           auto returnValue = iWorker->implDoEnd(info, mcc);
0794           cpp.postModuleSignal();
0795           return returnValue;
0796         }
0797         return true;
0798       }
0799       static void esPrefetchAsync(Worker* worker,
0800                                   WaitingTaskHolder waitingTask,
0801                                   ServiceToken const& token,
0802                                   RunTransitionInfo const& info,
0803                                   Transition transition) noexcept {
0804         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0805       }
0806       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalRuns(); }
0807       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0808       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0809       static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) noexcept { return iWorker->globalRunsQueue(); }
0810     };
0811     template <>
0812     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>> {
0813     public:
0814       typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Arg;
0815       static bool call(Worker* iWorker,
0816                        StreamID id,
0817                        RunTransitionInfo const& info,
0818                        ActivityRegistry* actReg,
0819                        ModuleCallingContext const* mcc,
0820                        Arg::Context const* context) {
0821         if (iWorker->beginSucceeded_) {
0822           iWorker->beginSucceeded_ = false;
0823 
0824           ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0825           cpp.preModuleSignal();
0826           auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
0827           cpp.postModuleSignal();
0828           return returnValue;
0829         }
0830         return true;
0831       }
0832       static void esPrefetchAsync(Worker* worker,
0833                                   WaitingTaskHolder waitingTask,
0834                                   ServiceToken const& token,
0835                                   RunTransitionInfo const& info,
0836                                   Transition transition) noexcept {
0837         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0838       }
0839       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamRuns(); }
0840       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0841       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0842       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0843     };
0844 
0845     template <>
0846     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>> {
0847     public:
0848       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0849       static bool call(Worker* iWorker,
0850                        StreamID,
0851                        LumiTransitionInfo const& info,
0852                        ActivityRegistry* actReg,
0853                        ModuleCallingContext const* mcc,
0854                        Arg::Context const* context) {
0855         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0856         cpp.preModuleSignal();
0857         auto returnValue = iWorker->implDoBegin(info, mcc);
0858         cpp.postModuleSignal();
0859         iWorker->beginSucceeded_ = true;
0860         return returnValue;
0861       }
0862       static void esPrefetchAsync(Worker* worker,
0863                                   WaitingTaskHolder waitingTask,
0864                                   ServiceToken const& token,
0865                                   LumiTransitionInfo const& info,
0866                                   Transition transition) noexcept {
0867         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0868       }
0869       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalLuminosityBlocks(); }
0870       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0871       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept {
0872         return iWorker->globalLuminosityBlocksQueue();
0873       }
0874       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0875     };
0876     template <>
0877     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>> {
0878     public:
0879       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0880       static bool call(Worker* iWorker,
0881                        StreamID id,
0882                        LumiTransitionInfo const& info,
0883                        ActivityRegistry* actReg,
0884                        ModuleCallingContext const* mcc,
0885                        Arg::Context const* context) {
0886         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0887         cpp.preModuleSignal();
0888         auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
0889         cpp.postModuleSignal();
0890         iWorker->beginSucceeded_ = true;
0891         return returnValue;
0892       }
0893       static void esPrefetchAsync(Worker* worker,
0894                                   WaitingTaskHolder waitingTask,
0895                                   ServiceToken const& token,
0896                                   LumiTransitionInfo const& info,
0897                                   Transition transition) noexcept {
0898         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0899       }
0900       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamLuminosityBlocks(); }
0901       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0902       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0903       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0904     };
0905 
0906     template <>
0907     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>> {
0908     public:
0909       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0910       static bool call(Worker* iWorker,
0911                        StreamID,
0912                        LumiTransitionInfo const& info,
0913                        ActivityRegistry* actReg,
0914                        ModuleCallingContext const* mcc,
0915                        Arg::Context const* context) {
0916         if (iWorker->beginSucceeded_) {
0917           iWorker->beginSucceeded_ = false;
0918 
0919           ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0920           cpp.preModuleSignal();
0921           auto returnValue = iWorker->implDoEnd(info, mcc);
0922           cpp.postModuleSignal();
0923           return returnValue;
0924         }
0925         return true;
0926       }
0927       static void esPrefetchAsync(Worker* worker,
0928                                   WaitingTaskHolder waitingTask,
0929                                   ServiceToken const& token,
0930                                   LumiTransitionInfo const& info,
0931                                   Transition transition) noexcept {
0932         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0933       }
0934       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalLuminosityBlocks(); }
0935       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0936       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0937       static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) noexcept {
0938         return iWorker->globalLuminosityBlocksQueue();
0939       }
0940     };
0941     template <>
0942     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>> {
0943     public:
0944       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0945       static bool call(Worker* iWorker,
0946                        StreamID id,
0947                        LumiTransitionInfo const& info,
0948                        ActivityRegistry* actReg,
0949                        ModuleCallingContext const* mcc,
0950                        Arg::Context const* context) {
0951         if (iWorker->beginSucceeded_) {
0952           iWorker->beginSucceeded_ = false;
0953 
0954           ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0955           cpp.preModuleSignal();
0956           auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
0957           cpp.postModuleSignal();
0958           return returnValue;
0959         }
0960         return true;
0961       }
0962       static void esPrefetchAsync(Worker* worker,
0963                                   WaitingTaskHolder waitingTask,
0964                                   ServiceToken const& token,
0965                                   LumiTransitionInfo const& info,
0966                                   Transition transition) noexcept {
0967         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0968       }
0969       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamLuminosityBlocks(); }
0970       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0971       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0972       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0973     };
0974     template <>
0975     class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>> {
0976     public:
0977       using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0978       static bool call(Worker* iWorker,
0979                        StreamID,
0980                        ProcessBlockTransitionInfo const& info,
0981                        ActivityRegistry* actReg,
0982                        ModuleCallingContext const* mcc,
0983                        Arg::Context const* context) {
0984         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0985         cpp.preModuleSignal();
0986         auto returnValue = iWorker->implDoBeginProcessBlock(info.principal(), mcc);
0987         cpp.postModuleSignal();
0988         iWorker->beginSucceeded_ = true;
0989         return returnValue;
0990       }
0991       static void esPrefetchAsync(
0992           Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
0993       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsProcessBlocks(); }
0994       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0995       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0996       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0997     };
0998     template <>
0999     class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>> {
1000     public:
1001       using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
1002       static bool call(Worker* iWorker,
1003                        StreamID,
1004                        ProcessBlockTransitionInfo const& info,
1005                        ActivityRegistry* actReg,
1006                        ModuleCallingContext const* mcc,
1007                        Arg::Context const* context) {
1008         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
1009         cpp.preModuleSignal();
1010         auto returnValue = iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
1011         cpp.postModuleSignal();
1012         return returnValue;
1013       }
1014       static void esPrefetchAsync(
1015           Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
1016       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsInputProcessBlocks(); }
1017       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
1018       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
1019       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
1020     };
1021     template <>
1022     class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>> {
1023     public:
1024       using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1025       static bool call(Worker* iWorker,
1026                        StreamID,
1027                        ProcessBlockTransitionInfo const& info,
1028                        ActivityRegistry* actReg,
1029                        ModuleCallingContext const* mcc,
1030                        Arg::Context const* context) {
1031         if (iWorker->beginSucceeded_) {
1032           iWorker->beginSucceeded_ = false;
1033 
1034           ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
1035           cpp.preModuleSignal();
1036           auto returnValue = iWorker->implDoEndProcessBlock(info.principal(), mcc);
1037           cpp.postModuleSignal();
1038           return returnValue;
1039         }
1040         return true;
1041       }
1042       static void esPrefetchAsync(
1043           Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
1044       static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsProcessBlocks(); }
1045       static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
1046       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
1047       static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
1048     };
1049   }  // namespace workerhelper
1050 
1051   template <typename T>
1052   void Worker::prefetchAsync(WaitingTaskHolder iTask,
1053                              ServiceToken const& token,
1054                              ParentContext const& parentContext,
1055                              typename T::TransitionInfoType const& transitionInfo,
1056                              Transition iTransition) noexcept {
1057     Principal const& principal = transitionInfo.principal();
1058 
1059     moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1060 
1061     if constexpr (T::isEvent_) {
1062       actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1063     } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
1064       actReg_->preModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1065     } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
1066       actReg_->preModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(), moduleCallingContext_);
1067     }
1068 
1069     workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
1070     edPrefetchAsync(iTask, token, principal);
1071 
1072     if (principal.branchType() == InEvent) {
1073       preActionBeforeRunEventAsync(iTask, moduleCallingContext_, principal);
1074     }
1075   }
1076 
1077   template <typename T>
1078   void Worker::doWorkAsync(WaitingTaskHolder task,
1079                            typename T::TransitionInfoType const& transitionInfo,
1080                            ServiceToken const& token,
1081                            StreamID streamID,
1082                            ParentContext const& parentContext,
1083                            typename T::Context const* context) noexcept {
1084     if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1085       return;
1086     }
1087 
1088     //Need to check workStarted_ before adding to waitingTasks_
1089     bool expected = false;
1090     bool workStarted = workStarted_.compare_exchange_strong(expected, true);
1091 
1092     waitingTasks_.add(task);
1093     if constexpr (T::isEvent_) {
1094       timesVisited_.fetch_add(1, std::memory_order_relaxed);
1095     }
1096 
1097     if (workStarted) {
1098       moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1099 
1100       //if have TriggerResults based selection we want to reject the event before doing prefetching
1101       if (workerhelper::CallImpl<T>::needToRunSelection(this)) {
1102         //We need to run the selection in a different task so that
1103         // we can prefetch the data needed for the selection
1104         WaitingTask* moduleTask =
1105             new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1106 
1107         //make sure the task is either run or destroyed
1108         struct DestroyTask {
1109           DestroyTask(edm::WaitingTask* iTask) noexcept : m_task(iTask) {}
1110 
1111           ~DestroyTask() noexcept {
1112             auto p = m_task.exchange(nullptr);
1113             if (p) {
1114               TaskSentry s{p};
1115             }
1116           }
1117 
1118           edm::WaitingTask* release() noexcept { return m_task.exchange(nullptr); }
1119 
1120         private:
1121           std::atomic<edm::WaitingTask*> m_task;
1122         };
1123         if constexpr (T::isEvent_) {
1124           if (hasAcquire()) {
1125             auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1126             ServiceWeakToken weakToken = token;
1127             auto* group = task.group();
1128             moduleTask = make_waiting_task(
1129                 [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
1130                   WaitingTaskWithArenaHolder runTaskHolder(
1131                       *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
1132                   AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1133                   t.execute();
1134                 });
1135           }
1136         }
1137         auto* group = task.group();
1138         auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1139         ServiceWeakToken weakToken = token;
1140         auto selectionTask =
1141             make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
1142                                   std::exception_ptr const*) mutable {
1143               ServiceRegistry::Operate guard(weakToken.lock());
1144               prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
1145                                weakToken.lock(),
1146                                parentContext,
1147                                info,
1148                                T::transition_);
1149             });
1150         prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1151       } else {
1152         WaitingTask* moduleTask =
1153             new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1154         auto group = task.group();
1155         if constexpr (T::isEvent_) {
1156           if (hasAcquire()) {
1157             WaitingTaskWithArenaHolder runTaskHolder(
1158                 *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1159             moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1160           }
1161         }
1162         prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1163       }
1164     }
1165   }
1166 
1167   template <typename T>
1168   std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr iEPtr,
1169                                                          typename T::TransitionInfoType const& transitionInfo,
1170                                                          StreamID streamID,
1171                                                          ParentContext const& parentContext,
1172                                                          typename T::Context const* context) noexcept {
1173     std::exception_ptr exceptionPtr;
1174     bool shouldRun = true;
1175     if (iEPtr) {
1176       if (shouldRethrowException(iEPtr, parentContext, T::isEvent_, shouldTryToContinue_)) {
1177         exceptionPtr = iEPtr;
1178         setException<T::isEvent_>(exceptionPtr);
1179         shouldRun = false;
1180       } else {
1181         if (not shouldTryToContinue_) {
1182           setPassed<T::isEvent_>();
1183           shouldRun = false;
1184         }
1185       }
1186     }
1187     if (shouldRun) {
1188       // Caught exception is propagated via WaitingTaskList
1189       CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1190         exceptionPtr = std::current_exception();
1191       }
1192     } else {
1193       moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
1194     }
1195     waitingTasks_.doneWaiting(exceptionPtr);
1196     return exceptionPtr;
1197   }
1198 
1199   template <typename T>
1200   void Worker::doWorkNoPrefetchingAsync(WaitingTaskHolder task,
1201                                         typename T::TransitionInfoType const& transitionInfo,
1202                                         ServiceToken const& serviceToken,
1203                                         StreamID streamID,
1204                                         ParentContext const& parentContext,
1205                                         typename T::Context const* context) noexcept {
1206     if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1207       return;
1208     }
1209 
1210     //Need to check workStarted_ before adding to waitingTasks_
1211     bool expected = false;
1212     auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1213 
1214     waitingTasks_.add(task);
1215     if (workStarted) {
1216       ServiceWeakToken weakToken = serviceToken;
1217       auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1218         std::exception_ptr exceptionPtr;
1219         // Caught exception is propagated via WaitingTaskList
1220         CMS_SA_ALLOW try {
1221           //Need to make the services available
1222           ServiceRegistry::Operate guard(weakToken.lock());
1223 
1224           this->runModule<T>(info, streamID, parentContext, context);
1225         } catch (...) {
1226           exceptionPtr = std::current_exception();
1227         }
1228         this->waitingTasks_.doneWaiting(exceptionPtr);
1229       };
1230 
1231       if (needsESPrefetching(T::transition_)) {
1232         auto group = task.group();
1233         auto afterPrefetch =
1234             edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1235               if (iExcept) {
1236                 this->waitingTasks_.doneWaiting(*iExcept);
1237               } else {
1238                 if (auto queue = this->serializeRunModule()) {
1239                   queue.push(*group, toDo);
1240                 } else {
1241                   group->run(toDo);
1242                 }
1243               }
1244             });
1245         moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1246         esPrefetchAsync(
1247             WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1248       } else {
1249         auto group = task.group();
1250         if (auto queue = this->serializeRunModule()) {
1251           queue.push(*group, toDo);
1252         } else {
1253           group->run(toDo);
1254         }
1255       }
1256     }
1257   }
1258 
1259   template <typename T>
1260   bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1261                          StreamID streamID,
1262                          ParentContext const& parentContext,
1263                          typename T::Context const* context) {
1264     //unscheduled producers should advance this
1265     //if (T::isEvent_) {
1266     //  ++timesVisited_;
1267     //}
1268     ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1269     if constexpr (T::isEvent_) {
1270       timesRun_.fetch_add(1, std::memory_order_relaxed);
1271     }
1272 
1273     bool rc = true;
1274     try {
1275       convertException::wrap([&]() {
1276         rc = workerhelper::CallImpl<T>::call(
1277             this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1278 
1279         if (rc) {
1280           setPassed<T::isEvent_>();
1281         } else {
1282           setFailed<T::isEvent_>();
1283         }
1284       });
1285     } catch (cms::Exception& ex) {
1286       edm::exceptionContext(ex, moduleCallingContext_);
1287       if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, shouldTryToContinue_)) {
1288         assert(not cached_exception_);
1289         setException<T::isEvent_>(std::current_exception());
1290         std::rethrow_exception(cached_exception_);
1291       } else {
1292         rc = setPassed<T::isEvent_>();
1293       }
1294     }
1295 
1296     return rc;
1297   }
1298 
1299   template <typename T>
1300   std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1301                                                StreamID streamID,
1302                                                ParentContext const& parentContext,
1303                                                typename T::Context const* context) noexcept {
1304     timesVisited_.fetch_add(1, std::memory_order_relaxed);
1305     std::exception_ptr prefetchingException;  // null because there was no prefetching to do
1306     return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1307   }
1308 }  // namespace edm
1309 #endif