File indexing completed on 2024-04-06 12:10:10
0001 #include "DQMServices/Core/src/DQMService.h"
0002 #include "DQMServices/Core/interface/DQMNet.h"
0003 #include "DQMServices/Core/interface/DQMStore.h"
0004 #include "DQMServices/Core/interface/DQMScope.h"
0005 #include "DQMServices/Core/interface/MonitorElement.h"
0006 #include "FWCore/ServiceRegistry/interface/Service.h"
0007 #include <mutex>
0008 #include <iostream>
0009 #include <string>
0010 #include <memory>
0011 #include "TBufferFile.h"
0012
0013
// File-local recursive mutex. DQMScope locks it on construction and
// unlocks it on destruction; being recursive, the same thread may
// acquire it repeatedly.
namespace {
  std::recursive_mutex s_mutex;
}  // namespace
0015
0016
0017
0018 DQMScope::DQMScope() { s_mutex.lock(); }
0019
0020
0021 DQMScope::~DQMScope() { s_mutex.unlock(); }
0022
0023
0024 DQMService::DQMService(const edm::ParameterSet &pset, edm::ActivityRegistry &ar)
0025 : store_(&*edm::Service<DQMStore>()), net_(nullptr), lastFlush_(0), publishFrequency_(5.0) {
0026 ar.watchPostEvent(this, &DQMService::flush);
0027 ar.watchPostStreamEndLumi(this, &DQMService::flush);
0028
0029 std::string host = pset.getUntrackedParameter<std::string>("collectorHost", "");
0030 int port = pset.getUntrackedParameter<int>("collectorPort", 9090);
0031 bool verbose = pset.getUntrackedParameter<bool>("verbose", false);
0032 publishFrequency_ = pset.getUntrackedParameter<double>("publishFrequency", publishFrequency_);
0033
0034 if (!host.empty() && port > 0) {
0035 net_ = new DQMBasicNet;
0036 net_->debug(verbose);
0037 net_->updateToCollector(host, port);
0038 net_->start();
0039 }
0040 }
0041
0042 DQMService::~DQMService() { shutdown(); }
0043
0044
0045
0046
0047 void DQMService::flushStandalone() {
0048
0049 uint64_t version = lat::Time::current().ns();
0050 double vtime = version * 1e-9;
0051 if (vtime - lastFlush_ < publishFrequency_)
0052 return;
0053
0054
0055 if (net_) {
0056 DQMNet::Object o;
0057 std::set<std::string> seen;
0058 std::string fullpath;
0059
0060
0061 net_->lock();
0062 bool updated = false;
0063
0064 auto mes = store_->getAllContents("");
0065 for (MonitorElement *me : mes) {
0066 auto fullpath = me->getFullname();
0067 seen.insert(fullpath);
0068 if (!me->wasUpdated())
0069 continue;
0070
0071 o.lastreq = 0;
0072 o.hash = DQMNet::dqmhash(fullpath.c_str(), fullpath.size());
0073 o.flags = me->data_.flags;
0074 o.version = version;
0075 o.dirname = me->data_.dirname.substr(0, me->data_.dirname.size() - 1);
0076 o.objname = me->data_.objname;
0077 assert(o.rawdata.empty());
0078 assert(o.scalar.empty());
0079 assert(o.qdata.empty());
0080
0081
0082
0083 switch (me->kind()) {
0084 case MonitorElement::Kind::INT:
0085 case MonitorElement::Kind::REAL:
0086 case MonitorElement::Kind::STRING:
0087 me->packScalarData(o.scalar, "");
0088 break;
0089 default: {
0090 TBufferFile buffer(TBufferFile::kWrite);
0091 buffer.WriteObject(me->getTH1());
0092
0093 buffer.WriteObjectAny(nullptr, nullptr);
0094 o.rawdata.resize(buffer.Length());
0095 memcpy(&o.rawdata[0], buffer.Buffer(), buffer.Length());
0096 DQMNet::packQualityData(o.qdata, me->data_.qreports);
0097 break;
0098 }
0099 }
0100
0101 net_->updateLocalObject(o);
0102 DQMNet::DataBlob().swap(o.rawdata);
0103 std::string().swap(o.scalar);
0104 std::string().swap(o.qdata);
0105 updated = true;
0106 }
0107
0108
0109 if (net_->removeLocalExcept(seen))
0110 updated = true;
0111
0112
0113 net_->unlock();
0114
0115
0116 if (updated)
0117 net_->sendLocalChanges();
0118 }
0119
0120 lastFlush_ = lat::Time::current().ns() * 1e-9;
0121 }
0122 void DQMService::flush(edm::StreamContext const &sc) {
0123
0124 flushStandalone();
0125 }
0126
0127
0128 void DQMService::shutdown() {
0129
0130 if (net_)
0131 net_->shutdown();
0132 }