Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-06-04 04:34:54

0001 
0002 #include "FWCore/Framework/interface/Path.h"
0003 #include "FWCore/Framework/interface/EventPrincipal.h"
0004 #include "FWCore/Framework/interface/ExceptionActions.h"
0005 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0006 #include "FWCore/Framework/interface/EarlyDeleteHelper.h"
0007 #include "FWCore/Framework/src/PathStatusInserter.h"
0008 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0009 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0010 #include "FWCore/Utilities/interface/Algorithms.h"
0011 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0012 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0013 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0014 
0015 #include <algorithm>
0016 #include <sstream>
0017 
0018 namespace edm {
0019   Path::Path(int bitpos,
0020              std::string const& path_name,
0021              WorkersInPath const& workers,
0022              TrigResPtr trptr,
0023              ExceptionToActionTable const& actions,
0024              std::shared_ptr<ActivityRegistry> areg,
0025              StreamContext const* streamContext,
0026              PathContext::PathType pathType)
0027       : timesRun_(),
0028         timesPassed_(),
0029         timesFailed_(),
0030         timesExcept_(),
0031         failedModuleIndex_(workers.size()),
0032         state_(hlt::Ready),
0033         bitpos_(bitpos),
0034         trptr_(trptr),
0035         actReg_(areg),
0036         act_table_(&actions),
0037         workers_(workers),
0038         pathContext_(path_name, streamContext, bitpos, pathType),
0039         pathStatusInserter_(nullptr),
0040         pathStatusInserterWorker_(nullptr) {
0041     for (auto& workerInPath : workers_) {
0042       workerInPath.setPathContext(&pathContext_);
0043     }
0044     modulesToRun_ = workers_.size();
0045   }
0046 
0047   Path::Path(Path const& r)
0048       : timesRun_(r.timesRun_),
0049         timesPassed_(r.timesPassed_),
0050         timesFailed_(r.timesFailed_),
0051         timesExcept_(r.timesExcept_),
0052         failedModuleIndex_(r.failedModuleIndex_),
0053         state_(r.state_),
0054         bitpos_(r.bitpos_),
0055         trptr_(r.trptr_),
0056         actReg_(r.actReg_),
0057         act_table_(r.act_table_),
0058         workers_(r.workers_),
0059         pathContext_(r.pathContext_),
0060         pathStatusInserter_(r.pathStatusInserter_),
0061         pathStatusInserterWorker_(r.pathStatusInserterWorker_) {
0062     for (auto& workerInPath : workers_) {
0063       workerInPath.setPathContext(&pathContext_);
0064     }
0065     modulesToRun_ = workers_.size();
0066   }
0067 
0068   void Path::handleWorkerFailure(cms::Exception& e, int nwrwue, ModuleDescription const& desc, std::string const& id) {
0069     if (e.context().empty()) {
0070       exceptionContext(e, true /*isEvent*/, true /*begin*/, InEvent /*branchType*/, desc, id, pathContext_);
0071     }
0072     // there is no support as of yet for specific paths having
0073     // different exception behavior
0074 
0075     // If not processing an event, always rethrow.
0076     exception_actions::ActionCodes action = act_table_->find(e.category());
0077     switch (action) {
0078       case exception_actions::TryToContinue: {
0079         bool expected = false;
0080         if (printedException_.compare_exchange_strong(expected, true)) {
0081           std::ostringstream s;
0082           s << "Path " << name() << " applying TryToContinue on";
0083           edm::printCmsExceptionWarning(s.str().c_str(), e);
0084         }
0085         break;
0086       }
0087       default: {
0088         if (action == exception_actions::Rethrow) {
0089           std::string pNF = Exception::codeToString(errors::ProductNotFound);
0090           if (e.category() == pNF) {
0091             std::ostringstream ost;
0092             ost << "If you wish to continue processing events after a " << pNF << " exception,\n"
0093                 << "add \"TryToContinue = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the "
0094                    "configuration.\n";
0095             e.addAdditionalInfo(ost.str());
0096           }
0097         }
0098         //throw will copy which will slice the object
0099         e.raise();
0100       }
0101     }
0102   }
0103 
0104   void Path::exceptionContext(cms::Exception& ex,
0105                               bool isEvent,
0106                               bool begin,
0107                               BranchType branchType,
0108                               ModuleDescription const& desc,
0109                               std::string const& id,
0110                               PathContext const& pathContext) {
0111     std::ostringstream ost;
0112     ost << "Running path '" << pathContext.pathName() << "'";
0113     ex.addContext(ost.str());
0114     ost.str("");
0115     ost << "Processing ";
0116     //For the event case, the Worker has already
0117     // added the necessary module context to the exception
0118     if (begin && branchType == InRun) {
0119       ost << "stream begin Run";
0120     } else if (begin && branchType == InLumi) {
0121       ost << "stream begin LuminosityBlock ";
0122     } else if (!begin && branchType == InLumi) {
0123       ost << "stream end LuminosityBlock ";
0124     } else if (!begin && branchType == InRun) {
0125       ost << "stream end Run ";
0126     } else if (isEvent) {
0127       // It should be impossible to get here ...
0128       ost << "Event ";
0129     }
0130     ost << id;
0131     ex.addContext(ost.str());
0132   }
0133 
0134   void Path::threadsafe_setFailedModuleInfo(int nwrwue, bool iExcept) {
0135     bool expected = false;
0136     while (not stateLock_.compare_exchange_strong(expected, true)) {
0137       expected = false;
0138     }
0139     if (iExcept) {
0140       if (state_ == hlt::Exception) {
0141         if (nwrwue < failedModuleIndex_) {
0142           failedModuleIndex_ = nwrwue;
0143         }
0144       } else {
0145         state_ = hlt::Exception;
0146         failedModuleIndex_ = nwrwue;
0147       }
0148     } else {
0149       if (state_ != hlt::Exception) {
0150         if (nwrwue < failedModuleIndex_) {
0151           failedModuleIndex_ = nwrwue;
0152         }
0153         state_ = hlt::Fail;
0154       }
0155     }
0156 
0157     stateLock_ = false;
0158   }
0159 
0160   void Path::recordStatus(int nwrwue, hlt::HLTState state) {
0161     if (trptr_) {
0162       trptr_->at(bitpos_) = HLTPathStatus(state, nwrwue);
0163     }
0164   }
0165 
0166   void Path::updateCounters(hlt::HLTState state) {
0167     switch (state) {
0168       case hlt::Pass: {
0169         ++timesPassed_;
0170         break;
0171       }
0172       case hlt::Fail: {
0173         ++timesFailed_;
0174         break;
0175       }
0176       case hlt::Exception: {
0177         ++timesExcept_;
0178       }
0179       default:;
0180     }
0181   }
0182 
0183   void Path::clearCounters() {
0184     using std::placeholders::_1;
0185     timesRun_ = timesPassed_ = timesFailed_ = timesExcept_ = 0;
0186     for_all(workers_, std::bind(&WorkerInPath::clearCounters, _1));
0187   }
0188 
0189   void Path::setEarlyDeleteHelpers(std::map<const Worker*, EarlyDeleteHelper*> const& iWorkerToDeleter) {
0190     for (unsigned int index = 0; index != size(); ++index) {
0191       auto found = iWorkerToDeleter.find(getWorker(index));
0192       if (found != iWorkerToDeleter.end()) {
0193         found->second->addedToPath();
0194       }
0195     }
0196   }
0197 
0198   void Path::setPathStatusInserter(PathStatusInserter* pathStatusInserter, Worker* pathStatusInserterWorker) {
0199     pathStatusInserter_ = pathStatusInserter;
0200     pathStatusInserterWorker_ = pathStatusInserterWorker;
0201   }
0202 
0203   void Path::processEventUsingPathAsync(WaitingTaskHolder iTask,
0204                                         EventTransitionInfo const& iInfo,
0205                                         ServiceToken const& iToken,
0206                                         StreamID const& iStreamID,
0207                                         StreamContext const* iStreamContext) {
0208     waitingTasks_.reset();
0209     modulesToRun_ = workers_.size();
0210     ++timesRun_;
0211     waitingTasks_.add(iTask);
0212     printedException_ = false;
0213     if (actReg_) {
0214       ServiceRegistry::Operate guard(iToken);
0215       actReg_->prePathEventSignal_(*iStreamContext, pathContext_);
0216     }
0217     //If the Path succeeds, these are the values we have at the end
0218     state_ = hlt::Pass;
0219     failedModuleIndex_ = workers_.size() - 1;
0220 
0221     if (workers_.empty()) {
0222       ServiceRegistry::Operate guard(iToken);
0223       finished(std::exception_ptr(), iStreamContext, iInfo, iStreamID);
0224       return;
0225     }
0226 
0227     runNextWorkerAsync(0, iInfo, iToken, iStreamID, iStreamContext, *iTask.group());
0228   }
0229 
0230   void Path::workerFinished(std::exception_ptr const* iException,
0231                             unsigned int iModuleIndex,
0232                             EventTransitionInfo const& iInfo,
0233                             ServiceToken const& iToken,
0234                             StreamID const& iID,
0235                             StreamContext const* iContext,
0236                             oneapi::tbb::task_group& iGroup) {
0237     EventPrincipal const& iEP = iInfo.principal();
0238     ServiceRegistry::Operate guard(iToken);
0239 
0240     //This call also allows the WorkerInPath to update statistics
0241     // so should be done even if an exception happened
0242     auto& worker = workers_[iModuleIndex];
0243     bool shouldContinue = worker.checkResultsOfRunWorker(true);
0244     std::exception_ptr finalException;
0245     if (iException) {
0246       shouldContinue = false;
0247       std::unique_ptr<cms::Exception> pEx;
0248       try {
0249         std::rethrow_exception(*iException);
0250       } catch (cms::Exception& oldEx) {
0251         pEx = std::unique_ptr<cms::Exception>(oldEx.clone());
0252       } catch (std::exception const& oldEx) {
0253         pEx = std::make_unique<edm::Exception>(errors::StdException);
0254       } catch (...) {
0255         pEx = std::make_unique<edm::Exception>(errors::Unknown);
0256       }
0257       // Caught exception is propagated via WaitingTaskList
0258       CMS_SA_ALLOW try {
0259         std::ostringstream ost;
0260         ost << iEP.id();
0261         ModuleDescription const* desc = worker.getWorker()->description();
0262         assert(desc != nullptr);
0263         handleWorkerFailure(*pEx, iModuleIndex, *desc, ost.str());
0264         //If we didn't rethrow, then we effectively skipped
0265         worker.skipWorker(iEP);
0266       } catch (...) {
0267         finalException = std::current_exception();
0268         //set the exception early to avoid case where another Path is waiting
0269         // on a module in this Path and not running the module will lead to a
0270         // different but related exception in the other Path. We want this
0271         // Paths exception to be the one that gets reported.
0272         waitingTasks_.presetTaskAsFailed(finalException);
0273       }
0274     }
0275     auto const nextIndex = iModuleIndex + 1;
0276     if (shouldContinue and nextIndex < workers_.size()) {
0277       if (not worker.runConcurrently()) {
0278         --modulesToRun_;
0279         runNextWorkerAsync(nextIndex, iInfo, iToken, iID, iContext, iGroup);
0280         return;
0281       }
0282     }
0283 
0284     if (not shouldContinue) {
0285       threadsafe_setFailedModuleInfo(iModuleIndex, iException != nullptr);
0286     }
0287     if (not shouldContinue and not worker.runConcurrently()) {
0288       //we are leaving the path early
0289       for (auto it = workers_.begin() + nextIndex, itEnd = workers_.end(); it != itEnd; ++it) {
0290         --modulesToRun_;
0291         it->skipWorker(iEP);
0292       }
0293     }
0294     if (--modulesToRun_ == 0) {
0295       //The path should only be marked as finished once all outstanding modules finish
0296       finished(finalException, iContext, iInfo, iID);
0297     }
0298   }
0299 
0300   void Path::finished(std::exception_ptr iException,
0301                       StreamContext const* iContext,
0302                       EventTransitionInfo const& iInfo,
0303                       StreamID const& streamID) {
0304     updateCounters(state_);
0305     auto failedModuleBitPosition = bitPosition(failedModuleIndex_);
0306     recordStatus(failedModuleBitPosition, state_);
0307     // Caught exception is propagated via WaitingTaskList
0308     CMS_SA_ALLOW try {
0309       HLTPathStatus status(state_, failedModuleBitPosition);
0310 
0311       if (pathStatusInserter_) {  // pathStatusInserter is null for EndPaths
0312         pathStatusInserter_->setPathStatus(streamID, status);
0313       }
0314       if (pathStatusInserterWorker_) {
0315         std::exception_ptr jException =
0316             pathStatusInserterWorker_->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
0317                 iInfo, streamID, ParentContext(iContext), iContext);
0318         if (jException && not iException) {
0319           iException = jException;
0320         }
0321       }
0322       actReg_->postPathEventSignal_(*iContext, pathContext_, status);
0323     } catch (...) {
0324       if (not iException) {
0325         iException = std::current_exception();
0326       }
0327     }
0328     waitingTasks_.doneWaiting(iException);
0329   }
0330 
0331   void Path::runNextWorkerAsync(unsigned int iNextModuleIndex,
0332                                 EventTransitionInfo const& iInfo,
0333                                 ServiceToken const& iToken,
0334                                 StreamID const& iID,
0335                                 StreamContext const* iContext,
0336                                 oneapi::tbb::task_group& iGroup) {
0337     //Figure out which next modules can run concurrently
0338     const int firstModuleIndex = iNextModuleIndex;
0339     int lastModuleIndex = firstModuleIndex;
0340     while (lastModuleIndex + 1 != static_cast<int>(workers_.size()) and workers_[lastModuleIndex].runConcurrently()) {
0341       ++lastModuleIndex;
0342     }
0343     for (; lastModuleIndex >= firstModuleIndex; --lastModuleIndex) {
0344       ServiceWeakToken weakToken = iToken;
0345       auto nextTask = make_waiting_task([this, lastModuleIndex, info = iInfo, iID, iContext, weakToken, &iGroup](
0346                                             std::exception_ptr const* iException) {
0347         this->workerFinished(iException, lastModuleIndex, info, weakToken.lock(), iID, iContext, iGroup);
0348       });
0349       workers_[lastModuleIndex].runWorkerAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
0350           WaitingTaskHolder(iGroup, nextTask), iInfo, iToken, iID, iContext);
0351     }
0352   }
0353 
0354 }  // namespace edm