File indexing completed on 2023-04-08 00:12:55
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 #include <memory>
0013
0014 #include <vector>
0015 #include <atomic>
0016 #include <chrono>
0017 #include <iostream>
0018
0019 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0020 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0021 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0022 #include "FWCore/Utilities/interface/CPUTimer.h"
0023 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0024 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0025 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0026 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0027
0028 namespace edm {
0029 namespace service {
0030 class ConcurrentModuleTimer {
0031 public:
0032 ConcurrentModuleTimer(edm::ParameterSet const& iConfig, edm::ActivityRegistry& iAR);
0033 ~ConcurrentModuleTimer();
0034 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0035
0036 private:
0037 void start();
0038 void stop();
0039
0040 bool trackModule(ModuleCallingContext const& iContext) const;
0041 std::unique_ptr<std::atomic<std::chrono::steady_clock::rep>[]> m_timeSums;
0042 std::vector<std::string> m_modulesToExclude;
0043 std::vector<unsigned int> m_excludedModuleIds;
0044 std::chrono::steady_clock::time_point m_time;
0045 unsigned int m_nTimeSums = 0;
0046 unsigned int m_nModules;
0047 unsigned int m_maxNModules = 0;
0048 const unsigned int m_padding;
0049 std::atomic<bool> m_spinLock;
0050 bool m_startedTiming;
0051 const bool m_excludeSource;
0052 const bool m_trackGlobalBeginRun;
0053 };
0054 }
0055 }
0056
0057 using namespace edm::service;
0058
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069
0070
0071
0072
0073 ConcurrentModuleTimer::ConcurrentModuleTimer(edm::ParameterSet const& iConfig, edm::ActivityRegistry& iReg)
0074 : m_modulesToExclude(iConfig.getUntrackedParameter<std::vector<std::string>>("modulesToExclude")),
0075 m_time(),
0076 m_nModules(0),
0077 m_padding(iConfig.getUntrackedParameter<unsigned int>("padding")),
0078 m_spinLock{false},
0079 m_startedTiming(false),
0080 m_excludeSource(iConfig.getUntrackedParameter<bool>("excludeSource")),
0081 m_trackGlobalBeginRun(iConfig.getUntrackedParameter<bool>("trackGlobalBeginRun")) {
0082 if (not m_modulesToExclude.empty()) {
0083 iReg.watchPreModuleConstruction([this](ModuleDescription const& iMod) {
0084 for (auto const& name : m_modulesToExclude) {
0085 if (iMod.moduleLabel() == name) {
0086 m_excludedModuleIds.push_back(iMod.id());
0087 break;
0088 }
0089 }
0090 });
0091 iReg.watchPreModuleDestruction([this](ModuleDescription const& iMod) {
0092 auto found = std::find(m_excludedModuleIds.begin(), m_excludedModuleIds.end(), iMod.id());
0093 if (found != m_excludedModuleIds.end()) {
0094 m_excludedModuleIds.erase(found);
0095 }
0096 });
0097 iReg.watchPreModuleEvent([this](StreamContext const&, ModuleCallingContext const& iContext) {
0098 if (trackModule(iContext)) {
0099 start();
0100 }
0101 });
0102 iReg.watchPostModuleEvent([this](StreamContext const&, ModuleCallingContext const& iContext) {
0103 if (trackModule(iContext)) {
0104 stop();
0105 }
0106 });
0107
0108 iReg.watchPreModuleEventDelayedGet([this](StreamContext const&, ModuleCallingContext const& iContext) {
0109 if (trackModule(iContext)) {
0110 if (iContext.state() == ModuleCallingContext::State::kRunning) {
0111 stop();
0112 }
0113 }
0114 });
0115 iReg.watchPostModuleEventDelayedGet([this](StreamContext const&, ModuleCallingContext const& iContext) {
0116 if (trackModule(iContext)) {
0117 if (iContext.state() == ModuleCallingContext::State::kRunning) {
0118 start();
0119 }
0120 }
0121 });
0122
0123 } else {
0124
0125 iReg.watchPreModuleEvent([this](StreamContext const&, ModuleCallingContext const&) { start(); });
0126 iReg.watchPostModuleEvent([this](StreamContext const&, ModuleCallingContext const&) { stop(); });
0127
0128 iReg.watchPreModuleEventDelayedGet([this](StreamContext const&, ModuleCallingContext const& iContext) {
0129 if (iContext.state() == ModuleCallingContext::State::kRunning) {
0130 stop();
0131 }
0132 });
0133 iReg.watchPostModuleEventDelayedGet([this](StreamContext const&, ModuleCallingContext const& iContext) {
0134 if (iContext.state() == ModuleCallingContext::State::kRunning) {
0135 start();
0136 }
0137 });
0138 if (m_trackGlobalBeginRun) {
0139 iReg.watchPreModuleGlobalBeginRun([this](GlobalContext const&, ModuleCallingContext const&) {
0140 if (not m_startedTiming) {
0141 m_time = std::chrono::steady_clock::now();
0142 m_startedTiming = true;
0143 }
0144
0145 start();
0146 });
0147 iReg.watchPostModuleGlobalBeginRun([this](GlobalContext const&, ModuleCallingContext const&) { stop(); });
0148 }
0149 }
0150
0151 iReg.watchPreallocate([this](edm::service::SystemBounds const& iBounds) {
0152 m_nTimeSums = iBounds.maxNumberOfThreads() + 1 + m_padding;
0153 m_timeSums = std::make_unique<std::atomic<std::chrono::steady_clock::rep>[]>(m_nTimeSums);
0154 for (unsigned int i = 0; i < m_nTimeSums; ++i) {
0155 m_timeSums[i] = 0;
0156 }
0157 });
0158
0159 iReg.watchPreSourceEvent([this](StreamID) {
0160 if (not m_startedTiming) {
0161 m_time = std::chrono::steady_clock::now();
0162 m_startedTiming = true;
0163 }
0164 if (not m_excludeSource) {
0165 start();
0166 }
0167 });
0168 if (not m_excludeSource) {
0169 iReg.watchPostSourceEvent([this](StreamID) { stop(); });
0170 }
0171 }
0172
0173 ConcurrentModuleTimer::~ConcurrentModuleTimer() {
0174 std::cout << "Maximum concurrent running modules: " << m_maxNModules << std::endl;
0175 std::cout << "Fraction of time running n Modules simultaneously" << std::endl;
0176 for (unsigned int i = 0; i < m_nTimeSums; ++i) {
0177 std::cout << i << " " << m_timeSums[i] / double(m_timeSums[0]) << " " << m_timeSums[i] << std::endl;
0178 }
0179 }
0180
0181
0182
0183
0184
0185
0186
0187
0188
0189
0190
0191
0192
0193
0194
0195
0196
0197
0198
0199
0200
0201 void ConcurrentModuleTimer::start() {
0202 auto const newTime = std::chrono::steady_clock::now();
0203 std::chrono::steady_clock::time_point oldTime;
0204 bool expected = false;
0205 unsigned int nModules;
0206 while (not m_spinLock.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
0207 expected = false;
0208 }
0209 {
0210 oldTime = m_time;
0211 m_time = newTime;
0212 nModules = ++m_nModules;
0213 if (nModules > m_maxNModules) {
0214 m_maxNModules = nModules;
0215 }
0216 m_spinLock.store(false, std::memory_order_release);
0217 }
0218 assert(nModules < m_nTimeSums);
0219 auto diff = newTime - oldTime;
0220 for (unsigned int i = 0; i < nModules; ++i) {
0221 m_timeSums[i].fetch_add(diff.count());
0222 }
0223 }
0224
0225 void ConcurrentModuleTimer::stop() {
0226 auto const newTime = std::chrono::steady_clock::now();
0227 std::chrono::steady_clock::time_point oldTime;
0228 bool expected = false;
0229 unsigned int nModules;
0230 while (not m_spinLock.compare_exchange_weak(expected, true, std::memory_order_acq_rel)) {
0231 expected = false;
0232 }
0233 {
0234 oldTime = m_time;
0235 m_time = newTime;
0236 nModules = m_nModules--;
0237 m_spinLock.store(false, std::memory_order_release);
0238 }
0239 assert(nModules < m_nTimeSums);
0240 auto diff = newTime - oldTime;
0241 for (unsigned int i = 0; i <= nModules; ++i) {
0242 m_timeSums[i].fetch_add(diff.count());
0243 }
0244 }
0245
0246
0247
0248
0249 bool ConcurrentModuleTimer::trackModule(ModuleCallingContext const& iContext) const {
0250 auto modId = iContext.moduleDescription()->id();
0251 for (auto const id : m_excludedModuleIds) {
0252 if (modId == id) {
0253 return false;
0254 }
0255 }
0256 return true;
0257 }
0258
0259
0260
0261
0262 void ConcurrentModuleTimer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0263 edm::ParameterSetDescription desc;
0264 desc.addUntracked<std::vector<std::string>>("modulesToExclude", std::vector<std::string>{})
0265 ->setComment("Module labels to exclude from the timing measurements");
0266 desc.addUntracked<bool>("excludeSource", false)->setComment("Exclude the time the source is running");
0267 desc.addUntracked<unsigned int>("padding", 0)
0268 ->setComment(
0269 "[Expert use only] Extra possible concurrent modules beyond thread count.\n Only useful in debugging "
0270 "possible framework scheduling problems.");
0271 desc.addUntracked<bool>("trackGlobalBeginRun", false)
0272 ->setComment("Check for concurrent modules during global begin run");
0273 descriptions.add("ConcurrentModuleTimer", desc);
0274 }
0275
0276 DEFINE_FWK_SERVICE(ConcurrentModuleTimer);