File indexing completed on 2024-07-28 22:48:21
0001 #ifndef FWCore_Framework_Worker_h
0002 #define FWCore_Framework_Worker_h
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0025 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0026 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0027 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0028 #include "FWCore/Framework/interface/maker/WorkerParams.h"
0029 #include "FWCore/Framework/interface/ExceptionActions.h"
0030 #include "FWCore/Framework/interface/ModuleContextSentry.h"
0031 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0032 #include "FWCore/Framework/interface/ProductResolverIndexAndSkipBit.h"
0033 #include "FWCore/Concurrency/interface/WaitingTask.h"
0034 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0035 #include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
0036 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0037 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0038 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0039 #include "FWCore/ServiceRegistry/interface/ConsumesInfo.h"
0040 #include "FWCore/ServiceRegistry/interface/InternalContext.h"
0041 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0042 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0043 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0044 #include "FWCore/ServiceRegistry/interface/PlaceInPathContext.h"
0045 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0046 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0047 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0048 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0049 #include "FWCore/Concurrency/interface/FunctorTask.h"
0050 #include "FWCore/Utilities/interface/Exception.h"
0051 #include "FWCore/Utilities/interface/ConvertException.h"
0052 #include "FWCore/Utilities/interface/BranchType.h"
0053 #include "FWCore/Utilities/interface/ProductResolverIndex.h"
0054 #include "FWCore/Utilities/interface/StreamID.h"
0055 #include "FWCore/Utilities/interface/propagate_const.h"
0056 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0057 #include "FWCore/Utilities/interface/ESIndices.h"
0058 #include "FWCore/Utilities/interface/Transition.h"
0059
0060 #include "FWCore/Framework/interface/Frameworkfwd.h"
0061
0062 #include <array>
0063 #include <atomic>
0064 #include <cassert>
0065 #include <map>
0066 #include <memory>
0067 #include <sstream>
0068 #include <string>
0069 #include <vector>
0070 #include <exception>
0071 #include <unordered_map>
0072
0073 namespace edm {
0074 class EventPrincipal;
0075 class EventSetupImpl;
0076 class EarlyDeleteHelper;
0077 class ModuleProcessName;
0078 class ProductResolverIndexHelper;
0079 class ProductResolverIndexAndSkipBit;
0080 class ProductRegistry;
0081 class ThinnedAssociationsHelper;
0082
0083 namespace workerhelper {
0084 template <typename O>
0085 class CallImpl;
0086 }
0087 namespace eventsetup {
0088 class ESRecordsToProductResolverIndices;
0089 }
0090
0091 class Worker {
0092 public:
0093 enum State { Ready, Pass, Fail, Exception };
0094 enum Types { kAnalyzer, kFilter, kProducer, kOutputModule };
0095 enum ConcurrencyTypes { kGlobal, kLimited, kOne, kStream };
0096 struct TaskQueueAdaptor {
0097 SerialTaskQueueChain* serial_ = nullptr;
0098 LimitedTaskQueue* limited_ = nullptr;
0099
0100 TaskQueueAdaptor() = default;
0101 TaskQueueAdaptor(SerialTaskQueueChain* iChain) : serial_(iChain) {}
0102 TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
0103
0104 operator bool() { return serial_ != nullptr or limited_ != nullptr; }
0105
0106 template <class F>
0107 void push(oneapi::tbb::task_group& iG, F&& iF) {
0108 if (serial_) {
0109 serial_->push(iG, iF);
0110 } else {
0111 limited_->push(iG, iF);
0112 }
0113 }
0114 };
0115
0116 Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
0117 virtual ~Worker();
0118
0119 Worker(Worker const&) = delete;
0120 Worker& operator=(Worker const&) = delete;
0121
0122 void clearModule() {
0123 moduleValid_ = false;
0124 doClearModule();
0125 }
0126
0127 virtual bool wantsProcessBlocks() const noexcept = 0;
0128 virtual bool wantsInputProcessBlocks() const noexcept = 0;
0129 virtual bool wantsGlobalRuns() const noexcept = 0;
0130 virtual bool wantsGlobalLuminosityBlocks() const noexcept = 0;
0131 virtual bool wantsStreamRuns() const noexcept = 0;
0132 virtual bool wantsStreamLuminosityBlocks() const noexcept = 0;
0133
0134 virtual SerialTaskQueue* globalRunsQueue() = 0;
0135 virtual SerialTaskQueue* globalLuminosityBlocksQueue() = 0;
0136
0137 void prePrefetchSelectionAsync(oneapi::tbb::task_group&,
0138 WaitingTask* task,
0139 ServiceToken const&,
0140 StreamID stream,
0141 EventPrincipal const*) noexcept;
0142
0143 void prePrefetchSelectionAsync(
0144 oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, void const*) noexcept {
0145 assert(false);
0146 }
0147
0148 template <typename T>
0149 void doWorkAsync(WaitingTaskHolder,
0150 typename T::TransitionInfoType const&,
0151 ServiceToken const&,
0152 StreamID,
0153 ParentContext const&,
0154 typename T::Context const*) noexcept;
0155
0156 template <typename T>
0157 void doWorkNoPrefetchingAsync(WaitingTaskHolder,
0158 typename T::TransitionInfoType const&,
0159 ServiceToken const&,
0160 StreamID,
0161 ParentContext const&,
0162 typename T::Context const*) noexcept;
0163
0164 template <typename T>
0165 std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
0166 StreamID,
0167 ParentContext const&,
0168 typename T::Context const*) noexcept;
0169
0170 virtual size_t transformIndex(edm::BranchDescription const&) const noexcept = 0;
0171 void doTransformAsync(WaitingTaskHolder,
0172 size_t iTransformIndex,
0173 EventPrincipal const&,
0174 ServiceToken const&,
0175 StreamID,
0176 ModuleCallingContext const&,
0177 StreamContext const*) noexcept;
0178
0179 void callWhenDoneAsync(WaitingTaskHolder task) { waitingTasks_.add(std::move(task)); }
0180 void skipOnPath(EventPrincipal const& iEvent);
0181 void beginJob(GlobalContext const&);
0182 void endJob(GlobalContext const&);
0183 void beginStream(StreamID, StreamContext const&);
0184 void endStream(StreamID, StreamContext const&);
0185 void respondToOpenInputFile(FileBlock const& fb) { implRespondToOpenInputFile(fb); }
0186 void respondToCloseInputFile(FileBlock const& fb) { implRespondToCloseInputFile(fb); }
0187 void respondToCloseOutputFile() { implRespondToCloseOutputFile(); }
0188 void registerThinnedAssociations(ProductRegistry const& registry, ThinnedAssociationsHelper& helper);
0189
0190 void reset() {
0191 cached_exception_ = std::exception_ptr();
0192 state_ = Ready;
0193 waitingTasks_.reset();
0194 workStarted_ = false;
0195 numberOfPathsLeftToRun_ = numberOfPathsOn_;
0196 }
0197
0198 void postDoEvent(EventPrincipal const&);
0199
0200 ModuleDescription const* description() const noexcept {
0201 if (moduleValid_) {
0202 return moduleCallingContext_.moduleDescription();
0203 }
0204 return nullptr;
0205 }
0206
0207
0208 void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
0209
0210 void setEarlyDeleteHelper(EarlyDeleteHelper* iHelper);
0211
0212
0213 virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const&) = 0;
0214 virtual void updateLookup(eventsetup::ESRecordsToProductResolverIndices const&) = 0;
0215 virtual void selectInputProcessBlocks(ProductRegistry const&, ProcessBlockHelperBase const&) = 0;
0216 virtual void resolvePutIndicies(
0217 BranchType iBranchType,
0218 std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const&
0219 iIndicies) = 0;
0220
0221 virtual void modulesWhoseProductsAreConsumed(
0222 std::array<std::vector<ModuleDescription const*>*, NumBranchTypes>& modules,
0223 std::vector<ModuleProcessName>& modulesInPreviousProcesses,
0224 ProductRegistry const& preg,
0225 std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
0226
0227 virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
0228
0229 virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
0230
0231 virtual Types moduleType() const = 0;
0232 virtual ConcurrencyTypes moduleConcurrencyType() const = 0;
0233
0234 void clearCounters() noexcept {
0235 timesRun_.store(0, std::memory_order_release);
0236 timesVisited_.store(0, std::memory_order_release);
0237 timesPassed_.store(0, std::memory_order_release);
0238 timesFailed_.store(0, std::memory_order_release);
0239 timesExcept_.store(0, std::memory_order_release);
0240 }
0241
0242 void addedToPath() noexcept { ++numberOfPathsOn_; }
0243
0244 int timesRun() const noexcept { return timesRun_.load(std::memory_order_acquire); }
0245 int timesVisited() const noexcept { return timesVisited_.load(std::memory_order_acquire); }
0246 int timesPassed() const noexcept { return timesPassed_.load(std::memory_order_acquire); }
0247 int timesFailed() const noexcept { return timesFailed_.load(std::memory_order_acquire); }
0248 int timesExcept() const noexcept { return timesExcept_.load(std::memory_order_acquire); }
0249 State state() const noexcept { return state_; }
0250
0251 int timesPass() const noexcept { return timesPassed(); }
0252
0253 virtual bool hasAccumulator() const noexcept = 0;
0254
0255
0256 edm::WaitingTaskList& waitingTaskList() noexcept { return waitingTasks_; }
0257
0258 protected:
0259 template <typename O>
0260 friend class workerhelper::CallImpl;
0261
0262 virtual void doClearModule() = 0;
0263
0264 virtual std::string workerType() const = 0;
0265 virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;
0266
0267 virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0268 virtual bool implNeedToRunSelection() const noexcept = 0;
0269
0270 virtual void implDoAcquire(EventTransitionInfo const&,
0271 ModuleCallingContext const*,
0272 WaitingTaskWithArenaHolder&) = 0;
0273
0274 virtual void implDoTransformAsync(WaitingTaskHolder,
0275 size_t iTransformIndex,
0276 EventPrincipal const&,
0277 ParentContext const&,
0278 ServiceWeakToken const&) noexcept = 0;
0279 virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const noexcept = 0;
0280
0281 virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const&, ModuleCallingContext const*) = 0;
0282 virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0283 virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0284 virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0285 virtual bool implDoBegin(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0286 virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0287 virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0288 virtual bool implDoEnd(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0289 virtual bool implDoBegin(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0290 virtual bool implDoStreamBegin(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0291 virtual bool implDoStreamEnd(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0292 virtual bool implDoEnd(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0293 virtual void implBeginJob() = 0;
0294 virtual void implEndJob() = 0;
0295 virtual void implBeginStream(StreamID) = 0;
0296 virtual void implEndStream(StreamID) = 0;
0297
0298 void resetModuleDescription(ModuleDescription const*);
0299
0300 ActivityRegistry* activityRegistry() { return actReg_.get(); }
0301
0302 private:
0303 template <typename T>
0304 bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
0305
0306 virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0307 virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0308
0309 virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
0310
0311 virtual std::vector<ESResolverIndex> const& esItemsToGetFrom(Transition) const = 0;
0312 virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
0313
0314 virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask,
0315 ModuleCallingContext const& moduleCallingContext,
0316 Principal const& iPrincipal) const noexcept = 0;
0317
0318 virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
0319 virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
0320 virtual void implRespondToCloseOutputFile() = 0;
0321
0322 virtual void implRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) = 0;
0323
0324 virtual TaskQueueAdaptor serializeRunModule() = 0;
0325
0326 bool shouldRethrowException(std::exception_ptr iPtr,
0327 ParentContext const& parentContext,
0328 bool isEvent,
0329 bool isTryToContinue) const noexcept;
0330 void checkForShouldTryToContinue(ModuleDescription const&);
0331
0332 template <bool IS_EVENT>
0333 bool setPassed() {
0334 if (IS_EVENT) {
0335 timesPassed_.fetch_add(1, std::memory_order_relaxed);
0336 }
0337 state_ = Pass;
0338 return true;
0339 }
0340
0341 template <bool IS_EVENT>
0342 bool setFailed() {
0343 if (IS_EVENT) {
0344 timesFailed_.fetch_add(1, std::memory_order_relaxed);
0345 }
0346 state_ = Fail;
0347 return false;
0348 }
0349
0350 template <bool IS_EVENT>
0351 std::exception_ptr setException(std::exception_ptr iException) {
0352 if (IS_EVENT) {
0353 timesExcept_.fetch_add(1, std::memory_order_relaxed);
0354 }
0355 cached_exception_ = iException;
0356 state_ = Exception;
0357 return cached_exception_;
0358 }
0359
0360 template <typename T>
0361 void prefetchAsync(WaitingTaskHolder,
0362 ServiceToken const&,
0363 ParentContext const&,
0364 typename T::TransitionInfoType const&,
0365 Transition) noexcept;
0366
0367 void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const&, Transition, ServiceToken const&) noexcept;
0368 void edPrefetchAsync(WaitingTaskHolder, ServiceToken const&, Principal const&) const noexcept;
0369
0370 bool needsESPrefetching(Transition iTrans) const noexcept {
0371 return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
0372 }
0373
0374 void emitPostModuleEventPrefetchingSignal() {
0375 actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0376 }
0377
0378 void emitPostModuleStreamPrefetchingSignal() {
0379 actReg_->postModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),
0380 moduleCallingContext_);
0381 }
0382
0383 void emitPostModuleGlobalPrefetchingSignal() {
0384 actReg_->postModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(),
0385 moduleCallingContext_);
0386 }
0387
0388 virtual bool hasAcquire() const noexcept = 0;
0389
0390 template <typename T>
0391 std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr,
0392 typename T::TransitionInfoType const&,
0393 StreamID,
0394 ParentContext const&,
0395 typename T::Context const*) noexcept;
0396
0397 void runAcquire(EventTransitionInfo const&, ParentContext const&, WaitingTaskWithArenaHolder&);
0398
0399 void runAcquireAfterAsyncPrefetch(std::exception_ptr,
0400 EventTransitionInfo const&,
0401 ParentContext const&,
0402 WaitingTaskWithArenaHolder) noexcept;
0403
0404 std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr,
0405 ParentContext const& parentContext) noexcept;
0406
0407 template <typename T>
0408 class RunModuleTask : public WaitingTask {
0409 public:
0410 RunModuleTask(Worker* worker,
0411 typename T::TransitionInfoType const& transitionInfo,
0412 ServiceToken const& token,
0413 StreamID streamID,
0414 ParentContext const& parentContext,
0415 typename T::Context const* context,
0416 oneapi::tbb::task_group* iGroup) noexcept
0417 : m_worker(worker),
0418 m_transitionInfo(transitionInfo),
0419 m_streamID(streamID),
0420 m_parentContext(parentContext),
0421 m_context(context),
0422 m_serviceToken(token),
0423 m_group(iGroup) {}
0424
0425 struct EnableQueueGuard {
0426 SerialTaskQueue* queue_;
0427 EnableQueueGuard(SerialTaskQueue* iQueue) : queue_{iQueue} {}
0428 EnableQueueGuard(EnableQueueGuard const&) = delete;
0429 EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
0430 EnableQueueGuard& operator=(EnableQueueGuard&&) = delete;
0431 EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
0432 ~EnableQueueGuard() {
0433 if (queue_) {
0434 queue_->resume();
0435 }
0436 }
0437 };
0438
0439 void execute() final {
0440
0441 ServiceRegistry::Operate guard(m_serviceToken.lock());
0442
0443
0444
0445 std::exception_ptr temp_excptr;
0446 auto excptr = exceptionPtr();
0447 if constexpr (T::isEvent_) {
0448 if (!m_worker->hasAcquire()) {
0449
0450 CMS_SA_ALLOW try {
0451
0452 m_worker->emitPostModuleEventPrefetchingSignal();
0453 } catch (...) {
0454 temp_excptr = std::current_exception();
0455 if (not excptr) {
0456 excptr = temp_excptr;
0457 }
0458 }
0459 }
0460 } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
0461 m_worker->emitPostModuleStreamPrefetchingSignal();
0462 } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
0463 m_worker->emitPostModuleGlobalPrefetchingSignal();
0464 }
0465
0466 if (not excptr) {
0467 if (auto queue = m_worker->serializeRunModule()) {
0468 auto f = [worker = m_worker,
0469 info = m_transitionInfo,
0470 streamID = m_streamID,
0471 parentContext = m_parentContext,
0472 sContext = m_context,
0473 serviceToken = m_serviceToken]() {
0474
0475 ServiceRegistry::Operate operateRunModule(serviceToken.lock());
0476
0477
0478
0479
0480 EnableQueueGuard enableQueueGuard{workerhelper::CallImpl<T>::enableGlobalQueue(worker)};
0481 std::exception_ptr ptr;
0482 worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
0483 };
0484
0485 auto gQueue = workerhelper::CallImpl<T>::pauseGlobalQueue(m_worker);
0486 if (gQueue) {
0487 gQueue->push(*m_group, [queue, gQueue, f, group = m_group]() mutable {
0488 gQueue->pause();
0489 queue.push(*group, std::move(f));
0490 });
0491 } else {
0492 queue.push(*m_group, std::move(f));
0493 }
0494 return;
0495 }
0496 }
0497
0498 m_worker->runModuleAfterAsyncPrefetch<T>(excptr, m_transitionInfo, m_streamID, m_parentContext, m_context);
0499 }
0500
0501 private:
0502 Worker* m_worker;
0503 typename T::TransitionInfoType m_transitionInfo;
0504 StreamID m_streamID;
0505 ParentContext const m_parentContext;
0506 typename T::Context const* m_context;
0507 ServiceWeakToken m_serviceToken;
0508 oneapi::tbb::task_group* m_group;
0509 };
0510
0511
0512
0513
0514
0515 template <typename T, typename DUMMY = void>
0516 class AcquireTask : public WaitingTask {
0517 public:
0518 AcquireTask(Worker*,
0519 typename T::TransitionInfoType const&,
0520 ServiceToken const&,
0521 ParentContext const&,
0522 WaitingTaskWithArenaHolder) noexcept {}
0523 void execute() final {}
0524 };
0525
0526 template <typename DUMMY>
0527 class AcquireTask<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>, DUMMY> : public WaitingTask {
0528 public:
0529 AcquireTask(Worker* worker,
0530 EventTransitionInfo const& eventTransitionInfo,
0531 ServiceToken const& token,
0532 ParentContext const& parentContext,
0533 WaitingTaskWithArenaHolder holder) noexcept
0534 : m_worker(worker),
0535 m_eventTransitionInfo(eventTransitionInfo),
0536 m_parentContext(parentContext),
0537 m_holder(std::move(holder)),
0538 m_serviceToken(token) {}
0539
0540 void execute() final {
0541
0542 ServiceRegistry::Operate guard(m_serviceToken.lock());
0543
0544
0545
0546 std::exception_ptr temp_excptr;
0547 auto excptr = exceptionPtr();
0548
0549 CMS_SA_ALLOW try {
0550
0551 m_worker->emitPostModuleEventPrefetchingSignal();
0552 } catch (...) {
0553 temp_excptr = std::current_exception();
0554 if (not excptr) {
0555 excptr = temp_excptr;
0556 }
0557 }
0558
0559 if (not excptr) {
0560 if (auto queue = m_worker->serializeRunModule()) {
0561 queue.push(*m_holder.group(),
0562 [worker = m_worker,
0563 info = m_eventTransitionInfo,
0564 parentContext = m_parentContext,
0565 serviceToken = m_serviceToken,
0566 holder = m_holder]() {
0567
0568 ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
0569
0570 std::exception_ptr ptr;
0571 worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
0572 });
0573 return;
0574 }
0575 }
0576
0577 m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
0578 }
0579
0580 private:
0581 Worker* m_worker;
0582 EventTransitionInfo m_eventTransitionInfo;
0583 ParentContext const m_parentContext;
0584 WaitingTaskWithArenaHolder m_holder;
0585 ServiceWeakToken m_serviceToken;
0586 };
0587
0588
0589
0590
0591 class HandleExternalWorkExceptionTask : public WaitingTask {
0592 public:
0593 HandleExternalWorkExceptionTask(Worker* worker,
0594 oneapi::tbb::task_group* group,
0595 WaitingTask* runModuleTask,
0596 ParentContext const& parentContext) noexcept;
0597
0598 void execute() final;
0599
0600 private:
0601 Worker* m_worker;
0602 WaitingTask* m_runModuleTask;
0603 oneapi::tbb::task_group* m_group;
0604 ParentContext const m_parentContext;
0605 };
0606
0607 std::atomic<int> timesRun_;
0608 std::atomic<int> timesVisited_;
0609 std::atomic<int> timesPassed_;
0610 std::atomic<int> timesFailed_;
0611 std::atomic<int> timesExcept_;
0612 std::atomic<State> state_;
0613 int numberOfPathsOn_;
0614 std::atomic<int> numberOfPathsLeftToRun_;
0615
0616 ModuleCallingContext moduleCallingContext_;
0617
0618 ExceptionToActionTable const* actions_;
0619 CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_;
0620
0621 std::shared_ptr<ActivityRegistry> actReg_;
0622
0623 edm::propagate_const<EarlyDeleteHelper*> earlyDeleteHelper_;
0624
0625 edm::WaitingTaskList waitingTasks_;
0626 std::atomic<bool> workStarted_;
0627 bool ranAcquireWithoutException_;
0628 bool moduleValid_ = true;
0629 bool shouldTryToContinue_ = false;
0630 bool beginSucceeded_ = false;
0631 };
0632
0633 namespace {
0634 template <typename T>
0635 class ModuleSignalSentry {
0636 public:
0637 ModuleSignalSentry(ActivityRegistry* a,
0638 typename T::Context const* context,
0639 ModuleCallingContext const* moduleCallingContext)
0640 : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {}
0641
0642 ~ModuleSignalSentry() {
0643
0644
0645
0646
0647 CMS_SA_ALLOW try {
0648 if (a_) {
0649 T::postModuleSignal(a_, context_, moduleCallingContext_);
0650 }
0651 } catch (...) {
0652 }
0653 }
0654 void preModuleSignal() {
0655 if (a_) {
0656 try {
0657 convertException::wrap([this]() { T::preModuleSignal(a_, context_, moduleCallingContext_); });
0658 } catch (cms::Exception& ex) {
0659 ex.addContext("Handling pre module signal, likely in a service function immediately before module method");
0660 throw;
0661 }
0662 }
0663 }
0664 void postModuleSignal() {
0665 if (a_) {
0666 auto temp = a_;
0667
0668
0669 a_ = nullptr;
0670 try {
0671 convertException::wrap([this, temp]() { T::postModuleSignal(temp, context_, moduleCallingContext_); });
0672 } catch (cms::Exception& ex) {
0673 ex.addContext("Handling post module signal, likely in a service function immediately after module method");
0674 throw;
0675 }
0676 }
0677 }
0678
0679 private:
0680 ActivityRegistry* a_;
0681 typename T::Context const* context_;
0682 ModuleCallingContext const* moduleCallingContext_;
0683 };
0684
0685 }
0686
0687 namespace workerhelper {
0688 template <>
0689 class CallImpl<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>> {
0690 public:
0691 typedef OccurrenceTraits<EventPrincipal, BranchActionStreamBegin> Arg;
0692 static bool call(Worker* iWorker,
0693 StreamID,
0694 EventTransitionInfo const& info,
0695 ActivityRegistry* ,
0696 ModuleCallingContext const* mcc,
0697 Arg::Context const* ) {
0698
0699 return iWorker->implDo(info, mcc);
0700 }
0701 static void esPrefetchAsync(Worker* worker,
0702 WaitingTaskHolder waitingTask,
0703 ServiceToken const& token,
0704 EventTransitionInfo const& info,
0705 Transition transition) noexcept {
0706 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0707 }
0708 static bool wantsTransition(Worker const* iWorker) noexcept { return true; }
0709 static bool needToRunSelection(Worker const* iWorker) noexcept { return iWorker->implNeedToRunSelection(); }
0710
0711 static SerialTaskQueue* pauseGlobalQueue(Worker*) noexcept { return nullptr; }
0712 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0713 };
0714
0715 template <>
0716 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>> {
0717 public:
0718 typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Arg;
0719 static bool call(Worker* iWorker,
0720 StreamID,
0721 RunTransitionInfo const& info,
0722 ActivityRegistry* actReg,
0723 ModuleCallingContext const* mcc,
0724 Arg::Context const* context) {
0725 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0726
0727
0728 cpp.preModuleSignal();
0729
0730
0731 auto returnValue = iWorker->implDoBegin(info, mcc);
0732
0733 cpp.postModuleSignal();
0734 iWorker->beginSucceeded_ = true;
0735 return returnValue;
0736 }
0737 static void esPrefetchAsync(Worker* worker,
0738 WaitingTaskHolder waitingTask,
0739 ServiceToken const& token,
0740 RunTransitionInfo const& info,
0741 Transition transition) noexcept {
0742 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0743 }
0744 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalRuns(); }
0745 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0746 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return iWorker->globalRunsQueue(); }
0747 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0748 };
0749 template <>
0750 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>> {
0751 public:
0752 typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Arg;
0753 static bool call(Worker* iWorker,
0754 StreamID id,
0755 RunTransitionInfo const& info,
0756 ActivityRegistry* actReg,
0757 ModuleCallingContext const* mcc,
0758 Arg::Context const* context) {
0759 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0760 cpp.preModuleSignal();
0761 auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
0762 cpp.postModuleSignal();
0763 iWorker->beginSucceeded_ = true;
0764 return returnValue;
0765 }
0766 static void esPrefetchAsync(Worker* worker,
0767 WaitingTaskHolder waitingTask,
0768 ServiceToken const& token,
0769 RunTransitionInfo const& info,
0770 Transition transition) noexcept {
0771 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0772 }
0773 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamRuns(); }
0774 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0775 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0776 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0777 };
0778 template <>
0779 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>> {
0780 public:
0781 typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Arg;
0782 static bool call(Worker* iWorker,
0783 StreamID,
0784 RunTransitionInfo const& info,
0785 ActivityRegistry* actReg,
0786 ModuleCallingContext const* mcc,
0787 Arg::Context const* context) {
0788 if (iWorker->beginSucceeded_) {
0789 iWorker->beginSucceeded_ = false;
0790
0791 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0792 cpp.preModuleSignal();
0793 auto returnValue = iWorker->implDoEnd(info, mcc);
0794 cpp.postModuleSignal();
0795 return returnValue;
0796 }
0797 return true;
0798 }
0799 static void esPrefetchAsync(Worker* worker,
0800 WaitingTaskHolder waitingTask,
0801 ServiceToken const& token,
0802 RunTransitionInfo const& info,
0803 Transition transition) noexcept {
0804 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0805 }
0806 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalRuns(); }
0807 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0808 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0809 static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) noexcept { return iWorker->globalRunsQueue(); }
0810 };
0811 template <>
0812 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>> {
0813 public:
0814 typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Arg;
0815 static bool call(Worker* iWorker,
0816 StreamID id,
0817 RunTransitionInfo const& info,
0818 ActivityRegistry* actReg,
0819 ModuleCallingContext const* mcc,
0820 Arg::Context const* context) {
0821 if (iWorker->beginSucceeded_) {
0822 iWorker->beginSucceeded_ = false;
0823
0824 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0825 cpp.preModuleSignal();
0826 auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
0827 cpp.postModuleSignal();
0828 return returnValue;
0829 }
0830 return true;
0831 }
0832 static void esPrefetchAsync(Worker* worker,
0833 WaitingTaskHolder waitingTask,
0834 ServiceToken const& token,
0835 RunTransitionInfo const& info,
0836 Transition transition) noexcept {
0837 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0838 }
0839 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamRuns(); }
0840 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0841 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0842 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0843 };
0844
0845 template <>
0846 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>> {
0847 public:
0848 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0849 static bool call(Worker* iWorker,
0850 StreamID,
0851 LumiTransitionInfo const& info,
0852 ActivityRegistry* actReg,
0853 ModuleCallingContext const* mcc,
0854 Arg::Context const* context) {
0855 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0856 cpp.preModuleSignal();
0857 auto returnValue = iWorker->implDoBegin(info, mcc);
0858 cpp.postModuleSignal();
0859 iWorker->beginSucceeded_ = true;
0860 return returnValue;
0861 }
0862 static void esPrefetchAsync(Worker* worker,
0863 WaitingTaskHolder waitingTask,
0864 ServiceToken const& token,
0865 LumiTransitionInfo const& info,
0866 Transition transition) noexcept {
0867 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0868 }
0869 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalLuminosityBlocks(); }
0870 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0871 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept {
0872 return iWorker->globalLuminosityBlocksQueue();
0873 }
0874 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0875 };
0876 template <>
0877 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>> {
0878 public:
0879 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0880 static bool call(Worker* iWorker,
0881 StreamID id,
0882 LumiTransitionInfo const& info,
0883 ActivityRegistry* actReg,
0884 ModuleCallingContext const* mcc,
0885 Arg::Context const* context) {
0886 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0887 cpp.preModuleSignal();
0888 auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
0889 cpp.postModuleSignal();
0890 iWorker->beginSucceeded_ = true;
0891 return returnValue;
0892 }
0893 static void esPrefetchAsync(Worker* worker,
0894 WaitingTaskHolder waitingTask,
0895 ServiceToken const& token,
0896 LumiTransitionInfo const& info,
0897 Transition transition) noexcept {
0898 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0899 }
0900 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamLuminosityBlocks(); }
0901 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0902 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0903 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0904 };
0905
0906 template <>
0907 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>> {
0908 public:
0909 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0910 static bool call(Worker* iWorker,
0911 StreamID,
0912 LumiTransitionInfo const& info,
0913 ActivityRegistry* actReg,
0914 ModuleCallingContext const* mcc,
0915 Arg::Context const* context) {
0916 if (iWorker->beginSucceeded_) {
0917 iWorker->beginSucceeded_ = false;
0918
0919 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0920 cpp.preModuleSignal();
0921 auto returnValue = iWorker->implDoEnd(info, mcc);
0922 cpp.postModuleSignal();
0923 return returnValue;
0924 }
0925 return true;
0926 }
0927 static void esPrefetchAsync(Worker* worker,
0928 WaitingTaskHolder waitingTask,
0929 ServiceToken const& token,
0930 LumiTransitionInfo const& info,
0931 Transition transition) noexcept {
0932 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0933 }
0934 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalLuminosityBlocks(); }
0935 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0936 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0937 static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) noexcept {
0938 return iWorker->globalLuminosityBlocksQueue();
0939 }
0940 };
0941 template <>
0942 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>> {
0943 public:
0944 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0945 static bool call(Worker* iWorker,
0946 StreamID id,
0947 LumiTransitionInfo const& info,
0948 ActivityRegistry* actReg,
0949 ModuleCallingContext const* mcc,
0950 Arg::Context const* context) {
0951 if (iWorker->beginSucceeded_) {
0952 iWorker->beginSucceeded_ = false;
0953
0954 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0955 cpp.preModuleSignal();
0956 auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
0957 cpp.postModuleSignal();
0958 return returnValue;
0959 }
0960 return true;
0961 }
0962 static void esPrefetchAsync(Worker* worker,
0963 WaitingTaskHolder waitingTask,
0964 ServiceToken const& token,
0965 LumiTransitionInfo const& info,
0966 Transition transition) noexcept {
0967 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0968 }
0969 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamLuminosityBlocks(); }
0970 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0971 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0972 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0973 };
0974 template <>
0975 class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>> {
0976 public:
0977 using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0978 static bool call(Worker* iWorker,
0979 StreamID,
0980 ProcessBlockTransitionInfo const& info,
0981 ActivityRegistry* actReg,
0982 ModuleCallingContext const* mcc,
0983 Arg::Context const* context) {
0984 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0985 cpp.preModuleSignal();
0986 auto returnValue = iWorker->implDoBeginProcessBlock(info.principal(), mcc);
0987 cpp.postModuleSignal();
0988 iWorker->beginSucceeded_ = true;
0989 return returnValue;
0990 }
0991 static void esPrefetchAsync(
0992 Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
0993 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsProcessBlocks(); }
0994 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0995 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0996 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0997 };
0998 template <>
0999 class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>> {
1000 public:
1001 using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
1002 static bool call(Worker* iWorker,
1003 StreamID,
1004 ProcessBlockTransitionInfo const& info,
1005 ActivityRegistry* actReg,
1006 ModuleCallingContext const* mcc,
1007 Arg::Context const* context) {
1008 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
1009 cpp.preModuleSignal();
1010 auto returnValue = iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
1011 cpp.postModuleSignal();
1012 return returnValue;
1013 }
1014 static void esPrefetchAsync(
1015 Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
1016 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsInputProcessBlocks(); }
1017 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
1018 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
1019 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
1020 };
1021 template <>
1022 class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>> {
1023 public:
1024 using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1025 static bool call(Worker* iWorker,
1026 StreamID,
1027 ProcessBlockTransitionInfo const& info,
1028 ActivityRegistry* actReg,
1029 ModuleCallingContext const* mcc,
1030 Arg::Context const* context) {
1031 if (iWorker->beginSucceeded_) {
1032 iWorker->beginSucceeded_ = false;
1033
1034 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
1035 cpp.preModuleSignal();
1036 auto returnValue = iWorker->implDoEndProcessBlock(info.principal(), mcc);
1037 cpp.postModuleSignal();
1038 return returnValue;
1039 }
1040 return true;
1041 }
1042 static void esPrefetchAsync(
1043 Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
1044 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsProcessBlocks(); }
1045 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
1046 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
1047 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
1048 };
1049 }
1050
1051 template <typename T>
1052 void Worker::prefetchAsync(WaitingTaskHolder iTask,
1053 ServiceToken const& token,
1054 ParentContext const& parentContext,
1055 typename T::TransitionInfoType const& transitionInfo,
1056 Transition iTransition) noexcept {
1057 Principal const& principal = transitionInfo.principal();
1058
1059 moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1060
1061 if constexpr (T::isEvent_) {
1062 actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1063 } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
1064 actReg_->preModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1065 } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
1066 actReg_->preModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(), moduleCallingContext_);
1067 }
1068
1069 workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
1070 edPrefetchAsync(iTask, token, principal);
1071
1072 if (principal.branchType() == InEvent) {
1073 preActionBeforeRunEventAsync(iTask, moduleCallingContext_, principal);
1074 }
1075 }
1076
1077 template <typename T>
1078 void Worker::doWorkAsync(WaitingTaskHolder task,
1079 typename T::TransitionInfoType const& transitionInfo,
1080 ServiceToken const& token,
1081 StreamID streamID,
1082 ParentContext const& parentContext,
1083 typename T::Context const* context) noexcept {
1084 if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1085 return;
1086 }
1087
1088
1089 bool expected = false;
1090 bool workStarted = workStarted_.compare_exchange_strong(expected, true);
1091
1092 waitingTasks_.add(task);
1093 if constexpr (T::isEvent_) {
1094 timesVisited_.fetch_add(1, std::memory_order_relaxed);
1095 }
1096
1097 if (workStarted) {
1098 moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1099
1100
1101 if (workerhelper::CallImpl<T>::needToRunSelection(this)) {
1102
1103
1104 WaitingTask* moduleTask =
1105 new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1106
1107
1108 struct DestroyTask {
1109 DestroyTask(edm::WaitingTask* iTask) noexcept : m_task(iTask) {}
1110
1111 ~DestroyTask() noexcept {
1112 auto p = m_task.exchange(nullptr);
1113 if (p) {
1114 TaskSentry s{p};
1115 }
1116 }
1117
1118 edm::WaitingTask* release() noexcept { return m_task.exchange(nullptr); }
1119
1120 private:
1121 std::atomic<edm::WaitingTask*> m_task;
1122 };
1123 if constexpr (T::isEvent_) {
1124 if (hasAcquire()) {
1125 auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1126 ServiceWeakToken weakToken = token;
1127 auto* group = task.group();
1128 moduleTask = make_waiting_task(
1129 [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
1130 WaitingTaskWithArenaHolder runTaskHolder(
1131 *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
1132 AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1133 t.execute();
1134 });
1135 }
1136 }
1137 auto* group = task.group();
1138 auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1139 ServiceWeakToken weakToken = token;
1140 auto selectionTask =
1141 make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
1142 std::exception_ptr const*) mutable {
1143 ServiceRegistry::Operate guard(weakToken.lock());
1144 prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
1145 weakToken.lock(),
1146 parentContext,
1147 info,
1148 T::transition_);
1149 });
1150 prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1151 } else {
1152 WaitingTask* moduleTask =
1153 new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1154 auto group = task.group();
1155 if constexpr (T::isEvent_) {
1156 if (hasAcquire()) {
1157 WaitingTaskWithArenaHolder runTaskHolder(
1158 *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1159 moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1160 }
1161 }
1162 prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1163 }
1164 }
1165 }
1166
1167 template <typename T>
1168 std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr iEPtr,
1169 typename T::TransitionInfoType const& transitionInfo,
1170 StreamID streamID,
1171 ParentContext const& parentContext,
1172 typename T::Context const* context) noexcept {
1173 std::exception_ptr exceptionPtr;
1174 bool shouldRun = true;
1175 if (iEPtr) {
1176 if (shouldRethrowException(iEPtr, parentContext, T::isEvent_, shouldTryToContinue_)) {
1177 exceptionPtr = iEPtr;
1178 setException<T::isEvent_>(exceptionPtr);
1179 shouldRun = false;
1180 } else {
1181 if (not shouldTryToContinue_) {
1182 setPassed<T::isEvent_>();
1183 shouldRun = false;
1184 }
1185 }
1186 }
1187 if (shouldRun) {
1188
1189 CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1190 exceptionPtr = std::current_exception();
1191 }
1192 } else {
1193 moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
1194 }
1195 waitingTasks_.doneWaiting(exceptionPtr);
1196 return exceptionPtr;
1197 }
1198
1199 template <typename T>
1200 void Worker::doWorkNoPrefetchingAsync(WaitingTaskHolder task,
1201 typename T::TransitionInfoType const& transitionInfo,
1202 ServiceToken const& serviceToken,
1203 StreamID streamID,
1204 ParentContext const& parentContext,
1205 typename T::Context const* context) noexcept {
1206 if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1207 return;
1208 }
1209
1210
1211 bool expected = false;
1212 auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1213
1214 waitingTasks_.add(task);
1215 if (workStarted) {
1216 ServiceWeakToken weakToken = serviceToken;
1217 auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1218 std::exception_ptr exceptionPtr;
1219
1220 CMS_SA_ALLOW try {
1221
1222 ServiceRegistry::Operate guard(weakToken.lock());
1223
1224 this->runModule<T>(info, streamID, parentContext, context);
1225 } catch (...) {
1226 exceptionPtr = std::current_exception();
1227 }
1228 this->waitingTasks_.doneWaiting(exceptionPtr);
1229 };
1230
1231 if (needsESPrefetching(T::transition_)) {
1232 auto group = task.group();
1233 auto afterPrefetch =
1234 edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1235 if (iExcept) {
1236 this->waitingTasks_.doneWaiting(*iExcept);
1237 } else {
1238 if (auto queue = this->serializeRunModule()) {
1239 queue.push(*group, toDo);
1240 } else {
1241 group->run(toDo);
1242 }
1243 }
1244 });
1245 moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1246 esPrefetchAsync(
1247 WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1248 } else {
1249 auto group = task.group();
1250 if (auto queue = this->serializeRunModule()) {
1251 queue.push(*group, toDo);
1252 } else {
1253 group->run(toDo);
1254 }
1255 }
1256 }
1257 }
1258
1259 template <typename T>
1260 bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1261 StreamID streamID,
1262 ParentContext const& parentContext,
1263 typename T::Context const* context) {
1264
1265
1266
1267
1268 ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1269 if constexpr (T::isEvent_) {
1270 timesRun_.fetch_add(1, std::memory_order_relaxed);
1271 }
1272
1273 bool rc = true;
1274 try {
1275 convertException::wrap([&]() {
1276 rc = workerhelper::CallImpl<T>::call(
1277 this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1278
1279 if (rc) {
1280 setPassed<T::isEvent_>();
1281 } else {
1282 setFailed<T::isEvent_>();
1283 }
1284 });
1285 } catch (cms::Exception& ex) {
1286 edm::exceptionContext(ex, moduleCallingContext_);
1287 if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, shouldTryToContinue_)) {
1288 assert(not cached_exception_);
1289 setException<T::isEvent_>(std::current_exception());
1290 std::rethrow_exception(cached_exception_);
1291 } else {
1292 rc = setPassed<T::isEvent_>();
1293 }
1294 }
1295
1296 return rc;
1297 }
1298
1299 template <typename T>
1300 std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1301 StreamID streamID,
1302 ParentContext const& parentContext,
1303 typename T::Context const* context) noexcept {
1304 timesVisited_.fetch_add(1, std::memory_order_relaxed);
1305 std::exception_ptr prefetchingException;
1306 return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1307 }
1308 }
1309 #endif