Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-05-23 02:05:08

0001 /*----------------------------------------------------------------------
0002 ----------------------------------------------------------------------*/
0003 #include "DuplicateChecker.h"
0004 #include "InputFile.h"
0005 #include "PoolSource.h"
0006 #include "RootFile.h"
0007 #include "RootPrimaryFileSequence.h"
0008 #include "RootTree.h"
0009 
0010 #include "DataFormats/Provenance/interface/BranchID.h"
0011 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0012 #include "FWCore/Catalog/interface/InputFileCatalog.h"
0013 #include "FWCore/Catalog/interface/SiteLocalConfig.h"
0014 #include "FWCore/Framework/interface/FileBlock.h"
0015 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0016 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0017 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0018 #include "FWCore/ServiceRegistry/interface/Service.h"
0019 #include "Utilities/StorageFactory/interface/StorageFactory.h"
0020 
0021 namespace edm {
0022   RootPrimaryFileSequence::RootPrimaryFileSequence(ParameterSet const& pset,
0023                                                    PoolSource& input,
0024                                                    InputFileCatalog const& catalog)
0025       : RootInputFileSequence(pset, catalog),
0026         input_(input),
0027         firstFile_(true),
0028         branchesMustMatch_(ProductDescription::Permissive),
0029         orderedProcessHistoryIDs_(),
0030         eventSkipperByID_(EventSkipperByID::create(pset).release()),
0031         initialNumberOfEventsToSkip_(pset.getUntrackedParameter<unsigned int>("skipEvents")),
0032         noRunLumiSort_(pset.getUntrackedParameter<bool>("noRunLumiSort")),
0033         noEventSort_(noRunLumiSort_ ? true : pset.getUntrackedParameter<bool>("noEventSort")),
0034         treeCacheSize_(noEventSort_ ? pset.getUntrackedParameter<unsigned int>("cacheSize") : 0U),
0035         duplicateChecker_(new DuplicateChecker(pset)),
0036         usingGoToEvent_(false),
0037         enablePrefetching_(false),
0038         enforceGUIDInFileName_(pset.getUntrackedParameter<bool>("enforceGUIDInFileName")) {
0039     if (noRunLumiSort_ && (remainingEvents() >= 0 || remainingLuminosityBlocks() >= 0)) {
0040       // There would need to be some Framework development work to allow stopping
0041       // early with noRunLumiSort set true related to closing lumis and runs that
0042       // were supposed to be continued but were not... We cannot have events written
0043       // to output with no run or lumi written to output.
0044       throw Exception(errors::Configuration,
0045                       "Illegal to configure noRunLumiSort and limit the number of events or luminosityBlocks");
0046     }
0047     // The SiteLocalConfig controls the TTreeCache size and the prefetching settings.
0048     Service<SiteLocalConfig> pSLC;
0049     if (pSLC.isAvailable()) {
0050       if (treeCacheSize_ != 0U && pSLC->sourceTTreeCacheSize()) {
0051         treeCacheSize_ = *(pSLC->sourceTTreeCacheSize());
0052       }
0053       enablePrefetching_ = pSLC->enablePrefetching();
0054     }
0055 
0056     std::string branchesMustMatch =
0057         pset.getUntrackedParameter<std::string>("branchesMustMatch", std::string("permissive"));
0058     if (branchesMustMatch == std::string("strict"))
0059       branchesMustMatch_ = ProductDescription::Strict;
0060 
0061     // Prestage the files
0062     for (setAtFirstFile(); !noMoreFiles(); setAtNextFile()) {
0063       storage::StorageFactory::get()->stagein(fileNames()[0]);
0064     }
0065     // Open the first file.
0066     for (setAtFirstFile(); !noMoreFiles(); setAtNextFile()) {
0067       initFile(input_.skipBadFiles());
0068       if (rootFile())
0069         break;
0070     }
0071     if (rootFile()) {
0072       input_.productRegistryUpdate().updateFromInput(rootFile()->productRegistry()->productList());
0073       if (initialNumberOfEventsToSkip_ != 0) {
0074         skipEventsAtBeginning(initialNumberOfEventsToSkip_);
0075       }
0076     }
0077   }
0078 
0079   RootPrimaryFileSequence::~RootPrimaryFileSequence() {}
0080 
0081   void RootPrimaryFileSequence::endJob() { closeFile(); }
0082 
0083   std::shared_ptr<FileBlock> RootPrimaryFileSequence::readFile_() {
0084     std::shared_ptr<FileBlock> fileBlock;
0085     if (firstFile_) {
0086       firstFile_ = false;
0087       // Usually the first input file will already be open
0088       if (!rootFile()) {
0089         initFile(input_.skipBadFiles());
0090       }
0091     } else if (goToEventInNewFile_) {
0092       goToEventInNewFile_ = false;
0093       setAtFileSequenceNumber(goToFileSequenceOffset_);
0094       initFile(false);
0095       assert(rootFile());
0096       bool found = rootFile()->goToEvent(goToEventID_);
0097       assert(found);
0098     } else if (skipIntoNewFile_) {
0099       skipIntoNewFile_ = false;
0100       setAtFileSequenceNumber(skipToFileSequenceNumber_);
0101       initFile(false);
0102       assert(rootFile());
0103       if (skipToOffsetInFinalFile_ < 0) {
0104         rootFile()->setToLastEntry();
0105       }
0106       bool atEnd = rootFile()->skipEvents(skipToOffsetInFinalFile_);
0107       assert(!atEnd && skipToOffsetInFinalFile_ == 0);
0108     } else {
0109       if (!nextFile()) {
0110         // handle case with last file bad and
0111         // skipBadFiles true
0112         fb_ = fileBlock;
0113         return fileBlock;
0114       }
0115     }
0116     if (!rootFile()) {
0117       fileBlock = std::make_shared<FileBlock>();
0118       fb_ = fileBlock;
0119       return fileBlock;
0120     }
0121     fileBlock = rootFile()->createFileBlock();
0122     fb_ = fileBlock;
0123     return fileBlock;
0124   }
0125 
0126   void RootPrimaryFileSequence::closeFile_() {
0127     // close the currently open file, if any, and delete the RootFile object.
0128     if (rootFile()) {
0129       auto sentry = std::make_unique<InputSource::FileCloseSentry>(input_, lfn());
0130       rootFile()->close();
0131       if (duplicateChecker_)
0132         duplicateChecker_->inputFileClosed();
0133       rootFile().reset();
0134     }
0135   }
0136 
0137   void RootPrimaryFileSequence::initFile_(bool skipBadFiles) {
0138     // If we are not duplicate checking across files and we are not using random access to find events,
0139     // then we can delete the IndexIntoFile for the file we are closing.
0140     // If we can't delete all of it, then we can delete the parts we do not need.
0141     bool deleteIndexIntoFile = !usingGoToEvent_ && !(duplicateChecker_ && duplicateChecker_->checkingAllFiles() &&
0142                                                      !duplicateChecker_->checkDisabled());
0143     initTheFile(skipBadFiles, deleteIndexIntoFile, &input_, "primaryFiles", InputType::Primary);
0144   }
0145 
0146   RootPrimaryFileSequence::RootFileSharedPtr RootPrimaryFileSequence::makeRootFile(std::shared_ptr<InputFile> filePtr) {
0147     size_t currentIndexIntoFile = sequenceNumberOfFile();
0148     return std::make_shared<RootFile>(
0149         RootFile::FileOptions{.fileName = fileNames()[0],
0150                               .logicalFileName = logicalFileName(),
0151                               .filePtr = filePtr,
0152                               .bypassVersionCheck = input_.bypassVersionCheck(),
0153                               .enforceGUIDInFileName = enforceGUIDInFileName_},
0154         InputType::Primary,
0155         RootFile::ProcessingOptions{.eventSkipperByID = eventSkipperByID(),
0156                                     .skipAnyEvents = initialNumberOfEventsToSkip_ != 0,
0157                                     .remainingEvents = remainingEvents(),
0158                                     .remainingLumis = remainingLuminosityBlocks(),
0159                                     .processingMode = input_.processingMode(),
0160                                     .noRunLumiSort = noRunLumiSort_,
0161                                     .noEventSort = noEventSort_,
0162                                     .usingGoToEvent = usingGoToEvent_},
0163         RootFile::TTreeOptions{.treeCacheSize = treeCacheSize_,
0164                                .treeMaxVirtualSize = input_.treeMaxVirtualSize(),
0165                                .enablePrefetching = enablePrefetching_,
0166                                .promptReading = not input_.delayReadingEventProducts()},
0167         RootFile::ProductChoices{.productSelectorRules = input_.productSelectorRules(),
0168                                  .associationsFromSecondary = nullptr,  // associationsFromSecondary
0169                                  .dropDescendantsOfDroppedProducts = input_.dropDescendants(),
0170                                  .labelRawDataLikeMC = input_.labelRawDataLikeMC()},
0171         RootFile::CrossFileInfo{.runHelper = input_.runHelper(),
0172                                 .branchIDListHelper = input_.branchIDListHelper(),
0173                                 .processBlockHelper = input_.processBlockHelper().get(),
0174                                 .thinnedAssociationsHelper = input_.thinnedAssociationsHelper(),
0175                                 .duplicateChecker = duplicateChecker(),
0176                                 .indexesIntoFiles = indexesIntoFiles(),
0177                                 .currentIndexIntoFile = currentIndexIntoFile},
0178         input_.nStreams(),
0179         input_.processHistoryRegistryForUpdate(),
0180         orderedProcessHistoryIDs_);
0181   }
0182 
0183   bool RootPrimaryFileSequence::nextFile() {
0184     do {
0185       if (!noMoreFiles())
0186         setAtNextFile();
0187       if (noMoreFiles()) {
0188         return false;
0189       }
0190 
0191       initFile(input_.skipBadFiles());
0192       if (rootFile()) {
0193         break;
0194       }
0195       // If we are not skipping bad files and the file
0196       // open failed, then initFile should have thrown
0197       assert(input_.skipBadFiles());
0198     } while (true);
0199 
0200     // make sure the new product registry is compatible with the main one
0201     std::string mergeInfo =
0202         input_.productRegistryUpdate().merge(*rootFile()->productRegistry(), fileNames()[0], branchesMustMatch_);
0203     if (!mergeInfo.empty()) {
0204       throw Exception(errors::MismatchedInputFiles, "RootPrimaryFileSequence::nextFile()") << mergeInfo;
0205     }
0206     return true;
0207   }
0208 
0209   bool RootPrimaryFileSequence::previousFile() {
0210     if (atFirstFile()) {
0211       return false;
0212     }
0213     setAtPreviousFile();
0214 
0215     initFile(false);
0216 
0217     if (rootFile()) {
0218       // make sure the new product registry is compatible to the main one
0219       std::string mergeInfo =
0220           input_.productRegistryUpdate().merge(*rootFile()->productRegistry(), fileNames()[0], branchesMustMatch_);
0221       if (!mergeInfo.empty()) {
0222         throw Exception(errors::MismatchedInputFiles, "RootPrimaryFileSequence::previousEvent()") << mergeInfo;
0223       }
0224     }
0225     if (rootFile())
0226       rootFile()->setToLastEntry();
0227     return true;
0228   }
0229 
0230   InputSource::ItemTypeInfo RootPrimaryFileSequence::getNextItemType(RunNumber_t& run,
0231                                                                      LuminosityBlockNumber_t& lumi,
0232                                                                      EventNumber_t& event) {
0233     if (noMoreFiles() || skipToStop_) {
0234       skipToStop_ = false;
0235       return InputSource::ItemType::IsStop;
0236     }
0237     if (firstFile_ || goToEventInNewFile_ || skipIntoNewFile_) {
0238       return InputSource::ItemType::IsFile;
0239     }
0240     if (rootFile()) {
0241       IndexIntoFile::EntryType entryType = rootFile()->getNextItemType(run, lumi, event);
0242       if (entryType == IndexIntoFile::kEvent) {
0243         return InputSource::ItemType::IsEvent;
0244       } else if (entryType == IndexIntoFile::kLumi) {
0245         return InputSource::ItemType::IsLumi;
0246       } else if (entryType == IndexIntoFile::kRun) {
0247         return InputSource::ItemType::IsRun;
0248       }
0249       assert(entryType == IndexIntoFile::kEnd);
0250     }
0251     if (atLastFile()) {
0252       return InputSource::ItemType::IsStop;
0253     }
0254     return InputSource::ItemType::IsFile;
0255   }
0256 
0257   // Rewind to before the first event that was read.
0258   void RootPrimaryFileSequence::rewind_() {
0259     if (!atFirstFile()) {
0260       closeFile();
0261       setAtFirstFile();
0262     }
0263     if (!rootFile()) {
0264       initFile(false);
0265     }
0266     rewindFile();
0267     firstFile_ = true;
0268     goToEventInNewFile_ = false;
0269     skipIntoNewFile_ = false;
0270     skipToStop_ = false;
0271     if (rootFile()) {
0272       if (initialNumberOfEventsToSkip_ != 0) {
0273         skipEventsAtBeginning(initialNumberOfEventsToSkip_);
0274       }
0275     }
0276   }
0277 
0278   // Rewind to the beginning of the current file
0279   void RootPrimaryFileSequence::rewindFile() {
0280     if (rootFile())
0281       rootFile()->rewind();
0282   }
0283 
0284   // Advance "offset" events.  Offset will be positive.
0285   void RootPrimaryFileSequence::skipEventsAtBeginning(int offset) {
0286     assert(rootFile());
0287     assert(offset >= 0);
0288     while (offset != 0) {
0289       bool atEnd = rootFile()->skipEvents(offset);
0290       if ((offset > 0 || atEnd) && !nextFile()) {
0291         return;
0292       }
0293     }
0294   }
0295 
0296   // Advance "offset" events.  Offset can be positive or negative (or zero).
0297   void RootPrimaryFileSequence::skipEvents(int offset) {
0298     assert(rootFile());
0299 
0300     bool atEnd = rootFile()->skipEvents(offset);
0301     if (!atEnd && offset == 0) {
0302       // successfully completed skip within current file
0303       return;
0304     }
0305 
0306     // Return, if without closing the current file we know the skip cannot be completed
0307     skipToStop_ = false;
0308     if (offset > 0 || atEnd) {
0309       if (atLastFile() || noMoreFiles()) {
0310         skipToStop_ = true;
0311         return;
0312       }
0313     }
0314     if (offset < 0 && atFirstFile()) {
0315       skipToStop_ = true;
0316       return;
0317     }
0318 
0319     // Save the current file and position so that we can restore them
0320     size_t const originalFileSequenceNumber = sequenceNumberOfFile();
0321     IndexIntoFile::IndexIntoFileItr originalPosition = rootFile()->indexIntoFileIter();
0322 
0323     if ((offset > 0 || atEnd) && !nextFile()) {
0324       skipToStop_ = true;  // Can only get here if skipBadFiles is true
0325     }
0326     if (offset < 0 && !previousFile()) {
0327       skipToStop_ = true;  // Can't actually get here
0328     }
0329 
0330     if (!skipToStop_) {
0331       while (offset != 0) {
0332         skipToOffsetInFinalFile_ = offset;
0333         bool atEnd = rootFile()->skipEvents(offset);
0334         if ((offset > 0 || atEnd) && !nextFile()) {
0335           skipToStop_ = true;
0336           break;
0337         }
0338         if (offset < 0 && !previousFile()) {
0339           skipToStop_ = true;
0340           break;
0341         }
0342       }
0343       if (!skipToStop_) {
0344         skipIntoNewFile_ = true;
0345       }
0346     }
0347     skipToFileSequenceNumber_ = sequenceNumberOfFile();
0348 
0349     // Restore the original file and position
0350     setAtFileSequenceNumber(originalFileSequenceNumber);
0351     initFile(false);
0352     assert(rootFile());
0353     rootFile()->setPosition(originalPosition);
0354     rootFile()->updateFileBlock(*fb_);
0355   }
0356 
0357   bool RootPrimaryFileSequence::goToEvent(EventID const& eventID) {
0358     usingGoToEvent_ = true;
0359     if (rootFile()) {
0360       if (rootFile()->goToEvent(eventID)) {
0361         return true;
0362       }
0363       // If only one input file, give up now, to save time.
0364       if (rootFile() && indexesIntoFiles().size() == 1) {
0365         return false;
0366       }
0367       // Look for item (run/lumi/event) in files previously opened without reopening unnecessary files.
0368       for (auto it = indexesIntoFiles().begin(), itEnd = indexesIntoFiles().end(); it != itEnd; ++it) {
0369         if (*it && (*it)->containsItem(eventID.run(), eventID.luminosityBlock(), eventID.event())) {
0370           goToEventInNewFile_ = true;
0371           goToFileSequenceOffset_ = it - indexesIntoFiles().begin();
0372           goToEventID_ = eventID;
0373           return true;
0374         }
0375       }
0376 
0377       // Save the current file and position so that we can restore them
0378       bool closedOriginalFile = false;
0379       size_t const originalFileSequenceNumber = sequenceNumberOfFile();
0380       IndexIntoFile::IndexIntoFileItr originalPosition = rootFile()->indexIntoFileIter();
0381 
0382       // Look for item in files not yet opened.
0383       bool foundIt = false;
0384       for (auto it = indexesIntoFiles().begin(), itEnd = indexesIntoFiles().end(); it != itEnd; ++it) {
0385         if (!*it) {
0386           setAtFileSequenceNumber(it - indexesIntoFiles().begin());
0387           initFile(false);
0388           assert(rootFile());
0389           closedOriginalFile = true;
0390           if ((*it)->containsItem(eventID.run(), eventID.luminosityBlock(), eventID.event())) {
0391             foundIt = true;
0392             goToEventInNewFile_ = true;
0393             goToFileSequenceOffset_ = it - indexesIntoFiles().begin();
0394             goToEventID_ = eventID;
0395           }
0396         }
0397       }
0398       if (closedOriginalFile) {
0399         setAtFileSequenceNumber(originalFileSequenceNumber);
0400         initFile(false);
0401         assert(rootFile());
0402         rootFile()->setPosition(originalPosition);
0403         rootFile()->updateFileBlock(*fb_);
0404       }
0405       return foundIt;
0406     }
0407     return false;
0408   }
0409 
0410   int RootPrimaryFileSequence::remainingEvents() const { return input_.remainingEvents(); }
0411 
0412   int RootPrimaryFileSequence::remainingLuminosityBlocks() const { return input_.remainingLuminosityBlocks(); }
0413 
0414   void RootPrimaryFileSequence::fillDescription(ParameterSetDescription& desc) {
0415     desc.addUntracked<unsigned int>("skipEvents", 0U)
0416         ->setComment("Skip the first 'skipEvents' events that otherwise would have been processed.");
0417     desc.addUntracked<bool>("noEventSort", true)
0418         ->setComment(
0419             "True:  Process runs, lumis and events in the order they appear in the file (but see notes 1 and 2).\n"
0420             "False: Process runs, lumis and events in each file in numerical order (run#, lumi#, event#) (but see note "
0421             "3).\n"
0422             "Note 1: Events within the same lumi will always be processed contiguously.\n"
0423             "Note 2: Lumis within the same run will always be processed contiguously.\n"
0424             "Note 3: Any sorting occurs independently in each input file (no sorting across input files).");
0425     desc.addUntracked<bool>("noRunLumiSort", false)
0426         ->setComment(
0427             "True:  Process runs, lumis and events in the order they appear in the file.\n"
0428             "False: Follow settings based on 'noEventSort' setting.");
0429     desc.addUntracked<unsigned int>("cacheSize", roottree::defaultCacheSize)
0430         ->setComment("Size of ROOT TTree prefetch cache.  Affects performance.");
0431     std::string defaultString("permissive");
0432     desc.addUntracked<std::string>("branchesMustMatch", defaultString)
0433         ->setComment(
0434             "'strict':     Branches in each input file must match those in the first file.\n"
0435             "'permissive': Branches in each input file may be any subset of those in the first file.");
0436     desc.addUntracked<bool>("enforceGUIDInFileName", false)
0437         ->setComment(
0438             "True:  file name part is required to be equal to the GUID of the file\n"
0439             "False: file name can be anything");
0440 
0441     EventSkipperByID::fillDescription(desc);
0442     DuplicateChecker::fillDescription(desc);
0443   }
0444 
0445   ProcessingController::ForwardState RootPrimaryFileSequence::forwardState() const {
0446     if (rootFile()) {
0447       if (!rootFile()->wasLastEventJustRead()) {
0448         return ProcessingController::kEventsAheadInFile;
0449       }
0450       if (noMoreFiles() || atLastFile()) {
0451         return ProcessingController::kAtLastEvent;
0452       } else {
0453         return ProcessingController::kNextFileExists;
0454       }
0455     }
0456     return ProcessingController::kUnknownForward;
0457   }
0458 
0459   ProcessingController::ReverseState RootPrimaryFileSequence::reverseState() const {
0460     if (rootFile()) {
0461       if (!rootFile()->wasFirstEventJustRead()) {
0462         return ProcessingController::kEventsBackwardsInFile;
0463       }
0464       if (!atFirstFile()) {
0465         return ProcessingController::kPreviousFileExists;
0466       }
0467       return ProcessingController::kAtFirstEvent;
0468     }
0469     return ProcessingController::kUnknownReverse;
0470   }
0471 
0472 }  // namespace edm