1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/InputSourceMacros.h"
#include "FWCore/Framework/interface/InputSourceDescription.h"
#include "FWCore/Framework/interface/DelayedReader.h"
#include "FWCore/Framework/interface/SharedResourcesRegistry.h"
#include "FWCore/Framework/interface/SharedResourcesAcquirer.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/Framework/interface/InputSource.h"
#include "FWCore/Sources/interface/IDGeneratorSourceBase.h"
#include "DataFormats/Provenance/interface/ProductRegistry.h"
#include "DataFormats/Provenance/interface/BranchIDListHelper.h"
#include "DataFormats/TestObjects/interface/ToyProducts.h"
namespace edm {
namespace {
class ThrowingDelayedReader : public DelayedReader {
public:
ThrowingDelayedReader(
signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preEventReadSource,
signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postEventReadSource)
: preEventReadFromSourceSignal_(preEventReadSource),
postEventReadFromSourceSignal_(postEventReadSource),
e_(std::make_exception_ptr(cms::Exception("TEST"))) {}
signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preEventReadFromSourceSignal()
const final {
return preEventReadFromSourceSignal_;
}
signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postEventReadFromSourceSignal()
const final {
return postEventReadFromSourceSignal_;
}
private:
std::shared_ptr<WrapperBase> getProduct_(BranchID const& k, EDProductGetter const* ep) final {
try {
std::rethrow_exception(e_);
} catch (cms::Exception const& iE) {
//avoid adding to the context for each call
auto copyException = iE;
copyException.addContext("called ThrowingDelayedReader");
throw copyException;
}
}
void mergeReaders_(DelayedReader*) final {}
void reset_() final {}
signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* preEventReadFromSourceSignal_ =
nullptr;
signalslot::Signal<void(StreamContext const&, ModuleCallingContext const&)> const* postEventReadFromSourceSignal_ =
nullptr;
std::exception_ptr e_;
};
} // namespace
// Input source for tests: events are read through a ThrowingDelayedReader, so the
// delayed reader throws whenever it is asked for a product.
class DelayedReaderThrowingSource : public IDGeneratorSourceBase<InputSource> {
public:
  explicit DelayedReaderThrowingSource(ParameterSet const&, InputSourceDescription const&);

  // Describes the configuration parameters accepted by this source.
  static void fillDescriptions(ConfigurationDescriptions& descriptions);

private:
  bool setRunAndEventInfo(EventID& id, TimeValue_t& time, edm::EventAuxiliary::ExperimentType&) override;

  void readEvent_(edm::EventPrincipal&) override;

  // Hands out non-owning pointers to the acquirer and mutex held by this source.
  std::pair<SharedResourcesAcquirer*, std::recursive_mutex*> resourceSharedWithDelayedReader_() override {
    SharedResourcesAcquirer* const acquirer = resourceSharedWithDelayedReaderPtr_.get();
    std::recursive_mutex* const mutex = mutexSharedWithDelayedReader_.get();
    return {acquirer, mutex};
  }

  // Reader passed to doReadEventWithDelayedReader() for each event.
  ThrowingDelayedReader delayedReader_;

  // ID of the process history registered in the constructor.
  ProcessHistoryID historyID_;

  // propagate_const is deliberately not used here: the acquirer is itself mutable.
  std::unique_ptr<SharedResourcesAcquirer> resourceSharedWithDelayedReaderPtr_;

  std::shared_ptr<std::recursive_mutex> mutexSharedWithDelayedReader_;
};
// Sets up the source:
//  - obtains the acquirer/mutex pair for the source's delayed reader,
//  - declares one on-demand edmtest::IntProduct branch per entry of the untracked
//    "labels" parameter, attributed to a process named "INPUTTEST",
//  - registers a process history containing that process and remembers its ID,
//  - registers a single branch ID list holding the IDs of the declared branches.
DelayedReaderThrowingSource::DelayedReaderThrowingSource(ParameterSet const& pset, InputSourceDescription const& desc)
    : IDGeneratorSourceBase<InputSource>(pset, desc, false),
      delayedReader_(&preEventReadFromSourceSignal_, &postEventReadFromSourceSignal_) {
  auto acquirerAndMutex = SharedResourcesRegistry::instance()->createAcquirerForSourceDelayedReader();
  resourceSharedWithDelayedReaderPtr_ =
      std::make_unique<SharedResourcesAcquirer>(std::move(acquirerAndMutex.first));
  mutexSharedWithDelayedReader_ = acquirerAndMutex.second;

  // Empty parameter set, registered so that its ID can be used in the process history below.
  ParameterSet emptyPSet;
  emptyPSet.registerIt();

  auto intProductType = TypeWithDict::byTypeInfo(typeid(edmtest::IntProduct));

  std::vector<ProductDescription> products;
  for (auto const& moduleLabel : pset.getUntrackedParameter<std::vector<std::string>>("labels")) {
    ProductDescription product(InEvent,
                               moduleLabel,  // module label
                               "INPUTTEST",  // process name; can't be the present process name
                               intProductType.userClassName(),
                               intProductType.friendlyClassName(),
                               "",  // product instance name
                               intProductType,
                               false);  // not produced
    // On-demand marks the product as one that is fetched through the delayed reader.
    product.setOnDemand(true);
    products.push_back(std::move(product));
  }
  productRegistryUpdate().updateFromInput(products);

  ProcessHistory history;
  history.emplace_back("INPUTTEST", emptyPSet.id(), PROJECT_VERSION, HardwareResourcesDescription());
  processHistoryRegistry().registerProcessHistory(history);
  historyID_ = history.id();

  // One list, filled with the branch IDs of every product declared above.
  BranchIDLists idLists(1);
  auto& inputBranchIDs = idLists[0];
  for (auto const& product : products) {
    inputBranchIDs.emplace_back(product.branchID().id());
  }
  branchIDListHelper()->updateFromInput(idLists);
}
// Always returns true and leaves all three arguments unmodified.
bool DelayedReaderThrowingSource::setRunAndEventInfo(EventID&, TimeValue_t&, edm::EventAuxiliary::ExperimentType&) {
  constexpr bool kResult = true;
  return kResult;
}
void DelayedReaderThrowingSource::readEvent_(edm::EventPrincipal& e) {
BranchListIndexes indexes(1, static_cast<unsigned short>(0));
branchIDListHelper()->fixBranchListIndexes(indexes);
doReadEventWithDelayedReader(e, historyID_, EventSelectionIDVector(), std::move(indexes), &delayedReader_);
}
// Registers the configuration description under the name "source": the parameters added by
// IDGeneratorSourceBase<InputSource>::fillDescription plus an untracked vector of strings
// named "labels" whose default is the single entry "test".
void DelayedReaderThrowingSource::fillDescriptions(ConfigurationDescriptions& descriptions) {
  ParameterSetDescription sourceDesc;
  sourceDesc.setComment("Throws an exception when the DelayedReader is used.");
  IDGeneratorSourceBase<InputSource>::fillDescription(sourceDesc);

  std::vector<std::string> const defaultLabels{"test"};
  sourceDesc.addUntracked<std::vector<std::string>>("labels", defaultLabels);

  descriptions.add("source", sourceDesc);
}
} // namespace edm
// Make the class name usable unqualified at global scope for the macro invocation below.
using edm::DelayedReaderThrowingSource;
// NOTE(review): DEFINE_FWK_INPUT_SOURCE presumably registers the class as a framework
// input-source plugin -- confirm against InputSourceMacros.h.
DEFINE_FWK_INPUT_SOURCE(DelayedReaderThrowingSource);
|