Quick search

Table Of Contents

Source code for


.. versionadded:: 1.0.8

You can use the :class:`UrlRequest` to make asynchronous requests on the
web and get the result when the request is completed. The spirit is the
same as the XHR object in Javascript.

The content is also decoded if the Content-Type is
application/json and the result automatically passed through json.loads.

The syntax to create a request::

    from import UrlRequest
    req = UrlRequest(url, on_success, on_redirect, on_failure, on_error,
                     on_progress, req_body, req_headers, chunk_size,
                     timeout, method, decode, debug, file_path, ca_file,

Only the first argument is mandatory: the rest are optional.
By default, a "GET" request will be sent. If the :attr:`UrlRequest.req_body` is
not None, a "POST" request will be sent. It's up to you to adjust
:attr:`UrlRequest.req_headers` to suit your requirements and the response
to the request will be accessible as the parameter called "result" on
the callback function of the on_success event.

Example of fetching JSON::

    def got_json(req, result):
        for key, value in result['headers'].items():
            print('{}: {}'.format(key, value))

    req = UrlRequest('', got_json)

Example of Posting data (adapted from httplib example)::

    import urllib

    def bug_posted(req, result):
        print('Our bug is posted!')

    params = urllib.urlencode({'@number': 12524, '@type': 'issue',
        '@action': 'show'})
    headers = {'Content-type': 'application/x-www-form-urlencoded',
              'Accept': 'text/plain'}
    req = UrlRequest('', on_success=bug_posted, req_body=params,

If you want a synchronous request, you can call the wait() method.


from base64 import b64encode
from collections import deque
from threading import Thread
from json import loads
from time import sleep
from kivy.compat import PY2

if PY2:
    from httplib import HTTPConnection
    from urlparse import urlparse, urlunparse
    from http.client import HTTPConnection
    from urllib.parse import urlparse, urlunparse

    import ssl

    HTTPSConnection = None
    if PY2:
        from httplib import HTTPSConnection
        from http.client import HTTPSConnection
except ImportError:
    # depending the platform, if openssl support wasn't compiled before python,
    # this class is not available.

from kivy.clock import Clock
from kivy.weakmethod import WeakMethod
from kivy.logger import Logger

# list to save UrlRequest and prevent GC on un-referenced objects
g_requests = []

[docs]class UrlRequest(Thread): '''A UrlRequest. See module documentation for usage. .. versionchanged:: 1.5.1 Add `debug` parameter .. versionchanged:: 1.0.10 Add `method` parameter .. versionchanged:: 1.8.0 Parameter `decode` added. Parameter `file_path` added. Parameter `on_redirect` added. Parameter `on_failure` added. .. versionchanged:: 1.9.1 Parameter `ca_file` added. Parameter `verify` added. .. versionchanged:: 1.10.0 Parameters `proxy_host`, `proxy_port` and `proxy_headers` added. :Parameters: `url`: str Complete url string to call. `on_success`: callback(request, result) Callback function to call when the result has been fetched. `on_redirect`: callback(request, result) Callback function to call if the server returns a Redirect. `on_failure`: callback(request, result) Callback function to call if the server returns a Client or Server Error. `on_error`: callback(request, error) Callback function to call if an error occurs. `on_progress`: callback(request, current_size, total_size) Callback function that will be called to report progression of the download. `total_size` might be -1 if no Content-Length has been reported in the http response. This callback will be called after each `chunk_size` is read. `req_body`: str, defaults to None Data to sent in the request. If it's not None, a POST will be done instead of a GET. `req_headers`: dict, defaults to None Custom headers to add to the request. `chunk_size`: int, defaults to 8192 Size of each chunk to read, used only when `on_progress` callback has been set. If you decrease it too much, a lot of on_progress callbacks will be fired and will slow down your download. If you want to have the maximum download speed, increase the chunk_size or don't use ``on_progress``. `timeout`: int, defaults to None If set, blocking operations will timeout after this many seconds. `method`: str, defaults to 'GET' (or 'POST' if ``body`` is specified) The HTTP method to use. `decode`: bool, defaults to True If False, skip decoding of the response. `debug`: bool, defaults to False If True, it will use the Logger.debug to print information about url access/progression/errors. `file_path`: str, defaults to None If set, the result of the UrlRequest will be written to this path instead of in memory. `ca_file`: str, defaults to None Indicates a SSL CA certificate file path to validate HTTPS certificates against `verify`: bool, defaults to True If False, disables SSL CA certificate verification `proxy_host`: str, defaults to None If set, the proxy host to use for this connection. `proxy_port`: int, defaults to None If set, and `proxy_host` is also set, the port to use for connecting to the proxy server. `proxy_headers`: dict, defaults to None If set, and `proxy_host` is also set, the headers to send to the proxy server in the ``CONNECT`` request. ''' def __init__(self, url, on_success=None, on_redirect=None, on_failure=None, on_error=None, on_progress=None, req_body=None, req_headers=None, chunk_size=8192, timeout=None, method=None, decode=True, debug=False, file_path=None, ca_file=None, verify=True, proxy_host=None, proxy_port=None, proxy_headers=None): super(UrlRequest, self).__init__() self._queue = deque() self._trigger_result = Clock.create_trigger(self._dispatch_result, 0) self.daemon = True self.on_success = WeakMethod(on_success) if on_success else None self.on_redirect = WeakMethod(on_redirect) if on_redirect else None self.on_failure = WeakMethod(on_failure) if on_failure else None self.on_error = WeakMethod(on_error) if on_error else None self.on_progress = WeakMethod(on_progress) if on_progress else None self.decode = decode self.file_path = file_path self._debug = debug self._result = None self._error = None self._is_finished = False self._resp_status = None self._resp_headers = None self._resp_length = -1 self._chunk_size = chunk_size self._timeout = timeout self._method = method self.ca_file = ca_file self.verify = verify self._proxy_host = proxy_host self._proxy_port = proxy_port self._proxy_headers = proxy_headers #: Url of the request self.url = url #: Request body passed in __init__ self.req_body = req_body #: Request headers passed in __init__ self.req_headers = req_headers # save our request to prevent GC g_requests.append(self) self.start()
[docs] def run(self): q = self._queue.appendleft url = self.url req_body = self.req_body req_headers = self.req_headers try: result, resp = self._fetch_url(url, req_body, req_headers, q) if self.decode: result = self.decode_result(result, resp) except Exception as e: q(('error', None, e)) else: q(('success', resp, result)) # using trigger can result in a missed on_success event self._trigger_result() # clean ourself when the queue is empty while len(self._queue): sleep(.1) self._trigger_result() # ok, authorize the GC to clean us. if self in g_requests: g_requests.remove(self)
def _parse_url(self, url): parse = urlparse(url) host = parse.hostname port = parse.port userpass = None # append user + pass to hostname if specified if parse.username and parse.password: userpass = { "Authorization": "Basic {}".format(b64encode( "{}:{}".format( parse.username, parse.password ).encode('utf-8') ).decode('utf-8')) } return host, port, userpass, parse def _fetch_url(self, url, body, headers, q): # Parse and fetch the current url trigger = self._trigger_result chunk_size = self._chunk_size report_progress = self.on_progress is not None timeout = self._timeout file_path = self.file_path ca_file = self.ca_file verify = self.verify if self._debug: Logger.debug('UrlRequest: {0} Fetch url <{1}>'.format( id(self), url)) Logger.debug('UrlRequest: {0} - body: {1}'.format( id(self), body)) Logger.debug('UrlRequest: {0} - headers: {1}'.format( id(self), headers)) # parse url host, port, userpass, parse = self._parse_url(url) if userpass and not headers: headers = userpass elif userpass and headers: key = list(userpass.keys())[0] headers[key] = userpass[key] # translate scheme to connection class cls = self.get_connection_for_scheme(parse.scheme) # reconstruct path to pass on the request path = parse.path if parse.params: path += ';' + parse.params if parse.query: path += '?' + parse.query if parse.fragment: path += '#' + parse.fragment # create connection instance args = {} if timeout is not None: args['timeout'] = timeout if ca_file is not None and hasattr(ssl, 'create_default_context'): ctx = ssl.create_default_context(cafile=ca_file) ctx.verify_mode = ssl.CERT_REQUIRED args['context'] = ctx if not verify and parse.scheme == 'https' and ( hasattr(ssl, 'create_default_context')): ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE args['context'] = ctx if self._proxy_host: Logger.debug('UrlRequest: {0} - proxy via {1}:{2}'.format( id(self), self._proxy_host, self._proxy_port )) req = cls(self._proxy_host, self._proxy_port, **args) if parse.scheme == 'https': req.set_tunnel(host, port, self._proxy_headers) else: path = urlunparse(parse) else: req = cls(host, port, **args) # send request method = self._method if method is None: method = 'GET' if body is None else 'POST' req.request(method, path, body, headers or {}) # read header resp = req.getresponse() # read content if report_progress or file_path is not None: try: total_size = int(resp.getheader('content-length')) except: total_size = -1 # before starting the download, send a fake progress to permit the # user to initialize his ui if report_progress: q(('progress', resp, (0, total_size))) def get_chunks(fd=None): bytes_so_far = 0 result = b'' while 1: chunk = if not chunk: break if fd: fd.write(chunk) else: result += chunk bytes_so_far += len(chunk) # report progress to user if report_progress: q(('progress', resp, (bytes_so_far, total_size))) trigger() return bytes_so_far, result if file_path is not None: with open(file_path, 'wb') as fd: bytes_so_far, result = get_chunks(fd) else: bytes_so_far, result = get_chunks() # ensure that results are dispatched for the last chunk, # avoid trigger if report_progress: q(('progress', resp, (bytes_so_far, total_size))) trigger() else: result = try: if isinstance(result, bytes): result = result.decode('utf-8') except UnicodeDecodeError: # if it's an image? decoding would not work pass req.close() # return everything return result, resp
[docs] def get_connection_for_scheme(self, scheme): '''Return the Connection class for a particular scheme. This is an internal function that can be expanded to support custom schemes. Actual supported schemes: http, https. ''' if scheme == 'http': return HTTPConnection elif scheme == 'https' and HTTPSConnection is not None: return HTTPSConnection else: raise Exception('No class for scheme %s' % scheme)
[docs] def decode_result(self, result, resp): '''Decode the result fetched from url according to his Content-Type. Currently supports only application/json. ''' # Entry to decode url from the content type. # For example, if the content type is a json, it will be automatically # decoded. content_type = resp.getheader('Content-Type', None) if content_type is not None: ct = content_type.split(';')[0] if ct == 'application/json': if isinstance(result, bytes): result = result.decode('utf-8') try: return loads(result) except: return result return result
def _dispatch_result(self, dt): while True: # Read the result pushed on the queue, and dispatch to the client try: result, resp, data = self._queue.pop() except IndexError: return if resp: # XXX usage of dict can be dangerous if multiple headers # are set even if it's invalid. But it look like it's ok # ? # ..urllib2-multiple-set-cookie-headers-in-response self._resp_headers = dict(resp.getheaders()) self._resp_status = resp.status if result == 'success': status_class = resp.status // 100 if status_class in (1, 2): if self._debug: Logger.debug('UrlRequest: {0} Download finished with' ' {1} datalen'.format(id(self), len(data))) self._is_finished = True self._result = data if self.on_success: func = self.on_success() if func: func(self, data) elif status_class == 3: if self._debug: Logger.debug('UrlRequest: {} Download ' 'redirected'.format(id(self))) self._is_finished = True self._result = data if self.on_redirect: func = self.on_redirect() if func: func(self, data) elif status_class in (4, 5): if self._debug: Logger.debug('UrlRequest: {} Download failed with ' 'http error {}'.format(id(self), resp.status)) self._is_finished = True self._result = data if self.on_failure: func = self.on_failure() if func: func(self, data) elif result == 'error': if self._debug: Logger.debug('UrlRequest: {0} Download error ' '<{1}>'.format(id(self), data)) self._is_finished = True self._error = data if self.on_error: func = self.on_error() if func: func(self, data) elif result == 'progress': if self._debug: Logger.debug('UrlRequest: {0} Download progress ' '{1}'.format(id(self), data)) if self.on_progress: func = self.on_progress() if func: func(self, data[0], data[1]) else: assert(0) @property def is_finished(self): '''Return True if the request has finished, whether it's a success or a failure. ''' return self._is_finished @property def result(self): '''Return the result of the request. This value is not determined until the request is finished. ''' return self._result @property def resp_headers(self): '''If the request has been completed, return a dictionary containing the headers of the response. Otherwise, it will return None. ''' return self._resp_headers @property def resp_status(self): '''Return the status code of the response if the request is complete, otherwise return None. ''' return self._resp_status @property def error(self): '''Return the error of the request. This value is not determined until the request is completed. ''' return self._error @property def chunk_size(self): '''Return the size of a chunk, used only in "progress" mode (when on_progress callback is set.) ''' return self._chunk_size
[docs] def wait(self, delay=0.5): '''Wait for the request to finish (until :attr:`resp_status` is not None) .. note:: This method is intended to be used in the main thread, and the callback will be dispatched from the same thread from which you're calling. .. versionadded:: 1.1.0 ''' while self.resp_status is None: self._dispatch_result(delay) sleep(delay)
if __name__ == '__main__': from pprint import pprint def on_success(req, result): pprint('Got the result:') pprint(result) def on_error(req, error): pprint('Got an error:') pprint(error) req = UrlRequest('' '=json&action=query&titles=Kivy&prop=revisions&rvprop=content', on_success, on_error) while not req.is_finished: sleep(1) Clock.tick() print('result =', req.result) print('error =', req.error)