Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-07-07 22:33:07

0001 from builtins import range
0002 from io import StringIO
0003 from pycurl import *
0004 
class RequestManager:
  """Manager of multiple concurrent or overlapping HTTP requests.

This is a utility class acting as a pump of several overlapping
HTTP requests against any number of HTTP or HTTPS servers. It
uses a configurable number of simultaneous connections, ten by
default. The actual connection layer is handled using curl, and
the client classes need to be aware of this to a limited degree.

The client supplies optional callback methods for initialising,
responding and handling errors on connections. At the very least
the request response callback should be defined.

While a request is in flight, and during the response and error
callbacks, the curl handle carries two extra attributes: ``task``,
the task tuple given to `.put()`, and ``buffer``, an ``io.BytesIO``
holding the response body received so far as raw bytes. The buffer
is dropped as soon as the callback returns or raises.

This class is not designed for multi-threaded use. It employs
overlapping requests, but in a single thread. Only one thread
at a time should be calling `process()`; several threads may
call `.put()` provided the caller uses a mutex so that only one
thread calls into the method at a time."""

  def __init__(self, num_connections = 10, ssl_opts = None,
               user_agent = None, request_headers = None,
               request_init = None, request_respond = None,
               request_error = None, handle_init = None):
    """Initialise the request manager. The arguments are:

:arg num_connections: maximum number of simultaneous connections.
:arg ssl_opts: optional SSLOptions (Monitoring.Core.X509) for SSL
X509 parameter values, e.g. for X509 client authentication. The
attributes read are ``ca_path``, ``cert_file``, ``key_file`` and
``key_pass``.
:arg user_agent: sets user agent identification string if defined.
:arg request_headers: if defined, specifies list of additional HTTP
request headers to be added to each request.
:arg request_init: optional callback to initialise requests; the
default assumes each task is a URL to access and sets the `URL`
property on the curl object to the task value.
:arg request_respond: callback for handling responses; at the very
minimum this should be defined as the default one does nothing.
:arg request_error: callback for handling connection errors; the
default one raises a RuntimeError.
:arg handle_init: callback for customising connection handles at
creation time; the callback will be invoked for each connection
object as it's created and queued to the idle connection list."""
    self.request_respond = request_respond or self._request_respond
    self.request_error = request_error or self._request_error
    self.request_init = request_init or self._request_init
    self.cm = CurlMulti()
    self.handles = [Curl() for _ in range(num_connections)]
    # Every handle starts out idle; `free` is the pool `process()` draws from.
    self.free = list(self.handles)
    self.queue = []

    for c in self.handles:
      c.buffer = None
      c.setopt(NOSIGNAL, 1)
      c.setopt(TIMEOUT, 300)
      c.setopt(CONNECTTIMEOUT, 30)
      c.setopt(FOLLOWLOCATION, 1)
      c.setopt(MAXREDIRS, 5)
      if user_agent:
        c.setopt(USERAGENT, user_agent)
      if ssl_opts:
        c.setopt(CAPATH, ssl_opts.ca_path)
        c.setopt(SSLCERT, ssl_opts.cert_file)
        c.setopt(SSLKEY, ssl_opts.key_file)
        if ssl_opts.key_pass:
          c.setopt(SSLKEYPASSWD, ssl_opts.key_pass)
      if request_headers:
        c.setopt(HTTPHEADER, request_headers)
      # Client customisation runs last so it can override the defaults above.
      if handle_init:
        handle_init(c)

  def _request_init(self, c, url):
    """Default request initialisation callback: treat the task as a
one-element tuple holding the URL and set it on the curl handle."""
    c.setopt(URL, url)

  def _request_error(self, c, task, errmsg, errno):
    """Default request error callback: raise a RuntimeError whose single
argument is the tuple ``(task, errmsg, errno)``."""
    raise RuntimeError((task, errmsg, errno))

  def _request_respond(self, *args):
    """Default request response callback: ignore the response."""
    pass

  def _finish_request(self, c, callback, *args):
    """Invoke a completion callback for handle `c` as ``callback(c, *args)``,
then return `c` to the idle pool.

The handle is released and its response buffer dropped even when the
callback raises, so a failing callback cannot shrink the connection
pool or keep a response body alive. Any exception from the callback
propagates to the caller."""
    try:
      callback(c, *args)
    finally:
      c.buffer = None
      self.free.append(c)

  def put(self, task):
    """Add a new task. The task object should be a tuple; its elements
are passed as positional arguments, after the curl handle, to the
``request_init`` callback given to the constructor."""
    self.queue.append(task)

  def process(self):
    """Process pending requests until none are left.

This method processes all requests queued with `.put()` until they
have been fully processed. It calls the ``request_respond`` callback
for all successfully completed requests, and ``request_error`` for
all failed ones.

Any new requests added by callbacks by invoking ``put()`` are also
processed before returning.

Exceptions raised by the callbacks propagate out of this method; the
default ``request_error`` callback raises a RuntimeError."""
    # Local import keeps this method independent of the module import list.
    from io import BytesIO

    npending = 0
    while self.queue or npending:
      # Start as many queued tasks as there are idle connections.
      while self.queue and self.free:
        c = self.free.pop()
        c.task = self.queue.pop(0)
        # curl delivers the response body to the write callback as bytes,
        # so it has to be collected in a bytes buffer; a text StringIO
        # rejects bytes and makes the transfer fail.
        c.buffer = b = BytesIO()
        c.setopt(WRITEFUNCTION, b.write)
        self.request_init(c, *c.task)
        self.cm.add_handle(c)
        npending += 1

      # Drive the transfers, repeating for as long as curl asks to be
      # called again straight away.
      while True:
        ret, nhandles = self.cm.perform()
        if ret != E_CALL_MULTI_PERFORM:
          break

      # Drain the completion messages; `numq` is the number still queued.
      while True:
        numq, ok, err = self.cm.info_read()

        for c in ok:
          assert npending > 0
          self.cm.remove_handle(c)
          npending -= 1
          self._finish_request(c, self.request_respond)

        for c, errno, errmsg in err:
          assert npending > 0
          self.cm.remove_handle(c)
          npending -= 1
          self._finish_request(c, self.request_error, c.task, errmsg, errno)

        if numq == 0:
          break

      # Wait (for at most one second) for activity before the next round.
      self.cm.select(1.)
0139