Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-10-30 02:49:15

0001 #ifndef FWCore_Framework_Worker_h
0002 #define FWCore_Framework_Worker_h
0003 
0004 /*----------------------------------------------------------------------
0005 
0006 Worker: this is the basic scheduling unit - an abstract base class for
0007 something that is really a producer, filter, analyzer or output module.
0008 
0009 A worker will not actually call through to the module unless it is
0010 in a Ready state.  After a module is actually run, the state will not
0011 be Ready.  The Ready state can only be reestablished by doing a reset().
0012 
0013 Pre/post module signals are posted only in the Ready state.
0014 
0015 Execution statistics are kept here.
0016 
0017 If a module has thrown an exception during execution, that exception
0018 will be rethrown if the worker is entered again and the state is not Ready.
0019 In other words, execution results (status) are cached and reused until
0020 the worker is reset().
0021 
0022 ----------------------------------------------------------------------*/
0023 
0024 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0025 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0026 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0027 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0028 #include "FWCore/Framework/interface/maker/WorkerParams.h"
0029 #include "FWCore/Framework/interface/ExceptionActions.h"
0030 #include "FWCore/Framework/interface/ModuleContextSentry.h"
0031 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0032 #include "FWCore/Framework/interface/ProductResolverIndexAndSkipBit.h"
0033 #include "FWCore/Concurrency/interface/WaitingTask.h"
0034 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0035 #include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
0036 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0037 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0038 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0039 #include "FWCore/ServiceRegistry/interface/ConsumesInfo.h"
0040 #include "FWCore/ServiceRegistry/interface/InternalContext.h"
0041 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0042 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0043 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0044 #include "FWCore/ServiceRegistry/interface/PlaceInPathContext.h"
0045 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0046 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0047 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0048 #include "FWCore/Concurrency/interface/FunctorTask.h"
0049 #include "FWCore/Utilities/interface/Exception.h"
0050 #include "FWCore/Utilities/interface/ConvertException.h"
0051 #include "FWCore/Utilities/interface/BranchType.h"
0052 #include "FWCore/Utilities/interface/ProductResolverIndex.h"
0053 #include "FWCore/Utilities/interface/StreamID.h"
0054 #include "FWCore/Utilities/interface/propagate_const.h"
0055 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0056 #include "FWCore/Utilities/interface/ESIndices.h"
0057 #include "FWCore/Utilities/interface/Transition.h"
0058 
0059 #include "FWCore/Framework/interface/Frameworkfwd.h"
0060 
0061 #include <array>
0062 #include <atomic>
0063 #include <cassert>
0064 #include <map>
0065 #include <memory>
0066 #include <sstream>
0067 #include <string>
0068 #include <vector>
0069 #include <exception>
0070 #include <unordered_map>
     #include <utility>
0071 
0072 namespace edm {
0073   class EventPrincipal;
0074   class EventSetupImpl;
0075   class EarlyDeleteHelper;
0076   class ModuleProcessName;
0077   class ProductResolverIndexHelper;
0078   class ProductResolverIndexAndSkipBit;
0079   class StreamID;
0080   class StreamContext;
0081   class ProductRegistry;
0082   class ThinnedAssociationsHelper;
0083 
0084   namespace workerhelper {
0085     template <typename O>
0086     class CallImpl;
0087   }
0088   namespace eventsetup {
0089     class ESRecordsToProductResolverIndices;
0090   }
0091 
0092   class Worker {
0093   public:
0094     enum State { Ready, Pass, Fail, Exception };  // cached outcome of the worker; reset() puts it back to Ready
0095     enum Types { kAnalyzer, kFilter, kProducer, kOutputModule };  // kind of wrapped module, reported by moduleType()
0096     enum ConcurrencyTypes { kGlobal, kLimited, kOne, kStream, kLegacy };  // reported by moduleConcurrencyType()
0097     struct TaskQueueAdaptor {
           // Non-owning handle to the queue used to serialize a module: either a
           // SerialTaskQueueChain or a LimitedTaskQueue. Each constructor sets at most
           // one of the two pointers; a default-constructed adaptor holds neither and
           // converts to false.
0098       SerialTaskQueueChain* serial_ = nullptr;
0099       LimitedTaskQueue* limited_ = nullptr;
0100
0101       TaskQueueAdaptor() = default;
0102       TaskQueueAdaptor(SerialTaskQueueChain* iChain) : serial_(iChain) {}
0103       TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
0104
           // True when a queue is attached. Const-qualified so it also works on const
           // adaptors; kept implicit so existing callers are unaffected.
0105       operator bool() const { return serial_ != nullptr or limited_ != nullptr; }
0106
           // Pushes the callable iF onto whichever queue is attached, using task group iG.
           // Precondition: a queue is attached (callers test the adaptor with `if` first).
           // The callable is perfectly forwarded so that an rvalue passed by the caller
           // (e.g. std::move(f)) is moved rather than copied.
0107       template <class F>
0108       void push(oneapi::tbb::task_group& iG, F&& iF) {
0109         if (serial_) {
0110           serial_->push(iG, std::forward<F>(iF));
0111         } else {
               assert(limited_ != nullptr);
0112           limited_->push(iG, std::forward<F>(iF));
0113         }
0114       }
0115     };
0116 
0117     Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);  // iActions is not owned (see actions_)
0118     virtual ~Worker();  // virtual: Worker is an abstract base class
0119
0120     Worker(Worker const&) = delete;             // Disallow copying and moving
0121     Worker& operator=(Worker const&) = delete;  // Disallow copying and moving
0122 
0123     void clearModule() {
           // Flag the module as no longer valid first (description() returns nullptr
           // from now on), then invoke the derived-class hook doClearModule().
0124       moduleValid_ = false;
0125       doClearModule();
0126     }
0127 
0128     virtual bool wantsProcessBlocks() const = 0;
         // The wants*() queries are answered by the concrete worker.
         // NOTE(review): presumably each reports whether the module takes part in that
         // kind of transition — confirm in the derived classes.
0129     virtual bool wantsInputProcessBlocks() const = 0;
0130     virtual bool wantsGlobalRuns() const = 0;  // used by CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>>::wantsTransition
0131     virtual bool wantsGlobalLuminosityBlocks() const = 0;
0132     virtual bool wantsStreamRuns() const = 0;
0133     virtual bool wantsStreamLuminosityBlocks() const = 0;
0134
         // Queues used to keep global transitions from overlapping. globalRunsQueue() is
         // what CallImpl<...RunPrincipal, BranchActionGlobalBegin>::pauseGlobalQueue returns;
         // RunModuleTask::execute tests the result for nullptr before using it.
0135     virtual SerialTaskQueue* globalRunsQueue() = 0;
0136     virtual SerialTaskQueue* globalLuminosityBlocksQueue() = 0;
0137 
0138     void prePrefetchSelectionAsync(
0139         oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, EventPrincipal const*);
0140
         // Overload taking an untyped pointer instead of an EventPrincipal. It must never
         // actually be executed: the body only asserts.
         // NOTE(review): presumably present so templated callers compile for non-event
         // transitions — confirm against the call sites.
0141     void prePrefetchSelectionAsync(
0142         oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, void const*) {
0143       assert(false);
0144     }
0145 
0146     template <typename T>
         // T is a traits type that supplies TransitionInfoType and Context (CallImpl below
         // is specialized on OccurrenceTraits<> instantiations). Declaration only; the
         // definition is outside this part of the file.
0147     void doWorkAsync(WaitingTaskHolder,
0148                      typename T::TransitionInfoType const&,
0149                      ServiceToken const&,
0150                      StreamID,
0151                      ParentContext const&,
0152                      typename T::Context const*);
0153
         // Same parameter list as doWorkAsync. NOTE(review): by its name this variant
         // skips data prefetching — confirm in the definition.
0154     template <typename T>
0155     void doWorkNoPrefetchingAsync(WaitingTaskHolder,
0156                                   typename T::TransitionInfoType const&,
0157                                   ServiceToken const&,
0158                                   StreamID,
0159                                   ParentContext const&,
0160                                   typename T::Context const*);
0161
         // Takes no WaitingTaskHolder and returns a std::exception_ptr to the caller.
         // NOTE(review): presumably a null pointer means success — confirm in the definition.
0162     template <typename T>
0163     std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
0164                                          StreamID,
0165                                          ParentContext const&,
0166                                          typename T::Context const*);
0167 
0168     virtual size_t transformIndex(edm::BranchDescription const&) const = 0;  // implemented by the concrete worker
         // Declaration only. iTransformIndex has the same type that transformIndex() returns.
         // NOTE(review): presumably it is a value obtained from transformIndex() — confirm.
0169     void doTransformAsync(WaitingTaskHolder,
0170                           size_t iTransformIndex,
0171                           EventPrincipal const&,
0172                           ServiceToken const&,
0173                           StreamID,
0174                           ModuleCallingContext const&,
0175                           StreamContext const*);
0176 
0177     void callWhenDoneAsync(WaitingTaskHolder task) { waitingTasks_.add(std::move(task)); }  // registers task on this worker's waiting list
0178     void skipOnPath(EventPrincipal const& iEvent);
0179     void beginJob();
0180     void endJob();
0181     void beginStream(StreamID id, StreamContext& streamContext);
0182     void endStream(StreamID id, StreamContext& streamContext);
         // The three respondTo*() functions are thin public wrappers that forward to the
         // corresponding private pure-virtual impl*() hook.
0183     void respondToOpenInputFile(FileBlock const& fb) { implRespondToOpenInputFile(fb); }
0184     void respondToCloseInputFile(FileBlock const& fb) { implRespondToCloseInputFile(fb); }
0185     void respondToCloseOutputFile() { implRespondToCloseOutputFile(); }
0186     void registerThinnedAssociations(ProductRegistry const& registry, ThinnedAssociationsHelper& helper);
0187 
0188     void reset() {
           // Returns the worker to the Ready state: forgets any cached exception, resets
           // the waiting-task list and the work-started flag, and restores the number of
           // paths still to run to the number of paths the worker was added to.
0189       cached_exception_ = std::exception_ptr();
0190       state_ = Ready;
0191       waitingTasks_.reset();
0192       workStarted_ = false;
0193       numberOfPathsLeftToRun_ = numberOfPathsOn_;  // numberOfPathsOn_ is incremented by addedToPath()
0194     }
0195 
0196     void postDoEvent(EventPrincipal const&);
0197
         // Returns the module description held by the module calling context, or nullptr
         // once clearModule() has marked the module as invalid.
0198     ModuleDescription const* description() const {
0199       if (moduleValid_) {
0200         return moduleCallingContext_.moduleDescription();
0201       }
0202       return nullptr;
0203     }
0204     ///The signals are required to live longer than the last call to 'doWork'
0205     /// this was done to improve performance based on profiling
0206     void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);  // the registry is kept in actReg_ (shared ownership)
0207
0208     void setEarlyDeleteHelper(EarlyDeleteHelper* iHelper);
0209
0210     //Used to make EDGetToken work
0211     virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const&) = 0;
0212     virtual void updateLookup(eventsetup::ESRecordsToProductResolverIndices const&) = 0;
0213     virtual void selectInputProcessBlocks(ProductRegistry const&, ProcessBlockHelperBase const&) = 0;
         // iIndicies maps a std::string key to (TypeID, C string, ProductResolverIndex) tuples.
         // NOTE(review): the meaning of the key and of the C string is not visible here — confirm in the implementations.
0214     virtual void resolvePutIndicies(
0215         BranchType iBranchType,
0216         std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const&
0217             iIndicies) = 0;
0218 
0219     virtual void modulesWhoseProductsAreConsumed(
         // `modules` holds one vector pointer per branch type (NumBranchTypes entries).
         // NOTE(review): presumably `modules` and `modulesInPreviousProcesses` are output
         // parameters filled by the implementation — confirm.
0220         std::array<std::vector<ModuleDescription const*>*, NumBranchTypes>& modules,
0221         std::vector<ModuleProcessName>& modulesInPreviousProcesses,
0222         ProductRegistry const& preg,
0223         std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
0224
0225     virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
0226
0227     virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
0228
0229     virtual Types moduleType() const = 0;                        // one of the Types enumerators
0230     virtual ConcurrencyTypes moduleConcurrencyType() const = 0;  // one of the ConcurrencyTypes enumerators
0231 
0232     void clearCounters() {
           // Zeroes all five execution counters. Release stores pair with the acquire
           // loads in the timesXxx() accessors.
0233       timesRun_.store(0, std::memory_order_release);
0234       timesVisited_.store(0, std::memory_order_release);
0235       timesPassed_.store(0, std::memory_order_release);
0236       timesFailed_.store(0, std::memory_order_release);
0237       timesExcept_.store(0, std::memory_order_release);
0238     }
0239 
0240     void addedToPath() { ++numberOfPathsOn_; }  // counts the paths this worker sits on; reset() copies the count into numberOfPathsLeftToRun_
0241     //NOTE: calling state() is done to force synchronization across threads
         // NOTE(review): the accessors below do not call state(); they use acquire loads
         // on the atomic counters instead, so the note above looks outdated — confirm.
0242     int timesRun() const { return timesRun_.load(std::memory_order_acquire); }
0243     int timesVisited() const { return timesVisited_.load(std::memory_order_acquire); }
0244     int timesPassed() const { return timesPassed_.load(std::memory_order_acquire); }
0245     int timesFailed() const { return timesFailed_.load(std::memory_order_acquire); }
0246     int timesExcept() const { return timesExcept_.load(std::memory_order_acquire); }
0247     State state() const { return state_; }  // state_ is a std::atomic<State>; this is a default (seq_cst) load
0248
0249     int timesPass() const { return timesPassed(); }  // for backward compatibility only - to be removed soon
0250
0251     virtual bool hasAccumulator() const = 0;
0252
0253     // Used in PuttableProductResolver
0254     edm::WaitingTaskList& waitingTaskList() { return waitingTasks_; }
0255 
0256   protected:
0257     template <typename O>
         // CallImpl specializations are friends so they can reach the protected/private
         // hooks below (e.g. implDo, implDoBegin, esPrefetchAsync).
0258     friend class workerhelper::CallImpl;
0259
0260     virtual void doClearModule() = 0;  // invoked by clearModule() after the module is flagged invalid
0261
0262     virtual std::string workerType() const = 0;
0263     virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;  // event hook, called from CallImpl<...EventPrincipal...>::call
0264
0265     virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0266     virtual bool implNeedToRunSelection() const = 0;  // returned by CallImpl<...EventPrincipal...>::needToRunSelection
0267
0268     virtual void implDoAcquire(EventTransitionInfo const&,
0269                                ModuleCallingContext const*,
0270                                WaitingTaskWithArenaHolder&) = 0;
0271
0272     virtual void implDoTransformAsync(WaitingTaskHolder,
0273                                       size_t iTransformIndex,
0274                                       EventPrincipal const&,
0275                                       ParentContext const&,
0276                                       ServiceWeakToken const&) = 0;
0277     virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const = 0;
0278
         // Per-transition hooks implemented by the concrete worker. The overloads taking
         // RunTransitionInfo / LumiTransitionInfo share names and are distinguished only
         // by that argument type.
0279     virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const&, ModuleCallingContext const*) = 0;
0280     virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0281     virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0282     virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0283     virtual bool implDoBegin(RunTransitionInfo const&, ModuleCallingContext const*) = 0;  // called from CallImpl<...RunPrincipal, BranchActionGlobalBegin>::call
0284     virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0285     virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0286     virtual bool implDoEnd(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0287     virtual bool implDoBegin(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0288     virtual bool implDoStreamBegin(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0289     virtual bool implDoStreamEnd(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0290     virtual bool implDoEnd(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0291     virtual void implBeginJob() = 0;
0292     virtual void implEndJob() = 0;
0293     virtual void implBeginStream(StreamID) = 0;
0294     virtual void implEndStream(StreamID) = 0;
0295
0296     void resetModuleDescription(ModuleDescription const*);
0297
0298     ActivityRegistry* activityRegistry() { return actReg_.get(); }  // non-owning view of the shared registry; may be null if none was set
0299 
0300   private:
0301     template <typename T>
         // Declaration only; T supplies TransitionInfoType and Context as for doWorkAsync.
0302     bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
0303
0304     virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0305     virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0306
0307     virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
0308
0309     virtual std::vector<ESResolverIndex> const& esItemsToGetFrom(Transition) const = 0;  // consulted by needsESPrefetching()
0310     virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
0311
0312     virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask,
0313                                               ModuleCallingContext const& moduleCallingContext,
0314                                               Principal const& iPrincipal) const = 0;
0315
         // Hooks behind the public respondTo*() wrappers.
0316     virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
0317     virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
0318     virtual void implRespondToCloseOutputFile() = 0;
0319
0320     virtual void implRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) = 0;
0321
         // Returns the queue on which the module must be run; an adaptor that converts to
         // false means the tasks run the module directly (see RunModuleTask::execute).
0322     virtual TaskQueueAdaptor serializeRunModule() = 0;
0323
0324     bool shouldRethrowException(std::exception_ptr iPtr,
0325                                 ParentContext const& parentContext,
0326                                 bool isEvent,
0327                                 bool isTryToContinue) const;
0328     void checkForShouldTryToContinue(ModuleDescription const&);
0329 
0330     template <bool IS_EVENT>
0331     bool setPassed() {
           // Records a passing result: bumps the pass counter (event transitions only)
           // and sets the state to Pass. Always returns true so callers can return it.
           // IS_EVENT is a compile-time constant, hence `if constexpr`, as used
           // elsewhere in this file.
0332       if constexpr (IS_EVENT) {
0333         timesPassed_.fetch_add(1, std::memory_order_relaxed);
0334       }
0335       state_ = Pass;
0336       return true;
0337     }
0338 
0339     template <bool IS_EVENT>
0340     bool setFailed() {
           // Records a failing result: bumps the fail counter (event transitions only)
           // and sets the state to Fail. Always returns false so callers can return it.
           // IS_EVENT is a compile-time constant, hence `if constexpr`, as used
           // elsewhere in this file.
0341       if constexpr (IS_EVENT) {
0342         timesFailed_.fetch_add(1, std::memory_order_relaxed);
0343       }
0344       state_ = Fail;
0345       return false;
0346     }
0347 
0348     template <bool IS_EVENT>
0349     std::exception_ptr setException(std::exception_ptr iException) {
           // Records an exceptional result: bumps the exception counter (event
           // transitions only), caches the exception and then sets the state to
           // Exception. Returns the cached exception pointer.
           // IS_EVENT is a compile-time constant, hence `if constexpr`, as used
           // elsewhere in this file.
0350       if constexpr (IS_EVENT) {
0351         timesExcept_.fetch_add(1, std::memory_order_relaxed);
0352       }
0353       cached_exception_ = iException;  // cached before state_ is stored; cached_exception_ is declared as guarded by state_
0354       state_ = Exception;
0355       return cached_exception_;
0356     }
0357 
0358     template <typename T>
         // Declarations only; definitions are outside this part of the file.
         // esPrefetchAsync is what the CallImpl<>::esPrefetchAsync helpers forward to.
0359     void prefetchAsync(WaitingTaskHolder,
0360                        ServiceToken const&,
0361                        ParentContext const&,
0362                        typename T::TransitionInfoType const&,
0363                        Transition);
0364
0365     void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const&, Transition, ServiceToken const&);
0366     void edPrefetchAsync(WaitingTaskHolder, ServiceToken const&, Principal const&) const;
0367 
0368     bool needsESPrefetching(Transition iTrans) const noexcept {
           // True only for transitions below NumberOfEventSetupTransitions that have at
           // least one EventSetup item registered via esItemsToGetFrom().
           // NOTE(review): this function is noexcept but calls a virtual function — confirm
           // that implementations of esItemsToGetFrom() cannot throw.
0369       return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
0370     }
0371 
0372     void emitPostModuleEventPrefetchingSignal() {
           // Emits the post-prefetching signal for events, passing the stream context
           // taken from this worker's module calling context. actReg_ and the stream
           // context are dereferenced without a null check.
0373       actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0374     }
0375
         // Same as above for stream transitions.
0376     void emitPostModuleStreamPrefetchingSignal() {
0377       actReg_->postModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),
0378                                                        moduleCallingContext_);
0379     }
0380
         // Same as above for global transitions; uses the global context instead of the stream context.
0381     void emitPostModuleGlobalPrefetchingSignal() {
0382       actReg_->postModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(),
0383                                                        moduleCallingContext_);
0384     }
0385 
0386     virtual bool hasAcquire() const = 0;  // checked by RunModuleTask::execute for event transitions
0387
0388     template <typename T>
         // Declarations only. The leading std::exception_ptr carries any failure that
         // happened before the module is run: RunModuleTask passes either the task's own
         // exception or an empty pointer.
0389     std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr,
0390                                                    typename T::TransitionInfoType const&,
0391                                                    StreamID,
0392                                                    ParentContext const&,
0393                                                    typename T::Context const*);
0394
0395     void runAcquire(EventTransitionInfo const&, ParentContext const&, WaitingTaskWithArenaHolder&);
0396
0397     void runAcquireAfterAsyncPrefetch(std::exception_ptr,
0398                                       EventTransitionInfo const&,
0399                                       ParentContext const&,
0400                                       WaitingTaskWithArenaHolder);
0401
0402     std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const& parentContext);
0403 
0404     template <typename T>
         // Task run once prefetching for transition type T has finished. It emits the
         // matching post-prefetching signal and then runs the module, either directly or
         // through the queue returned by Worker::serializeRunModule().
0405     class RunModuleTask : public WaitingTask {
0406     public:
0407       RunModuleTask(Worker* worker,
0408                     typename T::TransitionInfoType const& transitionInfo,
0409                     ServiceToken const& token,
0410                     StreamID streamID,
0411                     ParentContext const& parentContext,
0412                     typename T::Context const* context,
0413                     oneapi::tbb::task_group* iGroup)
0414           : m_worker(worker),
0415             m_transitionInfo(transitionInfo),
0416             m_streamID(streamID),
0417             m_parentContext(parentContext),
0418             m_context(context),
0419             m_serviceToken(token),
0420             m_group(iGroup) {}
0421
           // Move-only guard that calls resume() on the given queue when destroyed.
           // A null queue (or a moved-from guard) makes the destructor a no-op.
0422       struct EnableQueueGuard {
0423         SerialTaskQueue* queue_;
0424         EnableQueueGuard(SerialTaskQueue* iQueue) : queue_{iQueue} {}
0425         EnableQueueGuard(EnableQueueGuard const&) = delete;
0426         EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
0427         EnableQueueGuard& operator=(EnableQueueGuard&&) = delete;
0428         EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
0429         ~EnableQueueGuard() {
0430           if (queue_) {
0431             queue_->resume();
0432           }
0433         }
0434       };
0435
0436       void execute() final {
0437         //Need to make the services available early so other services can see them
0438         ServiceRegistry::Operate guard(m_serviceToken.lock());
0439
0440         //in case the emit causes an exception, we need a memory location
0441         // to hold the exception_ptr
0442         std::exception_ptr temp_excptr;
0443         auto excptr = exceptionPtr();
0444         if constexpr (T::isEvent_) {
0445           if (!m_worker->hasAcquire()) {
0446             // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskList
0447             CMS_SA_ALLOW try {
0448               //pre was called in prefetchAsync
0449               m_worker->emitPostModuleEventPrefetchingSignal();
0450             } catch (...) {
0451               temp_excptr = std::current_exception();
                   // An exception already recorded on the task takes precedence over one
                   // thrown by the signal.
0452               if (not excptr) {
0453                 excptr = temp_excptr;
0454               }
0455             }
0456           }
0457         } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
               // NOTE(review): unlike the event branch, the stream and global emits are
               // not wrapped in try/catch — confirm that an exception escaping execute()
               // is handled by the caller.
0458           m_worker->emitPostModuleStreamPrefetchingSignal();
0459         } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
0460           m_worker->emitPostModuleGlobalPrefetchingSignal();
0461         }
0462
             // Without a pending exception, and if the worker requires serialization, the
             // actual run is deferred to the worker's queue; otherwise fall through and
             // run (or report the exception) right here.
0463         if (not excptr) {
0464           if (auto queue = m_worker->serializeRunModule()) {
                 // Everything the deferred run needs is captured by value because this
                 // task object is not referenced from inside the functor.
0465             auto f = [worker = m_worker,
0466                       info = m_transitionInfo,
0467                       streamID = m_streamID,
0468                       parentContext = m_parentContext,
0469                       sContext = m_context,
0470                       serviceToken = m_serviceToken]() {
0471               //Need to make the services available
0472               ServiceRegistry::Operate operateRunModule(serviceToken.lock());
0473
0474               //If needed, we pause the queue in begin transition and resume it
0475               // at the end transition. This can guarantee that the module
0476               // only processes one run or lumi at a time
0477               EnableQueueGuard enableQueueGuard{workerhelper::CallImpl<T>::enableGlobalQueue(worker)};
0478               std::exception_ptr ptr;
0479               worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
0480             };
0481             //keep another global transition from running if necessary
0482             auto gQueue = workerhelper::CallImpl<T>::pauseGlobalQueue(m_worker);
0483             if (gQueue) {
                   // First get onto the global queue, pause it, and only then schedule
                   // the module run on the module's own queue.
0484               gQueue->push(*m_group, [queue, gQueue, f, group = m_group]() mutable {
0485                 gQueue->pause();
0486                 queue.push(*group, std::move(f));
0487               });
0488             } else {
0489               queue.push(*m_group, std::move(f));
0490             }
0491             return;
0492           }
0493         }
0494
0495         m_worker->runModuleAfterAsyncPrefetch<T>(excptr, m_transitionInfo, m_streamID, m_parentContext, m_context);
0496       }
0497
0498     private:
0499       Worker* m_worker;
0500       typename T::TransitionInfoType m_transitionInfo;  // stored by value (copied from the constructor argument)
0501       StreamID m_streamID;
0502       ParentContext const m_parentContext;
0503       typename T::Context const* m_context;
0504       ServiceWeakToken m_serviceToken;  // weak token; lock()ed each time services are needed
0505       oneapi::tbb::task_group* m_group;
0506     };
0507 
0508     // AcquireTask is only used for the Event case, but we define
0509     // it as a template so all cases will compile.
0510     // DUMMY exists to work around the C++ Standard prohibition on
0511     // fully specializing templates nested in other classes.
0512     template <typename T, typename DUMMY = void>
         // Primary template: a do-nothing task. The constructor ignores all of its
         // arguments and execute() is empty; the real work is in the
         // event specialization below.
0513     class AcquireTask : public WaitingTask {
0514     public:
0515       AcquireTask(Worker*,
0516                   typename T::TransitionInfoType const&,
0517                   ServiceToken const&,
0518                   ParentContext const&,
0519                   WaitingTaskWithArenaHolder) {}
0520       void execute() final {}
0521     };
0522 
0523     template <typename DUMMY>
         // Event specialization: run after prefetching to emit the post-prefetching
         // signal and then call Worker::runAcquireAfterAsyncPrefetch(), either directly
         // or through the queue returned by Worker::serializeRunModule().
0524     class AcquireTask<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>, DUMMY> : public WaitingTask {
0525     public:
0526       AcquireTask(Worker* worker,
0527                   EventTransitionInfo const& eventTransitionInfo,
0528                   ServiceToken const& token,
0529                   ParentContext const& parentContext,
0530                   WaitingTaskWithArenaHolder holder)
0531           : m_worker(worker),
0532             m_eventTransitionInfo(eventTransitionInfo),
0533             m_parentContext(parentContext),
0534             m_holder(std::move(holder)),
0535             m_serviceToken(token) {}
0536
0537       void execute() final {
0538         //Need to make the services available early so other services can see them
0539         ServiceRegistry::Operate guard(m_serviceToken.lock());
0540
0541         //in case the emit causes an exception, we need a memory location
0542         // to hold the exception_ptr
0543         std::exception_ptr temp_excptr;
0544         auto excptr = exceptionPtr();
0545         // Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskWithArenaHolder
0546         CMS_SA_ALLOW try {
0547           //pre was called in prefetchAsync
0548           m_worker->emitPostModuleEventPrefetchingSignal();
0549         } catch (...) {
0550           temp_excptr = std::current_exception();
               // An exception already recorded on the task takes precedence.
0551           if (not excptr) {
0552             excptr = temp_excptr;
0553           }
0554         }
0555
             // With no pending exception and a serializing worker, defer the acquire to
             // the worker's queue. The functor captures copies of everything it needs,
             // including the holder.
0556         if (not excptr) {
0557           if (auto queue = m_worker->serializeRunModule()) {
0558             queue.push(*m_holder.group(),
0559                        [worker = m_worker,
0560                         info = m_eventTransitionInfo,
0561                         parentContext = m_parentContext,
0562                         serviceToken = m_serviceToken,
0563                         holder = m_holder]() {
0564                          //Need to make the services available
0565                          ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
0566
0567                          std::exception_ptr ptr;
0568                          worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
0569                        });
0570             return;
0571           }
0572         }
0573
             // Direct path: run (or report the exception) now, handing over the holder.
0574         m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
0575       }
0576
0577     private:
0578       Worker* m_worker;
0579       EventTransitionInfo m_eventTransitionInfo;  // stored by value
0580       ParentContext const m_parentContext;
0581       WaitingTaskWithArenaHolder m_holder;
0582       ServiceWeakToken m_serviceToken;  // weak token; lock()ed each time services are needed
0583     };
0584 
0585     // This class does nothing unless there is an exception originating
0586     // in an "External Worker". In that case, it handles converting the
0587     // exception to a CMS exception and adding context to the exception.
0588     class HandleExternalWorkExceptionTask : public WaitingTask {
           // Only declared here: the constructor and execute() are defined outside this
           // part of the file.
0589     public:
0590       HandleExternalWorkExceptionTask(Worker* worker,
0591                                       oneapi::tbb::task_group* group,
0592                                       WaitingTask* runModuleTask,
0593                                       ParentContext const& parentContext);
0594
0595       void execute() final;
0596
0597     private:
0598       Worker* m_worker;
0599       WaitingTask* m_runModuleTask;  // NOTE(review): presumably the task to run after this one — confirm in the definition
0600       oneapi::tbb::task_group* m_group;
0601       ParentContext const m_parentContext;  // stored by value
0602     };
0603 
0604     std::atomic<int> timesRun_;  // execution counters, read through the timesXxx() accessors and zeroed by clearCounters()
0605     std::atomic<int> timesVisited_;
0606     std::atomic<int> timesPassed_;  // incremented by setPassed<true>()
0607     std::atomic<int> timesFailed_;  // incremented by setFailed<true>()
0608     std::atomic<int> timesExcept_;  // incremented by setException<true>()
0609     std::atomic<State> state_;      // Ready after reset(); Pass/Fail/Exception once a result is recorded
0610     int numberOfPathsOn_;           // incremented by addedToPath()
0611     std::atomic<int> numberOfPathsLeftToRun_;  // re-initialised from numberOfPathsOn_ in reset()
0612
0613     ModuleCallingContext moduleCallingContext_;
0614
0615     ExceptionToActionTable const* actions_;                         // memory assumed to be managed elsewhere
0616     CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_;  // if state is 'exception'
0617
0618     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0619
0620     edm::propagate_const<EarlyDeleteHelper*> earlyDeleteHelper_;
0621
0622     edm::WaitingTaskList waitingTasks_;  // tasks added by callWhenDoneAsync(); reset() resets the list
0623     std::atomic<bool> workStarted_;      // cleared by reset()
         // NOTE(review): unlike the two flags below, ranAcquireWithoutException_ has no
         // in-class initializer — confirm that the constructor initializes it.
0624     bool ranAcquireWithoutException_;
0625     bool moduleValid_ = true;  // set to false by clearModule(); description() then returns nullptr
0626     bool shouldTryToContinue_ = false;
0627   };
0628 
0629   namespace {
0630     template <typename T>
0631     class ModuleSignalSentry {
           // Scope guard around a module call: the constructor emits
           // T::preModuleSignal and the destructor emits T::postModuleSignal, both
           // only when an ActivityRegistry was supplied (a null registry disables
           // the sentry). T must provide a Context type and the two static functions.
           // NOTE(review): the post signal is emitted from a destructor — confirm
           // that T::postModuleSignal cannot throw, since an exception leaving a
           // destructor terminates the program.
0632     public:
0633       ModuleSignalSentry(ActivityRegistry* a,
0634                          typename T::Context const* context,
0635                          ModuleCallingContext const* moduleCallingContext)
0636           : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
0637         if (a_)
0638           T::preModuleSignal(a_, context, moduleCallingContext_);
0639       }
0640
           // Each sentry must emit exactly one post signal, so copying (and with it
           // moving) is disabled: a copy would emit the post signal a second time.
           ModuleSignalSentry(ModuleSignalSentry const&) = delete;
           ModuleSignalSentry& operator=(ModuleSignalSentry const&) = delete;

0641       ~ModuleSignalSentry() {
0642         if (a_)
0643           T::postModuleSignal(a_, context_, moduleCallingContext_);
0644       }
0645
0646     private:
0647       ActivityRegistry* a_;  // We do not use propagate_const because the registry itself is mutable.
0648       typename T::Context const* context_;
0649       ModuleCallingContext const* moduleCallingContext_;
0650     };
0651
0652   }  // namespace
0653 
0654   namespace workerhelper {
0655     template <>
         // Dispatch helper for event processing: maps the generic CallImpl interface
         // onto the Worker hooks used for events.
0656     class CallImpl<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>> {
0657     public:
0658       typedef OccurrenceTraits<EventPrincipal, BranchActionStreamBegin> Arg;
           // Runs the module on the event via Worker::implDo. The stream id, activity
           // registry and context arguments are unused here.
0659       static bool call(Worker* iWorker,
0660                        StreamID,
0661                        EventTransitionInfo const& info,
0662                        ActivityRegistry* /* actReg */,
0663                        ModuleCallingContext const* mcc,
0664                        Arg::Context const* /* context*/) {
0665         //Signal sentry is handled by the module
0666         return iWorker->implDo(info, mcc);
0667       }
           // Forwards to Worker::esPrefetchAsync with the EventSetup taken from the transition info.
0668       static void esPrefetchAsync(Worker* worker,
0669                                   WaitingTaskHolder waitingTask,
0670                                   ServiceToken const& token,
0671                                   EventTransitionInfo const& info,
0672                                   Transition transition) {
0673         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0674       }
0675       static bool wantsTransition(Worker const* iWorker) { return true; }  // events are always wanted
0676       static bool needToRunSelection(Worker const* iWorker) { return iWorker->implNeedToRunSelection(); }
0677
           // No global queue is paused or resumed for events.
0678       static SerialTaskQueue* pauseGlobalQueue(Worker*) { return nullptr; }
0679       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0680     };
0681 
0682     template <>
0683     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>> {
0684     public:
0685       typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Arg;
0686       static bool call(Worker* iWorker,
0687                        StreamID,
0688                        RunTransitionInfo const& info,
0689                        ActivityRegistry* actReg,
0690                        ModuleCallingContext const* mcc,
0691                        Arg::Context const* context) {
0692         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0693         return iWorker->implDoBegin(info, mcc);
0694       }
0695       static void esPrefetchAsync(Worker* worker,
0696                                   WaitingTaskHolder waitingTask,
0697                                   ServiceToken const& token,
0698                                   RunTransitionInfo const& info,
0699                                   Transition transition) {
0700         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0701       }
0702       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
0703       static bool needToRunSelection(Worker const* iWorker) { return false; }
0704       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
0705       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0706     };
0707     template <>
0708     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>> {
0709     public:
0710       typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Arg;
0711       static bool call(Worker* iWorker,
0712                        StreamID id,
0713                        RunTransitionInfo const& info,
0714                        ActivityRegistry* actReg,
0715                        ModuleCallingContext const* mcc,
0716                        Arg::Context const* context) {
0717         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0718         return iWorker->implDoStreamBegin(id, info, mcc);
0719       }
0720       static void esPrefetchAsync(Worker* worker,
0721                                   WaitingTaskHolder waitingTask,
0722                                   ServiceToken const& token,
0723                                   RunTransitionInfo const& info,
0724                                   Transition transition) {
0725         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0726       }
0727       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
0728       static bool needToRunSelection(Worker const* iWorker) { return false; }
0729       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0730       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0731     };
0732     template <>
0733     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>> {
0734     public:
0735       typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Arg;
0736       static bool call(Worker* iWorker,
0737                        StreamID,
0738                        RunTransitionInfo const& info,
0739                        ActivityRegistry* actReg,
0740                        ModuleCallingContext const* mcc,
0741                        Arg::Context const* context) {
0742         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0743         return iWorker->implDoEnd(info, mcc);
0744       }
0745       static void esPrefetchAsync(Worker* worker,
0746                                   WaitingTaskHolder waitingTask,
0747                                   ServiceToken const& token,
0748                                   RunTransitionInfo const& info,
0749                                   Transition transition) {
0750         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0751       }
0752       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
0753       static bool needToRunSelection(Worker const* iWorker) { return false; }
0754       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0755       static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
0756     };
0757     template <>
0758     class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>> {
0759     public:
0760       typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Arg;
0761       static bool call(Worker* iWorker,
0762                        StreamID id,
0763                        RunTransitionInfo const& info,
0764                        ActivityRegistry* actReg,
0765                        ModuleCallingContext const* mcc,
0766                        Arg::Context const* context) {
0767         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0768         return iWorker->implDoStreamEnd(id, info, mcc);
0769       }
0770       static void esPrefetchAsync(Worker* worker,
0771                                   WaitingTaskHolder waitingTask,
0772                                   ServiceToken const& token,
0773                                   RunTransitionInfo const& info,
0774                                   Transition transition) {
0775         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0776       }
0777       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
0778       static bool needToRunSelection(Worker const* iWorker) { return false; }
0779       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0780       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0781     };
0782 
0783     template <>
0784     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>> {
0785     public:
0786       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0787       static bool call(Worker* iWorker,
0788                        StreamID,
0789                        LumiTransitionInfo const& info,
0790                        ActivityRegistry* actReg,
0791                        ModuleCallingContext const* mcc,
0792                        Arg::Context const* context) {
0793         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0794         return iWorker->implDoBegin(info, mcc);
0795       }
0796       static void esPrefetchAsync(Worker* worker,
0797                                   WaitingTaskHolder waitingTask,
0798                                   ServiceToken const& token,
0799                                   LumiTransitionInfo const& info,
0800                                   Transition transition) {
0801         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0802       }
0803       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
0804       static bool needToRunSelection(Worker const* iWorker) { return false; }
0805       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
0806       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0807     };
0808     template <>
0809     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>> {
0810     public:
0811       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0812       static bool call(Worker* iWorker,
0813                        StreamID id,
0814                        LumiTransitionInfo const& info,
0815                        ActivityRegistry* actReg,
0816                        ModuleCallingContext const* mcc,
0817                        Arg::Context const* context) {
0818         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0819         return iWorker->implDoStreamBegin(id, info, mcc);
0820       }
0821       static void esPrefetchAsync(Worker* worker,
0822                                   WaitingTaskHolder waitingTask,
0823                                   ServiceToken const& token,
0824                                   LumiTransitionInfo const& info,
0825                                   Transition transition) {
0826         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0827       }
0828       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
0829       static bool needToRunSelection(Worker const* iWorker) { return false; }
0830       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0831       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0832     };
0833 
0834     template <>
0835     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>> {
0836     public:
0837       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0838       static bool call(Worker* iWorker,
0839                        StreamID,
0840                        LumiTransitionInfo const& info,
0841                        ActivityRegistry* actReg,
0842                        ModuleCallingContext const* mcc,
0843                        Arg::Context const* context) {
0844         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0845         return iWorker->implDoEnd(info, mcc);
0846       }
0847       static void esPrefetchAsync(Worker* worker,
0848                                   WaitingTaskHolder waitingTask,
0849                                   ServiceToken const& token,
0850                                   LumiTransitionInfo const& info,
0851                                   Transition transition) {
0852         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0853       }
0854       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
0855       static bool needToRunSelection(Worker const* iWorker) { return false; }
0856       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0857       static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
0858     };
0859     template <>
0860     class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>> {
0861     public:
0862       using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0863       static bool call(Worker* iWorker,
0864                        StreamID id,
0865                        LumiTransitionInfo const& info,
0866                        ActivityRegistry* actReg,
0867                        ModuleCallingContext const* mcc,
0868                        Arg::Context const* context) {
0869         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0870         return iWorker->implDoStreamEnd(id, info, mcc);
0871       }
0872       static void esPrefetchAsync(Worker* worker,
0873                                   WaitingTaskHolder waitingTask,
0874                                   ServiceToken const& token,
0875                                   LumiTransitionInfo const& info,
0876                                   Transition transition) {
0877         worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0878       }
0879       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
0880       static bool needToRunSelection(Worker const* iWorker) { return false; }
0881       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0882       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0883     };
0884     template <>
0885     class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>> {
0886     public:
0887       using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0888       static bool call(Worker* iWorker,
0889                        StreamID,
0890                        ProcessBlockTransitionInfo const& info,
0891                        ActivityRegistry* actReg,
0892                        ModuleCallingContext const* mcc,
0893                        Arg::Context const* context) {
0894         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0895         return iWorker->implDoBeginProcessBlock(info.principal(), mcc);
0896       }
0897       static void esPrefetchAsync(
0898           Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) {}
0899       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
0900       static bool needToRunSelection(Worker const* iWorker) { return false; }
0901       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0902       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0903     };
0904     template <>
0905     class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>> {
0906     public:
0907       using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
0908       static bool call(Worker* iWorker,
0909                        StreamID,
0910                        ProcessBlockTransitionInfo const& info,
0911                        ActivityRegistry* actReg,
0912                        ModuleCallingContext const* mcc,
0913                        Arg::Context const* context) {
0914         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0915         return iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
0916       }
0917       static void esPrefetchAsync(
0918           Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) {}
0919       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsInputProcessBlocks(); }
0920       static bool needToRunSelection(Worker const* iWorker) { return false; }
0921       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0922       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0923     };
0924     template <>
0925     class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>> {
0926     public:
0927       using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
0928       static bool call(Worker* iWorker,
0929                        StreamID,
0930                        ProcessBlockTransitionInfo const& info,
0931                        ActivityRegistry* actReg,
0932                        ModuleCallingContext const* mcc,
0933                        Arg::Context const* context) {
0934         ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0935         return iWorker->implDoEndProcessBlock(info.principal(), mcc);
0936       }
0937       static void esPrefetchAsync(
0938           Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) {}
0939       static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
0940       static bool needToRunSelection(Worker const* iWorker) { return false; }
0941       static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0942       static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0943     };
0944   }  // namespace workerhelper
0945 
0946   template <typename T>
0947   void Worker::prefetchAsync(WaitingTaskHolder iTask,
0948                              ServiceToken const& token,
0949                              ParentContext const& parentContext,
0950                              typename T::TransitionInfoType const& transitionInfo,
0951                              Transition iTransition) {
0952     Principal const& principal = transitionInfo.principal();
0953 
0954     moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
0955 
0956     if constexpr (T::isEvent_) {
0957       actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0958     } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
0959       actReg_->preModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0960     } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
0961       actReg_->preModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(), moduleCallingContext_);
0962     }
0963 
0964     workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
0965     edPrefetchAsync(iTask, token, principal);
0966 
0967     if (principal.branchType() == InEvent) {
0968       preActionBeforeRunEventAsync(iTask, moduleCallingContext_, principal);
0969     }
0970   }
0971 
0972   template <typename T>
0973   void Worker::doWorkAsync(WaitingTaskHolder task,
0974                            typename T::TransitionInfoType const& transitionInfo,
0975                            ServiceToken const& token,
0976                            StreamID streamID,
0977                            ParentContext const& parentContext,
0978                            typename T::Context const* context) {
         // Purpose: register `task` in waitingTasks_ and, for the one caller that wins the
         // workStarted_ race, build the chain of tasks that prefetches data and then runs
         // the module for transition T.
         //
         //  task           - holder added to waitingTasks_; its group() is used for every task created here
         //  transitionInfo - forwarded to the created tasks and to prefetchAsync
         //  token          - service token; stored as a ServiceWeakToken inside the lambdas
         //  streamID, parentContext, context - forwarded unchanged to RunModuleTask
         //
         // Returns without doing anything if CallImpl<T>::wantsTransition(this) is false.
0979     if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
0980       return;
0981     }
0982 
0983     //Need to check workStarted_ before adding to waitingTasks_
         // compare_exchange_strong succeeds only for the caller that flips workStarted_
         // from false to true, so `workStarted` is true for exactly that caller.
0984     bool expected = false;
0985     bool workStarted = workStarted_.compare_exchange_strong(expected, true);
0986 
0987     waitingTasks_.add(task);
         // The visit counter is only maintained for event transitions.
0988     if constexpr (T::isEvent_) {
0989       timesVisited_.fetch_add(1, std::memory_order_relaxed);
0990     }
0991 
0992     if (workStarted) {
0993       moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
0994 
0995       //if have TriggerResults based selection we want to reject the event before doing prefetching
0996       if (workerhelper::CallImpl<T>::needToRunSelection(this)) {
0997         //We need to run the selection in a different task so that
0998         // we can prefetch the data needed for the selection
0999         WaitingTask* moduleTask =
1000             new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1001 
1002         //make sure the task is either run or destroyed
             // DestroyTask holds a raw task pointer. release() hands the pointer out at most
             // once (atomic exchange with nullptr). If it was never released, the destructor
             // passes it to a TaskSentry (defined elsewhere; presumably disposes of the task
             // - TODO confirm).
1003         struct DestroyTask {
1004           DestroyTask(edm::WaitingTask* iTask) : m_task(iTask) {}
1005 
1006           ~DestroyTask() {
1007             auto p = m_task.exchange(nullptr);
1008             if (p) {
1009               TaskSentry s{p};
1010             }
1011           }
1012 
1013           edm::WaitingTask* release() { return m_task.exchange(nullptr); }
1014 
1015         private:
1016           std::atomic<edm::WaitingTask*> m_task;
1017         };
1018         if constexpr (T::isEvent_) {
1019           if (hasAcquire()) {
                 // Replace moduleTask with a task that, when run, wraps the original run task
                 // in a HandleExternalWorkExceptionTask and executes an AcquireTask inline.
                 // The lambda shares ownership of the run task through ownRunTask; the
                 // iExcept argument is not inspected here.
1020             auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1021             ServiceWeakToken weakToken = token;
1022             auto* group = task.group();
1023             moduleTask = make_waiting_task(
1024                 [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
1025                   WaitingTaskWithArenaHolder runTaskHolder(
1026                       *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
1027                   AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1028                   t.execute();
1029                 });
1030           }
1031         }
             // selectionTask is handed to prePrefetchSelectionAsync below. When it runs it
             // makes the services available and starts the regular prefetch with the module
             // task released from ownModuleTask. The exception pointer argument is ignored.
1032         auto* group = task.group();
1033         auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1034         ServiceWeakToken weakToken = token;
1035         auto selectionTask =
1036             make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
1037                                   std::exception_ptr const*) mutable {
1038               ServiceRegistry::Operate guard(weakToken.lock());
1039               prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
1040                                weakToken.lock(),
1041                                parentContext,
1042                                info,
1043                                T::transition_);
1044             });
1045         prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1046       } else {
             // No selection needed: create the run task (wrapped in an AcquireTask when the
             // module has an acquire step on an event transition) and start prefetching
             // right away.
1047         WaitingTask* moduleTask =
1048             new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1049         auto group = task.group();
1050         if constexpr (T::isEvent_) {
1051           if (hasAcquire()) {
1052             WaitingTaskWithArenaHolder runTaskHolder(
1053                 *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1054             moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1055           }
1056         }
1057         prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1058       }
1059     }
1060   }
1061 
1062   template <typename T>
1063   std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr iEPtr,
1064                                                          typename T::TransitionInfoType const& transitionInfo,
1065                                                          StreamID streamID,
1066                                                          ParentContext const& parentContext,
1067                                                          typename T::Context const* context) {
1068     std::exception_ptr exceptionPtr;
1069     bool shouldRun = true;
1070     if (iEPtr) {
1071       if (shouldRethrowException(iEPtr, parentContext, T::isEvent_, shouldTryToContinue_)) {
1072         exceptionPtr = iEPtr;
1073         setException<T::isEvent_>(exceptionPtr);
1074         shouldRun = false;
1075       } else {
1076         if (not shouldTryToContinue_) {
1077           setPassed<T::isEvent_>();
1078           shouldRun = false;
1079         }
1080       }
1081     }
1082     if (shouldRun) {
1083       // Caught exception is propagated via WaitingTaskList
1084       CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1085         exceptionPtr = std::current_exception();
1086       }
1087     } else {
1088       moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
1089     }
1090     waitingTasks_.doneWaiting(exceptionPtr);
1091     return exceptionPtr;
1092   }
1093 
1094   template <typename T>
1095   void Worker::doWorkNoPrefetchingAsync(WaitingTaskHolder task,
1096                                         typename T::TransitionInfoType const& transitionInfo,
1097                                         ServiceToken const& serviceToken,
1098                                         StreamID streamID,
1099                                         ParentContext const& parentContext,
1100                                         typename T::Context const* context) {
1101     if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1102       return;
1103     }
1104 
1105     //Need to check workStarted_ before adding to waitingTasks_
1106     bool expected = false;
1107     auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1108 
1109     waitingTasks_.add(task);
1110     if (workStarted) {
1111       ServiceWeakToken weakToken = serviceToken;
1112       auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1113         std::exception_ptr exceptionPtr;
1114         // Caught exception is propagated via WaitingTaskList
1115         CMS_SA_ALLOW try {
1116           //Need to make the services available
1117           ServiceRegistry::Operate guard(weakToken.lock());
1118 
1119           this->runModule<T>(info, streamID, parentContext, context);
1120         } catch (...) {
1121           exceptionPtr = std::current_exception();
1122         }
1123         this->waitingTasks_.doneWaiting(exceptionPtr);
1124       };
1125 
1126       if (needsESPrefetching(T::transition_)) {
1127         auto group = task.group();
1128         auto afterPrefetch =
1129             edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1130               if (iExcept) {
1131                 this->waitingTasks_.doneWaiting(*iExcept);
1132               } else {
1133                 if (auto queue = this->serializeRunModule()) {
1134                   queue.push(*group, toDo);
1135                 } else {
1136                   group->run(toDo);
1137                 }
1138               }
1139             });
1140         moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1141         esPrefetchAsync(
1142             WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1143       } else {
1144         auto group = task.group();
1145         if (auto queue = this->serializeRunModule()) {
1146           queue.push(*group, toDo);
1147         } else {
1148           group->run(toDo);
1149         }
1150       }
1151     }
1152   }
1153 
1154   template <typename T>
1155   bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1156                          StreamID streamID,
1157                          ParentContext const& parentContext,
1158                          typename T::Context const* context) {
1159     //unscheduled producers should advance this
1160     //if (T::isEvent_) {
1161     //  ++timesVisited_;
1162     //}
1163     ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1164     if constexpr (T::isEvent_) {
1165       timesRun_.fetch_add(1, std::memory_order_relaxed);
1166     }
1167 
1168     bool rc = true;
1169     try {
1170       convertException::wrap([&]() {
1171         rc = workerhelper::CallImpl<T>::call(
1172             this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1173 
1174         if (rc) {
1175           setPassed<T::isEvent_>();
1176         } else {
1177           setFailed<T::isEvent_>();
1178         }
1179       });
1180     } catch (cms::Exception& ex) {
1181       edm::exceptionContext(ex, moduleCallingContext_);
1182       if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, shouldTryToContinue_)) {
1183         assert(not cached_exception_);
1184         setException<T::isEvent_>(std::current_exception());
1185         std::rethrow_exception(cached_exception_);
1186       } else {
1187         rc = setPassed<T::isEvent_>();
1188       }
1189     }
1190 
1191     return rc;
1192   }
1193 
1194   template <typename T>
1195   std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1196                                                StreamID streamID,
1197                                                ParentContext const& parentContext,
1198                                                typename T::Context const* context) {
1199     timesVisited_.fetch_add(1, std::memory_order_relaxed);
1200     std::exception_ptr prefetchingException;  // null because there was no prefetching to do
1201     return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1202   }
1203 }  // namespace edm
1204 #endif