Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-11-25 02:29:20

0001 
0002 #-toDo: move this to common?
0003 
0004 import logging
0005 import json
0006 import os
0007 import sys
0008 import time
0009 import subprocess
0010 
0011 import pycurl
0012 
# Base URL of the Tier0DataSvc REST API; can be overridden through the
# TIER0_API_URL environment variable.
tier0Url = os.environ.get('TIER0_API_URL', 'https://cmsweb.cern.ch/t0wmadatasvc/prod/')
0014 
class Tier0Error(Exception):
    """Base exception for failures while talking to Tier0DataSvc.

    The single constructor argument becomes the only element of ``args``,
    so ``str(err)`` yields the message.
    """

    def __init__(self, message):
        single = (message, )
        self.args = single
0022 
def unique(seq, keepstr=True):
    """Return the distinct elements of *seq*, keeping first-occurrence order.

    The result is built by calling ``type(seq)`` on an iterable, so a list
    gives a list and a tuple gives a tuple. For a ``str`` input the result
    is a ``str`` when *keepstr* is true, otherwise a ``list`` of characters.

    Three strategies are tried in turn:
    1. hashable elements: a set tracks elements not yet emitted (O(n));
    2. unhashable but orderable elements: sort, group equal values and keep
       the earliest index of each group (O(n log n));
    3. otherwise: brute-force linear scan with ``==`` (O(n**2)).
    """
    t = type(seq)
    if t is str:
        t = ''.join if keepstr else list
    try:
        remaining = set(seq)
        # remove() returns None, so "not remaining.remove(c)" is True and
        # consumes c: later duplicates are no longer in `remaining`.
        return t(c for c in seq if c in remaining and not remaining.remove(c))
    except TypeError:  # hashing didn't work, see if seq is sortable
        pass
    try:
        from itertools import groupby
        # A stable sort by value keeps equal values in index order, so the
        # first pair of every group carries the first-occurrence index.
        pairs = sorted(enumerate(seq), key=lambda i_v: i_v[1])
        firsts = [next(group) for _, group in groupby(pairs, key=lambda i_v: i_v[1])]
    except TypeError:  # not sortable, use brute force
        seen = []
        return t(c for c in seq if not (c in seen or seen.append(c)))
    # Restore the original order and drop the indices.
    return t(value for _, value in sorted(firsts, key=lambda i_v: i_v[0]))
0039 
0040 #note: this exception seems unused
class ResponseError( Tier0Error ):
    """Raised when a curl request to Tier0DataSvc gets an unexpected response.

    ``args`` is ``(response, curl, proxy)``:
    - the response body (searched for ``<p>`` markup);
    - the curl handle, queried with ``getinfo`` for the effective URL and
      response code (presumably a ``pycurl.Curl`` — confirm with callers);
    - the proxy, if any.
    ``timeout`` and ``maxTime`` are kept as attributes for the message.
    """

    def __init__( self, curl, response, proxy, timeout, maxTime ):
        super( ResponseError, self ).__init__( response )
        self.args += ( curl, proxy )
        self.timeout = timeout
        self.maxTime = maxTime

    def __str__(self):
        response, curl, proxy = self.args[0], self.args[1], self.args[-1]
        errStr = 'Wrong response for curl connection to Tier0DataSvc'\
                 f' from URL "{curl.getinfo(curl.EFFECTIVE_URL)}"'
        if proxy:
            errStr += f' using proxy "{str(proxy)}"'
        # The attribute is `maxTime`; reading `self.maxtime` raised AttributeError.
        errStr += f' with connection-timeout "{self.timeout}", max-time "{self.maxTime}"'\
                  f' with error code "{curl.getinfo(curl.RESPONSE_CODE)}".'
        if '<p>' in response:
            # Keep only the text between the first <p> and the last </p>.
            full_response = response.partition('<p>')[-1].rpartition('</p>')[0]
        else:
            full_response = response
        errStr += f'\nFull response: "{full_response}".'
        return errStr
0063 
0064 #TODO: Add exceptions for each category of HTTP error codes
0065 #TODO: check response code and raise corresponding exceptions
0066 #note: this function seems to be unused
def _raise_http_error( curl, response, proxy, timeout, maxTime ):
    """Raise a ResponseError describing a failed curl request; never returns."""
    error = ResponseError( curl=curl, response=response, proxy=proxy,
                           timeout=timeout, maxTime=maxTime )
    raise error
0069 
class Tier0Handler( object ):
    """Client for the Tier0DataSvc REST API.

    Requests are made by running ``/usr/bin/curl`` in a subprocess. The
    JSON-like answer is normalised and parsed with ``json.loads``.
    """

    def __init__( self, uri, timeOut, maxTime, retries, retryPeriod, proxy, debug ):
        """
        Parameters:
        uri: Tier0DataSvc URI;
        timeOut: time out for connection of Tier0DataSvc HTTPS calls [seconds];
        maxTime: maximum time for Tier0DataSvc HTTPS calls (including data transfer) [seconds];
        retries: maximum retries for Tier0DataSvc HTTPS calls;
        retryPeriod: sleep time between two Tier0DataSvc HTTPS calls [seconds];
        proxy: HTTP proxy for accessing Tier0DataSvc HTTPS calls;
        debug: if set to True, enables debug information.
        """
        self._uri = uri
        self._timeOut = timeOut
        self._maxTime = maxTime
        self._retries = retries
        self._retryPeriod = retryPeriod
        self._proxy = proxy
        self._debug = debug

    def setDebug( self ):
        """Enable verbose curl output (-v) for subsequent queries."""
        self._debug = True

    def unsetDebug( self ):
        """Disable verbose curl output for subsequent queries."""
        self._debug = False

    def setProxy( self, proxy ):
        """Set the HTTP proxy used for subsequent queries (a falsy value disables it)."""
        self._proxy = proxy

    def _getCertArgs( self ) -> list:
        """Return the curl arguments selecting the X.509 certificate and key.

        Paths are read from X509_USER_CERT and X509_USER_KEY. Each option is
        omitted when its variable is unset or empty. A warning is logged when
        no certificate is configured.
        """
        cert_path = os.getenv('X509_USER_CERT', '')
        key_path = os.getenv('X509_USER_KEY', '')

        args = []
        if cert_path:
            args += ['--cert', cert_path]
        else:
            logging.warning("No certificate provided for Tier0 access, use X509_USER_CERT and"
                            " optionally X509_USER_KEY env variables to specify the path to the cert"
                            " (and the key unless included in the cert file)")
        if key_path:
            args += ['--key', key_path]
        return args

    def _getCerts( self ) -> str:
        """Return the certificate options as one string, e.g. ' --cert <path> --key <path>'.

        Kept for backward compatibility; queries use _getCertArgs().
        """
        return ''.join(' ' + arg for arg in self._getCertArgs())

    def _withProxySuffix( self, errStr ):
        """Append ' using proxy "<proxy>".' to *errStr* when a proxy is configured."""
        if self._proxy:
            errStr += ' using proxy "%s".' % ( str( self._proxy ), )
        return errStr

    def _curlQueryTier0( self, url:str, force_debug:bool = False, force_cert:bool = False):
        """Run curl against *url*.

        force_debug: use verbose output (-v) even if debug is off;
        force_cert: send the certificate options even when a proxy is set.
        @returns: (returncode, stdout bytes, stderr bytes, elapsed seconds).
        """
        userAgent = "User-Agent: ConditionWebServices/1.0 python/%d.%d.%d PycURL/%s" \
            % ( sys.version_info[ :3 ] + ( pycurl.version_info()[ 1 ], ) )
        verbosity = ["-v"] if self._debug or force_debug else ["-s", "-S"]

        proxy = ["--proxy", self._proxy] if self._proxy else []
        # With a proxy, the proxy is assumed to provide authentication.
        certs = self._getCertArgs() if not self._proxy or force_cert else []

        # An argv list without a shell passes url/proxy/cert paths verbatim:
        # spaces or shell metacharacters (e.g. '&' in a URL) cannot break or
        # inject into the command.
        # NOTE(review): -k disables TLS certificate verification.
        cmd = ['/usr/bin/curl', '-k', '-L', '--user-agent', userAgent, *proxy,
               '--connect-timeout', str(self._timeOut),
               '--max-time', str(self._maxTime),
               '--retry', str(self._retries),
               *verbosity, url, *certs]

        # Time the curl to understand if re-tries have been carried out.
        start = time.time()
        try:
            process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except OSError as err:
            # Without a shell, a missing curl binary raises here. Report it as a
            # failed query (127, like a shell's "command not found") so the
            # caller's retry/Tier0Error path still applies.
            return 127, b'', str(err).encode(), time.time() - start
        stdoutdata, stderrdata = process.communicate()
        end = time.time()
        return process.returncode, stdoutdata, stderrdata, end - start

    def _queryTier0DataSvc( self, url ):
        """
        Queries Tier0DataSvc.
        url: Tier0DataSvc URL.
        @returns: dictionary, from whence the required information must be retrieved according to the API call.
        Raises Tier0Error if curl fails twice; json.loads errors propagate on a malformed response.
        """
        retcode, stdoutdata, stderrdata, query_time = self._curlQueryTier0(url)

        # In silent mode ("-s -S") any stderr output signals an error. In debug
        # mode (-v) stderr always holds verbose output, so only retcode counts.
        if retcode != 0 or (stderrdata and not self._debug):

            # If the first curl has failed, log its stderr and prepare an independent retry.
            msg = "looks like curl returned an error: retcode=%s and took %s seconds" % (retcode, query_time,)
            msg += ' msg = "'+str(stderrdata)+'"'
            logging.error(msg)
            if self._proxy:
                logging.info("before assumed proxy provides authentication, now trying with both proxy and certificate")

            time.sleep(self._retryPeriod)
            # The retry is verbose, so stderr is never empty: only retcode is checked.
            retcode, stdoutdata, stderrdata, query_time = self._curlQueryTier0(url, force_debug=True, force_cert=True)
            if retcode != 0:
                msg = "looks like curl returned an error for the second time: retcode=%s" % (retcode,)
                msg += ' msg = "'+str(stderrdata)+'"'
                logging.error(msg)
                raise Tier0Error(msg)
            logging.info("curl returned ok upon the second try")

        # Normalise the service output into JSON: single quotes become double
        # quotes and bare None values become the string "None".
        text = stdoutdata.decode().replace( "'", '"').replace(' None', ' "None"')
        return json.loads( text )


    def getFirstSafeRun( self ):
        """
        Queries Tier0DataSvc to get the first condition safe run.
        Parameters:
        @returns: integer, the run number.
        Raises if connection error, bad response, timeout after retries occur, or if the run number is not available.
        """
        firstConditionSafeRunAPI = "firstconditionsaferun"
        url = os.path.join( self._uri, firstConditionSafeRunAPI )
        safeRunDict = self._queryTier0DataSvc( url )
        # A null answer, a missing 'result' key or an empty result all mean the
        # run is not available; report them uniformly as Tier0Error.
        result = safeRunDict.get('result') if isinstance(safeRunDict, dict) else None
        if not result:
            errStr = 'First condition safe run is not available in Tier0DataSvc from URL "%s"' % ( url, )
            raise Tier0Error( self._withProxySuffix( errStr ) )
        return int(result[0])

    def getGlobalTag( self, config ):
        """
        Queries Tier0DataSvc to get the most recent Global Tag for a given workflow.
        Parameters:
        config: Tier0DataSvc API call for the workflow to be looked for;
        @returns: a string with the Global Tag name.
        Raises if connection error, bad response, timeout after retries occur, or if no Global Tags are available.
        """
        url = os.path.join( self._uri, config )
        data = self._queryTier0DataSvc( url )
        gtnames = sorted({ str( di['global_tag'] ) for di in data['result'] if di['global_tag'] is not None })
        if not gtnames:
            errStr = 'No Global Tags for "%s" are available in Tier0DataSvc from URL "%s"' % ( config, url )
            raise Tier0Error( self._withProxySuffix( errStr ) )
        # The lexicographically greatest name is taken as the most recent one
        # (assumes GT names sort chronologically — TODO confirm).
        return gtnames[-1]
0203 
0204 
def test( url ):
    """Smoke-test Tier0Handler against *url*, printing the values retrieved."""
    t0 = Tier0Handler( url, 1, 5, 1, 10, None, debug=False)

    # Query once: every call is a network round trip, and two separate calls
    # could print inconsistent values.
    fcsr = t0.getFirstSafeRun()
    print('   fcsr = %s (%s)' % (fcsr, type(fcsr)))
    print('   reco_config = %s' % t0.getGlobalTag('reco_config'))
    print('   express_config = %s' % t0.getGlobalTag('express_config'))
    print('\n')
0211     print('\n')
0212 
0213 
if __name__ == '__main__':
    # Run the smoke test against the configured Tier0DataSvc endpoint.
    test( url=tier0Url )