File indexing completed on 2025-02-27 07:20:00
0001 #include "FWCore/Framework/interface/Frameworkfwd.h"
0002 #include "FWCore/Framework/interface/InputSourceMacros.h"
0003 #include "FWCore/Framework/interface/InputSourceDescription.h"
0004 #include "FWCore/Framework/interface/DelayedReader.h"
0005 #include "FWCore/Framework/interface/SharedResourcesRegistry.h"
0006 #include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
0007 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0008 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0009 #include "FWCore/Framework/interface/InputSource.h"
0010 #include "FWCore/Sources/interface/IDGeneratorSourceBase.h"
0011 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0012 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0013
0014 #include "DataFormats/TestObjects/interface/ToyProducts.h"
0015
0016 namespace edm {
0017 namespace {
0018 class ThrowingDelayedReader : public DelayedReader {
0019 public:
0020 ThrowingDelayedReader(
0021 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preEventReadSource,
0022 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postEventReadSource)
0023 : preEventReadFromSourceSignal_(preEventReadSource),
0024 postEventReadFromSourceSignal_(postEventReadSource),
0025 e_(std::make_exception_ptr(cms::Exception("TEST"))) {}
0026
0027 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preEventReadFromSourceSignal()
0028 const final {
0029 return preEventReadFromSourceSignal_;
0030 }
0031 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postEventReadFromSourceSignal()
0032 const final {
0033 return postEventReadFromSourceSignal_;
0034 }
0035
0036 private:
0037 std::shared_ptr<WrapperBase> getProduct_(BranchID const& k, EDProductGetter const* ep) final {
0038 try {
0039 std::rethrow_exception(e_);
0040 } catch (cms::Exception const& iE) {
0041
0042 auto copyException = iE;
0043 copyException.addContext("called ThrowingDelayedReader");
0044 throw copyException;
0045 }
0046 }
0047 void mergeReaders_(DelayedReader*) final {}
0048 void reset_() final {}
0049
0050 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preEventReadFromSourceSignal_ =
0051 nullptr;
0052 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postEventReadFromSourceSignal_ =
0053 nullptr;
0054 std::exception_ptr e_;
0055 };
0056 }
0057
0058 class DelayedReaderThrowingSource : public IDGeneratorSourceBase<InputSource> {
0059 public:
0060 explicit DelayedReaderThrowingSource(ParameterSet const&, InputSourceDescription const&);
0061 static void fillDescriptions(ConfigurationDescriptions& descriptions);
0062
0063 private:
0064 bool setRunAndEventInfo(EventID& id, TimeValue_t& time, edm::EventAuxiliary::ExperimentType&) override;
0065 void readEvent_(edm::EventPrincipal&) override;
0066
0067 std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader_() override {
0068 return std::make_pair(resourceSharedWithDelayedReaderPtr_.get(), mutexSharedWithDelayedReader_.get());
0069 }
0070
0071 ThrowingDelayedReader delayedReader_;
0072 ProcessHistoryID historyID_;
0073
0074 std::unique_ptr<SharedResourcesAcquirer>
0075 resourceSharedWithDelayedReaderPtr_;
0076 std::shared_ptr<std::recursive_mutex> mutexSharedWithDelayedReader_;
0077 };
0078
0079 DelayedReaderThrowingSource::DelayedReaderThrowingSource(ParameterSet const& pset, InputSourceDescription const& desc)
0080 : IDGeneratorSourceBase<InputSource>(pset, desc, false),
0081 delayedReader_(&preEventReadFromSourceSignal_, &postEventReadFromSourceSignal_) {
0082 auto resources = SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader();
0083 resourceSharedWithDelayedReaderPtr_ = std::make_unique<SharedResourcesAcquirer>(std::move(resources.first));
0084 mutexSharedWithDelayedReader_ = resources.second;
0085
0086 ParameterSet dummy;
0087 dummy.registerIt();
0088 auto twd = TypeWithDict::byTypeInfo(typeid(edmtest::IntProduct));
0089
0090 std::vector<ProductDescription> branches;
0091 for (auto const& label : pset.getUntrackedParameter<std::vector<std::string>>("labels")) {
0092 branches.push_back(ProductDescription(InEvent,
0093 label,
0094 "INPUTTEST",
0095 twd.userClassName(),
0096 twd.friendlyClassName(),
0097 "",
0098 twd,
0099 false
0100 ));
0101 branches.back().setOnDemand(true);
0102 }
0103 productRegistryUpdate().updateFromInput(branches);
0104
0105 ProcessHistory ph;
0106 ph.emplace_back("INPUTTEST", dummy.id(), PROJECT_VERSION, HardwareResourcesDescription());
0107 processHistoryRegistry().registerProcessHistory(ph);
0108 historyID_ = ph.id();
0109
0110 BranchIDLists bilists(1);
0111 for (auto const& branch : branches) {
0112 bilists[0].emplace_back(branch.branchID().id());
0113 }
0114 branchIDListHelper()->updateFromInput(bilists);
0115 }
0116
0117 bool DelayedReaderThrowingSource::setRunAndEventInfo(EventID&, TimeValue_t&, edm::EventAuxiliary::ExperimentType&) {
0118 return true;
0119 }
0120
0121 void DelayedReaderThrowingSource::readEvent_(edm::EventPrincipal& e) {
0122 BranchListIndexes indexes(1, static_cast<unsigned short>(0));
0123 branchIDListHelper()->fixBranchListIndexes(indexes);
0124 doReadEventWithDelayedReader(e, historyID_, EventSelectionIDVector(), std::move(indexes), &delayedReader_);
0125 }
0126
0127 void DelayedReaderThrowingSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
0128 ParameterSetDescription desc;
0129 desc.setComment("Throws an exception when the DelayedReader is used.");
0130 IDGeneratorSourceBase<InputSource>::fillDescription(desc);
0131 desc.addUntracked<std::vector<std::string>>("labels", {{"test"}});
0132 descriptions.add("source", desc);
0133 }
0134 }
0135
0136 using edm::DelayedReaderThrowingSource;
0137 DEFINE_FWK_INPUT_SOURCE(DelayedReaderThrowingSource);