Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-09-28 23:48:30

0001 #!/usr/bin/env python3
0002 
0003 import argparse
0004 import subprocess
0005 import socket, fcntl, select, atexit, signal, asyncore
0006 import sys, os, time, datetime
0007 import collections
0008 import json
0009 import zlib
0010 
def log(s):
    """Print a monitor message to stderr, prefixed with "m: ", and flush it."""
    stream = sys.stderr
    stream.write("m: " + s + "\n")
    stream.flush()
0014 
def dt2time(dt):
    """Return the Unix timestamp for datetime *dt*, interpreted as local time."""
    local_struct = dt.timetuple()
    return time.mktime(local_struct)
0018 
class JsonEncoder(json.JSONEncoder):
    """JSON encoder that serializes any object exposing a ``to_json()`` method.

    Objects without ``to_json`` fall through to the stock encoder, which
    raises TypeError for unsupported types.
    """

    def default(self, obj):
        if not hasattr(obj, 'to_json'):
            return super(JsonEncoder, self).default(obj)
        return obj.to_json()
0025 
class ElasticReport(object):
    """Accumulate a monitoring document for one child process and dump it as JSON.

    The document (``self.doc``) is built from the command line, from
    ``/proc`` data of the monitored pid, and from JSON updates merged in by
    callers.  :meth:`make_report` writes it into ``args.path`` via a
    temporary file plus rename.
    """

    def __init__(self, args):
        # args: parsed command line; this class reads .pargs, .path and .debug
        self.last_make_report = None  # time.time() of the last make_report()
        self.make_report_timer = 30   # min seconds between reports in try_update()
        self.seq = 0
        self.args = args

        self.doc = {
            "hostname": socket.gethostname(),
            "sequence": self.seq,
            "cmdline": args.pargs,
        }

    def defaults(self):
        """Fill in type, run, tag, _id, stdout/stderr targets and the environment.

        ``self.doc["pid"]`` must already be set, because the id format uses it.
        """
        self.id_format = u"%(type)s-run%(run)06d-host%(hostname)s-pid%(pid)06d"
        self.doc["type"] = "dqm-source-state"
        self.doc["run"] = 0

        # The tag comes from the *.py config name, the run from "runNumber=N".
        for arg in self.doc["cmdline"]:
            if arg.endswith(".py"):
                tag = os.path.basename(arg)
                tag = tag.replace(".py", "")
                tag = tag.replace("_cfg", "")
                self.doc["tag"] = tag

            pair = arg.split("=")
            if len(pair) > 1 and pair[0] == "runNumber" and pair[1].isdigit():
                self.doc["run"] = int(pair[1])

        self.make_id()

        # Record where our stdout/stderr point; /proc may be unavailable.
        try:
            self.doc["stdout_fn"] = os.readlink("/proc/self/fd/1")
            self.doc["stderr_fn"] = os.readlink("/proc/self/fd/2")
        except OSError:
            pass

        self.update_doc({"extra": {
            "environ": dict(os.environ)
        }})

    def make_id(self):
        """Format ``id_format`` with the document, store it as ``_id`` and return it."""
        doc_id = self.id_format % self.doc
        self.doc["_id"] = doc_id
        return doc_id

    def update_doc_recursive(self, old_obj, new_obj):
        """Merge *new_obj* into *old_obj* in place.

        Nested dicts present on both sides are merged key by key; every
        other value in *new_obj* overwrites the one in *old_obj*.
        """
        for key, value in new_obj.items():
            if (key in old_obj and
                    isinstance(value, dict) and
                    isinstance(old_obj[key], dict)):
                self.update_doc_recursive(old_obj[key], value)
            else:
                old_obj[key] = value

    def update_doc(self, keys):
        """Recursively merge the dict *keys* into ``self.doc``."""
        self.update_doc_recursive(self.doc, keys)

    def update_ps_status(self):
        """Best-effort: copy ``/proc/<pid>/status`` into ``extra.ps_info``.

        Does nothing if the pid is unknown or invalid, or if the file cannot
        be read.  Lines without a ':' are skipped.
        """
        try:
            pid = int(self.doc["pid"])
            with open("/proc/%d/status" % pid, "r") as f:
                info = {}
                for line in f:
                    key, sep, value = line.partition(":")
                    if sep:
                        info[key.strip()] = value.strip()
        except (KeyError, TypeError, ValueError, OSError):
            return

        self.update_doc({'extra': {'ps_info': info}})

    def update_mem_status(self):
        """Best-effort: add a ``/proc/<pid>/statm`` sample to ``extra.mem_info``.

        Samples are keyed by ``str(time.time())`` and merged into the existing
        dict, so one entry accumulates per successful call.
        """
        try:
            key = str(time.time())
            pid = int(self.doc["pid"])
            with open("/proc/%d/statm" % pid, "r") as f:
                sample = f.read().strip()
        except (KeyError, TypeError, ValueError, OSError):
            return

        self.update_doc({'extra': {'mem_info': {key: sample}}})

    def make_report(self):
        """Write the document to ``<args.path>/<_id>.jsn`` (tmp file + rename).

        The timestamps and id are refreshed first.  If ``args.path`` is not a
        directory, no file is written.  If writing fails, the temporary file
        is removed and the error is re-raised.
        """
        self.last_make_report = time.time()
        self.doc["report_timestamp"] = time.time()
        self.make_id()

        m_path = self.args.path

        if not os.path.isdir(m_path):
            if self.args.debug:
                log("File not written, because report directory does not exist: %s." % m_path)
            # don't make a report if the directory is not available
            return

        self.update_ps_status()
        self.update_mem_status()

        fn_id = self.doc["_id"] + ".jsn"

        fn = os.path.join(m_path, fn_id)
        fn_tmp = os.path.join(m_path, fn_id + ".tmp")

        try:
            with open(fn_tmp, "w") as f:
                json.dump(self.doc, f, indent=True, cls=JsonEncoder)
            # rename so readers never see a half-written report
            os.rename(fn_tmp, fn)
        except Exception:
            try:
                os.unlink(fn_tmp)
            except OSError:
                pass
            raise

        if self.args.debug:
            log("File %s written." % fn)

    def try_update(self):
        """Call make_report() the first time, then at most every ``make_report_timer`` s."""
        # first time
        if self.last_make_report is None:
            return self.make_report()

        now = time.time()
        delta = now - self.last_make_report
        if delta > self.make_report_timer:
            return self.make_report()
0158 
class LineHistoryEnd(object):
    """Keep the most recent lines written, bounded by total length and line count.

    Sizes are ``len()`` of the written objects, which means characters for
    str.  Old lines are dropped from the front to make room.  A single line
    longer than ``max_bytes`` is still kept, on its own.
    """

    def __init__(self, max_bytes=16*1024, max_lines=256):
        self.max_bytes = max_bytes
        self.max_lines = max_lines

        self.buf = collections.deque()
        self.size = 0  # sum of len() over the buffered lines

    def pop(self):
        """Drop the oldest buffered line."""
        elm = self.buf.popleft()
        self.size -= len(elm)

    def push(self, rbuf):
        """Append a line without enforcing any limit."""
        self.buf.append(rbuf)
        self.size += len(rbuf)

    def write(self, line):
        """Append *line*, evicting old lines so both limits hold afterwards."""
        line_size = len(line)

        while self.buf and (self.size + line_size) > self.max_bytes:
            self.pop()

        # `self.buf and` avoids popping an empty deque when max_lines < 1
        while self.buf and (len(self.buf) + 1) > self.max_lines:
            self.pop()

        self.push(line)

    def to_json(self):
        """Return the buffered lines as a list (used by JsonEncoder)."""
        return list(self.buf)


class LineHistoryStart(LineHistoryEnd):
    """Keep the first lines written until a limit would be exceeded.

    The first write that would push the total length over ``max_bytes``, or
    the line count over ``max_lines``, is rejected, and every later write is
    ignored.
    """

    def __init__(self, *kargs, **kwargs):
        LineHistoryEnd.__init__(self, *kargs, **kwargs)
        self.done = False

    def write(self, line):
        if self.done:
            return

        if (self.size + len(line)) > self.max_bytes:
            self.done = True
            return

        # ">=": at most max_lines lines are kept, the same bound as LineHistoryEnd
        if len(self.buf) >= self.max_lines:
            self.done = True
            return

        self.push(line)
0207 
class AsyncLineReaderMixin(object):
    """Mixin that turns a byte stream read via ``self.recv`` into text lines.

    The host class must provide ``recv(n)`` and ``close()`` (asyncore
    dispatchers do) and override :meth:`handle_line`.  Each complete line is
    passed to ``handle_line`` with its trailing newline.  A final unterminated
    fragment is passed when the stream closes.
    """

    def __init__(self):
        import codecs

        self.line_buf = []
        # A recv() chunk may end in the middle of a multi-byte UTF-8 sequence,
        # and the child may print invalid UTF-8.  An incremental decoder with
        # errors="replace" handles both instead of raising and killing the
        # reader (which would stop draining the child's pipe).
        self._decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

    def handle_close(self):
        # closing fd: flush bytes still held by the decoder, then the partial line
        tail = self._decoder.decode(b"", final=True)
        if tail:
            self.line_buf.append(tail)

        if self.line_buf:
            self.handle_line("".join(self.line_buf))
            self.line_buf = []

        self.close()

    def handle_read(self):
        rbuf = self.recv(1024*16)
        # asyncore's recv() already calls handle_close() on EOF (empty read)
        text = self._decoder.decode(rbuf)

        self.line_buf.append(text)
        if "\n" not in text:
            return

        # split whatever we have; the last piece is an unfinished line (maybe "")
        *lines, rest = "".join(self.line_buf).split("\n")
        for line in lines:
            self.handle_line(line + "\n")

        self.line_buf = [rest] if rest else []

    def handle_line(self, line=None):
        """Process one decoded line.  Override this!"""
        pass
0245 
class AsyncLineReaderTimeoutMixin(AsyncLineReaderMixin):
    """Line-reader mixin that also fires ``handle_timeout()`` when idle.

    Each time ``readable()`` is queried, ``handle_timeout()`` (which the host
    class must provide) is called if at least ``timeout_secs`` seconds have
    passed since the last read or the last timeout.
    """

    def __init__(self, timeout_secs):
        self.timeout_secs = timeout_secs
        self.last_read = time.time()

        super(AsyncLineReaderTimeoutMixin, self).__init__()

    def handle_read(self):
        # any read counts as activity and postpones the next timeout
        self.last_read = time.time()
        AsyncLineReaderMixin.handle_read(self)

    def readable(self):
        idle_for = time.time() - self.last_read
        if idle_for >= self.timeout_secs:
            self.last_read = time.time()
            self.handle_timeout()

        return super(AsyncLineReaderTimeoutMixin, self).readable()
0263 
class FDJsonHandler(AsyncLineReaderMixin, asyncore.dispatcher):
    """Per-connection handler on the monitoring socket.

    Each received line is expected to be a JSON object, which is merged into
    the report.  Lines shorter than 4 characters are keep-alive pings that
    only trigger a (rate-limited) report update.
    """

    def __init__(self, sock, es):
        AsyncLineReaderMixin.__init__(self)
        asyncore.dispatcher.__init__(self, sock)

        self.es = es

    def handle_line(self, line):
        if len(line) < 4:
            # keep alive 'ping'
            self.es.try_update()
            return

        try:
            doc = json.loads(line)
            if not isinstance(doc, dict):
                raise ValueError("not a JSON object")

            for k in ["pid", "run", "lumi"]:
                if k in doc:
                    doc[k] = int(doc[k])
        except (ValueError, TypeError, OverflowError, RecursionError):
            log("cannot deserialize json len: %d content: %s" % (len(line), line))
            return

        # Kept separate from parsing so report-writing failures are not
        # mislabelled as bad input.  Still best-effort, so one failure does
        # not get the connection closed by asyncore's error handling.
        try:
            self.es.update_doc_recursive(self.es.doc, doc)
            self.es.try_update()
        except Exception as e:
            log("cannot update report: %s" % e)

    def handle_write(self):
        pass

    def writable(self):
        return False
0294 
class FDJsonServer(asyncore.dispatcher):
    """Unix stream socket server accepting JSON updates for the report.

    The socket is created in ``args.path`` if that directory exists, else in
    /tmp.  It is named after this process's pid and removed at interpreter
    exit or when the server is closed.  Each accepted connection is handled
    by an FDJsonHandler.
    """

    def __init__(self, es, args):
        asyncore.dispatcher.__init__(self)

        self.fn = None
        self.es = es
        self.args = args

        prefix = "/tmp"
        if os.path.isdir(self.args.path):
            prefix = self.args.path

        base = ".es_monitoring_pid%08d" % os.getpid()
        self.fn = os.path.join(prefix, base)

        if self.args.debug:
            log("Socket path: %s" % self.fn)

        # remove a stale socket file left by an earlier process with this pid
        self._unlink_socket()

        self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
        # create the socket file with owner-only permissions
        oldmask = os.umask(0o077)
        try:
            self.bind(self.fn)
            self.listen(5)
        finally:
            os.umask(oldmask)

        atexit.register(self.cleanup)

    def _unlink_socket(self):
        """Remove the socket file; a missing file is not an error."""
        try:
            os.unlink(self.fn)
        except FileNotFoundError:
            pass

    def cleanup(self):
        """Remove the socket file if a path was set (safe to call repeatedly)."""
        if self.fn is not None:
            self._unlink_socket()

    def handle_accept(self):
        pair = self.accept()
        if pair is not None:
            # the dispatcher constructor registers the handler in asyncore's map
            FDJsonHandler(pair[0], self.es)

    def handle_close(self):
        self.close()
        self.cleanup()
0340 
class FDOutputListener(AsyncLineReaderTimeoutMixin, asyncore.file_dispatcher):
    """Read the child's output pipe line by line.

    Each line is forwarded to the zlog (if given) or to our stdout.  It is
    also recorded in the head/tail line histories stored in the report, and
    a report update is attempted.  A 5-second idle timeout also triggers
    report updates.  When the pipe closes, ``close_socket`` (if given) is
    closed as well.
    """

    def __init__(self, fd, es, zlog, close_socket=None):
        AsyncLineReaderTimeoutMixin.__init__(self, 5)
        asyncore.file_dispatcher.__init__(self, fd)

        self.es = es
        self.zlog = zlog
        self.close_socket = close_socket

        self.start = LineHistoryStart()
        self.end = LineHistoryEnd()

        # both histories are live objects, serialized via to_json() on each report
        self.es.update_doc({'extra': {
            'stdlog_start': self.start,
            'stdlog_end': self.end,
        }})

    def writable(self):
        return False

    def handle_line(self, line):
        if self.zlog is None:
            sys.stdout.write(line)
            sys.stdout.flush()
        else:
            self.zlog.write(line)

        for history in (self.start, self.end):
            history.write(line)
        self.es.try_update()

    def handle_timeout(self):
        self.es.try_update()

        if self.zlog is None:
            return
        self.zlog.handle_timeout()

    def handle_close(self):
        super(FDOutputListener, self).handle_close()

        if self.close_socket is None:
            return
        self.close_socket.handle_close()

    def finish(self):
        """Finalize the zlog, if one is in use."""
        if self.zlog is not None:
            self.zlog.finish()
0385 
0386 
# Child processes currently being monitored; handle_signal forwards signals to them.
CURRENT_PROC = []


def _set_pdeathsig():
    """preexec_fn for the child: ask the kernel to SIGKILL it when we die."""
    try:
        # ensure the child dies if we are SIGKILLED
        import ctypes
        libc = ctypes.CDLL("libc.so.6")
        PR_SET_PDEATHSIG = 1
        libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
    except:
        log("Failed to setup PR_SET_PDEATHSIG.")


def _open_zlog(args, es):
    """Return a ztee.GZipLog for ``args.zlog``, or None if unset or on failure."""
    if not args.zlog:
        return None

    try:
        # ztee.py is expected next to this script
        sys.path.append(os.path.dirname(__file__))
        from ztee import GZipLog

        gzip_log = GZipLog(log_file=args.zlog)
        es.update_doc({"stdlog_gzip": args.zlog})

        log("Open gzip log file: %s" % args.zlog)
        return gzip_log
    except Exception as e:
        log("Failed to setup zlog file: " + str(e))
        return None


def launch_monitoring(args):
    """Run ``args.pargs`` as a child, monitor it, and return its exit code.

    Reports are written before the child's output is read and again after
    the child exits.
    """
    es = ElasticReport(args=args)

    json_handler = FDJsonServer(es=es, args=args)
    child_env = os.environ.copy()
    child_env["DQM2_SOCKET"] = json_handler.fn

    proc = subprocess.Popen(args.pargs, preexec_fn=_set_pdeathsig,
                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                            close_fds=True, env=child_env)
    CURRENT_PROC.append(proc)

    zlog = _open_zlog(args, es)

    for key, value in (("pid", proc.pid),
                       ("monitoring_pid", os.getpid()),
                       ("monitoring_socket", json_handler.fn)):
        es.update_doc({key: value})
    es.defaults()
    es.make_report()

    log_handler = FDOutputListener(fd=proc.stdout.fileno(), es=es, zlog=zlog,
                                   close_socket=json_handler)
    log_handler.handle_line("-- starting process: %s --\n" % str(args.pargs))

    try:
        asyncore.loop(timeout=5)
    except select.error as e:
        # we have this on ctrl+c: just terminate the child
        log("Select error (we will terminate): " + str(e))
        proc.terminate()

    # at this point the program is dead
    exit_code = proc.wait()
    log_handler.handle_line("\n-- process exit: %s --\n" % str(exit_code))
    log_handler.finish()

    es.update_doc({"exit_code": exit_code})
    es.make_report()

    CURRENT_PROC.remove(proc)
    return exit_code
0452 
def handle_signal(signum, frame):
    """Signal handler: relay *signum* to every process in CURRENT_PROC."""
    for child in CURRENT_PROC:
        child.send_signal(signum)
0456 
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Monitor a child process and produce es documents.")
    parser.add_argument('--debug', '-d', action='store_true', help="Debug mode")
    parser.add_argument('--zlog', '-z', type=str, default=None, help="Don't output anything, zip the log file (uses ztee.py).")
    parser.add_argument('--path', '-p', type=str, default="/tmp/dqm_monitoring/", help="Path for the monitoring output.")
    parser.add_argument('pargs', nargs=argparse.REMAINDER)
    args = parser.parse_args()

    # Drop a leading "--" separator (compat with 2.6) *before* checking that a
    # command was given, so a bare "--" is rejected instead of crashing in Popen.
    if args.pargs and args.pargs[0] == "--":
        args.pargs = args.pargs[1:]

    if not args.pargs:
        parser.print_help()
        sys.exit(-1)

    # do some signal magic: forward SIGINT/SIGTERM to the child
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    sys.exit(launch_monitoring(args))