Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-08-23 02:49:24

0001 #include "IOPool/Streamer/interface/StreamerInputFile.h"
0002 
0003 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0004 #include "FWCore/Sources/interface/EventSkipperByID.h"
0005 #include "FWCore/Utilities/interface/DebugMacros.h"
0006 #include "FWCore/Utilities/interface/EDMException.h"
0007 #include "FWCore/Utilities/interface/Exception.h"
0008 #include "FWCore/Utilities/interface/TimeOfDay.h"
0009 #include "FWCore/Catalog/interface/InputFileCatalog.h"
0010 
0011 #include "Utilities/StorageFactory/interface/IOFlags.h"
0012 #include "Utilities/StorageFactory/interface/StorageFactory.h"
0013 
0014 #include <iomanip>
0015 #include <iostream>
0016 
0017 namespace edm {
0018 
0019   StreamerInputFile::~StreamerInputFile() { closeStreamerFile(); }
0020 
0021   StreamerInputFile::StreamerInputFile(std::string const& name,
0022                                        std::string const& LFN,
0023                                        std::shared_ptr<EventSkipperByID> eventSkipperByID,
0024                                        unsigned int prefetchMBytes)
0025       : startMsg_(),
0026         currentEvMsg_(),
0027         headerBuf_(1000 * 1000),
0028         eventBuf_(1000 * 1000 * 7),
0029         tempBuf_(1024 * 1024 * prefetchMBytes),
0030         currentFile_(0),
0031         streamerNames_(),
0032         multiStreams_(false),
0033         currentFileName_(),
0034         currentFileOpen_(false),
0035         eventSkipperByID_(eventSkipperByID),
0036         currRun_(0),
0037         currProto_(0),
0038         newHeader_(false),
0039         storage_(),
0040         endOfFile_(false) {
0041     openStreamerFile(name, LFN);
0042     readStartMessage();
0043   }
0044 
0045   StreamerInputFile::StreamerInputFile(std::string const& name,
0046                                        std::shared_ptr<EventSkipperByID> eventSkipperByID,
0047                                        unsigned int prefetchMBytes)
0048       : StreamerInputFile(name, name, eventSkipperByID, prefetchMBytes) {}
0049 
0050   StreamerInputFile::StreamerInputFile(std::vector<FileCatalogItem> const& names,
0051                                        std::shared_ptr<EventSkipperByID> eventSkipperByID,
0052                                        unsigned int prefetchMBytes)
0053       : startMsg_(),
0054         currentEvMsg_(),
0055         headerBuf_(1000 * 1000),
0056         eventBuf_(1000 * 1000 * 7),
0057         tempBuf_(1024 * 1024 * prefetchMBytes),
0058         currentFile_(0),
0059         streamerNames_(names),
0060         multiStreams_(true),
0061         currentFileName_(),
0062         currentFileOpen_(false),
0063         eventSkipperByID_(eventSkipperByID),
0064         currRun_(0),
0065         currProto_(0),
0066         newHeader_(false),
0067         endOfFile_(false) {
0068     openStreamerFile(names.at(0).fileName(0), names.at(0).logicalFileName());
0069     ++currentFile_;
0070     readStartMessage();
0071     currRun_ = startMsg_->run();
0072     currProto_ = startMsg_->protocolVersion();
0073   }
0074 
0075   void StreamerInputFile::openStreamerFile(std::string const& name, std::string const& LFN) {
0076     closeStreamerFile();
0077 
0078     currentFileName_ = name;
0079 
0080     // Check if the logical file name was found.
0081     if (currentFileName_.empty()) {
0082       // LFN not found in catalog.
0083       throw cms::Exception("LogicalFileNameNotFound", "StreamerInputFile::openStreamerFile()\n")
0084           << "Logical file name '" << LFN << "' was not found in the file catalog.\n"
0085           << "If you wanted a local file, you forgot the 'file:' prefix\n"
0086           << "before the file name in your configuration file.\n";
0087       return;
0088     }
0089 
0090     logFileAction("  Initiating request to open file ");
0091 
0092     using namespace edm::storage;
0093     IOOffset size = -1;
0094     if (StorageFactory::get()->check(name, &size)) {
0095       try {
0096         storage_ = StorageFactory::get()->open(name, IOFlags::OpenRead);
0097       } catch (cms::Exception& e) {
0098         Exception ex(errors::FileOpenError, "", e);
0099         ex.addContext("Calling StreamerInputFile::openStreamerFile()");
0100         ex.clearMessage();
0101         ex << "Error Opening Streamer Input File: " << name << "\n";
0102         throw ex;
0103       }
0104     } else {
0105       throw Exception(errors::FileOpenError, "StreamerInputFile::openStreamerFile")
0106           << "Error Opening Streamer Input File, file does not exist: " << name << "\n";
0107     }
0108     currentFileOpen_ = true;
0109     logFileAction("  Successfully opened file ");
0110   }
0111 
0112   void StreamerInputFile::closeStreamerFile() {
0113     if (currentFileOpen_ && storage_) {
0114       storage_->close();
0115       logFileAction("  Closed file ");
0116     }
0117     currentFileOpen_ = false;
0118   }
0119 
0120   std::pair<storage::IOSize, char*> StreamerInputFile::readBytes(char* buf,
0121                                                                  storage::IOSize nBytes,
0122                                                                  bool zeroCopy,
0123                                                                  unsigned int skippedHdr) {
0124     storage::IOSize n = 0;
0125     //returned pointer should point to the beginning of the header
0126     //even if we read event payload that comes afterwards
0127     char* ptr = buf - skippedHdr;
0128     try {
0129       if (!tempBuf_.empty()) {
0130         if (tempPos_ == tempLen_) {
0131           n = storage_->read(&tempBuf_[0], tempBuf_.size());
0132           tempPos_ = 0;
0133           tempLen_ = n;
0134           if (n == 0)
0135             return std::pair<storage::IOSize, char*>(0, ptr);
0136         }
0137         if (nBytes <= tempLen_ - tempPos_) {
0138           //zero-copy can't done when header start address is in the previous buffer
0139           if (!zeroCopy || skippedHdr > tempPos_) {
0140             memcpy(buf, &tempBuf_[0] + tempPos_, nBytes);
0141             tempPos_ += nBytes;
0142           } else {
0143             //pass pointer to the prebuffer address (zero copy) at the beginning of the header
0144             ptr = &tempBuf_[0] + tempPos_ - skippedHdr;
0145             tempPos_ += nBytes;
0146           }
0147           n = nBytes;
0148         } else {
0149           //crossing buffer boundary
0150           auto len = tempLen_ - tempPos_;
0151           memcpy(buf, &tempBuf_[0] + tempPos_, len);
0152           tempPos_ += len;
0153           char* tmpPtr = buf + len;
0154           n = len + readBytes(tmpPtr, nBytes - len, false).first;
0155         }
0156       } else
0157         n = storage_->read(buf, nBytes);
0158     } catch (cms::Exception& ce) {
0159       Exception ex(errors::FileReadError, "", ce);
0160       ex.addContext("Calling StreamerInputFile::readBytes()");
0161       throw ex;
0162     }
0163     return std::pair<storage::IOSize, char*>(n, ptr);
0164   }
0165 
0166   storage::IOOffset StreamerInputFile::skipBytes(storage::IOSize nBytes) {
0167     storage::IOOffset n = 0;
0168     try {
0169       // We wish to return the number of bytes skipped, not the final offset.
0170       n = storage_->position(0, storage::Storage::CURRENT);
0171       n = storage_->position(nBytes, storage::Storage::CURRENT) - n;
0172     } catch (cms::Exception& ce) {
0173       Exception ex(errors::FileReadError, "", ce);
0174       ex.addContext("Calling StreamerInputFile::skipBytes()");
0175       throw ex;
0176     }
0177     return n;
0178   }
0179 
0180   void StreamerInputFile::readStartMessage() {
0181     using namespace edm::storage;
0182     IOSize nWant = sizeof(HeaderView);
0183     IOSize nGot = readBytes(&headerBuf_[0], nWant, false).first;
0184     if (nGot != nWant) {
0185       throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
0186           << "Failed reading streamer file, first read in readStartMessage\n";
0187     }
0188 
0189     uint32 headerSize;
0190     {
0191       HeaderView head(&headerBuf_[0]);
0192       uint32 code = head.code();
0193       if (code != Header::INIT) /** Not an init message should return ******/
0194       {
0195         throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
0196             << "Expecting an init Message at start of file\n";
0197         return;
0198       }
0199       headerSize = head.size();
0200     }
0201 
0202     if (headerBuf_.size() < headerSize)
0203       headerBuf_.resize(headerSize);
0204 
0205     if (headerSize > sizeof(HeaderView)) {
0206       nWant = headerSize - sizeof(HeaderView);
0207       auto res = readBytes(&headerBuf_[sizeof(HeaderView)], nWant, true, sizeof(HeaderView));
0208       if (res.first != nWant) {
0209         throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
0210             << "Failed reading streamer file, second read in readStartMessage\n";
0211       }
0212       startMsg_ = std::make_shared<InitMsgView>(res.second);  // propagate_const<T> has no reset() function
0213     } else {
0214       throw Exception(errors::FileReadError, "StreamerInputFile::readStartMessage")
0215           << "Failed reading streamer file, init header size from data too small\n";
0216     }
0217   }
0218 
0219   StreamerInputFile::Next StreamerInputFile::next() {
0220     if (this->readEventMessage()) {
0221       return Next::kEvent;
0222     }
0223     if (multiStreams_) {
0224       //Try opening next file
0225       if (currentFile_ <= streamerNames_.size() - 1) {
0226         newHeader_ = true;
0227         return Next::kFile;
0228       }
0229     }
0230     return Next::kStop;
0231   }
0232 
0233   bool StreamerInputFile::openNextFile() {
0234     if (currentFile_ <= streamerNames_.size() - 1) {
0235       FDEBUG(10) << "Opening file " << streamerNames_.at(currentFile_).fileNames()[0].c_str() << std::endl;
0236 
0237       openStreamerFile(streamerNames_.at(currentFile_).fileNames()[0],
0238                        streamerNames_.at(currentFile_).logicalFileName());
0239 
0240       // If start message was already there, then compare the
0241       // previous and new headers
0242       if (startMsg_) {
0243         FDEBUG(10) << "Comparing Header" << std::endl;
0244         compareHeader();
0245       }
0246       ++currentFile_;
0247       endOfFile_ = false;
0248       return true;
0249     }
0250     return false;
0251   }
0252 
0253   bool StreamerInputFile::compareHeader() {
0254     //Get the new header
0255     readStartMessage();
0256 
0257     //Values from new Header should match up
0258     if (currRun_ != startMsg_->run() || currProto_ != startMsg_->protocolVersion()) {
0259       throw Exception(errors::MismatchedInputFiles, "StreamerInputFile::compareHeader")
0260           << "File " << streamerNames_.at(currentFile_).fileNames()[0]
0261           << "\nhas different run number or protocol version than previous\n";
0262       return false;
0263     }
0264     return true;
0265   }
0266 
0267   int StreamerInputFile::readEventMessage() {
0268     if (endOfFile_)
0269       return 0;
0270 
0271     using namespace edm::storage;
0272     bool eventRead = false;
0273     while (!eventRead) {
0274       IOSize nWant = sizeof(EventHeader);
0275       IOSize nGot = readBytes(&eventBuf_[0], nWant, false).first;
0276       if (nGot == 0) {
0277         // no more data available
0278         endOfFile_ = true;
0279         return 0;
0280       }
0281       if (nGot != nWant) {
0282         throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
0283             << "Failed reading streamer file, first read in readEventMessage\n"
0284             << "Requested " << nWant << " bytes, read function returned " << nGot << " bytes\n";
0285       }
0286       uint32 eventSize;
0287       {
0288         HeaderView head(&eventBuf_[0]);
0289         uint32 code = head.code();
0290 
0291         // If it is not an event then something is wrong.
0292         if (code != Header::EVENT) {
0293           throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
0294               << "Failed reading streamer file, unknown code in event header\n"
0295               << "code = " << code << "\n";
0296         }
0297         eventSize = head.size();
0298       }
0299       if (eventSize <= sizeof(EventHeader)) {
0300         throw edm::Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
0301             << "Failed reading streamer file, event header size from data too small\n";
0302       }
0303       eventRead = true;
0304       if (eventSkipperByID_) {
0305         EventHeader* evh = (EventHeader*)(&eventBuf_[0]);
0306         if (eventSkipperByID_->skipIt(convert32(evh->run_), convert32(evh->lumi_), convert64(evh->event_))) {
0307           eventRead = false;
0308         }
0309       }
0310       nWant = eventSize - sizeof(EventHeader);
0311       if (eventRead) {
0312         if (eventBuf_.size() < eventSize)
0313           eventBuf_.resize(eventSize);
0314 
0315         auto res = readBytes(&eventBuf_[sizeof(EventHeader)], nWant, true, sizeof(EventHeader));
0316         if (res.first != nWant) {
0317           throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
0318               << "Failed reading streamer file, second read in readEventMessage\n"
0319               << "Requested " << nWant << " bytes, read function returned " << res.first << " bytes\n";
0320         }
0321         currentEvMsg_ =
0322             std::make_shared<EventMsgView>((void*)res.second);  // propagate_const<T> has no reset() function
0323       } else {
0324         nGot = skipBytes(nWant);
0325         if (nGot != nWant) {
0326           throw Exception(errors::FileReadError, "StreamerInputFile::readEventMessage")
0327               << "Failed reading streamer file, skip event in readEventMessage\n"
0328               << "Requested " << nWant << " bytes skipped, seek function returned " << nGot << " bytes\n";
0329         }
0330       }
0331     }
0332     return 1;
0333   }
0334 
0335   void StreamerInputFile::logFileAction(char const* msg) {
0336     LogAbsolute("fileAction") << std::setprecision(0) << TimeOfDay() << msg << currentFileName_;
0337     FlushMessageLog();
0338   }
0339 }  // namespace edm