🔴 unaiverse.networking.p2p.messages

What this module does 🔴

Defines the Msg wrapper that builds, serializes and parses protobuf-backed P2P messages including JSON, stream-sample and stats-update payloads.

messages

A Collectionless AI Project (https://collectionless.ai)
Registration/Login: https://unaiverse.io
Code Repositories: https://github.com/collectionlessai/
Main Developers: Stefano Melacci (Project Leader), Christian Di Maio, Tommaso Guidi

Msg

Msg(sender: str | None = None, content: any = None, timestamp_net: str | None = None, channel: str | None = None, content_type: str = MISC, piggyback: str | None = None, _proto_msg: Message = None)

A structured network message exchanged between peers in the UNaIVERSE network.

Msg is the fundamental unit of communication between agents, worlds, and any other nodes in the system. It wraps a Protobuf Message (defined in message_pb2.py) and exposes a clean Python API for constructing, reading, and serializing messages without requiring callers to interact with Protobuf directly.

Each message carries:

  • a sender peer ID,
  • a content_type that identifies the message's purpose (one of CONTENT_TYPES),
  • a channel string identifying the logical communication channel,
  • an optional piggyback field for lightweight out-of-band data,
  • a timestamp_net recording when the message was created,
  • a content payload whose encoding depends on content_type: STREAM_SAMPLE messages pack tensors, images, text, or file data into a typed Protobuf sub-message; STATS_UPDATE messages use a dedicated StatBatch sub-message; all other types serialize their payload as a JSON string.

Instances are typically created by passing keyword arguments to the constructor. Received byte streams are parsed with from_bytes. The content property decodes the payload on first access and caches the result.

Note

The class-level constants (PROFILE, WORLD_APPROVAL, etc.) are the canonical string values for content_type. Always use these constants rather than raw strings to avoid typos and to benefit from IDE completion.
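
The benefit of constants over raw strings can be illustrated with a small stand-in. This is a sketch, not the module's code: `Kinds` and `is_valid` are hypothetical names, the values are copied from the attributes listed on this page, and the set used for `CONTENT_TYPES` is an assumption because the real container is not shown here.

```python
class Kinds:
    """Stand-in for the constants on Msg; values copied from this page."""
    PROFILE = "profile"
    MISC = "misc"
    STATS_UPDATE = "stats_update"
    # Assumption: the real CONTENT_TYPES collects all such values; its exact type is not shown.
    CONTENT_TYPES = {PROFILE, MISC, STATS_UPDATE}


def is_valid(content_type):
    """Mirror the membership check the constructor applies to content_type."""
    return content_type in Kinds.CONTENT_TYPES


print(is_valid(Kinds.PROFILE))  # True
print(is_valid("profle"))       # False: a misspelled raw string only fails at validation time

try:
    Kinds.PROFLE  # a misspelled constant fails immediately at attribute lookup
except AttributeError as exc:
    print(type(exc).__name__)   # AttributeError
```

A misspelled constant is reported at the line that contains the typo, while a misspelled raw string travels until something validates it.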

Initialize a new Msg from field values, or wrap an existing Protobuf message.

Two usage modes are supported:

  • New message: supply sender and any combination of the other keyword arguments. The constructor validates the inputs, builds the internal Protobuf object, and routes content to the appropriate encoder depending on content_type. If timestamp_net is None, the current UTC time is used. If channel is None, the channel is set to "<unknown>".

  • Parsed message: supply only _proto_msg with the Protobuf object already populated (typically by from_bytes). sender, content, timestamp_net, channel, and piggyback must all be left as None, otherwise ValueError is raised. content_type is not checked and is ignored in this mode.

When content is None or the sentinel "<empty>", no payload field is set in the Protobuf oneof and Msg.content will return "<empty>". For STREAM_SAMPLE messages the payload is encoded via _build_stream_sample_content; for STATS_UPDATE messages via _build_stats_update_content; all other types use the generic JSON encoder _build_json_content.
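
The routing rule above can be sketched without Protobuf. The snippet is a minimal stand-in, not the library's code: `choose_builder` is a hypothetical helper that only returns the name of the builder the constructor would call, and the constants repeat the values listed on this page.

```python
# Stand-in constants; the values match the Msg class attributes documented on this page.
STREAM_SAMPLE = "stream_sample"
STATS_UPDATE = "stats_update"
MISC = "misc"
EMPTY = "<empty>"  # sentinel meaning "no payload"


def choose_builder(content_type, content):
    """Return the name of the builder Msg.__init__ would route to (hypothetical helper)."""
    if content is None or content == EMPTY:
        return None  # nothing is written to the Protobuf oneof
    if content_type == STREAM_SAMPLE:
        return "_build_stream_sample_content"
    if content_type == STATS_UPDATE:
        return "_build_stats_update_content"
    return "_build_json_content"  # every other content type is JSON-encoded


print(choose_builder(MISC, {"hello": "world"}))                  # _build_json_content
print(choose_builder(STATS_UPDATE, [{"stat_name": "latency"}]))  # _build_stats_update_content
print(choose_builder(STREAM_SAMPLE, None))                       # None
```

The empty check comes first, so an empty payload never reaches a builder regardless of content_type.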

Parameters:

Name Type Description Default
sender str | None

The peer ID string of the message originator. Must be provided and must be a str when creating a new message (not when wrapping a _proto_msg).

None
content any

The payload to encode. The expected type depends on content_type: a dict of stream samples for STREAM_SAMPLE, a list of stat dicts for STATS_UPDATE, or any JSON-serialisable object for all other types. Defaults to None (empty payload).

None
timestamp_net str | None

An ISO-like UTC timestamp string for the message. If None, the current UTC time is used. Defaults to None.

None
channel str | None

The logical channel identifier for routing. If None, the channel is stored as "<unknown>".

None
content_type str

One of the string constants defined in CONTENT_TYPES (e.g. Msg.PROFILE, Msg.STREAM_SAMPLE). Defaults to Msg.MISC.

MISC
piggyback str | None

An optional string carrying lightweight out-of-band data alongside the main payload. Defaults to None (stored as empty string).

None
_proto_msg Message

A pre-built pb.Message Protobuf object. When provided, all other arguments must be absent (None / default); the Protobuf object is stored as-is and no further encoding is performed. Defaults to None.

None

Raises:

Type Description
ValueError

If _proto_msg is provided together with any of sender, content, timestamp_net, channel, or piggyback.

AssertionError

If sender is None or not a str, if timestamp_net or channel is neither None nor a str, or if content_type is not in CONTENT_TYPES, when creating a new message.

Source code in unaiverse/networking/p2p/messages.py
def __init__(self,
             sender: str | None = None,
             content: any = None,
             timestamp_net: str | None = None,
             channel: str | None = None,
             content_type: str = MISC,
             piggyback: str | None = None,
             _proto_msg: pb.Message = None):
    """Initialize a new ``Msg`` from field values, or wrap an existing Protobuf message.

    Two usage modes are supported:

    - **New message**: supply ``sender`` and any combination of the other keyword
      arguments. The constructor validates the inputs, builds the internal Protobuf
      object, and routes ``content`` to the appropriate encoder depending on
      ``content_type``. If ``timestamp_net`` is ``None``, the current UTC time is used.
      If ``channel`` is ``None``, the channel is set to ``"<unknown>"``.

    - **Parsed message**: supply only ``_proto_msg`` with the Protobuf object already
      populated (typically by ``from_bytes``). ``sender``, ``content``,
      ``timestamp_net``, ``channel``, and ``piggyback`` must all be ``None``, otherwise
      ``ValueError`` is raised; ``content_type`` is ignored in this mode.

    When ``content`` is ``None`` or the sentinel ``"<empty>"``, no payload field is
    set in the Protobuf ``oneof`` and ``Msg.content`` will return ``"<empty>"``. For
    ``STREAM_SAMPLE`` messages the payload is encoded via ``_build_stream_sample_content``;
    for ``STATS_UPDATE`` messages via ``_build_stats_update_content``; all other types
    use the generic JSON encoder ``_build_json_content``.

    Args:
        sender: The peer ID string of the message originator. Must be provided and
            must be a ``str`` when creating a new message (not when wrapping a
            ``_proto_msg``).
        content: The payload to encode. The expected type depends on ``content_type``:
            a ``dict`` of stream samples for ``STREAM_SAMPLE``, a ``list`` of stat
            dicts for ``STATS_UPDATE``, or any JSON-serialisable object for all other
            types. Defaults to None (empty payload).
        timestamp_net: An ISO-like UTC timestamp string for the message. If None, the
            current UTC time is used. Defaults to None.
        channel: The logical channel identifier for routing. If None, defaults to
            ``"<unknown>"``. Defaults to None.
        content_type: One of the string constants defined in ``CONTENT_TYPES``
            (e.g. ``Msg.PROFILE``, ``Msg.STREAM_SAMPLE``). Defaults to ``Msg.MISC``.
        piggyback: An optional string carrying lightweight out-of-band data alongside
            the main payload. Defaults to None (stored as empty string).
        _proto_msg: A pre-built ``pb.Message`` Protobuf object. When provided, all
            other arguments must be absent (None / default); the Protobuf object is
            stored as-is and no further encoding is performed. Defaults to None.

    Raises:
        ValueError: If ``_proto_msg`` is provided together with any of ``sender``,
            ``content``, ``timestamp_net``, ``channel``, or ``piggyback``.
        AssertionError: If ``sender`` is ``None`` or not a ``str``, if ``timestamp_net``
            or ``channel`` is neither ``None`` nor a ``str``, or if ``content_type`` is
            not in ``CONTENT_TYPES``, when creating a new message.
    """

    self._decoded_content: any = None  # Cache for the decoded content

    if _proto_msg is not None:

        # Check if any other arguments were simultaneously provided
        other_args = [sender, content, timestamp_net, channel, piggyback]
        if any(arg is not None for arg in other_args):
            raise ValueError("Cannot specify other arguments when creating a Msg from a _proto_msg.")

        # This path is used by from_bytes, message is already built
        self._proto_msg = _proto_msg
        return

    # Sanity checks
    assert sender is not None, "Sender must be specified for a new message."
    assert isinstance(sender, str), "Sender must be a string"
    assert timestamp_net is None or isinstance(timestamp_net, str), "Invalid timestamp_net"
    assert channel is None or isinstance(channel, str), "Invalid channel"
    assert content_type in Msg.CONTENT_TYPES, "Invalid content type"

    # --- SMART CONSTRUCTOR: Populates the correct 'oneof' field ---
    self._proto_msg = pb.Message()
    self._proto_msg.sender = sender if sender is not None else ""
    self._proto_msg.timestamp_net = timestamp_net if timestamp_net is not None else \
        datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")
    self._proto_msg.content_type = content_type if content_type is not None else self.MISC
    self._proto_msg.channel = channel if channel is not None else "<unknown>"
    self._proto_msg.piggyback = piggyback if piggyback is not None else ""

    if content is None or content == "<empty>":
        return  # Nothing to set in the 'oneof'

    # Route the content to the correct builder
    if content_type == Msg.STREAM_SAMPLE:
        self._build_stream_sample_content(content)
    elif content_type == Msg.STATS_UPDATE:
        self._build_stats_update_content(content)
    else:
        # All other structured types use the generic json_content field
        self._build_json_content(content)

PROFILE class-attribute instance-attribute

PROFILE = 'profile'

WORLD_APPROVAL class-attribute instance-attribute

WORLD_APPROVAL = 'world_approval'

AGENT_APPROVAL class-attribute instance-attribute

AGENT_APPROVAL = 'agent_approval'

PROFILE_REQUEST class-attribute instance-attribute

PROFILE_REQUEST = 'profile_request'

ADDRESS_UPDATE class-attribute instance-attribute

ADDRESS_UPDATE = 'address_update'

STREAM_SAMPLE class-attribute instance-attribute

STREAM_SAMPLE = 'stream_sample'

ROLE_SUGGESTION class-attribute instance-attribute

ROLE_SUGGESTION = 'role_suggestion'

INTERACTION class-attribute instance-attribute

INTERACTION = 'interaction'

INTERACTION_STATUS class-attribute instance-attribute

INTERACTION_STATUS = 'interaction_status'

HSM class-attribute instance-attribute

HSM = 'hsm'

MISC class-attribute instance-attribute

MISC = 'misc'

GET_CV_FROM_ROOT class-attribute instance-attribute

GET_CV_FROM_ROOT = 'get_cv_from_root'

BADGE_SUGGESTIONS class-attribute instance-attribute

BADGE_SUGGESTIONS = 'badge_suggestions'

INSPECT_ON class-attribute instance-attribute

INSPECT_ON = 'inspect_on'

INSPECT_CMD class-attribute instance-attribute

INSPECT_CMD = 'inspect_cmd'

WORLD_AGENTS_LIST class-attribute instance-attribute

WORLD_AGENTS_LIST = 'world_agents_list'

CONSOLE_AND_BEHAV_STATUS class-attribute instance-attribute

CONSOLE_AND_BEHAV_STATUS = 'console_and_behav_status'

STATS_UPDATE class-attribute instance-attribute

STATS_UPDATE = 'stats_update'

STATS_REQUEST class-attribute instance-attribute

STATS_REQUEST = 'stats_request'

STATS_RESPONSE class-attribute instance-attribute

STATS_RESPONSE = 'stats_response'

sender property writable

sender

Return the peer ID of the message originator.

Reads directly from the underlying Protobuf field. The value is always a non-None string; an empty string indicates the sender was not set.

Returns:

Type Description
str

The sender peer ID string stored in the Protobuf message.

content_type property writable

content_type

Return the content type identifier of the message.

The value is one of the string constants defined on Msg (for example Msg.PROFILE, Msg.STREAM_SAMPLE, Msg.STATS_UPDATE). It controls how the content property decodes the payload and how receiving code dispatches the message.

Returns:

Type Description
str

The content type string stored in the Protobuf message.
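
Receiving code can dispatch on content_type with a plain dictionary. The sketch below is an assumption about how a receiver might be organised, not part of this module: the handler functions and `dispatch` are hypothetical, and the constants are copied from the values on this page so the snippet runs without unaiverse installed.

```python
# Values copied from the Msg class attributes documented on this page.
PROFILE = "profile"
STATS_UPDATE = "stats_update"
MISC = "misc"


def on_profile(content):
    """Hypothetical handler for profile payloads (a dict after JSON decoding)."""
    return f"profile with {len(content)} field(s)"


def on_stats(content):
    """Hypothetical handler for stats payloads (a list of stat dicts)."""
    return f"{len(content)} stat record(s)"


# One entry per content type the receiver understands.
HANDLERS = {PROFILE: on_profile, STATS_UPDATE: on_stats}


def dispatch(content_type, content):
    """Look up the handler for content_type; report types without a handler."""
    handler = HANDLERS.get(content_type)
    if handler is None:
        return f"unhandled content type: {content_type}"
    return handler(content)


print(dispatch(PROFILE, {"name": "agent-1"}))       # profile with 1 field(s)
print(dispatch(STATS_UPDATE, [{"value": 1}, {"value": 2}]))  # 2 stat record(s)
print(dispatch(MISC, {"note": "hi"}))               # unhandled content type: misc
```

With the real class, the two arguments would come from `msg.content_type` and `msg.content`.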

channel property writable

channel

Return the logical channel identifier of the message.

The channel string is used by routing code to identify which logical communication channel a message belongs to. A value of "<unknown>" indicates no channel was specified at construction time.

Returns:

Type Description
str

The channel string stored in the Protobuf message.

piggyback property writable

piggyback

Return the piggyback field of the message.

The piggyback field carries a small auxiliary string alongside the main payload. It is intended for lightweight out-of-band data that does not warrant a separate message. An empty string indicates no piggyback data was attached.

Returns:

Type Description
str

The piggyback string stored in the Protobuf message.

timestamp_net property writable

timestamp_net

Return the network timestamp of the message.

The timestamp records when the message was created, formatted as a UTC string ("%Y-%m-%d %H:%M:%S.%f"). If not set explicitly at construction time, it is populated automatically with the current UTC time.

Returns:

Type Description
str

The timestamp string stored in the Protobuf message.
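
The timestamp format can be produced and parsed with the standard library alone. In this sketch only the format string comes from the page; `format_timestamp` and `parse_timestamp` are hypothetical helper names and are not part of Msg.

```python
from datetime import datetime, timezone

TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S.%f"  # format string used for timestamp_net


def format_timestamp(moment):
    """Format a timezone-aware datetime as a UTC timestamp_net string."""
    return moment.astimezone(timezone.utc).strftime(TIMESTAMP_FORMAT)


def parse_timestamp(text):
    """Parse a timestamp_net string back into a timezone-aware UTC datetime."""
    # The string carries no offset, so UTC is attached explicitly after parsing.
    return datetime.strptime(text, TIMESTAMP_FORMAT).replace(tzinfo=timezone.utc)


fixed = datetime(2024, 1, 2, 3, 4, 5, 678901, tzinfo=timezone.utc)
text = format_timestamp(fixed)
print(text)                            # 2024-01-02 03:04:05.678901
print(parse_timestamp(text) == fixed)  # True
```

Because the string has no timezone suffix, a receiver has to rely on the documented convention that the value is UTC.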

content property

content: any

Return the decoded payload of the message, with result caching.

On the first access the Protobuf oneof field is inspected to determine the payload kind, and the appropriate parser is called:

  • "stream_sample" -> _parse_stream_sample_content, returns a dict mapping stream names to sample dicts (each with "data", "data_tag", and "data_uuid" keys).
  • "stats_update" -> _parse_stats_update_content, returns a list of stat dicts (each with "group_key", "stat_name", "timestamp", and "value" keys).
  • "json_content" -> _parse_json_content, returns the JSON-decoded object (typically a dict).
  • No payload set -> returns the sentinel string "<empty>".

The decoded result is cached in _decoded_content so subsequent accesses are free. The cache is never invalidated; if the Protobuf message is mutated after the first access, the cached value will be stale.
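
The decode-once behaviour and the stale-cache caveat can be reproduced with a small stand-in. `LazyJsonPayload` is hypothetical and uses a JSON string in place of the Protobuf payload. It also uses a private marker object for "not decoded yet", which is an illustration choice and may differ from how Msg tracks its cache.

```python
import json

_UNSET = object()  # hypothetical "not decoded yet" marker


class LazyJsonPayload:
    """Stand-in showing lazy decoding with a cache that is never invalidated."""

    def __init__(self, raw_json):
        self.raw_json = raw_json          # stands in for the Protobuf payload field
        self._decoded_content = _UNSET
        self.decode_calls = 0             # counts how often decoding actually runs

    @property
    def content(self):
        if self._decoded_content is _UNSET:
            self.decode_calls += 1
            if self.raw_json is None:
                self._decoded_content = "<empty>"
            else:
                self._decoded_content = json.loads(self.raw_json)
        return self._decoded_content


payload = LazyJsonPayload('{"role": "teacher"}')
first = payload.content
second = payload.content
print(payload.decode_calls)  # 1: the second access reused the cached object

payload.raw_json = '{"role": "student"}'  # mutate the underlying data after first access
print(payload.content)       # {'role': 'teacher'}: the cached value is now stale
```

Code that needs to change a payload after reading it is therefore better off building a new message than mutating the existing one.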

Returns:

Type Description
any

The decoded payload. The type depends on content_type: a dict for most message types, a list of dicts for STATS_UPDATE, or the string "<empty>" when no payload is present.

to_bytes

to_bytes() -> bytes

Serialize the internal Protobuf message to a byte string.

Delegates directly to SerializeToString on the underlying pb.Message object. The resulting bytes can be transmitted over the network and later reconstructed with from_bytes.

Returns:

Type Description
bytes

The binary representation of the Protobuf message as a bytes object.

Source code in unaiverse/networking/p2p/messages.py
def to_bytes(self) -> bytes:
    """Serialize the internal Protobuf message to a byte string.

    Delegates directly to ``SerializeToString`` on the underlying ``pb.Message``
    object. The resulting bytes can be transmitted over the network and later
    reconstructed with ``from_bytes``.

    Returns:
        The binary representation of the Protobuf message as a ``bytes`` object.
    """
    return self._proto_msg.SerializeToString()

from_bytes classmethod

from_bytes(msg_bytes: bytes) -> Msg

Deserialize a byte string into a new Msg instance.

Parses msg_bytes as a Protobuf Message and wraps it in a new Msg without performing any additional decoding. The content property will decode the payload lazily on first access. This is the inverse of to_bytes.

Parameters:

Name Type Description Default
msg_bytes bytes

The raw bytes produced by a previous call to to_bytes.

required

Returns:

Type Description
Msg

A new Msg instance wrapping the parsed Protobuf message.

Raises:

Type Description
DecodeError

If msg_bytes is not a valid serialized pb.Message.
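
The relationship between to_bytes, from_bytes, and the constructor's wrap mode can be sketched with a toy class. This is not the library's implementation: `ToyMsg` and its `_inner` argument are hypothetical, and JSON stands in for the Protobuf wire format so the snippet runs with the standard library only.

```python
import json


class ToyMsg:
    """Stand-in for Msg; a plain dict plays the role of the Protobuf message."""

    def __init__(self, sender=None, channel=None, _inner=None):
        if _inner is not None:
            # Wrap mode: field arguments are rejected, as Msg does for _proto_msg.
            if sender is not None or channel is not None:
                raise ValueError("Cannot specify other arguments together with _inner.")
            self._inner = _inner
            return
        assert isinstance(sender, str), "Sender must be a string"
        self._inner = {
            "sender": sender,
            "channel": channel if channel is not None else "<unknown>",
        }

    @property
    def sender(self):
        return self._inner["sender"]

    @property
    def channel(self):
        return self._inner["channel"]

    def to_bytes(self):
        # JSON replaces SerializeToString purely for illustration.
        return json.dumps(self._inner).encode("utf-8")

    @classmethod
    def from_bytes(cls, data):
        # Parse first, then hand the parsed object to the constructor's wrap mode.
        return cls(_inner=json.loads(data.decode("utf-8")))


original = ToyMsg(sender="peer-A")
restored = ToyMsg.from_bytes(original.to_bytes())
print(restored.sender, restored.channel)  # peer-A <unknown>
```

With the real class, a receiver that cannot trust its input would additionally catch google.protobuf.message.DecodeError around from_bytes, as listed in the Raises table above.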

Source code in unaiverse/networking/p2p/messages.py
@classmethod
def from_bytes(cls, msg_bytes: bytes) -> 'Msg':
    """Deserialize a byte string into a new ``Msg`` instance.

    Parses ``msg_bytes`` as a Protobuf ``Message`` and wraps it in a new ``Msg``
    without performing any additional decoding. The ``content`` property will decode
    the payload lazily on first access. This is the inverse of ``to_bytes``.

    Args:
        msg_bytes: The raw bytes produced by a previous call to ``to_bytes``.

    Returns:
        A new ``Msg`` instance wrapping the parsed Protobuf message.

    Raises:
        google.protobuf.message.DecodeError: If ``msg_bytes`` is not a valid
            serialized ``pb.Message``.
    """
    pb_msg = pb.Message()
    pb_msg.ParseFromString(msg_bytes)

    # Pass the parsed protobuf message to the constructor
    return cls(_proto_msg=pb_msg)