File indexing completed on 2025-06-17 01:30:26
0001
#include "DataFormats/Provenance/interface/ModuleDescription.h"
#include "DataFormats/Provenance/interface/ParameterSetID.h"
#include "FWCore/AbstractServices/interface/ResourceInformation.h"
#include "FWCore/AbstractServices/interface/TimingServiceBase.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/ServiceRegistry/interface/ProcessContext.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/Registry.h"
#include "Utilities/StorageFactory/interface/StorageAccount.h"
#include "Utilities/XrdAdaptor/interface/XrdStatistics.h"
#include "FWCore/Utilities/interface/thread_safety_macros.h"
#include "FWCore/Utilities/interface/processGUID.h"

#include <fcntl.h>
#include <unistd.h>
#include <sys/wait.h>
#include <spawn.h>
#include <algorithm>
#include <atomic>
#include <cctype>
#include <cerrno>
#include <chrono>
#include <cmath>
#include <cstdint>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <limits>
#include <set>
#include <sstream>
#include <string>
#include <vector>
0032
0033 namespace edm {
0034
0035 namespace service {
0036
0037 class CondorStatusService {
0038 public:
0039 explicit CondorStatusService(ParameterSet const &pset, edm::ActivityRegistry &ar);
0040 ~CondorStatusService() {}
0041 CondorStatusService(const CondorStatusService &) = delete;
0042 CondorStatusService &operator=(const CondorStatusService &) = delete;
0043
0044 static void fillDescriptions(ConfigurationDescriptions &descriptions);
0045
0046 private:
0047 bool isChirpSupported();
0048 template <typename T>
0049 bool updateChirp(const std::string &key_suffix, const T &value);
0050 bool updateChirpQuoted(const std::string &key_suffix, const std::string &value);
0051 bool updateChirpImpl(std::string const &key, std::string const &value);
0052 inline void update();
0053 void firstUpdate();
0054 void secondUpdate();
0055 void lastUpdate();
0056 void updateImpl(std::chrono::steady_clock::duration sinceLastUpdate);
0057
0058 void preSourceConstruction(ModuleDescription const &md, int maxEvents, int maxLumis, int maxSecondsUntilRampdown);
0059 void eventPost(StreamContext const &iContext);
0060 void lumiPost(GlobalContext const &);
0061 void runPost(GlobalContext const &);
0062 void beginPre(ProcessContext const &processContext);
0063 void beginPost();
0064 void endPost();
0065 void filePost(std::string const &);
0066
0067 bool m_debug;
0068 std::atomic_flag m_shouldUpdate;
0069 std::chrono::steady_clock::time_point m_beginJob;
0070 std::chrono::steady_clock::duration m_updateInterval =
0071 std::chrono::duration_cast<std::chrono::steady_clock::duration>(m_defaultUpdateInterval);
0072 float m_emaInterval = m_defaultEmaInterval;
0073 float m_rate = 0;
0074 static constexpr float m_defaultEmaInterval = 15 * 60;
0075 static constexpr std::chrono::minutes m_defaultUpdateInterval = std::chrono::minutes(3);
0076 std::atomic<std::chrono::steady_clock::time_point> m_lastUpdate;
0077 std::atomic<std::uint_least64_t> m_events;
0078 std::atomic<std::uint_least64_t> m_lumis;
0079 std::atomic<std::uint_least64_t> m_runs;
0080 std::atomic<std::uint_least64_t> m_files;
0081 std::string m_tag;
0082 edm::ParameterSetID m_processParameterSetID;
0083
0084 std::uint_least64_t m_lastEventCount = 0;
0085 };
0086
0087 }
0088
0089 }
0090
0091 using namespace edm::service;
0092
0093 constexpr std::chrono::minutes CondorStatusService::m_defaultUpdateInterval;
0094 constexpr float CondorStatusService::m_defaultEmaInterval;
0095
0096 CondorStatusService::CondorStatusService(ParameterSet const &pset, edm::ActivityRegistry &ar)
0097 : m_debug(pset.getUntrackedParameter("debug", false)),
0098 m_lastUpdate(),
0099 m_events(0),
0100 m_lumis(0),
0101 m_runs(0),
0102 m_files(0) {
0103 m_shouldUpdate.clear();
0104 if (not pset.getUntrackedParameter("enable", true)) {
0105 return;
0106 }
0107 if (!isChirpSupported()) {
0108 return;
0109 }
0110
0111 firstUpdate();
0112
0113 ar.watchPostCloseFile(this, &CondorStatusService::filePost);
0114 ar.watchPostEvent(this, &CondorStatusService::eventPost);
0115 ar.watchPostGlobalEndLumi(this, &CondorStatusService::lumiPost);
0116 ar.watchPostGlobalEndRun(this, &CondorStatusService::runPost);
0117 ar.watchPreBeginJob(this, &CondorStatusService::beginPre);
0118 ar.watchPostBeginJob(this, &CondorStatusService::beginPost);
0119 ar.watchPostEndJob(this, &CondorStatusService::endPost);
0120
0121 if (pset.exists("updateIntervalSeconds")) {
0122 m_updateInterval = std::chrono::seconds(pset.getUntrackedParameter<unsigned int>("updateIntervalSeconds"));
0123 }
0124 if (pset.exists("EMAInterval")) {
0125 m_emaInterval = pset.getUntrackedParameter<double>("EMAInterval");
0126 }
0127 if (pset.exists("tag")) {
0128 m_tag = pset.getUntrackedParameter<std::string>("tag");
0129 }
0130 }
0131
0132 void CondorStatusService::eventPost(StreamContext const &iContext) {
0133 m_events++;
0134 update();
0135 }
0136
0137 void CondorStatusService::lumiPost(GlobalContext const &) {
0138 m_lumis++;
0139 update();
0140 }
0141
0142 void CondorStatusService::runPost(GlobalContext const &) {
0143 m_runs++;
0144 update();
0145 }
0146
0147 void CondorStatusService::filePost(std::string const & ) {
0148 m_files++;
0149 update();
0150 }
0151
0152 void CondorStatusService::beginPre(ProcessContext const &processContext) {
0153 secondUpdate();
0154 if (!m_processParameterSetID.isValid()) {
0155 m_processParameterSetID = processContext.parameterSetID();
0156 }
0157 }
0158
0159 void CondorStatusService::beginPost() {
0160 ParameterSet const &processParameterSet = edm::getParameterSet(m_processParameterSetID);
0161 const edm::ParameterSet &pset = processParameterSet.getParameterSet("@main_input");
0162
0163 int maxEvents =
0164 processParameterSet.getUntrackedParameterSet("maxEvents", ParameterSet()).getUntrackedParameter<int>("input", -1);
0165 int maxLumis = processParameterSet.getUntrackedParameterSet("maxLuminosityBlocks", ParameterSet())
0166 .getUntrackedParameter<int>("input", -1);
0167
0168
0169 std::vector<edm::LuminosityBlockRange> toProcess = pset.getUntrackedParameter<std::vector<LuminosityBlockRange>>(
0170 "lumisToProcess", std::vector<LuminosityBlockRange>());
0171 edm::sortAndRemoveOverlaps(toProcess);
0172 uint64_t lumiCount = 0;
0173 for (auto const &range : toProcess) {
0174 if (range.startRun() != range.endRun()) {
0175 break;
0176 }
0177 if (range.endLumi() >= edm::LuminosityBlockID::maxLuminosityBlockNumber()) {
0178 break;
0179 }
0180 lumiCount += (range.endLumi() - range.startLumi());
0181 }
0182
0183 unsigned int eventsPerLumi = pset.getUntrackedParameter<unsigned int>("numberEventsInLuminosityBlock", 0);
0184 if ((lumiCount == 0) && (maxEvents > 0) && (eventsPerLumi > 0)) {
0185 lumiCount = static_cast<unsigned int>(std::ceil(static_cast<float>(maxEvents) / static_cast<float>(eventsPerLumi)));
0186 }
0187
0188 std::vector<std::string> fileNames =
0189 pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>());
0190 std::stringstream ss_max_files;
0191 ss_max_files << fileNames.size();
0192 updateChirp("MaxFiles", ss_max_files.str());
0193
0194 if (lumiCount > 0) {
0195 if (maxLumis < 0) {
0196 maxLumis = lumiCount;
0197 }
0198 if (maxLumis > static_cast<int>(lumiCount)) {
0199 maxLumis = lumiCount;
0200 }
0201 }
0202 if (maxEvents > 0) {
0203 std::stringstream ss_max_events;
0204 ss_max_events << maxEvents;
0205 updateChirp("MaxEvents", ss_max_events.str());
0206 }
0207 if (maxLumis > 0) {
0208 std::stringstream ss_max_lumis;
0209 ss_max_lumis << maxLumis;
0210 updateChirp("MaxLumis", ss_max_lumis.str());
0211 }
0212 update();
0213 }
0214
0215 void CondorStatusService::endPost() { lastUpdate(); }
0216
0217 bool CondorStatusService::isChirpSupported() {
0218 if (m_debug) {
0219 return true;
0220 }
0221
0222 return std::getenv("_CONDOR_CHIRP_CONFIG") && updateChirp("Elapsed", "0");
0223 }
0224
0225 void CondorStatusService::firstUpdate() {
0226
0227
0228
0229 updateImpl(std::chrono::steady_clock::duration());
0230 updateChirp("MaxFiles", "-1");
0231 updateChirp("MaxEvents", "-1");
0232 updateChirp("MaxLumis", "-1");
0233 updateChirp("Done", "false");
0234 updateChirpQuoted("Guid", edm::processGUID().toString());
0235 m_beginJob = TimingServiceBase::jobStartTime();
0236 if (m_beginJob == decltype(m_beginJob)()) {
0237 m_beginJob = std::chrono::steady_clock::now();
0238 }
0239 }
0240
0241 void CondorStatusService::secondUpdate() {
0242 edm::Service<edm::ResourceInformation> resourceInformationService;
0243 if (resourceInformationService.isAvailable()) {
0244 std::string models = resourceInformationService->cpuModelsFormatted();
0245 double avgSpeed = resourceInformationService->cpuAverageSpeed();
0246 if (!models.empty()) {
0247 updateChirpQuoted("CPUModels", models);
0248 updateChirp("CPUSpeed", avgSpeed);
0249 }
0250 }
0251 }
0252
0253 void CondorStatusService::lastUpdate() {
0254 auto now = std::chrono::steady_clock::now();
0255 updateImpl(now - m_lastUpdate.load());
0256 updateChirp("Done", "true");
0257 edm::Service<edm::ResourceInformation> resourceInformationService;
0258 if (!resourceInformationService.isAvailable()) {
0259 edm::LogWarning("CondorStatusService") << "At post, ResourceInformationService is NOT available.\n";
0260 }
0261 }
0262
0263 void CondorStatusService::update() {
0264 auto now = std::chrono::steady_clock::now();
0265 if ((now - m_lastUpdate.load(std::memory_order_relaxed)) > m_updateInterval) {
0266 if (!m_shouldUpdate.test_and_set(std::memory_order_acquire)) {
0267
0268 CMS_SA_ALLOW try {
0269 auto sinceLastUpdate = now - m_lastUpdate.load();
0270 m_lastUpdate = now;
0271 updateImpl(sinceLastUpdate);
0272 m_shouldUpdate.clear(std::memory_order_release);
0273 } catch (...) {
0274 m_shouldUpdate.clear(std::memory_order_release);
0275 throw;
0276 }
0277 }
0278 }
0279 }
0280
0281 void CondorStatusService::updateImpl(std::chrono::steady_clock::duration sinceLastUpdate) {
0282 auto now = std::chrono::steady_clock::now();
0283 auto jobTime = std::chrono::duration<double, std::ratio<1, 1>>(now - m_beginJob).count();
0284 auto secsSinceLastUpdate = std::chrono::duration_cast<std::chrono::seconds>(sinceLastUpdate).count();
0285
0286 edm::Service<edm::TimingServiceBase> timingsvc;
0287 if (timingsvc.isAvailable()) {
0288 updateChirp("TotalCPU", timingsvc->getTotalCPU());
0289 }
0290
0291 updateChirp("LastUpdate", std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()));
0292
0293 if (!m_events || (m_events > m_lastEventCount)) {
0294 updateChirp("Events", m_events);
0295 }
0296
0297 updateChirp("Lumis", m_lumis);
0298
0299 updateChirp("Runs", m_runs);
0300
0301 updateChirp("Files", m_files);
0302
0303 float ema_coeff = 1 - std::exp(-static_cast<float>(secsSinceLastUpdate) /
0304 std::max(std::min(m_emaInterval, static_cast<float>(jobTime)), 1.0f));
0305 if (secsSinceLastUpdate > 0) {
0306 updateChirp("Elapsed", jobTime);
0307 m_rate = ema_coeff * static_cast<float>(m_events - m_lastEventCount) / static_cast<float>(secsSinceLastUpdate) +
0308 (1.0 - ema_coeff) * m_rate;
0309 m_lastEventCount = m_events;
0310 updateChirp("EventRate", m_rate);
0311 }
0312
0313
0314 edm::Service<xrd_adaptor::XrdStatistics> xrdsvc;
0315 if (xrdsvc.isAvailable()) {
0316 for (auto const &iter : xrdsvc->condorUpdate()) {
0317 std::string site = iter.first;
0318 site.erase(std::remove_if(site.begin(), site.end(), [](char x) { return !isalnum(x) && (x != '_'); }),
0319 site.end());
0320 auto &iostats = iter.second;
0321 updateChirp("IOSite_" + site + "_ReadBytes", iostats.bytesRead);
0322 updateChirp("IOSite_" + site + "_ReadTimeMS",
0323 std::chrono::duration_cast<std::chrono::milliseconds>(iostats.transferTime).count());
0324 }
0325 }
0326
0327 using namespace edm::storage;
0328
0329 auto const &stats = StorageAccount::summary();
0330 uint64_t readOps = 0;
0331 uint64_t readVOps = 0;
0332 uint64_t readSegs = 0;
0333 uint64_t readBytes = 0;
0334 uint64_t readTimeTotal = 0;
0335 uint64_t writeBytes = 0;
0336 uint64_t writeTimeTotal = 0;
0337 const auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
0338 for (const auto &storage : stats) {
0339
0340
0341
0342
0343 if (storage.first == token.value()) {
0344 continue;
0345 }
0346 for (const auto &counter : storage.second) {
0347 if (counter.first == static_cast<int>(StorageAccount::Operation::read)) {
0348 readOps += counter.second.successes;
0349 readSegs++;
0350 readBytes += counter.second.amount;
0351 readTimeTotal += counter.second.timeTotal;
0352 } else if (counter.first == static_cast<int>(StorageAccount::Operation::readv)) {
0353 readVOps += counter.second.successes;
0354 readSegs += counter.second.vector_count;
0355 readBytes += counter.second.amount;
0356 readTimeTotal += counter.second.timeTotal;
0357 } else if ((counter.first == static_cast<int>(StorageAccount::Operation::write)) ||
0358 (counter.first == static_cast<int>(StorageAccount::Operation::writev))) {
0359 writeBytes += counter.second.amount;
0360 writeTimeTotal += counter.second.timeTotal;
0361 }
0362 }
0363 }
0364 updateChirp("ReadOps", readOps);
0365 updateChirp("ReadVOps", readVOps);
0366 updateChirp("ReadSegments", readSegs);
0367 updateChirp("ReadBytes", readBytes);
0368 updateChirp("ReadTimeMsecs", readTimeTotal / (1000 * 1000));
0369 updateChirp("WriteBytes", writeBytes);
0370 updateChirp("WriteTimeMsecs", writeTimeTotal / (1000 * 1000));
0371 }
0372
0373 template <typename T>
0374 bool CondorStatusService::updateChirp(const std::string &key_suffix, const T &value) {
0375 std::stringstream ss;
0376 ss << value;
0377 return updateChirpImpl(key_suffix, ss.str());
0378 }
0379
0380 bool CondorStatusService::updateChirpQuoted(const std::string &key_suffix, const std::string &value) {
0381 std::string value_copy = value;
0382
0383
0384
0385 value_copy.erase(
0386 remove_if(
0387 value_copy.begin(), value_copy.end(), [](const char &c) { return !isascii(c) || (c == '"') || (c == '\\'); }),
0388 value_copy.end());
0389 return updateChirpImpl(key_suffix, "\"" + value_copy + "\"");
0390 }
0391
0392 bool CondorStatusService::updateChirpImpl(const std::string &key_suffix, const std::string &value) {
0393 std::stringstream ss;
0394 ss << "ChirpCMSSW" << m_tag << key_suffix;
0395 std::string key = ss.str();
0396 if (m_debug) {
0397 std::cout << "condor_chirp set_job_attr_delayed " << key << " " << value << std::endl;
0398 }
0399 int pid = 0;
0400 posix_spawn_file_actions_t file_actions;
0401 int devnull_fd = open("/dev/null", O_RDWR);
0402 if (devnull_fd == -1) {
0403 return false;
0404 }
0405 posix_spawn_file_actions_init(&file_actions);
0406 posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 1);
0407 posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 2);
0408 const std::string chirp_name = "condor_chirp";
0409 const std::string set_job_attr = "set_job_attr_delayed";
0410 std::vector<const char *> argv;
0411 argv.push_back(chirp_name.c_str());
0412 argv.push_back(set_job_attr.c_str());
0413 argv.push_back(key.c_str());
0414 argv.push_back(value.c_str());
0415 argv.push_back(nullptr);
0416 int status = posix_spawnp(&pid, "condor_chirp", &file_actions, nullptr, const_cast<char *const *>(&argv[0]), environ);
0417 close(devnull_fd);
0418 posix_spawn_file_actions_destroy(&file_actions);
0419 if (status) {
0420 return false;
0421 }
0422 while ((waitpid(pid, &status, 0) == -1) && errno == -EINTR) {
0423 }
0424 return status == 0;
0425 }
0426
0427 void CondorStatusService::fillDescriptions(ConfigurationDescriptions &descriptions) {
0428 ParameterSetDescription desc;
0429 desc.setComment("Service to update HTCondor with the current CMSSW status.");
0430 desc.addOptionalUntracked<unsigned int>(
0431 "updateIntervalSeconds", std::chrono::duration_cast<std::chrono::seconds>(m_defaultUpdateInterval).count())
0432 ->setComment("Interval, in seconds, for HTCondor updates");
0433 desc.addOptionalUntracked<bool>("debug", false)->setComment("Enable debugging of this service");
0434 desc.addOptionalUntracked<double>("EMAInterval", m_defaultEmaInterval)
0435 ->setComment("Interval, in seconds, to calculate event rate over (using EMA)");
0436 desc.addOptionalUntracked<std::string>("tag")->setComment(
0437 "Identifier tag for this process (a value of 'Foo' results in ClassAd attributes of the form 'ChirpCMSSWFoo*')");
0438 desc.addOptionalUntracked<bool>("enable", true)->setComment("Enable this service");
0439 descriptions.add("CondorStatusService", desc);
0440 }
0441
0442 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0443
0444 typedef edm::serviceregistry::AllArgsMaker<edm::service::CondorStatusService> CondorStatusServiceMaker;
0445 DEFINE_FWK_SERVICE_MAKER(CondorStatusService, CondorStatusServiceMaker);