#!/usr/bin/env python
#-*- coding: utf-8 -*-
#pylint: disable=C0301,C0103,R0914,R0903

"""
DAS command line tool
"""
__author__ = "Valentin Kuznetsov"

import sys
if sys.version_info < (2, 6):
    raise Exception("DAS requires python 2.6 or greater")

DAS_CLIENT = 'das-client/1.0::python/%s.%s' % sys.version_info[:2]

import os
import re
import time
import json
import urllib
import urllib2
import httplib
import cookielib
from optparse import OptionParser
from math import log

# define exit codes according to Linux sysexits.h
EX_OK          = 0  # successful termination
EX__BASE       = 64 # base value for error messages
EX_USAGE       = 64 # command line usage error
EX_DATAERR     = 65 # data format error
EX_NOINPUT     = 66 # cannot open input
EX_NOUSER      = 67 # addressee unknown
EX_NOHOST      = 68 # host name unknown
EX_UNAVAILABLE = 69 # service unavailable
EX_SOFTWARE    = 70 # internal software error
EX_OSERR       = 71 # system error (e.g., can't fork)
EX_OSFILE      = 72 # critical OS file missing
EX_CANTCREAT   = 73 # can't create (user) output file
EX_IOERR       = 74 # input/output error
EX_TEMPFAIL    = 75 # temp failure; user is invited to retry
EX_PROTOCOL    = 76 # remote error in protocol
EX_NOPERM      = 77 # permission denied
EX_CONFIG      = 78 # configuration error

class HTTPSClientAuthHandler(urllib2.HTTPSHandler):
    """
    Simple HTTPS client authentication class based on provided
    key/ca information
    """
    def __init__(self, key=None, cert=None, level=0):
        if level > 1:
            urllib2.HTTPSHandler.__init__(self, debuglevel=1)
        else:
            urllib2.HTTPSHandler.__init__(self)
        self.key = key
        self.cert = cert

    def https_open(self, req):
        """Open request method"""
        #Rather than pass in a reference to a connection class, we pass in
        # a reference to a function which, for all intents and purposes,
        # will behave as a constructor
        return self.do_open(self.get_connection, req)

    def get_connection(self, host, timeout=300):
        """Connection method"""
        if self.key:
            return httplib.HTTPSConnection(host, key_file=self.key,
                                           cert_file=self.cert)
        return httplib.HTTPSConnection(host)

class DASOptionParser:
    """
    DAS cache client option parser
    """
    def __init__(self):
        usage  = "Usage: %prog [options]\n"
        usage += "For more help please visit https://cmsweb.cern.ch/das/faq"
        self.parser = OptionParser(usage=usage)
        self.parser.add_option("-v", "--verbose", action="store",
                               type="int", default=0, dest="verbose",
                               help="verbose output")
        self.parser.add_option("--query", action="store", type="string",
                               default=False, dest="query",
                               help="specify query for your request")
        msg = "host name of DAS cache server, default is https://cmsweb.cern.ch"
        self.parser.add_option("--host", action="store", type="string",
                               default='https://cmsweb.cern.ch', dest="host", help=msg)
        msg  = "start index for returned result set, aka pagination,"
        msg += " use w/ limit (default is 0)"
        self.parser.add_option("--idx", action="store", type="int",
                               default=0, dest="idx", help=msg)
        msg  = "number of returned results (default is 10),"
        msg += " use --limit=0 to show all results"
        self.parser.add_option("--limit", action="store", type="int",
                               default=10, dest="limit", help=msg)
        msg = 'specify return data format (json or plain), default plain.'
        self.parser.add_option("--format", action="store", type="string",
                               default="plain", dest="format", help=msg)
        msg = 'query waiting threshold in sec, default is 5 minutes'
        self.parser.add_option("--threshold", action="store", type="int",
                               default=300, dest="threshold", help=msg)
        msg = 'specify private key file name'
        self.parser.add_option("--key", action="store", type="string",
                               default="", dest="ckey", help=msg)
        msg = 'specify private certificate file name'
        self.parser.add_option("--cert", action="store", type="string",
                               default="", dest="cert", help=msg)
        msg = 'specify number of retries upon busy DAS server message'
        self.parser.add_option("--retry", action="store", type="string",
                               default=0, dest="retry", help=msg)
        msg  = 'show DAS headers in JSON format'
        msg += ' (obsolete, keep for backward compatibility)'
        self.parser.add_option("--das-headers", action="store_true",
                               default=False, dest="das_headers", help=msg)
        msg = 'specify power base for size_format, default is 10 (can be 2)'
        self.parser.add_option("--base", action="store", type="int",
                               default=0, dest="base", help=msg)

        msg = 'a file which contains a cached json dictionary for query -> files mapping'
        self.parser.add_option("--cache", action="store", type="string",
                               default=None, dest="cache", help=msg)

        msg = 'List DAS key/attributes, use "all" or specific DAS key value, e.g. site'
        self.parser.add_option("--list-attributes", action="store", type="string",
                               default="", dest="keys_attrs", help=msg)

    def get_opt(self):
        """
        Return the parsed list of options
        """
        return self.parser.parse_args()

def convert_time(val):
    "Convert given timestamp into human readable format"
    if isinstance(val, int) or isinstance(val, float):
        return time.strftime('%d/%b/%Y_%H:%M:%S_GMT', time.gmtime(val))
    return val
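
# Example (illustrative, not part of the original file): a numeric UNIX
# timestamp is rendered as a GMT string, anything else is passed through:
#   convert_time(0)      -> '01/Jan/1970_00:00:00_GMT'
#   convert_time('none') -> 'none'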

def size_format(uinput, ibase=0):
    """
    Format file size utility, it converts file size into KB, MB, GB, TB, PB units
    """
    if not ibase:
        return uinput
    try:
        num = float(uinput)
    except Exception as _exc:
        return uinput
    if ibase == 2.: # power of 2
        base  = 1024.
        xlist = ['', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']
    else: # default base is 10
        base  = 1000.
        xlist = ['', 'KB', 'MB', 'GB', 'TB', 'PB']
    for xxx in xlist:
        if num < base:
            return "%3.1f%s" % (num, xxx)
        num /= base
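
# Examples (illustrative, not part of the original file): base 10 walks the
# KB/MB/... ladder, base 2 uses KiB/MiB/..., and no base returns the input as-is:
#   size_format(1234567, 10) -> '1.2MB'
#   size_format(1048576, 2)  -> '1.0MiB'
#   size_format(1234567)     -> 1234567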

def unique_filter(rows):
    """
    Unique filter drops duplicate rows.
    """
    old_row = {}
    row = None
    for row in rows:
        row_data = dict(row)
        try:
            del row_data['_id']
            del row_data['das']
            del row_data['das_id']
            del row_data['cache_id']
        except:
            pass
        old_data = dict(old_row)
        try:
            del old_data['_id']
            del old_data['das']
            del old_data['das_id']
            del old_data['cache_id']
        except:
            pass
        if row_data == old_data:
            continue
        if old_row:
            yield old_row
        old_row = row
    yield row
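
# Note (added for clarity): like the Unix `uniq` utility, this generator only
# collapses *consecutive* duplicates, so rows are expected to arrive grouped.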

def get_value(data, filters, base=10):
    """Filter data from a row for given list of filters"""
    for ftr in filters:
        if ftr.find('>') != -1 or ftr.find('<') != -1 or ftr.find('=') != -1:
            continue
        row = dict(data)
        values = set()
        for key in ftr.split('.'):
            if isinstance(row, dict) and key in row:
                if key == 'creation_time':
                    row = convert_time(row[key])
                elif key == 'size':
                    row = size_format(row[key], base)
                else:
                    row = row[key]
            if isinstance(row, list):
                for item in row:
                    if isinstance(item, dict) and key in item:
                        if key == 'creation_time':
                            row = convert_time(item[key])
                        elif key == 'size':
                            row = size_format(item[key], base)
                        else:
                            row = item[key]
                        values.add(row)
                    else:
                        if isinstance(item, basestring):
                            values.add(item)
        if len(values) == 1:
            yield str(values.pop())
        else:
            yield str(list(values))

def fullpath(path):
    "Expand path to full path"
    if path and path[0] == '~':
        path = path.replace('~', '')
        path = path[1:] if path[0] == '/' else path
        path = os.path.join(os.environ['HOME'], path)
    return path
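
# Examples (illustrative, not part of the original file; result depends on $HOME):
#   fullpath('~/.globus/userkey.pem') -> '/home/user/.globus/userkey.pem'
#   fullpath('/tmp/key.pem')          -> '/tmp/key.pem'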

def get_data(host, query, idx, limit, debug, threshold=300, ckey=None,
             cert=None, das_headers=True):
    """Contact DAS server and retrieve data for given DAS query"""
    params = {'input':query, 'idx':idx, 'limit':limit}
    path = '/das/cache'
    pat = re.compile('http[s]{0,1}://')
    if not pat.match(host):
        msg = 'Invalid hostname: %s' % host
        raise Exception(msg)
    url = host + path
    headers = {"Accept": "application/json", "User-Agent": DAS_CLIENT}
    encoded_data = urllib.urlencode(params, doseq=True)
    url += '?%s' % encoded_data
    req = urllib2.Request(url=url, headers=headers)
    if ckey and cert:
        ckey = fullpath(ckey)
        cert = fullpath(cert)
        http_hdlr = HTTPSClientAuthHandler(ckey, cert, debug)
    else:
        http_hdlr = urllib2.HTTPHandler(debuglevel=debug)
    proxy_handler = urllib2.ProxyHandler({})
    cookie_jar = cookielib.CookieJar()
    cookie_handler = urllib2.HTTPCookieProcessor(cookie_jar)
    opener = urllib2.build_opener(http_hdlr, proxy_handler, cookie_handler)
    fdesc = opener.open(req)
    data = fdesc.read()
    fdesc.close()

    pat = re.compile(r'^[a-z0-9]{32}')
    if data and isinstance(data, str) and pat.match(data) and len(data) == 32:
        pid = data
    else:
        pid = None
    iwtime = 2  # initial waiting time in seconds
    wtime = 20  # final waiting time in seconds
    sleep = iwtime
    time0 = time.time()
    while pid:
        params.update({'pid':data})
        encoded_data = urllib.urlencode(params, doseq=True)
        url = host + path + '?%s' % encoded_data
        req = urllib2.Request(url=url, headers=headers)
        try:
            fdesc = opener.open(req)
            data = fdesc.read()
            fdesc.close()
        except urllib2.HTTPError as err:
            return {"status":"fail", "reason":str(err)}
        if data and isinstance(data, str) and pat.match(data) and len(data) == 32:
            pid = data
        else:
            pid = None
        time.sleep(sleep)
        if sleep < wtime:
            sleep *= 2
        elif sleep == wtime:
            sleep = iwtime # start new cycle
        else:
            sleep = wtime
        if (time.time()-time0) > threshold:
            reason = "client timeout after %s sec" % int(time.time()-time0)
            return {"status":"fail", "reason":reason}
    jsondict = json.loads(data)
    return jsondict
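
# Minimal usage sketch (illustrative, not part of the original file): query the
# default DAS server with a placeholder dataset pattern; the key/cert paths are
# assumptions and a valid CMS grid certificate is needed in practice.
#   result = get_data('https://cmsweb.cern.ch', 'dataset=/ZMM*/*/*', 0, 10, 0,
#                     ckey='~/.globus/userkey.pem', cert='~/.globus/usercert.pem')
#   print result.get('status'), result.get('nresults')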

def prim_value(row):
    """Extract primary key value from DAS record"""
    prim_key = row['das']['primary_key']
    if prim_key == 'summary':
        return row[prim_key]
    key, att = prim_key.split('.')
    if isinstance(row[key], list):
        for item in row[key]:
            if att in item:
                return item[att]
    else:
        return row[key][att]
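
# Note (added for clarity): 'primary_key' is typically of the form
# '<daskey>.<attribute>', e.g. 'dataset.name' or 'file.name'; the special
# value 'summary' is handled separately above.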

def print_summary(rec):
    "Print summary record information on stdout"
    if 'summary' not in rec:
        msg = 'Summary information is not found in record:\n%s' % rec
        raise Exception(msg)
    for row in rec['summary']:
        keys = [k for k in row.keys()]
        maxlen = max([len(k) for k in keys])
        for key, val in row.items():
            pkey = '%s%s' % (key, ' '*(maxlen-len(key)))
            print '%s: %s' % (pkey, val)
        print

def print_from_cache(cache, query):
    "print the list of files reading it from cache"
    data = open(cache).read()
    jsondict = json.loads(data)
    if query in jsondict:
        print "\n".join(jsondict[query])
        exit(0)
    exit(1)

def keys_attrs(lkey, oformat, host, ckey, cert, debug=0):
    "Contact host for list of key/attributes pairs"
    url = '%s/das/keys?view=json' % host
    headers = {"Accept": "application/json", "User-Agent": DAS_CLIENT}
    req = urllib2.Request(url=url, headers=headers)
    if ckey and cert:
        ckey = fullpath(ckey)
        cert = fullpath(cert)
        http_hdlr = HTTPSClientAuthHandler(ckey, cert, debug)
    else:
        http_hdlr = urllib2.HTTPHandler(debuglevel=debug)
    proxy_handler = urllib2.ProxyHandler({})
    cookie_jar = cookielib.CookieJar()
    cookie_handler = urllib2.HTTPCookieProcessor(cookie_jar)
    opener = urllib2.build_opener(http_hdlr, proxy_handler, cookie_handler)
    fdesc = opener.open(req)
    data = json.load(fdesc)
    fdesc.close()
    if oformat.lower() == 'json':
        if lkey == 'all':
            print json.dumps(data)
        else:
            print json.dumps({lkey:data[lkey]})
        return
    for key, vdict in data.items():
        if lkey == 'all':
            pass
        elif lkey != key:
            continue
        print
        print "DAS key:", key
        for attr, examples in vdict.items():
            prefix = '    '
            print '%s%s' % (prefix, attr)
            for item in examples:
                print '%s%s%s' % (prefix, prefix, item)

def main():
    """Main function"""
    optmgr = DASOptionParser()
    opts, _ = optmgr.get_opt()
    host = opts.host
    debug = opts.verbose
    query = opts.query
    idx = opts.idx
    limit = opts.limit
    thr = opts.threshold
    ckey = opts.ckey
    cert = opts.cert
    base = opts.base
    if opts.keys_attrs:
        keys_attrs(opts.keys_attrs, opts.format, host, ckey, cert, debug)
        return
    if not query:
        print 'Input query is missing'
        sys.exit(EX_USAGE)
    if opts.format == 'plain':
        jsondict = get_data(host, query, idx, limit, debug, thr, ckey, cert)
        cli_msg = jsondict.get('client_message', None)
        if cli_msg:
            print "DAS CLIENT WARNING: %s" % cli_msg
        if 'status' not in jsondict and opts.cache:
            print_from_cache(opts.cache, query)
        if 'status' not in jsondict:
            print 'DAS record without status field:\n%s' % jsondict
            sys.exit(EX_PROTOCOL)
        if jsondict["status"] != 'ok' and opts.cache:
            print_from_cache(opts.cache, query)
        if jsondict['status'] != 'ok':
            print "status: %s, reason: %s" \
                % (jsondict.get('status'), jsondict.get('reason', 'N/A'))
            if opts.retry:
                found = False
                for attempt in xrange(1, int(opts.retry)):
                    interval = log(attempt)**5
                    print "Retry in %5.3f sec" % interval
                    time.sleep(interval)
                    # get_data() already returns a decoded JSON dictionary
                    jsondict = get_data(host, query, idx, limit, debug, thr, ckey, cert)
                    if jsondict.get('status', 'fail') == 'ok':
                        found = True
                        break
            else:
                sys.exit(EX_TEMPFAIL)
            if not found:
                sys.exit(EX_TEMPFAIL)
        nres = jsondict['nresults']
        if not limit:
            drange = '%s' % nres
        else:
            drange = '%s-%s out of %s' % (idx+1, idx+limit, nres)
        if opts.limit:
            msg = "\nShowing %s results" % drange
            msg += ", for more results use --idx/--limit options\n"
            print msg
        mongo_query = jsondict['mongo_query']
        unique = False
        fdict = mongo_query.get('filters', {})
        filters = fdict.get('grep', [])
        aggregators = mongo_query.get('aggregators', [])
        if 'unique' in fdict.keys():
            unique = True
        if filters and not aggregators:
            data = jsondict['data']
            if isinstance(data, dict):
                rows = [r for r in get_value(data, filters, base)]
                print ' '.join(rows)
            elif isinstance(data, list):
                if unique:
                    data = unique_filter(data)
                for row in data:
                    rows = [r for r in get_value(row, filters, base)]
                    print ' '.join(rows)
            else:
                print(json.dumps(jsondict))
        elif aggregators:
            data = jsondict['data']
            if unique:
                data = unique_filter(data)
            for row in data:
                if row['key'].find('size') != -1 and \
                   row['function'] == 'sum':
                    val = size_format(row['result']['value'], base)
                else:
                    val = row['result']['value']
                print '%s(%s)=%s' \
                    % (row['function'], row['key'], val)
        else:
            data = jsondict['data']
            if isinstance(data, list):
                old = None
                val = None
                for row in data:
                    prim_key = row.get('das', {}).get('primary_key', None)
                    if prim_key == 'summary':
                        print_summary(row)
                        return
                    val = prim_value(row)
                    if not opts.limit:
                        if val != old:
                            print val
                            old = val
                    else:
                        print val
                if val != old and not opts.limit:
                    print val
            elif isinstance(data, dict):
                print prim_value(data)
            else:
                print data
    else:
        jsondict = get_data(
            host, query, idx, limit, debug, thr, ckey, cert)
        print(json.dumps(jsondict))

#
# main
#
if __name__ == '__main__':
    main()
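
# Example invocation (illustrative; the dataset pattern below is a placeholder,
# not taken from the original file):
#   ./das_client.py --query="dataset=/ZeroBias/*/RAW" --limit=0 --format=plain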