import asyncio
from .version import __version__
class ClosedError(Exception):
pass
class _SocketProtocol:
def __init__(self, packets_queue_max_size):
self._error = None
self._packets = asyncio.Queue(packets_queue_max_size)
def connection_made(self, transport):
pass
def connection_lost(self, transport):
self._packets.put_nowait(None)
def datagram_received(self, data, addr):
self._packets.put_nowait((data, addr))
def error_received(self, exc):
self._error = exc
self._packets.put_nowait(None)
async def recvfrom(self):
return await self._packets.get()
def raise_if_error(self):
if self._error is None:
return
error = self._error
self._error = None
raise error
[docs]class Socket:
"""A UDP socket. Use :func:`~asyncudp.create_socket()` to create an
instance of this class.
"""
def __init__(self, transport, protocol):
self._transport = transport
self._protocol = protocol
[docs] def close(self):
"""Close the socket.
"""
self._transport.close()
[docs] def sendto(self, data, addr=None):
"""Send given packet to given address ``addr``. Sends to
``remote_addr`` given to the constructor if ``addr`` is
``None``.
Raises an error if a connection error has occurred.
>>> sock.sendto(b'Hi!')
"""
self._transport.sendto(data, addr)
self._protocol.raise_if_error()
[docs] async def recvfrom(self):
"""Receive a UDP packet.
Raises ClosedError on connection error, often by calling the
close() method from another task. May raise other errors as
well.
>>> data, addr = sock.recvfrom()
"""
packet = await self._protocol.recvfrom()
self._protocol.raise_if_error()
if packet is None:
raise ClosedError()
return packet
[docs] def getsockname(self):
"""Get bound infomation.
>>> local_address, local_port = sock.getsockname()
"""
return self._transport.get_extra_info('sockname')
async def __aenter__(self):
return self
async def __aexit__(self, *exc_info):
self.close()
[docs]async def create_socket(local_addr=None,
remote_addr=None,
packets_queue_max_size=0,
reuse_port=None):
"""Create a UDP socket with given local and remote addresses.
>>> sock = await asyncudp.create_socket(local_addr=('127.0.0.1', 9999))
"""
loop = asyncio.get_running_loop()
transport, protocol = await loop.create_datagram_endpoint(
lambda: _SocketProtocol(packets_queue_max_size),
local_addr=local_addr,
remote_addr=remote_addr,
reuse_port=reuse_port)
return Socket(transport, protocol)