File indexing completed on 2025-03-10 23:53:33
0001 #include "DQMProtobufReader.h"
0002
0003 #include "DQMServices/Core/interface/ROOTFilePB.pb.h"
0004 #include "DataFormats/Histograms/interface/DQMToken.h"
0005 #include "FWCore/Framework/interface/RunPrincipal.h"
0006 #include "FWCore/Framework/interface/LuminosityBlockPrincipal.h"
0007 #include "FWCore/Framework/interface/LuminosityBlock.h"
0008 #include "FWCore/MessageLogger/interface/JobReport.h"
0009 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0010 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
0011 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
0012 #include "FWCore/Sources/interface/ProducerSourceBase.h"
0013 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0014
0015 #include <cstdlib>
0016 #include <fcntl.h>
0017 #include <filesystem>
0018 #include <regex>
0019
0020 #include <TBufferFile.h>
0021
0022 #include <google/protobuf/io/coded_stream.h>
0023 #include <google/protobuf/io/gzip_stream.h>
0024 #include <google/protobuf/io/zero_copy_stream_impl.h>
0025
0026 using namespace dqmservices;
0027
0028 DQMProtobufReader::DQMProtobufReader(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0029 : PuttableSourceBase(pset, desc),
0030 fiterator_(pset),
0031 flagSkipFirstLumis_(pset.getUntrackedParameter<bool>("skipFirstLumis")),
0032 flagEndOfRunKills_(pset.getUntrackedParameter<bool>("endOfRunKills")),
0033 flagDeleteDatFiles_(pset.getUntrackedParameter<bool>("deleteDatFiles")),
0034 flagLoadFiles_(pset.getUntrackedParameter<bool>("loadFiles")) {
0035 produces<std::string, edm::Transition::BeginLuminosityBlock>("sourceDataPath");
0036 produces<std::string, edm::Transition::BeginLuminosityBlock>("sourceJsonPath");
0037 produces<DQMToken, edm::Transition::BeginRun>("DQMGenerationRecoRun");
0038 produces<DQMToken, edm::Transition::BeginLuminosityBlock>("DQMGenerationRecoLumi");
0039 }
0040
0041 edm::InputSource::ItemTypeInfo DQMProtobufReader::getNextItemType() {
0042 typedef DQMFileIterator::State State;
0043 typedef DQMFileIterator::LumiEntry LumiEntry;
0044
0045
0046
0047 for (;;) {
0048 fiterator_.update_state();
0049
0050 if (edm::shutdown_flag.load()) {
0051 fiterator_.logFileAction("Shutdown flag was set, shutting down.");
0052 return InputSource::ItemType::IsStop;
0053 }
0054
0055
0056 if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
0057 return InputSource::ItemType::IsStop;
0058 }
0059
0060
0061
0062 if ((!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
0063 return InputSource::ItemType::IsStop;
0064 }
0065
0066
0067 if (fiterator_.lumiReady()) {
0068 return InputSource::ItemType::IsLumi;
0069 }
0070
0071 fiterator_.delay();
0072
0073
0074
0075
0076
0077 }
0078
0079
0080 }
0081
0082 std::shared_ptr<edm::RunAuxiliary> DQMProtobufReader::readRunAuxiliary_() {
0083
0084
0085 edm::RunAuxiliary* aux = new edm::RunAuxiliary(fiterator_.runNumber(), edm::Timestamp(), edm::Timestamp());
0086 return std::shared_ptr<edm::RunAuxiliary>(aux);
0087 }
0088
0089 void DQMProtobufReader::readRun_(edm::RunPrincipal& rpCache) {
0090
0091 rpCache.fillRunPrincipal(processHistoryRegistryForUpdate());
0092
0093 edm::Service<DQMStore> store;
0094 std::vector<MonitorElement*> allMEs = store->getAllContents("");
0095 for (auto const& ME : allMEs) {
0096 ME->Reset();
0097 }
0098 }
0099
0100 std::shared_ptr<edm::LuminosityBlockAuxiliary> DQMProtobufReader::readLuminosityBlockAuxiliary_() {
0101
0102
0103 currentLumi_ = fiterator_.open();
0104 edm::LuminosityBlockAuxiliary* aux = new edm::LuminosityBlockAuxiliary(
0105 fiterator_.runNumber(), currentLumi_.file_ls, edm::Timestamp(), edm::Timestamp());
0106
0107 return std::shared_ptr<edm::LuminosityBlockAuxiliary>(aux);
0108 }
0109
0110 void DQMProtobufReader::readLuminosityBlock_(edm::LuminosityBlockPrincipal& lbCache) {
0111
0112
0113 edm::Service<edm::JobReport> jr;
0114 jr->reportInputLumiSection(lbCache.id().run(), lbCache.id().luminosityBlock());
0115 lbCache.fillLuminosityBlockPrincipal(processHistoryRegistry().getMapped(lbCache.aux().processHistoryID()));
0116 }
0117
0118 void DQMProtobufReader::beginLuminosityBlock(edm::LuminosityBlock& lb) {
0119 edm::Service<DQMStore> store;
0120
0121
0122 std::vector<MonitorElement*> allMEs = store->getAllContents("");
0123 for (auto const& ME : allMEs) {
0124
0125 if (ME->getLumiFlag()) {
0126 ME->Reset();
0127 }
0128 }
0129
0130
0131 std::string path = currentLumi_.get_data_path();
0132 std::string jspath = currentLumi_.get_json_path();
0133
0134 std::unique_ptr<std::string> path_product(new std::string(path));
0135 std::unique_ptr<std::string> json_product(new std::string(jspath));
0136
0137 lb.put(std::move(path_product), "sourceDataPath");
0138 lb.put(std::move(json_product), "sourceJsonPath");
0139
0140 if (flagLoadFiles_) {
0141 if (!std::filesystem::exists(path)) {
0142 fiterator_.logFileAction("Data file is missing ", path);
0143 fiterator_.logLumiState(currentLumi_, "error: data file missing");
0144 return;
0145 }
0146
0147 fiterator_.logFileAction("Initiating request to open file ", path);
0148 fiterator_.logFileAction("Successfully opened file ", path);
0149 load(&*store, path);
0150 fiterator_.logFileAction("Closed file ", path);
0151 fiterator_.logLumiState(currentLumi_, "close: ok");
0152 } else {
0153 fiterator_.logFileAction("Not loading the data file at source level ", path);
0154 fiterator_.logLumiState(currentLumi_, "close: not loading");
0155 }
0156 }
0157
0158 void DQMProtobufReader::load(DQMStore* store, std::string filename) {
0159 using google::protobuf::io::ArrayInputStream;
0160 using google::protobuf::io::CodedInputStream;
0161 using google::protobuf::io::FileInputStream;
0162 using google::protobuf::io::FileOutputStream;
0163 using google::protobuf::io::GzipInputStream;
0164 using google::protobuf::io::GzipOutputStream;
0165
0166 int filedescriptor;
0167 if ((filedescriptor = ::open(filename.c_str(), O_RDONLY)) == -1) {
0168 edm::LogError("DQMProtobufReader") << "File " << filename << " does not exist.";
0169 }
0170
0171 dqmstorepb::ROOTFilePB dqmstore_message;
0172 FileInputStream fin(filedescriptor);
0173 GzipInputStream input(&fin);
0174 CodedInputStream input_coded(&input);
0175 input_coded.SetTotalBytesLimit(1024 * 1024 * 1024);
0176 if (!dqmstore_message.ParseFromCodedStream(&input_coded)) {
0177 edm::LogError("DQMProtobufReader") << "Fatal parsing file '" << filename << "'";
0178 }
0179
0180 ::close(filedescriptor);
0181
0182 for (int i = 0; i < dqmstore_message.histo_size(); ++i) {
0183 TObject* obj = nullptr;
0184 dqmstorepb::ROOTFilePB::Histo const& h = dqmstore_message.histo(i);
0185
0186 size_t slash = h.full_pathname().rfind('/');
0187 size_t dirpos = (slash == std::string::npos ? 0 : slash);
0188 size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
0189 std::string objname, dirname;
0190 dirname.assign(h.full_pathname(), 0, dirpos);
0191 objname.assign(h.full_pathname(), namepos, std::string::npos);
0192 TBufferFile buf(TBufferFile::kRead, h.size(), (void*)h.streamed_histo().data(), kFALSE);
0193 buf.Reset();
0194 if (buf.Length() == buf.BufferSize()) {
0195 obj = nullptr;
0196 } else {
0197 buf.InitMap();
0198 void* ptr = buf.ReadObjectAny(nullptr);
0199 obj = reinterpret_cast<TObject*>(ptr);
0200 }
0201
0202 if (!obj) {
0203 edm::LogError("DQMProtobufReader") << "Error reading element:'" << h.full_pathname();
0204 }
0205
0206 store->setCurrentFolder(dirname);
0207
0208 if (h.flags() & DQMNet::DQM_PROP_LUMI) {
0209 store->setScope(MonitorElementData::Scope::LUMI);
0210 } else {
0211 store->setScope(MonitorElementData::Scope::RUN);
0212 }
0213
0214 if (obj) {
0215 int kind = h.flags() & DQMNet::DQM_PROP_TYPE_MASK;
0216 if (kind == DQMNet::DQM_PROP_TYPE_INT) {
0217 MonitorElement* me = store->bookInt(objname);
0218 auto expression = std::string(static_cast<TObjString*>(obj)->String().View());
0219 std::regex parseint{"<.*>i=(.*)</.*>"};
0220 std::smatch match;
0221 bool ok = std::regex_match(expression, match, parseint);
0222 if (!ok) {
0223 edm::LogError("DQMProtobufReader") << "Malformed object of type INT: '" << expression << "'";
0224 continue;
0225 }
0226 int value = std::atoi(match[1].str().c_str());
0227 me->Fill(value);
0228 } else if (kind == DQMNet::DQM_PROP_TYPE_REAL) {
0229 MonitorElement* me = store->bookFloat(objname);
0230 auto expression = std::string(static_cast<TObjString*>(obj)->String().View());
0231 std::regex parsefloat{"<.*>f=(.*)</.*>"};
0232 std::smatch match;
0233 bool ok = std::regex_match(expression, match, parsefloat);
0234 if (!ok) {
0235 edm::LogError("DQMProtobufReader") << "Malformed object of type REAL: '" << expression << "'";
0236 continue;
0237 }
0238 double value = std::atof(match[1].str().c_str());
0239 me->Fill(value);
0240 } else if (kind == DQMNet::DQM_PROP_TYPE_STRING) {
0241 auto value = static_cast<TObjString*>(obj)->String();
0242 store->bookString(objname, value);
0243 } else if (kind == DQMNet::DQM_PROP_TYPE_TH1F) {
0244 auto value = static_cast<TH1F*>(obj);
0245 store->book1D(objname, value);
0246 } else if (kind == DQMNet::DQM_PROP_TYPE_TH1S) {
0247 auto value = static_cast<TH1S*>(obj);
0248 store->book1S(objname, value);
0249 } else if (kind == DQMNet::DQM_PROP_TYPE_TH1D) {
0250 auto value = static_cast<TH1D*>(obj);
0251 store->book1DD(objname, value);
0252 } else if (kind == DQMNet::DQM_PROP_TYPE_TH1I) {
0253 auto value = static_cast<TH1I*>(obj);
0254 store->book1I(objname, value);
0255 } else if (kind == DQMNet::DQM_PROP_TYPE_TH2F) {
0256 auto value = static_cast<TH2F*>(obj);
0257 store->book2D(objname, value);
0258 } else if (kind == DQMNet::DQM_PROP_TYPE_TH2S) {
0259 auto value = static_cast<TH2S*>(obj);
0260 store->book2S(objname, value);
0261 } else if (kind == DQMNet::DQM_PROP_TYPE_TH2D) {
0262 auto value = static_cast<TH2D*>(obj);
0263 store->book2DD(objname, value);
0264 } else if (kind == DQMNet::DQM_PROP_TYPE_TH2Poly) {
0265 auto value = static_cast<TH2Poly*>(obj);
0266 store->book2DPoly(objname, value);
0267 } else if (kind == DQMNet::DQM_PROP_TYPE_TH2I) {
0268 auto value = static_cast<TH2I*>(obj);
0269 store->book2I(objname, value);
0270 } else if (kind == DQMNet::DQM_PROP_TYPE_TH3F) {
0271 auto value = static_cast<TH3F*>(obj);
0272 store->book3D(objname, value);
0273 } else if (kind == DQMNet::DQM_PROP_TYPE_TPROF) {
0274 auto value = static_cast<TProfile*>(obj);
0275 store->bookProfile(objname, value);
0276 } else if (kind == DQMNet::DQM_PROP_TYPE_TPROF2D) {
0277 auto value = static_cast<TProfile2D*>(obj);
0278 store->bookProfile2D(objname, value);
0279 } else {
0280 edm::LogError("DQMProtobufReader") << "Unknown type: " << kind;
0281 }
0282 delete obj;
0283 }
0284 }
0285 }
0286
0287 void DQMProtobufReader::readEvent_(edm::EventPrincipal&) {}
0288
0289 void DQMProtobufReader::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0290 edm::ParameterSetDescription desc;
0291
0292 desc.setComment("Creates runs and lumis and fills the dqmstore from protocol buffer files.");
0293 edm::ProducerSourceBase::fillDescription(desc);
0294
0295 desc.addUntracked<bool>("skipFirstLumis", false)
0296 ->setComment(
0297 "Skip (and ignore the minEventsPerLumi parameter) for the files which have been available at the begining of "
0298 "the processing. If set to true, the reader will open last available file for processing.");
0299
0300 desc.addUntracked<bool>("deleteDatFiles", false)
0301 ->setComment("Delete data files after they have been closed, in order to save disk space.");
0302
0303 desc.addUntracked<bool>("endOfRunKills", false)
0304 ->setComment(
0305 "Kill the processing as soon as the end-of-run file appears, even if there are/will be unprocessed "
0306 "lumisections.");
0307
0308 desc.addUntracked<bool>("loadFiles", true)
0309 ->setComment(
0310 "Tells the source to load the data files. If set to False, the source will create skeleton lumi "
0311 "transitions.");
0312
0313 DQMFileIterator::fillDescription(desc);
0314 descriptions.add("source", desc);
0315 }
0316
0317 #include "FWCore/Framework/interface/InputSourceMacros.h"
0318 #include "FWCore/Framework/interface/MakerMacros.h"
0319
0320 using dqmservices::DQMProtobufReader;
0321 DEFINE_FWK_INPUT_SOURCE(DQMProtobufReader);