#ifndef FWCore_MessageLogger_JobReport_h
#define FWCore_MessageLogger_JobReport_h
// -*- C++ -*-
//
// Package:     MessageLogger
// Class  :     JobReport
//
/**\class edm::JobReport

Description: A service that collects job handling information.

Usage:
The JobReport service collects 'job handling' information (currently
file handling) from several sources, collates the information, and
at appropriate intervals, reports the information to the job report,
through the MessageLogger.

CMS-THREADING Some notes about thread safety:

1. It is assumed non-experts will turn the job report on
and look at the XML output and NOT call the available public
functions. Many of the available service functions can be
called at times that are not thread safe (causing data
races or interleaved output). The following notes are for
Framework experts and detail the assumptions made when
modifying JobReport to run in the multithreaded Framework.

2. We assume the primary input source performs its activity
serially. There can be multiple secondary input sources running
concurrently.

3. We assume the following sequence of activities where the lines
of asterisks indicate synchronization points:

****************
open primary input file
nothing else running concurrently
Also respondToOpenInputFiles serially
****************
open output files serially
nothing else running concurrently
****************
The primary source runs its other tasks concurrently
with the secondary sources running their tasks and
modules running their tasks.
****************
close primary input file
nothing else running concurrently
****************
close output files serially
nothing else running concurrently
****************
repeat the above (the output file opens and closes
are optional except for the first and last)
***********************
All endJob and postEndJob activities occur serially
not concurrently
************************

4. We assume that a single instance of an OutputModule
will only be running on one thread at a time.
Other instances of that type of OutputModule may be running
concurrently. There are several functions where this is
an issue. We have discussed that in the future we might
decide to run the OutputModule for multiple events
concurrently. That would require further modifications
of either the JobReport or the OutputModule.

5. For Primary and SecondaryFile input sources (not
SecondarySource) the calls to reportBranchName from the
delayed reader need to be serialized.

*/
//
// Original Author:  Marc Paterno
//

/*
Changes Log 1: 2009/01/14 10:29:00, Natalia Garcia Nebot
        Modified and added some methods to report CPU and memory information from /proc/cpuinfo
        and /proc/meminfo files and Memory statistics
*/

#include "DataFormats/Provenance/interface/RunLumiEventNumber.h"
#include "DataFormats/Provenance/interface/ParameterSetID.h"
#include "DataFormats/Provenance/interface/ProcessConfigurationID.h"
#include "FWCore/Utilities/interface/InputType.h"
#include "FWCore/Utilities/interface/get_underlying_safe.h"

#include <atomic>
#include <cstddef>
#include <iosfwd>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <string_view>
#include <vector>

#include "oneapi/tbb/concurrent_unordered_map.h"
#include "oneapi/tbb/concurrent_vector.h"

namespace edm {

  class JobReport {
  public:
    typedef unsigned int RunNumber;
    typedef std::size_t Token;

    struct RunReport {
      RunNumber runNumber;
      std::map<unsigned int, unsigned long> lumiSectionsToNEvents;
    };

    /**\struct InputFile

      Description: Holds information about an InputFile.

      Usage: The struct InputFile is a collection of the information that
      Data Handling wishes to accumulate about the use of a file that has
      been opened for input.
      */

    struct InputFile {
      typedef std::vector<std::string> StringVector;

      std::string logicalFileName;
      std::string physicalFileName;
      std::string catalog;
      std::string inputType;             // primaryFiles, secondaryFiles, mixingFiles
      std::string inputSourceClassName;  // class which created the file
      std::string moduleLabel;           // name of class instance
      std::string guid;
      std::size_t numEventsRead;
      StringVector branchNames;
      std::map<RunNumber, RunReport> runReports;
      bool fileHasBeenClosed;
      std::set<std::string> fastClonedBranches;
    };

    /**\struct OutputFile

      Description: Holds information about an OutputFile.

      Usage: The struct OutputFile is a collection of the information that
      Data Handling wishes to accumulate about the use of a file that has
      been opened for output.
      */

    struct OutputFile {
      typedef InputFile::StringVector StringVector;

      std::string logicalFileName;
      std::string physicalFileName;
      std::string catalog;
      std::string outputModuleClassName;
      std::string moduleLabel;  // name of class instance
      std::string guid;
      std::string dataType;
      std::string branchHash;
      std::size_t numEventsWritten;
      StringVector branchNames;
      std::vector<Token> contributingInputs;
      oneapi::tbb::concurrent_vector<Token> contributingInputsSecSource;
      std::map<std::string, bool> fastCopyingInputs;
      std::map<RunNumber, RunReport> runReports;
      bool fileHasBeenClosed;
    };

    class AtomicLongLong {
    public:
      AtomicLongLong() : value_(0) {}
      AtomicLongLong(AtomicLongLong const& r) : value_(r.value_.load()) {}
      std::atomic<long long>& value() { return value_; }
      std::atomic<long long> const& value() const { return value_; }

    private:
      std::atomic<long long> value_;
    };

    struct JobReportImpl {
      JobReportImpl(JobReportImpl const&) = delete;
      JobReportImpl& operator=(JobReportImpl const&) = delete;
      JobReportImpl(JobReportImpl&&) = delete;
      JobReportImpl& operator=(JobReportImpl&&) = delete;

      InputFile& getInputFileForToken(InputType inputType, Token t);
      OutputFile& getOutputFileForToken(Token t);

      /*
         * Add the input file token provided to every output
         * file currently available.
         * Used whenever a new input file is opened: its token
         * is added to all open output files as a contributor.
         */
      void insertInputForOutputs(InputType inputType, Token t);

      /*
         * Associate a Lumi Section to all open output files
         *
         */
      void associateLumiSection(JobReport::Token token,
                                unsigned int runNumber,
                                unsigned int lumiSection,
                                unsigned long nEvents);

      /*
         * Associate a Lumi Section to all open input files
         *
         */
      void associateInputLumiSection(unsigned int runNumber, unsigned int lumiSection);

      /*
         * Associate a run to all open output files
         */
      void associateRun(JobReport::Token token, unsigned int runNumber);

      /*
         * Associate a run to all open input files
         */
      void associateInputRun(unsigned int runNumber);

      /*
         * Write an InputFile object to the Logger.
         * Generate an XML string for the InputFile instance and dispatch
         * it to the job report via the MessageLogger.
         */
      void writeInputFile(InputFile const& f);

      /*
         * Write an OutputFile object to the Logger.
         * Generate an XML string for the OutputFile provided and
         * dispatch it to the logger.
         * Contributing input tokens are resolved to the input LFN and PFN.
         *
         * TODO: We have not yet addressed the issue of cleaning up
         * non-contributing input files.
         * Also, it is possible to get fake input to output file mappings
         * if an input file is open already when a new output file is opened
         * but the input gets closed without contributing events to the
         * output file due to filtering etc.
         *
         */
      void writeOutputFile(OutputFile const& f);

      /*
         *  Flush all open files to the logger in the event of a problem.
         */
      void flushFiles(void);

      JobReportImpl(std::ostream* iOst) : printedReadBranches_(false), ost_(iOst) {}

      std::ostream const* ost() const { return get_underlying_safe(ost_); }
      std::ostream*& ost() { return get_underlying_safe(ost_); }

      std::vector<InputFile> inputFiles_;
      oneapi::tbb::concurrent_vector<InputFile> inputFilesSecSource_;
      oneapi::tbb::concurrent_vector<OutputFile> outputFiles_;
      std::map<std::string, long long> readBranches_;
      std::map<std::string, long long> readBranchesSecFile_;
      oneapi::tbb::concurrent_unordered_map<std::string, AtomicLongLong> readBranchesSecSource_;
      std::atomic<bool> printedReadBranches_;
      std::vector<InputFile>::size_type lastOpenedPrimaryInputFile_;
      edm::propagate_const<std::ostream*> ost_;
    };

    JobReport();
    // Does not take ownership of pointer
    JobReport(std::ostream* outputStream);

    JobReport& operator=(JobReport const&) = delete;
    JobReport(JobReport const&) = delete;

    ~JobReport();

    // Insert information about the process
    void reportProcess(std::string_view processName,
                       ProcessConfigurationID const& reducedProcessID,
                       ParameterSetID const& psetID);

    /// Report that an input file has been opened.
    /// The returned Token should be used for later identification
    /// of this file.
    Token inputFileOpened(std::string const& physicalFileName,
                          std::string const& logicalFileName,
                          std::string const& catalog,
                          std::string const& inputType,
                          std::string const& inputSourceClassName,
                          std::string const& moduleLabel,
                          std::string const& guid,
                          std::vector<std::string> const& branchNames);

    /// Report that an event has been read from
    /// the file identified by the given Token.
    void eventReadFromFile(InputType inputType, Token fileToken);

    ///
    /// Report the data type of a file after it has been opened.
    /// Needed since the data type isn't known until an event has been
    /// read.
    // CMS-THREADING Current implementation requires that an instance of an
    // OutputModule runs on only one thread at a time.
    void reportDataType(Token fileToken, std::string const& dataType);

    /// Report that the input file identified by the given Token has
    /// been closed. An exception will be thrown if the given Token
    /// was not obtained from inputFileOpened.
    void inputFileClosed(InputType inputType, Token fileToken);

    /// Report that an output file has been opened.
    /// The returned Token should be used for later identification
    /// of this file.
    Token outputFileOpened(std::string const& physicalFileName,
                           std::string const& logicalFileName,
                           std::string const& catalog,
                           std::string const& outputModuleClassName,
                           std::string const& moduleLabel,
                           std::string const& guid,
                           std::string const& dataType,
                           std::string const& branchHash,
                           std::vector<std::string> const& branchNames);

    /// Report that the event with the given id has been written to
    /// the file identified by the given Token.
    // CMS-THREADING Current implementation requires that an instance of an
    // OutputModule runs on only one thread at a time.
    void eventWrittenToFile(Token fileToken, RunNumber_t run, EventNumber_t event);

    /// Report that the output file identified by the given Token has
    /// been closed. An exception will be thrown if the given Token
    /// was not obtained from outputFileOpened.
    void outputFileClosed(Token fileToken);

    void reportSkippedEvent(RunNumber_t run, EventNumber_t event);

    /// API for reporting a Run to the job report.
    /// For output files, call only if the Run is written to
    /// the output file.
    ///
    void reportRunNumber(JobReport::Token token, unsigned int run);

    /// API for reporting a Lumi Section to the job report.
    /// For output files, call only if the lumi section is written to
    /// the output file.
    ///
    void reportLumiSection(JobReport::Token token, unsigned int run, unsigned int lumiSectId, unsigned long nEvents = 0);

    ///
    /// API for reporting a Lumi Section to the job report.
    /// For input files, call only if the lumi section is physically read
    /// from the input file.
    ///
    void reportInputLumiSection(unsigned int run, unsigned int lumiSectId);

    ///
    /// API to report a run read from input
    ///
    void reportInputRunNumber(unsigned int run);

    ///
    /// Report an exception, providing details of the problem as
    /// a short description (e.g. "XXXError") and a long description
    /// (e.g. "XXX crashed because...").
    /// Also overload this method to accept an optional standard exit code
    void reportError(std::string const& shortDesc, std::string const& longDesc, int const& exitCode);

    ///
    /// Report a Unix signal sent to terminate the job early
    ///
    void reportShutdownSignal();

    ///
    /// Report Skipped File
    ///
    /// Report that a file has been skipped because it was not
    /// found.
    void reportSkippedFile(std::string const& pfn, std::string const& lfn);

    void reportFallbackAttempt(std::string const& pfn, std::string const& lfn, std::string const& err);

    void reportAnalysisFile(std::string const& fileName, std::map<std::string, std::string> const& fileData);

    ///
    /// Report Memory statistics
    /// ALTERNATE FORM - USING THIS MAY NECESSITATE CHANGES IN PARSING XML!
    /// Invoked by the Memory service to send an end of job
    /// summary about memory usage for inclusion in the job report
    ///
    void reportMemoryInfo(std::vector<std::string> const& memoryData);

    ///
    /// Report Message statistics
    /// Invoked by the MessageLogger service to send an end of job
    /// summary of the numbers of messages issued in various categories
    /// for inclusion in the job report
    ///
    void reportMessageInfo(std::map<std::string, double> const& messageData);

    ///
    /// Report read branches
    /// Invoked by the source that reads ROOT/EDM files at the end of a job
    /// to report which product branches in the Events tree have been read,
    /// with a count of the number of events for which the branch was read.
    ///
    void reportReadBranches();

    ///  Inform the job report that a branch has been read.
    void reportReadBranch(InputType inputType, std::string const& branchName);

    ///  Inform the job report that branches have been fast cloned.
    void reportFastClonedBranches(std::set<std::string> const& fastClonedBranches, long long nEvents);

    ///
    /// Report the name of the random engine persistency file
    ///
    void reportRandomStateFile(std::string const& name);

    /*
       * Report information about fast copying. Called for each open output file
       * whenever an input file is opened.
       */
    void reportFastCopyingStatus(Token t, std::string const& inputFileName, bool fastCopying);

    ///
    /// Performance Reports
    ///
    /// Two categories: a summary for the entire job, and a report
    /// for a specific module.
    /// Each one requires a performance metric class such
    /// as Timing, Memory, CPU, Trigger etc.
    void reportPerformanceSummary(std::string const& metricClass, std::map<std::string, std::string> const& metrics);

    void reportPerformanceForModule(std::string const& metricClass,
                                    std::string const& moduleName,
                                    std::map<std::string, std::string> const& metrics);

    /// debug/test util
    std::string dumpFiles(void);

  protected:
    edm::propagate_const<std::unique_ptr<JobReportImpl>>& impl() { return impl_; }

  private:
    void temporarilyCloseXML();
    edm::propagate_const<std::unique_ptr<JobReportImpl>> impl_;
    std::mutex write_mutex;
    bool errorLogged_ = false;
  };

  std::ostream& operator<<(std::ostream& os, JobReport::InputFile const& f);
  std::ostream& operator<<(std::ostream& os, JobReport::OutputFile const& f);
}  // namespace edm
#endif