Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-01-31 23:35:53

0001 // -*- C++ -*-
0002 //
0003 // Package:     Subsystem/Package
0004 // Class  :     RepeatingCachedRootSource
0005 //
0006 // Implementation:
0007 //     [Notes on implementation]
0008 //
0009 // Original Author:  Chris Jones
0010 //         Created:  Mon, 15 Mar 2021 19:02:31 GMT
0011 //
0012 
0013 // system include files
0014 #include <memory>
0015 
0016 // user include files
0017 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0018 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0019 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0020 #include "DataFormats/Provenance/interface/EventToProcessBlockIndexes.h"
0021 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0022 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0023 #include "DataFormats/Common/interface/WrapperBase.h"
0024 #include "DataFormats/Common/interface/EDProductGetter.h"
0025 
0026 #include "IOPool/Common/interface/RootServiceChecker.h"
0027 
0028 #include "FWCore/Framework/interface/InputSource.h"
0029 #include "FWCore/Framework/interface/ProductSelectorRules.h"
0030 #include "FWCore/Framework/interface/DelayedReader.h"
0031 #include "FWCore/Framework/interface/InputSourceDescription.h"
0032 #include "FWCore/Framework/interface/EventPrincipal.h"
0033 #include "FWCore/Framework/interface/ProductResolversFactory.h"
0034 
0035 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0036 #include "FWCore/Catalog/interface/InputFileCatalog.h"
0037 #include "FWCore/Utilities/interface/propagate_const.h"
0038 #include "FWCore/Utilities/interface/Exception.h"
0039 #include "FWCore/Utilities/interface/do_nothing_deleter.h"
0040 #include "FWCore/Sources/interface/EventSkipperByID.h"
0041 
0042 #include "FWCore/Framework/interface/InputSourceMacros.h"
0043 
0044 #include "RunHelper.h"
0045 #include "RootFile.h"
0046 #include "InputFile.h"
0047 #include "DuplicateChecker.h"
0048 
0049 namespace edm {
0050   class RunHelperBase;
0051 
0052   class RepeatingCachedRootSource : public InputSource {
0053   public:
0054     RepeatingCachedRootSource(ParameterSet const& pset, InputSourceDescription const& desc);
0055 
0056     static void fillDescriptions(ConfigurationDescriptions& descriptions);
0057 
0058     std::shared_ptr<WrapperBase> getProduct(unsigned int iStreamIndex,
0059                                             BranchID const& k,
0060                                             EDProductGetter const* ep) const;
0061 
0062     class RCProductGetter : public EDProductGetter {
0063     public:
0064       RCProductGetter(RCProductGetter const& iOther) : map_(iOther.map_), wrappers_(iOther.wrappers_) {}
0065 
0066       RCProductGetter const& operator=(RCProductGetter const& iOther) {
0067         map_ = iOther.map_;
0068         wrappers_ = iOther.wrappers_;
0069         return *this;
0070       }
0071 
0072       RCProductGetter(std::map<edm::ProductID, size_t> const* iMap,
0073                       std::vector<std::shared_ptr<edm::WrapperBase>> const* iWrappers)
0074           : map_(iMap), wrappers_(iWrappers) {}
0075 
0076       WrapperBase const* getIt(ProductID const&) const override;
0077 
0078       std::optional<std::tuple<WrapperBase const*, unsigned int>> getThinnedProduct(ProductID const&,
0079                                                                                     unsigned int key) const override;
0080 
0081       void getThinnedProducts(ProductID const& pid,
0082                               std::vector<WrapperBase const*>& foundContainers,
0083                               std::vector<unsigned int>& keys) const override;
0084 
0085       OptionalThinnedKey getThinnedKeyFrom(ProductID const& parent,
0086                                            unsigned int key,
0087                                            ProductID const& thinned) const override;
0088 
0089     private:
0090       unsigned int transitionIndex_() const override;
0091 
0092       std::map<edm::ProductID, size_t> const* map_;
0093       std::vector<std::shared_ptr<edm::WrapperBase>> const* wrappers_;
0094     };
0095 
0096     class RCDelayedReader : public edm::DelayedReader {
0097     public:
0098       std::shared_ptr<edm::WrapperBase> getProduct_(edm::BranchID const& k, edm::EDProductGetter const* ep) final {
0099         return m_source->getProduct(m_streamIndex, k, ep);
0100       }
0101       void mergeReaders_(edm::DelayedReader*) final { assert(false); }
0102       void reset_() final {}
0103 
0104       unsigned int m_streamIndex;
0105       edm::RepeatingCachedRootSource const* m_source;
0106 
0107       signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preEventReadFromSourceSignal()
0108           const final {
0109         return nullptr;
0110       }
0111       signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postEventReadFromSourceSignal()
0112           const final {
0113         return nullptr;
0114       }
0115     };
0116 
0117   protected:
0118     ItemTypeInfo getNextItemType() override;
0119     void readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) override;
0120     std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() override;
0121     void readEvent_(EventPrincipal& eventPrincipal) override;
0122 
0123   private:
0124     std::shared_ptr<RunAuxiliary> readRunAuxiliary_() override;
0125     void readRun_(RunPrincipal& runPrincipal) override;
0126     bool readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext) override;
0127     void skip(int offset) override;
0128     bool goToEvent_(EventID const& eventID) override;
0129     void beginJob(ProductRegistry const&) override;
0130 
0131     void fillProcessBlockHelper_() override;
0132     bool nextProcessBlock_(ProcessBlockPrincipal&) override;
0133     void readProcessBlock_(ProcessBlockPrincipal&) override;
0134 
0135     std::unique_ptr<RootFile> makeRootFile(std::string const& logicalFileName,
0136                                            std::string const& pName,
0137                                            bool isSkipping,
0138                                            std::shared_ptr<InputFile> filePtr,
0139                                            std::shared_ptr<EventSkipperByID> skipper,
0140                                            std::shared_ptr<DuplicateChecker> duplicateChecker,
0141                                            std::vector<std::shared_ptr<IndexIntoFile>>& indexesIntoFiles);
0142 
0143     RootServiceChecker rootServiceChecker_;
0144     ProductSelectorRules selectorRules_;
0145     edm::propagate_const<std::unique_ptr<RunHelperBase>> runHelper_;
0146     std::unique_ptr<RootFile> rootFile_;
0147     std::vector<ProcessHistoryID> orderedProcessHistoryIDs_;
0148     std::vector<std::vector<std::shared_ptr<edm::WrapperBase>>> cachedWrappers_;
0149     std::vector<RCProductGetter> getters_;  //one per cached event
0150     std::vector<EventAuxiliary> eventAuxs_;
0151     EventSelectionIDVector selectionIDs_;
0152     BranchListIndexes branchListIndexes_;
0153     ProductProvenanceRetriever provRetriever_;
0154     std::vector<RCDelayedReader> delayedReaders_;  //one per stream
0155     std::map<edm::BranchID, size_t> branchIDToWrapperIndex_;
0156     std::map<edm::ProductID, size_t> productIDToWrapperIndex_;
0157     std::vector<size_t> streamToCacheIndex_;
0158     size_t nextEventIndex_ = 0;
0159     ItemType presentState_ = ItemType::IsFile;
0160     unsigned long long eventIndex_ = 0;
0161   };
0162 }  // namespace edm
0163 
0164 using namespace edm;
0165 //
0166 // constants, enums and typedefs
0167 //
0168 
0169 //
0170 // static data member definitions
0171 //
0172 
0173 //
0174 // constructors and destructor
0175 //
0176 RepeatingCachedRootSource::RepeatingCachedRootSource(ParameterSet const& pset, InputSourceDescription const& desc)
0177     : InputSource(pset, desc),
0178       selectorRules_(pset, "inputCommands", "InputSource"),
0179       runHelper_(std::make_unique<DefaultRunHelper>()),
0180       cachedWrappers_(pset.getUntrackedParameter<unsigned int>("repeatNEvents")),
0181       eventAuxs_(cachedWrappers_.size()),
0182       provRetriever_(0),
0183       delayedReaders_(desc.allocations_->numberOfStreams()),
0184       streamToCacheIndex_(desc.allocations_->numberOfStreams(), 0) {
0185   {
0186     getters_.reserve(cachedWrappers_.size());
0187     for (auto& cw : cachedWrappers_) {
0188       getters_.emplace_back(&productIDToWrapperIndex_, &cw);
0189     }
0190 
0191     int index = 0;
0192     std::for_each(delayedReaders_.begin(), delayedReaders_.end(), [&index, this](auto& iR) {
0193       iR.m_streamIndex = index++;
0194       iR.m_source = this;
0195     });
0196   }
0197   auto logicalFileName = pset.getUntrackedParameter<std::string>("fileName");
0198   InputFileCatalog catalog(std::vector<std::string>(1U, logicalFileName), "");
0199   auto const& physicalFileName = catalog.fileCatalogItems().front().fileNames().front();
0200   auto const nEventsToSkip = pset.getUntrackedParameter<unsigned int>("skipEvents");
0201   std::shared_ptr<EventSkipperByID> skipper(EventSkipperByID::create(pset).release());
0202 
0203   auto duplicateChecker = std::make_shared<DuplicateChecker>(pset);
0204 
0205   std::vector<std::shared_ptr<IndexIntoFile>> indexesIntoFiles(1);
0206 
0207   auto input =
0208       std::make_shared<InputFile>(physicalFileName.c_str(), "  Initiating request to open file ", InputType::Primary);
0209   rootFile_ = makeRootFile(
0210       logicalFileName, physicalFileName, 0 != nEventsToSkip, input, skipper, duplicateChecker, indexesIntoFiles);
0211   rootFile_->reportOpened("repeating");
0212 
0213   auto const& prodList = rootFile_->productRegistry()->productList();
0214   productRegistryUpdate().updateFromInput(prodList);
0215 
0216   //setup caching
0217   auto nProdsInEvent =
0218       std::count_if(prodList.begin(), prodList.end(), [](auto&& iV) { return iV.second.branchType() == edm::InEvent; });
0219   {
0220     size_t index = 0;
0221     for (auto& prod : prodList) {
0222       if (prod.second.branchType() == edm::InEvent) {
0223         branchIDToWrapperIndex_[prod.second.branchID()] = index++;
0224       }
0225     }
0226   }
0227   for (auto& cache : cachedWrappers_) {
0228     cache.resize(nProdsInEvent);
0229   }
0230 }
0231 
0232 void RepeatingCachedRootSource::beginJob(ProductRegistry const&) {
0233   ProcessConfiguration processConfiguration;
0234   processConfiguration.setParameterSetID(ParameterSet::emptyParameterSetID());
0235   processConfiguration.setProcessConfigurationID();
0236 
0237   //in order to use the source's internal ProductRegistry for looking up date
0238   // it needs to be frozen (which setups the other structures)
0239   productRegistryUpdate().setFrozen();
0240   //Thinned collection associations are not supported at this time
0241   EventPrincipal eventPrincipal(std::shared_ptr<ProductRegistry const>(&productRegistry(), do_nothing_deleter()),
0242                                 edm::productResolversFactory::makePrimary,
0243                                 branchIDListHelper(),
0244                                 std::make_shared<ThinnedAssociationsHelper>(),
0245                                 processConfiguration,
0246                                 nullptr);
0247   {
0248     RunNumber_t run = 0;
0249     LuminosityBlockNumber_t lumi = 0;
0250     auto itAux = eventAuxs_.begin();
0251     auto itGetter = getters_.begin();
0252     for (auto& cache : cachedWrappers_) {
0253       rootFile_->nextEventEntry();
0254       rootFile_->readCurrentEvent(eventPrincipal);
0255       auto const& aux = eventPrincipal.aux();
0256       *(itAux++) = aux;
0257       if (0 == run) {
0258         run = aux.run();
0259         lumi = aux.luminosityBlock();
0260       } else {
0261         if (run != aux.run()) {
0262           throw cms::Exception("EventsWithDifferentRuns") << "The requested events to cache are from different Runs";
0263         }
0264         if (lumi != aux.luminosityBlock()) {
0265           throw cms::Exception("EventsWithDifferentLuminosityBlocks")
0266               << "The requested events to cache are from different LuminosityBlocks";
0267         }
0268       }
0269       selectionIDs_ = eventPrincipal.eventSelectionIDs();
0270       branchListIndexes_ = eventPrincipal.branchListIndexes();
0271       {
0272         auto reader = eventPrincipal.reader();
0273         auto& getter = *(itGetter++);
0274         for (auto const& branchToIndex : branchIDToWrapperIndex_) {
0275           cache[branchToIndex.second] = reader->getProduct(branchToIndex.first, &getter);
0276         }
0277       }
0278     }
0279     for (auto const& branchToIndex : branchIDToWrapperIndex_) {
0280       auto pid = eventPrincipal.branchIDToProductID(branchToIndex.first);
0281       productIDToWrapperIndex_[pid] = branchToIndex.second;
0282     }
0283     rootFile_->rewind();
0284   }
0285 }
0286 
0287 void RepeatingCachedRootSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
0288   ParameterSetDescription desc;
0289   desc.setComment(
0290       "Read only a few Events from one EDM/Root file, and repeat them in sequence. The Events are required to be from "
0291       "the same Run and LuminosityBlock.");
0292   desc.addUntracked<std::string>("fileName")->setComment("Name of file to be processed.");
0293   desc.addUntracked<unsigned int>("repeatNEvents", 10U)
0294       ->setComment("Number of events to read from file and then repeat in sequence.");
0295   desc.addUntracked<unsigned int>("skipEvents", 0);
0296   ProductSelectorRules::fillDescription(desc, "inputCommands");
0297   InputSource::fillDescription(desc);
0298 
0299   descriptions.add("source", desc);
0300 }
0301 
0302 //
0303 // member functions
0304 //
0305 
0306 std::unique_ptr<RootFile> RepeatingCachedRootSource::makeRootFile(
0307     std::string const& logicalFileName,
0308     std::string const& pName,
0309     bool isSkipping,
0310     std::shared_ptr<InputFile> filePtr,
0311     std::shared_ptr<EventSkipperByID> skipper,
0312     std::shared_ptr<DuplicateChecker> duplicateChecker,
0313     std::vector<std::shared_ptr<IndexIntoFile>>& indexesIntoFiles) {
0314   return std::make_unique<RootFile>(pName,
0315                                     processConfiguration(),
0316                                     logicalFileName,
0317                                     filePtr,
0318                                     skipper,
0319                                     isSkipping,
0320                                     remainingEvents(),
0321                                     remainingLuminosityBlocks(),
0322                                     1,
0323                                     roottree::defaultCacheSize,  //treeCacheSize_,
0324                                     -1,                          //treeMaxVirtualSize(),
0325                                     processingMode(),
0326                                     runHelper_,
0327                                     false,  //noRunLumiSort_
0328                                     true,   //noEventSort_,
0329                                     selectorRules_,
0330                                     InputType::Primary,
0331                                     branchIDListHelper(),
0332                                     processBlockHelper().get(),
0333                                     thinnedAssociationsHelper(),
0334                                     nullptr,  // associationsFromSecondary
0335                                     duplicateChecker,
0336                                     false,  //dropDescendants(),
0337                                     processHistoryRegistryForUpdate(),
0338                                     indexesIntoFiles,
0339                                     0,  //currentIndexIntoFile,
0340                                     orderedProcessHistoryIDs_,
0341                                     false,   //bypassVersionCheck(),
0342                                     true,    //labelRawDataLikeMC(),
0343                                     false,   //usingGoToEvent_,
0344                                     true,    //enablePrefetching_,
0345                                     false);  //enforceGUIDInFileName_);
0346 }
0347 
0348 std::shared_ptr<WrapperBase> RepeatingCachedRootSource::getProduct(unsigned int iStreamIndex,
0349                                                                    BranchID const& k,
0350                                                                    EDProductGetter const* ep) const {
0351   return cachedWrappers_[streamToCacheIndex_[iStreamIndex]][branchIDToWrapperIndex_.find(k)->second];
0352 }
0353 
0354 RepeatingCachedRootSource::ItemTypeInfo RepeatingCachedRootSource::getNextItemType() {
0355   auto v = presentState_;
0356   switch (presentState_) {
0357     case ItemType::IsFile:
0358       presentState_ = ItemType::IsRun;
0359       break;
0360     case ItemType::IsRun:
0361       presentState_ = ItemType::IsLumi;
0362       break;
0363     case ItemType::IsLumi:
0364       presentState_ = ItemType::IsEvent;
0365       break;
0366     default:
0367       break;
0368   }
0369   return v;
0370 }
0371 
0372 void RepeatingCachedRootSource::readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) {
0373   rootFile_->readLuminosityBlock_(lumiPrincipal);
0374 }
0375 
0376 std::shared_ptr<LuminosityBlockAuxiliary> RepeatingCachedRootSource::readLuminosityBlockAuxiliary_() {
0377   return rootFile_->readLuminosityBlockAuxiliary_();
0378 }
0379 void RepeatingCachedRootSource::readEvent_(EventPrincipal& eventPrincipal) {
0380   auto index = eventIndex_++;
0381 
0382   auto repeatedIndex = index % cachedWrappers_.size();
0383 
0384   auto const& aux = eventAuxs_[repeatedIndex];
0385 
0386   auto history = processHistoryRegistry().getMapped(aux.processHistoryID());
0387 
0388   streamToCacheIndex_[eventPrincipal.streamID().value()] = repeatedIndex;
0389   eventPrincipal.fillEventPrincipal(aux,
0390                                     history,
0391                                     selectionIDs_,
0392                                     branchListIndexes_,
0393                                     EventToProcessBlockIndexes(),
0394                                     provRetriever_,
0395                                     &delayedReaders_[eventPrincipal.streamID().value()]);
0396 }
0397 
0398 std::shared_ptr<RunAuxiliary> RepeatingCachedRootSource::readRunAuxiliary_() {
0399   return rootFile_->readRunAuxiliary_();
0400   ;
0401 }
0402 
0403 void RepeatingCachedRootSource::readRun_(RunPrincipal& runPrincipal) { rootFile_->readRun_(runPrincipal); }
0404 
0405 bool RepeatingCachedRootSource::readIt(EventID const& id,
0406                                        EventPrincipal& eventPrincipal,
0407                                        StreamContext& streamContext) {
0408   return false;
0409 }
0410 
0411 void RepeatingCachedRootSource::skip(int offset) {}
0412 
0413 bool RepeatingCachedRootSource::goToEvent_(EventID const& eventID) { return false; }
0414 
0415 void RepeatingCachedRootSource::fillProcessBlockHelper_() { rootFile_->fillProcessBlockHelper_(); }
0416 
0417 bool RepeatingCachedRootSource::nextProcessBlock_(ProcessBlockPrincipal& processBlockPrincipal) {
0418   return rootFile_->nextProcessBlock_(processBlockPrincipal);
0419 }
0420 
0421 void RepeatingCachedRootSource::readProcessBlock_(ProcessBlockPrincipal& processBlockPrincipal) {
0422   rootFile_->readProcessBlock_(processBlockPrincipal);
0423 }
0424 
0425 WrapperBase const* RepeatingCachedRootSource::RCProductGetter::getIt(ProductID const& iPID) const {
0426   auto itFound = map_->find(iPID);
0427   if (itFound == map_->end()) {
0428     return nullptr;
0429   }
0430   return (*wrappers_)[itFound->second].get();
0431 }
0432 
0433 std::optional<std::tuple<WrapperBase const*, unsigned int>>
0434 RepeatingCachedRootSource::RCProductGetter::getThinnedProduct(ProductID const&, unsigned int key) const {
0435   return {};
0436 };
0437 
0438 void RepeatingCachedRootSource::RCProductGetter::getThinnedProducts(ProductID const& pid,
0439                                                                     std::vector<WrapperBase const*>& foundContainers,
0440                                                                     std::vector<unsigned int>& keys) const {}
0441 
0442 OptionalThinnedKey RepeatingCachedRootSource::RCProductGetter::getThinnedKeyFrom(ProductID const& parent,
0443                                                                                  unsigned int key,
0444                                                                                  ProductID const& thinned) const {
0445   return {};
0446 }
0447 unsigned int RepeatingCachedRootSource::RCProductGetter::transitionIndex_() const { return 0; }
0448 
0449 //
0450 // const member functions
0451 //
0452 
0453 //
0454 // static member functions
0455 //
0456 
0457 DEFINE_FWK_INPUT_SOURCE(RepeatingCachedRootSource);