File indexing completed on 2025-05-23 23:48:33
0001
0002 #include "FWCore/Framework/interface/Path.h"
0003 #include "FWCore/Framework/interface/EventPrincipal.h"
0004 #include "FWCore/Framework/interface/ExceptionActions.h"
0005 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0006 #include "FWCore/Framework/interface/EarlyDeleteHelper.h"
0007 #include "FWCore/Framework/src/PathStatusInserter.h"
0008 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0009 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0010 #include "FWCore/Utilities/interface/Algorithms.h"
0011 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0012 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0013 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0014 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0015
0016 #include <algorithm>
0017 #include <sstream>
0018
0019 namespace edm {
0020 Path::Path(int bitpos,
0021 std::string const& path_name,
0022 WorkersInPath const& workers,
0023 TrigResPtr trptr,
0024 ExceptionToActionTable const& actions,
0025 std::shared_ptr<ActivityRegistry> areg,
0026 StreamContext const* streamContext,
0027 PathContext::PathType pathType)
0028 : timesRun_(),
0029 timesPassed_(),
0030 timesFailed_(),
0031 timesExcept_(),
0032 failedModuleIndex_(workers.size()),
0033 state_(hlt::Ready),
0034 bitpos_(bitpos),
0035 trptr_(trptr),
0036 actReg_(areg),
0037 act_table_(&actions),
0038 workers_(workers),
0039 pathContext_(path_name, streamContext, bitpos, pathType),
0040 pathStatusInserter_(nullptr),
0041 pathStatusInserterWorker_(nullptr) {
0042 for (auto& workerInPath : workers_) {
0043 workerInPath.setPathContext(&pathContext_);
0044 }
0045 modulesToRun_ = workers_.size();
0046 }
0047
0048 Path::Path(Path const& r)
0049 : timesRun_(r.timesRun_),
0050 timesPassed_(r.timesPassed_),
0051 timesFailed_(r.timesFailed_),
0052 timesExcept_(r.timesExcept_),
0053 failedModuleIndex_(r.failedModuleIndex_),
0054 state_(r.state_),
0055 bitpos_(r.bitpos_),
0056 trptr_(r.trptr_),
0057 actReg_(r.actReg_),
0058 act_table_(r.act_table_),
0059 workers_(r.workers_),
0060 pathContext_(r.pathContext_),
0061 pathStatusInserter_(r.pathStatusInserter_),
0062 pathStatusInserterWorker_(r.pathStatusInserterWorker_) {
0063 for (auto& workerInPath : workers_) {
0064 workerInPath.setPathContext(&pathContext_);
0065 }
0066 modulesToRun_ = workers_.size();
0067 }
0068
0069 void Path::handleWorkerFailure(cms::Exception& e, int nwrwue, ModuleDescription const& desc, std::string const& id) {
0070 if (e.context().empty()) {
0071 exceptionContext(e, true , true , InEvent , desc, id, pathContext_);
0072 }
0073
0074
0075
0076
0077 exception_actions::ActionCodes action = act_table_->find(e.category());
0078 switch (action) {
0079 case exception_actions::TryToContinue: {
0080 bool expected = false;
0081 if (printedException_.compare_exchange_strong(expected, true)) {
0082 std::ostringstream s;
0083 s << "Path " << name() << " applying TryToContinue on";
0084 edm::printCmsExceptionWarning(s.str().c_str(), e);
0085 }
0086 break;
0087 }
0088 default: {
0089 if (action == exception_actions::Rethrow) {
0090 std::string pNF = Exception::codeToString(errors::ProductNotFound);
0091 if (e.category() == pNF) {
0092 std::ostringstream ost;
0093 ost << "If you wish to continue processing events after a " << pNF << " exception,\n"
0094 << "add \"TryToContinue = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the "
0095 "configuration.\n";
0096 e.addAdditionalInfo(ost.str());
0097 }
0098 }
0099
0100 e.raise();
0101 }
0102 }
0103 }
0104
0105 void Path::exceptionContext(cms::Exception& ex,
0106 bool isEvent,
0107 bool begin,
0108 BranchType branchType,
0109 ModuleDescription const& desc,
0110 std::string const& id,
0111 PathContext const& pathContext) {
0112 std::ostringstream ost;
0113 ost << "Running path '" << pathContext.pathName() << "'";
0114 ex.addContext(ost.str());
0115 ost.str("");
0116 ost << "Processing ";
0117
0118
0119 if (begin && branchType == InRun) {
0120 ost << "stream begin Run";
0121 } else if (begin && branchType == InLumi) {
0122 ost << "stream begin LuminosityBlock ";
0123 } else if (!begin && branchType == InLumi) {
0124 ost << "stream end LuminosityBlock ";
0125 } else if (!begin && branchType == InRun) {
0126 ost << "stream end Run ";
0127 } else if (isEvent) {
0128
0129 ost << "Event ";
0130 }
0131 ost << id;
0132 ex.addContext(ost.str());
0133 }
0134
0135 void Path::threadsafe_setFailedModuleInfo(int nwrwue, bool iExcept) {
0136 bool expected = false;
0137 while (not stateLock_.compare_exchange_strong(expected, true)) {
0138 expected = false;
0139 }
0140 if (iExcept) {
0141 if (state_ == hlt::Exception) {
0142 if (nwrwue < failedModuleIndex_) {
0143 failedModuleIndex_ = nwrwue;
0144 }
0145 } else {
0146 state_ = hlt::Exception;
0147 failedModuleIndex_ = nwrwue;
0148 }
0149 } else {
0150 if (state_ != hlt::Exception) {
0151 if (nwrwue < failedModuleIndex_) {
0152 failedModuleIndex_ = nwrwue;
0153 }
0154 state_ = hlt::Fail;
0155 }
0156 }
0157
0158 stateLock_ = false;
0159 }
0160
0161 void Path::recordStatus(int nwrwue, hlt::HLTState state) {
0162 if (trptr_) {
0163 trptr_->at(bitpos_) = HLTPathStatus(state, nwrwue);
0164 }
0165 }
0166
0167 void Path::updateCounters(hlt::HLTState state) {
0168 switch (state) {
0169 case hlt::Pass: {
0170 ++timesPassed_;
0171 break;
0172 }
0173 case hlt::Fail: {
0174 ++timesFailed_;
0175 break;
0176 }
0177 case hlt::Exception: {
0178 ++timesExcept_;
0179 }
0180 default:;
0181 }
0182 }
0183
0184 void Path::clearCounters() {
0185 using std::placeholders::_1;
0186 timesRun_ = timesPassed_ = timesFailed_ = timesExcept_ = 0;
0187 for_all(workers_, std::bind(&WorkerInPath::clearCounters, _1));
0188 }
0189
0190 void Path::setEarlyDeleteHelpers(std::map<const Worker*, EarlyDeleteHelper*> const& iWorkerToDeleter) {
0191 for (unsigned int index = 0; index != size(); ++index) {
0192 auto found = iWorkerToDeleter.find(getWorker(index));
0193 if (found != iWorkerToDeleter.end()) {
0194 found->second->addedToPath();
0195 }
0196 }
0197 }
0198
0199 void Path::setPathStatusInserter(PathStatusInserter* pathStatusInserter, Worker* pathStatusInserterWorker) {
0200 pathStatusInserter_ = pathStatusInserter;
0201 pathStatusInserterWorker_ = pathStatusInserterWorker;
0202 }
0203
0204 void Path::processEventUsingPathAsync(WaitingTaskHolder iTask,
0205 EventTransitionInfo const& iInfo,
0206 ServiceToken const& iToken,
0207 StreamID const& iStreamID,
0208 StreamContext const* iStreamContext) {
0209 waitingTasks_.reset();
0210 modulesToRun_ = workers_.size();
0211 ++timesRun_;
0212 waitingTasks_.add(iTask);
0213 printedException_ = false;
0214 if (actReg_) {
0215 ServiceRegistry::Operate guard(iToken);
0216 actReg_->prePathEventSignal_(*iStreamContext, pathContext_);
0217 }
0218
0219 state_ = hlt::Pass;
0220 failedModuleIndex_ = workers_.size() - 1;
0221
0222 if (workers_.empty()) {
0223 ServiceRegistry::Operate guard(iToken);
0224 finished(std::exception_ptr(), iStreamContext, iInfo, iStreamID);
0225 return;
0226 }
0227
0228 runNextWorkerAsync(0, iInfo, iToken, iStreamID, iStreamContext, *iTask.group());
0229 }
0230
0231 void Path::workerFinished(std::exception_ptr const* iException,
0232 unsigned int iModuleIndex,
0233 EventTransitionInfo const& iInfo,
0234 ServiceToken const& iToken,
0235 StreamID const& iID,
0236 StreamContext const* iContext,
0237 oneapi::tbb::task_group& iGroup) {
0238 EventPrincipal const& iEP = iInfo.principal();
0239 ServiceRegistry::Operate guard(iToken);
0240
0241
0242
0243 auto& worker = workers_[iModuleIndex];
0244 bool shouldContinue = worker.checkResultsOfRunWorker(true);
0245 std::exception_ptr finalException;
0246 if (iException) {
0247 shouldContinue = false;
0248 std::unique_ptr<cms::Exception> pEx;
0249 CMS_SA_ALLOW try { std::rethrow_exception(*iException); } catch (cms::Exception& oldEx) {
0250 pEx = std::unique_ptr<cms::Exception>(oldEx.clone());
0251 } catch (std::exception const& oldEx) {
0252 pEx = std::make_unique<edm::Exception>(errors::StdException);
0253 } catch (...) {
0254 pEx = std::make_unique<edm::Exception>(errors::Unknown);
0255 }
0256
0257 CMS_SA_ALLOW try {
0258 std::ostringstream ost;
0259 ost << iEP.id();
0260 ModuleDescription const* desc = worker.getWorker()->description();
0261 assert(desc != nullptr);
0262 handleWorkerFailure(*pEx, iModuleIndex, *desc, ost.str());
0263
0264 worker.skipWorker(iEP);
0265 } catch (...) {
0266 finalException = std::current_exception();
0267
0268
0269
0270
0271 waitingTasks_.presetTaskAsFailed(finalException);
0272 }
0273 }
0274 auto const nextIndex = iModuleIndex + 1;
0275 if (shouldContinue and nextIndex < workers_.size()) {
0276 if (not worker.runConcurrently()) {
0277 --modulesToRun_;
0278 runNextWorkerAsync(nextIndex, iInfo, iToken, iID, iContext, iGroup);
0279 return;
0280 }
0281 }
0282
0283 if (not shouldContinue) {
0284 threadsafe_setFailedModuleInfo(iModuleIndex, iException != nullptr);
0285 }
0286 if (not shouldContinue and not worker.runConcurrently()) {
0287
0288 for (auto it = workers_.begin() + nextIndex, itEnd = workers_.end(); it != itEnd; ++it) {
0289 --modulesToRun_;
0290 it->skipWorker(iEP);
0291 }
0292 }
0293 if (--modulesToRun_ == 0) {
0294
0295 finished(finalException, iContext, iInfo, iID);
0296 }
0297 }
0298
0299 void Path::finished(std::exception_ptr iException,
0300 StreamContext const* iContext,
0301 EventTransitionInfo const& iInfo,
0302 StreamID const& streamID) {
0303 updateCounters(state_);
0304 auto failedModuleBitPosition = bitPosition(failedModuleIndex_);
0305 recordStatus(failedModuleBitPosition, state_);
0306
0307 CMS_SA_ALLOW try {
0308 HLTPathStatus status(state_, failedModuleBitPosition);
0309
0310 if (pathStatusInserter_) {
0311 pathStatusInserter_->setPathStatus(streamID, status);
0312 }
0313 if (pathStatusInserterWorker_) {
0314 std::exception_ptr jException =
0315 pathStatusInserterWorker_->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
0316 iInfo, streamID, ParentContext(iContext), iContext);
0317 if (jException && not iException) {
0318 iException = jException;
0319 }
0320 }
0321 actReg_->postPathEventSignal_(*iContext, pathContext_, status);
0322 } catch (...) {
0323 if (not iException) {
0324 iException = std::current_exception();
0325 }
0326 }
0327 waitingTasks_.doneWaiting(iException);
0328 }
0329
0330 void Path::runNextWorkerAsync(unsigned int iNextModuleIndex,
0331 EventTransitionInfo const& iInfo,
0332 ServiceToken const& iToken,
0333 StreamID const& iID,
0334 StreamContext const* iContext,
0335 oneapi::tbb::task_group& iGroup) {
0336
0337 const int firstModuleIndex = iNextModuleIndex;
0338 int lastModuleIndex = firstModuleIndex;
0339 while (lastModuleIndex + 1 != static_cast<int>(workers_.size()) and workers_[lastModuleIndex].runConcurrently()) {
0340 ++lastModuleIndex;
0341 }
0342 for (; lastModuleIndex >= firstModuleIndex; --lastModuleIndex) {
0343 ServiceWeakToken weakToken = iToken;
0344 auto nextTask = make_waiting_task([this, lastModuleIndex, info = iInfo, iID, iContext, weakToken, &iGroup](
0345 std::exception_ptr const* iException) {
0346 this->workerFinished(iException, lastModuleIndex, info, weakToken.lock(), iID, iContext, iGroup);
0347 });
0348 workers_[lastModuleIndex].runWorkerAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
0349 WaitingTaskHolder(iGroup, nextTask), iInfo, iToken, iID, iContext);
0350 }
0351 }
0352
0353 }