File indexing completed on 2024-04-06 12:10:10
0001 #include "DQMServices/Core/interface/DQMNet.h"
0002 #include "classlib/iobase/InetServerSocket.h"
0003 #include "classlib/iobase/LocalServerSocket.h"
0004 #include "classlib/sysapi/InetSocket.h" // for completing InetAddress
0005 #include "classlib/utils/SystemError.h"
0006 #include "classlib/utils/Regexp.h"
0007 #include <fmt/format.h>
0008 #include <unistd.h>
0009 #include <fcntl.h>
0010 #include <sys/wait.h>
0011 #include <cstdio>
0012 #include <cstdint>
0013 #include <iostream>
0014 #include <sstream>
0015 #include <cassert>
0016 #include <cfloat>
0017 #include <cinttypes>
0018
0019 #include "FWCore/Utilities/interface/EDMException.h"
0020
// Transport limits.  Apple platforms use 1 MiB, everything else 8 MiB.
#if __APPLE__
// Largest single protocol message accepted from a peer (checked in onPeerData()).
#define MESSAGE_SIZE_LIMIT (1 * 1024 * 1024)
// Requested kernel socket buffer size for SO_SNDBUF / SO_RCVBUF.
#define SOCKET_BUF_SIZE (1 * 1024 * 1024)
#else
#define MESSAGE_SIZE_LIMIT (8 * 1024 * 1024)
#define SOCKET_BUF_SIZE (8 * 1024 * 1024)
#endif
// Size of the scratch buffer used for each socket read in onPeerData().
#define SOCKET_READ_SIZE (SOCKET_BUF_SIZE / 8)
// Amount by which a peer's incoming buffer capacity is grown when it runs out of room.
#define SOCKET_READ_GROWTH (SOCKET_BUF_SIZE)

using namespace lat;

// Regexp of the form "<name>X=value</name>" with X one of i, f, s, qr.
// NOTE(review): not referenced anywhere in this file -- confirm whether it is still needed.
static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>");
0034
0035
namespace dqm {
  namespace qstatus {
    // Quality-test status codes as decoded by DQMNet::unpackQualityData():
    // STATUS_OK sets no report flag, WARNING sets DQM_PROP_REPORT_WARN,
    // ERROR sets DQM_PROP_REPORT_ERROR; any other code sets DQM_PROP_REPORT_OTHER.
    static constexpr int STATUS_OK = 100;
    static constexpr int WARNING = 200;
    static constexpr int ERROR = 300;
  }  // namespace qstatus
}  // namespace dqm
0043
0044
0045
0046 std::ostream &DQMNet::logme() {
0047 Time now = Time::current();
0048 return std::cout << now.format(true, "%Y-%m-%d %H:%M:%S.") << now.nanoformat(3, 3) << " " << appname_ << "[" << pid_
0049 << "]: ";
0050 }
0051
0052
0053 void DQMNet::copydata(Bucket *b, const void *data, size_t len) {
0054 b->data.insert(b->data.end(), (const unsigned char *)data, (const unsigned char *)data + len);
0055 }
0056
0057
0058 void DQMNet::discard(Bucket *&b) {
0059 while (b) {
0060 Bucket *next = b->next;
0061 delete b;
0062 b = next;
0063 }
0064 }
0065
0066
0067
0068
0069
0070 void DQMNet::losePeer(const char *reason, Peer *peer, IOSelectEvent *ev, Error *err) {
0071 if (reason)
0072 logme() << reason << peer->peeraddr << (err ? "; error was: " + err->explain() : std::string("")) << std::endl;
0073
0074 Socket *s = peer->socket;
0075
0076 for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
0077 if (i->peer == peer)
0078 waiting_.erase(i++);
0079 else
0080 ++i;
0081
0082 if (ev)
0083 ev->source = nullptr;
0084
0085 discard(peer->sendq);
0086 if (peer->automatic)
0087 peer->automatic->peer = nullptr;
0088
0089 sel_.detach(s);
0090 s->close();
0091 removePeer(peer, s);
0092 delete s;
0093 }
0094
0095
0096 void DQMNet::requestObjectData(Peer *p, const char *name, size_t len) {
0097
0098 Bucket **msg = &p->sendq;
0099 while (*msg)
0100 msg = &(*msg)->next;
0101 *msg = new Bucket;
0102 (*msg)->next = nullptr;
0103
0104 uint32_t words[3];
0105 words[0] = sizeof(words) + len;
0106 words[1] = DQM_MSG_GET_OBJECT;
0107 words[2] = len;
0108 copydata(*msg, words, sizeof(words));
0109 copydata(*msg, name, len);
0110 }
0111
0112
0113
0114 void DQMNet::waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner) {
0115
0116
0117
0118
0119
0120 requestObjectData(owner, !name.empty() ? &name[0] : nullptr, name.size());
0121 WaitObject wo = {Time::current(), name, info, p};
0122 waiting_.push_back(wo);
0123 p->waiting++;
0124 }
0125
0126
0127
0128 void DQMNet::releaseFromWait(WaitList::iterator i, Object *o) {
0129 Bucket **msg = &i->peer->sendq;
0130 while (*msg)
0131 msg = &(*msg)->next;
0132 *msg = new Bucket;
0133 (*msg)->next = nullptr;
0134
0135 releaseFromWait(*msg, *i, o);
0136
0137 assert(i->peer->waiting > 0);
0138 i->peer->waiting--;
0139 waiting_.erase(i);
0140 }
0141
0142
0143 void DQMNet::releaseWaiters(const std::string &name, Object *o) {
0144 for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
0145 if (i->name == name)
0146 releaseFromWait(i++, o);
0147 else
0148 ++i;
0149 }
0150
0151
0152
0153
0154 void DQMNet::packQualityData(std::string &into, const QReports &qr) {
0155 char buf[64];
0156 std::ostringstream qrs;
0157 QReports::const_iterator qi, qe;
0158 for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi) {
0159 int pos = 0;
0160 sprintf(buf, "%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG + 2, qi->qtresult);
0161 qrs << buf << '\0' << buf + pos << '\0' << qi->qtname << '\0' << qi->algorithm << '\0' << qi->message << '\0'
0162 << '\0';
0163 }
0164 into = qrs.str();
0165 }
0166
0167
0168
0169 void DQMNet::unpackQualityData(QReports &qr, uint32_t &flags, const char *from) {
0170 const char *qdata = from;
0171
0172
0173 size_t nqv = 0;
0174 while (*qdata) {
0175 ++nqv;
0176 while (*qdata)
0177 ++qdata;
0178 ++qdata;
0179 while (*qdata)
0180 ++qdata;
0181 ++qdata;
0182 while (*qdata)
0183 ++qdata;
0184 ++qdata;
0185 while (*qdata)
0186 ++qdata;
0187 ++qdata;
0188 while (*qdata)
0189 ++qdata;
0190 ++qdata;
0191 }
0192
0193
0194 qdata = from;
0195 qr.reserve(nqv);
0196 while (*qdata) {
0197 qr.emplace_back();
0198 DQMNet::QValue &qv = qr.back();
0199
0200 qv.code = atoi(qdata);
0201 while (*qdata)
0202 ++qdata;
0203 switch (qv.code) {
0204 case dqm::qstatus::STATUS_OK:
0205 break;
0206 case dqm::qstatus::WARNING:
0207 flags |= DQMNet::DQM_PROP_REPORT_WARN;
0208 break;
0209 case dqm::qstatus::ERROR:
0210 flags |= DQMNet::DQM_PROP_REPORT_ERROR;
0211 break;
0212 default:
0213 flags |= DQMNet::DQM_PROP_REPORT_OTHER;
0214 break;
0215 }
0216
0217 qv.qtresult = atof(++qdata);
0218 while (*qdata)
0219 ++qdata;
0220
0221 qv.qtname = ++qdata;
0222 while (*qdata)
0223 ++qdata;
0224
0225 qv.algorithm = ++qdata;
0226 while (*qdata)
0227 ++qdata;
0228
0229 qv.message = ++qdata;
0230 while (*qdata)
0231 ++qdata;
0232 ++qdata;
0233 }
0234 }
0235
#if 0
// Disabled by the preprocessor; kept for reference only.  It relies on
// TBufferFile / TClass / TObject, which this file does not include directly.

// Read the next streamed object from `buf`.  Returns 0 when the buffer is
// exhausted or no class could be read.
static TObject *
extractNextObject(TBufferFile &buf)
{
  if (buf.Length() == buf.BufferSize())
    return 0;

  buf.InitMap();
  Int_t pos = buf.Length();
  TClass *c = buf.ReadClass();
  buf.SetBufferOffset(pos);
  buf.ResetMap();
  return c ? buf.ReadObject(c) : 0;
}


// Rebuild o.object (and, if present, o.reference) from o.rawdata, then decode
// o.qdata into o.qreports.  Returns false if no object could be read.
bool
DQMNet::reconstructObject(Object &o)
{
  TBufferFile buf(TBufferFile::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
  buf.Reset();

  // The first streamed object is the monitor element itself.
  if (! (o.object = extractNextObject(buf)))
    return false;

  // An optional second object is the reference.
  o.reference = extractNextObject(buf);

  // Quality reports travel separately in o.qdata.
  unpackQualityData(o.qreports, o.flags, o.qdata.c_str());
  return true;
}
#endif

#if 0
// Disabled by the preprocessor.  Books a monitor element in `store` matching
// the type bits of o.flags and copies o's tag and quality reports into it.
// NOTE(review): most cases below use an identifier `name` that is not declared
// in this function (only the INT case uses o.objname); this would not compile
// if re-enabled.
bool
DQMNet::reinstateObject(DQMStore *store, Object &o)
{
  if (! reconstructObject (o))
    return false;


  MonitorElement *obj = 0;
  store->setCurrentFolder(o.dirname);
  switch (o.flags & DQM_PROP_TYPE_MASK)
  {
  case DQM_PROP_TYPE_INT:
    obj = store->bookInt(o.objname);
    obj->Fill(atoll(o.scalar.c_str()));
    break;

  case DQM_PROP_TYPE_REAL:
    obj = store->bookFloat(name);
    obj->Fill(atof(o.scalar.c_str()));
    break;

  case DQM_PROP_TYPE_STRING:
    obj = store->bookString(name, o.scalar);
    break;

  case DQM_PROP_TYPE_TH1F:
    obj = store->book1D(name, dynamic_cast<TH1F *>(o.object));
    break;

  case DQM_PROP_TYPE_TH1S:
    obj = store->book1S(name, dynamic_cast<TH1S *>(o.object));
    break;

  case DQM_PROP_TYPE_TH1D:
    obj = store->book1DD(name, dynamic_cast<TH1D *>(o.object));
    break;

  case DQM_PROP_TYPE_TH1I:
    obj = store->book1I(name, dynamic_cast<TH1I *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2F:
    obj = store->book2D(name, dynamic_cast<TH2F *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2S:
    obj = store->book2S(name, dynamic_cast<TH2S *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2D:
    obj = store->book2DD(name, dynamic_cast<TH2D *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2I:
    obj = store->book2I(name, dynamic_cast<TH2I *>(o.object));
    break;

  case DQM_PROP_TYPE_TH3F:
    obj = store->book3D(name, dynamic_cast<TH3F *>(o.object));
    break;

  case DQM_PROP_TYPE_TH3S:
    obj = store->book3S(name, dynamic_cast<TH3S *>(o.object));
    break;

  case DQM_PROP_TYPE_TH3D:
    obj = store->book3DD(name, dynamic_cast<TH3D *>(o.object));
    break;

  case DQM_PROP_TYPE_PROF:
    obj = store->bookProfile(name, dynamic_cast<TProfile *>(o.object));
    break;

  case DQM_PROP_TYPE_PROF2D:
    obj = store->bookProfile2D(name, dynamic_cast<TProfile2D *>(o.object));
    break;

  default:
    logme()
      << "ERROR: unexpected monitor element of type "
      << (o.flags & DQM_PROP_TYPE_MASK) << " called '"
      << o.dirname << '/' << o.objname << "'\n";
    return false;
  }

  // Carry over the tag and quality reports to the booked element.
  if (obj)
  {
    obj->data_.tag = o.tag;
    obj->data_.qreports = o.qreports;
  }


  return true;
}
#endif
0369
0370
0371
0372 bool DQMNet::shouldStop() { return shutdown_; }
0373
0374
0375
0376 void DQMNet::releaseFromWait(Bucket *msg, WaitObject &w, Object *o) {
0377 if (o)
0378 sendObjectToPeer(msg, *o, true);
0379 else {
0380 uint32_t words[3];
0381 words[0] = sizeof(words) + w.name.size();
0382 words[1] = DQM_REPLY_NONE;
0383 words[2] = w.name.size();
0384
0385 msg->data.reserve(msg->data.size() + words[0]);
0386 copydata(msg, &words[0], sizeof(words));
0387 copydata(msg, &w.name[0], w.name.size());
0388 }
0389 }
0390
0391
0392
0393
0394 void DQMNet::sendObjectToPeer(Bucket *msg, Object &o, bool data) {
0395 uint32_t flags = o.flags & ~DQM_PROP_DEAD;
0396 DataBlob objdata;
0397
0398 if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
0399 objdata.insert(objdata.end(), &o.scalar[0], &o.scalar[0] + o.scalar.size());
0400 else if (data)
0401 objdata.insert(objdata.end(), &o.rawdata[0], &o.rawdata[0] + o.rawdata.size());
0402
0403 uint32_t words[9];
0404 uint32_t namelen = o.dirname.size() + o.objname.size() + 1;
0405 uint32_t datalen = objdata.size();
0406 uint32_t qlen = o.qdata.size();
0407
0408 if (o.dirname.empty())
0409 --namelen;
0410
0411 words[0] = 9 * sizeof(uint32_t) + namelen + datalen + qlen;
0412 words[1] = DQM_REPLY_OBJECT;
0413 words[2] = flags;
0414 words[3] = (o.version >> 0) & 0xffffffff;
0415 words[4] = (o.version >> 32) & 0xffffffff;
0416 words[5] = o.tag;
0417 words[6] = namelen;
0418 words[7] = datalen;
0419 words[8] = qlen;
0420
0421 msg->data.reserve(msg->data.size() + words[0]);
0422 copydata(msg, &words[0], 9 * sizeof(uint32_t));
0423 if (namelen) {
0424 copydata(msg, &(o.dirname)[0], o.dirname.size());
0425 if (!o.dirname.empty())
0426 copydata(msg, "/", 1);
0427 copydata(msg, &o.objname[0], o.objname.size());
0428 }
0429 if (datalen)
0430 copydata(msg, &objdata[0], datalen);
0431 if (qlen)
0432 copydata(msg, &o.qdata[0], qlen);
0433 }
0434
0435
0436
0437 bool DQMNet::onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len) {
0438
0439 uint32_t type;
0440 memcpy(&type, data + sizeof(uint32_t), sizeof(type));
0441 switch (type) {
0442 case DQM_MSG_UPDATE_ME: {
0443 if (len != 2 * sizeof(uint32_t)) {
0444 logme() << "ERROR: corrupt 'UPDATE_ME' message of length " << len << " from peer " << p->peeraddr << std::endl;
0445 return false;
0446 }
0447
0448 if (debug_)
0449 logme() << "DEBUG: received message 'UPDATE ME' from peer " << p->peeraddr << ", size " << len << std::endl;
0450
0451 p->update = true;
0452 }
0453 return true;
0454
0455 case DQM_MSG_LIST_OBJECTS: {
0456 if (debug_)
0457 logme() << "DEBUG: received message 'LIST OBJECTS' from peer " << p->peeraddr << ", size " << len << std::endl;
0458
0459
0460 sendObjectListToPeer(msg, true, false);
0461 }
0462 return true;
0463
0464 case DQM_MSG_GET_OBJECT: {
0465 if (debug_)
0466 logme() << "DEBUG: received message 'GET OBJECT' from peer " << p->peeraddr << ", size " << len << std::endl;
0467
0468 if (len < 3 * sizeof(uint32_t)) {
0469 logme() << "ERROR: corrupt 'GET IMAGE' message of length " << len << " from peer " << p->peeraddr << std::endl;
0470 return false;
0471 }
0472
0473 uint32_t namelen;
0474 memcpy(&namelen, data + 2 * sizeof(uint32_t), sizeof(namelen));
0475 if (len != 3 * sizeof(uint32_t) + namelen) {
0476 logme() << "ERROR: corrupt 'GET OBJECT' message of length " << len << " from peer " << p->peeraddr
0477 << ", expected length " << (3 * sizeof(uint32_t)) << " + " << namelen << std::endl;
0478 return false;
0479 }
0480
0481 std::string name((char *)data + 3 * sizeof(uint32_t), namelen);
0482 Peer *owner = nullptr;
0483 Object *o = findObject(nullptr, name, &owner);
0484 if (o) {
0485 o->lastreq = Time::current().ns();
0486 if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE)) &&
0487 (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
0488 waitForData(p, name, "", owner);
0489 else
0490 sendObjectToPeer(msg, *o, true);
0491 } else {
0492 uint32_t words[3];
0493 words[0] = sizeof(words) + name.size();
0494 words[1] = DQM_REPLY_NONE;
0495 words[2] = name.size();
0496
0497 msg->data.reserve(msg->data.size() + words[0]);
0498 copydata(msg, &words[0], sizeof(words));
0499 copydata(msg, &name[0], name.size());
0500 }
0501 }
0502 return true;
0503
0504 case DQM_REPLY_LIST_BEGIN: {
0505 if (len != 4 * sizeof(uint32_t)) {
0506 logme() << "ERROR: corrupt 'LIST BEGIN' message of length " << len << " from peer " << p->peeraddr << std::endl;
0507 return false;
0508 }
0509
0510
0511 uint32_t flags;
0512 memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
0513
0514 if (debug_)
0515 logme() << "DEBUG: received message 'LIST BEGIN " << (flags ? "FULL" : "INCREMENTAL") << "' from "
0516 << p->peeraddr << ", size " << len << std::endl;
0517
0518
0519
0520
0521
0522
0523
0524 if (flags)
0525 markObjectsDead(p);
0526 }
0527 return true;
0528
0529 case DQM_REPLY_LIST_END: {
0530 if (len != 4 * sizeof(uint32_t)) {
0531 logme() << "ERROR: corrupt 'LIST END' message of length " << len << " from peer " << p->peeraddr << std::endl;
0532 return false;
0533 }
0534
0535
0536 uint32_t flags;
0537 memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
0538
0539
0540
0541
0542
0543 if (flags)
0544 purgeDeadObjects(p);
0545
0546 if (debug_)
0547 logme() << "DEBUG: received message 'LIST END " << (flags ? "FULL" : "INCREMENTAL") << "' from " << p->peeraddr
0548 << ", size " << len << std::endl;
0549
0550
0551
0552 flush_ = true;
0553 p->updates++;
0554 }
0555 return true;
0556
0557 case DQM_REPLY_OBJECT: {
0558 uint32_t words[9];
0559 if (len < sizeof(words)) {
0560 logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr << std::endl;
0561 return false;
0562 }
0563
0564 memcpy(&words[0], data, sizeof(words));
0565 uint32_t &namelen = words[6];
0566 uint32_t &datalen = words[7];
0567 uint32_t &qlen = words[8];
0568
0569 if (len != sizeof(words) + namelen + datalen + qlen) {
0570 logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr
0571 << ", expected length " << sizeof(words) << " + " << namelen << " + " << datalen << " + " << qlen
0572 << std::endl;
0573 return false;
0574 }
0575
0576 unsigned char *namedata = data + sizeof(words);
0577 unsigned char *objdata = namedata + namelen;
0578 unsigned char *qdata = objdata + datalen;
0579 unsigned char *enddata = qdata + qlen;
0580 std::string name((char *)namedata, namelen);
0581 assert(enddata == data + len);
0582
0583 if (debug_)
0584 logme() << "DEBUG: received message 'OBJECT " << name << "' from " << p->peeraddr << ", size " << len
0585 << std::endl;
0586
0587
0588 p->source = true;
0589
0590
0591 Object *o = findObject(p, name);
0592 if (!o)
0593 o = makeObject(p, name);
0594
0595 o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
0596 o->tag = words[5];
0597 o->version = ((uint64_t)words[4] << 32 | words[3]);
0598 o->scalar.clear();
0599 o->qdata.clear();
0600 if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR) {
0601 o->rawdata.clear();
0602 o->scalar.insert(o->scalar.end(), objdata, qdata);
0603 } else if (datalen) {
0604 o->rawdata.clear();
0605 o->rawdata.insert(o->rawdata.end(), objdata, qdata);
0606 } else if (!o->rawdata.empty())
0607 o->flags |= DQM_PROP_STALE;
0608 o->qdata.insert(o->qdata.end(), qdata, enddata);
0609
0610
0611
0612 if (o->lastreq && !datalen && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
0613 requestObjectData(p, (namelen ? &name[0] : nullptr), namelen);
0614
0615
0616 if (datalen)
0617 releaseWaiters(name, o);
0618 }
0619 return true;
0620
0621 case DQM_REPLY_NONE: {
0622 uint32_t words[3];
0623 if (len < sizeof(words)) {
0624 logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr << std::endl;
0625 return false;
0626 }
0627
0628 memcpy(&words[0], data, sizeof(words));
0629 uint32_t &namelen = words[2];
0630
0631 if (len != sizeof(words) + namelen) {
0632 logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr
0633 << ", expected length " << sizeof(words) << " + " << namelen << std::endl;
0634 return false;
0635 }
0636
0637 unsigned char *namedata = data + sizeof(words);
0638 std::string name((char *)namedata, namelen);
0639
0640 if (debug_)
0641 logme() << "DEBUG: received message 'NONE " << name << "' from " << p->peeraddr << ", size " << len
0642 << std::endl;
0643
0644
0645 p->source = true;
0646
0647
0648 if (Object *o = findObject(p, name)) {
0649 o->flags |= DQM_PROP_DEAD;
0650 purgeDeadObjects(p);
0651 }
0652
0653
0654 releaseWaiters(name, nullptr);
0655 }
0656 return true;
0657
0658 default:
0659 logme() << "ERROR: unrecognised message of length " << len << " and type " << type << " from peer " << p->peeraddr
0660 << std::endl;
0661 return false;
0662 }
0663 }
0664
0665
0666
0667 bool DQMNet::onPeerData(IOSelectEvent *ev, Peer *p) {
0668 lock();
0669 assert(getPeer(dynamic_cast<Socket *>(ev->source)) == p);
0670
0671
0672
0673
0674
0675 if (ev->events & IOUrgent) {
0676 if (p->automatic) {
0677 logme() << "WARNING: connection to the DQM server at " << p->peeraddr
0678 << " lost (will attempt to reconnect in 15 seconds)\n";
0679 losePeer(nullptr, p, ev);
0680 } else
0681 losePeer("WARNING: lost peer connection ", p, ev);
0682
0683 unlock();
0684 return true;
0685 }
0686
0687
0688 if (ev->events & IOWrite) {
0689 while (Bucket *b = p->sendq) {
0690 IOSize len = b->data.size() - p->sendpos;
0691 const void *data = (len ? (const void *)&b->data[p->sendpos] : (const void *)&data);
0692 IOSize done;
0693
0694 try {
0695 done = (len ? ev->source->write(data, len) : 0);
0696 if (debug_ && len)
0697 logme() << "DEBUG: sent " << done << " bytes to peer " << p->peeraddr << std::endl;
0698 } catch (Error &e) {
0699 losePeer("WARNING: unable to write to peer ", p, ev, &e);
0700 unlock();
0701 return true;
0702 }
0703
0704 p->sendpos += done;
0705 if (p->sendpos == b->data.size()) {
0706 Bucket *old = p->sendq;
0707 p->sendq = old->next;
0708 p->sendpos = 0;
0709 old->next = nullptr;
0710 discard(old);
0711 }
0712
0713 if (!done && len)
0714
0715 break;
0716 }
0717 }
0718
0719
0720
0721 if (ev->events & IORead) {
0722
0723
0724
0725 IOSize sz;
0726 try {
0727 std::vector<unsigned char> buf(SOCKET_READ_SIZE);
0728 do
0729 if ((sz = ev->source->read(&buf[0], buf.size()))) {
0730 if (debug_)
0731 logme() << "DEBUG: received " << sz << " bytes from peer " << p->peeraddr << std::endl;
0732 DataBlob &data = p->incoming;
0733 if (data.capacity() < data.size() + sz)
0734 data.reserve(data.size() + SOCKET_READ_GROWTH);
0735 data.insert(data.end(), &buf[0], &buf[0] + sz);
0736 }
0737 while (sz == sizeof(buf));
0738 } catch (Error &e) {
0739 auto *next = dynamic_cast<SystemError *>(e.next());
0740 if (next && next->portable() == SysErr::ErrTryAgain)
0741 sz = 1;
0742 else {
0743
0744 losePeer("WARNING: failed to read from peer ", p, ev, &e);
0745 unlock();
0746 return true;
0747 }
0748 }
0749
0750
0751 size_t consumed = 0;
0752 DataBlob &data = p->incoming;
0753 while (data.size() - consumed >= sizeof(uint32_t) && p->waiting < MAX_PEER_WAITREQS) {
0754 uint32_t msglen;
0755 memcpy(&msglen, &data[0] + consumed, sizeof(msglen));
0756
0757 if (msglen >= MESSAGE_SIZE_LIMIT) {
0758 losePeer("WARNING: excessively large message from ", p, ev);
0759 unlock();
0760 return true;
0761 }
0762
0763 if (data.size() - consumed >= msglen) {
0764 bool valid = true;
0765 if (msglen < 2 * sizeof(uint32_t)) {
0766 logme() << "ERROR: corrupt peer message of length " << msglen << " from peer " << p->peeraddr << std::endl;
0767 valid = false;
0768 } else {
0769
0770 Bucket msg;
0771 msg.next = nullptr;
0772 valid = onMessage(&msg, p, &data[0] + consumed, msglen);
0773
0774
0775 if (!msg.data.empty()) {
0776 Bucket **prev = &p->sendq;
0777 while (*prev)
0778 prev = &(*prev)->next;
0779
0780 *prev = new Bucket;
0781 (*prev)->next = nullptr;
0782 (*prev)->data.swap(msg.data);
0783 }
0784 }
0785
0786 if (!valid) {
0787 losePeer("WARNING: data stream error with ", p, ev);
0788 unlock();
0789 return true;
0790 }
0791
0792 consumed += msglen;
0793 } else
0794 break;
0795 }
0796
0797 data.erase(data.begin(), data.begin() + consumed);
0798
0799
0800
0801
0802 if (sz == 0)
0803 sel_.setMask(p->socket, p->mask &= ~IORead);
0804 }
0805
0806
0807 unlock();
0808 return false;
0809 }
0810
0811
0812
0813
0814
0815 bool DQMNet::onPeerConnect(IOSelectEvent *ev) {
0816
0817 assert(ev->source == server_);
0818
0819
0820 Socket *s = server_->accept();
0821 assert(s);
0822 assert(!s->isBlocking());
0823
0824
0825 lock();
0826 Peer *p = createPeer(s);
0827 std::string localaddr;
0828 if (auto *inet = dynamic_cast<InetSocket *>(s)) {
0829 InetAddress peeraddr = inet->peername();
0830 InetAddress myaddr = inet->sockname();
0831 p->peeraddr = fmt::format("{}:{}", peeraddr.hostname(), peeraddr.port());
0832 localaddr = fmt::format("{}:{}", myaddr.hostname(), myaddr.port());
0833 } else if (auto *local = dynamic_cast<LocalSocket *>(s)) {
0834 p->peeraddr = local->peername().path();
0835 localaddr = local->sockname().path();
0836 } else
0837 assert(false);
0838
0839 p->mask = IORead | IOUrgent;
0840 p->socket = s;
0841
0842
0843 if (debug_)
0844 logme() << "INFO: new peer " << p->peeraddr << " is now connected to " << localaddr << std::endl;
0845
0846
0847 sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
0848 unlock();
0849
0850
0851 return false;
0852 }
0853
0854
0855
0856
0857
0858
0859
0860 bool DQMNet::onLocalNotify(IOSelectEvent *ev) {
0861
0862 try {
0863 IOSize sz;
0864 unsigned char buf[1024];
0865 while ((sz = ev->source->read(buf, sizeof(buf))))
0866 ;
0867 } catch (Error &e) {
0868 auto *next = dynamic_cast<SystemError *>(e.next());
0869 if (next && next->portable() == SysErr::ErrTryAgain)
0870 ;
0871 else
0872 logme() << "WARNING: error reading from notification pipe: " << e.explain() << std::endl;
0873 }
0874
0875
0876 flush_ = true;
0877
0878
0879 return false;
0880 }
0881
0882
0883
0884 void DQMNet::updateMask(Peer *p) {
0885 if (!p->socket)
0886 return;
0887
0888
0889 unsigned oldmask = p->mask;
0890 if (!p->sendq && (p->mask & IOWrite))
0891 sel_.setMask(p->socket, p->mask &= ~IOWrite);
0892
0893 if (p->sendq && !(p->mask & IOWrite))
0894 sel_.setMask(p->socket, p->mask |= IOWrite);
0895
0896 if (debug_ && oldmask != p->mask)
0897 logme() << "DEBUG: updating mask for " << p->peeraddr << " to " << p->mask << " from " << oldmask << std::endl;
0898
0899
0900
0901 if (p->mask == IOUrgent && !p->waiting) {
0902 assert(!p->sendq);
0903 if (debug_)
0904 logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
0905 losePeer(nullptr, p, nullptr);
0906 }
0907 }
0908
0909
0910 DQMNet::DQMNet(const std::string &appname )
0911 : debug_(false),
0912 appname_(appname.empty() ? "DQMNet" : appname.c_str()),
0913 pid_(getpid()),
0914 server_(nullptr),
0915 version_(Time::current()),
0916 communicate_((pthread_t)-1),
0917 shutdown_(0),
0918 delay_(1000),
0919 waitStale_(0, 0, 0, 0, 500000000 ),
0920 waitMax_(0, 0, 0, 5 , 0),
0921 flush_(false) {
0922
0923
0924
0925 fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
0926 sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
0927
0928
0929 upstream_.peer = downstream_.peer = nullptr;
0930 upstream_.next = downstream_.next = 0;
0931 upstream_.port = downstream_.port = 0;
0932 upstream_.update = downstream_.update = false;
0933 }
0934
0935 DQMNet::~DQMNet() {
0936
0937 }
0938
0939
0940
0941 void DQMNet::debug(bool doit) { debug_ = doit; }
0942
0943
0944
0945 void DQMNet::delay(int delay) { delay_ = delay; }
0946
0947
0948
0949
0950
0951 void DQMNet::staleObjectWaitLimit(lat::TimeSpan time) { waitStale_ = time; }
0952
0953
0954
0955
0956 void DQMNet::startLocalServer(int port) {
0957 if (server_) {
0958 logme() << "ERROR: DQM server was already started.\n";
0959 return;
0960 }
0961
0962 try {
0963 InetAddress addr("0.0.0.0", port);
0964 auto *s = new InetSocket(SOCK_STREAM, 0, addr.family());
0965 s->bind(addr);
0966 s->listen(10);
0967 s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
0968 s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
0969 s->setBlocking(false);
0970 sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
0971 } catch (Error &e) {
0972
0973
0974 logme() << "ERROR: Failed to start server at port " << port << ": " << e.explain() << std::endl;
0975
0976 throw cms::Exception("DQMNet::startLocalServer") << "Failed to start server at port " <<
0977
0978 port << ": " << e.explain().c_str();
0979 }
0980
0981 logme() << "INFO: DQM server started at port " << port << std::endl;
0982 }
0983
0984
0985
0986
0987 void DQMNet::startLocalServer(const char *path) {
0988 if (server_) {
0989 logme() << "ERROR: DQM server was already started.\n";
0990 return;
0991 }
0992
0993 try {
0994 server_ = new LocalServerSocket(path, 10);
0995 server_->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
0996 server_->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
0997 server_->setBlocking(false);
0998 sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
0999 } catch (Error &e) {
1000
1001
1002 logme() << "ERROR: Failed to start server at path " << path << ": " << e.explain() << std::endl;
1003
1004 throw cms::Exception("DQMNet::startLocalServer")
1005 << "Failed to start server at path " << path << ": " << e.explain().c_str();
1006 }
1007
1008 logme() << "INFO: DQM server started at path " << path << std::endl;
1009 }
1010
1011
1012
1013
1014 void DQMNet::updateToCollector(const std::string &host, int port) {
1015 if (!downstream_.host.empty()) {
1016 logme() << "ERROR: Already updating another collector at " << downstream_.host << ":" << downstream_.port
1017 << std::endl;
1018 return;
1019 }
1020
1021 downstream_.update = true;
1022 downstream_.host = host;
1023 downstream_.port = port;
1024 }
1025
1026
1027
1028
1029 void DQMNet::listenToCollector(const std::string &host, int port) {
1030 if (!upstream_.host.empty()) {
1031 logme() << "ERROR: Already receiving data from another collector at " << upstream_.host << ":" << upstream_.port
1032 << std::endl;
1033 return;
1034 }
1035
1036 upstream_.update = false;
1037 upstream_.host = host;
1038 upstream_.port = port;
1039 }
1040
1041
1042 void DQMNet::shutdown() {
1043 shutdown_ = 1;
1044 if (communicate_ != (pthread_t)-1)
1045 pthread_join(communicate_, nullptr);
1046 }
1047
1048
1049
1050
1051
1052
1053 static void *communicate(void *obj) {
1054 sigset_t sigs;
1055 sigfillset(&sigs);
1056 pthread_sigmask(SIG_BLOCK, &sigs, nullptr);
1057 ((DQMNet *)obj)->run();
1058 return nullptr;
1059 }
1060
1061
1062 void DQMNet::lock() {
1063 if (communicate_ != (pthread_t)-1)
1064 pthread_mutex_lock(&lock_);
1065 }
1066
1067
1068 void DQMNet::unlock() {
1069 if (communicate_ != (pthread_t)-1)
1070 pthread_mutex_unlock(&lock_);
1071 }
1072
1073
1074
1075
1076 void DQMNet::start() {
1077 if (communicate_ != (pthread_t)-1) {
1078 logme() << "ERROR: DQM networking thread has already been started\n";
1079 return;
1080 }
1081
1082 pthread_mutex_init(&lock_, nullptr);
1083 pthread_create(&communicate_, nullptr, &communicate, this);
1084 }
1085
1086
/// Main loop of the networking thread (started by start() via communicate()).
///
/// Repeats until shouldStop() is true:
///  1. (Re)connect the configured automatic upstream/downstream peers, at most
///     once every 15 s each.  This part runs WITHOUT lock_ held.
///  2. Dispatch socket events via sel_.dispatch(delay_); the handlers take
///     lock_ themselves.
///  3. Under lock_: send object lists when flush_ is set (at most every 15 s),
///     update peer I/O masks, and release waiters that have exceeded
///     waitMax_, or waitStale_ for stale objects.
void DQMNet::run() {
  Time now;
  Time nextFlush = 0;
  AutoPeer *automatic[2] = {&upstream_, &downstream_};

  while (!shouldStop()) {
    for (auto ap : automatic) {
      // Connect when configured, not connected, and the retry time has passed.
      if (!ap->host.empty() && !ap->peer && (now = Time::current()) > ap->next) {
        ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
        InetSocket *s = nullptr;
        try {
          InetAddress addr(ap->host.c_str(), ap->port);
          s = new InetSocket(SOCK_STREAM, 0, addr.family());
          s->setBlocking(false);
          s->connect(addr);
          s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
          s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
        } catch (Error &e) {
          auto *sys = dynamic_cast<SystemError *>(e.next());
          // An in-progress non-blocking connect is not a failure.  Anything
          // else discards the socket; the next attempt happens after ap->next.
          if (!sys || sys->portable() != SysErr::ErrOperationInProgress) {
            if (s)
              s->abort();
            delete s;
            s = nullptr;
          }
        }

        // Register the new connection as a peer.
        if (s) {
          Peer *p = createPeer(s);
          ap->peer = p;

          // NOTE(review): the connect may still be in progress here; confirm
          // peername() is safe on a not-yet-connected classlib socket.
          InetAddress peeraddr = ((InetSocket *)s)->peername();
          InetAddress myaddr = ((InetSocket *)s)->sockname();
          p->peeraddr = fmt::format("{}:{}", peeraddr.hostname(), peeraddr.port());
          p->mask = IORead | IOWrite | IOUrgent;
          p->update = ap->update;
          p->automatic = ap;
          p->socket = s;
          sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
          // Upstream: immediately ask for the object list and for updates.
          if (ap == &upstream_) {
            uint32_t words[4] = {2 * sizeof(uint32_t), DQM_MSG_LIST_OBJECTS, 2 * sizeof(uint32_t), DQM_MSG_UPDATE_ME};
            p->sendq = new Bucket;
            p->sendq->next = nullptr;
            copydata(p->sendq, words, sizeof(words));
          }

          if (debug_)
            logme() << "INFO: now connected to " << p->peeraddr << " from " << myaddr.hostname() << ":" << myaddr.port()
                    << std::endl;
        }
      }
    }

    // Handle socket events; handlers lock internally, so lock_ is not held here.
    sel_.dispatch(delay_);
    now = Time::current();
    lock();

    // Forward object lists when something changed, at most every 15 s.
    if (flush_ && now > nextFlush) {
      flush_ = false;
      nextFlush = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
      sendObjectListToPeers(true);
    }

    // Adjust write interest and close idle peers.
    updatePeerMasks();

    // Time out pending waits.
    Time waitold = now - waitMax_;
    Time waitstale = now - waitStale_;
    for (auto i = waiting_.begin(), e = waiting_.end(); i != e;) {
      Object *o = findObject(nullptr, i->name);

      // Waited longer than waitMax_: release with whatever exists (possibly nothing).
      if (i->time < waitold) {
        logme() << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9) << "s to retrieval, releasing '"
                << i->name << "' from wait, have " << (o ? o->rawdata.size() : 0) << " data available\n";
        releaseFromWait(i++, o);
      } else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE)) {
        // Stale object not refreshed within waitStale_: send the stale copy.
        logme() << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9) << "s to update, releasing '"
                << i->name << "' from wait, have " << o->rawdata.size() << " data available\n";
        releaseFromWait(i++, o);
      }

      // Keep waiting.
      else
        ++i;
    }

    unlock();
  }
}
1196
1197
1198
1199 void DQMNet::sendLocalChanges() {
1200 char byte = 0;
1201 wakeup_.sink()->write(&byte, 1);
1202 }
1203
1204
1205
1206
1207 DQMBasicNet::DQMBasicNet(const std::string &appname ) : DQMImplNet<DQMNet::Object>(appname) {
1208 local_ = static_cast<ImplPeer *>(createPeer((Socket *)-1));
1209 }
1210
1211
1212 void DQMBasicNet::reserveLocalSpace(uint32_t size) { local_->objs.reserve(size); }
1213
1214
1215
1216 void DQMBasicNet::updateLocalObject(Object &o) {
1217 o.dirname = *local_->dirs.insert(o.dirname).first;
1218 std::pair<ObjectMap::iterator, bool> info(local_->objs.insert(o));
1219 if (!info.second) {
1220
1221
1222
1223 auto &old = const_cast<Object &>(*info.first);
1224 std::swap(old.flags, o.flags);
1225 std::swap(old.tag, o.tag);
1226 std::swap(old.version, o.version);
1227 std::swap(old.qreports, o.qreports);
1228 std::swap(old.rawdata, o.rawdata);
1229 std::swap(old.scalar, o.scalar);
1230 std::swap(old.qdata, o.qdata);
1231 }
1232 }
1233
1234
1235
1236
1237 bool DQMBasicNet::removeLocalExcept(const std::set<std::string> &known) {
1238 size_t removed = 0;
1239 std::string path;
1240 ObjectMap::iterator i, e;
1241 for (i = local_->objs.begin(), e = local_->objs.end(); i != e;) {
1242 path.clear();
1243 path.reserve(i->dirname.size() + i->objname.size() + 2);
1244 path += i->dirname;
1245 if (!path.empty())
1246 path += '/';
1247 path += i->objname;
1248
1249 if (!known.count(path))
1250 ++removed, local_->objs.erase(i++);
1251 else
1252 ++i;
1253 }
1254
1255 return removed > 0;
1256 }