File indexing completed on 2023-03-17 10:59:00
#include "DQMServices/Core/src/DQMService.h"
#include "DQMServices/Core/interface/DQMNet.h"
#include "DQMServices/Core/interface/DQMStore.h"
#include "DQMServices/Core/interface/DQMScope.h"
#include "DQMServices/Core/interface/MonitorElement.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include "classlib/utils/Error.h"
#include <cassert>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <type_traits>
#include "TBufferFile.h"
0013
0014
namespace {
  // File-local recursive mutex locked and unlocked by the DQMScope
  // constructor and destructor defined in this file.
  std::recursive_mutex s_mutex;
}  // namespace
0016
0017
0018
0019 DQMScope::DQMScope() { s_mutex.lock(); }
0020
0021
0022 DQMScope::~DQMScope() { s_mutex.unlock(); }
0023
0024
0025 DQMService::DQMService(const edm::ParameterSet &pset, edm::ActivityRegistry &ar)
0026 : store_(&*edm::Service<DQMStore>()), net_(nullptr), lastFlush_(0), publishFrequency_(5.0) {
0027 ar.watchPostEvent(this, &DQMService::flush);
0028 ar.watchPostStreamEndLumi(this, &DQMService::flush);
0029
0030 std::string host = pset.getUntrackedParameter<std::string>("collectorHost", "");
0031 int port = pset.getUntrackedParameter<int>("collectorPort", 9090);
0032 bool verbose = pset.getUntrackedParameter<bool>("verbose", false);
0033 publishFrequency_ = pset.getUntrackedParameter<double>("publishFrequency", publishFrequency_);
0034
0035 if (!host.empty() && port > 0) {
0036 net_ = new DQMBasicNet;
0037 net_->debug(verbose);
0038 net_->updateToCollector(host, port);
0039 net_->start();
0040 }
0041 }
0042
0043 DQMService::~DQMService() { shutdown(); }
0044
0045
0046
0047
0048 void DQMService::flushStandalone() {
0049
0050 uint64_t version = lat::Time::current().ns();
0051 double vtime = version * 1e-9;
0052 if (vtime - lastFlush_ < publishFrequency_)
0053 return;
0054
0055
0056 if (net_) {
0057 DQMNet::Object o;
0058 std::set<std::string> seen;
0059 std::string fullpath;
0060
0061
0062 net_->lock();
0063 bool updated = false;
0064
0065 auto mes = store_->getAllContents("");
0066 for (MonitorElement *me : mes) {
0067 auto fullpath = me->getFullname();
0068 seen.insert(fullpath);
0069 if (!me->wasUpdated())
0070 continue;
0071
0072 o.lastreq = 0;
0073 o.hash = DQMNet::dqmhash(fullpath.c_str(), fullpath.size());
0074 o.flags = me->data_.flags;
0075 o.version = version;
0076 o.dirname = me->data_.dirname.substr(0, me->data_.dirname.size() - 1);
0077 o.objname = me->data_.objname;
0078 assert(o.rawdata.empty());
0079 assert(o.scalar.empty());
0080 assert(o.qdata.empty());
0081
0082
0083
0084 switch (me->kind()) {
0085 case MonitorElement::Kind::INT:
0086 case MonitorElement::Kind::REAL:
0087 case MonitorElement::Kind::STRING:
0088 me->packScalarData(o.scalar, "");
0089 break;
0090 default: {
0091 TBufferFile buffer(TBufferFile::kWrite);
0092 buffer.WriteObject(me->getTH1());
0093
0094 buffer.WriteObjectAny(nullptr, nullptr);
0095 o.rawdata.resize(buffer.Length());
0096 memcpy(&o.rawdata[0], buffer.Buffer(), buffer.Length());
0097 DQMNet::packQualityData(o.qdata, me->data_.qreports);
0098 break;
0099 }
0100 }
0101
0102 net_->updateLocalObject(o);
0103 DQMNet::DataBlob().swap(o.rawdata);
0104 std::string().swap(o.scalar);
0105 std::string().swap(o.qdata);
0106 updated = true;
0107 }
0108
0109
0110 if (net_->removeLocalExcept(seen))
0111 updated = true;
0112
0113
0114 net_->unlock();
0115
0116
0117 if (updated)
0118 net_->sendLocalChanges();
0119 }
0120
0121 lastFlush_ = lat::Time::current().ns() * 1e-9;
0122 }
0123 void DQMService::flush(edm::StreamContext const &sc) {
0124
0125 flushStandalone();
0126 }
0127
0128
0129 void DQMService::shutdown() {
0130
0131 if (net_)
0132 net_->shutdown();
0133 }