from __future__ import annotations import datetime import enum import logging import typing import warnings from contextlib import asynccontextmanager, contextmanager from types import TracebackType from .__version__ import __version__ from ._auth import Auth, BasicAuth, FunctionAuth from ._config import ( DEFAULT_LIMITS, DEFAULT_MAX_REDIRECTS, DEFAULT_TIMEOUT_CONFIG, Limits, Proxy, Timeout, ) from ._decoders import SUPPORTED_DECODERS from ._exceptions import ( InvalidURL, RemoteProtocolError, TooManyRedirects, request_context, ) from ._models import Cookies, Headers, Request, Response from ._status_codes import codes from ._transports.asgi import ASGITransport from ._transports.base import AsyncBaseTransport, BaseTransport from ._transports.default import AsyncHTTPTransport, HTTPTransport from ._transports.wsgi import WSGITransport from ._types import ( AsyncByteStream, AuthTypes, CertTypes, CookieTypes, HeaderTypes, ProxiesTypes, ProxyTypes, QueryParamTypes, RequestContent, RequestData, RequestExtensions, RequestFiles, SyncByteStream, TimeoutTypes, URLTypes, VerifyTypes, ) from ._urls import URL, QueryParams from ._utils import ( Timer, URLPattern, get_environment_proxies, is_https_redirect, same_origin, ) # The type annotation for @classmethod and context managers here follows PEP 484 # https://www.python.org/dev/peps/pep-0484/#annotating-instance-and-class-methods T = typing.TypeVar("T", bound="Client") U = typing.TypeVar("U", bound="AsyncClient") class UseClientDefault: """ For some parameters such as `auth=...` and `timeout=...` we need to be able to indicate the default "unset" state, in a way that is distinctly different to using `None`. The default "unset" state indicates that whatever default is set on the client should be used. This is different to setting `None`, which explicitly disables the parameter, possibly overriding a client default. For example we use `timeout=USE_CLIENT_DEFAULT` in the `request()` signature. Omitting the `timeout` parameter will send a request using whatever default timeout has been configured on the client. Including `timeout=None` will ensure no timeout is used. Note that user code shouldn't need to use the `USE_CLIENT_DEFAULT` constant, but it is used internally when a parameter is not included. """ USE_CLIENT_DEFAULT = UseClientDefault() logger = logging.getLogger("httpx") USER_AGENT = f"python-httpx/{__version__}" ACCEPT_ENCODING = ", ".join( [key for key in SUPPORTED_DECODERS.keys() if key != "identity"] ) class ClientState(enum.Enum): # UNOPENED: # The client has been instantiated, but has not been used to send a request, # or been opened by entering the context of a `with` block. UNOPENED = 1 # OPENED: # The client has either sent a request, or is within a `with` block. OPENED = 2 # CLOSED: # The client has either exited the `with` block, or `close()` has # been called explicitly. CLOSED = 3 class BoundSyncStream(SyncByteStream): """ A byte stream that is bound to a given response instance, and that ensures the `response.elapsed` is set once the response is closed. """ def __init__( self, stream: SyncByteStream, response: Response, timer: Timer ) -> None: self._stream = stream self._response = response self._timer = timer def __iter__(self) -> typing.Iterator[bytes]: for chunk in self._stream: yield chunk def close(self) -> None: seconds = self._timer.sync_elapsed() self._response.elapsed = datetime.timedelta(seconds=seconds) self._stream.close() class BoundAsyncStream(AsyncByteStream): """ An async byte stream that is bound to a given response instance, and that ensures the `response.elapsed` is set once the response is closed. """ def __init__( self, stream: AsyncByteStream, response: Response, timer: Timer ) -> None: self._stream = stream self._response = response self._timer = timer async def __aiter__(self) -> typing.AsyncIterator[bytes]: async for chunk in self._stream: yield chunk async def aclose(self) -> None: seconds = await self._timer.async_elapsed() self._response.elapsed = datetime.timedelta(seconds=seconds) await self._stream.aclose() EventHook = typing.Callable[..., typing.Any] class BaseClient: def __init__( self, *, auth: AuthTypes | None = None, params: QueryParamTypes | None = None, headers: HeaderTypes | None = None, cookies: CookieTypes | None = None, timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG, follow_redirects: bool = False, max_redirects: int = DEFAULT_MAX_REDIRECTS, event_hooks: None | (typing.Mapping[str, list[EventHook]]) = None, base_url: URLTypes = "", trust_env: bool = True, default_encoding: str | typing.Callable[[bytes], str] = "utf-8", ) -> None: event_hooks = {} if event_hooks is None else event_hooks self._base_url = self._enforce_trailing_slash(URL(base_url)) self._auth = self._build_auth(auth) self._params = QueryParams(params) self.headers = Headers(headers) self._cookies = Cookies(cookies) self._timeout = Timeout(timeout) self.follow_redirects = follow_redirects self.max_redirects = max_redirects self._event_hooks = { "request": list(event_hooks.get("request", [])), "response": list(event_hooks.get("response", [])), } self._trust_env = trust_env self._default_encoding = default_encoding self._state = ClientState.UNOPENED @property def is_closed(self) -> bool: """ Check if the client being closed """ return self._state == ClientState.CLOSED @property def trust_env(self) -> bool: return self._trust_env def _enforce_trailing_slash(self, url: URL) -> URL: if url.raw_path.endswith(b"/"): return url return url.copy_with(raw_path=url.raw_path + b"/") def _get_proxy_map( self, proxies: ProxiesTypes | None, allow_env_proxies: bool ) -> dict[str, Proxy | None]: if proxies is None: if allow_env_proxies: return { key: None if url is None else Proxy(url=url) for key, url in get_environment_proxies().items() } return {} if isinstance(proxies, dict): new_proxies = {} for key, value in proxies.items(): proxy = Proxy(url=value) if isinstance(value, (str, URL)) else value new_proxies[str(key)] = proxy return new_proxies else: proxy = Proxy(url=proxies) if isinstance(proxies, (str, URL)) else proxies return {"all://": proxy} @property def timeout(self) -> Timeout: return self._timeout @timeout.setter def timeout(self, timeout: TimeoutTypes) -> None: self._timeout = Timeout(timeout) @property def event_hooks(self) -> dict[str, list[EventHook]]: return self._event_hooks @event_hooks.setter def event_hooks(self, event_hooks: dict[str, list[EventHook]]) -> None: self._event_hooks = { "request": list(event_hooks.get("request", [])), "response": list(event_hooks.get("response", [])), } @property def auth(self) -> Auth | None: """ Authentication class used when none is passed at the request-level. See also [Authentication][0]. [0]: /quickstart/#authentication """ return self._auth @auth.setter def auth(self, auth: AuthTypes) -> None: self._auth = self._build_auth(auth) @property def base_url(self) -> URL: """ Base URL to use when sending requests with relative URLs. """ return self._base_url @base_url.setter def base_url(self, url: URLTypes) -> None: self._base_url = self._enforce_trailing_slash(URL(url)) @property def headers(self) -> Headers: """ HTTP headers to include when sending requests. """ return self._headers @headers.setter def headers(self, headers: HeaderTypes) -> None: client_headers = Headers( { b"Accept": b"*/*", b"Accept-Encoding": ACCEPT_ENCODING.encode("ascii"), b"Connection": b"keep-alive", b"User-Agent": USER_AGENT.encode("ascii"), } ) client_headers.update(headers) self._headers = client_headers @property def cookies(self) -> Cookies: """ Cookie values to include when sending requests. """ return self._cookies @cookies.setter def cookies(self, cookies: CookieTypes) -> None: self._cookies = Cookies(cookies) @property def params(self) -> QueryParams: """ Query parameters to include in the URL when sending requests. """ return self._params @params.setter def params(self, params: QueryParamTypes) -> None: self._params = QueryParams(params) def build_request( self, method: str, url: URLTypes, *, content: RequestContent | None = None, data: RequestData | None = None, files: RequestFiles | None = None, json: typing.Any | None = None, params: QueryParamTypes | None = None, headers: HeaderTypes | None = None, cookies: CookieTypes | None = None, timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, extensions: RequestExtensions | None = None, ) -> Request: """ Build and return a request instance. * The `params`, `headers` and `cookies` arguments are merged with any values set on the client. * The `url` argument is merged with any `base_url` set on the client. See also: [Request instances][0] [0]: /advanced/#request-instances """ url = self._merge_url(url) headers = self._merge_headers(headers) cookies = self._merge_cookies(cookies) params = self._merge_queryparams(params) extensions = {} if extensions is None else extensions if "timeout" not in extensions: timeout = ( self.timeout if isinstance(timeout, UseClientDefault) else Timeout(timeout) ) extensions = dict(**extensions, timeout=timeout.as_dict()) return Request( method, url, content=content, data=data, files=files, json=json, params=params, headers=headers, cookies=cookies, extensions=extensions, ) def _merge_url(self, url: URLTypes) -> URL: """ Merge a URL argument together with any 'base_url' on the client, to create the URL used for the outgoing request. """ merge_url = URL(url) if merge_url.is_relative_url: # To merge URLs we always append to the base URL. To get this # behaviour correct we always ensure the base URL ends in a '/' # separator, and strip any leading '/' from the merge URL. # # So, eg... # # >>> client = Client(base_url="https://www.example.com/subpath") # >>> client.base_url # URL('https://www.example.com/subpath/') # >>> client.build_request("GET", "/path").url # URL('https://www.example.com/subpath/path') merge_raw_path = self.base_url.raw_path + merge_url.raw_path.lstrip(b"/") return self.base_url.copy_with(raw_path=merge_raw_path) return merge_url def _merge_cookies(self, cookies: CookieTypes | None = None) -> CookieTypes | None: """ Merge a cookies argument together with any cookies on the client, to create the cookies used for the outgoing request. """ if cookies or self.cookies: merged_cookies = Cookies(self.cookies) merged_cookies.update(cookies) return merged_cookies return cookies def _merge_headers(self, headers: HeaderTypes | None = None) -> HeaderTypes | None: """ Merge a headers argument together with any headers on the client, to create the headers used for the outgoing request. """ merged_headers = Headers(self.headers) merged_headers.update(headers) return merged_headers def _merge_queryparams( self, params: QueryParamTypes | None = None ) -> QueryParamTypes | None: """ Merge a queryparams argument together with any queryparams on the client, to create the queryparams used for the outgoing request. """ if params or self.params: merged_queryparams = QueryParams(self.params) return merged_queryparams.merge(params) return params def _build_auth(self, auth: AuthTypes | None) -> Auth | None: if auth is None: return None elif isinstance(auth, tuple): return BasicAuth(username=auth[0], password=auth[1]) elif isinstance(auth, Auth): return auth elif callable(auth): return FunctionAuth(func=auth) else: raise TypeError(f'Invalid "auth" argument: {auth!r}') def _build_request_auth( self, request: Request, auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT, ) -> Auth: auth = ( self._auth if isinstance(auth, UseClientDefault) else self._build_auth(auth) ) if auth is not None: return auth username, password = request.url.username, request.url.password if username or password: return BasicAuth(username=username, password=password) return Auth() def _build_redirect_request(self, request: Request, response: Response) -> Request: """ Given a request and a redirect response, return a new request that should be used to effect the redirect. """ method = self._redirect_method(request, response) url = self._redirect_url(request, response) headers = self._redirect_headers(request, url, method) stream = self._redirect_stream(request, method) cookies = Cookies(self.cookies) return Request( method=method, url=url, headers=headers, cookies=cookies, stream=stream, extensions=request.extensions, ) def _redirect_method(self, request: Request, response: Response) -> str: """ When being redirected we may want to change the method of the request based on certain specs or browser behavior. """ method = request.method # https://tools.ietf.org/html/rfc7231#section-6.4.4 if response.status_code == codes.SEE_OTHER and method != "HEAD": method = "GET" # Do what the browsers do, despite standards... # Turn 302s into GETs. if response.status_code == codes.FOUND and method != "HEAD": method = "GET" # If a POST is responded to with a 301, turn it into a GET. # This bizarre behaviour is explained in 'requests' issue 1704. if response.status_code == codes.MOVED_PERMANENTLY and method == "POST": method = "GET" return method def _redirect_url(self, request: Request, response: Response) -> URL: """ Return the URL for the redirect to follow. """ location = response.headers["Location"] try: url = URL(location) except InvalidURL as exc: raise RemoteProtocolError( f"Invalid URL in location header: {exc}.", request=request ) from None # Handle malformed 'Location' headers that are "absolute" form, have no host. # See: https://github.com/encode/httpx/issues/771 if url.scheme and not url.host: url = url.copy_with(host=request.url.host) # Facilitate relative 'Location' headers, as allowed by RFC 7231. # (e.g. '/path/to/resource' instead of 'http://domain.tld/path/to/resource') if url.is_relative_url: url = request.url.join(url) # Attach previous fragment if needed (RFC 7231 7.1.2) if request.url.fragment and not url.fragment: url = url.copy_with(fragment=request.url.fragment) return url def _redirect_headers(self, request: Request, url: URL, method: str) -> Headers: """ Return the headers that should be used for the redirect request. """ headers = Headers(request.headers) if not same_origin(url, request.url): if not is_https_redirect(request.url, url): # Strip Authorization headers when responses are redirected # away from the origin. (Except for direct HTTP to HTTPS redirects.) headers.pop("Authorization", None) # Update the Host header. headers["Host"] = url.netloc.decode("ascii") if method != request.method and method == "GET": # If we've switch to a 'GET' request, then strip any headers which # are only relevant to the request body. headers.pop("Content-Length", None) headers.pop("Transfer-Encoding", None) # We should use the client cookie store to determine any cookie header, # rather than whatever was on the original outgoing request. headers.pop("Cookie", None) return headers def _redirect_stream( self, request: Request, method: str ) -> SyncByteStream | AsyncByteStream | None: """ Return the body that should be used for the redirect request. """ if method != request.method and method == "GET": return None return request.stream class Client(BaseClient): """ An HTTP client, with connection pooling, HTTP/2, redirects, cookie persistence, etc. It can be shared between threads. Usage: ```python >>> client = httpx.Client() >>> response = client.get('https://example.org') ``` **Parameters:** * **auth** - *(optional)* An authentication class to use when sending requests. * **params** - *(optional)* Query parameters to include in request URLs, as a string, dictionary, or sequence of two-tuples. * **headers** - *(optional)* Dictionary of HTTP headers to include when sending requests. * **cookies** - *(optional)* Dictionary of Cookie items to include when sending requests. * **verify** - *(optional)* SSL certificates (a.k.a CA bundle) used to verify the identity of requested hosts. Either `True` (default CA bundle), a path to an SSL certificate file, an `ssl.SSLContext`, or `False` (which will disable verification). * **cert** - *(optional)* An SSL certificate used by the requested host to authenticate the client. Either a path to an SSL certificate file, or two-tuple of (certificate file, key file), or a three-tuple of (certificate file, key file, password). * **http2** - *(optional)* A boolean indicating if HTTP/2 support should be enabled. Defaults to `False`. * **proxy** - *(optional)* A proxy URL where all the traffic should be routed. * **proxies** - *(optional)* A dictionary mapping proxy keys to proxy URLs. * **timeout** - *(optional)* The timeout configuration to use when sending requests. * **limits** - *(optional)* The limits configuration to use. * **max_redirects** - *(optional)* The maximum number of redirect responses that should be followed. * **base_url** - *(optional)* A URL to use as the base when building request URLs. * **transport** - *(optional)* A transport class to use for sending requests over the network. * **app** - *(optional)* An WSGI application to send requests to, rather than sending actual network requests. * **trust_env** - *(optional)* Enables or disables usage of environment variables for configuration. * **default_encoding** - *(optional)* The default encoding to use for decoding response text, if no charset information is included in a response Content-Type header. Set to a callable for automatic character set detection. Default: "utf-8". """ def __init__( self, *, auth: AuthTypes | None = None, params: QueryParamTypes | None = None, headers: HeaderTypes | None = None, cookies: CookieTypes | None = None, verify: VerifyTypes = True, cert: CertTypes | None = None, http1: bool = True, http2: bool = False, proxy: ProxyTypes | None = None, proxies: ProxiesTypes | None = None, mounts: None | (typing.Mapping[str, BaseTransport | None]) = None, timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG, follow_redirects: bool = False, limits: Limits = DEFAULT_LIMITS, max_redirects: int = DEFAULT_MAX_REDIRECTS, event_hooks: None | (typing.Mapping[str, list[EventHook]]) = None, base_url: URLTypes = "", transport: BaseTransport | None = None, app: typing.Callable[..., typing.Any] | None = None, trust_env: bool = True, default_encoding: str | typing.Callable[[bytes], str] = "utf-8", ) -> None: super().__init__( auth=auth, params=params, headers=headers, cookies=cookies, timeout=timeout, follow_redirects=follow_redirects, max_redirects=max_redirects, event_hooks=event_hooks, base_url=base_url, trust_env=trust_env, default_encoding=default_encoding, ) if http2: try: import h2 # noqa except ImportError: # pragma: no cover raise ImportError( "Using http2=True, but the 'h2' package is not installed. " "Make sure to install httpx using `pip install httpx[http2]`." ) from None if proxies: message = ( "The 'proxies' argument is now deprecated." " Use 'proxy' or 'mounts' instead." ) warnings.warn(message, DeprecationWarning) if proxy: raise RuntimeError("Use either `proxy` or 'proxies', not both.") if app: message = ( "The 'app' shortcut is now deprecated." " Use the explicit style 'transport=WSGITransport(app=...)' instead." ) warnings.warn(message, DeprecationWarning) allow_env_proxies = trust_env and app is None and transport is None proxy_map = self._get_proxy_map(proxies or proxy, allow_env_proxies) self._transport = self._init_transport( verify=verify, cert=cert, http1=http1, http2=http2, limits=limits, transport=transport, app=app, trust_env=trust_env, ) self._mounts: dict[URLPattern, BaseTransport | None] = { URLPattern(key): None if proxy is None else self._init_proxy_transport( proxy, verify=verify, cert=cert, http1=http1, http2=http2, limits=limits, trust_env=trust_env, ) for key, proxy in proxy_map.items() } if mounts is not None: self._mounts.update( {URLPattern(key): transport for key, transport in mounts.items()} ) self._mounts = dict(sorted(self._mounts.items())) def _init_transport( self, verify: VerifyTypes = True, cert: CertTypes | None = None, http1: bool = True, http2: bool = False, limits: Limits = DEFAULT_LIMITS, transport: BaseTransport | None = None, app: typing.Callable[..., typing.Any] | None = None, trust_env: bool = True, ) -> BaseTransport: if transport is not None: return transport if app is not None: return WSGITransport(app=app) return HTTPTransport( verify=verify, cert=cert, http1=http1, http2=http2, limits=limits, trust_env=trust_env, ) def _init_proxy_transport( self, proxy: Proxy, verify: VerifyTypes = True, cert: CertTypes | None = None, http1: bool = True, http2: bool = False, limits: Limits = DEFAULT_LIMITS, trust_env: bool = True, ) -> BaseTransport: return HTTPTransport( verify=verify, cert=cert, http1=http1, http2=http2, limits=limits, trust_env=trust_env, proxy=proxy, ) def _transport_for_url(self, url: URL) -> BaseTransport: """ Returns the transport instance that should be used for a given URL. This will either be the standard connection pool, or a proxy. """ for pattern, transport in self._mounts.items(): if pattern.matches(url): return self._transport if transport is None else transport return self._transport def request( self, method: str, url: URLTypes, *, content: RequestContent | None = None, data: RequestData | None = None, files: RequestFiles | None = None, json: typing.Any | None = None, params: QueryParamTypes | None = None, headers: HeaderTypes | None = None, cookies: CookieTypes | None = None, auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT, follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, extensions: RequestExtensions | None = None, ) -> Response: """ Build and send a request. Equivalent to: ```python request = client.build_request(...) response = client.send(request, ...) ``` See `Client.build_request()`, `Client.send()` and [Merging of configuration][0] for how the various parameters are merged with client-level configuration. [0]: /advanced/#merging-of-configuration """ if cookies is not None: message = ( "Setting per-request cookies=<...> is being deprecated, because " "the expected behaviour on cookie persistence is ambiguous. Set " "cookies directly on the client instance instead." ) warnings.warn(message, DeprecationWarning) request = self.build_request( method=method, url=url, content=content, data=data, files=files, json=json, params=params, headers=headers, cookies=cookies, timeout=timeout, extensions=extensions, ) return self.send(request, auth=auth, follow_redirects=follow_redirects) @contextmanager def stream( self, method: str, url: URLTypes, *, content: RequestContent | None = None, data: RequestData | None = None, files: RequestFiles | None = None, json: typing.Any | None = None, params: QueryParamTypes | None = None, headers: HeaderTypes | None = None, cookies: CookieTypes | None = None, auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT, follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, extensions: RequestExtensions | None = None, ) -> typing.Iterator[Response]: """ Alternative to `httpx.request()` that streams the response body instead of loading it into memory at once. **Parameters**: See `httpx.request`. See also: [Streaming Responses][0] [0]: /quickstart#streaming-responses """ request = self.build_request( method=method, url=url, content=content, data=data, files=files, json=json, params=params, headers=headers, cookies=cookies, timeout=timeout, extensions=extensions, ) response = self.send( request=request, auth=auth, follow_redirects=follow_redirects, stream=True, ) try: yield response finally: response.close() def send( self, request: Request, *, stream: bool = False, auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT, follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, ) -> Response: """ Send a request. The request is sent as-is, unmodified. Typically you'll want to build one with `Client.build_request()` so that any client-level configuration is merged into the request, but passing an explicit `httpx.Request()` is supported as well. See also: [Request instances][0] [0]: /advanced/#request-instances """ if self._state == ClientState.CLOSED: raise RuntimeError("Cannot send a request, as the client has been closed.") self._state = ClientState.OPENED follow_redirects = ( self.follow_redirects if isinstance(follow_redirects, UseClientDefault) else follow_redirects ) auth = self._build_request_auth(request, auth) response = self._send_handling_auth( request, auth=auth, follow_redirects=follow_redirects, history=[], ) try: if not stream: response.read() return response except BaseException as exc: response.close() raise exc def _send_handling_auth( self, request: Request, auth: Auth, follow_redirects: bool, history: list[Response], ) -> Response: auth_flow = auth.sync_auth_flow(request) try: request = next(auth_flow) while True: response = self._send_handling_redirects( request, follow_redirects=follow_redirects, history=history, ) try: try: next_request = auth_flow.send(response) except StopIteration: return response response.history = list(history) response.read() request = next_request history.append(response) except BaseException as exc: response.close() raise exc finally: auth_flow.close() def _send_handling_redirects( self, request: Request, follow_redirects: bool, history: list[Response], ) -> Response: while True: if len(history) > self.max_redirects: raise TooManyRedirects( "Exceeded maximum allowed redirects.", request=request ) for hook in self._event_hooks["request"]: hook(request) response = self._send_single_request(request) try: for hook in self._event_hooks["response"]: hook(response) response.history = list(history) if not response.has_redirect_location: return response request = self._build_redirect_request(request, response) history = history + [response] if follow_redirects: response.read() else: response.next_request = request return response except BaseException as exc: response.close() raise exc def _send_single_request(self, request: Request) -> Response: """ Sends a single request, without handling any redirections. """ transport = self._transport_for_url(request.url) timer = Timer() timer.sync_start() if not isinstance(request.stream, SyncByteStream): raise RuntimeError( "Attempted to send an async request with a sync Client instance." ) with request_context(request=request): response = transport.handle_request(request) assert isinstance(response.stream, SyncByteStream) response.request = request response.stream = BoundSyncStream( response.stream, response=response, timer=timer ) self.cookies.extract_cookies(response) response.default_encoding = self._default_encoding logger.info( 'HTTP Request: %s %s "%s %d %s"', request.method, request.url, response.http_version, response.status_code, response.reason_phrase, ) return response def get( self, url: URLTypes, *, params: QueryParamTypes | None = None, headers: HeaderTypes | None = None, cookies: CookieTypes | None = None, auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, extensions: RequestExtensions | None = None, ) -> Response: """ Send a `GET` request. **Parameters**: See `httpx.request`. """ return self.request( "GET", url, params=params, headers=headers, cookies=cookies, auth=auth, follow_redirects=follow_redirects, timeout=timeout, extensions=extensions, ) def options( self, url: URLTypes, *, params: QueryParamTypes | None = None, headers: HeaderTypes | None = None, cookies: CookieTypes | None = None, auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, extensions: RequestExtensions | None = None, ) -> Response: """ Send an `OPTIONS` request. **Parameters**: See `httpx.request`. """ return self.request( "OPTIONS", url, params=params, headers=headers, cookies=cookies, auth=auth, follow_redirects=follow_redirects, timeout=timeout, extensions=extensions, ) def head( self, url: URLTypes, *, params: QueryParamTypes | None = None, headers: HeaderTypes | None = None, cookies: CookieTypes | None = None, auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, extensions: RequestExtensions | None = None, ) -> Response: """ Send a `HEAD` request. **Parameters**: See `httpx.request`. """ return self.request( "HEAD", url, params=params, headers=headers, cookies=cookies, auth=auth, follow_redirects=follow_redirects, timeout=timeout, extensions=extensions, ) def post( self, url: URLTypes, *, content: RequestContent | None = None, data: RequestData | None = None, files: RequestFiles | None = None, json: typing.Any | None = None, params: QueryParamTypes | None = None, headers: HeaderTypes | None = None, cookies: CookieTypes | None = None, auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, extensions: RequestExtensions | None = None, ) -> Response: """ Send a `POST` request. **Parameters**: See `httpx.request`. """ return self.request( "POST", url, content=content, data=data, files=files, json=json, params=params, headers=headers, cookies=cookies, auth=auth, follow_redirects=follow_redirects, timeout=timeout, extensions=extensions, ) def put( self, url: URLTypes, *, content: RequestContent | None = None, data: RequestData | None = None, files: RequestFiles | None = None, json: typing.Any | None = None, params: QueryParamTypes | None = None, headers: HeaderTypes | None = None, cookies: CookieTypes | None = None, auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, extensions: RequestExtensions | None = None, ) -> Response: """ Send a `PUT` request. **Parameters**: See `httpx.request`. """ return self.request( "PUT", url, content=content, data=data, files=files, json=json, params=params, headers=headers, cookies=cookies, auth=auth, follow_redirects=follow_redirects, timeout=timeout, extensions=extensions, ) def patch( self, url: URLTypes, *, content: RequestContent | None = None, data: RequestData | None = None, files: RequestFiles | None = None, json: typing.Any | None = None, params: QueryParamTypes | None = None, headers: HeaderTypes | None = None, cookies: CookieTypes | None = None, auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, extensions: RequestExtensions | None = None, ) -> Response: """ Send a `PATCH` request. **Parameters**: See `httpx.request`. """ return self.request( "PATCH", url, content=content, data=data, files=files, json=json, params=params, headers=headers, cookies=cookies, auth=auth, follow_redirects=follow_redirects, timeout=timeout, extensions=extensions, ) def delete( self, url: URLTypes, *, params: QueryParamTypes | None = None, headers: HeaderTypes | None = None, cookies: CookieTypes | None = None, auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, extensions: RequestExtensions | None = None, ) -> Response: """ Send a `DELETE` request. **Parameters**: See `httpx.request`. """ return self.request( "DELETE", url, params=params, headers=headers, cookies=cookies, auth=auth, follow_redirects=follow_redirects, timeout=timeout, extensions=extensions, ) def close(self) -> None: """ Close transport and proxies. """ if self._state != ClientState.CLOSED: self._state = ClientState.CLOSED self._transport.close() for transport in self._mounts.values(): if transport is not None: transport.close() def __enter__(self: T) -> T: if self._state != ClientState.UNOPENED: msg = { ClientState.OPENED: "Cannot open a client instance more than once.", ClientState.CLOSED: ( "Cannot reopen a client instance, once it has been closed." ), }[self._state] raise RuntimeError(msg) self._state = ClientState.OPENED self._transport.__enter__() for transport in self._mounts.values(): if transport is not None: transport.__enter__() return self def __exit__( self, exc_type: type[BaseException] | None = None, exc_value: BaseException | None = None, traceback: TracebackType | None = None, ) -> None: self._state = ClientState.CLOSED self._transport.__exit__(exc_type, exc_value, traceback) for transport in self._mounts.values(): if transport is not None: transport.__exit__(exc_type, exc_value, traceback) class AsyncClient(BaseClient): """ An asynchronous HTTP client, with connection pooling, HTTP/2, redirects, cookie persistence, etc. It can be shared between tasks. Usage: ```python >>> async with httpx.AsyncClient() as client: >>> response = await client.get('https://example.org') ``` **Parameters:** * **auth** - *(optional)* An authentication class to use when sending requests. * **params** - *(optional)* Query parameters to include in request URLs, as a string, dictionary, or sequence of two-tuples. * **headers** - *(optional)* Dictionary of HTTP headers to include when sending requests. * **cookies** - *(optional)* Dictionary of Cookie items to include when sending requests. * **verify** - *(optional)* SSL certificates (a.k.a CA bundle) used to verify the identity of requested hosts. Either `True` (default CA bundle), a path to an SSL certificate file, an `ssl.SSLContext`, or `False` (which will disable verification). * **cert** - *(optional)* An SSL certificate used by the requested host to authenticate the client. Either a path to an SSL certificate file, or two-tuple of (certificate file, key file), or a three-tuple of (certificate file, key file, password). * **http2** - *(optional)* A boolean indicating if HTTP/2 support should be enabled. Defaults to `False`. * **proxy** - *(optional)* A proxy URL where all the traffic should be routed. * **proxies** - *(optional)* A dictionary mapping HTTP protocols to proxy URLs. * **timeout** - *(optional)* The timeout configuration to use when sending requests. * **limits** - *(optional)* The limits configuration to use. * **max_redirects** - *(optional)* The maximum number of redirect responses that should be followed. * **base_url** - *(optional)* A URL to use as the base when building request URLs. * **transport** - *(optional)* A transport class to use for sending requests over the network. * **app** - *(optional)* An ASGI application to send requests to, rather than sending actual network requests. * **trust_env** - *(optional)* Enables or disables usage of environment variables for configuration. * **default_encoding** - *(optional)* The default encoding to use for decoding response text, if no charset information is included in a response Content-Type header. Set to a callable for automatic character set detection. Default: "utf-8". """ def __init__( self, *, auth: AuthTypes | None = None, params: QueryParamTypes | None = None, headers: HeaderTypes | None = None, cookies: CookieTypes | None = None, verify: VerifyTypes = True, cert: CertTypes | None = None, http1: bool = True, http2: bool = False, proxy: ProxyTypes | None = None, proxies: ProxiesTypes | None = None, mounts: None | (typing.Mapping[str, AsyncBaseTransport | None]) = None, timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG, follow_redirects: bool = False, limits: Limits = DEFAULT_LIMITS, max_redirects: int = DEFAULT_MAX_REDIRECTS, event_hooks: None | (typing.Mapping[str, list[typing.Callable[..., typing.Any]]]) = None, base_url: URLTypes = "", transport: AsyncBaseTransport | None = None, app: typing.Callable[..., typing.Any] | None = None, trust_env: bool = True, default_encoding: str | typing.Callable[[bytes], str] = "utf-8", ) -> None: super().__init__( auth=auth, params=params, headers=headers, cookies=cookies, timeout=timeout, follow_redirects=follow_redirects, max_redirects=max_redirects, event_hooks=event_hooks, base_url=base_url, trust_env=trust_env, default_encoding=default_encoding, ) if http2: try: import h2 # noqa except ImportError: # pragma: no cover raise ImportError( "Using http2=True, but the 'h2' package is not installed. " "Make sure to install httpx using `pip install httpx[http2]`." ) from None if proxies: message = ( "The 'proxies' argument is now deprecated." " Use 'proxy' or 'mounts' instead." ) warnings.warn(message, DeprecationWarning) if proxy: raise RuntimeError("Use either `proxy` or 'proxies', not both.") if app: message = ( "The 'app' shortcut is now deprecated." " Use the explicit style 'transport=ASGITransport(app=...)' instead." ) warnings.warn(message, DeprecationWarning) allow_env_proxies = trust_env and transport is None proxy_map = self._get_proxy_map(proxies or proxy, allow_env_proxies) self._transport = self._init_transport( verify=verify, cert=cert, http1=http1, http2=http2, limits=limits, transport=transport, app=app, trust_env=trust_env, ) self._mounts: dict[URLPattern, AsyncBaseTransport | None] = { URLPattern(key): None if proxy is None else self._init_proxy_transport( proxy, verify=verify, cert=cert, http1=http1, http2=http2, limits=limits, trust_env=trust_env, ) for key, proxy in proxy_map.items() } if mounts is not None: self._mounts.update( {URLPattern(key): transport for key, transport in mounts.items()} ) self._mounts = dict(sorted(self._mounts.items())) def _init_transport( self, verify: VerifyTypes = True, cert: CertTypes | None = None, http1: bool = True, http2: bool = False, limits: Limits = DEFAULT_LIMITS, transport: AsyncBaseTransport | None = None, app: typing.Callable[..., typing.Any] | None = None, trust_env: bool = True, ) -> AsyncBaseTransport: if transport is not None: return transport if app is not None: return ASGITransport(app=app) return AsyncHTTPTransport( verify=verify, cert=cert, http1=http1, http2=http2, limits=limits, trust_env=trust_env, ) def _init_proxy_transport( self, proxy: Proxy, verify: VerifyTypes = True, cert: CertTypes | None = None, http1: bool = True, http2: bool = False, limits: Limits = DEFAULT_LIMITS, trust_env: bool = True, ) -> AsyncBaseTransport: return AsyncHTTPTransport( verify=verify, cert=cert, http1=http1, http2=http2, limits=limits, trust_env=trust_env, proxy=proxy, ) def _transport_for_url(self, url: URL) -> AsyncBaseTransport: """ Returns the transport instance that should be used for a given URL. This will either be the standard connection pool, or a proxy. """ for pattern, transport in self._mounts.items(): if pattern.matches(url): return self._transport if transport is None else transport return self._transport async def request( self, method: str, url: URLTypes, *, content: RequestContent | None = None, data: RequestData | None = None, files: RequestFiles | None = None, json: typing.Any | None = None, params: QueryParamTypes | None = None, headers: HeaderTypes | None = None, cookies: CookieTypes | None = None, auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT, follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, extensions: RequestExtensions | None = None, ) -> Response: """ Build and send a request. Equivalent to: ```python request = client.build_request(...) response = await client.send(request, ...) ``` See `AsyncClient.build_request()`, `AsyncClient.send()` and [Merging of configuration][0] for how the various parameters are merged with client-level configuration. [0]: /advanced/#merging-of-configuration """ if cookies is not None: # pragma: no cover message = ( "Setting per-request cookies=<...> is being deprecated, because " "the expected behaviour on cookie persistence is ambiguous. Set " "cookies directly on the client instance instead." ) warnings.warn(message, DeprecationWarning) request = self.build_request( method=method, url=url, content=content, data=data, files=files, json=json, params=params, headers=headers, cookies=cookies, timeout=timeout, extensions=extensions, ) return await self.send(request, auth=auth, follow_redirects=follow_redirects) @asynccontextmanager async def stream( self, method: str, url: URLTypes, *, content: RequestContent | None = None, data: RequestData | None = None, files: RequestFiles | None = None, json: typing.Any | None = None, params: QueryParamTypes | None = None, headers: HeaderTypes | None = None, cookies: CookieTypes | None = None, auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, extensions: RequestExtensions | None = None, ) -> typing.AsyncIterator[Response]: """ Alternative to `httpx.request()` that streams the response body instead of loading it into memory at once. **Parameters**: See `httpx.request`. See also: [Streaming Responses][0] [0]: /quickstart#streaming-responses """ request = self.build_request( method=method, url=url, content=content, data=data, files=files, json=json, params=params, headers=headers, cookies=cookies, timeout=timeout, extensions=extensions, ) response = await self.send( request=request, auth=auth, follow_redirects=follow_redirects, stream=True, ) try: yield response finally: await response.aclose() async def send( self, request: Request, *, stream: bool = False, auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT, follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, ) -> Response: """ Send a request. The request is sent as-is, unmodified. Typically you'll want to build one with `AsyncClient.build_request()` so that any client-level configuration is merged into the request, but passing an explicit `httpx.Request()` is supported as well. See also: [Request instances][0] [0]: /advanced/#request-instances """ if self._state == ClientState.CLOSED: raise RuntimeError("Cannot send a request, as the client has been closed.") self._state = ClientState.OPENED follow_redirects = ( self.follow_redirects if isinstance(follow_redirects, UseClientDefault) else follow_redirects ) auth = self._build_request_auth(request, auth) response = await self._send_handling_auth( request, auth=auth, follow_redirects=follow_redirects, history=[], ) try: if not stream: await response.aread() return response except BaseException as exc: await response.aclose() raise exc async def _send_handling_auth( self, request: Request, auth: Auth, follow_redirects: bool, history: list[Response], ) -> Response: auth_flow = auth.async_auth_flow(request) try: request = await auth_flow.__anext__() while True: response = await self._send_handling_redirects( request, follow_redirects=follow_redirects, history=history, ) try: try: next_request = await auth_flow.asend(response) except StopAsyncIteration: return response response.history = list(history) await response.aread() request = next_request history.append(response) except BaseException as exc: await response.aclose() raise exc finally: await auth_flow.aclose() async def _send_handling_redirects( self, request: Request, follow_redirects: bool, history: list[Response], ) -> Response: while True: if len(history) > self.max_redirects: raise TooManyRedirects( "Exceeded maximum allowed redirects.", request=request ) for hook in self._event_hooks["request"]: await hook(request) response = await self._send_single_request(request) try: for hook in self._event_hooks["response"]: await hook(response) response.history = list(history) if not response.has_redirect_location: return response request = self._build_redirect_request(request, response) history = history + [response] if follow_redirects: await response.aread() else: response.next_request = request return response except BaseException as exc: await response.aclose() raise exc async def _send_single_request(self, request: Request) -> Response: """ Sends a single request, without handling any redirections. """ transport = self._transport_for_url(request.url) timer = Timer() await timer.async_start() if not isinstance(request.stream, AsyncByteStream): raise RuntimeError( "Attempted to send an sync request with an AsyncClient instance." ) with request_context(request=request): response = await transport.handle_async_request(request) assert isinstance(response.stream, AsyncByteStream) response.request = request response.stream = BoundAsyncStream( response.stream, response=response, timer=timer ) self.cookies.extract_cookies(response) response.default_encoding = self._default_encoding logger.info( 'HTTP Request: %s %s "%s %d %s"', request.method, request.url, response.http_version, response.status_code, response.reason_phrase, ) return response async def get( self, url: URLTypes, *, params: QueryParamTypes | None = None, headers: HeaderTypes | None = None, cookies: CookieTypes | None = None, auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT, follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, extensions: RequestExtensions | None = None, ) -> Response: """ Send a `GET` request. **Parameters**: See `httpx.request`. """ return await self.request( "GET", url, params=params, headers=headers, cookies=cookies, auth=auth, follow_redirects=follow_redirects, timeout=timeout, extensions=extensions, ) async def options( self, url: URLTypes, *, params: QueryParamTypes | None = None, headers: HeaderTypes | None = None, cookies: CookieTypes | None = None, auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, extensions: RequestExtensions | None = None, ) -> Response: """ Send an `OPTIONS` request. **Parameters**: See `httpx.request`. """ return await self.request( "OPTIONS", url, params=params, headers=headers, cookies=cookies, auth=auth, follow_redirects=follow_redirects, timeout=timeout, extensions=extensions, ) async def head( self, url: URLTypes, *, params: QueryParamTypes | None = None, headers: HeaderTypes | None = None, cookies: CookieTypes | None = None, auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, extensions: RequestExtensions | None = None, ) -> Response: """ Send a `HEAD` request. **Parameters**: See `httpx.request`. """ return await self.request( "HEAD", url, params=params, headers=headers, cookies=cookies, auth=auth, follow_redirects=follow_redirects, timeout=timeout, extensions=extensions, ) async def post( self, url: URLTypes, *, content: RequestContent | None = None, data: RequestData | None = None, files: RequestFiles | None = None, json: typing.Any | None = None, params: QueryParamTypes | None = None, headers: HeaderTypes | None = None, cookies: CookieTypes | None = None, auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, extensions: RequestExtensions | None = None, ) -> Response: """ Send a `POST` request. **Parameters**: See `httpx.request`. """ return await self.request( "POST", url, content=content, data=data, files=files, json=json, params=params, headers=headers, cookies=cookies, auth=auth, follow_redirects=follow_redirects, timeout=timeout, extensions=extensions, ) async def put( self, url: URLTypes, *, content: RequestContent | None = None, data: RequestData | None = None, files: RequestFiles | None = None, json: typing.Any | None = None, params: QueryParamTypes | None = None, headers: HeaderTypes | None = None, cookies: CookieTypes | None = None, auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, extensions: RequestExtensions | None = None, ) -> Response: """ Send a `PUT` request. **Parameters**: See `httpx.request`. """ return await self.request( "PUT", url, content=content, data=data, files=files, json=json, params=params, headers=headers, cookies=cookies, auth=auth, follow_redirects=follow_redirects, timeout=timeout, extensions=extensions, ) async def patch( self, url: URLTypes, *, content: RequestContent | None = None, data: RequestData | None = None, files: RequestFiles | None = None, json: typing.Any | None = None, params: QueryParamTypes | None = None, headers: HeaderTypes | None = None, cookies: CookieTypes | None = None, auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, extensions: RequestExtensions | None = None, ) -> Response: """ Send a `PATCH` request. **Parameters**: See `httpx.request`. """ return await self.request( "PATCH", url, content=content, data=data, files=files, json=json, params=params, headers=headers, cookies=cookies, auth=auth, follow_redirects=follow_redirects, timeout=timeout, extensions=extensions, ) async def delete( self, url: URLTypes, *, params: QueryParamTypes | None = None, headers: HeaderTypes | None = None, cookies: CookieTypes | None = None, auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT, follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT, timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT, extensions: RequestExtensions | None = None, ) -> Response: """ Send a `DELETE` request. **Parameters**: See `httpx.request`. """ return await self.request( "DELETE", url, params=params, headers=headers, cookies=cookies, auth=auth, follow_redirects=follow_redirects, timeout=timeout, extensions=extensions, ) async def aclose(self) -> None: """ Close transport and proxies. """ if self._state != ClientState.CLOSED: self._state = ClientState.CLOSED await self._transport.aclose() for proxy in self._mounts.values(): if proxy is not None: await proxy.aclose() async def __aenter__(self: U) -> U: if self._state != ClientState.UNOPENED: msg = { ClientState.OPENED: "Cannot open a client instance more than once.", ClientState.CLOSED: ( "Cannot reopen a client instance, once it has been closed." ), }[self._state] raise RuntimeError(msg) self._state = ClientState.OPENED await self._transport.__aenter__() for proxy in self._mounts.values(): if proxy is not None: await proxy.__aenter__() return self async def __aexit__( self, exc_type: type[BaseException] | None = None, exc_value: BaseException | None = None, traceback: TracebackType | None = None, ) -> None: self._state = ClientState.CLOSED await self._transport.__aexit__(exc_type, exc_value, traceback) for proxy in self._mounts.values(): if proxy is not None: await proxy.__aexit__(exc_type, exc_value, traceback)