File indexing completed on 2025-03-10 23:53:30
0001 #ifndef DQMSERVICES_CORE_DQM_NET_H
0002 #define DQMSERVICES_CORE_DQM_NET_H
0003
0004 #include "classlib/iobase/Socket.h"
0005 #include "classlib/iobase/IOSelector.h"
0006 #include "classlib/iobase/Pipe.h"
0007 #include "classlib/utils/Error.h"
0008 #include "classlib/utils/Time.h"
0009 #include <pthread.h>
0010 #include <cstdint>
0011 #include <csignal>
0012 #include <iostream>
0013 #include <vector>
0014 #include <string>
0015 #include <list>
0016 #include <map>
0017 #include <set>
0018 #include <unordered_set>
0019
0020
0021 #include "DataFormats/Histograms/interface/MonitorElementCollection.h"
0022
0023
0024
0025 class DQMNet {
0026 public:
// Bit layout of the per-object `flags` word, plus the numeric message and
// reply codes used by this class.  Declared `constexpr` so that each constant
// is an implicitly inline definition (C++17) and can be odr-used (for example
// bound to a `const uint32_t &`) without an out-of-line definition.

// Bits 0-7: object type.  Values 0x01-0x0f fall under the SCALAR mask.
static constexpr uint32_t DQM_PROP_TYPE_MASK = 0x000000ff;
static constexpr uint32_t DQM_PROP_TYPE_SCALAR = 0x0000000f;
static constexpr uint32_t DQM_PROP_TYPE_INVALID = 0x00000000;
static constexpr uint32_t DQM_PROP_TYPE_INT = 0x00000001;
static constexpr uint32_t DQM_PROP_TYPE_REAL = 0x00000002;
static constexpr uint32_t DQM_PROP_TYPE_STRING = 0x00000003;
static constexpr uint32_t DQM_PROP_TYPE_TH1F = 0x00000010;
static constexpr uint32_t DQM_PROP_TYPE_TH1S = 0x00000011;
static constexpr uint32_t DQM_PROP_TYPE_TH1D = 0x00000012;
static constexpr uint32_t DQM_PROP_TYPE_TH1I = 0x00000013;
static constexpr uint32_t DQM_PROP_TYPE_TH2F = 0x00000020;
static constexpr uint32_t DQM_PROP_TYPE_TH2S = 0x00000021;
static constexpr uint32_t DQM_PROP_TYPE_TH2D = 0x00000022;
static constexpr uint32_t DQM_PROP_TYPE_TH2I = 0x00000023;
static constexpr uint32_t DQM_PROP_TYPE_TH2Poly = 0x00000024;
static constexpr uint32_t DQM_PROP_TYPE_TH3F = 0x00000030;
static constexpr uint32_t DQM_PROP_TYPE_TH3S = 0x00000031;
static constexpr uint32_t DQM_PROP_TYPE_TH3D = 0x00000032;
static constexpr uint32_t DQM_PROP_TYPE_TPROF = 0x00000040;
static constexpr uint32_t DQM_PROP_TYPE_TPROF2D = 0x00000041;
static constexpr uint32_t DQM_PROP_TYPE_DATABLOB = 0x00000050;

// Bits 8-11: report status.  ALARM is the union of ERROR, WARN and OTHER.
static constexpr uint32_t DQM_PROP_REPORT_MASK = 0x00000f00;
static constexpr uint32_t DQM_PROP_REPORT_CLEAR = 0x00000000;
static constexpr uint32_t DQM_PROP_REPORT_ERROR = 0x00000100;
static constexpr uint32_t DQM_PROP_REPORT_WARN = 0x00000200;
static constexpr uint32_t DQM_PROP_REPORT_OTHER = 0x00000400;
static constexpr uint32_t DQM_PROP_REPORT_ALARM =
    (DQM_PROP_REPORT_ERROR | DQM_PROP_REPORT_WARN | DQM_PROP_REPORT_OTHER);

// Bits 12-15: single-bit object attributes.
static constexpr uint32_t DQM_PROP_HAS_REFERENCE = 0x00001000;
static constexpr uint32_t DQM_PROP_TAGGED = 0x00002000;
static constexpr uint32_t DQM_PROP_ACCUMULATE = 0x00004000;
static constexpr uint32_t DQM_PROP_RESET = 0x00008000;

// Bits 16 and up: single-bit bookkeeping flags.  In this header, NEW is
// tested and cleared by sendObjectListToPeer(), and DEAD is set by
// markObjectsDead() and tested by purgeDeadObjects().
static constexpr uint32_t DQM_PROP_NEW = 0x00010000;
static constexpr uint32_t DQM_PROP_RECEIVED = 0x00020000;
static constexpr uint32_t DQM_PROP_LUMI = 0x00040000;
static constexpr uint32_t DQM_PROP_DEAD = 0x00080000;
static constexpr uint32_t DQM_PROP_STALE = 0x00100000;
static constexpr uint32_t DQM_PROP_EFFICIENCY_PLOT = 0x00200000;
static constexpr uint32_t DQM_PROP_MARKTODELETE = 0x01000000;

// Message codes (presumably request types sent by a peer; handled outside this header).
static constexpr uint32_t DQM_MSG_HELLO = 0;
static constexpr uint32_t DQM_MSG_UPDATE_ME = 1;
static constexpr uint32_t DQM_MSG_LIST_OBJECTS = 2;
static constexpr uint32_t DQM_MSG_GET_OBJECT = 3;

// Reply codes.  LIST_BEGIN / LIST_END are written as the second header word
// by sendObjectListToPeer().
static constexpr uint32_t DQM_REPLY_LIST_BEGIN = 101;
static constexpr uint32_t DQM_REPLY_LIST_END = 102;
static constexpr uint32_t DQM_REPLY_NONE = 103;
static constexpr uint32_t DQM_REPLY_OBJECT = 104;

// NOTE(review): by its name, an upper bound on outstanding wait requests per
// peer; the enforcing code is not in this header.
static constexpr uint32_t MAX_PEER_WAITREQS = 128;
0080
0081 struct Peer;
0082 struct WaitObject;
0083
0084 using QValue = MonitorElementData::QReport::QValue;
0085 using DataBlob = std::vector<unsigned char>;
0086 using QReports = std::vector<QValue>;
0087 using TagList = std::vector<uint32_t>;
0088 using WaitList = std::list<WaitObject>;
0089
0090 struct CoreObject {
0091 uint32_t flags;
0092 uint32_t tag;
0093 uint64_t version;
0094 uint32_t run;
0095 uint32_t lumi;
0096 uint32_t streamId;
0097 uint32_t moduleId;
0098 std::string dirname;
0099 std::string objname;
0100 QReports qreports;
0101 };
0102
0103 struct Object : CoreObject {
0104 uint64_t hash;
0105 uint64_t lastreq;
0106 DataBlob rawdata;
0107 std::string scalar;
0108 std::string qdata;
0109 };
0110
0111 struct Bucket {
0112 Bucket *next;
0113 DataBlob data;
0114 };
0115
0116 struct WaitObject {
0117 lat::Time time;
0118 std::string name;
0119 std::string info;
0120 Peer *peer;
0121 };
0122
0123 struct AutoPeer;
0124 struct Peer {
0125 std::string peeraddr;
0126 lat::Socket *socket;
0127 DataBlob incoming;
0128 Bucket *sendq;
0129 size_t sendpos;
0130
0131 unsigned mask;
0132 bool source;
0133 bool update;
0134 bool updated;
0135 size_t updates;
0136 size_t waiting;
0137 AutoPeer *automatic;
0138 };
0139
0140 struct AutoPeer {
0141 Peer *peer;
0142 lat::Time next;
0143 std::string host;
0144 int port;
0145 bool update;
0146 };
0147
0148 DQMNet(const std::string &appname = "");
0149 virtual ~DQMNet();
0150
0151 void debug(bool doit);
0152 void delay(int delay);
0153 void startLocalServer(int port);
0154 void startLocalServer(const char *path);
0155 void staleObjectWaitLimit(lat::TimeSpan time);
0156 void updateToCollector(const std::string &host, int port);
0157 void listenToCollector(const std::string &host, int port);
0158 void shutdown();
0159 void lock();
0160 void unlock();
0161
0162 void start();
0163 void run();
0164
0165 void sendLocalChanges();
0166
0167 static bool setOrder(const CoreObject &a, const CoreObject &b) {
0168 if (a.run == b.run) {
0169 if (a.lumi == b.lumi) {
0170 if (a.streamId == b.streamId) {
0171 if (a.moduleId == b.moduleId) {
0172 if (a.dirname == b.dirname) {
0173 return a.objname < b.objname;
0174 }
0175 return a.dirname < b.dirname;
0176 }
0177 return a.moduleId < b.moduleId;
0178 }
0179 return a.streamId < b.streamId;
0180 }
0181 return a.lumi < b.lumi;
0182 }
0183 return a.run < b.run;
0184 }
0185
0186 struct HashOp {
0187 uint32_t operator()(const Object &a) const { return a.hash; }
0188 };
0189
0190 struct HashEqual {
0191 bool operator()(const Object &a, const Object &b) const {
0192 return a.hash == b.hash && a.dirname == b.dirname && a.objname == b.objname;
0193 }
0194 };
0195
/// Hash `keylen` bytes starting at `key`.
///
/// The state is three 32-bit words seeded with 0xdeadbeef + keylen.  Input is
/// consumed as little-endian 32-bit words in 12-byte blocks with a mixing
/// step after each full block; the last 1..12 bytes are folded in and a
/// finalisation step is applied.  A zero-length key returns the seed without
/// finalisation.  The result always fits in 32 bits.
///
/// NOTE(review): the constants and structure appear to match Bob Jenkins'
/// lookup3 `hashlittle` with initval 0 — confirm before relying on
/// cross-implementation compatibility.
///
/// The helpers are local lambdas rather than preprocessor macros so that
/// including this header neither defines nor `#undef`s any macro names.
///
/// @param key     pointer to the bytes to hash; only `keylen` bytes are read
/// @param keylen  number of bytes to hash
/// @return        the 32-bit hash value, widened to size_t
static size_t dqmhash(const void *key, size_t keylen) {
  // 32-bit left rotation.  Every call below uses 0 < k < 32, so both shifts
  // are well defined.
  const auto rotl = [](uint32_t x, unsigned k) -> uint32_t { return (x << k) | (x >> (32u - k)); };

  // Reversible mixing of the three state words, applied after each full block.
  const auto mix = [&rotl](uint32_t &x, uint32_t &y, uint32_t &z) {
    x -= z;  x ^= rotl(z, 4);   z += y;
    y -= x;  y ^= rotl(x, 6);   x += z;
    z -= y;  z ^= rotl(y, 8);   y += x;
    x -= z;  x ^= rotl(z, 16);  z += y;
    y -= x;  y ^= rotl(x, 19);  x += z;
    z -= y;  z ^= rotl(y, 4);   y += x;
  };

  // Final avalanche of the three state words; the result is left in `z`.
  const auto finalize = [&rotl](uint32_t &x, uint32_t &y, uint32_t &z) {
    z ^= y;  z -= rotl(y, 14);
    x ^= z;  x -= rotl(z, 11);
    y ^= x;  y -= rotl(x, 25);
    z ^= y;  z -= rotl(y, 16);
    x ^= z;  x -= rotl(z, 4);
    y ^= x;  y -= rotl(x, 14);
    z ^= y;  z -= rotl(y, 24);
  };

  // Assemble four bytes into a little-endian 32-bit word.
  const auto le32 = [](const unsigned char *p) -> uint32_t {
    return static_cast<uint32_t>(p[0]) | (static_cast<uint32_t>(p[1]) << 8) |
           (static_cast<uint32_t>(p[2]) << 16) | (static_cast<uint32_t>(p[3]) << 24);
  };

  uint32_t a = 0xdeadbeef + static_cast<uint32_t>(keylen);
  uint32_t b = a;
  uint32_t c = a;
  const auto *k = static_cast<const unsigned char *>(key);

  // Full 12-byte blocks.  A trailing block of exactly 12 bytes is left for
  // the tail handling below so that it is finalised rather than mixed.
  while (keylen > 12) {
    a += le32(k);
    b += le32(k + 4);
    c += le32(k + 8);
    mix(a, b, c);
    keylen -= 12;
    k += 12;
  }

  // Nothing left: return the state word as-is, without finalisation.
  if (keylen == 0)
    return c;

  // Tail of 1..12 bytes: byte i goes to word i/4 at bit offset 8*(i%4).
  uint32_t *const lanes[3] = {&a, &b, &c};
  for (size_t i = 0; i < keylen; ++i)
    *lanes[i / 4] += static_cast<uint32_t>(k[i]) << (8 * (i % 4));

  finalize(a, b, c);
  return c;
}
0310
0311 static void packQualityData(std::string &into, const QReports &qr);
0312 static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from);
0313
0314 protected:
0315 std::ostream &logme();
0316 static void copydata(Bucket *b, const void *data, size_t len);
0317 virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data);
0318
0319 virtual bool shouldStop();
0320 void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner);
0321 virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o);
0322 virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len);
0323
0324
0325
0326 virtual Object *findObject(Peer *p, const std::string &name, Peer **owner = nullptr) = 0;
0327 virtual Object *makeObject(Peer *p, const std::string &name) = 0;
0328 virtual void markObjectsDead(Peer *p) = 0;
0329 virtual void purgeDeadObjects(Peer *p) = 0;
0330
0331 virtual Peer *getPeer(lat::Socket *s) = 0;
0332 virtual Peer *createPeer(lat::Socket *s) = 0;
0333 virtual void removePeer(Peer *p, lat::Socket *s) = 0;
0334 virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear) = 0;
0335 virtual void sendObjectListToPeers(bool all) = 0;
0336
0337 void updateMask(Peer *p);
0338 virtual void updatePeerMasks() = 0;
0339 static void discard(Bucket *&b);
0340
0341 bool debug_;
0342 pthread_mutex_t lock_;
0343
0344 private:
0345 void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err = nullptr);
0346 void requestObjectData(Peer *p, const char *name, size_t len);
0347 void releaseFromWait(WaitList::iterator i, Object *o);
0348 void releaseWaiters(const std::string &name, Object *o);
0349
0350 bool onPeerData(lat::IOSelectEvent *ev, Peer *p);
0351 bool onPeerConnect(lat::IOSelectEvent *ev);
0352 bool onLocalNotify(lat::IOSelectEvent *ev);
0353
0354 std::string appname_;
0355 int pid_;
0356
0357 lat::IOSelector sel_;
0358 lat::Socket *server_;
0359 lat::Pipe wakeup_;
0360 lat::Time version_;
0361
0362 AutoPeer upstream_;
0363 AutoPeer downstream_;
0364 WaitList waiting_;
0365
0366 pthread_t communicate_;
0367 sig_atomic_t shutdown_;
0368
0369 int delay_;
0370 lat::TimeSpan waitStale_;
0371 lat::TimeSpan waitMax_;
0372 bool flush_;
0373
0374 public:
0375
0376 DQMNet(const DQMNet &) = delete;
0377 DQMNet &operator=(const DQMNet &) = delete;
0378 };
0379
0380 template <class ObjType>
0381 class DQMImplNet : public DQMNet {
0382 public:
0383 struct ImplPeer;
0384
0385 using DirMap = std::set<std::string>;
0386 typedef std::unordered_set<ObjType, HashOp, HashEqual> ObjectMap;
0387 typedef std::map<lat::Socket *, ImplPeer> PeerMap;
0388 struct ImplPeer : Peer {
0389 ImplPeer() = default;
0390 ObjectMap objs;
0391 DirMap dirs;
0392 };
0393
0394 DQMImplNet(const std::string &appname = "") : DQMNet(appname) {}
0395
0396 ~DQMImplNet() override = default;
0397
0398 protected:
0399 Object *findObject(Peer *p, const std::string &name, Peer **owner = nullptr) override {
0400 size_t slash = name.rfind('/');
0401 size_t dirpos = (slash == std::string::npos ? 0 : slash);
0402 size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
0403 std::string path(name, 0, dirpos);
0404 ObjType proto;
0405 proto.hash = dqmhash(name.c_str(), name.size());
0406 proto.dirname = path;
0407 proto.objname.append(name, namepos, std::string::npos);
0408
0409 typename ObjectMap::iterator pos;
0410 typename PeerMap::iterator i, e;
0411 if (owner)
0412 *owner = nullptr;
0413 if (p) {
0414 auto *ip = static_cast<ImplPeer *>(p);
0415 pos = ip->objs.find(proto);
0416 if (pos == ip->objs.end())
0417 return nullptr;
0418 else {
0419 if (owner)
0420 *owner = ip;
0421 return const_cast<ObjType *>(&*pos);
0422 }
0423 } else {
0424 for (i = peers_.begin(), e = peers_.end(); i != e; ++i) {
0425 pos = i->second.objs.find(proto);
0426 if (pos != i->second.objs.end()) {
0427 if (owner)
0428 *owner = &i->second;
0429 return const_cast<ObjType *>(&*pos);
0430 }
0431 }
0432 return nullptr;
0433 }
0434 }
0435
0436 Object *makeObject(Peer *p, const std::string &name) override {
0437 auto *ip = static_cast<ImplPeer *>(p);
0438 size_t slash = name.rfind('/');
0439 size_t dirpos = (slash == std::string::npos ? 0 : slash);
0440 size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
0441 ObjType o;
0442 o.flags = 0;
0443 o.tag = 0;
0444 o.version = 0;
0445 o.lastreq = 0;
0446 o.dirname = *ip->dirs.insert(name.substr(0, dirpos)).first;
0447 o.objname.append(name, namepos, std::string::npos);
0448 o.hash = dqmhash(name.c_str(), name.size());
0449 return const_cast<ObjType *>(&*ip->objs.insert(o).first);
0450 }
0451
0452
0453
0454
0455
0456
0457
0458
0459 void markObjectsDead(Peer *p) override {
0460 uint64_t minreq = (lat::Time::current() - lat::TimeSpan(0, 0, 5 , 0, 0)).ns();
0461 auto *ip = static_cast<ImplPeer *>(p);
0462 typename ObjectMap::iterator i, e;
0463 for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ++i) {
0464 if (i->lastreq && i->lastreq < minreq)
0465 const_cast<ObjType &>(*i).lastreq = 0;
0466 const_cast<ObjType &>(*i).flags |= DQM_PROP_DEAD;
0467 }
0468 }
0469
0470
0471 void purgeDeadObjects(Peer *p) override {
0472 auto *ip = static_cast<ImplPeer *>(p);
0473 typename ObjectMap::iterator i, e;
0474 for (i = ip->objs.begin(), e = ip->objs.end(); i != e;) {
0475 if (i->flags & DQM_PROP_DEAD)
0476 ip->objs.erase(i++);
0477 else
0478 ++i;
0479 }
0480 }
0481
0482 Peer *getPeer(lat::Socket *s) override {
0483 auto pos = peers_.find(s);
0484 auto end = peers_.end();
0485 return pos == end ? nullptr : &pos->second;
0486 }
0487
0488 Peer *createPeer(lat::Socket *s) override {
0489 ImplPeer *ip = &peers_[s];
0490 ip->socket = nullptr;
0491 ip->sendq = nullptr;
0492 ip->sendpos = 0;
0493 ip->mask = 0;
0494 ip->source = false;
0495 ip->update = false;
0496 ip->updated = false;
0497 ip->updates = 0;
0498 ip->waiting = 0;
0499 ip->automatic = nullptr;
0500 return ip;
0501 }
0502
0503 void removePeer(Peer *p, lat::Socket *s) override {
0504 auto *ip = static_cast<ImplPeer *>(p);
0505 bool needflush = !ip->objs.empty();
0506
0507 typename ObjectMap::iterator i, e;
0508 for (i = ip->objs.begin(), e = ip->objs.end(); i != e;)
0509 ip->objs.erase(i++);
0510
0511 peers_.erase(s);
0512
0513
0514
0515 if (needflush)
0516 sendLocalChanges();
0517 }
0518
0519
0520 void sendObjectListToPeer(Bucket *msg, bool all, bool clear) override {
0521 typename PeerMap::iterator pi, pe;
0522 typename ObjectMap::iterator oi, oe;
0523 size_t size = 0;
0524 size_t numobjs = 0;
0525 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
0526 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
0527 if (all || (oi->flags & DQM_PROP_NEW))
0528 size += 9 * sizeof(uint32_t) + oi->dirname.size() + oi->objname.size() + 1 + oi->scalar.size() +
0529 oi->qdata.size() + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
0530
0531 msg->data.reserve(msg->data.size() + size + 8 * sizeof(uint32_t));
0532
0533 uint32_t nupdates = 0;
0534 uint32_t words[4];
0535 words[0] = sizeof(words);
0536 words[1] = DQM_REPLY_LIST_BEGIN;
0537 words[2] = numobjs;
0538 words[3] = all;
0539 copydata(msg, &words[0], sizeof(words));
0540
0541 for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
0542 for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
0543 if (all || (oi->flags & DQM_PROP_NEW)) {
0544 sendObjectToPeer(msg, const_cast<ObjType &>(*oi), oi->lastreq > 0);
0545 if (clear)
0546 const_cast<ObjType &>(*oi).flags &= ~DQM_PROP_NEW;
0547 ++nupdates;
0548 }
0549
0550 words[1] = DQM_REPLY_LIST_END;
0551 words[2] = nupdates;
0552 copydata(msg, &words[0], sizeof(words));
0553 }
0554
0555 void sendObjectListToPeers(bool all) override {
0556 typename PeerMap::iterator i, e;
0557 typename ObjectMap::iterator oi, oe;
0558 for (i = peers_.begin(), e = peers_.end(); i != e; ++i) {
0559 ImplPeer &p = i->second;
0560 if (!p.update)
0561 continue;
0562
0563 if (debug_)
0564 logme() << "DEBUG: notifying " << p.peeraddr << std::endl;
0565
0566 Bucket msg;
0567 msg.next = nullptr;
0568 sendObjectListToPeer(&msg, !p.updated || all, true);
0569
0570 if (!msg.data.empty()) {
0571 Bucket **prev = &p.sendq;
0572 while (*prev)
0573 prev = &(*prev)->next;
0574
0575 *prev = new Bucket;
0576 (*prev)->next = nullptr;
0577 (*prev)->data.swap(msg.data);
0578 }
0579 p.updated = true;
0580 }
0581 }
0582
0583 void updatePeerMasks() override {
0584 typename PeerMap::iterator i, e;
0585 for (i = peers_.begin(), e = peers_.end(); i != e;)
0586 updateMask(&(i++)->second);
0587 }
0588
0589 protected:
0590 PeerMap peers_;
0591 };
0592
0593 class DQMBasicNet : public DQMImplNet<DQMNet::Object> {
0594 public:
0595 DQMBasicNet(const std::string &appname = "");
0596
0597 void reserveLocalSpace(uint32_t size);
0598 void updateLocalObject(Object &o);
0599 bool removeLocalExcept(const std::set<std::string> &known);
0600
0601 private:
0602 ImplPeer *local_;
0603 };
0604
0605 #endif