Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-10-06 02:53:53

0001 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0002 #include "FWCore/Utilities/interface/Exception.h"
0003 #include "FWCore/Utilities/interface/EDMException.h"
0004 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0005 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0006 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0007 #include "FWCore/Sources/interface/EventSkipperByID.h"
0008 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0009 
0010 #include "FWCore/Utilities/interface/RegexMatch.h"
0011 #include "DQMStreamerReader.h"
0012 
0013 #include <cstdlib>
0014 #include <filesystem>
0015 #include <fstream>
0016 #include <memory>
0017 #include <queue>
0018 #include <algorithm>
0019 #include <cctype>
0020 
0021 #include <IOPool/Streamer/interface/DumpTools.h>
0022 
0023 namespace dqmservices {
0024 
0025   DQMStreamerReader::DQMStreamerReader(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0026       : StreamerInputSource(pset, desc), fiterator_(pset) {
0027     runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
0028     runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
0029     hltSel_ = pset.getUntrackedParameter<std::vector<std::string> >("SelectEvents");
0030 
0031     minEventsPerLs_ = pset.getUntrackedParameter<int>("minEventsPerLumi");
0032     flagSkipFirstLumis_ = pset.getUntrackedParameter<bool>("skipFirstLumis");
0033     flagEndOfRunKills_ = pset.getUntrackedParameter<bool>("endOfRunKills");
0034     flagDeleteDatFiles_ = pset.getUntrackedParameter<bool>("deleteDatFiles");
0035 
0036     triggerSel();
0037 
0038     reset_();
0039   }
0040 
0041   DQMStreamerReader::~DQMStreamerReader() {
0042     // Sometimes(?) the destructor called after service registry was already destructed
0043     // and closeFile_ throws away no ServiceRegistry found exception...
0044     //
0045     // Normally, this file should be closed before this destructor is called.
0046     //closeFileImp_("destructor");
0047   }
0048 
0049   void DQMStreamerReader::reset_() {
0050     // We have to load at least a single header,
0051     // so the ProductRegistry gets initialized.
0052     //
0053     // This must happen here (inside the constructor),
0054     // as ProductRegistry gets frozen after we initialize:
0055     // https://cmssdt.cern.ch/SDT/lxr/source/FWCore/Framework/src/Schedule.cc#441
0056 
0057     fiterator_.logFileAction("Waiting for the first lumi in order to initialize.");
0058 
0059     fiterator_.update_state();
0060 
0061     // Fast-forward to the last open file.
0062     if (flagSkipFirstLumis_) {
0063       unsigned int l = fiterator_.lastLumiFound();
0064       if (l > 1) {
0065         fiterator_.advanceToLumi(l, "skipped: fast-forward to the latest lumi");
0066       }
0067     }
0068 
0069     for (;;) {
0070       bool next = prepareNextFile();
0071 
0072       // check for end of run
0073       if (!next) {
0074         fiterator_.logFileAction("End of run reached before DQMStreamerReader was initialised.");
0075         return;
0076       }
0077 
0078       // check if we have a file openned
0079       if (file_.open()) {
0080         // we are now initialised
0081         break;
0082       }
0083 
0084       // wait
0085       fiterator_.delay();
0086     }
0087 
0088     fiterator_.logFileAction("DQMStreamerReader initialised.");
0089   }
0090 
0091   void DQMStreamerReader::openFileImp_(const DQMFileIterator::LumiEntry& entry) {
0092     processedEventPerLs_ = 0;
0093     edm::ParameterSet pset;
0094 
0095     std::string path = entry.get_data_path();
0096 
0097     file_.lumi_ = entry;
0098     file_.streamFile_ = std::make_unique<edm::StreamerInputFile>(path);
0099 
0100     InitMsgView const* header = getHeaderMsg();
0101     if (isFirstFile_) {
0102       deserializeAndMergeWithRegistry(*header, false);
0103     }
0104 
0105     // dump the list of HLT trigger name from the header
0106     //  dumpInitHeader(header);
0107 
0108     // if specific trigger selection is requested, check if the requested triggers
0109     // match with trigger paths in the header file
0110     if (!acceptAllEvt_) {
0111       Strings tnames;
0112       header->hltTriggerNames(tnames);
0113 
0114       pset.addParameter<Strings>("SelectEvents", hltSel_);
0115       eventSelector_.reset(new TriggerSelector(pset, tnames));
0116 
0117       // check if any trigger path name requested matches with trigger name in the
0118       // header file
0119       matchTriggerSel(tnames);
0120     }
0121 
0122     // our initialization
0123     processedEventPerLs_ = 0;
0124 
0125     if (flagDeleteDatFiles_) {
0126       // unlink the file
0127       unlink(path.c_str());
0128     }
0129   }
0130 
0131   void DQMStreamerReader::genuineCloseFile() {}
0132 
0133   void DQMStreamerReader::closeFileImp_(const std::string& reason) {
0134     if (file_.open()) {
0135       file_.streamFile_->closeStreamerFile();
0136       file_.streamFile_ = nullptr;
0137 
0138       fiterator_.logLumiState(file_.lumi_, "close: " + reason);
0139     }
0140   }
0141 
0142   void DQMStreamerReader::genuineReadFile() {
0143     if (isFirstFile_) {
0144       //The file was already opened in the constructor
0145       isFirstFile_ = false;
0146       return;
0147     }
0148 
0149     //Get header/init from reader
0150     InitMsgView const* header = getHeaderMsg();
0151     deserializeAndMergeWithRegistry(*header, true);
0152   }
0153 
0154   bool DQMStreamerReader::openNextFileImp_() {
0155     closeFileImp_("skipping to another file");
0156 
0157     DQMFileIterator::LumiEntry currentLumi = fiterator_.open();
0158     std::string p = currentLumi.get_data_path();
0159 
0160     if (std::filesystem::exists(p)) {
0161       try {
0162         openFileImp_(currentLumi);
0163         return true;
0164       } catch (const cms::Exception& e) {
0165         fiterator_.logFileAction(std::string("Can't deserialize registry data (in open file): ") + e.what(), p);
0166         fiterator_.logLumiState(currentLumi, "error: data file corrupted");
0167 
0168         closeFileImp_("data file corrupted");
0169         return false;
0170       }
0171     } else {
0172       /* dat file missing */
0173       fiterator_.logFileAction("Data file (specified in json) is missing:", p);
0174       fiterator_.logLumiState(currentLumi, "error: data file missing");
0175 
0176       return false;
0177     }
0178   }
0179 
0180   InitMsgView const* DQMStreamerReader::getHeaderMsg() {
0181     InitMsgView const* header = file_.streamFile_->startMessage();
0182 
0183     if (header->code() != Header::INIT) {  // INIT Msg
0184       throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::readHeader")
0185           << "received wrong message type: expected INIT, got " << header->code() << "\n";
0186     }
0187 
0188     return header;
0189   }
0190 
0191   EventMsgView const* DQMStreamerReader::getEventMsg() {
0192     auto next = file_.streamFile_->next();
0193     if (edm::StreamerInputFile::Next::kFile == next) {
0194       return nullptr;
0195     }
0196 
0197     if (edm::StreamerInputFile::Next::kStop == next) {
0198       return nullptr;
0199     }
0200 
0201     EventMsgView const* msg = file_.streamFile_->currentRecord();
0202 
0203     //  if (msg != nullptr) dumpEventView(msg);
0204     return msg;
0205   }
0206 
0207   /**
0208  * Prepare (open) the next file for reading.
0209  * It is used by prepareNextEvent and in the constructor.
0210  *
0211  * Does not block/wait.
0212  *
0213  * Return false if this is end of run and/or no more file are available.
0214  * However, return of "true" does not imply the file has been openned,
0215  * but we need to wait until some future file becomes available.
0216  */
0217   bool DQMStreamerReader::prepareNextFile() {
0218     typedef DQMFileIterator::State State;
0219 
0220     for (;;) {
0221       fiterator_.update_state();
0222 
0223       if (edm::shutdown_flag.load()) {
0224         fiterator_.logFileAction("Shutdown flag was set, shutting down.");
0225 
0226         closeFileImp_("shutdown flag is set");
0227         return false;
0228       }
0229 
0230       // check for end of run file and force quit
0231       if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
0232         closeFileImp_("forced end-of-run");
0233         return false;
0234       }
0235 
0236       // check for end of run and quit if everything has been processed.
0237       // this clean exit
0238       if ((!file_.open()) && (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
0239         return false;
0240       }
0241 
0242       // if this is end of run and no more files to process
0243       // close it
0244       if ((processedEventPerLs_ >= minEventsPerLs_) && (!fiterator_.lumiReady()) &&
0245           (fiterator_.state() == State::EOR)) {
0246         closeFileImp_("graceful end-of-run");
0247         return false;
0248       }
0249 
0250       // skip to the next file if we have no files openned yet
0251       if (!file_.open()) {
0252         if (fiterator_.lumiReady()) {
0253           openNextFileImp_();
0254           // we might need to open once more (if .dat is missing)
0255           continue;
0256         }
0257       }
0258 
0259       // or if there is a next file and enough eventshas been processed.
0260       if (fiterator_.lumiReady() && (processedEventPerLs_ >= minEventsPerLs_)) {
0261         openNextFileImp_();
0262         // we might need to open once more (if .dat is missing)
0263         continue;
0264       }
0265 
0266       return true;
0267     }
0268   }
0269 
0270   /**
0271  * Waits and reads the event header.
0272  * If end-of-run nullptr is returned.
0273  */
0274   EventMsgView const* DQMStreamerReader::prepareNextEvent() {
0275     EventMsgView const* eview = nullptr;
0276     typedef DQMFileIterator::State State;
0277 
0278     // wait for the next event
0279     for (;;) {
0280       // edm::LogAbsolute("DQMStreamerReader")
0281       //     << "State loop.";
0282       bool next = prepareNextFile();
0283       if (!next)
0284         return nullptr;
0285 
0286       // sleep
0287       if (!file_.open()) {
0288         // the reader does not exist
0289         fiterator_.delay();
0290       } else {
0291         // our reader exists, try to read out an event
0292         eview = getEventMsg();
0293 
0294         if (eview == nullptr) {
0295           // read unsuccessful
0296           // this means end of file, so close the file
0297           closeFileImp_("eof");
0298         } else {
0299           if (!acceptEvent(eview)) {
0300             continue;
0301           } else {
0302             return eview;
0303           }
0304         }
0305       }
0306     }
0307     return eview;
0308   }
0309 
0310   /**
0311  * This is the actual code for checking the new event and/or deserializing it.
0312  */
0313   edm::RawInputSource::Next DQMStreamerReader::checkNext() {
0314     try {
0315       EventMsgView const* eview = prepareNextEvent();
0316       if (eview == nullptr) {
0317         if (file_.streamFile_ and file_.streamFile_->newHeader()) {
0318           return Next::kFile;
0319         }
0320         return Next::kStop;
0321       }
0322 
0323       deserializeEvent(*eview);
0324     } catch (const cms::Exception& e) {
0325       // try to recover from corrupted files/events
0326       fiterator_.logFileAction(std::string("Can't deserialize event or registry data: ") + e.what());
0327       closeFileImp_("data file corrupted");
0328 
0329       // this is not optimal, but hopefully we won't catch this many times in a row
0330       return checkNext();
0331     }
0332 
0333     processedEventPerLs_ += 1;
0334 
0335     return Next::kEvent;
0336   }
0337 
0338   /**
0339  * If hlt trigger selection is '*', return a boolean variable to accept all
0340  * events
0341  */
0342   bool DQMStreamerReader::triggerSel() {
0343     acceptAllEvt_ = false;
0344     for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end()); i != end; ++i) {
0345       std::string hltPath(*i);
0346       hltPath.erase(
0347           std::remove_if(
0348               hltPath.begin(), hltPath.end(), [](char c) { return std::isspace(static_cast<unsigned char>(c)); }),
0349           hltPath.end());
0350       if (hltPath == "*")
0351         acceptAllEvt_ = true;
0352     }
0353     return acceptAllEvt_;
0354   }
0355 
0356   /**
0357  * Check if hlt selection matches any trigger name taken from the header file
0358  */
0359   bool DQMStreamerReader::matchTriggerSel(Strings const& tnames) {
0360     matchTriggerSel_ = false;
0361     for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end()); i != end; ++i) {
0362       std::string hltPath(*i);
0363       hltPath.erase(
0364           std::remove_if(
0365               hltPath.begin(), hltPath.end(), [](char c) { return std::isspace(static_cast<unsigned char>(c)); }),
0366           hltPath.end());
0367       std::vector<Strings::const_iterator> matches = edm::regexMatch(tnames, hltPath);
0368       if (!matches.empty()) {
0369         matchTriggerSel_ = true;
0370       }
0371     }
0372 
0373     if (!matchTriggerSel_) {
0374       edm::LogWarning("Trigger selection does not match any trigger path!!!") << std::endl;
0375     }
0376 
0377     return matchTriggerSel_;
0378   }
0379 
0380   /**
0381  * Check the trigger path to accept event
0382  */
0383   bool DQMStreamerReader::acceptEvent(const EventMsgView* evtmsg) {
0384     if (acceptAllEvt_)
0385       return true;
0386     if (!matchTriggerSel_)
0387       return false;
0388 
0389     std::vector<unsigned char> hltTriggerBits_;
0390     int hltTriggerCount_ = evtmsg->hltCount();
0391     if (hltTriggerCount_ > 0) {
0392       hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
0393     }
0394     evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
0395 
0396     if (eventSelector_->wantAll() || eventSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount())) {
0397       return true;
0398     } else {
0399       return false;
0400     }
0401   }
0402 
0403   void DQMStreamerReader::skip(int toSkip) {
0404     try {
0405       for (int i = 0; i != toSkip; ++i) {
0406         EventMsgView const* evMsg = prepareNextEvent();
0407 
0408         if (evMsg == nullptr) {
0409           return;
0410         }
0411       }
0412     } catch (const cms::Exception& e) {
0413       // try to recover from corrupted files/events
0414       fiterator_.logFileAction(std::string("Can't deserialize event data: ") + e.what());
0415       closeFileImp_("data file corrupted");
0416     }
0417   }
0418 
0419   void DQMStreamerReader::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0420     edm::ParameterSetDescription desc;
0421     desc.setComment("Reads events from streamer files.");
0422 
0423     desc.addUntracked<std::vector<std::string> >("SelectEvents")->setComment("HLT path to select events ");
0424 
0425     desc.addUntracked<int>("minEventsPerLumi", 1)
0426         ->setComment(
0427             "Minimum number of events to process per lumisection, "
0428             "before switching to a new input file. If the next file "
0429             "does not yet exist, "
0430             "the number of processed events will be bigger.");
0431 
0432     desc.addUntracked<bool>("skipFirstLumis", false)
0433         ->setComment(
0434             "Skip (and ignore the minEventsPerLumi parameter) for the files "
0435             "which have been available at the begining of the processing. "
0436             "If set to true, the reader will open last available file for "
0437             "processing.");
0438 
0439     desc.addUntracked<bool>("deleteDatFiles", false)
0440         ->setComment(
0441             "Delete data files after they have been closed, in order to "
0442             "save disk space.");
0443 
0444     desc.addUntracked<bool>("endOfRunKills", false)
0445         ->setComment(
0446             "Kill the processing as soon as the end-of-run file appears, even if "
0447             "there are/will be unprocessed lumisections.");
0448 
0449     // desc.addUntracked<unsigned int>("skipEvents", 0U)
0450     //    ->setComment("Skip the first 'skipEvents' events that otherwise would "
0451     //                 "have been processed.");
0452 
0453     // This next parameter is read in the base class, but its default value
0454     // depends on the derived class, so it is set here.
0455     desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
0456 
0457     DQMFileIterator::fillDescription(desc);
0458     edm::StreamerInputSource::fillDescription(desc);
0459     edm::EventSkipperByID::fillDescription(desc);
0460 
0461     descriptions.add("source", desc);
0462   }
0463 
0464 }  // namespace dqmservices
0465 
0466 #include "FWCore/Framework/interface/InputSourceMacros.h"
0467 #include "FWCore/Framework/interface/MakerMacros.h"
0468 
0469 typedef dqmservices::DQMStreamerReader DQMStreamerReader;
0470 DEFINE_FWK_INPUT_SOURCE(DQMStreamerReader);