# File indexing completed on 2023-03-17 10:53:14
0005 """
0006 DAS command line tool
0007 """
0008 from __future__ import print_function
0009 __author__ = "Valentin Kuznetsov"
0010
0011
0012 import os
0013 import sys
0014 import pwd
# Enforce the minimum interpreter version before anything else runs
if sys.version_info < (2, 6):
    raise Exception("DAS requires python 2.6 or greater")

# User-Agent identity sent with every request, e.g. "das-client/1.1::python/2.7"
DAS_CLIENT = 'das-client/1.1::python/%s.%s' % sys.version_info[:2]
0019
0020 import os
0021 import re
0022 import ssl
0023 import time
0024 import json
0025 import urllib
0026 import urllib2
0027 import httplib
0028 import cookielib
0029 from optparse import OptionParser
0030 from math import log
0031 from types import GeneratorType
0032
0033
# Process exit codes modelled after BSD sysexits.h
EX_OK = 0            # successful termination
EX__BASE = 64        # base value for error codes
EX_USAGE = 64        # command line usage error
EX_DATAERR = 65      # data format error
EX_NOINPUT = 66      # cannot open input
EX_NOUSER = 67       # addressee unknown
EX_NOHOST = 68       # host name unknown
EX_UNAVAILABLE = 69  # service unavailable
EX_SOFTWARE = 70     # internal software error
EX_OSERR = 71        # system error
EX_OSFILE = 72       # critical OS file missing
EX_CANTCREAT = 73    # can't create (user) output file
EX_IOERR = 74        # input/output error
EX_TEMPFAIL = 75     # temporary failure; user is invited to retry
EX_PROTOCOL = 76     # remote error in protocol
EX_NOPERM = 77       # permission denied
EX_CONFIG = 78       # configuration error
0051
class HTTPSClientAuthHandler(urllib2.HTTPSHandler):
    """
    Simple HTTPS client authentication handler based on the provided
    key/cert/CA information.

    :param key: path to the user's private key file (optional)
    :param cert: path to the user's certificate file (optional)
    :param capath: directory with CA certificates (optional)
    :param level: debug level; > 1 enables urllib2 wire-level debugging
    """
    def __init__(self, key=None, cert=None, capath=None, level=0):
        if level > 1:
            urllib2.HTTPSHandler.__init__(self, debuglevel=1)
        else:
            urllib2.HTTPSHandler.__init__(self)
        self.key = key
        self.cert = cert
        self.capath = capath

    def https_open(self, req):
        """Open request method, delegates to get_connection"""
        return self.do_open(self.get_connection, req)

    def get_connection(self, host, timeout=300):
        """Connection factory used by https_open.

        Chooses the strongest authentication available: key+cert,
        cert+CA path, or plain HTTPS.  The *timeout* is now forwarded
        to the connection (the original accepted it and dropped it,
        so connections could hang forever).
        """
        if self.key and self.cert and not self.capath:
            return httplib.HTTPSConnection(host, key_file=self.key,
                                           cert_file=self.cert,
                                           timeout=timeout)
        elif self.cert and self.capath:
            # SSLv23 lets the client negotiate the best protocol the
            # server supports (the original pinned obsolete TLSv1)
            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            context.load_verify_locations(capath=self.capath)
            # pass the key file too -- the original ignored self.key here,
            # which fails when the cert file lacks the private key
            context.load_cert_chain(self.cert, keyfile=self.key)
            return httplib.HTTPSConnection(host, context=context,
                                           timeout=timeout)
        return httplib.HTTPSConnection(host, timeout=timeout)
0084
def x509():
    """Return the user's X509 proxy file path, or '' when none exists.

    Prefers $X509_USER_PROXY and falls back to the conventional
    /tmp/x509up_u<uid> location.
    """
    candidate = os.environ.get('X509_USER_PROXY', '')
    if not candidate:
        uid = pwd.getpwuid(os.getuid()).pw_uid
        candidate = '/tmp/x509up_u%s' % uid
    return candidate if os.path.isfile(candidate) else ''
0093
def check_glidein():
    """Exit with EX__BASE when running inside a GLIDEIN environment."""
    if os.environ.get('GLIDEIN_CMSSite', ''):
        print("ERROR: das_client is running from GLIDEIN environment, it is prohibited")
        sys.exit(EX__BASE)
0101
def check_auth(key):
    """Warn on stderr when no user credential/X509 proxy is supplied."""
    if key:
        return
    warning = ("WARNING: das_client is running without user credentials/X509 "
               "proxy, create proxy via 'voms-proxy-init -voms cms -rfc'")
    print(warning, file=sys.stderr)
0107
class DASOptionParser:
    """
    DAS cache client option parser: builds the optparse parser for all
    das_client command-line options.
    """
    def __init__(self):
        usage = "Usage: %prog [options]\n"
        usage += "For more help please visit https://cmsweb.cern.ch/das/faq"
        self.parser = OptionParser(usage=usage)
        self.parser.add_option("-v", "--verbose", action="store",
            type="int", default=0, dest="verbose",
            help="verbose output")
        self.parser.add_option("--query", action="store", type="string",
            default=False, dest="query",
            help="specify query for your request")
        msg = "host name of DAS cache server, default is https://cmsweb.cern.ch"
        self.parser.add_option("--host", action="store", type="string",
            default='https://cmsweb.cern.ch', dest="host", help=msg)
        msg = "start index for returned result set, aka pagination,"
        msg += " use w/ limit (default is 0)"
        self.parser.add_option("--idx", action="store", type="int",
            default=0, dest="idx", help=msg)
        msg = "number of returned results (default is 10),"
        msg += " use --limit=0 to show all results"
        self.parser.add_option("--limit", action="store", type="int",
            default=10, dest="limit", help=msg)
        msg = 'specify return data format (json or plain), default plain.'
        self.parser.add_option("--format", action="store", type="string",
            default="plain", dest="format", help=msg)
        msg = 'query waiting threshold in sec, default is 5 minutes'
        self.parser.add_option("--threshold", action="store", type="int",
            default=300, dest="threshold", help=msg)
        msg = 'specify private key file name, default $X509_USER_PROXY'
        self.parser.add_option("--key", action="store", type="string",
            default=x509(), dest="ckey", help=msg)
        msg = 'specify private certificate file name, default $X509_USER_PROXY'
        self.parser.add_option("--cert", action="store", type="string",
            default=x509(), dest="cert", help=msg)
        msg = 'specify CA path, default $X509_CERT_DIR'
        self.parser.add_option("--capath", action="store", type="string",
            default=os.environ.get("X509_CERT_DIR", ""),
            dest="capath", help=msg)
        msg = 'specify number of retries upon busy DAS server message'
        # BUGFIX: --retry is a count and is consumed via int(opts.retry);
        # declare it as an int (was type="string" with an int default)
        self.parser.add_option("--retry", action="store", type="int",
            default=0, dest="retry", help=msg)
        msg = 'show DAS headers in JSON format'
        msg += ' (obsolete, keep for backward compatibility)'
        self.parser.add_option("--das-headers", action="store_true",
            default=False, dest="das_headers", help=msg)
        msg = 'specify power base for size_format, default is 10 (can be 2)'
        self.parser.add_option("--base", action="store", type="int",
            default=0, dest="base", help=msg)

        msg = 'a file which contains a cached json dictionary for query -> files mapping'
        self.parser.add_option("--cache", action="store", type="string",
            default=None, dest="cache", help=msg)

        msg = 'a query cache value'
        self.parser.add_option("--query-cache", action="store", type="int",
            default=0, dest="qcache", help=msg)
        msg = 'List DAS key/attributes, use "all" or specific DAS key value, e.g. site'
        self.parser.add_option("--list-attributes", action="store", type="string",
            default="", dest="keys_attrs", help=msg)

    def get_opt(self):
        """
        Returns parsed (options, args) tuple from the command line.
        """
        return self.parser.parse_args()
0175
def convert_time(val):
    """Render a numeric UNIX timestamp as 'DD/Mon/YYYY_HH:MM:SS_GMT'.

    Non-numeric values are passed through unchanged.
    """
    if not isinstance(val, (int, float)):
        return val
    return time.strftime('%d/%b/%Y_%H:%M:%S_GMT', time.gmtime(val))
0181
def size_format(uinput, ibase=0):
    """
    Format a file size into human-readable KB/MB/GB/TB/PB units.

    :param uinput: numeric value or numeric string; returned unchanged
        when *ibase* is falsy or the value is not numeric
    :param ibase: power base; 2 selects binary (KiB/MiB/...) units,
        any other non-zero value selects decimal (KB/MB/...) units
    :returns: formatted string, or *uinput* unchanged (see above)
    """
    if not ibase:
        return uinput
    try:
        num = float(uinput)
    except (ValueError, TypeError):  # non-numeric input passes through
        return uinput
    if ibase == 2.:
        base = 1024.
        xlist = ['', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']
    else:
        base = 1000.
        xlist = ['', 'KB', 'MB', 'GB', 'TB', 'PB']
    for xxx in xlist:
        if num < base:
            return "%3.1f%s" % (num, xxx)
        num /= base
    # value exceeded the largest unit: express it in that unit anyway
    # (the original fell off the loop and returned None here)
    return "%3.1f%s" % (num * base, xlist[-1])
0202
def unique_filter(rows):
    """
    Generator which drops consecutive duplicate rows.

    Rows are compared with the DAS bookkeeping keys ('_id', 'das',
    'das_id', 'cache_id') excluded, so records differing only in
    metadata count as duplicates; one representative per run is yielded.
    """
    skip = ('_id', 'das', 'das_id', 'cache_id')

    def payload(rec):
        "Copy of *rec* without DAS bookkeeping keys."
        # NOTE: the original deleted these keys one by one inside a single
        # bare try/except, so the first missing key aborted the stripping
        # and left later metadata keys in the comparison
        return dict((k, v) for k, v in dict(rec).items() if k not in skip)

    old_row = {}
    row = None
    for row in rows:
        if payload(row) == payload(old_row):
            continue
        if old_row:
            yield old_row
        old_row = row
    if row is not None:  # the original yielded a spurious None for empty input
        yield row
0232
def extract_value(row, key, base=10):
    """Generator yielding the value(s) stored under *key* inside *row*.

    Dict values are yielded directly ('creation_time' and 'size' are
    pretty-printed); lists and generators are walked recursively.
    """
    if isinstance(row, dict) and key in row:
        raw = row[key]
        if key == 'creation_time':
            row = convert_time(raw)
        elif key == 'size':
            row = size_format(raw, base)
        else:
            row = raw
        yield row
    if isinstance(row, (list, GeneratorType)):
        for element in row:
            for found in extract_value(element, key, base):
                yield found
0247
def get_value(data, filters, base=10):
    """Generator yielding values from *data* for the given grep filters.

    Filters containing comparison operators (>, <, =) are skipped;
    the rest are dotted key paths resolved via extract_value.
    """
    for ftr in filters:
        if any(sym in ftr for sym in ('>', '<', '=')):
            continue
        row = dict(data)
        values = []
        keys = ftr.split('.')
        last = keys[-1]
        for key in keys:
            val = list(extract_value(row, key, base))
            if key == last:
                values.extend(json.dumps(i) for i in val)
            else:
                row = val
        if len(values) == 1:
            yield values[0]
        else:
            yield values
0266
def fullpath(path):
    """Expand a leading '~' in *path* to the user's $HOME directory.

    The original implementation crashed with IndexError on the bare
    path '~' and stripped *every* tilde in the path via str.replace;
    only the leading tilde is treated specially now.
    """
    if not path or path[0] != '~':
        return path
    tail = path[1:].lstrip('/')
    home = os.environ['HOME']
    return os.path.join(home, tail) if tail else home
0274
def get_data(host, query, idx, limit, debug, threshold=300, ckey=None,
        cert=None, capath=None, qcache=0, das_headers=True):
    """Contact DAS server and retrieve data for given DAS query.

    :param host: DAS server URL, must start with http:// or https://
    :param query: DAS query string
    :param idx: start index for pagination
    :param limit: number of results to fetch (0 means all)
    :param debug: urllib debug level
    :param threshold: client-side polling timeout in seconds
    :param ckey: user private key file (optional)
    :param cert: user certificate file (optional)
    :param capath: CA certificates directory (optional)
    :param qcache: query cache value forwarded to the server
    :param das_headers: kept for backward compatibility; unused here
    :returns: decoded JSON dict, or {"status": "fail", "reason": ...}
        on polling errors/timeouts
    :raises Exception: when *host* is not an http(s) URL
    """
    params = {'input':query, 'idx':idx, 'limit':limit}
    if qcache:
        params['qcache'] = qcache
    path = '/das/cache'
    pat = re.compile('http[s]{0,1}://')
    if not pat.match(host):
        msg = 'Invalid hostname: %s' % host
        raise Exception(msg)
    url = host + path
    client = '%s (%s)' % (DAS_CLIENT, os.environ.get('USER', ''))
    headers = {"Accept": "application/json", "User-Agent": client}
    encoded_data = urllib.urlencode(params, doseq=True)
    url += '?%s' % encoded_data
    req = urllib2.Request(url=url, headers=headers)
    # pick an HTTPS handler matching the available credentials,
    # otherwise fall back to plain HTTP
    if ckey and cert:
        ckey = fullpath(ckey)
        cert = fullpath(cert)
        http_hdlr = HTTPSClientAuthHandler(ckey, cert, capath, debug)
    elif cert and capath:
        cert = fullpath(cert)
        http_hdlr = HTTPSClientAuthHandler(ckey, cert, capath, debug)
    else:
        http_hdlr = urllib2.HTTPHandler(debuglevel=debug)
    proxy_handler = urllib2.ProxyHandler({})
    cookie_jar = cookielib.CookieJar()
    cookie_handler = urllib2.HTTPCookieProcessor(cookie_jar)
    try:
        opener = urllib2.build_opener(http_hdlr, proxy_handler, cookie_handler)
        fdesc = opener.open(req)
        data = fdesc.read()
        fdesc.close()
    except urllib2.HTTPError as error:
        print(error.read())
        sys.exit(1)

    # a bare 32-character lowercase-hex reply is a PID token meaning the
    # query is still being processed; anything else is the final payload
    pat = re.compile(r'^[a-z0-9]{32}')
    if data and isinstance(data, str) and pat.match(data) and len(data) == 32:
        pid = data
    else:
        pid = None
    iwtime = 2   # initial polling wait in seconds
    wtime = 20   # maximum polling wait in seconds
    sleep = iwtime
    time0 = time.time()
    # poll the server with the PID until a real payload arrives or the
    # client-side threshold expires
    while pid:
        params.update({'pid':data})
        encoded_data = urllib.urlencode(params, doseq=True)
        url = host + path + '?%s' % encoded_data
        req = urllib2.Request(url=url, headers=headers)
        try:
            fdesc = opener.open(req)
            data = fdesc.read()
            fdesc.close()
        except urllib2.HTTPError as err:
            return {"status":"fail", "reason":str(err)}
        if data and isinstance(data, str) and pat.match(data) and len(data) == 32:
            pid = data
        else:
            pid = None
        time.sleep(sleep)
        # back-off: double the wait up to wtime, then restart from iwtime
        if sleep < wtime:
            sleep *= 2
        elif sleep == wtime:
            sleep = iwtime
        else:
            sleep = wtime
        if (time.time()-time0) > threshold:
            reason = "client timeout after %s sec" % int(time.time()-time0)
            return {"status":"fail", "reason":reason}
    jsondict = json.loads(data)
    return jsondict
0349
def prim_value(row):
    """Extract the primary-key value from a DAS record.

    The primary key is a dotted 'entity.attribute' path stored under
    row['das']['primary_key'] ('summary' is returned whole).  Returns
    None when the entity or attribute is absent.
    """
    prim_key = row['das']['primary_key']
    if prim_key == 'summary':
        return row.get(prim_key, None)
    key, att = prim_key.split('.')
    # BUGFIX: the original accessed row[key] before checking key in row,
    # so a missing primary-key entity raised KeyError (its guard was dead)
    if key not in row:
        return None
    value = row[key]
    if isinstance(value, list):
        for item in value:
            if att in item:
                return item[att]
    elif att in value:
        return value[att]
0364
def print_summary(rec):
    """Print summary record information on stdout, aligned by key name.

    :raises Exception: when *rec* has no 'summary' entry
    """
    if 'summary' not in rec:
        # BUGFIX: the original wrote "msg = '...', rec" -- the comma built
        # a tuple instead of formatting the record into the message
        raise Exception('Summary information is not found in record:\n%s' % rec)
    for row in rec['summary']:
        # guard against an empty row: max() over an empty sequence raises
        maxlen = max(len(k) for k in row) if row else 0
        for key, val in row.items():
            pkey = '%s%s' % (key, ' ' * (maxlen - len(key)))
            print('%s: %s' % (pkey, val))
        print()
0377
def print_from_cache(cache, query):
    """Print the list of files for *query* read from the *cache* file.

    Exits with status 0 when the query is found, 1 otherwise.
    """
    # 'with' guarantees the cache file is closed (the original leaked it)
    with open(cache) as fdesc:
        jsondict = json.loads(fdesc.read())
    if query in jsondict:
        print("\n".join(jsondict[query]))
        exit(0)
    exit(1)
0386
def keys_attrs(lkey, oformat, host, ckey, cert, debug=0):
    """Contact host for the list of DAS key/attribute pairs and print them.

    :param lkey: DAS key to list, or 'all' for every key
    :param oformat: 'json' for raw JSON output, anything else for text
    :param host: DAS server URL
    :param ckey: user private key file (optional)
    :param cert: user certificate file (optional)
    :param debug: urllib debug level
    """
    url = '%s/das/keys?view=json' % host
    headers = {"Accept": "application/json", "User-Agent": DAS_CLIENT}
    req = urllib2.Request(url=url, headers=headers)
    if ckey and cert:
        ckey = fullpath(ckey)
        cert = fullpath(cert)
        # BUGFIX: the original passed debug positionally as the 3rd
        # argument, where it landed in the "capath" parameter; pass it
        # as the debug level instead
        http_hdlr = HTTPSClientAuthHandler(ckey, cert, level=debug)
    else:
        http_hdlr = urllib2.HTTPHandler(debuglevel=debug)
    proxy_handler = urllib2.ProxyHandler({})
    cookie_jar = cookielib.CookieJar()
    cookie_handler = urllib2.HTTPCookieProcessor(cookie_jar)
    opener = urllib2.build_opener(http_hdlr, proxy_handler, cookie_handler)
    fdesc = opener.open(req)
    data = json.load(fdesc)
    fdesc.close()
    if oformat.lower() == 'json':
        if lkey == 'all':
            print(json.dumps(data))
        else:
            print(json.dumps({lkey:data[lkey]}))
        return
    for key, vdict in data.items():
        if lkey != 'all' and lkey != key:
            continue
        print()
        print("DAS key:", key)
        for attr, examples in vdict.items():
            prefix = '    '
            print('%s%s' % (prefix, attr))
            for item in examples:
                print('%s%s%s' % (prefix, prefix, item))
0423
def main():
    """Main function: parse options, query DAS and print the results."""
    optmgr = DASOptionParser()
    opts, _ = optmgr.get_opt()
    host = opts.host
    debug = opts.verbose
    query = opts.query
    idx = opts.idx
    limit = opts.limit
    thr = opts.threshold
    ckey = opts.ckey
    cert = opts.cert
    capath = opts.capath
    base = opts.base
    qcache = opts.qcache
    check_glidein()
    check_auth(ckey)
    if opts.keys_attrs:
        # --list-attributes mode: print key/attribute pairs and quit
        keys_attrs(opts.keys_attrs, opts.format, host, ckey, cert, debug)
        return
    if not query:
        print('Input query is missing')
        sys.exit(EX_USAGE)
    if opts.format == 'plain':
        jsondict = get_data(host, query, idx, limit, debug, thr, ckey, cert, capath, qcache)
        cli_msg = jsondict.get('client_message', None)
        if cli_msg:
            print("DAS CLIENT WARNING: %s" % cli_msg)
        # fall back to the local cache file when the server reply is unusable
        if 'status' not in jsondict and opts.cache:
            print_from_cache(opts.cache, query)
        if 'status' not in jsondict:
            print('DAS record without status field:\n%s' % jsondict)
            sys.exit(EX_PROTOCOL)
        if jsondict["status"] != 'ok' and opts.cache:
            print_from_cache(opts.cache, query)
        if jsondict['status'] != 'ok':
            print("status: %s, reason: %s" \
                % (jsondict.get('status'), jsondict.get('reason', 'N/A')))
            if opts.retry:
                found = False
                for attempt in xrange(1, int(opts.retry)):
                    interval = log(attempt)**5
                    print("Retry in %5.3f sec" % interval)
                    time.sleep(interval)
                    # BUGFIX: get_data() already returns a decoded dict;
                    # the original re-parsed it via json.loads(), which
                    # raised TypeError on every retry attempt
                    jsondict = get_data(host, query, idx, limit, debug,
                                        thr, ckey, cert, capath, qcache)
                    if jsondict.get('status', 'fail') == 'ok':
                        found = True
                        break
            else:
                sys.exit(EX_TEMPFAIL)
            if not found:
                sys.exit(EX_TEMPFAIL)
        nres = jsondict.get('nresults', 0)
        if not limit:
            drange = '%s' % nres
        else:
            drange = '%s-%s out of %s' % (idx+1, idx+limit, nres)
        if opts.limit:
            msg = "\nShowing %s results" % drange
            msg += ", for more results use --idx/--limit options\n"
            print(msg)
        mongo_query = jsondict.get('mongo_query', {})
        unique = False
        fdict = mongo_query.get('filters', {})
        filters = fdict.get('grep', [])
        aggregators = mongo_query.get('aggregators', [])
        if 'unique' in fdict.keys():
            unique = True
        if filters and not aggregators:
            # grep-filter mode: print only the requested attributes
            data = jsondict['data']
            if isinstance(data, dict):
                rows = [r for r in get_value(data, filters, base)]
                print(' '.join(rows))
            elif isinstance(data, list):
                if unique:
                    data = unique_filter(data)
                for row in data:
                    rows = [r for r in get_value(row, filters, base)]
                    types = [type(r) for r in rows]
                    if len(types) > 1:  # several filter values per row
                        print(' '.join([str(r) for r in rows]))
                    elif isinstance(rows[0], list):
                        out = set()
                        for item in rows:
                            for elem in item:
                                out.add(elem)
                        print(' '.join(out))
                    else:
                        print(' '.join(rows))
            else:
                print(json.dumps(jsondict))
        elif aggregators:
            # aggregator mode: print function(key)=value lines
            data = jsondict['data']
            if unique:
                data = unique_filter(data)
            for row in data:
                if row['key'].find('size') != -1 and \
                    row['function'] == 'sum':
                    val = size_format(row['result']['value'], base)
                else:
                    val = row['result']['value']
                print('%s(%s)=%s' \
                    % (row['function'], row['key'], val))
        else:
            # plain record mode: one primary-key value per record;
            # without an explicit --limit, consecutive duplicates collapse
            data = jsondict['data']
            if isinstance(data, list):
                old = None
                val = None
                for row in data:
                    prim_key = row.get('das', {}).get('primary_key', None)
                    if prim_key == 'summary':
                        print_summary(row)
                        return
                    val = prim_value(row)
                    if not opts.limit:
                        if val != old:
                            print(val)
                        old = val
                    else:
                        print(val)
                if val != old and not opts.limit:
                    print(val)
            elif isinstance(data, dict):
                print(prim_value(data))
            else:
                print(data)
    else:
        # any non-plain format: dump the raw JSON reply
        jsondict = get_data(\
            host, query, idx, limit, debug, thr, ckey, cert, capath, qcache)
        print(json.dumps(jsondict))
0555
0556
0557
0558
# Script entry point
if __name__ == '__main__':
    main()