File indexing completed on 2022-05-01 22:51:58
0001 #include "DQMProtobufReader.h"
0002
0003 #include "FWCore/MessageLogger/interface/JobReport.h"
0004 #include "DQMServices/Core/interface/DQMStore.h"
0005 #include "DataFormats/Histograms/interface/DQMToken.h"
0006
0007 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0008
0009
0010 #include "DQMServices/Core/interface/ROOTFilePB.pb.h"
0011 #include <google/protobuf/io/coded_stream.h>
0012 #include <google/protobuf/io/gzip_stream.h>
0013 #include <google/protobuf/io/zero_copy_stream_impl.h>
0014
0015 #include "TBufferFile.h"
0016
0017 #include <regex>
0018 #include <cstdlib>
0019
0020 using namespace dqmservices;
0021
0022 DQMProtobufReader::DQMProtobufReader(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0023 : PuttableSourceBase(pset, desc), fiterator_(pset) {
0024 flagSkipFirstLumis_ = pset.getUntrackedParameter<bool>("skipFirstLumis");
0025 flagEndOfRunKills_ = pset.getUntrackedParameter<bool>("endOfRunKills");
0026 flagDeleteDatFiles_ = pset.getUntrackedParameter<bool>("deleteDatFiles");
0027 flagLoadFiles_ = pset.getUntrackedParameter<bool>("loadFiles");
0028
0029 produces<std::string, edm::Transition::BeginLuminosityBlock>("sourceDataPath");
0030 produces<std::string, edm::Transition::BeginLuminosityBlock>("sourceJsonPath");
0031 produces<DQMToken, edm::Transition::BeginRun>("DQMGenerationRecoRun");
0032 produces<DQMToken, edm::Transition::BeginLuminosityBlock>("DQMGenerationRecoLumi");
0033 }
0034
0035 DQMProtobufReader::~DQMProtobufReader() {}
0036
0037 edm::InputSource::ItemType DQMProtobufReader::getNextItemType() {
0038 typedef DQMFileIterator::State State;
0039 typedef DQMFileIterator::LumiEntry LumiEntry;
0040
0041
0042
0043 for (;;) {
0044 fiterator_.update_state();
0045
0046 if (edm::shutdown_flag.load()) {
0047 fiterator_.logFileAction("Shutdown flag was set, shutting down.");
0048 return InputSource::IsStop;
0049 }
0050
0051
0052 if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
0053 return InputSource::IsStop;
0054 }
0055
0056
0057
0058 if ((!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
0059 return InputSource::IsStop;
0060 }
0061
0062
0063 if (fiterator_.lumiReady()) {
0064 return InputSource::IsLumi;
0065 }
0066
0067 fiterator_.delay();
0068
0069
0070
0071
0072
0073 }
0074
0075
0076 }
0077
0078 std::shared_ptr<edm::RunAuxiliary> DQMProtobufReader::readRunAuxiliary_() {
0079
0080
0081 edm::RunAuxiliary* aux = new edm::RunAuxiliary(fiterator_.runNumber(), edm::Timestamp(), edm::Timestamp());
0082 return std::shared_ptr<edm::RunAuxiliary>(aux);
0083 }
0084
0085 void DQMProtobufReader::readRun_(edm::RunPrincipal& rpCache) {
0086
0087 rpCache.fillRunPrincipal(processHistoryRegistryForUpdate());
0088
0089 edm::Service<DQMStore> store;
0090 std::vector<MonitorElement*> allMEs = store->getAllContents("");
0091 for (auto const& ME : allMEs) {
0092 ME->Reset();
0093 }
0094 }
0095
0096 std::shared_ptr<edm::LuminosityBlockAuxiliary> DQMProtobufReader::readLuminosityBlockAuxiliary_() {
0097
0098
0099 currentLumi_ = fiterator_.open();
0100 edm::LuminosityBlockAuxiliary* aux = new edm::LuminosityBlockAuxiliary(
0101 fiterator_.runNumber(), currentLumi_.file_ls, edm::Timestamp(), edm::Timestamp());
0102
0103 return std::shared_ptr<edm::LuminosityBlockAuxiliary>(aux);
0104 }
0105
0106 void DQMProtobufReader::readLuminosityBlock_(edm::LuminosityBlockPrincipal& lbCache) {
0107
0108
0109 edm::Service<edm::JobReport> jr;
0110 jr->reportInputLumiSection(lbCache.id().run(), lbCache.id().luminosityBlock());
0111 lbCache.fillLuminosityBlockPrincipal(processHistoryRegistry().getMapped(lbCache.aux().processHistoryID()));
0112 }
0113
0114 void DQMProtobufReader::beginLuminosityBlock(edm::LuminosityBlock& lb) {
0115 edm::Service<DQMStore> store;
0116
0117
0118 std::vector<MonitorElement*> allMEs = store->getAllContents("");
0119 for (auto const& ME : allMEs) {
0120
0121 if (ME->getLumiFlag()) {
0122 ME->Reset();
0123 }
0124 }
0125
0126
0127 std::string path = currentLumi_.get_data_path();
0128 std::string jspath = currentLumi_.get_json_path();
0129
0130 std::unique_ptr<std::string> path_product(new std::string(path));
0131 std::unique_ptr<std::string> json_product(new std::string(jspath));
0132
0133 lb.put(std::move(path_product), "sourceDataPath");
0134 lb.put(std::move(json_product), "sourceJsonPath");
0135
0136 if (flagLoadFiles_) {
0137 if (!std::filesystem::exists(path)) {
0138 fiterator_.logFileAction("Data file is missing ", path);
0139 fiterator_.logLumiState(currentLumi_, "error: data file missing");
0140 return;
0141 }
0142
0143 fiterator_.logFileAction("Initiating request to open file ", path);
0144 fiterator_.logFileAction("Successfully opened file ", path);
0145 load(&*store, path);
0146 fiterator_.logFileAction("Closed file ", path);
0147 fiterator_.logLumiState(currentLumi_, "close: ok");
0148 } else {
0149 fiterator_.logFileAction("Not loading the data file at source level ", path);
0150 fiterator_.logLumiState(currentLumi_, "close: not loading");
0151 }
0152 }
0153
0154 void DQMProtobufReader::load(DQMStore* store, std::string filename) {
0155 using google::protobuf::io::ArrayInputStream;
0156 using google::protobuf::io::CodedInputStream;
0157 using google::protobuf::io::FileInputStream;
0158 using google::protobuf::io::FileOutputStream;
0159 using google::protobuf::io::GzipInputStream;
0160 using google::protobuf::io::GzipOutputStream;
0161
0162 int filedescriptor;
0163 if ((filedescriptor = ::open(filename.c_str(), O_RDONLY)) == -1) {
0164 edm::LogError("DQMProtobufReader") << "File " << filename << " does not exist.";
0165 }
0166
0167 dqmstorepb::ROOTFilePB dqmstore_message;
0168 FileInputStream fin(filedescriptor);
0169 GzipInputStream input(&fin);
0170 CodedInputStream input_coded(&input);
0171 input_coded.SetTotalBytesLimit(1024 * 1024 * 1024);
0172 if (!dqmstore_message.ParseFromCodedStream(&input_coded)) {
0173 edm::LogError("DQMProtobufReader") << "Fatal parsing file '" << filename << "'";
0174 }
0175
0176 ::close(filedescriptor);
0177
0178 for (int i = 0; i < dqmstore_message.histo_size(); ++i) {
0179 TObject* obj = nullptr;
0180 dqmstorepb::ROOTFilePB::Histo const& h = dqmstore_message.histo(i);
0181
0182 size_t slash = h.full_pathname().rfind('/');
0183 size_t dirpos = (slash == std::string::npos ? 0 : slash);
0184 size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
0185 std::string objname, dirname;
0186 dirname.assign(h.full_pathname(), 0, dirpos);
0187 objname.assign(h.full_pathname(), namepos, std::string::npos);
0188 TBufferFile buf(TBufferFile::kRead, h.size(), (void*)h.streamed_histo().data(), kFALSE);
0189 buf.Reset();
0190 if (buf.Length() == buf.BufferSize()) {
0191 obj = nullptr;
0192 } else {
0193 buf.InitMap();
0194 void* ptr = buf.ReadObjectAny(nullptr);
0195 obj = reinterpret_cast<TObject*>(ptr);
0196 }
0197
0198 if (!obj) {
0199 edm::LogError("DQMProtobufReader") << "Error reading element:'" << h.full_pathname();
0200 }
0201
0202 store->setCurrentFolder(dirname);
0203
0204 if (h.flags() & DQMNet::DQM_PROP_LUMI) {
0205 store->setScope(MonitorElementData::Scope::LUMI);
0206 } else {
0207 store->setScope(MonitorElementData::Scope::RUN);
0208 }
0209
0210 if (obj) {
0211 int kind = h.flags() & DQMNet::DQM_PROP_TYPE_MASK;
0212 if (kind == DQMNet::DQM_PROP_TYPE_INT) {
0213 MonitorElement* me = store->bookInt(objname);
0214 auto expression = std::string(static_cast<TObjString*>(obj)->String().View());
0215 std::regex parseint{"<.*>i=(.*)</.*>"};
0216 std::smatch match;
0217 bool ok = std::regex_match(expression, match, parseint);
0218 if (!ok) {
0219 edm::LogError("DQMProtobufReader") << "Malformed object of type INT: '" << expression << "'";
0220 continue;
0221 }
0222 int value = std::atoi(match[1].str().c_str());
0223 me->Fill(value);
0224 } else if (kind == DQMNet::DQM_PROP_TYPE_REAL) {
0225 MonitorElement* me = store->bookFloat(objname);
0226 auto expression = std::string(static_cast<TObjString*>(obj)->String().View());
0227 std::regex parsefloat{"<.*>f=(.*)</.*>"};
0228 std::smatch match;
0229 bool ok = std::regex_match(expression, match, parsefloat);
0230 if (!ok) {
0231 edm::LogError("DQMProtobufReader") << "Malformed object of type REAL: '" << expression << "'";
0232 continue;
0233 }
0234 double value = std::atof(match[1].str().c_str());
0235 me->Fill(value);
0236 } else if (kind == DQMNet::DQM_PROP_TYPE_STRING) {
0237 auto value = static_cast<TObjString*>(obj)->String();
0238 store->bookString(objname, value);
0239 } else if (kind == DQMNet::DQM_PROP_TYPE_TH1F) {
0240 auto value = static_cast<TH1F*>(obj);
0241 store->book1D(objname, value);
0242 } else if (kind == DQMNet::DQM_PROP_TYPE_TH1S) {
0243 auto value = static_cast<TH1S*>(obj);
0244 store->book1S(objname, value);
0245 } else if (kind == DQMNet::DQM_PROP_TYPE_TH1D) {
0246 auto value = static_cast<TH1D*>(obj);
0247 store->book1DD(objname, value);
0248 } else if (kind == DQMNet::DQM_PROP_TYPE_TH1I) {
0249 auto value = static_cast<TH1I*>(obj);
0250 store->book1I(objname, value);
0251 } else if (kind == DQMNet::DQM_PROP_TYPE_TH2F) {
0252 auto value = static_cast<TH2F*>(obj);
0253 store->book2D(objname, value);
0254 } else if (kind == DQMNet::DQM_PROP_TYPE_TH2S) {
0255 auto value = static_cast<TH2S*>(obj);
0256 store->book2S(objname, value);
0257 } else if (kind == DQMNet::DQM_PROP_TYPE_TH2D) {
0258 auto value = static_cast<TH2D*>(obj);
0259 store->book2DD(objname, value);
0260 } else if (kind == DQMNet::DQM_PROP_TYPE_TH2I) {
0261 auto value = static_cast<TH2I*>(obj);
0262 store->book2I(objname, value);
0263 } else if (kind == DQMNet::DQM_PROP_TYPE_TH3F) {
0264 auto value = static_cast<TH3F*>(obj);
0265 store->book3D(objname, value);
0266 } else if (kind == DQMNet::DQM_PROP_TYPE_TPROF) {
0267 auto value = static_cast<TProfile*>(obj);
0268 store->bookProfile(objname, value);
0269 } else if (kind == DQMNet::DQM_PROP_TYPE_TPROF2D) {
0270 auto value = static_cast<TProfile2D*>(obj);
0271 store->bookProfile2D(objname, value);
0272 } else {
0273 edm::LogError("DQMProtobufReader") << "Unknown type: " << kind;
0274 }
0275 delete obj;
0276 }
0277 }
0278 }
0279
0280 void DQMProtobufReader::readEvent_(edm::EventPrincipal&){};
0281
0282 void DQMProtobufReader::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0283 edm::ParameterSetDescription desc;
0284
0285 desc.setComment(
0286 "Creates runs and lumis and fills the dqmstore from protocol buffer "
0287 "files.");
0288 edm::ProducerSourceBase::fillDescription(desc);
0289
0290 desc.addUntracked<bool>("skipFirstLumis", false)
0291 ->setComment(
0292 "Skip (and ignore the minEventsPerLumi parameter) for the files "
0293 "which have been available at the begining of the processing. "
0294 "If set to true, the reader will open last available file for "
0295 "processing.");
0296
0297 desc.addUntracked<bool>("deleteDatFiles", false)
0298 ->setComment(
0299 "Delete data files after they have been closed, in order to "
0300 "save disk space.");
0301
0302 desc.addUntracked<bool>("endOfRunKills", false)
0303 ->setComment(
0304 "Kill the processing as soon as the end-of-run file appears, even if "
0305 "there are/will be unprocessed lumisections.");
0306
0307 desc.addUntracked<bool>("loadFiles", true)
0308 ->setComment(
0309 "Tells the source load the data files. If set to false, source will create skeleton lumi transitions.");
0310
0311 DQMFileIterator::fillDescription(desc);
0312 descriptions.add("source", desc);
0313 }
0314
0315 #include "FWCore/Framework/interface/InputSourceMacros.h"
0316 #include "FWCore/Framework/interface/MakerMacros.h"
0317
0318 using dqmservices::DQMProtobufReader;
0319 DEFINE_FWK_INPUT_SOURCE(DQMProtobufReader);