Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-04-30 22:24:06

0001 #ifndef FWCore_Framework_Schedule_h
0002 #define FWCore_Framework_Schedule_h
0003 
0004 /*
0005   Author: Jim Kowalkowski  28-01-06
0006 
0007   A class for creating a schedule based on paths in the configuration file.
0008   The schedule is maintained as a sequence of paths.
0009   After construction, events can be fed to the object and passed through
0010   all the modules in the schedule.  All accounting about processing
0011   of events by modules and paths is contained here or in objects held
0012   by containment.
0013 
0014   The trigger results producer and product are generated and managed here.
0015   This class also manages endpaths and calls to endjob and beginjob.
0016   Endpaths are just treated as a simple list of modules that need to
0017   do processing of the event and do not participate in trigger path
0018   activities.
0019 
0020   This class requires the high-level process pset.  It uses @process_name.
0021   If the high-level pset contains an "options" pset, then the
0022   following optional parameter can be present:
0023   bool wantSummary = true/false   # default false
0024 
0025   wantSummary indicates whether or not the pass/fail/error stats
0026   for modules and paths should be printed at the end-of-job.
0027 
0028   A TriggerResults object will always be inserted into the event
0029   for any schedule.  The producer of the TriggerResults EDProduct
0030   is always the first module in the endpath.  The TriggerResultInserter
0031   is given a fixed label of "TriggerResults".
0032 
0033   Processing of an event happens by pushing the event through the Paths.
0034   The scheduler performs the reset() on each of the workers independent
0035   of the Path objects.
0036 
0037   ------------------------
0038 
0039   About Paths:
0040   Paths fit into two categories:
0041   1) trigger paths that contribute directly to saved trigger bits
0042   2) end paths
0043   The Schedule holds these paths in two data structures:
0044   1) main path list
0045   2) end path list
0046 
0047   Trigger path processing always precedes endpath processing.
0048   The order of the paths from the input configuration is
0049   preserved in the main paths list.
0050 
0051   ------------------------
0052 
0053   The Schedule uses the TriggerNamesService to get the names of the
0054   trigger paths and end paths. When a TriggerResults object is created
0055   the results are stored in the same order as the trigger names from
0056   TriggerNamesService.
0057 
0058 */
0059 
0060 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0061 #include "DataFormats/Provenance/interface/BranchIDList.h"
0062 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0063 #include "FWCore/Framework/interface/ExceptionActions.h"
0064 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0065 #include "FWCore/Framework/interface/Frameworkfwd.h"
0066 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0067 #include "FWCore/Framework/interface/WorkerManager.h"
0068 #include "FWCore/Framework/interface/maker/Worker.h"
0069 #include "FWCore/Framework/interface/WorkerRegistry.h"
0070 #include "FWCore/Framework/interface/GlobalSchedule.h"
0071 #include "FWCore/Framework/interface/StreamSchedule.h"
0072 #include "FWCore/Framework/interface/SystemTimeKeeper.h"
0073 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0074 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0075 #include "FWCore/MessageLogger/interface/JobReport.h"
0076 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0077 #include "FWCore/ServiceRegistry/interface/Service.h"
0078 #include "FWCore/ServiceRegistry/interface/ServiceRegistryfwd.h"
0079 #include "FWCore/Utilities/interface/Algorithms.h"
0080 #include "FWCore/Utilities/interface/BranchType.h"
0081 #include "FWCore/Utilities/interface/Exception.h"
0082 #include "FWCore/Utilities/interface/StreamID.h"
0083 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0084 #include "FWCore/Utilities/interface/propagate_const.h"
0085 #include "FWCore/Utilities/interface/Transition.h"
0086 
0087 #include <array>
0088 #include <map>
0089 #include <memory>
0090 #include <mutex>
0091 #include <set>
0092 #include <string>
0093 #include <vector>
0094 #include <sstream>
0095 #include <utility>
0096 
0097 namespace edm {
0098 
0099   namespace service {
0100     class TriggerNamesService;
0101   }
0102   namespace eventsetup {
0103     struct ComponentDescription;
0104     class ESRecordsToProductResolverIndices;
0105   }  // namespace eventsetup
0106 
       // Forward declarations for types named by the Schedule interface below.
       // NOTE(review): PreallocationConfiguration.h, StreamSchedule.h and GlobalSchedule.h
       // are already included above, so the matching forward declarations here look
       // redundant -- confirm before removing them.
0107   class BranchIDListHelper;
0108   class EventTransitionInfo;
0109   class ExceptionCollector;
0110   class MergeableRunProductMetadata;
0111   class OutputModuleCommunicator;
0112   class SignallingProductRegistryFiller;
0113   class PreallocationConfiguration;
0114   class StreamSchedule;
0115   class GlobalSchedule;
0116   struct TriggerTimingReport;
0117   class ModuleRegistry;
0118   class ModuleTypeResolverMaker;
0119   class ThinnedAssociationsHelper;
0120   class TriggerResultInserter;
0121   class PathStatusInserter;
0122   class EndPathStatusInserter;
0123   class WaitingTaskHolder;
0124 
0125   class Schedule {
0126   public:
         // Container aliases used throughout this interface.
0127     typedef std::vector<std::string> vstring;
0128     typedef std::vector<Worker*> AllWorkers;
0129     typedef std::vector<edm::propagate_const<std::shared_ptr<OutputModuleCommunicator>>> AllOutputModuleCommunicators;
0130 
         // Same underlying type as AllWorkers (std::vector<Worker*>).
0131     typedef std::vector<Worker*> Workers;
0132 
         /// Builds the schedule from the high-level process parameter set.
         /// NOTE(review): finishSetup() below takes several of the same arguments;
         /// presumably it completes initialization after construction -- confirm
         /// against the implementation file.
0133     Schedule(ParameterSet& proc_pset,
0134              service::TriggerNamesService const& tns,
0135              SignallingProductRegistryFiller& pregistry,
0136              ExceptionToActionTable const& actions,
0137              std::shared_ptr<ActivityRegistry> areg,
0138              std::shared_ptr<ProcessConfiguration const> processConfiguration,
0139              PreallocationConfiguration const& config,
0140              ProcessContext const* processContext,
0141              ModuleTypeResolverMaker const* resolverMaker);
0142     void finishSetup(ParameterSet& proc_pset,
0143                      service::TriggerNamesService const& tns,
0144                      SignallingProductRegistryFiller& preg,
0145                      BranchIDListHelper& branchIDListHelper,
0146                      ProcessBlockHelperBase& processBlockHelper,
0147                      ThinnedAssociationsHelper& thinnedAssociationsHelper,
0148                      std::shared_ptr<ActivityRegistry> areg,
0149                      std::shared_ptr<ProcessConfiguration> processConfiguration,
0150                      PreallocationConfiguration const& prealloc,
0151                      ProcessContext const* processContext);
0152 
         /// NOTE(review): presumably runs one event on the stream schedule selected by
         /// iStreamID and signals iTask when done -- the implementation is not visible
         /// in this header, confirm there.
0153     void processOneEventAsync(WaitingTaskHolder iTask,
0154                               unsigned int iStreamID,
0155                               EventTransitionInfo&,
0156                               ServiceToken const& token);
0157 
         /// Forwards a global transition of type T to the GlobalSchedule
         /// (inline definition at the end of this header).
0158     template <typename T>
0159     void processOneGlobalAsync(WaitingTaskHolder iTask,
0160                                typename T::TransitionInfoType& transitionInfo,
0161                                ServiceToken const& token,
0162                                bool cleaningUpAfterException = false);
0163 
         /// Forwards a stream transition of type T to the StreamSchedule at index
         /// iStreamID (inline definition at the end of this header).
0164     template <typename T>
0165     void processOneStreamAsync(WaitingTaskHolder iTask,
0166                                unsigned int iStreamID,
0167                                typename T::TransitionInfoType& transitionInfo,
0168                                ServiceToken const& token,
0169                                bool cleaningUpAfterException = false);
0170 
0171     void beginJob(ProductRegistry const&,
0172                   eventsetup::ESRecordsToProductResolverIndices const&,
0173                   ProcessBlockHelperBase const&,
0174                   ProcessContext const&);
0175     void endJob(ExceptionCollector& collector);
0176     void sendFwkSummaryToMessageLogger() const;
0177 
0178     void beginStream(unsigned int streamID);
0179     void endStream(unsigned int streamID, ExceptionCollector& collector, std::mutex& collectorMutex) noexcept;
0180 
0181     // Write the luminosity block
0182     void writeLumiAsync(WaitingTaskHolder iTask,
0183                         LuminosityBlockPrincipal const& lbp,
0184                         ProcessContext const*,
0185                         ActivityRegistry*);
0186 
0187     // Write the run
0188     void writeRunAsync(WaitingTaskHolder iTask,
0189                        RunPrincipal const& rp,
0190                        ProcessContext const*,
0191                        ActivityRegistry*,
0192                        MergeableRunProductMetadata const*);
0193 
         // Write the process block
0194     void writeProcessBlockAsync(WaitingTaskHolder iTask,
0195                                 ProcessBlockPrincipal const&,
0196                                 ProcessContext const*,
0197                                 ActivityRegistry*);
0198 
0199     // Call closeFile() on all OutputModules.
0200     void closeOutputFiles();
0201 
0202     // Call openFiles() on all OutputModules
0203     void openOutputFiles(FileBlock& fb);
0204 
0205     // Call respondToOpenInputFile() on all Modules
0206     void respondToOpenInputFile(FileBlock const& fb);
0207 
0208     // Call respondToCloseInputFile() on all Modules
0209     void respondToCloseInputFile(FileBlock const& fb);
0210 
0211     // Call shouldWeCloseFile() on all OutputModules.
0212     bool shouldWeCloseOutput() const;
0213 
0214     /// Return a vector allowing const access to all the
0215     /// ModuleDescriptions for this Schedule.
0216 
0217     /// *** N.B. *** Ownership of the ModuleDescriptions is *not*
0218     /// *** passed to the caller. Do not call delete on these
0219     /// *** pointers!
0220     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0221 
0222     ///adds to oLabelsToFill the labels for all paths in the process
0223     void availablePaths(std::vector<std::string>& oLabelsToFill) const;
0224 
0225     ///Adds to oLabelsToFill the labels for all trigger paths in the process.
0226     ///This is different from availablePaths because it includes the
0227     ///empty paths to match the entries in TriggerResults exactly.
0228     void triggerPaths(std::vector<std::string>& oLabelsToFill) const;
0229 
0230     ///adds to oLabelsToFill the labels for all end paths in the process
0231     void endPaths(std::vector<std::string>& oLabelsToFill) const;
0232 
0233     ///adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel
0234     void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;
0235 
0236     ///adds the ModuleDescriptions into the vector for the modules scheduled in path iPathLabel
0237     ///hint is a performance optimization if you might know the position of the module in the path
0238     void moduleDescriptionsInPath(std::string const& iPathLabel,
0239                                   std::vector<ModuleDescription const*>& descriptions,
0240                                   unsigned int hint) const;
0241 
0242     ///adds the ModuleDescriptions into the vector for the modules scheduled in path iEndPathLabel
0243     ///hint is a performance optimization if you might know the position of the module in the path
0244     void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
0245                                      std::vector<ModuleDescription const*>& descriptions,
0246                                      unsigned int hint) const;
0247 
0248     /// Return the number of events this Schedule has tried to process
0249     /// (includes both successes and failures, including failures due
0250     /// to exceptions during processing).
0251     int totalEvents() const;
0252 
0253     /// Return the number of events which have been passed by one or
0254     /// more trigger paths.
0255     int totalEventsPassed() const;
0256 
0257     /// Return the number of events that have not passed any trigger.
0258     /// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())
0259     int totalEventsFailed() const;
0260 
0261     /// Return the trigger report information on paths,
0262     /// modules-in-path, modules-in-endpath, and modules.
0263     void getTriggerReport(TriggerReport& rep) const;
0264 
0265     /// Return the trigger timing report information on paths,
0266     /// modules-in-path, modules-in-endpath, and modules.
0267     void getTriggerTimingReport(TriggerTimingReport& rep) const;
0268 
0269     /// Return whether each output module has reached its maximum count.
0270     bool terminate() const;
0271 
0272     ///  Clear all the counters in the trigger report.
0273     void clearCounters();
0274 
0275     /// clone the type of module with label iLabel but configure with iPSet.
0276     /// Returns true if successful.
0277     bool changeModule(std::string const& iLabel,
0278                       ParameterSet const& iPSet,
0279                       const SignallingProductRegistryFiller& iRegistry,
0280                       eventsetup::ESRecordsToProductResolverIndices const&);
0281 
0282     /// Deletes module with label iLabel
0283     void deleteModule(std::string const& iLabel, ActivityRegistry* areg);
0284 
0285     void initializeEarlyDelete(std::vector<std::string> const& branchesToDeleteEarly,
0286                                std::multimap<std::string, std::string> const& referencesToBranches,
0287                                std::vector<std::string> const& modulesToSkip,
0288                                edm::ProductRegistry const& preg);
0289 
0290     /// returns the collection of pointers to workers
0291     AllWorkers const& allWorkers() const;
0292 
0293     /// Convert "@currentProcess" in InputTag process names to the actual current process name.
0294     void convertCurrentProcessAlias(std::string const& processName);
0295 
0296     void releaseMemoryPostLookupSignal();
0297 
0298   private:
0299     void limitOutput(ParameterSet const& proc_pset, BranchIDLists const& branchIDLists);
0300 
         // Accessor pairs: the const overload returns a shared_ptr to a const object,
         // the non-const overload returns a reference to the stored shared_ptr.
0301     std::shared_ptr<TriggerResultInserter const> resultsInserter() const {
0302       return get_underlying_safe(resultsInserter_);
0303     }
0304     std::shared_ptr<TriggerResultInserter>& resultsInserter() { return get_underlying_safe(resultsInserter_); }
0305     std::shared_ptr<ModuleRegistry const> moduleRegistry() const { return get_underlying_safe(moduleRegistry_); }
0306     std::shared_ptr<ModuleRegistry>& moduleRegistry() { return get_underlying_safe(moduleRegistry_); }
0307 
0308     edm::propagate_const<std::shared_ptr<TriggerResultInserter>> resultsInserter_;
0309     std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>> pathStatusInserters_;
0310     std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>> endPathStatusInserters_;
0311     edm::propagate_const<std::shared_ptr<ModuleRegistry>> moduleRegistry_;
         // Indexed by stream ID (see processOneStreamAsync).
0312     std::vector<edm::propagate_const<std::shared_ptr<StreamSchedule>>> streamSchedules_;
0313     //In the future, we will have one GlobalSchedule per simultaneous transition
0314     edm::propagate_const<std::unique_ptr<GlobalSchedule>> globalSchedule_;
0315 
0316     AllOutputModuleCommunicators all_output_communicators_;
0317     PreallocationConfiguration preallocConfig_;
0318 
0319     edm::propagate_const<std::unique_ptr<SystemTimeKeeper>> summaryTimeKeeper_;
0320 
         // NOTE(review): raw pointers; presumably non-owning views of name lists owned
         // elsewhere (e.g. by the TriggerNamesService) -- confirm the lifetime guarantee.
0321     std::vector<std::string> const* pathNames_;
0322     std::vector<std::string> const* endPathNames_;
         // NOTE(review): presumably set from the optional "wantSummary" parameter
         // described in the comment at the top of this file -- confirm.
0323     bool wantSummary_;
0324   };
0325 
0326   template <typename T>
0327   void Schedule::processOneStreamAsync(WaitingTaskHolder iTaskHolder,
0328                                        unsigned int iStreamID,
0329                                        typename T::TransitionInfoType& transitionInfo,
0330                                        ServiceToken const& token,
0331                                        bool cleaningUpAfterException) {
         // Route a stream-level transition of type T to the StreamSchedule serving iStreamID.
         // The stream ID is used directly as the index into streamSchedules_, hence the
         // bounds check before the lookup.
0332     assert(iStreamID < streamSchedules_.size());
0333     auto& targetStream = streamSchedules_[iStreamID];
         // The holder is handed over with std::move; it is not used again in this function.
0334     targetStream->processOneStreamAsync<T>(std::move(iTaskHolder), transitionInfo, token, cleaningUpAfterException);
0335   }
0336 
0337   template <typename T>
0338   void Schedule::processOneGlobalAsync(WaitingTaskHolder iTaskHolder,
0339                                        typename T::TransitionInfoType& transitionInfo,
0340                                        ServiceToken const& token,
0341                                        bool cleaningUpAfterException) {
         // Route a global transition of type T to the GlobalSchedule held by this Schedule.
         // iTaskHolder is a by-value parameter that is not used again here, so it is moved
         // into the callee rather than copied, matching processOneStreamAsync.
         // NOTE(review): assumes GlobalSchedule::processOneGlobalAsync accepts the holder
         // by value (as the StreamSchedule counterpart appears to) -- confirm.
0342     globalSchedule_->processOneGlobalAsync<T>(std::move(iTaskHolder), transitionInfo, token, cleaningUpAfterException);
0343   }
0344 
0345 }  // namespace edm
0346 #endif