File indexing completed on 2022-10-18 04:13:16
0001 #ifndef FWCore_Framework_Worker_h
0002 #define FWCore_Framework_Worker_h
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0025 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0026 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0027 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0028 #include "FWCore/Framework/interface/maker/WorkerParams.h"
0029 #include "FWCore/Framework/interface/ExceptionActions.h"
0030 #include "FWCore/Framework/interface/ModuleContextSentry.h"
0031 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0032 #include "FWCore/Framework/interface/ProductResolverIndexAndSkipBit.h"
0033 #include "FWCore/Concurrency/interface/WaitingTask.h"
0034 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0035 #include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
0036 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0037 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0038 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0039 #include "FWCore/ServiceRegistry/interface/ConsumesInfo.h"
0040 #include "FWCore/ServiceRegistry/interface/InternalContext.h"
0041 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0042 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0043 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0044 #include "FWCore/ServiceRegistry/interface/PlaceInPathContext.h"
0045 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0046 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0047 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0048 #include "FWCore/Concurrency/interface/FunctorTask.h"
0049 #include "FWCore/Utilities/interface/Exception.h"
0050 #include "FWCore/Utilities/interface/ConvertException.h"
0051 #include "FWCore/Utilities/interface/BranchType.h"
0052 #include "FWCore/Utilities/interface/ProductResolverIndex.h"
0053 #include "FWCore/Utilities/interface/StreamID.h"
0054 #include "FWCore/Utilities/interface/propagate_const.h"
0055 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0056 #include "FWCore/Utilities/interface/ESIndices.h"
0057 #include "FWCore/Utilities/interface/Transition.h"
0058
0059 #include "FWCore/Framework/interface/Frameworkfwd.h"
0060
0061 #include <array>
0062 #include <atomic>
0063 #include <cassert>
0064 #include <map>
0065 #include <memory>
0066 #include <sstream>
0067 #include <string>
0068 #include <vector>
0069 #include <exception>
0070 #include <unordered_map>
0071
0072 namespace edm {
// Forward declarations for types referred to by the declarations below.
0073 class EventPrincipal;
0074 class EventSetupImpl;
0075 class EarlyDeleteHelper;
0076 class ModuleProcessName;
0077 class ProductResolverIndexHelper;
0078 class ProductResolverIndexAndSkipBit;
0079 class StreamID;
0080 class StreamContext;
0081 class ProductRegistry;
0082 class ThinnedAssociationsHelper;
0083
// Helper template parameterised on an occurrence-traits type. Worker declares it
// a friend and its explicit specializations are defined after the Worker class.
0084 namespace workerhelper {
0085 template <typename O>
0086 class CallImpl;
0087 }
0088 namespace eventsetup {
0089 class ESRecordsToProxyIndices;
0090 }
0091
0092 class Worker {
0093 public:
// Outcome of the worker: reset() stores Ready; setPassed(), setFailed() and
// setException() store Pass, Fail and Exception respectively.
0094 enum State { Ready, Pass, Fail, Exception };
// Kind of module wrapped by the worker, as reported by moduleType().
0095 enum Types { kAnalyzer, kFilter, kProducer, kOutputModule };
// Concurrency flavour of the wrapped module, as reported by moduleConcurrencyType().
0096 enum ConcurrencyTypes { kGlobal, kLimited, kOne, kStream, kLegacy };
0097 struct TaskQueueAdaptor {
0098 SerialTaskQueueChain* serial_ = nullptr;
0099 LimitedTaskQueue* limited_ = nullptr;
0100
0101 TaskQueueAdaptor() = default;
0102 TaskQueueAdaptor(SerialTaskQueueChain* iChain) : serial_(iChain) {}
0103 TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
0104
0105 operator bool() { return serial_ != nullptr or limited_ != nullptr; }
0106
0107 template <class F>
0108 void push(oneapi::tbb::task_group& iG, F&& iF) {
0109 if (serial_) {
0110 serial_->push(iG, iF);
0111 } else {
0112 limited_->push(iG, iF);
0113 }
0114 }
0115 };
0116
// Constructs a worker from a module description and a table of exception actions.
0117 Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
0118 virtual ~Worker();
0119
// Workers are not copyable.
0120 Worker(Worker const&) = delete;
0121 Worker& operator=(Worker const&) = delete;
0122
// Flags the module as no longer valid (description() then returns nullptr)
// and invokes the derived-class hook doClearModule().
0123 void clearModule() {
0124 moduleValid_ = false;
0125 doClearModule();
0126 }
0127
// Queries answered by the derived class; several of them are consulted by the
// CallImpl helpers' wantsTransition().
0128 virtual bool wantsProcessBlocks() const = 0;
0129 virtual bool wantsInputProcessBlocks() const = 0;
0130 virtual bool wantsGlobalRuns() const = 0;
0131 virtual bool wantsGlobalLuminosityBlocks() const = 0;
0132 virtual bool wantsStreamRuns() const = 0;
0133 virtual bool wantsStreamLuminosityBlocks() const = 0;
0134
// Queues returned to the CallImpl helpers' pauseGlobalQueue()/enableGlobalQueue();
// RunModuleTask treats a null result as "no queue to pause or resume".
0135 virtual SerialTaskQueue* globalRunsQueue() = 0;
0136 virtual SerialTaskQueue* globalLuminosityBlocksQueue() = 0;
0137
// Overload taking an EventPrincipal; defined outside this header.
0138 void prePrefetchSelectionAsync(
0139 oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, EventPrincipal const*);
0140
// Overload for any other pointer type: it must never be called and asserts if it is.
// NOTE(review): presumably exists only so templated callers compile for
// non-event principals - confirm against the callers.
0141 void prePrefetchSelectionAsync(
0142 oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, void const*) {
0143 assert(false);
0144 }
0145
// The three templates below are parameterised on a traits type T that supplies
// T::TransitionInfoType and T::Context. Their definitions are not in this part
// of the file.
0146 template <typename T>
0147 void doWorkAsync(WaitingTaskHolder,
0148 typename T::TransitionInfoType const&,
0149 ServiceToken const&,
0150 StreamID,
0151 ParentContext const&,
0152 typename T::Context const*);
0153
0154 template <typename T>
0155 void doWorkNoPrefetchingAsync(WaitingTaskHolder,
0156 typename T::TransitionInfoType const&,
0157 ServiceToken const&,
0158 StreamID,
0159 ParentContext const&,
0160 typename T::Context const*);
0161
// Returns an exception_ptr rather than throwing.
// NOTE(review): presumably null on success - confirm in the definition.
0162 template <typename T>
0163 std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
0164 StreamID,
0165 ParentContext const&,
0166 typename T::Context const*);
0167
0168 virtual size_t transformIndex(edm::BranchDescription const&) const = 0;
0169 void doTransformAsync(WaitingTaskHolder,
0170 size_t iTransformIndex,
0171 EventPrincipal const&,
0172 ServiceToken const&,
0173 StreamID,
0174 ModuleCallingContext const&,
0175 StreamContext const*);
0176
// Registers a task on this worker's waiting-task list.
0177 void callWhenDoneAsync(WaitingTaskHolder task) { waitingTasks_.add(std::move(task)); }
0178 void skipOnPath(EventPrincipal const& iEvent);
0179 void beginJob();
0180 void endJob();
0181 void beginStream(StreamID id, StreamContext& streamContext);
0182 void endStream(StreamID id, StreamContext& streamContext);
// The three respondTo* functions forward directly to the matching private impl* hook.
0183 void respondToOpenInputFile(FileBlock const& fb) { implRespondToOpenInputFile(fb); }
0184 void respondToCloseInputFile(FileBlock const& fb) { implRespondToCloseInputFile(fb); }
0185 void respondToCloseOutputFile() { implRespondToCloseOutputFile(); }
0186 void registerThinnedAssociations(ProductRegistry const& registry, ThinnedAssociationsHelper& helper);
0187
// Returns the worker to its initial per-transition state: clears the cached
// exception, sets the state to Ready, resets the waiting-task list, clears the
// work-started flag and restores the remaining-path count to the number of
// paths the worker is on.
0188 void reset() {
0189 cached_exception_ = std::exception_ptr();
0190 state_ = Ready;
0191 waitingTasks_.reset();
0192 workStarted_ = false;
0193 numberOfPathsLeftToRun_ = numberOfPathsOn_;
0194 }
0195
0196 void postDoEvent(EventPrincipal const&);
0197
// Description of the wrapped module, taken from the module calling context.
// Returns nullptr once clearModule() has been called.
0198 ModuleDescription const* description() const {
0199 if (moduleValid_) {
0200 return moduleCallingContext_.moduleDescription();
0201 }
0202 return nullptr;
0203 }
0204
0205
// Supplies the ActivityRegistry. The emitPostModule*PrefetchingSignal helpers
// dereference the stored registry without a null check.
0206 void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
0207
0208 void setEarlyDeleteHelper(EarlyDeleteHelper* iHelper);
0209
0210
// Product and EventSetup lookup configuration, implemented by the derived class.
0211 virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const&) = 0;
0212 virtual void updateLookup(eventsetup::ESRecordsToProxyIndices const&) = 0;
0213 virtual void selectInputProcessBlocks(ProductRegistry const&, ProcessBlockHelperBase const&) = 0;
0214 virtual void resolvePutIndicies(
0215 BranchType iBranchType,
0216 std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const&
0217 iIndicies) = 0;
0218
// Output parameters: 'modules' (one vector pointer per branch type) and
// 'modulesInPreviousProcesses' are non-const and presumably filled by the
// implementation - confirm in the derived class.
0219 virtual void modulesWhoseProductsAreConsumed(
0220 std::array<std::vector<ModuleDescription const*>*, NumBranchTypes>& modules,
0221 std::vector<ModuleProcessName>& modulesInPreviousProcesses,
0222 ProductRegistry const& preg,
0223 std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
0224
0225 virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
0226
0227 virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
0228
// Classification of the wrapped module; see the Types and ConcurrencyTypes enums.
0229 virtual Types moduleType() const = 0;
0230 virtual ConcurrencyTypes moduleConcurrencyType() const = 0;
0231
// Zeroes all five statistics counters (release stores, paired with the
// acquire loads in the accessors below).
0232 void clearCounters() {
0233 timesRun_.store(0, std::memory_order_release);
0234 timesVisited_.store(0, std::memory_order_release);
0235 timesPassed_.store(0, std::memory_order_release);
0236 timesFailed_.store(0, std::memory_order_release);
0237 timesExcept_.store(0, std::memory_order_release);
0238 }
0239
// Records that the worker has been placed on one more path; reset() uses this
// count to initialise the number of paths left to run.
0240 void addedToPath() { ++numberOfPathsOn_; }
0241
// Counter accessors.
0242 int timesRun() const { return timesRun_.load(std::memory_order_acquire); }
0243 int timesVisited() const { return timesVisited_.load(std::memory_order_acquire); }
0244 int timesPassed() const { return timesPassed_.load(std::memory_order_acquire); }
0245 int timesFailed() const { return timesFailed_.load(std::memory_order_acquire); }
0246 int timesExcept() const { return timesExcept_.load(std::memory_order_acquire); }
0247 State state() const { return state_; }
0248
// Alias of timesPassed().
0249 int timesPass() const { return timesPassed(); }
0250
0251 virtual bool hasAccumulator() const = 0;
0252
0253
// Direct access to the list of tasks registered via callWhenDoneAsync().
0254 edm::WaitingTaskList& waitingTaskList() { return waitingTasks_; }
0255
0256 protected:
// The CallImpl helpers call the protected/private members below on behalf of
// the templated transition code.
0257 template <typename O>
0258 friend class workerhelper::CallImpl;
0259
// Hook invoked by clearModule().
0260 virtual void doClearModule() = 0;
0261
0262 virtual std::string workerType() const = 0;
// Event processing hook; called by the event specialization of CallImpl::call.
0263 virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;
0264
0265 virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
// Reported by the event specialization of CallImpl::needToRunSelection.
0266 virtual bool implNeedToRunSelection() const = 0;
0267
0268 virtual void implDoAcquire(EventTransitionInfo const&,
0269 ModuleCallingContext const*,
0270 WaitingTaskWithArenaHolder&) = 0;
0271
0272 virtual void implDoTransformAsync(WaitingTaskHolder,
0273 size_t iTransformIndex,
0274 EventPrincipal const&,
0275 ParentContext const&,
0276 ServiceWeakToken const&) = 0;
0277 virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const = 0;
0278
// Per-transition hooks. The run and luminosity-block overloads are selected by
// the transition-info type and are called from the matching CallImpl
// specialization.
0279 virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const&, ModuleCallingContext const*) = 0;
0280 virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0281 virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0282 virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0283 virtual bool implDoBegin(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0284 virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0285 virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0286 virtual bool implDoEnd(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0287 virtual bool implDoBegin(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0288 virtual bool implDoStreamBegin(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0289 virtual bool implDoStreamEnd(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0290 virtual bool implDoEnd(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0291 virtual void implBeginJob() = 0;
0292 virtual void implEndJob() = 0;
0293 virtual void implBeginStream(StreamID) = 0;
0294 virtual void implEndStream(StreamID) = 0;
0295
0296 void resetModuleDescription(ModuleDescription const*);
0297
// Raw, non-owning pointer to the registry held in actReg_ (may be null if none was set).
0298 ActivityRegistry* activityRegistry() { return actReg_.get(); }
0299
0300 private:
0301 template <typename T>
0302 bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
0303
// Product indices the module gets (or may get) for a branch type; the
// vector-reference overloads append to the caller's vector.
// NOTE(review): "append" is inferred from the non-const out-parameter - confirm.
0304 virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0305 virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0306
0307 virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
0308
// EventSetup items and records needed for a transition; needsESPrefetching()
// checks whether esItemsToGetFrom() is empty.
0309 virtual std::vector<ESProxyIndex> const& esItemsToGetFrom(Transition) const = 0;
0310 virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
0311
0312 virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask,
0313 ModuleCallingContext const& moduleCallingContext,
0314 Principal const& iPrincipal) const = 0;
0315
// Targets of the public respondTo* forwarding functions.
0316 virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
0317 virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
0318 virtual void implRespondToCloseOutputFile() = 0;
0319
0320 virtual void implRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) = 0;
0321
// Queue on which the module's work must be serialised. RunModuleTask and
// AcquireTask run the module directly when the returned adaptor is empty.
0322 virtual TaskQueueAdaptor serializeRunModule() = 0;
0323
0324 bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const& parentContext, bool isEvent) const;
0325
0326 template <bool IS_EVENT>
0327 bool setPassed() {
0328 if (IS_EVENT) {
0329 timesPassed_.fetch_add(1, std::memory_order_relaxed);
0330 }
0331 state_ = Pass;
0332 return true;
0333 }
0334
0335 template <bool IS_EVENT>
0336 bool setFailed() {
0337 if (IS_EVENT) {
0338 timesFailed_.fetch_add(1, std::memory_order_relaxed);
0339 }
0340 state_ = Fail;
0341 return false;
0342 }
0343
0344 template <bool IS_EVENT>
0345 std::exception_ptr setException(std::exception_ptr iException) {
0346 if (IS_EVENT) {
0347 timesExcept_.fetch_add(1, std::memory_order_relaxed);
0348 }
0349 cached_exception_ = iException;
0350 state_ = Exception;
0351 return cached_exception_;
0352 }
0353
// Prefetch entry point, templated on the same traits type as doWorkAsync.
0354 template <typename T>
0355 void prefetchAsync(WaitingTaskHolder,
0356 ServiceToken const&,
0357 ParentContext const&,
0358 typename T::TransitionInfoType const&,
0359 Transition);
0360
// esPrefetchAsync works from an EventSetupImpl and is what the CallImpl helpers
// call; edPrefetchAsync works from a Principal.
0361 void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const&, Transition, ServiceToken const&);
0362 void edPrefetchAsync(WaitingTaskHolder, ServiceToken const&, Principal const&) const;
0363
// True only when iTrans is below NumberOfEventSetupTransitions and the module
// has at least one EventSetup item to get for that transition.
0364 bool needsESPrefetching(Transition iTrans) const noexcept {
0365 return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
0366 }
0367
// The three emitters below fire the post-prefetching signal for the event,
// stream and global cases, passing the stream/global context taken from
// moduleCallingContext_. They dereference actReg_ and the context pointer
// without a null check - assumes both are set by the time these run (TODO confirm).
0368 void emitPostModuleEventPrefetchingSignal() {
0369 actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0370 }
0371
0372 void emitPostModuleStreamPrefetchingSignal() {
0373 actReg_->postModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),
0374 moduleCallingContext_);
0375 }
0376
0377 void emitPostModuleGlobalPrefetchingSignal() {
0378 actReg_->postModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(),
0379 moduleCallingContext_);
0380 }
0381
// When true, RunModuleTask does not emit the post-event-prefetching signal
// itself; AcquireTask emits it instead.
0382 virtual bool hasAcquire() const = 0;
0383
// Continuation run by RunModuleTask once prefetching has finished; the first
// argument carries any exception collected so far.
0384 template <typename T>
0385 std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr,
0386 typename T::TransitionInfoType const&,
0387 StreamID,
0388 ParentContext const&,
0389 typename T::Context const*);
0390
0391 void runAcquire(EventTransitionInfo const&, ParentContext const&, WaitingTaskWithArenaHolder&);
0392
// Continuation run by AcquireTask once prefetching has finished.
0393 void runAcquireAfterAsyncPrefetch(std::exception_ptr,
0394 EventTransitionInfo const&,
0395 ParentContext const&,
0396 WaitingTaskWithArenaHolder);
0397
0398 std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const& parentContext);
0399
// Task executed after prefetching: emits the post-prefetching signal and then
// runs the module, either directly or through the queue returned by
// Worker::serializeRunModule().
0400 template <typename T>
0401 class RunModuleTask : public WaitingTask {
0402 public:
// Copies the transition info and parent context; keeps only a weak token to
// the services and non-owning pointers to the worker, context and task group.
0403 RunModuleTask(Worker* worker,
0404 typename T::TransitionInfoType const& transitionInfo,
0405 ServiceToken const& token,
0406 StreamID streamID,
0407 ParentContext const& parentContext,
0408 typename T::Context const* context,
0409 oneapi::tbb::task_group* iGroup)
0410 : m_worker(worker),
0411 m_transitionInfo(transitionInfo),
0412 m_streamID(streamID),
0413 m_parentContext(parentContext),
0414 m_context(context),
0415 m_serviceToken(token),
0416 m_group(iGroup) {}
0417
// Move-only scope guard: on destruction calls resume() on the queue it holds,
// if any. A moved-from guard holds nullptr and does nothing.
0418 struct EnableQueueGuard {
0419 SerialTaskQueue* queue_;
0420 EnableQueueGuard(SerialTaskQueue* iQueue) : queue_{iQueue} {}
0421 EnableQueueGuard(EnableQueueGuard const&) = delete;
0422 EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
0423 EnableQueueGuard& operator=(EnableQueueGuard&&) = delete;
0424 EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
0425 ~EnableQueueGuard() {
0426 if (queue_) {
0427 queue_->resume();
0428 }
0429 }
0430 };
0431
0432 void execute() final {
0433
// Install the services for the duration of this call.
0434 ServiceRegistry::Operate guard(m_serviceToken.lock());
0435
0436
0437
// temp_excptr receives an exception thrown while emitting the signal; excptr
// starts as the exception already attached to this task and keeps the first one seen.
0438 std::exception_ptr temp_excptr;
0439 auto excptr = exceptionPtr();
0440 if constexpr (T::isEvent_) {
// For modules with acquire the event signal is emitted by AcquireTask instead.
0441 if (!m_worker->hasAcquire()) {
0442
0443 CMS_SA_ALLOW try {
0444
0445 m_worker->emitPostModuleEventPrefetchingSignal();
0446 } catch (...) {
0447 temp_excptr = std::current_exception();
0448 if (not excptr) {
0449 excptr = temp_excptr;
0450 }
0451 }
0452 }
// NOTE(review): unlike the event branch, the stream and global emits are not
// wrapped in try/catch, so an exception thrown by a signal handler would
// propagate out of execute() - confirm this is intended.
0453 } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
0454 m_worker->emitPostModuleStreamPrefetchingSignal();
0455 } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
0456 m_worker->emitPostModuleGlobalPrefetchingSignal();
0457 }
0458
// Only go through a queue when nothing has failed so far.
0459 if (not excptr) {
0460 if (auto queue = m_worker->serializeRunModule()) {
// The lambda captures copies of everything it needs, so it does not refer back
// to this task object when it runs.
0461 auto f = [worker = m_worker,
0462 info = m_transitionInfo,
0463 streamID = m_streamID,
0464 parentContext = m_parentContext,
0465 sContext = m_context,
0466 serviceToken = m_serviceToken]() {
0467
// Services must be installed again inside the queued work.
0468 ServiceRegistry::Operate operateRunModule(serviceToken.lock());
0469
0470
0471
0472
// If this transition re-enables a global queue, resume() is called on it when
// the lambda exits, including on an exception.
0473 EnableQueueGuard enableQueueGuard{workerhelper::CallImpl<T>::enableGlobalQueue(worker)};
0474 std::exception_ptr ptr;
0475 worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
0476 };
0477
// If this transition pauses a global queue, the pause itself is pushed onto
// that queue: the pushed task pauses the queue and then pushes f onto the
// module queue.
0478 auto gQueue = workerhelper::CallImpl<T>::pauseGlobalQueue(m_worker);
0479 if (gQueue) {
0480 gQueue->push(*m_group, [queue, gQueue, f, group = m_group]() mutable {
0481 gQueue->pause();
0482 queue.push(*group, std::move(f));
0483 });
0484 } else {
0485 queue.push(*m_group, std::move(f));
0486 }
0487 return;
0488 }
0489 }
0490
// No serialisation needed, or an exception is already pending: continue in this task.
0491 m_worker->runModuleAfterAsyncPrefetch<T>(excptr, m_transitionInfo, m_streamID, m_parentContext, m_context);
0492 }
0493
0494 private:
0495 Worker* m_worker;
0496 typename T::TransitionInfoType m_transitionInfo;
0497 StreamID m_streamID;
0498 ParentContext const m_parentContext;
0499 typename T::Context const* m_context;
0500 ServiceWeakToken m_serviceToken;
0501 oneapi::tbb::task_group* m_group;
0502 };
0503
0504
0505
0506
0507
// Primary template: a task that ignores its constructor arguments and does
// nothing when executed. Only the specialization for
// OccurrenceTraits<EventPrincipal, BranchActionStreamBegin> does real work.
// The DUMMY parameter lets that specialization be written as a partial
// specialization inside the class.
0508 template <typename T, typename DUMMY = void>
0509 class AcquireTask : public WaitingTask {
0510 public:
0511 AcquireTask(Worker*,
0512 typename T::TransitionInfoType const&,
0513 ServiceToken const&,
0514 ParentContext const&,
0515 WaitingTaskWithArenaHolder) {}
0516 void execute() final {}
0517 };
0518
// Event specialization: after prefetching, emits the post-event-prefetching
// signal and runs the acquire step, either directly or through the queue
// returned by Worker::serializeRunModule().
0519 template <typename DUMMY>
0520 class AcquireTask<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>, DUMMY> : public WaitingTask {
0521 public:
// Copies the transition info and parent context, takes ownership of the holder
// by move and keeps only a weak token to the services.
0522 AcquireTask(Worker* worker,
0523 EventTransitionInfo const& eventTransitionInfo,
0524 ServiceToken const& token,
0525 ParentContext const& parentContext,
0526 WaitingTaskWithArenaHolder holder)
0527 : m_worker(worker),
0528 m_eventTransitionInfo(eventTransitionInfo),
0529 m_parentContext(parentContext),
0530 m_holder(std::move(holder)),
0531 m_serviceToken(token) {}
0532
0533 void execute() final {
0534
// Install the services for the duration of this call.
0535 ServiceRegistry::Operate guard(m_serviceToken.lock());
0536
0537
0538
// excptr starts as the exception already attached to this task; an exception
// thrown by the signal is kept only if none was pending.
0539 std::exception_ptr temp_excptr;
0540 auto excptr = exceptionPtr();
0541
0542 CMS_SA_ALLOW try {
0543
0544 m_worker->emitPostModuleEventPrefetchingSignal();
0545 } catch (...) {
0546 temp_excptr = std::current_exception();
0547 if (not excptr) {
0548 excptr = temp_excptr;
0549 }
0550 }
0551
// Only go through the module queue when nothing has failed so far.
0552 if (not excptr) {
0553 if (auto queue = m_worker->serializeRunModule()) {
// The queued lambda holds its own copies, including a copy of the holder.
0554 queue.push(*m_holder.group(),
0555 [worker = m_worker,
0556 info = m_eventTransitionInfo,
0557 parentContext = m_parentContext,
0558 serviceToken = m_serviceToken,
0559 holder = m_holder]() {
0560
// Services must be installed again inside the queued work.
0561 ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
0562
0563 std::exception_ptr ptr;
0564 worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
0565 });
0566 return;
0567 }
0568 }
0569
// No serialisation needed, or an exception is already pending: continue in
// this task and hand the holder over by move.
0570 m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
0571 }
0572
0573 private:
0574 Worker* m_worker;
0575 EventTransitionInfo m_eventTransitionInfo;
0576 ParentContext const m_parentContext;
0577 WaitingTaskWithArenaHolder m_holder;
0578 ServiceWeakToken m_serviceToken;
0579 };
0580
0581
0582
0583
// Task whose constructor and execute() are defined outside this header.
// NOTE(review): judging by the name and by Worker::handleExternalWorkException,
// it presumably processes an exception from external work before the stored
// run-module task continues - confirm in the implementation file.
0584 class HandleExternalWorkExceptionTask : public WaitingTask {
0585 public:
0586 HandleExternalWorkExceptionTask(Worker* worker,
0587 oneapi::tbb::task_group* group,
0588 WaitingTask* runModuleTask,
0589 ParentContext const& parentContext);
0590
0591 void execute() final;
0592
0593 private:
// All pointers are raw and non-owning as far as this declaration shows.
0594 Worker* m_worker;
0595 WaitingTask* m_runModuleTask;
0596 oneapi::tbb::task_group* m_group;
0597 ParentContext const m_parentContext;
0598 };
0599
// Statistics counters; zeroed by clearCounters(). timesPassed_, timesFailed_
// and timesExcept_ are incremented by setPassed/setFailed/setException for
// event transitions only.
0600 std::atomic<int> timesRun_;
0601 std::atomic<int> timesVisited_;
0602 std::atomic<int> timesPassed_;
0603 std::atomic<int> timesFailed_;
0604 std::atomic<int> timesExcept_;
// Current outcome; see enum State.
0605 std::atomic<State> state_;
// Incremented by addedToPath(); reset() copies it into numberOfPathsLeftToRun_.
0606 int numberOfPathsOn_;
0607 std::atomic<int> numberOfPathsLeftToRun_;
0608
// Source of description() and of the contexts passed to the prefetching signals.
0609 ModuleCallingContext moduleCallingContext_;
0610
0611 ExceptionToActionTable const* actions_;
// Written by setException() before state_ is set, cleared by reset(); annotated
// as guarded by state_.
0612 CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_;
0613
0614 std::shared_ptr<ActivityRegistry> actReg_;
0615
0616 edm::propagate_const<EarlyDeleteHelper*> earlyDeleteHelper_;
0617
// Tasks registered via callWhenDoneAsync(); reset by reset().
0618 edm::WaitingTaskList waitingTasks_;
// Cleared by reset().
0619 std::atomic<bool> workStarted_;
// NOTE(review): no default member initializer here - confirm the constructor sets it.
0620 bool ranAcquireWithoutException_;
// Set to false by clearModule(); description() returns nullptr when false.
0621 bool moduleValid_ = true;
0622 };
0623
0624 namespace {
0625 template <typename T>
0626 class ModuleSignalSentry {
0627 public:
0628 ModuleSignalSentry(ActivityRegistry* a,
0629 typename T::Context const* context,
0630 ModuleCallingContext const* moduleCallingContext)
0631 : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
0632 if (a_)
0633 T::preModuleSignal(a_, context, moduleCallingContext_);
0634 }
0635
0636 ~ModuleSignalSentry() {
0637 if (a_)
0638 T::postModuleSignal(a_, context_, moduleCallingContext_);
0639 }
0640
0641 private:
0642 ActivityRegistry* a_;
0643 typename T::Context const* context_;
0644 ModuleCallingContext const* moduleCallingContext_;
0645 };
0646
0647 }
0648
0649 namespace workerhelper {
0650 template <>
0651 class CallImpl<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>> {
0652 public:
0653 typedef OccurrenceTraits<EventPrincipal, BranchActionStreamBegin> Arg;
0654 static bool call(Worker* iWorker,
0655 StreamID,
0656 EventTransitionInfo const& info,
0657 ActivityRegistry* ,
0658 ModuleCallingContext const* mcc,
0659 Arg::Context const* ) {
0660
0661 return iWorker->implDo(info, mcc);
0662 }
0663 static void esPrefetchAsync(Worker* worker,
0664 WaitingTaskHolder waitingTask,
0665 ServiceToken const& token,
0666 EventTransitionInfo const& info,
0667 Transition transition) {
0668 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0669 }
0670 static bool wantsTransition(Worker const* iWorker) { return true; }
0671 static bool needToRunSelection(Worker const* iWorker) { return iWorker->implNeedToRunSelection(); }
0672
0673 static SerialTaskQueue* pauseGlobalQueue(Worker*) { return nullptr; }
0674 static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0675 };
0676
0677 template <>
0678 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>> {
0679 public:
0680 typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Arg;
0681 static bool call(Worker* iWorker,
0682 StreamID,
0683 RunTransitionInfo const& info,
0684 ActivityRegistry* actReg,
0685 ModuleCallingContext const* mcc,
0686 Arg::Context const* context) {
0687 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0688 return iWorker->implDoBegin(info, mcc);
0689 }
0690 static void esPrefetchAsync(Worker* worker,
0691 WaitingTaskHolder waitingTask,
0692 ServiceToken const& token,
0693 RunTransitionInfo const& info,
0694 Transition transition) {
0695 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0696 }
0697 static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
0698 static bool needToRunSelection(Worker const* iWorker) { return false; }
0699 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
0700 static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0701 };
0702 template <>
0703 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>> {
0704 public:
0705 typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Arg;
0706 static bool call(Worker* iWorker,
0707 StreamID id,
0708 RunTransitionInfo const& info,
0709 ActivityRegistry* actReg,
0710 ModuleCallingContext const* mcc,
0711 Arg::Context const* context) {
0712 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0713 return iWorker->implDoStreamBegin(id, info, mcc);
0714 }
0715 static void esPrefetchAsync(Worker* worker,
0716 WaitingTaskHolder waitingTask,
0717 ServiceToken const& token,
0718 RunTransitionInfo const& info,
0719 Transition transition) {
0720 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0721 }
0722 static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
0723 static bool needToRunSelection(Worker const* iWorker) { return false; }
0724 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0725 static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0726 };
0727 template <>
0728 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>> {
0729 public:
0730 typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Arg;
0731 static bool call(Worker* iWorker,
0732 StreamID,
0733 RunTransitionInfo const& info,
0734 ActivityRegistry* actReg,
0735 ModuleCallingContext const* mcc,
0736 Arg::Context const* context) {
0737 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0738 return iWorker->implDoEnd(info, mcc);
0739 }
0740 static void esPrefetchAsync(Worker* worker,
0741 WaitingTaskHolder waitingTask,
0742 ServiceToken const& token,
0743 RunTransitionInfo const& info,
0744 Transition transition) {
0745 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0746 }
0747 static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
0748 static bool needToRunSelection(Worker const* iWorker) { return false; }
0749 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0750 static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
0751 };
0752 template <>
0753 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>> {
0754 public:
0755 typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Arg;
0756 static bool call(Worker* iWorker,
0757 StreamID id,
0758 RunTransitionInfo const& info,
0759 ActivityRegistry* actReg,
0760 ModuleCallingContext const* mcc,
0761 Arg::Context const* context) {
0762 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0763 return iWorker->implDoStreamEnd(id, info, mcc);
0764 }
0765 static void esPrefetchAsync(Worker* worker,
0766 WaitingTaskHolder waitingTask,
0767 ServiceToken const& token,
0768 RunTransitionInfo const& info,
0769 Transition transition) {
0770 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0771 }
0772 static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
0773 static bool needToRunSelection(Worker const* iWorker) { return false; }
0774 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0775 static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0776 };
0777
0778 template <>
0779 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>> {
0780 public:
0781 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0782 static bool call(Worker* iWorker,
0783 StreamID,
0784 LumiTransitionInfo const& info,
0785 ActivityRegistry* actReg,
0786 ModuleCallingContext const* mcc,
0787 Arg::Context const* context) {
0788 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0789 return iWorker->implDoBegin(info, mcc);
0790 }
0791 static void esPrefetchAsync(Worker* worker,
0792 WaitingTaskHolder waitingTask,
0793 ServiceToken const& token,
0794 LumiTransitionInfo const& info,
0795 Transition transition) {
0796 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0797 }
0798 static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
0799 static bool needToRunSelection(Worker const* iWorker) { return false; }
0800 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
0801 static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0802 };
0803 template <>
0804 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>> {
0805 public:
0806 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0807 static bool call(Worker* iWorker,
0808 StreamID id,
0809 LumiTransitionInfo const& info,
0810 ActivityRegistry* actReg,
0811 ModuleCallingContext const* mcc,
0812 Arg::Context const* context) {
0813 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0814 return iWorker->implDoStreamBegin(id, info, mcc);
0815 }
0816 static void esPrefetchAsync(Worker* worker,
0817 WaitingTaskHolder waitingTask,
0818 ServiceToken const& token,
0819 LumiTransitionInfo const& info,
0820 Transition transition) {
0821 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0822 }
0823 static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
0824 static bool needToRunSelection(Worker const* iWorker) { return false; }
0825 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0826 static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0827 };
0828
0829 template <>
0830 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>> {
0831 public:
0832 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0833 static bool call(Worker* iWorker,
0834 StreamID,
0835 LumiTransitionInfo const& info,
0836 ActivityRegistry* actReg,
0837 ModuleCallingContext const* mcc,
0838 Arg::Context const* context) {
0839 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0840 return iWorker->implDoEnd(info, mcc);
0841 }
0842 static void esPrefetchAsync(Worker* worker,
0843 WaitingTaskHolder waitingTask,
0844 ServiceToken const& token,
0845 LumiTransitionInfo const& info,
0846 Transition transition) {
0847 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0848 }
0849 static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
0850 static bool needToRunSelection(Worker const* iWorker) { return false; }
0851 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0852 static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
0853 };
0854 template <>
0855 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>> {
0856 public:
0857 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0858 static bool call(Worker* iWorker,
0859 StreamID id,
0860 LumiTransitionInfo const& info,
0861 ActivityRegistry* actReg,
0862 ModuleCallingContext const* mcc,
0863 Arg::Context const* context) {
0864 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0865 return iWorker->implDoStreamEnd(id, info, mcc);
0866 }
0867 static void esPrefetchAsync(Worker* worker,
0868 WaitingTaskHolder waitingTask,
0869 ServiceToken const& token,
0870 LumiTransitionInfo const& info,
0871 Transition transition) {
0872 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0873 }
0874 static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
0875 static bool needToRunSelection(Worker const* iWorker) { return false; }
0876 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0877 static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0878 };
0879 template <>
0880 class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>> {
0881 public:
0882 using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0883 static bool call(Worker* iWorker,
0884 StreamID,
0885 ProcessBlockTransitionInfo const& info,
0886 ActivityRegistry* actReg,
0887 ModuleCallingContext const* mcc,
0888 Arg::Context const* context) {
0889 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0890 return iWorker->implDoBeginProcessBlock(info.principal(), mcc);
0891 }
0892 static void esPrefetchAsync(
0893 Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) {}
0894 static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
0895 static bool needToRunSelection(Worker const* iWorker) { return false; }
0896 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0897 static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0898 };
0899 template <>
0900 class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>> {
0901 public:
0902 using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
0903 static bool call(Worker* iWorker,
0904 StreamID,
0905 ProcessBlockTransitionInfo const& info,
0906 ActivityRegistry* actReg,
0907 ModuleCallingContext const* mcc,
0908 Arg::Context const* context) {
0909 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0910 return iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
0911 }
0912 static void esPrefetchAsync(
0913 Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) {}
0914 static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsInputProcessBlocks(); }
0915 static bool needToRunSelection(Worker const* iWorker) { return false; }
0916 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0917 static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0918 };
0919 template <>
0920 class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>> {
0921 public:
0922 using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
0923 static bool call(Worker* iWorker,
0924 StreamID,
0925 ProcessBlockTransitionInfo const& info,
0926 ActivityRegistry* actReg,
0927 ModuleCallingContext const* mcc,
0928 Arg::Context const* context) {
0929 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0930 return iWorker->implDoEndProcessBlock(info.principal(), mcc);
0931 }
0932 static void esPrefetchAsync(
0933 Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) {}
0934 static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
0935 static bool needToRunSelection(Worker const* iWorker) { return false; }
0936 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0937 static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0938 };
0939 }
0940
0941 template <typename T>
0942 void Worker::prefetchAsync(WaitingTaskHolder iTask,
0943 ServiceToken const& token,
0944 ParentContext const& parentContext,
0945 typename T::TransitionInfoType const& transitionInfo,
0946 Transition iTransition) {
0947 Principal const& principal = transitionInfo.principal();
0948
0949 moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
0950
0951 if constexpr (T::isEvent_) {
0952 actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0953 } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
0954 actReg_->preModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0955 } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
0956 actReg_->preModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(), moduleCallingContext_);
0957 }
0958
0959 workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
0960 edPrefetchAsync(iTask, token, principal);
0961
0962 if (principal.branchType() == InEvent) {
0963 preActionBeforeRunEventAsync(iTask, moduleCallingContext_, principal);
0964 }
0965 }
0966
0967 template <typename T>
0968 void Worker::doWorkAsync(WaitingTaskHolder task,
0969 typename T::TransitionInfoType const& transitionInfo,
0970 ServiceToken const& token,
0971 StreamID streamID,
0972 ParentContext const& parentContext,
0973 typename T::Context const* context) {
0974 if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
0975 return;
0976 }
0977
0978
0979 bool expected = false;
0980 bool workStarted = workStarted_.compare_exchange_strong(expected, true);
0981
0982 waitingTasks_.add(task);
0983 if constexpr (T::isEvent_) {
0984 timesVisited_.fetch_add(1, std::memory_order_relaxed);
0985 }
0986
0987 if (workStarted) {
0988 moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
0989
0990
0991 if (workerhelper::CallImpl<T>::needToRunSelection(this)) {
0992
0993
0994 WaitingTask* moduleTask =
0995 new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
0996
0997
0998 struct DestroyTask {
0999 DestroyTask(edm::WaitingTask* iTask) : m_task(iTask) {}
1000
1001 ~DestroyTask() {
1002 auto p = m_task.exchange(nullptr);
1003 if (p) {
1004 TaskSentry s{p};
1005 }
1006 }
1007
1008 edm::WaitingTask* release() { return m_task.exchange(nullptr); }
1009
1010 private:
1011 std::atomic<edm::WaitingTask*> m_task;
1012 };
1013 if constexpr (T::isEvent_) {
1014 if (hasAcquire()) {
1015 auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1016 ServiceWeakToken weakToken = token;
1017 auto* group = task.group();
1018 moduleTask = make_waiting_task(
1019 [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
1020 WaitingTaskWithArenaHolder runTaskHolder(
1021 *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
1022 AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1023 t.execute();
1024 });
1025 }
1026 }
1027 auto* group = task.group();
1028 auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1029 ServiceWeakToken weakToken = token;
1030 auto selectionTask =
1031 make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
1032 std::exception_ptr const*) mutable {
1033 ServiceRegistry::Operate guard(weakToken.lock());
1034 prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
1035 weakToken.lock(),
1036 parentContext,
1037 info,
1038 T::transition_);
1039 });
1040 prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1041 } else {
1042 WaitingTask* moduleTask =
1043 new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1044 auto group = task.group();
1045 if constexpr (T::isEvent_) {
1046 if (hasAcquire()) {
1047 WaitingTaskWithArenaHolder runTaskHolder(
1048 *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1049 moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1050 }
1051 }
1052 prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1053 }
1054 }
1055 }
1056
1057 template <typename T>
1058 std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr iEPtr,
1059 typename T::TransitionInfoType const& transitionInfo,
1060 StreamID streamID,
1061 ParentContext const& parentContext,
1062 typename T::Context const* context) {
1063 std::exception_ptr exceptionPtr;
1064 if (iEPtr) {
1065 if (shouldRethrowException(iEPtr, parentContext, T::isEvent_)) {
1066 exceptionPtr = iEPtr;
1067 setException<T::isEvent_>(exceptionPtr);
1068 } else {
1069 setPassed<T::isEvent_>();
1070 }
1071 moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
1072 } else {
1073
1074 CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1075 exceptionPtr = std::current_exception();
1076 }
1077 }
1078 waitingTasks_.doneWaiting(exceptionPtr);
1079 return exceptionPtr;
1080 }
1081
1082 template <typename T>
1083 void Worker::doWorkNoPrefetchingAsync(WaitingTaskHolder task,
1084 typename T::TransitionInfoType const& transitionInfo,
1085 ServiceToken const& serviceToken,
1086 StreamID streamID,
1087 ParentContext const& parentContext,
1088 typename T::Context const* context) {
1089 if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1090 return;
1091 }
1092
1093
1094 bool expected = false;
1095 auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1096
1097 waitingTasks_.add(task);
1098 if (workStarted) {
1099 ServiceWeakToken weakToken = serviceToken;
1100 auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1101 std::exception_ptr exceptionPtr;
1102
1103 CMS_SA_ALLOW try {
1104
1105 ServiceRegistry::Operate guard(weakToken.lock());
1106
1107 this->runModule<T>(info, streamID, parentContext, context);
1108 } catch (...) {
1109 exceptionPtr = std::current_exception();
1110 }
1111 this->waitingTasks_.doneWaiting(exceptionPtr);
1112 };
1113
1114 if (needsESPrefetching(T::transition_)) {
1115 auto group = task.group();
1116 auto afterPrefetch =
1117 edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1118 if (iExcept) {
1119 this->waitingTasks_.doneWaiting(*iExcept);
1120 } else {
1121 if (auto queue = this->serializeRunModule()) {
1122 queue.push(*group, toDo);
1123 } else {
1124 group->run(toDo);
1125 }
1126 }
1127 });
1128 esPrefetchAsync(
1129 WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1130 } else {
1131 auto group = task.group();
1132 if (auto queue = this->serializeRunModule()) {
1133 queue.push(*group, toDo);
1134 } else {
1135 group->run(toDo);
1136 }
1137 }
1138 }
1139 }
1140
1141 template <typename T>
1142 bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1143 StreamID streamID,
1144 ParentContext const& parentContext,
1145 typename T::Context const* context) {
1146
1147
1148
1149
1150 ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1151 if constexpr (T::isEvent_) {
1152 timesRun_.fetch_add(1, std::memory_order_relaxed);
1153 }
1154
1155 bool rc = true;
1156 try {
1157 convertException::wrap([&]() {
1158 rc = workerhelper::CallImpl<T>::call(
1159 this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1160
1161 if (rc) {
1162 setPassed<T::isEvent_>();
1163 } else {
1164 setFailed<T::isEvent_>();
1165 }
1166 });
1167 } catch (cms::Exception& ex) {
1168 edm::exceptionContext(ex, moduleCallingContext_);
1169 if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_)) {
1170 assert(not cached_exception_);
1171 setException<T::isEvent_>(std::current_exception());
1172 std::rethrow_exception(cached_exception_);
1173 } else {
1174 rc = setPassed<T::isEvent_>();
1175 }
1176 }
1177
1178 return rc;
1179 }
1180
1181 template <typename T>
1182 std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1183 StreamID streamID,
1184 ParentContext const& parentContext,
1185 typename T::Context const* context) {
1186 timesVisited_.fetch_add(1, std::memory_order_relaxed);
1187 std::exception_ptr prefetchingException;
1188 return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1189 }
1190 }
1191 #endif