Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2022-05-01 22:51:58

0001 #include "DQMProtobufReader.h"
0002 
0003 #include "FWCore/MessageLogger/interface/JobReport.h"
0004 #include "DQMServices/Core/interface/DQMStore.h"
0005 #include "DataFormats/Histograms/interface/DQMToken.h"
0006 
0007 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
0008 // #include "FWCore/Sources/interface/ProducerSourceBase.h"
0009 
0010 #include "DQMServices/Core/interface/ROOTFilePB.pb.h"
0011 #include <google/protobuf/io/coded_stream.h>
0012 #include <google/protobuf/io/gzip_stream.h>
0013 #include <google/protobuf/io/zero_copy_stream_impl.h>
0014 
0015 #include "TBufferFile.h"
0016 
0017 #include <regex>
0018 #include <cstdlib>
0019 
0020 using namespace dqmservices;
0021 
0022 DQMProtobufReader::DQMProtobufReader(edm::ParameterSet const& pset, edm::InputSourceDescription const& desc)
0023     : PuttableSourceBase(pset, desc), fiterator_(pset) {
0024   flagSkipFirstLumis_ = pset.getUntrackedParameter<bool>("skipFirstLumis");
0025   flagEndOfRunKills_ = pset.getUntrackedParameter<bool>("endOfRunKills");
0026   flagDeleteDatFiles_ = pset.getUntrackedParameter<bool>("deleteDatFiles");
0027   flagLoadFiles_ = pset.getUntrackedParameter<bool>("loadFiles");
0028 
0029   produces<std::string, edm::Transition::BeginLuminosityBlock>("sourceDataPath");
0030   produces<std::string, edm::Transition::BeginLuminosityBlock>("sourceJsonPath");
0031   produces<DQMToken, edm::Transition::BeginRun>("DQMGenerationRecoRun");
0032   produces<DQMToken, edm::Transition::BeginLuminosityBlock>("DQMGenerationRecoLumi");
0033 }
0034 
0035 DQMProtobufReader::~DQMProtobufReader() {}
0036 
0037 edm::InputSource::ItemType DQMProtobufReader::getNextItemType() {
0038   typedef DQMFileIterator::State State;
0039   typedef DQMFileIterator::LumiEntry LumiEntry;
0040 
0041   // fiterator_.logFileAction("getNextItemType");
0042 
0043   for (;;) {
0044     fiterator_.update_state();
0045 
0046     if (edm::shutdown_flag.load()) {
0047       fiterator_.logFileAction("Shutdown flag was set, shutting down.");
0048       return InputSource::IsStop;
0049     }
0050 
0051     // check for end of run file and force quit
0052     if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
0053       return InputSource::IsStop;
0054     }
0055 
0056     // check for end of run and quit if everything has been processed.
0057     // this is the clean exit
0058     if ((!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
0059       return InputSource::IsStop;
0060     }
0061 
0062     // skip to the next file if we have no files openned yet
0063     if (fiterator_.lumiReady()) {
0064       return InputSource::IsLumi;
0065     }
0066 
0067     fiterator_.delay();
0068     // BUG: for an unknown reason it fails after a certain time if we use
0069     // IsSynchronize state
0070     //
0071     // comment out in order to block at this level
0072     // return InputSource::IsSynchronize;
0073   }
0074 
0075   // this is unreachable
0076 }
0077 
0078 std::shared_ptr<edm::RunAuxiliary> DQMProtobufReader::readRunAuxiliary_() {
0079   // fiterator_.logFileAction("readRunAuxiliary_");
0080 
0081   edm::RunAuxiliary* aux = new edm::RunAuxiliary(fiterator_.runNumber(), edm::Timestamp(), edm::Timestamp());
0082   return std::shared_ptr<edm::RunAuxiliary>(aux);
0083 }
0084 
0085 void DQMProtobufReader::readRun_(edm::RunPrincipal& rpCache) {
0086   // fiterator_.logFileAction("readRun_");
0087   rpCache.fillRunPrincipal(processHistoryRegistryForUpdate());
0088 
0089   edm::Service<DQMStore> store;
0090   std::vector<MonitorElement*> allMEs = store->getAllContents("");
0091   for (auto const& ME : allMEs) {
0092     ME->Reset();
0093   }
0094 }
0095 
0096 std::shared_ptr<edm::LuminosityBlockAuxiliary> DQMProtobufReader::readLuminosityBlockAuxiliary_() {
0097   // fiterator_.logFileAction("readLuminosityBlockAuxiliary_");
0098 
0099   currentLumi_ = fiterator_.open();
0100   edm::LuminosityBlockAuxiliary* aux = new edm::LuminosityBlockAuxiliary(
0101       fiterator_.runNumber(), currentLumi_.file_ls, edm::Timestamp(), edm::Timestamp());
0102 
0103   return std::shared_ptr<edm::LuminosityBlockAuxiliary>(aux);
0104 }
0105 
0106 void DQMProtobufReader::readLuminosityBlock_(edm::LuminosityBlockPrincipal& lbCache) {
0107   // fiterator_.logFileAction("readLuminosityBlock_");
0108 
0109   edm::Service<edm::JobReport> jr;
0110   jr->reportInputLumiSection(lbCache.id().run(), lbCache.id().luminosityBlock());
0111   lbCache.fillLuminosityBlockPrincipal(processHistoryRegistry().getMapped(lbCache.aux().processHistoryID()));
0112 }
0113 
0114 void DQMProtobufReader::beginLuminosityBlock(edm::LuminosityBlock& lb) {
0115   edm::Service<DQMStore> store;
0116 
0117   // clear the old lumi histograms
0118   std::vector<MonitorElement*> allMEs = store->getAllContents("");
0119   for (auto const& ME : allMEs) {
0120     // We do not want to reset Run Products here!
0121     if (ME->getLumiFlag()) {
0122       ME->Reset();
0123     }
0124   }
0125 
0126   // load the new file
0127   std::string path = currentLumi_.get_data_path();
0128   std::string jspath = currentLumi_.get_json_path();
0129 
0130   std::unique_ptr<std::string> path_product(new std::string(path));
0131   std::unique_ptr<std::string> json_product(new std::string(jspath));
0132 
0133   lb.put(std::move(path_product), "sourceDataPath");
0134   lb.put(std::move(json_product), "sourceJsonPath");
0135 
0136   if (flagLoadFiles_) {
0137     if (!std::filesystem::exists(path)) {
0138       fiterator_.logFileAction("Data file is missing ", path);
0139       fiterator_.logLumiState(currentLumi_, "error: data file missing");
0140       return;
0141     }
0142 
0143     fiterator_.logFileAction("Initiating request to open file ", path);
0144     fiterator_.logFileAction("Successfully opened file ", path);
0145     load(&*store, path);
0146     fiterator_.logFileAction("Closed file ", path);
0147     fiterator_.logLumiState(currentLumi_, "close: ok");
0148   } else {
0149     fiterator_.logFileAction("Not loading the data file at source level ", path);
0150     fiterator_.logLumiState(currentLumi_, "close: not loading");
0151   }
0152 }
0153 
0154 void DQMProtobufReader::load(DQMStore* store, std::string filename) {
0155   using google::protobuf::io::ArrayInputStream;
0156   using google::protobuf::io::CodedInputStream;
0157   using google::protobuf::io::FileInputStream;
0158   using google::protobuf::io::FileOutputStream;
0159   using google::protobuf::io::GzipInputStream;
0160   using google::protobuf::io::GzipOutputStream;
0161 
0162   int filedescriptor;
0163   if ((filedescriptor = ::open(filename.c_str(), O_RDONLY)) == -1) {
0164     edm::LogError("DQMProtobufReader") << "File " << filename << " does not exist.";
0165   }
0166 
0167   dqmstorepb::ROOTFilePB dqmstore_message;
0168   FileInputStream fin(filedescriptor);
0169   GzipInputStream input(&fin);
0170   CodedInputStream input_coded(&input);
0171   input_coded.SetTotalBytesLimit(1024 * 1024 * 1024);
0172   if (!dqmstore_message.ParseFromCodedStream(&input_coded)) {
0173     edm::LogError("DQMProtobufReader") << "Fatal parsing file '" << filename << "'";
0174   }
0175 
0176   ::close(filedescriptor);
0177 
0178   for (int i = 0; i < dqmstore_message.histo_size(); ++i) {
0179     TObject* obj = nullptr;
0180     dqmstorepb::ROOTFilePB::Histo const& h = dqmstore_message.histo(i);
0181 
0182     size_t slash = h.full_pathname().rfind('/');
0183     size_t dirpos = (slash == std::string::npos ? 0 : slash);
0184     size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
0185     std::string objname, dirname;
0186     dirname.assign(h.full_pathname(), 0, dirpos);
0187     objname.assign(h.full_pathname(), namepos, std::string::npos);
0188     TBufferFile buf(TBufferFile::kRead, h.size(), (void*)h.streamed_histo().data(), kFALSE);
0189     buf.Reset();
0190     if (buf.Length() == buf.BufferSize()) {
0191       obj = nullptr;
0192     } else {
0193       buf.InitMap();
0194       void* ptr = buf.ReadObjectAny(nullptr);
0195       obj = reinterpret_cast<TObject*>(ptr);
0196     }
0197 
0198     if (!obj) {
0199       edm::LogError("DQMProtobufReader") << "Error reading element:'" << h.full_pathname();
0200     }
0201 
0202     store->setCurrentFolder(dirname);
0203 
0204     if (h.flags() & DQMNet::DQM_PROP_LUMI) {
0205       store->setScope(MonitorElementData::Scope::LUMI);
0206     } else {
0207       store->setScope(MonitorElementData::Scope::RUN);
0208     }
0209 
0210     if (obj) {
0211       int kind = h.flags() & DQMNet::DQM_PROP_TYPE_MASK;
0212       if (kind == DQMNet::DQM_PROP_TYPE_INT) {
0213         MonitorElement* me = store->bookInt(objname);
0214         auto expression = std::string(static_cast<TObjString*>(obj)->String().View());
0215         std::regex parseint{"<.*>i=(.*)</.*>"};
0216         std::smatch match;
0217         bool ok = std::regex_match(expression, match, parseint);
0218         if (!ok) {
0219           edm::LogError("DQMProtobufReader") << "Malformed object of type INT: '" << expression << "'";
0220           continue;
0221         }
0222         int value = std::atoi(match[1].str().c_str());
0223         me->Fill(value);
0224       } else if (kind == DQMNet::DQM_PROP_TYPE_REAL) {
0225         MonitorElement* me = store->bookFloat(objname);
0226         auto expression = std::string(static_cast<TObjString*>(obj)->String().View());
0227         std::regex parsefloat{"<.*>f=(.*)</.*>"};
0228         std::smatch match;
0229         bool ok = std::regex_match(expression, match, parsefloat);
0230         if (!ok) {
0231           edm::LogError("DQMProtobufReader") << "Malformed object of type REAL: '" << expression << "'";
0232           continue;
0233         }
0234         double value = std::atof(match[1].str().c_str());
0235         me->Fill(value);
0236       } else if (kind == DQMNet::DQM_PROP_TYPE_STRING) {
0237         auto value = static_cast<TObjString*>(obj)->String();
0238         store->bookString(objname, value);
0239       } else if (kind == DQMNet::DQM_PROP_TYPE_TH1F) {
0240         auto value = static_cast<TH1F*>(obj);
0241         store->book1D(objname, value);
0242       } else if (kind == DQMNet::DQM_PROP_TYPE_TH1S) {
0243         auto value = static_cast<TH1S*>(obj);
0244         store->book1S(objname, value);
0245       } else if (kind == DQMNet::DQM_PROP_TYPE_TH1D) {
0246         auto value = static_cast<TH1D*>(obj);
0247         store->book1DD(objname, value);
0248       } else if (kind == DQMNet::DQM_PROP_TYPE_TH1I) {
0249         auto value = static_cast<TH1I*>(obj);
0250         store->book1I(objname, value);
0251       } else if (kind == DQMNet::DQM_PROP_TYPE_TH2F) {
0252         auto value = static_cast<TH2F*>(obj);
0253         store->book2D(objname, value);
0254       } else if (kind == DQMNet::DQM_PROP_TYPE_TH2S) {
0255         auto value = static_cast<TH2S*>(obj);
0256         store->book2S(objname, value);
0257       } else if (kind == DQMNet::DQM_PROP_TYPE_TH2D) {
0258         auto value = static_cast<TH2D*>(obj);
0259         store->book2DD(objname, value);
0260       } else if (kind == DQMNet::DQM_PROP_TYPE_TH2I) {
0261         auto value = static_cast<TH2I*>(obj);
0262         store->book2I(objname, value);
0263       } else if (kind == DQMNet::DQM_PROP_TYPE_TH3F) {
0264         auto value = static_cast<TH3F*>(obj);
0265         store->book3D(objname, value);
0266       } else if (kind == DQMNet::DQM_PROP_TYPE_TPROF) {
0267         auto value = static_cast<TProfile*>(obj);
0268         store->bookProfile(objname, value);
0269       } else if (kind == DQMNet::DQM_PROP_TYPE_TPROF2D) {
0270         auto value = static_cast<TProfile2D*>(obj);
0271         store->bookProfile2D(objname, value);
0272       } else {
0273         edm::LogError("DQMProtobufReader") << "Unknown type: " << kind;
0274       }
0275       delete obj;
0276     }
0277   }
0278 }
0279 
0280 void DQMProtobufReader::readEvent_(edm::EventPrincipal&){};
0281 
0282 void DQMProtobufReader::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
0283   edm::ParameterSetDescription desc;
0284 
0285   desc.setComment(
0286       "Creates runs and lumis and fills the dqmstore from protocol buffer "
0287       "files.");
0288   edm::ProducerSourceBase::fillDescription(desc);
0289 
0290   desc.addUntracked<bool>("skipFirstLumis", false)
0291       ->setComment(
0292           "Skip (and ignore the minEventsPerLumi parameter) for the files "
0293           "which have been available at the begining of the processing. "
0294           "If set to true, the reader will open last available file for "
0295           "processing.");
0296 
0297   desc.addUntracked<bool>("deleteDatFiles", false)
0298       ->setComment(
0299           "Delete data files after they have been closed, in order to "
0300           "save disk space.");
0301 
0302   desc.addUntracked<bool>("endOfRunKills", false)
0303       ->setComment(
0304           "Kill the processing as soon as the end-of-run file appears, even if "
0305           "there are/will be unprocessed lumisections.");
0306 
0307   desc.addUntracked<bool>("loadFiles", true)
0308       ->setComment(
0309           "Tells the source load the data files. If set to false, source will create skeleton lumi transitions.");
0310 
0311   DQMFileIterator::fillDescription(desc);
0312   descriptions.add("source", desc);
0313 }
0314 
0315 #include "FWCore/Framework/interface/InputSourceMacros.h"
0316 #include "FWCore/Framework/interface/MakerMacros.h"
0317 
0318 using dqmservices::DQMProtobufReader;
0319 DEFINE_FWK_INPUT_SOURCE(DQMProtobufReader);