Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:12:05

0001 #ifndef FWCore_Framework_Schedule_h
0002 #define FWCore_Framework_Schedule_h
0003 
0004 /*
0005   Author: Jim Kowalkowski  28-01-06
0006 
0007   A class for creating a schedule based on paths in the configuration file.
0008   The schedule is maintained as a sequence of paths.
0009   After construction, events can be fed to the object and passed through
0010   all the modules in the schedule.  All accounting about processing
0011   of events by modules and paths is contained here or in object held
0012   by containment.
0013 
0014   The trigger results producer and product are generated and managed here.
0015   This class also manages endpaths and calls to endjob and beginjob.
0016   Endpaths are just treated as a simple list of modules that need to
0017   do processing of the event and do not participate in trigger path
0018   activities.
0019 
0020   This class requires the high-level process pset.  It uses @process_name.
0021   If the high-level pset contains an "options" pset, then the
0022   following optional parameter can be present:
0023   bool wantSummary = true/false   # default false
0024 
0025   wantSummary indicates whether or not the pass/fail/error stats
0026   for modules and paths should be printed at the end-of-job.
0027 
0028   A TriggerResults object will always be inserted into the event
0029   for any schedule.  The producer of the TriggerResults EDProduct
0030   is always the first module in the endpath.  The TriggerResultInserter
0031   is given a fixed label of "TriggerResults".
0032 
0033   Processing of an event happens by pushing the event through the Paths.
0034   The scheduler performs the reset() on each of the workers independent
0035   of the Path objects.
0036 
0037   ------------------------
0038 
0039   About Paths:
0040   Paths fit into two categories:
0041   1) trigger paths that contribute directly to saved trigger bits
0042   2) end paths
0043   The Schedule holds these paths in two data structures:
0044   1) main path list
0045   2) end path list
0046 
0047   Trigger path processing always precedes endpath processing.
0048   The order of the paths from the input configuration is
0049   preserved in the main paths list.
0050 
0051   ------------------------
0052 
0053   The Schedule uses the TriggerNamesService to get the names of the
0054   trigger paths and end paths. When a TriggerResults object is created
0055   the results are stored in the same order as the trigger names from
0056   TriggerNamesService.
0057 
0058 */
0059 
0060 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0061 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0062 #include "FWCore/Framework/interface/ExceptionActions.h"
0063 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0064 #include "FWCore/Framework/interface/Frameworkfwd.h"
0065 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0066 #include "FWCore/Framework/interface/WorkerManager.h"
0067 #include "FWCore/Framework/interface/maker/Worker.h"
0068 #include "FWCore/Framework/interface/WorkerRegistry.h"
0069 #include "FWCore/Framework/interface/GlobalSchedule.h"
0070 #include "FWCore/Framework/interface/StreamSchedule.h"
0071 #include "FWCore/Framework/interface/SystemTimeKeeper.h"
0072 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0073 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0074 #include "FWCore/MessageLogger/interface/JobReport.h"
0075 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0076 #include "FWCore/ServiceRegistry/interface/Service.h"
0077 #include "FWCore/Utilities/interface/Algorithms.h"
0078 #include "FWCore/Utilities/interface/BranchType.h"
0079 #include "FWCore/Utilities/interface/ConvertException.h"
0080 #include "FWCore/Utilities/interface/Exception.h"
0081 #include "FWCore/Utilities/interface/StreamID.h"
0082 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0083 #include "FWCore/Utilities/interface/propagate_const.h"
0084 
0085 #include <array>
0086 #include <map>
0087 #include <memory>
0088 #include <set>
0089 #include <string>
0090 #include <vector>
0091 #include <sstream>
0092 #include <utility>
0093 
0094 namespace edm {
0095 
0096   namespace service {
0097     class TriggerNamesService;
0098   }
0099   namespace evetnsetup {
0100     class ESRecordsToProductResolverIndices;
0101   }
0102 
0103   class ActivityRegistry;
0104   class BranchIDListHelper;
0105   class EventTransitionInfo;
0106   class ExceptionCollector;
0107   class MergeableRunProductMetadata;
0108   class OutputModuleCommunicator;
0109   class ProcessContext;
0110   class ProductRegistry;
0111   class PreallocationConfiguration;
0112   class StreamSchedule;
0113   class GlobalSchedule;
0114   struct TriggerTimingReport;
0115   class ModuleRegistry;
0116   class ModuleTypeResolverMaker;
0117   class ThinnedAssociationsHelper;
0118   class SubProcessParentageHelper;
0119   class TriggerResultInserter;
0120   class PathStatusInserter;
0121   class EndPathStatusInserter;
0122   class WaitingTaskHolder;
0123 
0124   class Schedule {
0125   public:
0126     typedef std::vector<std::string> vstring;
0127     typedef std::vector<Worker*> AllWorkers;
0128     typedef std::vector<edm::propagate_const<std::shared_ptr<OutputModuleCommunicator>>> AllOutputModuleCommunicators;
0129 
0130     typedef std::vector<Worker*> Workers;
0131 
0132     Schedule(ParameterSet& proc_pset,
0133              service::TriggerNamesService const& tns,
0134              ProductRegistry& pregistry,
0135              ExceptionToActionTable const& actions,
0136              std::shared_ptr<ActivityRegistry> areg,
0137              std::shared_ptr<ProcessConfiguration const> processConfiguration,
0138              PreallocationConfiguration const& config,
0139              ProcessContext const* processContext,
0140              ModuleTypeResolverMaker const* resolverMaker);
0141     void finishSetup(ParameterSet& proc_pset,
0142                      service::TriggerNamesService const& tns,
0143                      ProductRegistry& preg,
0144                      BranchIDListHelper& branchIDListHelper,
0145                      ProcessBlockHelperBase& processBlockHelper,
0146                      ThinnedAssociationsHelper& thinnedAssociationsHelper,
0147                      SubProcessParentageHelper const* subProcessParentageHelper,
0148                      std::shared_ptr<ActivityRegistry> areg,
0149                      std::shared_ptr<ProcessConfiguration> processConfiguration,
0150                      bool hasSubprocesses,
0151                      PreallocationConfiguration const& prealloc,
0152                      ProcessContext const* processContext);
0153 
0154     void processOneEventAsync(WaitingTaskHolder iTask,
0155                               unsigned int iStreamID,
0156                               EventTransitionInfo&,
0157                               ServiceToken const& token);
0158 
0159     template <typename T>
0160     void processOneGlobalAsync(WaitingTaskHolder iTask,
0161                                typename T::TransitionInfoType& transitionInfo,
0162                                ServiceToken const& token,
0163                                bool cleaningUpAfterException = false);
0164 
0165     template <typename T>
0166     void processOneStreamAsync(WaitingTaskHolder iTask,
0167                                unsigned int iStreamID,
0168                                typename T::TransitionInfoType& transitionInfo,
0169                                ServiceToken const& token,
0170                                bool cleaningUpAfterException = false);
0171 
0172     void beginJob(ProductRegistry const&,
0173                   eventsetup::ESRecordsToProductResolverIndices const&,
0174                   ProcessBlockHelperBase const&);
0175     void endJob(ExceptionCollector& collector);
0176 
0177     void beginStream(unsigned int);
0178     void endStream(unsigned int);
0179 
0180     // Write the luminosity block
0181     void writeLumiAsync(WaitingTaskHolder iTask,
0182                         LuminosityBlockPrincipal const& lbp,
0183                         ProcessContext const*,
0184                         ActivityRegistry*);
0185 
0186     // Write the run
0187     void writeRunAsync(WaitingTaskHolder iTask,
0188                        RunPrincipal const& rp,
0189                        ProcessContext const*,
0190                        ActivityRegistry*,
0191                        MergeableRunProductMetadata const*);
0192 
0193     void writeProcessBlockAsync(WaitingTaskHolder iTask,
0194                                 ProcessBlockPrincipal const&,
0195                                 ProcessContext const*,
0196                                 ActivityRegistry*);
0197 
0198     // Call closeFile() on all OutputModules.
0199     void closeOutputFiles();
0200 
0201     // Call openFiles() on all OutputModules
0202     void openOutputFiles(FileBlock& fb);
0203 
0204     // Call respondToOpenInputFile() on all Modules
0205     void respondToOpenInputFile(FileBlock const& fb);
0206 
0207     // Call respondToCloseInputFile() on all Modules
0208     void respondToCloseInputFile(FileBlock const& fb);
0209 
0210     // Call shouldWeCloseFile() on all OutputModules.
0211     bool shouldWeCloseOutput() const;
0212 
0213     /// Return a vector allowing const access to all the
0214     /// ModuleDescriptions for this Schedule.
0215 
0216     /// *** N.B. *** Ownership of the ModuleDescriptions is *not*
0217     /// *** passed to the caller. Do not call delete on these
0218     /// *** pointers!
0219     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0220 
0221     ///adds to oLabelsToFill the labels for all paths in the process
0222     void availablePaths(std::vector<std::string>& oLabelsToFill) const;
0223 
0224     ///Adds to oLabelsToFill the labels for all trigger paths in the process.
0225     ///This is different from availablePaths because it includes the
0226     ///empty paths to match the entries in TriggerResults exactly.
0227     void triggerPaths(std::vector<std::string>& oLabelsToFill) const;
0228 
0229     ///adds to oLabelsToFill the labels for all end paths in the process
0230     void endPaths(std::vector<std::string>& oLabelsToFill) const;
0231 
0232     ///adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel
0233     void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;
0234 
0235     ///adds the ModuleDescriptions into the vector for the modules scheduled in path iPathLabel
0236     ///hint is a performance optimization if you might know the position of the module in the path
0237     void moduleDescriptionsInPath(std::string const& iPathLabel,
0238                                   std::vector<ModuleDescription const*>& descriptions,
0239                                   unsigned int hint) const;
0240 
0241     ///adds the ModuleDescriptions into the vector for the modules scheduled in path iEndPathLabel
0242     ///hint is a performance optimization if you might know the position of the module in the path
0243     void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
0244                                      std::vector<ModuleDescription const*>& descriptions,
0245                                      unsigned int hint) const;
0246 
0247     void fillModuleAndConsumesInfo(
0248         std::vector<ModuleDescription const*>& allModuleDescriptions,
0249         std::vector<std::pair<unsigned int, unsigned int>>& moduleIDToIndex,
0250         std::array<std::vector<std::vector<ModuleDescription const*>>, NumBranchTypes>&
0251             modulesWhoseProductsAreConsumedBy,
0252         std::vector<std::vector<ModuleProcessName>>& modulesInPreviousProcessesWhoseProductsAreConsumedBy,
0253         ProductRegistry const& preg) const;
0254 
0255     /// Return the number of events this Schedule has tried to process
0256     /// (inclues both successes and failures, including failures due
0257     /// to exceptions during processing).
0258     int totalEvents() const;
0259 
0260     /// Return the number of events which have been passed by one or
0261     /// more trigger paths.
0262     int totalEventsPassed() const;
0263 
0264     /// Return the number of events that have not passed any trigger.
0265     /// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()
0266     int totalEventsFailed() const;
0267 
0268     /// Return the trigger report information on paths,
0269     /// modules-in-path, modules-in-endpath, and modules.
0270     void getTriggerReport(TriggerReport& rep) const;
0271 
0272     /// Return the trigger timing report information on paths,
0273     /// modules-in-path, modules-in-endpath, and modules.
0274     void getTriggerTimingReport(TriggerTimingReport& rep) const;
0275 
0276     /// Return whether each output module has reached its maximum count.
0277     bool terminate() const;
0278 
0279     ///  Clear all the counters in the trigger report.
0280     void clearCounters();
0281 
0282     /// clone the type of module with label iLabel but configure with iPSet.
0283     /// Returns true if successful.
0284     bool changeModule(std::string const& iLabel,
0285                       ParameterSet const& iPSet,
0286                       const ProductRegistry& iRegistry,
0287                       eventsetup::ESRecordsToProductResolverIndices const&);
0288 
0289     /// Deletes module with label iLabel
0290     void deleteModule(std::string const& iLabel, ActivityRegistry* areg);
0291 
0292     void initializeEarlyDelete(std::vector<std::string> const& branchesToDeleteEarly,
0293                                std::multimap<std::string, std::string> const& referencesToBranches,
0294                                std::vector<std::string> const& modulesToSkip,
0295                                edm::ProductRegistry const& preg);
0296 
0297     /// returns the collection of pointers to workers
0298     AllWorkers const& allWorkers() const;
0299 
0300     /// Convert "@currentProcess" in InputTag process names to the actual current process name.
0301     void convertCurrentProcessAlias(std::string const& processName);
0302 
0303   private:
0304     void limitOutput(ParameterSet const& proc_pset,
0305                      BranchIDLists const& branchIDLists,
0306                      SubProcessParentageHelper const* subProcessParentageHelper);
0307 
0308     std::shared_ptr<TriggerResultInserter const> resultsInserter() const {
0309       return get_underlying_safe(resultsInserter_);
0310     }
0311     std::shared_ptr<TriggerResultInserter>& resultsInserter() { return get_underlying_safe(resultsInserter_); }
0312     std::shared_ptr<ModuleRegistry const> moduleRegistry() const { return get_underlying_safe(moduleRegistry_); }
0313     std::shared_ptr<ModuleRegistry>& moduleRegistry() { return get_underlying_safe(moduleRegistry_); }
0314 
0315     edm::propagate_const<std::shared_ptr<TriggerResultInserter>> resultsInserter_;
0316     std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>> pathStatusInserters_;
0317     std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>> endPathStatusInserters_;
0318     edm::propagate_const<std::shared_ptr<ModuleRegistry>> moduleRegistry_;
0319     std::vector<edm::propagate_const<std::shared_ptr<StreamSchedule>>> streamSchedules_;
0320     //In the future, we will have one GlobalSchedule per simultaneous transition
0321     edm::propagate_const<std::unique_ptr<GlobalSchedule>> globalSchedule_;
0322 
0323     AllOutputModuleCommunicators all_output_communicators_;
0324     PreallocationConfiguration preallocConfig_;
0325 
0326     edm::propagate_const<std::unique_ptr<SystemTimeKeeper>> summaryTimeKeeper_;
0327 
0328     std::vector<std::string> const* pathNames_;
0329     std::vector<std::string> const* endPathNames_;
0330     bool wantSummary_;
0331   };
0332 
0333   template <typename T>
0334   void Schedule::processOneStreamAsync(WaitingTaskHolder iTaskHolder,
0335                                        unsigned int iStreamID,
0336                                        typename T::TransitionInfoType& transitionInfo,
0337                                        ServiceToken const& token,
0338                                        bool cleaningUpAfterException) {
0339     assert(iStreamID < streamSchedules_.size());
0340     streamSchedules_[iStreamID]->processOneStreamAsync<T>(
0341         std::move(iTaskHolder), transitionInfo, token, cleaningUpAfterException);
0342   }
0343 
0344   template <typename T>
0345   void Schedule::processOneGlobalAsync(WaitingTaskHolder iTaskHolder,
0346                                        typename T::TransitionInfoType& transitionInfo,
0347                                        ServiceToken const& token,
0348                                        bool cleaningUpAfterException) {
0349     globalSchedule_->processOneGlobalAsync<T>(iTaskHolder, transitionInfo, token, cleaningUpAfterException);
0350   }
0351 
0352 }  // namespace edm
0353 #endif