1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
|
// C++ headers
#include <algorithm>
#include <chrono>
#include <ctime>
// {fmt} headers
#include <fmt/printf.h>
// CMSSW headers
#include "DQMServices/Core/interface/DQMStore.h"
#include "FWCore/ParameterSet/interface/EmptyGroupDescription.h"
#include "FWCore/Utilities/interface/TimeOfDay.h"
#include "HLTrigger/Timer/interface/processor_model.h"
#include "ThroughputService.h"
using namespace std::literals;
// describe the module's configuration
void ThroughputService::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
edm::ParameterSetDescription desc;
desc.addUntracked<uint32_t>("eventRange", 10000)->setComment("Preallocate a buffer for N events");
desc.addUntracked<uint32_t>("eventResolution", 1)->setComment("Sample the processing time every N events");
desc.addUntracked<bool>("printEventSummary", false);
desc.ifValue(edm::ParameterDescription<bool>("enableDQM", true, false), // "false" means untracked
// parameters if "enableDQM" is "true"
true >> (edm::ParameterDescription<bool>("dqmPathByProcesses", false, false) and
edm::ParameterDescription<std::string>("dqmPath", "HLT/Throughput", false) and
edm::ParameterDescription<double>("timeRange", 60000.0, false) and
edm::ParameterDescription<double>("timeResolution", 10.0, false)) or
// parameters if "enableDQM" is "false"
false >> edm::EmptyGroupDescription());
descriptions.add("ThroughputService", desc);
}
ThroughputService::ThroughputService(const edm::ParameterSet& config, edm::ActivityRegistry& registry)
: // startup time
m_startup(std::chrono::system_clock::now()),
// configuration
m_resolution(config.getUntrackedParameter<uint32_t>("eventResolution")),
m_counter(0),
m_events(config.getUntrackedParameter<uint32_t>("eventRange") / m_resolution), // allocate initial size
m_print_event_summary(config.getUntrackedParameter<bool>("printEventSummary")),
m_enable_dqm(config.getUntrackedParameter<bool>("enableDQM")),
m_dqm_bynproc(m_enable_dqm ? config.getUntrackedParameter<bool>("dqmPathByProcesses") : false),
m_dqm_path(m_enable_dqm ? config.getUntrackedParameter<std::string>("dqmPath") : ""),
m_time_range(m_enable_dqm ? config.getUntrackedParameter<double>("timeRange") : 0.),
m_time_resolution(m_enable_dqm ? config.getUntrackedParameter<double>("timeResolution") : 0.) {
m_events.clear(); // erases all elements, but does not free internal arrays
registry.watchPreallocate(this, &ThroughputService::preallocate);
registry.watchPreGlobalBeginRun(this, &ThroughputService::preGlobalBeginRun);
registry.watchPreSourceEvent(this, &ThroughputService::preSourceEvent);
registry.watchPostEvent(this, &ThroughputService::postEvent);
registry.watchPostEndJob(this, &ThroughputService::postEndJob);
}
void ThroughputService::preallocate(edm::service::SystemBounds const& bounds) {
auto concurrent_streams = bounds.maxNumberOfStreams();
auto concurrent_threads = bounds.maxNumberOfThreads();
if (m_enable_dqm and m_dqm_bynproc)
m_dqm_path += fmt::sprintf(
"/Running on %s with %d streams on %d threads", processor_model, concurrent_streams, concurrent_threads);
}
void ThroughputService::preGlobalBeginRun(edm::GlobalContext const& gc) {
// if the DQMStore is available, book the DQM histograms
// check that the DQMStore service is available
if (m_enable_dqm and not edm::Service<DQMStore>().isAvailable()) {
// the DQMStore is not available, disable all DQM plots
m_enable_dqm = false;
edm::LogWarning("ThroughputService") << "The DQMStore is not avalable, the DQM plots will not be generated";
}
if (m_enable_dqm) {
std::string y_axis_title = fmt::sprintf("events / %g s", m_time_resolution);
unsigned int bins = std::round(m_time_range / m_time_resolution);
double range = bins * m_time_resolution;
// clean characters that are deemed unsafe for DQM
// see the definition of `s_safe` in DQMServices/Core/src/DQMStore.cc
auto safe_for_dqm = "/ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-+=_()# "s;
for (auto& c : m_dqm_path)
if (safe_for_dqm.find(c) == std::string::npos)
c = '_';
// define a callback that can book the histograms
auto bookTransactionCallback = [&, this](DQMStore::IBooker& booker, DQMStore::IGetter&) {
auto scope = dqm::reco::DQMStore::IBooker::UseRunScope(booker);
booker.setCurrentFolder(m_dqm_path);
m_sourced_events = booker.book1D("throughput_sourced", "Throughput (sourced events)", bins, 0., range);
m_sourced_events->setXTitle("time [s]");
m_sourced_events->setYTitle(y_axis_title);
m_retired_events = booker.book1D("throughput_retired", "Throughput (retired events)", bins, 0., range);
m_retired_events->setXTitle("time [s]");
m_retired_events->setYTitle(y_axis_title);
};
// book MonitorElement's for this run
edm::Service<DQMStore>()->meBookerGetter(bookTransactionCallback);
} else {
m_sourced_events = nullptr;
m_retired_events = nullptr;
}
}
void ThroughputService::preSourceEvent(edm::StreamID sid) {
auto timestamp = std::chrono::system_clock::now();
auto interval = std::chrono::duration_cast<std::chrono::duration<double>>(timestamp - m_startup).count();
if (m_enable_dqm) {
m_sourced_events->Fill(interval);
}
}
void ThroughputService::postEvent(edm::StreamContext const& sc) {
auto timestamp = std::chrono::system_clock::now();
auto interval = std::chrono::duration_cast<std::chrono::duration<double>>(timestamp - m_startup).count();
if (m_enable_dqm) {
m_retired_events->Fill(interval);
}
++m_counter;
if (m_counter % m_resolution == 0) {
m_events.push_back(timestamp);
}
}
void ThroughputService::postEndJob() {
if (m_counter < 2 * m_resolution) {
// not enough mesurements to estimate the throughput
edm::LogWarning("ThroughputService") << "Not enough events to measure the throughput with a resolution of "
<< m_resolution << " events";
return;
}
edm::LogInfo info("ThroughputService");
if (m_print_event_summary) {
for (uint32_t i = 0; i < m_events.size(); ++i) {
info << std::setw(8) << (i + 1) * m_resolution << ", " << std::setprecision(6) << edm::TimeOfDay(m_events[i])
<< "\n";
}
info << '\n';
}
// measure the time to process each block of m_resolution events
uint32_t blocks = m_counter / m_resolution - 1;
std::vector<double> delta(blocks);
for (uint32_t i = 0; i < blocks; ++i) {
delta[i] = std::chrono::duration_cast<std::chrono::duration<double>>(m_events[i + 1] - m_events[i]).count();
}
// measure the average and standard deviation of the time to process m_resolution
double time_avg = TMath::Mean(delta.begin(), delta.begin() + blocks);
double time_dev = TMath::StdDev(delta.begin(), delta.begin() + blocks);
// compute the throughput and its standard deviation across the job
double throughput_avg = double(m_resolution) / time_avg;
double throughput_dev = double(m_resolution) * time_dev / time_avg / time_avg;
info << "Average throughput: " << throughput_avg << " ± " << throughput_dev << " ev/s";
}
// declare ThroughputService as a framework Service
#include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
DEFINE_FWK_SERVICE(ThroughputService);
|