File indexing completed on 2025-02-26 04:25:12
0001 #include "DQMStreamerReader.h"
0002
0003 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0004 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0005 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0006 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0007 #include "FWCore/Sources/interface/EventSkipperByID.h"
0008 #include "FWCore/Utilities/interface/Exception.h"
0009 #include "FWCore/Utilities/interface/RegexMatch.h"
0010 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0011 #include "IOPool/Streamer/interface/DumpTools.h"
0012
0013 #include <algorithm>
0014 #include <cctype>
0015 #include <cstdlib>
0016 #include <filesystem>
0017 #include <fstream>
0018 #include <memory>
0019 #include <queue>
0020
0021 namespace dqmservices {
0022 using namespace edm::streamer;
0023
0024 DQMStreamerReader::DQMStreamerReader(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0025 : StreamerInputSource(pset, desc),
0026 fiterator_(pset),
0027 minEventsPerLs_(pset.getUntrackedParameter<int>("minEventsPerLumi")),
0028 flagSkipFirstLumis_(pset.getUntrackedParameter<bool>("skipFirstLumis")),
0029 flagEndOfRunKills_(pset.getUntrackedParameter<bool>("endOfRunKills")),
0030 flagDeleteDatFiles_(pset.getUntrackedParameter<bool>("deleteDatFiles")),
0031 hltSel_(pset.getUntrackedParameter<std::vector<std::string>>("SelectEvents")),
0032 unitTest_(pset.getUntrackedParameter<bool>("unitTest", false)) {
0033 setAcceptAllEvt();
0034 reset_();
0035 }
0036
0037 DQMStreamerReader::~DQMStreamerReader() {
0038
0039
0040
0041
0042
0043 }
0044
0045 void DQMStreamerReader::reset_() {
0046
0047
0048
0049
0050
0051
0052
0053 fiterator_.logFileAction("Waiting for the first lumi in order to initialize.");
0054
0055 fiterator_.update_state();
0056
0057
0058 if (flagSkipFirstLumis_) {
0059 unsigned int l = fiterator_.lastLumiFound();
0060 if (l > 1) {
0061 fiterator_.advanceToLumi(l, "skipped: fast-forward to the latest lumi");
0062 }
0063 }
0064
0065 for (;;) {
0066 bool next = prepareNextFile();
0067
0068
0069 if (!next) {
0070 fiterator_.logFileAction("End of run reached before DQMStreamerReader was initialised.");
0071 return;
0072 }
0073
0074
0075 if (file_.open()) {
0076
0077 break;
0078 }
0079
0080
0081 fiterator_.delay();
0082 }
0083
0084 fiterator_.logFileAction("DQMStreamerReader initialised.");
0085 }
0086
0087 void DQMStreamerReader::setupMetaData(edm::streamer::InitMsgView const& msg, bool subsequent) {
0088 deserializeAndMergeWithRegistry(msg, subsequent);
0089 auto event = getEventMsg();
0090
0091 if (not event)
0092 return;
0093 assert(event->isEventMetaData());
0094 deserializeEventMetaData(*event);
0095 updateEventMetaData();
0096 }
0097 void DQMStreamerReader::openFileImp_(const DQMFileIterator::LumiEntry& entry) {
0098 processedEventPerLs_ = 0;
0099
0100 std::string path = entry.get_data_path();
0101
0102 file_.lumi_ = entry;
0103 file_.streamFile_ = std::make_unique<StreamerInputFile>(path);
0104
0105 InitMsgView const* header = getHeaderMsg();
0106 if (isFirstFile_) {
0107 setupMetaData(*header, false);
0108 }
0109
0110
0111
0112
0113
0114 if (!acceptAllEvt_) {
0115 std::vector<std::string> tnames;
0116 header->hltTriggerNames(tnames);
0117
0118 triggerSelector_ = std::make_shared<TriggerSelector>(hltSel_, tnames);
0119
0120
0121 setMatchTriggerSel(tnames);
0122 }
0123
0124
0125 processedEventPerLs_ = 0;
0126
0127 if (flagDeleteDatFiles_) {
0128
0129 unlink(path.c_str());
0130 }
0131 }
0132
0133 void DQMStreamerReader::genuineCloseFile() {}
0134
0135 void DQMStreamerReader::closeFileImp_(const std::string& reason) {
0136 if (file_.open()) {
0137 file_.streamFile_->closeStreamerFile();
0138 file_.streamFile_ = nullptr;
0139
0140 fiterator_.logLumiState(file_.lumi_, "close: " + reason);
0141 }
0142 }
0143
0144 void DQMStreamerReader::genuineReadFile() {
0145 if (isFirstFile_) {
0146
0147 isFirstFile_ = false;
0148 return;
0149 }
0150
0151 if (artificialFileBoundary_) {
0152 updateEventMetaData();
0153 artificialFileBoundary_ = false;
0154 return;
0155 }
0156
0157 InitMsgView const* header = getHeaderMsg();
0158 setupMetaData(*header, true);
0159 }
0160
0161 bool DQMStreamerReader::openNextFileImp_() {
0162 closeFileImp_("skipping to another file");
0163
0164 DQMFileIterator::LumiEntry currentLumi = fiterator_.open();
0165 std::string p = currentLumi.get_data_path();
0166
0167 if (std::filesystem::exists(p)) {
0168 try {
0169 openFileImp_(currentLumi);
0170 return true;
0171 } catch (const cms::Exception& e) {
0172 if (unitTest_) {
0173 throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::openNextFileInp")
0174 << std::string("Can't deserialize registry data (in open file): ") + e.what()
0175 << "\n error: data file corrupted, rethrowing!";
0176 }
0177
0178 fiterator_.logFileAction(std::string("Can't deserialize registry data (in open file): ") + e.what(), p);
0179 fiterator_.logLumiState(currentLumi, "error: data file corrupted");
0180
0181 closeFileImp_("data file corrupted");
0182 return false;
0183 }
0184 } else {
0185
0186 fiterator_.logFileAction("Data file (specified in json) is missing:", p);
0187 fiterator_.logLumiState(currentLumi, "error: data file missing");
0188
0189 return false;
0190 }
0191 }
0192
0193 InitMsgView const* DQMStreamerReader::getHeaderMsg() {
0194 InitMsgView const* header = file_.streamFile_->startMessage();
0195
0196 if (header->code() != Header::INIT) {
0197 throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::readHeader")
0198 << "received wrong message type: expected INIT, got " << header->code() << "\n";
0199 }
0200
0201 return header;
0202 }
0203
0204 EventMsgView const* DQMStreamerReader::getEventMsg() {
0205 auto next = file_.streamFile_->next();
0206 if (StreamerInputFile::Next::kFile == next) {
0207 return nullptr;
0208 }
0209
0210 if (StreamerInputFile::Next::kStop == next) {
0211 return nullptr;
0212 }
0213
0214 EventMsgView const* msg = file_.streamFile_->currentRecord();
0215
0216
0217 return msg;
0218 }
0219
0220
0221
0222
0223
0224
0225
0226
0227
0228
0229
0230 bool DQMStreamerReader::prepareNextFile() {
0231 typedef DQMFileIterator::State State;
0232
0233 for (;;) {
0234 fiterator_.update_state();
0235
0236 if (edm::shutdown_flag.load()) {
0237 fiterator_.logFileAction("Shutdown flag was set, shutting down.");
0238
0239 closeFileImp_("shutdown flag is set");
0240 return false;
0241 }
0242
0243
0244 if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
0245 closeFileImp_("forced end-of-run");
0246 return false;
0247 }
0248
0249
0250
0251 if ((!file_.open()) && (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
0252 return false;
0253 }
0254
0255
0256
0257 if ((processedEventPerLs_ >= minEventsPerLs_) && (!fiterator_.lumiReady()) &&
0258 (fiterator_.state() == State::EOR)) {
0259 closeFileImp_("graceful end-of-run");
0260 return false;
0261 }
0262
0263
0264 if (!file_.open()) {
0265 if (fiterator_.lumiReady()) {
0266 openNextFileImp_();
0267
0268 continue;
0269 }
0270 }
0271
0272
0273 if (fiterator_.lumiReady() && (processedEventPerLs_ >= minEventsPerLs_)) {
0274 openNextFileImp_();
0275
0276 continue;
0277 }
0278
0279 return true;
0280 }
0281 }
0282
0283
0284
0285
0286
0287 EventMsgView const* DQMStreamerReader::prepareNextEvent() {
0288 EventMsgView const* eview = nullptr;
0289 typedef DQMFileIterator::State State;
0290
0291
0292 for (;;) {
0293
0294
0295 bool next = prepareNextFile();
0296 if (!next)
0297 return nullptr;
0298
0299
0300 if (!file_.open()) {
0301
0302 fiterator_.delay();
0303 } else {
0304
0305 eview = getEventMsg();
0306
0307 if (eview == nullptr) {
0308
0309
0310 closeFileImp_("eof");
0311 } else {
0312
0313
0314 if (eview->isEventMetaData()) {
0315 auto lastEventMetaData = presentEventMetaDataChecksum();
0316 if (eventMetaDataChecksum(*eview) != lastEventMetaData) {
0317 deserializeEventMetaData(*eview);
0318 artificialFileBoundary_ = true;
0319 return nullptr;
0320 } else {
0321
0322 eview = getEventMsg();
0323 assert((eview == nullptr) or (not eview->isEventMetaData()));
0324 if (eview == nullptr) {
0325 closeFileImp_("eof");
0326 continue;
0327 }
0328 }
0329 }
0330
0331 if (!acceptEvent(eview)) {
0332 continue;
0333 } else {
0334 return eview;
0335 }
0336 }
0337 }
0338 }
0339 return eview;
0340 }
0341
0342
0343
0344
0345 edm::RawInputSource::Next DQMStreamerReader::checkNext() {
0346 try {
0347 EventMsgView const* eview = prepareNextEvent();
0348 if (eview == nullptr) {
0349 if (artificialFileBoundary_ or (file_.streamFile_ and file_.streamFile_->newHeader())) {
0350 return Next::kFile;
0351 }
0352 return Next::kStop;
0353 }
0354
0355 deserializeEvent(*eview);
0356 } catch (const cms::Exception& e) {
0357 if (unitTest_) {
0358 throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::checkNext")
0359 << std::string("Can't deserialize event or registry data: ") + e.what()
0360 << "\n error: data file corrupted, rethrowing!";
0361 }
0362
0363
0364 fiterator_.logFileAction(std::string("Can't deserialize event or registry data: ") + e.what());
0365 closeFileImp_("data file corrupted");
0366
0367
0368 return checkNext();
0369 }
0370
0371 processedEventPerLs_ += 1;
0372
0373 return Next::kEvent;
0374 }
0375
0376
0377
0378
0379
0380 bool DQMStreamerReader::setAcceptAllEvt() {
0381 acceptAllEvt_ = false;
0382 for (auto hltPath : hltSel_) {
0383 hltPath.erase(std::remove_if(hltPath.begin(), hltPath.end(), [](unsigned char c) { return std::isspace(c); }),
0384 hltPath.end());
0385 if (hltPath == "*") {
0386 acceptAllEvt_ = true;
0387 break;
0388 }
0389 }
0390 return acceptAllEvt_;
0391 }
0392
0393
0394
0395
0396 bool DQMStreamerReader::setMatchTriggerSel(std::vector<std::string> const& tnames) {
0397 matchTriggerSel_ = false;
0398 for (auto hltPath : hltSel_) {
0399 hltPath.erase(std::remove_if(hltPath.begin(), hltPath.end(), [](unsigned char c) { return std::isspace(c); }),
0400 hltPath.end());
0401 auto const matches = edm::regexMatch(tnames, hltPath);
0402 if (not matches.empty()) {
0403 matchTriggerSel_ = true;
0404 break;
0405 }
0406 }
0407
0408 if (not matchTriggerSel_) {
0409 edm::LogWarning("DQMStreamerReader") << "Trigger selection does not match any trigger path!!!";
0410 }
0411
0412 return matchTriggerSel_;
0413 }
0414
0415
0416
0417
0418 bool DQMStreamerReader::acceptEvent(const EventMsgView* evtmsg) {
0419 if (acceptAllEvt_)
0420 return true;
0421 if (!matchTriggerSel_)
0422 return false;
0423
0424 std::vector<unsigned char> hltTriggerBits_;
0425 int hltTriggerCount_ = evtmsg->hltCount();
0426 if (hltTriggerCount_ > 0) {
0427 hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
0428 }
0429 evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
0430
0431 return (triggerSelector_->wantAll() || triggerSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount()));
0432 }
0433
0434 void DQMStreamerReader::skip(int toSkip) {
0435 try {
0436 for (int i = 0; i != toSkip; ++i) {
0437 EventMsgView const* evMsg = prepareNextEvent();
0438
0439 if (evMsg == nullptr) {
0440 return;
0441 }
0442 }
0443 } catch (const cms::Exception& e) {
0444 if (unitTest_) {
0445 throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::skip")
0446 << std::string("Can't deserialize registry data: ") + e.what()
0447 << "\n error: data file corrupted, rethrowing!";
0448 }
0449
0450 fiterator_.logFileAction(std::string("Can't deserialize event data: ") + e.what());
0451 closeFileImp_("data file corrupted");
0452 }
0453 }
0454
0455 void DQMStreamerReader::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0456 edm::ParameterSetDescription desc;
0457 desc.setComment("Reads events from streamer files.");
0458
0459 desc.addUntracked<std::vector<std::string>>("SelectEvents")->setComment("HLT path to select events");
0460
0461 desc.addUntracked<int>("minEventsPerLumi", 1)
0462 ->setComment(
0463 "Minimum number of events to process per lumisection, "
0464 "before switching to a new input file. If the next file "
0465 "does not yet exist, "
0466 "the number of processed events will be bigger.");
0467
0468 desc.addUntracked<bool>("skipFirstLumis", false)
0469 ->setComment(
0470 "Skip (and ignore the minEventsPerLumi parameter) for the files "
0471 "which have been available at the begining of the processing. "
0472 "If set to true, the reader will open last available file for "
0473 "processing.");
0474
0475 desc.addUntracked<bool>("deleteDatFiles", false)
0476 ->setComment(
0477 "Delete data files after they have been closed, in order to "
0478 "save disk space.");
0479
0480 desc.addUntracked<bool>("endOfRunKills", false)
0481 ->setComment(
0482 "Kill the processing as soon as the end-of-run file appears, even if "
0483 "there are/will be unprocessed lumisections.");
0484
0485 desc.addUntracked<bool>("unitTest", false)
0486 ->setComment("Kill the processing if the input data cannot be deserialized");
0487
0488
0489
0490
0491
0492
0493
0494 desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
0495
0496 DQMFileIterator::fillDescription(desc);
0497 StreamerInputSource::fillDescription(desc);
0498 edm::EventSkipperByID::fillDescription(desc);
0499
0500 descriptions.add("source", desc);
0501 }
0502
0503 }
0504
0505 #include "FWCore/Framework/interface/InputSourceMacros.h"
0506 #include "FWCore/Framework/interface/MakerMacros.h"
0507
0508 typedef dqmservices::DQMStreamerReader DQMStreamerReader;
0509 DEFINE_FWK_INPUT_SOURCE(DQMStreamerReader);