1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
|
#include "EventFilter//Utilities/interface/DAQSourceModelsScoutingRun3.h"
using namespace edm::streamer;
// Records, for every base directory, the path of the run directory inside it
// and stores the per-BU source counts.
//
// baseDirs          base directories; one entry is appended to buPaths_ for each
// numSources        copied as-is into buNumSources_
// sourceIDs         not used by this implementation
// sourceIdentifier  not used by this implementation
// runDir            run directory name, joined onto each base directory
void DataModeScoutingRun3::makeDirectoryEntries(std::vector<std::string> const& baseDirs,
                                                std::vector<int> const& numSources,
                                                std::vector<int> const& sourceIDs,
                                                std::string const& sourceIdentifier,
                                                std::string const& runDir) {
  const std::filesystem::path runSubdir(runDir);

  for (std::string const& base : baseDirs) {
    buPaths_.emplace_back(std::filesystem::path(base) / runSubdir);
  }

  // keep the number of sources expected in each BU
  buNumSources_ = numSources;
}
std::pair<bool, std::vector<std::string>> DataModeScoutingRun3::defineAdditionalFiles(std::string const& primaryName,
bool fileListMode) const {
std::vector<std::string> additionalFiles;
if (fileListMode) {
// Expected file naming when working in file list mode
for (int j = 1; j < buNumSources_[0]; j++) {
additionalFiles.push_back(primaryName + "_" + std::to_string(j));
}
return std::make_pair(true, additionalFiles);
}
auto fullpath = std::filesystem::path(primaryName);
auto fullname = fullpath.filename();
for (size_t i = 0; i < buPaths_.size(); i++) {
std::filesystem::path newPath = buPaths_[i] / fullname;
if (i != 0) {
// secondary files from other ramdisks
additionalFiles.push_back(newPath.generic_string());
}
// add extra sources from the same ramdisk
for (int j = 1; j < buNumSources_[i]; j++) {
additionalFiles.push_back(newPath.generic_string() + "_" + std::to_string(j));
}
}
return std::make_pair(true, additionalFiles);
}
// Turns the orbit currently held in events_ into an EDM event: builds the
// event auxiliary, collects the payload of every source listed in
// sourceValidOrbitPair_ into one SDSRawDataCollection and puts that
// collection into eventPrincipal. Clears eventCached_ when done.
void DataModeScoutingRun3::readEvent(edm::EventPrincipal& eventPrincipal) {
  assert(!events_.empty());

  // timestamp from the wall clock: seconds shifted up by 32 bits, plus microseconds
  timeval now;
  gettimeofday(&now, nullptr);
  edm::TimeValue_t packedTime = now.tv_sec;
  packedTime = (packedTime << 32) + now.tv_usec;
  edm::Timestamp timestamp(packedTime);

  // the current orbit number is used as the event number
  uint32_t orbitAsEventNumber = currOrbit_;
  edm::EventID id(daqSource_->eventRunNumber(), daqSource_->currentLumiSection(), orbitAsEventNumber);

  edm::EventAuxiliary eventAux(
      id, daqSource_->processGUID(), timestamp, events_[0]->isRealData(), edm::EventAuxiliary::PhysicsTrigger);
  eventAux.setProcessHistoryID(daqSource_->processHistoryID());
  daqSource_->makeEventWrapper(eventPrincipal, eventAux);

  // gather the orbit data of every source that delivered the current orbit
  auto collection = std::make_unique<SDSRawDataCollection>();
  for (const auto& validPair : sourceValidOrbitPair_) {
    const auto& view = events_[validPair.second];
    fillSDSRawDataCollection(*collection, (char*)view->payload(), view->eventSize());
  }

  // wrap the collection and hand it over to the event
  std::unique_ptr<edm::WrapperBase> wrapped =
      std::make_unique<edm::Wrapper<SDSRawDataCollection>>(std::move(collection));
  const auto& helper = daqProvenanceHelpers_[0];
  eventPrincipal.put(helper->productDescription(), std::move(wrapped), helper->dummyProvenance());

  eventCached_ = false;
}
// Copies the orbit payload of one source into the slot of rawData that is
// keyed by the source ID found at the start of the buffer.
//
// rawData  collection receiving the data
// buff     buffer starting with a 32-bit source ID, followed by the orbit payload
// len      number of bytes available at buff, source ID included
//
// Throws cms::Exception when len is too small to hold the source ID; without
// this check the payload size computation would wrap around.
void DataModeScoutingRun3::fillSDSRawDataCollection(SDSRawDataCollection& rawData, char* buff, size_t len) {
  constexpr size_t sourceIdSize = sizeof(uint32_t);

  if (len < sourceIdSize)
    throw cms::Exception("DataModeScoutingRun3::fillSDSRawDataCollection")
        << " payload of size:" << len << " bytes is too short to contain the source ID";

  // get the source ID; copied out byte-wise because buff is a plain char
  // buffer and is not known to be suitably aligned for a uint32_t access
  uint32_t rawSourceId = 0;
  memcpy(&rawSourceId, buff, sourceIdSize);
  const int sourceId = static_cast<int>(rawSourceId);

  // size of the orbit payload
  const size_t orbitSize = len - sourceIdSize;

  // set the size (= orbit size) in the SDSRawDataCollection entry of the current source.
  // FRD size is expecting 8 byte words, while scouting is using 4 byte
  // words. This could be different for some future sources.
  FEDRawData& fedData = rawData.FEDData(sourceId);
  fedData.resize(orbitSize, 4);

  // nothing to copy for a payload that only contains the source ID
  if (orbitSize > 0)
    memcpy(fedData.data(), buff + sourceIdSize, orbitSize);
}
// Resets daqProvenanceHelpers_ so that it holds a single helper describing
// the SDSRawDataCollection product, and returns a reference to that vector.
std::vector<std::shared_ptr<const edm::DaqProvenanceHelper>>& DataModeScoutingRun3::makeDaqProvenanceHelpers() {
  daqProvenanceHelpers_.clear();

  // helper for the SDS raw data collection
  auto sdsHelper = std::make_shared<const edm::DaqProvenanceHelper>(
      edm::TypeID(typeid(SDSRawDataCollection)), "SDSRawDataCollection", "SDSRawDataCollection", "DAQSource");
  daqProvenanceHelpers_.push_back(std::move(sdsHelper));

  return daqProvenanceHelpers_;
}
// Moves on to the next event (= orbit) of the current data block.
//
// Returns true right away when an event is still cached. Otherwise the data
// block addresses are advanced and the result of makeEvents() is returned.
// blockCompleted_ is cleared in both cases.
bool DataModeScoutingRun3::nextEventView(RawInputFile*) {
  blockCompleted_ = false;

  if (!eventCached_) {
    // only the sources that took part in the previous event are advanced,
    // each one by the size of the event it contributed
    for (const auto& [sourceIdx, eventIdx] : sourceValidOrbitPair_) {
      dataBlockAddrs_[sourceIdx] += events_[eventIdx]->size();
    }
    return makeEvents();
  }

  return true;
}
// Builds the list of event views (= orbits) at the current position of every
// data source and selects the sources that carry the lowest orbit number.
//
// Returns true when at least one source contributes to the current orbit.
// Returns false, and sets blockCompleted_, when no source does.
// Throws cms::Exception when an event extends past the end of its data block.
bool DataModeScoutingRun3::makeEvents() {
  // start from scratch: no events, no valid sources, orbit at max uint
  events_.clear();
  sourceValidOrbitPair_.clear();
  currOrbit_ = 0xFFFFFFFF;

  assert(!blockCompleted_);

  // First pass: create an event view for each source that still has data
  // and track the smallest orbit number seen.
  for (int src = 0; src < numFiles_; ++src) {
    // this source's data block ended (possibly earlier than the others)
    if (dataBlockAddrs_[src] >= dataBlockMaxAddrs_[src]) {
      completedBlocks_[src] = true;
      continue;
    }

    events_.emplace_back(std::make_unique<FRDEventMsgView>(dataBlockAddrs_[src]));
    const auto& view = events_.back();

    if (dataBlockAddrs_[src] + view->size() > dataBlockMaxAddrs_[src]) {
      throw cms::Exception("DAQSource::getNextEvent")
          << " event id:" << view->event() << " lumi:" << view->lumi() << " run:" << view->run()
          << " of size:" << view->size() << " bytes does not fit into the buffer or has corrupted header";
    }

    // keep the minimum orbit between all the sources
    const auto orbit = view->event();
    if (orbit < currOrbit_) {
      if (!completedBlocks_[src]) {
        currOrbit_ = orbit;
      }
    }
  }

  // Second pass: remember which sources actually hold the selected orbit.
  // The position inside events_ is counted separately from the source index,
  // because sources with a completed block have no entry in events_.
  int eventIdx = 0;
  for (int src = 0; src < numFiles_; ++src) {
    if (completedBlocks_[src]) {
      continue;
    }
    // a source holding a different orbit is left out of this event
    if (events_[eventIdx]->event() == currOrbit_) {
      sourceValidOrbitPair_.emplace_back(std::make_pair(src, eventIdx));
    }
    ++eventIdx;
  }

  // no valid source left: the whole block has been consumed
  if (sourceValidOrbitPair_.empty()) {
    blockCompleted_ = true;
    return false;
  }
  return true;
}
// Always reports the checksum as valid; no verification is performed here.
bool DataModeScoutingRun3::checksumValid() {
  return true;
}
// Always returns an empty string as the checksum error description.
std::string DataModeScoutingRun3::getChecksumError() const {
  return {};
}
|