File indexing completed on 2024-04-06 12:13:07
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014 #include <atomic>
0015 #include <thread>
0016 #include <mutex>
0017 #include <condition_variable>
0018 #include <exception>
0019
0020
0021 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0022 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0023 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0024 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0025 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0026
0027 namespace edm {
0028 class ZombieKillerService {
0029 public:
0030 ZombieKillerService(edm::ParameterSet const&, edm::ActivityRegistry&);
0031
0032 static void fillDescriptions(ConfigurationDescriptions& descriptions);
0033
0034 private:
0035 const unsigned int m_checkThreshold;
0036 const unsigned int m_secsBetweenChecks;
0037 std::thread m_watchingThread;
0038 std::condition_variable m_jobDoneCondition;
0039 std::mutex m_jobDoneMutex;
0040 bool m_jobDone;
0041 std::atomic<bool> m_stillAlive;
0042 std::atomic<unsigned int> m_numberChecksWhenNotAlive;
0043
0044 void notAZombieYet();
0045 void checkForZombie();
0046 void startThread();
0047 void stopThread();
0048 };
0049 }
0050
0051 using namespace edm;
0052
0053 namespace edm {
0054 namespace service {
0055 inline bool isProcessWideService(ZombieKillerService const*) { return true; }
0056 }
0057 }
0058
0059 ZombieKillerService::ZombieKillerService(edm::ParameterSet const& iPSet, edm::ActivityRegistry& iRegistry)
0060 : m_checkThreshold(iPSet.getUntrackedParameter<unsigned int>("numberOfAllowedFailedChecksInARow")),
0061 m_secsBetweenChecks(iPSet.getUntrackedParameter<unsigned int>("secondsBetweenChecks")),
0062 m_jobDone(false),
0063 m_stillAlive(true),
0064 m_numberChecksWhenNotAlive(0) {
0065 iRegistry.watchPostBeginJob([this]() { startThread(); });
0066 iRegistry.watchPostEndJob([this]() { stopThread(); });
0067
0068 iRegistry.watchPreSourceRun([this](RunIndex) { notAZombieYet(); });
0069 iRegistry.watchPostSourceRun([this](RunIndex) { notAZombieYet(); });
0070
0071 iRegistry.watchPreSourceLumi([this](LuminosityBlockIndex) { notAZombieYet(); });
0072 iRegistry.watchPostSourceLumi([this](LuminosityBlockIndex) { notAZombieYet(); });
0073
0074 iRegistry.watchPreSourceEvent([this](StreamID) { notAZombieYet(); });
0075 iRegistry.watchPostSourceEvent([this](StreamID) { notAZombieYet(); });
0076
0077 iRegistry.watchPreModuleBeginStream([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0078 iRegistry.watchPostModuleBeginStream([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0079
0080 iRegistry.watchPreModuleEndStream([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0081 iRegistry.watchPostModuleEndStream([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0082
0083 iRegistry.watchPreModuleEndJob([this](ModuleDescription const&) { notAZombieYet(); });
0084 iRegistry.watchPostModuleEndJob([this](ModuleDescription const&) { notAZombieYet(); });
0085 iRegistry.watchPreModuleEvent([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0086 iRegistry.watchPostModuleEvent([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0087
0088 iRegistry.watchPreModuleStreamBeginRun(
0089 [this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0090 iRegistry.watchPostModuleStreamBeginRun(
0091 [this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0092
0093 iRegistry.watchPreModuleStreamEndRun([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0094 iRegistry.watchPostModuleStreamEndRun([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0095
0096 iRegistry.watchPreModuleStreamBeginLumi(
0097 [this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0098 iRegistry.watchPostModuleStreamBeginLumi(
0099 [this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0100
0101 iRegistry.watchPreModuleStreamEndLumi([this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0102 iRegistry.watchPostModuleStreamEndLumi(
0103 [this](StreamContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0104
0105 iRegistry.watchPreModuleGlobalBeginRun(
0106 [this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0107 iRegistry.watchPostModuleGlobalBeginRun(
0108 [this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0109
0110 iRegistry.watchPreModuleGlobalEndRun([this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0111 iRegistry.watchPostModuleGlobalEndRun([this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0112
0113 iRegistry.watchPreModuleGlobalBeginLumi(
0114 [this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0115 iRegistry.watchPostModuleGlobalBeginLumi(
0116 [this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0117
0118 iRegistry.watchPreModuleGlobalEndLumi([this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0119 iRegistry.watchPostModuleGlobalEndLumi(
0120 [this](GlobalContext const&, ModuleCallingContext const&) { notAZombieYet(); });
0121 }
0122
0123
0124
0125
0126
0127
0128
0129
0130
0131
0132
0133
0134
0135
0136
0137
0138
0139
0140
0141
0142
0143
0144
0145
0146
0147 void ZombieKillerService::notAZombieYet() {
0148 m_numberChecksWhenNotAlive = 0;
0149 m_stillAlive = true;
0150 }
0151
0152 void ZombieKillerService::checkForZombie() {
0153 if (not m_stillAlive) {
0154 ++m_numberChecksWhenNotAlive;
0155 if (m_numberChecksWhenNotAlive > m_checkThreshold) {
0156 edm::LogError("JobStuck") << "Too long since the job has last made progress.";
0157 std::terminate();
0158 } else {
0159 edm::LogWarning("JobProgressing") << "It has been " << m_numberChecksWhenNotAlive * m_secsBetweenChecks
0160 << " seconds since job seen progressing";
0161 }
0162 }
0163 m_stillAlive = false;
0164 }
0165
0166 void ZombieKillerService::startThread() {
0167 m_watchingThread = std::thread([this]() {
0168 std::unique_lock<std::mutex> lock(m_jobDoneMutex);
0169 while (not m_jobDoneCondition.wait_for(
0170 lock, std::chrono::seconds(m_secsBetweenChecks), [this]() -> bool { return m_jobDone; })) {
0171
0172 checkForZombie();
0173 }
0174 });
0175 }
0176
0177 void ZombieKillerService::stopThread() {
0178 {
0179 std::lock_guard<std::mutex> guard(m_jobDoneMutex);
0180 m_jobDone = true;
0181 }
0182 m_jobDoneCondition.notify_all();
0183 m_watchingThread.join();
0184 }
0185
0186 void ZombieKillerService::fillDescriptions(ConfigurationDescriptions& descriptions) {
0187 ParameterSetDescription desc;
0188 desc.addUntracked<unsigned int>("secondsBetweenChecks", 60)
0189 ->setComment("Number of seconds to wait between checking if progress has been made.");
0190 desc.addUntracked<unsigned int>("numberOfAllowedFailedChecksInARow", 3)
0191 ->setComment("Number of allowed checks in a row with no progress.");
0192 descriptions.add("ZombieKillerService", desc);
0193 }
0194
0195
0196
0197
0198
0199
0200
0201
0202 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0203
0204 DEFINE_FWK_SERVICE(ZombieKillerService);