File indexing completed on 2024-04-06 12:19:06
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014 #include <memory>
0015
0016
0017 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0018 #include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
0019 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0020 #include "DataFormats/Provenance/interface/EventToProcessBlockIndexes.h"
0021 #include "DataFormats/Provenance/interface/ProcessConfiguration.h"
0022 #include "DataFormats/Provenance/interface/ThinnedAssociationsHelper.h"
0023 #include "DataFormats/Common/interface/WrapperBase.h"
0024 #include "DataFormats/Common/interface/EDProductGetter.h"
0025
0026 #include "IOPool/Common/interface/RootServiceChecker.h"
0027
0028 #include "FWCore/Framework/interface/InputSource.h"
0029 #include "FWCore/Framework/interface/ProductSelectorRules.h"
0030 #include "FWCore/Framework/interface/DelayedReader.h"
0031 #include "FWCore/Framework/interface/InputSourceDescription.h"
0032 #include "FWCore/Framework/interface/EventPrincipal.h"
0033 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0034 #include "FWCore/Catalog/interface/InputFileCatalog.h"
0035 #include "FWCore/Utilities/interface/propagate_const.h"
0036 #include "FWCore/Utilities/interface/Exception.h"
0037 #include "FWCore/Sources/interface/EventSkipperByID.h"
0038
0039 #include "FWCore/Framework/interface/InputSourceMacros.h"
0040
0041 #include "RunHelper.h"
0042 #include "RootFile.h"
0043 #include "InputFile.h"
0044 #include "DuplicateChecker.h"
0045
0046 namespace edm {
0047 class RunHelperBase;
0048
0049 class RepeatingCachedRootSource : public InputSource {
0050 public:
0051 RepeatingCachedRootSource(ParameterSet const& pset, InputSourceDescription const& desc);
0052
0053 static void fillDescriptions(ConfigurationDescriptions& descriptions);
0054
0055 std::shared_ptr<WrapperBase> getProduct(unsigned int iStreamIndex,
0056 BranchID const& k,
0057 EDProductGetter const* ep) const;
0058
0059 class RCProductGetter : public EDProductGetter {
0060 public:
0061 RCProductGetter(RCProductGetter const& iOther) : map_(iOther.map_), wrappers_(iOther.wrappers_) {}
0062
0063 RCProductGetter const& operator=(RCProductGetter const& iOther) {
0064 map_ = iOther.map_;
0065 wrappers_ = iOther.wrappers_;
0066 return *this;
0067 }
0068
0069 RCProductGetter(std::map<edm::ProductID, size_t> const* iMap,
0070 std::vector<std::shared_ptr<edm::WrapperBase>> const* iWrappers)
0071 : map_(iMap), wrappers_(iWrappers) {}
0072
0073 WrapperBase const* getIt(ProductID const&) const override;
0074
0075 std::optional<std::tuple<WrapperBase const*, unsigned int>> getThinnedProduct(ProductID const&,
0076 unsigned int key) const override;
0077
0078 void getThinnedProducts(ProductID const& pid,
0079 std::vector<WrapperBase const*>& foundContainers,
0080 std::vector<unsigned int>& keys) const override;
0081
0082 OptionalThinnedKey getThinnedKeyFrom(ProductID const& parent,
0083 unsigned int key,
0084 ProductID const& thinned) const override;
0085
0086 private:
0087 unsigned int transitionIndex_() const override;
0088
0089 std::map<edm::ProductID, size_t> const* map_;
0090 std::vector<std::shared_ptr<edm::WrapperBase>> const* wrappers_;
0091 };
0092
0093 class RCDelayedReader : public edm::DelayedReader {
0094 public:
0095 std::shared_ptr<edm::WrapperBase> getProduct_(edm::BranchID const& k, edm::EDProductGetter const* ep) final {
0096 return m_source->getProduct(m_streamIndex, k, ep);
0097 }
0098 void mergeReaders_(edm::DelayedReader*) final { assert(false); }
0099 void reset_() final {}
0100
0101 unsigned int m_streamIndex;
0102 edm::RepeatingCachedRootSource const* m_source;
0103
0104 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preEventReadFromSourceSignal()
0105 const final {
0106 return nullptr;
0107 }
0108 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postEventReadFromSourceSignal()
0109 const final {
0110 return nullptr;
0111 }
0112 };
0113
0114 protected:
0115 ItemTypeInfo getNextItemType() override;
0116 void readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) override;
0117 std::shared_ptr<LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() override;
0118 void readEvent_(EventPrincipal& eventPrincipal) override;
0119
0120 private:
0121 std::shared_ptr<RunAuxiliary> readRunAuxiliary_() override;
0122 void readRun_(RunPrincipal& runPrincipal) override;
0123 bool readIt(EventID const& id, EventPrincipal& eventPrincipal, StreamContext& streamContext) override;
0124 void skip(int offset) override;
0125 bool goToEvent_(EventID const& eventID) override;
0126 void beginJob() override;
0127
0128 void fillProcessBlockHelper_() override;
0129 bool nextProcessBlock_(ProcessBlockPrincipal&) override;
0130 void readProcessBlock_(ProcessBlockPrincipal&) override;
0131
0132 std::unique_ptr<RootFile> makeRootFile(std::string const& logicalFileName,
0133 std::string const& pName,
0134 bool isSkipping,
0135 std::shared_ptr<InputFile> filePtr,
0136 std::shared_ptr<EventSkipperByID> skipper,
0137 std::shared_ptr<DuplicateChecker> duplicateChecker,
0138 std::vector<std::shared_ptr<IndexIntoFile>>& indexesIntoFiles);
0139
0140 RootServiceChecker rootServiceChecker_;
0141 ProductSelectorRules selectorRules_;
0142 edm::propagate_const<std::unique_ptr<RunHelperBase>> runHelper_;
0143 std::unique_ptr<RootFile> rootFile_;
0144 std::vector<ProcessHistoryID> orderedProcessHistoryIDs_;
0145 std::vector<std::vector<std::shared_ptr<edm::WrapperBase>>> cachedWrappers_;
0146 std::vector<RCProductGetter> getters_;
0147 std::vector<EventAuxiliary> eventAuxs_;
0148 EventSelectionIDVector selectionIDs_;
0149 BranchListIndexes branchListIndexes_;
0150 ProductProvenanceRetriever provRetriever_;
0151 std::vector<RCDelayedReader> delayedReaders_;
0152 std::map<edm::BranchID, size_t> branchIDToWrapperIndex_;
0153 std::map<edm::ProductID, size_t> productIDToWrapperIndex_;
0154 std::vector<size_t> streamToCacheIndex_;
0155 size_t nextEventIndex_ = 0;
0156 ItemType presentState_ = ItemType::IsFile;
0157 unsigned long long eventIndex_ = 0;
0158 };
0159 }
0160
0161 using namespace edm;
0162
0163
0164
0165
0166
0167
0168
0169
0170
0171
0172
0173 RepeatingCachedRootSource::RepeatingCachedRootSource(ParameterSet const& pset, InputSourceDescription const& desc)
0174 : InputSource(pset, desc),
0175 selectorRules_(pset, "inputCommands", "InputSource"),
0176 runHelper_(std::make_unique<DefaultRunHelper>()),
0177 cachedWrappers_(pset.getUntrackedParameter<unsigned int>("repeatNEvents")),
0178 eventAuxs_(cachedWrappers_.size()),
0179 provRetriever_(0),
0180 delayedReaders_(desc.allocations_->numberOfStreams()),
0181 streamToCacheIndex_(desc.allocations_->numberOfStreams(), 0) {
0182 {
0183 getters_.reserve(cachedWrappers_.size());
0184 for (auto& cw : cachedWrappers_) {
0185 getters_.emplace_back(&productIDToWrapperIndex_, &cw);
0186 }
0187
0188 int index = 0;
0189 std::for_each(delayedReaders_.begin(), delayedReaders_.end(), [&index, this](auto& iR) {
0190 iR.m_streamIndex = index++;
0191 iR.m_source = this;
0192 });
0193 }
0194 auto logicalFileName = pset.getUntrackedParameter<std::string>("fileName");
0195 InputFileCatalog catalog(std::vector<std::string>(1U, logicalFileName), "");
0196 auto const& physicalFileName = catalog.fileCatalogItems().front().fileNames().front();
0197 auto const nEventsToSkip = pset.getUntrackedParameter<unsigned int>("skipEvents");
0198 std::shared_ptr<EventSkipperByID> skipper(EventSkipperByID::create(pset).release());
0199
0200 auto duplicateChecker = std::make_shared<DuplicateChecker>(pset);
0201
0202 std::vector<std::shared_ptr<IndexIntoFile>> indexesIntoFiles(1);
0203
0204 auto input =
0205 std::make_shared<InputFile>(physicalFileName.c_str(), " Initiating request to open file ", InputType::Primary);
0206 rootFile_ = makeRootFile(
0207 logicalFileName, physicalFileName, 0 != nEventsToSkip, input, skipper, duplicateChecker, indexesIntoFiles);
0208 rootFile_->reportOpened("repeating");
0209
0210 auto const& prodList = rootFile_->productRegistry()->productList();
0211 productRegistryUpdate().updateFromInput(prodList);
0212
0213
0214 auto nProdsInEvent =
0215 std::count_if(prodList.begin(), prodList.end(), [](auto&& iV) { return iV.second.branchType() == edm::InEvent; });
0216 {
0217 size_t index = 0;
0218 for (auto& prod : prodList) {
0219 if (prod.second.branchType() == edm::InEvent) {
0220 branchIDToWrapperIndex_[prod.second.branchID()] = index++;
0221 }
0222 }
0223 }
0224 for (auto& cache : cachedWrappers_) {
0225 cache.resize(nProdsInEvent);
0226 }
0227 }
0228
0229 void RepeatingCachedRootSource::beginJob() {
0230 ProcessConfiguration processConfiguration;
0231 processConfiguration.setParameterSetID(ParameterSet::emptyParameterSetID());
0232 processConfiguration.setProcessConfigurationID();
0233
0234
0235 EventPrincipal eventPrincipal(productRegistry(),
0236 branchIDListHelper(),
0237 std::make_shared<ThinnedAssociationsHelper>(),
0238 processConfiguration,
0239 nullptr);
0240 {
0241 RunNumber_t run = 0;
0242 LuminosityBlockNumber_t lumi = 0;
0243 auto itAux = eventAuxs_.begin();
0244 auto itGetter = getters_.begin();
0245 for (auto& cache : cachedWrappers_) {
0246 rootFile_->nextEventEntry();
0247 rootFile_->readCurrentEvent(eventPrincipal);
0248 auto const& aux = eventPrincipal.aux();
0249 *(itAux++) = aux;
0250 if (0 == run) {
0251 run = aux.run();
0252 lumi = aux.luminosityBlock();
0253 } else {
0254 if (run != aux.run()) {
0255 throw cms::Exception("EventsWithDifferentRuns") << "The requested events to cache are from different Runs";
0256 }
0257 if (lumi != aux.luminosityBlock()) {
0258 throw cms::Exception("EventsWithDifferentLuminosityBlocks")
0259 << "The requested events to cache are from different LuminosityBlocks";
0260 }
0261 }
0262 selectionIDs_ = eventPrincipal.eventSelectionIDs();
0263 branchListIndexes_ = eventPrincipal.branchListIndexes();
0264 {
0265 auto reader = eventPrincipal.reader();
0266 auto& getter = *(itGetter++);
0267 for (auto const& branchToIndex : branchIDToWrapperIndex_) {
0268 cache[branchToIndex.second] = reader->getProduct(branchToIndex.first, &getter);
0269 }
0270 }
0271 }
0272 for (auto const& branchToIndex : branchIDToWrapperIndex_) {
0273 auto pid = eventPrincipal.branchIDToProductID(branchToIndex.first);
0274 productIDToWrapperIndex_[pid] = branchToIndex.second;
0275 }
0276 rootFile_->rewind();
0277 }
0278 }
0279
0280 void RepeatingCachedRootSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
0281 ParameterSetDescription desc;
0282 desc.setComment(
0283 "Read only a few Events from one EDM/Root file, and repeat them in sequence. The Events are required to be from "
0284 "the same Run and LuminosityBlock.");
0285 desc.addUntracked<std::string>("fileName")->setComment("Name of file to be processed.");
0286 desc.addUntracked<unsigned int>("repeatNEvents", 10U)
0287 ->setComment("Number of events to read from file and then repeat in sequence.");
0288 desc.addUntracked<unsigned int>("skipEvents", 0);
0289 ProductSelectorRules::fillDescription(desc, "inputCommands");
0290 InputSource::fillDescription(desc);
0291
0292 descriptions.add("source", desc);
0293 }
0294
0295
0296
0297
0298
0299 std::unique_ptr<RootFile> RepeatingCachedRootSource::makeRootFile(
0300 std::string const& logicalFileName,
0301 std::string const& pName,
0302 bool isSkipping,
0303 std::shared_ptr<InputFile> filePtr,
0304 std::shared_ptr<EventSkipperByID> skipper,
0305 std::shared_ptr<DuplicateChecker> duplicateChecker,
0306 std::vector<std::shared_ptr<IndexIntoFile>>& indexesIntoFiles) {
0307 return std::make_unique<RootFile>(pName,
0308 processConfiguration(),
0309 logicalFileName,
0310 filePtr,
0311 skipper,
0312 isSkipping,
0313 remainingEvents(),
0314 remainingLuminosityBlocks(),
0315 1,
0316 roottree::defaultCacheSize,
0317 -1,
0318 processingMode(),
0319 runHelper_,
0320 false,
0321 true,
0322 selectorRules_,
0323 InputType::Primary,
0324 branchIDListHelper(),
0325 processBlockHelper().get(),
0326 thinnedAssociationsHelper(),
0327 nullptr,
0328 duplicateChecker,
0329 false,
0330 processHistoryRegistryForUpdate(),
0331 indexesIntoFiles,
0332 0,
0333 orderedProcessHistoryIDs_,
0334 false,
0335 true,
0336 false,
0337 true,
0338 false);
0339 }
0340
0341 std::shared_ptr<WrapperBase> RepeatingCachedRootSource::getProduct(unsigned int iStreamIndex,
0342 BranchID const& k,
0343 EDProductGetter const* ep) const {
0344 return cachedWrappers_[streamToCacheIndex_[iStreamIndex]][branchIDToWrapperIndex_.find(k)->second];
0345 }
0346
0347 RepeatingCachedRootSource::ItemTypeInfo RepeatingCachedRootSource::getNextItemType() {
0348 auto v = presentState_;
0349 switch (presentState_) {
0350 case ItemType::IsFile:
0351 presentState_ = ItemType::IsRun;
0352 break;
0353 case ItemType::IsRun:
0354 presentState_ = ItemType::IsLumi;
0355 break;
0356 case ItemType::IsLumi:
0357 presentState_ = ItemType::IsEvent;
0358 break;
0359 default:
0360 break;
0361 }
0362 return v;
0363 }
0364
0365 void RepeatingCachedRootSource::readLuminosityBlock_(LuminosityBlockPrincipal& lumiPrincipal) {
0366 rootFile_->readLuminosityBlock_(lumiPrincipal);
0367 }
0368
0369 std::shared_ptr<LuminosityBlockAuxiliary> RepeatingCachedRootSource::readLuminosityBlockAuxiliary_() {
0370 return rootFile_->readLuminosityBlockAuxiliary_();
0371 }
0372 void RepeatingCachedRootSource::readEvent_(EventPrincipal& eventPrincipal) {
0373 auto index = eventIndex_++;
0374
0375 auto repeatedIndex = index % cachedWrappers_.size();
0376
0377 auto const& aux = eventAuxs_[repeatedIndex];
0378
0379 auto history = processHistoryRegistry().getMapped(aux.processHistoryID());
0380
0381 streamToCacheIndex_[eventPrincipal.streamID().value()] = repeatedIndex;
0382 eventPrincipal.fillEventPrincipal(aux,
0383 history,
0384 selectionIDs_,
0385 branchListIndexes_,
0386 EventToProcessBlockIndexes(),
0387 provRetriever_,
0388 &delayedReaders_[eventPrincipal.streamID().value()]);
0389 }
0390
0391 std::shared_ptr<RunAuxiliary> RepeatingCachedRootSource::readRunAuxiliary_() {
0392 return rootFile_->readRunAuxiliary_();
0393 ;
0394 }
0395
0396 void RepeatingCachedRootSource::readRun_(RunPrincipal& runPrincipal) { rootFile_->readRun_(runPrincipal); }
0397
0398 bool RepeatingCachedRootSource::readIt(EventID const& id,
0399 EventPrincipal& eventPrincipal,
0400 StreamContext& streamContext) {
0401 return false;
0402 }
0403
0404 void RepeatingCachedRootSource::skip(int offset) {}
0405
0406 bool RepeatingCachedRootSource::goToEvent_(EventID const& eventID) { return false; }
0407
0408 void RepeatingCachedRootSource::fillProcessBlockHelper_() { rootFile_->fillProcessBlockHelper_(); }
0409
0410 bool RepeatingCachedRootSource::nextProcessBlock_(ProcessBlockPrincipal& processBlockPrincipal) {
0411 return rootFile_->nextProcessBlock_(processBlockPrincipal);
0412 }
0413
0414 void RepeatingCachedRootSource::readProcessBlock_(ProcessBlockPrincipal& processBlockPrincipal) {
0415 rootFile_->readProcessBlock_(processBlockPrincipal);
0416 }
0417
0418 WrapperBase const* RepeatingCachedRootSource::RCProductGetter::getIt(ProductID const& iPID) const {
0419 auto itFound = map_->find(iPID);
0420 if (itFound == map_->end()) {
0421 return nullptr;
0422 }
0423 return (*wrappers_)[itFound->second].get();
0424 }
0425
0426 std::optional<std::tuple<WrapperBase const*, unsigned int>>
0427 RepeatingCachedRootSource::RCProductGetter::getThinnedProduct(ProductID const&, unsigned int key) const {
0428 return {};
0429 };
0430
0431 void RepeatingCachedRootSource::RCProductGetter::getThinnedProducts(ProductID const& pid,
0432 std::vector<WrapperBase const*>& foundContainers,
0433 std::vector<unsigned int>& keys) const {}
0434
0435 OptionalThinnedKey RepeatingCachedRootSource::RCProductGetter::getThinnedKeyFrom(ProductID const& parent,
0436 unsigned int key,
0437 ProductID const& thinned) const {
0438 return {};
0439 }
0440 unsigned int RepeatingCachedRootSource::RCProductGetter::transitionIndex_() const { return 0; }
0441
0442
0443
0444
0445
0446
0447
0448
0449
0450 DEFINE_FWK_INPUT_SOURCE(RepeatingCachedRootSource);