File indexing completed on 2024-04-06 12:13:05
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0012 #include "FWCore/Concurrency/interface/ThreadSafeOutputFileStream.h"
0013 #include "FWCore/Framework/interface/ComponentDescription.h"
0014 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0015 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0016 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0017 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0018 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0019 #include "FWCore/ServiceRegistry/interface/Service.h"
0020 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0021 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0022 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0023 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0024 #include "FWCore/ServiceRegistry/interface/ESModuleCallingContext.h"
0025 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0026 #include "FWCore/Utilities/interface/Algorithms.h"
0027 #include "FWCore/Utilities/interface/OStreamColumn.h"
0028 #include "FWCore/Utilities/interface/Exception.h"
0029 #include "FWCore/Utilities/interface/StdPairHasher.h"
0030
0031 #include "monitor_file_utilities.h"
0032
0033 #include "oneapi/tbb/concurrent_unordered_map.h"
0034 #include <atomic>
0035 #include <chrono>
0036 #include <iomanip>
0037 #include <iostream>
0038 #include <sstream>
0039
0040 using namespace edm::service::monitor_file_utilities;
0041
0042 namespace {
0043
// Resolution in which every timestamp and stall duration in this file is expressed.
0044 using duration_t = std::chrono::microseconds;
// Clock used as the time source for all measurements below.
0045 using clock_t = std::chrono::steady_clock;
// Shorthand so call sites can write now() instead of clock_t::now().
0046 auto now = clock_t::now;
0047
0048
0049 class StallStatistics {
0050 public:
0051
0052
0053
0054 StallStatistics() = default;
0055
0056 std::string const& label() const { return label_; }
0057 unsigned numberOfStalls() const { return stallCounter_; }
0058
0059 using rep_t = duration_t::rep;
0060
0061 duration_t totalStalledTime() const { return duration_t{totalTime_.load()}; }
0062 duration_t maxStalledTime() const { return duration_t{maxTime_.load()}; }
0063
0064
0065 void setLabel(std::string const& label) { label_ = label; }
0066
0067 void update(duration_t const ms) {
0068 ++stallCounter_;
0069 auto const thisTime = ms.count();
0070 totalTime_ += thisTime;
0071 rep_t max{maxTime_};
0072 while (thisTime > max && !maxTime_.compare_exchange_strong(max, thisTime))
0073 ;
0074 }
0075
0076 private:
0077 std::string label_{};
0078 std::atomic<unsigned> stallCounter_{};
0079 std::atomic<rep_t> totalTime_{};
0080 std::atomic<rep_t> maxTime_{};
0081 };
0082
// One-character record tags. The character value of each enumerator is what
// operator<< below writes to the stream.
enum class step : char {
  // source
  preSourceEvent = 'S',
  postSourceEvent = 's',
  // whole event
  preEvent = 'E',
  postEvent = 'e',
  // module activity
  postModuleEventPrefetching = 'p',
  preModuleEventAcquire = 'A',
  postModuleEventAcquire = 'a',
  preModuleEvent = 'M',
  postModuleEvent = 'm',
  // delayed reads from the source
  preEventReadFromSource = 'R',
  postEventReadFromSource = 'r',
  // EventSetup modules
  postESModulePrefetching = 'q',
  preESModule = 'N',
  postESModule = 'n',
  // framework transitions
  preFrameworkTransition = 'F',
  postFrameworkTransition = 'f'
};

// Numeric code of the transition a record belongs to. End transitions are
// negative, begin transitions positive, and Event is zero.
enum class Phase : short {
  eventSetupCall = 5,
  globalBeginRun = 4,
  streamBeginRun = 3,
  globalBeginLumi = 2,
  streamBeginLumi = 1,
  Event = 0,
  streamEndLumi = -1,
  globalEndLumi = -2,
  streamEndRun = -3,
  globalEndRun = -4
};

// Writes the tag as its underlying char (e.g. 'E' for step::preEvent).
std::ostream& operator<<(std::ostream& os, step const tag) {
  return os << static_cast<std::underlying_type_t<step>>(tag);
}

// Writes the phase as its underlying short (e.g. -4 for Phase::globalEndRun).
std::ostream& operator<<(std::ostream& os, Phase const phase) {
  return os << static_cast<std::underlying_type_t<Phase>>(phase);
}
0124
0125 template <step S, typename... ARGS>
0126 std::string assembleMessage(ARGS const... args) {
0127 std::ostringstream oss;
0128 oss << S;
0129 concatenate(oss, args...);
0130 oss << '\n';
0131 return oss.str();
0132 }
0133
0134 Phase toTransitionImpl(edm::StreamContext const& iContext) {
0135 using namespace edm;
0136 switch (iContext.transition()) {
0137 case StreamContext::Transition::kBeginRun:
0138 return Phase::streamBeginRun;
0139 case StreamContext::Transition::kBeginLuminosityBlock:
0140 return Phase::streamBeginLumi;
0141 case StreamContext::Transition::kEvent:
0142 return Phase::Event;
0143 case StreamContext::Transition::kEndLuminosityBlock:
0144 return Phase::streamEndLumi;
0145 case StreamContext::Transition::kEndRun:
0146 return Phase::streamEndRun;
0147 default:
0148 break;
0149 }
0150 assert(false);
0151 return Phase::Event;
0152 }
0153
0154 auto toTransition(edm::StreamContext const& iContext) -> std::underlying_type_t<Phase> {
0155 return static_cast<std::underlying_type_t<Phase>>(toTransitionImpl(iContext));
0156 }
0157
0158 Phase toTransitionImpl(edm::GlobalContext const& iContext) {
0159 using namespace edm;
0160 switch (iContext.transition()) {
0161 case GlobalContext::Transition::kBeginRun:
0162 return Phase::globalBeginRun;
0163 case GlobalContext::Transition::kBeginLuminosityBlock:
0164 return Phase::globalBeginLumi;
0165 case GlobalContext::Transition::kEndLuminosityBlock:
0166 return Phase::globalEndLumi;
0167 case GlobalContext::Transition::kWriteLuminosityBlock:
0168 return Phase::globalEndLumi;
0169 case GlobalContext::Transition::kEndRun:
0170 return Phase::globalEndRun;
0171 case GlobalContext::Transition::kWriteRun:
0172 return Phase::globalEndRun;
0173 default:
0174 break;
0175 }
0176 assert(false);
0177 return Phase::Event;
0178 }
0179 auto toTransition(edm::GlobalContext const& iContext) -> std::underlying_type_t<Phase> {
0180 return static_cast<std::underlying_type_t<Phase>>(toTransitionImpl(iContext));
0181 }
0182
0183 }
0184
0185 namespace edm {
0186 namespace service {
0187
0188 class StallMonitor {
0189 public:
0190 StallMonitor(ParameterSet const&, ActivityRegistry&);
0191 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0192
0193 private:
0194 void preModuleConstruction(edm::ModuleDescription const&);
0195 void preModuleDestruction(edm::ModuleDescription const&);
0196 void postBeginJob();
0197 void preSourceEvent(StreamID);
0198 void postSourceEvent(StreamID);
0199 void preEvent(StreamContext const&);
0200 void preModuleEventAcquire(StreamContext const&, ModuleCallingContext const&);
0201 void postModuleEventAcquire(StreamContext const&, ModuleCallingContext const&);
0202 void preModuleEvent(StreamContext const&, ModuleCallingContext const&);
0203 void postModuleEventPrefetching(StreamContext const&, ModuleCallingContext const&);
0204 void preEventReadFromSource(StreamContext const&, ModuleCallingContext const&);
0205 void postEventReadFromSource(StreamContext const&, ModuleCallingContext const&);
0206 void postModuleEvent(StreamContext const&, ModuleCallingContext const&);
0207 void postEvent(StreamContext const&);
0208 void preModuleStreamTransition(StreamContext const&, ModuleCallingContext const&);
0209 void postModuleStreamTransition(StreamContext const&, ModuleCallingContext const&);
0210 void preModuleGlobalTransition(GlobalContext const&, ModuleCallingContext const&);
0211 void postModuleGlobalTransition(GlobalContext const&, ModuleCallingContext const&);
0212 void postEndJob();
0213
0214 ThreadSafeOutputFileStream file_;
0215 bool const validFile_;
0216 duration_t const stallThreshold_;
0217 decltype(now()) beginTime_{};
0218
0219
0220
0221
0222
0223 using StreamID_value = decltype(std::declval<StreamID>().value());
0224 using ModuleID = decltype(std::declval<ModuleDescription>().id());
0225 oneapi::tbb::concurrent_unordered_map<std::pair<StreamID_value, ModuleID>,
0226 std::pair<decltype(beginTime_), bool>,
0227 edm::StdPairHasher>
0228 stallStart_{};
0229
0230 std::vector<std::string> moduleLabels_{};
0231 std::vector<StallStatistics> moduleStats_{};
0232 std::vector<std::string> esModuleLabels_{};
0233 unsigned int numStreams_;
0234 };
0235
0236 }
0237
0238 }
0239
namespace {
  // Defaults for the "fileName" and "stallThreshold" configuration parameters.
  constexpr char const* const filename_default = "";
  constexpr double threshold_default = 0.1;
  // Separator placed between the columns of the end-of-job summary.
  std::string const space(" ");
}  // namespace
0245
0246 using edm::service::StallMonitor;
0247 using namespace std::chrono;
0248
0249 StallMonitor::StallMonitor(ParameterSet const& iPS, ActivityRegistry& iRegistry)
0250 : file_{iPS.getUntrackedParameter<std::string>("fileName")},
0251 validFile_{file_},
0252 stallThreshold_{
0253 std::chrono::round<duration_t>(duration<double>(iPS.getUntrackedParameter<double>("stallThreshold")))} {
0254 iRegistry.watchPreModuleConstruction(this, &StallMonitor::preModuleConstruction);
0255 iRegistry.watchPreModuleDestruction(this, &StallMonitor::preModuleDestruction);
0256 iRegistry.watchPostBeginJob(this, &StallMonitor::postBeginJob);
0257 iRegistry.watchPostModuleEventPrefetching(this, &StallMonitor::postModuleEventPrefetching);
0258 iRegistry.watchPreModuleEventAcquire(this, &StallMonitor::preModuleEventAcquire);
0259 iRegistry.watchPreModuleEvent(this, &StallMonitor::preModuleEvent);
0260 iRegistry.watchPostEndJob(this, &StallMonitor::postEndJob);
0261
0262 if (validFile_) {
0263
0264 iRegistry.watchPreSourceEvent(this, &StallMonitor::preSourceEvent);
0265 iRegistry.watchPostSourceEvent(this, &StallMonitor::postSourceEvent);
0266 iRegistry.watchPreEvent(this, &StallMonitor::preEvent);
0267 iRegistry.watchPostModuleEventAcquire(this, &StallMonitor::postModuleEventAcquire);
0268 iRegistry.watchPreEventReadFromSource(this, &StallMonitor::preEventReadFromSource);
0269 iRegistry.watchPostEventReadFromSource(this, &StallMonitor::postEventReadFromSource);
0270 iRegistry.watchPostModuleEvent(this, &StallMonitor::postModuleEvent);
0271 iRegistry.watchPostEvent(this, &StallMonitor::postEvent);
0272
0273 iRegistry.watchPreModuleStreamBeginRun(this, &StallMonitor::preModuleStreamTransition);
0274 iRegistry.watchPostModuleStreamBeginRun(this, &StallMonitor::postModuleStreamTransition);
0275 iRegistry.watchPreModuleStreamEndRun(this, &StallMonitor::preModuleStreamTransition);
0276 iRegistry.watchPostModuleStreamEndRun(this, &StallMonitor::postModuleStreamTransition);
0277
0278 iRegistry.watchPreModuleStreamBeginLumi(this, &StallMonitor::preModuleStreamTransition);
0279 iRegistry.watchPostModuleStreamBeginLumi(this, &StallMonitor::postModuleStreamTransition);
0280 iRegistry.watchPreModuleStreamEndLumi(this, &StallMonitor::preModuleStreamTransition);
0281 iRegistry.watchPostModuleStreamEndLumi(this, &StallMonitor::postModuleStreamTransition);
0282
0283 iRegistry.watchPreModuleGlobalBeginRun(this, &StallMonitor::preModuleGlobalTransition);
0284 iRegistry.watchPostModuleGlobalBeginRun(this, &StallMonitor::postModuleGlobalTransition);
0285 iRegistry.watchPreModuleGlobalEndRun(this, &StallMonitor::preModuleGlobalTransition);
0286 iRegistry.watchPostModuleGlobalEndRun(this, &StallMonitor::postModuleGlobalTransition);
0287 iRegistry.watchPreModuleWriteRun(this, &StallMonitor::preModuleGlobalTransition);
0288 iRegistry.watchPostModuleWriteRun(this, &StallMonitor::postModuleGlobalTransition);
0289
0290 iRegistry.watchPreModuleGlobalBeginLumi(this, &StallMonitor::preModuleGlobalTransition);
0291 iRegistry.watchPostModuleGlobalBeginLumi(this, &StallMonitor::postModuleGlobalTransition);
0292 iRegistry.watchPreModuleGlobalEndLumi(this, &StallMonitor::preModuleGlobalTransition);
0293 iRegistry.watchPostModuleGlobalEndLumi(this, &StallMonitor::postModuleGlobalTransition);
0294 iRegistry.watchPreModuleWriteLumi(this, &StallMonitor::preModuleGlobalTransition);
0295 iRegistry.watchPostModuleWriteLumi(this, &StallMonitor::postModuleGlobalTransition);
0296
0297 iRegistry.postESModuleRegistrationSignal_.connect([this](auto const& iDescription) {
0298 if (esModuleLabels_.size() <= iDescription.id_) {
0299 esModuleLabels_.resize(iDescription.id_ + 1);
0300 }
0301 if (not iDescription.label_.empty()) {
0302 esModuleLabels_[iDescription.id_] = iDescription.label_;
0303 } else {
0304 esModuleLabels_[iDescription.id_] = iDescription.type_;
0305 }
0306 });
0307
0308 iRegistry.preESModuleSignal_.connect([this](auto const&, auto const& context) {
0309 auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0310 auto msg = assembleMessage<step::preESModule>(
0311 numStreams_, module_id(context), std::underlying_type_t<Phase>(Phase::eventSetupCall), t);
0312 file_.write(std::move(msg));
0313 });
0314 iRegistry.postESModuleSignal_.connect([this](auto const&, auto const& context) {
0315 auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0316 auto msg = assembleMessage<step::postESModule>(
0317 numStreams_, module_id(context), std::underlying_type_t<Phase>(Phase::eventSetupCall), t);
0318 file_.write(std::move(msg));
0319 });
0320
0321 iRegistry.preallocateSignal_.connect(
0322 [this](service::SystemBounds const& iBounds) { numStreams_ = iBounds.maxNumberOfStreams(); });
0323
0324 bool recordFrameworkTransitions = iPS.getUntrackedParameter<bool>("recordFrameworkTransitions");
0325 if (recordFrameworkTransitions) {
0326 {
0327 auto preGlobal = [this](GlobalContext const& gc) {
0328 auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0329 auto msg = assembleMessage<step::preFrameworkTransition>(
0330 numStreams_, gc.luminosityBlockID().run(), gc.luminosityBlockID().luminosityBlock(), toTransition(gc), t);
0331 file_.write(std::move(msg));
0332 };
0333 iRegistry.watchPreGlobalBeginRun(preGlobal);
0334 iRegistry.watchPreGlobalBeginLumi(preGlobal);
0335 iRegistry.watchPreGlobalEndLumi(preGlobal);
0336 iRegistry.watchPreGlobalEndRun(preGlobal);
0337 }
0338 {
0339 auto postGlobal = [this](GlobalContext const& gc) {
0340 auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0341 auto msg = assembleMessage<step::postFrameworkTransition>(
0342 numStreams_, gc.luminosityBlockID().run(), gc.luminosityBlockID().luminosityBlock(), toTransition(gc), t);
0343 file_.write(std::move(msg));
0344 };
0345 iRegistry.watchPostGlobalBeginRun(postGlobal);
0346 iRegistry.watchPostGlobalBeginLumi(postGlobal);
0347 iRegistry.watchPostGlobalEndLumi(postGlobal);
0348 iRegistry.watchPostGlobalEndRun(postGlobal);
0349 }
0350 {
0351 auto preStream = [this](StreamContext const& sc) {
0352 auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0353 auto msg = assembleMessage<step::preFrameworkTransition>(
0354 stream_id(sc), sc.eventID().run(), sc.eventID().luminosityBlock(), toTransition(sc), t);
0355 file_.write(std::move(msg));
0356 };
0357 iRegistry.watchPreStreamBeginRun(preStream);
0358 iRegistry.watchPreStreamBeginLumi(preStream);
0359 iRegistry.watchPreStreamEndLumi(preStream);
0360 iRegistry.watchPreStreamEndRun(preStream);
0361 }
0362 {
0363 auto postStream = [this](StreamContext const& sc) {
0364 auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0365 auto msg = assembleMessage<step::postFrameworkTransition>(
0366 stream_id(sc), sc.eventID().run(), sc.eventID().luminosityBlock(), toTransition(sc), t);
0367 file_.write(std::move(msg));
0368 };
0369 iRegistry.watchPostStreamBeginRun(postStream);
0370 iRegistry.watchPostStreamBeginLumi(postStream);
0371 iRegistry.watchPostStreamEndLumi(postStream);
0372 iRegistry.watchPostStreamEndRun(postStream);
0373 }
0374 }
0375
0376 std::ostringstream oss;
0377 oss << "# Transition Symbol\n";
0378 oss << "#----------------- ------\n";
0379 oss << "# eventSetupCall " << Phase::eventSetupCall << "\n"
0380 << "# globalBeginRun " << Phase::globalBeginRun << "\n"
0381 << "# streamBeginRun " << Phase::streamBeginRun << "\n"
0382 << "# globalBeginLumi " << Phase::globalBeginLumi << "\n"
0383 << "# streamBeginLumi " << Phase::streamBeginLumi << "\n"
0384 << "# Event " << Phase::Event << "\n"
0385 << "# streamEndLumi " << Phase::streamEndLumi << "\n"
0386 << "# globalEndLumi " << Phase::globalEndLumi << "\n"
0387 << "# streamEndRun " << Phase::streamEndRun << "\n"
0388 << "# globalEndRun " << Phase::globalEndRun << "\n";
0389 oss << "# Step Symbol Entries\n"
0390 << "# -------------------------- ------ ------------------------------------------\n"
0391 << "# preSourceEvent " << step::preSourceEvent << " <Stream ID> <Time since beginJob (ms)>\n"
0392 << "# postSourceEvent " << step::postSourceEvent << " <Stream ID> <Time since beginJob (ms)>\n"
0393 << "# preEvent " << step::preEvent
0394 << " <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n"
0395 << "# postModuleEventPrefetching " << step::postModuleEventPrefetching
0396 << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0397 << "# preModuleEventAcquire " << step::preModuleEventAcquire
0398 << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0399 << "# postModuleEventAcquire " << step::postModuleEventAcquire
0400 << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0401 << "# preModuleTransition " << step::preModuleEvent
0402 << " <Stream ID> <Module ID> <Transition type> <Time since beginJob (ms)>\n"
0403 << "# preEventReadFromSource " << step::preEventReadFromSource
0404 << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0405 << "# postEventReadFromSource " << step::postEventReadFromSource
0406 << " <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0407 << "# postModuleTransition " << step::postModuleEvent
0408 << " <Stream ID> <Module ID> <Transition type> <Time since beginJob (ms)>\n"
0409 << "# postEvent " << step::postEvent
0410 << " <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n"
0411 << "# postESModulePrefetching " << step::postESModulePrefetching
0412 << " <Stream ID> <ESModule ID> <Transition type> <Time since beginJob (ms)>\n"
0413 << "# preESModuleTransition " << step::preESModule
0414 << " <StreamID> <ESModule ID> <TransitionType> <Time since beginJob (ms)>\n"
0415 << "# postESModuleTransition " << step::postESModule
0416 << " <StreamID> <ESModule ID> <TransitionType> <Time since beginJob (ms)>\n";
0417 if (recordFrameworkTransitions) {
0418 oss << "# preFrameworkTransition " << step::preFrameworkTransition
0419 << " <Stream ID> <Run#> <LumiBlock#> <Transition type> <Time since beginJob (ms)>\n"
0420 << "# postFrameworkTransition " << step::postFrameworkTransition
0421 << " <Stream ID> <Run#> <LumiBlock#> <Transition type> <Time since beginJob (ms)>\n";
0422 }
0423 file_.write(oss.str());
0424 }
0425 }
0426
0427 void StallMonitor::fillDescriptions(ConfigurationDescriptions& descriptions) {
0428 ParameterSetDescription desc;
0429 desc.addUntracked<std::string>("fileName", filename_default)
0430 ->setComment(
0431 "Name of file to which detailed timing information should be written.\n"
0432 "An empty filename argument (the default) indicates that no extra\n"
0433 "information will be written to a dedicated file, but only the summary\n"
0434 "including stalling-modules information will be logged.");
0435 desc.addUntracked<double>("stallThreshold", threshold_default)
0436 ->setComment(
0437 "Threshold (in seconds) used to classify modules as stalled.\n"
0438 "Microsecond granularity allowed.");
0439 desc.addUntracked<bool>("recordFrameworkTransitions", false)
0440 ->setComment(
0441 "When writing a file, include the framework state transitions:\n"
0442 " stream and global, begin and end, Run and LuminosityBlock.");
0443 descriptions.add("StallMonitor", desc);
0444 descriptions.setComment(
0445 "This service keeps track of various times in event-processing to determine which modules are stalling.");
0446 }
0447
0448 void StallMonitor::preModuleConstruction(ModuleDescription const& md) {
0449
0450
0451
0452
0453
0454
0455
0456
0457
0458
0459
0460 auto const mid = md.id();
0461 if (mid < moduleLabels_.size()) {
0462 moduleLabels_[mid] = md.moduleLabel();
0463 } else {
0464 moduleLabels_.resize(mid + 1);
0465 moduleLabels_.back() = md.moduleLabel();
0466 }
0467 }
0468
0469 void StallMonitor::preModuleDestruction(ModuleDescription const& md) {
0470
0471
0472 moduleLabels_[md.id()] = "";
0473 }
0474
0475 void StallMonitor::postBeginJob() {
0476
0477
0478
0479
0480 moduleStats_ = std::vector<StallStatistics>(moduleLabels_.size());
0481 for (std::size_t i{}; i < moduleStats_.size(); ++i) {
0482 moduleStats_[i].setLabel(moduleLabels_[i]);
0483 }
0484
0485 if (validFile_) {
0486 {
0487 std::ostringstream oss;
0488 moduleIdToLabel(oss, moduleLabels_, 'M', "Module ID", "Module label");
0489 file_.write(oss.str());
0490 }
0491 {
0492 std::ostringstream oss;
0493 moduleIdToLabel(oss, esModuleLabels_, 'N', "ESModule ID", "ESModule label");
0494 file_.write(oss.str());
0495 }
0496 }
0497
0498
0499 decltype(moduleLabels_)().swap(moduleLabels_);
0500 decltype(esModuleLabels_)().swap(esModuleLabels_);
0501
0502 beginTime_ = now();
0503 }
0504
0505 void StallMonitor::preSourceEvent(StreamID const sid) {
0506 auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0507 auto msg = assembleMessage<step::preSourceEvent>(sid.value(), t);
0508 file_.write(std::move(msg));
0509 }
0510
0511 void StallMonitor::postSourceEvent(StreamID const sid) {
0512 auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0513 auto msg = assembleMessage<step::postSourceEvent>(sid.value(), t);
0514 file_.write(std::move(msg));
0515 }
0516
0517 void StallMonitor::preEvent(StreamContext const& sc) {
0518 auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0519 auto const& eid = sc.eventID();
0520 auto msg = assembleMessage<step::preEvent>(stream_id(sc), eid.run(), eid.luminosityBlock(), eid.event(), t);
0521 file_.write(std::move(msg));
0522 }
0523
0524 void StallMonitor::postModuleEventPrefetching(StreamContext const& sc, ModuleCallingContext const& mcc) {
0525 auto const sid = stream_id(sc);
0526 auto const mid = module_id(mcc);
0527 auto start = stallStart_[std::make_pair(sid, mid)] = std::make_pair(now(), false);
0528
0529 if (validFile_) {
0530 auto const t = duration_cast<duration_t>(start.first - beginTime_).count();
0531 auto msg = assembleMessage<step::postModuleEventPrefetching>(sid, mid, t);
0532 file_.write(std::move(msg));
0533 }
0534 }
0535
0536 void StallMonitor::preModuleEventAcquire(StreamContext const& sc, ModuleCallingContext const& mcc) {
0537 auto const preModEventAcquire = now();
0538 auto const sid = stream_id(sc);
0539 auto const mid = module_id(mcc);
0540 auto& start = stallStart_[std::make_pair(sid, mid)];
0541 auto startT = start.first.time_since_epoch();
0542 start.second = true;
0543 if (validFile_) {
0544 auto t = duration_cast<duration_t>(preModEventAcquire - beginTime_).count();
0545 auto msg = assembleMessage<step::preModuleEventAcquire>(sid, mid, t);
0546 file_.write(std::move(msg));
0547 }
0548
0549 if (duration_t::duration::zero() != startT) {
0550 auto const preFetch_to_preModEventAcquire = duration_cast<duration_t>(preModEventAcquire - start.first);
0551 if (preFetch_to_preModEventAcquire < stallThreshold_)
0552 return;
0553 moduleStats_[mid].update(preFetch_to_preModEventAcquire);
0554 }
0555 }
0556
0557 void StallMonitor::postModuleEventAcquire(StreamContext const& sc, ModuleCallingContext const& mcc) {
0558 auto const postModEventAcquire = duration_cast<duration_t>(now() - beginTime_).count();
0559 auto msg = assembleMessage<step::postModuleEventAcquire>(stream_id(sc), module_id(mcc), postModEventAcquire);
0560 file_.write(std::move(msg));
0561 }
0562
0563 void StallMonitor::preModuleEvent(StreamContext const& sc, ModuleCallingContext const& mcc) {
0564 auto const preModEvent = now();
0565 auto const sid = stream_id(sc);
0566 auto const mid = module_id(mcc);
0567 auto const& start = stallStart_[std::make_pair(sid, mid)];
0568 auto startT = start.first.time_since_epoch();
0569 if (validFile_) {
0570 auto t = duration_cast<duration_t>(preModEvent - beginTime_).count();
0571 auto msg =
0572 assembleMessage<step::preModuleEvent>(sid, mid, static_cast<std::underlying_type_t<Phase>>(Phase::Event), t);
0573 file_.write(std::move(msg));
0574 }
0575
0576 if (duration_t::duration::zero() != startT && !start.second) {
0577 auto const preFetch_to_preModEvent = duration_cast<duration_t>(preModEvent - start.first);
0578 if (preFetch_to_preModEvent < stallThreshold_)
0579 return;
0580 moduleStats_[mid].update(preFetch_to_preModEvent);
0581 }
0582 }
0583
0584 void StallMonitor::preModuleStreamTransition(StreamContext const& sc, ModuleCallingContext const& mcc) {
0585 auto const tNow = now();
0586 auto const sid = stream_id(sc);
0587 auto const mid = module_id(mcc);
0588 auto t = duration_cast<duration_t>(tNow - beginTime_).count();
0589 auto msg = assembleMessage<step::preModuleEvent>(sid, mid, toTransition(sc), t);
0590 file_.write(std::move(msg));
0591 }
0592
0593 void StallMonitor::postModuleStreamTransition(StreamContext const& sc, ModuleCallingContext const& mcc) {
0594 auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0595 auto msg = assembleMessage<step::postModuleEvent>(stream_id(sc), module_id(mcc), toTransition(sc), t);
0596 file_.write(std::move(msg));
0597 }
0598
0599 void StallMonitor::preModuleGlobalTransition(GlobalContext const& gc, ModuleCallingContext const& mcc) {
0600 auto t = duration_cast<duration_t>(now() - beginTime_).count();
0601 auto msg = assembleMessage<step::preModuleEvent>(numStreams_, module_id(mcc), toTransition(gc), t);
0602 file_.write(std::move(msg));
0603 }
0604
0605 void StallMonitor::postModuleGlobalTransition(GlobalContext const& gc, ModuleCallingContext const& mcc) {
0606 auto const postModTime = duration_cast<duration_t>(now() - beginTime_).count();
0607 auto msg = assembleMessage<step::postModuleEvent>(numStreams_, module_id(mcc), toTransition(gc), postModTime);
0608 file_.write(std::move(msg));
0609 }
0610
0611 void StallMonitor::preEventReadFromSource(StreamContext const& sc, ModuleCallingContext const& mcc) {
0612 auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0613 auto msg = assembleMessage<step::preEventReadFromSource>(stream_id(sc), module_id(mcc), t);
0614 file_.write(std::move(msg));
0615 }
0616
0617 void StallMonitor::postEventReadFromSource(StreamContext const& sc, ModuleCallingContext const& mcc) {
0618 auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0619 auto msg = assembleMessage<step::postEventReadFromSource>(stream_id(sc), module_id(mcc), t);
0620 file_.write(std::move(msg));
0621 }
0622
0623 void StallMonitor::postModuleEvent(StreamContext const& sc, ModuleCallingContext const& mcc) {
0624 auto const postModEvent = duration_cast<duration_t>(now() - beginTime_).count();
0625 auto msg = assembleMessage<step::postModuleEvent>(
0626 stream_id(sc), module_id(mcc), static_cast<std::underlying_type_t<Phase>>(Phase::Event), postModEvent);
0627 file_.write(std::move(msg));
0628 }
0629
0630 void StallMonitor::postEvent(StreamContext const& sc) {
0631 auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0632 auto const& eid = sc.eventID();
0633 auto msg = assembleMessage<step::postEvent>(stream_id(sc), eid.run(), eid.luminosityBlock(), eid.event(), t);
0634 file_.write(std::move(msg));
0635 }
0636
0637 void StallMonitor::postEndJob() {
0638
0639 std::size_t width{};
0640 edm::for_all(moduleStats_, [&width](auto const& stats) {
0641 if (stats.numberOfStalls() == 0u)
0642 return;
0643 width = std::max(width, stats.label().size());
0644 });
0645
0646 OStreamColumn tag{"StallMonitor>"};
0647 OStreamColumn col1{"Module label", width};
0648 OStreamColumn col2{"# of stalls"};
0649 OStreamColumn col3{"Total stalled time"};
0650 OStreamColumn col4{"Max stalled time"};
0651
0652 LogAbsolute out{"StallMonitor"};
0653 out << '\n';
0654 out << tag << space << col1 << space << col2 << space << col3 << space << col4 << '\n';
0655
0656 out << tag << space << std::setfill('-') << col1(std::string{}) << space << col2(std::string{}) << space
0657 << col3(std::string{}) << space << col4(std::string{}) << '\n';
0658
0659 using seconds_d = duration<double>;
0660
0661 auto to_seconds_str = [](auto const& duration) {
0662 std::ostringstream oss;
0663 auto const time = duration_cast<seconds_d>(duration).count();
0664 oss << time << " s";
0665 return oss.str();
0666 };
0667
0668 out << std::setfill(' ');
0669 for (auto const& stats : moduleStats_) {
0670 if (stats.label().empty() ||
0671 stats.numberOfStalls() == 0u)
0672 continue;
0673 out << std::left << tag << space << col1(stats.label()) << space << std::right << col2(stats.numberOfStalls())
0674 << space << col3(to_seconds_str(stats.totalStalledTime())) << space
0675 << col4(to_seconds_str(stats.maxStalledTime())) << '\n';
0676 }
0677 }
0678
0679 DEFINE_FWK_SERVICE(StallMonitor);