Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-06-13 03:23:42

0001 #ifndef FWCore_Framework_StreamSchedule_h
0002 #define FWCore_Framework_StreamSchedule_h
0003 
0004 /*
0005   Author: Jim Kowalkowski  28-01-06
0006 
0007   A class for creating a schedule based on paths in the configuration file.
0008   The schedule is maintained as a sequence of paths.
0009   After construction, events can be fed to the object and passed through
0010   all the modules in the schedule.  All accounting about the processing
0011   of events by modules and paths is contained here or in objects held
0012   by containment.
0013 
0014   The trigger results producer and product are generated and managed here.
0015   This class also manages endpaths and calls to endjob and beginjob.
0016   Endpaths are just treated as a simple list of modules that need to
0017   do processing of the event and do not participate in trigger path
0018   activities.
0019 
0020   This class requires the high-level process pset.  It uses @process_name.
0021   If the high-level pset contains an "options" pset, then the
0022   following optional parameter can be present:
0023   bool wantSummary = true/false   # default false
0024 
0025   wantSummary indicates whether or not the pass/fail/error stats
0026   for modules and paths should be printed at the end-of-job.
0027 
0028   A TriggerResults object will always be inserted into the event
0029   for any schedule.  The producer of the TriggerResults EDProduct
0030   is always the first module in the endpath.  The TriggerResultInserter
0031   is given a fixed label of "TriggerResults".
0032 
0033   Processing of an event happens by pushing the event through the Paths.
0034   The scheduler performs the reset() on each of the workers independent
0035   of the Path objects.
0036 
0037   ------------------------
0038 
0039   About Paths:
0040   Paths fit into two categories:
0041   1) trigger paths that contribute directly to saved trigger bits
0042   2) end paths
0043   The StreamSchedule holds these paths in two data structures:
0044   1) main path list
0045   2) end path list
0046 
0047   Trigger path processing always precedes endpath processing.
0048   The order of the paths from the input configuration is
0049   preserved in the main paths list.
0050 
0051   ------------------------
0052 
0053   The StreamSchedule uses the TriggerNamesService to get the names of the
0054   trigger paths and end paths. When a TriggerResults object is created
0055   the results are stored in the same order as the trigger names from
0056   TriggerNamesService.
0057 
0058 */
0059 
0060 #include "DataFormats/Common/interface/HLTGlobalStatus.h"
0061 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0062 #include "FWCore/Framework/interface/ExceptionActions.h"
0063 #include "FWCore/Framework/interface/EventPrincipal.h"
0064 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0065 #include "FWCore/Framework/interface/Frameworkfwd.h"
0066 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0067 #include "FWCore/Framework/interface/UnscheduledCallProducer.h"
0068 #include "FWCore/Framework/interface/WorkerManager.h"
0069 #include "FWCore/Framework/interface/Path.h"
0070 #include "FWCore/Framework/interface/TransitionInfoTypes.h"
0071 #include "FWCore/Framework/interface/maker/Worker.h"
0072 #include "FWCore/Framework/interface/WorkerRegistry.h"
0073 #include "FWCore/Framework/interface/EarlyDeleteHelper.h"
0074 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0075 #include "FWCore/MessageLogger/interface/JobReport.h"
0076 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0077 #include "FWCore/ServiceRegistry/interface/Service.h"
0078 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0079 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0080 #include "FWCore/Concurrency/interface/FunctorTask.h"
0081 #include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
0082 #include "FWCore/Utilities/interface/Algorithms.h"
0083 #include "FWCore/Utilities/interface/BranchType.h"
0084 #include "FWCore/Utilities/interface/ConvertException.h"
0085 #include "FWCore/Utilities/interface/Exception.h"
0086 #include "FWCore/Utilities/interface/StreamID.h"
0087 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0088 #include "FWCore/Utilities/interface/propagate_const.h"
0089 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0090 
0091 #include <map>
0092 #include <memory>
0093 #include <set>
0094 #include <string>
0095 #include <vector>
0096 #include <sstream>
0097 #include <atomic>
0098 #include <unordered_set>
0099 #include <utility>
0100 
0101 namespace edm {
0102 
0103   class BranchIDListHelper;
0104   class ExceptionCollector;
0105   class ExceptionToActionTable;
0106   class OutputModuleCommunicator;
0107   class UnscheduledCallProducer;
0108   class WorkerInPath;
0109   class ModuleRegistry;
0110   class TriggerResultInserter;
0111   class PathStatusInserter;
0112   class EndPathStatusInserter;
0113   class PreallocationConfiguration;
0114   class WaitingTaskHolder;
0115 
0116   class ConditionalTaskHelper;
0117 
0118   namespace service {
0119     class TriggerNamesService;
0120   }
0121 
0122   class StreamSchedule {
0123   public:
0124     typedef std::vector<std::string> vstring;
0125     typedef std::vector<Path> TrigPaths;
0126     typedef std::vector<Path> NonTrigPaths;
0127     typedef std::shared_ptr<HLTGlobalStatus> TrigResPtr;
0128     typedef std::shared_ptr<HLTGlobalStatus const> TrigResConstPtr;
0129     typedef std::shared_ptr<Worker> WorkerPtr;
0130     typedef std::vector<Worker*> AllWorkers;
0131 
0132     typedef std::vector<Worker*> Workers;
0133 
0134     typedef std::vector<WorkerInPath> PathWorkers;
0135 
0136     StreamSchedule(std::shared_ptr<TriggerResultInserter> inserter,
0137                    std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0138                    std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0139                    std::shared_ptr<ModuleRegistry>,
0140                    ParameterSet& proc_pset,
0141                    service::TriggerNamesService const& tns,
0142                    PreallocationConfiguration const& prealloc,
0143                    ProductRegistry& pregistry,
0144                    ExceptionToActionTable const& actions,
0145                    std::shared_ptr<ActivityRegistry> areg,
0146                    std::shared_ptr<ProcessConfiguration const> processConfiguration,
0147                    StreamID streamID,
0148                    ProcessContext const* processContext);
0149 
0150     StreamSchedule(StreamSchedule const&) = delete;
0151 
0152     void processOneEventAsync(
0153         WaitingTaskHolder iTask,
0154         EventTransitionInfo&,
0155         ServiceToken const& token,
0156         std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters);
0157 
0158     template <typename T>
0159     void processOneStreamAsync(WaitingTaskHolder iTask,
0160                                typename T::TransitionInfoType& transitionInfo,
0161                                ServiceToken const& token,
0162                                bool cleaningUpAfterException = false);
0163 
0164     void beginStream();
0165     void endStream();
0166 
0167     StreamID streamID() const { return streamID_; }
0168 
0169     /// Return a vector allowing const access to all the
0170     /// ModuleDescriptions for this StreamSchedule.
0171 
0172     /// *** N.B. *** Ownership of the ModuleDescriptions is *not*
0173     /// *** passed to the caller. Do not call delete on these
0174     /// *** pointers!
0175     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0176 
0177     ///adds to oLabelsToFill the labels for all paths in the process
0178     void availablePaths(std::vector<std::string>& oLabelsToFill) const;
0179 
0180     ///adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel
0181     void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;
0182 
0183     void moduleDescriptionsInPath(std::string const& iPathLabel,
0184                                   std::vector<ModuleDescription const*>& descriptions,
0185                                   unsigned int hint) const;
0186 
0187     void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
0188                                      std::vector<ModuleDescription const*>& descriptions,
0189                                      unsigned int hint) const;
0190 
0191     /// Return the number of events this StreamSchedule has tried to process
0192     /// (inclues both successes and failures, including failures due
0193     /// to exceptions during processing).
0194     int totalEvents() const { return total_events_; }
0195 
0196     /// Return the number of events which have been passed by one or
0197     /// more trigger paths.
0198     int totalEventsPassed() const { return total_passed_; }
0199 
0200     /// Return the number of events that have not passed any trigger.
0201     /// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()
0202     int totalEventsFailed() const { return totalEvents() - totalEventsPassed(); }
0203 
0204     /// Return the trigger report information on paths,
0205     /// modules-in-path, modules-in-endpath, and modules.
0206     void getTriggerReport(TriggerReport& rep) const;
0207 
0208     ///  Clear all the counters in the trigger report.
0209     void clearCounters();
0210 
0211     /// clone the type of module with label iLabel but configure with iPSet.
0212     void replaceModule(maker::ModuleHolder* iMod, std::string const& iLabel);
0213 
0214     /// Delete the module with label iLabel
0215     void deleteModule(std::string const& iLabel);
0216 
0217     void initializeEarlyDelete(ModuleRegistry& modReg,
0218                                std::vector<std::string> const& branchesToDeleteEarly,
0219                                std::multimap<std::string, std::string> const& referencesToBranches,
0220                                std::vector<std::string> const& modulesToSkip,
0221                                edm::ProductRegistry const& preg);
0222 
0223     /// returns the collection of pointers to workers
0224     AllWorkers const& allWorkersBeginEnd() const { return workerManagerBeginEnd_.allWorkers(); }
0225     AllWorkers const& allWorkersRuns() const { return workerManagerRuns_.allWorkers(); }
0226     AllWorkers const& allWorkersLumisAndEvents() const { return workerManagerLumisAndEvents_.allWorkers(); }
0227 
0228     AllWorkers const& unscheduledWorkersLumisAndEvents() const {
0229       return workerManagerLumisAndEvents_.unscheduledWorkers();
0230     }
0231     unsigned int numberOfUnscheduledModules() const { return number_of_unscheduled_modules_; }
0232 
0233     StreamContext const& context() const { return streamContext_; }
0234 
0235     struct AliasInfo {
0236       std::string friendlyClassName;
0237       std::string instanceLabel;
0238       std::string originalInstanceLabel;
0239       std::string originalModuleLabel;
0240     };
0241 
0242   private:
0243     /// returns the action table
0244     ExceptionToActionTable const& actionTable() const { return workerManagerLumisAndEvents_.actionTable(); }
0245 
0246     void resetAll();
0247 
0248     void finishedPaths(std::atomic<std::exception_ptr*>&, WaitingTaskHolder, EventTransitionInfo&);
0249 
0250     std::exception_ptr finishProcessOneEvent(std::exception_ptr);
0251 
0252     void reportSkipped(EventPrincipal const& ep) const;
0253 
0254     std::vector<Worker*> tryToPlaceConditionalModules(
0255         Worker*,
0256         std::unordered_set<std::string>& conditionalModules,
0257         std::unordered_multimap<std::string, edm::BranchDescription const*> const& conditionalModuleBranches,
0258         std::unordered_multimap<std::string, AliasInfo> const& aliasMap,
0259         ParameterSet& proc_pset,
0260         ProductRegistry& preg,
0261         PreallocationConfiguration const* prealloc,
0262         std::shared_ptr<ProcessConfiguration const> processConfiguration);
0263     void fillWorkers(ParameterSet& proc_pset,
0264                      ProductRegistry& preg,
0265                      PreallocationConfiguration const* prealloc,
0266                      std::shared_ptr<ProcessConfiguration const> processConfiguration,
0267                      std::string const& name,
0268                      bool ignoreFilters,
0269                      PathWorkers& out,
0270                      std::vector<std::string> const& endPathNames,
0271                      ConditionalTaskHelper const& conditionalTaskHelper,
0272                      std::unordered_set<std::string>& allConditionalModules);
0273     void fillTrigPath(ParameterSet& proc_pset,
0274                       ProductRegistry& preg,
0275                       PreallocationConfiguration const* prealloc,
0276                       std::shared_ptr<ProcessConfiguration const> processConfiguration,
0277                       int bitpos,
0278                       std::string const& name,
0279                       TrigResPtr,
0280                       std::vector<std::string> const& endPathNames,
0281                       ConditionalTaskHelper const& conditionalTaskHelper,
0282                       std::unordered_set<std::string>& allConditionalModules);
0283     void fillEndPath(ParameterSet& proc_pset,
0284                      ProductRegistry& preg,
0285                      PreallocationConfiguration const* prealloc,
0286                      std::shared_ptr<ProcessConfiguration const> processConfiguration,
0287                      int bitpos,
0288                      std::string const& name,
0289                      std::vector<std::string> const& endPathNames,
0290                      ConditionalTaskHelper const& conditionalTaskHelper,
0291                      std::unordered_set<std::string>& allConditionalModules);
0292 
0293     void addToAllWorkers(Worker* w);
0294 
0295     void resetEarlyDelete();
0296 
0297     TrigResConstPtr results() const { return get_underlying_safe(results_); }
0298     TrigResPtr& results() { return get_underlying_safe(results_); }
0299 
0300     void makePathStatusInserters(
0301         std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters,
0302         std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>>& endPathStatusInserters,
0303         ExceptionToActionTable const& actions);
0304 
0305     template <typename T>
0306     void preScheduleSignal(StreamContext const*) const;
0307 
0308     template <typename T>
0309     void postScheduleSignal(StreamContext const*, ServiceWeakToken const&, std::exception_ptr&) const noexcept;
0310 
0311     void handleException(StreamContext const&,
0312                          ServiceWeakToken const&,
0313                          bool cleaningUpAfterException,
0314                          std::exception_ptr&) const noexcept;
0315 
0316     WorkerManager workerManagerBeginEnd_;
0317     WorkerManager workerManagerRuns_;
0318     WorkerManager workerManagerLumisAndEvents_;
0319     std::shared_ptr<ActivityRegistry> actReg_;  // We do not use propagate_const because the registry itself is mutable.
0320 
0321     edm::propagate_const<TrigResPtr> results_;
0322 
0323     edm::propagate_const<WorkerPtr> results_inserter_;
0324     std::vector<edm::propagate_const<WorkerPtr>> pathStatusInserterWorkers_;
0325     std::vector<edm::propagate_const<WorkerPtr>> endPathStatusInserterWorkers_;
0326 
0327     TrigPaths trig_paths_;
0328     TrigPaths end_paths_;
0329     std::vector<int> empty_trig_paths_;
0330     std::vector<int> empty_end_paths_;
0331 
0332     //For each branch that has been marked for early deletion
0333     // keep track of how many modules are left that read this data but have
0334     // not yet been run in this event
0335     std::vector<BranchToCount> earlyDeleteBranchToCount_;
0336     //NOTE the following is effectively internal data for each EarlyDeleteHelper
0337     // but putting it into one vector makes for better allocation as well as
0338     // faster iteration when used to reset the earlyDeleteBranchToCount_
0339     // Each EarlyDeleteHelper hold a begin and end range into this vector. The values
0340     // of this vector correspond to indexes into earlyDeleteBranchToCount_ so
0341     // tell which EarlyDeleteHelper is associated with which BranchIDs.
0342     std::vector<unsigned int> earlyDeleteHelperToBranchIndicies_;
0343     //There is one EarlyDeleteHelper per Module which are reading data that
0344     // has been marked for early deletion
0345     std::vector<EarlyDeleteHelper> earlyDeleteHelpers_;
0346 
0347     int total_events_;
0348     int total_passed_;
0349     unsigned int number_of_unscheduled_modules_;
0350 
0351     StreamID streamID_;
0352     StreamContext streamContext_;
0353   };
0354 
0355   void inline StreamSchedule::reportSkipped(EventPrincipal const& ep) const {
0356     Service<JobReport> reportSvc;
0357     reportSvc->reportSkippedEvent(ep.id().run(), ep.id().event());
0358   }
0359 
0360   template <typename T>
0361   void StreamSchedule::processOneStreamAsync(WaitingTaskHolder iHolder,
0362                                              typename T::TransitionInfoType& transitionInfo,
0363                                              ServiceToken const& token,
0364                                              bool cleaningUpAfterException) {
0365     auto group = iHolder.group();
0366     auto const& principal = transitionInfo.principal();
0367     T::setStreamContext(streamContext_, principal);
0368 
0369     ServiceWeakToken weakToken = token;
0370     auto doneTask = make_waiting_task([this, iHolder = std::move(iHolder), cleaningUpAfterException, weakToken](
0371                                           std::exception_ptr const* iPtr) mutable {
0372       std::exception_ptr excpt;
0373       if (iPtr) {
0374         excpt = *iPtr;
0375         handleException(streamContext_, weakToken, cleaningUpAfterException, excpt);
0376       }
0377       postScheduleSignal<T>(&streamContext_, weakToken, excpt);
0378       iHolder.doneWaiting(excpt);
0379     });
0380 
0381     auto task =
0382         make_functor_task([this, h = WaitingTaskHolder(*group, doneTask), info = transitionInfo, weakToken]() mutable {
0383           auto token = weakToken.lock();
0384           ServiceRegistry::Operate op(token);
0385           // Caught exception is propagated via WaitingTaskHolder
0386           WorkerManager* workerManager = &workerManagerRuns_;
0387           if (T::branchType_ == InLumi) {
0388             workerManager = &workerManagerLumisAndEvents_;
0389           }
0390           CMS_SA_ALLOW try {
0391             preScheduleSignal<T>(&streamContext_);
0392             workerManager->resetAll();
0393           } catch (...) {
0394             h.doneWaiting(std::current_exception());
0395             return;
0396           }
0397 
0398           workerManager->processOneOccurrenceAsync<T>(h, info, token, streamID_, &streamContext_, &streamContext_);
0399         });
0400 
0401     if (streamID_.value() == 0) {
0402       //Enqueueing will start another thread if there is only
0403       // one thread in the job. Having stream == 0 use spawn
0404       // avoids starting up another thread when there is only one stream.
0405       group->run([task]() {
0406         TaskSentry s{task};
0407         task->execute();
0408       });
0409     } else {
0410       oneapi::tbb::task_arena arena{oneapi::tbb::task_arena::attach()};
0411       arena.enqueue([task]() {
0412         TaskSentry s{task};
0413         task->execute();
0414       });
0415     }
0416   }
0417 
0418   template <typename T>
0419   void StreamSchedule::preScheduleSignal(StreamContext const* streamContext) const {
0420     try {
0421       convertException::wrap([this, streamContext]() { T::preScheduleSignal(actReg_.get(), streamContext); });
0422     } catch (cms::Exception& ex) {
0423       std::ostringstream ost;
0424       ex.addContext("Handling pre signal, likely in a service function");
0425       exceptionContext(ost, *streamContext);
0426       ex.addContext(ost.str());
0427       throw;
0428     }
0429   }
0430 
0431   template <typename T>
0432   void StreamSchedule::postScheduleSignal(StreamContext const* streamContext,
0433                                           ServiceWeakToken const& weakToken,
0434                                           std::exception_ptr& excpt) const noexcept {
0435     try {
0436       convertException::wrap([this, &weakToken, streamContext]() {
0437         ServiceRegistry::Operate op(weakToken.lock());
0438         T::postScheduleSignal(actReg_.get(), streamContext);
0439       });
0440     } catch (cms::Exception& ex) {
0441       if (not excpt) {
0442         std::ostringstream ost;
0443         ex.addContext("Handling post signal, likely in a service function");
0444         exceptionContext(ost, *streamContext);
0445         ex.addContext(ost.str());
0446         excpt = std::current_exception();
0447       }
0448     }
0449   }
0450 }  // namespace edm
0451 
0452 #endif