Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-05-06 00:35:37

0001 /*----------------------------------------------------------------------
0002 ----------------------------------------------------------------------*/
0003 #include "PoolSource.h"
0004 #include "InputFile.h"
0005 #include "RootPrimaryFileSequence.h"
0006 #include "RootSecondaryFileSequence.h"
0007 #include "RunHelper.h"
0008 #include "DataFormats/Common/interface/ThinnedAssociation.h"
0009 #include "DataFormats/Provenance/interface/BranchDescription.h"
0010 #include "DataFormats/Provenance/interface/IndexIntoFile.h"
0011 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0012 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0013 #include "FWCore/Framework/interface/EventPrincipal.h"
0014 #include "FWCore/Framework/interface/FileBlock.h"
0015 #include "FWCore/Framework/interface/InputSourceDescription.h"
0016 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0017 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0018 #include "FWCore/Framework/interface/SharedResourcesRegistry.h"
0019 #include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
0020 #include "FWCore/Framework/interface/RunPrincipal.h"
0021 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0022 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0023 #include "FWCore/Utilities/interface/EDMException.h"
0024 #include "FWCore/Utilities/interface/Exception.h"
0025 #include "FWCore/Utilities/interface/InputType.h"
0026 
0027 #include <set>
0028 
0029 namespace edm {
0030 
0031   class BranchID;
0032   class LuminosityBlockID;
0033   class EventID;
0034   class ThinnedAssociationsHelper;
0035 
0036   namespace {
0037     void checkHistoryConsistency(Principal const& primary, Principal const& secondary) {
0038       ProcessHistory const& ph1 = primary.processHistory();
0039       ProcessHistory const& ph2 = secondary.processHistory();
0040       if (ph1 != ph2 && !isAncestor(ph2, ph1)) {
0041         throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0042             << "The secondary file is not an ancestor of the primary file\n";
0043       }
0044     }
0045     void checkConsistency(EventPrincipal const& primary, EventPrincipal const& secondary) {
0046       if (!isSameEvent(primary, secondary)) {
0047         throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0048             << primary.id() << " has inconsistent EventAuxiliary data in the primary and secondary file\n";
0049       }
0050     }
0051     void checkConsistency(LuminosityBlockAuxiliary const& primary, LuminosityBlockAuxiliary const& secondary) {
0052       if (primary.id() != secondary.id()) {
0053         throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0054             << primary.id() << " has inconsistent LuminosityBlockAuxiliary data in the primary and secondary file\n";
0055       }
0056     }
0057     void checkConsistency(RunAuxiliary const& primary, RunAuxiliary const& secondary) {
0058       if (primary.id() != secondary.id()) {
0059         throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0060             << primary.id() << " has inconsistent RunAuxiliary data in the primary and secondary file\n";
0061       }
0062     }
0063   }  // namespace
0064 
0065   PoolSource::PoolSource(ParameterSet const& pset, InputSourceDescription const& desc)
0066       : InputSource(pset, desc),
0067         rootServiceChecker_(),
0068         catalog_(pset.getUntrackedParameter<std::vector<std::string> >("fileNames"),
0069                  pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
0070         secondaryCatalog_(
0071             pset.getUntrackedParameter<std::vector<std::string> >("secondaryFileNames", std::vector<std::string>()),
0072             pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
0073         secondaryRunPrincipal_(),
0074         secondaryLumiPrincipal_(),
0075         secondaryEventPrincipals_(),
0076         branchIDsToReplace_(),
0077         nStreams_(desc.allocations_->numberOfStreams()),
0078         skipBadFiles_(pset.getUntrackedParameter<bool>("skipBadFiles")),
0079         bypassVersionCheck_(pset.getUntrackedParameter<bool>("bypassVersionCheck")),
0080         treeMaxVirtualSize_(pset.getUntrackedParameter<int>("treeMaxVirtualSize")),
0081         productSelectorRules_(pset, "inputCommands", "InputSource"),
0082         dropDescendants_(pset.getUntrackedParameter<bool>("dropDescendantsOfDroppedBranches")),
0083         labelRawDataLikeMC_(pset.getUntrackedParameter<bool>("labelRawDataLikeMC")),
0084         delayReadingEventProducts_(pset.getUntrackedParameter<bool>("delayReadingEventProducts")),
0085         runHelper_(makeRunHelper(pset)),
0086         resourceSharedWithDelayedReaderPtr_(),
0087         // Note: primaryFileSequence_ and secondaryFileSequence_ need to be initialized last, because they use data members
0088         // initialized previously in their own initialization.
0089         primaryFileSequence_(new RootPrimaryFileSequence(pset, *this, catalog_)),
0090         secondaryFileSequence_(
0091             secondaryCatalog_.empty() ? nullptr : new RootSecondaryFileSequence(pset, *this, secondaryCatalog_)) {
0092     auto resources = SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader();
0093     resourceSharedWithDelayedReaderPtr_ = std::make_unique<SharedResourcesAcquirer>(std::move(resources.first));
0094     mutexSharedWithDelayedReader_ = resources.second;
0095 
0096     if (secondaryCatalog_.empty() && pset.getUntrackedParameter<bool>("needSecondaryFileNames", false)) {
0097       throw Exception(errors::Configuration, "PoolSource") << "'secondaryFileNames' must be specified\n";
0098     }
0099     if (secondaryFileSequence_) {
0100       secondaryEventPrincipals_.reserve(nStreams_);
0101       for (unsigned int index = 0; index < nStreams_; ++index) {
0102         secondaryEventPrincipals_.emplace_back(new EventPrincipal(secondaryFileSequence_->fileProductRegistry(),
0103                                                                   secondaryFileSequence_->fileBranchIDListHelper(),
0104                                                                   std::make_shared<ThinnedAssociationsHelper const>(),
0105                                                                   processConfiguration(),
0106                                                                   nullptr,
0107                                                                   index));
0108       }
0109       std::array<std::set<BranchID>, NumBranchTypes> idsToReplace;
0110       ProductRegistry::ProductList const& secondary = secondaryFileSequence_->fileProductRegistry()->productList();
0111       ProductRegistry::ProductList const& primary = primaryFileSequence_->fileProductRegistry()->productList();
0112       std::set<BranchID> associationsFromSecondary;
0113       //this is the registry used by the 'outside' world and only has the primary file information in it at present
0114       ProductRegistry::ProductList& fullList = productRegistryUpdate().productListUpdator();
0115       for (auto const& item : secondary) {
0116         if (item.second.present()) {
0117           idsToReplace[item.second.branchType()].insert(item.second.branchID());
0118           if (item.second.branchType() == InEvent && item.second.unwrappedType() == typeid(ThinnedAssociation)) {
0119             associationsFromSecondary.insert(item.second.branchID());
0120           }
0121           //now make sure this is marked as not dropped else the product will not be 'get'table from the Event
0122           auto itFound = fullList.find(item.first);
0123           if (itFound != fullList.end()) {
0124             itFound->second.setDropped(false);
0125           }
0126         }
0127       }
0128       for (auto const& item : primary) {
0129         if (item.second.present()) {
0130           idsToReplace[item.second.branchType()].erase(item.second.branchID());
0131           associationsFromSecondary.erase(item.second.branchID());
0132         }
0133       }
0134       if (idsToReplace[InEvent].empty() && idsToReplace[InLumi].empty() && idsToReplace[InRun].empty()) {
0135         secondaryFileSequence_ = nullptr;  // propagate_const<T> has no reset() function
0136       } else {
0137         for (int i = InEvent; i < NumBranchTypes; ++i) {
0138           branchIDsToReplace_[i].reserve(idsToReplace[i].size());
0139           for (auto const& id : idsToReplace[i]) {
0140             branchIDsToReplace_[i].push_back(id);
0141           }
0142         }
0143         secondaryFileSequence_->initAssociationsFromSecondary(associationsFromSecondary);
0144       }
0145     }
0146   }
0147 
0148   PoolSource::~PoolSource() {}
0149 
0150   void PoolSource::endJob() {
0151     if (secondaryFileSequence_)
0152       secondaryFileSequence_->endJob();
0153     primaryFileSequence_->endJob();
0154     InputFile::reportReadBranches();
0155   }
0156 
0157   std::shared_ptr<FileBlock> PoolSource::readFile_() {
0158     std::shared_ptr<FileBlock> fb = primaryFileSequence_->readFile_();
0159     if (secondaryFileSequence_) {
0160       fb->setNotFastClonable(FileBlock::HasSecondaryFileSequence);
0161     }
0162     return fb;
0163   }
0164 
0165   void PoolSource::closeFile_() { primaryFileSequence_->closeFile(); }
0166 
0167   std::shared_ptr<RunAuxiliary> PoolSource::readRunAuxiliary_() { return primaryFileSequence_->readRunAuxiliary_(); }
0168 
0169   std::shared_ptr<LuminosityBlockAuxiliary> PoolSource::readLuminosityBlockAuxiliary_() {
0170     return primaryFileSequence_->readLuminosityBlockAuxiliary_();
0171   }
0172 
0173   void PoolSource::fillProcessBlockHelper_() { primaryFileSequence_->fillProcessBlockHelper_(); }
0174 
0175   bool PoolSource::nextProcessBlock_(ProcessBlockPrincipal& processBlockPrincipal) {
0176     return primaryFileSequence_->nextProcessBlock_(processBlockPrincipal);
0177   }
0178 
0179   void PoolSource::readProcessBlock_(ProcessBlockPrincipal& processBlockPrincipal) {
0180     primaryFileSequence_->readProcessBlock_(processBlockPrincipal);
0181   }
0182 
0183   void PoolSource::readRun_(RunPrincipal& runPrincipal) {
0184     bool shouldWeProcessRun = primaryFileSequence_->readRun_(runPrincipal);
0185     if (secondaryFileSequence_ && shouldWeProcessRun && !branchIDsToReplace_[InRun].empty()) {
0186       bool found = secondaryFileSequence_->skipToItem(runPrincipal.run(), 0U, 0U);
0187       if (found) {
0188         std::shared_ptr<RunAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readRunAuxiliary_();
0189         checkConsistency(runPrincipal.aux(), *secondaryAuxiliary);
0190         secondaryRunPrincipal_ = std::make_shared<RunPrincipal>(secondaryAuxiliary,
0191                                                                 secondaryFileSequence_->fileProductRegistry(),
0192                                                                 processConfiguration(),
0193                                                                 nullptr,
0194                                                                 runPrincipal.index());
0195         secondaryFileSequence_->readRun_(*secondaryRunPrincipal_);
0196         checkHistoryConsistency(runPrincipal, *secondaryRunPrincipal_);
0197         runPrincipal.recombine(*secondaryRunPrincipal_, branchIDsToReplace_[InRun]);
0198       } else {
0199         throw Exception(errors::MismatchedInputFiles, "PoolSource::readRun_")
0200             << " Run " << runPrincipal.run() << " is not found in the secondary input files\n";
0201       }
0202     }
0203   }
0204 
0205   void PoolSource::readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) {
0206     bool shouldWeProcessLumi = primaryFileSequence_->readLuminosityBlock_(lumiPrincipal);
0207     if (secondaryFileSequence_ && shouldWeProcessLumi && !branchIDsToReplace_[InLumi].empty()) {
0208       bool found = secondaryFileSequence_->skipToItem(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0U);
0209       if (found) {
0210         std::shared_ptr<LuminosityBlockAuxiliary> secondaryAuxiliary =
0211             secondaryFileSequence_->readLuminosityBlockAuxiliary_();
0212         checkConsistency(lumiPrincipal.aux(), *secondaryAuxiliary);
0213         secondaryLumiPrincipal_ = std::make_shared<LuminosityBlockPrincipal>(
0214             secondaryFileSequence_->fileProductRegistry(), processConfiguration(), nullptr, lumiPrincipal.index());
0215         secondaryLumiPrincipal_->setAux(*secondaryAuxiliary);
0216         secondaryFileSequence_->readLuminosityBlock_(*secondaryLumiPrincipal_);
0217         checkHistoryConsistency(lumiPrincipal, *secondaryLumiPrincipal_);
0218         lumiPrincipal.recombine(*secondaryLumiPrincipal_, branchIDsToReplace_[InLumi]);
0219       } else {
0220         throw Exception(errors::MismatchedInputFiles, "PoolSource::readLuminosityBlock_")
0221             << " Run " << lumiPrincipal.run() << " LuminosityBlock " << lumiPrincipal.luminosityBlock()
0222             << " is not found in the secondary input files\n";
0223       }
0224     }
0225   }
0226 
0227   void PoolSource::readEvent_(EventPrincipal& eventPrincipal) {
0228     bool readEventSucceeded = primaryFileSequence_->readEvent(eventPrincipal);
0229     assert(readEventSucceeded);
0230     if (secondaryFileSequence_ && !branchIDsToReplace_[InEvent].empty()) {
0231       bool found = secondaryFileSequence_->skipToItem(
0232           eventPrincipal.run(), eventPrincipal.luminosityBlock(), eventPrincipal.id().event());
0233       if (found) {
0234         EventPrincipal& secondaryEventPrincipal = *secondaryEventPrincipals_[eventPrincipal.streamID().value()];
0235         bool readEventSucceeded = secondaryFileSequence_->readEvent(secondaryEventPrincipal);
0236         checkConsistency(eventPrincipal, secondaryEventPrincipal);
0237         checkHistoryConsistency(eventPrincipal, secondaryEventPrincipal);
0238         assert(readEventSucceeded);
0239         eventPrincipal.recombine(secondaryEventPrincipal, branchIDsToReplace_[InEvent]);
0240         eventPrincipal.mergeProvenanceRetrievers(secondaryEventPrincipal);
0241         secondaryEventPrincipal.clearPrincipal();
0242       } else {
0243         throw Exception(errors::MismatchedInputFiles, "PoolSource::readEvent_")
0244             << eventPrincipal.id() << " is not found in the secondary input files\n";
0245       }
0246     }
0247     if (not delayReadingEventProducts_) {
0248       eventPrincipal.readAllFromSourceAndMergeImmediately();
0249     }
0250   }
0251 
0252   bool PoolSource::readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext) {
0253     bool found = primaryFileSequence_->skipToItem(id.run(), id.luminosityBlock(), id.event());
0254     if (!found)
0255       return false;
0256     EventSourceSentry sentry(*this, streamContext);
0257     readEvent_(eventPrincipal);
0258     return true;
0259   }
0260 
0261   InputSource::ItemType PoolSource::getNextItemType() {
0262     RunNumber_t run = IndexIntoFile::invalidRun;
0263     LuminosityBlockNumber_t lumi = IndexIntoFile::invalidLumi;
0264     EventNumber_t event = IndexIntoFile::invalidEvent;
0265     InputSource::ItemType itemType = primaryFileSequence_->getNextItemType(run, lumi, event);
0266     if (secondaryFileSequence_ && (IsSynchronize != state())) {
0267       if (itemType == IsRun || itemType == IsLumi || itemType == IsEvent) {
0268         if (!secondaryFileSequence_->containedInCurrentFile(run, lumi, event)) {
0269           return IsSynchronize;
0270         }
0271       }
0272     }
0273     return runHelper_->nextItemType(state(), itemType, run, lumi, event);
0274   }
0275 
0276   std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> PoolSource::resourceSharedWithDelayedReader_() {
0277     return std::make_pair(resourceSharedWithDelayedReaderPtr_.get(), mutexSharedWithDelayedReader_.get());
0278   }
0279 
0280   // Rewind to before the first event that was read.
0281   void PoolSource::rewind_() { primaryFileSequence_->rewind_(); }
0282 
0283   // Advance "offset" events.  Offset can be positive or negative (or zero).
0284   void PoolSource::skip(int offset) { primaryFileSequence_->skipEvents(offset); }
0285 
0286   bool PoolSource::goToEvent_(EventID const& eventID) { return primaryFileSequence_->goToEvent(eventID); }
0287 
0288   void PoolSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
0289     ParameterSetDescription desc;
0290 
0291     std::vector<std::string> defaultStrings;
0292     desc.setComment("Reads EDM/Root files.");
0293     desc.addUntracked<std::vector<std::string> >("fileNames")->setComment("Names of files to be processed.");
0294     desc.addUntracked<std::vector<std::string> >("secondaryFileNames", defaultStrings)
0295         ->setComment("Names of secondary files to be processed.");
0296     desc.addUntracked<bool>("needSecondaryFileNames", false)
0297         ->setComment("If True, 'secondaryFileNames' must be specified and be non-empty.");
0298     desc.addUntracked<std::string>("overrideCatalog", std::string());
0299     desc.addUntracked<bool>("skipBadFiles", false)
0300         ->setComment(
0301             "True:  Ignore any missing or unopenable input file.\n"
0302             "False: Throw exception if missing or unopenable input file.");
0303     desc.addUntracked<bool>("bypassVersionCheck", false)
0304         ->setComment(
0305             "True:  Bypass release version check.\n"
0306             "False: Throw exception if reading file in a release prior to the release in which the file was written.");
0307     desc.addUntracked<int>("treeMaxVirtualSize", -1)
0308         ->setComment("Size of ROOT TTree TBasket cache. Affects performance.");
0309     desc.addUntracked<bool>("dropDescendantsOfDroppedBranches", true)
0310         ->setComment("If True, also drop on input any descendent of any branch dropped on input.");
0311     desc.addUntracked<bool>("labelRawDataLikeMC", true)
0312         ->setComment("If True: replace module label for raw data to match MC. Also use 'LHC' as process.");
0313     desc.addUntracked<bool>("delayReadingEventProducts", true)
0314         ->setComment(
0315             "If True: do not read a data product from the file until it is requested. If False: all event data "
0316             "products are read upfront.");
0317     ProductSelectorRules::fillDescription(desc, "inputCommands");
0318     InputSource::fillDescription(desc);
0319     RootPrimaryFileSequence::fillDescription(desc);
0320     RunHelperBase::fillDescription(desc);
0321 
0322     descriptions.add("source", desc);
0323   }
0324 
0325   bool PoolSource::randomAccess_() const { return true; }
0326 
0327   ProcessingController::ForwardState PoolSource::forwardState_() const { return primaryFileSequence_->forwardState(); }
0328 
0329   ProcessingController::ReverseState PoolSource::reverseState_() const { return primaryFileSequence_->reverseState(); }
0330 }  // namespace edm