# File indexing completed on 2024-11-25 02:29:20
0001
0002
0003
0004 import logging
0005 import json
0006 import os
0007 import sys
0008 import time
0009 import subprocess
0010
0011 import pycurl
0012
# Base URL of the Tier0 data service; the TIER0_API_URL environment variable
# takes precedence over the built-in production endpoint.
tier0Url = os.environ.get('TIER0_API_URL', 'https://cmsweb.cern.ch/t0wmadatasvc/prod/')
0014
class Tier0Error(Exception):
    """Exception type for Tier0-related failures.

    The message passed to the constructor becomes the only entry of ``args``.
    """

    def __init__(self, message):
        # Exception.__init__ stores its positional arguments as ``self.args``.
        super().__init__(message)
0021
0022
def unique(seq, keepstr=True):
    """
    Removes duplicates from a sequence, keeping the first occurrence of each element
    and preserving the original order.
    Parameters:
    seq: the sequence to de-duplicate; elements may be hashable or unhashable;
    keepstr: only used when seq is a str: if true the result is a str, otherwise a list of characters.
    @returns: the distinct elements, built by calling type(seq) on them
    (or joined into a str / collected into a list when seq is a str).
    """
    builder = type(seq)
    if builder is str:
        builder = ''.join if keepstr else list
    try:
        # Fast path for hashable elements: dict keys are unique and keep insertion order.
        return builder(dict.fromkeys(seq))
    except TypeError:
        # Unhashable elements (e.g. lists, dicts, sets): fall back to an equality scan.
        # This is quadratic, but it is correct for any type that defines ==, including
        # types that cannot be sorted or are only partially ordered.
        kept = []
        for item in seq:
            if item not in kept:
                kept.append(item)
        return builder(kept)
0039
0040
class ResponseError(Tier0Error):
    """Error describing an unexpected response to a curl request sent to Tier0DataSvc.

    After construction ``args`` is ``(response, curl, proxy)``; the connection
    timeout and the maximum transfer time are kept as ``timeout`` and ``maxTime``.
    """

    def __init__(self, curl, response, proxy, timeout, maxTime):
        """
        Parameters:
        curl: handle used for the request; must provide getinfo(), EFFECTIVE_URL and
              RESPONSE_CODE (presumably a pycurl.Curl object -- confirm with callers);
        response: body of the response, used as the exception message;
        proxy: proxy used for the request, or a false value if none;
        timeout: connection time out that was in force;
        maxTime: maximum total time that was in force.
        """
        super().__init__(response)
        self.args += (curl, proxy)
        self.timeout = timeout
        self.maxTime = maxTime

    def __str__(self):
        """Builds the full human-readable description of the failed request."""
        response = self.args[0]
        curl = self.args[1]
        proxy = self.args[-1]

        errStr = (f'Wrong response for curl connection to Tier0DataSvc'
                  f' from URL "{curl.getinfo(curl.EFFECTIVE_URL)}"')
        if proxy:
            errStr += f' using proxy "{str(proxy)}"'
        # The attribute is spelled maxTime (as set in __init__); reading "maxtime"
        # here used to raise AttributeError while formatting the error.
        errStr += (f' with connection-timeout "{self.timeout}", max-time "{self.maxTime}"'
                   f' with error code "{curl.getinfo(curl.RESPONSE_CODE)}".')
        if '<p>' in response:
            # For HTML answers report only the text between the first <p> and the last </p>.
            full_response = response.partition('<p>')[-1].rpartition('</p>')[0]
            errStr += f'\nFull response: "{full_response}".'
        else:
            errStr += f'\nFull response: "{response}".'

        return errStr
0063
0064
0065
0066
def _raise_http_error(curl, response, proxy, timeout, maxTime):
    """Builds a ResponseError from the given request details and raises it."""
    error = ResponseError(curl, response, proxy, timeout, maxTime)
    raise error
0069
class Tier0Handler(object):
    """Client for Tier0DataSvc that performs its HTTPS calls through /usr/bin/curl."""

    def __init__(self, uri, timeOut, maxTime, retries, retryPeriod, proxy, debug):
        """
        Parameters:
        uri: Tier0DataSvc URI;
        timeOut: time out for connection of Tier0DataSvc HTTPS calls [seconds];
        maxTime: maximum time for Tier0DataSvc HTTPS calls (including data transfer) [seconds];
        retries: maximum retries for Tier0DataSvc HTTPS calls;
        retryPeriod: sleep time between two Tier0DataSvc HTTPS calls [seconds];
        proxy: HTTP proxy for accessing Tier0DataSvc HTTPS calls;
        debug: if set to True, enables debug information.
        """
        self._uri = uri
        self._timeOut = timeOut
        self._maxTime = maxTime
        self._retries = retries
        self._retryPeriod = retryPeriod
        self._proxy = proxy
        self._debug = debug

    def setDebug(self):
        """Turns on verbose curl output for subsequent calls."""
        self._debug = True

    def unsetDebug(self):
        """Turns off verbose curl output for subsequent calls."""
        self._debug = False

    def setProxy(self, proxy):
        """Sets the HTTP proxy used for subsequent calls (a false value disables it)."""
        self._proxy = proxy

    def _getCertArgs(self) -> list:
        """
        Collects the curl options selecting the client certificate and key.
        @returns: list of curl arguments built from the X509_USER_CERT and X509_USER_KEY
        environment variables; empty if neither is set. Logs a warning when no certificate is set.
        """
        cert_path = os.getenv('X509_USER_CERT', '')
        key_path = os.getenv('X509_USER_KEY', '')

        args = []
        if cert_path:
            # The command is no longer passed through a shell, so do here the
            # "~" and "$VAR" expansion a shell would have applied to the path.
            args += ['--cert', os.path.expanduser(os.path.expandvars(cert_path))]
        else:
            logging.warning("No certificate provided for Tier0 access, use X509_USER_CERT and"
                            " optionally X509_USER_KEY env variables to specify the path to the cert"
                            " (and the key unless included in the cert file)")
        if key_path:
            args += ['--key', os.path.expanduser(os.path.expandvars(key_path))]
        return args

    def _getCerts(self) -> str:
        """
        @returns: the certificate options as a command-line fragment, each option preceded
        by a space (e.g. ' --cert <path> --key <path>'), or '' if none is configured.
        """
        return ''.join(f' {arg}' for arg in self._getCertArgs())

    def _curlQueryTier0(self, url: str, force_debug: bool = False, force_cert: bool = False):
        """
        Runs one curl request.
        Parameters:
        url: URL to fetch;
        force_debug: run curl in verbose mode even if debug is not enabled on the handler;
        force_cert: pass the client certificate even when a proxy is configured.
        @returns: tuple (curl exit status, stdout bytes, stderr bytes, elapsed seconds).
        """
        userAgent = "User-Agent: ConditionWebServices/1.0 python/%d.%d.%d PycURL/%s" \
            % (sys.version_info[:3] + (pycurl.version_info()[1],))

        # Build an argument vector instead of a shell string: the URL, proxy and
        # certificate paths reach curl verbatim, so characters such as '&', '?' or ';'
        # are neither mangled nor executed by a shell.
        cmd = ['/usr/bin/curl', '-k', '-L', '--user-agent', userAgent]
        if self._proxy:
            cmd += ['--proxy', str(self._proxy)]
        cmd += ['--connect-timeout', str(self._timeOut),
                '--max-time', str(self._maxTime),
                '--retry', str(self._retries)]
        cmd += ['-v'] if self._debug or force_debug else ['-s', '-S']
        cmd.append(url)
        # Without a proxy the certificate is always sent; with a proxy only on request.
        if not self._proxy or force_cert:
            cmd += self._getCertArgs()

        start = time.time()
        try:
            process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except OSError as err:
            # curl could not be started; report it the way a shell would
            # ("command not found" exit status) so callers see a failed query.
            return 127, b'', str(err).encode(), time.time() - start
        stdoutdata, stderrdata = process.communicate()
        end = time.time()
        return process.returncode, stdoutdata, stderrdata, end - start

    def _queryTier0DataSvc(self, url):
        """
        Queries Tier0DataSvc.
        url: Tier0DataSvc URL.
        @returns: dictionary, from whence the required information must be retrieved according to the API call.
        Raises Tier0Error if curl still fails on the second attempt; a response that
        cannot be parsed makes json.loads raise.
        """
        retcode, stdoutdata, stderrdata, query_time = self._curlQueryTier0(url)

        if retcode != 0 or stderrdata:
            # The first attempt failed or wrote to stderr: log it, wait, and retry once
            # in verbose mode with the client certificate forced.
            msg = "looks like curl returned an error: retcode=%s and took %s seconds" % (retcode, query_time,)
            msg += ' msg = "' + str(stderrdata) + '"'
            logging.error(msg)
            if self._proxy:
                logging.info("before assumed proxy provides authentication, now trying with both proxy and certificate")

            time.sleep(self._retryPeriod)
            retcode, stdoutdata, stderrdata, query_time = self._curlQueryTier0(url, force_debug=True, force_cert=True)
            # Only the exit status is checked here: verbose mode always writes to stderr.
            if retcode != 0:
                msg = "looks like curl returned an error for the second time: retcode=%s" % (retcode,)
                msg += ' msg = "' + str(stderrdata) + '"'
                logging.error(msg)
                raise Tier0Error(msg)
            logging.info("curl returned ok upon the second try")

        # Single quotes and bare None are rewritten before parsing; presumably the
        # service can answer with Python-repr-like text rather than strict JSON.
        text = stdoutdata.decode().replace("'", '"').replace(' None', ' "None"')
        return json.loads(text)

    def getFirstSafeRun(self):
        """
        Queries Tier0DataSvc to get the first condition safe run.
        Parameters:
        @returns: integer, the run number.
        Raises if connection error, bad response, timeout after retries occur, or if the run number is not available.
        """
        url = os.path.join(self._uri, "firstconditionsaferun")
        safeRunDict = self._queryTier0DataSvc(url)
        if safeRunDict is None:
            errStr = 'First condition safe run is not available in Tier0DataSvc from URL "%s"' % (url,)
            if self._proxy:
                errStr += ' using proxy "%s".' % (str(self._proxy),)
            raise Tier0Error(errStr)
        return int(safeRunDict['result'][0])

    def getGlobalTag(self, config):
        """
        Queries Tier0DataSvc to get the most recent Global Tag for a given workflow.
        Parameters:
        config: Tier0DataSvc API call for the workflow to be looked for;
        @returns: a string with the Global Tag name (the greatest name in string order).
        Raises if connection error, bad response, timeout after retries occur, or if no Global Tags are available.
        """
        url = os.path.join(self._uri, config)
        data = self._queryTier0DataSvc(url)
        gtnames = [str(di['global_tag']) for di in data['result'] if di['global_tag'] is not None]
        if not gtnames:
            errStr = 'No Global Tags for "%s" are available in Tier0DataSvc from URL "%s"' % (config, url)
            if self._proxy:
                errStr += ' using proxy "%s".' % (str(self._proxy),)
            raise Tier0Error(errStr)
        # Same result as sorting the distinct names and taking the last one.
        return max(gtnames)
0203
0204
def test(url):
    """
    Smoke test: queries the Tier0DataSvc at the given URL and prints the first
    condition safe run and the Global Tags of the reco and express configurations.
    url: Tier0DataSvc URL.
    """
    t0 = Tier0Handler(url, 1, 5, 1, 10, None, debug=False)

    # Query once and reuse the value; asking twice doubled the remote calls
    # only to print the type of the result.
    fcsr = t0.getFirstSafeRun()
    print(' fcsr = %s (%s)' % (fcsr, type(fcsr)))
    print(' reco_config = %s' % t0.getGlobalTag('reco_config'))
    print(' express_config = %s' % t0.getGlobalTag('express_config'))
    print('\n')
0211 print('\n')
0212
0213
# When executed as a script, run the smoke test against the configured service URL.
if __name__ == '__main__':
    test(url=tier0Url)