File indexing completed on 2023-03-17 10:59:00
0001 #include "DQMServices/Core/interface/DQMNet.h"
0002 #include "classlib/iobase/InetServerSocket.h"
0003 #include "classlib/iobase/LocalServerSocket.h"
0004 #include "classlib/iobase/Filename.h"
0005 #include "classlib/sysapi/InetSocket.h" // for completing InetAddress
0006 #include "classlib/utils/TimeInfo.h"
0007 #include "classlib/utils/StringList.h"
0008 #include "classlib/utils/StringFormat.h"
0009 #include "classlib/utils/StringOps.h"
0010 #include "classlib/utils/SystemError.h"
0011 #include "classlib/utils/Regexp.h"
0012 #include <unistd.h>
0013 #include <fcntl.h>
0014 #include <sys/wait.h>
0015 #include <cstdio>
0016 #include <cstdint>
0017 #include <iostream>
0018 #include <sstream>
0019 #include <cassert>
0020 #include <cfloat>
0021 #include <cinttypes>
0022
0023 #include "FWCore/Utilities/interface/EDMException.h"
0024
// Platform-dependent limits for the DQM wire protocol and socket buffers.
// MESSAGE_SIZE_LIMIT: an incoming message whose length word is at or above this
//   value causes the sending peer to be dropped (see onPeerData).
// SOCKET_BUF_SIZE: value applied to SO_SNDBUF / SO_RCVBUF on sockets created here.
#if __APPLE__
#define MESSAGE_SIZE_LIMIT (1 * 1024 * 1024)
#define SOCKET_BUF_SIZE (1 * 1024 * 1024)
#else
#define MESSAGE_SIZE_LIMIT (8 * 1024 * 1024)
#define SOCKET_BUF_SIZE (8 * 1024 * 1024)
#endif
// Size of each chunk read from a peer socket in onPeerData, and the amount by
// which a peer's incoming buffer capacity is grown when it runs out of room.
#define SOCKET_READ_SIZE (SOCKET_BUF_SIZE / 8)
#define SOCKET_READ_GROWTH (SOCKET_BUF_SIZE)

using namespace lat;

// Matches "<name>t=value</name>" where t is one of i, f, s or qr.
// NOTE(review): not referenced in this part of the file; presumably used elsewhere — confirm.
static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>");
0038
0039
namespace dqm {
  namespace qstatus {
    // Quality-test status codes understood by DQMNet::unpackQualityData():
    // STATUS_OK sets no report flag, WARNING sets DQM_PROP_REPORT_WARN,
    // ERROR sets DQM_PROP_REPORT_ERROR; any other code sets DQM_PROP_REPORT_OTHER.
    static constexpr int STATUS_OK = 100;
    static constexpr int WARNING = 200;
    static constexpr int ERROR = 300;
  }  // namespace qstatus
}  // namespace dqm
0047
0048
0049
0050 std::ostream &DQMNet::logme() {
0051 Time now = Time::current();
0052 return std::cout << now.format(true, "%Y-%m-%d %H:%M:%S.") << now.nanoformat(3, 3) << " " << appname_ << "[" << pid_
0053 << "]: ";
0054 }
0055
0056
0057 void DQMNet::copydata(Bucket *b, const void *data, size_t len) {
0058 b->data.insert(b->data.end(), (const unsigned char *)data, (const unsigned char *)data + len);
0059 }
0060
0061
0062 void DQMNet::discard(Bucket *&b) {
0063 while (b) {
0064 Bucket *next = b->next;
0065 delete b;
0066 b = next;
0067 }
0068 }
0069
0070
0071
0072
0073
0074 void DQMNet::losePeer(const char *reason, Peer *peer, IOSelectEvent *ev, Error *err) {
0075 if (reason)
0076 logme() << reason << peer->peeraddr << (err ? "; error was: " + err->explain() : std::string("")) << std::endl;
0077
0078 Socket *s = peer->socket;
0079
0080 for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
0081 if (i->peer == peer)
0082 waiting_.erase(i++);
0083 else
0084 ++i;
0085
0086 if (ev)
0087 ev->source = nullptr;
0088
0089 discard(peer->sendq);
0090 if (peer->automatic)
0091 peer->automatic->peer = nullptr;
0092
0093 sel_.detach(s);
0094 s->close();
0095 removePeer(peer, s);
0096 delete s;
0097 }
0098
0099
0100 void DQMNet::requestObjectData(Peer *p, const char *name, size_t len) {
0101
0102 Bucket **msg = &p->sendq;
0103 while (*msg)
0104 msg = &(*msg)->next;
0105 *msg = new Bucket;
0106 (*msg)->next = nullptr;
0107
0108 uint32_t words[3];
0109 words[0] = sizeof(words) + len;
0110 words[1] = DQM_MSG_GET_OBJECT;
0111 words[2] = len;
0112 copydata(*msg, words, sizeof(words));
0113 copydata(*msg, name, len);
0114 }
0115
0116
0117
0118 void DQMNet::waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner) {
0119
0120
0121
0122
0123
0124 requestObjectData(owner, !name.empty() ? &name[0] : nullptr, name.size());
0125 WaitObject wo = {Time::current(), name, info, p};
0126 waiting_.push_back(wo);
0127 p->waiting++;
0128 }
0129
0130
0131
0132 void DQMNet::releaseFromWait(WaitList::iterator i, Object *o) {
0133 Bucket **msg = &i->peer->sendq;
0134 while (*msg)
0135 msg = &(*msg)->next;
0136 *msg = new Bucket;
0137 (*msg)->next = nullptr;
0138
0139 releaseFromWait(*msg, *i, o);
0140
0141 assert(i->peer->waiting > 0);
0142 i->peer->waiting--;
0143 waiting_.erase(i);
0144 }
0145
0146
0147 void DQMNet::releaseWaiters(const std::string &name, Object *o) {
0148 for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
0149 if (i->name == name)
0150 releaseFromWait(i++, o);
0151 else
0152 ++i;
0153 }
0154
0155
0156
0157
0158 void DQMNet::packQualityData(std::string &into, const QReports &qr) {
0159 char buf[64];
0160 std::ostringstream qrs;
0161 QReports::const_iterator qi, qe;
0162 for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi) {
0163 int pos = 0;
0164 sprintf(buf, "%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG + 2, qi->qtresult);
0165 qrs << buf << '\0' << buf + pos << '\0' << qi->qtname << '\0' << qi->algorithm << '\0' << qi->message << '\0'
0166 << '\0';
0167 }
0168 into = qrs.str();
0169 }
0170
0171
0172
0173 void DQMNet::unpackQualityData(QReports &qr, uint32_t &flags, const char *from) {
0174 const char *qdata = from;
0175
0176
0177 size_t nqv = 0;
0178 while (*qdata) {
0179 ++nqv;
0180 while (*qdata)
0181 ++qdata;
0182 ++qdata;
0183 while (*qdata)
0184 ++qdata;
0185 ++qdata;
0186 while (*qdata)
0187 ++qdata;
0188 ++qdata;
0189 while (*qdata)
0190 ++qdata;
0191 ++qdata;
0192 while (*qdata)
0193 ++qdata;
0194 ++qdata;
0195 }
0196
0197
0198 qdata = from;
0199 qr.reserve(nqv);
0200 while (*qdata) {
0201 qr.emplace_back();
0202 DQMNet::QValue &qv = qr.back();
0203
0204 qv.code = atoi(qdata);
0205 while (*qdata)
0206 ++qdata;
0207 switch (qv.code) {
0208 case dqm::qstatus::STATUS_OK:
0209 break;
0210 case dqm::qstatus::WARNING:
0211 flags |= DQMNet::DQM_PROP_REPORT_WARN;
0212 break;
0213 case dqm::qstatus::ERROR:
0214 flags |= DQMNet::DQM_PROP_REPORT_ERROR;
0215 break;
0216 default:
0217 flags |= DQMNet::DQM_PROP_REPORT_OTHER;
0218 break;
0219 }
0220
0221 qv.qtresult = atof(++qdata);
0222 while (*qdata)
0223 ++qdata;
0224
0225 qv.qtname = ++qdata;
0226 while (*qdata)
0227 ++qdata;
0228
0229 qv.algorithm = ++qdata;
0230 while (*qdata)
0231 ++qdata;
0232
0233 qv.message = ++qdata;
0234 while (*qdata)
0235 ++qdata;
0236 ++qdata;
0237 }
0238 }
0239
// Compiled out: helpers that would rebuild ROOT objects from an Object's
// serialized rawdata with TBufferFile. Retained for reference only.
#if 0

// Read the next streamed object from `buf`. Returns 0 when the buffer is
// exhausted or when no class can be read at the current position.
static TObject *
extractNextObject(TBufferFile &buf)
{
  if (buf.Length() == buf.BufferSize())
    return 0;

  buf.InitMap();
  Int_t pos = buf.Length();
  TClass *c = buf.ReadClass();
  buf.SetBufferOffset(pos);
  buf.ResetMap();
  return c ? buf.ReadObject(c) : 0;
}


// Rebuild o.object (and o.reference) from o.rawdata and unpack the quality
// reports from o.qdata. Returns false if no first object could be read.
bool
DQMNet::reconstructObject(Object &o)
{
  TBufferFile buf(TBufferFile::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
  buf.Reset();

  // The first streamed object is stored in o.object.
  if (! (o.object = extractNextObject(buf)))
    return false;

  // A second streamed object, if any, is stored in o.reference.
  o.reference = extractNextObject(buf);

  // Decode quality reports and update o.flags accordingly.
  unpackQualityData(o.qreports, o.flags, o.qdata.c_str());
  return true;
}
#endif
0275
// Compiled out: would book a MonitorElement in `store` matching Object `o`.
// NOTE(review): apart from the INT case, this code uses an undeclared `name`
// (presumably o.objname) and would not compile if re-enabled — confirm before use.
#if 0
bool
DQMNet::reinstateObject(DQMStore *store, Object &o)
{
  if (! reconstructObject (o))
    return false;

  // Book an element of the type encoded in o.flags inside o.dirname.
  MonitorElement *obj = 0;
  store->setCurrentFolder(o.dirname);
  switch (o.flags & DQM_PROP_TYPE_MASK)
  {
  case DQM_PROP_TYPE_INT:
    obj = store->bookInt(o.objname);
    obj->Fill(atoll(o.scalar.c_str()));
    break;

  case DQM_PROP_TYPE_REAL:
    obj = store->bookFloat(name);
    obj->Fill(atof(o.scalar.c_str()));
    break;

  case DQM_PROP_TYPE_STRING:
    obj = store->bookString(name, o.scalar);
    break;

  case DQM_PROP_TYPE_TH1F:
    obj = store->book1D(name, dynamic_cast<TH1F *>(o.object));
    break;

  case DQM_PROP_TYPE_TH1S:
    obj = store->book1S(name, dynamic_cast<TH1S *>(o.object));
    break;

  case DQM_PROP_TYPE_TH1D:
    obj = store->book1DD(name, dynamic_cast<TH1D *>(o.object));
    break;

  case DQM_PROP_TYPE_TH1I:
    obj = store->book1I(name, dynamic_cast<TH1I *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2F:
    obj = store->book2D(name, dynamic_cast<TH2F *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2S:
    obj = store->book2S(name, dynamic_cast<TH2S *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2D:
    obj = store->book2DD(name, dynamic_cast<TH2D *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2I:
    obj = store->book2I(name, dynamic_cast<TH2I *>(o.object));
    break;

  case DQM_PROP_TYPE_TH3F:
    obj = store->book3D(name, dynamic_cast<TH3F *>(o.object));
    break;

  case DQM_PROP_TYPE_TH3S:
    obj = store->book3S(name, dynamic_cast<TH3S *>(o.object));
    break;

  case DQM_PROP_TYPE_TH3D:
    obj = store->book3DD(name, dynamic_cast<TH3D *>(o.object));
    break;

  case DQM_PROP_TYPE_PROF:
    obj = store->bookProfile(name, dynamic_cast<TProfile *>(o.object));
    break;

  case DQM_PROP_TYPE_PROF2D:
    obj = store->bookProfile2D(name, dynamic_cast<TProfile2D *>(o.object));
    break;

  default:
    logme()
      << "ERROR: unexpected monitor element of type "
      << (o.flags & DQM_PROP_TYPE_MASK) << " called '"
      << o.dirname << '/' << o.objname << "'\n";
    return false;
  }

  // Copy the tag and quality reports onto the booked element.
  if (obj)
  {
    obj->data_.tag = o.tag;
    obj->data_.qreports = o.qreports;
  }

  return true;
}
#endif
0373
0374
0375
0376 bool DQMNet::shouldStop() { return shutdown_; }
0377
0378
0379
0380 void DQMNet::releaseFromWait(Bucket *msg, WaitObject &w, Object *o) {
0381 if (o)
0382 sendObjectToPeer(msg, *o, true);
0383 else {
0384 uint32_t words[3];
0385 words[0] = sizeof(words) + w.name.size();
0386 words[1] = DQM_REPLY_NONE;
0387 words[2] = w.name.size();
0388
0389 msg->data.reserve(msg->data.size() + words[0]);
0390 copydata(msg, &words[0], sizeof(words));
0391 copydata(msg, &w.name[0], w.name.size());
0392 }
0393 }
0394
0395
0396
0397
0398 void DQMNet::sendObjectToPeer(Bucket *msg, Object &o, bool data) {
0399 uint32_t flags = o.flags & ~DQM_PROP_DEAD;
0400 DataBlob objdata;
0401
0402 if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
0403 objdata.insert(objdata.end(), &o.scalar[0], &o.scalar[0] + o.scalar.size());
0404 else if (data)
0405 objdata.insert(objdata.end(), &o.rawdata[0], &o.rawdata[0] + o.rawdata.size());
0406
0407 uint32_t words[9];
0408 uint32_t namelen = o.dirname.size() + o.objname.size() + 1;
0409 uint32_t datalen = objdata.size();
0410 uint32_t qlen = o.qdata.size();
0411
0412 if (o.dirname.empty())
0413 --namelen;
0414
0415 words[0] = 9 * sizeof(uint32_t) + namelen + datalen + qlen;
0416 words[1] = DQM_REPLY_OBJECT;
0417 words[2] = flags;
0418 words[3] = (o.version >> 0) & 0xffffffff;
0419 words[4] = (o.version >> 32) & 0xffffffff;
0420 words[5] = o.tag;
0421 words[6] = namelen;
0422 words[7] = datalen;
0423 words[8] = qlen;
0424
0425 msg->data.reserve(msg->data.size() + words[0]);
0426 copydata(msg, &words[0], 9 * sizeof(uint32_t));
0427 if (namelen) {
0428 copydata(msg, &(o.dirname)[0], o.dirname.size());
0429 if (!o.dirname.empty())
0430 copydata(msg, "/", 1);
0431 copydata(msg, &o.objname[0], o.objname.size());
0432 }
0433 if (datalen)
0434 copydata(msg, &objdata[0], datalen);
0435 if (qlen)
0436 copydata(msg, &o.qdata[0], qlen);
0437 }
0438
0439
0440
0441 bool DQMNet::onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len) {
0442
0443 uint32_t type;
0444 memcpy(&type, data + sizeof(uint32_t), sizeof(type));
0445 switch (type) {
0446 case DQM_MSG_UPDATE_ME: {
0447 if (len != 2 * sizeof(uint32_t)) {
0448 logme() << "ERROR: corrupt 'UPDATE_ME' message of length " << len << " from peer " << p->peeraddr << std::endl;
0449 return false;
0450 }
0451
0452 if (debug_)
0453 logme() << "DEBUG: received message 'UPDATE ME' from peer " << p->peeraddr << ", size " << len << std::endl;
0454
0455 p->update = true;
0456 }
0457 return true;
0458
0459 case DQM_MSG_LIST_OBJECTS: {
0460 if (debug_)
0461 logme() << "DEBUG: received message 'LIST OBJECTS' from peer " << p->peeraddr << ", size " << len << std::endl;
0462
0463
0464 sendObjectListToPeer(msg, true, false);
0465 }
0466 return true;
0467
0468 case DQM_MSG_GET_OBJECT: {
0469 if (debug_)
0470 logme() << "DEBUG: received message 'GET OBJECT' from peer " << p->peeraddr << ", size " << len << std::endl;
0471
0472 if (len < 3 * sizeof(uint32_t)) {
0473 logme() << "ERROR: corrupt 'GET IMAGE' message of length " << len << " from peer " << p->peeraddr << std::endl;
0474 return false;
0475 }
0476
0477 uint32_t namelen;
0478 memcpy(&namelen, data + 2 * sizeof(uint32_t), sizeof(namelen));
0479 if (len != 3 * sizeof(uint32_t) + namelen) {
0480 logme() << "ERROR: corrupt 'GET OBJECT' message of length " << len << " from peer " << p->peeraddr
0481 << ", expected length " << (3 * sizeof(uint32_t)) << " + " << namelen << std::endl;
0482 return false;
0483 }
0484
0485 std::string name((char *)data + 3 * sizeof(uint32_t), namelen);
0486 Peer *owner = nullptr;
0487 Object *o = findObject(nullptr, name, &owner);
0488 if (o) {
0489 o->lastreq = Time::current().ns();
0490 if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE)) &&
0491 (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
0492 waitForData(p, name, "", owner);
0493 else
0494 sendObjectToPeer(msg, *o, true);
0495 } else {
0496 uint32_t words[3];
0497 words[0] = sizeof(words) + name.size();
0498 words[1] = DQM_REPLY_NONE;
0499 words[2] = name.size();
0500
0501 msg->data.reserve(msg->data.size() + words[0]);
0502 copydata(msg, &words[0], sizeof(words));
0503 copydata(msg, &name[0], name.size());
0504 }
0505 }
0506 return true;
0507
0508 case DQM_REPLY_LIST_BEGIN: {
0509 if (len != 4 * sizeof(uint32_t)) {
0510 logme() << "ERROR: corrupt 'LIST BEGIN' message of length " << len << " from peer " << p->peeraddr << std::endl;
0511 return false;
0512 }
0513
0514
0515 uint32_t flags;
0516 memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
0517
0518 if (debug_)
0519 logme() << "DEBUG: received message 'LIST BEGIN " << (flags ? "FULL" : "INCREMENTAL") << "' from "
0520 << p->peeraddr << ", size " << len << std::endl;
0521
0522
0523
0524
0525
0526
0527
0528 if (flags)
0529 markObjectsDead(p);
0530 }
0531 return true;
0532
0533 case DQM_REPLY_LIST_END: {
0534 if (len != 4 * sizeof(uint32_t)) {
0535 logme() << "ERROR: corrupt 'LIST END' message of length " << len << " from peer " << p->peeraddr << std::endl;
0536 return false;
0537 }
0538
0539
0540 uint32_t flags;
0541 memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
0542
0543
0544
0545
0546
0547 if (flags)
0548 purgeDeadObjects(p);
0549
0550 if (debug_)
0551 logme() << "DEBUG: received message 'LIST END " << (flags ? "FULL" : "INCREMENTAL") << "' from " << p->peeraddr
0552 << ", size " << len << std::endl;
0553
0554
0555
0556 flush_ = true;
0557 p->updates++;
0558 }
0559 return true;
0560
0561 case DQM_REPLY_OBJECT: {
0562 uint32_t words[9];
0563 if (len < sizeof(words)) {
0564 logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr << std::endl;
0565 return false;
0566 }
0567
0568 memcpy(&words[0], data, sizeof(words));
0569 uint32_t &namelen = words[6];
0570 uint32_t &datalen = words[7];
0571 uint32_t &qlen = words[8];
0572
0573 if (len != sizeof(words) + namelen + datalen + qlen) {
0574 logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr
0575 << ", expected length " << sizeof(words) << " + " << namelen << " + " << datalen << " + " << qlen
0576 << std::endl;
0577 return false;
0578 }
0579
0580 unsigned char *namedata = data + sizeof(words);
0581 unsigned char *objdata = namedata + namelen;
0582 unsigned char *qdata = objdata + datalen;
0583 unsigned char *enddata = qdata + qlen;
0584 std::string name((char *)namedata, namelen);
0585 assert(enddata == data + len);
0586
0587 if (debug_)
0588 logme() << "DEBUG: received message 'OBJECT " << name << "' from " << p->peeraddr << ", size " << len
0589 << std::endl;
0590
0591
0592 p->source = true;
0593
0594
0595 Object *o = findObject(p, name);
0596 if (!o)
0597 o = makeObject(p, name);
0598
0599 o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
0600 o->tag = words[5];
0601 o->version = ((uint64_t)words[4] << 32 | words[3]);
0602 o->scalar.clear();
0603 o->qdata.clear();
0604 if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR) {
0605 o->rawdata.clear();
0606 o->scalar.insert(o->scalar.end(), objdata, qdata);
0607 } else if (datalen) {
0608 o->rawdata.clear();
0609 o->rawdata.insert(o->rawdata.end(), objdata, qdata);
0610 } else if (!o->rawdata.empty())
0611 o->flags |= DQM_PROP_STALE;
0612 o->qdata.insert(o->qdata.end(), qdata, enddata);
0613
0614
0615
0616 if (o->lastreq && !datalen && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
0617 requestObjectData(p, (namelen ? &name[0] : nullptr), namelen);
0618
0619
0620 if (datalen)
0621 releaseWaiters(name, o);
0622 }
0623 return true;
0624
0625 case DQM_REPLY_NONE: {
0626 uint32_t words[3];
0627 if (len < sizeof(words)) {
0628 logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr << std::endl;
0629 return false;
0630 }
0631
0632 memcpy(&words[0], data, sizeof(words));
0633 uint32_t &namelen = words[2];
0634
0635 if (len != sizeof(words) + namelen) {
0636 logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr
0637 << ", expected length " << sizeof(words) << " + " << namelen << std::endl;
0638 return false;
0639 }
0640
0641 unsigned char *namedata = data + sizeof(words);
0642 std::string name((char *)namedata, namelen);
0643
0644 if (debug_)
0645 logme() << "DEBUG: received message 'NONE " << name << "' from " << p->peeraddr << ", size " << len
0646 << std::endl;
0647
0648
0649 p->source = true;
0650
0651
0652 if (Object *o = findObject(p, name)) {
0653 o->flags |= DQM_PROP_DEAD;
0654 purgeDeadObjects(p);
0655 }
0656
0657
0658 releaseWaiters(name, nullptr);
0659 }
0660 return true;
0661
0662 default:
0663 logme() << "ERROR: unrecognised message of length " << len << " and type " << type << " from peer " << p->peeraddr
0664 << std::endl;
0665 return false;
0666 }
0667 }
0668
0669
0670
0671 bool DQMNet::onPeerData(IOSelectEvent *ev, Peer *p) {
0672 lock();
0673 assert(getPeer(dynamic_cast<Socket *>(ev->source)) == p);
0674
0675
0676
0677
0678
0679 if (ev->events & IOUrgent) {
0680 if (p->automatic) {
0681 logme() << "WARNING: connection to the DQM server at " << p->peeraddr
0682 << " lost (will attempt to reconnect in 15 seconds)\n";
0683 losePeer(nullptr, p, ev);
0684 } else
0685 losePeer("WARNING: lost peer connection ", p, ev);
0686
0687 unlock();
0688 return true;
0689 }
0690
0691
0692 if (ev->events & IOWrite) {
0693 while (Bucket *b = p->sendq) {
0694 IOSize len = b->data.size() - p->sendpos;
0695 const void *data = (len ? (const void *)&b->data[p->sendpos] : (const void *)&data);
0696 IOSize done;
0697
0698 try {
0699 done = (len ? ev->source->write(data, len) : 0);
0700 if (debug_ && len)
0701 logme() << "DEBUG: sent " << done << " bytes to peer " << p->peeraddr << std::endl;
0702 } catch (Error &e) {
0703 losePeer("WARNING: unable to write to peer ", p, ev, &e);
0704 unlock();
0705 return true;
0706 }
0707
0708 p->sendpos += done;
0709 if (p->sendpos == b->data.size()) {
0710 Bucket *old = p->sendq;
0711 p->sendq = old->next;
0712 p->sendpos = 0;
0713 old->next = nullptr;
0714 discard(old);
0715 }
0716
0717 if (!done && len)
0718
0719 break;
0720 }
0721 }
0722
0723
0724
0725 if (ev->events & IORead) {
0726
0727
0728
0729 IOSize sz;
0730 try {
0731 std::vector<unsigned char> buf(SOCKET_READ_SIZE);
0732 do
0733 if ((sz = ev->source->read(&buf[0], buf.size()))) {
0734 if (debug_)
0735 logme() << "DEBUG: received " << sz << " bytes from peer " << p->peeraddr << std::endl;
0736 DataBlob &data = p->incoming;
0737 if (data.capacity() < data.size() + sz)
0738 data.reserve(data.size() + SOCKET_READ_GROWTH);
0739 data.insert(data.end(), &buf[0], &buf[0] + sz);
0740 }
0741 while (sz == sizeof(buf));
0742 } catch (Error &e) {
0743 auto *next = dynamic_cast<SystemError *>(e.next());
0744 if (next && next->portable() == SysErr::ErrTryAgain)
0745 sz = 1;
0746 else {
0747
0748 losePeer("WARNING: failed to read from peer ", p, ev, &e);
0749 unlock();
0750 return true;
0751 }
0752 }
0753
0754
0755 size_t consumed = 0;
0756 DataBlob &data = p->incoming;
0757 while (data.size() - consumed >= sizeof(uint32_t) && p->waiting < MAX_PEER_WAITREQS) {
0758 uint32_t msglen;
0759 memcpy(&msglen, &data[0] + consumed, sizeof(msglen));
0760
0761 if (msglen >= MESSAGE_SIZE_LIMIT) {
0762 losePeer("WARNING: excessively large message from ", p, ev);
0763 unlock();
0764 return true;
0765 }
0766
0767 if (data.size() - consumed >= msglen) {
0768 bool valid = true;
0769 if (msglen < 2 * sizeof(uint32_t)) {
0770 logme() << "ERROR: corrupt peer message of length " << msglen << " from peer " << p->peeraddr << std::endl;
0771 valid = false;
0772 } else {
0773
0774 Bucket msg;
0775 msg.next = nullptr;
0776 valid = onMessage(&msg, p, &data[0] + consumed, msglen);
0777
0778
0779 if (!msg.data.empty()) {
0780 Bucket **prev = &p->sendq;
0781 while (*prev)
0782 prev = &(*prev)->next;
0783
0784 *prev = new Bucket;
0785 (*prev)->next = nullptr;
0786 (*prev)->data.swap(msg.data);
0787 }
0788 }
0789
0790 if (!valid) {
0791 losePeer("WARNING: data stream error with ", p, ev);
0792 unlock();
0793 return true;
0794 }
0795
0796 consumed += msglen;
0797 } else
0798 break;
0799 }
0800
0801 data.erase(data.begin(), data.begin() + consumed);
0802
0803
0804
0805
0806 if (sz == 0)
0807 sel_.setMask(p->socket, p->mask &= ~IORead);
0808 }
0809
0810
0811 unlock();
0812 return false;
0813 }
0814
0815
0816
0817
0818
0819 bool DQMNet::onPeerConnect(IOSelectEvent *ev) {
0820
0821 assert(ev->source == server_);
0822
0823
0824 Socket *s = server_->accept();
0825 assert(s);
0826 assert(!s->isBlocking());
0827
0828
0829 lock();
0830 Peer *p = createPeer(s);
0831 std::string localaddr;
0832 if (auto *inet = dynamic_cast<InetSocket *>(s)) {
0833 InetAddress peeraddr = inet->peername();
0834 InetAddress myaddr = inet->sockname();
0835 p->peeraddr = StringFormat("%1:%2").arg(peeraddr.hostname()).arg(peeraddr.port()).value();
0836 localaddr = StringFormat("%1:%2").arg(myaddr.hostname()).arg(myaddr.port()).value();
0837 } else if (auto *local = dynamic_cast<LocalSocket *>(s)) {
0838 p->peeraddr = local->peername().path();
0839 localaddr = local->sockname().path();
0840 } else
0841 assert(false);
0842
0843 p->mask = IORead | IOUrgent;
0844 p->socket = s;
0845
0846
0847 if (debug_)
0848 logme() << "INFO: new peer " << p->peeraddr << " is now connected to " << localaddr << std::endl;
0849
0850
0851 sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
0852 unlock();
0853
0854
0855 return false;
0856 }
0857
0858
0859
0860
0861
0862
0863
0864 bool DQMNet::onLocalNotify(IOSelectEvent *ev) {
0865
0866 try {
0867 IOSize sz;
0868 unsigned char buf[1024];
0869 while ((sz = ev->source->read(buf, sizeof(buf))))
0870 ;
0871 } catch (Error &e) {
0872 auto *next = dynamic_cast<SystemError *>(e.next());
0873 if (next && next->portable() == SysErr::ErrTryAgain)
0874 ;
0875 else
0876 logme() << "WARNING: error reading from notification pipe: " << e.explain() << std::endl;
0877 }
0878
0879
0880 flush_ = true;
0881
0882
0883 return false;
0884 }
0885
0886
0887
0888 void DQMNet::updateMask(Peer *p) {
0889 if (!p->socket)
0890 return;
0891
0892
0893 unsigned oldmask = p->mask;
0894 if (!p->sendq && (p->mask & IOWrite))
0895 sel_.setMask(p->socket, p->mask &= ~IOWrite);
0896
0897 if (p->sendq && !(p->mask & IOWrite))
0898 sel_.setMask(p->socket, p->mask |= IOWrite);
0899
0900 if (debug_ && oldmask != p->mask)
0901 logme() << "DEBUG: updating mask for " << p->peeraddr << " to " << p->mask << " from " << oldmask << std::endl;
0902
0903
0904
0905 if (p->mask == IOUrgent && !p->waiting) {
0906 assert(!p->sendq);
0907 if (debug_)
0908 logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
0909 losePeer(nullptr, p, nullptr);
0910 }
0911 }
0912
0913
0914 DQMNet::DQMNet(const std::string &appname )
0915 : debug_(false),
0916 appname_(appname.empty() ? "DQMNet" : appname.c_str()),
0917 pid_(getpid()),
0918 server_(nullptr),
0919 version_(Time::current()),
0920 communicate_((pthread_t)-1),
0921 shutdown_(0),
0922 delay_(1000),
0923 waitStale_(0, 0, 0, 0, 500000000 ),
0924 waitMax_(0, 0, 0, 5 , 0),
0925 flush_(false) {
0926
0927
0928
0929 fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
0930 sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
0931
0932
0933 upstream_.peer = downstream_.peer = nullptr;
0934 upstream_.next = downstream_.next = 0;
0935 upstream_.port = downstream_.port = 0;
0936 upstream_.update = downstream_.update = false;
0937 }
0938
0939 DQMNet::~DQMNet() {
0940
0941 }
0942
0943
0944
0945 void DQMNet::debug(bool doit) { debug_ = doit; }
0946
0947
0948
0949 void DQMNet::delay(int delay) { delay_ = delay; }
0950
0951
0952
0953
0954
0955 void DQMNet::staleObjectWaitLimit(lat::TimeSpan time) { waitStale_ = time; }
0956
0957
0958
0959
0960 void DQMNet::startLocalServer(int port) {
0961 if (server_) {
0962 logme() << "ERROR: DQM server was already started.\n";
0963 return;
0964 }
0965
0966 try {
0967 InetAddress addr("0.0.0.0", port);
0968 auto *s = new InetSocket(SOCK_STREAM, 0, addr.family());
0969 s->bind(addr);
0970 s->listen(10);
0971 s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
0972 s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
0973 s->setBlocking(false);
0974 sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
0975 } catch (Error &e) {
0976
0977
0978 logme() << "ERROR: Failed to start server at port " << port << ": " << e.explain() << std::endl;
0979
0980 throw cms::Exception("DQMNet::startLocalServer") << "Failed to start server at port " <<
0981
0982 port << ": " << e.explain().c_str();
0983 }
0984
0985 logme() << "INFO: DQM server started at port " << port << std::endl;
0986 }
0987
0988
0989
0990
0991 void DQMNet::startLocalServer(const char *path) {
0992 if (server_) {
0993 logme() << "ERROR: DQM server was already started.\n";
0994 return;
0995 }
0996
0997 try {
0998 server_ = new LocalServerSocket(path, 10);
0999 server_->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1000 server_->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1001 server_->setBlocking(false);
1002 sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
1003 } catch (Error &e) {
1004
1005
1006 logme() << "ERROR: Failed to start server at path " << path << ": " << e.explain() << std::endl;
1007
1008 throw cms::Exception("DQMNet::startLocalServer")
1009 << "Failed to start server at path " << path << ": " << e.explain().c_str();
1010 }
1011
1012 logme() << "INFO: DQM server started at path " << path << std::endl;
1013 }
1014
1015
1016
1017
1018 void DQMNet::updateToCollector(const std::string &host, int port) {
1019 if (!downstream_.host.empty()) {
1020 logme() << "ERROR: Already updating another collector at " << downstream_.host << ":" << downstream_.port
1021 << std::endl;
1022 return;
1023 }
1024
1025 downstream_.update = true;
1026 downstream_.host = host;
1027 downstream_.port = port;
1028 }
1029
1030
1031
1032
1033 void DQMNet::listenToCollector(const std::string &host, int port) {
1034 if (!upstream_.host.empty()) {
1035 logme() << "ERROR: Already receiving data from another collector at " << upstream_.host << ":" << upstream_.port
1036 << std::endl;
1037 return;
1038 }
1039
1040 upstream_.update = false;
1041 upstream_.host = host;
1042 upstream_.port = port;
1043 }
1044
1045
1046 void DQMNet::shutdown() {
1047 shutdown_ = 1;
1048 if (communicate_ != (pthread_t)-1)
1049 pthread_join(communicate_, nullptr);
1050 }
1051
1052
1053
1054
1055
1056
1057 static void *communicate(void *obj) {
1058 sigset_t sigs;
1059 sigfillset(&sigs);
1060 pthread_sigmask(SIG_BLOCK, &sigs, nullptr);
1061 ((DQMNet *)obj)->run();
1062 return nullptr;
1063 }
1064
1065
1066 void DQMNet::lock() {
1067 if (communicate_ != (pthread_t)-1)
1068 pthread_mutex_lock(&lock_);
1069 }
1070
1071
1072 void DQMNet::unlock() {
1073 if (communicate_ != (pthread_t)-1)
1074 pthread_mutex_unlock(&lock_);
1075 }
1076
1077
1078
1079
1080 void DQMNet::start() {
1081 if (communicate_ != (pthread_t)-1) {
1082 logme() << "ERROR: DQM networking thread has already been started\n";
1083 return;
1084 }
1085
1086 pthread_mutex_init(&lock_, nullptr);
1087 pthread_create(&communicate_, nullptr, &communicate, this);
1088 }
1089
1090
// Main loop of the network thread (entered via communicate() from start()).
// Until shouldStop(), each iteration:
//  1. starts a connection for each automatic peer (upstream/downstream) that
//     has a host configured, has no live peer, and whose retry time has passed;
//  2. dispatches socket events via sel_.dispatch(delay_);
//  3. under the lock: sends object lists to peers if flush_ is set (at most
//     once per 15 s), refreshes peer select masks, and releases wait-list
//     entries that have timed out.
void DQMNet::run() {
  Time now;
  Time nextFlush = 0;
  AutoPeer *automatic[2] = {&upstream_, &downstream_};

  while (!shouldStop()) {
    for (auto ap : automatic) {
      // NOTE(review): this connection set-up (createPeer, sel_.attach) runs
      // without lock(); confirm no other thread can touch the peer set meanwhile.
      if (!ap->host.empty() && !ap->peer && (now = Time::current()) > ap->next) {
        // Schedule the next attempt 15 seconds from now.
        ap->next = now + TimeSpan(0, 0, 0, 15 , 0);
        InetSocket *s = nullptr;
        try {
          InetAddress addr(ap->host.c_str(), ap->port);
          s = new InetSocket(SOCK_STREAM, 0, addr.family());
          s->setBlocking(false);
          s->connect(addr);
          s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
          s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
        } catch (Error &e) {
          auto *sys = dynamic_cast<SystemError *>(e.next());
          // A non-blocking connect still in progress is not a failure; any
          // other error discards the socket until the next retry.
          if (!sys || sys->portable() != SysErr::ErrOperationInProgress) {
            if (s)
              s->abort();
            delete s;
            s = nullptr;
          }
        }

        // Register the new connection as a peer.
        if (s) {
          Peer *p = createPeer(s);
          ap->peer = p;

          // NOTE(review): peername() is called outside the try block while the
          // connect may still be in progress; confirm it cannot throw here.
          InetAddress peeraddr = ((InetSocket *)s)->peername();
          InetAddress myaddr = ((InetSocket *)s)->sockname();
          p->peeraddr = StringFormat("%1:%2").arg(peeraddr.hostname()).arg(peeraddr.port()).value();
          p->mask = IORead | IOWrite | IOUrgent;
          p->update = ap->update;
          p->automatic = ap;
          p->socket = s;
          sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
          // Upstream: ask for the object list and for subsequent updates.
          if (ap == &upstream_) {
            uint32_t words[4] = {2 * sizeof(uint32_t), DQM_MSG_LIST_OBJECTS, 2 * sizeof(uint32_t), DQM_MSG_UPDATE_ME};
            p->sendq = new Bucket;
            p->sendq->next = nullptr;
            copydata(p->sendq, words, sizeof(words));
          }

          if (debug_)
            logme() << "INFO: now connected to " << p->peeraddr << " from " << myaddr.hostname() << ":" << myaddr.port()
                    << std::endl;
        }
      }
    }

    // Handle socket events, then do periodic work under the lock.
    sel_.dispatch(delay_);
    now = Time::current();
    lock();

    // Propagate pending changes to peers, at most once every 15 seconds.
    if (flush_ && now > nextFlush) {
      flush_ = false;
      nextFlush = now + TimeSpan(0, 0, 0, 15 , 0);
      sendObjectListToPeers(true);
    }

    // Bring each peer's select mask in line with its send queue.
    updatePeerMasks();

    // Release waiters that waited longer than waitMax_, or longer than
    // waitStale_ for an object that is still marked stale.
    Time waitold = now - waitMax_;
    Time waitstale = now - waitStale_;
    for (auto i = waiting_.begin(), e = waiting_.end(); i != e;) {
      Object *o = findObject(nullptr, i->name);

      if (i->time < waitold) {
        logme() << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9) << "s to retrieval, releasing '"
                << i->name << "' from wait, have " << (o ? o->rawdata.size() : 0) << " data available\n";
        releaseFromWait(i++, o);
      } else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE)) {
        logme() << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9) << "s to update, releasing '"
                << i->name << "' from wait, have " << o->rawdata.size() << " data available\n";
        releaseFromWait(i++, o);
      }

      else
        ++i;
    }

    unlock();
  }
}
1200
1201
1202
1203 void DQMNet::sendLocalChanges() {
1204 char byte = 0;
1205 wakeup_.sink()->write(&byte, 1);
1206 }
1207
1208
1209
1210
1211 DQMBasicNet::DQMBasicNet(const std::string &appname ) : DQMImplNet<DQMNet::Object>(appname) {
1212 local_ = static_cast<ImplPeer *>(createPeer((Socket *)-1));
1213 }
1214
1215
1216 void DQMBasicNet::reserveLocalSpace(uint32_t size) { local_->objs.resize(size); }
1217
1218
1219
1220 void DQMBasicNet::updateLocalObject(Object &o) {
1221 o.dirname = *local_->dirs.insert(o.dirname).first;
1222 std::pair<ObjectMap::iterator, bool> info(local_->objs.insert(o));
1223 if (!info.second) {
1224
1225
1226
1227 auto &old = const_cast<Object &>(*info.first);
1228 std::swap(old.flags, o.flags);
1229 std::swap(old.tag, o.tag);
1230 std::swap(old.version, o.version);
1231 std::swap(old.qreports, o.qreports);
1232 std::swap(old.rawdata, o.rawdata);
1233 std::swap(old.scalar, o.scalar);
1234 std::swap(old.qdata, o.qdata);
1235 }
1236 }
1237
1238
1239
1240
1241 bool DQMBasicNet::removeLocalExcept(const std::set<std::string> &known) {
1242 size_t removed = 0;
1243 std::string path;
1244 ObjectMap::iterator i, e;
1245 for (i = local_->objs.begin(), e = local_->objs.end(); i != e;) {
1246 path.clear();
1247 path.reserve(i->dirname.size() + i->objname.size() + 2);
1248 path += i->dirname;
1249 if (!path.empty())
1250 path += '/';
1251 path += i->objname;
1252
1253 if (!known.count(path))
1254 ++removed, local_->objs.erase(i++);
1255 else
1256 ++i;
1257 }
1258
1259 return removed > 0;
1260 }