File indexing completed on 2025-05-23 02:05:08
0001
0002
0003 #include "DuplicateChecker.h"
0004 #include "InputFile.h"
0005 #include "PoolSource.h"
0006 #include "RootFile.h"
0007 #include "RootPrimaryFileSequence.h"
0008 #include "RootTree.h"
0009
0010 #include "DataFormats/Provenance/interface/BranchID.h"
0011 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0012 #include "FWCore/Catalog/interface/InputFileCatalog.h"
0013 #include "FWCore/Catalog/interface/SiteLocalConfig.h"
0014 #include "FWCore/Framework/interface/FileBlock.h"
0015 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0016 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0017 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0018 #include "FWCore/ServiceRegistry/interface/Service.h"
0019 #include "Utilities/StorageFactory/interface/StorageFactory.h"
0020
0021 namespace edm {
0022 RootPrimaryFileSequence::RootPrimaryFileSequence(ParameterSet const& pset,
0023 PoolSource& input,
0024 InputFileCatalog const& catalog)
0025 : RootInputFileSequence(pset, catalog),
0026 input_(input),
0027 firstFile_(true),
0028 branchesMustMatch_(ProductDescription::Permissive),
0029 orderedProcessHistoryIDs_(),
0030 eventSkipperByID_(EventSkipperByID::create(pset).release()),
0031 initialNumberOfEventsToSkip_(pset.getUntrackedParameter<unsigned int>("skipEvents")),
0032 noRunLumiSort_(pset.getUntrackedParameter<bool>("noRunLumiSort")),
0033 noEventSort_(noRunLumiSort_ ? true : pset.getUntrackedParameter<bool>("noEventSort")),
0034 treeCacheSize_(noEventSort_ ? pset.getUntrackedParameter<unsigned int>("cacheSize") : 0U),
0035 duplicateChecker_(new DuplicateChecker(pset)),
0036 usingGoToEvent_(false),
0037 enablePrefetching_(false),
0038 enforceGUIDInFileName_(pset.getUntrackedParameter<bool>("enforceGUIDInFileName")) {
0039 if (noRunLumiSort_ && (remainingEvents() >= 0 || remainingLuminosityBlocks() >= 0)) {
0040
0041
0042
0043
0044 throw Exception(errors::Configuration,
0045 "Illegal to configure noRunLumiSort and limit the number of events or luminosityBlocks");
0046 }
0047
0048 Service<SiteLocalConfig> pSLC;
0049 if (pSLC.isAvailable()) {
0050 if (treeCacheSize_ != 0U && pSLC->sourceTTreeCacheSize()) {
0051 treeCacheSize_ = *(pSLC->sourceTTreeCacheSize());
0052 }
0053 enablePrefetching_ = pSLC->enablePrefetching();
0054 }
0055
0056 std::string branchesMustMatch =
0057 pset.getUntrackedParameter<std::string>("branchesMustMatch", std::string("permissive"));
0058 if (branchesMustMatch == std::string("strict"))
0059 branchesMustMatch_ = ProductDescription::Strict;
0060
0061
0062 for (setAtFirstFile(); !noMoreFiles(); setAtNextFile()) {
0063 storage::StorageFactory::get()->stagein(fileNames()[0]);
0064 }
0065
0066 for (setAtFirstFile(); !noMoreFiles(); setAtNextFile()) {
0067 initFile(input_.skipBadFiles());
0068 if (rootFile())
0069 break;
0070 }
0071 if (rootFile()) {
0072 input_.productRegistryUpdate().updateFromInput(rootFile()->productRegistry()->productList());
0073 if (initialNumberOfEventsToSkip_ != 0) {
0074 skipEventsAtBeginning(initialNumberOfEventsToSkip_);
0075 }
0076 }
0077 }
0078
0079 RootPrimaryFileSequence::~RootPrimaryFileSequence() {}
0080
0081 void RootPrimaryFileSequence::endJob() { closeFile(); }
0082
0083 std::shared_ptr<FileBlock> RootPrimaryFileSequence::readFile_() {
0084 std::shared_ptr<FileBlock> fileBlock;
0085 if (firstFile_) {
0086 firstFile_ = false;
0087
0088 if (!rootFile()) {
0089 initFile(input_.skipBadFiles());
0090 }
0091 } else if (goToEventInNewFile_) {
0092 goToEventInNewFile_ = false;
0093 setAtFileSequenceNumber(goToFileSequenceOffset_);
0094 initFile(false);
0095 assert(rootFile());
0096 bool found = rootFile()->goToEvent(goToEventID_);
0097 assert(found);
0098 } else if (skipIntoNewFile_) {
0099 skipIntoNewFile_ = false;
0100 setAtFileSequenceNumber(skipToFileSequenceNumber_);
0101 initFile(false);
0102 assert(rootFile());
0103 if (skipToOffsetInFinalFile_ < 0) {
0104 rootFile()->setToLastEntry();
0105 }
0106 bool atEnd = rootFile()->skipEvents(skipToOffsetInFinalFile_);
0107 assert(!atEnd && skipToOffsetInFinalFile_ == 0);
0108 } else {
0109 if (!nextFile()) {
0110
0111
0112 fb_ = fileBlock;
0113 return fileBlock;
0114 }
0115 }
0116 if (!rootFile()) {
0117 fileBlock = std::make_shared<FileBlock>();
0118 fb_ = fileBlock;
0119 return fileBlock;
0120 }
0121 fileBlock = rootFile()->createFileBlock();
0122 fb_ = fileBlock;
0123 return fileBlock;
0124 }
0125
0126 void RootPrimaryFileSequence::closeFile_() {
0127
0128 if (rootFile()) {
0129 auto sentry = std::make_unique<InputSource::FileCloseSentry>(input_, lfn());
0130 rootFile()->close();
0131 if (duplicateChecker_)
0132 duplicateChecker_->inputFileClosed();
0133 rootFile().reset();
0134 }
0135 }
0136
0137 void RootPrimaryFileSequence::initFile_(bool skipBadFiles) {
0138
0139
0140
0141 bool deleteIndexIntoFile = !usingGoToEvent_ && !(duplicateChecker_ && duplicateChecker_->checkingAllFiles() &&
0142 !duplicateChecker_->checkDisabled());
0143 initTheFile(skipBadFiles, deleteIndexIntoFile, &input_, "primaryFiles", InputType::Primary);
0144 }
0145
0146 RootPrimaryFileSequence::RootFileSharedPtr RootPrimaryFileSequence::makeRootFile(std::shared_ptr<InputFile> filePtr) {
0147 size_t currentIndexIntoFile = sequenceNumberOfFile();
0148 return std::make_shared<RootFile>(
0149 RootFile::FileOptions{.fileName = fileNames()[0],
0150 .logicalFileName = logicalFileName(),
0151 .filePtr = filePtr,
0152 .bypassVersionCheck = input_.bypassVersionCheck(),
0153 .enforceGUIDInFileName = enforceGUIDInFileName_},
0154 InputType::Primary,
0155 RootFile::ProcessingOptions{.eventSkipperByID = eventSkipperByID(),
0156 .skipAnyEvents = initialNumberOfEventsToSkip_ != 0,
0157 .remainingEvents = remainingEvents(),
0158 .remainingLumis = remainingLuminosityBlocks(),
0159 .processingMode = input_.processingMode(),
0160 .noRunLumiSort = noRunLumiSort_,
0161 .noEventSort = noEventSort_,
0162 .usingGoToEvent = usingGoToEvent_},
0163 RootFile::TTreeOptions{.treeCacheSize = treeCacheSize_,
0164 .treeMaxVirtualSize = input_.treeMaxVirtualSize(),
0165 .enablePrefetching = enablePrefetching_,
0166 .promptReading = not input_.delayReadingEventProducts()},
0167 RootFile::ProductChoices{.productSelectorRules = input_.productSelectorRules(),
0168 .associationsFromSecondary = nullptr,
0169 .dropDescendantsOfDroppedProducts = input_.dropDescendants(),
0170 .labelRawDataLikeMC = input_.labelRawDataLikeMC()},
0171 RootFile::CrossFileInfo{.runHelper = input_.runHelper(),
0172 .branchIDListHelper = input_.branchIDListHelper(),
0173 .processBlockHelper = input_.processBlockHelper().get(),
0174 .thinnedAssociationsHelper = input_.thinnedAssociationsHelper(),
0175 .duplicateChecker = duplicateChecker(),
0176 .indexesIntoFiles = indexesIntoFiles(),
0177 .currentIndexIntoFile = currentIndexIntoFile},
0178 input_.nStreams(),
0179 input_.processHistoryRegistryForUpdate(),
0180 orderedProcessHistoryIDs_);
0181 }
0182
0183 bool RootPrimaryFileSequence::nextFile() {
0184 do {
0185 if (!noMoreFiles())
0186 setAtNextFile();
0187 if (noMoreFiles()) {
0188 return false;
0189 }
0190
0191 initFile(input_.skipBadFiles());
0192 if (rootFile()) {
0193 break;
0194 }
0195
0196
0197 assert(input_.skipBadFiles());
0198 } while (true);
0199
0200
0201 std::string mergeInfo =
0202 input_.productRegistryUpdate().merge(*rootFile()->productRegistry(), fileNames()[0], branchesMustMatch_);
0203 if (!mergeInfo.empty()) {
0204 throw Exception(errors::MismatchedInputFiles, "RootPrimaryFileSequence::nextFile()") << mergeInfo;
0205 }
0206 return true;
0207 }
0208
0209 bool RootPrimaryFileSequence::previousFile() {
0210 if (atFirstFile()) {
0211 return false;
0212 }
0213 setAtPreviousFile();
0214
0215 initFile(false);
0216
0217 if (rootFile()) {
0218
0219 std::string mergeInfo =
0220 input_.productRegistryUpdate().merge(*rootFile()->productRegistry(), fileNames()[0], branchesMustMatch_);
0221 if (!mergeInfo.empty()) {
0222 throw Exception(errors::MismatchedInputFiles, "RootPrimaryFileSequence::previousEvent()") << mergeInfo;
0223 }
0224 }
0225 if (rootFile())
0226 rootFile()->setToLastEntry();
0227 return true;
0228 }
0229
0230 InputSource::ItemTypeInfo RootPrimaryFileSequence::getNextItemType(RunNumber_t& run,
0231 LuminosityBlockNumber_t& lumi,
0232 EventNumber_t& event) {
0233 if (noMoreFiles() || skipToStop_) {
0234 skipToStop_ = false;
0235 return InputSource::ItemType::IsStop;
0236 }
0237 if (firstFile_ || goToEventInNewFile_ || skipIntoNewFile_) {
0238 return InputSource::ItemType::IsFile;
0239 }
0240 if (rootFile()) {
0241 IndexIntoFile::EntryType entryType = rootFile()->getNextItemType(run, lumi, event);
0242 if (entryType == IndexIntoFile::kEvent) {
0243 return InputSource::ItemType::IsEvent;
0244 } else if (entryType == IndexIntoFile::kLumi) {
0245 return InputSource::ItemType::IsLumi;
0246 } else if (entryType == IndexIntoFile::kRun) {
0247 return InputSource::ItemType::IsRun;
0248 }
0249 assert(entryType == IndexIntoFile::kEnd);
0250 }
0251 if (atLastFile()) {
0252 return InputSource::ItemType::IsStop;
0253 }
0254 return InputSource::ItemType::IsFile;
0255 }
0256
0257
0258 void RootPrimaryFileSequence::rewind_() {
0259 if (!atFirstFile()) {
0260 closeFile();
0261 setAtFirstFile();
0262 }
0263 if (!rootFile()) {
0264 initFile(false);
0265 }
0266 rewindFile();
0267 firstFile_ = true;
0268 goToEventInNewFile_ = false;
0269 skipIntoNewFile_ = false;
0270 skipToStop_ = false;
0271 if (rootFile()) {
0272 if (initialNumberOfEventsToSkip_ != 0) {
0273 skipEventsAtBeginning(initialNumberOfEventsToSkip_);
0274 }
0275 }
0276 }
0277
0278
0279 void RootPrimaryFileSequence::rewindFile() {
0280 if (rootFile())
0281 rootFile()->rewind();
0282 }
0283
0284
0285 void RootPrimaryFileSequence::skipEventsAtBeginning(int offset) {
0286 assert(rootFile());
0287 assert(offset >= 0);
0288 while (offset != 0) {
0289 bool atEnd = rootFile()->skipEvents(offset);
0290 if ((offset > 0 || atEnd) && !nextFile()) {
0291 return;
0292 }
0293 }
0294 }
0295
0296
0297 void RootPrimaryFileSequence::skipEvents(int offset) {
0298 assert(rootFile());
0299
0300 bool atEnd = rootFile()->skipEvents(offset);
0301 if (!atEnd && offset == 0) {
0302
0303 return;
0304 }
0305
0306
0307 skipToStop_ = false;
0308 if (offset > 0 || atEnd) {
0309 if (atLastFile() || noMoreFiles()) {
0310 skipToStop_ = true;
0311 return;
0312 }
0313 }
0314 if (offset < 0 && atFirstFile()) {
0315 skipToStop_ = true;
0316 return;
0317 }
0318
0319
0320 size_t const originalFileSequenceNumber = sequenceNumberOfFile();
0321 IndexIntoFile::IndexIntoFileItr originalPosition = rootFile()->indexIntoFileIter();
0322
0323 if ((offset > 0 || atEnd) && !nextFile()) {
0324 skipToStop_ = true;
0325 }
0326 if (offset < 0 && !previousFile()) {
0327 skipToStop_ = true;
0328 }
0329
0330 if (!skipToStop_) {
0331 while (offset != 0) {
0332 skipToOffsetInFinalFile_ = offset;
0333 bool atEnd = rootFile()->skipEvents(offset);
0334 if ((offset > 0 || atEnd) && !nextFile()) {
0335 skipToStop_ = true;
0336 break;
0337 }
0338 if (offset < 0 && !previousFile()) {
0339 skipToStop_ = true;
0340 break;
0341 }
0342 }
0343 if (!skipToStop_) {
0344 skipIntoNewFile_ = true;
0345 }
0346 }
0347 skipToFileSequenceNumber_ = sequenceNumberOfFile();
0348
0349
0350 setAtFileSequenceNumber(originalFileSequenceNumber);
0351 initFile(false);
0352 assert(rootFile());
0353 rootFile()->setPosition(originalPosition);
0354 rootFile()->updateFileBlock(*fb_);
0355 }
0356
0357 bool RootPrimaryFileSequence::goToEvent(EventID const& eventID) {
0358 usingGoToEvent_ = true;
0359 if (rootFile()) {
0360 if (rootFile()->goToEvent(eventID)) {
0361 return true;
0362 }
0363
0364 if (rootFile() && indexesIntoFiles().size() == 1) {
0365 return false;
0366 }
0367
0368 for (auto it = indexesIntoFiles().begin(), itEnd = indexesIntoFiles().end(); it != itEnd; ++it) {
0369 if (*it && (*it)->containsItem(eventID.run(), eventID.luminosityBlock(), eventID.event())) {
0370 goToEventInNewFile_ = true;
0371 goToFileSequenceOffset_ = it - indexesIntoFiles().begin();
0372 goToEventID_ = eventID;
0373 return true;
0374 }
0375 }
0376
0377
0378 bool closedOriginalFile = false;
0379 size_t const originalFileSequenceNumber = sequenceNumberOfFile();
0380 IndexIntoFile::IndexIntoFileItr originalPosition = rootFile()->indexIntoFileIter();
0381
0382
0383 bool foundIt = false;
0384 for (auto it = indexesIntoFiles().begin(), itEnd = indexesIntoFiles().end(); it != itEnd; ++it) {
0385 if (!*it) {
0386 setAtFileSequenceNumber(it - indexesIntoFiles().begin());
0387 initFile(false);
0388 assert(rootFile());
0389 closedOriginalFile = true;
0390 if ((*it)->containsItem(eventID.run(), eventID.luminosityBlock(), eventID.event())) {
0391 foundIt = true;
0392 goToEventInNewFile_ = true;
0393 goToFileSequenceOffset_ = it - indexesIntoFiles().begin();
0394 goToEventID_ = eventID;
0395 }
0396 }
0397 }
0398 if (closedOriginalFile) {
0399 setAtFileSequenceNumber(originalFileSequenceNumber);
0400 initFile(false);
0401 assert(rootFile());
0402 rootFile()->setPosition(originalPosition);
0403 rootFile()->updateFileBlock(*fb_);
0404 }
0405 return foundIt;
0406 }
0407 return false;
0408 }
0409
0410 int RootPrimaryFileSequence::remainingEvents() const { return input_.remainingEvents(); }
0411
0412 int RootPrimaryFileSequence::remainingLuminosityBlocks() const { return input_.remainingLuminosityBlocks(); }
0413
0414 void RootPrimaryFileSequence::fillDescription(ParameterSetDescription& desc) {
0415 desc.addUntracked<unsigned int>("skipEvents", 0U)
0416 ->setComment("Skip the first 'skipEvents' events that otherwise would have been processed.");
0417 desc.addUntracked<bool>("noEventSort", true)
0418 ->setComment(
0419 "True: Process runs, lumis and events in the order they appear in the file (but see notes 1 and 2).\n"
0420 "False: Process runs, lumis and events in each file in numerical order (run#, lumi#, event#) (but see note "
0421 "3).\n"
0422 "Note 1: Events within the same lumi will always be processed contiguously.\n"
0423 "Note 2: Lumis within the same run will always be processed contiguously.\n"
0424 "Note 3: Any sorting occurs independently in each input file (no sorting across input files).");
0425 desc.addUntracked<bool>("noRunLumiSort", false)
0426 ->setComment(
0427 "True: Process runs, lumis and events in the order they appear in the file.\n"
0428 "False: Follow settings based on 'noEventSort' setting.");
0429 desc.addUntracked<unsigned int>("cacheSize", roottree::defaultCacheSize)
0430 ->setComment("Size of ROOT TTree prefetch cache. Affects performance.");
0431 std::string defaultString("permissive");
0432 desc.addUntracked<std::string>("branchesMustMatch", defaultString)
0433 ->setComment(
0434 "'strict': Branches in each input file must match those in the first file.\n"
0435 "'permissive': Branches in each input file may be any subset of those in the first file.");
0436 desc.addUntracked<bool>("enforceGUIDInFileName", false)
0437 ->setComment(
0438 "True: file name part is required to be equal to the GUID of the file\n"
0439 "False: file name can be anything");
0440
0441 EventSkipperByID::fillDescription(desc);
0442 DuplicateChecker::fillDescription(desc);
0443 }
0444
0445 ProcessingController::ForwardState RootPrimaryFileSequence::forwardState() const {
0446 if (rootFile()) {
0447 if (!rootFile()->wasLastEventJustRead()) {
0448 return ProcessingController::kEventsAheadInFile;
0449 }
0450 if (noMoreFiles() || atLastFile()) {
0451 return ProcessingController::kAtLastEvent;
0452 } else {
0453 return ProcessingController::kNextFileExists;
0454 }
0455 }
0456 return ProcessingController::kUnknownForward;
0457 }
0458
0459 ProcessingController::ReverseState RootPrimaryFileSequence::reverseState() const {
0460 if (rootFile()) {
0461 if (!rootFile()->wasFirstEventJustRead()) {
0462 return ProcessingController::kEventsBackwardsInFile;
0463 }
0464 if (!atFirstFile()) {
0465 return ProcessingController::kPreviousFileExists;
0466 }
0467 return ProcessingController::kAtFirstEvent;
0468 }
0469 return ProcessingController::kUnknownReverse;
0470 }
0471
0472 }