File indexing completed on 2024-05-31 04:19:42
0001 #include "IOPool/Streamer/interface/MsgTools.h"
0002 #include "IOPool/Streamer/interface/StreamerInputFile.h"
0003 #include "IOPool/Streamer/src/StreamerFileReader.h"
0004 #include "FWCore/Framework/interface/FileBlock.h"
0005 #include "FWCore/Utilities/interface/Exception.h"
0006 #include "FWCore/Utilities/interface/EDMException.h"
0007 #include "FWCore/Catalog/interface/InputFileCatalog.h"
0008 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0009 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0010 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0011 #include "FWCore/Sources/interface/EventSkipperByID.h"
0012
0013 #include <cassert>
0014 namespace edm::streamer {
0015
0016 StreamerFileReader::StreamerFileReader(ParameterSet const& pset, InputSourceDescription const& desc)
0017 : StreamerInputSource(pset, desc),
0018 streamReader_(),
0019 eventSkipperByID_(EventSkipperByID::create(pset).release()),
0020 initialNumberOfEventsToSkip_(pset.getUntrackedParameter<unsigned int>("skipEvents")),
0021 prefetchMBytes_(pset.getUntrackedParameter<unsigned int>("prefetchMBytes")) {
0022 InputFileCatalog catalog(pset.getUntrackedParameter<std::vector<std::string> >("fileNames"),
0023 pset.getUntrackedParameter<std::string>("overrideCatalog"));
0024 streamerNames_ = catalog.fileCatalogItems();
0025 reset_();
0026 }
0027
0028 StreamerFileReader::~StreamerFileReader() {}
0029
0030 void StreamerFileReader::reset_() {
0031 if (streamerNames_.size() > 1) {
0032 streamReader_ = std::make_unique<StreamerInputFile>(streamerNames_, eventSkipperByID(), prefetchMBytes_);
0033 } else if (streamerNames_.size() == 1) {
0034 streamReader_ = std::make_unique<StreamerInputFile>(streamerNames_.at(0).fileNames()[0],
0035 streamerNames_.at(0).logicalFileName(),
0036 eventSkipperByID(),
0037 prefetchMBytes_);
0038 } else {
0039 throw Exception(errors::FileReadError, "StreamerFileReader::StreamerFileReader")
0040 << "No fileNames were specified\n";
0041 }
0042 isFirstFile_ = true;
0043 updateMetaData(false);
0044 if (initialNumberOfEventsToSkip_) {
0045 skip(initialNumberOfEventsToSkip_);
0046 }
0047 }
0048
0049 void StreamerFileReader::updateMetaData(bool subsequent) {
0050 InitMsgView const* header = getHeader();
0051 deserializeAndMergeWithRegistry(*header, subsequent);
0052
0053 auto eview = getNextEvent();
0054
0055
0056 if (eview == nullptr)
0057 return;
0058
0059 assert(eview->isEventMetaData());
0060 deserializeEventMetaData(*eview);
0061 updateEventMetaData();
0062 }
0063
0064 StreamerFileReader::Next StreamerFileReader::checkNext() {
0065 EventMsgView const* eview = getNextEvent();
0066
0067 if (eview == nullptr) {
0068 if (newHeader()) {
0069 return Next::kFile;
0070 }
0071 return Next::kStop;
0072 }
0073 if (eview->isEventMetaData()) {
0074 if (presentEventMetaDataChecksum() != eventMetaDataChecksum(*eview)) {
0075
0076 didArtificialFile_ = true;
0077 deserializeEventMetaData(*eview);
0078 return Next::kFile;
0079 } else {
0080
0081 eview = getNextEvent();
0082 if (eview == nullptr) {
0083 if (newHeader()) {
0084 return Next::kFile;
0085 }
0086 return Next::kStop;
0087 }
0088 }
0089 }
0090 deserializeEvent(*eview);
0091 return Next::kEvent;
0092 }
0093
0094 void StreamerFileReader::skip(int toSkip) {
0095 for (int i = 0; i != toSkip; ++i) {
0096 EventMsgView const* evMsg = getNextEvent();
0097 if (evMsg == nullptr) {
0098 return;
0099 }
0100
0101 if (eventSkipperByID_ && eventSkipperByID_->skipIt(evMsg->run(), evMsg->lumi(), evMsg->event())) {
0102 --i;
0103 }
0104 }
0105 }
0106
0107 void StreamerFileReader::genuineCloseFile() {
0108 if (didArtificialFile_) {
0109 return;
0110 }
0111 if (streamReader_.get() != nullptr)
0112 streamReader_->closeStreamerFile();
0113 }
0114
0115 void StreamerFileReader::genuineReadFile() {
0116 if (isFirstFile_) {
0117
0118 isFirstFile_ = false;
0119 return;
0120 }
0121 if (didArtificialFile_) {
0122
0123 didArtificialFile_ = false;
0124 updateEventMetaData();
0125 return;
0126 }
0127 streamReader_->openNextFile();
0128
0129
0130
0131 updateMetaData(true);
0132 }
0133
0134 bool StreamerFileReader::newHeader() { return streamReader_->newHeader(); }
0135
0136 InitMsgView const* StreamerFileReader::getHeader() {
0137 InitMsgView const* header = streamReader_->startMessage();
0138
0139 if (header->code() != Header::INIT) {
0140 throw Exception(errors::FileReadError, "StreamerFileReader::readHeader")
0141 << "received wrong message type: expected INIT, got " << header->code() << "\n";
0142 }
0143 return header;
0144 }
0145
0146 EventMsgView const* StreamerFileReader::getNextEvent() {
0147 if (StreamerInputFile::Next::kEvent != streamReader_->next()) {
0148 return nullptr;
0149 }
0150 return streamReader_->currentRecord();
0151 }
0152
0153 void StreamerFileReader::fillDescriptions(ConfigurationDescriptions& descriptions) {
0154 ParameterSetDescription desc;
0155 desc.setComment("Reads events from streamer files.");
0156 desc.addUntracked<std::vector<std::string> >("fileNames")->setComment("Names of files to be processed.");
0157 desc.addUntracked<unsigned int>("skipEvents", 0U)
0158 ->setComment("Skip the first 'skipEvents' events that otherwise would have been processed.");
0159 desc.addUntracked<std::string>("overrideCatalog", std::string());
0160
0161 desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
0162 desc.addUntracked<unsigned int>("prefetchMBytes", 0);
0163 StreamerInputSource::fillDescription(desc);
0164 EventSkipperByID::fillDescription(desc);
0165 descriptions.add("source", desc);
0166 }
0167 }