Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-01-31 02:19:22

0001 #ifndef FWCore_Framework_StreamSchedule_h
0002 #define FWCore_Framework_StreamSchedule_h
0003 
0004 /*
0005   Author: Jim Kowalkowski  28-01-06
0006 
0007   A class for creating a schedule based on paths in the configuration file.
0008   The schedule is maintained as a sequence of paths.
0009   After construction, events can be fed to the object and passed through
0010   all the modules in the schedule.  All accounting about processing
  of events by modules and paths is contained here or in objects held
  by containment.
0013 
0014   The trigger results producer and product are generated and managed here.
0015   This class also manages endpaths and calls to endjob and beginjob.
0016   Endpaths are just treated as a simple list of modules that need to
0017   do processing of the event and do not participate in trigger path
0018   activities.
0019 
0020   This class requires the high-level process pset.  It uses @process_name.
0021   If the high-level pset contains an "options" pset, then the
0022   following optional parameter can be present:
0023   bool wantSummary = true/false   # default false
0024 
0025   wantSummary indicates whether or not the pass/fail/error stats
0026   for modules and paths should be printed at the end-of-job.
0027 
0028   A TriggerResults object will always be inserted into the event
0029   for any schedule.  The producer of the TriggerResults EDProduct
0030   is always the first module in the endpath.  The TriggerResultInserter
0031   is given a fixed label of "TriggerResults".
0032 
0033   Processing of an event happens by pushing the event through the Paths.
0034   The scheduler performs the reset() on each of the workers independent
0035   of the Path objects.
0036 
0037   ------------------------
0038 
0039   About Paths:
0040   Paths fit into two categories:
0041   1) trigger paths that contribute directly to saved trigger bits
0042   2) end paths
0043   The StreamSchedule holds these paths in two data structures:
0044   1) main path list
0045   2) end path list
0046 
0047   Trigger path processing always precedes endpath processing.
0048   The order of the paths from the input configuration is
0049   preserved in the main paths list.
0050 
0051   ------------------------
0052 
0053   The StreamSchedule uses the TriggerNamesService to get the names of the
0054   trigger paths and end paths. When a TriggerResults object is created
0055   the results are stored in the same order as the trigger names from
0056   TriggerNamesService.
0057 
0058 */
0059 
0060 #include "DataFormats/Common/interface/HLTGlobalStatus.h"
0061 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0062 #include "FWCore/Framework/interface/ExceptionActions.h"
0063 #include "FWCore/Framework/interface/EventPrincipal.h"
0064 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0065 #include "FWCore/Framework/interface/Frameworkfwd.h"
0066 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0067 #include "FWCore/Framework/interface/UnscheduledCallProducer.h"
0068 #include "FWCore/Framework/interface/WorkerManager.h"
0069 #include "FWCore/Framework/interface/Path.h"
0070 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0071 #include "FWCore/Framework/interface/maker/Worker.h"
0072 #include "FWCore/Framework/interface/WorkerRegistry.h"
0073 #include "FWCore/Framework/interface/EarlyDeleteHelper.h"
0074 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0075 #include "FWCore/MessageLogger/interface/JobReport.h"
0076 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0077 #include "FWCore/ServiceRegistry/interface/Service.h"
0078 #include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
0079 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0080 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
0081 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0082 #include "FWCore/Concurrency/interface/FunctorTask.h"
0083 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0084 #include "FWCore/Utilities/interface/Algorithms.h"
0085 #include "FWCore/Utilities/interface/BranchType.h"
0086 #include "FWCore/Utilities/interface/ConvertException.h"
0087 #include "FWCore/Utilities/interface/Exception.h"
0088 #include "FWCore/Utilities/interface/StreamID.h"
0089 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0090 #include "FWCore/Utilities/interface/propagate_const.h"
0091 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0092 
0093 #include "oneapi/tbb/task_arena.h"
0094 
#include <atomic>
#include <exception>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <sstream>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
0106 
0107 namespace edm {
0108 
0109   class BranchIDListHelper;
0110   class ExceptionCollector;
0111   class ExceptionToActionTable;
0112   class OutputModuleCommunicator;
0113   class UnscheduledCallProducer;
0114   class WorkerInPath;
0115   class ModuleRegistry;
0116   class TriggerResultInserter;
0117   class PathStatusInserter;
0118   class EndPathStatusInserter;
0119   class PreallocationConfiguration;
0120   class ConditionalTaskHelper;
0121 
0122   namespace service {
0123     class TriggerNamesService;
0124   }
0125 
0126   class StreamSchedule {
0127   public:
0128     typedef std::vector<std::string> vstring;
0129     typedef std::vector<Path> TrigPaths;
0130     typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
0131     typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
0132     typedef std::shared_ptr<Worker> WorkerPtr;
0133     typedef std::vector<Worker*> AllWorkers;
0134 
0135     typedef std::vector<Worker*> Workers;
0136 
0137     typedef std::vector<WorkerInPath> PathWorkers;
0138 
0139     StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
0140                    std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0141                    std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0142                    std::shared_ptr<ModuleRegistry>,
0143                    ParameterSet& proc_pset,
0144                    service::TriggerNamesService const& tns,
0145                    PreallocationConfiguration const& prealloc,
0146                    SignallingProductRegistry& pregistry,
0147                    ExceptionToActionTable const& actions,
0148                    std::shared_ptr<ActivityRegistry> areg,
0149                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0150                    StreamID streamID,
0151                    ProcessContext const* processContext);
0152 
0153     StreamSchedule(StreamSchedule const&) = delete;
0154 
0155     void processOneEventAsync(
0156         WaitingTaskHolder iTask,
0157         EventTransitionInfo&,
0158         ServiceToken const& token,
0159         std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters);
0160 
0161     template <typename T>
0162     void processOneStreamAsync(WaitingTaskHolder iTask,
0163                                typename T::TransitionInfoType& transitionInfo,
0164                                ServiceToken const& token,
0165                                bool cleaningUpAfterException = false);
0166 
0167     void beginStream();
0168     void endStream(ExceptionCollector& collector, std::mutex& collectorMutex) noexcept;
0169 
0170     StreamID streamID() const { return streamID_; }
0171 
0172     /// Return a vector allowing const access to all the
0173     /// ModuleDescriptions for this StreamSchedule.
0174 
0175     /// *** N.B. *** Ownership of the ModuleDescriptions is *not*
0176     /// *** passed to the caller. Do not call delete on these
0177     /// *** pointers!
0178     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0179 
0180     ///adds to oLabelsToFill the labels for all paths in the process
0181     void availablePaths(std::vector<std::string>& oLabelsToFill) const;
0182 
0183     ///adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel
0184     void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;
0185 
0186     void moduleDescriptionsInPath(std::string const& iPathLabel,
0187                                   std::vector<ModuleDescription const*>& descriptions,
0188                                   unsigned int hint) const;
0189 
0190     void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
0191                                      std::vector<ModuleDescription const*>& descriptions,
0192                                      unsigned int hint) const;
0193 
0194     /// Return the number of events this StreamSchedule has tried to process
0195     /// (inclues both successes and failures, including failures due
0196     /// to exceptions during processing).
0197     int totalEvents() const { return total_events_; }
0198 
0199     /// Return the number of events which have been passed by one or
0200     /// more trigger paths.
0201     int totalEventsPassed() const { return total_passed_; }
0202 
0203     /// Return the number of events that have not passed any trigger.
0204     /// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()
0205     int totalEventsFailed() const { return totalEvents() - totalEventsPassed(); }
0206 
0207     /// Return the trigger report information on paths,
0208     /// modules-in-path, modules-in-endpath, and modules.
0209     void getTriggerReport(TriggerReport& rep) const;
0210 
0211     ///  Clear all the counters in the trigger report.
0212     void clearCounters();
0213 
0214     /// clone the type of module with label iLabel but configure with iPSet.
0215     void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
0216 
0217     /// Delete the module with label iLabel
0218     void deleteModule(std::string const& iLabel);
0219 
0220     void initializeEarlyDelete(ModuleRegistry& modReg,
0221                                std::vector<std::string> const& branchesToDeleteEarly,
0222                                std::multimap<std::string, std::string> const& referencesToBranches,
0223                                std::vector<std::string> const& modulesToSkip,
0224                                edm::SignallingProductRegistry const& preg);
0225 
0226     /// returns the collection of pointers to workers
0227     AllWorkers const& allWorkersBeginEnd() const { return workerManagerBeginEnd_.allWorkers(); }
0228     AllWorkers const& allWorkersRuns() const { return workerManagerRuns_.allWorkers(); }
0229     AllWorkers const& allWorkersLumisAndEvents() const { return workerManagerLumisAndEvents_.allWorkers(); }
0230 
0231     AllWorkers const& unscheduledWorkersLumisAndEvents() const {
0232       return workerManagerLumisAndEvents_.unscheduledWorkers();
0233     }
0234     unsigned int numberOfUnscheduledModules() const { return number_of_unscheduled_modules_; }
0235 
0236     StreamContext const& context() const { return streamContext_; }
0237 
0238     struct AliasInfo {
0239       std::string friendlyClassName;
0240       std::string instanceLabel;
0241       std::string originalInstanceLabel;
0242       std::string originalModuleLabel;
0243     };
0244 
0245   private:
0246     /// returns the action table
0247     ExceptionToActionTable const& actionTable() const { return workerManagerLumisAndEvents_.actionTable(); }
0248 
0249     void resetAll();
0250 
0251     void finishedPaths(std::atomic<std::exception_ptr*>&, WaitingTaskHolder, EventTransitionInfo&);
0252 
0253     std::exception_ptr finishProcessOneEvent(std::exception_ptr);
0254 
0255     void reportSkipped(EventPrincipal const& ep) const;
0256 
0257     std::vector<Worker*> tryToPlaceConditionalModules(
0258         Worker*,
0259         std::unordered_set<std::string>& conditionalModules,
0260         std::unordered_multimap<std::string, edm::ProductDescription const*> const& conditionalModuleBranches,
0261         std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
0262         ParameterSet& proc_pset,
0263         SignallingProductRegistry& preg,
0264         PreallocationConfiguration const* prealloc,
0265         std::shared_ptr<ProcessConfiguration const> processConfiguration);
0266     void fillWorkers(ParameterSet& proc_pset,
0267                      SignallingProductRegistry& preg,
0268                      PreallocationConfiguration const* prealloc,
0269                      std::shared_ptr<ProcessConfiguration const> processConfiguration,
0270                      std::string const& name,
0271                      bool ignoreFilters,
0272                      PathWorkers& out,
0273                      std::vector<std::string> const& endPathNames,
0274                      ConditionalTaskHelper const& conditionalTaskHelper,
0275                      std::unordered_set<std::string>& allConditionalModules);
0276     void fillTrigPath(ParameterSet& proc_pset,
0277                       SignallingProductRegistry& preg,
0278                       PreallocationConfiguration const* prealloc,
0279                       std::shared_ptr<ProcessConfiguration const> processConfiguration,
0280                       int bitpos,
0281                       std::string const& name,
0282                       TrigResPtr,
0283                       std::vector<std::string> const& endPathNames,
0284                       ConditionalTaskHelper const& conditionalTaskHelper,
0285                       std::unordered_set<std::string>& allConditionalModules);
0286     void fillEndPath(ParameterSet& proc_pset,
0287                      SignallingProductRegistry& preg,
0288                      PreallocationConfiguration const* prealloc,
0289                      std::shared_ptr<ProcessConfiguration const> processConfiguration,
0290                      int bitpos,
0291                      std::string const& name,
0292                      std::vector<std::string> const& endPathNames,
0293                      ConditionalTaskHelper const& conditionalTaskHelper,
0294                      std::unordered_set<std::string>& allConditionalModules);
0295 
0296     void addToAllWorkers(Worker* w);
0297 
0298     void resetEarlyDelete();
0299 
0300     TrigResConstPtr results() const { return get_underlying_safe(results_); }
0301     TrigResPtr& results() { return get_underlying_safe(results_); }
0302 
0303     void makePathStatusInserters(
0304         std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0305         std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0306         ExceptionToActionTable const& actions);
0307 
0308     template <typename T>
0309     void preScheduleSignal(StreamContext const*) const;
0310 
0311     template <typename T>
0312     void postScheduleSignal(StreamContext const*, std::exception_ptr&) const noexcept;
0313 
0314     void handleException(StreamContext const&, bool cleaningUpAfterException, std::exception_ptr&) const noexcept;
0315 
0316     WorkerManager workerManagerBeginEnd_;
0317     WorkerManager workerManagerRuns_;
0318     WorkerManager workerManagerLumisAndEvents_;
0319     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0320 
0321     edm::propagate_const<TrigResPtr> results_;
0322 
0323     edm::propagate_const<WorkerPtr> results_inserter_;
0324     std::vector<edm::propagate_const<WorkerPtr>> pathStatusInserterWorkers_;
0325     std::vector<edm::propagate_const<WorkerPtr>> endPathStatusInserterWorkers_;
0326 
0327     TrigPaths trig_paths_;
0328     TrigPaths end_paths_;
0329     std::vector<int> empty_trig_paths_;
0330     std::vector<int> empty_end_paths_;
0331 
0332     //For each branch that has been marked for early deletion
0333     // keep track of how many modules are left that read this data but have
0334     // not yet been run in this event
0335     std::vector<BranchToCount> earlyDeleteBranchToCount_;
0336     //NOTE the following is effectively internal data for each EarlyDeleteHelper
0337     // but putting it into one vector makes for better allocation as well as
0338     // faster iteration when used to reset the earlyDeleteBranchToCount_
0339     // Each EarlyDeleteHelper hold a begin and end range into this vector. The values
0340     // of this vector correspond to indexes into earlyDeleteBranchToCount_ so
0341     // tell which EarlyDeleteHelper is associated with which BranchIDs.
0342     std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
0343     //There is one EarlyDeleteHelper per Module which are reading data that
0344     // has been marked for early deletion
0345     std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
0346 
0347     int total_events_;
0348     int total_passed_;
0349     unsigned int number_of_unscheduled_modules_;
0350 
0351     StreamID streamID_;
0352     StreamContext streamContext_;
0353   };
0354 
0355   void inline StreamSchedule::reportSkipped(EventPrincipal const& ep) const {
0356     Service<JobReport> reportSvc;
0357     reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
0358   }
0359 
  /// Asynchronously run the stream-level transition described by the traits type T
  /// through this stream's workers.
  ///
  /// NOTE(review): T is presumably one of the traits types from OccurrenceTraits.h;
  /// it must provide TransitionInfoType, setStreamContext, branchType_,
  /// preScheduleSignal and postScheduleSignal (all used below).
  ///
  /// @param iHolder        task to notify when the transition is finished; it is
  ///                       moved into the completion task, so the caller's holder
  ///                       is consumed.
  /// @param transitionInfo principal and related data for the transition; a copy
  ///                       is captured by the task that runs the workers.
  /// @param token          services to make current while running; the tasks only
  ///                       keep a ServiceWeakToken and lock it when they execute.
  /// @param cleaningUpAfterException forwarded to handleException when the
  ///                       transition fails.
  template <typename T>
  void StreamSchedule::processOneStreamAsync(WaitingTaskHolder iHolder,
                                             typename T::TransitionInfoType& transitionInfo,
                                             ServiceToken const& token,
                                             bool cleaningUpAfterException) {
    auto group = iHolder.group();
    auto const& principal = transitionInfo.principal();
    // Update this stream's context for the new transition before any task is started.
    T::setStreamContext(streamContext_, principal);

    ServiceWeakToken weakToken = token;
    // Completion task. It receives the failure (if any) from the work below,
    // lets handleException process it, always emits the post-schedule signal,
    // and only then notifies the caller's holder.
    auto doneTask = make_waiting_task([this, iHolder = std::move(iHolder), cleaningUpAfterException, weakToken](
                                          std::exception_ptr const* iPtr) mutable {
      std::exception_ptr excpt;
      {
        ServiceRegistry::Operate op(weakToken.lock());

        if (iPtr) {
          excpt = *iPtr;
          handleException(streamContext_, cleaningUpAfterException, excpt);
        }
        postScheduleSignal<T>(&streamContext_, excpt);
      }  // release service token before calling doneWaiting
      iHolder.doneWaiting(excpt);
    });

    // Work task. It emits the pre-schedule signal, resets the chosen worker
    // manager, then starts the workers. Its holder h reports into doneTask.
    auto task =
        make_functor_task([this, h = WaitingTaskHolder(*group, doneTask), info = transitionInfo, weakToken]() mutable {
          auto token = weakToken.lock();
          ServiceRegistry::Operate op(token);
          // Caught exception is propagated via WaitingTaskHolder
          // Lumi transitions use the lumi/event worker manager; every other T
          // handled here uses the run worker manager.
          WorkerManager* workerManager = &workerManagerRuns_;
          if (T::branchType_ == InLumi) {
            workerManager = &workerManagerLumisAndEvents_;
          }
          CMS_SA_ALLOW try {
            preScheduleSignal<T>(&streamContext_);
            workerManager->resetAll();
          } catch (...) {
            // Just remember the exception at this point,
            // let the destructor of h call doneWaiting() so the
            // ServiceRegistry::Operate object is destroyed first
            h.presetTaskAsFailed(std::current_exception());
            return;
          }

          workerManager->processOneOccurrenceAsync<T>(h, info, token, streamID_, &streamContext_, &streamContext_);
        });

    if (streamID_.value() == 0) {
      //Enqueueing will start another thread if there is only
      // one thread in the job. Having stream == 0 use spawn
      // avoids starting up another thread when there is only one stream.
      group->run([task]() {
        TaskSentry s{task};
        task->execute();
      });
    } else {
      // Other streams enqueue into the task arena the calling thread is attached to.
      oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
      arena.enqueue([task]() {
        TaskSentry s{task};
        task->execute();
      });
    }
  }
0424 
0425   template <typename T>
0426   void StreamSchedule::preScheduleSignal(StreamContext const* streamContext) const {
0427     try {
0428       convertException::wrap([this, streamContext]() { T::preScheduleSignal(actReg_.get(), streamContext); });
0429     } catch (cms::Exception& ex) {
0430       std::ostringstream ost;
0431       ex.addContext("Handling pre signal, likely in a service function");
0432       exceptionContext(ost, *streamContext);
0433       ex.addContext(ost.str());
0434       throw;
0435     }
0436   }
0437 
0438   template <typename T>
0439   void StreamSchedule::postScheduleSignal(StreamContext const* streamContext,
0440                                           std::exception_ptr& excpt) const noexcept {
0441     try {
0442       convertException::wrap([this, streamContext]() { T::postScheduleSignal(actReg_.get(), streamContext); });
0443     } catch (cms::Exception& ex) {
0444       if (not excpt) {
0445         std::ostringstream ost;
0446         ex.addContext("Handling post signal, likely in a service function");
0447         exceptionContext(ost, *streamContext);
0448         ex.addContext(ost.str());
0449         excpt = std::current_exception();
0450       }
0451     }
0452   }
0453 }  // namespace edm
0454 
0455 #endif