Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-01-31 02:19:27

0001 
0002 /*----------------------------------------------------------------------
0003 ----------------------------------------------------------------------*/
0004 #include "FWCore/Concurrency/interface/include_first_syncWait.h"
0005 #include "FWCore/Framework/interface/maker/Worker.h"
0006 #include "FWCore/Framework/interface/EarlyDeleteHelper.h"
0007 #include "FWCore/Framework/interface/EventPrincipal.h"
0008 #include "FWCore/Framework/interface/EventSetupImpl.h"
0009 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0010 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0011 #include "FWCore/Framework/interface/RunPrincipal.h"
0012 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0013 #include "FWCore/ServiceRegistry/interface/ESParentContext.h"
0014 #include "FWCore/Concurrency/interface/WaitingTask.h"
0015 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0016 #include "FWCore/ParameterSet/interface/Registry.h"
0017 
0018 namespace edm {
0019   namespace {
0020     class ModuleBeginJobTraits {
0021     public:
0022       using Context = GlobalContext;
0023       static void preModuleSignal(ActivityRegistry* activityRegistry,
0024                                   GlobalContext const*,
0025                                   ModuleCallingContext const* moduleCallingContext) {
0026         activityRegistry->preModuleBeginJobSignal_(*moduleCallingContext->moduleDescription());
0027       }
0028       static void postModuleSignal(ActivityRegistry* activityRegistry,
0029                                    GlobalContext const*,
0030                                    ModuleCallingContext const* moduleCallingContext) {
0031         activityRegistry->postModuleBeginJobSignal_(*moduleCallingContext->moduleDescription());
0032       }
0033     };
0034 
0035     class ModuleEndJobTraits {
0036     public:
0037       using Context = GlobalContext;
0038       static void preModuleSignal(ActivityRegistry* activityRegistry,
0039                                   GlobalContext const*,
0040                                   ModuleCallingContext const* moduleCallingContext) {
0041         activityRegistry->preModuleEndJobSignal_(*moduleCallingContext->moduleDescription());
0042       }
0043       static void postModuleSignal(ActivityRegistry* activityRegistry,
0044                                    GlobalContext const*,
0045                                    ModuleCallingContext const* moduleCallingContext) {
0046         activityRegistry->postModuleEndJobSignal_(*moduleCallingContext->moduleDescription());
0047       }
0048     };
0049 
0050     class ModuleBeginStreamTraits {
0051     public:
0052       using Context = StreamContext;
0053       static void preModuleSignal(ActivityRegistry* activityRegistry,
0054                                   StreamContext const* streamContext,
0055                                   ModuleCallingContext const* moduleCallingContext) {
0056         activityRegistry->preModuleBeginStreamSignal_(*streamContext, *moduleCallingContext);
0057       }
0058       static void postModuleSignal(ActivityRegistry* activityRegistry,
0059                                    StreamContext const* streamContext,
0060                                    ModuleCallingContext const* moduleCallingContext) {
0061         activityRegistry->postModuleBeginStreamSignal_(*streamContext, *moduleCallingContext);
0062       }
0063     };
0064 
0065     class ModuleEndStreamTraits {
0066     public:
0067       using Context = StreamContext;
0068       static void preModuleSignal(ActivityRegistry* activityRegistry,
0069                                   StreamContext const* streamContext,
0070                                   ModuleCallingContext const* moduleCallingContext) {
0071         activityRegistry->preModuleEndStreamSignal_(*streamContext, *moduleCallingContext);
0072       }
0073       static void postModuleSignal(ActivityRegistry* activityRegistry,
0074                                    StreamContext const* streamContext,
0075                                    ModuleCallingContext const* moduleCallingContext) {
0076         activityRegistry->postModuleEndStreamSignal_(*streamContext, *moduleCallingContext);
0077       }
0078     };
0079 
0080   }  // namespace
0081 
0082   Worker::Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions)
0083       : timesRun_(0),
0084         timesVisited_(0),
0085         timesPassed_(0),
0086         timesFailed_(0),
0087         timesExcept_(0),
0088         state_(Ready),
0089         numberOfPathsOn_(0),
0090         numberOfPathsLeftToRun_(0),
0091         moduleCallingContext_(&iMD),
0092         actions_(iActions),
0093         cached_exception_(),
0094         actReg_(),
0095         earlyDeleteHelper_(nullptr),
0096         workStarted_(false),
0097         ranAcquireWithoutException_(false) {
0098     checkForShouldTryToContinue(iMD);
0099   }
0100 
0101   Worker::~Worker() {}
0102 
0103   void Worker::setActivityRegistry(std::shared_ptr<ActivityRegistry> areg) { actReg_ = areg; }
0104 
0105   void Worker::checkForShouldTryToContinue(ModuleDescription const& iDesc) {
0106     auto pset = edm::pset::Registry::instance()->getMapped(iDesc.parameterSetID());
0107     if (pset and pset->exists("@shouldTryToContinue")) {
0108       shouldTryToContinue_ = true;
0109     }
0110   }
0111 
0112   bool Worker::shouldRethrowException(std::exception_ptr iPtr,
0113                                       ParentContext const& parentContext,
0114                                       bool isEvent,
0115                                       bool shouldTryToContinue) const noexcept {
0116     // NOTE: the warning printed as a result of ignoring or failing
0117     // a module will only be printed during the full true processing
0118     // pass of this module
0119 
0120     // Get the action corresponding to this exception.  However, if processing
0121     // something other than an event (e.g. run, lumi) always rethrow.
0122     if (not isEvent) {
0123       return true;
0124     }
0125     try {
0126       convertException::wrap([&]() { std::rethrow_exception(iPtr); });
0127     } catch (cms::Exception& ex) {
0128       exception_actions::ActionCodes action = actions_->find(ex.category());
0129 
0130       if (action == exception_actions::Rethrow) {
0131         return true;
0132       }
0133       if (action == exception_actions::TryToContinue) {
0134         if (shouldTryToContinue) {
0135           edm::printCmsExceptionWarning("TryToContinue", ex);
0136         }
0137         return not shouldTryToContinue;
0138       }
0139       if (action == exception_actions::IgnoreCompletely) {
0140         edm::printCmsExceptionWarning("IgnoreCompletely", ex);
0141         return false;
0142       }
0143     }
0144     return true;
0145   }
0146 
0147   void Worker::prePrefetchSelectionAsync(oneapi::tbb::task_group& group,
0148                                          WaitingTask* successTask,
0149                                          ServiceToken const& token,
0150                                          StreamID id,
0151                                          EventPrincipal const* iPrincipal) noexcept {
0152     successTask->increment_ref_count();
0153 
0154     ServiceWeakToken weakToken = token;
0155     auto choiceTask =
0156         edm::make_waiting_task([id, successTask, iPrincipal, this, weakToken, &group](std::exception_ptr const*) {
0157           ServiceRegistry::Operate guard(weakToken.lock());
0158           try {
0159             bool selected = convertException::wrap([&]() {
0160               if (not implDoPrePrefetchSelection(id, *iPrincipal, &moduleCallingContext_)) {
0161                 timesRun_.fetch_add(1, std::memory_order_relaxed);
0162                 setPassed<true>();
0163                 waitingTasks_.doneWaiting(nullptr);
0164                 //TBB requires that destroyed tasks have count 0
0165                 if (0 == successTask->decrement_ref_count()) {
0166                   TaskSentry s(successTask);
0167                 }
0168                 return false;
0169               }
0170               return true;
0171             });
0172             if (not selected) {
0173               return;
0174             }
0175 
0176           } catch (cms::Exception& e) {
0177             edm::exceptionContext(e, moduleCallingContext_);
0178             setException<true>(std::current_exception());
0179             waitingTasks_.doneWaiting(std::current_exception());
0180             //TBB requires that destroyed tasks have count 0
0181             if (0 == successTask->decrement_ref_count()) {
0182               TaskSentry s(successTask);
0183             }
0184             return;
0185           }
0186           if (0 == successTask->decrement_ref_count()) {
0187             group.run([successTask]() {
0188               TaskSentry s(successTask);
0189               successTask->execute();
0190             });
0191           }
0192         });
0193 
0194     WaitingTaskHolder choiceHolder{group, choiceTask};
0195 
0196     std::vector<ProductResolverIndexAndSkipBit> items;
0197     itemsToGetForSelection(items);
0198 
0199     for (auto const& item : items) {
0200       ProductResolverIndex productResolverIndex = item.productResolverIndex();
0201       bool skipCurrentProcess = item.skipCurrentProcess();
0202       if (productResolverIndex != ProductResolverIndexAmbiguous and
0203           productResolverIndex != ProductResolverIndexInvalid) {
0204         iPrincipal->prefetchAsync(
0205             choiceHolder, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
0206       }
0207     }
0208     choiceHolder.doneWaiting(std::exception_ptr{});
0209   }
0210 
0211   void Worker::esPrefetchAsync(WaitingTaskHolder iTask,
0212                                EventSetupImpl const& iImpl,
0213                                Transition iTrans,
0214                                ServiceToken const& iToken) noexcept {
0215     if (iTrans >= edm::Transition::NumberOfEventSetupTransitions) {
0216       return;
0217     }
0218     auto const& recs = esRecordsToGetFrom(iTrans);
0219     auto const& items = esItemsToGetFrom(iTrans);
0220 
0221     assert(items.size() == recs.size());
0222     if (items.empty()) {
0223       return;
0224     }
0225 
0226     for (size_t i = 0; i != items.size(); ++i) {
0227       if (recs[i] != ESRecordIndex{}) {
0228         auto rec = iImpl.findImpl(recs[i]);
0229         if (rec) {
0230           rec->prefetchAsync(iTask, items[i], &iImpl, iToken, ESParentContext(&moduleCallingContext_));
0231         }
0232       }
0233     }
0234   }
0235 
0236   void Worker::edPrefetchAsync(WaitingTaskHolder iTask,
0237                                ServiceToken const& token,
0238                                Principal const& iPrincipal) const noexcept {
0239     // Prefetch products the module declares it consumes
0240     std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
0241 
0242     for (auto const& item : items) {
0243       ProductResolverIndex productResolverIndex = item.productResolverIndex();
0244       bool skipCurrentProcess = item.skipCurrentProcess();
0245       if (productResolverIndex != ProductResolverIndexAmbiguous) {
0246         iPrincipal.prefetchAsync(iTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
0247       }
0248     }
0249   }
0250 
0251   void Worker::setEarlyDeleteHelper(EarlyDeleteHelper* iHelper) { earlyDeleteHelper_ = iHelper; }
0252 
0253   size_t Worker::transformIndex(edm::ProductDescription const&) const noexcept { return -1; }
0254   void Worker::doTransformAsync(WaitingTaskHolder iTask,
0255                                 size_t iTransformIndex,
0256                                 EventPrincipal const& iPrincipal,
0257                                 ServiceToken const& iToken,
0258                                 StreamID,
0259                                 ModuleCallingContext const& mcc,
0260                                 StreamContext const*) noexcept {
0261     ServiceWeakToken weakToken = iToken;
0262 
0263     //Need to make the services available early so other services can see them
0264     auto task = make_waiting_task(
0265         [this, iTask, weakToken, &iPrincipal, iTransformIndex, mcc](std::exception_ptr const* iExcept) mutable {
0266           //post prefetch signal
0267           actReg_->postModuleTransformPrefetchingSignal_.emit(*mcc.getStreamContext(), mcc);
0268           if (iExcept) {
0269             iTask.doneWaiting(*iExcept);
0270             return;
0271           }
0272           implDoTransformAsync(iTask, iTransformIndex, iPrincipal, mcc.parent(), weakToken);
0273         });
0274 
0275     //pre prefetch signal
0276     actReg_->preModuleTransformPrefetchingSignal_.emit(*mcc.getStreamContext(), mcc);
0277     iPrincipal.prefetchAsync(
0278         WaitingTaskHolder(*iTask.group(), task), itemToGetForTransform(iTransformIndex), false, iToken, &mcc);
0279   }
0280 
0281   void Worker::resetModuleDescription(ModuleDescription const* iDesc) {
0282     ModuleCallingContext temp(iDesc,
0283                               0,
0284                               moduleCallingContext_.state(),
0285                               moduleCallingContext_.parent(),
0286                               moduleCallingContext_.previousModuleOnThread());
0287     moduleCallingContext_ = temp;
0288     assert(iDesc);
0289     checkForShouldTryToContinue(*iDesc);
0290   }
0291 
0292   void Worker::beginJob(GlobalContext const& globalContext) {
0293     ParentContext parentContext(&globalContext);
0294     ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0295     ModuleSignalSentry<ModuleBeginJobTraits> sentry(activityRegistry(), &globalContext, &moduleCallingContext_);
0296 
0297     try {
0298       convertException::wrap([this, &sentry]() {
0299         beginSucceeded_ = false;
0300         sentry.preModuleSignal();
0301         implBeginJob();
0302         sentry.postModuleSignal();
0303         beginSucceeded_ = true;
0304       });
0305     } catch (cms::Exception& ex) {
0306       exceptionContext(ex, moduleCallingContext_);
0307       throw;
0308     }
0309   }
0310 
0311   void Worker::endJob(GlobalContext const& globalContext) {
0312     if (beginSucceeded_) {
0313       beginSucceeded_ = false;
0314 
0315       ParentContext parentContext(&globalContext);
0316       ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0317       ModuleSignalSentry<ModuleEndJobTraits> sentry(activityRegistry(), &globalContext, &moduleCallingContext_);
0318 
0319       try {
0320         convertException::wrap([this, &sentry]() {
0321           sentry.preModuleSignal();
0322           implEndJob();
0323           sentry.postModuleSignal();
0324         });
0325       } catch (cms::Exception& ex) {
0326         exceptionContext(ex, moduleCallingContext_);
0327         throw;
0328       }
0329     }
0330   }
0331 
0332   void Worker::beginStream(StreamID streamID, StreamContext const& streamContext) {
0333     ParentContext parentContext(&streamContext);
0334     ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0335     ModuleSignalSentry<ModuleBeginStreamTraits> sentry(activityRegistry(), &streamContext, &moduleCallingContext_);
0336 
0337     try {
0338       convertException::wrap([this, &sentry, streamID]() {
0339         beginSucceeded_ = false;
0340         sentry.preModuleSignal();
0341         implBeginStream(streamID);
0342         sentry.postModuleSignal();
0343         beginSucceeded_ = true;
0344       });
0345     } catch (cms::Exception& ex) {
0346       exceptionContext(ex, moduleCallingContext_);
0347       throw;
0348     }
0349   }
0350 
0351   void Worker::endStream(StreamID id, StreamContext const& streamContext) {
0352     if (beginSucceeded_) {
0353       beginSucceeded_ = false;
0354 
0355       ParentContext parentContext(&streamContext);
0356       ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0357       ModuleSignalSentry<ModuleEndStreamTraits> sentry(activityRegistry(), &streamContext, &moduleCallingContext_);
0358 
0359       try {
0360         convertException::wrap([this, &sentry, id]() {
0361           sentry.preModuleSignal();
0362           implEndStream(id);
0363           sentry.postModuleSignal();
0364         });
0365       } catch (cms::Exception& ex) {
0366         exceptionContext(ex, moduleCallingContext_);
0367         throw;
0368       }
0369     }
0370   }
0371 
0372   void Worker::registerThinnedAssociations(ProductRegistry const& registry, ThinnedAssociationsHelper& helper) {
0373     try {
0374       implRegisterThinnedAssociations(registry, helper);
0375     } catch (cms::Exception& ex) {
0376       ex.addContext("Calling registerThinnedAssociations() for module " + description()->moduleLabel());
0377       throw ex;
0378     }
0379   }
0380 
0381   void Worker::skipOnPath(EventPrincipal const& iEvent) {
0382     if (earlyDeleteHelper_) {
0383       earlyDeleteHelper_->pathFinished(iEvent);
0384     }
0385     if (0 == --numberOfPathsLeftToRun_) {
0386       waitingTasks_.doneWaiting(cached_exception_);
0387     }
0388   }
0389 
0390   void Worker::postDoEvent(EventPrincipal const& iEvent) {
0391     if (earlyDeleteHelper_) {
0392       earlyDeleteHelper_->moduleRan(iEvent);
0393     }
0394   }
0395 
0396   void Worker::runAcquire(EventTransitionInfo const& info,
0397                           ParentContext const& parentContext,
0398                           WaitingTaskHolder holder) {
0399     ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0400     try {
0401       convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, std::move(holder)); });
0402     } catch (cms::Exception& ex) {
0403       edm::exceptionContext(ex, moduleCallingContext_);
0404       if (shouldRethrowException(std::current_exception(), parentContext, true, shouldTryToContinue_)) {
0405         timesRun_.fetch_add(1, std::memory_order_relaxed);
0406         throw;
0407       }
0408     }
0409   }
0410 
0411   void Worker::runAcquireAfterAsyncPrefetch(std::exception_ptr iEPtr,
0412                                             EventTransitionInfo const& eventTransitionInfo,
0413                                             ParentContext const& parentContext,
0414                                             WaitingTaskHolder holder) noexcept {
0415     ranAcquireWithoutException_ = false;
0416     std::exception_ptr exceptionPtr;
0417     if (iEPtr) {
0418       if (shouldRethrowException(iEPtr, parentContext, true, shouldTryToContinue_)) {
0419         exceptionPtr = iEPtr;
0420       }
0421       moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
0422     } else {
0423       // Caught exception is propagated via WaitingTaskHolder
0424       CMS_SA_ALLOW try {
0425         // holder is copied to runAcquire in order to be independent
0426         // of the lifetime of the WaitingTaskHolder inside runAcquire
0427         runAcquire(eventTransitionInfo, parentContext, holder);
0428         ranAcquireWithoutException_ = true;
0429       } catch (...) {
0430         exceptionPtr = std::current_exception();
0431       }
0432     }
0433     // It is important this is after runAcquire completely finishes
0434     holder.doneWaiting(exceptionPtr);
0435   }
0436 
0437   std::exception_ptr Worker::handleExternalWorkException(std::exception_ptr iEPtr,
0438                                                          ParentContext const& parentContext) noexcept {
0439     if (ranAcquireWithoutException_) {
0440       try {
0441         convertException::wrap([iEPtr]() { std::rethrow_exception(iEPtr); });
0442       } catch (cms::Exception& ex) {
0443         ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0444         edm::exceptionContext(ex, moduleCallingContext_);
0445         return std::current_exception();
0446       }
0447     }
0448     return iEPtr;
0449   }
0450 
0451   Worker::HandleExternalWorkExceptionTask::HandleExternalWorkExceptionTask(Worker* worker,
0452                                                                            oneapi::tbb::task_group* group,
0453                                                                            WaitingTask* runModuleTask,
0454                                                                            ParentContext const& parentContext) noexcept
0455       : m_worker(worker), m_runModuleTask(runModuleTask), m_group(group), m_parentContext(parentContext) {}
0456 
0457   void Worker::HandleExternalWorkExceptionTask::execute() {
0458     auto excptr = exceptionPtr();
0459     WaitingTaskHolder holder(*m_group, m_runModuleTask);
0460     if (excptr) {
0461       holder.doneWaiting(m_worker->handleExternalWorkException(excptr, m_parentContext));
0462     }
0463   }
0464 }  // namespace edm