Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-03-17 11:02:24

0001 
0002 /*----------------------------------------------------------------------
0003 ----------------------------------------------------------------------*/
#include "FWCore/Concurrency/interface/include_first_syncWait.h"
#include "FWCore/Framework/interface/maker/Worker.h"
#include "FWCore/Framework/interface/EarlyDeleteHelper.h"
#include "FWCore/Framework/interface/EventPrincipal.h"
#include "FWCore/Framework/interface/EventSetupImpl.h"
#include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
#include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
#include "FWCore/Framework/interface/RunPrincipal.h"
#include "FWCore/ServiceRegistry/interface/StreamContext.h"
#include "FWCore/ServiceRegistry/interface/ESParentContext.h"
#include "FWCore/Concurrency/interface/WaitingTask.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"

#include <utility>
0016 
0017 namespace edm {
0018   namespace {
0019     class ModuleBeginJobSignalSentry {
0020     public:
0021       ModuleBeginJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md) : a_(a), md_(&md) {
0022         if (a_)
0023           a_->preModuleBeginJobSignal_(*md_);
0024       }
0025       ~ModuleBeginJobSignalSentry() {
0026         if (a_)
0027           a_->postModuleBeginJobSignal_(*md_);
0028       }
0029 
0030     private:
0031       ActivityRegistry* a_;  // We do not use propagate_const because the registry itself is mutable.
0032       ModuleDescription const* md_;
0033     };
0034 
0035     class ModuleEndJobSignalSentry {
0036     public:
0037       ModuleEndJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md) : a_(a), md_(&md) {
0038         if (a_)
0039           a_->preModuleEndJobSignal_(*md_);
0040       }
0041       ~ModuleEndJobSignalSentry() {
0042         if (a_)
0043           a_->postModuleEndJobSignal_(*md_);
0044       }
0045 
0046     private:
0047       ActivityRegistry* a_;  // We do not use propagate_const because the registry itself is mutable.
0048       ModuleDescription const* md_;
0049     };
0050 
0051     class ModuleBeginStreamSignalSentry {
0052     public:
0053       ModuleBeginStreamSignalSentry(ActivityRegistry* a, StreamContext const& sc, ModuleCallingContext const& mcc)
0054           : a_(a), sc_(sc), mcc_(mcc) {
0055         if (a_)
0056           a_->preModuleBeginStreamSignal_(sc_, mcc_);
0057       }
0058       ~ModuleBeginStreamSignalSentry() {
0059         if (a_)
0060           a_->postModuleBeginStreamSignal_(sc_, mcc_);
0061       }
0062 
0063     private:
0064       ActivityRegistry* a_;  // We do not use propagate_const because the registry itself is mutable.
0065       StreamContext const& sc_;
0066       ModuleCallingContext const& mcc_;
0067     };
0068 
0069     class ModuleEndStreamSignalSentry {
0070     public:
0071       ModuleEndStreamSignalSentry(ActivityRegistry* a, StreamContext const& sc, ModuleCallingContext const& mcc)
0072           : a_(a), sc_(sc), mcc_(mcc) {
0073         if (a_)
0074           a_->preModuleEndStreamSignal_(sc_, mcc_);
0075       }
0076       ~ModuleEndStreamSignalSentry() {
0077         if (a_)
0078           a_->postModuleEndStreamSignal_(sc_, mcc_);
0079       }
0080 
0081     private:
0082       ActivityRegistry* a_;  // We do not use propagate_const because the registry itself is mutable.
0083       StreamContext const& sc_;
0084       ModuleCallingContext const& mcc_;
0085     };
0086 
0087   }  // namespace
0088 
0089   Worker::Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions)
0090       : timesRun_(0),
0091         timesVisited_(0),
0092         timesPassed_(0),
0093         timesFailed_(0),
0094         timesExcept_(0),
0095         state_(Ready),
0096         numberOfPathsOn_(0),
0097         numberOfPathsLeftToRun_(0),
0098         moduleCallingContext_(&iMD),
0099         actions_(iActions),
0100         cached_exception_(),
0101         actReg_(),
0102         earlyDeleteHelper_(nullptr),
0103         workStarted_(false),
0104         ranAcquireWithoutException_(false) {}
0105 
0106   Worker::~Worker() {}
0107 
0108   void Worker::setActivityRegistry(std::shared_ptr<ActivityRegistry> areg) { actReg_ = areg; }
0109 
0110   bool Worker::shouldRethrowException(std::exception_ptr iPtr, ParentContext const& parentContext, bool isEvent) const {
0111     // NOTE: the warning printed as a result of ignoring or failing
0112     // a module will only be printed during the full true processing
0113     // pass of this module
0114 
0115     // Get the action corresponding to this exception.  However, if processing
0116     // something other than an event (e.g. run, lumi) always rethrow.
0117     if (not isEvent) {
0118       return true;
0119     }
0120     try {
0121       convertException::wrap([&]() { std::rethrow_exception(iPtr); });
0122     } catch (cms::Exception& ex) {
0123       exception_actions::ActionCodes action = actions_->find(ex.category());
0124 
0125       if (action == exception_actions::Rethrow) {
0126         return true;
0127       }
0128 
0129       ModuleCallingContext tempContext(description(), ModuleCallingContext::State::kInvalid, parentContext, nullptr);
0130 
0131       // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
0132       // as IgnoreCompletely, so any subsequent OutputModules are still run.
0133       // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
0134       ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
0135       if (top_mcc->type() == ParentContext::Type::kPlaceInPath &&
0136           top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
0137         if ((action == exception_actions::SkipEvent && tempContext.type() == ParentContext::Type::kPlaceInPath) ||
0138             action == exception_actions::FailPath) {
0139           action = exception_actions::IgnoreCompletely;
0140         }
0141       }
0142       if (action == exception_actions::IgnoreCompletely) {
0143         edm::printCmsExceptionWarning("IgnoreCompletely", ex);
0144         return false;
0145       }
0146     }
0147     return true;
0148   }
0149 
0150   void Worker::prePrefetchSelectionAsync(oneapi::tbb::task_group& group,
0151                                          WaitingTask* successTask,
0152                                          ServiceToken const& token,
0153                                          StreamID id,
0154                                          EventPrincipal const* iPrincipal) {
0155     successTask->increment_ref_count();
0156 
0157     ServiceWeakToken weakToken = token;
0158     auto choiceTask =
0159         edm::make_waiting_task([id, successTask, iPrincipal, this, weakToken, &group](std::exception_ptr const*) {
0160           ServiceRegistry::Operate guard(weakToken.lock());
0161           // There is no reasonable place to rethrow, and implDoPrePrefetchSelection() should not throw in the first place.
0162           CMS_SA_ALLOW try {
0163             if (not implDoPrePrefetchSelection(id, *iPrincipal, &moduleCallingContext_)) {
0164               timesRun_.fetch_add(1, std::memory_order_relaxed);
0165               setPassed<true>();
0166               waitingTasks_.doneWaiting(nullptr);
0167               //TBB requires that destroyed tasks have count 0
0168               if (0 == successTask->decrement_ref_count()) {
0169                 TaskSentry s(successTask);
0170               }
0171               return;
0172             }
0173           } catch (...) {
0174           }
0175           if (0 == successTask->decrement_ref_count()) {
0176             group.run([successTask]() {
0177               TaskSentry s(successTask);
0178               successTask->execute();
0179             });
0180           }
0181         });
0182 
0183     WaitingTaskHolder choiceHolder{group, choiceTask};
0184 
0185     std::vector<ProductResolverIndexAndSkipBit> items;
0186     itemsToGetForSelection(items);
0187 
0188     for (auto const& item : items) {
0189       ProductResolverIndex productResolverIndex = item.productResolverIndex();
0190       bool skipCurrentProcess = item.skipCurrentProcess();
0191       if (productResolverIndex != ProductResolverIndexAmbiguous) {
0192         iPrincipal->prefetchAsync(
0193             choiceHolder, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
0194       }
0195     }
0196     choiceHolder.doneWaiting(std::exception_ptr{});
0197   }
0198 
0199   void Worker::esPrefetchAsync(WaitingTaskHolder iTask,
0200                                EventSetupImpl const& iImpl,
0201                                Transition iTrans,
0202                                ServiceToken const& iToken) {
0203     if (iTrans >= edm::Transition::NumberOfEventSetupTransitions) {
0204       return;
0205     }
0206     auto const& recs = esRecordsToGetFrom(iTrans);
0207     auto const& items = esItemsToGetFrom(iTrans);
0208 
0209     assert(items.size() == recs.size());
0210     if (items.empty()) {
0211       return;
0212     }
0213 
0214     for (size_t i = 0; i != items.size(); ++i) {
0215       if (recs[i] != ESRecordIndex{}) {
0216         auto rec = iImpl.findImpl(recs[i]);
0217         if (rec) {
0218           rec->prefetchAsync(iTask, items[i], &iImpl, iToken, ESParentContext(&moduleCallingContext_));
0219         }
0220       }
0221     }
0222   }
0223 
0224   void Worker::edPrefetchAsync(WaitingTaskHolder iTask, ServiceToken const& token, Principal const& iPrincipal) const {
0225     // Prefetch products the module declares it consumes
0226     std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
0227 
0228     for (auto const& item : items) {
0229       ProductResolverIndex productResolverIndex = item.productResolverIndex();
0230       bool skipCurrentProcess = item.skipCurrentProcess();
0231       if (productResolverIndex != ProductResolverIndexAmbiguous) {
0232         iPrincipal.prefetchAsync(iTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
0233       }
0234     }
0235   }
0236 
0237   void Worker::setEarlyDeleteHelper(EarlyDeleteHelper* iHelper) { earlyDeleteHelper_ = iHelper; }
0238 
0239   size_t Worker::transformIndex(edm::BranchDescription const&) const { return -1; }
0240   void Worker::doTransformAsync(WaitingTaskHolder iTask,
0241                                 size_t iTransformIndex,
0242                                 EventPrincipal const& iPrincipal,
0243                                 ServiceToken const& iToken,
0244                                 StreamID,
0245                                 ModuleCallingContext const& mcc,
0246                                 StreamContext const*) {
0247     ServiceWeakToken weakToken = iToken;
0248 
0249     //Need to make the services available early so other services can see them
0250     auto task = make_waiting_task([this, iTask, weakToken, &iPrincipal, iTransformIndex, parent = mcc.parent()](
0251                                       std::exception_ptr const* iExcept) mutable {
0252       if (iExcept) {
0253         iTask.doneWaiting(*iExcept);
0254         return;
0255       }
0256       implDoTransformAsync(iTask, iTransformIndex, iPrincipal, parent, weakToken);
0257     });
0258 
0259     //NOTE: need different ModuleCallingContext. The ProductResolver will copy the context in order to get
0260     // a longer lifetime than this function call.
0261     iPrincipal.prefetchAsync(
0262         WaitingTaskHolder(*iTask.group(), task), itemToGetForTransform(iTransformIndex), false, iToken, &mcc);
0263   }
0264 
0265   void Worker::resetModuleDescription(ModuleDescription const* iDesc) {
0266     ModuleCallingContext temp(iDesc,
0267                               moduleCallingContext_.state(),
0268                               moduleCallingContext_.parent(),
0269                               moduleCallingContext_.previousModuleOnThread());
0270     moduleCallingContext_ = temp;
0271   }
0272 
0273   void Worker::beginJob() {
0274     try {
0275       convertException::wrap([&]() {
0276         ModuleBeginJobSignalSentry cpp(actReg_.get(), *description());
0277         implBeginJob();
0278       });
0279     } catch (cms::Exception& ex) {
0280       state_ = Exception;
0281       std::ostringstream ost;
0282       ost << "Calling beginJob for module " << description()->moduleName() << "/'" << description()->moduleLabel()
0283           << "'";
0284       ex.addContext(ost.str());
0285       throw;
0286     }
0287   }
0288 
0289   void Worker::endJob() {
0290     try {
0291       convertException::wrap([&]() {
0292         ModuleDescription const* desc = description();
0293         assert(desc != nullptr);
0294         ModuleEndJobSignalSentry cpp(actReg_.get(), *desc);
0295         implEndJob();
0296       });
0297     } catch (cms::Exception& ex) {
0298       state_ = Exception;
0299       std::ostringstream ost;
0300       ost << "Calling endJob for module " << description()->moduleName() << "/'" << description()->moduleLabel() << "'";
0301       ex.addContext(ost.str());
0302       throw;
0303     }
0304   }
0305 
0306   void Worker::beginStream(StreamID id, StreamContext& streamContext) {
0307     try {
0308       convertException::wrap([&]() {
0309         streamContext.setTransition(StreamContext::Transition::kBeginStream);
0310         streamContext.setEventID(EventID(0, 0, 0));
0311         streamContext.setRunIndex(RunIndex::invalidRunIndex());
0312         streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
0313         streamContext.setTimestamp(Timestamp());
0314         ParentContext parentContext(&streamContext);
0315         ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0316         moduleCallingContext_.setState(ModuleCallingContext::State::kRunning);
0317         ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
0318         implBeginStream(id);
0319       });
0320     } catch (cms::Exception& ex) {
0321       state_ = Exception;
0322       std::ostringstream ost;
0323       ost << "Calling beginStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
0324           << "'";
0325       ex.addContext(ost.str());
0326       throw;
0327     }
0328   }
0329 
0330   void Worker::endStream(StreamID id, StreamContext& streamContext) {
0331     try {
0332       convertException::wrap([&]() {
0333         streamContext.setTransition(StreamContext::Transition::kEndStream);
0334         streamContext.setEventID(EventID(0, 0, 0));
0335         streamContext.setRunIndex(RunIndex::invalidRunIndex());
0336         streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
0337         streamContext.setTimestamp(Timestamp());
0338         ParentContext parentContext(&streamContext);
0339         ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0340         moduleCallingContext_.setState(ModuleCallingContext::State::kRunning);
0341         ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
0342         implEndStream(id);
0343       });
0344     } catch (cms::Exception& ex) {
0345       state_ = Exception;
0346       std::ostringstream ost;
0347       ost << "Calling endStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
0348           << "'";
0349       ex.addContext(ost.str());
0350       throw;
0351     }
0352   }
0353 
0354   void Worker::registerThinnedAssociations(ProductRegistry const& registry, ThinnedAssociationsHelper& helper) {
0355     try {
0356       implRegisterThinnedAssociations(registry, helper);
0357     } catch (cms::Exception& ex) {
0358       ex.addContext("Calling registerThinnedAssociations() for module " + description()->moduleLabel());
0359       throw ex;
0360     }
0361   }
0362 
0363   void Worker::skipOnPath(EventPrincipal const& iEvent) {
0364     if (earlyDeleteHelper_) {
0365       earlyDeleteHelper_->pathFinished(iEvent);
0366     }
0367     if (0 == --numberOfPathsLeftToRun_) {
0368       waitingTasks_.doneWaiting(cached_exception_);
0369     }
0370   }
0371 
0372   void Worker::postDoEvent(EventPrincipal const& iEvent) {
0373     if (earlyDeleteHelper_) {
0374       earlyDeleteHelper_->moduleRan(iEvent);
0375     }
0376   }
0377 
0378   void Worker::runAcquire(EventTransitionInfo const& info,
0379                           ParentContext const& parentContext,
0380                           WaitingTaskWithArenaHolder& holder) {
0381     ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0382     try {
0383       convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, holder); });
0384     } catch (cms::Exception& ex) {
0385       edm::exceptionContext(ex, moduleCallingContext_);
0386       if (shouldRethrowException(std::current_exception(), parentContext, true)) {
0387         timesRun_.fetch_add(1, std::memory_order_relaxed);
0388         throw;
0389       }
0390     }
0391   }
0392 
0393   void Worker::runAcquireAfterAsyncPrefetch(std::exception_ptr iEPtr,
0394                                             EventTransitionInfo const& eventTransitionInfo,
0395                                             ParentContext const& parentContext,
0396                                             WaitingTaskWithArenaHolder holder) {
0397     ranAcquireWithoutException_ = false;
0398     std::exception_ptr exceptionPtr;
0399     if (iEPtr) {
0400       if (shouldRethrowException(iEPtr, parentContext, true)) {
0401         exceptionPtr = iEPtr;
0402       }
0403       moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
0404     } else {
0405       // Caught exception is propagated via WaitingTaskWithArenaHolder
0406       CMS_SA_ALLOW try {
0407         runAcquire(eventTransitionInfo, parentContext, holder);
0408         ranAcquireWithoutException_ = true;
0409       } catch (...) {
0410         exceptionPtr = std::current_exception();
0411       }
0412     }
0413     // It is important this is after runAcquire completely finishes
0414     holder.doneWaiting(exceptionPtr);
0415   }
0416 
0417   std::exception_ptr Worker::handleExternalWorkException(std::exception_ptr iEPtr, ParentContext const& parentContext) {
0418     if (ranAcquireWithoutException_) {
0419       try {
0420         convertException::wrap([iEPtr]() { std::rethrow_exception(iEPtr); });
0421       } catch (cms::Exception& ex) {
0422         ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0423         edm::exceptionContext(ex, moduleCallingContext_);
0424         return std::current_exception();
0425       }
0426     }
0427     return iEPtr;
0428   }
0429 
0430   Worker::HandleExternalWorkExceptionTask::HandleExternalWorkExceptionTask(Worker* worker,
0431                                                                            oneapi::tbb::task_group* group,
0432                                                                            WaitingTask* runModuleTask,
0433                                                                            ParentContext const& parentContext)
0434       : m_worker(worker), m_runModuleTask(runModuleTask), m_group(group), m_parentContext(parentContext) {}
0435 
0436   void Worker::HandleExternalWorkExceptionTask::execute() {
0437     auto excptr = exceptionPtr();
0438     WaitingTaskHolder holder(*m_group, m_runModuleTask);
0439     if (excptr) {
0440       holder.doneWaiting(m_worker->handleExternalWorkException(excptr, m_parentContext));
0441     }
0442   }
0443 }  // namespace edm