1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
|
// -*- C++ -*-
//
// Package: FWCore/Integration
// Class : SourceWithWaits
//
// Original Author: W. David Dagenhart
// Created: 12 October 2023
// This source allows configuring both a time per lumi section
// and events per lumi. Calls to std::this_thread::sleep_for are inserted in the
// getNextItemType function in the amount
//
// (time per lumi) / (events per lumi + 1)
//
// The sleeps occur before getNextItemType returns when
// an event is next and also when a lumi is next (excluding
// the first lumi). The total time sleeping that elapses per
// lumi is approximately equal to the configured amount.
// The algorithm accomplishing this is not perfect and
// if the events take enough time to process, then the lumis
// will last longer than configured amount (just because
// that was a lot easier to implement and good enough for
// the test this is used for).
//
// The time per lumi is the same for all lumis. The number of events per
// lumi can be different for each lumi. You can also configure a single
// value for lumis per run if you want multiple runs.
//
// The job will stop when the end of the vector specifying
// events per lumi is reached (it might end earlier if maxEvents
// is also configured).
//
// In some ways this source is like EmptySource. It does not produce
// or read anything. The initial intent is to use for tests of
// some issues we are facing with concurrent lumis in the online
// source. It emulates the relevant behavior of that source without
// all the associated complexity.
#include "DataFormats/Provenance/interface/EventAuxiliary.h"
#include "DataFormats/Provenance/interface/EventID.h"
#include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"
#include "DataFormats/Provenance/interface/ProcessHistoryRegistry.h"
#include "DataFormats/Provenance/interface/RunAuxiliary.h"
#include "DataFormats/Provenance/interface/RunLumiEventNumber.h"
#include "DataFormats/Provenance/interface/Timestamp.h"
#include "FWCore/Framework/interface/EventPrincipal.h"
#include "FWCore/Framework/interface/Frameworkfwd.h"
#include "FWCore/Framework/interface/InputSource.h"
#include "FWCore/Framework/interface/InputSourceMacros.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include <cassert>
#include <chrono>
#include <memory>
#include <thread>
#include <unistd.h>
#include <vector>
namespace edmtest {
// Test input source that walks through file, run, lumi and event transitions
// with configurable sleeps between them. It builds run, lumi and event
// auxiliaries from its own counters; the transition logic and the sleeps live
// in getNextItemType.
class SourceWithWaits : public edm::InputSource {
public:
// Reads all configuration values from the untracked parameters of the parameter set.
explicit SourceWithWaits(edm::ParameterSet const&, edm::InputSourceDescription const&);
~SourceWithWaits() override;
// Declares the untracked parameters accepted by this source.
static void fillDescriptions(edm::ConfigurationDescriptions&);
private:
// Decides which transition comes next (file, run, lumi, event or stop) and sleeps as configured.
edm::InputSource::ItemTypeInfo getNextItemType() override;
// Builds the auxiliary for the run numbered currentRun_.
std::shared_ptr<edm::RunAuxiliary> readRunAuxiliary_() override;
// Builds the auxiliary for the lumi numbered currentLumi_ in run currentRun_.
std::shared_ptr<edm::LuminosityBlockAuxiliary> readLuminosityBlockAuxiliary_() override;
// Fills the event principal with an auxiliary built from the current run/lumi/event numbers.
void readEvent_(edm::EventPrincipal&) override;
// Seconds; each lumi-related sleep lasts timePerLumi_ / (events in the lumi + 1).
double timePerLumi_; // seconds
// Seconds; slept at the start of the getNextItemType call that follows a run entry which set startedNewRun_.
double sleepAfterStartOfRun_; // seconds
// Number of events in each lumi, indexed by (lumi number - 1). The job stops when the vector is exhausted.
std::vector<unsigned int> eventsPerLumi_;
// Number of lumis started in each run before moving on to the next run.
unsigned int lumisPerRun_;
// Run number that is reported with two run entries. The value 0 (the default) never matches a run number.
unsigned int multipleEntriesForRun_;
// Lumi number that is reported with two lumi entries. The value 0 (the default) never matches a lumi number.
unsigned int multipleEntriesForLumi_;
// If true, the only or final entry of each run and lumi is reported as LastItemToBeMerged.
bool declareLast_;
// If true, every run and lumi entry is reported as LastItemToBeMerged, including the first of two
// entries (described as an intentional bug in the comments of getNextItemType).
bool declareAllLast_;
// Number of the most recently returned event; never reset, so it keeps increasing across lumis and runs.
edm::EventNumber_t currentEvent_ = 0;
// Number of the current lumi; never reset, so it keeps increasing across runs. 0 means no lumi started yet.
edm::LuminosityBlockNumber_t currentLumi_ = 0;
// Number of the current run. 0 means no run started yet.
edm::RunNumber_t currentRun_ = 0;
// 0 until the single IsFile transition has been returned.
unsigned int currentFile_ = 0;
// Events returned so far in the current lumi.
unsigned int eventInCurrentLumi_ = 0;
// Lumis started so far in the current run.
unsigned int lumiInCurrentRun_ = 0;
// Set when the only or final entry of a run is returned; triggers the sleepAfterStartOfRun_ sleep on the next call.
bool startedNewRun_ = false;
// Set when the last event of a lumi is returned; triggers one more lumi sleep on the next call.
bool lastEventOfLumi_ = false;
// Set when the only or final entry of a lumi with zero events is returned; triggers a lumi sleep on the next call.
bool noEventsInLumi_ = false;
};
SourceWithWaits::SourceWithWaits(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
: edm::InputSource(pset, desc),
timePerLumi_(pset.getUntrackedParameter<double>("timePerLumi")),
sleepAfterStartOfRun_(pset.getUntrackedParameter<double>("sleepAfterStartOfRun")),
eventsPerLumi_(pset.getUntrackedParameter<std::vector<unsigned int>>("eventsPerLumi")),
lumisPerRun_(pset.getUntrackedParameter<unsigned int>("lumisPerRun")),
multipleEntriesForRun_(pset.getUntrackedParameter<unsigned int>("multipleEntriesForRun")),
multipleEntriesForLumi_(pset.getUntrackedParameter<unsigned int>("multipleEntriesForLumi")),
declareLast_(pset.getUntrackedParameter<bool>("declareLast")),
declareAllLast_(pset.getUntrackedParameter<bool>("declareAllLast")) {}
// Nothing to release explicitly; the members clean up after themselves.
SourceWithWaits::~SourceWithWaits() = default;
// Declares the untracked parameters this source accepts and registers the
// resulting description under the label "source".
void SourceWithWaits::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
  edm::ParameterSetDescription sourceDesc;

  // Parameters declared without a default value.
  sourceDesc.addUntracked<double>("timePerLumi");
  sourceDesc.addUntracked<double>("sleepAfterStartOfRun");
  sourceDesc.addUntracked<std::vector<unsigned int>>("eventsPerLumi");
  sourceDesc.addUntracked<unsigned int>("lumisPerRun");

  // Parameters declared with a default value.
  sourceDesc.addUntracked<unsigned int>("multipleEntriesForRun", 0);
  sourceDesc.addUntracked<unsigned int>("multipleEntriesForLumi", 0);
  sourceDesc.addUntracked<bool>("declareLast", false);
  sourceDesc.addUntracked<bool>("declareAllLast", false);

  descriptions.add("source", sourceDesc);
}
// Decides which transition comes next: the file, a run, a lumi, an event, or
// stop. Sleeps are inserted to pace the job:
//   - sleepAfterStartOfRun_ seconds at the start of the call that follows a
//     run entry which set startedNewRun_,
//   - timePerLumi_ / (events in the lumi + 1) seconds before each event is
//     returned, and once more at the start of the call that follows the last
//     event of a lumi (or the entry of a lumi with zero events).
// Returns IsStop when the eventsPerLumi_ vector is exhausted or when the run
// number would exceed kMaxRuns.
//
// All lumi and event handling is skipped when lumisPerRun_ is 0. In that case
// currentLumi_ stays 0, so eventsPerLumi_[currentLumi_ - 1] must never be
// evaluated (the index would wrap around and read out of bounds).
edm::InputSource::ItemTypeInfo SourceWithWaits::getNextItemType() {
  // Upper bound on the run number; keeps the job finite if lumisPerRun_ is 0.
  constexpr unsigned int kMaxRuns = 100;

  // Position reported for the only entry, or the final one of two entries,
  // of a run or lumi.
  auto const finalEntryPosition =
      (declareLast_ || declareAllLast_) ? ItemPosition::LastItemToBeMerged : ItemPosition::NotLastItemToBeMerged;
  // Position reported for the first of two entries of the same run or lumi.
  // declareAllLast_ with multipleEntriesForRun_ or multipleEntriesForLumi_ is an intentional bug, used to test
  // if the Framework detects the potential InputSource bug and throws an exception.
  auto const repeatedEntryPosition =
      declareAllLast_ ? ItemPosition::LastItemToBeMerged : ItemPosition::NotLastItemToBeMerged;

  // Sleeps for one slice of the configured time per lumi. Only call this
  // while currentLumi_ refers to a valid entry of eventsPerLumi_.
  auto const sleepForLumiSlice = [this]() {
    std::this_thread::sleep_for(std::chrono::duration<double>(timePerLumi_ / (eventsPerLumi_[currentLumi_ - 1] + 1)));
  };

  // Remembers that the current lumi has no events so that the next call
  // still sleeps once for it.
  auto const flagLumiIfEmpty = [this]() {
    if (eventsPerLumi_[currentLumi_ - 1] == 0) {
      noEventsInLumi_ = true;
    }
  };

  // Builds the run entry for currentRun_. If a second entry is configured
  // for this run, this is only the first of the two and the run is not yet
  // marked as started.
  auto const runEntry = [&]() -> ItemTypeInfo {
    if (currentRun_ != multipleEntriesForRun_) {
      startedNewRun_ = true;
      return ItemTypeInfo(ItemType::IsRun, finalEntryPosition);
    }
    return ItemTypeInfo(ItemType::IsRun, repeatedEntryPosition);
  };

  // Advances to the next lumi and builds its entry, or returns IsStop when
  // the eventsPerLumi_ vector is exhausted (unless maxEvents stopped the job
  // earlier).
  auto const advanceToNextLumi = [&]() -> ItemTypeInfo {
    ++currentLumi_;
    ++lumiInCurrentRun_;
    if ((currentLumi_ - 1) >= eventsPerLumi_.size()) {
      return ItemType::IsStop;
    }
    eventInCurrentLumi_ = 0;
    if (currentLumi_ != multipleEntriesForLumi_) {
      flagLumiIfEmpty();
      return ItemTypeInfo(ItemType::IsLumi, finalEntryPosition);
    }
    return ItemTypeInfo(ItemType::IsLumi, repeatedEntryPosition);
  };

  if (startedNewRun_) {
    std::this_thread::sleep_for(std::chrono::duration<double>(sleepAfterStartOfRun_));
    startedNewRun_ = false;
  }
  if (lastEventOfLumi_ || noEventsInLumi_) {
    sleepForLumiSlice();
    lastEventOfLumi_ = false;
    noEventsInLumi_ = false;
  }

  // Note that there will always be exactly one file and at least one run
  // from this test source.

  // The one and only file
  if (currentFile_ == 0u) {
    ++currentFile_;
    return ItemType::IsFile;
  }

  // First run
  if (currentRun_ == 0u) {
    ++currentRun_;
    return runEntry();
  }

  // If configured, a second entry for the same run number and reduced ProcessHistoryID
  if (currentRun_ == multipleEntriesForRun_) {
    multipleEntriesForRun_ = 0;
    startedNewRun_ = true;
    return ItemTypeInfo(ItemType::IsRun, finalEntryPosition);
  }

  if (lumisPerRun_ != 0) {
    // First lumi
    if (currentLumi_ == 0u) {
      return advanceToNextLumi();
    }

    // If configured, a second entry for the same lumi number in the same run
    if (currentLumi_ == multipleEntriesForLumi_) {
      multipleEntriesForLumi_ = 0;
      flagLumiIfEmpty();
      return ItemTypeInfo(ItemType::IsLumi, finalEntryPosition);
    }

    // Handle events in the current lumi
    if (eventInCurrentLumi_ < eventsPerLumi_[currentLumi_ - 1]) {
      sleepForLumiSlice();
      ++eventInCurrentLumi_;
      ++currentEvent_;
      if (eventInCurrentLumi_ == eventsPerLumi_[currentLumi_ - 1]) {
        lastEventOfLumi_ = true;
      }
      return ItemType::IsEvent;
    }

    // Next lumi in the current run
    if (lumiInCurrentRun_ < lumisPerRun_) {
      return advanceToNextLumi();
    }
  }

  // Next run.
  // The job will stop when we hit the end of the eventsPerLumi vector
  // unless maxEvents stopped it earlier. Don't start the run if
  // it will end with no lumis in it.
  if (currentLumi_ >= eventsPerLumi_.size()) {
    return ItemType::IsStop;
  }
  ++currentRun_;
  // Avoid infinite job if lumisPerRun_ is 0
  if (currentRun_ > kMaxRuns) {
    return ItemType::IsStop;
  }
  lumiInCurrentRun_ = 0;
  return runEntry();
}
std::shared_ptr<edm::RunAuxiliary> SourceWithWaits::readRunAuxiliary_() {
edm::Timestamp ts = edm::Timestamp(1);
return std::make_shared<edm::RunAuxiliary>(currentRun_, ts, edm::Timestamp::invalidTimestamp());
}
std::shared_ptr<edm::LuminosityBlockAuxiliary> SourceWithWaits::readLuminosityBlockAuxiliary_() {
edm::Timestamp ts = edm::Timestamp(1);
return std::make_shared<edm::LuminosityBlockAuxiliary>(
currentRun_, currentLumi_, ts, edm::Timestamp::invalidTimestamp());
}
// Fills the event principal with an auxiliary built from the current
// run/lumi/event numbers, always flagged as not real data, together with the
// process history the registry maps to that auxiliary.
void SourceWithWaits::readEvent_(edm::EventPrincipal& eventPrincipal) {
  constexpr bool kIsRealData = false;
  edm::EventID const eventId(currentRun_, currentLumi_, currentEvent_);
  edm::EventAuxiliary eventAux(eventId, processGUID(), edm::Timestamp(1), kIsRealData);
  auto const mappedHistory = processHistoryRegistry().getMapped(eventAux.processHistoryID());
  eventPrincipal.fillEventPrincipal(eventAux, mappedHistory);
}
} // namespace edmtest
// Make the class name usable without the edmtest:: qualifier in the macro
// invocation below.
using edmtest::SourceWithWaits;
// NOTE(review): presumably registers SourceWithWaits as an input source
// plugin with the framework; confirm against InputSourceMacros.h.
DEFINE_FWK_INPUT_SOURCE(SourceWithWaits);
|