Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-03-17 10:59:00

0001 #include "DQMServices/Core/interface/DQMNet.h"
0002 #include "classlib/iobase/InetServerSocket.h"
0003 #include "classlib/iobase/LocalServerSocket.h"
0004 #include "classlib/iobase/Filename.h"
0005 #include "classlib/sysapi/InetSocket.h"  // for completing InetAddress
0006 #include "classlib/utils/TimeInfo.h"
0007 #include "classlib/utils/StringList.h"
0008 #include "classlib/utils/StringFormat.h"
0009 #include "classlib/utils/StringOps.h"
0010 #include "classlib/utils/SystemError.h"
0011 #include "classlib/utils/Regexp.h"
0012 #include <unistd.h>
0013 #include <fcntl.h>
0014 #include <sys/wait.h>
0015 #include <cstdio>
0016 #include <cstdint>
0017 #include <iostream>
0018 #include <sstream>
0019 #include <cassert>
0020 #include <cfloat>
0021 #include <cinttypes>
0022 
0023 #include "FWCore/Utilities/interface/EDMException.h"
0024 
// Size limits for the peer protocol.  Apple builds use 1 MiB values,
// other platforms 8 MiB.
//  - MESSAGE_SIZE_LIMIT: a peer whose next message declares a length
//    >= this value is disconnected (see onPeerData()).
//  - SOCKET_BUF_SIZE: base size from which SOCKET_READ_SIZE and
//    SOCKET_READ_GROWTH below are derived.
#if __APPLE__
#define MESSAGE_SIZE_LIMIT (1 * 1024 * 1024)
#define SOCKET_BUF_SIZE (1 * 1024 * 1024)
#else
#define MESSAGE_SIZE_LIMIT (8 * 1024 * 1024)
#define SOCKET_BUF_SIZE (8 * 1024 * 1024)
#endif
// Size of the temporary buffer used for each socket read.
#define SOCKET_READ_SIZE (SOCKET_BUF_SIZE / 8)
// Extra capacity reserved in a peer's incoming buffer whenever it must grow.
#define SOCKET_READ_GROWTH (SOCKET_BUF_SIZE)

using namespace lat;

// Matches strings of the form "<name>T=value</name>" where T is one of
// i, f, s or qr; the closing tag must repeat the opening name (\1).
static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>");
0038 
// TODO: Can't include the header file since that leads to ambiguities.
// Local copy of the quality-test status codes used to classify quality
// reports in unpackQualityData().
namespace dqm {
  namespace qstatus {
    constexpr int STATUS_OK = 100;  //< Test was successful.
    constexpr int WARNING = 200;    //< Test had some problems.
    constexpr int ERROR = 300;      //< Test has failed.
  }  // namespace qstatus
}  // namespace dqm
0047 
0048 //////////////////////////////////////////////////////////////////////
0049 // Generate log prefix.
0050 std::ostream &DQMNet::logme() {
0051   Time now = Time::current();
0052   return std::cout << now.format(true, "%Y-%m-%d %H:%M:%S.") << now.nanoformat(3, 3) << " " << appname_ << "[" << pid_
0053                    << "]: ";
0054 }
0055 
0056 // Append data into a bucket.
0057 void DQMNet::copydata(Bucket *b, const void *data, size_t len) {
0058   b->data.insert(b->data.end(), (const unsigned char *)data, (const unsigned char *)data + len);
0059 }
0060 
0061 // Discard a bucket chain.
0062 void DQMNet::discard(Bucket *&b) {
0063   while (b) {
0064     Bucket *next = b->next;
0065     delete b;
0066     b = next;
0067   }
0068 }
0069 
0070 //////////////////////////////////////////////////////////////////////
0071 /** Handle errors with a peer socket.  Zaps the socket send queue,
0072     the socket itself, detaches the socket from the selector, and
0073     purges any pending wait requests linked to the socket.  */
0074 void DQMNet::losePeer(const char *reason, Peer *peer, IOSelectEvent *ev, Error *err) {
0075   if (reason)
0076     logme() << reason << peer->peeraddr << (err ? "; error was: " + err->explain() : std::string("")) << std::endl;
0077 
0078   Socket *s = peer->socket;
0079 
0080   for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
0081     if (i->peer == peer)
0082       waiting_.erase(i++);
0083     else
0084       ++i;
0085 
0086   if (ev)
0087     ev->source = nullptr;
0088 
0089   discard(peer->sendq);
0090   if (peer->automatic)
0091     peer->automatic->peer = nullptr;
0092 
0093   sel_.detach(s);
0094   s->close();
0095   removePeer(peer, s);
0096   delete s;
0097 }
0098 
0099 /// Queue an object request to the data server.
0100 void DQMNet::requestObjectData(Peer *p, const char *name, size_t len) {
0101   // Issue request to peer.
0102   Bucket **msg = &p->sendq;
0103   while (*msg)
0104     msg = &(*msg)->next;
0105   *msg = new Bucket;
0106   (*msg)->next = nullptr;
0107 
0108   uint32_t words[3];
0109   words[0] = sizeof(words) + len;
0110   words[1] = DQM_MSG_GET_OBJECT;
0111   words[2] = len;
0112   copydata(*msg, words, sizeof(words));
0113   copydata(*msg, name, len);
0114 }
0115 
0116 /// Queue a request for an object and put a peer into the mode of
0117 /// waiting for object data to appear.
0118 void DQMNet::waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner) {
0119   // FIXME: Should we automatically record which exact peer the waiter
0120   // is expecting to deliver data so we know to release the waiter if
0121   // the other peer vanishes?  The current implementation stands a
0122   // chance for the waiter to wait indefinitely -- although we do
0123   // force terminate the wait after a while.
0124   requestObjectData(owner, !name.empty() ? &name[0] : nullptr, name.size());
0125   WaitObject wo = {Time::current(), name, info, p};
0126   waiting_.push_back(wo);
0127   p->waiting++;
0128 }
0129 
0130 // Once an object has been updated, this is invoked for all waiting
0131 // peers.  Send the object back to the peer in suitable form.
0132 void DQMNet::releaseFromWait(WaitList::iterator i, Object *o) {
0133   Bucket **msg = &i->peer->sendq;
0134   while (*msg)
0135     msg = &(*msg)->next;
0136   *msg = new Bucket;
0137   (*msg)->next = nullptr;
0138 
0139   releaseFromWait(*msg, *i, o);
0140 
0141   assert(i->peer->waiting > 0);
0142   i->peer->waiting--;
0143   waiting_.erase(i);
0144 }
0145 
0146 // Release everyone waiting for the object @a o.
0147 void DQMNet::releaseWaiters(const std::string &name, Object *o) {
0148   for (auto i = waiting_.begin(), e = waiting_.end(); i != e;)
0149     if (i->name == name)
0150       releaseFromWait(i++, o);
0151     else
0152       ++i;
0153 }
0154 
0155 //////////////////////////////////////////////////////////////////////
0156 /// Pack quality results in @a qr into a string @a into for
0157 /// peristent storage, such as network transfer or archival.
0158 void DQMNet::packQualityData(std::string &into, const QReports &qr) {
0159   char buf[64];
0160   std::ostringstream qrs;
0161   QReports::const_iterator qi, qe;
0162   for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi) {
0163     int pos = 0;
0164     sprintf(buf, "%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG + 2, qi->qtresult);
0165     qrs << buf << '\0' << buf + pos << '\0' << qi->qtname << '\0' << qi->algorithm << '\0' << qi->message << '\0'
0166         << '\0';
0167   }
0168   into = qrs.str();
0169 }
0170 
0171 /// Unpack the quality results from string @a from into @a qr.
0172 /// Assumes the data was saved with packQualityData().
0173 void DQMNet::unpackQualityData(QReports &qr, uint32_t &flags, const char *from) {
0174   const char *qdata = from;
0175 
0176   // Count how many qresults there are.
0177   size_t nqv = 0;
0178   while (*qdata) {
0179     ++nqv;
0180     while (*qdata)
0181       ++qdata;
0182     ++qdata;
0183     while (*qdata)
0184       ++qdata;
0185     ++qdata;
0186     while (*qdata)
0187       ++qdata;
0188     ++qdata;
0189     while (*qdata)
0190       ++qdata;
0191     ++qdata;
0192     while (*qdata)
0193       ++qdata;
0194     ++qdata;
0195   }
0196 
0197   // Now extract the qreports.
0198   qdata = from;
0199   qr.reserve(nqv);
0200   while (*qdata) {
0201     qr.emplace_back();
0202     DQMNet::QValue &qv = qr.back();
0203 
0204     qv.code = atoi(qdata);
0205     while (*qdata)
0206       ++qdata;
0207     switch (qv.code) {
0208       case dqm::qstatus::STATUS_OK:
0209         break;
0210       case dqm::qstatus::WARNING:
0211         flags |= DQMNet::DQM_PROP_REPORT_WARN;
0212         break;
0213       case dqm::qstatus::ERROR:
0214         flags |= DQMNet::DQM_PROP_REPORT_ERROR;
0215         break;
0216       default:
0217         flags |= DQMNet::DQM_PROP_REPORT_OTHER;
0218         break;
0219     }
0220 
0221     qv.qtresult = atof(++qdata);
0222     while (*qdata)
0223       ++qdata;
0224 
0225     qv.qtname = ++qdata;
0226     while (*qdata)
0227       ++qdata;
0228 
0229     qv.algorithm = ++qdata;
0230     while (*qdata)
0231       ++qdata;
0232 
0233     qv.message = ++qdata;
0234     while (*qdata)
0235       ++qdata;
0236     ++qdata;
0237   }
0238 }
0239 
// NOTE: the following section is disabled with #if 0 and is not
// compiled.  It rebuilds objects from their serialised form using
// ROOT's TBufferFile.
#if 0
// Deserialise a ROOT object from a buffer at the current position.
// Returns 0 when the buffer has been fully consumed, or when no class
// could be read at the current position.
static TObject *
extractNextObject(TBufferFile &buf)
{
  if (buf.Length() == buf.BufferSize())
    return 0;

  buf.InitMap();
  // Peek at the class of the next object, then rewind to the same
  // position so ReadObject() starts at the object's beginning.
  Int_t pos = buf.Length();
  TClass *c = buf.ReadClass();
  buf.SetBufferOffset(pos);
  buf.ResetMap();
  return c ? buf.ReadObject(c) : 0;
}

// Reconstruct an object from the raw data.
// Reads the main object and then the reference object from o.rawdata,
// and decodes o.qdata into o.qreports (updating o.flags).  Returns false
// if the main object cannot be extracted.
bool
DQMNet::reconstructObject(Object &o)
{
  TBufferFile buf(TBufferFile::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
  buf.Reset();

  // Extract the main object.
  if (! (o.object = extractNextObject(buf)))
    return false;
  
  // Extract the reference object.
  o.reference = extractNextObject(buf);

  // Extract quality reports.
  unpackQualityData(o.qreports, o.flags, o.qdata.c_str());
  return true;
}
#endif
0275 
// NOTE: disabled with #if 0 and not compiled.  Would rebuild @a o via
// reconstructObject() and book it as a monitor element in @a store
// under o.dirname, then copy its tag and quality reports.
// NOTE(review): every case except DQM_PROP_TYPE_INT passes `name`,
// which is not declared in this function (presumably o.objname was
// meant); fix this before re-enabling.
#if 0
bool
DQMNet::reinstateObject(DQMStore *store, Object &o)
{
  if (! reconstructObject (o))
    return false;

  // Reconstruct the main object
  MonitorElement *obj = 0;
  store->setCurrentFolder(o.dirname);
  switch (o.flags & DQM_PROP_TYPE_MASK)
  {
  case DQM_PROP_TYPE_INT:
    obj = store->bookInt(o.objname);
    obj->Fill(atoll(o.scalar.c_str()));
    break;

  case DQM_PROP_TYPE_REAL:
    obj = store->bookFloat(name);
    obj->Fill(atof(o.scalar.c_str()));
    break;

  case DQM_PROP_TYPE_STRING:
    obj = store->bookString(name, o.scalar);
    break;

  case DQM_PROP_TYPE_TH1F:
    obj = store->book1D(name, dynamic_cast<TH1F *>(o.object));
    break;

  case DQM_PROP_TYPE_TH1S:
    obj = store->book1S(name, dynamic_cast<TH1S *>(o.object));
    break;

  case DQM_PROP_TYPE_TH1D:
    obj = store->book1DD(name, dynamic_cast<TH1D *>(o.object));
    break;

  case DQM_PROP_TYPE_TH1I:
    obj = store->book1I(name, dynamic_cast<TH1I *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2F:
    obj = store->book2D(name, dynamic_cast<TH2F *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2S:
    obj = store->book2S(name, dynamic_cast<TH2S *>(o.object));
    break;

  case DQM_PROP_TYPE_TH2D:
    obj = store->book2DD(name, dynamic_cast<TH2D *>(o.object));
    break;

 case DQM_PROP_TYPE_TH2I:
    obj = store->book2I(name, dynamic_cast<TH2I *>(o.object));
    break;

  case DQM_PROP_TYPE_TH3F:
    obj = store->book3D(name, dynamic_cast<TH3F *>(o.object));
    break;

  case DQM_PROP_TYPE_TH3S:
    obj = store->book3S(name, dynamic_cast<TH3S *>(o.object));
    break;

  case DQM_PROP_TYPE_TH3D:
    obj = store->book3DD(name, dynamic_cast<TH3D *>(o.object));
    break;

  case DQM_PROP_TYPE_PROF:
    obj = store->bookProfile(name, dynamic_cast<TProfile *>(o.object));
    break;

  case DQM_PROP_TYPE_PROF2D:
    obj = store->bookProfile2D(name, dynamic_cast<TProfile2D *>(o.object));
    break;

  default:
    logme()
      << "ERROR: unexpected monitor element of type "
      << (o.flags & DQM_PROP_TYPE_MASK) << " called '"
      << o.dirname << '/' << o.objname << "'\n";
    return false;
  }

  // Reconstruct tag and qreports.
  if (obj)
  {
    obj->data_.tag = o.tag;
    obj->data_.qreports = o.qreports;
  }

  // Indicate success.
  return true;
}
#endif
0373 
0374 //////////////////////////////////////////////////////////////////////
0375 // Check if the network layer should stop.
0376 bool DQMNet::shouldStop() { return shutdown_; }
0377 
0378 // Once an object has been updated, this is invoked for all waiting
0379 // peers.  Send the requested object to the waiting peer.
0380 void DQMNet::releaseFromWait(Bucket *msg, WaitObject &w, Object *o) {
0381   if (o)
0382     sendObjectToPeer(msg, *o, true);
0383   else {
0384     uint32_t words[3];
0385     words[0] = sizeof(words) + w.name.size();
0386     words[1] = DQM_REPLY_NONE;
0387     words[2] = w.name.size();
0388 
0389     msg->data.reserve(msg->data.size() + words[0]);
0390     copydata(msg, &words[0], sizeof(words));
0391     copydata(msg, &w.name[0], w.name.size());
0392   }
0393 }
0394 
0395 // Send an object to a peer.  If not @a data, only sends a summary
0396 // without the object data, except the data is always sent for scalar
0397 // objects.
0398 void DQMNet::sendObjectToPeer(Bucket *msg, Object &o, bool data) {
0399   uint32_t flags = o.flags & ~DQM_PROP_DEAD;
0400   DataBlob objdata;
0401 
0402   if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
0403     objdata.insert(objdata.end(), &o.scalar[0], &o.scalar[0] + o.scalar.size());
0404   else if (data)
0405     objdata.insert(objdata.end(), &o.rawdata[0], &o.rawdata[0] + o.rawdata.size());
0406 
0407   uint32_t words[9];
0408   uint32_t namelen = o.dirname.size() + o.objname.size() + 1;
0409   uint32_t datalen = objdata.size();
0410   uint32_t qlen = o.qdata.size();
0411 
0412   if (o.dirname.empty())
0413     --namelen;
0414 
0415   words[0] = 9 * sizeof(uint32_t) + namelen + datalen + qlen;
0416   words[1] = DQM_REPLY_OBJECT;
0417   words[2] = flags;
0418   words[3] = (o.version >> 0) & 0xffffffff;
0419   words[4] = (o.version >> 32) & 0xffffffff;
0420   words[5] = o.tag;
0421   words[6] = namelen;
0422   words[7] = datalen;
0423   words[8] = qlen;
0424 
0425   msg->data.reserve(msg->data.size() + words[0]);
0426   copydata(msg, &words[0], 9 * sizeof(uint32_t));
0427   if (namelen) {
0428     copydata(msg, &(o.dirname)[0], o.dirname.size());
0429     if (!o.dirname.empty())
0430       copydata(msg, "/", 1);
0431     copydata(msg, &o.objname[0], o.objname.size());
0432   }
0433   if (datalen)
0434     copydata(msg, &objdata[0], datalen);
0435   if (qlen)
0436     copydata(msg, &o.qdata[0], qlen);
0437 }
0438 
0439 //////////////////////////////////////////////////////////////////////
0440 // Handle peer messages.
0441 bool DQMNet::onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len) {
0442   // Decode and process this message.
0443   uint32_t type;
0444   memcpy(&type, data + sizeof(uint32_t), sizeof(type));
0445   switch (type) {
0446     case DQM_MSG_UPDATE_ME: {
0447       if (len != 2 * sizeof(uint32_t)) {
0448         logme() << "ERROR: corrupt 'UPDATE_ME' message of length " << len << " from peer " << p->peeraddr << std::endl;
0449         return false;
0450       }
0451 
0452       if (debug_)
0453         logme() << "DEBUG: received message 'UPDATE ME' from peer " << p->peeraddr << ", size " << len << std::endl;
0454 
0455       p->update = true;
0456     }
0457       return true;
0458 
0459     case DQM_MSG_LIST_OBJECTS: {
0460       if (debug_)
0461         logme() << "DEBUG: received message 'LIST OBJECTS' from peer " << p->peeraddr << ", size " << len << std::endl;
0462 
0463       // Send over current status: list of known objects.
0464       sendObjectListToPeer(msg, true, false);
0465     }
0466       return true;
0467 
0468     case DQM_MSG_GET_OBJECT: {
0469       if (debug_)
0470         logme() << "DEBUG: received message 'GET OBJECT' from peer " << p->peeraddr << ", size " << len << std::endl;
0471 
0472       if (len < 3 * sizeof(uint32_t)) {
0473         logme() << "ERROR: corrupt 'GET IMAGE' message of length " << len << " from peer " << p->peeraddr << std::endl;
0474         return false;
0475       }
0476 
0477       uint32_t namelen;
0478       memcpy(&namelen, data + 2 * sizeof(uint32_t), sizeof(namelen));
0479       if (len != 3 * sizeof(uint32_t) + namelen) {
0480         logme() << "ERROR: corrupt 'GET OBJECT' message of length " << len << " from peer " << p->peeraddr
0481                 << ", expected length " << (3 * sizeof(uint32_t)) << " + " << namelen << std::endl;
0482         return false;
0483       }
0484 
0485       std::string name((char *)data + 3 * sizeof(uint32_t), namelen);
0486       Peer *owner = nullptr;
0487       Object *o = findObject(nullptr, name, &owner);
0488       if (o) {
0489         o->lastreq = Time::current().ns();
0490         if ((o->rawdata.empty() || (o->flags & DQM_PROP_STALE)) &&
0491             (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
0492           waitForData(p, name, "", owner);
0493         else
0494           sendObjectToPeer(msg, *o, true);
0495       } else {
0496         uint32_t words[3];
0497         words[0] = sizeof(words) + name.size();
0498         words[1] = DQM_REPLY_NONE;
0499         words[2] = name.size();
0500 
0501         msg->data.reserve(msg->data.size() + words[0]);
0502         copydata(msg, &words[0], sizeof(words));
0503         copydata(msg, &name[0], name.size());
0504       }
0505     }
0506       return true;
0507 
0508     case DQM_REPLY_LIST_BEGIN: {
0509       if (len != 4 * sizeof(uint32_t)) {
0510         logme() << "ERROR: corrupt 'LIST BEGIN' message of length " << len << " from peer " << p->peeraddr << std::endl;
0511         return false;
0512       }
0513 
0514       // Get the update status: whether this is a full update.
0515       uint32_t flags;
0516       memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
0517 
0518       if (debug_)
0519         logme() << "DEBUG: received message 'LIST BEGIN " << (flags ? "FULL" : "INCREMENTAL") << "' from "
0520                 << p->peeraddr << ", size " << len << std::endl;
0521 
0522       // If we are about to receive a full list of objects, flag all
0523       // objects as possibly dead.  Subsequent object notifications
0524       // will undo this for the live objects.  We cannot delete
0525       // objects quite yet, as we may get inquiry from another client
0526       // while we are processing the incoming list, so we keep the
0527       // objects tentatively alive as long as we've not seen the end.
0528       if (flags)
0529         markObjectsDead(p);
0530     }
0531       return true;
0532 
0533     case DQM_REPLY_LIST_END: {
0534       if (len != 4 * sizeof(uint32_t)) {
0535         logme() << "ERROR: corrupt 'LIST END' message of length " << len << " from peer " << p->peeraddr << std::endl;
0536         return false;
0537       }
0538 
0539       // Get the update status: whether this is a full update.
0540       uint32_t flags;
0541       memcpy(&flags, data + 3 * sizeof(uint32_t), sizeof(uint32_t));
0542 
0543       // If we received a full list of objects, now purge all dead
0544       // objects. We need to do this in two stages in case we receive
0545       // updates in many parts, and end up sending updates to others in
0546       // between; this avoids us lying live objects are dead.
0547       if (flags)
0548         purgeDeadObjects(p);
0549 
0550       if (debug_)
0551         logme() << "DEBUG: received message 'LIST END " << (flags ? "FULL" : "INCREMENTAL") << "' from " << p->peeraddr
0552                 << ", size " << len << std::endl;
0553 
0554       // Indicate we have received another update from this peer.
0555       // Also indicate we should flush to our clients.
0556       flush_ = true;
0557       p->updates++;
0558     }
0559       return true;
0560 
0561     case DQM_REPLY_OBJECT: {
0562       uint32_t words[9];
0563       if (len < sizeof(words)) {
0564         logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr << std::endl;
0565         return false;
0566       }
0567 
0568       memcpy(&words[0], data, sizeof(words));
0569       uint32_t &namelen = words[6];
0570       uint32_t &datalen = words[7];
0571       uint32_t &qlen = words[8];
0572 
0573       if (len != sizeof(words) + namelen + datalen + qlen) {
0574         logme() << "ERROR: corrupt 'OBJECT' message of length " << len << " from peer " << p->peeraddr
0575                 << ", expected length " << sizeof(words) << " + " << namelen << " + " << datalen << " + " << qlen
0576                 << std::endl;
0577         return false;
0578       }
0579 
0580       unsigned char *namedata = data + sizeof(words);
0581       unsigned char *objdata = namedata + namelen;
0582       unsigned char *qdata = objdata + datalen;
0583       unsigned char *enddata = qdata + qlen;
0584       std::string name((char *)namedata, namelen);
0585       assert(enddata == data + len);
0586 
0587       if (debug_)
0588         logme() << "DEBUG: received message 'OBJECT " << name << "' from " << p->peeraddr << ", size " << len
0589                 << std::endl;
0590 
0591       // Mark the peer as a known object source.
0592       p->source = true;
0593 
0594       // Initialise or update an object entry.
0595       Object *o = findObject(p, name);
0596       if (!o)
0597         o = makeObject(p, name);
0598 
0599       o->flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
0600       o->tag = words[5];
0601       o->version = ((uint64_t)words[4] << 32 | words[3]);
0602       o->scalar.clear();
0603       o->qdata.clear();
0604       if ((o->flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR) {
0605         o->rawdata.clear();
0606         o->scalar.insert(o->scalar.end(), objdata, qdata);
0607       } else if (datalen) {
0608         o->rawdata.clear();
0609         o->rawdata.insert(o->rawdata.end(), objdata, qdata);
0610       } else if (!o->rawdata.empty())
0611         o->flags |= DQM_PROP_STALE;
0612       o->qdata.insert(o->qdata.end(), qdata, enddata);
0613 
0614       // If we had an object for this one already and this is a list
0615       // update without data, issue an immediate data get request.
0616       if (o->lastreq && !datalen && (o->flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
0617         requestObjectData(p, (namelen ? &name[0] : nullptr), namelen);
0618 
0619       // If we have the object data, release from wait.
0620       if (datalen)
0621         releaseWaiters(name, o);
0622     }
0623       return true;
0624 
0625     case DQM_REPLY_NONE: {
0626       uint32_t words[3];
0627       if (len < sizeof(words)) {
0628         logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr << std::endl;
0629         return false;
0630       }
0631 
0632       memcpy(&words[0], data, sizeof(words));
0633       uint32_t &namelen = words[2];
0634 
0635       if (len != sizeof(words) + namelen) {
0636         logme() << "ERROR: corrupt 'NONE' message of length " << len << " from peer " << p->peeraddr
0637                 << ", expected length " << sizeof(words) << " + " << namelen << std::endl;
0638         return false;
0639       }
0640 
0641       unsigned char *namedata = data + sizeof(words);
0642       std::string name((char *)namedata, namelen);
0643 
0644       if (debug_)
0645         logme() << "DEBUG: received message 'NONE " << name << "' from " << p->peeraddr << ", size " << len
0646                 << std::endl;
0647 
0648       // Mark the peer as a known object source.
0649       p->source = true;
0650 
0651       // If this was a known object, kill it.
0652       if (Object *o = findObject(p, name)) {
0653         o->flags |= DQM_PROP_DEAD;
0654         purgeDeadObjects(p);
0655       }
0656 
0657       // If someone was waiting for this, let them go.
0658       releaseWaiters(name, nullptr);
0659     }
0660       return true;
0661 
0662     default:
0663       logme() << "ERROR: unrecognised message of length " << len << " and type " << type << " from peer " << p->peeraddr
0664               << std::endl;
0665       return false;
0666   }
0667 }
0668 
0669 //////////////////////////////////////////////////////////////////////
0670 /// Handle communication to a particular client.
0671 bool DQMNet::onPeerData(IOSelectEvent *ev, Peer *p) {
0672   lock();
0673   assert(getPeer(dynamic_cast<Socket *>(ev->source)) == p);
0674 
0675   // If there is a problem with the peer socket, discard the peer
0676   // and tell the selector to stop prcessing events for it.  If
0677   // this is a server connection, we will eventually recreate
0678   // everything if/when the data server comes back.
0679   if (ev->events & IOUrgent) {
0680     if (p->automatic) {
0681       logme() << "WARNING: connection to the DQM server at " << p->peeraddr
0682               << " lost (will attempt to reconnect in 15 seconds)\n";
0683       losePeer(nullptr, p, ev);
0684     } else
0685       losePeer("WARNING: lost peer connection ", p, ev);
0686 
0687     unlock();
0688     return true;
0689   }
0690 
0691   // If we can write to the peer socket, pump whatever we can into it.
0692   if (ev->events & IOWrite) {
0693     while (Bucket *b = p->sendq) {
0694       IOSize len = b->data.size() - p->sendpos;
0695       const void *data = (len ? (const void *)&b->data[p->sendpos] : (const void *)&data);
0696       IOSize done;
0697 
0698       try {
0699         done = (len ? ev->source->write(data, len) : 0);
0700         if (debug_ && len)
0701           logme() << "DEBUG: sent " << done << " bytes to peer " << p->peeraddr << std::endl;
0702       } catch (Error &e) {
0703         losePeer("WARNING: unable to write to peer ", p, ev, &e);
0704         unlock();
0705         return true;
0706       }
0707 
0708       p->sendpos += done;
0709       if (p->sendpos == b->data.size()) {
0710         Bucket *old = p->sendq;
0711         p->sendq = old->next;
0712         p->sendpos = 0;
0713         old->next = nullptr;
0714         discard(old);
0715       }
0716 
0717       if (!done && len)
0718         // Cannot write any more.
0719         break;
0720     }
0721   }
0722 
0723   // If there is data to be read from the peer, first receive what we
0724   // can get out the socket, the process all complete requests.
0725   if (ev->events & IORead) {
0726     // First build up the incoming buffer of data in the socket.
0727     // Remember the last size returned by the socket; we need
0728     // it to determine if the remote end closed the connection.
0729     IOSize sz;
0730     try {
0731       std::vector<unsigned char> buf(SOCKET_READ_SIZE);
0732       do
0733         if ((sz = ev->source->read(&buf[0], buf.size()))) {
0734           if (debug_)
0735             logme() << "DEBUG: received " << sz << " bytes from peer " << p->peeraddr << std::endl;
0736           DataBlob &data = p->incoming;
0737           if (data.capacity() < data.size() + sz)
0738             data.reserve(data.size() + SOCKET_READ_GROWTH);
0739           data.insert(data.end(), &buf[0], &buf[0] + sz);
0740         }
0741       while (sz == sizeof(buf));
0742     } catch (Error &e) {
0743       auto *next = dynamic_cast<SystemError *>(e.next());
0744       if (next && next->portable() == SysErr::ErrTryAgain)
0745         sz = 1;  // Ignore it, and fake no end of data.
0746       else {
0747         // Houston we have a problem.
0748         losePeer("WARNING: failed to read from peer ", p, ev, &e);
0749         unlock();
0750         return true;
0751       }
0752     }
0753 
0754     // Process fully received messages as long as we can.
0755     size_t consumed = 0;
0756     DataBlob &data = p->incoming;
0757     while (data.size() - consumed >= sizeof(uint32_t) && p->waiting < MAX_PEER_WAITREQS) {
0758       uint32_t msglen;
0759       memcpy(&msglen, &data[0] + consumed, sizeof(msglen));
0760 
0761       if (msglen >= MESSAGE_SIZE_LIMIT) {
0762         losePeer("WARNING: excessively large message from ", p, ev);
0763         unlock();
0764         return true;
0765       }
0766 
0767       if (data.size() - consumed >= msglen) {
0768         bool valid = true;
0769         if (msglen < 2 * sizeof(uint32_t)) {
0770           logme() << "ERROR: corrupt peer message of length " << msglen << " from peer " << p->peeraddr << std::endl;
0771           valid = false;
0772         } else {
0773           // Decode and process this message.
0774           Bucket msg;
0775           msg.next = nullptr;
0776           valid = onMessage(&msg, p, &data[0] + consumed, msglen);
0777 
0778           // If we created a response, chain it to the write queue.
0779           if (!msg.data.empty()) {
0780             Bucket **prev = &p->sendq;
0781             while (*prev)
0782               prev = &(*prev)->next;
0783 
0784             *prev = new Bucket;
0785             (*prev)->next = nullptr;
0786             (*prev)->data.swap(msg.data);
0787           }
0788         }
0789 
0790         if (!valid) {
0791           losePeer("WARNING: data stream error with ", p, ev);
0792           unlock();
0793           return true;
0794         }
0795 
0796         consumed += msglen;
0797       } else
0798         break;
0799     }
0800 
0801     data.erase(data.begin(), data.begin() + consumed);
0802 
0803     // If the client has closed the connection, shut down our end.  If
0804     // we have something to send back still, leave the write direction
0805     // open.  Otherwise close the shop for this client.
0806     if (sz == 0)
0807       sel_.setMask(p->socket, p->mask &= ~IORead);
0808   }
0809 
0810   // Yes, please keep processing events for this socket.
0811   unlock();
0812   return false;
0813 }
0814 
0815 /** Respond to new connections on the server socket.  Accepts the
0816     connection and creates a new socket for the peer, and sets it up
0817     for further communication.  Returns @c false always to tell the
0818     IOSelector to keep processing events for the server socket.  */
0819 bool DQMNet::onPeerConnect(IOSelectEvent *ev) {
0820   // Recover the server socket.
0821   assert(ev->source == server_);
0822 
0823   // Accept the connection.
0824   Socket *s = server_->accept();
0825   assert(s);
0826   assert(!s->isBlocking());
0827 
0828   // Record it to our list of peers.
0829   lock();
0830   Peer *p = createPeer(s);
0831   std::string localaddr;
0832   if (auto *inet = dynamic_cast<InetSocket *>(s)) {
0833     InetAddress peeraddr = inet->peername();
0834     InetAddress myaddr = inet->sockname();
0835     p->peeraddr = StringFormat("%1:%2").arg(peeraddr.hostname()).arg(peeraddr.port()).value();
0836     localaddr = StringFormat("%1:%2").arg(myaddr.hostname()).arg(myaddr.port()).value();
0837   } else if (auto *local = dynamic_cast<LocalSocket *>(s)) {
0838     p->peeraddr = local->peername().path();
0839     localaddr = local->sockname().path();
0840   } else
0841     assert(false);
0842 
0843   p->mask = IORead | IOUrgent;
0844   p->socket = s;
0845 
0846   // Report the new connection.
0847   if (debug_)
0848     logme() << "INFO: new peer " << p->peeraddr << " is now connected to " << localaddr << std::endl;
0849 
0850   // Attach it to the listener.
0851   sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
0852   unlock();
0853 
0854   // We are never done.
0855   return false;
0856 }
0857 
0858 /** React to notifications from the DQM thread.  This is a simple
0859     message to tell this thread to wake up and send unsollicited
0860     updates to the peers when new DQM data appears.  We don't send
0861     the updates here, but just set a flag to tell the main event
0862     pump to send a notification later.  This avoids sending
0863     unnecessarily frequent DQM object updates.  */
0864 bool DQMNet::onLocalNotify(IOSelectEvent *ev) {
0865   // Discard the data in the pipe, we care only about the wakeup.
0866   try {
0867     IOSize sz;
0868     unsigned char buf[1024];
0869     while ((sz = ev->source->read(buf, sizeof(buf))))
0870       ;
0871   } catch (Error &e) {
0872     auto *next = dynamic_cast<SystemError *>(e.next());
0873     if (next && next->portable() == SysErr::ErrTryAgain)
0874       ;  // Ignore it
0875     else
0876       logme() << "WARNING: error reading from notification pipe: " << e.explain() << std::endl;
0877   }
0878 
0879   // Tell the main event pump to send an update in a little while.
0880   flush_ = true;
0881 
0882   // We are never done, always keep going.
0883   return false;
0884 }
0885 
0886 /// Update the selector mask for a peer based on data queues.  Close
0887 /// the connection if there is no reason to maintain it open.
0888 void DQMNet::updateMask(Peer *p) {
0889   if (!p->socket)
0890     return;
0891 
0892   // Listen to writes iff we have data to send.
0893   unsigned oldmask = p->mask;
0894   if (!p->sendq && (p->mask & IOWrite))
0895     sel_.setMask(p->socket, p->mask &= ~IOWrite);
0896 
0897   if (p->sendq && !(p->mask & IOWrite))
0898     sel_.setMask(p->socket, p->mask |= IOWrite);
0899 
0900   if (debug_ && oldmask != p->mask)
0901     logme() << "DEBUG: updating mask for " << p->peeraddr << " to " << p->mask << " from " << oldmask << std::endl;
0902 
0903   // If we have nothing more to send and are no longer listening
0904   // for reads, close up the shop for this peer.
0905   if (p->mask == IOUrgent && !p->waiting) {
0906     assert(!p->sendq);
0907     if (debug_)
0908       logme() << "INFO: connection closed to " << p->peeraddr << std::endl;
0909     losePeer(nullptr, p, nullptr);
0910   }
0911 }
0912 
0913 //////////////////////////////////////////////////////////////////////
0914 DQMNet::DQMNet(const std::string &appname /* = "" */)
0915     : debug_(false),
0916       appname_(appname.empty() ? "DQMNet" : appname.c_str()),
0917       pid_(getpid()),
0918       server_(nullptr),
0919       version_(Time::current()),
0920       communicate_((pthread_t)-1),
0921       shutdown_(0),
0922       delay_(1000),
0923       waitStale_(0, 0, 0, 0, 500000000 /* 500 ms */),
0924       waitMax_(0, 0, 0, 5 /* seconds */, 0),
0925       flush_(false) {
0926   // Create a pipe for the local DQM to tell the communicator
0927   // thread that local DQM data has changed and that the peers
0928   // should be notified.
0929   fcntl(wakeup_.source()->fd(), F_SETFL, O_RDONLY | O_NONBLOCK);
0930   sel_.attach(wakeup_.source(), IORead, CreateHook(this, &DQMNet::onLocalNotify));
0931 
0932   // Initialise the upstream and downstream to empty.
0933   upstream_.peer = downstream_.peer = nullptr;
0934   upstream_.next = downstream_.next = 0;
0935   upstream_.port = downstream_.port = 0;
0936   upstream_.update = downstream_.update = false;
0937 }
0938 
0939 DQMNet::~DQMNet() {
0940   // FIXME
0941 }
0942 
0943 /// Enable or disable verbose debugging.  Must be called before
0944 /// calling run() or start().
0945 void DQMNet::debug(bool doit) { debug_ = doit; }
0946 
0947 /// Set the I/O dispatching delay.  Must be called before calling
0948 /// run() or start().
0949 void DQMNet::delay(int delay) { delay_ = delay; }
0950 
0951 /// Set the time limit for waiting updates to stale objects.
0952 /// Once limit has been exhausted whatever data exists is returned.
0953 /// Applies only when data has been received, another time limit is
0954 /// applied when no data payload has been received at all.
0955 void DQMNet::staleObjectWaitLimit(lat::TimeSpan time) { waitStale_ = time; }
0956 
0957 /// Start a server socket for accessing this DQM node remotely.  Must
0958 /// be called before calling run() or start().  May throw an Exception
0959 /// if the server socket cannot be initialised.
0960 void DQMNet::startLocalServer(int port) {
0961   if (server_) {
0962     logme() << "ERROR: DQM server was already started.\n";
0963     return;
0964   }
0965 
0966   try {
0967     InetAddress addr("0.0.0.0", port);
0968     auto *s = new InetSocket(SOCK_STREAM, 0, addr.family());
0969     s->bind(addr);
0970     s->listen(10);
0971     s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
0972     s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
0973     s->setBlocking(false);
0974     sel_.attach(server_ = s, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
0975   } catch (Error &e) {
0976     // FIXME: Do we need to do this when we throw an exception anyway?
0977     // FIXME: Abort instead?
0978     logme() << "ERROR: Failed to start server at port " << port << ": " << e.explain() << std::endl;
0979 
0980     throw cms::Exception("DQMNet::startLocalServer") << "Failed to start server at port " <<
0981 
0982         port << ": " << e.explain().c_str();
0983   }
0984 
0985   logme() << "INFO: DQM server started at port " << port << std::endl;
0986 }
0987 
0988 /// Start a server socket for accessing this DQM node over a file
0989 /// system socket.  Must be called before calling run() or start().
0990 /// May throw an Exception if the server socket cannot be initialised.
0991 void DQMNet::startLocalServer(const char *path) {
0992   if (server_) {
0993     logme() << "ERROR: DQM server was already started.\n";
0994     return;
0995   }
0996 
0997   try {
0998     server_ = new LocalServerSocket(path, 10);
0999     server_->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1000     server_->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1001     server_->setBlocking(false);
1002     sel_.attach(server_, IOAccept, CreateHook(this, &DQMNet::onPeerConnect));
1003   } catch (Error &e) {
1004     // FIXME: Do we need to do this when we throw an exception anyway?
1005     // FIXME: Abort instead?
1006     logme() << "ERROR: Failed to start server at path " << path << ": " << e.explain() << std::endl;
1007 
1008     throw cms::Exception("DQMNet::startLocalServer")
1009         << "Failed to start server at path " << path << ": " << e.explain().c_str();
1010   }
1011 
1012   logme() << "INFO: DQM server started at path " << path << std::endl;
1013 }
1014 
1015 /// Tell the network layer to connect to @a host and @a port and
1016 /// automatically send updates whenever local DQM data changes.  Must
1017 /// be called before calling run() or start().
1018 void DQMNet::updateToCollector(const std::string &host, int port) {
1019   if (!downstream_.host.empty()) {
1020     logme() << "ERROR: Already updating another collector at " << downstream_.host << ":" << downstream_.port
1021             << std::endl;
1022     return;
1023   }
1024 
1025   downstream_.update = true;
1026   downstream_.host = host;
1027   downstream_.port = port;
1028 }
1029 
1030 /// Tell the network layer to connect to @a host and @a port and
1031 /// automatically receive updates from upstream DQM sources.  Must be
1032 /// called before calling run() or start().
1033 void DQMNet::listenToCollector(const std::string &host, int port) {
1034   if (!upstream_.host.empty()) {
1035     logme() << "ERROR: Already receiving data from another collector at " << upstream_.host << ":" << upstream_.port
1036             << std::endl;
1037     return;
1038   }
1039 
1040   upstream_.update = false;
1041   upstream_.host = host;
1042   upstream_.port = port;
1043 }
1044 
1045 /// Stop the network layer and wait it to finish.
1046 void DQMNet::shutdown() {
1047   shutdown_ = 1;
1048   if (communicate_ != (pthread_t)-1)
1049     pthread_join(communicate_, nullptr);
1050 }
1051 
1052 /** A thread to communicate with the distributed memory cache peers.
1053     All this does is run the loop to respond to new connections.
1054     Much of the actual work is done when a new connection is
1055     received, and in pumping data around in response to actual
1056     requests.  */
1057 static void *communicate(void *obj) {
1058   sigset_t sigs;
1059   sigfillset(&sigs);
1060   pthread_sigmask(SIG_BLOCK, &sigs, nullptr);
1061   ((DQMNet *)obj)->run();
1062   return nullptr;
1063 }
1064 
1065 /// Acquire a lock on the DQM net layer.
1066 void DQMNet::lock() {
1067   if (communicate_ != (pthread_t)-1)
1068     pthread_mutex_lock(&lock_);
1069 }
1070 
1071 /// Release the lock on the DQM net layer.
1072 void DQMNet::unlock() {
1073   if (communicate_ != (pthread_t)-1)
1074     pthread_mutex_unlock(&lock_);
1075 }
1076 
1077 /// Start running the network layer in a new thread.  This is an
1078 /// exclusive alternative to the run() method, which runs the network
1079 /// layer in the caller's thread.
1080 void DQMNet::start() {
1081   if (communicate_ != (pthread_t)-1) {
1082     logme() << "ERROR: DQM networking thread has already been started\n";
1083     return;
1084   }
1085 
1086   pthread_mutex_init(&lock_, nullptr);
1087   pthread_create(&communicate_, nullptr, &communicate, this);
1088 }
1089 
1090 /** Run the actual I/O processing loop. */
1091 void DQMNet::run() {
1092   Time now;
1093   Time nextFlush = 0;
1094   AutoPeer *automatic[2] = {&upstream_, &downstream_};
1095 
1096   // Perform I/O.  Every once in a while flush updates to peers.
1097   while (!shouldStop()) {
1098     for (auto ap : automatic) {
1099       // If we need a server connection and don't have one yet,
1100       // initiate asynchronous connection creation.  Swallow errors
1101       // in case the server won't talk to us.
1102       if (!ap->host.empty() && !ap->peer && (now = Time::current()) > ap->next) {
1103         ap->next = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1104         InetSocket *s = nullptr;
1105         try {
1106           InetAddress addr(ap->host.c_str(), ap->port);
1107           s = new InetSocket(SOCK_STREAM, 0, addr.family());
1108           s->setBlocking(false);
1109           s->connect(addr);
1110           s->setopt(SO_SNDBUF, SOCKET_BUF_SIZE);
1111           s->setopt(SO_RCVBUF, SOCKET_BUF_SIZE);
1112         } catch (Error &e) {
1113           auto *sys = dynamic_cast<SystemError *>(e.next());
1114           if (!sys || sys->portable() != SysErr::ErrOperationInProgress) {
1115             // "In progress" just means the connection is in progress.
1116             // The connection is ready when the socket is writeable.
1117             // Anything else is a real problem.
1118             if (s)
1119               s->abort();
1120             delete s;
1121             s = nullptr;
1122           }
1123         }
1124 
1125         // Set up with the selector if we were successful.  If this is
1126         // the upstream collector, queue a request for updates.
1127         if (s) {
1128           Peer *p = createPeer(s);
1129           ap->peer = p;
1130 
1131           InetAddress peeraddr = ((InetSocket *)s)->peername();
1132           InetAddress myaddr = ((InetSocket *)s)->sockname();
1133           p->peeraddr = StringFormat("%1:%2").arg(peeraddr.hostname()).arg(peeraddr.port()).value();
1134           p->mask = IORead | IOWrite | IOUrgent;
1135           p->update = ap->update;
1136           p->automatic = ap;
1137           p->socket = s;
1138           sel_.attach(s, p->mask, CreateHook(this, &DQMNet::onPeerData, p));
1139           if (ap == &upstream_) {
1140             uint32_t words[4] = {2 * sizeof(uint32_t), DQM_MSG_LIST_OBJECTS, 2 * sizeof(uint32_t), DQM_MSG_UPDATE_ME};
1141             p->sendq = new Bucket;
1142             p->sendq->next = nullptr;
1143             copydata(p->sendq, words, sizeof(words));
1144           }
1145 
1146           // Report the new connection.
1147           if (debug_)
1148             logme() << "INFO: now connected to " << p->peeraddr << " from " << myaddr.hostname() << ":" << myaddr.port()
1149                     << std::endl;
1150         }
1151       }
1152     }
1153 
1154     // Pump events for a while.
1155     sel_.dispatch(delay_);
1156     now = Time::current();
1157     lock();
1158 
1159     // Check if flush is required.  Flush only if one is needed.
1160     // Always sends the full object list, but only rarely.
1161     if (flush_ && now > nextFlush) {
1162       flush_ = false;
1163       nextFlush = now + TimeSpan(0, 0, 0, 15 /* seconds */, 0);
1164       sendObjectListToPeers(true);
1165     }
1166 
1167     // Update the data server and peer selection masks.  If we
1168     // have no more data to send and listening for writes, remove
1169     // the write mask.  If we have something to write and aren't
1170     // listening for writes, start listening so we can send off
1171     // the data.
1172     updatePeerMasks();
1173 
1174     // Release peers that have been waiting for data for too long.
1175     Time waitold = now - waitMax_;
1176     Time waitstale = now - waitStale_;
1177     for (auto i = waiting_.begin(), e = waiting_.end(); i != e;) {
1178       Object *o = findObject(nullptr, i->name);
1179 
1180       // If we have (stale) object data, wait only up to stale limit.
1181       // Otherwise if we have no data at all, wait up to the max limit.
1182       if (i->time < waitold) {
1183         logme() << "WARNING: source not responding in " << (waitMax_.ns() * 1e-9) << "s to retrieval, releasing '"
1184                 << i->name << "' from wait, have " << (o ? o->rawdata.size() : 0) << " data available\n";
1185         releaseFromWait(i++, o);
1186       } else if (i->time < waitstale && o && (o->flags & DQM_PROP_STALE)) {
1187         logme() << "WARNING: source not responding in " << (waitStale_.ns() * 1e-9) << "s to update, releasing '"
1188                 << i->name << "' from wait, have " << o->rawdata.size() << " data available\n";
1189         releaseFromWait(i++, o);
1190       }
1191 
1192       // Keep it for now.
1193       else
1194         ++i;
1195     }
1196 
1197     unlock();
1198   }
1199 }
1200 
1201 // Tell the network cache that there have been local changes that
1202 // should be advertised to the downstream listeners.
1203 void DQMNet::sendLocalChanges() {
1204   char byte = 0;
1205   wakeup_.sink()->write(&byte, 1);
1206 }
1207 
1208 //////////////////////////////////////////////////////////////////////
1209 //////////////////////////////////////////////////////////////////////
1210 //////////////////////////////////////////////////////////////////////
1211 DQMBasicNet::DQMBasicNet(const std::string &appname /* = "" */) : DQMImplNet<DQMNet::Object>(appname) {
1212   local_ = static_cast<ImplPeer *>(createPeer((Socket *)-1));
1213 }
1214 
1215 /// Give a hint of how much capacity to allocate for local objects.
1216 void DQMBasicNet::reserveLocalSpace(uint32_t size) { local_->objs.resize(size); }
1217 
1218 /// Update the network cache for an object.  The caller must call
1219 /// sendLocalChanges() later to push out the changes.
1220 void DQMBasicNet::updateLocalObject(Object &o) {
1221   o.dirname = *local_->dirs.insert(o.dirname).first;
1222   std::pair<ObjectMap::iterator, bool> info(local_->objs.insert(o));
1223   if (!info.second) {
1224     // Somewhat hackish. Sets are supposedly immutable, but we
1225     // need to change the non-key parts of the object. Erasing
1226     // and re-inserting would produce too much memory churn.
1227     auto &old = const_cast<Object &>(*info.first);
1228     std::swap(old.flags, o.flags);
1229     std::swap(old.tag, o.tag);
1230     std::swap(old.version, o.version);
1231     std::swap(old.qreports, o.qreports);
1232     std::swap(old.rawdata, o.rawdata);
1233     std::swap(old.scalar, o.scalar);
1234     std::swap(old.qdata, o.qdata);
1235   }
1236 }
1237 
1238 /// Delete all local objects not in @a known.  Returns true if
1239 /// something was removed.  The caller must call sendLocalChanges()
1240 /// later to push out the changes.
1241 bool DQMBasicNet::removeLocalExcept(const std::set<std::string> &known) {
1242   size_t removed = 0;
1243   std::string path;
1244   ObjectMap::iterator i, e;
1245   for (i = local_->objs.begin(), e = local_->objs.end(); i != e;) {
1246     path.clear();
1247     path.reserve(i->dirname.size() + i->objname.size() + 2);
1248     path += i->dirname;
1249     if (!path.empty())
1250       path += '/';
1251     path += i->objname;
1252 
1253     if (!known.count(path))
1254       ++removed, local_->objs.erase(i++);
1255     else
1256       ++i;
1257   }
1258 
1259   return removed > 0;
1260 }