Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:31:52

0001 
#include "Utilities/StorageFactory/interface/StatisticsSenderService.h"
#include "Utilities/StorageFactory/interface/StorageAccount.h"
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
#include "FWCore/Catalog/interface/SiteLocalConfig.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/Utilities/interface/processGUID.h"
#include "FWCore/Version/interface/GetReleaseVersion.h"

#include <cmath>
#include <cstring>
#include <memory>
#include <set>
#include <string>

#include <unistd.h>
#include <fcntl.h>

#include <openssl/x509.h>
#include <openssl/pem.h>
0019 
// OUTPUT_STATISTIC(name) streams `"name":<name - m_name>, ` into an ostream called
// `os` that must be in scope at the expansion site. The terminating semicolon is
// part of the macro, so call sites are written without one.
#define OUTPUT_STATISTIC(x) os << "\"" << #x << "\":" << ((x) - (m_##x)) << ", ";

// Some platforms (e.g. macOS) do not provide HOST_NAME_MAX; fall back to a fixed
// value so hostname buffers can still be sized at compile time.
#if !defined(HOST_NAME_MAX)
#define HOST_NAME_MAX 128
#endif
0027 
0028 static constexpr char const *const JOB_UNIQUE_ID_ENV = "CRAB_UNIQUE_JOB_ID";
0029 static constexpr char const *const JOB_UNIQUE_ID_ENV_V2 = "DashboardJobId";
0030 
0031 using namespace edm::storage;
0032 
0033 StatisticsSenderService::FileStatistics::FileStatistics()
0034     : m_read_single_operations(0),
0035       m_read_single_bytes(0),
0036       m_read_single_square(0),
0037       m_read_vector_operations(0),
0038       m_read_vector_bytes(0),
0039       m_read_vector_square(0),
0040       m_read_vector_count_sum(0),
0041       m_read_vector_count_square(0),
0042       m_start_time(time(nullptr)) {}
0043 
0044 void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) const {
0045   const StorageAccount::StorageStats &stats = StorageAccount::summary();
0046   ssize_t read_single_operations = 0;
0047   ssize_t read_single_bytes = 0;
0048   ssize_t read_single_square = 0;
0049   ssize_t read_vector_operations = 0;
0050   ssize_t read_vector_bytes = 0;
0051   ssize_t read_vector_square = 0;
0052   ssize_t read_vector_count_sum = 0;
0053   ssize_t read_vector_count_square = 0;
0054   auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
0055   for (StorageAccount::StorageStats::const_iterator i = stats.begin(); i != stats.end(); ++i) {
0056     if (i->first == token.value()) {
0057       continue;
0058     }
0059     for (StorageAccount::OperationStats::const_iterator j = i->second.begin(); j != i->second.end(); ++j) {
0060       if (j->first == static_cast<int>(StorageAccount::Operation::readv)) {
0061         read_vector_operations += j->second.attempts;
0062         read_vector_bytes += j->second.amount;
0063         read_vector_count_square += j->second.vector_square;
0064         read_vector_square += j->second.amount_square;
0065         read_vector_count_sum += j->second.vector_count;
0066       } else if (j->first == static_cast<int>(StorageAccount::Operation::read)) {
0067         read_single_operations += j->second.attempts;
0068         read_single_bytes += j->second.amount;
0069         read_single_square += j->second.amount_square;
0070       }
0071     }
0072   }
0073   int64_t single_op_count = read_single_operations - m_read_single_operations;
0074   if (single_op_count > 0) {
0075     double single_sum = read_single_bytes - m_read_single_bytes;
0076     double single_average = single_sum / static_cast<double>(single_op_count);
0077     os << "\"read_single_sigma\":"
0078        << sqrt(std::abs((static_cast<double>(read_single_square - m_read_single_square) -
0079                          single_average * single_average * single_op_count) /
0080                         static_cast<double>(single_op_count)))
0081        << ", ";
0082     os << "\"read_single_average\":" << single_average << ", ";
0083   }
0084   int64_t vector_op_count = read_vector_operations - m_read_vector_operations;
0085   if (vector_op_count > 0) {
0086     double vector_average =
0087         static_cast<double>(read_vector_bytes - m_read_vector_bytes) / static_cast<double>(vector_op_count);
0088     os << "\"read_vector_average\":" << vector_average << ", ";
0089     os << "\"read_vector_sigma\":"
0090        << sqrt(std::abs((static_cast<double>(read_vector_square - m_read_vector_square) -
0091                          vector_average * vector_average * vector_op_count) /
0092                         static_cast<double>(vector_op_count)))
0093        << ", ";
0094     double vector_count_average =
0095         static_cast<double>(read_vector_count_sum - m_read_vector_count_sum) / static_cast<double>(vector_op_count);
0096     os << "\"read_vector_count_average\":" << vector_count_average << ", ";
0097     os << "\"read_vector_count_sigma\":"
0098        << sqrt(std::abs((static_cast<double>(read_vector_count_square - m_read_vector_count_square) -
0099                          vector_count_average * vector_count_average * vector_op_count) /
0100                         static_cast<double>(vector_op_count)))
0101        << ", ";
0102   }
0103 
0104   os << "\"read_bytes\":" << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes)
0105      << ", ";
0106   os << "\"read_bytes_at_close\":"
0107      << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) << ", ";
0108 
0109   // See top of file for macros; not complex, just avoiding copy/paste
0110   OUTPUT_STATISTIC(read_single_operations)
0111   OUTPUT_STATISTIC(read_single_bytes)
0112   OUTPUT_STATISTIC(read_vector_operations)
0113   OUTPUT_STATISTIC(read_vector_bytes)
0114 
0115   os << "\"start_time\":" << m_start_time << ", ";
0116   // NOTE: last entry doesn't have the trailing comma.
0117   os << "\"end_time\":" << time(nullptr);
0118 }
0119 
0120 void StatisticsSenderService::FileStatistics::update() {
0121   const StorageAccount::StorageStats &stats = StorageAccount::summary();
0122   ssize_t read_single_operations = 0;
0123   ssize_t read_single_bytes = 0;
0124   ssize_t read_single_square = 0;
0125   ssize_t read_vector_operations = 0;
0126   ssize_t read_vector_bytes = 0;
0127   ssize_t read_vector_square = 0;
0128   ssize_t read_vector_count_sum = 0;
0129   ssize_t read_vector_count_square = 0;
0130   auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
0131   for (StorageAccount::StorageStats::const_iterator i = stats.begin(); i != stats.end(); ++i) {
0132     if (i->first == token.value()) {
0133       continue;
0134     }
0135     for (StorageAccount::OperationStats::const_iterator j = i->second.begin(); j != i->second.end(); ++j) {
0136       if (j->first == static_cast<int>(StorageAccount::Operation::readv)) {
0137         read_vector_operations += j->second.attempts;
0138         read_vector_bytes += j->second.amount;
0139         read_vector_count_square += j->second.vector_square;
0140         read_vector_square += j->second.amount_square;
0141         read_vector_count_sum += j->second.vector_count;
0142       } else if (j->first == static_cast<int>(StorageAccount::Operation::read)) {
0143         read_single_operations += j->second.attempts;
0144         read_single_bytes += j->second.amount;
0145         read_single_square += j->second.amount_square;
0146       }
0147     }
0148   }
0149 
0150   m_read_single_square = read_single_square;
0151   m_read_vector_square = read_vector_square;
0152   m_read_vector_count_square = read_vector_count_square;
0153   m_read_vector_count_sum = read_vector_count_sum;
0154   m_read_single_operations = read_single_operations;
0155   m_read_single_bytes = read_single_bytes;
0156   m_read_vector_operations = read_vector_operations;
0157   m_read_vector_bytes = read_vector_bytes;
0158   m_start_time = time(nullptr);
0159 }
0160 StatisticsSenderService::FileInfo::FileInfo(std::string const &iLFN, edm::InputType iType)
0161     : m_filelfn(iLFN),
0162       m_serverhost("unknown"),
0163       m_serverdomain("unknown"),
0164       m_type(iType),
0165       m_size(-1),
0166       m_id(0),
0167       m_openCount(1) {}
0168 
0169 StatisticsSenderService::StatisticsSenderService(edm::ParameterSet const &iPSet, edm::ActivityRegistry &ar)
0170     : m_clienthost("unknown"),
0171       m_clientdomain("unknown"),
0172       m_filestats(),
0173       m_guid(edm::processGUID().toString()),
0174       m_counter(0),
0175       m_userdn("unknown"),
0176       m_debug(iPSet.getUntrackedParameter<bool>("debug", false)) {
0177   determineHostnames();
0178   ar.watchPostCloseFile(this, &StatisticsSenderService::filePostCloseEvent);
0179   if (!getX509Subject(m_userdn)) {
0180     m_userdn = "unknown";
0181   }
0182 }
0183 
0184 const char *StatisticsSenderService::getJobID() {
0185   const char *id = std::getenv(JOB_UNIQUE_ID_ENV);
0186   // Dashboard developers requested that we migrate to this environment variable.
0187   return id ? id : std::getenv(JOB_UNIQUE_ID_ENV_V2);
0188 }
0189 
0190 std::string const *StatisticsSenderService::matchedLfn(std::string const &iURL) {
0191   auto found = m_urlToLfn.find(iURL);
0192   if (found != m_urlToLfn.end()) {
0193     return &found->second;
0194   }
0195   for (auto const &v : m_lfnToFileInfo) {
0196     if (v.first.size() < iURL.size()) {
0197       if (v.first == iURL.substr(iURL.size() - v.first.size())) {
0198         m_urlToLfn.emplace(iURL, v.first);
0199         return &m_urlToLfn.find(iURL)->second;
0200       }
0201     }
0202   }
0203   //does the lfn have a protocol and the iURL not?
0204   if (std::string::npos == iURL.find(':')) {
0205     for (auto const &v : m_lfnToFileInfo) {
0206       if ((std::string::npos != v.first.find(':')) and (v.first.size() > iURL.size())) {
0207         if (iURL == v.first.substr(v.first.size() - iURL.size())) {
0208           m_urlToLfn.emplace(iURL, v.first);
0209           return &m_urlToLfn.find(iURL)->second;
0210         }
0211       }
0212     }
0213   }
0214 
0215   return nullptr;
0216 }
0217 
0218 void StatisticsSenderService::setCurrentServer(const std::string &url, const std::string &servername) {
0219   size_t dot_pos = servername.find('.');
0220   std::string serverhost;
0221   std::string serverdomain;
0222   if (dot_pos == std::string::npos) {
0223     serverhost = servername.substr(0, servername.find(':'));
0224     serverdomain = "unknown";
0225   } else {
0226     serverhost = servername.substr(0, dot_pos);
0227     serverdomain = servername.substr(dot_pos + 1, servername.find(':') - dot_pos - 1);
0228     if (serverdomain.empty()) {
0229       serverdomain = "unknown";
0230     }
0231   }
0232   {
0233     auto lfn = matchedLfn(url);
0234     std::lock_guard<std::mutex> sentry(m_servermutex);
0235     if (nullptr != lfn) {
0236       auto found = m_lfnToFileInfo.find(*lfn);
0237       if (found != m_lfnToFileInfo.end()) {
0238         found->second.m_serverhost = std::move(serverhost);
0239         found->second.m_serverdomain = std::move(serverdomain);
0240       }
0241     } else if (m_debug) {
0242       edm::LogWarning("StatisticsSenderService") << "setCurrentServer: unknown url name " << url << "\n";
0243     }
0244   }
0245 }
0246 
0247 void StatisticsSenderService::openingFile(std::string const &lfn, edm::InputType type, size_t size) {
0248   m_urlToLfn.emplace(lfn, lfn);
0249   auto attempt = m_lfnToFileInfo.emplace(lfn, FileInfo{lfn, type});
0250   if (attempt.second) {
0251     attempt.first->second.m_size = size;
0252     attempt.first->second.m_id = m_counter++;
0253     edm::LogInfo("StatisticsSenderService") << "openingFile: opening " << lfn << "\n";
0254   } else {
0255     ++(attempt.first->second.m_openCount);
0256     edm::LogInfo("StatisticsSenderService") << "openingFile: re-opening" << lfn << "\n";
0257   }
0258 }
0259 
0260 void StatisticsSenderService::closedFile(std::string const &url, bool usedFallback) {
0261   edm::Service<edm::SiteLocalConfig> pSLC;
0262   if (!pSLC.isAvailable()) {
0263     return;
0264   }
0265 
0266   const struct addrinfo *addresses = pSLC->statisticsDestination();
0267   if (!addresses and !m_debug) {
0268     return;
0269   }
0270 
0271   std::set<std::string> const *info = pSLC->statisticsInfo();
0272   if (info && !info->empty() && (m_userdn != "unknown") &&
0273       ((info->find("dn") == info->end()) || (info->find("nodn") != info->end()))) {
0274     m_userdn = "not reported";
0275   }
0276 
0277   auto lfn = matchedLfn(url);
0278   if (nullptr != lfn) {
0279     auto found = m_lfnToFileInfo.find(*lfn);
0280     assert(found != m_lfnToFileInfo.end());
0281 
0282     std::string results;
0283     fillUDP(pSLC->siteName(), found->second, usedFallback, results);
0284     if (m_debug) {
0285       edm::LogSystem("StatisticSenderService") << "\n" << results << "\n";
0286     }
0287 
0288     for (const struct addrinfo *address = addresses; address != nullptr; address = address->ai_next) {
0289       int sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol);
0290       if (sock < 0) {
0291         continue;
0292       }
0293       auto close_del = [](int *iSocket) { close(*iSocket); };
0294       std::unique_ptr<int, decltype(close_del)> guard(&sock, close_del);
0295       if (sendto(sock, results.c_str(), results.size(), 0, address->ai_addr, address->ai_addrlen) >= 0) {
0296         break;
0297       }
0298     }
0299 
0300     auto c = --found->second.m_openCount;
0301     if (m_debug) {
0302       if (c == 0) {
0303         edm::LogWarning("StatisticsSenderService") << "fully closed: " << *lfn << "\n";
0304       } else {
0305         edm::LogWarning("StatisticsSenderService") << "partially closed: " << *lfn << "\n";
0306       }
0307     }
0308   } else if (m_debug) {
0309     edm::LogWarning("StatisticsSenderService") << "closed: unknown url name " << url << "\n";
0310   }
0311 }
0312 
0313 void StatisticsSenderService::cleanupOldFiles() {
0314   //remove entries with openCount of 0
0315   bool moreToTest = false;
0316   do {
0317     moreToTest = false;
0318     for (auto it = m_lfnToFileInfo.begin(); it != m_lfnToFileInfo.end(); ++it) {
0319       if (it->second.m_openCount == 0) {
0320         auto lfn = it->first;
0321         bool moreToTest2 = false;
0322         do {
0323           moreToTest2 = false;
0324           for (auto it2 = m_urlToLfn.begin(); it2 != m_urlToLfn.end(); ++it2) {
0325             if (it2->second == lfn) {
0326               m_urlToLfn.unsafe_erase(it2);
0327               moreToTest2 = true;
0328               break;
0329             }
0330           }
0331         } while (moreToTest2);
0332 
0333         m_lfnToFileInfo.unsafe_erase(it);
0334         moreToTest = true;
0335         break;
0336       }
0337     }
0338   } while (moreToTest);
0339 }
0340 
0341 void StatisticsSenderService::setSize(const std::string &url, size_t size) {
0342   auto lfn = matchedLfn(url);
0343   if (nullptr != lfn) {
0344     auto itFound = m_lfnToFileInfo.find(*lfn);
0345     if (itFound != m_lfnToFileInfo.end()) {
0346       itFound->second.m_size = size;
0347     }
0348   } else if (m_debug) {
0349     edm::LogWarning("StatisticsSenderService") << "setSize: unknown url name " << url << "\n";
0350   }
0351 }
0352 
0353 void StatisticsSenderService::filePostCloseEvent(std::string const &lfn) {
0354   //we are at a sync point in the framwework so no new files are being opened
0355   cleanupOldFiles();
0356   m_filestats.update();
0357 }
0358 
0359 void StatisticsSenderService::determineHostnames(void) {
0360   char tmpName[HOST_NAME_MAX];
0361   if (gethostname(tmpName, HOST_NAME_MAX) != 0) {
0362     // Sigh, no way to log errors from here.
0363     m_clienthost = "unknown";
0364   } else {
0365     m_clienthost = tmpName;
0366   }
0367   size_t dot_pos = m_clienthost.find('.');
0368   if (dot_pos == std::string::npos) {
0369     m_clientdomain = "unknown";
0370   } else {
0371     m_clientdomain = m_clienthost.substr(dot_pos + 1, m_clienthost.size() - dot_pos - 1);
0372     m_clienthost = m_clienthost.substr(0, dot_pos);
0373   }
0374 }
0375 
0376 void StatisticsSenderService::fillUDP(const std::string &siteName,
0377                                       const FileInfo &fileinfo,
0378                                       bool usedFallback,
0379                                       std::string &udpinfo) const {
0380   std::ostringstream os;
0381 
0382   // Header - same for all IO accesses
0383   os << "{";
0384   if (!siteName.empty()) {
0385     os << "\"site_name\":\"" << siteName << "\", ";
0386   }
0387   // edm::getReleaseVersion() returns a string that includes quotation
0388   // marks, therefore they are not added here
0389   os << "\"cmssw_version\":" << edm::getReleaseVersion() << ", ";
0390   if (usedFallback) {
0391     os << "\"fallback\": true, ";
0392   } else {
0393     os << "\"fallback\": false, ";
0394   }
0395   os << "\"read_type\": ";
0396   switch (fileinfo.m_type) {
0397     case edm::InputType::Primary: {
0398       os << "\"primary\", ";
0399       break;
0400     }
0401     case edm::InputType::SecondaryFile: {
0402       os << "\"secondary\", ";
0403       break;
0404     }
0405     case edm::InputType::SecondarySource: {
0406       os << "\"embedded\", ";
0407       break;
0408     }
0409   }
0410   auto serverhost = fileinfo.m_serverhost;
0411   auto serverdomain = fileinfo.m_serverdomain;
0412 
0413   os << "\"user_dn\":\"" << m_userdn << "\", ";
0414   os << "\"client_host\":\"" << m_clienthost << "\", ";
0415   os << "\"client_domain\":\"" << m_clientdomain << "\", ";
0416   os << "\"server_host\":\"" << serverhost << "\", ";
0417   os << "\"server_domain\":\"" << serverdomain << "\", ";
0418   os << "\"unique_id\":\"" << m_guid << "-" << fileinfo.m_id << "\", ";
0419   os << "\"file_lfn\":\"" << fileinfo.m_filelfn << "\", ";
0420   // Dashboard devs requested that we send out no app_info if a job ID
0421   // is not present in the environment.
0422   const char *jobId = getJobID();
0423   if (jobId) {
0424     os << "\"app_info\":\"" << jobId << "\", ";
0425   }
0426 
0427   if (fileinfo.m_size >= 0) {
0428     os << "\"file_size\":" << fileinfo.m_size << ", ";
0429   }
0430 
0431   m_filestats.fillUDP(os);
0432 
0433   os << "}";
0434   udpinfo = os.str();
0435 }
0436 
0437 /*
0438  * Pull the X509 user subject from the environment.
0439  * Based on initial code from the Frontier client:
0440  *   http://cdcvs.fnal.gov/cgi-bin/public-cvs/cvsweb-public.cgi/~checkout~/frontier/client/frontier.c?rev=1.57&content-type=text/plain
0441  * This was further extended by walking up the returned chain similar to the Globus function
0442  *   globus_gsi_cert_utils-6.6/library/globus_gsi_cert_utils.c:globus_gsi_cert_utils_get_eec
0443  *   globus_gsi_credential-3.5/library/globus_gsi_credential.c:globus_gsi_cred_read_proxy_bio
0444  */
0445 
0446 /* 
0447  * Given a stack of x509 proxies, take a guess at the EEC.
0448  * Assumes the proxies are in reverse sorted order and looks for the first
0449  * proxy which is not a substring of the prior proxy.
0450  * THIS DOES NOT VERIFY THE RESULTS, and is a best-effort GUESS.
0451  * Again, DO NOT REUSE THIS CODE THINKING IT VERIFIES THE CHAIN!
0452  */
0453 static X509 *findEEC(STACK_OF(X509) * certstack) {
0454   int depth = sk_X509_num(certstack);
0455   if (depth == 0) {
0456     return nullptr;
0457   }
0458   int idx = depth - 1;
0459   char *priorsubject = nullptr;
0460   char *subject = nullptr;
0461   X509 *x509cert = sk_X509_value(certstack, idx);
0462   for (; x509cert && idx > 0; idx--) {
0463     subject = X509_NAME_oneline(X509_get_subject_name(x509cert), nullptr, 0);
0464     if (subject && priorsubject && (strncmp(subject, priorsubject, strlen(subject)) != 0)) {
0465       break;
0466     }
0467     x509cert = sk_X509_value(certstack, idx);
0468     if (subject) {
0469       OPENSSL_free(subject);
0470       subject = nullptr;
0471     }
0472   }
0473   if (subject) {
0474     OPENSSL_free(subject);
0475     subject = nullptr;
0476   }
0477   return x509cert;
0478 }
0479 
0480 static bool getX509SubjectFromFile(const std::string &filename, std::string &result) {
0481   BIO *biof = nullptr;
0482   STACK_OF(X509) *certs = nullptr;
0483   char *subject = nullptr;
0484   unsigned char *data = nullptr;
0485   char *header = nullptr;
0486   char *name = nullptr;
0487   long len = 0U;
0488 
0489   if ((biof = BIO_new_file(filename.c_str(), "r"))) {
0490     certs = sk_X509_new_null();
0491     bool encountered_error = false;
0492     while ((!encountered_error) && (!BIO_eof(biof)) && PEM_read_bio(biof, &name, &header, &data, &len)) {
0493       if (strcmp(name, PEM_STRING_X509) == 0 || strcmp(name, PEM_STRING_X509_OLD) == 0) {
0494         X509 *tmp_cert = nullptr;
0495         // See WARNINGS section in http://www.openssl.org/docs/crypto/d2i_X509.html
0496         // Without this cmsRun crashes on a mac with a valid grid proxy.
0497         const unsigned char *p;
0498         p = data;
0499         tmp_cert = d2i_X509(&tmp_cert, &p, len);
0500         if (tmp_cert) {
0501           sk_X509_push(certs, tmp_cert);
0502         } else {
0503           encountered_error = true;
0504         }
0505       }  // Note we ignore any proxy key in the file.
0506       if (data) {
0507         OPENSSL_free(data);
0508         data = nullptr;
0509       }
0510       if (header) {
0511         OPENSSL_free(header);
0512         header = nullptr;
0513       }
0514       if (name) {
0515         OPENSSL_free(name);
0516         name = nullptr;
0517       }
0518     }
0519     X509 *x509cert = nullptr;
0520     if (!encountered_error && sk_X509_num(certs)) {
0521       x509cert = findEEC(certs);
0522     }
0523     if (x509cert) {
0524       subject = X509_NAME_oneline(X509_get_subject_name(x509cert), nullptr, 0);
0525     }
0526     // Note we do not free x509cert directly, as it's still owned by the certs stack.
0527     if (certs) {
0528       sk_X509_pop_free(certs, X509_free);
0529       x509cert = nullptr;
0530     }
0531     BIO_free(biof);
0532     if (subject) {
0533       result = subject;
0534       OPENSSL_free(subject);
0535       return true;
0536     }
0537   }
0538   return false;
0539 }
0540 
0541 bool StatisticsSenderService::getX509Subject(std::string &result) {
0542   char *filename = std::getenv("X509_USER_PROXY");
0543   if (filename && getX509SubjectFromFile(filename, result)) {
0544     return true;
0545   }
0546   std::stringstream ss;
0547   ss << "/tmp/x509up_u" << geteuid();
0548   return getX509SubjectFromFile(ss.str(), result);
0549 }