vcorelib.io.bus package

Module contents

A module implementing a message bus interface.

class vcorelib.io.bus.AsyncMessageBus

Bases: LoggerMixin

A class implementing a runtime message bus interface.

register(key: str, ident: str, handler: Callable[[dict[str, Any], dict[str, Any]], Awaitable[None]]) -> None

Register a bus message handler.

register_ro(key: str, handler: Callable[[dict[str, Any]], Awaitable[None]]) -> None

Register a read-only bus message handler.
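The two registration methods accept differently shaped coroutines. The sketch below only illustrates those shapes and calls the handlers directly, without the bus. The names `on_status` and `log_status` are hypothetical, and treating the second dict as a response the handler fills in is an assumption inferred from the type signature, not something this page states.

```python
import asyncio
from typing import Any

received: list[dict[str, Any]] = []


async def on_status(payload: dict[str, Any], response: dict[str, Any]) -> None:
    """Matches the handler type taken by register(): two dicts, no return."""
    # Assumption: the second dict is a response the handler may populate.
    response["ok"] = payload.get("value", 0) > 0


async def log_status(payload: dict[str, Any]) -> None:
    """Matches the handler type taken by register_ro(): payload only."""
    # A read-only handler has no response dict to write into.
    received.append(dict(payload))


async def demo_handlers() -> dict[str, Any]:
    """Call both handlers directly to show their calling conventions."""
    response: dict[str, Any] = {}
    await on_status({"value": 1}, response)
    await log_status({"value": 1})
    return response


handler_result = asyncio.run(demo_handlers())
```

With a real bus instance, `on_status` would be passed to `register` together with a key and an identifier, and `log_status` to `register_ro` together with a key.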

async send(key: str, payload: dict[str, Any], send_ro: bool = True) -> dict[str, dict[str, Any]]

Send a message and gather responses.

async send_ro(key: str, payload: dict[str, Any]) -> int

Send a message to read-only handlers and return the number of handlers called.
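To make the four signatures above concrete, here is a minimal stand-in written from scratch. `SketchBus` is a hypothetical class, not vcorelib's implementation. It assumes that `send` returns one response dict per registered `ident`, that handlers run concurrently, and that `send_ro=True` also forwards the payload to read-only handlers. The real `AsyncMessageBus` may differ on any of these points.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable

Handler = Callable[[dict[str, Any], dict[str, Any]], Awaitable[None]]
RoHandler = Callable[[dict[str, Any]], Awaitable[None]]


class SketchBus:
    """Hypothetical stand-in mirroring the documented method signatures."""

    def __init__(self) -> None:
        self.handlers: dict[str, dict[str, Handler]] = defaultdict(dict)
        self.ro_handlers: dict[str, list[RoHandler]] = defaultdict(list)

    def register(self, key: str, ident: str, handler: Handler) -> None:
        self.handlers[key][ident] = handler

    def register_ro(self, key: str, handler: RoHandler) -> None:
        self.ro_handlers[key].append(handler)

    async def send_ro(self, key: str, payload: dict[str, Any]) -> int:
        handlers = self.ro_handlers.get(key, [])
        await asyncio.gather(*(handler(payload) for handler in handlers))
        return len(handlers)

    async def send(
        self, key: str, payload: dict[str, Any], send_ro: bool = True
    ) -> dict[str, dict[str, Any]]:
        # Assumption: each handler gets its own response dict, keyed by ident.
        registered = self.handlers.get(key, {})
        responses: dict[str, dict[str, Any]] = {ident: {} for ident in registered}
        tasks: list[Awaitable[Any]] = [
            handler(payload, responses[ident])
            for ident, handler in registered.items()
        ]
        if send_ro:
            tasks.append(self.send_ro(key, payload))
        await asyncio.gather(*tasks)
        return responses


seen: list[str] = []


async def echo(payload: dict[str, Any], response: dict[str, Any]) -> None:
    response["echo"] = payload["text"]


async def audit(payload: dict[str, Any]) -> None:
    seen.append(payload["text"])


async def main() -> tuple[dict[str, dict[str, Any]], int]:
    bus = SketchBus()
    bus.register("greet", "echoer", echo)
    bus.register_ro("greet", audit)
    responses = await bus.send("greet", {"text": "hi"})
    count = await bus.send_ro("greet", {"text": "again"})
    return responses, count


responses, count = asyncio.run(main())
```

In this sketch, `send` returns responses keyed by the `ident` given at registration, while `send_ro` reports only how many read-only handlers ran. That is the contrast the two return types (`dict[str, dict[str, Any]]` and `int`) suggest.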