File indexing completed on 2024-04-06 12:19:06
0001
0002
#include "EmbeddedRootSource.h"
#include "InputFile.h"
#include "RootFile.h"
#include "RootEmbeddedFileSequence.h"
#include "RootTree.h"

#include "DataFormats/Provenance/interface/BranchID.h"
#include "DataFormats/Provenance/interface/BranchIDListHelper.h"
#include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
#include "FWCore/Catalog/interface/InputFileCatalog.h"
#include "FWCore/Catalog/interface/SiteLocalConfig.h"
#include "FWCore/Framework/interface/InputSource.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"

#include "CLHEP/Random/RandFlat.h"

#include <algorithm>
#include <atomic>
#include <cassert>
#include <fstream>
#include <random>
#include <utility>
0025
namespace {
  // Count of secondary files that failed to open or were skipped for containing no events.
  // It is a single namespace-scope atomic, so the count (and the maxFileSkips_ budget it is
  // compared against) is shared by every RootEmbeddedFileSequence in this process.
  std::atomic<unsigned int> badFilesSkipped_{0};

  // User-defined literal producing a std::size_t (e.g. 3_uz), so a literal can be passed to
  // std::min together with a size_t without an explicit cast. Declared without the space between
  // "" and the suffix (the spaced form is deprecated in C++23), constexpr so it can appear in
  // constant expressions, and using static_cast so it is not a narrowing brace-initialization on
  // targets where std::size_t is narrower than unsigned long long.
  constexpr auto operator""_uz(unsigned long long i) -> std::size_t { return static_cast<std::size_t>(i); }
}  // namespace
0030
0031 namespace edm {
0032 class EventPrincipal;
0033
0034 RootEmbeddedFileSequence::RootEmbeddedFileSequence(ParameterSet const& pset,
0035 EmbeddedRootSource& input,
0036 InputFileCatalog const& catalog)
0037 : RootInputFileSequence(pset, catalog),
0038 input_(input),
0039 orderedProcessHistoryIDs_(),
0040 sequential_(pset.getUntrackedParameter<bool>("sequential", false)),
0041 sameLumiBlock_(pset.getUntrackedParameter<bool>("sameLumiBlock", false)),
0042 fptr_(nullptr),
0043 eventsRemainingInFile_(0),
0044
0045
0046
0047
0048
0049
0050
0051 initialNumberOfEventsToSkip_(pset.getUntrackedParameter<unsigned int>("skipEvents", 0U)),
0052 treeCacheSize_(pset.getUntrackedParameter<unsigned int>("cacheSize", roottree::defaultCacheSize)),
0053 enablePrefetching_(false),
0054 enforceGUIDInFileName_(pset.getUntrackedParameter<bool>("enforceGUIDInFileName", false)),
0055 maxFileSkips_(pset.getUntrackedParameter<unsigned int>("maxFileSkips", std::min(3_uz, numberOfFiles()))) {
0056 if (noFiles()) {
0057 throw Exception(errors::NoSecondaryFiles)
0058 << "RootEmbeddedFileSequence no input files specified for secondary input source.\n";
0059 }
0060
0061
0062 Service<SiteLocalConfig> pSLC;
0063 if (pSLC.isAvailable()) {
0064 if (treeCacheSize_ != 0U && pSLC->sourceTTreeCacheSize()) {
0065 treeCacheSize_ = *(pSLC->sourceTTreeCacheSize());
0066 }
0067 enablePrefetching_ = pSLC->enablePrefetching();
0068 }
0069
0070
0071 if (sameLumiBlock_) {
0072 if (sequential_) {
0073 fptr_ = &RootEmbeddedFileSequence::readOneSequentialWithID;
0074 } else {
0075 fptr_ = &RootEmbeddedFileSequence::readOneRandomWithID;
0076 }
0077 } else {
0078 if (sequential_) {
0079 fptr_ = &RootEmbeddedFileSequence::readOneSequential;
0080 } else {
0081 fptr_ = &RootEmbeddedFileSequence::readOneRandom;
0082 }
0083 }
0084
0085
0086 if (sequential_) {
0087
0088 if (!atFirstFile()) {
0089 setAtFirstFile();
0090 initFile(false);
0091 }
0092 assert(rootFile());
0093 rootFile()->setAtEventEntry(IndexIntoFile::invalidEntry);
0094 if (!sameLumiBlock_) {
0095 skipEntries(initialNumberOfEventsToSkip_);
0096 }
0097 } else {
0098
0099
0100 std::ifstream f("/dev/urandom");
0101 unsigned int seed;
0102 f.read(reinterpret_cast<char*>(&seed), sizeof(seed));
0103 std::default_random_engine dre(seed);
0104 std::uniform_int_distribution<int> distribution(0, numberOfFiles() - 1);
0105 while (!rootFile() && badFilesSkipped_ < maxFileSkips_) {
0106 int offset = distribution(dre);
0107 setAtFileSequenceNumber(offset);
0108 initFile(input_.skipBadFiles());
0109 if (not rootFile()) {
0110 ++badFilesSkipped_;
0111 }
0112 }
0113 }
0114 if (rootFile()) {
0115 input_.productRegistryUpdate().updateFromInput(rootFile()->productRegistry()->productList());
0116 } else {
0117 throw Exception(errors::FileOpenError) << "RootEmbeddedFileSequence::RootEmbeddedFileSequence(): "
0118 << " input file retries exhausted.\n";
0119 }
0120 }
0121
0122 RootEmbeddedFileSequence::~RootEmbeddedFileSequence() {}
0123
0124 void RootEmbeddedFileSequence::endJob() { closeFile(); }
0125
0126 void RootEmbeddedFileSequence::closeFile_() {
0127
0128 if (rootFile()) {
0129 rootFile().reset();
0130 }
0131 }
0132
0133 void RootEmbeddedFileSequence::initFile_(bool skipBadFiles) {
0134 initTheFile(skipBadFiles, false, nullptr, "mixingFiles", InputType::SecondarySource);
0135 }
0136
0137 RootEmbeddedFileSequence::RootFileSharedPtr RootEmbeddedFileSequence::makeRootFile(
0138 std::shared_ptr<InputFile> filePtr) {
0139 size_t currentIndexIntoFile = sequenceNumberOfFile();
0140 return std::make_shared<RootFile>(fileNames()[0],
0141 ProcessConfiguration(),
0142 logicalFileName(),
0143 filePtr,
0144 input_.nStreams(),
0145 treeCacheSize_,
0146 input_.treeMaxVirtualSize(),
0147 input_.runHelper(),
0148 input_.productSelectorRules(),
0149 InputType::SecondarySource,
0150 input_.processHistoryRegistryForUpdate(),
0151 indexesIntoFiles(),
0152 currentIndexIntoFile,
0153 orderedProcessHistoryIDs_,
0154 input_.bypassVersionCheck(),
0155 enablePrefetching_,
0156 enforceGUIDInFileName_);
0157 }
0158
0159 void RootEmbeddedFileSequence::skipEntries(unsigned int offset) {
0160
0161 bool completed = rootFile()->skipEntries(offset);
0162 while (!completed) {
0163 setAtNextFile();
0164 if (noMoreFiles()) {
0165 setAtFirstFile();
0166 }
0167 initFile(false);
0168 assert(rootFile());
0169 rootFile()->setAtEventEntry(IndexIntoFile::invalidEntry);
0170 completed = rootFile()->skipEntries(offset);
0171 }
0172 }
0173
0174 bool RootEmbeddedFileSequence::readOneSequential(
0175 EventPrincipal& cache, size_t& fileNameHash, CLHEP::HepRandomEngine*, EventID const*, bool recycleFiles) {
0176 assert(rootFile());
0177 bool found = rootFile()->nextEventEntry();
0178 if (found) {
0179 auto [found2, succeeded] = rootFile()->readCurrentEvent(cache);
0180 found = found2;
0181 }
0182 if (!found) {
0183 setAtNextFile();
0184 if (noMoreFiles()) {
0185 if (recycleFiles) {
0186 setAtFirstFile();
0187 } else {
0188 return false;
0189 }
0190 }
0191 initFile(false);
0192 assert(rootFile());
0193 rootFile()->setAtEventEntry(IndexIntoFile::invalidEntry);
0194 return readOneSequential(cache, fileNameHash, nullptr, nullptr, recycleFiles);
0195 }
0196 fileNameHash = lfnHash();
0197 return true;
0198 }
0199
// Reads the next event, in file order, from the run and luminosity block given by *idp.
//
// If initialNumberOfEventsToSkip_ is nonzero it is consumed first. The member is cleared, then this
// function calls itself that many times and discards the events read. Next it makes sure the
// current file is positioned in the requested lumi (via skipToItem) and advances to that lumi's
// next event. When the current file yields no further readable event for the lumi, it looks for
// the lumi in a new file (skipToItemInNewFile) and retries.
//
// Returns false when the lumi cannot be found or a skip read fails. Otherwise it fills `cache`,
// sets `fileNameHash` to lfnHash() and returns true. The engine argument is ignored, and
// `recycleFiles` is only forwarded to the recursive calls.
bool RootEmbeddedFileSequence::readOneSequentialWithID(
    EventPrincipal& cache, size_t& fileNameHash, CLHEP::HepRandomEngine*, EventID const* idp, bool recycleFiles) {
  assert(idp);
  EventID const& id = *idp;
  // Consume the configured skip once. Clearing the member before recursing keeps the recursive
  // calls below from skipping again.
  int offset = initialNumberOfEventsToSkip_;
  initialNumberOfEventsToSkip_ = 0;
  if (offset > 0) {
    assert(rootFile());
    while (offset > 0) {
      bool found = readOneSequentialWithID(cache, fileNameHash, nullptr, idp, recycleFiles);
      if (!found) {
        return false;
      }
      --offset;
    }
  }
  assert(rootFile());
  // Reposition when noMoreFiles() reports true or the current index entry belongs to a different
  // run/lumi than the one requested.
  if (noMoreFiles() || rootFile()->indexIntoFileIter().run() != id.run() ||
      rootFile()->indexIntoFileIter().lumi() != id.luminosityBlock()) {
    bool found = skipToItem(id.run(), id.luminosityBlock(), 0, 0, false);
    if (!found) {
      return false;
    }
  }
  assert(rootFile());
  bool found = rootFile()->setEntryAtNextEventInLumi(id.run(), id.luminosityBlock());
  if (found) {
    auto [found2, succeeded] = rootFile()->readCurrentEvent(cache);
    found = found2;
  }
  if (!found) {
    // No readable next event for this lumi in the current file: look for the lumi in another
    // file and retry there.
    // NOTE(review): termination relies on skipToItemInNewFile eventually returning false or landing
    // on a file with a readable event for this lumi — confirm.
    found = skipToItemInNewFile(id.run(), id.luminosityBlock(), 0);
    if (!found) {
      return false;
    }
    return readOneSequentialWithID(cache, fileNameHash, nullptr, idp, recycleFiles);
  }
  fileNameHash = lfnHash();
  return true;
}
0240
0241 void RootEmbeddedFileSequence::readOneSpecified(EventPrincipal& cache,
0242 size_t& fileNameHash,
0243 SecondaryEventIDAndFileInfo const& idx) {
0244 EventID const& id = idx.eventID();
0245 bool found = skipToItem(id.run(), id.luminosityBlock(), id.event(), idx.fileNameHash());
0246 if (!found) {
0247 throw Exception(errors::NotFound) << "RootEmbeddedFileSequence::readOneSpecified(): Secondary Input files"
0248 << " do not contain specified event:\n"
0249 << id << " in file id " << idx.fileNameHash() << "\n";
0250 }
0251 assert(rootFile());
0252 auto [found2, succeeded] = rootFile()->readCurrentEvent(cache);
0253 found = found2;
0254 assert(found);
0255 fileNameHash = idx.fileNameHash();
0256 if (fileNameHash == 0U) {
0257 fileNameHash = lfnHash();
0258 }
0259 }
0260
0261 bool RootEmbeddedFileSequence::readOneRandom(
0262 EventPrincipal& cache, size_t& fileNameHash, CLHEP::HepRandomEngine* engine, EventID const*, bool) {
0263 assert(rootFile());
0264 assert(engine);
0265 unsigned int currentSeqNumber = sequenceNumberOfFile();
0266 while (eventsRemainingInFile_ == 0) {
0267 bool opened{false};
0268 while (!opened && badFilesSkipped_ < maxFileSkips_) {
0269 unsigned int newSeqNumber = CLHEP::RandFlat::shootInt(engine, fileCatalogItems().size());
0270 setAtFileSequenceNumber(newSeqNumber);
0271 if (newSeqNumber != currentSeqNumber) {
0272 initFile(input_.skipBadFiles());
0273 currentSeqNumber = newSeqNumber;
0274 }
0275 if (rootFile()) {
0276 eventsRemainingInFile_ = rootFile()->eventTree().entries();
0277 if (eventsRemainingInFile_ == 0) {
0278 if (!input_.skipBadFiles()) {
0279 throw Exception(errors::NotFound) << "RootEmbeddedFileSequence::readOneRandom(): Secondary Input file "
0280 << fileNames()[0] << " contains no events.\n";
0281 }
0282 LogWarning("RootEmbeddedFileSequence") << "RootEmbeddedFileSequence::readOneRandom(): Secondary Input file "
0283 << fileNames()[0] << " contains no events and will be skipped.\n";
0284 ++badFilesSkipped_;
0285 } else {
0286 opened = true;
0287 }
0288 } else {
0289 if (newSeqNumber != currentSeqNumber) {
0290 ++badFilesSkipped_;
0291 }
0292 }
0293 }
0294 if (not opened) {
0295 throw Exception(errors::FileOpenError) << "RootEmbeddedFileSequence::readOneRandom(): "
0296 << " input file retries exhausted.\n";
0297 }
0298 rootFile()->setAtEventEntry(CLHEP::RandFlat::shootInt(engine, eventsRemainingInFile_) - 1);
0299 }
0300 rootFile()->nextEventEntry();
0301
0302 auto [found, succeeded] = rootFile()->readCurrentEvent(cache);
0303 if (!found) {
0304 rootFile()->setAtEventEntry(0);
0305 auto [found2, succeeded] = rootFile()->readCurrentEvent(cache);
0306 assert(found2);
0307 }
0308 fileNameHash = lfnHash();
0309 --eventsRemainingInFile_;
0310 return true;
0311 }
0312
// Reads an event from the run and luminosity block given by *idp, starting at a random event
// of that lumi.
//
// When the current position is not inside the requested lumi (or there is no current file):
// - move there with skipToItem;
// - count the lumi's events by stepping through them with setEntryAtNextEventInLumi;
// - move back to the lumi;
// - advance by a count drawn with CLHEP::RandFlat::shootInt(engine, eventsInLumi), i.e. in
//   [0, eventsInLumi).
// Each call then reads the lumi's next event. When none is left it returns to the lumi via
// setEntryAtItem(run, lumi, 0) and tries again, so later calls cycle through the lumi's events.
//
// Returns false if the lumi cannot be found. Otherwise it fills `cache`, sets `fileNameHash` to
// lfnHash() and returns true.
bool RootEmbeddedFileSequence::readOneRandomWithID(EventPrincipal& cache,
                                                   size_t& fileNameHash,
                                                   CLHEP::HepRandomEngine* engine,
                                                   EventID const* idp,
                                                   bool recycleFiles) {
  assert(engine);
  assert(idp);
  EventID const& id = *idp;
  if (noMoreFiles() || !rootFile() || rootFile()->indexIntoFileIter().run() != id.run() ||
      rootFile()->indexIntoFileIter().lumi() != id.luminosityBlock()) {
    bool found = skipToItem(id.run(), id.luminosityBlock(), 0);
    if (!found) {
      return false;
    }
    // Count the events in this lumi, then rewind to the lumi before choosing a random start.
    int eventsInLumi = 0;
    assert(rootFile());
    while (rootFile()->setEntryAtNextEventInLumi(id.run(), id.luminosityBlock()))
      ++eventsInLumi;
    found = skipToItem(id.run(), id.luminosityBlock(), 0);
    // NOTE(review): in NDEBUG builds the result of this second skipToItem is not checked.
    assert(found);
    int eventInLumi = CLHEP::RandFlat::shootInt(engine, eventsInLumi);
    for (int i = 0; i < eventInLumi; ++i) {
      bool foundEventInLumi = rootFile()->setEntryAtNextEventInLumi(id.run(), id.luminosityBlock());
      assert(foundEventInLumi);
    }
  }
  assert(rootFile());
  bool found = rootFile()->setEntryAtNextEventInLumi(id.run(), id.luminosityBlock());
  if (found) {
    auto [found2, succeeded] = rootFile()->readCurrentEvent(cache);
    found = found2;
  }
  if (!found) {
    // End of the lumi (or failed read): go back to the lumi and read again.
    // NOTE(review): if the lumi has no readable event but setEntryAtItem keeps succeeding, this
    // recursion does not terminate — confirm whether empty lumis can reach this path.
    found = rootFile()->setEntryAtItem(id.run(), id.luminosityBlock(), 0);
    if (!found) {
      return false;
    }
    return readOneRandomWithID(cache, fileNameHash, engine, idp, recycleFiles);
  }
  fileNameHash = lfnHash();
  return true;
}
0355
0356 bool RootEmbeddedFileSequence::readOneEvent(EventPrincipal& cache,
0357 size_t& fileNameHash,
0358 CLHEP::HepRandomEngine* engine,
0359 EventID const* id,
0360 bool recycleFiles) {
0361 assert(!sameLumiBlock_ || id != nullptr);
0362 assert(sequential_ || engine != nullptr);
0363 return (this->*fptr_)(cache, fileNameHash, engine, id, recycleFiles);
0364 }
0365
0366 void RootEmbeddedFileSequence::fillDescription(ParameterSetDescription& desc) {
0367 desc.addUntracked<bool>("sequential", false)
0368 ->setComment(
0369 "True: loopEvents() reads events sequentially from beginning of first file.\n"
0370 "False: loopEvents() first reads events beginning at random event. New files also chosen randomly");
0371 desc.addUntracked<bool>("sameLumiBlock", false)
0372 ->setComment(
0373 "True: loopEvents() reads events only in same lumi as the specified event.\n"
0374 "False: loopEvents() reads events regardless of lumi.");
0375 desc.addUntracked<unsigned int>("skipEvents", 0U)
0376 ->setComment(
0377 "Skip the first 'skipEvents' events. Used only if 'sequential' is True and 'sameLumiBlock' is False");
0378 desc.addUntracked<unsigned int>("maxFileSkips")
0379 ->setComment(
0380 "How many files to try if 'sequential' is False and 'skipBadFiles' is True.\n"
0381 "Defaults to 3 (or # of files if smaller).");
0382 desc.addUntracked<unsigned int>("cacheSize", roottree::defaultCacheSize)
0383 ->setComment("Size of ROOT TTree prefetch cache. Affects performance.");
0384 desc.addUntracked<bool>("enforceGUIDInFileName", false)
0385 ->setComment(
0386 "True: file name part is required to be equal to the GUID of the file\n"
0387 "False: file name can be anything");
0388 }
0389 }