Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-01-31 02:19:42

0001 /*----------------------------------------------------------------------
0002 ----------------------------------------------------------------------*/
0003 #include "PoolSource.h"
0004 #include "InputFile.h"
0005 #include "RootPrimaryFileSequence.h"
0006 #include "RootSecondaryFileSequence.h"
0007 #include "RunHelper.h"
0008 #include "DataFormats/Common/interface/ThinnedAssociation.h"
0009 #include "DataFormats/Provenance/interface/ProductDescription.h"
0010 #include "DataFormats/Provenance/interface/IndexIntoFile.h"
0011 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0012 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0013 #include "FWCore/Framework/interface/EventPrincipal.h"
0014 #include "FWCore/Framework/interface/FileBlock.h"
0015 #include "FWCore/Framework/interface/InputSourceDescription.h"
0016 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0017 #include "FWCore/Framework/interface/PreallocationConfiguration.h"
0018 #include "FWCore/Framework/interface/SharedResourcesRegistry.h"
0019 #include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
0020 #include "FWCore/Framework/interface/RunPrincipal.h"
0021 #include "FWCore/Framework/interface/ProductResolversFactory.h"
0022 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0023 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0024 #include "FWCore/Utilities/interface/EDMException.h"
0025 #include "FWCore/Utilities/interface/Exception.h"
0026 #include "FWCore/Utilities/interface/InputType.h"
0027 
0028 #include <set>
0029 
0030 namespace edm {
0031 
0032   class BranchID;
0033   class LuminosityBlockID;
0034   class EventID;
0035   class ThinnedAssociationsHelper;
0036 
0037   namespace {
0038     void checkHistoryConsistency(Principal const& primary, Principal const& secondary) {
0039       ProcessHistory const& ph1 = primary.processHistory();
0040       ProcessHistory const& ph2 = secondary.processHistory();
0041       if (ph1 != ph2 && !isAncestor(ph2, ph1)) {
0042         throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0043             << "The secondary file is not an ancestor of the primary file\n";
0044       }
0045     }
0046     void checkConsistency(EventPrincipal const& primary, EventPrincipal const& secondary) {
0047       if (!isSameEvent(primary, secondary)) {
0048         throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0049             << primary.id() << " has inconsistent EventAuxiliary data in the primary and secondary file\n";
0050       }
0051     }
0052     void checkConsistency(LuminosityBlockAuxiliary const& primary, LuminosityBlockAuxiliary const& secondary) {
0053       if (primary.id() != secondary.id()) {
0054         throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0055             << primary.id() << " has inconsistent LuminosityBlockAuxiliary data in the primary and secondary file\n";
0056       }
0057     }
0058     void checkConsistency(RunAuxiliary const& primary, RunAuxiliary const& secondary) {
0059       if (primary.id() != secondary.id()) {
0060         throw Exception(errors::MismatchedInputFiles, "PoolSource::checkConsistency")
0061             << primary.id() << " has inconsistent RunAuxiliary data in the primary and secondary file\n";
0062       }
0063     }
0064   }  // namespace
0065 
0066   PoolSource::PoolSource(ParameterSet const& pset, InputSourceDescription const& desc)
0067       : InputSource(pset, desc),
0068         rootServiceChecker_(),
0069         catalog_(pset.getUntrackedParameter<std::vector<std::string> >("fileNames"),
0070                  pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
0071         secondaryCatalog_(
0072             pset.getUntrackedParameter<std::vector<std::string> >("secondaryFileNames", std::vector<std::string>()),
0073             pset.getUntrackedParameter<std::string>("overrideCatalog", std::string())),
0074         secondaryRunPrincipal_(),
0075         secondaryLumiPrincipal_(),
0076         secondaryEventPrincipals_(),
0077         branchIDsToReplace_(),
0078         nStreams_(desc.allocations_->numberOfStreams()),
0079         skipBadFiles_(pset.getUntrackedParameter<bool>("skipBadFiles")),
0080         bypassVersionCheck_(pset.getUntrackedParameter<bool>("bypassVersionCheck")),
0081         treeMaxVirtualSize_(pset.getUntrackedParameter<int>("treeMaxVirtualSize")),
0082         productSelectorRules_(pset, "inputCommands", "InputSource"),
0083         dropDescendants_(pset.getUntrackedParameter<bool>("dropDescendantsOfDroppedBranches")),
0084         labelRawDataLikeMC_(pset.getUntrackedParameter<bool>("labelRawDataLikeMC")),
0085         delayReadingEventProducts_(pset.getUntrackedParameter<bool>("delayReadingEventProducts")),
0086         runHelper_(makeRunHelper(pset)),
0087         resourceSharedWithDelayedReaderPtr_(),
0088         // Note: primaryFileSequence_ and secondaryFileSequence_ need to be initialized last, because they use data members
0089         // initialized previously in their own initialization.
0090         primaryFileSequence_(new RootPrimaryFileSequence(pset, *this, catalog_)),
0091         secondaryFileSequence_(
0092             secondaryCatalog_.empty() ? nullptr : new RootSecondaryFileSequence(pset, *this, secondaryCatalog_)) {
0093     auto resources = SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader();
0094     resourceSharedWithDelayedReaderPtr_ = std::make_unique<SharedResourcesAcquirer>(std::move(resources.first));
0095     mutexSharedWithDelayedReader_ = resources.second;
0096 
0097     if (secondaryCatalog_.empty() && pset.getUntrackedParameter<bool>("needSecondaryFileNames", false)) {
0098       throw Exception(errors::Configuration, "PoolSource") << "'secondaryFileNames' must be specified\n";
0099     }
0100     if (secondaryFileSequence_) {
0101       secondaryEventPrincipals_.reserve(nStreams_);
0102       for (unsigned int index = 0; index < nStreams_; ++index) {
0103         secondaryEventPrincipals_.emplace_back(new EventPrincipal(secondaryFileSequence_->fileProductRegistry(),
0104                                                                   edm::productResolversFactory::makePrimary,
0105                                                                   secondaryFileSequence_->fileBranchIDListHelper(),
0106                                                                   std::make_shared<ThinnedAssociationsHelper const>(),
0107                                                                   processConfiguration(),
0108                                                                   nullptr,
0109                                                                   index));
0110       }
0111       std::array<std::set<BranchID>, NumBranchTypes> idsToReplace;
0112       ProductRegistry::ProductList const& secondary = secondaryFileSequence_->fileProductRegistry()->productList();
0113       ProductRegistry::ProductList const& primary = primaryFileSequence_->fileProductRegistry()->productList();
0114       std::set<BranchID> associationsFromSecondary;
0115       //this is the registry used by the 'outside' world and only has the primary file information in it at present
0116       ProductRegistry::ProductList& fullList = productRegistryUpdate().productListUpdator();
0117       for (auto const& item : secondary) {
0118         if (item.second.present()) {
0119           idsToReplace[item.second.branchType()].insert(item.second.branchID());
0120           if (item.second.branchType() == InEvent && item.second.unwrappedType() == typeid(ThinnedAssociation)) {
0121             associationsFromSecondary.insert(item.second.branchID());
0122           }
0123           //now make sure this is marked as not dropped else the product will not be 'get'table from the Event
0124           auto itFound = fullList.find(item.first);
0125           if (itFound != fullList.end()) {
0126             // If the branch in primary file was dropped, need to initilize the dictionary information
0127             if (itFound->second.dropped()) {
0128               itFound->second.initFromDictionary();
0129             }
0130             itFound->second.setDropped(false);
0131           }
0132         }
0133       }
0134       for (auto const& item : primary) {
0135         if (item.second.present()) {
0136           idsToReplace[item.second.branchType()].erase(item.second.branchID());
0137           associationsFromSecondary.erase(item.second.branchID());
0138         }
0139       }
0140       if (idsToReplace[InEvent].empty() && idsToReplace[InLumi].empty() && idsToReplace[InRun].empty()) {
0141         secondaryFileSequence_ = nullptr;  // propagate_const<T> has no reset() function
0142       } else {
0143         for (int i = InEvent; i < NumBranchTypes; ++i) {
0144           branchIDsToReplace_[i].reserve(idsToReplace[i].size());
0145           for (auto const& id : idsToReplace[i]) {
0146             branchIDsToReplace_[i].push_back(id);
0147           }
0148         }
0149         secondaryFileSequence_->initAssociationsFromSecondary(associationsFromSecondary);
0150       }
0151     }
0152   }
0153 
0154   PoolSource::~PoolSource() {}
0155 
0156   void PoolSource::endJob() {
0157     if (secondaryFileSequence_)
0158       secondaryFileSequence_->endJob();
0159     primaryFileSequence_->endJob();
0160     InputFile::reportReadBranches();
0161   }
0162 
0163   std::shared_ptr<FileBlock> PoolSource::readFile_() {
0164     std::shared_ptr<FileBlock> fb = primaryFileSequence_->readFile_();
0165     if (secondaryFileSequence_) {
0166       fb->setNotFastClonable(FileBlock::HasSecondaryFileSequence);
0167     }
0168     return fb;
0169   }
0170 
0171   void PoolSource::closeFile_() { primaryFileSequence_->closeFile(); }
0172 
0173   std::shared_ptr<RunAuxiliary> PoolSource::readRunAuxiliary_() { return primaryFileSequence_->readRunAuxiliary_(); }
0174 
0175   std::shared_ptr<LuminosityBlockAuxiliary> PoolSource::readLuminosityBlockAuxiliary_() {
0176     return primaryFileSequence_->readLuminosityBlockAuxiliary_();
0177   }
0178 
0179   void PoolSource::fillProcessBlockHelper_() { primaryFileSequence_->fillProcessBlockHelper_(); }
0180 
0181   bool PoolSource::nextProcessBlock_(ProcessBlockPrincipal& processBlockPrincipal) {
0182     return primaryFileSequence_->nextProcessBlock_(processBlockPrincipal);
0183   }
0184 
0185   void PoolSource::readProcessBlock_(ProcessBlockPrincipal& processBlockPrincipal) {
0186     primaryFileSequence_->readProcessBlock_(processBlockPrincipal);
0187   }
0188 
0189   void PoolSource::readRun_(RunPrincipal& runPrincipal) {
0190     bool shouldWeProcessRun = primaryFileSequence_->readRun_(runPrincipal);
0191     if (secondaryFileSequence_ && shouldWeProcessRun && !branchIDsToReplace_[InRun].empty()) {
0192       bool found = secondaryFileSequence_->skipToItem(runPrincipal.run(), 0U, 0U);
0193       if (found) {
0194         std::shared_ptr<RunAuxiliary> secondaryAuxiliary = secondaryFileSequence_->readRunAuxiliary_();
0195         checkConsistency(runPrincipal.aux(), *secondaryAuxiliary);
0196         secondaryRunPrincipal_ = std::make_shared<RunPrincipal>(secondaryFileSequence_->fileProductRegistry(),
0197                                                                 edm::productResolversFactory::makePrimary,
0198                                                                 processConfiguration(),
0199                                                                 nullptr,
0200                                                                 runPrincipal.index());
0201         secondaryRunPrincipal_->setAux(*secondaryAuxiliary);
0202         secondaryFileSequence_->readRun_(*secondaryRunPrincipal_);
0203         checkHistoryConsistency(runPrincipal, *secondaryRunPrincipal_);
0204         runPrincipal.recombine(*secondaryRunPrincipal_, branchIDsToReplace_[InRun]);
0205       } else {
0206         throw Exception(errors::MismatchedInputFiles, "PoolSource::readRun_")
0207             << " Run " << runPrincipal.run() << " is not found in the secondary input files\n";
0208       }
0209     }
0210   }
0211 
0212   void PoolSource::readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) {
0213     bool shouldWeProcessLumi = primaryFileSequence_->readLuminosityBlock_(lumiPrincipal);
0214     if (secondaryFileSequence_ && shouldWeProcessLumi && !branchIDsToReplace_[InLumi].empty()) {
0215       bool found = secondaryFileSequence_->skipToItem(lumiPrincipal.run(), lumiPrincipal.luminosityBlock(), 0U);
0216       if (found) {
0217         std::shared_ptr<LuminosityBlockAuxiliary> secondaryAuxiliary =
0218             secondaryFileSequence_->readLuminosityBlockAuxiliary_();
0219         checkConsistency(lumiPrincipal.aux(), *secondaryAuxiliary);
0220         secondaryLumiPrincipal_ =
0221             std::make_shared<LuminosityBlockPrincipal>(secondaryFileSequence_->fileProductRegistry(),
0222                                                        edm::productResolversFactory::makePrimary,
0223                                                        processConfiguration(),
0224                                                        nullptr,
0225                                                        lumiPrincipal.index());
0226         secondaryLumiPrincipal_->setAux(*secondaryAuxiliary);
0227         secondaryFileSequence_->readLuminosityBlock_(*secondaryLumiPrincipal_);
0228         checkHistoryConsistency(lumiPrincipal, *secondaryLumiPrincipal_);
0229         lumiPrincipal.recombine(*secondaryLumiPrincipal_, branchIDsToReplace_[InLumi]);
0230       } else {
0231         throw Exception(errors::MismatchedInputFiles, "PoolSource::readLuminosityBlock_")
0232             << " Run " << lumiPrincipal.run() << " LuminosityBlock " << lumiPrincipal.luminosityBlock()
0233             << " is not found in the secondary input files\n";
0234       }
0235     }
0236   }
0237 
0238   void PoolSource::readEvent_(EventPrincipal& eventPrincipal) {
0239     bool readEventSucceeded = primaryFileSequence_->readEvent(eventPrincipal);
0240     assert(readEventSucceeded);
0241     if (secondaryFileSequence_ && !branchIDsToReplace_[InEvent].empty()) {
0242       bool found = secondaryFileSequence_->skipToItem(
0243           eventPrincipal.run(), eventPrincipal.luminosityBlock(), eventPrincipal.id().event());
0244       if (found) {
0245         EventPrincipal& secondaryEventPrincipal = *secondaryEventPrincipals_[eventPrincipal.streamID().value()];
0246         bool readEventSucceeded = secondaryFileSequence_->readEvent(secondaryEventPrincipal);
0247         checkConsistency(eventPrincipal, secondaryEventPrincipal);
0248         checkHistoryConsistency(eventPrincipal, secondaryEventPrincipal);
0249         assert(readEventSucceeded);
0250         eventPrincipal.recombine(secondaryEventPrincipal, branchIDsToReplace_[InEvent]);
0251         eventPrincipal.mergeProvenanceRetrievers(secondaryEventPrincipal);
0252         secondaryEventPrincipal.clearPrincipal();
0253       } else {
0254         throw Exception(errors::MismatchedInputFiles, "PoolSource::readEvent_")
0255             << eventPrincipal.id() << " is not found in the secondary input files\n";
0256       }
0257     }
0258     if (not delayReadingEventProducts_) {
0259       eventPrincipal.readAllFromSourceAndMergeImmediately();
0260     }
0261   }
0262 
0263   bool PoolSource::readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext) {
0264     bool found = primaryFileSequence_->skipToItem(id.run(), id.luminosityBlock(), id.event());
0265     if (!found)
0266       return false;
0267     EventSourceSentry sentry(*this, streamContext);
0268     readEvent_(eventPrincipal);
0269     return true;
0270   }
0271 
0272   InputSource::ItemTypeInfo PoolSource::getNextItemType() {
0273     RunNumber_t run = IndexIntoFile::invalidRun;
0274     LuminosityBlockNumber_t lumi = IndexIntoFile::invalidLumi;
0275     EventNumber_t event = IndexIntoFile::invalidEvent;
0276     InputSource::ItemType itemType = primaryFileSequence_->getNextItemType(run, lumi, event);
0277     if (secondaryFileSequence_ && (ItemType::IsSynchronize != state())) {
0278       if (itemType == ItemType::IsRun || itemType == ItemType::IsLumi || itemType == ItemType::IsEvent) {
0279         if (!secondaryFileSequence_->containedInCurrentFile(run, lumi, event)) {
0280           return ItemType::IsSynchronize;
0281         }
0282       }
0283     }
0284     return runHelper_->nextItemType(state(), itemType, run, lumi, event);
0285   }
0286 
0287   std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> PoolSource::resourceSharedWithDelayedReader_() {
0288     return std::make_pair(resourceSharedWithDelayedReaderPtr_.get(), mutexSharedWithDelayedReader_.get());
0289   }
0290 
0291   // Rewind to before the first event that was read.
0292   void PoolSource::rewind_() { primaryFileSequence_->rewind_(); }
0293 
0294   // Advance "offset" events.  Offset can be positive or negative (or zero).
0295   void PoolSource::skip(int offset) { primaryFileSequence_->skipEvents(offset); }
0296 
0297   bool PoolSource::goToEvent_(EventID const& eventID) { return primaryFileSequence_->goToEvent(eventID); }
0298 
0299   void PoolSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
0300     ParameterSetDescription desc;
0301 
0302     std::vector<std::string> defaultStrings;
0303     desc.setComment("Reads EDM/Root files.");
0304     desc.addUntracked<std::vector<std::string> >("fileNames")->setComment("Names of files to be processed.");
0305     desc.addUntracked<std::vector<std::string> >("secondaryFileNames", defaultStrings)
0306         ->setComment("Names of secondary files to be processed.");
0307     desc.addUntracked<bool>("needSecondaryFileNames", false)
0308         ->setComment("If True, 'secondaryFileNames' must be specified and be non-empty.");
0309     desc.addUntracked<std::string>("overrideCatalog", std::string());
0310     desc.addUntracked<bool>("skipBadFiles", false)
0311         ->setComment(
0312             "True:  Ignore any missing or unopenable input file.\n"
0313             "False: Throw exception if missing or unopenable input file.");
0314     desc.addUntracked<bool>("bypassVersionCheck", false)
0315         ->setComment(
0316             "True:  Bypass release version check.\n"
0317             "False: Throw exception if reading file in a release prior to the release in which the file was written.");
0318     desc.addUntracked<int>("treeMaxVirtualSize", -1)
0319         ->setComment("Size of ROOT TTree TBasket cache. Affects performance.");
0320     desc.addUntracked<bool>("dropDescendantsOfDroppedBranches", true)
0321         ->setComment("If True, also drop on input any descendent of any branch dropped on input.");
0322     desc.addUntracked<bool>("labelRawDataLikeMC", true)
0323         ->setComment("If True: replace module label for raw data to match MC. Also use 'LHC' as process.");
0324     desc.addUntracked<bool>("delayReadingEventProducts", true)
0325         ->setComment(
0326             "If True: do not read a data product from the file until it is requested. If False: all event data "
0327             "products are read upfront.");
0328     ProductSelectorRules::fillDescription(desc, "inputCommands");
0329     InputSource::fillDescription(desc);
0330     RootPrimaryFileSequence::fillDescription(desc);
0331     RunHelperBase::fillDescription(desc);
0332 
0333     descriptions.add("source", desc);
0334   }
0335 
0336   bool PoolSource::randomAccess_() const { return true; }
0337 
0338   ProcessingController::ForwardState PoolSource::forwardState_() const { return primaryFileSequence_->forwardState(); }
0339 
0340   ProcessingController::ReverseState PoolSource::reverseState_() const { return primaryFileSequence_->reverseState(); }
0341 }  // namespace edm