Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-05-11 03:34:15

0001 
0002 /*----------------------------------------------------------------------
0003 ----------------------------------------------------------------------*/
0004 #include "FWCore/Concurrency/interface/include_first_syncWait.h"
0005 #include "FWCore/Framework/interface/maker/Worker.h"
0006 #include "FWCore/Framework/interface/EarlyDeleteHelper.h"
0007 #include "FWCore/Framework/interface/EventPrincipal.h"
0008 #include "FWCore/Framework/interface/EventSetupImpl.h"
0009 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0010 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0011 #include "FWCore/Framework/interface/RunPrincipal.h"
0012 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0013 #include "FWCore/ServiceRegistry/interface/ESParentContext.h"
0014 #include "FWCore/Concurrency/interface/WaitingTask.h"
0015 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0016 #include "FWCore/ParameterSet/interface/Registry.h"
0017 
0018 namespace edm {
0019   namespace {
0020     class ModuleBeginJobSignalSentry {
           // Scope guard for a module's beginJob call: emits preModuleBeginJobSignal_ when
           // constructed and postModuleBeginJobSignal_ when destroyed. A null registry
           // pointer turns both emissions into no-ops.
         public:
           ModuleBeginJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md)
               : registry_(a), description_(&md) {
             if (registry_ != nullptr) {
               registry_->preModuleBeginJobSignal_(*description_);
             }
           }
           ~ModuleBeginJobSignalSentry() {
             if (registry_ != nullptr) {
               registry_->postModuleBeginJobSignal_(*description_);
             }
           }
0030 
         private:
           // Plain pointer instead of propagate_const because the registry itself is mutable.
           ActivityRegistry* registry_;
           ModuleDescription const* description_;
0034     };
0035 
0036     class ModuleEndJobSignalSentry {
           // Scope guard for a module's endJob call: emits preModuleEndJobSignal_ when
           // constructed and postModuleEndJobSignal_ when destroyed. A null registry
           // pointer turns both emissions into no-ops.
         public:
           ModuleEndJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md)
               : registry_(a), description_(&md) {
             if (registry_ != nullptr) {
               registry_->preModuleEndJobSignal_(*description_);
             }
           }
           ~ModuleEndJobSignalSentry() {
             if (registry_ != nullptr) {
               registry_->postModuleEndJobSignal_(*description_);
             }
           }
0046 
         private:
           // Plain pointer instead of propagate_const because the registry itself is mutable.
           ActivityRegistry* registry_;
           ModuleDescription const* description_;
0050     };
0051 
0052     class ModuleBeginStreamSignalSentry {
           // Scope guard for a module's beginStream call: emits preModuleBeginStreamSignal_
           // when constructed and postModuleBeginStreamSignal_ when destroyed. Both contexts
           // are held by reference, so they must outlive the sentry. A null registry pointer
           // turns both emissions into no-ops.
         public:
           ModuleBeginStreamSignalSentry(ActivityRegistry* a, StreamContext const& sc, ModuleCallingContext const& mcc)
               : registry_(a), streamContext_(sc), callingContext_(mcc) {
             if (registry_ != nullptr) {
               registry_->preModuleBeginStreamSignal_(streamContext_, callingContext_);
             }
           }
           ~ModuleBeginStreamSignalSentry() {
             if (registry_ != nullptr) {
               registry_->postModuleBeginStreamSignal_(streamContext_, callingContext_);
             }
           }
0063 
         private:
           // Plain pointer instead of propagate_const because the registry itself is mutable.
           ActivityRegistry* registry_;
           StreamContext const& streamContext_;
           ModuleCallingContext const& callingContext_;
0068     };
0069 
0070     class ModuleEndStreamSignalSentry {
           // Scope guard for a module's endStream call: emits preModuleEndStreamSignal_
           // when constructed and postModuleEndStreamSignal_ when destroyed. Both contexts
           // are held by reference, so they must outlive the sentry. A null registry pointer
           // turns both emissions into no-ops.
         public:
           ModuleEndStreamSignalSentry(ActivityRegistry* a, StreamContext const& sc, ModuleCallingContext const& mcc)
               : registry_(a), streamContext_(sc), callingContext_(mcc) {
             if (registry_ != nullptr) {
               registry_->preModuleEndStreamSignal_(streamContext_, callingContext_);
             }
           }
           ~ModuleEndStreamSignalSentry() {
             if (registry_ != nullptr) {
               registry_->postModuleEndStreamSignal_(streamContext_, callingContext_);
             }
           }
0081 
         private:
           // Plain pointer instead of propagate_const because the registry itself is mutable.
           ActivityRegistry* registry_;
           StreamContext const& streamContext_;
           ModuleCallingContext const& callingContext_;
0086     };
0087 
0088   }  // namespace
0089 
0090   Worker::Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions)
           // Builds a worker for the module described by iMD.
           //  - All run/visit/pass/fail/exception counters and both path counters start at zero.
           //  - state_ starts as Ready.
           //  - iActions is stored as-is in actions_ (it is consulted by shouldRethrowException()).
           //  - No ActivityRegistry (actReg_) and no EarlyDeleteHelper are attached yet; see
           //    setActivityRegistry() and setEarlyDeleteHelper().
0091       : timesRun_(0),
0092         timesVisited_(0),
0093         timesPassed_(0),
0094         timesFailed_(0),
0095         timesExcept_(0),
0096         state_(Ready),
0097         numberOfPathsOn_(0),
0098         numberOfPathsLeftToRun_(0),
0099         moduleCallingContext_(&iMD),
0100         actions_(iActions),
0101         cached_exception_(),
0102         actReg_(),
0103         earlyDeleteHelper_(nullptr),
0104         workStarted_(false),
0105         ranAcquireWithoutException_(false) {
         // Pick up the "@shouldTryToContinue" setting from the module's parameter set, if present.
0106     checkForShouldTryToContinue(iMD);
0107   }
0108 
0109   Worker::~Worker() = default;  // no explicit cleanup; members release themselves
0110 
0111   void Worker::setActivityRegistry(std::shared_ptr<ActivityRegistry> areg) {
         // Attach the registry whose signals this worker emits; any previously held
         // registry pointer is replaced.
         actReg_ = areg;
       }
0112 
0113   void Worker::checkForShouldTryToContinue(ModuleDescription const& iDesc) {
         // Looks up the module's parameter set (by its ID) in the pset Registry and raises
         // shouldTryToContinue_ when that set contains an "@shouldTryToContinue" entry.
         // The flag is only ever set here, never cleared.
         auto moduleParams = edm::pset::Registry::instance()->getMapped(iDesc.parameterSetID());
         if (not moduleParams) {
           return;
         }
         if (moduleParams->exists("@shouldTryToContinue")) {
           shouldTryToContinue_ = true;
         }
       }
0119 
0120   bool Worker::shouldRethrowException(std::exception_ptr iPtr,
0121                                       ParentContext const& parentContext,
0122                                       bool isEvent,
0123                                       bool shouldTryToContinue) const noexcept {
         // Decides whether an exception raised while running this module must be propagated
         // (returns true) or may be swallowed (returns false), based on the action that
         // actions_ associates with the exception's category. parentContext is not consulted.
         //
         // NOTE: the warning printed as a result of ignoring or failing a module will only
         // be printed during the full true processing pass of this module.

         // Only event processing may swallow exceptions; anything else (e.g. run, lumi)
         // always rethrows.
0130     if (not isEvent) {
0131       return true;
0132     }
0133     try {
0134       convertException::wrap([&]() { std::rethrow_exception(iPtr); });
0135     } catch (cms::Exception& ex) {
           exception_actions::ActionCodes const configuredAction = actions_->find(ex.category());

           if (configuredAction == exception_actions::IgnoreCompletely) {
             edm::printCmsExceptionWarning("IgnoreCompletely", ex);
             return false;
           }
           if (configuredAction == exception_actions::TryToContinue) {
             if (not shouldTryToContinue) {
               return true;
             }
             edm::printCmsExceptionWarning("TryToContinue", ex);
             return false;
           }
           // Rethrow, and any action not handled above, ends up at the default below.
0151     }
0152     return true;
0153   }
0154 
0155   void Worker::prePrefetchSelectionAsync(oneapi::tbb::task_group& group,
0156                                          WaitingTask* successTask,
0157                                          ServiceToken const& token,
0158                                          StreamID id,
0159                                          EventPrincipal const* iPrincipal) noexcept {
         // Asks the module (via implDoPrePrefetchSelection) whether it wants to process the
         // event in iPrincipal, after requesting a prefetch of the products reported by
         // itemsToGetForSelection().
         //  - Selected: successTask is executed inside `group`.
         //  - Not selected: the worker is counted as run, setPassed<true>() is called, the
         //    tasks in waitingTasks_ are released and successTask is never executed.
         //  - cms::Exception during selection: context is added, setException<true>() is
         //    called, waitingTasks_ receives the exception and successTask is never executed.
         // choiceTask is handed to every prefetchAsync() call through choiceHolder, so it
         // presumably runs only once those prefetches are done -- TODO confirm against
         // WaitingTaskHolder.

         // Hold a reference on successTask; every exit path of choiceTask drops it again.
0160     successTask->increment_ref_count();
0161 
0162     ServiceWeakToken weakToken = token;
0163     auto choiceTask =
             // NOTE(review): the exception pointer passed to this task is unnamed and is not
             // inspected, so a failure reported by a prefetch is not handled in this lambda.
0164         edm::make_waiting_task([id, successTask, iPrincipal, this, weakToken, &group](std::exception_ptr const*) {
0165           ServiceRegistry::Operate guard(weakToken.lock());
0166           try {
0167             bool selected = convertException::wrap([&]() {
0168               if (not implDoPrePrefetchSelection(id, *iPrincipal, &moduleCallingContext_)) {
                     // Module declined the event: record the run, call setPassed<true>() and
                     // release the waiting tasks without an exception.
0169                 timesRun_.fetch_add(1, std::memory_order_relaxed);
0170                 setPassed<true>();
0171                 waitingTasks_.doneWaiting(nullptr);
0172                 //TBB requires that destroyed tasks have count 0
0173                 if (0 == successTask->decrement_ref_count()) {
                       // Presumably TaskSentry disposes of the task when it goes out of scope.
0174                   TaskSentry s(successTask);
0175                 }
0176                 return false;
0177               }
0178               return true;
0179             });
0180             if (not selected) {
0181               return;
0182             }
0183 
0184           } catch (cms::Exception& e) {
                 // Selection itself failed: annotate the exception, record it on the worker
                 // and hand it to everything waiting on this worker.
0185             edm::exceptionContext(e, moduleCallingContext_);
0186             setException<true>(std::current_exception());
0187             waitingTasks_.doneWaiting(std::current_exception());
0188             //TBB requires that destroyed tasks have count 0
0189             if (0 == successTask->decrement_ref_count()) {
0190               TaskSentry s(successTask);
0191             }
0192             return;
0193           }
               // Selected: drop our reference and, if it was the last one, run successTask
               // inside the task group.
0194           if (0 == successTask->decrement_ref_count()) {
0195             group.run([successTask]() {
0196               TaskSentry s(successTask);
0197               successTask->execute();
0198             });
0199           }
0200         });
0201 
0202     WaitingTaskHolder choiceHolder{group, choiceTask};
0203 
0204     std::vector<ProductResolverIndexAndSkipBit> items;
0205     itemsToGetForSelection(items);
0206 
         // Request a prefetch for every usable resolver index; ambiguous and invalid
         // indices are skipped.
0207     for (auto const& item : items) {
0208       ProductResolverIndex productResolverIndex = item.productResolverIndex();
0209       bool skipCurrentProcess = item.skipCurrentProcess();
0210       if (productResolverIndex != ProductResolverIndexAmbiguous and
0211           productResolverIndex != ProductResolverIndexInvalid) {
0212         iPrincipal->prefetchAsync(
0213             choiceHolder, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
0214       }
0215     }
         // Release this function's own holder on choiceTask, reporting no exception.
0216     choiceHolder.doneWaiting(std::exception_ptr{});
0217   }
0218 
0219   void Worker::esPrefetchAsync(WaitingTaskHolder iTask,
0220                                EventSetupImpl const& iImpl,
0221                                Transition iTrans,
0222                                ServiceToken const& iToken) noexcept {
         // Requests an asynchronous prefetch of every EventSetup item this module declared
         // for transition iTrans. Transitions at or beyond NumberOfEventSetupTransitions
         // have nothing to fetch. Entries whose record index equals a default-constructed
         // ESRecordIndex, or whose record is not found in iImpl, are skipped.
0223     if (iTrans >= edm::Transition::NumberOfEventSetupTransitions) {
0224       return;
0225     }
         auto const& recordIndices = esRecordsToGetFrom(iTrans);
         auto const& itemIndices = esItemsToGetFrom(iTrans);

         // The two lists are parallel: entry k of one belongs to entry k of the other.
         assert(itemIndices.size() == recordIndices.size());
         if (itemIndices.empty()) {
           return;
         }

         for (size_t idx = 0; idx != itemIndices.size(); ++idx) {
           if (recordIndices[idx] != ESRecordIndex{}) {
             auto recordImpl = iImpl.findImpl(recordIndices[idx]);
             if (not recordImpl) {
               continue;
             }
             recordImpl->prefetchAsync(iTask, itemIndices[idx], &iImpl, iToken, ESParentContext(&moduleCallingContext_));
           }
         }
0242   }
0243 
0244   void Worker::edPrefetchAsync(WaitingTaskHolder iTask,
0245                                ServiceToken const& token,
0246                                Principal const& iPrincipal) const noexcept {
         // Requests an asynchronous prefetch of the products the module declares it consumes
         // for the branch type of iPrincipal. Entries marked ProductResolverIndexAmbiguous
         // are skipped.
         std::vector<ProductResolverIndexAndSkipBit> const& consumed = itemsToGetFrom(iPrincipal.branchType());

         for (auto const& entry : consumed) {
           ProductResolverIndex const resolverIndex = entry.productResolverIndex();
           bool const skipCurrent = entry.skipCurrentProcess();
           if (resolverIndex != ProductResolverIndexAmbiguous) {
             iPrincipal.prefetchAsync(iTask, resolverIndex, skipCurrent, token, &moduleCallingContext_);
           }
         }
0257   }
0258 
0259   void Worker::setEarlyDeleteHelper(EarlyDeleteHelper* iHelper) {
         // Remember the helper that skipOnPath() and postDoEvent() notify; may be null.
         earlyDeleteHelper_ = iHelper;
       }
0260 
0261   size_t Worker::transformIndex(edm::BranchDescription const&) const noexcept {
         // This implementation ignores its argument and always yields -1 converted to
         // size_t (the largest size_t value) -- presumably a "no transform" marker; verify
         // against callers.
         return static_cast<size_t>(-1);
       }
0262   void Worker::doTransformAsync(WaitingTaskHolder iTask,
0263                                 size_t iTransformIndex,
0264                                 EventPrincipal const& iPrincipal,
0265                                 ServiceToken const& iToken,
0266                                 StreamID,
0267                                 ModuleCallingContext const& mcc,
0268                                 StreamContext const*) noexcept {
         // Prefetches the single product returned by itemToGetForTransform(iTransformIndex)
         // and, once that prefetch task runs, forwards to implDoTransformAsync(). The
         // pre/post ModuleTransformPrefetching signals are emitted around the prefetch.
         // The StreamID and StreamContext parameters are unused here.
         // NOTE(review): actReg_ is dereferenced without a null check in this function.
0269     ServiceWeakToken weakToken = iToken;
0270 
0271     //Need to make the services available early so other services can see them
0272     auto task = make_waiting_task(
             // mcc is captured by value, iPrincipal by reference: iPrincipal must still be
             // alive when this task executes.
0273         [this, iTask, weakToken, &iPrincipal, iTransformIndex, mcc](std::exception_ptr const* iExcept) mutable {
0274           //post prefetch signal
0275           actReg_->postModuleTransformPrefetchingSignal_.emit(*mcc.getStreamContext(), mcc);
               // A failed prefetch is passed straight on to iTask; the transform is not attempted.
0276           if (iExcept) {
0277             iTask.doneWaiting(*iExcept);
0278             return;
0279           }
0280           implDoTransformAsync(iTask, iTransformIndex, iPrincipal, mcc.parent(), weakToken);
0281         });
0282 
0283     //pre prefetch signal
0284     actReg_->preModuleTransformPrefetchingSignal_.emit(*mcc.getStreamContext(), mcc);
0285     iPrincipal.prefetchAsync(
0286         WaitingTaskHolder(*iTask.group(), task), itemToGetForTransform(iTransformIndex), false, iToken, &mcc);
0287   }
0288 
0289   void Worker::resetModuleDescription(ModuleDescription const* iDesc) {
         // Rebinds this worker to a new module description. The calling context is rebuilt
         // around iDesc while keeping its current state, parent and previous-module-on-thread
         // values, and the "@shouldTryToContinue" setting is re-read from the new
         // description's parameter set.
         //
         // iDesc must not be null. The check is made before iDesc is used at all, so a null
         // pointer is caught before it can reach the calling context.
         assert(iDesc);
0290     ModuleCallingContext temp(iDesc,
0291                               0,
0292                               moduleCallingContext_.state(),
0293                               moduleCallingContext_.parent(),
0294                               moduleCallingContext_.previousModuleOnThread());
0295     moduleCallingContext_ = temp;
0297     checkForShouldTryToContinue(*iDesc);
0298   }
0299 
0300   void Worker::beginJob() {
         // Runs the module's beginJob step (implBeginJob) bracketed by the pre/post
         // ModuleBeginJob signals, which are emitted only when an ActivityRegistry is set.
         // Any exception is converted by convertException::wrap; on cms::Exception the
         // worker state becomes Exception, the module name/label is appended as context,
         // and the exception is rethrown.
0301     try {
0302       convertException::wrap([&]() {
             ModuleDescription const* desc = description();
             // Same guard as endJob(): do not dereference a missing description.
             assert(desc != nullptr);
             ModuleBeginJobSignalSentry cpp(actReg_.get(), *desc);
0304         implBeginJob();
0305       });
0306     } catch (cms::Exception& ex) {
0307       state_ = Exception;
0308       std::ostringstream ost;
0309       ost << "Calling beginJob for module " << description()->moduleName() << "/'" << description()->moduleLabel()
0310           << "'";
0311       ex.addContext(ost.str());
0312       throw;
0313     }
0314   }
0315 
0316   void Worker::endJob() {
         // Runs the module's endJob step (implEndJob) bracketed by the pre/post ModuleEndJob
         // signals, which are emitted only when an ActivityRegistry is set. On cms::Exception
         // the worker state becomes Exception, the module name/label is appended as context,
         // and the exception is rethrown.
0317     try {
0318       convertException::wrap([&]() {
             ModuleDescription const* moduleDesc = description();
             assert(moduleDesc != nullptr);
             ModuleEndJobSignalSentry signalSentry(actReg_.get(), *moduleDesc);
0322         implEndJob();
0323       });
0324     } catch (cms::Exception& ex) {
0325       state_ = Exception;
           std::ostringstream where;
           where << "Calling endJob for module " << description()->moduleName();
           where << "/'" << description()->moduleLabel() << "'";
           ex.addContext(where.str());
0329       throw;
0330     }
0331   }
0332 
0333   void Worker::beginStream(StreamID id, StreamContext& streamContext) {
         // Runs the module's beginStream step for stream `id`.
         // streamContext is switched to the kBeginStream transition and its event ID, run
         // index, lumi index and timestamp are reset to placeholder/invalid values before
         // implBeginStream() is called. The pre/post ModuleBeginStream signals are emitted
         // around that call when an ActivityRegistry is set. On cms::Exception the worker
         // state becomes Exception, the module name/label is appended as context, and the
         // exception is rethrown.
0334     try {
0335       convertException::wrap([&]() {
0336         streamContext.setTransition(StreamContext::Transition::kBeginStream);
0337         streamContext.setEventID(EventID(0, 0, 0));
0338         streamContext.setRunIndex(RunIndex::invalidRunIndex());
0339         streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
0340         streamContext.setTimestamp(Timestamp());
0341         ParentContext parentContext(&streamContext);
             // Declaration order matters: beginSentry (declared last) is destroyed first, so
             // the post signal is emitted before moduleContextSentry is destroyed.
0342         ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0343         moduleCallingContext_.setState(ModuleCallingContext::State::kRunning);
0344         ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
0345         implBeginStream(id);
0346       });
0347     } catch (cms::Exception& ex) {
0348       state_ = Exception;
0349       std::ostringstream ost;
0350       ost << "Calling beginStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
0351           << "'";
0352       ex.addContext(ost.str());
           // Bare throw: propagate the same exception object, now carrying the added context.
0353       throw;
0354     }
0355   }
0356 
0357   void Worker::endStream(StreamID id, StreamContext& streamContext) {
         // Runs the module's endStream step for stream `id`.
         // streamContext is switched to the kEndStream transition and its event ID, run
         // index, lumi index and timestamp are reset to placeholder/invalid values before
         // implEndStream() is called. The pre/post ModuleEndStream signals are emitted
         // around that call when an ActivityRegistry is set. On cms::Exception the worker
         // state becomes Exception, the module name/label is appended as context, and the
         // exception is rethrown.
0358     try {
0359       convertException::wrap([&]() {
0360         streamContext.setTransition(StreamContext::Transition::kEndStream);
0361         streamContext.setEventID(EventID(0, 0, 0));
0362         streamContext.setRunIndex(RunIndex::invalidRunIndex());
0363         streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
0364         streamContext.setTimestamp(Timestamp());
0365         ParentContext parentContext(&streamContext);
             // Declaration order matters: endSentry (declared last) is destroyed first, so
             // the post signal is emitted before moduleContextSentry is destroyed.
0366         ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0367         moduleCallingContext_.setState(ModuleCallingContext::State::kRunning);
0368         ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
0369         implEndStream(id);
0370       });
0371     } catch (cms::Exception& ex) {
0372       state_ = Exception;
0373       std::ostringstream ost;
0374       ost << "Calling endStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
0375           << "'";
0376       ex.addContext(ost.str());
           // Bare throw: propagate the same exception object, now carrying the added context.
0377       throw;
0378     }
0379   }
0380 
0381   void Worker::registerThinnedAssociations(ProductRegistry const& registry, ThinnedAssociationsHelper& helper) {
         // Forwards to implRegisterThinnedAssociations(). If that throws a cms::Exception,
         // the module label is appended to the exception's context and the exception is
         // propagated to the caller.
0382     try {
0383       implRegisterThinnedAssociations(registry, helper);
0384     } catch (cms::Exception& ex) {
0385       ex.addContext("Calling registerThinnedAssociations() for module " + description()->moduleLabel());
           // Rethrow the in-flight exception object (which now carries the added context).
           // `throw ex;` would copy-construct a new cms::Exception and slice off any
           // derived exception type.
           throw;
0387     }
0388   }
0389 
0390   void Worker::skipOnPath(EventPrincipal const& iEvent) {
         // Called when a path finishes without running this module. Notifies the early
         // delete helper (if one is set), decrements the count of paths still to run and,
         // when that count reaches zero, releases the waiting tasks with cached_exception_.
0391     if (earlyDeleteHelper_) {
0392       earlyDeleteHelper_->pathFinished(iEvent);
0393     }
         bool const wasLastPath = (0 == --numberOfPathsLeftToRun_);
         if (wasLastPath) {
           waitingTasks_.doneWaiting(cached_exception_);
         }
0397   }
0398 
0399   void Worker::postDoEvent(EventPrincipal const& iEvent) {
         // Tell the early delete helper, when one is attached, that the module ran on iEvent.
         if (not earlyDeleteHelper_) {
           return;
         }
         earlyDeleteHelper_->moduleRan(iEvent);
       }
0404 
0405   void Worker::runAcquire(EventTransitionInfo const& info,
0406                           ParentContext const& parentContext,
0407                           WaitingTaskWithArenaHolder& holder) {
         // Runs the module's acquire step (implDoAcquire) with the module calling context
         // installed for the duration of the call. A cms::Exception gets the module context
         // appended; it is rethrown (after counting the run) only when
         // shouldRethrowException() says so, otherwise it is swallowed here.
         ModuleContextSentry contextGuard(&moduleCallingContext_, parentContext);
0409     try {
0410       convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, holder); });
0411     } catch (cms::Exception& ex) {
0412       edm::exceptionContext(ex, moduleCallingContext_);
           bool const mustRethrow =
               shouldRethrowException(std::current_exception(), parentContext, true, shouldTryToContinue_);
           if (not mustRethrow) {
             return;
           }
           timesRun_.fetch_add(1, std::memory_order_relaxed);
           throw;
0417     }
0418   }
0419 
0420   void Worker::runAcquireAfterAsyncPrefetch(std::exception_ptr iEPtr,
0421                                             EventTransitionInfo const& eventTransitionInfo,
0422                                             ParentContext const& parentContext,
0423                                             WaitingTaskWithArenaHolder holder) noexcept {
         // Second half of the acquire sequence (per its name, presumably invoked once the
         // asynchronous prefetch has completed).
         //  - iEPtr set: acquire is skipped. The exception is forwarded to `holder` only if
         //    shouldRethrowException() requires it, and the calling context is reset to kInvalid.
         //  - iEPtr empty: runAcquire() is called; anything it throws is captured and
         //    forwarded to `holder`.
         // ranAcquireWithoutException_ ends up true only when runAcquire() returned normally;
         // handleExternalWorkException() reads it.
0424     ranAcquireWithoutException_ = false;
0425     std::exception_ptr exceptionPtr;
0426     if (iEPtr) {
0427       if (shouldRethrowException(iEPtr, parentContext, true, shouldTryToContinue_)) {
0428         exceptionPtr = iEPtr;
0429       }
0430       moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
0431     } else {
0432       // Caught exception is propagated via WaitingTaskWithArenaHolder
0433       CMS_SA_ALLOW try {
0434         runAcquire(eventTransitionInfo, parentContext, holder);
0435         ranAcquireWithoutException_ = true;
0436       } catch (...) {
0437         exceptionPtr = std::current_exception();
0438       }
0439     }
0440     // It is important this is after runAcquire completely finishes
0441     holder.doneWaiting(exceptionPtr);
0442   }
0443 
0444   std::exception_ptr Worker::handleExternalWorkException(std::exception_ptr iEPtr,
0445                                                          ParentContext const& parentContext) noexcept {
         // Returns the exception to report for failed external work. When acquire did not
         // complete normally, iEPtr is returned untouched. Otherwise the exception is
         // rethrown through convertException::wrap, annotated with this module's calling
         // context, and the annotated exception is returned.
         if (not ranAcquireWithoutException_) {
           return iEPtr;
         }
         try {
           convertException::wrap([iEPtr]() { std::rethrow_exception(iEPtr); });
         } catch (cms::Exception& ex) {
           ModuleContextSentry contextGuard(&moduleCallingContext_, parentContext);
           edm::exceptionContext(ex, moduleCallingContext_);
           return std::current_exception();
         }
0455     return iEPtr;
0456   }
0457 
0458   Worker::HandleExternalWorkExceptionTask::HandleExternalWorkExceptionTask(Worker* worker,
0459                                                                            oneapi::tbb::task_group* group,
0460                                                                            WaitingTask* runModuleTask,
0461                                                                            ParentContext const& parentContext) noexcept
           // Records the worker, the task group, the follow-up task and the parent context
           // that execute() uses later; nothing else happens at construction.
           // NOTE(review): whether m_parentContext holds a copy or a reference depends on
           // the member declaration, which is not visible here.
0462       : m_worker(worker), m_runModuleTask(runModuleTask), m_group(group), m_parentContext(parentContext) {}
0463 
0464   void Worker::HandleExternalWorkExceptionTask::execute() {
         // Wraps m_runModuleTask in a holder for the task group. If this task carries an
         // exception, it is first passed through Worker::handleExternalWorkException() and
         // then handed to the holder; with no exception the holder simply goes out of scope.
         auto failure = exceptionPtr();
         WaitingTaskHolder runModuleHolder(*m_group, m_runModuleTask);
         if (not failure) {
           return;
         }
         runModuleHolder.doneWaiting(m_worker->handleExternalWorkException(failure, m_parentContext));
       }
0471 }  // namespace edm