Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-03-13 02:31:45

0001 #include "DQMServices/Core/interface/DQMNet.h"
0002 #include "classlib/iobase/InetServerSocket.h"
0003 #include "classlib/iobase/LocalServerSocket.h"
0004 #include "classlib/sysapi/InetSocket.h"  // for completing InetAddress
0005 #include "classlib/utils/SystemError.h"
0006 #include "classlib/utils/Regexp.h"
0007 #include <fmt/format.h>
0008 #include <unistd.h>
0009 #include <fcntl.h>
0010 #include <sys/wait.h>
0011 #include <cstdio>
0012 #include <cstdint>
0013 #include <iostream>
0014 #include <sstream>
0015 #include <cassert>
0016 #include <cfloat>
0017 #include <cinttypes>
0018 
0019 #include "FWCore/Utilities/interface/EDMException.h"
0020 
// Size limits used by the network layer.  Apple platforms get 1 MiB
// buffers, everything else 8 MiB.
//  - SOCKET_BUF_SIZE:    base socket buffer size.
//  - MESSAGE_SIZE_LIMIT: a peer announcing a message of this size or
//                        larger is dropped.
//  - SOCKET_READ_SIZE:   size of the scratch buffer used per read call.
//  - SOCKET_READ_GROWTH: amount by which a peer's incoming buffer is
//                        grown when it runs out of capacity.
#if __APPLE__
#define SOCKET_BUF_SIZE (1 * 1024 * 1024)
#define MESSAGE_SIZE_LIMIT (1 * 1024 * 1024)
#else
#define SOCKET_BUF_SIZE (8 * 1024 * 1024)
#define MESSAGE_SIZE_LIMIT (8 * 1024 * 1024)
#endif
#define SOCKET_READ_GROWTH (SOCKET_BUF_SIZE)
#define SOCKET_READ_SIZE (SOCKET_BUF_SIZE / 8)
0030 
0031 using namespace lat;
0032 
// Pattern for a tagged value of the form "<NAME>TYPE=VALUE</NAME>", where
// TYPE is one of i, f, s or qr.  The "\1" back-reference presumably requires
// the closing tag to repeat NAME -- confirm against lat::Regexp semantics.
static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>");
0034 
// TODO: Can't include the header file since that leads to ambiguities.
// Local copies of the quality test status codes used when unpacking
// quality reports.
namespace dqm {
  namespace qstatus {
    static constexpr int STATUS_OK = 100;  //< Test was successful.
    static constexpr int WARNING = 200;    //< Test had some problems.
    static constexpr int ERROR = 300;      //< Test has failed.
  }  // namespace qstatus
}  // namespace dqm
0043 
0044 //////////////////////////////////////////////////////////////////////
0045 // Generate log prefix.
0046 std::ostream &DQMNet::logme() {
0047   Time now = Time::current();
0048   return std::cout << now.format(true, "%Y-%m-%d %H:%M:%S.") << now.nanoformat(3, 3) << " " << appname_ << "[" << pid_
0049                    << "]: ";
0050 }
0051 
0052 // Append data into a bucket.
0053 void DQMNet::copydata(Bucket *b, const void *data, size_t len) {
0054   b->data.insert(b->data.end(), (const unsigned char *)data, (const unsigned char *)data + len);
0055 }
0056 
0057 // Discard a bucket chain.
0058 void DQMNet::discard(Bucket *&b) {
0059   while (b) {
0060     Bucket *next = b->next;
0061     delete b;
0062     b = next;
0063   }
0064 }
0065 
0066 //////////////////////////////////////////////////////////////////////
0067 /** Handle errors with a peer socket.  Zaps the socket send queue,
0068     the socket itself, detaches the socket from the selector, and
0069     purges any pending wait requests linked to the socket.  */
0070 void DQMNet::losePeer(const char *reason, Peer *peer, IOSelectEvent *ev, Error *err) {
0071   if (reason)
0072     logme() << reason << peer->peeraddr << (err ? "; error was: " + err->explain() : std::string("")) << std::endl;
0073 
0074   Socket *s = peer->socket;
0075 
0076   for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
0077     if (i->peer == peer)
0078       waiting_.erase(i++);
0079     else
0080       ++i;
0081 
0082   if (ev)
0083     ev->source = nullptr;
0084 
0085   discard(peer->sendq);
0086   if (peer->automatic)
0087     peer->automatic->peer = nullptr;
0088 
0089   sel_.detach(s);
0090   s->close();
0091   removePeer(peer, s);
0092   delete s;
0093 }
0094 
0095 /// Queue an object request to the data server.
0096 void DQMNet::requestObjectData(Peer *p, const char *name, size_t len) {
0097   // Issue request to peer.
0098   Bucket **msg = &p->sendq;
0099   while (*msg)
0100     msg = &(*msg)->next;
0101   *msg = new Bucket;
0102   (*msg)->next = nullptr;
0103 
0104   uint32_t words[3];
0105   words[0] = sizeof(words) + len;
0106   words[1] = DQM_MSG_GET_OBJECT;
0107   words[2] = len;
0108   copydata(*msg, words, sizeof(words));
0109   copydata(*msg, name, len);
0110 }
0111 
0112 /// Queue a request for an object and put a peer into the mode of
0113 /// waiting for object data to appear.
0114 void DQMNet::waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner) {
0115   // FIXME: Should we automatically record which exact peer the waiter
0116   // is expecting to deliver data so we know to release the waiter if
0117   // the other peer vanishes?  The current implementation stands a
0118   // chance for the waiter to wait indefinitely -- although we do
0119   // force terminate the wait after a while.
0120   requestObjectData(owner, !name.empty() ? &name[0] : nullptr, name.size());
0121   WaitObject wo = {Time::current(), name, info, p};
0122   waiting_.push_back(wo);
0123   p->waiting++;
0124 }
0125 
0126 // Once an object has been updated, this is invoked for all waiting
0127 // peers.  Send the object back to the peer in suitable form.
0128 void DQMNet::releaseFromWait(WaitList::iterator i, Object *o) {
0129   Bucket **msg = &i->peer->sendq;
0130   while (*msg)
0131     msg = &(*msg)->next;
0132   *msg = new Bucket;
0133   (*msg)->next = nullptr;
0134 
0135   releaseFromWait(*msg, *i, o);
0136 
0137   assert(i->peer->waiting > 0);
0138   i->peer->waiting--;
0139   waiting_.erase(i);
0140 }
0141 
0142 // Release everyone waiting for the object @a o.
0143 void DQMNet::releaseWaiters(const std::string &name, Object *o) {
0144   for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
0145     if (i->name == name)
0146       releaseFromWait(i++, o);
0147     else
0148       ++i;
0149 }
0150 
0151 //////////////////////////////////////////////////////////////////////
0152 /// Pack quality results in @a qr into a string @a into for
0153 /// peristent storage, such as network transfer or archival.
0154 void DQMNet::packQualityData(std::string &into, const QReports &qr) {
0155   char buf[64];
0156   std::ostringstream qrs;
0157   QReports::const_iterator qi, qe;
0158   for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi) {
0159     int pos = 0;
0160     sprintf(buf, "%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG + 2, qi->qtresult);
0161     qrs << buf << '\0' << buf + pos << '\0' << qi->qtname << '\0' << qi->algorithm << '\0' << qi->message << '\0'
0162         << '\0';
0163   }
0164   into = qrs.str();
0165 }
0166 
0167 /// Unpack the quality results from string @a from into @a qr.
0168 /// Assumes the data was saved with packQualityData().
0169 void DQMNet::unpackQualityData(QReports &qr, uint32_t &flags, const char *from) {
0170   const char *qdata = from;
0171 
0172   // Count how many qresults there are.
0173   size_t nqv = 0;
0174   while (*qdata) {
0175     ++nqv;
0176     while (*qdata)
0177       ++qdata;
0178     ++qdata;
0179     while (*qdata)
0180       ++qdata;
0181     ++qdata;
0182     while (*qdata)
0183       ++qdata;
0184     ++qdata;
0185     while (*qdata)
0186       ++qdata;
0187     ++qdata;
0188     while (*qdata)
0189       ++qdata;
0190     ++qdata;
0191   }
0192 
0193   // Now extract the qreports.
0194   qdata = from;
0195   qr.reserve(nqv);
0196   while (*qdata) {
0197     qr.emplace_back();
0198     DQMNet::QValue &qv = qr.back();
0199 
0200     qv.code = atoi(qdata);
0201     while (*qdata)
0202       ++qdata;
0203     switch (qv.code) {
0204       case dqm::qstatus::STATUS_OK:
0205         break;
0206       case dqm::qstatus::WARNING:
0207         flags |= DQMNet::DQM_PROP_REPORT_WARN;
0208         break;
0209       case dqm::qstatus::ERROR:
0210         flags |= DQMNet::DQM_PROP_REPORT_ERROR;
0211         break;
0212       default:
0213         flags |= DQMNet::DQM_PROP_REPORT_OTHER;
0214         break;
0215     }
0216 
0217     qv.qtresult = atof(++qdata);
0218     while (*qdata)
0219       ++qdata;
0220 
0221     qv.qtname = ++qdata;
0222     while (*qdata)
0223       ++qdata;
0224 
0225     qv.algorithm = ++qdata;
0226     while (*qdata)
0227       ++qdata;
0228 
0229     qv.message = ++qdata;
0230     while (*qdata)
0231       ++qdata;
0232     ++qdata;
0233   }
0234 }
0235 
0236 #if 0
0237 // Deserialise a ROOT object from a buffer at the current position.
0238 static TObject *
0239 extractNextObject(TBufferFile &buf)
0240 {
0241   if (buf.Length() == buf.BufferSize())
0242     return 0;
0243 
0244   buf.InitMap();
0245   Int_t pos = buf.Length();
0246   TClass *c = buf.ReadClass();
0247   buf.SetBufferOffset(pos);
0248   buf.ResetMap();
0249   return c ? buf.ReadObject(c) : 0;
0250 }
0251 
0252 // Reconstruct an object from the raw data.
0253 bool
0254 DQMNet::reconstructObject(Object &o)
0255 {
0256   TBufferFile buf(TBufferFile::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
0257   buf.Reset();
0258 
0259   // Extract the main object.
0260   if (! (o.object = extractNextObject(buf)))
0261     return false;
0262   
0263   // Extract the reference object.
0264   o.reference = extractNextObject(buf);
0265 
0266   // Extract quality reports.
0267   unpackQualityData(o.qreports, o.flags, o.qdata.c_str());
0268   return true;
0269 }
0270 #endif
0271 
0272 #if 0
0273 bool
0274 DQMNet::reinstateObject(DQMStore *store, Object &o)
0275 {
0276   if (! reconstructObject (o))
0277     return false;
0278 
0279   // Reconstruct the main object
0280   MonitorElement *obj = 0;
0281   store->setCurrentFolder(o.dirname);
0282   switch (o.flags & DQM_PROP_TYPE_MASK)
0283   {
0284   case DQM_PROP_TYPE_INT:
0285     obj = store->bookInt(o.objname);
0286     obj->Fill(atoll(o.scalar.c_str()));
0287     break;
0288 
0289   case DQM_PROP_TYPE_REAL:
0290     obj = store->bookFloat(name);
0291     obj->Fill(atof(o.scalar.c_str()));
0292     break;
0293 
0294   case DQM_PROP_TYPE_STRING:
0295     obj = store->bookString(name, o.scalar);
0296     break;
0297 
0298   case DQM_PROP_TYPE_TH1F:
0299     obj = store->book1D(name, dynamic_cast<TH1F *>(o.object));
0300     break;
0301 
0302   case DQM_PROP_TYPE_TH1S:
0303     obj = store->book1S(name, dynamic_cast<TH1S *>(o.object));
0304     break;
0305 
0306   case DQM_PROP_TYPE_TH1D:
0307     obj = store->book1DD(name, dynamic_cast<TH1D *>(o.object));
0308     break;
0309 
0310   case DQM_PROP_TYPE_TH1I:
0311     obj = store->book1I(name, dynamic_cast<TH1I *>(o.object));
0312     break;
0313 
0314   case DQM_PROP_TYPE_TH2F:
0315     obj = store->book2D(name, dynamic_cast<TH2F *>(o.object));
0316     break;
0317 
0318   case DQM_PROP_TYPE_TH2S:
0319     obj = store->book2S(name, dynamic_cast<TH2S *>(o.object));
0320     break;
0321 
0322   case DQM_PROP_TYPE_TH2D:
0323     obj = store->book2DD(name, dynamic_cast<TH2D *>(o.object));
0324     break;
0325 
0326  case DQM_PROP_TYPE_TH2I:
0327     obj = store->book2I(name, dynamic_cast<TH2I *>(o.object));
0328     break;
0329 
0330   case DQM_PROP_TYPE_TH2Poly:
0331     obj = store->book2DPoly(name, dynamic_cast<TH2Poly *>(o.object));
0332     break;
0333 
0334   case DQM_PROP_TYPE_TH3F:
0335     obj = store->book3D(name, dynamic_cast<TH3F *>(o.object));
0336     break;
0337 
0338   case DQM_PROP_TYPE_TH3S:
0339     obj = store->book3S(name, dynamic_cast<TH3S *>(o.object));
0340     break;
0341 
0342   case DQM_PROP_TYPE_TH3D:
0343     obj = store->book3DD(name, dynamic_cast<TH3D *>(o.object));
0344     break;
0345 
0346   case DQM_PROP_TYPE_PROF:
0347     obj = store->bookProfile(name, dynamic_cast<TProfile *>(o.object));
0348     break;
0349 
0350   case DQM_PROP_TYPE_PROF2D:
0351     obj = store->bookProfile2D(name, dynamic_cast<TProfile2D *>(o.object));
0352     break;
0353 
0354   default:
0355     logme()
0356       << "ERROR: unexpected monitor element of type "
0357       << (o.flags & DQM_PROP_TYPE_MASK) << " called '"
0358       << o.dirname << '/' << o.objname << "'\n";
0359     return false;
0360   }
0361 
0362   // Reconstruct tag and qreports.
0363   if (obj)
0364   {
0365     obj->data_.tag = o.tag;
0366     obj->data_.qreports = o.qreports;
0367   }
0368 
0369   // Inidicate success.
0370   return true;
0371 }
0372 #endif
0373 
0374 //////////////////////////////////////////////////////////////////////
0375 // Check if the network layer should stop.
0376 bool DQMNet::shouldStop() { return shutdown_; }
0377 
0378 // Once an object has been updated, this is invoked for all waiting
0379 // peers.  Send the requested object to the waiting peer.
0380 void DQMNet::releaseFromWait(Bucket *msg, WaitObject &w, Object *o) {
0381   if (o)
0382     sendObjectToPeer(msg, *o, true);
0383   else {
0384     uint32_t words[3];
0385     words[0] = sizeof(words) + w.name.size();
0386     words[1] = DQM_REPLY_NONE;
0387     words[2] = w.name.size();
0388 
0389     msg->data.reserve(msg->data.size() + words[0]);
0390     copydata(msg, &words[0], sizeof(words));
0391     copydata(msg, &w.name[0], w.name.size());
0392   }
0393 }
0394 
0395 // Send an object to a peer.  If not @a data, only sends a summary
0396 // without the object data, except the data is always sent for scalar
0397 // objects.
0398 void DQMNet::sendObjectToPeer(Bucket *msg, Object &o, bool data) {
0399   uint32_t flags = o.flags & ~DQM_PROP_DEAD;
0400   DataBlob objdata;
0401 
0402   if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
0403     objdata.insert(objdata.end(), &o.scalar[0], &o.scalar[0] + o.scalar.size());
0404   else if (data)
0405     objdata.insert(objdata.end(), &o.rawdata[0], &o.rawdata[0] + o.rawdata.size());
0406 
0407   uint32_t words[9];
0408   uint32_t namelen = o.dirname.size() + o.objname.size() + 1;
0409   uint32_t datalen = objdata.size();
0410   uint32_t qlen = o.qdata.size();
0411 
0412   if (o.dirname.empty())
0413     --namelen;
0414 
0415   words[0] = 9 * sizeof(uint32_t) + namelen + datalen + qlen;
0416   words[1] = DQM_REPLY_OBJECT;
0417   words[2] = flags;
0418   words[3] = (o.version >> 0) & 0xffffffff;
0419   words[4] = (o.version >> 32) & 0xffffffff;
0420   words[5] = o.tag;
0421   words[6] = namelen;
0422   words[7] = datalen;
0423   words[8] = qlen;
0424 
0425   msg->data.reserve(msg->data.size() + words[0]);
0426   copydata(msg, &words[0], 9 * sizeof(uint32_t));
0427   if (namelen) {
0428     copydata(msg, &(o.dirname)[0], o.dirname.size());
0429     if (!o.dirname.empty())
0430       copydata(msg, "/", 1);
0431     copydata(msg, &o.objname[0], o.objname.size());
0432   }
0433   if (datalen)
0434     copydata(msg, &objdata[0], datalen);
0435   if (qlen)
0436     copydata(msg, &o.qdata[0], qlen);
0437 }
0438 
0439 //////////////////////////////////////////////////////////////////////
0440 // Handle peer messages.
0441 bool DQMNet::onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len) {
0442   // Decode and process this message.
0443   uint32_t type;
0444   memcpy(&type, data + sizeof(uint32_t), sizeof(type));
0445   switch (type) {
0446     case DQM_MSG_UPDATE_ME: {
0447       if (len != 2 * sizeof(uint32_t)) {
0448         logme() << "ERROR: corrupt 'UPDATE_ME' message of length " << len << " from peer " << p->peeraddr << std::endl;
0449         return false;
0450       }
0451 
0452       if (debug_)
0453         logme() << "DEBUG: received message 'UPDATE ME' from peer " << p->peeraddr << ", size " << len << std::endl;
0454 
0455       p->update = true;
0456     }
0457       return true;
0458 
0459     case DQM_MSG_LIST_OBJECTS: {
0460       if (debug_)
0461         logme() << "DEBUG: received message 'LIST OBJECTS' from peer " << p->peeraddr << ", size " << len << std::endl;
0462 
0463       // Send over current status: list of known objects.
0464       sendObjectListToPeer(msg, true, false);
0465     }
0466       return true;
0467 
0468     case DQM_MSG_GET_OBJECT: {
0469       if (debug_)
0470         logme() << "DEBUG: received message 'GET OBJECT' from peer " << p->peeraddr << ", size " << len << std::endl;
0471 
0472       if (len < 3 * sizeof(uint32_t)) {
0473         logme() << "ERROR: corrupt 'GET IMAGE' message of length " << len << " from peer " << p->peeraddr << std::endl;
0474         return false;
0475       }
0476 
0477       uint32_t namelen;
0478       memcpy(&namelen, data + 2 * sizeof(uint32_t), sizeof(namelen));
0479       if (len != 3 * sizeof(uint32_t) + namelen) {
0480         logme() << "ERROR: corrupt 'GET OBJECT' message of length " << len << " from peer " << p->peeraddr
0481                 << ", expected length " << (3 * sizeof(uint32_t)) << " + " << namelen << std::endl;
0482         return false;
0483       }
0484 
0485       std::string name((char *)data + 3 * sizeof(uint32_t), namelen);
0486       Peer *owner = nullptr;
0487       Object *o = findObject(nullptr, name, &owner);
0488       if (o) {
0489         o->lastreq = Time::current().ns();
0490         if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE)) &&
0491             (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
0492           waitForData(p, name, "", owner);
0493         else
0494           sendObjectToPeer(msg, *o, true);
0495       } else {
0496         uint32_t words[3];
0497         words[0] = sizeof(words) + name.size();
0498         words[1] = DQM_REPLY_NONE;
0499         words[2] = name.size();
0500 
0501         msg->data.reserve(msg->data.size() + words[0]);
0502         copydata(msg, &words[0], sizeof(words));
0503         copydata(msg, &name[0], name.size());
0504       }
0505     }
0506       return true;
0507 
0508     case DQM_REPLY_LIST_BEGIN: {
0509       if (len != 4 * sizeof(uint32_t)) {
0510         logme() << "ERROR: corrupt 'LIST BEGIN' message of length " << len << " from peer " << p->peeraddr << std::endl;
0511         return false;
0512       }
0513 
0514       // Get the update status: whether this is a full update.
0515       uint32_t flags;
0516       memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
0517 
0518       if (debug_)
0519         logme() << "DEBUG: received message 'LIST BEGIN " << (flags ? "FULL" : "INCREMENTAL") << "' from "
0520                 << p->peeraddr << ", size " << len << std::endl;
0521 
0522       // If we are about to receive a full list of objects, flag all
0523       // objects as possibly dead.  Subsequent object notifications
0524       // will undo this for the live objects.  We cannot delete
0525       // objects quite yet, as we may get inquiry from another client
0526       // while we are processing the incoming list, so we keep the
0527       // objects tentatively alive as long as we've not seen the end.
0528       if (flags)
0529         markObjectsDead(p);
0530     }
0531       return true;
0532 
0533     case DQM_REPLY_LIST_END: {
0534       if (len != 4 * sizeof(uint32_t)) {
0535         logme() << "ERROR: corrupt 'LIST END' message of length " << len << " from peer " << p->peeraddr << std::endl;
0536         return false;
0537       }
0538 
0539       // Get the update status: whether this is a full update.
0540       uint32_t flags;
0541       memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
0542 
0543       // If we received a full list of objects, now purge all dead
0544       // objects. We need to do this in two stages in case we receive
0545       // updates in many parts, and end up sending updates to others in
0546       // between; this avoids us lying live objects are dead.
0547       if (flags)
0548         purgeDeadObjects(p);
0549 
0550       if (debug_)
0551         logme() << "DEBUG: received message 'LIST END " << (flags ? "FULL" : "INCREMENTAL") << "' from " << p->peeraddr
0552                 << ", size " << len << std::endl;
0553 
0554       // Indicate we have received another update from this peer.
0555       // Also indicate we should flush to our clients.
0556       flush_ = true;
0557       p->updates++;
0558     }
0559       return true;
0560 
0561     case DQM_REPLY_OBJECT: {
0562       uint32_t words[9];
0563       if (len < sizeof(words)) {
0564         logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr << std::endl;
0565         return false;
0566       }
0567 
0568       memcpy(&words[0], data, sizeof(words));
0569       uint32_t &namelen = words[6];
0570       uint32_t &datalen = words[7];
0571       uint32_t &qlen = words[8];
0572 
0573       if (len != sizeof(words) + namelen + datalen + qlen) {
0574         logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr
0575                 << ", expected length " << sizeof(words) << " + " << namelen << " + " << datalen << " + " << qlen
0576                 << std::endl;
0577         return false;
0578       }
0579 
0580       unsigned char *namedata = data + sizeof(words);
0581       unsigned char *objdata = namedata + namelen;
0582       unsigned char *qdata = objdata + datalen;
0583       unsigned char *enddata = qdata + qlen;
0584       std::string name((char *)namedata, namelen);
0585       assert(enddata == data + len);
0586 
0587       if (debug_)
0588         logme() << "DEBUG: received message 'OBJECT " << name << "' from " << p->peeraddr << ", size " << len
0589                 << std::endl;
0590 
0591       // Mark the peer as a known object source.
0592       p->source = true;
0593 
0594       // Initialise or update an object entry.
0595       Object *o = findObject(p, name);
0596       if (!o)
0597         o = makeObject(p, name);
0598 
0599       o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
0600       o->tag = words[5];
0601       o->version = ((uint64_t)words[4] << 32 | words[3]);
0602       o->scalar.clear();
0603       o->qdata.clear();
0604       if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR) {
0605         o->rawdata.clear();
0606         o->scalar.insert(o->scalar.end(), objdata, qdata);
0607       } else if (datalen) {
0608         o->rawdata.clear();
0609         o->rawdata.insert(o->rawdata.end(), objdata, qdata);
0610       } else if (!o->rawdata.empty())
0611         o->flags |= DQM_PROP_STALE;
0612       o->qdata.insert(o->qdata.end(), qdata, enddata);
0613 
0614       // If we had an object for this one already and this is a list
0615       // update without data, issue an immediate data get request.
0616       if (o->lastreq && !datalen && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
0617         requestObjectData(p, (namelen ? &name[0] : nullptr), namelen);
0618 
0619       // If we have the object data, release from wait.
0620       if (datalen)
0621         releaseWaiters(name, o);
0622     }
0623       return true;
0624 
0625     case DQM_REPLY_NONE: {
0626       uint32_t words[3];
0627       if (len < sizeof(words)) {
0628         logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr << std::endl;
0629         return false;
0630       }
0631 
0632       memcpy(&words[0], data, sizeof(words));
0633       uint32_t &namelen = words[2];
0634 
0635       if (len != sizeof(words) + namelen) {
0636         logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr
0637                 << ", expected length " << sizeof(words) << " + " << namelen << std::endl;
0638         return false;
0639       }
0640 
0641       unsigned char *namedata = data + sizeof(words);
0642       std::string name((char *)namedata, namelen);
0643 
0644       if (debug_)
0645         logme() << "DEBUG: received message 'NONE " << name << "' from " << p->peeraddr << ", size " << len
0646                 << std::endl;
0647 
0648       // Mark the peer as a known object source.
0649       p->source = true;
0650 
0651       // If this was a known object, kill it.
0652       if (Object *o = findObject(p, name)) {
0653         o->flags |= DQM_PROP_DEAD;
0654         purgeDeadObjects(p);
0655       }
0656 
0657       // If someone was waiting for this, let them go.
0658       releaseWaiters(name, nullptr);
0659     }
0660       return true;
0661 
0662     default:
0663       logme() << "ERROR: unrecognised message of length " << len << " and type " << type << " from peer " << p->peeraddr
0664               << std::endl;
0665       return false;
0666   }
0667 }
0668 
0669 //////////////////////////////////////////////////////////////////////
0670 /// Handle communication to a particular client.
0671 bool DQMNet::onPeerData(IOSelectEvent *ev, Peer *p) {
0672   lock();
0673   assert(getPeer(dynamic_cast<Socket *>(ev->source)) == p);
0674 
0675   // If there is a problem with the peer socket, discard the peer
0676   // and tell the selector to stop prcessing events for it.  If
0677   // this is a server connection, we will eventually recreate
0678   // everything if/when the data server comes back.
0679   if (ev->events & IOUrgent) {
0680     if (p->automatic) {
0681       logme() << "WARNING: connection to the DQM server at " << p->peeraddr
0682               << " lost (will attempt to reconnect in 15 seconds)\n";
0683       losePeer(nullptr, p, ev);
0684     } else
0685       losePeer("WARNING: lost peer connection ", p, ev);
0686 
0687     unlock();
0688     return true;
0689   }
0690 
0691   // If we can write to the peer socket, pump whatever we can into it.
0692   if (ev->events & IOWrite) {
0693     while (Bucket *b = p->sendq) {
0694       IOSize len = b->data.size() - p->sendpos;
0695       const void *data = (len ? (const void *)&b->data[p->sendpos] : (const void *)&data);
0696       IOSize done;
0697 
0698       try {
0699         done = (len ? ev->source->write(data, len) : 0);
0700         if (debug_ && len)
0701           logme() << "DEBUG: sent " << done << " bytes to peer " << p->peeraddr << std::endl;
0702       } catch (Error &e) {
0703         losePeer("WARNING: unable to write to peer ", p, ev, &e);
0704         unlock();
0705         return true;
0706       }
0707 
0708       p->sendpos += done;
0709       if (p->sendpos == b->data.size()) {
0710         Bucket *old = p->sendq;
0711         p->sendq = old->next;
0712         p->sendpos = 0;
0713         old->next = nullptr;
0714         discard(old);
0715       }
0716 
0717       if (!done && len)
0718         // Cannot write any more.
0719         break;
0720     }
0721   }
0722 
0723   // If there is data to be read from the peer, first receive what we
0724   // can get out the socket, the process all complete requests.
0725   if (ev->events & IORead) {
0726     // First build up the incoming buffer of data in the socket.
0727     // Remember the last size returned by the socket; we need
0728     // it to determine if the remote end closed the connection.
0729     IOSize sz;
0730     try {
0731       std::vector<unsigned char> buf(SOCKET_READ_SIZE);
0732       do
0733         if ((sz = ev->source->read(&buf[0], buf.size()))) {
0734           if (debug_)
0735             logme() << "DEBUG: received " << sz << " bytes from peer " << p->peeraddr << std::endl;
0736           DataBlob &data = p->incoming;
0737           if (data.capacity() < data.size() + sz)
0738             data.reserve(data.size() + SOCKET_READ_GROWTH);
0739           data.insert(data.end(), &buf[0], &buf[0] + sz);
0740         }
0741       while (sz == sizeof(buf));
0742     } catch (Error &e) {
0743       auto *next = dynamic_cast<SystemError *>(e.next());
0744       if (next && next->portable() == SysErr::ErrTryAgain)
0745         sz = 1;  // Ignore it, and fake no end of data.
0746       else {
0747         // Houston we have a problem.
0748         losePeer("WARNING: failed to read from peer ", p, ev, &e);
0749         unlock();
0750         return true;
0751       }
0752     }
0753 
0754     // Process fully received messages as long as we can.
0755     size_t consumed = 0;
0756     DataBlob &data = p->incoming;
0757     while (data.size() - consumed >= sizeof(uint32_t) && p->waiting < MAX_PEER_WAITREQS) {
0758       uint32_t msglen;
0759       memcpy(&msglen, &data[0] + consumed, sizeof(msglen));
0760 
0761       if (msglen >= MESSAGE_SIZE_LIMIT) {
0762         losePeer("WARNING: excessively large message from ", p, ev);
0763         unlock();
0764         return true;
0765       }
0766 
0767       if (data.size() - consumed >= msglen) {
0768         bool valid = true;
0769         if (msglen < 2 * sizeof(uint32_t)) {
0770           logme() << "ERROR: corrupt peer message of length " << msglen << " from peer " << p->peeraddr << std::endl;
0771           valid = false;
0772         } else {
0773           // Decode and process this message.
0774           Bucket msg;
0775           msg.next = nullptr;
0776           valid = onMessage(&msg, p, &data[0] + consumed, msglen);
0777 
0778           // If we created a response, chain it to the write queue.
0779           if (!msg.data.empty()) {
0780             Bucket **prev = &p->sendq;
0781             while (*prev)
0782               prev = &(*prev)->next;
0783 
0784             *prev = new Bucket;
0785             (*prev)->next = nullptr;
0786             (*prev)->data.swap(msg.data);
0787           }
0788         }
0789 
0790         if (!valid) {
0791           losePeer("WARNING: data stream error with ", p, ev);
0792           unlock();
0793           return true;
0794         }
0795 
0796         consumed += msglen;
0797       } else
0798         break;
0799     }
0800 
0801     data.erase(data.begin(), data.begin() + consumed);
0802 
0803     // If the client has closed the connection, shut down our end.  If
0804     // we have something to send back still, leave the write direction
0805     // open.  Otherwise close the shop for this client.
0806     if (sz == 0)
0807       sel_.setMask(p->socket, p->mask &= ~IORead);
0808   }
0809 
0810   // Yes, please keep processing events for this socket.
0811   unlock();
0812   return false;
0813 }
0814 
0815 /** Respond to new connections on the server socket.  Accepts the
0816     connection and creates a new socket for the peer, and sets it up
0817     for further communication.  Returns @c false always to tell the
0818     IOSelector to keep processing events for the server socket.  */
0819 bool DQMNet::onPeerConnect(IOSelectEvent *ev) {
0820   // Recover the server socket.
0821   assert(ev->source == server_);
0822 
0823   // Accept the connection.
0824   Socket *s = server_->accept();
0825   assert(s);
0826   assert(!s->isBlocking());
0827 
0828   // Record it to our list of peers.
0829   lock();
0830   Peer *p = createPeer(s);
0831   std::string localaddr;
0832   if (auto *inet = dynamic_cast<InetSocket *>(s)) {
0833     InetAddress peeraddr = inet->peername();
0834     InetAddress myaddr = inet->sockname();
0835     p->peeraddr = fmt::format("{}:{}", peeraddr.hostname(), peeraddr.port());
0836     localaddr = fmt::format("{}:{}", myaddr.hostname(), myaddr.port());
0837   } else if (auto *local = dynamic_cast<LocalSocket *>(s)) {
0838     p->peeraddr = local->peername().path();
0839     localaddr = local->sockname().path();
0840   } else
0841     assert(false);
0842 
0843   p->mask = IORead | IOUrgent;
0844   p->socket = s;
0845 
0846   // Report the new connection.
0847   if (debug_)
0848     logme() << "INFO: new peer " << p->peeraddr << " is now connected to " << localaddr << std::endl;
0849 
0850   // Attach it to the listener.
0851   sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
0852   unlock();
0853 
0854   // We are never done.
0855   return false;
0856 }
0857 
0858 /** React to notifications from the DQM thread.  This is a simple
0859     message to tell this thread to wake up and send unsollicited
0860     updates to the peers when new DQM data appears.  We don't send
0861     the updates here, but just set a flag to tell the main event
0862     pump to send a notification later.  This avoids sending
0863     unnecessarily frequent DQM object updates.  */
0864 bool DQMNet::onLocalNotify(IOSelectEvent *ev) {
0865   // Discard the data in the pipe, we care only about the wakeup.
0866   try {
0867     unsigned char buf[1024];
0868     while ((ev->source->read(buf, sizeof(buf))))
0869       ;
0870   } catch (Error &e) {
0871     auto *next = dynamic_cast<SystemError *>(e.next());
0872     if (next && next->portable() == SysErr::ErrTryAgain)
0873       ;  // Ignore it
0874     else
0875       logme() << "WARNING: error reading from notification pipe: " << e.explain() << std::endl;
0876   }
0877 
0878   // Tell the main event pump to send an update in a little while.
0879   flush_ = true;
0880 
0881   // We are never done, always keep going.
0882   return false;
0883 }
0884 
0885 /// Update the selector mask for a peer based on data queues.  Close
0886 /// the connection if there is no reason to maintain it open.
0887 void DQMNet::updateMask(Peer *p) {
0888   if (!p->socket)
0889     return;
0890 
0891   // Listen to writes iff we have data to send.
0892   unsigned oldmask = p->mask;
0893   if (!p->sendq && (p->mask & IOWrite))
0894     sel_.setMask(p->socket, p->mask &= ~IOWrite);
0895 
0896   if (p->sendq && !(p->mask & IOWrite))
0897     sel_.setMask(p->socket, p->mask |= IOWrite);
0898 
0899   if (debug_ && oldmask != p->mask)
0900     logme() << "DEBUG: updating mask for " << p->peeraddr << " to " << p->mask << " from " << oldmask << std::endl;
0901 
0902   // If we have nothing more to send and are no longer listening
0903   // for reads, close up the shop for this peer.
0904   if (p->mask == IOUrgent && !p->waiting) {
0905     assert(!p->sendq);
0906     if (debug_)
0907       logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
0908     losePeer(nullptr, p, nullptr);
0909   }
0910 }
0911 
0912 //////////////////////////////////////////////////////////////////////
0913 DQMNet::DQMNet(const std::string &appname /* = "" */)
0914     : debug_(false),
0915       appname_(appname.empty() ? "DQMNet" : appname.c_str()),
0916       pid_(getpid()),
0917       server_(nullptr),
0918       version_(Time::current()),
0919       communicate_((pthread_t)-1),
0920       shutdown_(0),
0921       delay_(1000),
0922       waitStale_(0, 0, 0, 0, 500000000 /* 500 ms */),
0923       waitMax_(0, 0, 0, 5 /* seconds */, 0),
0924       flush_(false) {
0925   // Create a pipe for the local DQM to tell the communicator
0926   // thread that local DQM data has changed and that the peers
0927   // should be notified.
0928   fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
0929   sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
0930 
0931   // Initialise the upstream and downstream to empty.
0932   upstream_.peer = downstream_.peer = nullptr;
0933   upstream_.next = downstream_.next = 0;
0934   upstream_.port = downstream_.port = 0;
0935   upstream_.update = downstream_.update = false;
0936 }
0937 
0938 DQMNet::~DQMNet() {
0939   // FIXME
0940 }
0941 
0942 /// Enable or disable verbose debugging.  Must be called before
0943 /// calling run() or start().
0944 void DQMNet::debug(bool doit) { debug_ = doit; }
0945 
0946 /// Set the I/O dispatching delay.  Must be called before calling
0947 /// run() or start().
0948 void DQMNet::delay(int delay) { delay_ = delay; }
0949 
0950 /// Set the time limit for waiting updates to stale objects.
0951 /// Once limit has been exhausted whatever data exists is returned.
0952 /// Applies only when data has been received, another time limit is
0953 /// applied when no data payload has been received at all.
0954 void DQMNet::staleObjectWaitLimit(lat::TimeSpan time) { waitStale_ = time; }
0955 
0956 /// Start a server socket for accessing this DQM node remotely.  Must
0957 /// be called before calling run() or start().  May throw an Exception
0958 /// if the server socket cannot be initialised.
0959 void DQMNet::startLocalServer(int port) {
0960   if (server_) {
0961     logme() << "ERROR: DQM server was already started.\n";
0962     return;
0963   }
0964 
0965   try {
0966     InetAddress addr("0.0.0.0", port);
0967     auto *s = new InetSocket(SOCK_STREAM, 0, addr.family());
0968     s->bind(addr);
0969     s->listen(10);
0970     s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
0971     s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
0972     s->setBlocking(false);
0973     sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
0974   } catch (Error &e) {
0975     // FIXME: Do we need to do this when we throw an exception anyway?
0976     // FIXME: Abort instead?
0977     logme() << "ERROR: Failed to start server at port " << port << ": " << e.explain() << std::endl;
0978 
0979     throw cms::Exception("DQMNet::startLocalServer") << "Failed to start server at port " <<
0980 
0981         port << ": " << e.explain().c_str();
0982   }
0983 
0984   logme() << "INFO: DQM server started at port " << port << std::endl;
0985 }
0986 
0987 /// Start a server socket for accessing this DQM node over a file
0988 /// system socket.  Must be called before calling run() or start().
0989 /// May throw an Exception if the server socket cannot be initialised.
0990 void DQMNet::startLocalServer(const char *path) {
0991   if (server_) {
0992     logme() << "ERROR: DQM server was already started.\n";
0993     return;
0994   }
0995 
0996   try {
0997     server_ = new LocalServerSocket(path, 10);
0998     server_->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
0999     server_->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1000     server_->setBlocking(false);
1001     sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
1002   } catch (Error &e) {
1003     // FIXME: Do we need to do this when we throw an exception anyway?
1004     // FIXME: Abort instead?
1005     logme() << "ERROR: Failed to start server at path " << path << ": " << e.explain() << std::endl;
1006 
1007     throw cms::Exception("DQMNet::startLocalServer")
1008         << "Failed to start server at path " << path << ": " << e.explain().c_str();
1009   }
1010 
1011   logme() << "INFO: DQM server started at path " << path << std::endl;
1012 }
1013 
1014 /// Tell the network layer to connect to @a host and @a port and
1015 /// automatically send updates whenever local DQM data changes.  Must
1016 /// be called before calling run() or start().
1017 void DQMNet::updateToCollector(const std::string &host, int port) {
1018   if (!downstream_.host.empty()) {
1019     logme() << "ERROR: Already updating another collector at " << downstream_.host << ":" << downstream_.port
1020             << std::endl;
1021     return;
1022   }
1023 
1024   downstream_.update = true;
1025   downstream_.host = host;
1026   downstream_.port = port;
1027 }
1028 
1029 /// Tell the network layer to connect to @a host and @a port and
1030 /// automatically receive updates from upstream DQM sources.  Must be
1031 /// called before calling run() or start().
1032 void DQMNet::listenToCollector(const std::string &host, int port) {
1033   if (!upstream_.host.empty()) {
1034     logme() << "ERROR: Already receiving data from another collector at " << upstream_.host << ":" << upstream_.port
1035             << std::endl;
1036     return;
1037   }
1038 
1039   upstream_.update = false;
1040   upstream_.host = host;
1041   upstream_.port = port;
1042 }
1043 
1044 /// Stop the network layer and wait it to finish.
1045 void DQMNet::shutdown() {
1046   shutdown_ = 1;
1047   if (communicate_ != (pthread_t)-1)
1048     pthread_join(communicate_, nullptr);
1049 }
1050 
1051 /** A thread to communicate with the distributed memory cache peers.
1052     All this does is run the loop to respond to new connections.
1053     Much of the actual work is done when a new connection is
1054     received, and in pumping data around in response to actual
1055     requests.  */
1056 static void *communicate(void *obj) {
1057   sigset_t sigs;
1058   sigfillset(&sigs);
1059   pthread_sigmask(SIG_BLOCK, &sigs, nullptr);
1060   ((DQMNet *)obj)->run();
1061   return nullptr;
1062 }
1063 
1064 /// Acquire a lock on the DQM net layer.
1065 void DQMNet::lock() {
1066   if (communicate_ != (pthread_t)-1)
1067     pthread_mutex_lock(&lock_);
1068 }
1069 
1070 /// Release the lock on the DQM net layer.
1071 void DQMNet::unlock() {
1072   if (communicate_ != (pthread_t)-1)
1073     pthread_mutex_unlock(&lock_);
1074 }
1075 
1076 /// Start running the network layer in a new thread.  This is an
1077 /// exclusive alternative to the run() method, which runs the network
1078 /// layer in the caller's thread.
1079 void DQMNet::start() {
1080   if (communicate_ != (pthread_t)-1) {
1081     logme() << "ERROR: DQM networking thread has already been started\n";
1082     return;
1083   }
1084 
1085   pthread_mutex_init(&lock_, nullptr);
1086   pthread_create(&communicate_, nullptr, &communicate, this);
1087 }
1088 
1089 /** Run the actual I/O processing loop. */
1090 void DQMNet::run() {
1091   Time now;
1092   Time nextFlush = 0;
1093   AutoPeer *automatic[2] = {&upstream_, &downstream_};
1094 
1095   // Perform I/O.  Every once in a while flush updates to peers.
1096   while (!shouldStop()) {
1097     for (auto ap : automatic) {
1098       // If we need a server connection and don't have one yet,
1099       // initiate asynchronous connection creation.  Swallow errors
1100       // in case the server won't talk to us.
1101       if (!ap->host.empty() && !ap->peer && (now = Time::current()) > ap->next) {
1102         ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1103         InetSocket *s = nullptr;
1104         try {
1105           InetAddress addr(ap->host.c_str(), ap->port);
1106           s = new InetSocket(SOCK_STREAM, 0, addr.family());
1107           s->setBlocking(false);
1108           s->connect(addr);
1109           s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1110           s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1111         } catch (Error &e) {
1112           auto *sys = dynamic_cast<SystemError *>(e.next());
1113           if (!sys || sys->portable() != SysErr::ErrOperationInProgress) {
1114             // "In progress" just means the connection is in progress.
1115             // The connection is ready when the socket is writeable.
1116             // Anything else is a real problem.
1117             if (s)
1118               s->abort();
1119             delete s;
1120             s = nullptr;
1121           }
1122         }
1123 
1124         // Set up with the selector if we were successful.  If this is
1125         // the upstream collector, queue a request for updates.
1126         if (s) {
1127           Peer *p = createPeer(s);
1128           ap->peer = p;
1129 
1130           InetAddress peeraddr = ((InetSocket *)s)->peername();
1131           InetAddress myaddr = ((InetSocket *)s)->sockname();
1132           p->peeraddr = fmt::format("{}:{}", peeraddr.hostname(), peeraddr.port());
1133           p->mask = IORead | IOWrite | IOUrgent;
1134           p->update = ap->update;
1135           p->automatic = ap;
1136           p->socket = s;
1137           sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
1138           if (ap == &upstream_) {
1139             uint32_t words[4] = {2 * sizeof(uint32_t), DQM_MSG_LIST_OBJECTS, 2 * sizeof(uint32_t), DQM_MSG_UPDATE_ME};
1140             p->sendq = new Bucket;
1141             p->sendq->next = nullptr;
1142             copydata(p->sendq, words, sizeof(words));
1143           }
1144 
1145           // Report the new connection.
1146           if (debug_)
1147             logme() << "INFO: now connected to " << p->peeraddr << " from " << myaddr.hostname() << ":" << myaddr.port()
1148                     << std::endl;
1149         }
1150       }
1151     }
1152 
1153     // Pump events for a while.
1154     sel_.dispatch(delay_);
1155     now = Time::current();
1156     lock();
1157 
1158     // Check if flush is required.  Flush only if one is needed.
1159     // Always sends the full object list, but only rarely.
1160     if (flush_ && now > nextFlush) {
1161       flush_ = false;
1162       nextFlush = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1163       sendObjectListToPeers(true);
1164     }
1165 
1166     // Update the data server and peer selection masks.  If we
1167     // have no more data to send and listening for writes, remove
1168     // the write mask.  If we have something to write and aren't
1169     // listening for writes, start listening so we can send off
1170     // the data.
1171     updatePeerMasks();
1172 
1173     // Release peers that have been waiting for data for too long.
1174     Time waitold = now - waitMax_;
1175     Time waitstale = now - waitStale_;
1176     for (auto i = waiting_.begin(), e = waiting_.end(); i != e;) {
1177       Object *o = findObject(nullptr, i->name);
1178 
1179       // If we have (stale) object data, wait only up to stale limit.
1180       // Otherwise if we have no data at all, wait up to the max limit.
1181       if (i->time < waitold) {
1182         logme() << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9) << "s to retrieval, releasing '"
1183                 << i->name << "' from wait, have " << (o ? o->rawdata.size() : 0) << " data available\n";
1184         releaseFromWait(i++, o);
1185       } else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE)) {
1186         logme() << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9) << "s to update, releasing '"
1187                 << i->name << "' from wait, have " << o->rawdata.size() << " data available\n";
1188         releaseFromWait(i++, o);
1189       }
1190 
1191       // Keep it for now.
1192       else
1193         ++i;
1194     }
1195 
1196     unlock();
1197   }
1198 }
1199 
1200 // Tell the network cache that there have been local changes that
1201 // should be advertised to the downstream listeners.
1202 void DQMNet::sendLocalChanges() {
1203   char byte = 0;
1204   wakeup_.sink()->write(&byte, 1);
1205 }
1206 
1207 //////////////////////////////////////////////////////////////////////
1208 //////////////////////////////////////////////////////////////////////
1209 //////////////////////////////////////////////////////////////////////
1210 DQMBasicNet::DQMBasicNet(const std::string &appname /* = "" */) : DQMImplNet<DQMNet::Object>(appname) {
1211   local_ = static_cast<ImplPeer *>(createPeer((Socket *)-1));
1212 }
1213 
1214 /// Give a hint of how much capacity to allocate for local objects.
1215 void DQMBasicNet::reserveLocalSpace(uint32_t size) { local_->objs.reserve(size); }
1216 
1217 /// Update the network cache for an object.  The caller must call
1218 /// sendLocalChanges() later to push out the changes.
1219 void DQMBasicNet::updateLocalObject(Object &o) {
1220   o.dirname = *local_->dirs.insert(o.dirname).first;
1221   std::pair<ObjectMap::iterator, bool> info(local_->objs.insert(o));
1222   if (!info.second) {
1223     // Somewhat hackish. Sets are supposedly immutable, but we
1224     // need to change the non-key parts of the object. Erasing
1225     // and re-inserting would produce too much memory churn.
1226     auto &old = const_cast<Object &>(*info.first);
1227     std::swap(old.flags, o.flags);
1228     std::swap(old.tag, o.tag);
1229     std::swap(old.version, o.version);
1230     std::swap(old.qreports, o.qreports);
1231     std::swap(old.rawdata, o.rawdata);
1232     std::swap(old.scalar, o.scalar);
1233     std::swap(old.qdata, o.qdata);
1234   }
1235 }
1236 
1237 /// Delete all local objects not in @a known.  Returns true if
1238 /// something was removed.  The caller must call sendLocalChanges()
1239 /// later to push out the changes.
1240 bool DQMBasicNet::removeLocalExcept(const std::set<std::string> &known) {
1241   size_t removed = 0;
1242   std::string path;
1243   ObjectMap::iterator i, e;
1244   for (i = local_->objs.begin(), e = local_->objs.end(); i != e;) {
1245     path.clear();
1246     path.reserve(i->dirname.size() + i->objname.size() + 2);
1247     path += i->dirname;
1248     if (!path.empty())
1249       path += '/';
1250     path += i->objname;
1251 
1252     if (!known.count(path))
1253       ++removed, local_->objs.erase(i++);
1254     else
1255       ++i;
1256   }
1257 
1258   return removed > 0;
1259 }