Skip to content

unaiverse.agent

What this module does 🔴

Defines the high-level Agent class that orchestrates peer engagement, data streaming, learning, evaluation, and connection management on top of AgentBasics within the decentralized network.

agent

█████ █████ ██████ █████ █████ █████ █████ ██████████ ███████████ █████████ ██████████ ░░███ ░░███ ░░██████ ░░███ ░░███ ░░███ ░░███ ░░███░░░░░█░░███░░░░░███ ███░░░░░███░░███░░░░░█ ░███ ░███ ░███░███ ░███ ██████ ░███ ░███ ░███ ░███ █ ░ ░███ ░███ ░███ ░░░ ░███ █ ░ ░███ ░███ ░███░░███░███ ░░░░░███ ░███ ░███ ░███ ░██████ ░██████████ ░░█████████ ░██████ ░███ ░███ ░███ ░░██████ ███████ ░███ ░░███ ███ ░███░░█ ░███░░░░░███ ░░░░░░░░███ ░███░░█ ░███ ░███ ░███ ░░█████ ███░░███ ░███ ░░░█████░ ░███ ░ █ ░███ ░███ ███ ░███ ░███ ░ █ ░░████████ █████ ░░█████░░████████ █████ ░░███ ██████████ █████ █████░░█████████ ██████████ ░░░░░░░░ ░░░░░ ░░░░░ ░░░░░░░░ ░░░░░ ░░░ ░░░░░░░░░░ ░░░░░ ░░░░░ ░░░░░░░░░ ░░░░░░░░░░ A Collectionless AI Project (https://collectionless.ai) Registration/Login: https://unaiverse.io Code Repositories: https://github.com/collectionlessai/ Main Developers: Stefano Melacci (Project Leader), Christian Di Maio, Tommaso Guidi

Agent

Agent(*args, **kwargs)

Bases: AgentBasics

An agent node: the primary participant in a UNaIVERSE world.

Agent extends AgentBasics with a rich set of pre-built action methods that cover the full agent lifecycle - from joining a world and exchanging data streams to engaging and disengaging with peers, running inference, learning, and reporting statistics.

Custom agents are built by subclassing Agent and overriding hooks such as hook_proc_tweak_inputs and hook_proc_tweak_outputs, or by adding new @action-decorated coroutines that can be wired into a HybridStateMachine behaviour JSON.

Because Agent is also an AgentBasics instance, it inherits all membership bookkeeping (all_agents, world_agents, world_masters, human_agents, artificial_agents) and the role constants (ROLE_WORLD_MASTER, ROLE_WORLD_AGENT, ROLE_BITS_TO_STR) used throughout the framework.

Attributes:

Name Type Description
stats

The Stats recorder used to collect this agent's own metrics and to receive aggregated views from the world.

overwrite_stats

When True, the next STATS_RESPONSE received from the world will overwrite the local stats view instead of merging into it.

Examples:

A minimal custom agent that processes incoming data and reports to the world:

>>> from unaiverse.agent import Agent, action
>>> class Recogniser(Agent):
...     @action
...     async def on_result(self, interaction=None) -> bool:
...         print("Interaction completed:", interaction)
...         return True
>>>
>>> agent = Recogniser(proc=my_proc, proc_inputs=inputs, proc_outputs=outputs,
...                    proc_opts=opts, behav=my_behav, world_folder=None)

Initialize the agent, setting up status tracking and the stats recorder.

All positional and keyword arguments are forwarded unchanged to the parent AgentBasics.__init__. After the parent is initialised, agent-level status tracking sets (engaged agents, found agents, evaluation results, playlist state) and a fresh Stats recorder are set up.

The Stats instance created here is agent-scoped (is_world=False) and starts empty. It will be populated as the agent runs actions and receives statistics responses from the world.

Parameters:

Name Type Description Default
*args

Positional arguments forwarded to AgentBasics.__init__.

()
**kwargs

Keyword arguments forwarded to AgentBasics.__init__.

{}

Examples:

>>> from unaiverse.agent import Agent
>>> agent = Agent(proc=my_proc, proc_inputs=inputs, proc_outputs=outputs,
...               proc_opts=opts, behav=my_behav, world_folder="./my_world")
Source code in unaiverse/agent.py
def __init__(self, *args, **kwargs):
    """Initialize the agent, setting up status tracking and the stats recorder.

    All positional and keyword arguments are forwarded unchanged to the parent
    ``AgentBasics.__init__``. After the parent is initialised, agent-level
    status tracking sets (engaged agents, found agents, evaluation results,
    playlist state) and a fresh ``Stats`` recorder are set up.

    The ``Stats`` instance created here is agent-scoped (``is_world=False``)
    and starts empty. It will be populated as the agent runs actions and
    receives statistics responses from the world.

    Args:
        *args: Positional arguments forwarded to ``AgentBasics.__init__``.
        **kwargs: Keyword arguments forwarded to ``AgentBasics.__init__``.

    Examples:
        >>> from unaiverse.agent import Agent
        >>> agent = Agent(proc=my_proc, proc_inputs=inputs, proc_outputs=outputs,
        ...               proc_opts=opts, behav=my_behav, world_folder="./my_world")
    """
    super().__init__(*args, **kwargs)

    # Status variables (assumed to start with "_"): Agent exchanges
    self._available = True  # It will be automatically set/changed during the agent's life
    self._found_agents = set()  # Peer IDs discovered
    self._valid_cmp_agents = set()  # Agents for which the last evaluation was positive
    self._engaged_agents = set()
    self._agents_who_completed_what_they_were_asked = set()
    self._agents_who_were_asked = set()
    self._eval_results = {}

    # Status variables (assumed to start with "_"): Recordings
    self._last_recorded_stream_num = 1
    self._last_recorded_stream_dict = None
    self._last_recorded_count = 0

    # Status variables (assumed to start with "_"): Playlist
    self._preferred_streams = []  # List of preferred streams
    self._cur_preferred_stream = 0  # ID of the current preferred stream from the list
    self._repeat = 1  # Number of repetitions of the playlist

    # Stats
    self.stats = Stats(is_world=False)
    self.overwrite_stats = False  # Whether to overwrite stats when receiving the next STATS_RESPONSE from the world

stats instance-attribute

stats = Stats(is_world=False)

overwrite_stats instance-attribute

overwrite_stats = False

collect_and_store_own_stats

collect_and_store_own_stats() -> None

Collect this agent's own runtime metrics and push them to the stats recorder.

Three categories of metrics are sampled at the current wall-clock time and stored under the agent's own private peer ID:

  • Network connectivity: the list of peer IDs currently connected through the p2p_world connection pool is stored under the key "connected_peers". If the connection pool raises an exception (for example while the node is still bootstrapping), an empty list is stored instead and the error is logged.
  • HSM state and action: the current state name, the currently executing action name, and the last completed action name are read from self.behav and stored under "state", "action", and "last_action" respectively.

If self.stats is None, the method returns immediately without performing any work. Errors from the stats recorder are caught and logged rather than propagated, so a stats failure never interrupts the agent.

Note

This method is called automatically by send_stats_to_world before assembling the payload. It can also be called manually to force a stats snapshot at any time.

Source code in unaiverse/agent.py
def collect_and_store_own_stats(self) -> None:
    """Collect this agent's own runtime metrics and push them to the stats recorder.

    Three categories of metrics are sampled at the current wall-clock time and
    stored under the agent's own private peer ID:

    - Network connectivity: the list of peer IDs currently connected through
      the ``p2p_world`` connection pool is stored under the key
      ``"connected_peers"``. If the connection pool raises an exception (for
      example while the node is still bootstrapping), an empty list is stored
      instead and the error is logged.
    - HSM state and action: the current state name, the currently executing
      action name, and the last completed action name are read from
      ``self.behav`` and stored under ``"state"``, ``"action"``, and
      ``"last_action"`` respectively.

    If ``self.stats`` is ``None``, the method returns immediately without
    performing any work. Errors from the stats recorder are caught and logged
    rather than propagated, so a stats failure never interrupts the agent.

    Note:
        This method is called automatically by ``send_stats_to_world`` before
        assembling the payload. It can also be called manually to force a
        stats snapshot at any time.
    """
    if self.stats is None:
        return

    _, own_private_pid = self.get_peer_ids()
    t = clock.get_time_ms()
    try:
        info = self._node_conn['p2p_world'].get_connected_peers_info()
        peers_list = [i['id'] for i in info]
        self.stats.store_stat('connected_peers', peers_list, group_key=own_private_pid, timestamp=t)
    except Exception as e:
        self.stats.store_stat('connected_peers', [], group_key=own_private_pid, timestamp=t)
        log.error(f"[Stats] Error collecting and storing own stats, clearing: {e}")

    try:
        behav = self.behav
        self.stats.store_stat('state', behav.get_state_name(), group_key=own_private_pid, timestamp=t)
        self.stats.store_stat('action', behav.get_action_name(), group_key=own_private_pid, timestamp=t)
        self.stats.store_stat('last_action', behav.get_last_completed_action_name(),
                              group_key=own_private_pid, timestamp=t)
    except Exception as e:
        log.error(f"[Stats] Error storing HSM stats: {e}")

send_stats_to_world async

send_stats_to_world() -> None

Collect and transmit the agent's buffered statistics to the world node (async).

The method first verifies that the agent is currently connected to a world. If not, it returns early. Otherwise it calls collect_and_store_own_stats to snapshot the latest runtime metrics, then assembles the payload via stats.get_payload_for_world() and sends it to the world peer using the Msg.STATS_UPDATE message type.

If the payload is empty (no stats have accumulated since the last send) the transmission is skipped. If the network send fails, an error is logged but no exception is raised.

Note

The stats payload is consumed (cleared) by get_payload_for_world after being assembled, so repeated calls will not re-send the same data.

Source code in unaiverse/agent.py
async def send_stats_to_world(self) -> None:
    """Collect and transmit the agent's buffered statistics to the world node (async).

    The method first verifies that the agent is currently connected to a world.
    If not, it returns early. Otherwise it calls ``collect_and_store_own_stats``
    to snapshot the latest runtime metrics, then assembles the payload via
    ``stats.get_payload_for_world()`` and sends it to the world peer using the
    ``Msg.STATS_UPDATE`` message type.

    If the payload is empty (no stats have accumulated since the last send) the
    transmission is skipped. If the network send fails, an error is logged but
    no exception is raised.

    Note:
        The stats payload is consumed (cleared) by ``get_payload_for_world``
        after being assembled, so repeated calls will not re-send the same data.
    """
    if not self.in_world():
        log.debug("[send_stats_to_world] Not in a world, skipping stats send.")
        return

    world_peer_id = self._node_conn.get_world_peer_id()
    if world_peer_id is None:
        log.error("Agent in world, but world_peer_id is None.")
        return

    self.collect_and_store_own_stats()  # update own stats
    payload = self.stats.get_payload_for_world()
    if not payload:
        log.debug("[send_stats_to_world] No stats to send.")
        return

    # Send all stats
    log.misc(f"Sending stats update to world {world_peer_id}...")
    if not (await self._node_conn.send(world_peer_id,
                                       channel_trail=None,
                                       content=payload,
                                       content_type=Msg.STATS_UPDATE)):
        log.error("Failed to send stats update to world.")

update_stats_view

update_stats_view(received_view: dict, overwrite: bool = False) -> None

Merge a stats view received from the world into the agent's local stats.

The received_view dictionary is passed directly to stats.update_view. Existing entries in the local stats are preserved unless overwrite is True, in which case any key present in the incoming view replaces the locally held value.

This method is typically invoked by the networking layer when the world replies to a STATS_REQUEST message with a STATS_RESPONSE.

Parameters:

Name Type Description Default
received_view dict

The stats view dictionary received from the world. Its structure mirrors the internal Stats view format.

required
overwrite bool

When True, existing local entries are replaced by the incoming values. When False (default), local entries are kept and only absent keys are added. Defaults to False.

False
Source code in unaiverse/agent.py
def update_stats_view(self, received_view: dict, overwrite: bool = False) -> None:
    """Merge a stats view received from the world into the agent's local stats.

    The ``received_view`` dictionary is passed directly to
    ``stats.update_view``. Existing entries in the local stats are preserved
    unless ``overwrite`` is True, in which case any key present in the incoming
    view replaces the locally held value.

    This method is typically invoked by the networking layer when the world
    replies to a ``STATS_REQUEST`` message with a ``STATS_RESPONSE``.

    Args:
        received_view: The stats view dictionary received from the world. Its
            structure mirrors the internal ``Stats`` view format.
        overwrite: When True, existing local entries are replaced by the
            incoming values. When False (default), local entries are kept and
            only absent keys are added. Defaults to False.
    """
    self.stats.update_view(received_view, overwrite)

suggest_role_to_world async

suggest_role_to_world(agent: str | None, role: str) -> bool

Suggest a role change for one or more agents to the world master (async).

The method resolves agent to the set of involved peer IDs (see __involved_agents), converts role to its integer bitmask, and filters out any peer whose current role already matches the requested role. For the remaining peers a ROLE_SUGGESTION message is sent to the world master carrying a list of {"peer_id": ..., "role": ...} entries.

If all involved agents already have the requested role, no message is sent and True is returned immediately.

Parameters:

Name Type Description Default
agent str | None

The peer ID of the target agent, a wildcard string (such as "<valid_cmp>"), or None to use the currently engaged agents.

required
role str

The desired role as a human-readable string, matching a key in ROLE_STR_TO_BITS (for example "world_agent").

required

Returns:

Type Description
bool

True if the suggestion message was sent successfully (or if no message

bool

was needed), False if the network send failed.

Source code in unaiverse/agent.py
async def suggest_role_to_world(self, agent: str | None, role: str) -> bool:
    """Suggest a role change for one or more agents to the world master (async).

    The method resolves ``agent`` to the set of involved peer IDs (see
    ``__involved_agents``), converts ``role`` to its integer bitmask, and
    filters out any peer whose current role already matches the requested role.
    For the remaining peers a ``ROLE_SUGGESTION`` message is sent to the world
    master carrying a list of ``{"peer_id": ..., "role": ...}`` entries.

    If all involved agents already have the requested role, no message is sent
    and ``True`` is returned immediately.

    Args:
        agent: The peer ID of the target agent, a wildcard string (such as
            ``"<valid_cmp>"``), or ``None`` to use the currently engaged agents.
        role: The desired role as a human-readable string, matching a key in
            ``ROLE_STR_TO_BITS`` (for example ``"world_agent"``).

    Returns:
        True if the suggestion message was sent successfully (or if no message
        was needed), False if the network send failed.
    """
    log.misc("Suggesting role to world")

    agents = self.__involved_agents(agent)
    role_bits = (self.ROLE_STR_TO_BITS[role] >> 2) << 2

    content = []

    for _agent in agents:
        cur_role_bits = self.ROLE_STR_TO_BITS[self.all_agents[_agent].get_dynamic_profile()['connections']['role']]
        cur_role_bits = (cur_role_bits >> 2) << 2
        if cur_role_bits == role_bits:
            log.misc(f"Not suggesting to change the role of {_agent} "
                     f"since it has already such a role")
        else:
            log.misc(f"Suggesting to change the role of {_agent} to {self.ROLE_BITS_TO_STR[role_bits]}")
            content.append({'peer_id': _agent, 'role': role_bits})

    if len(content) > 0:
        world_peer_id = self._node_conn.get_world_peer_id()
        if not (await self._node_conn.send(world_peer_id, channel_trail=None,
                                           content=content,
                                           content_type=Msg.ROLE_SUGGESTION)):
            log.error("Failed to send role suggestion to the world")
            return False
    return True

suggest_badges_to_world async

suggest_badges_to_world(agent: str | None = None, score: float = -1.0, badge_type: str = 'completed', badge_description: str | None = None) -> bool

Suggest one or more performance badges to the world master (async).

This action is typically called by an evaluator agent to reward other agents after a competition round. For each involved peer it constructs a badge dictionary containing the peer ID, score, badge type, optional description, and the agent's last known token retrieved via the connection pool, then sends all dictionaries in a single Msg.BADGE_SUGGESTIONS message to the world master.

The score must be non-negative and the badge type must be one of Agent.BADGE_TYPES; both are validated before any message is sent.

Parameters:

Name Type Description Default
agent str | None

The peer ID of the target agent, a wildcard (such as "<valid_cmp>"), or None to use the currently engaged agents.

None
score float

A non-negative float representing the agent's performance score. Defaults to -1.0, which will cause an early-out error return.

-1.0
badge_type str

The category of the badge. Must be one of Agent.BADGE_TYPES. Defaults to "completed".

'completed'
badge_description str | None

An optional free-text description for the badge. Defaults to None.

None

Returns:

Type Description
bool

True if the badge suggestion message was sent successfully,

bool

False if the score is invalid, the badge type is unknown, or the

bool

network send failed.

Source code in unaiverse/agent.py
async def suggest_badges_to_world(self, agent: str | None = None,
                                  score: float = -1.0, badge_type: str = "completed",
                                  badge_description: str | None = None) -> bool:
    """Suggest one or more performance badges to the world master (async).

    This action is typically called by an evaluator agent to reward other agents
    after a competition round. For each involved peer it constructs a badge
    dictionary containing the peer ID, score, badge type, optional description,
    and the agent's last known token retrieved via the connection pool, then
    sends all dictionaries in a single ``Msg.BADGE_SUGGESTIONS`` message to
    the world master.

    The score must be non-negative and the badge type must be one of
    ``Agent.BADGE_TYPES``; both are validated before any message is sent.

    Args:
        agent: The peer ID of the target agent, a wildcard (such as
            ``"<valid_cmp>"``), or ``None`` to use the currently engaged agents.
        score: A non-negative float representing the agent's performance score.
            Defaults to ``-1.0``, which will cause an early-out error return.
        badge_type: The category of the badge. Must be one of
            ``Agent.BADGE_TYPES``. Defaults to ``"completed"``.
        badge_description: An optional free-text description for the badge.
            Defaults to None.

    Returns:
        True if the badge suggestion message was sent successfully,
        False if the score is invalid, the badge type is unknown, or the
        network send failed.
    """
    log.misc("Suggesting one or more badges to world")

    if score < 0.:
        log.error("Invalid score (did you specify the 'score' argument? it must be positive)")
        return False

    agents = self.__involved_agents(agent)
    world_peer_id = self._node_conn.get_world_peer_id()

    if badge_type not in Agent.BADGE_TYPES:
        log.error(f"Unknown badge type: {badge_type}")
        return False

    list_of_badge_dictionaries = []
    for peer_id in agents:
        list_of_badge_dictionaries.append({'peer_id': peer_id,
                                           'score': score,
                                           'badge_type': badge_type,
                                           'badge_description': badge_description,
                                           'agent_token': self._node_conn.get_last_token(peer_id)})

    if not (await self._node_conn.send(world_peer_id, channel_trail=None,
                                       content=list_of_badge_dictionaries,
                                       content_type=Msg.BADGE_SUGGESTIONS)):
        log.error("Failed to send badge suggestions to the world")
        return False
    else:
        return True

remove_peer_from_agent_status_attrs

remove_peer_from_agent_status_attrs(peer_id: str) -> None

Remove a peer from all agent-level status tracking sets.

Delegates to the parent AgentBasics.remove_peer_from_agent_status_attrs first, which removes peer_id from _found_agents, _valid_cmp_agents, _engaged_agents, and related bookkeeping structures. Afterwards, _available is recomputed: the agent is considered available again only when _engaged_agents is empty.

This override ensures that disconnecting a peer automatically restores availability if the peer was the last engagement partner.

Parameters:

Name Type Description Default
peer_id str

The private peer ID to remove from all status tracking structures.

required
Source code in unaiverse/agent.py
def remove_peer_from_agent_status_attrs(self, peer_id: str) -> None:
    """Remove a peer from all agent-level status tracking sets.

    Delegates to the parent ``AgentBasics.remove_peer_from_agent_status_attrs``
    first, which removes ``peer_id`` from ``_found_agents``,
    ``_valid_cmp_agents``, ``_engaged_agents``, and related bookkeeping
    structures. Afterwards, ``_available`` is recomputed: the agent is
    considered available again only when ``_engaged_agents`` is empty.

    This override ensures that disconnecting a peer automatically restores
    availability if the peer was the last engagement partner.

    Args:
        peer_id: The private peer ID to remove from all status tracking
            structures.
    """
    super().remove_peer_from_agent_status_attrs(peer_id)
    self._available = len(self._engaged_agents) == 0

reset_agent_status_attrs

reset_agent_status_attrs() -> None

Reset all agent-level status attributes to their initial values.

Delegates to AgentBasics.reset_agent_status_attrs first, which zeros or clears generic status variables (lists, dicts, counters, and boolean flags). The following Agent-specific attributes are then explicitly restored to their construction-time defaults:

  • _available is set to True (the agent starts as available).
  • _repeat is set to 1 (single-pass playlist).
  • _last_recorded_stream_num is set to 1.
  • _last_recorded_count is set to 0.

This method is called by the framework when the agent re-enters a clean state, for example after a world disconnection.

Source code in unaiverse/agent.py
def reset_agent_status_attrs(self) -> None:
    """Reset all agent-level status attributes to their initial values.

    Delegates to ``AgentBasics.reset_agent_status_attrs`` first, which zeros
    or clears generic status variables (lists, dicts, counters, and boolean
    flags). The following ``Agent``-specific attributes are then explicitly
    restored to their construction-time defaults:

    - ``_available`` is set to True (the agent starts as available).
    - ``_repeat`` is set to 1 (single-pass playlist).
    - ``_last_recorded_stream_num`` is set to 1.
    - ``_last_recorded_count`` is set to 0.

    This method is called by the framework when the agent re-enters a clean
    state, for example after a world disconnection.
    """
    super().reset_agent_status_attrs()  # this sets status vars to [], {}, 0, 0., False, in function of their type
    self._available = True
    self._repeat = 1
    self._last_recorded_stream_num = 1
    self._last_recorded_count = 0

send async

send(action_name: str | None = None, target: str | list[str] | None = None, action_kwargs: dict | None = None, streams: list[tuple[str, int]] | list[str] | None = None, data_samples: list[str | Image | Tensor] | dict[str, str | Image | Tensor] | None = None, num_steps: int = -1, timeout: float = -1.0, from_state: str | None = None, to_state: str | None = None, callback: str | None = None, forced_uuid: str | None = 'do_not_force', id: str | None = 'random', copy_sys: bool = False, wait_completion: bool = False, volatile: bool = False, interaction: Interaction | None = None) -> bool

Send an interaction request to one or more target agents (async).

Accepts either a pre-built Interaction object supplied via interaction or raw keyword arguments from which one will be created internally. When interaction carries a non-system interaction, the call is rejected immediately.

On the first invocation (interaction.get_mark() is None) the target agents are resolved, a new interaction is dispatched via _send, and (if wait_completion=True) the interaction is stored as a mark on the system interaction so the HSM can re-enter this action on subsequent ticks until completion.

On re-entry (interaction.get_mark() is not None), the stored mark is retrieved and True is returned only when the tracked interaction has been completed by the interaction manager.

Parameters:

Name Type Description Default
action_name str | None

The action name to invoke on the target agent(s). Ignored when a pre-built Interaction is provided.

None
target str | list[str] | None

Peer ID, wildcard (such as "<valid_cmp>"), or list of peer IDs to send the interaction to. Ignored when a pre-built Interaction is provided.

None
action_kwargs dict | None

Keyword arguments to pass to the target action. Ignored when a pre-built Interaction is provided.

None
streams list[tuple[str, int]] | list[str] | None

List of (stream_hash, num_samples) tuples, or plain stream hashes (implying one sample each). Both net hashes and user hashes are accepted; the name or group alone is also valid and will be resolved automatically. Ignored when a pre-built Interaction is provided.

None
data_samples list[str | Image | Tensor] | dict[str, str | Image | Tensor] | None

Actual data samples to attach to the interaction. May be a list of raw values or a dict mapping stream user hashes to samples. Ignored when a pre-built Interaction is provided.

None
num_steps int

Number of time steps the interaction spans. Ignored when a pre-built Interaction is provided.

-1
timeout float

Maximum wall-clock seconds allowed for the interaction to complete. A value of -1. disables the timeout. Ignored when a pre-built Interaction is provided.

-1.0
from_state str | None

The HSM state from which the interaction originates. Ignored when a pre-built Interaction is provided.

None
to_state str | None

The HSM state the sender will transition to after the interaction completes. Ignored when a pre-built Interaction is provided.

None
callback str | None

Name of an action method to call on this agent once the interaction is completed by the recipient.

None
forced_uuid str | None

An explicit UUID to assign to the new interaction. Use with care to avoid UUID collisions. Defaults to "do_not_force", which lets the framework generate a UUID.

'do_not_force'
id str | None

A human-readable identifier attached to the interaction for logging. Ignored when a pre-built Interaction is provided. Defaults to "random".

'random'
copy_sys bool

When True, the last stream data published under the system UUID is copied into the new interaction to seed its initial data.

False
wait_completion bool

When True, the action keeps the HSM in the current state until all involved agents confirm completion (pedantic mode). Defaults to False.

False
volatile bool

When True, the recipient is instructed not to send back any completion status for this interaction. Incompatible with wait_completion. Defaults to False.

False
interaction Interaction | None

The system Interaction injected automatically by the dispatch layer. Must be a system interaction or None; non-system interactions cause an immediate False return.

None

Returns:

Type Description
bool

True if at least one target agent was reached (or if completion was

bool

confirmed when wait_completion=True), False otherwise.

Raises:

Type Description
GenException

If both wait_completion and volatile are True.

Examples:

Trigger process on an engaged agent and wait for it to finish:

>>> await agent.send(action_name="process",
...                  target="<engaged>",
...                  wait_completion=True)
Source code in unaiverse/agent.py
@action
async def send(self, action_name: str | None = None,
               target: str | list[str] | None = None,
               action_kwargs: dict | None = None,
               streams: list[tuple[str, int]] | list[str] | None = None,
               data_samples: list[str | Image | torch.Tensor] | dict[
                   str, str | Image | torch.Tensor] | None = None,
               num_steps: int = -1,
               timeout: float = -1.,
               from_state: str | None = None,
               to_state: str | None = None,
               callback: str | None = None,
               forced_uuid: str | None = "do_not_force",
               id: str | None = "random",
               copy_sys: bool = False,
               wait_completion: bool = False,
               volatile: bool = False,
               interaction: Interaction | None = None) -> bool:
    """Send an interaction request to one or more target agents (async).

    Accepts either a pre-built ``Interaction`` object supplied via
    ``interaction`` *or* raw keyword arguments from which one will be created
    internally. When ``interaction`` carries a non-system interaction, the call
    is rejected immediately.

    On the first invocation (``interaction.get_mark() is None``) the target
    agents are resolved, a new interaction is dispatched via ``_send``, and
    (if ``wait_completion=True``) the interaction is stored as a mark on the
    system interaction so the HSM can re-enter this action on subsequent ticks
    until completion.

    On re-entry (``interaction.get_mark() is not None``), the stored mark is
    retrieved and ``True`` is returned only when the tracked interaction has
    been completed by the interaction manager.

    Args:
        action_name: The action name to invoke on the target agent(s). Ignored
            when a pre-built ``Interaction`` is provided.
        target: Peer ID, wildcard (such as ``"<valid_cmp>"``), or list of peer
            IDs to send the interaction to. Ignored when a pre-built
            ``Interaction`` is provided.
        action_kwargs: Keyword arguments to pass to the target action. Ignored
            when a pre-built ``Interaction`` is provided.
        streams: List of ``(stream_hash, num_samples)`` tuples, or plain
            stream hashes (implying one sample each). Both net hashes and user
            hashes are accepted; the name or group alone is also valid and will
            be resolved automatically. Ignored when a pre-built ``Interaction``
            is provided.
        data_samples: Actual data samples to attach to the interaction. May be
            a list of raw values or a dict mapping stream user hashes to
            samples. Ignored when a pre-built ``Interaction`` is provided.
        num_steps: Number of time steps the interaction spans. Ignored when a
            pre-built ``Interaction`` is provided.
        timeout: Maximum wall-clock seconds allowed for the interaction to
            complete. A value of ``-1.`` disables the timeout. Ignored when a
            pre-built ``Interaction`` is provided.
        from_state: The HSM state from which the interaction originates.
            Ignored when a pre-built ``Interaction`` is provided.
        to_state: The HSM state the sender will transition to after the
            interaction completes. Ignored when a pre-built ``Interaction``
            is provided.
        callback: Name of an action method to call on this agent once the
            interaction is completed by the recipient.
        forced_uuid: An explicit UUID to assign to the new interaction. Use
            with care to avoid UUID collisions. Defaults to
            ``"do_not_force"``, which lets the framework generate a UUID.
        id: A human-readable identifier attached to the interaction for
            logging. Ignored when a pre-built ``Interaction`` is provided.
            Defaults to ``"random"``.
        copy_sys: When True, the last stream data published under the system
            UUID is copied into the new interaction to seed its initial data.
        wait_completion: When True, the action keeps the HSM in the current
            state until all involved agents confirm completion (pedantic mode).
            Defaults to False.
        volatile: When True, the recipient is instructed not to send back any
            completion status for this interaction. Incompatible with
            ``wait_completion``. Defaults to False.
        interaction: The system ``Interaction`` injected automatically by the
            dispatch layer. Must be a system interaction or ``None``; non-system
            interactions cause an immediate ``False`` return.

    Returns:
        True if at least one target agent was reached (or if completion was
        confirmed when ``wait_completion=True``), False otherwise.

    Raises:
        GenException: If both ``wait_completion`` and ``volatile`` are True.

    Examples:
        Trigger ``process`` on an engaged agent and wait for it to finish:

        >>> await agent.send(action_name="process",
        ...                  target="<engaged>",
        ...                  wait_completion=True)
    """

    # This action can only be triggered by the system or by the developer, no ways
    if interaction is not None and not interaction.is_system():
        return False

    if wait_completion and volatile:
        raise GenException("You cannot wait for completion a volatile interaction (it will not return any updates)")

    first_run = True
    system_interaction = None
    if interaction is not None:
        system_interaction = interaction
        first_run = system_interaction.get_mark() is None

    if first_run:
        target = self.__involved_agents(target)
        sent_interaction = await self._send(None, action_name, target, action_kwargs, streams,
                                            data_samples, num_steps, timeout, from_state, to_state, callback,
                                            forced_uuid, id, copy_sys, volatile)
        if wait_completion:
            system_interaction.action_ref.set_default_timeout()  # This will make the action pedantic
            system_interaction.set_mark(sent_interaction)  # First run, Saving the interaction that was sent
            return False
        else:
            if system_interaction:
                system_interaction.action_ref.set_timeout(-1.)  # Clear timeouts to ensure it is not pedantic
            return sent_interaction is not None
    else:
        sent_interaction = system_interaction.get_mark()  # Loading the interaction that was sent at the first run
        if sent_interaction.is_completed():
            return True  # The mark will be automatically cleared by the Action class
        else:
            return False

all_sent_completed async

all_sent_completed(action_name: str | None = None) -> bool

Return True if every sent interaction with the given action name has been completed (async).

Iterates over self.im.sent and checks whether each interaction that matches action_name (or all interactions when action_name is None) has been marked as completed by the interaction manager.

This is the negation of "are there still-pending sends outstanding?" and is useful as an HSM exit predicate after a send call that did not use wait_completion=True.

Parameters:

Name Type Description Default
action_name str | None

The action name to filter by, or None to consider all sent interactions regardless of their action name. Defaults to None.

None

Returns:

Type Description
bool

True if every matching sent interaction is completed, False if at

bool

least one is still pending.

Source code in unaiverse/agent.py
@action
async def all_sent_completed(self, action_name: str | None = None) -> bool:
    """Return True if every sent interaction with the given action name has been completed (async).

    Iterates over ``self.im.sent`` and checks whether each interaction that
    matches ``action_name`` (or all interactions when ``action_name`` is
    ``None``) has been marked as completed by the interaction manager.

    This is the negation of "are there still-pending sends outstanding?" and
    is useful as an HSM exit predicate after a ``send`` call that did not use
    ``wait_completion=True``.

    Args:
        action_name: The action name to filter by, or ``None`` to consider all
            sent interactions regardless of their action name. Defaults to None.

    Returns:
        True if every matching sent interaction is completed, False if at
        least one is still pending.
    """
    for inter in self.im.sent.values():
        if action_name is None or inter.action_name == action_name:
            if not inter.is_completed():
                return False
    return True

process async

process(interaction: Interaction | None = None) -> bool

Run one inference step: read from stdin, call the processor, and write to stdout (async).

The method follows these steps in order:

  1. For human processors, __rebind_stdin_if_human is called to redirect stdin to the appropriate interaction-specific binding; if it returns False the action aborts early.
  2. Input data is read from self.stdin under the interaction's UUID. If no data is available, False is returned.
  3. The data tag is read from self.stdin. If it is missing, False is returned.
  4. hook_proc_tweak_inputs is invoked so subclasses can pre-process inputs before forwarding them to the processor.
  5. self.proc is called with the input data, passing first=True on step 0 and last=True on the final step.
  6. hook_proc_tweak_outputs is invoked so subclasses can post-process the outputs.
  7. The output data is written to self.stdout under the same UUID and data tag.

Parameters:

Name Type Description Default
interaction Interaction | None

The driving interaction injected automatically by the dispatch layer. When None, the system UUID is used. Defaults to None.

None

Returns:

Type Description
bool

True if the full inference pipeline completed successfully,

bool

False if any step failed (missing data, processor error, hook error).

Source code in unaiverse/agent.py
@action
async def process(self, interaction: Interaction | None = None) -> bool:
    """Run one inference step: read from stdin, call the processor, and write to stdout (async).

    The method follows these steps in order:

    1. For human processors, ``__rebind_stdin_if_human`` is called to
       redirect stdin to the appropriate interaction-specific binding; if it
       returns False the action aborts early.
    2. Input data is read from ``self.stdin`` under the interaction's UUID.
       If no data is available, ``False`` is returned.
    3. The data tag is read from ``self.stdin``. If it is missing, ``False``
       is returned.
    4. ``hook_proc_tweak_inputs`` is invoked so subclasses can pre-process
       inputs before forwarding them to the processor.
    5. ``self.proc`` is called with the input data, passing ``first=True``
       on step 0 and ``last=True`` on the final step.
    6. ``hook_proc_tweak_outputs`` is invoked so subclasses can post-process
       the outputs.
    7. The output data is written to ``self.stdout`` under the same UUID and
       data tag.

    Args:
        interaction: The driving interaction injected automatically by the
            dispatch layer. When ``None``, the system UUID is used. Defaults
            to None.

    Returns:
        True if the full inference pipeline completed successfully,
        False if any step failed (missing data, processor error, hook error).
    """
    # Getting UUID of the interaction (used for tags and stdout)
    if interaction is not None:
        uuid = interaction.uuid
    else:
        uuid = Custom.SYSTEM_INTERACTION_UUID

    # Possibly re-binding stdin to cope with human processor (it might also let the action fail, if needed)
    if not self.__rebind_stdin_if_human(interaction):
        return False

    # Getting input data from the input stream
    input_data = self.stdin.get(uuid=uuid, requested_by="process")  # Use kwargs

    if input_data is None:
        return False

    # Getting data tag
    data_tag = self.stdin.get_tag(uuid=uuid)

    if data_tag is None:
        return False

    # Post get actions to finalize what possibly started with the previous call to the rebind operation
    # (do this only after having received valid data from stdin)
    self.__rebind_stdin_if_human_finalizer(interaction)

    # Customizable input hook
    try:
        input_data = self.hook_proc_tweak_inputs(input_data)
        if input_data is None:
            return False
    except Exception as e:
        log.error(f"Error tweaking the processor inputs: {e}")
        return False

    # Processing data
    try:
        output_data = self.proc(*input_data, first=self.get_action_step() == 0, last=self.is_last_action_step())

        if not isinstance(output_data, tuple):
            output_data = (output_data,)
    except Exception as e:
        log.error(f"Error running the processor: {e}")
        return False

    # Customizable output hook
    try:
        output_data = self.hook_proc_tweak_outputs(output_data)
    except Exception as e:
        log.error(f"Error tweaking the processor outputs: {e}")
        return False

    # Pushing output data to the output stream
    self.stdout.set(output_data, data_tag=data_tag, uuid=uuid)  # Use kwargs for data_tag and uuid
    return True

learn async

learn(interaction: Interaction | None = None) -> bool

Run one supervised learning step: inference followed by a backward pass (async).

First verifies that the processor has learning capabilities (a configured optimiser and at least one loss function). If not, the action returns False immediately.

Then process(interaction) is called to perform the forward pass. If it succeeds, target data is read from self.stdtar under the same UUID, and proc.learn_backward is invoked with those targets. Loss values are printed to the user log.

Parameters:

Name Type Description Default
interaction Interaction | None

The driving interaction injected automatically by the dispatch layer. Must not be None; an assertion error will be raised if it is.

None

Returns:

Type Description
bool

True if both the inference step and the backward pass completed

bool

without errors, False otherwise.

Raises:

Type Description
AssertionError

If interaction is None.

AssertionError

If self.proc is None at the time the backward pass is invoked.

Source code in unaiverse/agent.py
@action
async def learn(self, interaction: Interaction | None = None) -> bool:
    """Run one supervised learning step: inference followed by a backward pass (async).

    First verifies that the processor has learning capabilities (a configured
    optimiser and at least one loss function). If not, the action returns
    ``False`` immediately.

    Then ``process(interaction)`` is called to perform the forward pass.
    If it succeeds, target data is read from ``self.stdtar`` under the same
    UUID, and ``proc.learn_backward`` is invoked with those targets. Loss
    values are printed to the user log.

    Args:
        interaction: The driving interaction injected automatically by the
            dispatch layer. Must not be ``None``; an assertion error will be
            raised if it is.

    Returns:
        True if both the inference step and the backward pass completed
        without errors, False otherwise.

    Raises:
        AssertionError: If ``interaction`` is ``None``.
        AssertionError: If ``self.proc`` is ``None`` at the time the backward
            pass is invoked.
    """
    if (self.proc_opts['optimizer'] is None
            or self.proc_opts['losses'] is None
            or len(self.proc_opts['losses']) == 0):
        log.misc(f"Current processor has no learning skills (or learning options not specified)")
        return False

    assert interaction is not None

    # Inference first
    if await self.process(interaction):

        # Getting the UUID of the interaction
        uuid = interaction.uuid

        # Getting input data from the input stream
        target_data = self.stdtar.get(uuid=uuid, requested_by="learn")  # Use kwargs

        # Learning from the last performed inference and the current targets
        try:
            assert self.proc is not None
            loss_values = self.proc.learn_backward(target_data)
        except Exception as e:
            log.error(f"Error while learning: {e}")
            return False

        # Printing
        log.user(f"Losses: {[x.item() for x in loss_values]}, "
                 f"Step: {self.get_action_step()}, Tag: {self.stdin.get_tag()}, "
                 f"Last Step: {interaction.num_steps-1}", rep=True),
        return True
    else:
        return False

show async

show(interaction: Interaction | None = None)

Log the completed interaction for inspection purposes (async).

A generic action intended to be used as a completion callback in HSM behaviours. It logs the interaction as a code string at the debug level. Subclasses frequently override this method to implement custom post-completion logic such as storing results or triggering follow-up actions.

Parameters:

Name Type Description Default
interaction Interaction | None

The interaction object that triggered this action. Injected automatically by the dispatch layer. Defaults to None.

None

Returns:

Type Description

Always True.

Source code in unaiverse/agent.py
@action
async def show(self, interaction: Interaction | None = None):
    """Log the completed interaction for inspection purposes (async).

    A generic action intended to be used as a completion callback in HSM
    behaviours. It logs the interaction as a code string at the debug level.
    Subclasses frequently override this method to implement custom
    post-completion logic such as storing results or triggering follow-up
    actions.

    Args:
        interaction: The interaction object that triggered this action.
            Injected automatically by the dispatch layer. Defaults to None.

    Returns:
        Always True.
    """
    log.debug(f"The following interaction was completed: "
              f"{interaction.to_code_str(True, True)}")
    return True

set_engaged_partner async

set_engaged_partner(agent: str | list[str] | set[str] | None, clear_found: bool = True) -> bool

Force the engagement set to a specific agent or group of agents (async).

All existing engagement partners are cleared first. If agent is not None, each element in the provided string, list, or set is added to _engaged_agents. The availability flag _available is set to True when the resulting engagement set is empty, and False otherwise.

This is a programmatic shortcut to bypass the normal send_engage / engage handshake when the pairing is already known at design time.

Parameters:

Name Type Description Default
agent str | list[str] | set[str] | None

The peer ID, list of peer IDs, or set of peer IDs to force as the active engagement partners. Pass None to only clear the existing engagements without adding new ones.

required
clear_found bool

When True (default), _found_agents is cleared before updating the engagement set. Set to False to preserve the results of a previous find_agents call. Defaults to True.

True

Returns:

Type Description
bool

Always True.

Source code in unaiverse/agent.py
@action
async def set_engaged_partner(self, agent: str | list[str] | set[str] | None, clear_found: bool = True) -> bool:
    """Force the engagement set to a specific agent or group of agents (async).

    All existing engagement partners are cleared first. If ``agent`` is not
    ``None``, each element in the provided string, list, or set is added to
    ``_engaged_agents``. The availability flag ``_available`` is set to
    ``True`` when the resulting engagement set is empty, and ``False``
    otherwise.

    This is a programmatic shortcut to bypass the normal
    ``send_engage`` / ``engage`` handshake when the pairing is already known
    at design time.

    Args:
        agent: The peer ID, list of peer IDs, or set of peer IDs to force as
            the active engagement partners. Pass ``None`` to only clear the
            existing engagements without adding new ones.
        clear_found: When True (default), ``_found_agents`` is cleared before
            updating the engagement set. Set to False to preserve the results
            of a previous ``find_agents`` call. Defaults to True.

    Returns:
        Always True.
    """
    if clear_found:
        self._found_agents.clear()
    self._engaged_agents.clear()
    if agent is not None:
        if isinstance(agent, str):
            agent = [agent]
        for a in agent:
            self._engaged_agents.add(a)
    self._available = len(self._engaged_agents) == 0
    return True

send_engage async

send_engage() -> bool

Offer engagement to all agents currently in _found_agents (async).

Sends an "engage" interaction to every peer ID in _found_agents, passing the local agent's current role string as the sender_role argument. The send is performed with wait_completion=True, so on the first call the action returns False and re-registers itself; on the second call (after all targets replied) it checks which agents actually confirmed.

For each agent that confirmed engagement: - its peer ID is added to _engaged_agents. - _available is set to False. - its peer ID is removed from _found_agents.

If no confirmed agents are found in last_sent_interaction.target, or if the underlying send call fails, the method returns False.

Returns:

Type Description
bool

True if at least one agent successfully confirmed engagement,

bool

False otherwise.

Source code in unaiverse/agent.py
@action
async def send_engage(self) -> bool:
    """Offer engagement to all agents currently in ``_found_agents`` (async).

    Sends an ``"engage"`` interaction to every peer ID in ``_found_agents``,
    passing the local agent's current role string as the ``sender_role``
    argument. The send is performed with ``wait_completion=True``, so on the
    first call the action returns False and re-registers itself; on the second
    call (after all targets replied) it checks which agents actually confirmed.

    For each agent that confirmed engagement:
    - its peer ID is added to ``_engaged_agents``.
    - ``_available`` is set to False.
    - its peer ID is removed from ``_found_agents``.

    If no confirmed agents are found in ``last_sent_interaction.target``,
    or if the underlying ``send`` call fails, the method returns False.

    Returns:
        True if at least one agent successfully confirmed engagement,
        False otherwise.
    """
    if len(self._found_agents) > 0:
        log.misc(f"Sending engagement request to {', '.join([x for x in self._found_agents])}")
    my_role_str = self._node_profile.get_dynamic_profile()['connections']['role']
    ret = await self.send(target=self._found_agents, action_name="engage",
                          action_kwargs={"sender_role": my_role_str}, wait_completion=True,
                          interaction=self.im.get_current())
    if ret:
        engaged = self.last_sent_interaction.target if self.last_sent_interaction is not None else None
        if (engaged is not None) and isinstance(engaged, list) and len(engaged) > 0:
            for eng in engaged:
                self._engaged_agents.add(eng)

                # Marking this agent as not available since it engaged with another one
                self._available = False

                # Removing the agent from the list of asked agents
                self._found_agents.discard(eng)
            return True
        else:
            return False
    else:
        return False

engage async

engage(acceptable_role: str | None = None, sender_role: str | None = None, interaction: Interaction | None = None) -> bool

Accept or reject an incoming engagement request from another agent (async).

Called on the recipient side of a send_engage / engage handshake. The method validates the requester in four steps:

  1. The requester is looked up in world_agents and world_masters; an unknown requester causes an immediate False return.
  2. sender_role must not be None.
  3. acceptable_role must not be None.
  4. If the agent is currently available (_available is True), the sender's role bitmask is compared against acceptable_role. On a match the requester is added to _engaged_agents, _available is set to False, and True is returned.

Parameters:

Name Type Description Default
acceptable_role str | None

The role the sender must have for the engagement to be accepted, as a string from ROLE_STR_TO_BITS. When set to a role without the "~" separator, only the base bits (above the first two) are compared. Defaults to None.

None
sender_role str | None

The role string advertised by the sender in the engagement request. Defaults to None.

None
interaction Interaction | None

The interaction injected automatically by the dispatch layer; used to retrieve the requester's peer ID. Defaults to None.

None

Returns:

Type Description
bool

True if the engagement was accepted and confirmed, False if the agent

bool

is unavailable, the role does not match, or any validation check fails.

Source code in unaiverse/agent.py
@action
async def engage(self, acceptable_role: str | None = None, sender_role: str | None = None,
                 interaction: Interaction | None = None) -> bool:
    """Accept or reject an incoming engagement request from another agent (async).

    Called on the recipient side of a ``send_engage`` / ``engage`` handshake.
    The method validates the requester in four steps:

    1. The requester is looked up in ``world_agents`` and ``world_masters``; an
       unknown requester causes an immediate False return.
    2. ``sender_role`` must not be None.
    3. ``acceptable_role`` must not be None.
    4. If the agent is currently available (``_available is True``), the
       sender's role bitmask is compared against ``acceptable_role``. On a
       match the requester is added to ``_engaged_agents``, ``_available`` is
       set to False, and True is returned.

    Args:
        acceptable_role: The role the sender must have for the engagement to be
            accepted, as a string from ``ROLE_STR_TO_BITS``. When set to a
            role without the ``"~"`` separator, only the base bits (above the
            first two) are compared. Defaults to None.
        sender_role: The role string advertised by the sender in the engagement
            request. Defaults to None.
        interaction: The interaction injected automatically by the dispatch
            layer; used to retrieve the requester's peer ID. Defaults to None.

    Returns:
        True if the engagement was accepted and confirmed, False if the agent
        is unavailable, the role does not match, or any validation check fails.
    """
    _requester = interaction.requester if interaction is not None else None
    log.misc(
        f"Getting engagement from {_requester}, whose role is {sender_role} (looking for {acceptable_role})")
    if _requester not in self.world_agents and _requester not in self.world_masters:
        log.error(f"Unknown agent: {_requester}")
        return False

    if sender_role is None:
        log.error(f"Unknown role of {_requester}")
        return False

    if acceptable_role is None:
        log.error(f"Invalid acceptable role: {acceptable_role}")
        return False

    # Confirming
    if self._available:
        acceptable_role_int = self.ROLE_STR_TO_BITS[acceptable_role]
        if "~" not in acceptable_role:
            sender_role_int = (self.ROLE_STR_TO_BITS[sender_role] >> 2) << 2
        else:
            sender_role_int = self.ROLE_STR_TO_BITS[sender_role]

        if acceptable_role_int == sender_role_int:
            self._engaged_agents.add(_requester)

            # Marking this agent as not available since it engaged with another one
            self._available = False
            return True
        else:
            log.error(f"Cannot engage to {_requester}")
            return False
    else:
        log.error(f"Cannot engage to {_requester}")
        return False

send_disengage async

send_disengage(send_disconnection_too: bool = False) -> bool

Send a disengagement request to all currently engaged agents (async).

Dispatches a "disengage" interaction to every peer ID in _engaged_agents, optionally requesting that each recipient also disconnects. On success, _engaged_agents is cleared immediately (the local agent does not wait for a disengage callback to confirm).

If the underlying send fails, an error is logged and False is returned without clearing the engagement set.

Parameters:

Name Type Description Default
send_disconnection_too bool

When True, the disconnect_too=True flag is forwarded to each recipient's disengage action, which will cause them to call _node_purge_fcn and disconnect. Defaults to False.

False

Returns:

Type Description
bool

True if the disengagement message was delivered to at least one

bool

engaged agent and _engaged_agents was cleared, False if the send

bool

failed.

Source code in unaiverse/agent.py
@action
async def send_disengage(self, send_disconnection_too: bool = False) -> bool:
    """Send a disengagement request to all currently engaged agents (async).

    Dispatches a ``"disengage"`` interaction to every peer ID in
    ``_engaged_agents``, optionally requesting that each recipient also
    disconnects. On success, ``_engaged_agents`` is cleared immediately (the
    local agent does not wait for a ``disengage`` callback to confirm).

    If the underlying send fails, an error is logged and ``False`` is returned
    without clearing the engagement set.

    Args:
        send_disconnection_too: When True, the ``disconnect_too=True`` flag is
            forwarded to each recipient's ``disengage`` action, which will cause
            them to call ``_node_purge_fcn`` and disconnect. Defaults to False.

    Returns:
        True if the disengagement message was delivered to at least one
        engaged agent and ``_engaged_agents`` was cleared, False if the send
        failed.
    """
    at_least_one_sent = False

    if len(self._engaged_agents) > 0:
        log.misc(f"Sending disengagement request to {', '.join([x for x in self._engaged_agents])}")
    if await self._send(target=list(self._engaged_agents), action_name="disengage",
                        action_kwargs={"disconnect_too": send_disconnection_too}):
        self._engaged_agents.clear()  # There is no "got_disengagement"
        return True
    else:
        log.error(f"Unable to send disengagement to {self._engaged_agents}")
        return False

disengage async

disengage(disconnect_too: bool = False, interaction: Interaction | None = None) -> bool

Process an incoming disengagement request from another agent (async).

Called on the recipient side of a send_disengage / disengage handshake. The requester is validated against world_agents and world_masters; an unknown or not-previously-engaged requester causes an early False return.

When validated, the requester is removed from _engaged_agents. If disconnect_too is True, the status confirmation for this interaction is suppressed (interaction.send_status = False) and _node_purge_fcn is awaited to disconnect the requester from the network. Finally, _available is recomputed as True if no other agents remain engaged.

Parameters:

Name Type Description Default
disconnect_too bool

When True, the agent that sent the disengagement is also disconnected from the network via _node_purge_fcn. Defaults to False.

False
interaction Interaction | None

The interaction injected automatically by the dispatch layer; used to retrieve the requester's peer ID. Defaults to None.

None

Returns:

Type Description
bool

True if the disengagement was processed successfully, False if the

bool

requester is unknown or was not previously engaged.

Source code in unaiverse/agent.py
@action
async def disengage(self, disconnect_too: bool = False, interaction: Interaction | None = None) -> bool:
    """Process an incoming disengagement request from another agent (async).

    Called on the recipient side of a ``send_disengage`` / ``disengage``
    handshake. The requester is validated against ``world_agents`` and
    ``world_masters``; an unknown or not-previously-engaged requester causes
    an early False return.

    When validated, the requester is removed from ``_engaged_agents``. If
    ``disconnect_too`` is True, the status confirmation for this interaction
    is suppressed (``interaction.send_status = False``) and
    ``_node_purge_fcn`` is awaited to disconnect the requester from the
    network. Finally, ``_available`` is recomputed as True if no other
    agents remain engaged.

    Args:
        disconnect_too: When True, the agent that sent the disengagement is
            also disconnected from the network via ``_node_purge_fcn``.
            Defaults to False.
        interaction: The interaction injected automatically by the dispatch
            layer; used to retrieve the requester's peer ID. Defaults to None.

    Returns:
        True if the disengagement was processed successfully, False if the
        requester is unknown or was not previously engaged.
    """

    _requester = interaction.requester if interaction is not None else None
    log.misc(f"Getting a disengagement request from {_requester}")
    if _requester not in self.world_agents and _requester not in self.world_masters:
        log.error(f"Unknown agent: {_requester}")
        return False

    if _requester not in self._engaged_agents:
        log.error(f"Not previously engaged to {_requester}")
        return False

    self._engaged_agents.discard(_requester)  # Remove if present

    if disconnect_too:
        if interaction is not None:
            interaction.send_status = False  # This will avoid sending back a confirmation for this interaction
        await self._node_purge_fcn(_requester)

    # Marking this agent as available if not engaged to any agent
    self._available = len(self._engaged_agents) == 0
    return True

disengage_all async

disengage_all() -> bool

Clear the local engagement set without notifying the remote peers (async).

Unlike send_disengage, this action does not send any message to the engaged agents. It simply empties _engaged_agents in place and restores _available to True. Use this when the remote side has already been notified (or when the communication channel is gone) and only the local bookkeeping needs to be reset.

Returns:

Type Description
bool

Always True.

Source code in unaiverse/agent.py
@action
async def disengage_all(self) -> bool:
    """Clear the local engagement set without notifying the remote peers (async).

    Unlike ``send_disengage``, this action does not send any message to the engaged agents. It
    simply empties ``_engaged_agents`` in place and restores ``_available`` to True. Use this
    when the remote side has already been notified (or when the communication channel is gone)
    and only the local bookkeeping needs to be reset.

    Returns:
        Always True.
    """
    log.misc(f"Disengaging all agents")
    self._engaged_agents = set()

    # Marking this agent as available
    self._available = True
    return True

disconnect async

disconnect(agent: str) -> bool

Disconnect a single peer from the network and clean up its local state (async).

Invokes _node_purge_fcn on the given peer ID. The purge function is responsible for removing the peer from the node's connection pool, which in turn calls remove_agent and remove_streams to clean up all bookkeeping structures associated with that agent.

Parameters:

Name Type Description Default
agent str

The peer ID of the agent to disconnect.

required

Returns:

Type Description
bool

Always True.

Source code in unaiverse/agent.py
@action
async def disconnect(self, agent: str) -> bool:
    """Disconnect a single peer from the network and clean up its local state (async).

    Invokes ``_node_purge_fcn`` on the given peer ID. The purge function is responsible
    for removing the peer from the node's connection pool, which in turn calls
    ``remove_agent`` and ``remove_streams`` to clean up all bookkeeping structures
    associated with that agent.

    Args:
        agent: The peer ID of the agent to disconnect.

    Returns:
        Always True.
    """
    log.misc(f"Disconnecting agent: {agent}")
    await self._node_purge_fcn(agent)  # This will also call remove_agent, that will call remove_streams
    return True

disconnect_by_role async

disconnect_by_role(role: str | list[str], disengage_too: bool = False) -> bool

Disconnect every connected agent whose role matches one of the specified roles (async).

The method first optionally sends a disengagement message (with disconnect_too=True) to all currently engaged agents. It then calls find_agents to populate _found_agents with peer IDs whose role matches role. For each found agent it calls _node_purge_fcn, which removes the peer from the connection pool and triggers remove_agent and remove_streams on the local state.

A deep copy of _found_agents is taken before iterating so that the purge callbacks that modify _found_agents do not interfere with the loop.

Parameters:

Name Type Description Default
role str | list[str]

A role string (e.g. "world_agent") or a list of role strings. Each value must be a key in ROLE_STR_TO_BITS.

required
disengage_too bool

When True, send_disengage(send_disconnection_too=True) is called before disconnecting, instructing the remote peers to also purge this agent. Defaults to False.

False

Returns:

Type Description
bool

Always True.

Source code in unaiverse/agent.py
@action
async def disconnect_by_role(self, role: str | list[str], disengage_too: bool = False) -> bool:
    """Disconnect every connected agent whose role matches one of the specified roles (async).

    The method first optionally sends a disengagement message (with ``disconnect_too=True``) to
    all currently engaged agents. It then calls ``find_agents`` to populate ``_found_agents``
    with peer IDs whose role matches ``role``. For each found agent it calls
    ``_node_purge_fcn``, which removes the peer from the connection pool and triggers
    ``remove_agent`` and ``remove_streams`` on the local state.

    A deep copy of ``_found_agents`` is taken before iterating so that the purge callbacks
    that modify ``_found_agents`` do not interfere with the loop.

    Args:
        role: A role string (e.g. ``"world_agent"``) or a list of role strings. Each value
            must be a key in ``ROLE_STR_TO_BITS``.
        disengage_too: When True, ``send_disengage(send_disconnection_too=True)`` is called
            before disconnecting, instructing the remote peers to also purge this agent.
            Defaults to False.

    Returns:
        Always True.
    """
    log.misc(f"Disconnecting agents with role: {role}")
    if disengage_too:
        await self.send_disengage(send_disconnection_too=True)
    if await self.find_agents(role):
        found_agents = copy.deepcopy(self._found_agents)
        for agent in found_agents:
            await self._node_purge_fcn(agent)  # This will also call remove_agent, that will call remove_streams
    return True

disconnected async

disconnected(agent: str | None = None, handshake_completed: bool = False) -> bool

Return True if all specified agents are no longer connected (async).

The set of agents to check is resolved via __involved_agents: a concrete peer ID produces a single-element list, a known wildcard such as "<valid_cmp>" expands to _valid_cmp_agents, and None defaults to _engaged_agents (falling back to _found_agents when the engagement set is empty). If the resolved set is empty, False is returned immediately.

When handshake_completed is True the check uses all_agents (agents that completed the handshake). When False, both all_agents and the raw connection pool (_node_conn.is_connected) are consulted so that peers in mid-handshake are also considered connected.

Parameters:

Name Type Description Default
agent str | None

The peer ID, wildcard string, or None to use the currently engaged or found agents. Defaults to None.

None
handshake_completed bool

When True, only agents that completed the handshake are considered still connected. Defaults to False.

False

Returns:

Type Description
bool

True if all involved agents are disconnected, False if at least one is still

bool

connected or if the resolved agent set is empty.

Source code in unaiverse/agent.py
@action
@action
async def disconnected(self, agent: str | None = None, handshake_completed: bool = False) -> bool:
    """Return True if all specified agents are no longer connected (async).

    The set of agents to check is resolved via ``__involved_agents``: a concrete peer ID
    produces a single-element list, a known wildcard such as ``"<valid_cmp>"`` expands to
    ``_valid_cmp_agents``, and ``None`` defaults to ``_engaged_agents`` (falling back to
    ``_found_agents`` when the engagement set is empty). If the resolved set is empty,
    ``False`` is returned immediately.

    When ``handshake_completed`` is True the check uses ``all_agents`` (agents that
    completed the handshake). When False, both ``all_agents`` and the raw connection pool
    (``_node_conn.is_connected``) are consulted so that peers in mid-handshake are also
    considered connected.

    Args:
        agent: The peer ID, wildcard string, or ``None`` to use the currently engaged or
            found agents. Defaults to None.
        handshake_completed: When True, only agents that completed the handshake are
            considered still connected. Defaults to False.

    Returns:
        True if all involved agents are disconnected, False if at least one is still
        connected or if the resolved agent set is empty.
    """

    # - if "agent" is a peer ID, the involved agents will be a list with one element.
    # - if "agent" is a known wildcard, as "<valid_cmp>", then involved agents will be self._valid_cmp_agents
    # - if "agent" is None, then the current agent in self._engaged_agents will be returned
    involved_agents = self.__involved_agents(agent)
    if len(involved_agents) == 0:
        return False

    log.misc(f"Checking if all these agents are not connected to me anymore: {involved_agents}")
    all_disconnected = True
    for agent in involved_agents:
        if handshake_completed:
            if agent in self.all_agents:
                all_disconnected = False
                break
        else:
            if agent in self.all_agents or self._node_conn.is_connected(agent):
                all_disconnected = False
                break
    return all_disconnected

received_some_asked_data async

received_some_asked_data(processing_fcn: str | None = None, data_type: str | None = None) -> bool

Return True if any previously asked agent has published a stream sample (async).

Iterates over all interactions that were sent (both currently active and recently completed) and, for each target agent, scans the agent's processor streams. A non-public stream that contains data under the interaction's UUID is considered a match. If processing_fcn is provided and callable on self, it is called with (agent, stream_props, data, data_tag) for every matching sample; in that case the method returns True only after collecting all hits. Without processing_fcn it returns True on the very first hit found.

Parameters:

Name Type Description Default
processing_fcn str | None

The name of an instance method on this agent that will be called with (agent_peer_id, stream_props, data, data_tag) for each received sample. If None, no processing is applied and the first hit causes an immediate True return. Defaults to None.

None
data_type str | None

When provided, only streams whose props.data_type equals this string are considered; all other streams are skipped. Defaults to None.

None

Returns:

Type Description
bool

True if at least one stream sample was received from an asked agent,

bool

False otherwise.

Source code in unaiverse/agent.py
@action
async def received_some_asked_data(self, processing_fcn: str | None = None, data_type: str | None = None) -> bool:
    """Return True if any previously asked agent has published a stream sample (async).

    Iterates over all interactions that were sent (both currently active and recently
    completed) and, for each target agent, scans the agent's processor streams. A
    non-public stream that contains data under the interaction's UUID is considered a
    match. If ``processing_fcn`` is provided and callable on ``self``, it is called with
    ``(agent, stream_props, data, data_tag)`` for every matching sample; in that case the
    method returns True only after collecting all hits. Without ``processing_fcn`` it
    returns True on the very first hit found.

    Args:
        processing_fcn: The name of an instance method on this agent that will be called
            with ``(agent_peer_id, stream_props, data, data_tag)`` for each received
            sample. If None, no processing is applied and the first hit causes an
            immediate True return. Defaults to None.
        data_type: When provided, only streams whose ``props.data_type`` equals this
            string are considered; all other streams are skipped. Defaults to None.

    Returns:
        True if at least one stream sample was received from an asked agent,
        False otherwise.
    """
    _processing_fcn = None
    if processing_fcn is not None:
        if hasattr(self, processing_fcn):
            _processing_fcn = getattr(self, processing_fcn)
            if not callable(_processing_fcn):
                _processing_fcn = None
        if _processing_fcn is None:
            log.error(f"Processing function not found: {processing_fcn}")

    got_something = False
    sent = list(self.im.sent.values()) + list(self.im.sent_recently_completed)
    for interaction in sent:
        uuid = interaction.uuid
        agents = interaction.target
        for agent in agents:
            net_hash_to_stream_dict = self.find_streams(agent, "processor", discard_owned=True)
            for stream_dict in net_hash_to_stream_dict.values():
                for stream_obj in stream_dict.values():
                    if data_type is not None and stream_obj.props.data_type != data_type:
                        continue
                    if not stream_obj.props.is_public():
                        data = stream_obj.get("received_some_asked_data", uuid=uuid)
                        data_tag = stream_obj.get_tag(uuid=uuid)

                        if data is not None:
                            if _processing_fcn is None:
                                return True
                            else:
                                got_something = True
                                _processing_fcn(agent, stream_obj.props, data, data_tag)
    return got_something

nop async

nop(message: str | None = None) -> bool

Perform no operation and optionally log a message (async).

A convenience placeholder action for HSM transitions that require an action slot but have no meaningful work to perform. If message is provided it is emitted at the misc log level.

Parameters:

Name Type Description Default
message str | None

An optional message to log. Defaults to None.

None

Returns:

Type Description
bool

Always True.

Source code in unaiverse/agent.py
@action
async def nop(self, message: str | None = None) -> bool:
    """Perform no operation and optionally log a message (async).

    A convenience placeholder action for HSM transitions that require an action slot but
    have no meaningful work to perform. If ``message`` is provided it is emitted at the
    ``misc`` log level.

    Args:
        message: An optional message to log. Defaults to None.

    Returns:
        Always True.
    """
    if message is not None:
        log.misc(message)
    return True

connected async

connected(agent: str | list[str] | None = None, handshake_completed: bool = False) -> bool

Return True if all specified agents are currently connected (async).

The set of agents to check is resolved via __involved_agents: a concrete peer ID or list of peer IDs, a wildcard such as "<valid_cmp>", or None to default to _engaged_agents (falling back to _found_agents). If the resolved set is empty, False is returned immediately.

When handshake_completed is True, a peer is considered connected only if it appears in all_agents (i.e., the handshake finished). When False, the lower-level _node_conn.is_connected is used, which also returns True for peers in mid-handshake.

Parameters:

Name Type Description Default
agent str | list[str] | None

The peer ID, list of peer IDs, wildcard string, or None to use the currently engaged or found agents. Defaults to None.

None
handshake_completed bool

When True, only agents that completed the handshake are considered connected. Defaults to False.

False

Returns:

Type Description
bool

True if all involved agents are connected, False if at least one is missing or if

bool

the resolved agent set is empty.

Source code in unaiverse/agent.py
@action
async def connected(self, agent: str | list[str] | None = None, handshake_completed: bool = False) -> bool:
    """Return True if all specified agents are currently connected (async).

    The set of agents to check is resolved via ``__involved_agents``: a concrete peer ID or
    list of peer IDs, a wildcard such as ``"<valid_cmp>"``, or ``None`` to default to
    ``_engaged_agents`` (falling back to ``_found_agents``). If the resolved set is empty,
    ``False`` is returned immediately.

    When ``handshake_completed`` is True, a peer is considered connected only if it appears
    in ``all_agents`` (i.e., the handshake finished). When False, the lower-level
    ``_node_conn.is_connected`` is used, which also returns True for peers in mid-handshake.

    Args:
        agent: The peer ID, list of peer IDs, wildcard string, or ``None`` to use the
            currently engaged or found agents. Defaults to None.
        handshake_completed: When True, only agents that completed the handshake are
            considered connected. Defaults to False.

    Returns:
        True if all involved agents are connected, False if at least one is missing or if
        the resolved agent set is empty.
    """

    # - if "agent" is a peer ID, the involved agents will be a list with one element.
    # - if "agent" is a known wildcard, as "<valid_cmp>", then involved agents will be self._valid_cmp_agents
    # - if "agent" is None, then the current agent in self._engaged_agents will be returned
    involved_agents = self.__involved_agents(agent)
    if len(involved_agents) == 0:
        return False

    log.misc(f"Checking if all these agents are connected to me now: {involved_agents}")

    for agent in involved_agents:
        if handshake_completed:
            if agent not in self.all_agents:
                return False
        else:
            if not self._node_conn.is_connected(agent):
                return False
    return True

all_asked_finished async

all_asked_finished() -> bool

Return True if every asked agent has confirmed task completion (async).

Compares _agents_who_were_asked with _agents_who_completed_what_they_were_asked. These sets are populated and cleared by actions such as send_subscribe (and related ask-style actions) as agents are contacted and as they report back. The check is a simple set equality test, so it returns True when both sets are empty as well (no agents were asked).

Returns:

Type Description
bool

True if all previously asked agents have sent a completion confirmation,

bool

False if at least one is still pending.

Source code in unaiverse/agent.py
@action
async def all_asked_finished(self) -> bool:
    """Return True if every asked agent has confirmed task completion (async).

    Compares ``_agents_who_were_asked`` with
    ``_agents_who_completed_what_they_were_asked``. These sets are populated and cleared by
    actions such as ``send_subscribe`` (and related ask-style actions) as agents are
    contacted and as they report back. The check is a simple set equality test, so it
    returns True when both sets are empty as well (no agents were asked).

    Returns:
        True if all previously asked agents have sent a completion confirmation,
        False if at least one is still pending.
    """
    return self._agents_who_were_asked == self._agents_who_completed_what_they_were_asked

all_engagements_completed async

all_engagements_completed() -> bool

Return True if no agents remain in _found_agents (async).

After send_engage is called, each agent that confirmed engagement is moved from _found_agents to _engaged_agents. This predicate becomes True when _found_agents is empty, meaning every discovered agent has either confirmed engagement or has been discarded.

Returns:

Type Description
bool

True if _found_agents is empty, False if at least one discovered agent has

bool

not yet been engaged.

Source code in unaiverse/agent.py
@action
async def all_engagements_completed(self) -> bool:
    """Return True if no agents remain in ``_found_agents`` (async).

    After ``send_engage`` is called, each agent that confirmed engagement is moved from
    ``_found_agents`` to ``_engaged_agents``. This predicate becomes True when
    ``_found_agents`` is empty, meaning every discovered agent has either confirmed
    engagement or has been discarded.

    Returns:
        True if ``_found_agents`` is empty, False if at least one discovered agent has
        not yet been engaged.
    """
    return len(self._found_agents) == 0

agents_are_waiting async

agents_are_waiting() -> bool

Return True if any found agent is still pending full registration (async).

Peers that connect to the node are held in _node_agents_waiting until the handshake and profile exchange complete and they are moved into all_agents. This action checks whether any peer ID in _found_agents is also present in _node_agents_waiting, which would mean the agent connected but has not yet been fully registered. The current waiting set is also logged at the misc level.

Returns:

Type Description
bool

True if at least one found agent is still waiting to be fully registered,

bool

False if all found agents have completed registration or if _found_agents is

bool

empty.

Source code in unaiverse/agent.py
@action
async def agents_are_waiting(self) -> bool:
    """Return True if any found agent is still pending full registration (async).

    Peers that connect to the node are held in ``_node_agents_waiting`` until the
    handshake and profile exchange complete and they are moved into ``all_agents``. This
    action checks whether any peer ID in ``_found_agents`` is also present in
    ``_node_agents_waiting``, which would mean the agent connected but has not yet been
    fully registered. The current waiting set is also logged at the ``misc`` level.

    Returns:
        True if at least one found agent is still waiting to be fully registered,
        False if all found agents have completed registration or if ``_found_agents`` is
        empty.
    """
    log.misc(f"Current set of {len(self._node_agents_waiting)} connected peer IDs non managed yet: "
             f"{list(self._node_agents_waiting.keys())}")
    for found_agent in self._found_agents:
        if found_agent in self._node_agents_waiting:
            return True
    return False

send_subscribe async

send_subscribe(agent: str | None = None, stream_hashes: list[str] | None = None, unsubscribe: bool = False) -> bool

Ask one or more remote agents to subscribe or unsubscribe from PubSub streams (async).

The involved agents are resolved via __involved_agents from agent. The stream_hashes list is normalised to full network hashes with __normalize_user_hash, the DataProps for each hash are read from known_streams, and a "subscribe" action request is sent to every involved agent carrying the stream owners and JSON-serialised stream properties. Both _agents_who_were_asked and _agents_who_completed_what_they_were_asked are reset before dispatching.

Parameters:

Name Type Description Default
agent str | None

The peer ID, wildcard string, or None to target the currently engaged or found agents. Defaults to None.

None
stream_hashes list[str] | None

A list of stream hashes (net hashes, user hashes, or "<playlist>" wildcard) identifying the PubSub streams to act on. Defaults to None.

None
unsubscribe bool

When True, asks the remote agents to unsubscribe rather than subscribe. Defaults to False.

False

Returns:

Type Description
bool

True if at least one agent was successfully contacted, False if no agents were

bool

resolved or all sends failed.

Source code in unaiverse/agent.py
@action
async def send_subscribe(self, agent: str | None = None,
                         stream_hashes: list[str] | None = None, unsubscribe: bool = False) -> bool:
    """Ask one or more remote agents to subscribe or unsubscribe from PubSub streams (async).

    The involved agents are resolved via ``__involved_agents`` from ``agent``. The
    ``stream_hashes`` list is normalised to full network hashes with
    ``__normalize_user_hash``, the ``DataProps`` for each hash are read from
    ``known_streams``, and a ``"subscribe"`` action request is sent to every involved
    agent carrying the stream owners and JSON-serialised stream properties. Both
    ``_agents_who_were_asked`` and ``_agents_who_completed_what_they_were_asked`` are
    reset before dispatching.

    Args:
        agent: The peer ID, wildcard string, or ``None`` to target the currently engaged
            or found agents. Defaults to None.
        stream_hashes: A list of stream hashes (net hashes, user hashes, or
            ``"<playlist>"`` wildcard) identifying the PubSub streams to act on.
            Defaults to None.
        unsubscribe: When True, asks the remote agents to unsubscribe rather than
            subscribe. Defaults to False.

    Returns:
        True if at least one agent was successfully contacted, False if no agents were
        resolved or all sends failed.
    """

    # - if "agent" is a peer ID, the involved agents will be a list with one element.
    # - if "agent" is a known wildcard, as "<valid_cmp>", then involved agents will be self._valid_cmp_agents
    # - if "agent" is None, then the current agent in self._engaged_agents will be returned
    involved_agents = self.__involved_agents(agent)
    log.debug(f"[send_subscribe] Involved_agents: {involved_agents}")

    if len(involved_agents) == 0:
        log.debug(f"[send_subscribe] No involved agents, action ask_gen returns False")
        return False

    # Create a copy of the stream hashes, normalizing them in the appropriate way
    stream_hashes_copy = self.__normalize_user_hash(stream_hashes)

    # Getting properties
    stream_owners = []
    stream_props = []
    for i in range(len(stream_hashes_copy)):
        stream_dict = self.known_streams[stream_hashes_copy[i]]
        peer_id = DataProps.peer_id_from_net_hash(stream_hashes_copy[i])
        for name, stream_obj in stream_dict.items():
            stream_owners.append(peer_id)
            stream_props.append(json.dumps(stream_obj.props.to_dict()))

    what = "subscribe to" if not unsubscribe else "unsubscribe from "
    log.misc(f"Asking {', '.join(involved_agents)} to {what} {stream_hashes}")
    self._agents_who_completed_what_they_were_asked = set()
    self._agents_who_were_asked = set()
    correctly_asked = []
    for agent in involved_agents:
        if await self._send(target=agent, action_name="subscribe",
                            action_kwargs={"stream_owners": stream_owners,
                                           "stream_props": stream_props,
                                           "unsubscribe": unsubscribe}):
            self._agents_who_were_asked.add(agent)
            ret = True
        else:
            what = "subscribe" if not unsubscribe else "unsubscribe"
            log.error(f"Unable to ask {agent} to {what}")
            ret = False
        log.debug(f"[send_subscribe] Asking {agent} returned {ret}")
        if ret:
            correctly_asked.append(agent)

    log.debug(f"[send_subscribe] Overall, the action send_subscribe (unsubscribe: {unsubscribe})"
              f" will return {len(correctly_asked) > 0}")
    return len(correctly_asked) > 0

subscribe async

subscribe(stream_owners: list[str] | None = None, stream_props: list[str] | None = None, unsubscribe: bool = False, interaction: Interaction | None = None) -> bool

Process a subscription or unsubscription request received from a remote agent (async).

Called on the recipient side of a send_subscribe / subscribe handshake. The requester is validated against world_agents and world_masters (when behaving in a world) or public_agents (when operating outside a world). If validation passes, each entry in stream_props is deserialized from JSON and verified to be a PubSub stream; non-PubSub streams cause an immediate False return.

For each valid (stream_owner, stream_props) pair:

  • When subscribing, add_compatible_streams is called to register the stream and set up the underlying PubSub topic subscription.
  • When unsubscribing, remove_streams is called to deregister the stream.

Parameters:

Name Type Description Default
stream_owners list[str] | None

Parallel list of peer IDs that own the streams. Must have the same length as stream_props. Defaults to None.

None
stream_props list[str] | None

Parallel list of JSON-serialized DataProps dictionaries describing the PubSub streams to subscribe to or unsubscribe from. Each entry must represent a PubSub stream. Defaults to None.

None
unsubscribe bool

When True, the streams are removed rather than added. Defaults to False.

False
interaction Interaction | None

The interaction injected automatically by the dispatch layer; used to retrieve the requester's peer ID. Defaults to None.

None

Returns:

Type Description
bool

True if all streams were processed successfully, False if the requester is

bool

unknown, any stream is not a PubSub stream, or any other validation check fails.

Source code in unaiverse/agent.py
@action
async def subscribe(self, stream_owners: list[str] | None = None, stream_props: list[str] | None = None,
                    unsubscribe: bool = False, interaction: Interaction | None = None) -> bool:
    """Process a subscription or unsubscription request received from a remote agent (async).

    Called on the recipient side of a ``send_subscribe`` / ``subscribe`` handshake. The
    requester is validated against ``world_agents`` and ``world_masters`` (when behaving
    in a world) or ``public_agents`` (when operating outside a world). If validation
    passes, each entry in ``stream_props`` is deserialized from JSON and verified to be a
    PubSub stream; non-PubSub streams cause an immediate False return.

    For each valid (``stream_owner``, ``stream_props``) pair:

    - When subscribing, ``add_compatible_streams`` is called to register the stream and
      set up the underlying PubSub topic subscription.
    - When unsubscribing, ``remove_streams`` is called to deregister the stream.

    Args:
        stream_owners: Parallel list of peer IDs that own the streams. Must have the same
            length as ``stream_props``. Defaults to None.
        stream_props: Parallel list of JSON-serialized ``DataProps`` dictionaries
            describing the PubSub streams to subscribe to or unsubscribe from. Each entry
            must represent a PubSub stream. Defaults to None.
        unsubscribe: When True, the streams are removed rather than added. Defaults to
            False.
        interaction: The interaction injected automatically by the dispatch layer; used
            to retrieve the requester's peer ID. Defaults to None.

    Returns:
        True if all streams were processed successfully, False if the requester is
        unknown, any stream is not a PubSub stream, or any other validation check fails.
    """
    log.debug(f"[subscribe] unsubscribe: {unsubscribe}, "
              f"stream_owners: {stream_owners}, stream_props: ... ({len(stream_props)} props)")

    _requester = interaction.requester
    if _requester is not None:
        if isinstance(_requester, list):
            for _r in _requester:
                if self.behaving_in_world():
                    if _r not in self.world_agents and _requester not in self.world_masters:
                        log.error(f"Unknown agent: {_r} in list {_requester} (fully skipping action subscribe)")
                        return False
                else:
                    if _r not in self.public_agents:
                        log.error(f"Unknown agent: {_r} in list {_requester} (fully skipping action subscribe)")
                        return False
        else:
            if self.behaving_in_world():
                if _requester not in self.world_agents and _requester not in self.world_masters:
                    log.error(f"Unknown agent: {_requester} (fully skipping action subscribe)")
                    return False
            else:
                if _requester not in self.public_agents:
                    log.error(f"Unknown agent: {_requester} (fully skipping action subscribe)")
                    return False
    else:
        log.error("Unknown requester (None)")
        return False

    # Building properties
    props_dicts = []
    props_objs = []
    for i in range(len(stream_props)):
        p_dict = json.loads(stream_props[i])
        props = DataProps.from_dict(p_dict)
        if props.is_pubsub():
            props_dicts.append(p_dict)
            props_objs.append(props)
        else:
            log.error(f"Expecting a pubsub stream, got a stream named {props.get_name()} "
                      f"(group is {props.get_group()}), which is not pubsub")
            return False

    # Adding new streams and subscribing (if compatible with our processor)
    for stream_owner, prop_dict, prop_obj in zip(stream_owners, props_dicts, props_objs):
        if not unsubscribe:
            if not (await self.add_compatible_streams(peer_id=stream_owner, streams_in_profile=[prop_dict],
                                                      buffered=False, public=False)):
                log.error(f"Unable to add a pubsub stream ({prop_obj.get_name()}) from agent {stream_owner}: "
                          f"no compatible streams were found")
        else:
            if not (await self.remove_streams(peer_id=stream_owner, name=prop_obj.get_name())):
                log.misc(f"Unable to unsubscribe from pubsub stream ({prop_obj.get_name()}) "
                         f"of agent {stream_owner}")
    return True

record async

record(interaction: Interaction | None = None, record_uuid: str | None = 'see_interaction_uuid') -> bool

Records interaction.num_steps samples from the interaction's stdin stream group into a new owned BufferedDataStream group and exposes it via pubsub. Multistep by virtue of the interaction's num_steps; the framework's readiness gate already pauses execution on no-fresh-data cycles.

Parameters:

Name Type Description Default
interaction Interaction | None

The driving interaction (auto-injected by the dispatch layer).

None
record_uuid str | None

Which uuid to read each source stream under. The default sentinel "see_interaction_uuid" means "use interaction.uuid" — the right choice when the source agent is a peer that knows about this interaction and publishes under it (process-style flow). Pass record_uuid=None (or any other explicit uuid string) when the source publishes under a different uuid — for instance when recording a world stream that has no per-interaction publish, in which case data lives in the source's data_by_uuid[None] slot.

'see_interaction_uuid'

Triggered from HSM transits via:

behav.add_transit("init", "next_state", action="record",
                  args={"streams": {"stdin": ["<world>:group"]},
                        "num_steps": "<wildcard_or_int>",
                        "record_uuid": None})
Source code in unaiverse/agent.py
@action
async def record(self, interaction: Interaction | None = None,
                 record_uuid: str | None = "see_interaction_uuid") -> bool:
    """Records `interaction.num_steps` samples from the interaction's stdin stream group into a
    new owned `BufferedDataStream` group and exposes it via pubsub. Multistep by virtue of the
    interaction's `num_steps`; the framework's readiness gate already pauses execution on
    no-fresh-data cycles.

    Args:
        interaction: The driving interaction (auto-injected by the dispatch layer).
        record_uuid: Which `uuid` to read each source stream under. The default sentinel
            `"see_interaction_uuid"` means "use `interaction.uuid`" — the right choice when the
            source agent is a peer that knows about this interaction and publishes under it
            (process-style flow). Pass `record_uuid=None` (or any other explicit uuid string)
            when the source publishes under a different uuid — for instance when recording a
            world stream that has no per-interaction publish, in which case data lives in the
            source's `data_by_uuid[None]` slot.

    Triggered from HSM transits via:

        behav.add_transit("init", "next_state", action="record",
                          args={"streams": {"stdin": ["<world>:group"]},
                                "num_steps": "<wildcard_or_int>",
                                "record_uuid": None})
    """
    if interaction is None:
        return False

    read_uuid = interaction.uuid if record_uuid == "see_interaction_uuid" else record_uuid
    k = self.get_action_step()

    if k == 0:
        dest = {}
        for s_obj in interaction.stream_proxy.values():
            if not isinstance(s_obj, Stream):
                continue
            assert s_obj.props is not None
            props = s_obj.props.clone()
            props.set_group("recorded" + str(self._last_recorded_stream_num))
            dest[s_obj.props.get_name()] = BufferedDataStream(props=props)
        self._last_recorded_stream_dict = dest

    dest = self._last_recorded_stream_dict

    for s_obj in interaction.stream_proxy.values():
        if not isinstance(s_obj, Stream):
            continue
        x = s_obj.get(uuid=read_uuid, requested_by="record")
        if x is None:
            return False
        assert s_obj.props is not None
        dest_stream = dest[s_obj.props.get_name()]
        if k % 10 == 0:
            log.user(f"Recording Stream: {dest_stream.props.get_group()}.{dest_stream.props.get_name()}, "
                     f"Step: {k}, Last: {interaction.num_steps - 1}")
        dest_stream.set(x, k)

    if self.is_last_action_step():
        for s_obj in dest.values():
            log.user(f"Recording Stream: {s_obj.props.get_name()}, Step: {k}, Last: {interaction.num_steps - 1}")
            s_obj.is_read_only = True
            s_obj.get(requested_by="send_stream_samples")
        self.add_streams(list(dest.values()), owned=True)
        self.update_streams_in_profile()
        await self.subscribe_to_pubsub_owned_streams()
        await self.send_profile_to_all()
        self._last_recorded_stream_num += 1
        self._last_recorded_stream_dict = None
    return True

connect_to async

connect_to(peer_id: str)
Source code in unaiverse/agent.py
@action
async def connect_to(self, peer_id: str):
    addresses = self._node_conn.get_addrs(peer_id)
    if not self._node_conn.is_connected(peer_id):
        log.misc(f"Asking to get in touch with {peer_id}...")
        peer_id = await self._node_ask_to_get_in_touch_fcn(addresses=addresses, public=False)
        if peer_id is not None:
            return True
        else:
            return False
    else:
        log.misc(f"Not-asking to get in touch with {peer_id}, "
                 f"since I am already connected to the corresponding peer...")
        return True

connect_by_role async

connect_by_role(role: str | list[str], filter_fcn: str | None = None) -> bool

Finds and attempts to connect with agents whose profiles match a specific role. It can be optionally filtered by a custom function. It returns True if at least one valid agent is found (async).

Parameters:

Name Type Description Default
role str | list[str]

The role or list of roles to search for.

required
filter_fcn str | None

The name of an optional filter function.

None

Returns:

Type Description
bool

True if at least one agent is found and a NEW connection request is made, False otherwise.

Source code in unaiverse/agent.py
@action
async def connect_by_role(self, role: str | list[str], filter_fcn: str | None = None) -> bool:
    """Finds and attempts to connect with agents whose profiles match a specific role. It can be optionally
    filtered by a custom function. It returns True if at least one valid agent is found (async).

    Args:
        role: The role or list of roles to search for.
        filter_fcn: The name of an optional filter function.

    Returns:
        True if at least one agent is found and a NEW connection request is made, False otherwise.
    """
    log.misc(f"Asking to get in touch with all agents whose role is {role}")

    self._found_agents.clear()
    role_list = role if isinstance(role, list) else [role]
    at_least_one_is_valid = False

    for role in role_list:
        role = self.ROLE_STR_TO_BITS[role]

        found_addresses1, found_peer_ids1 = self._node_conn.find_addrs_by_role(Agent.ROLE_WORLD_MASTER | role,
                                                                               return_peer_ids_too=True,
                                                                               discard_yourself=True)
        found_addresses2, found_peer_ids2 = self._node_conn.find_addrs_by_role(Agent.ROLE_WORLD_AGENT | role,
                                                                               return_peer_ids_too=True,
                                                                               discard_yourself=True)
        found_addresses = found_addresses1 + found_addresses2
        found_peer_ids = found_peer_ids1 + found_peer_ids2

        if filter_fcn is not None:
            if hasattr(self, filter_fcn):
                filter_fcn = getattr(self, filter_fcn)
                if callable(filter_fcn):
                    found_addresses, found_peer_ids = filter_fcn(found_addresses, found_peer_ids)
            else:
                log.error(f"Filter function not found: {filter_fcn}")

        log.misc(f"Found addresses ({len(found_addresses)}) with role: {role}")
        for f_addr, f_peer_id in zip(found_addresses, found_peer_ids):
            if not self._node_conn.is_connected(f_peer_id):
                log.misc(f"Asking to get in touch with {f_peer_id}...")
                peer_id = await self._node_ask_to_get_in_touch_fcn(addresses=f_addr, public=False)
                if peer_id is not None:
                    at_least_one_is_valid = True
                    self._found_agents.add(peer_id)
            else:
                log.misc(f"Not-asking to get in touch with {f_addr}, "
                         f"since I am already connected to the corresponding peer...")
                peer_id = f_peer_id
            log.misc(f"...returned {peer_id}")
    return at_least_one_is_valid

find_agents async

find_agents(role: str | list[str], engage: bool = False, handshake_completed: bool = False) -> bool

Locally searches through the agent's known peers (world and public agents) to find agents with a specific role. It populates the _found_agents set with the peer IDs of matching agents (async).

Parameters:

Name Type Description Default
role str | list[str]

The role or list of roles to search for.

required
handshake_completed bool

If True, only consider agents that have completed the handshake.

False
engage bool

If you want to force the found agents to be the ones that you are engaged with.

False

Returns:

Type Description
bool

True if at least one agent is found, False otherwise.

Source code in unaiverse/agent.py
@action
async def find_agents(self, role: str | list[str], engage: bool = False, handshake_completed: bool = False) -> bool:
    """Locally searches through the agent's known peers (world and public agents) to find agents with a specific
    role. It populates the `_found_agents` set with the peer IDs of matching agents (async).

    Args:
        role: The role or list of roles to search for.
        handshake_completed: If True, only consider agents that have completed the handshake.
        engage: If you want to force the found agents to be the ones that you are engaged with.

    Returns:
        True if at least one agent is found, False otherwise.
    """
    log.misc(f"Finding an available agent whose role is {role}")
    self._found_agents.clear()
    self._found_agents.update(self.get_agents_by_role(role, handshake_completed=handshake_completed))

    log.debug(f"[find_agents] Found these agents: {self._found_agents}")
    if engage:
        self._engaged_agents = copy.deepcopy(self._found_agents)
    return len(self._found_agents) > 0

next_pref_stream async

next_pref_stream() -> bool

Moves the internal pointer to the next stream in the list of preferred streams, which is often used for playlist-like operations. It wraps around to the beginning if it reaches the end (async).

Returns:

Type Description
bool

True if the move is successful, False if the list is empty.

Source code in unaiverse/agent.py
@action
async def next_pref_stream(self) -> bool:
    """Moves the internal pointer to the next stream in the list of preferred streams, which is often used for
    playlist-like operations. It wraps around to the beginning if it reaches the end (async).

    Returns:
        True if the move is successful, False if the list is empty.
    """
    if len(self._preferred_streams) == 0:
        log.error(f"Cannot move to the next stream because the list of preferred streams is empty")
        return False

    self._cur_preferred_stream = (self._cur_preferred_stream + 1) % len(self._preferred_streams)
    suffix = ", warning: restarted" if self._cur_preferred_stream == 0 else ""
    log.misc(
        f"Moving to the next preferred stream ({self._preferred_streams[self._cur_preferred_stream]}){suffix}")
    return True

first_pref_stream async

first_pref_stream() -> bool

Resets the internal pointer to the first stream in the list of preferred streams. This is useful for restarting a playback or processing loop (async).

Returns:

Type Description
bool

True if the move is successful, False if the list is empty.

Source code in unaiverse/agent.py
@action
async def first_pref_stream(self) -> bool:
    """Resets the internal pointer to the first stream in the list of preferred streams. This is useful for
    restarting a playback or processing loop (async).

    Returns:
        True if the move is successful, False if the list is empty.
    """
    if len(self._preferred_streams) == 0:
        log.error(f"Cannot move to the first stream because the list of preferred streams is empty")
        return False

    self._cur_preferred_stream = 0
    log.misc(f"Moving to the first preferred stream ({self._preferred_streams[self._cur_preferred_stream]})")
    return True

check_pref_stream async

check_pref_stream(what: str = 'last') -> bool

Checks the position of the current preferred stream within the list. It can check if it's the first, last, or if it has completed a full round, among other checks (async).

Parameters:

Name Type Description Default
what str

A string specifying the type of check to perform (e.g., 'first', 'last', 'last_round').

'last'

Returns:

Type Description
bool

True if the condition is met, False otherwise.

Source code in unaiverse/agent.py
@action
async def check_pref_stream(self, what: str = "last") -> bool:
    """Checks the position of the current preferred stream within the list. It can check if it's the first, last,
    or if it has completed a full round, among other checks (async).

    Args:
        what: A string specifying the type of check to perform (e.g., 'first', 'last', 'last_round').

    Returns:
        True if the condition is met, False otherwise.
    """
    valid = ['first', 'last', 'not_first', 'not_last', 'last_round', 'not_last_round', 'last_song', 'not_last_song']
    assert what in valid, f"The what argument can only be one of {valid}"

    log.misc(f"Checking if the current preferred playlist item "
             f"(id: {self._cur_preferred_stream}) is the '{what}' one")
    if what == "first":
        return self._cur_preferred_stream == 0
    elif what == "last":
        return self._cur_preferred_stream == len(self._preferred_streams) - 1
    elif what == "not_first":
        return self._cur_preferred_stream != 0
    elif what == "not_last":
        return self._cur_preferred_stream != len(self._preferred_streams) - 1
    elif what == "last_round":
        return (self._cur_preferred_stream + len(self._preferred_streams) // self._repeat >=
                len(self._preferred_streams))
    elif what == "not_last_round":
        return (self._cur_preferred_stream + len(self._preferred_streams) // self._repeat <
                len(self._preferred_streams))
    elif what == "last_song":
        num_streams_in_playlist = len(self._preferred_streams) // self._repeat
        return (self._cur_preferred_stream + 1) % num_streams_in_playlist == 0
    elif what == "not_last_song":
        num_streams_in_playlist = len(self._preferred_streams) // self._repeat
        return (self._cur_preferred_stream + 1) % num_streams_in_playlist != 0

set_pref_streams async

set_pref_streams(net_hashes: list[str], repeat: int = 1) -> bool

Fills the agent's list of preferred streams (a playlist). It can repeat the playlist a specified number of times and resolves user-provided stream hashes to their full network hashes (async).

Parameters:

Name Type Description Default
net_hashes list[str]

A list of stream hashes to add to the playlist.

required
repeat int

The number of times to repeat the playlist.

1

Returns:

Type Description
bool

Always True.

Source code in unaiverse/agent.py
@action
async def set_pref_streams(self, net_hashes: list[str], repeat: int = 1) -> bool:
    """Fills the agent's list of preferred streams (a playlist). It can repeat the playlist a specified number of
    times and resolves user-provided stream hashes to their full network hashes (async).

    Args:
        net_hashes: A list of stream hashes to add to the playlist.
        repeat: The number of times to repeat the playlist.

    Returns:
        Always True.
    """
    log.misc(f"Setting up a list of {len(net_hashes)} preferred streams")
    self._cur_preferred_stream = 0
    self._preferred_streams = []
    self._repeat = repeat
    for i in range(0, self._repeat):
        for net_hash in net_hashes:

            # We are tolerating both peer_id:name_or_group and also peer_id::ps:name_or_group
            components = net_hash.split(":")
            peer_id = components[0]
            name_or_group = components[-1]
            net_hash_to_streams = self.find_streams(peer_id=peer_id, name_or_group=name_or_group)
            for _net_hash in net_hash_to_streams.keys():
                self._preferred_streams.append(_net_hash)

    return True

evaluate async

evaluate(stream_hash: str, how: str, steps: int = 100, re_offset: bool = False, agents_to_evaluate: str | list[str] | None = None) -> bool

Evaluates the performance of agents that have completed a generation task. It compares the generated data from each agent with a local stream (which can be a ground truth or reference stream) using a specified comparison method (async).

Parameters:

Name Type Description Default
stream_hash str

The hash of the local stream to use for comparison.

required
how str

The name of the comparison method to use.

required
steps int

The number of steps to perform the evaluation.

100
re_offset bool

A boolean to indicate whether to re-offset the streams.

False
agents_to_evaluate str | list[str] | None

peer ID or list of peer IDs of agents that returned streams that were buffered, and that we are going to evaluate (default is None, meaning the agents in self._agents_who_completed_what_they_were_asked are considered).

None

Returns:

Type Description
bool

True if the evaluation is successful, False otherwise.

Source code in unaiverse/agent.py
@action
async def evaluate(self, stream_hash: str, how: str, steps: int = 100, re_offset: bool = False,
                   agents_to_evaluate: str | list[str] | None = None) -> bool:
    """Evaluates the performance of agents that have completed a generation task. It compares the generated data
    from each agent with a local stream (which can be a ground truth or reference stream) using a specified
    comparison method (async).

    Args:
        stream_hash: The hash of the local stream to use for comparison.
        how: The name of the comparison method to use.
        steps: The number of steps to perform the evaluation.
        re_offset: A boolean to indicate whether to re-offset the streams.
        agents_to_evaluate: peer ID or list of peer IDs of agents that returned streams that were buffered, and
            that we are going to evaluate (default is None, meaning the agents in
            self._agents_who_completed_what_they_were_asked are considered).

    Returns:
        True if the evaluation is successful, False otherwise.
    """
    if not self.buffer_generated_by_others:
        log.error("Cannot evaluate if not buffering data generated by others")
        return False

    if stream_hash == "<playlist>":
        net_hash = self._preferred_streams[self._cur_preferred_stream]
    else:
        net_hash = self.user_stream_hash_to_net_hash(stream_hash)

    # Agents to evaluate
    if agents_to_evaluate is None:
        agents_to_evaluate = self._engaged_agents
    else:
        if isinstance(agents_to_evaluate, str):
            agents_to_evaluate = [agents_to_evaluate]

    self._eval_results = {}
    log.debug(f"[evaluate] Agents to evaluate: {agents_to_evaluate}")
    for peer_id in agents_to_evaluate:
        if peer_id not in self.last_buffered_peer_id_to_info:
            log.error(f"Missing buffered stream for {peer_id}, cannot evaluate it (skipping)")
            continue
        received_net_hash = self.last_buffered_peer_id_to_info[peer_id]["net_hash"]
        log.misc(f"Comparing {net_hash} with {received_net_hash}")
        eval_result, ret = self.__compare_streams(net_hash_a=net_hash,
                                                  net_hash_b=received_net_hash,
                                                  how=how, steps=steps, re_offset=re_offset)
        log.misc(f"Result of the comparison: {eval_result}")
        if not ret:
            return False
        else:
            peer_id = DataProps.peer_id_from_net_hash(received_net_hash)
            self._eval_results[peer_id] = eval_result

    return True

compare_eval async

compare_eval(cmp: str, thres: float, good_if_true: bool = True) -> bool

Compares the results of a previous evaluation to a given threshold or finds the best result among all agents. It can check for minimum, maximum, or simple threshold-based comparisons, and it populates a list of 'valid' agents that passed the comparison (async).

Parameters:

Name Type Description Default
cmp str

The comparison operator (e.g., '<', '>', 'min').

required
thres float

The threshold value for comparison.

required
good_if_true bool

A boolean to invert the pass/fail logic.

True

Returns:

Type Description
bool

True if at least one agent passed the comparison, False otherwise.

Source code in unaiverse/agent.py
@action
async def compare_eval(self, cmp: str, thres: float, good_if_true: bool = True) -> bool:
    """Compares the results of a previous evaluation to a given threshold or finds the best result among all
    agents. It can check for minimum, maximum, or simple threshold-based comparisons, and it populates a list of
    'valid' agents that passed the comparison (async).

    Args:
        cmp: The comparison operator (e.g., '<', '>', 'min').
        thres: The threshold value for comparison.
        good_if_true: A boolean to invert the pass/fail logic.

    Returns:
        True if at least one agent passed the comparison, False otherwise.
    """
    assert cmp in ["<", ">", ">=", "<=", "min", "max"], f"Invalid comparison operator: {cmp}"
    assert thres >= 0. or cmp in ["min", "max"], f"Invalid evaluation threshold: {thres} (it must be in >= 0.)"

    self._valid_cmp_agents = set()
    msgs = []
    best_so_far = -1

    min_or_max = None
    leq_or_geq = None
    if cmp in ["min", "max"]:
        min_or_max = "minimum" if cmp == "min" else "maximum"
        leq_or_geq = "<=" if cmp == "min" else ">="

    if len(self._eval_results) == 0:
        log.user("No results to evaluate!")

    for agent_peer_id, eval_result in self._eval_results.items():
        if cmp not in ["min", "max"]:
            log.misc(f"Checking if result {eval_result} {cmp} {thres}, for agent {agent_peer_id}")
        else:
            if thres >= 0:
                log.misc(f"Checking if result {eval_result} is the {min_or_max} so far, "
                         f"only if {leq_or_geq} {thres}, for agent {agent_peer_id}")
            else:
                log.misc(
                    f"Checking if result {eval_result} is the {min_or_max} so far, for agent {agent_peer_id}")

        if eval_result < 0.:
            log.user(f"Invalid evaluation result: {eval_result}")
            return False

        owner_account = self.all_agents[agent_peer_id].get_static_profile()['email']
        agent_name = self.all_agents[agent_peer_id].get_static_profile()['node_name']

        if cmp != "min" and cmp != "max":
            outcome = False
            if cmp == "<" and eval_result < thres:
                outcome = True
            elif cmp == "<=" and eval_result <= thres:
                outcome = True
            elif cmp == ">" and eval_result > thres:
                outcome = True
            elif cmp == ">=" and eval_result >= thres:
                outcome = True

            if cmp[0] == "<" or cmp[0] == "<=":
                alias = 'error level' if good_if_true else 'mark'
            else:
                alias = 'mark' if good_if_true else 'error level'

            if good_if_true:
                if outcome:
                    msgs.append(f"Agent {owner_account}/{agent_name} passed with {alias} {eval_result}/{thres}")
                    self._valid_cmp_agents.add(agent_peer_id)
                else:
                    msgs.append(f"Agent {owner_account}/{agent_name} did not pass, {alias} {eval_result}/{thres}")
            else:
                if outcome:
                    msgs.append(f"Agent {owner_account}/{agent_name} did not pass, {alias} {eval_result}/{thres}")
                else:
                    msgs.append(f"Agent {owner_account}/{agent_name} passed with {alias} {eval_result}/{thres}")
                    self._valid_cmp_agents.add(agent_peer_id)
        else:
            if ((cmp == "min" and (thres < 0 or eval_result <= thres) and
                 (eval_result < best_so_far or best_so_far < 0)) or
                    (cmp == "max" and (thres < 0 or eval_result >= thres) and
                     (eval_result > best_so_far or best_so_far < 0))):
                best_so_far = eval_result
                self._valid_cmp_agents = {agent_peer_id}
                msgs = [f"The best agent is {owner_account}/{agent_name}"]
            else:
                msgs = [f"No best agent found for the considered threshold ({thres})"]

    for msg in msgs:
        log.user(msg)

    if len(self._valid_cmp_agents) == 0:

        # # cheating (hack):
        # self._valid_cmp_agents.append(agent_peer_id)
        # log.misc(", ".join(msgs))
        # return True
        return False
    else:
        return True

resolve_agent_ref

resolve_agent_ref(ref: str) -> str | None

Resolve an agent name or peer_id to a peer_id.

If ref is already a known peer_id, it is returned as-is. Otherwise, the method searches all known agents by node_name.

Parameters:

Name Type Description Default
ref str

A peer_id or agent name.

required

Returns:

Type Description
str | None

The peer_id if found, None otherwise.

Source code in unaiverse/agent.py
def resolve_agent_ref(self, ref: str) -> str | None:
    """Resolve an agent name or peer_id to a peer_id.

    If ``ref`` is already a known peer_id, it is returned as-is.
    Otherwise, the method searches all known agents by node_name.

    Args:
        ref: A peer_id or agent name.

    Returns:
        The peer_id if found, None otherwise.
    """

    # If already a valid peer_id
    if ref in self.all_agents:
        return ref

    # Search by email@node_name (if only node_name is provided, assumes it is one of your agents)
    if '@' not in ref:
        your_email = self.get_profile().get_static_profile()['email']
        ref = your_email + '@' + ref

    for peer_id, profile in self.all_agents.items():
        static_profile = profile.get_static_profile() if hasattr(profile, 'get_static_profile') else {}
        if isinstance(static_profile, dict):
            if (static_profile.get('email', '') + '@' + static_profile.get('node_name')) == ref:
                return peer_id

    return None

resolve_stream_ref

resolve_stream_ref(ref: str, public: bool) -> str | None

Resolve a stream name to a stream user or net hash (heuristic, gives priority to owned streams).

Parameters:

Name Type Description Default
ref str

A stream name, without peer IDs, just the name.

required
public bool

A boolean to inform the resolve procedure whether it must look for streams among public agents (True) or world-related agents (False).

required

Returns:

Type Description
str | None

The stream user hash or net hash if found, None otherwise.

Source code in unaiverse/agent.py
def resolve_stream_ref(self, ref: str, public: bool) -> str | None:
    """Resolve a stream name to a stream user or net hash (heuristic, gives priority to owned streams).

    Args:
        ref: A stream name, without peer IDs, just the name.
        public: A boolean to inform the resolve procedure whether it must look for streams among public agents
            (True) or world-related agents (False).

    Returns:
        The stream user hash or net hash if found, None otherwise.
    """

    # If already a valid user hash
    if ref in self.known_streams_by_user_hash:
        return ref

    # If already a valid net hash
    if ref in self.known_streams:
        return ref

    if ref == "<playlist>":
        return self._preferred_streams[self._cur_preferred_stream]

    # If it is formatted as a net hash, but was not found, then nothing to do
    if Stream.is_net_hash(ref):
        return None

    # By default, we will try to resolve by looking at owned (first) and other (then) streams
    search_owned_streams = True
    search_other_streams = True

    # If it looks like a user hash but was not found in the code above, then it could be a stream group
    # represented as a user hash, so we clear it, and we keep the group name only. Moreover, since it will also
    # contain a peer ID, we immediately know where to look
    if Stream.is_user_hash(ref):
        peer_id = DataProps.peer_id_from_user_hash(ref)
        ref = DataProps.name_from_user_hash(ref)
        search_owned_streams = peer_id == self.get_peer_id()
        search_other_streams = not search_owned_streams

    # Search your own streams first (priority)
    if search_owned_streams:
        for user_hash, stream_obj in self.owned_streams_by_user_hash.items():
            if public and not stream_obj.is_public():
                continue
            if ref == Stream.name_from_user_hash(user_hash):
                return user_hash
        for net_hash, stream_dict in self.owned_streams.items():
            if public and not next(iter(stream_dict.values())).is_public():
                continue
            if ref == Stream.name_or_group_from_net_hash(net_hash):
                return net_hash

    # Search all streams then
    if search_other_streams:
        for user_hash, stream_obj in self.known_streams_by_user_hash.items():
            if public and not stream_obj.is_public():
                continue
            if ref == Stream.name_from_user_hash(user_hash):
                return user_hash
        for net_hash, stream_dict in self.known_streams.items():
            if public and not next(iter(stream_dict.values())).is_public():
                continue
            if ref == Stream.name_or_group_from_net_hash(net_hash):
                return net_hash

    return None

action

action(func: Callable) -> Callable

Decorator that marks and wraps a coroutine as a UNaIVERSE action method.

Applying @action to an async method on an Agent subclass does three things automatically at call time:

  1. Agent reference resolution - any keyword argument whose name appears in Custom.AGENT_ARG_NAMES is resolved from a human-readable name or "email@node_name" notation to the underlying private peer ID via resolve_agent_ref. Lists and tuples of references are resolved element by element.
  2. Stream reference resolution - any keyword argument whose name appears in Custom.STREAM_ARG_NAMES is resolved from a user-friendly hash or stream name to the canonical net hash via resolve_stream_ref. Public vs. world streams are selected automatically based on whether the agent is currently behaving in a world.
  3. Exception isolation - if the wrapped coroutine raises any exception, the error is logged and False is returned. The HSM will see a failed action without crashing the agent.

Methods listed in Custom.SPECIAL_ACTION_NAMES (such as send) bypass steps 1 and 2 because they perform their own argument resolution internally.

After the decorator is applied, the resulting callable has _is_action = True set on it, which allows the framework to discover all action methods through introspection.

Parameters:

Name Type Description Default
func Callable

The async action method to wrap. It must be a coroutine function defined on an Agent subclass.

required

Returns:

Type Description
Callable

An async wrapper with the same signature and __name__ as func,

Callable

enriched with the resolution and isolation logic described above.

Examples:

>>> from unaiverse.agent import Agent, action
>>> class MyAgent(Agent):
...     @action
...     async def greet(self, agent: str, message: str) -> bool:
...         # "agent" is resolved automatically from name to peer ID
...         print(f"Hello {agent}: {message}")
...         return True
Source code in unaiverse/agent.py
def action(func: Callable) -> Callable:
    """Decorator that marks and wraps a coroutine as a UNaIVERSE action method.

    Applying ``@action`` to an async method on an ``Agent`` subclass does three
    things automatically at call time:

    1. Agent reference resolution - any keyword argument whose name appears in
       ``Custom.AGENT_ARG_NAMES`` is resolved from a human-readable name or
       ``"email@node_name"`` notation to the underlying private peer ID via
       ``resolve_agent_ref``. Lists and tuples of references are resolved element
       by element.
    2. Stream reference resolution - any keyword argument whose name appears in
       ``Custom.STREAM_ARG_NAMES`` is resolved from a user-friendly hash or stream
       name to the canonical net hash via ``resolve_stream_ref``. Public vs. world
       streams are selected automatically based on whether the agent is currently
       behaving in a world.
    3. Exception isolation - if the wrapped coroutine raises any exception, the
       error is logged and ``False`` is returned. The HSM will see a failed action
       without crashing the agent.

    Methods listed in ``Custom.SPECIAL_ACTION_NAMES`` (such as ``send``) bypass
    steps 1 and 2 because they perform their own argument resolution internally.

    After the decorator is applied, the resulting callable has
    ``_is_action = True`` set on it, which allows the framework to discover all
    action methods through introspection.

    Args:
        func: The async action method to wrap. It must be a coroutine function
            defined on an ``Agent`` subclass.

    Returns:
        An async wrapper with the same signature and ``__name__`` as ``func``,
        enriched with the resolution and isolation logic described above.

    Examples:
        >>> from unaiverse.agent import Agent, action
        >>> class MyAgent(Agent):
        ...     @action
        ...     async def greet(self, agent: str, message: str) -> bool:
        ...         # "agent" is resolved automatically from name to peer ID
        ...         print(f"Hello {agent}: {message}")
        ...         return True
    """

    @functools.wraps(func)
    async def wrapper(self, *args, **kwargs):
        self: Agent
        resolved_kwargs = {}

        if func.__name__ not in Custom.SPECIAL_ACTION_NAMES:  # Special actions (like 'send') resolve by themselves
            for key, value in kwargs.items():

                if key in Custom.AGENT_ARG_NAMES:
                    if isinstance(value, str):
                        resolved = self.resolve_agent_ref(value)
                        resolved_kwargs[key] = resolved if resolved is not None else value
                    elif isinstance(value, list | tuple):
                        _values = list(value)  # Shallow
                        for i, _value in enumerate(_values):
                            _resolved = self.resolve_agent_ref(_value)
                            if _resolved is not None:
                                _values[i] = _resolved
                        resolved_kwargs[key] = _values
                    else:
                        resolved_kwargs[key] = value

                elif key in Custom.STREAM_ARG_NAMES:
                    public = not self.behaving_in_world()
                    if isinstance(value, str):
                        resolved = self.resolve_stream_ref(value, public)
                        resolved_kwargs[key] = resolved if resolved is not None else value
                    elif isinstance(value, (list, tuple)):
                        _values = list(value)  # Shallow
                        for i, _value in enumerate(_values):
                            _resolved = self.resolve_stream_ref(_value, public)
                            if _resolved is not None:
                                _values[i] = _resolved
                        resolved_kwargs[key] = _values
                    else:
                        resolved_kwargs[key] = value

                else:
                    resolved_kwargs[key] = value
        else:
            resolved_kwargs = kwargs

        # Calling the action
        try:
            ret = await func(self, *args, **resolved_kwargs)
            if ret is None:
                log.critical(f"Invalid action '{func.__name__}' detected, it must return True/False "
                             "(it is tolerated to return non-None stuff and interpreted as True, "
                             "but not if it returns None, as happens here)")
        except Exception as e:
            log.error(f"Action '{func.__name__}' failed since it raised an exception: {e}")  # No-Debug
            # import traceback  # Debug
            # log.error(f"{traceback.format_exc()}")  # Debug
            ret = False
        return ret

    wrapper._is_action = True
    return wrapper