File indexing completed on 2024-04-06 12:19:07
0001
0002
0003 #include "DuplicateChecker.h"
0004 #include "InputFile.h"
0005 #include "PoolSource.h"
0006 #include "RootFile.h"
0007 #include "RootPrimaryFileSequence.h"
0008 #include "RootTree.h"
0009
0010 #include "DataFormats/Provenance/interface/BranchID.h"
0011 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0012 #include "FWCore/Catalog/interface/InputFileCatalog.h"
0013 #include "FWCore/Catalog/interface/SiteLocalConfig.h"
0014 #include "FWCore/Framework/interface/FileBlock.h"
0015 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0016 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0017 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0018 #include "FWCore/ServiceRegistry/interface/Service.h"
0019 #include "Utilities/StorageFactory/interface/StorageFactory.h"
0020
0021 namespace edm {
0022 RootPrimaryFileSequence::RootPrimaryFileSequence(ParameterSet const& pset,
0023 PoolSource& input,
0024 InputFileCatalog const& catalog)
0025 : RootInputFileSequence(pset, catalog),
0026 input_(input),
0027 firstFile_(true),
0028 branchesMustMatch_(BranchDescription::Permissive),
0029 orderedProcessHistoryIDs_(),
0030 eventSkipperByID_(EventSkipperByID::create(pset).release()),
0031 initialNumberOfEventsToSkip_(pset.getUntrackedParameter<unsigned int>("skipEvents")),
0032 noRunLumiSort_(pset.getUntrackedParameter<bool>("noRunLumiSort")),
0033 noEventSort_(noRunLumiSort_ ? true : pset.getUntrackedParameter<bool>("noEventSort")),
0034 treeCacheSize_(noEventSort_ ? pset.getUntrackedParameter<unsigned int>("cacheSize") : 0U),
0035 duplicateChecker_(new DuplicateChecker(pset)),
0036 usingGoToEvent_(false),
0037 enablePrefetching_(false),
0038 enforceGUIDInFileName_(pset.getUntrackedParameter<bool>("enforceGUIDInFileName")) {
0039 if (noRunLumiSort_ && (remainingEvents() >= 0 || remainingLuminosityBlocks() >= 0)) {
0040
0041
0042
0043
0044 throw Exception(errors::Configuration,
0045 "Illegal to configure noRunLumiSort and limit the number of events or luminosityBlocks");
0046 }
0047
0048 Service<SiteLocalConfig> pSLC;
0049 if (pSLC.isAvailable()) {
0050 if (treeCacheSize_ != 0U && pSLC->sourceTTreeCacheSize()) {
0051 treeCacheSize_ = *(pSLC->sourceTTreeCacheSize());
0052 }
0053 enablePrefetching_ = pSLC->enablePrefetching();
0054 }
0055
0056 std::string branchesMustMatch =
0057 pset.getUntrackedParameter<std::string>("branchesMustMatch", std::string("permissive"));
0058 if (branchesMustMatch == std::string("strict"))
0059 branchesMustMatch_ = BranchDescription::Strict;
0060
0061
0062 for (setAtFirstFile(); !noMoreFiles(); setAtNextFile()) {
0063 storage::StorageFactory::get()->stagein(fileNames()[0]);
0064 }
0065
0066 for (setAtFirstFile(); !noMoreFiles(); setAtNextFile()) {
0067 initFile(input_.skipBadFiles());
0068 if (rootFile())
0069 break;
0070 }
0071 if (rootFile()) {
0072 input_.productRegistryUpdate().updateFromInput(rootFile()->productRegistry()->productList());
0073 if (initialNumberOfEventsToSkip_ != 0) {
0074 skipEventsAtBeginning(initialNumberOfEventsToSkip_);
0075 }
0076 }
0077 }
0078
0079 RootPrimaryFileSequence::~RootPrimaryFileSequence() {}
0080
0081 void RootPrimaryFileSequence::endJob() { closeFile(); }
0082
0083 std::shared_ptr<FileBlock> RootPrimaryFileSequence::readFile_() {
0084 std::shared_ptr<FileBlock> fileBlock;
0085 if (firstFile_) {
0086 firstFile_ = false;
0087
0088 if (!rootFile()) {
0089 initFile(input_.skipBadFiles());
0090 }
0091 } else if (goToEventInNewFile_) {
0092 goToEventInNewFile_ = false;
0093 setAtFileSequenceNumber(goToFileSequenceOffset_);
0094 initFile(false);
0095 assert(rootFile());
0096 bool found = rootFile()->goToEvent(goToEventID_);
0097 assert(found);
0098 } else if (skipIntoNewFile_) {
0099 skipIntoNewFile_ = false;
0100 setAtFileSequenceNumber(skipToFileSequenceNumber_);
0101 initFile(false);
0102 assert(rootFile());
0103 if (skipToOffsetInFinalFile_ < 0) {
0104 rootFile()->setToLastEntry();
0105 }
0106 bool atEnd = rootFile()->skipEvents(skipToOffsetInFinalFile_);
0107 assert(!atEnd && skipToOffsetInFinalFile_ == 0);
0108 } else {
0109 if (!nextFile()) {
0110
0111
0112 fb_ = fileBlock;
0113 return fileBlock;
0114 }
0115 }
0116 if (!rootFile()) {
0117 fileBlock = std::make_shared<FileBlock>();
0118 fb_ = fileBlock;
0119 return fileBlock;
0120 }
0121 fileBlock = rootFile()->createFileBlock();
0122 fb_ = fileBlock;
0123 return fileBlock;
0124 }
0125
0126 void RootPrimaryFileSequence::closeFile_() {
0127
0128 if (rootFile()) {
0129 auto sentry = std::make_unique<InputSource::FileCloseSentry>(input_, lfn());
0130 rootFile()->close();
0131 if (duplicateChecker_)
0132 duplicateChecker_->inputFileClosed();
0133 rootFile().reset();
0134 }
0135 }
0136
0137 void RootPrimaryFileSequence::initFile_(bool skipBadFiles) {
0138
0139
0140
0141 bool deleteIndexIntoFile = !usingGoToEvent_ && !(duplicateChecker_ && duplicateChecker_->checkingAllFiles() &&
0142 !duplicateChecker_->checkDisabled());
0143 initTheFile(skipBadFiles, deleteIndexIntoFile, &input_, "primaryFiles", InputType::Primary);
0144 }
0145
0146 RootPrimaryFileSequence::RootFileSharedPtr RootPrimaryFileSequence::makeRootFile(std::shared_ptr<InputFile> filePtr) {
0147 size_t currentIndexIntoFile = sequenceNumberOfFile();
0148 return std::make_shared<RootFile>(fileNames()[0],
0149 input_.processConfiguration(),
0150 logicalFileName(),
0151 filePtr,
0152 eventSkipperByID(),
0153 initialNumberOfEventsToSkip_ != 0,
0154 remainingEvents(),
0155 remainingLuminosityBlocks(),
0156 input_.nStreams(),
0157 treeCacheSize_,
0158 input_.treeMaxVirtualSize(),
0159 input_.processingMode(),
0160 input_.runHelper(),
0161 noRunLumiSort_,
0162 noEventSort_,
0163 input_.productSelectorRules(),
0164 InputType::Primary,
0165 input_.branchIDListHelper(),
0166 input_.processBlockHelper().get(),
0167 input_.thinnedAssociationsHelper(),
0168 nullptr,
0169 duplicateChecker(),
0170 input_.dropDescendants(),
0171 input_.processHistoryRegistryForUpdate(),
0172 indexesIntoFiles(),
0173 currentIndexIntoFile,
0174 orderedProcessHistoryIDs_,
0175 input_.bypassVersionCheck(),
0176 input_.labelRawDataLikeMC(),
0177 usingGoToEvent_,
0178 enablePrefetching_,
0179 enforceGUIDInFileName_);
0180 }
0181
0182 bool RootPrimaryFileSequence::nextFile() {
0183 do {
0184 if (!noMoreFiles())
0185 setAtNextFile();
0186 if (noMoreFiles()) {
0187 return false;
0188 }
0189
0190 initFile(input_.skipBadFiles());
0191 if (rootFile()) {
0192 break;
0193 }
0194
0195
0196 assert(input_.skipBadFiles());
0197 } while (true);
0198
0199
0200 std::string mergeInfo =
0201 input_.productRegistryUpdate().merge(*rootFile()->productRegistry(), fileNames()[0], branchesMustMatch_);
0202 if (!mergeInfo.empty()) {
0203 throw Exception(errors::MismatchedInputFiles, "RootPrimaryFileSequence::nextFile()") << mergeInfo;
0204 }
0205 return true;
0206 }
0207
0208 bool RootPrimaryFileSequence::previousFile() {
0209 if (atFirstFile()) {
0210 return false;
0211 }
0212 setAtPreviousFile();
0213
0214 initFile(false);
0215
0216 if (rootFile()) {
0217
0218 std::string mergeInfo =
0219 input_.productRegistryUpdate().merge(*rootFile()->productRegistry(), fileNames()[0], branchesMustMatch_);
0220 if (!mergeInfo.empty()) {
0221 throw Exception(errors::MismatchedInputFiles, "RootPrimaryFileSequence::previousEvent()") << mergeInfo;
0222 }
0223 }
0224 if (rootFile())
0225 rootFile()->setToLastEntry();
0226 return true;
0227 }
0228
0229 InputSource::ItemTypeInfo RootPrimaryFileSequence::getNextItemType(RunNumber_t& run,
0230 LuminosityBlockNumber_t& lumi,
0231 EventNumber_t& event) {
0232 if (noMoreFiles() || skipToStop_) {
0233 skipToStop_ = false;
0234 return InputSource::ItemType::IsStop;
0235 }
0236 if (firstFile_ || goToEventInNewFile_ || skipIntoNewFile_) {
0237 return InputSource::ItemType::IsFile;
0238 }
0239 if (rootFile()) {
0240 IndexIntoFile::EntryType entryType = rootFile()->getNextItemType(run, lumi, event);
0241 if (entryType == IndexIntoFile::kEvent) {
0242 return InputSource::ItemType::IsEvent;
0243 } else if (entryType == IndexIntoFile::kLumi) {
0244 return InputSource::ItemType::IsLumi;
0245 } else if (entryType == IndexIntoFile::kRun) {
0246 return InputSource::ItemType::IsRun;
0247 }
0248 assert(entryType == IndexIntoFile::kEnd);
0249 }
0250 if (atLastFile()) {
0251 return InputSource::ItemType::IsStop;
0252 }
0253 return InputSource::ItemType::IsFile;
0254 }
0255
0256
0257 void RootPrimaryFileSequence::rewind_() {
0258 if (!atFirstFile()) {
0259 closeFile();
0260 setAtFirstFile();
0261 }
0262 if (!rootFile()) {
0263 initFile(false);
0264 }
0265 rewindFile();
0266 firstFile_ = true;
0267 goToEventInNewFile_ = false;
0268 skipIntoNewFile_ = false;
0269 skipToStop_ = false;
0270 if (rootFile()) {
0271 if (initialNumberOfEventsToSkip_ != 0) {
0272 skipEventsAtBeginning(initialNumberOfEventsToSkip_);
0273 }
0274 }
0275 }
0276
0277
0278 void RootPrimaryFileSequence::rewindFile() {
0279 if (rootFile())
0280 rootFile()->rewind();
0281 }
0282
0283
0284 void RootPrimaryFileSequence::skipEventsAtBeginning(int offset) {
0285 assert(rootFile());
0286 assert(offset >= 0);
0287 while (offset != 0) {
0288 bool atEnd = rootFile()->skipEvents(offset);
0289 if ((offset > 0 || atEnd) && !nextFile()) {
0290 return;
0291 }
0292 }
0293 }
0294
0295
0296 void RootPrimaryFileSequence::skipEvents(int offset) {
0297 assert(rootFile());
0298
0299 bool atEnd = rootFile()->skipEvents(offset);
0300 if (!atEnd && offset == 0) {
0301
0302 return;
0303 }
0304
0305
0306 skipToStop_ = false;
0307 if (offset > 0 || atEnd) {
0308 if (atLastFile() || noMoreFiles()) {
0309 skipToStop_ = true;
0310 return;
0311 }
0312 }
0313 if (offset < 0 && atFirstFile()) {
0314 skipToStop_ = true;
0315 return;
0316 }
0317
0318
0319 size_t const originalFileSequenceNumber = sequenceNumberOfFile();
0320 IndexIntoFile::IndexIntoFileItr originalPosition = rootFile()->indexIntoFileIter();
0321
0322 if ((offset > 0 || atEnd) && !nextFile()) {
0323 skipToStop_ = true;
0324 }
0325 if (offset < 0 && !previousFile()) {
0326 skipToStop_ = true;
0327 }
0328
0329 if (!skipToStop_) {
0330 while (offset != 0) {
0331 skipToOffsetInFinalFile_ = offset;
0332 bool atEnd = rootFile()->skipEvents(offset);
0333 if ((offset > 0 || atEnd) && !nextFile()) {
0334 skipToStop_ = true;
0335 break;
0336 }
0337 if (offset < 0 && !previousFile()) {
0338 skipToStop_ = true;
0339 break;
0340 }
0341 }
0342 if (!skipToStop_) {
0343 skipIntoNewFile_ = true;
0344 }
0345 }
0346 skipToFileSequenceNumber_ = sequenceNumberOfFile();
0347
0348
0349 setAtFileSequenceNumber(originalFileSequenceNumber);
0350 initFile(false);
0351 assert(rootFile());
0352 rootFile()->setPosition(originalPosition);
0353 rootFile()->updateFileBlock(*fb_);
0354 }
0355
0356 bool RootPrimaryFileSequence::goToEvent(EventID const& eventID) {
0357 usingGoToEvent_ = true;
0358 if (rootFile()) {
0359 if (rootFile()->goToEvent(eventID)) {
0360 return true;
0361 }
0362
0363 if (rootFile() && indexesIntoFiles().size() == 1) {
0364 return false;
0365 }
0366
0367 for (auto it = indexesIntoFiles().begin(), itEnd = indexesIntoFiles().end(); it != itEnd; ++it) {
0368 if (*it && (*it)->containsItem(eventID.run(), eventID.luminosityBlock(), eventID.event())) {
0369 goToEventInNewFile_ = true;
0370 goToFileSequenceOffset_ = it - indexesIntoFiles().begin();
0371 goToEventID_ = eventID;
0372 return true;
0373 }
0374 }
0375
0376
0377 bool closedOriginalFile = false;
0378 size_t const originalFileSequenceNumber = sequenceNumberOfFile();
0379 IndexIntoFile::IndexIntoFileItr originalPosition = rootFile()->indexIntoFileIter();
0380
0381
0382 bool foundIt = false;
0383 for (auto it = indexesIntoFiles().begin(), itEnd = indexesIntoFiles().end(); it != itEnd; ++it) {
0384 if (!*it) {
0385 setAtFileSequenceNumber(it - indexesIntoFiles().begin());
0386 initFile(false);
0387 assert(rootFile());
0388 closedOriginalFile = true;
0389 if ((*it)->containsItem(eventID.run(), eventID.luminosityBlock(), eventID.event())) {
0390 foundIt = true;
0391 goToEventInNewFile_ = true;
0392 goToFileSequenceOffset_ = it - indexesIntoFiles().begin();
0393 goToEventID_ = eventID;
0394 }
0395 }
0396 }
0397 if (closedOriginalFile) {
0398 setAtFileSequenceNumber(originalFileSequenceNumber);
0399 initFile(false);
0400 assert(rootFile());
0401 rootFile()->setPosition(originalPosition);
0402 rootFile()->updateFileBlock(*fb_);
0403 }
0404 return foundIt;
0405 }
0406 return false;
0407 }
0408
0409 int RootPrimaryFileSequence::remainingEvents() const { return input_.remainingEvents(); }
0410
0411 int RootPrimaryFileSequence::remainingLuminosityBlocks() const { return input_.remainingLuminosityBlocks(); }
0412
0413 void RootPrimaryFileSequence::fillDescription(ParameterSetDescription& desc) {
0414 desc.addUntracked<unsigned int>("skipEvents", 0U)
0415 ->setComment("Skip the first 'skipEvents' events that otherwise would have been processed.");
0416 desc.addUntracked<bool>("noEventSort", true)
0417 ->setComment(
0418 "True: Process runs, lumis and events in the order they appear in the file (but see notes 1 and 2).\n"
0419 "False: Process runs, lumis and events in each file in numerical order (run#, lumi#, event#) (but see note "
0420 "3).\n"
0421 "Note 1: Events within the same lumi will always be processed contiguously.\n"
0422 "Note 2: Lumis within the same run will always be processed contiguously.\n"
0423 "Note 3: Any sorting occurs independently in each input file (no sorting across input files).");
0424 desc.addUntracked<bool>("noRunLumiSort", false)
0425 ->setComment(
0426 "True: Process runs, lumis and events in the order they appear in the file.\n"
0427 "False: Follow settings based on 'noEventSort' setting.");
0428 desc.addUntracked<unsigned int>("cacheSize", roottree::defaultCacheSize)
0429 ->setComment("Size of ROOT TTree prefetch cache. Affects performance.");
0430 std::string defaultString("permissive");
0431 desc.addUntracked<std::string>("branchesMustMatch", defaultString)
0432 ->setComment(
0433 "'strict': Branches in each input file must match those in the first file.\n"
0434 "'permissive': Branches in each input file may be any subset of those in the first file.");
0435 desc.addUntracked<bool>("enforceGUIDInFileName", false)
0436 ->setComment(
0437 "True: file name part is required to be equal to the GUID of the file\n"
0438 "False: file name can be anything");
0439
0440 EventSkipperByID::fillDescription(desc);
0441 DuplicateChecker::fillDescription(desc);
0442 }
0443
0444 ProcessingController::ForwardState RootPrimaryFileSequence::forwardState() const {
0445 if (rootFile()) {
0446 if (!rootFile()->wasLastEventJustRead()) {
0447 return ProcessingController::kEventsAheadInFile;
0448 }
0449 if (noMoreFiles() || atLastFile()) {
0450 return ProcessingController::kAtLastEvent;
0451 } else {
0452 return ProcessingController::kNextFileExists;
0453 }
0454 }
0455 return ProcessingController::kUnknownForward;
0456 }
0457
0458 ProcessingController::ReverseState RootPrimaryFileSequence::reverseState() const {
0459 if (rootFile()) {
0460 if (!rootFile()->wasFirstEventJustRead()) {
0461 return ProcessingController::kEventsBackwardsInFile;
0462 }
0463 if (!atFirstFile()) {
0464 return ProcessingController::kPreviousFileExists;
0465 }
0466 return ProcessingController::kAtFirstEvent;
0467 }
0468 return ProcessingController::kUnknownReverse;
0469 }
0470
0471 }