File indexing completed on 2025-06-29 22:58:01
0001 #ifndef FWCore_Framework_Worker_h
0002 #define FWCore_Framework_Worker_h
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0025 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0026 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0027 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0028 #include "FWCore/Framework/interface/maker/WorkerParams.h"
0029 #include "FWCore/Framework/interface/maker/ModuleSignalSentry.h"
0030 #include "FWCore/Framework/interface/ExceptionActions.h"
0031 #include "FWCore/Framework/interface/ModuleContextSentry.h"
0032 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0033 #include "FWCore/Framework/interface/ProductResolverIndexAndSkipBit.h"
0034 #include "FWCore/Framework/interface/ModuleConsumesMinimalESInfo.h"
0035 #include "FWCore/Concurrency/interface/WaitingTask.h"
0036 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0037 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0038 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0039 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0040 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0041 #include "FWCore/ServiceRegistry/interface/InternalContext.h"
0042 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0043 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0044 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0045 #include "FWCore/ServiceRegistry/interface/PlaceInPathContext.h"
0046 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0047 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0048 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0049 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0050 #include "FWCore/Concurrency/interface/FunctorTask.h"
0051 #include "FWCore/Utilities/interface/Exception.h"
0052 #include "FWCore/Utilities/interface/ConvertException.h"
0053 #include "FWCore/Utilities/interface/BranchType.h"
0054 #include "FWCore/Utilities/interface/ProductResolverIndex.h"
0055 #include "FWCore/Utilities/interface/StreamID.h"
0056 #include "FWCore/Utilities/interface/propagate_const.h"
0057 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0058 #include "FWCore/Utilities/interface/ESIndices.h"
0059 #include "FWCore/Utilities/interface/Transition.h"
0060
0061 #include "FWCore/Framework/interface/Frameworkfwd.h"
0062
0063 #include <array>
0064 #include <atomic>
0065 #include <cassert>
0066 #include <map>
0067 #include <memory>
0068 #include <sstream>
0069 #include <string>
0070 #include <vector>
0071 #include <exception>
0072 #include <unordered_map>
0073
0074 namespace edm {
0075 class EventPrincipal;
0076 class EventSetupImpl;
0077 class EarlyDeleteHelper;
0078 class ProductResolverIndexHelper;
0079 class ProductResolverIndexAndSkipBit;
0080 class ProductRegistry;
0081 class ThinnedAssociationsHelper;
0082
0083 namespace workerhelper {
0084 template <typename O>
0085 class CallImpl;
0086 }
0087 namespace eventsetup {
0088 struct ComponentDescription;
0089 class ESRecordsToProductResolverIndices;
0090 }
0091
0092 class Worker {
0093 public:
// Result of the worker's most recent execution: reset() stores Ready, and
// setPassed()/setFailed()/setException() store Pass/Fail/Exception.
0094 enum State { Ready, Pass, Fail, Exception };
// Kind of module wrapped by the worker; the value type returned by moduleType().
0095 enum Types { kAnalyzer, kFilter, kProducer, kOutputModule };
// Concurrency flavour of the wrapped module; the value type returned by moduleConcurrencyType().
0096 enum ConcurrencyTypes { kGlobal, kLimited, kOne, kStream };
0097 struct TaskQueueAdaptor {
0098 SerialTaskQueueChain* serial_ = nullptr;
0099 LimitedTaskQueue* limited_ = nullptr;
0100
0101 TaskQueueAdaptor() = default;
0102 TaskQueueAdaptor(SerialTaskQueueChain* iChain) : serial_(iChain) {}
0103 TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
0104
0105 operator bool() { return serial_ != nullptr or limited_ != nullptr; }
0106
0107 template <class F>
0108 void push(oneapi::tbb::task_group& iG, F&& iF) {
0109 if (serial_) {
0110 serial_->push(iG, iF);
0111 } else {
0112 limited_->push(iG, iF);
0113 }
0114 }
0115 };
0116
0117 Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
0118 virtual ~Worker();
0119
0120 Worker(Worker const&) = delete;
0121 Worker& operator=(Worker const&) = delete;
0122
// Flags the module as no longer valid (description() returns nullptr from
// then on) and asks the derived class to clear the module via doClearModule().
0123 void clearModule() {
0124 moduleValid_ = false;
0125 doClearModule();
0126 }
0127
0128 virtual bool wantsProcessBlocks() const noexcept = 0;
0129 virtual bool wantsInputProcessBlocks() const noexcept = 0;
0130 virtual bool wantsGlobalRuns() const noexcept = 0;
0131 virtual bool wantsGlobalLuminosityBlocks() const noexcept = 0;
0132 virtual bool wantsStreamRuns() const noexcept = 0;
0133 virtual bool wantsStreamLuminosityBlocks() const noexcept = 0;
0134
0135
0136 virtual SerialTaskQueue* globalRunsQueue() = 0;
0137
0138 virtual SerialTaskQueue* globalLuminosityBlocksQueue() = 0;
0139
0140
0141 void prePrefetchSelectionAsync(oneapi::tbb::task_group&,
0142 WaitingTask* task,
0143 ServiceToken const&,
0144 StreamID stream,
0145 EventPrincipal const*) noexcept;
0146
// Overload taking an untyped principal pointer (void const*) instead of an
// EventPrincipal const*. It must never actually run: the body only asserts.
// NOTE(review): presumably exists so that generic transition code compiles
// for non-event principals — confirm against the callers.
0147 void prePrefetchSelectionAsync(
0148 oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, void const*) noexcept {
0149 assert(false);
0150 }
0151
0152 template <typename T>
0153 void doWorkAsync(WaitingTaskHolder,
0154 typename T::TransitionInfoType const&,
0155 ServiceToken const&,
0156 StreamID,
0157 ParentContext const&,
0158 typename T::Context const*) noexcept;
0159
0160 template <typename T>
0161 void doWorkNoPrefetchingAsync(WaitingTaskHolder,
0162 typename T::TransitionInfoType const&,
0163 ServiceToken const&,
0164 StreamID,
0165 ParentContext const&,
0166 typename T::Context const*) noexcept;
0167
0168 template <typename T>
0169 std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
0170 StreamID,
0171 ParentContext const&,
0172 typename T::Context const*) noexcept;
0173
0174 virtual size_t transformIndex(edm::ProductDescription const&) const noexcept = 0;
0175 void doTransformAsync(WaitingTaskHolder,
0176 size_t iTransformIndex,
0177 EventPrincipal const&,
0178 ServiceToken const&,
0179 StreamID,
0180 ModuleCallingContext const&,
0181 StreamContext const*) noexcept;
0182
// Registers `task` on this worker's waiting-task list (waitingTasks_).
0183 void callWhenDoneAsync(WaitingTaskHolder task) { waitingTasks_.add(std::move(task)); }
0184
0185 void skipOnPath(EventPrincipal const& iEvent);
0186
// Public entry points for file-related notifications; each simply forwards
// to the corresponding private pure-virtual impl* hook.
0187 void respondToOpenInputFile(FileBlock const& fb) { implRespondToOpenInputFile(fb); }
0188 void respondToCloseInputFile(FileBlock const& fb) { implRespondToCloseInputFile(fb); }
0189 void respondToCloseOutputFile() { implRespondToCloseOutputFile(); }
0190
0191 void reset() {
0192 cached_exception_ = std::exception_ptr();
0193 state_ = Ready;
0194 waitingTasks_.reset();
0195 workStarted_ = false;
0196 numberOfPathsLeftToRun_ = numberOfPathsOn_;
0197 }
0198
0199 void postDoEvent(EventPrincipal const&);
0200
0201 ModuleDescription const* description() const noexcept {
0202 if (moduleValid_) {
0203 return moduleCallingContext_.moduleDescription();
0204 }
0205 return nullptr;
0206 }
0207
0208
0209 void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
0210
0211 void setEarlyDeleteHelper(EarlyDeleteHelper* iHelper);
0212
0213 virtual std::vector<ModuleConsumesInfo> moduleConsumesInfos() const = 0;
0214 virtual std::vector<ModuleConsumesMinimalESInfo> moduleConsumesMinimalESInfos() const = 0;
0215
0216 virtual Types moduleType() const = 0;
0217 virtual ConcurrencyTypes moduleConcurrencyType() const = 0;
0218
0219 void clearCounters() noexcept {
0220 timesRun_.store(0, std::memory_order_release);
0221 timesVisited_.store(0, std::memory_order_release);
0222 timesPassed_.store(0, std::memory_order_release);
0223 timesFailed_.store(0, std::memory_order_release);
0224 timesExcept_.store(0, std::memory_order_release);
0225 }
0226
// Increments the count of paths this worker is on; reset() copies that count
// into numberOfPathsLeftToRun_.
0227 void addedToPath() noexcept { ++numberOfPathsOn_; }
0228
// Read-only accessors for the statistics counters (acquire loads that pair
// with the release stores in clearCounters()) and for the current state.
0229 int timesRun() const noexcept { return timesRun_.load(std::memory_order_acquire); }
0230 int timesVisited() const noexcept { return timesVisited_.load(std::memory_order_acquire); }
0231 int timesPassed() const noexcept { return timesPassed_.load(std::memory_order_acquire); }
0232 int timesFailed() const noexcept { return timesFailed_.load(std::memory_order_acquire); }
0233 int timesExcept() const noexcept { return timesExcept_.load(std::memory_order_acquire); }
0234 State state() const noexcept { return state_; }
0235
// Alias of timesPassed().
0236 int timesPass() const noexcept { return timesPassed(); }
0237
0238 virtual bool hasAccumulator() const noexcept = 0;
0239
0240 virtual bool matchesBaseClassPointer(void const* iPtr) const noexcept = 0;
0241
// Gives mutable access to the list of tasks waiting on this worker.
0242 edm::WaitingTaskList& waitingTaskList() noexcept { return waitingTasks_; }
0243
0244 protected:
0245 template <typename O>
0246 friend class workerhelper::CallImpl;
0247
0248 virtual void doClearModule() = 0;
0249
0250 virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;
0251
0252 virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0253 virtual bool implNeedToRunSelection() const noexcept = 0;
0254
0255 virtual void implDoAcquire(EventTransitionInfo const&, ModuleCallingContext const*, WaitingTaskHolder&&) = 0;
0256
0257 virtual void implDoTransformAsync(WaitingTaskHolder,
0258 size_t iTransformIndex,
0259 EventPrincipal const&,
0260 ParentContext const&,
0261 ServiceWeakToken const&) noexcept = 0;
0262 virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const noexcept = 0;
0263
0264 virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const&, ModuleCallingContext const*) = 0;
0265 virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0266 virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0267 virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0268 virtual bool implDoBegin(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0269 virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0270 virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0271 virtual bool implDoEnd(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0272 virtual bool implDoBegin(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0273 virtual bool implDoStreamBegin(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0274 virtual bool implDoStreamEnd(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0275 virtual bool implDoEnd(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0276
0277 void resetModuleDescription(ModuleDescription const*);
0278
// Non-owning pointer to the activity registry held in actReg_ (a shared_ptr);
// null when actReg_ is empty.
0279 ActivityRegistry* activityRegistry() { return actReg_.get(); }
0280
0281 private:
0282 template <typename T>
0283 bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
0284
0285 virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0286 virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0287
0288 virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
0289
0290 virtual std::vector<ESResolverIndex> const& esItemsToGetFrom(Transition) const = 0;
0291 virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
0292
0293 virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask,
0294 ModuleCallingContext const& moduleCallingContext,
0295 Principal const& iPrincipal) const noexcept = 0;
0296
0297 virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
0298 virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
0299 virtual void implRespondToCloseOutputFile() = 0;
0300
0301 virtual TaskQueueAdaptor serializeRunModule() = 0;
0302
0303 bool shouldRethrowException(std::exception_ptr iPtr,
0304 ParentContext const& parentContext,
0305 bool isEvent,
0306 bool isTryToContinue) const noexcept;
0307 void checkForShouldTryToContinue(ModuleDescription const&);
0308
0309 template <bool IS_EVENT>
0310 bool setPassed() {
0311 if (IS_EVENT) {
0312 timesPassed_.fetch_add(1, std::memory_order_relaxed);
0313 }
0314 state_ = Pass;
0315 return true;
0316 }
0317
0318 template <bool IS_EVENT>
0319 bool setFailed() {
0320 if (IS_EVENT) {
0321 timesFailed_.fetch_add(1, std::memory_order_relaxed);
0322 }
0323 state_ = Fail;
0324 return false;
0325 }
0326
0327 template <bool IS_EVENT>
0328 std::exception_ptr setException(std::exception_ptr iException) {
0329 if (IS_EVENT) {
0330 timesExcept_.fetch_add(1, std::memory_order_relaxed);
0331 }
0332 cached_exception_ = iException;
0333 state_ = Exception;
0334 return cached_exception_;
0335 }
0336
0337 template <typename T>
0338 void prefetchAsync(WaitingTaskHolder,
0339 ServiceToken const&,
0340 ParentContext const&,
0341 typename T::TransitionInfoType const&,
0342 Transition) noexcept;
0343
0344 void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const&, Transition, ServiceToken const&) noexcept;
0345 void edPrefetchAsync(WaitingTaskHolder, ServiceToken const&, Principal const&) const noexcept;
0346
0347 bool needsESPrefetching(Transition iTrans) const noexcept {
0348 return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
0349 }
0350
0351 void emitPostModuleEventPrefetchingSignal() {
0352 actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0353 }
0354
0355 void emitPostModuleStreamPrefetchingSignal() {
0356 actReg_->postModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),
0357 moduleCallingContext_);
0358 }
0359
0360 void emitPostModuleGlobalPrefetchingSignal() {
0361 actReg_->postModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(),
0362 moduleCallingContext_);
0363 }
0364
0365 virtual bool hasAcquire() const noexcept = 0;
0366
0367 template <typename T>
0368 std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr,
0369 typename T::TransitionInfoType const&,
0370 StreamID,
0371 ParentContext const&,
0372 typename T::Context const*) noexcept;
0373
0374
0375
0376 void runAcquire(EventTransitionInfo const&, ParentContext const&, WaitingTaskHolder);
0377
0378 void runAcquireAfterAsyncPrefetch(std::exception_ptr,
0379 EventTransitionInfo const&,
0380 ParentContext const&,
0381 WaitingTaskHolder) noexcept;
0382
0383 std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr,
0384 ParentContext const& parentContext) noexcept;
0385
0386 template <typename T>
0387 class RunModuleTask : public WaitingTask {
0388 public:
0389 RunModuleTask(Worker* worker,
0390 typename T::TransitionInfoType const& transitionInfo,
0391 ServiceToken const& token,
0392 StreamID streamID,
0393 ParentContext const& parentContext,
0394 typename T::Context const* context,
0395 oneapi::tbb::task_group* iGroup) noexcept
0396 : m_worker(worker),
0397 m_transitionInfo(transitionInfo),
0398 m_streamID(streamID),
0399 m_parentContext(parentContext),
0400 m_context(context),
0401 m_serviceToken(token),
0402 m_group(iGroup) {}
0403
0404 struct EnableQueueGuard {
0405 SerialTaskQueue* queue_;
0406 EnableQueueGuard(SerialTaskQueue* iQueue) : queue_{iQueue} {}
0407 EnableQueueGuard(EnableQueueGuard const&) = delete;
0408 EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
0409 EnableQueueGuard& operator=(EnableQueueGuard&&) = delete;
0410 EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
0411 ~EnableQueueGuard() {
0412 if (queue_) {
0413 queue_->resume();
0414 }
0415 }
0416 };
0417
0418 void execute() final {
0419
0420 ServiceRegistry::Operate guard(m_serviceToken.lock());
0421
0422
0423
0424 std::exception_ptr temp_excptr;
0425 auto excptr = exceptionPtr();
0426 if constexpr (T::isEvent_) {
0427 if (!m_worker->hasAcquire()) {
0428
0429 CMS_SA_ALLOW try {
0430
0431 m_worker->emitPostModuleEventPrefetchingSignal();
0432 } catch (...) {
0433 temp_excptr = std::current_exception();
0434 if (not excptr) {
0435 excptr = temp_excptr;
0436 }
0437 }
0438 }
0439 } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
0440 m_worker->emitPostModuleStreamPrefetchingSignal();
0441 } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
0442 m_worker->emitPostModuleGlobalPrefetchingSignal();
0443 }
0444
0445 if (not excptr) {
0446 if (auto queue = m_worker->serializeRunModule()) {
0447 auto f = [worker = m_worker,
0448 info = m_transitionInfo,
0449 streamID = m_streamID,
0450 parentContext = m_parentContext,
0451 sContext = m_context,
0452 serviceToken = m_serviceToken]() {
0453
0454 ServiceRegistry::Operate operateRunModule(serviceToken.lock());
0455
0456
0457
0458
0459 EnableQueueGuard enableQueueGuard{workerhelper::CallImpl<T>::enableGlobalQueue(worker)};
0460 std::exception_ptr ptr;
0461 worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
0462 };
0463
0464 auto gQueue = workerhelper::CallImpl<T>::pauseGlobalQueue(m_worker);
0465 if (gQueue) {
0466 gQueue->push(*m_group, [queue, gQueue, f, group = m_group]() mutable {
0467 gQueue->pause();
0468 queue.push(*group, std::move(f));
0469 });
0470 } else {
0471 queue.push(*m_group, std::move(f));
0472 }
0473 return;
0474 }
0475 }
0476
0477 m_worker->runModuleAfterAsyncPrefetch<T>(excptr, m_transitionInfo, m_streamID, m_parentContext, m_context);
0478 }
0479
0480 private:
0481 Worker* m_worker;
0482 typename T::TransitionInfoType m_transitionInfo;
0483 StreamID m_streamID;
0484 ParentContext const m_parentContext;
0485 typename T::Context const* m_context;
0486 ServiceWeakToken m_serviceToken;
0487 oneapi::tbb::task_group* m_group;
0488 };
0489
0490
0491
0492
0493
// Primary AcquireTask template: a do-nothing placeholder. The constructor
// ignores all of its arguments and execute() is empty. A partial
// specialization of this template (for the event transition) supplies the
// real behaviour; the DUMMY parameter is what allows that specialization to
// be partial.
0494 template <typename T, typename DUMMY = void>
0495 class AcquireTask : public WaitingTask {
0496 public:
0497 AcquireTask(Worker*,
0498 typename T::TransitionInfoType const&,
0499 ServiceToken const&,
0500 ParentContext const&,
0501 WaitingTaskHolder) noexcept {}
0502 void execute() final {}
0503 };
0504
0505 template <typename DUMMY>
0506 class AcquireTask<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>, DUMMY> : public WaitingTask {
0507 public:
0508 AcquireTask(Worker* worker,
0509 EventTransitionInfo const& eventTransitionInfo,
0510 ServiceToken const& token,
0511 ParentContext const& parentContext,
0512 WaitingTaskHolder holder) noexcept
0513 : m_worker(worker),
0514 m_eventTransitionInfo(eventTransitionInfo),
0515 m_parentContext(parentContext),
0516 m_holder(std::move(holder)),
0517 m_serviceToken(token) {}
0518
0519 void execute() final {
0520
0521 ServiceRegistry::Operate guard(m_serviceToken.lock());
0522
0523
0524
0525 std::exception_ptr temp_excptr;
0526 auto excptr = exceptionPtr();
0527
0528 CMS_SA_ALLOW try {
0529
0530 m_worker->emitPostModuleEventPrefetchingSignal();
0531 } catch (...) {
0532 temp_excptr = std::current_exception();
0533 if (not excptr) {
0534 excptr = temp_excptr;
0535 }
0536 }
0537
0538 if (not excptr) {
0539 if (auto queue = m_worker->serializeRunModule()) {
0540 queue.push(*m_holder.group(),
0541 [worker = m_worker,
0542 info = m_eventTransitionInfo,
0543 parentContext = m_parentContext,
0544 serviceToken = m_serviceToken,
0545 holder = std::move(m_holder)]() mutable {
0546
0547 ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
0548
0549 std::exception_ptr ptr;
0550 worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, std::move(holder));
0551 });
0552 return;
0553 }
0554 }
0555
0556 m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
0557 }
0558
0559 private:
0560 Worker* m_worker;
0561 EventTransitionInfo m_eventTransitionInfo;
0562 ParentContext const m_parentContext;
0563 WaitingTaskHolder m_holder;
0564 ServiceWeakToken m_serviceToken;
0565 };
0566
0567
0568
0569
// Waiting task that stores a worker, a task group, another waiting task
// ("runModuleTask") and a copy of the parent context. The constructor and
// execute() are defined outside this header section.
// NOTE(review): judging by the name and by Worker::handleExternalWorkException,
// this presumably processes an exception raised by external work before the
// stored run-module task continues — confirm in the implementation file.
0570 class HandleExternalWorkExceptionTask : public WaitingTask {
0571 public:
0572 HandleExternalWorkExceptionTask(Worker* worker,
0573 oneapi::tbb::task_group* group,
0574 WaitingTask* runModuleTask,
0575 ParentContext const& parentContext) noexcept;
0576
0577 void execute() final;
0578
0579 private:
0580 Worker* m_worker;
0581 WaitingTask* m_runModuleTask;
0582 oneapi::tbb::task_group* m_group;
0583 ParentContext const m_parentContext;
0584 };
0585
0586 std::atomic<int> timesRun_;
0587 std::atomic<int> timesVisited_;
0588 std::atomic<int> timesPassed_;
0589 std::atomic<int> timesFailed_;
0590 std::atomic<int> timesExcept_;
0591 std::atomic<State> state_;
0592 int numberOfPathsOn_;
0593 std::atomic<int> numberOfPathsLeftToRun_;
0594
0595 ModuleCallingContext moduleCallingContext_;
0596
0597 ExceptionToActionTable const* actions_;
0598 CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_;
0599
0600 std::shared_ptr<ActivityRegistry> actReg_;
0601
0602 edm::propagate_const<EarlyDeleteHelper*> earlyDeleteHelper_;
0603
0604 edm::WaitingTaskList waitingTasks_;
0605 std::atomic<bool> workStarted_;
0606 bool ranAcquireWithoutException_;
0607 bool moduleValid_ = true;
0608 bool shouldTryToContinue_ = false;
0609 bool beginSucceeded_ = false;
0610 };
0611 namespace workerhelper {
0612 template <>
0613 class CallImpl<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>> {
0614 public:
0615 typedef OccurrenceTraits<EventPrincipal, BranchActionStreamBegin> Arg;
0616 static bool call(Worker* iWorker,
0617 StreamID,
0618 EventTransitionInfo const& info,
0619 ActivityRegistry* ,
0620 ModuleCallingContext const* mcc,
0621 Arg::Context const* ) {
0622
0623 return iWorker->implDo(info, mcc);
0624 }
0625 static void esPrefetchAsync(Worker* worker,
0626 WaitingTaskHolder waitingTask,
0627 ServiceToken const& token,
0628 EventTransitionInfo const& info,
0629 Transition transition) noexcept {
0630 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0631 }
0632 static bool wantsTransition(Worker const* iWorker) noexcept { return true; }
0633 static bool needToRunSelection(Worker const* iWorker) noexcept { return iWorker->implNeedToRunSelection(); }
0634
0635 static SerialTaskQueue* pauseGlobalQueue(Worker*) noexcept { return nullptr; }
0636 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0637 };
0638
0639 template <>
0640 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>> {
0641 public:
0642 typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Arg;
0643 static bool call(Worker* iWorker,
0644 StreamID,
0645 RunTransitionInfo const& info,
0646 ActivityRegistry* actReg,
0647 ModuleCallingContext const* mcc,
0648 Arg::Context const* context) {
0649 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0650
0651
0652 cpp.preModuleSignal();
0653
0654
0655 auto returnValue = iWorker->implDoBegin(info, mcc);
0656
0657 cpp.postModuleSignal();
0658 iWorker->beginSucceeded_ = true;
0659 return returnValue;
0660 }
0661 static void esPrefetchAsync(Worker* worker,
0662 WaitingTaskHolder waitingTask,
0663 ServiceToken const& token,
0664 RunTransitionInfo const& info,
0665 Transition transition) noexcept {
0666 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0667 }
0668 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalRuns(); }
0669 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0670 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return iWorker->globalRunsQueue(); }
0671 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0672 };
0673 template <>
0674 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>> {
0675 public:
0676 typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Arg;
0677 static bool call(Worker* iWorker,
0678 StreamID id,
0679 RunTransitionInfo const& info,
0680 ActivityRegistry* actReg,
0681 ModuleCallingContext const* mcc,
0682 Arg::Context const* context) {
0683 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0684 cpp.preModuleSignal();
0685 auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
0686 cpp.postModuleSignal();
0687 iWorker->beginSucceeded_ = true;
0688 return returnValue;
0689 }
0690 static void esPrefetchAsync(Worker* worker,
0691 WaitingTaskHolder waitingTask,
0692 ServiceToken const& token,
0693 RunTransitionInfo const& info,
0694 Transition transition) noexcept {
0695 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0696 }
0697 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamRuns(); }
0698 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0699 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0700 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0701 };
0702 template <>
0703 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>> {
0704 public:
0705 typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Arg;
0706 static bool call(Worker* iWorker,
0707 StreamID,
0708 RunTransitionInfo const& info,
0709 ActivityRegistry* actReg,
0710 ModuleCallingContext const* mcc,
0711 Arg::Context const* context) {
0712 if (iWorker->beginSucceeded_) {
0713 iWorker->beginSucceeded_ = false;
0714
0715 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0716 cpp.preModuleSignal();
0717 auto returnValue = iWorker->implDoEnd(info, mcc);
0718 cpp.postModuleSignal();
0719 return returnValue;
0720 }
0721 return true;
0722 }
0723 static void esPrefetchAsync(Worker* worker,
0724 WaitingTaskHolder waitingTask,
0725 ServiceToken const& token,
0726 RunTransitionInfo const& info,
0727 Transition transition) noexcept {
0728 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0729 }
0730 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalRuns(); }
0731 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0732 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0733 static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) noexcept { return iWorker->globalRunsQueue(); }
0734 };
0735 template <>
0736 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>> {
0737 public:
0738 typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Arg;
0739 static bool call(Worker* iWorker,
0740 StreamID id,
0741 RunTransitionInfo const& info,
0742 ActivityRegistry* actReg,
0743 ModuleCallingContext const* mcc,
0744 Arg::Context const* context) {
0745 if (iWorker->beginSucceeded_) {
0746 iWorker->beginSucceeded_ = false;
0747
0748 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0749 cpp.preModuleSignal();
0750 auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
0751 cpp.postModuleSignal();
0752 return returnValue;
0753 }
0754 return true;
0755 }
0756 static void esPrefetchAsync(Worker* worker,
0757 WaitingTaskHolder waitingTask,
0758 ServiceToken const& token,
0759 RunTransitionInfo const& info,
0760 Transition transition) noexcept {
0761 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0762 }
0763 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamRuns(); }
0764 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0765 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0766 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0767 };
0768
0769 template <>
0770 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>> {
0771 public:
0772 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0773 static bool call(Worker* iWorker,
0774 StreamID,
0775 LumiTransitionInfo const& info,
0776 ActivityRegistry* actReg,
0777 ModuleCallingContext const* mcc,
0778 Arg::Context const* context) {
0779 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0780 cpp.preModuleSignal();
0781 auto returnValue = iWorker->implDoBegin(info, mcc);
0782 cpp.postModuleSignal();
0783 iWorker->beginSucceeded_ = true;
0784 return returnValue;
0785 }
0786 static void esPrefetchAsync(Worker* worker,
0787 WaitingTaskHolder waitingTask,
0788 ServiceToken const& token,
0789 LumiTransitionInfo const& info,
0790 Transition transition) noexcept {
0791 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0792 }
0793 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalLuminosityBlocks(); }
0794 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0795 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept {
0796 return iWorker->globalLuminosityBlocksQueue();
0797 }
0798 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0799 };
0800 template <>
0801 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>> {
0802 public:
0803 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0804 static bool call(Worker* iWorker,
0805 StreamID id,
0806 LumiTransitionInfo const& info,
0807 ActivityRegistry* actReg,
0808 ModuleCallingContext const* mcc,
0809 Arg::Context const* context) {
0810 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0811 cpp.preModuleSignal();
0812 auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
0813 cpp.postModuleSignal();
0814 iWorker->beginSucceeded_ = true;
0815 return returnValue;
0816 }
0817 static void esPrefetchAsync(Worker* worker,
0818 WaitingTaskHolder waitingTask,
0819 ServiceToken const& token,
0820 LumiTransitionInfo const& info,
0821 Transition transition) noexcept {
0822 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0823 }
0824 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamLuminosityBlocks(); }
0825 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0826 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0827 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0828 };
0829
0830 template <>
0831 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>> {
0832 public:
0833 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0834 static bool call(Worker* iWorker,
0835 StreamID,
0836 LumiTransitionInfo const& info,
0837 ActivityRegistry* actReg,
0838 ModuleCallingContext const* mcc,
0839 Arg::Context const* context) {
0840 if (iWorker->beginSucceeded_) {
0841 iWorker->beginSucceeded_ = false;
0842
0843 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0844 cpp.preModuleSignal();
0845 auto returnValue = iWorker->implDoEnd(info, mcc);
0846 cpp.postModuleSignal();
0847 return returnValue;
0848 }
0849 return true;
0850 }
0851 static void esPrefetchAsync(Worker* worker,
0852 WaitingTaskHolder waitingTask,
0853 ServiceToken const& token,
0854 LumiTransitionInfo const& info,
0855 Transition transition) noexcept {
0856 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0857 }
0858 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalLuminosityBlocks(); }
0859 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0860 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0861 static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) noexcept {
0862 return iWorker->globalLuminosityBlocksQueue();
0863 }
0864 };
0865 template <>
0866 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>> {
0867 public:
0868 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0869 static bool call(Worker* iWorker,
0870 StreamID id,
0871 LumiTransitionInfo const& info,
0872 ActivityRegistry* actReg,
0873 ModuleCallingContext const* mcc,
0874 Arg::Context const* context) {
0875 if (iWorker->beginSucceeded_) {
0876 iWorker->beginSucceeded_ = false;
0877
0878 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0879 cpp.preModuleSignal();
0880 auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
0881 cpp.postModuleSignal();
0882 return returnValue;
0883 }
0884 return true;
0885 }
0886 static void esPrefetchAsync(Worker* worker,
0887 WaitingTaskHolder waitingTask,
0888 ServiceToken const& token,
0889 LumiTransitionInfo const& info,
0890 Transition transition) noexcept {
0891 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0892 }
0893 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamLuminosityBlocks(); }
0894 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0895 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0896 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0897 };
0898 template <>
0899 class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>> {
0900 public:
0901 using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0902 static bool call(Worker* iWorker,
0903 StreamID,
0904 ProcessBlockTransitionInfo const& info,
0905 ActivityRegistry* actReg,
0906 ModuleCallingContext const* mcc,
0907 Arg::Context const* context) {
0908 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0909 cpp.preModuleSignal();
0910 auto returnValue = iWorker->implDoBeginProcessBlock(info.principal(), mcc);
0911 cpp.postModuleSignal();
0912 iWorker->beginSucceeded_ = true;
0913 return returnValue;
0914 }
0915 static void esPrefetchAsync(
0916 Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
0917 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsProcessBlocks(); }
0918 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0919 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0920 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0921 };
0922 template <>
0923 class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>> {
0924 public:
0925 using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
0926 static bool call(Worker* iWorker,
0927 StreamID,
0928 ProcessBlockTransitionInfo const& info,
0929 ActivityRegistry* actReg,
0930 ModuleCallingContext const* mcc,
0931 Arg::Context const* context) {
0932 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0933 cpp.preModuleSignal();
0934 auto returnValue = iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
0935 cpp.postModuleSignal();
0936 return returnValue;
0937 }
0938 static void esPrefetchAsync(
0939 Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
0940 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsInputProcessBlocks(); }
0941 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0942 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0943 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0944 };
0945 template <>
0946 class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>> {
0947 public:
0948 using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
0949 static bool call(Worker* iWorker,
0950 StreamID,
0951 ProcessBlockTransitionInfo const& info,
0952 ActivityRegistry* actReg,
0953 ModuleCallingContext const* mcc,
0954 Arg::Context const* context) {
0955 if (iWorker->beginSucceeded_) {
0956 iWorker->beginSucceeded_ = false;
0957
0958 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0959 cpp.preModuleSignal();
0960 auto returnValue = iWorker->implDoEndProcessBlock(info.principal(), mcc);
0961 cpp.postModuleSignal();
0962 return returnValue;
0963 }
0964 return true;
0965 }
0966 static void esPrefetchAsync(
0967 Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
0968 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsProcessBlocks(); }
0969 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0970 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0971 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0972 };
0973 }
0974
0975 template <typename T>
0976 void Worker::prefetchAsync(WaitingTaskHolder iTask,
0977 ServiceToken const& token,
0978 ParentContext const& parentContext,
0979 typename T::TransitionInfoType const& transitionInfo,
0980 Transition iTransition) noexcept {
0981 Principal const& principal = transitionInfo.principal();
0982
0983 moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
0984
0985 if constexpr (T::isEvent_) {
0986 actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0987 } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
0988 actReg_->preModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0989 } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
0990 actReg_->preModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(), moduleCallingContext_);
0991 }
0992
0993 workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
0994 edPrefetchAsync(iTask, token, principal);
0995
0996 if (principal.branchType() == InEvent) {
0997 preActionBeforeRunEventAsync(iTask, moduleCallingContext_, principal);
0998 }
0999 }
1000
1001 template <typename T>
1002 void Worker::doWorkAsync(WaitingTaskHolder task,
1003 typename T::TransitionInfoType const& transitionInfo,
1004 ServiceToken const& token,
1005 StreamID streamID,
1006 ParentContext const& parentContext,
1007 typename T::Context const* context) noexcept {
1008 if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1009 return;
1010 }
1011
1012
1013 bool expected = false;
1014 bool workStarted = workStarted_.compare_exchange_strong(expected, true);
1015
1016 waitingTasks_.add(task);
1017 if constexpr (T::isEvent_) {
1018 timesVisited_.fetch_add(1, std::memory_order_relaxed);
1019 }
1020
1021 if (workStarted) {
1022 moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1023
1024
1025 if (workerhelper::CallImpl<T>::needToRunSelection(this)) {
1026
1027
1028 WaitingTask* moduleTask =
1029 new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1030
1031
1032 struct DestroyTask {
1033 DestroyTask(edm::WaitingTask* iTask) noexcept : m_task(iTask) {}
1034
1035 ~DestroyTask() noexcept {
1036 auto p = m_task.exchange(nullptr);
1037 if (p) {
1038 TaskSentry s{p};
1039 }
1040 }
1041
1042 edm::WaitingTask* release() noexcept { return m_task.exchange(nullptr); }
1043
1044 private:
1045 std::atomic<edm::WaitingTask*> m_task;
1046 };
1047 if constexpr (T::isEvent_) {
1048 if (hasAcquire()) {
1049 auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1050 ServiceWeakToken weakToken = token;
1051 auto* group = task.group();
1052 moduleTask = make_waiting_task(
1053 [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
1054 WaitingTaskHolder runTaskHolder(
1055 *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
1056 AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1057 t.execute();
1058 });
1059 }
1060 }
1061 auto* group = task.group();
1062 auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1063 ServiceWeakToken weakToken = token;
1064 auto selectionTask =
1065 make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
1066 std::exception_ptr const*) mutable {
1067 ServiceRegistry::Operate guard(weakToken.lock());
1068 prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
1069 weakToken.lock(),
1070 parentContext,
1071 info,
1072 T::transition_);
1073 });
1074 prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1075 } else {
1076 WaitingTask* moduleTask =
1077 new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1078 auto group = task.group();
1079 if constexpr (T::isEvent_) {
1080 if (hasAcquire()) {
1081 WaitingTaskHolder runTaskHolder(
1082 *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1083 moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1084 }
1085 }
1086 prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1087 }
1088 }
1089 }
1090
1091 template <typename T>
1092 std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr iEPtr,
1093 typename T::TransitionInfoType const& transitionInfo,
1094 StreamID streamID,
1095 ParentContext const& parentContext,
1096 typename T::Context const* context) noexcept {
1097 std::exception_ptr exceptionPtr;
1098 bool shouldRun = true;
1099 if (iEPtr) {
1100 if (shouldRethrowException(iEPtr, parentContext, T::isEvent_, shouldTryToContinue_)) {
1101 exceptionPtr = iEPtr;
1102 setException<T::isEvent_>(exceptionPtr);
1103 shouldRun = false;
1104 } else {
1105 if (not shouldTryToContinue_) {
1106 setPassed<T::isEvent_>();
1107 shouldRun = false;
1108 }
1109 }
1110 }
1111 if (shouldRun) {
1112
1113 CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1114 exceptionPtr = std::current_exception();
1115 }
1116 } else {
1117 moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
1118 }
1119 waitingTasks_.doneWaiting(exceptionPtr);
1120 return exceptionPtr;
1121 }
1122
1123 template <typename T>
1124 void Worker::doWorkNoPrefetchingAsync(WaitingTaskHolder task,
1125 typename T::TransitionInfoType const& transitionInfo,
1126 ServiceToken const& serviceToken,
1127 StreamID streamID,
1128 ParentContext const& parentContext,
1129 typename T::Context const* context) noexcept {
1130 if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1131 return;
1132 }
1133
1134
1135 bool expected = false;
1136 auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1137
1138 waitingTasks_.add(task);
1139 if (workStarted) {
1140 ServiceWeakToken weakToken = serviceToken;
1141 auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1142 std::exception_ptr exceptionPtr;
1143
1144 CMS_SA_ALLOW try {
1145
1146 ServiceRegistry::Operate guard(weakToken.lock());
1147
1148 this->runModule<T>(info, streamID, parentContext, context);
1149 } catch (...) {
1150 exceptionPtr = std::current_exception();
1151 }
1152 this->waitingTasks_.doneWaiting(exceptionPtr);
1153 };
1154
1155 if (needsESPrefetching(T::transition_)) {
1156 auto group = task.group();
1157 auto afterPrefetch =
1158 edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1159 if (iExcept) {
1160 this->waitingTasks_.doneWaiting(*iExcept);
1161 } else {
1162 if (auto queue = this->serializeRunModule()) {
1163 queue.push(*group, toDo);
1164 } else {
1165 group->run(toDo);
1166 }
1167 }
1168 });
1169 moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1170 esPrefetchAsync(
1171 WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1172 } else {
1173 auto group = task.group();
1174 if (auto queue = this->serializeRunModule()) {
1175 queue.push(*group, toDo);
1176 } else {
1177 group->run(toDo);
1178 }
1179 }
1180 }
1181 }
1182
1183 template <typename T>
1184 bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1185 StreamID streamID,
1186 ParentContext const& parentContext,
1187 typename T::Context const* context) {
1188
1189
1190
1191
1192 ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1193 if constexpr (T::isEvent_) {
1194 timesRun_.fetch_add(1, std::memory_order_relaxed);
1195 }
1196
1197 bool rc = true;
1198 try {
1199 convertException::wrap([&]() {
1200 rc = workerhelper::CallImpl<T>::call(
1201 this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1202
1203 if (rc) {
1204 setPassed<T::isEvent_>();
1205 } else {
1206 setFailed<T::isEvent_>();
1207 }
1208 });
1209 } catch (cms::Exception& ex) {
1210 edm::exceptionContext(ex, moduleCallingContext_);
1211 if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, shouldTryToContinue_)) {
1212 assert(not cached_exception_);
1213 setException<T::isEvent_>(std::current_exception());
1214 std::rethrow_exception(cached_exception_);
1215 } else {
1216 rc = setPassed<T::isEvent_>();
1217 }
1218 }
1219
1220 return rc;
1221 }
1222
1223 template <typename T>
1224 std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1225 StreamID streamID,
1226 ParentContext const& parentContext,
1227 typename T::Context const* context) noexcept {
1228 timesVisited_.fetch_add(1, std::memory_order_relaxed);
1229 std::exception_ptr prefetchingException;
1230 return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1231 }
1232 }
1233 #endif