Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-06-29 22:58:02

0001 #ifndef FWCore_Framework_StreamSchedule_h
0002 #define FWCore_Framework_StreamSchedule_h
0003 
0004 /*
0005   Author: Jim Kowalkowski  28-01-06
0006 
0007   A class for creating a schedule based on paths in the configuration file.
0008   The schedule is maintained as a sequence of paths.
0009   After construction, events can be fed to the object and passed through
0010   all the modules in the schedule.  All accounting about processing
  of events by modules and paths is contained here or in objects held
0012   by containment.
0013 
0014   The trigger results producer and product are generated and managed here.
0015   This class also manages endpaths and calls to endjob and beginjob.
0016   Endpaths are just treated as a simple list of modules that need to
0017   do processing of the event and do not participate in trigger path
0018   activities.
0019 
0020   This class requires the high-level process pset.  It uses @process_name.
0021   If the high-level pset contains an "options" pset, then the
0022   following optional parameter can be present:
0023   bool wantSummary = true/false   # default false
0024 
0025   wantSummary indicates whether or not the pass/fail/error stats
0026   for modules and paths should be printed at the end-of-job.
0027 
0028   A TriggerResults object will always be inserted into the event
0029   for any schedule.  The producer of the TriggerResults EDProduct
0030   is always the first module in the endpath.  The TriggerResultInserter
0031   is given a fixed label of "TriggerResults".
0032 
0033   Processing of an event happens by pushing the event through the Paths.
0034   The scheduler performs the reset() on each of the workers independent
0035   of the Path objects.
0036 
0037   ------------------------
0038 
0039   About Paths:
0040   Paths fit into two categories:
0041   1) trigger paths that contribute directly to saved trigger bits
0042   2) end paths
0043   The StreamSchedule holds these paths in two data structures:
0044   1) main path list
0045   2) end path list
0046 
0047   Trigger path processing always precedes endpath processing.
0048   The order of the paths from the input configuration is
0049   preserved in the main paths list.
0050 
0051   ------------------------
0052 
0053   The StreamSchedule uses the TriggerNamesService to get the names of the
0054   trigger paths and end paths. When a TriggerResults object is created
0055   the results are stored in the same order as the trigger names from
0056   TriggerNamesService.
0057 
0058 */
0059 
#include "DataFormats/Common/interface/HLTGlobalStatus.h"
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "FWCore/Framework/interface/ExceptionActions.h"
#include "FWCore/Framework/interface/EventPrincipal.h"
#include "FWCore/Framework/interface/ExceptionHelpers.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/OccurrenceTraits.h"
#include "FWCore/Framework/interface/UnscheduledCallProducer.h"
#include "FWCore/Framework/interface/WorkerManager.h"
#include "FWCore/Framework/interface/Path.h"
#include "FWCore/Framework/interface/TransitionInfoTypes.h"
#include "FWCore/Framework/interface/maker/Worker.h"
#include "FWCore/Framework/interface/EarlyDeleteHelper.h"
#include "FWCore/MessageLogger/interface/ExceptionMessages.h"
#include "FWCore/MessageLogger/interface/JobReport.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
#include "FWCore/ServiceRegistry/interface/ServiceToken.h"
#include "FWCore/ServiceRegistry/interface/StreamContext.h"
#include "FWCore/Concurrency/interface/FunctorTask.h"
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
#include "FWCore/Utilities/interface/Algorithms.h"
#include "FWCore/Utilities/interface/BranchType.h"
#include "FWCore/Utilities/interface/ConvertException.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/Utilities/interface/StreamID.h"
#include "FWCore/Utilities/interface/get_underlying_safe.h"
#include "FWCore/Utilities/interface/propagate_const.h"
#include "FWCore/Utilities/interface/thread_safety_macros.h"

#include "oneapi/tbb/task_arena.h"

#include <atomic>
#include <exception>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <sstream>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
0105 
0106 namespace edm {
0107 
0108   class BranchIDListHelper;
0109   class ExceptionCollector;
0110   class ExceptionToActionTable;
0111   class OutputModuleCommunicator;
0112   class UnscheduledCallProducer;
0113   class WorkerInPath;
0114   class ModuleRegistry;
0115   class TriggerResultInserter;
0116   class PathStatusInserter;
0117   class EndPathStatusInserter;
0118   class PreallocationConfiguration;
0119   class ConditionalTaskHelper;
0120 
0121   namespace service {
0122     class TriggerNamesService;
0123   }
0124 
0125   class StreamSchedule {
0126   public:
0127     typedef std::vector<std::string> vstring;
0128     typedef std::vector<Path> TrigPaths;
0129     typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
0130     typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
0131     typedef std::vector<Worker*> AllWorkers;
0132 
0133     typedef std::vector<Worker*> Workers;
0134 
0135     typedef std::vector<WorkerInPath> PathWorkers;
0136 
0137     StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
0138                    std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0139                    std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0140                    std::shared_ptr<ModuleRegistry>,
0141                    ParameterSet& proc_pset,
0142                    service::TriggerNamesService const& tns,
0143                    PreallocationConfiguration const& prealloc,
0144                    SignallingProductRegistryFiller& pregistry,
0145                    ExceptionToActionTable const& actions,
0146                    std::shared_ptr<ActivityRegistry> areg,
0147                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0148                    StreamID streamID,
0149                    ProcessContext const* processContext);
0150 
0151     StreamSchedule(StreamSchedule const&) = delete;
0152 
0153     void processOneEventAsync(
0154         WaitingTaskHolder iTask,
0155         EventTransitionInfo&,
0156         ServiceToken const& token,
0157         std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters);
0158 
0159     template <typename T>
0160     void processOneStreamAsync(WaitingTaskHolder iTask,
0161                                typename T::TransitionInfoType& transitionInfo,
0162                                ServiceToken const& token,
0163                                bool cleaningUpAfterException = false);
0164 
0165     void beginStream(ModuleRegistry& iModuleRegistry);
0166     void endStream(ModuleRegistry& iModuleRegistry, ExceptionCollector& collector, std::mutex& collectorMutex) noexcept;
0167 
0168     StreamID streamID() const { return streamID_; }
0169 
0170     /// Return a vector allowing const access to all the
0171     /// ModuleDescriptions for this StreamSchedule.
0172 
0173     /// *** N.B. *** Ownership of the ModuleDescriptions is *not*
0174     /// *** passed to the caller. Do not call delete on these
0175     /// *** pointers!
0176     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0177 
0178     ///adds to oLabelsToFill the labels for all paths in the process
0179     void availablePaths(std::vector<std::string>& oLabelsToFill) const;
0180 
0181     ///adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel
0182     void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;
0183 
0184     void moduleDescriptionsInPath(std::string const& iPathLabel,
0185                                   std::vector<ModuleDescription const*>& descriptions,
0186                                   unsigned int hint) const;
0187 
0188     void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
0189                                      std::vector<ModuleDescription const*>& descriptions,
0190                                      unsigned int hint) const;
0191 
0192     /// Return the number of events this StreamSchedule has tried to process
0193     /// (inclues both successes and failures, including failures due
0194     /// to exceptions during processing).
0195     int totalEvents() const { return total_events_; }
0196 
0197     /// Return the number of events which have been passed by one or
0198     /// more trigger paths.
0199     int totalEventsPassed() const { return total_passed_; }
0200 
0201     /// Return the number of events that have not passed any trigger.
0202     /// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()
0203     int totalEventsFailed() const { return totalEvents() - totalEventsPassed(); }
0204 
0205     /// Return the trigger report information on paths,
0206     /// modules-in-path, modules-in-endpath, and modules.
0207     void getTriggerReport(TriggerReport& rep) const;
0208 
0209     ///  Clear all the counters in the trigger report.
0210     void clearCounters();
0211 
0212     /// clone the type of module with label iLabel but configure with iPSet.
0213     void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
0214 
0215     /// Delete the module with label iLabel
0216     void deleteModule(std::string const& iLabel);
0217 
0218     void initializeEarlyDelete(ModuleRegistry& modReg,
0219                                std::vector<std::string> const& branchesToDeleteEarly,
0220                                std::multimap<std::string, std::string> const& referencesToBranches,
0221                                std::vector<std::string> const& modulesToSkip,
0222                                edm::ProductRegistry const& preg);
0223 
0224     /// returns the collection of pointers to workers
0225     AllWorkers const& allWorkersRuns() const { return workerManagerRuns_.allWorkers(); }
0226     AllWorkers const& allWorkersLumisAndEvents() const { return workerManagerLumisAndEvents_.allWorkers(); }
0227 
0228     AllWorkers const& unscheduledWorkersLumisAndEvents() const {
0229       return workerManagerLumisAndEvents_.unscheduledWorkers();
0230     }
0231     unsigned int numberOfUnscheduledModules() const { return number_of_unscheduled_modules_; }
0232 
0233     StreamContext const& context() const { return streamContext_; }
0234 
0235     struct AliasInfo {
0236       std::string friendlyClassName;
0237       std::string instanceLabel;
0238       std::string originalInstanceLabel;
0239       std::string originalModuleLabel;
0240     };
0241 
0242   private:
0243     /// returns the action table
0244     ExceptionToActionTable const& actionTable() const { return workerManagerLumisAndEvents_.actionTable(); }
0245 
0246     void resetAll();
0247 
0248     void finishedPaths(std::atomic<std::exception_ptr*>&, WaitingTaskHolder, EventTransitionInfo&);
0249 
0250     std::exception_ptr finishProcessOneEvent(std::exception_ptr);
0251 
0252     void reportSkipped(EventPrincipal const& ep) const;
0253 
0254     std::vector<Worker*> tryToPlaceConditionalModules(
0255         Worker*,
0256         std::unordered_set<std::string>& conditionalModules,
0257         std::unordered_multimap<std::string, edm::ProductDescription const*> const& conditionalModuleBranches,
0258         std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
0259         ParameterSet& proc_pset,
0260         SignallingProductRegistryFiller& preg,
0261         PreallocationConfiguration const* prealloc,
0262         std::shared_ptr<ProcessConfiguration const> processConfiguration);
0263     PathWorkers fillWorkers(ParameterSet& proc_pset,
0264                             SignallingProductRegistryFiller& preg,
0265                             PreallocationConfiguration const* prealloc,
0266                             std::shared_ptr<ProcessConfiguration const> processConfiguration,
0267                             std::string const& name,
0268                             bool ignoreFilters,
0269                             std::vector<std::string> const& endPathNames,
0270                             ConditionalTaskHelper const& conditionalTaskHelper,
0271                             std::unordered_set<std::string>& allConditionalModules);
0272     void fillTrigPath(ParameterSet& proc_pset,
0273                       SignallingProductRegistryFiller& preg,
0274                       PreallocationConfiguration const* prealloc,
0275                       std::shared_ptr<ProcessConfiguration const> processConfiguration,
0276                       int bitpos,
0277                       std::string const& name,
0278                       TrigResPtr,
0279                       std::vector<std::string> const& endPathNames,
0280                       ConditionalTaskHelper const& conditionalTaskHelper,
0281                       std::unordered_set<std::string>& allConditionalModules);
0282     void fillEndPath(ParameterSet& proc_pset,
0283                      SignallingProductRegistryFiller& preg,
0284                      PreallocationConfiguration const* prealloc,
0285                      std::shared_ptr<ProcessConfiguration const> processConfiguration,
0286                      int bitpos,
0287                      std::string const& name,
0288                      std::vector<std::string> const& endPathNames,
0289                      ConditionalTaskHelper const& conditionalTaskHelper,
0290                      std::unordered_set<std::string>& allConditionalModules);
0291 
0292     void addToAllWorkers(Worker* w);
0293 
0294     void resetEarlyDelete();
0295 
0296     TrigResConstPtr results() const { return get_underlying_safe(results_); }
0297     TrigResPtr& results() { return get_underlying_safe(results_); }
0298 
0299     void makePathStatusInserters(
0300         std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0301         std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0302         ExceptionToActionTable const& actions);
0303 
0304     template <typename T>
0305     void preScheduleSignal(StreamContext const*) const;
0306 
0307     template <typename T>
0308     void postScheduleSignal(StreamContext const*, std::exception_ptr&) const noexcept;
0309 
0310     void handleException(StreamContext const&, bool cleaningUpAfterException, std::exception_ptr&) const noexcept;
0311 
0312     std::vector<unsigned int> moduleBeginStreamFailed_;
0313     WorkerManager workerManagerRuns_;
0314     WorkerManager workerManagerLumisAndEvents_;
0315     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0316 
0317     edm::propagate_const<TrigResPtr> results_;
0318 
0319     edm::propagate_const<Worker*> results_inserter_;
0320     std::vector<edm::propagate_const<Worker*>> pathStatusInserterWorkers_;
0321     std::vector<edm::propagate_const<Worker*>> endPathStatusInserterWorkers_;
0322 
0323     TrigPaths trig_paths_;
0324     TrigPaths end_paths_;
0325     std::vector<int> empty_trig_paths_;
0326     std::vector<int> empty_end_paths_;
0327 
0328     //For each branch that has been marked for early deletion
0329     // keep track of how many modules are left that read this data but have
0330     // not yet been run in this event
0331     std::vector<BranchToCount> earlyDeleteBranchToCount_;
0332     //NOTE the following is effectively internal data for each EarlyDeleteHelper
0333     // but putting it into one vector makes for better allocation as well as
0334     // faster iteration when used to reset the earlyDeleteBranchToCount_
0335     // Each EarlyDeleteHelper hold a begin and end range into this vector. The values
0336     // of this vector correspond to indexes into earlyDeleteBranchToCount_ so
0337     // tell which EarlyDeleteHelper is associated with which BranchIDs.
0338     std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
0339     //There is one EarlyDeleteHelper per Module which are reading data that
0340     // has been marked for early deletion
0341     std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
0342 
0343     int total_events_;
0344     int total_passed_;
0345     unsigned int number_of_unscheduled_modules_;
0346 
0347     StreamID streamID_;
0348     StreamContext streamContext_;
0349   };
0350 
0351   void inline StreamSchedule::reportSkipped(EventPrincipal const& ep) const {
0352     Service<JobReport> reportSvc;
0353     reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
0354   }
0355 
  /// Asynchronously run one stream-level transition of type T on this stream.
  ///
  /// T supplies the transition traits used here (setStreamContext, the
  /// pre/post schedule signals and branchType_). Transitions whose
  /// T::branchType_ is InLumi run the workers of workerManagerLumisAndEvents_;
  /// all others run the workers of workerManagerRuns_.
  ///
  /// @param iHolder  notified through doneWaiting() after the post-schedule
  ///                 signal has been emitted; receives the (possibly null)
  ///                 exception of the transition.
  /// @param transitionInfo  transition data; its principal sets the stream
  ///                 context and a copy is captured by the work task.
  /// @param token    services to activate while the tasks run; the tasks only
  ///                 capture a weak token.
  /// @param cleaningUpAfterException  forwarded to handleException() when the
  ///                 transition reports a failure.
  template <typename T>
  void StreamSchedule::processOneStreamAsync(WaitingTaskHolder iHolder,
                                             typename T::TransitionInfoType& transitionInfo,
                                             ServiceToken const& token,
                                             bool cleaningUpAfterException) {
    // Grab the task group now: iHolder is moved into doneTask below.
    auto group = iHolder.group();
    auto const& principal = transitionInfo.principal();
    T::setStreamContext(streamContext_, principal);

    ServiceWeakToken weakToken = token;
    // Completion task: handle any failure, emit the post-schedule signal, and
    // only then notify the caller's holder.
    auto doneTask = make_waiting_task([this, iHolder = std::move(iHolder), cleaningUpAfterException, weakToken](
                                          std::exception_ptr const* iPtr) mutable {
      std::exception_ptr excpt;
      {
        ServiceRegistry::Operate op(weakToken.lock());

        if (iPtr) {
          excpt = *iPtr;
          handleException(streamContext_, cleaningUpAfterException, excpt);
        }
        postScheduleSignal<T>(&streamContext_, excpt);
      }  // release service token before calling doneWaiting
      iHolder.doneWaiting(excpt);
    });

    // Work task: emit the pre-schedule signal, reset the workers, then run them.
    // Its holder h targets doneTask, so doneTask runs once h (and any copies
    // handed on to the workers) has been released.
    auto task =
        make_functor_task([this, h = WaitingTaskHolder(*group, doneTask), info = transitionInfo, weakToken]() mutable {
          auto token = weakToken.lock();
          ServiceRegistry::Operate op(token);
          // Caught exception is propagated via WaitingTaskHolder
          WorkerManager* workerManager = &workerManagerRuns_;
          if (T::branchType_ == InLumi) {
            workerManager = &workerManagerLumisAndEvents_;
          }
          CMS_SA_ALLOW try {
            preScheduleSignal<T>(&streamContext_);
            workerManager->resetAll();
          } catch (...) {
            // Just remember the exception at this point,
            // let the destructor of h call doneWaiting() so the
            // ServiceRegistry::Operate object is destroyed first
            h.presetTaskAsFailed(std::current_exception());
            return;
          }

          workerManager->processOneOccurrenceAsync<T>(h, info, token, streamID_, &streamContext_, &streamContext_);
        });

    if (streamID_.value() == 0) {
      //Enqueueing will start another thread if there is only
      // one thread in the job. Having stream == 0 use spawn
      // avoids starting up another thread when there is only one stream.
      group->run([task]() {
        TaskSentry s{task};
        task->execute();
      });
    } else {
      // Other streams enqueue the task into the task arena the calling thread is attached to.
      oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
      arena.enqueue([task]() {
        TaskSentry s{task};
        task->execute();
      });
    }
  }
0420 
0421   template <typename T>
0422   void StreamSchedule::preScheduleSignal(StreamContext const* streamContext) const {
0423     try {
0424       convertException::wrap([this, streamContext]() { T::preScheduleSignal(actReg_.get(), streamContext); });
0425     } catch (cms::Exception& ex) {
0426       std::ostringstream ost;
0427       ex.addContext("Handling pre signal, likely in a service function");
0428       exceptionContext(ost, *streamContext);
0429       ex.addContext(ost.str());
0430       throw;
0431     }
0432   }
0433 
0434   template <typename T>
0435   void StreamSchedule::postScheduleSignal(StreamContext const* streamContext,
0436                                           std::exception_ptr& excpt) const noexcept {
0437     try {
0438       convertException::wrap([this, streamContext]() { T::postScheduleSignal(actReg_.get(), streamContext); });
0439     } catch (cms::Exception& ex) {
0440       if (not excpt) {
0441         std::ostringstream ost;
0442         ex.addContext("Handling post signal, likely in a service function");
0443         exceptionContext(ost, *streamContext);
0444         ex.addContext(ost.str());
0445         excpt = std::current_exception();
0446       }
0447     }
0448   }
0449 }  // namespace edm
0450 
0451 #endif