Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:19:06

0001 /*----------------------------------------------------------------------
0002 ----------------------------------------------------------------------*/
0003 #include "PoolSource.h"
0004 #include "InputFile.h"
0005 #include "RootPrimaryFileSequence.h"
0006 #include "RootSecondaryFileSequence.h"
0007 #include "RunHelper.h"
0008 #include "DataFormats/Common/interface/ThinnedAssociation.h"
0009 #include "DataFormats/Provenance/interface/BranchDescription.h"
0010 #include "DataFormats/Provenance/interface/IndexIntoFile.h"
0011 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0012 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0013 #include "FWCore/Framework/interface/EventPrincipal.h"
0014 #include "FWCore/Framework/interface/FileBlock.h"
0015 #include "FWCore/Framework/interface/InputSourceDescription.h"
0016 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0017 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0018 #include "FWCore/Framework/interface/SharedResourcesRegistry.h"
0019 #include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
0020 #include "FWCore/Framework/interface/RunPrincipal.h"
0021 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0022 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0023 #include "FWCore/Utilities/interface/EDMException.h"
0024 #include "FWCore/Utilities/interface/Exception.h"
0025 #include "FWCore/Utilities/interface/InputType.h"
0026 
0027 #include <set>
0028 
0029 namespace edm {
0030 
0031   class BranchID;
0032   class LuminosityBlockID;
0033   class EventID;
0034   class ThinnedAssociationsHelper;
0035 
0036   namespace {
0037     void checkHistoryConsistency(Principal const& primary, Principal const& secondary) {
0038       ProcessHistory const& ph1 = primary.processHistory();
0039       ProcessHistory const& ph2 = secondary.processHistory();
0040       if (ph1 != ph2 && !isAncestor(ph2, ph1)) {
0041         throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0042             << "The secondary file is not an ancestor of the primary file\n";
0043       }
0044     }
0045     void checkConsistency(EventPrincipal const& primary, EventPrincipal const& secondary) {
0046       if (!isSameEvent(primary, secondary)) {
0047         throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0048             << primary.id() << " has inconsistent EventAuxiliary data in the primary and secondary file\n";
0049       }
0050     }
0051     void checkConsistency(LuminosityBlockAuxiliary const& primary, LuminosityBlockAuxiliary const& secondary) {
0052       if (primary.id() != secondary.id()) {
0053         throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0054             << primary.id() << " has inconsistent LuminosityBlockAuxiliary data in the primary and secondary file\n";
0055       }
0056     }
0057     void checkConsistency(RunAuxiliary const& primary, RunAuxiliary const& secondary) {
0058       if (primary.id() != secondary.id()) {
0059         throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0060             << primary.id() << " has inconsistent RunAuxiliary data in the primary and secondary file\n";
0061       }
0062     }
0063   }  // namespace
0064 
0065   PoolSource::PoolSource(ParameterSet const& pset, InputSourceDescription const& desc)
0066       : InputSource(pset, desc),
0067         rootServiceChecker_(),
0068         catalog_(pset.getUntrackedParameter<std::vector<std::string> >("fileNames"),
0069                  pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
0070         secondaryCatalog_(
0071             pset.getUntrackedParameter<std::vector<std::string> >("secondaryFileNames", std::vector<std::string>()),
0072             pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
0073         secondaryRunPrincipal_(),
0074         secondaryLumiPrincipal_(),
0075         secondaryEventPrincipals_(),
0076         branchIDsToReplace_(),
0077         nStreams_(desc.allocations_->numberOfStreams()),
0078         skipBadFiles_(pset.getUntrackedParameter<bool>("skipBadFiles")),
0079         bypassVersionCheck_(pset.getUntrackedParameter<bool>("bypassVersionCheck")),
0080         treeMaxVirtualSize_(pset.getUntrackedParameter<int>("treeMaxVirtualSize")),
0081         productSelectorRules_(pset, "inputCommands", "InputSource"),
0082         dropDescendants_(pset.getUntrackedParameter<bool>("dropDescendantsOfDroppedBranches")),
0083         labelRawDataLikeMC_(pset.getUntrackedParameter<bool>("labelRawDataLikeMC")),
0084         delayReadingEventProducts_(pset.getUntrackedParameter<bool>("delayReadingEventProducts")),
0085         runHelper_(makeRunHelper(pset)),
0086         resourceSharedWithDelayedReaderPtr_(),
0087         // Note: primaryFileSequence_ and secondaryFileSequence_ need to be initialized last, because they use data members
0088         // initialized previously in their own initialization.
0089         primaryFileSequence_(new RootPrimaryFileSequence(pset, *this, catalog_)),
0090         secondaryFileSequence_(
0091             secondaryCatalog_.empty() ? nullptr : new RootSecondaryFileSequence(pset, *this, secondaryCatalog_)) {
0092     auto resources = SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader();
0093     resourceSharedWithDelayedReaderPtr_ = std::make_unique<SharedResourcesAcquirer>(std::move(resources.first));
0094     mutexSharedWithDelayedReader_ = resources.second;
0095 
0096     if (secondaryCatalog_.empty() && pset.getUntrackedParameter<bool>("needSecondaryFileNames", false)) {
0097       throw Exception(errors::Configuration, "PoolSource") << "'secondaryFileNames' must be specified\n";
0098     }
0099     if (secondaryFileSequence_) {
0100       secondaryEventPrincipals_.reserve(nStreams_);
0101       for (unsigned int index = 0; index < nStreams_; ++index) {
0102         secondaryEventPrincipals_.emplace_back(new EventPrincipal(secondaryFileSequence_->fileProductRegistry(),
0103                                                                   secondaryFileSequence_->fileBranchIDListHelper(),
0104                                                                   std::make_shared<ThinnedAssociationsHelper const>(),
0105                                                                   processConfiguration(),
0106                                                                   nullptr,
0107                                                                   index));
0108       }
0109       std::array<std::set<BranchID>, NumBranchTypes> idsToReplace;
0110       ProductRegistry::ProductList const& secondary = secondaryFileSequence_->fileProductRegistry()->productList();
0111       ProductRegistry::ProductList const& primary = primaryFileSequence_->fileProductRegistry()->productList();
0112       std::set<BranchID> associationsFromSecondary;
0113       //this is the registry used by the 'outside' world and only has the primary file information in it at present
0114       ProductRegistry::ProductList& fullList = productRegistryUpdate().productListUpdator();
0115       for (auto const& item : secondary) {
0116         if (item.second.present()) {
0117           idsToReplace[item.second.branchType()].insert(item.second.branchID());
0118           if (item.second.branchType() == InEvent && item.second.unwrappedType() == typeid(ThinnedAssociation)) {
0119             associationsFromSecondary.insert(item.second.branchID());
0120           }
0121           //now make sure this is marked as not dropped else the product will not be 'get'table from the Event
0122           auto itFound = fullList.find(item.first);
0123           if (itFound != fullList.end()) {
0124             // If the branch in primary file was dropped, need to initilize the dictionary information
0125             if (itFound->second.dropped()) {
0126               itFound->second.initFromDictionary();
0127             }
0128             itFound->second.setDropped(false);
0129           }
0130         }
0131       }
0132       for (auto const& item : primary) {
0133         if (item.second.present()) {
0134           idsToReplace[item.second.branchType()].erase(item.second.branchID());
0135           associationsFromSecondary.erase(item.second.branchID());
0136         }
0137       }
0138       if (idsToReplace[InEvent].empty() && idsToReplace[InLumi].empty() && idsToReplace[InRun].empty()) {
0139         secondaryFileSequence_ = nullptr;  // propagate_const<T> has no reset() function
0140       } else {
0141         for (int i = InEvent; i < NumBranchTypes; ++i) {
0142           branchIDsToReplace_[i].reserve(idsToReplace[i].size());
0143           for (auto const& id : idsToReplace[i]) {
0144             branchIDsToReplace_[i].push_back(id);
0145           }
0146         }
0147         secondaryFileSequence_->initAssociationsFromSecondary(associationsFromSecondary);
0148       }
0149     }
0150   }
0151 
0152   PoolSource::~PoolSource() {}
0153 
0154   void PoolSource::endJob() {
0155     if (secondaryFileSequence_)
0156       secondaryFileSequence_->endJob();
0157     primaryFileSequence_->endJob();
0158     InputFile::reportReadBranches();
0159   }
0160 
0161   std::shared_ptr<FileBlock> PoolSource::readFile_() {
0162     std::shared_ptr<FileBlock> fb = primaryFileSequence_->readFile_();
0163     if (secondaryFileSequence_) {
0164       fb->setNotFastClonable(FileBlock::HasSecondaryFileSequence);
0165     }
0166     return fb;
0167   }
0168 
0169   void PoolSource::closeFile_() { primaryFileSequence_->closeFile(); }
0170 
0171   std::shared_ptr<RunAuxiliary> PoolSource::readRunAuxiliary_() { return primaryFileSequence_->readRunAuxiliary_(); }
0172 
0173   std::shared_ptr<LuminosityBlockAuxiliary> PoolSource::readLuminosityBlockAuxiliary_() {
0174     return primaryFileSequence_->readLuminosityBlockAuxiliary_();
0175   }
0176 
0177   void PoolSource::fillProcessBlockHelper_() { primaryFileSequence_->fillProcessBlockHelper_(); }
0178 
0179   bool PoolSource::nextProcessBlock_(ProcessBlockPrincipal& processBlockPrincipal) {
0180     return primaryFileSequence_->nextProcessBlock_(processBlockPrincipal);
0181   }
0182 
0183   void PoolSource::readProcessBlock_(ProcessBlockPrincipal& processBlockPrincipal) {
0184     primaryFileSequence_->readProcessBlock_(processBlockPrincipal);
0185   }
0186 
0187   void PoolSource::readRun_(RunPrincipal& runPrincipal) {
0188     bool shouldWeProcessRun = primaryFileSequence_->readRun_(runPrincipal);
0189     if (secondaryFileSequence_ && shouldWeProcessRun && !branchIDsToReplace_[InRun].empty()) {
0190       bool found = secondaryFileSequence_->skipToItem(runPrincipal.run(), 0U, 0U);
0191       if (found) {
0192         std::shared_ptr<RunAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readRunAuxiliary_();
0193         checkConsistency(runPrincipal.aux(), *secondaryAuxiliary);
0194         secondaryRunPrincipal_ = std::make_shared<RunPrincipal>(
0195             secondaryFileSequence_->fileProductRegistry(), processConfiguration(), nullptr, runPrincipal.index());
0196         secondaryRunPrincipal_->setAux(*secondaryAuxiliary);
0197         secondaryFileSequence_->readRun_(*secondaryRunPrincipal_);
0198         checkHistoryConsistency(runPrincipal, *secondaryRunPrincipal_);
0199         runPrincipal.recombine(*secondaryRunPrincipal_, branchIDsToReplace_[InRun]);
0200       } else {
0201         throw Exception(errors::MismatchedInputFiles, "PoolSource::readRun_")
0202             << " Run " << runPrincipal.run() << " is not found in the secondary input files\n";
0203       }
0204     }
0205   }
0206 
0207   void PoolSource::readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) {
0208     bool shouldWeProcessLumi = primaryFileSequence_->readLuminosityBlock_(lumiPrincipal);
0209     if (secondaryFileSequence_ && shouldWeProcessLumi && !branchIDsToReplace_[InLumi].empty()) {
0210       bool found = secondaryFileSequence_->skipToItem(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0U);
0211       if (found) {
0212         std::shared_ptr<LuminosityBlockAuxiliary> secondaryAuxiliary =
0213             secondaryFileSequence_->readLuminosityBlockAuxiliary_();
0214         checkConsistency(lumiPrincipal.aux(), *secondaryAuxiliary);
0215         secondaryLumiPrincipal_ = std::make_shared<LuminosityBlockPrincipal>(
0216             secondaryFileSequence_->fileProductRegistry(), processConfiguration(), nullptr, lumiPrincipal.index());
0217         secondaryLumiPrincipal_->setAux(*secondaryAuxiliary);
0218         secondaryFileSequence_->readLuminosityBlock_(*secondaryLumiPrincipal_);
0219         checkHistoryConsistency(lumiPrincipal, *secondaryLumiPrincipal_);
0220         lumiPrincipal.recombine(*secondaryLumiPrincipal_, branchIDsToReplace_[InLumi]);
0221       } else {
0222         throw Exception(errors::MismatchedInputFiles, "PoolSource::readLuminosityBlock_")
0223             << " Run " << lumiPrincipal.run() << " LuminosityBlock " << lumiPrincipal.luminosityBlock()
0224             << " is not found in the secondary input files\n";
0225       }
0226     }
0227   }
0228 
0229   void PoolSource::readEvent_(EventPrincipal& eventPrincipal) {
0230     bool readEventSucceeded = primaryFileSequence_->readEvent(eventPrincipal);
0231     assert(readEventSucceeded);
0232     if (secondaryFileSequence_ && !branchIDsToReplace_[InEvent].empty()) {
0233       bool found = secondaryFileSequence_->skipToItem(
0234           eventPrincipal.run(), eventPrincipal.luminosityBlock(), eventPrincipal.id().event());
0235       if (found) {
0236         EventPrincipal& secondaryEventPrincipal = *secondaryEventPrincipals_[eventPrincipal.streamID().value()];
0237         bool readEventSucceeded = secondaryFileSequence_->readEvent(secondaryEventPrincipal);
0238         checkConsistency(eventPrincipal, secondaryEventPrincipal);
0239         checkHistoryConsistency(eventPrincipal, secondaryEventPrincipal);
0240         assert(readEventSucceeded);
0241         eventPrincipal.recombine(secondaryEventPrincipal, branchIDsToReplace_[InEvent]);
0242         eventPrincipal.mergeProvenanceRetrievers(secondaryEventPrincipal);
0243         secondaryEventPrincipal.clearPrincipal();
0244       } else {
0245         throw Exception(errors::MismatchedInputFiles, "PoolSource::readEvent_")
0246             << eventPrincipal.id() << " is not found in the secondary input files\n";
0247       }
0248     }
0249     if (not delayReadingEventProducts_) {
0250       eventPrincipal.readAllFromSourceAndMergeImmediately();
0251     }
0252   }
0253 
0254   bool PoolSource::readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext) {
0255     bool found = primaryFileSequence_->skipToItem(id.run(), id.luminosityBlock(), id.event());
0256     if (!found)
0257       return false;
0258     EventSourceSentry sentry(*this, streamContext);
0259     readEvent_(eventPrincipal);
0260     return true;
0261   }
0262 
0263   InputSource::ItemTypeInfo PoolSource::getNextItemType() {
0264     RunNumber_t run = IndexIntoFile::invalidRun;
0265     LuminosityBlockNumber_t lumi = IndexIntoFile::invalidLumi;
0266     EventNumber_t event = IndexIntoFile::invalidEvent;
0267     InputSource::ItemType itemType = primaryFileSequence_->getNextItemType(run, lumi, event);
0268     if (secondaryFileSequence_ && (ItemType::IsSynchronize != state())) {
0269       if (itemType == ItemType::IsRun || itemType == ItemType::IsLumi || itemType == ItemType::IsEvent) {
0270         if (!secondaryFileSequence_->containedInCurrentFile(run, lumi, event)) {
0271           return ItemType::IsSynchronize;
0272         }
0273       }
0274     }
0275     return runHelper_->nextItemType(state(), itemType, run, lumi, event);
0276   }
0277 
0278   std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> PoolSource::resourceSharedWithDelayedReader_() {
0279     return std::make_pair(resourceSharedWithDelayedReaderPtr_.get(), mutexSharedWithDelayedReader_.get());
0280   }
0281 
0282   // Rewind to before the first event that was read.
0283   void PoolSource::rewind_() { primaryFileSequence_->rewind_(); }
0284 
0285   // Advance "offset" events.  Offset can be positive or negative (or zero).
0286   void PoolSource::skip(int offset) { primaryFileSequence_->skipEvents(offset); }
0287 
0288   bool PoolSource::goToEvent_(EventID const& eventID) { return primaryFileSequence_->goToEvent(eventID); }
0289 
0290   void PoolSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
0291     ParameterSetDescription desc;
0292 
0293     std::vector<std::string> defaultStrings;
0294     desc.setComment("Reads EDM/Root files.");
0295     desc.addUntracked<std::vector<std::string> >("fileNames")->setComment("Names of files to be processed.");
0296     desc.addUntracked<std::vector<std::string> >("secondaryFileNames", defaultStrings)
0297         ->setComment("Names of secondary files to be processed.");
0298     desc.addUntracked<bool>("needSecondaryFileNames", false)
0299         ->setComment("If True, 'secondaryFileNames' must be specified and be non-empty.");
0300     desc.addUntracked<std::string>("overrideCatalog", std::string());
0301     desc.addUntracked<bool>("skipBadFiles", false)
0302         ->setComment(
0303             "True:  Ignore any missing or unopenable input file.\n"
0304             "False: Throw exception if missing or unopenable input file.");
0305     desc.addUntracked<bool>("bypassVersionCheck", false)
0306         ->setComment(
0307             "True:  Bypass release version check.\n"
0308             "False: Throw exception if reading file in a release prior to the release in which the file was written.");
0309     desc.addUntracked<int>("treeMaxVirtualSize", -1)
0310         ->setComment("Size of ROOT TTree TBasket cache. Affects performance.");
0311     desc.addUntracked<bool>("dropDescendantsOfDroppedBranches", true)
0312         ->setComment("If True, also drop on input any descendent of any branch dropped on input.");
0313     desc.addUntracked<bool>("labelRawDataLikeMC", true)
0314         ->setComment("If True: replace module label for raw data to match MC. Also use 'LHC' as process.");
0315     desc.addUntracked<bool>("delayReadingEventProducts", true)
0316         ->setComment(
0317             "If True: do not read a data product from the file until it is requested. If False: all event data "
0318             "products are read upfront.");
0319     ProductSelectorRules::fillDescription(desc, "inputCommands");
0320     InputSource::fillDescription(desc);
0321     RootPrimaryFileSequence::fillDescription(desc);
0322     RunHelperBase::fillDescription(desc);
0323 
0324     descriptions.add("source", desc);
0325   }
0326 
0327   bool PoolSource::randomAccess_() const { return true; }
0328 
0329   ProcessingController::ForwardState PoolSource::forwardState_() const { return primaryFileSequence_->forwardState(); }
0330 
0331   ProcessingController::ReverseState PoolSource::reverseState_() const { return primaryFileSequence_->reverseState(); }
0332 }  // namespace edm