Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-03-17 11:02:05

0001 #ifndef FWCore_Framework_Path_h
0002 #define FWCore_Framework_Path_h
0003 
0004 /*
0005   Author: Jim Kowalkowski 28-01-06
0006 
0007   An object of this type represents one path in a job configuration.
0008   It holds the assigned bit position and the list of workers that are
0009   an event must pass through when this parh is processed.  The workers
0010   are held in WorkerInPath wrappers so that per path execution statistics
0011   can be kept for each worker.
0012 */
0013 
0014 #include "FWCore/Framework/interface/WorkerInPath.h"
0015 #include "FWCore/Framework/interface/maker/Worker.h"
0016 #include "DataFormats/Common/interface/HLTenums.h"
0017 #include "DataFormats/Common/interface/TriggerResults.h"
0018 #include "FWCore/ServiceRegistry/interface/PathContext.h"
0019 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0020 #include "FWCore/Utilities/interface/BranchType.h"
0021 #include "FWCore/Utilities/interface/Exception.h"
0022 #include "FWCore/Utilities/interface/ConvertException.h"
0023 #include "FWCore/Utilities/interface/make_sentry.h"
0024 
0025 #include <memory>
0026 
0027 #include <string>
0028 #include <vector>
0029 #include <map>
0030 #include <exception>
0031 #include <sstream>
0032 
0033 namespace edm {
0034   class EventTransitionInfo;
0035   class ModuleDescription;
0036   class PathStatusInserter;
0037   class EarlyDeleteHelper;
0038   class StreamContext;
0039   class StreamID;
0040 
0041   class Path {
0042   public:
0043     typedef hlt::HLTState State;
0044 
0045     typedef std::vector<WorkerInPath> WorkersInPath;
0046     typedef WorkersInPath::size_type size_type;
0047     typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
0048 
0049     Path(int bitpos,
0050          std::string const& path_name,
0051          WorkersInPath const& workers,
0052          TrigResPtr trptr,
0053          ExceptionToActionTable const& actions,
0054          std::shared_ptr<ActivityRegistry> reg,
0055          StreamContext const* streamContext,
0056          std::atomic<bool>* stopProcessEvent,
0057          PathContext::PathType pathType);
0058 
0059     Path(Path const&);
0060 
0061     Path& operator=(Path const&) = delete;
0062 
0063     template <typename T>
0064     void runAllModulesAsync(WaitingTaskHolder,
0065                             typename T::TransitionInfoType const&,
0066                             ServiceToken const&,
0067                             StreamID const&,
0068                             typename T::Context const*);
0069 
0070     void processOneOccurrenceAsync(
0071         WaitingTaskHolder, EventTransitionInfo const&, ServiceToken const&, StreamID const&, StreamContext const*);
0072 
0073     int bitPosition() const { return bitpos_; }
0074     std::string const& name() const { return pathContext_.pathName(); }
0075 
0076     void clearCounters();
0077 
0078     int timesRun() const { return timesRun_; }
0079     int timesPassed() const { return timesPassed_; }
0080     int timesFailed() const { return timesFailed_; }
0081     int timesExcept() const { return timesExcept_; }
0082     //int abortWorker() const { return abortWorker_; }
0083 
0084     size_type size() const { return workers_.size(); }
0085     int timesVisited(size_type i) const { return workers_.at(i).timesVisited(); }
0086     int timesPassed(size_type i) const { return workers_.at(i).timesPassed(); }
0087     int timesFailed(size_type i) const { return workers_.at(i).timesFailed(); }
0088     int timesExcept(size_type i) const { return workers_.at(i).timesExcept(); }
0089     Worker const* getWorker(size_type i) const { return workers_.at(i).getWorker(); }
0090     unsigned int bitPosition(size_type i) const { return workers_.at(i).bitPosition(); }
0091 
0092     void setEarlyDeleteHelpers(std::map<const Worker*, EarlyDeleteHelper*> const&);
0093 
0094     void setPathStatusInserter(PathStatusInserter* pathStatusInserter, Worker* pathStatusInserterWorker);
0095 
0096   private:
0097     int timesRun_;
0098     int timesPassed_;
0099     int timesFailed_;
0100     int timesExcept_;
0101     //int abortWorker_;
0102     //When an exception happens, it is possible for multiple modules in a path to fail
0103     // and then try to change the state concurrently.
0104     std::atomic<bool> stateLock_ = false;
0105     CMS_THREAD_GUARD(stateLock_) int failedModuleIndex_;
0106     CMS_THREAD_GUARD(stateLock_) State state_;
0107 
0108     int const bitpos_;
0109     TrigResPtr const trptr_;
0110     // We do not use propagate_const because the registry itself is mutable.
0111     std::shared_ptr<ActivityRegistry> const actReg_;
0112     ExceptionToActionTable const* const act_table_;
0113 
0114     WorkersInPath workers_;
0115 
0116     PathContext pathContext_;
0117     WaitingTaskList waitingTasks_;
0118     std::atomic<bool>* const stopProcessingEvent_;
0119     std::atomic<unsigned int> modulesToRun_;
0120 
0121     PathStatusInserter* pathStatusInserter_;
0122     Worker* pathStatusInserterWorker_;
0123 
0124     // Helper functions
0125     // nwrwue = numWorkersRunWithoutUnhandledException (really!)
0126     bool handleWorkerFailure(cms::Exception& e,
0127                              int nwrwue,
0128                              bool isEvent,
0129                              bool begin,
0130                              BranchType branchType,
0131                              ModuleDescription const&,
0132                              std::string const& id) const;
0133     static void exceptionContext(cms::Exception& ex,
0134                                  bool isEvent,
0135                                  bool begin,
0136                                  BranchType branchType,
0137                                  ModuleDescription const&,
0138                                  std::string const& id,
0139                                  PathContext const&);
0140     void threadsafe_setFailedModuleInfo(int nwrwue, std::exception_ptr);
0141     void recordStatus(int nwrwue, hlt::HLTState state);
0142     void updateCounters(hlt::HLTState state);
0143 
0144     void finished(std::exception_ptr, StreamContext const*, EventTransitionInfo const&, StreamID const&);
0145 
0146     //Handle asynchronous processing
0147     void workerFinished(std::exception_ptr const*,
0148                         unsigned int iModuleIndex,
0149                         EventTransitionInfo const&,
0150                         ServiceToken const&,
0151                         StreamID const&,
0152                         StreamContext const*,
0153                         oneapi::tbb::task_group& iGroup);
0154     void runNextWorkerAsync(unsigned int iNextModuleIndex,
0155                             EventTransitionInfo const&,
0156                             ServiceToken const&,
0157                             StreamID const&,
0158                             StreamContext const*,
0159                             oneapi::tbb::task_group& iGroup);
0160   };
0161 
0162   namespace {
0163     template <typename T>
0164     class PathSignalSentry {
0165     public:
0166       PathSignalSentry(ActivityRegistry* a,
0167                        int const& nwrwue,
0168                        hlt::HLTState const& state,
0169                        PathContext const* pathContext)
0170           : a_(a), nwrwue_(nwrwue), state_(state), pathContext_(pathContext) {
0171         if (a_)
0172           T::prePathSignal(a_, pathContext_);
0173       }
0174       ~PathSignalSentry() {
0175         HLTPathStatus status(state_, nwrwue_);
0176         if (a_)
0177           T::postPathSignal(a_, status, pathContext_);
0178       }
0179 
0180     private:
0181       ActivityRegistry* a_;  // We do not use propagate_const because the registry itself is mutable.
0182       int const& nwrwue_;
0183       hlt::HLTState const& state_;
0184       PathContext const* pathContext_;
0185     };
0186   }  // namespace
0187 
0188   template <typename T>
0189   void Path::runAllModulesAsync(WaitingTaskHolder task,
0190                                 typename T::TransitionInfoType const& info,
0191                                 ServiceToken const& token,
0192                                 StreamID const& streamID,
0193                                 typename T::Context const* context) {
0194     for (auto& worker : workers_) {
0195       worker.runWorkerAsync<T>(task, info, token, streamID, context);
0196     }
0197   }
0198 
0199 }  // namespace edm
0200 
0201 #endif