1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
|
from __future__ import annotations
import socket
from abc import abstractmethod
from collections.abc import Callable, Collection, Mapping
from contextlib import AsyncExitStack
from io import IOBase
from ipaddress import IPv4Address, IPv6Address
from socket import AddressFamily
from types import TracebackType
from typing import Any, Tuple, TypeVar, Union
from .._core._typedattr import (
TypedAttributeProvider,
TypedAttributeSet,
typed_attribute,
)
from ._streams import ByteStream, Listener, UnreliableObjectStream
from ._tasks import TaskGroup
IPAddressType = Union[str, IPv4Address, IPv6Address]
IPSockAddrType = Tuple[str, int]
SockAddrType = Union[IPSockAddrType, str]
UDPPacketType = Tuple[bytes, IPSockAddrType]
UNIXDatagramPacketType = Tuple[bytes, str]
T_Retval = TypeVar("T_Retval")
class _NullAsyncContextManager:
async def __aenter__(self) -> None:
pass
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool | None:
return None
class SocketAttribute(TypedAttributeSet):
#: the address family of the underlying socket
family: AddressFamily = typed_attribute()
#: the local socket address of the underlying socket
local_address: SockAddrType = typed_attribute()
#: for IP addresses, the local port the underlying socket is bound to
local_port: int = typed_attribute()
#: the underlying stdlib socket object
raw_socket: socket.socket = typed_attribute()
#: the remote address the underlying socket is connected to
remote_address: SockAddrType = typed_attribute()
#: for IP addresses, the remote port the underlying socket is connected to
remote_port: int = typed_attribute()
class _SocketProvider(TypedAttributeProvider):
@property
def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
from .._core._sockets import convert_ipv6_sockaddr as convert
attributes: dict[Any, Callable[[], Any]] = {
SocketAttribute.family: lambda: self._raw_socket.family,
SocketAttribute.local_address: lambda: convert(
self._raw_socket.getsockname()
),
SocketAttribute.raw_socket: lambda: self._raw_socket,
}
try:
peername: tuple[str, int] | None = convert(self._raw_socket.getpeername())
except OSError:
peername = None
# Provide the remote address for connected sockets
if peername is not None:
attributes[SocketAttribute.remote_address] = lambda: peername
# Provide local and remote ports for IP based sockets
if self._raw_socket.family in (AddressFamily.AF_INET, AddressFamily.AF_INET6):
attributes[SocketAttribute.local_port] = (
lambda: self._raw_socket.getsockname()[1]
)
if peername is not None:
remote_port = peername[1]
attributes[SocketAttribute.remote_port] = lambda: remote_port
return attributes
@property
@abstractmethod
def _raw_socket(self) -> socket.socket:
pass
class SocketStream(ByteStream, _SocketProvider):
"""
Transports bytes over a socket.
Supports all relevant extra attributes from :class:`~SocketAttribute`.
"""
class UNIXSocketStream(SocketStream):
@abstractmethod
async def send_fds(self, message: bytes, fds: Collection[int | IOBase]) -> None:
"""
Send file descriptors along with a message to the peer.
:param message: a non-empty bytestring
:param fds: a collection of files (either numeric file descriptors or open file
or socket objects)
"""
@abstractmethod
async def receive_fds(self, msglen: int, maxfds: int) -> tuple[bytes, list[int]]:
"""
Receive file descriptors along with a message from the peer.
:param msglen: length of the message to expect from the peer
:param maxfds: maximum number of file descriptors to expect from the peer
:return: a tuple of (message, file descriptors)
"""
class SocketListener(Listener[SocketStream], _SocketProvider):
"""
Listens to incoming socket connections.
Supports all relevant extra attributes from :class:`~SocketAttribute`.
"""
@abstractmethod
async def accept(self) -> SocketStream:
"""Accept an incoming connection."""
async def serve(
self,
handler: Callable[[SocketStream], Any],
task_group: TaskGroup | None = None,
) -> None:
from .. import create_task_group
async with AsyncExitStack() as stack:
if task_group is None:
task_group = await stack.enter_async_context(create_task_group())
while True:
stream = await self.accept()
task_group.start_soon(handler, stream)
class UDPSocket(UnreliableObjectStream[UDPPacketType], _SocketProvider):
"""
Represents an unconnected UDP socket.
Supports all relevant extra attributes from :class:`~SocketAttribute`.
"""
async def sendto(self, data: bytes, host: str, port: int) -> None:
"""
Alias for :meth:`~.UnreliableObjectSendStream.send` ((data, (host, port))).
"""
return await self.send((data, (host, port)))
class ConnectedUDPSocket(UnreliableObjectStream[bytes], _SocketProvider):
"""
Represents an connected UDP socket.
Supports all relevant extra attributes from :class:`~SocketAttribute`.
"""
class UNIXDatagramSocket(
UnreliableObjectStream[UNIXDatagramPacketType], _SocketProvider
):
"""
Represents an unconnected Unix datagram socket.
Supports all relevant extra attributes from :class:`~SocketAttribute`.
"""
async def sendto(self, data: bytes, path: str) -> None:
"""Alias for :meth:`~.UnreliableObjectSendStream.send` ((data, path))."""
return await self.send((data, path))
class ConnectedUNIXDatagramSocket(UnreliableObjectStream[bytes], _SocketProvider):
"""
Represents a connected Unix datagram socket.
Supports all relevant extra attributes from :class:`~SocketAttribute`.
"""
|