File indexing completed on 2025-06-17 01:30:27
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014 #include <atomic>
0015 #include <thread>
0016 #include <mutex>
0017 #include <condition_variable>
0018 #include <exception>
0019
0020
0021 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0022 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0023 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0024 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0025 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0026
0027 namespace edm {
0028 class ZombieKillerService {
0029 public:
0030 ZombieKillerService(edm::ParameterSet const&, edm::ActivityRegistry&);
0031
0032 static void fillDescriptions(ConfigurationDescriptions& descriptions);
0033
0034 private:
0035 const unsigned int m_checkThreshold;
0036 const unsigned int m_secsBetweenChecks;
0037 std::thread m_watchingThread;
0038 std::condition_variable m_jobDoneCondition;
0039 std::mutex m_jobDoneMutex;
0040 bool m_jobDone;
0041 std::atomic<bool> m_stillAlive;
0042 std::atomic<unsigned int> m_numberChecksWhenNotAlive;
0043
0044 void notAZombieYet();
0045 void checkForZombie();
0046 void startThread();
0047 void stopThread();
0048 };
0049 }
0050
0051 using namespace edm;
0052
0053 ZombieKillerService::ZombieKillerService(edm::ParameterSet const& iPSet, edm::ActivityRegistry& iRegistry)
0054 : m_checkThreshold(iPSet.getUntrackedParameter<unsigned int>("numberOfAllowedFailedChecksInARow")),
0055 m_secsBetweenChecks(iPSet.getUntrackedParameter<unsigned int>("secondsBetweenChecks")),
0056 m_jobDone(false),
0057 m_stillAlive(true),
0058 m_numberChecksWhenNotAlive(0) {
0059 iRegistry.watchPostBeginJob([this]() { startThread(); });
0060 iRegistry.watchPostEndJob([this]() { stopThread(); });
0061
0062 iRegistry.watchPreSourceRun([this](RunIndex) { notAZombieYet(); });
0063 iRegistry.watchPostSourceRun([this](RunIndex) { notAZombieYet(); });
0064
0065 iRegistry.watchPreSourceLumi([this](LuminosityBlockIndex) { notAZombieYet(); });
0066 iRegistry.watchPostSourceLumi([this](LuminosityBlockIndex) { notAZombieYet(); });
0067
0068 iRegistry.watchPreSourceEvent([this](StreamID) { notAZombieYet(); });
0069 iRegistry.watchPostSourceEvent([this](StreamID) { notAZombieYet(); });
0070
0071 iRegistry.watchPreModuleBeginStream([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0072 iRegistry.watchPostModuleBeginStream([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0073
0074 iRegistry.watchPreModuleEndStream([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0075 iRegistry.watchPostModuleEndStream([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0076
0077 iRegistry.watchPreModuleEndJob([this](ModuleDescription const&) { notAZombieYet(); });
0078 iRegistry.watchPostModuleEndJob([this](ModuleDescription const&) { notAZombieYet(); });
0079 iRegistry.watchPreModuleEvent([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0080 iRegistry.watchPostModuleEvent([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0081
0082 iRegistry.watchPreModuleStreamBeginRun(
0083 [this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0084 iRegistry.watchPostModuleStreamBeginRun(
0085 [this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0086
0087 iRegistry.watchPreModuleStreamEndRun([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0088 iRegistry.watchPostModuleStreamEndRun([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0089
0090 iRegistry.watchPreModuleStreamBeginLumi(
0091 [this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0092 iRegistry.watchPostModuleStreamBeginLumi(
0093 [this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0094
0095 iRegistry.watchPreModuleStreamEndLumi([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0096 iRegistry.watchPostModuleStreamEndLumi(
0097 [this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0098
0099 iRegistry.watchPreModuleGlobalBeginRun(
0100 [this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0101 iRegistry.watchPostModuleGlobalBeginRun(
0102 [this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0103
0104 iRegistry.watchPreModuleGlobalEndRun([this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0105 iRegistry.watchPostModuleGlobalEndRun([this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0106
0107 iRegistry.watchPreModuleGlobalBeginLumi(
0108 [this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0109 iRegistry.watchPostModuleGlobalBeginLumi(
0110 [this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0111
0112 iRegistry.watchPreModuleGlobalEndLumi([this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0113 iRegistry.watchPostModuleGlobalEndLumi(
0114 [this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0115 }
0116
0117
0118
0119
0120
0121
0122
0123
0124
0125
0126
0127
0128
0129
0130
0131
0132
0133
0134
0135
0136
0137
0138
0139
0140
0141 void ZombieKillerService::notAZombieYet() {
0142 m_numberChecksWhenNotAlive = 0;
0143 m_stillAlive = true;
0144 }
0145
0146 void ZombieKillerService::checkForZombie() {
0147 if (not m_stillAlive) {
0148 ++m_numberChecksWhenNotAlive;
0149 if (m_numberChecksWhenNotAlive > m_checkThreshold) {
0150 edm::LogError("JobStuck") << "Too long since the job has last made progress.";
0151 std::terminate();
0152 } else {
0153 edm::LogWarning("JobProgressing") << "It has been " << m_numberChecksWhenNotAlive * m_secsBetweenChecks
0154 << " seconds since job seen progressing";
0155 }
0156 }
0157 m_stillAlive = false;
0158 }
0159
0160 void ZombieKillerService::startThread() {
0161 m_watchingThread = std::thread([this]() {
0162 std::unique_lock<std::mutex> lock(m_jobDoneMutex);
0163 while (not m_jobDoneCondition.wait_for(
0164 lock, std::chrono::seconds(m_secsBetweenChecks), [this]() -> bool { return m_jobDone; })) {
0165
0166 checkForZombie();
0167 }
0168 });
0169 }
0170
0171 void ZombieKillerService::stopThread() {
0172 {
0173 std::lock_guard<std::mutex> guard(m_jobDoneMutex);
0174 m_jobDone = true;
0175 }
0176 m_jobDoneCondition.notify_all();
0177 m_watchingThread.join();
0178 }
0179
0180 void ZombieKillerService::fillDescriptions(ConfigurationDescriptions& descriptions) {
0181 ParameterSetDescription desc;
0182 desc.addUntracked<unsigned int>("secondsBetweenChecks", 60)
0183 ->setComment("Number of seconds to wait between checking if progress has been made.");
0184 desc.addUntracked<unsigned int>("numberOfAllowedFailedChecksInARow", 3)
0185 ->setComment("Number of allowed checks in a row with no progress.");
0186 descriptions.add("ZombieKillerService", desc);
0187 }
0188
0189
0190
0191
0192
0193
0194
0195
0196 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0197
0198 DEFINE_FWK_SERVICE(ZombieKillerService);