File indexing completed on 2025-03-13 02:31:45
#include "DQMServices/Core/interface/DQMNet.h"
#include "classlib/iobase/InetServerSocket.h"
#include "classlib/iobase/LocalServerSocket.h"
#include "classlib/sysapi/InetSocket.h" // for completing InetAddress
#include "classlib/utils/SystemError.h"
#include "classlib/utils/Regexp.h"
#include <fmt/format.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <cassert>
#include <cfloat>
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <memory>
#include <sstream>

#include "FWCore/Utilities/interface/EDMException.h"
0020
// Platform-dependent I/O limits.
//  - MESSAGE_SIZE_LIMIT: a peer message whose length word is >= this value
//    causes the peer to be dropped (see onPeerData()).
//  - SOCKET_BUF_SIZE: value requested for SO_SNDBUF / SO_RCVBUF on sockets.
#if __APPLE__
#define MESSAGE_SIZE_LIMIT (1 * 1024 * 1024)
#define SOCKET_BUF_SIZE (1 * 1024 * 1024)
#else
#define MESSAGE_SIZE_LIMIT (8 * 1024 * 1024)
#define SOCKET_BUF_SIZE (8 * 1024 * 1024)
#endif
// Size of the scratch buffer handed to each read() on a peer socket.
#define SOCKET_READ_SIZE (SOCKET_BUF_SIZE / 8)
// Extra capacity reserved in a peer's incoming buffer when it runs out of room.
#define SOCKET_READ_GROWTH (SOCKET_BUF_SIZE)

using namespace lat;

// Matches text of the form "<tag>X=value</tag>" where X is one of i, f, s or
// qr. Not referenced in the code visible in this file section.
static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>");

// Quality-test status codes recognised by unpackQualityData() when mapping a
// report's code onto the DQM_PROP_REPORT_* flag bits.
namespace dqm {
namespace qstatus {
static const int STATUS_OK = 100;
static const int WARNING = 200;
static const int ERROR = 300;
}
}
0042 }
0043
0044
0045
0046 std::ostream &DQMNet::logme() {
0047 Time now = Time::current();
0048 return std::cout << now.format(true, "%Y-%m-%d %H:%M:%S.") << now.nanoformat(3, 3) << " " << appname_ << "[" << pid_
0049 << "]: ";
0050 }
0051
0052
0053 void DQMNet::copydata(Bucket *b, const void *data, size_t len) {
0054 b->data.insert(b->data.end(), (const unsigned char *)data, (const unsigned char *)data + len);
0055 }
0056
0057
0058 void DQMNet::discard(Bucket *&b) {
0059 while (b) {
0060 Bucket *next = b->next;
0061 delete b;
0062 b = next;
0063 }
0064 }
0065
0066
0067
0068
0069
0070 void DQMNet::losePeer(const char *reason, Peer *peer, IOSelectEvent *ev, Error *err) {
0071 if (reason)
0072 logme() << reason << peer->peeraddr << (err ? "; error was: " + err->explain() : std::string("")) << std::endl;
0073
0074 Socket *s = peer->socket;
0075
0076 for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
0077 if (i->peer == peer)
0078 waiting_.erase(i++);
0079 else
0080 ++i;
0081
0082 if (ev)
0083 ev->source = nullptr;
0084
0085 discard(peer->sendq);
0086 if (peer->automatic)
0087 peer->automatic->peer = nullptr;
0088
0089 sel_.detach(s);
0090 s->close();
0091 removePeer(peer, s);
0092 delete s;
0093 }
0094
0095
0096 void DQMNet::requestObjectData(Peer *p, const char *name, size_t len) {
0097
0098 Bucket **msg = &p->sendq;
0099 while (*msg)
0100 msg = &(*msg)->next;
0101 *msg = new Bucket;
0102 (*msg)->next = nullptr;
0103
0104 uint32_t words[3];
0105 words[0] = sizeof(words) + len;
0106 words[1] = DQM_MSG_GET_OBJECT;
0107 words[2] = len;
0108 copydata(*msg, words, sizeof(words));
0109 copydata(*msg, name, len);
0110 }
0111
0112
0113
0114 void DQMNet::waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner) {
0115
0116
0117
0118
0119
0120 requestObjectData(owner, !name.empty() ? &name[0] : nullptr, name.size());
0121 WaitObject wo = {Time::current(), name, info, p};
0122 waiting_.push_back(wo);
0123 p->waiting++;
0124 }
0125
0126
0127
0128 void DQMNet::releaseFromWait(WaitList::iterator i, Object *o) {
0129 Bucket **msg = &i->peer->sendq;
0130 while (*msg)
0131 msg = &(*msg)->next;
0132 *msg = new Bucket;
0133 (*msg)->next = nullptr;
0134
0135 releaseFromWait(*msg, *i, o);
0136
0137 assert(i->peer->waiting > 0);
0138 i->peer->waiting--;
0139 waiting_.erase(i);
0140 }
0141
0142
0143 void DQMNet::releaseWaiters(const std::string &name, Object *o) {
0144 for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
0145 if (i->name == name)
0146 releaseFromWait(i++, o);
0147 else
0148 ++i;
0149 }
0150
0151
0152
0153
0154 void DQMNet::packQualityData(std::string &into, const QReports &qr) {
0155 char buf[64];
0156 std::ostringstream qrs;
0157 QReports::const_iterator qi, qe;
0158 for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi) {
0159 int pos = 0;
0160 sprintf(buf, "%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG + 2, qi->qtresult);
0161 qrs << buf << '\0' << buf + pos << '\0' << qi->qtname << '\0' << qi->algorithm << '\0' << qi->message << '\0'
0162 << '\0';
0163 }
0164 into = qrs.str();
0165 }
0166
0167
0168
/// Unpack quality-test results from the NUL-separated buffer @a from (as
/// written by packQualityData()) and append them to @a qr.
///
/// Each record is read as five NUL-terminated fields: status code, result,
/// test name, algorithm and message. For every record, the DQM_PROP_REPORT_*
/// bit matching its status code is OR'ed into @a flags. Neither @a qr nor
/// @a flags is cleared first. Parsing ends at the first empty field where a
/// status code is expected.
///
/// NOTE(review): packQualityData() writes an extra '\0' after every record.
/// Both loops below land on that byte after the first record and stop, so
/// only the first report seems to be recovered when several were packed.
/// Confirm the intended wire format before changing either side.
void DQMNet::unpackQualityData(QReports &qr, uint32_t &flags, const char *from) {
  const char *qdata = from;

  // First pass: count the records so qr can be reserved up front.
  size_t nqv = 0;
  while (*qdata) {
    ++nqv;
    while (*qdata)
      ++qdata;
    ++qdata;
    while (*qdata)
      ++qdata;
    ++qdata;
    while (*qdata)
      ++qdata;
    ++qdata;
    while (*qdata)
      ++qdata;
    ++qdata;
    while (*qdata)
      ++qdata;
    ++qdata;
  }

  // Second pass: extract the fields of each record.
  qdata = from;
  qr.reserve(nqv);
  while (*qdata) {
    qr.emplace_back();
    DQMNet::QValue &qv = qr.back();

    // Field 1: status code; also folded into the summary flags.
    qv.code = atoi(qdata);
    while (*qdata)
      ++qdata;
    switch (qv.code) {
      case dqm::qstatus::STATUS_OK:
        break;
      case dqm::qstatus::WARNING:
        flags |= DQMNet::DQM_PROP_REPORT_WARN;
        break;
      case dqm::qstatus::ERROR:
        flags |= DQMNet::DQM_PROP_REPORT_ERROR;
        break;
      default:
        flags |= DQMNet::DQM_PROP_REPORT_OTHER;
        break;
    }

    // Field 2: numeric test result.
    qv.qtresult = atof(++qdata);
    while (*qdata)
      ++qdata;

    // Field 3: quality test name.
    qv.qtname = ++qdata;
    while (*qdata)
      ++qdata;

    // Field 4: algorithm.
    qv.algorithm = ++qdata;
    while (*qdata)
      ++qdata;

    // Field 5: message; then step past its terminator.
    qv.message = ++qdata;
    while (*qdata)
      ++qdata;
    ++qdata;
  }
}
0235
// Compiled out (#if 0): helpers that rebuild ROOT objects from an Object's
// serialised rawdata. They are not part of the build.
#if 0
// Read the next serialised ROOT object from @a buf; returns 0 at end of buffer.
static TObject *
extractNextObject(TBufferFile &buf)
{
  if (buf.Length() == buf.BufferSize())
    return 0;

  buf.InitMap();
  Int_t pos = buf.Length();
  TClass *c = buf.ReadClass();
  buf.SetBufferOffset(pos);
  buf.ResetMap();
  return c ? buf.ReadObject(c) : 0;
}

// Rebuild o.object, o.reference and o.qreports from o's serialised data.
bool
DQMNet::reconstructObject(Object &o)
{
  TBufferFile buf(TBufferFile::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
  buf.Reset();

  // Main object: required.
  if (! (o.object = extractNextObject(buf)))
    return false;

  // Reference object: optional (may be null).
  o.reference = extractNextObject(buf);

  // Quality reports from the packed qdata string.
  unpackQualityData(o.qreports, o.flags, o.qdata.c_str());
  return true;
}
#endif
0271
// Compiled out (#if 0): re-books an Object into a DQMStore by type.
// NOTE(review): most cases below use an identifier `name` that is not declared
// in this function (only o.objname is used for the INT case). It would not
// compile if re-enabled as-is.
#if 0
bool
DQMNet::reinstateObject(DQMStore *store, Object &o)
{
  if (! reconstructObject (o))
    return false;

  // Book the monitor element matching the object's type flag.
  MonitorElement *obj = 0;
  store->setCurrentFolder(o.dirname);
  switch (o.flags & DQM_PROP_TYPE_MASK)
  {
  case DQM_PROP_TYPE_INT:
    obj = store->bookInt(o.objname);
    obj->Fill(atoll(o.scalar.c_str()));
    break;

  case DQM_PROP_TYPE_REAL:
    obj = store->bookFloat(name);
    obj->Fill(atof(o.scalar.c_str()));
    break;

  case DQM_PROP_TYPE_STRING:
    obj = store->bookString(name, o.scalar);
    break;

  case DQM_PROP_TYPE_TH1F:
    obj = store->book1D(name, dynamic_cast<TH1F *>(o.object));
    break;

  case DQM_PROP_TYPE_TH1S:
    obj = store->book1S(name, dynamic_cast<TH1S *>(o.object));
    break;

  case DQM_PROP_TYPE_TH1D:
    obj = store->book1DD(name, dynamic_cast<TH1D *>(o.object));
    break;

  case DQM_PROP_TYPE_TH1I:
    obj = store->book1I(name, dynamic_cast<TH1I *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2F:
    obj = store->book2D(name, dynamic_cast<TH2F *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2S:
    obj = store->book2S(name, dynamic_cast<TH2S *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2D:
    obj = store->book2DD(name, dynamic_cast<TH2D *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2I:
    obj = store->book2I(name, dynamic_cast<TH2I *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2Poly:
    obj = store->book2DPoly(name, dynamic_cast<TH2Poly *>(o.object));
    break;

  case DQM_PROP_TYPE_TH3F:
    obj = store->book3D(name, dynamic_cast<TH3F *>(o.object));
    break;

  case DQM_PROP_TYPE_TH3S:
    obj = store->book3S(name, dynamic_cast<TH3S *>(o.object));
    break;

  case DQM_PROP_TYPE_TH3D:
    obj = store->book3DD(name, dynamic_cast<TH3D *>(o.object));
    break;

  case DQM_PROP_TYPE_PROF:
    obj = store->bookProfile(name, dynamic_cast<TProfile *>(o.object));
    break;

  case DQM_PROP_TYPE_PROF2D:
    obj = store->bookProfile2D(name, dynamic_cast<TProfile2D *>(o.object));
    break;

  default:
    logme()
      << "ERROR: unexpected monitor element of type "
      << (o.flags & DQM_PROP_TYPE_MASK) << " called '"
      << o.dirname << '/' << o.objname << "'\n";
    return false;
  }

  // Copy tag and quality reports onto the booked element.
  if (obj)
  {
    obj->data_.tag = o.tag;
    obj->data_.qreports = o.qreports;
  }

  return true;
}
#endif
0373
0374
0375
0376 bool DQMNet::shouldStop() { return shutdown_; }
0377
0378
0379
0380 void DQMNet::releaseFromWait(Bucket *msg, WaitObject &w, Object *o) {
0381 if (o)
0382 sendObjectToPeer(msg, *o, true);
0383 else {
0384 uint32_t words[3];
0385 words[0] = sizeof(words) + w.name.size();
0386 words[1] = DQM_REPLY_NONE;
0387 words[2] = w.name.size();
0388
0389 msg->data.reserve(msg->data.size() + words[0]);
0390 copydata(msg, &words[0], sizeof(words));
0391 copydata(msg, &w.name[0], w.name.size());
0392 }
0393 }
0394
0395
0396
0397
0398 void DQMNet::sendObjectToPeer(Bucket *msg, Object &o, bool data) {
0399 uint32_t flags = o.flags & ~DQM_PROP_DEAD;
0400 DataBlob objdata;
0401
0402 if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
0403 objdata.insert(objdata.end(), &o.scalar[0], &o.scalar[0] + o.scalar.size());
0404 else if (data)
0405 objdata.insert(objdata.end(), &o.rawdata[0], &o.rawdata[0] + o.rawdata.size());
0406
0407 uint32_t words[9];
0408 uint32_t namelen = o.dirname.size() + o.objname.size() + 1;
0409 uint32_t datalen = objdata.size();
0410 uint32_t qlen = o.qdata.size();
0411
0412 if (o.dirname.empty())
0413 --namelen;
0414
0415 words[0] = 9 * sizeof(uint32_t) + namelen + datalen + qlen;
0416 words[1] = DQM_REPLY_OBJECT;
0417 words[2] = flags;
0418 words[3] = (o.version >> 0) & 0xffffffff;
0419 words[4] = (o.version >> 32) & 0xffffffff;
0420 words[5] = o.tag;
0421 words[6] = namelen;
0422 words[7] = datalen;
0423 words[8] = qlen;
0424
0425 msg->data.reserve(msg->data.size() + words[0]);
0426 copydata(msg, &words[0], 9 * sizeof(uint32_t));
0427 if (namelen) {
0428 copydata(msg, &(o.dirname)[0], o.dirname.size());
0429 if (!o.dirname.empty())
0430 copydata(msg, "/", 1);
0431 copydata(msg, &o.objname[0], o.objname.size());
0432 }
0433 if (datalen)
0434 copydata(msg, &objdata[0], datalen);
0435 if (qlen)
0436 copydata(msg, &o.qdata[0], qlen);
0437 }
0438
0439
0440
0441 bool DQMNet::onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len) {
0442
0443 uint32_t type;
0444 memcpy(&type, data + sizeof(uint32_t), sizeof(type));
0445 switch (type) {
0446 case DQM_MSG_UPDATE_ME: {
0447 if (len != 2 * sizeof(uint32_t)) {
0448 logme() << "ERROR: corrupt 'UPDATE_ME' message of length " << len << " from peer " << p->peeraddr << std::endl;
0449 return false;
0450 }
0451
0452 if (debug_)
0453 logme() << "DEBUG: received message 'UPDATE ME' from peer " << p->peeraddr << ", size " << len << std::endl;
0454
0455 p->update = true;
0456 }
0457 return true;
0458
0459 case DQM_MSG_LIST_OBJECTS: {
0460 if (debug_)
0461 logme() << "DEBUG: received message 'LIST OBJECTS' from peer " << p->peeraddr << ", size " << len << std::endl;
0462
0463
0464 sendObjectListToPeer(msg, true, false);
0465 }
0466 return true;
0467
0468 case DQM_MSG_GET_OBJECT: {
0469 if (debug_)
0470 logme() << "DEBUG: received message 'GET OBJECT' from peer " << p->peeraddr << ", size " << len << std::endl;
0471
0472 if (len < 3 * sizeof(uint32_t)) {
0473 logme() << "ERROR: corrupt 'GET IMAGE' message of length " << len << " from peer " << p->peeraddr << std::endl;
0474 return false;
0475 }
0476
0477 uint32_t namelen;
0478 memcpy(&namelen, data + 2 * sizeof(uint32_t), sizeof(namelen));
0479 if (len != 3 * sizeof(uint32_t) + namelen) {
0480 logme() << "ERROR: corrupt 'GET OBJECT' message of length " << len << " from peer " << p->peeraddr
0481 << ", expected length " << (3 * sizeof(uint32_t)) << " + " << namelen << std::endl;
0482 return false;
0483 }
0484
0485 std::string name((char *)data + 3 * sizeof(uint32_t), namelen);
0486 Peer *owner = nullptr;
0487 Object *o = findObject(nullptr, name, &owner);
0488 if (o) {
0489 o->lastreq = Time::current().ns();
0490 if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE)) &&
0491 (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
0492 waitForData(p, name, "", owner);
0493 else
0494 sendObjectToPeer(msg, *o, true);
0495 } else {
0496 uint32_t words[3];
0497 words[0] = sizeof(words) + name.size();
0498 words[1] = DQM_REPLY_NONE;
0499 words[2] = name.size();
0500
0501 msg->data.reserve(msg->data.size() + words[0]);
0502 copydata(msg, &words[0], sizeof(words));
0503 copydata(msg, &name[0], name.size());
0504 }
0505 }
0506 return true;
0507
0508 case DQM_REPLY_LIST_BEGIN: {
0509 if (len != 4 * sizeof(uint32_t)) {
0510 logme() << "ERROR: corrupt 'LIST BEGIN' message of length " << len << " from peer " << p->peeraddr << std::endl;
0511 return false;
0512 }
0513
0514
0515 uint32_t flags;
0516 memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
0517
0518 if (debug_)
0519 logme() << "DEBUG: received message 'LIST BEGIN " << (flags ? "FULL" : "INCREMENTAL") << "' from "
0520 << p->peeraddr << ", size " << len << std::endl;
0521
0522
0523
0524
0525
0526
0527
0528 if (flags)
0529 markObjectsDead(p);
0530 }
0531 return true;
0532
0533 case DQM_REPLY_LIST_END: {
0534 if (len != 4 * sizeof(uint32_t)) {
0535 logme() << "ERROR: corrupt 'LIST END' message of length " << len << " from peer " << p->peeraddr << std::endl;
0536 return false;
0537 }
0538
0539
0540 uint32_t flags;
0541 memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
0542
0543
0544
0545
0546
0547 if (flags)
0548 purgeDeadObjects(p);
0549
0550 if (debug_)
0551 logme() << "DEBUG: received message 'LIST END " << (flags ? "FULL" : "INCREMENTAL") << "' from " << p->peeraddr
0552 << ", size " << len << std::endl;
0553
0554
0555
0556 flush_ = true;
0557 p->updates++;
0558 }
0559 return true;
0560
0561 case DQM_REPLY_OBJECT: {
0562 uint32_t words[9];
0563 if (len < sizeof(words)) {
0564 logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr << std::endl;
0565 return false;
0566 }
0567
0568 memcpy(&words[0], data, sizeof(words));
0569 uint32_t &namelen = words[6];
0570 uint32_t &datalen = words[7];
0571 uint32_t &qlen = words[8];
0572
0573 if (len != sizeof(words) + namelen + datalen + qlen) {
0574 logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr
0575 << ", expected length " << sizeof(words) << " + " << namelen << " + " << datalen << " + " << qlen
0576 << std::endl;
0577 return false;
0578 }
0579
0580 unsigned char *namedata = data + sizeof(words);
0581 unsigned char *objdata = namedata + namelen;
0582 unsigned char *qdata = objdata + datalen;
0583 unsigned char *enddata = qdata + qlen;
0584 std::string name((char *)namedata, namelen);
0585 assert(enddata == data + len);
0586
0587 if (debug_)
0588 logme() << "DEBUG: received message 'OBJECT " << name << "' from " << p->peeraddr << ", size " << len
0589 << std::endl;
0590
0591
0592 p->source = true;
0593
0594
0595 Object *o = findObject(p, name);
0596 if (!o)
0597 o = makeObject(p, name);
0598
0599 o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
0600 o->tag = words[5];
0601 o->version = ((uint64_t)words[4] << 32 | words[3]);
0602 o->scalar.clear();
0603 o->qdata.clear();
0604 if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR) {
0605 o->rawdata.clear();
0606 o->scalar.insert(o->scalar.end(), objdata, qdata);
0607 } else if (datalen) {
0608 o->rawdata.clear();
0609 o->rawdata.insert(o->rawdata.end(), objdata, qdata);
0610 } else if (!o->rawdata.empty())
0611 o->flags |= DQM_PROP_STALE;
0612 o->qdata.insert(o->qdata.end(), qdata, enddata);
0613
0614
0615
0616 if (o->lastreq && !datalen && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
0617 requestObjectData(p, (namelen ? &name[0] : nullptr), namelen);
0618
0619
0620 if (datalen)
0621 releaseWaiters(name, o);
0622 }
0623 return true;
0624
0625 case DQM_REPLY_NONE: {
0626 uint32_t words[3];
0627 if (len < sizeof(words)) {
0628 logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr << std::endl;
0629 return false;
0630 }
0631
0632 memcpy(&words[0], data, sizeof(words));
0633 uint32_t &namelen = words[2];
0634
0635 if (len != sizeof(words) + namelen) {
0636 logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr
0637 << ", expected length " << sizeof(words) << " + " << namelen << std::endl;
0638 return false;
0639 }
0640
0641 unsigned char *namedata = data + sizeof(words);
0642 std::string name((char *)namedata, namelen);
0643
0644 if (debug_)
0645 logme() << "DEBUG: received message 'NONE " << name << "' from " << p->peeraddr << ", size " << len
0646 << std::endl;
0647
0648
0649 p->source = true;
0650
0651
0652 if (Object *o = findObject(p, name)) {
0653 o->flags |= DQM_PROP_DEAD;
0654 purgeDeadObjects(p);
0655 }
0656
0657
0658 releaseWaiters(name, nullptr);
0659 }
0660 return true;
0661
0662 default:
0663 logme() << "ERROR: unrecognised message of length " << len << " and type " << type << " from peer " << p->peeraddr
0664 << std::endl;
0665 return false;
0666 }
0667 }
0668
0669
0670
0671 bool DQMNet::onPeerData(IOSelectEvent *ev, Peer *p) {
0672 lock();
0673 assert(getPeer(dynamic_cast<Socket *>(ev->source)) == p);
0674
0675
0676
0677
0678
0679 if (ev->events & IOUrgent) {
0680 if (p->automatic) {
0681 logme() << "WARNING: connection to the DQM server at " << p->peeraddr
0682 << " lost (will attempt to reconnect in 15 seconds)\n";
0683 losePeer(nullptr, p, ev);
0684 } else
0685 losePeer("WARNING: lost peer connection ", p, ev);
0686
0687 unlock();
0688 return true;
0689 }
0690
0691
0692 if (ev->events & IOWrite) {
0693 while (Bucket *b = p->sendq) {
0694 IOSize len = b->data.size() - p->sendpos;
0695 const void *data = (len ? (const void *)&b->data[p->sendpos] : (const void *)&data);
0696 IOSize done;
0697
0698 try {
0699 done = (len ? ev->source->write(data, len) : 0);
0700 if (debug_ && len)
0701 logme() << "DEBUG: sent " << done << " bytes to peer " << p->peeraddr << std::endl;
0702 } catch (Error &e) {
0703 losePeer("WARNING: unable to write to peer ", p, ev, &e);
0704 unlock();
0705 return true;
0706 }
0707
0708 p->sendpos += done;
0709 if (p->sendpos == b->data.size()) {
0710 Bucket *old = p->sendq;
0711 p->sendq = old->next;
0712 p->sendpos = 0;
0713 old->next = nullptr;
0714 discard(old);
0715 }
0716
0717 if (!done && len)
0718
0719 break;
0720 }
0721 }
0722
0723
0724
0725 if (ev->events & IORead) {
0726
0727
0728
0729 IOSize sz;
0730 try {
0731 std::vector<unsigned char> buf(SOCKET_READ_SIZE);
0732 do
0733 if ((sz = ev->source->read(&buf[0], buf.size()))) {
0734 if (debug_)
0735 logme() << "DEBUG: received " << sz << " bytes from peer " << p->peeraddr << std::endl;
0736 DataBlob &data = p->incoming;
0737 if (data.capacity() < data.size() + sz)
0738 data.reserve(data.size() + SOCKET_READ_GROWTH);
0739 data.insert(data.end(), &buf[0], &buf[0] + sz);
0740 }
0741 while (sz == sizeof(buf));
0742 } catch (Error &e) {
0743 auto *next = dynamic_cast<SystemError *>(e.next());
0744 if (next && next->portable() == SysErr::ErrTryAgain)
0745 sz = 1;
0746 else {
0747
0748 losePeer("WARNING: failed to read from peer ", p, ev, &e);
0749 unlock();
0750 return true;
0751 }
0752 }
0753
0754
0755 size_t consumed = 0;
0756 DataBlob &data = p->incoming;
0757 while (data.size() - consumed >= sizeof(uint32_t) && p->waiting < MAX_PEER_WAITREQS) {
0758 uint32_t msglen;
0759 memcpy(&msglen, &data[0] + consumed, sizeof(msglen));
0760
0761 if (msglen >= MESSAGE_SIZE_LIMIT) {
0762 losePeer("WARNING: excessively large message from ", p, ev);
0763 unlock();
0764 return true;
0765 }
0766
0767 if (data.size() - consumed >= msglen) {
0768 bool valid = true;
0769 if (msglen < 2 * sizeof(uint32_t)) {
0770 logme() << "ERROR: corrupt peer message of length " << msglen << " from peer " << p->peeraddr << std::endl;
0771 valid = false;
0772 } else {
0773
0774 Bucket msg;
0775 msg.next = nullptr;
0776 valid = onMessage(&msg, p, &data[0] + consumed, msglen);
0777
0778
0779 if (!msg.data.empty()) {
0780 Bucket **prev = &p->sendq;
0781 while (*prev)
0782 prev = &(*prev)->next;
0783
0784 *prev = new Bucket;
0785 (*prev)->next = nullptr;
0786 (*prev)->data.swap(msg.data);
0787 }
0788 }
0789
0790 if (!valid) {
0791 losePeer("WARNING: data stream error with ", p, ev);
0792 unlock();
0793 return true;
0794 }
0795
0796 consumed += msglen;
0797 } else
0798 break;
0799 }
0800
0801 data.erase(data.begin(), data.begin() + consumed);
0802
0803
0804
0805
0806 if (sz == 0)
0807 sel_.setMask(p->socket, p->mask &= ~IORead);
0808 }
0809
0810
0811 unlock();
0812 return false;
0813 }
0814
0815
0816
0817
0818
0819 bool DQMNet::onPeerConnect(IOSelectEvent *ev) {
0820
0821 assert(ev->source == server_);
0822
0823
0824 Socket *s = server_->accept();
0825 assert(s);
0826 assert(!s->isBlocking());
0827
0828
0829 lock();
0830 Peer *p = createPeer(s);
0831 std::string localaddr;
0832 if (auto *inet = dynamic_cast<InetSocket *>(s)) {
0833 InetAddress peeraddr = inet->peername();
0834 InetAddress myaddr = inet->sockname();
0835 p->peeraddr = fmt::format("{}:{}", peeraddr.hostname(), peeraddr.port());
0836 localaddr = fmt::format("{}:{}", myaddr.hostname(), myaddr.port());
0837 } else if (auto *local = dynamic_cast<LocalSocket *>(s)) {
0838 p->peeraddr = local->peername().path();
0839 localaddr = local->sockname().path();
0840 } else
0841 assert(false);
0842
0843 p->mask = IORead | IOUrgent;
0844 p->socket = s;
0845
0846
0847 if (debug_)
0848 logme() << "INFO: new peer " << p->peeraddr << " is now connected to " << localaddr << std::endl;
0849
0850
0851 sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
0852 unlock();
0853
0854
0855 return false;
0856 }
0857
0858
0859
0860
0861
0862
0863
0864 bool DQMNet::onLocalNotify(IOSelectEvent *ev) {
0865
0866 try {
0867 unsigned char buf[1024];
0868 while ((ev->source->read(buf, sizeof(buf))))
0869 ;
0870 } catch (Error &e) {
0871 auto *next = dynamic_cast<SystemError *>(e.next());
0872 if (next && next->portable() == SysErr::ErrTryAgain)
0873 ;
0874 else
0875 logme() << "WARNING: error reading from notification pipe: " << e.explain() << std::endl;
0876 }
0877
0878
0879 flush_ = true;
0880
0881
0882 return false;
0883 }
0884
0885
0886
0887 void DQMNet::updateMask(Peer *p) {
0888 if (!p->socket)
0889 return;
0890
0891
0892 unsigned oldmask = p->mask;
0893 if (!p->sendq && (p->mask & IOWrite))
0894 sel_.setMask(p->socket, p->mask &= ~IOWrite);
0895
0896 if (p->sendq && !(p->mask & IOWrite))
0897 sel_.setMask(p->socket, p->mask |= IOWrite);
0898
0899 if (debug_ && oldmask != p->mask)
0900 logme() << "DEBUG: updating mask for " << p->peeraddr << " to " << p->mask << " from " << oldmask << std::endl;
0901
0902
0903
0904 if (p->mask == IOUrgent && !p->waiting) {
0905 assert(!p->sendq);
0906 if (debug_)
0907 logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
0908 losePeer(nullptr, p, nullptr);
0909 }
0910 }
0911
0912
0913 DQMNet::DQMNet(const std::string &appname )
0914 : debug_(false),
0915 appname_(appname.empty() ? "DQMNet" : appname.c_str()),
0916 pid_(getpid()),
0917 server_(nullptr),
0918 version_(Time::current()),
0919 communicate_((pthread_t)-1),
0920 shutdown_(0),
0921 delay_(1000),
0922 waitStale_(0, 0, 0, 0, 500000000 ),
0923 waitMax_(0, 0, 0, 5 , 0),
0924 flush_(false) {
0925
0926
0927
0928 fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
0929 sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
0930
0931
0932 upstream_.peer = downstream_.peer = nullptr;
0933 upstream_.next = downstream_.next = 0;
0934 upstream_.port = downstream_.port = 0;
0935 upstream_.update = downstream_.update = false;
0936 }
0937
0938 DQMNet::~DQMNet() {
0939
0940 }
0941
0942
0943
0944 void DQMNet::debug(bool doit) { debug_ = doit; }
0945
0946
0947
0948 void DQMNet::delay(int delay) { delay_ = delay; }
0949
0950
0951
0952
0953
0954 void DQMNet::staleObjectWaitLimit(lat::TimeSpan time) { waitStale_ = time; }
0955
0956
0957
0958
0959 void DQMNet::startLocalServer(int port) {
0960 if (server_) {
0961 logme() << "ERROR: DQM server was already started.\n";
0962 return;
0963 }
0964
0965 try {
0966 InetAddress addr("0.0.0.0", port);
0967 auto *s = new InetSocket(SOCK_STREAM, 0, addr.family());
0968 s->bind(addr);
0969 s->listen(10);
0970 s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
0971 s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
0972 s->setBlocking(false);
0973 sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
0974 } catch (Error &e) {
0975
0976
0977 logme() << "ERROR: Failed to start server at port " << port << ": " << e.explain() << std::endl;
0978
0979 throw cms::Exception("DQMNet::startLocalServer") << "Failed to start server at port " <<
0980
0981 port << ": " << e.explain().c_str();
0982 }
0983
0984 logme() << "INFO: DQM server started at port " << port << std::endl;
0985 }
0986
0987
0988
0989
0990 void DQMNet::startLocalServer(const char *path) {
0991 if (server_) {
0992 logme() << "ERROR: DQM server was already started.\n";
0993 return;
0994 }
0995
0996 try {
0997 server_ = new LocalServerSocket(path, 10);
0998 server_->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
0999 server_->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1000 server_->setBlocking(false);
1001 sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
1002 } catch (Error &e) {
1003
1004
1005 logme() << "ERROR: Failed to start server at path " << path << ": " << e.explain() << std::endl;
1006
1007 throw cms::Exception("DQMNet::startLocalServer")
1008 << "Failed to start server at path " << path << ": " << e.explain().c_str();
1009 }
1010
1011 logme() << "INFO: DQM server started at path " << path << std::endl;
1012 }
1013
1014
1015
1016
1017 void DQMNet::updateToCollector(const std::string &host, int port) {
1018 if (!downstream_.host.empty()) {
1019 logme() << "ERROR: Already updating another collector at " << downstream_.host << ":" << downstream_.port
1020 << std::endl;
1021 return;
1022 }
1023
1024 downstream_.update = true;
1025 downstream_.host = host;
1026 downstream_.port = port;
1027 }
1028
1029
1030
1031
1032 void DQMNet::listenToCollector(const std::string &host, int port) {
1033 if (!upstream_.host.empty()) {
1034 logme() << "ERROR: Already receiving data from another collector at " << upstream_.host << ":" << upstream_.port
1035 << std::endl;
1036 return;
1037 }
1038
1039 upstream_.update = false;
1040 upstream_.host = host;
1041 upstream_.port = port;
1042 }
1043
1044
1045 void DQMNet::shutdown() {
1046 shutdown_ = 1;
1047 if (communicate_ != (pthread_t)-1)
1048 pthread_join(communicate_, nullptr);
1049 }
1050
1051
1052
1053
1054
1055
1056 static void *communicate(void *obj) {
1057 sigset_t sigs;
1058 sigfillset(&sigs);
1059 pthread_sigmask(SIG_BLOCK, &sigs, nullptr);
1060 ((DQMNet *)obj)->run();
1061 return nullptr;
1062 }
1063
1064
1065 void DQMNet::lock() {
1066 if (communicate_ != (pthread_t)-1)
1067 pthread_mutex_lock(&lock_);
1068 }
1069
1070
1071 void DQMNet::unlock() {
1072 if (communicate_ != (pthread_t)-1)
1073 pthread_mutex_unlock(&lock_);
1074 }
1075
1076
1077
1078
1079 void DQMNet::start() {
1080 if (communicate_ != (pthread_t)-1) {
1081 logme() << "ERROR: DQM networking thread has already been started\n";
1082 return;
1083 }
1084
1085 pthread_mutex_init(&lock_, nullptr);
1086 pthread_create(&communicate_, nullptr, &communicate, this);
1087 }
1088
1089
1090 void DQMNet::run() {
1091 Time now;
1092 Time nextFlush = 0;
1093 AutoPeer *automatic[2] = {&upstream_, &downstream_};
1094
1095
1096 while (!shouldStop()) {
1097 for (auto ap : automatic) {
1098
1099
1100
1101 if (!ap->host.empty() && !ap->peer && (now = Time::current()) > ap->next) {
1102 ap->next = now + TimeSpan(0, 0, 0, 15 , 0);
1103 InetSocket *s = nullptr;
1104 try {
1105 InetAddress addr(ap->host.c_str(), ap->port);
1106 s = new InetSocket(SOCK_STREAM, 0, addr.family());
1107 s->setBlocking(false);
1108 s->connect(addr);
1109 s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1110 s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1111 } catch (Error &e) {
1112 auto *sys = dynamic_cast<SystemError *>(e.next());
1113 if (!sys || sys->portable() != SysErr::ErrOperationInProgress) {
1114
1115
1116
1117 if (s)
1118 s->abort();
1119 delete s;
1120 s = nullptr;
1121 }
1122 }
1123
1124
1125
1126 if (s) {
1127 Peer *p = createPeer(s);
1128 ap->peer = p;
1129
1130 InetAddress peeraddr = ((InetSocket *)s)->peername();
1131 InetAddress myaddr = ((InetSocket *)s)->sockname();
1132 p->peeraddr = fmt::format("{}:{}", peeraddr.hostname(), peeraddr.port());
1133 p->mask = IORead | IOWrite | IOUrgent;
1134 p->update = ap->update;
1135 p->automatic = ap;
1136 p->socket = s;
1137 sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
1138 if (ap == &upstream_) {
1139 uint32_t words[4] = {2 * sizeof(uint32_t), DQM_MSG_LIST_OBJECTS, 2 * sizeof(uint32_t), DQM_MSG_UPDATE_ME};
1140 p->sendq = new Bucket;
1141 p->sendq->next = nullptr;
1142 copydata(p->sendq, words, sizeof(words));
1143 }
1144
1145
1146 if (debug_)
1147 logme() << "INFO: now connected to " << p->peeraddr << " from " << myaddr.hostname() << ":" << myaddr.port()
1148 << std::endl;
1149 }
1150 }
1151 }
1152
1153
1154 sel_.dispatch(delay_);
1155 now = Time::current();
1156 lock();
1157
1158
1159
1160 if (flush_ && now > nextFlush) {
1161 flush_ = false;
1162 nextFlush = now + TimeSpan(0, 0, 0, 15 , 0);
1163 sendObjectListToPeers(true);
1164 }
1165
1166
1167
1168
1169
1170
1171 updatePeerMasks();
1172
1173
1174 Time waitold = now - waitMax_;
1175 Time waitstale = now - waitStale_;
1176 for (auto i = waiting_.begin(), e = waiting_.end(); i != e;) {
1177 Object *o = findObject(nullptr, i->name);
1178
1179
1180
1181 if (i->time < waitold) {
1182 logme() << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9) << "s to retrieval, releasing '"
1183 << i->name << "' from wait, have " << (o ? o->rawdata.size() : 0) << " data available\n";
1184 releaseFromWait(i++, o);
1185 } else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE)) {
1186 logme() << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9) << "s to update, releasing '"
1187 << i->name << "' from wait, have " << o->rawdata.size() << " data available\n";
1188 releaseFromWait(i++, o);
1189 }
1190
1191
1192 else
1193 ++i;
1194 }
1195
1196 unlock();
1197 }
1198 }
1199
1200
1201
1202 void DQMNet::sendLocalChanges() {
1203 char byte = 0;
1204 wakeup_.sink()->write(&byte, 1);
1205 }
1206
1207
1208
1209
1210 DQMBasicNet::DQMBasicNet(const std::string &appname ) : DQMImplNet<DQMNet::Object>(appname) {
1211 local_ = static_cast<ImplPeer *>(createPeer((Socket *)-1));
1212 }
1213
1214
1215 void DQMBasicNet::reserveLocalSpace(uint32_t size) { local_->objs.reserve(size); }
1216
1217
1218
1219 void DQMBasicNet::updateLocalObject(Object &o) {
1220 o.dirname = *local_->dirs.insert(o.dirname).first;
1221 std::pair<ObjectMap::iterator, bool> info(local_->objs.insert(o));
1222 if (!info.second) {
1223
1224
1225
1226 auto &old = const_cast<Object &>(*info.first);
1227 std::swap(old.flags, o.flags);
1228 std::swap(old.tag, o.tag);
1229 std::swap(old.version, o.version);
1230 std::swap(old.qreports, o.qreports);
1231 std::swap(old.rawdata, o.rawdata);
1232 std::swap(old.scalar, o.scalar);
1233 std::swap(old.qdata, o.qdata);
1234 }
1235 }
1236
1237
1238
1239
1240 bool DQMBasicNet::removeLocalExcept(const std::set<std::string> &known) {
1241 size_t removed = 0;
1242 std::string path;
1243 ObjectMap::iterator i, e;
1244 for (i = local_->objs.begin(), e = local_->objs.end(); i != e;) {
1245 path.clear();
1246 path.reserve(i->dirname.size() + i->objname.size() + 2);
1247 path += i->dirname;
1248 if (!path.empty())
1249 path += '/';
1250 path += i->objname;
1251
1252 if (!known.count(path))
1253 ++removed, local_->objs.erase(i++);
1254 else
1255 ++i;
1256 }
1257
1258 return removed > 0;
1259 }