Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-03-28 01:34:22

#include "IOPool/Streamer/interface/StreamerInputFile.h"

#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/Sources/interface/EventSkipperByID.h"
#include "FWCore/Utilities/interface/DebugMacros.h"
#include "FWCore/Utilities/interface/EDMException.h"
#include "FWCore/Utilities/interface/Exception.h"
#include "FWCore/Utilities/interface/TimeOfDay.h"
#include "FWCore/Catalog/interface/InputFileCatalog.h"

#include "Utilities/StorageFactory/interface/IOFlags.h"
#include "Utilities/StorageFactory/interface/StorageFactory.h"

#include <algorithm>
#include <cstddef>
#include <cstring>
#include <exception>
#include <iomanip>
#include <iostream>
#include <utility>
0016 
0017 namespace edm {
0018 
0019   StreamerInputFile::~StreamerInputFile() { closeStreamerFile(); }
0020 
0021   StreamerInputFile::StreamerInputFile(std::string const& name,
0022                                        std::string const& LFN,
0023                                        std::shared_ptr<EventSkipperByID> eventSkipperByID,
0024                                        unsigned int prefetchMBytes)
0025       : startMsg_(),
0026         currentEvMsg_(),
0027         headerBuf_(1000 * 1000),
0028         eventBuf_(1000 * 1000 * 7),
0029         tempBuf_(1024 * 1024 * prefetchMBytes),
0030         currentFile_(0),
0031         streamerNames_(),
0032         multiStreams_(false),
0033         currentFileName_(),
0034         currentFileOpen_(false),
0035         eventSkipperByID_(eventSkipperByID),
0036         currRun_(0),
0037         currProto_(0),
0038         newHeader_(false),
0039         storage_(),
0040         endOfFile_(false) {
0041     openStreamerFile(name, LFN);
0042     readStartMessage();
0043   }
0044 
0045   StreamerInputFile::StreamerInputFile(std::string const& name,
0046                                        std::shared_ptr<EventSkipperByID> eventSkipperByID,
0047                                        unsigned int prefetchMBytes)
0048       : StreamerInputFile(name, name, eventSkipperByID, prefetchMBytes) {}
0049 
0050   StreamerInputFile::StreamerInputFile(std::vector<FileCatalogItem> const& names,
0051                                        std::shared_ptr<EventSkipperByID> eventSkipperByID,
0052                                        unsigned int prefetchMBytes)
0053       : startMsg_(),
0054         currentEvMsg_(),
0055         headerBuf_(1000 * 1000),
0056         eventBuf_(1000 * 1000 * 7),
0057         tempBuf_(1024 * 1024 * prefetchMBytes),
0058         currentFile_(0),
0059         streamerNames_(names),
0060         multiStreams_(true),
0061         currentFileName_(),
0062         currentFileOpen_(false),
0063         eventSkipperByID_(eventSkipperByID),
0064         currRun_(0),
0065         currProto_(0),
0066         newHeader_(false),
0067         endOfFile_(false) {
0068     openStreamerFile(names.at(0).fileName(0), names.at(0).logicalFileName());
0069     ++currentFile_;
0070     readStartMessage();
0071     currRun_ = startMsg_->run();
0072     currProto_ = startMsg_->protocolVersion();
0073   }
0074 
0075   void StreamerInputFile::openStreamerFile(std::string const& name, std::string const& LFN) {
0076     closeStreamerFile();
0077 
0078     currentFileName_ = name;
0079 
0080     // Check if the logical file name was found.
0081     if (currentFileName_.empty()) {
0082       // LFN not found in catalog.
0083       throw cms::Exception("LogicalFileNameNotFound", "StreamerInputFile::openStreamerFile()\n")
0084           << "Logical file name '" << LFN << "' was not found in the file catalog.\n"
0085           << "If you wanted a local file, you forgot the 'file:' prefix\n"
0086           << "before the file name in your configuration file.\n";
0087       return;
0088     }
0089 
0090     logFileAction("  Initiating request to open file ");
0091 
0092     using namespace edm::storage;
0093     IOOffset size = -1;
0094     if (StorageFactory::get()->check(name, &size)) {
0095       try {
0096         storage_ = StorageFactory::get()->open(name, IOFlags::OpenRead);
0097       } catch (cms::Exception& e) {
0098         Exception ex(errors::FileOpenError, "", e);
0099         ex.addContext("Calling StreamerInputFile::openStreamerFile()");
0100         ex.clearMessage();
0101         ex << "Error Opening Streamer Input File: " << name << "\n";
0102         throw ex;
0103       }
0104     } else {
0105       throw Exception(errors::FileOpenError, "StreamerInputFile::openStreamerFile")
0106           << "Error Opening Streamer Input File, file does not exist: " << name << "\n";
0107     }
0108     currentFileOpen_ = true;
0109     logFileAction("  Successfully opened file ");
0110   }
0111 
0112   void StreamerInputFile::closeStreamerFile() {
0113     if (currentFileOpen_ && storage_) {
0114       storage_->close();
0115       logFileAction("  Closed file ");
0116     }
0117     currentFileOpen_ = false;
0118   }
0119 
0120   std::pair<storage::IOSize, char*> StreamerInputFile::readBytes(char* buf,
0121                                                                  storage::IOSize nBytes,
0122                                                                  bool zeroCopy,
0123                                                                  unsigned int skippedHdr) {
0124     storage::IOSize n = 0;
0125     //returned pointer should point to the beginning of the header
0126     //even if we read event payload that comes afterwards
0127     char* ptr = buf - skippedHdr;
0128     try {
0129       if (!tempBuf_.empty()) {
0130         if (tempPos_ == tempLen_) {
0131           n = storage_->read(&tempBuf_[0], tempBuf_.size());
0132           tempPos_ = 0;
0133           tempLen_ = n;
0134           if (n == 0)
0135             return std::pair<storage::IOSize, char*>(0, ptr);
0136         }
0137         if (nBytes <= tempLen_ - tempPos_) {
0138           //zero-copy can't done when header start address is in the previous buffer
0139           if (!zeroCopy || skippedHdr > tempPos_) {
0140             memcpy(buf, &tempBuf_[0] + tempPos_, nBytes);
0141             tempPos_ += nBytes;
0142           } else {
0143             //pass pointer to the prebuffer address (zero copy) at the beginning of the header
0144             ptr = &tempBuf_[0] + tempPos_ - skippedHdr;
0145             tempPos_ += nBytes;
0146           }
0147           n = nBytes;
0148         } else {
0149           //crossing buffer boundary
0150           auto len = tempLen_ - tempPos_;
0151           memcpy(buf, &tempBuf_[0] + tempPos_, len);
0152           tempPos_ += len;
0153           char* tmpPtr = buf + len;
0154           n = len + readBytes(tmpPtr, nBytes - len, false).first;
0155         }
0156       } else
0157         n = storage_->read(buf, nBytes);
0158     } catch (cms::Exception& ce) {
0159       Exception ex(errors::FileReadError, "", ce);
0160       ex.addContext("Calling StreamerInputFile::readBytes()");
0161       throw ex;
0162     }
0163     return std::pair<storage::IOSize, char*>(n, ptr);
0164   }
0165 
0166   storage::IOOffset StreamerInputFile::skipBytes(storage::IOSize nBytes) {
0167     storage::IOOffset n = 0;
0168     try {
0169       // We wish to return the number of bytes skipped, not the final offset.
0170       n = storage_->position(0, storage::Storage::CURRENT);
0171       n = storage_->position(nBytes, storage::Storage::CURRENT) - n;
0172     } catch (cms::Exception& ce) {
0173       Exception ex(errors::FileReadError, "", ce);
0174       ex.addContext("Calling StreamerInputFile::skipBytes()");
0175       throw ex;
0176     }
0177     return n;
0178   }
0179 
0180   void StreamerInputFile::readStartMessage() {
0181     using namespace edm::storage;
0182     IOSize nWant = sizeof(InitHeader);
0183     IOSize nGot = readBytes(&headerBuf_[0], nWant, false).first;
0184     if (nGot != nWant) {
0185       throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
0186           << "Failed reading streamer file, first read in readStartMessage\n";
0187     }
0188 
0189     uint32 headerSize;
0190     {
0191       HeaderView head(&headerBuf_[0]);
0192       uint32 code = head.code();
0193       if (code != Header::INIT) /** Not an init message should return ******/
0194       {
0195         throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
0196             << "Expecting an init Message at start of file\n";
0197         return;
0198       }
0199       headerSize = head.size();
0200     }
0201 
0202     if (headerBuf_.size() < headerSize)
0203       headerBuf_.resize(headerSize);
0204 
0205     if (headerSize > sizeof(InitHeader)) {
0206       nWant = headerSize - sizeof(InitHeader);
0207       auto res = readBytes(&headerBuf_[sizeof(InitHeader)], nWant, true, sizeof(InitHeader));
0208       if (res.first != nWant) {
0209         throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
0210             << "Failed reading streamer file, second read in readStartMessage\n";
0211       }
0212       startMsg_ = std::make_shared<InitMsgView>(res.second);  // propagate_const<T> has no reset() function
0213     } else {
0214       throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
0215           << "Failed reading streamer file, init header size from data too small\n";
0216     }
0217   }
0218 
0219   StreamerInputFile::Next StreamerInputFile::next() {
0220     if (this->readEventMessage()) {
0221       return Next::kEvent;
0222     }
0223     if (multiStreams_) {
0224       //Try opening next file
0225       if (currentFile_ <= streamerNames_.size() - 1) {
0226         newHeader_ = true;
0227         return Next::kFile;
0228       }
0229     }
0230     return Next::kStop;
0231   }
0232 
0233   bool StreamerInputFile::openNextFile() {
0234     if (currentFile_ <= streamerNames_.size() - 1) {
0235       FDEBUG(10) << "Opening file " << streamerNames_.at(currentFile_).fileNames()[0].c_str() << std::endl;
0236 
0237       openStreamerFile(streamerNames_.at(currentFile_).fileNames()[0],
0238                        streamerNames_.at(currentFile_).logicalFileName());
0239 
0240       // If start message was already there, then compare the
0241       // previous and new headers
0242       if (startMsg_) {
0243         FDEBUG(10) << "Comparing Header" << std::endl;
0244         compareHeader();
0245       }
0246       ++currentFile_;
0247       endOfFile_ = false;
0248       return true;
0249     }
0250     return false;
0251   }
0252 
0253   bool StreamerInputFile::compareHeader() {
0254     //Get the new header
0255     readStartMessage();
0256 
0257     //Values from new Header should match up
0258     if (currRun_ != startMsg_->run() || currProto_ != startMsg_->protocolVersion()) {
0259       throw Exception(errors::MismatchedInputFiles, "StreamerInputFile::compareHeader")
0260           << "File " << streamerNames_.at(currentFile_).fileNames()[0]
0261           << "\nhas different run number or protocol version than previous\n";
0262       return false;
0263     }
0264     return true;
0265   }
0266 
0267   int StreamerInputFile::readEventMessage() {
0268     if (endOfFile_)
0269       return 0;
0270 
0271     using namespace edm::storage;
0272     bool eventRead = false;
0273     unsigned hdrSkipped = 0;
0274     while (!eventRead) {
0275       IOSize nWant = sizeof(EventHeader);
0276       IOSize nGot = readBytes(&eventBuf_[hdrSkipped], nWant - hdrSkipped, false).first + hdrSkipped;
0277       while (nGot == nWant) {
0278         //allow padding before next event or end of file.
0279         //event header starts with code 0 - 17, so 0xff (Header:PADDING) uniquely represents padding
0280         bool headerFetched = false;
0281         for (size_t i = 0; i < nGot; i++) {
0282           if ((unsigned char)eventBuf_[i] != Header::PADDING) {
0283             //no padding 0xff
0284             if (i != 0) {
0285               memmove(&eventBuf_[0], &eventBuf_[i], nGot - i);
0286               //read remainder of the header
0287               nGot = nGot - i + readBytes(&eventBuf_[nGot - i], i, false).first;
0288             }
0289             headerFetched = true;
0290             break;
0291           }
0292         }
0293         if (headerFetched)
0294           break;
0295         //read another block
0296         nGot = readBytes(&eventBuf_[0], nWant, false).first;
0297       }
0298       if (nGot == 0) {
0299         // no more data available
0300         endOfFile_ = true;
0301         return 0;
0302       }
0303       if (nGot != nWant) {
0304         for (size_t i = 0; i < nGot; i++) {
0305           if ((unsigned char)eventBuf_[i] != Header::PADDING)
0306             throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
0307                 << "Failed reading streamer file, first read in readEventMessage\n"
0308                 << "Requested " << nWant << " bytes, read function returned " << nGot
0309                 << " bytes, non-padding at offset " << i;
0310         }
0311         //padded 0xff only
0312         endOfFile_ = true;
0313         return 0;
0314       }
0315       uint32 eventSize;
0316       {
0317         HeaderView head(&eventBuf_[0]);
0318         uint32 code = head.code();
0319 
0320         // If it is not an event then something is wrong.
0321         eventSize = head.size();
0322         if (code != Header::EVENT) {
0323           if (code == Header::INIT) {
0324             edm::LogWarning("StreamerInputFile") << "Found another INIT header in the file. It will be skipped";
0325             if (eventSize < sizeof(EventHeader)) {
0326               //very unlikely case that EventHeader is larger than total INIT size inserted in the middle of the file
0327               hdrSkipped = nGot - eventSize;
0328               memmove(&eventBuf_[0], &eventBuf_[eventSize], hdrSkipped);
0329               continue;
0330             }
0331             if (headerBuf_.size() < eventSize)
0332               headerBuf_.resize(eventSize);
0333             memcpy(&headerBuf_[0], &eventBuf_[0], nGot);
0334             readBytes(&headerBuf_[nGot], eventSize, true, nGot);
0335             //do not parse this header and proceed to the next event
0336             continue;
0337           }
0338           throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
0339               << "Failed reading streamer file, unknown code in event header\n"
0340               << "code = " << code << "\n";
0341         }
0342       }
0343       if (eventSize <= sizeof(EventHeader)) {
0344         throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
0345             << "Failed reading streamer file, event header size from data too small\n";
0346       }
0347       eventRead = true;
0348       if (eventSkipperByID_) {
0349         EventHeader* evh = (EventHeader*)(&eventBuf_[0]);
0350         if (eventSkipperByID_->skipIt(convert32(evh->run_), convert32(evh->lumi_), convert64(evh->event_))) {
0351           eventRead = false;
0352         }
0353       }
0354       nWant = eventSize - sizeof(EventHeader);
0355       if (eventRead) {
0356         if (eventBuf_.size() < eventSize)
0357           eventBuf_.resize(eventSize);
0358 
0359         auto res = readBytes(&eventBuf_[sizeof(EventHeader)], nWant, true, sizeof(EventHeader));
0360         if (res.first != nWant) {
0361           throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
0362               << "Failed reading streamer file, second read in readEventMessage\n"
0363               << "Requested " << nWant << " bytes, read function returned " << res.first << " bytes\n";
0364         }
0365         currentEvMsg_ =
0366             std::make_shared<EventMsgView>((void*)res.second);  // propagate_const<T> has no reset() function
0367       } else {
0368         nGot = skipBytes(nWant);
0369         if (nGot != nWant) {
0370           throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
0371               << "Failed reading streamer file, skip event in readEventMessage\n"
0372               << "Requested " << nWant << " bytes skipped, seek function returned " << nGot << " bytes\n";
0373         }
0374       }
0375     }
0376     return 1;
0377   }
0378 
0379   void StreamerInputFile::logFileAction(char const* msg) {
0380     LogAbsolute("fileAction") << std::setprecision(0) << TimeOfDay() << msg << currentFileName_;
0381     FlushMessageLog();
0382   }
0383 }  // namespace edm