Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-03-19 06:07:20

#include <chrono>
#include <cstddef>
#include <filesystem>
#include <iomanip>
#include <iterator>
#include <memory>
#include <stdexcept>
#include <string>
#include <system_error>
#include <thread>
#include <vector>

#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/range.hpp>
#include <boost/regex.hpp>
#include <fmt/printf.h>

#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/Utilities/interface/TimeOfDay.h"
#include "DQMFileIterator.h"
0017 
0018 namespace dqmservices {
0019 
0020   DQMFileIterator::LumiEntry DQMFileIterator::LumiEntry::load_json(const std::string& run_path,
0021                                                                    const std::string& filename,
0022                                                                    int lumiNumber,
0023                                                                    int datafn_position) {
0024     boost::property_tree::ptree pt;
0025     read_json(filename, pt);
0026 
0027     LumiEntry lumi;
0028     lumi.filename = filename;
0029     lumi.run_path = run_path;
0030 
0031     lumi.n_events_processed = std::next(pt.get_child("data").begin(), 0)->second.get_value<std::size_t>();
0032 
0033     lumi.n_events_accepted = std::next(pt.get_child("data").begin(), 1)->second.get_value<std::size_t>();
0034 
0035     lumi.file_ls = lumiNumber;
0036 
0037     if (datafn_position >= 0) {
0038       lumi.datafn = std::next(pt.get_child("data").begin(), datafn_position)->second.get_value<std::string>();
0039     }
0040 
0041     return lumi;
0042   }
0043 
0044   std::string DQMFileIterator::LumiEntry::get_data_path() const {
0045     if (boost::starts_with(datafn, "/"))
0046       return datafn;
0047 
0048     std::filesystem::path p(run_path);
0049     p /= datafn;
0050     return p.string();
0051   }
0052 
0053   std::string DQMFileIterator::LumiEntry::get_json_path() const { return filename; }
0054 
0055   // Contents of Eor json file are ignored for the moment.
0056   // This function will not be called.
0057   DQMFileIterator::EorEntry DQMFileIterator::EorEntry::load_json(const std::string& run_path,
0058                                                                  const std::string& filename) {
0059     boost::property_tree::ptree pt;
0060     read_json(filename, pt);
0061 
0062     EorEntry eor;
0063     eor.filename = filename;
0064     eor.run_path = run_path;
0065 
0066     // We rely on n_events to be the first item on the array...
0067     eor.n_events = std::next(pt.get_child("data").begin(), 1)->second.get_value<std::size_t>();
0068     eor.n_lumi = std::next(pt.get_child("data").begin(), 2)->second.get_value<std::size_t>();
0069 
0070     eor.loaded = true;
0071     return eor;
0072   }
0073 
0074   DQMFileIterator::DQMFileIterator(edm::ParameterSet const& pset) : state_(EOR) {
0075     runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
0076     datafnPosition_ = pset.getUntrackedParameter<unsigned int>("datafnPosition");
0077     runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
0078     streamLabel_ = pset.getUntrackedParameter<std::string>("streamLabel");
0079     delayMillis_ = pset.getUntrackedParameter<uint32_t>("delayMillis");
0080     nextLumiTimeoutMillis_ = pset.getUntrackedParameter<int32_t>("nextLumiTimeoutMillis");
0081 
0082     // scan one mode
0083     flagScanOnce_ = pset.getUntrackedParameter<bool>("scanOnce");
0084 
0085     forceFileCheckTimeoutMillis_ = 5015;
0086     reset();
0087   }
0088 
0089   DQMFileIterator::~DQMFileIterator() {}
0090 
0091   void DQMFileIterator::reset() {
0092     runPath_.clear();
0093 
0094     std::vector<std::string> tokens;
0095     boost::split(tokens, runInputDir_, boost::is_any_of(":"));
0096 
0097     for (const auto& token : tokens) {
0098       runPath_.push_back(fmt::sprintf("%s/run%06d", token, runNumber_));
0099     }
0100 
0101     eor_.loaded = false;
0102     state_ = State::OPEN;
0103     nextLumiNumber_ = 1;
0104     lumiSeen_.clear();
0105     filesSeen_.clear();
0106 
0107     lastLumiLoad_ = std::chrono::high_resolution_clock::now();
0108 
0109     collect(true);
0110     update_state();
0111 
0112     if (mon_.isAvailable()) {
0113       ptree doc;
0114       doc.put("run", runNumber_);
0115       doc.put("next_lumi", nextLumiNumber_);
0116       doc.put("fi_state", std::to_string(state_));
0117       mon_->outputUpdate(doc);
0118     }
0119   }
0120 
0121   DQMFileIterator::State DQMFileIterator::state() { return state_; }
0122 
0123   DQMFileIterator::LumiEntry DQMFileIterator::open() {
0124     LumiEntry& lumi = lumiSeen_[nextLumiNumber_];
0125     advanceToLumi(nextLumiNumber_ + 1, "open: file iterator");
0126     return lumi;
0127   }
0128 
0129   bool DQMFileIterator::lumiReady() {
0130     if (lumiSeen_.find(nextLumiNumber_) != lumiSeen_.end()) {
0131       return true;
0132     }
0133 
0134     return false;
0135   }
0136 
0137   unsigned int DQMFileIterator::runNumber() { return runNumber_; }
0138 
0139   unsigned int DQMFileIterator::lastLumiFound() {
0140     if (!lumiSeen_.empty()) {
0141       return lumiSeen_.rbegin()->first;
0142     }
0143 
0144     return 1;
0145   }
0146 
0147   void DQMFileIterator::advanceToLumi(unsigned int lumi, std::string reason) {
0148     using boost::str;
0149     using boost::property_tree::ptree;
0150 
0151     unsigned int currentLumi = nextLumiNumber_;
0152 
0153     nextLumiNumber_ = lumi;
0154     lastLumiLoad_ = std::chrono::high_resolution_clock::now();
0155 
0156     auto iter = lumiSeen_.lower_bound(currentLumi);
0157 
0158     while ((iter != lumiSeen_.end()) && ((iter->first) < nextLumiNumber_)) {
0159       iter->second.state = reason;
0160       monUpdateLumi(iter->second);
0161 
0162       ++iter;
0163     }
0164 
0165     if (mon_.isAvailable()) {
0166       // report the successful lumi file open
0167       ptree doc;
0168       doc.put("next_lumi", nextLumiNumber_);
0169       mon_->outputUpdate(doc);
0170     }
0171   }
0172 
0173   void DQMFileIterator::monUpdateLumi(const LumiEntry& lumi) {
0174     if (!mon_.isAvailable())
0175       return;
0176 
0177     ptree doc;
0178     doc.put(fmt::sprintf("extra.lumi_seen.lumi%06d", lumi.file_ls), lumi.state);
0179     mon_->outputUpdate(doc);
0180   }
0181 
0182   unsigned DQMFileIterator::mtimeHash() const {
0183     unsigned mtime_now = 0;
0184 
0185     for (const auto& path : runPath_) {
0186       if (!std::filesystem::exists(path))
0187         continue;
0188 
0189       auto write_time = std::filesystem::last_write_time(path);
0190       mtime_now =
0191           mtime_now ^ std::chrono::duration_cast<std::chrono::microseconds>(write_time.time_since_epoch()).count();
0192     }
0193 
0194     return mtime_now;
0195   }
0196 
0197   void DQMFileIterator::collect(bool ignoreTimers) {
0198     // search filesystem to find available lumi section files
0199     // or the end of run files
0200 
0201     auto now = std::chrono::high_resolution_clock::now();
0202     auto last_ms = std::chrono::duration_cast<std::chrono::milliseconds>(now - runPathLastCollect_).count();
0203 
0204     // don't refresh if it's too soon
0205     if ((!ignoreTimers) && (last_ms >= 0) && (last_ms < 100)) {
0206       return;
0207     }
0208 
0209     // check if directory changed
0210     auto mtime_now = mtimeHash();
0211 
0212     if ((!ignoreTimers) && (last_ms < forceFileCheckTimeoutMillis_) && (mtime_now == runPathMTime_)) {
0213       // logFileAction("Directory hasn't changed.");
0214       return;
0215     } else {
0216       // logFileAction("Directory changed, updating.");
0217     }
0218 
0219     runPathMTime_ = mtime_now;
0220     runPathLastCollect_ = now;
0221 
0222     using std::filesystem::directory_entry;
0223     using std::filesystem::directory_iterator;
0224 
0225     std::string fn_eor;
0226 
0227     for (const auto& runPath : runPath_) {
0228       if (!std::filesystem::exists(runPath)) {
0229         logFileAction("Directory does not exist: ", runPath);
0230 
0231         continue;
0232       }
0233 
0234       directory_iterator dend;
0235       for (directory_iterator di(runPath); di != dend; ++di) {
0236         const boost::regex fn_re("run(\\d+)_ls(\\d+)_([a-zA-Z0-9]+)(_.*)?\\.jsn");
0237 
0238         const std::string filename = di->path().filename().string();
0239         const std::string fn = di->path().string();
0240 
0241         if (filesSeen_.find(filename) != filesSeen_.end()) {
0242           continue;
0243         }
0244 
0245         boost::smatch result;
0246         if (boost::regex_match(filename, result, fn_re)) {
0247           unsigned int run = std::stoi(result[1]);
0248           unsigned int lumi = std::stoi(result[2]);
0249           std::string label = result[3];
0250 
0251           filesSeen_.insert(filename);
0252 
0253           if (run != runNumber_)
0254             continue;
0255 
0256           // check if this is EoR
0257           // for various reasons we have to load it after all other files
0258           if ((lumi == 0) && (label == "EoR") && (!eor_.loaded)) {
0259             fn_eor = fn;
0260             continue;
0261           }
0262 
0263           // check if lumi is loaded
0264           if (lumiSeen_.find(lumi) != lumiSeen_.end()) {
0265             continue;  // already loaded
0266           }
0267 
0268           // check if this belongs to us
0269           if (label != streamLabel_) {
0270             std::string msg("Found and skipped json file (stream label mismatch, ");
0271             msg += label + " [files] != " + streamLabel_ + " [config]";
0272             msg += "): ";
0273             logFileAction(msg, fn);
0274             continue;
0275           }
0276 
0277           try {
0278             LumiEntry lumi_jsn = LumiEntry::load_json(runPath, fn, lumi, datafnPosition_);
0279             lumiSeen_.emplace(lumi, lumi_jsn);
0280             logFileAction("Found and loaded json file: ", fn);
0281 
0282             monUpdateLumi(lumi_jsn);
0283           } catch (const std::exception& e) {
0284             // don't reset the mtime, keep it waiting
0285             filesSeen_.erase(filename);
0286 
0287             std::string msg("Found, tried to load the json, but failed (");
0288             msg += e.what();
0289             msg += "): ";
0290             logFileAction(msg, fn);
0291           }
0292         }
0293       }
0294     }
0295 
0296     if ((!fn_eor.empty()) or flagScanOnce_) {
0297       if (!fn_eor.empty()) {
0298         logFileAction("EoR file found: ", fn_eor);
0299       }
0300 
0301       // @TODO load EoR files correctly
0302       // eor_ = EorEntry::load_json(fn_eor);
0303       // logFileAction("Loaded eor file: ", fn_eor);
0304 
0305       // for now , set n_lumi to the highest _found_ lumi
0306       eor_.loaded = true;
0307 
0308       if (lumiSeen_.empty()) {
0309         eor_.n_lumi = 0;
0310       } else {
0311         eor_.n_lumi = lumiSeen_.rbegin()->first;
0312       }
0313     }
0314   }
0315 
0316   void DQMFileIterator::update_state() {
0317     using std::chrono::duration_cast;
0318     using std::chrono::high_resolution_clock;
0319     using std::chrono::milliseconds;
0320 
0321     State old_state = state_;
0322 
0323     // in scanOnce mode we don't do repeated scans
0324     // whatever found at reset() is be used
0325     if (!flagScanOnce_) {
0326       collect(false);
0327     }
0328 
0329     if ((state_ == State::OPEN) && (eor_.loaded)) {
0330       state_ = State::EOR_CLOSING;
0331     }
0332 
0333     // special case for missing lumi files
0334     // skip to the next available, but after the timeout
0335     if ((state_ != State::EOR) && (nextLumiTimeoutMillis_ >= 0)) {
0336       auto iter = lumiSeen_.lower_bound(nextLumiNumber_);
0337       if ((iter != lumiSeen_.end()) && iter->first != nextLumiNumber_) {
0338         auto elapsed = high_resolution_clock::now() - lastLumiLoad_;
0339         auto elapsed_ms = duration_cast<milliseconds>(elapsed).count();
0340 
0341         if (elapsed_ms >= nextLumiTimeoutMillis_) {
0342           std::string msg("Timeout reached, skipping lumisection(s) ");
0343           msg += std::to_string(nextLumiNumber_) + " .. " + std::to_string(iter->first - 1);
0344           msg += ", nextLumiNumber_ is now " + std::to_string(iter->first);
0345           logFileAction(msg);
0346 
0347           advanceToLumi(iter->first, "skipped: timeout");
0348         }
0349       }
0350     }
0351 
0352     if (state_ == State::EOR_CLOSING) {
0353       // check if we parsed all lumis
0354       // n_lumi is both last lumi and the number of lumi
0355       // since lumis are indexed from 1
0356 
0357       // after all lumi have been pop()'ed
0358       // current lumi will become larger than the last lumi
0359       if (nextLumiNumber_ > eor_.n_lumi) {
0360         state_ = State::EOR;
0361       }
0362     }
0363 
0364     if (state_ != old_state) {
0365       logFileAction("Streamer state changed: ", std::to_string(old_state) + "->" + std::to_string(state_));
0366 
0367       if (mon_) {
0368         ptree doc;
0369         doc.put("fi_state", std::to_string(state_));
0370         mon_->outputUpdate(doc);
0371       }
0372     }
0373   }
0374 
  // Log `msg` immediately followed by `fileName` to the "fileAction" category via
  // edm::LogAbsolute, prefixed with edm::TimeOfDay() and two spaces, then call
  // edm::FlushMessageLog().
  void DQMFileIterator::logFileAction(const std::string& msg, const std::string& fileName) const {
    // The LogAbsolute object is a temporary, destroyed at the end of this full
    // expression, i.e. before FlushMessageLog() below. Presumably that is when the
    // message is handed to the logger, so keep it a temporary to preserve this order.
    // setprecision(0) presumably limits the fractional-second digits printed by
    // TimeOfDay — TODO confirm against edm::TimeOfDay.
    edm::LogAbsolute("fileAction") << std::setprecision(0) << edm::TimeOfDay() << "  " << msg << fileName;
    edm::FlushMessageLog();
  }
0379 
0380   void DQMFileIterator::logLumiState(const LumiEntry& lumi, const std::string& msg) {
0381     if (lumiSeen_.find(lumi.file_ls) != lumiSeen_.end()) {
0382       lumiSeen_[lumi.file_ls].state = msg;
0383 
0384       monUpdateLumi(lumiSeen_[lumi.file_ls]);
0385     } else {
0386       logFileAction("Internal error: referenced lumi is not the map.");
0387     }
0388   }
0389 
0390   void DQMFileIterator::delay() {
0391     if (mon_.isAvailable())
0392       mon_->keepAlive();
0393 
0394     usleep(delayMillis_ * 1000);
0395   }
0396 
0397   void DQMFileIterator::fillDescription(edm::ParameterSetDescription& desc) {
0398     desc.addUntracked<unsigned int>("runNumber")->setComment("Run number passed via configuration file.");
0399 
0400     desc.addUntracked<unsigned int>("datafnPosition", 3)
0401         ->setComment(
0402             "Data filename position in the positional arguments array 'data' in "
0403             "json file.");
0404 
0405     desc.addUntracked<std::string>("streamLabel")->setComment("Stream label used in json discovery.");
0406 
0407     desc.addUntracked<uint32_t>("delayMillis")->setComment("Number of milliseconds to wait between file checks.");
0408 
0409     desc.addUntracked<int32_t>("nextLumiTimeoutMillis", -1)
0410         ->setComment(
0411             "Number of milliseconds to wait before switching to the next lumi "
0412             "section if the current is missing, -1 to disable.");
0413 
0414     desc.addUntracked<bool>("scanOnce", false)
0415         ->setComment(
0416             "Don't repeat file scans: use what was found during the initial scan. "
0417             "EOR file is ignored and the state is set to 'past end of run'.");
0418 
0419     desc.addUntracked<std::string>("runInputDir")->setComment("Directory where the DQM files will appear.");
0420   }
0421 
0422 }  // namespace dqmservices