Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:19:07

0001 /*----------------------------------------------------------------------
0002 ----------------------------------------------------------------------*/
#include "DuplicateChecker.h"
#include "InputFile.h"
#include "PoolSource.h"
#include "RootFile.h"
#include "RootPrimaryFileSequence.h"
#include "RootTree.h"

#include "DataFormats/Provenance/interface/BranchID.h"
#include "DataFormats/Provenance/interface/ProductRegistry.h"
#include "FWCore/Catalog/interface/InputFileCatalog.h"
#include "FWCore/Catalog/interface/SiteLocalConfig.h"
#include "FWCore/Framework/interface/FileBlock.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "Utilities/StorageFactory/interface/StorageFactory.h"

#include <utility>
0020 
0021 namespace edm {
0022   RootPrimaryFileSequence::RootPrimaryFileSequence(ParameterSet const& pset,
0023                                                    PoolSource& input,
0024                                                    InputFileCatalog const& catalog)
0025       : RootInputFileSequence(pset, catalog),
0026         input_(input),
0027         firstFile_(true),
0028         branchesMustMatch_(BranchDescription::Permissive),
0029         orderedProcessHistoryIDs_(),
0030         eventSkipperByID_(EventSkipperByID::create(pset).release()),
0031         initialNumberOfEventsToSkip_(pset.getUntrackedParameter<unsigned int>("skipEvents")),
0032         noRunLumiSort_(pset.getUntrackedParameter<bool>("noRunLumiSort")),
0033         noEventSort_(noRunLumiSort_ ? true : pset.getUntrackedParameter<bool>("noEventSort")),
0034         treeCacheSize_(noEventSort_ ? pset.getUntrackedParameter<unsigned int>("cacheSize") : 0U),
0035         duplicateChecker_(new DuplicateChecker(pset)),
0036         usingGoToEvent_(false),
0037         enablePrefetching_(false),
0038         enforceGUIDInFileName_(pset.getUntrackedParameter<bool>("enforceGUIDInFileName")) {
0039     if (noRunLumiSort_ && (remainingEvents() >= 0 || remainingLuminosityBlocks() >= 0)) {
0040       // There would need to be some Framework development work to allow stopping
0041       // early with noRunLumiSort set true related to closing lumis and runs that
0042       // were supposed to be continued but were not... We cannot have events written
0043       // to output with no run or lumi written to output.
0044       throw Exception(errors::Configuration,
0045                       "Illegal to configure noRunLumiSort and limit the number of events or luminosityBlocks");
0046     }
0047     // The SiteLocalConfig controls the TTreeCache size and the prefetching settings.
0048     Service<SiteLocalConfig> pSLC;
0049     if (pSLC.isAvailable()) {
0050       if (treeCacheSize_ != 0U && pSLC->sourceTTreeCacheSize()) {
0051         treeCacheSize_ = *(pSLC->sourceTTreeCacheSize());
0052       }
0053       enablePrefetching_ = pSLC->enablePrefetching();
0054     }
0055 
0056     std::string branchesMustMatch =
0057         pset.getUntrackedParameter<std::string>("branchesMustMatch", std::string("permissive"));
0058     if (branchesMustMatch == std::string("strict"))
0059       branchesMustMatch_ = BranchDescription::Strict;
0060 
0061     // Prestage the files
0062     for (setAtFirstFile(); !noMoreFiles(); setAtNextFile()) {
0063       storage::StorageFactory::get()->stagein(fileNames()[0]);
0064     }
0065     // Open the first file.
0066     for (setAtFirstFile(); !noMoreFiles(); setAtNextFile()) {
0067       initFile(input_.skipBadFiles());
0068       if (rootFile())
0069         break;
0070     }
0071     if (rootFile()) {
0072       input_.productRegistryUpdate().updateFromInput(rootFile()->productRegistry()->productList());
0073       if (initialNumberOfEventsToSkip_ != 0) {
0074         skipEventsAtBeginning(initialNumberOfEventsToSkip_);
0075       }
0076     }
0077   }
0078 
0079   RootPrimaryFileSequence::~RootPrimaryFileSequence() {}
0080 
0081   void RootPrimaryFileSequence::endJob() { closeFile(); }
0082 
0083   std::shared_ptr<FileBlock> RootPrimaryFileSequence::readFile_() {
0084     std::shared_ptr<FileBlock> fileBlock;
0085     if (firstFile_) {
0086       firstFile_ = false;
0087       // Usually the first input file will already be open
0088       if (!rootFile()) {
0089         initFile(input_.skipBadFiles());
0090       }
0091     } else if (goToEventInNewFile_) {
0092       goToEventInNewFile_ = false;
0093       setAtFileSequenceNumber(goToFileSequenceOffset_);
0094       initFile(false);
0095       assert(rootFile());
0096       bool found = rootFile()->goToEvent(goToEventID_);
0097       assert(found);
0098     } else if (skipIntoNewFile_) {
0099       skipIntoNewFile_ = false;
0100       setAtFileSequenceNumber(skipToFileSequenceNumber_);
0101       initFile(false);
0102       assert(rootFile());
0103       if (skipToOffsetInFinalFile_ < 0) {
0104         rootFile()->setToLastEntry();
0105       }
0106       bool atEnd = rootFile()->skipEvents(skipToOffsetInFinalFile_);
0107       assert(!atEnd && skipToOffsetInFinalFile_ == 0);
0108     } else {
0109       if (!nextFile()) {
0110         // handle case with last file bad and
0111         // skipBadFiles true
0112         fb_ = fileBlock;
0113         return fileBlock;
0114       }
0115     }
0116     if (!rootFile()) {
0117       fileBlock = std::make_shared<FileBlock>();
0118       fb_ = fileBlock;
0119       return fileBlock;
0120     }
0121     fileBlock = rootFile()->createFileBlock();
0122     fb_ = fileBlock;
0123     return fileBlock;
0124   }
0125 
0126   void RootPrimaryFileSequence::closeFile_() {
0127     // close the currently open file, if any, and delete the RootFile object.
0128     if (rootFile()) {
0129       auto sentry = std::make_unique<InputSource::FileCloseSentry>(input_, lfn());
0130       rootFile()->close();
0131       if (duplicateChecker_)
0132         duplicateChecker_->inputFileClosed();
0133       rootFile().reset();
0134     }
0135   }
0136 
0137   void RootPrimaryFileSequence::initFile_(bool skipBadFiles) {
0138     // If we are not duplicate checking across files and we are not using random access to find events,
0139     // then we can delete the IndexIntoFile for the file we are closing.
0140     // If we can't delete all of it, then we can delete the parts we do not need.
0141     bool deleteIndexIntoFile = !usingGoToEvent_ && !(duplicateChecker_ && duplicateChecker_->checkingAllFiles() &&
0142                                                      !duplicateChecker_->checkDisabled());
0143     initTheFile(skipBadFiles, deleteIndexIntoFile, &input_, "primaryFiles", InputType::Primary);
0144   }
0145 
0146   RootPrimaryFileSequence::RootFileSharedPtr RootPrimaryFileSequence::makeRootFile(std::shared_ptr<InputFile> filePtr) {
0147     size_t currentIndexIntoFile = sequenceNumberOfFile();
0148     return std::make_shared<RootFile>(fileNames()[0],
0149                                       input_.processConfiguration(),
0150                                       logicalFileName(),
0151                                       filePtr,
0152                                       eventSkipperByID(),
0153                                       initialNumberOfEventsToSkip_ != 0,
0154                                       remainingEvents(),
0155                                       remainingLuminosityBlocks(),
0156                                       input_.nStreams(),
0157                                       treeCacheSize_,
0158                                       input_.treeMaxVirtualSize(),
0159                                       input_.processingMode(),
0160                                       input_.runHelper(),
0161                                       noRunLumiSort_,
0162                                       noEventSort_,
0163                                       input_.productSelectorRules(),
0164                                       InputType::Primary,
0165                                       input_.branchIDListHelper(),
0166                                       input_.processBlockHelper().get(),
0167                                       input_.thinnedAssociationsHelper(),
0168                                       nullptr,  // associationsFromSecondary
0169                                       duplicateChecker(),
0170                                       input_.dropDescendants(),
0171                                       input_.processHistoryRegistryForUpdate(),
0172                                       indexesIntoFiles(),
0173                                       currentIndexIntoFile,
0174                                       orderedProcessHistoryIDs_,
0175                                       input_.bypassVersionCheck(),
0176                                       input_.labelRawDataLikeMC(),
0177                                       usingGoToEvent_,
0178                                       enablePrefetching_,
0179                                       enforceGUIDInFileName_);
0180   }
0181 
0182   bool RootPrimaryFileSequence::nextFile() {
0183     do {
0184       if (!noMoreFiles())
0185         setAtNextFile();
0186       if (noMoreFiles()) {
0187         return false;
0188       }
0189 
0190       initFile(input_.skipBadFiles());
0191       if (rootFile()) {
0192         break;
0193       }
0194       // If we are not skipping bad files and the file
0195       // open failed, then initFile should have thrown
0196       assert(input_.skipBadFiles());
0197     } while (true);
0198 
0199     // make sure the new product registry is compatible with the main one
0200     std::string mergeInfo =
0201         input_.productRegistryUpdate().merge(*rootFile()->productRegistry(), fileNames()[0], branchesMustMatch_);
0202     if (!mergeInfo.empty()) {
0203       throw Exception(errors::MismatchedInputFiles, "RootPrimaryFileSequence::nextFile()") << mergeInfo;
0204     }
0205     return true;
0206   }
0207 
0208   bool RootPrimaryFileSequence::previousFile() {
0209     if (atFirstFile()) {
0210       return false;
0211     }
0212     setAtPreviousFile();
0213 
0214     initFile(false);
0215 
0216     if (rootFile()) {
0217       // make sure the new product registry is compatible to the main one
0218       std::string mergeInfo =
0219           input_.productRegistryUpdate().merge(*rootFile()->productRegistry(), fileNames()[0], branchesMustMatch_);
0220       if (!mergeInfo.empty()) {
0221         throw Exception(errors::MismatchedInputFiles, "RootPrimaryFileSequence::previousEvent()") << mergeInfo;
0222       }
0223     }
0224     if (rootFile())
0225       rootFile()->setToLastEntry();
0226     return true;
0227   }
0228 
0229   InputSource::ItemTypeInfo RootPrimaryFileSequence::getNextItemType(RunNumber_t& run,
0230                                                                      LuminosityBlockNumber_t& lumi,
0231                                                                      EventNumber_t& event) {
0232     if (noMoreFiles() || skipToStop_) {
0233       skipToStop_ = false;
0234       return InputSource::ItemType::IsStop;
0235     }
0236     if (firstFile_ || goToEventInNewFile_ || skipIntoNewFile_) {
0237       return InputSource::ItemType::IsFile;
0238     }
0239     if (rootFile()) {
0240       IndexIntoFile::EntryType entryType = rootFile()->getNextItemType(run, lumi, event);
0241       if (entryType == IndexIntoFile::kEvent) {
0242         return InputSource::ItemType::IsEvent;
0243       } else if (entryType == IndexIntoFile::kLumi) {
0244         return InputSource::ItemType::IsLumi;
0245       } else if (entryType == IndexIntoFile::kRun) {
0246         return InputSource::ItemType::IsRun;
0247       }
0248       assert(entryType == IndexIntoFile::kEnd);
0249     }
0250     if (atLastFile()) {
0251       return InputSource::ItemType::IsStop;
0252     }
0253     return InputSource::ItemType::IsFile;
0254   }
0255 
0256   // Rewind to before the first event that was read.
0257   void RootPrimaryFileSequence::rewind_() {
0258     if (!atFirstFile()) {
0259       closeFile();
0260       setAtFirstFile();
0261     }
0262     if (!rootFile()) {
0263       initFile(false);
0264     }
0265     rewindFile();
0266     firstFile_ = true;
0267     goToEventInNewFile_ = false;
0268     skipIntoNewFile_ = false;
0269     skipToStop_ = false;
0270     if (rootFile()) {
0271       if (initialNumberOfEventsToSkip_ != 0) {
0272         skipEventsAtBeginning(initialNumberOfEventsToSkip_);
0273       }
0274     }
0275   }
0276 
0277   // Rewind to the beginning of the current file
0278   void RootPrimaryFileSequence::rewindFile() {
0279     if (rootFile())
0280       rootFile()->rewind();
0281   }
0282 
0283   // Advance "offset" events.  Offset will be positive.
0284   void RootPrimaryFileSequence::skipEventsAtBeginning(int offset) {
0285     assert(rootFile());
0286     assert(offset >= 0);
0287     while (offset != 0) {
0288       bool atEnd = rootFile()->skipEvents(offset);
0289       if ((offset > 0 || atEnd) && !nextFile()) {
0290         return;
0291       }
0292     }
0293   }
0294 
0295   // Advance "offset" events.  Offset can be positive or negative (or zero).
0296   void RootPrimaryFileSequence::skipEvents(int offset) {
0297     assert(rootFile());
0298 
0299     bool atEnd = rootFile()->skipEvents(offset);
0300     if (!atEnd && offset == 0) {
0301       // successfully completed skip within current file
0302       return;
0303     }
0304 
0305     // Return, if without closing the current file we know the skip cannot be completed
0306     skipToStop_ = false;
0307     if (offset > 0 || atEnd) {
0308       if (atLastFile() || noMoreFiles()) {
0309         skipToStop_ = true;
0310         return;
0311       }
0312     }
0313     if (offset < 0 && atFirstFile()) {
0314       skipToStop_ = true;
0315       return;
0316     }
0317 
0318     // Save the current file and position so that we can restore them
0319     size_t const originalFileSequenceNumber = sequenceNumberOfFile();
0320     IndexIntoFile::IndexIntoFileItr originalPosition = rootFile()->indexIntoFileIter();
0321 
0322     if ((offset > 0 || atEnd) && !nextFile()) {
0323       skipToStop_ = true;  // Can only get here if skipBadFiles is true
0324     }
0325     if (offset < 0 && !previousFile()) {
0326       skipToStop_ = true;  // Can't actually get here
0327     }
0328 
0329     if (!skipToStop_) {
0330       while (offset != 0) {
0331         skipToOffsetInFinalFile_ = offset;
0332         bool atEnd = rootFile()->skipEvents(offset);
0333         if ((offset > 0 || atEnd) && !nextFile()) {
0334           skipToStop_ = true;
0335           break;
0336         }
0337         if (offset < 0 && !previousFile()) {
0338           skipToStop_ = true;
0339           break;
0340         }
0341       }
0342       if (!skipToStop_) {
0343         skipIntoNewFile_ = true;
0344       }
0345     }
0346     skipToFileSequenceNumber_ = sequenceNumberOfFile();
0347 
0348     // Restore the original file and position
0349     setAtFileSequenceNumber(originalFileSequenceNumber);
0350     initFile(false);
0351     assert(rootFile());
0352     rootFile()->setPosition(originalPosition);
0353     rootFile()->updateFileBlock(*fb_);
0354   }
0355 
0356   bool RootPrimaryFileSequence::goToEvent(EventID const& eventID) {
0357     usingGoToEvent_ = true;
0358     if (rootFile()) {
0359       if (rootFile()->goToEvent(eventID)) {
0360         return true;
0361       }
0362       // If only one input file, give up now, to save time.
0363       if (rootFile() && indexesIntoFiles().size() == 1) {
0364         return false;
0365       }
0366       // Look for item (run/lumi/event) in files previously opened without reopening unnecessary files.
0367       for (auto it = indexesIntoFiles().begin(), itEnd = indexesIntoFiles().end(); it != itEnd; ++it) {
0368         if (*it && (*it)->containsItem(eventID.run(), eventID.luminosityBlock(), eventID.event())) {
0369           goToEventInNewFile_ = true;
0370           goToFileSequenceOffset_ = it - indexesIntoFiles().begin();
0371           goToEventID_ = eventID;
0372           return true;
0373         }
0374       }
0375 
0376       // Save the current file and position so that we can restore them
0377       bool closedOriginalFile = false;
0378       size_t const originalFileSequenceNumber = sequenceNumberOfFile();
0379       IndexIntoFile::IndexIntoFileItr originalPosition = rootFile()->indexIntoFileIter();
0380 
0381       // Look for item in files not yet opened.
0382       bool foundIt = false;
0383       for (auto it = indexesIntoFiles().begin(), itEnd = indexesIntoFiles().end(); it != itEnd; ++it) {
0384         if (!*it) {
0385           setAtFileSequenceNumber(it - indexesIntoFiles().begin());
0386           initFile(false);
0387           assert(rootFile());
0388           closedOriginalFile = true;
0389           if ((*it)->containsItem(eventID.run(), eventID.luminosityBlock(), eventID.event())) {
0390             foundIt = true;
0391             goToEventInNewFile_ = true;
0392             goToFileSequenceOffset_ = it - indexesIntoFiles().begin();
0393             goToEventID_ = eventID;
0394           }
0395         }
0396       }
0397       if (closedOriginalFile) {
0398         setAtFileSequenceNumber(originalFileSequenceNumber);
0399         initFile(false);
0400         assert(rootFile());
0401         rootFile()->setPosition(originalPosition);
0402         rootFile()->updateFileBlock(*fb_);
0403       }
0404       return foundIt;
0405     }
0406     return false;
0407   }
0408 
0409   int RootPrimaryFileSequence::remainingEvents() const { return input_.remainingEvents(); }
0410 
0411   int RootPrimaryFileSequence::remainingLuminosityBlocks() const { return input_.remainingLuminosityBlocks(); }
0412 
0413   void RootPrimaryFileSequence::fillDescription(ParameterSetDescription& desc) {
0414     desc.addUntracked<unsigned int>("skipEvents", 0U)
0415         ->setComment("Skip the first 'skipEvents' events that otherwise would have been processed.");
0416     desc.addUntracked<bool>("noEventSort", true)
0417         ->setComment(
0418             "True:  Process runs, lumis and events in the order they appear in the file (but see notes 1 and 2).\n"
0419             "False: Process runs, lumis and events in each file in numerical order (run#, lumi#, event#) (but see note "
0420             "3).\n"
0421             "Note 1: Events within the same lumi will always be processed contiguously.\n"
0422             "Note 2: Lumis within the same run will always be processed contiguously.\n"
0423             "Note 3: Any sorting occurs independently in each input file (no sorting across input files).");
0424     desc.addUntracked<bool>("noRunLumiSort", false)
0425         ->setComment(
0426             "True:  Process runs, lumis and events in the order they appear in the file.\n"
0427             "False: Follow settings based on 'noEventSort' setting.");
0428     desc.addUntracked<unsigned int>("cacheSize", roottree::defaultCacheSize)
0429         ->setComment("Size of ROOT TTree prefetch cache.  Affects performance.");
0430     std::string defaultString("permissive");
0431     desc.addUntracked<std::string>("branchesMustMatch", defaultString)
0432         ->setComment(
0433             "'strict':     Branches in each input file must match those in the first file.\n"
0434             "'permissive': Branches in each input file may be any subset of those in the first file.");
0435     desc.addUntracked<bool>("enforceGUIDInFileName", false)
0436         ->setComment(
0437             "True:  file name part is required to be equal to the GUID of the file\n"
0438             "False: file name can be anything");
0439 
0440     EventSkipperByID::fillDescription(desc);
0441     DuplicateChecker::fillDescription(desc);
0442   }
0443 
0444   ProcessingController::ForwardState RootPrimaryFileSequence::forwardState() const {
0445     if (rootFile()) {
0446       if (!rootFile()->wasLastEventJustRead()) {
0447         return ProcessingController::kEventsAheadInFile;
0448       }
0449       if (noMoreFiles() || atLastFile()) {
0450         return ProcessingController::kAtLastEvent;
0451       } else {
0452         return ProcessingController::kNextFileExists;
0453       }
0454     }
0455     return ProcessingController::kUnknownForward;
0456   }
0457 
0458   ProcessingController::ReverseState RootPrimaryFileSequence::reverseState() const {
0459     if (rootFile()) {
0460       if (!rootFile()->wasFirstEventJustRead()) {
0461         return ProcessingController::kEventsBackwardsInFile;
0462       }
0463       if (!atFirstFile()) {
0464         return ProcessingController::kPreviousFileExists;
0465       }
0466       return ProcessingController::kAtFirstEvent;
0467     }
0468     return ProcessingController::kUnknownReverse;
0469   }
0470 
0471 }  // namespace edm