import asyncio cdef class Server: def __cinit__(self, Loop loop): self._loop = loop self._servers = [] self._waiters = [] self._active_count = 0 self._serving_forever_fut = None cdef _add_server(self, UVStreamServer srv): self._servers.append(srv) cdef _start_serving(self): if self._serving: return self._serving = 1 for server in self._servers: (server).listen() cdef _wakeup(self): cdef list waiters waiters = self._waiters self._waiters = None for waiter in waiters: if not waiter.done(): waiter.set_result(waiter) cdef _attach(self): assert self._servers is not None self._active_count += 1 cdef _detach(self): assert self._active_count > 0 self._active_count -= 1 if self._active_count == 0 and self._servers is None: self._wakeup() cdef _ref(self): # Keep the server object alive while it's not explicitly closed. self._loop._servers.add(self) cdef _unref(self): self._loop._servers.discard(self) # Public API @cython.iterable_coroutine async def __aenter__(self): return self @cython.iterable_coroutine async def __aexit__(self, *exc): self.close() await self.wait_closed() def __repr__(self): return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets) def get_loop(self): return self._loop @cython.iterable_coroutine async def wait_closed(self): # Do not remove `self._servers is None` below # because close() method only closes server sockets # and existing client connections are left open. if self._servers is None or self._waiters is None: return waiter = self._loop._new_future() self._waiters.append(waiter) await waiter def close(self): cdef list servers if self._servers is None: return try: servers = self._servers self._servers = None self._serving = 0 for server in servers: (server)._close() if self._active_count == 0: self._wakeup() finally: self._unref() def is_serving(self): return self._serving @cython.iterable_coroutine async def start_serving(self): self._start_serving() @cython.iterable_coroutine async def serve_forever(self): if self._serving_forever_fut is not None: raise RuntimeError( f'server {self!r} is already being awaited on serve_forever()') if self._servers is None: raise RuntimeError(f'server {self!r} is closed') self._start_serving() self._serving_forever_fut = self._loop.create_future() try: await self._serving_forever_fut except asyncio.CancelledError: try: self.close() await self.wait_closed() finally: raise finally: self._serving_forever_fut = None property sockets: def __get__(self): cdef list sockets = [] # Guard against `self._servers is None` if self._servers: for server in self._servers: sockets.append( (server)._get_socket() ) return sockets