File indexing completed on 2024-06-22 02:23:36
0001 #include "DQMStreamerReader.h"
0002
0003 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0004 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0005 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0006 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0007 #include "FWCore/Sources/interface/EventSkipperByID.h"
0008 #include "FWCore/Utilities/interface/Exception.h"
0009 #include "FWCore/Utilities/interface/RegexMatch.h"
0010 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0011 #include "IOPool/Streamer/interface/DumpTools.h"
0012
0013 #include <cstdlib>
0014 #include <filesystem>
0015 #include <fstream>
0016 #include <queue>
0017 #include <algorithm>
0018 #include <cctype>
0019
0020 namespace dqmservices {
0021 using namespace edm::streamer;
0022
0023 DQMStreamerReader::DQMStreamerReader(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0024 : StreamerInputSource(pset, desc),
0025 fiterator_(pset),
0026 minEventsPerLs_(pset.getUntrackedParameter<int>("minEventsPerLumi")),
0027 flagSkipFirstLumis_(pset.getUntrackedParameter<bool>("skipFirstLumis")),
0028 flagEndOfRunKills_(pset.getUntrackedParameter<bool>("endOfRunKills")),
0029 flagDeleteDatFiles_(pset.getUntrackedParameter<bool>("deleteDatFiles")),
0030 hltSel_(pset.getUntrackedParameter<std::vector<std::string>>("SelectEvents")),
0031 unitTest_(pset.getUntrackedParameter<bool>("unitTest", false)) {
0032 setAcceptAllEvt();
0033 reset_();
0034 }
0035
0036 DQMStreamerReader::~DQMStreamerReader() {
0037
0038
0039
0040
0041
0042 }
0043
0044 void DQMStreamerReader::reset_() {
0045
0046
0047
0048
0049
0050
0051
0052 fiterator_.logFileAction("Waiting for the first lumi in order to initialize.");
0053
0054 fiterator_.update_state();
0055
0056
0057 if (flagSkipFirstLumis_) {
0058 unsigned int l = fiterator_.lastLumiFound();
0059 if (l > 1) {
0060 fiterator_.advanceToLumi(l, "skipped: fast-forward to the latest lumi");
0061 }
0062 }
0063
0064 for (;;) {
0065 bool next = prepareNextFile();
0066
0067
0068 if (!next) {
0069 fiterator_.logFileAction("End of run reached before DQMStreamerReader was initialised.");
0070 return;
0071 }
0072
0073
0074 if (file_.open()) {
0075
0076 break;
0077 }
0078
0079
0080 fiterator_.delay();
0081 }
0082
0083 fiterator_.logFileAction("DQMStreamerReader initialised.");
0084 }
0085
0086 void DQMStreamerReader::setupMetaData(edm::streamer::InitMsgView const& msg, bool subsequent) {
0087 deserializeAndMergeWithRegistry(msg, subsequent);
0088 auto event = getEventMsg();
0089
0090 if (not event)
0091 return;
0092 assert(event->isEventMetaData());
0093 deserializeEventMetaData(*event);
0094 updateEventMetaData();
0095 }
0096 void DQMStreamerReader::openFileImp_(const DQMFileIterator::LumiEntry& entry) {
0097 processedEventPerLs_ = 0;
0098
0099 std::string path = entry.get_data_path();
0100
0101 file_.lumi_ = entry;
0102 file_.streamFile_ = std::make_unique<StreamerInputFile>(path);
0103
0104 InitMsgView const* header = getHeaderMsg();
0105 if (isFirstFile_) {
0106 setupMetaData(*header, false);
0107 }
0108
0109
0110
0111
0112
0113 if (!acceptAllEvt_) {
0114 std::vector<std::string> tnames;
0115 header->hltTriggerNames(tnames);
0116
0117 triggerSelector_.reset(new TriggerSelector(hltSel_, tnames));
0118
0119
0120 setMatchTriggerSel(tnames);
0121 }
0122
0123
0124 processedEventPerLs_ = 0;
0125
0126 if (flagDeleteDatFiles_) {
0127
0128 unlink(path.c_str());
0129 }
0130 }
0131
0132 void DQMStreamerReader::genuineCloseFile() {}
0133
0134 void DQMStreamerReader::closeFileImp_(const std::string& reason) {
0135 if (file_.open()) {
0136 file_.streamFile_->closeStreamerFile();
0137 file_.streamFile_ = nullptr;
0138
0139 fiterator_.logLumiState(file_.lumi_, "close: " + reason);
0140 }
0141 }
0142
0143 void DQMStreamerReader::genuineReadFile() {
0144 if (isFirstFile_) {
0145
0146 isFirstFile_ = false;
0147 return;
0148 }
0149
0150 if (artificialFileBoundary_) {
0151 updateEventMetaData();
0152 artificialFileBoundary_ = false;
0153 return;
0154 }
0155
0156 InitMsgView const* header = getHeaderMsg();
0157 setupMetaData(*header, true);
0158 }
0159
0160 bool DQMStreamerReader::openNextFileImp_() {
0161 closeFileImp_("skipping to another file");
0162
0163 DQMFileIterator::LumiEntry currentLumi = fiterator_.open();
0164 std::string p = currentLumi.get_data_path();
0165
0166 if (std::filesystem::exists(p)) {
0167 try {
0168 openFileImp_(currentLumi);
0169 return true;
0170 } catch (const cms::Exception& e) {
0171 if (unitTest_) {
0172 throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::openNextFileInp")
0173 << std::string("Can't deserialize registry data (in open file): ") + e.what()
0174 << "\n error: data file corrupted";
0175 }
0176
0177 fiterator_.logFileAction(std::string("Can't deserialize registry data (in open file): ") + e.what(), p);
0178 fiterator_.logLumiState(currentLumi, "error: data file corrupted");
0179
0180 closeFileImp_("data file corrupted");
0181 return false;
0182 }
0183 } else {
0184
0185 fiterator_.logFileAction("Data file (specified in json) is missing:", p);
0186 fiterator_.logLumiState(currentLumi, "error: data file missing");
0187
0188 return false;
0189 }
0190 }
0191
0192 InitMsgView const* DQMStreamerReader::getHeaderMsg() {
0193 InitMsgView const* header = file_.streamFile_->startMessage();
0194
0195 if (header->code() != Header::INIT) {
0196 throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::readHeader")
0197 << "received wrong message type: expected INIT, got " << header->code() << "\n";
0198 }
0199
0200 return header;
0201 }
0202
0203 EventMsgView const* DQMStreamerReader::getEventMsg() {
0204 auto next = file_.streamFile_->next();
0205 if (StreamerInputFile::Next::kFile == next) {
0206 return nullptr;
0207 }
0208
0209 if (StreamerInputFile::Next::kStop == next) {
0210 return nullptr;
0211 }
0212
0213 EventMsgView const* msg = file_.streamFile_->currentRecord();
0214
0215
0216 return msg;
0217 }
0218
0219
0220
0221
0222
0223
0224
0225
0226
0227
0228
0229 bool DQMStreamerReader::prepareNextFile() {
0230 typedef DQMFileIterator::State State;
0231
0232 for (;;) {
0233 fiterator_.update_state();
0234
0235 if (edm::shutdown_flag.load()) {
0236 fiterator_.logFileAction("Shutdown flag was set, shutting down.");
0237
0238 closeFileImp_("shutdown flag is set");
0239 return false;
0240 }
0241
0242
0243 if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
0244 closeFileImp_("forced end-of-run");
0245 return false;
0246 }
0247
0248
0249
0250 if ((!file_.open()) && (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
0251 return false;
0252 }
0253
0254
0255
0256 if ((processedEventPerLs_ >= minEventsPerLs_) && (!fiterator_.lumiReady()) &&
0257 (fiterator_.state() == State::EOR)) {
0258 closeFileImp_("graceful end-of-run");
0259 return false;
0260 }
0261
0262
0263 if (!file_.open()) {
0264 if (fiterator_.lumiReady()) {
0265 openNextFileImp_();
0266
0267 continue;
0268 }
0269 }
0270
0271
0272 if (fiterator_.lumiReady() && (processedEventPerLs_ >= minEventsPerLs_)) {
0273 openNextFileImp_();
0274
0275 continue;
0276 }
0277
0278 return true;
0279 }
0280 }
0281
0282
0283
0284
0285
0286 EventMsgView const* DQMStreamerReader::prepareNextEvent() {
0287 EventMsgView const* eview = nullptr;
0288 typedef DQMFileIterator::State State;
0289
0290
0291 for (;;) {
0292
0293
0294 bool next = prepareNextFile();
0295 if (!next)
0296 return nullptr;
0297
0298
0299 if (!file_.open()) {
0300
0301 fiterator_.delay();
0302 } else {
0303
0304 eview = getEventMsg();
0305
0306 if (eview == nullptr) {
0307
0308
0309 closeFileImp_("eof");
0310 } else {
0311
0312
0313 if (eview->isEventMetaData()) {
0314 auto lastEventMetaData = presentEventMetaDataChecksum();
0315 if (eventMetaDataChecksum(*eview) != lastEventMetaData) {
0316 deserializeEventMetaData(*eview);
0317 artificialFileBoundary_ = true;
0318 return nullptr;
0319 } else {
0320
0321 eview = getEventMsg();
0322 assert((eview == nullptr) or (not eview->isEventMetaData()));
0323 if (eview == nullptr) {
0324 closeFileImp_("eof");
0325 continue;
0326 }
0327 }
0328 }
0329
0330 if (!acceptEvent(eview)) {
0331 continue;
0332 } else {
0333 return eview;
0334 }
0335 }
0336 }
0337 }
0338 return eview;
0339 }
0340
0341
0342
0343
0344 edm::RawInputSource::Next DQMStreamerReader::checkNext() {
0345 try {
0346 EventMsgView const* eview = prepareNextEvent();
0347 if (eview == nullptr) {
0348 if (artificialFileBoundary_ or (file_.streamFile_ and file_.streamFile_->newHeader())) {
0349 return Next::kFile;
0350 }
0351 return Next::kStop;
0352 }
0353
0354 deserializeEvent(*eview);
0355 } catch (const cms::Exception& e) {
0356
0357 fiterator_.logFileAction(std::string("Can't deserialize event or registry data: ") + e.what());
0358 closeFileImp_("data file corrupted");
0359
0360
0361 return checkNext();
0362 }
0363
0364 processedEventPerLs_ += 1;
0365
0366 return Next::kEvent;
0367 }
0368
0369
0370
0371
0372
0373 bool DQMStreamerReader::setAcceptAllEvt() {
0374 acceptAllEvt_ = false;
0375 for (auto hltPath : hltSel_) {
0376 hltPath.erase(std::remove_if(hltPath.begin(), hltPath.end(), [](unsigned char c) { return std::isspace(c); }),
0377 hltPath.end());
0378 if (hltPath == "*") {
0379 acceptAllEvt_ = true;
0380 break;
0381 }
0382 }
0383 return acceptAllEvt_;
0384 }
0385
0386
0387
0388
0389 bool DQMStreamerReader::setMatchTriggerSel(std::vector<std::string> const& tnames) {
0390 matchTriggerSel_ = false;
0391 for (auto hltPath : hltSel_) {
0392 hltPath.erase(std::remove_if(hltPath.begin(), hltPath.end(), [](unsigned char c) { return std::isspace(c); }),
0393 hltPath.end());
0394 auto const matches = edm::regexMatch(tnames, hltPath);
0395 if (not matches.empty()) {
0396 matchTriggerSel_ = true;
0397 break;
0398 }
0399 }
0400
0401 if (not matchTriggerSel_) {
0402 edm::LogWarning("DQMStreamerReader") << "Trigger selection does not match any trigger path!!!";
0403 }
0404
0405 return matchTriggerSel_;
0406 }
0407
0408
0409
0410
0411 bool DQMStreamerReader::acceptEvent(const EventMsgView* evtmsg) {
0412 if (acceptAllEvt_)
0413 return true;
0414 if (!matchTriggerSel_)
0415 return false;
0416
0417 std::vector<unsigned char> hltTriggerBits_;
0418 int hltTriggerCount_ = evtmsg->hltCount();
0419 if (hltTriggerCount_ > 0) {
0420 hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
0421 }
0422 evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
0423
0424 return (triggerSelector_->wantAll() || triggerSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount()));
0425 }
0426
0427 void DQMStreamerReader::skip(int toSkip) {
0428 try {
0429 for (int i = 0; i != toSkip; ++i) {
0430 EventMsgView const* evMsg = prepareNextEvent();
0431
0432 if (evMsg == nullptr) {
0433 return;
0434 }
0435 }
0436 } catch (const cms::Exception& e) {
0437
0438 fiterator_.logFileAction(std::string("Can't deserialize event data: ") + e.what());
0439 closeFileImp_("data file corrupted");
0440 }
0441 }
0442
0443 void DQMStreamerReader::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0444 edm::ParameterSetDescription desc;
0445 desc.setComment("Reads events from streamer files.");
0446
0447 desc.addUntracked<std::vector<std::string>>("SelectEvents")->setComment("HLT path to select events");
0448
0449 desc.addUntracked<int>("minEventsPerLumi", 1)
0450 ->setComment(
0451 "Minimum number of events to process per lumisection, "
0452 "before switching to a new input file. If the next file "
0453 "does not yet exist, "
0454 "the number of processed events will be bigger.");
0455
0456 desc.addUntracked<bool>("skipFirstLumis", false)
0457 ->setComment(
0458 "Skip (and ignore the minEventsPerLumi parameter) for the files "
0459 "which have been available at the begining of the processing. "
0460 "If set to true, the reader will open last available file for "
0461 "processing.");
0462
0463 desc.addUntracked<bool>("deleteDatFiles", false)
0464 ->setComment(
0465 "Delete data files after they have been closed, in order to "
0466 "save disk space.");
0467
0468 desc.addUntracked<bool>("endOfRunKills", false)
0469 ->setComment(
0470 "Kill the processing as soon as the end-of-run file appears, even if "
0471 "there are/will be unprocessed lumisections.");
0472
0473 desc.addUntracked<bool>("unitTest", false)
0474 ->setComment("Kill the processing if the input data cannot be deserialized");
0475
0476
0477
0478
0479
0480
0481
0482 desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
0483
0484 DQMFileIterator::fillDescription(desc);
0485 StreamerInputSource::fillDescription(desc);
0486 edm::EventSkipperByID::fillDescription(desc);
0487
0488 descriptions.add("source", desc);
0489 }
0490
0491 }
0492
0493 #include "FWCore/Framework/interface/InputSourceMacros.h"
0494 #include "FWCore/Framework/interface/MakerMacros.h"
0495
0496 typedef dqmservices::DQMStreamerReader DQMStreamerReader;
0497 DEFINE_FWK_INPUT_SOURCE(DQMStreamerReader);