runtimepy.net.udp package
Subpackages
- runtimepy.net.udp.tftp package
- Submodules
- runtimepy.net.udp.tftp.base module
  - BaseTftpConnection
    - BaseTftpConnection.default_auto_restart
    - BaseTftpConnection.endpoint()
    - BaseTftpConnection.init()
    - BaseTftpConnection.log_alias
    - BaseTftpConnection.path
    - BaseTftpConnection.process_datagram()
    - BaseTftpConnection.send_ack()
    - BaseTftpConnection.send_data()
    - BaseTftpConnection.send_error()
    - BaseTftpConnection.send_rrq()
    - BaseTftpConnection.send_wrq()
    - BaseTftpConnection.set_root()
    - BaseTftpConnection.should_connect
- runtimepy.net.udp.tftp.endpoint module
  - TftpEndpoint
    - TftpEndpoint.chunk_sender()
    - TftpEndpoint.get_path()
    - TftpEndpoint.handle_ack()
    - TftpEndpoint.handle_data()
    - TftpEndpoint.handle_error()
    - TftpEndpoint.handle_read_request()
    - TftpEndpoint.handle_write_request()
    - TftpEndpoint.ingest_file()
    - TftpEndpoint.serve_file()
    - TftpEndpoint.set_root()
    - TftpEndpoint.update_from_other()
- runtimepy.net.udp.tftp.enums module
- runtimepy.net.udp.tftp.io module
- Module contents
Submodules
runtimepy.net.udp.connection module
A module implementing a UDP connection interface.
- class runtimepy.net.udp.connection.EchoUdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol, **kwargs)
Bases: UdpConnection, EchoConnection
An echo connection for UDP.
- class runtimepy.net.udp.connection.NullUdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol, **kwargs)
Bases: UdpConnection, NullConnection
A null UDP connection.
- class runtimepy.net.udp.connection.UdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol, **kwargs)
Bases: Connection, TransportMixin
A UDP connection interface.
- async classmethod create_connection(markdown: str = None, **kwargs) -> T
Create a UDP connection.
- env: ChannelEnvironment
- latest_rx_address: tuple[str, int] | None
- log_alias = 'UDP'
- logger: LoggerType
- markdown: str
- abstractmethod async process_datagram(data: bytes | bytearray | memoryview, addr: tuple[str, int]) -> bool
Process a datagram.
- remote_address: _Optional[_IpHost]
- async restart() -> bool
Reset necessary underlying state for this connection to ‘process’ again.
- sendto(data: bytes | bytearray | memoryview, addr: IPv4Host | IPv6Host | tuple[str, int] | tuple[str, int, int, int] = None) -> None
Send to a specific address.
- set_remote_address(addr: IPv4Host | IPv6Host) -> None
Set a new remote address. Note that this doesn’t interact with or attempt to ‘connect’ the underlying socket. That should be done at creation time.
- should_connect: bool = True
- uses_binary_tx_queue = False
- uses_text_tx_queue = False
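The note on set_remote_address distinguishes between remembering a remote address and actually connecting the socket. The sketch below illustrates that distinction using only the standard library's asyncio, not runtimepy itself; the `Collector` class and `demo()` function are hypothetical names introduced for this example. It assumes a loopback interface is available.

```python
import asyncio


class Collector(asyncio.DatagramProtocol):
    """Hypothetical helper protocol: records (data, addr) pairs."""

    def __init__(self) -> None:
        self.received = asyncio.Queue()

    def datagram_received(self, data, addr):
        self.received.put_nowait((data, addr))


async def demo():
    loop = asyncio.get_running_loop()

    # A receiver bound to an ephemeral loopback port.
    server, server_proto = await loop.create_datagram_endpoint(
        Collector, local_addr=("127.0.0.1", 0)
    )
    server_addr = server.get_extra_info("sockname")[:2]

    # Connected at creation time: the socket itself remembers the peer.
    connected, _ = await loop.create_datagram_endpoint(
        asyncio.DatagramProtocol, remote_addr=server_addr
    )
    # Unconnected: every send has to name its destination explicitly.
    unconnected, _ = await loop.create_datagram_endpoint(
        asyncio.DatagramProtocol, local_addr=("127.0.0.1", 0)
    )

    try:
        connected.sendto(b"from-connected")
        unconnected.sendto(b"from-unconnected", server_addr)

        got = set()
        for _ in range(2):
            data, _addr = await asyncio.wait_for(
                server_proto.received.get(), timeout=2.0
            )
            got.add(data)
        return got
    finally:
        for transport in (connected, unconnected, server):
            transport.close()
```

Running `asyncio.run(demo())` should deliver both payloads to the receiver. The point of the sketch is that the choice between the two modes is made when the endpoint is created, which matches the documented advice to connect at creation time.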
runtimepy.net.udp.create module
A module for instantiating the underlying networking resources for UdpConnection.
- async runtimepy.net.udp.create.try_udp_transport_protocol(callback: Callable[[tuple[DatagramTransport, UdpQueueProtocol]], None] = None, **kwargs) -> tuple[DatagramTransport, UdpQueueProtocol] | None
Attempt to create a transport and protocol pair.
- async runtimepy.net.udp.create.udp_transport_protocol(**kwargs) -> tuple[DatagramTransport, UdpQueueProtocol]
Create a transport and protocol pair relevant for this class’s implementation.
- async runtimepy.net.udp.create.udp_transport_protocol_backoff(backoff: ExponentialBackoff = None, **kwargs) -> tuple[DatagramTransport, UdpQueueProtocol]
Create a transport and protocol pair relevant for UdpConnection, retrying with exponential backoff.
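The general idea behind a backoff variant of endpoint creation can be sketched with the standard library alone. This is not runtimepy's implementation and does not use its ExponentialBackoff class; `retry_with_backoff`, `open_loopback_endpoint`, and all parameter names below are hypothetical, and the choice to retry only on `OSError` is an assumption made for the example.

```python
import asyncio


async def retry_with_backoff(factory, attempts=5, base_delay=0.05, factor=2.0):
    """Hypothetical helper: await factory() until it succeeds.

    The delay between attempts grows geometrically. The last failure
    is re-raised once the attempt budget is exhausted.
    """
    delay = base_delay
    for attempt in range(1, attempts + 1):
        try:
            return await factory()
        except OSError:
            if attempt == attempts:
                raise
            await asyncio.sleep(delay)
            delay *= factor


async def open_loopback_endpoint():
    """Create a plain asyncio UDP endpoint on an ephemeral loopback port."""
    loop = asyncio.get_running_loop()
    return await loop.create_datagram_endpoint(
        asyncio.DatagramProtocol, local_addr=("127.0.0.1", 0)
    )
```

Inside a coroutine, `transport, protocol = await retry_with_backoff(open_loopback_endpoint)` would yield a pair in the same shape as the functions documented above. The transport should be closed when it is no longer needed.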
runtimepy.net.udp.protocol module
A module implementing a DatagramProtocol for UdpConnection.
- class runtimepy.net.udp.protocol.UdpQueueProtocol
Bases: DatagramProtocol
A simple UDP protocol that populates a message queue.
- conn: Connection
- logger: Logger | LoggerAdapter[Any]
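A protocol that populates a message queue can be illustrated with plain asyncio. The sketch below is a stand-in for the pattern, not the UdpQueueProtocol source; `QueueingProtocol` and `demo()` are hypothetical names, and the example assumes a loopback interface is available.

```python
import asyncio


class QueueingProtocol(asyncio.DatagramProtocol):
    """Hypothetical stand-in: stores each received datagram in a queue."""

    def __init__(self) -> None:
        self.queue = asyncio.Queue()

    def datagram_received(self, data, addr):
        # Invoked by the event loop per datagram; put_nowait never blocks
        # on an unbounded queue.
        self.queue.put_nowait((data, addr))


async def demo():
    loop = asyncio.get_running_loop()

    # Receiver bound to an ephemeral loopback port.
    rx_transport, rx_protocol = await loop.create_datagram_endpoint(
        QueueingProtocol, local_addr=("127.0.0.1", 0)
    )
    port = rx_transport.get_extra_info("sockname")[1]

    # A second endpoint acts as the sender.
    tx_transport, _ = await loop.create_datagram_endpoint(
        asyncio.DatagramProtocol, local_addr=("127.0.0.1", 0)
    )

    try:
        tx_transport.sendto(b"hello", ("127.0.0.1", port))
        # A consumer awaits the queue instead of handling callbacks.
        data, _addr = await asyncio.wait_for(
            rx_protocol.queue.get(), timeout=2.0
        )
        return data
    finally:
        tx_transport.close()
        rx_transport.close()
```

Decoupling reception from processing this way lets a separate task drain the queue at its own pace, which is one plausible reason for pairing a queue-filling protocol with a connection object.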
runtimepy.net.udp.queue module
A module implementing a simple queue-based UDP interface.
- class runtimepy.net.udp.queue.QueueUdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol, **kwargs)
Bases: UdpConnection
A simple queue-based UDP connection.
- datagrams: Queue[tuple[bytes | bytearray | memoryview, tuple[str, int]]]
Module contents
A module implementing networking utilities relevant to UDP.
- class runtimepy.net.udp.EchoUdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol, **kwargs)
Bases: UdpConnection, EchoConnection
An echo connection for UDP.
- class runtimepy.net.udp.NullUdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol, **kwargs)
Bases: UdpConnection, NullConnection
A null UDP connection.
- class runtimepy.net.udp.QueueUdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol, **kwargs)
Bases: UdpConnection
A simple queue-based UDP connection.
- datagrams: Queue[tuple[bytes | bytearray | memoryview, tuple[str, int]]]
- env: ChannelEnvironment
- latest_rx_address: _Optional[tuple[str, int]]
- logger: LoggerType
- markdown: str
- async process_datagram(data: bytes | bytearray | memoryview, addr: tuple[str, int]) -> bool
Process a datagram.
- remote_address: _Optional[_IpHost]
- class runtimepy.net.udp.UdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol, **kwargs)
Bases: Connection, TransportMixin
A UDP connection interface.
- async classmethod create_connection(markdown: str = None, **kwargs) -> T
Create a UDP connection.
- env: ChannelEnvironment
- latest_rx_address: tuple[str, int] | None
- log_alias = 'UDP'
- logger: LoggerType
- markdown: str
- abstractmethod async process_datagram(data: bytes | bytearray | memoryview, addr: tuple[str, int]) -> bool
Process a datagram.
- remote_address: _Optional[_IpHost]
- async restart() -> bool
Reset necessary underlying state for this connection to ‘process’ again.
- sendto(data: bytes | bytearray | memoryview, addr: IPv4Host | IPv6Host | tuple[str, int] | tuple[str, int, int, int] = None) -> None
Send to a specific address.
- set_remote_address(addr: IPv4Host | IPv6Host) -> None
Set a new remote address. Note that this doesn’t interact with or attempt to ‘connect’ the underlying socket. That should be done at creation time.
- should_connect: bool = True
- uses_binary_tx_queue = False
- uses_text_tx_queue = False