Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-05-31 04:19:42

0001 #include "IOPool/Streamer/interface/MsgTools.h"
0002 #include "IOPool/Streamer/interface/StreamerInputFile.h"
0003 #include "IOPool/Streamer/src/StreamerFileReader.h"
0004 #include "FWCore/Framework/interface/FileBlock.h"
0005 #include "FWCore/Utilities/interface/Exception.h"
0006 #include "FWCore/Utilities/interface/EDMException.h"
0007 #include "FWCore/Catalog/interface/InputFileCatalog.h"
0008 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0009 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0010 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0011 #include "FWCore/Sources/interface/EventSkipperByID.h"
0012 
0013 #include <cassert>
0014 namespace edm::streamer {
0015 
0016   StreamerFileReader::StreamerFileReader(ParameterSet const& pset, InputSourceDescription const& desc)
0017       : StreamerInputSource(pset, desc),
0018         streamReader_(),
0019         eventSkipperByID_(EventSkipperByID::create(pset).release()),
0020         initialNumberOfEventsToSkip_(pset.getUntrackedParameter<unsigned int>("skipEvents")),
0021         prefetchMBytes_(pset.getUntrackedParameter<unsigned int>("prefetchMBytes")) {
0022     InputFileCatalog catalog(pset.getUntrackedParameter<std::vector<std::string> >("fileNames"),
0023                              pset.getUntrackedParameter<std::string>("overrideCatalog"));
0024     streamerNames_ = catalog.fileCatalogItems();
0025     reset_();
0026   }
0027 
0028   StreamerFileReader::~StreamerFileReader() {}
0029 
0030   void StreamerFileReader::reset_() {
0031     if (streamerNames_.size() > 1) {
0032       streamReader_ = std::make_unique<StreamerInputFile>(streamerNames_, eventSkipperByID(), prefetchMBytes_);
0033     } else if (streamerNames_.size() == 1) {
0034       streamReader_ = std::make_unique<StreamerInputFile>(streamerNames_.at(0).fileNames()[0],
0035                                                           streamerNames_.at(0).logicalFileName(),
0036                                                           eventSkipperByID(),
0037                                                           prefetchMBytes_);
0038     } else {
0039       throw Exception(errors::FileReadError, "StreamerFileReader::StreamerFileReader")
0040           << "No fileNames were specified\n";
0041     }
0042     isFirstFile_ = true;
0043     updateMetaData(false);
0044     if (initialNumberOfEventsToSkip_) {
0045       skip(initialNumberOfEventsToSkip_);
0046     }
0047   }
0048 
0049   void StreamerFileReader::updateMetaData(bool subsequent) {
0050     InitMsgView const* header = getHeader();
0051     deserializeAndMergeWithRegistry(*header, subsequent);
0052     //NOTE: should read first Event to get the meta data and then set 'artificial file'
0053     auto eview = getNextEvent();
0054 
0055     //if no events then file must be empty
0056     if (eview == nullptr)
0057       return;
0058 
0059     assert(eview->isEventMetaData());
0060     deserializeEventMetaData(*eview);
0061     updateEventMetaData();
0062   }
0063 
0064   StreamerFileReader::Next StreamerFileReader::checkNext() {
0065     EventMsgView const* eview = getNextEvent();
0066 
0067     if (eview == nullptr) {
0068       if (newHeader()) {
0069         return Next::kFile;
0070       }
0071       return Next::kStop;
0072     }
0073     if (eview->isEventMetaData()) {
0074       if (presentEventMetaDataChecksum() != eventMetaDataChecksum(*eview)) {
0075         //we lie and say there is a new file since we need to synchronize to update the meta data
0076         didArtificialFile_ = true;
0077         deserializeEventMetaData(*eview);
0078         return Next::kFile;
0079       } else {
0080         //skip this meta data
0081         eview = getNextEvent();
0082         if (eview == nullptr) {
0083           if (newHeader()) {
0084             return Next::kFile;
0085           }
0086           return Next::kStop;
0087         }
0088       }
0089     }
0090     deserializeEvent(*eview);
0091     return Next::kEvent;
0092   }
0093 
0094   void StreamerFileReader::skip(int toSkip) {
0095     for (int i = 0; i != toSkip; ++i) {
0096       EventMsgView const* evMsg = getNextEvent();
0097       if (evMsg == nullptr) {
0098         return;
0099       }
0100       // If the event would have been skipped anyway, don't count it as a skipped event.
0101       if (eventSkipperByID_ && eventSkipperByID_->skipIt(evMsg->run(), evMsg->lumi(), evMsg->event())) {
0102         --i;
0103       }
0104     }
0105   }
0106 
0107   void StreamerFileReader::genuineCloseFile() {
0108     if (didArtificialFile_) {
0109       return;
0110     }
0111     if (streamReader_.get() != nullptr)
0112       streamReader_->closeStreamerFile();
0113   }
0114 
0115   void StreamerFileReader::genuineReadFile() {
0116     if (isFirstFile_) {
0117       //The file was already opened in the constructor
0118       isFirstFile_ = false;
0119       return;
0120     }
0121     if (didArtificialFile_) {
0122       //update the event meta data
0123       didArtificialFile_ = false;
0124       updateEventMetaData();
0125       return;
0126     }
0127     streamReader_->openNextFile();
0128     // FDEBUG(6) << "A new file has been opened and we must compare Headers here !!" << std::endl;
0129     // A new file has been opened and we must compare Heraders here !!
0130     //Get header/init from reader
0131     updateMetaData(true);
0132   }
0133 
0134   bool StreamerFileReader::newHeader() { return streamReader_->newHeader(); }
0135 
0136   InitMsgView const* StreamerFileReader::getHeader() {
0137     InitMsgView const* header = streamReader_->startMessage();
0138 
0139     if (header->code() != Header::INIT) {  //INIT Msg
0140       throw Exception(errors::FileReadError, "StreamerFileReader::readHeader")
0141           << "received wrong message type: expected INIT, got " << header->code() << "\n";
0142     }
0143     return header;
0144   }
0145 
0146   EventMsgView const* StreamerFileReader::getNextEvent() {
0147     if (StreamerInputFile::Next::kEvent != streamReader_->next()) {
0148       return nullptr;
0149     }
0150     return streamReader_->currentRecord();
0151   }
0152 
0153   void StreamerFileReader::fillDescriptions(ConfigurationDescriptions& descriptions) {
0154     ParameterSetDescription desc;
0155     desc.setComment("Reads events from streamer files.");
0156     desc.addUntracked<std::vector<std::string> >("fileNames")->setComment("Names of files to be processed.");
0157     desc.addUntracked<unsigned int>("skipEvents", 0U)
0158         ->setComment("Skip the first 'skipEvents' events that otherwise would have been processed.");
0159     desc.addUntracked<std::string>("overrideCatalog", std::string());
0160     //This next parameter is read in the base class, but its default value depends on the derived class, so it is set here.
0161     desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
0162     desc.addUntracked<unsigned int>("prefetchMBytes", 0);
0163     StreamerInputSource::fillDescription(desc);
0164     EventSkipperByID::fillDescription(desc);
0165     descriptions.add("source", desc);
0166   }
0167 }  // namespace edm::streamer