Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-06-06 22:47:55

0001 // -*- C++ -*-
0002 //
0003 // Package: FWCore/Services
0004 // Class  : StallMonitor
0005 //
0006 // Implementation:
0007 //
0008 // Original Author:  Kyle Knoepfel
0009 //
0010 
0011 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0012 #include "FWCore/Concurrency/interface/ThreadSafeOutputFileStream.h"
0013 #include "FWCore/Framework/interface/ComponentDescription.h"
0014 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0015 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0016 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0017 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0018 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0019 #include "FWCore/ServiceRegistry/interface/Service.h"
0020 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0021 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0022 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0023 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0024 #include "FWCore/ServiceRegistry/interface/ESModuleCallingContext.h"
0025 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0026 #include "FWCore/Utilities/interface/Algorithms.h"
0027 #include "FWCore/Utilities/interface/OStreamColumn.h"
0028 #include "FWCore/Utilities/interface/Exception.h"
0029 #include "FWCore/Utilities/interface/StdPairHasher.h"
0030 
0031 #include "monitor_file_utilities.h"
0032 
0033 #include "oneapi/tbb/concurrent_unordered_map.h"
0034 #include <atomic>
0035 #include <chrono>
0036 #include <iomanip>
0037 #include <iostream>
0038 #include <sstream>
0039 
0040 using namespace edm::service::monitor_file_utilities;
0041 
0042 namespace {
0043 
0044   using duration_t = std::chrono::microseconds;
0045   using steady_clock = std::chrono::steady_clock;
0046 
0047   //===============================================================
0048   class StallStatistics {
0049   public:
0050     // c'tor receiving 'std::string const&' type not provided since we
0051     // must be able to call (e.g.) std::vector<StallStatistics>(20),
0052     // for which a default label is not sensible in this context.
0053     StallStatistics() = default;
0054 
0055     std::string const& label() const { return label_; }
0056     unsigned numberOfStalls() const { return stallCounter_; }
0057 
0058     using rep_t = duration_t::rep;
0059 
0060     duration_t totalStalledTime() const { return duration_t{totalTime_.load()}; }
0061     duration_t maxStalledTime() const { return duration_t{maxTime_.load()}; }
0062 
0063     // Modifiers
0064     void setLabel(std::string const& label) { label_ = label; }
0065 
0066     void update(duration_t const ms) {
0067       ++stallCounter_;
0068       auto const thisTime = ms.count();
0069       totalTime_ += thisTime;
0070       rep_t max{maxTime_};
0071       while (thisTime > max && !maxTime_.compare_exchange_strong(max, thisTime))
0072         ;
0073     }
0074 
0075   private:
0076     std::string label_{};
0077     std::atomic<unsigned> stallCounter_{};
0078     std::atomic<rep_t> totalTime_{};
0079     std::atomic<rep_t> maxTime_{};
0080   };
0081 
0082   enum class step : char {
0083     preSourceEvent = 'S',
0084     postSourceEvent = 's',
0085     preEvent = 'E',
0086     postModuleEventPrefetching = 'p',
0087     preModuleEventAcquire = 'A',
0088     postModuleEventAcquire = 'a',
0089     preModuleEvent = 'M',
0090     preEventReadFromSource = 'R',
0091     postEventReadFromSource = 'r',
0092     postModuleEvent = 'm',
0093     postEvent = 'e',
0094     postESModulePrefetching = 'q',
0095     preESModule = 'N',
0096     postESModule = 'n',
0097     preFrameworkTransition = 'F',
0098     postFrameworkTransition = 'f'
0099   };
0100 
0101   enum class Phase : short {
0102     globalEndRun = -4,
0103     streamEndRun = -3,
0104     globalEndLumi = -2,
0105     streamEndLumi = -1,
0106     Event = 0,
0107     streamBeginLumi = 1,
0108     globalBeginLumi = 2,
0109     streamBeginRun = 3,
0110     globalBeginRun = 4,
0111     eventSetupCall = 5
0112   };
0113 
0114   std::ostream& operator<<(std::ostream& os, step const s) {
0115     os << static_cast<std::underlying_type_t<step>>(s);
0116     return os;
0117   }
0118 
0119   std::ostream& operator<<(std::ostream& os, Phase const s) {
0120     os << static_cast<std::underlying_type_t<Phase>>(s);
0121     return os;
0122   }
0123 
0124   template <step S, typename... ARGS>
0125   std::string assembleMessage(ARGS const... args) {
0126     std::ostringstream oss;
0127     oss << S;
0128     concatenate(oss, args...);
0129     oss << '\n';
0130     return oss.str();
0131   }
0132 
0133   Phase toTransitionImpl(edm::StreamContext const& iContext) {
0134     using namespace edm;
0135     switch (iContext.transition()) {
0136       case StreamContext::Transition::kBeginRun:
0137         return Phase::streamBeginRun;
0138       case StreamContext::Transition::kBeginLuminosityBlock:
0139         return Phase::streamBeginLumi;
0140       case StreamContext::Transition::kEvent:
0141         return Phase::Event;
0142       case StreamContext::Transition::kEndLuminosityBlock:
0143         return Phase::streamEndLumi;
0144       case StreamContext::Transition::kEndRun:
0145         return Phase::streamEndRun;
0146       default:
0147         break;
0148     }
0149     assert(false);
0150     return Phase::Event;
0151   }
0152 
0153   auto toTransition(edm::StreamContext const& iContext) -> std::underlying_type_t<Phase> {
0154     return static_cast<std::underlying_type_t<Phase>>(toTransitionImpl(iContext));
0155   }
0156 
0157   Phase toTransitionImpl(edm::GlobalContext const& iContext) {
0158     using namespace edm;
0159     switch (iContext.transition()) {
0160       case GlobalContext::Transition::kBeginRun:
0161         return Phase::globalBeginRun;
0162       case GlobalContext::Transition::kBeginLuminosityBlock:
0163         return Phase::globalBeginLumi;
0164       case GlobalContext::Transition::kEndLuminosityBlock:
0165         return Phase::globalEndLumi;
0166       case GlobalContext::Transition::kWriteLuminosityBlock:
0167         return Phase::globalEndLumi;
0168       case GlobalContext::Transition::kEndRun:
0169         return Phase::globalEndRun;
0170       case GlobalContext::Transition::kWriteRun:
0171         return Phase::globalEndRun;
0172       default:
0173         break;
0174     }
0175     assert(false);
0176     return Phase::Event;
0177   }
0178   auto toTransition(edm::GlobalContext const& iContext) -> std::underlying_type_t<Phase> {
0179     return static_cast<std::underlying_type_t<Phase>>(toTransitionImpl(iContext));
0180   }
0181 
0182 }  // namespace
0183 
0184 namespace edm {
0185   namespace service {
0186 
0187     class StallMonitor {
0188     public:
0189       StallMonitor(ParameterSet const&, ActivityRegistry&);
0190       static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0191 
0192     private:
0193       void preModuleConstruction(edm::ModuleDescription const&);
0194       void preModuleDestruction(edm::ModuleDescription const&);
0195       void postBeginJob();
0196       void preSourceEvent(StreamID);
0197       void postSourceEvent(StreamID);
0198       void preEvent(StreamContext const&);
0199       void preModuleEventAcquire(StreamContext const&, ModuleCallingContext const&);
0200       void postModuleEventAcquire(StreamContext const&, ModuleCallingContext const&);
0201       void preModuleEvent(StreamContext const&, ModuleCallingContext const&);
0202       void postModuleEventPrefetching(StreamContext const&, ModuleCallingContext const&);
0203       void preEventReadFromSource(StreamContext const&, ModuleCallingContext const&);
0204       void postEventReadFromSource(StreamContext const&, ModuleCallingContext const&);
0205       void postModuleEvent(StreamContext const&, ModuleCallingContext const&);
0206       void postEvent(StreamContext const&);
0207       void preModuleStreamTransition(StreamContext const&, ModuleCallingContext const&);
0208       void postModuleStreamTransition(StreamContext const&, ModuleCallingContext const&);
0209       void preModuleGlobalTransition(GlobalContext const&, ModuleCallingContext const&);
0210       void postModuleGlobalTransition(GlobalContext const&, ModuleCallingContext const&);
0211       void postEndJob();
0212 
0213       ThreadSafeOutputFileStream file_;
0214       bool const validFile_;  // Separate data member from file to improve efficiency.
0215       duration_t const stallThreshold_;
0216       decltype(steady_clock::now()) beginTime_{};
0217 
0218       // There can be multiple modules per stream.  Therefore, we need
0219       // the combination of StreamID and ModuleID to correctly track
0220       // stalling information.  We use oneapi::tbb::concurrent_unordered_map
0221       // for this purpose.
0222       using StreamID_value = decltype(std::declval<StreamID>().value());
0223       using ModuleID = decltype(std::declval<ModuleDescription>().id());
0224       oneapi::tbb::concurrent_unordered_map<std::pair<StreamID_value, ModuleID>,
0225                                             std::pair<decltype(beginTime_), bool>,
0226                                             edm::StdPairHasher>
0227           stallStart_{};
0228 
0229       std::vector<std::string> moduleLabels_{};
0230       std::vector<StallStatistics> moduleStats_{};
0231       std::vector<std::string> esModuleLabels_{};
0232       unsigned int numStreams_;
0233     };
0234 
0235   }  // namespace service
0236 
0237 }  // namespace edm
0238 
namespace {
  // Default "fileName": empty, meaning no dedicated output file is written.
  constexpr const char* const filename_default = "";
  // Default "stallThreshold", in seconds.
  constexpr double threshold_default = 0.1;
  // Separator placed between the columns of the end-of-job summary.
  const std::string space("  ");
}  // namespace
0244 
0245 using edm::service::StallMonitor;
0246 using namespace std::chrono;
0247 
0248 StallMonitor::StallMonitor(ParameterSet const& iPS, ActivityRegistry& iRegistry)
0249     : file_{iPS.getUntrackedParameter<std::string>("fileName")},
0250       validFile_{file_},
0251       stallThreshold_{
0252           std::chrono::round<duration_t>(duration<double>(iPS.getUntrackedParameter<double>("stallThreshold")))} {
0253   iRegistry.watchPreModuleConstruction(this, &StallMonitor::preModuleConstruction);
0254   iRegistry.watchPreModuleDestruction(this, &StallMonitor::preModuleDestruction);
0255   iRegistry.watchPostBeginJob(this, &StallMonitor::postBeginJob);
0256   iRegistry.watchPostModuleEventPrefetching(this, &StallMonitor::postModuleEventPrefetching);
0257   iRegistry.watchPreModuleEventAcquire(this, &StallMonitor::preModuleEventAcquire);
0258   iRegistry.watchPreModuleEvent(this, &StallMonitor::preModuleEvent);
0259   iRegistry.watchPostEndJob(this, &StallMonitor::postEndJob);
0260 
0261   if (validFile_) {
0262     // Only enable the following callbacks if writing to a file.
0263     iRegistry.watchPreSourceEvent(this, &StallMonitor::preSourceEvent);
0264     iRegistry.watchPostSourceEvent(this, &StallMonitor::postSourceEvent);
0265     iRegistry.watchPreEvent(this, &StallMonitor::preEvent);
0266     iRegistry.watchPostModuleEventAcquire(this, &StallMonitor::postModuleEventAcquire);
0267     iRegistry.watchPreEventReadFromSource(this, &StallMonitor::preEventReadFromSource);
0268     iRegistry.watchPostEventReadFromSource(this, &StallMonitor::postEventReadFromSource);
0269     iRegistry.watchPostModuleEvent(this, &StallMonitor::postModuleEvent);
0270     iRegistry.watchPostEvent(this, &StallMonitor::postEvent);
0271 
0272     iRegistry.watchPreModuleStreamBeginRun(this, &StallMonitor::preModuleStreamTransition);
0273     iRegistry.watchPostModuleStreamBeginRun(this, &StallMonitor::postModuleStreamTransition);
0274     iRegistry.watchPreModuleStreamEndRun(this, &StallMonitor::preModuleStreamTransition);
0275     iRegistry.watchPostModuleStreamEndRun(this, &StallMonitor::postModuleStreamTransition);
0276 
0277     iRegistry.watchPreModuleStreamBeginLumi(this, &StallMonitor::preModuleStreamTransition);
0278     iRegistry.watchPostModuleStreamBeginLumi(this, &StallMonitor::postModuleStreamTransition);
0279     iRegistry.watchPreModuleStreamEndLumi(this, &StallMonitor::preModuleStreamTransition);
0280     iRegistry.watchPostModuleStreamEndLumi(this, &StallMonitor::postModuleStreamTransition);
0281 
0282     iRegistry.watchPreModuleGlobalBeginRun(this, &StallMonitor::preModuleGlobalTransition);
0283     iRegistry.watchPostModuleGlobalBeginRun(this, &StallMonitor::postModuleGlobalTransition);
0284     iRegistry.watchPreModuleGlobalEndRun(this, &StallMonitor::preModuleGlobalTransition);
0285     iRegistry.watchPostModuleGlobalEndRun(this, &StallMonitor::postModuleGlobalTransition);
0286     iRegistry.watchPreModuleWriteRun(this, &StallMonitor::preModuleGlobalTransition);
0287     iRegistry.watchPostModuleWriteRun(this, &StallMonitor::postModuleGlobalTransition);
0288 
0289     iRegistry.watchPreModuleGlobalBeginLumi(this, &StallMonitor::preModuleGlobalTransition);
0290     iRegistry.watchPostModuleGlobalBeginLumi(this, &StallMonitor::postModuleGlobalTransition);
0291     iRegistry.watchPreModuleGlobalEndLumi(this, &StallMonitor::preModuleGlobalTransition);
0292     iRegistry.watchPostModuleGlobalEndLumi(this, &StallMonitor::postModuleGlobalTransition);
0293     iRegistry.watchPreModuleWriteLumi(this, &StallMonitor::preModuleGlobalTransition);
0294     iRegistry.watchPostModuleWriteLumi(this, &StallMonitor::postModuleGlobalTransition);
0295 
0296     iRegistry.postESModuleRegistrationSignal_.connect([this](auto const& iDescription) {
0297       if (esModuleLabels_.size() <= iDescription.id_) {
0298         esModuleLabels_.resize(iDescription.id_ + 1);
0299       }
0300       if (not iDescription.label_.empty()) {
0301         esModuleLabels_[iDescription.id_] = iDescription.label_;
0302       } else {
0303         esModuleLabels_[iDescription.id_] = iDescription.type_;
0304       }
0305     });
0306 
0307     iRegistry.preESModuleSignal_.connect([this](auto const&, auto const& context) {
0308       auto const t = duration_cast<duration_t>(steady_clock::now() - beginTime_).count();
0309       auto msg = assembleMessage<step::preESModule>(
0310           numStreams_, module_id(context), std::underlying_type_t<Phase>(Phase::eventSetupCall), t);
0311       file_.write(std::move(msg));
0312     });
0313     iRegistry.postESModuleSignal_.connect([this](auto const&, auto const& context) {
0314       auto const t = duration_cast<duration_t>(steady_clock::now() - beginTime_).count();
0315       auto msg = assembleMessage<step::postESModule>(
0316           numStreams_, module_id(context), std::underlying_type_t<Phase>(Phase::eventSetupCall), t);
0317       file_.write(std::move(msg));
0318     });
0319 
0320     iRegistry.preallocateSignal_.connect(
0321         [this](service::SystemBounds const& iBounds) { numStreams_ = iBounds.maxNumberOfStreams(); });
0322 
0323     bool recordFrameworkTransitions = iPS.getUntrackedParameter<bool>("recordFrameworkTransitions");
0324     if (recordFrameworkTransitions) {
0325       {
0326         auto preGlobal = [this](GlobalContext const& gc) {
0327           auto const t = duration_cast<duration_t>(steady_clock::now() - beginTime_).count();
0328           auto msg = assembleMessage<step::preFrameworkTransition>(
0329               numStreams_, gc.luminosityBlockID().run(), gc.luminosityBlockID().luminosityBlock(), toTransition(gc), t);
0330           file_.write(std::move(msg));
0331         };
0332         iRegistry.watchPreGlobalBeginRun(preGlobal);
0333         iRegistry.watchPreGlobalBeginLumi(preGlobal);
0334         iRegistry.watchPreGlobalEndLumi(preGlobal);
0335         iRegistry.watchPreGlobalEndRun(preGlobal);
0336       }
0337       {
0338         auto postGlobal = [this](GlobalContext const& gc) {
0339           auto const t = duration_cast<duration_t>(steady_clock::now() - beginTime_).count();
0340           auto msg = assembleMessage<step::postFrameworkTransition>(
0341               numStreams_, gc.luminosityBlockID().run(), gc.luminosityBlockID().luminosityBlock(), toTransition(gc), t);
0342           file_.write(std::move(msg));
0343         };
0344         iRegistry.watchPostGlobalBeginRun(postGlobal);
0345         iRegistry.watchPostGlobalBeginLumi(postGlobal);
0346         iRegistry.watchPostGlobalEndLumi(postGlobal);
0347         iRegistry.watchPostGlobalEndRun(postGlobal);
0348       }
0349       {
0350         auto preStream = [this](StreamContext const& sc) {
0351           auto const t = duration_cast<duration_t>(steady_clock::now() - beginTime_).count();
0352           auto msg = assembleMessage<step::preFrameworkTransition>(
0353               stream_id(sc), sc.eventID().run(), sc.eventID().luminosityBlock(), toTransition(sc), t);
0354           file_.write(std::move(msg));
0355         };
0356         iRegistry.watchPreStreamBeginRun(preStream);
0357         iRegistry.watchPreStreamBeginLumi(preStream);
0358         iRegistry.watchPreStreamEndLumi(preStream);
0359         iRegistry.watchPreStreamEndRun(preStream);
0360       }
0361       {
0362         auto postStream = [this](StreamContext const& sc) {
0363           auto const t = duration_cast<duration_t>(steady_clock::now() - beginTime_).count();
0364           auto msg = assembleMessage<step::postFrameworkTransition>(
0365               stream_id(sc), sc.eventID().run(), sc.eventID().luminosityBlock(), toTransition(sc), t);
0366           file_.write(std::move(msg));
0367         };
0368         iRegistry.watchPostStreamBeginRun(postStream);
0369         iRegistry.watchPostStreamBeginLumi(postStream);
0370         iRegistry.watchPostStreamEndLumi(postStream);
0371         iRegistry.watchPostStreamEndRun(postStream);
0372       }
0373     }
0374 
0375     std::ostringstream oss;
0376     oss << "# Transition       Symbol\n";
0377     oss << "#----------------- ------\n";
0378     oss << "# eventSetupCall  " << Phase::eventSetupCall << "\n"
0379         << "# globalBeginRun  " << Phase::globalBeginRun << "\n"
0380         << "# streamBeginRun  " << Phase::streamBeginRun << "\n"
0381         << "# globalBeginLumi " << Phase::globalBeginLumi << "\n"
0382         << "# streamBeginLumi " << Phase::streamBeginLumi << "\n"
0383         << "# Event           " << Phase::Event << "\n"
0384         << "# streamEndLumi   " << Phase::streamEndLumi << "\n"
0385         << "# globalEndLumi   " << Phase::globalEndLumi << "\n"
0386         << "# streamEndRun    " << Phase::streamEndRun << "\n"
0387         << "# globalEndRun    " << Phase::globalEndRun << "\n";
0388     oss << "# Step                       Symbol Entries\n"
0389         << "# -------------------------- ------ ------------------------------------------\n"
0390         << "# preSourceEvent                " << step::preSourceEvent << "   <Stream ID> <Time since beginJob (ms)>\n"
0391         << "# postSourceEvent               " << step::postSourceEvent << "   <Stream ID> <Time since beginJob (ms)>\n"
0392         << "# preEvent                      " << step::preEvent
0393         << "   <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n"
0394         << "# postModuleEventPrefetching    " << step::postModuleEventPrefetching
0395         << "   <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0396         << "# preModuleEventAcquire         " << step::preModuleEventAcquire
0397         << "   <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0398         << "# postModuleEventAcquire        " << step::postModuleEventAcquire
0399         << "   <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0400         << "# preModuleTransition           " << step::preModuleEvent
0401         << "   <Stream ID> <Module ID> <Transition type> <Time since beginJob (ms)>\n"
0402         << "# preEventReadFromSource        " << step::preEventReadFromSource
0403         << "   <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0404         << "# postEventReadFromSource       " << step::postEventReadFromSource
0405         << "   <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0406         << "# postModuleTransition          " << step::postModuleEvent
0407         << "   <Stream ID> <Module ID> <Transition type> <Time since beginJob (ms)>\n"
0408         << "# postEvent                     " << step::postEvent
0409         << "   <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n"
0410         << "# postESModulePrefetching       " << step::postESModulePrefetching
0411         << "  <Stream ID> <ESModule ID> <Transition type> <Time since beginJob (ms)>\n"
0412         << "# preESModuleTransition         " << step::preESModule
0413         << "  <StreamID> <ESModule ID> <TransitionType> <Time since beginJob (ms)>\n"
0414         << "# postESModuleTransition        " << step::postESModule
0415         << "  <StreamID> <ESModule ID> <TransitionType> <Time since beginJob (ms)>\n";
0416     if (recordFrameworkTransitions) {
0417       oss << "# preFrameworkTransition        " << step::preFrameworkTransition
0418           << " <Stream ID> <Run#> <LumiBlock#> <Transition type> <Time since beginJob (ms)>\n"
0419           << "# postFrameworkTransition       " << step::postFrameworkTransition
0420           << " <Stream ID> <Run#> <LumiBlock#> <Transition type> <Time since beginJob (ms)>\n";
0421     }
0422     file_.write(oss.str());
0423   }
0424 }
0425 
0426 void StallMonitor::fillDescriptions(ConfigurationDescriptions& descriptions) {
0427   ParameterSetDescription desc;
0428   desc.addUntracked<std::string>("fileName", filename_default)
0429       ->setComment(
0430           "Name of file to which detailed timing information should be written.\n"
0431           "An empty filename argument (the default) indicates that no extra\n"
0432           "information will be written to a dedicated file, but only the summary\n"
0433           "including stalling-modules information will be logged.");
0434   desc.addUntracked<double>("stallThreshold", threshold_default)
0435       ->setComment(
0436           "Threshold (in seconds) used to classify modules as stalled.\n"
0437           "Microsecond granularity allowed.");
0438   desc.addUntracked<bool>("recordFrameworkTransitions", false)
0439       ->setComment(
0440           "When writing a file, include the framework state transitions:\n"
0441           " stream and global, begin and end, Run and LuminosityBlock.");
0442   descriptions.add("StallMonitor", desc);
0443   descriptions.setComment(
0444       "This service keeps track of various times in event-processing to determine which modules are stalling.");
0445 }
0446 
0447 void StallMonitor::preModuleConstruction(ModuleDescription const& md) {
0448   // Module labels are dense, so if the module id is greater than the
0449   // size of moduleLabels_, grow the vector to the correct index and
0450   // assign the last entry to the desired label.  Note that with the
0451   // current implementation, there is no module with ID '0'.  In
0452   // principle, the module-information vectors are therefore each one
0453   // entry too large.  However, since removing the entry at the front
0454   // makes for awkward indexing later on, and since the sizes of these
0455   // extra entries are on the order of bytes, we will leave them in
0456   // and skip over them later when printing out summaries.  The
0457   // extraneous entries can be identified by their module labels being
0458   // empty.
0459   auto const mid = md.id();
0460   if (mid < moduleLabels_.size()) {
0461     moduleLabels_[mid] = md.moduleLabel();
0462   } else {
0463     moduleLabels_.resize(mid + 1);
0464     moduleLabels_.back() = md.moduleLabel();
0465   }
0466 }
0467 
0468 void StallMonitor::preModuleDestruction(ModuleDescription const& md) {
0469   // Reset the module label back if the module is deleted before
0470   // beginJob() so that the entry is ignored in the summary printouts.
0471   moduleLabels_[md.id()] = "";
0472 }
0473 
0474 void StallMonitor::postBeginJob() {
0475   // Since a (push,emplace)_back cannot be called for a vector of a
0476   // type containing atomics (like 'StallStatistics')--i.e. atomics
0477   // have no copy/move-assignment operators, we must specify the size
0478   // of the vector at construction time.
0479   moduleStats_ = std::vector<StallStatistics>(moduleLabels_.size());
0480   for (std::size_t i{}; i < moduleStats_.size(); ++i) {
0481     moduleStats_[i].setLabel(moduleLabels_[i]);
0482   }
0483 
0484   if (validFile_) {
0485     {
0486       std::ostringstream oss;
0487       moduleIdToLabel(oss, moduleLabels_, 'M', "Module ID", "Module label");
0488       file_.write(oss.str());
0489     }
0490     {
0491       std::ostringstream oss;
0492       moduleIdToLabel(oss, esModuleLabels_, 'N', "ESModule ID", "ESModule label");
0493       file_.write(oss.str());
0494     }
0495   }
0496   // Don't need the labels anymore--info. is now part of the
0497   // module-statistics objects.
0498   decltype(moduleLabels_)().swap(moduleLabels_);
0499   decltype(esModuleLabels_)().swap(esModuleLabels_);
0500 
0501   beginTime_ = steady_clock::now();
0502 }
0503 
0504 void StallMonitor::preSourceEvent(StreamID const sid) {
0505   auto const t = duration_cast<duration_t>(steady_clock::now() - beginTime_).count();
0506   auto msg = assembleMessage<step::preSourceEvent>(sid.value(), t);
0507   file_.write(std::move(msg));
0508 }
0509 
0510 void StallMonitor::postSourceEvent(StreamID const sid) {
0511   auto const t = duration_cast<duration_t>(steady_clock::now() - beginTime_).count();
0512   auto msg = assembleMessage<step::postSourceEvent>(sid.value(), t);
0513   file_.write(std::move(msg));
0514 }
0515 
0516 void StallMonitor::preEvent(StreamContext const& sc) {
0517   auto const t = duration_cast<duration_t>(steady_clock::now() - beginTime_).count();
0518   auto const& eid = sc.eventID();
0519   auto msg = assembleMessage<step::preEvent>(stream_id(sc), eid.run(), eid.luminosityBlock(), eid.event(), t);
0520   file_.write(std::move(msg));
0521 }
0522 
0523 void StallMonitor::postModuleEventPrefetching(StreamContext const& sc, ModuleCallingContext const& mcc) {
0524   auto const sid = stream_id(sc);
0525   auto const mid = module_id(mcc);
0526   auto start = stallStart_[std::make_pair(sid, mid)] = std::make_pair(steady_clock::now(), false);
0527 
0528   if (validFile_) {
0529     auto const t = duration_cast<duration_t>(start.first - beginTime_).count();
0530     auto msg = assembleMessage<step::postModuleEventPrefetching>(sid, mid, t);
0531     file_.write(std::move(msg));
0532   }
0533 }
0534 
0535 void StallMonitor::preModuleEventAcquire(StreamContext const& sc, ModuleCallingContext const& mcc) {
0536   auto const preModEventAcquire = steady_clock::now();
0537   auto const sid = stream_id(sc);
0538   auto const mid = module_id(mcc);
0539   auto& start = stallStart_[std::make_pair(sid, mid)];
0540   auto startT = start.first.time_since_epoch();
0541   start.second = true;  // record so the preModuleEvent knows that acquire was called
0542   if (validFile_) {
0543     auto t = duration_cast<duration_t>(preModEventAcquire - beginTime_).count();
0544     auto msg = assembleMessage<step::preModuleEventAcquire>(sid, mid, t);
0545     file_.write(std::move(msg));
0546   }
0547   // Check for stalls if prefetch was called
0548   if (duration_t::duration::zero() != startT) {
0549     auto const preFetch_to_preModEventAcquire = duration_cast<duration_t>(preModEventAcquire - start.first);
0550     if (preFetch_to_preModEventAcquire < stallThreshold_)
0551       return;
0552     moduleStats_[mid].update(preFetch_to_preModEventAcquire);
0553   }
0554 }
0555 
0556 void StallMonitor::postModuleEventAcquire(StreamContext const& sc, ModuleCallingContext const& mcc) {
0557   auto const postModEventAcquire = duration_cast<duration_t>(steady_clock::now() - beginTime_).count();
0558   auto msg = assembleMessage<step::postModuleEventAcquire>(stream_id(sc), module_id(mcc), postModEventAcquire);
0559   file_.write(std::move(msg));
0560 }
0561 
0562 void StallMonitor::preModuleEvent(StreamContext const& sc, ModuleCallingContext const& mcc) {
0563   auto const preModEvent = steady_clock::now();
0564   auto const sid = stream_id(sc);
0565   auto const mid = module_id(mcc);
0566   auto const& start = stallStart_[std::make_pair(sid, mid)];
0567   auto startT = start.first.time_since_epoch();
0568   if (validFile_) {
0569     auto t = duration_cast<duration_t>(preModEvent - beginTime_).count();
0570     auto msg =
0571         assembleMessage<step::preModuleEvent>(sid, mid, static_cast<std::underlying_type_t<Phase>>(Phase::Event), t);
0572     file_.write(std::move(msg));
0573   }
0574   // Check for stalls if prefetch was called and we did not already check before acquire
0575   if (duration_t::duration::zero() != startT && !start.second) {
0576     auto const preFetch_to_preModEvent = duration_cast<duration_t>(preModEvent - start.first);
0577     if (preFetch_to_preModEvent < stallThreshold_)
0578       return;
0579     moduleStats_[mid].update(preFetch_to_preModEvent);
0580   }
0581 }
0582 
0583 void StallMonitor::preModuleStreamTransition(StreamContext const& sc, ModuleCallingContext const& mcc) {
0584   auto const tNow = steady_clock::now();
0585   auto const sid = stream_id(sc);
0586   auto const mid = module_id(mcc);
0587   auto t = duration_cast<duration_t>(tNow - beginTime_).count();
0588   auto msg = assembleMessage<step::preModuleEvent>(sid, mid, toTransition(sc), t);
0589   file_.write(std::move(msg));
0590 }
0591 
0592 void StallMonitor::postModuleStreamTransition(StreamContext const& sc, ModuleCallingContext const& mcc) {
0593   auto const t = duration_cast<duration_t>(steady_clock::now() - beginTime_).count();
0594   auto msg = assembleMessage<step::postModuleEvent>(stream_id(sc), module_id(mcc), toTransition(sc), t);
0595   file_.write(std::move(msg));
0596 }
0597 
0598 void StallMonitor::preModuleGlobalTransition(GlobalContext const& gc, ModuleCallingContext const& mcc) {
0599   auto t = duration_cast<duration_t>(steady_clock::now() - beginTime_).count();
0600   auto msg = assembleMessage<step::preModuleEvent>(numStreams_, module_id(mcc), toTransition(gc), t);
0601   file_.write(std::move(msg));
0602 }
0603 
0604 void StallMonitor::postModuleGlobalTransition(GlobalContext const& gc, ModuleCallingContext const& mcc) {
0605   auto const postModTime = duration_cast<duration_t>(steady_clock::now() - beginTime_).count();
0606   auto msg = assembleMessage<step::postModuleEvent>(numStreams_, module_id(mcc), toTransition(gc), postModTime);
0607   file_.write(std::move(msg));
0608 }
0609 
0610 void StallMonitor::preEventReadFromSource(StreamContext const& sc, ModuleCallingContext const& mcc) {
0611   auto const t = duration_cast<duration_t>(steady_clock::now() - beginTime_).count();
0612   auto msg = assembleMessage<step::preEventReadFromSource>(stream_id(sc), module_id(mcc), t);
0613   file_.write(std::move(msg));
0614 }
0615 
0616 void StallMonitor::postEventReadFromSource(StreamContext const& sc, ModuleCallingContext const& mcc) {
0617   auto const t = duration_cast<duration_t>(steady_clock::now() - beginTime_).count();
0618   auto msg = assembleMessage<step::postEventReadFromSource>(stream_id(sc), module_id(mcc), t);
0619   file_.write(std::move(msg));
0620 }
0621 
0622 void StallMonitor::postModuleEvent(StreamContext const& sc, ModuleCallingContext const& mcc) {
0623   auto const postModEvent = duration_cast<duration_t>(steady_clock::now() - beginTime_).count();
0624   auto msg = assembleMessage<step::postModuleEvent>(
0625       stream_id(sc), module_id(mcc), static_cast<std::underlying_type_t<Phase>>(Phase::Event), postModEvent);
0626   file_.write(std::move(msg));
0627 }
0628 
0629 void StallMonitor::postEvent(StreamContext const& sc) {
0630   auto const t = duration_cast<duration_t>(steady_clock::now() - beginTime_).count();
0631   auto const& eid = sc.eventID();
0632   auto msg = assembleMessage<step::postEvent>(stream_id(sc), eid.run(), eid.luminosityBlock(), eid.event(), t);
0633   file_.write(std::move(msg));
0634 }
0635 
0636 void StallMonitor::postEndJob() {
0637   // Prepare summary
0638   std::size_t width{};
0639   edm::for_all(moduleStats_, [&width](auto const& stats) {
0640     if (stats.numberOfStalls() == 0u)
0641       return;
0642     width = std::max(width, stats.label().size());
0643   });
0644 
0645   OStreamColumn tag{"StallMonitor>"};
0646   OStreamColumn col1{"Module label", width};
0647   OStreamColumn col2{"# of stalls"};
0648   OStreamColumn col3{"Total stalled time"};
0649   OStreamColumn col4{"Max stalled time"};
0650 
0651   LogAbsolute out{"StallMonitor"};
0652   out << '\n';
0653   out << tag << space << col1 << space << col2 << space << col3 << space << col4 << '\n';
0654 
0655   out << tag << space << std::setfill('-') << col1(std::string{}) << space << col2(std::string{}) << space
0656       << col3(std::string{}) << space << col4(std::string{}) << '\n';
0657 
0658   using seconds_d = duration<double>;
0659 
0660   auto to_seconds_str = [](auto const& duration) {
0661     std::ostringstream oss;
0662     auto const time = duration_cast<seconds_d>(duration).count();
0663     oss << time << " s";
0664     return oss.str();
0665   };
0666 
0667   out << std::setfill(' ');
0668   for (auto const& stats : moduleStats_) {
0669     if (stats.label().empty() ||  // See comment in filling of moduleLabels_;
0670         stats.numberOfStalls() == 0u)
0671       continue;
0672     out << std::left << tag << space << col1(stats.label()) << space << std::right << col2(stats.numberOfStalls())
0673         << space << col3(to_seconds_str(stats.totalStalledTime())) << space
0674         << col4(to_seconds_str(stats.maxStalledTime())) << '\n';
0675   }
0676 }
0677 
0678 DEFINE_FWK_SERVICE(StallMonitor);