File indexing completed on 2024-04-06 12:10:14
#include "DQMFileIterator.h"
#include "DQMMonitoringService.h"

#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/Utilities/interface/TimeOfDay.h"

#include <chrono>
#include <filesystem>
#include <thread>

#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/regex.hpp>

#include <fmt/printf.h>
0018
0019 namespace dqmservices {
0020
0021 DQMFileIterator::LumiEntry DQMFileIterator::LumiEntry::load_json(const std::string& run_path,
0022 const std::string& filename,
0023 int lumiNumber,
0024 int datafn_position) {
0025 boost::property_tree::ptree pt;
0026 read_json(filename, pt);
0027
0028 LumiEntry lumi;
0029 lumi.filename = filename;
0030 lumi.run_path = run_path;
0031
0032 lumi.n_events_processed = std::next(pt.get_child("data").begin(), 0)->second.get_value<std::size_t>();
0033
0034 lumi.n_events_accepted = std::next(pt.get_child("data").begin(), 1)->second.get_value<std::size_t>();
0035
0036 lumi.file_ls = lumiNumber;
0037
0038 if (datafn_position >= 0) {
0039 lumi.datafn = std::next(pt.get_child("data").begin(), datafn_position)->second.get_value<std::string>();
0040 }
0041
0042 return lumi;
0043 }
0044
0045 std::string DQMFileIterator::LumiEntry::get_data_path() const {
0046 if (boost::starts_with(datafn, "/"))
0047 return datafn;
0048
0049 std::filesystem::path p(run_path);
0050 p /= datafn;
0051 return p.string();
0052 }
0053
0054 std::string DQMFileIterator::LumiEntry::get_json_path() const { return filename; }
0055
0056
0057
0058 DQMFileIterator::EorEntry DQMFileIterator::EorEntry::load_json(const std::string& run_path,
0059 const std::string& filename) {
0060 boost::property_tree::ptree pt;
0061 read_json(filename, pt);
0062
0063 EorEntry eor;
0064 eor.filename = filename;
0065 eor.run_path = run_path;
0066
0067
0068 eor.n_events = std::next(pt.get_child("data").begin(), 1)->second.get_value<std::size_t>();
0069 eor.n_lumi = std::next(pt.get_child("data").begin(), 2)->second.get_value<std::size_t>();
0070
0071 eor.loaded = true;
0072 return eor;
0073 }
0074
0075 DQMFileIterator::DQMFileIterator(edm::ParameterSet const& pset) : state_(EOR) {
0076 runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
0077 datafnPosition_ = pset.getUntrackedParameter<unsigned int>("datafnPosition");
0078 runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
0079 streamLabel_ = pset.getUntrackedParameter<std::string>("streamLabel");
0080 delayMillis_ = pset.getUntrackedParameter<uint32_t>("delayMillis");
0081 nextLumiTimeoutMillis_ = pset.getUntrackedParameter<int32_t>("nextLumiTimeoutMillis");
0082
0083
0084 flagScanOnce_ = pset.getUntrackedParameter<bool>("scanOnce");
0085
0086 forceFileCheckTimeoutMillis_ = 5015;
0087 reset();
0088 }
0089
0090 void DQMFileIterator::reset() {
0091 runPath_.clear();
0092
0093 std::vector<std::string> tokens;
0094 boost::split(tokens, runInputDir_, boost::is_any_of(":"));
0095
0096 for (const auto& token : tokens) {
0097 runPath_.push_back(fmt::sprintf("%s/run%06d", token, runNumber_));
0098 }
0099
0100 eor_.loaded = false;
0101 state_ = State::OPEN;
0102 nextLumiNumber_ = 1;
0103 lumiSeen_.clear();
0104 filesSeen_.clear();
0105
0106 lastLumiLoad_ = std::chrono::high_resolution_clock::now();
0107
0108 collect(true);
0109 update_state();
0110
0111 if (mon_.isAvailable()) {
0112 boost::property_tree::ptree doc;
0113 doc.put("run", runNumber_);
0114 doc.put("next_lumi", nextLumiNumber_);
0115 doc.put("fi_state", std::to_string(state_));
0116 mon_->outputUpdate(doc);
0117 }
0118 }
0119
0120 DQMFileIterator::LumiEntry DQMFileIterator::open() {
0121 LumiEntry& lumi = lumiSeen_[nextLumiNumber_];
0122 advanceToLumi(nextLumiNumber_ + 1, "open: file iterator");
0123 return lumi;
0124 }
0125
0126 bool DQMFileIterator::lumiReady() {
0127 if (lumiSeen_.find(nextLumiNumber_) != lumiSeen_.end()) {
0128 return true;
0129 }
0130
0131 return false;
0132 }
0133
0134 unsigned int DQMFileIterator::lastLumiFound() {
0135 if (!lumiSeen_.empty()) {
0136 return lumiSeen_.rbegin()->first;
0137 }
0138
0139 return 1;
0140 }
0141
0142 void DQMFileIterator::advanceToLumi(unsigned int lumi, std::string reason) {
0143 unsigned int currentLumi = nextLumiNumber_;
0144
0145 nextLumiNumber_ = lumi;
0146 lastLumiLoad_ = std::chrono::high_resolution_clock::now();
0147
0148 auto iter = lumiSeen_.lower_bound(currentLumi);
0149
0150 while ((iter != lumiSeen_.end()) && ((iter->first) < nextLumiNumber_)) {
0151 iter->second.state = reason;
0152 monUpdateLumi(iter->second);
0153
0154 ++iter;
0155 }
0156
0157 if (mon_.isAvailable()) {
0158
0159 boost::property_tree::ptree doc;
0160 doc.put("next_lumi", nextLumiNumber_);
0161 mon_->outputUpdate(doc);
0162 }
0163 }
0164
0165 void DQMFileIterator::monUpdateLumi(const LumiEntry& lumi) {
0166 if (!mon_.isAvailable())
0167 return;
0168
0169 boost::property_tree::ptree doc;
0170 doc.put(fmt::sprintf("extra.lumi_seen.lumi%06d", lumi.file_ls), lumi.state);
0171 mon_->outputUpdate(doc);
0172 }
0173
0174 unsigned DQMFileIterator::mtimeHash() const {
0175 unsigned mtime_now = 0;
0176
0177 for (const auto& path : runPath_) {
0178 if (!std::filesystem::exists(path))
0179 continue;
0180
0181 auto write_time = std::filesystem::last_write_time(path);
0182 mtime_now =
0183 mtime_now ^ std::chrono::duration_cast<std::chrono::microseconds>(write_time.time_since_epoch()).count();
0184 }
0185
0186 return mtime_now;
0187 }
0188
0189 void DQMFileIterator::collect(bool ignoreTimers) {
0190
0191
0192
0193 auto now = std::chrono::high_resolution_clock::now();
0194 auto last_ms = std::chrono::duration_cast<std::chrono::milliseconds>(now - runPathLastCollect_).count();
0195
0196
0197 if ((!ignoreTimers) && (last_ms >= 0) && (last_ms < 100)) {
0198 return;
0199 }
0200
0201
0202 auto mtime_now = mtimeHash();
0203
0204 if ((!ignoreTimers) && (last_ms < forceFileCheckTimeoutMillis_) && (mtime_now == runPathMTime_)) {
0205
0206 return;
0207 } else {
0208
0209 }
0210
0211 runPathMTime_ = mtime_now;
0212 runPathLastCollect_ = now;
0213
0214 using std::filesystem::directory_entry;
0215 using std::filesystem::directory_iterator;
0216
0217 std::string fn_eor;
0218
0219 for (const auto& runPath : runPath_) {
0220 if (!std::filesystem::exists(runPath)) {
0221 logFileAction("Directory does not exist: ", runPath);
0222
0223 continue;
0224 }
0225
0226 directory_iterator dend;
0227 for (directory_iterator di(runPath); di != dend; ++di) {
0228 const boost::regex fn_re("run(\\d+)_ls(\\d+)_([a-zA-Z0-9]+)(_.*)?\\.jsn");
0229
0230 const std::string filename = di->path().filename().string();
0231 const std::string fn = di->path().string();
0232
0233 if (filesSeen_.find(filename) != filesSeen_.end()) {
0234 continue;
0235 }
0236
0237 boost::smatch result;
0238 if (boost::regex_match(filename, result, fn_re)) {
0239 unsigned int run = std::stoi(result[1]);
0240 unsigned int lumi = std::stoi(result[2]);
0241 std::string label = result[3];
0242
0243 filesSeen_.insert(filename);
0244
0245 if (run != runNumber_)
0246 continue;
0247
0248
0249
0250 if ((lumi == 0) && (label == "EoR") && (!eor_.loaded)) {
0251 fn_eor = fn;
0252 continue;
0253 }
0254
0255
0256 if (lumiSeen_.find(lumi) != lumiSeen_.end()) {
0257 continue;
0258 }
0259
0260
0261 if (label != streamLabel_) {
0262 std::string msg("Found and skipped json file (stream label mismatch, ");
0263 msg += label + " [files] != " + streamLabel_ + " [config]";
0264 msg += "): ";
0265 logFileAction(msg, fn);
0266 continue;
0267 }
0268
0269 try {
0270 LumiEntry lumi_jsn = LumiEntry::load_json(runPath, fn, lumi, datafnPosition_);
0271 lumiSeen_.emplace(lumi, lumi_jsn);
0272 logFileAction("Found and loaded json file: ", fn);
0273
0274 monUpdateLumi(lumi_jsn);
0275 } catch (const std::exception& e) {
0276
0277 filesSeen_.erase(filename);
0278
0279 std::string msg("Found, tried to load the json, but failed (");
0280 msg += e.what();
0281 msg += "): ";
0282 logFileAction(msg, fn);
0283 }
0284 }
0285 }
0286 }
0287
0288 if ((!fn_eor.empty()) or flagScanOnce_) {
0289 if (!fn_eor.empty()) {
0290 logFileAction("EoR file found: ", fn_eor);
0291 }
0292
0293
0294
0295
0296
0297
0298 eor_.loaded = true;
0299
0300 if (lumiSeen_.empty()) {
0301 eor_.n_lumi = 0;
0302 } else {
0303 eor_.n_lumi = lumiSeen_.rbegin()->first;
0304 }
0305 }
0306 }
0307
0308 void DQMFileIterator::update_state() {
0309 using std::chrono::duration_cast;
0310 using std::chrono::high_resolution_clock;
0311 using std::chrono::milliseconds;
0312
0313 State old_state = state_;
0314
0315
0316
0317 if (!flagScanOnce_) {
0318 collect(false);
0319 }
0320
0321 if ((state_ == State::OPEN) && (eor_.loaded)) {
0322 state_ = State::EOR_CLOSING;
0323 }
0324
0325
0326
0327 if ((state_ != State::EOR) && (nextLumiTimeoutMillis_ >= 0)) {
0328 auto iter = lumiSeen_.lower_bound(nextLumiNumber_);
0329 if ((iter != lumiSeen_.end()) && iter->first != nextLumiNumber_) {
0330 auto elapsed = high_resolution_clock::now() - lastLumiLoad_;
0331 auto elapsed_ms = duration_cast<milliseconds>(elapsed).count();
0332
0333 if (elapsed_ms >= nextLumiTimeoutMillis_) {
0334 std::string msg("Timeout reached, skipping lumisection(s) ");
0335 msg += std::to_string(nextLumiNumber_) + " .. " + std::to_string(iter->first - 1);
0336 msg += ", nextLumiNumber_ is now " + std::to_string(iter->first);
0337 logFileAction(msg);
0338
0339 advanceToLumi(iter->first, "skipped: timeout");
0340 }
0341 }
0342 }
0343
0344 if (state_ == State::EOR_CLOSING) {
0345
0346
0347
0348
0349
0350
0351 if (nextLumiNumber_ > eor_.n_lumi) {
0352 state_ = State::EOR;
0353 }
0354 }
0355
0356 if (state_ != old_state) {
0357 logFileAction("Streamer state changed: ", std::to_string(old_state) + "->" + std::to_string(state_));
0358
0359 if (mon_) {
0360 boost::property_tree::ptree doc;
0361 doc.put("fi_state", std::to_string(state_));
0362 mon_->outputUpdate(doc);
0363 }
0364 }
0365 }
0366
0367 void DQMFileIterator::logFileAction(const std::string& msg, const std::string& fileName) const {
0368 edm::LogAbsolute("fileAction") << std::setprecision(0) << edm::TimeOfDay() << " " << msg << fileName;
0369 edm::FlushMessageLog();
0370 }
0371
0372 void DQMFileIterator::logLumiState(const LumiEntry& lumi, const std::string& msg) {
0373 if (lumiSeen_.find(lumi.file_ls) != lumiSeen_.end()) {
0374 lumiSeen_[lumi.file_ls].state = msg;
0375
0376 monUpdateLumi(lumiSeen_[lumi.file_ls]);
0377 } else {
0378 logFileAction("Internal error: referenced lumi is not the map.");
0379 }
0380 }
0381
0382 void DQMFileIterator::delay() {
0383 if (mon_.isAvailable())
0384 mon_->keepAlive();
0385
0386 usleep(delayMillis_ * 1000);
0387 }
0388
0389 void DQMFileIterator::fillDescription(edm::ParameterSetDescription& desc) {
0390 desc.addUntracked<unsigned int>("runNumber")->setComment("Run number passed via configuration file.");
0391
0392 desc.addUntracked<unsigned int>("datafnPosition", 3)
0393 ->setComment(
0394 "Data filename position in the positional arguments array 'data' in "
0395 "json file.");
0396
0397 desc.addUntracked<std::string>("streamLabel")->setComment("Stream label used in json discovery.");
0398
0399 desc.addUntracked<uint32_t>("delayMillis")->setComment("Number of milliseconds to wait between file checks.");
0400
0401 desc.addUntracked<int32_t>("nextLumiTimeoutMillis", -1)
0402 ->setComment(
0403 "Number of milliseconds to wait before switching to the next lumi "
0404 "section if the current is missing, -1 to disable.");
0405
0406 desc.addUntracked<bool>("scanOnce", false)
0407 ->setComment(
0408 "Don't repeat file scans: use what was found during the initial scan. "
0409 "EOR file is ignored and the state is set to 'past end of run'.");
0410
0411 desc.addUntracked<std::string>("runInputDir")->setComment("Directory where the DQM files will appear.");
0412 }
0413
0414 }