#!/usr/bin/env python
#-*- coding: utf-8 -*-
#pylint: disable=C0301,C0103,R0914,R0903

"""
DAS command line tool
"""
from __future__ import print_function
__author__ = "Valentin Kuznetsov"

# system modules
import os
import sys
import pwd
if  sys.version_info < (2, 6):
    raise Exception("DAS requires python 2.6 or greater")

DAS_CLIENT = 'das-client/1.1::python/%s.%s' % sys.version_info[:2]

import re
import ssl
import time
import json
import urllib
import urllib2
import httplib
import cookielib
from   optparse import OptionParser
from   math import log
from   types import GeneratorType

# define exit codes according to Linux sysexits.h
EX_OK           = 0  # successful termination
EX__BASE        = 64 # base value for error messages
EX_USAGE        = 64 # command line usage error
EX_DATAERR      = 65 # data format error
EX_NOINPUT      = 66 # cannot open input
EX_NOUSER       = 67 # addressee unknown
EX_NOHOST       = 68 # host name unknown
EX_UNAVAILABLE  = 69 # service unavailable
EX_SOFTWARE     = 70 # internal software error
EX_OSERR        = 71 # system error (e.g., can't fork)
EX_OSFILE       = 72 # critical OS file missing
EX_CANTCREAT    = 73 # can't create (user) output file
EX_IOERR        = 74 # input/output error
EX_TEMPFAIL     = 75 # temp failure; user is invited to retry
EX_PROTOCOL     = 76 # remote error in protocol
EX_NOPERM       = 77 # permission denied
EX_CONFIG       = 78 # configuration error

class HTTPSClientAuthHandler(urllib2.HTTPSHandler):
    """
    Simple HTTPS client authentication class based on provided
    key/ca information
    """
    def __init__(self, key=None, cert=None, capath=None, level=0):
        if  level > 1:
            urllib2.HTTPSHandler.__init__(self, debuglevel=1)
        else:
            urllib2.HTTPSHandler.__init__(self)
        self.key = key
        self.cert = cert
        self.capath = capath

    def https_open(self, req):
        """Open request method"""
        #Rather than pass in a reference to a connection class, we pass in
        # a reference to a function which, for all intents and purposes,
        # will behave as a constructor
        return self.do_open(self.get_connection, req)

    def get_connection(self, host, timeout=300):
        """Connection method"""
        if  self.key and self.cert and not self.capath:
            return httplib.HTTPSConnection(host, key_file=self.key,
                                                cert_file=self.cert)
        elif self.cert and self.capath:
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
            context.load_verify_locations(capath=self.capath)
            context.load_cert_chain(self.cert)
            return httplib.HTTPSConnection(host, context=context)
        return httplib.HTTPSConnection(host)

def x509():
    "Helper function to get x509 either from env or tmp file"
    proxy = os.environ.get('X509_USER_PROXY', '')
    if  not proxy:
        proxy = '/tmp/x509up_u%s' % pwd.getpwuid(os.getuid()).pw_uid
        if  not os.path.isfile(proxy):
            return ''
    return proxy

def check_glidein():
    "Check glidein environment and exit if it is set"
    glidein = os.environ.get('GLIDEIN_CMSSite', '')
    if  glidein:
        msg = "ERROR: das_client is running from GLIDEIN environment, it is prohibited"
        print(msg)
        sys.exit(EX__BASE)

def check_auth(key):
    "Check if user runs das_client with key/cert and warn users to switch"
    if  not key:
        msg  = "WARNING: das_client is running without user credentials/X509 proxy, create proxy via 'voms-proxy-init -voms cms -rfc'"
        print(msg, file=sys.stderr)

class DASOptionParser:
    """
    DAS cache client option parser
    """
    def __init__(self):
        usage  = "Usage: %prog [options]\n"
        usage += "For more help please visit https://cmsweb.cern.ch/das/faq"
        self.parser = OptionParser(usage=usage)
        self.parser.add_option("-v", "--verbose", action="store",
                               type="int", default=0, dest="verbose",
             help="verbose output")
        self.parser.add_option("--query", action="store", type="string",
                               default=False, dest="query",
             help="specify query for your request")
        msg  = "host name of DAS cache server, default is https://cmsweb.cern.ch"
        self.parser.add_option("--host", action="store", type="string",
                       default='https://cmsweb.cern.ch', dest="host", help=msg)
        msg  = "start index for returned result set, aka pagination,"
        msg += " use w/ limit (default is 0)"
        self.parser.add_option("--idx", action="store", type="int",
                               default=0, dest="idx", help=msg)
        msg  = "number of returned results (default is 10),"
        msg += " use --limit=0 to show all results"
        self.parser.add_option("--limit", action="store", type="int",
                               default=10, dest="limit", help=msg)
        msg  = 'specify return data format (json or plain), default plain.'
        self.parser.add_option("--format", action="store", type="string",
                               default="plain", dest="format", help=msg)
        msg  = 'query waiting threshold in sec, default is 5 minutes'
        self.parser.add_option("--threshold", action="store", type="int",
                               default=300, dest="threshold", help=msg)
        msg  = 'specify private key file name, default $X509_USER_PROXY'
        self.parser.add_option("--key", action="store", type="string",
                               default=x509(), dest="ckey", help=msg)
        msg  = 'specify private certificate file name, default $X509_USER_PROXY'
        self.parser.add_option("--cert", action="store", type="string",
                               default=x509(), dest="cert", help=msg)
        msg  = 'specify CA path, default $X509_CERT_DIR'
        self.parser.add_option("--capath", action="store", type="string",
                               default=os.environ.get("X509_CERT_DIR", ""),
                               dest="capath", help=msg)
        msg  = 'specify number of retries upon busy DAS server message'
        self.parser.add_option("--retry", action="store", type="string",
                               default=0, dest="retry", help=msg)
        msg  = 'show DAS headers in JSON format'
        msg += ' (obsolete, keep for backward compatibility)'
        self.parser.add_option("--das-headers", action="store_true",
                               default=False, dest="das_headers", help=msg)
        msg = 'specify power base for size_format, default is 10 (can be 2)'
        self.parser.add_option("--base", action="store", type="int",
                               default=0, dest="base", help=msg)

        msg = 'a file which contains a cached json dictionary for query -> files mapping'
        self.parser.add_option("--cache", action="store", type="string",
                               default=None, dest="cache", help=msg)

        msg = 'a query cache value'
        self.parser.add_option("--query-cache", action="store", type="int",
                               default=0, dest="qcache", help=msg)
        msg = 'List DAS key/attributes, use "all" or specific DAS key value, e.g. site'
        self.parser.add_option("--list-attributes", action="store", type="string",
                               default="", dest="keys_attrs", help=msg)

    def get_opt(self):
        """
        Returns parsed list of options
        """
        return self.parser.parse_args()

def convert_time(val):
    "Convert given timestamp into human readable format"
    if  isinstance(val, int) or isinstance(val, float):
        return time.strftime('%d/%b/%Y_%H:%M:%S_GMT', time.gmtime(val))
    return val

def size_format(uinput, ibase=0):
    """
    Format file size utility, it converts file size into KB, MB, GB, TB, PB units
    """
    if  not ibase:
        return uinput
    try:
        num = float(uinput)
    except Exception as _exc:
        return uinput
    if  ibase == 2.: # power of 2
        base  = 1024.
        xlist = ['', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']
    else: # default base is 10
        base  = 1000.
        xlist = ['', 'KB', 'MB', 'GB', 'TB', 'PB']
    for xxx in xlist:
        if  num < base:
            return "%3.1f%s" % (num, xxx)
        num /= base
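
# Illustrative size_format outputs (a sketch, values follow from the rules above):
#   size_format(10**6, 10)  -> '1.0MB'
#   size_format(1048576, 2) -> '1.0MiB'
#   size_format('n/a', 10)  -> 'n/a'   (non-numeric input is returned unchanged)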

def unique_filter(rows):
    """
    Unique filter drops duplicate rows.
    """
    old_row = {}
    row = None
    for row in rows:
        row_data = dict(row)
        try:
            del row_data['_id']
            del row_data['das']
            del row_data['das_id']
            del row_data['cache_id']
        except:
            pass
        old_data = dict(old_row)
        try:
            del old_data['_id']
            del old_data['das']
            del old_data['das_id']
            del old_data['cache_id']
        except:
            pass
        if  row_data == old_data:
            continue
        if  old_row:
            yield old_row
        old_row = row
    yield row

def extract_value(row, key, base=10):
    """Generator which extracts row[key] value"""
    if  isinstance(row, dict) and key in row:
        if  key == 'creation_time':
            row = convert_time(row[key])
        elif  key == 'size':
            row = size_format(row[key], base)
        else:
            row = row[key]
        yield row
    if  isinstance(row, list) or isinstance(row, GeneratorType):
        for item in row:
            for vvv in extract_value(item, key, base):
                yield vvv

def get_value(data, filters, base=10):
    """Filter data from a row for given list of filters"""
    for ftr in filters:
        if  ftr.find('>') != -1 or ftr.find('<') != -1 or ftr.find('=') != -1:
            continue
        row = dict(data)
        values = []
        keys = ftr.split('.')
        for key in keys:
            val = [v for v in extract_value(row, key, base)]
            if  key == keys[-1]: # we collect all values at last key
                values += [json.dumps(i) for i in val]
            else:
                row = val
        if  len(values) == 1:
            yield values[0]
        else:
            yield values
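
# Example of how a grep filter is applied (a sketch with a made-up record):
#   row = {'file': [{'name': '/store/a.root', 'size': 1000}]}
#   list(get_value(row, ['file.name'])) -> ['"/store/a.root"']
# Values are JSON-encoded at the last key, hence the extra quotes.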

def fullpath(path):
    "Expand path to full path"
    if  path and path[0] == '~':
        path = path.replace('~', '')
        path = path[1:] if path[0] == '/' else path
        path = os.path.join(os.environ['HOME'], path)
    return path
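
# E.g. fullpath('~/foo/bar.txt') -> os.path.join(os.environ['HOME'], 'foo/bar.txt')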

def get_data(host, query, idx, limit, debug, threshold=300, ckey=None,
        cert=None, capath=None, qcache=0, das_headers=True):
    """Contact DAS server and retrieve data for given DAS query"""
    params  = {'input':query, 'idx':idx, 'limit':limit}
    if  qcache:
        params['qcache'] = qcache
    path    = '/das/cache'
    pat     = re.compile('http[s]{0,1}://')
    if  not pat.match(host):
        msg = 'Invalid hostname: %s' % host
        raise Exception(msg)
    url = host + path
    client = '%s (%s)' % (DAS_CLIENT, os.environ.get('USER', ''))
    headers = {"Accept": "application/json", "User-Agent": client}
    encoded_data = urllib.urlencode(params, doseq=True)
    url += '?%s' % encoded_data
    req  = urllib2.Request(url=url, headers=headers)
    if  ckey and cert:
        ckey = fullpath(ckey)
        cert = fullpath(cert)
        http_hdlr  = HTTPSClientAuthHandler(ckey, cert, capath, debug)
    elif cert and capath:
        cert = fullpath(cert)
        http_hdlr  = HTTPSClientAuthHandler(ckey, cert, capath, debug)
    else:
        http_hdlr  = urllib2.HTTPHandler(debuglevel=debug)
    proxy_handler  = urllib2.ProxyHandler({})
    cookie_jar     = cookielib.CookieJar()
    cookie_handler = urllib2.HTTPCookieProcessor(cookie_jar)
    try:
        opener = urllib2.build_opener(http_hdlr, proxy_handler, cookie_handler)
        fdesc = opener.open(req)
        data = fdesc.read()
        fdesc.close()
    except urllib2.HTTPError as error:
        print(error.read())
        sys.exit(1)

    # a bare 32-character token in the response means the data are not ready
    # yet; keep polling with it as the 'pid' parameter until JSON comes back
    pat = re.compile(r'^[a-z0-9]{32}')
    if  data and isinstance(data, str) and pat.match(data) and len(data) == 32:
        pid = data
    else:
        pid = None
    iwtime  = 2  # initial waiting time in seconds
    wtime   = 20 # final waiting time in seconds
    sleep   = iwtime
    time0   = time.time()
    while pid:
        params.update({'pid':data})
        encoded_data = urllib.urlencode(params, doseq=True)
        url  = host + path + '?%s' % encoded_data
        req  = urllib2.Request(url=url, headers=headers)
        try:
            fdesc = opener.open(req)
            data = fdesc.read()
            fdesc.close()
        except urllib2.HTTPError as err:
            return {"status":"fail", "reason":str(err)}
        if  data and isinstance(data, str) and pat.match(data) and len(data) == 32:
            pid = data
        else:
            pid = None
        time.sleep(sleep)
        if  sleep < wtime:
            sleep *= 2
        elif sleep == wtime:
            sleep = iwtime # start new cycle
        else:
            sleep = wtime
        if  (time.time()-time0) > threshold:
            reason = "client timeout after %s sec" % int(time.time()-time0)
            return {"status":"fail", "reason":reason}
    jsondict = json.loads(data)
    return jsondict
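
# Minimal sketch of a direct call (the query string is illustrative; a valid
# grid proxy or key/cert pair is assumed):
#   result = get_data('https://cmsweb.cern.ch', 'dataset=/ZMM*/*/*', 0, 10, 0)
#   print(result.get('nresults', 0))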

def prim_value(row):
    """Extract primary key value from DAS record"""
    prim_key = row['das']['primary_key']
    if  prim_key == 'summary':
        return row.get(prim_key, None)
    key, att = prim_key.split('.')
    if  isinstance(row[key], list):
        for item in row[key]:
            if  att in item:
                return item[att]
    else:
        if  key in row:
            if  att in row[key]:
                return row[key][att]

def print_summary(rec):
    "Print summary record information on stdout"
    if  'summary' not in rec:
        msg = 'Summary information is not found in record:\n%s' % rec
        raise Exception(msg)
    for row in rec['summary']:
        keys = [k for k in row.keys()]
        maxlen = max([len(k) for k in keys])
        for key, val in row.items():
            pkey = '%s%s' % (key, ' '*(maxlen-len(key)))
            print('%s: %s' % (pkey, val))
        print()

def print_from_cache(cache, query):
    "Print the list of files for given query, reading it from the cache file"
    data = open(cache).read()
    jsondict = json.loads(data)
    if query in jsondict:
        print("\n".join(jsondict[query]))
        exit(0)
    exit(1)
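
# The --cache file is expected to hold a JSON dictionary mapping DAS query
# strings to lists of results, e.g. (illustrative):
#   {"file dataset=/A/B/C": ["/store/f1.root", "/store/f2.root"]}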

def keys_attrs(lkey, oformat, host, ckey, cert, debug=0):
    "Contact host for list of key/attributes pairs"
    url = '%s/das/keys?view=json' % host
    headers = {"Accept": "application/json", "User-Agent": DAS_CLIENT}
    req  = urllib2.Request(url=url, headers=headers)
    if  ckey and cert:
        ckey = fullpath(ckey)
        cert = fullpath(cert)
        # debug is the handler debug level, not a CA path
        http_hdlr  = HTTPSClientAuthHandler(ckey, cert, level=debug)
    else:
        http_hdlr  = urllib2.HTTPHandler(debuglevel=debug)
    proxy_handler  = urllib2.ProxyHandler({})
    cookie_jar     = cookielib.CookieJar()
    cookie_handler = urllib2.HTTPCookieProcessor(cookie_jar)
    opener = urllib2.build_opener(http_hdlr, proxy_handler, cookie_handler)
    fdesc = opener.open(req)
    data = json.load(fdesc)
    fdesc.close()
    if  oformat.lower() == 'json':
        if  lkey == 'all':
            print(json.dumps(data))
        else:
            print(json.dumps({lkey:data[lkey]}))
        return
    for key, vdict in data.items():
        if  lkey == 'all':
            pass
        elif lkey != key:
            continue
        print()
        print("DAS key:", key)
        for attr, examples in vdict.items():
            prefix = '    '
            print('%s%s' % (prefix, attr))
            for item in examples:
                print('%s%s%s' % (prefix, prefix, item))

def main():
    """Main function"""
    optmgr  = DASOptionParser()
    opts, _ = optmgr.get_opt()
    host    = opts.host
    debug   = opts.verbose
    query   = opts.query
    idx     = opts.idx
    limit   = opts.limit
    thr     = opts.threshold
    ckey    = opts.ckey
    cert    = opts.cert
    capath  = opts.capath
    base    = opts.base
    qcache  = opts.qcache
    check_glidein()
    check_auth(ckey)
    if  opts.keys_attrs:
        keys_attrs(opts.keys_attrs, opts.format, host, ckey, cert, debug)
        return
    if  not query:
        print('Input query is missing')
        sys.exit(EX_USAGE)
    if  opts.format == 'plain':
        jsondict = get_data(host, query, idx, limit, debug, thr, ckey, cert, capath, qcache)
        cli_msg  = jsondict.get('client_message', None)
        if  cli_msg:
            print("DAS CLIENT WARNING: %s" % cli_msg)
        if  'status' not in jsondict and opts.cache:
            print_from_cache(opts.cache, query)
        if  'status' not in jsondict:
            print('DAS record without status field:\n%s' % jsondict)
            sys.exit(EX_PROTOCOL)
        if  jsondict["status"] != 'ok' and opts.cache:
            print_from_cache(opts.cache, query)
        if  jsondict['status'] != 'ok':
            print("status: %s, reason: %s" \
                % (jsondict.get('status'), jsondict.get('reason', 'N/A')))
            if  opts.retry:
                found = False
                for attempt in xrange(1, int(opts.retry)):
                    interval = log(attempt)**5
                    print("Retry in %5.3f sec" % interval)
                    time.sleep(interval)
                    # get_data already returns a decoded dict
                    jsondict = get_data(host, query, idx, limit, debug, thr, ckey, cert, capath, qcache)
                    if  jsondict.get('status', 'fail') == 'ok':
                        found = True
                        break
            else:
                sys.exit(EX_TEMPFAIL)
            if  not found:
                sys.exit(EX_TEMPFAIL)
        nres = jsondict.get('nresults', 0)
        if  not limit:
            drange = '%s' % nres
        else:
            drange = '%s-%s out of %s' % (idx+1, idx+limit, nres)
        if  opts.limit:
            msg  = "\nShowing %s results" % drange
            msg += ", for more results use --idx/--limit options\n"
            print(msg)
        mongo_query = jsondict.get('mongo_query', {})
        unique  = False
        fdict   = mongo_query.get('filters', {})
        filters = fdict.get('grep', [])
        aggregators = mongo_query.get('aggregators', [])
        if  'unique' in fdict.keys():
            unique = True
        if  filters and not aggregators:
            data = jsondict['data']
            if  isinstance(data, dict):
                rows = [r for r in get_value(data, filters, base)]
                print(' '.join(rows))
            elif isinstance(data, list):
                if  unique:
                    data = unique_filter(data)
                for row in data:
                    rows = [r for r in get_value(row, filters, base)]
                    types = [type(r) for r in rows]
                    if  len(types)>1: # mixed types print as is
                        print(' '.join([str(r) for r in rows]))
                    elif isinstance(rows[0], list):
                        out = set()
                        for item in rows:
                            for elem in item:
                                out.add(elem)
                        print(' '.join(out))
                    else:
                        print(' '.join(rows))
            else:
                print(json.dumps(jsondict))
        elif aggregators:
            data = jsondict['data']
            if  unique:
                data = unique_filter(data)
            for row in data:
                if  row['key'].find('size') != -1 and \
                    row['function'] == 'sum':
                    val = size_format(row['result']['value'], base)
                else:
                    val = row['result']['value']
                print('%s(%s)=%s' \
                % (row['function'], row['key'], val))
        else:
            data = jsondict['data']
            if  isinstance(data, list):
                old = None
                val = None
                for row in data:
                    prim_key = row.get('das', {}).get('primary_key', None)
                    if  prim_key == 'summary':
                        print_summary(row)
                        return
                    val = prim_value(row)
                    if  not opts.limit:
                        if  val != old:
                            print(val)
                            old = val
                    else:
                        print(val)
                if  val != old and not opts.limit:
                    print(val)
            elif isinstance(data, dict):
                print(prim_value(data))
            else:
                print(data)
    else:
        jsondict = get_data(\
                host, query, idx, limit, debug, thr, ckey, cert, capath, qcache)
        print(json.dumps(jsondict))

#
# main
#
if __name__ == '__main__':
    main()
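
# Example invocations (a sketch; the query strings are illustrative only):
#   das_client.py --query="dataset dataset=/ZMM*/*/*" --limit=0
#   das_client.py --query="file dataset=/A/B/C" --format=json
#   das_client.py --list-attributes=all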