Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:13:07

0001 // -*- C++ -*-
0002 //
0003 // Package:     FWCore/Services
0004 // Class  :     ZombieKillerService
0005 //
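// Implementation:
//     A background thread wakes up every 'secondsBetweenChecks' seconds and checks
//     whether any of the watched ActivityRegistry signals fired since the previous
//     check. If more than 'numberOfAllowedFailedChecksInARow' consecutive checks see
//     no activity, an error is logged and the process is ended via std::terminate().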
0008 //
0009 // Original Author:  Chris Jones
0010 //         Created:  Sat, 22 Mar 2014 16:25:47 GMT
0011 //
0012 
0013 // system include files
0014 #include <atomic>
0015 #include <thread>
0016 #include <mutex>
0017 #include <condition_variable>
0018 #include <exception>
0019 
0020 // user include files
0021 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0022 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0023 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0024 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0025 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0026 
0027 namespace edm {
0028   class ZombieKillerService {
0029   public:
0030     ZombieKillerService(edm::ParameterSet const&, edm::ActivityRegistry&);
0031 
0032     static void fillDescriptions(ConfigurationDescriptions& descriptions);
0033 
0034   private:
0035     const unsigned int m_checkThreshold;
0036     const unsigned int m_secsBetweenChecks;
0037     std::thread m_watchingThread;
0038     std::condition_variable m_jobDoneCondition;
0039     std::mutex m_jobDoneMutex;
0040     bool m_jobDone;
0041     std::atomic<bool> m_stillAlive;
0042     std::atomic<unsigned int> m_numberChecksWhenNotAlive;
0043 
0044     void notAZombieYet();
0045     void checkForZombie();
0046     void startThread();
0047     void stopThread();
0048   };
0049 }  // namespace edm
0050 
0051 using namespace edm;
0052 
0053 namespace edm {
0054   namespace service {
0055     inline bool isProcessWideService(ZombieKillerService const*) { return true; }
0056   }  // namespace service
0057 }  // namespace edm
0058 
0059 ZombieKillerService::ZombieKillerService(edm::ParameterSet const& iPSet, edm::ActivityRegistry& iRegistry)
0060     : m_checkThreshold(iPSet.getUntrackedParameter<unsigned int>("numberOfAllowedFailedChecksInARow")),
0061       m_secsBetweenChecks(iPSet.getUntrackedParameter<unsigned int>("secondsBetweenChecks")),
0062       m_jobDone(false),
0063       m_stillAlive(true),
0064       m_numberChecksWhenNotAlive(0) {
0065   iRegistry.watchPostBeginJob([this]() { startThread(); });
0066   iRegistry.watchPostEndJob([this]() { stopThread(); });
0067 
0068   iRegistry.watchPreSourceRun([this](RunIndex) { notAZombieYet(); });
0069   iRegistry.watchPostSourceRun([this](RunIndex) { notAZombieYet(); });
0070 
0071   iRegistry.watchPreSourceLumi([this](LuminosityBlockIndex) { notAZombieYet(); });
0072   iRegistry.watchPostSourceLumi([this](LuminosityBlockIndex) { notAZombieYet(); });
0073 
0074   iRegistry.watchPreSourceEvent([this](StreamID) { notAZombieYet(); });
0075   iRegistry.watchPostSourceEvent([this](StreamID) { notAZombieYet(); });
0076 
0077   iRegistry.watchPreModuleBeginStream([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0078   iRegistry.watchPostModuleBeginStream([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0079 
0080   iRegistry.watchPreModuleEndStream([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0081   iRegistry.watchPostModuleEndStream([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0082 
0083   iRegistry.watchPreModuleEndJob([this](ModuleDescription const&) { notAZombieYet(); });
0084   iRegistry.watchPostModuleEndJob([this](ModuleDescription const&) { notAZombieYet(); });
0085   iRegistry.watchPreModuleEvent([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0086   iRegistry.watchPostModuleEvent([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0087 
0088   iRegistry.watchPreModuleStreamBeginRun(
0089       [this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0090   iRegistry.watchPostModuleStreamBeginRun(
0091       [this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0092 
0093   iRegistry.watchPreModuleStreamEndRun([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0094   iRegistry.watchPostModuleStreamEndRun([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0095 
0096   iRegistry.watchPreModuleStreamBeginLumi(
0097       [this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0098   iRegistry.watchPostModuleStreamBeginLumi(
0099       [this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0100 
0101   iRegistry.watchPreModuleStreamEndLumi([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0102   iRegistry.watchPostModuleStreamEndLumi(
0103       [this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0104 
0105   iRegistry.watchPreModuleGlobalBeginRun(
0106       [this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0107   iRegistry.watchPostModuleGlobalBeginRun(
0108       [this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0109 
0110   iRegistry.watchPreModuleGlobalEndRun([this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0111   iRegistry.watchPostModuleGlobalEndRun([this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0112 
0113   iRegistry.watchPreModuleGlobalBeginLumi(
0114       [this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0115   iRegistry.watchPostModuleGlobalBeginLumi(
0116       [this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0117 
0118   iRegistry.watchPreModuleGlobalEndLumi([this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0119   iRegistry.watchPostModuleGlobalEndLumi(
0120       [this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0121 }
0122 
0123 // ZombieKillerService::ZombieKillerService(const ZombieKillerService& rhs)
0124 // {
0125 //    // do actual copying here;
0126 // }
0127 
0128 //ZombieKillerService::~ZombieKillerService()
0129 //{
0130 //}
0131 
0132 //
0133 // assignment operators
0134 //
0135 // const ZombieKillerService& ZombieKillerService::operator=(const ZombieKillerService& rhs)
0136 // {
0137 //   //An exception safe implementation is
0138 //   ZombieKillerService temp(rhs);
0139 //   swap(rhs);
0140 //
0141 //   return *this;
0142 // }
0143 
0144 //
0145 // member functions
0146 //
0147 void ZombieKillerService::notAZombieYet() {
0148   m_numberChecksWhenNotAlive = 0;
0149   m_stillAlive = true;
0150 }
0151 
0152 void ZombieKillerService::checkForZombie() {
0153   if (not m_stillAlive) {
0154     ++m_numberChecksWhenNotAlive;
0155     if (m_numberChecksWhenNotAlive > m_checkThreshold) {
0156       edm::LogError("JobStuck") << "Too long since the job has last made progress.";
0157       std::terminate();
0158     } else {
0159       edm::LogWarning("JobProgressing") << "It has been " << m_numberChecksWhenNotAlive * m_secsBetweenChecks
0160                                         << " seconds since job seen progressing";
0161     }
0162   }
0163   m_stillAlive = false;
0164 }
0165 
0166 void ZombieKillerService::startThread() {
0167   m_watchingThread = std::thread([this]() {
0168     std::unique_lock<std::mutex> lock(m_jobDoneMutex);
0169     while (not m_jobDoneCondition.wait_for(
0170         lock, std::chrono::seconds(m_secsBetweenChecks), [this]() -> bool { return m_jobDone; })) {
0171       //we timed out
0172       checkForZombie();
0173     }
0174   });
0175 }
0176 
0177 void ZombieKillerService::stopThread() {
0178   {
0179     std::lock_guard<std::mutex> guard(m_jobDoneMutex);
0180     m_jobDone = true;
0181   }
0182   m_jobDoneCondition.notify_all();
0183   m_watchingThread.join();
0184 }
0185 
0186 void ZombieKillerService::fillDescriptions(ConfigurationDescriptions& descriptions) {
0187   ParameterSetDescription desc;
0188   desc.addUntracked<unsigned int>("secondsBetweenChecks", 60)
0189       ->setComment("Number of seconds to wait between checking if progress has been made.");
0190   desc.addUntracked<unsigned int>("numberOfAllowedFailedChecksInARow", 3)
0191       ->setComment("Number of allowed checks in a row with no progress.");
0192   descriptions.add("ZombieKillerService", desc);
0193 }
0194 
0195 //
0196 // const member functions
0197 //
0198 
0199 //
0200 // static member functions
0201 //
0202 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0203 
0204 DEFINE_FWK_SERVICE(ZombieKillerService);