Skip to content

unaiverse.world

What this module does 🔴

Defines World, a special AgentBasics subclass representing a world node with no processor or behaviour that coordinates peers, per-role behaviour, and world-level statistics.

world

█████ █████ ██████ █████ █████ █████ █████ ██████████ ███████████ █████████ ██████████ ░░███ ░░███ ░░██████ ░░███ ░░███ ░░███ ░░███ ░░███░░░░░█░░███░░░░░███ ███░░░░░███░░███░░░░░█ ░███ ░███ ░███░███ ░███ ██████ ░███ ░███ ░███ ░███ █ ░ ░███ ░███ ░███ ░░░ ░███ █ ░ ░███ ░███ ░███░░███░███ ░░░░░███ ░███ ░███ ░███ ░██████ ░██████████ ░░█████████ ░██████
░███ ░███ ░███ ░░██████ ███████ ░███ ░░███ ███ ░███░░█ ░███░░░░░███ ░░░░░░░░███ ░███░░█
░███ ░███ ░███ ░░█████ ███░░███ ░███ ░░░█████░ ░███ ░ █ ░███ ░███ ███ ░███ ░███ ░ █ ░░████████ █████ ░░█████░░████████ █████ ░░███ ██████████ █████ █████░░█████████ ██████████ ░░░░░░░░ ░░░░░ ░░░░░ ░░░░░░░░ ░░░░░ ░░░ ░░░░░░░░░░ ░░░░░ ░░░░░ ░░░░░░░░░ ░░░░░░░░░░ A Collectionless AI Project (https://collectionless.ai) Registration/Login: https://unaiverse.io Code Repositories: https://github.com/collectionlessai/ Main Developers: Stefano Melacci (Project Leader), Christian Di Maio, Tommaso Guidi

World

World(world_folder: str, merge_flat_stream_labels: bool = False, stats: Stats | None = None)

Bases: AgentBasics

A world node: a special agent that hosts and orchestrates other agents.

A world is a peer in the UNaIVERSE network that other agents join. Unlike a regular agent, a world has no processor and no behaviour of its own: it does not produce data and does not run a state machine. Instead, it coordinates the agents connected to it by assigning them roles, broadcasting role changes, keeping track of world membership (masters, agents, humans, artificial agents), awarding badges, and aggregating the statistics that every peer reports into a live connectivity graph.

World is the base class that world designers subclass to define custom behaviour. The typical pattern is to place an agent.py file inside the world folder that defines a World subclass overriding hooks such as assign_role (to decide which role each joining agent receives) and add_peer_stats (to react to incoming statistics).

Because a world is itself an AgentBasics instance, it inherits the full membership bookkeeping (all_agents, world_masters, world_agents, human_agents, artificial_agents) and the role constants (ROLE_WORLD_MASTER, ROLE_WORLD_AGENT, and the ROLE_BITS_TO_STR lookup) used throughout this class.

Attributes:

Name Type Description
private_peer_of

Mapping from a peer's public peer ID to its private peer ID, kept in sync as agents join and leave the world.

agent_badges dict[str, list[dict]]

Mapping from a peer ID to the list of badge dictionaries awarded to that agent (see add_badge).

stats

The Stats recorder used to store the world's own metrics and the metrics reported by connected peers.

Examples:

A minimal custom world, defined in the world folder's agent.py:

>>> from unaiverse.world import World
>>> from unaiverse.networking.node.profile import NodeProfile
>>>
>>> class ChatRoom(World):
...     def assign_role(self, profile: NodeProfile, is_world_master: bool) -> str:
...         # The first master keeps the master role; everyone else is a plain agent.
...         role = (World.ROLE_WORLD_MASTER if is_world_master and len(self.world_masters) <= 1
...                 else World.ROLE_WORLD_AGENT)
...         return World.ROLE_BITS_TO_STR[role]
>>>
>>> world = ChatRoom(world_folder="./my_chat_room")

Initialize a world as a special agent with no processor and no behaviour.

The parent AgentBasics constructor is invoked with all processor-related arguments set to None so that no processor is attached. Any dummy processor allocated during base initialization is then cleared, and the processor-related attributes are reset to empty (not None) so the rest of the framework can iterate over them safely. If no stats recorder is supplied, a default one is created under <world_folder>/stats/world_stats.db.

The world_folder is the single source of truth for a world: it holds the per-role behaviour JSON files (loaded into role_to_behav and used by set_role when broadcasting role changes) and the agent.py that defines this World subclass.

Parameters:

Name Type Description Default
world_folder str

Path to the world folder. It contains the per-role behaviour JSON files and the agent.py that defines this World subclass.

required
merge_flat_stream_labels bool

If True, flat stream labels coming from different peers are merged into a single label space. Defaults to False.

False
stats Stats | None

An existing Stats instance to record world metrics into. If None, a default world-scoped Stats instance is created inside the world folder. Defaults to None.

None

Examples:

>>> world = World(world_folder="./my_world")
>>> # Reuse a pre-configured stats recorder instead of the default one:
>>> from unaiverse.stats import Stats
>>> shared = Stats(is_world=True, db_path="./my_world/stats/world_stats.db")
>>> world = World(world_folder="./my_world", stats=shared)
Source code in unaiverse/world.py
def __init__(self, world_folder: str, merge_flat_stream_labels: bool = False,
             stats: Stats | None = None) -> None:
    """Initialize a world as a special agent with no processor and no behaviour.

    The parent ``AgentBasics`` constructor is invoked with all processor-related
    arguments set to ``None`` so that no processor is attached. Any dummy processor
    allocated during base initialization is then cleared, and the processor-related
    attributes are reset to empty (not ``None``) so the rest of the framework can
    iterate over them safely. If no ``stats`` recorder is supplied, a default one is
    created under ``<world_folder>/stats/world_stats.db``.

    The ``world_folder`` is the single source of truth for a world: it holds the
    per-role behaviour JSON files (loaded into ``role_to_behav`` and used by
    ``set_role`` when broadcasting role changes) and the ``agent.py`` that defines
    this ``World`` subclass.

    Args:
        world_folder: Path to the world folder. It contains the per-role behaviour
            JSON files and the ``agent.py`` that defines this ``World`` subclass.
        merge_flat_stream_labels: If True, flat stream labels coming from different
            peers are merged into a single label space. Defaults to False.
        stats: An existing ``Stats`` instance to record world metrics into. If None,
            a default world-scoped ``Stats`` instance is created inside the world
            folder. Defaults to None.

    Examples:
        >>> world = World(world_folder="./my_world")
        >>> # Reuse a pre-configured stats recorder instead of the default one:
        >>> from unaiverse.stats import Stats
        >>> shared = Stats(is_world=True, db_path="./my_world/stats/world_stats.db")
        >>> world = World(world_folder="./my_world", stats=shared)
    """

    # Creating a "special" agent with no processor and no behavior, but with a "world_folder", which is our world
    super().__init__(proc=None, proc_inputs=None, proc_outputs=None, proc_opts=None, behav=None,
                     world_folder=world_folder, merge_flat_stream_labels=merge_flat_stream_labels)

    # Clearing processor (world must have no processor, and, maybe, a dummy processor was allocated when building
    # the agent in the init call above)
    self.proc = None
    self.proc_inputs = []  # Do not set it to None
    self.proc_outputs = []  # Do not set it to None
    self.proc_opts = {}
    self.proc_optional_inputs = []
    self.compat_in_streams = None
    self.compat_out_streams = None

    # Map from public peer IDs to private peer IDs
    self.private_peer_of = {}

    # Stats
    if stats is not None:
        self.stats = stats
    else:
        # fallback to default Stats class
        self.stats = Stats(is_world=True, db_path=f"{self.world_folder}/stats/world_stats.db",
                           cache_window_hours=2.0)

proc instance-attribute

proc = None

proc_inputs instance-attribute

proc_inputs = []

proc_outputs instance-attribute

proc_outputs = []

proc_opts instance-attribute

proc_opts = {}

proc_optional_inputs instance-attribute

proc_optional_inputs = []

compat_in_streams instance-attribute

compat_in_streams = None

compat_out_streams instance-attribute

compat_out_streams = None

private_peer_of instance-attribute

private_peer_of = {}

stats instance-attribute

stats = stats

assign_role

assign_role(profile: NodeProfile, is_world_master: bool) -> str

Assign an initial role to a newly connected agent.

This is the main extension point for world designers: override it to implement custom admission logic (for example, a role based on the agent's profile, its CV, or its declared capabilities). In this basic implementation the role depends only on whether the agent is requesting to be a master, and the master role is granted to at most one agent: any further master request is downgraded to a plain world agent.

The returned role is the initial role assigned at join time. It can be changed later at runtime with set_role, which preserves the base role bits and broadcasts the change back to the agent.

Parameters:

Name Type Description Default
profile NodeProfile

The NodeProfile of the new agent, exposing its static and dynamic profile information.

required
is_world_master bool

True if the new agent is attempting to join as a master.

required

Returns:

Type Description
str

The assigned role as a human-readable string, taken from

str

ROLE_BITS_TO_STR (for example "world_master" or "world_agent").

Raises:

Type Description
AssertionError

If called on a node that is not a world.

Examples:

>>> # Inside a World subclass, accept the first master and downgrade the rest:
>>> def assign_role(self, profile, is_world_master):
...     if is_world_master and len(self.world_masters) <= 1:
...         return World.ROLE_BITS_TO_STR[World.ROLE_WORLD_MASTER]
...     return World.ROLE_BITS_TO_STR[World.ROLE_WORLD_AGENT]
Source code in unaiverse/world.py
def assign_role(self, profile: NodeProfile, is_world_master: bool) -> str:
    """Assign an initial role to a newly connected agent.

    This is the main extension point for world designers: override it to implement
    custom admission logic (for example, a role based on the agent's profile, its CV,
    or its declared capabilities). In this basic implementation the role depends only
    on whether the agent is requesting to be a master, and the master role is granted
    to at most one agent: any further master request is downgraded to a plain world
    agent.

    The returned role is the *initial* role assigned at join time. It can be changed
    later at runtime with ``set_role``, which preserves the base role bits and
    broadcasts the change back to the agent.

    Args:
        profile: The ``NodeProfile`` of the new agent, exposing its static and
            dynamic profile information.
        is_world_master: True if the new agent is attempting to join as a master.

    Returns:
        The assigned role as a human-readable string, taken from
        ``ROLE_BITS_TO_STR`` (for example ``"world_master"`` or ``"world_agent"``).

    Raises:
        AssertionError: If called on a node that is not a world.

    Examples:
        >>> # Inside a World subclass, accept the first master and downgrade the rest:
        >>> def assign_role(self, profile, is_world_master):
        ...     if is_world_master and len(self.world_masters) <= 1:
        ...         return World.ROLE_BITS_TO_STR[World.ROLE_WORLD_MASTER]
        ...     return World.ROLE_BITS_TO_STR[World.ROLE_WORLD_AGENT]
    """
    assert self.is_world, "Assigning a role is expected to be done by the world"

    if ('guessed_location' in profile.get_dynamic_profile() and
            profile.get_dynamic_profile()['guessed_location'] == 'Some Dummy Location, Just An Example Here'):
        pass

    # Currently, roles are only world masters and world agents
    if is_world_master:
        if len(self.world_masters) <= 1:
            return AgentBasics.ROLE_BITS_TO_STR[AgentBasics.ROLE_WORLD_MASTER]
        else:
            return AgentBasics.ROLE_BITS_TO_STR[AgentBasics.ROLE_WORLD_AGENT]
    else:
        return AgentBasics.ROLE_BITS_TO_STR[AgentBasics.ROLE_WORLD_AGENT]

set_role async

set_role(peer_id: str, role: int) -> None

Set a new role for a connected agent and broadcast the change to it (async).

The two least significant bits of the agent's current role (its base role bits) are preserved, while the higher bits are replaced by role. If the resulting role is identical to the current one, nothing happens. Otherwise the new role is stored locally and a ROLE_SUGGESTION message is sent to the agent carrying both the new role and the default behaviour associated with that role (looked up in role_to_behav). If the message cannot be delivered, the agent is purged (disconnected) from the world.

This is the runtime counterpart of assign_role: assign_role decides the role at join time, while set_role changes it afterwards and notifies the agent.

Parameters:

Name Type Description Default
peer_id str

The peer ID of the agent whose role is to be changed.

required
role int

The new role to assign, encoded as an integer bitmask.

required

Raises:

Type Description
AssertionError

If called on a node that is not a world.

Note

This coroutine has side effects beyond the role update: on a successful broadcast it sets role_changed_by_world to True, and on a failed broadcast it disconnects the target agent via the node purge callback.

Source code in unaiverse/world.py
async def set_role(self, peer_id: str, role: int) -> None:
    """Set a new role for a connected agent and broadcast the change to it (async).

    The two least significant bits of the agent's current role (its base role bits)
    are preserved, while the higher bits are replaced by ``role``. If the resulting
    role is identical to the current one, nothing happens. Otherwise the new role is
    stored locally and a ``ROLE_SUGGESTION`` message is sent to the agent carrying
    both the new role and the default behaviour associated with that role (looked up
    in ``role_to_behav``). If the message cannot be delivered, the agent is purged
    (disconnected) from the world.

    This is the runtime counterpart of ``assign_role``: ``assign_role`` decides the
    role at join time, while ``set_role`` changes it afterwards and notifies the
    agent.

    Args:
        peer_id: The peer ID of the agent whose role is to be changed.
        role: The new role to assign, encoded as an integer bitmask.

    Raises:
        AssertionError: If called on a node that is not a world.

    Note:
        This coroutine has side effects beyond the role update: on a successful
        broadcast it sets ``role_changed_by_world`` to True, and on a failed
        broadcast it disconnects the target agent via the node purge callback.
    """
    assert self.is_world, "Setting the role is expected to be done by the world, which will broadcast such info"

    # Computing new role (keeping the first two bits as before)
    cur_role = self._node_conn.get_role(peer_id)
    new_role_without_base_int = (role >> 2) << 2
    new_role = (cur_role & 3) | new_role_without_base_int

    if new_role != cur_role:
        self._node_conn.set_role(peer_id, new_role)
        log.misc("Telling an agent that his role changed")
        if not (await self._node_conn.send(peer_id, channel_trail=None,
                                           content={'peer_id': peer_id, 'role': new_role,
                                                    'default_behav':
                                                        self.role_to_behav[
                                                            self.ROLE_BITS_TO_STR[new_role_without_base_int]]
                                                        if self.role_to_behav is not None else
                                                        str(HybridStateMachine(None))},
                                           content_type=Msg.ROLE_SUGGESTION)):
            log.error("Failed to send role change, removing (disconnecting) " + peer_id)
            await self._node_purge_fcn(peer_id)
        else:
            self.role_changed_by_world = True

set_addresses_in_profile

set_addresses_in_profile(peer_id: str, addresses: list[str]) -> None

Update the network addresses stored in a known agent's profile.

The existing address list inside the agent's dynamic profile is updated in place (cleared and refilled) rather than replaced, because that list object is shared by reference with other parts of the framework. If the peer is unknown, the call is ignored and an error is logged. On success, received_address_update is set to True to signal that addresses changed.

Parameters:

Name Type Description Default
peer_id str

The peer ID of the agent whose profile is being updated.

required
addresses list[str]

The new list of network addresses to store.

required
Note

No exception is raised for an unknown peer_id; the condition is logged and the method returns without modifying any state.

Source code in unaiverse/world.py
def set_addresses_in_profile(self, peer_id: str, addresses: list[str]) -> None:
    """Update the network addresses stored in a known agent's profile.

    The existing address list inside the agent's dynamic profile is updated in place
    (cleared and refilled) rather than replaced, because that list object is shared
    by reference with other parts of the framework. If the peer is unknown, the call
    is ignored and an error is logged. On success, ``received_address_update`` is set
    to True to signal that addresses changed.

    Args:
        peer_id: The peer ID of the agent whose profile is being updated.
        addresses: The new list of network addresses to store.

    Note:
        No exception is raised for an unknown ``peer_id``; the condition is logged
        and the method returns without modifying any state.
    """
    if peer_id in self.all_agents:
        profile = self.all_agents[peer_id]
        addrs = profile.get_dynamic_profile()['private_peer_addresses']
        addrs.clear()  # Warning: do not allocate a new list, keep the current one (it is referenced by others)
        for _addrs in addresses:
            addrs.append(_addrs)
        self.received_address_update = True
    else:
        log.error(f"Cannot set addresses in profile, unknown peer_id {peer_id}")

add_badge

add_badge(peer_id: str, score: float, badge_type: str, agent_token: str, badge_description: str | None = None) -> None

Award a badge to a connected agent to track and reward its performance.

The score and badge type are validated first, then the badge (together with the agent's node ID, token, description, and an edit timestamp) is appended to the agent's badge list. Awarding a badge marks a change in connections, which forces the world to send out its dynamic profile at the next scheduled instant so that the new badge becomes visible to the network.

Badges accumulate per agent: calling this method several times for the same peer_id appends to that agent's list rather than replacing it. See get_all_badges to read the full record and clear_badges to reset it.

Parameters:

Name Type Description Default
peer_id str

The peer ID of the agent receiving the badge.

required
score float

The score associated with the badge. Must be in the range [0.0, 1.0].

required
badge_type str

The type of badge to award. Must be one of BADGE_TYPES.

required
agent_token str

The token of the agent receiving the badge. The world does not necessarily know this token on its own, because agents usually do not send messages to the world, so the caller must provide it.

required
badge_description str | None

An optional free-text description for the badge. Defaults to None, which is stored as an empty string.

None

Raises:

Type Description
ValueError

If score is outside [0.0, 1.0], or if badge_type is not one of the recognised BADGE_TYPES.

Examples:

>>> world.add_badge(peer_id, score=0.95, badge_type="completion",
...                 agent_token=token, badge_description="Finished the task")
Source code in unaiverse/world.py
def add_badge(self, peer_id: str, score: float, badge_type: str, agent_token: str,
              badge_description: str | None = None) -> None:
    """Award a badge to a connected agent to track and reward its performance.

    The score and badge type are validated first, then the badge (together with the
    agent's node ID, token, description, and an edit timestamp) is appended to the
    agent's badge list. Awarding a badge marks a change in connections, which forces
    the world to send out its dynamic profile at the next scheduled instant so that
    the new badge becomes visible to the network.

    Badges accumulate per agent: calling this method several times for the same
    ``peer_id`` appends to that agent's list rather than replacing it. See
    ``get_all_badges`` to read the full record and ``clear_badges`` to reset it.

    Args:
        peer_id: The peer ID of the agent receiving the badge.
        score: The score associated with the badge. Must be in the range [0.0, 1.0].
        badge_type: The type of badge to award. Must be one of ``BADGE_TYPES``.
        agent_token: The token of the agent receiving the badge. The world does not
            necessarily know this token on its own, because agents usually do not
            send messages to the world, so the caller must provide it.
        badge_description: An optional free-text description for the badge. Defaults
            to None, which is stored as an empty string.

    Raises:
        ValueError: If ``score`` is outside [0.0, 1.0], or if ``badge_type`` is not
            one of the recognised ``BADGE_TYPES``.

    Examples:
        >>> world.add_badge(peer_id, score=0.95, badge_type="completion",
        ...                 agent_token=token, badge_description="Finished the task")
    """

    # Validate score
    if score < 0. or score > 1.:
        raise ValueError(f"Score must be in [0.0, 1.0], got {score}")

    # Validate badge_type
    if badge_type not in AgentBasics.BADGE_TYPES:
        raise ValueError(f"Invalid badge_type '{badge_type}'. Must be one of {AgentBasics.BADGE_TYPES}.")

    if badge_description is None:
        badge_description = ""

    # The world not necessarily knows the token of the agents, since they usually do not send messages to the world
    badge = {
        'agent_node_id': self.all_agents[peer_id].get_static_profile()['node_id'],
        'agent_token': agent_token,
        'badge_type': badge_type,
        'score': score,
        'badge_description': badge_description,
        'last_edit_utc': clock.get_time_as_string(),
    }

    if peer_id not in self.agent_badges:
        self.agent_badges[peer_id] = [badge]
    else:
        self.agent_badges[peer_id].append(badge)

    # This will force the sending of the dynamic profile at the defined time instants
    self._node_profile.mark_change_in_connections()

get_all_badges

get_all_badges() -> dict[str, list[dict[str, Any]]]

Retrieve every badge the world has recorded, grouped by agent.

This provides a central log of achievements and performance metrics across all agents the world has awarded badges to via add_badge.

Returns:

Type Description
dict[str, list[dict[str, Any]]]

A dictionary mapping each agent's peer ID to the list of badge dictionaries

dict[str, list[dict[str, Any]]]

awarded to it. Each badge dictionary contains the keys "agent_node_id",

dict[str, list[dict[str, Any]]]

"agent_token", "badge_type", "score", "badge_description",

dict[str, list[dict[str, Any]]]

and "last_edit_utc". The returned object is the world's live internal

dict[str, list[dict[str, Any]]]

dictionary, not a copy, so callers should not mutate it directly.

Examples:

>>> for peer_id, badges in world.get_all_badges().items():
...     print(peer_id, sum(b["score"] for b in badges))
Source code in unaiverse/world.py
def get_all_badges(self) -> dict[str, list[dict[str, Any]]]:
    """Retrieve every badge the world has recorded, grouped by agent.

    This provides a central log of achievements and performance metrics across all
    agents the world has awarded badges to via ``add_badge``.

    Returns:
        A dictionary mapping each agent's peer ID to the list of badge dictionaries
        awarded to it. Each badge dictionary contains the keys ``"agent_node_id"``,
        ``"agent_token"``, ``"badge_type"``, ``"score"``, ``"badge_description"``,
        and ``"last_edit_utc"``. The returned object is the world's live internal
        dictionary, not a copy, so callers should not mutate it directly.

    Examples:
        >>> for peer_id, badges in world.get_all_badges().items():
        ...     print(peer_id, sum(b["score"] for b in badges))
    """
    return self.agent_badges

clear_badges

clear_badges() -> None

Remove every badge record from the world's memory.

Useful to reset competition results or to clean up state after a specific event. Only the in-memory record held by the world is cleared (the same dictionary returned by get_all_badges); badges already propagated to the network are not retracted.

Source code in unaiverse/world.py
def clear_badges(self) -> None:
    """Remove every badge record from the world's memory.

    Useful to reset competition results or to clean up state after a specific event.
    Only the in-memory record held by the world is cleared (the same dictionary
    returned by ``get_all_badges``); badges already propagated to the network are not
    retracted.
    """
    self.agent_badges = {}

add_agent async

add_agent(peer_id: str, profile: NodeProfile, add_proc_streams: bool = True, add_env_streams: bool = True, add_pubsub_streams: bool = True) -> bool

Register a new agent in the world and record its peer ID mapping (async).

The actual registration is delegated to the parent AgentBasics.add_agent. On success, the mapping from the agent's public peer ID to the private peer ID under which it is known inside the world is additionally recorded (in private_peer_of), so that later lookups can translate between the two namespaces. The inverse cleanup happens in remove_agent.

The add_proc_streams, add_env_streams, and add_pubsub_streams flags exist for parity with the agent-side signature but have no effect here: a world has no processor and does not subscribe to peer streams.

Parameters:

Name Type Description Default
peer_id str

The (private) peer ID of the agent being added.

required
profile NodeProfile

The NodeProfile carrying the agent's information.

required
add_proc_streams bool

Whether to add the peer's processor streams to the set of known streams when compatible. Ignored for worlds. Defaults to True.

True
add_env_streams bool

Whether to add the peer's environmental streams to the set of known streams when compatible. Ignored for worlds. Defaults to True.

True
add_pubsub_streams bool

Whether to add and subscribe to the peer's pubsub streams. Ignored for worlds. Defaults to True.

True

Returns:

Type Description
bool

True if the parent class successfully added the agent, False otherwise. When

bool

False, the peer ID mapping is left untouched.

Source code in unaiverse/world.py
async def add_agent(self, peer_id: str, profile: NodeProfile,
                    add_proc_streams: bool = True,
                    add_env_streams: bool = True,
                    add_pubsub_streams: bool = True) -> bool:
    """Register a new agent in the world and record its peer ID mapping (async).

    The actual registration is delegated to the parent ``AgentBasics.add_agent``. On
    success, the mapping from the agent's public peer ID to the private peer ID under
    which it is known inside the world is additionally recorded (in
    ``private_peer_of``), so that later lookups can translate between the two
    namespaces. The inverse cleanup happens in ``remove_agent``.

    The ``add_proc_streams``, ``add_env_streams``, and ``add_pubsub_streams`` flags
    exist for parity with the agent-side signature but have no effect here: a world
    has no processor and does not subscribe to peer streams.

    Args:
        peer_id: The (private) peer ID of the agent being added.
        profile: The ``NodeProfile`` carrying the agent's information.
        add_proc_streams: Whether to add the peer's processor streams to the set of
            known streams when compatible. Ignored for worlds. Defaults to True.
        add_env_streams: Whether to add the peer's environmental streams to the set
            of known streams when compatible. Ignored for worlds. Defaults to True.
        add_pubsub_streams: Whether to add and subscribe to the peer's pubsub
            streams. Ignored for worlds. Defaults to True.

    Returns:
        True if the parent class successfully added the agent, False otherwise. When
        False, the peer ID mapping is left untouched.
    """
    if await super().add_agent(peer_id, profile):
        public_peer_id = profile.get_dynamic_profile()["peer_id"]
        self.private_peer_of[public_peer_id] = peer_id
        return True
    else:
        return False

remove_agent async

remove_agent(peer_id: str) -> bool

Remove an agent from the world and clean up its peer ID mapping (async).

The agent's profile is captured before delegating removal to the parent AgentBasics.remove_agent. On success, the corresponding public-to-private peer ID entry is deleted from private_peer_of so no stale mapping is left behind. This is the inverse of add_agent.

Parameters:

Name Type Description Default
peer_id str

The private peer ID of the agent to remove.

required

Returns:

Type Description
bool

True if the parent class successfully removed the agent, False otherwise.

bool

When False, the peer ID mapping is left untouched.

Source code in unaiverse/world.py
async def remove_agent(self, peer_id: str) -> bool:
    """Remove an agent from the world and clean up its peer ID mapping (async).

    The agent's profile is captured before delegating removal to the parent
    ``AgentBasics.remove_agent``. On success, the corresponding public-to-private
    peer ID entry is deleted from ``private_peer_of`` so no stale mapping is left
    behind. This is the inverse of ``add_agent``.

    Args:
        peer_id: The private peer ID of the agent to remove.

    Returns:
        True if the parent class successfully removed the agent, False otherwise.
        When False, the peer ID mapping is left untouched.
    """
    profile = None
    if peer_id in self.all_agents:
        profile = self.all_agents[peer_id]
    if await super().remove_agent(peer_id):
        if profile is not None:
            public_peer_id = profile.get_dynamic_profile()["peer_id"]
            if public_peer_id in self.private_peer_of:
                del self.private_peer_of[public_peer_id]
        return True
    else:
        return False

collect_and_store_own_stats

collect_and_store_own_stats() -> None

Collect this world's own membership counts and push them to the recorder.

The current number of world masters, world agents, human agents, and artificial agents is recorded under the world's own private peer ID at the current time. If no Stats recorder is configured, the call returns immediately. Any error raised by the recorder is caught and logged rather than propagated, so a stats failure never interrupts world operation. This is invoked once per batch at the start of add_peer_stats.

Source code in unaiverse/world.py
def collect_and_store_own_stats(self) -> None:
    """Collect this world's own membership counts and push them to the recorder.

    The current number of world masters, world agents, human agents, and artificial
    agents is recorded under the world's own private peer ID at the current time. If
    no ``Stats`` recorder is configured, the call returns immediately. Any error
    raised by the recorder is caught and logged rather than propagated, so a stats
    failure never interrupts world operation. This is invoked once per batch at the
    start of ``add_peer_stats``.
    """
    if self.stats is None:
        return

    t = clock.get_time_ms()
    _, own_private_pid = self.get_peer_ids()

    try:
        self.stats.store_stat("world_masters", len(self.world_masters), group_key=own_private_pid,
                              timestamp=t)
        self.stats.store_stat("world_agents", len(self.world_agents), group_key=own_private_pid,
                              timestamp=t)
        self.stats.store_stat("human_agents", len(self.human_agents), group_key=own_private_pid,
                              timestamp=t)
        self.stats.store_stat("artificial_agents", len(self.artificial_agents), group_key=own_private_pid,
                              timestamp=t)
    except Exception as e:
        log.error(f"[Stats] Error updating own world stats: {e}")

add_peer_stats

add_peer_stats(peer_stats_batch: list[dict[str, Any]], sender_peer_id: str | None = None) -> None

Process a batch of stats reported by peers and update world state (world-only).

For each update in the batch, the world's own membership stats are refreshed first, then the stat is either handled by a custom hook (_process_custom_stat), deferred for graph processing (connected_peers updates), or stored in the Stats recorder when the stat name is known. After every update is processed, the connectivity graph is rebuilt from the deferred connected_peers entries and stale nodes are pruned. Graph updates are deferred to the end of the batch on purpose: node metadata for a peer may only become available after all updates in the batch have been seen. Errors on individual updates are caught and logged so a single malformed update does not abort the whole batch.

Parameters:

Name Type Description Default
peer_stats_batch list[dict[str, Any]]

A list of stat-update dictionaries. Each entry is expected to contain "group_key" (the reporting peer ID), "stat_name", "timestamp", and "value" keys.

required
sender_peer_id str | None

The peer ID of the sender. Currently not used for filtering. Defaults to None.

None
Source code in unaiverse/world.py
def add_peer_stats(self, peer_stats_batch: list[dict[str, Any]],
                   sender_peer_id: str | None = None) -> None:
    """Process a batch of stats reported by peers and update world state (world-only).

    For each update in the batch, the world's own membership stats are refreshed
    first, then the stat is either handled by a custom hook (``_process_custom_stat``),
    deferred for graph processing (``connected_peers`` updates), or stored in the
    ``Stats`` recorder when the stat name is known. After every update is processed,
    the connectivity graph is rebuilt from the deferred ``connected_peers`` entries
    and stale nodes are pruned. Graph updates are deferred to the end of the batch on
    purpose: node metadata for a peer may only become available after all updates in
    the batch have been seen. Errors on individual updates are caught and logged so a
    single malformed update does not abort the whole batch.

    Args:
        peer_stats_batch: A list of stat-update dictionaries. Each entry is expected
            to contain ``"group_key"`` (the reporting peer ID), ``"stat_name"``,
            ``"timestamp"``, and ``"value"`` keys.
        sender_peer_id: The peer ID of the sender. Currently not used for filtering.
            Defaults to None.
    """

    # 1. Update own stats (this logic is now in the World)
    self.collect_and_store_own_stats()

    # 2. Process peer stats
    connected_peers = []
    for update in peer_stats_batch:
        try:
            p_id = update['group_key']
            if p_id != sender_peer_id:
                # TODO: decide if we want to filter the stats
                pass
            stat_name = update['stat_name']
            t = int(update['timestamp'])
            v = update['value']

            # Call the hook (which also lives in the World now)
            if self._process_custom_stat(stat_name, v, p_id, t):
                continue  # The custom processor handled it

            # Generate the graph and handle the connected_peers stat
            if stat_name == 'connected_peers':
                # We need to wait for all the info to arrive before updating the graph.
                # Otherwise, _extract_graph_node_info may not find data yet.
                connected_peers.append((p_id, v, t))
                continue

            # 3. Push to the "dumb" Stats recorder
            if stat_name in self.stats.all_keys:
                self.stats.store_stat(stat_name, v, group_key=p_id, timestamp=t)
            else:
                log.error(f"[World] Unknown stat received: {stat_name}")

        except Exception as e:
            log.error(f"[World] Error processing stats update {update}: {e}")

    # Now update the graph for all collected connected_peers stats
    for p_id, v, t in connected_peers:
        self._update_graph(p_id, v, t)

    # Clean the graph from potentially stale peers
    self._prune_graph()

debug_stats_dashboard

debug_stats_dashboard() -> None

Render the world's stats dashboard to a temporary file and open it (dev only).

Convenience helper for development: the current stats are rendered as an HTML dashboard, written to a temporary file, and opened in the default web browser. If the renderer produces no HTML, nothing is opened. This method is intended for local debugging and is not part of the normal world lifecycle.

Source code in unaiverse/world.py
def debug_stats_dashboard(self) -> None:
    """Render the world's stats dashboard to a temporary file and open it (dev only).

    Convenience helper for development: the current stats are rendered as an HTML
    dashboard, written to a temporary file, and opened in the default web browser. If
    the renderer produces no HTML, nothing is opened. This method is intended for
    local debugging and is not part of the normal world lifecycle.
    """
    import tempfile
    import webbrowser

    log.debug("Rendering Dashboard...")
    html = self.stats.plot()
    if html:
        tmp = tempfile.NamedTemporaryFile(suffix=".html", delete=False, mode="w", encoding="utf-8")
        tmp.write(html)
        tmp.close()
        webbrowser.open(f"file://{tmp.name}")