File indexing completed on 2024-04-06 12:31:52
0001
0002 #include "Utilities/StorageFactory/interface/StatisticsSenderService.h"
0003 #include "Utilities/StorageFactory/interface/StorageAccount.h"
0004 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
0005 #include "FWCore/Catalog/interface/SiteLocalConfig.h"
0006 #include "FWCore/ServiceRegistry/interface/Service.h"
0007 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0008 #include "FWCore/Utilities/interface/processGUID.h"
0009 #include "FWCore/Version/interface/GetReleaseVersion.h"
0010
0011 #include <string>
0012 #include <cmath>
0013
0014 #include <unistd.h>
0015 #include <fcntl.h>
0016
0017 #include <openssl/x509.h>
0018 #include <openssl/pem.h>
0019
// Streams `"<x>":<x - m_<x>>, ` to a stream that must be named `os` at the
// point of expansion: #x becomes the JSON key and m_##x names the baseline
// member subtracted from the local total `x`. The expansion already ends with
// a semicolon, so call sites are written without one.
0020 #define OUTPUT_STATISTIC(x) os << "\"" #x "\":" << (x - m_##x) << ", ";
0021
0022
0023
// Fallback buffer length for gethostname() when no earlier header has defined
// HOST_NAME_MAX.
0024 #ifndef HOST_NAME_MAX
0025 #define HOST_NAME_MAX 128
0026 #endif
0027
// Names of the environment variables consulted by getJobID(): the first is
// tried first, the V2 name is used only when the first is not set.
0028 static constexpr char const *const JOB_UNIQUE_ID_ENV = "CRAB_UNIQUE_JOB_ID";
0029 static constexpr char const *const JOB_UNIQUE_ID_ENV_V2 = "DashboardJobId";
0030
// Lets the rest of the file refer to edm::storage names (e.g. StorageAccount,
// StatisticsSenderService) without qualification.
0031 using namespace edm::storage;
0032
0033 StatisticsSenderService::FileStatistics::FileStatistics()
0034 : m_read_single_operations(0),
0035 m_read_single_bytes(0),
0036 m_read_single_square(0),
0037 m_read_vector_operations(0),
0038 m_read_vector_bytes(0),
0039 m_read_vector_square(0),
0040 m_read_vector_count_sum(0),
0041 m_read_vector_count_square(0),
0042 m_start_time(time(nullptr)) {}
0043
0044 void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) const {
0045 const StorageAccount::StorageStats &stats = StorageAccount::summary();
0046 ssize_t read_single_operations = 0;
0047 ssize_t read_single_bytes = 0;
0048 ssize_t read_single_square = 0;
0049 ssize_t read_vector_operations = 0;
0050 ssize_t read_vector_bytes = 0;
0051 ssize_t read_vector_square = 0;
0052 ssize_t read_vector_count_sum = 0;
0053 ssize_t read_vector_count_square = 0;
0054 auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
0055 for (StorageAccount::StorageStats::const_iterator i = stats.begin(); i != stats.end(); ++i) {
0056 if (i->first == token.value()) {
0057 continue;
0058 }
0059 for (StorageAccount::OperationStats::const_iterator j = i->second.begin(); j != i->second.end(); ++j) {
0060 if (j->first == static_cast<int>(StorageAccount::Operation::readv)) {
0061 read_vector_operations += j->second.attempts;
0062 read_vector_bytes += j->second.amount;
0063 read_vector_count_square += j->second.vector_square;
0064 read_vector_square += j->second.amount_square;
0065 read_vector_count_sum += j->second.vector_count;
0066 } else if (j->first == static_cast<int>(StorageAccount::Operation::read)) {
0067 read_single_operations += j->second.attempts;
0068 read_single_bytes += j->second.amount;
0069 read_single_square += j->second.amount_square;
0070 }
0071 }
0072 }
0073 int64_t single_op_count = read_single_operations - m_read_single_operations;
0074 if (single_op_count > 0) {
0075 double single_sum = read_single_bytes - m_read_single_bytes;
0076 double single_average = single_sum / static_cast<double>(single_op_count);
0077 os << "\"read_single_sigma\":"
0078 << sqrt(std::abs((static_cast<double>(read_single_square - m_read_single_square) -
0079 single_average * single_average * single_op_count) /
0080 static_cast<double>(single_op_count)))
0081 << ", ";
0082 os << "\"read_single_average\":" << single_average << ", ";
0083 }
0084 int64_t vector_op_count = read_vector_operations - m_read_vector_operations;
0085 if (vector_op_count > 0) {
0086 double vector_average =
0087 static_cast<double>(read_vector_bytes - m_read_vector_bytes) / static_cast<double>(vector_op_count);
0088 os << "\"read_vector_average\":" << vector_average << ", ";
0089 os << "\"read_vector_sigma\":"
0090 << sqrt(std::abs((static_cast<double>(read_vector_square - m_read_vector_square) -
0091 vector_average * vector_average * vector_op_count) /
0092 static_cast<double>(vector_op_count)))
0093 << ", ";
0094 double vector_count_average =
0095 static_cast<double>(read_vector_count_sum - m_read_vector_count_sum) / static_cast<double>(vector_op_count);
0096 os << "\"read_vector_count_average\":" << vector_count_average << ", ";
0097 os << "\"read_vector_count_sigma\":"
0098 << sqrt(std::abs((static_cast<double>(read_vector_count_square - m_read_vector_count_square) -
0099 vector_count_average * vector_count_average * vector_op_count) /
0100 static_cast<double>(vector_op_count)))
0101 << ", ";
0102 }
0103
0104 os << "\"read_bytes\":" << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes)
0105 << ", ";
0106 os << "\"read_bytes_at_close\":"
0107 << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) << ", ";
0108
0109
0110 OUTPUT_STATISTIC(read_single_operations)
0111 OUTPUT_STATISTIC(read_single_bytes)
0112 OUTPUT_STATISTIC(read_vector_operations)
0113 OUTPUT_STATISTIC(read_vector_bytes)
0114
0115 os << "\"start_time\":" << m_start_time << ", ";
0116
0117 os << "\"end_time\":" << time(nullptr);
0118 }
0119
0120 void StatisticsSenderService::FileStatistics::update() {
0121 const StorageAccount::StorageStats &stats = StorageAccount::summary();
0122 ssize_t read_single_operations = 0;
0123 ssize_t read_single_bytes = 0;
0124 ssize_t read_single_square = 0;
0125 ssize_t read_vector_operations = 0;
0126 ssize_t read_vector_bytes = 0;
0127 ssize_t read_vector_square = 0;
0128 ssize_t read_vector_count_sum = 0;
0129 ssize_t read_vector_count_square = 0;
0130 auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
0131 for (StorageAccount::StorageStats::const_iterator i = stats.begin(); i != stats.end(); ++i) {
0132 if (i->first == token.value()) {
0133 continue;
0134 }
0135 for (StorageAccount::OperationStats::const_iterator j = i->second.begin(); j != i->second.end(); ++j) {
0136 if (j->first == static_cast<int>(StorageAccount::Operation::readv)) {
0137 read_vector_operations += j->second.attempts;
0138 read_vector_bytes += j->second.amount;
0139 read_vector_count_square += j->second.vector_square;
0140 read_vector_square += j->second.amount_square;
0141 read_vector_count_sum += j->second.vector_count;
0142 } else if (j->first == static_cast<int>(StorageAccount::Operation::read)) {
0143 read_single_operations += j->second.attempts;
0144 read_single_bytes += j->second.amount;
0145 read_single_square += j->second.amount_square;
0146 }
0147 }
0148 }
0149
0150 m_read_single_square = read_single_square;
0151 m_read_vector_square = read_vector_square;
0152 m_read_vector_count_square = read_vector_count_square;
0153 m_read_vector_count_sum = read_vector_count_sum;
0154 m_read_single_operations = read_single_operations;
0155 m_read_single_bytes = read_single_bytes;
0156 m_read_vector_operations = read_vector_operations;
0157 m_read_vector_bytes = read_vector_bytes;
0158 m_start_time = time(nullptr);
0159 }
0160 StatisticsSenderService::FileInfo::FileInfo(std::string const &iLFN, edm::InputType iType)
0161 : m_filelfn(iLFN),
0162 m_serverhost("unknown"),
0163 m_serverdomain("unknown"),
0164 m_type(iType),
0165 m_size(-1),
0166 m_id(0),
0167 m_openCount(1) {}
0168
0169 StatisticsSenderService::StatisticsSenderService(edm::ParameterSet const &iPSet, edm::ActivityRegistry &ar)
0170 : m_clienthost("unknown"),
0171 m_clientdomain("unknown"),
0172 m_filestats(),
0173 m_guid(edm::processGUID().toString()),
0174 m_counter(0),
0175 m_userdn("unknown"),
0176 m_debug(iPSet.getUntrackedParameter<bool>("debug", false)) {
0177 determineHostnames();
0178 ar.watchPostCloseFile(this, &StatisticsSenderService::filePostCloseEvent);
0179 if (!getX509Subject(m_userdn)) {
0180 m_userdn = "unknown";
0181 }
0182 }
0183
0184 const char *StatisticsSenderService::getJobID() {
0185 const char *id = std::getenv(JOB_UNIQUE_ID_ENV);
0186
0187 return id ? id : std::getenv(JOB_UNIQUE_ID_ENV_V2);
0188 }
0189
0190 std::string const *StatisticsSenderService::matchedLfn(std::string const &iURL) {
0191 auto found = m_urlToLfn.find(iURL);
0192 if (found != m_urlToLfn.end()) {
0193 return &found->second;
0194 }
0195 for (auto const &v : m_lfnToFileInfo) {
0196 if (v.first.size() < iURL.size()) {
0197 if (v.first == iURL.substr(iURL.size() - v.first.size())) {
0198 m_urlToLfn.emplace(iURL, v.first);
0199 return &m_urlToLfn.find(iURL)->second;
0200 }
0201 }
0202 }
0203
0204 if (std::string::npos == iURL.find(':')) {
0205 for (auto const &v : m_lfnToFileInfo) {
0206 if ((std::string::npos != v.first.find(':')) and (v.first.size() > iURL.size())) {
0207 if (iURL == v.first.substr(v.first.size() - iURL.size())) {
0208 m_urlToLfn.emplace(iURL, v.first);
0209 return &m_urlToLfn.find(iURL)->second;
0210 }
0211 }
0212 }
0213 }
0214
0215 return nullptr;
0216 }
0217
0218 void StatisticsSenderService::setCurrentServer(const std::string &url, const std::string &servername) {
0219 size_t dot_pos = servername.find('.');
0220 std::string serverhost;
0221 std::string serverdomain;
0222 if (dot_pos == std::string::npos) {
0223 serverhost = servername.substr(0, servername.find(':'));
0224 serverdomain = "unknown";
0225 } else {
0226 serverhost = servername.substr(0, dot_pos);
0227 serverdomain = servername.substr(dot_pos + 1, servername.find(':') - dot_pos - 1);
0228 if (serverdomain.empty()) {
0229 serverdomain = "unknown";
0230 }
0231 }
0232 {
0233 auto lfn = matchedLfn(url);
0234 std::lock_guard<std::mutex> sentry(m_servermutex);
0235 if (nullptr != lfn) {
0236 auto found = m_lfnToFileInfo.find(*lfn);
0237 if (found != m_lfnToFileInfo.end()) {
0238 found->second.m_serverhost = std::move(serverhost);
0239 found->second.m_serverdomain = std::move(serverdomain);
0240 }
0241 } else if (m_debug) {
0242 edm::LogWarning("StatisticsSenderService") << "setCurrentServer: unknown url name " << url << "\n";
0243 }
0244 }
0245 }
0246
0247 void StatisticsSenderService::openingFile(std::string const &lfn, edm::InputType type, size_t size) {
0248 m_urlToLfn.emplace(lfn, lfn);
0249 auto attempt = m_lfnToFileInfo.emplace(lfn, FileInfo{lfn, type});
0250 if (attempt.second) {
0251 attempt.first->second.m_size = size;
0252 attempt.first->second.m_id = m_counter++;
0253 edm::LogInfo("StatisticsSenderService") << "openingFile: opening " << lfn << "\n";
0254 } else {
0255 ++(attempt.first->second.m_openCount);
0256 edm::LogInfo("StatisticsSenderService") << "openingFile: re-opening" << lfn << "\n";
0257 }
0258 }
0259
0260 void StatisticsSenderService::closedFile(std::string const &url, bool usedFallback) {
0261 edm::Service<edm::SiteLocalConfig> pSLC;
0262 if (!pSLC.isAvailable()) {
0263 return;
0264 }
0265
0266 const struct addrinfo *addresses = pSLC->statisticsDestination();
0267 if (!addresses and !m_debug) {
0268 return;
0269 }
0270
0271 std::set<std::string> const *info = pSLC->statisticsInfo();
0272 if (info && !info->empty() && (m_userdn != "unknown") &&
0273 ((info->find("dn") == info->end()) || (info->find("nodn") != info->end()))) {
0274 m_userdn = "not reported";
0275 }
0276
0277 auto lfn = matchedLfn(url);
0278 if (nullptr != lfn) {
0279 auto found = m_lfnToFileInfo.find(*lfn);
0280 assert(found != m_lfnToFileInfo.end());
0281
0282 std::string results;
0283 fillUDP(pSLC->siteName(), found->second, usedFallback, results);
0284 if (m_debug) {
0285 edm::LogSystem("StatisticSenderService") << "\n" << results << "\n";
0286 }
0287
0288 for (const struct addrinfo *address = addresses; address != nullptr; address = address->ai_next) {
0289 int sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol);
0290 if (sock < 0) {
0291 continue;
0292 }
0293 auto close_del = [](int *iSocket) { close(*iSocket); };
0294 std::unique_ptr<int, decltype(close_del)> guard(&sock, close_del);
0295 if (sendto(sock, results.c_str(), results.size(), 0, address->ai_addr, address->ai_addrlen) >= 0) {
0296 break;
0297 }
0298 }
0299
0300 auto c = --found->second.m_openCount;
0301 if (m_debug) {
0302 if (c == 0) {
0303 edm::LogWarning("StatisticsSenderService") << "fully closed: " << *lfn << "\n";
0304 } else {
0305 edm::LogWarning("StatisticsSenderService") << "partially closed: " << *lfn << "\n";
0306 }
0307 }
0308 } else if (m_debug) {
0309 edm::LogWarning("StatisticsSenderService") << "closed: unknown url name " << url << "\n";
0310 }
0311 }
0312
0313 void StatisticsSenderService::cleanupOldFiles() {
0314
0315 bool moreToTest = false;
0316 do {
0317 moreToTest = false;
0318 for (auto it = m_lfnToFileInfo.begin(); it != m_lfnToFileInfo.end(); ++it) {
0319 if (it->second.m_openCount == 0) {
0320 auto lfn = it->first;
0321 bool moreToTest2 = false;
0322 do {
0323 moreToTest2 = false;
0324 for (auto it2 = m_urlToLfn.begin(); it2 != m_urlToLfn.end(); ++it2) {
0325 if (it2->second == lfn) {
0326 m_urlToLfn.unsafe_erase(it2);
0327 moreToTest2 = true;
0328 break;
0329 }
0330 }
0331 } while (moreToTest2);
0332
0333 m_lfnToFileInfo.unsafe_erase(it);
0334 moreToTest = true;
0335 break;
0336 }
0337 }
0338 } while (moreToTest);
0339 }
0340
0341 void StatisticsSenderService::setSize(const std::string &url, size_t size) {
0342 auto lfn = matchedLfn(url);
0343 if (nullptr != lfn) {
0344 auto itFound = m_lfnToFileInfo.find(*lfn);
0345 if (itFound != m_lfnToFileInfo.end()) {
0346 itFound->second.m_size = size;
0347 }
0348 } else if (m_debug) {
0349 edm::LogWarning("StatisticsSenderService") << "setSize: unknown url name " << url << "\n";
0350 }
0351 }
0352
0353 void StatisticsSenderService::filePostCloseEvent(std::string const &lfn) {
0354
0355 cleanupOldFiles();
0356 m_filestats.update();
0357 }
0358
0359 void StatisticsSenderService::determineHostnames(void) {
0360 char tmpName[HOST_NAME_MAX];
0361 if (gethostname(tmpName, HOST_NAME_MAX) != 0) {
0362
0363 m_clienthost = "unknown";
0364 } else {
0365 m_clienthost = tmpName;
0366 }
0367 size_t dot_pos = m_clienthost.find('.');
0368 if (dot_pos == std::string::npos) {
0369 m_clientdomain = "unknown";
0370 } else {
0371 m_clientdomain = m_clienthost.substr(dot_pos + 1, m_clienthost.size() - dot_pos - 1);
0372 m_clienthost = m_clienthost.substr(0, dot_pos);
0373 }
0374 }
0375
0376 void StatisticsSenderService::fillUDP(const std::string &siteName,
0377 const FileInfo &fileinfo,
0378 bool usedFallback,
0379 std::string &udpinfo) const {
0380 std::ostringstream os;
0381
0382
0383 os << "{";
0384 if (!siteName.empty()) {
0385 os << "\"site_name\":\"" << siteName << "\", ";
0386 }
0387
0388
0389 os << "\"cmssw_version\":" << edm::getReleaseVersion() << ", ";
0390 if (usedFallback) {
0391 os << "\"fallback\": true, ";
0392 } else {
0393 os << "\"fallback\": false, ";
0394 }
0395 os << "\"read_type\": ";
0396 switch (fileinfo.m_type) {
0397 case edm::InputType::Primary: {
0398 os << "\"primary\", ";
0399 break;
0400 }
0401 case edm::InputType::SecondaryFile: {
0402 os << "\"secondary\", ";
0403 break;
0404 }
0405 case edm::InputType::SecondarySource: {
0406 os << "\"embedded\", ";
0407 break;
0408 }
0409 }
0410 auto serverhost = fileinfo.m_serverhost;
0411 auto serverdomain = fileinfo.m_serverdomain;
0412
0413 os << "\"user_dn\":\"" << m_userdn << "\", ";
0414 os << "\"client_host\":\"" << m_clienthost << "\", ";
0415 os << "\"client_domain\":\"" << m_clientdomain << "\", ";
0416 os << "\"server_host\":\"" << serverhost << "\", ";
0417 os << "\"server_domain\":\"" << serverdomain << "\", ";
0418 os << "\"unique_id\":\"" << m_guid << "-" << fileinfo.m_id << "\", ";
0419 os << "\"file_lfn\":\"" << fileinfo.m_filelfn << "\", ";
0420
0421
0422 const char *jobId = getJobID();
0423 if (jobId) {
0424 os << "\"app_info\":\"" << jobId << "\", ";
0425 }
0426
0427 if (fileinfo.m_size >= 0) {
0428 os << "\"file_size\":" << fileinfo.m_size << ", ";
0429 }
0430
0431 m_filestats.fillUDP(os);
0432
0433 os << "}";
0434 udpinfo = os.str();
0435 }
0436
0437
0438
0439
0440
0441
0442
0443
0444
0445
0446
0447
0448
0449
0450
0451
0452
0453 static X509 *findEEC(STACK_OF(X509) * certstack) {
0454 int depth = sk_X509_num(certstack);
0455 if (depth == 0) {
0456 return nullptr;
0457 }
0458 int idx = depth - 1;
0459 char *priorsubject = nullptr;
0460 char *subject = nullptr;
0461 X509 *x509cert = sk_X509_value(certstack, idx);
0462 for (; x509cert && idx > 0; idx--) {
0463 subject = X509_NAME_oneline(X509_get_subject_name(x509cert), nullptr, 0);
0464 if (subject && priorsubject && (strncmp(subject, priorsubject, strlen(subject)) != 0)) {
0465 break;
0466 }
0467 x509cert = sk_X509_value(certstack, idx);
0468 if (subject) {
0469 OPENSSL_free(subject);
0470 subject = nullptr;
0471 }
0472 }
0473 if (subject) {
0474 OPENSSL_free(subject);
0475 subject = nullptr;
0476 }
0477 return x509cert;
0478 }
0479
0480 static bool getX509SubjectFromFile(const std::string &filename, std::string &result) {
0481 BIO *biof = nullptr;
0482 STACK_OF(X509) *certs = nullptr;
0483 char *subject = nullptr;
0484 unsigned char *data = nullptr;
0485 char *header = nullptr;
0486 char *name = nullptr;
0487 long len = 0U;
0488
0489 if ((biof = BIO_new_file(filename.c_str(), "r"))) {
0490 certs = sk_X509_new_null();
0491 bool encountered_error = false;
0492 while ((!encountered_error) && (!BIO_eof(biof)) && PEM_read_bio(biof, &name, &header, &data, &len)) {
0493 if (strcmp(name, PEM_STRING_X509) == 0 || strcmp(name, PEM_STRING_X509_OLD) == 0) {
0494 X509 *tmp_cert = nullptr;
0495
0496
0497 const unsigned char *p;
0498 p = data;
0499 tmp_cert = d2i_X509(&tmp_cert, &p, len);
0500 if (tmp_cert) {
0501 sk_X509_push(certs, tmp_cert);
0502 } else {
0503 encountered_error = true;
0504 }
0505 }
0506 if (data) {
0507 OPENSSL_free(data);
0508 data = nullptr;
0509 }
0510 if (header) {
0511 OPENSSL_free(header);
0512 header = nullptr;
0513 }
0514 if (name) {
0515 OPENSSL_free(name);
0516 name = nullptr;
0517 }
0518 }
0519 X509 *x509cert = nullptr;
0520 if (!encountered_error && sk_X509_num(certs)) {
0521 x509cert = findEEC(certs);
0522 }
0523 if (x509cert) {
0524 subject = X509_NAME_oneline(X509_get_subject_name(x509cert), nullptr, 0);
0525 }
0526
0527 if (certs) {
0528 sk_X509_pop_free(certs, X509_free);
0529 x509cert = nullptr;
0530 }
0531 BIO_free(biof);
0532 if (subject) {
0533 result = subject;
0534 OPENSSL_free(subject);
0535 return true;
0536 }
0537 }
0538 return false;
0539 }
0540
0541 bool StatisticsSenderService::getX509Subject(std::string &result) {
0542 char *filename = std::getenv("X509_USER_PROXY");
0543 if (filename && getX509SubjectFromFile(filename, result)) {
0544 return true;
0545 }
0546 std::stringstream ss;
0547 ss << "/tmp/x509up_u" << geteuid();
0548 return getX509SubjectFromFile(ss.str(), result);
0549 }