Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-03-17 11:03:42

0001 // -*- C++ -*-
0002 //
0003 // Package: FWCore/Services
0004 // Class  : StallMonitor
0005 //
0006 // Implementation:
0007 //
0008 // Original Author:  Kyle Knoepfel
0009 //
0010 
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "FWCore/Concurrency/interface/ThreadSafeOutputFileStream.h"
#include "FWCore/Framework/interface/ComponentDescription.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
#include "FWCore/ServiceRegistry/interface/StreamContext.h"
#include "FWCore/ServiceRegistry/interface/GlobalContext.h"
#include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
#include "FWCore/ServiceRegistry/interface/ESModuleCallingContext.h"
#include "FWCore/ServiceRegistry/interface/SystemBounds.h"
#include "FWCore/Utilities/interface/Algorithms.h"
#include "FWCore/Utilities/interface/OStreamColumn.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/Utilities/interface/StdPairHasher.h"
#include "oneapi/tbb/concurrent_unordered_map.h"

#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
0037 
0038 namespace {
0039 
0040   using duration_t = std::chrono::microseconds;
0041   using clock_t = std::chrono::steady_clock;
0042   auto now = clock_t::now;
0043 
0044   inline auto stream_id(edm::StreamContext const& cs) { return cs.streamID().value(); }
0045 
0046   inline auto module_id(edm::ModuleCallingContext const& mcc) { return mcc.moduleDescription()->id(); }
0047 
0048   inline auto module_id(edm::ESModuleCallingContext const& mcc) { return mcc.componentDescription()->id_; }
0049 
0050   //===============================================================
0051   class StallStatistics {
0052   public:
0053     // c'tor receiving 'std::string const&' type not provided since we
0054     // must be able to call (e.g.) std::vector<StallStatistics>(20),
0055     // for which a default label is not sensible in this context.
0056     StallStatistics() = default;
0057 
0058     std::string const& label() const { return label_; }
0059     unsigned numberOfStalls() const { return stallCounter_; }
0060 
0061     using rep_t = duration_t::rep;
0062 
0063     duration_t totalStalledTime() const { return duration_t{totalTime_.load()}; }
0064     duration_t maxStalledTime() const { return duration_t{maxTime_.load()}; }
0065 
0066     // Modifiers
0067     void setLabel(std::string const& label) { label_ = label; }
0068 
0069     void update(duration_t const ms) {
0070       ++stallCounter_;
0071       auto const thisTime = ms.count();
0072       totalTime_ += thisTime;
0073       rep_t max{maxTime_};
0074       while (thisTime > max && !maxTime_.compare_exchange_strong(max, thisTime))
0075         ;
0076     }
0077 
0078   private:
0079     std::string label_{};
0080     std::atomic<unsigned> stallCounter_{};
0081     std::atomic<rep_t> totalTime_{};
0082     std::atomic<rep_t> maxTime_{};
0083   };
0084 
0085   //===============================================================
0086   // Message-assembly utilities
0087   template <typename T>
0088   std::enable_if_t<std::is_integral<T>::value> concatenate(std::ostream& os, T const t) {
0089     os << ' ' << t;
0090   }
0091 
0092   template <typename H, typename... T>
0093   std::enable_if_t<std::is_integral<H>::value> concatenate(std::ostream& os, H const h, T const... t) {
0094     os << ' ' << h;
0095     concatenate(os, t...);
0096   }
0097 
0098   enum class step : char {
0099     preSourceEvent = 'S',
0100     postSourceEvent = 's',
0101     preEvent = 'E',
0102     postModuleEventPrefetching = 'p',
0103     preModuleEventAcquire = 'A',
0104     postModuleEventAcquire = 'a',
0105     preModuleEvent = 'M',
0106     preEventReadFromSource = 'R',
0107     postEventReadFromSource = 'r',
0108     postModuleEvent = 'm',
0109     postEvent = 'e',
0110     postESModulePrefetching = 'q',
0111     preESModule = 'N',
0112     postESModule = 'n'
0113   };
0114 
0115   enum class Phase : short {
0116     globalEndRun = -4,
0117     streamEndRun = -3,
0118     globalEndLumi = -2,
0119     streamEndLumi = -1,
0120     Event = 0,
0121     streamBeginLumi = 1,
0122     globalBeginLumi = 2,
0123     streamBeginRun = 3,
0124     globalBeginRun = 4,
0125     eventSetupCall = 5
0126   };
0127 
0128   std::ostream& operator<<(std::ostream& os, step const s) {
0129     os << static_cast<std::underlying_type_t<step>>(s);
0130     return os;
0131   }
0132 
0133   std::ostream& operator<<(std::ostream& os, Phase const s) {
0134     os << static_cast<std::underlying_type_t<Phase>>(s);
0135     return os;
0136   }
0137 
0138   template <step S, typename... ARGS>
0139   std::string assembleMessage(ARGS const... args) {
0140     std::ostringstream oss;
0141     oss << S;
0142     concatenate(oss, args...);
0143     oss << '\n';
0144     return oss.str();
0145   }
0146 
0147   Phase toTransitionImpl(edm::StreamContext const& iContext) {
0148     using namespace edm;
0149     switch (iContext.transition()) {
0150       case StreamContext::Transition::kBeginRun:
0151         return Phase::streamBeginRun;
0152       case StreamContext::Transition::kBeginLuminosityBlock:
0153         return Phase::streamBeginLumi;
0154       case StreamContext::Transition::kEvent:
0155         return Phase::Event;
0156       case StreamContext::Transition::kEndLuminosityBlock:
0157         return Phase::streamEndLumi;
0158       case StreamContext::Transition::kEndRun:
0159         return Phase::streamEndRun;
0160       default:
0161         break;
0162     }
0163     assert(false);
0164     return Phase::Event;
0165   }
0166 
0167   auto toTransition(edm::StreamContext const& iContext) -> std::underlying_type_t<Phase> {
0168     return static_cast<std::underlying_type_t<Phase>>(toTransitionImpl(iContext));
0169   }
0170 
0171   Phase toTransitionImpl(edm::GlobalContext const& iContext) {
0172     using namespace edm;
0173     switch (iContext.transition()) {
0174       case GlobalContext::Transition::kBeginRun:
0175         return Phase::globalBeginRun;
0176       case GlobalContext::Transition::kBeginLuminosityBlock:
0177         return Phase::globalBeginLumi;
0178       case GlobalContext::Transition::kEndLuminosityBlock:
0179         return Phase::globalEndLumi;
0180       case GlobalContext::Transition::kWriteLuminosityBlock:
0181         return Phase::globalEndLumi;
0182       case GlobalContext::Transition::kEndRun:
0183         return Phase::globalEndRun;
0184       case GlobalContext::Transition::kWriteRun:
0185         return Phase::globalEndRun;
0186       default:
0187         break;
0188     }
0189     assert(false);
0190     return Phase::Event;
0191   }
0192   auto toTransition(edm::GlobalContext const& iContext) -> std::underlying_type_t<Phase> {
0193     return static_cast<std::underlying_type_t<Phase>>(toTransitionImpl(iContext));
0194   }
0195 
0196 }  // namespace
0197 
0198 namespace edm {
0199   namespace service {
0200 
0201     class StallMonitor {
0202     public:
0203       StallMonitor(ParameterSet const&, ActivityRegistry&);
0204       static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0205 
0206     private:
0207       void preModuleConstruction(edm::ModuleDescription const&);
0208       void preModuleDestruction(edm::ModuleDescription const&);
0209       void postBeginJob();
0210       void preSourceEvent(StreamID);
0211       void postSourceEvent(StreamID);
0212       void preEvent(StreamContext const&);
0213       void preModuleEventAcquire(StreamContext const&, ModuleCallingContext const&);
0214       void postModuleEventAcquire(StreamContext const&, ModuleCallingContext const&);
0215       void preModuleEvent(StreamContext const&, ModuleCallingContext const&);
0216       void postModuleEventPrefetching(StreamContext const&, ModuleCallingContext const&);
0217       void preEventReadFromSource(StreamContext const&, ModuleCallingContext const&);
0218       void postEventReadFromSource(StreamContext const&, ModuleCallingContext const&);
0219       void postModuleEvent(StreamContext const&, ModuleCallingContext const&);
0220       void postEvent(StreamContext const&);
0221       void preModuleStreamTransition(StreamContext const&, ModuleCallingContext const&);
0222       void postModuleStreamTransition(StreamContext const&, ModuleCallingContext const&);
0223       void preModuleGlobalTransition(GlobalContext const&, ModuleCallingContext const&);
0224       void postModuleGlobalTransition(GlobalContext const&, ModuleCallingContext const&);
0225       void postEndJob();
0226 
0227       ThreadSafeOutputFileStream file_;
0228       bool const validFile_;  // Separate data member from file to improve efficiency.
0229       duration_t const stallThreshold_;
0230       decltype(now()) beginTime_{};
0231 
0232       // There can be multiple modules per stream.  Therefore, we need
0233       // the combination of StreamID and ModuleID to correctly track
0234       // stalling information.  We use oneapi::tbb::concurrent_unordered_map
0235       // for this purpose.
0236       using StreamID_value = decltype(std::declval<StreamID>().value());
0237       using ModuleID = decltype(std::declval<ModuleDescription>().id());
0238       oneapi::tbb::concurrent_unordered_map<std::pair<StreamID_value, ModuleID>,
0239                                             std::pair<decltype(beginTime_), bool>,
0240                                             edm::StdPairHasher>
0241           stallStart_{};
0242 
0243       std::vector<std::string> moduleLabels_{};
0244       std::vector<StallStatistics> moduleStats_{};
0245       std::vector<std::string> esModuleLabels_{};
0246       unsigned int numStreams_;
0247     };
0248 
0249   }  // namespace service
0250 
0251 }  // namespace edm
0252 
namespace {
  // Default value of the "fileName" parameter.  An empty name means that no
  // detailed timing file is written.
  constexpr char const* const filename_default{""};

  // Default value of the "stallThreshold" parameter, in seconds.
  constexpr double threshold_default{0.1};

  // Two-space separator placed between the columns of printed tables.
  std::string const space(2, ' ');
}  // namespace
0258 
0259 using edm::service::StallMonitor;
0260 using namespace std::chrono;
0261 
0262 StallMonitor::StallMonitor(ParameterSet const& iPS, ActivityRegistry& iRegistry)
0263     : file_{iPS.getUntrackedParameter<std::string>("fileName", filename_default)},
0264       validFile_{file_},
0265       stallThreshold_{
0266           std::chrono::round<duration_t>(duration<double>(iPS.getUntrackedParameter<double>("stallThreshold")))} {
0267   iRegistry.watchPreModuleConstruction(this, &StallMonitor::preModuleConstruction);
0268   iRegistry.watchPreModuleDestruction(this, &StallMonitor::preModuleDestruction);
0269   iRegistry.watchPostBeginJob(this, &StallMonitor::postBeginJob);
0270   iRegistry.watchPostModuleEventPrefetching(this, &StallMonitor::postModuleEventPrefetching);
0271   iRegistry.watchPreModuleEventAcquire(this, &StallMonitor::preModuleEventAcquire);
0272   iRegistry.watchPreModuleEvent(this, &StallMonitor::preModuleEvent);
0273   iRegistry.watchPostEndJob(this, &StallMonitor::postEndJob);
0274 
0275   if (validFile_) {
0276     // Only enable the following callbacks if writing to a file.
0277     iRegistry.watchPreSourceEvent(this, &StallMonitor::preSourceEvent);
0278     iRegistry.watchPostSourceEvent(this, &StallMonitor::postSourceEvent);
0279     iRegistry.watchPreEvent(this, &StallMonitor::preEvent);
0280     iRegistry.watchPostModuleEventAcquire(this, &StallMonitor::postModuleEventAcquire);
0281     iRegistry.watchPreEventReadFromSource(this, &StallMonitor::preEventReadFromSource);
0282     iRegistry.watchPostEventReadFromSource(this, &StallMonitor::postEventReadFromSource);
0283     iRegistry.watchPostModuleEvent(this, &StallMonitor::postModuleEvent);
0284     iRegistry.watchPostEvent(this, &StallMonitor::postEvent);
0285 
0286     iRegistry.watchPreModuleStreamBeginRun(this, &StallMonitor::preModuleStreamTransition);
0287     iRegistry.watchPostModuleStreamBeginRun(this, &StallMonitor::postModuleStreamTransition);
0288     iRegistry.watchPreModuleStreamEndRun(this, &StallMonitor::preModuleStreamTransition);
0289     iRegistry.watchPostModuleStreamEndRun(this, &StallMonitor::postModuleStreamTransition);
0290 
0291     iRegistry.watchPreModuleStreamBeginLumi(this, &StallMonitor::preModuleStreamTransition);
0292     iRegistry.watchPostModuleStreamBeginLumi(this, &StallMonitor::postModuleStreamTransition);
0293     iRegistry.watchPreModuleStreamEndLumi(this, &StallMonitor::preModuleStreamTransition);
0294     iRegistry.watchPostModuleStreamEndLumi(this, &StallMonitor::postModuleStreamTransition);
0295 
0296     iRegistry.watchPreModuleGlobalBeginRun(this, &StallMonitor::preModuleGlobalTransition);
0297     iRegistry.watchPostModuleGlobalBeginRun(this, &StallMonitor::postModuleGlobalTransition);
0298     iRegistry.watchPreModuleGlobalEndRun(this, &StallMonitor::preModuleGlobalTransition);
0299     iRegistry.watchPostModuleGlobalEndRun(this, &StallMonitor::postModuleGlobalTransition);
0300     iRegistry.watchPreModuleWriteRun(this, &StallMonitor::preModuleGlobalTransition);
0301     iRegistry.watchPostModuleWriteRun(this, &StallMonitor::postModuleGlobalTransition);
0302 
0303     iRegistry.watchPreModuleGlobalBeginLumi(this, &StallMonitor::preModuleGlobalTransition);
0304     iRegistry.watchPostModuleGlobalBeginLumi(this, &StallMonitor::postModuleGlobalTransition);
0305     iRegistry.watchPreModuleGlobalEndLumi(this, &StallMonitor::preModuleGlobalTransition);
0306     iRegistry.watchPostModuleGlobalEndLumi(this, &StallMonitor::postModuleGlobalTransition);
0307     iRegistry.watchPreModuleWriteLumi(this, &StallMonitor::preModuleGlobalTransition);
0308     iRegistry.watchPostModuleWriteLumi(this, &StallMonitor::postModuleGlobalTransition);
0309 
0310     iRegistry.postESModuleRegistrationSignal_.connect([this](auto const& iDescription) {
0311       if (esModuleLabels_.size() <= iDescription.id_) {
0312         esModuleLabels_.resize(iDescription.id_ + 1);
0313       }
0314       if (not iDescription.label_.empty()) {
0315         esModuleLabels_[iDescription.id_] = iDescription.label_;
0316       } else {
0317         esModuleLabels_[iDescription.id_] = iDescription.type_;
0318       }
0319     });
0320 
0321     iRegistry.preESModuleSignal_.connect([this](auto const&, auto const& context) {
0322       auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0323       auto msg = assembleMessage<step::preESModule>(
0324           numStreams_, module_id(context), std::underlying_type_t<Phase>(Phase::eventSetupCall), t);
0325       file_.write(std::move(msg));
0326     });
0327     iRegistry.postESModuleSignal_.connect([this](auto const&, auto const& context) {
0328       auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0329       auto msg = assembleMessage<step::postESModule>(
0330           numStreams_, module_id(context), std::underlying_type_t<Phase>(Phase::eventSetupCall), t);
0331       file_.write(std::move(msg));
0332     });
0333 
0334     iRegistry.preallocateSignal_.connect(
0335         [this](service::SystemBounds const& iBounds) { numStreams_ = iBounds.maxNumberOfStreams(); });
0336 
0337     std::ostringstream oss;
0338     oss << "# Transition       Symbol\n";
0339     oss << "#----------------- ------\n";
0340     oss << "# eventSetupCall  " << Phase::eventSetupCall << "\n"
0341         << "# globalBeginRun  " << Phase::globalBeginRun << "\n"
0342         << "# streamBeginRun  " << Phase::streamBeginRun << "\n"
0343         << "# globalBeginLumi " << Phase::globalBeginLumi << "\n"
0344         << "# streamBeginLumi " << Phase::streamBeginLumi << "\n"
0345         << "# Event           " << Phase::Event << "\n"
0346         << "# streamEndLumi   " << Phase::streamEndLumi << "\n"
0347         << "# globalEndLumi   " << Phase::globalEndLumi << "\n"
0348         << "# streamEndRun    " << Phase::streamEndRun << "\n"
0349         << "# globalEndRun    " << Phase::globalEndRun << "\n";
0350     oss << "# Step                       Symbol Entries\n"
0351         << "# -------------------------- ------ ------------------------------------------\n"
0352         << "# preSourceEvent                " << step::preSourceEvent << "   <Stream ID> <Time since beginJob (ms)>\n"
0353         << "# postSourceEvent               " << step::postSourceEvent << "   <Stream ID> <Time since beginJob (ms)>\n"
0354         << "# preEvent                      " << step::preEvent
0355         << "   <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n"
0356         << "# postModuleEventPrefetching    " << step::postModuleEventPrefetching
0357         << "   <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0358         << "# preModuleEventAcquire         " << step::preModuleEventAcquire
0359         << "   <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0360         << "# postModuleEventAcquire        " << step::postModuleEventAcquire
0361         << "   <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0362         << "# preModuleTransition           " << step::preModuleEvent
0363         << "   <Stream ID> <Module ID> <Transition type> <Time since beginJob (ms)>\n"
0364         << "# preEventReadFromSource        " << step::preEventReadFromSource
0365         << "   <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0366         << "# postEventReadFromSource       " << step::postEventReadFromSource
0367         << "   <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0368         << "# postModuleTransition          " << step::postModuleEvent
0369         << "   <Stream ID> <Module ID> <Transition type> <Time since beginJob (ms)>\n"
0370         << "# postEvent                     " << step::postEvent
0371         << "   <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n"
0372         << "# postESModulePrefetching       " << step::postESModulePrefetching
0373         << "  <Stream ID> <ESModule ID> <Transition type> <Time since beginJob (ms)>\n"
0374         << "# preESModuleTransition         " << step::preESModule
0375         << "  <StreamID> <ESModule ID> <TransitionType> <Time since beginJob (ms)>\n"
0376         << "# postESModuleTransition        " << step::postESModule
0377         << "  <StreamID> <ESModule ID> <TransitionType> <Time since beginJob (ms)>\n";
0378     file_.write(oss.str());
0379   }
0380 }
0381 
0382 void StallMonitor::fillDescriptions(ConfigurationDescriptions& descriptions) {
0383   ParameterSetDescription desc;
0384   desc.addUntracked<std::string>("fileName", filename_default)
0385       ->setComment(
0386           "Name of file to which detailed timing information should be written.\n"
0387           "An empty filename argument (the default) indicates that no extra\n"
0388           "information will be written to a dedicated file, but only the summary\n"
0389           "including stalling-modules information will be logged.");
0390   desc.addUntracked<double>("stallThreshold", threshold_default)
0391       ->setComment(
0392           "Threshold (in seconds) used to classify modules as stalled.\n"
0393           "Microsecond granularity allowed.");
0394   descriptions.add("StallMonitor", desc);
0395   descriptions.setComment(
0396       "This service keeps track of various times in event-processing to determine which modules are stalling.");
0397 }
0398 
0399 void StallMonitor::preModuleConstruction(ModuleDescription const& md) {
0400   // Module labels are dense, so if the module id is greater than the
0401   // size of moduleLabels_, grow the vector to the correct index and
0402   // assign the last entry to the desired label.  Note that with the
0403   // current implementation, there is no module with ID '0'.  In
0404   // principle, the module-information vectors are therefore each one
0405   // entry too large.  However, since removing the entry at the front
0406   // makes for awkward indexing later on, and since the sizes of these
0407   // extra entries are on the order of bytes, we will leave them in
0408   // and skip over them later when printing out summaries.  The
0409   // extraneous entries can be identified by their module labels being
0410   // empty.
0411   auto const mid = md.id();
0412   if (mid < moduleLabels_.size()) {
0413     moduleLabels_[mid] = md.moduleLabel();
0414   } else {
0415     moduleLabels_.resize(mid + 1);
0416     moduleLabels_.back() = md.moduleLabel();
0417   }
0418 }
0419 
0420 void StallMonitor::preModuleDestruction(ModuleDescription const& md) {
0421   // Reset the module label back if the module is deleted before
0422   // beginJob() so that the entry is ignored in the summary printouts.
0423   moduleLabels_[md.id()] = "";
0424 }
0425 
0426 void StallMonitor::postBeginJob() {
0427   // Since a (push,emplace)_back cannot be called for a vector of a
0428   // type containing atomics (like 'StallStatistics')--i.e. atomics
0429   // have no copy/move-assignment operators, we must specify the size
0430   // of the vector at construction time.
0431   moduleStats_ = std::vector<StallStatistics>(moduleLabels_.size());
0432   for (std::size_t i{}; i < moduleStats_.size(); ++i) {
0433     moduleStats_[i].setLabel(moduleLabels_[i]);
0434   }
0435 
0436   if (validFile_) {
0437     {
0438       std::size_t const width{std::to_string(moduleLabels_.size()).size()};
0439       OStreamColumn col0{"Module ID", width};
0440       std::string const lastCol{"Module label"};
0441 
0442       std::ostringstream oss;
0443       oss << "\n#  " << col0 << space << lastCol << '\n';
0444       oss << "#  " << std::string(col0.width() + space.size() + lastCol.size(), '-') << '\n';
0445 
0446       for (std::size_t i{}; i < moduleLabels_.size(); ++i) {
0447         auto const& label = moduleLabels_[i];
0448         if (label.empty())
0449           continue;  // See comment in filling of moduleLabels_;
0450         oss << "#M " << std::setw(width) << std::left << col0(i) << space << std::left << moduleLabels_[i] << '\n';
0451       }
0452       oss << '\n';
0453       file_.write(oss.str());
0454     }
0455     {
0456       std::size_t const width{std::to_string(esModuleLabels_.size()).size()};
0457       OStreamColumn col0{"ESModule ID", width};
0458       std::string const lastCol{"ESModule label"};
0459 
0460       std::ostringstream oss;
0461       oss << "\n#  " << col0 << space << lastCol << '\n';
0462       oss << "#  " << std::string(col0.width() + space.size() + lastCol.size(), '-') << '\n';
0463 
0464       for (std::size_t i{}; i < esModuleLabels_.size(); ++i) {
0465         auto const& label = esModuleLabels_[i];
0466         if (label.empty())
0467           continue;  // See comment in filling of moduleLabels_;
0468         oss << "#N " << std::setw(width) << std::left << col0(i) << space << std::left << esModuleLabels_[i] << '\n';
0469       }
0470       oss << '\n';
0471       file_.write(oss.str());
0472     }
0473   }
0474   // Don't need the labels anymore--info. is now part of the
0475   // module-statistics objects.
0476   decltype(moduleLabels_)().swap(moduleLabels_);
0477   decltype(esModuleLabels_)().swap(esModuleLabels_);
0478 
0479   beginTime_ = now();
0480 }
0481 
0482 void StallMonitor::preSourceEvent(StreamID const sid) {
0483   auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0484   auto msg = assembleMessage<step::preSourceEvent>(sid.value(), t);
0485   file_.write(std::move(msg));
0486 }
0487 
0488 void StallMonitor::postSourceEvent(StreamID const sid) {
0489   auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0490   auto msg = assembleMessage<step::postSourceEvent>(sid.value(), t);
0491   file_.write(std::move(msg));
0492 }
0493 
0494 void StallMonitor::preEvent(StreamContext const& sc) {
0495   auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0496   auto const& eid = sc.eventID();
0497   auto msg = assembleMessage<step::preEvent>(stream_id(sc), eid.run(), eid.luminosityBlock(), eid.event(), t);
0498   file_.write(std::move(msg));
0499 }
0500 
0501 void StallMonitor::postModuleEventPrefetching(StreamContext const& sc, ModuleCallingContext const& mcc) {
0502   auto const sid = stream_id(sc);
0503   auto const mid = module_id(mcc);
0504   auto start = stallStart_[std::make_pair(sid, mid)] = std::make_pair(now(), false);
0505 
0506   if (validFile_) {
0507     auto const t = duration_cast<duration_t>(start.first - beginTime_).count();
0508     auto msg = assembleMessage<step::postModuleEventPrefetching>(sid, mid, t);
0509     file_.write(std::move(msg));
0510   }
0511 }
0512 
0513 void StallMonitor::preModuleEventAcquire(StreamContext const& sc, ModuleCallingContext const& mcc) {
0514   auto const preModEventAcquire = now();
0515   auto const sid = stream_id(sc);
0516   auto const mid = module_id(mcc);
0517   auto& start = stallStart_[std::make_pair(sid, mid)];
0518   auto startT = start.first.time_since_epoch();
0519   start.second = true;  // record so the preModuleEvent knows that acquire was called
0520   if (validFile_) {
0521     auto t = duration_cast<duration_t>(preModEventAcquire - beginTime_).count();
0522     auto msg = assembleMessage<step::preModuleEventAcquire>(sid, mid, t);
0523     file_.write(std::move(msg));
0524   }
0525   // Check for stalls if prefetch was called
0526   if (duration_t::duration::zero() != startT) {
0527     auto const preFetch_to_preModEventAcquire = duration_cast<duration_t>(preModEventAcquire - start.first);
0528     if (preFetch_to_preModEventAcquire < stallThreshold_)
0529       return;
0530     moduleStats_[mid].update(preFetch_to_preModEventAcquire);
0531   }
0532 }
0533 
0534 void StallMonitor::postModuleEventAcquire(StreamContext const& sc, ModuleCallingContext const& mcc) {
0535   auto const postModEventAcquire = duration_cast<duration_t>(now() - beginTime_).count();
0536   auto msg = assembleMessage<step::postModuleEventAcquire>(stream_id(sc), module_id(mcc), postModEventAcquire);
0537   file_.write(std::move(msg));
0538 }
0539 
0540 void StallMonitor::preModuleEvent(StreamContext const& sc, ModuleCallingContext const& mcc) {
0541   auto const preModEvent = now();
0542   auto const sid = stream_id(sc);
0543   auto const mid = module_id(mcc);
0544   auto const& start = stallStart_[std::make_pair(sid, mid)];
0545   auto startT = start.first.time_since_epoch();
0546   if (validFile_) {
0547     auto t = duration_cast<duration_t>(preModEvent - beginTime_).count();
0548     auto msg =
0549         assembleMessage<step::preModuleEvent>(sid, mid, static_cast<std::underlying_type_t<Phase>>(Phase::Event), t);
0550     file_.write(std::move(msg));
0551   }
0552   // Check for stalls if prefetch was called and we did not already check before acquire
0553   if (duration_t::duration::zero() != startT && !start.second) {
0554     auto const preFetch_to_preModEvent = duration_cast<duration_t>(preModEvent - start.first);
0555     if (preFetch_to_preModEvent < stallThreshold_)
0556       return;
0557     moduleStats_[mid].update(preFetch_to_preModEvent);
0558   }
0559 }
0560 
0561 void StallMonitor::preModuleStreamTransition(StreamContext const& sc, ModuleCallingContext const& mcc) {
0562   auto const tNow = now();
0563   auto const sid = stream_id(sc);
0564   auto const mid = module_id(mcc);
0565   auto t = duration_cast<duration_t>(tNow - beginTime_).count();
0566   auto msg = assembleMessage<step::preModuleEvent>(sid, mid, toTransition(sc), t);
0567   file_.write(std::move(msg));
0568 }
0569 
0570 void StallMonitor::postModuleStreamTransition(StreamContext const& sc, ModuleCallingContext const& mcc) {
0571   auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0572   auto msg = assembleMessage<step::postModuleEvent>(stream_id(sc), module_id(mcc), toTransition(sc), t);
0573   file_.write(std::move(msg));
0574 }
0575 
0576 void StallMonitor::preModuleGlobalTransition(GlobalContext const& gc, ModuleCallingContext const& mcc) {
0577   auto t = duration_cast<duration_t>(now() - beginTime_).count();
0578   auto msg = assembleMessage<step::preModuleEvent>(numStreams_, module_id(mcc), toTransition(gc), t);
0579   file_.write(std::move(msg));
0580 }
0581 
0582 void StallMonitor::postModuleGlobalTransition(GlobalContext const& gc, ModuleCallingContext const& mcc) {
0583   auto const postModTime = duration_cast<duration_t>(now() - beginTime_).count();
0584   auto msg = assembleMessage<step::postModuleEvent>(numStreams_, module_id(mcc), toTransition(gc), postModTime);
0585   file_.write(std::move(msg));
0586 }
0587 
0588 void StallMonitor::preEventReadFromSource(StreamContext const& sc, ModuleCallingContext const& mcc) {
0589   auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0590   auto msg = assembleMessage<step::preEventReadFromSource>(stream_id(sc), module_id(mcc), t);
0591   file_.write(std::move(msg));
0592 }
0593 
0594 void StallMonitor::postEventReadFromSource(StreamContext const& sc, ModuleCallingContext const& mcc) {
0595   auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0596   auto msg = assembleMessage<step::postEventReadFromSource>(stream_id(sc), module_id(mcc), t);
0597   file_.write(std::move(msg));
0598 }
0599 
0600 void StallMonitor::postModuleEvent(StreamContext const& sc, ModuleCallingContext const& mcc) {
0601   auto const postModEvent = duration_cast<duration_t>(now() - beginTime_).count();
0602   auto msg = assembleMessage<step::postModuleEvent>(
0603       stream_id(sc), module_id(mcc), static_cast<std::underlying_type_t<Phase>>(Phase::Event), postModEvent);
0604   file_.write(std::move(msg));
0605 }
0606 
0607 void StallMonitor::postEvent(StreamContext const& sc) {
0608   auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0609   auto const& eid = sc.eventID();
0610   auto msg = assembleMessage<step::postEvent>(stream_id(sc), eid.run(), eid.luminosityBlock(), eid.event(), t);
0611   file_.write(std::move(msg));
0612 }
0613 
0614 void StallMonitor::postEndJob() {
0615   // Prepare summary
0616   std::size_t width{};
0617   edm::for_all(moduleStats_, [&width](auto const& stats) {
0618     if (stats.numberOfStalls() == 0u)
0619       return;
0620     width = std::max(width, stats.label().size());
0621   });
0622 
0623   OStreamColumn tag{"StallMonitor>"};
0624   OStreamColumn col1{"Module label", width};
0625   OStreamColumn col2{"# of stalls"};
0626   OStreamColumn col3{"Total stalled time"};
0627   OStreamColumn col4{"Max stalled time"};
0628 
0629   LogAbsolute out{"StallMonitor"};
0630   out << '\n';
0631   out << tag << space << col1 << space << col2 << space << col3 << space << col4 << '\n';
0632 
0633   out << tag << space << std::setfill('-') << col1(std::string{}) << space << col2(std::string{}) << space
0634       << col3(std::string{}) << space << col4(std::string{}) << '\n';
0635 
0636   using seconds_d = duration<double>;
0637 
0638   auto to_seconds_str = [](auto const& duration) {
0639     std::ostringstream oss;
0640     auto const time = duration_cast<seconds_d>(duration).count();
0641     oss << time << " s";
0642     return oss.str();
0643   };
0644 
0645   out << std::setfill(' ');
0646   for (auto const& stats : moduleStats_) {
0647     if (stats.label().empty() ||  // See comment in filling of moduleLabels_;
0648         stats.numberOfStalls() == 0u)
0649       continue;
0650     out << std::left << tag << space << col1(stats.label()) << space << std::right << col2(stats.numberOfStalls())
0651         << space << col3(to_seconds_str(stats.totalStalledTime())) << space
0652         << col4(to_seconds_str(stats.maxStalledTime())) << '\n';
0653   }
0654 }
0655 
// Framework-service registration macro (from ServiceMaker.h, included at the
// top of this file); presumably makes StallMonitor loadable as a service
// under the name "StallMonitor" -- confirm against ServiceMaker.h.
0656 DEFINE_FWK_SERVICE(StallMonitor);