Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:13:05

0001 // -*- C++ -*-
0002 //
0003 // Package: FWCore/Services
0004 // Class  : StallMonitor
0005 //
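// Implementation:
//   For every (stream, module) pair the service records when data
//   prefetching finished and compares it with the start of acquire() or of
//   event processing.  Waits at or above the 'stallThreshold' parameter are
//   counted as stalls and summarized per module at end of job.  If
//   'fileName' is set, a detailed per-transition trace is also written there.
//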
0008 // Original Author:  Kyle Knoepfel
0009 //
0010 
0011 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0012 #include "FWCore/Concurrency/interface/ThreadSafeOutputFileStream.h"
0013 #include "FWCore/Framework/interface/ComponentDescription.h"
0014 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0015 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0016 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0017 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0018 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0019 #include "FWCore/ServiceRegistry/interface/Service.h"
0020 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0021 #include "FWCore/ServiceRegistry/interface/StreamContext.h"
0022 #include "FWCore/ServiceRegistry/interface/GlobalContext.h"
0023 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0024 #include "FWCore/ServiceRegistry/interface/ESModuleCallingContext.h"
0025 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0026 #include "FWCore/Utilities/interface/Algorithms.h"
0027 #include "FWCore/Utilities/interface/OStreamColumn.h"
0028 #include "FWCore/Utilities/interface/Exception.h"
0029 #include "FWCore/Utilities/interface/StdPairHasher.h"
0030 
0031 #include "monitor_file_utilities.h"
0032 
0033 #include "oneapi/tbb/concurrent_unordered_map.h"
0034 #include <atomic>
0035 #include <chrono>
0036 #include <iomanip>
0037 #include <iostream>
0038 #include <sstream>
0039 
0040 using namespace edm::service::monitor_file_utilities;
0041 
0042 namespace {
0043 
0044   using duration_t = std::chrono::microseconds;
0045   using clock_t = std::chrono::steady_clock;
0046   auto now = clock_t::now;
0047 
0048   //===============================================================
0049   class StallStatistics {
0050   public:
0051     // c'tor receiving 'std::string const&' type not provided since we
0052     // must be able to call (e.g.) std::vector<StallStatistics>(20),
0053     // for which a default label is not sensible in this context.
0054     StallStatistics() = default;
0055 
0056     std::string const& label() const { return label_; }
0057     unsigned numberOfStalls() const { return stallCounter_; }
0058 
0059     using rep_t = duration_t::rep;
0060 
0061     duration_t totalStalledTime() const { return duration_t{totalTime_.load()}; }
0062     duration_t maxStalledTime() const { return duration_t{maxTime_.load()}; }
0063 
0064     // Modifiers
0065     void setLabel(std::string const& label) { label_ = label; }
0066 
0067     void update(duration_t const ms) {
0068       ++stallCounter_;
0069       auto const thisTime = ms.count();
0070       totalTime_ += thisTime;
0071       rep_t max{maxTime_};
0072       while (thisTime > max && !maxTime_.compare_exchange_strong(max, thisTime))
0073         ;
0074     }
0075 
0076   private:
0077     std::string label_{};
0078     std::atomic<unsigned> stallCounter_{};
0079     std::atomic<rep_t> totalTime_{};
0080     std::atomic<rep_t> maxTime_{};
0081   };
0082 
0083   enum class step : char {
0084     preSourceEvent = 'S',
0085     postSourceEvent = 's',
0086     preEvent = 'E',
0087     postModuleEventPrefetching = 'p',
0088     preModuleEventAcquire = 'A',
0089     postModuleEventAcquire = 'a',
0090     preModuleEvent = 'M',
0091     preEventReadFromSource = 'R',
0092     postEventReadFromSource = 'r',
0093     postModuleEvent = 'm',
0094     postEvent = 'e',
0095     postESModulePrefetching = 'q',
0096     preESModule = 'N',
0097     postESModule = 'n',
0098     preFrameworkTransition = 'F',
0099     postFrameworkTransition = 'f'
0100   };
0101 
0102   enum class Phase : short {
0103     globalEndRun = -4,
0104     streamEndRun = -3,
0105     globalEndLumi = -2,
0106     streamEndLumi = -1,
0107     Event = 0,
0108     streamBeginLumi = 1,
0109     globalBeginLumi = 2,
0110     streamBeginRun = 3,
0111     globalBeginRun = 4,
0112     eventSetupCall = 5
0113   };
0114 
0115   std::ostream& operator<<(std::ostream& os, step const s) {
0116     os << static_cast<std::underlying_type_t<step>>(s);
0117     return os;
0118   }
0119 
0120   std::ostream& operator<<(std::ostream& os, Phase const s) {
0121     os << static_cast<std::underlying_type_t<Phase>>(s);
0122     return os;
0123   }
0124 
0125   template <step S, typename... ARGS>
0126   std::string assembleMessage(ARGS const... args) {
0127     std::ostringstream oss;
0128     oss << S;
0129     concatenate(oss, args...);
0130     oss << '\n';
0131     return oss.str();
0132   }
0133 
0134   Phase toTransitionImpl(edm::StreamContext const& iContext) {
0135     using namespace edm;
0136     switch (iContext.transition()) {
0137       case StreamContext::Transition::kBeginRun:
0138         return Phase::streamBeginRun;
0139       case StreamContext::Transition::kBeginLuminosityBlock:
0140         return Phase::streamBeginLumi;
0141       case StreamContext::Transition::kEvent:
0142         return Phase::Event;
0143       case StreamContext::Transition::kEndLuminosityBlock:
0144         return Phase::streamEndLumi;
0145       case StreamContext::Transition::kEndRun:
0146         return Phase::streamEndRun;
0147       default:
0148         break;
0149     }
0150     assert(false);
0151     return Phase::Event;
0152   }
0153 
0154   auto toTransition(edm::StreamContext const& iContext) -> std::underlying_type_t<Phase> {
0155     return static_cast<std::underlying_type_t<Phase>>(toTransitionImpl(iContext));
0156   }
0157 
0158   Phase toTransitionImpl(edm::GlobalContext const& iContext) {
0159     using namespace edm;
0160     switch (iContext.transition()) {
0161       case GlobalContext::Transition::kBeginRun:
0162         return Phase::globalBeginRun;
0163       case GlobalContext::Transition::kBeginLuminosityBlock:
0164         return Phase::globalBeginLumi;
0165       case GlobalContext::Transition::kEndLuminosityBlock:
0166         return Phase::globalEndLumi;
0167       case GlobalContext::Transition::kWriteLuminosityBlock:
0168         return Phase::globalEndLumi;
0169       case GlobalContext::Transition::kEndRun:
0170         return Phase::globalEndRun;
0171       case GlobalContext::Transition::kWriteRun:
0172         return Phase::globalEndRun;
0173       default:
0174         break;
0175     }
0176     assert(false);
0177     return Phase::Event;
0178   }
0179   auto toTransition(edm::GlobalContext const& iContext) -> std::underlying_type_t<Phase> {
0180     return static_cast<std::underlying_type_t<Phase>>(toTransitionImpl(iContext));
0181   }
0182 
0183 }  // namespace
0184 
0185 namespace edm {
0186   namespace service {
0187 
0188     class StallMonitor {
0189     public:
0190       StallMonitor(ParameterSet const&, ActivityRegistry&);
0191       static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0192 
0193     private:
0194       void preModuleConstruction(edm::ModuleDescription const&);
0195       void preModuleDestruction(edm::ModuleDescription const&);
0196       void postBeginJob();
0197       void preSourceEvent(StreamID);
0198       void postSourceEvent(StreamID);
0199       void preEvent(StreamContext const&);
0200       void preModuleEventAcquire(StreamContext const&, ModuleCallingContext const&);
0201       void postModuleEventAcquire(StreamContext const&, ModuleCallingContext const&);
0202       void preModuleEvent(StreamContext const&, ModuleCallingContext const&);
0203       void postModuleEventPrefetching(StreamContext const&, ModuleCallingContext const&);
0204       void preEventReadFromSource(StreamContext const&, ModuleCallingContext const&);
0205       void postEventReadFromSource(StreamContext const&, ModuleCallingContext const&);
0206       void postModuleEvent(StreamContext const&, ModuleCallingContext const&);
0207       void postEvent(StreamContext const&);
0208       void preModuleStreamTransition(StreamContext const&, ModuleCallingContext const&);
0209       void postModuleStreamTransition(StreamContext const&, ModuleCallingContext const&);
0210       void preModuleGlobalTransition(GlobalContext const&, ModuleCallingContext const&);
0211       void postModuleGlobalTransition(GlobalContext const&, ModuleCallingContext const&);
0212       void postEndJob();
0213 
0214       ThreadSafeOutputFileStream file_;
0215       bool const validFile_;  // Separate data member from file to improve efficiency.
0216       duration_t const stallThreshold_;
0217       decltype(now()) beginTime_{};
0218 
0219       // There can be multiple modules per stream.  Therefore, we need
0220       // the combination of StreamID and ModuleID to correctly track
0221       // stalling information.  We use oneapi::tbb::concurrent_unordered_map
0222       // for this purpose.
0223       using StreamID_value = decltype(std::declval<StreamID>().value());
0224       using ModuleID = decltype(std::declval<ModuleDescription>().id());
0225       oneapi::tbb::concurrent_unordered_map<std::pair<StreamID_value, ModuleID>,
0226                                             std::pair<decltype(beginTime_), bool>,
0227                                             edm::StdPairHasher>
0228           stallStart_{};
0229 
0230       std::vector<std::string> moduleLabels_{};
0231       std::vector<StallStatistics> moduleStats_{};
0232       std::vector<std::string> esModuleLabels_{};
0233       unsigned int numStreams_;
0234     };
0235 
0236   }  // namespace service
0237 
0238 }  // namespace edm
0239 
namespace {
  // Default for "fileName": an empty name means no trace file is written.
  constexpr char const* const filename_default = "";
  // Default for "stallThreshold", in seconds.
  constexpr double threshold_default = 0.1;
  // Column separator used in the end-of-job summary table.
  std::string const space = "  ";
}  // namespace
0245 
0246 using edm::service::StallMonitor;
0247 using namespace std::chrono;
0248 
0249 StallMonitor::StallMonitor(ParameterSet const& iPS, ActivityRegistry& iRegistry)
0250     : file_{iPS.getUntrackedParameter<std::string>("fileName")},
0251       validFile_{file_},
0252       stallThreshold_{
0253           std::chrono::round<duration_t>(duration<double>(iPS.getUntrackedParameter<double>("stallThreshold")))} {
0254   iRegistry.watchPreModuleConstruction(this, &StallMonitor::preModuleConstruction);
0255   iRegistry.watchPreModuleDestruction(this, &StallMonitor::preModuleDestruction);
0256   iRegistry.watchPostBeginJob(this, &StallMonitor::postBeginJob);
0257   iRegistry.watchPostModuleEventPrefetching(this, &StallMonitor::postModuleEventPrefetching);
0258   iRegistry.watchPreModuleEventAcquire(this, &StallMonitor::preModuleEventAcquire);
0259   iRegistry.watchPreModuleEvent(this, &StallMonitor::preModuleEvent);
0260   iRegistry.watchPostEndJob(this, &StallMonitor::postEndJob);
0261 
0262   if (validFile_) {
0263     // Only enable the following callbacks if writing to a file.
0264     iRegistry.watchPreSourceEvent(this, &StallMonitor::preSourceEvent);
0265     iRegistry.watchPostSourceEvent(this, &StallMonitor::postSourceEvent);
0266     iRegistry.watchPreEvent(this, &StallMonitor::preEvent);
0267     iRegistry.watchPostModuleEventAcquire(this, &StallMonitor::postModuleEventAcquire);
0268     iRegistry.watchPreEventReadFromSource(this, &StallMonitor::preEventReadFromSource);
0269     iRegistry.watchPostEventReadFromSource(this, &StallMonitor::postEventReadFromSource);
0270     iRegistry.watchPostModuleEvent(this, &StallMonitor::postModuleEvent);
0271     iRegistry.watchPostEvent(this, &StallMonitor::postEvent);
0272 
0273     iRegistry.watchPreModuleStreamBeginRun(this, &StallMonitor::preModuleStreamTransition);
0274     iRegistry.watchPostModuleStreamBeginRun(this, &StallMonitor::postModuleStreamTransition);
0275     iRegistry.watchPreModuleStreamEndRun(this, &StallMonitor::preModuleStreamTransition);
0276     iRegistry.watchPostModuleStreamEndRun(this, &StallMonitor::postModuleStreamTransition);
0277 
0278     iRegistry.watchPreModuleStreamBeginLumi(this, &StallMonitor::preModuleStreamTransition);
0279     iRegistry.watchPostModuleStreamBeginLumi(this, &StallMonitor::postModuleStreamTransition);
0280     iRegistry.watchPreModuleStreamEndLumi(this, &StallMonitor::preModuleStreamTransition);
0281     iRegistry.watchPostModuleStreamEndLumi(this, &StallMonitor::postModuleStreamTransition);
0282 
0283     iRegistry.watchPreModuleGlobalBeginRun(this, &StallMonitor::preModuleGlobalTransition);
0284     iRegistry.watchPostModuleGlobalBeginRun(this, &StallMonitor::postModuleGlobalTransition);
0285     iRegistry.watchPreModuleGlobalEndRun(this, &StallMonitor::preModuleGlobalTransition);
0286     iRegistry.watchPostModuleGlobalEndRun(this, &StallMonitor::postModuleGlobalTransition);
0287     iRegistry.watchPreModuleWriteRun(this, &StallMonitor::preModuleGlobalTransition);
0288     iRegistry.watchPostModuleWriteRun(this, &StallMonitor::postModuleGlobalTransition);
0289 
0290     iRegistry.watchPreModuleGlobalBeginLumi(this, &StallMonitor::preModuleGlobalTransition);
0291     iRegistry.watchPostModuleGlobalBeginLumi(this, &StallMonitor::postModuleGlobalTransition);
0292     iRegistry.watchPreModuleGlobalEndLumi(this, &StallMonitor::preModuleGlobalTransition);
0293     iRegistry.watchPostModuleGlobalEndLumi(this, &StallMonitor::postModuleGlobalTransition);
0294     iRegistry.watchPreModuleWriteLumi(this, &StallMonitor::preModuleGlobalTransition);
0295     iRegistry.watchPostModuleWriteLumi(this, &StallMonitor::postModuleGlobalTransition);
0296 
0297     iRegistry.postESModuleRegistrationSignal_.connect([this](auto const& iDescription) {
0298       if (esModuleLabels_.size() <= iDescription.id_) {
0299         esModuleLabels_.resize(iDescription.id_ + 1);
0300       }
0301       if (not iDescription.label_.empty()) {
0302         esModuleLabels_[iDescription.id_] = iDescription.label_;
0303       } else {
0304         esModuleLabels_[iDescription.id_] = iDescription.type_;
0305       }
0306     });
0307 
0308     iRegistry.preESModuleSignal_.connect([this](auto const&, auto const& context) {
0309       auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0310       auto msg = assembleMessage<step::preESModule>(
0311           numStreams_, module_id(context), std::underlying_type_t<Phase>(Phase::eventSetupCall), t);
0312       file_.write(std::move(msg));
0313     });
0314     iRegistry.postESModuleSignal_.connect([this](auto const&, auto const& context) {
0315       auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0316       auto msg = assembleMessage<step::postESModule>(
0317           numStreams_, module_id(context), std::underlying_type_t<Phase>(Phase::eventSetupCall), t);
0318       file_.write(std::move(msg));
0319     });
0320 
0321     iRegistry.preallocateSignal_.connect(
0322         [this](service::SystemBounds const& iBounds) { numStreams_ = iBounds.maxNumberOfStreams(); });
0323 
0324     bool recordFrameworkTransitions = iPS.getUntrackedParameter<bool>("recordFrameworkTransitions");
0325     if (recordFrameworkTransitions) {
0326       {
0327         auto preGlobal = [this](GlobalContext const& gc) {
0328           auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0329           auto msg = assembleMessage<step::preFrameworkTransition>(
0330               numStreams_, gc.luminosityBlockID().run(), gc.luminosityBlockID().luminosityBlock(), toTransition(gc), t);
0331           file_.write(std::move(msg));
0332         };
0333         iRegistry.watchPreGlobalBeginRun(preGlobal);
0334         iRegistry.watchPreGlobalBeginLumi(preGlobal);
0335         iRegistry.watchPreGlobalEndLumi(preGlobal);
0336         iRegistry.watchPreGlobalEndRun(preGlobal);
0337       }
0338       {
0339         auto postGlobal = [this](GlobalContext const& gc) {
0340           auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0341           auto msg = assembleMessage<step::postFrameworkTransition>(
0342               numStreams_, gc.luminosityBlockID().run(), gc.luminosityBlockID().luminosityBlock(), toTransition(gc), t);
0343           file_.write(std::move(msg));
0344         };
0345         iRegistry.watchPostGlobalBeginRun(postGlobal);
0346         iRegistry.watchPostGlobalBeginLumi(postGlobal);
0347         iRegistry.watchPostGlobalEndLumi(postGlobal);
0348         iRegistry.watchPostGlobalEndRun(postGlobal);
0349       }
0350       {
0351         auto preStream = [this](StreamContext const& sc) {
0352           auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0353           auto msg = assembleMessage<step::preFrameworkTransition>(
0354               stream_id(sc), sc.eventID().run(), sc.eventID().luminosityBlock(), toTransition(sc), t);
0355           file_.write(std::move(msg));
0356         };
0357         iRegistry.watchPreStreamBeginRun(preStream);
0358         iRegistry.watchPreStreamBeginLumi(preStream);
0359         iRegistry.watchPreStreamEndLumi(preStream);
0360         iRegistry.watchPreStreamEndRun(preStream);
0361       }
0362       {
0363         auto postStream = [this](StreamContext const& sc) {
0364           auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0365           auto msg = assembleMessage<step::postFrameworkTransition>(
0366               stream_id(sc), sc.eventID().run(), sc.eventID().luminosityBlock(), toTransition(sc), t);
0367           file_.write(std::move(msg));
0368         };
0369         iRegistry.watchPostStreamBeginRun(postStream);
0370         iRegistry.watchPostStreamBeginLumi(postStream);
0371         iRegistry.watchPostStreamEndLumi(postStream);
0372         iRegistry.watchPostStreamEndRun(postStream);
0373       }
0374     }
0375 
0376     std::ostringstream oss;
0377     oss << "# Transition       Symbol\n";
0378     oss << "#----------------- ------\n";
0379     oss << "# eventSetupCall  " << Phase::eventSetupCall << "\n"
0380         << "# globalBeginRun  " << Phase::globalBeginRun << "\n"
0381         << "# streamBeginRun  " << Phase::streamBeginRun << "\n"
0382         << "# globalBeginLumi " << Phase::globalBeginLumi << "\n"
0383         << "# streamBeginLumi " << Phase::streamBeginLumi << "\n"
0384         << "# Event           " << Phase::Event << "\n"
0385         << "# streamEndLumi   " << Phase::streamEndLumi << "\n"
0386         << "# globalEndLumi   " << Phase::globalEndLumi << "\n"
0387         << "# streamEndRun    " << Phase::streamEndRun << "\n"
0388         << "# globalEndRun    " << Phase::globalEndRun << "\n";
0389     oss << "# Step                       Symbol Entries\n"
0390         << "# -------------------------- ------ ------------------------------------------\n"
0391         << "# preSourceEvent                " << step::preSourceEvent << "   <Stream ID> <Time since beginJob (ms)>\n"
0392         << "# postSourceEvent               " << step::postSourceEvent << "   <Stream ID> <Time since beginJob (ms)>\n"
0393         << "# preEvent                      " << step::preEvent
0394         << "   <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n"
0395         << "# postModuleEventPrefetching    " << step::postModuleEventPrefetching
0396         << "   <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0397         << "# preModuleEventAcquire         " << step::preModuleEventAcquire
0398         << "   <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0399         << "# postModuleEventAcquire        " << step::postModuleEventAcquire
0400         << "   <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0401         << "# preModuleTransition           " << step::preModuleEvent
0402         << "   <Stream ID> <Module ID> <Transition type> <Time since beginJob (ms)>\n"
0403         << "# preEventReadFromSource        " << step::preEventReadFromSource
0404         << "   <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0405         << "# postEventReadFromSource       " << step::postEventReadFromSource
0406         << "   <Stream ID> <Module ID> <Time since beginJob (ms)>\n"
0407         << "# postModuleTransition          " << step::postModuleEvent
0408         << "   <Stream ID> <Module ID> <Transition type> <Time since beginJob (ms)>\n"
0409         << "# postEvent                     " << step::postEvent
0410         << "   <Stream ID> <Run#> <LumiBlock#> <Event#> <Time since beginJob (ms)>\n"
0411         << "# postESModulePrefetching       " << step::postESModulePrefetching
0412         << "  <Stream ID> <ESModule ID> <Transition type> <Time since beginJob (ms)>\n"
0413         << "# preESModuleTransition         " << step::preESModule
0414         << "  <StreamID> <ESModule ID> <TransitionType> <Time since beginJob (ms)>\n"
0415         << "# postESModuleTransition        " << step::postESModule
0416         << "  <StreamID> <ESModule ID> <TransitionType> <Time since beginJob (ms)>\n";
0417     if (recordFrameworkTransitions) {
0418       oss << "# preFrameworkTransition        " << step::preFrameworkTransition
0419           << " <Stream ID> <Run#> <LumiBlock#> <Transition type> <Time since beginJob (ms)>\n"
0420           << "# postFrameworkTransition       " << step::postFrameworkTransition
0421           << " <Stream ID> <Run#> <LumiBlock#> <Transition type> <Time since beginJob (ms)>\n";
0422     }
0423     file_.write(oss.str());
0424   }
0425 }
0426 
0427 void StallMonitor::fillDescriptions(ConfigurationDescriptions& descriptions) {
0428   ParameterSetDescription desc;
0429   desc.addUntracked<std::string>("fileName", filename_default)
0430       ->setComment(
0431           "Name of file to which detailed timing information should be written.\n"
0432           "An empty filename argument (the default) indicates that no extra\n"
0433           "information will be written to a dedicated file, but only the summary\n"
0434           "including stalling-modules information will be logged.");
0435   desc.addUntracked<double>("stallThreshold", threshold_default)
0436       ->setComment(
0437           "Threshold (in seconds) used to classify modules as stalled.\n"
0438           "Microsecond granularity allowed.");
0439   desc.addUntracked<bool>("recordFrameworkTransitions", false)
0440       ->setComment(
0441           "When writing a file, include the framework state transitions:\n"
0442           " stream and global, begin and end, Run and LuminosityBlock.");
0443   descriptions.add("StallMonitor", desc);
0444   descriptions.setComment(
0445       "This service keeps track of various times in event-processing to determine which modules are stalling.");
0446 }
0447 
0448 void StallMonitor::preModuleConstruction(ModuleDescription const& md) {
0449   // Module labels are dense, so if the module id is greater than the
0450   // size of moduleLabels_, grow the vector to the correct index and
0451   // assign the last entry to the desired label.  Note that with the
0452   // current implementation, there is no module with ID '0'.  In
0453   // principle, the module-information vectors are therefore each one
0454   // entry too large.  However, since removing the entry at the front
0455   // makes for awkward indexing later on, and since the sizes of these
0456   // extra entries are on the order of bytes, we will leave them in
0457   // and skip over them later when printing out summaries.  The
0458   // extraneous entries can be identified by their module labels being
0459   // empty.
0460   auto const mid = md.id();
0461   if (mid < moduleLabels_.size()) {
0462     moduleLabels_[mid] = md.moduleLabel();
0463   } else {
0464     moduleLabels_.resize(mid + 1);
0465     moduleLabels_.back() = md.moduleLabel();
0466   }
0467 }
0468 
0469 void StallMonitor::preModuleDestruction(ModuleDescription const& md) {
0470   // Reset the module label back if the module is deleted before
0471   // beginJob() so that the entry is ignored in the summary printouts.
0472   moduleLabels_[md.id()] = "";
0473 }
0474 
0475 void StallMonitor::postBeginJob() {
0476   // Since a (push,emplace)_back cannot be called for a vector of a
0477   // type containing atomics (like 'StallStatistics')--i.e. atomics
0478   // have no copy/move-assignment operators, we must specify the size
0479   // of the vector at construction time.
0480   moduleStats_ = std::vector<StallStatistics>(moduleLabels_.size());
0481   for (std::size_t i{}; i < moduleStats_.size(); ++i) {
0482     moduleStats_[i].setLabel(moduleLabels_[i]);
0483   }
0484 
0485   if (validFile_) {
0486     {
0487       std::ostringstream oss;
0488       moduleIdToLabel(oss, moduleLabels_, 'M', "Module ID", "Module label");
0489       file_.write(oss.str());
0490     }
0491     {
0492       std::ostringstream oss;
0493       moduleIdToLabel(oss, esModuleLabels_, 'N', "ESModule ID", "ESModule label");
0494       file_.write(oss.str());
0495     }
0496   }
0497   // Don't need the labels anymore--info. is now part of the
0498   // module-statistics objects.
0499   decltype(moduleLabels_)().swap(moduleLabels_);
0500   decltype(esModuleLabels_)().swap(esModuleLabels_);
0501 
0502   beginTime_ = now();
0503 }
0504 
0505 void StallMonitor::preSourceEvent(StreamID const sid) {
0506   auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0507   auto msg = assembleMessage<step::preSourceEvent>(sid.value(), t);
0508   file_.write(std::move(msg));
0509 }
0510 
0511 void StallMonitor::postSourceEvent(StreamID const sid) {
0512   auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0513   auto msg = assembleMessage<step::postSourceEvent>(sid.value(), t);
0514   file_.write(std::move(msg));
0515 }
0516 
0517 void StallMonitor::preEvent(StreamContext const& sc) {
0518   auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0519   auto const& eid = sc.eventID();
0520   auto msg = assembleMessage<step::preEvent>(stream_id(sc), eid.run(), eid.luminosityBlock(), eid.event(), t);
0521   file_.write(std::move(msg));
0522 }
0523 
0524 void StallMonitor::postModuleEventPrefetching(StreamContext const& sc, ModuleCallingContext const& mcc) {
0525   auto const sid = stream_id(sc);
0526   auto const mid = module_id(mcc);
0527   auto start = stallStart_[std::make_pair(sid, mid)] = std::make_pair(now(), false);
0528 
0529   if (validFile_) {
0530     auto const t = duration_cast<duration_t>(start.first - beginTime_).count();
0531     auto msg = assembleMessage<step::postModuleEventPrefetching>(sid, mid, t);
0532     file_.write(std::move(msg));
0533   }
0534 }
0535 
0536 void StallMonitor::preModuleEventAcquire(StreamContext const& sc, ModuleCallingContext const& mcc) {
0537   auto const preModEventAcquire = now();
0538   auto const sid = stream_id(sc);
0539   auto const mid = module_id(mcc);
0540   auto& start = stallStart_[std::make_pair(sid, mid)];
0541   auto startT = start.first.time_since_epoch();
0542   start.second = true;  // record so the preModuleEvent knows that acquire was called
0543   if (validFile_) {
0544     auto t = duration_cast<duration_t>(preModEventAcquire - beginTime_).count();
0545     auto msg = assembleMessage<step::preModuleEventAcquire>(sid, mid, t);
0546     file_.write(std::move(msg));
0547   }
0548   // Check for stalls if prefetch was called
0549   if (duration_t::duration::zero() != startT) {
0550     auto const preFetch_to_preModEventAcquire = duration_cast<duration_t>(preModEventAcquire - start.first);
0551     if (preFetch_to_preModEventAcquire < stallThreshold_)
0552       return;
0553     moduleStats_[mid].update(preFetch_to_preModEventAcquire);
0554   }
0555 }
0556 
0557 void StallMonitor::postModuleEventAcquire(StreamContext const& sc, ModuleCallingContext const& mcc) {
0558   auto const postModEventAcquire = duration_cast<duration_t>(now() - beginTime_).count();
0559   auto msg = assembleMessage<step::postModuleEventAcquire>(stream_id(sc), module_id(mcc), postModEventAcquire);
0560   file_.write(std::move(msg));
0561 }
0562 
0563 void StallMonitor::preModuleEvent(StreamContext const& sc, ModuleCallingContext const& mcc) {
0564   auto const preModEvent = now();
0565   auto const sid = stream_id(sc);
0566   auto const mid = module_id(mcc);
0567   auto const& start = stallStart_[std::make_pair(sid, mid)];
0568   auto startT = start.first.time_since_epoch();
0569   if (validFile_) {
0570     auto t = duration_cast<duration_t>(preModEvent - beginTime_).count();
0571     auto msg =
0572         assembleMessage<step::preModuleEvent>(sid, mid, static_cast<std::underlying_type_t<Phase>>(Phase::Event), t);
0573     file_.write(std::move(msg));
0574   }
0575   // Check for stalls if prefetch was called and we did not already check before acquire
0576   if (duration_t::duration::zero() != startT && !start.second) {
0577     auto const preFetch_to_preModEvent = duration_cast<duration_t>(preModEvent - start.first);
0578     if (preFetch_to_preModEvent < stallThreshold_)
0579       return;
0580     moduleStats_[mid].update(preFetch_to_preModEvent);
0581   }
0582 }
0583 
0584 void StallMonitor::preModuleStreamTransition(StreamContext const& sc, ModuleCallingContext const& mcc) {
0585   auto const tNow = now();
0586   auto const sid = stream_id(sc);
0587   auto const mid = module_id(mcc);
0588   auto t = duration_cast<duration_t>(tNow - beginTime_).count();
0589   auto msg = assembleMessage<step::preModuleEvent>(sid, mid, toTransition(sc), t);
0590   file_.write(std::move(msg));
0591 }
0592 
0593 void StallMonitor::postModuleStreamTransition(StreamContext const& sc, ModuleCallingContext const& mcc) {
0594   auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0595   auto msg = assembleMessage<step::postModuleEvent>(stream_id(sc), module_id(mcc), toTransition(sc), t);
0596   file_.write(std::move(msg));
0597 }
0598 
0599 void StallMonitor::preModuleGlobalTransition(GlobalContext const& gc, ModuleCallingContext const& mcc) {
0600   auto t = duration_cast<duration_t>(now() - beginTime_).count();
0601   auto msg = assembleMessage<step::preModuleEvent>(numStreams_, module_id(mcc), toTransition(gc), t);
0602   file_.write(std::move(msg));
0603 }
0604 
0605 void StallMonitor::postModuleGlobalTransition(GlobalContext const& gc, ModuleCallingContext const& mcc) {
0606   auto const postModTime = duration_cast<duration_t>(now() - beginTime_).count();
0607   auto msg = assembleMessage<step::postModuleEvent>(numStreams_, module_id(mcc), toTransition(gc), postModTime);
0608   file_.write(std::move(msg));
0609 }
0610 
0611 void StallMonitor::preEventReadFromSource(StreamContext const& sc, ModuleCallingContext const& mcc) {
0612   auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0613   auto msg = assembleMessage<step::preEventReadFromSource>(stream_id(sc), module_id(mcc), t);
0614   file_.write(std::move(msg));
0615 }
0616 
0617 void StallMonitor::postEventReadFromSource(StreamContext const& sc, ModuleCallingContext const& mcc) {
0618   auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0619   auto msg = assembleMessage<step::postEventReadFromSource>(stream_id(sc), module_id(mcc), t);
0620   file_.write(std::move(msg));
0621 }
0622 
0623 void StallMonitor::postModuleEvent(StreamContext const& sc, ModuleCallingContext const& mcc) {
0624   auto const postModEvent = duration_cast<duration_t>(now() - beginTime_).count();
0625   auto msg = assembleMessage<step::postModuleEvent>(
0626       stream_id(sc), module_id(mcc), static_cast<std::underlying_type_t<Phase>>(Phase::Event), postModEvent);
0627   file_.write(std::move(msg));
0628 }
0629 
0630 void StallMonitor::postEvent(StreamContext const& sc) {
0631   auto const t = duration_cast<duration_t>(now() - beginTime_).count();
0632   auto const& eid = sc.eventID();
0633   auto msg = assembleMessage<step::postEvent>(stream_id(sc), eid.run(), eid.luminosityBlock(), eid.event(), t);
0634   file_.write(std::move(msg));
0635 }
0636 
0637 void StallMonitor::postEndJob() {
0638   // Prepare summary
0639   std::size_t width{};
0640   edm::for_all(moduleStats_, [&width](auto const& stats) {
0641     if (stats.numberOfStalls() == 0u)
0642       return;
0643     width = std::max(width, stats.label().size());
0644   });
0645 
0646   OStreamColumn tag{"StallMonitor>"};
0647   OStreamColumn col1{"Module label", width};
0648   OStreamColumn col2{"# of stalls"};
0649   OStreamColumn col3{"Total stalled time"};
0650   OStreamColumn col4{"Max stalled time"};
0651 
0652   LogAbsolute out{"StallMonitor"};
0653   out << '\n';
0654   out << tag << space << col1 << space << col2 << space << col3 << space << col4 << '\n';
0655 
0656   out << tag << space << std::setfill('-') << col1(std::string{}) << space << col2(std::string{}) << space
0657       << col3(std::string{}) << space << col4(std::string{}) << '\n';
0658 
0659   using seconds_d = duration<double>;
0660 
0661   auto to_seconds_str = [](auto const& duration) {
0662     std::ostringstream oss;
0663     auto const time = duration_cast<seconds_d>(duration).count();
0664     oss << time << " s";
0665     return oss.str();
0666   };
0667 
0668   out << std::setfill(' ');
0669   for (auto const& stats : moduleStats_) {
0670     if (stats.label().empty() ||  // See comment in filling of moduleLabels_;
0671         stats.numberOfStalls() == 0u)
0672       continue;
0673     out << std::left << tag << space << col1(stats.label()) << space << std::right << col2(stats.numberOfStalls())
0674         << space << col3(to_seconds_str(stats.totalStalledTime())) << space
0675         << col4(to_seconds_str(stats.maxStalledTime())) << '\n';
0676   }
0677 }
0678 
// Register StallMonitor as a framework service plugin (macro presumably
// provided via FWCore/ServiceRegistry/interface/ServiceMaker.h, included above).
DEFINE_FWK_SERVICE(StallMonitor);