Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:13:04

0001 // -*- C++ -*-
0002 //
0003 // Package:     Subsystem/Package
0004 // Class  :     ConcurrentModuleTimer
0005 //
0006 // Implementation:
0007 //     [Notes on implementation]
0008 //
0009 // Original Author:  Chris Jones
0010 //         Created:  Tue, 10 Dec 2013 21:16:00 GMT
0011 //
0012 #include <memory>
0013 
0014 #include <vector>
0015 #include <atomic>
0016 #include <chrono>
0017 #include <iostream>
0018 
0019 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0020 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0021 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0022 #include "FWCore/Utilities/interface/CPUTimer.h"
0023 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0024 #include "FWCore/ServiceRegistry/interface/SystemBounds.h"
0025 #include "FWCore/ServiceRegistry/interface/ModuleCallingContext.h"
0026 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0027 
0028 namespace edm {
0029   namespace service {
0030     class ConcurrentModuleTimer {
0031     public:
0032       ConcurrentModuleTimer(edm::ParameterSet const& iConfig, edm::ActivityRegistry& iAR);
0033       ~ConcurrentModuleTimer();
0034       static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
0035 
0036     private:
0037       void start();
0038       void stop();
0039 
0040       bool trackModule(ModuleCallingContext const& iContext) const;
0041       std::unique_ptr<std::atomic<std::chrono::steady_clock::rep>[]> m_timeSums;
0042       std::vector<std::string> m_modulesToExclude;
0043       std::vector<unsigned int> m_excludedModuleIds;
0044       std::chrono::steady_clock::time_point m_time;
0045       unsigned int m_nTimeSums = 0;
0046       unsigned int m_nModules;
0047       unsigned int m_maxNModules = 0;
0048       const unsigned int m_padding;
0049       std::atomic<bool> m_spinLock;
0050       bool m_startedTiming;
0051       const bool m_excludeSource;
0052       const bool m_trackGlobalBeginRun;
0053     };
0054   }  // namespace service
0055 }  // namespace edm
0056 
0057 using namespace edm::service;
0058 // system include files
0059 
0060 // user include files
0061 
0062 //
0063 // constants, enums and typedefs
0064 //
0065 
0066 //
0067 // static data member definitions
0068 //
0069 
0070 //
0071 // constructors and destructor
0072 //
0073 ConcurrentModuleTimer::ConcurrentModuleTimer(edm::ParameterSet const& iConfig, edm::ActivityRegistry& iReg)
0074     : m_modulesToExclude(iConfig.getUntrackedParameter<std::vector<std::string>>("modulesToExclude")),
0075       m_time(),
0076       m_nModules(0),
0077       m_padding(iConfig.getUntrackedParameter<unsigned int>("padding")),
0078       m_spinLock{false},
0079       m_startedTiming(false),
0080       m_excludeSource(iConfig.getUntrackedParameter<bool>("excludeSource")),
0081       m_trackGlobalBeginRun(iConfig.getUntrackedParameter<bool>("trackGlobalBeginRun")) {
0082   if (not m_modulesToExclude.empty()) {
0083     iReg.watchPreModuleConstruction([this](ModuleDescription const& iMod) {
0084       for (auto const& name : m_modulesToExclude) {
0085         if (iMod.moduleLabel() == name) {
0086           m_excludedModuleIds.push_back(iMod.id());
0087           break;
0088         }
0089       }
0090     });
0091     iReg.watchPreModuleDestruction([this](ModuleDescription const& iMod) {
0092       auto found = std::find(m_excludedModuleIds.begin(), m_excludedModuleIds.end(), iMod.id());
0093       if (found != m_excludedModuleIds.end()) {
0094         m_excludedModuleIds.erase(found);
0095       }
0096     });
0097     iReg.watchPreModuleEvent([this](StreamContext const&, ModuleCallingContext const& iContext) {
0098       if (trackModule(iContext)) {
0099         start();
0100       }
0101     });
0102     iReg.watchPostModuleEvent([this](StreamContext const&, ModuleCallingContext const& iContext) {
0103       if (trackModule(iContext)) {
0104         stop();
0105       }
0106     });
0107 
0108     iReg.watchPreModuleEventDelayedGet([this](StreamContext const&, ModuleCallingContext const& iContext) {
0109       if (trackModule(iContext)) {
0110         if (iContext.state() == ModuleCallingContext::State::kRunning) {
0111           stop();
0112         }
0113       }
0114     });
0115     iReg.watchPostModuleEventDelayedGet([this](StreamContext const&, ModuleCallingContext const& iContext) {
0116       if (trackModule(iContext)) {
0117         if (iContext.state() == ModuleCallingContext::State::kRunning) {
0118           start();
0119         }
0120       }
0121     });
0122 
0123   } else {
0124     //apply to all modules so can use faster version
0125     iReg.watchPreModuleEvent([this](StreamContext const&, ModuleCallingContext const&) { start(); });
0126     iReg.watchPostModuleEvent([this](StreamContext const&, ModuleCallingContext const&) { stop(); });
0127 
0128     iReg.watchPreModuleEventDelayedGet([this](StreamContext const&, ModuleCallingContext const& iContext) {
0129       if (iContext.state() == ModuleCallingContext::State::kRunning) {
0130         stop();
0131       }
0132     });
0133     iReg.watchPostModuleEventDelayedGet([this](StreamContext const&, ModuleCallingContext const& iContext) {
0134       if (iContext.state() == ModuleCallingContext::State::kRunning) {
0135         start();
0136       }
0137     });
0138     if (m_trackGlobalBeginRun) {
0139       iReg.watchPreModuleGlobalBeginRun([this](GlobalContext const&, ModuleCallingContext const&) {
0140         if (not m_startedTiming) {
0141           m_time = std::chrono::steady_clock::now();
0142           m_startedTiming = true;
0143         }
0144 
0145         start();
0146       });
0147       iReg.watchPostModuleGlobalBeginRun([this](GlobalContext const&, ModuleCallingContext const&) { stop(); });
0148     }
0149   }
0150 
0151   iReg.watchPreallocate([this](edm::service::SystemBounds const& iBounds) {
0152     m_nTimeSums = iBounds.maxNumberOfThreads() + 1 + m_padding;
0153     m_timeSums = std::make_unique<std::atomic<std::chrono::steady_clock::rep>[]>(m_nTimeSums);
0154     for (unsigned int i = 0; i < m_nTimeSums; ++i) {
0155       m_timeSums[i] = 0;
0156     }
0157   });
0158 
0159   iReg.watchPreSourceEvent([this](StreamID) {
0160     if (not m_startedTiming) {
0161       m_time = std::chrono::steady_clock::now();
0162       m_startedTiming = true;
0163     }
0164     if (not m_excludeSource) {
0165       start();
0166     }
0167   });
0168   if (not m_excludeSource) {
0169     iReg.watchPostSourceEvent([this](StreamID) { stop(); });
0170   }
0171 }
0172 
0173 ConcurrentModuleTimer::~ConcurrentModuleTimer() {
0174   std::cout << "Maximum concurrent running modules: " << m_maxNModules << std::endl;
0175   std::cout << "Fraction of time running n Modules simultaneously" << std::endl;
0176   for (unsigned int i = 0; i < m_nTimeSums; ++i) {
0177     std::cout << i << " " << m_timeSums[i] / double(m_timeSums[0]) << " " << m_timeSums[i] << std::endl;
0178   }
0179 }
0180 
0181 // ConcurrentModuleTimer::ConcurrentModuleTimer(const ConcurrentModuleTimer& rhs)
0182 // {
0183 //    // do actual copying here;
0184 // }
0185 
0186 //
0187 // assignment operators
0188 //
0189 // const ConcurrentModuleTimer& ConcurrentModuleTimer::operator=(const ConcurrentModuleTimer& rhs)
0190 // {
0191 //   //An exception safe implementation is
0192 //   ConcurrentModuleTimer temp(rhs);
0193 //   swap(rhs);
0194 //
0195 //   return *this;
0196 // }
0197 
0198 //
0199 // member functions
0200 //
0201 void ConcurrentModuleTimer::start() {
0202   auto const newTime = std::chrono::steady_clock::now();
0203   std::chrono::steady_clock::time_point oldTime;
0204   bool expected = false;
0205   unsigned int nModules;
0206   while (not m_spinLock.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
0207     expected = false;
0208   }
0209   {
0210     oldTime = m_time;
0211     m_time = newTime;
0212     nModules = ++m_nModules;
0213     if (nModules > m_maxNModules) {
0214       m_maxNModules = nModules;
0215     }
0216     m_spinLock.store(false, std::memory_order_release);
0217   }
0218   assert(nModules < m_nTimeSums);
0219   auto diff = newTime - oldTime;
0220   for (unsigned int i = 0; i < nModules; ++i) {
0221     m_timeSums[i].fetch_add(diff.count());
0222   }
0223 }
0224 
0225 void ConcurrentModuleTimer::stop() {
0226   auto const newTime = std::chrono::steady_clock::now();
0227   std::chrono::steady_clock::time_point oldTime;
0228   bool expected = false;
0229   unsigned int nModules;
0230   while (not m_spinLock.compare_exchange_weak(expected, true, std::memory_order_acq_rel)) {
0231     expected = false;
0232   }
0233   {
0234     oldTime = m_time;
0235     m_time = newTime;
0236     nModules = m_nModules--;
0237     m_spinLock.store(false, std::memory_order_release);
0238   }
0239   assert(nModules < m_nTimeSums);
0240   auto diff = newTime - oldTime;
0241   for (unsigned int i = 0; i <= nModules; ++i) {
0242     m_timeSums[i].fetch_add(diff.count());
0243   }
0244 }
0245 
0246 //
0247 // const member functions
0248 //
0249 bool ConcurrentModuleTimer::trackModule(ModuleCallingContext const& iContext) const {
0250   auto modId = iContext.moduleDescription()->id();
0251   for (auto const id : m_excludedModuleIds) {
0252     if (modId == id) {
0253       return false;
0254     }
0255   }
0256   return true;
0257 }
0258 
0259 //
0260 // static member functions
0261 //
0262 void ConcurrentModuleTimer::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0263   edm::ParameterSetDescription desc;
0264   desc.addUntracked<std::vector<std::string>>("modulesToExclude", std::vector<std::string>{})
0265       ->setComment("Module labels to exclude from the timing measurements");
0266   desc.addUntracked<bool>("excludeSource", false)->setComment("Exclude the time the source is running");
0267   desc.addUntracked<unsigned int>("padding", 0)
0268       ->setComment(
0269           "[Expert use only] Extra possible concurrent modules beyond thread count.\n Only useful in debugging "
0270           "possible framework scheduling problems.");
0271   desc.addUntracked<bool>("trackGlobalBeginRun", false)
0272       ->setComment("Check for concurrent modules during global begin run");
0273   descriptions.add("ConcurrentModuleTimer", desc);
0274 }
0275 
0276 DEFINE_FWK_SERVICE(ConcurrentModuleTimer);