File indexing completed on 2024-11-26 02:34:06
0001
0002
0003
0004
0005 """
0006 DAS command line tool
0007 """
0008 __author__ = "Valentin Kuznetsov"
0009
0010
0011 import os
0012 import sys
0013 import pwd
0014 if sys.version_info < (2, 6):
0015 raise Exception("DAS requires python 2.6 or greater")
0016
0017 DAS_CLIENT = 'das-client/1.1::python/%s.%s' % sys.version_info[:2]
0018
0019 import os
0020 import re
0021 import ssl
0022 import time
0023 import json
0024 import urllib
0025 import urllib2
0026 import httplib
0027 import cookielib
0028 from optparse import OptionParser
0029 from math import log
0030 from types import GeneratorType
0031
0032
0033 EX_OK = 0
0034 EX__BASE = 64
0035 EX_USAGE = 64
0036 EX_DATAERR = 65
0037 EX_NOINPUT = 66
0038 EX_NOUSER = 67
0039 EX_NOHOST = 68
0040 EX_UNAVAILABLE = 69
0041 EX_SOFTWARE = 70
0042 EX_OSERR = 71
0043 EX_OSFILE = 72
0044 EX_CANTCREAT = 73
0045 EX_IOERR = 74
0046 EX_TEMPFAIL = 75
0047 EX_PROTOCOL = 76
0048 EX_NOPERM = 77
0049 EX_CONFIG = 78
0050
class HTTPSClientAuthHandler(urllib2.HTTPSHandler):
    """
    HTTPS handler which authenticates the client using the supplied
    key/certificate pair and optional CA directory.
    """
    def __init__(self, key=None, cert=None, capath=None, level=0):
        # urllib2 debug output is enabled only for verbosity levels above 1
        urllib2.HTTPSHandler.__init__(self, debuglevel=1 if level > 1 else 0)
        self.key = key
        self.cert = cert
        self.capath = capath

    def https_open(self, req):
        """Open request method"""
        return self.do_open(self.get_connection, req)

    def get_connection(self, host, timeout=300):
        """Connection method"""
        if self.key and self.cert and not self.capath:
            # plain key/cert pair without a CA verification path
            return httplib.HTTPSConnection(host, key_file=self.key,
                                           cert_file=self.cert)
        if self.cert and self.capath:
            # verify the server against the CA directory and present our chain
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
            context.load_verify_locations(capath=self.capath)
            context.load_cert_chain(self.cert)
            return httplib.HTTPSConnection(host, context=context)
        # no credentials at all: anonymous HTTPS connection
        return httplib.HTTPSConnection(host)
0083
def x509():
    "Locate the user's X509 proxy: $X509_USER_PROXY, else the default tmp file"
    candidate = os.environ.get('X509_USER_PROXY', '')
    if not candidate:
        candidate = '/tmp/x509up_u%s' % pwd.getpwuid(os.getuid()).pw_uid
    # only hand back a path that actually exists on disk
    return candidate if os.path.isfile(candidate) else ''
0092
def check_glidein():
    "Check glidein environment and exit if it is set"
    if os.environ.get('GLIDEIN_CMSSite', ''):
        msg = "ERROR: das_client is running from GLIDEIN environment, it is prohibited"
        print(msg)
        sys.exit(EX__BASE)
0100
def check_auth(key):
    "Check if user runs das_client with key/cert and warn users to switch"
    if key:
        return
    msg = "WARNING: das_client is running without user credentials/X509 proxy, create proxy via 'voms-proxy-init -voms cms -rfc'"
    print(msg, file=sys.stderr)
0106
class DASOptionParser:
    """
    DAS cache client option parser.

    Declares every command line option understood by das_client and
    exposes the parsed result via :meth:`get_opt`.
    """
    def __init__(self):
        usage = "Usage: %prog [options]\n"
        usage += "For more help please visit https://cmsweb.cern.ch/das/faq"
        self.parser = OptionParser(usage=usage)
        # verbosity level; values > 1 also enable HTTP handler debug output
        self.parser.add_option("-v", "--verbose", action="store",
            type="int", default=0, dest="verbose",
            help="verbose output")
        self.parser.add_option("--query", action="store", type="string",
            default=False, dest="query",
            help="specify query for your request")
        msg = "host name of DAS cache server, default is https://cmsweb.cern.ch"
        self.parser.add_option("--host", action="store", type="string",
            default='https://cmsweb.cern.ch', dest="host", help=msg)
        msg = "start index for returned result set, aka pagination,"
        msg += " use w/ limit (default is 0)"
        self.parser.add_option("--idx", action="store", type="int",
            default=0, dest="idx", help=msg)
        msg = "number of returned results (default is 10),"
        msg += " use --limit=0 to show all results"
        self.parser.add_option("--limit", action="store", type="int",
            default=10, dest="limit", help=msg)
        msg = 'specify return data format (json or plain), default plain.'
        self.parser.add_option("--format", action="store", type="string",
            default="plain", dest="format", help=msg)
        msg = 'query waiting threshold in sec, default is 5 minutes'
        self.parser.add_option("--threshold", action="store", type="int",
            default=300, dest="threshold", help=msg)
        # key/cert both default to the x509 proxy discovered at import time
        msg = 'specify private key file name, default $X509_USER_PROXY'
        self.parser.add_option("--key", action="store", type="string",
            default=x509(), dest="ckey", help=msg)
        msg = 'specify private certificate file name, default $X509_USER_PROXY'
        self.parser.add_option("--cert", action="store", type="string",
            default=x509(), dest="cert", help=msg)
        msg = 'specify CA path, default $X509_CERT_DIR'
        self.parser.add_option("--capath", action="store", type="string",
            default=os.environ.get("X509_CERT_DIR", ""),
            dest="capath", help=msg)
        # NOTE(review): declared as a string but consumed via int(opts.retry)
        # in main(); kept as-is for backward compatibility
        msg = 'specify number of retries upon busy DAS server message'
        self.parser.add_option("--retry", action="store", type="string",
            default=0, dest="retry", help=msg)
        msg = 'show DAS headers in JSON format'
        msg += ' (obsolete, keep for backward compatibility)'
        self.parser.add_option("--das-headers", action="store_true",
            default=False, dest="das_headers", help=msg)
        msg = 'specify power base for size_format, default is 10 (can be 2)'
        self.parser.add_option("--base", action="store", type="int",
            default=0, dest="base", help=msg)
        msg = 'a file which contains a cached json dictionary for query -> files mapping'
        self.parser.add_option("--cache", action="store", type="string",
            default=None, dest="cache", help=msg)
        msg = 'a query cache value'
        self.parser.add_option("--query-cache", action="store", type="int",
            default=0, dest="qcache", help=msg)
        msg = 'List DAS key/attributes, use "all" or specific DAS key value, e.g. site'
        self.parser.add_option("--list-attributes", action="store", type="string",
            default="", dest="keys_attrs", help=msg)

    def get_opt(self):
        """
        Returns parse list of options
        """
        return self.parser.parse_args()
0174
def convert_time(val):
    "Convert given timestamp into human readable format"
    if not isinstance(val, (int, float)):
        # non-numeric values are passed through untouched
        return val
    return time.strftime('%d/%b/%Y_%H:%M:%S_GMT', time.gmtime(val))
0180
def size_format(uinput, ibase=0):
    """
    Format file size utility, it converts file size into KB, MB, GB, TB, PB units

    :param uinput: raw size value (anything convertible to float)
    :param ibase: power base; 0 disables formatting, 2 selects binary
                  units (KiB/MiB/...), anything else decimal (KB/MB/...)
    :returns: formatted string, or the input unchanged when formatting is
              disabled or the value is not numeric
    """
    if not ibase:
        return uinput
    try:
        num = float(uinput)
    except Exception as _exc:
        # non-numeric input is returned as-is
        return uinput
    if ibase == 2.:
        base = 1024.
        xlist = ['', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']
    else:
        base = 1000.
        xlist = ['', 'KB', 'MB', 'GB', 'TB', 'PB']
    for xxx in xlist:
        if num < base:
            return "%3.1f%s" % (num, xxx)
        num /= base
    # bug fix: values beyond the largest unit used to fall off the loop and
    # silently return None; clamp them to the largest unit instead
    return "%3.1f%s" % (num * base, xlist[-1])
0201
def unique_filter(rows):
    """
    Generator dropping consecutive duplicate rows.

    Two rows compare equal when they match after removal of the DAS
    bookkeeping keys (_id, das, das_id, cache_id); each distinct row is
    yielded once, in order.
    """
    bookkeeping = ('_id', 'das', 'das_id', 'cache_id')
    old_row = {}
    row = None
    for row in rows:
        row_data = dict(row)
        # bug fix: the previous sequential `del` under a bare except aborted
        # on the first missing key, leaving later bookkeeping keys behind
        for key in bookkeeping:
            row_data.pop(key, None)
        old_data = dict(old_row)
        for key in bookkeeping:
            old_data.pop(key, None)
        if row_data == old_data:
            continue
        if old_row:
            yield old_row
        old_row = row
    # bug fix: an empty input used to yield a spurious None
    if row is not None:
        yield row
0231
def extract_value(row, key, base=10):
    "Yield value(s) of row[key], descending into lists and generators"
    if isinstance(row, dict) and key in row:
        val = row[key]
        # special keys get pretty-printed before being yielded
        if key == 'creation_time':
            row = convert_time(val)
        elif key == 'size':
            row = size_format(val, base)
        else:
            row = val
        yield row
    # after the lookup `row` may itself be iterable: recurse into it
    if isinstance(row, (list, GeneratorType)):
        for item in row:
            for inner in extract_value(item, key, base):
                yield inner
0246
def get_value(data, filters, base=10):
    """Filter data from a row for given list of filters"""
    for ftr in filters:
        # filters carrying a comparison operator are handled server-side
        if '>' in ftr or '<' in ftr or '=' in ftr:
            continue
        row = dict(data)
        values = []
        keys = ftr.split('.')
        last = keys[-1]
        for key in keys:
            extracted = list(extract_value(row, key, base))
            if key == last:
                # leaf level: serialize each value for printing
                values.extend(json.dumps(item) for item in extracted)
            else:
                # intermediate level: keep drilling into the extracted rows
                row = extracted
        yield values[0] if len(values) == 1 else values
0265
def fullpath(path):
    """
    Expand a leading '~' in *path* to the user's $HOME directory.

    Bug fix: the previous implementation used str.replace('~', ''), which
    also removed tildes appearing later in the path, and raised IndexError
    for the bare input '~'.
    """
    if path and path[0] == '~':
        path = path[1:]
        # drop a separator directly after the tilde so join works cleanly
        path = path[1:] if path[:1] == '/' else path
        path = os.path.join(os.environ['HOME'], path)
    return path
0273
def get_data(host, query, idx, limit, debug, threshold=300, ckey=None,
        cert=None, capath=None, qcache=0, das_headers=True):
    """
    Contact DAS server and retrieve data for given DAS query.

    :param host: DAS server URL, must start with http:// or https://
    :param query: DAS query string
    :param idx: start index of the result set (pagination)
    :param limit: number of results to fetch, 0 means all
    :param debug: verbosity level forwarded to the HTTP handlers
    :param threshold: client-side polling timeout in seconds
    :param ckey: private key file, used together with cert
    :param cert: certificate file
    :param capath: CA certificates directory
    :param qcache: query cache value forwarded to the server
    :param das_headers: unused here, kept for backward compatibility
    :returns: decoded JSON dict, or {'status': 'fail', 'reason': ...} on
              polling errors/timeout; exits the process on an initial
              HTTP error
    """
    params = {'input':query, 'idx':idx, 'limit':limit}
    if qcache:
        params['qcache'] = qcache
    path = '/das/cache'
    pat = re.compile('http[s]{0,1}://')
    if not pat.match(host):
        msg = 'Invalid hostname: %s' % host
        raise Exception(msg)
    url = host + path
    client = '%s (%s)' % (DAS_CLIENT, os.environ.get('USER', ''))
    headers = {"Accept": "application/json", "User-Agent": client}
    encoded_data = urllib.urlencode(params, doseq=True)
    url += '?%s' % encoded_data
    req = urllib2.Request(url=url, headers=headers)
    if ckey and cert:
        ckey = fullpath(ckey)
        cert = fullpath(cert)
        http_hdlr = HTTPSClientAuthHandler(ckey, cert, capath, debug)
    elif cert and capath:
        cert = fullpath(cert)
        http_hdlr = HTTPSClientAuthHandler(ckey, cert, capath, debug)
    else:
        # no credentials provided: fall back to a plain HTTP handler
        http_hdlr = urllib2.HTTPHandler(debuglevel=debug)
    # empty ProxyHandler disables any environment-configured proxies
    proxy_handler = urllib2.ProxyHandler({})
    cookie_jar = cookielib.CookieJar()
    cookie_handler = urllib2.HTTPCookieProcessor(cookie_jar)
    try:
        opener = urllib2.build_opener(http_hdlr, proxy_handler, cookie_handler)
        fdesc = opener.open(req)
        data = fdesc.read()
        fdesc.close()
    except urllib2.HTTPError as error:
        print(error.read())
        sys.exit(1)

    # a bare 32-character token instead of JSON means the server queued the
    # query and returned a PID which must be polled until the data is ready
    pat = re.compile(r'^[a-z0-9]{32}')
    if data and isinstance(data, str) and pat.match(data) and len(data) == 32:
        pid = data
    else:
        pid = None
    iwtime = 2   # initial poll interval, seconds
    wtime = 20   # maximum poll interval, seconds
    sleep = iwtime
    time0 = time.time()
    while pid:
        params.update({'pid':data})
        encoded_data = urllib.urlencode(params, doseq=True)
        url = host + path + '?%s' % encoded_data
        req = urllib2.Request(url=url, headers=headers)
        try:
            fdesc = opener.open(req)
            data = fdesc.read()
            fdesc.close()
        except urllib2.HTTPError as err:
            return {"status":"fail", "reason":str(err)}
        # server either sends the payload or echoes the PID again
        if data and isinstance(data, str) and pat.match(data) and len(data) == 32:
            pid = data
        else:
            pid = None
        time.sleep(sleep)
        # exponential back-off up to wtime, then the cycle restarts at iwtime
        if sleep < wtime:
            sleep *= 2
        elif sleep == wtime:
            sleep = iwtime
        else:
            sleep = wtime
        if (time.time()-time0) > threshold:
            reason = "client timeout after %s sec" % int(time.time()-time0)
            return {"status":"fail", "reason":reason}
    jsondict = json.loads(data)
    return jsondict
0348
def prim_value(row):
    """Extract primary key value from DAS record"""
    prim_key = row['das']['primary_key']
    if prim_key == 'summary':
        return row.get(prim_key, None)
    key, att = prim_key.split('.')
    data = row[key]
    if isinstance(data, list):
        # first list element carrying the attribute wins
        for item in data:
            if att in item:
                return item[att]
        return None
    if key in row and att in data:
        return data[att]
0363
def print_summary(rec):
    """
    Print summary record information on stdout.

    :param rec: DAS record expected to carry a 'summary' list of dicts
    :raises Exception: when the record has no 'summary' key
    """
    if 'summary' not in rec:
        # bug fix: the message was previously built with a trailing comma,
        # producing a (str, rec) tuple instead of a formatted string
        msg = 'Summary information is not found in record:\n%s' % rec
        raise Exception(msg)
    for row in rec['summary']:
        keys = [k for k in row.keys()]
        # pad every key to the longest one so values line up
        maxlen = max([len(k) for k in keys])
        for key, val in row.items():
            pkey = '%s%s' % (key, ' '*(maxlen-len(key)))
            print('%s: %s' % (pkey, val))
        print()
0376
def print_from_cache(cache, query):
    """
    Print the list of files for *query* read from the *cache* JSON file.

    Exits the process with status 0 when the query is found in the cache,
    with status 1 otherwise.
    """
    # bug fix: the file handle was previously leaked (open().read() with no
    # close); also use sys.exit instead of the site-module builtin exit()
    with open(cache) as istream:
        jsondict = json.load(istream)
    if query in jsondict:
        print("\n".join(jsondict[query]))
        sys.exit(0)
    sys.exit(1)
0385
def keys_attrs(lkey, oformat, host, ckey, cert, debug=0):
    """
    Contact host for list of key/attributes pairs.

    :param lkey: DAS key to list, 'all' for every key
    :param oformat: output format, 'json' or anything else for plain text
    :param host: DAS server URL
    :param ckey: private key file
    :param cert: certificate file
    :param debug: debug level forwarded to the HTTP handlers
    """
    url = '%s/das/keys?view=json' % host
    headers = {"Accept": "application/json", "User-Agent": DAS_CLIENT}
    req = urllib2.Request(url=url, headers=headers)
    if ckey and cert:
        ckey = fullpath(ckey)
        cert = fullpath(cert)
        # bug fix: debug was passed positionally as 'capath'; the handler
        # signature is (key, cert, capath, level)
        http_hdlr = HTTPSClientAuthHandler(ckey, cert, None, debug)
    else:
        http_hdlr = urllib2.HTTPHandler(debuglevel=debug)
    # empty ProxyHandler disables any environment-configured proxies
    proxy_handler = urllib2.ProxyHandler({})
    cookie_jar = cookielib.CookieJar()
    cookie_handler = urllib2.HTTPCookieProcessor(cookie_jar)
    opener = urllib2.build_opener(http_hdlr, proxy_handler, cookie_handler)
    fdesc = opener.open(req)
    data = json.load(fdesc)
    fdesc.close()
    if oformat.lower() == 'json':
        if lkey == 'all':
            print(json.dumps(data))
        else:
            print(json.dumps({lkey:data[lkey]}))
        return
    for key, vdict in data.items():
        if lkey == 'all':
            pass
        elif lkey != key:
            continue
        print()
        print("DAS key:", key)
        for attr, examples in vdict.items():
            prefix = ' '
            print('%s%s' % (prefix, attr))
            for item in examples:
                print('%s%s%s' % (prefix, prefix, item))
0422
def main():
    """
    Main function: parse options, query the DAS server and print the
    result in the requested format.
    """
    optmgr = DASOptionParser()
    opts, _ = optmgr.get_opt()
    host = opts.host
    debug = opts.verbose
    query = opts.query
    idx = opts.idx
    limit = opts.limit
    thr = opts.threshold
    ckey = opts.ckey
    cert = opts.cert
    capath = opts.capath
    base = opts.base
    qcache = opts.qcache
    check_glidein()
    check_auth(ckey)
    if opts.keys_attrs:
        # --list-attributes mode: print key/attribute info and stop
        keys_attrs(opts.keys_attrs, opts.format, host, ckey, cert, debug)
        return
    if not query:
        print('Input query is missing')
        sys.exit(EX_USAGE)
    if opts.format == 'plain':
        jsondict = get_data(host, query, idx, limit, debug, thr, ckey, cert, capath, qcache)
        cli_msg = jsondict.get('client_message', None)
        if cli_msg:
            print("DAS CLIENT WARNING: %s" % cli_msg)
        # fall back to the local cache when the server reply is unusable
        if 'status' not in jsondict and opts.cache:
            print_from_cache(opts.cache, query)
        if 'status' not in jsondict:
            print('DAS record without status field:\n%s' % jsondict)
            sys.exit(EX_PROTOCOL)
        if jsondict["status"] != 'ok' and opts.cache:
            print_from_cache(opts.cache, query)
        if jsondict['status'] != 'ok':
            print("status: %s, reason: %s" \
                % (jsondict.get('status'), jsondict.get('reason', 'N/A')))
            if opts.retry:
                found = False
                # NOTE(review): xrange(1, retry) performs retry-1 attempts,
                # so --retry=1 never retries; kept for compatibility
                for attempt in xrange(1, int(opts.retry)):
                    interval = log(attempt)**5
                    print("Retry in %5.3f sec" % interval)
                    time.sleep(interval)
                    # bug fix: get_data already returns a decoded dict;
                    # feeding it to json.loads raised TypeError on retry
                    jsondict = get_data(host, query, idx, limit, debug,
                                        thr, ckey, cert, capath, qcache)
                    if jsondict.get('status', 'fail') == 'ok':
                        found = True
                        break
            else:
                sys.exit(EX_TEMPFAIL)
            if not found:
                sys.exit(EX_TEMPFAIL)
        nres = jsondict.get('nresults', 0)
        if not limit:
            drange = '%s' % nres
        else:
            drange = '%s-%s out of %s' % (idx+1, idx+limit, nres)
        if opts.limit:
            msg = "\nShowing %s results" % drange
            msg += ", for more results use --idx/--limit options\n"
            print(msg)
        mongo_query = jsondict.get('mongo_query', {})
        unique = False
        fdict = mongo_query.get('filters', {})
        filters = fdict.get('grep', [])
        aggregators = mongo_query.get('aggregators', [])
        if 'unique' in fdict.keys():
            unique = True
        if filters and not aggregators:
            # grep filters: print only the selected attributes
            data = jsondict['data']
            if isinstance(data, dict):
                rows = [r for r in get_value(data, filters, base)]
                print(' '.join(rows))
            elif isinstance(data, list):
                if unique:
                    data = unique_filter(data)
                for row in data:
                    rows = [r for r in get_value(row, filters, base)]
                    types = [type(r) for r in rows]
                    if len(types)>1: # mixed types print as-is
                        print(' '.join([str(r) for r in rows]))
                    elif isinstance(rows[0], list):
                        out = set()
                        for item in rows:
                            for elem in item:
                                out.add(elem)
                        print(' '.join(out))
                    else:
                        print(' '.join(rows))
            else:
                print(json.dumps(jsondict))
        elif aggregators:
            # aggregator results: print function(key)=value lines
            data = jsondict['data']
            if unique:
                data = unique_filter(data)
            for row in data:
                if row['key'].find('size') != -1 and \
                    row['function'] == 'sum':
                    val = size_format(row['result']['value'], base)
                else:
                    val = row['result']['value']
                print('%s(%s)=%s' \
                        % (row['function'], row['key'], val))
        else:
            # plain records: print the primary key value of each row
            data = jsondict['data']
            if isinstance(data, list):
                old = None
                val = None
                for row in data:
                    prim_key = row.get('das', {}).get('primary_key', None)
                    if prim_key == 'summary':
                        print_summary(row)
                        return
                    val = prim_value(row)
                    if not opts.limit:
                        # --limit=0: suppress consecutive duplicates
                        if val != old:
                            print(val)
                            old = val
                    else:
                        print(val)
                if val != old and not opts.limit:
                    print(val)
            elif isinstance(data, dict):
                print(prim_value(data))
            else:
                print(data)
    else:
        # non-plain format: dump the raw JSON reply
        jsondict = get_data(\
                host, query, idx, limit, debug, thr, ckey, cert, capath, qcache)
        print(json.dumps(jsondict))
0554
0555
0556
0557
0558 if __name__ == '__main__':
0559 main()