File indexing completed on 2024-04-06 12:10:08
0001 from builtins import range
0002 from io import StringIO
0003 from io import BytesIO
0004 from pycurl import *
0005
class RequestManager:
    """Manager of multiple concurrent or overlapping HTTP requests.

    This is a utility class acting as a pump of several overlapping
    HTTP requests against any number of HTTP or HTTPS servers. It
    uses a configurable number of simultaneous connections, ten by
    default. The actual connection layer is handled using curl, and
    the client classes need to be aware of this to a limited degree.

    The client supplies optional callback methods for initialising,
    responding and handling errors on connections. At the very least
    the request response callback should be defined.

    This class is not designed for multi-threaded use. It employs
    overlapping requests, but in a single thread. Only one thread
    at a time should be calling `process()`; several threads may
    call `.put()` provided the caller uses a mutex so that only one
    thread calls into the method at a time."""

    def __init__(self, num_connections=10, ssl_opts=None,
                 user_agent=None, request_headers=None,
                 request_init=None, request_respond=None,
                 request_error=None, handle_init=None):
        """Initialise the request manager. The arguments are:

        :arg num_connections: maximum number of simultaneous connections;
          must be at least one.
        :arg ssl_opts: optional SSLOptions (Monitoring.Core.X509) for SSL
          X509 parameter values, e.g. for X509 client authentication. The
          attributes read are `ca_path`, `cert_file`, `key_file` and
          `key_pass`.
        :arg user_agent: sets user agent identification string if defined.
        :arg request_headers: if defined, specifies list of additional HTTP
          request headers to be added to each request.
        :arg request_init: optional callback to initialise requests, called
          as ``request_init(c, *task)``; the default assumes the task holds
          a single URL and sets the `URL` property on the curl object to it.
        :arg request_respond: callback for handling responses, called as
          ``request_respond(c)`` with the task in `c.task` and the response
          body in `c.buffer`; at the very minimum this should be defined as
          the default one does nothing.
        :arg request_error: callback for handling connection errors, called
          as ``request_error(c, task, errmsg, errno)``; the default one
          raises a RuntimeError.
        :arg handle_init: callback for customising connection handles at
          creation time; the callback will be invoked for each connection
          object as it's created and queued to the idle connection list.

        :raises ValueError: if `num_connections` is less than one."""
        # Without at least one handle `process()` could never start a
        # queued task and would loop forever, so refuse it up front.
        if num_connections < 1:
            raise ValueError("num_connections must be at least 1, got %r"
                             % (num_connections,))

        self.request_respond = request_respond or self._request_respond
        self.request_error = request_error or self._request_error
        self.request_init = request_init or self._request_init
        self.cm = CurlMulti()
        # All connection handles ever created by this manager.
        self.handles = [Curl() for _ in range(num_connections)]
        # Handles not currently attached to a transfer.
        self.free = list(self.handles)
        # Tasks waiting for a free handle, oldest first.
        self.queue = []

        for c in self.handles:
            c.buffer = None
            c.setopt(NOSIGNAL, 1)
            c.setopt(TIMEOUT, 300)
            c.setopt(CONNECTTIMEOUT, 30)
            c.setopt(FOLLOWLOCATION, 1)
            c.setopt(MAXREDIRS, 5)
            if user_agent:
                c.setopt(USERAGENT, user_agent)
            if ssl_opts:
                c.setopt(CAPATH, ssl_opts.ca_path)
                c.setopt(SSLCERT, ssl_opts.cert_file)
                c.setopt(SSLKEY, ssl_opts.key_file)
                if ssl_opts.key_pass:
                    c.setopt(SSLKEYPASSWD, ssl_opts.key_pass)
            if request_headers:
                c.setopt(HTTPHEADER, request_headers)
            if handle_init:
                handle_init(c)

    def _request_init(self, c, url):
        """Default request initialisation callback: point `c` at `url`."""
        c.setopt(URL, url)

    def _request_error(self, c, task, errmsg, errno):
        """Default request error callback: raise a RuntimeError carrying
        the ``(task, errmsg, errno)`` tuple."""
        raise RuntimeError((task, errmsg, errno))

    def _request_respond(self, *args):
        """Default request response callback: ignore the response."""
        pass

    def _release(self, c):
        """Drop the response buffer of `c` and return it to the idle list."""
        c.buffer = None
        self.free.append(c)

    def put(self, task):
        """Add a new task. The task object should be a tuple; its items
        are passed, after the curl handle, as positional arguments to the
        ``request_init`` callback passed to the constructor. With the
        default ``request_init`` that means a one-element tuple holding
        the URL."""
        self.queue.append(task)

    def process(self):
        """Process pending requests until none are left.

        This method processes all requests queued with `.put()` until they
        have been fully processed. It calls the ``request_respond`` callback
        for all successfully completed requests, and ``request_error`` for
        all failed ones.

        Any new requests added by callbacks by invoking ``put()`` are also
        processed before returning.

        An exception raised by a callback propagates out of this method.
        The connection handle involved is returned to the idle list first,
        but transfers still in flight at that point are not cleaned up."""
        npending = 0
        while self.queue or npending:
            # Hand queued tasks to idle handles until either runs out.
            while self.queue and self.free:
                c = self.free.pop()
                c.task = self.queue.pop(0)
                c.buffer = BytesIO()
                try:
                    c.setopt(WRITEFUNCTION, c.buffer.write)
                    self.request_init(c, *c.task)
                    self.cm.add_handle(c)
                except Exception:
                    # The transfer never started; don't lose the handle.
                    self._release(c)
                    raise
                npending += 1

            # Drive the transfers for as long as curl asks to be called
            # again immediately.
            while True:
                ret, nhandles = self.cm.perform()
                if ret != E_CALL_MULTI_PERFORM:
                    break

            # Collect completed transfers until curl reports no more
            # queued completion messages.
            while True:
                numq, ok, err = self.cm.info_read()

                for c in ok:
                    assert npending > 0
                    self.cm.remove_handle(c)
                    npending -= 1
                    try:
                        self.request_respond(c)
                    finally:
                        # Recycle the handle even if the callback raised.
                        self._release(c)

                for c, errno, errmsg in err:
                    assert npending > 0
                    self.cm.remove_handle(c)
                    npending -= 1
                    try:
                        self.request_error(c, c.task, errmsg, errno)
                    finally:
                        # The default error callback raises; recycle the
                        # handle and drop any partial response regardless.
                        self._release(c)

                if numq == 0:
                    break

            # Wait (at most the given timeout) for activity on the
            # transfers before going around again.
            self.cm.select(1.)
0140