Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-08-23 02:49:23

0001 #include "IOPool/Streamer/interface/MsgTools.h"
0002 #include "IOPool/Streamer/interface/StreamerInputFile.h"
0003 #include "IOPool/Streamer/src/StreamerFileReader.h"
0004 #include "FWCore/Framework/interface/FileBlock.h"
0005 #include "FWCore/Utilities/interface/Exception.h"
0006 #include "FWCore/Utilities/interface/EDMException.h"
0007 #include "FWCore/Catalog/interface/InputFileCatalog.h"
0008 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0009 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0010 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0011 #include "FWCore/Sources/interface/EventSkipperByID.h"
0012 
0013 namespace edm {
0014 
0015   StreamerFileReader::StreamerFileReader(ParameterSet const& pset, InputSourceDescription const& desc)
0016       : StreamerInputSource(pset, desc),
0017         streamReader_(),
0018         eventSkipperByID_(EventSkipperByID::create(pset).release()),
0019         initialNumberOfEventsToSkip_(pset.getUntrackedParameter<unsigned int>("skipEvents")),
0020         prefetchMBytes_(pset.getUntrackedParameter<unsigned int>("prefetchMBytes")) {
0021     InputFileCatalog catalog(pset.getUntrackedParameter<std::vector<std::string> >("fileNames"),
0022                              pset.getUntrackedParameter<std::string>("overrideCatalog"));
0023     streamerNames_ = catalog.fileCatalogItems();
0024     reset_();
0025   }
0026 
0027   StreamerFileReader::~StreamerFileReader() {}
0028 
0029   void StreamerFileReader::reset_() {
0030     if (streamerNames_.size() > 1) {
0031       streamReader_ = std::make_unique<StreamerInputFile>(streamerNames_, eventSkipperByID(), prefetchMBytes_);
0032     } else if (streamerNames_.size() == 1) {
0033       streamReader_ = std::make_unique<StreamerInputFile>(streamerNames_.at(0).fileNames()[0],
0034                                                           streamerNames_.at(0).logicalFileName(),
0035                                                           eventSkipperByID(),
0036                                                           prefetchMBytes_);
0037     } else {
0038       throw Exception(errors::FileReadError, "StreamerFileReader::StreamerFileReader")
0039           << "No fileNames were specified\n";
0040     }
0041     isFirstFile_ = true;
0042     InitMsgView const* header = getHeader();
0043     deserializeAndMergeWithRegistry(*header, false);
0044     if (initialNumberOfEventsToSkip_) {
0045       skip(initialNumberOfEventsToSkip_);
0046     }
0047   }
0048 
0049   StreamerFileReader::Next StreamerFileReader::checkNext() {
0050     EventMsgView const* eview = getNextEvent();
0051 
0052     if (eview == nullptr) {
0053       if (newHeader()) {
0054         return Next::kFile;
0055       }
0056       return Next::kStop;
0057     }
0058     deserializeEvent(*eview);
0059     return Next::kEvent;
0060   }
0061 
0062   void StreamerFileReader::skip(int toSkip) {
0063     for (int i = 0; i != toSkip; ++i) {
0064       EventMsgView const* evMsg = getNextEvent();
0065       if (evMsg == nullptr) {
0066         return;
0067       }
0068       // If the event would have been skipped anyway, don't count it as a skipped event.
0069       if (eventSkipperByID_ && eventSkipperByID_->skipIt(evMsg->run(), evMsg->lumi(), evMsg->event())) {
0070         --i;
0071       }
0072     }
0073   }
0074 
0075   void StreamerFileReader::genuineCloseFile() {
0076     if (streamReader_.get() != nullptr)
0077       streamReader_->closeStreamerFile();
0078   }
0079 
0080   void StreamerFileReader::genuineReadFile() {
0081     if (isFirstFile_) {
0082       //The file was already opened in the constructor
0083       isFirstFile_ = false;
0084       return;
0085     }
0086     streamReader_->openNextFile();
0087     // FDEBUG(6) << "A new file has been opened and we must compare Headers here !!" << std::endl;
0088     // A new file has been opened and we must compare Heraders here !!
0089     //Get header/init from reader
0090     InitMsgView const* header = getHeader();
0091     deserializeAndMergeWithRegistry(*header, true);
0092   }
0093 
0094   bool StreamerFileReader::newHeader() { return streamReader_->newHeader(); }
0095 
0096   InitMsgView const* StreamerFileReader::getHeader() {
0097     InitMsgView const* header = streamReader_->startMessage();
0098 
0099     if (header->code() != Header::INIT) {  //INIT Msg
0100       throw Exception(errors::FileReadError, "StreamerFileReader::readHeader")
0101           << "received wrong message type: expected INIT, got " << header->code() << "\n";
0102     }
0103     return header;
0104   }
0105 
0106   EventMsgView const* StreamerFileReader::getNextEvent() {
0107     if (StreamerInputFile::Next::kEvent != streamReader_->next()) {
0108       return nullptr;
0109     }
0110     return streamReader_->currentRecord();
0111   }
0112 
0113   void StreamerFileReader::fillDescriptions(ConfigurationDescriptions& descriptions) {
0114     ParameterSetDescription desc;
0115     desc.setComment("Reads events from streamer files.");
0116     desc.addUntracked<std::vector<std::string> >("fileNames")->setComment("Names of files to be processed.");
0117     desc.addUntracked<unsigned int>("skipEvents", 0U)
0118         ->setComment("Skip the first 'skipEvents' events that otherwise would have been processed.");
0119     desc.addUntracked<std::string>("overrideCatalog", std::string());
0120     //This next parameter is read in the base class, but its default value depends on the derived class, so it is set here.
0121     desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
0122     desc.addUntracked<unsigned int>("prefetchMBytes", 0);
0123     StreamerInputSource::fillDescription(desc);
0124     EventSkipperByID::fillDescription(desc);
0125     descriptions.add("source", desc);
0126   }
0127 }  // namespace edm