File indexing completed on 2023-03-17 11:10:28
0001 #include "IOPool/Streamer/interface/MsgTools.h"
0002 #include "IOPool/Streamer/interface/StreamerInputFile.h"
0003 #include "IOPool/Streamer/src/StreamerFileReader.h"
0004 #include "FWCore/Framework/interface/FileBlock.h"
0005 #include "FWCore/Utilities/interface/Exception.h"
0006 #include "FWCore/Utilities/interface/EDMException.h"
0007 #include "FWCore/Catalog/interface/InputFileCatalog.h"
0008 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0009 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0010 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0011 #include "FWCore/Sources/interface/EventSkipperByID.h"
0012
0013 namespace edm {
0014
0015 StreamerFileReader::StreamerFileReader(ParameterSet const& pset, InputSourceDescription const& desc)
0016 : StreamerInputSource(pset, desc),
0017 streamReader_(),
0018 eventSkipperByID_(EventSkipperByID::create(pset).release()),
0019 initialNumberOfEventsToSkip_(pset.getUntrackedParameter<unsigned int>("skipEvents")),
0020 prefetchMBytes_(pset.getUntrackedParameter<unsigned int>("prefetchMBytes")) {
0021 InputFileCatalog catalog(pset.getUntrackedParameter<std::vector<std::string> >("fileNames"),
0022 pset.getUntrackedParameter<std::string>("overrideCatalog"));
0023 streamerNames_ = catalog.fileCatalogItems();
0024 reset_();
0025 }
0026
0027 StreamerFileReader::~StreamerFileReader() {}
0028
0029 void StreamerFileReader::reset_() {
0030 if (streamerNames_.size() > 1) {
0031 streamReader_ = std::make_unique<StreamerInputFile>(streamerNames_, eventSkipperByID(), prefetchMBytes_);
0032 } else if (streamerNames_.size() == 1) {
0033 streamReader_ = std::make_unique<StreamerInputFile>(streamerNames_.at(0).fileNames()[0],
0034 streamerNames_.at(0).logicalFileName(),
0035 eventSkipperByID(),
0036 prefetchMBytes_);
0037 } else {
0038 throw Exception(errors::FileReadError, "StreamerFileReader::StreamerFileReader")
0039 << "No fileNames were specified\n";
0040 }
0041 isFirstFile_ = true;
0042 InitMsgView const* header = getHeader();
0043 deserializeAndMergeWithRegistry(*header, false);
0044 if (initialNumberOfEventsToSkip_) {
0045 skip(initialNumberOfEventsToSkip_);
0046 }
0047 }
0048
0049 StreamerFileReader::Next StreamerFileReader::checkNext() {
0050 EventMsgView const* eview = getNextEvent();
0051
0052 if (eview == nullptr) {
0053 if (newHeader()) {
0054 return Next::kFile;
0055 }
0056 return Next::kStop;
0057 }
0058 deserializeEvent(*eview);
0059 return Next::kEvent;
0060 }
0061
0062 void StreamerFileReader::skip(int toSkip) {
0063 for (int i = 0; i != toSkip; ++i) {
0064 EventMsgView const* evMsg = getNextEvent();
0065 if (evMsg == nullptr) {
0066 return;
0067 }
0068
0069 if (eventSkipperByID_ && eventSkipperByID_->skipIt(evMsg->run(), evMsg->lumi(), evMsg->event())) {
0070 --i;
0071 }
0072 }
0073 }
0074
0075 void StreamerFileReader::genuineCloseFile() {
0076 if (streamReader_.get() != nullptr)
0077 streamReader_->closeStreamerFile();
0078 }
0079
0080 void StreamerFileReader::genuineReadFile() {
0081 if (isFirstFile_) {
0082
0083 isFirstFile_ = false;
0084 return;
0085 }
0086 streamReader_->openNextFile();
0087
0088
0089
0090 InitMsgView const* header = getHeader();
0091 deserializeAndMergeWithRegistry(*header, true);
0092 }
0093
0094 bool StreamerFileReader::newHeader() { return streamReader_->newHeader(); }
0095
0096 InitMsgView const* StreamerFileReader::getHeader() {
0097 InitMsgView const* header = streamReader_->startMessage();
0098
0099 if (header->code() != Header::INIT) {
0100 throw Exception(errors::FileReadError, "StreamerFileReader::readHeader")
0101 << "received wrong message type: expected INIT, got " << header->code() << "\n";
0102 }
0103 return header;
0104 }
0105
0106 EventMsgView const* StreamerFileReader::getNextEvent() {
0107 if (StreamerInputFile::Next::kEvent != streamReader_->next()) {
0108 return nullptr;
0109 }
0110 return streamReader_->currentRecord();
0111 }
0112
0113 void StreamerFileReader::fillDescriptions(ConfigurationDescriptions& descriptions) {
0114 ParameterSetDescription desc;
0115 desc.setComment("Reads events from streamer files.");
0116 desc.addUntracked<std::vector<std::string> >("fileNames")->setComment("Names of files to be processed.");
0117 desc.addUntracked<unsigned int>("skipEvents", 0U)
0118 ->setComment("Skip the first 'skipEvents' events that otherwise would have been processed.");
0119 desc.addUntracked<std::string>("overrideCatalog", std::string());
0120
0121 desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
0122 desc.addUntracked<unsigned int>("prefetchMBytes", 0);
0123 StreamerInputSource::fillDescription(desc);
0124 EventSkipperByID::fillDescription(desc);
0125 descriptions.add("source", desc);
0126 }
0127 }