File indexing completed on 2023-03-17 11:02:52
0001 #include "FWCore/Framework/interface/Frameworkfwd.h"
0002 #include "FWCore/Framework/interface/InputSourceMacros.h"
0003 #include "FWCore/Framework/interface/InputSourceDescription.h"
0004 #include "FWCore/Framework/interface/DelayedReader.h"
0005 #include "FWCore/Framework/interface/SharedResourcesRegistry.h"
0006 #include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
0007 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0008 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0009 #include "FWCore/Framework/interface/InputSource.h"
0010 #include "FWCore/Sources/interface/IDGeneratorSourceBase.h"
0011 #include "DataFormats/Provenance/interface/ProductRegistry.h"
0012 #include "DataFormats/Provenance/interface/BranchIDListHelper.h"
0013
0014 #include "DataFormats/TestObjects/interface/ToyProducts.h"
0015
0016 #include "FWCore/Utilities/interface/GetPassID.h"
0017
0018 namespace edm {
0019 namespace {
0020 class ThrowingDelayedReader : public DelayedReader {
0021 public:
0022 ThrowingDelayedReader(
0023 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preEventReadSource,
0024 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postEventReadSource)
0025 : preEventReadFromSourceSignal_(preEventReadSource),
0026 postEventReadFromSourceSignal_(postEventReadSource),
0027 e_(std::make_exception_ptr(cms::Exception("TEST"))) {}
0028
0029 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preEventReadFromSourceSignal()
0030 const final {
0031 return preEventReadFromSourceSignal_;
0032 }
0033 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postEventReadFromSourceSignal()
0034 const final {
0035 return postEventReadFromSourceSignal_;
0036 }
0037
0038 private:
0039 std::shared_ptr<WrapperBase> getProduct_(BranchID const& k, EDProductGetter const* ep) final {
0040 try {
0041 std::rethrow_exception(e_);
0042 } catch (cms::Exception const& iE) {
0043
0044 auto copyException = iE;
0045 copyException.addContext("called ThrowingDelayedReader");
0046 throw copyException;
0047 }
0048 }
0049 void mergeReaders_(DelayedReader*) final{};
0050 void reset_() final{};
0051
0052 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preEventReadFromSourceSignal_ =
0053 nullptr;
0054 signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postEventReadFromSourceSignal_ =
0055 nullptr;
0056 std::exception_ptr e_;
0057 };
0058 }
0059
0060 class DelayedReaderThrowingSource : public IDGeneratorSourceBase<InputSource> {
0061 public:
0062 explicit DelayedReaderThrowingSource(ParameterSet const&, InputSourceDescription const&);
0063 static void fillDescriptions(ConfigurationDescriptions& descriptions);
0064
0065 private:
0066 bool setRunAndEventInfo(EventID& id, TimeValue_t& time, edm::EventAuxiliary::ExperimentType&) override;
0067 void readEvent_(edm::EventPrincipal&) override;
0068
0069 std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader_() override {
0070 return std::make_pair(resourceSharedWithDelayedReaderPtr_.get(), mutexSharedWithDelayedReader_.get());
0071 }
0072
0073 ThrowingDelayedReader delayedReader_;
0074 ProcessHistoryID historyID_;
0075
0076 std::unique_ptr<SharedResourcesAcquirer>
0077 resourceSharedWithDelayedReaderPtr_;
0078 std::shared_ptr<std::recursive_mutex> mutexSharedWithDelayedReader_;
0079 };
0080
0081 DelayedReaderThrowingSource::DelayedReaderThrowingSource(ParameterSet const& pset, InputSourceDescription const& desc)
0082 : IDGeneratorSourceBase<InputSource>(pset, desc, false),
0083 delayedReader_(&preEventReadFromSourceSignal_, &postEventReadFromSourceSignal_) {
0084 auto resources = SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader();
0085 resourceSharedWithDelayedReaderPtr_ = std::make_unique<SharedResourcesAcquirer>(std::move(resources.first));
0086 mutexSharedWithDelayedReader_ = resources.second;
0087
0088 ParameterSet dummy;
0089 dummy.registerIt();
0090 auto twd = TypeWithDict::byTypeInfo(typeid(edmtest::IntProduct));
0091
0092 std::vector<BranchDescription> branches;
0093 for (auto const& label : pset.getUntrackedParameter<std::vector<std::string>>("labels")) {
0094 branches.push_back(BranchDescription(InEvent,
0095 label,
0096 "INPUTTEST",
0097 twd.userClassName(),
0098 twd.friendlyClassName(),
0099 "",
0100 "",
0101 dummy.id(),
0102 twd,
0103 false
0104 ));
0105 branches.back().setOnDemand(true);
0106 }
0107 productRegistryUpdate().updateFromInput(branches);
0108
0109 ProcessHistory ph;
0110 ph.emplace_back("INPUTTEST", dummy.id(), PROJECT_VERSION, getPassID());
0111 processHistoryRegistry().registerProcessHistory(ph);
0112 historyID_ = ph.id();
0113
0114 BranchIDLists bilists(1);
0115 for (auto const& branch : branches) {
0116 bilists[0].emplace_back(branch.branchID().id());
0117 }
0118 branchIDListHelper()->updateFromInput(bilists);
0119 }
0120
0121 bool DelayedReaderThrowingSource::setRunAndEventInfo(EventID&, TimeValue_t&, edm::EventAuxiliary::ExperimentType&) {
0122 return true;
0123 }
0124
0125 void DelayedReaderThrowingSource::readEvent_(edm::EventPrincipal& e) {
0126 BranchListIndexes indexes(1, static_cast<unsigned short>(0));
0127 branchIDListHelper()->fixBranchListIndexes(indexes);
0128 doReadEventWithDelayedReader(e, historyID_, EventSelectionIDVector(), std::move(indexes), &delayedReader_);
0129 }
0130
0131 void DelayedReaderThrowingSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
0132 ParameterSetDescription desc;
0133 desc.setComment("Throws an exception when the DelayedReader is used.");
0134 IDGeneratorSourceBase<InputSource>::fillDescription(desc);
0135 desc.addUntracked<std::vector<std::string>>("labels", {{"test"}});
0136 descriptions.add("source", desc);
0137 }
0138 }
0139
0140 using edm::DelayedReaderThrowingSource;
0141 DEFINE_FWK_INPUT_SOURCE(DelayedReaderThrowingSource);