File indexing completed on 2025-06-29 22:58:05
0001
0002
0003
0004 #include "FWCore/Concurrency/interface/include_first_syncWait.h"
0005 #include "FWCore/Framework/interface/maker/Worker.h"
0006 #include "FWCore/Framework/interface/EarlyDeleteHelper.h"
0007 #include "FWCore/Framework/interface/EventPrincipal.h"
0008 #include "FWCore/Framework/interface/EventSetupImpl.h"
0009 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0010 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0011 #include "FWCore/Framework/interface/RunPrincipal.h"
0012 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0013 #include "FWCore/ServiceRegistry/interface/ESParentContext.h"
0014 #include "FWCore/Concurrency/interface/WaitingTask.h"
0015 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0016 #include "FWCore/ParameterSet/interface/Registry.h"
0017
0018 namespace edm {
0019
0020 Worker::Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions)
0021 : timesRun_(0),
0022 timesVisited_(0),
0023 timesPassed_(0),
0024 timesFailed_(0),
0025 timesExcept_(0),
0026 state_(Ready),
0027 numberOfPathsOn_(0),
0028 numberOfPathsLeftToRun_(0),
0029 moduleCallingContext_(&iMD),
0030 actions_(iActions),
0031 cached_exception_(),
0032 actReg_(),
0033 earlyDeleteHelper_(nullptr),
0034 workStarted_(false),
0035 ranAcquireWithoutException_(false) {
0036 checkForShouldTryToContinue(iMD);
0037 }
0038
0039 Worker::~Worker() {}
0040
0041 void Worker::setActivityRegistry(std::shared_ptr<ActivityRegistry> areg) { actReg_ = areg; }
0042
0043 void Worker::checkForShouldTryToContinue(ModuleDescription const& iDesc) {
0044 auto pset = edm::pset::Registry::instance()->getMapped(iDesc.parameterSetID());
0045 if (pset and pset->exists("@shouldTryToContinue")) {
0046 shouldTryToContinue_ = true;
0047 }
0048 }
0049
0050 bool Worker::shouldRethrowException(std::exception_ptr iPtr,
0051 ParentContext const& parentContext,
0052 bool isEvent,
0053 bool shouldTryToContinue) const noexcept {
0054
0055
0056
0057
0058
0059
0060 if (not isEvent) {
0061 return true;
0062 }
0063 try {
0064 convertException::wrap([&]() { std::rethrow_exception(iPtr); });
0065 } catch (cms::Exception& ex) {
0066 exception_actions::ActionCodes action = actions_->find(ex.category());
0067
0068 if (action == exception_actions::Rethrow) {
0069 return true;
0070 }
0071 if (action == exception_actions::TryToContinue) {
0072 if (shouldTryToContinue) {
0073 edm::printCmsExceptionWarning("TryToContinue", ex);
0074 }
0075 return not shouldTryToContinue;
0076 }
0077 if (action == exception_actions::IgnoreCompletely) {
0078 edm::printCmsExceptionWarning("IgnoreCompletely", ex);
0079 return false;
0080 }
0081 }
0082 return true;
0083 }
0084
0085 void Worker::prePrefetchSelectionAsync(oneapi::tbb::task_group& group,
0086 WaitingTask* successTask,
0087 ServiceToken const& token,
0088 StreamID id,
0089 EventPrincipal const* iPrincipal) noexcept {
0090 successTask->increment_ref_count();
0091
0092 ServiceWeakToken weakToken = token;
0093 auto choiceTask =
0094 edm::make_waiting_task([id, successTask, iPrincipal, this, weakToken, &group](std::exception_ptr const*) {
0095 ServiceRegistry::Operate guard(weakToken.lock());
0096 try {
0097 bool selected = convertException::wrap([&]() {
0098 if (not implDoPrePrefetchSelection(id, *iPrincipal, &moduleCallingContext_)) {
0099 timesRun_.fetch_add(1, std::memory_order_relaxed);
0100 setPassed<true>();
0101 waitingTasks_.doneWaiting(nullptr);
0102
0103 if (0 == successTask->decrement_ref_count()) {
0104 TaskSentry s(successTask);
0105 }
0106 return false;
0107 }
0108 return true;
0109 });
0110 if (not selected) {
0111 return;
0112 }
0113
0114 } catch (cms::Exception& e) {
0115 edm::exceptionContext(e, moduleCallingContext_);
0116 setException<true>(std::current_exception());
0117 waitingTasks_.doneWaiting(std::current_exception());
0118
0119 if (0 == successTask->decrement_ref_count()) {
0120 TaskSentry s(successTask);
0121 }
0122 return;
0123 }
0124 if (0 == successTask->decrement_ref_count()) {
0125 group.run([successTask]() {
0126 TaskSentry s(successTask);
0127 successTask->execute();
0128 });
0129 }
0130 });
0131
0132 WaitingTaskHolder choiceHolder{group, choiceTask};
0133
0134 std::vector<ProductResolverIndexAndSkipBit> items;
0135 itemsToGetForSelection(items);
0136
0137 for (auto const& item : items) {
0138 ProductResolverIndex productResolverIndex = item.productResolverIndex();
0139 bool skipCurrentProcess = item.skipCurrentProcess();
0140 if (productResolverIndex != ProductResolverIndexAmbiguous and
0141 productResolverIndex != ProductResolverIndexInvalid) {
0142 iPrincipal->prefetchAsync(
0143 choiceHolder, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
0144 }
0145 }
0146 choiceHolder.doneWaiting(std::exception_ptr{});
0147 }
0148
0149 void Worker::esPrefetchAsync(WaitingTaskHolder iTask,
0150 EventSetupImpl const& iImpl,
0151 Transition iTrans,
0152 ServiceToken const& iToken) noexcept {
0153 if (iTrans >= edm::Transition::NumberOfEventSetupTransitions) {
0154 return;
0155 }
0156 auto const& recs = esRecordsToGetFrom(iTrans);
0157 auto const& items = esItemsToGetFrom(iTrans);
0158
0159 assert(items.size() == recs.size());
0160 if (items.empty()) {
0161 return;
0162 }
0163
0164 for (size_t i = 0; i != items.size(); ++i) {
0165 if (recs[i] != ESRecordIndex{}) {
0166 auto rec = iImpl.findImpl(recs[i]);
0167 if (rec) {
0168 rec->prefetchAsync(iTask, items[i], &iImpl, iToken, ESParentContext(&moduleCallingContext_));
0169 }
0170 }
0171 }
0172 }
0173
0174 void Worker::edPrefetchAsync(WaitingTaskHolder iTask,
0175 ServiceToken const& token,
0176 Principal const& iPrincipal) const noexcept {
0177
0178 std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
0179
0180 for (auto const& item : items) {
0181 ProductResolverIndex productResolverIndex = item.productResolverIndex();
0182 bool skipCurrentProcess = item.skipCurrentProcess();
0183 if (productResolverIndex != ProductResolverIndexAmbiguous) {
0184 iPrincipal.prefetchAsync(iTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
0185 }
0186 }
0187 }
0188
0189 void Worker::setEarlyDeleteHelper(EarlyDeleteHelper* iHelper) { earlyDeleteHelper_ = iHelper; }
0190
0191 size_t Worker::transformIndex(edm::ProductDescription const&) const noexcept { return -1; }
0192 void Worker::doTransformAsync(WaitingTaskHolder iTask,
0193 size_t iTransformIndex,
0194 EventPrincipal const& iPrincipal,
0195 ServiceToken const& iToken,
0196 StreamID,
0197 ModuleCallingContext const& mcc,
0198 StreamContext const*) noexcept {
0199 ServiceWeakToken weakToken = iToken;
0200
0201
0202 auto task = make_waiting_task(
0203 [this, iTask, weakToken, &iPrincipal, iTransformIndex, mcc](std::exception_ptr const* iExcept) mutable {
0204
0205 actReg_->postModuleTransformPrefetchingSignal_.emit(*mcc.getStreamContext(), mcc);
0206 if (iExcept) {
0207 iTask.doneWaiting(*iExcept);
0208 return;
0209 }
0210 implDoTransformAsync(iTask, iTransformIndex, iPrincipal, mcc.parent(), weakToken);
0211 });
0212
0213
0214 actReg_->preModuleTransformPrefetchingSignal_.emit(*mcc.getStreamContext(), mcc);
0215 iPrincipal.prefetchAsync(
0216 WaitingTaskHolder(*iTask.group(), task), itemToGetForTransform(iTransformIndex), false, iToken, &mcc);
0217 }
0218
0219 void Worker::resetModuleDescription(ModuleDescription const* iDesc) {
0220 ModuleCallingContext temp(iDesc,
0221 0,
0222 moduleCallingContext_.state(),
0223 moduleCallingContext_.parent(),
0224 moduleCallingContext_.previousModuleOnThread());
0225 moduleCallingContext_ = temp;
0226 assert(iDesc);
0227 checkForShouldTryToContinue(*iDesc);
0228 }
0229
0230 void Worker::skipOnPath(EventPrincipal const& iEvent) {
0231 if (earlyDeleteHelper_) {
0232 earlyDeleteHelper_->pathFinished(iEvent);
0233 }
0234 if (0 == --numberOfPathsLeftToRun_) {
0235 waitingTasks_.doneWaiting(cached_exception_);
0236 }
0237 }
0238
0239 void Worker::postDoEvent(EventPrincipal const& iEvent) {
0240 if (earlyDeleteHelper_) {
0241 earlyDeleteHelper_->moduleRan(iEvent);
0242 }
0243 }
0244
0245 void Worker::runAcquire(EventTransitionInfo const& info,
0246 ParentContext const& parentContext,
0247 WaitingTaskHolder holder) {
0248 ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0249 try {
0250 convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, std::move(holder)); });
0251 } catch (cms::Exception& ex) {
0252 edm::exceptionContext(ex, moduleCallingContext_);
0253 if (shouldRethrowException(std::current_exception(), parentContext, true, shouldTryToContinue_)) {
0254 timesRun_.fetch_add(1, std::memory_order_relaxed);
0255 throw;
0256 }
0257 }
0258 }
0259
0260 void Worker::runAcquireAfterAsyncPrefetch(std::exception_ptr iEPtr,
0261 EventTransitionInfo const& eventTransitionInfo,
0262 ParentContext const& parentContext,
0263 WaitingTaskHolder holder) noexcept {
0264 ranAcquireWithoutException_ = false;
0265 std::exception_ptr exceptionPtr;
0266 if (iEPtr) {
0267 if (shouldRethrowException(iEPtr, parentContext, true, shouldTryToContinue_)) {
0268 exceptionPtr = iEPtr;
0269 }
0270 moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
0271 } else {
0272
0273 CMS_SA_ALLOW try {
0274
0275
0276 runAcquire(eventTransitionInfo, parentContext, holder);
0277 ranAcquireWithoutException_ = true;
0278 } catch (...) {
0279 exceptionPtr = std::current_exception();
0280 }
0281 }
0282
0283 holder.doneWaiting(exceptionPtr);
0284 }
0285
0286 std::exception_ptr Worker::handleExternalWorkException(std::exception_ptr iEPtr,
0287 ParentContext const& parentContext) noexcept {
0288 if (ranAcquireWithoutException_) {
0289 try {
0290 convertException::wrap([iEPtr]() { std::rethrow_exception(iEPtr); });
0291 } catch (cms::Exception& ex) {
0292 ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0293 edm::exceptionContext(ex, moduleCallingContext_);
0294 return std::current_exception();
0295 }
0296 }
0297 return iEPtr;
0298 }
0299
0300 Worker::HandleExternalWorkExceptionTask::HandleExternalWorkExceptionTask(Worker* worker,
0301 oneapi::tbb::task_group* group,
0302 WaitingTask* runModuleTask,
0303 ParentContext const& parentContext) noexcept
0304 : m_worker(worker), m_runModuleTask(runModuleTask), m_group(group), m_parentContext(parentContext) {}
0305
0306 void Worker::HandleExternalWorkExceptionTask::execute() {
0307 auto excptr = exceptionPtr();
0308 WaitingTaskHolder holder(*m_group, m_runModuleTask);
0309 if (excptr) {
0310 holder.doneWaiting(m_worker->handleExternalWorkException(excptr, m_parentContext));
0311 }
0312 }
0313 }