File indexing completed on 2025-06-20 01:53:17
0001 #ifndef FWCore_Framework_Worker_h
0002 #define FWCore_Framework_Worker_h
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0025 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0026 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0027 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0028 #include "FWCore/Framework/interface/maker/WorkerParams.h"
0029 #include "FWCore/Framework/interface/ExceptionActions.h"
0030 #include "FWCore/Framework/interface/ModuleContextSentry.h"
0031 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0032 #include "FWCore/Framework/interface/ProductResolverIndexAndSkipBit.h"
0033 #include "FWCore/Framework/interface/ModuleConsumesMinimalESInfo.h"
0034 #include "FWCore/Concurrency/interface/WaitingTask.h"
0035 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0036 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0037 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0038 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0039 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0040 #include "FWCore/ServiceRegistry/interface/InternalContext.h"
0041 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0042 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0043 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0044 #include "FWCore/ServiceRegistry/interface/PlaceInPathContext.h"
0045 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0046 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0047 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0048 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0049 #include "FWCore/Concurrency/interface/FunctorTask.h"
0050 #include "FWCore/Utilities/interface/Exception.h"
0051 #include "FWCore/Utilities/interface/ConvertException.h"
0052 #include "FWCore/Utilities/interface/BranchType.h"
0053 #include "FWCore/Utilities/interface/ProductResolverIndex.h"
0054 #include "FWCore/Utilities/interface/StreamID.h"
0055 #include "FWCore/Utilities/interface/propagate_const.h"
0056 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0057 #include "FWCore/Utilities/interface/ESIndices.h"
0058 #include "FWCore/Utilities/interface/Transition.h"
0059
0060 #include "FWCore/Framework/interface/Frameworkfwd.h"
0061
0062 #include <array>
0063 #include <atomic>
0064 #include <cassert>
0065 #include <map>
0066 #include <memory>
0067 #include <sstream>
0068 #include <string>
0069 #include <vector>
0070 #include <exception>
0071 #include <unordered_map>
0072
0073 namespace edm {
0074 class EventPrincipal;
0075 class EventSetupImpl;
0076 class EarlyDeleteHelper;
0077 class ProductResolverIndexHelper;
0078 class ProductResolverIndexAndSkipBit;
0079 class ProductRegistry;
0080 class ThinnedAssociationsHelper;
0081
0082 namespace workerhelper {
0083 template <typename O>
0084 class CallImpl;
0085 }
0086 namespace eventsetup {
0087 struct ComponentDescription;
0088 class ESRecordsToProductResolverIndices;
0089 }
0090
0091 class Worker {
0092 public:
0093 enum State { Ready, Pass, Fail, Exception };
0094 enum Types { kAnalyzer, kFilter, kProducer, kOutputModule };
0095 enum ConcurrencyTypes { kGlobal, kLimited, kOne, kStream };
0096 struct TaskQueueAdaptor {
0097 SerialTaskQueueChain* serial_ = nullptr;
0098 LimitedTaskQueue* limited_ = nullptr;
0099
0100 TaskQueueAdaptor() = default;
0101 TaskQueueAdaptor(SerialTaskQueueChain* iChain) : serial_(iChain) {}
0102 TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
0103
0104 operator bool() { return serial_ != nullptr or limited_ != nullptr; }
0105
0106 template <class F>
0107 void push(oneapi::tbb::task_group& iG, F&& iF) {
0108 if (serial_) {
0109 serial_->push(iG, iF);
0110 } else {
0111 limited_->push(iG, iF);
0112 }
0113 }
0114 };
0115
0116 Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
0117 virtual ~Worker();
0118
0119 Worker(Worker const&) = delete;
0120 Worker& operator=(Worker const&) = delete;
0121
0122 void clearModule() {
0123 moduleValid_ = false;
0124 doClearModule();
0125 }
0126
0127 virtual bool wantsProcessBlocks() const noexcept = 0;
0128 virtual bool wantsInputProcessBlocks() const noexcept = 0;
0129 virtual bool wantsGlobalRuns() const noexcept = 0;
0130 virtual bool wantsGlobalLuminosityBlocks() const noexcept = 0;
0131 virtual bool wantsStreamRuns() const noexcept = 0;
0132 virtual bool wantsStreamLuminosityBlocks() const noexcept = 0;
0133
0134
0135 virtual SerialTaskQueue* globalRunsQueue() = 0;
0136
0137 virtual SerialTaskQueue* globalLuminosityBlocksQueue() = 0;
0138
0139
0140 void prePrefetchSelectionAsync(oneapi::tbb::task_group&,
0141 WaitingTask* task,
0142 ServiceToken const&,
0143 StreamID stream,
0144 EventPrincipal const*) noexcept;
0145
0146 void prePrefetchSelectionAsync(
0147 oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, void const*) noexcept {
0148 assert(false);
0149 }
0150
0151 template <typename T>
0152 void doWorkAsync(WaitingTaskHolder,
0153 typename T::TransitionInfoType const&,
0154 ServiceToken const&,
0155 StreamID,
0156 ParentContext const&,
0157 typename T::Context const*) noexcept;
0158
0159 template <typename T>
0160 void doWorkNoPrefetchingAsync(WaitingTaskHolder,
0161 typename T::TransitionInfoType const&,
0162 ServiceToken const&,
0163 StreamID,
0164 ParentContext const&,
0165 typename T::Context const*) noexcept;
0166
0167 template <typename T>
0168 std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
0169 StreamID,
0170 ParentContext const&,
0171 typename T::Context const*) noexcept;
0172
0173 virtual size_t transformIndex(edm::ProductDescription const&) const noexcept = 0;
0174 void doTransformAsync(WaitingTaskHolder,
0175 size_t iTransformIndex,
0176 EventPrincipal const&,
0177 ServiceToken const&,
0178 StreamID,
0179 ModuleCallingContext const&,
0180 StreamContext const*) noexcept;
0181
0182 void callWhenDoneAsync(WaitingTaskHolder task) { waitingTasks_.add(std::move(task)); }
0183
0184 void skipOnPath(EventPrincipal const& iEvent);
0185 void beginJob(GlobalContext const&);
0186 void endJob(GlobalContext const&);
0187 void beginStream(StreamID, StreamContext const&);
0188 void endStream(StreamID, StreamContext const&);
0189 void respondToOpenInputFile(FileBlock const& fb) { implRespondToOpenInputFile(fb); }
0190 void respondToCloseInputFile(FileBlock const& fb) { implRespondToCloseInputFile(fb); }
0191 void respondToCloseOutputFile() { implRespondToCloseOutputFile(); }
0192
0193 void reset() {
0194 cached_exception_ = std::exception_ptr();
0195 state_ = Ready;
0196 waitingTasks_.reset();
0197 workStarted_ = false;
0198 numberOfPathsLeftToRun_ = numberOfPathsOn_;
0199 }
0200
0201 void postDoEvent(EventPrincipal const&);
0202
0203 ModuleDescription const* description() const noexcept {
0204 if (moduleValid_) {
0205 return moduleCallingContext_.moduleDescription();
0206 }
0207 return nullptr;
0208 }
0209
0210
0211 void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
0212
0213 void setEarlyDeleteHelper(EarlyDeleteHelper* iHelper);
0214
0215 virtual std::vector<ModuleConsumesInfo> moduleConsumesInfos() const = 0;
0216 virtual std::vector<ModuleConsumesMinimalESInfo> moduleConsumesMinimalESInfos() const = 0;
0217
0218 virtual Types moduleType() const = 0;
0219 virtual ConcurrencyTypes moduleConcurrencyType() const = 0;
0220
0221 void clearCounters() noexcept {
0222 timesRun_.store(0, std::memory_order_release);
0223 timesVisited_.store(0, std::memory_order_release);
0224 timesPassed_.store(0, std::memory_order_release);
0225 timesFailed_.store(0, std::memory_order_release);
0226 timesExcept_.store(0, std::memory_order_release);
0227 }
0228
0229 void addedToPath() noexcept { ++numberOfPathsOn_; }
0230
0231 int timesRun() const noexcept { return timesRun_.load(std::memory_order_acquire); }
0232 int timesVisited() const noexcept { return timesVisited_.load(std::memory_order_acquire); }
0233 int timesPassed() const noexcept { return timesPassed_.load(std::memory_order_acquire); }
0234 int timesFailed() const noexcept { return timesFailed_.load(std::memory_order_acquire); }
0235 int timesExcept() const noexcept { return timesExcept_.load(std::memory_order_acquire); }
0236 State state() const noexcept { return state_; }
0237
0238 int timesPass() const noexcept { return timesPassed(); }
0239
0240 virtual bool hasAccumulator() const noexcept = 0;
0241
0242 virtual bool matchesBaseClassPointer(void const* iPtr) const noexcept = 0;
0243
0244 edm::WaitingTaskList& waitingTaskList() noexcept { return waitingTasks_; }
0245
0246 protected:
0247 template <typename O>
0248 friend class workerhelper::CallImpl;
0249
0250 virtual void doClearModule() = 0;
0251
0252 virtual std::string workerType() const = 0;
0253 virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;
0254
0255 virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0256 virtual bool implNeedToRunSelection() const noexcept = 0;
0257
0258 virtual void implDoAcquire(EventTransitionInfo const&, ModuleCallingContext const*, WaitingTaskHolder&&) = 0;
0259
0260 virtual void implDoTransformAsync(WaitingTaskHolder,
0261 size_t iTransformIndex,
0262 EventPrincipal const&,
0263 ParentContext const&,
0264 ServiceWeakToken const&) noexcept = 0;
0265 virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const noexcept = 0;
0266
0267 virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const&, ModuleCallingContext const*) = 0;
0268 virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0269 virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0270 virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0271 virtual bool implDoBegin(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0272 virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0273 virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0274 virtual bool implDoEnd(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0275 virtual bool implDoBegin(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0276 virtual bool implDoStreamBegin(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0277 virtual bool implDoStreamEnd(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0278 virtual bool implDoEnd(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0279 virtual void implBeginJob() = 0;
0280 virtual void implEndJob() = 0;
0281 virtual void implBeginStream(StreamID) = 0;
0282 virtual void implEndStream(StreamID) = 0;
0283
0284 void resetModuleDescription(ModuleDescription const*);
0285
0286 ActivityRegistry* activityRegistry() { return actReg_.get(); }
0287
0288 private:
0289 template <typename T>
0290 bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
0291
0292 virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0293 virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0294
0295 virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
0296
0297 virtual std::vector<ESResolverIndex> const& esItemsToGetFrom(Transition) const = 0;
0298 virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
0299
0300 virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask,
0301 ModuleCallingContext const& moduleCallingContext,
0302 Principal const& iPrincipal) const noexcept = 0;
0303
0304 virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
0305 virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
0306 virtual void implRespondToCloseOutputFile() = 0;
0307
0308 virtual TaskQueueAdaptor serializeRunModule() = 0;
0309
0310 bool shouldRethrowException(std::exception_ptr iPtr,
0311 ParentContext const& parentContext,
0312 bool isEvent,
0313 bool isTryToContinue) const noexcept;
0314 void checkForShouldTryToContinue(ModuleDescription const&);
0315
0316 template <bool IS_EVENT>
0317 bool setPassed() {
0318 if (IS_EVENT) {
0319 timesPassed_.fetch_add(1, std::memory_order_relaxed);
0320 }
0321 state_ = Pass;
0322 return true;
0323 }
0324
0325 template <bool IS_EVENT>
0326 bool setFailed() {
0327 if (IS_EVENT) {
0328 timesFailed_.fetch_add(1, std::memory_order_relaxed);
0329 }
0330 state_ = Fail;
0331 return false;
0332 }
0333
0334 template <bool IS_EVENT>
0335 std::exception_ptr setException(std::exception_ptr iException) {
0336 if (IS_EVENT) {
0337 timesExcept_.fetch_add(1, std::memory_order_relaxed);
0338 }
0339 cached_exception_ = iException;
0340 state_ = Exception;
0341 return cached_exception_;
0342 }
0343
0344 template <typename T>
0345 void prefetchAsync(WaitingTaskHolder,
0346 ServiceToken const&,
0347 ParentContext const&,
0348 typename T::TransitionInfoType const&,
0349 Transition) noexcept;
0350
0351 void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const&, Transition, ServiceToken const&) noexcept;
0352 void edPrefetchAsync(WaitingTaskHolder, ServiceToken const&, Principal const&) const noexcept;
0353
0354 bool needsESPrefetching(Transition iTrans) const noexcept {
0355 return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
0356 }
0357
0358 void emitPostModuleEventPrefetchingSignal() {
0359 actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0360 }
0361
0362 void emitPostModuleStreamPrefetchingSignal() {
0363 actReg_->postModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),
0364 moduleCallingContext_);
0365 }
0366
0367 void emitPostModuleGlobalPrefetchingSignal() {
0368 actReg_->postModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(),
0369 moduleCallingContext_);
0370 }
0371
0372 virtual bool hasAcquire() const noexcept = 0;
0373
0374 template <typename T>
0375 std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr,
0376 typename T::TransitionInfoType const&,
0377 StreamID,
0378 ParentContext const&,
0379 typename T::Context const*) noexcept;
0380
0381
0382
0383 void runAcquire(EventTransitionInfo const&, ParentContext const&, WaitingTaskHolder);
0384
0385 void runAcquireAfterAsyncPrefetch(std::exception_ptr,
0386 EventTransitionInfo const&,
0387 ParentContext const&,
0388 WaitingTaskHolder) noexcept;
0389
0390 std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr,
0391 ParentContext const& parentContext) noexcept;
0392
0393 template <typename T>
0394 class RunModuleTask : public WaitingTask {
0395 public:
0396 RunModuleTask(Worker* worker,
0397 typename T::TransitionInfoType const& transitionInfo,
0398 ServiceToken const& token,
0399 StreamID streamID,
0400 ParentContext const& parentContext,
0401 typename T::Context const* context,
0402 oneapi::tbb::task_group* iGroup) noexcept
0403 : m_worker(worker),
0404 m_transitionInfo(transitionInfo),
0405 m_streamID(streamID),
0406 m_parentContext(parentContext),
0407 m_context(context),
0408 m_serviceToken(token),
0409 m_group(iGroup) {}
0410
0411 struct EnableQueueGuard {
0412 SerialTaskQueue* queue_;
0413 EnableQueueGuard(SerialTaskQueue* iQueue) : queue_{iQueue} {}
0414 EnableQueueGuard(EnableQueueGuard const&) = delete;
0415 EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
0416 EnableQueueGuard& operator=(EnableQueueGuard&&) = delete;
0417 EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
0418 ~EnableQueueGuard() {
0419 if (queue_) {
0420 queue_->resume();
0421 }
0422 }
0423 };
0424
0425 void execute() final {
0426
0427 ServiceRegistry::Operate guard(m_serviceToken.lock());
0428
0429
0430
0431 std::exception_ptr temp_excptr;
0432 auto excptr = exceptionPtr();
0433 if constexpr (T::isEvent_) {
0434 if (!m_worker->hasAcquire()) {
0435
0436 CMS_SA_ALLOW try {
0437
0438 m_worker->emitPostModuleEventPrefetchingSignal();
0439 } catch (...) {
0440 temp_excptr = std::current_exception();
0441 if (not excptr) {
0442 excptr = temp_excptr;
0443 }
0444 }
0445 }
0446 } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
0447 m_worker->emitPostModuleStreamPrefetchingSignal();
0448 } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
0449 m_worker->emitPostModuleGlobalPrefetchingSignal();
0450 }
0451
0452 if (not excptr) {
0453 if (auto queue = m_worker->serializeRunModule()) {
0454 auto f = [worker = m_worker,
0455 info = m_transitionInfo,
0456 streamID = m_streamID,
0457 parentContext = m_parentContext,
0458 sContext = m_context,
0459 serviceToken = m_serviceToken]() {
0460
0461 ServiceRegistry::Operate operateRunModule(serviceToken.lock());
0462
0463
0464
0465
0466 EnableQueueGuard enableQueueGuard{workerhelper::CallImpl<T>::enableGlobalQueue(worker)};
0467 std::exception_ptr ptr;
0468 worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
0469 };
0470
0471 auto gQueue = workerhelper::CallImpl<T>::pauseGlobalQueue(m_worker);
0472 if (gQueue) {
0473 gQueue->push(*m_group, [queue, gQueue, f, group = m_group]() mutable {
0474 gQueue->pause();
0475 queue.push(*group, std::move(f));
0476 });
0477 } else {
0478 queue.push(*m_group, std::move(f));
0479 }
0480 return;
0481 }
0482 }
0483
0484 m_worker->runModuleAfterAsyncPrefetch<T>(excptr, m_transitionInfo, m_streamID, m_parentContext, m_context);
0485 }
0486
0487 private:
0488 Worker* m_worker;
0489 typename T::TransitionInfoType m_transitionInfo;
0490 StreamID m_streamID;
0491 ParentContext const m_parentContext;
0492 typename T::Context const* m_context;
0493 ServiceWeakToken m_serviceToken;
0494 oneapi::tbb::task_group* m_group;
0495 };
0496
0497
0498
0499
0500
0501 template <typename T, typename DUMMY = void>
0502 class AcquireTask : public WaitingTask {
0503 public:
0504 AcquireTask(Worker*,
0505 typename T::TransitionInfoType const&,
0506 ServiceToken const&,
0507 ParentContext const&,
0508 WaitingTaskHolder) noexcept {}
0509 void execute() final {}
0510 };
0511
0512 template <typename DUMMY>
0513 class AcquireTask<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>, DUMMY> : public WaitingTask {
0514 public:
0515 AcquireTask(Worker* worker,
0516 EventTransitionInfo const& eventTransitionInfo,
0517 ServiceToken const& token,
0518 ParentContext const& parentContext,
0519 WaitingTaskHolder holder) noexcept
0520 : m_worker(worker),
0521 m_eventTransitionInfo(eventTransitionInfo),
0522 m_parentContext(parentContext),
0523 m_holder(std::move(holder)),
0524 m_serviceToken(token) {}
0525
0526 void execute() final {
0527
0528 ServiceRegistry::Operate guard(m_serviceToken.lock());
0529
0530
0531
0532 std::exception_ptr temp_excptr;
0533 auto excptr = exceptionPtr();
0534
0535 CMS_SA_ALLOW try {
0536
0537 m_worker->emitPostModuleEventPrefetchingSignal();
0538 } catch (...) {
0539 temp_excptr = std::current_exception();
0540 if (not excptr) {
0541 excptr = temp_excptr;
0542 }
0543 }
0544
0545 if (not excptr) {
0546 if (auto queue = m_worker->serializeRunModule()) {
0547 queue.push(*m_holder.group(),
0548 [worker = m_worker,
0549 info = m_eventTransitionInfo,
0550 parentContext = m_parentContext,
0551 serviceToken = m_serviceToken,
0552 holder = std::move(m_holder)]() mutable {
0553
0554 ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
0555
0556 std::exception_ptr ptr;
0557 worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, std::move(holder));
0558 });
0559 return;
0560 }
0561 }
0562
0563 m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
0564 }
0565
0566 private:
0567 Worker* m_worker;
0568 EventTransitionInfo m_eventTransitionInfo;
0569 ParentContext const m_parentContext;
0570 WaitingTaskHolder m_holder;
0571 ServiceWeakToken m_serviceToken;
0572 };
0573
0574
0575
0576
0577 class HandleExternalWorkExceptionTask : public WaitingTask {
0578 public:
0579 HandleExternalWorkExceptionTask(Worker* worker,
0580 oneapi::tbb::task_group* group,
0581 WaitingTask* runModuleTask,
0582 ParentContext const& parentContext) noexcept;
0583
0584 void execute() final;
0585
0586 private:
0587 Worker* m_worker;
0588 WaitingTask* m_runModuleTask;
0589 oneapi::tbb::task_group* m_group;
0590 ParentContext const m_parentContext;
0591 };
0592
0593 std::atomic<int> timesRun_;
0594 std::atomic<int> timesVisited_;
0595 std::atomic<int> timesPassed_;
0596 std::atomic<int> timesFailed_;
0597 std::atomic<int> timesExcept_;
0598 std::atomic<State> state_;
0599 int numberOfPathsOn_;
0600 std::atomic<int> numberOfPathsLeftToRun_;
0601
0602 ModuleCallingContext moduleCallingContext_;
0603
0604 ExceptionToActionTable const* actions_;
0605 CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_;
0606
0607 std::shared_ptr<ActivityRegistry> actReg_;
0608
0609 edm::propagate_const<EarlyDeleteHelper*> earlyDeleteHelper_;
0610
0611 edm::WaitingTaskList waitingTasks_;
0612 std::atomic<bool> workStarted_;
0613 bool ranAcquireWithoutException_;
0614 bool moduleValid_ = true;
0615 bool shouldTryToContinue_ = false;
0616 bool beginSucceeded_ = false;
0617 };
0618
0619 namespace {
0620 template <typename T>
0621 class ModuleSignalSentry {
0622 public:
0623 ModuleSignalSentry(ActivityRegistry* a,
0624 typename T::Context const* context,
0625 ModuleCallingContext const* moduleCallingContext)
0626 : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {}
0627
0628 ~ModuleSignalSentry() {
0629
0630
0631
0632
0633 CMS_SA_ALLOW try {
0634 if (a_) {
0635 T::postModuleSignal(a_, context_, moduleCallingContext_);
0636 }
0637 } catch (...) {
0638 }
0639 }
0640 void preModuleSignal() {
0641 if (a_) {
0642 try {
0643 convertException::wrap([this]() { T::preModuleSignal(a_, context_, moduleCallingContext_); });
0644 } catch (cms::Exception& ex) {
0645 ex.addContext("Handling pre module signal, likely in a service function immediately before module method");
0646 throw;
0647 }
0648 }
0649 }
0650 void postModuleSignal() {
0651 if (a_) {
0652 auto temp = a_;
0653
0654
0655 a_ = nullptr;
0656 try {
0657 convertException::wrap([this, temp]() { T::postModuleSignal(temp, context_, moduleCallingContext_); });
0658 } catch (cms::Exception& ex) {
0659 ex.addContext("Handling post module signal, likely in a service function immediately after module method");
0660 throw;
0661 }
0662 }
0663 }
0664
0665 private:
0666 ActivityRegistry* a_;
0667 typename T::Context const* context_;
0668 ModuleCallingContext const* moduleCallingContext_;
0669 };
0670
0671 }
0672
0673 namespace workerhelper {
0674 template <>
0675 class CallImpl<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>> {
0676 public:
0677 typedef OccurrenceTraits<EventPrincipal, BranchActionStreamBegin> Arg;
0678 static bool call(Worker* iWorker,
0679 StreamID,
0680 EventTransitionInfo const& info,
0681 ActivityRegistry* ,
0682 ModuleCallingContext const* mcc,
0683 Arg::Context const* ) {
0684
0685 return iWorker->implDo(info, mcc);
0686 }
0687 static void esPrefetchAsync(Worker* worker,
0688 WaitingTaskHolder waitingTask,
0689 ServiceToken const& token,
0690 EventTransitionInfo const& info,
0691 Transition transition) noexcept {
0692 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0693 }
0694 static bool wantsTransition(Worker const* iWorker) noexcept { return true; }
0695 static bool needToRunSelection(Worker const* iWorker) noexcept { return iWorker->implNeedToRunSelection(); }
0696
0697 static SerialTaskQueue* pauseGlobalQueue(Worker*) noexcept { return nullptr; }
0698 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0699 };
0700
0701 template <>
0702 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>> {
0703 public:
0704 typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Arg;
0705 static bool call(Worker* iWorker,
0706 StreamID,
0707 RunTransitionInfo const& info,
0708 ActivityRegistry* actReg,
0709 ModuleCallingContext const* mcc,
0710 Arg::Context const* context) {
0711 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0712
0713
0714 cpp.preModuleSignal();
0715
0716
0717 auto returnValue = iWorker->implDoBegin(info, mcc);
0718
0719 cpp.postModuleSignal();
0720 iWorker->beginSucceeded_ = true;
0721 return returnValue;
0722 }
0723 static void esPrefetchAsync(Worker* worker,
0724 WaitingTaskHolder waitingTask,
0725 ServiceToken const& token,
0726 RunTransitionInfo const& info,
0727 Transition transition) noexcept {
0728 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0729 }
0730 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalRuns(); }
0731 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0732 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return iWorker->globalRunsQueue(); }
0733 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0734 };
0735 template <>
0736 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>> {
0737 public:
0738 typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Arg;
0739 static bool call(Worker* iWorker,
0740 StreamID id,
0741 RunTransitionInfo const& info,
0742 ActivityRegistry* actReg,
0743 ModuleCallingContext const* mcc,
0744 Arg::Context const* context) {
0745 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0746 cpp.preModuleSignal();
0747 auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
0748 cpp.postModuleSignal();
0749 iWorker->beginSucceeded_ = true;
0750 return returnValue;
0751 }
0752 static void esPrefetchAsync(Worker* worker,
0753 WaitingTaskHolder waitingTask,
0754 ServiceToken const& token,
0755 RunTransitionInfo const& info,
0756 Transition transition) noexcept {
0757 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0758 }
0759 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamRuns(); }
0760 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0761 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0762 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0763 };
0764 template <>
0765 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>> {
0766 public:
0767 typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Arg;
0768 static bool call(Worker* iWorker,
0769 StreamID,
0770 RunTransitionInfo const& info,
0771 ActivityRegistry* actReg,
0772 ModuleCallingContext const* mcc,
0773 Arg::Context const* context) {
0774 if (iWorker->beginSucceeded_) {
0775 iWorker->beginSucceeded_ = false;
0776
0777 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0778 cpp.preModuleSignal();
0779 auto returnValue = iWorker->implDoEnd(info, mcc);
0780 cpp.postModuleSignal();
0781 return returnValue;
0782 }
0783 return true;
0784 }
0785 static void esPrefetchAsync(Worker* worker,
0786 WaitingTaskHolder waitingTask,
0787 ServiceToken const& token,
0788 RunTransitionInfo const& info,
0789 Transition transition) noexcept {
0790 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0791 }
0792 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalRuns(); }
0793 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0794 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0795 static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) noexcept { return iWorker->globalRunsQueue(); }
0796 };
0797 template <>
0798 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>> {
0799 public:
0800 typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Arg;
0801 static bool call(Worker* iWorker,
0802 StreamID id,
0803 RunTransitionInfo const& info,
0804 ActivityRegistry* actReg,
0805 ModuleCallingContext const* mcc,
0806 Arg::Context const* context) {
0807 if (iWorker->beginSucceeded_) {
0808 iWorker->beginSucceeded_ = false;
0809
0810 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0811 cpp.preModuleSignal();
0812 auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
0813 cpp.postModuleSignal();
0814 return returnValue;
0815 }
0816 return true;
0817 }
0818 static void esPrefetchAsync(Worker* worker,
0819 WaitingTaskHolder waitingTask,
0820 ServiceToken const& token,
0821 RunTransitionInfo const& info,
0822 Transition transition) noexcept {
0823 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0824 }
0825 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamRuns(); }
0826 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0827 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0828 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0829 };
0830
0831 template <>
0832 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>> {
0833 public:
0834 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0835 static bool call(Worker* iWorker,
0836 StreamID,
0837 LumiTransitionInfo const& info,
0838 ActivityRegistry* actReg,
0839 ModuleCallingContext const* mcc,
0840 Arg::Context const* context) {
0841 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0842 cpp.preModuleSignal();
0843 auto returnValue = iWorker->implDoBegin(info, mcc);
0844 cpp.postModuleSignal();
0845 iWorker->beginSucceeded_ = true;
0846 return returnValue;
0847 }
0848 static void esPrefetchAsync(Worker* worker,
0849 WaitingTaskHolder waitingTask,
0850 ServiceToken const& token,
0851 LumiTransitionInfo const& info,
0852 Transition transition) noexcept {
0853 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0854 }
0855 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalLuminosityBlocks(); }
0856 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0857 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept {
0858 return iWorker->globalLuminosityBlocksQueue();
0859 }
0860 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0861 };
0862 template <>
0863 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>> {
0864 public:
0865 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0866 static bool call(Worker* iWorker,
0867 StreamID id,
0868 LumiTransitionInfo const& info,
0869 ActivityRegistry* actReg,
0870 ModuleCallingContext const* mcc,
0871 Arg::Context const* context) {
0872 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0873 cpp.preModuleSignal();
0874 auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
0875 cpp.postModuleSignal();
0876 iWorker->beginSucceeded_ = true;
0877 return returnValue;
0878 }
0879 static void esPrefetchAsync(Worker* worker,
0880 WaitingTaskHolder waitingTask,
0881 ServiceToken const& token,
0882 LumiTransitionInfo const& info,
0883 Transition transition) noexcept {
0884 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0885 }
0886 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamLuminosityBlocks(); }
0887 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0888 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0889 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0890 };
0891
0892 template <>
0893 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>> {
0894 public:
0895 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0896 static bool call(Worker* iWorker,
0897 StreamID,
0898 LumiTransitionInfo const& info,
0899 ActivityRegistry* actReg,
0900 ModuleCallingContext const* mcc,
0901 Arg::Context const* context) {
0902 if (iWorker->beginSucceeded_) {
0903 iWorker->beginSucceeded_ = false;
0904
0905 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0906 cpp.preModuleSignal();
0907 auto returnValue = iWorker->implDoEnd(info, mcc);
0908 cpp.postModuleSignal();
0909 return returnValue;
0910 }
0911 return true;
0912 }
0913 static void esPrefetchAsync(Worker* worker,
0914 WaitingTaskHolder waitingTask,
0915 ServiceToken const& token,
0916 LumiTransitionInfo const& info,
0917 Transition transition) noexcept {
0918 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0919 }
0920 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalLuminosityBlocks(); }
0921 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0922 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0923 static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) noexcept {
0924 return iWorker->globalLuminosityBlocksQueue();
0925 }
0926 };
0927 template <>
0928 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>> {
0929 public:
0930 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0931 static bool call(Worker* iWorker,
0932 StreamID id,
0933 LumiTransitionInfo const& info,
0934 ActivityRegistry* actReg,
0935 ModuleCallingContext const* mcc,
0936 Arg::Context const* context) {
0937 if (iWorker->beginSucceeded_) {
0938 iWorker->beginSucceeded_ = false;
0939
0940 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0941 cpp.preModuleSignal();
0942 auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
0943 cpp.postModuleSignal();
0944 return returnValue;
0945 }
0946 return true;
0947 }
0948 static void esPrefetchAsync(Worker* worker,
0949 WaitingTaskHolder waitingTask,
0950 ServiceToken const& token,
0951 LumiTransitionInfo const& info,
0952 Transition transition) noexcept {
0953 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0954 }
0955 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamLuminosityBlocks(); }
0956 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0957 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0958 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0959 };
0960 template <>
0961 class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>> {
0962 public:
0963 using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0964 static bool call(Worker* iWorker,
0965 StreamID,
0966 ProcessBlockTransitionInfo const& info,
0967 ActivityRegistry* actReg,
0968 ModuleCallingContext const* mcc,
0969 Arg::Context const* context) {
0970 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0971 cpp.preModuleSignal();
0972 auto returnValue = iWorker->implDoBeginProcessBlock(info.principal(), mcc);
0973 cpp.postModuleSignal();
0974 iWorker->beginSucceeded_ = true;
0975 return returnValue;
0976 }
0977 static void esPrefetchAsync(
0978 Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
0979 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsProcessBlocks(); }
0980 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0981 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0982 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0983 };
0984 template <>
0985 class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>> {
0986 public:
0987 using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
0988 static bool call(Worker* iWorker,
0989 StreamID,
0990 ProcessBlockTransitionInfo const& info,
0991 ActivityRegistry* actReg,
0992 ModuleCallingContext const* mcc,
0993 Arg::Context const* context) {
0994 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0995 cpp.preModuleSignal();
0996 auto returnValue = iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
0997 cpp.postModuleSignal();
0998 return returnValue;
0999 }
1000 static void esPrefetchAsync(
1001 Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
1002 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsInputProcessBlocks(); }
1003 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
1004 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
1005 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
1006 };
1007 template <>
1008 class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>> {
1009 public:
1010 using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1011 static bool call(Worker* iWorker,
1012 StreamID,
1013 ProcessBlockTransitionInfo const& info,
1014 ActivityRegistry* actReg,
1015 ModuleCallingContext const* mcc,
1016 Arg::Context const* context) {
1017 if (iWorker->beginSucceeded_) {
1018 iWorker->beginSucceeded_ = false;
1019
1020 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
1021 cpp.preModuleSignal();
1022 auto returnValue = iWorker->implDoEndProcessBlock(info.principal(), mcc);
1023 cpp.postModuleSignal();
1024 return returnValue;
1025 }
1026 return true;
1027 }
1028 static void esPrefetchAsync(
1029 Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
1030 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsProcessBlocks(); }
1031 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
1032 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
1033 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
1034 };
1035 }
1036
1037 template <typename T>
1038 void Worker::prefetchAsync(WaitingTaskHolder iTask,
1039 ServiceToken const& token,
1040 ParentContext const& parentContext,
1041 typename T::TransitionInfoType const& transitionInfo,
1042 Transition iTransition) noexcept {
1043 Principal const& principal = transitionInfo.principal();
1044
1045 moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1046
1047 if constexpr (T::isEvent_) {
1048 actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1049 } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
1050 actReg_->preModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1051 } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
1052 actReg_->preModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(), moduleCallingContext_);
1053 }
1054
1055 workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
1056 edPrefetchAsync(iTask, token, principal);
1057
1058 if (principal.branchType() == InEvent) {
1059 preActionBeforeRunEventAsync(iTask, moduleCallingContext_, principal);
1060 }
1061 }
1062
1063 template <typename T>
1064 void Worker::doWorkAsync(WaitingTaskHolder task,
1065 typename T::TransitionInfoType const& transitionInfo,
1066 ServiceToken const& token,
1067 StreamID streamID,
1068 ParentContext const& parentContext,
1069 typename T::Context const* context) noexcept {
1070 if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1071 return;
1072 }
1073
1074
1075 bool expected = false;
1076 bool workStarted = workStarted_.compare_exchange_strong(expected, true);
1077
1078 waitingTasks_.add(task);
1079 if constexpr (T::isEvent_) {
1080 timesVisited_.fetch_add(1, std::memory_order_relaxed);
1081 }
1082
1083 if (workStarted) {
1084 moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1085
1086
1087 if (workerhelper::CallImpl<T>::needToRunSelection(this)) {
1088
1089
1090 WaitingTask* moduleTask =
1091 new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1092
1093
1094 struct DestroyTask {
1095 DestroyTask(edm::WaitingTask* iTask) noexcept : m_task(iTask) {}
1096
1097 ~DestroyTask() noexcept {
1098 auto p = m_task.exchange(nullptr);
1099 if (p) {
1100 TaskSentry s{p};
1101 }
1102 }
1103
1104 edm::WaitingTask* release() noexcept { return m_task.exchange(nullptr); }
1105
1106 private:
1107 std::atomic<edm::WaitingTask*> m_task;
1108 };
1109 if constexpr (T::isEvent_) {
1110 if (hasAcquire()) {
1111 auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1112 ServiceWeakToken weakToken = token;
1113 auto* group = task.group();
1114 moduleTask = make_waiting_task(
1115 [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
1116 WaitingTaskHolder runTaskHolder(
1117 *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
1118 AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1119 t.execute();
1120 });
1121 }
1122 }
1123 auto* group = task.group();
1124 auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1125 ServiceWeakToken weakToken = token;
1126 auto selectionTask =
1127 make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
1128 std::exception_ptr const*) mutable {
1129 ServiceRegistry::Operate guard(weakToken.lock());
1130 prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
1131 weakToken.lock(),
1132 parentContext,
1133 info,
1134 T::transition_);
1135 });
1136 prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1137 } else {
1138 WaitingTask* moduleTask =
1139 new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1140 auto group = task.group();
1141 if constexpr (T::isEvent_) {
1142 if (hasAcquire()) {
1143 WaitingTaskHolder runTaskHolder(
1144 *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1145 moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1146 }
1147 }
1148 prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1149 }
1150 }
1151 }
1152
1153 template <typename T>
1154 std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr iEPtr,
1155 typename T::TransitionInfoType const& transitionInfo,
1156 StreamID streamID,
1157 ParentContext const& parentContext,
1158 typename T::Context const* context) noexcept {
1159 std::exception_ptr exceptionPtr;
1160 bool shouldRun = true;
1161 if (iEPtr) {
1162 if (shouldRethrowException(iEPtr, parentContext, T::isEvent_, shouldTryToContinue_)) {
1163 exceptionPtr = iEPtr;
1164 setException<T::isEvent_>(exceptionPtr);
1165 shouldRun = false;
1166 } else {
1167 if (not shouldTryToContinue_) {
1168 setPassed<T::isEvent_>();
1169 shouldRun = false;
1170 }
1171 }
1172 }
1173 if (shouldRun) {
1174
1175 CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1176 exceptionPtr = std::current_exception();
1177 }
1178 } else {
1179 moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
1180 }
1181 waitingTasks_.doneWaiting(exceptionPtr);
1182 return exceptionPtr;
1183 }
1184
1185 template <typename T>
1186 void Worker::doWorkNoPrefetchingAsync(WaitingTaskHolder task,
1187 typename T::TransitionInfoType const& transitionInfo,
1188 ServiceToken const& serviceToken,
1189 StreamID streamID,
1190 ParentContext const& parentContext,
1191 typename T::Context const* context) noexcept {
1192 if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1193 return;
1194 }
1195
1196
1197 bool expected = false;
1198 auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1199
1200 waitingTasks_.add(task);
1201 if (workStarted) {
1202 ServiceWeakToken weakToken = serviceToken;
1203 auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1204 std::exception_ptr exceptionPtr;
1205
1206 CMS_SA_ALLOW try {
1207
1208 ServiceRegistry::Operate guard(weakToken.lock());
1209
1210 this->runModule<T>(info, streamID, parentContext, context);
1211 } catch (...) {
1212 exceptionPtr = std::current_exception();
1213 }
1214 this->waitingTasks_.doneWaiting(exceptionPtr);
1215 };
1216
1217 if (needsESPrefetching(T::transition_)) {
1218 auto group = task.group();
1219 auto afterPrefetch =
1220 edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1221 if (iExcept) {
1222 this->waitingTasks_.doneWaiting(*iExcept);
1223 } else {
1224 if (auto queue = this->serializeRunModule()) {
1225 queue.push(*group, toDo);
1226 } else {
1227 group->run(toDo);
1228 }
1229 }
1230 });
1231 moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1232 esPrefetchAsync(
1233 WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1234 } else {
1235 auto group = task.group();
1236 if (auto queue = this->serializeRunModule()) {
1237 queue.push(*group, toDo);
1238 } else {
1239 group->run(toDo);
1240 }
1241 }
1242 }
1243 }
1244
1245 template <typename T>
1246 bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1247 StreamID streamID,
1248 ParentContext const& parentContext,
1249 typename T::Context const* context) {
1250
1251
1252
1253
1254 ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1255 if constexpr (T::isEvent_) {
1256 timesRun_.fetch_add(1, std::memory_order_relaxed);
1257 }
1258
1259 bool rc = true;
1260 try {
1261 convertException::wrap([&]() {
1262 rc = workerhelper::CallImpl<T>::call(
1263 this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1264
1265 if (rc) {
1266 setPassed<T::isEvent_>();
1267 } else {
1268 setFailed<T::isEvent_>();
1269 }
1270 });
1271 } catch (cms::Exception& ex) {
1272 edm::exceptionContext(ex, moduleCallingContext_);
1273 if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, shouldTryToContinue_)) {
1274 assert(not cached_exception_);
1275 setException<T::isEvent_>(std::current_exception());
1276 std::rethrow_exception(cached_exception_);
1277 } else {
1278 rc = setPassed<T::isEvent_>();
1279 }
1280 }
1281
1282 return rc;
1283 }
1284
1285 template <typename T>
1286 std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1287 StreamID streamID,
1288 ParentContext const& parentContext,
1289 typename T::Context const* context) noexcept {
1290 timesVisited_.fetch_add(1, std::memory_order_relaxed);
1291 std::exception_ptr prefetchingException;
1292 return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1293 }
1294 }
1295 #endif