File indexing completed on 2024-05-29 23:12:42
0001 #include <filesystem>
0002 #include <fstream>
0003 #include <iostream>
0004 #include <string>
0005 #include <utility>
0006 #include <vector>
0007 #include <sys/stat.h>
0008 #include <sys/types.h>
0009 #include <unistd.h>
0010 #include <boost/property_tree/json_parser.hpp>
0011 #include <openssl/md5.h>
0012 #include <fmt/printf.h>
0013
0014 #include <google/protobuf/io/coded_stream.h>
0015 #include <google/protobuf/io/gzip_stream.h>
0016 #include <google/protobuf/io/zero_copy_stream_impl.h>
0017
0018 #include <TString.h>
0019 #include <TSystem.h>
0020 #include <TBufferFile.h>
0021
0022 #include "zlib.h"
0023 #include "DQMServices/Core/interface/DQMStore.h"
0024 #include "DQMServices/Core/interface/ROOTFilePB.pb.h"
0025 #include "FWCore/Framework/interface/LuminosityBlock.h"
0026 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0027 #include "FWCore/ServiceRegistry/interface/Service.h"
0028
0029 #include "DQMFileSaverPB.h"
0030
0031 using namespace dqm;
0032
0033 DQMFileSaverPB::DQMFileSaverPB(const edm::ParameterSet& ps) : DQMFileSaverBase(ps) {
0034 fakeFilterUnitMode_ = ps.getUntrackedParameter<bool>("fakeFilterUnitMode", false);
0035 streamLabel_ = ps.getUntrackedParameter<std::string>("streamLabel", "streamDQMHistograms");
0036 tag_ = ps.getUntrackedParameter<std::string>("tag", "UNKNOWN");
0037
0038 transferDestination_ = "";
0039 mergeType_ = "";
0040
0041
0042
0043 if (tag_ != "UNKNOWN") {
0044 streamLabel_ = "DQMLive";
0045 }
0046
0047 if (!fakeFilterUnitMode_) {
0048 if (!edm::Service<evf::EvFDaqDirector>().isAvailable())
0049 throw cms::Exception("DQMFileSaverPB") << "EvFDaqDirector is not available";
0050 std::string initFileName = edm::Service<evf::EvFDaqDirector>()->getInitFilePath(streamLabel_);
0051 std::ofstream file(initFileName);
0052 if (!file)
0053 throw cms::Exception("DQMFileSaverPB")
0054 << "Cannot create INI file: " << initFileName << " error: " << strerror(errno);
0055 file.close();
0056 }
0057 }
0058
0059 DQMFileSaverPB::~DQMFileSaverPB() = default;
0060
0061 void DQMFileSaverPB::initRun() const {
0062 if (!fakeFilterUnitMode_) {
0063 transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(streamLabel_);
0064 mergeType_ = edm::Service<evf::EvFDaqDirector>()->getStreamMergeType(streamLabel_, evf::MergeTypePB);
0065 }
0066 }
0067
0068 void DQMFileSaverPB::saveLumi(const FileParameters& fp) const {
0069
0070 namespace bpt = boost::property_tree;
0071
0072 std::string openJsonFilePathName;
0073 std::string jsonFilePathName;
0074 std::string openHistoFilePathName;
0075 std::string histoFilePathName;
0076
0077 evf::FastMonitoringService* fms = nullptr;
0078 edm::Service<DQMStore> store;
0079
0080
0081 if (fakeFilterUnitMode_) {
0082 std::string runDir = fmt::sprintf("%s/run%06d", fp.path_, fp.run_);
0083 std::string baseName = "";
0084 std::filesystem::create_directories(runDir);
0085
0086
0087
0088 if (tag_ == "UNKNOWN") {
0089 baseName = fmt::sprintf("%s/run%06d_ls%04d_%s", runDir, fp.run_, fp.lumi_, streamLabel_);
0090 } else {
0091 baseName = fmt::sprintf("%s/run%06d_%s_%s", runDir, fp.run_, tag_, streamLabel_);
0092 }
0093
0094 jsonFilePathName = baseName + ".jsn";
0095 openJsonFilePathName = jsonFilePathName + ".open";
0096
0097 histoFilePathName = baseName + ".pb";
0098 openHistoFilePathName = histoFilePathName + ".open";
0099 } else {
0100 openJsonFilePathName = edm::Service<evf::EvFDaqDirector>()->getOpenOutputJsonFilePath(fp.lumi_, streamLabel_);
0101 jsonFilePathName = edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(fp.lumi_, streamLabel_);
0102
0103 openHistoFilePathName =
0104 edm::Service<evf::EvFDaqDirector>()->getOpenProtocolBufferHistogramFilePath(fp.lumi_, streamLabel_);
0105 histoFilePathName = edm::Service<evf::EvFDaqDirector>()->getProtocolBufferHistogramFilePath(fp.lumi_, streamLabel_);
0106
0107 fms = edm::Service<evf::FastMonitoringService>().operator->();
0108 }
0109
0110 bool abortFlag = false;
0111 if (fms ? fms->getEventsProcessedForLumi(fp.lumi_, &abortFlag) : true) {
0112
0113 this->savePB(&*store, openHistoFilePathName, fp.run_, fp.lumi_);
0114
0115
0116 ::rename(openHistoFilePathName.c_str(), histoFilePathName.c_str());
0117 }
0118
0119 if (abortFlag)
0120 return;
0121
0122
0123 bpt::ptree pt = fillJson(fp.run_, fp.lumi_, histoFilePathName, transferDestination_, mergeType_, fms);
0124 write_json(openJsonFilePathName, pt);
0125 ::rename(openJsonFilePathName.c_str(), jsonFilePathName.c_str());
0126 }
0127
0128 void DQMFileSaverPB::saveRun(const FileParameters& fp) const {
0129
0130 }
0131
0132 boost::property_tree::ptree DQMFileSaverPB::fillJson(int run,
0133 int lumi,
0134 const std::string& dataFilePathName,
0135 const std::string& transferDestinationStr,
0136 const std::string& mergeTypeStr,
0137 evf::FastMonitoringService* fms) {
0138 namespace bpt = boost::property_tree;
0139 namespace bfs = std::filesystem;
0140
0141 bpt::ptree pt;
0142
0143 int hostnameReturn;
0144 char host[32];
0145 hostnameReturn = gethostname(host, sizeof(host));
0146 if (hostnameReturn == -1)
0147 throw cms::Exception("fillJson") << "Internal error, cannot get host name";
0148
0149 int pid = getpid();
0150 std::ostringstream oss_pid;
0151 oss_pid << pid;
0152
0153 int nProcessed = fms ? (fms->getEventsProcessedForLumi(lumi)) : -1;
0154
0155
0156 std::string dataFileName;
0157 struct stat dataFileStat;
0158 dataFileStat.st_size = 0;
0159 if (nProcessed) {
0160 if (stat(dataFilePathName.c_str(), &dataFileStat) != 0)
0161 throw cms::Exception("fillJson") << "Internal error, cannot get data file: " << dataFilePathName;
0162
0163 dataFileName = bfs::path(dataFilePathName).filename().string();
0164 }
0165
0166 bpt::ptree data;
0167 bpt::ptree processedEvents, acceptedEvents, errorEvents, bitmask, fileList, fileSize, inputFiles, fileAdler32,
0168 transferDestination, mergeType, hltErrorEvents;
0169
0170 processedEvents.put("", nProcessed);
0171 acceptedEvents.put("", nProcessed);
0172
0173 errorEvents.put("", 0);
0174 bitmask.put("", 0);
0175 fileList.put("", dataFileName);
0176 fileSize.put("", dataFileStat.st_size);
0177 inputFiles.put("", "");
0178 fileAdler32.put("", -1);
0179 transferDestination.put("", transferDestinationStr);
0180 mergeType.put("", mergeTypeStr);
0181 hltErrorEvents.put("", 0);
0182
0183 data.push_back(std::make_pair("", processedEvents));
0184 data.push_back(std::make_pair("", acceptedEvents));
0185 data.push_back(std::make_pair("", errorEvents));
0186 data.push_back(std::make_pair("", bitmask));
0187 data.push_back(std::make_pair("", fileList));
0188 data.push_back(std::make_pair("", fileSize));
0189 data.push_back(std::make_pair("", inputFiles));
0190 data.push_back(std::make_pair("", fileAdler32));
0191 data.push_back(std::make_pair("", transferDestination));
0192 data.push_back(std::make_pair("", mergeType));
0193 data.push_back(std::make_pair("", hltErrorEvents));
0194
0195 pt.add_child("data", data);
0196
0197 if (fms == nullptr) {
0198 pt.put("definition", "/fakeDefinition.jsn");
0199 } else {
0200
0201 bfs::path outJsonDefName{
0202 edm::Service<evf::EvFDaqDirector>()->baseRunDir()};
0203 outJsonDefName /= (std::string("output_") + oss_pid.str() + std::string(".jsd"));
0204 pt.put("definition", outJsonDefName.string());
0205 }
0206
0207 char sourceInfo[64];
0208 sprintf(sourceInfo, "%s_%d", host, pid);
0209 pt.put("source", sourceInfo);
0210
0211 return pt;
0212 }
0213
0214 void DQMFileSaverPB::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0215 edm::ParameterSetDescription desc;
0216 desc.setComment("Saves histograms from DQM store, HLT->pb workflow.");
0217
0218 desc.addUntracked<bool>("fakeFilterUnitMode", false)->setComment("If set, EvFDaqDirector is emulated and not used.");
0219
0220 desc.addUntracked<std::string>("streamLabel", "streamDQMHistograms")->setComment("Label of the stream.");
0221
0222 DQMFileSaverBase::fillDescription(desc);
0223
0224
0225
0226
0227
0228 descriptions.addDefault(desc);
0229 }
0230
0231 void DQMFileSaverPB::savePB(DQMStore* store, std::string const& filename, int run, int lumi) const {
0232 using google::protobuf::io::FileOutputStream;
0233 using google::protobuf::io::GzipOutputStream;
0234 using google::protobuf::io::StringOutputStream;
0235
0236 unsigned int nme = 0;
0237
0238 dqmstorepb::ROOTFilePB dqmstore_message;
0239
0240
0241 auto mes = store->getAllContents("");
0242 for (auto const me : mes) {
0243 TBufferFile buffer(TBufferFile::kWrite);
0244 if (me->kind() < MonitorElement::Kind::TH1F) {
0245 TObjString object(me->tagString().c_str());
0246 buffer.WriteObject(&object);
0247 } else {
0248 buffer.WriteObject(me->getRootObject());
0249 }
0250 dqmstorepb::ROOTFilePB::Histo& histo = *dqmstore_message.add_histo();
0251 histo.set_full_pathname(me->getFullname());
0252 uint32_t flags = 0;
0253 flags |= (uint32_t)me->kind();
0254 if (me->getLumiFlag())
0255 flags |= DQMNet::DQM_PROP_LUMI;
0256 if (me->getEfficiencyFlag())
0257 flags |= DQMNet::DQM_PROP_EFFICIENCY_PLOT;
0258 histo.set_flags(flags);
0259 histo.set_size(buffer.Length());
0260
0261 if (tag_ == "UNKNOWN") {
0262 histo.set_streamed_histo((void const*)buffer.Buffer(), buffer.Length());
0263 } else {
0264
0265 int maxOutputSize = this->getMaxCompressedSize(buffer.Length());
0266 std::vector<char> compression_output(maxOutputSize);
0267 uLong total_out = this->compressME(buffer, maxOutputSize, compression_output.data());
0268 histo.set_streamed_histo(compression_output.data(), total_out);
0269 }
0270
0271
0272 for (const auto& qr : me->getQReports()) {
0273 std::string result;
0274
0275 char buf[64];
0276 std::snprintf(buf, sizeof(buf), "qr=st:%d:%.*g:", qr->getStatus(), DBL_DIG + 2, qr->getQTresult());
0277 result = '<' + me->getName() + '.' + qr->getQRName() + '>';
0278 result += buf;
0279 result += qr->getAlgorithm() + ':' + qr->getMessage();
0280 result += "</" + me->getName() + '.' + qr->getQRName() + '>';
0281 TObjString str(result.c_str());
0282
0283 dqmstorepb::ROOTFilePB::Histo& qr_histo = *dqmstore_message.add_histo();
0284 TBufferFile qr_buffer(TBufferFile::kWrite);
0285 qr_buffer.WriteObject(&str);
0286 qr_histo.set_full_pathname(me->getFullname() + '.' + qr->getQRName());
0287 qr_histo.set_flags(static_cast<uint32_t>(MonitorElement::Kind::STRING));
0288 qr_histo.set_size(qr_buffer.Length());
0289
0290
0291 if (tag_ == "UNKNOWN") {
0292 qr_histo.set_streamed_histo((void const*)qr_buffer.Buffer(), qr_buffer.Length());
0293 } else {
0294
0295 int maxOutputSize = this->getMaxCompressedSize(qr_buffer.Length());
0296 char* compression_output = new char[maxOutputSize];
0297 uLong total_out = this->compressME(qr_buffer, maxOutputSize, compression_output);
0298 qr_histo.set_streamed_histo(compression_output, total_out);
0299 delete[] compression_output;
0300 }
0301 }
0302
0303
0304
0305
0306
0307
0308
0309
0310 ++nme;
0311 }
0312
0313 int filedescriptor =
0314 ::open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
0315 FileOutputStream file_stream(filedescriptor);
0316 if (tag_ == "UNKNOWN") {
0317 GzipOutputStream::Options options;
0318 options.format = GzipOutputStream::GZIP;
0319 options.compression_level = 1;
0320 GzipOutputStream gzip_stream(&file_stream, options);
0321 dqmstore_message.SerializeToZeroCopyStream(&gzip_stream);
0322
0323
0324 gzip_stream.Close();
0325 file_stream.Close();
0326 } else {
0327
0328 dqmstore_message.SerializeToZeroCopyStream(&file_stream);
0329
0330
0331 file_stream.Close();
0332 }
0333
0334
0335 edm::LogInfo("DQMFileSaverPB") << "savePB: successfully wrote " << nme << " objects "
0336 << "into DQM file '" << filename << "'\n";
0337 }
0338
0339 int DQMFileSaverPB::getMaxCompressedSize(int bufferSize) const {
0340
0341
0342
0343 int n16kBlocks = (bufferSize + 16383) / 16384;
0344 int maxOutputSize = bufferSize + 6 + (n16kBlocks * 5);
0345 return maxOutputSize;
0346 }
0347
0348 ulong DQMFileSaverPB::compressME(const TBufferFile& buffer, int maxOutputSize, char* compression_output) const {
0349 z_stream deflateStream;
0350 deflateStream.zalloc = Z_NULL;
0351 deflateStream.zfree = Z_NULL;
0352 deflateStream.opaque = Z_NULL;
0353 deflateStream.avail_in = (uInt)buffer.Length() + 1;
0354 deflateStream.next_in = (Bytef*)buffer.Buffer();
0355 deflateStream.avail_out = (uInt)maxOutputSize;
0356 deflateStream.next_out = (Bytef*)compression_output;
0357
0358
0359 deflateInit(&deflateStream, Z_BEST_COMPRESSION);
0360 deflate(&deflateStream, Z_FINISH);
0361 deflateEnd(&deflateStream);
0362
0363 return deflateStream.total_out;
0364 }
0365
0366 #include "FWCore/Framework/interface/MakerMacros.h"
0367 DEFINE_FWK_MODULE(DQMFileSaverPB);