Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-06-29 22:58:02

0001 #ifndef FWCore_Framework_Schedule_h
0002 #define FWCore_Framework_Schedule_h
0003 
0004 /*
0005   Author: Jim Kowalkowski  28-01-06
0006 
0007   A class for creating a schedule based on paths in the configuration file.
0008   The schedule is maintained as a sequence of paths.
0009   After construction, events can be fed to the object and passed through
0010   all the modules in the schedule.  All accounting about processing
0011   of events by modules and paths is contained here or in objects held
0012   by containment.
0013 
0014   The trigger results producer and product are generated and managed here.
0015   This class also manages endpaths and calls to endjob and beginjob.
0016   Endpaths are just treated as a simple list of modules that need to
0017   do processing of the event and do not participate in trigger path
0018   activities.
0019 
0020   This class requires the high-level process pset.  It uses @process_name.
0021   If the high-level pset contains an "options" pset, then the
0022   following optional parameter can be present:
0023   bool wantSummary = true/false   # default false
0024 
0025   wantSummary indicates whether or not the pass/fail/error stats
0026   for modules and paths should be printed at the end-of-job.
0027 
0028   A TriggerResults object will always be inserted into the event
0029   for any schedule.  The producer of the TriggerResults EDProduct
0030   is always the first module in the endpath.  The TriggerResultInserter
0031   is given a fixed label of "TriggerResults".
0032 
0033   Processing of an event happens by pushing the event through the Paths.
0034   The scheduler performs the reset() on each of the workers independent
0035   of the Path objects.
0036 
0037   ------------------------
0038 
0039   About Paths:
0040   Paths fit into two categories:
0041   1) trigger paths that contribute directly to saved trigger bits
0042   2) end paths
0043   The Schedule holds these paths in two data structures:
0044   1) main path list
0045   2) end path list
0046 
0047   Trigger path processing always precedes endpath processing.
0048   The order of the paths from the input configuration is
0049   preserved in the main paths list.
0050 
0051   ------------------------
0052 
0053   The Schedule uses the TriggerNamesService to get the names of the
0054   trigger paths and end paths. When a TriggerResults object is created
0055   the results are stored in the same order as the trigger names from
0056   TriggerNamesService.
0057 
0058 */
0059 
0060 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0061 #include "DataFormats/Provenance/interface/BranchIDList.h"
0062 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0063 #include "FWCore/Framework/interface/ExceptionActions.h"
0064 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0065 #include "FWCore/Framework/interface/Frameworkfwd.h"
0066 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0067 #include "FWCore/Framework/interface/WorkerManager.h"
0068 #include "FWCore/Framework/interface/maker/Worker.h"
0069 #include "FWCore/Framework/interface/GlobalSchedule.h"
0070 #include "FWCore/Framework/interface/StreamSchedule.h"
0071 #include "FWCore/Framework/interface/SystemTimeKeeper.h"
0072 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0073 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0074 #include "FWCore/MessageLogger/interface/JobReport.h"
0075 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0076 #include "FWCore/ServiceRegistry/interface/Service.h"
0077 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0078 #include "FWCore/Utilities/interface/Algorithms.h"
0079 #include "FWCore/Utilities/interface/BranchType.h"
0080 #include "FWCore/Utilities/interface/Exception.h"
0081 #include "FWCore/Utilities/interface/StreamID.h"
0082 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0083 #include "FWCore/Utilities/interface/propagate_const.h"
0084 #include "FWCore/Utilities/interface/Transition.h"
0085 
0086 #include <array>
0087 #include <map>
0088 #include <memory>
0089 #include <mutex>
0090 #include <set>
0091 #include <string>
0092 #include <vector>
0093 #include <sstream>
0094 #include <utility>
0095 
0096 namespace edm {
0097 
0098   namespace service {
0099     class TriggerNamesService;
0100   }
0101   namespace eventsetup {
0102     struct ComponentDescription;
0103     class ESRecordsToProductResolverIndices;
0104   }  // namespace eventsetup
0105 
0106   class BranchIDListHelper;
0107   class EventTransitionInfo;
0108   class ExceptionCollector;
0109   class MergeableRunProductMetadata;
0110   class OutputModuleCommunicator;
0111   class SignallingProductRegistryFiller;
0112   class PreallocationConfiguration;
0113   class StreamSchedule;
0114   class GlobalSchedule;
0115   struct TriggerTimingReport;
0116   class ModuleRegistry;
0117   class ModuleTypeResolverMaker;
0118   class ThinnedAssociationsHelper;
0119   class TriggerResultInserter;
0120   class PathStatusInserter;
0121   class EndPathStatusInserter;
0122   class WaitingTaskHolder;
0123 
0124   class Schedule {
       /// Scheduler front end for a process: holds a vector of StreamSchedules
       /// (streamSchedules_) and a single GlobalSchedule (globalSchedule_), and
       /// declares the job-, stream-, run-/lumi-/process-block- and event-level
       /// entry points below.  See the comment at the top of this file for the
       /// description of paths, end paths and TriggerResults handling.
0125   public:
0126     typedef std::vector<std::string> vstring;
0127     typedef std::vector<Worker*> AllWorkers;
0128     typedef std::vector<edm::propagate_const<std::shared_ptr<OutputModuleCommunicator>>> AllOutputModuleCommunicators;
0129 
         /// Same underlying type as AllWorkers.
0130     typedef std::vector<Worker*> Workers;
0131 
         /// Construct from the high-level process ParameterSet.  Per the file
         /// header, "@process_name" and the optional "options" pset are taken
         /// from proc_pset.
         /// NOTE(review): the split of work between this constructor and
         /// finishSetup() is defined in the implementation file -- confirm there.
0132     Schedule(ParameterSet& proc_pset,
0133              service::TriggerNamesService const& tns,
0134              SignallingProductRegistryFiller& pregistry,
0135              ExceptionToActionTable const& actions,
0136              std::shared_ptr<ActivityRegistry> areg,
0137              std::shared_ptr<ProcessConfiguration const> processConfiguration,
0138              PreallocationConfiguration const& config,
0139              ProcessContext const* processContext,
0140              ModuleTypeResolverMaker const* resolverMaker);
         /// Second setup step, taking the same proc_pset/tns plus the helper
         /// objects listed in the signature.
         /// NOTE(review): presumably must be called once after construction and
         /// before beginJob() -- confirm against the caller.
0141     void finishSetup(ParameterSet& proc_pset,
0142                      service::TriggerNamesService const& tns,
0143                      SignallingProductRegistryFiller& preg,
0144                      BranchIDListHelper& branchIDListHelper,
0145                      ProcessBlockHelperBase& processBlockHelper,
0146                      ThinnedAssociationsHelper& thinnedAssociationsHelper,
0147                      std::shared_ptr<ActivityRegistry> areg,
0148                      std::shared_ptr<ProcessConfiguration> processConfiguration,
0149                      PreallocationConfiguration const& prealloc,
0150                      ProcessContext const* processContext);
0151 
         /// NOTE(review): presumably processes one event on stream iStreamID and
         /// notifies iTask when done -- defined in the implementation file, confirm.
0152     void processOneEventAsync(WaitingTaskHolder iTask,
0153                               unsigned int iStreamID,
0154                               EventTransitionInfo&,
0155                               ServiceToken const& token);
0156 
         /// Defined at the end of this header: forwards the call to
         /// globalSchedule_.  T must provide a nested TransitionInfoType.
0157     template <typename T>
0158     void processOneGlobalAsync(WaitingTaskHolder iTask,
0159                                typename T::TransitionInfoType& transitionInfo,
0160                                ServiceToken const& token,
0161                                bool cleaningUpAfterException = false);
0162 
         /// Defined at the end of this header: asserts that iStreamID is a valid
         /// index into streamSchedules_ and forwards the call to that entry.
         /// T must provide a nested TransitionInfoType.
0163     template <typename T>
0164     void processOneStreamAsync(WaitingTaskHolder iTask,
0165                                unsigned int iStreamID,
0166                                typename T::TransitionInfoType& transitionInfo,
0167                                ServiceToken const& token,
0168                                bool cleaningUpAfterException = false);
0169 
0170     void beginJob(ProductRegistry const&,
0171                   eventsetup::ESRecordsToProductResolverIndices const&,
0172                   ProcessBlockHelperBase const&,
0173                   std::string const& processName);
0174     void endJob(ExceptionCollector& collector);
0175     void sendFwkSummaryToMessageLogger() const;
0176 
0177     void beginStream(unsigned int streamID);
         /// Declared noexcept.
         /// NOTE(review): presumably failures are recorded in collector while
         /// holding collectorMutex rather than thrown -- confirm in the
         /// implementation file.
0178     void endStream(unsigned int streamID, ExceptionCollector& collector, std::mutex& collectorMutex) noexcept;
0179 
0180     // Write the luminosity block
0181     void writeLumiAsync(WaitingTaskHolder iTask,
0182                         LuminosityBlockPrincipal const& lbp,
0183                         ProcessContext const*,
0184                         ActivityRegistry*);
0185 
0186     // Write the run
0187     void writeRunAsync(WaitingTaskHolder iTask,
0188                        RunPrincipal const& rp,
0189                        ProcessContext const*,
0190                        ActivityRegistry*,
0191                        MergeableRunProductMetadata const*);
0192 
         // Write the process block
0193     void writeProcessBlockAsync(WaitingTaskHolder iTask,
0194                                 ProcessBlockPrincipal const&,
0195                                 ProcessContext const*,
0196                                 ActivityRegistry*);
0197 
0198     // Call closeFile() on all OutputModules.
0199     void closeOutputFiles();
0200 
0201     // Call openFiles() on all OutputModules
0202     void openOutputFiles(FileBlock& fb);
0203 
0204     // Call respondToOpenInputFile() on all Modules
0205     void respondToOpenInputFile(FileBlock const& fb);
0206 
0207     // Call respondToCloseInputFile() on all Modules
0208     void respondToCloseInputFile(FileBlock const& fb);
0209 
0210     // Call shouldWeCloseFile() on all OutputModules.
0211     bool shouldWeCloseOutput() const;
0212 
0213     /// Return a vector allowing const access to all the
0214     /// ModuleDescriptions for this Schedule.
0215 
0216     /// *** N.B. *** Ownership of the ModuleDescriptions is *not*
0217     /// *** passed to the caller. Do not call delete on these
0218     /// *** pointers!
0219     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0220 
0221     ///adds to oLabelsToFill the labels for all paths in the process
0222     void availablePaths(std::vector<std::string>& oLabelsToFill) const;
0223 
0224     ///Adds to oLabelsToFill the labels for all trigger paths in the process.
0225     ///This is different from availablePaths because it includes the
0226     ///empty paths to match the entries in TriggerResults exactly.
0227     void triggerPaths(std::vector<std::string>& oLabelsToFill) const;
0228 
0229     ///adds to oLabelsToFill the labels for all end paths in the process
0230     void endPaths(std::vector<std::string>& oLabelsToFill) const;
0231 
0232     ///adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel
0233     void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;
0234 
0235     ///adds the ModuleDescriptions into the vector for the modules scheduled in path iPathLabel
0236     ///hint is a performance optimization if you might know the position of the module in the path
0237     void moduleDescriptionsInPath(std::string const& iPathLabel,
0238                                   std::vector<ModuleDescription const*>& descriptions,
0239                                   unsigned int hint) const;
0240 
0241     ///adds the ModuleDescriptions into the vector for the modules scheduled in path iEndPathLabel
0242     ///hint is a performance optimization if you might know the position of the module in the path
0243     void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
0244                                      std::vector<ModuleDescription const*>& descriptions,
0245                                      unsigned int hint) const;
0246 
0247     /// Return the number of events this Schedule has tried to process
0248     /// (includes both successes and failures, including failures due
0249     /// to exceptions during processing).
0250     int totalEvents() const;
0251 
0252     /// Return the number of events which have been passed by one or
0253     /// more trigger paths.
0254     int totalEventsPassed() const;
0255 
0256     /// Return the number of events that have not passed any trigger.
0257     /// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())
0258     int totalEventsFailed() const;
0259 
0260     /// Return the trigger report information on paths,
0261     /// modules-in-path, modules-in-endpath, and modules.
0262     void getTriggerReport(TriggerReport& rep) const;
0263 
0264     /// Return the trigger timing report information on paths,
0265     /// modules-in-path, modules-in-endpath, and modules.
0266     void getTriggerTimingReport(TriggerTimingReport& rep) const;
0267 
0268     /// Return whether each output module has reached its maximum count.
0269     bool terminate() const;
0270 
0271     /// Clear all the counters in the trigger report.
0272     void clearCounters();
0273 
0274     /// clone the type of module with label iLabel but configure with iPSet.
0275     /// Returns true if successful.
0276     bool changeModule(std::string const& iLabel,
0277                       ParameterSet const& iPSet,
0278                       const SignallingProductRegistryFiller& iRegistry,
0279                       eventsetup::ESRecordsToProductResolverIndices const&);
0280 
0281     /// Deletes module with label iLabel
0282     void deleteModule(std::string const& iLabel, ActivityRegistry* areg);
0283 
         /// NOTE(review): presumably configures early deletion of the listed
         /// branches, skipping the modules in modulesToSkip -- confirm in the
         /// implementation file.
0284     void initializeEarlyDelete(std::vector<std::string> const& branchesToDeleteEarly,
0285                                std::multimap<std::string, std::string> const& referencesToBranches,
0286                                std::vector<std::string> const& modulesToSkip,
0287                                edm::ProductRegistry const& preg);
0288 
0289     /// returns the collection of pointers to workers
0290     AllWorkers const& allWorkers() const;
0291 
0292     /// Convert "@currentProcess" in InputTag process names to the actual current process name.
0293     void convertCurrentProcessAlias(std::string const& processName);
0294 
0295     void releaseMemoryPostLookupSignal();
0296 
0297   private:
0298     void limitOutput(ParameterSet const& proc_pset, BranchIDLists const& branchIDLists);
0299 
         // Accessor pairs for the propagate_const-wrapped members: the const
         // overload returns a shared_ptr to a const object, the non-const
         // overload returns a reference to the underlying shared_ptr.
0300     std::shared_ptr<TriggerResultInserter const> resultsInserter() const {
0301       return get_underlying_safe(resultsInserter_);
0302     }
0303     std::shared_ptr<TriggerResultInserter>& resultsInserter() { return get_underlying_safe(resultsInserter_); }
0304     std::shared_ptr<ModuleRegistry const> moduleRegistry() const { return get_underlying_safe(moduleRegistry_); }
0305     std::shared_ptr<ModuleRegistry>& moduleRegistry() { return get_underlying_safe(moduleRegistry_); }
0306 
0307     edm::propagate_const<std::shared_ptr<ModuleRegistry>> moduleRegistry_;
0308     edm::propagate_const<std::shared_ptr<TriggerResultInserter>> resultsInserter_;
0309     std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>> pathStatusInserters_;
0310     std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>> endPathStatusInserters_;
         // Indexed by stream ID in processOneStreamAsync().
0311     std::vector<edm::propagate_const<std::shared_ptr<StreamSchedule>>> streamSchedules_;
0312     //In the future, we will have one GlobalSchedule per simultaneous transition
0313     edm::propagate_const<std::unique_ptr<GlobalSchedule>> globalSchedule_;
0314 
0315     AllOutputModuleCommunicators all_output_communicators_;
0316     PreallocationConfiguration preallocConfig_;
0317 
0318     edm::propagate_const<std::unique_ptr<SystemTimeKeeper>> summaryTimeKeeper_;
0319 
         // NOTE(review): raw pointers to name lists; presumably non-owning views
         // of vectors owned elsewhere (e.g. the TriggerNamesService) -- confirm
         // where they are assigned.
0320     std::vector<std::string> const* pathNames_;
0321     std::vector<std::string> const* endPathNames_;
         // NOTE(review): presumably caches the "wantSummary" option described in
         // the file header -- confirm in the constructor.
0322     bool wantSummary_;
0323   };
0324 
0325   template <typename T>
0326   void Schedule::processOneStreamAsync(WaitingTaskHolder iTaskHolder,
0327                                        unsigned int iStreamID,
0328                                        typename T::TransitionInfoType& transitionInfo,
0329                                        ServiceToken const& token,
0330                                        bool cleaningUpAfterException) {
         // Dispatch a stream-level transition of type T to the StreamSchedule
         // stored at index iStreamID.  The index is only checked by assert.
0331     assert(streamSchedules_.size() > iStreamID);
0332     auto& targetStream = streamSchedules_[iStreamID];
         // The by-value task holder is handed over to the callee.
0333     targetStream->processOneStreamAsync<T>(std::move(iTaskHolder), transitionInfo, token, cleaningUpAfterException);
0334   }
0335 
0336   template <typename T>
0337   void Schedule::processOneGlobalAsync(WaitingTaskHolder iTaskHolder,
0338                                        typename T::TransitionInfoType& transitionInfo,
0339                                        ServiceToken const& token,
0340                                        bool cleaningUpAfterException) {
         // Dispatch a global (non-stream) transition of type T to the single
         // GlobalSchedule held in globalSchedule_.
         //
         // iTaskHolder is a by-value parameter that is not used again in this
         // function, so it is moved into the callee instead of being copied a
         // second time; this matches what processOneStreamAsync() does.
         // NOTE(review): assumes GlobalSchedule::processOneGlobalAsync takes the
         // holder by value (or by rvalue/const reference) -- confirm in
         // GlobalSchedule.h.
0341     globalSchedule_->processOneGlobalAsync<T>(std::move(iTaskHolder), transitionInfo, token, cleaningUpAfterException);
0342   }
0343 
0344 }  // namespace edm
0345 #endif