File indexing completed on 2024-06-04 04:34:54
0001
0002 #include "FWCore/Framework/interface/Path.h"
0003 #include "FWCore/Framework/interface/EventPrincipal.h"
0004 #include "FWCore/Framework/interface/ExceptionActions.h"
0005 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0006 #include "FWCore/Framework/interface/EarlyDeleteHelper.h"
0007 #include "FWCore/Framework/src/PathStatusInserter.h"
0008 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0009 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0010 #include "FWCore/Utilities/interface/Algorithms.h"
0011 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0012 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0013 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0014
0015 #include <algorithm>
0016 #include <sstream>
0017
0018 namespace edm {
0019 Path::Path(int bitpos,
0020 std::string const& path_name,
0021 WorkersInPath const& workers,
0022 TrigResPtr trptr,
0023 ExceptionToActionTable const& actions,
0024 std::shared_ptr<ActivityRegistry> areg,
0025 StreamContext const* streamContext,
0026 PathContext::PathType pathType)
0027 : timesRun_(),
0028 timesPassed_(),
0029 timesFailed_(),
0030 timesExcept_(),
0031 failedModuleIndex_(workers.size()),
0032 state_(hlt::Ready),
0033 bitpos_(bitpos),
0034 trptr_(trptr),
0035 actReg_(areg),
0036 act_table_(&actions),
0037 workers_(workers),
0038 pathContext_(path_name, streamContext, bitpos, pathType),
0039 pathStatusInserter_(nullptr),
0040 pathStatusInserterWorker_(nullptr) {
0041 for (auto& workerInPath : workers_) {
0042 workerInPath.setPathContext(&pathContext_);
0043 }
0044 modulesToRun_ = workers_.size();
0045 }
0046
0047 Path::Path(Path const& r)
0048 : timesRun_(r.timesRun_),
0049 timesPassed_(r.timesPassed_),
0050 timesFailed_(r.timesFailed_),
0051 timesExcept_(r.timesExcept_),
0052 failedModuleIndex_(r.failedModuleIndex_),
0053 state_(r.state_),
0054 bitpos_(r.bitpos_),
0055 trptr_(r.trptr_),
0056 actReg_(r.actReg_),
0057 act_table_(r.act_table_),
0058 workers_(r.workers_),
0059 pathContext_(r.pathContext_),
0060 pathStatusInserter_(r.pathStatusInserter_),
0061 pathStatusInserterWorker_(r.pathStatusInserterWorker_) {
0062 for (auto& workerInPath : workers_) {
0063 workerInPath.setPathContext(&pathContext_);
0064 }
0065 modulesToRun_ = workers_.size();
0066 }
0067
0068 void Path::handleWorkerFailure(cms::Exception& e, int nwrwue, ModuleDescription const& desc, std::string const& id) {
0069 if (e.context().empty()) {
0070 exceptionContext(e, true , true , InEvent , desc, id, pathContext_);
0071 }
0072
0073
0074
0075
0076 exception_actions::ActionCodes action = act_table_->find(e.category());
0077 switch (action) {
0078 case exception_actions::TryToContinue: {
0079 bool expected = false;
0080 if (printedException_.compare_exchange_strong(expected, true)) {
0081 std::ostringstream s;
0082 s << "Path " << name() << " applying TryToContinue on";
0083 edm::printCmsExceptionWarning(s.str().c_str(), e);
0084 }
0085 break;
0086 }
0087 default: {
0088 if (action == exception_actions::Rethrow) {
0089 std::string pNF = Exception::codeToString(errors::ProductNotFound);
0090 if (e.category() == pNF) {
0091 std::ostringstream ost;
0092 ost << "If you wish to continue processing events after a " << pNF << " exception,\n"
0093 << "add \"TryToContinue = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the "
0094 "configuration.\n";
0095 e.addAdditionalInfo(ost.str());
0096 }
0097 }
0098
0099 e.raise();
0100 }
0101 }
0102 }
0103
0104 void Path::exceptionContext(cms::Exception& ex,
0105 bool isEvent,
0106 bool begin,
0107 BranchType branchType,
0108 ModuleDescription const& desc,
0109 std::string const& id,
0110 PathContext const& pathContext) {
0111 std::ostringstream ost;
0112 ost << "Running path '" << pathContext.pathName() << "'";
0113 ex.addContext(ost.str());
0114 ost.str("");
0115 ost << "Processing ";
0116
0117
0118 if (begin && branchType == InRun) {
0119 ost << "stream begin Run";
0120 } else if (begin && branchType == InLumi) {
0121 ost << "stream begin LuminosityBlock ";
0122 } else if (!begin && branchType == InLumi) {
0123 ost << "stream end LuminosityBlock ";
0124 } else if (!begin && branchType == InRun) {
0125 ost << "stream end Run ";
0126 } else if (isEvent) {
0127
0128 ost << "Event ";
0129 }
0130 ost << id;
0131 ex.addContext(ost.str());
0132 }
0133
0134 void Path::threadsafe_setFailedModuleInfo(int nwrwue, bool iExcept) {
0135 bool expected = false;
0136 while (not stateLock_.compare_exchange_strong(expected, true)) {
0137 expected = false;
0138 }
0139 if (iExcept) {
0140 if (state_ == hlt::Exception) {
0141 if (nwrwue < failedModuleIndex_) {
0142 failedModuleIndex_ = nwrwue;
0143 }
0144 } else {
0145 state_ = hlt::Exception;
0146 failedModuleIndex_ = nwrwue;
0147 }
0148 } else {
0149 if (state_ != hlt::Exception) {
0150 if (nwrwue < failedModuleIndex_) {
0151 failedModuleIndex_ = nwrwue;
0152 }
0153 state_ = hlt::Fail;
0154 }
0155 }
0156
0157 stateLock_ = false;
0158 }
0159
0160 void Path::recordStatus(int nwrwue, hlt::HLTState state) {
0161 if (trptr_) {
0162 trptr_->at(bitpos_) = HLTPathStatus(state, nwrwue);
0163 }
0164 }
0165
0166 void Path::updateCounters(hlt::HLTState state) {
0167 switch (state) {
0168 case hlt::Pass: {
0169 ++timesPassed_;
0170 break;
0171 }
0172 case hlt::Fail: {
0173 ++timesFailed_;
0174 break;
0175 }
0176 case hlt::Exception: {
0177 ++timesExcept_;
0178 }
0179 default:;
0180 }
0181 }
0182
0183 void Path::clearCounters() {
0184 using std::placeholders::_1;
0185 timesRun_ = timesPassed_ = timesFailed_ = timesExcept_ = 0;
0186 for_all(workers_, std::bind(&WorkerInPath::clearCounters, _1));
0187 }
0188
0189 void Path::setEarlyDeleteHelpers(std::map<const Worker*, EarlyDeleteHelper*> const& iWorkerToDeleter) {
0190 for (unsigned int index = 0; index != size(); ++index) {
0191 auto found = iWorkerToDeleter.find(getWorker(index));
0192 if (found != iWorkerToDeleter.end()) {
0193 found->second->addedToPath();
0194 }
0195 }
0196 }
0197
0198 void Path::setPathStatusInserter(PathStatusInserter* pathStatusInserter, Worker* pathStatusInserterWorker) {
0199 pathStatusInserter_ = pathStatusInserter;
0200 pathStatusInserterWorker_ = pathStatusInserterWorker;
0201 }
0202
0203 void Path::processEventUsingPathAsync(WaitingTaskHolder iTask,
0204 EventTransitionInfo const& iInfo,
0205 ServiceToken const& iToken,
0206 StreamID const& iStreamID,
0207 StreamContext const* iStreamContext) {
0208 waitingTasks_.reset();
0209 modulesToRun_ = workers_.size();
0210 ++timesRun_;
0211 waitingTasks_.add(iTask);
0212 printedException_ = false;
0213 if (actReg_) {
0214 ServiceRegistry::Operate guard(iToken);
0215 actReg_->prePathEventSignal_(*iStreamContext, pathContext_);
0216 }
0217
0218 state_ = hlt::Pass;
0219 failedModuleIndex_ = workers_.size() - 1;
0220
0221 if (workers_.empty()) {
0222 ServiceRegistry::Operate guard(iToken);
0223 finished(std::exception_ptr(), iStreamContext, iInfo, iStreamID);
0224 return;
0225 }
0226
0227 runNextWorkerAsync(0, iInfo, iToken, iStreamID, iStreamContext, *iTask.group());
0228 }
0229
0230 void Path::workerFinished(std::exception_ptr const* iException,
0231 unsigned int iModuleIndex,
0232 EventTransitionInfo const& iInfo,
0233 ServiceToken const& iToken,
0234 StreamID const& iID,
0235 StreamContext const* iContext,
0236 oneapi::tbb::task_group& iGroup) {
0237 EventPrincipal const& iEP = iInfo.principal();
0238 ServiceRegistry::Operate guard(iToken);
0239
0240
0241
0242 auto& worker = workers_[iModuleIndex];
0243 bool shouldContinue = worker.checkResultsOfRunWorker(true);
0244 std::exception_ptr finalException;
0245 if (iException) {
0246 shouldContinue = false;
0247 std::unique_ptr<cms::Exception> pEx;
0248 try {
0249 std::rethrow_exception(*iException);
0250 } catch (cms::Exception& oldEx) {
0251 pEx = std::unique_ptr<cms::Exception>(oldEx.clone());
0252 } catch (std::exception const& oldEx) {
0253 pEx = std::make_unique<edm::Exception>(errors::StdException);
0254 } catch (...) {
0255 pEx = std::make_unique<edm::Exception>(errors::Unknown);
0256 }
0257
0258 CMS_SA_ALLOW try {
0259 std::ostringstream ost;
0260 ost << iEP.id();
0261 ModuleDescription const* desc = worker.getWorker()->description();
0262 assert(desc != nullptr);
0263 handleWorkerFailure(*pEx, iModuleIndex, *desc, ost.str());
0264
0265 worker.skipWorker(iEP);
0266 } catch (...) {
0267 finalException = std::current_exception();
0268
0269
0270
0271
0272 waitingTasks_.presetTaskAsFailed(finalException);
0273 }
0274 }
0275 auto const nextIndex = iModuleIndex + 1;
0276 if (shouldContinue and nextIndex < workers_.size()) {
0277 if (not worker.runConcurrently()) {
0278 --modulesToRun_;
0279 runNextWorkerAsync(nextIndex, iInfo, iToken, iID, iContext, iGroup);
0280 return;
0281 }
0282 }
0283
0284 if (not shouldContinue) {
0285 threadsafe_setFailedModuleInfo(iModuleIndex, iException != nullptr);
0286 }
0287 if (not shouldContinue and not worker.runConcurrently()) {
0288
0289 for (auto it = workers_.begin() + nextIndex, itEnd = workers_.end(); it != itEnd; ++it) {
0290 --modulesToRun_;
0291 it->skipWorker(iEP);
0292 }
0293 }
0294 if (--modulesToRun_ == 0) {
0295
0296 finished(finalException, iContext, iInfo, iID);
0297 }
0298 }
0299
0300 void Path::finished(std::exception_ptr iException,
0301 StreamContext const* iContext,
0302 EventTransitionInfo const& iInfo,
0303 StreamID const& streamID) {
0304 updateCounters(state_);
0305 auto failedModuleBitPosition = bitPosition(failedModuleIndex_);
0306 recordStatus(failedModuleBitPosition, state_);
0307
0308 CMS_SA_ALLOW try {
0309 HLTPathStatus status(state_, failedModuleBitPosition);
0310
0311 if (pathStatusInserter_) {
0312 pathStatusInserter_->setPathStatus(streamID, status);
0313 }
0314 if (pathStatusInserterWorker_) {
0315 std::exception_ptr jException =
0316 pathStatusInserterWorker_->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
0317 iInfo, streamID, ParentContext(iContext), iContext);
0318 if (jException && not iException) {
0319 iException = jException;
0320 }
0321 }
0322 actReg_->postPathEventSignal_(*iContext, pathContext_, status);
0323 } catch (...) {
0324 if (not iException) {
0325 iException = std::current_exception();
0326 }
0327 }
0328 waitingTasks_.doneWaiting(iException);
0329 }
0330
0331 void Path::runNextWorkerAsync(unsigned int iNextModuleIndex,
0332 EventTransitionInfo const& iInfo,
0333 ServiceToken const& iToken,
0334 StreamID const& iID,
0335 StreamContext const* iContext,
0336 oneapi::tbb::task_group& iGroup) {
0337
0338 const int firstModuleIndex = iNextModuleIndex;
0339 int lastModuleIndex = firstModuleIndex;
0340 while (lastModuleIndex + 1 != static_cast<int>(workers_.size()) and workers_[lastModuleIndex].runConcurrently()) {
0341 ++lastModuleIndex;
0342 }
0343 for (; lastModuleIndex >= firstModuleIndex; --lastModuleIndex) {
0344 ServiceWeakToken weakToken = iToken;
0345 auto nextTask = make_waiting_task([this, lastModuleIndex, info = iInfo, iID, iContext, weakToken, &iGroup](
0346 std::exception_ptr const* iException) {
0347 this->workerFinished(iException, lastModuleIndex, info, weakToken.lock(), iID, iContext, iGroup);
0348 });
0349 workers_[lastModuleIndex].runWorkerAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
0350 WaitingTaskHolder(iGroup, nextTask), iInfo, iToken, iID, iContext);
0351 }
0352 }
0353
0354 }