File indexing completed on 2025-05-23 02:05:06
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014 #include <memory>
0015
0016
0017 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0018 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0019 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0020 #include "DataFormats/Provenance/interface/EventToProcessBlockIndexes.h"
0021 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0022 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0023 #include "DataFormats/Common/interface/WrapperBase.h"
0024 #include "DataFormats/Common/interface/EDProductGetter.h"
0025
0026 #include "IOPool/Common/interface/RootServiceChecker.h"
0027
0028 #include "FWCore/Framework/interface/InputSource.h"
0029 #include "FWCore/Framework/interface/ProductSelectorRules.h"
0030 #include "FWCore/Framework/interface/DelayedReader.h"
0031 #include "FWCore/Framework/interface/InputSourceDescription.h"
0032 #include "FWCore/Framework/interface/EventPrincipal.h"
0033 #include "FWCore/Framework/interface/ProductResolversFactory.h"
0034
0035 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0036 #include "FWCore/Catalog/interface/InputFileCatalog.h"
0037 #include "FWCore/Utilities/interface/propagate_const.h"
0038 #include "FWCore/Utilities/interface/Exception.h"
0039 #include "FWCore/Utilities/interface/do_nothing_deleter.h"
0040 #include "FWCore/Sources/interface/EventSkipperByID.h"
0041
0042 #include "FWCore/Framework/interface/InputSourceMacros.h"
0043
0044 #include "RunHelper.h"
0045 #include "RootFile.h"
0046 #include "InputFile.h"
0047 #include "DuplicateChecker.h"
0048
0049 namespace edm {
0050 class RunHelperBase;
0051
0052 class RepeatingCachedRootSource : public InputSource {
0053 public:
0054 RepeatingCachedRootSource(ParameterSet const& pset, InputSourceDescription const& desc);
0055
0056 static void fillDescriptions(ConfigurationDescriptions& descriptions);
0057
0058 std::shared_ptr<WrapperBase> getProduct(unsigned int iStreamIndex,
0059 BranchID const& k,
0060 EDProductGetter const* ep) const;
0061
0062 class RCProductGetter : public EDProductGetter {
0063 public:
0064 RCProductGetter(RCProductGetter const& iOther) : map_(iOther.map_), wrappers_(iOther.wrappers_) {}
0065
0066 RCProductGetter const& operator=(RCProductGetter const& iOther) {
0067 map_ = iOther.map_;
0068 wrappers_ = iOther.wrappers_;
0069 return *this;
0070 }
0071
0072 RCProductGetter(std::map<edm::ProductID, size_t> const* iMap,
0073 std::vector<std::shared_ptr<edm::WrapperBase>> const* iWrappers)
0074 : map_(iMap), wrappers_(iWrappers) {}
0075
0076 WrapperBase const* getIt(ProductID const&) const override;
0077
0078 std::optional<std::tuple<WrapperBase const*, unsigned int>> getThinnedProduct(ProductID const&,
0079 unsigned int key) const override;
0080
0081 void getThinnedProducts(ProductID const& pid,
0082 std::vector<WrapperBase const*>& foundContainers,
0083 std::vector<unsigned int>& keys) const override;
0084
0085 OptionalThinnedKey getThinnedKeyFrom(ProductID const& parent,
0086 unsigned int key,
0087 ProductID const& thinned) const override;
0088
0089 private:
0090 unsigned int transitionIndex_() const override;
0091
0092 std::map<edm::ProductID, size_t> const* map_;
0093 std::vector<std::shared_ptr<edm::WrapperBase>> const* wrappers_;
0094 };
0095
0096 class RCDelayedReader : public edm::DelayedReader {
0097 public:
0098 std::shared_ptr<edm::WrapperBase> getProduct_(edm::BranchID const& k, edm::EDProductGetter const* ep) final {
0099 return m_source->getProduct(m_streamIndex, k, ep);
0100 }
0101 void mergeReaders_(edm::DelayedReader*) final { assert(false); }
0102 void reset_() final {}
0103
0104 unsigned int m_streamIndex;
0105 edm::RepeatingCachedRootSource const* m_source;
0106
0107 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preEventReadFromSourceSignal()
0108 const final {
0109 return nullptr;
0110 }
0111 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postEventReadFromSourceSignal()
0112 const final {
0113 return nullptr;
0114 }
0115 };
0116
0117 protected:
0118 ItemTypeInfo getNextItemType() override;
0119 void readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) override;
0120 std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() override;
0121 void readEvent_(EventPrincipal& eventPrincipal) override;
0122
0123 private:
0124 std::shared_ptr<RunAuxiliary> readRunAuxiliary_() override;
0125 void readRun_(RunPrincipal& runPrincipal) override;
0126 bool readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext) override;
0127 void skip(int offset) override;
0128 bool goToEvent_(EventID const& eventID) override;
0129 void beginJob(ProductRegistry const&) override;
0130
0131 void fillProcessBlockHelper_() override;
0132 bool nextProcessBlock_(ProcessBlockPrincipal&) override;
0133 void readProcessBlock_(ProcessBlockPrincipal&) override;
0134
0135 std::unique_ptr<RootFile> makeRootFile(std::string const& logicalFileName,
0136 std::string const& pName,
0137 bool isSkipping,
0138 std::shared_ptr<InputFile> filePtr,
0139 std::shared_ptr<EventSkipperByID> skipper,
0140 std::shared_ptr<DuplicateChecker> duplicateChecker,
0141 std::vector<std::shared_ptr<IndexIntoFile>>& indexesIntoFiles);
0142
0143 RootServiceChecker rootServiceChecker_;
0144 ProductSelectorRules selectorRules_;
0145 edm::propagate_const<std::unique_ptr<RunHelperBase>> runHelper_;
0146 std::unique_ptr<RootFile> rootFile_;
0147 std::vector<ProcessHistoryID> orderedProcessHistoryIDs_;
0148 std::vector<std::vector<std::shared_ptr<edm::WrapperBase>>> cachedWrappers_;
0149 std::vector<RCProductGetter> getters_;
0150 std::vector<EventAuxiliary> eventAuxs_;
0151 EventSelectionIDVector selectionIDs_;
0152 BranchListIndexes branchListIndexes_;
0153 ProductProvenanceRetriever provRetriever_;
0154 std::vector<RCDelayedReader> delayedReaders_;
0155 std::map<edm::BranchID, size_t> branchIDToWrapperIndex_;
0156 std::map<edm::ProductID, size_t> productIDToWrapperIndex_;
0157 std::vector<size_t> streamToCacheIndex_;
0158 size_t nextEventIndex_ = 0;
0159 ItemType presentState_ = ItemType::IsFile;
0160 unsigned long long eventIndex_ = 0;
0161 };
0162 }
0163
0164 using namespace edm;
0165
0166
0167
0168
0169
0170
0171
0172
0173
0174
0175
0176 RepeatingCachedRootSource::RepeatingCachedRootSource(ParameterSet const& pset, InputSourceDescription const& desc)
0177 : InputSource(pset, desc),
0178 selectorRules_(pset, "inputCommands", "InputSource"),
0179 runHelper_(std::make_unique<DefaultRunHelper>()),
0180 cachedWrappers_(pset.getUntrackedParameter<unsigned int>("repeatNEvents")),
0181 eventAuxs_(cachedWrappers_.size()),
0182 provRetriever_(0),
0183 delayedReaders_(desc.allocations_->numberOfStreams()),
0184 streamToCacheIndex_(desc.allocations_->numberOfStreams(), 0) {
0185 {
0186 getters_.reserve(cachedWrappers_.size());
0187 for (auto& cw : cachedWrappers_) {
0188 getters_.emplace_back(&productIDToWrapperIndex_, &cw);
0189 }
0190
0191 int index = 0;
0192 std::for_each(delayedReaders_.begin(), delayedReaders_.end(), [&index, this](auto& iR) {
0193 iR.m_streamIndex = index++;
0194 iR.m_source = this;
0195 });
0196 }
0197 auto logicalFileName = pset.getUntrackedParameter<std::string>("fileName");
0198 InputFileCatalog catalog(std::vector<std::string>(1U, logicalFileName), "");
0199 auto const& physicalFileName = catalog.fileCatalogItems().front().fileNames().front();
0200 auto const nEventsToSkip = pset.getUntrackedParameter<unsigned int>("skipEvents");
0201 std::shared_ptr<EventSkipperByID> skipper(EventSkipperByID::create(pset).release());
0202
0203 auto duplicateChecker = std::make_shared<DuplicateChecker>(pset);
0204
0205 std::vector<std::shared_ptr<IndexIntoFile>> indexesIntoFiles(1);
0206
0207 auto input =
0208 std::make_shared<InputFile>(physicalFileName.c_str(), " Initiating request to open file ", InputType::Primary);
0209 rootFile_ = makeRootFile(
0210 logicalFileName, physicalFileName, 0 != nEventsToSkip, input, skipper, duplicateChecker, indexesIntoFiles);
0211 rootFile_->reportOpened("repeating");
0212
0213 auto const& prodList = rootFile_->productRegistry()->productList();
0214 productRegistryUpdate().updateFromInput(prodList);
0215
0216
0217 auto nProdsInEvent =
0218 std::count_if(prodList.begin(), prodList.end(), [](auto&& iV) { return iV.second.branchType() == edm::InEvent; });
0219 {
0220 size_t index = 0;
0221 for (auto& prod : prodList) {
0222 if (prod.second.branchType() == edm::InEvent) {
0223 branchIDToWrapperIndex_[prod.second.branchID()] = index++;
0224 }
0225 }
0226 }
0227 for (auto& cache : cachedWrappers_) {
0228 cache.resize(nProdsInEvent);
0229 }
0230 }
0231
0232 void RepeatingCachedRootSource::beginJob(ProductRegistry const&) {
0233 ProcessConfiguration processConfiguration;
0234 processConfiguration.setParameterSetID(ParameterSet::emptyParameterSetID());
0235 processConfiguration.setProcessConfigurationID();
0236
0237
0238
0239 productRegistryUpdate().setFrozen();
0240
0241 EventPrincipal eventPrincipal(std::shared_ptr<ProductRegistry const>(&productRegistry(), do_nothing_deleter()),
0242 edm::productResolversFactory::makePrimary,
0243 branchIDListHelper(),
0244 std::make_shared<ThinnedAssociationsHelper>(),
0245 processConfiguration,
0246 nullptr);
0247 {
0248 RunNumber_t run = 0;
0249 LuminosityBlockNumber_t lumi = 0;
0250 auto itAux = eventAuxs_.begin();
0251 auto itGetter = getters_.begin();
0252 for (auto& cache : cachedWrappers_) {
0253 rootFile_->nextEventEntry();
0254 rootFile_->readCurrentEvent(eventPrincipal);
0255 auto const& aux = eventPrincipal.aux();
0256 *(itAux++) = aux;
0257 if (0 == run) {
0258 run = aux.run();
0259 lumi = aux.luminosityBlock();
0260 } else {
0261 if (run != aux.run()) {
0262 throw cms::Exception("EventsWithDifferentRuns") << "The requested events to cache are from different Runs";
0263 }
0264 if (lumi != aux.luminosityBlock()) {
0265 throw cms::Exception("EventsWithDifferentLuminosityBlocks")
0266 << "The requested events to cache are from different LuminosityBlocks";
0267 }
0268 }
0269 selectionIDs_ = eventPrincipal.eventSelectionIDs();
0270 branchListIndexes_ = eventPrincipal.branchListIndexes();
0271 {
0272 auto reader = eventPrincipal.reader();
0273 auto& getter = *(itGetter++);
0274 for (auto const& branchToIndex : branchIDToWrapperIndex_) {
0275 cache[branchToIndex.second] = reader->getProduct(branchToIndex.first, &getter);
0276 }
0277 }
0278 }
0279 for (auto const& branchToIndex : branchIDToWrapperIndex_) {
0280 auto pid = eventPrincipal.branchIDToProductID(branchToIndex.first);
0281 productIDToWrapperIndex_[pid] = branchToIndex.second;
0282 }
0283 rootFile_->rewind();
0284 }
0285 }
0286
0287 void RepeatingCachedRootSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
0288 ParameterSetDescription desc;
0289 desc.setComment(
0290 "Read only a few Events from one EDM/Root file, and repeat them in sequence. The Events are required to be from "
0291 "the same Run and LuminosityBlock.");
0292 desc.addUntracked<std::string>("fileName")->setComment("Name of file to be processed.");
0293 desc.addUntracked<unsigned int>("repeatNEvents", 10U)
0294 ->setComment("Number of events to read from file and then repeat in sequence.");
0295 desc.addUntracked<unsigned int>("skipEvents", 0);
0296 ProductSelectorRules::fillDescription(desc, "inputCommands");
0297 InputSource::fillDescription(desc);
0298
0299 descriptions.add("source", desc);
0300 }
0301
0302
0303
0304
0305
0306 std::unique_ptr<RootFile> RepeatingCachedRootSource::makeRootFile(
0307 std::string const& logicalFileName,
0308 std::string const& pName,
0309 bool isSkipping,
0310 std::shared_ptr<InputFile> filePtr,
0311 std::shared_ptr<EventSkipperByID> skipper,
0312 std::shared_ptr<DuplicateChecker> duplicateChecker,
0313 std::vector<std::shared_ptr<IndexIntoFile>>& indexesIntoFiles) {
0314 return std::make_unique<RootFile>(
0315 RootFile::FileOptions{.fileName = pName,
0316 .logicalFileName = logicalFileName,
0317 .filePtr = filePtr,
0318 .bypassVersionCheck = false,
0319 .enforceGUIDInFileName = false},
0320 InputType::Primary,
0321 RootFile::ProcessingOptions{.eventSkipperByID = skipper,
0322 .skipAnyEvents = isSkipping,
0323 .remainingEvents = remainingEvents(),
0324 .remainingLumis = remainingLuminosityBlocks(),
0325 .processingMode = processingMode(),
0326 .noRunLumiSort = false,
0327 .noEventSort = true,
0328 .usingGoToEvent = false},
0329 RootFile::TTreeOptions{
0330 .treeCacheSize = roottree::defaultCacheSize, .treeMaxVirtualSize = -1, .enablePrefetching = true},
0331 RootFile::ProductChoices{.productSelectorRules = selectorRules_,
0332 .associationsFromSecondary = nullptr,
0333 .dropDescendantsOfDroppedProducts = false,
0334 .labelRawDataLikeMC = true},
0335 RootFile::CrossFileInfo{.runHelper = runHelper_.get(),
0336 .branchIDListHelper = branchIDListHelper(),
0337 .processBlockHelper = processBlockHelper().get(),
0338 .thinnedAssociationsHelper = thinnedAssociationsHelper(),
0339 .duplicateChecker = duplicateChecker,
0340 .indexesIntoFiles = indexesIntoFiles,
0341 .currentIndexIntoFile = 0},
0342 1,
0343 processHistoryRegistryForUpdate(),
0344 orderedProcessHistoryIDs_);
0345 }
0346
0347 std::shared_ptr<WrapperBase> RepeatingCachedRootSource::getProduct(unsigned int iStreamIndex,
0348 BranchID const& k,
0349 EDProductGetter const* ep) const {
0350 return cachedWrappers_[streamToCacheIndex_[iStreamIndex]][branchIDToWrapperIndex_.find(k)->second];
0351 }
0352
0353 RepeatingCachedRootSource::ItemTypeInfo RepeatingCachedRootSource::getNextItemType() {
0354 auto v = presentState_;
0355 switch (presentState_) {
0356 case ItemType::IsFile:
0357 presentState_ = ItemType::IsRun;
0358 break;
0359 case ItemType::IsRun:
0360 presentState_ = ItemType::IsLumi;
0361 break;
0362 case ItemType::IsLumi:
0363 presentState_ = ItemType::IsEvent;
0364 break;
0365 default:
0366 break;
0367 }
0368 return v;
0369 }
0370
0371 void RepeatingCachedRootSource::readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) {
0372 rootFile_->readLuminosityBlock_(lumiPrincipal);
0373 }
0374
0375 std::shared_ptr<LuminosityBlockAuxiliary> RepeatingCachedRootSource::readLuminosityBlockAuxiliary_() {
0376 return rootFile_->readLuminosityBlockAuxiliary_();
0377 }
0378 void RepeatingCachedRootSource::readEvent_(EventPrincipal& eventPrincipal) {
0379 auto index = eventIndex_++;
0380
0381 auto repeatedIndex = index % cachedWrappers_.size();
0382
0383 auto const& aux = eventAuxs_[repeatedIndex];
0384
0385 auto history = processHistoryRegistry().getMapped(aux.processHistoryID());
0386
0387 streamToCacheIndex_[eventPrincipal.streamID().value()] = repeatedIndex;
0388 eventPrincipal.fillEventPrincipal(aux,
0389 history,
0390 selectionIDs_,
0391 branchListIndexes_,
0392 EventToProcessBlockIndexes(),
0393 provRetriever_,
0394 &delayedReaders_[eventPrincipal.streamID().value()]);
0395 }
0396
0397 std::shared_ptr<RunAuxiliary> RepeatingCachedRootSource::readRunAuxiliary_() {
0398 return rootFile_->readRunAuxiliary_();
0399 ;
0400 }
0401
0402 void RepeatingCachedRootSource::readRun_(RunPrincipal& runPrincipal) { rootFile_->readRun_(runPrincipal); }
0403
0404 bool RepeatingCachedRootSource::readIt(EventID const& id,
0405 EventPrincipal& eventPrincipal,
0406 StreamContext& streamContext) {
0407 return false;
0408 }
0409
0410 void RepeatingCachedRootSource::skip(int offset) {}
0411
0412 bool RepeatingCachedRootSource::goToEvent_(EventID const& eventID) { return false; }
0413
0414 void RepeatingCachedRootSource::fillProcessBlockHelper_() { rootFile_->fillProcessBlockHelper_(); }
0415
0416 bool RepeatingCachedRootSource::nextProcessBlock_(ProcessBlockPrincipal& processBlockPrincipal) {
0417 return rootFile_->nextProcessBlock_(processBlockPrincipal);
0418 }
0419
0420 void RepeatingCachedRootSource::readProcessBlock_(ProcessBlockPrincipal& processBlockPrincipal) {
0421 rootFile_->readProcessBlock_(processBlockPrincipal);
0422 }
0423
0424 WrapperBase const* RepeatingCachedRootSource::RCProductGetter::getIt(ProductID const& iPID) const {
0425 auto itFound = map_->find(iPID);
0426 if (itFound == map_->end()) {
0427 return nullptr;
0428 }
0429 return (*wrappers_)[itFound->second].get();
0430 }
0431
0432 std::optional<std::tuple<WrapperBase const*, unsigned int>>
0433 RepeatingCachedRootSource::RCProductGetter::getThinnedProduct(ProductID const&, unsigned int key) const {
0434 return {};
0435 };
0436
0437 void RepeatingCachedRootSource::RCProductGetter::getThinnedProducts(ProductID const& pid,
0438 std::vector<WrapperBase const*>& foundContainers,
0439 std::vector<unsigned int>& keys) const {}
0440
0441 OptionalThinnedKey RepeatingCachedRootSource::RCProductGetter::getThinnedKeyFrom(ProductID const& parent,
0442 unsigned int key,
0443 ProductID const& thinned) const {
0444 return {};
0445 }
0446 unsigned int RepeatingCachedRootSource::RCProductGetter::transitionIndex_() const { return 0; }
0447
0448
0449
0450
0451
0452
0453
0454
0455
0456 DEFINE_FWK_INPUT_SOURCE(RepeatingCachedRootSource);