Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:18:45

0001 // C++ headers
0002 #include <algorithm>
0003 #include <chrono>
0004 #include <ctime>
0005 
0006 // {fmt} headers
0007 #include <fmt/printf.h>
0008 
0009 // CMSSW headers
0010 #include "DQMServices/Core/interface/DQMStore.h"
0011 #include "FWCore/ParameterSet/interface/EmptyGroupDescription.h"
0012 #include "FWCore/Utilities/interface/TimeOfDay.h"
0013 #include "HLTrigger/Timer/interface/processor_model.h"
0014 #include "ThroughputService.h"
0015 
0016 using namespace std::literals;
0017 
0018 // describe the module's configuration
0019 void ThroughputService::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0020   edm::ParameterSetDescription desc;
0021   desc.addUntracked<uint32_t>("eventRange", 10000)->setComment("Preallocate a buffer for N events");
0022   desc.addUntracked<uint32_t>("eventResolution", 1)->setComment("Sample the processing time every N events");
0023   desc.addUntracked<bool>("printEventSummary", false);
0024   desc.ifValue(edm::ParameterDescription<bool>("enableDQM", true, false),  // "false" means untracked
0025                // parameters if "enableDQM" is "true"
0026                true >> (edm::ParameterDescription<bool>("dqmPathByProcesses", false, false) and
0027                         edm::ParameterDescription<std::string>("dqmPath", "HLT/Throughput", false) and
0028                         edm::ParameterDescription<double>("timeRange", 60000.0, false) and
0029                         edm::ParameterDescription<double>("timeResolution", 10.0, false)) or
0030                    // parameters if "enableDQM" is "false"
0031                    false >> edm::EmptyGroupDescription());
0032   descriptions.add("ThroughputService", desc);
0033 }
0034 
0035 ThroughputService::ThroughputService(const edm::ParameterSet& config, edm::ActivityRegistry& registry)
0036     :  // startup time
0037       m_startup(std::chrono::system_clock::now()),
0038       // configuration
0039       m_resolution(config.getUntrackedParameter<uint32_t>("eventResolution")),
0040       m_counter(0),
0041       m_events(config.getUntrackedParameter<uint32_t>("eventRange") / m_resolution),  // allocate initial size
0042       m_print_event_summary(config.getUntrackedParameter<bool>("printEventSummary")),
0043       m_enable_dqm(config.getUntrackedParameter<bool>("enableDQM")),
0044       m_dqm_bynproc(m_enable_dqm ? config.getUntrackedParameter<bool>("dqmPathByProcesses") : false),
0045       m_dqm_path(m_enable_dqm ? config.getUntrackedParameter<std::string>("dqmPath") : ""),
0046       m_time_range(m_enable_dqm ? config.getUntrackedParameter<double>("timeRange") : 0.),
0047       m_time_resolution(m_enable_dqm ? config.getUntrackedParameter<double>("timeResolution") : 0.) {
0048   m_events.clear();  // erases all elements, but does not free internal arrays
0049   registry.watchPreallocate(this, &ThroughputService::preallocate);
0050   registry.watchPreGlobalBeginRun(this, &ThroughputService::preGlobalBeginRun);
0051   registry.watchPreSourceEvent(this, &ThroughputService::preSourceEvent);
0052   registry.watchPostEvent(this, &ThroughputService::postEvent);
0053   registry.watchPostEndJob(this, &ThroughputService::postEndJob);
0054 }
0055 
0056 void ThroughputService::preallocate(edm::service::SystemBounds const& bounds) {
0057   auto concurrent_streams = bounds.maxNumberOfStreams();
0058   auto concurrent_threads = bounds.maxNumberOfThreads();
0059 
0060   if (m_enable_dqm and m_dqm_bynproc)
0061     m_dqm_path += fmt::sprintf(
0062         "/Running on %s with %d streams on %d threads", processor_model, concurrent_streams, concurrent_threads);
0063 }
0064 
0065 void ThroughputService::preGlobalBeginRun(edm::GlobalContext const& gc) {
0066   // if the DQMStore is available, book the DQM histograms
0067   // check that the DQMStore service is available
0068   if (m_enable_dqm and not edm::Service<DQMStore>().isAvailable()) {
0069     // the DQMStore is not available, disable all DQM plots
0070     m_enable_dqm = false;
0071     edm::LogWarning("ThroughputService") << "The DQMStore is not avalable, the DQM plots will not be generated";
0072   }
0073 
0074   if (m_enable_dqm) {
0075     std::string y_axis_title = fmt::sprintf("events / %g s", m_time_resolution);
0076     unsigned int bins = std::round(m_time_range / m_time_resolution);
0077     double range = bins * m_time_resolution;
0078 
0079     // clean characters that are deemed unsafe for DQM
0080     // see the definition of `s_safe` in DQMServices/Core/src/DQMStore.cc
0081     auto safe_for_dqm = "/ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-+=_()# "s;
0082     for (auto& c : m_dqm_path)
0083       if (safe_for_dqm.find(c) == std::string::npos)
0084         c = '_';
0085 
0086     // define a callback that can book the histograms
0087     auto bookTransactionCallback = [&, this](DQMStore::IBooker& booker, DQMStore::IGetter&) {
0088       auto scope = dqm::reco::DQMStore::IBooker::UseRunScope(booker);
0089       booker.setCurrentFolder(m_dqm_path);
0090       m_sourced_events = booker.book1D("throughput_sourced", "Throughput (sourced events)", bins, 0., range);
0091       m_sourced_events->setXTitle("time [s]");
0092       m_sourced_events->setYTitle(y_axis_title);
0093       m_retired_events = booker.book1D("throughput_retired", "Throughput (retired events)", bins, 0., range);
0094       m_retired_events->setXTitle("time [s]");
0095       m_retired_events->setYTitle(y_axis_title);
0096     };
0097 
0098     // book MonitorElement's for this run
0099     edm::Service<DQMStore>()->meBookerGetter(bookTransactionCallback);
0100   } else {
0101     m_sourced_events = nullptr;
0102     m_retired_events = nullptr;
0103   }
0104 }
0105 
0106 void ThroughputService::preSourceEvent(edm::StreamID sid) {
0107   auto timestamp = std::chrono::system_clock::now();
0108   auto interval = std::chrono::duration_cast<std::chrono::duration<double>>(timestamp - m_startup).count();
0109   if (m_enable_dqm) {
0110     m_sourced_events->Fill(interval);
0111   }
0112 }
0113 
0114 void ThroughputService::postEvent(edm::StreamContext const& sc) {
0115   auto timestamp = std::chrono::system_clock::now();
0116   auto interval = std::chrono::duration_cast<std::chrono::duration<double>>(timestamp - m_startup).count();
0117   if (m_enable_dqm) {
0118     m_retired_events->Fill(interval);
0119   }
0120   ++m_counter;
0121   if (m_counter % m_resolution == 0) {
0122     m_events.push_back(timestamp);
0123   }
0124 }
0125 
0126 void ThroughputService::postEndJob() {
0127   if (m_counter < 2 * m_resolution) {
0128     // not enough mesurements to estimate the throughput
0129     edm::LogWarning("ThroughputService") << "Not enough events to measure the throughput with a resolution of "
0130                                          << m_resolution << " events";
0131     return;
0132   }
0133 
0134   edm::LogInfo info("ThroughputService");
0135 
0136   if (m_print_event_summary) {
0137     for (uint32_t i = 0; i < m_events.size(); ++i) {
0138       info << std::setw(8) << (i + 1) * m_resolution << ", " << std::setprecision(6) << edm::TimeOfDay(m_events[i])
0139            << "\n";
0140     }
0141     info << '\n';
0142   }
0143 
0144   // measure the time to process each block of m_resolution events
0145   uint32_t blocks = m_counter / m_resolution - 1;
0146   std::vector<double> delta(blocks);
0147   for (uint32_t i = 0; i < blocks; ++i) {
0148     delta[i] = std::chrono::duration_cast<std::chrono::duration<double>>(m_events[i + 1] - m_events[i]).count();
0149   }
0150   // measure the average and standard deviation of the time to process m_resolution
0151   double time_avg = TMath::Mean(delta.begin(), delta.begin() + blocks);
0152   double time_dev = TMath::StdDev(delta.begin(), delta.begin() + blocks);
0153   // compute the throughput and its standard deviation across the job
0154   double throughput_avg = double(m_resolution) / time_avg;
0155   double throughput_dev = double(m_resolution) * time_dev / time_avg / time_avg;
0156 
0157   info << "Average throughput: " << throughput_avg << " ± " << throughput_dev << " ev/s";
0158 }
0159 
0160 // declare ThroughputService as a framework Service
0161 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0162 DEFINE_FWK_SERVICE(ThroughputService);