Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-06-29 22:58:05

0001 
0002 /*----------------------------------------------------------------------
0003 ----------------------------------------------------------------------*/
0004 #include "FWCore/Concurrency/interface/include_first_syncWait.h"
0005 #include "FWCore/Framework/interface/maker/Worker.h"
0006 #include "FWCore/Framework/interface/EarlyDeleteHelper.h"
0007 #include "FWCore/Framework/interface/EventPrincipal.h"
0008 #include "FWCore/Framework/interface/EventSetupImpl.h"
0009 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0010 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0011 #include "FWCore/Framework/interface/RunPrincipal.h"
0012 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0013 #include "FWCore/ServiceRegistry/interface/ESParentContext.h"
0014 #include "FWCore/Concurrency/interface/WaitingTask.h"
0015 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0016 #include "FWCore/ParameterSet/interface/Registry.h"
0017 
0018 namespace edm {
0019 
0020   Worker::Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions)
0021       : timesRun_(0),
0022         timesVisited_(0),
0023         timesPassed_(0),
0024         timesFailed_(0),
0025         timesExcept_(0),
0026         state_(Ready),
0027         numberOfPathsOn_(0),
0028         numberOfPathsLeftToRun_(0),
0029         moduleCallingContext_(&iMD),
0030         actions_(iActions),
0031         cached_exception_(),
0032         actReg_(),
0033         earlyDeleteHelper_(nullptr),
0034         workStarted_(false),
0035         ranAcquireWithoutException_(false) {
0036     checkForShouldTryToContinue(iMD);
0037   }
0038 
0039   Worker::~Worker() {}
0040 
0041   void Worker::setActivityRegistry(std::shared_ptr<ActivityRegistry> areg) { actReg_ = areg; }
0042 
0043   void Worker::checkForShouldTryToContinue(ModuleDescription const& iDesc) {
0044     auto pset = edm::pset::Registry::instance()->getMapped(iDesc.parameterSetID());
0045     if (pset and pset->exists("@shouldTryToContinue")) {
0046       shouldTryToContinue_ = true;
0047     }
0048   }
0049 
0050   bool Worker::shouldRethrowException(std::exception_ptr iPtr,
0051                                       ParentContext const& parentContext,
0052                                       bool isEvent,
0053                                       bool shouldTryToContinue) const noexcept {
0054     // NOTE: the warning printed as a result of ignoring or failing
0055     // a module will only be printed during the full true processing
0056     // pass of this module
0057 
0058     // Get the action corresponding to this exception.  However, if processing
0059     // something other than an event (e.g. run, lumi) always rethrow.
0060     if (not isEvent) {
0061       return true;
0062     }
0063     try {
0064       convertException::wrap([&]() { std::rethrow_exception(iPtr); });
0065     } catch (cms::Exception& ex) {
0066       exception_actions::ActionCodes action = actions_->find(ex.category());
0067 
0068       if (action == exception_actions::Rethrow) {
0069         return true;
0070       }
0071       if (action == exception_actions::TryToContinue) {
0072         if (shouldTryToContinue) {
0073           edm::printCmsExceptionWarning("TryToContinue", ex);
0074         }
0075         return not shouldTryToContinue;
0076       }
0077       if (action == exception_actions::IgnoreCompletely) {
0078         edm::printCmsExceptionWarning("IgnoreCompletely", ex);
0079         return false;
0080       }
0081     }
0082     return true;
0083   }
0084 
0085   void Worker::prePrefetchSelectionAsync(oneapi::tbb::task_group& group,
0086                                          WaitingTask* successTask,
0087                                          ServiceToken const& token,
0088                                          StreamID id,
0089                                          EventPrincipal const* iPrincipal) noexcept {
0090     successTask->increment_ref_count();
0091 
0092     ServiceWeakToken weakToken = token;
0093     auto choiceTask =
0094         edm::make_waiting_task([id, successTask, iPrincipal, this, weakToken, &group](std::exception_ptr const*) {
0095           ServiceRegistry::Operate guard(weakToken.lock());
0096           try {
0097             bool selected = convertException::wrap([&]() {
0098               if (not implDoPrePrefetchSelection(id, *iPrincipal, &moduleCallingContext_)) {
0099                 timesRun_.fetch_add(1, std::memory_order_relaxed);
0100                 setPassed<true>();
0101                 waitingTasks_.doneWaiting(nullptr);
0102                 //TBB requires that destroyed tasks have count 0
0103                 if (0 == successTask->decrement_ref_count()) {
0104                   TaskSentry s(successTask);
0105                 }
0106                 return false;
0107               }
0108               return true;
0109             });
0110             if (not selected) {
0111               return;
0112             }
0113 
0114           } catch (cms::Exception& e) {
0115             edm::exceptionContext(e, moduleCallingContext_);
0116             setException<true>(std::current_exception());
0117             waitingTasks_.doneWaiting(std::current_exception());
0118             //TBB requires that destroyed tasks have count 0
0119             if (0 == successTask->decrement_ref_count()) {
0120               TaskSentry s(successTask);
0121             }
0122             return;
0123           }
0124           if (0 == successTask->decrement_ref_count()) {
0125             group.run([successTask]() {
0126               TaskSentry s(successTask);
0127               successTask->execute();
0128             });
0129           }
0130         });
0131 
0132     WaitingTaskHolder choiceHolder{group, choiceTask};
0133 
0134     std::vector<ProductResolverIndexAndSkipBit> items;
0135     itemsToGetForSelection(items);
0136 
0137     for (auto const& item : items) {
0138       ProductResolverIndex productResolverIndex = item.productResolverIndex();
0139       bool skipCurrentProcess = item.skipCurrentProcess();
0140       if (productResolverIndex != ProductResolverIndexAmbiguous and
0141           productResolverIndex != ProductResolverIndexInvalid) {
0142         iPrincipal->prefetchAsync(
0143             choiceHolder, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
0144       }
0145     }
0146     choiceHolder.doneWaiting(std::exception_ptr{});
0147   }
0148 
0149   void Worker::esPrefetchAsync(WaitingTaskHolder iTask,
0150                                EventSetupImpl const& iImpl,
0151                                Transition iTrans,
0152                                ServiceToken const& iToken) noexcept {
0153     if (iTrans >= edm::Transition::NumberOfEventSetupTransitions) {
0154       return;
0155     }
0156     auto const& recs = esRecordsToGetFrom(iTrans);
0157     auto const& items = esItemsToGetFrom(iTrans);
0158 
0159     assert(items.size() == recs.size());
0160     if (items.empty()) {
0161       return;
0162     }
0163 
0164     for (size_t i = 0; i != items.size(); ++i) {
0165       if (recs[i] != ESRecordIndex{}) {
0166         auto rec = iImpl.findImpl(recs[i]);
0167         if (rec) {
0168           rec->prefetchAsync(iTask, items[i], &iImpl, iToken, ESParentContext(&moduleCallingContext_));
0169         }
0170       }
0171     }
0172   }
0173 
0174   void Worker::edPrefetchAsync(WaitingTaskHolder iTask,
0175                                ServiceToken const& token,
0176                                Principal const& iPrincipal) const noexcept {
0177     // Prefetch products the module declares it consumes
0178     std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
0179 
0180     for (auto const& item : items) {
0181       ProductResolverIndex productResolverIndex = item.productResolverIndex();
0182       bool skipCurrentProcess = item.skipCurrentProcess();
0183       if (productResolverIndex != ProductResolverIndexAmbiguous) {
0184         iPrincipal.prefetchAsync(iTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
0185       }
0186     }
0187   }
0188 
0189   void Worker::setEarlyDeleteHelper(EarlyDeleteHelper* iHelper) { earlyDeleteHelper_ = iHelper; }
0190 
0191   size_t Worker::transformIndex(edm::ProductDescription const&) const noexcept { return -1; }
0192   void Worker::doTransformAsync(WaitingTaskHolder iTask,
0193                                 size_t iTransformIndex,
0194                                 EventPrincipal const& iPrincipal,
0195                                 ServiceToken const& iToken,
0196                                 StreamID,
0197                                 ModuleCallingContext const& mcc,
0198                                 StreamContext const*) noexcept {
0199     ServiceWeakToken weakToken = iToken;
0200 
0201     //Need to make the services available early so other services can see them
0202     auto task = make_waiting_task(
0203         [this, iTask, weakToken, &iPrincipal, iTransformIndex, mcc](std::exception_ptr const* iExcept) mutable {
0204           //post prefetch signal
0205           actReg_->postModuleTransformPrefetchingSignal_.emit(*mcc.getStreamContext(), mcc);
0206           if (iExcept) {
0207             iTask.doneWaiting(*iExcept);
0208             return;
0209           }
0210           implDoTransformAsync(iTask, iTransformIndex, iPrincipal, mcc.parent(), weakToken);
0211         });
0212 
0213     //pre prefetch signal
0214     actReg_->preModuleTransformPrefetchingSignal_.emit(*mcc.getStreamContext(), mcc);
0215     iPrincipal.prefetchAsync(
0216         WaitingTaskHolder(*iTask.group(), task), itemToGetForTransform(iTransformIndex), false, iToken, &mcc);
0217   }
0218 
0219   void Worker::resetModuleDescription(ModuleDescription const* iDesc) {
0220     ModuleCallingContext temp(iDesc,
0221                               0,
0222                               moduleCallingContext_.state(),
0223                               moduleCallingContext_.parent(),
0224                               moduleCallingContext_.previousModuleOnThread());
0225     moduleCallingContext_ = temp;
0226     assert(iDesc);
0227     checkForShouldTryToContinue(*iDesc);
0228   }
0229 
0230   void Worker::skipOnPath(EventPrincipal const& iEvent) {
0231     if (earlyDeleteHelper_) {
0232       earlyDeleteHelper_->pathFinished(iEvent);
0233     }
0234     if (0 == --numberOfPathsLeftToRun_) {
0235       waitingTasks_.doneWaiting(cached_exception_);
0236     }
0237   }
0238 
0239   void Worker::postDoEvent(EventPrincipal const& iEvent) {
0240     if (earlyDeleteHelper_) {
0241       earlyDeleteHelper_->moduleRan(iEvent);
0242     }
0243   }
0244 
0245   void Worker::runAcquire(EventTransitionInfo const& info,
0246                           ParentContext const& parentContext,
0247                           WaitingTaskHolder holder) {
0248     ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0249     try {
0250       convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, std::move(holder)); });
0251     } catch (cms::Exception& ex) {
0252       edm::exceptionContext(ex, moduleCallingContext_);
0253       if (shouldRethrowException(std::current_exception(), parentContext, true, shouldTryToContinue_)) {
0254         timesRun_.fetch_add(1, std::memory_order_relaxed);
0255         throw;
0256       }
0257     }
0258   }
0259 
0260   void Worker::runAcquireAfterAsyncPrefetch(std::exception_ptr iEPtr,
0261                                             EventTransitionInfo const& eventTransitionInfo,
0262                                             ParentContext const& parentContext,
0263                                             WaitingTaskHolder holder) noexcept {
0264     ranAcquireWithoutException_ = false;
0265     std::exception_ptr exceptionPtr;
0266     if (iEPtr) {
0267       if (shouldRethrowException(iEPtr, parentContext, true, shouldTryToContinue_)) {
0268         exceptionPtr = iEPtr;
0269       }
0270       moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
0271     } else {
0272       // Caught exception is propagated via WaitingTaskHolder
0273       CMS_SA_ALLOW try {
0274         // holder is copied to runAcquire in order to be independent
0275         // of the lifetime of the WaitingTaskHolder inside runAcquire
0276         runAcquire(eventTransitionInfo, parentContext, holder);
0277         ranAcquireWithoutException_ = true;
0278       } catch (...) {
0279         exceptionPtr = std::current_exception();
0280       }
0281     }
0282     // It is important this is after runAcquire completely finishes
0283     holder.doneWaiting(exceptionPtr);
0284   }
0285 
0286   std::exception_ptr Worker::handleExternalWorkException(std::exception_ptr iEPtr,
0287                                                          ParentContext const& parentContext) noexcept {
0288     if (ranAcquireWithoutException_) {
0289       try {
0290         convertException::wrap([iEPtr]() { std::rethrow_exception(iEPtr); });
0291       } catch (cms::Exception& ex) {
0292         ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0293         edm::exceptionContext(ex, moduleCallingContext_);
0294         return std::current_exception();
0295       }
0296     }
0297     return iEPtr;
0298   }
0299 
0300   Worker::HandleExternalWorkExceptionTask::HandleExternalWorkExceptionTask(Worker* worker,
0301                                                                            oneapi::tbb::task_group* group,
0302                                                                            WaitingTask* runModuleTask,
0303                                                                            ParentContext const& parentContext) noexcept
0304       : m_worker(worker), m_runModuleTask(runModuleTask), m_group(group), m_parentContext(parentContext) {}
0305 
0306   void Worker::HandleExternalWorkExceptionTask::execute() {
0307     auto excptr = exceptionPtr();
0308     WaitingTaskHolder holder(*m_group, m_runModuleTask);
0309     if (excptr) {
0310       holder.doneWaiting(m_worker->handleExternalWorkException(excptr, m_parentContext));
0311     }
0312   }
0313 }  // namespace edm