Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:10:10

0001 #include "DQMServices/Core/interface/DQMNet.h"
0002 #include "classlib/iobase/InetServerSocket.h"
0003 #include "classlib/iobase/LocalServerSocket.h"
0004 #include "classlib/sysapi/InetSocket.h"  // for completing InetAddress
0005 #include "classlib/utils/SystemError.h"
0006 #include "classlib/utils/Regexp.h"
0007 #include <fmt/format.h>
0008 #include <unistd.h>
0009 #include <fcntl.h>
0010 #include <sys/wait.h>
0011 #include <cstdio>
0012 #include <cstdint>
0013 #include <iostream>
0014 #include <sstream>
0015 #include <cassert>
0016 #include <cfloat>
0017 #include <cinttypes>
0018 
0019 #include "FWCore/Utilities/interface/EDMException.h"
0020 
// Socket buffer size and the ceiling on the length of a single peer
// message.  Both are 1 MiB on Apple platforms and 8 MiB elsewhere.
#if __APPLE__
#define SOCKET_BUF_SIZE (1 * 1024 * 1024)
#define MESSAGE_SIZE_LIMIT (1 * 1024 * 1024)
#else
#define SOCKET_BUF_SIZE (8 * 1024 * 1024)
#define MESSAGE_SIZE_LIMIT (8 * 1024 * 1024)
#endif
// Amount by which a peer's incoming buffer is grown when it runs out of room.
#define SOCKET_READ_GROWTH (SOCKET_BUF_SIZE)
// Size of the scratch buffer used for one socket read.
#define SOCKET_READ_SIZE (SOCKET_BUF_SIZE / 8)
0030 
0031 using namespace lat;
0032 
0033 static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>");
0034 
// TODO: Can't include the header file since that leads to ambiguities.
// Local copies of the quality-test status codes that
// unpackQualityData() translates into report flags.
namespace dqm {
  namespace qstatus {
    constexpr int STATUS_OK = 100;  //< Test was successful.
    constexpr int WARNING = 200;    //< Test had some problems.
    constexpr int ERROR = 300;      //< Test has failed.
  }  // namespace qstatus
}  // namespace dqm
0043 
0044 //////////////////////////////////////////////////////////////////////
0045 // Generate log prefix.
0046 std::ostream &DQMNet::logme() {
0047   Time now = Time::current();
0048   return std::cout << now.format(true, "%Y-%m-%d %H:%M:%S.") << now.nanoformat(3, 3) << " " << appname_ << "[" << pid_
0049                    << "]: ";
0050 }
0051 
0052 // Append data into a bucket.
0053 void DQMNet::copydata(Bucket *b, const void *data, size_t len) {
0054   b->data.insert(b->data.end(), (const unsigned char *)data, (const unsigned char *)data + len);
0055 }
0056 
0057 // Discard a bucket chain.
0058 void DQMNet::discard(Bucket *&b) {
0059   while (b) {
0060     Bucket *next = b->next;
0061     delete b;
0062     b = next;
0063   }
0064 }
0065 
0066 //////////////////////////////////////////////////////////////////////
0067 /** Handle errors with a peer socket.  Zaps the socket send queue,
0068     the socket itself, detaches the socket from the selector, and
0069     purges any pending wait requests linked to the socket.  */
0070 void DQMNet::losePeer(const char *reason, Peer *peer, IOSelectEvent *ev, Error *err) {
0071   if (reason)
0072     logme() << reason << peer->peeraddr << (err ? "; error was: " + err->explain() : std::string("")) << std::endl;
0073 
0074   Socket *s = peer->socket;
0075 
0076   for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
0077     if (i->peer == peer)
0078       waiting_.erase(i++);
0079     else
0080       ++i;
0081 
0082   if (ev)
0083     ev->source = nullptr;
0084 
0085   discard(peer->sendq);
0086   if (peer->automatic)
0087     peer->automatic->peer = nullptr;
0088 
0089   sel_.detach(s);
0090   s->close();
0091   removePeer(peer, s);
0092   delete s;
0093 }
0094 
0095 /// Queue an object request to the data server.
0096 void DQMNet::requestObjectData(Peer *p, const char *name, size_t len) {
0097   // Issue request to peer.
0098   Bucket **msg = &p->sendq;
0099   while (*msg)
0100     msg = &(*msg)->next;
0101   *msg = new Bucket;
0102   (*msg)->next = nullptr;
0103 
0104   uint32_t words[3];
0105   words[0] = sizeof(words) + len;
0106   words[1] = DQM_MSG_GET_OBJECT;
0107   words[2] = len;
0108   copydata(*msg, words, sizeof(words));
0109   copydata(*msg, name, len);
0110 }
0111 
0112 /// Queue a request for an object and put a peer into the mode of
0113 /// waiting for object data to appear.
0114 void DQMNet::waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner) {
0115   // FIXME: Should we automatically record which exact peer the waiter
0116   // is expecting to deliver data so we know to release the waiter if
0117   // the other peer vanishes?  The current implementation stands a
0118   // chance for the waiter to wait indefinitely -- although we do
0119   // force terminate the wait after a while.
0120   requestObjectData(owner, !name.empty() ? &name[0] : nullptr, name.size());
0121   WaitObject wo = {Time::current(), name, info, p};
0122   waiting_.push_back(wo);
0123   p->waiting++;
0124 }
0125 
0126 // Once an object has been updated, this is invoked for all waiting
0127 // peers.  Send the object back to the peer in suitable form.
0128 void DQMNet::releaseFromWait(WaitList::iterator i, Object *o) {
0129   Bucket **msg = &i->peer->sendq;
0130   while (*msg)
0131     msg = &(*msg)->next;
0132   *msg = new Bucket;
0133   (*msg)->next = nullptr;
0134 
0135   releaseFromWait(*msg, *i, o);
0136 
0137   assert(i->peer->waiting > 0);
0138   i->peer->waiting--;
0139   waiting_.erase(i);
0140 }
0141 
0142 // Release everyone waiting for the object @a o.
0143 void DQMNet::releaseWaiters(const std::string &name, Object *o) {
0144   for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
0145     if (i->name == name)
0146       releaseFromWait(i++, o);
0147     else
0148       ++i;
0149 }
0150 
0151 //////////////////////////////////////////////////////////////////////
0152 /// Pack quality results in @a qr into a string @a into for
0153 /// peristent storage, such as network transfer or archival.
0154 void DQMNet::packQualityData(std::string &into, const QReports &qr) {
0155   char buf[64];
0156   std::ostringstream qrs;
0157   QReports::const_iterator qi, qe;
0158   for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi) {
0159     int pos = 0;
0160     sprintf(buf, "%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG + 2, qi->qtresult);
0161     qrs << buf << '\0' << buf + pos << '\0' << qi->qtname << '\0' << qi->algorithm << '\0' << qi->message << '\0'
0162         << '\0';
0163   }
0164   into = qrs.str();
0165 }
0166 
0167 /// Unpack the quality results from string @a from into @a qr.
0168 /// Assumes the data was saved with packQualityData().
0169 void DQMNet::unpackQualityData(QReports &qr, uint32_t &flags, const char *from) {
0170   const char *qdata = from;
0171 
0172   // Count how many qresults there are.
0173   size_t nqv = 0;
0174   while (*qdata) {
0175     ++nqv;
0176     while (*qdata)
0177       ++qdata;
0178     ++qdata;
0179     while (*qdata)
0180       ++qdata;
0181     ++qdata;
0182     while (*qdata)
0183       ++qdata;
0184     ++qdata;
0185     while (*qdata)
0186       ++qdata;
0187     ++qdata;
0188     while (*qdata)
0189       ++qdata;
0190     ++qdata;
0191   }
0192 
0193   // Now extract the qreports.
0194   qdata = from;
0195   qr.reserve(nqv);
0196   while (*qdata) {
0197     qr.emplace_back();
0198     DQMNet::QValue &qv = qr.back();
0199 
0200     qv.code = atoi(qdata);
0201     while (*qdata)
0202       ++qdata;
0203     switch (qv.code) {
0204       case dqm::qstatus::STATUS_OK:
0205         break;
0206       case dqm::qstatus::WARNING:
0207         flags |= DQMNet::DQM_PROP_REPORT_WARN;
0208         break;
0209       case dqm::qstatus::ERROR:
0210         flags |= DQMNet::DQM_PROP_REPORT_ERROR;
0211         break;
0212       default:
0213         flags |= DQMNet::DQM_PROP_REPORT_OTHER;
0214         break;
0215     }
0216 
0217     qv.qtresult = atof(++qdata);
0218     while (*qdata)
0219       ++qdata;
0220 
0221     qv.qtname = ++qdata;
0222     while (*qdata)
0223       ++qdata;
0224 
0225     qv.algorithm = ++qdata;
0226     while (*qdata)
0227       ++qdata;
0228 
0229     qv.message = ++qdata;
0230     while (*qdata)
0231       ++qdata;
0232     ++qdata;
0233   }
0234 }
0235 
0236 #if 0
0237 // Deserialise a ROOT object from a buffer at the current position.
0238 static TObject *
0239 extractNextObject(TBufferFile &buf)
0240 {
0241   if (buf.Length() == buf.BufferSize())
0242     return 0;
0243 
0244   buf.InitMap();
0245   Int_t pos = buf.Length();
0246   TClass *c = buf.ReadClass();
0247   buf.SetBufferOffset(pos);
0248   buf.ResetMap();
0249   return c ? buf.ReadObject(c) : 0;
0250 }
0251 
0252 // Reconstruct an object from the raw data.
0253 bool
0254 DQMNet::reconstructObject(Object &o)
0255 {
0256   TBufferFile buf(TBufferFile::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
0257   buf.Reset();
0258 
0259   // Extract the main object.
0260   if (! (o.object = extractNextObject(buf)))
0261     return false;
0262   
0263   // Extract the reference object.
0264   o.reference = extractNextObject(buf);
0265 
0266   // Extract quality reports.
0267   unpackQualityData(o.qreports, o.flags, o.qdata.c_str());
0268   return true;
0269 }
0270 #endif
0271 
0272 #if 0
0273 bool
0274 DQMNet::reinstateObject(DQMStore *store, Object &o)
0275 {
0276   if (! reconstructObject (o))
0277     return false;
0278 
0279   // Reconstruct the main object
0280   MonitorElement *obj = 0;
0281   store->setCurrentFolder(o.dirname);
0282   switch (o.flags & DQM_PROP_TYPE_MASK)
0283   {
0284   case DQM_PROP_TYPE_INT:
0285     obj = store->bookInt(o.objname);
0286     obj->Fill(atoll(o.scalar.c_str()));
0287     break;
0288 
0289   case DQM_PROP_TYPE_REAL:
0290     obj = store->bookFloat(name);
0291     obj->Fill(atof(o.scalar.c_str()));
0292     break;
0293 
0294   case DQM_PROP_TYPE_STRING:
0295     obj = store->bookString(name, o.scalar);
0296     break;
0297 
0298   case DQM_PROP_TYPE_TH1F:
0299     obj = store->book1D(name, dynamic_cast<TH1F *>(o.object));
0300     break;
0301 
0302   case DQM_PROP_TYPE_TH1S:
0303     obj = store->book1S(name, dynamic_cast<TH1S *>(o.object));
0304     break;
0305 
0306   case DQM_PROP_TYPE_TH1D:
0307     obj = store->book1DD(name, dynamic_cast<TH1D *>(o.object));
0308     break;
0309 
0310   case DQM_PROP_TYPE_TH1I:
0311     obj = store->book1I(name, dynamic_cast<TH1I *>(o.object));
0312     break;
0313 
0314   case DQM_PROP_TYPE_TH2F:
0315     obj = store->book2D(name, dynamic_cast<TH2F *>(o.object));
0316     break;
0317 
0318   case DQM_PROP_TYPE_TH2S:
0319     obj = store->book2S(name, dynamic_cast<TH2S *>(o.object));
0320     break;
0321 
0322   case DQM_PROP_TYPE_TH2D:
0323     obj = store->book2DD(name, dynamic_cast<TH2D *>(o.object));
0324     break;
0325 
0326  case DQM_PROP_TYPE_TH2I:
0327     obj = store->book2I(name, dynamic_cast<TH2I *>(o.object));
0328     break;
0329 
0330   case DQM_PROP_TYPE_TH3F:
0331     obj = store->book3D(name, dynamic_cast<TH3F *>(o.object));
0332     break;
0333 
0334   case DQM_PROP_TYPE_TH3S:
0335     obj = store->book3S(name, dynamic_cast<TH3S *>(o.object));
0336     break;
0337 
0338   case DQM_PROP_TYPE_TH3D:
0339     obj = store->book3DD(name, dynamic_cast<TH3D *>(o.object));
0340     break;
0341 
0342   case DQM_PROP_TYPE_PROF:
0343     obj = store->bookProfile(name, dynamic_cast<TProfile *>(o.object));
0344     break;
0345 
0346   case DQM_PROP_TYPE_PROF2D:
0347     obj = store->bookProfile2D(name, dynamic_cast<TProfile2D *>(o.object));
0348     break;
0349 
0350   default:
0351     logme()
0352       << "ERROR: unexpected monitor element of type "
0353       << (o.flags & DQM_PROP_TYPE_MASK) << " called '"
0354       << o.dirname << '/' << o.objname << "'\n";
0355     return false;
0356   }
0357 
0358   // Reconstruct tag and qreports.
0359   if (obj)
0360   {
0361     obj->data_.tag = o.tag;
0362     obj->data_.qreports = o.qreports;
0363   }
0364 
0365   // Inidicate success.
0366   return true;
0367 }
0368 #endif
0369 
0370 //////////////////////////////////////////////////////////////////////
0371 // Check if the network layer should stop.
0372 bool DQMNet::shouldStop() { return shutdown_; }
0373 
0374 // Once an object has been updated, this is invoked for all waiting
0375 // peers.  Send the requested object to the waiting peer.
0376 void DQMNet::releaseFromWait(Bucket *msg, WaitObject &w, Object *o) {
0377   if (o)
0378     sendObjectToPeer(msg, *o, true);
0379   else {
0380     uint32_t words[3];
0381     words[0] = sizeof(words) + w.name.size();
0382     words[1] = DQM_REPLY_NONE;
0383     words[2] = w.name.size();
0384 
0385     msg->data.reserve(msg->data.size() + words[0]);
0386     copydata(msg, &words[0], sizeof(words));
0387     copydata(msg, &w.name[0], w.name.size());
0388   }
0389 }
0390 
0391 // Send an object to a peer.  If not @a data, only sends a summary
0392 // without the object data, except the data is always sent for scalar
0393 // objects.
0394 void DQMNet::sendObjectToPeer(Bucket *msg, Object &o, bool data) {
0395   uint32_t flags = o.flags & ~DQM_PROP_DEAD;
0396   DataBlob objdata;
0397 
0398   if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
0399     objdata.insert(objdata.end(), &o.scalar[0], &o.scalar[0] + o.scalar.size());
0400   else if (data)
0401     objdata.insert(objdata.end(), &o.rawdata[0], &o.rawdata[0] + o.rawdata.size());
0402 
0403   uint32_t words[9];
0404   uint32_t namelen = o.dirname.size() + o.objname.size() + 1;
0405   uint32_t datalen = objdata.size();
0406   uint32_t qlen = o.qdata.size();
0407 
0408   if (o.dirname.empty())
0409     --namelen;
0410 
0411   words[0] = 9 * sizeof(uint32_t) + namelen + datalen + qlen;
0412   words[1] = DQM_REPLY_OBJECT;
0413   words[2] = flags;
0414   words[3] = (o.version >> 0) & 0xffffffff;
0415   words[4] = (o.version >> 32) & 0xffffffff;
0416   words[5] = o.tag;
0417   words[6] = namelen;
0418   words[7] = datalen;
0419   words[8] = qlen;
0420 
0421   msg->data.reserve(msg->data.size() + words[0]);
0422   copydata(msg, &words[0], 9 * sizeof(uint32_t));
0423   if (namelen) {
0424     copydata(msg, &(o.dirname)[0], o.dirname.size());
0425     if (!o.dirname.empty())
0426       copydata(msg, "/", 1);
0427     copydata(msg, &o.objname[0], o.objname.size());
0428   }
0429   if (datalen)
0430     copydata(msg, &objdata[0], datalen);
0431   if (qlen)
0432     copydata(msg, &o.qdata[0], qlen);
0433 }
0434 
0435 //////////////////////////////////////////////////////////////////////
0436 // Handle peer messages.
0437 bool DQMNet::onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len) {
0438   // Decode and process this message.
0439   uint32_t type;
0440   memcpy(&type, data + sizeof(uint32_t), sizeof(type));
0441   switch (type) {
0442     case DQM_MSG_UPDATE_ME: {
0443       if (len != 2 * sizeof(uint32_t)) {
0444         logme() << "ERROR: corrupt 'UPDATE_ME' message of length " << len << " from peer " << p->peeraddr << std::endl;
0445         return false;
0446       }
0447 
0448       if (debug_)
0449         logme() << "DEBUG: received message 'UPDATE ME' from peer " << p->peeraddr << ", size " << len << std::endl;
0450 
0451       p->update = true;
0452     }
0453       return true;
0454 
0455     case DQM_MSG_LIST_OBJECTS: {
0456       if (debug_)
0457         logme() << "DEBUG: received message 'LIST OBJECTS' from peer " << p->peeraddr << ", size " << len << std::endl;
0458 
0459       // Send over current status: list of known objects.
0460       sendObjectListToPeer(msg, true, false);
0461     }
0462       return true;
0463 
0464     case DQM_MSG_GET_OBJECT: {
0465       if (debug_)
0466         logme() << "DEBUG: received message 'GET OBJECT' from peer " << p->peeraddr << ", size " << len << std::endl;
0467 
0468       if (len < 3 * sizeof(uint32_t)) {
0469         logme() << "ERROR: corrupt 'GET IMAGE' message of length " << len << " from peer " << p->peeraddr << std::endl;
0470         return false;
0471       }
0472 
0473       uint32_t namelen;
0474       memcpy(&namelen, data + 2 * sizeof(uint32_t), sizeof(namelen));
0475       if (len != 3 * sizeof(uint32_t) + namelen) {
0476         logme() << "ERROR: corrupt 'GET OBJECT' message of length " << len << " from peer " << p->peeraddr
0477                 << ", expected length " << (3 * sizeof(uint32_t)) << " + " << namelen << std::endl;
0478         return false;
0479       }
0480 
0481       std::string name((char *)data + 3 * sizeof(uint32_t), namelen);
0482       Peer *owner = nullptr;
0483       Object *o = findObject(nullptr, name, &owner);
0484       if (o) {
0485         o->lastreq = Time::current().ns();
0486         if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE)) &&
0487             (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
0488           waitForData(p, name, "", owner);
0489         else
0490           sendObjectToPeer(msg, *o, true);
0491       } else {
0492         uint32_t words[3];
0493         words[0] = sizeof(words) + name.size();
0494         words[1] = DQM_REPLY_NONE;
0495         words[2] = name.size();
0496 
0497         msg->data.reserve(msg->data.size() + words[0]);
0498         copydata(msg, &words[0], sizeof(words));
0499         copydata(msg, &name[0], name.size());
0500       }
0501     }
0502       return true;
0503 
0504     case DQM_REPLY_LIST_BEGIN: {
0505       if (len != 4 * sizeof(uint32_t)) {
0506         logme() << "ERROR: corrupt 'LIST BEGIN' message of length " << len << " from peer " << p->peeraddr << std::endl;
0507         return false;
0508       }
0509 
0510       // Get the update status: whether this is a full update.
0511       uint32_t flags;
0512       memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
0513 
0514       if (debug_)
0515         logme() << "DEBUG: received message 'LIST BEGIN " << (flags ? "FULL" : "INCREMENTAL") << "' from "
0516                 << p->peeraddr << ", size " << len << std::endl;
0517 
0518       // If we are about to receive a full list of objects, flag all
0519       // objects as possibly dead.  Subsequent object notifications
0520       // will undo this for the live objects.  We cannot delete
0521       // objects quite yet, as we may get inquiry from another client
0522       // while we are processing the incoming list, so we keep the
0523       // objects tentatively alive as long as we've not seen the end.
0524       if (flags)
0525         markObjectsDead(p);
0526     }
0527       return true;
0528 
0529     case DQM_REPLY_LIST_END: {
0530       if (len != 4 * sizeof(uint32_t)) {
0531         logme() << "ERROR: corrupt 'LIST END' message of length " << len << " from peer " << p->peeraddr << std::endl;
0532         return false;
0533       }
0534 
0535       // Get the update status: whether this is a full update.
0536       uint32_t flags;
0537       memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
0538 
0539       // If we received a full list of objects, now purge all dead
0540       // objects. We need to do this in two stages in case we receive
0541       // updates in many parts, and end up sending updates to others in
0542       // between; this avoids us lying live objects are dead.
0543       if (flags)
0544         purgeDeadObjects(p);
0545 
0546       if (debug_)
0547         logme() << "DEBUG: received message 'LIST END " << (flags ? "FULL" : "INCREMENTAL") << "' from " << p->peeraddr
0548                 << ", size " << len << std::endl;
0549 
0550       // Indicate we have received another update from this peer.
0551       // Also indicate we should flush to our clients.
0552       flush_ = true;
0553       p->updates++;
0554     }
0555       return true;
0556 
0557     case DQM_REPLY_OBJECT: {
0558       uint32_t words[9];
0559       if (len < sizeof(words)) {
0560         logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr << std::endl;
0561         return false;
0562       }
0563 
0564       memcpy(&words[0], data, sizeof(words));
0565       uint32_t &namelen = words[6];
0566       uint32_t &datalen = words[7];
0567       uint32_t &qlen = words[8];
0568 
0569       if (len != sizeof(words) + namelen + datalen + qlen) {
0570         logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr
0571                 << ", expected length " << sizeof(words) << " + " << namelen << " + " << datalen << " + " << qlen
0572                 << std::endl;
0573         return false;
0574       }
0575 
0576       unsigned char *namedata = data + sizeof(words);
0577       unsigned char *objdata = namedata + namelen;
0578       unsigned char *qdata = objdata + datalen;
0579       unsigned char *enddata = qdata + qlen;
0580       std::string name((char *)namedata, namelen);
0581       assert(enddata == data + len);
0582 
0583       if (debug_)
0584         logme() << "DEBUG: received message 'OBJECT " << name << "' from " << p->peeraddr << ", size " << len
0585                 << std::endl;
0586 
0587       // Mark the peer as a known object source.
0588       p->source = true;
0589 
0590       // Initialise or update an object entry.
0591       Object *o = findObject(p, name);
0592       if (!o)
0593         o = makeObject(p, name);
0594 
0595       o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
0596       o->tag = words[5];
0597       o->version = ((uint64_t)words[4] << 32 | words[3]);
0598       o->scalar.clear();
0599       o->qdata.clear();
0600       if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR) {
0601         o->rawdata.clear();
0602         o->scalar.insert(o->scalar.end(), objdata, qdata);
0603       } else if (datalen) {
0604         o->rawdata.clear();
0605         o->rawdata.insert(o->rawdata.end(), objdata, qdata);
0606       } else if (!o->rawdata.empty())
0607         o->flags |= DQM_PROP_STALE;
0608       o->qdata.insert(o->qdata.end(), qdata, enddata);
0609 
0610       // If we had an object for this one already and this is a list
0611       // update without data, issue an immediate data get request.
0612       if (o->lastreq && !datalen && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
0613         requestObjectData(p, (namelen ? &name[0] : nullptr), namelen);
0614 
0615       // If we have the object data, release from wait.
0616       if (datalen)
0617         releaseWaiters(name, o);
0618     }
0619       return true;
0620 
0621     case DQM_REPLY_NONE: {
0622       uint32_t words[3];
0623       if (len < sizeof(words)) {
0624         logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr << std::endl;
0625         return false;
0626       }
0627 
0628       memcpy(&words[0], data, sizeof(words));
0629       uint32_t &namelen = words[2];
0630 
0631       if (len != sizeof(words) + namelen) {
0632         logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr
0633                 << ", expected length " << sizeof(words) << " + " << namelen << std::endl;
0634         return false;
0635       }
0636 
0637       unsigned char *namedata = data + sizeof(words);
0638       std::string name((char *)namedata, namelen);
0639 
0640       if (debug_)
0641         logme() << "DEBUG: received message 'NONE " << name << "' from " << p->peeraddr << ", size " << len
0642                 << std::endl;
0643 
0644       // Mark the peer as a known object source.
0645       p->source = true;
0646 
0647       // If this was a known object, kill it.
0648       if (Object *o = findObject(p, name)) {
0649         o->flags |= DQM_PROP_DEAD;
0650         purgeDeadObjects(p);
0651       }
0652 
0653       // If someone was waiting for this, let them go.
0654       releaseWaiters(name, nullptr);
0655     }
0656       return true;
0657 
0658     default:
0659       logme() << "ERROR: unrecognised message of length " << len << " and type " << type << " from peer " << p->peeraddr
0660               << std::endl;
0661       return false;
0662   }
0663 }
0664 
0665 //////////////////////////////////////////////////////////////////////
0666 /// Handle communication to a particular client.
0667 bool DQMNet::onPeerData(IOSelectEvent *ev, Peer *p) {
0668   lock();
0669   assert(getPeer(dynamic_cast<Socket *>(ev->source)) == p);
0670 
0671   // If there is a problem with the peer socket, discard the peer
0672   // and tell the selector to stop prcessing events for it.  If
0673   // this is a server connection, we will eventually recreate
0674   // everything if/when the data server comes back.
0675   if (ev->events & IOUrgent) {
0676     if (p->automatic) {
0677       logme() << "WARNING: connection to the DQM server at " << p->peeraddr
0678               << " lost (will attempt to reconnect in 15 seconds)\n";
0679       losePeer(nullptr, p, ev);
0680     } else
0681       losePeer("WARNING: lost peer connection ", p, ev);
0682 
0683     unlock();
0684     return true;
0685   }
0686 
0687   // If we can write to the peer socket, pump whatever we can into it.
0688   if (ev->events & IOWrite) {
0689     while (Bucket *b = p->sendq) {
0690       IOSize len = b->data.size() - p->sendpos;
0691       const void *data = (len ? (const void *)&b->data[p->sendpos] : (const void *)&data);
0692       IOSize done;
0693 
0694       try {
0695         done = (len ? ev->source->write(data, len) : 0);
0696         if (debug_ && len)
0697           logme() << "DEBUG: sent " << done << " bytes to peer " << p->peeraddr << std::endl;
0698       } catch (Error &e) {
0699         losePeer("WARNING: unable to write to peer ", p, ev, &e);
0700         unlock();
0701         return true;
0702       }
0703 
0704       p->sendpos += done;
0705       if (p->sendpos == b->data.size()) {
0706         Bucket *old = p->sendq;
0707         p->sendq = old->next;
0708         p->sendpos = 0;
0709         old->next = nullptr;
0710         discard(old);
0711       }
0712 
0713       if (!done && len)
0714         // Cannot write any more.
0715         break;
0716     }
0717   }
0718 
0719   // If there is data to be read from the peer, first receive what we
0720   // can get out the socket, the process all complete requests.
0721   if (ev->events & IORead) {
0722     // First build up the incoming buffer of data in the socket.
0723     // Remember the last size returned by the socket; we need
0724     // it to determine if the remote end closed the connection.
0725     IOSize sz;
0726     try {
0727       std::vector<unsigned char> buf(SOCKET_READ_SIZE);
0728       do
0729         if ((sz = ev->source->read(&buf[0], buf.size()))) {
0730           if (debug_)
0731             logme() << "DEBUG: received " << sz << " bytes from peer " << p->peeraddr << std::endl;
0732           DataBlob &data = p->incoming;
0733           if (data.capacity() < data.size() + sz)
0734             data.reserve(data.size() + SOCKET_READ_GROWTH);
0735           data.insert(data.end(), &buf[0], &buf[0] + sz);
0736         }
0737       while (sz == sizeof(buf));
0738     } catch (Error &e) {
0739       auto *next = dynamic_cast<SystemError *>(e.next());
0740       if (next && next->portable() == SysErr::ErrTryAgain)
0741         sz = 1;  // Ignore it, and fake no end of data.
0742       else {
0743         // Houston we have a problem.
0744         losePeer("WARNING: failed to read from peer ", p, ev, &e);
0745         unlock();
0746         return true;
0747       }
0748     }
0749 
0750     // Process fully received messages as long as we can.
0751     size_t consumed = 0;
0752     DataBlob &data = p->incoming;
0753     while (data.size() - consumed >= sizeof(uint32_t) && p->waiting < MAX_PEER_WAITREQS) {
0754       uint32_t msglen;
0755       memcpy(&msglen, &data[0] + consumed, sizeof(msglen));
0756 
0757       if (msglen >= MESSAGE_SIZE_LIMIT) {
0758         losePeer("WARNING: excessively large message from ", p, ev);
0759         unlock();
0760         return true;
0761       }
0762 
0763       if (data.size() - consumed >= msglen) {
0764         bool valid = true;
0765         if (msglen < 2 * sizeof(uint32_t)) {
0766           logme() << "ERROR: corrupt peer message of length " << msglen << " from peer " << p->peeraddr << std::endl;
0767           valid = false;
0768         } else {
0769           // Decode and process this message.
0770           Bucket msg;
0771           msg.next = nullptr;
0772           valid = onMessage(&msg, p, &data[0] + consumed, msglen);
0773 
0774           // If we created a response, chain it to the write queue.
0775           if (!msg.data.empty()) {
0776             Bucket **prev = &p->sendq;
0777             while (*prev)
0778               prev = &(*prev)->next;
0779 
0780             *prev = new Bucket;
0781             (*prev)->next = nullptr;
0782             (*prev)->data.swap(msg.data);
0783           }
0784         }
0785 
0786         if (!valid) {
0787           losePeer("WARNING: data stream error with ", p, ev);
0788           unlock();
0789           return true;
0790         }
0791 
0792         consumed += msglen;
0793       } else
0794         break;
0795     }
0796 
0797     data.erase(data.begin(), data.begin() + consumed);
0798 
0799     // If the client has closed the connection, shut down our end.  If
0800     // we have something to send back still, leave the write direction
0801     // open.  Otherwise close the shop for this client.
0802     if (sz == 0)
0803       sel_.setMask(p->socket, p->mask &= ~IORead);
0804   }
0805 
0806   // Yes, please keep processing events for this socket.
0807   unlock();
0808   return false;
0809 }
0810 
0811 /** Respond to new connections on the server socket.  Accepts the
0812     connection and creates a new socket for the peer, and sets it up
0813     for further communication.  Returns @c false always to tell the
0814     IOSelector to keep processing events for the server socket.  */
0815 bool DQMNet::onPeerConnect(IOSelectEvent *ev) {
0816   // Recover the server socket.
0817   assert(ev->source == server_);
0818 
0819   // Accept the connection.
0820   Socket *s = server_->accept();
0821   assert(s);
0822   assert(!s->isBlocking());
0823 
0824   // Record it to our list of peers.
0825   lock();
0826   Peer *p = createPeer(s);
0827   std::string localaddr;
0828   if (auto *inet = dynamic_cast<InetSocket *>(s)) {
0829     InetAddress peeraddr = inet->peername();
0830     InetAddress myaddr = inet->sockname();
0831     p->peeraddr = fmt::format("{}:{}", peeraddr.hostname(), peeraddr.port());
0832     localaddr = fmt::format("{}:{}", myaddr.hostname(), myaddr.port());
0833   } else if (auto *local = dynamic_cast<LocalSocket *>(s)) {
0834     p->peeraddr = local->peername().path();
0835     localaddr = local->sockname().path();
0836   } else
0837     assert(false);
0838 
0839   p->mask = IORead | IOUrgent;
0840   p->socket = s;
0841 
0842   // Report the new connection.
0843   if (debug_)
0844     logme() << "INFO: new peer " << p->peeraddr << " is now connected to " << localaddr << std::endl;
0845 
0846   // Attach it to the listener.
0847   sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
0848   unlock();
0849 
0850   // We are never done.
0851   return false;
0852 }
0853 
0854 /** React to notifications from the DQM thread.  This is a simple
0855     message to tell this thread to wake up and send unsollicited
0856     updates to the peers when new DQM data appears.  We don't send
0857     the updates here, but just set a flag to tell the main event
0858     pump to send a notification later.  This avoids sending
0859     unnecessarily frequent DQM object updates.  */
0860 bool DQMNet::onLocalNotify(IOSelectEvent *ev) {
0861   // Discard the data in the pipe, we care only about the wakeup.
0862   try {
0863     IOSize sz;
0864     unsigned char buf[1024];
0865     while ((sz = ev->source->read(buf, sizeof(buf))))
0866       ;
0867   } catch (Error &e) {
0868     auto *next = dynamic_cast<SystemError *>(e.next());
0869     if (next && next->portable() == SysErr::ErrTryAgain)
0870       ;  // Ignore it
0871     else
0872       logme() << "WARNING: error reading from notification pipe: " << e.explain() << std::endl;
0873   }
0874 
0875   // Tell the main event pump to send an update in a little while.
0876   flush_ = true;
0877 
0878   // We are never done, always keep going.
0879   return false;
0880 }
0881 
0882 /// Update the selector mask for a peer based on data queues.  Close
0883 /// the connection if there is no reason to maintain it open.
0884 void DQMNet::updateMask(Peer *p) {
0885   if (!p->socket)
0886     return;
0887 
0888   // Listen to writes iff we have data to send.
0889   unsigned oldmask = p->mask;
0890   if (!p->sendq && (p->mask & IOWrite))
0891     sel_.setMask(p->socket, p->mask &= ~IOWrite);
0892 
0893   if (p->sendq && !(p->mask & IOWrite))
0894     sel_.setMask(p->socket, p->mask |= IOWrite);
0895 
0896   if (debug_ && oldmask != p->mask)
0897     logme() << "DEBUG: updating mask for " << p->peeraddr << " to " << p->mask << " from " << oldmask << std::endl;
0898 
0899   // If we have nothing more to send and are no longer listening
0900   // for reads, close up the shop for this peer.
0901   if (p->mask == IOUrgent && !p->waiting) {
0902     assert(!p->sendq);
0903     if (debug_)
0904       logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
0905     losePeer(nullptr, p, nullptr);
0906   }
0907 }
0908 
0909 //////////////////////////////////////////////////////////////////////
0910 DQMNet::DQMNet(const std::string &appname /* = "" */)
0911     : debug_(false),
0912       appname_(appname.empty() ? "DQMNet" : appname.c_str()),
0913       pid_(getpid()),
0914       server_(nullptr),
0915       version_(Time::current()),
0916       communicate_((pthread_t)-1),
0917       shutdown_(0),
0918       delay_(1000),
0919       waitStale_(0, 0, 0, 0, 500000000 /* 500 ms */),
0920       waitMax_(0, 0, 0, 5 /* seconds */, 0),
0921       flush_(false) {
0922   // Create a pipe for the local DQM to tell the communicator
0923   // thread that local DQM data has changed and that the peers
0924   // should be notified.
0925   fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
0926   sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
0927 
0928   // Initialise the upstream and downstream to empty.
0929   upstream_.peer = downstream_.peer = nullptr;
0930   upstream_.next = downstream_.next = 0;
0931   upstream_.port = downstream_.port = 0;
0932   upstream_.update = downstream_.update = false;
0933 }
0934 
0935 DQMNet::~DQMNet() {
0936   // FIXME
0937 }
0938 
0939 /// Enable or disable verbose debugging.  Must be called before
0940 /// calling run() or start().
0941 void DQMNet::debug(bool doit) { debug_ = doit; }
0942 
0943 /// Set the I/O dispatching delay.  Must be called before calling
0944 /// run() or start().
0945 void DQMNet::delay(int delay) { delay_ = delay; }
0946 
0947 /// Set the time limit for waiting updates to stale objects.
0948 /// Once limit has been exhausted whatever data exists is returned.
0949 /// Applies only when data has been received, another time limit is
0950 /// applied when no data payload has been received at all.
0951 void DQMNet::staleObjectWaitLimit(lat::TimeSpan time) { waitStale_ = time; }
0952 
0953 /// Start a server socket for accessing this DQM node remotely.  Must
0954 /// be called before calling run() or start().  May throw an Exception
0955 /// if the server socket cannot be initialised.
0956 void DQMNet::startLocalServer(int port) {
0957   if (server_) {
0958     logme() << "ERROR: DQM server was already started.\n";
0959     return;
0960   }
0961 
0962   try {
0963     InetAddress addr("0.0.0.0", port);
0964     auto *s = new InetSocket(SOCK_STREAM, 0, addr.family());
0965     s->bind(addr);
0966     s->listen(10);
0967     s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
0968     s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
0969     s->setBlocking(false);
0970     sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
0971   } catch (Error &e) {
0972     // FIXME: Do we need to do this when we throw an exception anyway?
0973     // FIXME: Abort instead?
0974     logme() << "ERROR: Failed to start server at port " << port << ": " << e.explain() << std::endl;
0975 
0976     throw cms::Exception("DQMNet::startLocalServer") << "Failed to start server at port " <<
0977 
0978         port << ": " << e.explain().c_str();
0979   }
0980 
0981   logme() << "INFO: DQM server started at port " << port << std::endl;
0982 }
0983 
0984 /// Start a server socket for accessing this DQM node over a file
0985 /// system socket.  Must be called before calling run() or start().
0986 /// May throw an Exception if the server socket cannot be initialised.
0987 void DQMNet::startLocalServer(const char *path) {
0988   if (server_) {
0989     logme() << "ERROR: DQM server was already started.\n";
0990     return;
0991   }
0992 
0993   try {
0994     server_ = new LocalServerSocket(path, 10);
0995     server_->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
0996     server_->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
0997     server_->setBlocking(false);
0998     sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
0999   } catch (Error &e) {
1000     // FIXME: Do we need to do this when we throw an exception anyway?
1001     // FIXME: Abort instead?
1002     logme() << "ERROR: Failed to start server at path " << path << ": " << e.explain() << std::endl;
1003 
1004     throw cms::Exception("DQMNet::startLocalServer")
1005         << "Failed to start server at path " << path << ": " << e.explain().c_str();
1006   }
1007 
1008   logme() << "INFO: DQM server started at path " << path << std::endl;
1009 }
1010 
1011 /// Tell the network layer to connect to @a host and @a port and
1012 /// automatically send updates whenever local DQM data changes.  Must
1013 /// be called before calling run() or start().
1014 void DQMNet::updateToCollector(const std::string &host, int port) {
1015   if (!downstream_.host.empty()) {
1016     logme() << "ERROR: Already updating another collector at " << downstream_.host << ":" << downstream_.port
1017             << std::endl;
1018     return;
1019   }
1020 
1021   downstream_.update = true;
1022   downstream_.host = host;
1023   downstream_.port = port;
1024 }
1025 
1026 /// Tell the network layer to connect to @a host and @a port and
1027 /// automatically receive updates from upstream DQM sources.  Must be
1028 /// called before calling run() or start().
1029 void DQMNet::listenToCollector(const std::string &host, int port) {
1030   if (!upstream_.host.empty()) {
1031     logme() << "ERROR: Already receiving data from another collector at " << upstream_.host << ":" << upstream_.port
1032             << std::endl;
1033     return;
1034   }
1035 
1036   upstream_.update = false;
1037   upstream_.host = host;
1038   upstream_.port = port;
1039 }
1040 
1041 /// Stop the network layer and wait it to finish.
1042 void DQMNet::shutdown() {
1043   shutdown_ = 1;
1044   if (communicate_ != (pthread_t)-1)
1045     pthread_join(communicate_, nullptr);
1046 }
1047 
1048 /** A thread to communicate with the distributed memory cache peers.
1049     All this does is run the loop to respond to new connections.
1050     Much of the actual work is done when a new connection is
1051     received, and in pumping data around in response to actual
1052     requests.  */
1053 static void *communicate(void *obj) {
1054   sigset_t sigs;
1055   sigfillset(&sigs);
1056   pthread_sigmask(SIG_BLOCK, &sigs, nullptr);
1057   ((DQMNet *)obj)->run();
1058   return nullptr;
1059 }
1060 
1061 /// Acquire a lock on the DQM net layer.
1062 void DQMNet::lock() {
1063   if (communicate_ != (pthread_t)-1)
1064     pthread_mutex_lock(&lock_);
1065 }
1066 
1067 /// Release the lock on the DQM net layer.
1068 void DQMNet::unlock() {
1069   if (communicate_ != (pthread_t)-1)
1070     pthread_mutex_unlock(&lock_);
1071 }
1072 
1073 /// Start running the network layer in a new thread.  This is an
1074 /// exclusive alternative to the run() method, which runs the network
1075 /// layer in the caller's thread.
1076 void DQMNet::start() {
1077   if (communicate_ != (pthread_t)-1) {
1078     logme() << "ERROR: DQM networking thread has already been started\n";
1079     return;
1080   }
1081 
1082   pthread_mutex_init(&lock_, nullptr);
1083   pthread_create(&communicate_, nullptr, &communicate, this);
1084 }
1085 
1086 /** Run the actual I/O processing loop. */
1087 void DQMNet::run() {
1088   Time now;
1089   Time nextFlush = 0;
1090   AutoPeer *automatic[2] = {&upstream_, &downstream_};
1091 
1092   // Perform I/O.  Every once in a while flush updates to peers.
1093   while (!shouldStop()) {
1094     for (auto ap : automatic) {
1095       // If we need a server connection and don't have one yet,
1096       // initiate asynchronous connection creation.  Swallow errors
1097       // in case the server won't talk to us.
1098       if (!ap->host.empty() && !ap->peer && (now = Time::current()) > ap->next) {
1099         ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1100         InetSocket *s = nullptr;
1101         try {
1102           InetAddress addr(ap->host.c_str(), ap->port);
1103           s = new InetSocket(SOCK_STREAM, 0, addr.family());
1104           s->setBlocking(false);
1105           s->connect(addr);
1106           s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1107           s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1108         } catch (Error &e) {
1109           auto *sys = dynamic_cast<SystemError *>(e.next());
1110           if (!sys || sys->portable() != SysErr::ErrOperationInProgress) {
1111             // "In progress" just means the connection is in progress.
1112             // The connection is ready when the socket is writeable.
1113             // Anything else is a real problem.
1114             if (s)
1115               s->abort();
1116             delete s;
1117             s = nullptr;
1118           }
1119         }
1120 
1121         // Set up with the selector if we were successful.  If this is
1122         // the upstream collector, queue a request for updates.
1123         if (s) {
1124           Peer *p = createPeer(s);
1125           ap->peer = p;
1126 
1127           InetAddress peeraddr = ((InetSocket *)s)->peername();
1128           InetAddress myaddr = ((InetSocket *)s)->sockname();
1129           p->peeraddr = fmt::format("{}:{}", peeraddr.hostname(), peeraddr.port());
1130           p->mask = IORead | IOWrite | IOUrgent;
1131           p->update = ap->update;
1132           p->automatic = ap;
1133           p->socket = s;
1134           sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
1135           if (ap == &upstream_) {
1136             uint32_t words[4] = {2 * sizeof(uint32_t), DQM_MSG_LIST_OBJECTS, 2 * sizeof(uint32_t), DQM_MSG_UPDATE_ME};
1137             p->sendq = new Bucket;
1138             p->sendq->next = nullptr;
1139             copydata(p->sendq, words, sizeof(words));
1140           }
1141 
1142           // Report the new connection.
1143           if (debug_)
1144             logme() << "INFO: now connected to " << p->peeraddr << " from " << myaddr.hostname() << ":" << myaddr.port()
1145                     << std::endl;
1146         }
1147       }
1148     }
1149 
1150     // Pump events for a while.
1151     sel_.dispatch(delay_);
1152     now = Time::current();
1153     lock();
1154 
1155     // Check if flush is required.  Flush only if one is needed.
1156     // Always sends the full object list, but only rarely.
1157     if (flush_ && now > nextFlush) {
1158       flush_ = false;
1159       nextFlush = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1160       sendObjectListToPeers(true);
1161     }
1162 
1163     // Update the data server and peer selection masks.  If we
1164     // have no more data to send and listening for writes, remove
1165     // the write mask.  If we have something to write and aren't
1166     // listening for writes, start listening so we can send off
1167     // the data.
1168     updatePeerMasks();
1169 
1170     // Release peers that have been waiting for data for too long.
1171     Time waitold = now - waitMax_;
1172     Time waitstale = now - waitStale_;
1173     for (auto i = waiting_.begin(), e = waiting_.end(); i != e;) {
1174       Object *o = findObject(nullptr, i->name);
1175 
1176       // If we have (stale) object data, wait only up to stale limit.
1177       // Otherwise if we have no data at all, wait up to the max limit.
1178       if (i->time < waitold) {
1179         logme() << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9) << "s to retrieval, releasing '"
1180                 << i->name << "' from wait, have " << (o ? o->rawdata.size() : 0) << " data available\n";
1181         releaseFromWait(i++, o);
1182       } else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE)) {
1183         logme() << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9) << "s to update, releasing '"
1184                 << i->name << "' from wait, have " << o->rawdata.size() << " data available\n";
1185         releaseFromWait(i++, o);
1186       }
1187 
1188       // Keep it for now.
1189       else
1190         ++i;
1191     }
1192 
1193     unlock();
1194   }
1195 }
1196 
1197 // Tell the network cache that there have been local changes that
1198 // should be advertised to the downstream listeners.
1199 void DQMNet::sendLocalChanges() {
1200   char byte = 0;
1201   wakeup_.sink()->write(&byte, 1);
1202 }
1203 
1204 //////////////////////////////////////////////////////////////////////
1205 //////////////////////////////////////////////////////////////////////
1206 //////////////////////////////////////////////////////////////////////
1207 DQMBasicNet::DQMBasicNet(const std::string &appname /* = "" */) : DQMImplNet<DQMNet::Object>(appname) {
1208   local_ = static_cast<ImplPeer *>(createPeer((Socket *)-1));
1209 }
1210 
1211 /// Give a hint of how much capacity to allocate for local objects.
1212 void DQMBasicNet::reserveLocalSpace(uint32_t size) { local_->objs.reserve(size); }
1213 
1214 /// Update the network cache for an object.  The caller must call
1215 /// sendLocalChanges() later to push out the changes.
1216 void DQMBasicNet::updateLocalObject(Object &o) {
1217   o.dirname = *local_->dirs.insert(o.dirname).first;
1218   std::pair<ObjectMap::iterator, bool> info(local_->objs.insert(o));
1219   if (!info.second) {
1220     // Somewhat hackish. Sets are supposedly immutable, but we
1221     // need to change the non-key parts of the object. Erasing
1222     // and re-inserting would produce too much memory churn.
1223     auto &old = const_cast<Object &>(*info.first);
1224     std::swap(old.flags, o.flags);
1225     std::swap(old.tag, o.tag);
1226     std::swap(old.version, o.version);
1227     std::swap(old.qreports, o.qreports);
1228     std::swap(old.rawdata, o.rawdata);
1229     std::swap(old.scalar, o.scalar);
1230     std::swap(old.qdata, o.qdata);
1231   }
1232 }
1233 
1234 /// Delete all local objects not in @a known.  Returns true if
1235 /// something was removed.  The caller must call sendLocalChanges()
1236 /// later to push out the changes.
1237 bool DQMBasicNet::removeLocalExcept(const std::set<std::string> &known) {
1238   size_t removed = 0;
1239   std::string path;
1240   ObjectMap::iterator i, e;
1241   for (i = local_->objs.begin(), e = local_->objs.end(); i != e;) {
1242     path.clear();
1243     path.reserve(i->dirname.size() + i->objname.size() + 2);
1244     path += i->dirname;
1245     if (!path.empty())
1246       path += '/';
1247     path += i->objname;
1248 
1249     if (!known.count(path))
1250       ++removed, local_->objs.erase(i++);
1251     else
1252       ++i;
1253   }
1254 
1255   return removed > 0;
1256 }