File indexing completed on 2024-06-04 04:34:52
0001 #ifndef FWCore_Framework_Worker_h
0002 #define FWCore_Framework_Worker_h
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0025 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0026 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0027 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0028 #include "FWCore/Framework/interface/maker/WorkerParams.h"
0029 #include "FWCore/Framework/interface/ExceptionActions.h"
0030 #include "FWCore/Framework/interface/ModuleContextSentry.h"
0031 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0032 #include "FWCore/Framework/interface/ProductResolverIndexAndSkipBit.h"
0033 #include "FWCore/Concurrency/interface/WaitingTask.h"
0034 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0035 #include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
0036 #include "FWCore/Concurrency/interface/WaitingTaskList.h"
0037 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0038 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0039 #include "FWCore/ServiceRegistry/interface/ConsumesInfo.h"
0040 #include "FWCore/ServiceRegistry/interface/InternalContext.h"
0041 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0042 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0043 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0044 #include "FWCore/ServiceRegistry/interface/PlaceInPathContext.h"
0045 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0046 #include "FWCore/Concurrency/interface/SerialTaskQueueChain.h"
0047 #include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
0048 #include "FWCore/Concurrency/interface/FunctorTask.h"
0049 #include "FWCore/Utilities/interface/Exception.h"
0050 #include "FWCore/Utilities/interface/ConvertException.h"
0051 #include "FWCore/Utilities/interface/BranchType.h"
0052 #include "FWCore/Utilities/interface/ProductResolverIndex.h"
0053 #include "FWCore/Utilities/interface/StreamID.h"
0054 #include "FWCore/Utilities/interface/propagate_const.h"
0055 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0056 #include "FWCore/Utilities/interface/ESIndices.h"
0057 #include "FWCore/Utilities/interface/Transition.h"
0058
0059 #include "FWCore/Framework/interface/Frameworkfwd.h"
0060
#include <array>
#include <atomic>
#include <cassert>
#include <exception>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
0071
0072 namespace edm {
0073 class EventPrincipal;
0074 class EventSetupImpl;
0075 class EarlyDeleteHelper;
0076 class ModuleProcessName;
0077 class ProductResolverIndexHelper;
0078 class ProductResolverIndexAndSkipBit;
0079 class StreamID;
0080 class StreamContext;
0081 class ProductRegistry;
0082 class ThinnedAssociationsHelper;
0083
0084 namespace workerhelper {
0085 template <typename O>
0086 class CallImpl;
0087 }
0088 namespace eventsetup {
0089 class ESRecordsToProductResolverIndices;
0090 }
0091
0092 class Worker {
0093 public:
// Outcome of the most recent call: reset() stores Ready, while setPassed(),
// setFailed() and setException() store Pass, Fail and Exception respectively.
enum State { Ready, Pass, Fail, Exception };
// Kind of module wrapped by the worker; returned by moduleType().
enum Types { kAnalyzer, kFilter, kProducer, kOutputModule };
// Concurrency flavour of the wrapped module; returned by moduleConcurrencyType().
enum ConcurrencyTypes { kGlobal, kLimited, kOne, kStream };
0097 struct TaskQueueAdaptor {
0098 SerialTaskQueueChain* serial_ = nullptr;
0099 LimitedTaskQueue* limited_ = nullptr;
0100
0101 TaskQueueAdaptor() = default;
0102 TaskQueueAdaptor(SerialTaskQueueChain* iChain) : serial_(iChain) {}
0103 TaskQueueAdaptor(LimitedTaskQueue* iLimited) : limited_(iLimited) {}
0104
0105 operator bool() { return serial_ != nullptr or limited_ != nullptr; }
0106
0107 template <class F>
0108 void push(oneapi::tbb::task_group& iG, F&& iF) {
0109 if (serial_) {
0110 serial_->push(iG, iF);
0111 } else {
0112 limited_->push(iG, iF);
0113 }
0114 }
0115 };
0116
0117 Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions);
0118 virtual ~Worker();
0119
0120 Worker(Worker const&) = delete;
0121 Worker& operator=(Worker const&) = delete;
0122
0123 void clearModule() {
0124 moduleValid_ = false;
0125 doClearModule();
0126 }
0127
0128 virtual bool wantsProcessBlocks() const noexcept = 0;
0129 virtual bool wantsInputProcessBlocks() const noexcept = 0;
0130 virtual bool wantsGlobalRuns() const noexcept = 0;
0131 virtual bool wantsGlobalLuminosityBlocks() const noexcept = 0;
0132 virtual bool wantsStreamRuns() const noexcept = 0;
0133 virtual bool wantsStreamLuminosityBlocks() const noexcept = 0;
0134
0135 virtual SerialTaskQueue* globalRunsQueue() = 0;
0136 virtual SerialTaskQueue* globalLuminosityBlocksQueue() = 0;
0137
0138 void prePrefetchSelectionAsync(oneapi::tbb::task_group&,
0139 WaitingTask* task,
0140 ServiceToken const&,
0141 StreamID stream,
0142 EventPrincipal const*) noexcept;
0143
0144 void prePrefetchSelectionAsync(
0145 oneapi::tbb::task_group&, WaitingTask* task, ServiceToken const&, StreamID stream, void const*) noexcept {
0146 assert(false);
0147 }
0148
0149 template <typename T>
0150 void doWorkAsync(WaitingTaskHolder,
0151 typename T::TransitionInfoType const&,
0152 ServiceToken const&,
0153 StreamID,
0154 ParentContext const&,
0155 typename T::Context const*) noexcept;
0156
0157 template <typename T>
0158 void doWorkNoPrefetchingAsync(WaitingTaskHolder,
0159 typename T::TransitionInfoType const&,
0160 ServiceToken const&,
0161 StreamID,
0162 ParentContext const&,
0163 typename T::Context const*) noexcept;
0164
0165 template <typename T>
0166 std::exception_ptr runModuleDirectly(typename T::TransitionInfoType const&,
0167 StreamID,
0168 ParentContext const&,
0169 typename T::Context const*) noexcept;
0170
0171 virtual size_t transformIndex(edm::BranchDescription const&) const noexcept = 0;
0172 void doTransformAsync(WaitingTaskHolder,
0173 size_t iTransformIndex,
0174 EventPrincipal const&,
0175 ServiceToken const&,
0176 StreamID,
0177 ModuleCallingContext const&,
0178 StreamContext const*) noexcept;
0179
0180 void callWhenDoneAsync(WaitingTaskHolder task) { waitingTasks_.add(std::move(task)); }
0181 void skipOnPath(EventPrincipal const& iEvent);
0182 void beginJob();
0183 void endJob();
0184 void beginStream(StreamID id, StreamContext& streamContext);
0185 void endStream(StreamID id, StreamContext& streamContext);
0186 void respondToOpenInputFile(FileBlock const& fb) { implRespondToOpenInputFile(fb); }
0187 void respondToCloseInputFile(FileBlock const& fb) { implRespondToCloseInputFile(fb); }
0188 void respondToCloseOutputFile() { implRespondToCloseOutputFile(); }
0189 void registerThinnedAssociations(ProductRegistry const& registry, ThinnedAssociationsHelper& helper);
0190
0191 void reset() {
0192 cached_exception_ = std::exception_ptr();
0193 state_ = Ready;
0194 waitingTasks_.reset();
0195 workStarted_ = false;
0196 numberOfPathsLeftToRun_ = numberOfPathsOn_;
0197 }
0198
0199 void postDoEvent(EventPrincipal const&);
0200
0201 ModuleDescription const* description() const noexcept {
0202 if (moduleValid_) {
0203 return moduleCallingContext_.moduleDescription();
0204 }
0205 return nullptr;
0206 }
0207
0208
0209 void setActivityRegistry(std::shared_ptr<ActivityRegistry> areg);
0210
0211 void setEarlyDeleteHelper(EarlyDeleteHelper* iHelper);
0212
0213
0214 virtual void updateLookup(BranchType iBranchType, ProductResolverIndexHelper const&) = 0;
0215 virtual void updateLookup(eventsetup::ESRecordsToProductResolverIndices const&) = 0;
0216 virtual void selectInputProcessBlocks(ProductRegistry const&, ProcessBlockHelperBase const&) = 0;
0217 virtual void resolvePutIndicies(
0218 BranchType iBranchType,
0219 std::unordered_multimap<std::string, std::tuple<TypeID const*, const char*, edm::ProductResolverIndex>> const&
0220 iIndicies) = 0;
0221
0222 virtual void modulesWhoseProductsAreConsumed(
0223 std::array<std::vector<ModuleDescription const*>*, NumBranchTypes>& modules,
0224 std::vector<ModuleProcessName>& modulesInPreviousProcesses,
0225 ProductRegistry const& preg,
0226 std::map<std::string, ModuleDescription const*> const& labelsToDesc) const = 0;
0227
0228 virtual void convertCurrentProcessAlias(std::string const& processName) = 0;
0229
0230 virtual std::vector<ConsumesInfo> consumesInfo() const = 0;
0231
0232 virtual Types moduleType() const = 0;
0233 virtual ConcurrencyTypes moduleConcurrencyType() const = 0;
0234
0235 void clearCounters() noexcept {
0236 timesRun_.store(0, std::memory_order_release);
0237 timesVisited_.store(0, std::memory_order_release);
0238 timesPassed_.store(0, std::memory_order_release);
0239 timesFailed_.store(0, std::memory_order_release);
0240 timesExcept_.store(0, std::memory_order_release);
0241 }
0242
0243 void addedToPath() noexcept { ++numberOfPathsOn_; }
0244
0245 int timesRun() const noexcept { return timesRun_.load(std::memory_order_acquire); }
0246 int timesVisited() const noexcept { return timesVisited_.load(std::memory_order_acquire); }
0247 int timesPassed() const noexcept { return timesPassed_.load(std::memory_order_acquire); }
0248 int timesFailed() const noexcept { return timesFailed_.load(std::memory_order_acquire); }
0249 int timesExcept() const noexcept { return timesExcept_.load(std::memory_order_acquire); }
0250 State state() const noexcept { return state_; }
0251
0252 int timesPass() const noexcept { return timesPassed(); }
0253
0254 virtual bool hasAccumulator() const noexcept = 0;
0255
0256
0257 edm::WaitingTaskList& waitingTaskList() noexcept { return waitingTasks_; }
0258
0259 protected:
0260 template <typename O>
0261 friend class workerhelper::CallImpl;
0262
0263 virtual void doClearModule() = 0;
0264
0265 virtual std::string workerType() const = 0;
0266 virtual bool implDo(EventTransitionInfo const&, ModuleCallingContext const*) = 0;
0267
0268 virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0269 virtual bool implNeedToRunSelection() const noexcept = 0;
0270
0271 virtual void implDoAcquire(EventTransitionInfo const&,
0272 ModuleCallingContext const*,
0273 WaitingTaskWithArenaHolder&) = 0;
0274
0275 virtual void implDoTransformAsync(WaitingTaskHolder,
0276 size_t iTransformIndex,
0277 EventPrincipal const&,
0278 ParentContext const&,
0279 ServiceWeakToken const&) noexcept = 0;
0280 virtual ProductResolverIndex itemToGetForTransform(size_t iTransformIndex) const noexcept = 0;
0281
0282 virtual bool implDoPrePrefetchSelection(StreamID, EventPrincipal const&, ModuleCallingContext const*) = 0;
0283 virtual bool implDoBeginProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0284 virtual bool implDoAccessInputProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0285 virtual bool implDoEndProcessBlock(ProcessBlockPrincipal const&, ModuleCallingContext const*) = 0;
0286 virtual bool implDoBegin(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0287 virtual bool implDoStreamBegin(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0288 virtual bool implDoStreamEnd(StreamID, RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0289 virtual bool implDoEnd(RunTransitionInfo const&, ModuleCallingContext const*) = 0;
0290 virtual bool implDoBegin(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0291 virtual bool implDoStreamBegin(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0292 virtual bool implDoStreamEnd(StreamID, LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0293 virtual bool implDoEnd(LumiTransitionInfo const&, ModuleCallingContext const*) = 0;
0294 virtual void implBeginJob() = 0;
0295 virtual void implEndJob() = 0;
0296 virtual void implBeginStream(StreamID) = 0;
0297 virtual void implEndStream(StreamID) = 0;
0298
0299 void resetModuleDescription(ModuleDescription const*);
0300
0301 ActivityRegistry* activityRegistry() { return actReg_.get(); }
0302
0303 private:
0304 template <typename T>
0305 bool runModule(typename T::TransitionInfoType const&, StreamID, ParentContext const&, typename T::Context const*);
0306
0307 virtual void itemsToGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0308 virtual void itemsMayGet(BranchType, std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
0309
0310 virtual std::vector<ProductResolverIndexAndSkipBit> const& itemsToGetFrom(BranchType) const = 0;
0311
0312 virtual std::vector<ESResolverIndex> const& esItemsToGetFrom(Transition) const = 0;
0313 virtual std::vector<ESRecordIndex> const& esRecordsToGetFrom(Transition) const = 0;
0314
0315 virtual void preActionBeforeRunEventAsync(WaitingTaskHolder iTask,
0316 ModuleCallingContext const& moduleCallingContext,
0317 Principal const& iPrincipal) const noexcept = 0;
0318
0319 virtual void implRespondToOpenInputFile(FileBlock const& fb) = 0;
0320 virtual void implRespondToCloseInputFile(FileBlock const& fb) = 0;
0321 virtual void implRespondToCloseOutputFile() = 0;
0322
0323 virtual void implRegisterThinnedAssociations(ProductRegistry const&, ThinnedAssociationsHelper&) = 0;
0324
0325 virtual TaskQueueAdaptor serializeRunModule() = 0;
0326
0327 bool shouldRethrowException(std::exception_ptr iPtr,
0328 ParentContext const& parentContext,
0329 bool isEvent,
0330 bool isTryToContinue) const noexcept;
0331 void checkForShouldTryToContinue(ModuleDescription const&);
0332
0333 template <bool IS_EVENT>
0334 bool setPassed() {
0335 if (IS_EVENT) {
0336 timesPassed_.fetch_add(1, std::memory_order_relaxed);
0337 }
0338 state_ = Pass;
0339 return true;
0340 }
0341
0342 template <bool IS_EVENT>
0343 bool setFailed() {
0344 if (IS_EVENT) {
0345 timesFailed_.fetch_add(1, std::memory_order_relaxed);
0346 }
0347 state_ = Fail;
0348 return false;
0349 }
0350
0351 template <bool IS_EVENT>
0352 std::exception_ptr setException(std::exception_ptr iException) {
0353 if (IS_EVENT) {
0354 timesExcept_.fetch_add(1, std::memory_order_relaxed);
0355 }
0356 cached_exception_ = iException;
0357 state_ = Exception;
0358 return cached_exception_;
0359 }
0360
0361 template <typename T>
0362 void prefetchAsync(WaitingTaskHolder,
0363 ServiceToken const&,
0364 ParentContext const&,
0365 typename T::TransitionInfoType const&,
0366 Transition) noexcept;
0367
0368 void esPrefetchAsync(WaitingTaskHolder, EventSetupImpl const&, Transition, ServiceToken const&) noexcept;
0369 void edPrefetchAsync(WaitingTaskHolder, ServiceToken const&, Principal const&) const noexcept;
0370
0371 bool needsESPrefetching(Transition iTrans) const noexcept {
0372 return iTrans < edm::Transition::NumberOfEventSetupTransitions ? not esItemsToGetFrom(iTrans).empty() : false;
0373 }
0374
0375 void emitPostModuleEventPrefetchingSignal() {
0376 actReg_->postModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
0377 }
0378
0379 void emitPostModuleStreamPrefetchingSignal() {
0380 actReg_->postModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(),
0381 moduleCallingContext_);
0382 }
0383
0384 void emitPostModuleGlobalPrefetchingSignal() {
0385 actReg_->postModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(),
0386 moduleCallingContext_);
0387 }
0388
0389 virtual bool hasAcquire() const noexcept = 0;
0390
0391 template <typename T>
0392 std::exception_ptr runModuleAfterAsyncPrefetch(std::exception_ptr,
0393 typename T::TransitionInfoType const&,
0394 StreamID,
0395 ParentContext const&,
0396 typename T::Context const*) noexcept;
0397
0398 void runAcquire(EventTransitionInfo const&, ParentContext const&, WaitingTaskWithArenaHolder&);
0399
0400 void runAcquireAfterAsyncPrefetch(std::exception_ptr,
0401 EventTransitionInfo const&,
0402 ParentContext const&,
0403 WaitingTaskWithArenaHolder) noexcept;
0404
0405 std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr,
0406 ParentContext const& parentContext) noexcept;
0407
0408 template <typename T>
0409 class RunModuleTask : public WaitingTask {
0410 public:
0411 RunModuleTask(Worker* worker,
0412 typename T::TransitionInfoType const& transitionInfo,
0413 ServiceToken const& token,
0414 StreamID streamID,
0415 ParentContext const& parentContext,
0416 typename T::Context const* context,
0417 oneapi::tbb::task_group* iGroup) noexcept
0418 : m_worker(worker),
0419 m_transitionInfo(transitionInfo),
0420 m_streamID(streamID),
0421 m_parentContext(parentContext),
0422 m_context(context),
0423 m_serviceToken(token),
0424 m_group(iGroup) {}
0425
0426 struct EnableQueueGuard {
0427 SerialTaskQueue* queue_;
0428 EnableQueueGuard(SerialTaskQueue* iQueue) : queue_{iQueue} {}
0429 EnableQueueGuard(EnableQueueGuard const&) = delete;
0430 EnableQueueGuard& operator=(EnableQueueGuard const&) = delete;
0431 EnableQueueGuard& operator=(EnableQueueGuard&&) = delete;
0432 EnableQueueGuard(EnableQueueGuard&& iGuard) : queue_{iGuard.queue_} { iGuard.queue_ = nullptr; }
0433 ~EnableQueueGuard() {
0434 if (queue_) {
0435 queue_->resume();
0436 }
0437 }
0438 };
0439
0440 void execute() final {
0441
0442 ServiceRegistry::Operate guard(m_serviceToken.lock());
0443
0444
0445
0446 std::exception_ptr temp_excptr;
0447 auto excptr = exceptionPtr();
0448 if constexpr (T::isEvent_) {
0449 if (!m_worker->hasAcquire()) {
0450
0451 CMS_SA_ALLOW try {
0452
0453 m_worker->emitPostModuleEventPrefetchingSignal();
0454 } catch (...) {
0455 temp_excptr = std::current_exception();
0456 if (not excptr) {
0457 excptr = temp_excptr;
0458 }
0459 }
0460 }
0461 } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
0462 m_worker->emitPostModuleStreamPrefetchingSignal();
0463 } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
0464 m_worker->emitPostModuleGlobalPrefetchingSignal();
0465 }
0466
0467 if (not excptr) {
0468 if (auto queue = m_worker->serializeRunModule()) {
0469 auto f = [worker = m_worker,
0470 info = m_transitionInfo,
0471 streamID = m_streamID,
0472 parentContext = m_parentContext,
0473 sContext = m_context,
0474 serviceToken = m_serviceToken]() {
0475
0476 ServiceRegistry::Operate operateRunModule(serviceToken.lock());
0477
0478
0479
0480
0481 EnableQueueGuard enableQueueGuard{workerhelper::CallImpl<T>::enableGlobalQueue(worker)};
0482 std::exception_ptr ptr;
0483 worker->template runModuleAfterAsyncPrefetch<T>(ptr, info, streamID, parentContext, sContext);
0484 };
0485
0486 auto gQueue = workerhelper::CallImpl<T>::pauseGlobalQueue(m_worker);
0487 if (gQueue) {
0488 gQueue->push(*m_group, [queue, gQueue, f, group = m_group]() mutable {
0489 gQueue->pause();
0490 queue.push(*group, std::move(f));
0491 });
0492 } else {
0493 queue.push(*m_group, std::move(f));
0494 }
0495 return;
0496 }
0497 }
0498
0499 m_worker->runModuleAfterAsyncPrefetch<T>(excptr, m_transitionInfo, m_streamID, m_parentContext, m_context);
0500 }
0501
0502 private:
0503 Worker* m_worker;
0504 typename T::TransitionInfoType m_transitionInfo;
0505 StreamID m_streamID;
0506 ParentContext const m_parentContext;
0507 typename T::Context const* m_context;
0508 ServiceWeakToken m_serviceToken;
0509 oneapi::tbb::task_group* m_group;
0510 };
0511
0512
0513
0514
0515
0516 template <typename T, typename DUMMY = void>
0517 class AcquireTask : public WaitingTask {
0518 public:
0519 AcquireTask(Worker*,
0520 typename T::TransitionInfoType const&,
0521 ServiceToken const&,
0522 ParentContext const&,
0523 WaitingTaskWithArenaHolder) noexcept {}
0524 void execute() final {}
0525 };
0526
0527 template <typename DUMMY>
0528 class AcquireTask<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>, DUMMY> : public WaitingTask {
0529 public:
0530 AcquireTask(Worker* worker,
0531 EventTransitionInfo const& eventTransitionInfo,
0532 ServiceToken const& token,
0533 ParentContext const& parentContext,
0534 WaitingTaskWithArenaHolder holder) noexcept
0535 : m_worker(worker),
0536 m_eventTransitionInfo(eventTransitionInfo),
0537 m_parentContext(parentContext),
0538 m_holder(std::move(holder)),
0539 m_serviceToken(token) {}
0540
0541 void execute() final {
0542
0543 ServiceRegistry::Operate guard(m_serviceToken.lock());
0544
0545
0546
0547 std::exception_ptr temp_excptr;
0548 auto excptr = exceptionPtr();
0549
0550 CMS_SA_ALLOW try {
0551
0552 m_worker->emitPostModuleEventPrefetchingSignal();
0553 } catch (...) {
0554 temp_excptr = std::current_exception();
0555 if (not excptr) {
0556 excptr = temp_excptr;
0557 }
0558 }
0559
0560 if (not excptr) {
0561 if (auto queue = m_worker->serializeRunModule()) {
0562 queue.push(*m_holder.group(),
0563 [worker = m_worker,
0564 info = m_eventTransitionInfo,
0565 parentContext = m_parentContext,
0566 serviceToken = m_serviceToken,
0567 holder = m_holder]() {
0568
0569 ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
0570
0571 std::exception_ptr ptr;
0572 worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
0573 });
0574 return;
0575 }
0576 }
0577
0578 m_worker->runAcquireAfterAsyncPrefetch(excptr, m_eventTransitionInfo, m_parentContext, std::move(m_holder));
0579 }
0580
0581 private:
0582 Worker* m_worker;
0583 EventTransitionInfo m_eventTransitionInfo;
0584 ParentContext const m_parentContext;
0585 WaitingTaskWithArenaHolder m_holder;
0586 ServiceWeakToken m_serviceToken;
0587 };
0588
0589
0590
0591
// Waiting task whose constructor and execute() are defined out of line.
// It stores the worker, the run-module task, the task group and a copy of
// the parent context.
// NOTE(review): judging by the name it deals with an exception coming from
// external work before the run-module task proceeds - confirm in the
// implementation file.
class HandleExternalWorkExceptionTask : public WaitingTask {
public:
  HandleExternalWorkExceptionTask(Worker* worker,
                                  oneapi::tbb::task_group* group,
                                  WaitingTask* runModuleTask,
                                  ParentContext const& parentContext) noexcept;

  void execute() final;

private:
  Worker* m_worker;
  WaitingTask* m_runModuleTask;
  oneapi::tbb::task_group* m_group;
  ParentContext const m_parentContext;
};
0607
0608 std::atomic<int> timesRun_;
0609 std::atomic<int> timesVisited_;
0610 std::atomic<int> timesPassed_;
0611 std::atomic<int> timesFailed_;
0612 std::atomic<int> timesExcept_;
0613 std::atomic<State> state_;
0614 int numberOfPathsOn_;
0615 std::atomic<int> numberOfPathsLeftToRun_;
0616
0617 ModuleCallingContext moduleCallingContext_;
0618
0619 ExceptionToActionTable const* actions_;
0620 CMS_THREAD_GUARD(state_) std::exception_ptr cached_exception_;
0621
0622 std::shared_ptr<ActivityRegistry> actReg_;
0623
0624 edm::propagate_const<EarlyDeleteHelper*> earlyDeleteHelper_;
0625
0626 edm::WaitingTaskList waitingTasks_;
0627 std::atomic<bool> workStarted_;
0628 bool ranAcquireWithoutException_;
0629 bool moduleValid_ = true;
0630 bool shouldTryToContinue_ = false;
0631 bool beginSucceeded_ = false;
0632 };
0633
0634 namespace {
0635 template <typename T>
0636 class ModuleSignalSentry {
0637 public:
0638 ModuleSignalSentry(ActivityRegistry* a,
0639 typename T::Context const* context,
0640 ModuleCallingContext const* moduleCallingContext)
0641 : a_(a), context_(context), moduleCallingContext_(moduleCallingContext) {}
0642
0643 ~ModuleSignalSentry() {
0644
0645
0646
0647
0648 CMS_SA_ALLOW try {
0649 if (a_) {
0650 T::postModuleSignal(a_, context_, moduleCallingContext_);
0651 }
0652 } catch (...) {
0653 }
0654 }
0655 void preModuleSignal() {
0656 if (a_) {
0657 try {
0658 convertException::wrap([this]() { T::preModuleSignal(a_, context_, moduleCallingContext_); });
0659 } catch (cms::Exception& ex) {
0660 ex.addContext("Handling pre module signal, likely in a service function immediately before module method");
0661 throw;
0662 }
0663 }
0664 }
0665 void postModuleSignal() {
0666 if (a_) {
0667 auto temp = a_;
0668
0669
0670 a_ = nullptr;
0671 try {
0672 convertException::wrap([this, temp]() { T::postModuleSignal(temp, context_, moduleCallingContext_); });
0673 } catch (cms::Exception& ex) {
0674 ex.addContext("Handling post module signal, likely in a service function immediately after module method");
0675 throw;
0676 }
0677 }
0678 }
0679
0680 private:
0681 ActivityRegistry* a_;
0682 typename T::Context const* context_;
0683 ModuleCallingContext const* moduleCallingContext_;
0684 };
0685
0686 }
0687
0688 namespace workerhelper {
0689 template <>
0690 class CallImpl<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>> {
0691 public:
0692 typedef OccurrenceTraits<EventPrincipal, BranchActionStreamBegin> Arg;
0693 static bool call(Worker* iWorker,
0694 StreamID,
0695 EventTransitionInfo const& info,
0696 ActivityRegistry* ,
0697 ModuleCallingContext const* mcc,
0698 Arg::Context const* ) {
0699
0700 return iWorker->implDo(info, mcc);
0701 }
0702 static void esPrefetchAsync(Worker* worker,
0703 WaitingTaskHolder waitingTask,
0704 ServiceToken const& token,
0705 EventTransitionInfo const& info,
0706 Transition transition) noexcept {
0707 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0708 }
0709 static bool wantsTransition(Worker const* iWorker) noexcept { return true; }
0710 static bool needToRunSelection(Worker const* iWorker) noexcept { return iWorker->implNeedToRunSelection(); }
0711
0712 static SerialTaskQueue* pauseGlobalQueue(Worker*) noexcept { return nullptr; }
0713 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0714 };
0715
0716 template <>
0717 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin>> {
0718 public:
0719 typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalBegin> Arg;
0720 static bool call(Worker* iWorker,
0721 StreamID,
0722 RunTransitionInfo const& info,
0723 ActivityRegistry* actReg,
0724 ModuleCallingContext const* mcc,
0725 Arg::Context const* context) {
0726 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0727
0728
0729 cpp.preModuleSignal();
0730
0731
0732 auto returnValue = iWorker->implDoBegin(info, mcc);
0733
0734 cpp.postModuleSignal();
0735 iWorker->beginSucceeded_ = true;
0736 return returnValue;
0737 }
0738 static void esPrefetchAsync(Worker* worker,
0739 WaitingTaskHolder waitingTask,
0740 ServiceToken const& token,
0741 RunTransitionInfo const& info,
0742 Transition transition) noexcept {
0743 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0744 }
0745 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalRuns(); }
0746 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0747 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return iWorker->globalRunsQueue(); }
0748 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0749 };
0750 template <>
0751 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamBegin>> {
0752 public:
0753 typedef OccurrenceTraits<RunPrincipal, BranchActionStreamBegin> Arg;
0754 static bool call(Worker* iWorker,
0755 StreamID id,
0756 RunTransitionInfo const& info,
0757 ActivityRegistry* actReg,
0758 ModuleCallingContext const* mcc,
0759 Arg::Context const* context) {
0760 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0761 cpp.preModuleSignal();
0762 auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
0763 cpp.postModuleSignal();
0764 iWorker->beginSucceeded_ = true;
0765 return returnValue;
0766 }
0767 static void esPrefetchAsync(Worker* worker,
0768 WaitingTaskHolder waitingTask,
0769 ServiceToken const& token,
0770 RunTransitionInfo const& info,
0771 Transition transition) noexcept {
0772 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0773 }
0774 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamRuns(); }
0775 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0776 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0777 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0778 };
0779 template <>
0780 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd>> {
0781 public:
0782 typedef OccurrenceTraits<RunPrincipal, BranchActionGlobalEnd> Arg;
0783 static bool call(Worker* iWorker,
0784 StreamID,
0785 RunTransitionInfo const& info,
0786 ActivityRegistry* actReg,
0787 ModuleCallingContext const* mcc,
0788 Arg::Context const* context) {
0789 if (iWorker->beginSucceeded_) {
0790 iWorker->beginSucceeded_ = false;
0791
0792 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0793 cpp.preModuleSignal();
0794 auto returnValue = iWorker->implDoEnd(info, mcc);
0795 cpp.postModuleSignal();
0796 return returnValue;
0797 }
0798 return true;
0799 }
0800 static void esPrefetchAsync(Worker* worker,
0801 WaitingTaskHolder waitingTask,
0802 ServiceToken const& token,
0803 RunTransitionInfo const& info,
0804 Transition transition) noexcept {
0805 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0806 }
0807 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalRuns(); }
0808 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0809 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0810 static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) noexcept { return iWorker->globalRunsQueue(); }
0811 };
0812 template <>
0813 class CallImpl<OccurrenceTraits<RunPrincipal, BranchActionStreamEnd>> {
0814 public:
0815 typedef OccurrenceTraits<RunPrincipal, BranchActionStreamEnd> Arg;
0816 static bool call(Worker* iWorker,
0817 StreamID id,
0818 RunTransitionInfo const& info,
0819 ActivityRegistry* actReg,
0820 ModuleCallingContext const* mcc,
0821 Arg::Context const* context) {
0822 if (iWorker->beginSucceeded_) {
0823 iWorker->beginSucceeded_ = false;
0824
0825 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0826 cpp.preModuleSignal();
0827 auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
0828 cpp.postModuleSignal();
0829 return returnValue;
0830 }
0831 return true;
0832 }
0833 static void esPrefetchAsync(Worker* worker,
0834 WaitingTaskHolder waitingTask,
0835 ServiceToken const& token,
0836 RunTransitionInfo const& info,
0837 Transition transition) noexcept {
0838 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0839 }
0840 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamRuns(); }
0841 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0842 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0843 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0844 };
0845
0846 template <>
0847 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>> {
0848 public:
0849 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalBegin>;
0850 static bool call(Worker* iWorker,
0851 StreamID,
0852 LumiTransitionInfo const& info,
0853 ActivityRegistry* actReg,
0854 ModuleCallingContext const* mcc,
0855 Arg::Context const* context) {
0856 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0857 cpp.preModuleSignal();
0858 auto returnValue = iWorker->implDoBegin(info, mcc);
0859 cpp.postModuleSignal();
0860 iWorker->beginSucceeded_ = true;
0861 return returnValue;
0862 }
0863 static void esPrefetchAsync(Worker* worker,
0864 WaitingTaskHolder waitingTask,
0865 ServiceToken const& token,
0866 LumiTransitionInfo const& info,
0867 Transition transition) noexcept {
0868 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0869 }
0870 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalLuminosityBlocks(); }
0871 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0872 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept {
0873 return iWorker->globalLuminosityBlocksQueue();
0874 }
0875 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0876 };
0877 template <>
0878 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>> {
0879 public:
0880 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamBegin>;
0881 static bool call(Worker* iWorker,
0882 StreamID id,
0883 LumiTransitionInfo const& info,
0884 ActivityRegistry* actReg,
0885 ModuleCallingContext const* mcc,
0886 Arg::Context const* context) {
0887 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0888 cpp.preModuleSignal();
0889 auto returnValue = iWorker->implDoStreamBegin(id, info, mcc);
0890 cpp.postModuleSignal();
0891 iWorker->beginSucceeded_ = true;
0892 return returnValue;
0893 }
0894 static void esPrefetchAsync(Worker* worker,
0895 WaitingTaskHolder waitingTask,
0896 ServiceToken const& token,
0897 LumiTransitionInfo const& info,
0898 Transition transition) noexcept {
0899 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0900 }
0901 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamLuminosityBlocks(); }
0902 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0903 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0904 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0905 };
0906
0907 template <>
0908 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>> {
0909 public:
0910 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionGlobalEnd>;
0911 static bool call(Worker* iWorker,
0912 StreamID,
0913 LumiTransitionInfo const& info,
0914 ActivityRegistry* actReg,
0915 ModuleCallingContext const* mcc,
0916 Arg::Context const* context) {
0917 if (iWorker->beginSucceeded_) {
0918 iWorker->beginSucceeded_ = false;
0919
0920 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0921 cpp.preModuleSignal();
0922 auto returnValue = iWorker->implDoEnd(info, mcc);
0923 cpp.postModuleSignal();
0924 return returnValue;
0925 }
0926 return true;
0927 }
0928 static void esPrefetchAsync(Worker* worker,
0929 WaitingTaskHolder waitingTask,
0930 ServiceToken const& token,
0931 LumiTransitionInfo const& info,
0932 Transition transition) noexcept {
0933 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0934 }
0935 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsGlobalLuminosityBlocks(); }
0936 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0937 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0938 static SerialTaskQueue* enableGlobalQueue(Worker* iWorker) noexcept {
0939 return iWorker->globalLuminosityBlocksQueue();
0940 }
0941 };
0942 template <>
0943 class CallImpl<OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>> {
0944 public:
0945 using Arg = OccurrenceTraits<LuminosityBlockPrincipal, BranchActionStreamEnd>;
0946 static bool call(Worker* iWorker,
0947 StreamID id,
0948 LumiTransitionInfo const& info,
0949 ActivityRegistry* actReg,
0950 ModuleCallingContext const* mcc,
0951 Arg::Context const* context) {
0952 if (iWorker->beginSucceeded_) {
0953 iWorker->beginSucceeded_ = false;
0954
0955 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0956 cpp.preModuleSignal();
0957 auto returnValue = iWorker->implDoStreamEnd(id, info, mcc);
0958 cpp.postModuleSignal();
0959 return returnValue;
0960 }
0961 return true;
0962 }
0963 static void esPrefetchAsync(Worker* worker,
0964 WaitingTaskHolder waitingTask,
0965 ServiceToken const& token,
0966 LumiTransitionInfo const& info,
0967 Transition transition) noexcept {
0968 worker->esPrefetchAsync(waitingTask, info.eventSetupImpl(), transition, token);
0969 }
0970 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsStreamLuminosityBlocks(); }
0971 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0972 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0973 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0974 };
0975 template <>
0976 class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>> {
0977 public:
0978 using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalBegin>;
0979 static bool call(Worker* iWorker,
0980 StreamID,
0981 ProcessBlockTransitionInfo const& info,
0982 ActivityRegistry* actReg,
0983 ModuleCallingContext const* mcc,
0984 Arg::Context const* context) {
0985 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
0986 cpp.preModuleSignal();
0987 auto returnValue = iWorker->implDoBeginProcessBlock(info.principal(), mcc);
0988 cpp.postModuleSignal();
0989 return returnValue;
0990 }
0991 static void esPrefetchAsync(
0992 Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
0993 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsProcessBlocks(); }
0994 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
0995 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
0996 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
0997 };
0998 template <>
0999 class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>> {
1000 public:
1001 using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionProcessBlockInput>;
1002 static bool call(Worker* iWorker,
1003 StreamID,
1004 ProcessBlockTransitionInfo const& info,
1005 ActivityRegistry* actReg,
1006 ModuleCallingContext const* mcc,
1007 Arg::Context const* context) {
1008 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
1009 cpp.preModuleSignal();
1010 auto returnValue = iWorker->implDoAccessInputProcessBlock(info.principal(), mcc);
1011 cpp.postModuleSignal();
1012 return returnValue;
1013 }
1014 static void esPrefetchAsync(
1015 Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
1016 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsInputProcessBlocks(); }
1017 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
1018 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
1019 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
1020 };
1021 template <>
1022 class CallImpl<OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>> {
1023 public:
1024 using Arg = OccurrenceTraits<ProcessBlockPrincipal, BranchActionGlobalEnd>;
1025 static bool call(Worker* iWorker,
1026 StreamID,
1027 ProcessBlockTransitionInfo const& info,
1028 ActivityRegistry* actReg,
1029 ModuleCallingContext const* mcc,
1030 Arg::Context const* context) {
1031 ModuleSignalSentry<Arg> cpp(actReg, context, mcc);
1032 cpp.preModuleSignal();
1033 auto returnValue = iWorker->implDoEndProcessBlock(info.principal(), mcc);
1034 cpp.postModuleSignal();
1035 return returnValue;
1036 }
1037 static void esPrefetchAsync(
1038 Worker*, WaitingTaskHolder, ServiceToken const&, ProcessBlockTransitionInfo const&, Transition) noexcept {}
1039 static bool wantsTransition(Worker const* iWorker) noexcept { return iWorker->wantsProcessBlocks(); }
1040 static bool needToRunSelection(Worker const* iWorker) noexcept { return false; }
1041 static SerialTaskQueue* pauseGlobalQueue(Worker* iWorker) noexcept { return nullptr; }
1042 static SerialTaskQueue* enableGlobalQueue(Worker*) noexcept { return nullptr; }
1043 };
1044 }
1045
1046 template <typename T>
1047 void Worker::prefetchAsync(WaitingTaskHolder iTask,
1048 ServiceToken const& token,
1049 ParentContext const& parentContext,
1050 typename T::TransitionInfoType const& transitionInfo,
1051 Transition iTransition) noexcept {
1052 Principal const& principal = transitionInfo.principal();
1053
1054 moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1055
1056 if constexpr (T::isEvent_) {
1057 actReg_->preModuleEventPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1058 } else if constexpr (std::is_same_v<typename T::Context, StreamContext>) {
1059 actReg_->preModuleStreamPrefetchingSignal_.emit(*moduleCallingContext_.getStreamContext(), moduleCallingContext_);
1060 } else if constexpr (std::is_same_v<typename T::Context, GlobalContext>) {
1061 actReg_->preModuleGlobalPrefetchingSignal_.emit(*moduleCallingContext_.getGlobalContext(), moduleCallingContext_);
1062 }
1063
1064 workerhelper::CallImpl<T>::esPrefetchAsync(this, iTask, token, transitionInfo, iTransition);
1065 edPrefetchAsync(iTask, token, principal);
1066
1067 if (principal.branchType() == InEvent) {
1068 preActionBeforeRunEventAsync(iTask, moduleCallingContext_, principal);
1069 }
1070 }
1071
1072 template <typename T>
1073 void Worker::doWorkAsync(WaitingTaskHolder task,
1074 typename T::TransitionInfoType const& transitionInfo,
1075 ServiceToken const& token,
1076 StreamID streamID,
1077 ParentContext const& parentContext,
1078 typename T::Context const* context) noexcept {
1079 if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1080 return;
1081 }
1082
1083
1084 bool expected = false;
1085 bool workStarted = workStarted_.compare_exchange_strong(expected, true);
1086
1087 waitingTasks_.add(task);
1088 if constexpr (T::isEvent_) {
1089 timesVisited_.fetch_add(1, std::memory_order_relaxed);
1090 }
1091
1092 if (workStarted) {
1093 moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1094
1095
1096 if (workerhelper::CallImpl<T>::needToRunSelection(this)) {
1097
1098
1099 WaitingTask* moduleTask =
1100 new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1101
1102
1103 struct DestroyTask {
1104 DestroyTask(edm::WaitingTask* iTask) noexcept : m_task(iTask) {}
1105
1106 ~DestroyTask() noexcept {
1107 auto p = m_task.exchange(nullptr);
1108 if (p) {
1109 TaskSentry s{p};
1110 }
1111 }
1112
1113 edm::WaitingTask* release() noexcept { return m_task.exchange(nullptr); }
1114
1115 private:
1116 std::atomic<edm::WaitingTask*> m_task;
1117 };
1118 if constexpr (T::isEvent_) {
1119 if (hasAcquire()) {
1120 auto ownRunTask = std::make_shared<DestroyTask>(moduleTask);
1121 ServiceWeakToken weakToken = token;
1122 auto* group = task.group();
1123 moduleTask = make_waiting_task(
1124 [this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
1125 WaitingTaskWithArenaHolder runTaskHolder(
1126 *group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
1127 AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
1128 t.execute();
1129 });
1130 }
1131 }
1132 auto* group = task.group();
1133 auto ownModuleTask = std::make_shared<DestroyTask>(moduleTask);
1134 ServiceWeakToken weakToken = token;
1135 auto selectionTask =
1136 make_waiting_task([ownModuleTask, parentContext, info = transitionInfo, weakToken, group, this](
1137 std::exception_ptr const*) mutable {
1138 ServiceRegistry::Operate guard(weakToken.lock());
1139 prefetchAsync<T>(WaitingTaskHolder(*group, ownModuleTask->release()),
1140 weakToken.lock(),
1141 parentContext,
1142 info,
1143 T::transition_);
1144 });
1145 prePrefetchSelectionAsync(*group, selectionTask, token, streamID, &transitionInfo.principal());
1146 } else {
1147 WaitingTask* moduleTask =
1148 new RunModuleTask<T>(this, transitionInfo, token, streamID, parentContext, context, task.group());
1149 auto group = task.group();
1150 if constexpr (T::isEvent_) {
1151 if (hasAcquire()) {
1152 WaitingTaskWithArenaHolder runTaskHolder(
1153 *group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
1154 moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
1155 }
1156 }
1157 prefetchAsync<T>(WaitingTaskHolder(*group, moduleTask), token, parentContext, transitionInfo, T::transition_);
1158 }
1159 }
1160 }
1161
1162 template <typename T>
1163 std::exception_ptr Worker::runModuleAfterAsyncPrefetch(std::exception_ptr iEPtr,
1164 typename T::TransitionInfoType const& transitionInfo,
1165 StreamID streamID,
1166 ParentContext const& parentContext,
1167 typename T::Context const* context) noexcept {
1168 std::exception_ptr exceptionPtr;
1169 bool shouldRun = true;
1170 if (iEPtr) {
1171 if (shouldRethrowException(iEPtr, parentContext, T::isEvent_, shouldTryToContinue_)) {
1172 exceptionPtr = iEPtr;
1173 setException<T::isEvent_>(exceptionPtr);
1174 shouldRun = false;
1175 } else {
1176 if (not shouldTryToContinue_) {
1177 setPassed<T::isEvent_>();
1178 shouldRun = false;
1179 }
1180 }
1181 }
1182 if (shouldRun) {
1183
1184 CMS_SA_ALLOW try { runModule<T>(transitionInfo, streamID, parentContext, context); } catch (...) {
1185 exceptionPtr = std::current_exception();
1186 }
1187 } else {
1188 moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
1189 }
1190 waitingTasks_.doneWaiting(exceptionPtr);
1191 return exceptionPtr;
1192 }
1193
1194 template <typename T>
1195 void Worker::doWorkNoPrefetchingAsync(WaitingTaskHolder task,
1196 typename T::TransitionInfoType const& transitionInfo,
1197 ServiceToken const& serviceToken,
1198 StreamID streamID,
1199 ParentContext const& parentContext,
1200 typename T::Context const* context) noexcept {
1201 if (not workerhelper::CallImpl<T>::wantsTransition(this)) {
1202 return;
1203 }
1204
1205
1206 bool expected = false;
1207 auto workStarted = workStarted_.compare_exchange_strong(expected, true);
1208
1209 waitingTasks_.add(task);
1210 if (workStarted) {
1211 ServiceWeakToken weakToken = serviceToken;
1212 auto toDo = [this, info = transitionInfo, streamID, parentContext, context, weakToken]() {
1213 std::exception_ptr exceptionPtr;
1214
1215 CMS_SA_ALLOW try {
1216
1217 ServiceRegistry::Operate guard(weakToken.lock());
1218
1219 this->runModule<T>(info, streamID, parentContext, context);
1220 } catch (...) {
1221 exceptionPtr = std::current_exception();
1222 }
1223 this->waitingTasks_.doneWaiting(exceptionPtr);
1224 };
1225
1226 if (needsESPrefetching(T::transition_)) {
1227 auto group = task.group();
1228 auto afterPrefetch =
1229 edm::make_waiting_task([toDo = std::move(toDo), group, this](std::exception_ptr const* iExcept) {
1230 if (iExcept) {
1231 this->waitingTasks_.doneWaiting(*iExcept);
1232 } else {
1233 if (auto queue = this->serializeRunModule()) {
1234 queue.push(*group, toDo);
1235 } else {
1236 group->run(toDo);
1237 }
1238 }
1239 });
1240 moduleCallingContext_.setContext(ModuleCallingContext::State::kPrefetching, parentContext, nullptr);
1241 esPrefetchAsync(
1242 WaitingTaskHolder(*group, afterPrefetch), transitionInfo.eventSetupImpl(), T::transition_, serviceToken);
1243 } else {
1244 auto group = task.group();
1245 if (auto queue = this->serializeRunModule()) {
1246 queue.push(*group, toDo);
1247 } else {
1248 group->run(toDo);
1249 }
1250 }
1251 }
1252 }
1253
1254 template <typename T>
1255 bool Worker::runModule(typename T::TransitionInfoType const& transitionInfo,
1256 StreamID streamID,
1257 ParentContext const& parentContext,
1258 typename T::Context const* context) {
1259
1260
1261
1262
1263 ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
1264 if constexpr (T::isEvent_) {
1265 timesRun_.fetch_add(1, std::memory_order_relaxed);
1266 }
1267
1268 bool rc = true;
1269 try {
1270 convertException::wrap([&]() {
1271 rc = workerhelper::CallImpl<T>::call(
1272 this, streamID, transitionInfo, actReg_.get(), &moduleCallingContext_, context);
1273
1274 if (rc) {
1275 setPassed<T::isEvent_>();
1276 } else {
1277 setFailed<T::isEvent_>();
1278 }
1279 });
1280 } catch (cms::Exception& ex) {
1281 edm::exceptionContext(ex, moduleCallingContext_);
1282 if (shouldRethrowException(std::current_exception(), parentContext, T::isEvent_, shouldTryToContinue_)) {
1283 assert(not cached_exception_);
1284 setException<T::isEvent_>(std::current_exception());
1285 std::rethrow_exception(cached_exception_);
1286 } else {
1287 rc = setPassed<T::isEvent_>();
1288 }
1289 }
1290
1291 return rc;
1292 }
1293
1294 template <typename T>
1295 std::exception_ptr Worker::runModuleDirectly(typename T::TransitionInfoType const& transitionInfo,
1296 StreamID streamID,
1297 ParentContext const& parentContext,
1298 typename T::Context const* context) noexcept {
1299 timesVisited_.fetch_add(1, std::memory_order_relaxed);
1300 std::exception_ptr prefetchingException;
1301 return runModuleAfterAsyncPrefetch<T>(prefetchingException, transitionInfo, streamID, parentContext, context);
1302 }
1303 }
1304 #endif