Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-05-23 23:48:33

0001 
0002 #include "FWCore/Framework/interface/Path.h"
0003 #include "FWCore/Framework/interface/EventPrincipal.h"
0004 #include "FWCore/Framework/interface/ExceptionActions.h"
0005 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0006 #include "FWCore/Framework/interface/EarlyDeleteHelper.h"
0007 #include "FWCore/Framework/src/PathStatusInserter.h"
0008 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0009 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0010 #include "FWCore/Utilities/interface/Algorithms.h"
0011 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0012 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0013 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0014 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0015 
0016 #include <algorithm>
0017 #include <sstream>
0018 
0019 namespace edm {
0020   Path::Path(int bitpos,
0021              std::string const& path_name,
0022              WorkersInPath const& workers,
0023              TrigResPtr trptr,
0024              ExceptionToActionTable const& actions,
0025              std::shared_ptr<ActivityRegistry> areg,
0026              StreamContext const* streamContext,
0027              PathContext::PathType pathType)
0028       : timesRun_(),
0029         timesPassed_(),
0030         timesFailed_(),
0031         timesExcept_(),
0032         failedModuleIndex_(workers.size()),
0033         state_(hlt::Ready),
0034         bitpos_(bitpos),
0035         trptr_(trptr),
0036         actReg_(areg),
0037         act_table_(&actions),
0038         workers_(workers),
0039         pathContext_(path_name, streamContext, bitpos, pathType),
0040         pathStatusInserter_(nullptr),
0041         pathStatusInserterWorker_(nullptr) {
0042     for (auto& workerInPath : workers_) {
0043       workerInPath.setPathContext(&pathContext_);
0044     }
0045     modulesToRun_ = workers_.size();
0046   }
0047 
0048   Path::Path(Path const& r)
0049       : timesRun_(r.timesRun_),
0050         timesPassed_(r.timesPassed_),
0051         timesFailed_(r.timesFailed_),
0052         timesExcept_(r.timesExcept_),
0053         failedModuleIndex_(r.failedModuleIndex_),
0054         state_(r.state_),
0055         bitpos_(r.bitpos_),
0056         trptr_(r.trptr_),
0057         actReg_(r.actReg_),
0058         act_table_(r.act_table_),
0059         workers_(r.workers_),
0060         pathContext_(r.pathContext_),
0061         pathStatusInserter_(r.pathStatusInserter_),
0062         pathStatusInserterWorker_(r.pathStatusInserterWorker_) {
0063     for (auto& workerInPath : workers_) {
0064       workerInPath.setPathContext(&pathContext_);
0065     }
0066     modulesToRun_ = workers_.size();
0067   }
0068 
0069   void Path::handleWorkerFailure(cms::Exception& e, int nwrwue, ModuleDescription const& desc, std::string const& id) {
0070     if (e.context().empty()) {
0071       exceptionContext(e, true /*isEvent*/, true /*begin*/, InEvent /*branchType*/, desc, id, pathContext_);
0072     }
0073     // there is no support as of yet for specific paths having
0074     // different exception behavior
0075 
0076     // If not processing an event, always rethrow.
0077     exception_actions::ActionCodes action = act_table_->find(e.category());
0078     switch (action) {
0079       case exception_actions::TryToContinue: {
0080         bool expected = false;
0081         if (printedException_.compare_exchange_strong(expected, true)) {
0082           std::ostringstream s;
0083           s << "Path " << name() << " applying TryToContinue on";
0084           edm::printCmsExceptionWarning(s.str().c_str(), e);
0085         }
0086         break;
0087       }
0088       default: {
0089         if (action == exception_actions::Rethrow) {
0090           std::string pNF = Exception::codeToString(errors::ProductNotFound);
0091           if (e.category() == pNF) {
0092             std::ostringstream ost;
0093             ost << "If you wish to continue processing events after a " << pNF << " exception,\n"
0094                 << "add \"TryToContinue = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the "
0095                    "configuration.\n";
0096             e.addAdditionalInfo(ost.str());
0097           }
0098         }
0099         //throw will copy which will slice the object
0100         e.raise();
0101       }
0102     }
0103   }
0104 
0105   void Path::exceptionContext(cms::Exception& ex,
0106                               bool isEvent,
0107                               bool begin,
0108                               BranchType branchType,
0109                               ModuleDescription const& desc,
0110                               std::string const& id,
0111                               PathContext const& pathContext) {
0112     std::ostringstream ost;
0113     ost << "Running path '" << pathContext.pathName() << "'";
0114     ex.addContext(ost.str());
0115     ost.str("");
0116     ost << "Processing ";
0117     //For the event case, the Worker has already
0118     // added the necessary module context to the exception
0119     if (begin && branchType == InRun) {
0120       ost << "stream begin Run";
0121     } else if (begin && branchType == InLumi) {
0122       ost << "stream begin LuminosityBlock ";
0123     } else if (!begin && branchType == InLumi) {
0124       ost << "stream end LuminosityBlock ";
0125     } else if (!begin && branchType == InRun) {
0126       ost << "stream end Run ";
0127     } else if (isEvent) {
0128       // It should be impossible to get here ...
0129       ost << "Event ";
0130     }
0131     ost << id;
0132     ex.addContext(ost.str());
0133   }
0134 
0135   void Path::threadsafe_setFailedModuleInfo(int nwrwue, bool iExcept) {
0136     bool expected = false;
0137     while (not stateLock_.compare_exchange_strong(expected, true)) {
0138       expected = false;
0139     }
0140     if (iExcept) {
0141       if (state_ == hlt::Exception) {
0142         if (nwrwue < failedModuleIndex_) {
0143           failedModuleIndex_ = nwrwue;
0144         }
0145       } else {
0146         state_ = hlt::Exception;
0147         failedModuleIndex_ = nwrwue;
0148       }
0149     } else {
0150       if (state_ != hlt::Exception) {
0151         if (nwrwue < failedModuleIndex_) {
0152           failedModuleIndex_ = nwrwue;
0153         }
0154         state_ = hlt::Fail;
0155       }
0156     }
0157 
0158     stateLock_ = false;
0159   }
0160 
0161   void Path::recordStatus(int nwrwue, hlt::HLTState state) {
0162     if (trptr_) {
0163       trptr_->at(bitpos_) = HLTPathStatus(state, nwrwue);
0164     }
0165   }
0166 
0167   void Path::updateCounters(hlt::HLTState state) {
0168     switch (state) {
0169       case hlt::Pass: {
0170         ++timesPassed_;
0171         break;
0172       }
0173       case hlt::Fail: {
0174         ++timesFailed_;
0175         break;
0176       }
0177       case hlt::Exception: {
0178         ++timesExcept_;
0179       }
0180       default:;
0181     }
0182   }
0183 
0184   void Path::clearCounters() {
0185     using std::placeholders::_1;
0186     timesRun_ = timesPassed_ = timesFailed_ = timesExcept_ = 0;
0187     for_all(workers_, std::bind(&WorkerInPath::clearCounters, _1));
0188   }
0189 
0190   void Path::setEarlyDeleteHelpers(std::map<const Worker*, EarlyDeleteHelper*> const& iWorkerToDeleter) {
0191     for (unsigned int index = 0; index != size(); ++index) {
0192       auto found = iWorkerToDeleter.find(getWorker(index));
0193       if (found != iWorkerToDeleter.end()) {
0194         found->second->addedToPath();
0195       }
0196     }
0197   }
0198 
0199   void Path::setPathStatusInserter(PathStatusInserter* pathStatusInserter, Worker* pathStatusInserterWorker) {
0200     pathStatusInserter_ = pathStatusInserter;
0201     pathStatusInserterWorker_ = pathStatusInserterWorker;
0202   }
0203 
0204   void Path::processEventUsingPathAsync(WaitingTaskHolder iTask,
0205                                         EventTransitionInfo const& iInfo,
0206                                         ServiceToken const& iToken,
0207                                         StreamID const& iStreamID,
0208                                         StreamContext const* iStreamContext) {
0209     waitingTasks_.reset();
0210     modulesToRun_ = workers_.size();
0211     ++timesRun_;
0212     waitingTasks_.add(iTask);
0213     printedException_ = false;
0214     if (actReg_) {
0215       ServiceRegistry::Operate guard(iToken);
0216       actReg_->prePathEventSignal_(*iStreamContext, pathContext_);
0217     }
0218     //If the Path succeeds, these are the values we have at the end
0219     state_ = hlt::Pass;
0220     failedModuleIndex_ = workers_.size() - 1;
0221 
0222     if (workers_.empty()) {
0223       ServiceRegistry::Operate guard(iToken);
0224       finished(std::exception_ptr(), iStreamContext, iInfo, iStreamID);
0225       return;
0226     }
0227 
0228     runNextWorkerAsync(0, iInfo, iToken, iStreamID, iStreamContext, *iTask.group());
0229   }
0230 
0231   void Path::workerFinished(std::exception_ptr const* iException,
0232                             unsigned int iModuleIndex,
0233                             EventTransitionInfo const& iInfo,
0234                             ServiceToken const& iToken,
0235                             StreamID const& iID,
0236                             StreamContext const* iContext,
0237                             oneapi::tbb::task_group& iGroup) {
0238     EventPrincipal const& iEP = iInfo.principal();
0239     ServiceRegistry::Operate guard(iToken);
0240 
0241     //This call also allows the WorkerInPath to update statistics
0242     // so should be done even if an exception happened
0243     auto& worker = workers_[iModuleIndex];
0244     bool shouldContinue = worker.checkResultsOfRunWorker(true);
0245     std::exception_ptr finalException;
0246     if (iException) {
0247       shouldContinue = false;
0248       std::unique_ptr<cms::Exception> pEx;
0249       CMS_SA_ALLOW try { std::rethrow_exception(*iException); } catch (cms::Exception& oldEx) {
0250         pEx = std::unique_ptr<cms::Exception>(oldEx.clone());
0251       } catch (std::exception const& oldEx) {
0252         pEx = std::make_unique<edm::Exception>(errors::StdException);
0253       } catch (...) {
0254         pEx = std::make_unique<edm::Exception>(errors::Unknown);
0255       }
0256       // Caught exception is propagated via WaitingTaskList
0257       CMS_SA_ALLOW try {
0258         std::ostringstream ost;
0259         ost << iEP.id();
0260         ModuleDescription const* desc = worker.getWorker()->description();
0261         assert(desc != nullptr);
0262         handleWorkerFailure(*pEx, iModuleIndex, *desc, ost.str());
0263         //If we didn't rethrow, then we effectively skipped
0264         worker.skipWorker(iEP);
0265       } catch (...) {
0266         finalException = std::current_exception();
0267         //set the exception early to avoid case where another Path is waiting
0268         // on a module in this Path and not running the module will lead to a
0269         // different but related exception in the other Path. We want this
0270         // Paths exception to be the one that gets reported.
0271         waitingTasks_.presetTaskAsFailed(finalException);
0272       }
0273     }
0274     auto const nextIndex = iModuleIndex + 1;
0275     if (shouldContinue and nextIndex < workers_.size()) {
0276       if (not worker.runConcurrently()) {
0277         --modulesToRun_;
0278         runNextWorkerAsync(nextIndex, iInfo, iToken, iID, iContext, iGroup);
0279         return;
0280       }
0281     }
0282 
0283     if (not shouldContinue) {
0284       threadsafe_setFailedModuleInfo(iModuleIndex, iException != nullptr);
0285     }
0286     if (not shouldContinue and not worker.runConcurrently()) {
0287       //we are leaving the path early
0288       for (auto it = workers_.begin() + nextIndex, itEnd = workers_.end(); it != itEnd; ++it) {
0289         --modulesToRun_;
0290         it->skipWorker(iEP);
0291       }
0292     }
0293     if (--modulesToRun_ == 0) {
0294       //The path should only be marked as finished once all outstanding modules finish
0295       finished(finalException, iContext, iInfo, iID);
0296     }
0297   }
0298 
0299   void Path::finished(std::exception_ptr iException,
0300                       StreamContext const* iContext,
0301                       EventTransitionInfo const& iInfo,
0302                       StreamID const& streamID) {
0303     updateCounters(state_);
0304     auto failedModuleBitPosition = bitPosition(failedModuleIndex_);
0305     recordStatus(failedModuleBitPosition, state_);
0306     // Caught exception is propagated via WaitingTaskList
0307     CMS_SA_ALLOW try {
0308       HLTPathStatus status(state_, failedModuleBitPosition);
0309 
0310       if (pathStatusInserter_) {  // pathStatusInserter is null for EndPaths
0311         pathStatusInserter_->setPathStatus(streamID, status);
0312       }
0313       if (pathStatusInserterWorker_) {
0314         std::exception_ptr jException =
0315             pathStatusInserterWorker_->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
0316                 iInfo, streamID, ParentContext(iContext), iContext);
0317         if (jException && not iException) {
0318           iException = jException;
0319         }
0320       }
0321       actReg_->postPathEventSignal_(*iContext, pathContext_, status);
0322     } catch (...) {
0323       if (not iException) {
0324         iException = std::current_exception();
0325       }
0326     }
0327     waitingTasks_.doneWaiting(iException);
0328   }
0329 
0330   void Path::runNextWorkerAsync(unsigned int iNextModuleIndex,
0331                                 EventTransitionInfo const& iInfo,
0332                                 ServiceToken const& iToken,
0333                                 StreamID const& iID,
0334                                 StreamContext const* iContext,
0335                                 oneapi::tbb::task_group& iGroup) {
0336     //Figure out which next modules can run concurrently
0337     const int firstModuleIndex = iNextModuleIndex;
0338     int lastModuleIndex = firstModuleIndex;
0339     while (lastModuleIndex + 1 != static_cast<int>(workers_.size()) and workers_[lastModuleIndex].runConcurrently()) {
0340       ++lastModuleIndex;
0341     }
0342     for (; lastModuleIndex >= firstModuleIndex; --lastModuleIndex) {
0343       ServiceWeakToken weakToken = iToken;
0344       auto nextTask = make_waiting_task([this, lastModuleIndex, info = iInfo, iID, iContext, weakToken, &iGroup](
0345                                             std::exception_ptr const* iException) {
0346         this->workerFinished(iException, lastModuleIndex, info, weakToken.lock(), iID, iContext, iGroup);
0347       });
0348       workers_[lastModuleIndex].runWorkerAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
0349           WaitingTaskHolder(iGroup, nextTask), iInfo, iToken, iID, iContext);
0350     }
0351   }
0352 
0353 }  // namespace edm