Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:10:14

0001 #include "DQMFileIterator.h"
0002 #include "DQMMonitoringService.h"
0003 
0004 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0005 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0006 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0007 #include "FWCore/Utilities/interface/TimeOfDay.h"
0008 
0009 #include <filesystem>
0010 
0011 #include <boost/algorithm/string.hpp>
0012 #include <boost/algorithm/string/predicate.hpp>
0013 #include <boost/property_tree/json_parser.hpp>
0014 #include <boost/property_tree/ptree.hpp>
0015 #include <boost/regex.hpp>
0016 
0017 #include <fmt/printf.h>
0018 
0019 namespace dqmservices {
0020 
0021   DQMFileIterator::LumiEntry DQMFileIterator::LumiEntry::load_json(const std::string& run_path,
0022                                                                    const std::string& filename,
0023                                                                    int lumiNumber,
0024                                                                    int datafn_position) {
0025     boost::property_tree::ptree pt;
0026     read_json(filename, pt);
0027 
0028     LumiEntry lumi;
0029     lumi.filename = filename;
0030     lumi.run_path = run_path;
0031 
0032     lumi.n_events_processed = std::next(pt.get_child("data").begin(), 0)->second.get_value<std::size_t>();
0033 
0034     lumi.n_events_accepted = std::next(pt.get_child("data").begin(), 1)->second.get_value<std::size_t>();
0035 
0036     lumi.file_ls = lumiNumber;
0037 
0038     if (datafn_position >= 0) {
0039       lumi.datafn = std::next(pt.get_child("data").begin(), datafn_position)->second.get_value<std::string>();
0040     }
0041 
0042     return lumi;
0043   }
0044 
0045   std::string DQMFileIterator::LumiEntry::get_data_path() const {
0046     if (boost::starts_with(datafn, "/"))
0047       return datafn;
0048 
0049     std::filesystem::path p(run_path);
0050     p /= datafn;
0051     return p.string();
0052   }
0053 
0054   std::string DQMFileIterator::LumiEntry::get_json_path() const { return filename; }
0055 
0056   // Contents of Eor json file are ignored for the moment.
0057   // This function will not be called.
0058   DQMFileIterator::EorEntry DQMFileIterator::EorEntry::load_json(const std::string& run_path,
0059                                                                  const std::string& filename) {
0060     boost::property_tree::ptree pt;
0061     read_json(filename, pt);
0062 
0063     EorEntry eor;
0064     eor.filename = filename;
0065     eor.run_path = run_path;
0066 
0067     // We rely on n_events to be the first item on the array...
0068     eor.n_events = std::next(pt.get_child("data").begin(), 1)->second.get_value<std::size_t>();
0069     eor.n_lumi = std::next(pt.get_child("data").begin(), 2)->second.get_value<std::size_t>();
0070 
0071     eor.loaded = true;
0072     return eor;
0073   }
0074 
0075   DQMFileIterator::DQMFileIterator(edm::ParameterSet const& pset) : state_(EOR) {
0076     runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
0077     datafnPosition_ = pset.getUntrackedParameter<unsigned int>("datafnPosition");
0078     runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
0079     streamLabel_ = pset.getUntrackedParameter<std::string>("streamLabel");
0080     delayMillis_ = pset.getUntrackedParameter<uint32_t>("delayMillis");
0081     nextLumiTimeoutMillis_ = pset.getUntrackedParameter<int32_t>("nextLumiTimeoutMillis");
0082 
0083     // scan one mode
0084     flagScanOnce_ = pset.getUntrackedParameter<bool>("scanOnce");
0085 
0086     forceFileCheckTimeoutMillis_ = 5015;
0087     reset();
0088   }
0089 
0090   void DQMFileIterator::reset() {
0091     runPath_.clear();
0092 
0093     std::vector<std::string> tokens;
0094     boost::split(tokens, runInputDir_, boost::is_any_of(":"));
0095 
0096     for (const auto& token : tokens) {
0097       runPath_.push_back(fmt::sprintf("%s/run%06d", token, runNumber_));
0098     }
0099 
0100     eor_.loaded = false;
0101     state_ = State::OPEN;
0102     nextLumiNumber_ = 1;
0103     lumiSeen_.clear();
0104     filesSeen_.clear();
0105 
0106     lastLumiLoad_ = std::chrono::high_resolution_clock::now();
0107 
0108     collect(true);
0109     update_state();
0110 
0111     if (mon_.isAvailable()) {
0112       boost::property_tree::ptree doc;
0113       doc.put("run", runNumber_);
0114       doc.put("next_lumi", nextLumiNumber_);
0115       doc.put("fi_state", std::to_string(state_));
0116       mon_->outputUpdate(doc);
0117     }
0118   }
0119 
0120   DQMFileIterator::LumiEntry DQMFileIterator::open() {
0121     LumiEntry& lumi = lumiSeen_[nextLumiNumber_];
0122     advanceToLumi(nextLumiNumber_ + 1, "open: file iterator");
0123     return lumi;
0124   }
0125 
0126   bool DQMFileIterator::lumiReady() {
0127     if (lumiSeen_.find(nextLumiNumber_) != lumiSeen_.end()) {
0128       return true;
0129     }
0130 
0131     return false;
0132   }
0133 
0134   unsigned int DQMFileIterator::lastLumiFound() {
0135     if (!lumiSeen_.empty()) {
0136       return lumiSeen_.rbegin()->first;
0137     }
0138 
0139     return 1;
0140   }
0141 
0142   void DQMFileIterator::advanceToLumi(unsigned int lumi, std::string reason) {
0143     unsigned int currentLumi = nextLumiNumber_;
0144 
0145     nextLumiNumber_ = lumi;
0146     lastLumiLoad_ = std::chrono::high_resolution_clock::now();
0147 
0148     auto iter = lumiSeen_.lower_bound(currentLumi);
0149 
0150     while ((iter != lumiSeen_.end()) && ((iter->first) < nextLumiNumber_)) {
0151       iter->second.state = reason;
0152       monUpdateLumi(iter->second);
0153 
0154       ++iter;
0155     }
0156 
0157     if (mon_.isAvailable()) {
0158       // report the successful lumi file open
0159       boost::property_tree::ptree doc;
0160       doc.put("next_lumi", nextLumiNumber_);
0161       mon_->outputUpdate(doc);
0162     }
0163   }
0164 
0165   void DQMFileIterator::monUpdateLumi(const LumiEntry& lumi) {
0166     if (!mon_.isAvailable())
0167       return;
0168 
0169     boost::property_tree::ptree doc;
0170     doc.put(fmt::sprintf("extra.lumi_seen.lumi%06d", lumi.file_ls), lumi.state);
0171     mon_->outputUpdate(doc);
0172   }
0173 
0174   unsigned DQMFileIterator::mtimeHash() const {
0175     unsigned mtime_now = 0;
0176 
0177     for (const auto& path : runPath_) {
0178       if (!std::filesystem::exists(path))
0179         continue;
0180 
0181       auto write_time = std::filesystem::last_write_time(path);
0182       mtime_now =
0183           mtime_now ^ std::chrono::duration_cast<std::chrono::microseconds>(write_time.time_since_epoch()).count();
0184     }
0185 
0186     return mtime_now;
0187   }
0188 
0189   void DQMFileIterator::collect(bool ignoreTimers) {
0190     // search filesystem to find available lumi section files
0191     // or the end of run files
0192 
0193     auto now = std::chrono::high_resolution_clock::now();
0194     auto last_ms = std::chrono::duration_cast<std::chrono::milliseconds>(now - runPathLastCollect_).count();
0195 
0196     // don't refresh if it's too soon
0197     if ((!ignoreTimers) && (last_ms >= 0) && (last_ms < 100)) {
0198       return;
0199     }
0200 
0201     // check if directory changed
0202     auto mtime_now = mtimeHash();
0203 
0204     if ((!ignoreTimers) && (last_ms < forceFileCheckTimeoutMillis_) && (mtime_now == runPathMTime_)) {
0205       // logFileAction("Directory hasn't changed.");
0206       return;
0207     } else {
0208       // logFileAction("Directory changed, updating.");
0209     }
0210 
0211     runPathMTime_ = mtime_now;
0212     runPathLastCollect_ = now;
0213 
0214     using std::filesystem::directory_entry;
0215     using std::filesystem::directory_iterator;
0216 
0217     std::string fn_eor;
0218 
0219     for (const auto& runPath : runPath_) {
0220       if (!std::filesystem::exists(runPath)) {
0221         logFileAction("Directory does not exist: ", runPath);
0222 
0223         continue;
0224       }
0225 
0226       directory_iterator dend;
0227       for (directory_iterator di(runPath); di != dend; ++di) {
0228         const boost::regex fn_re("run(\\d+)_ls(\\d+)_([a-zA-Z0-9]+)(_.*)?\\.jsn");
0229 
0230         const std::string filename = di->path().filename().string();
0231         const std::string fn = di->path().string();
0232 
0233         if (filesSeen_.find(filename) != filesSeen_.end()) {
0234           continue;
0235         }
0236 
0237         boost::smatch result;
0238         if (boost::regex_match(filename, result, fn_re)) {
0239           unsigned int run = std::stoi(result[1]);
0240           unsigned int lumi = std::stoi(result[2]);
0241           std::string label = result[3];
0242 
0243           filesSeen_.insert(filename);
0244 
0245           if (run != runNumber_)
0246             continue;
0247 
0248           // check if this is EoR
0249           // for various reasons we have to load it after all other files
0250           if ((lumi == 0) && (label == "EoR") && (!eor_.loaded)) {
0251             fn_eor = fn;
0252             continue;
0253           }
0254 
0255           // check if lumi is loaded
0256           if (lumiSeen_.find(lumi) != lumiSeen_.end()) {
0257             continue;  // already loaded
0258           }
0259 
0260           // check if this belongs to us
0261           if (label != streamLabel_) {
0262             std::string msg("Found and skipped json file (stream label mismatch, ");
0263             msg += label + " [files] != " + streamLabel_ + " [config]";
0264             msg += "): ";
0265             logFileAction(msg, fn);
0266             continue;
0267           }
0268 
0269           try {
0270             LumiEntry lumi_jsn = LumiEntry::load_json(runPath, fn, lumi, datafnPosition_);
0271             lumiSeen_.emplace(lumi, lumi_jsn);
0272             logFileAction("Found and loaded json file: ", fn);
0273 
0274             monUpdateLumi(lumi_jsn);
0275           } catch (const std::exception& e) {
0276             // don't reset the mtime, keep it waiting
0277             filesSeen_.erase(filename);
0278 
0279             std::string msg("Found, tried to load the json, but failed (");
0280             msg += e.what();
0281             msg += "): ";
0282             logFileAction(msg, fn);
0283           }
0284         }
0285       }
0286     }
0287 
0288     if ((!fn_eor.empty()) or flagScanOnce_) {
0289       if (!fn_eor.empty()) {
0290         logFileAction("EoR file found: ", fn_eor);
0291       }
0292 
0293       // @TODO load EoR files correctly
0294       // eor_ = EorEntry::load_json(fn_eor);
0295       // logFileAction("Loaded eor file: ", fn_eor);
0296 
0297       // for now , set n_lumi to the highest _found_ lumi
0298       eor_.loaded = true;
0299 
0300       if (lumiSeen_.empty()) {
0301         eor_.n_lumi = 0;
0302       } else {
0303         eor_.n_lumi = lumiSeen_.rbegin()->first;
0304       }
0305     }
0306   }
0307 
0308   void DQMFileIterator::update_state() {
0309     using std::chrono::duration_cast;
0310     using std::chrono::high_resolution_clock;
0311     using std::chrono::milliseconds;
0312 
0313     State old_state = state_;
0314 
0315     // in scanOnce mode we don't do repeated scans
0316     // whatever found at reset() is be used
0317     if (!flagScanOnce_) {
0318       collect(false);
0319     }
0320 
0321     if ((state_ == State::OPEN) && (eor_.loaded)) {
0322       state_ = State::EOR_CLOSING;
0323     }
0324 
0325     // special case for missing lumi files
0326     // skip to the next available, but after the timeout
0327     if ((state_ != State::EOR) && (nextLumiTimeoutMillis_ >= 0)) {
0328       auto iter = lumiSeen_.lower_bound(nextLumiNumber_);
0329       if ((iter != lumiSeen_.end()) && iter->first != nextLumiNumber_) {
0330         auto elapsed = high_resolution_clock::now() - lastLumiLoad_;
0331         auto elapsed_ms = duration_cast<milliseconds>(elapsed).count();
0332 
0333         if (elapsed_ms >= nextLumiTimeoutMillis_) {
0334           std::string msg("Timeout reached, skipping lumisection(s) ");
0335           msg += std::to_string(nextLumiNumber_) + " .. " + std::to_string(iter->first - 1);
0336           msg += ", nextLumiNumber_ is now " + std::to_string(iter->first);
0337           logFileAction(msg);
0338 
0339           advanceToLumi(iter->first, "skipped: timeout");
0340         }
0341       }
0342     }
0343 
0344     if (state_ == State::EOR_CLOSING) {
0345       // check if we parsed all lumis
0346       // n_lumi is both last lumi and the number of lumi
0347       // since lumis are indexed from 1
0348 
0349       // after all lumi have been pop()'ed
0350       // current lumi will become larger than the last lumi
0351       if (nextLumiNumber_ > eor_.n_lumi) {
0352         state_ = State::EOR;
0353       }
0354     }
0355 
0356     if (state_ != old_state) {
0357       logFileAction("Streamer state changed: ", std::to_string(old_state) + "->" + std::to_string(state_));
0358 
0359       if (mon_) {
0360         boost::property_tree::ptree doc;
0361         doc.put("fi_state", std::to_string(state_));
0362         mon_->outputUpdate(doc);
0363       }
0364     }
0365   }
0366 
0367   void DQMFileIterator::logFileAction(const std::string& msg, const std::string& fileName) const {
0368     edm::LogAbsolute("fileAction") << std::setprecision(0) << edm::TimeOfDay() << "  " << msg << fileName;
0369     edm::FlushMessageLog();
0370   }
0371 
0372   void DQMFileIterator::logLumiState(const LumiEntry& lumi, const std::string& msg) {
0373     if (lumiSeen_.find(lumi.file_ls) != lumiSeen_.end()) {
0374       lumiSeen_[lumi.file_ls].state = msg;
0375 
0376       monUpdateLumi(lumiSeen_[lumi.file_ls]);
0377     } else {
0378       logFileAction("Internal error: referenced lumi is not the map.");
0379     }
0380   }
0381 
0382   void DQMFileIterator::delay() {
0383     if (mon_.isAvailable())
0384       mon_->keepAlive();
0385 
0386     usleep(delayMillis_ * 1000);
0387   }
0388 
0389   void DQMFileIterator::fillDescription(edm::ParameterSetDescription& desc) {
0390     desc.addUntracked<unsigned int>("runNumber")->setComment("Run number passed via configuration file.");
0391 
0392     desc.addUntracked<unsigned int>("datafnPosition", 3)
0393         ->setComment(
0394             "Data filename position in the positional arguments array 'data' in "
0395             "json file.");
0396 
0397     desc.addUntracked<std::string>("streamLabel")->setComment("Stream label used in json discovery.");
0398 
0399     desc.addUntracked<uint32_t>("delayMillis")->setComment("Number of milliseconds to wait between file checks.");
0400 
0401     desc.addUntracked<int32_t>("nextLumiTimeoutMillis", -1)
0402         ->setComment(
0403             "Number of milliseconds to wait before switching to the next lumi "
0404             "section if the current is missing, -1 to disable.");
0405 
0406     desc.addUntracked<bool>("scanOnce", false)
0407         ->setComment(
0408             "Don't repeat file scans: use what was found during the initial scan. "
0409             "EOR file is ignored and the state is set to 'past end of run'.");
0410 
0411     desc.addUntracked<std::string>("runInputDir")->setComment("Directory where the DQM files will appear.");
0412   }
0413 
0414 }  // namespace dqmservices