Back to home page

Project CMSSW displayed by LXR

 
 

    


Warning, /DPGAnalysis/HcalTools/scripts/cmt/das_client.py_For_7 is written in an unsupported language. File is not indexed.

0001 #!/usr/bin/env python
0002 #-*- coding: utf-8 -*-
0003 #pylint: disable=C0301,C0103,R0914,R0903
0004 
0005 """
0006 DAS command line tool
0007 """
0008 __author__ = "Valentin Kuznetsov"
0009 
import sys
# Refuse to run on interpreters older than Python 2.6.
if  sys.version_info < (2, 6):
    raise Exception("DAS requires python 2.6 or greater")

# Client identification string, sent as the User-Agent header of every
# request; it embeds the major.minor version of the running interpreter.
DAS_CLIENT = 'das-client/1.0::python/%s.%s' % sys.version_info[:2]
0015 
0016 import os
0017 import re
0018 import time
0019 import json
0020 import urllib
0021 import urllib2
0022 import httplib
0023 import cookielib
0024 from   optparse import OptionParser
0025 from   math import log
0026 
# Process exit codes, defined according to Linux sysexits.h.
# main() uses EX_USAGE, EX_PROTOCOL and EX_TEMPFAIL; the remaining values
# complete the table.
EX_OK           = 0  # successful termination
EX__BASE        = 64 # base value for error messages
EX_USAGE        = 64 # command line usage error
EX_DATAERR      = 65 # data format error
EX_NOINPUT      = 66 # cannot open input
EX_NOUSER       = 67 # addressee unknown
EX_NOHOST       = 68 # host name unknown
EX_UNAVAILABLE  = 69 # service unavailable
EX_SOFTWARE     = 70 # internal software error
EX_OSERR        = 71 # system error (e.g., can't fork)
EX_OSFILE       = 72 # critical OS file missing
EX_CANTCREAT    = 73 # can't create (user) output file
EX_IOERR        = 74 # input/output error
EX_TEMPFAIL     = 75 # temp failure; user is invited to retry
EX_PROTOCOL     = 76 # remote error in protocol
EX_NOPERM       = 77 # permission denied
EX_CONFIG       = 78 # configuration error
0045 
class HTTPSClientAuthHandler(urllib2.HTTPSHandler):
    """
    HTTPS handler which authenticates the client with a private key and
    certificate pair when one is provided.

    Parameters of the constructor:
      key   -- path of the private key file (or None)
      cert  -- path of the certificate file (or None)
      level -- verbosity level; values above 1 turn on the debug output
               of the underlying urllib2.HTTPSHandler
    """
    def __init__(self, key=None, cert=None, level=0):
        if  level > 1:
            urllib2.HTTPSHandler.__init__(self, debuglevel=1)
        else:
            urllib2.HTTPSHandler.__init__(self)
        self.key = key
        self.cert = cert

    def https_open(self, req):
        """Open the given HTTPS request and return the response"""
        # Rather than pass in a reference to a connection class, we pass in
        # a reference to a function which, for all intents and purposes,
        # will behave as a constructor
        return self.do_open(self.get_connection, req)

    def get_connection(self, host, timeout=300):
        """
        Build the HTTPS connection to host.

        The timeout is forwarded to the connection object; previously it
        was accepted but silently dropped, so a timeout requested through
        the opener never reached the socket.
        """
        if  self.key:
            return httplib.HTTPSConnection(host, key_file=self.key,
                                           cert_file=self.cert,
                                           timeout=timeout)
        return httplib.HTTPSConnection(host, timeout=timeout)
0072 
class DASOptionParser(object):
    """
    DAS cache client option parser.

    Wraps an optparse.OptionParser which is populated with every command
    line option understood by the DAS client.
    """
    def __init__(self):
        usage  = "Usage: %prog [options]\n"
        usage += "For more help please visit https://cmsweb.cern.ch/das/faq"
        self.parser = OptionParser(usage=usage)
        self.parser.add_option("-v", "--verbose", action="store",
                               type="int", default=0, dest="verbose",
                               help="verbose output")
        # default is False so that a missing query is falsy in main()
        self.parser.add_option("--query", action="store", type="string",
                               default=False, dest="query",
                               help="specify query for your request")
        msg  = "host name of DAS cache server, default is https://cmsweb.cern.ch"
        self.parser.add_option("--host", action="store", type="string",
                               default='https://cmsweb.cern.ch', dest="host",
                               help=msg)
        msg  = "start index for returned result set, aka pagination,"
        msg += " use w/ limit (default is 0)"
        self.parser.add_option("--idx", action="store", type="int",
                               default=0, dest="idx", help=msg)
        msg  = "number of returned results (default is 10),"
        msg += " use --limit=0 to show all results"
        self.parser.add_option("--limit", action="store", type="int",
                               default=10, dest="limit", help=msg)
        msg  = 'specify return data format (json or plain), default plain.'
        self.parser.add_option("--format", action="store", type="string",
                               default="plain", dest="format", help=msg)
        msg  = 'query waiting threshold in sec, default is 5 minutes'
        self.parser.add_option("--threshold", action="store", type="int",
                               default=300, dest="threshold", help=msg)
        msg  = 'specify private key file name'
        self.parser.add_option("--key", action="store", type="string",
                               default="", dest="ckey", help=msg)
        msg  = 'specify private certificate file name'
        self.parser.add_option("--cert", action="store", type="string",
                               default="", dest="cert", help=msg)
        # The retry count is a number: declaring it as int makes optparse
        # reject non-numeric input with a usage error instead of letting a
        # later int() conversion fail with a traceback.
        msg  = 'specify number of retries upon busy DAS server message'
        self.parser.add_option("--retry", action="store", type="int",
                               default=0, dest="retry", help=msg)
        msg  = 'show DAS headers in JSON format'
        msg += ' (obsolete, keep for backward compatibility)'
        self.parser.add_option("--das-headers", action="store_true",
                               default=False, dest="das_headers", help=msg)
        # The real default is 0, which leaves sizes unformatted; the help
        # text used to claim that the default was 10.
        msg  = 'specify power base for size_format, 10 or 2,'
        msg += ' default is 0 (sizes are shown unformatted)'
        self.parser.add_option("--base", action="store", type="int",
                               default=0, dest="base", help=msg)

        msg = 'a file which contains a cached json dictionary for query -> files mapping'
        self.parser.add_option("--cache", action="store", type="string",
                               default=None, dest="cache", help=msg)

        msg = 'List DAS key/attributes, use "all" or specific DAS key value, e.g. site'
        self.parser.add_option("--list-attributes", action="store", type="string",
                               default="", dest="keys_attrs", help=msg)

    def get_opt(self):
        """
        Parse the command line (sys.argv) and return the
        (options, positional arguments) pair produced by optparse.
        """
        return self.parser.parse_args()
0133 
def convert_time(val):
    "Render a numeric timestamp as a GMT date string, pass anything else through"
    if  not isinstance(val, (int, float)):
        # not a timestamp, hand the value back untouched
        return val
    return time.strftime('%d/%b/%Y_%H:%M:%S_GMT', time.gmtime(val))
0139 
def size_format(uinput, ibase=0):
    """
    Format file size utility, it converts file size into KB, MB, GB, TB, PB
    units (or KiB ... PiB when ibase is 2).

    uinput -- the size, anything float() can convert
    ibase  -- 0 (or any false value) returns uinput untouched, 2 selects
              powers of 1024, any other value selects powers of 1000

    Returns the formatted string, or uinput itself when formatting is
    disabled or the input is not a number. Sizes beyond the largest unit
    are expressed in that unit rather than falling through the loop and
    returning None.
    """
    if  not ibase:
        return uinput
    try:
        num = float(uinput)
    except (TypeError, ValueError, OverflowError):
        # not a number (or too large for a float), return the input as is
        return uinput
    if  ibase == 2.: # power of 2
        base  = 1024.
        xlist = ['', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']
    else: # default base is 10
        base  = 1000.
        xlist = ['', 'KB', 'MB', 'GB', 'TB', 'PB']
    for unit in xlist[:-1]:
        if  num < base:
            return "%3.1f%s" % (num, unit)
        num /= base
    # whatever is left is reported in the largest known unit
    return "%3.1f%s" % (num, xlist[-1])
0160 
def unique_filter(rows):
    """
    Unique filter, drops consecutive duplicate rows.

    Two rows are duplicates when they are equal once the bookkeeping keys
    '_id', 'das', 'das_id' and 'cache_id' are ignored. Every bookkeeping
    key is dropped independently: the previous implementation stopped
    removing keys at the first missing one, so records without '_id' were
    compared together with their 'das' section and never matched.

    rows -- iterable of dict records
    Yields the de-duplicated records; nothing is yielded for empty input.
    """
    meta_keys = ('_id', 'das', 'das_id', 'cache_id')

    def payload(record):
        "Copy of the record without its bookkeeping keys"
        return dict((key, val) for key, val in dict(record).items()
                    if key not in meta_keys)

    old_row = {}
    row = None
    for row in rows:
        if  payload(row) == payload(old_row):
            continue
        if  old_row:
            yield old_row
        old_row = row
    if  row is not None:
        # the last record seen closes the final group of duplicates
        yield row
0190 
def get_value(data, filters, base=10):
    """
    Walk a record along every dotted filter and yield one string per filter.

    Filters which contain a comparison ('>', '<' or '=') are skipped.
    Values met under the 'creation_time' and 'size' keys are passed through
    convert_time and size_format (with the given base) respectively.
    """
    def render(name, raw):
        "Apply the key specific conversion to a raw value"
        if  name == 'creation_time':
            return convert_time(raw)
        if  name == 'size':
            return size_format(raw, base)
        return raw

    for ftr in filters:
        if  '>' in ftr or '<' in ftr or '=' in ftr:
            continue
        row = dict(data)
        values = set()
        for key in ftr.split('.'):
            if  isinstance(row, dict) and key in row:
                row = render(key, row[key])
            if  isinstance(row, list):
                # row gets rebound below while the loop keeps walking the
                # list it started with
                for item in row:
                    if  isinstance(item, dict) and key in item:
                        row = render(key, item[key])
                        values.add(row)
                    elif isinstance(item, basestring):
                        values.add(item)
        if  len(values) == 1:
            yield str(values.pop())
        else:
            yield str(list(values))
0223 
def fullpath(path):
    """
    Expand a leading '~' of the path into the user's home directory.

    Only the leading tilde is expanded, tildes further inside the path are
    kept (the previous implementation removed all of them, crashed on a
    bare '~' and required HOME to be set). Empty or None input and paths
    which do not start with '~' are returned unchanged.
    """
    if  not path or path[0] != '~':
        return path
    return os.path.expanduser(path)
0231 
def get_data(host, query, idx, limit, debug, threshold=300, ckey=None,
        cert=None, das_headers=True):
    """
    Contact DAS server and retrieve data for given DAS query.

    host        -- server URL, must start with http:// or https://
    query       -- DAS query string
    idx, limit  -- pagination parameters sent along with the query
    debug       -- verbosity level handed over to the HTTP handler
    threshold   -- maximum polling time in seconds
    ckey, cert  -- key/certificate files, client authentication is used
                   only when both are given
    das_headers -- unused, kept for backward compatibility

    While the request is not ready the server answers with a 32 character
    request id; the id is sent back until a different payload arrives.
    Returns the decoded JSON document, or a {"status": "fail", "reason": ...}
    dictionary upon HTTP error while polling or when polling exceeds the
    threshold. Raises Exception for an invalid host name.
    """
    params  = {'input':query, 'idx':idx, 'limit':limit}
    path    = '/das/cache'
    if  not re.match('http[s]{0,1}://', host):
        msg = 'Invalid hostname: %s' % host
        raise Exception(msg)
    headers = {"Accept": "application/json", "User-Agent": DAS_CLIENT}
    url  = host + path + '?%s' % urllib.urlencode(params, doseq=True)
    req  = urllib2.Request(url=url, headers=headers)
    if  ckey and cert:
        ckey = fullpath(ckey)
        cert = fullpath(cert)
        http_hdlr  = HTTPSClientAuthHandler(ckey, cert, debug)
    else:
        http_hdlr  = urllib2.HTTPHandler(debuglevel=debug)
    proxy_handler  = urllib2.ProxyHandler({})
    cookie_jar     = cookielib.CookieJar()
    cookie_handler = urllib2.HTTPCookieProcessor(cookie_jar)
    opener = urllib2.build_opener(http_hdlr, proxy_handler, cookie_handler)

    pid_pat = re.compile(r'^[a-z0-9]{32}')

    def fetch(request):
        "Read the whole response, closing it even when the read fails"
        fdesc = opener.open(request)
        try:
            return fdesc.read()
        finally:
            fdesc.close()

    def is_pid(payload):
        "True when the payload is a request id rather than the results"
        return bool(payload and isinstance(payload, str) and \
                len(payload) == 32 and pid_pat.match(payload))

    data = fetch(req)
    pid  = data if is_pid(data) else None

    iwtime  = 2  # initial waiting time in seconds
    wtime   = 20 # final waiting time in seconds
    sleep   = iwtime
    time0   = time.time()
    while pid:
        params.update({'pid':pid})
        url  = host + path + '?%s' % urllib.urlencode(params, doseq=True)
        req  = urllib2.Request(url=url, headers=headers)
        try:
            data = fetch(req)
        except urllib2.HTTPError as err:
            return {"status":"fail", "reason":str(err)}
        if  not is_pid(data):
            # the results are here: neither sleep nor report a timeout
            break
        pid = data
        time.sleep(sleep)
        if  sleep < wtime:
            sleep *= 2
        elif sleep == wtime:
            sleep = iwtime # start new cycle
        else:
            sleep = wtime
        if  (time.time()-time0) > threshold:
            reason = "client timeout after %s sec" % int(time.time()-time0)
            return {"status":"fail", "reason":reason}
    return json.loads(data)
0296 
def prim_value(row):
    """
    Return the value stored under the primary key of a DAS record.

    The primary key is read from row['das']['primary_key']; it is either
    'summary' or a 'key.attribute' pair. For list values the attribute of
    the first item which carries it is returned, None if no item does.
    """
    prim_key = row['das']['primary_key']
    if  prim_key == 'summary':
        return row[prim_key]
    key, att = prim_key.split('.')
    record = row[key]
    if  not isinstance(record, list):
        return record[att]
    for item in record:
        if  att in item:
            return item[att]
    return None
0309 
def print_summary(rec):
    """
    Print summary record information on stdout.

    Every row of rec['summary'] is printed as aligned 'key: value' lines
    followed by an empty line; an empty row prints just the empty line.
    Raises Exception when the record has no 'summary' entry; the message
    is a single formatted string (it used to be a tuple because of a
    stray comma).
    """
    if  'summary' not in rec:
        msg = 'Summary information is not found in record:\n%s' % (rec,)
        raise Exception(msg)
    for row in rec['summary']:
        # width of the longest key, guarded against rows without keys
        maxlen = max(len(key) for key in row) if row else 0
        for key, val in row.items():
            print('%s: %s' % (key.ljust(maxlen), val))
        print('')
0322 
def print_from_cache(cache, query):
    """
    Print the list of files for the query reading it from the cache file.

    cache -- name of a file with a JSON dictionary whose values are lists
             of strings
    query -- the key to look up

    This function always terminates the process: exit code 0 after
    printing the cached entries (one per line), exit code 1 when the
    query is not in the cache.
    """
    # the with statement closes the file even if the JSON is malformed
    with open(cache) as stream:
        jsondict = json.load(stream)
    if  query in jsondict:
        print("\n".join(jsondict[query]))
        # sys.exit rather than exit(): the latter is a site module helper
        # meant for the interactive interpreter
        sys.exit(0)
    sys.exit(1)
0331 
def keys_attrs(lkey, oformat, host, ckey, cert, debug=0):
    """
    Fetch the key/attributes map from host and print it.

    lkey selects a single DAS key, or every key when it is 'all'.
    With oformat 'json' (case insensitive) the selection is dumped as
    JSON, otherwise it is printed as an indented listing of keys,
    attributes and their examples.
    """
    request = urllib2.Request(
        url='%s/das/keys?view=json' % host,
        headers={"Accept": "application/json", "User-Agent": DAS_CLIENT})
    # client authentication only when both key and certificate are given
    if  ckey and cert:
        handler = HTTPSClientAuthHandler(fullpath(ckey), fullpath(cert), debug)
    else:
        handler = urllib2.HTTPHandler(debuglevel=debug)
    opener = urllib2.build_opener(
        handler,
        urllib2.ProxyHandler({}),
        urllib2.HTTPCookieProcessor(cookielib.CookieJar()))
    response = opener.open(request)
    payload = json.load(response)
    response.close()

    if  oformat.lower() == 'json':
        if  lkey == 'all':
            selection = payload
        else:
            selection = {lkey:payload[lkey]}
        print(json.dumps(selection))
        return

    indent = '    '
    for key, vdict in payload.items():
        if  lkey != 'all' and lkey != key:
            continue
        print('')
        print("DAS key: %s" % key)
        for attr, examples in vdict.items():
            print('%s%s' % (indent, attr))
            for item in examples:
                print('%s%s%s' % (indent, indent, item))
0368 
def main():
    """
    Main function: parse the command line, run the query and print results.

    With --list-attributes only the key/attributes listing is printed.
    With a format other than 'plain' the JSON document returned by the
    server is dumped as is. In plain format the status of the reply is
    checked first: the cache file (--cache) is consulted when the reply is
    unusable, then the query is retried up to --retry times, and the
    process exits with EX_PROTOCOL or EX_TEMPFAIL when no usable reply
    could be obtained. A missing query exits with EX_USAGE.
    """
    optmgr  = DASOptionParser()
    opts, _ = optmgr.get_opt()
    host    = opts.host
    debug   = opts.verbose
    query   = opts.query
    idx     = opts.idx
    limit   = opts.limit
    thr     = opts.threshold
    ckey    = opts.ckey
    cert    = opts.cert
    base    = opts.base
    if  opts.keys_attrs:
        keys_attrs(opts.keys_attrs, opts.format, host, ckey, cert, debug)
        return
    if  not query:
        print('Input query is missing')
        sys.exit(EX_USAGE)

    jsondict = get_data(host, query, idx, limit, debug, thr, ckey, cert)
    if  opts.format != 'plain':
        print(json.dumps(jsondict))
        return

    cli_msg  = jsondict.get('client_message', None)
    if  cli_msg:
        print("DAS CLIENT WARNING: %s" % cli_msg)
    if  'status' not in jsondict:
        if  opts.cache:
            # print_from_cache never returns, it exits the process
            print_from_cache(opts.cache, query)
        print('DAS record without status field:\n%s' % jsondict)
        sys.exit(EX_PROTOCOL)
    if  jsondict['status'] != 'ok':
        if  opts.cache:
            print_from_cache(opts.cache, query)
        print("status: %s, reason: %s" \
            % (jsondict.get('status'), jsondict.get('reason', 'N/A')))
        found = False
        # Perform exactly the requested number of retries (the range used
        # to stop one attempt short).
        for attempt in xrange(1, int(opts.retry or 0) + 1):
            interval = log(attempt)**5
            print("Retry in %5.3f sec" % interval)
            time.sleep(interval)
            # get_data already returns the decoded document; it must not
            # be passed through json.loads again
            jsondict = get_data(host, query, idx, limit, debug, thr, ckey, cert)
            if  jsondict.get('status', 'fail') == 'ok':
                found = True
                break
        if  not found:
            sys.exit(EX_TEMPFAIL)

    nres = jsondict['nresults']
    if  not limit:
        drange = '%s' % nres
    else:
        drange = '%s-%s out of %s' % (idx+1, idx+limit, nres)
    if  opts.limit:
        msg  = "\nShowing %s results" % drange
        msg += ", for more results use --idx/--limit options\n"
        print(msg)
    mongo_query = jsondict['mongo_query']
    fdict   = mongo_query.get('filters', {})
    filters = fdict.get('grep', [])
    aggregators = mongo_query.get('aggregators', [])
    unique  = 'unique' in fdict
    data    = jsondict['data']
    if  filters and not aggregators:
        # grep filters: print the selected attributes of every record
        if  isinstance(data, dict):
            print(' '.join(get_value(data, filters, base)))
        elif isinstance(data, list):
            if  unique:
                data = unique_filter(data)
            for row in data:
                print(' '.join(get_value(row, filters, base)))
        else:
            print(json.dumps(jsondict))
    elif aggregators:
        # aggregators: print function(key)=value lines
        if  unique:
            data = unique_filter(data)
        for row in data:
            if  row['key'].find('size') != -1 and \
                row['function'] == 'sum':
                val = size_format(row['result']['value'], base)
            else:
                val = row['result']['value']
            print('%s(%s)=%s' \
                % (row['function'], row['key'], val))
    else:
        # plain query: print the primary key value of every record
        if  isinstance(data, list):
            old = None
            for row in data:
                prim_key = row.get('das', {}).get('primary_key', None)
                if  prim_key == 'summary':
                    print_summary(row)
                    return
                val = prim_value(row)
                if  opts.limit:
                    print(val)
                elif val != old:
                    # without limit consecutive duplicates are suppressed
                    print(val)
                    old = val
        elif isinstance(data, dict):
            print(prim_value(data))
        else:
            print(data)
0486 
#
# Script entry point: run main() only when the file is executed directly,
# not when it is imported as a module.
#
if __name__ == '__main__':
    main()