File indexing completed on 2021-03-19 06:07:20
#include <chrono>
#include <filesystem>
#include <iterator>
#include <memory>
#include <stdexcept>
#include <string>
#include <system_error>
#include <thread>

#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/range.hpp>
#include <boost/regex.hpp>
#include <fmt/printf.h>

#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/Utilities/interface/TimeOfDay.h"
#include "DQMFileIterator.h"
0017
0018 namespace dqmservices {
0019
0020 DQMFileIterator::LumiEntry DQMFileIterator::LumiEntry::load_json(const std::string& run_path,
0021 const std::string& filename,
0022 int lumiNumber,
0023 int datafn_position) {
0024 boost::property_tree::ptree pt;
0025 read_json(filename, pt);
0026
0027 LumiEntry lumi;
0028 lumi.filename = filename;
0029 lumi.run_path = run_path;
0030
0031 lumi.n_events_processed = std::next(pt.get_child("data").begin(), 0)->second.get_value<std::size_t>();
0032
0033 lumi.n_events_accepted = std::next(pt.get_child("data").begin(), 1)->second.get_value<std::size_t>();
0034
0035 lumi.file_ls = lumiNumber;
0036
0037 if (datafn_position >= 0) {
0038 lumi.datafn = std::next(pt.get_child("data").begin(), datafn_position)->second.get_value<std::string>();
0039 }
0040
0041 return lumi;
0042 }
0043
0044 std::string DQMFileIterator::LumiEntry::get_data_path() const {
0045 if (boost::starts_with(datafn, "/"))
0046 return datafn;
0047
0048 std::filesystem::path p(run_path);
0049 p /= datafn;
0050 return p.string();
0051 }
0052
0053 std::string DQMFileIterator::LumiEntry::get_json_path() const { return filename; }
0054
0055
0056
0057 DQMFileIterator::EorEntry DQMFileIterator::EorEntry::load_json(const std::string& run_path,
0058 const std::string& filename) {
0059 boost::property_tree::ptree pt;
0060 read_json(filename, pt);
0061
0062 EorEntry eor;
0063 eor.filename = filename;
0064 eor.run_path = run_path;
0065
0066
0067 eor.n_events = std::next(pt.get_child("data").begin(), 1)->second.get_value<std::size_t>();
0068 eor.n_lumi = std::next(pt.get_child("data").begin(), 2)->second.get_value<std::size_t>();
0069
0070 eor.loaded = true;
0071 return eor;
0072 }
0073
0074 DQMFileIterator::DQMFileIterator(edm::ParameterSet const& pset) : state_(EOR) {
0075 runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
0076 datafnPosition_ = pset.getUntrackedParameter<unsigned int>("datafnPosition");
0077 runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
0078 streamLabel_ = pset.getUntrackedParameter<std::string>("streamLabel");
0079 delayMillis_ = pset.getUntrackedParameter<uint32_t>("delayMillis");
0080 nextLumiTimeoutMillis_ = pset.getUntrackedParameter<int32_t>("nextLumiTimeoutMillis");
0081
0082
0083 flagScanOnce_ = pset.getUntrackedParameter<bool>("scanOnce");
0084
0085 forceFileCheckTimeoutMillis_ = 5015;
0086 reset();
0087 }
0088
0089 DQMFileIterator::~DQMFileIterator() {}
0090
0091 void DQMFileIterator::reset() {
0092 runPath_.clear();
0093
0094 std::vector<std::string> tokens;
0095 boost::split(tokens, runInputDir_, boost::is_any_of(":"));
0096
0097 for (const auto& token : tokens) {
0098 runPath_.push_back(fmt::sprintf("%s/run%06d", token, runNumber_));
0099 }
0100
0101 eor_.loaded = false;
0102 state_ = State::OPEN;
0103 nextLumiNumber_ = 1;
0104 lumiSeen_.clear();
0105 filesSeen_.clear();
0106
0107 lastLumiLoad_ = std::chrono::high_resolution_clock::now();
0108
0109 collect(true);
0110 update_state();
0111
0112 if (mon_.isAvailable()) {
0113 ptree doc;
0114 doc.put("run", runNumber_);
0115 doc.put("next_lumi", nextLumiNumber_);
0116 doc.put("fi_state", std::to_string(state_));
0117 mon_->outputUpdate(doc);
0118 }
0119 }
0120
0121 DQMFileIterator::State DQMFileIterator::state() { return state_; }
0122
0123 DQMFileIterator::LumiEntry DQMFileIterator::open() {
0124 LumiEntry& lumi = lumiSeen_[nextLumiNumber_];
0125 advanceToLumi(nextLumiNumber_ + 1, "open: file iterator");
0126 return lumi;
0127 }
0128
0129 bool DQMFileIterator::lumiReady() {
0130 if (lumiSeen_.find(nextLumiNumber_) != lumiSeen_.end()) {
0131 return true;
0132 }
0133
0134 return false;
0135 }
0136
0137 unsigned int DQMFileIterator::runNumber() { return runNumber_; }
0138
0139 unsigned int DQMFileIterator::lastLumiFound() {
0140 if (!lumiSeen_.empty()) {
0141 return lumiSeen_.rbegin()->first;
0142 }
0143
0144 return 1;
0145 }
0146
0147 void DQMFileIterator::advanceToLumi(unsigned int lumi, std::string reason) {
0148 using boost::str;
0149 using boost::property_tree::ptree;
0150
0151 unsigned int currentLumi = nextLumiNumber_;
0152
0153 nextLumiNumber_ = lumi;
0154 lastLumiLoad_ = std::chrono::high_resolution_clock::now();
0155
0156 auto iter = lumiSeen_.lower_bound(currentLumi);
0157
0158 while ((iter != lumiSeen_.end()) && ((iter->first) < nextLumiNumber_)) {
0159 iter->second.state = reason;
0160 monUpdateLumi(iter->second);
0161
0162 ++iter;
0163 }
0164
0165 if (mon_.isAvailable()) {
0166
0167 ptree doc;
0168 doc.put("next_lumi", nextLumiNumber_);
0169 mon_->outputUpdate(doc);
0170 }
0171 }
0172
0173 void DQMFileIterator::monUpdateLumi(const LumiEntry& lumi) {
0174 if (!mon_.isAvailable())
0175 return;
0176
0177 ptree doc;
0178 doc.put(fmt::sprintf("extra.lumi_seen.lumi%06d", lumi.file_ls), lumi.state);
0179 mon_->outputUpdate(doc);
0180 }
0181
0182 unsigned DQMFileIterator::mtimeHash() const {
0183 unsigned mtime_now = 0;
0184
0185 for (const auto& path : runPath_) {
0186 if (!std::filesystem::exists(path))
0187 continue;
0188
0189 auto write_time = std::filesystem::last_write_time(path);
0190 mtime_now =
0191 mtime_now ^ std::chrono::duration_cast<std::chrono::microseconds>(write_time.time_since_epoch()).count();
0192 }
0193
0194 return mtime_now;
0195 }
0196
0197 void DQMFileIterator::collect(bool ignoreTimers) {
0198
0199
0200
0201 auto now = std::chrono::high_resolution_clock::now();
0202 auto last_ms = std::chrono::duration_cast<std::chrono::milliseconds>(now - runPathLastCollect_).count();
0203
0204
0205 if ((!ignoreTimers) && (last_ms >= 0) && (last_ms < 100)) {
0206 return;
0207 }
0208
0209
0210 auto mtime_now = mtimeHash();
0211
0212 if ((!ignoreTimers) && (last_ms < forceFileCheckTimeoutMillis_) && (mtime_now == runPathMTime_)) {
0213
0214 return;
0215 } else {
0216
0217 }
0218
0219 runPathMTime_ = mtime_now;
0220 runPathLastCollect_ = now;
0221
0222 using std::filesystem::directory_entry;
0223 using std::filesystem::directory_iterator;
0224
0225 std::string fn_eor;
0226
0227 for (const auto& runPath : runPath_) {
0228 if (!std::filesystem::exists(runPath)) {
0229 logFileAction("Directory does not exist: ", runPath);
0230
0231 continue;
0232 }
0233
0234 directory_iterator dend;
0235 for (directory_iterator di(runPath); di != dend; ++di) {
0236 const boost::regex fn_re("run(\\d+)_ls(\\d+)_([a-zA-Z0-9]+)(_.*)?\\.jsn");
0237
0238 const std::string filename = di->path().filename().string();
0239 const std::string fn = di->path().string();
0240
0241 if (filesSeen_.find(filename) != filesSeen_.end()) {
0242 continue;
0243 }
0244
0245 boost::smatch result;
0246 if (boost::regex_match(filename, result, fn_re)) {
0247 unsigned int run = std::stoi(result[1]);
0248 unsigned int lumi = std::stoi(result[2]);
0249 std::string label = result[3];
0250
0251 filesSeen_.insert(filename);
0252
0253 if (run != runNumber_)
0254 continue;
0255
0256
0257
0258 if ((lumi == 0) && (label == "EoR") && (!eor_.loaded)) {
0259 fn_eor = fn;
0260 continue;
0261 }
0262
0263
0264 if (lumiSeen_.find(lumi) != lumiSeen_.end()) {
0265 continue;
0266 }
0267
0268
0269 if (label != streamLabel_) {
0270 std::string msg("Found and skipped json file (stream label mismatch, ");
0271 msg += label + " [files] != " + streamLabel_ + " [config]";
0272 msg += "): ";
0273 logFileAction(msg, fn);
0274 continue;
0275 }
0276
0277 try {
0278 LumiEntry lumi_jsn = LumiEntry::load_json(runPath, fn, lumi, datafnPosition_);
0279 lumiSeen_.emplace(lumi, lumi_jsn);
0280 logFileAction("Found and loaded json file: ", fn);
0281
0282 monUpdateLumi(lumi_jsn);
0283 } catch (const std::exception& e) {
0284
0285 filesSeen_.erase(filename);
0286
0287 std::string msg("Found, tried to load the json, but failed (");
0288 msg += e.what();
0289 msg += "): ";
0290 logFileAction(msg, fn);
0291 }
0292 }
0293 }
0294 }
0295
0296 if ((!fn_eor.empty()) or flagScanOnce_) {
0297 if (!fn_eor.empty()) {
0298 logFileAction("EoR file found: ", fn_eor);
0299 }
0300
0301
0302
0303
0304
0305
0306 eor_.loaded = true;
0307
0308 if (lumiSeen_.empty()) {
0309 eor_.n_lumi = 0;
0310 } else {
0311 eor_.n_lumi = lumiSeen_.rbegin()->first;
0312 }
0313 }
0314 }
0315
0316 void DQMFileIterator::update_state() {
0317 using std::chrono::duration_cast;
0318 using std::chrono::high_resolution_clock;
0319 using std::chrono::milliseconds;
0320
0321 State old_state = state_;
0322
0323
0324
0325 if (!flagScanOnce_) {
0326 collect(false);
0327 }
0328
0329 if ((state_ == State::OPEN) && (eor_.loaded)) {
0330 state_ = State::EOR_CLOSING;
0331 }
0332
0333
0334
0335 if ((state_ != State::EOR) && (nextLumiTimeoutMillis_ >= 0)) {
0336 auto iter = lumiSeen_.lower_bound(nextLumiNumber_);
0337 if ((iter != lumiSeen_.end()) && iter->first != nextLumiNumber_) {
0338 auto elapsed = high_resolution_clock::now() - lastLumiLoad_;
0339 auto elapsed_ms = duration_cast<milliseconds>(elapsed).count();
0340
0341 if (elapsed_ms >= nextLumiTimeoutMillis_) {
0342 std::string msg("Timeout reached, skipping lumisection(s) ");
0343 msg += std::to_string(nextLumiNumber_) + " .. " + std::to_string(iter->first - 1);
0344 msg += ", nextLumiNumber_ is now " + std::to_string(iter->first);
0345 logFileAction(msg);
0346
0347 advanceToLumi(iter->first, "skipped: timeout");
0348 }
0349 }
0350 }
0351
0352 if (state_ == State::EOR_CLOSING) {
0353
0354
0355
0356
0357
0358
0359 if (nextLumiNumber_ > eor_.n_lumi) {
0360 state_ = State::EOR;
0361 }
0362 }
0363
0364 if (state_ != old_state) {
0365 logFileAction("Streamer state changed: ", std::to_string(old_state) + "->" + std::to_string(state_));
0366
0367 if (mon_) {
0368 ptree doc;
0369 doc.put("fi_state", std::to_string(state_));
0370 mon_->outputUpdate(doc);
0371 }
0372 }
0373 }
0374
// Writes "<time of day> <msg><fileName>" to the "fileAction" LogAbsolute
// category, then flushes the message logger. fileName is presumably defaulted
// in the header declaration (single-argument calls exist) — confirm there.
// The LogAbsolute temporary is destroyed at the end of its full expression,
// i.e. before FlushMessageLog() runs; keep the two statements in this order.
// NOTE(review): setprecision(0) presumably controls how TimeOfDay is printed.
void DQMFileIterator::logFileAction(const std::string& msg, const std::string& fileName) const {
  edm::LogAbsolute("fileAction") << std::setprecision(0) << edm::TimeOfDay() << " " << msg << fileName;
  edm::FlushMessageLog();
}
0379
0380 void DQMFileIterator::logLumiState(const LumiEntry& lumi, const std::string& msg) {
0381 if (lumiSeen_.find(lumi.file_ls) != lumiSeen_.end()) {
0382 lumiSeen_[lumi.file_ls].state = msg;
0383
0384 monUpdateLumi(lumiSeen_[lumi.file_ls]);
0385 } else {
0386 logFileAction("Internal error: referenced lumi is not the map.");
0387 }
0388 }
0389
0390 void DQMFileIterator::delay() {
0391 if (mon_.isAvailable())
0392 mon_->keepAlive();
0393
0394 usleep(delayMillis_ * 1000);
0395 }
0396
0397 void DQMFileIterator::fillDescription(edm::ParameterSetDescription& desc) {
0398 desc.addUntracked<unsigned int>("runNumber")->setComment("Run number passed via configuration file.");
0399
0400 desc.addUntracked<unsigned int>("datafnPosition", 3)
0401 ->setComment(
0402 "Data filename position in the positional arguments array 'data' in "
0403 "json file.");
0404
0405 desc.addUntracked<std::string>("streamLabel")->setComment("Stream label used in json discovery.");
0406
0407 desc.addUntracked<uint32_t>("delayMillis")->setComment("Number of milliseconds to wait between file checks.");
0408
0409 desc.addUntracked<int32_t>("nextLumiTimeoutMillis", -1)
0410 ->setComment(
0411 "Number of milliseconds to wait before switching to the next lumi "
0412 "section if the current is missing, -1 to disable.");
0413
0414 desc.addUntracked<bool>("scanOnce", false)
0415 ->setComment(
0416 "Don't repeat file scans: use what was found during the initial scan. "
0417 "EOR file is ignored and the state is set to 'past end of run'.");
0418
0419 desc.addUntracked<std::string>("runInputDir")->setComment("Directory where the DQM files will appear.");
0420 }
0421
0422 }