Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-02-26 04:25:12

0001 #include "DQMStreamerReader.h"
0002 
0003 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0004 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0005 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0006 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0007 #include "FWCore/Sources/interface/EventSkipperByID.h"
0008 #include "FWCore/Utilities/interface/Exception.h"
0009 #include "FWCore/Utilities/interface/RegexMatch.h"
0010 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0011 #include "IOPool/Streamer/interface/DumpTools.h"
0012 
0013 #include <algorithm>
0014 #include <cctype>
0015 #include <cstdlib>
0016 #include <filesystem>
0017 #include <fstream>
0018 #include <memory>
0019 #include <queue>
0020 
0021 namespace dqmservices {
0022   using namespace edm::streamer;
0023 
0024   DQMStreamerReader::DQMStreamerReader(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0025       : StreamerInputSource(pset, desc),
0026         fiterator_(pset),
0027         minEventsPerLs_(pset.getUntrackedParameter<int>("minEventsPerLumi")),
0028         flagSkipFirstLumis_(pset.getUntrackedParameter<bool>("skipFirstLumis")),
0029         flagEndOfRunKills_(pset.getUntrackedParameter<bool>("endOfRunKills")),
0030         flagDeleteDatFiles_(pset.getUntrackedParameter<bool>("deleteDatFiles")),
0031         hltSel_(pset.getUntrackedParameter<std::vector<std::string>>("SelectEvents")),
0032         unitTest_(pset.getUntrackedParameter<bool>("unitTest", false)) {
0033     setAcceptAllEvt();
0034     reset_();
0035   }
0036 
0037   DQMStreamerReader::~DQMStreamerReader() {
0038     // Sometimes(?) the destructor called after service registry was already destructed
0039     // and closeFile_ throws away no ServiceRegistry found exception...
0040     //
0041     // Normally, this file should be closed before this destructor is called.
0042     //closeFileImp_("destructor");
0043   }
0044 
0045   void DQMStreamerReader::reset_() {
0046     // We have to load at least a single header,
0047     // so the ProductRegistry gets initialized.
0048     //
0049     // This must happen here (inside the constructor),
0050     // as ProductRegistry gets frozen after we initialize:
0051     // https://cmssdt.cern.ch/SDT/lxr/source/FWCore/Framework/src/Schedule.cc#441
0052 
0053     fiterator_.logFileAction("Waiting for the first lumi in order to initialize.");
0054 
0055     fiterator_.update_state();
0056 
0057     // Fast-forward to the last open file.
0058     if (flagSkipFirstLumis_) {
0059       unsigned int l = fiterator_.lastLumiFound();
0060       if (l > 1) {
0061         fiterator_.advanceToLumi(l, "skipped: fast-forward to the latest lumi");
0062       }
0063     }
0064 
0065     for (;;) {
0066       bool next = prepareNextFile();
0067 
0068       // check for end of run
0069       if (!next) {
0070         fiterator_.logFileAction("End of run reached before DQMStreamerReader was initialised.");
0071         return;
0072       }
0073 
0074       // check if we have a file openned
0075       if (file_.open()) {
0076         // we are now initialised
0077         break;
0078       }
0079 
0080       // wait
0081       fiterator_.delay();
0082     }
0083 
0084     fiterator_.logFileAction("DQMStreamerReader initialised.");
0085   }
0086 
0087   void DQMStreamerReader::setupMetaData(edm::streamer::InitMsgView const& msg, bool subsequent) {
0088     deserializeAndMergeWithRegistry(msg, subsequent);
0089     auto event = getEventMsg();
0090     //file might be empty
0091     if (not event)
0092       return;
0093     assert(event->isEventMetaData());
0094     deserializeEventMetaData(*event);
0095     updateEventMetaData();
0096   }
0097   void DQMStreamerReader::openFileImp_(const DQMFileIterator::LumiEntry& entry) {
0098     processedEventPerLs_ = 0;
0099 
0100     std::string path = entry.get_data_path();
0101 
0102     file_.lumi_ = entry;
0103     file_.streamFile_ = std::make_unique<StreamerInputFile>(path);
0104 
0105     InitMsgView const* header = getHeaderMsg();
0106     if (isFirstFile_) {
0107       setupMetaData(*header, false);
0108     }
0109 
0110     // dump the list of HLT trigger name from the header
0111     //  dumpInitHeader(header);
0112 
0113     // if specific trigger selection is requested, check if the requested triggers match with trigger paths in the header file
0114     if (!acceptAllEvt_) {
0115       std::vector<std::string> tnames;
0116       header->hltTriggerNames(tnames);
0117 
0118       triggerSelector_ = std::make_shared<TriggerSelector>(hltSel_, tnames);
0119 
0120       // check if any trigger path name requested matches with trigger name in the header file
0121       setMatchTriggerSel(tnames);
0122     }
0123 
0124     // our initialization
0125     processedEventPerLs_ = 0;
0126 
0127     if (flagDeleteDatFiles_) {
0128       // unlink the file
0129       unlink(path.c_str());
0130     }
0131   }
0132 
0133   void DQMStreamerReader::genuineCloseFile() {}
0134 
0135   void DQMStreamerReader::closeFileImp_(const std::string& reason) {
0136     if (file_.open()) {
0137       file_.streamFile_->closeStreamerFile();
0138       file_.streamFile_ = nullptr;
0139 
0140       fiterator_.logLumiState(file_.lumi_, "close: " + reason);
0141     }
0142   }
0143 
0144   void DQMStreamerReader::genuineReadFile() {
0145     if (isFirstFile_) {
0146       //The file was already opened in the constructor
0147       isFirstFile_ = false;
0148       return;
0149     }
0150 
0151     if (artificialFileBoundary_) {
0152       updateEventMetaData();
0153       artificialFileBoundary_ = false;
0154       return;
0155     }
0156     //Get header/init from reader
0157     InitMsgView const* header = getHeaderMsg();
0158     setupMetaData(*header, true);
0159   }
0160 
0161   bool DQMStreamerReader::openNextFileImp_() {
0162     closeFileImp_("skipping to another file");
0163 
0164     DQMFileIterator::LumiEntry currentLumi = fiterator_.open();
0165     std::string p = currentLumi.get_data_path();
0166 
0167     if (std::filesystem::exists(p)) {
0168       try {
0169         openFileImp_(currentLumi);
0170         return true;
0171       } catch (const cms::Exception& e) {
0172         if (unitTest_) {
0173           throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::openNextFileInp")
0174               << std::string("Can't deserialize registry data (in open file): ") + e.what()
0175               << "\n error: data file corrupted, rethrowing!";
0176         }
0177 
0178         fiterator_.logFileAction(std::string("Can't deserialize registry data (in open file): ") + e.what(), p);
0179         fiterator_.logLumiState(currentLumi, "error: data file corrupted");
0180 
0181         closeFileImp_("data file corrupted");
0182         return false;
0183       }
0184     } else {
0185       /* dat file missing */
0186       fiterator_.logFileAction("Data file (specified in json) is missing:", p);
0187       fiterator_.logLumiState(currentLumi, "error: data file missing");
0188 
0189       return false;
0190     }
0191   }
0192 
0193   InitMsgView const* DQMStreamerReader::getHeaderMsg() {
0194     InitMsgView const* header = file_.streamFile_->startMessage();
0195 
0196     if (header->code() != Header::INIT) {  // INIT Msg
0197       throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::readHeader")
0198           << "received wrong message type: expected INIT, got " << header->code() << "\n";
0199     }
0200 
0201     return header;
0202   }
0203 
0204   EventMsgView const* DQMStreamerReader::getEventMsg() {
0205     auto next = file_.streamFile_->next();
0206     if (StreamerInputFile::Next::kFile == next) {
0207       return nullptr;
0208     }
0209 
0210     if (StreamerInputFile::Next::kStop == next) {
0211       return nullptr;
0212     }
0213 
0214     EventMsgView const* msg = file_.streamFile_->currentRecord();
0215 
0216     //  if (msg != nullptr) dumpEventView(msg);
0217     return msg;
0218   }
0219 
0220   /**
0221  * Prepare (open) the next file for reading.
0222  * It is used by prepareNextEvent and in the constructor.
0223  *
0224  * Does not block/wait.
0225  *
0226  * Return false if this is end of run and/or no more file are available.
0227  * However, return of "true" does not imply the file has been openned,
0228  * but we need to wait until some future file becomes available.
0229  */
0230   bool DQMStreamerReader::prepareNextFile() {
0231     typedef DQMFileIterator::State State;
0232 
0233     for (;;) {
0234       fiterator_.update_state();
0235 
0236       if (edm::shutdown_flag.load()) {
0237         fiterator_.logFileAction("Shutdown flag was set, shutting down.");
0238 
0239         closeFileImp_("shutdown flag is set");
0240         return false;
0241       }
0242 
0243       // check for end of run file and force quit
0244       if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
0245         closeFileImp_("forced end-of-run");
0246         return false;
0247       }
0248 
0249       // check for end of run and quit if everything has been processed.
0250       // this clean exit
0251       if ((!file_.open()) && (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
0252         return false;
0253       }
0254 
0255       // if this is end of run and no more files to process
0256       // close it
0257       if ((processedEventPerLs_ >= minEventsPerLs_) && (!fiterator_.lumiReady()) &&
0258           (fiterator_.state() == State::EOR)) {
0259         closeFileImp_("graceful end-of-run");
0260         return false;
0261       }
0262 
0263       // skip to the next file if we have no files openned yet
0264       if (!file_.open()) {
0265         if (fiterator_.lumiReady()) {
0266           openNextFileImp_();
0267           // we might need to open once more (if .dat is missing)
0268           continue;
0269         }
0270       }
0271 
0272       // or if there is a next file and enough eventshas been processed.
0273       if (fiterator_.lumiReady() && (processedEventPerLs_ >= minEventsPerLs_)) {
0274         openNextFileImp_();
0275         // we might need to open once more (if .dat is missing)
0276         continue;
0277       }
0278 
0279       return true;
0280     }
0281   }
0282 
0283   /**
0284  * Waits and reads the event header.
0285  * If end-of-run nullptr is returned.
0286  */
0287   EventMsgView const* DQMStreamerReader::prepareNextEvent() {
0288     EventMsgView const* eview = nullptr;
0289     typedef DQMFileIterator::State State;
0290 
0291     // wait for the next event
0292     for (;;) {
0293       // edm::LogAbsolute("DQMStreamerReader")
0294       //     << "State loop.";
0295       bool next = prepareNextFile();
0296       if (!next)
0297         return nullptr;
0298 
0299       // sleep
0300       if (!file_.open()) {
0301         // the reader does not exist
0302         fiterator_.delay();
0303       } else {
0304         // our reader exists, try to read out an event
0305         eview = getEventMsg();
0306 
0307         if (eview == nullptr) {
0308           // read unsuccessful
0309           // this means end of file, so close the file
0310           closeFileImp_("eof");
0311         } else {
0312           //NOTE: at this point need to see if meta data checksum changed. If it did
0313           // we need to issue a 'new File' transition
0314           if (eview->isEventMetaData()) {
0315             auto lastEventMetaData = presentEventMetaDataChecksum();
0316             if (eventMetaDataChecksum(*eview) != lastEventMetaData) {
0317               deserializeEventMetaData(*eview);
0318               artificialFileBoundary_ = true;
0319               return nullptr;
0320             } else {
0321               //skipping
0322               eview = getEventMsg();
0323               assert((eview == nullptr) or (not eview->isEventMetaData()));
0324               if (eview == nullptr) {
0325                 closeFileImp_("eof");
0326                 continue;
0327               }
0328             }
0329           }
0330 
0331           if (!acceptEvent(eview)) {
0332             continue;
0333           } else {
0334             return eview;
0335           }
0336         }
0337       }
0338     }
0339     return eview;
0340   }
0341 
0342   /**
0343  * This is the actual code for checking the new event and/or deserializing it.
0344  */
0345   edm::RawInputSource::Next DQMStreamerReader::checkNext() {
0346     try {
0347       EventMsgView const* eview = prepareNextEvent();
0348       if (eview == nullptr) {
0349         if (artificialFileBoundary_ or (file_.streamFile_ and file_.streamFile_->newHeader())) {
0350           return Next::kFile;
0351         }
0352         return Next::kStop;
0353       }
0354 
0355       deserializeEvent(*eview);
0356     } catch (const cms::Exception& e) {
0357       if (unitTest_) {
0358         throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::checkNext")
0359             << std::string("Can't deserialize event or registry data: ") + e.what()
0360             << "\n error: data file corrupted, rethrowing!";
0361       }
0362 
0363       // try to recover from corrupted files/events
0364       fiterator_.logFileAction(std::string("Can't deserialize event or registry data: ") + e.what());
0365       closeFileImp_("data file corrupted");
0366 
0367       // this is not optimal, but hopefully we won't catch this many times in a row
0368       return checkNext();
0369     }
0370 
0371     processedEventPerLs_ += 1;
0372 
0373     return Next::kEvent;
0374   }
0375 
0376   /**
0377  * If hlt trigger selection is '*', return a boolean variable to accept all
0378  * events
0379  */
0380   bool DQMStreamerReader::setAcceptAllEvt() {
0381     acceptAllEvt_ = false;
0382     for (auto hltPath : hltSel_) {
0383       hltPath.erase(std::remove_if(hltPath.begin(), hltPath.end(), [](unsigned char c) { return std::isspace(c); }),
0384                     hltPath.end());
0385       if (hltPath == "*") {
0386         acceptAllEvt_ = true;
0387         break;
0388       }
0389     }
0390     return acceptAllEvt_;
0391   }
0392 
0393   /**
0394  * Check if hlt selection matches any trigger name taken from the header file
0395  */
0396   bool DQMStreamerReader::setMatchTriggerSel(std::vector<std::string> const& tnames) {
0397     matchTriggerSel_ = false;
0398     for (auto hltPath : hltSel_) {
0399       hltPath.erase(std::remove_if(hltPath.begin(), hltPath.end(), [](unsigned char c) { return std::isspace(c); }),
0400                     hltPath.end());
0401       auto const matches = edm::regexMatch(tnames, hltPath);
0402       if (not matches.empty()) {
0403         matchTriggerSel_ = true;
0404         break;
0405       }
0406     }
0407 
0408     if (not matchTriggerSel_) {
0409       edm::LogWarning("DQMStreamerReader") << "Trigger selection does not match any trigger path!!!";
0410     }
0411 
0412     return matchTriggerSel_;
0413   }
0414 
0415   /**
0416  * Check the trigger path to accept event
0417  */
0418   bool DQMStreamerReader::acceptEvent(const EventMsgView* evtmsg) {
0419     if (acceptAllEvt_)
0420       return true;
0421     if (!matchTriggerSel_)
0422       return false;
0423 
0424     std::vector<unsigned char> hltTriggerBits_;
0425     int hltTriggerCount_ = evtmsg->hltCount();
0426     if (hltTriggerCount_ > 0) {
0427       hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
0428     }
0429     evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
0430 
0431     return (triggerSelector_->wantAll() || triggerSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount()));
0432   }
0433 
0434   void DQMStreamerReader::skip(int toSkip) {
0435     try {
0436       for (int i = 0; i != toSkip; ++i) {
0437         EventMsgView const* evMsg = prepareNextEvent();
0438 
0439         if (evMsg == nullptr) {
0440           return;
0441         }
0442       }
0443     } catch (const cms::Exception& e) {
0444       if (unitTest_) {
0445         throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::skip")
0446             << std::string("Can't deserialize registry data: ") + e.what()
0447             << "\n error: data file corrupted, rethrowing!";
0448       }
0449       // try to recover from corrupted files/events
0450       fiterator_.logFileAction(std::string("Can't deserialize event data: ") + e.what());
0451       closeFileImp_("data file corrupted");
0452     }
0453   }
0454 
0455   void DQMStreamerReader::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0456     edm::ParameterSetDescription desc;
0457     desc.setComment("Reads events from streamer files.");
0458 
0459     desc.addUntracked<std::vector<std::string>>("SelectEvents")->setComment("HLT path to select events");
0460 
0461     desc.addUntracked<int>("minEventsPerLumi", 1)
0462         ->setComment(
0463             "Minimum number of events to process per lumisection, "
0464             "before switching to a new input file. If the next file "
0465             "does not yet exist, "
0466             "the number of processed events will be bigger.");
0467 
0468     desc.addUntracked<bool>("skipFirstLumis", false)
0469         ->setComment(
0470             "Skip (and ignore the minEventsPerLumi parameter) for the files "
0471             "which have been available at the begining of the processing. "
0472             "If set to true, the reader will open last available file for "
0473             "processing.");
0474 
0475     desc.addUntracked<bool>("deleteDatFiles", false)
0476         ->setComment(
0477             "Delete data files after they have been closed, in order to "
0478             "save disk space.");
0479 
0480     desc.addUntracked<bool>("endOfRunKills", false)
0481         ->setComment(
0482             "Kill the processing as soon as the end-of-run file appears, even if "
0483             "there are/will be unprocessed lumisections.");
0484 
0485     desc.addUntracked<bool>("unitTest", false)
0486         ->setComment("Kill the processing if the input data cannot be deserialized");
0487 
0488     // desc.addUntracked<unsigned int>("skipEvents", 0U)
0489     //    ->setComment("Skip the first 'skipEvents' events that otherwise would "
0490     //                 "have been processed.");
0491 
0492     // This next parameter is read in the base class, but its default value
0493     // depends on the derived class, so it is set here.
0494     desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
0495 
0496     DQMFileIterator::fillDescription(desc);
0497     StreamerInputSource::fillDescription(desc);
0498     edm::EventSkipperByID::fillDescription(desc);
0499 
0500     descriptions.add("source", desc);
0501   }
0502 
0503 }  // namespace dqmservices
0504 
0505 #include "FWCore/Framework/interface/InputSourceMacros.h"
0506 #include "FWCore/Framework/interface/MakerMacros.h"
0507 
0508 typedef dqmservices::DQMStreamerReader DQMStreamerReader;
0509 DEFINE_FWK_INPUT_SOURCE(DQMStreamerReader);