File indexing completed on 2024-12-01 23:40:44
0001
0002
0003 import os, urllib, urllib2, httplib, cookielib, sys, HTMLParser, re
0004 from optparse import OptionParser
0005
0006 def getFile(path):
0007 npath = os.path.expanduser(path)
0008 while os.path.islink(npath):
0009 path = os.readlink(npath)
0010 if path[0] != "/": path = os.path.join(os.path.dirname(npath),path)
0011 npath = path
0012 return npath
0013
0014 class HTTPSClientAuthHandler(urllib2.HTTPSHandler):
0015 def __init__(self, key, cert):
0016 urllib2.HTTPSHandler.__init__(self)
0017 self.key = getFile(key)
0018 self.cert = getFile(cert)
0019
0020 def https_open(self, req):
0021 return self.do_open(self.getConnection, req)
0022
0023 def getConnection(self, host, timeout=300):
0024 return httplib.HTTPSConnection(host, key_file=self.key, cert_file=self.cert)
0025
0026 def _getResponse(opener, url, post_data=None, debug=False):
0027 response = opener.open(url, post_data)
0028 if debug:
0029 sys.stderr.write("Code: %s\n" % response.code)
0030 sys.stderr.write("Headers: %s\n" % response.headers)
0031 sys.stderr.write("Msg: %s\n" % response.msg)
0032 sys.stderr.write("Url: %s\n" % response.url)
0033 return response
0034
0035 def getResponseContent(opener, url, post_data=None, debug=False):
0036 return _getResponse(opener, url, post_data, debug).read()
0037
0038 def getResponseURL(opener, url, post_data=None, debug=False):
0039 return urllib2.unquote(_getResponse(opener, url, post_data, debug).url)
0040
0041 def getParentURL(url):
0042 items = url.split("/")
0043 return '%s//%s/%s/' % (items[0],items[2],items[3])
0044
0045 def getSSOCookie(opener, target_url, cookie, debug=False):
0046 opener.addheaders = [('User-agent', 'curl-sso-certificate/0.0.2')]
0047 url = getResponseURL(opener, getParentURL(target_url), debug=debug)
0048 content = getResponseContent(opener, url, debug=debug)
0049 ret = re.search('<form .+? action="(.+?)">', content)
0050 if ret == None:
0051 raise Exception("error: The page doesn't have the form with adfs url, check 'User-agent' header")
0052 url = urllib2.unquote(ret.group(1))
0053 h = HTMLParser.HTMLParser()
0054 post_data_local = ''
0055 for match in re.finditer('input type="hidden" name="([^"]*)" value="([^"]*)"', content):
0056 post_data_local += "&%s=%s" % (match.group(1), urllib.quote(h.unescape(match.group(2))))
0057 is_link_found = True
0058
0059 if not is_link_found:
0060 raise Exception("error: The page doesn't have the form with security attributes, check 'User-agent' header")
0061 post_data_local = post_data_local[1:]
0062 getResponseContent(opener, url, post_data_local, debug)
0063
0064 def getContent(target_url, cert_path, key_path, post_data=None, debug=False, adfslogin=None):
0065 opener = urllib2.build_opener(urllib2.HTTPSHandler())
0066 if adfslogin:
0067 opener.addheaders = [('Adfs-Login', adfslogin)]
0068
0069
0070 try:
0071 content = getResponseContent(opener, target_url, post_data, debug)
0072 if not 'Sign in with your CERN account' in content:
0073 return content
0074 except Exception:
0075 if debug:
0076 sys.stderr.write("The request has an error, will try to create a new cookie\n")
0077
0078 cookie = cookielib.CookieJar()
0079 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookie), HTTPSClientAuthHandler(key_path, cert_path))
0080 if debug:
0081 sys.stderr.write("The return page is sso login page, will request cookie.")
0082 hasCookie = False
0083
0084 try:
0085 getSSOCookie(opener, target_url, cookie, debug)
0086 hasCookie = True
0087 result = getResponseContent(opener, target_url, post_data, debug)
0088 except Exception as e:
0089 result = ""
0090 print(sys.stderr.write("ERROR:"+str(e)))
0091 if hasCookie:
0092 burl = getParentURL(target_url)
0093 try:
0094 _getResponse(opener, burl+"signOut").read()
0095 _getResponse(opener, "https://login.cern.ch/adfs/ls/?wa=wsignout1.0").read()
0096 except:
0097 sys.stderr.write("Error, could not logout correctly from server")
0098 return result
0099
0100 def checkRequiredArguments(opts, parser):
0101 missing_options = []
0102 for option in parser.option_list:
0103 if re.match(r'^\[REQUIRED\]', option.help) and eval('opts. %s' % option.dest) == None:
0104 missing_options.extend(option._long_opts)
0105 if len(missing_options) > 0:
0106 parser.error('Missing REQUIRED parameters: %s' % str(missing_options))
0107
0108 if __name__ == "__main__":
0109 parser = OptionParser(usage="%prog [-d(ebug)] -o(ut) COOKIE_FILENAME -c(cert) CERN-PEM -k(ey) CERT-KEY -u(rl) URL")
0110 parser.add_option("-d", "--debug", dest="debug", help="Enable pycurl debugging. Prints to data and headers to stderr.", action="store_true", default=False)
0111 parser.add_option("-p", "--postdata", dest="postdata", help="Data to be sent as post request", action="store", default=None)
0112 parser.add_option("-c", "--cert", dest="cert_path", help="[REQUIRED] Absolute path to cert file.", action="store")
0113 parser.add_option("-k", "--key", dest="key_path", help="[REQUIRED] Absolute path to key file.", action="store")
0114 parser.add_option("-u", "--url", dest="url", help="[REQUIRED] Url to a service behind the SSO", action="store")
0115 (opts, args) = parser.parse_args()
0116 checkRequiredArguments(opts, parser)
0117 content = getContent(opts.url, opts.cert_path, opts.key_path, opts.postdata, opts.debug)
0118 print(content)