summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/httpx/_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/httpx/_client.py')
-rw-r--r--venv/lib/python3.11/site-packages/httpx/_client.py2052
1 files changed, 0 insertions, 2052 deletions
diff --git a/venv/lib/python3.11/site-packages/httpx/_client.py b/venv/lib/python3.11/site-packages/httpx/_client.py
deleted file mode 100644
index e2c6702..0000000
--- a/venv/lib/python3.11/site-packages/httpx/_client.py
+++ /dev/null
@@ -1,2052 +0,0 @@
-from __future__ import annotations
-
-import datetime
-import enum
-import logging
-import typing
-import warnings
-from contextlib import asynccontextmanager, contextmanager
-from types import TracebackType
-
-from .__version__ import __version__
-from ._auth import Auth, BasicAuth, FunctionAuth
-from ._config import (
- DEFAULT_LIMITS,
- DEFAULT_MAX_REDIRECTS,
- DEFAULT_TIMEOUT_CONFIG,
- Limits,
- Proxy,
- Timeout,
-)
-from ._decoders import SUPPORTED_DECODERS
-from ._exceptions import (
- InvalidURL,
- RemoteProtocolError,
- TooManyRedirects,
- request_context,
-)
-from ._models import Cookies, Headers, Request, Response
-from ._status_codes import codes
-from ._transports.asgi import ASGITransport
-from ._transports.base import AsyncBaseTransport, BaseTransport
-from ._transports.default import AsyncHTTPTransport, HTTPTransport
-from ._transports.wsgi import WSGITransport
-from ._types import (
- AsyncByteStream,
- AuthTypes,
- CertTypes,
- CookieTypes,
- HeaderTypes,
- ProxiesTypes,
- ProxyTypes,
- QueryParamTypes,
- RequestContent,
- RequestData,
- RequestExtensions,
- RequestFiles,
- SyncByteStream,
- TimeoutTypes,
- URLTypes,
- VerifyTypes,
-)
-from ._urls import URL, QueryParams
-from ._utils import (
- Timer,
- URLPattern,
- get_environment_proxies,
- is_https_redirect,
- same_origin,
-)
-
-# The type annotation for @classmethod and context managers here follows PEP 484
-# https://www.python.org/dev/peps/pep-0484/#annotating-instance-and-class-methods
-T = typing.TypeVar("T", bound="Client")
-U = typing.TypeVar("U", bound="AsyncClient")
-
-
-class UseClientDefault:
- """
- For some parameters such as `auth=...` and `timeout=...` we need to be able
- to indicate the default "unset" state, in a way that is distinctly different
- to using `None`.
-
- The default "unset" state indicates that whatever default is set on the
- client should be used. This is different to setting `None`, which
- explicitly disables the parameter, possibly overriding a client default.
-
- For example we use `timeout=USE_CLIENT_DEFAULT` in the `request()` signature.
- Omitting the `timeout` parameter will send a request using whatever default
- timeout has been configured on the client. Including `timeout=None` will
- ensure no timeout is used.
-
- Note that user code shouldn't need to use the `USE_CLIENT_DEFAULT` constant,
- but it is used internally when a parameter is not included.
- """
-
-
-USE_CLIENT_DEFAULT = UseClientDefault()
-
-
-logger = logging.getLogger("httpx")
-
-USER_AGENT = f"python-httpx/{__version__}"
-ACCEPT_ENCODING = ", ".join(
- [key for key in SUPPORTED_DECODERS.keys() if key != "identity"]
-)
-
-
-class ClientState(enum.Enum):
- # UNOPENED:
- # The client has been instantiated, but has not been used to send a request,
- # or been opened by entering the context of a `with` block.
- UNOPENED = 1
- # OPENED:
- # The client has either sent a request, or is within a `with` block.
- OPENED = 2
- # CLOSED:
- # The client has either exited the `with` block, or `close()` has
- # been called explicitly.
- CLOSED = 3
-
-
-class BoundSyncStream(SyncByteStream):
- """
- A byte stream that is bound to a given response instance, and that
- ensures the `response.elapsed` is set once the response is closed.
- """
-
- def __init__(
- self, stream: SyncByteStream, response: Response, timer: Timer
- ) -> None:
- self._stream = stream
- self._response = response
- self._timer = timer
-
- def __iter__(self) -> typing.Iterator[bytes]:
- for chunk in self._stream:
- yield chunk
-
- def close(self) -> None:
- seconds = self._timer.sync_elapsed()
- self._response.elapsed = datetime.timedelta(seconds=seconds)
- self._stream.close()
-
-
-class BoundAsyncStream(AsyncByteStream):
- """
- An async byte stream that is bound to a given response instance, and that
- ensures the `response.elapsed` is set once the response is closed.
- """
-
- def __init__(
- self, stream: AsyncByteStream, response: Response, timer: Timer
- ) -> None:
- self._stream = stream
- self._response = response
- self._timer = timer
-
- async def __aiter__(self) -> typing.AsyncIterator[bytes]:
- async for chunk in self._stream:
- yield chunk
-
- async def aclose(self) -> None:
- seconds = await self._timer.async_elapsed()
- self._response.elapsed = datetime.timedelta(seconds=seconds)
- await self._stream.aclose()
-
-
-EventHook = typing.Callable[..., typing.Any]
-
-
-class BaseClient:
- def __init__(
- self,
- *,
- auth: AuthTypes | None = None,
- params: QueryParamTypes | None = None,
- headers: HeaderTypes | None = None,
- cookies: CookieTypes | None = None,
- timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
- follow_redirects: bool = False,
- max_redirects: int = DEFAULT_MAX_REDIRECTS,
- event_hooks: None | (typing.Mapping[str, list[EventHook]]) = None,
- base_url: URLTypes = "",
- trust_env: bool = True,
- default_encoding: str | typing.Callable[[bytes], str] = "utf-8",
- ) -> None:
- event_hooks = {} if event_hooks is None else event_hooks
-
- self._base_url = self._enforce_trailing_slash(URL(base_url))
-
- self._auth = self._build_auth(auth)
- self._params = QueryParams(params)
- self.headers = Headers(headers)
- self._cookies = Cookies(cookies)
- self._timeout = Timeout(timeout)
- self.follow_redirects = follow_redirects
- self.max_redirects = max_redirects
- self._event_hooks = {
- "request": list(event_hooks.get("request", [])),
- "response": list(event_hooks.get("response", [])),
- }
- self._trust_env = trust_env
- self._default_encoding = default_encoding
- self._state = ClientState.UNOPENED
-
- @property
- def is_closed(self) -> bool:
- """
- Check if the client being closed
- """
- return self._state == ClientState.CLOSED
-
- @property
- def trust_env(self) -> bool:
- return self._trust_env
-
- def _enforce_trailing_slash(self, url: URL) -> URL:
- if url.raw_path.endswith(b"/"):
- return url
- return url.copy_with(raw_path=url.raw_path + b"/")
-
- def _get_proxy_map(
- self, proxies: ProxiesTypes | None, allow_env_proxies: bool
- ) -> dict[str, Proxy | None]:
- if proxies is None:
- if allow_env_proxies:
- return {
- key: None if url is None else Proxy(url=url)
- for key, url in get_environment_proxies().items()
- }
- return {}
- if isinstance(proxies, dict):
- new_proxies = {}
- for key, value in proxies.items():
- proxy = Proxy(url=value) if isinstance(value, (str, URL)) else value
- new_proxies[str(key)] = proxy
- return new_proxies
- else:
- proxy = Proxy(url=proxies) if isinstance(proxies, (str, URL)) else proxies
- return {"all://": proxy}
-
- @property
- def timeout(self) -> Timeout:
- return self._timeout
-
- @timeout.setter
- def timeout(self, timeout: TimeoutTypes) -> None:
- self._timeout = Timeout(timeout)
-
- @property
- def event_hooks(self) -> dict[str, list[EventHook]]:
- return self._event_hooks
-
- @event_hooks.setter
- def event_hooks(self, event_hooks: dict[str, list[EventHook]]) -> None:
- self._event_hooks = {
- "request": list(event_hooks.get("request", [])),
- "response": list(event_hooks.get("response", [])),
- }
-
- @property
- def auth(self) -> Auth | None:
- """
- Authentication class used when none is passed at the request-level.
-
- See also [Authentication][0].
-
- [0]: /quickstart/#authentication
- """
- return self._auth
-
- @auth.setter
- def auth(self, auth: AuthTypes) -> None:
- self._auth = self._build_auth(auth)
-
- @property
- def base_url(self) -> URL:
- """
- Base URL to use when sending requests with relative URLs.
- """
- return self._base_url
-
- @base_url.setter
- def base_url(self, url: URLTypes) -> None:
- self._base_url = self._enforce_trailing_slash(URL(url))
-
- @property
- def headers(self) -> Headers:
- """
- HTTP headers to include when sending requests.
- """
- return self._headers
-
- @headers.setter
- def headers(self, headers: HeaderTypes) -> None:
- client_headers = Headers(
- {
- b"Accept": b"*/*",
- b"Accept-Encoding": ACCEPT_ENCODING.encode("ascii"),
- b"Connection": b"keep-alive",
- b"User-Agent": USER_AGENT.encode("ascii"),
- }
- )
- client_headers.update(headers)
- self._headers = client_headers
-
- @property
- def cookies(self) -> Cookies:
- """
- Cookie values to include when sending requests.
- """
- return self._cookies
-
- @cookies.setter
- def cookies(self, cookies: CookieTypes) -> None:
- self._cookies = Cookies(cookies)
-
- @property
- def params(self) -> QueryParams:
- """
- Query parameters to include in the URL when sending requests.
- """
- return self._params
-
- @params.setter
- def params(self, params: QueryParamTypes) -> None:
- self._params = QueryParams(params)
-
- def build_request(
- self,
- method: str,
- url: URLTypes,
- *,
- content: RequestContent | None = None,
- data: RequestData | None = None,
- files: RequestFiles | None = None,
- json: typing.Any | None = None,
- params: QueryParamTypes | None = None,
- headers: HeaderTypes | None = None,
- cookies: CookieTypes | None = None,
- timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- extensions: RequestExtensions | None = None,
- ) -> Request:
- """
- Build and return a request instance.
-
- * The `params`, `headers` and `cookies` arguments
- are merged with any values set on the client.
- * The `url` argument is merged with any `base_url` set on the client.
-
- See also: [Request instances][0]
-
- [0]: /advanced/#request-instances
- """
- url = self._merge_url(url)
- headers = self._merge_headers(headers)
- cookies = self._merge_cookies(cookies)
- params = self._merge_queryparams(params)
- extensions = {} if extensions is None else extensions
- if "timeout" not in extensions:
- timeout = (
- self.timeout
- if isinstance(timeout, UseClientDefault)
- else Timeout(timeout)
- )
- extensions = dict(**extensions, timeout=timeout.as_dict())
- return Request(
- method,
- url,
- content=content,
- data=data,
- files=files,
- json=json,
- params=params,
- headers=headers,
- cookies=cookies,
- extensions=extensions,
- )
-
- def _merge_url(self, url: URLTypes) -> URL:
- """
- Merge a URL argument together with any 'base_url' on the client,
- to create the URL used for the outgoing request.
- """
- merge_url = URL(url)
- if merge_url.is_relative_url:
- # To merge URLs we always append to the base URL. To get this
- # behaviour correct we always ensure the base URL ends in a '/'
- # separator, and strip any leading '/' from the merge URL.
- #
- # So, eg...
- #
- # >>> client = Client(base_url="https://www.example.com/subpath")
- # >>> client.base_url
- # URL('https://www.example.com/subpath/')
- # >>> client.build_request("GET", "/path").url
- # URL('https://www.example.com/subpath/path')
- merge_raw_path = self.base_url.raw_path + merge_url.raw_path.lstrip(b"/")
- return self.base_url.copy_with(raw_path=merge_raw_path)
- return merge_url
-
- def _merge_cookies(self, cookies: CookieTypes | None = None) -> CookieTypes | None:
- """
- Merge a cookies argument together with any cookies on the client,
- to create the cookies used for the outgoing request.
- """
- if cookies or self.cookies:
- merged_cookies = Cookies(self.cookies)
- merged_cookies.update(cookies)
- return merged_cookies
- return cookies
-
- def _merge_headers(self, headers: HeaderTypes | None = None) -> HeaderTypes | None:
- """
- Merge a headers argument together with any headers on the client,
- to create the headers used for the outgoing request.
- """
- merged_headers = Headers(self.headers)
- merged_headers.update(headers)
- return merged_headers
-
- def _merge_queryparams(
- self, params: QueryParamTypes | None = None
- ) -> QueryParamTypes | None:
- """
- Merge a queryparams argument together with any queryparams on the client,
- to create the queryparams used for the outgoing request.
- """
- if params or self.params:
- merged_queryparams = QueryParams(self.params)
- return merged_queryparams.merge(params)
- return params
-
- def _build_auth(self, auth: AuthTypes | None) -> Auth | None:
- if auth is None:
- return None
- elif isinstance(auth, tuple):
- return BasicAuth(username=auth[0], password=auth[1])
- elif isinstance(auth, Auth):
- return auth
- elif callable(auth):
- return FunctionAuth(func=auth)
- else:
- raise TypeError(f'Invalid "auth" argument: {auth!r}')
-
- def _build_request_auth(
- self,
- request: Request,
- auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
- ) -> Auth:
- auth = (
- self._auth if isinstance(auth, UseClientDefault) else self._build_auth(auth)
- )
-
- if auth is not None:
- return auth
-
- username, password = request.url.username, request.url.password
- if username or password:
- return BasicAuth(username=username, password=password)
-
- return Auth()
-
- def _build_redirect_request(self, request: Request, response: Response) -> Request:
- """
- Given a request and a redirect response, return a new request that
- should be used to effect the redirect.
- """
- method = self._redirect_method(request, response)
- url = self._redirect_url(request, response)
- headers = self._redirect_headers(request, url, method)
- stream = self._redirect_stream(request, method)
- cookies = Cookies(self.cookies)
- return Request(
- method=method,
- url=url,
- headers=headers,
- cookies=cookies,
- stream=stream,
- extensions=request.extensions,
- )
-
- def _redirect_method(self, request: Request, response: Response) -> str:
- """
- When being redirected we may want to change the method of the request
- based on certain specs or browser behavior.
- """
- method = request.method
-
- # https://tools.ietf.org/html/rfc7231#section-6.4.4
- if response.status_code == codes.SEE_OTHER and method != "HEAD":
- method = "GET"
-
- # Do what the browsers do, despite standards...
- # Turn 302s into GETs.
- if response.status_code == codes.FOUND and method != "HEAD":
- method = "GET"
-
- # If a POST is responded to with a 301, turn it into a GET.
- # This bizarre behaviour is explained in 'requests' issue 1704.
- if response.status_code == codes.MOVED_PERMANENTLY and method == "POST":
- method = "GET"
-
- return method
-
- def _redirect_url(self, request: Request, response: Response) -> URL:
- """
- Return the URL for the redirect to follow.
- """
- location = response.headers["Location"]
-
- try:
- url = URL(location)
- except InvalidURL as exc:
- raise RemoteProtocolError(
- f"Invalid URL in location header: {exc}.", request=request
- ) from None
-
- # Handle malformed 'Location' headers that are "absolute" form, have no host.
- # See: https://github.com/encode/httpx/issues/771
- if url.scheme and not url.host:
- url = url.copy_with(host=request.url.host)
-
- # Facilitate relative 'Location' headers, as allowed by RFC 7231.
- # (e.g. '/path/to/resource' instead of 'http://domain.tld/path/to/resource')
- if url.is_relative_url:
- url = request.url.join(url)
-
- # Attach previous fragment if needed (RFC 7231 7.1.2)
- if request.url.fragment and not url.fragment:
- url = url.copy_with(fragment=request.url.fragment)
-
- return url
-
- def _redirect_headers(self, request: Request, url: URL, method: str) -> Headers:
- """
- Return the headers that should be used for the redirect request.
- """
- headers = Headers(request.headers)
-
- if not same_origin(url, request.url):
- if not is_https_redirect(request.url, url):
- # Strip Authorization headers when responses are redirected
- # away from the origin. (Except for direct HTTP to HTTPS redirects.)
- headers.pop("Authorization", None)
-
- # Update the Host header.
- headers["Host"] = url.netloc.decode("ascii")
-
- if method != request.method and method == "GET":
- # If we've switch to a 'GET' request, then strip any headers which
- # are only relevant to the request body.
- headers.pop("Content-Length", None)
- headers.pop("Transfer-Encoding", None)
-
- # We should use the client cookie store to determine any cookie header,
- # rather than whatever was on the original outgoing request.
- headers.pop("Cookie", None)
-
- return headers
-
- def _redirect_stream(
- self, request: Request, method: str
- ) -> SyncByteStream | AsyncByteStream | None:
- """
- Return the body that should be used for the redirect request.
- """
- if method != request.method and method == "GET":
- return None
-
- return request.stream
-
-
-class Client(BaseClient):
- """
- An HTTP client, with connection pooling, HTTP/2, redirects, cookie persistence, etc.
-
- It can be shared between threads.
-
- Usage:
-
- ```python
- >>> client = httpx.Client()
- >>> response = client.get('https://example.org')
- ```
-
- **Parameters:**
-
- * **auth** - *(optional)* An authentication class to use when sending
- requests.
- * **params** - *(optional)* Query parameters to include in request URLs, as
- a string, dictionary, or sequence of two-tuples.
- * **headers** - *(optional)* Dictionary of HTTP headers to include when
- sending requests.
- * **cookies** - *(optional)* Dictionary of Cookie items to include when
- sending requests.
- * **verify** - *(optional)* SSL certificates (a.k.a CA bundle) used to
- verify the identity of requested hosts. Either `True` (default CA bundle),
- a path to an SSL certificate file, an `ssl.SSLContext`, or `False`
- (which will disable verification).
- * **cert** - *(optional)* An SSL certificate used by the requested host
- to authenticate the client. Either a path to an SSL certificate file, or
- two-tuple of (certificate file, key file), or a three-tuple of (certificate
- file, key file, password).
- * **http2** - *(optional)* A boolean indicating if HTTP/2 support should be
- enabled. Defaults to `False`.
- * **proxy** - *(optional)* A proxy URL where all the traffic should be routed.
- * **proxies** - *(optional)* A dictionary mapping proxy keys to proxy
- URLs.
- * **timeout** - *(optional)* The timeout configuration to use when sending
- requests.
- * **limits** - *(optional)* The limits configuration to use.
- * **max_redirects** - *(optional)* The maximum number of redirect responses
- that should be followed.
- * **base_url** - *(optional)* A URL to use as the base when building
- request URLs.
- * **transport** - *(optional)* A transport class to use for sending requests
- over the network.
- * **app** - *(optional)* An WSGI application to send requests to,
- rather than sending actual network requests.
- * **trust_env** - *(optional)* Enables or disables usage of environment
- variables for configuration.
- * **default_encoding** - *(optional)* The default encoding to use for decoding
- response text, if no charset information is included in a response Content-Type
- header. Set to a callable for automatic character set detection. Default: "utf-8".
- """
-
- def __init__(
- self,
- *,
- auth: AuthTypes | None = None,
- params: QueryParamTypes | None = None,
- headers: HeaderTypes | None = None,
- cookies: CookieTypes | None = None,
- verify: VerifyTypes = True,
- cert: CertTypes | None = None,
- http1: bool = True,
- http2: bool = False,
- proxy: ProxyTypes | None = None,
- proxies: ProxiesTypes | None = None,
- mounts: None | (typing.Mapping[str, BaseTransport | None]) = None,
- timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
- follow_redirects: bool = False,
- limits: Limits = DEFAULT_LIMITS,
- max_redirects: int = DEFAULT_MAX_REDIRECTS,
- event_hooks: None | (typing.Mapping[str, list[EventHook]]) = None,
- base_url: URLTypes = "",
- transport: BaseTransport | None = None,
- app: typing.Callable[..., typing.Any] | None = None,
- trust_env: bool = True,
- default_encoding: str | typing.Callable[[bytes], str] = "utf-8",
- ) -> None:
- super().__init__(
- auth=auth,
- params=params,
- headers=headers,
- cookies=cookies,
- timeout=timeout,
- follow_redirects=follow_redirects,
- max_redirects=max_redirects,
- event_hooks=event_hooks,
- base_url=base_url,
- trust_env=trust_env,
- default_encoding=default_encoding,
- )
-
- if http2:
- try:
- import h2 # noqa
- except ImportError: # pragma: no cover
- raise ImportError(
- "Using http2=True, but the 'h2' package is not installed. "
- "Make sure to install httpx using `pip install httpx[http2]`."
- ) from None
-
- if proxies:
- message = (
- "The 'proxies' argument is now deprecated."
- " Use 'proxy' or 'mounts' instead."
- )
- warnings.warn(message, DeprecationWarning)
- if proxy:
- raise RuntimeError("Use either `proxy` or 'proxies', not both.")
-
- if app:
- message = (
- "The 'app' shortcut is now deprecated."
- " Use the explicit style 'transport=WSGITransport(app=...)' instead."
- )
- warnings.warn(message, DeprecationWarning)
-
- allow_env_proxies = trust_env and app is None and transport is None
- proxy_map = self._get_proxy_map(proxies or proxy, allow_env_proxies)
-
- self._transport = self._init_transport(
- verify=verify,
- cert=cert,
- http1=http1,
- http2=http2,
- limits=limits,
- transport=transport,
- app=app,
- trust_env=trust_env,
- )
- self._mounts: dict[URLPattern, BaseTransport | None] = {
- URLPattern(key): None
- if proxy is None
- else self._init_proxy_transport(
- proxy,
- verify=verify,
- cert=cert,
- http1=http1,
- http2=http2,
- limits=limits,
- trust_env=trust_env,
- )
- for key, proxy in proxy_map.items()
- }
- if mounts is not None:
- self._mounts.update(
- {URLPattern(key): transport for key, transport in mounts.items()}
- )
-
- self._mounts = dict(sorted(self._mounts.items()))
-
- def _init_transport(
- self,
- verify: VerifyTypes = True,
- cert: CertTypes | None = None,
- http1: bool = True,
- http2: bool = False,
- limits: Limits = DEFAULT_LIMITS,
- transport: BaseTransport | None = None,
- app: typing.Callable[..., typing.Any] | None = None,
- trust_env: bool = True,
- ) -> BaseTransport:
- if transport is not None:
- return transport
-
- if app is not None:
- return WSGITransport(app=app)
-
- return HTTPTransport(
- verify=verify,
- cert=cert,
- http1=http1,
- http2=http2,
- limits=limits,
- trust_env=trust_env,
- )
-
- def _init_proxy_transport(
- self,
- proxy: Proxy,
- verify: VerifyTypes = True,
- cert: CertTypes | None = None,
- http1: bool = True,
- http2: bool = False,
- limits: Limits = DEFAULT_LIMITS,
- trust_env: bool = True,
- ) -> BaseTransport:
- return HTTPTransport(
- verify=verify,
- cert=cert,
- http1=http1,
- http2=http2,
- limits=limits,
- trust_env=trust_env,
- proxy=proxy,
- )
-
- def _transport_for_url(self, url: URL) -> BaseTransport:
- """
- Returns the transport instance that should be used for a given URL.
- This will either be the standard connection pool, or a proxy.
- """
- for pattern, transport in self._mounts.items():
- if pattern.matches(url):
- return self._transport if transport is None else transport
-
- return self._transport
-
- def request(
- self,
- method: str,
- url: URLTypes,
- *,
- content: RequestContent | None = None,
- data: RequestData | None = None,
- files: RequestFiles | None = None,
- json: typing.Any | None = None,
- params: QueryParamTypes | None = None,
- headers: HeaderTypes | None = None,
- cookies: CookieTypes | None = None,
- auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
- follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
- timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- extensions: RequestExtensions | None = None,
- ) -> Response:
- """
- Build and send a request.
-
- Equivalent to:
-
- ```python
- request = client.build_request(...)
- response = client.send(request, ...)
- ```
-
- See `Client.build_request()`, `Client.send()` and
- [Merging of configuration][0] for how the various parameters
- are merged with client-level configuration.
-
- [0]: /advanced/#merging-of-configuration
- """
- if cookies is not None:
- message = (
- "Setting per-request cookies=<...> is being deprecated, because "
- "the expected behaviour on cookie persistence is ambiguous. Set "
- "cookies directly on the client instance instead."
- )
- warnings.warn(message, DeprecationWarning)
-
- request = self.build_request(
- method=method,
- url=url,
- content=content,
- data=data,
- files=files,
- json=json,
- params=params,
- headers=headers,
- cookies=cookies,
- timeout=timeout,
- extensions=extensions,
- )
- return self.send(request, auth=auth, follow_redirects=follow_redirects)
-
- @contextmanager
- def stream(
- self,
- method: str,
- url: URLTypes,
- *,
- content: RequestContent | None = None,
- data: RequestData | None = None,
- files: RequestFiles | None = None,
- json: typing.Any | None = None,
- params: QueryParamTypes | None = None,
- headers: HeaderTypes | None = None,
- cookies: CookieTypes | None = None,
- auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
- follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
- timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- extensions: RequestExtensions | None = None,
- ) -> typing.Iterator[Response]:
- """
- Alternative to `httpx.request()` that streams the response body
- instead of loading it into memory at once.
-
- **Parameters**: See `httpx.request`.
-
- See also: [Streaming Responses][0]
-
- [0]: /quickstart#streaming-responses
- """
- request = self.build_request(
- method=method,
- url=url,
- content=content,
- data=data,
- files=files,
- json=json,
- params=params,
- headers=headers,
- cookies=cookies,
- timeout=timeout,
- extensions=extensions,
- )
- response = self.send(
- request=request,
- auth=auth,
- follow_redirects=follow_redirects,
- stream=True,
- )
- try:
- yield response
- finally:
- response.close()
-
- def send(
- self,
- request: Request,
- *,
- stream: bool = False,
- auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
- follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
- ) -> Response:
- """
- Send a request.
-
- The request is sent as-is, unmodified.
-
- Typically you'll want to build one with `Client.build_request()`
- so that any client-level configuration is merged into the request,
- but passing an explicit `httpx.Request()` is supported as well.
-
- See also: [Request instances][0]
-
- [0]: /advanced/#request-instances
- """
- if self._state == ClientState.CLOSED:
- raise RuntimeError("Cannot send a request, as the client has been closed.")
-
- self._state = ClientState.OPENED
- follow_redirects = (
- self.follow_redirects
- if isinstance(follow_redirects, UseClientDefault)
- else follow_redirects
- )
-
- auth = self._build_request_auth(request, auth)
-
- response = self._send_handling_auth(
- request,
- auth=auth,
- follow_redirects=follow_redirects,
- history=[],
- )
- try:
- if not stream:
- response.read()
-
- return response
-
- except BaseException as exc:
- response.close()
- raise exc
-
- def _send_handling_auth(
- self,
- request: Request,
- auth: Auth,
- follow_redirects: bool,
- history: list[Response],
- ) -> Response:
- auth_flow = auth.sync_auth_flow(request)
- try:
- request = next(auth_flow)
-
- while True:
- response = self._send_handling_redirects(
- request,
- follow_redirects=follow_redirects,
- history=history,
- )
- try:
- try:
- next_request = auth_flow.send(response)
- except StopIteration:
- return response
-
- response.history = list(history)
- response.read()
- request = next_request
- history.append(response)
-
- except BaseException as exc:
- response.close()
- raise exc
- finally:
- auth_flow.close()
-
- def _send_handling_redirects(
- self,
- request: Request,
- follow_redirects: bool,
- history: list[Response],
- ) -> Response:
- while True:
- if len(history) > self.max_redirects:
- raise TooManyRedirects(
- "Exceeded maximum allowed redirects.", request=request
- )
-
- for hook in self._event_hooks["request"]:
- hook(request)
-
- response = self._send_single_request(request)
- try:
- for hook in self._event_hooks["response"]:
- hook(response)
- response.history = list(history)
-
- if not response.has_redirect_location:
- return response
-
- request = self._build_redirect_request(request, response)
- history = history + [response]
-
- if follow_redirects:
- response.read()
- else:
- response.next_request = request
- return response
-
- except BaseException as exc:
- response.close()
- raise exc
-
- def _send_single_request(self, request: Request) -> Response:
- """
- Sends a single request, without handling any redirections.
- """
- transport = self._transport_for_url(request.url)
- timer = Timer()
- timer.sync_start()
-
- if not isinstance(request.stream, SyncByteStream):
- raise RuntimeError(
- "Attempted to send an async request with a sync Client instance."
- )
-
- with request_context(request=request):
- response = transport.handle_request(request)
-
- assert isinstance(response.stream, SyncByteStream)
-
- response.request = request
- response.stream = BoundSyncStream(
- response.stream, response=response, timer=timer
- )
- self.cookies.extract_cookies(response)
- response.default_encoding = self._default_encoding
-
- logger.info(
- 'HTTP Request: %s %s "%s %d %s"',
- request.method,
- request.url,
- response.http_version,
- response.status_code,
- response.reason_phrase,
- )
-
- return response
-
- def get(
- self,
- url: URLTypes,
- *,
- params: QueryParamTypes | None = None,
- headers: HeaderTypes | None = None,
- cookies: CookieTypes | None = None,
- auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
- timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- extensions: RequestExtensions | None = None,
- ) -> Response:
- """
- Send a `GET` request.
-
- **Parameters**: See `httpx.request`.
- """
- return self.request(
- "GET",
- url,
- params=params,
- headers=headers,
- cookies=cookies,
- auth=auth,
- follow_redirects=follow_redirects,
- timeout=timeout,
- extensions=extensions,
- )
-
- def options(
- self,
- url: URLTypes,
- *,
- params: QueryParamTypes | None = None,
- headers: HeaderTypes | None = None,
- cookies: CookieTypes | None = None,
- auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
- timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- extensions: RequestExtensions | None = None,
- ) -> Response:
- """
- Send an `OPTIONS` request.
-
- **Parameters**: See `httpx.request`.
- """
- return self.request(
- "OPTIONS",
- url,
- params=params,
- headers=headers,
- cookies=cookies,
- auth=auth,
- follow_redirects=follow_redirects,
- timeout=timeout,
- extensions=extensions,
- )
-
- def head(
- self,
- url: URLTypes,
- *,
- params: QueryParamTypes | None = None,
- headers: HeaderTypes | None = None,
- cookies: CookieTypes | None = None,
- auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
- timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- extensions: RequestExtensions | None = None,
- ) -> Response:
- """
- Send a `HEAD` request.
-
- **Parameters**: See `httpx.request`.
- """
- return self.request(
- "HEAD",
- url,
- params=params,
- headers=headers,
- cookies=cookies,
- auth=auth,
- follow_redirects=follow_redirects,
- timeout=timeout,
- extensions=extensions,
- )
-
- def post(
- self,
- url: URLTypes,
- *,
- content: RequestContent | None = None,
- data: RequestData | None = None,
- files: RequestFiles | None = None,
- json: typing.Any | None = None,
- params: QueryParamTypes | None = None,
- headers: HeaderTypes | None = None,
- cookies: CookieTypes | None = None,
- auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
- timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- extensions: RequestExtensions | None = None,
- ) -> Response:
- """
- Send a `POST` request.
-
- **Parameters**: See `httpx.request`.
- """
- return self.request(
- "POST",
- url,
- content=content,
- data=data,
- files=files,
- json=json,
- params=params,
- headers=headers,
- cookies=cookies,
- auth=auth,
- follow_redirects=follow_redirects,
- timeout=timeout,
- extensions=extensions,
- )
-
- def put(
- self,
- url: URLTypes,
- *,
- content: RequestContent | None = None,
- data: RequestData | None = None,
- files: RequestFiles | None = None,
- json: typing.Any | None = None,
- params: QueryParamTypes | None = None,
- headers: HeaderTypes | None = None,
- cookies: CookieTypes | None = None,
- auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
- timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- extensions: RequestExtensions | None = None,
- ) -> Response:
- """
- Send a `PUT` request.
-
- **Parameters**: See `httpx.request`.
- """
- return self.request(
- "PUT",
- url,
- content=content,
- data=data,
- files=files,
- json=json,
- params=params,
- headers=headers,
- cookies=cookies,
- auth=auth,
- follow_redirects=follow_redirects,
- timeout=timeout,
- extensions=extensions,
- )
-
- def patch(
- self,
- url: URLTypes,
- *,
- content: RequestContent | None = None,
- data: RequestData | None = None,
- files: RequestFiles | None = None,
- json: typing.Any | None = None,
- params: QueryParamTypes | None = None,
- headers: HeaderTypes | None = None,
- cookies: CookieTypes | None = None,
- auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
- timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- extensions: RequestExtensions | None = None,
- ) -> Response:
- """
- Send a `PATCH` request.
-
- **Parameters**: See `httpx.request`.
- """
- return self.request(
- "PATCH",
- url,
- content=content,
- data=data,
- files=files,
- json=json,
- params=params,
- headers=headers,
- cookies=cookies,
- auth=auth,
- follow_redirects=follow_redirects,
- timeout=timeout,
- extensions=extensions,
- )
-
- def delete(
- self,
- url: URLTypes,
- *,
- params: QueryParamTypes | None = None,
- headers: HeaderTypes | None = None,
- cookies: CookieTypes | None = None,
- auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
- timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- extensions: RequestExtensions | None = None,
- ) -> Response:
- """
- Send a `DELETE` request.
-
- **Parameters**: See `httpx.request`.
- """
- return self.request(
- "DELETE",
- url,
- params=params,
- headers=headers,
- cookies=cookies,
- auth=auth,
- follow_redirects=follow_redirects,
- timeout=timeout,
- extensions=extensions,
- )
-
- def close(self) -> None:
- """
- Close transport and proxies.
- """
- if self._state != ClientState.CLOSED:
- self._state = ClientState.CLOSED
-
- self._transport.close()
- for transport in self._mounts.values():
- if transport is not None:
- transport.close()
-
- def __enter__(self: T) -> T:
- if self._state != ClientState.UNOPENED:
- msg = {
- ClientState.OPENED: "Cannot open a client instance more than once.",
- ClientState.CLOSED: (
- "Cannot reopen a client instance, once it has been closed."
- ),
- }[self._state]
- raise RuntimeError(msg)
-
- self._state = ClientState.OPENED
-
- self._transport.__enter__()
- for transport in self._mounts.values():
- if transport is not None:
- transport.__enter__()
- return self
-
- def __exit__(
- self,
- exc_type: type[BaseException] | None = None,
- exc_value: BaseException | None = None,
- traceback: TracebackType | None = None,
- ) -> None:
- self._state = ClientState.CLOSED
-
- self._transport.__exit__(exc_type, exc_value, traceback)
- for transport in self._mounts.values():
- if transport is not None:
- transport.__exit__(exc_type, exc_value, traceback)
-
-
-class AsyncClient(BaseClient):
- """
- An asynchronous HTTP client, with connection pooling, HTTP/2, redirects,
- cookie persistence, etc.
-
- It can be shared between tasks.
-
- Usage:
-
- ```python
- >>> async with httpx.AsyncClient() as client:
- >>> response = await client.get('https://example.org')
- ```
-
- **Parameters:**
-
- * **auth** - *(optional)* An authentication class to use when sending
- requests.
- * **params** - *(optional)* Query parameters to include in request URLs, as
- a string, dictionary, or sequence of two-tuples.
- * **headers** - *(optional)* Dictionary of HTTP headers to include when
- sending requests.
- * **cookies** - *(optional)* Dictionary of Cookie items to include when
- sending requests.
- * **verify** - *(optional)* SSL certificates (a.k.a CA bundle) used to
- verify the identity of requested hosts. Either `True` (default CA bundle),
- a path to an SSL certificate file, an `ssl.SSLContext`, or `False`
- (which will disable verification).
- * **cert** - *(optional)* An SSL certificate used by the requested host
- to authenticate the client. Either a path to an SSL certificate file, or
- two-tuple of (certificate file, key file), or a three-tuple of (certificate
- file, key file, password).
- * **http2** - *(optional)* A boolean indicating if HTTP/2 support should be
- enabled. Defaults to `False`.
- * **proxy** - *(optional)* A proxy URL where all the traffic should be routed.
- * **proxies** - *(optional)* A dictionary mapping HTTP protocols to proxy
- URLs.
- * **timeout** - *(optional)* The timeout configuration to use when sending
- requests.
- * **limits** - *(optional)* The limits configuration to use.
- * **max_redirects** - *(optional)* The maximum number of redirect responses
- that should be followed.
- * **base_url** - *(optional)* A URL to use as the base when building
- request URLs.
- * **transport** - *(optional)* A transport class to use for sending requests
- over the network.
- * **app** - *(optional)* An ASGI application to send requests to,
- rather than sending actual network requests.
- * **trust_env** - *(optional)* Enables or disables usage of environment
- variables for configuration.
- * **default_encoding** - *(optional)* The default encoding to use for decoding
- response text, if no charset information is included in a response Content-Type
- header. Set to a callable for automatic character set detection. Default: "utf-8".
- """
-
- def __init__(
- self,
- *,
- auth: AuthTypes | None = None,
- params: QueryParamTypes | None = None,
- headers: HeaderTypes | None = None,
- cookies: CookieTypes | None = None,
- verify: VerifyTypes = True,
- cert: CertTypes | None = None,
- http1: bool = True,
- http2: bool = False,
- proxy: ProxyTypes | None = None,
- proxies: ProxiesTypes | None = None,
- mounts: None | (typing.Mapping[str, AsyncBaseTransport | None]) = None,
- timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
- follow_redirects: bool = False,
- limits: Limits = DEFAULT_LIMITS,
- max_redirects: int = DEFAULT_MAX_REDIRECTS,
- event_hooks: None
- | (typing.Mapping[str, list[typing.Callable[..., typing.Any]]]) = None,
- base_url: URLTypes = "",
- transport: AsyncBaseTransport | None = None,
- app: typing.Callable[..., typing.Any] | None = None,
- trust_env: bool = True,
- default_encoding: str | typing.Callable[[bytes], str] = "utf-8",
- ) -> None:
- super().__init__(
- auth=auth,
- params=params,
- headers=headers,
- cookies=cookies,
- timeout=timeout,
- follow_redirects=follow_redirects,
- max_redirects=max_redirects,
- event_hooks=event_hooks,
- base_url=base_url,
- trust_env=trust_env,
- default_encoding=default_encoding,
- )
-
- if http2:
- try:
- import h2 # noqa
- except ImportError: # pragma: no cover
- raise ImportError(
- "Using http2=True, but the 'h2' package is not installed. "
- "Make sure to install httpx using `pip install httpx[http2]`."
- ) from None
-
- if proxies:
- message = (
- "The 'proxies' argument is now deprecated."
- " Use 'proxy' or 'mounts' instead."
- )
- warnings.warn(message, DeprecationWarning)
- if proxy:
- raise RuntimeError("Use either `proxy` or 'proxies', not both.")
-
- if app:
- message = (
- "The 'app' shortcut is now deprecated."
- " Use the explicit style 'transport=ASGITransport(app=...)' instead."
- )
- warnings.warn(message, DeprecationWarning)
-
- allow_env_proxies = trust_env and transport is None
- proxy_map = self._get_proxy_map(proxies or proxy, allow_env_proxies)
-
- self._transport = self._init_transport(
- verify=verify,
- cert=cert,
- http1=http1,
- http2=http2,
- limits=limits,
- transport=transport,
- app=app,
- trust_env=trust_env,
- )
-
- self._mounts: dict[URLPattern, AsyncBaseTransport | None] = {
- URLPattern(key): None
- if proxy is None
- else self._init_proxy_transport(
- proxy,
- verify=verify,
- cert=cert,
- http1=http1,
- http2=http2,
- limits=limits,
- trust_env=trust_env,
- )
- for key, proxy in proxy_map.items()
- }
- if mounts is not None:
- self._mounts.update(
- {URLPattern(key): transport for key, transport in mounts.items()}
- )
- self._mounts = dict(sorted(self._mounts.items()))
-
- def _init_transport(
- self,
- verify: VerifyTypes = True,
- cert: CertTypes | None = None,
- http1: bool = True,
- http2: bool = False,
- limits: Limits = DEFAULT_LIMITS,
- transport: AsyncBaseTransport | None = None,
- app: typing.Callable[..., typing.Any] | None = None,
- trust_env: bool = True,
- ) -> AsyncBaseTransport:
- if transport is not None:
- return transport
-
- if app is not None:
- return ASGITransport(app=app)
-
- return AsyncHTTPTransport(
- verify=verify,
- cert=cert,
- http1=http1,
- http2=http2,
- limits=limits,
- trust_env=trust_env,
- )
-
- def _init_proxy_transport(
- self,
- proxy: Proxy,
- verify: VerifyTypes = True,
- cert: CertTypes | None = None,
- http1: bool = True,
- http2: bool = False,
- limits: Limits = DEFAULT_LIMITS,
- trust_env: bool = True,
- ) -> AsyncBaseTransport:
- return AsyncHTTPTransport(
- verify=verify,
- cert=cert,
- http1=http1,
- http2=http2,
- limits=limits,
- trust_env=trust_env,
- proxy=proxy,
- )
-
- def _transport_for_url(self, url: URL) -> AsyncBaseTransport:
- """
- Returns the transport instance that should be used for a given URL.
- This will either be the standard connection pool, or a proxy.
- """
- for pattern, transport in self._mounts.items():
- if pattern.matches(url):
- return self._transport if transport is None else transport
-
- return self._transport
-
- async def request(
- self,
- method: str,
- url: URLTypes,
- *,
- content: RequestContent | None = None,
- data: RequestData | None = None,
- files: RequestFiles | None = None,
- json: typing.Any | None = None,
- params: QueryParamTypes | None = None,
- headers: HeaderTypes | None = None,
- cookies: CookieTypes | None = None,
- auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
- follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
- timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- extensions: RequestExtensions | None = None,
- ) -> Response:
- """
- Build and send a request.
-
- Equivalent to:
-
- ```python
- request = client.build_request(...)
- response = await client.send(request, ...)
- ```
-
- See `AsyncClient.build_request()`, `AsyncClient.send()`
- and [Merging of configuration][0] for how the various parameters
- are merged with client-level configuration.
-
- [0]: /advanced/#merging-of-configuration
- """
-
- if cookies is not None: # pragma: no cover
- message = (
- "Setting per-request cookies=<...> is being deprecated, because "
- "the expected behaviour on cookie persistence is ambiguous. Set "
- "cookies directly on the client instance instead."
- )
- warnings.warn(message, DeprecationWarning)
-
- request = self.build_request(
- method=method,
- url=url,
- content=content,
- data=data,
- files=files,
- json=json,
- params=params,
- headers=headers,
- cookies=cookies,
- timeout=timeout,
- extensions=extensions,
- )
- return await self.send(request, auth=auth, follow_redirects=follow_redirects)
-
- @asynccontextmanager
- async def stream(
- self,
- method: str,
- url: URLTypes,
- *,
- content: RequestContent | None = None,
- data: RequestData | None = None,
- files: RequestFiles | None = None,
- json: typing.Any | None = None,
- params: QueryParamTypes | None = None,
- headers: HeaderTypes | None = None,
- cookies: CookieTypes | None = None,
- auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
- timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- extensions: RequestExtensions | None = None,
- ) -> typing.AsyncIterator[Response]:
- """
- Alternative to `httpx.request()` that streams the response body
- instead of loading it into memory at once.
-
- **Parameters**: See `httpx.request`.
-
- See also: [Streaming Responses][0]
-
- [0]: /quickstart#streaming-responses
- """
- request = self.build_request(
- method=method,
- url=url,
- content=content,
- data=data,
- files=files,
- json=json,
- params=params,
- headers=headers,
- cookies=cookies,
- timeout=timeout,
- extensions=extensions,
- )
- response = await self.send(
- request=request,
- auth=auth,
- follow_redirects=follow_redirects,
- stream=True,
- )
- try:
- yield response
- finally:
- await response.aclose()
-
- async def send(
- self,
- request: Request,
- *,
- stream: bool = False,
- auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
- follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
- ) -> Response:
- """
- Send a request.
-
- The request is sent as-is, unmodified.
-
- Typically you'll want to build one with `AsyncClient.build_request()`
- so that any client-level configuration is merged into the request,
- but passing an explicit `httpx.Request()` is supported as well.
-
- See also: [Request instances][0]
-
- [0]: /advanced/#request-instances
- """
- if self._state == ClientState.CLOSED:
- raise RuntimeError("Cannot send a request, as the client has been closed.")
-
- self._state = ClientState.OPENED
- follow_redirects = (
- self.follow_redirects
- if isinstance(follow_redirects, UseClientDefault)
- else follow_redirects
- )
-
- auth = self._build_request_auth(request, auth)
-
- response = await self._send_handling_auth(
- request,
- auth=auth,
- follow_redirects=follow_redirects,
- history=[],
- )
- try:
- if not stream:
- await response.aread()
-
- return response
-
- except BaseException as exc:
- await response.aclose()
- raise exc
-
- async def _send_handling_auth(
- self,
- request: Request,
- auth: Auth,
- follow_redirects: bool,
- history: list[Response],
- ) -> Response:
- auth_flow = auth.async_auth_flow(request)
- try:
- request = await auth_flow.__anext__()
-
- while True:
- response = await self._send_handling_redirects(
- request,
- follow_redirects=follow_redirects,
- history=history,
- )
- try:
- try:
- next_request = await auth_flow.asend(response)
- except StopAsyncIteration:
- return response
-
- response.history = list(history)
- await response.aread()
- request = next_request
- history.append(response)
-
- except BaseException as exc:
- await response.aclose()
- raise exc
- finally:
- await auth_flow.aclose()
-
- async def _send_handling_redirects(
- self,
- request: Request,
- follow_redirects: bool,
- history: list[Response],
- ) -> Response:
- while True:
- if len(history) > self.max_redirects:
- raise TooManyRedirects(
- "Exceeded maximum allowed redirects.", request=request
- )
-
- for hook in self._event_hooks["request"]:
- await hook(request)
-
- response = await self._send_single_request(request)
- try:
- for hook in self._event_hooks["response"]:
- await hook(response)
-
- response.history = list(history)
-
- if not response.has_redirect_location:
- return response
-
- request = self._build_redirect_request(request, response)
- history = history + [response]
-
- if follow_redirects:
- await response.aread()
- else:
- response.next_request = request
- return response
-
- except BaseException as exc:
- await response.aclose()
- raise exc
-
- async def _send_single_request(self, request: Request) -> Response:
- """
- Sends a single request, without handling any redirections.
- """
- transport = self._transport_for_url(request.url)
- timer = Timer()
- await timer.async_start()
-
- if not isinstance(request.stream, AsyncByteStream):
- raise RuntimeError(
- "Attempted to send an sync request with an AsyncClient instance."
- )
-
- with request_context(request=request):
- response = await transport.handle_async_request(request)
-
- assert isinstance(response.stream, AsyncByteStream)
- response.request = request
- response.stream = BoundAsyncStream(
- response.stream, response=response, timer=timer
- )
- self.cookies.extract_cookies(response)
- response.default_encoding = self._default_encoding
-
- logger.info(
- 'HTTP Request: %s %s "%s %d %s"',
- request.method,
- request.url,
- response.http_version,
- response.status_code,
- response.reason_phrase,
- )
-
- return response
-
- async def get(
- self,
- url: URLTypes,
- *,
- params: QueryParamTypes | None = None,
- headers: HeaderTypes | None = None,
- cookies: CookieTypes | None = None,
- auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
- follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
- timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- extensions: RequestExtensions | None = None,
- ) -> Response:
- """
- Send a `GET` request.
-
- **Parameters**: See `httpx.request`.
- """
- return await self.request(
- "GET",
- url,
- params=params,
- headers=headers,
- cookies=cookies,
- auth=auth,
- follow_redirects=follow_redirects,
- timeout=timeout,
- extensions=extensions,
- )
-
- async def options(
- self,
- url: URLTypes,
- *,
- params: QueryParamTypes | None = None,
- headers: HeaderTypes | None = None,
- cookies: CookieTypes | None = None,
- auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
- timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- extensions: RequestExtensions | None = None,
- ) -> Response:
- """
- Send an `OPTIONS` request.
-
- **Parameters**: See `httpx.request`.
- """
- return await self.request(
- "OPTIONS",
- url,
- params=params,
- headers=headers,
- cookies=cookies,
- auth=auth,
- follow_redirects=follow_redirects,
- timeout=timeout,
- extensions=extensions,
- )
-
- async def head(
- self,
- url: URLTypes,
- *,
- params: QueryParamTypes | None = None,
- headers: HeaderTypes | None = None,
- cookies: CookieTypes | None = None,
- auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
- timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- extensions: RequestExtensions | None = None,
- ) -> Response:
- """
- Send a `HEAD` request.
-
- **Parameters**: See `httpx.request`.
- """
- return await self.request(
- "HEAD",
- url,
- params=params,
- headers=headers,
- cookies=cookies,
- auth=auth,
- follow_redirects=follow_redirects,
- timeout=timeout,
- extensions=extensions,
- )
-
- async def post(
- self,
- url: URLTypes,
- *,
- content: RequestContent | None = None,
- data: RequestData | None = None,
- files: RequestFiles | None = None,
- json: typing.Any | None = None,
- params: QueryParamTypes | None = None,
- headers: HeaderTypes | None = None,
- cookies: CookieTypes | None = None,
- auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
- timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- extensions: RequestExtensions | None = None,
- ) -> Response:
- """
- Send a `POST` request.
-
- **Parameters**: See `httpx.request`.
- """
- return await self.request(
- "POST",
- url,
- content=content,
- data=data,
- files=files,
- json=json,
- params=params,
- headers=headers,
- cookies=cookies,
- auth=auth,
- follow_redirects=follow_redirects,
- timeout=timeout,
- extensions=extensions,
- )
-
- async def put(
- self,
- url: URLTypes,
- *,
- content: RequestContent | None = None,
- data: RequestData | None = None,
- files: RequestFiles | None = None,
- json: typing.Any | None = None,
- params: QueryParamTypes | None = None,
- headers: HeaderTypes | None = None,
- cookies: CookieTypes | None = None,
- auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
- timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- extensions: RequestExtensions | None = None,
- ) -> Response:
- """
- Send a `PUT` request.
-
- **Parameters**: See `httpx.request`.
- """
- return await self.request(
- "PUT",
- url,
- content=content,
- data=data,
- files=files,
- json=json,
- params=params,
- headers=headers,
- cookies=cookies,
- auth=auth,
- follow_redirects=follow_redirects,
- timeout=timeout,
- extensions=extensions,
- )
-
- async def patch(
- self,
- url: URLTypes,
- *,
- content: RequestContent | None = None,
- data: RequestData | None = None,
- files: RequestFiles | None = None,
- json: typing.Any | None = None,
- params: QueryParamTypes | None = None,
- headers: HeaderTypes | None = None,
- cookies: CookieTypes | None = None,
- auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
- timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- extensions: RequestExtensions | None = None,
- ) -> Response:
- """
- Send a `PATCH` request.
-
- **Parameters**: See `httpx.request`.
- """
- return await self.request(
- "PATCH",
- url,
- content=content,
- data=data,
- files=files,
- json=json,
- params=params,
- headers=headers,
- cookies=cookies,
- auth=auth,
- follow_redirects=follow_redirects,
- timeout=timeout,
- extensions=extensions,
- )
-
- async def delete(
- self,
- url: URLTypes,
- *,
- params: QueryParamTypes | None = None,
- headers: HeaderTypes | None = None,
- cookies: CookieTypes | None = None,
- auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
- timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
- extensions: RequestExtensions | None = None,
- ) -> Response:
- """
- Send a `DELETE` request.
-
- **Parameters**: See `httpx.request`.
- """
- return await self.request(
- "DELETE",
- url,
- params=params,
- headers=headers,
- cookies=cookies,
- auth=auth,
- follow_redirects=follow_redirects,
- timeout=timeout,
- extensions=extensions,
- )
-
- async def aclose(self) -> None:
- """
- Close transport and proxies.
- """
- if self._state != ClientState.CLOSED:
- self._state = ClientState.CLOSED
-
- await self._transport.aclose()
- for proxy in self._mounts.values():
- if proxy is not None:
- await proxy.aclose()
-
- async def __aenter__(self: U) -> U:
- if self._state != ClientState.UNOPENED:
- msg = {
- ClientState.OPENED: "Cannot open a client instance more than once.",
- ClientState.CLOSED: (
- "Cannot reopen a client instance, once it has been closed."
- ),
- }[self._state]
- raise RuntimeError(msg)
-
- self._state = ClientState.OPENED
-
- await self._transport.__aenter__()
- for proxy in self._mounts.values():
- if proxy is not None:
- await proxy.__aenter__()
- return self
-
- async def __aexit__(
- self,
- exc_type: type[BaseException] | None = None,
- exc_value: BaseException | None = None,
- traceback: TracebackType | None = None,
- ) -> None:
- self._state = ClientState.CLOSED
-
- await self._transport.__aexit__(exc_type, exc_value, traceback)
- for proxy in self._mounts.values():
- if proxy is not None:
- await proxy.__aexit__(exc_type, exc_value, traceback)