Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-05-29 23:12:42

0001 #include <filesystem>
0002 #include <fstream>
0003 #include <iostream>
0004 #include <string>
0005 #include <utility>
0006 #include <vector>
0007 #include <sys/stat.h>
0008 #include <sys/types.h>
0009 #include <unistd.h>
0010 #include <boost/property_tree/json_parser.hpp>
0011 #include <openssl/md5.h>
0012 #include <fmt/printf.h>
0013 
0014 #include <google/protobuf/io/coded_stream.h>
0015 #include <google/protobuf/io/gzip_stream.h>
0016 #include <google/protobuf/io/zero_copy_stream_impl.h>
0017 
0018 #include <TString.h>
0019 #include <TSystem.h>
0020 #include <TBufferFile.h>
0021 
0022 #include "zlib.h"
0023 #include "DQMServices/Core/interface/DQMStore.h"
0024 #include "DQMServices/Core/interface/ROOTFilePB.pb.h"
0025 #include "FWCore/Framework/interface/LuminosityBlock.h"
0026 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0027 #include "FWCore/ServiceRegistry/interface/Service.h"
0028 
0029 #include "DQMFileSaverPB.h"
0030 
0031 using namespace dqm;
0032 
0033 DQMFileSaverPB::DQMFileSaverPB(const edm::ParameterSet& ps) : DQMFileSaverBase(ps) {
0034   fakeFilterUnitMode_ = ps.getUntrackedParameter<bool>("fakeFilterUnitMode", false);
0035   streamLabel_ = ps.getUntrackedParameter<std::string>("streamLabel", "streamDQMHistograms");
0036   tag_ = ps.getUntrackedParameter<std::string>("tag", "UNKNOWN");
0037 
0038   transferDestination_ = "";
0039   mergeType_ = "";
0040 
0041   // If tag is set we're running in a DQM Live mode.
0042   // Snapshot files will be saved for every client, then they will be merged and uploaded to the new DQM GUI.
0043   if (tag_ != "UNKNOWN") {
0044     streamLabel_ = "DQMLive";
0045   }
0046 
0047   if (!fakeFilterUnitMode_) {
0048     if (!edm::Service<evf::EvFDaqDirector>().isAvailable())
0049       throw cms::Exception("DQMFileSaverPB") << "EvFDaqDirector is not available";
0050     std::string initFileName = edm::Service<evf::EvFDaqDirector>()->getInitFilePath(streamLabel_);
0051     std::ofstream file(initFileName);
0052     if (!file)
0053       throw cms::Exception("DQMFileSaverPB")
0054           << "Cannot create INI file: " << initFileName << " error: " << strerror(errno);
0055     file.close();
0056   }
0057 }
0058 
0059 DQMFileSaverPB::~DQMFileSaverPB() = default;
0060 
0061 void DQMFileSaverPB::initRun() const {
0062   if (!fakeFilterUnitMode_) {
0063     transferDestination_ = edm::Service<evf::EvFDaqDirector>()->getStreamDestinations(streamLabel_);
0064     mergeType_ = edm::Service<evf::EvFDaqDirector>()->getStreamMergeType(streamLabel_, evf::MergeTypePB);
0065   }
0066 }
0067 
0068 void DQMFileSaverPB::saveLumi(const FileParameters& fp) const {
0069   // get from DAQ2 services where to store the files according to their format
0070   namespace bpt = boost::property_tree;
0071 
0072   std::string openJsonFilePathName;
0073   std::string jsonFilePathName;
0074   std::string openHistoFilePathName;
0075   std::string histoFilePathName;
0076 
0077   evf::FastMonitoringService* fms = nullptr;
0078   edm::Service<DQMStore> store;
0079 
0080   // create the files names
0081   if (fakeFilterUnitMode_) {
0082     std::string runDir = fmt::sprintf("%s/run%06d", fp.path_, fp.run_);
0083     std::string baseName = "";
0084     std::filesystem::create_directories(runDir);
0085     // If tag is configured, append it to the name of the resulting file.
0086     // This differentiates files saved by different clients.
0087     // If tag is not configured, we don't add it at all to keep the old behaviour unchanged.
0088     if (tag_ == "UNKNOWN") {
0089       baseName = fmt::sprintf("%s/run%06d_ls%04d_%s", runDir, fp.run_, fp.lumi_, streamLabel_);
0090     } else {
0091       baseName = fmt::sprintf("%s/run%06d_%s_%s", runDir, fp.run_, tag_, streamLabel_);
0092     }
0093 
0094     jsonFilePathName = baseName + ".jsn";
0095     openJsonFilePathName = jsonFilePathName + ".open";
0096 
0097     histoFilePathName = baseName + ".pb";
0098     openHistoFilePathName = histoFilePathName + ".open";
0099   } else {
0100     openJsonFilePathName = edm::Service<evf::EvFDaqDirector>()->getOpenOutputJsonFilePath(fp.lumi_, streamLabel_);
0101     jsonFilePathName = edm::Service<evf::EvFDaqDirector>()->getOutputJsonFilePath(fp.lumi_, streamLabel_);
0102 
0103     openHistoFilePathName =
0104         edm::Service<evf::EvFDaqDirector>()->getOpenProtocolBufferHistogramFilePath(fp.lumi_, streamLabel_);
0105     histoFilePathName = edm::Service<evf::EvFDaqDirector>()->getProtocolBufferHistogramFilePath(fp.lumi_, streamLabel_);
0106 
0107     fms = edm::Service<evf::FastMonitoringService>().operator->();
0108   }
0109 
0110   bool abortFlag = false;
0111   if (fms ? fms->getEventsProcessedForLumi(fp.lumi_, &abortFlag) : true) {
0112     // Save the file in the open directory.
0113     this->savePB(&*store, openHistoFilePathName, fp.run_, fp.lumi_);
0114 
0115     // Now move the the data and json files into the output directory.
0116     ::rename(openHistoFilePathName.c_str(), histoFilePathName.c_str());
0117   }
0118 
0119   if (abortFlag)
0120     return;
0121 
0122   // Write the json file in the open directory.
0123   bpt::ptree pt = fillJson(fp.run_, fp.lumi_, histoFilePathName, transferDestination_, mergeType_, fms);
0124   write_json(openJsonFilePathName, pt);
0125   ::rename(openJsonFilePathName.c_str(), jsonFilePathName.c_str());
0126 }
0127 
0128 void DQMFileSaverPB::saveRun(const FileParameters& fp) const {
0129   // no saving for the run
0130 }
0131 
0132 boost::property_tree::ptree DQMFileSaverPB::fillJson(int run,
0133                                                      int lumi,
0134                                                      const std::string& dataFilePathName,
0135                                                      const std::string& transferDestinationStr,
0136                                                      const std::string& mergeTypeStr,
0137                                                      evf::FastMonitoringService* fms) {
0138   namespace bpt = boost::property_tree;
0139   namespace bfs = std::filesystem;
0140 
0141   bpt::ptree pt;
0142 
0143   int hostnameReturn;
0144   char host[32];
0145   hostnameReturn = gethostname(host, sizeof(host));
0146   if (hostnameReturn == -1)
0147     throw cms::Exception("fillJson") << "Internal error, cannot get host name";
0148 
0149   int pid = getpid();
0150   std::ostringstream oss_pid;
0151   oss_pid << pid;
0152 
0153   int nProcessed = fms ? (fms->getEventsProcessedForLumi(lumi)) : -1;
0154 
0155   // Stat the data file: if not there, throw
0156   std::string dataFileName;
0157   struct stat dataFileStat;
0158   dataFileStat.st_size = 0;
0159   if (nProcessed) {
0160     if (stat(dataFilePathName.c_str(), &dataFileStat) != 0)
0161       throw cms::Exception("fillJson") << "Internal error, cannot get data file: " << dataFilePathName;
0162     // Extract only the data file name from the full path
0163     dataFileName = bfs::path(dataFilePathName).filename().string();
0164   }
0165   // The availability test of the FastMonitoringService was done in the ctor.
0166   bpt::ptree data;
0167   bpt::ptree processedEvents, acceptedEvents, errorEvents, bitmask, fileList, fileSize, inputFiles, fileAdler32,
0168       transferDestination, mergeType, hltErrorEvents;
0169 
0170   processedEvents.put("", nProcessed);  // Processed events
0171   acceptedEvents.put("", nProcessed);   // Accepted events, same as processed for our purposes
0172 
0173   errorEvents.put("", 0);                               // Error events
0174   bitmask.put("", 0);                                   // Bitmask of abs of CMSSW return code
0175   fileList.put("", dataFileName);                       // Data file the information refers to
0176   fileSize.put("", dataFileStat.st_size);               // Size in bytes of the data file
0177   inputFiles.put("", "");                               // We do not care about input files!
0178   fileAdler32.put("", -1);                              // placeholder to match output json definition
0179   transferDestination.put("", transferDestinationStr);  // SM Transfer destination field
0180   mergeType.put("", mergeTypeStr);                      // SM Transfer destination field
0181   hltErrorEvents.put("", 0);                            // Error events
0182 
0183   data.push_back(std::make_pair("", processedEvents));
0184   data.push_back(std::make_pair("", acceptedEvents));
0185   data.push_back(std::make_pair("", errorEvents));
0186   data.push_back(std::make_pair("", bitmask));
0187   data.push_back(std::make_pair("", fileList));
0188   data.push_back(std::make_pair("", fileSize));
0189   data.push_back(std::make_pair("", inputFiles));
0190   data.push_back(std::make_pair("", fileAdler32));
0191   data.push_back(std::make_pair("", transferDestination));
0192   data.push_back(std::make_pair("", mergeType));
0193   data.push_back(std::make_pair("", hltErrorEvents));
0194 
0195   pt.add_child("data", data);
0196 
0197   if (fms == nullptr) {
0198     pt.put("definition", "/fakeDefinition.jsn");
0199   } else {
0200     // The availability test of the EvFDaqDirector Service was done in the ctor.
0201     bfs::path outJsonDefName{
0202         edm::Service<evf::EvFDaqDirector>()->baseRunDir()};  //we assume this file is written bu the EvF Output module
0203     outJsonDefName /= (std::string("output_") + oss_pid.str() + std::string(".jsd"));
0204     pt.put("definition", outJsonDefName.string());
0205   }
0206 
0207   char sourceInfo[64];  //host and pid information
0208   sprintf(sourceInfo, "%s_%d", host, pid);
0209   pt.put("source", sourceInfo);
0210 
0211   return pt;
0212 }
0213 
0214 void DQMFileSaverPB::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0215   edm::ParameterSetDescription desc;
0216   desc.setComment("Saves histograms from DQM store, HLT->pb workflow.");
0217 
0218   desc.addUntracked<bool>("fakeFilterUnitMode", false)->setComment("If set, EvFDaqDirector is emulated and not used.");
0219 
0220   desc.addUntracked<std::string>("streamLabel", "streamDQMHistograms")->setComment("Label of the stream.");
0221 
0222   DQMFileSaverBase::fillDescription(desc);
0223 
0224   // Changed to use addDefault instead of add here because previously
0225   // DQMFileSaverOnline and DQMFileSaverPB both used the module label
0226   // "saver" which caused conflicting cfi filenames to be generated.
0227   // add could be used if unique module labels were given.
0228   descriptions.addDefault(desc);
0229 }
0230 
0231 void DQMFileSaverPB::savePB(DQMStore* store, std::string const& filename, int run, int lumi) const {
0232   using google::protobuf::io::FileOutputStream;
0233   using google::protobuf::io::GzipOutputStream;
0234   using google::protobuf::io::StringOutputStream;
0235 
0236   unsigned int nme = 0;
0237 
0238   dqmstorepb::ROOTFilePB dqmstore_message;
0239 
0240   // We save all histograms, indifferent of the lumi flag: even tough we save per lumi, this is a *snapshot*.
0241   auto mes = store->getAllContents("");
0242   for (auto const me : mes) {
0243     TBufferFile buffer(TBufferFile::kWrite);
0244     if (me->kind() < MonitorElement::Kind::TH1F) {
0245       TObjString object(me->tagString().c_str());
0246       buffer.WriteObject(&object);
0247     } else {
0248       buffer.WriteObject(me->getRootObject());
0249     }
0250     dqmstorepb::ROOTFilePB::Histo& histo = *dqmstore_message.add_histo();
0251     histo.set_full_pathname(me->getFullname());
0252     uint32_t flags = 0;
0253     flags |= (uint32_t)me->kind();
0254     if (me->getLumiFlag())
0255       flags |= DQMNet::DQM_PROP_LUMI;
0256     if (me->getEfficiencyFlag())
0257       flags |= DQMNet::DQM_PROP_EFFICIENCY_PLOT;
0258     histo.set_flags(flags);
0259     histo.set_size(buffer.Length());
0260 
0261     if (tag_ == "UNKNOWN") {
0262       histo.set_streamed_histo((void const*)buffer.Buffer(), buffer.Length());
0263     } else {
0264       // Compress ME blob with zlib
0265       int maxOutputSize = this->getMaxCompressedSize(buffer.Length());
0266       std::vector<char> compression_output(maxOutputSize);
0267       uLong total_out = this->compressME(buffer, maxOutputSize, compression_output.data());
0268       histo.set_streamed_histo(compression_output.data(), total_out);
0269     }
0270 
0271     // Save quality reports
0272     for (const auto& qr : me->getQReports()) {
0273       std::string result;
0274       // TODO: 64 is likely too short; memory corruption in the old code?
0275       char buf[64];
0276       std::snprintf(buf, sizeof(buf), "qr=st:%d:%.*g:", qr->getStatus(), DBL_DIG + 2, qr->getQTresult());
0277       result = '<' + me->getName() + '.' + qr->getQRName() + '>';
0278       result += buf;
0279       result += qr->getAlgorithm() + ':' + qr->getMessage();
0280       result += "</" + me->getName() + '.' + qr->getQRName() + '>';
0281       TObjString str(result.c_str());
0282 
0283       dqmstorepb::ROOTFilePB::Histo& qr_histo = *dqmstore_message.add_histo();
0284       TBufferFile qr_buffer(TBufferFile::kWrite);
0285       qr_buffer.WriteObject(&str);
0286       qr_histo.set_full_pathname(me->getFullname() + '.' + qr->getQRName());
0287       qr_histo.set_flags(static_cast<uint32_t>(MonitorElement::Kind::STRING));
0288       qr_histo.set_size(qr_buffer.Length());
0289       // qr_histo.set_streamed_histo((void const*)qr_buffer.Buffer(), qr_buffer.Length());
0290 
0291       if (tag_ == "UNKNOWN") {
0292         qr_histo.set_streamed_histo((void const*)qr_buffer.Buffer(), qr_buffer.Length());
0293       } else {
0294         // Compress ME blob with zlib
0295         int maxOutputSize = this->getMaxCompressedSize(qr_buffer.Length());
0296         char* compression_output = new char[maxOutputSize];
0297         uLong total_out = this->compressME(qr_buffer, maxOutputSize, compression_output);
0298         qr_histo.set_streamed_histo(compression_output, total_out);
0299         delete[] compression_output;
0300       }
0301     }
0302 
0303     // Save efficiency tag, if any.
0304     // XXX not supported by protobuf files.
0305 
0306     // Save tag if any.
0307     // XXX not supported by protobuf files.
0308 
0309     // Count saved histograms
0310     ++nme;
0311   }
0312 
0313   int filedescriptor =
0314       ::open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
0315   FileOutputStream file_stream(filedescriptor);
0316   if (tag_ == "UNKNOWN") {
0317     GzipOutputStream::Options options;
0318     options.format = GzipOutputStream::GZIP;
0319     options.compression_level = 1;
0320     GzipOutputStream gzip_stream(&file_stream, options);
0321     dqmstore_message.SerializeToZeroCopyStream(&gzip_stream);
0322 
0323     // Flush the internal streams & Close the file descriptor
0324     gzip_stream.Close();
0325     file_stream.Close();
0326   } else {
0327     // We zlib compressed individual MEs so no need to compress the entire file again.
0328     dqmstore_message.SerializeToZeroCopyStream(&file_stream);
0329 
0330     // Flush the internal stream & Close the file descriptor
0331     file_stream.Close();
0332   }
0333 
0334   // Maybe make some noise.
0335   edm::LogInfo("DQMFileSaverPB") << "savePB: successfully wrote " << nme << " objects  "
0336                                  << "into DQM file '" << filename << "'\n";
0337 }
0338 
0339 int DQMFileSaverPB::getMaxCompressedSize(int bufferSize) const {
0340   // When input data is very badly compressable, zlib will add overhead instead of reducing the size.
0341   // There is a minor amount of overhead (6 bytes overall and 5 bytes per 16K block) that is taken
0342   // into consideration here to find out potential absolute maximum size of the output.
0343   int n16kBlocks = (bufferSize + 16383) / 16384;  // round up any fraction of a block
0344   int maxOutputSize = bufferSize + 6 + (n16kBlocks * 5);
0345   return maxOutputSize;
0346 }
0347 
0348 ulong DQMFileSaverPB::compressME(const TBufferFile& buffer, int maxOutputSize, char* compression_output) const {
0349   z_stream deflateStream;
0350   deflateStream.zalloc = Z_NULL;
0351   deflateStream.zfree = Z_NULL;
0352   deflateStream.opaque = Z_NULL;
0353   deflateStream.avail_in = (uInt)buffer.Length() + 1;   // size of input, string + terminator
0354   deflateStream.next_in = (Bytef*)buffer.Buffer();      // input array
0355   deflateStream.avail_out = (uInt)maxOutputSize;        // size of output
0356   deflateStream.next_out = (Bytef*)compression_output;  // output array, result will be placed here
0357 
0358   // The actual compression
0359   deflateInit(&deflateStream, Z_BEST_COMPRESSION);
0360   deflate(&deflateStream, Z_FINISH);
0361   deflateEnd(&deflateStream);
0362 
0363   return deflateStream.total_out;
0364 }
0365 
0366 #include "FWCore/Framework/interface/MakerMacros.h"
0367 DEFINE_FWK_MODULE(DQMFileSaverPB);