1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
|
#include <filesystem>
#include <fstream>
#include <iostream>
#include <string>
#include <utility>
#include <vector>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <boost/property_tree/json_parser.hpp>
#include <openssl/md5.h>
#include <fmt/printf.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/gzip_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <TString.h>
#include <TSystem.h>
#include <TBufferFile.h>
#include "zlib.h"
#include "DQMServices/Core/interface/DQMStore.h"
#include "DQMServices/Core/interface/ROOTFilePB.pb.h"
#include "FWCore/Framework/interface/LuminosityBlock.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "DQMFileSaverPB.h"
using namespace dqm;
/// Builds the saver from its configuration.
///
/// Reads the untracked parameters "fakeFilterUnitMode", "streamLabel" and "tag". Unless fake
/// filter-unit mode is enabled, the EvFDaqDirector service must be available and an (empty) INI
/// file is created at the path it reports for the stream.
///
/// @throws cms::Exception if the EvFDaqDirector service is unavailable or the INI file cannot be created.
DQMFileSaverPB::DQMFileSaverPB(const edm::ParameterSet& ps) : DQMFileSaverBase(ps) {
  fakeFilterUnitMode_ = ps.getUntrackedParameter<bool>("fakeFilterUnitMode", false);
  streamLabel_ = ps.getUntrackedParameter<std::string>("streamLabel", "streamDQMHistograms");
  tag_ = ps.getUntrackedParameter<std::string>("tag", "UNKNOWN");
  transferDestination_ = "";
  mergeType_ = "";

  // A configured tag means DQM Live mode: every client saves snapshot files which are later
  // merged and uploaded to the new DQM GUI. In that mode the stream label is fixed.
  const bool liveMode = (tag_ != "UNKNOWN");
  if (liveMode) {
    streamLabel_ = "DQMLive";
  }

  // Nothing else to prepare when the DAQ director is being emulated.
  if (fakeFilterUnitMode_) {
    return;
  }

  edm::Service<evf::EvFDaqDirector> daqDirector;
  if (!daqDirector.isAvailable()) {
    throw cms::Exception("DQMFileSaverPB") << "EvFDaqDirector is not available";
  }

  // Opening the stream for writing is enough to create the (empty) INI file.
  const std::string iniPath = daqDirector->getInitFilePath(streamLabel_);
  std::ofstream iniFile(iniPath);
  if (iniFile.fail()) {
    throw cms::Exception("DQMFileSaverPB")
        << "Cannot create INI file: " << iniPath << " error: " << strerror(errno);
  }
  iniFile.close();
}
// Defaulted destructor, defined out of line; no explicit cleanup is performed here.
DQMFileSaverPB::~DQMFileSaverPB() = default;
void DQMFileSaverPB::initRun() const {
if (!fakeFilterUnitMode_) {
transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(streamLabel_);
mergeType_ = edm::Service<evf::EvFDaqDirector>()->getStreamMergeType(streamLabel_, evf::MergeTypePB);
}
}
void DQMFileSaverPB::saveLumi(const FileParameters& fp) const {
// get from DAQ2 services where to store the files according to their format
namespace bpt = boost::property_tree;
std::string openJsonFilePathName;
std::string jsonFilePathName;
std::string openHistoFilePathName;
std::string histoFilePathName;
evf::FastMonitoringService* fms = nullptr;
edm::Service<DQMStore> store;
// create the files names
if (fakeFilterUnitMode_) {
std::string runDir = fmt::sprintf("%s/run%06d", fp.path_, fp.run_);
std::string baseName = "";
std::filesystem::create_directories(runDir);
// If tag is configured, append it to the name of the resulting file.
// This differentiates files saved by different clients.
// If tag is not configured, we don't add it at all to keep the old behaviour unchanged.
if (tag_ == "UNKNOWN") {
baseName = fmt::sprintf("%s/run%06d_ls%04d_%s", runDir, fp.run_, fp.lumi_, streamLabel_);
} else {
baseName = fmt::sprintf("%s/run%06d_%s_%s", runDir, fp.run_, tag_, streamLabel_);
}
jsonFilePathName = baseName + ".jsn";
openJsonFilePathName = jsonFilePathName + ".open";
histoFilePathName = baseName + ".pb";
openHistoFilePathName = histoFilePathName + ".open";
} else {
openJsonFilePathName = edm::Service<evf::EvFDaqDirector>()->getOpenOutputJsonFilePath(fp.lumi_, streamLabel_);
jsonFilePathName = edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(fp.lumi_, streamLabel_);
openHistoFilePathName =
edm::Service<evf::EvFDaqDirector>()->getOpenProtocolBufferHistogramFilePath(fp.lumi_, streamLabel_);
histoFilePathName = edm::Service<evf::EvFDaqDirector>()->getProtocolBufferHistogramFilePath(fp.lumi_, streamLabel_);
fms = edm::Service<evf::FastMonitoringService>().operator->();
}
bool abortFlag = false;
if (fms ? fms->getEventsProcessedForLumi(fp.lumi_, &abortFlag) : true) {
// Save the file in the open directory.
this->savePB(&*store, openHistoFilePathName, fp.run_, fp.lumi_);
// Now move the the data and json files into the output directory.
::rename(openHistoFilePathName.c_str(), histoFilePathName.c_str());
}
if (abortFlag)
return;
// Write the json file in the open directory.
bpt::ptree pt = fillJson(fp.run_, fp.lumi_, histoFilePathName, transferDestination_, mergeType_, fms);
write_json(openJsonFilePathName, pt);
::rename(openJsonFilePathName.c_str(), jsonFilePathName.c_str());
}
/// Intentionally a no-op: this saver writes nothing at the end of a run.
void DQMFileSaverPB::saveRun(const FileParameters& /*fp*/) const {}
/// Builds the JSON document that accompanies a protobuf histogram file.
///
/// The document has three entries: "data" (an array of unnamed values, in the fixed order
/// shown below), "definition" (path of the JSON definition file) and "source" ("<host>_<pid>").
///
/// @param run                     run number (currently unused by this function).
/// @param lumi                    luminosity block; used to query the number of processed events.
/// @param dataFilePathName        full path of the data file the JSON refers to.
/// @param transferDestinationStr  value written to the transfer-destination field.
/// @param mergeTypeStr            value written to the merge-type field.
/// @param fms                     monitoring service, or nullptr when it is not used
///                                (the number of processed events is then reported as -1).
/// @return the filled property tree.
/// @throws cms::Exception if the host name cannot be obtained or the data file cannot be stat'ed.
boost::property_tree::ptree DQMFileSaverPB::fillJson(int run,
                                                     int lumi,
                                                     const std::string& dataFilePathName,
                                                     const std::string& transferDestinationStr,
                                                     const std::string& mergeTypeStr,
                                                     evf::FastMonitoringService* fms) {
  namespace bpt = boost::property_tree;
  namespace bfs = std::filesystem;

  // gethostname() may leave the buffer unterminated when the name is truncated, and some
  // implementations fail outright when the buffer is too small. Use a generously sized,
  // zero-filled buffer and force termination before treating it as a C string.
  char host[256] = {0};
  if (gethostname(host, sizeof(host)) == -1)
    throw cms::Exception("fillJson") << "Internal error, cannot get host name";
  host[sizeof(host) - 1] = '\0';

  const int pid = getpid();
  const std::string pidStr = std::to_string(pid);

  const int nProcessed = fms ? (fms->getEventsProcessedForLumi(lumi)) : -1;

  // Stat the data file only when events were processed (or the count is unknown): if the file
  // is not there, throw. With zero processed events, name and size stay empty / 0.
  std::string dataFileName;
  struct stat dataFileStat {};
  if (nProcessed) {
    if (stat(dataFilePathName.c_str(), &dataFileStat) != 0)
      throw cms::Exception("fillJson") << "Internal error, cannot get data file: " << dataFilePathName;
    // Only the file name, not the full path, goes into the JSON.
    dataFileName = bfs::path(dataFilePathName).filename().string();
  }

  // Appends one unnamed value to the "data" array.
  bpt::ptree data;
  auto appendValue = [&data](const auto& value) {
    bpt::ptree node;
    node.put("", value);
    data.push_back(std::make_pair("", node));
  };

  // The order of the entries is significant: it has to match the output JSON definition.
  appendValue(nProcessed);              // Processed events
  appendValue(nProcessed);              // Accepted events, same as processed for our purposes
  appendValue(0);                       // Error events
  appendValue(0);                       // Bitmask of abs of CMSSW return code
  appendValue(dataFileName);            // Data file the information refers to
  appendValue(dataFileStat.st_size);    // Size in bytes of the data file
  appendValue(std::string());           // Input files: not tracked here
  appendValue(-1);                      // Adler32: placeholder to match output json definition
  appendValue(transferDestinationStr);  // SM transfer destination field
  appendValue(mergeTypeStr);            // SM merge type field
  appendValue(0);                       // HLT error events

  bpt::ptree pt;
  pt.add_child("data", data);

  if (fms == nullptr) {
    pt.put("definition", "/fakeDefinition.jsn");
  } else {
    // The definition file is assumed to be written by the EvF output module into the base run
    // directory; its name carries the pid of this process.
    bfs::path outJsonDefName{edm::Service<evf::EvFDaqDirector>()->baseRunDir()};
    outJsonDefName /= (std::string("output_") + pidStr + std::string(".jsd"));
    pt.put("definition", outJsonDefName.string());
  }

  // Host and pid information; built as a std::string so no fixed-size buffer can overflow.
  pt.put("source", std::string(host) + "_" + pidStr);
  return pt;
}
/// Declares the configuration parameters understood by this module.
void DQMFileSaverPB::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
  edm::ParameterSetDescription psd;
  psd.setComment("Saves histograms from DQM store, HLT->pb workflow.");

  psd.addUntracked<bool>("fakeFilterUnitMode", false)
      ->setComment("If set, EvFDaqDirector is emulated and not used.");
  psd.addUntracked<std::string>("streamLabel", "streamDQMHistograms")->setComment("Label of the stream.");

  // Parameters shared with the base saver.
  DQMFileSaverBase::fillDescription(psd);

  // addDefault is used on purpose instead of add: DQMFileSaverOnline and DQMFileSaverPB both
  // used the module label "saver", which produced conflicting cfi file names. add could be
  // used again if the modules were given unique labels.
  descriptions.addDefault(psd);
}
/// Serializes the full content of the DQM store into a protobuf file.
///
/// Every monitor element becomes one Histo entry; each of its quality reports becomes an
/// additional STRING-kind entry. Without a tag ("UNKNOWN") the payloads are stored raw and the
/// whole file is gzip-compressed; with a tag each payload is zlib-compressed individually and
/// the file itself is written uncompressed.
///
/// @param store     DQM store to snapshot.
/// @param filename  path of the file to create (truncated if it exists).
/// @param run       unused by this function.
/// @param lumi      unused by this function.
/// @throws cms::Exception if the file cannot be opened or cannot be written completely.
void DQMFileSaverPB::savePB(DQMStore* store, std::string const& filename, int run, int lumi) const {
  using google::protobuf::io::FileOutputStream;
  using google::protobuf::io::GzipOutputStream;

  const bool compressPerME = (tag_ != "UNKNOWN");

  // Stores the streamed object in a Histo entry, either raw or zlib-compressed. The scratch
  // buffer is a std::vector so it is released even if an exception is thrown.
  auto storePayload = [this, compressPerME](dqmstorepb::ROOTFilePB::Histo& histo, const TBufferFile& buffer) {
    if (!compressPerME) {
      histo.set_streamed_histo(static_cast<void const*>(buffer.Buffer()), buffer.Length());
      return;
    }
    const int maxOutputSize = this->getMaxCompressedSize(buffer.Length());
    std::vector<char> compressed(maxOutputSize);
    const uLong totalOut = this->compressME(buffer, maxOutputSize, compressed.data());
    histo.set_streamed_histo(compressed.data(), totalOut);
  };

  unsigned int nme = 0;
  dqmstorepb::ROOTFilePB dqmstore_message;

  // All histograms are saved regardless of their lumi flag: even though saving happens per
  // lumi, the file is a *snapshot*.
  auto mes = store->getAllContents("");
  for (auto const& me : mes) {
    TBufferFile buffer(TBufferFile::kWrite);
    if (me->kind() < MonitorElement::Kind::TH1F) {
      // Kinds below TH1F are streamed through their tag string.
      TObjString object(me->tagString().c_str());
      buffer.WriteObject(&object);
    } else {
      buffer.WriteObject(me->getRootObject());
    }

    dqmstorepb::ROOTFilePB::Histo& histo = *dqmstore_message.add_histo();
    histo.set_full_pathname(me->getFullname());

    uint32_t flags = static_cast<uint32_t>(me->kind());
    if (me->getLumiFlag())
      flags |= DQMNet::DQM_PROP_LUMI;
    if (me->getEfficiencyFlag())
      flags |= DQMNet::DQM_PROP_EFFICIENCY_PLOT;
    histo.set_flags(flags);

    // The recorded size is always the uncompressed length.
    histo.set_size(buffer.Length());
    storePayload(histo, buffer);

    // Save quality reports, each as its own STRING entry named "<ME full name>.<QR name>".
    for (const auto& qr : me->getQReports()) {
      // snprintf never writes past buf; an overlong value would be truncated, not overflow.
      char buf[64];
      std::snprintf(buf, sizeof(buf), "qr=st:%d:%.*g:", qr->getStatus(), DBL_DIG + 2, qr->getQTresult());

      std::string result = '<' + me->getName() + '.' + qr->getQRName() + '>';
      result += buf;
      result += qr->getAlgorithm() + ':' + qr->getMessage();
      result += "</" + me->getName() + '.' + qr->getQRName() + '>';

      TObjString str(result.c_str());
      dqmstorepb::ROOTFilePB::Histo& qr_histo = *dqmstore_message.add_histo();
      TBufferFile qr_buffer(TBufferFile::kWrite);
      qr_buffer.WriteObject(&str);

      qr_histo.set_full_pathname(me->getFullname() + '.' + qr->getQRName());
      qr_histo.set_flags(static_cast<uint32_t>(MonitorElement::Kind::STRING));
      qr_histo.set_size(qr_buffer.Length());
      storePayload(qr_histo, qr_buffer);
    }

    // Efficiency tags and plain tags are not supported by protobuf files.

    // Count saved monitor elements (quality reports are not counted separately).
    ++nme;
  }

  const int filedescriptor =
      ::open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
  if (filedescriptor == -1)
    throw cms::Exception("DQMFileSaverPB")
        << "Cannot open DQM file for writing: " << filename << " error: " << strerror(errno);

  // Every stream is closed exactly once, whether or not an earlier step failed, so the file
  // descriptor is always released before the outcome is evaluated.
  FileOutputStream file_stream(filedescriptor);
  bool written = false;
  if (!compressPerME) {
    GzipOutputStream::Options options;
    options.format = GzipOutputStream::GZIP;
    options.compression_level = 1;
    GzipOutputStream gzip_stream(&file_stream, options);
    written = dqmstore_message.SerializeToZeroCopyStream(&gzip_stream);
    // Flush the gzip trailer into the file stream.
    written = gzip_stream.Close() && written;
  } else {
    // Individual MEs are already zlib-compressed, so the file is not compressed again.
    written = dqmstore_message.SerializeToZeroCopyStream(&file_stream);
  }
  // Flush the file stream and close the file descriptor.
  written = file_stream.Close() && written;

  if (!written)
    throw cms::Exception("DQMFileSaverPB") << "Failed to write DQM file: " << filename;

  // Maybe make some noise.
  edm::LogInfo("DQMFileSaverPB") << "savePB: successfully wrote " << nme << " objects "
                                 << "into DQM file '" << filename << "'\n";
}
/// Returns the output buffer size to reserve when zlib-compressing bufferSize bytes.
///
/// Badly compressible input makes zlib add overhead instead of shrinking the data. The estimate
/// used here is 6 bytes for the whole stream plus 5 bytes for every (started) 16K block.
int DQMFileSaverPB::getMaxCompressedSize(int bufferSize) const {
  constexpr int kBlockSize = 16384;
  constexpr int kStreamOverhead = 6;
  constexpr int kPerBlockOverhead = 5;

  // Round up: a partially filled block still pays the per-block overhead.
  const int blockCount = (bufferSize + (kBlockSize - 1)) / kBlockSize;
  return bufferSize + kStreamOverhead + blockCount * kPerBlockOverhead;
}
/// zlib-compresses the content of a TBufferFile in a single deflate pass.
///
/// Exactly buffer.Length() bytes are compressed. The buffer holds binary data, not a C string,
/// so no terminator byte is appended; reading one byte more would go past the written data and
/// could exceed an output bound computed for buffer.Length() bytes.
///
/// @param buffer              buffer whose first Length() bytes are compressed.
/// @param maxOutputSize       capacity of compression_output in bytes.
/// @param compression_output  destination for the compressed stream.
/// @return number of compressed bytes written to compression_output.
/// @throws cms::Exception if zlib cannot be initialised or the output does not fit.
ulong DQMFileSaverPB::compressME(const TBufferFile& buffer, int maxOutputSize, char* compression_output) const {
  // Value-initialise so no field is left indeterminate if initialisation fails.
  z_stream deflateStream{};
  deflateStream.zalloc = Z_NULL;
  deflateStream.zfree = Z_NULL;
  deflateStream.opaque = Z_NULL;
  deflateStream.avail_in = static_cast<uInt>(buffer.Length());             // size of input
  deflateStream.next_in = reinterpret_cast<Bytef*>(buffer.Buffer());       // input array
  deflateStream.avail_out = static_cast<uInt>(maxOutputSize);              // size of output
  deflateStream.next_out = reinterpret_cast<Bytef*>(compression_output);   // output array

  if (deflateInit(&deflateStream, Z_BEST_COMPRESSION) != Z_OK)
    throw cms::Exception("DQMFileSaverPB") << "Cannot initialise zlib for monitor element compression";

  // With Z_FINISH a complete stream is signalled by Z_STREAM_END; anything else means the
  // output buffer was too small or the stream state is inconsistent.
  const int status = deflate(&deflateStream, Z_FINISH);
  const uLong produced = deflateStream.total_out;
  deflateEnd(&deflateStream);

  if (status != Z_STREAM_END)
    throw cms::Exception("DQMFileSaverPB")
        << "zlib compression of monitor element failed with status " << status;

  return produced;
}
// Plugin registration. NOTE(review): DEFINE_FWK_MODULE presumably comes from MakerMacros.h and
// registers DQMFileSaverPB as a framework module; confirm against the framework headers.
#include "FWCore/Framework/interface/MakerMacros.h"
DEFINE_FWK_MODULE(DQMFileSaverPB);
|