Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-06-17 01:30:27

0001 // -*- C++ -*-
0002 //
0003 // Package:     FWCore/Services
0004 // Class  :     ZombieKillerService
0005 //
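// Implementation:
//     A background thread wakes up every "secondsBetweenChecks" seconds and
//     checks whether any of the watched framework signals fired since the
//     previous check. Each watched signal marks the job as still alive.
//     When more than "numberOfAllowedFailedChecksInARow" consecutive checks
//     see no progress, an error is logged and std::terminate() is called.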
0008 //
0009 // Original Author:  Chris Jones
0010 //         Created:  Sat, 22 Mar 2014 16:25:47 GMT
0011 //
0012 
0013 // system include files
0014 #include <atomic>
0015 #include <thread>
0016 #include <mutex>
0017 #include <condition_variable>
0018 #include <exception>
0019 
0020 // user include files
0021 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0022 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0023 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0024 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0025 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0026 
0027 namespace edm {
0028   class ZombieKillerService {
0029   public:
0030     ZombieKillerService(edm::ParameterSet const&, edm::ActivityRegistry&);
0031 
0032     static void fillDescriptions(ConfigurationDescriptions& descriptions);
0033 
0034   private:
0035     const unsigned int m_checkThreshold;
0036     const unsigned int m_secsBetweenChecks;
0037     std::thread m_watchingThread;
0038     std::condition_variable m_jobDoneCondition;
0039     std::mutex m_jobDoneMutex;
0040     bool m_jobDone;
0041     std::atomic<bool> m_stillAlive;
0042     std::atomic<unsigned int> m_numberChecksWhenNotAlive;
0043 
0044     void notAZombieYet();
0045     void checkForZombie();
0046     void startThread();
0047     void stopThread();
0048   };
0049 }  // namespace edm
0050 
0051 using namespace edm;
0052 
0053 ZombieKillerService::ZombieKillerService(edm::ParameterSet const& iPSet, edm::ActivityRegistry& iRegistry)
0054     : m_checkThreshold(iPSet.getUntrackedParameter<unsigned int>("numberOfAllowedFailedChecksInARow")),
0055       m_secsBetweenChecks(iPSet.getUntrackedParameter<unsigned int>("secondsBetweenChecks")),
0056       m_jobDone(false),
0057       m_stillAlive(true),
0058       m_numberChecksWhenNotAlive(0) {
0059   iRegistry.watchPostBeginJob([this]() { startThread(); });
0060   iRegistry.watchPostEndJob([this]() { stopThread(); });
0061 
0062   iRegistry.watchPreSourceRun([this](RunIndex) { notAZombieYet(); });
0063   iRegistry.watchPostSourceRun([this](RunIndex) { notAZombieYet(); });
0064 
0065   iRegistry.watchPreSourceLumi([this](LuminosityBlockIndex) { notAZombieYet(); });
0066   iRegistry.watchPostSourceLumi([this](LuminosityBlockIndex) { notAZombieYet(); });
0067 
0068   iRegistry.watchPreSourceEvent([this](StreamID) { notAZombieYet(); });
0069   iRegistry.watchPostSourceEvent([this](StreamID) { notAZombieYet(); });
0070 
0071   iRegistry.watchPreModuleBeginStream([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0072   iRegistry.watchPostModuleBeginStream([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0073 
0074   iRegistry.watchPreModuleEndStream([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0075   iRegistry.watchPostModuleEndStream([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0076 
0077   iRegistry.watchPreModuleEndJob([this](ModuleDescription const&) { notAZombieYet(); });
0078   iRegistry.watchPostModuleEndJob([this](ModuleDescription const&) { notAZombieYet(); });
0079   iRegistry.watchPreModuleEvent([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0080   iRegistry.watchPostModuleEvent([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0081 
0082   iRegistry.watchPreModuleStreamBeginRun(
0083       [this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0084   iRegistry.watchPostModuleStreamBeginRun(
0085       [this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0086 
0087   iRegistry.watchPreModuleStreamEndRun([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0088   iRegistry.watchPostModuleStreamEndRun([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0089 
0090   iRegistry.watchPreModuleStreamBeginLumi(
0091       [this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0092   iRegistry.watchPostModuleStreamBeginLumi(
0093       [this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0094 
0095   iRegistry.watchPreModuleStreamEndLumi([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0096   iRegistry.watchPostModuleStreamEndLumi(
0097       [this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0098 
0099   iRegistry.watchPreModuleGlobalBeginRun(
0100       [this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0101   iRegistry.watchPostModuleGlobalBeginRun(
0102       [this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0103 
0104   iRegistry.watchPreModuleGlobalEndRun([this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0105   iRegistry.watchPostModuleGlobalEndRun([this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0106 
0107   iRegistry.watchPreModuleGlobalBeginLumi(
0108       [this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0109   iRegistry.watchPostModuleGlobalBeginLumi(
0110       [this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0111 
0112   iRegistry.watchPreModuleGlobalEndLumi([this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0113   iRegistry.watchPostModuleGlobalEndLumi(
0114       [this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0115 }
0116 
0117 // ZombieKillerService::ZombieKillerService(const ZombieKillerService& rhs)
0118 // {
0119 //    // do actual copying here;
0120 // }
0121 
0122 //ZombieKillerService::~ZombieKillerService()
0123 //{
0124 //}
0125 
0126 //
0127 // assignment operators
0128 //
0129 // const ZombieKillerService& ZombieKillerService::operator=(const ZombieKillerService& rhs)
0130 // {
0131 //   //An exception safe implementation is
0132 //   ZombieKillerService temp(rhs);
0133 //   swap(rhs);
0134 //
0135 //   return *this;
0136 // }
0137 
0138 //
0139 // member functions
0140 //
0141 void ZombieKillerService::notAZombieYet() {
0142   m_numberChecksWhenNotAlive = 0;
0143   m_stillAlive = true;
0144 }
0145 
0146 void ZombieKillerService::checkForZombie() {
0147   if (not m_stillAlive) {
0148     ++m_numberChecksWhenNotAlive;
0149     if (m_numberChecksWhenNotAlive > m_checkThreshold) {
0150       edm::LogError("JobStuck") << "Too long since the job has last made progress.";
0151       std::terminate();
0152     } else {
0153       edm::LogWarning("JobProgressing") << "It has been " << m_numberChecksWhenNotAlive * m_secsBetweenChecks
0154                                         << " seconds since job seen progressing";
0155     }
0156   }
0157   m_stillAlive = false;
0158 }
0159 
0160 void ZombieKillerService::startThread() {
0161   m_watchingThread = std::thread([this]() {
0162     std::unique_lock<std::mutex> lock(m_jobDoneMutex);
0163     while (not m_jobDoneCondition.wait_for(
0164         lock, std::chrono::seconds(m_secsBetweenChecks), [this]() -> bool { return m_jobDone; })) {
0165       //we timed out
0166       checkForZombie();
0167     }
0168   });
0169 }
0170 
0171 void ZombieKillerService::stopThread() {
0172   {
0173     std::lock_guard<std::mutex> guard(m_jobDoneMutex);
0174     m_jobDone = true;
0175   }
0176   m_jobDoneCondition.notify_all();
0177   m_watchingThread.join();
0178 }
0179 
0180 void ZombieKillerService::fillDescriptions(ConfigurationDescriptions& descriptions) {
0181   ParameterSetDescription desc;
0182   desc.addUntracked<unsigned int>("secondsBetweenChecks", 60)
0183       ->setComment("Number of seconds to wait between checking if progress has been made.");
0184   desc.addUntracked<unsigned int>("numberOfAllowedFailedChecksInARow", 3)
0185       ->setComment("Number of allowed checks in a row with no progress.");
0186   descriptions.add("ZombieKillerService", desc);
0187 }
0188 
0189 //
0190 // const member functions
0191 //
0192 
0193 //
0194 // static member functions
0195 //
0196 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0197 
// Invokes the DEFINE_FWK_SERVICE macro for ZombieKillerService. The macro is
// presumably supplied by the ServiceMaker.h header included just above and
// presumably makes the service available to the framework's plugin system —
// TODO confirm against that header.
DEFINE_FWK_SERVICE(ZombieKillerService);