Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:10:14

0001 #include "DQMProtobufReader.h"
0002 
0003 #include "DQMServices/Core/interface/ROOTFilePB.pb.h"
0004 #include "DataFormats/Histograms/interface/DQMToken.h"
0005 #include "FWCore/Framework/interface/RunPrincipal.h"
0006 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0007 #include "FWCore/Framework/interface/LuminosityBlock.h"
0008 #include "FWCore/MessageLogger/interface/JobReport.h"
0009 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0010 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0011 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0012 #include "FWCore/Sources/interface/ProducerSourceBase.h"
0013 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0014 
0015 #include <cstdlib>
0016 #include <fcntl.h>
0017 #include <filesystem>
0018 #include <regex>
0019 
0020 #include <TBufferFile.h>
0021 
0022 #include <google/protobuf/io/coded_stream.h>
0023 #include <google/protobuf/io/gzip_stream.h>
0024 #include <google/protobuf/io/zero_copy_stream_impl.h>
0025 
0026 using namespace dqmservices;
0027 
0028 DQMProtobufReader::DQMProtobufReader(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0029     : PuttableSourceBase(pset, desc),
0030       fiterator_(pset),
0031       flagSkipFirstLumis_(pset.getUntrackedParameter<bool>("skipFirstLumis")),
0032       flagEndOfRunKills_(pset.getUntrackedParameter<bool>("endOfRunKills")),
0033       flagDeleteDatFiles_(pset.getUntrackedParameter<bool>("deleteDatFiles")),
0034       flagLoadFiles_(pset.getUntrackedParameter<bool>("loadFiles")) {
0035   produces<std::string, edm::Transition::BeginLuminosityBlock>("sourceDataPath");
0036   produces<std::string, edm::Transition::BeginLuminosityBlock>("sourceJsonPath");
0037   produces<DQMToken, edm::Transition::BeginRun>("DQMGenerationRecoRun");
0038   produces<DQMToken, edm::Transition::BeginLuminosityBlock>("DQMGenerationRecoLumi");
0039 }
0040 
0041 edm::InputSource::ItemTypeInfo DQMProtobufReader::getNextItemType() {
0042   typedef DQMFileIterator::State State;
0043   typedef DQMFileIterator::LumiEntry LumiEntry;
0044 
0045   // fiterator_.logFileAction("getNextItemType");
0046 
0047   for (;;) {
0048     fiterator_.update_state();
0049 
0050     if (edm::shutdown_flag.load()) {
0051       fiterator_.logFileAction("Shutdown flag was set, shutting down.");
0052       return InputSource::ItemType::IsStop;
0053     }
0054 
0055     // check for end of run file and force quit
0056     if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
0057       return InputSource::ItemType::IsStop;
0058     }
0059 
0060     // check for end of run and quit if everything has been processed.
0061     // this is the clean exit
0062     if ((!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
0063       return InputSource::ItemType::IsStop;
0064     }
0065 
0066     // skip to the next file if we have no files openned yet
0067     if (fiterator_.lumiReady()) {
0068       return InputSource::ItemType::IsLumi;
0069     }
0070 
0071     fiterator_.delay();
0072     // BUG: for an unknown reason it fails after a certain time if we use
0073     // IsSynchronize state
0074     //
0075     // comment out in order to block at this level
0076     // return InputSource::ItemType::IsSynchronize;
0077   }
0078 
0079   // this is unreachable
0080 }
0081 
0082 std::shared_ptr<edm::RunAuxiliary> DQMProtobufReader::readRunAuxiliary_() {
0083   // fiterator_.logFileAction("readRunAuxiliary_");
0084 
0085   edm::RunAuxiliary* aux = new edm::RunAuxiliary(fiterator_.runNumber(), edm::Timestamp(), edm::Timestamp());
0086   return std::shared_ptr<edm::RunAuxiliary>(aux);
0087 }
0088 
0089 void DQMProtobufReader::readRun_(edm::RunPrincipal& rpCache) {
0090   // fiterator_.logFileAction("readRun_");
0091   rpCache.fillRunPrincipal(processHistoryRegistryForUpdate());
0092 
0093   edm::Service<DQMStore> store;
0094   std::vector<MonitorElement*> allMEs = store->getAllContents("");
0095   for (auto const& ME : allMEs) {
0096     ME->Reset();
0097   }
0098 }
0099 
0100 std::shared_ptr<edm::LuminosityBlockAuxiliary> DQMProtobufReader::readLuminosityBlockAuxiliary_() {
0101   // fiterator_.logFileAction("readLuminosityBlockAuxiliary_");
0102 
0103   currentLumi_ = fiterator_.open();
0104   edm::LuminosityBlockAuxiliary* aux = new edm::LuminosityBlockAuxiliary(
0105       fiterator_.runNumber(), currentLumi_.file_ls, edm::Timestamp(), edm::Timestamp());
0106 
0107   return std::shared_ptr<edm::LuminosityBlockAuxiliary>(aux);
0108 }
0109 
0110 void DQMProtobufReader::readLuminosityBlock_(edm::LuminosityBlockPrincipal& lbCache) {
0111   // fiterator_.logFileAction("readLuminosityBlock_");
0112 
0113   edm::Service<edm::JobReport> jr;
0114   jr->reportInputLumiSection(lbCache.id().run(), lbCache.id().luminosityBlock());
0115   lbCache.fillLuminosityBlockPrincipal(processHistoryRegistry().getMapped(lbCache.aux().processHistoryID()));
0116 }
0117 
0118 void DQMProtobufReader::beginLuminosityBlock(edm::LuminosityBlock& lb) {
0119   edm::Service<DQMStore> store;
0120 
0121   // clear the old lumi histograms
0122   std::vector<MonitorElement*> allMEs = store->getAllContents("");
0123   for (auto const& ME : allMEs) {
0124     // We do not want to reset Run Products here!
0125     if (ME->getLumiFlag()) {
0126       ME->Reset();
0127     }
0128   }
0129 
0130   // load the new file
0131   std::string path = currentLumi_.get_data_path();
0132   std::string jspath = currentLumi_.get_json_path();
0133 
0134   std::unique_ptr<std::string> path_product(new std::string(path));
0135   std::unique_ptr<std::string> json_product(new std::string(jspath));
0136 
0137   lb.put(std::move(path_product), "sourceDataPath");
0138   lb.put(std::move(json_product), "sourceJsonPath");
0139 
0140   if (flagLoadFiles_) {
0141     if (!std::filesystem::exists(path)) {
0142       fiterator_.logFileAction("Data file is missing ", path);
0143       fiterator_.logLumiState(currentLumi_, "error: data file missing");
0144       return;
0145     }
0146 
0147     fiterator_.logFileAction("Initiating request to open file ", path);
0148     fiterator_.logFileAction("Successfully opened file ", path);
0149     load(&*store, path);
0150     fiterator_.logFileAction("Closed file ", path);
0151     fiterator_.logLumiState(currentLumi_, "close: ok");
0152   } else {
0153     fiterator_.logFileAction("Not loading the data file at source level ", path);
0154     fiterator_.logLumiState(currentLumi_, "close: not loading");
0155   }
0156 }
0157 
0158 void DQMProtobufReader::load(DQMStore* store, std::string filename) {
0159   using google::protobuf::io::ArrayInputStream;
0160   using google::protobuf::io::CodedInputStream;
0161   using google::protobuf::io::FileInputStream;
0162   using google::protobuf::io::FileOutputStream;
0163   using google::protobuf::io::GzipInputStream;
0164   using google::protobuf::io::GzipOutputStream;
0165 
0166   int filedescriptor;
0167   if ((filedescriptor = ::open(filename.c_str(), O_RDONLY)) == -1) {
0168     edm::LogError("DQMProtobufReader") << "File " << filename << " does not exist.";
0169   }
0170 
0171   dqmstorepb::ROOTFilePB dqmstore_message;
0172   FileInputStream fin(filedescriptor);
0173   GzipInputStream input(&fin);
0174   CodedInputStream input_coded(&input);
0175   input_coded.SetTotalBytesLimit(1024 * 1024 * 1024);
0176   if (!dqmstore_message.ParseFromCodedStream(&input_coded)) {
0177     edm::LogError("DQMProtobufReader") << "Fatal parsing file '" << filename << "'";
0178   }
0179 
0180   ::close(filedescriptor);
0181 
0182   for (int i = 0; i < dqmstore_message.histo_size(); ++i) {
0183     TObject* obj = nullptr;
0184     dqmstorepb::ROOTFilePB::Histo const& h = dqmstore_message.histo(i);
0185 
0186     size_t slash = h.full_pathname().rfind('/');
0187     size_t dirpos = (slash == std::string::npos ? 0 : slash);
0188     size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
0189     std::string objname, dirname;
0190     dirname.assign(h.full_pathname(), 0, dirpos);
0191     objname.assign(h.full_pathname(), namepos, std::string::npos);
0192     TBufferFile buf(TBufferFile::kRead, h.size(), (void*)h.streamed_histo().data(), kFALSE);
0193     buf.Reset();
0194     if (buf.Length() == buf.BufferSize()) {
0195       obj = nullptr;
0196     } else {
0197       buf.InitMap();
0198       void* ptr = buf.ReadObjectAny(nullptr);
0199       obj = reinterpret_cast<TObject*>(ptr);
0200     }
0201 
0202     if (!obj) {
0203       edm::LogError("DQMProtobufReader") << "Error reading element:'" << h.full_pathname();
0204     }
0205 
0206     store->setCurrentFolder(dirname);
0207 
0208     if (h.flags() & DQMNet::DQM_PROP_LUMI) {
0209       store->setScope(MonitorElementData::Scope::LUMI);
0210     } else {
0211       store->setScope(MonitorElementData::Scope::RUN);
0212     }
0213 
0214     if (obj) {
0215       int kind = h.flags() & DQMNet::DQM_PROP_TYPE_MASK;
0216       if (kind == DQMNet::DQM_PROP_TYPE_INT) {
0217         MonitorElement* me = store->bookInt(objname);
0218         auto expression = std::string(static_cast<TObjString*>(obj)->String().View());
0219         std::regex parseint{"<.*>i=(.*)</.*>"};
0220         std::smatch match;
0221         bool ok = std::regex_match(expression, match, parseint);
0222         if (!ok) {
0223           edm::LogError("DQMProtobufReader") << "Malformed object of type INT: '" << expression << "'";
0224           continue;
0225         }
0226         int value = std::atoi(match[1].str().c_str());
0227         me->Fill(value);
0228       } else if (kind == DQMNet::DQM_PROP_TYPE_REAL) {
0229         MonitorElement* me = store->bookFloat(objname);
0230         auto expression = std::string(static_cast<TObjString*>(obj)->String().View());
0231         std::regex parsefloat{"<.*>f=(.*)</.*>"};
0232         std::smatch match;
0233         bool ok = std::regex_match(expression, match, parsefloat);
0234         if (!ok) {
0235           edm::LogError("DQMProtobufReader") << "Malformed object of type REAL: '" << expression << "'";
0236           continue;
0237         }
0238         double value = std::atof(match[1].str().c_str());
0239         me->Fill(value);
0240       } else if (kind == DQMNet::DQM_PROP_TYPE_STRING) {
0241         auto value = static_cast<TObjString*>(obj)->String();
0242         store->bookString(objname, value);
0243       } else if (kind == DQMNet::DQM_PROP_TYPE_TH1F) {
0244         auto value = static_cast<TH1F*>(obj);
0245         store->book1D(objname, value);
0246       } else if (kind == DQMNet::DQM_PROP_TYPE_TH1S) {
0247         auto value = static_cast<TH1S*>(obj);
0248         store->book1S(objname, value);
0249       } else if (kind == DQMNet::DQM_PROP_TYPE_TH1D) {
0250         auto value = static_cast<TH1D*>(obj);
0251         store->book1DD(objname, value);
0252       } else if (kind == DQMNet::DQM_PROP_TYPE_TH1I) {
0253         auto value = static_cast<TH1I*>(obj);
0254         store->book1I(objname, value);
0255       } else if (kind == DQMNet::DQM_PROP_TYPE_TH2F) {
0256         auto value = static_cast<TH2F*>(obj);
0257         store->book2D(objname, value);
0258       } else if (kind == DQMNet::DQM_PROP_TYPE_TH2S) {
0259         auto value = static_cast<TH2S*>(obj);
0260         store->book2S(objname, value);
0261       } else if (kind == DQMNet::DQM_PROP_TYPE_TH2D) {
0262         auto value = static_cast<TH2D*>(obj);
0263         store->book2DD(objname, value);
0264       } else if (kind == DQMNet::DQM_PROP_TYPE_TH2I) {
0265         auto value = static_cast<TH2I*>(obj);
0266         store->book2I(objname, value);
0267       } else if (kind == DQMNet::DQM_PROP_TYPE_TH3F) {
0268         auto value = static_cast<TH3F*>(obj);
0269         store->book3D(objname, value);
0270       } else if (kind == DQMNet::DQM_PROP_TYPE_TPROF) {
0271         auto value = static_cast<TProfile*>(obj);
0272         store->bookProfile(objname, value);
0273       } else if (kind == DQMNet::DQM_PROP_TYPE_TPROF2D) {
0274         auto value = static_cast<TProfile2D*>(obj);
0275         store->bookProfile2D(objname, value);
0276       } else {
0277         edm::LogError("DQMProtobufReader") << "Unknown type: " << kind;
0278       }
0279       delete obj;
0280     }
0281   }
0282 }
0283 
0284 void DQMProtobufReader::readEvent_(edm::EventPrincipal&){};
0285 
0286 void DQMProtobufReader::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0287   edm::ParameterSetDescription desc;
0288 
0289   desc.setComment("Creates runs and lumis and fills the dqmstore from protocol buffer files.");
0290   edm::ProducerSourceBase::fillDescription(desc);
0291 
0292   desc.addUntracked<bool>("skipFirstLumis", false)
0293       ->setComment(
0294           "Skip (and ignore the minEventsPerLumi parameter) for the files which have been available at the begining of "
0295           "the processing. If set to true, the reader will open last available file for processing.");
0296 
0297   desc.addUntracked<bool>("deleteDatFiles", false)
0298       ->setComment("Delete data files after they have been closed, in order to save disk space.");
0299 
0300   desc.addUntracked<bool>("endOfRunKills", false)
0301       ->setComment(
0302           "Kill the processing as soon as the end-of-run file appears, even if there are/will be unprocessed "
0303           "lumisections.");
0304 
0305   desc.addUntracked<bool>("loadFiles", true)
0306       ->setComment(
0307           "Tells the source to load the data files. If set to False, the source will create skeleton lumi "
0308           "transitions.");
0309 
0310   DQMFileIterator::fillDescription(desc);
0311   descriptions.add("source", desc);
0312 }
0313 
0314 #include "FWCore/Framework/interface/InputSourceMacros.h"
0315 #include "FWCore/Framework/interface/MakerMacros.h"
0316 
0317 using dqmservices::DQMProtobufReader;
0318 DEFINE_FWK_INPUT_SOURCE(DQMProtobufReader);