File indexing completed on 2023-03-28 01:34:22
#include "IOPool/Streamer/interface/StreamerInputFile.h"

#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/Sources/interface/EventSkipperByID.h"
#include "FWCore/Utilities/interface/DebugMacros.h"
#include "FWCore/Utilities/interface/EDMException.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/Utilities/interface/TimeOfDay.h"
#include "FWCore/Catalog/interface/InputFileCatalog.h"

#include "Utilities/StorageFactory/interface/IOFlags.h"
#include "Utilities/StorageFactory/interface/StorageFactory.h"

#include <cstddef>
#include <cstring>
#include <iomanip>
#include <iostream>
#include <utility>
0016
0017 namespace edm {
0018
0019 StreamerInputFile::~StreamerInputFile() { closeStreamerFile(); }
0020
0021 StreamerInputFile::StreamerInputFile(std::string const& name,
0022 std::string const& LFN,
0023 std::shared_ptr<EventSkipperByID> eventSkipperByID,
0024 unsigned int prefetchMBytes)
0025 : startMsg_(),
0026 currentEvMsg_(),
0027 headerBuf_(1000 * 1000),
0028 eventBuf_(1000 * 1000 * 7),
0029 tempBuf_(1024 * 1024 * prefetchMBytes),
0030 currentFile_(0),
0031 streamerNames_(),
0032 multiStreams_(false),
0033 currentFileName_(),
0034 currentFileOpen_(false),
0035 eventSkipperByID_(eventSkipperByID),
0036 currRun_(0),
0037 currProto_(0),
0038 newHeader_(false),
0039 storage_(),
0040 endOfFile_(false) {
0041 openStreamerFile(name, LFN);
0042 readStartMessage();
0043 }
0044
0045 StreamerInputFile::StreamerInputFile(std::string const& name,
0046 std::shared_ptr<EventSkipperByID> eventSkipperByID,
0047 unsigned int prefetchMBytes)
0048 : StreamerInputFile(name, name, eventSkipperByID, prefetchMBytes) {}
0049
0050 StreamerInputFile::StreamerInputFile(std::vector<FileCatalogItem> const& names,
0051 std::shared_ptr<EventSkipperByID> eventSkipperByID,
0052 unsigned int prefetchMBytes)
0053 : startMsg_(),
0054 currentEvMsg_(),
0055 headerBuf_(1000 * 1000),
0056 eventBuf_(1000 * 1000 * 7),
0057 tempBuf_(1024 * 1024 * prefetchMBytes),
0058 currentFile_(0),
0059 streamerNames_(names),
0060 multiStreams_(true),
0061 currentFileName_(),
0062 currentFileOpen_(false),
0063 eventSkipperByID_(eventSkipperByID),
0064 currRun_(0),
0065 currProto_(0),
0066 newHeader_(false),
0067 endOfFile_(false) {
0068 openStreamerFile(names.at(0).fileName(0), names.at(0).logicalFileName());
0069 ++currentFile_;
0070 readStartMessage();
0071 currRun_ = startMsg_->run();
0072 currProto_ = startMsg_->protocolVersion();
0073 }
0074
0075 void StreamerInputFile::openStreamerFile(std::string const& name, std::string const& LFN) {
0076 closeStreamerFile();
0077
0078 currentFileName_ = name;
0079
0080
0081 if (currentFileName_.empty()) {
0082
0083 throw cms::Exception("LogicalFileNameNotFound", "StreamerInputFile::openStreamerFile()\n")
0084 << "Logical file name '" << LFN << "' was not found in the file catalog.\n"
0085 << "If you wanted a local file, you forgot the 'file:' prefix\n"
0086 << "before the file name in your configuration file.\n";
0087 return;
0088 }
0089
0090 logFileAction(" Initiating request to open file ");
0091
0092 using namespace edm::storage;
0093 IOOffset size = -1;
0094 if (StorageFactory::get()->check(name, &size)) {
0095 try {
0096 storage_ = StorageFactory::get()->open(name, IOFlags::OpenRead);
0097 } catch (cms::Exception& e) {
0098 Exception ex(errors::FileOpenError, "", e);
0099 ex.addContext("Calling StreamerInputFile::openStreamerFile()");
0100 ex.clearMessage();
0101 ex << "Error Opening Streamer Input File: " << name << "\n";
0102 throw ex;
0103 }
0104 } else {
0105 throw Exception(errors::FileOpenError, "StreamerInputFile::openStreamerFile")
0106 << "Error Opening Streamer Input File, file does not exist: " << name << "\n";
0107 }
0108 currentFileOpen_ = true;
0109 logFileAction(" Successfully opened file ");
0110 }
0111
0112 void StreamerInputFile::closeStreamerFile() {
0113 if (currentFileOpen_ && storage_) {
0114 storage_->close();
0115 logFileAction(" Closed file ");
0116 }
0117 currentFileOpen_ = false;
0118 }
0119
0120 std::pair<storage::IOSize, char*> StreamerInputFile::readBytes(char* buf,
0121 storage::IOSize nBytes,
0122 bool zeroCopy,
0123 unsigned int skippedHdr) {
0124 storage::IOSize n = 0;
0125
0126
0127 char* ptr = buf - skippedHdr;
0128 try {
0129 if (!tempBuf_.empty()) {
0130 if (tempPos_ == tempLen_) {
0131 n = storage_->read(&tempBuf_[0], tempBuf_.size());
0132 tempPos_ = 0;
0133 tempLen_ = n;
0134 if (n == 0)
0135 return std::pair<storage::IOSize, char*>(0, ptr);
0136 }
0137 if (nBytes <= tempLen_ - tempPos_) {
0138
0139 if (!zeroCopy || skippedHdr > tempPos_) {
0140 memcpy(buf, &tempBuf_[0] + tempPos_, nBytes);
0141 tempPos_ += nBytes;
0142 } else {
0143
0144 ptr = &tempBuf_[0] + tempPos_ - skippedHdr;
0145 tempPos_ += nBytes;
0146 }
0147 n = nBytes;
0148 } else {
0149
0150 auto len = tempLen_ - tempPos_;
0151 memcpy(buf, &tempBuf_[0] + tempPos_, len);
0152 tempPos_ += len;
0153 char* tmpPtr = buf + len;
0154 n = len + readBytes(tmpPtr, nBytes - len, false).first;
0155 }
0156 } else
0157 n = storage_->read(buf, nBytes);
0158 } catch (cms::Exception& ce) {
0159 Exception ex(errors::FileReadError, "", ce);
0160 ex.addContext("Calling StreamerInputFile::readBytes()");
0161 throw ex;
0162 }
0163 return std::pair<storage::IOSize, char*>(n, ptr);
0164 }
0165
0166 storage::IOOffset StreamerInputFile::skipBytes(storage::IOSize nBytes) {
0167 storage::IOOffset n = 0;
0168 try {
0169
0170 n = storage_->position(0, storage::Storage::CURRENT);
0171 n = storage_->position(nBytes, storage::Storage::CURRENT) - n;
0172 } catch (cms::Exception& ce) {
0173 Exception ex(errors::FileReadError, "", ce);
0174 ex.addContext("Calling StreamerInputFile::skipBytes()");
0175 throw ex;
0176 }
0177 return n;
0178 }
0179
0180 void StreamerInputFile::readStartMessage() {
0181 using namespace edm::storage;
0182 IOSize nWant = sizeof(InitHeader);
0183 IOSize nGot = readBytes(&headerBuf_[0], nWant, false).first;
0184 if (nGot != nWant) {
0185 throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
0186 << "Failed reading streamer file, first read in readStartMessage\n";
0187 }
0188
0189 uint32 headerSize;
0190 {
0191 HeaderView head(&headerBuf_[0]);
0192 uint32 code = head.code();
0193 if (code != Header::INIT)
0194 {
0195 throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
0196 << "Expecting an init Message at start of file\n";
0197 return;
0198 }
0199 headerSize = head.size();
0200 }
0201
0202 if (headerBuf_.size() < headerSize)
0203 headerBuf_.resize(headerSize);
0204
0205 if (headerSize > sizeof(InitHeader)) {
0206 nWant = headerSize - sizeof(InitHeader);
0207 auto res = readBytes(&headerBuf_[sizeof(InitHeader)], nWant, true, sizeof(InitHeader));
0208 if (res.first != nWant) {
0209 throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
0210 << "Failed reading streamer file, second read in readStartMessage\n";
0211 }
0212 startMsg_ = std::make_shared<InitMsgView>(res.second);
0213 } else {
0214 throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
0215 << "Failed reading streamer file, init header size from data too small\n";
0216 }
0217 }
0218
0219 StreamerInputFile::Next StreamerInputFile::next() {
0220 if (this->readEventMessage()) {
0221 return Next::kEvent;
0222 }
0223 if (multiStreams_) {
0224
0225 if (currentFile_ <= streamerNames_.size() - 1) {
0226 newHeader_ = true;
0227 return Next::kFile;
0228 }
0229 }
0230 return Next::kStop;
0231 }
0232
0233 bool StreamerInputFile::openNextFile() {
0234 if (currentFile_ <= streamerNames_.size() - 1) {
0235 FDEBUG(10) << "Opening file " << streamerNames_.at(currentFile_).fileNames()[0].c_str() << std::endl;
0236
0237 openStreamerFile(streamerNames_.at(currentFile_).fileNames()[0],
0238 streamerNames_.at(currentFile_).logicalFileName());
0239
0240
0241
0242 if (startMsg_) {
0243 FDEBUG(10) << "Comparing Header" << std::endl;
0244 compareHeader();
0245 }
0246 ++currentFile_;
0247 endOfFile_ = false;
0248 return true;
0249 }
0250 return false;
0251 }
0252
0253 bool StreamerInputFile::compareHeader() {
0254
0255 readStartMessage();
0256
0257
0258 if (currRun_ != startMsg_->run() || currProto_ != startMsg_->protocolVersion()) {
0259 throw Exception(errors::MismatchedInputFiles, "StreamerInputFile::compareHeader")
0260 << "File " << streamerNames_.at(currentFile_).fileNames()[0]
0261 << "\nhas different run number or protocol version than previous\n";
0262 return false;
0263 }
0264 return true;
0265 }
0266
0267 int StreamerInputFile::readEventMessage() {
0268 if (endOfFile_)
0269 return 0;
0270
0271 using namespace edm::storage;
0272 bool eventRead = false;
0273 unsigned hdrSkipped = 0;
0274 while (!eventRead) {
0275 IOSize nWant = sizeof(EventHeader);
0276 IOSize nGot = readBytes(&eventBuf_[hdrSkipped], nWant - hdrSkipped, false).first + hdrSkipped;
0277 while (nGot == nWant) {
0278
0279
0280 bool headerFetched = false;
0281 for (size_t i = 0; i < nGot; i++) {
0282 if ((unsigned char)eventBuf_[i] != Header::PADDING) {
0283
0284 if (i != 0) {
0285 memmove(&eventBuf_[0], &eventBuf_[i], nGot - i);
0286
0287 nGot = nGot - i + readBytes(&eventBuf_[nGot - i], i, false).first;
0288 }
0289 headerFetched = true;
0290 break;
0291 }
0292 }
0293 if (headerFetched)
0294 break;
0295
0296 nGot = readBytes(&eventBuf_[0], nWant, false).first;
0297 }
0298 if (nGot == 0) {
0299
0300 endOfFile_ = true;
0301 return 0;
0302 }
0303 if (nGot != nWant) {
0304 for (size_t i = 0; i < nGot; i++) {
0305 if ((unsigned char)eventBuf_[i] != Header::PADDING)
0306 throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
0307 << "Failed reading streamer file, first read in readEventMessage\n"
0308 << "Requested " << nWant << " bytes, read function returned " << nGot
0309 << " bytes, non-padding at offset " << i;
0310 }
0311
0312 endOfFile_ = true;
0313 return 0;
0314 }
0315 uint32 eventSize;
0316 {
0317 HeaderView head(&eventBuf_[0]);
0318 uint32 code = head.code();
0319
0320
0321 eventSize = head.size();
0322 if (code != Header::EVENT) {
0323 if (code == Header::INIT) {
0324 edm::LogWarning("StreamerInputFile") << "Found another INIT header in the file. It will be skipped";
0325 if (eventSize < sizeof(EventHeader)) {
0326
0327 hdrSkipped = nGot - eventSize;
0328 memmove(&eventBuf_[0], &eventBuf_[eventSize], hdrSkipped);
0329 continue;
0330 }
0331 if (headerBuf_.size() < eventSize)
0332 headerBuf_.resize(eventSize);
0333 memcpy(&headerBuf_[0], &eventBuf_[0], nGot);
0334 readBytes(&headerBuf_[nGot], eventSize, true, nGot);
0335
0336 continue;
0337 }
0338 throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
0339 << "Failed reading streamer file, unknown code in event header\n"
0340 << "code = " << code << "\n";
0341 }
0342 }
0343 if (eventSize <= sizeof(EventHeader)) {
0344 throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
0345 << "Failed reading streamer file, event header size from data too small\n";
0346 }
0347 eventRead = true;
0348 if (eventSkipperByID_) {
0349 EventHeader* evh = (EventHeader*)(&eventBuf_[0]);
0350 if (eventSkipperByID_->skipIt(convert32(evh->run_), convert32(evh->lumi_), convert64(evh->event_))) {
0351 eventRead = false;
0352 }
0353 }
0354 nWant = eventSize - sizeof(EventHeader);
0355 if (eventRead) {
0356 if (eventBuf_.size() < eventSize)
0357 eventBuf_.resize(eventSize);
0358
0359 auto res = readBytes(&eventBuf_[sizeof(EventHeader)], nWant, true, sizeof(EventHeader));
0360 if (res.first != nWant) {
0361 throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
0362 << "Failed reading streamer file, second read in readEventMessage\n"
0363 << "Requested " << nWant << " bytes, read function returned " << res.first << " bytes\n";
0364 }
0365 currentEvMsg_ =
0366 std::make_shared<EventMsgView>((void*)res.second);
0367 } else {
0368 nGot = skipBytes(nWant);
0369 if (nGot != nWant) {
0370 throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
0371 << "Failed reading streamer file, skip event in readEventMessage\n"
0372 << "Requested " << nWant << " bytes skipped, seek function returned " << nGot << " bytes\n";
0373 }
0374 }
0375 }
0376 return 1;
0377 }
0378
0379 void StreamerInputFile::logFileAction(char const* msg) {
0380 LogAbsolute("fileAction") << std::setprecision(0) << TimeOfDay() << msg << currentFileName_;
0381 FlushMessageLog();
0382 }
0383 }