File indexing completed on 2023-03-17 11:03:42
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0012 #include "FWCore/Concurrency/interface/ThreadSafeOutputFileStream.h"
0013 #include "FWCore/Framework/interface/ComponentDescription.h"
0014 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0015 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0016 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0017 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0018 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0019 #include "FWCore/ServiceRegistry/interface/Service.h"
0020 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0021 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0022 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0023 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0024 #include "FWCore/ServiceRegistry/interface/ESModuleCallingContext.h"
0025 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0026 #include "FWCore/Utilities/interface/Algorithms.h"
0027 #include "FWCore/Utilities/interface/OStreamColumn.h"
0028 #include "FWCore/Utilities/interface/Exception.h"
0029 #include "FWCore/Utilities/interface/StdPairHasher.h"
0030 #include "oneapi/tbb/concurrent_unordered_map.h"
0031
0032 #include <atomic>
0033 #include <chrono>
0034 #include <iomanip>
0035 #include <iostream>
0036 #include <sstream>
0037
0038 namespace {
0039
0040 using duration_t = std::chrono::microseconds;
0041 using clock_t = std::chrono::steady_clock;
0042 auto now = clock_t::now;
0043
0044 inline auto stream_id(edm::StreamContext const& cs) { return cs.streamID().value(); }
0045
0046 inline auto module_id(edm::ModuleCallingContext const& mcc) { return mcc.moduleDescription()->id(); }
0047
0048 inline auto module_id(edm::ESModuleCallingContext const& mcc) { return mcc.componentDescription()->id_; }
0049
0050
0051 class StallStatistics {
0052 public:
0053
0054
0055
0056 StallStatistics() = default;
0057
0058 std::string const& label() const { return label_; }
0059 unsigned numberOfStalls() const { return stallCounter_; }
0060
0061 using rep_t = duration_t::rep;
0062
0063 duration_t totalStalledTime() const { return duration_t{totalTime_.load()}; }
0064 duration_t maxStalledTime() const { return duration_t{maxTime_.load()}; }
0065
0066
0067 void setLabel(std::string const& label) { label_ = label; }
0068
0069 void update(duration_t const ms) {
0070 ++stallCounter_;
0071 auto const thisTime = ms.count();
0072 totalTime_ += thisTime;
0073 rep_t max{maxTime_};
0074 while (thisTime > max && !maxTime_.compare_exchange_strong(max, thisTime))
0075 ;
0076 }
0077
0078 private:
0079 std::string label_{};
0080 std::atomic<unsigned> stallCounter_{};
0081 std::atomic<rep_t> totalTime_{};
0082 std::atomic<rep_t> maxTime_{};
0083 };
0084
0085
0086
/// Writes every argument to `os` in the order given, each one preceded by a
/// single space.
///
/// Takes part in overload resolution only when all argument types are
/// integral. A single fold expression replaces the former pair of recursive
/// overloads; the output for any non-empty argument list is unchanged.
template <typename... Ts>
std::enable_if_t<(std::is_integral_v<Ts> && ...)> concatenate(std::ostream& os, Ts const... ts) {
  ((os << ' ' << ts), ...);
}
0097
/// One-character tags identifying the kind of record written to the output
/// file. In most pairs the upper-case letter is the "pre" signal and the
/// lower-case one its "post" counterpart.
enum class step : char {
  // source
  preSourceEvent = 'S',
  postSourceEvent = 's',
  // whole event
  preEvent = 'E',
  postEvent = 'e',
  // module, event transition
  postModuleEventPrefetching = 'p',
  preModuleEventAcquire = 'A',
  postModuleEventAcquire = 'a',
  preModuleEvent = 'M',
  postModuleEvent = 'm',
  // delayed reads from the source
  preEventReadFromSource = 'R',
  postEventReadFromSource = 'r',
  // EventSetup modules
  postESModulePrefetching = 'q',
  preESModule = 'N',
  postESModule = 'n'
};
0114
/// Numeric code for the transition a record belongs to. "Begin" transitions
/// are positive, their matching "end" transitions carry the negated value,
/// events are 0 and EventSetup calls are 5.
enum class Phase : short {
  eventSetupCall = 5,
  globalBeginRun = 4,
  streamBeginRun = 3,
  globalBeginLumi = 2,
  streamBeginLumi = 1,
  Event = 0,
  streamEndLumi = -1,
  globalEndLumi = -2,
  streamEndRun = -3,
  globalEndRun = -4
};
0127
0128 std::ostream& operator<<(std::ostream& os, step const s) {
0129 os << static_cast<std::underlying_type_t<step>>(s);
0130 return os;
0131 }
0132
0133 std::ostream& operator<<(std::ostream& os, Phase const s) {
0134 os << static_cast<std::underlying_type_t<Phase>>(s);
0135 return os;
0136 }
0137
0138 template <step S, typename... ARGS>
0139 std::string assembleMessage(ARGS const... args) {
0140 std::ostringstream oss;
0141 oss << S;
0142 concatenate(oss, args...);
0143 oss << '\n';
0144 return oss.str();
0145 }
0146
0147 Phase toTransitionImpl(edm::StreamContext const& iContext) {
0148 using namespace edm;
0149 switch (iContext.transition()) {
0150 case StreamContext::Transition::kBeginRun:
0151 return Phase::streamBeginRun;
0152 case StreamContext::Transition::kBeginLuminosityBlock:
0153 return Phase::streamBeginLumi;
0154 case StreamContext::Transition::kEvent:
0155 return Phase::Event;
0156 case StreamContext::Transition::kEndLuminosityBlock:
0157 return Phase::streamEndLumi;
0158 case StreamContext::Transition::kEndRun:
0159 return Phase::streamEndRun;
0160 default:
0161 break;
0162 }
0163 assert(false);
0164 return Phase::Event;
0165 }
0166
0167 auto toTransition(edm::StreamContext const& iContext) -> std::underlying_type_t<Phase> {
0168 return static_cast<std::underlying_type_t<Phase>>(toTransitionImpl(iContext));
0169 }
0170
0171 Phase toTransitionImpl(edm::GlobalContext const& iContext) {
0172 using namespace edm;
0173 switch (iContext.transition()) {
0174 case GlobalContext::Transition::kBeginRun:
0175 return Phase::globalBeginRun;
0176 case GlobalContext::Transition::kBeginLuminosityBlock:
0177 return Phase::globalBeginLumi;
0178 case GlobalContext::Transition::kEndLuminosityBlock:
0179 return Phase::globalEndLumi;
0180 case GlobalContext::Transition::kWriteLuminosityBlock:
0181 return Phase::globalEndLumi;
0182 case GlobalContext::Transition::kEndRun:
0183 return Phase::globalEndRun;
0184 case GlobalContext::Transition::kWriteRun:
0185 return Phase::globalEndRun;
0186 default:
0187 break;
0188 }
0189 assert(false);
0190 return Phase::Event;
0191 }
0192 auto toTransition(edm::GlobalContext const& iContext) -> std::underlying_type_t<Phase> {
0193 return static_cast<std::underlying_type_t<Phase>>(toTransitionImpl(iContext));
0194 }
0195
0196 }
0197
0198 namespace edm {
0199 namespace service {
0200
0201 class StallMonitor {
0202 public:
0203 StallMonitor(ParameterSet const&, ActivityRegistry&);
0204 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0205
0206 private:
0207 void preModuleConstruction(edm::ModuleDescription const&);
0208 void preModuleDestruction(edm::ModuleDescription const&);
0209 void postBeginJob();
0210 void preSourceEvent(StreamID);
0211 void postSourceEvent(StreamID);
0212 void preEvent(StreamContext const&);
0213 void preModuleEventAcquire(StreamContext const&, ModuleCallingContext const&);
0214 void postModuleEventAcquire(StreamContext const&, ModuleCallingContext const&);
0215 void preModuleEvent(StreamContext const&, ModuleCallingContext const&);
0216 void postModuleEventPrefetching(StreamContext const&, ModuleCallingContext const&);
0217 void preEventReadFromSource(StreamContext const&, ModuleCallingContext const&);
0218 void postEventReadFromSource(StreamContext const&, ModuleCallingContext const&);
0219 void postModuleEvent(StreamContext const&, ModuleCallingContext const&);
0220 void postEvent(StreamContext const&);
0221 void preModuleStreamTransition(StreamContext const&, ModuleCallingContext const&);
0222 void postModuleStreamTransition(StreamContext const&, ModuleCallingContext const&);
0223 void preModuleGlobalTransition(GlobalContext const&, ModuleCallingContext const&);
0224 void postModuleGlobalTransition(GlobalContext const&, ModuleCallingContext const&);
0225 void postEndJob();
0226
0227 ThreadSafeOutputFileStream file_;
0228 bool const validFile_;
0229 duration_t const stallThreshold_;
0230 decltype(now()) beginTime_{};
0231
0232
0233
0234
0235
0236 using StreamID_value = decltype(std::declval<StreamID>().value());
0237 using ModuleID = decltype(std::declval<ModuleDescription>().id());
0238 oneapi::tbb::concurrent_unordered_map<std::pair<StreamID_value, ModuleID>,
0239 std::pair<decltype(beginTime_), bool>,
0240 edm::StdPairHasher>
0241 stallStart_{};
0242
0243 std::vector<std::string> moduleLabels_{};
0244 std::vector<StallStatistics> moduleStats_{};
0245 std::vector<std::string> esModuleLabels_{};
0246 unsigned int numStreams_;
0247 };
0248
0249 }
0250
0251 }
0252
namespace {
  // Defaults for the two configuration parameters of the service.
  constexpr char const* const filename_default = "";
  constexpr double threshold_default = 0.1;
  // Separator placed between columns of the printed tables.
  std::string const space(" ");
}  // namespace
0258
0259 using edm::service::StallMonitor;
0260 using namespace std::chrono;
0261
0262 StallMonitor::StallMonitor(ParameterSet const& iPS, ActivityRegistry& iRegistry)
0263 : file_{iPS.getUntrackedParameter<std::string>("fileName", filename_default)},
0264 validFile_{file_},
0265 stallThreshold_{
0266 std::chrono::round<duration_t>(duration<double>(iPS.getUntrackedParameter<double>("stallThreshold")))} {
0267 iRegistry.watchPreModuleConstruction(this, &StallMonitor::preModuleConstruction);
0268 iRegistry.watchPreModuleDestruction(this, &StallMonitor::preModuleDestruction);
0269 iRegistry.watchPostBeginJob(this, &StallMonitor::postBeginJob);
0270 iRegistry.watchPostModuleEventPrefetching(this, &StallMonitor::postModuleEventPrefetching);
0271 iRegistry.watchPreModuleEventAcquire(this, &StallMonitor::preModuleEventAcquire);
0272 iRegistry.watchPreModuleEvent(this, &StallMonitor::preModuleEvent);
0273 iRegistry.watchPostEndJob(this, &StallMonitor::postEndJob);
0274
0275 if (validFile_) {
0276
0277 iRegistry.watchPreSourceEvent(this, &StallMonitor::preSourceEvent);
0278 iRegistry.watchPostSourceEvent(this, &StallMonitor::postSourceEvent);
0279 iRegistry.watchPreEvent(this, &StallMonitor::preEvent);
0280 iRegistry.watchPostModuleEventAcquire(this, &StallMonitor::postModuleEventAcquire);
0281 iRegistry.watchPreEventReadFromSource(this, &StallMonitor::preEventReadFromSource);
0282 iRegistry.watchPostEventReadFromSource(this, &StallMonitor::postEventReadFromSource);
0283 iRegistry.watchPostModuleEvent(this, &StallMonitor::postModuleEvent);
0284 iRegistry.watchPostEvent(this, &StallMonitor::postEvent);
0285
0286 iRegistry.watchPreModuleStreamBeginRun(this, &StallMonitor::preModuleStreamTransition);
0287 iRegistry.watchPostModuleStreamBeginRun(this, &StallMonitor::postModuleStreamTransition);
0288 iRegistry.watchPreModuleStreamEndRun(this, &StallMonitor::preModuleStreamTransition);
0289 iRegistry.watchPostModuleStreamEndRun(this, &StallMonitor::postModuleStreamTransition);
0290
0291 iRegistry.watchPreModuleStreamBeginLumi(this, &StallMonitor::preModuleStreamTransition);
0292 iRegistry.watchPostModuleStreamBeginLumi(this, &StallMonitor::postModuleStreamTransition);
0293 iRegistry.watchPreModuleStreamEndLumi(this, &StallMonitor::preModuleStreamTransition);
0294 iRegistry.watchPostModuleStreamEndLumi(this, &StallMonitor::postModuleStreamTransition);
0295
0296 iRegistry.watchPreModuleGlobalBeginRun(this, &StallMonitor::preModuleGlobalTransition);
0297 iRegistry.watchPostModuleGlobalBeginRun(this, &StallMonitor::postModuleGlobalTransition);
0298 iRegistry.watchPreModuleGlobalEndRun(this, &StallMonitor::preModuleGlobalTransition);
0299 iRegistry.watchPostModuleGlobalEndRun(this, &StallMonitor::postModuleGlobalTransition);
0300 iRegistry.watchPreModuleWriteRun(this, &StallMonitor::preModuleGlobalTransition);
0301 iRegistry.watchPostModuleWriteRun(this, &StallMonitor::postModuleGlobalTransition);
0302
0303 iRegistry.watchPreModuleGlobalBeginLumi(this, &StallMonitor::preModuleGlobalTransition);
0304 iRegistry.watchPostModuleGlobalBeginLumi(this, &StallMonitor::postModuleGlobalTransition);
0305 iRegistry.watchPreModuleGlobalEndLumi(this, &StallMonitor::preModuleGlobalTransition);
0306 iRegistry.watchPostModuleGlobalEndLumi(this, &StallMonitor::postModuleGlobalTransition);
0307 iRegistry.watchPreModuleWriteLumi(this, &StallMonitor::preModuleGlobalTransition);
0308 iRegistry.watchPostModuleWriteLumi(this, &StallMonitor::postModuleGlobalTransition);
0309
0310 iRegistry.postESModuleRegistrationSignal_.connect([this](auto const& iDescription) {
0311 if (esModuleLabels_.size() <= iDescription.id_) {
0312 esModuleLabels_.resize(iDescription.id_ + 1);
0313 }
0314 if (not iDescription.label_.empty()) {
0315 esModuleLabels_[iDescription.id_] = iDescription.label_;
0316 } else {
0317 esModuleLabels_[iDescription.id_] = iDescription.type_;
0318 }
0319 });
0320
0321 iRegistry.preESModuleSignal_.connect([this](auto const&, auto const& context) {
0322 auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0323 auto msg = assembleMessage<step::preESModule>(
0324 numStreams_, module_id(context), std::underlying_type_t<Phase>(Phase::eventSetupCall), t);
0325 file_.write(std::move(msg));
0326 });
0327 iRegistry.postESModuleSignal_.connect([this](auto const&, auto const& context) {
0328 auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0329 auto msg = assembleMessage<step::postESModule>(
0330 numStreams_, module_id(context), std::underlying_type_t<Phase>(Phase::eventSetupCall), t);
0331 file_.write(std::move(msg));
0332 });
0333
0334 iRegistry.preallocateSignal_.connect(
0335 [this](service::SystemBounds const& iBounds) { numStreams_ = iBounds.maxNumberOfStreams(); });
0336
0337 std::ostringstream oss;
0338 oss << "# Transition Symbol\n";
0339 oss << "#----------------- ------\n";
0340 oss << "# eventSetupCall " << Phase::eventSetupCall << "\n"
0341 << "# globalBeginRun " << Phase::globalBeginRun << "\n"
0342 << "# streamBeginRun " << Phase::streamBeginRun << "\n"
0343 << "# globalBeginLumi " << Phase::globalBeginLumi << "\n"
0344 << "# streamBeginLumi " << Phase::streamBeginLumi << "\n"
0345 << "# Event " << Phase::Event << "\n"
0346 << "# streamEndLumi " << Phase::streamEndLumi << "\n"
0347 << "# globalEndLumi " << Phase::globalEndLumi << "\n"
0348 << "# streamEndRun " << Phase::streamEndRun << "\n"
0349 << "# globalEndRun " << Phase::globalEndRun << "\n";
0350 oss << "# Step Symbol Entries\n"
0351 << "# -------------------------- ------ ------------------------------------------\n"
0352 << "# preSourceEvent " << step::preSourceEvent << " <Stream ID> <Time since beginJob (ms)>\n"
0353 << "# postSourceEvent " << step::postSourceEvent << " <Stream ID> <Time since beginJob (ms)>\n"
0354 << "# preEvent " << step::preEvent
0355 << " <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n"
0356 << "# postModuleEventPrefetching " << step::postModuleEventPrefetching
0357 << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0358 << "# preModuleEventAcquire " << step::preModuleEventAcquire
0359 << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0360 << "# postModuleEventAcquire " << step::postModuleEventAcquire
0361 << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0362 << "# preModuleTransition " << step::preModuleEvent
0363 << " <Stream ID> <Module ID> <Transition type> <Time since beginJob (ms)>\n"
0364 << "# preEventReadFromSource " << step::preEventReadFromSource
0365 << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0366 << "# postEventReadFromSource " << step::postEventReadFromSource
0367 << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0368 << "# postModuleTransition " << step::postModuleEvent
0369 << " <Stream ID> <Module ID> <Transition type> <Time since beginJob (ms)>\n"
0370 << "# postEvent " << step::postEvent
0371 << " <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n"
0372 << "# postESModulePrefetching " << step::postESModulePrefetching
0373 << " <Stream ID> <ESModule ID> <Transition type> <Time since beginJob (ms)>\n"
0374 << "# preESModuleTransition " << step::preESModule
0375 << " <StreamID> <ESModule ID> <TransitionType> <Time since beginJob (ms)>\n"
0376 << "# postESModuleTransition " << step::postESModule
0377 << " <StreamID> <ESModule ID> <TransitionType> <Time since beginJob (ms)>\n";
0378 file_.write(oss.str());
0379 }
0380 }
0381
0382 void StallMonitor::fillDescriptions(ConfigurationDescriptions& descriptions) {
0383 ParameterSetDescription desc;
0384 desc.addUntracked<std::string>("fileName", filename_default)
0385 ->setComment(
0386 "Name of file to which detailed timing information should be written.\n"
0387 "An empty filename argument (the default) indicates that no extra\n"
0388 "information will be written to a dedicated file, but only the summary\n"
0389 "including stalling-modules information will be logged.");
0390 desc.addUntracked<double>("stallThreshold", threshold_default)
0391 ->setComment(
0392 "Threshold (in seconds) used to classify modules as stalled.\n"
0393 "Microsecond granularity allowed.");
0394 descriptions.add("StallMonitor", desc);
0395 descriptions.setComment(
0396 "This service keeps track of various times in event-processing to determine which modules are stalling.");
0397 }
0398
0399 void StallMonitor::preModuleConstruction(ModuleDescription const& md) {
0400
0401
0402
0403
0404
0405
0406
0407
0408
0409
0410
0411 auto const mid = md.id();
0412 if (mid < moduleLabels_.size()) {
0413 moduleLabels_[mid] = md.moduleLabel();
0414 } else {
0415 moduleLabels_.resize(mid + 1);
0416 moduleLabels_.back() = md.moduleLabel();
0417 }
0418 }
0419
0420 void StallMonitor::preModuleDestruction(ModuleDescription const& md) {
0421
0422
0423 moduleLabels_[md.id()] = "";
0424 }
0425
0426 void StallMonitor::postBeginJob() {
0427
0428
0429
0430
0431 moduleStats_ = std::vector<StallStatistics>(moduleLabels_.size());
0432 for (std::size_t i{}; i < moduleStats_.size(); ++i) {
0433 moduleStats_[i].setLabel(moduleLabels_[i]);
0434 }
0435
0436 if (validFile_) {
0437 {
0438 std::size_t const width{std::to_string(moduleLabels_.size()).size()};
0439 OStreamColumn col0{"Module ID", width};
0440 std::string const lastCol{"Module label"};
0441
0442 std::ostringstream oss;
0443 oss << "\n# " << col0 << space << lastCol << '\n';
0444 oss << "# " << std::string(col0.width() + space.size() + lastCol.size(), '-') << '\n';
0445
0446 for (std::size_t i{}; i < moduleLabels_.size(); ++i) {
0447 auto const& label = moduleLabels_[i];
0448 if (label.empty())
0449 continue;
0450 oss << "#M " << std::setw(width) << std::left << col0(i) << space << std::left << moduleLabels_[i] << '\n';
0451 }
0452 oss << '\n';
0453 file_.write(oss.str());
0454 }
0455 {
0456 std::size_t const width{std::to_string(esModuleLabels_.size()).size()};
0457 OStreamColumn col0{"ESModule ID", width};
0458 std::string const lastCol{"ESModule label"};
0459
0460 std::ostringstream oss;
0461 oss << "\n# " << col0 << space << lastCol << '\n';
0462 oss << "# " << std::string(col0.width() + space.size() + lastCol.size(), '-') << '\n';
0463
0464 for (std::size_t i{}; i < esModuleLabels_.size(); ++i) {
0465 auto const& label = esModuleLabels_[i];
0466 if (label.empty())
0467 continue;
0468 oss << "#N " << std::setw(width) << std::left << col0(i) << space << std::left << esModuleLabels_[i] << '\n';
0469 }
0470 oss << '\n';
0471 file_.write(oss.str());
0472 }
0473 }
0474
0475
0476 decltype(moduleLabels_)().swap(moduleLabels_);
0477 decltype(esModuleLabels_)().swap(esModuleLabels_);
0478
0479 beginTime_ = now();
0480 }
0481
0482 void StallMonitor::preSourceEvent(StreamID const sid) {
0483 auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0484 auto msg = assembleMessage<step::preSourceEvent>(sid.value(), t);
0485 file_.write(std::move(msg));
0486 }
0487
0488 void StallMonitor::postSourceEvent(StreamID const sid) {
0489 auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0490 auto msg = assembleMessage<step::postSourceEvent>(sid.value(), t);
0491 file_.write(std::move(msg));
0492 }
0493
0494 void StallMonitor::preEvent(StreamContext const& sc) {
0495 auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0496 auto const& eid = sc.eventID();
0497 auto msg = assembleMessage<step::preEvent>(stream_id(sc), eid.run(), eid.luminosityBlock(), eid.event(), t);
0498 file_.write(std::move(msg));
0499 }
0500
0501 void StallMonitor::postModuleEventPrefetching(StreamContext const& sc, ModuleCallingContext const& mcc) {
0502 auto const sid = stream_id(sc);
0503 auto const mid = module_id(mcc);
0504 auto start = stallStart_[std::make_pair(sid, mid)] = std::make_pair(now(), false);
0505
0506 if (validFile_) {
0507 auto const t = duration_cast<duration_t>(start.first - beginTime_).count();
0508 auto msg = assembleMessage<step::postModuleEventPrefetching>(sid, mid, t);
0509 file_.write(std::move(msg));
0510 }
0511 }
0512
0513 void StallMonitor::preModuleEventAcquire(StreamContext const& sc, ModuleCallingContext const& mcc) {
0514 auto const preModEventAcquire = now();
0515 auto const sid = stream_id(sc);
0516 auto const mid = module_id(mcc);
0517 auto& start = stallStart_[std::make_pair(sid, mid)];
0518 auto startT = start.first.time_since_epoch();
0519 start.second = true;
0520 if (validFile_) {
0521 auto t = duration_cast<duration_t>(preModEventAcquire - beginTime_).count();
0522 auto msg = assembleMessage<step::preModuleEventAcquire>(sid, mid, t);
0523 file_.write(std::move(msg));
0524 }
0525
0526 if (duration_t::duration::zero() != startT) {
0527 auto const preFetch_to_preModEventAcquire = duration_cast<duration_t>(preModEventAcquire - start.first);
0528 if (preFetch_to_preModEventAcquire < stallThreshold_)
0529 return;
0530 moduleStats_[mid].update(preFetch_to_preModEventAcquire);
0531 }
0532 }
0533
0534 void StallMonitor::postModuleEventAcquire(StreamContext const& sc, ModuleCallingContext const& mcc) {
0535 auto const postModEventAcquire = duration_cast<duration_t>(now() - beginTime_).count();
0536 auto msg = assembleMessage<step::postModuleEventAcquire>(stream_id(sc), module_id(mcc), postModEventAcquire);
0537 file_.write(std::move(msg));
0538 }
0539
0540 void StallMonitor::preModuleEvent(StreamContext const& sc, ModuleCallingContext const& mcc) {
0541 auto const preModEvent = now();
0542 auto const sid = stream_id(sc);
0543 auto const mid = module_id(mcc);
0544 auto const& start = stallStart_[std::make_pair(sid, mid)];
0545 auto startT = start.first.time_since_epoch();
0546 if (validFile_) {
0547 auto t = duration_cast<duration_t>(preModEvent - beginTime_).count();
0548 auto msg =
0549 assembleMessage<step::preModuleEvent>(sid, mid, static_cast<std::underlying_type_t<Phase>>(Phase::Event), t);
0550 file_.write(std::move(msg));
0551 }
0552
0553 if (duration_t::duration::zero() != startT && !start.second) {
0554 auto const preFetch_to_preModEvent = duration_cast<duration_t>(preModEvent - start.first);
0555 if (preFetch_to_preModEvent < stallThreshold_)
0556 return;
0557 moduleStats_[mid].update(preFetch_to_preModEvent);
0558 }
0559 }
0560
0561 void StallMonitor::preModuleStreamTransition(StreamContext const& sc, ModuleCallingContext const& mcc) {
0562 auto const tNow = now();
0563 auto const sid = stream_id(sc);
0564 auto const mid = module_id(mcc);
0565 auto t = duration_cast<duration_t>(tNow - beginTime_).count();
0566 auto msg = assembleMessage<step::preModuleEvent>(sid, mid, toTransition(sc), t);
0567 file_.write(std::move(msg));
0568 }
0569
0570 void StallMonitor::postModuleStreamTransition(StreamContext const& sc, ModuleCallingContext const& mcc) {
0571 auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0572 auto msg = assembleMessage<step::postModuleEvent>(stream_id(sc), module_id(mcc), toTransition(sc), t);
0573 file_.write(std::move(msg));
0574 }
0575
0576 void StallMonitor::preModuleGlobalTransition(GlobalContext const& gc, ModuleCallingContext const& mcc) {
0577 auto t = duration_cast<duration_t>(now() - beginTime_).count();
0578 auto msg = assembleMessage<step::preModuleEvent>(numStreams_, module_id(mcc), toTransition(gc), t);
0579 file_.write(std::move(msg));
0580 }
0581
0582 void StallMonitor::postModuleGlobalTransition(GlobalContext const& gc, ModuleCallingContext const& mcc) {
0583 auto const postModTime = duration_cast<duration_t>(now() - beginTime_).count();
0584 auto msg = assembleMessage<step::postModuleEvent>(numStreams_, module_id(mcc), toTransition(gc), postModTime);
0585 file_.write(std::move(msg));
0586 }
0587
0588 void StallMonitor::preEventReadFromSource(StreamContext const& sc, ModuleCallingContext const& mcc) {
0589 auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0590 auto msg = assembleMessage<step::preEventReadFromSource>(stream_id(sc), module_id(mcc), t);
0591 file_.write(std::move(msg));
0592 }
0593
0594 void StallMonitor::postEventReadFromSource(StreamContext const& sc, ModuleCallingContext const& mcc) {
0595 auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0596 auto msg = assembleMessage<step::postEventReadFromSource>(stream_id(sc), module_id(mcc), t);
0597 file_.write(std::move(msg));
0598 }
0599
0600 void StallMonitor::postModuleEvent(StreamContext const& sc, ModuleCallingContext const& mcc) {
0601 auto const postModEvent = duration_cast<duration_t>(now() - beginTime_).count();
0602 auto msg = assembleMessage<step::postModuleEvent>(
0603 stream_id(sc), module_id(mcc), static_cast<std::underlying_type_t<Phase>>(Phase::Event), postModEvent);
0604 file_.write(std::move(msg));
0605 }
0606
0607 void StallMonitor::postEvent(StreamContext const& sc) {
0608 auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0609 auto const& eid = sc.eventID();
0610 auto msg = assembleMessage<step::postEvent>(stream_id(sc), eid.run(), eid.luminosityBlock(), eid.event(), t);
0611 file_.write(std::move(msg));
0612 }
0613
0614 void StallMonitor::postEndJob() {
0615
0616 std::size_t width{};
0617 edm::for_all(moduleStats_, [&width](auto const& stats) {
0618 if (stats.numberOfStalls() == 0u)
0619 return;
0620 width = std::max(width, stats.label().size());
0621 });
0622
0623 OStreamColumn tag{"StallMonitor>"};
0624 OStreamColumn col1{"Module label", width};
0625 OStreamColumn col2{"# of stalls"};
0626 OStreamColumn col3{"Total stalled time"};
0627 OStreamColumn col4{"Max stalled time"};
0628
0629 LogAbsolute out{"StallMonitor"};
0630 out << '\n';
0631 out << tag << space << col1 << space << col2 << space << col3 << space << col4 << '\n';
0632
0633 out << tag << space << std::setfill('-') << col1(std::string{}) << space << col2(std::string{}) << space
0634 << col3(std::string{}) << space << col4(std::string{}) << '\n';
0635
0636 using seconds_d = duration<double>;
0637
0638 auto to_seconds_str = [](auto const& duration) {
0639 std::ostringstream oss;
0640 auto const time = duration_cast<seconds_d>(duration).count();
0641 oss << time << " s";
0642 return oss.str();
0643 };
0644
0645 out << std::setfill(' ');
0646 for (auto const& stats : moduleStats_) {
0647 if (stats.label().empty() ||
0648 stats.numberOfStalls() == 0u)
0649 continue;
0650 out << std::left << tag << space << col1(stats.label()) << space << std::right << col2(stats.numberOfStalls())
0651 << space << col3(to_seconds_str(stats.totalStalledTime())) << space
0652 << col4(to_seconds_str(stats.maxStalledTime())) << '\n';
0653 }
0654 }
0655
0656 DEFINE_FWK_SERVICE(StallMonitor);