File indexing completed on 2023-10-25 09:44:59
0001 #include "DQMStreamerReader.h"
0002
0003 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0004 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0005 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0006 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0007 #include "FWCore/Sources/interface/EventSkipperByID.h"
0008 #include "FWCore/Utilities/interface/Exception.h"
0009 #include "FWCore/Utilities/interface/RegexMatch.h"
0010 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0011 #include "IOPool/Streamer/interface/DumpTools.h"
0012
0013 #include <cstdlib>
0014 #include <filesystem>
0015 #include <fstream>
0016 #include <queue>
0017 #include <algorithm>
0018 #include <cctype>
0019
0020 namespace dqmservices {
0021
0022 DQMStreamerReader::DQMStreamerReader(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0023 : StreamerInputSource(pset, desc),
0024 fiterator_(pset),
0025 minEventsPerLs_(pset.getUntrackedParameter<int>("minEventsPerLumi")),
0026 flagSkipFirstLumis_(pset.getUntrackedParameter<bool>("skipFirstLumis")),
0027 flagEndOfRunKills_(pset.getUntrackedParameter<bool>("endOfRunKills")),
0028 flagDeleteDatFiles_(pset.getUntrackedParameter<bool>("deleteDatFiles")),
0029 hltSel_(pset.getUntrackedParameter<std::vector<std::string>>("SelectEvents")) {
0030 setAcceptAllEvt();
0031 reset_();
0032 }
0033
0034 DQMStreamerReader::~DQMStreamerReader() {
0035
0036
0037
0038
0039
0040 }
0041
0042 void DQMStreamerReader::reset_() {
0043
0044
0045
0046
0047
0048
0049
0050 fiterator_.logFileAction("Waiting for the first lumi in order to initialize.");
0051
0052 fiterator_.update_state();
0053
0054
0055 if (flagSkipFirstLumis_) {
0056 unsigned int l = fiterator_.lastLumiFound();
0057 if (l > 1) {
0058 fiterator_.advanceToLumi(l, "skipped: fast-forward to the latest lumi");
0059 }
0060 }
0061
0062 for (;;) {
0063 bool next = prepareNextFile();
0064
0065
0066 if (!next) {
0067 fiterator_.logFileAction("End of run reached before DQMStreamerReader was initialised.");
0068 return;
0069 }
0070
0071
0072 if (file_.open()) {
0073
0074 break;
0075 }
0076
0077
0078 fiterator_.delay();
0079 }
0080
0081 fiterator_.logFileAction("DQMStreamerReader initialised.");
0082 }
0083
0084 void DQMStreamerReader::openFileImp_(const DQMFileIterator::LumiEntry& entry) {
0085 processedEventPerLs_ = 0;
0086
0087 std::string path = entry.get_data_path();
0088
0089 file_.lumi_ = entry;
0090 file_.streamFile_ = std::make_unique<edm::StreamerInputFile>(path);
0091
0092 InitMsgView const* header = getHeaderMsg();
0093 if (isFirstFile_) {
0094 deserializeAndMergeWithRegistry(*header, false);
0095 }
0096
0097
0098
0099
0100
0101 if (!acceptAllEvt_) {
0102 std::vector<std::string> tnames;
0103 header->hltTriggerNames(tnames);
0104
0105 triggerSelector_.reset(new TriggerSelector(hltSel_, tnames));
0106
0107
0108 setMatchTriggerSel(tnames);
0109 }
0110
0111
0112 processedEventPerLs_ = 0;
0113
0114 if (flagDeleteDatFiles_) {
0115
0116 unlink(path.c_str());
0117 }
0118 }
0119
0120 void DQMStreamerReader::genuineCloseFile() {}
0121
0122 void DQMStreamerReader::closeFileImp_(const std::string& reason) {
0123 if (file_.open()) {
0124 file_.streamFile_->closeStreamerFile();
0125 file_.streamFile_ = nullptr;
0126
0127 fiterator_.logLumiState(file_.lumi_, "close: " + reason);
0128 }
0129 }
0130
0131 void DQMStreamerReader::genuineReadFile() {
0132 if (isFirstFile_) {
0133
0134 isFirstFile_ = false;
0135 return;
0136 }
0137
0138
0139 InitMsgView const* header = getHeaderMsg();
0140 deserializeAndMergeWithRegistry(*header, true);
0141 }
0142
0143 bool DQMStreamerReader::openNextFileImp_() {
0144 closeFileImp_("skipping to another file");
0145
0146 DQMFileIterator::LumiEntry currentLumi = fiterator_.open();
0147 std::string p = currentLumi.get_data_path();
0148
0149 if (std::filesystem::exists(p)) {
0150 try {
0151 openFileImp_(currentLumi);
0152 return true;
0153 } catch (const cms::Exception& e) {
0154 fiterator_.logFileAction(std::string("Can't deserialize registry data (in open file): ") + e.what(), p);
0155 fiterator_.logLumiState(currentLumi, "error: data file corrupted");
0156
0157 closeFileImp_("data file corrupted");
0158 return false;
0159 }
0160 } else {
0161
0162 fiterator_.logFileAction("Data file (specified in json) is missing:", p);
0163 fiterator_.logLumiState(currentLumi, "error: data file missing");
0164
0165 return false;
0166 }
0167 }
0168
0169 InitMsgView const* DQMStreamerReader::getHeaderMsg() {
0170 InitMsgView const* header = file_.streamFile_->startMessage();
0171
0172 if (header->code() != Header::INIT) {
0173 throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::readHeader")
0174 << "received wrong message type: expected INIT, got " << header->code() << "\n";
0175 }
0176
0177 return header;
0178 }
0179
0180 EventMsgView const* DQMStreamerReader::getEventMsg() {
0181 auto next = file_.streamFile_->next();
0182 if (edm::StreamerInputFile::Next::kFile == next) {
0183 return nullptr;
0184 }
0185
0186 if (edm::StreamerInputFile::Next::kStop == next) {
0187 return nullptr;
0188 }
0189
0190 EventMsgView const* msg = file_.streamFile_->currentRecord();
0191
0192
0193 return msg;
0194 }
0195
0196
0197
0198
0199
0200
0201
0202
0203
0204
0205
0206 bool DQMStreamerReader::prepareNextFile() {
0207 typedef DQMFileIterator::State State;
0208
0209 for (;;) {
0210 fiterator_.update_state();
0211
0212 if (edm::shutdown_flag.load()) {
0213 fiterator_.logFileAction("Shutdown flag was set, shutting down.");
0214
0215 closeFileImp_("shutdown flag is set");
0216 return false;
0217 }
0218
0219
0220 if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
0221 closeFileImp_("forced end-of-run");
0222 return false;
0223 }
0224
0225
0226
0227 if ((!file_.open()) && (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
0228 return false;
0229 }
0230
0231
0232
0233 if ((processedEventPerLs_ >= minEventsPerLs_) && (!fiterator_.lumiReady()) &&
0234 (fiterator_.state() == State::EOR)) {
0235 closeFileImp_("graceful end-of-run");
0236 return false;
0237 }
0238
0239
0240 if (!file_.open()) {
0241 if (fiterator_.lumiReady()) {
0242 openNextFileImp_();
0243
0244 continue;
0245 }
0246 }
0247
0248
0249 if (fiterator_.lumiReady() && (processedEventPerLs_ >= minEventsPerLs_)) {
0250 openNextFileImp_();
0251
0252 continue;
0253 }
0254
0255 return true;
0256 }
0257 }
0258
0259
0260
0261
0262
0263 EventMsgView const* DQMStreamerReader::prepareNextEvent() {
0264 EventMsgView const* eview = nullptr;
0265 typedef DQMFileIterator::State State;
0266
0267
0268 for (;;) {
0269
0270
0271 bool next = prepareNextFile();
0272 if (!next)
0273 return nullptr;
0274
0275
0276 if (!file_.open()) {
0277
0278 fiterator_.delay();
0279 } else {
0280
0281 eview = getEventMsg();
0282
0283 if (eview == nullptr) {
0284
0285
0286 closeFileImp_("eof");
0287 } else {
0288 if (!acceptEvent(eview)) {
0289 continue;
0290 } else {
0291 return eview;
0292 }
0293 }
0294 }
0295 }
0296 return eview;
0297 }
0298
0299
0300
0301
0302 edm::RawInputSource::Next DQMStreamerReader::checkNext() {
0303 try {
0304 EventMsgView const* eview = prepareNextEvent();
0305 if (eview == nullptr) {
0306 if (file_.streamFile_ and file_.streamFile_->newHeader()) {
0307 return Next::kFile;
0308 }
0309 return Next::kStop;
0310 }
0311
0312 deserializeEvent(*eview);
0313 } catch (const cms::Exception& e) {
0314
0315 fiterator_.logFileAction(std::string("Can't deserialize event or registry data: ") + e.what());
0316 closeFileImp_("data file corrupted");
0317
0318
0319 return checkNext();
0320 }
0321
0322 processedEventPerLs_ += 1;
0323
0324 return Next::kEvent;
0325 }
0326
0327
0328
0329
0330
0331 bool DQMStreamerReader::setAcceptAllEvt() {
0332 acceptAllEvt_ = false;
0333 for (auto hltPath : hltSel_) {
0334 hltPath.erase(std::remove_if(hltPath.begin(), hltPath.end(), [](unsigned char c) { return std::isspace(c); }),
0335 hltPath.end());
0336 if (hltPath == "*") {
0337 acceptAllEvt_ = true;
0338 break;
0339 }
0340 }
0341 return acceptAllEvt_;
0342 }
0343
0344
0345
0346
0347 bool DQMStreamerReader::setMatchTriggerSel(std::vector<std::string> const& tnames) {
0348 matchTriggerSel_ = false;
0349 for (auto hltPath : hltSel_) {
0350 hltPath.erase(std::remove_if(hltPath.begin(), hltPath.end(), [](unsigned char c) { return std::isspace(c); }),
0351 hltPath.end());
0352 auto const matches = edm::regexMatch(tnames, hltPath);
0353 if (not matches.empty()) {
0354 matchTriggerSel_ = true;
0355 break;
0356 }
0357 }
0358
0359 if (not matchTriggerSel_) {
0360 edm::LogWarning("DQMStreamerReader") << "Trigger selection does not match any trigger path!!!";
0361 }
0362
0363 return matchTriggerSel_;
0364 }
0365
0366
0367
0368
0369 bool DQMStreamerReader::acceptEvent(const EventMsgView* evtmsg) {
0370 if (acceptAllEvt_)
0371 return true;
0372 if (!matchTriggerSel_)
0373 return false;
0374
0375 std::vector<unsigned char> hltTriggerBits_;
0376 int hltTriggerCount_ = evtmsg->hltCount();
0377 if (hltTriggerCount_ > 0) {
0378 hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
0379 }
0380 evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
0381
0382 return (triggerSelector_->wantAll() || triggerSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount()));
0383 }
0384
0385 void DQMStreamerReader::skip(int toSkip) {
0386 try {
0387 for (int i = 0; i != toSkip; ++i) {
0388 EventMsgView const* evMsg = prepareNextEvent();
0389
0390 if (evMsg == nullptr) {
0391 return;
0392 }
0393 }
0394 } catch (const cms::Exception& e) {
0395
0396 fiterator_.logFileAction(std::string("Can't deserialize event data: ") + e.what());
0397 closeFileImp_("data file corrupted");
0398 }
0399 }
0400
0401 void DQMStreamerReader::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0402 edm::ParameterSetDescription desc;
0403 desc.setComment("Reads events from streamer files.");
0404
0405 desc.addUntracked<std::vector<std::string>>("SelectEvents")->setComment("HLT path to select events");
0406
0407 desc.addUntracked<int>("minEventsPerLumi", 1)
0408 ->setComment(
0409 "Minimum number of events to process per lumisection, "
0410 "before switching to a new input file. If the next file "
0411 "does not yet exist, "
0412 "the number of processed events will be bigger.");
0413
0414 desc.addUntracked<bool>("skipFirstLumis", false)
0415 ->setComment(
0416 "Skip (and ignore the minEventsPerLumi parameter) for the files "
0417 "which have been available at the begining of the processing. "
0418 "If set to true, the reader will open last available file for "
0419 "processing.");
0420
0421 desc.addUntracked<bool>("deleteDatFiles", false)
0422 ->setComment(
0423 "Delete data files after they have been closed, in order to "
0424 "save disk space.");
0425
0426 desc.addUntracked<bool>("endOfRunKills", false)
0427 ->setComment(
0428 "Kill the processing as soon as the end-of-run file appears, even if "
0429 "there are/will be unprocessed lumisections.");
0430
0431
0432
0433
0434
0435
0436
0437 desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
0438
0439 DQMFileIterator::fillDescription(desc);
0440 edm::StreamerInputSource::fillDescription(desc);
0441 edm::EventSkipperByID::fillDescription(desc);
0442
0443 descriptions.add("source", desc);
0444 }
0445
0446 }
0447
0448 #include "FWCore/Framework/interface/InputSourceMacros.h"
0449 #include "FWCore/Framework/interface/MakerMacros.h"
0450
0451 typedef dqmservices::DQMStreamerReader DQMStreamerReader;
0452 DEFINE_FWK_INPUT_SOURCE(DQMStreamerReader);