File indexing completed on 2025-01-31 23:35:53
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014 #include <memory>
0015
0016
0017 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0018 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0019 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0020 #include "DataFormats/Provenance/interface/EventToProcessBlockIndexes.h"
0021 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0022 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0023 #include "DataFormats/Common/interface/WrapperBase.h"
0024 #include "DataFormats/Common/interface/EDProductGetter.h"
0025
0026 #include "IOPool/Common/interface/RootServiceChecker.h"
0027
0028 #include "FWCore/Framework/interface/InputSource.h"
0029 #include "FWCore/Framework/interface/ProductSelectorRules.h"
0030 #include "FWCore/Framework/interface/DelayedReader.h"
0031 #include "FWCore/Framework/interface/InputSourceDescription.h"
0032 #include "FWCore/Framework/interface/EventPrincipal.h"
0033 #include "FWCore/Framework/interface/ProductResolversFactory.h"
0034
0035 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0036 #include "FWCore/Catalog/interface/InputFileCatalog.h"
0037 #include "FWCore/Utilities/interface/propagate_const.h"
0038 #include "FWCore/Utilities/interface/Exception.h"
0039 #include "FWCore/Utilities/interface/do_nothing_deleter.h"
0040 #include "FWCore/Sources/interface/EventSkipperByID.h"
0041
0042 #include "FWCore/Framework/interface/InputSourceMacros.h"
0043
0044 #include "RunHelper.h"
0045 #include "RootFile.h"
0046 #include "InputFile.h"
0047 #include "DuplicateChecker.h"
0048
0049 namespace edm {
0050 class RunHelperBase;
0051
0052 class RepeatingCachedRootSource : public InputSource {
0053 public:
0054 RepeatingCachedRootSource(ParameterSet const& pset, InputSourceDescription const& desc);
0055
0056 static void fillDescriptions(ConfigurationDescriptions& descriptions);
0057
0058 std::shared_ptr<WrapperBase> getProduct(unsigned int iStreamIndex,
0059 BranchID const& k,
0060 EDProductGetter const* ep) const;
0061
0062 class RCProductGetter : public EDProductGetter {
0063 public:
0064 RCProductGetter(RCProductGetter const& iOther) : map_(iOther.map_), wrappers_(iOther.wrappers_) {}
0065
0066 RCProductGetter const& operator=(RCProductGetter const& iOther) {
0067 map_ = iOther.map_;
0068 wrappers_ = iOther.wrappers_;
0069 return *this;
0070 }
0071
0072 RCProductGetter(std::map<edm::ProductID, size_t> const* iMap,
0073 std::vector<std::shared_ptr<edm::WrapperBase>> const* iWrappers)
0074 : map_(iMap), wrappers_(iWrappers) {}
0075
0076 WrapperBase const* getIt(ProductID const&) const override;
0077
0078 std::optional<std::tuple<WrapperBase const*, unsigned int>> getThinnedProduct(ProductID const&,
0079 unsigned int key) const override;
0080
0081 void getThinnedProducts(ProductID const& pid,
0082 std::vector<WrapperBase const*>& foundContainers,
0083 std::vector<unsigned int>& keys) const override;
0084
0085 OptionalThinnedKey getThinnedKeyFrom(ProductID const& parent,
0086 unsigned int key,
0087 ProductID const& thinned) const override;
0088
0089 private:
0090 unsigned int transitionIndex_() const override;
0091
0092 std::map<edm::ProductID, size_t> const* map_;
0093 std::vector<std::shared_ptr<edm::WrapperBase>> const* wrappers_;
0094 };
0095
0096 class RCDelayedReader : public edm::DelayedReader {
0097 public:
0098 std::shared_ptr<edm::WrapperBase> getProduct_(edm::BranchID const& k, edm::EDProductGetter const* ep) final {
0099 return m_source->getProduct(m_streamIndex, k, ep);
0100 }
0101 void mergeReaders_(edm::DelayedReader*) final { assert(false); }
0102 void reset_() final {}
0103
0104 unsigned int m_streamIndex;
0105 edm::RepeatingCachedRootSource const* m_source;
0106
0107 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preEventReadFromSourceSignal()
0108 const final {
0109 return nullptr;
0110 }
0111 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postEventReadFromSourceSignal()
0112 const final {
0113 return nullptr;
0114 }
0115 };
0116
0117 protected:
0118 ItemTypeInfo getNextItemType() override;
0119 void readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) override;
0120 std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() override;
0121 void readEvent_(EventPrincipal& eventPrincipal) override;
0122
0123 private:
0124 std::shared_ptr<RunAuxiliary> readRunAuxiliary_() override;
0125 void readRun_(RunPrincipal& runPrincipal) override;
0126 bool readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext) override;
0127 void skip(int offset) override;
0128 bool goToEvent_(EventID const& eventID) override;
0129 void beginJob(ProductRegistry const&) override;
0130
0131 void fillProcessBlockHelper_() override;
0132 bool nextProcessBlock_(ProcessBlockPrincipal&) override;
0133 void readProcessBlock_(ProcessBlockPrincipal&) override;
0134
0135 std::unique_ptr<RootFile> makeRootFile(std::string const& logicalFileName,
0136 std::string const& pName,
0137 bool isSkipping,
0138 std::shared_ptr<InputFile> filePtr,
0139 std::shared_ptr<EventSkipperByID> skipper,
0140 std::shared_ptr<DuplicateChecker> duplicateChecker,
0141 std::vector<std::shared_ptr<IndexIntoFile>>& indexesIntoFiles);
0142
0143 RootServiceChecker rootServiceChecker_;
0144 ProductSelectorRules selectorRules_;
0145 edm::propagate_const<std::unique_ptr<RunHelperBase>> runHelper_;
0146 std::unique_ptr<RootFile> rootFile_;
0147 std::vector<ProcessHistoryID> orderedProcessHistoryIDs_;
0148 std::vector<std::vector<std::shared_ptr<edm::WrapperBase>>> cachedWrappers_;
0149 std::vector<RCProductGetter> getters_;
0150 std::vector<EventAuxiliary> eventAuxs_;
0151 EventSelectionIDVector selectionIDs_;
0152 BranchListIndexes branchListIndexes_;
0153 ProductProvenanceRetriever provRetriever_;
0154 std::vector<RCDelayedReader> delayedReaders_;
0155 std::map<edm::BranchID, size_t> branchIDToWrapperIndex_;
0156 std::map<edm::ProductID, size_t> productIDToWrapperIndex_;
0157 std::vector<size_t> streamToCacheIndex_;
0158 size_t nextEventIndex_ = 0;
0159 ItemType presentState_ = ItemType::IsFile;
0160 unsigned long long eventIndex_ = 0;
0161 };
0162 }
0163
0164 using namespace edm;
0165
0166
0167
0168
0169
0170
0171
0172
0173
0174
0175
0176 RepeatingCachedRootSource::RepeatingCachedRootSource(ParameterSet const& pset, InputSourceDescription const& desc)
0177 : InputSource(pset, desc),
0178 selectorRules_(pset, "inputCommands", "InputSource"),
0179 runHelper_(std::make_unique<DefaultRunHelper>()),
0180 cachedWrappers_(pset.getUntrackedParameter<unsigned int>("repeatNEvents")),
0181 eventAuxs_(cachedWrappers_.size()),
0182 provRetriever_(0),
0183 delayedReaders_(desc.allocations_->numberOfStreams()),
0184 streamToCacheIndex_(desc.allocations_->numberOfStreams(), 0) {
0185 {
0186 getters_.reserve(cachedWrappers_.size());
0187 for (auto& cw : cachedWrappers_) {
0188 getters_.emplace_back(&productIDToWrapperIndex_, &cw);
0189 }
0190
0191 int index = 0;
0192 std::for_each(delayedReaders_.begin(), delayedReaders_.end(), [&index, this](auto& iR) {
0193 iR.m_streamIndex = index++;
0194 iR.m_source = this;
0195 });
0196 }
0197 auto logicalFileName = pset.getUntrackedParameter<std::string>("fileName");
0198 InputFileCatalog catalog(std::vector<std::string>(1U, logicalFileName), "");
0199 auto const& physicalFileName = catalog.fileCatalogItems().front().fileNames().front();
0200 auto const nEventsToSkip = pset.getUntrackedParameter<unsigned int>("skipEvents");
0201 std::shared_ptr<EventSkipperByID> skipper(EventSkipperByID::create(pset).release());
0202
0203 auto duplicateChecker = std::make_shared<DuplicateChecker>(pset);
0204
0205 std::vector<std::shared_ptr<IndexIntoFile>> indexesIntoFiles(1);
0206
0207 auto input =
0208 std::make_shared<InputFile>(physicalFileName.c_str(), " Initiating request to open file ", InputType::Primary);
0209 rootFile_ = makeRootFile(
0210 logicalFileName, physicalFileName, 0 != nEventsToSkip, input, skipper, duplicateChecker, indexesIntoFiles);
0211 rootFile_->reportOpened("repeating");
0212
0213 auto const& prodList = rootFile_->productRegistry()->productList();
0214 productRegistryUpdate().updateFromInput(prodList);
0215
0216
0217 auto nProdsInEvent =
0218 std::count_if(prodList.begin(), prodList.end(), [](auto&& iV) { return iV.second.branchType() == edm::InEvent; });
0219 {
0220 size_t index = 0;
0221 for (auto& prod : prodList) {
0222 if (prod.second.branchType() == edm::InEvent) {
0223 branchIDToWrapperIndex_[prod.second.branchID()] = index++;
0224 }
0225 }
0226 }
0227 for (auto& cache : cachedWrappers_) {
0228 cache.resize(nProdsInEvent);
0229 }
0230 }
0231
0232 void RepeatingCachedRootSource::beginJob(ProductRegistry const&) {
0233 ProcessConfiguration processConfiguration;
0234 processConfiguration.setParameterSetID(ParameterSet::emptyParameterSetID());
0235 processConfiguration.setProcessConfigurationID();
0236
0237
0238
0239 productRegistryUpdate().setFrozen();
0240
0241 EventPrincipal eventPrincipal(std::shared_ptr<ProductRegistry const>(&productRegistry(), do_nothing_deleter()),
0242 edm::productResolversFactory::makePrimary,
0243 branchIDListHelper(),
0244 std::make_shared<ThinnedAssociationsHelper>(),
0245 processConfiguration,
0246 nullptr);
0247 {
0248 RunNumber_t run = 0;
0249 LuminosityBlockNumber_t lumi = 0;
0250 auto itAux = eventAuxs_.begin();
0251 auto itGetter = getters_.begin();
0252 for (auto& cache : cachedWrappers_) {
0253 rootFile_->nextEventEntry();
0254 rootFile_->readCurrentEvent(eventPrincipal);
0255 auto const& aux = eventPrincipal.aux();
0256 *(itAux++) = aux;
0257 if (0 == run) {
0258 run = aux.run();
0259 lumi = aux.luminosityBlock();
0260 } else {
0261 if (run != aux.run()) {
0262 throw cms::Exception("EventsWithDifferentRuns") << "The requested events to cache are from different Runs";
0263 }
0264 if (lumi != aux.luminosityBlock()) {
0265 throw cms::Exception("EventsWithDifferentLuminosityBlocks")
0266 << "The requested events to cache are from different LuminosityBlocks";
0267 }
0268 }
0269 selectionIDs_ = eventPrincipal.eventSelectionIDs();
0270 branchListIndexes_ = eventPrincipal.branchListIndexes();
0271 {
0272 auto reader = eventPrincipal.reader();
0273 auto& getter = *(itGetter++);
0274 for (auto const& branchToIndex : branchIDToWrapperIndex_) {
0275 cache[branchToIndex.second] = reader->getProduct(branchToIndex.first, &getter);
0276 }
0277 }
0278 }
0279 for (auto const& branchToIndex : branchIDToWrapperIndex_) {
0280 auto pid = eventPrincipal.branchIDToProductID(branchToIndex.first);
0281 productIDToWrapperIndex_[pid] = branchToIndex.second;
0282 }
0283 rootFile_->rewind();
0284 }
0285 }
0286
0287 void RepeatingCachedRootSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
0288 ParameterSetDescription desc;
0289 desc.setComment(
0290 "Read only a few Events from one EDM/Root file, and repeat them in sequence. The Events are required to be from "
0291 "the same Run and LuminosityBlock.");
0292 desc.addUntracked<std::string>("fileName")->setComment("Name of file to be processed.");
0293 desc.addUntracked<unsigned int>("repeatNEvents", 10U)
0294 ->setComment("Number of events to read from file and then repeat in sequence.");
0295 desc.addUntracked<unsigned int>("skipEvents", 0);
0296 ProductSelectorRules::fillDescription(desc, "inputCommands");
0297 InputSource::fillDescription(desc);
0298
0299 descriptions.add("source", desc);
0300 }
0301
0302
0303
0304
0305
0306 std::unique_ptr<RootFile> RepeatingCachedRootSource::makeRootFile(
0307 std::string const& logicalFileName,
0308 std::string const& pName,
0309 bool isSkipping,
0310 std::shared_ptr<InputFile> filePtr,
0311 std::shared_ptr<EventSkipperByID> skipper,
0312 std::shared_ptr<DuplicateChecker> duplicateChecker,
0313 std::vector<std::shared_ptr<IndexIntoFile>>& indexesIntoFiles) {
0314 return std::make_unique<RootFile>(pName,
0315 processConfiguration(),
0316 logicalFileName,
0317 filePtr,
0318 skipper,
0319 isSkipping,
0320 remainingEvents(),
0321 remainingLuminosityBlocks(),
0322 1,
0323 roottree::defaultCacheSize,
0324 -1,
0325 processingMode(),
0326 runHelper_,
0327 false,
0328 true,
0329 selectorRules_,
0330 InputType::Primary,
0331 branchIDListHelper(),
0332 processBlockHelper().get(),
0333 thinnedAssociationsHelper(),
0334 nullptr,
0335 duplicateChecker,
0336 false,
0337 processHistoryRegistryForUpdate(),
0338 indexesIntoFiles,
0339 0,
0340 orderedProcessHistoryIDs_,
0341 false,
0342 true,
0343 false,
0344 true,
0345 false);
0346 }
0347
0348 std::shared_ptr<WrapperBase> RepeatingCachedRootSource::getProduct(unsigned int iStreamIndex,
0349 BranchID const& k,
0350 EDProductGetter const* ep) const {
0351 return cachedWrappers_[streamToCacheIndex_[iStreamIndex]][branchIDToWrapperIndex_.find(k)->second];
0352 }
0353
0354 RepeatingCachedRootSource::ItemTypeInfo RepeatingCachedRootSource::getNextItemType() {
0355 auto v = presentState_;
0356 switch (presentState_) {
0357 case ItemType::IsFile:
0358 presentState_ = ItemType::IsRun;
0359 break;
0360 case ItemType::IsRun:
0361 presentState_ = ItemType::IsLumi;
0362 break;
0363 case ItemType::IsLumi:
0364 presentState_ = ItemType::IsEvent;
0365 break;
0366 default:
0367 break;
0368 }
0369 return v;
0370 }
0371
0372 void RepeatingCachedRootSource::readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) {
0373 rootFile_->readLuminosityBlock_(lumiPrincipal);
0374 }
0375
0376 std::shared_ptr<LuminosityBlockAuxiliary> RepeatingCachedRootSource::readLuminosityBlockAuxiliary_() {
0377 return rootFile_->readLuminosityBlockAuxiliary_();
0378 }
0379 void RepeatingCachedRootSource::readEvent_(EventPrincipal& eventPrincipal) {
0380 auto index = eventIndex_++;
0381
0382 auto repeatedIndex = index % cachedWrappers_.size();
0383
0384 auto const& aux = eventAuxs_[repeatedIndex];
0385
0386 auto history = processHistoryRegistry().getMapped(aux.processHistoryID());
0387
0388 streamToCacheIndex_[eventPrincipal.streamID().value()] = repeatedIndex;
0389 eventPrincipal.fillEventPrincipal(aux,
0390 history,
0391 selectionIDs_,
0392 branchListIndexes_,
0393 EventToProcessBlockIndexes(),
0394 provRetriever_,
0395 &delayedReaders_[eventPrincipal.streamID().value()]);
0396 }
0397
0398 std::shared_ptr<RunAuxiliary> RepeatingCachedRootSource::readRunAuxiliary_() {
0399 return rootFile_->readRunAuxiliary_();
0400 ;
0401 }
0402
0403 void RepeatingCachedRootSource::readRun_(RunPrincipal& runPrincipal) { rootFile_->readRun_(runPrincipal); }
0404
0405 bool RepeatingCachedRootSource::readIt(EventID const& id,
0406 EventPrincipal& eventPrincipal,
0407 StreamContext& streamContext) {
0408 return false;
0409 }
0410
0411 void RepeatingCachedRootSource::skip(int offset) {}
0412
0413 bool RepeatingCachedRootSource::goToEvent_(EventID const& eventID) { return false; }
0414
0415 void RepeatingCachedRootSource::fillProcessBlockHelper_() { rootFile_->fillProcessBlockHelper_(); }
0416
0417 bool RepeatingCachedRootSource::nextProcessBlock_(ProcessBlockPrincipal& processBlockPrincipal) {
0418 return rootFile_->nextProcessBlock_(processBlockPrincipal);
0419 }
0420
0421 void RepeatingCachedRootSource::readProcessBlock_(ProcessBlockPrincipal& processBlockPrincipal) {
0422 rootFile_->readProcessBlock_(processBlockPrincipal);
0423 }
0424
0425 WrapperBase const* RepeatingCachedRootSource::RCProductGetter::getIt(ProductID const& iPID) const {
0426 auto itFound = map_->find(iPID);
0427 if (itFound == map_->end()) {
0428 return nullptr;
0429 }
0430 return (*wrappers_)[itFound->second].get();
0431 }
0432
0433 std::optional<std::tuple<WrapperBase const*, unsigned int>>
0434 RepeatingCachedRootSource::RCProductGetter::getThinnedProduct(ProductID const&, unsigned int key) const {
0435 return {};
0436 };
0437
0438 void RepeatingCachedRootSource::RCProductGetter::getThinnedProducts(ProductID const& pid,
0439 std::vector<WrapperBase const*>& foundContainers,
0440 std::vector<unsigned int>& keys) const {}
0441
0442 OptionalThinnedKey RepeatingCachedRootSource::RCProductGetter::getThinnedKeyFrom(ProductID const& parent,
0443 unsigned int key,
0444 ProductID const& thinned) const {
0445 return {};
0446 }
0447 unsigned int RepeatingCachedRootSource::RCProductGetter::transitionIndex_() const { return 0; }
0448
0449
0450
0451
0452
0453
0454
0455
0456
0457 DEFINE_FWK_INPUT_SOURCE(RepeatingCachedRootSource);