Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:10:08

0001 from builtins import range
0002 from io import StringIO
0003 from io import BytesIO
0004 from pycurl import *
0005 
class RequestManager:
  """Manager of multiple concurrent or overlapping HTTP requests.

  This is a utility class acting as a pump of several overlapping
  HTTP requests against any number of HTTP or HTTPS servers. It
  uses a configurable number of simultaneous connections, ten by
  default. The actual connection layer is handled using curl, and
  the client classes need to be aware of this to a limited degree:
  callbacks receive the curl handle, which carries two extra
  attributes set by this class: ``task`` (the tuple given to
  `put()`) and ``buffer`` (a ``BytesIO`` collecting the response body).

  The client supplies optional callback methods for initialising,
  responding and handling errors on connections. At the very least
  the request response callback should be defined.

  This class is not designed for multi-threaded use. It employs
  overlapping requests, but in a single thread. Only one thread
  at a time should be calling `process()`; several threads may
  call `.put()` provided the caller uses a mutex so that only one
  thread calls into the method at a time."""

  def __init__(self, num_connections=10, ssl_opts=None,
               user_agent=None, request_headers=None,
               request_init=None, request_respond=None,
               request_error=None, handle_init=None):
    """Initialise the request manager. The arguments are:

    :arg num_connections: maximum number of simultaneous connections.
    :arg ssl_opts: optional SSLOptions (Monitoring.Core.X509) for SSL
      X509 parameter values, e.g. for X509 client authentication. The
      attributes read are ``ca_path``, ``cert_file``, ``key_file`` and
      ``key_pass``.
    :arg user_agent: sets user agent identification string if defined.
    :arg request_headers: if defined, specifies list of additional HTTP
      request headers to be added to each request.
    :arg request_init: optional callback to initialise requests, invoked
      as ``request_init(c, *task)``; the default assumes each task is a
      one-element tuple holding the URL to access and sets the `URL`
      property on the curl object to that value.
    :arg request_respond: callback for handling responses, invoked as
      ``request_respond(c)``; at the very minimum this should be
      defined as the default one does nothing.
    :arg request_error: callback for handling connection errors, invoked
      as ``request_error(c, task, errmsg, errno)``; the default one
      raises a RuntimeError.
    :arg handle_init: callback for customising connection handles at
      creation time; the callback will be invoked for each connection
      object as it's created and queued to the idle connection list."""
    self.request_respond = request_respond or self._request_respond
    self.request_error = request_error or self._request_error
    self.request_init = request_init or self._request_init
    self.cm = CurlMulti()
    self.handles = [Curl() for _ in range(num_connections)]
    # Every handle starts out idle; `process()` moves handles between
    # this pool and the multi object.
    self.free = list(self.handles)
    self.queue = []

    for c in self.handles:
      c.buffer = None
      c.setopt(NOSIGNAL, 1)
      c.setopt(TIMEOUT, 300)
      c.setopt(CONNECTTIMEOUT, 30)
      c.setopt(FOLLOWLOCATION, 1)
      c.setopt(MAXREDIRS, 5)
      if user_agent:
        c.setopt(USERAGENT, user_agent)
      if ssl_opts:
        c.setopt(CAPATH, ssl_opts.ca_path)
        c.setopt(SSLCERT, ssl_opts.cert_file)
        c.setopt(SSLKEY, ssl_opts.key_file)
        if ssl_opts.key_pass:
          c.setopt(SSLKEYPASSWD, ssl_opts.key_pass)
      if request_headers:
        c.setopt(HTTPHEADER, request_headers)
      # Run the caller's customisation last so it can override any of
      # the defaults set above.
      if handle_init:
        handle_init(c)

  def _request_init(self, c, url):
    """Default request initialisation callback: set the handle's URL."""
    c.setopt(URL, url)

  def _request_error(self, c, task, errmsg, errno):
    """Default request error callback: raise RuntimeError.

    The exception carries the single argument ``(task, errmsg, errno)``."""
    raise RuntimeError((task, errmsg, errno))

  def _request_respond(self, *args):
    """Default request response callback: ignore the response."""
    pass

  def put(self, task):
    """Add a new task. The task object should be a tuple; its elements
    are passed as positional arguments, after the curl handle, to the
    ``request_init`` callback passed to the constructor. For the
    default ``request_init`` the task is therefore ``(url,)``."""
    self.queue.append(task)

  def process(self):
    """Process pending requests until none are left.

    This method processes all requests queued with `.put()` until they
    have been fully processed. It calls the ``request_respond`` callback
    for all successfully completed requests, and ``request_error`` for
    all failed ones.

    Any new requests added by callbacks by invoking ``put()`` are also
    processed before returning.

    An exception raised by a callback propagates to the caller. The
    handle the callback was working on is returned to the idle pool
    first, so the connection is not lost for later calls; requests
    still in flight at that point are not driven any further by this
    call."""
    npending = 0
    while self.queue or npending:
      # Hand queued tasks to idle handles until one of the two runs out.
      while self.queue and self.free:
        c = self.free.pop()
        c.task = self.queue.pop(0)
        c.buffer = BytesIO()
        c.setopt(WRITEFUNCTION, c.buffer.write)
        try:
          self.request_init(c, *c.task)
          self.cm.add_handle(c)
        except BaseException:
          # The handle never became active; put it back in the pool
          # rather than leaking it, then let the error propagate.
          c.buffer = None
          self.free.append(c)
          raise
        npending += 1

      # Drive transfers until curl no longer asks to be called again.
      while True:
        ret, _ = self.cm.perform()
        if ret != E_CALL_MULTI_PERFORM:
          break

      # Drain completion messages; `numq` is the number still queued
      # inside curl after this batch.
      while True:
        numq, ok, err = self.cm.info_read()

        for c in ok:
          assert npending > 0
          self.cm.remove_handle(c)
          try:
            self.request_respond(c)
          finally:
            # Recycle the handle even if the callback raised.
            c.buffer = None
            self.free.append(c)
            npending -= 1

        for c, errno, errmsg in err:
          assert npending > 0
          self.cm.remove_handle(c)
          # Book-keeping happens before the callback because the
          # default callback raises.
          self.free.append(c)
          npending -= 1
          try:
            self.request_error(c, c.task, errmsg, errno)
          finally:
            # Release any partial response body, as on the success path.
            c.buffer = None

        if numq == 0:
          break

      # Wait for socket activity, unless there is work that can be
      # started right away on an idle handle.
      if not (self.queue and self.free):
        self.cm.select(1.)
0140