File indexing completed on 2022-05-06 00:35:37
0001
0002
0003 #include "PoolSource.h"
0004 #include "InputFile.h"
0005 #include "RootPrimaryFileSequence.h"
0006 #include "RootSecondaryFileSequence.h"
0007 #include "RunHelper.h"
0008 #include "DataFormats/Common/interface/ThinnedAssociation.h"
0009 #include "DataFormats/Provenance/interface/BranchDescription.h"
0010 #include "DataFormats/Provenance/interface/IndexIntoFile.h"
0011 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0012 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0013 #include "FWCore/Framework/interface/EventPrincipal.h"
0014 #include "FWCore/Framework/interface/FileBlock.h"
0015 #include "FWCore/Framework/interface/InputSourceDescription.h"
0016 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0017 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0018 #include "FWCore/Framework/interface/SharedResourcesRegistry.h"
0019 #include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
0020 #include "FWCore/Framework/interface/RunPrincipal.h"
0021 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0022 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0023 #include "FWCore/Utilities/interface/EDMException.h"
0024 #include "FWCore/Utilities/interface/Exception.h"
0025 #include "FWCore/Utilities/interface/InputType.h"
0026
0027 #include <set>
0028
0029 namespace edm {
0030
0031 class BranchID;
0032 class LuminosityBlockID;
0033 class EventID;
0034 class ThinnedAssociationsHelper;
0035
0036 namespace {
0037 void checkHistoryConsistency(Principal const& primary, Principal const& secondary) {
0038 ProcessHistory const& ph1 = primary.processHistory();
0039 ProcessHistory const& ph2 = secondary.processHistory();
0040 if (ph1 != ph2 && !isAncestor(ph2, ph1)) {
0041 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0042 << "The secondary file is not an ancestor of the primary file\n";
0043 }
0044 }
0045 void checkConsistency(EventPrincipal const& primary, EventPrincipal const& secondary) {
0046 if (!isSameEvent(primary, secondary)) {
0047 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0048 << primary.id() << " has inconsistent EventAuxiliary data in the primary and secondary file\n";
0049 }
0050 }
0051 void checkConsistency(LuminosityBlockAuxiliary const& primary, LuminosityBlockAuxiliary const& secondary) {
0052 if (primary.id() != secondary.id()) {
0053 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0054 << primary.id() << " has inconsistent LuminosityBlockAuxiliary data in the primary and secondary file\n";
0055 }
0056 }
0057 void checkConsistency(RunAuxiliary const& primary, RunAuxiliary const& secondary) {
0058 if (primary.id() != secondary.id()) {
0059 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0060 << primary.id() << " has inconsistent RunAuxiliary data in the primary and secondary file\n";
0061 }
0062 }
0063 }
0064
0065 PoolSource::PoolSource(ParameterSet const& pset, InputSourceDescription const& desc)
0066 : InputSource(pset, desc),
0067 rootServiceChecker_(),
0068 catalog_(pset.getUntrackedParameter<std::vector<std::string> >("fileNames"),
0069 pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
0070 secondaryCatalog_(
0071 pset.getUntrackedParameter<std::vector<std::string> >("secondaryFileNames", std::vector<std::string>()),
0072 pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
0073 secondaryRunPrincipal_(),
0074 secondaryLumiPrincipal_(),
0075 secondaryEventPrincipals_(),
0076 branchIDsToReplace_(),
0077 nStreams_(desc.allocations_->numberOfStreams()),
0078 skipBadFiles_(pset.getUntrackedParameter<bool>("skipBadFiles")),
0079 bypassVersionCheck_(pset.getUntrackedParameter<bool>("bypassVersionCheck")),
0080 treeMaxVirtualSize_(pset.getUntrackedParameter<int>("treeMaxVirtualSize")),
0081 productSelectorRules_(pset, "inputCommands", "InputSource"),
0082 dropDescendants_(pset.getUntrackedParameter<bool>("dropDescendantsOfDroppedBranches")),
0083 labelRawDataLikeMC_(pset.getUntrackedParameter<bool>("labelRawDataLikeMC")),
0084 delayReadingEventProducts_(pset.getUntrackedParameter<bool>("delayReadingEventProducts")),
0085 runHelper_(makeRunHelper(pset)),
0086 resourceSharedWithDelayedReaderPtr_(),
0087
0088
0089 primaryFileSequence_(new RootPrimaryFileSequence(pset, *this, catalog_)),
0090 secondaryFileSequence_(
0091 secondaryCatalog_.empty() ? nullptr : new RootSecondaryFileSequence(pset, *this, secondaryCatalog_)) {
0092 auto resources = SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader();
0093 resourceSharedWithDelayedReaderPtr_ = std::make_unique<SharedResourcesAcquirer>(std::move(resources.first));
0094 mutexSharedWithDelayedReader_ = resources.second;
0095
0096 if (secondaryCatalog_.empty() && pset.getUntrackedParameter<bool>("needSecondaryFileNames", false)) {
0097 throw Exception(errors::Configuration, "PoolSource") << "'secondaryFileNames' must be specified\n";
0098 }
0099 if (secondaryFileSequence_) {
0100 secondaryEventPrincipals_.reserve(nStreams_);
0101 for (unsigned int index = 0; index < nStreams_; ++index) {
0102 secondaryEventPrincipals_.emplace_back(new EventPrincipal(secondaryFileSequence_->fileProductRegistry(),
0103 secondaryFileSequence_->fileBranchIDListHelper(),
0104 std::make_shared<ThinnedAssociationsHelper const>(),
0105 processConfiguration(),
0106 nullptr,
0107 index));
0108 }
0109 std::array<std::set<BranchID>, NumBranchTypes> idsToReplace;
0110 ProductRegistry::ProductList const& secondary = secondaryFileSequence_->fileProductRegistry()->productList();
0111 ProductRegistry::ProductList const& primary = primaryFileSequence_->fileProductRegistry()->productList();
0112 std::set<BranchID> associationsFromSecondary;
0113
0114 ProductRegistry::ProductList& fullList = productRegistryUpdate().productListUpdator();
0115 for (auto const& item : secondary) {
0116 if (item.second.present()) {
0117 idsToReplace[item.second.branchType()].insert(item.second.branchID());
0118 if (item.second.branchType() == InEvent && item.second.unwrappedType() == typeid(ThinnedAssociation)) {
0119 associationsFromSecondary.insert(item.second.branchID());
0120 }
0121
0122 auto itFound = fullList.find(item.first);
0123 if (itFound != fullList.end()) {
0124 itFound->second.setDropped(false);
0125 }
0126 }
0127 }
0128 for (auto const& item : primary) {
0129 if (item.second.present()) {
0130 idsToReplace[item.second.branchType()].erase(item.second.branchID());
0131 associationsFromSecondary.erase(item.second.branchID());
0132 }
0133 }
0134 if (idsToReplace[InEvent].empty() && idsToReplace[InLumi].empty() && idsToReplace[InRun].empty()) {
0135 secondaryFileSequence_ = nullptr;
0136 } else {
0137 for (int i = InEvent; i < NumBranchTypes; ++i) {
0138 branchIDsToReplace_[i].reserve(idsToReplace[i].size());
0139 for (auto const& id : idsToReplace[i]) {
0140 branchIDsToReplace_[i].push_back(id);
0141 }
0142 }
0143 secondaryFileSequence_->initAssociationsFromSecondary(associationsFromSecondary);
0144 }
0145 }
0146 }
0147
0148 PoolSource::~PoolSource() {}
0149
0150 void PoolSource::endJob() {
0151 if (secondaryFileSequence_)
0152 secondaryFileSequence_->endJob();
0153 primaryFileSequence_->endJob();
0154 InputFile::reportReadBranches();
0155 }
0156
0157 std::shared_ptr<FileBlock> PoolSource::readFile_() {
0158 std::shared_ptr<FileBlock> fb = primaryFileSequence_->readFile_();
0159 if (secondaryFileSequence_) {
0160 fb->setNotFastClonable(FileBlock::HasSecondaryFileSequence);
0161 }
0162 return fb;
0163 }
0164
0165 void PoolSource::closeFile_() { primaryFileSequence_->closeFile(); }
0166
0167 std::shared_ptr<RunAuxiliary> PoolSource::readRunAuxiliary_() { return primaryFileSequence_->readRunAuxiliary_(); }
0168
0169 std::shared_ptr<LuminosityBlockAuxiliary> PoolSource::readLuminosityBlockAuxiliary_() {
0170 return primaryFileSequence_->readLuminosityBlockAuxiliary_();
0171 }
0172
0173 void PoolSource::fillProcessBlockHelper_() { primaryFileSequence_->fillProcessBlockHelper_(); }
0174
0175 bool PoolSource::nextProcessBlock_(ProcessBlockPrincipal& processBlockPrincipal) {
0176 return primaryFileSequence_->nextProcessBlock_(processBlockPrincipal);
0177 }
0178
0179 void PoolSource::readProcessBlock_(ProcessBlockPrincipal& processBlockPrincipal) {
0180 primaryFileSequence_->readProcessBlock_(processBlockPrincipal);
0181 }
0182
0183 void PoolSource::readRun_(RunPrincipal& runPrincipal) {
0184 bool shouldWeProcessRun = primaryFileSequence_->readRun_(runPrincipal);
0185 if (secondaryFileSequence_ && shouldWeProcessRun && !branchIDsToReplace_[InRun].empty()) {
0186 bool found = secondaryFileSequence_->skipToItem(runPrincipal.run(), 0U, 0U);
0187 if (found) {
0188 std::shared_ptr<RunAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readRunAuxiliary_();
0189 checkConsistency(runPrincipal.aux(), *secondaryAuxiliary);
0190 secondaryRunPrincipal_ = std::make_shared<RunPrincipal>(secondaryAuxiliary,
0191 secondaryFileSequence_->fileProductRegistry(),
0192 processConfiguration(),
0193 nullptr,
0194 runPrincipal.index());
0195 secondaryFileSequence_->readRun_(*secondaryRunPrincipal_);
0196 checkHistoryConsistency(runPrincipal, *secondaryRunPrincipal_);
0197 runPrincipal.recombine(*secondaryRunPrincipal_, branchIDsToReplace_[InRun]);
0198 } else {
0199 throw Exception(errors::MismatchedInputFiles, "PoolSource::readRun_")
0200 << " Run " << runPrincipal.run() << " is not found in the secondary input files\n";
0201 }
0202 }
0203 }
0204
0205 void PoolSource::readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) {
0206 bool shouldWeProcessLumi = primaryFileSequence_->readLuminosityBlock_(lumiPrincipal);
0207 if (secondaryFileSequence_ && shouldWeProcessLumi && !branchIDsToReplace_[InLumi].empty()) {
0208 bool found = secondaryFileSequence_->skipToItem(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0U);
0209 if (found) {
0210 std::shared_ptr<LuminosityBlockAuxiliary> secondaryAuxiliary =
0211 secondaryFileSequence_->readLuminosityBlockAuxiliary_();
0212 checkConsistency(lumiPrincipal.aux(), *secondaryAuxiliary);
0213 secondaryLumiPrincipal_ = std::make_shared<LuminosityBlockPrincipal>(
0214 secondaryFileSequence_->fileProductRegistry(), processConfiguration(), nullptr, lumiPrincipal.index());
0215 secondaryLumiPrincipal_->setAux(*secondaryAuxiliary);
0216 secondaryFileSequence_->readLuminosityBlock_(*secondaryLumiPrincipal_);
0217 checkHistoryConsistency(lumiPrincipal, *secondaryLumiPrincipal_);
0218 lumiPrincipal.recombine(*secondaryLumiPrincipal_, branchIDsToReplace_[InLumi]);
0219 } else {
0220 throw Exception(errors::MismatchedInputFiles, "PoolSource::readLuminosityBlock_")
0221 << " Run " << lumiPrincipal.run() << " LuminosityBlock " << lumiPrincipal.luminosityBlock()
0222 << " is not found in the secondary input files\n";
0223 }
0224 }
0225 }
0226
0227 void PoolSource::readEvent_(EventPrincipal& eventPrincipal) {
0228 bool readEventSucceeded = primaryFileSequence_->readEvent(eventPrincipal);
0229 assert(readEventSucceeded);
0230 if (secondaryFileSequence_ && !branchIDsToReplace_[InEvent].empty()) {
0231 bool found = secondaryFileSequence_->skipToItem(
0232 eventPrincipal.run(), eventPrincipal.luminosityBlock(), eventPrincipal.id().event());
0233 if (found) {
0234 EventPrincipal& secondaryEventPrincipal = *secondaryEventPrincipals_[eventPrincipal.streamID().value()];
0235 bool readEventSucceeded = secondaryFileSequence_->readEvent(secondaryEventPrincipal);
0236 checkConsistency(eventPrincipal, secondaryEventPrincipal);
0237 checkHistoryConsistency(eventPrincipal, secondaryEventPrincipal);
0238 assert(readEventSucceeded);
0239 eventPrincipal.recombine(secondaryEventPrincipal, branchIDsToReplace_[InEvent]);
0240 eventPrincipal.mergeProvenanceRetrievers(secondaryEventPrincipal);
0241 secondaryEventPrincipal.clearPrincipal();
0242 } else {
0243 throw Exception(errors::MismatchedInputFiles, "PoolSource::readEvent_")
0244 << eventPrincipal.id() << " is not found in the secondary input files\n";
0245 }
0246 }
0247 if (not delayReadingEventProducts_) {
0248 eventPrincipal.readAllFromSourceAndMergeImmediately();
0249 }
0250 }
0251
0252 bool PoolSource::readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext) {
0253 bool found = primaryFileSequence_->skipToItem(id.run(), id.luminosityBlock(), id.event());
0254 if (!found)
0255 return false;
0256 EventSourceSentry sentry(*this, streamContext);
0257 readEvent_(eventPrincipal);
0258 return true;
0259 }
0260
0261 InputSource::ItemType PoolSource::getNextItemType() {
0262 RunNumber_t run = IndexIntoFile::invalidRun;
0263 LuminosityBlockNumber_t lumi = IndexIntoFile::invalidLumi;
0264 EventNumber_t event = IndexIntoFile::invalidEvent;
0265 InputSource::ItemType itemType = primaryFileSequence_->getNextItemType(run, lumi, event);
0266 if (secondaryFileSequence_ && (IsSynchronize != state())) {
0267 if (itemType == IsRun || itemType == IsLumi || itemType == IsEvent) {
0268 if (!secondaryFileSequence_->containedInCurrentFile(run, lumi, event)) {
0269 return IsSynchronize;
0270 }
0271 }
0272 }
0273 return runHelper_->nextItemType(state(), itemType, run, lumi, event);
0274 }
0275
0276 std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> PoolSource::resourceSharedWithDelayedReader_() {
0277 return std::make_pair(resourceSharedWithDelayedReaderPtr_.get(), mutexSharedWithDelayedReader_.get());
0278 }
0279
0280
0281 void PoolSource::rewind_() { primaryFileSequence_->rewind_(); }
0282
0283
0284 void PoolSource::skip(int offset) { primaryFileSequence_->skipEvents(offset); }
0285
0286 bool PoolSource::goToEvent_(EventID const& eventID) { return primaryFileSequence_->goToEvent(eventID); }
0287
0288 void PoolSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
0289 ParameterSetDescription desc;
0290
0291 std::vector<std::string> defaultStrings;
0292 desc.setComment("Reads EDM/Root files.");
0293 desc.addUntracked<std::vector<std::string> >("fileNames")->setComment("Names of files to be processed.");
0294 desc.addUntracked<std::vector<std::string> >("secondaryFileNames", defaultStrings)
0295 ->setComment("Names of secondary files to be processed.");
0296 desc.addUntracked<bool>("needSecondaryFileNames", false)
0297 ->setComment("If True, 'secondaryFileNames' must be specified and be non-empty.");
0298 desc.addUntracked<std::string>("overrideCatalog", std::string());
0299 desc.addUntracked<bool>("skipBadFiles", false)
0300 ->setComment(
0301 "True: Ignore any missing or unopenable input file.\n"
0302 "False: Throw exception if missing or unopenable input file.");
0303 desc.addUntracked<bool>("bypassVersionCheck", false)
0304 ->setComment(
0305 "True: Bypass release version check.\n"
0306 "False: Throw exception if reading file in a release prior to the release in which the file was written.");
0307 desc.addUntracked<int>("treeMaxVirtualSize", -1)
0308 ->setComment("Size of ROOT TTree TBasket cache. Affects performance.");
0309 desc.addUntracked<bool>("dropDescendantsOfDroppedBranches", true)
0310 ->setComment("If True, also drop on input any descendent of any branch dropped on input.");
0311 desc.addUntracked<bool>("labelRawDataLikeMC", true)
0312 ->setComment("If True: replace module label for raw data to match MC. Also use 'LHC' as process.");
0313 desc.addUntracked<bool>("delayReadingEventProducts", true)
0314 ->setComment(
0315 "If True: do not read a data product from the file until it is requested. If False: all event data "
0316 "products are read upfront.");
0317 ProductSelectorRules::fillDescription(desc, "inputCommands");
0318 InputSource::fillDescription(desc);
0319 RootPrimaryFileSequence::fillDescription(desc);
0320 RunHelperBase::fillDescription(desc);
0321
0322 descriptions.add("source", desc);
0323 }
0324
0325 bool PoolSource::randomAccess_() const { return true; }
0326
0327 ProcessingController::ForwardState PoolSource::forwardState_() const { return primaryFileSequence_->forwardState(); }
0328
0329 ProcessingController::ReverseState PoolSource::reverseState_() const { return primaryFileSequence_->reverseState(); }
0330 }