File indexing completed on 2025-01-31 02:19:42
0001
0002
0003 #include "PoolSource.h"
0004 #include "InputFile.h"
0005 #include "RootPrimaryFileSequence.h"
0006 #include "RootSecondaryFileSequence.h"
0007 #include "RunHelper.h"
0008 #include "DataFormats/Common/interface/ThinnedAssociation.h"
0009 #include "DataFormats/Provenance/interface/ProductDescription.h"
0010 #include "DataFormats/Provenance/interface/IndexIntoFile.h"
0011 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0012 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0013 #include "FWCore/Framework/interface/EventPrincipal.h"
0014 #include "FWCore/Framework/interface/FileBlock.h"
0015 #include "FWCore/Framework/interface/InputSourceDescription.h"
0016 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0017 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0018 #include "FWCore/Framework/interface/SharedResourcesRegistry.h"
0019 #include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
0020 #include "FWCore/Framework/interface/RunPrincipal.h"
0021 #include "FWCore/Framework/interface/ProductResolversFactory.h"
0022 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0023 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0024 #include "FWCore/Utilities/interface/EDMException.h"
0025 #include "FWCore/Utilities/interface/Exception.h"
0026 #include "FWCore/Utilities/interface/InputType.h"
0027
0028 #include <set>
0029
0030 namespace edm {
0031
0032 class BranchID;
0033 class LuminosityBlockID;
0034 class EventID;
0035 class ThinnedAssociationsHelper;
0036
0037 namespace {
0038 void checkHistoryConsistency(Principal const& primary, Principal const& secondary) {
0039 ProcessHistory const& ph1 = primary.processHistory();
0040 ProcessHistory const& ph2 = secondary.processHistory();
0041 if (ph1 != ph2 && !isAncestor(ph2, ph1)) {
0042 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0043 << "The secondary file is not an ancestor of the primary file\n";
0044 }
0045 }
0046 void checkConsistency(EventPrincipal const& primary, EventPrincipal const& secondary) {
0047 if (!isSameEvent(primary, secondary)) {
0048 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0049 << primary.id() << " has inconsistent EventAuxiliary data in the primary and secondary file\n";
0050 }
0051 }
0052 void checkConsistency(LuminosityBlockAuxiliary const& primary, LuminosityBlockAuxiliary const& secondary) {
0053 if (primary.id() != secondary.id()) {
0054 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0055 << primary.id() << " has inconsistent LuminosityBlockAuxiliary data in the primary and secondary file\n";
0056 }
0057 }
0058 void checkConsistency(RunAuxiliary const& primary, RunAuxiliary const& secondary) {
0059 if (primary.id() != secondary.id()) {
0060 throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0061 << primary.id() << " has inconsistent RunAuxiliary data in the primary and secondary file\n";
0062 }
0063 }
0064 }
0065
0066 PoolSource::PoolSource(ParameterSet const& pset, InputSourceDescription const& desc)
0067 : InputSource(pset, desc),
0068 rootServiceChecker_(),
0069 catalog_(pset.getUntrackedParameter<std::vector<std::string> >("fileNames"),
0070 pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
0071 secondaryCatalog_(
0072 pset.getUntrackedParameter<std::vector<std::string> >("secondaryFileNames", std::vector<std::string>()),
0073 pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
0074 secondaryRunPrincipal_(),
0075 secondaryLumiPrincipal_(),
0076 secondaryEventPrincipals_(),
0077 branchIDsToReplace_(),
0078 nStreams_(desc.allocations_->numberOfStreams()),
0079 skipBadFiles_(pset.getUntrackedParameter<bool>("skipBadFiles")),
0080 bypassVersionCheck_(pset.getUntrackedParameter<bool>("bypassVersionCheck")),
0081 treeMaxVirtualSize_(pset.getUntrackedParameter<int>("treeMaxVirtualSize")),
0082 productSelectorRules_(pset, "inputCommands", "InputSource"),
0083 dropDescendants_(pset.getUntrackedParameter<bool>("dropDescendantsOfDroppedBranches")),
0084 labelRawDataLikeMC_(pset.getUntrackedParameter<bool>("labelRawDataLikeMC")),
0085 delayReadingEventProducts_(pset.getUntrackedParameter<bool>("delayReadingEventProducts")),
0086 runHelper_(makeRunHelper(pset)),
0087 resourceSharedWithDelayedReaderPtr_(),
0088
0089
0090 primaryFileSequence_(new RootPrimaryFileSequence(pset, *this, catalog_)),
0091 secondaryFileSequence_(
0092 secondaryCatalog_.empty() ? nullptr : new RootSecondaryFileSequence(pset, *this, secondaryCatalog_)) {
0093 auto resources = SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader();
0094 resourceSharedWithDelayedReaderPtr_ = std::make_unique<SharedResourcesAcquirer>(std::move(resources.first));
0095 mutexSharedWithDelayedReader_ = resources.second;
0096
0097 if (secondaryCatalog_.empty() && pset.getUntrackedParameter<bool>("needSecondaryFileNames", false)) {
0098 throw Exception(errors::Configuration, "PoolSource") << "'secondaryFileNames' must be specified\n";
0099 }
0100 if (secondaryFileSequence_) {
0101 secondaryEventPrincipals_.reserve(nStreams_);
0102 for (unsigned int index = 0; index < nStreams_; ++index) {
0103 secondaryEventPrincipals_.emplace_back(new EventPrincipal(secondaryFileSequence_->fileProductRegistry(),
0104 edm::productResolversFactory::makePrimary,
0105 secondaryFileSequence_->fileBranchIDListHelper(),
0106 std::make_shared<ThinnedAssociationsHelper const>(),
0107 processConfiguration(),
0108 nullptr,
0109 index));
0110 }
0111 std::array<std::set<BranchID>, NumBranchTypes> idsToReplace;
0112 ProductRegistry::ProductList const& secondary = secondaryFileSequence_->fileProductRegistry()->productList();
0113 ProductRegistry::ProductList const& primary = primaryFileSequence_->fileProductRegistry()->productList();
0114 std::set<BranchID> associationsFromSecondary;
0115
0116 ProductRegistry::ProductList& fullList = productRegistryUpdate().productListUpdator();
0117 for (auto const& item : secondary) {
0118 if (item.second.present()) {
0119 idsToReplace[item.second.branchType()].insert(item.second.branchID());
0120 if (item.second.branchType() == InEvent && item.second.unwrappedType() == typeid(ThinnedAssociation)) {
0121 associationsFromSecondary.insert(item.second.branchID());
0122 }
0123
0124 auto itFound = fullList.find(item.first);
0125 if (itFound != fullList.end()) {
0126
0127 if (itFound->second.dropped()) {
0128 itFound->second.initFromDictionary();
0129 }
0130 itFound->second.setDropped(false);
0131 }
0132 }
0133 }
0134 for (auto const& item : primary) {
0135 if (item.second.present()) {
0136 idsToReplace[item.second.branchType()].erase(item.second.branchID());
0137 associationsFromSecondary.erase(item.second.branchID());
0138 }
0139 }
0140 if (idsToReplace[InEvent].empty() && idsToReplace[InLumi].empty() && idsToReplace[InRun].empty()) {
0141 secondaryFileSequence_ = nullptr;
0142 } else {
0143 for (int i = InEvent; i < NumBranchTypes; ++i) {
0144 branchIDsToReplace_[i].reserve(idsToReplace[i].size());
0145 for (auto const& id : idsToReplace[i]) {
0146 branchIDsToReplace_[i].push_back(id);
0147 }
0148 }
0149 secondaryFileSequence_->initAssociationsFromSecondary(associationsFromSecondary);
0150 }
0151 }
0152 }
0153
0154 PoolSource::~PoolSource() {}
0155
0156 void PoolSource::endJob() {
0157 if (secondaryFileSequence_)
0158 secondaryFileSequence_->endJob();
0159 primaryFileSequence_->endJob();
0160 InputFile::reportReadBranches();
0161 }
0162
0163 std::shared_ptr<FileBlock> PoolSource::readFile_() {
0164 std::shared_ptr<FileBlock> fb = primaryFileSequence_->readFile_();
0165 if (secondaryFileSequence_) {
0166 fb->setNotFastClonable(FileBlock::HasSecondaryFileSequence);
0167 }
0168 return fb;
0169 }
0170
0171 void PoolSource::closeFile_() { primaryFileSequence_->closeFile(); }
0172
0173 std::shared_ptr<RunAuxiliary> PoolSource::readRunAuxiliary_() { return primaryFileSequence_->readRunAuxiliary_(); }
0174
0175 std::shared_ptr<LuminosityBlockAuxiliary> PoolSource::readLuminosityBlockAuxiliary_() {
0176 return primaryFileSequence_->readLuminosityBlockAuxiliary_();
0177 }
0178
0179 void PoolSource::fillProcessBlockHelper_() { primaryFileSequence_->fillProcessBlockHelper_(); }
0180
0181 bool PoolSource::nextProcessBlock_(ProcessBlockPrincipal& processBlockPrincipal) {
0182 return primaryFileSequence_->nextProcessBlock_(processBlockPrincipal);
0183 }
0184
0185 void PoolSource::readProcessBlock_(ProcessBlockPrincipal& processBlockPrincipal) {
0186 primaryFileSequence_->readProcessBlock_(processBlockPrincipal);
0187 }
0188
0189 void PoolSource::readRun_(RunPrincipal& runPrincipal) {
0190 bool shouldWeProcessRun = primaryFileSequence_->readRun_(runPrincipal);
0191 if (secondaryFileSequence_ && shouldWeProcessRun && !branchIDsToReplace_[InRun].empty()) {
0192 bool found = secondaryFileSequence_->skipToItem(runPrincipal.run(), 0U, 0U);
0193 if (found) {
0194 std::shared_ptr<RunAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readRunAuxiliary_();
0195 checkConsistency(runPrincipal.aux(), *secondaryAuxiliary);
0196 secondaryRunPrincipal_ = std::make_shared<RunPrincipal>(secondaryFileSequence_->fileProductRegistry(),
0197 edm::productResolversFactory::makePrimary,
0198 processConfiguration(),
0199 nullptr,
0200 runPrincipal.index());
0201 secondaryRunPrincipal_->setAux(*secondaryAuxiliary);
0202 secondaryFileSequence_->readRun_(*secondaryRunPrincipal_);
0203 checkHistoryConsistency(runPrincipal, *secondaryRunPrincipal_);
0204 runPrincipal.recombine(*secondaryRunPrincipal_, branchIDsToReplace_[InRun]);
0205 } else {
0206 throw Exception(errors::MismatchedInputFiles, "PoolSource::readRun_")
0207 << " Run " << runPrincipal.run() << " is not found in the secondary input files\n";
0208 }
0209 }
0210 }
0211
0212 void PoolSource::readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) {
0213 bool shouldWeProcessLumi = primaryFileSequence_->readLuminosityBlock_(lumiPrincipal);
0214 if (secondaryFileSequence_ && shouldWeProcessLumi && !branchIDsToReplace_[InLumi].empty()) {
0215 bool found = secondaryFileSequence_->skipToItem(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0U);
0216 if (found) {
0217 std::shared_ptr<LuminosityBlockAuxiliary> secondaryAuxiliary =
0218 secondaryFileSequence_->readLuminosityBlockAuxiliary_();
0219 checkConsistency(lumiPrincipal.aux(), *secondaryAuxiliary);
0220 secondaryLumiPrincipal_ =
0221 std::make_shared<LuminosityBlockPrincipal>(secondaryFileSequence_->fileProductRegistry(),
0222 edm::productResolversFactory::makePrimary,
0223 processConfiguration(),
0224 nullptr,
0225 lumiPrincipal.index());
0226 secondaryLumiPrincipal_->setAux(*secondaryAuxiliary);
0227 secondaryFileSequence_->readLuminosityBlock_(*secondaryLumiPrincipal_);
0228 checkHistoryConsistency(lumiPrincipal, *secondaryLumiPrincipal_);
0229 lumiPrincipal.recombine(*secondaryLumiPrincipal_, branchIDsToReplace_[InLumi]);
0230 } else {
0231 throw Exception(errors::MismatchedInputFiles, "PoolSource::readLuminosityBlock_")
0232 << " Run " << lumiPrincipal.run() << " LuminosityBlock " << lumiPrincipal.luminosityBlock()
0233 << " is not found in the secondary input files\n";
0234 }
0235 }
0236 }
0237
0238 void PoolSource::readEvent_(EventPrincipal& eventPrincipal) {
0239 bool readEventSucceeded = primaryFileSequence_->readEvent(eventPrincipal);
0240 assert(readEventSucceeded);
0241 if (secondaryFileSequence_ && !branchIDsToReplace_[InEvent].empty()) {
0242 bool found = secondaryFileSequence_->skipToItem(
0243 eventPrincipal.run(), eventPrincipal.luminosityBlock(), eventPrincipal.id().event());
0244 if (found) {
0245 EventPrincipal& secondaryEventPrincipal = *secondaryEventPrincipals_[eventPrincipal.streamID().value()];
0246 bool readEventSucceeded = secondaryFileSequence_->readEvent(secondaryEventPrincipal);
0247 checkConsistency(eventPrincipal, secondaryEventPrincipal);
0248 checkHistoryConsistency(eventPrincipal, secondaryEventPrincipal);
0249 assert(readEventSucceeded);
0250 eventPrincipal.recombine(secondaryEventPrincipal, branchIDsToReplace_[InEvent]);
0251 eventPrincipal.mergeProvenanceRetrievers(secondaryEventPrincipal);
0252 secondaryEventPrincipal.clearPrincipal();
0253 } else {
0254 throw Exception(errors::MismatchedInputFiles, "PoolSource::readEvent_")
0255 << eventPrincipal.id() << " is not found in the secondary input files\n";
0256 }
0257 }
0258 if (not delayReadingEventProducts_) {
0259 eventPrincipal.readAllFromSourceAndMergeImmediately();
0260 }
0261 }
0262
0263 bool PoolSource::readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext) {
0264 bool found = primaryFileSequence_->skipToItem(id.run(), id.luminosityBlock(), id.event());
0265 if (!found)
0266 return false;
0267 EventSourceSentry sentry(*this, streamContext);
0268 readEvent_(eventPrincipal);
0269 return true;
0270 }
0271
0272 InputSource::ItemTypeInfo PoolSource::getNextItemType() {
0273 RunNumber_t run = IndexIntoFile::invalidRun;
0274 LuminosityBlockNumber_t lumi = IndexIntoFile::invalidLumi;
0275 EventNumber_t event = IndexIntoFile::invalidEvent;
0276 InputSource::ItemType itemType = primaryFileSequence_->getNextItemType(run, lumi, event);
0277 if (secondaryFileSequence_ && (ItemType::IsSynchronize != state())) {
0278 if (itemType == ItemType::IsRun || itemType == ItemType::IsLumi || itemType == ItemType::IsEvent) {
0279 if (!secondaryFileSequence_->containedInCurrentFile(run, lumi, event)) {
0280 return ItemType::IsSynchronize;
0281 }
0282 }
0283 }
0284 return runHelper_->nextItemType(state(), itemType, run, lumi, event);
0285 }
0286
0287 std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> PoolSource::resourceSharedWithDelayedReader_() {
0288 return std::make_pair(resourceSharedWithDelayedReaderPtr_.get(), mutexSharedWithDelayedReader_.get());
0289 }
0290
0291
0292 void PoolSource::rewind_() { primaryFileSequence_->rewind_(); }
0293
0294
0295 void PoolSource::skip(int offset) { primaryFileSequence_->skipEvents(offset); }
0296
0297 bool PoolSource::goToEvent_(EventID const& eventID) { return primaryFileSequence_->goToEvent(eventID); }
0298
0299 void PoolSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
0300 ParameterSetDescription desc;
0301
0302 std::vector<std::string> defaultStrings;
0303 desc.setComment("Reads EDM/Root files.");
0304 desc.addUntracked<std::vector<std::string> >("fileNames")->setComment("Names of files to be processed.");
0305 desc.addUntracked<std::vector<std::string> >("secondaryFileNames", defaultStrings)
0306 ->setComment("Names of secondary files to be processed.");
0307 desc.addUntracked<bool>("needSecondaryFileNames", false)
0308 ->setComment("If True, 'secondaryFileNames' must be specified and be non-empty.");
0309 desc.addUntracked<std::string>("overrideCatalog", std::string());
0310 desc.addUntracked<bool>("skipBadFiles", false)
0311 ->setComment(
0312 "True: Ignore any missing or unopenable input file.\n"
0313 "False: Throw exception if missing or unopenable input file.");
0314 desc.addUntracked<bool>("bypassVersionCheck", false)
0315 ->setComment(
0316 "True: Bypass release version check.\n"
0317 "False: Throw exception if reading file in a release prior to the release in which the file was written.");
0318 desc.addUntracked<int>("treeMaxVirtualSize", -1)
0319 ->setComment("Size of ROOT TTree TBasket cache. Affects performance.");
0320 desc.addUntracked<bool>("dropDescendantsOfDroppedBranches", true)
0321 ->setComment("If True, also drop on input any descendent of any branch dropped on input.");
0322 desc.addUntracked<bool>("labelRawDataLikeMC", true)
0323 ->setComment("If True: replace module label for raw data to match MC. Also use 'LHC' as process.");
0324 desc.addUntracked<bool>("delayReadingEventProducts", true)
0325 ->setComment(
0326 "If True: do not read a data product from the file until it is requested. If False: all event data "
0327 "products are read upfront.");
0328 ProductSelectorRules::fillDescription(desc, "inputCommands");
0329 InputSource::fillDescription(desc);
0330 RootPrimaryFileSequence::fillDescription(desc);
0331 RunHelperBase::fillDescription(desc);
0332
0333 descriptions.add("source", desc);
0334 }
0335
0336 bool PoolSource::randomAccess_() const { return true; }
0337
0338 ProcessingController::ForwardState PoolSource::forwardState_() const { return primaryFileSequence_->forwardState(); }
0339
0340 ProcessingController::ReverseState PoolSource::reverseState_() const { return primaryFileSequence_->reverseState(); }
0341 }