Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:19:06

0001 // -*- C++ -*-
0002 //
0003 // Package:     Subsystem/Package
0004 // Class  :     RepeatingCachedRootSource
0005 //
0006 // Implementation:
0007 //     [Notes on implementation]
0008 //
0009 // Original Author:  Chris Jones
0010 //         Created:  Mon, 15 Mar 2021 19:02:31 GMT
0011 //
0012 
0013 // system include files
0014 #include <memory>
0015 
0016 // user include files
0017 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0018 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0019 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0020 #include "DataFormats/Provenance/interface/EventToProcessBlockIndexes.h"
0021 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0022 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0023 #include "DataFormats/Common/interface/WrapperBase.h"
0024 #include "DataFormats/Common/interface/EDProductGetter.h"
0025 
0026 #include "IOPool/Common/interface/RootServiceChecker.h"
0027 
0028 #include "FWCore/Framework/interface/InputSource.h"
0029 #include "FWCore/Framework/interface/ProductSelectorRules.h"
0030 #include "FWCore/Framework/interface/DelayedReader.h"
0031 #include "FWCore/Framework/interface/InputSourceDescription.h"
0032 #include "FWCore/Framework/interface/EventPrincipal.h"
0033 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0034 #include "FWCore/Catalog/interface/InputFileCatalog.h"
0035 #include "FWCore/Utilities/interface/propagate_const.h"
0036 #include "FWCore/Utilities/interface/Exception.h"
0037 #include "FWCore/Sources/interface/EventSkipperByID.h"
0038 
0039 #include "FWCore/Framework/interface/InputSourceMacros.h"
0040 
0041 #include "RunHelper.h"
0042 #include "RootFile.h"
0043 #include "InputFile.h"
0044 #include "DuplicateChecker.h"
0045 
0046 namespace edm {
0047   class RunHelperBase;
0048 
0049   class RepeatingCachedRootSource : public InputSource {
0050   public:
0051     RepeatingCachedRootSource(ParameterSet const& pset, InputSourceDescription const& desc);
0052 
0053     static void fillDescriptions(ConfigurationDescriptions& descriptions);
0054 
0055     std::shared_ptr<WrapperBase> getProduct(unsigned int iStreamIndex,
0056                                             BranchID const& k,
0057                                             EDProductGetter const* ep) const;
0058 
0059     class RCProductGetter : public EDProductGetter {
0060     public:
0061       RCProductGetter(RCProductGetter const& iOther) : map_(iOther.map_), wrappers_(iOther.wrappers_) {}
0062 
0063       RCProductGetter const& operator=(RCProductGetter const& iOther) {
0064         map_ = iOther.map_;
0065         wrappers_ = iOther.wrappers_;
0066         return *this;
0067       }
0068 
0069       RCProductGetter(std::map<edm::ProductID, size_t> const* iMap,
0070                       std::vector<std::shared_ptr<edm::WrapperBase>> const* iWrappers)
0071           : map_(iMap), wrappers_(iWrappers) {}
0072 
0073       WrapperBase const* getIt(ProductID const&) const override;
0074 
0075       std::optional<std::tuple<WrapperBase const*, unsigned int>> getThinnedProduct(ProductID const&,
0076                                                                                     unsigned int key) const override;
0077 
0078       void getThinnedProducts(ProductID const& pid,
0079                               std::vector<WrapperBase const*>& foundContainers,
0080                               std::vector<unsigned int>& keys) const override;
0081 
0082       OptionalThinnedKey getThinnedKeyFrom(ProductID const& parent,
0083                                            unsigned int key,
0084                                            ProductID const& thinned) const override;
0085 
0086     private:
0087       unsigned int transitionIndex_() const override;
0088 
0089       std::map<edm::ProductID, size_t> const* map_;
0090       std::vector<std::shared_ptr<edm::WrapperBase>> const* wrappers_;
0091     };
0092 
0093     class RCDelayedReader : public edm::DelayedReader {
0094     public:
0095       std::shared_ptr<edm::WrapperBase> getProduct_(edm::BranchID const& k, edm::EDProductGetter const* ep) final {
0096         return m_source->getProduct(m_streamIndex, k, ep);
0097       }
0098       void mergeReaders_(edm::DelayedReader*) final { assert(false); }
0099       void reset_() final {}
0100 
0101       unsigned int m_streamIndex;
0102       edm::RepeatingCachedRootSource const* m_source;
0103 
0104       signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preEventReadFromSourceSignal()
0105           const final {
0106         return nullptr;
0107       }
0108       signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postEventReadFromSourceSignal()
0109           const final {
0110         return nullptr;
0111       }
0112     };
0113 
0114   protected:
0115     ItemTypeInfo getNextItemType() override;
0116     void readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) override;
0117     std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() override;
0118     void readEvent_(EventPrincipal& eventPrincipal) override;
0119 
0120   private:
0121     std::shared_ptr<RunAuxiliary> readRunAuxiliary_() override;
0122     void readRun_(RunPrincipal& runPrincipal) override;
0123     bool readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext) override;
0124     void skip(int offset) override;
0125     bool goToEvent_(EventID const& eventID) override;
0126     void beginJob() override;
0127 
0128     void fillProcessBlockHelper_() override;
0129     bool nextProcessBlock_(ProcessBlockPrincipal&) override;
0130     void readProcessBlock_(ProcessBlockPrincipal&) override;
0131 
0132     std::unique_ptr<RootFile> makeRootFile(std::string const& logicalFileName,
0133                                            std::string const& pName,
0134                                            bool isSkipping,
0135                                            std::shared_ptr<InputFile> filePtr,
0136                                            std::shared_ptr<EventSkipperByID> skipper,
0137                                            std::shared_ptr<DuplicateChecker> duplicateChecker,
0138                                            std::vector<std::shared_ptr<IndexIntoFile>>& indexesIntoFiles);
0139 
0140     RootServiceChecker rootServiceChecker_;
0141     ProductSelectorRules selectorRules_;
0142     edm::propagate_const<std::unique_ptr<RunHelperBase>> runHelper_;
0143     std::unique_ptr<RootFile> rootFile_;
0144     std::vector<ProcessHistoryID> orderedProcessHistoryIDs_;
0145     std::vector<std::vector<std::shared_ptr<edm::WrapperBase>>> cachedWrappers_;
0146     std::vector<RCProductGetter> getters_;  //one per cached event
0147     std::vector<EventAuxiliary> eventAuxs_;
0148     EventSelectionIDVector selectionIDs_;
0149     BranchListIndexes branchListIndexes_;
0150     ProductProvenanceRetriever provRetriever_;
0151     std::vector<RCDelayedReader> delayedReaders_;  //one per stream
0152     std::map<edm::BranchID, size_t> branchIDToWrapperIndex_;
0153     std::map<edm::ProductID, size_t> productIDToWrapperIndex_;
0154     std::vector<size_t> streamToCacheIndex_;
0155     size_t nextEventIndex_ = 0;
0156     ItemType presentState_ = ItemType::IsFile;
0157     unsigned long long eventIndex_ = 0;
0158   };
0159 }  // namespace edm
0160 
0161 using namespace edm;
0162 //
0163 // constants, enums and typedefs
0164 //
0165 
0166 //
0167 // static data member definitions
0168 //
0169 
0170 //
0171 // constructors and destructor
0172 //
0173 RepeatingCachedRootSource::RepeatingCachedRootSource(ParameterSet const& pset, InputSourceDescription const& desc)
0174     : InputSource(pset, desc),
0175       selectorRules_(pset, "inputCommands", "InputSource"),
0176       runHelper_(std::make_unique<DefaultRunHelper>()),
0177       cachedWrappers_(pset.getUntrackedParameter<unsigned int>("repeatNEvents")),
0178       eventAuxs_(cachedWrappers_.size()),
0179       provRetriever_(0),
0180       delayedReaders_(desc.allocations_->numberOfStreams()),
0181       streamToCacheIndex_(desc.allocations_->numberOfStreams(), 0) {
0182   {
0183     getters_.reserve(cachedWrappers_.size());
0184     for (auto& cw : cachedWrappers_) {
0185       getters_.emplace_back(&productIDToWrapperIndex_, &cw);
0186     }
0187 
0188     int index = 0;
0189     std::for_each(delayedReaders_.begin(), delayedReaders_.end(), [&index, this](auto& iR) {
0190       iR.m_streamIndex = index++;
0191       iR.m_source = this;
0192     });
0193   }
0194   auto logicalFileName = pset.getUntrackedParameter<std::string>("fileName");
0195   InputFileCatalog catalog(std::vector<std::string>(1U, logicalFileName), "");
0196   auto const& physicalFileName = catalog.fileCatalogItems().front().fileNames().front();
0197   auto const nEventsToSkip = pset.getUntrackedParameter<unsigned int>("skipEvents");
0198   std::shared_ptr<EventSkipperByID> skipper(EventSkipperByID::create(pset).release());
0199 
0200   auto duplicateChecker = std::make_shared<DuplicateChecker>(pset);
0201 
0202   std::vector<std::shared_ptr<IndexIntoFile>> indexesIntoFiles(1);
0203 
0204   auto input =
0205       std::make_shared<InputFile>(physicalFileName.c_str(), "  Initiating request to open file ", InputType::Primary);
0206   rootFile_ = makeRootFile(
0207       logicalFileName, physicalFileName, 0 != nEventsToSkip, input, skipper, duplicateChecker, indexesIntoFiles);
0208   rootFile_->reportOpened("repeating");
0209 
0210   auto const& prodList = rootFile_->productRegistry()->productList();
0211   productRegistryUpdate().updateFromInput(prodList);
0212 
0213   //setup caching
0214   auto nProdsInEvent =
0215       std::count_if(prodList.begin(), prodList.end(), [](auto&& iV) { return iV.second.branchType() == edm::InEvent; });
0216   {
0217     size_t index = 0;
0218     for (auto& prod : prodList) {
0219       if (prod.second.branchType() == edm::InEvent) {
0220         branchIDToWrapperIndex_[prod.second.branchID()] = index++;
0221       }
0222     }
0223   }
0224   for (auto& cache : cachedWrappers_) {
0225     cache.resize(nProdsInEvent);
0226   }
0227 }
0228 
0229 void RepeatingCachedRootSource::beginJob() {
0230   ProcessConfiguration processConfiguration;
0231   processConfiguration.setParameterSetID(ParameterSet::emptyParameterSetID());
0232   processConfiguration.setProcessConfigurationID();
0233 
0234   //Thinned collection associations are not supported at this time
0235   EventPrincipal eventPrincipal(productRegistry(),
0236                                 branchIDListHelper(),
0237                                 std::make_shared<ThinnedAssociationsHelper>(),
0238                                 processConfiguration,
0239                                 nullptr);
0240   {
0241     RunNumber_t run = 0;
0242     LuminosityBlockNumber_t lumi = 0;
0243     auto itAux = eventAuxs_.begin();
0244     auto itGetter = getters_.begin();
0245     for (auto& cache : cachedWrappers_) {
0246       rootFile_->nextEventEntry();
0247       rootFile_->readCurrentEvent(eventPrincipal);
0248       auto const& aux = eventPrincipal.aux();
0249       *(itAux++) = aux;
0250       if (0 == run) {
0251         run = aux.run();
0252         lumi = aux.luminosityBlock();
0253       } else {
0254         if (run != aux.run()) {
0255           throw cms::Exception("EventsWithDifferentRuns") << "The requested events to cache are from different Runs";
0256         }
0257         if (lumi != aux.luminosityBlock()) {
0258           throw cms::Exception("EventsWithDifferentLuminosityBlocks")
0259               << "The requested events to cache are from different LuminosityBlocks";
0260         }
0261       }
0262       selectionIDs_ = eventPrincipal.eventSelectionIDs();
0263       branchListIndexes_ = eventPrincipal.branchListIndexes();
0264       {
0265         auto reader = eventPrincipal.reader();
0266         auto& getter = *(itGetter++);
0267         for (auto const& branchToIndex : branchIDToWrapperIndex_) {
0268           cache[branchToIndex.second] = reader->getProduct(branchToIndex.first, &getter);
0269         }
0270       }
0271     }
0272     for (auto const& branchToIndex : branchIDToWrapperIndex_) {
0273       auto pid = eventPrincipal.branchIDToProductID(branchToIndex.first);
0274       productIDToWrapperIndex_[pid] = branchToIndex.second;
0275     }
0276     rootFile_->rewind();
0277   }
0278 }
0279 
0280 void RepeatingCachedRootSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
0281   ParameterSetDescription desc;
0282   desc.setComment(
0283       "Read only a few Events from one EDM/Root file, and repeat them in sequence. The Events are required to be from "
0284       "the same Run and LuminosityBlock.");
0285   desc.addUntracked<std::string>("fileName")->setComment("Name of file to be processed.");
0286   desc.addUntracked<unsigned int>("repeatNEvents", 10U)
0287       ->setComment("Number of events to read from file and then repeat in sequence.");
0288   desc.addUntracked<unsigned int>("skipEvents", 0);
0289   ProductSelectorRules::fillDescription(desc, "inputCommands");
0290   InputSource::fillDescription(desc);
0291 
0292   descriptions.add("source", desc);
0293 }
0294 
0295 //
0296 // member functions
0297 //
0298 
0299 std::unique_ptr<RootFile> RepeatingCachedRootSource::makeRootFile(
0300     std::string const& logicalFileName,
0301     std::string const& pName,
0302     bool isSkipping,
0303     std::shared_ptr<InputFile> filePtr,
0304     std::shared_ptr<EventSkipperByID> skipper,
0305     std::shared_ptr<DuplicateChecker> duplicateChecker,
0306     std::vector<std::shared_ptr<IndexIntoFile>>& indexesIntoFiles) {
0307   return std::make_unique<RootFile>(pName,
0308                                     processConfiguration(),
0309                                     logicalFileName,
0310                                     filePtr,
0311                                     skipper,
0312                                     isSkipping,
0313                                     remainingEvents(),
0314                                     remainingLuminosityBlocks(),
0315                                     1,
0316                                     roottree::defaultCacheSize,  //treeCacheSize_,
0317                                     -1,                          //treeMaxVirtualSize(),
0318                                     processingMode(),
0319                                     runHelper_,
0320                                     false,  //noRunLumiSort_
0321                                     true,   //noEventSort_,
0322                                     selectorRules_,
0323                                     InputType::Primary,
0324                                     branchIDListHelper(),
0325                                     processBlockHelper().get(),
0326                                     thinnedAssociationsHelper(),
0327                                     nullptr,  // associationsFromSecondary
0328                                     duplicateChecker,
0329                                     false,  //dropDescendants(),
0330                                     processHistoryRegistryForUpdate(),
0331                                     indexesIntoFiles,
0332                                     0,  //currentIndexIntoFile,
0333                                     orderedProcessHistoryIDs_,
0334                                     false,   //bypassVersionCheck(),
0335                                     true,    //labelRawDataLikeMC(),
0336                                     false,   //usingGoToEvent_,
0337                                     true,    //enablePrefetching_,
0338                                     false);  //enforceGUIDInFileName_);
0339 }
0340 
0341 std::shared_ptr<WrapperBase> RepeatingCachedRootSource::getProduct(unsigned int iStreamIndex,
0342                                                                    BranchID const& k,
0343                                                                    EDProductGetter const* ep) const {
0344   return cachedWrappers_[streamToCacheIndex_[iStreamIndex]][branchIDToWrapperIndex_.find(k)->second];
0345 }
0346 
0347 RepeatingCachedRootSource::ItemTypeInfo RepeatingCachedRootSource::getNextItemType() {
0348   auto v = presentState_;
0349   switch (presentState_) {
0350     case ItemType::IsFile:
0351       presentState_ = ItemType::IsRun;
0352       break;
0353     case ItemType::IsRun:
0354       presentState_ = ItemType::IsLumi;
0355       break;
0356     case ItemType::IsLumi:
0357       presentState_ = ItemType::IsEvent;
0358       break;
0359     default:
0360       break;
0361   }
0362   return v;
0363 }
0364 
0365 void RepeatingCachedRootSource::readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) {
0366   rootFile_->readLuminosityBlock_(lumiPrincipal);
0367 }
0368 
0369 std::shared_ptr<LuminosityBlockAuxiliary> RepeatingCachedRootSource::readLuminosityBlockAuxiliary_() {
0370   return rootFile_->readLuminosityBlockAuxiliary_();
0371 }
0372 void RepeatingCachedRootSource::readEvent_(EventPrincipal& eventPrincipal) {
0373   auto index = eventIndex_++;
0374 
0375   auto repeatedIndex = index % cachedWrappers_.size();
0376 
0377   auto const& aux = eventAuxs_[repeatedIndex];
0378 
0379   auto history = processHistoryRegistry().getMapped(aux.processHistoryID());
0380 
0381   streamToCacheIndex_[eventPrincipal.streamID().value()] = repeatedIndex;
0382   eventPrincipal.fillEventPrincipal(aux,
0383                                     history,
0384                                     selectionIDs_,
0385                                     branchListIndexes_,
0386                                     EventToProcessBlockIndexes(),
0387                                     provRetriever_,
0388                                     &delayedReaders_[eventPrincipal.streamID().value()]);
0389 }
0390 
0391 std::shared_ptr<RunAuxiliary> RepeatingCachedRootSource::readRunAuxiliary_() {
0392   return rootFile_->readRunAuxiliary_();
0393   ;
0394 }
0395 
0396 void RepeatingCachedRootSource::readRun_(RunPrincipal& runPrincipal) { rootFile_->readRun_(runPrincipal); }
0397 
0398 bool RepeatingCachedRootSource::readIt(EventID const& id,
0399                                        EventPrincipal& eventPrincipal,
0400                                        StreamContext& streamContext) {
0401   return false;
0402 }
0403 
0404 void RepeatingCachedRootSource::skip(int offset) {}
0405 
0406 bool RepeatingCachedRootSource::goToEvent_(EventID const& eventID) { return false; }
0407 
0408 void RepeatingCachedRootSource::fillProcessBlockHelper_() { rootFile_->fillProcessBlockHelper_(); }
0409 
0410 bool RepeatingCachedRootSource::nextProcessBlock_(ProcessBlockPrincipal& processBlockPrincipal) {
0411   return rootFile_->nextProcessBlock_(processBlockPrincipal);
0412 }
0413 
0414 void RepeatingCachedRootSource::readProcessBlock_(ProcessBlockPrincipal& processBlockPrincipal) {
0415   rootFile_->readProcessBlock_(processBlockPrincipal);
0416 }
0417 
0418 WrapperBase const* RepeatingCachedRootSource::RCProductGetter::getIt(ProductID const& iPID) const {
0419   auto itFound = map_->find(iPID);
0420   if (itFound == map_->end()) {
0421     return nullptr;
0422   }
0423   return (*wrappers_)[itFound->second].get();
0424 }
0425 
0426 std::optional<std::tuple<WrapperBase const*, unsigned int>>
0427 RepeatingCachedRootSource::RCProductGetter::getThinnedProduct(ProductID const&, unsigned int key) const {
0428   return {};
0429 };
0430 
0431 void RepeatingCachedRootSource::RCProductGetter::getThinnedProducts(ProductID const& pid,
0432                                                                     std::vector<WrapperBase const*>& foundContainers,
0433                                                                     std::vector<unsigned int>& keys) const {}
0434 
0435 OptionalThinnedKey RepeatingCachedRootSource::RCProductGetter::getThinnedKeyFrom(ProductID const& parent,
0436                                                                                  unsigned int key,
0437                                                                                  ProductID const& thinned) const {
0438   return {};
0439 }
0440 unsigned int RepeatingCachedRootSource::RCProductGetter::transitionIndex_() const { return 0; }
0441 
0442 //
0443 // const member functions
0444 //
0445 
0446 //
0447 // static member functions
0448 //
0449 
0450 DEFINE_FWK_INPUT_SOURCE(RepeatingCachedRootSource);