File indexing completed on 2022-06-03 00:58:58
0001 #ifndef FWCore_Framework_Worker_h
0002 #define FWCore_Framework_Worker_h
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0025 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0026 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0027 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0028 #include "FWCore/Framework/interface/maker/WorkerParams.h"
0029 #include "FWCore/Framework/interface/ExceptionActions.h"
0030 #include "FWCore/Framework/interface/ModuleContextSentry.h"
0031 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0032 #include "FWCore/Framework/interface/ProductResolverIndexAndSkipBit.h"
0033 #include "FWCore/Concurrency/interface/WaitingTask.h"
0034 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0035 #include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
0036 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0037 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0038 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0039 #include "FWCore/ServiceRegistry/interface/ConsumesInfo.h"
0040 #include "FWCore/ServiceRegistry/interface/InternalContext.h"
0041 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0042 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0043 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0044 #include "FWCore/ServiceRegistry/interface/PlaceInPathContext.h"
0045 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0046 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0047 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0048 #include "FWCore/Concurrency/interface/FunctorTask.h"
0049 #include "FWCore/Utilities/interface/Exception.h"
0050 #include "FWCore/Utilities/interface/ConvertException.h"
0051 #include "FWCore/Utilities/interface/BranchType.h"
0052 #include "FWCore/Utilities/interface/ProductResolverIndex.h"
0053 #include "FWCore/Utilities/interface/StreamID.h"
0054 #include "FWCore/Utilities/interface/propagate_const.h"
0055 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0056 #include "FWCore/Utilities/interface/ESIndices.h"
0057 #include "FWCore/Utilities/interface/Transition.h"
0058
0059 #include "FWCore/Framework/interface/Frameworkfwd.h"
0060
0061 #include <array>
0062 #include <atomic>
0063 #include <cassert>
0064 #include <map>
0065 #include <memory>
0066 #include <sstream>
0067 #include <string>
0068 #include <vector>
0069 #include <exception>
0070 #include <unordered_map>
0071
0072 namespace edm {
0073 class EventPrincipal;
0074 class EventSetupImpl;
0075 class EarlyDeleteHelper;
0076 class ModuleProcessName;
0077 class ProductResolverIndexHelper;
0078 class ProductResolverIndexAndSkipBit;
0079 class StreamID;
0080 class StreamContext;
0081 class ProductRegistry;
0082 class ThinnedAssociationsHelper;
0083
0084 namespace workerhelper {
0085 template <typename O>
0086 class CallImpl;
0087 }
0088 namespace eventsetup {
0089 class ESRecordsToProxyIndices;
0090 }
0091
0092 class Worker {
0093 public:
0094 enum State { Ready, Pass, Fail, Exception };
0095 enum Types { kAnalyzer, kFilter, kProducer, kOutputModule };
0096 enum ConcurrencyTypes { kGlobal, kLimited, kOne, kStream, kLegacy };
0097 struct TaskQueueAdaptor {
0098 SerialTaskQueueChain* serial_ = nullptr;
0099 LimitedTaskQueue* limited_ = nullptr;
0100
0101 TaskQueueAdaptor() = default;
0102 TaskQueueAdaptor(SerialTaskQueueChain* iChain) : serial_(iChain) {}
0103 TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
0104
0105 operator bool() { return serial_ != nullptr or limited_ != nullptr; }
0106
0107 template <class F>
0108 void push(oneapi::tbb::task_group& iG, F&& iF) {
0109 if (serial_) {
0110 serial_->push(iG, iF);
0111 } else {
0112 limited_->push(iG, iF);
0113 }
0114 }
0115 };
0116
0117 Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
0118 virtual ~Worker();
0119
0120 Worker(Worker const&) = delete;
0121 Worker& operator=(Worker const&) = delete;
0122
0123 void clearModule() {
0124 moduleValid_ = false;
0125 doClearModule();
0126 }
0127
0128 virtual bool wantsProcessBlocks() const = 0;
0129 virtual bool wantsInputProcessBlocks() const = 0;
0130 virtual bool wantsGlobalRuns() const = 0;
0131 virtual bool wantsGlobalLuminosityBlocks() const = 0;
0132 virtual bool wantsStreamRuns() const = 0;
0133 virtual bool wantsStreamLuminosityBlocks() const = 0;
0134
0135 virtual SerialTaskQueue* globalRunsQueue() = 0;
0136 virtual SerialTaskQueue* globalLuminosityBlocksQueue() = 0;
0137
0138 void prePrefetchSelectionAsync(
0139 oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, EventPrincipal const*);
0140
0141 void prePrefetchSelectionAsync(
0142 oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, void const*) {
0143 assert(false);
0144 }
0145
0146 template <typename T>
0147 void doWorkAsync(WaitingTaskHolder,
0148 typename T::TransitionInfoType const&,
0149 ServiceToken const&,
0150 StreamID,
0151 ParentContext const&,
0152 typename T::Context const*);
0153
0154 template <typename T>
0155 void doWorkNoPrefetchingAsync(WaitingTaskHolder,
0156 typename T::TransitionInfoType const&,
0157 ServiceToken const&,
0158 StreamID,
0159 ParentContext const&,
0160 typename T::Context const*);
0161
0162 template <typename T>
0163 std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
0164 StreamID,
0165 ParentContext const&,
0166 typename T::Context const*);
0167
0168 void callWhenDoneAsync(WaitingTaskHolder task) { waitingTasks_.add(std::move(task)); }
0169 void skipOnPath(EventPrincipal const& iEvent);
0170 void beginJob();
0171 void endJob();
0172 void beginStream(StreamID id, StreamContext& streamContext);
0173 void endStream(StreamID id, StreamContext& streamContext);
0174 void respondToOpenInputFile(FileBlock const& fb) { implRespondToOpenInputFile(fb); }
0175 void respondToCloseInputFile(FileBlock const& fb) { implRespondToCloseInputFile(fb); }
0176 void respondToCloseOutputFile() { implRespondToCloseOutputFile(); }
0177 void registerThinnedAssociations(ProductRegistry const& registry, ThinnedAssociationsHelper& helper);
0178
0179 void reset() {
0180 cached_exception_ = std::exception_ptr();
0181 state_ = Ready;
0182 waitingTasks_.reset();
0183 workStarted_ = false;
0184 numberOfPathsLeftToRun_ = numberOfPathsOn_;
0185 }
0186
0187 void postDoEvent(EventPrincipal const&);
0188
0189 ModuleDescription const* description() const {
0190 if (moduleValid_) {
0191 return moduleCallingContext_.moduleDescription();
0192 }
0193 return nullptr;
0194 }
0195
0196
0197 void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
0198
0199 void setEarlyDeleteHelper(EarlyDeleteHelper* iHelper);
0200
0201
0202 virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const&) = 0;
0203 virtual void updateLookup(eventsetup::ESRecordsToProxyIndices const&) = 0;
0204 virtual void selectInputProcessBlocks(ProductRegistry const&, ProcessBlockHelperBase const&) = 0;
0205 virtual void resolvePutIndicies(
0206 BranchType iBranchType,
0207 std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const&
0208 iIndicies) = 0;
0209
0210 virtual void modulesWhoseProductsAreConsumed(
0211 std::array<std::vector<ModuleDescription const*>*, NumBranchTypes>& modules,
0212 std::vector<ModuleProcessName>& modulesInPreviousProcesses,
0213 ProductRegistry const& preg,
0214 std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
0215
0216 virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
0217
0218 virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
0219
0220 virtual Types moduleType() const = 0;
0221 virtual ConcurrencyTypes moduleConcurrencyType() const = 0;
0222
0223 void clearCounters() {
0224 timesRun_.store(0, std::memory_order_release);
0225 timesVisited_.store(0, std::memory_order_release);
0226 timesPassed_.store(0, std::memory_order_release);
0227 timesFailed_.store(0, std::memory_order_release);
0228 timesExcept_.store(0, std::memory_order_release);
0229 }
0230
0231 void addedToPath() { ++numberOfPathsOn_; }
0232
0233 int timesRun() const { return timesRun_.load(std::memory_order_acquire); }
0234 int timesVisited() const { return timesVisited_.load(std::memory_order_acquire); }
0235 int timesPassed() const { return timesPassed_.load(std::memory_order_acquire); }
0236 int timesFailed() const { return timesFailed_.load(std::memory_order_acquire); }
0237 int timesExcept() const { return timesExcept_.load(std::memory_order_acquire); }
0238 State state() const { return state_; }
0239
0240 int timesPass() const { return timesPassed(); }
0241
0242 virtual bool hasAccumulator() const = 0;
0243
0244 protected:
0245 template <typename O>
0246 friend class workerhelper::CallImpl;
0247
0248 virtual void doClearModule() = 0;
0249
0250 virtual std::string workerType() const = 0;
0251 virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;
0252
0253 virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0254 virtual bool implNeedToRunSelection() const = 0;
0255
0256 virtual void implDoAcquire(EventTransitionInfo const&,
0257 ModuleCallingContext const*,
0258 WaitingTaskWithArenaHolder&) = 0;
0259
0260 virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const&, ModuleCallingContext const*) = 0;
0261 virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0262 virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0263 virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0264 virtual bool implDoBegin(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0265 virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0266 virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0267 virtual bool implDoEnd(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0268 virtual bool implDoBegin(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0269 virtual bool implDoStreamBegin(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0270 virtual bool implDoStreamEnd(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0271 virtual bool implDoEnd(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0272 virtual void implBeginJob() = 0;
0273 virtual void implEndJob() = 0;
0274 virtual void implBeginStream(StreamID) = 0;
0275 virtual void implEndStream(StreamID) = 0;
0276
0277 void resetModuleDescription(ModuleDescription const*);
0278
0279 ActivityRegistry* activityRegistry() { return actReg_.get(); }
0280
0281 private:
0282 template <typename T>
0283 bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
0284
0285 virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0286 virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0287
0288 virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
0289
0290 virtual std::vector<ESProxyIndex> const& esItemsToGetFrom(Transition) const = 0;
0291 virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
0292 virtual std::vector<ProductResolverIndex> const& itemsShouldPutInEvent() const = 0;
0293
0294 virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask,
0295 ModuleCallingContext const& moduleCallingContext,
0296 Principal const& iPrincipal) const = 0;
0297
0298 virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
0299 virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
0300 virtual void implRespondToCloseOutputFile() = 0;
0301
0302 virtual void implRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) = 0;
0303
0304 virtual TaskQueueAdaptor serializeRunModule() = 0;
0305
0306 bool shouldRethrowException(std::exception_ptr iPtr, ParentContext const& parentContext, bool isEvent) const;
0307
0308 template <bool IS_EVENT>
0309 bool setPassed() {
0310 if (IS_EVENT) {
0311 timesPassed_.fetch_add(1, std::memory_order_relaxed);
0312 }
0313 state_ = Pass;
0314 return true;
0315 }
0316
0317 template <bool IS_EVENT>
0318 bool setFailed() {
0319 if (IS_EVENT) {
0320 timesFailed_.fetch_add(1, std::memory_order_relaxed);
0321 }
0322 state_ = Fail;
0323 return false;
0324 }
0325
0326 template <bool IS_EVENT>
0327 std::exception_ptr setException(std::exception_ptr iException) {
0328 if (IS_EVENT) {
0329 timesExcept_.fetch_add(1, std::memory_order_relaxed);
0330 }
0331 cached_exception_ = iException;
0332 state_ = Exception;
0333 return cached_exception_;
0334 }
0335
0336 template <typename T>
0337 void prefetchAsync(WaitingTaskHolder,
0338 ServiceToken const&,
0339 ParentContext const&,
0340 typename T::TransitionInfoType const&,
0341 Transition);
0342
0343 void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const&, Transition, ServiceToken const&);
0344 void edPrefetchAsync(WaitingTaskHolder, ServiceToken const&, Principal const&) const;
0345
0346 bool needsESPrefetching(Transition iTrans) const noexcept {
0347 return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
0348 }
0349
0350 void emitPostModuleEventPrefetchingSignal() {
0351 actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0352 }
0353
0354 virtual bool hasAcquire() const = 0;
0355
0356 template <typename T>
0357 std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr const*,
0358 typename T::TransitionInfoType const&,
0359 StreamID,
0360 ParentContext const&,
0361 typename T::Context const*);
0362
0363 void runAcquire(EventTransitionInfo const&, ParentContext const&, WaitingTaskWithArenaHolder&);
0364
0365 void runAcquireAfterAsyncPrefetch(std::exception_ptr const*,
0366 EventTransitionInfo const&,
0367 ParentContext const&,
0368 WaitingTaskWithArenaHolder);
0369
0370 std::exception_ptr handleExternalWorkException(std::exception_ptr const* iEPtr, ParentContext const& parentContext);
0371
0372 template <typename T>
0373 class RunModuleTask : public WaitingTask {
0374 public:
0375 RunModuleTask(Worker* worker,
0376 typename T::TransitionInfoType const& transitionInfo,
0377 ServiceToken const& token,
0378 StreamID streamID,
0379 ParentContext const& parentContext,
0380 typename T::Context const* context,
0381 oneapi::tbb::task_group* iGroup)
0382 : m_worker(worker),
0383 m_transitionInfo(transitionInfo),
0384 m_streamID(streamID),
0385 m_parentContext(parentContext),
0386 m_context(context),
0387 m_serviceToken(token),
0388 m_group(iGroup) {}
0389
0390 struct EnableQueueGuard {
0391 SerialTaskQueue* queue_;
0392 EnableQueueGuard(SerialTaskQueue* iQueue) : queue_{iQueue} {}
0393 EnableQueueGuard(EnableQueueGuard const&) = delete;
0394 EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
0395 EnableQueueGuard& operator=(EnableQueueGuard&&) = delete;
0396 EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
0397 ~EnableQueueGuard() {
0398 if (queue_) {
0399 queue_->resume();
0400 }
0401 }
0402 };
0403
0404 void execute() final {
0405
0406 ServiceRegistry::Operate guard(m_serviceToken.lock());
0407
0408
0409
0410 std::exception_ptr temp_excptr;
0411 auto excptr = exceptionPtr();
0412 if constexpr (T::isEvent_) {
0413 if (!m_worker->hasAcquire()) {
0414
0415 CMS_SA_ALLOW try {
0416
0417 m_worker->emitPostModuleEventPrefetchingSignal();
0418 } catch (...) {
0419 temp_excptr = std::current_exception();
0420 if (not excptr) {
0421 excptr = &temp_excptr;
0422 }
0423 }
0424 }
0425 }
0426
0427 if (not excptr) {
0428 if (auto queue = m_worker->serializeRunModule()) {
0429 auto f = [worker = m_worker,
0430 info = m_transitionInfo,
0431 streamID = m_streamID,
0432 parentContext = m_parentContext,
0433 sContext = m_context,
0434 serviceToken = m_serviceToken]() {
0435
0436 ServiceRegistry::Operate operateRunModule(serviceToken.lock());
0437
0438
0439
0440
0441 EnableQueueGuard enableQueueGuard{workerhelper::CallImpl<T>::enableGlobalQueue(worker)};
0442 std::exception_ptr* ptr = nullptr;
0443 worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
0444 };
0445
0446 auto gQueue = workerhelper::CallImpl<T>::pauseGlobalQueue(m_worker);
0447 if (gQueue) {
0448 gQueue->push(*m_group, [queue, gQueue, f, group = m_group]() mutable {
0449 gQueue->pause();
0450 queue.push(*group, std::move(f));
0451 });
0452 } else {
0453 queue.push(*m_group, std::move(f));
0454 }
0455 return;
0456 }
0457 }
0458
0459 m_worker->runModuleAfterAsyncPrefetch<T>(excptr, m_transitionInfo, m_streamID, m_parentContext, m_context);
0460 }
0461
0462 private:
0463 Worker* m_worker;
0464 typename T::TransitionInfoType m_transitionInfo;
0465 StreamID m_streamID;
0466 ParentContext const m_parentContext;
0467 typename T::Context const* m_context;
0468 ServiceWeakToken m_serviceToken;
0469 oneapi::tbb::task_group* m_group;
0470 };
0471
0472
0473
0474
0475
0476 template <typename T, typename DUMMY = void>
0477 class AcquireTask : public WaitingTask {
0478 public:
0479 AcquireTask(Worker*,
0480 typename T::TransitionInfoType const&,
0481 ServiceToken const&,
0482 ParentContext const&,
0483 WaitingTaskWithArenaHolder) {}
0484 void execute() final {}
0485 };
0486
0487 template <typename DUMMY>
0488 class AcquireTask<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>, DUMMY> : public WaitingTask {
0489 public:
0490 AcquireTask(Worker* worker,
0491 EventTransitionInfo const& eventTransitionInfo,
0492 ServiceToken const& token,
0493 ParentContext const& parentContext,
0494 WaitingTaskWithArenaHolder holder)
0495 : m_worker(worker),
0496 m_eventTransitionInfo(eventTransitionInfo),
0497 m_parentContext(parentContext),
0498 m_holder(std::move(holder)),
0499 m_serviceToken(token) {}
0500
0501 void execute() final {
0502
0503 ServiceRegistry::Operate guard(m_serviceToken.lock());
0504
0505
0506
0507 std::exception_ptr temp_excptr;
0508 auto excptr = exceptionPtr();
0509
0510 CMS_SA_ALLOW try {
0511
0512 m_worker->emitPostModuleEventPrefetchingSignal();
0513 } catch (...) {
0514 temp_excptr = std::current_exception();
0515 if (not excptr) {
0516 excptr = &temp_excptr;
0517 }
0518 }
0519
0520 if (not excptr) {
0521 if (auto queue = m_worker->serializeRunModule()) {
0522 queue.push(*m_holder.group(),
0523 [worker = m_worker,
0524 info = m_eventTransitionInfo,
0525 parentContext = m_parentContext,
0526 serviceToken = m_serviceToken,
0527 holder = m_holder]() {
0528
0529 ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
0530
0531 std::exception_ptr* ptr = nullptr;
0532 worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
0533 });
0534 return;
0535 }
0536 }
0537
0538 m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
0539 }
0540
0541 private:
0542 Worker* m_worker;
0543 EventTransitionInfo m_eventTransitionInfo;
0544 ParentContext const m_parentContext;
0545 WaitingTaskWithArenaHolder m_holder;
0546 ServiceWeakToken m_serviceToken;
0547 };
0548
0549
0550
0551
0552 class HandleExternalWorkExceptionTask : public WaitingTask {
0553 public:
0554 HandleExternalWorkExceptionTask(Worker* worker,
0555 oneapi::tbb::task_group* group,
0556 WaitingTask* runModuleTask,
0557 ParentContext const& parentContext);
0558
0559 void execute() final;
0560
0561 private:
0562 Worker* m_worker;
0563 WaitingTask* m_runModuleTask;
0564 oneapi::tbb::task_group* m_group;
0565 ParentContext const m_parentContext;
0566 };
0567
0568 std::atomic<int> timesRun_;
0569 std::atomic<int> timesVisited_;
0570 std::atomic<int> timesPassed_;
0571 std::atomic<int> timesFailed_;
0572 std::atomic<int> timesExcept_;
0573 std::atomic<State> state_;
0574 int numberOfPathsOn_;
0575 std::atomic<int> numberOfPathsLeftToRun_;
0576
0577 ModuleCallingContext moduleCallingContext_;
0578
0579 ExceptionToActionTable const* actions_;
0580 CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_;
0581
0582 std::shared_ptr<ActivityRegistry> actReg_;
0583
0584 edm::propagate_const<EarlyDeleteHelper*> earlyDeleteHelper_;
0585
0586 edm::WaitingTaskList waitingTasks_;
0587 std::atomic<bool> workStarted_;
0588 bool ranAcquireWithoutException_;
0589 bool moduleValid_ = true;
0590 };
0591
0592 namespace {
0593 template <typename T>
0594 class ModuleSignalSentry {
0595 public:
0596 ModuleSignalSentry(ActivityRegistry* a,
0597 typename T::Context const* context,
0598 ModuleCallingContext const* moduleCallingContext)
0599 : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {
0600 if (a_)
0601 T::preModuleSignal(a_, context, moduleCallingContext_);
0602 }
0603
0604 ~ModuleSignalSentry() {
0605 if (a_)
0606 T::postModuleSignal(a_, context_, moduleCallingContext_);
0607 }
0608
0609 private:
0610 ActivityRegistry* a_;
0611 typename T::Context const* context_;
0612 ModuleCallingContext const* moduleCallingContext_;
0613 };
0614
0615 }
0616
0617 namespace workerhelper {
0618 template <>
0619 class CallImpl<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>> {
0620 public:
0621 typedef OccurrenceTraits<EventPrincipal, BranchActionStreamBegin> Arg;
0622 static bool call(Worker* iWorker,
0623 StreamID,
0624 EventTransitionInfo const& info,
0625 ActivityRegistry* ,
0626 ModuleCallingContext const* mcc,
0627 Arg::Context const* ) {
0628
0629 return iWorker->implDo(info, mcc);
0630 }
0631 static void esPrefetchAsync(Worker* worker,
0632 WaitingTaskHolder waitingTask,
0633 ServiceToken const& token,
0634 EventTransitionInfo const& info,
0635 Transition transition) {
0636 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0637 }
0638 static bool wantsTransition(Worker const* iWorker) { return true; }
0639 static bool needToRunSelection(Worker const* iWorker) { return iWorker->implNeedToRunSelection(); }
0640
0641 static SerialTaskQueue* pauseGlobalQueue(Worker*) { return nullptr; }
0642 static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0643 };
0644
0645 template <>
0646 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>> {
0647 public:
0648 typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Arg;
0649 static bool call(Worker* iWorker,
0650 StreamID,
0651 RunTransitionInfo const& info,
0652 ActivityRegistry* actReg,
0653 ModuleCallingContext const* mcc,
0654 Arg::Context const* context) {
0655 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0656 return iWorker->implDoBegin(info, mcc);
0657 }
0658 static void esPrefetchAsync(Worker* worker,
0659 WaitingTaskHolder waitingTask,
0660 ServiceToken const& token,
0661 RunTransitionInfo const& info,
0662 Transition transition) {
0663 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0664 }
0665 static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
0666 static bool needToRunSelection(Worker const* iWorker) { return false; }
0667 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
0668 static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0669 };
0670 template <>
0671 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>> {
0672 public:
0673 typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Arg;
0674 static bool call(Worker* iWorker,
0675 StreamID id,
0676 RunTransitionInfo const& info,
0677 ActivityRegistry* actReg,
0678 ModuleCallingContext const* mcc,
0679 Arg::Context const* context) {
0680 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0681 return iWorker->implDoStreamBegin(id, info, mcc);
0682 }
0683 static void esPrefetchAsync(Worker* worker,
0684 WaitingTaskHolder waitingTask,
0685 ServiceToken const& token,
0686 RunTransitionInfo const& info,
0687 Transition transition) {
0688 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0689 }
0690 static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
0691 static bool needToRunSelection(Worker const* iWorker) { return false; }
0692 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0693 static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0694 };
0695 template <>
0696 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>> {
0697 public:
0698 typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Arg;
0699 static bool call(Worker* iWorker,
0700 StreamID,
0701 RunTransitionInfo const& info,
0702 ActivityRegistry* actReg,
0703 ModuleCallingContext const* mcc,
0704 Arg::Context const* context) {
0705 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0706 return iWorker->implDoEnd(info, mcc);
0707 }
0708 static void esPrefetchAsync(Worker* worker,
0709 WaitingTaskHolder waitingTask,
0710 ServiceToken const& token,
0711 RunTransitionInfo const& info,
0712 Transition transition) {
0713 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0714 }
0715 static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalRuns(); }
0716 static bool needToRunSelection(Worker const* iWorker) { return false; }
0717 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0718 static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalRunsQueue(); }
0719 };
0720 template <>
0721 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>> {
0722 public:
0723 typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Arg;
0724 static bool call(Worker* iWorker,
0725 StreamID id,
0726 RunTransitionInfo const& info,
0727 ActivityRegistry* actReg,
0728 ModuleCallingContext const* mcc,
0729 Arg::Context const* context) {
0730 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0731 return iWorker->implDoStreamEnd(id, info, mcc);
0732 }
0733 static void esPrefetchAsync(Worker* worker,
0734 WaitingTaskHolder waitingTask,
0735 ServiceToken const& token,
0736 RunTransitionInfo const& info,
0737 Transition transition) {
0738 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0739 }
0740 static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamRuns(); }
0741 static bool needToRunSelection(Worker const* iWorker) { return false; }
0742 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0743 static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0744 };
0745
0746 template <>
0747 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>> {
0748 public:
0749 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0750 static bool call(Worker* iWorker,
0751 StreamID,
0752 LumiTransitionInfo const& info,
0753 ActivityRegistry* actReg,
0754 ModuleCallingContext const* mcc,
0755 Arg::Context const* context) {
0756 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0757 return iWorker->implDoBegin(info, mcc);
0758 }
0759 static void esPrefetchAsync(Worker* worker,
0760 WaitingTaskHolder waitingTask,
0761 ServiceToken const& token,
0762 LumiTransitionInfo const& info,
0763 Transition transition) {
0764 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0765 }
0766 static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
0767 static bool needToRunSelection(Worker const* iWorker) { return false; }
0768 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
0769 static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0770 };
0771 template <>
0772 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>> {
0773 public:
0774 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0775 static bool call(Worker* iWorker,
0776 StreamID id,
0777 LumiTransitionInfo const& info,
0778 ActivityRegistry* actReg,
0779 ModuleCallingContext const* mcc,
0780 Arg::Context const* context) {
0781 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0782 return iWorker->implDoStreamBegin(id, info, mcc);
0783 }
0784 static void esPrefetchAsync(Worker* worker,
0785 WaitingTaskHolder waitingTask,
0786 ServiceToken const& token,
0787 LumiTransitionInfo const& info,
0788 Transition transition) {
0789 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0790 }
0791 static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
0792 static bool needToRunSelection(Worker const* iWorker) { return false; }
0793 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0794 static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0795 };
0796
0797 template <>
0798 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>> {
0799 public:
0800 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0801 static bool call(Worker* iWorker,
0802 StreamID,
0803 LumiTransitionInfo const& info,
0804 ActivityRegistry* actReg,
0805 ModuleCallingContext const* mcc,
0806 Arg::Context const* context) {
0807 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0808 return iWorker->implDoEnd(info, mcc);
0809 }
0810 static void esPrefetchAsync(Worker* worker,
0811 WaitingTaskHolder waitingTask,
0812 ServiceToken const& token,
0813 LumiTransitionInfo const& info,
0814 Transition transition) {
0815 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0816 }
0817 static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsGlobalLuminosityBlocks(); }
0818 static bool needToRunSelection(Worker const* iWorker) { return false; }
0819 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0820 static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) { return iWorker->globalLuminosityBlocksQueue(); }
0821 };
0822 template <>
0823 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>> {
0824 public:
0825 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0826 static bool call(Worker* iWorker,
0827 StreamID id,
0828 LumiTransitionInfo const& info,
0829 ActivityRegistry* actReg,
0830 ModuleCallingContext const* mcc,
0831 Arg::Context const* context) {
0832 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0833 return iWorker->implDoStreamEnd(id, info, mcc);
0834 }
0835 static void esPrefetchAsync(Worker* worker,
0836 WaitingTaskHolder waitingTask,
0837 ServiceToken const& token,
0838 LumiTransitionInfo const& info,
0839 Transition transition) {
0840 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0841 }
0842 static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsStreamLuminosityBlocks(); }
0843 static bool needToRunSelection(Worker const* iWorker) { return false; }
0844 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0845 static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0846 };
0847 template <>
0848 class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>> {
0849 public:
0850 using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0851 static bool call(Worker* iWorker,
0852 StreamID,
0853 ProcessBlockTransitionInfo const& info,
0854 ActivityRegistry* actReg,
0855 ModuleCallingContext const* mcc,
0856 Arg::Context const* context) {
0857 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0858 return iWorker->implDoBeginProcessBlock(info.principal(), mcc);
0859 }
0860 static void esPrefetchAsync(
0861 Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) {}
0862 static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
0863 static bool needToRunSelection(Worker const* iWorker) { return false; }
0864 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0865 static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0866 };
0867 template <>
0868 class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>> {
0869 public:
0870 using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
0871 static bool call(Worker* iWorker,
0872 StreamID,
0873 ProcessBlockTransitionInfo const& info,
0874 ActivityRegistry* actReg,
0875 ModuleCallingContext const* mcc,
0876 Arg::Context const* context) {
0877 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0878 return iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
0879 }
0880 static void esPrefetchAsync(
0881 Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) {}
0882 static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsInputProcessBlocks(); }
0883 static bool needToRunSelection(Worker const* iWorker) { return false; }
0884 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0885 static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0886 };
0887 template <>
0888 class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>> {
0889 public:
0890 using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
0891 static bool call(Worker* iWorker,
0892 StreamID,
0893 ProcessBlockTransitionInfo const& info,
0894 ActivityRegistry* actReg,
0895 ModuleCallingContext const* mcc,
0896 Arg::Context const* context) {
0897 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0898 return iWorker->implDoEndProcessBlock(info.principal(), mcc);
0899 }
0900 static void esPrefetchAsync(
0901 Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) {}
0902 static bool wantsTransition(Worker const* iWorker) { return iWorker->wantsProcessBlocks(); }
0903 static bool needToRunSelection(Worker const* iWorker) { return false; }
0904 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) { return nullptr; }
0905 static SerialTaskQueue* enableGlobalQueue(Worker*) { return nullptr; }
0906 };
0907 }
0908
0909 template <typename T>
0910 void Worker::prefetchAsync(WaitingTaskHolder iTask,
0911 ServiceToken const& token,
0912 ParentContext const& parentContext,
0913 typename T::TransitionInfoType const& transitionInfo,
0914 Transition iTransition) {
0915 Principal const& principal = transitionInfo.principal();
0916
0917 moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
0918
0919 if (principal.branchType() == InEvent) {
0920 actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0921 }
0922
0923 workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
0924 edPrefetchAsync(iTask, token, principal);
0925
0926 if (principal.branchType() == InEvent) {
0927 preActionBeforeRunEventAsync(iTask, moduleCallingContext_, principal);
0928 }
0929 }
0930
0931 template <typename T>
0932 void Worker::doWorkAsync(WaitingTaskHolder task,
0933 typename T::TransitionInfoType const& transitionInfo,
0934 ServiceToken const& token,
0935 StreamID streamID,
0936 ParentContext const& parentContext,
0937 typename T::Context const* context) {
0938 if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
0939 return;
0940 }
0941
0942
0943 bool expected = false;
0944 bool workStarted = workStarted_.compare_exchange_strong(expected, true);
0945
0946 waitingTasks_.add(task);
0947 if constexpr (T::isEvent_) {
0948 timesVisited_.fetch_add(1, std::memory_order_relaxed);
0949 }
0950
0951 if (workStarted) {
0952 moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
0953
0954
0955 if (workerhelper::CallImpl<T>::needToRunSelection(this)) {
0956
0957
0958 WaitingTask* moduleTask =
0959 new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
0960
0961
0962 struct DestroyTask {
0963 DestroyTask(edm::WaitingTask* iTask) : m_task(iTask) {}
0964
0965 ~DestroyTask() {
0966 auto p = m_task.exchange(nullptr);
0967 if (p) {
0968 TaskSentry s{p};
0969 }
0970 }
0971
0972 edm::WaitingTask* release() { return m_task.exchange(nullptr); }
0973
0974 private:
0975 std::atomic<edm::WaitingTask*> m_task;
0976 };
0977 if constexpr (T::isEvent_) {
0978 if (hasAcquire()) {
0979 auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
0980 ServiceWeakToken weakToken = token;
0981 auto* group = task.group();
0982 moduleTask = make_waiting_task(
0983 [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
0984 WaitingTaskWithArenaHolder runTaskHolder(
0985 *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
0986 AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
0987 t.execute();
0988 });
0989 }
0990 }
0991 auto* group = task.group();
0992 auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
0993 ServiceWeakToken weakToken = token;
0994 auto selectionTask =
0995 make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
0996 std::exception_ptr const*) mutable {
0997 ServiceRegistry::Operate guard(weakToken.lock());
0998 prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
0999 weakToken.lock(),
1000 parentContext,
1001 info,
1002 T::transition_);
1003 });
1004 prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1005 } else {
1006 WaitingTask* moduleTask =
1007 new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1008 auto group = task.group();
1009 if constexpr (T::isEvent_) {
1010 if (hasAcquire()) {
1011 WaitingTaskWithArenaHolder runTaskHolder(
1012 *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1013 moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1014 }
1015 }
1016 prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1017 }
1018 }
1019 }
1020
1021 template <typename T>
1022 std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
1023 typename T::TransitionInfoType const& transitionInfo,
1024 StreamID streamID,
1025 ParentContext const& parentContext,
1026 typename T::Context const* context) {
1027 std::exception_ptr exceptionPtr;
1028 if (iEPtr) {
1029 assert(*iEPtr);
1030 if (shouldRethrowException(*iEPtr, parentContext, T::isEvent_)) {
1031 exceptionPtr = *iEPtr;
1032 setException<T::isEvent_>(exceptionPtr);
1033 } else {
1034 setPassed<T::isEvent_>();
1035 }
1036 moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
1037 } else {
1038
1039 CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1040 exceptionPtr = std::current_exception();
1041 }
1042 }
1043 waitingTasks_.doneWaiting(exceptionPtr);
1044 return exceptionPtr;
1045 }
1046
1047 template <typename T>
1048 void Worker::doWorkNoPrefetchingAsync(WaitingTaskHolder task,
1049 typename T::TransitionInfoType const& transitionInfo,
1050 ServiceToken const& serviceToken,
1051 StreamID streamID,
1052 ParentContext const& parentContext,
1053 typename T::Context const* context) {
1054 if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1055 return;
1056 }
1057
1058
1059 bool expected = false;
1060 auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1061
1062 waitingTasks_.add(task);
1063 if (workStarted) {
1064 ServiceWeakToken weakToken = serviceToken;
1065 auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1066 std::exception_ptr exceptionPtr;
1067
1068 CMS_SA_ALLOW try {
1069
1070 ServiceRegistry::Operate guard(weakToken.lock());
1071
1072 this->runModule<T>(info, streamID, parentContext, context);
1073 } catch (...) {
1074 exceptionPtr = std::current_exception();
1075 }
1076 this->waitingTasks_.doneWaiting(exceptionPtr);
1077 };
1078
1079 if (needsESPrefetching(T::transition_)) {
1080 auto group = task.group();
1081 auto afterPrefetch =
1082 edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1083 if (iExcept) {
1084 this->waitingTasks_.doneWaiting(*iExcept);
1085 } else {
1086 if (auto queue = this->serializeRunModule()) {
1087 queue.push(*group, toDo);
1088 } else {
1089 group->run(toDo);
1090 }
1091 }
1092 });
1093 esPrefetchAsync(
1094 WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1095 } else {
1096 auto group = task.group();
1097 if (auto queue = this->serializeRunModule()) {
1098 queue.push(*group, toDo);
1099 } else {
1100 group->run(toDo);
1101 }
1102 }
1103 }
1104 }
1105
1106 template <typename T>
1107 bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1108 StreamID streamID,
1109 ParentContext const& parentContext,
1110 typename T::Context const* context) {
1111
1112
1113
1114
1115 ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1116 if constexpr (T::isEvent_) {
1117 timesRun_.fetch_add(1, std::memory_order_relaxed);
1118 }
1119
1120 bool rc = true;
1121 try {
1122 convertException::wrap([&]() {
1123 rc = workerhelper::CallImpl<T>::call(
1124 this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1125
1126 if (rc) {
1127 setPassed<T::isEvent_>();
1128 } else {
1129 setFailed<T::isEvent_>();
1130 }
1131 });
1132 } catch (cms::Exception& ex) {
1133 edm::exceptionContext(ex, moduleCallingContext_);
1134 if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_)) {
1135 assert(not cached_exception_);
1136 setException<T::isEvent_>(std::current_exception());
1137 std::rethrow_exception(cached_exception_);
1138 } else {
1139 rc = setPassed<T::isEvent_>();
1140 }
1141 }
1142
1143 return rc;
1144 }
1145
1146 template <typename T>
1147 std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1148 StreamID streamID,
1149 ParentContext const& parentContext,
1150 typename T::Context const* context) {
1151 timesVisited_.fetch_add(1, std::memory_order_relaxed);
1152 std::exception_ptr const* prefetchingException = nullptr;
1153 return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1154 }
1155 }
1156 #endif