Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-06-03 00:59:00

0001 #ifndef FWCore_Framework_Schedule_h
0002 #define FWCore_Framework_Schedule_h
0003 
0004 /*
0005   Author: Jim Kowalkowski  28-01-06
0006 
0007   A class for creating a schedule based on paths in the configuration file.
0008   The schedule is maintained as a sequence of paths.
0009   After construction, events can be fed to the object and passed through
0010   all the modules in the schedule.  All accounting about processing
0011   of events by modules and paths is contained here or in objects held
0012   by containment.
0013 
0014   The trigger results producer and product are generated and managed here.
0015   This class also manages endpaths and calls to endjob and beginjob.
0016   Endpaths are just treated as a simple list of modules that need to
0017   do processing of the event and do not participate in trigger path
0018   activities.
0019 
0020   This class requires the high-level process pset.  It uses @process_name.
0021   If the high-level pset contains an "options" pset, then the
0022   following optional parameter can be present:
0023   bool wantSummary = true/false   # default false
0024 
0025   wantSummary indicates whether or not the pass/fail/error stats
0026   for modules and paths should be printed at the end-of-job.
0027 
0028   A TriggerResults object will always be inserted into the event
0029   for any schedule.  The producer of the TriggerResults EDProduct
0030   is always the first module in the endpath.  The TriggerResultInserter
0031   is given a fixed label of "TriggerResults".
0032 
0033   Processing of an event happens by pushing the event through the Paths.
0034   The scheduler performs the reset() on each of the workers independent
0035   of the Path objects.
0036 
0037   ------------------------
0038 
0039   About Paths:
0040   Paths fit into two categories:
0041   1) trigger paths that contribute directly to saved trigger bits
0042   2) end paths
0043   The Schedule holds these paths in two data structures:
0044   1) main path list
0045   2) end path list
0046 
0047   Trigger path processing always precedes endpath processing.
0048   The order of the paths from the input configuration is
0049   preserved in the main paths list.
0050 
0051   ------------------------
0052 
0053   The Schedule uses the TriggerNamesService to get the names of the
0054   trigger paths and end paths. When a TriggerResults object is created
0055   the results are stored in the same order as the trigger names from
0056   TriggerNamesService.
0057 
0058 */
0059 
0060 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0061 #include "FWCore/Common/interface/FWCoreCommonFwd.h"
0062 #include "FWCore/Framework/interface/ExceptionActions.h"
0063 #include "FWCore/Framework/interface/ExceptionHelpers.h"
0064 #include "FWCore/Framework/interface/Frameworkfwd.h"
0065 #include "FWCore/Framework/interface/OccurrenceTraits.h"
0066 #include "FWCore/Framework/interface/WorkerManager.h"
0067 #include "FWCore/Framework/interface/maker/Worker.h"
0068 #include "FWCore/Framework/interface/WorkerRegistry.h"
0069 #include "FWCore/Framework/interface/GlobalSchedule.h"
0070 #include "FWCore/Framework/interface/StreamSchedule.h"
0071 #include "FWCore/Framework/interface/SystemTimeKeeper.h"
0072 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0073 #include "FWCore/MessageLogger/interface/ExceptionMessages.h"
0074 #include "FWCore/MessageLogger/interface/JobReport.h"
0075 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0076 #include "FWCore/ServiceRegistry/interface/Service.h"
0077 #include "FWCore/Utilities/interface/Algorithms.h"
0078 #include "FWCore/Utilities/interface/BranchType.h"
0079 #include "FWCore/Utilities/interface/ConvertException.h"
0080 #include "FWCore/Utilities/interface/Exception.h"
0081 #include "FWCore/Utilities/interface/StreamID.h"
0082 #include "FWCore/Utilities/interface/get_underlying_safe.h"
0083 #include "FWCore/Utilities/interface/propagate_const.h"
0084 
0085 #include <array>
0086 #include <map>
0087 #include <memory>
0088 #include <set>
0089 #include <string>
0090 #include <vector>
0091 #include <sstream>
0092 #include <utility>
0093 
0094 namespace edm {
0095 
0096   namespace service {
0097     class TriggerNamesService;
0098   }  // namespace service
0099   namespace eventsetup {
0100     class ESRecordsToProxyIndices;  // used by Schedule::beginJob() and Schedule::changeModule()
0101   }  // namespace eventsetup
0102 
0103   class ActivityRegistry;  // forward declarations, listed alphabetically
0104   class BranchIDListHelper;
0105   class EndPathStatusInserter;
0106   class EventTransitionInfo;
0107   class ExceptionCollector;
0108   class GlobalSchedule;
0109   class MergeableRunProductMetadata;
0110   class ModuleRegistry;
0111   class OutputModuleCommunicator;
0112   class PathStatusInserter;
0113   class PreallocationConfiguration;
0114   class ProcessContext;
0115   class ProductRegistry;
0116   class StreamSchedule;
0117   class SubProcessParentageHelper;
0118   class ThinnedAssociationsHelper;
0119   class TriggerResultInserter;
0120   struct TriggerTimingReport;
0121   class WaitingTaskHolder;
0122 
0123   class Schedule {
0124   public:
0125     typedef std::vector<std::string> vstring;
0126     typedef std::vector<Worker*> AllWorkers;
0127     typedef std::vector<edm::propagate_const<std::shared_ptr<OutputModuleCommunicator>>> AllOutputModuleCommunicators;
0128 
0129     typedef std::vector<Worker*> Workers;
0130 
0131     Schedule(ParameterSet& proc_pset,
0132              service::TriggerNamesService const& tns,
0133              ProductRegistry& pregistry,
0134              BranchIDListHelper& branchIDListHelper,
0135              ProcessBlockHelperBase&,
0136              ThinnedAssociationsHelper& thinnedAssociationsHelper,
0137              SubProcessParentageHelper const* subProcessParentageHelper,
0138              ExceptionToActionTable const& actions,
0139              std::shared_ptr<ActivityRegistry> areg,
0140              std::shared_ptr<ProcessConfiguration> processConfiguration,
0141              bool hasSubprocesses,
0142              PreallocationConfiguration const& config,
0143              ProcessContext const* processContext);
0144 
0145     void processOneEventAsync(WaitingTaskHolder iTask,
0146                               unsigned int iStreamID,
0147                               EventTransitionInfo&,
0148                               ServiceToken const& token);
0149 
0150     template <typename T>
0151     void processOneGlobalAsync(WaitingTaskHolder iTask,
0152                                typename T::TransitionInfoType& transitionInfo,
0153                                ServiceToken const& token,
0154                                bool cleaningUpAfterException = false);
0155 
0156     template <typename T>
0157     void processOneStreamAsync(WaitingTaskHolder iTask,
0158                                unsigned int iStreamID,
0159                                typename T::TransitionInfoType& transitionInfo,
0160                                ServiceToken const& token,
0161                                bool cleaningUpAfterException = false);
0162 
0163     void beginJob(ProductRegistry const&, eventsetup::ESRecordsToProxyIndices const&, ProcessBlockHelperBase const&);
0164     void endJob(ExceptionCollector& collector);
0165 
0166     void beginStream(unsigned int);
0167     void endStream(unsigned int);
0168 
0169     // Write the luminosity block.
0170     void writeLumiAsync(WaitingTaskHolder iTask,
0171                         LuminosityBlockPrincipal const& lbp,
0172                         ProcessContext const*,
0173                         ActivityRegistry*);
0174 
0175     // Write the run.
0176     void writeRunAsync(WaitingTaskHolder iTask,
0177                        RunPrincipal const& rp,
0178                        ProcessContext const*,
0179                        ActivityRegistry*,
0180                        MergeableRunProductMetadata const*);
0181 
0182     void writeProcessBlockAsync(WaitingTaskHolder iTask,
0183                                 ProcessBlockPrincipal const&,
0184                                 ProcessContext const*,
0185                                 ActivityRegistry*);
0186 
0187     // Call closeFile() on all OutputModules.
0188     void closeOutputFiles();
0189 
0190     // Call openFiles() on all OutputModules.
0191     void openOutputFiles(FileBlock& fb);
0192 
0193     // Call respondToOpenInputFile() on all Modules.
0194     void respondToOpenInputFile(FileBlock const& fb);
0195 
0196     // Call respondToCloseInputFile() on all Modules.
0197     void respondToCloseInputFile(FileBlock const& fb);
0198 
0199     // Call shouldWeCloseFile() on all OutputModules.
0200     bool shouldWeCloseOutput() const;
0201 
0202     /// Return a vector allowing const access to all the
0203     /// ModuleDescriptions for this Schedule.
0204 
0205     /// *** N.B. *** Ownership of the ModuleDescriptions is *not*
0206     /// *** passed to the caller. Do not call delete on these
0207     /// *** pointers!
0208     std::vector<ModuleDescription const*> getAllModuleDescriptions() const;
0209 
0210     /// Adds to oLabelsToFill the labels for all paths in the process.
0211     void availablePaths(std::vector<std::string>& oLabelsToFill) const;
0212 
0213     /// Adds to oLabelsToFill the labels for all trigger paths in the process.
0214     /// This is different from availablePaths because it includes the
0215     /// empty paths to match the entries in TriggerResults exactly.
0216     void triggerPaths(std::vector<std::string>& oLabelsToFill) const;
0217 
0218     /// Adds to oLabelsToFill the labels for all end paths in the process.
0219     void endPaths(std::vector<std::string>& oLabelsToFill) const;
0220 
0221     /// Adds to oLabelsToFill, in execution order, the labels of all modules in path iPathLabel.
0222     void modulesInPath(std::string const& iPathLabel, std::vector<std::string>& oLabelsToFill) const;
0223 
0224     /// Adds the ModuleDescriptions into the vector for the modules scheduled in path iPathLabel.
0225     /// hint is a performance optimization if you might know the position of the module in the path.
0226     void moduleDescriptionsInPath(std::string const& iPathLabel,
0227                                   std::vector<ModuleDescription const*>& descriptions,
0228                                   unsigned int hint) const;
0229 
0230     /// Adds the ModuleDescriptions into the vector for the modules scheduled in path iEndPathLabel.
0231     /// hint is a performance optimization if you might know the position of the module in the path.
0232     void moduleDescriptionsInEndPath(std::string const& iEndPathLabel,
0233                                      std::vector<ModuleDescription const*>& descriptions,
0234                                      unsigned int hint) const;
0235 
0236     void fillModuleAndConsumesInfo(
0237         std::vector<ModuleDescription const*>& allModuleDescriptions,
0238         std::vector<std::pair<unsigned int, unsigned int>>& moduleIDToIndex,
0239         std::array<std::vector<std::vector<ModuleDescription const*>>, NumBranchTypes>&
0240             modulesWhoseProductsAreConsumedBy,
0241         std::vector<std::vector<ModuleProcessName>>& modulesInPreviousProcessesWhoseProductsAreConsumedBy,
0242         ProductRegistry const& preg) const;
0243 
0244     /// Return the number of events this Schedule has tried to process
0245     /// (includes both successes and failures, including failures due
0246     /// to exceptions during processing).
0247     int totalEvents() const;
0248 
0249     /// Return the number of events which have been passed by one or
0250     /// more trigger paths.
0251     int totalEventsPassed() const;
0252 
0253     /// Return the number of events that have not passed any trigger.
0254     /// (N.B. totalEventsFailed() + totalEventsPassed() == totalEvents())
0255     int totalEventsFailed() const;
0256 
0257     /// Return the trigger report information on paths,
0258     /// modules-in-path, modules-in-endpath, and modules.
0259     void getTriggerReport(TriggerReport& rep) const;
0260 
0261     /// Return the trigger timing report information on paths,
0262     /// modules-in-path, modules-in-endpath, and modules.
0263     void getTriggerTimingReport(TriggerTimingReport& rep) const;
0264 
0265     /// Return whether each output module has reached its maximum count.
0266     bool terminate() const;
0267 
0268     /// Clear all the counters in the trigger report.
0269     void clearCounters();
0270 
0271     /// Clone the type of module with label iLabel but configure it with iPSet.
0272     /// Returns true if successful.
0273     bool changeModule(std::string const& iLabel,
0274                       ParameterSet const& iPSet,
0275                       const ProductRegistry& iRegistry,
0276                       eventsetup::ESRecordsToProxyIndices const&);
0277 
0278     /// Deletes module with label iLabel.
0279     void deleteModule(std::string const& iLabel, ActivityRegistry* areg);
0280 
0281     void initializeEarlyDelete(std::vector<std::string> const& branchesToDeleteEarly, edm::ProductRegistry const& preg);
0282 
0283     /// Returns the collection of pointers to workers.
0284     AllWorkers const& allWorkers() const;
0285 
0286     /// Convert "@currentProcess" in InputTag process names to the actual current process name.
0287     void convertCurrentProcessAlias(std::string const& processName);
0288 
0289   private:
0290     void limitOutput(ParameterSet const& proc_pset,
0291                      BranchIDLists const& branchIDLists,
0292                      SubProcessParentageHelper const* subProcessParentageHelper);
0293 
0294     std::shared_ptr<TriggerResultInserter const> resultsInserter() const {
0295       return get_underlying_safe(resultsInserter_);
0296     }
0297     std::shared_ptr<TriggerResultInserter>& resultsInserter() { return get_underlying_safe(resultsInserter_); }
0298     std::shared_ptr<ModuleRegistry const> moduleRegistry() const { return get_underlying_safe(moduleRegistry_); }
0299     std::shared_ptr<ModuleRegistry>& moduleRegistry() { return get_underlying_safe(moduleRegistry_); }
0300 
0301     edm::propagate_const<std::shared_ptr<TriggerResultInserter>> resultsInserter_;
0302     std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>> pathStatusInserters_;
0303     std::vector<edm::propagate_const<std::shared_ptr<EndPathStatusInserter>>> endPathStatusInserters_;
0304     edm::propagate_const<std::shared_ptr<ModuleRegistry>> moduleRegistry_;
0305     std::vector<edm::propagate_const<std::shared_ptr<StreamSchedule>>> streamSchedules_;
0306     // In the future, we will have one GlobalSchedule per simultaneous transition.
0307     edm::propagate_const<std::unique_ptr<GlobalSchedule>> globalSchedule_;
0308 
0309     AllOutputModuleCommunicators all_output_communicators_;
0310     PreallocationConfiguration preallocConfig_;
0311 
0312     edm::propagate_const<std::unique_ptr<SystemTimeKeeper>> summaryTimeKeeper_;
0313 
0314     std::vector<std::string> const* pathNames_;
0315     std::vector<std::string> const* endPathNames_;
0316     bool wantSummary_;
0317   };
0318 
0319   template <typename T>  // Forwards the call to processOneStreamAsync<T>() of the StreamSchedule at index iStreamID.
0320   void Schedule::processOneStreamAsync(WaitingTaskHolder iTaskHolder,
0321                                        unsigned int iStreamID,
0322                                        typename T::TransitionInfoType& transitionInfo,
0323                                        ServiceToken const& token,
0324                                        bool cleaningUpAfterException) {
0325     assert(iStreamID < streamSchedules_.size());  // iStreamID must index an existing entry of streamSchedules_
0326     streamSchedules_[iStreamID]->processOneStreamAsync<T>(  // iTaskHolder is handed over by move, not copied
0327         std::move(iTaskHolder), transitionInfo, token, cleaningUpAfterException);
0328   }
0329 
0330   template <typename T>  // Forwards the call to processOneGlobalAsync<T>() of globalSchedule_.
0331   void Schedule::processOneGlobalAsync(WaitingTaskHolder iTaskHolder,
0332                                        typename T::TransitionInfoType& transitionInfo,
0333                                        ServiceToken const& token,
0334                                        bool cleaningUpAfterException) {  // iTaskHolder is not used after the call, so it is moved (as in processOneStreamAsync)
0335     globalSchedule_->processOneGlobalAsync<T>(std::move(iTaskHolder), transitionInfo, token, cleaningUpAfterException);
0336   }
0337 
0338 }  // namespace edm
0339 #endif