Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-06-22 02:23:36

0001 #include "DQMStreamerReader.h"
0002 
0003 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0004 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0005 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0006 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0007 #include "FWCore/Sources/interface/EventSkipperByID.h"
0008 #include "FWCore/Utilities/interface/Exception.h"
0009 #include "FWCore/Utilities/interface/RegexMatch.h"
0010 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0011 #include "IOPool/Streamer/interface/DumpTools.h"
0012 
0013 #include <cstdlib>
0014 #include <filesystem>
0015 #include <fstream>
0016 #include <queue>
0017 #include <algorithm>
0018 #include <cctype>
0019 
0020 namespace dqmservices {
0021   using namespace edm::streamer;
0022 
0023   DQMStreamerReader::DQMStreamerReader(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0024       : StreamerInputSource(pset, desc),
0025         fiterator_(pset),
0026         minEventsPerLs_(pset.getUntrackedParameter<int>("minEventsPerLumi")),
0027         flagSkipFirstLumis_(pset.getUntrackedParameter<bool>("skipFirstLumis")),
0028         flagEndOfRunKills_(pset.getUntrackedParameter<bool>("endOfRunKills")),
0029         flagDeleteDatFiles_(pset.getUntrackedParameter<bool>("deleteDatFiles")),
0030         hltSel_(pset.getUntrackedParameter<std::vector<std::string>>("SelectEvents")),
0031         unitTest_(pset.getUntrackedParameter<bool>("unitTest", false)) {
0032     setAcceptAllEvt();
0033     reset_();
0034   }
0035 
0036   DQMStreamerReader::~DQMStreamerReader() {
0037     // Sometimes(?) the destructor called after service registry was already destructed
0038     // and closeFile_ throws away no ServiceRegistry found exception...
0039     //
0040     // Normally, this file should be closed before this destructor is called.
0041     //closeFileImp_("destructor");
0042   }
0043 
0044   void DQMStreamerReader::reset_() {
0045     // We have to load at least a single header,
0046     // so the ProductRegistry gets initialized.
0047     //
0048     // This must happen here (inside the constructor),
0049     // as ProductRegistry gets frozen after we initialize:
0050     // https://cmssdt.cern.ch/SDT/lxr/source/FWCore/Framework/src/Schedule.cc#441
0051 
0052     fiterator_.logFileAction("Waiting for the first lumi in order to initialize.");
0053 
0054     fiterator_.update_state();
0055 
0056     // Fast-forward to the last open file.
0057     if (flagSkipFirstLumis_) {
0058       unsigned int l = fiterator_.lastLumiFound();
0059       if (l > 1) {
0060         fiterator_.advanceToLumi(l, "skipped: fast-forward to the latest lumi");
0061       }
0062     }
0063 
0064     for (;;) {
0065       bool next = prepareNextFile();
0066 
0067       // check for end of run
0068       if (!next) {
0069         fiterator_.logFileAction("End of run reached before DQMStreamerReader was initialised.");
0070         return;
0071       }
0072 
0073       // check if we have a file openned
0074       if (file_.open()) {
0075         // we are now initialised
0076         break;
0077       }
0078 
0079       // wait
0080       fiterator_.delay();
0081     }
0082 
0083     fiterator_.logFileAction("DQMStreamerReader initialised.");
0084   }
0085 
0086   void DQMStreamerReader::setupMetaData(edm::streamer::InitMsgView const& msg, bool subsequent) {
0087     deserializeAndMergeWithRegistry(msg, subsequent);
0088     auto event = getEventMsg();
0089     //file might be empty
0090     if (not event)
0091       return;
0092     assert(event->isEventMetaData());
0093     deserializeEventMetaData(*event);
0094     updateEventMetaData();
0095   }
0096   void DQMStreamerReader::openFileImp_(const DQMFileIterator::LumiEntry& entry) {
0097     processedEventPerLs_ = 0;
0098 
0099     std::string path = entry.get_data_path();
0100 
0101     file_.lumi_ = entry;
0102     file_.streamFile_ = std::make_unique<StreamerInputFile>(path);
0103 
0104     InitMsgView const* header = getHeaderMsg();
0105     if (isFirstFile_) {
0106       setupMetaData(*header, false);
0107     }
0108 
0109     // dump the list of HLT trigger name from the header
0110     //  dumpInitHeader(header);
0111 
0112     // if specific trigger selection is requested, check if the requested triggers match with trigger paths in the header file
0113     if (!acceptAllEvt_) {
0114       std::vector<std::string> tnames;
0115       header->hltTriggerNames(tnames);
0116 
0117       triggerSelector_.reset(new TriggerSelector(hltSel_, tnames));
0118 
0119       // check if any trigger path name requested matches with trigger name in the header file
0120       setMatchTriggerSel(tnames);
0121     }
0122 
0123     // our initialization
0124     processedEventPerLs_ = 0;
0125 
0126     if (flagDeleteDatFiles_) {
0127       // unlink the file
0128       unlink(path.c_str());
0129     }
0130   }
0131 
0132   void DQMStreamerReader::genuineCloseFile() {}
0133 
0134   void DQMStreamerReader::closeFileImp_(const std::string& reason) {
0135     if (file_.open()) {
0136       file_.streamFile_->closeStreamerFile();
0137       file_.streamFile_ = nullptr;
0138 
0139       fiterator_.logLumiState(file_.lumi_, "close: " + reason);
0140     }
0141   }
0142 
0143   void DQMStreamerReader::genuineReadFile() {
0144     if (isFirstFile_) {
0145       //The file was already opened in the constructor
0146       isFirstFile_ = false;
0147       return;
0148     }
0149 
0150     if (artificialFileBoundary_) {
0151       updateEventMetaData();
0152       artificialFileBoundary_ = false;
0153       return;
0154     }
0155     //Get header/init from reader
0156     InitMsgView const* header = getHeaderMsg();
0157     setupMetaData(*header, true);
0158   }
0159 
0160   bool DQMStreamerReader::openNextFileImp_() {
0161     closeFileImp_("skipping to another file");
0162 
0163     DQMFileIterator::LumiEntry currentLumi = fiterator_.open();
0164     std::string p = currentLumi.get_data_path();
0165 
0166     if (std::filesystem::exists(p)) {
0167       try {
0168         openFileImp_(currentLumi);
0169         return true;
0170       } catch (const cms::Exception& e) {
0171         if (unitTest_) {
0172           throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::openNextFileInp")
0173               << std::string("Can't deserialize registry data (in open file): ") + e.what()
0174               << "\n error: data file corrupted";
0175         }
0176 
0177         fiterator_.logFileAction(std::string("Can't deserialize registry data (in open file): ") + e.what(), p);
0178         fiterator_.logLumiState(currentLumi, "error: data file corrupted");
0179 
0180         closeFileImp_("data file corrupted");
0181         return false;
0182       }
0183     } else {
0184       /* dat file missing */
0185       fiterator_.logFileAction("Data file (specified in json) is missing:", p);
0186       fiterator_.logLumiState(currentLumi, "error: data file missing");
0187 
0188       return false;
0189     }
0190   }
0191 
0192   InitMsgView const* DQMStreamerReader::getHeaderMsg() {
0193     InitMsgView const* header = file_.streamFile_->startMessage();
0194 
0195     if (header->code() != Header::INIT) {  // INIT Msg
0196       throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::readHeader")
0197           << "received wrong message type: expected INIT, got " << header->code() << "\n";
0198     }
0199 
0200     return header;
0201   }
0202 
0203   EventMsgView const* DQMStreamerReader::getEventMsg() {
0204     auto next = file_.streamFile_->next();
0205     if (StreamerInputFile::Next::kFile == next) {
0206       return nullptr;
0207     }
0208 
0209     if (StreamerInputFile::Next::kStop == next) {
0210       return nullptr;
0211     }
0212 
0213     EventMsgView const* msg = file_.streamFile_->currentRecord();
0214 
0215     //  if (msg != nullptr) dumpEventView(msg);
0216     return msg;
0217   }
0218 
0219   /**
0220  * Prepare (open) the next file for reading.
0221  * It is used by prepareNextEvent and in the constructor.
0222  *
0223  * Does not block/wait.
0224  *
0225  * Return false if this is end of run and/or no more file are available.
0226  * However, return of "true" does not imply the file has been openned,
0227  * but we need to wait until some future file becomes available.
0228  */
0229   bool DQMStreamerReader::prepareNextFile() {
0230     typedef DQMFileIterator::State State;
0231 
0232     for (;;) {
0233       fiterator_.update_state();
0234 
0235       if (edm::shutdown_flag.load()) {
0236         fiterator_.logFileAction("Shutdown flag was set, shutting down.");
0237 
0238         closeFileImp_("shutdown flag is set");
0239         return false;
0240       }
0241 
0242       // check for end of run file and force quit
0243       if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
0244         closeFileImp_("forced end-of-run");
0245         return false;
0246       }
0247 
0248       // check for end of run and quit if everything has been processed.
0249       // this clean exit
0250       if ((!file_.open()) && (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
0251         return false;
0252       }
0253 
0254       // if this is end of run and no more files to process
0255       // close it
0256       if ((processedEventPerLs_ >= minEventsPerLs_) && (!fiterator_.lumiReady()) &&
0257           (fiterator_.state() == State::EOR)) {
0258         closeFileImp_("graceful end-of-run");
0259         return false;
0260       }
0261 
0262       // skip to the next file if we have no files openned yet
0263       if (!file_.open()) {
0264         if (fiterator_.lumiReady()) {
0265           openNextFileImp_();
0266           // we might need to open once more (if .dat is missing)
0267           continue;
0268         }
0269       }
0270 
0271       // or if there is a next file and enough eventshas been processed.
0272       if (fiterator_.lumiReady() && (processedEventPerLs_ >= minEventsPerLs_)) {
0273         openNextFileImp_();
0274         // we might need to open once more (if .dat is missing)
0275         continue;
0276       }
0277 
0278       return true;
0279     }
0280   }
0281 
0282   /**
0283  * Waits and reads the event header.
0284  * If end-of-run nullptr is returned.
0285  */
0286   EventMsgView const* DQMStreamerReader::prepareNextEvent() {
0287     EventMsgView const* eview = nullptr;
0288     typedef DQMFileIterator::State State;
0289 
0290     // wait for the next event
0291     for (;;) {
0292       // edm::LogAbsolute("DQMStreamerReader")
0293       //     << "State loop.";
0294       bool next = prepareNextFile();
0295       if (!next)
0296         return nullptr;
0297 
0298       // sleep
0299       if (!file_.open()) {
0300         // the reader does not exist
0301         fiterator_.delay();
0302       } else {
0303         // our reader exists, try to read out an event
0304         eview = getEventMsg();
0305 
0306         if (eview == nullptr) {
0307           // read unsuccessful
0308           // this means end of file, so close the file
0309           closeFileImp_("eof");
0310         } else {
0311           //NOTE: at this point need to see if meta data checksum changed. If it did
0312           // we need to issue a 'new File' transition
0313           if (eview->isEventMetaData()) {
0314             auto lastEventMetaData = presentEventMetaDataChecksum();
0315             if (eventMetaDataChecksum(*eview) != lastEventMetaData) {
0316               deserializeEventMetaData(*eview);
0317               artificialFileBoundary_ = true;
0318               return nullptr;
0319             } else {
0320               //skipping
0321               eview = getEventMsg();
0322               assert((eview == nullptr) or (not eview->isEventMetaData()));
0323               if (eview == nullptr) {
0324                 closeFileImp_("eof");
0325                 continue;
0326               }
0327             }
0328           }
0329 
0330           if (!acceptEvent(eview)) {
0331             continue;
0332           } else {
0333             return eview;
0334           }
0335         }
0336       }
0337     }
0338     return eview;
0339   }
0340 
0341   /**
0342  * This is the actual code for checking the new event and/or deserializing it.
0343  */
0344   edm::RawInputSource::Next DQMStreamerReader::checkNext() {
0345     try {
0346       EventMsgView const* eview = prepareNextEvent();
0347       if (eview == nullptr) {
0348         if (artificialFileBoundary_ or (file_.streamFile_ and file_.streamFile_->newHeader())) {
0349           return Next::kFile;
0350         }
0351         return Next::kStop;
0352       }
0353 
0354       deserializeEvent(*eview);
0355     } catch (const cms::Exception& e) {
0356       // try to recover from corrupted files/events
0357       fiterator_.logFileAction(std::string("Can't deserialize event or registry data: ") + e.what());
0358       closeFileImp_("data file corrupted");
0359 
0360       // this is not optimal, but hopefully we won't catch this many times in a row
0361       return checkNext();
0362     }
0363 
0364     processedEventPerLs_ += 1;
0365 
0366     return Next::kEvent;
0367   }
0368 
0369   /**
0370  * If hlt trigger selection is '*', return a boolean variable to accept all
0371  * events
0372  */
0373   bool DQMStreamerReader::setAcceptAllEvt() {
0374     acceptAllEvt_ = false;
0375     for (auto hltPath : hltSel_) {
0376       hltPath.erase(std::remove_if(hltPath.begin(), hltPath.end(), [](unsigned char c) { return std::isspace(c); }),
0377                     hltPath.end());
0378       if (hltPath == "*") {
0379         acceptAllEvt_ = true;
0380         break;
0381       }
0382     }
0383     return acceptAllEvt_;
0384   }
0385 
0386   /**
0387  * Check if hlt selection matches any trigger name taken from the header file
0388  */
0389   bool DQMStreamerReader::setMatchTriggerSel(std::vector<std::string> const& tnames) {
0390     matchTriggerSel_ = false;
0391     for (auto hltPath : hltSel_) {
0392       hltPath.erase(std::remove_if(hltPath.begin(), hltPath.end(), [](unsigned char c) { return std::isspace(c); }),
0393                     hltPath.end());
0394       auto const matches = edm::regexMatch(tnames, hltPath);
0395       if (not matches.empty()) {
0396         matchTriggerSel_ = true;
0397         break;
0398       }
0399     }
0400 
0401     if (not matchTriggerSel_) {
0402       edm::LogWarning("DQMStreamerReader") << "Trigger selection does not match any trigger path!!!";
0403     }
0404 
0405     return matchTriggerSel_;
0406   }
0407 
0408   /**
0409  * Check the trigger path to accept event
0410  */
0411   bool DQMStreamerReader::acceptEvent(const EventMsgView* evtmsg) {
0412     if (acceptAllEvt_)
0413       return true;
0414     if (!matchTriggerSel_)
0415       return false;
0416 
0417     std::vector<unsigned char> hltTriggerBits_;
0418     int hltTriggerCount_ = evtmsg->hltCount();
0419     if (hltTriggerCount_ > 0) {
0420       hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
0421     }
0422     evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
0423 
0424     return (triggerSelector_->wantAll() || triggerSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount()));
0425   }
0426 
0427   void DQMStreamerReader::skip(int toSkip) {
0428     try {
0429       for (int i = 0; i != toSkip; ++i) {
0430         EventMsgView const* evMsg = prepareNextEvent();
0431 
0432         if (evMsg == nullptr) {
0433           return;
0434         }
0435       }
0436     } catch (const cms::Exception& e) {
0437       // try to recover from corrupted files/events
0438       fiterator_.logFileAction(std::string("Can't deserialize event data: ") + e.what());
0439       closeFileImp_("data file corrupted");
0440     }
0441   }
0442 
0443   void DQMStreamerReader::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0444     edm::ParameterSetDescription desc;
0445     desc.setComment("Reads events from streamer files.");
0446 
0447     desc.addUntracked<std::vector<std::string>>("SelectEvents")->setComment("HLT path to select events");
0448 
0449     desc.addUntracked<int>("minEventsPerLumi", 1)
0450         ->setComment(
0451             "Minimum number of events to process per lumisection, "
0452             "before switching to a new input file. If the next file "
0453             "does not yet exist, "
0454             "the number of processed events will be bigger.");
0455 
0456     desc.addUntracked<bool>("skipFirstLumis", false)
0457         ->setComment(
0458             "Skip (and ignore the minEventsPerLumi parameter) for the files "
0459             "which have been available at the begining of the processing. "
0460             "If set to true, the reader will open last available file for "
0461             "processing.");
0462 
0463     desc.addUntracked<bool>("deleteDatFiles", false)
0464         ->setComment(
0465             "Delete data files after they have been closed, in order to "
0466             "save disk space.");
0467 
0468     desc.addUntracked<bool>("endOfRunKills", false)
0469         ->setComment(
0470             "Kill the processing as soon as the end-of-run file appears, even if "
0471             "there are/will be unprocessed lumisections.");
0472 
0473     desc.addUntracked<bool>("unitTest", false)
0474         ->setComment("Kill the processing if the input data cannot be deserialized");
0475 
0476     // desc.addUntracked<unsigned int>("skipEvents", 0U)
0477     //    ->setComment("Skip the first 'skipEvents' events that otherwise would "
0478     //                 "have been processed.");
0479 
0480     // This next parameter is read in the base class, but its default value
0481     // depends on the derived class, so it is set here.
0482     desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
0483 
0484     DQMFileIterator::fillDescription(desc);
0485     StreamerInputSource::fillDescription(desc);
0486     edm::EventSkipperByID::fillDescription(desc);
0487 
0488     descriptions.add("source", desc);
0489   }
0490 
0491 }  // namespace dqmservices
0492 
0493 #include "FWCore/Framework/interface/InputSourceMacros.h"
0494 #include "FWCore/Framework/interface/MakerMacros.h"
0495 
0496 typedef dqmservices::DQMStreamerReader DQMStreamerReader;
0497 DEFINE_FWK_INPUT_SOURCE(DQMStreamerReader);