1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
#include "DQMServices/Core/src/DQMService.h"
#include "DQMServices/Core/interface/DQMNet.h"
#include "DQMServices/Core/interface/DQMStore.h"
#include "DQMServices/Core/interface/DQMScope.h"
#include "DQMServices/Core/interface/MonitorElement.h"
#include "FWCore/ServiceRegistry/interface/Service.h"
#include <mutex>
#include <iostream>
#include <string>
#include <memory>
#include "TBufferFile.h"
// -------------------------------------------------------------------
static std::recursive_mutex s_mutex;
/// Acquire lock and access to the DQM core from a thread other than
/// the "main" CMSSW processing thread, such as in extra XDAQ threads.
DQMScope::DQMScope() { s_mutex.lock(); }
/// Release access lock to the DQM core.
DQMScope::~DQMScope() { s_mutex.unlock(); }
// -------------------------------------------------------------------
DQMService::DQMService(const edm::ParameterSet &pset, edm::ActivityRegistry &ar)
: store_(&*edm::Service<DQMStore>()), net_(nullptr), lastFlush_(0), publishFrequency_(5.0) {
ar.watchPostEvent(this, &DQMService::flush);
ar.watchPostStreamEndLumi(this, &DQMService::flush);
std::string host = pset.getUntrackedParameter<std::string>("collectorHost", "");
int port = pset.getUntrackedParameter<int>("collectorPort", 9090);
bool verbose = pset.getUntrackedParameter<bool>("verbose", false);
publishFrequency_ = pset.getUntrackedParameter<double>("publishFrequency", publishFrequency_);
if (!host.empty() && port > 0) {
net_ = new DQMBasicNet;
net_->debug(verbose);
net_->updateToCollector(host, port);
net_->start();
}
}
DQMService::~DQMService() { shutdown(); }
// Flush updates to the network layer at the end of each event. This
// is the only point at which the main application and the network
// layer interact outside initialisation and exit.
void DQMService::flushStandalone() {
// Avoid sending updates excessively often.
uint64_t version = lat::Time::current().ns();
double vtime = version * 1e-9;
if (vtime - lastFlush_ < publishFrequency_)
return;
// OK, send an update.
if (net_) {
DQMNet::Object o;
std::set<std::string> seen;
std::string fullpath;
// Lock the network layer so we can modify the data.
net_->lock();
bool updated = false;
auto mes = store_->getAllContents("");
for (MonitorElement *me : mes) {
auto fullpath = me->getFullname();
seen.insert(fullpath);
if (!me->wasUpdated())
continue;
o.lastreq = 0;
o.hash = DQMNet::dqmhash(fullpath.c_str(), fullpath.size());
o.flags = me->data_.flags;
o.version = version;
o.dirname = me->data_.dirname.substr(0, me->data_.dirname.size() - 1);
o.objname = me->data_.objname;
assert(o.rawdata.empty());
assert(o.scalar.empty());
assert(o.qdata.empty());
// Pack object and reference, scalar and quality data.
switch (me->kind()) {
case MonitorElement::Kind::INT:
case MonitorElement::Kind::REAL:
case MonitorElement::Kind::STRING:
me->packScalarData(o.scalar, "");
break;
default: {
TBufferFile buffer(TBufferFile::kWrite);
buffer.WriteObject(me->getTH1());
// placeholder for (no longer supported) reference
buffer.WriteObjectAny(nullptr, nullptr);
o.rawdata.resize(buffer.Length());
memcpy(&o.rawdata[0], buffer.Buffer(), buffer.Length());
DQMNet::packQualityData(o.qdata, me->data_.qreports);
break;
}
}
net_->updateLocalObject(o);
DQMNet::DataBlob().swap(o.rawdata);
std::string().swap(o.scalar);
std::string().swap(o.qdata);
updated = true;
}
// Find removed contents and clear the network cache.
if (net_->removeLocalExcept(seen))
updated = true;
// Unlock the network layer.
net_->unlock();
// Tell network to flush if we updated something.
if (updated)
net_->sendLocalChanges();
}
lastFlush_ = lat::Time::current().ns() * 1e-9;
}
void DQMService::flush(edm::StreamContext const &sc) {
// Call a function independent to the framework
flushStandalone();
}
// Disengage the network service.
void DQMService::shutdown() {
// If we have a network, let it go.
if (net_)
net_->shutdown();
}
|