Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:19:06

0001 /*----------------------------------------------------------------------
0002 ----------------------------------------------------------------------*/
0003 #include "EmbeddedRootSource.h"
0004 #include "InputFile.h"
0005 #include "RootFile.h"
0006 #include "RootEmbeddedFileSequence.h"
0007 #include "RootTree.h"
0008 
0009 #include "DataFormats/Provenance/interface/BranchID.h"
0010 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0011 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0012 #include "FWCore/Catalog/interface/InputFileCatalog.h"
0013 #include "FWCore/Catalog/interface/SiteLocalConfig.h"
0014 #include "FWCore/Framework/interface/InputSource.h"
0015 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0016 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0017 #include "FWCore/ServiceRegistry/interface/Service.h"
0018 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0019 
0020 #include "CLHEP/Random/RandFlat.h"
0021 
0022 #include <random>
0023 #include <algorithm>
0024 #include <atomic>
0025 
namespace {
  // Count of secondary input files that were skipped, either because they could not be opened or
  // because they contained no events. This is a single file-scope counter, so it is shared by every
  // RootEmbeddedFileSequence instance in the process and compared against each instance's maxFileSkips_.
  std::atomic<unsigned int> badFilesSkipped_{0};

  // User-defined literal yielding a std::size_t (C++23 provides the built-in `uz` suffix).
  // Declared without whitespace after "" because the spaced form `operator"" _uz` is deprecated.
  constexpr std::size_t operator""_uz(unsigned long long i) { return static_cast<std::size_t>(i); }
}  // namespace
0030 
0031 namespace edm {
0032   class EventPrincipal;
0033 
0034   RootEmbeddedFileSequence::RootEmbeddedFileSequence(ParameterSet const& pset,
0035                                                      EmbeddedRootSource& input,
0036                                                      InputFileCatalog const& catalog)
0037       : RootInputFileSequence(pset, catalog),
0038         input_(input),
0039         orderedProcessHistoryIDs_(),
0040         sequential_(pset.getUntrackedParameter<bool>("sequential", false)),
0041         sameLumiBlock_(pset.getUntrackedParameter<bool>("sameLumiBlock", false)),
0042         fptr_(nullptr),
0043         eventsRemainingInFile_(0),
0044         // The default value provided as the second argument to the getUntrackedParameter function call
0045         // is not used when the ParameterSet has been validated and the parameters are not optional
0046         // in the description.  This is currently true when PoolSource is the primary input source.
0047         // The modules that use PoolSource as a SecSource have not defined their fillDescriptions function
0048         // yet, so the ParameterSet does not get validated yet.  As soon as all the modules with a SecSource
0049         // have defined descriptions, the defaults in the getUntrackedParameterSet function calls can
0050         // and should be deleted from the code.
0051         initialNumberOfEventsToSkip_(pset.getUntrackedParameter<unsigned int>("skipEvents", 0U)),
0052         treeCacheSize_(pset.getUntrackedParameter<unsigned int>("cacheSize", roottree::defaultCacheSize)),
0053         enablePrefetching_(false),
0054         enforceGUIDInFileName_(pset.getUntrackedParameter<bool>("enforceGUIDInFileName", false)),
0055         maxFileSkips_(pset.getUntrackedParameter<unsigned int>("maxFileSkips", std::min(3_uz, numberOfFiles()))) {
0056     if (noFiles()) {
0057       throw Exception(errors::NoSecondaryFiles)
0058           << "RootEmbeddedFileSequence no input files specified for secondary input source.\n";
0059     }
0060     //
0061     // The SiteLocalConfig controls the TTreeCache size and the prefetching settings.
0062     Service<SiteLocalConfig> pSLC;
0063     if (pSLC.isAvailable()) {
0064       if (treeCacheSize_ != 0U && pSLC->sourceTTreeCacheSize()) {
0065         treeCacheSize_ = *(pSLC->sourceTTreeCacheSize());
0066       }
0067       enablePrefetching_ = pSLC->enablePrefetching();
0068     }
0069 
0070     // Set the pointer to the function that reads an event.
0071     if (sameLumiBlock_) {
0072       if (sequential_) {
0073         fptr_ = &RootEmbeddedFileSequence::readOneSequentialWithID;
0074       } else {
0075         fptr_ = &RootEmbeddedFileSequence::readOneRandomWithID;
0076       }
0077     } else {
0078       if (sequential_) {
0079         fptr_ = &RootEmbeddedFileSequence::readOneSequential;
0080       } else {
0081         fptr_ = &RootEmbeddedFileSequence::readOneRandom;
0082       }
0083     }
0084 
0085     // For the secondary input source we do not stage in.
0086     if (sequential_) {
0087       // We open the first file
0088       if (!atFirstFile()) {
0089         setAtFirstFile();
0090         initFile(false);
0091       }
0092       assert(rootFile());
0093       rootFile()->setAtEventEntry(IndexIntoFile::invalidEntry);
0094       if (!sameLumiBlock_) {
0095         skipEntries(initialNumberOfEventsToSkip_);
0096       }
0097     } else {
0098       // We randomly choose the first file to open.
0099       // We cannot use the random number service yet.
0100       std::ifstream f("/dev/urandom");
0101       unsigned int seed;
0102       f.read(reinterpret_cast<char*>(&seed), sizeof(seed));
0103       std::default_random_engine dre(seed);
0104       std::uniform_int_distribution<int> distribution(0, numberOfFiles() - 1);
0105       while (!rootFile() && badFilesSkipped_ < maxFileSkips_) {
0106         int offset = distribution(dre);
0107         setAtFileSequenceNumber(offset);
0108         initFile(input_.skipBadFiles());
0109         if (not rootFile()) {
0110           ++badFilesSkipped_;
0111         }
0112       }
0113     }
0114     if (rootFile()) {
0115       input_.productRegistryUpdate().updateFromInput(rootFile()->productRegistry()->productList());
0116     } else {
0117       throw Exception(errors::FileOpenError) << "RootEmbeddedFileSequence::RootEmbeddedFileSequence(): "
0118                                              << " input file retries exhausted.\n";
0119     }
0120   }
0121 
0122   RootEmbeddedFileSequence::~RootEmbeddedFileSequence() {}
0123 
0124   void RootEmbeddedFileSequence::endJob() { closeFile(); }
0125 
0126   void RootEmbeddedFileSequence::closeFile_() {
0127     // delete the RootFile object.
0128     if (rootFile()) {
0129       rootFile().reset();
0130     }
0131   }
0132 
0133   void RootEmbeddedFileSequence::initFile_(bool skipBadFiles) {
0134     initTheFile(skipBadFiles, false, nullptr, "mixingFiles", InputType::SecondarySource);
0135   }
0136 
0137   RootEmbeddedFileSequence::RootFileSharedPtr RootEmbeddedFileSequence::makeRootFile(
0138       std::shared_ptr<InputFile> filePtr) {
0139     size_t currentIndexIntoFile = sequenceNumberOfFile();
0140     return std::make_shared<RootFile>(fileNames()[0],
0141                                       ProcessConfiguration(),
0142                                       logicalFileName(),
0143                                       filePtr,
0144                                       input_.nStreams(),
0145                                       treeCacheSize_,
0146                                       input_.treeMaxVirtualSize(),
0147                                       input_.runHelper(),
0148                                       input_.productSelectorRules(),
0149                                       InputType::SecondarySource,
0150                                       input_.processHistoryRegistryForUpdate(),
0151                                       indexesIntoFiles(),
0152                                       currentIndexIntoFile,
0153                                       orderedProcessHistoryIDs_,
0154                                       input_.bypassVersionCheck(),
0155                                       enablePrefetching_,
0156                                       enforceGUIDInFileName_);
0157   }
0158 
0159   void RootEmbeddedFileSequence::skipEntries(unsigned int offset) {
0160     // offset is decremented by the number of events actually skipped.
0161     bool completed = rootFile()->skipEntries(offset);
0162     while (!completed) {
0163       setAtNextFile();
0164       if (noMoreFiles()) {
0165         setAtFirstFile();
0166       }
0167       initFile(false);
0168       assert(rootFile());
0169       rootFile()->setAtEventEntry(IndexIntoFile::invalidEntry);
0170       completed = rootFile()->skipEntries(offset);
0171     }
0172   }
0173 
0174   bool RootEmbeddedFileSequence::readOneSequential(
0175       EventPrincipal& cache, size_t& fileNameHash, CLHEP::HepRandomEngine*, EventID const*, bool recycleFiles) {
0176     assert(rootFile());
0177     bool found = rootFile()->nextEventEntry();
0178     if (found) {
0179       auto [found2, succeeded] = rootFile()->readCurrentEvent(cache);
0180       found = found2;
0181     }
0182     if (!found) {
0183       setAtNextFile();
0184       if (noMoreFiles()) {
0185         if (recycleFiles) {
0186           setAtFirstFile();
0187         } else {
0188           return false;
0189         }
0190       }
0191       initFile(false);
0192       assert(rootFile());
0193       rootFile()->setAtEventEntry(IndexIntoFile::invalidEntry);
0194       return readOneSequential(cache, fileNameHash, nullptr, nullptr, recycleFiles);
0195     }
0196     fileNameHash = lfnHash();
0197     return true;
0198   }
0199 
  // Sequential reader used when 'sameLumiBlock' is set: reads the next event belonging to the run
  // and luminosity block of *idp, searching other files (skipToItem / skipToItemInNewFile) as needed.
  //
  // On the first call only, initialNumberOfEventsToSkip_ ('skipEvents') events of that lumi are read
  // and discarded first. The member is zeroed before the recursive skip calls, so they do not skip again.
  //
  // cache        - receives the event that was read.
  // fileNameHash - set to lfnHash() when an event is read.
  // idp          - must be non-null; its run and lumi select the eligible events.
  // recycleFiles - only forwarded to the recursive calls made here.
  // Returns false when no further event of the requested lumi can be found.
  bool RootEmbeddedFileSequence::readOneSequentialWithID(
      EventPrincipal& cache, size_t& fileNameHash, CLHEP::HepRandomEngine*, EventID const* idp, bool recycleFiles) {
    assert(idp);
    EventID const& id = *idp;
    // Consume the one-time skip count; it is reset before recursing so nested calls do not skip again.
    // NOTE(review): the unsigned value is converted to int, so counts above INT_MAX would not skip.
    int offset = initialNumberOfEventsToSkip_;
    initialNumberOfEventsToSkip_ = 0;
    if (offset > 0) {
      assert(rootFile());
      while (offset > 0) {
        // Each recursive call reads (and thereby skips) one event of the requested lumi.
        bool found = readOneSequentialWithID(cache, fileNameHash, nullptr, idp, recycleFiles);
        if (!found) {
          return false;
        }
        --offset;
      }
    }
    assert(rootFile());
    // Reposition only if the file iterator is not already inside the requested run/lumi.
    if (noMoreFiles() || rootFile()->indexIntoFileIter().run() != id.run() ||
        rootFile()->indexIntoFileIter().lumi() != id.luminosityBlock()) {
      bool found = skipToItem(id.run(), id.luminosityBlock(), 0, 0, false);
      if (!found) {
        return false;
      }
    }
    assert(rootFile());
    bool found = rootFile()->setEntryAtNextEventInLumi(id.run(), id.luminosityBlock());
    if (found) {
      auto [found2, succeeded] = rootFile()->readCurrentEvent(cache);
      found = found2;
    }
    if (!found) {
      // No (readable) next event of this lumi in the current file: look for the lumi in another file and retry.
      found = skipToItemInNewFile(id.run(), id.luminosityBlock(), 0);
      if (!found) {
        return false;
      }
      return readOneSequentialWithID(cache, fileNameHash, nullptr, idp, recycleFiles);
    }
    fileNameHash = lfnHash();
    return true;
  }
0240 
0241   void RootEmbeddedFileSequence::readOneSpecified(EventPrincipal& cache,
0242                                                   size_t& fileNameHash,
0243                                                   SecondaryEventIDAndFileInfo const& idx) {
0244     EventID const& id = idx.eventID();
0245     bool found = skipToItem(id.run(), id.luminosityBlock(), id.event(), idx.fileNameHash());
0246     if (!found) {
0247       throw Exception(errors::NotFound) << "RootEmbeddedFileSequence::readOneSpecified(): Secondary Input files"
0248                                         << " do not contain specified event:\n"
0249                                         << id << " in file id " << idx.fileNameHash() << "\n";
0250     }
0251     assert(rootFile());
0252     auto [found2, succeeded] = rootFile()->readCurrentEvent(cache);
0253     found = found2;
0254     assert(found);
0255     fileNameHash = idx.fileNameHash();
0256     if (fileNameHash == 0U) {
0257       fileNameHash = lfnHash();
0258     }
0259   }
0260 
0261   bool RootEmbeddedFileSequence::readOneRandom(
0262       EventPrincipal& cache, size_t& fileNameHash, CLHEP::HepRandomEngine* engine, EventID const*, bool) {
0263     assert(rootFile());
0264     assert(engine);
0265     unsigned int currentSeqNumber = sequenceNumberOfFile();
0266     while (eventsRemainingInFile_ == 0) {
0267       bool opened{false};
0268       while (!opened && badFilesSkipped_ < maxFileSkips_) {
0269         unsigned int newSeqNumber = CLHEP::RandFlat::shootInt(engine, fileCatalogItems().size());
0270         setAtFileSequenceNumber(newSeqNumber);
0271         if (newSeqNumber != currentSeqNumber) {
0272           initFile(input_.skipBadFiles());
0273           currentSeqNumber = newSeqNumber;
0274         }
0275         if (rootFile()) {
0276           eventsRemainingInFile_ = rootFile()->eventTree().entries();
0277           if (eventsRemainingInFile_ == 0) {
0278             if (!input_.skipBadFiles()) {
0279               throw Exception(errors::NotFound) << "RootEmbeddedFileSequence::readOneRandom(): Secondary Input file "
0280                                                 << fileNames()[0] << " contains no events.\n";
0281             }
0282             LogWarning("RootEmbeddedFileSequence") << "RootEmbeddedFileSequence::readOneRandom(): Secondary Input file "
0283                                                    << fileNames()[0] << " contains no events and will be skipped.\n";
0284             ++badFilesSkipped_;
0285           } else {
0286             opened = true;
0287           }
0288         } else {
0289           if (newSeqNumber != currentSeqNumber) {
0290             ++badFilesSkipped_;
0291           }
0292         }
0293       }
0294       if (not opened) {
0295         throw Exception(errors::FileOpenError) << "RootEmbeddedFileSequence::readOneRandom(): "
0296                                                << " input file retries exhausted.\n";
0297       }
0298       rootFile()->setAtEventEntry(CLHEP::RandFlat::shootInt(engine, eventsRemainingInFile_) - 1);
0299     }
0300     rootFile()->nextEventEntry();
0301 
0302     auto [found, succeeded] = rootFile()->readCurrentEvent(cache);
0303     if (!found) {
0304       rootFile()->setAtEventEntry(0);
0305       auto [found2, succeeded] = rootFile()->readCurrentEvent(cache);
0306       assert(found2);
0307     }
0308     fileNameHash = lfnHash();
0309     --eventsRemainingInFile_;
0310     return true;
0311   }
0312 
  // Random reader used when 'sameLumiBlock' is set: reads an event from the run and luminosity block
  // of *idp, starting at a randomly chosen event of that lumi.
  //
  // If the file is not currently positioned in the requested run/lumi (or no file is open), the lumi
  // is located, its events are counted, and the position is advanced by
  // CLHEP::RandFlat::shootInt(engine, eventsInLumi) events. Later calls for the same lumi continue from
  // there. When no further event is found, the position is reset with setEntryAtItem(run, lumi, 0)
  // (presumably the start of the lumi — confirm), and the read is retried.
  //
  // cache        - receives the event that was read.
  // fileNameHash - set to lfnHash() when an event is read.
  // engine, idp  - must both be non-null.
  // recycleFiles - only forwarded to the recursive call made here.
  // Returns false if the lumi cannot be located.
  // NOTE(review): if the lumi is found but holds no events, the wrap-around branch appears to recurse
  // without making progress — confirm this cannot happen for secondary input.
  bool RootEmbeddedFileSequence::readOneRandomWithID(EventPrincipal& cache,
                                                     size_t& fileNameHash,
                                                     CLHEP::HepRandomEngine* engine,
                                                     EventID const* idp,
                                                     bool recycleFiles) {
    assert(engine);
    assert(idp);
    EventID const& id = *idp;
    if (noMoreFiles() || !rootFile() || rootFile()->indexIntoFileIter().run() != id.run() ||
        rootFile()->indexIntoFileIter().lumi() != id.luminosityBlock()) {
      bool found = skipToItem(id.run(), id.luminosityBlock(), 0);
      if (!found) {
        return false;
      }
      // Count the events of the lumi by stepping through all of them ...
      int eventsInLumi = 0;
      assert(rootFile());
      while (rootFile()->setEntryAtNextEventInLumi(id.run(), id.luminosityBlock()))
        ++eventsInLumi;
      // ... then rewind to the lumi and step forward a random number of events.
      found = skipToItem(id.run(), id.luminosityBlock(), 0);
      assert(found);
      int eventInLumi = CLHEP::RandFlat::shootInt(engine, eventsInLumi);
      for (int i = 0; i < eventInLumi; ++i) {
        bool foundEventInLumi = rootFile()->setEntryAtNextEventInLumi(id.run(), id.luminosityBlock());
        assert(foundEventInLumi);
      }
    }
    assert(rootFile());
    bool found = rootFile()->setEntryAtNextEventInLumi(id.run(), id.luminosityBlock());
    if (found) {
      auto [found2, succeeded] = rootFile()->readCurrentEvent(cache);
      found = found2;
    }
    if (!found) {
      // No further (readable) event in this lumi: reposition at the lumi and retry.
      found = rootFile()->setEntryAtItem(id.run(), id.luminosityBlock(), 0);
      if (!found) {
        return false;
      }
      return readOneRandomWithID(cache, fileNameHash, engine, idp, recycleFiles);
    }
    fileNameHash = lfnHash();
    return true;
  }
0355 
0356   bool RootEmbeddedFileSequence::readOneEvent(EventPrincipal& cache,
0357                                               size_t& fileNameHash,
0358                                               CLHEP::HepRandomEngine* engine,
0359                                               EventID const* id,
0360                                               bool recycleFiles) {
0361     assert(!sameLumiBlock_ || id != nullptr);
0362     assert(sequential_ || engine != nullptr);
0363     return (this->*fptr_)(cache, fileNameHash, engine, id, recycleFiles);
0364   }
0365 
0366   void RootEmbeddedFileSequence::fillDescription(ParameterSetDescription& desc) {
0367     desc.addUntracked<bool>("sequential", false)
0368         ->setComment(
0369             "True: loopEvents() reads events sequentially from beginning of first file.\n"
0370             "False: loopEvents() first reads events beginning at random event. New files also chosen randomly");
0371     desc.addUntracked<bool>("sameLumiBlock", false)
0372         ->setComment(
0373             "True: loopEvents() reads events only in same lumi as the specified event.\n"
0374             "False: loopEvents() reads events regardless of lumi.");
0375     desc.addUntracked<unsigned int>("skipEvents", 0U)
0376         ->setComment(
0377             "Skip the first 'skipEvents' events. Used only if 'sequential' is True and 'sameLumiBlock' is False");
0378     desc.addUntracked<unsigned int>("maxFileSkips")
0379         ->setComment(
0380             "How many files to try if 'sequential' is False and 'skipBadFiles' is True.\n"
0381             "Defaults to 3 (or # of files if smaller).");
0382     desc.addUntracked<unsigned int>("cacheSize", roottree::defaultCacheSize)
0383         ->setComment("Size of ROOT TTree prefetch cache.  Affects performance.");
0384     desc.addUntracked<bool>("enforceGUIDInFileName", false)
0385         ->setComment(
0386             "True:  file name part is required to be equal to the GUID of the file\n"
0387             "False: file name can be anything");
0388   }
0389 }  // namespace edm