| |
| |
| |
| |
| |
| import usocket |
|
|
|
|
| class Response: |
| def __init__(self, f): |
| self.raw = f |
| self.encoding = "utf-8" |
| self._cached = None |
|
|
| def close(self): |
| if self.raw: |
| self.raw.close() |
| self.raw = None |
| self._cached = None |
|
|
| @property |
| def content(self): |
| if self._cached is None: |
| try: |
| self._cached = self.raw.read() |
| finally: |
| self.raw.close() |
| self.raw = None |
| return self._cached |
|
|
| @property |
| def text(self): |
| return str(self.content, self.encoding) |
|
|
| def json(self): |
| import ujson |
|
|
| return ujson.loads(self.content) |
|
|
|
|
| def request( |
| method, |
| url, |
| data=None, |
| json=None, |
| headers={}, |
| stream=None, |
| auth=None, |
| timeout=None, |
| parse_headers=True, |
| ): |
| redirect = None |
| chunked_data = ( |
| data and getattr(data, "__next__", None) and not getattr(data, "__len__", None) |
| ) |
|
|
| if auth is not None: |
| import ubinascii |
|
|
| username, password = auth |
| formated = b"{}:{}".format(username, password) |
| formated = str(ubinascii.b2a_base64(formated)[:-1], "ascii") |
| headers["Authorization"] = "Basic {}".format(formated) |
|
|
| try: |
| proto, dummy, host, path = url.split("/", 3) |
| except ValueError: |
| proto, dummy, host = url.split("/", 2) |
| path = "" |
| if proto == "http:": |
| port = 80 |
| elif proto == "https:": |
| import ussl |
|
|
| port = 443 |
| else: |
| raise ValueError("Unsupported protocol: " + proto) |
|
|
| if ":" in host: |
| host, port = host.split(":", 1) |
| port = int(port) |
|
|
| ai = usocket.getaddrinfo(host, port, 0, usocket.SOCK_STREAM) |
| ai = ai[0] |
|
|
| resp_d = None |
| if parse_headers is not False: |
| resp_d = {} |
|
|
| s = usocket.socket(ai[0], usocket.SOCK_STREAM, ai[2]) |
|
|
| if timeout is not None: |
| |
| |
| s.settimeout(timeout) |
|
|
| try: |
| s.connect(ai[-1]) |
| if proto == "https:": |
| s = ussl.wrap_socket(s, server_hostname=host) |
| s.write(b"%s /%s HTTP/1.1\r\n" % (method, path)) |
| if "Host" not in headers: |
| s.write(b"Host: %s\r\n" % host) |
| |
| for k in headers: |
| s.write(k) |
| s.write(b": ") |
| s.write(headers[k]) |
| s.write(b"\r\n") |
| if json is not None: |
| assert data is None |
| import ujson |
|
|
| data = ujson.dumps(json) |
| s.write(b"Content-Type: application/json\r\n") |
| if data: |
| if chunked_data: |
| s.write(b"Transfer-Encoding: chunked\r\n") |
| else: |
| s.write(b"Content-Length: %d\r\n" % len(data)) |
| s.write(b"Connection: close\r\n\r\n") |
| if data: |
| if chunked_data: |
| for chunk in data: |
| s.write(b"%x\r\n" % len(chunk)) |
| s.write(chunk) |
| s.write(b"\r\n") |
| s.write("0\r\n\r\n") |
| else: |
| s.write(data) |
|
|
| l = s.readline() |
| |
| l = l.split(None, 2) |
| if len(l) < 2: |
| |
| raise ValueError("HTTP error: BadStatusLine:\n%s" % l) |
| status = int(l[1]) |
| reason = "" |
| if len(l) > 2: |
| reason = l[2].rstrip() |
| while True: |
| l = s.readline() |
| if not l or l == b"\r\n": |
| break |
| |
| if l.startswith(b"Transfer-Encoding:"): |
| if b"chunked" in l: |
| raise ValueError("Unsupported " + str(l, "utf-8")) |
| elif l.startswith(b"Location:") and not 200 <= status <= 299: |
| if status in [301, 302, 303, 307, 308]: |
| redirect = str(l[10:-2], "utf-8") |
| else: |
| raise NotImplementedError("Redirect %d not yet supported" % status) |
| if parse_headers is False: |
| pass |
| elif parse_headers is True: |
| l = str(l, "utf-8") |
| k, v = l.split(":", 1) |
| resp_d[k] = v.strip() |
| else: |
| parse_headers(l, resp_d) |
| except OSError: |
| s.close() |
| raise |
|
|
| if redirect: |
| s.close() |
| if status in [301, 302, 303]: |
| return request("GET", redirect, None, None, headers, stream) |
| else: |
| return request(method, redirect, data, json, headers, stream) |
| else: |
| resp = Response(s) |
| resp.status_code = status |
| resp.reason = reason |
| if resp_d is not None: |
| resp.headers = resp_d |
| return resp |
|
|
|
|
| def head(url, **kw): |
| return request("HEAD", url, **kw) |
|
|
|
|
| def get(url, **kw): |
| return request("GET", url, **kw) |
|
|
|
|
| def post(url, **kw): |
| return request("POST", url, **kw) |
|
|
|
|
| def put(url, **kw): |
| return request("PUT", url, **kw) |
|
|
|
|
| def patch(url, **kw): |
| return request("PATCH", url, **kw) |
|
|
|
|
| def delete(url, **kw): |
| return request("DELETE", url, **kw) |
|
|