File indexing completed on 2021-12-24 02:18:45
0001
0002
0003
0004 #include "FWCore/Concurrency/interface/include_first_syncWait.h"
0005 #include "FWCore/Framework/interface/maker/Worker.h"
0006 #include "FWCore/Framework/interface/EarlyDeleteHelper.h"
0007 #include "FWCore/Framework/interface/EventPrincipal.h"
0008 #include "FWCore/Framework/interface/EventSetupImpl.h"
0009 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0010 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0011 #include "FWCore/Framework/interface/RunPrincipal.h"
0012 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0013 #include "FWCore/ServiceRegistry/interface/ESParentContext.h"
0014 #include "FWCore/Concurrency/interface/WaitingTask.h"
0015 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0016
0017 namespace edm {
0018 namespace {
0019 class ModuleBeginJobSignalSentry {
0020 public:
0021 ModuleBeginJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md) : a_(a), md_(&md) {
0022 if (a_)
0023 a_->preModuleBeginJobSignal_(*md_);
0024 }
0025 ~ModuleBeginJobSignalSentry() {
0026 if (a_)
0027 a_->postModuleBeginJobSignal_(*md_);
0028 }
0029
0030 private:
0031 ActivityRegistry* a_;
0032 ModuleDescription const* md_;
0033 };
0034
0035 class ModuleEndJobSignalSentry {
0036 public:
0037 ModuleEndJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md) : a_(a), md_(&md) {
0038 if (a_)
0039 a_->preModuleEndJobSignal_(*md_);
0040 }
0041 ~ModuleEndJobSignalSentry() {
0042 if (a_)
0043 a_->postModuleEndJobSignal_(*md_);
0044 }
0045
0046 private:
0047 ActivityRegistry* a_;
0048 ModuleDescription const* md_;
0049 };
0050
0051 class ModuleBeginStreamSignalSentry {
0052 public:
0053 ModuleBeginStreamSignalSentry(ActivityRegistry* a, StreamContext const& sc, ModuleCallingContext const& mcc)
0054 : a_(a), sc_(sc), mcc_(mcc) {
0055 if (a_)
0056 a_->preModuleBeginStreamSignal_(sc_, mcc_);
0057 }
0058 ~ModuleBeginStreamSignalSentry() {
0059 if (a_)
0060 a_->postModuleBeginStreamSignal_(sc_, mcc_);
0061 }
0062
0063 private:
0064 ActivityRegistry* a_;
0065 StreamContext const& sc_;
0066 ModuleCallingContext const& mcc_;
0067 };
0068
0069 class ModuleEndStreamSignalSentry {
0070 public:
0071 ModuleEndStreamSignalSentry(ActivityRegistry* a, StreamContext const& sc, ModuleCallingContext const& mcc)
0072 : a_(a), sc_(sc), mcc_(mcc) {
0073 if (a_)
0074 a_->preModuleEndStreamSignal_(sc_, mcc_);
0075 }
0076 ~ModuleEndStreamSignalSentry() {
0077 if (a_)
0078 a_->postModuleEndStreamSignal_(sc_, mcc_);
0079 }
0080
0081 private:
0082 ActivityRegistry* a_;
0083 StreamContext const& sc_;
0084 ModuleCallingContext const& mcc_;
0085 };
0086
0087 }
0088
0089 Worker::Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions)
0090 : timesRun_(0),
0091 timesVisited_(0),
0092 timesPassed_(0),
0093 timesFailed_(0),
0094 timesExcept_(0),
0095 state_(Ready),
0096 numberOfPathsOn_(0),
0097 numberOfPathsLeftToRun_(0),
0098 moduleCallingContext_(&iMD),
0099 actions_(iActions),
0100 cached_exception_(),
0101 actReg_(),
0102 earlyDeleteHelper_(nullptr),
0103 workStarted_(false),
0104 ranAcquireWithoutException_(false) {}
0105
0106 Worker::~Worker() {}
0107
0108 void Worker::setActivityRegistry(std::shared_ptr<ActivityRegistry> areg) { actReg_ = areg; }
0109
0110 bool Worker::shouldRethrowException(std::exception_ptr iPtr, ParentContext const& parentContext, bool isEvent) const {
0111
0112
0113
0114
0115
0116
0117 if (not isEvent) {
0118 return true;
0119 }
0120 try {
0121 convertException::wrap([&]() { std::rethrow_exception(iPtr); });
0122 } catch (cms::Exception& ex) {
0123 exception_actions::ActionCodes action = actions_->find(ex.category());
0124
0125 if (action == exception_actions::Rethrow) {
0126 return true;
0127 }
0128
0129 ModuleCallingContext tempContext(description(), ModuleCallingContext::State::kInvalid, parentContext, nullptr);
0130
0131
0132
0133
0134 ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
0135 if (top_mcc->type() == ParentContext::Type::kPlaceInPath &&
0136 top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
0137 if ((action == exception_actions::SkipEvent && tempContext.type() == ParentContext::Type::kPlaceInPath) ||
0138 action == exception_actions::FailPath) {
0139 action = exception_actions::IgnoreCompletely;
0140 }
0141 }
0142 if (action == exception_actions::IgnoreCompletely) {
0143 edm::printCmsExceptionWarning("IgnoreCompletely", ex);
0144 return false;
0145 }
0146 }
0147 return true;
0148 }
0149
0150 void Worker::prePrefetchSelectionAsync(oneapi::tbb::task_group& group,
0151 WaitingTask* successTask,
0152 ServiceToken const& token,
0153 StreamID id,
0154 EventPrincipal const* iPrincipal) {
0155 successTask->increment_ref_count();
0156
0157 ServiceWeakToken weakToken = token;
0158 auto choiceTask =
0159 edm::make_waiting_task([id, successTask, iPrincipal, this, weakToken, &group](std::exception_ptr const*) {
0160 ServiceRegistry::Operate guard(weakToken.lock());
0161
0162 CMS_SA_ALLOW try {
0163 if (not implDoPrePrefetchSelection(id, *iPrincipal, &moduleCallingContext_)) {
0164 timesRun_.fetch_add(1, std::memory_order_relaxed);
0165 setPassed<true>();
0166 waitingTasks_.doneWaiting(nullptr);
0167
0168 if (0 == successTask->decrement_ref_count()) {
0169 TaskSentry s(successTask);
0170 }
0171 return;
0172 }
0173 } catch (...) {
0174 }
0175 if (0 == successTask->decrement_ref_count()) {
0176 group.run([successTask]() {
0177 TaskSentry s(successTask);
0178 successTask->execute();
0179 });
0180 }
0181 });
0182
0183 WaitingTaskHolder choiceHolder{group, choiceTask};
0184
0185 std::vector<ProductResolverIndexAndSkipBit> items;
0186 itemsToGetForSelection(items);
0187
0188 for (auto const& item : items) {
0189 ProductResolverIndex productResolverIndex = item.productResolverIndex();
0190 bool skipCurrentProcess = item.skipCurrentProcess();
0191 if (productResolverIndex != ProductResolverIndexAmbiguous) {
0192 iPrincipal->prefetchAsync(
0193 choiceHolder, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
0194 }
0195 }
0196 choiceHolder.doneWaiting(std::exception_ptr{});
0197 }
0198
0199 void Worker::esPrefetchAsync(WaitingTaskHolder iTask,
0200 EventSetupImpl const& iImpl,
0201 Transition iTrans,
0202 ServiceToken const& iToken) {
0203 if (iTrans >= edm::Transition::NumberOfEventSetupTransitions) {
0204 return;
0205 }
0206 auto const& recs = esRecordsToGetFrom(iTrans);
0207 auto const& items = esItemsToGetFrom(iTrans);
0208
0209 assert(items.size() == recs.size());
0210 if (items.empty()) {
0211 return;
0212 }
0213
0214 for (size_t i = 0; i != items.size(); ++i) {
0215 if (recs[i] != ESRecordIndex{}) {
0216 auto rec = iImpl.findImpl(recs[i]);
0217 if (rec) {
0218 rec->prefetchAsync(iTask, items[i], &iImpl, iToken, ESParentContext(&moduleCallingContext_));
0219 }
0220 }
0221 }
0222 }
0223
0224 void Worker::edPrefetchAsync(WaitingTaskHolder iTask, ServiceToken const& token, Principal const& iPrincipal) const {
0225
0226 std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
0227
0228 for (auto const& item : items) {
0229 ProductResolverIndex productResolverIndex = item.productResolverIndex();
0230 bool skipCurrentProcess = item.skipCurrentProcess();
0231 if (productResolverIndex != ProductResolverIndexAmbiguous) {
0232 iPrincipal.prefetchAsync(iTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
0233 }
0234 }
0235 }
0236
0237 void Worker::setEarlyDeleteHelper(EarlyDeleteHelper* iHelper) { earlyDeleteHelper_ = iHelper; }
0238
0239 void Worker::resetModuleDescription(ModuleDescription const* iDesc) {
0240 ModuleCallingContext temp(iDesc,
0241 moduleCallingContext_.state(),
0242 moduleCallingContext_.parent(),
0243 moduleCallingContext_.previousModuleOnThread());
0244 moduleCallingContext_ = temp;
0245 }
0246
0247 void Worker::beginJob() {
0248 try {
0249 convertException::wrap([&]() {
0250 ModuleBeginJobSignalSentry cpp(actReg_.get(), *description());
0251 implBeginJob();
0252 });
0253 } catch (cms::Exception& ex) {
0254 state_ = Exception;
0255 std::ostringstream ost;
0256 ost << "Calling beginJob for module " << description()->moduleName() << "/'" << description()->moduleLabel()
0257 << "'";
0258 ex.addContext(ost.str());
0259 throw;
0260 }
0261 }
0262
0263 void Worker::endJob() {
0264 try {
0265 convertException::wrap([&]() {
0266 ModuleDescription const* desc = description();
0267 assert(desc != nullptr);
0268 ModuleEndJobSignalSentry cpp(actReg_.get(), *desc);
0269 implEndJob();
0270 });
0271 } catch (cms::Exception& ex) {
0272 state_ = Exception;
0273 std::ostringstream ost;
0274 ost << "Calling endJob for module " << description()->moduleName() << "/'" << description()->moduleLabel() << "'";
0275 ex.addContext(ost.str());
0276 throw;
0277 }
0278 }
0279
0280 void Worker::beginStream(StreamID id, StreamContext& streamContext) {
0281 try {
0282 convertException::wrap([&]() {
0283 streamContext.setTransition(StreamContext::Transition::kBeginStream);
0284 streamContext.setEventID(EventID(0, 0, 0));
0285 streamContext.setRunIndex(RunIndex::invalidRunIndex());
0286 streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
0287 streamContext.setTimestamp(Timestamp());
0288 ParentContext parentContext(&streamContext);
0289 ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0290 moduleCallingContext_.setState(ModuleCallingContext::State::kRunning);
0291 ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
0292 implBeginStream(id);
0293 });
0294 } catch (cms::Exception& ex) {
0295 state_ = Exception;
0296 std::ostringstream ost;
0297 ost << "Calling beginStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
0298 << "'";
0299 ex.addContext(ost.str());
0300 throw;
0301 }
0302 }
0303
0304 void Worker::endStream(StreamID id, StreamContext& streamContext) {
0305 try {
0306 convertException::wrap([&]() {
0307 streamContext.setTransition(StreamContext::Transition::kEndStream);
0308 streamContext.setEventID(EventID(0, 0, 0));
0309 streamContext.setRunIndex(RunIndex::invalidRunIndex());
0310 streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
0311 streamContext.setTimestamp(Timestamp());
0312 ParentContext parentContext(&streamContext);
0313 ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0314 moduleCallingContext_.setState(ModuleCallingContext::State::kRunning);
0315 ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
0316 implEndStream(id);
0317 });
0318 } catch (cms::Exception& ex) {
0319 state_ = Exception;
0320 std::ostringstream ost;
0321 ost << "Calling endStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
0322 << "'";
0323 ex.addContext(ost.str());
0324 throw;
0325 }
0326 }
0327
0328 void Worker::registerThinnedAssociations(ProductRegistry const& registry, ThinnedAssociationsHelper& helper) {
0329 try {
0330 implRegisterThinnedAssociations(registry, helper);
0331 } catch (cms::Exception& ex) {
0332 ex.addContext("Calling registerThinnedAssociations() for module " + description()->moduleLabel());
0333 throw ex;
0334 }
0335 }
0336
0337 void Worker::skipOnPath(EventPrincipal const& iEvent) {
0338 if (earlyDeleteHelper_) {
0339 earlyDeleteHelper_->pathFinished(iEvent);
0340 }
0341 if (0 == --numberOfPathsLeftToRun_) {
0342 waitingTasks_.doneWaiting(cached_exception_);
0343 }
0344 }
0345
0346 void Worker::postDoEvent(EventPrincipal const& iEvent) {
0347 if (earlyDeleteHelper_) {
0348 earlyDeleteHelper_->moduleRan(iEvent);
0349 }
0350 }
0351
0352 void Worker::runAcquire(EventTransitionInfo const& info,
0353 ParentContext const& parentContext,
0354 WaitingTaskWithArenaHolder& holder) {
0355 ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0356 try {
0357 convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, holder); });
0358 } catch (cms::Exception& ex) {
0359 edm::exceptionContext(ex, moduleCallingContext_);
0360 if (shouldRethrowException(std::current_exception(), parentContext, true)) {
0361 timesRun_.fetch_add(1, std::memory_order_relaxed);
0362 throw;
0363 }
0364 }
0365 }
0366
0367 void Worker::runAcquireAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
0368 EventTransitionInfo const& eventTransitionInfo,
0369 ParentContext const& parentContext,
0370 WaitingTaskWithArenaHolder holder) {
0371 ranAcquireWithoutException_ = false;
0372 std::exception_ptr exceptionPtr;
0373 if (iEPtr) {
0374 assert(*iEPtr);
0375 if (shouldRethrowException(*iEPtr, parentContext, true)) {
0376 exceptionPtr = *iEPtr;
0377 }
0378 moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
0379 } else {
0380
0381 CMS_SA_ALLOW try {
0382 runAcquire(eventTransitionInfo, parentContext, holder);
0383 ranAcquireWithoutException_ = true;
0384 } catch (...) {
0385 exceptionPtr = std::current_exception();
0386 }
0387 }
0388
0389 holder.doneWaiting(exceptionPtr);
0390 }
0391
0392 std::exception_ptr Worker::handleExternalWorkException(std::exception_ptr const* iEPtr,
0393 ParentContext const& parentContext) {
0394 if (ranAcquireWithoutException_) {
0395 try {
0396 convertException::wrap([iEPtr]() { std::rethrow_exception(*iEPtr); });
0397 } catch (cms::Exception& ex) {
0398 ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0399 edm::exceptionContext(ex, moduleCallingContext_);
0400 return std::current_exception();
0401 }
0402 }
0403 return *iEPtr;
0404 }
0405
0406 Worker::HandleExternalWorkExceptionTask::HandleExternalWorkExceptionTask(Worker* worker,
0407 oneapi::tbb::task_group* group,
0408 WaitingTask* runModuleTask,
0409 ParentContext const& parentContext)
0410 : m_worker(worker), m_runModuleTask(runModuleTask), m_group(group), m_parentContext(parentContext) {}
0411
0412 void Worker::HandleExternalWorkExceptionTask::execute() {
0413 auto excptr = exceptionPtr();
0414 WaitingTaskHolder holder(*m_group, m_runModuleTask);
0415 if (excptr) {
0416 holder.doneWaiting(m_worker->handleExternalWorkException(excptr, m_parentContext));
0417 }
0418 }
0419 }