File indexing completed on 2024-04-06 12:18:45
0001
0002 #include <algorithm>
0003 #include <chrono>
0004 #include <ctime>
0005
0006
0007 #include <fmt/printf.h>
0008
0009
0010 #include "DQMServices/Core/interface/DQMStore.h"
0011 #include "FWCore/ParameterSet/interface/EmptyGroupDescription.h"
0012 #include "FWCore/Utilities/interface/TimeOfDay.h"
0013 #include "HLTrigger/Timer/interface/processor_model.h"
0014 #include "ThroughputService.h"
0015
0016 using namespace std::literals;
0017
0018
0019 void ThroughputService::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0020 edm::ParameterSetDescription desc;
0021 desc.addUntracked<uint32_t>("eventRange", 10000)->setComment("Preallocate a buffer for N events");
0022 desc.addUntracked<uint32_t>("eventResolution", 1)->setComment("Sample the processing time every N events");
0023 desc.addUntracked<bool>("printEventSummary", false);
0024 desc.ifValue(edm::ParameterDescription<bool>("enableDQM", true, false),
0025
0026 true >> (edm::ParameterDescription<bool>("dqmPathByProcesses", false, false) and
0027 edm::ParameterDescription<std::string>("dqmPath", "HLT/Throughput", false) and
0028 edm::ParameterDescription<double>("timeRange", 60000.0, false) and
0029 edm::ParameterDescription<double>("timeResolution", 10.0, false)) or
0030
0031 false >> edm::EmptyGroupDescription());
0032 descriptions.add("ThroughputService", desc);
0033 }
0034
0035 ThroughputService::ThroughputService(const edm::ParameterSet& config, edm::ActivityRegistry& registry)
0036 :
0037 m_startup(std::chrono::system_clock::now()),
0038
0039 m_resolution(config.getUntrackedParameter<uint32_t>("eventResolution")),
0040 m_counter(0),
0041 m_events(config.getUntrackedParameter<uint32_t>("eventRange") / m_resolution),
0042 m_print_event_summary(config.getUntrackedParameter<bool>("printEventSummary")),
0043 m_enable_dqm(config.getUntrackedParameter<bool>("enableDQM")),
0044 m_dqm_bynproc(m_enable_dqm ? config.getUntrackedParameter<bool>("dqmPathByProcesses") : false),
0045 m_dqm_path(m_enable_dqm ? config.getUntrackedParameter<std::string>("dqmPath") : ""),
0046 m_time_range(m_enable_dqm ? config.getUntrackedParameter<double>("timeRange") : 0.),
0047 m_time_resolution(m_enable_dqm ? config.getUntrackedParameter<double>("timeResolution") : 0.) {
0048 m_events.clear();
0049 registry.watchPreallocate(this, &ThroughputService::preallocate);
0050 registry.watchPreGlobalBeginRun(this, &ThroughputService::preGlobalBeginRun);
0051 registry.watchPreSourceEvent(this, &ThroughputService::preSourceEvent);
0052 registry.watchPostEvent(this, &ThroughputService::postEvent);
0053 registry.watchPostEndJob(this, &ThroughputService::postEndJob);
0054 }
0055
0056 void ThroughputService::preallocate(edm::service::SystemBounds const& bounds) {
0057 auto concurrent_streams = bounds.maxNumberOfStreams();
0058 auto concurrent_threads = bounds.maxNumberOfThreads();
0059
0060 if (m_enable_dqm and m_dqm_bynproc)
0061 m_dqm_path += fmt::sprintf(
0062 "/Running on %s with %d streams on %d threads", processor_model, concurrent_streams, concurrent_threads);
0063 }
0064
0065 void ThroughputService::preGlobalBeginRun(edm::GlobalContext const& gc) {
0066
0067
0068 if (m_enable_dqm and not edm::Service<DQMStore>().isAvailable()) {
0069
0070 m_enable_dqm = false;
0071 edm::LogWarning("ThroughputService") << "The DQMStore is not avalable, the DQM plots will not be generated";
0072 }
0073
0074 if (m_enable_dqm) {
0075 std::string y_axis_title = fmt::sprintf("events / %g s", m_time_resolution);
0076 unsigned int bins = std::round(m_time_range / m_time_resolution);
0077 double range = bins * m_time_resolution;
0078
0079
0080
0081 auto safe_for_dqm = "/ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-+=_()# "s;
0082 for (auto& c : m_dqm_path)
0083 if (safe_for_dqm.find(c) == std::string::npos)
0084 c = '_';
0085
0086
0087 auto bookTransactionCallback = [&, this](DQMStore::IBooker& booker, DQMStore::IGetter&) {
0088 auto scope = dqm::reco::DQMStore::IBooker::UseRunScope(booker);
0089 booker.setCurrentFolder(m_dqm_path);
0090 m_sourced_events = booker.book1D("throughput_sourced", "Throughput (sourced events)", bins, 0., range);
0091 m_sourced_events->setXTitle("time [s]");
0092 m_sourced_events->setYTitle(y_axis_title);
0093 m_retired_events = booker.book1D("throughput_retired", "Throughput (retired events)", bins, 0., range);
0094 m_retired_events->setXTitle("time [s]");
0095 m_retired_events->setYTitle(y_axis_title);
0096 };
0097
0098
0099 edm::Service<DQMStore>()->meBookerGetter(bookTransactionCallback);
0100 } else {
0101 m_sourced_events = nullptr;
0102 m_retired_events = nullptr;
0103 }
0104 }
0105
0106 void ThroughputService::preSourceEvent(edm::StreamID sid) {
0107 auto timestamp = std::chrono::system_clock::now();
0108 auto interval = std::chrono::duration_cast<std::chrono::duration<double>>(timestamp - m_startup).count();
0109 if (m_enable_dqm) {
0110 m_sourced_events->Fill(interval);
0111 }
0112 }
0113
0114 void ThroughputService::postEvent(edm::StreamContext const& sc) {
0115 auto timestamp = std::chrono::system_clock::now();
0116 auto interval = std::chrono::duration_cast<std::chrono::duration<double>>(timestamp - m_startup).count();
0117 if (m_enable_dqm) {
0118 m_retired_events->Fill(interval);
0119 }
0120 ++m_counter;
0121 if (m_counter % m_resolution == 0) {
0122 m_events.push_back(timestamp);
0123 }
0124 }
0125
0126 void ThroughputService::postEndJob() {
0127 if (m_counter < 2 * m_resolution) {
0128
0129 edm::LogWarning("ThroughputService") << "Not enough events to measure the throughput with a resolution of "
0130 << m_resolution << " events";
0131 return;
0132 }
0133
0134 edm::LogInfo info("ThroughputService");
0135
0136 if (m_print_event_summary) {
0137 for (uint32_t i = 0; i < m_events.size(); ++i) {
0138 info << std::setw(8) << (i + 1) * m_resolution << ", " << std::setprecision(6) << edm::TimeOfDay(m_events[i])
0139 << "\n";
0140 }
0141 info << '\n';
0142 }
0143
0144
0145 uint32_t blocks = m_counter / m_resolution - 1;
0146 std::vector<double> delta(blocks);
0147 for (uint32_t i = 0; i < blocks; ++i) {
0148 delta[i] = std::chrono::duration_cast<std::chrono::duration<double>>(m_events[i + 1] - m_events[i]).count();
0149 }
0150
0151 double time_avg = TMath::Mean(delta.begin(), delta.begin() + blocks);
0152 double time_dev = TMath::StdDev(delta.begin(), delta.begin() + blocks);
0153
0154 double throughput_avg = double(m_resolution) / time_avg;
0155 double throughput_dev = double(m_resolution) * time_dev / time_avg / time_avg;
0156
0157 info << "Average throughput: " << throughput_avg << " ± " << throughput_dev << " ev/s";
0158 }
0159
0160
0161 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0162 DEFINE_FWK_SERVICE(ThroughputService);