Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2025-03-10 23:53:30

0001 #ifndef DQMSERVICES_CORE_DQM_NET_H
0002 #define DQMSERVICES_CORE_DQM_NET_H
0003 
0004 #include "classlib/iobase/Socket.h"
0005 #include "classlib/iobase/IOSelector.h"
0006 #include "classlib/iobase/Pipe.h"
0007 #include "classlib/utils/Error.h"
0008 #include "classlib/utils/Time.h"
0009 #include <pthread.h>
0010 #include <cstdint>
0011 #include <csignal>
0012 #include <iostream>
0013 #include <vector>
0014 #include <string>
0015 #include <list>
0016 #include <map>
0017 #include <set>
0018 #include <unordered_set>
0019 
0020 // for definition of QValue
0021 #include "DataFormats/Histograms/interface/MonitorElementCollection.h"
0022 
0023 //class DQMStore;
0024 
0025 class DQMNet {
0026 public:
0027   static const uint32_t DQM_PROP_TYPE_MASK = 0x000000ff;
0028   static const uint32_t DQM_PROP_TYPE_SCALAR = 0x0000000f;
0029   static const uint32_t DQM_PROP_TYPE_INVALID = 0x00000000;
0030   static const uint32_t DQM_PROP_TYPE_INT = 0x00000001;
0031   static const uint32_t DQM_PROP_TYPE_REAL = 0x00000002;
0032   static const uint32_t DQM_PROP_TYPE_STRING = 0x00000003;
0033   static const uint32_t DQM_PROP_TYPE_TH1F = 0x00000010;
0034   static const uint32_t DQM_PROP_TYPE_TH1S = 0x00000011;
0035   static const uint32_t DQM_PROP_TYPE_TH1D = 0x00000012;
0036   static const uint32_t DQM_PROP_TYPE_TH1I = 0x00000013;
0037   static const uint32_t DQM_PROP_TYPE_TH2F = 0x00000020;
0038   static const uint32_t DQM_PROP_TYPE_TH2S = 0x00000021;
0039   static const uint32_t DQM_PROP_TYPE_TH2D = 0x00000022;
0040   static const uint32_t DQM_PROP_TYPE_TH2I = 0x00000023;
0041   static const uint32_t DQM_PROP_TYPE_TH2Poly = 0x00000024;
0042   static const uint32_t DQM_PROP_TYPE_TH3F = 0x00000030;
0043   static const uint32_t DQM_PROP_TYPE_TH3S = 0x00000031;
0044   static const uint32_t DQM_PROP_TYPE_TH3D = 0x00000032;
0045   static const uint32_t DQM_PROP_TYPE_TPROF = 0x00000040;
0046   static const uint32_t DQM_PROP_TYPE_TPROF2D = 0x00000041;
0047   static const uint32_t DQM_PROP_TYPE_DATABLOB = 0x00000050;
0048 
0049   static const uint32_t DQM_PROP_REPORT_MASK = 0x00000f00;
0050   static const uint32_t DQM_PROP_REPORT_CLEAR = 0x00000000;
0051   static const uint32_t DQM_PROP_REPORT_ERROR = 0x00000100;
0052   static const uint32_t DQM_PROP_REPORT_WARN = 0x00000200;
0053   static const uint32_t DQM_PROP_REPORT_OTHER = 0x00000400;
0054   static const uint32_t DQM_PROP_REPORT_ALARM = (DQM_PROP_REPORT_ERROR | DQM_PROP_REPORT_WARN | DQM_PROP_REPORT_OTHER);
0055 
0056   static const uint32_t DQM_PROP_HAS_REFERENCE = 0x00001000;
0057   static const uint32_t DQM_PROP_TAGGED = 0x00002000;
0058   static const uint32_t DQM_PROP_ACCUMULATE = 0x00004000;
0059   static const uint32_t DQM_PROP_RESET = 0x00008000;
0060 
0061   static const uint32_t DQM_PROP_NEW = 0x00010000;
0062   static const uint32_t DQM_PROP_RECEIVED = 0x00020000;
0063   static const uint32_t DQM_PROP_LUMI = 0x00040000;
0064   static const uint32_t DQM_PROP_DEAD = 0x00080000;
0065   static const uint32_t DQM_PROP_STALE = 0x00100000;
0066   static const uint32_t DQM_PROP_EFFICIENCY_PLOT = 0x00200000;
0067   static const uint32_t DQM_PROP_MARKTODELETE = 0x01000000;
0068 
0069   static const uint32_t DQM_MSG_HELLO = 0;
0070   static const uint32_t DQM_MSG_UPDATE_ME = 1;
0071   static const uint32_t DQM_MSG_LIST_OBJECTS = 2;
0072   static const uint32_t DQM_MSG_GET_OBJECT = 3;
0073 
0074   static const uint32_t DQM_REPLY_LIST_BEGIN = 101;
0075   static const uint32_t DQM_REPLY_LIST_END = 102;
0076   static const uint32_t DQM_REPLY_NONE = 103;
0077   static const uint32_t DQM_REPLY_OBJECT = 104;
0078 
0079   static const uint32_t MAX_PEER_WAITREQS = 128;
0080 
0081   struct Peer;
0082   struct WaitObject;
0083 
0084   using QValue = MonitorElementData::QReport::QValue;
0085   using DataBlob = std::vector<unsigned char>;
0086   using QReports = std::vector<QValue>;
0087   using TagList = std::vector<uint32_t>;  // DEPRECATED
0088   using WaitList = std::list<WaitObject>;
0089 
0090   struct CoreObject {
0091     uint32_t flags;
0092     uint32_t tag;
0093     uint64_t version;
0094     uint32_t run;
0095     uint32_t lumi;
0096     uint32_t streamId;
0097     uint32_t moduleId;
0098     std::string dirname;
0099     std::string objname;
0100     QReports qreports;
0101   };
0102 
0103   struct Object : CoreObject {
0104     uint64_t hash;
0105     uint64_t lastreq;
0106     DataBlob rawdata;
0107     std::string scalar;
0108     std::string qdata;
0109   };
0110 
0111   struct Bucket {
0112     Bucket *next;
0113     DataBlob data;
0114   };
0115 
0116   struct WaitObject {
0117     lat::Time time;
0118     std::string name;
0119     std::string info;
0120     Peer *peer;
0121   };
0122 
0123   struct AutoPeer;
0124   struct Peer {
0125     std::string peeraddr;
0126     lat::Socket *socket;
0127     DataBlob incoming;
0128     Bucket *sendq;
0129     size_t sendpos;
0130 
0131     unsigned mask;
0132     bool source;
0133     bool update;
0134     bool updated;
0135     size_t updates;
0136     size_t waiting;
0137     AutoPeer *automatic;
0138   };
0139 
0140   struct AutoPeer {
0141     Peer *peer;
0142     lat::Time next;
0143     std::string host;
0144     int port;
0145     bool update;
0146   };
0147 
0148   DQMNet(const std::string &appname = "");
0149   virtual ~DQMNet();
0150 
0151   void debug(bool doit);
0152   void delay(int delay);
0153   void startLocalServer(int port);
0154   void startLocalServer(const char *path);
0155   void staleObjectWaitLimit(lat::TimeSpan time);
0156   void updateToCollector(const std::string &host, int port);
0157   void listenToCollector(const std::string &host, int port);
0158   void shutdown();
0159   void lock();
0160   void unlock();
0161 
0162   void start();
0163   void run();
0164 
0165   void sendLocalChanges();
0166 
0167   static bool setOrder(const CoreObject &a, const CoreObject &b) {
0168     if (a.run == b.run) {
0169       if (a.lumi == b.lumi) {
0170         if (a.streamId == b.streamId) {
0171           if (a.moduleId == b.moduleId) {
0172             if (a.dirname == b.dirname) {
0173               return a.objname < b.objname;
0174             }
0175             return a.dirname < b.dirname;
0176           }
0177           return a.moduleId < b.moduleId;
0178         }
0179         return a.streamId < b.streamId;
0180       }
0181       return a.lumi < b.lumi;
0182     }
0183     return a.run < b.run;
0184   }
0185 
0186   struct HashOp {
0187     uint32_t operator()(const Object &a) const { return a.hash; }
0188   };
0189 
0190   struct HashEqual {
0191     bool operator()(const Object &a, const Object &b) const {
0192       return a.hash == b.hash && a.dirname == b.dirname && a.objname == b.objname;
0193     }
0194   };
0195 
0196   static size_t dqmhash(const void *key, size_t keylen) {
0197     // Reduced version of Bob Jenkins' hash function at:
0198     //   http://www.burtleburtle.net/bob/c/lookup3.c
0199 #define dqmhashrot(x, k) (((x) << (k)) | ((x) >> (32 - (k))))
0200 #define dqmhashmix(a, b, c) \
0201   {                         \
0202     a -= c;                 \
0203     a ^= dqmhashrot(c, 4);  \
0204     c += b;                 \
0205     b -= a;                 \
0206     b ^= dqmhashrot(a, 6);  \
0207     a += c;                 \
0208     c -= b;                 \
0209     c ^= dqmhashrot(b, 8);  \
0210     b += a;                 \
0211     a -= c;                 \
0212     a ^= dqmhashrot(c, 16); \
0213     c += b;                 \
0214     b -= a;                 \
0215     b ^= dqmhashrot(a, 19); \
0216     a += c;                 \
0217     c -= b;                 \
0218     c ^= dqmhashrot(b, 4);  \
0219     b += a;                 \
0220   }
0221 #define dqmhashfinal(a, b, c) \
0222   {                           \
0223     c ^= b;                   \
0224     c -= dqmhashrot(b, 14);   \
0225     a ^= c;                   \
0226     a -= dqmhashrot(c, 11);   \
0227     b ^= a;                   \
0228     b -= dqmhashrot(a, 25);   \
0229     c ^= b;                   \
0230     c -= dqmhashrot(b, 16);   \
0231     a ^= c;                   \
0232     a -= dqmhashrot(c, 4);    \
0233     b ^= a;                   \
0234     b -= dqmhashrot(a, 14);   \
0235     c ^= b;                   \
0236     c -= dqmhashrot(b, 24);   \
0237   }
0238 
0239     uint32_t a, b, c;
0240     a = b = c = 0xdeadbeef + (uint32_t)keylen;
0241     const auto *k = (const unsigned char *)key;
0242 
0243     // all but the last block: affect some bits of (a, b, c)
0244     while (keylen > 12) {
0245       a += k[0];
0246       a += ((uint32_t)k[1]) << 8;
0247       a += ((uint32_t)k[2]) << 16;
0248       a += ((uint32_t)k[3]) << 24;
0249       b += k[4];
0250       b += ((uint32_t)k[5]) << 8;
0251       b += ((uint32_t)k[6]) << 16;
0252       b += ((uint32_t)k[7]) << 24;
0253       c += k[8];
0254       c += ((uint32_t)k[9]) << 8;
0255       c += ((uint32_t)k[10]) << 16;
0256       c += ((uint32_t)k[11]) << 24;
0257       dqmhashmix(a, b, c);
0258       keylen -= 12;
0259       k += 12;
0260     }
0261 
0262     // last block: affect all 32 bits of (c); all case statements fall through
0263     switch (keylen) {
0264       case 12:
0265         c += ((uint32_t)k[11]) << 24;
0266         [[fallthrough]];
0267       case 11:
0268         c += ((uint32_t)k[10]) << 16;
0269         [[fallthrough]];
0270       case 10:
0271         c += ((uint32_t)k[9]) << 8;
0272         [[fallthrough]];
0273       case 9:
0274         c += k[8];
0275         [[fallthrough]];
0276       case 8:
0277         b += ((uint32_t)k[7]) << 24;
0278         [[fallthrough]];
0279       case 7:
0280         b += ((uint32_t)k[6]) << 16;
0281         [[fallthrough]];
0282       case 6:
0283         b += ((uint32_t)k[5]) << 8;
0284         [[fallthrough]];
0285       case 5:
0286         b += k[4];
0287         [[fallthrough]];
0288       case 4:
0289         a += ((uint32_t)k[3]) << 24;
0290         [[fallthrough]];
0291       case 3:
0292         a += ((uint32_t)k[2]) << 16;
0293         [[fallthrough]];
0294       case 2:
0295         a += ((uint32_t)k[1]) << 8;
0296         [[fallthrough]];
0297       case 1:
0298         a += k[0];
0299         break;
0300       case 0:
0301         return c;
0302     }
0303 
0304     dqmhashfinal(a, b, c);
0305     return c;
0306 #undef dqmhashrot
0307 #undef dqmhashmix
0308 #undef dqmhashfinal
0309   }
0310 
0311   static void packQualityData(std::string &into, const QReports &qr);
0312   static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from);
0313 
0314 protected:
0315   std::ostream &logme();
0316   static void copydata(Bucket *b, const void *data, size_t len);
0317   virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data);
0318 
0319   virtual bool shouldStop();
0320   void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner);
0321   virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o);
0322   virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len);
0323 
0324   // bool           reconstructObject(Object &o);
0325   // bool           reinstateObject(DQMStore *store, Object &o);
0326   virtual Object *findObject(Peer *p, const std::string &name, Peer **owner = nullptr) = 0;
0327   virtual Object *makeObject(Peer *p, const std::string &name) = 0;
0328   virtual void markObjectsDead(Peer *p) = 0;
0329   virtual void purgeDeadObjects(Peer *p) = 0;
0330 
0331   virtual Peer *getPeer(lat::Socket *s) = 0;
0332   virtual Peer *createPeer(lat::Socket *s) = 0;
0333   virtual void removePeer(Peer *p, lat::Socket *s) = 0;
0334   virtual void sendObjectListToPeer(Bucket *msg, bool all, bool clear) = 0;
0335   virtual void sendObjectListToPeers(bool all) = 0;
0336 
0337   void updateMask(Peer *p);
0338   virtual void updatePeerMasks() = 0;
0339   static void discard(Bucket *&b);
0340 
0341   bool debug_;
0342   pthread_mutex_t lock_;
0343 
0344 private:
0345   void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err = nullptr);
0346   void requestObjectData(Peer *p, const char *name, size_t len);
0347   void releaseFromWait(WaitList::iterator i, Object *o);
0348   void releaseWaiters(const std::string &name, Object *o);
0349 
0350   bool onPeerData(lat::IOSelectEvent *ev, Peer *p);
0351   bool onPeerConnect(lat::IOSelectEvent *ev);
0352   bool onLocalNotify(lat::IOSelectEvent *ev);
0353 
0354   std::string appname_;
0355   int pid_;
0356 
0357   lat::IOSelector sel_;
0358   lat::Socket *server_;
0359   lat::Pipe wakeup_;
0360   lat::Time version_;
0361 
0362   AutoPeer upstream_;
0363   AutoPeer downstream_;
0364   WaitList waiting_;
0365 
0366   pthread_t communicate_;
0367   sig_atomic_t shutdown_;
0368 
0369   int delay_;
0370   lat::TimeSpan waitStale_;
0371   lat::TimeSpan waitMax_;
0372   bool flush_;
0373 
0374 public:
0375   // copying is not available
0376   DQMNet(const DQMNet &) = delete;
0377   DQMNet &operator=(const DQMNet &) = delete;
0378 };
0379 
0380 template <class ObjType>
0381 class DQMImplNet : public DQMNet {
0382 public:
0383   struct ImplPeer;
0384 
0385   using DirMap = std::set<std::string>;
0386   typedef std::unordered_set<ObjType, HashOp, HashEqual> ObjectMap;
0387   typedef std::map<lat::Socket *, ImplPeer> PeerMap;
0388   struct ImplPeer : Peer {
0389     ImplPeer() = default;
0390     ObjectMap objs;
0391     DirMap dirs;
0392   };
0393 
0394   DQMImplNet(const std::string &appname = "") : DQMNet(appname) {}
0395 
0396   ~DQMImplNet() override = default;
0397 
0398 protected:
0399   Object *findObject(Peer *p, const std::string &name, Peer **owner = nullptr) override {
0400     size_t slash = name.rfind('/');
0401     size_t dirpos = (slash == std::string::npos ? 0 : slash);
0402     size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
0403     std::string path(name, 0, dirpos);
0404     ObjType proto;
0405     proto.hash = dqmhash(name.c_str(), name.size());
0406     proto.dirname = path;
0407     proto.objname.append(name, namepos, std::string::npos);
0408 
0409     typename ObjectMap::iterator pos;
0410     typename PeerMap::iterator i, e;
0411     if (owner)
0412       *owner = nullptr;
0413     if (p) {
0414       auto *ip = static_cast<ImplPeer *>(p);
0415       pos = ip->objs.find(proto);
0416       if (pos == ip->objs.end())
0417         return nullptr;
0418       else {
0419         if (owner)
0420           *owner = ip;
0421         return const_cast<ObjType *>(&*pos);
0422       }
0423     } else {
0424       for (i = peers_.begin(), e = peers_.end(); i != e; ++i) {
0425         pos = i->second.objs.find(proto);
0426         if (pos != i->second.objs.end()) {
0427           if (owner)
0428             *owner = &i->second;
0429           return const_cast<ObjType *>(&*pos);
0430         }
0431       }
0432       return nullptr;
0433     }
0434   }
0435 
0436   Object *makeObject(Peer *p, const std::string &name) override {
0437     auto *ip = static_cast<ImplPeer *>(p);
0438     size_t slash = name.rfind('/');
0439     size_t dirpos = (slash == std::string::npos ? 0 : slash);
0440     size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
0441     ObjType o;
0442     o.flags = 0;
0443     o.tag = 0;
0444     o.version = 0;
0445     o.lastreq = 0;
0446     o.dirname = *ip->dirs.insert(name.substr(0, dirpos)).first;
0447     o.objname.append(name, namepos, std::string::npos);
0448     o.hash = dqmhash(name.c_str(), name.size());
0449     return const_cast<ObjType *>(&*ip->objs.insert(o).first);
0450   }
0451 
0452   // Mark all the objects dead.  This is intended to be used when
0453   // starting to process a complete list of objects, in order to
0454   // flag the objects that need to be killed at the end.  After
0455   // call to this method, revive all live objects by removing the
0456   // DQM_PROP_DEAD flag, then call purgeDeadObjects() at the end
0457   // to remove the dead ones.  This also turns off object request
0458   // for objects we've lost interest in.
0459   void markObjectsDead(Peer *p) override {
0460     uint64_t minreq = (lat::Time::current() - lat::TimeSpan(0, 0, 5 /* minutes */, 0, 0)).ns();
0461     auto *ip = static_cast<ImplPeer *>(p);
0462     typename ObjectMap::iterator i, e;
0463     for (i = ip->objs.begin(), e = ip->objs.end(); i != e; ++i) {
0464       if (i->lastreq && i->lastreq < minreq)
0465         const_cast<ObjType &>(*i).lastreq = 0;
0466       const_cast<ObjType &>(*i).flags |= DQM_PROP_DEAD;
0467     }
0468   }
0469 
0470   // Mark remaining zombie objects as dead.  See markObjectsDead().
0471   void purgeDeadObjects(Peer *p) override {
0472     auto *ip = static_cast<ImplPeer *>(p);
0473     typename ObjectMap::iterator i, e;
0474     for (i = ip->objs.begin(), e = ip->objs.end(); i != e;) {
0475       if (i->flags & DQM_PROP_DEAD)
0476         ip->objs.erase(i++);
0477       else
0478         ++i;
0479     }
0480   }
0481 
0482   Peer *getPeer(lat::Socket *s) override {
0483     auto pos = peers_.find(s);
0484     auto end = peers_.end();
0485     return pos == end ? nullptr : &pos->second;
0486   }
0487 
0488   Peer *createPeer(lat::Socket *s) override {
0489     ImplPeer *ip = &peers_[s];
0490     ip->socket = nullptr;
0491     ip->sendq = nullptr;
0492     ip->sendpos = 0;
0493     ip->mask = 0;
0494     ip->source = false;
0495     ip->update = false;
0496     ip->updated = false;
0497     ip->updates = 0;
0498     ip->waiting = 0;
0499     ip->automatic = nullptr;
0500     return ip;
0501   }
0502 
0503   void removePeer(Peer *p, lat::Socket *s) override {
0504     auto *ip = static_cast<ImplPeer *>(p);
0505     bool needflush = !ip->objs.empty();
0506 
0507     typename ObjectMap::iterator i, e;
0508     for (i = ip->objs.begin(), e = ip->objs.end(); i != e;)
0509       ip->objs.erase(i++);
0510 
0511     peers_.erase(s);
0512 
0513     // If we removed a peer with objects, our list of objects
0514     // has changed and we need to update downstream peers.
0515     if (needflush)
0516       sendLocalChanges();
0517   }
0518 
0519   /// Send all objects to a peer and optionally mark sent objects old.
0520   void sendObjectListToPeer(Bucket *msg, bool all, bool clear) override {
0521     typename PeerMap::iterator pi, pe;
0522     typename ObjectMap::iterator oi, oe;
0523     size_t size = 0;
0524     size_t numobjs = 0;
0525     for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
0526       for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi, ++numobjs)
0527         if (all || (oi->flags & DQM_PROP_NEW))
0528           size += 9 * sizeof(uint32_t) + oi->dirname.size() + oi->objname.size() + 1 + oi->scalar.size() +
0529                   oi->qdata.size() + (oi->lastreq > 0 ? oi->rawdata.size() : 0);
0530 
0531     msg->data.reserve(msg->data.size() + size + 8 * sizeof(uint32_t));
0532 
0533     uint32_t nupdates = 0;
0534     uint32_t words[4];
0535     words[0] = sizeof(words);
0536     words[1] = DQM_REPLY_LIST_BEGIN;
0537     words[2] = numobjs;
0538     words[3] = all;
0539     copydata(msg, &words[0], sizeof(words));
0540 
0541     for (pi = peers_.begin(), pe = peers_.end(); pi != pe; ++pi)
0542       for (oi = pi->second.objs.begin(), oe = pi->second.objs.end(); oi != oe; ++oi)
0543         if (all || (oi->flags & DQM_PROP_NEW)) {
0544           sendObjectToPeer(msg, const_cast<ObjType &>(*oi), oi->lastreq > 0);
0545           if (clear)
0546             const_cast<ObjType &>(*oi).flags &= ~DQM_PROP_NEW;
0547           ++nupdates;
0548         }
0549 
0550     words[1] = DQM_REPLY_LIST_END;
0551     words[2] = nupdates;
0552     copydata(msg, &words[0], sizeof(words));
0553   }
0554 
0555   void sendObjectListToPeers(bool all) override {
0556     typename PeerMap::iterator i, e;
0557     typename ObjectMap::iterator oi, oe;
0558     for (i = peers_.begin(), e = peers_.end(); i != e; ++i) {
0559       ImplPeer &p = i->second;
0560       if (!p.update)
0561         continue;
0562 
0563       if (debug_)
0564         logme() << "DEBUG: notifying " << p.peeraddr << std::endl;
0565 
0566       Bucket msg;
0567       msg.next = nullptr;
0568       sendObjectListToPeer(&msg, !p.updated || all, true);
0569 
0570       if (!msg.data.empty()) {
0571         Bucket **prev = &p.sendq;
0572         while (*prev)
0573           prev = &(*prev)->next;
0574 
0575         *prev = new Bucket;
0576         (*prev)->next = nullptr;
0577         (*prev)->data.swap(msg.data);
0578       }
0579       p.updated = true;
0580     }
0581   }
0582 
0583   void updatePeerMasks() override {
0584     typename PeerMap::iterator i, e;
0585     for (i = peers_.begin(), e = peers_.end(); i != e;)
0586       updateMask(&(i++)->second);
0587   }
0588 
0589 protected:
0590   PeerMap peers_;
0591 };
0592 
0593 class DQMBasicNet : public DQMImplNet<DQMNet::Object> {
0594 public:
0595   DQMBasicNet(const std::string &appname = "");
0596 
0597   void reserveLocalSpace(uint32_t size);
0598   void updateLocalObject(Object &o);
0599   bool removeLocalExcept(const std::set<std::string> &known);
0600 
0601 private:
0602   ImplPeer *local_;
0603 };
0604 
0605 #endif  // DQMSERVICES_CORE_DQM_NET_H