Project CMSSW displayed by LXR
File indexing completed on 2023-03-17 10:53:14

#!/usr/bin/env python
#-*- coding: utf-8 -*-
#pylint: disable=C0301,C0103,R0914,R0903

"""
DAS command line tool
"""
from __future__ import print_function
__author__ = "Valentin Kuznetsov"

# system modules
import os
import sys
import pwd
if  sys.version_info < (2, 6):
    raise Exception("DAS requires python 2.6 or greater")

DAS_CLIENT = 'das-client/1.1::python/%s.%s' % sys.version_info[:2]

import re
import ssl
import time
import json
import urllib
import urllib2
import httplib
import cookielib
from   optparse import OptionParser
from   math import log
from   types import GeneratorType

# define exit codes according to Linux sysexits.h
EX_OK           = 0  # successful termination
EX__BASE        = 64 # base value for error messages
EX_USAGE        = 64 # command line usage error
EX_DATAERR      = 65 # data format error
EX_NOINPUT      = 66 # cannot open input
EX_NOUSER       = 67 # addressee unknown
EX_NOHOST       = 68 # host name unknown
EX_UNAVAILABLE  = 69 # service unavailable
EX_SOFTWARE     = 70 # internal software error
EX_OSERR        = 71 # system error (e.g., can't fork)
EX_OSFILE       = 72 # critical OS file missing
EX_CANTCREAT    = 73 # can't create (user) output file
EX_IOERR        = 74 # input/output error
EX_TEMPFAIL     = 75 # temp failure; user is invited to retry
EX_PROTOCOL     = 76 # remote error in protocol
EX_NOPERM       = 77 # permission denied
EX_CONFIG       = 78 # configuration error

class HTTPSClientAuthHandler(urllib2.HTTPSHandler):
    """
    Simple HTTPS client authentication class based on provided
    key/ca information
    """
    def __init__(self, key=None, cert=None, capath=None, level=0):
        if  level > 1:
            urllib2.HTTPSHandler.__init__(self, debuglevel=1)
        else:
            urllib2.HTTPSHandler.__init__(self)
        self.key = key
        self.cert = cert
        self.capath = capath

    def https_open(self, req):
        """Open request method"""
        # Rather than pass in a reference to a connection class, we pass in
        # a reference to a function which, for all intents and purposes,
        # will behave as a constructor
        return self.do_open(self.get_connection, req)

    def get_connection(self, host, timeout=300):
        """Connection method"""
        if  self.key and self.cert and not self.capath:
            return httplib.HTTPSConnection(host, key_file=self.key,
                                                cert_file=self.cert)
        elif self.cert and self.capath:
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
            context.load_verify_locations(capath=self.capath)
            context.load_cert_chain(self.cert)
            return httplib.HTTPSConnection(host, context=context)
        return httplib.HTTPSConnection(host)

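# Illustrative usage sketch (added, not part of the original client): get_data()
# below plugs this handler into a urllib2 opener, roughly as
#   handler = HTTPSClientAuthHandler(ckey, cert, capath, debug)
#   opener  = urllib2.build_opener(handler, urllib2.ProxyHandler({}))
#   opener.open(urllib2.Request(url, headers=headers))
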
def x509():
    "Helper function to get x509 either from env or tmp file"
    proxy = os.environ.get('X509_USER_PROXY', '')
    if  not proxy:
        proxy = '/tmp/x509up_u%s' % pwd.getpwuid( os.getuid() ).pw_uid
        if  not os.path.isfile(proxy):
            return ''
    return proxy

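# Note (added for clarity): the fallback path above is the conventional
# location of a VOMS proxy created by 'voms-proxy-init', e.g. /tmp/x509up_u12345
# for uid 12345; an empty string is returned when neither $X509_USER_PROXY
# nor such a file exists.
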
def check_glidein():
    "Check glidein environment and exit if it is set"
    glidein = os.environ.get('GLIDEIN_CMSSite', '')
    if  glidein:
        msg = "ERROR: das_client is running from GLIDEIN environment, it is prohibited"
        print(msg)
        sys.exit(EX__BASE)

def check_auth(key):
    "Check if user runs das_client with key/cert and warn them to create a proxy otherwise"
    if  not key:
        msg  = "WARNING: das_client is running without user credentials/X509 proxy, create proxy via 'voms-proxy-init -voms cms -rfc'"
        print(msg, file=sys.stderr)

class DASOptionParser:
    """
    DAS cache client option parser
    """
    def __init__(self):
        usage  = "Usage: %prog [options]\n"
        usage += "For more help please visit https://cmsweb.cern.ch/das/faq"
        self.parser = OptionParser(usage=usage)
        self.parser.add_option("-v", "--verbose", action="store",
                               type="int", default=0, dest="verbose",
                               help="verbose output")
        self.parser.add_option("--query", action="store", type="string",
                               default=False, dest="query",
                               help="specify query for your request")
        msg  = "host name of DAS cache server, default is https://cmsweb.cern.ch"
        self.parser.add_option("--host", action="store", type="string",
                       default='https://cmsweb.cern.ch', dest="host", help=msg)
        msg  = "start index for returned result set, aka pagination,"
        msg += " use w/ limit (default is 0)"
        self.parser.add_option("--idx", action="store", type="int",
                               default=0, dest="idx", help=msg)
        msg  = "number of returned results (default is 10),"
        msg += " use --limit=0 to show all results"
        self.parser.add_option("--limit", action="store", type="int",
                               default=10, dest="limit", help=msg)
        msg  = 'specify return data format (json or plain), default plain.'
        self.parser.add_option("--format", action="store", type="string",
                               default="plain", dest="format", help=msg)
        msg  = 'query waiting threshold in sec, default is 5 minutes'
        self.parser.add_option("--threshold", action="store", type="int",
                               default=300, dest="threshold", help=msg)
        msg  = 'specify private key file name, default $X509_USER_PROXY'
        self.parser.add_option("--key", action="store", type="string",
                               default=x509(), dest="ckey", help=msg)
        msg  = 'specify private certificate file name, default $X509_USER_PROXY'
        self.parser.add_option("--cert", action="store", type="string",
                               default=x509(), dest="cert", help=msg)
        msg  = 'specify CA path, default $X509_CERT_DIR'
        self.parser.add_option("--capath", action="store", type="string",
                               default=os.environ.get("X509_CERT_DIR", ""),
                               dest="capath", help=msg)
        msg  = 'specify number of retries upon busy DAS server message'
        self.parser.add_option("--retry", action="store", type="string",
                               default=0, dest="retry", help=msg)
        msg  = 'show DAS headers in JSON format'
        msg += ' (obsolete, keep for backward compatibility)'
        self.parser.add_option("--das-headers", action="store_true",
                               default=False, dest="das_headers", help=msg)
        msg = 'specify power base for size_format, default is 10 (can be 2)'
        self.parser.add_option("--base", action="store", type="int",
                               default=0, dest="base", help=msg)

        msg = 'a file which contains a cached json dictionary for query -> files mapping'
        self.parser.add_option("--cache", action="store", type="string",
                               default=None, dest="cache", help=msg)

        msg = 'a query cache value'
        self.parser.add_option("--query-cache", action="store", type="int",
                               default=0, dest="qcache", help=msg)
        msg = 'List DAS key/attributes, use "all" or specific DAS key value, e.g. site'
        self.parser.add_option("--list-attributes", action="store", type="string",
                               default="", dest="keys_attrs", help=msg)
    def get_opt(self):
        """
        Returns parsed list of options
        """
        return self.parser.parse_args()

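# Illustrative note (added): main() below consumes these options via
#   optmgr  = DASOptionParser()
#   opts, _ = optmgr.get_opt()
# so a new flag only needs an add_option() call here plus a read of opts.<dest>.
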
def convert_time(val):
    "Convert given timestamp into human readable format"
    if  isinstance(val, int) or isinstance(val, float):
        return time.strftime('%d/%b/%Y_%H:%M:%S_GMT', time.gmtime(val))
    return val

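# Example (illustrative): convert_time(1500000000) -> '14/Jul/2017_02:40:00_GMT';
# non-numeric values are passed through unchanged.
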
def size_format(uinput, ibase=0):
    """
    File size formatting utility: converts a file size into KB, MB, GB, TB, PB units
    """
    if  not ibase:
        return uinput
    try:
        num = float(uinput)
    except Exception as _exc:
        return uinput
    if  ibase == 2.: # power of 2
        base  = 1024.
        xlist = ['', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']
    else: # default base is 10
        base  = 1000.
        xlist = ['', 'KB', 'MB', 'GB', 'TB', 'PB']
    for xxx in xlist:
        if  num < base:
            return "%3.1f%s" % (num, xxx)
        num /= base

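# Examples (illustrative): size_format(123456789, 10) -> '123.5MB' and
# size_format(123456789, 2) -> '117.7MiB'; with ibase=0 the input is
# returned untouched.
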
def unique_filter(rows):
    """
    Unique filter: drops duplicate rows.
    """
    old_row = {}
    row = None
    for row in rows:
        row_data = dict(row)
        try:
            del row_data['_id']
            del row_data['das']
            del row_data['das_id']
            del row_data['cache_id']
        except:
            pass
        old_data = dict(old_row)
        try:
            del old_data['_id']
            del old_data['das']
            del old_data['das_id']
            del old_data['cache_id']
        except:
            pass
        if  row_data == old_data:
            continue
        if  old_row:
            yield old_row
        old_row = row
    yield row

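# Note (added for clarity): only adjacent duplicates are collapsed, after the
# DAS bookkeeping keys (_id, das, das_id, cache_id) are stripped for the
# comparison; e.g. list(unique_filter([{'a': 1}, {'a': 1}, {'a': 2}])) yields
# [{'a': 1}, {'a': 2}].
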
def extract_value(row, key, base=10):
    """Generator which extracts row[key] value"""
    if  isinstance(row, dict) and key in row:
        if  key == 'creation_time':
            row = convert_time(row[key])
        elif  key == 'size':
            row = size_format(row[key], base)
        else:
            row = row[key]
        yield row
    if  isinstance(row, list) or isinstance(row, GeneratorType):
        for item in row:
            for vvv in extract_value(item, key, base):
                yield vvv

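# Example (illustrative): list(extract_value({'size': 1000}, 'size', 10))
# yields ['1.0KB'], since 'size' values are routed through size_format above.
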
def get_value(data, filters, base=10):
    """Filter data from a row for given list of filters"""
    for ftr in filters:
        if  ftr.find('>') != -1 or ftr.find('<') != -1 or ftr.find('=') != -1:
            continue
        row = dict(data)
        values = []
        keys = ftr.split('.')
        for key in keys:
            val = [v for v in extract_value(row, key, base)]
            if  key == keys[-1]: # we collect all values at last key
                values += [json.dumps(i) for i in val]
            else:
                row = val
        if  len(values) == 1:
            yield values[0]
        else:
            yield values

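# Example (illustrative): for a row like {'file': [{'name': '/store/a.root'}]}
# and the filter 'file.name', get_value yields the JSON-encoded string
# '"/store/a.root"'; filters containing <, > or = are skipped as conditions.
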
def fullpath(path):
    "Expand path to full path"
    if  path and path[0] == '~':
        path = path.replace('~', '')
        path = path[1:] if path[0] == '/' else path
        path = os.path.join(os.environ['HOME'], path)
    return path

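# Example (illustrative): with HOME=/home/user, fullpath('~/x509/proxy')
# returns '/home/user/x509/proxy'; paths without a leading '~' are returned as-is.
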
def get_data(host, query, idx, limit, debug, threshold=300, ckey=None,
        cert=None, capath=None, qcache=0, das_headers=True):
    """Contact DAS server and retrieve data for given DAS query"""
    params  = {'input':query, 'idx':idx, 'limit':limit}
    if  qcache:
        params['qcache'] = qcache
    path    = '/das/cache'
    pat     = re.compile('http[s]{0,1}://')
    if  not pat.match(host):
        msg = 'Invalid hostname: %s' % host
        raise Exception(msg)
    url = host + path
    client = '%s (%s)' % (DAS_CLIENT, os.environ.get('USER', ''))
    headers = {"Accept": "application/json", "User-Agent": client}
    encoded_data = urllib.urlencode(params, doseq=True)
    url += '?%s' % encoded_data
    req  = urllib2.Request(url=url, headers=headers)
    if  ckey and cert:
        ckey = fullpath(ckey)
        cert = fullpath(cert)
        http_hdlr  = HTTPSClientAuthHandler(ckey, cert, capath, debug)
    elif cert and capath:
        cert = fullpath(cert)
        http_hdlr  = HTTPSClientAuthHandler(ckey, cert, capath, debug)
    else:
        http_hdlr  = urllib2.HTTPHandler(debuglevel=debug)
    proxy_handler  = urllib2.ProxyHandler({})
    cookie_jar     = cookielib.CookieJar()
    cookie_handler = urllib2.HTTPCookieProcessor(cookie_jar)
    try:
        opener = urllib2.build_opener(http_hdlr, proxy_handler, cookie_handler)
        fdesc = opener.open(req)
        data = fdesc.read()
        fdesc.close()
    except urllib2.HTTPError as error:
        print(error.read())
        sys.exit(1)

    pat = re.compile(r'^[a-z0-9]{32}')
    if  data and isinstance(data, str) and pat.match(data) and len(data) == 32:
        pid = data
    else:
        pid = None
    iwtime  = 2  # initial waiting time in seconds
    wtime   = 20 # final waiting time in seconds
    sleep   = iwtime
    time0   = time.time()
    while pid:
        params.update({'pid':data})
        encoded_data = urllib.urlencode(params, doseq=True)
        url  = host + path + '?%s' % encoded_data
        req  = urllib2.Request(url=url, headers=headers)
        try:
            fdesc = opener.open(req)
            data = fdesc.read()
            fdesc.close()
        except urllib2.HTTPError as err:
            return {"status":"fail", "reason":str(err)}
        if  data and isinstance(data, str) and pat.match(data) and len(data) == 32:
            pid = data
        else:
            pid = None
        time.sleep(sleep)
        if  sleep < wtime:
            sleep *= 2
        elif sleep == wtime:
            sleep = iwtime # start new cycle
        else:
            sleep = wtime
        if  (time.time()-time0) > threshold:
            reason = "client timeout after %s sec" % int(time.time()-time0)
            return {"status":"fail", "reason":reason}
    jsondict = json.loads(data)
    return jsondict

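# Note (added for clarity): while the DAS server is still gathering results it
# answers with a 32-character request id instead of JSON; the loop above then
# re-polls /das/cache with pid=<id>, with an increasing sleep between polls
# (starting at 2 seconds), until a JSON payload arrives or the --threshold
# timeout is reached. A hypothetical call:
#   get_data('https://cmsweb.cern.ch', 'dataset=/ZMM*/*/*', 0, 10, 0)
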
def prim_value(row):
    """Extract primary key value from DAS record"""
    prim_key = row['das']['primary_key']
    if  prim_key == 'summary':
        return row.get(prim_key, None)
    key, att = prim_key.split('.')
    if  isinstance(row[key], list):
        for item in row[key]:
            if  att in item:
                return item[att]
    else:
        if  key in row:
            if  att in row[key]:
                return row[key][att]

def print_summary(rec):
    "Print summary record information on stdout"
    if  'summary' not in rec:
        msg = 'Summary information is not found in record:\n%s' % rec
        raise Exception(msg)
    for row in rec['summary']:
        keys = [k for k in row.keys()]
        maxlen = max([len(k) for k in keys])
        for key, val in row.items():
            pkey = '%s%s' % (key, ' '*(maxlen-len(key)))
            print('%s: %s' % (pkey, val))
        print()

def print_from_cache(cache, query):
    "Print the list of files for given query, reading it from the cache file"
    data = open(cache).read()
    jsondict = json.loads(data)
    if query in jsondict:
        print("\n".join(jsondict[query]))
        exit(0)
    exit(1)

def keys_attrs(lkey, oformat, host, ckey, cert, debug=0):
    "Contact host for list of key/attributes pairs"
    url = '%s/das/keys?view=json' % host
    headers = {"Accept": "application/json", "User-Agent": DAS_CLIENT}
    req  = urllib2.Request(url=url, headers=headers)
    if  ckey and cert:
        ckey = fullpath(ckey)
        cert = fullpath(cert)
        http_hdlr  = HTTPSClientAuthHandler(ckey, cert, level=debug)
    else:
        http_hdlr  = urllib2.HTTPHandler(debuglevel=debug)
    proxy_handler  = urllib2.ProxyHandler({})
    cookie_jar     = cookielib.CookieJar()
    cookie_handler = urllib2.HTTPCookieProcessor(cookie_jar)
    opener = urllib2.build_opener(http_hdlr, proxy_handler, cookie_handler)
    fdesc = opener.open(req)
    data = json.load(fdesc)
    fdesc.close()
    if  oformat.lower() == 'json':
        if  lkey == 'all':
            print(json.dumps(data))
        else:
            print(json.dumps({lkey:data[lkey]}))
        return
    for key, vdict in data.items():
        if  lkey == 'all':
            pass
        elif lkey != key:
            continue
        print()
        print("DAS key:", key)
        for attr, examples in vdict.items():
            prefix = '    '
            print('%s%s' % (prefix, attr))
            for item in examples:
                print('%s%s%s' % (prefix, prefix, item))

def main():
    """Main function"""
    optmgr  = DASOptionParser()
    opts, _ = optmgr.get_opt()
    host    = opts.host
    debug   = opts.verbose
    query   = opts.query
    idx     = opts.idx
    limit   = opts.limit
    thr     = opts.threshold
    ckey    = opts.ckey
    cert    = opts.cert
    capath  = opts.capath
    base    = opts.base
    qcache  = opts.qcache
    check_glidein()
    check_auth(ckey)
    if  opts.keys_attrs:
        keys_attrs(opts.keys_attrs, opts.format, host, ckey, cert, debug)
        return
    if  not query:
        print('Input query is missing')
        sys.exit(EX_USAGE)
    if  opts.format == 'plain':
        jsondict = get_data(host, query, idx, limit, debug, thr, ckey, cert, capath, qcache)
        cli_msg  = jsondict.get('client_message', None)
        if  cli_msg:
            print("DAS CLIENT WARNING: %s" % cli_msg)
        if  'status' not in jsondict and opts.cache:
            print_from_cache(opts.cache, query)
        if  'status' not in jsondict:
            print('DAS record without status field:\n%s' % jsondict)
            sys.exit(EX_PROTOCOL)
        if  jsondict["status"] != 'ok' and opts.cache:
            print_from_cache(opts.cache, query)
        if  jsondict['status'] != 'ok':
            print("status: %s, reason: %s" \
                % (jsondict.get('status'), jsondict.get('reason', 'N/A')))
            if  opts.retry:
                found = False
                for attempt in xrange(1, int(opts.retry)):
                    interval = log(attempt)**5
                    print("Retry in %5.3f sec" % interval)
                    time.sleep(interval)
                    jsondict = get_data(host, query, idx, limit, debug, thr, ckey, cert, capath, qcache)
                    if  jsondict.get('status', 'fail') == 'ok':
                        found = True
                        break
            else:
                sys.exit(EX_TEMPFAIL)
            if  not found:
                sys.exit(EX_TEMPFAIL)
        nres = jsondict.get('nresults', 0)
        if  not limit:
            drange = '%s' % nres
        else:
            drange = '%s-%s out of %s' % (idx+1, idx+limit, nres)
        if  opts.limit:
            msg  = "\nShowing %s results" % drange
            msg += ", for more results use --idx/--limit options\n"
            print(msg)
        mongo_query = jsondict.get('mongo_query', {})
        unique  = False
        fdict   = mongo_query.get('filters', {})
        filters = fdict.get('grep', [])
        aggregators = mongo_query.get('aggregators', [])
        if  'unique' in fdict.keys():
            unique = True
        if  filters and not aggregators:
            data = jsondict['data']
            if  isinstance(data, dict):
                rows = [r for r in get_value(data, filters, base)]
                print(' '.join(rows))
            elif isinstance(data, list):
                if  unique:
                    data = unique_filter(data)
                for row in data:
                    rows = [r for r in get_value(row, filters, base)]
                    types = [type(r) for r in rows]
                    if  len(types)>1: # mixed types print as is
                        print(' '.join([str(r) for r in rows]))
                    elif isinstance(rows[0], list):
                        out = set()
                        for item in rows:
                            for elem in item:
                                out.add(elem)
                        print(' '.join(out))
                    else:
                        print(' '.join(rows))
            else:
                print(json.dumps(jsondict))
        elif aggregators:
            data = jsondict['data']
            if  unique:
                data = unique_filter(data)
            for row in data:
                if  row['key'].find('size') != -1 and \
                    row['function'] == 'sum':
                    val = size_format(row['result']['value'], base)
                else:
                    val = row['result']['value']
                print('%s(%s)=%s' \
                    % (row['function'], row['key'], val))
        else:
            data = jsondict['data']
            if  isinstance(data, list):
                old = None
                val = None
                for row in data:
                    prim_key = row.get('das', {}).get('primary_key', None)
                    if  prim_key == 'summary':
                        print_summary(row)
                        return
                    val = prim_value(row)
                    if  not opts.limit:
                        if  val != old:
                            print(val)
                            old = val
                    else:
                        print(val)
                if  val != old and not opts.limit:
                    print(val)
            elif isinstance(data, dict):
                print(prim_value(data))
            else:
                print(data)
    else:
        jsondict = get_data(\
                host, query, idx, limit, debug, thr, ckey, cert, capath, qcache)
        print(json.dumps(jsondict))

#
# main
#
if __name__ == '__main__':
    main()
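
# Example invocation (illustrative; query syntax is described in the DAS FAQ
# linked in the usage message above):
#   das_client.py --query="dataset=/ZMM*/*/*" --limit=10 --format=plain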