File indexing completed on 2023-03-17 11:02:24
0001
0002
0003
0004 #include "FWCore/Concurrency/interface/include_first_syncWait.h"
0005 #include "FWCore/Framework/interface/maker/Worker.h"
0006 #include "FWCore/Framework/interface/EarlyDeleteHelper.h"
0007 #include "FWCore/Framework/interface/EventPrincipal.h"
0008 #include "FWCore/Framework/interface/EventSetupImpl.h"
0009 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0010 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0011 #include "FWCore/Framework/interface/RunPrincipal.h"
0012 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0013 #include "FWCore/ServiceRegistry/interface/ESParentContext.h"
0014 #include "FWCore/Concurrency/interface/WaitingTask.h"
0015 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0016
0017 namespace edm {
0018 namespace {
0019 class ModuleBeginJobSignalSentry {
0020 public:
0021 ModuleBeginJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md) : a_(a), md_(&md) {
0022 if (a_)
0023 a_->preModuleBeginJobSignal_(*md_);
0024 }
0025 ~ModuleBeginJobSignalSentry() {
0026 if (a_)
0027 a_->postModuleBeginJobSignal_(*md_);
0028 }
0029
0030 private:
0031 ActivityRegistry* a_;
0032 ModuleDescription const* md_;
0033 };
0034
0035 class ModuleEndJobSignalSentry {
0036 public:
0037 ModuleEndJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md) : a_(a), md_(&md) {
0038 if (a_)
0039 a_->preModuleEndJobSignal_(*md_);
0040 }
0041 ~ModuleEndJobSignalSentry() {
0042 if (a_)
0043 a_->postModuleEndJobSignal_(*md_);
0044 }
0045
0046 private:
0047 ActivityRegistry* a_;
0048 ModuleDescription const* md_;
0049 };
0050
0051 class ModuleBeginStreamSignalSentry {
0052 public:
0053 ModuleBeginStreamSignalSentry(ActivityRegistry* a, StreamContext const& sc, ModuleCallingContext const& mcc)
0054 : a_(a), sc_(sc), mcc_(mcc) {
0055 if (a_)
0056 a_->preModuleBeginStreamSignal_(sc_, mcc_);
0057 }
0058 ~ModuleBeginStreamSignalSentry() {
0059 if (a_)
0060 a_->postModuleBeginStreamSignal_(sc_, mcc_);
0061 }
0062
0063 private:
0064 ActivityRegistry* a_;
0065 StreamContext const& sc_;
0066 ModuleCallingContext const& mcc_;
0067 };
0068
0069 class ModuleEndStreamSignalSentry {
0070 public:
0071 ModuleEndStreamSignalSentry(ActivityRegistry* a, StreamContext const& sc, ModuleCallingContext const& mcc)
0072 : a_(a), sc_(sc), mcc_(mcc) {
0073 if (a_)
0074 a_->preModuleEndStreamSignal_(sc_, mcc_);
0075 }
0076 ~ModuleEndStreamSignalSentry() {
0077 if (a_)
0078 a_->postModuleEndStreamSignal_(sc_, mcc_);
0079 }
0080
0081 private:
0082 ActivityRegistry* a_;
0083 StreamContext const& sc_;
0084 ModuleCallingContext const& mcc_;
0085 };
0086
0087 }
0088
0089 Worker::Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions)
0090 : timesRun_(0),
0091 timesVisited_(0),
0092 timesPassed_(0),
0093 timesFailed_(0),
0094 timesExcept_(0),
0095 state_(Ready),
0096 numberOfPathsOn_(0),
0097 numberOfPathsLeftToRun_(0),
0098 moduleCallingContext_(&iMD),
0099 actions_(iActions),
0100 cached_exception_(),
0101 actReg_(),
0102 earlyDeleteHelper_(nullptr),
0103 workStarted_(false),
0104 ranAcquireWithoutException_(false) {}
0105
0106 Worker::~Worker() {}
0107
0108 void Worker::setActivityRegistry(std::shared_ptr<ActivityRegistry> areg) { actReg_ = areg; }
0109
0110 bool Worker::shouldRethrowException(std::exception_ptr iPtr, ParentContext const& parentContext, bool isEvent) const {
0111
0112
0113
0114
0115
0116
0117 if (not isEvent) {
0118 return true;
0119 }
0120 try {
0121 convertException::wrap([&]() { std::rethrow_exception(iPtr); });
0122 } catch (cms::Exception& ex) {
0123 exception_actions::ActionCodes action = actions_->find(ex.category());
0124
0125 if (action == exception_actions::Rethrow) {
0126 return true;
0127 }
0128
0129 ModuleCallingContext tempContext(description(), ModuleCallingContext::State::kInvalid, parentContext, nullptr);
0130
0131
0132
0133
0134 ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
0135 if (top_mcc->type() == ParentContext::Type::kPlaceInPath &&
0136 top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
0137 if ((action == exception_actions::SkipEvent && tempContext.type() == ParentContext::Type::kPlaceInPath) ||
0138 action == exception_actions::FailPath) {
0139 action = exception_actions::IgnoreCompletely;
0140 }
0141 }
0142 if (action == exception_actions::IgnoreCompletely) {
0143 edm::printCmsExceptionWarning("IgnoreCompletely", ex);
0144 return false;
0145 }
0146 }
0147 return true;
0148 }
0149
0150 void Worker::prePrefetchSelectionAsync(oneapi::tbb::task_group& group,
0151 WaitingTask* successTask,
0152 ServiceToken const& token,
0153 StreamID id,
0154 EventPrincipal const* iPrincipal) {
0155 successTask->increment_ref_count();
0156
0157 ServiceWeakToken weakToken = token;
0158 auto choiceTask =
0159 edm::make_waiting_task([id, successTask, iPrincipal, this, weakToken, &group](std::exception_ptr const*) {
0160 ServiceRegistry::Operate guard(weakToken.lock());
0161
0162 CMS_SA_ALLOW try {
0163 if (not implDoPrePrefetchSelection(id, *iPrincipal, &moduleCallingContext_)) {
0164 timesRun_.fetch_add(1, std::memory_order_relaxed);
0165 setPassed<true>();
0166 waitingTasks_.doneWaiting(nullptr);
0167
0168 if (0 == successTask->decrement_ref_count()) {
0169 TaskSentry s(successTask);
0170 }
0171 return;
0172 }
0173 } catch (...) {
0174 }
0175 if (0 == successTask->decrement_ref_count()) {
0176 group.run([successTask]() {
0177 TaskSentry s(successTask);
0178 successTask->execute();
0179 });
0180 }
0181 });
0182
0183 WaitingTaskHolder choiceHolder{group, choiceTask};
0184
0185 std::vector<ProductResolverIndexAndSkipBit> items;
0186 itemsToGetForSelection(items);
0187
0188 for (auto const& item : items) {
0189 ProductResolverIndex productResolverIndex = item.productResolverIndex();
0190 bool skipCurrentProcess = item.skipCurrentProcess();
0191 if (productResolverIndex != ProductResolverIndexAmbiguous) {
0192 iPrincipal->prefetchAsync(
0193 choiceHolder, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
0194 }
0195 }
0196 choiceHolder.doneWaiting(std::exception_ptr{});
0197 }
0198
0199 void Worker::esPrefetchAsync(WaitingTaskHolder iTask,
0200 EventSetupImpl const& iImpl,
0201 Transition iTrans,
0202 ServiceToken const& iToken) {
0203 if (iTrans >= edm::Transition::NumberOfEventSetupTransitions) {
0204 return;
0205 }
0206 auto const& recs = esRecordsToGetFrom(iTrans);
0207 auto const& items = esItemsToGetFrom(iTrans);
0208
0209 assert(items.size() == recs.size());
0210 if (items.empty()) {
0211 return;
0212 }
0213
0214 for (size_t i = 0; i != items.size(); ++i) {
0215 if (recs[i] != ESRecordIndex{}) {
0216 auto rec = iImpl.findImpl(recs[i]);
0217 if (rec) {
0218 rec->prefetchAsync(iTask, items[i], &iImpl, iToken, ESParentContext(&moduleCallingContext_));
0219 }
0220 }
0221 }
0222 }
0223
0224 void Worker::edPrefetchAsync(WaitingTaskHolder iTask, ServiceToken const& token, Principal const& iPrincipal) const {
0225
0226 std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
0227
0228 for (auto const& item : items) {
0229 ProductResolverIndex productResolverIndex = item.productResolverIndex();
0230 bool skipCurrentProcess = item.skipCurrentProcess();
0231 if (productResolverIndex != ProductResolverIndexAmbiguous) {
0232 iPrincipal.prefetchAsync(iTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
0233 }
0234 }
0235 }
0236
0237 void Worker::setEarlyDeleteHelper(EarlyDeleteHelper* iHelper) { earlyDeleteHelper_ = iHelper; }
0238
0239 size_t Worker::transformIndex(edm::BranchDescription const&) const { return -1; }
0240 void Worker::doTransformAsync(WaitingTaskHolder iTask,
0241 size_t iTransformIndex,
0242 EventPrincipal const& iPrincipal,
0243 ServiceToken const& iToken,
0244 StreamID,
0245 ModuleCallingContext const& mcc,
0246 StreamContext const*) {
0247 ServiceWeakToken weakToken = iToken;
0248
0249
0250 auto task = make_waiting_task([this, iTask, weakToken, &iPrincipal, iTransformIndex, parent = mcc.parent()](
0251 std::exception_ptr const* iExcept) mutable {
0252 if (iExcept) {
0253 iTask.doneWaiting(*iExcept);
0254 return;
0255 }
0256 implDoTransformAsync(iTask, iTransformIndex, iPrincipal, parent, weakToken);
0257 });
0258
0259
0260
0261 iPrincipal.prefetchAsync(
0262 WaitingTaskHolder(*iTask.group(), task), itemToGetForTransform(iTransformIndex), false, iToken, &mcc);
0263 }
0264
0265 void Worker::resetModuleDescription(ModuleDescription const* iDesc) {
0266 ModuleCallingContext temp(iDesc,
0267 moduleCallingContext_.state(),
0268 moduleCallingContext_.parent(),
0269 moduleCallingContext_.previousModuleOnThread());
0270 moduleCallingContext_ = temp;
0271 }
0272
0273 void Worker::beginJob() {
0274 try {
0275 convertException::wrap([&]() {
0276 ModuleBeginJobSignalSentry cpp(actReg_.get(), *description());
0277 implBeginJob();
0278 });
0279 } catch (cms::Exception& ex) {
0280 state_ = Exception;
0281 std::ostringstream ost;
0282 ost << "Calling beginJob for module " << description()->moduleName() << "/'" << description()->moduleLabel()
0283 << "'";
0284 ex.addContext(ost.str());
0285 throw;
0286 }
0287 }
0288
0289 void Worker::endJob() {
0290 try {
0291 convertException::wrap([&]() {
0292 ModuleDescription const* desc = description();
0293 assert(desc != nullptr);
0294 ModuleEndJobSignalSentry cpp(actReg_.get(), *desc);
0295 implEndJob();
0296 });
0297 } catch (cms::Exception& ex) {
0298 state_ = Exception;
0299 std::ostringstream ost;
0300 ost << "Calling endJob for module " << description()->moduleName() << "/'" << description()->moduleLabel() << "'";
0301 ex.addContext(ost.str());
0302 throw;
0303 }
0304 }
0305
0306 void Worker::beginStream(StreamID id, StreamContext& streamContext) {
0307 try {
0308 convertException::wrap([&]() {
0309 streamContext.setTransition(StreamContext::Transition::kBeginStream);
0310 streamContext.setEventID(EventID(0, 0, 0));
0311 streamContext.setRunIndex(RunIndex::invalidRunIndex());
0312 streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
0313 streamContext.setTimestamp(Timestamp());
0314 ParentContext parentContext(&streamContext);
0315 ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0316 moduleCallingContext_.setState(ModuleCallingContext::State::kRunning);
0317 ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
0318 implBeginStream(id);
0319 });
0320 } catch (cms::Exception& ex) {
0321 state_ = Exception;
0322 std::ostringstream ost;
0323 ost << "Calling beginStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
0324 << "'";
0325 ex.addContext(ost.str());
0326 throw;
0327 }
0328 }
0329
0330 void Worker::endStream(StreamID id, StreamContext& streamContext) {
0331 try {
0332 convertException::wrap([&]() {
0333 streamContext.setTransition(StreamContext::Transition::kEndStream);
0334 streamContext.setEventID(EventID(0, 0, 0));
0335 streamContext.setRunIndex(RunIndex::invalidRunIndex());
0336 streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
0337 streamContext.setTimestamp(Timestamp());
0338 ParentContext parentContext(&streamContext);
0339 ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0340 moduleCallingContext_.setState(ModuleCallingContext::State::kRunning);
0341 ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
0342 implEndStream(id);
0343 });
0344 } catch (cms::Exception& ex) {
0345 state_ = Exception;
0346 std::ostringstream ost;
0347 ost << "Calling endStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
0348 << "'";
0349 ex.addContext(ost.str());
0350 throw;
0351 }
0352 }
0353
0354 void Worker::registerThinnedAssociations(ProductRegistry const& registry, ThinnedAssociationsHelper& helper) {
0355 try {
0356 implRegisterThinnedAssociations(registry, helper);
0357 } catch (cms::Exception& ex) {
0358 ex.addContext("Calling registerThinnedAssociations() for module " + description()->moduleLabel());
0359 throw ex;
0360 }
0361 }
0362
0363 void Worker::skipOnPath(EventPrincipal const& iEvent) {
0364 if (earlyDeleteHelper_) {
0365 earlyDeleteHelper_->pathFinished(iEvent);
0366 }
0367 if (0 == --numberOfPathsLeftToRun_) {
0368 waitingTasks_.doneWaiting(cached_exception_);
0369 }
0370 }
0371
0372 void Worker::postDoEvent(EventPrincipal const& iEvent) {
0373 if (earlyDeleteHelper_) {
0374 earlyDeleteHelper_->moduleRan(iEvent);
0375 }
0376 }
0377
0378 void Worker::runAcquire(EventTransitionInfo const& info,
0379 ParentContext const& parentContext,
0380 WaitingTaskWithArenaHolder& holder) {
0381 ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0382 try {
0383 convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, holder); });
0384 } catch (cms::Exception& ex) {
0385 edm::exceptionContext(ex, moduleCallingContext_);
0386 if (shouldRethrowException(std::current_exception(), parentContext, true)) {
0387 timesRun_.fetch_add(1, std::memory_order_relaxed);
0388 throw;
0389 }
0390 }
0391 }
0392
0393 void Worker::runAcquireAfterAsyncPrefetch(std::exception_ptr iEPtr,
0394 EventTransitionInfo const& eventTransitionInfo,
0395 ParentContext const& parentContext,
0396 WaitingTaskWithArenaHolder holder) {
0397 ranAcquireWithoutException_ = false;
0398 std::exception_ptr exceptionPtr;
0399 if (iEPtr) {
0400 if (shouldRethrowException(iEPtr, parentContext, true)) {
0401 exceptionPtr = iEPtr;
0402 }
0403 moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
0404 } else {
0405
0406 CMS_SA_ALLOW try {
0407 runAcquire(eventTransitionInfo, parentContext, holder);
0408 ranAcquireWithoutException_ = true;
0409 } catch (...) {
0410 exceptionPtr = std::current_exception();
0411 }
0412 }
0413
0414 holder.doneWaiting(exceptionPtr);
0415 }
0416
0417 std::exception_ptr Worker::handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const& parentContext) {
0418 if (ranAcquireWithoutException_) {
0419 try {
0420 convertException::wrap([iEPtr]() { std::rethrow_exception(iEPtr); });
0421 } catch (cms::Exception& ex) {
0422 ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0423 edm::exceptionContext(ex, moduleCallingContext_);
0424 return std::current_exception();
0425 }
0426 }
0427 return iEPtr;
0428 }
0429
0430 Worker::HandleExternalWorkExceptionTask::HandleExternalWorkExceptionTask(Worker* worker,
0431 oneapi::tbb::task_group* group,
0432 WaitingTask* runModuleTask,
0433 ParentContext const& parentContext)
0434 : m_worker(worker), m_runModuleTask(runModuleTask), m_group(group), m_parentContext(parentContext) {}
0435
0436 void Worker::HandleExternalWorkExceptionTask::execute() {
0437 auto excptr = exceptionPtr();
0438 WaitingTaskHolder holder(*m_group, m_runModuleTask);
0439 if (excptr) {
0440 holder.doneWaiting(m_worker->handleExternalWorkException(excptr, m_parentContext));
0441 }
0442 }
0443 }