Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-03-17 11:26:49

0001 #!/usr/bin/env python3
0002 ###Description: The tool reads cern web services behind SSO using user certificates
0003 from __future__ import print_function
0004 import os, urllib, urllib2, httplib, cookielib, sys, HTMLParser, re
0005 from optparse import OptionParser
0006 
0007 def getFile(path):
0008   npath = os.path.expanduser(path)
0009   while os.path.islink(npath):
0010     path = os.readlink(npath)
0011     if path[0] != "/": path = os.path.join(os.path.dirname(npath),path)
0012     npath = path
0013   return npath
0014 
0015 class HTTPSClientAuthHandler(urllib2.HTTPSHandler):  
0016   def __init__(self, key, cert):  
0017     urllib2.HTTPSHandler.__init__(self)  
0018     self.key = getFile(key)  
0019     self.cert = getFile(cert) 
0020 
0021   def https_open(self, req):  
0022     return self.do_open(self.getConnection, req)  
0023 
0024   def getConnection(self, host, timeout=300):  
0025     return httplib.HTTPSConnection(host, key_file=self.key, cert_file=self.cert)
0026 
0027 def _getResponse(opener, url, post_data=None, debug=False):
0028   response = opener.open(url, post_data)
0029   if debug:
0030     sys.stderr.write("Code: %s\n" % response.code)
0031     sys.stderr.write("Headers: %s\n" % response.headers)
0032     sys.stderr.write("Msg: %s\n" % response.msg)
0033     sys.stderr.write("Url: %s\n" % response.url)
0034   return response
0035 
0036 def getResponseContent(opener, url, post_data=None, debug=False):
0037   return _getResponse(opener, url, post_data, debug).read()
0038 
0039 def getResponseURL(opener, url, post_data=None, debug=False):
0040   return urllib2.unquote(_getResponse(opener, url, post_data, debug).url)
0041 
0042 def getParentURL(url):
0043   items = url.split("/")
0044   return '%s//%s/%s/' % (items[0],items[2],items[3])
0045 
0046 def getSSOCookie(opener, target_url, cookie, debug=False):
0047   opener.addheaders = [('User-agent', 'curl-sso-certificate/0.0.2')] #in sync with cern-get-sso-cookie tool
0048   url = getResponseURL(opener, getParentURL(target_url), debug=debug)
0049   content = getResponseContent(opener, url, debug=debug)
0050   ret = re.search('<form .+? action="(.+?)">', content)
0051   if ret == None:
0052     raise Exception("error: The page doesn't have the form with adfs url, check 'User-agent' header")
0053   url = urllib2.unquote(ret.group(1))
0054   h = HTMLParser.HTMLParser()
0055   post_data_local = ''
0056   for match in re.finditer('input type="hidden" name="([^"]*)" value="([^"]*)"', content):
0057     post_data_local += "&%s=%s" % (match.group(1), urllib.quote(h.unescape(match.group(2))))
0058     is_link_found = True
0059   
0060   if not is_link_found:
0061     raise Exception("error: The page doesn't have the form with security attributes, check 'User-agent' header")
0062   post_data_local = post_data_local[1:] #remove first &
0063   getResponseContent(opener, url, post_data_local, debug)
0064 
0065 def getContent(target_url, cert_path, key_path, post_data=None, debug=False, adfslogin=None):
0066   opener = urllib2.build_opener(urllib2.HTTPSHandler())
0067   if adfslogin:
0068     opener.addheaders = [('Adfs-Login', adfslogin)] #local version of tc test
0069   
0070   #try to access the url first
0071   try:
0072     content = getResponseContent(opener, target_url, post_data, debug)
0073     if not 'Sign in with your CERN account' in content:
0074       return content
0075   except Exception:
0076     if debug:
0077       sys.stderr.write("The request has an error, will try to create a new cookie\n")
0078 
0079   cookie = cookielib.CookieJar()
0080   opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookie), HTTPSClientAuthHandler(key_path, cert_path))  #will use private key and ceritifcate
0081   if debug:
0082     sys.stderr.write("The return page is sso login page, will request cookie.")
0083   hasCookie = False
0084   # if the access gave an exception, try to get a cookie
0085   try:
0086     getSSOCookie(opener, target_url, cookie, debug)
0087     hasCookie = True 
0088     result = getResponseContent(opener, target_url, post_data, debug)
0089   except Exception as e:
0090     result = ""
0091     print(sys.stderr.write("ERROR:"+str(e)))
0092   if hasCookie:
0093     burl = getParentURL(target_url)
0094     try:
0095       _getResponse(opener, burl+"signOut").read()
0096       _getResponse(opener, "https://login.cern.ch/adfs/ls/?wa=wsignout1.0").read()
0097     except:
0098       sys.stderr.write("Error, could not logout correctly from server") 
0099   return result
0100 
0101 def checkRequiredArguments(opts, parser):
0102   missing_options = []
0103   for option in parser.option_list:
0104     if re.match(r'^\[REQUIRED\]', option.help) and eval('opts. %s' % option.dest) == None:
0105       missing_options.extend(option._long_opts)
0106     if len(missing_options) > 0:
0107       parser.error('Missing REQUIRED parameters: %s' % str(missing_options))    
0108 
0109 if __name__ == "__main__":
0110   parser = OptionParser(usage="%prog [-d(ebug)] -o(ut) COOKIE_FILENAME -c(cert) CERN-PEM -k(ey) CERT-KEY -u(rl) URL") 
0111   parser.add_option("-d", "--debug", dest="debug", help="Enable pycurl debugging. Prints to data and headers to stderr.", action="store_true", default=False)
0112   parser.add_option("-p", "--postdata", dest="postdata", help="Data to be sent as post request", action="store", default=None)
0113   parser.add_option("-c", "--cert", dest="cert_path", help="[REQUIRED] Absolute path to cert file.", action="store")
0114   parser.add_option("-k", "--key", dest="key_path", help="[REQUIRED] Absolute path to key file.", action="store")
0115   parser.add_option("-u", "--url", dest="url", help="[REQUIRED] Url to a service behind the SSO", action="store")
0116   (opts, args) = parser.parse_args()
0117   checkRequiredArguments(opts, parser)
0118   content = getContent(opts.url, opts.cert_path, opts.key_path, opts.postdata, opts.debug)
0119   print(content)