Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-04-08 00:35:12

0001 
0002 #include "FWCore/Framework/interface/Path.h"
0003 #include "FWCore/Framework/interface/EventPrincipal.h"
0004 #include "FWCore/Framework/interface/ExceptionActions.h"
0005 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0006 #include "FWCore/Framework/interface/EarlyDeleteHelper.h"
0007 #include "FWCore/Framework/src/PathStatusInserter.h"
0008 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0009 #include "FWCore/ServiceRegistry/interface/ParentContext.h"
0010 #include "FWCore/Utilities/interface/Algorithms.h"
0011 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0012 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0013 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0014 
0015 #include <algorithm>
0016 
0017 namespace edm {
0018   Path::Path(int bitpos,
0019              std::string const& path_name,
0020              WorkersInPath const& workers,
0021              TrigResPtr trptr,
0022              ExceptionToActionTable const& actions,
0023              std::shared_ptr<ActivityRegistry> areg,
0024              StreamContext const* streamContext,
0025              std::atomic<bool>* stopProcessingEvent,
0026              PathContext::PathType pathType)
0027       : timesRun_(),
0028         timesPassed_(),
0029         timesFailed_(),
0030         timesExcept_(),
0031         failedModuleIndex_(workers.size()),
0032         state_(hlt::Ready),
0033         bitpos_(bitpos),
0034         trptr_(trptr),
0035         actReg_(areg),
0036         act_table_(&actions),
0037         workers_(workers),
0038         pathContext_(path_name, streamContext, bitpos, pathType),
0039         stopProcessingEvent_(stopProcessingEvent),
0040         pathStatusInserter_(nullptr),
0041         pathStatusInserterWorker_(nullptr) {
0042     for (auto& workerInPath : workers_) {
0043       workerInPath.setPathContext(&pathContext_);
0044     }
0045     modulesToRun_ = workers_.size();
0046   }
0047 
0048   Path::Path(Path const& r)
0049       : timesRun_(r.timesRun_),
0050         timesPassed_(r.timesPassed_),
0051         timesFailed_(r.timesFailed_),
0052         timesExcept_(r.timesExcept_),
0053         failedModuleIndex_(r.failedModuleIndex_),
0054         state_(r.state_),
0055         bitpos_(r.bitpos_),
0056         trptr_(r.trptr_),
0057         actReg_(r.actReg_),
0058         act_table_(r.act_table_),
0059         workers_(r.workers_),
0060         pathContext_(r.pathContext_),
0061         stopProcessingEvent_(r.stopProcessingEvent_),
0062         pathStatusInserter_(r.pathStatusInserter_),
0063         pathStatusInserterWorker_(r.pathStatusInserterWorker_) {
0064     for (auto& workerInPath : workers_) {
0065       workerInPath.setPathContext(&pathContext_);
0066     }
0067     modulesToRun_ = workers_.size();
0068   }
0069 
0070   bool Path::handleWorkerFailure(cms::Exception& e,
0071                                  int nwrwue,
0072                                  bool isEvent,
0073                                  bool begin,
0074                                  BranchType branchType,
0075                                  ModuleDescription const& desc,
0076                                  std::string const& id) const {
0077     if (e.context().empty()) {
0078       exceptionContext(e, isEvent, begin, branchType, desc, id, pathContext_);
0079     }
0080     bool should_continue = true;
0081 
0082     // there is no support as of yet for specific paths having
0083     // different exception behavior
0084 
0085     // If not processing an event, always rethrow.
0086     exception_actions::ActionCodes action = (isEvent ? act_table_->find(e.category()) : exception_actions::Rethrow);
0087     switch (action) {
0088       case exception_actions::FailPath: {
0089         should_continue = false;
0090         edm::printCmsExceptionWarning("FailPath", e);
0091         break;
0092       }
0093       case exception_actions::SkipEvent: {
0094         //Need the other Paths to stop as soon as possible
0095         if (stopProcessingEvent_) {
0096           *stopProcessingEvent_ = true;
0097         }
0098         break;
0099       }
0100       default: {
0101         if (action == exception_actions::Rethrow) {
0102           std::string pNF = Exception::codeToString(errors::ProductNotFound);
0103           if (e.category() == pNF) {
0104             std::ostringstream ost;
0105             ost << "If you wish to continue processing events after a " << pNF << " exception,\n"
0106                 << "add \"SkipEvent = cms.untracked.vstring('ProductNotFound')\" to the \"options\" PSet in the "
0107                    "configuration.\n";
0108             e.addAdditionalInfo(ost.str());
0109           }
0110         }
0111         //throw will copy which will slice the object
0112         e.raise();
0113       }
0114     }
0115 
0116     return should_continue;
0117   }
0118 
0119   void Path::exceptionContext(cms::Exception& ex,
0120                               bool isEvent,
0121                               bool begin,
0122                               BranchType branchType,
0123                               ModuleDescription const& desc,
0124                               std::string const& id,
0125                               PathContext const& pathContext) {
0126     std::ostringstream ost;
0127     ost << "Running path '" << pathContext.pathName() << "'";
0128     ex.addContext(ost.str());
0129     ost.str("");
0130     ost << "Processing ";
0131     //For the event case, the Worker has already
0132     // added the necessary module context to the exception
0133     if (begin && branchType == InRun) {
0134       ost << "stream begin Run";
0135     } else if (begin && branchType == InLumi) {
0136       ost << "stream begin LuminosityBlock ";
0137     } else if (!begin && branchType == InLumi) {
0138       ost << "stream end LuminosityBlock ";
0139     } else if (!begin && branchType == InRun) {
0140       ost << "stream end Run ";
0141     } else if (isEvent) {
0142       // It should be impossible to get here ...
0143       ost << "Event ";
0144     }
0145     ost << id;
0146     ex.addContext(ost.str());
0147   }
0148 
0149   void Path::threadsafe_setFailedModuleInfo(int nwrwue, std::exception_ptr iExcept) {
0150     bool expected = false;
0151     while (stateLock_.compare_exchange_strong(expected, true)) {
0152       expected = false;
0153     }
0154     if (iExcept) {
0155       if (state_ == hlt::Exception) {
0156         if (nwrwue < failedModuleIndex_) {
0157           failedModuleIndex_ = nwrwue;
0158         }
0159       } else {
0160         state_ = hlt::Exception;
0161         failedModuleIndex_ = nwrwue;
0162       }
0163     } else {
0164       if (state_ != hlt::Exception) {
0165         if (nwrwue < failedModuleIndex_) {
0166           failedModuleIndex_ = nwrwue;
0167         }
0168         state_ = hlt::Fail;
0169       }
0170     }
0171 
0172     stateLock_ = false;
0173   }
0174 
0175   void Path::recordStatus(int nwrwue, hlt::HLTState state) {
0176     if (trptr_) {
0177       (*trptr_)[bitpos_] = HLTPathStatus(state, nwrwue);
0178     }
0179   }
0180 
0181   void Path::updateCounters(hlt::HLTState state) {
0182     switch (state) {
0183       case hlt::Pass: {
0184         ++timesPassed_;
0185         break;
0186       }
0187       case hlt::Fail: {
0188         ++timesFailed_;
0189         break;
0190       }
0191       case hlt::Exception: {
0192         ++timesExcept_;
0193       }
0194       default:;
0195     }
0196   }
0197 
0198   void Path::clearCounters() {
0199     using std::placeholders::_1;
0200     timesRun_ = timesPassed_ = timesFailed_ = timesExcept_ = 0;
0201     for_all(workers_, std::bind(&WorkerInPath::clearCounters, _1));
0202   }
0203 
0204   void Path::setEarlyDeleteHelpers(std::map<const Worker*, EarlyDeleteHelper*> const& iWorkerToDeleter) {
0205     for (unsigned int index = 0; index != size(); ++index) {
0206       auto found = iWorkerToDeleter.find(getWorker(index));
0207       if (found != iWorkerToDeleter.end()) {
0208         found->second->addedToPath();
0209       }
0210     }
0211   }
0212 
0213   void Path::setPathStatusInserter(PathStatusInserter* pathStatusInserter, Worker* pathStatusInserterWorker) {
0214     pathStatusInserter_ = pathStatusInserter;
0215     pathStatusInserterWorker_ = pathStatusInserterWorker;
0216   }
0217 
0218   void Path::processOneOccurrenceAsync(WaitingTaskHolder iTask,
0219                                        EventTransitionInfo const& iInfo,
0220                                        ServiceToken const& iToken,
0221                                        StreamID const& iStreamID,
0222                                        StreamContext const* iStreamContext) {
0223     waitingTasks_.reset();
0224     modulesToRun_ = workers_.size();
0225     ++timesRun_;
0226     waitingTasks_.add(iTask);
0227     if (actReg_) {
0228       ServiceRegistry::Operate guard(iToken);
0229       actReg_->prePathEventSignal_(*iStreamContext, pathContext_);
0230     }
0231     //If the Path succeeds, these are the values we have at the end
0232     state_ = hlt::Pass;
0233     failedModuleIndex_ = workers_.size() - 1;
0234 
0235     if (workers_.empty()) {
0236       ServiceRegistry::Operate guard(iToken);
0237       finished(std::exception_ptr(), iStreamContext, iInfo, iStreamID);
0238       return;
0239     }
0240 
0241     runNextWorkerAsync(0, iInfo, iToken, iStreamID, iStreamContext, *iTask.group());
0242   }
0243 
0244   void Path::workerFinished(std::exception_ptr const* iException,
0245                             unsigned int iModuleIndex,
0246                             EventTransitionInfo const& iInfo,
0247                             ServiceToken const& iToken,
0248                             StreamID const& iID,
0249                             StreamContext const* iContext,
0250                             oneapi::tbb::task_group& iGroup) {
0251     EventPrincipal const& iEP = iInfo.principal();
0252     ServiceRegistry::Operate guard(iToken);
0253 
0254     //This call also allows the WorkerInPath to update statistics
0255     // so should be done even if an exception happened
0256     auto& worker = workers_[iModuleIndex];
0257     bool shouldContinue = worker.checkResultsOfRunWorker(true);
0258     std::exception_ptr finalException;
0259     if (iException) {
0260       std::unique_ptr<cms::Exception> pEx;
0261       try {
0262         std::rethrow_exception(*iException);
0263       } catch (cms::Exception& oldEx) {
0264         pEx = std::unique_ptr<cms::Exception>(oldEx.clone());
0265       } catch (std::exception const& oldEx) {
0266         pEx = std::make_unique<edm::Exception>(errors::StdException);
0267       } catch (...) {
0268         pEx = std::make_unique<edm::Exception>(errors::Unknown);
0269       }
0270       // Caught exception is propagated via WaitingTaskList
0271       CMS_SA_ALLOW try {
0272         std::ostringstream ost;
0273         ost << iEP.id();
0274         ModuleDescription const* desc = worker.getWorker()->description();
0275         assert(desc != nullptr);
0276         shouldContinue = handleWorkerFailure(*pEx,
0277                                              iModuleIndex,
0278                                              /*isEvent*/ true,
0279                                              /*isBegin*/ true,
0280                                              InEvent,
0281                                              *desc,
0282                                              ost.str());
0283         //If we didn't rethrow, then we effectively skipped
0284         worker.skipWorker(iEP);
0285         finalException = std::exception_ptr();
0286       } catch (...) {
0287         shouldContinue = false;
0288         finalException = std::current_exception();
0289         //set the exception early to avoid case where another Path is waiting
0290         // on a module in this Path and not running the module will lead to a
0291         // different but related exception in the other Path. We want this
0292         // Paths exception to be the one that gets reported.
0293         waitingTasks_.presetTaskAsFailed(finalException);
0294       }
0295     }
0296     if (stopProcessingEvent_ and *stopProcessingEvent_) {
0297       shouldContinue = false;
0298     }
0299     auto const nextIndex = iModuleIndex + 1;
0300     if (shouldContinue and nextIndex < workers_.size()) {
0301       if (not worker.runConcurrently()) {
0302         --modulesToRun_;
0303         runNextWorkerAsync(nextIndex, iInfo, iToken, iID, iContext, iGroup);
0304         return;
0305       }
0306     }
0307 
0308     if (not shouldContinue) {
0309       threadsafe_setFailedModuleInfo(iModuleIndex, finalException);
0310     }
0311     if (not shouldContinue and not worker.runConcurrently()) {
0312       //we are leaving the path early
0313       for (auto it = workers_.begin() + nextIndex, itEnd = workers_.end(); it != itEnd; ++it) {
0314         --modulesToRun_;
0315         it->skipWorker(iEP);
0316       }
0317     }
0318     if (--modulesToRun_ == 0) {
0319       //The path should only be marked as finished once all outstanding modules finish
0320       finished(finalException, iContext, iInfo, iID);
0321     }
0322   }
0323 
0324   void Path::finished(std::exception_ptr iException,
0325                       StreamContext const* iContext,
0326                       EventTransitionInfo const& iInfo,
0327                       StreamID const& streamID) {
0328     updateCounters(state_);
0329     auto failedModuleBitPosition = bitPosition(failedModuleIndex_);
0330     recordStatus(failedModuleBitPosition, state_);
0331     // Caught exception is propagated via WaitingTaskList
0332     CMS_SA_ALLOW try {
0333       HLTPathStatus status(state_, failedModuleBitPosition);
0334 
0335       if (pathStatusInserter_) {  // pathStatusInserter is null for EndPaths
0336         pathStatusInserter_->setPathStatus(streamID, status);
0337       }
0338       std::exception_ptr jException =
0339           pathStatusInserterWorker_->runModuleDirectly<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
0340               iInfo, streamID, ParentContext(iContext), iContext);
0341       if (jException && not iException) {
0342         iException = jException;
0343       }
0344       actReg_->postPathEventSignal_(*iContext, pathContext_, status);
0345     } catch (...) {
0346       if (not iException) {
0347         iException = std::current_exception();
0348       }
0349     }
0350     waitingTasks_.doneWaiting(iException);
0351   }
0352 
0353   void Path::runNextWorkerAsync(unsigned int iNextModuleIndex,
0354                                 EventTransitionInfo const& iInfo,
0355                                 ServiceToken const& iToken,
0356                                 StreamID const& iID,
0357                                 StreamContext const* iContext,
0358                                 oneapi::tbb::task_group& iGroup) {
0359     //Figure out which next modules can run concurrently
0360     const int firstModuleIndex = iNextModuleIndex;
0361     int lastModuleIndex = firstModuleIndex;
0362     while (lastModuleIndex + 1 != static_cast<int>(workers_.size()) and workers_[lastModuleIndex].runConcurrently()) {
0363       ++lastModuleIndex;
0364     }
0365     for (; lastModuleIndex >= firstModuleIndex; --lastModuleIndex) {
0366       ServiceWeakToken weakToken = iToken;
0367       auto nextTask = make_waiting_task([this, lastModuleIndex, info = iInfo, iID, iContext, weakToken, &iGroup](
0368                                             std::exception_ptr const* iException) {
0369         this->workerFinished(iException, lastModuleIndex, info, weakToken.lock(), iID, iContext, iGroup);
0370       });
0371       workers_[lastModuleIndex].runWorkerAsync<OccurrenceTraits<EventPrincipal, BranchActionStreamBegin>>(
0372           WaitingTaskHolder(iGroup, nextTask), iInfo, iToken, iID, iContext);
0373     }
0374   }
0375 
0376 }  // namespace edm