#ifndef FWCore_MessageLogger_JobReport_h
#define FWCore_MessageLogger_JobReport_h
// -*- C++ -*-
//
// Package:     MessageLogger
// Class  :     JobReport
//
/**\class edm::JobReport

Description: A service that collects job handling information.

Usage:
The JobReport service collects 'job handling' information (currently
file handling) from several sources, collates the information and,
at appropriate intervals, reports it to the job report through the
MessageLogger.

CMS-THREADING Some notes about thread safety:

1. It is assumed that non-experts will turn the job report on
and look at the XML output and NOT call the available public
functions. Many of the available service functions can be
called at times that are not thread safe (causing data
races or interleaved output). The following notes are for
Framework experts and detail the assumptions made when
modifying JobReport to run in the multithreaded Framework.

2. We assume the primary input source performs its activity
serially. There can be multiple secondary input sources running
concurrently.

3. We assume the following sequence of activities where the lines
of asterisks indicate synchronization points:

****************
open primary input file
nothing else running concurrently
Also respondToOpenInputFiles serially
****************
open output files serially
nothing else running concurrently
****************
The primary source runs its other tasks concurrently
with the secondary sources running their tasks and
modules running their tasks.
****************
close primary input file
nothing else running concurrently
****************
close output files serially
nothing else running concurrently
****************
repeat the above (the output file opens and closes
are optional except for the first and last)
***********************
All endJob and postEndJob activities occur serially,
not concurrently
************************

4. We assume that a single instance of an OutputModule
will only be running on one thread at a time.
Other instances of that type of OutputModule may be running
concurrently. There are several functions where this is
an issue. We have discussed that in the future we might
decide to run the OutputModule for multiple events
concurrently. That would require further modifications
of either the JobReport or the OutputModule.

5. For Primary and SecondaryFile input sources (not
SecondarySource) the calls to reportBranchName from the
delayed reader need to be serialized.

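Illustration of note 5 (a minimal sketch under stated assumptions, not
CMSSW code): one way to serialize calls that update a plain std::map of
per-branch read counts is to guard the map with a std::mutex.
BranchReadCounter, record, count and countWithThreads are hypothetical
names introduced only for this example, and "exampleBranch" is a made-up
branch name.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical helper: a branch-name -> read-count table whose updates
// are serialized by a mutex (std::map is not safe for concurrent writes).
class BranchReadCounter {
public:
  void record(std::string const& branchName) {
    std::lock_guard<std::mutex> lock(mutex_);
    ++counts_[branchName];
  }

  long long count(std::string const& branchName) const {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = counts_.find(branchName);
    return it == counts_.end() ? 0 : it->second;
  }

private:
  mutable std::mutex mutex_;
  std::map<std::string, long long> counts_;
};

// Starts nThreads threads that each record the same branch nCalls times
// and returns the final count for that branch.
inline long long countWithThreads(int nThreads, int nCalls) {
  assert(nThreads >= 0 && nCalls >= 0);
  BranchReadCounter counter;
  std::vector<std::thread> workers;
  for (int t = 0; t < nThreads; ++t) {
    workers.emplace_back([&counter, nCalls] {
      for (int i = 0; i < nCalls; ++i) {
        counter.record("exampleBranch");
      }
    });
  }
  for (auto& w : workers) {
    w.join();
  }
  return counter.count("exampleBranch");
}
```

Because every increment happens while the lock is held, no update is
lost: countWithThreads(4, 1000) returns 4000.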
*/
//
// Original Author:  Marc Paterno
//

/*
Change Log 1: 2009/01/14 10:29:00, Natalia Garcia Nebot
        Modified and added some methods to report CPU and memory information from /proc/cpuinfo
        and /proc/meminfo files and Memory statistics
*/

#include "DataFormats/Provenance/interface/RunLumiEventNumber.h"
#include "FWCore/Utilities/interface/InputType.h"
#include "FWCore/Utilities/interface/get_underlying_safe.h"

#include <atomic>
#include <cstddef>
#include <iosfwd>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <vector>

#include "oneapi/tbb/concurrent_unordered_map.h"
#include "oneapi/tbb/concurrent_vector.h"

namespace edm {

  class JobReport {
  public:
    typedef unsigned int RunNumber;
    typedef std::size_t Token;

    struct RunReport {
      RunNumber runNumber;
      std::map<unsigned int, unsigned long> lumiSectionsToNEvents;
    };

    /**\struct InputFile

      Description: Holds information about an InputFile.

      Usage: The struct InputFile is a collection of the information that
      Data Handling wishes to accumulate about the use of a file that has
      been opened for input.
      */

    struct InputFile {
      typedef std::vector<std::string> StringVector;

      std::string logicalFileName;
      std::string physicalFileName;
      std::string catalog;
      std::string inputType;             // primaryFiles, secondaryFiles, mixingFiles
      std::string inputSourceClassName;  // class which created the file
      std::string moduleLabel;           // name of class instance
      std::string guid;
      std::size_t numEventsRead;
      StringVector branchNames;
      std::map<RunNumber, RunReport> runReports;
      bool fileHasBeenClosed;
      std::set<std::string> fastClonedBranches;
    };

    /**\struct OutputFile

      Description: Holds information about an OutputFile.

      Usage: The struct OutputFile is a collection of the information that
      Data Handling wishes to accumulate about the use of a file that has
      been opened for output.
      */

    struct OutputFile {
      typedef InputFile::StringVector StringVector;

      std::string logicalFileName;
      std::string physicalFileName;
      std::string catalog;
      std::string outputModuleClassName;
      std::string moduleLabel;  // name of class instance
      std::string guid;
      std::string dataType;
      std::string branchHash;
      std::size_t numEventsWritten;
      StringVector branchNames;
      std::vector<Token> contributingInputs;
      oneapi::tbb::concurrent_vector<Token> contributingInputsSecSource;
      std::map<std::string, bool> fastCopyingInputs;
      std::map<RunNumber, RunReport> runReports;
      bool fileHasBeenClosed;
    };

    class AtomicLongLong {
    public:
      AtomicLongLong() : value_(0) {}
      AtomicLongLong(AtomicLongLong const& r) : value_(r.value_.load()) {}
      std::atomic<long long>& value() { return value_; }
      std::atomic<long long> const& value() const { return value_; }

    private:
      std::atomic<long long> value_;
    };
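    /*
       Illustration (a hedged sketch, not part of CMSSW): std::atomic cannot
       be copied, so it cannot be stored by value in most containers. A
       wrapper whose copy constructor snapshots the value with load(), as
       AtomicLongLong does, is presumably what allows it to be used as a
       mapped type. CopyableCounter and countReads are hypothetical names,
       and std::map stands in for the TBB container to keep the example
       free of external dependencies.

```cpp
#include <atomic>
#include <cassert>
#include <map>
#include <string>
#include <type_traits>
#include <utility>

// std::atomic itself is not copy constructible.
static_assert(!std::is_copy_constructible<std::atomic<long long>>::value,
              "std::atomic is not copy constructible");

// Hypothetical stand-in that mirrors the shape of AtomicLongLong:
// copying takes a snapshot of the current value with load().
class CopyableCounter {
public:
  CopyableCounter() : value_(0) {}
  CopyableCounter(CopyableCounter const& r) : value_(r.value_.load()) {}
  std::atomic<long long>& value() { return value_; }
  std::atomic<long long> const& value() const { return value_; }

private:
  std::atomic<long long> value_;
};

static_assert(std::is_copy_constructible<CopyableCounter>::value,
              "the wrapper can be stored by value in containers");

// Inserts a zero counter for branchName if none exists, increments it
// nReads times, and returns the resulting count.
inline long long countReads(std::map<std::string, CopyableCounter>& counts,
                            std::string const& branchName,
                            int nReads) {
  assert(nReads >= 0);
  auto it = counts.insert(std::make_pair(branchName, CopyableCounter())).first;
  for (int i = 0; i < nReads; ++i) {
    ++it->second.value();  // atomic increment through the wrapper
  }
  return it->second.value().load();
}
```

       Note that the copy is a snapshot only: two copies do not share a
       counter afterwards, so increments must go through the element that
       is stored in the container.
    */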

    struct JobReportImpl {
      JobReportImpl& operator=(JobReportImpl const&) = delete;
      JobReportImpl(JobReportImpl const&) = delete;

      InputFile& getInputFileForToken(InputType inputType, Token t);
      OutputFile& getOutputFileForToken(Token t);

      /*
         * Add the input file token provided to every output
         * file currently available.
         * Used whenever a new input file is opened: its token
         * is added to all open output files as a contributor.
         */
      void insertInputForOutputs(InputType inputType, Token t);

      /*
         * Associate a Lumi Section to all open output files
         *
         */
      void associateLumiSection(JobReport::Token token,
                                unsigned int runNumber,
                                unsigned int lumiSection,
                                unsigned long nEvents);

      /*
         * Associate a Lumi Section to all open input files
         *
         */
      void associateInputLumiSection(unsigned int runNumber, unsigned int lumiSection);

      /*
         * Associate a run to all open output files
         */
      void associateRun(JobReport::Token token, unsigned int runNumber);

      /*
         * Associate a run to all open input files
         */
      void associateInputRun(unsigned int runNumber);

      /*
         * Write an InputFile object to the Logger
         * Generate XML string for InputFile instance and dispatch to
         * job report via MessageLogger
         */
      void writeInputFile(InputFile const& f);

      /*
         * Write an OutputFile object to the Logger
         * Generate an XML string for the OutputFile provided and
         * dispatch it to the logger
         * Contributing input tokens are resolved to the input LFN and PFN
         *
         * TODO: We have not yet addressed cleaning up input files
         * that do not contribute.
         * Also, it is possible to get fake input to output file mappings
         * if an input file is open already when a new output file is opened
         * but the input gets closed without contributing events to the
         * output file due to filtering etc.
         *
         */
      void writeOutputFile(OutputFile const& f);

      /*
         *  Flush all open files to logger in event of a problem.
         */
      void flushFiles(void);

      JobReportImpl(std::ostream* iOst) : printedReadBranches_(false), ost_(iOst) {}

      std::ostream const* ost() const { return get_underlying_safe(ost_); }
      std::ostream*& ost() { return get_underlying_safe(ost_); }

      std::vector<InputFile> inputFiles_;
      oneapi::tbb::concurrent_vector<InputFile> inputFilesSecSource_;
      oneapi::tbb::concurrent_vector<OutputFile> outputFiles_;
      std::map<std::string, long long> readBranches_;
      std::map<std::string, long long> readBranchesSecFile_;
      oneapi::tbb::concurrent_unordered_map<std::string, AtomicLongLong> readBranchesSecSource_;
      std::atomic<bool> printedReadBranches_;
      std::vector<InputFile>::size_type lastOpenedPrimaryInputFile_;
      edm::propagate_const<std::ostream*> ost_;
    };

    JobReport();
    // Does not take ownership of pointer
    JobReport(std::ostream* outputStream);

    JobReport& operator=(JobReport const&) = delete;
    JobReport(JobReport const&) = delete;

    ~JobReport();

    /// Report that an input file has been opened.
    /// The returned Token should be used for later identification
    /// of this file.
    Token inputFileOpened(std::string const& physicalFileName,
                          std::string const& logicalFileName,
                          std::string const& catalog,
                          std::string const& inputType,
                          std::string const& inputSourceClassName,
                          std::string const& moduleLabel,
                          std::string const& guid,
                          std::vector<std::string> const& branchNames);

    /// Report that an event has been read from
    /// the file identified by the given Token.
    void eventReadFromFile(InputType inputType, Token fileToken);

    ///
    /// Report the data type of a file after it has been opened.
    /// Needed since the data type isn't known until an event has been
    /// read.
    // CMS-THREADING Current implementation requires that an instance of an
    // OutputModule run on only one thread at a time.
    void reportDataType(Token fileToken, std::string const& dataType);

    /// Report that the input file identified by the given Token has
    /// been closed. An exception will be thrown if the given Token
    /// was not obtained from inputFileOpened.
    void inputFileClosed(InputType inputType, Token fileToken);

    /// Report that an output file has been opened.
    /// The returned Token should be used for later identification
    /// of this file.
    Token outputFileOpened(std::string const& physicalFileName,
                           std::string const& logicalFileName,
                           std::string const& catalog,
                           std::string const& outputModuleClassName,
                           std::string const& moduleLabel,
                           std::string const& guid,
                           std::string const& dataType,
                           std::string const& branchHash,
                           std::vector<std::string> const& branchNames);

    /// Report that the event with the given id has been written to
    /// the file identified by the given Token.
    // CMS-THREADING Current implementation requires that an instance of an
    // OutputModule run on only one thread at a time.
    void eventWrittenToFile(Token fileToken, RunNumber_t run, EventNumber_t event);

    /// Report that the output file identified by the given Token has
    /// been closed. An exception will be thrown if the given Token
    /// was not obtained from outputFileOpened.
    void outputFileClosed(Token fileToken);

    void reportSkippedEvent(RunNumber_t run, EventNumber_t event);

    /// API for reporting a Run to the job report.
    /// For output files, call only if the Run is written to
    /// the output file.
    ///
    void reportRunNumber(JobReport::Token token, unsigned int run);

    /// API for reporting a Lumi Section to the job report.
    /// For output files, call only if the lumi section is written to
    /// the output file.
    ///
    void reportLumiSection(JobReport::Token token, unsigned int run, unsigned int lumiSectId, unsigned long nEvents = 0);

    ///
    /// API for reporting a Lumi Section to the job report.
    /// For input files, call only if the lumi section is physically read
    /// from the input file.
    ///
    void reportInputLumiSection(unsigned int run, unsigned int lumiSectId);

    ///
    /// API to report a run read from input
    ///
    void reportInputRunNumber(unsigned int run);

    ///
    /// Report an exception, providing details of the problem as
    /// a short description (e.g. "XXXError") and a long description
    /// (e.g. "XXX crashed because...")
    /// Also overload this method to accept an optional standard exit code
    void reportError(std::string const& shortDesc, std::string const& longDesc, int const& exitCode);

    ///
    /// Report Skipped File
    ///
    /// Report that a file has been skipped because it was not
    /// found.
    void reportSkippedFile(std::string const& pfn, std::string const& lfn);

    void reportFallbackAttempt(std::string const& pfn, std::string const& lfn, std::string const& err);

    void reportAnalysisFile(std::string const& fileName, std::map<std::string, std::string> const& fileData);

    ///
    /// Report Memory statistics
    /// ALTERNATE FORM - USING THIS MAY NECESSITATE CHANGES IN PARSING XML!
    /// Invoked by the Memory service to send an end of job
    /// summary about memory usage for inclusion in the job report
    ///
    void reportMemoryInfo(std::vector<std::string> const& memoryData);

    ///
    /// Report Message statistics
    /// Invoked by the MessageLogger service to send an end of job
    /// summary of the number of messages issued in each category
    /// for inclusion in the job report
    ///
    void reportMessageInfo(std::map<std::string, double> const& messageData);

    ///
    /// Report read branches
    /// Invoked by the source that reads ROOT/EDM files at the end of a job
    /// to report which product branches in the Events tree have been read,
    /// with a count of the number of events for which the branch was read.
    ///
    void reportReadBranches();

    ///  Inform the job report that a branch has been read.
    void reportReadBranch(InputType inputType, std::string const& branchName);

    ///  Inform the job report that branches have been fast cloned.
    void reportFastClonedBranches(std::set<std::string> const& fastClonedBranches, long long nEvents);

    ///
    /// Report the name of the random engine persistency file
    ///
    void reportRandomStateFile(std::string const& name);

    /*
       * Report information about fast copying. Called for each open output file
       * whenever an input file is opened.
       */
    void reportFastCopyingStatus(Token t, std::string const& inputFileName, bool fastCopying);

    ///
    /// Performance Reports
    ///
    /// Two categories: a summary for the entire job, and a report
    /// for a specific module.
    /// Each one requires a performance metric class such
    /// as Timing, Memory, CPU, Trigger etc.
    void reportPerformanceSummary(std::string const& metricClass, std::map<std::string, std::string> const& metrics);

    void reportPerformanceForModule(std::string const& metricClass,
                                    std::string const& moduleName,
                                    std::map<std::string, std::string> const& metrics);

    /// debug/test util
    std::string dumpFiles(void);

  protected:
    edm::propagate_const<std::unique_ptr<JobReportImpl>>& impl() { return impl_; }

  private:
    edm::propagate_const<std::unique_ptr<JobReportImpl>> impl_;
    std::mutex write_mutex;
  };

  std::ostream& operator<<(std::ostream& os, JobReport::InputFile const& f);
  std::ostream& operator<<(std::ostream& os, JobReport::OutputFile const& f);
}  // namespace edm
#endif
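
/*
Illustration of the token-based reporting pattern declared above (a
simplified sketch, not the CMSSW implementation): opening a file returns
a Token, later calls identify the file by that Token, and an unknown
Token raises an exception. MiniFileReport and all of its members are
hypothetical names. The exception type used here (std::out_of_range from
std::vector::at) is an assumption, since the header does not say which
exception JobReport throws.

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical, single-threaded stand-in for the open/read/close cycle.
class MiniFileReport {
public:
  typedef std::size_t Token;

  // Registers an opened file and returns the token that identifies it.
  Token fileOpened(std::string const& physicalFileName) {
    files_.push_back(Entry{physicalFileName, 0, false});
    return files_.size() - 1;
  }

  // Counts one event for the file; throws std::out_of_range for a
  // token that fileOpened never returned.
  void eventRead(Token t) { ++files_.at(t).numEvents; }

  // Marks the file as closed; throws std::out_of_range for an unknown token.
  void fileClosed(Token t) { files_.at(t).closed = true; }

  std::size_t numEvents(Token t) const { return files_.at(t).numEvents; }
  bool isClosed(Token t) const { return files_.at(t).closed; }

private:
  struct Entry {
    std::string physicalFileName;
    std::size_t numEvents;
    bool closed;
  };

  std::vector<Entry> files_;
};
```

A caller would keep the Token returned by fileOpened, pass it to
eventRead once per event, and pass it to fileClosed when done. Using an
index into a vector as the Token keeps lookups cheap, at the cost of
never reusing slots for closed files.
*/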