summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/litestar/testing/life_span_handler.py
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:17:55 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:17:55 -0400
commit12cf076118570eebbff08c6b3090e0d4798447a1 (patch)
tree3ba25e17e3c3a5e82316558ba3864b955919ff72 /venv/lib/python3.11/site-packages/litestar/testing/life_span_handler.py
parentc45662ff3923b34614ddcc8feb9195541166dcc5 (diff)
no venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/testing/life_span_handler.py')
-rw-r--r--venv/lib/python3.11/site-packages/litestar/testing/life_span_handler.py78
1 files changed, 0 insertions, 78 deletions
diff --git a/venv/lib/python3.11/site-packages/litestar/testing/life_span_handler.py b/venv/lib/python3.11/site-packages/litestar/testing/life_span_handler.py
deleted file mode 100644
index 8ee7d22..0000000
--- a/venv/lib/python3.11/site-packages/litestar/testing/life_span_handler.py
+++ /dev/null
@@ -1,78 +0,0 @@
-from __future__ import annotations
-
-from math import inf
-from typing import TYPE_CHECKING, Generic, Optional, TypeVar, cast
-
-from anyio import create_memory_object_stream
-from anyio.streams.stapled import StapledObjectStream
-
-from litestar.testing.client.base import BaseTestClient
-
-if TYPE_CHECKING:
- from litestar.types import (
- LifeSpanReceiveMessage, # noqa: F401
- LifeSpanSendMessage,
- LifeSpanShutdownEvent,
- LifeSpanStartupEvent,
- )
-
-T = TypeVar("T", bound=BaseTestClient)
-
-
-class LifeSpanHandler(Generic[T]):
- __slots__ = "stream_send", "stream_receive", "client", "task"
-
- def __init__(self, client: T) -> None:
- self.client = client
- self.stream_send = StapledObjectStream[Optional["LifeSpanSendMessage"]](*create_memory_object_stream(inf)) # type: ignore[arg-type]
- self.stream_receive = StapledObjectStream["LifeSpanReceiveMessage"](*create_memory_object_stream(inf)) # type: ignore[arg-type]
-
- with self.client.portal() as portal:
- self.task = portal.start_task_soon(self.lifespan)
- portal.call(self.wait_startup)
-
- async def receive(self) -> LifeSpanSendMessage:
- message = await self.stream_send.receive()
- if message is None:
- self.task.result()
- return cast("LifeSpanSendMessage", message)
-
- async def wait_startup(self) -> None:
- event: LifeSpanStartupEvent = {"type": "lifespan.startup"}
- await self.stream_receive.send(event)
-
- message = await self.receive()
- if message["type"] not in (
- "lifespan.startup.complete",
- "lifespan.startup.failed",
- ):
- raise RuntimeError(
- "Received unexpected ASGI message type. Expected 'lifespan.startup.complete' or "
- f"'lifespan.startup.failed'. Got {message['type']!r}",
- )
- if message["type"] == "lifespan.startup.failed":
- await self.receive()
-
- async def wait_shutdown(self) -> None:
- async with self.stream_send:
- lifespan_shutdown_event: LifeSpanShutdownEvent = {"type": "lifespan.shutdown"}
- await self.stream_receive.send(lifespan_shutdown_event)
-
- message = await self.receive()
- if message["type"] not in (
- "lifespan.shutdown.complete",
- "lifespan.shutdown.failed",
- ):
- raise RuntimeError(
- "Received unexpected ASGI message type. Expected 'lifespan.shutdown.complete' or "
- f"'lifespan.shutdown.failed'. Got {message['type']!r}",
- )
- if message["type"] == "lifespan.shutdown.failed":
- await self.receive()
-
- async def lifespan(self) -> None:
- scope = {"type": "lifespan"}
- try:
- await self.client.app(scope, self.stream_receive.receive, self.stream_send.send)
- finally:
- await self.stream_send.send(None)