Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-04-13 23:19:05

0001 
0002 #include "DataFormats/Provenance/interface/ModuleDescription.h"
0003 #include "DataFormats/Provenance/interface/ParameterSetID.h"
0004 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0005 #include "FWCore/Utilities/interface/ResourceInformation.h"
0006 #include "FWCore/Utilities/interface/TimingServiceBase.h"
0007 #include "FWCore/ServiceRegistry/interface/Service.h"
0008 #include "FWCore/ServiceRegistry/interface/ProcessContext.h"
0009 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0010 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0011 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0012 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0013 #include "FWCore/ParameterSet/interface/Registry.h"
0014 #include "Utilities/StorageFactory/interface/StorageAccount.h"
0015 #include "Utilities/XrdAdaptor/interface/XrdStatistics.h"
0016 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0017 #include "FWCore/Utilities/interface/processGUID.h"
0018 
#include <fcntl.h>
#include <unistd.h>
#include <sys/wait.h>
#include <spawn.h>

#include <algorithm>
#include <atomic>
#include <cctype>
#include <cerrno>
#include <chrono>
#include <cmath>
#include <cstdint>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <limits>
#include <set>
#include <sstream>
#include <sstream>
#include <string>
#include <vector>
0032 
0033 namespace edm {
0034 
0035   namespace service {
0036 
0037     class CondorStatusService {
0038     public:
0039       explicit CondorStatusService(ParameterSet const &pset, edm::ActivityRegistry &ar);
0040       ~CondorStatusService() {}
0041       CondorStatusService(const CondorStatusService &) = delete;
0042       CondorStatusService &operator=(const CondorStatusService &) = delete;
0043 
0044       static void fillDescriptions(ConfigurationDescriptions &descriptions);
0045 
0046     private:
0047       bool isChirpSupported();
0048       template <typename T>
0049       bool updateChirp(const std::string &key_suffix, const T &value);
0050       bool updateChirpQuoted(const std::string &key_suffix, const std::string &value);
0051       bool updateChirpImpl(std::string const &key, std::string const &value);
0052       inline void update();
0053       void firstUpdate();
0054       void secondUpdate();
0055       void lastUpdate();
0056       void updateImpl(std::chrono::steady_clock::duration sinceLastUpdate);
0057 
0058       void preSourceConstruction(ModuleDescription const &md, int maxEvents, int maxLumis, int maxSecondsUntilRampdown);
0059       void eventPost(StreamContext const &iContext);
0060       void lumiPost(GlobalContext const &);
0061       void runPost(GlobalContext const &);
0062       void beginPre(PathsAndConsumesOfModulesBase const &, ProcessContext const &processContext);
0063       void beginPost();
0064       void endPost();
0065       void filePost(std::string const &);
0066 
0067       bool m_debug;
0068       std::atomic_flag m_shouldUpdate;
0069       std::chrono::steady_clock::time_point m_beginJob;
0070       std::chrono::steady_clock::duration m_updateInterval =
0071           std::chrono::duration_cast<std::chrono::steady_clock::duration>(m_defaultUpdateInterval);
0072       float m_emaInterval = m_defaultEmaInterval;
0073       float m_rate = 0;
0074       static constexpr float m_defaultEmaInterval = 15 * 60;  // Time in seconds to average EMA over for event rate.
0075       static constexpr std::chrono::minutes m_defaultUpdateInterval = std::chrono::minutes(3);
0076       std::atomic<std::chrono::steady_clock::time_point> m_lastUpdate;
0077       std::atomic<std::uint_least64_t> m_events;
0078       std::atomic<std::uint_least64_t> m_lumis;
0079       std::atomic<std::uint_least64_t> m_runs;
0080       std::atomic<std::uint_least64_t> m_files;
0081       std::string m_tag;
0082       edm::ParameterSetID m_processParameterSetID;
0083 
0084       std::uint_least64_t m_lastEventCount = 0;
0085     };
0086     inline bool isProcessWideService(CondorStatusService const *) { return true; }
0087 
0088   }  // namespace service
0089 
0090 }  // namespace edm
0091 
0092 using namespace edm::service;
0093 
0094 constexpr std::chrono::minutes CondorStatusService::m_defaultUpdateInterval;
0095 constexpr float CondorStatusService::m_defaultEmaInterval;
0096 
0097 CondorStatusService::CondorStatusService(ParameterSet const &pset, edm::ActivityRegistry &ar)
0098     : m_debug(pset.getUntrackedParameter("debug", false)),
0099       m_lastUpdate(),
0100       m_events(0),
0101       m_lumis(0),
0102       m_runs(0),
0103       m_files(0) {
0104   m_shouldUpdate.clear();
0105   if (not pset.getUntrackedParameter("enable", true)) {
0106     return;
0107   }
0108   if (!isChirpSupported()) {
0109     return;
0110   }
0111 
0112   firstUpdate();
0113 
0114   ar.watchPostCloseFile(this, &CondorStatusService::filePost);
0115   ar.watchPostEvent(this, &CondorStatusService::eventPost);
0116   ar.watchPostGlobalEndLumi(this, &CondorStatusService::lumiPost);
0117   ar.watchPostGlobalEndRun(this, &CondorStatusService::runPost);
0118   ar.watchPreBeginJob(this, &CondorStatusService::beginPre);
0119   ar.watchPostBeginJob(this, &CondorStatusService::beginPost);
0120   ar.watchPostEndJob(this, &CondorStatusService::endPost);
0121 
0122   if (pset.exists("updateIntervalSeconds")) {
0123     m_updateInterval = std::chrono::seconds(pset.getUntrackedParameter<unsigned int>("updateIntervalSeconds"));
0124   }
0125   if (pset.exists("EMAInterval")) {
0126     m_emaInterval = pset.getUntrackedParameter<double>("EMAInterval");
0127   }
0128   if (pset.exists("tag")) {
0129     m_tag = pset.getUntrackedParameter<std::string>("tag");
0130   }
0131 }
0132 
0133 void CondorStatusService::eventPost(StreamContext const &iContext) {
0134   m_events++;
0135   update();
0136 }
0137 
0138 void CondorStatusService::lumiPost(GlobalContext const &) {
0139   m_lumis++;
0140   update();
0141 }
0142 
0143 void CondorStatusService::runPost(GlobalContext const &) {
0144   m_runs++;
0145   update();
0146 }
0147 
0148 void CondorStatusService::filePost(std::string const & /*lfn*/) {
0149   m_files++;
0150   update();
0151 }
0152 
0153 void CondorStatusService::beginPre(PathsAndConsumesOfModulesBase const &, ProcessContext const &processContext) {
0154   secondUpdate();
0155   if (!m_processParameterSetID.isValid()) {
0156     m_processParameterSetID = processContext.parameterSetID();
0157   }
0158 }
0159 
0160 void CondorStatusService::beginPost() {
0161   ParameterSet const &processParameterSet = edm::getParameterSet(m_processParameterSetID);
0162   const edm::ParameterSet &pset = processParameterSet.getParameterSet("@main_input");
0163   // PSet info from edm::ScheduleItems
0164   int maxEvents =
0165       processParameterSet.getUntrackedParameterSet("maxEvents", ParameterSet()).getUntrackedParameter<int>("input", -1);
0166   int maxLumis = processParameterSet.getUntrackedParameterSet("maxLuminosityBlocks", ParameterSet())
0167                      .getUntrackedParameter<int>("input", -1);
0168 
0169   // lumisToProcess from EventSkipperByID (PoolSource and similar)
0170   std::vector<edm::LuminosityBlockRange> toProcess = pset.getUntrackedParameter<std::vector<LuminosityBlockRange>>(
0171       "lumisToProcess", std::vector<LuminosityBlockRange>());
0172   edm::sortAndRemoveOverlaps(toProcess);
0173   uint64_t lumiCount = 0;
0174   for (auto const &range : toProcess) {
0175     if (range.startRun() != range.endRun()) {
0176       break;
0177     }
0178     if (range.endLumi() >= edm::LuminosityBlockID::maxLuminosityBlockNumber()) {
0179       break;
0180     }
0181     lumiCount += (range.endLumi() - range.startLumi());
0182   }
0183   // Handle sources deriving from ProducerSourceBase
0184   unsigned int eventsPerLumi = pset.getUntrackedParameter<unsigned int>("numberEventsInLuminosityBlock", 0);
0185   if ((lumiCount == 0) && (maxEvents > 0) && (eventsPerLumi > 0)) {
0186     lumiCount = static_cast<unsigned int>(std::ceil(static_cast<float>(maxEvents) / static_cast<float>(eventsPerLumi)));
0187   }
0188 
0189   std::vector<std::string> fileNames =
0190       pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>());
0191   std::stringstream ss_max_files;
0192   ss_max_files << fileNames.size();
0193   updateChirp("MaxFiles", ss_max_files.str());
0194 
0195   if (lumiCount > 0) {
0196     if (maxLumis < 0) {
0197       maxLumis = lumiCount;
0198     }
0199     if (maxLumis > static_cast<int>(lumiCount)) {
0200       maxLumis = lumiCount;
0201     }
0202   }
0203   if (maxEvents > 0) {
0204     std::stringstream ss_max_events;
0205     ss_max_events << maxEvents;
0206     updateChirp("MaxEvents", ss_max_events.str());
0207   }
0208   if (maxLumis > 0) {
0209     std::stringstream ss_max_lumis;
0210     ss_max_lumis << maxLumis;
0211     updateChirp("MaxLumis", ss_max_lumis.str());
0212   }
0213   update();
0214 }
0215 
0216 void CondorStatusService::endPost() { lastUpdate(); }
0217 
0218 bool CondorStatusService::isChirpSupported() {
0219   if (m_debug) {
0220     return true;
0221   }
0222 
0223   return std::getenv("_CONDOR_CHIRP_CONFIG") && updateChirp("Elapsed", "0");
0224 }
0225 
0226 void CondorStatusService::firstUpdate() {
0227   // Note we always update all our statistics to 0 / false / -1
0228   // This allows us to overwrite the activities of a previous cmsRun process
0229   // within this HTCondor job.
0230   updateImpl(std::chrono::steady_clock::duration());
0231   updateChirp("MaxFiles", "-1");
0232   updateChirp("MaxEvents", "-1");
0233   updateChirp("MaxLumis", "-1");
0234   updateChirp("Done", "false");
0235   updateChirpQuoted("Guid", edm::processGUID().toString());
0236   m_beginJob = TimingServiceBase::jobStartTime();
0237   if (m_beginJob == decltype(m_beginJob)()) {
0238     m_beginJob = std::chrono::steady_clock::now();
0239   }
0240 }
0241 
0242 void CondorStatusService::secondUpdate() {
0243   edm::Service<edm::ResourceInformation> resourceInformationService;
0244   if (resourceInformationService.isAvailable()) {
0245     std::string models = resourceInformationService->cpuModelsFormatted();
0246     double avgSpeed = resourceInformationService->cpuAverageSpeed();
0247     if (!models.empty()) {
0248       updateChirpQuoted("CPUModels", models);
0249       updateChirp("CPUSpeed", avgSpeed);
0250     }
0251   }
0252 }
0253 
0254 void CondorStatusService::lastUpdate() {
0255   auto now = std::chrono::steady_clock::now();
0256   updateImpl(now - m_lastUpdate.load());
0257   updateChirp("Done", "true");
0258   edm::Service<edm::ResourceInformation> resourceInformationService;
0259   if (!resourceInformationService.isAvailable()) {
0260     edm::LogWarning("CondorStatusService") << "At post, ResourceInformationService is NOT available.\n";
0261   }
0262 }
0263 
0264 void CondorStatusService::update() {
0265   auto now = std::chrono::steady_clock::now();
0266   if ((now - m_lastUpdate.load(std::memory_order_relaxed)) > m_updateInterval) {
0267     if (!m_shouldUpdate.test_and_set(std::memory_order_acquire)) {
0268       // Caught exception is rethrown
0269       CMS_SA_ALLOW try {
0270         auto sinceLastUpdate = now - m_lastUpdate.load();
0271         m_lastUpdate = now;
0272         updateImpl(sinceLastUpdate);
0273         m_shouldUpdate.clear(std::memory_order_release);
0274       } catch (...) {
0275         m_shouldUpdate.clear(std::memory_order_release);
0276         throw;
0277       }
0278     }
0279   }
0280 }
0281 
0282 void CondorStatusService::updateImpl(std::chrono::steady_clock::duration sinceLastUpdate) {
0283   auto now = std::chrono::steady_clock::now();
0284   auto jobTime = std::chrono::duration<double, std::ratio<1, 1>>(now - m_beginJob).count();
0285   auto secsSinceLastUpdate = std::chrono::duration_cast<std::chrono::seconds>(sinceLastUpdate).count();
0286 
0287   edm::Service<edm::TimingServiceBase> timingsvc;
0288   if (timingsvc.isAvailable()) {
0289     updateChirp("TotalCPU", timingsvc->getTotalCPU());
0290   }
0291 
0292   updateChirp("LastUpdate", std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()));
0293 
0294   if (!m_events || (m_events > m_lastEventCount)) {
0295     updateChirp("Events", m_events);
0296   }
0297 
0298   updateChirp("Lumis", m_lumis);
0299 
0300   updateChirp("Runs", m_runs);
0301 
0302   updateChirp("Files", m_files);
0303 
0304   float ema_coeff = 1 - std::exp(-static_cast<float>(secsSinceLastUpdate) /
0305                                  std::max(std::min(m_emaInterval, static_cast<float>(jobTime)), 1.0f));
0306   if (secsSinceLastUpdate > 0) {
0307     updateChirp("Elapsed", jobTime);
0308     m_rate = ema_coeff * static_cast<float>(m_events - m_lastEventCount) / static_cast<float>(secsSinceLastUpdate) +
0309              (1.0 - ema_coeff) * m_rate;
0310     m_lastEventCount = m_events;
0311     updateChirp("EventRate", m_rate);
0312   }
0313 
0314   // If Xrootd was used, pull the statistics from there.
0315   edm::Service<xrd_adaptor::XrdStatistics> xrdsvc;
0316   if (xrdsvc.isAvailable()) {
0317     for (auto const &iter : xrdsvc->condorUpdate()) {
0318       std::string site = iter.first;
0319       site.erase(std::remove_if(site.begin(), site.end(), [](char x) { return !isalnum(x) && (x != '_'); }),
0320                  site.end());
0321       auto &iostats = iter.second;
0322       updateChirp("IOSite_" + site + "_ReadBytes", iostats.bytesRead);
0323       updateChirp("IOSite_" + site + "_ReadTimeMS",
0324                   std::chrono::duration_cast<std::chrono::milliseconds>(iostats.transferTime).count());
0325     }
0326   }
0327 
0328   using namespace edm::storage;
0329   // Update storage account information
0330   auto const &stats = StorageAccount::summary();
0331   uint64_t readOps = 0;
0332   uint64_t readVOps = 0;
0333   uint64_t readSegs = 0;
0334   uint64_t readBytes = 0;
0335   uint64_t readTimeTotal = 0;
0336   uint64_t writeBytes = 0;
0337   uint64_t writeTimeTotal = 0;
0338   const auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
0339   for (const auto &storage : stats) {
0340     // StorageAccount records statistics for both the TFile layer and the
0341     // StorageFactory layer.  However, the StorageFactory statistics tend to
0342     // be more accurate as various backends may alter the incoming read requests
0343     // (such as when lazy-download is used).
0344     if (storage.first == token.value()) {
0345       continue;
0346     }
0347     for (const auto &counter : storage.second) {
0348       if (counter.first == static_cast<int>(StorageAccount::Operation::read)) {
0349         readOps += counter.second.successes;
0350         readSegs++;
0351         readBytes += counter.second.amount;
0352         readTimeTotal += counter.second.timeTotal;
0353       } else if (counter.first == static_cast<int>(StorageAccount::Operation::readv)) {
0354         readVOps += counter.second.successes;
0355         readSegs += counter.second.vector_count;
0356         readBytes += counter.second.amount;
0357         readTimeTotal += counter.second.timeTotal;
0358       } else if ((counter.first == static_cast<int>(StorageAccount::Operation::write)) ||
0359                  (counter.first == static_cast<int>(StorageAccount::Operation::writev))) {
0360         writeBytes += counter.second.amount;
0361         writeTimeTotal += counter.second.timeTotal;
0362       }
0363     }
0364   }
0365   updateChirp("ReadOps", readOps);
0366   updateChirp("ReadVOps", readVOps);
0367   updateChirp("ReadSegments", readSegs);
0368   updateChirp("ReadBytes", readBytes);
0369   updateChirp("ReadTimeMsecs", readTimeTotal / (1000 * 1000));
0370   updateChirp("WriteBytes", writeBytes);
0371   updateChirp("WriteTimeMsecs", writeTimeTotal / (1000 * 1000));
0372 }
0373 
0374 template <typename T>
0375 bool CondorStatusService::updateChirp(const std::string &key_suffix, const T &value) {
0376   std::stringstream ss;
0377   ss << value;
0378   return updateChirpImpl(key_suffix, ss.str());
0379 }
0380 
0381 bool CondorStatusService::updateChirpQuoted(const std::string &key_suffix, const std::string &value) {
0382   std::string value_copy = value;
0383   // Remove double-quotes or the \ character (as it has special escaping semantics in ClassAds).
0384   // Make sure we have ASCII characters.
0385   // Otherwise, remainder is allowed (including tabs, newlines, single-quotes).
0386   value_copy.erase(
0387       remove_if(
0388           value_copy.begin(), value_copy.end(), [](const char &c) { return !isascii(c) || (c == '"') || (c == '\\'); }),
0389       value_copy.end());
0390   return updateChirpImpl(key_suffix, "\"" + value_copy + "\"");
0391 }
0392 
0393 bool CondorStatusService::updateChirpImpl(const std::string &key_suffix, const std::string &value) {
0394   std::stringstream ss;
0395   ss << "ChirpCMSSW" << m_tag << key_suffix;
0396   std::string key = ss.str();
0397   if (m_debug) {
0398     std::cout << "condor_chirp set_job_attr_delayed " << key << " " << value << std::endl;
0399   }
0400   int pid = 0;
0401   posix_spawn_file_actions_t file_actions;
0402   int devnull_fd = open("/dev/null", O_RDWR);
0403   if (devnull_fd == -1) {
0404     return false;
0405   }
0406   posix_spawn_file_actions_init(&file_actions);
0407   posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 1);
0408   posix_spawn_file_actions_adddup2(&file_actions, devnull_fd, 2);
0409   const std::string chirp_name = "condor_chirp";
0410   const std::string set_job_attr = "set_job_attr_delayed";
0411   std::vector<const char *> argv;
0412   argv.push_back(chirp_name.c_str());
0413   argv.push_back(set_job_attr.c_str());
0414   argv.push_back(key.c_str());
0415   argv.push_back(value.c_str());
0416   argv.push_back(nullptr);
0417   int status = posix_spawnp(&pid, "condor_chirp", &file_actions, nullptr, const_cast<char *const *>(&argv[0]), environ);
0418   close(devnull_fd);
0419   posix_spawn_file_actions_destroy(&file_actions);
0420   if (status) {
0421     return false;
0422   }
0423   while ((waitpid(pid, &status, 0) == -1) && errno == -EINTR) {
0424   }
0425   return status == 0;
0426 }
0427 
0428 void CondorStatusService::fillDescriptions(ConfigurationDescriptions &descriptions) {
0429   ParameterSetDescription desc;
0430   desc.setComment("Service to update HTCondor with the current CMSSW status.");
0431   desc.addOptionalUntracked<unsigned int>(
0432           "updateIntervalSeconds", std::chrono::duration_cast<std::chrono::seconds>(m_defaultUpdateInterval).count())
0433       ->setComment("Interval, in seconds, for HTCondor updates");
0434   desc.addOptionalUntracked<bool>("debug", false)->setComment("Enable debugging of this service");
0435   desc.addOptionalUntracked<double>("EMAInterval", m_defaultEmaInterval)
0436       ->setComment("Interval, in seconds, to calculate event rate over (using EMA)");
0437   desc.addOptionalUntracked<std::string>("tag")->setComment(
0438       "Identifier tag for this process (a value of 'Foo' results in ClassAd attributes of the form 'ChirpCMSSWFoo*')");
0439   desc.addOptionalUntracked<bool>("enable", true)->setComment("Enable this service");
0440   descriptions.add("CondorStatusService", desc);
0441 }
0442 
0443 #include "FWCore/ServiceRegistry/interface/ServiceMaker.h"
0444 
0445 typedef edm::serviceregistry::AllArgsMaker<edm::service::CondorStatusService> CondorStatusServiceMaker;
0446 DEFINE_FWK_SERVICE_MAKER(CondorStatusService, CondorStatusServiceMaker);