Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-01-11 16:27:12

0001 #ifndef FWCore_Framework_Schedule_h
0002 #define FWCore_Framework_Schedule_h
0003 
0004 /*
0005   Author: Jim Kowalkowski  28-01-06
0006 
0007   A class for creating a schedule based on paths in the configuration file.
0008   The schedule is maintained as a sequence of paths.
0009   After construction, events can be fed to the object and passed through
0010   all the modules in the schedule.  All accounting about processing
0011   of events by modules and paths is contained here or in object held
0012   by containment.
0013 
0014   The trigger results producer and product are generated and managed here.
0015   This class also manages endpaths and calls to endjob and beginjob.
0016   Endpaths are just treated as a simple list of modules that need to
0017   do processing of the event and do not participate in trigger path
0018   activities.
0019 
0020   This class requires the high-level process pset.  It uses @process_name.
0021   If the high-level pset contains an "options" pset, then the
0022   following optional parameter can be present:
0023   bool wantSummary = true/false   # default false
0024 
0025   wantSummary indicates whether or not the pass/fail/error stats
0026   for modules and paths should be printed at the end-of-job.
0027 
0028   A TriggerResults object will always be inserted into the event
0029   for any schedule.  The producer of the TriggerResults EDProduct
0030   is always the first module in the endpath.  The TriggerResultInserter
0031   is given a fixed label of "TriggerResults".
0032 
0033   Processing of an event happens by pushing the event through the Paths.
0034   The scheduler performs the reset() on each of the workers independent
0035   of the Path objects.
0036 
0037   ------------------------
0038 
0039   About Paths:
0040   Paths fit into two categories:
0041   1) trigger paths that contribute directly to saved trigger bits
0042   2) end paths
0043   The Schedule holds these paths in two data structures:
0044   1) main path list
0045   2) end path list
0046 
0047   Trigger path processing always precedes endpath processing.
0048   The order of the paths from the input configuration is
0049   preserved in the main paths list.
0050 
0051   ------------------------
0052 
0053   The Schedule uses the TriggerNamesService to get the names of the
0054   trigger paths and end paths. When a TriggerResults object is created
0055   the results are stored in the same order as the trigger names from
0056   TriggerNamesService.
0057 
0058 */
0059 
0060 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0061 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0062 #include "FWCore/Framework/interface/ExceptionActions.h"
0063 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0064 #include "FWCore/Framework/interface/Frameworkfwd.h"
0065 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0066 #include "FWCore/Framework/interface/WorkerManager.h"
0067 #include "FWCore/Framework/interface/maker/Worker.h"
0068 #include "FWCore/Framework/interface/WorkerRegistry.h"
0069 #include "FWCore/Framework/interface/GlobalSchedule.h"
0070 #include "FWCore/Framework/interface/StreamSchedule.h"
0071 #include "FWCore/Framework/interface/SystemTimeKeeper.h"
0072 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0073 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0074 #include "FWCore/MessageLogger/interface/JobReport.h"
0075 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0076 #include "FWCore/ServiceRegistry/interface/Service.h"
0077 #include "FWCore/Utilities/interface/Algorithms.h"
0078 #include "FWCore/Utilities/interface/BranchType.h"
0079 #include "FWCore/Utilities/interface/ConvertException.h"
0080 #include "FWCore/Utilities/interface/Exception.h"
0081 #include "FWCore/Utilities/interface/StreamID.h"
0082 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0083 #include "FWCore/Utilities/interface/propagate_const.h"
0084 
0085 #include <array>
0086 #include <map>
0087 #include <memory>
0088 #include <set>
0089 #include <string>
0090 #include <vector>
0091 #include <sstream>
0092 #include <utility>
0093 
0094 namespace edm {
0095 
0096   namespace service {
0097     class TriggerNamesService;
0098   }
0099   namespace evetnsetup {
0100     class ESRecordsToProxyIndices;
0101   }
0102 
0103   class ActivityRegistry;
0104   class BranchIDListHelper;
0105   class EventTransitionInfo;
0106   class ExceptionCollector;
0107   class MergeableRunProductMetadata;
0108   class OutputModuleCommunicator;
0109   class ProcessContext;
0110   class ProductRegistry;
0111   class PreallocationConfiguration;
0112   class StreamSchedule;
0113   class GlobalSchedule;
0114   struct TriggerTimingReport;
0115   class ModuleRegistry;
0116   class ModuleTypeResolverMaker;
0117   class ThinnedAssociationsHelper;
0118   class SubProcessParentageHelper;
0119   class TriggerResultInserter;
0120   class PathStatusInserter;
0121   class EndPathStatusInserter;
0122   class WaitingTaskHolder;
0123 
0124   class Schedule {
0125   public:
0126     typedef std::vector<std::string> vstring;
0127     typedef std::vector<Worker*> AllWorkers;
0128     typedef std::vector<edm::propagate_const<std::shared_ptr<OutputModuleCommunicator>>> AllOutputModuleCommunicators;
0129 
0130     typedef std::vector<Worker*> Workers;
0131 
0132     Schedule(ParameterSet& proc_pset,
0133              service::TriggerNamesService const& tns,
0134              ProductRegistry& pregistry,
0135              ExceptionToActionTable const& actions,
0136              std::shared_ptr<ActivityRegistry> areg,
0137              std::shared_ptr<ProcessConfiguration const> processConfiguration,
0138              PreallocationConfiguration const& config,
0139              ProcessContext const* processContext,
0140              ModuleTypeResolverMaker const* resolverMaker);
0141     void finishSetup(ParameterSet& proc_pset,
0142                      service::TriggerNamesService const& tns,
0143                      ProductRegistry& preg,
0144                      BranchIDListHelper& branchIDListHelper,
0145                      ProcessBlockHelperBase& processBlockHelper,
0146                      ThinnedAssociationsHelper& thinnedAssociationsHelper,
0147                      SubProcessParentageHelper const* subProcessParentageHelper,
0148                      std::shared_ptr<ActivityRegistry> areg,
0149                      std::shared_ptr<ProcessConfiguration> processConfiguration,
0150                      bool hasSubprocesses,
0151                      PreallocationConfiguration const& prealloc,
0152                      ProcessContext const* processContext);
0153 
0154     void processOneEventAsync(WaitingTaskHolder iTask,
0155                               unsigned int iStreamID,
0156                               EventTransitionInfo&,
0157                               ServiceToken const& token);
0158 
0159     template <typename T>
0160     void processOneGlobalAsync(WaitingTaskHolder iTask,
0161                                typename T::TransitionInfoType& transitionInfo,
0162                                ServiceToken const& token,
0163                                bool cleaningUpAfterException = false);
0164 
0165     template <typename T>
0166     void processOneStreamAsync(WaitingTaskHolder iTask,
0167                                unsigned int iStreamID,
0168                                typename T::TransitionInfoType& transitionInfo,
0169                                ServiceToken const& token,
0170                                bool cleaningUpAfterException = false);
0171 
0172     void beginJob(ProductRegistry const&, eventsetup::ESRecordsToProxyIndices const&, ProcessBlockHelperBase const&);
0173     void endJob(ExceptionCollector& collector);
0174 
0175     void beginStream(unsigned int);
0176     void endStream(unsigned int);
0177 
0178     // Write the luminosity block
0179     void writeLumiAsync(WaitingTaskHolder iTask,
0180                         LuminosityBlockPrincipal const& lbp,
0181                         ProcessContext const*,
0182                         ActivityRegistry*);
0183 
0184     // Write the run
0185     void writeRunAsync(WaitingTaskHolder iTask,
0186                        RunPrincipal const& rp,
0187                        ProcessContext const*,
0188                        ActivityRegistry*,
0189                        MergeableRunProductMetadata const*);
0190 
0191     void writeProcessBlockAsync(WaitingTaskHolder iTask,
0192                                 ProcessBlockPrincipal const&,
0193                                 ProcessContext const*,
0194                                 ActivityRegistry*);
0195 
0196     // Call closeFile() on all OutputModules.
0197     void closeOutputFiles();
0198 
0199     // Call openFiles() on all OutputModules
0200     void openOutputFiles(FileBlock& fb);
0201 
0202     // Call respondToOpenInputFile() on all Modules
0203     void respondToOpenInputFile(FileBlock const& fb);
0204 
0205     // Call respondToCloseInputFile() on all Modules
0206     void respondToCloseInputFile(FileBlock const& fb);
0207 
0208     // Call shouldWeCloseFile() on all OutputModules.
0209     bool shouldWeCloseOutput() const;
0210 
0211     /// Return a vector allowing const access to all the
0212     /// ModuleDescriptions for this Schedule.
0213 
0214     /// *** N.B. *** Ownership of the ModuleDescriptions is *not*
0215     /// *** passed to the caller. Do not call delete on these
0216     /// *** pointers!
0217     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0218 
0219     ///adds to oLabelsToFill the labels for all paths in the process
0220     void availablePaths(std::vector<std::string>& oLabelsToFill) const;
0221 
0222     ///Adds to oLabelsToFill the labels for all trigger paths in the process.
0223     ///This is different from availablePaths because it includes the
0224     ///empty paths to match the entries in TriggerResults exactly.
0225     void triggerPaths(std::vector<std::string>& oLabelsToFill) const;
0226 
0227     ///adds to oLabelsToFill the labels for all end paths in the process
0228     void endPaths(std::vector<std::string>& oLabelsToFill) const;
0229 
0230     ///adds to oLabelsToFill in execution order the labels of all modules in path iPathLabel
0231     void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;
0232 
0233     ///adds the ModuleDescriptions into the vector for the modules scheduled in path iPathLabel
0234     ///hint is a performance optimization if you might know the position of the module in the path
0235     void moduleDescriptionsInPath(std::string const& iPathLabel,
0236                                   std::vector<ModuleDescription const*>& descriptions,
0237                                   unsigned int hint) const;
0238 
0239     ///adds the ModuleDescriptions into the vector for the modules scheduled in path iEndPathLabel
0240     ///hint is a performance optimization if you might know the position of the module in the path
0241     void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
0242                                      std::vector<ModuleDescription const*>& descriptions,
0243                                      unsigned int hint) const;
0244 
0245     void fillModuleAndConsumesInfo(
0246         std::vector<ModuleDescription const*>& allModuleDescriptions,
0247         std::vector<std::pair<unsigned int, unsigned int>>& moduleIDToIndex,
0248         std::array<std::vector<std::vector<ModuleDescription const*>>, NumBranchTypes>&
0249             modulesWhoseProductsAreConsumedBy,
0250         std::vector<std::vector<ModuleProcessName>>& modulesInPreviousProcessesWhoseProductsAreConsumedBy,
0251         ProductRegistry const& preg) const;
0252 
0253     /// Return the number of events this Schedule has tried to process
0254     /// (inclues both successes and failures, including failures due
0255     /// to exceptions during processing).
0256     int totalEvents() const;
0257 
0258     /// Return the number of events which have been passed by one or
0259     /// more trigger paths.
0260     int totalEventsPassed() const;
0261 
0262     /// Return the number of events that have not passed any trigger.
0263     /// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents()
0264     int totalEventsFailed() const;
0265 
0266     /// Return the trigger report information on paths,
0267     /// modules-in-path, modules-in-endpath, and modules.
0268     void getTriggerReport(TriggerReport& rep) const;
0269 
0270     /// Return the trigger timing report information on paths,
0271     /// modules-in-path, modules-in-endpath, and modules.
0272     void getTriggerTimingReport(TriggerTimingReport& rep) const;
0273 
0274     /// Return whether each output module has reached its maximum count.
0275     bool terminate() const;
0276 
0277     ///  Clear all the counters in the trigger report.
0278     void clearCounters();
0279 
0280     /// clone the type of module with label iLabel but configure with iPSet.
0281     /// Returns true if successful.
0282     bool changeModule(std::string const& iLabel,
0283                       ParameterSet const& iPSet,
0284                       const ProductRegistry& iRegistry,
0285                       eventsetup::ESRecordsToProxyIndices const&);
0286 
0287     /// Deletes module with label iLabel
0288     void deleteModule(std::string const& iLabel, ActivityRegistry* areg);
0289 
0290     void initializeEarlyDelete(std::vector<std::string> const& branchesToDeleteEarly,
0291                                std::multimap<std::string, std::string> const& referencesToBranches,
0292                                std::vector<std::string> const& modulesToSkip,
0293                                edm::ProductRegistry const& preg);
0294 
0295     /// returns the collection of pointers to workers
0296     AllWorkers const& allWorkers() const;
0297 
0298     /// Convert "@currentProcess" in InputTag process names to the actual current process name.
0299     void convertCurrentProcessAlias(std::string const& processName);
0300 
0301   private:
0302     void limitOutput(ParameterSet const& proc_pset,
0303                      BranchIDLists const& branchIDLists,
0304                      SubProcessParentageHelper const* subProcessParentageHelper);
0305 
0306     std::shared_ptr<TriggerResultInserter const> resultsInserter() const {
0307       return get_underlying_safe(resultsInserter_);
0308     }
0309     std::shared_ptr<TriggerResultInserter>& resultsInserter() { return get_underlying_safe(resultsInserter_); }
0310     std::shared_ptr<ModuleRegistry const> moduleRegistry() const { return get_underlying_safe(moduleRegistry_); }
0311     std::shared_ptr<ModuleRegistry>& moduleRegistry() { return get_underlying_safe(moduleRegistry_); }
0312 
0313     edm::propagate_const<std::shared_ptr<TriggerResultInserter>> resultsInserter_;
0314     std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>> pathStatusInserters_;
0315     std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>> endPathStatusInserters_;
0316     edm::propagate_const<std::shared_ptr<ModuleRegistry>> moduleRegistry_;
0317     std::vector<edm::propagate_const<std::shared_ptr<StreamSchedule>>> streamSchedules_;
0318     //In the future, we will have one GlobalSchedule per simultaneous transition
0319     edm::propagate_const<std::unique_ptr<GlobalSchedule>> globalSchedule_;
0320 
0321     AllOutputModuleCommunicators all_output_communicators_;
0322     PreallocationConfiguration preallocConfig_;
0323 
0324     edm::propagate_const<std::unique_ptr<SystemTimeKeeper>> summaryTimeKeeper_;
0325 
0326     std::vector<std::string> const* pathNames_;
0327     std::vector<std::string> const* endPathNames_;
0328     bool wantSummary_;
0329   };
0330 
0331   template <typename T>
0332   void Schedule::processOneStreamAsync(WaitingTaskHolder iTaskHolder,
0333                                        unsigned int iStreamID,
0334                                        typename T::TransitionInfoType& transitionInfo,
0335                                        ServiceToken const& token,
0336                                        bool cleaningUpAfterException) {
0337     assert(iStreamID < streamSchedules_.size());
0338     streamSchedules_[iStreamID]->processOneStreamAsync<T>(
0339         std::move(iTaskHolder), transitionInfo, token, cleaningUpAfterException);
0340   }
0341 
0342   template <typename T>
0343   void Schedule::processOneGlobalAsync(WaitingTaskHolder iTaskHolder,
0344                                        typename T::TransitionInfoType& transitionInfo,
0345                                        ServiceToken const& token,
0346                                        bool cleaningUpAfterException) {
0347     globalSchedule_->processOneGlobalAsync<T>(iTaskHolder, transitionInfo, token, cleaningUpAfterException);
0348   }
0349 
0350 }  // namespace edm
0351 #endif