File indexing completed on 2025-01-31 02:19:27
0001
0002
0003
0004 #include "FWCore/Concurrency/interface/include_first_syncWait.h"
0005 #include "FWCore/Framework/interface/maker/Worker.h"
0006 #include "FWCore/Framework/interface/EarlyDeleteHelper.h"
0007 #include "FWCore/Framework/interface/EventPrincipal.h"
0008 #include "FWCore/Framework/interface/EventSetupImpl.h"
0009 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0010 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0011 #include "FWCore/Framework/interface/RunPrincipal.h"
0012 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0013 #include "FWCore/ServiceRegistry/interface/ESParentContext.h"
0014 #include "FWCore/Concurrency/interface/WaitingTask.h"
0015 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0016 #include "FWCore/ParameterSet/interface/Registry.h"
0017
0018 namespace edm {
0019 namespace {
0020 class ModuleBeginJobTraits {
0021 public:
0022 using Context = GlobalContext;
0023 static void preModuleSignal(ActivityRegistry* activityRegistry,
0024 GlobalContext const*,
0025 ModuleCallingContext const* moduleCallingContext) {
0026 activityRegistry->preModuleBeginJobSignal_(*moduleCallingContext->moduleDescription());
0027 }
0028 static void postModuleSignal(ActivityRegistry* activityRegistry,
0029 GlobalContext const*,
0030 ModuleCallingContext const* moduleCallingContext) {
0031 activityRegistry->postModuleBeginJobSignal_(*moduleCallingContext->moduleDescription());
0032 }
0033 };
0034
0035 class ModuleEndJobTraits {
0036 public:
0037 using Context = GlobalContext;
0038 static void preModuleSignal(ActivityRegistry* activityRegistry,
0039 GlobalContext const*,
0040 ModuleCallingContext const* moduleCallingContext) {
0041 activityRegistry->preModuleEndJobSignal_(*moduleCallingContext->moduleDescription());
0042 }
0043 static void postModuleSignal(ActivityRegistry* activityRegistry,
0044 GlobalContext const*,
0045 ModuleCallingContext const* moduleCallingContext) {
0046 activityRegistry->postModuleEndJobSignal_(*moduleCallingContext->moduleDescription());
0047 }
0048 };
0049
0050 class ModuleBeginStreamTraits {
0051 public:
0052 using Context = StreamContext;
0053 static void preModuleSignal(ActivityRegistry* activityRegistry,
0054 StreamContext const* streamContext,
0055 ModuleCallingContext const* moduleCallingContext) {
0056 activityRegistry->preModuleBeginStreamSignal_(*streamContext, *moduleCallingContext);
0057 }
0058 static void postModuleSignal(ActivityRegistry* activityRegistry,
0059 StreamContext const* streamContext,
0060 ModuleCallingContext const* moduleCallingContext) {
0061 activityRegistry->postModuleBeginStreamSignal_(*streamContext, *moduleCallingContext);
0062 }
0063 };
0064
0065 class ModuleEndStreamTraits {
0066 public:
0067 using Context = StreamContext;
0068 static void preModuleSignal(ActivityRegistry* activityRegistry,
0069 StreamContext const* streamContext,
0070 ModuleCallingContext const* moduleCallingContext) {
0071 activityRegistry->preModuleEndStreamSignal_(*streamContext, *moduleCallingContext);
0072 }
0073 static void postModuleSignal(ActivityRegistry* activityRegistry,
0074 StreamContext const* streamContext,
0075 ModuleCallingContext const* moduleCallingContext) {
0076 activityRegistry->postModuleEndStreamSignal_(*streamContext, *moduleCallingContext);
0077 }
0078 };
0079
0080 }
0081
0082 Worker::Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions)
0083 : timesRun_(0),
0084 timesVisited_(0),
0085 timesPassed_(0),
0086 timesFailed_(0),
0087 timesExcept_(0),
0088 state_(Ready),
0089 numberOfPathsOn_(0),
0090 numberOfPathsLeftToRun_(0),
0091 moduleCallingContext_(&iMD),
0092 actions_(iActions),
0093 cached_exception_(),
0094 actReg_(),
0095 earlyDeleteHelper_(nullptr),
0096 workStarted_(false),
0097 ranAcquireWithoutException_(false) {
0098 checkForShouldTryToContinue(iMD);
0099 }
0100
0101 Worker::~Worker() {}
0102
0103 void Worker::setActivityRegistry(std::shared_ptr<ActivityRegistry> areg) { actReg_ = areg; }
0104
0105 void Worker::checkForShouldTryToContinue(ModuleDescription const& iDesc) {
0106 auto pset = edm::pset::Registry::instance()->getMapped(iDesc.parameterSetID());
0107 if (pset and pset->exists("@shouldTryToContinue")) {
0108 shouldTryToContinue_ = true;
0109 }
0110 }
0111
0112 bool Worker::shouldRethrowException(std::exception_ptr iPtr,
0113 ParentContext const& parentContext,
0114 bool isEvent,
0115 bool shouldTryToContinue) const noexcept {
0116
0117
0118
0119
0120
0121
0122 if (not isEvent) {
0123 return true;
0124 }
0125 try {
0126 convertException::wrap([&]() { std::rethrow_exception(iPtr); });
0127 } catch (cms::Exception& ex) {
0128 exception_actions::ActionCodes action = actions_->find(ex.category());
0129
0130 if (action == exception_actions::Rethrow) {
0131 return true;
0132 }
0133 if (action == exception_actions::TryToContinue) {
0134 if (shouldTryToContinue) {
0135 edm::printCmsExceptionWarning("TryToContinue", ex);
0136 }
0137 return not shouldTryToContinue;
0138 }
0139 if (action == exception_actions::IgnoreCompletely) {
0140 edm::printCmsExceptionWarning("IgnoreCompletely", ex);
0141 return false;
0142 }
0143 }
0144 return true;
0145 }
0146
0147 void Worker::prePrefetchSelectionAsync(oneapi::tbb::task_group& group,
0148 WaitingTask* successTask,
0149 ServiceToken const& token,
0150 StreamID id,
0151 EventPrincipal const* iPrincipal) noexcept {
0152 successTask->increment_ref_count();
0153
0154 ServiceWeakToken weakToken = token;
0155 auto choiceTask =
0156 edm::make_waiting_task([id, successTask, iPrincipal, this, weakToken, &group](std::exception_ptr const*) {
0157 ServiceRegistry::Operate guard(weakToken.lock());
0158 try {
0159 bool selected = convertException::wrap([&]() {
0160 if (not implDoPrePrefetchSelection(id, *iPrincipal, &moduleCallingContext_)) {
0161 timesRun_.fetch_add(1, std::memory_order_relaxed);
0162 setPassed<true>();
0163 waitingTasks_.doneWaiting(nullptr);
0164
0165 if (0 == successTask->decrement_ref_count()) {
0166 TaskSentry s(successTask);
0167 }
0168 return false;
0169 }
0170 return true;
0171 });
0172 if (not selected) {
0173 return;
0174 }
0175
0176 } catch (cms::Exception& e) {
0177 edm::exceptionContext(e, moduleCallingContext_);
0178 setException<true>(std::current_exception());
0179 waitingTasks_.doneWaiting(std::current_exception());
0180
0181 if (0 == successTask->decrement_ref_count()) {
0182 TaskSentry s(successTask);
0183 }
0184 return;
0185 }
0186 if (0 == successTask->decrement_ref_count()) {
0187 group.run([successTask]() {
0188 TaskSentry s(successTask);
0189 successTask->execute();
0190 });
0191 }
0192 });
0193
0194 WaitingTaskHolder choiceHolder{group, choiceTask};
0195
0196 std::vector<ProductResolverIndexAndSkipBit> items;
0197 itemsToGetForSelection(items);
0198
0199 for (auto const& item : items) {
0200 ProductResolverIndex productResolverIndex = item.productResolverIndex();
0201 bool skipCurrentProcess = item.skipCurrentProcess();
0202 if (productResolverIndex != ProductResolverIndexAmbiguous and
0203 productResolverIndex != ProductResolverIndexInvalid) {
0204 iPrincipal->prefetchAsync(
0205 choiceHolder, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
0206 }
0207 }
0208 choiceHolder.doneWaiting(std::exception_ptr{});
0209 }
0210
0211 void Worker::esPrefetchAsync(WaitingTaskHolder iTask,
0212 EventSetupImpl const& iImpl,
0213 Transition iTrans,
0214 ServiceToken const& iToken) noexcept {
0215 if (iTrans >= edm::Transition::NumberOfEventSetupTransitions) {
0216 return;
0217 }
0218 auto const& recs = esRecordsToGetFrom(iTrans);
0219 auto const& items = esItemsToGetFrom(iTrans);
0220
0221 assert(items.size() == recs.size());
0222 if (items.empty()) {
0223 return;
0224 }
0225
0226 for (size_t i = 0; i != items.size(); ++i) {
0227 if (recs[i] != ESRecordIndex{}) {
0228 auto rec = iImpl.findImpl(recs[i]);
0229 if (rec) {
0230 rec->prefetchAsync(iTask, items[i], &iImpl, iToken, ESParentContext(&moduleCallingContext_));
0231 }
0232 }
0233 }
0234 }
0235
0236 void Worker::edPrefetchAsync(WaitingTaskHolder iTask,
0237 ServiceToken const& token,
0238 Principal const& iPrincipal) const noexcept {
0239
0240 std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
0241
0242 for (auto const& item : items) {
0243 ProductResolverIndex productResolverIndex = item.productResolverIndex();
0244 bool skipCurrentProcess = item.skipCurrentProcess();
0245 if (productResolverIndex != ProductResolverIndexAmbiguous) {
0246 iPrincipal.prefetchAsync(iTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
0247 }
0248 }
0249 }
0250
0251 void Worker::setEarlyDeleteHelper(EarlyDeleteHelper* iHelper) { earlyDeleteHelper_ = iHelper; }
0252
0253 size_t Worker::transformIndex(edm::ProductDescription const&) const noexcept { return -1; }
0254 void Worker::doTransformAsync(WaitingTaskHolder iTask,
0255 size_t iTransformIndex,
0256 EventPrincipal const& iPrincipal,
0257 ServiceToken const& iToken,
0258 StreamID,
0259 ModuleCallingContext const& mcc,
0260 StreamContext const*) noexcept {
0261 ServiceWeakToken weakToken = iToken;
0262
0263
0264 auto task = make_waiting_task(
0265 [this, iTask, weakToken, &iPrincipal, iTransformIndex, mcc](std::exception_ptr const* iExcept) mutable {
0266
0267 actReg_->postModuleTransformPrefetchingSignal_.emit(*mcc.getStreamContext(), mcc);
0268 if (iExcept) {
0269 iTask.doneWaiting(*iExcept);
0270 return;
0271 }
0272 implDoTransformAsync(iTask, iTransformIndex, iPrincipal, mcc.parent(), weakToken);
0273 });
0274
0275
0276 actReg_->preModuleTransformPrefetchingSignal_.emit(*mcc.getStreamContext(), mcc);
0277 iPrincipal.prefetchAsync(
0278 WaitingTaskHolder(*iTask.group(), task), itemToGetForTransform(iTransformIndex), false, iToken, &mcc);
0279 }
0280
0281 void Worker::resetModuleDescription(ModuleDescription const* iDesc) {
0282 ModuleCallingContext temp(iDesc,
0283 0,
0284 moduleCallingContext_.state(),
0285 moduleCallingContext_.parent(),
0286 moduleCallingContext_.previousModuleOnThread());
0287 moduleCallingContext_ = temp;
0288 assert(iDesc);
0289 checkForShouldTryToContinue(*iDesc);
0290 }
0291
0292 void Worker::beginJob(GlobalContext const& globalContext) {
0293 ParentContext parentContext(&globalContext);
0294 ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0295 ModuleSignalSentry<ModuleBeginJobTraits> sentry(activityRegistry(), &globalContext, &moduleCallingContext_);
0296
0297 try {
0298 convertException::wrap([this, &sentry]() {
0299 beginSucceeded_ = false;
0300 sentry.preModuleSignal();
0301 implBeginJob();
0302 sentry.postModuleSignal();
0303 beginSucceeded_ = true;
0304 });
0305 } catch (cms::Exception& ex) {
0306 exceptionContext(ex, moduleCallingContext_);
0307 throw;
0308 }
0309 }
0310
0311 void Worker::endJob(GlobalContext const& globalContext) {
0312 if (beginSucceeded_) {
0313 beginSucceeded_ = false;
0314
0315 ParentContext parentContext(&globalContext);
0316 ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0317 ModuleSignalSentry<ModuleEndJobTraits> sentry(activityRegistry(), &globalContext, &moduleCallingContext_);
0318
0319 try {
0320 convertException::wrap([this, &sentry]() {
0321 sentry.preModuleSignal();
0322 implEndJob();
0323 sentry.postModuleSignal();
0324 });
0325 } catch (cms::Exception& ex) {
0326 exceptionContext(ex, moduleCallingContext_);
0327 throw;
0328 }
0329 }
0330 }
0331
0332 void Worker::beginStream(StreamID streamID, StreamContext const& streamContext) {
0333 ParentContext parentContext(&streamContext);
0334 ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0335 ModuleSignalSentry<ModuleBeginStreamTraits> sentry(activityRegistry(), &streamContext, &moduleCallingContext_);
0336
0337 try {
0338 convertException::wrap([this, &sentry, streamID]() {
0339 beginSucceeded_ = false;
0340 sentry.preModuleSignal();
0341 implBeginStream(streamID);
0342 sentry.postModuleSignal();
0343 beginSucceeded_ = true;
0344 });
0345 } catch (cms::Exception& ex) {
0346 exceptionContext(ex, moduleCallingContext_);
0347 throw;
0348 }
0349 }
0350
0351 void Worker::endStream(StreamID id, StreamContext const& streamContext) {
0352 if (beginSucceeded_) {
0353 beginSucceeded_ = false;
0354
0355 ParentContext parentContext(&streamContext);
0356 ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0357 ModuleSignalSentry<ModuleEndStreamTraits> sentry(activityRegistry(), &streamContext, &moduleCallingContext_);
0358
0359 try {
0360 convertException::wrap([this, &sentry, id]() {
0361 sentry.preModuleSignal();
0362 implEndStream(id);
0363 sentry.postModuleSignal();
0364 });
0365 } catch (cms::Exception& ex) {
0366 exceptionContext(ex, moduleCallingContext_);
0367 throw;
0368 }
0369 }
0370 }
0371
0372 void Worker::registerThinnedAssociations(ProductRegistry const& registry, ThinnedAssociationsHelper& helper) {
0373 try {
0374 implRegisterThinnedAssociations(registry, helper);
0375 } catch (cms::Exception& ex) {
0376 ex.addContext("Calling registerThinnedAssociations() for module " + description()->moduleLabel());
0377 throw ex;
0378 }
0379 }
0380
0381 void Worker::skipOnPath(EventPrincipal const& iEvent) {
0382 if (earlyDeleteHelper_) {
0383 earlyDeleteHelper_->pathFinished(iEvent);
0384 }
0385 if (0 == --numberOfPathsLeftToRun_) {
0386 waitingTasks_.doneWaiting(cached_exception_);
0387 }
0388 }
0389
0390 void Worker::postDoEvent(EventPrincipal const& iEvent) {
0391 if (earlyDeleteHelper_) {
0392 earlyDeleteHelper_->moduleRan(iEvent);
0393 }
0394 }
0395
0396 void Worker::runAcquire(EventTransitionInfo const& info,
0397 ParentContext const& parentContext,
0398 WaitingTaskHolder holder) {
0399 ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0400 try {
0401 convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, std::move(holder)); });
0402 } catch (cms::Exception& ex) {
0403 edm::exceptionContext(ex, moduleCallingContext_);
0404 if (shouldRethrowException(std::current_exception(), parentContext, true, shouldTryToContinue_)) {
0405 timesRun_.fetch_add(1, std::memory_order_relaxed);
0406 throw;
0407 }
0408 }
0409 }
0410
0411 void Worker::runAcquireAfterAsyncPrefetch(std::exception_ptr iEPtr,
0412 EventTransitionInfo const& eventTransitionInfo,
0413 ParentContext const& parentContext,
0414 WaitingTaskHolder holder) noexcept {
0415 ranAcquireWithoutException_ = false;
0416 std::exception_ptr exceptionPtr;
0417 if (iEPtr) {
0418 if (shouldRethrowException(iEPtr, parentContext, true, shouldTryToContinue_)) {
0419 exceptionPtr = iEPtr;
0420 }
0421 moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
0422 } else {
0423
0424 CMS_SA_ALLOW try {
0425
0426
0427 runAcquire(eventTransitionInfo, parentContext, holder);
0428 ranAcquireWithoutException_ = true;
0429 } catch (...) {
0430 exceptionPtr = std::current_exception();
0431 }
0432 }
0433
0434 holder.doneWaiting(exceptionPtr);
0435 }
0436
0437 std::exception_ptr Worker::handleExternalWorkException(std::exception_ptr iEPtr,
0438 ParentContext const& parentContext) noexcept {
0439 if (ranAcquireWithoutException_) {
0440 try {
0441 convertException::wrap([iEPtr]() { std::rethrow_exception(iEPtr); });
0442 } catch (cms::Exception& ex) {
0443 ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0444 edm::exceptionContext(ex, moduleCallingContext_);
0445 return std::current_exception();
0446 }
0447 }
0448 return iEPtr;
0449 }
0450
0451 Worker::HandleExternalWorkExceptionTask::HandleExternalWorkExceptionTask(Worker* worker,
0452 oneapi::tbb::task_group* group,
0453 WaitingTask* runModuleTask,
0454 ParentContext const& parentContext) noexcept
0455 : m_worker(worker), m_runModuleTask(runModuleTask), m_group(group), m_parentContext(parentContext) {}
0456
0457 void Worker::HandleExternalWorkExceptionTask::execute() {
0458 auto excptr = exceptionPtr();
0459 WaitingTaskHolder holder(*m_group, m_runModuleTask);
0460 if (excptr) {
0461 holder.doneWaiting(m_worker->handleExternalWorkException(excptr, m_parentContext));
0462 }
0463 }
0464 }