runtimepy.net.tcp package#
Subpackages#
- runtimepy.net.tcp.http package
- Module contents
HttpConnectionHttpConnection.envHttpConnection.expecting_responseHttpConnection.get_handler()HttpConnection.get_log_prefix()HttpConnection.handlersHttpConnection.headersHttpConnection.identityHttpConnection.init()HttpConnection.log_aliasHttpConnection.loggerHttpConnection.markdownHttpConnection.post_handler()HttpConnection.process_binary()HttpConnection.remote_addressHttpConnection.request()HttpConnection.request_json()
to_json()
- Module contents
- runtimepy.net.tcp.scpi package
- runtimepy.net.tcp.telnet package
- Submodules
- runtimepy.net.tcp.telnet.codes module
- runtimepy.net.tcp.telnet.np_05b module
Np05bConnectionNp05bConnection.async_init()Np05bConnection.chan_to_idxNp05bConnection.default_auto_restartNp05bConnection.handle_command()Np05bConnection.init()Np05bConnection.lockNp05bConnection.np05b_command()Np05bConnection.num_portsNp05bConnection.prev_cmdNp05bConnection.process_response()Np05bConnection.process_text()Np05bConnection.promptNp05bConnection.request_outlet_status()Np05bConnection.set_outlet_state()
Np05bStrings
- Module contents
Submodules#
runtimepy.net.tcp.connection module#
A module implementing a TCP connection interface.
- class runtimepy.net.tcp.connection.EchoTcpConnection(transport: Transport, protocol: QueueProtocol, **kwargs)[source]#
Bases:
TcpConnection,EchoConnectionAn echo connection for TCP.
- class runtimepy.net.tcp.connection.NullTcpConnection(transport: Transport, protocol: QueueProtocol, **kwargs)[source]#
Bases:
TcpConnection,NullConnectionA null TCP connection.
- class runtimepy.net.tcp.connection.TcpConnection(transport: Transport, protocol: QueueProtocol, **kwargs)[source]#
Bases:
Connection,TransportMixinA TCP connection interface.
- async classmethod app(stop_sig: Event, callback: Callable[[T], None] = None, serving_callback: Callable[[Any], None] = None, manager: ConnectionManager = None, **kwargs) None[source]#
Run an application that serves new connections.
- async classmethod create_connection(backoff: ExponentialBackoff = None, markdown: str = None, **kwargs) T[source]#
Create a TCP connection.
- classmethod create_pair(peer: type[V] = None, serve_kwargs: dict[str, Any] = None, connect_kwargs: dict[str, Any] = None, host: str = '127.0.0.1') AsyncIterator[tuple[V, T]][source]#
Create a connection pair.
- env: ChannelEnvironment#
- classmethod get_log_prefix(is_ssl: bool = False) str[source]#
Get a logging prefix for this instance.
- property is_ssl: bool#
Determine if this connection uses SSL.
- log_alias = 'TCP'#
- log_prefix = ''#
- logger: LoggerType#
- markdown: str#
- remote_address: _Optional[_IpHost]#
- async restart() bool[source]#
Reset necessary underlying state for this connection to ‘process’ again.
- classmethod serve(callback: Callable[[T], None] = None, **kwargs) AsyncIterator[Any][source]#
Serve incoming connections.
- uses_binary_tx_queue = False#
- uses_text_tx_queue = False#
runtimepy.net.tcp.create module#
A module for instantiating the underlying networking resources for TcpConnection.
- async runtimepy.net.tcp.create.tcp_transport_protocol(**kwargs) tuple[Transport, QueueProtocol][source]#
Create a transport and protocol pair relevant for this class’s implementation.
- async runtimepy.net.tcp.create.tcp_transport_protocol_backoff(backoff: ExponentialBackoff = None, **kwargs) tuple[Transport, QueueProtocol][source]#
Create a transport and protocol pair relevant for this class’s implementation.
- async runtimepy.net.tcp.create.try_tcp_transport_protocol(callback: Callable[[tuple[Transport, QueueProtocol]], None] = None, **kwargs) tuple[Transport, QueueProtocol] | None[source]#
Attempt to create a transport and protocol pair.
runtimepy.net.tcp.protocol module#
A module implementing a Protocol for TcpConnection.
- class runtimepy.net.tcp.protocol.QueueProtocol[source]#
Bases:
BinaryMessageQueueMixin,ProtocolA simple streaming protocol that populates a message queue.
- conn: Connection#
- logger: Logger | LoggerAdapter[Any]#
Module contents#
A module aggregating all TCP-related interfaces.
- class runtimepy.net.tcp.EchoTcpConnection(transport: Transport, protocol: QueueProtocol, **kwargs)[source]#
Bases:
TcpConnection,EchoConnectionAn echo connection for TCP.
- class runtimepy.net.tcp.NullTcpConnection(transport: Transport, protocol: QueueProtocol, **kwargs)[source]#
Bases:
TcpConnection,NullConnectionA null TCP connection.
- class runtimepy.net.tcp.TcpConnection(transport: Transport, protocol: QueueProtocol, **kwargs)[source]#
Bases:
Connection,TransportMixinA TCP connection interface.
- async classmethod app(stop_sig: Event, callback: Callable[[T], None] = None, serving_callback: Callable[[Any], None] = None, manager: ConnectionManager = None, **kwargs) None[source]#
Run an application that serves new connections.
- async classmethod create_connection(backoff: ExponentialBackoff = None, markdown: str = None, **kwargs) T[source]#
Create a TCP connection.
- classmethod create_pair(peer: type[V] = None, serve_kwargs: dict[str, Any] = None, connect_kwargs: dict[str, Any] = None, host: str = '127.0.0.1') AsyncIterator[tuple[V, T]][source]#
Create a connection pair.
- env: ChannelEnvironment#
- classmethod get_log_prefix(is_ssl: bool = False) str[source]#
Get a logging prefix for this instance.
- property is_ssl: bool#
Determine if this connection uses SSL.
- log_alias = 'TCP'#
- log_prefix = ''#
- logger: LoggerType#
- markdown: str#
- remote_address: _Optional[_IpHost]#
- async restart() bool[source]#
Reset necessary underlying state for this connection to ‘process’ again.
- classmethod serve(callback: Callable[[T], None] = None, **kwargs) AsyncIterator[Any][source]#
Serve incoming connections.
- uses_binary_tx_queue = False#
- uses_text_tx_queue = False#