# File indexing completed on 2023-03-17 10:59:08
0001
0002
0003 import argparse
0004 import subprocess
0005 import socket, fcntl, select, atexit, signal, asyncore
0006 import sys, os, time, datetime
0007 import collections
0008 import json
0009 import zlib
0010
def log(s):
    """Write a monitoring message to stderr, prefixed so it is
    distinguishable from the child's own output, and flush immediately."""
    sys.stderr.write("m: " + s + "\n")
    sys.stderr.flush()
0014
def dt2time(dt):
    """Convert a naive datetime into a unix timestamp (local time)."""
    tt = dt.timetuple()
    return time.mktime(tt)
0018
class JsonEncoder(json.JSONEncoder):
    """JSON encoder that defers to an object's to_json() hook when present,
    falling back to the standard encoder otherwise."""

    def default(self, obj):
        to_json = getattr(obj, 'to_json', None)
        if to_json is not None:
            return to_json()

        return json.JSONEncoder.default(self, obj)
0025
class ElasticReport(object):
    """Collects state about the monitored child process in self.doc and
    periodically dumps it as a JSON file into the monitoring directory."""

    def __init__(self, args):
        self.last_make_report = None
        # Minimum number of seconds between two written reports.
        self.make_report_timer = 30
        self.seq = 0
        self.args = args

        self.doc = {
            "hostname": socket.gethostname(),
            "sequence": self.seq,
            "cmdline": args.pargs,
        }

    def defaults(self):
        """Fill in the document defaults (type, run, tag, id) derived from
        the child's command line.  Expects doc["pid"] to be set already."""
        self.id_format = u"%(type)s-run%(run)06d-host%(hostname)s-pid%(pid)06d"
        self.doc["type"] = "dqm-source-state"
        self.doc["run"] = 0

        # Extract the tag (from the *_cfg.py file name) and the run number
        # (from a runNumber=NNN argument) out of the child's command line.
        c = self.doc["cmdline"]
        for l in c:
            if l.endswith(".py"):
                t = os.path.basename(l)
                t = t.replace(".py", "")
                t = t.replace("_cfg", "")
                self.doc["tag"] = t

            pr = l.split("=")
            if len(pr) > 1 and pr[0] == "runNumber" and pr[1].isdigit():
                run = int(pr[1])
                self.doc["run"] = run

        self.make_id()

        # Best effort: record where stdout/stderr point (Linux /proc only).
        # Was a bare except; narrowed so SystemExit et al. propagate.
        try:
            self.doc["stdout_fn"] = os.readlink("/proc/self/fd/1")
            self.doc["stderr_fn"] = os.readlink("/proc/self/fd/2")
        except OSError:
            pass

        self.update_doc({ "extra": {
            "environ": dict(os.environ)
        }})

    def make_id(self):
        """Build the document id from the id_format template and return it."""
        id = self.id_format % self.doc
        self.doc["_id"] = id
        return id

    def update_doc_recursive(self, old_obj, new_obj):
        """Deep-merge new_obj into old_obj: nested dicts are merged,
        everything else is overwritten."""
        for key, value in new_obj.items():
            if (key in old_obj and
                isinstance(value, dict) and
                isinstance(old_obj[key], dict)):

                self.update_doc_recursive(old_obj[key], value)
            else:
                old_obj[key] = value

    def update_doc(self, keys):
        """Deep-merge keys into the report document."""
        self.update_doc_recursive(self.doc, keys)

    def update_ps_status(self):
        """Snapshot /proc/<pid>/status into extra.ps_info (best effort)."""
        try:
            pid = int(self.doc["pid"])
            fn = "/proc/%d/status" % pid
            d = {}
            # "with" guarantees the handle is closed even on a parse error
            # (the original leaked it in that case).
            with open(fn, "r") as f:
                for line in f:
                    k, v = line.strip().split(":", 1)
                    d[k.strip()] = v.strip()

            self.update_doc({ 'extra': { 'ps_info': d } })
        except (OSError, KeyError, ValueError):
            # Process may have exited, or no pid is known yet.
            pass

    def update_mem_status(self):
        """Snapshot /proc/<pid>/statm into extra.mem_info, keyed by the
        current timestamp so successive samples accumulate (best effort)."""
        try:
            key = str(time.time())

            pid = int(self.doc["pid"])
            fn = "/proc/%d/statm" % pid
            with open(fn, "r") as f:
                dct = { key: f.read().strip() }

            self.update_doc({ 'extra': { 'mem_info': dct } })
        except (OSError, KeyError, ValueError):
            pass

    def make_report(self):
        """Write the current document as <id>.jsn into args.path, via a
        .tmp file and an atomic rename."""
        self.last_make_report = time.time()
        self.doc["report_timestamp"] = time.time()
        self.make_id()

        m_path = self.args.path

        if not os.path.isdir(m_path):
            if self.args.debug:
                log("File not written, because report directory does not exist: %s." % m_path)

            return

        self.update_ps_status()
        self.update_mem_status()

        fn_id = self.doc["_id"] + ".jsn"

        fn = os.path.join(m_path, fn_id)
        fn_tmp = os.path.join(m_path, fn_id + ".tmp")

        with open(fn_tmp, "w") as f:
            json.dump(self.doc, f, indent=True, cls=JsonEncoder)

        # Atomic on POSIX: readers never observe a half-written report.
        os.rename(fn_tmp, fn)

        if self.args.debug:
            log("File %s written." % fn)

    def try_update(self):
        """Write a report, but rate-limited to one per make_report_timer
        seconds (the first call always writes)."""
        if self.last_make_report is None:
            return self.make_report()

        now = time.time()
        delta = now - self.last_make_report
        if delta > self.make_report_timer:
            return self.make_report()
0158
class LineHistoryEnd(object):
    """Rolling tail of log lines, bounded by total byte size and by line
    count.  Oldest entries are evicted first."""

    def __init__(self, max_bytes=16*1024, max_lines=256):
        self.max_bytes = max_bytes
        self.max_lines = max_lines

        self.buf = collections.deque()
        self.size = 0

    def pop(self):
        # Drop the oldest line and account for the freed bytes.
        dropped = self.buf.popleft()
        self.size -= len(dropped)

    def push(self, rbuf):
        self.buf.append(rbuf)
        self.size += len(rbuf)

    def write(self, line):
        nbytes = len(line)

        # Evict from the front until the new line fits the byte budget ...
        while len(self.buf) and self.size + nbytes > self.max_bytes:
            self.pop()

        # ... and until it also fits the line-count budget.
        while len(self.buf) + 1 > self.max_lines:
            self.pop()

        self.push(line)

    def to_json(self):
        # Hook used by JsonEncoder for serialization.
        return list(self.buf)
0188
class LineHistoryStart(LineHistoryEnd):
    """Keeps the first lines of the log; once either budget would be
    exceeded it stops recording permanently instead of evicting."""

    def __init__(self, *kargs, **kwargs):
        LineHistoryEnd.__init__(self, *kargs, **kwargs)
        self.done = False

    def write(self, line):
        if self.done:
            return

        if self.size + len(line) > self.max_bytes:
            # Byte budget exhausted - ignore everything from now on.
            self.done = True
            return

        if len(self.buf) > self.max_lines:
            # NOTE(review): '>' lets the buffer reach max_lines + 1 entries;
            # possibly meant '>=', preserved as-is.
            self.done = True
            return

        self.push(line)
0207
class AsyncLineReaderMixin(object):
    """Mixin for asyncore dispatchers: buffers raw reads and re-emits the
    data line by line through handle_line()."""

    def __init__(self):
        # Fragments of the current, not-yet-terminated line.
        self.line_buf = []

    def handle_close(self):
        # Flush whatever partial line is left before closing.
        if len(self.line_buf):
            self.handle_line("".join(self.line_buf))
            self.line_buf = []

        self.close()

    def handle_read(self):
        rbuf = self.recv(1024*16)
        # NOTE(review): a UTF-8 sequence split across two recv() chunks
        # would raise UnicodeDecodeError here; an incremental decoder
        # would be safer - confirm whether the input can contain one.
        rbuf = rbuf.decode('utf-8')

        self.line_buf.append(rbuf)
        if "\n" in rbuf:
            # Join the buffered fragments and emit every complete line.
            spl = "".join(self.line_buf).split("\n")

            while len(spl) > 1:
                line = spl.pop(0)
                self.handle_line(line + "\n")

            # Keep the trailing partial line (if any) for the next read.
            if len(spl[0]):
                self.line_buf = [spl[0]]
            else:
                self.line_buf = []

    def handle_line(self, line):
        # Placeholder; subclasses override this to consume complete lines.
        # Fixed to accept the line argument it is always called with
        # (the original took no argument and would raise TypeError).
        pass
0245
class AsyncLineReaderTimeoutMixin(AsyncLineReaderMixin):
    """Line reader that additionally fires handle_timeout() whenever no
    data has arrived for timeout_secs seconds."""

    def __init__(self, timeout_secs):
        self.timeout_secs = timeout_secs
        self.last_read = time.time()
        super(AsyncLineReaderTimeoutMixin, self).__init__()

    def handle_read(self):
        # Any incoming data resets the timeout clock.
        self.last_read = time.time()
        AsyncLineReaderMixin.handle_read(self)

    def readable(self):
        # Piggy-back on asyncore's readable() poll to check the timeout.
        if (time.time() - self.last_read) >= self.timeout_secs:
            self.last_read = time.time()
            self.handle_timeout()

        return super(AsyncLineReaderTimeoutMixin, self).readable()
0263
class FDJsonHandler(AsyncLineReaderMixin, asyncore.dispatcher):
    """Handles one client connection on the monitoring unix socket and
    merges the JSON documents it sends into the ElasticReport state."""

    def __init__(self, sock, es):
        AsyncLineReaderMixin.__init__(self)
        asyncore.dispatcher.__init__(self, sock)

        self.es = es

    def handle_line(self, line):
        if len(line) < 4:
            # Too short to be a JSON document; treat as a keep-alive ping.
            self.es.try_update()
            return

        try:
            doc = json.loads(line)

            # These fields feed the numeric id template; force them to int.
            for k in ["pid", "run", "lumi"]:
                if k in doc:
                    doc[k] = int(doc[k])

            self.es.update_doc_recursive(self.es.doc, doc)
            self.es.try_update()
        except Exception:
            # One malformed message must not kill the event loop.  Narrowed
            # from a bare except so SystemExit/KeyboardInterrupt propagate.
            log("cannot deserialize json len: %d content: %s" % (len(line), line))

    def handle_write(self):
        pass

    def writable(self):
        # Read-only endpoint; never request write events from the loop.
        return False
0294
class FDJsonServer(asyncore.dispatcher):
    """Unix-domain socket server; each accepted connection is handed to a
    FDJsonHandler which feeds JSON documents into the report."""

    def __init__(self, es, args):
        # Base class fixed to asyncore.dispatcher: the original declared
        # file_dispatcher but only ever called dispatcher.__init__ and used
        # the plain socket-server interface (create_socket/bind/listen).
        asyncore.dispatcher.__init__(self)

        self.fn = None
        self.es = es
        self.args = args

        # Place the socket next to the reports if possible, /tmp otherwise.
        prefix = "/tmp"
        if os.path.isdir(self.args.path):
            prefix = self.args.path

        base = ".es_monitoring_pid%08d" % os.getpid()
        self.fn = os.path.join(prefix, base)

        if self.args.debug:
            log("Socket path: %s" % self.fn)

        if os.path.exists(self.fn):
            os.unlink(self.fn)

        self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
        # Restrict the socket to the current user while it is created.
        oldmask = os.umask(0o077)
        try:
            self.bind(self.fn)
            self.listen(5)
        finally:
            os.umask(oldmask)

        atexit.register(self.cleanup)

    def cleanup(self):
        # Remove the socket file; called at exit and on close (idempotent).
        if self.fn is not None:
            if os.path.exists(self.fn):
                os.unlink(self.fn)

    def handle_accept(self):
        pair = self.accept()
        if pair is not None:
            # The handler registers itself with the asyncore loop; no need
            # to keep a reference here.
            FDJsonHandler(pair[0], self.es)

    def handle_close(self):
        self.close()
        self.cleanup()
0340
class FDOutputListener(AsyncLineReaderTimeoutMixin, asyncore.file_dispatcher):
    """Follows the child's stdout/stderr pipe, mirrors each line to the
    gzip log (or our stdout) and records head/tail snippets in the report."""

    def __init__(self, fd, es, zlog, close_socket=None):
        AsyncLineReaderTimeoutMixin.__init__(self, 5)
        asyncore.file_dispatcher.__init__(self, fd)

        self.es = es
        self.zlog = zlog
        self.close_socket = close_socket

        # Keep the first and the last chunk of the child's output.
        self.start = LineHistoryStart()
        self.end = LineHistoryEnd()

        self.es.update_doc({ 'extra': { 'stdlog_start': self.start } })
        self.es.update_doc({ 'extra': { 'stdlog_end': self.end } })

    def writable(self):
        return False

    def handle_line(self, line):
        # Mirror the line: either into the gzip tee or onto our stdout.
        if self.zlog is None:
            sys.stdout.write(line)
            sys.stdout.flush()
        else:
            self.zlog.write(line)

        self.start.write(line)
        self.end.write(line)
        self.es.try_update()

    def handle_timeout(self):
        # No output for a while: refresh the report anyway.
        self.es.try_update()

        if self.zlog is not None:
            self.zlog.handle_timeout()

    def handle_close(self):
        super(FDOutputListener, self).handle_close()

        # Once the child's pipe closes, shut down the json socket too so
        # the asyncore loop can terminate.
        if self.close_socket is not None:
            self.close_socket.handle_close()

    def finish(self):
        if self.zlog is not None:
            self.zlog.finish()
0385
0386
# Children we spawned; handle_signal() forwards received signals to them.
CURRENT_PROC = []
def launch_monitoring(args):
    """Spawn the child command given in args.pargs, monitor its output and
    state, write periodic json reports, and return the child's exit code."""
    es = ElasticReport(args=args)

    json_handler = FDJsonServer(es=es, args=args)
    env = os.environ.copy()
    # The child can push json documents to us over this unix socket.
    env["DQM2_SOCKET"] = json_handler.fn

    def preexec():
        # Runs in the child between fork and exec.
        try:
            # Ask Linux to SIGKILL the child if this monitor dies first.
            import ctypes
            libc = ctypes.CDLL("libc.so.6")
            PR_SET_PDEATHSIG = 1
            libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
        except:
            log("Failed to setup PR_SET_PDEATHSIG.")
            pass

    # stderr is merged into stdout so one pipe carries all child output.
    p = subprocess.Popen(args.pargs, preexec_fn=preexec, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True, env=env)
    CURRENT_PROC.append(p)

    zlog = None
    if args.zlog:
        # Optionally tee the child's output into a gzip file via ztee.py,
        # which is expected to live next to this script.
        try:
            relpath = os.path.dirname(__file__)
            sys.path.append(relpath)
            from ztee import GZipLog

            zlog_ = GZipLog(log_file=args.zlog)
            es.update_doc({ "stdlog_gzip": args.zlog })

            log("Open gzip log file: %s" % args.zlog)
            zlog = zlog_
        except Exception as e:
            log("Failed to setup zlog file: " + str(e))

    es.update_doc({ "pid": p.pid })
    es.update_doc({ "monitoring_pid": os.getpid() })
    es.update_doc({ "monitoring_socket": json_handler.fn })
    es.defaults()
    es.make_report()

    log_handler = FDOutputListener(fd=p.stdout.fileno(), es=es, zlog=zlog, close_socket=json_handler)
    log_handler.handle_line("-- starting process: %s --\n" % str(args.pargs))

    try:
        # Drive the output listener and the json server until the child's
        # stdout pipe is closed.
        asyncore.loop(timeout=5)
    except select.error as e:
        # Happens on signal delivery; terminate the child and fall through
        # to wait() below.
        log("Select error (we will terminate): " + str(e))
        p.terminate()

    # The pipe is closed, so the child should be exiting shortly.
    r = p.wait()
    log_handler.handle_line("\n-- process exit: %s --\n" % str(r))
    log_handler.finish()

    es.update_doc({ "exit_code": r })
    es.make_report()

    CURRENT_PROC.remove(p)
    return r
0452
def handle_signal(signum, frame):
    """Signal handler: forward the signal to every spawned child process."""
    for child in CURRENT_PROC:
        child.send_signal(signum)
0456
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Monitor a child process and produce es documents.")
    parser.add_argument('--debug', '-d', action='store_true', help="Debug mode")
    parser.add_argument('--zlog', '-z', type=str, default=None, help="Don't output anything, zip the log file (uses ztee.py).")
    parser.add_argument('--path', '-p', type=str, default="/tmp/dqm_monitoring/", help="Path for the monitoring output.")
    # Everything after the options (optionally separated by "--") is the
    # child command line to execute and monitor.
    parser.add_argument('pargs', nargs=argparse.REMAINDER)
    args = parser.parse_args()

    if not args.pargs:
        parser.print_help()
        sys.exit(-1)
    elif args.pargs[0] == "--":
        # argparse.REMAINDER keeps the "--" separator itself; drop it.
        args.pargs = args.pargs[1:]

    # Forward termination signals to the monitored child.
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    sys.exit(launch_monitoring(args))