Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:10:09

0001 #ifndef DQMSERVICES_CORE_DQM_NET_H
0002 #define DQMSERVICES_CORE_DQM_NET_H
0003 
#include "classlib/iobase/Socket.h"
#include "classlib/iobase/IOSelector.h"
#include "classlib/iobase/Pipe.h"
#include "classlib/utils/Error.h"
#include "classlib/utils/Time.h"
#include <pthread.h>
#include <cstdint>
#include <csignal>
#include <iostream>
#include <vector>
#include <string>
#include <list>
#include <map>
#include <set>
#include <tuple>
#include <unordered_set>
0019 
0020 // for definition of QValue
0021 #include "DataFormats/Histograms/interface/MonitorElementCollection.h"
0022 
0023 //class DQMStore;
0024 
0025 class DQMNet {
0026 public:
0027   static const uint32_t DQM_PROP_TYPE_MASK = 0x000000ff;
0028   static const uint32_t DQM_PROP_TYPE_SCALAR = 0x0000000f;
0029   static const uint32_t DQM_PROP_TYPE_INVALID = 0x00000000;
0030   static const uint32_t DQM_PROP_TYPE_INT = 0x00000001;
0031   static const uint32_t DQM_PROP_TYPE_REAL = 0x00000002;
0032   static const uint32_t DQM_PROP_TYPE_STRING = 0x00000003;
0033   static const uint32_t DQM_PROP_TYPE_TH1F = 0x00000010;
0034   static const uint32_t DQM_PROP_TYPE_TH1S = 0x00000011;
0035   static const uint32_t DQM_PROP_TYPE_TH1D = 0x00000012;
0036   static const uint32_t DQM_PROP_TYPE_TH1I = 0x00000013;
0037   static const uint32_t DQM_PROP_TYPE_TH2F = 0x00000020;
0038   static const uint32_t DQM_PROP_TYPE_TH2S = 0x00000021;
0039   static const uint32_t DQM_PROP_TYPE_TH2D = 0x00000022;
0040   static const uint32_t DQM_PROP_TYPE_TH2I = 0x00000023;
0041   static const uint32_t DQM_PROP_TYPE_TH3F = 0x00000030;
0042   static const uint32_t DQM_PROP_TYPE_TH3S = 0x00000031;
0043   static const uint32_t DQM_PROP_TYPE_TH3D = 0x00000032;
0044   static const uint32_t DQM_PROP_TYPE_TPROF = 0x00000040;
0045   static const uint32_t DQM_PROP_TYPE_TPROF2D = 0x00000041;
0046   static const uint32_t DQM_PROP_TYPE_DATABLOB = 0x00000050;
0047 
0048   static const uint32_t DQM_PROP_REPORT_MASK = 0x00000f00;
0049   static const uint32_t DQM_PROP_REPORT_CLEAR = 0x00000000;
0050   static const uint32_t DQM_PROP_REPORT_ERROR = 0x00000100;
0051   static const uint32_t DQM_PROP_REPORT_WARN = 0x00000200;
0052   static const uint32_t DQM_PROP_REPORT_OTHER = 0x00000400;
0053   static const uint32_t DQM_PROP_REPORT_ALARM = (DQM_PROP_REPORT_ERROR | DQM_PROP_REPORT_WARN | DQM_PROP_REPORT_OTHER);
0054 
0055   static const uint32_t DQM_PROP_HAS_REFERENCE = 0x00001000;
0056   static const uint32_t DQM_PROP_TAGGED = 0x00002000;
0057   static const uint32_t DQM_PROP_ACCUMULATE = 0x00004000;
0058   static const uint32_t DQM_PROP_RESET = 0x00008000;
0059 
0060   static const uint32_t DQM_PROP_NEW = 0x00010000;
0061   static const uint32_t DQM_PROP_RECEIVED = 0x00020000;
0062   static const uint32_t DQM_PROP_LUMI = 0x00040000;
0063   static const uint32_t DQM_PROP_DEAD = 0x00080000;
0064   static const uint32_t DQM_PROP_STALE = 0x00100000;
0065   static const uint32_t DQM_PROP_EFFICIENCY_PLOT = 0x00200000;
0066   static const uint32_t DQM_PROP_MARKTODELETE = 0x01000000;
0067 
0068   static const uint32_t DQM_MSG_HELLO = 0;
0069   static const uint32_t DQM_MSG_UPDATE_ME = 1;
0070   static const uint32_t DQM_MSG_LIST_OBJECTS = 2;
0071   static const uint32_t DQM_MSG_GET_OBJECT = 3;
0072 
0073   static const uint32_t DQM_REPLY_LIST_BEGIN = 101;
0074   static const uint32_t DQM_REPLY_LIST_END = 102;
0075   static const uint32_t DQM_REPLY_NONE = 103;
0076   static const uint32_t DQM_REPLY_OBJECT = 104;
0077 
0078   static const uint32_t MAX_PEER_WAITREQS = 128;
0079 
0080   struct Peer;
0081   struct WaitObject;
0082 
0083   using QValue = MonitorElementData::QReport::QValue;
0084   using DataBlob = std::vector<unsigned char>;
0085   using QReports = std::vector<QValue>;
0086   using TagList = std::vector<uint32_t>;  // DEPRECATED
0087   using WaitList = std::list<WaitObject>;
0088 
0089   struct CoreObject {
0090     uint32_t flags;
0091     uint32_t tag;
0092     uint64_t version;
0093     uint32_t run;
0094     uint32_t lumi;
0095     uint32_t streamId;
0096     uint32_t moduleId;
0097     std::string dirname;
0098     std::string objname;
0099     QReports qreports;
0100   };
0101 
0102   struct Object : CoreObject {
0103     uint64_t hash;
0104     uint64_t lastreq;
0105     DataBlob rawdata;
0106     std::string scalar;
0107     std::string qdata;
0108   };
0109 
0110   struct Bucket {
0111     Bucket *next;
0112     DataBlob data;
0113   };
0114 
0115   struct WaitObject {
0116     lat::Time time;
0117     std::string name;
0118     std::string info;
0119     Peer *peer;
0120   };
0121 
0122   struct AutoPeer;
0123   struct Peer {
0124     std::string peeraddr;
0125     lat::Socket *socket;
0126     DataBlob incoming;
0127     Bucket *sendq;
0128     size_t sendpos;
0129 
0130     unsigned mask;
0131     bool source;
0132     bool update;
0133     bool updated;
0134     size_t updates;
0135     size_t waiting;
0136     AutoPeer *automatic;
0137   };
0138 
0139   struct AutoPeer {
0140     Peer *peer;
0141     lat::Time next;
0142     std::string host;
0143     int port;
0144     bool update;
0145   };
0146 
0147   DQMNet(const std::string &appname = "");
0148   virtual ~DQMNet();
0149 
0150   void debug(bool doit);
0151   void delay(int delay);
0152   void startLocalServer(int port);
0153   void startLocalServer(const char *path);
0154   void staleObjectWaitLimit(lat::TimeSpan time);
0155   void updateToCollector(const std::string &host, int port);
0156   void listenToCollector(const std::string &host, int port);
0157   void shutdown();
0158   void lock();
0159   void unlock();
0160 
0161   void start();
0162   void run();
0163 
0164   void sendLocalChanges();
0165 
0166   static bool setOrder(const CoreObject &a, const CoreObject &b) {
0167     if (a.run == b.run) {
0168       if (a.lumi == b.lumi) {
0169         if (a.streamId == b.streamId) {
0170           if (a.moduleId == b.moduleId) {
0171             if (a.dirname == b.dirname) {
0172               return a.objname < b.objname;
0173             }
0174             return a.dirname < b.dirname;
0175           }
0176           return a.moduleId < b.moduleId;
0177         }
0178         return a.streamId < b.streamId;
0179       }
0180       return a.lumi < b.lumi;
0181     }
0182     return a.run < b.run;
0183   }
0184 
0185   struct HashOp {
0186     uint32_t operator()(const Object &a) const { return a.hash; }
0187   };
0188 
0189   struct HashEqual {
0190     bool operator()(const Object &a, const Object &b) const {
0191       return a.hash == b.hash && a.dirname == b.dirname && a.objname == b.objname;
0192     }
0193   };
0194 
0195   static size_t dqmhash(const void *key, size_t keylen) {
0196     // Reduced version of Bob Jenkins' hash function at:
0197     //   http://www.burtleburtle.net/bob/c/lookup3.c
0198 #define dqmhashrot(x, k) (((x) << (k)) | ((x) >> (32 - (k))))
0199 #define dqmhashmix(a, b, c) \
0200   {                         \
0201     a -= c;                 \
0202     a ^= dqmhashrot(c, 4);  \
0203     c += b;                 \
0204     b -= a;                 \
0205     b ^= dqmhashrot(a, 6);  \
0206     a += c;                 \
0207     c -= b;                 \
0208     c ^= dqmhashrot(b, 8);  \
0209     b += a;                 \
0210     a -= c;                 \
0211     a ^= dqmhashrot(c, 16); \
0212     c += b;                 \
0213     b -= a;                 \
0214     b ^= dqmhashrot(a, 19); \
0215     a += c;                 \
0216     c -= b;                 \
0217     c ^= dqmhashrot(b, 4);  \
0218     b += a;                 \
0219   }
0220 #define dqmhashfinal(a, b, c) \
0221   {                           \
0222     c ^= b;                   \
0223     c -= dqmhashrot(b, 14);   \
0224     a ^= c;                   \
0225     a -= dqmhashrot(c, 11);   \
0226     b ^= a;                   \
0227     b -= dqmhashrot(a, 25);   \
0228     c ^= b;                   \
0229     c -= dqmhashrot(b, 16);   \
0230     a ^= c;                   \
0231     a -= dqmhashrot(c, 4);    \
0232     b ^= a;                   \
0233     b -= dqmhashrot(a, 14);   \
0234     c ^= b;                   \
0235     c -= dqmhashrot(b, 24);   \
0236   }
0237 
0238     uint32_t a, b, c;
0239     a = b = c = 0xdeadbeef + (uint32_t)keylen;
0240     const auto *k = (const unsigned char *)key;
0241 
0242     // all but the last block: affect some bits of (a, b, c)
0243     while (keylen > 12) {
0244       a += k[0];
0245       a += ((uint32_t)k[1]) << 8;
0246       a += ((uint32_t)k[2]) << 16;
0247       a += ((uint32_t)k[3]) << 24;
0248       b += k[4];
0249       b += ((uint32_t)k[5]) << 8;
0250       b += ((uint32_t)k[6]) << 16;
0251       b += ((uint32_t)k[7]) << 24;
0252       c += k[8];
0253       c += ((uint32_t)k[9]) << 8;
0254       c += ((uint32_t)k[10]) << 16;
0255       c += ((uint32_t)k[11]) << 24;
0256       dqmhashmix(a, b, c);
0257       keylen -= 12;
0258       k += 12;
0259     }
0260 
0261     // last block: affect all 32 bits of (c); all case statements fall through
0262     switch (keylen) {
0263       case 12:
0264         c += ((uint32_t)k[11]) << 24;
0265         [[fallthrough]];
0266       case 11:
0267         c += ((uint32_t)k[10]) << 16;
0268         [[fallthrough]];
0269       case 10:
0270         c += ((uint32_t)k[9]) << 8;
0271         [[fallthrough]];
0272       case 9:
0273         c += k[8];
0274         [[fallthrough]];
0275       case 8:
0276         b += ((uint32_t)k[7]) << 24;
0277         [[fallthrough]];
0278       case 7:
0279         b += ((uint32_t)k[6]) << 16;
0280         [[fallthrough]];
0281       case 6:
0282         b += ((uint32_t)k[5]) << 8;
0283         [[fallthrough]];
0284       case 5:
0285         b += k[4];
0286         [[fallthrough]];
0287       case 4:
0288         a += ((uint32_t)k[3]) << 24;
0289         [[fallthrough]];
0290       case 3:
0291         a += ((uint32_t)k[2]) << 16;
0292         [[fallthrough]];
0293       case 2:
0294         a += ((uint32_t)k[1]) << 8;
0295         [[fallthrough]];
0296       case 1:
0297         a += k[0];
0298         break;
0299       case 0:
0300         return c;
0301     }
0302 
0303     dqmhashfinal(a, b, c);
0304     return c;
0305 #undef dqmhashrot
0306 #undef dqmhashmix
0307 #undef dqmhashfinal
0308   }
0309 
0310   static void packQualityData(std::string &into, const QReports &qr);
0311   static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from);
0312 
0313 protected:
0314   std::ostream &logme();
0315   static void copydata(Bucket *b, const void *data, size_t len);
0316   virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data);
0317 
0318   virtual bool shouldStop();
0319   void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner);
0320   virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o);
0321   virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len);
0322 
0323   // bool           reconstructObject(Object &o);
0324   // bool           reinstateObject(DQMStore *store, Object &o);
0325   virtual Object *findObject(Peer *p, const std::string &name, Peer **owner = nullptr) = 0;
0326   virtual Object *makeObject(Peer *p, const std::string &name) = 0;
0327   virtual void markObjectsDead(Peer *p) = 0;
0328   virtual void purgeDeadObjects(Peer *p) = 0;
0329 
0330   virtual Peer *getPeer(lat::Socket *s) = 0;
0331   virtual Peer *createPeer(lat::Socket *s) = 0;
0332   virtual void removePeer(Peer *p, lat::Socket *s) = 0;
0333   virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear) = 0;
0334   virtual void sendObjectListToPeers(bool all) = 0;
0335 
0336   void updateMask(Peer *p);
0337   virtual void updatePeerMasks() = 0;
0338   static void discard(Bucket *&b);
0339 
0340   bool debug_;
0341   pthread_mutex_t lock_;
0342 
0343 private:
0344   void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err = nullptr);
0345   void requestObjectData(Peer *p, const char *name, size_t len);
0346   void releaseFromWait(WaitList::iterator i, Object *o);
0347   void releaseWaiters(const std::string &name, Object *o);
0348 
0349   bool onPeerData(lat::IOSelectEvent *ev, Peer *p);
0350   bool onPeerConnect(lat::IOSelectEvent *ev);
0351   bool onLocalNotify(lat::IOSelectEvent *ev);
0352 
0353   std::string appname_;
0354   int pid_;
0355 
0356   lat::IOSelector sel_;
0357   lat::Socket *server_;
0358   lat::Pipe wakeup_;
0359   lat::Time version_;
0360 
0361   AutoPeer upstream_;
0362   AutoPeer downstream_;
0363   WaitList waiting_;
0364 
0365   pthread_t communicate_;
0366   sig_atomic_t shutdown_;
0367 
0368   int delay_;
0369   lat::TimeSpan waitStale_;
0370   lat::TimeSpan waitMax_;
0371   bool flush_;
0372 
0373 public:
0374   // copying is not available
0375   DQMNet(const DQMNet &) = delete;
0376   DQMNet &operator=(const DQMNet &) = delete;
0377 };
0378 
0379 template <class ObjType>
0380 class DQMImplNet : public DQMNet {
0381 public:
0382   struct ImplPeer;
0383 
0384   using DirMap = std::set<std::string>;
0385   typedef std::unordered_set<ObjType, HashOp, HashEqual> ObjectMap;
0386   typedef std::map<lat::Socket *, ImplPeer> PeerMap;
0387   struct ImplPeer : Peer {
0388     ImplPeer() = default;
0389     ObjectMap objs;
0390     DirMap dirs;
0391   };
0392 
0393   DQMImplNet(const std::string &appname = "") : DQMNet(appname) {}
0394 
0395   ~DQMImplNet() override = default;
0396 
0397 protected:
0398   Object *findObject(Peer *p, const std::string &name, Peer **owner = nullptr) override {
0399     size_t slash = name.rfind('/');
0400     size_t dirpos = (slash == std::string::npos ? 0 : slash);
0401     size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
0402     std::string path(name, 0, dirpos);
0403     ObjType proto;
0404     proto.hash = dqmhash(name.c_str(), name.size());
0405     proto.dirname = path;
0406     proto.objname.append(name, namepos, std::string::npos);
0407 
0408     typename ObjectMap::iterator pos;
0409     typename PeerMap::iterator i, e;
0410     if (owner)
0411       *owner = nullptr;
0412     if (p) {
0413       auto *ip = static_cast<ImplPeer *>(p);
0414       pos = ip->objs.find(proto);
0415       if (pos == ip->objs.end())
0416         return nullptr;
0417       else {
0418         if (owner)
0419           *owner = ip;
0420         return const_cast<ObjType *>(&*pos);
0421       }
0422     } else {
0423       for (i = peers_.begin(), e = peers_.end(); i != e; ++i) {
0424         pos = i->second.objs.find(proto);
0425         if (pos != i->second.objs.end()) {
0426           if (owner)
0427             *owner = &i->second;
0428           return const_cast<ObjType *>(&*pos);
0429         }
0430       }
0431       return nullptr;
0432     }
0433   }
0434 
0435   Object *makeObject(Peer *p, const std::string &name) override {
0436     auto *ip = static_cast<ImplPeer *>(p);
0437     size_t slash = name.rfind('/');
0438     size_t dirpos = (slash == std::string::npos ? 0 : slash);
0439     size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
0440     ObjType o;
0441     o.flags = 0;
0442     o.tag = 0;
0443     o.version = 0;
0444     o.lastreq = 0;
0445     o.dirname = *ip->dirs.insert(name.substr(0, dirpos)).first;
0446     o.objname.append(name, namepos, std::string::npos);
0447     o.hash = dqmhash(name.c_str(), name.size());
0448     return const_cast<ObjType *>(&*ip->objs.insert(o).first);
0449   }
0450 
0451   // Mark all the objects dead.  This is intended to be used when
0452   // starting to process a complete list of objects, in order to
0453   // flag the objects that need to be killed at the end.  After
0454   // call to this method, revive all live objects by removing the
0455   // DQM_PROP_DEAD flag, then call purgeDeadObjects() at the end
0456   // to remove the dead ones.  This also turns off object request
0457   // for objects we've lost interest in.
0458   void markObjectsDead(Peer *p) override {
0459     uint64_t minreq = (lat::Time::current() - lat::TimeSpan(0, 0, 5 /* minutes */, 0, 0)).ns();
0460     auto *ip = static_cast<ImplPeer *>(p);
0461     typename ObjectMap::iterator i, e;
0462     for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ++i) {
0463       if (i->lastreq && i->lastreq < minreq)
0464         const_cast<ObjType &>(*i).lastreq = 0;
0465       const_cast<ObjType &>(*i).flags |= DQM_PROP_DEAD;
0466     }
0467   }
0468 
0469   // Mark remaining zombie objects as dead.  See markObjectsDead().
0470   void purgeDeadObjects(Peer *p) override {
0471     auto *ip = static_cast<ImplPeer *>(p);
0472     typename ObjectMap::iterator i, e;
0473     for (i = ip->objs.begin(), e = ip->objs.end(); i != e;) {
0474       if (i->flags & DQM_PROP_DEAD)
0475         ip->objs.erase(i++);
0476       else
0477         ++i;
0478     }
0479   }
0480 
0481   Peer *getPeer(lat::Socket *s) override {
0482     auto pos = peers_.find(s);
0483     auto end = peers_.end();
0484     return pos == end ? nullptr : &pos->second;
0485   }
0486 
0487   Peer *createPeer(lat::Socket *s) override {
0488     ImplPeer *ip = &peers_[s];
0489     ip->socket = nullptr;
0490     ip->sendq = nullptr;
0491     ip->sendpos = 0;
0492     ip->mask = 0;
0493     ip->source = false;
0494     ip->update = false;
0495     ip->updated = false;
0496     ip->updates = 0;
0497     ip->waiting = 0;
0498     ip->automatic = nullptr;
0499     return ip;
0500   }
0501 
0502   void removePeer(Peer *p, lat::Socket *s) override {
0503     auto *ip = static_cast<ImplPeer *>(p);
0504     bool needflush = !ip->objs.empty();
0505 
0506     typename ObjectMap::iterator i, e;
0507     for (i = ip->objs.begin(), e = ip->objs.end(); i != e;)
0508       ip->objs.erase(i++);
0509 
0510     peers_.erase(s);
0511 
0512     // If we removed a peer with objects, our list of objects
0513     // has changed and we need to update downstream peers.
0514     if (needflush)
0515       sendLocalChanges();
0516   }
0517 
0518   /// Send all objects to a peer and optionally mark sent objects old.
0519   void sendObjectListToPeer(Bucket *msg, bool all, bool clear) override {
0520     typename PeerMap::iterator pi, pe;
0521     typename ObjectMap::iterator oi, oe;
0522     size_t size = 0;
0523     size_t numobjs = 0;
0524     for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
0525       for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
0526         if (all || (oi->flags & DQM_PROP_NEW))
0527           size += 9 * sizeof(uint32_t) + oi->dirname.size() + oi->objname.size() + 1 + oi->scalar.size() +
0528                   oi->qdata.size() + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
0529 
0530     msg->data.reserve(msg->data.size() + size + 8 * sizeof(uint32_t));
0531 
0532     uint32_t nupdates = 0;
0533     uint32_t words[4];
0534     words[0] = sizeof(words);
0535     words[1] = DQM_REPLY_LIST_BEGIN;
0536     words[2] = numobjs;
0537     words[3] = all;
0538     copydata(msg, &words[0], sizeof(words));
0539 
0540     for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
0541       for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
0542         if (all || (oi->flags & DQM_PROP_NEW)) {
0543           sendObjectToPeer(msg, const_cast<ObjType &>(*oi), oi->lastreq > 0);
0544           if (clear)
0545             const_cast<ObjType &>(*oi).flags &= ~DQM_PROP_NEW;
0546           ++nupdates;
0547         }
0548 
0549     words[1] = DQM_REPLY_LIST_END;
0550     words[2] = nupdates;
0551     copydata(msg, &words[0], sizeof(words));
0552   }
0553 
0554   void sendObjectListToPeers(bool all) override {
0555     typename PeerMap::iterator i, e;
0556     typename ObjectMap::iterator oi, oe;
0557     for (i = peers_.begin(), e = peers_.end(); i != e; ++i) {
0558       ImplPeer &p = i->second;
0559       if (!p.update)
0560         continue;
0561 
0562       if (debug_)
0563         logme() << "DEBUG: notifying " << p.peeraddr << std::endl;
0564 
0565       Bucket msg;
0566       msg.next = nullptr;
0567       sendObjectListToPeer(&msg, !p.updated || all, true);
0568 
0569       if (!msg.data.empty()) {
0570         Bucket **prev = &p.sendq;
0571         while (*prev)
0572           prev = &(*prev)->next;
0573 
0574         *prev = new Bucket;
0575         (*prev)->next = nullptr;
0576         (*prev)->data.swap(msg.data);
0577       }
0578       p.updated = true;
0579     }
0580   }
0581 
0582   void updatePeerMasks() override {
0583     typename PeerMap::iterator i, e;
0584     for (i = peers_.begin(), e = peers_.end(); i != e;)
0585       updateMask(&(i++)->second);
0586   }
0587 
0588 protected:
0589   PeerMap peers_;
0590 };
0591 
0592 class DQMBasicNet : public DQMImplNet<DQMNet::Object> {
0593 public:
0594   DQMBasicNet(const std::string &appname = "");
0595 
0596   void reserveLocalSpace(uint32_t size);
0597   void updateLocalObject(Object &o);
0598   bool removeLocalExcept(const std::set<std::string> &known);
0599 
0600 private:
0601   ImplPeer *local_;
0602 };
0603 
0604 #endif  // DQMSERVICES_CORE_DQM_NET_H