Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-01-16 23:36:49

0001 #include "DQMStreamerReader.h"
0002 
0003 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0004 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0005 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0006 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0007 #include "FWCore/Sources/interface/EventSkipperByID.h"
0008 #include "FWCore/Utilities/interface/Exception.h"
0009 #include "FWCore/Utilities/interface/RegexMatch.h"
0010 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0011 #include "IOPool/Streamer/interface/DumpTools.h"
0012 
0013 #include <cstdlib>
0014 #include <filesystem>
0015 #include <fstream>
0016 #include <queue>
0017 #include <algorithm>
0018 #include <cctype>
0019 
0020 namespace dqmservices {
0021 
0022   DQMStreamerReader::DQMStreamerReader(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0023       : StreamerInputSource(pset, desc),
0024         fiterator_(pset),
0025         minEventsPerLs_(pset.getUntrackedParameter<int>("minEventsPerLumi")),
0026         flagSkipFirstLumis_(pset.getUntrackedParameter<bool>("skipFirstLumis")),
0027         flagEndOfRunKills_(pset.getUntrackedParameter<bool>("endOfRunKills")),
0028         flagDeleteDatFiles_(pset.getUntrackedParameter<bool>("deleteDatFiles")),
0029         hltSel_(pset.getUntrackedParameter<std::vector<std::string>>("SelectEvents")) {
0030     setAcceptAllEvt();
0031     reset_();
0032   }
0033 
0034   DQMStreamerReader::~DQMStreamerReader() {
0035     // Sometimes(?) the destructor called after service registry was already destructed
0036     // and closeFile_ throws away no ServiceRegistry found exception...
0037     //
0038     // Normally, this file should be closed before this destructor is called.
0039     //closeFileImp_("destructor");
0040   }
0041 
0042   void DQMStreamerReader::reset_() {
0043     // We have to load at least a single header,
0044     // so the ProductRegistry gets initialized.
0045     //
0046     // This must happen here (inside the constructor),
0047     // as ProductRegistry gets frozen after we initialize:
0048     // https://cmssdt.cern.ch/SDT/lxr/source/FWCore/Framework/src/Schedule.cc#441
0049 
0050     fiterator_.logFileAction("Waiting for the first lumi in order to initialize.");
0051 
0052     fiterator_.update_state();
0053 
0054     // Fast-forward to the last open file.
0055     if (flagSkipFirstLumis_) {
0056       unsigned int l = fiterator_.lastLumiFound();
0057       if (l > 1) {
0058         fiterator_.advanceToLumi(l, "skipped: fast-forward to the latest lumi");
0059       }
0060     }
0061 
0062     for (;;) {
0063       bool next = prepareNextFile();
0064 
0065       // check for end of run
0066       if (!next) {
0067         fiterator_.logFileAction("End of run reached before DQMStreamerReader was initialised.");
0068         return;
0069       }
0070 
0071       // check if we have a file openned
0072       if (file_.open()) {
0073         // we are now initialised
0074         break;
0075       }
0076 
0077       // wait
0078       fiterator_.delay();
0079     }
0080 
0081     fiterator_.logFileAction("DQMStreamerReader initialised.");
0082   }
0083 
0084   void DQMStreamerReader::openFileImp_(const DQMFileIterator::LumiEntry& entry) {
0085     processedEventPerLs_ = 0;
0086 
0087     std::string path = entry.get_data_path();
0088 
0089     file_.lumi_ = entry;
0090     file_.streamFile_ = std::make_unique<edm::StreamerInputFile>(path);
0091 
0092     InitMsgView const* header = getHeaderMsg();
0093     if (isFirstFile_) {
0094       deserializeAndMergeWithRegistry(*header, false);
0095     }
0096 
0097     // dump the list of HLT trigger name from the header
0098     //  dumpInitHeader(header);
0099 
0100     // if specific trigger selection is requested, check if the requested triggers match with trigger paths in the header file
0101     if (!acceptAllEvt_) {
0102       std::vector<std::string> tnames;
0103       header->hltTriggerNames(tnames);
0104 
0105       triggerSelector_.reset(new TriggerSelector(hltSel_, tnames));
0106 
0107       // check if any trigger path name requested matches with trigger name in the header file
0108       setMatchTriggerSel(tnames);
0109     }
0110 
0111     // our initialization
0112     processedEventPerLs_ = 0;
0113 
0114     if (flagDeleteDatFiles_) {
0115       // unlink the file
0116       unlink(path.c_str());
0117     }
0118   }
0119 
0120   void DQMStreamerReader::genuineCloseFile() {}
0121 
0122   void DQMStreamerReader::closeFileImp_(const std::string& reason) {
0123     if (file_.open()) {
0124       file_.streamFile_->closeStreamerFile();
0125       file_.streamFile_ = nullptr;
0126 
0127       fiterator_.logLumiState(file_.lumi_, "close: " + reason);
0128     }
0129   }
0130 
0131   void DQMStreamerReader::genuineReadFile() {
0132     if (isFirstFile_) {
0133       //The file was already opened in the constructor
0134       isFirstFile_ = false;
0135       return;
0136     }
0137 
0138     //Get header/init from reader
0139     InitMsgView const* header = getHeaderMsg();
0140     deserializeAndMergeWithRegistry(*header, true);
0141   }
0142 
0143   bool DQMStreamerReader::openNextFileImp_() {
0144     closeFileImp_("skipping to another file");
0145 
0146     DQMFileIterator::LumiEntry currentLumi = fiterator_.open();
0147     std::string p = currentLumi.get_data_path();
0148 
0149     if (std::filesystem::exists(p)) {
0150       try {
0151         openFileImp_(currentLumi);
0152         return true;
0153       } catch (const cms::Exception& e) {
0154         fiterator_.logFileAction(std::string("Can't deserialize registry data (in open file): ") + e.what(), p);
0155         fiterator_.logLumiState(currentLumi, "error: data file corrupted");
0156 
0157         closeFileImp_("data file corrupted");
0158         return false;
0159       }
0160     } else {
0161       /* dat file missing */
0162       fiterator_.logFileAction("Data file (specified in json) is missing:", p);
0163       fiterator_.logLumiState(currentLumi, "error: data file missing");
0164 
0165       return false;
0166     }
0167   }
0168 
0169   InitMsgView const* DQMStreamerReader::getHeaderMsg() {
0170     InitMsgView const* header = file_.streamFile_->startMessage();
0171 
0172     if (header->code() != Header::INIT) {  // INIT Msg
0173       throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::readHeader")
0174           << "received wrong message type: expected INIT, got " << header->code() << "\n";
0175     }
0176 
0177     return header;
0178   }
0179 
0180   EventMsgView const* DQMStreamerReader::getEventMsg() {
0181     auto next = file_.streamFile_->next();
0182     if (edm::StreamerInputFile::Next::kFile == next) {
0183       return nullptr;
0184     }
0185 
0186     if (edm::StreamerInputFile::Next::kStop == next) {
0187       return nullptr;
0188     }
0189 
0190     EventMsgView const* msg = file_.streamFile_->currentRecord();
0191 
0192     //  if (msg != nullptr) dumpEventView(msg);
0193     return msg;
0194   }
0195 
0196   /**
0197  * Prepare (open) the next file for reading.
0198  * It is used by prepareNextEvent and in the constructor.
0199  *
0200  * Does not block/wait.
0201  *
0202  * Return false if this is end of run and/or no more file are available.
0203  * However, return of "true" does not imply the file has been openned,
0204  * but we need to wait until some future file becomes available.
0205  */
0206   bool DQMStreamerReader::prepareNextFile() {
0207     typedef DQMFileIterator::State State;
0208 
0209     for (;;) {
0210       fiterator_.update_state();
0211 
0212       if (edm::shutdown_flag.load()) {
0213         fiterator_.logFileAction("Shutdown flag was set, shutting down.");
0214 
0215         closeFileImp_("shutdown flag is set");
0216         return false;
0217       }
0218 
0219       // check for end of run file and force quit
0220       if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
0221         closeFileImp_("forced end-of-run");
0222         return false;
0223       }
0224 
0225       // check for end of run and quit if everything has been processed.
0226       // this clean exit
0227       if ((!file_.open()) && (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
0228         return false;
0229       }
0230 
0231       // if this is end of run and no more files to process
0232       // close it
0233       if ((processedEventPerLs_ >= minEventsPerLs_) && (!fiterator_.lumiReady()) &&
0234           (fiterator_.state() == State::EOR)) {
0235         closeFileImp_("graceful end-of-run");
0236         return false;
0237       }
0238 
0239       // skip to the next file if we have no files openned yet
0240       if (!file_.open()) {
0241         if (fiterator_.lumiReady()) {
0242           openNextFileImp_();
0243           // we might need to open once more (if .dat is missing)
0244           continue;
0245         }
0246       }
0247 
0248       // or if there is a next file and enough eventshas been processed.
0249       if (fiterator_.lumiReady() && (processedEventPerLs_ >= minEventsPerLs_)) {
0250         openNextFileImp_();
0251         // we might need to open once more (if .dat is missing)
0252         continue;
0253       }
0254 
0255       return true;
0256     }
0257   }
0258 
0259   /**
0260  * Waits and reads the event header.
0261  * If end-of-run nullptr is returned.
0262  */
0263   EventMsgView const* DQMStreamerReader::prepareNextEvent() {
0264     EventMsgView const* eview = nullptr;
0265     typedef DQMFileIterator::State State;
0266 
0267     // wait for the next event
0268     for (;;) {
0269       // edm::LogAbsolute("DQMStreamerReader")
0270       //     << "State loop.";
0271       bool next = prepareNextFile();
0272       if (!next)
0273         return nullptr;
0274 
0275       // sleep
0276       if (!file_.open()) {
0277         // the reader does not exist
0278         fiterator_.delay();
0279       } else {
0280         // our reader exists, try to read out an event
0281         eview = getEventMsg();
0282 
0283         if (eview == nullptr) {
0284           // read unsuccessful
0285           // this means end of file, so close the file
0286           closeFileImp_("eof");
0287         } else {
0288           if (!acceptEvent(eview)) {
0289             continue;
0290           } else {
0291             return eview;
0292           }
0293         }
0294       }
0295     }
0296     return eview;
0297   }
0298 
0299   /**
0300  * This is the actual code for checking the new event and/or deserializing it.
0301  */
0302   edm::RawInputSource::Next DQMStreamerReader::checkNext() {
0303     try {
0304       EventMsgView const* eview = prepareNextEvent();
0305       if (eview == nullptr) {
0306         if (file_.streamFile_ and file_.streamFile_->newHeader()) {
0307           return Next::kFile;
0308         }
0309         return Next::kStop;
0310       }
0311 
0312       deserializeEvent(*eview);
0313     } catch (const cms::Exception& e) {
0314       // try to recover from corrupted files/events
0315       fiterator_.logFileAction(std::string("Can't deserialize event or registry data: ") + e.what());
0316       closeFileImp_("data file corrupted");
0317 
0318       // this is not optimal, but hopefully we won't catch this many times in a row
0319       return checkNext();
0320     }
0321 
0322     processedEventPerLs_ += 1;
0323 
0324     return Next::kEvent;
0325   }
0326 
0327   /**
0328  * If hlt trigger selection is '*', return a boolean variable to accept all
0329  * events
0330  */
0331   bool DQMStreamerReader::setAcceptAllEvt() {
0332     acceptAllEvt_ = false;
0333     for (auto hltPath : hltSel_) {
0334       hltPath.erase(std::remove_if(hltPath.begin(), hltPath.end(), [](unsigned char c) { return std::isspace(c); }),
0335                     hltPath.end());
0336       if (hltPath == "*") {
0337         acceptAllEvt_ = true;
0338         break;
0339       }
0340     }
0341     return acceptAllEvt_;
0342   }
0343 
0344   /**
0345  * Check if hlt selection matches any trigger name taken from the header file
0346  */
0347   bool DQMStreamerReader::setMatchTriggerSel(std::vector<std::string> const& tnames) {
0348     matchTriggerSel_ = false;
0349     for (auto hltPath : hltSel_) {
0350       hltPath.erase(std::remove_if(hltPath.begin(), hltPath.end(), [](unsigned char c) { return std::isspace(c); }),
0351                     hltPath.end());
0352       auto const matches = edm::regexMatch(tnames, hltPath);
0353       if (not matches.empty()) {
0354         matchTriggerSel_ = true;
0355         break;
0356       }
0357     }
0358 
0359     if (not matchTriggerSel_) {
0360       edm::LogWarning("DQMStreamerReader") << "Trigger selection does not match any trigger path!!!";
0361     }
0362 
0363     return matchTriggerSel_;
0364   }
0365 
0366   /**
0367  * Check the trigger path to accept event
0368  */
0369   bool DQMStreamerReader::acceptEvent(const EventMsgView* evtmsg) {
0370     if (acceptAllEvt_)
0371       return true;
0372     if (!matchTriggerSel_)
0373       return false;
0374 
0375     std::vector<unsigned char> hltTriggerBits_;
0376     int hltTriggerCount_ = evtmsg->hltCount();
0377     if (hltTriggerCount_ > 0) {
0378       hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
0379     }
0380     evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
0381 
0382     return (triggerSelector_->wantAll() || triggerSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount()));
0383   }
0384 
0385   void DQMStreamerReader::skip(int toSkip) {
0386     try {
0387       for (int i = 0; i != toSkip; ++i) {
0388         EventMsgView const* evMsg = prepareNextEvent();
0389 
0390         if (evMsg == nullptr) {
0391           return;
0392         }
0393       }
0394     } catch (const cms::Exception& e) {
0395       // try to recover from corrupted files/events
0396       fiterator_.logFileAction(std::string("Can't deserialize event data: ") + e.what());
0397       closeFileImp_("data file corrupted");
0398     }
0399   }
0400 
0401   void DQMStreamerReader::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0402     edm::ParameterSetDescription desc;
0403     desc.setComment("Reads events from streamer files.");
0404 
0405     desc.addUntracked<std::vector<std::string>>("SelectEvents")->setComment("HLT path to select events");
0406 
0407     desc.addUntracked<int>("minEventsPerLumi", 1)
0408         ->setComment(
0409             "Minimum number of events to process per lumisection, "
0410             "before switching to a new input file. If the next file "
0411             "does not yet exist, "
0412             "the number of processed events will be bigger.");
0413 
0414     desc.addUntracked<bool>("skipFirstLumis", false)
0415         ->setComment(
0416             "Skip (and ignore the minEventsPerLumi parameter) for the files "
0417             "which have been available at the begining of the processing. "
0418             "If set to true, the reader will open last available file for "
0419             "processing.");
0420 
0421     desc.addUntracked<bool>("deleteDatFiles", false)
0422         ->setComment(
0423             "Delete data files after they have been closed, in order to "
0424             "save disk space.");
0425 
0426     desc.addUntracked<bool>("endOfRunKills", false)
0427         ->setComment(
0428             "Kill the processing as soon as the end-of-run file appears, even if "
0429             "there are/will be unprocessed lumisections.");
0430 
0431     // desc.addUntracked<unsigned int>("skipEvents", 0U)
0432     //    ->setComment("Skip the first 'skipEvents' events that otherwise would "
0433     //                 "have been processed.");
0434 
0435     // This next parameter is read in the base class, but its default value
0436     // depends on the derived class, so it is set here.
0437     desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
0438 
0439     DQMFileIterator::fillDescription(desc);
0440     edm::StreamerInputSource::fillDescription(desc);
0441     edm::EventSkipperByID::fillDescription(desc);
0442 
0443     descriptions.add("source", desc);
0444   }
0445 
0446 }  // namespace dqmservices
0447 
0448 #include "FWCore/Framework/interface/InputSourceMacros.h"
0449 #include "FWCore/Framework/interface/MakerMacros.h"
0450 
0451 typedef dqmservices::DQMStreamerReader DQMStreamerReader;
0452 DEFINE_FWK_INPUT_SOURCE(DQMStreamerReader);