Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-12-24 02:18:45

0001 
0002 /*----------------------------------------------------------------------
0003 ----------------------------------------------------------------------*/
0004 #include "FWCore/Concurrency/interface/include_first_syncWait.h"
0005 #include "FWCore/Framework/interface/maker/Worker.h"
0006 #include "FWCore/Framework/interface/EarlyDeleteHelper.h"
0007 #include "FWCore/Framework/interface/EventPrincipal.h"
0008 #include "FWCore/Framework/interface/EventSetupImpl.h"
0009 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0010 #include "FWCore/Framework/interface/ProcessBlockPrincipal.h"
0011 #include "FWCore/Framework/interface/RunPrincipal.h"
0012 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0013 #include "FWCore/ServiceRegistry/interface/ESParentContext.h"
0014 #include "FWCore/Concurrency/interface/WaitingTask.h"
0015 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0016 
0017 namespace edm {
0018   namespace {
0019     class ModuleBeginJobSignalSentry {
0020     public:
0021       ModuleBeginJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md) : a_(a), md_(&md) {
0022         if (a_)
0023           a_->preModuleBeginJobSignal_(*md_);
0024       }
0025       ~ModuleBeginJobSignalSentry() {
0026         if (a_)
0027           a_->postModuleBeginJobSignal_(*md_);
0028       }
0029 
0030     private:
0031       ActivityRegistry* a_;  // We do not use propagate_const because the registry itself is mutable.
0032       ModuleDescription const* md_;
0033     };
0034 
0035     class ModuleEndJobSignalSentry {
0036     public:
0037       ModuleEndJobSignalSentry(ActivityRegistry* a, ModuleDescription const& md) : a_(a), md_(&md) {
0038         if (a_)
0039           a_->preModuleEndJobSignal_(*md_);
0040       }
0041       ~ModuleEndJobSignalSentry() {
0042         if (a_)
0043           a_->postModuleEndJobSignal_(*md_);
0044       }
0045 
0046     private:
0047       ActivityRegistry* a_;  // We do not use propagate_const because the registry itself is mutable.
0048       ModuleDescription const* md_;
0049     };
0050 
0051     class ModuleBeginStreamSignalSentry {
0052     public:
0053       ModuleBeginStreamSignalSentry(ActivityRegistry* a, StreamContext const& sc, ModuleCallingContext const& mcc)
0054           : a_(a), sc_(sc), mcc_(mcc) {
0055         if (a_)
0056           a_->preModuleBeginStreamSignal_(sc_, mcc_);
0057       }
0058       ~ModuleBeginStreamSignalSentry() {
0059         if (a_)
0060           a_->postModuleBeginStreamSignal_(sc_, mcc_);
0061       }
0062 
0063     private:
0064       ActivityRegistry* a_;  // We do not use propagate_const because the registry itself is mutable.
0065       StreamContext const& sc_;
0066       ModuleCallingContext const& mcc_;
0067     };
0068 
0069     class ModuleEndStreamSignalSentry {
0070     public:
0071       ModuleEndStreamSignalSentry(ActivityRegistry* a, StreamContext const& sc, ModuleCallingContext const& mcc)
0072           : a_(a), sc_(sc), mcc_(mcc) {
0073         if (a_)
0074           a_->preModuleEndStreamSignal_(sc_, mcc_);
0075       }
0076       ~ModuleEndStreamSignalSentry() {
0077         if (a_)
0078           a_->postModuleEndStreamSignal_(sc_, mcc_);
0079       }
0080 
0081     private:
0082       ActivityRegistry* a_;  // We do not use propagate_const because the registry itself is mutable.
0083       StreamContext const& sc_;
0084       ModuleCallingContext const& mcc_;
0085     };
0086 
0087   }  // namespace
0088 
0089   Worker::Worker(ModuleDescription const& iMD, ExceptionToActionTable const* iActions)
0090       : timesRun_(0),
0091         timesVisited_(0),
0092         timesPassed_(0),
0093         timesFailed_(0),
0094         timesExcept_(0),
0095         state_(Ready),
0096         numberOfPathsOn_(0),
0097         numberOfPathsLeftToRun_(0),
0098         moduleCallingContext_(&iMD),
0099         actions_(iActions),
0100         cached_exception_(),
0101         actReg_(),
0102         earlyDeleteHelper_(nullptr),
0103         workStarted_(false),
0104         ranAcquireWithoutException_(false) {}
0105 
0106   Worker::~Worker() {}
0107 
0108   void Worker::setActivityRegistry(std::shared_ptr<ActivityRegistry> areg) { actReg_ = areg; }
0109 
0110   bool Worker::shouldRethrowException(std::exception_ptr iPtr, ParentContext const& parentContext, bool isEvent) const {
0111     // NOTE: the warning printed as a result of ignoring or failing
0112     // a module will only be printed during the full true processing
0113     // pass of this module
0114 
0115     // Get the action corresponding to this exception.  However, if processing
0116     // something other than an event (e.g. run, lumi) always rethrow.
0117     if (not isEvent) {
0118       return true;
0119     }
0120     try {
0121       convertException::wrap([&]() { std::rethrow_exception(iPtr); });
0122     } catch (cms::Exception& ex) {
0123       exception_actions::ActionCodes action = actions_->find(ex.category());
0124 
0125       if (action == exception_actions::Rethrow) {
0126         return true;
0127       }
0128 
0129       ModuleCallingContext tempContext(description(), ModuleCallingContext::State::kInvalid, parentContext, nullptr);
0130 
0131       // If we are processing an endpath and the module was scheduled, treat SkipEvent or FailPath
0132       // as IgnoreCompletely, so any subsequent OutputModules are still run.
0133       // For unscheduled modules only treat FailPath as IgnoreCompletely but still allow SkipEvent to throw
0134       ModuleCallingContext const* top_mcc = tempContext.getTopModuleCallingContext();
0135       if (top_mcc->type() == ParentContext::Type::kPlaceInPath &&
0136           top_mcc->placeInPathContext()->pathContext()->isEndPath()) {
0137         if ((action == exception_actions::SkipEvent && tempContext.type() == ParentContext::Type::kPlaceInPath) ||
0138             action == exception_actions::FailPath) {
0139           action = exception_actions::IgnoreCompletely;
0140         }
0141       }
0142       if (action == exception_actions::IgnoreCompletely) {
0143         edm::printCmsExceptionWarning("IgnoreCompletely", ex);
0144         return false;
0145       }
0146     }
0147     return true;
0148   }
0149 
0150   void Worker::prePrefetchSelectionAsync(oneapi::tbb::task_group& group,
0151                                          WaitingTask* successTask,
0152                                          ServiceToken const& token,
0153                                          StreamID id,
0154                                          EventPrincipal const* iPrincipal) {
0155     successTask->increment_ref_count();
0156 
0157     ServiceWeakToken weakToken = token;
0158     auto choiceTask =
0159         edm::make_waiting_task([id, successTask, iPrincipal, this, weakToken, &group](std::exception_ptr const*) {
0160           ServiceRegistry::Operate guard(weakToken.lock());
0161           // There is no reasonable place to rethrow, and implDoPrePrefetchSelection() should not throw in the first place.
0162           CMS_SA_ALLOW try {
0163             if (not implDoPrePrefetchSelection(id, *iPrincipal, &moduleCallingContext_)) {
0164               timesRun_.fetch_add(1, std::memory_order_relaxed);
0165               setPassed<true>();
0166               waitingTasks_.doneWaiting(nullptr);
0167               //TBB requires that destroyed tasks have count 0
0168               if (0 == successTask->decrement_ref_count()) {
0169                 TaskSentry s(successTask);
0170               }
0171               return;
0172             }
0173           } catch (...) {
0174           }
0175           if (0 == successTask->decrement_ref_count()) {
0176             group.run([successTask]() {
0177               TaskSentry s(successTask);
0178               successTask->execute();
0179             });
0180           }
0181         });
0182 
0183     WaitingTaskHolder choiceHolder{group, choiceTask};
0184 
0185     std::vector<ProductResolverIndexAndSkipBit> items;
0186     itemsToGetForSelection(items);
0187 
0188     for (auto const& item : items) {
0189       ProductResolverIndex productResolverIndex = item.productResolverIndex();
0190       bool skipCurrentProcess = item.skipCurrentProcess();
0191       if (productResolverIndex != ProductResolverIndexAmbiguous) {
0192         iPrincipal->prefetchAsync(
0193             choiceHolder, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
0194       }
0195     }
0196     choiceHolder.doneWaiting(std::exception_ptr{});
0197   }
0198 
0199   void Worker::esPrefetchAsync(WaitingTaskHolder iTask,
0200                                EventSetupImpl const& iImpl,
0201                                Transition iTrans,
0202                                ServiceToken const& iToken) {
0203     if (iTrans >= edm::Transition::NumberOfEventSetupTransitions) {
0204       return;
0205     }
0206     auto const& recs = esRecordsToGetFrom(iTrans);
0207     auto const& items = esItemsToGetFrom(iTrans);
0208 
0209     assert(items.size() == recs.size());
0210     if (items.empty()) {
0211       return;
0212     }
0213 
0214     for (size_t i = 0; i != items.size(); ++i) {
0215       if (recs[i] != ESRecordIndex{}) {
0216         auto rec = iImpl.findImpl(recs[i]);
0217         if (rec) {
0218           rec->prefetchAsync(iTask, items[i], &iImpl, iToken, ESParentContext(&moduleCallingContext_));
0219         }
0220       }
0221     }
0222   }
0223 
0224   void Worker::edPrefetchAsync(WaitingTaskHolder iTask, ServiceToken const& token, Principal const& iPrincipal) const {
0225     // Prefetch products the module declares it consumes
0226     std::vector<ProductResolverIndexAndSkipBit> const& items = itemsToGetFrom(iPrincipal.branchType());
0227 
0228     for (auto const& item : items) {
0229       ProductResolverIndex productResolverIndex = item.productResolverIndex();
0230       bool skipCurrentProcess = item.skipCurrentProcess();
0231       if (productResolverIndex != ProductResolverIndexAmbiguous) {
0232         iPrincipal.prefetchAsync(iTask, productResolverIndex, skipCurrentProcess, token, &moduleCallingContext_);
0233       }
0234     }
0235   }
0236 
0237   void Worker::setEarlyDeleteHelper(EarlyDeleteHelper* iHelper) { earlyDeleteHelper_ = iHelper; }
0238 
0239   void Worker::resetModuleDescription(ModuleDescription const* iDesc) {
0240     ModuleCallingContext temp(iDesc,
0241                               moduleCallingContext_.state(),
0242                               moduleCallingContext_.parent(),
0243                               moduleCallingContext_.previousModuleOnThread());
0244     moduleCallingContext_ = temp;
0245   }
0246 
0247   void Worker::beginJob() {
0248     try {
0249       convertException::wrap([&]() {
0250         ModuleBeginJobSignalSentry cpp(actReg_.get(), *description());
0251         implBeginJob();
0252       });
0253     } catch (cms::Exception& ex) {
0254       state_ = Exception;
0255       std::ostringstream ost;
0256       ost << "Calling beginJob for module " << description()->moduleName() << "/'" << description()->moduleLabel()
0257           << "'";
0258       ex.addContext(ost.str());
0259       throw;
0260     }
0261   }
0262 
0263   void Worker::endJob() {
0264     try {
0265       convertException::wrap([&]() {
0266         ModuleDescription const* desc = description();
0267         assert(desc != nullptr);
0268         ModuleEndJobSignalSentry cpp(actReg_.get(), *desc);
0269         implEndJob();
0270       });
0271     } catch (cms::Exception& ex) {
0272       state_ = Exception;
0273       std::ostringstream ost;
0274       ost << "Calling endJob for module " << description()->moduleName() << "/'" << description()->moduleLabel() << "'";
0275       ex.addContext(ost.str());
0276       throw;
0277     }
0278   }
0279 
0280   void Worker::beginStream(StreamID id, StreamContext& streamContext) {
0281     try {
0282       convertException::wrap([&]() {
0283         streamContext.setTransition(StreamContext::Transition::kBeginStream);
0284         streamContext.setEventID(EventID(0, 0, 0));
0285         streamContext.setRunIndex(RunIndex::invalidRunIndex());
0286         streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
0287         streamContext.setTimestamp(Timestamp());
0288         ParentContext parentContext(&streamContext);
0289         ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0290         moduleCallingContext_.setState(ModuleCallingContext::State::kRunning);
0291         ModuleBeginStreamSignalSentry beginSentry(actReg_.get(), streamContext, moduleCallingContext_);
0292         implBeginStream(id);
0293       });
0294     } catch (cms::Exception& ex) {
0295       state_ = Exception;
0296       std::ostringstream ost;
0297       ost << "Calling beginStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
0298           << "'";
0299       ex.addContext(ost.str());
0300       throw;
0301     }
0302   }
0303 
0304   void Worker::endStream(StreamID id, StreamContext& streamContext) {
0305     try {
0306       convertException::wrap([&]() {
0307         streamContext.setTransition(StreamContext::Transition::kEndStream);
0308         streamContext.setEventID(EventID(0, 0, 0));
0309         streamContext.setRunIndex(RunIndex::invalidRunIndex());
0310         streamContext.setLuminosityBlockIndex(LuminosityBlockIndex::invalidLuminosityBlockIndex());
0311         streamContext.setTimestamp(Timestamp());
0312         ParentContext parentContext(&streamContext);
0313         ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0314         moduleCallingContext_.setState(ModuleCallingContext::State::kRunning);
0315         ModuleEndStreamSignalSentry endSentry(actReg_.get(), streamContext, moduleCallingContext_);
0316         implEndStream(id);
0317       });
0318     } catch (cms::Exception& ex) {
0319       state_ = Exception;
0320       std::ostringstream ost;
0321       ost << "Calling endStream for module " << description()->moduleName() << "/'" << description()->moduleLabel()
0322           << "'";
0323       ex.addContext(ost.str());
0324       throw;
0325     }
0326   }
0327 
0328   void Worker::registerThinnedAssociations(ProductRegistry const& registry, ThinnedAssociationsHelper& helper) {
0329     try {
0330       implRegisterThinnedAssociations(registry, helper);
0331     } catch (cms::Exception& ex) {
0332       ex.addContext("Calling registerThinnedAssociations() for module " + description()->moduleLabel());
0333       throw ex;
0334     }
0335   }
0336 
0337   void Worker::skipOnPath(EventPrincipal const& iEvent) {
0338     if (earlyDeleteHelper_) {
0339       earlyDeleteHelper_->pathFinished(iEvent);
0340     }
0341     if (0 == --numberOfPathsLeftToRun_) {
0342       waitingTasks_.doneWaiting(cached_exception_);
0343     }
0344   }
0345 
0346   void Worker::postDoEvent(EventPrincipal const& iEvent) {
0347     if (earlyDeleteHelper_) {
0348       earlyDeleteHelper_->moduleRan(iEvent);
0349     }
0350   }
0351 
0352   void Worker::runAcquire(EventTransitionInfo const& info,
0353                           ParentContext const& parentContext,
0354                           WaitingTaskWithArenaHolder& holder) {
0355     ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0356     try {
0357       convertException::wrap([&]() { this->implDoAcquire(info, &moduleCallingContext_, holder); });
0358     } catch (cms::Exception& ex) {
0359       edm::exceptionContext(ex, moduleCallingContext_);
0360       if (shouldRethrowException(std::current_exception(), parentContext, true)) {
0361         timesRun_.fetch_add(1, std::memory_order_relaxed);
0362         throw;
0363       }
0364     }
0365   }
0366 
0367   void Worker::runAcquireAfterAsyncPrefetch(std::exception_ptr const* iEPtr,
0368                                             EventTransitionInfo const& eventTransitionInfo,
0369                                             ParentContext const& parentContext,
0370                                             WaitingTaskWithArenaHolder holder) {
0371     ranAcquireWithoutException_ = false;
0372     std::exception_ptr exceptionPtr;
0373     if (iEPtr) {
0374       assert(*iEPtr);
0375       if (shouldRethrowException(*iEPtr, parentContext, true)) {
0376         exceptionPtr = *iEPtr;
0377       }
0378       moduleCallingContext_.setContext(ModuleCallingContext::State::kInvalid, ParentContext(), nullptr);
0379     } else {
0380       // Caught exception is propagated via WaitingTaskWithArenaHolder
0381       CMS_SA_ALLOW try {
0382         runAcquire(eventTransitionInfo, parentContext, holder);
0383         ranAcquireWithoutException_ = true;
0384       } catch (...) {
0385         exceptionPtr = std::current_exception();
0386       }
0387     }
0388     // It is important this is after runAcquire completely finishes
0389     holder.doneWaiting(exceptionPtr);
0390   }
0391 
0392   std::exception_ptr Worker::handleExternalWorkException(std::exception_ptr const* iEPtr,
0393                                                          ParentContext const& parentContext) {
0394     if (ranAcquireWithoutException_) {
0395       try {
0396         convertException::wrap([iEPtr]() { std::rethrow_exception(*iEPtr); });
0397       } catch (cms::Exception& ex) {
0398         ModuleContextSentry moduleContextSentry(&moduleCallingContext_, parentContext);
0399         edm::exceptionContext(ex, moduleCallingContext_);
0400         return std::current_exception();
0401       }
0402     }
0403     return *iEPtr;
0404   }
0405 
0406   Worker::HandleExternalWorkExceptionTask::HandleExternalWorkExceptionTask(Worker* worker,
0407                                                                            oneapi::tbb::task_group* group,
0408                                                                            WaitingTask* runModuleTask,
0409                                                                            ParentContext const& parentContext)
0410       : m_worker(worker), m_runModuleTask(runModuleTask), m_group(group), m_parentContext(parentContext) {}
0411 
0412   void Worker::HandleExternalWorkExceptionTask::execute() {
0413     auto excptr = exceptionPtr();
0414     WaitingTaskHolder holder(*m_group, m_runModuleTask);
0415     if (excptr) {
0416       holder.doneWaiting(m_worker->handleExternalWorkException(excptr, m_parentContext));
0417     }
0418   }
0419 }  // namespace edm