File indexing completed on 2023-03-17 11:10:20
0001
0002
0003 #include "PoolSource.h"
0004 #include "InputFile.h"
0005 #include "RootPrimaryFileSequence.h"
0006 #include "RootSecondaryFileSequence.h"
0007 #include "RunHelper.h"
0008 #include "DataFormats/Common/interface/ThinnedAssociation.h"
0009 #include "DataFormats/Provenance/interface/BranchDescription.h"
0010 #include "DataFormats/Provenance/interface/IndexIntoFile.h"
0011 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0012 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0013 #include "FWCore/Framework/interface/EventPrincipal.h"
0014 #include "FWCore/Framework/interface/FileBlock.h"
0015 #include "FWCore/Framework/interface/InputSourceDescription.h"
0016 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0017 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0018 #include "FWCore/Framework/interface/SharedResourcesRegistry.h"
0019 #include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
0020 #include "FWCore/Framework/interface/RunPrincipal.h"
0021 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0022 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0023 #include "FWCore/Utilities/interface/EDMException.h"
0024 #include "FWCore/Utilities/interface/Exception.h"
0025 #include "FWCore/Utilities/interface/InputType.h"
0026
0027 #include <set>
0028
0029 namespace edm {
0030
// Forward declarations (kept in alphabetical order).
class BranchID;
class EventID;
class LuminosityBlockID;
class ThinnedAssociationsHelper;
0035
0036 namespace {
0037 void checkHistoryConsistency(Principal const& primary, Principal const& secondary) {
0038 ProcessHistory const& ph1 = primary.processHistory();
0039 ProcessHistory const& ph2 = secondary.processHistory();
0040 if (ph1 != ph2 && !isAncestor(ph2, ph1)) {
0041 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0042 << "The secondary file is not an ancestor of the primary file\n";
0043 }
0044 }
0045 void checkConsistency(EventPrincipal const& primary, EventPrincipal const& secondary) {
0046 if (!isSameEvent(primary, secondary)) {
0047 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0048 << primary.id() << " has inconsistent EventAuxiliary data in the primary and secondary file\n";
0049 }
0050 }
0051 void checkConsistency(LuminosityBlockAuxiliary const& primary, LuminosityBlockAuxiliary const& secondary) {
0052 if (primary.id() != secondary.id()) {
0053 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0054 << primary.id() << " has inconsistent LuminosityBlockAuxiliary data in the primary and secondary file\n";
0055 }
0056 }
0057 void checkConsistency(RunAuxiliary const& primary, RunAuxiliary const& secondary) {
0058 if (primary.id() != secondary.id()) {
0059 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0060 << primary.id() << " has inconsistent RunAuxiliary data in the primary and secondary file\n";
0061 }
0062 }
0063 }
0064
0065 PoolSource::PoolSource(ParameterSet const& pset, InputSourceDescription const& desc)
0066 : InputSource(pset, desc),
0067 rootServiceChecker_(),
0068 catalog_(pset.getUntrackedParameter<std::vector<std::string> >("fileNames"),
0069 pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
0070 secondaryCatalog_(
0071 pset.getUntrackedParameter<std::vector<std::string> >("secondaryFileNames", std::vector<std::string>()),
0072 pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
0073 secondaryRunPrincipal_(),
0074 secondaryLumiPrincipal_(),
0075 secondaryEventPrincipals_(),
0076 branchIDsToReplace_(),
0077 nStreams_(desc.allocations_->numberOfStreams()),
0078 skipBadFiles_(pset.getUntrackedParameter<bool>("skipBadFiles")),
0079 bypassVersionCheck_(pset.getUntrackedParameter<bool>("bypassVersionCheck")),
0080 treeMaxVirtualSize_(pset.getUntrackedParameter<int>("treeMaxVirtualSize")),
0081 productSelectorRules_(pset, "inputCommands", "InputSource"),
0082 dropDescendants_(pset.getUntrackedParameter<bool>("dropDescendantsOfDroppedBranches")),
0083 labelRawDataLikeMC_(pset.getUntrackedParameter<bool>("labelRawDataLikeMC")),
0084 delayReadingEventProducts_(pset.getUntrackedParameter<bool>("delayReadingEventProducts")),
0085 runHelper_(makeRunHelper(pset)),
0086 resourceSharedWithDelayedReaderPtr_(),
0087
0088
0089 primaryFileSequence_(new RootPrimaryFileSequence(pset, *this, catalog_)),
0090 secondaryFileSequence_(
0091 secondaryCatalog_.empty() ? nullptr : new RootSecondaryFileSequence(pset, *this, secondaryCatalog_)) {
0092 auto resources = SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader();
0093 resourceSharedWithDelayedReaderPtr_ = std::make_unique<SharedResourcesAcquirer>(std::move(resources.first));
0094 mutexSharedWithDelayedReader_ = resources.second;
0095
0096 if (secondaryCatalog_.empty() && pset.getUntrackedParameter<bool>("needSecondaryFileNames", false)) {
0097 throw Exception(errors::Configuration, "PoolSource") << "'secondaryFileNames' must be specified\n";
0098 }
0099 if (secondaryFileSequence_) {
0100 secondaryEventPrincipals_.reserve(nStreams_);
0101 for (unsigned int index = 0; index < nStreams_; ++index) {
0102 secondaryEventPrincipals_.emplace_back(new EventPrincipal(secondaryFileSequence_->fileProductRegistry(),
0103 secondaryFileSequence_->fileBranchIDListHelper(),
0104 std::make_shared<ThinnedAssociationsHelper const>(),
0105 processConfiguration(),
0106 nullptr,
0107 index));
0108 }
0109 std::array<std::set<BranchID>, NumBranchTypes> idsToReplace;
0110 ProductRegistry::ProductList const& secondary = secondaryFileSequence_->fileProductRegistry()->productList();
0111 ProductRegistry::ProductList const& primary = primaryFileSequence_->fileProductRegistry()->productList();
0112 std::set<BranchID> associationsFromSecondary;
0113
0114 ProductRegistry::ProductList& fullList = productRegistryUpdate().productListUpdator();
0115 for (auto const& item : secondary) {
0116 if (item.second.present()) {
0117 idsToReplace[item.second.branchType()].insert(item.second.branchID());
0118 if (item.second.branchType() == InEvent && item.second.unwrappedType() == typeid(ThinnedAssociation)) {
0119 associationsFromSecondary.insert(item.second.branchID());
0120 }
0121
0122 auto itFound = fullList.find(item.first);
0123 if (itFound != fullList.end()) {
0124
0125 if (itFound->second.dropped()) {
0126 itFound->second.initFromDictionary();
0127 }
0128 itFound->second.setDropped(false);
0129 }
0130 }
0131 }
0132 for (auto const& item : primary) {
0133 if (item.second.present()) {
0134 idsToReplace[item.second.branchType()].erase(item.second.branchID());
0135 associationsFromSecondary.erase(item.second.branchID());
0136 }
0137 }
0138 if (idsToReplace[InEvent].empty() && idsToReplace[InLumi].empty() && idsToReplace[InRun].empty()) {
0139 secondaryFileSequence_ = nullptr;
0140 } else {
0141 for (int i = InEvent; i < NumBranchTypes; ++i) {
0142 branchIDsToReplace_[i].reserve(idsToReplace[i].size());
0143 for (auto const& id : idsToReplace[i]) {
0144 branchIDsToReplace_[i].push_back(id);
0145 }
0146 }
0147 secondaryFileSequence_->initAssociationsFromSecondary(associationsFromSecondary);
0148 }
0149 }
0150 }
0151
0152 PoolSource::~PoolSource() {}
0153
0154 void PoolSource::endJob() {
0155 if (secondaryFileSequence_)
0156 secondaryFileSequence_->endJob();
0157 primaryFileSequence_->endJob();
0158 InputFile::reportReadBranches();
0159 }
0160
0161 std::shared_ptr<FileBlock> PoolSource::readFile_() {
0162 std::shared_ptr<FileBlock> fb = primaryFileSequence_->readFile_();
0163 if (secondaryFileSequence_) {
0164 fb->setNotFastClonable(FileBlock::HasSecondaryFileSequence);
0165 }
0166 return fb;
0167 }
0168
0169 void PoolSource::closeFile_() { primaryFileSequence_->closeFile(); }
0170
0171 std::shared_ptr<RunAuxiliary> PoolSource::readRunAuxiliary_() { return primaryFileSequence_->readRunAuxiliary_(); }
0172
0173 std::shared_ptr<LuminosityBlockAuxiliary> PoolSource::readLuminosityBlockAuxiliary_() {
0174 return primaryFileSequence_->readLuminosityBlockAuxiliary_();
0175 }
0176
0177 void PoolSource::fillProcessBlockHelper_() { primaryFileSequence_->fillProcessBlockHelper_(); }
0178
0179 bool PoolSource::nextProcessBlock_(ProcessBlockPrincipal& processBlockPrincipal) {
0180 return primaryFileSequence_->nextProcessBlock_(processBlockPrincipal);
0181 }
0182
0183 void PoolSource::readProcessBlock_(ProcessBlockPrincipal& processBlockPrincipal) {
0184 primaryFileSequence_->readProcessBlock_(processBlockPrincipal);
0185 }
0186
0187 void PoolSource::readRun_(RunPrincipal& runPrincipal) {
0188 bool shouldWeProcessRun = primaryFileSequence_->readRun_(runPrincipal);
0189 if (secondaryFileSequence_ && shouldWeProcessRun && !branchIDsToReplace_[InRun].empty()) {
0190 bool found = secondaryFileSequence_->skipToItem(runPrincipal.run(), 0U, 0U);
0191 if (found) {
0192 std::shared_ptr<RunAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readRunAuxiliary_();
0193 checkConsistency(runPrincipal.aux(), *secondaryAuxiliary);
0194 secondaryRunPrincipal_ = std::make_shared<RunPrincipal>(
0195 secondaryFileSequence_->fileProductRegistry(), processConfiguration(), nullptr, runPrincipal.index());
0196 secondaryRunPrincipal_->setAux(*secondaryAuxiliary);
0197 secondaryFileSequence_->readRun_(*secondaryRunPrincipal_);
0198 checkHistoryConsistency(runPrincipal, *secondaryRunPrincipal_);
0199 runPrincipal.recombine(*secondaryRunPrincipal_, branchIDsToReplace_[InRun]);
0200 } else {
0201 throw Exception(errors::MismatchedInputFiles, "PoolSource::readRun_")
0202 << " Run " << runPrincipal.run() << " is not found in the secondary input files\n";
0203 }
0204 }
0205 }
0206
0207 void PoolSource::readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) {
0208 bool shouldWeProcessLumi = primaryFileSequence_->readLuminosityBlock_(lumiPrincipal);
0209 if (secondaryFileSequence_ && shouldWeProcessLumi && !branchIDsToReplace_[InLumi].empty()) {
0210 bool found = secondaryFileSequence_->skipToItem(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0U);
0211 if (found) {
0212 std::shared_ptr<LuminosityBlockAuxiliary> secondaryAuxiliary =
0213 secondaryFileSequence_->readLuminosityBlockAuxiliary_();
0214 checkConsistency(lumiPrincipal.aux(), *secondaryAuxiliary);
0215 secondaryLumiPrincipal_ = std::make_shared<LuminosityBlockPrincipal>(
0216 secondaryFileSequence_->fileProductRegistry(), processConfiguration(), nullptr, lumiPrincipal.index());
0217 secondaryLumiPrincipal_->setAux(*secondaryAuxiliary);
0218 secondaryFileSequence_->readLuminosityBlock_(*secondaryLumiPrincipal_);
0219 checkHistoryConsistency(lumiPrincipal, *secondaryLumiPrincipal_);
0220 lumiPrincipal.recombine(*secondaryLumiPrincipal_, branchIDsToReplace_[InLumi]);
0221 } else {
0222 throw Exception(errors::MismatchedInputFiles, "PoolSource::readLuminosityBlock_")
0223 << " Run " << lumiPrincipal.run() << " LuminosityBlock " << lumiPrincipal.luminosityBlock()
0224 << " is not found in the secondary input files\n";
0225 }
0226 }
0227 }
0228
0229 void PoolSource::readEvent_(EventPrincipal& eventPrincipal) {
0230 bool readEventSucceeded = primaryFileSequence_->readEvent(eventPrincipal);
0231 assert(readEventSucceeded);
0232 if (secondaryFileSequence_ && !branchIDsToReplace_[InEvent].empty()) {
0233 bool found = secondaryFileSequence_->skipToItem(
0234 eventPrincipal.run(), eventPrincipal.luminosityBlock(), eventPrincipal.id().event());
0235 if (found) {
0236 EventPrincipal& secondaryEventPrincipal = *secondaryEventPrincipals_[eventPrincipal.streamID().value()];
0237 bool readEventSucceeded = secondaryFileSequence_->readEvent(secondaryEventPrincipal);
0238 checkConsistency(eventPrincipal, secondaryEventPrincipal);
0239 checkHistoryConsistency(eventPrincipal, secondaryEventPrincipal);
0240 assert(readEventSucceeded);
0241 eventPrincipal.recombine(secondaryEventPrincipal, branchIDsToReplace_[InEvent]);
0242 eventPrincipal.mergeProvenanceRetrievers(secondaryEventPrincipal);
0243 secondaryEventPrincipal.clearPrincipal();
0244 } else {
0245 throw Exception(errors::MismatchedInputFiles, "PoolSource::readEvent_")
0246 << eventPrincipal.id() << " is not found in the secondary input files\n";
0247 }
0248 }
0249 if (not delayReadingEventProducts_) {
0250 eventPrincipal.readAllFromSourceAndMergeImmediately();
0251 }
0252 }
0253
0254 bool PoolSource::readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext) {
0255 bool found = primaryFileSequence_->skipToItem(id.run(), id.luminosityBlock(), id.event());
0256 if (!found)
0257 return false;
0258 EventSourceSentry sentry(*this, streamContext);
0259 readEvent_(eventPrincipal);
0260 return true;
0261 }
0262
0263 InputSource::ItemType PoolSource::getNextItemType() {
0264 RunNumber_t run = IndexIntoFile::invalidRun;
0265 LuminosityBlockNumber_t lumi = IndexIntoFile::invalidLumi;
0266 EventNumber_t event = IndexIntoFile::invalidEvent;
0267 InputSource::ItemType itemType = primaryFileSequence_->getNextItemType(run, lumi, event);
0268 if (secondaryFileSequence_ && (IsSynchronize != state())) {
0269 if (itemType == IsRun || itemType == IsLumi || itemType == IsEvent) {
0270 if (!secondaryFileSequence_->containedInCurrentFile(run, lumi, event)) {
0271 return IsSynchronize;
0272 }
0273 }
0274 }
0275 return runHelper_->nextItemType(state(), itemType, run, lumi, event);
0276 }
0277
0278 std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> PoolSource::resourceSharedWithDelayedReader_() {
0279 return std::make_pair(resourceSharedWithDelayedReaderPtr_.get(), mutexSharedWithDelayedReader_.get());
0280 }
0281
0282
0283 void PoolSource::rewind_() { primaryFileSequence_->rewind_(); }
0284
0285
0286 void PoolSource::skip(int offset) { primaryFileSequence_->skipEvents(offset); }
0287
0288 bool PoolSource::goToEvent_(EventID const& eventID) { return primaryFileSequence_->goToEvent(eventID); }
0289
0290 void PoolSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
0291 ParameterSetDescription desc;
0292
0293 std::vector<std::string> defaultStrings;
0294 desc.setComment("Reads EDM/Root files.");
0295 desc.addUntracked<std::vector<std::string> >("fileNames")->setComment("Names of files to be processed.");
0296 desc.addUntracked<std::vector<std::string> >("secondaryFileNames", defaultStrings)
0297 ->setComment("Names of secondary files to be processed.");
0298 desc.addUntracked<bool>("needSecondaryFileNames", false)
0299 ->setComment("If True, 'secondaryFileNames' must be specified and be non-empty.");
0300 desc.addUntracked<std::string>("overrideCatalog", std::string());
0301 desc.addUntracked<bool>("skipBadFiles", false)
0302 ->setComment(
0303 "True: Ignore any missing or unopenable input file.\n"
0304 "False: Throw exception if missing or unopenable input file.");
0305 desc.addUntracked<bool>("bypassVersionCheck", false)
0306 ->setComment(
0307 "True: Bypass release version check.\n"
0308 "False: Throw exception if reading file in a release prior to the release in which the file was written.");
0309 desc.addUntracked<int>("treeMaxVirtualSize", -1)
0310 ->setComment("Size of ROOT TTree TBasket cache. Affects performance.");
0311 desc.addUntracked<bool>("dropDescendantsOfDroppedBranches", true)
0312 ->setComment("If True, also drop on input any descendent of any branch dropped on input.");
0313 desc.addUntracked<bool>("labelRawDataLikeMC", true)
0314 ->setComment("If True: replace module label for raw data to match MC. Also use 'LHC' as process.");
0315 desc.addUntracked<bool>("delayReadingEventProducts", true)
0316 ->setComment(
0317 "If True: do not read a data product from the file until it is requested. If False: all event data "
0318 "products are read upfront.");
0319 ProductSelectorRules::fillDescription(desc, "inputCommands");
0320 InputSource::fillDescription(desc);
0321 RootPrimaryFileSequence::fillDescription(desc);
0322 RunHelperBase::fillDescription(desc);
0323
0324 descriptions.add("source", desc);
0325 }
0326
0327 bool PoolSource::randomAccess_() const { return true; }
0328
0329 ProcessingController::ForwardState PoolSource::forwardState_() const { return primaryFileSequence_->forwardState(); }
0330
0331 ProcessingController::ReverseState PoolSource::reverseState_() const { return primaryFileSequence_->reverseState(); }
0332 }