runtimepy.telemetry package#

Submodules#

runtimepy.telemetry.sample module#

A module implementing a simple telemetry sample interface.

class runtimepy.telemetry.sample.SampleTelemetry[source]#

Bases: UdpConnectionFactory[SampleTelemetryTransceiver]

A sample telemetry sending-and-receiving connection factory.

kind#

alias of SampleTelemetryTransceiver

class runtimepy.telemetry.sample.SampleTelemetryPeriodic[source]#

Bases: TaskFactory[SampleTelemetryTask]

A sample-task application factory.

kind#

alias of SampleTelemetryTask

class runtimepy.telemetry.sample.SampleTelemetryStruct(name: str, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]], markdown: str = None)[source]#

Bases: TimestampedStruct

A sample telemetry struct.

byte_order: ByteOrder = 2#
init_env() None[source]#

Initialize this sample environment.

class runtimepy.telemetry.sample.SampleTelemetryTask(name: str, average_depth: int = 10, metrics: PeriodicTaskMetrics = None, period_s: float = 1.0, env: ChannelEnvironment = None, period_controls: dict[str, int | float | bool | dict[str, int | float | bool]] | str = 'period', markdown: str = None, config: dict[str, str | int | float | bool | None | dict[str, str | int | float | bool | None] | list[str | int | float | bool | None]] = None)[source]#

Bases: ArbiterTask, AsyncCommandProcessingMixin

A sample telemetry task.

async dispatch() bool[source]#

Dispatch an iteration of this task.

flush_count: Uint16Primitive#
async init(app: AppInfo) None[source]#

Initialize this task with application information.

rx_conn: SampleTelemetryTransceiver#
tx_conn: SampleTelemetryTransceiver#
class runtimepy.telemetry.sample.SampleTelemetryTransceiver(transport: DatagramTransport, protocol: UdpQueueProtocol, **kwargs)[source]#

Bases: UdpStructTransceiver[SampleTelemetryStruct]

A sample telemetry sending connection.

handle_update(timestamp_ns: int, instance: SampleTelemetryStruct, addr: tuple[str, int]) None[source]#

Handle individual struct updates.

struct_kind#

alias of SampleTelemetryStruct

Module contents#

A module implementing a telemetry interface.

class runtimepy.telemetry.MessageType(*values)[source]#

Bases: RuntimeIntEnum

An enumeration of viable message types.

PROTOCOL_DATA = 2#
PROTOCOL_META = 1#