
unaiverse.agent_basics

What this module does

Foundational AgentBasics class managing an agent's streams, roles, behaviors, processors, policies and profile state that the higher-level Agent builds upon.

agent_basics

A Collectionless AI Project (https://collectionless.ai)
Registration/Login: https://unaiverse.io
Code Repositories: https://github.com/collectionlessai/
Main Developers: Stefano Melacci (Project Leader), Christian Di Maio, Tommaso Guidi

AgentBasics

AgentBasics(proc: ModuleWrapper | Module | None, proc_inputs: list[StreamType | str] | None = None, proc_outputs: list[StreamType | str] | None = None, proc_opts: dict | None = None, behav: HybridStateMachine | None = None, behav_lone_wolf: HybridStateMachine | str = 'serve', merge_flat_stream_labels: bool = False, buffer_generated: bool = False, buffer_generated_by_others: str = 'none', world_folder: str | None = None, policy_filter: Callable | None = None, policy_filter_lone_wolf: Callable | None = None)

Base class that encapsulates agent identity, stream management, and peer bookkeeping.

AgentBasics holds everything needed to describe an agent or a world within the UNaIVERSE network: the processor configuration, all known and owned streams, the catalogue of connected peers (all_agents, world_masters, world_agents, human_agents, artificial_agents), role constants and lookup tables, and the interaction manager used to coordinate multi-step dialogues.

Neither action selection nor execution logic lives here; both are provided by the Agent subclass. World also inherits from this class, reusing the peer-tracking machinery while overriding stream and role assignment behaviour.

AgentBasics is not usually instantiated directly; use Agent or World instead.

Attributes:

Name Type Description
ROLE_PUBLIC

Integer bitmask for the public role (value 0).

ROLE_WORLD_MASTER

Integer bitmask for the world-master role (value 3).

ROLE_WORLD_AGENT

Integer bitmask for the world-agent role (value 1).

CUSTOM_ROLES

Class-level list of custom role name strings defined by the world.

ROLE_BITS_TO_STR

Mapping from integer role bitmasks to human-readable role strings.

ROLE_STR_TO_BITS

Mapping from human-readable role strings to integer bitmasks.

BADGE_TYPES

Set of valid badge type strings ("completed", "attended", "intermediate", "pro").

HUMAN

Constant string "human" used to tag human agent nodes.

all_agents dict[str, NodeProfile]

Mapping from peer ID to NodeProfile for every connected peer.

world_agents dict[str, NodeProfile]

Mapping from peer ID to NodeProfile for world-agent peers.

world_masters dict[str, NodeProfile]

Mapping from peer ID to NodeProfile for world-master peers.

human_agents dict[str, NodeProfile]

Mapping from peer ID to NodeProfile for human peers.

artificial_agents dict[str, NodeProfile]

Mapping from peer ID to NodeProfile for artificial peers.

im

The InteractionManager that tracks live interactions for this agent.

stdin

StreamsProxyWithDefaults bound to processor input streams.

stdout

StreamProxy bound to processor output streams.

stdtar

StreamProxy bound to target streams.

stdext

StreamProxy bound to environmental streams.

stats Stats | None

Optional Stats recorder; None unless explicitly set.

Examples:

AgentBasics is normally used through its Agent subclass:

>>> from unaiverse.agent import Agent
>>> import torch
>>> agent = Agent(proc=torch.nn.Linear(10, 2))
>>> print(agent.ROLE_BITS_TO_STR[Agent.ROLE_WORLD_AGENT])
world_agent

Initialize an agent or world with its processor, streams, and behavioral state machines.

This constructor wires together the processor, the two behavioral HSMs (world-scoped behav and public-network behav_lone_wolf), the stream bookkeeping dictionaries, the peer membership catalogues, and the InteractionManager. When the concrete class defines a process method (i.e., it is an Agent rather than a World), a default empty HSM is created if behav is None, and the lone-wolf HSM is loaded from the built-in public.json template when behav_lone_wolf is the string "serve" or "ask".

If the class is a World (no process method present), world_folder is mandatory and is_world is set to True.
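The agent-versus-world distinction described above rests on a plain `hasattr` check. The following is a minimal sketch of that pattern only; `Base`, `ToyAgent` and `ToyWorld` are hypothetical stand-ins rather than the real `AgentBasics`, `Agent` and `World`, and `ValueError` replaces the library's `GenException` so the snippet runs without unaiverse installed.

```python
class Base:
    """Toy stand-in for AgentBasics (hypothetical, not the real class)."""

    def __init__(self, world_folder=None):
        # A subclass that defines `process` is treated as an agent,
        # any other subclass is treated as a world.
        if hasattr(self, "process"):
            self.is_world = False
        else:
            self.is_world = True
            if world_folder is None:
                raise ValueError("No world folder was indicated (world_folder argument)")
        self.world_folder = world_folder


class ToyAgent(Base):
    def process(self, x):
        return x


class ToyWorld(Base):
    pass


agent = ToyAgent()
world = ToyWorld(world_folder="./my_world")
print(agent.is_world, world.is_world)  # False True
```

One consequence of this design is that the check depends on the attribute name alone: any subclass that happens to define something called `process` is classified as an agent.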

Parameters:

Name Type Description Default
proc ModuleWrapper | Module | None

The processing module for the agent (e.g., a torch.nn.Module or a ModuleWrapper). Pass None for a world or for a no-processor agent.

required
proc_inputs list[StreamType | str] | None

List of StreamType objects describing the expected processor inputs. If None, the inputs are inferred automatically by AgentProcessorChecker. Defaults to None.

None
proc_outputs list[StreamType | str] | None

List of StreamType objects describing the expected processor outputs. If None, the outputs are inferred automatically by AgentProcessorChecker. Defaults to None.

None
proc_opts dict | None

Dictionary of extra options forwarded to the processor (e.g., loss functions under the "losses" key). Defaults to None.

None
behav HybridStateMachine | None

The HybridStateMachine describing the agent's behavior inside a world. If None, a default empty HSM is created automatically (agent only). Defaults to None.

None
behav_lone_wolf HybridStateMachine | str

The HybridStateMachine describing the agent's behavior on the public network, or one of the strings "serve" or "ask" to load a pre-designed template HSM. Defaults to "serve".

'serve'
merge_flat_stream_labels bool

If True, flat descriptor labels from all owned streams are merged into a shared superset so every stream uses the same label space. Defaults to False.

False
buffer_generated bool

If True, processor output streams are created as BufferedStream objects. Defaults to False.

False
buffer_generated_by_others str

Controls buffering of streams produced by peer agents. "one" buffers per-peer, "all" buffers across all peers, "none" disables buffering. Defaults to "none".

'none'
world_folder str | None

Path to the folder that holds the world's .json role-behaviour files and agent.py. Required when the instance is a World; ignored otherwise. Defaults to None.

None
policy_filter Callable | None

A callable (or name of an Agent method) that overrides the action selected by the HSM policy when the agent is inside a world. Defaults to None.

None
policy_filter_lone_wolf Callable | None

Same as policy_filter, but applied to the public-network HSM. Defaults to None.

None
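The `merge_flat_stream_labels` description speaks of merging per-stream labels into a shared superset. As a rough illustration of that idea only, and not of the library's actual implementation, the sketch below builds an order-preserving union of label lists. The helper `merge_labels`, the stream names and the labels are all hypothetical.

```python
def merge_labels(per_stream_labels):
    """Return the ordered union of all label lists (hypothetical helper)."""
    merged = []
    seen = set()
    for labels in per_stream_labels:
        for label in labels:
            if label not in seen:
                seen.add(label)
                merged.append(label)
    return merged


# Made-up streams, each with its own flat label list.
streams = {"cam": ["cat", "dog"], "mic": ["dog", "bird"]}
superset = merge_labels(streams.values())
print(superset)  # ['cat', 'dog', 'bird']
```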

Raises:

Type Description
GenException

If buffer_generated_by_others is not one of "one", "all", or "none".

GenException

If behav is provided but is not a HybridStateMachine instance.

GenException

If the instance is a World and world_folder is None.

ValueError

If behav_lone_wolf is a string other than "serve" or "ask".
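Two of the checks listed above can be reproduced in isolation, which may help when validating configuration before building an agent. This is a sketch under assumptions: `GenException` is redefined locally as a plain `Exception` subclass, and `check_constructor_args` is a hypothetical helper that does not exist in unaiverse. In the real constructor the two checks run at different points, and the second applies only to agents.

```python
class GenException(Exception):
    """Local stand-in for the library's GenException (assumed to subclass Exception)."""


def check_constructor_args(buffer_generated_by_others="none", behav_lone_wolf="serve"):
    """Hypothetical helper mirroring two of the constructor's argument checks."""
    if buffer_generated_by_others not in {"one", "all", "none"}:
        raise GenException("Param buffer_generated_by_others can be set to "
                           "'one', 'all', or 'none' only.")
    if isinstance(behav_lone_wolf, str) and behav_lone_wolf not in ("serve", "ask"):
        raise ValueError("Invalid behav_lone_wolf: expected 'serve' or 'ask'")
    return True


try:
    check_constructor_args(buffer_generated_by_others="some")
except GenException as exc:
    print("rejected:", exc)
```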

Examples:

Create a minimal agent with a PyTorch linear layer:

>>> import torch
>>> from unaiverse.agent import Agent
>>> agent = Agent(proc=torch.nn.Linear(10, 2))

Create a world-only instance (no processor):

>>> from unaiverse.world import World
>>> world = World(world_folder="./my_world")

Source code in unaiverse/agent_basics.py
def __init__(self,
             proc: ModuleWrapper | torch.nn.Module | None,
             proc_inputs: list[StreamType | str] | None = None,
             proc_outputs: list[StreamType | str] | None = None,
             proc_opts: dict | None = None,
             behav: HybridStateMachine | None = None,
             behav_lone_wolf: HybridStateMachine | str = "serve",
             merge_flat_stream_labels: bool = False,
             buffer_generated: bool = False,
             buffer_generated_by_others: str = "none",
             world_folder: str | None = None,
             policy_filter: Callable | None = None,
             policy_filter_lone_wolf: Callable | None = None):
    """Initialize an agent or world with its processor, streams, and behavioral state machines.

    This constructor wires together the processor, the two behavioral HSMs (world-scoped
    ``behav`` and public-network ``behav_lone_wolf``), the stream bookkeeping dictionaries,
    the peer membership catalogues, and the ``InteractionManager``. When the concrete class
    defines a ``process`` method (i.e., it is an ``Agent`` rather than a ``World``), a default
    empty HSM is created if ``behav`` is ``None``, and the lone-wolf HSM is loaded from the
    built-in ``public.json`` template when ``behav_lone_wolf`` is the string ``"serve"`` or
    ``"ask"``.

    If the class is a ``World`` (no ``process`` method present), ``world_folder`` is mandatory
    and ``is_world`` is set to ``True``.

    Args:
        proc: The processing module for the agent (e.g., a ``torch.nn.Module`` or a
            ``ModuleWrapper``). Pass ``None`` for a world or for a no-processor agent.
        proc_inputs: List of ``StreamType`` objects describing the expected processor inputs.
            If ``None``, the inputs are inferred automatically by ``AgentProcessorChecker``.
            Defaults to ``None``.
        proc_outputs: List of ``StreamType`` objects describing the expected processor outputs.
            If ``None``, the outputs are inferred automatically by ``AgentProcessorChecker``.
            Defaults to ``None``.
        proc_opts: Dictionary of extra options forwarded to the processor (e.g., loss functions
            under the ``"losses"`` key). Defaults to ``None``.
        behav: The ``HybridStateMachine`` describing the agent's behavior inside a world.
            If ``None``, a default empty HSM is created automatically (agent only).
            Defaults to ``None``.
        behav_lone_wolf: The ``HybridStateMachine`` describing the agent's behavior on the
            public network, or one of the strings ``"serve"`` or ``"ask"`` to load a
            pre-designed template HSM. Defaults to ``"serve"``.
        merge_flat_stream_labels: If ``True``, flat descriptor labels from all owned streams
            are merged into a shared superset so every stream uses the same label space.
            Defaults to ``False``.
        buffer_generated: If ``True``, processor output streams are created as
            ``BufferedStream`` objects. Defaults to ``False``.
        buffer_generated_by_others: Controls buffering of streams produced by peer agents.
            ``"one"`` buffers per-peer, ``"all"`` buffers across all peers, ``"none"``
            disables buffering. Defaults to ``"none"``.
        world_folder: Path to the folder that holds the world's ``.json`` role-behaviour
            files and ``agent.py``. Required when the instance is a ``World``; ignored
            otherwise. Defaults to ``None``.
        policy_filter: A callable (or name of an ``Agent`` method) that overrides the
            action selected by the HSM policy when the agent is inside a world.
            Defaults to ``None``.
        policy_filter_lone_wolf: Same as ``policy_filter``, but applied to the
            public-network HSM. Defaults to ``None``.

    Raises:
        GenException: If ``buffer_generated_by_others`` is not one of ``"one"``, ``"all"``,
            or ``"none"``.
        GenException: If ``behav`` is provided but is not a ``HybridStateMachine`` instance.
        GenException: If the instance is a ``World`` and ``world_folder`` is ``None``.
        ValueError: If ``behav_lone_wolf`` is a string other than ``"serve"`` or ``"ask"``.

    Examples:
        Create a minimal agent with a PyTorch linear layer:

        >>> import torch
        >>> from unaiverse.agent import Agent
        >>> agent = Agent(proc=torch.nn.Linear(10, 2))

        Create a world-only instance (no processor):

        >>> from unaiverse.world import World
        >>> world = World(world_folder="./my_world")
    """

    # Agent-related features
    self.behav = behav  # HSM that describes the agent behavior in the private/world net
    self.behav_lone_wolf = behav_lone_wolf  # HSM that describes the agent behavior in the public net
    self.behav_wildcards = {}
    self.proc = proc
    self.proc_updated_since_last_save = False
    self.proc_inputs = proc_inputs
    self.proc_outputs = proc_outputs
    self.proc_opts = proc_opts
    self.proc_last_inputs = None
    self.proc_last_outputs = None
    self.proc_human_peer_id_to_interaction = {}
    self.proc_optional_inputs: list[dict] | None = None  # It must be None (do not put an empty list)
    self.proc_net_hash: dict[str, None | str] = {'public': None, 'private': None}
    self.proc_in_net_hash: dict[str, None | str] = {'public': None, 'private': None}
    self.merge_flat_stream_labels = merge_flat_stream_labels
    self.buffer_generated = buffer_generated
    self.buffer_generated_by_others = buffer_generated_by_others
    self.world_folder = world_folder
    self.policy_filter = policy_filter
    self.policy_filter_opts = {}
    self.policy_filter_lone_wolf = policy_filter_lone_wolf
    self.policy_filter_lone_wolf_opts = {}
    self.clock = clock  # Backward compatibility

    if self.buffer_generated_by_others not in {"one", "all", "none"}:
        raise GenException("Param buffer_generated_by_others can be set to 'one', 'all', or 'none' only.")

    # Streams
    self.known_streams = {}  # All streams that are known to this agent, organized by net hash
    self.owned_streams = {}  # The streams that are generated/offered by this agent, organized by net hash
    self.env_streams = {}  # The owned streams that come from environmental sources (e.g., a camera), by net hash
    self.proc_streams = {}  # The owned streams that are generated by the agent's processor, organized by net hash
    self.proc_in_streams = {}  # The owned streams that are input to the agent's processor, organized by net hash
    self.known_streams_by_user_hash = {}  # From peer_id:name to stream object
    self.owned_streams_by_user_hash = {}  # From stream name to stream object
    self.env_streams_by_user_hash = {}  # From peer_id:name to stream object
    self.proc_streams_by_user_hash = {}  # From peer_id:name to stream object
    self.proc_in_streams_by_user_hash = {}  # From peer_id:name to stream object
    self.compat_in_streams = []  # Streams compatible with the processor input (dynamically set)
    self.compat_out_streams = []  # Streams compatible with the processor output (dynamically set)

    # Agents, world masters, expected world masters
    self.all_agents: dict[str, NodeProfile] = {}  # ID -> profile (all types of agent)
    self.public_agents: dict[str, NodeProfile] = {}  # ID -> profile of agents talking in public manner
    self.world_agents: dict[str, NodeProfile] = {}  # ID -> profile of all agents living in this world
    self.world_masters: dict[str, NodeProfile] = {}  # ID -> profile of all master-agents living in this world
    self.human_agents: dict[str, NodeProfile] = {}  # ID -> profile (human agents)
    self.artificial_agents: dict[str, NodeProfile] = {}  # ID -> profile (artificial agent)
    self.world_profile = None
    self.is_world = False  # If this instance is about a world: it will be discovered at creation time
    self.last_sent_interaction = None

    # World specific attributes (they are only used if this agent is actually a world)
    self.packed_agent_files: str = ""
    self.role_to_behav = {}
    self.agent_badges: dict[str, list[dict]] = {}  # Peer_id -> collected badges for other agents
    self.role_changed_by_world: bool = False
    self.received_address_update: bool = False

    # Internal properties about the way streams are used
    self.last_buffered_peer_id_to_info = {}  # If buffering was turned on
    self.last_ref_uuid = None
    self.overridden_action_step = None
    self.locked_set_proc_input = False

    # Stats
    self.stats: Stats | None = None
    self.agent_stats_code = None

    # Information inherited from the node that hosts this agent
    self._node_name = "unk"
    self._node_conn = None
    self._node_profile = None
    self._node_out_fcn = print
    self._node_ask_to_get_in_touch_fcn = None
    self._node_purge_fcn = None
    self._node_agents_waiting = None
    self._node_identity_dir = ''

    # Utilities
    self._proc_in_streams_by_user_hash_pub = {}
    self._proc_streams_by_user_hash_pub = {}
    self._env_streams_by_user_hash_pub = {}
    self._proc_in_streams_by_user_hash_prv = {}
    self._proc_streams_by_user_hash_prv = {}
    self._env_streams_by_user_hash_prv = {}

    # Checking and filling (guessing) missing processor-related info (proc_inputs and proc_outputs)
    # and allocating a dummy processor if it was not specified (if None)
    AgentProcessorChecker(self)

    # Checking HSMs
    if not (self.behav is None or isinstance(self.behav, HybridStateMachine)):
        raise GenException("Invalid behavior: it must be either None or a HybridStateMachine")

    # The stream_hash of compatible streams for each data_props are stored in a set
    self.compat_in_streams = [set() for _ in range(len(self.proc_inputs))] \
        if self.proc_inputs is not None else None
    self.compat_out_streams = [set() for _ in range(len(self.proc_outputs))] \
        if self.proc_outputs is not None else None

    # Interaction Manager, stdin/stdout
    self.im = InteractionManager(self, max_interactions=Custom.MAX_INTERACTIONS)
    self.stdin = StreamsProxyWithDefaults(default_values=[item["default_value"] if item["has_default"] else None
                                                          for item in self.proc_optional_inputs])
    self.stdtar = StreamProxy()
    self.stdext = StreamProxy()
    self.stdout = StreamProxy()

    # Loading default public HSM
    if hasattr(self, "process"):  # Trick to tell an Agent from a World (both are subclasses of this class)
        self.is_world = False

        # Setting an empty HSM as default if not provided (private/world)
        if self.behav is None:
            self.behav = HybridStateMachine(self, policy=self.policy_default)
            self.behav.add_state("empty")

        if self.behav_lone_wolf is not None and isinstance(self.behav_lone_wolf, str):
            template_string = self.behav_lone_wolf
            if template_string == "serve":
                json_to_load = "public.json"
            elif template_string == "ask":
                json_to_load = "public.json"
            else:
                raise ValueError("Invalid behav_lone_wolf: it must be an HybridStateMachine or a string "
                                 "in ('serve', 'ask')")

            # Safe way to load a file packed in a pip package
            self.behav_lone_wolf = HybridStateMachine(self, policy=self.policy_default)
            utils_path = importlib.resources.files("unaiverse.utils")
            json_file = utils_path.joinpath(json_to_load)
            file = json_file.open()
            self.behav_lone_wolf.load(file)
            file.close()
            self.set_policy_filter(self.policy_filter_lone_wolf, public=True)
    else:
        self.is_world = True
        if self.world_folder is None:
            raise GenException("No world folder was indicated (world_folder argument)")
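The template-loading step in the constructor reads a file shipped inside an installed package through `importlib.resources`. The sketch below shows the same access pattern with a context manager, assuming Python 3.9 or later. The standard-library `json` package and its `__init__.py` are used purely as stand-ins, because they are always installed; the constructor targets `unaiverse.utils` and `public.json` instead.

```python
import importlib.resources

# `json` is a placeholder package chosen only because it is always available.
resource = importlib.resources.files("json").joinpath("__init__.py")

# The context manager closes the handle even if reading raises.
with resource.open("r", encoding="utf-8") as handle:
    first_line = handle.readline()

print(resource.name)  # __init__.py
```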

ROLE_PUBLIC class-attribute instance-attribute

ROLE_PUBLIC = 0 << 0

ROLE_WORLD_MASTER class-attribute instance-attribute

ROLE_WORLD_MASTER = 1 << 0 | 1 << 1

ROLE_WORLD_AGENT class-attribute instance-attribute

ROLE_WORLD_AGENT = 1 << 0 | 0 << 1

CUSTOM_ROLES class-attribute instance-attribute

CUSTOM_ROLES = []

ROLE_BITS_TO_STR class-attribute instance-attribute

ROLE_BITS_TO_STR = {ROLE_PUBLIC: 'public_agent', ROLE_WORLD_MASTER: 'world_master', ROLE_WORLD_AGENT: 'world_agent'}

ROLE_STR_TO_BITS class-attribute instance-attribute

ROLE_STR_TO_BITS = {'public_agent': ROLE_PUBLIC, 'world_master': ROLE_WORLD_MASTER, 'world_agent': ROLE_WORLD_AGENT}
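The role constants above can be evaluated on their own to confirm the documented values (0, 3 and 1). The sketch restates them outside the class, so nothing is imported from unaiverse. The helper `in_world` is hypothetical: reading bit 0 as "belongs to a world" is an assumption drawn from the bit patterns, not a documented rule.

```python
# Shifts bind tighter than `|`, so each expression is (a << b) | (c << d).
ROLE_PUBLIC = 0 << 0
ROLE_WORLD_MASTER = 1 << 0 | 1 << 1
ROLE_WORLD_AGENT = 1 << 0 | 0 << 1

ROLE_BITS_TO_STR = {
    ROLE_PUBLIC: "public_agent",
    ROLE_WORLD_MASTER: "world_master",
    ROLE_WORLD_AGENT: "world_agent",
}
# The reverse table is derived here; the class spells it out explicitly.
ROLE_STR_TO_BITS = {name: bits for bits, name in ROLE_BITS_TO_STR.items()}


def in_world(bits):
    """Hypothetical helper: treats bit 0 as the 'inside a world' flag (assumption)."""
    return bool(bits & 1)


print(ROLE_PUBLIC, ROLE_WORLD_MASTER, ROLE_WORLD_AGENT)  # 0 3 1
print(ROLE_STR_TO_BITS["world_master"])  # 3
```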

BADGE_TYPES class-attribute instance-attribute

BADGE_TYPES = {'completed', 'attended', 'intermediate', 'pro'}

HUMAN class-attribute instance-attribute

HUMAN = 'human'

behav instance-attribute

behav = behav

behav_lone_wolf instance-attribute

behav_lone_wolf = behav_lone_wolf

behav_wildcards instance-attribute

behav_wildcards = {}

proc instance-attribute

proc = proc

proc_updated_since_last_save instance-attribute

proc_updated_since_last_save = False

proc_inputs instance-attribute

proc_inputs = proc_inputs

proc_outputs instance-attribute

proc_outputs = proc_outputs

proc_opts instance-attribute

proc_opts = proc_opts

proc_last_inputs instance-attribute

proc_last_inputs = None

proc_last_outputs instance-attribute

proc_last_outputs = None

proc_human_peer_id_to_interaction instance-attribute

proc_human_peer_id_to_interaction = {}

proc_optional_inputs instance-attribute

proc_optional_inputs: list[dict] | None = None

proc_net_hash instance-attribute

proc_net_hash: dict[str, None | str] = {'public': None, 'private': None}

proc_in_net_hash instance-attribute

proc_in_net_hash: dict[str, None | str] = {'public': None, 'private': None}

merge_flat_stream_labels instance-attribute

merge_flat_stream_labels = merge_flat_stream_labels

buffer_generated instance-attribute

buffer_generated = buffer_generated

buffer_generated_by_others instance-attribute

buffer_generated_by_others = buffer_generated_by_others

world_folder instance-attribute

world_folder = world_folder

policy_filter instance-attribute

policy_filter = policy_filter

policy_filter_opts instance-attribute

policy_filter_opts = {}

policy_filter_lone_wolf instance-attribute

policy_filter_lone_wolf = policy_filter_lone_wolf

policy_filter_lone_wolf_opts instance-attribute

policy_filter_lone_wolf_opts = {}

clock instance-attribute

clock = clock

known_streams instance-attribute

known_streams = {}

owned_streams instance-attribute

owned_streams = {}

env_streams instance-attribute

env_streams = {}

proc_streams instance-attribute

proc_streams = {}

proc_in_streams instance-attribute

proc_in_streams = {}

known_streams_by_user_hash instance-attribute

known_streams_by_user_hash = {}

owned_streams_by_user_hash instance-attribute

owned_streams_by_user_hash = {}

env_streams_by_user_hash instance-attribute

env_streams_by_user_hash = {}

proc_streams_by_user_hash instance-attribute

proc_streams_by_user_hash = {}

proc_in_streams_by_user_hash instance-attribute

proc_in_streams_by_user_hash = {}

all_agents instance-attribute

all_agents: dict[str, NodeProfile] = {}

public_agents instance-attribute

public_agents: dict[str, NodeProfile] = {}

world_agents instance-attribute

world_agents: dict[str, NodeProfile] = {}

world_masters instance-attribute

world_masters: dict[str, NodeProfile] = {}

human_agents instance-attribute

human_agents: dict[str, NodeProfile] = {}

artificial_agents instance-attribute

artificial_agents: dict[str, NodeProfile] = {}

world_profile instance-attribute

world_profile = None

is_world instance-attribute

is_world = False

last_sent_interaction instance-attribute

last_sent_interaction = None

packed_agent_files instance-attribute

packed_agent_files: str = ''

role_to_behav instance-attribute

role_to_behav = {}

agent_badges instance-attribute

agent_badges: dict[str, list[dict]] = {}

role_changed_by_world instance-attribute

role_changed_by_world: bool = False

received_address_update instance-attribute

received_address_update: bool = False

last_buffered_peer_id_to_info instance-attribute

last_buffered_peer_id_to_info = {}

last_ref_uuid instance-attribute

last_ref_uuid = None

overridden_action_step instance-attribute

overridden_action_step = None

locked_set_proc_input instance-attribute

locked_set_proc_input = False

stats instance-attribute

stats: Stats | None = None

agent_stats_code instance-attribute

agent_stats_code = None

compat_in_streams instance-attribute

compat_in_streams = [set() for _ in range(len(proc_inputs))] if proc_inputs is not None else None

compat_out_streams instance-attribute

compat_out_streams = [set() for _ in range(len(proc_outputs))] if proc_outputs is not None else None
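The two `compat_*_streams` attributes hold one independent set per processor input or output. The sketch below shows why a comprehension is the right construction here, compared with list multiplication, which would alias a single set. The entries in `proc_inputs` and the stream identifier are made-up placeholders.

```python
proc_inputs = ["image", "text"]  # placeholder descriptors, not real StreamType objects

# One fresh set per input slot, or None when the inputs are unknown.
compat_in_streams = [set() for _ in range(len(proc_inputs))] \
    if proc_inputs is not None else None
compat_in_streams[0].add("peerA:camera")
print(compat_in_streams)  # [{'peerA:camera'}, set()]

# Pitfall: list multiplication repeats the SAME set object in every slot.
shared = [set()] * 2
shared[0].add("x")
print(shared)  # [{'x'}, {'x'}]
```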

im instance-attribute

im = InteractionManager(self, max_interactions=Custom.MAX_INTERACTIONS)

stdin instance-attribute

stdin = StreamsProxyWithDefaults(default_values=[item['default_value'] if item['has_default'] else None for item in proc_optional_inputs])
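The list passed as `default_values` is derived from `proc_optional_inputs`, a list of dictionaries with `has_default` and `default_value` keys. The sketch below evaluates that comprehension on made-up entries. It assumes `proc_optional_inputs` has already been filled in, presumably by `AgentProcessorChecker`, since iterating over `None` would raise `TypeError`.

```python
# Made-up entries following the key names used in the constructor.
proc_optional_inputs = [
    {"has_default": True, "default_value": 0.5},
    {"has_default": False},
    {"has_default": True, "default_value": None},
]

# "default_value" is only looked up when "has_default" is true.
default_values = [item["default_value"] if item["has_default"] else None
                  for item in proc_optional_inputs]
print(default_values)  # [0.5, None, None]
```

In the resulting list, an input with no default and an input whose default is `None` look the same.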

stdtar instance-attribute

stdtar = StreamProxy()

stdext instance-attribute

stdext = StreamProxy()

stdout instance-attribute

stdout = StreamProxy()

set_node_info

set_node_info(conn: NodeConn, profile: NodeProfile, ask_to_get_in_touch_fcn: Callable, purge_fcn: Callable, node_identity_dir: str, agents_waiting: dict) -> bool

Bind node-level resources to this agent after the hosting node is ready.

This method is called by the framework once the underlying network node has been fully initialised and a peer ID is available. It stores references to the connection pool, the node profile, and the various node callbacks, then resolves any placeholder peer IDs ("<public_peer_id>", "<private_peer_id>") that were embedded in stream objects registered before the node was ready. If the processor wraps a HumanModule, the node's static profile is updated to mark the node as a human node.
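The placeholder clean-up described above comes down to two small operations on nested dictionaries: spotting keys that start with a placeholder peer ID, and deleting an entry together with its parent key once the parent is empty. The following is a minimal sketch of those two operations. The helpers `is_placeholder` and `drop_stream` are hypothetical, and the `peer::suffix` key layout is an assumption made for illustration.

```python
PLACEHOLDERS = ("<public_peer_id>", "<private_peer_id>")


def is_placeholder(net_hash):
    """True when the key starts with one of the placeholder peer IDs."""
    return net_hash.startswith(PLACEHOLDERS)  # str.startswith accepts a tuple


def drop_stream(registry, net_hash, name):
    """Delete registry[net_hash][name], then the group itself once it is empty."""
    group = registry.get(net_hash)
    if group is not None and name in group:
        del group[name]
        if not group:
            del registry[net_hash]


# Made-up registry: net hash -> {stream name -> stream object}.
known = {
    "<public_peer_id>::ps": {"out": object()},
    "12D3abc::ps": {"out": object()},
}

# Iterate over snapshots, because entries are deleted along the way.
for net_hash in list(known):
    if is_placeholder(net_hash):
        for name in list(known[net_hash]):
            drop_stream(known, net_hash, name)

print(list(known))  # ['12D3abc::ps']
```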

For world nodes, this method additionally loads and refactors all role-behaviour JSON files from world_folder (via load_and_refactor_action_file_and_behav_files), calls create_behav_files to let subclasses generate JSON files dynamically, invokes augment_roles to build the full role lookup tables, and loads any custom stats.py found in the world folder.

After all role/stream setup is complete, processor input and output streams are created (create_proc_input_streams, create_proc_output_streams), the node profile is updated with stream metadata (update_streams_in_profile), and stdin is disabled for human agents.

Parameters:

Name Type Description Default
conn NodeConn

The NodeConn connection pool manager provided by the hosting node.

required
profile NodeProfile

The NodeProfile of the hosting node.

required
ask_to_get_in_touch_fcn Callable

Callback invoked to request a rendezvous with another peer.

required
purge_fcn Callable

Callback invoked to disconnect and remove a peer from the node.

required
node_identity_dir str

Path to the folder storing the node's identity files.

required
agents_waiting dict

Dictionary of peers that connected but have not yet been evaluated for addition to all_agents.

required

Returns:

Type Description
bool

True upon successful setup.

Raises:

Type Description
GenException

If this is a world node and world_folder is None.

GenException

If no .json role files are found in world_folder.

GenException

If the role Python files and JSON files are inconsistent.

GenException

If any behaviour file cannot be loaded or saved.

GenException

If stats.py exists in the world folder but cannot be read.

Examples:

set_node_info is called internally by the UNaIVERSE node framework and is not normally invoked by user code. The following sketch shows the calling pattern:

>>> # Inside the node's initialisation routine:
>>> ok = agent.set_node_info(conn, profile, ask_fcn, purge_fcn, identity_dir, {})
>>> assert ok

Source code in unaiverse/agent_basics.py
def set_node_info(self, conn: NodeConn, profile: NodeProfile,
                  ask_to_get_in_touch_fcn: Callable, purge_fcn: Callable, node_identity_dir: str,
                  agents_waiting: dict) -> bool:
    """Bind node-level resources to this agent after the hosting node is ready.

    This method is called by the framework once the underlying network node has been
    fully initialised and a peer ID is available. It stores references to the
    connection pool, the node profile, and the various node callbacks, then resolves
    any placeholder peer IDs (``"<public_peer_id>"``, ``"<private_peer_id>"``) that
    were embedded in stream objects registered before the node was ready. If the
    processor wraps a ``HumanModule``, the node's static profile is updated to mark
    the node as a human node.

    For world nodes, this method additionally loads and refactors all role-behaviour
    JSON files from ``world_folder`` (via ``load_and_refactor_action_file_and_behav_files``),
    calls ``create_behav_files`` to let subclasses generate JSON files dynamically,
    invokes ``augment_roles`` to build the full role lookup tables, and loads any
    custom ``stats.py`` found in the world folder.

    After all role/stream setup is complete, processor input and output streams are
    created (``create_proc_input_streams``, ``create_proc_output_streams``), the node
    profile is updated with stream metadata (``update_streams_in_profile``), and stdin
    is disabled for human agents.

    Args:
        conn: The ``NodeConn`` connection pool manager provided by the hosting node.
        profile: The ``NodeProfile`` of the hosting node.
        ask_to_get_in_touch_fcn: Callback invoked to request a rendezvous with another
            peer.
        purge_fcn: Callback invoked to disconnect and remove a peer from the node.
        node_identity_dir: Path to the folder storing the node's identity files.
        agents_waiting: Dictionary of peers that connected but have not yet been
            evaluated for addition to ``all_agents``.

    Returns:
        ``True`` upon successful setup.

    Raises:
        GenException: If this is a world node and ``world_folder`` is ``None``.
        GenException: If no ``.json`` role files are found in ``world_folder``.
        GenException: If the role Python files and JSON files are inconsistent.
        GenException: If any behaviour file cannot be loaded or saved.
        GenException: If ``stats.py`` exists in the world folder but cannot be read.

    Examples:
        ``set_node_info`` is called internally by the UNaIVERSE node framework and is
        not normally invoked by user code. The following sketch shows the calling
        pattern:

        >>> # Inside the node's initialisation routine:
        >>> ok = agent.set_node_info(conn, profile, ask_fcn, purge_fcn, identity_dir, {})
        >>> assert ok
    """

    # Getting basic references
    self._node_conn = conn
    self._node_profile = profile
    self._node_name = profile.get_static_profile()['node_name']
    self._node_ask_to_get_in_touch_fcn = ask_to_get_in_touch_fcn
    self._node_purge_fcn = purge_fcn
    self._node_identity_dir = node_identity_dir
    self._node_agents_waiting = agents_waiting

    # Checking if human, marking the node
    if self.proc is not None and isinstance(self.proc.module, HumanModule):
        self._node_profile.get_static_profile()["node_type"] = self.HUMAN

    # Adding peer_id information into the already existing stream data (if any)
    # (initially marked with generic wildcards like <public_peer_id>, ...)
    net_hashes = list(self.known_streams.keys())
    for net_hash in net_hashes:
        if net_hash.startswith("<public_peer_id>") or net_hash.startswith("<private_peer_id>"):
            stream_dict = self.known_streams[net_hash]
            for stream_obj in stream_dict.values():
                self.add_stream(stream_obj, owned=True)  # This will also re-add streams using the node clock

    # Removing place-holder streams
    for peer_id in ["<public_peer_id>", "<private_peer_id>"]:
        to_remove = []
        for net_hash in self.known_streams.keys():
            if DataProps.peer_id_from_net_hash(net_hash) == peer_id:
                for _name, _stream in self.known_streams[net_hash].items():
                    to_remove.append((net_hash, _name))

        # Removing
        for (net_hash, name) in to_remove:
            user_hash = DataProps.user_hash_from_net_hash(net_hash, name)
            del self.known_streams[net_hash][name]
            if len(self.known_streams[net_hash]) == 0:
                del self.known_streams[net_hash]
            if user_hash in self.known_streams_by_user_hash:
                del self.known_streams_by_user_hash[user_hash]

            # Removing all the owned streams (environment and processor streams are of course "owned")
            if net_hash in self.owned_streams:
                if name in self.owned_streams[net_hash]:
                    del self.owned_streams[net_hash][name]
                    if len(self.owned_streams[net_hash]) == 0:
                        del self.owned_streams[net_hash]
            if user_hash in self.owned_streams_by_user_hash:
                del self.owned_streams_by_user_hash[user_hash]
            if net_hash in self.env_streams:
                if name in self.env_streams[net_hash]:
                    del self.env_streams[net_hash][name]
                    if len(self.env_streams[net_hash]) == 0:
                        del self.env_streams[net_hash]
            if user_hash in self.env_streams_by_user_hash:
                del self.env_streams_by_user_hash[user_hash]
            if net_hash in self.proc_streams:
                if name in self.proc_streams[net_hash]:
                    del self.proc_streams[net_hash][name]
                    if len(self.proc_streams[net_hash]) == 0:
                        del self.proc_streams[net_hash]
            if user_hash in self.proc_streams_by_user_hash:
                del self.proc_streams_by_user_hash[user_hash]
            if net_hash in self.proc_in_streams:
                if name in self.proc_in_streams[net_hash]:
                    del self.proc_in_streams[net_hash][name]
                    if len(self.proc_in_streams[net_hash]) == 0:
                        del self.proc_in_streams[net_hash]
            if user_hash in self.proc_in_streams_by_user_hash:
                del self.proc_in_streams_by_user_hash[user_hash]
            log.misc(f"Successfully removed known stream with network hash {net_hash}, stream name: {name}")

    # World only: loading action files and refactoring (or building) JSON files of the different roles.
    # This is where the world guesses roles.
    if self.is_world:
        assert self.world_folder is not None

        # Check role-JSON files in the world folder
        role_json_tracker = FileTracker(self.world_folder, ext=".json")

        # This usually does nothing, but if you like to dynamically create JSON files, overload this method
        self.create_behav_files()

        # Loading and refactoring roles and behaviors
        self.load_and_refactor_action_file_and_behav_files(force_save=role_json_tracker.something_changed())

        # Building combination of default roles (considering public, world_agent, world_master default roles), and
        # agent/world specific roles
        self.augment_roles()

        # Loading the custom Stats code
        if self.world_folder is not None:
            stats_file = os.path.join(self.world_folder, 'stats.py')
            if os.path.exists(stats_file):
                log.misc(f"Found custom stats.py at {stats_file}")
                try:
                    with open(stats_file, 'r', encoding='utf-8') as file:
                        self.agent_stats_code = file.read()
                except Exception as e:
                    raise GenException(f'Error while reading/loading the stats.py file: {stats_file} [{e}]')

    # Creating streams associated to the processor input (right now we assume there is no need to buffer them)
    self.create_proc_input_streams(buffered=False)

    # Creating streams associated to the processor output
    self.create_proc_output_streams(buffered=self.buffer_generated)

    # Updating node profile by indicating the processor-related streams
    self.update_streams_in_profile()

    # Blocking stdin for humans
    if has_human_processor(self):
        self.set_default_stdin_binding(public=True)
        self.stdin.disable()
        self.set_default_stdin_binding(public=False)
        self.stdin.disable()

    return True
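
The placeholder-removal step above repeats the same "delete the entry, then prune the empty group" pattern for each stream table. A minimal sketch of that pattern as a standalone helper follows. The helper name `drop_stream` and the example keys are hypothetical and are not part of `unaiverse`; plain dictionaries stand in for the real stream tables.

```python
def drop_stream(nested: dict, flat: dict, net_hash: str, name: str, user_hash: str) -> None:
    """Remove one stream from a nested table and from its flat, user-hash-indexed twin.

    Hypothetical helper: `nested` maps net_hash -> {name: stream} and
    `flat` maps user_hash -> stream, mirroring the table pairs used above.
    """
    group = nested.get(net_hash)
    if group is not None and name in group:
        del group[name]
        if not group:  # prune the group once its last stream is gone
            del nested[net_hash]
    flat.pop(user_hash, None)  # no error if the key is already absent


# Illustrative keys only: the real net-hash and user-hash formats are not shown here
owned = {"<public_peer_id>/processor": {"out": "stream-object"}}
owned_by_user_hash = {"<public_peer_id>:out": "stream-object"}

drop_stream(owned, owned_by_user_hash, "<public_peer_id>/processor", "out", "<public_peer_id>:out")
drop_stream(owned, owned_by_user_hash, "<public_peer_id>/processor", "out", "<public_peer_id>:out")  # idempotent
```

Calling the helper twice is harmless, which matches the guarded `if ... in ...` checks used in the listing.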

get_connection_pool_manager

get_connection_pool_manager() -> NodeConn

Return the connection pool manager bound to this agent's hosting node.

The NodeConn object provides low-level access to peer connections, role assignment, rendezvous management, and publish-subscribe channels. It is set by set_node_info and is None before that call.

Returns:

Type Description
NodeConn

The NodeConn instance associated with the hosting node, or None if set_node_info has not yet been called.

Source code in unaiverse/agent_basics.py
def get_connection_pool_manager(self) -> NodeConn:
    """Return the connection pool manager bound to this agent's hosting node.

    The ``NodeConn`` object provides low-level access to peer connections, role
    assignment, rendezvous management, and publish-subscribe channels. It is set
    by ``set_node_info`` and is ``None`` before that call.

    Returns:
        The ``NodeConn`` instance associated with the hosting node, or ``None`` if
        ``set_node_info`` has not yet been called.
    """
    return self._node_conn

set_default_stdin_binding

set_default_stdin_binding(public: bool | None = None) -> None

Bind the stdin stream proxy to the default processor input streams.

Selects the correct set of processor input streams (public or private) and calls stdin.bind with the system-level interaction UUID so that the proxy always points at the agent's own streams for the current network context.

When public is None, the choice is made automatically based on behaving_in_world: private streams are used inside a world, public streams are used on the public network.

Parameters:

Name Type Description Default
public bool | None

If True, bind to the public processor-input streams. If False, bind to the private (world) processor-input streams. If None, the network context is detected automatically. Defaults to None.

None
Source code in unaiverse/agent_basics.py
def set_default_stdin_binding(self, public: bool | None = None) -> None:
    """Bind the ``stdin`` stream proxy to the default processor input streams.

    Selects the correct set of processor input streams (public or private) and
    calls ``stdin.bind`` with the system-level interaction UUID so that the proxy
    always points at the agent's own streams for the current network context.

    When ``public`` is ``None``, the choice is made automatically based on
    ``behaving_in_world``: private streams are used inside a world, public streams
    are used on the public network.

    Args:
        public: If ``True``, bind to the public processor-input streams. If ``False``,
            bind to the private (world) processor-input streams. If ``None``, the
            network context is detected automatically. Defaults to ``None``.
    """

    if (public is not None and not public) or (public is None and self.behaving_in_world()):
        self.stdin.bind(self._proc_in_streams_by_user_hash_prv, uuid=Custom.SYSTEM_INTERACTION_UUID)

    if (public is not None and public) or (public is None and not self.behaving_in_world()):
        self.stdin.bind(self._proc_in_streams_by_user_hash_pub, uuid=Custom.SYSTEM_INTERACTION_UUID)
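
The two conditions in the listing encode a three-state flag: `True`, `False`, or `None` for automatic detection. The sketch below restates that selection as a single function so the truth table is easy to check. The name `resolve_use_public` is hypothetical, and `behaving_in_world` is passed in as a plain boolean rather than called on an agent.

```python
from typing import Optional


def resolve_use_public(public: Optional[bool], behaving_in_world: bool) -> bool:
    """Return True when the public stream set should be used, False for the private one."""
    if public is None:
        # Automatic mode: inside a world use private streams, otherwise public ones
        return not behaving_in_world
    return bool(public)


# Enumerate every combination of the flag and the network context
table = {
    (flag, in_world): resolve_use_public(flag, in_world)
    for flag in (True, False, None)
    for in_world in (True, False)
}
for (flag, in_world), use_public in table.items():
    print(f"public={flag!s:5} in_world={in_world!s:5} -> {'public' if use_public else 'private'}")
```

An explicit `True` or `False` always wins over the detected context; only `None` consults it.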

set_default_stream_binding

set_default_stream_binding(public: bool | None = None) -> None

Bind all four standard stream proxies to the default streams for the current context.

Updates stdin, stdtar, stdext, and stdout in a single call, selecting public or private stream dictionaries based on public. When public is None, the context is determined automatically via behaving_in_world.

stdtar is always bound to an empty dictionary (target streams are not available by default and must be explicitly set by the agent).

Parameters:

Name Type Description Default
public bool | None

If True, bind all proxies to the public stream dictionaries. If False, bind to the private (world) dictionaries. If None, the network context is detected automatically. Defaults to None.

None
Source code in unaiverse/agent_basics.py
def set_default_stream_binding(self, public: bool | None = None) -> None:
    """Bind all four standard stream proxies to the default streams for the current context.

    Updates ``stdin``, ``stdtar``, ``stdext``, and ``stdout`` in a single call,
    selecting public or private stream dictionaries based on ``public``. When
    ``public`` is ``None``, the context is determined automatically via
    ``behaving_in_world``.

    ``stdtar`` is always bound to an empty dictionary (target streams are not
    available by default and must be explicitly set by the agent).

    Args:
        public: If ``True``, bind all proxies to the public stream dictionaries. If
            ``False``, bind to the private (world) dictionaries. If ``None``, the
            network context is detected automatically. Defaults to ``None``.
    """

    if (public is not None and not public) or (public is None and self.behaving_in_world()):
        self.stdin.bind(self._proc_in_streams_by_user_hash_prv, uuid=Custom.SYSTEM_INTERACTION_UUID)
        self.stdtar.bind({}, uuid=Custom.SYSTEM_INTERACTION_UUID)
        self.stdext.bind(self._env_streams_by_user_hash_prv, uuid=Custom.SYSTEM_INTERACTION_UUID)
        self.stdout.bind(self._proc_streams_by_user_hash_prv, uuid=Custom.SYSTEM_INTERACTION_UUID)

    if (public is not None and public) or (public is None and not self.behaving_in_world()):
        self.stdin.bind(self._proc_in_streams_by_user_hash_pub, uuid=Custom.SYSTEM_INTERACTION_UUID)
        self.stdtar.bind({}, uuid=Custom.SYSTEM_INTERACTION_UUID)
        self.stdext.bind(self._env_streams_by_user_hash_pub, uuid=Custom.SYSTEM_INTERACTION_UUID)
        self.stdout.bind(self._proc_streams_by_user_hash_pub, uuid=Custom.SYSTEM_INTERACTION_UUID)

get_default_stdin_bindings

get_default_stdin_bindings(public: bool | None = None) -> dict

Return the stream dictionary that is bound by default to stdin.

The returned dictionary maps user-hash strings ("peer_id:name") to Stream objects and corresponds to the agent's own processor input streams for the requested network context.

Parameters:

Name Type Description Default
public bool | None

If True, return the public processor-input stream dictionary. If False, return the private (world) processor-input stream dictionary. If None, the context is determined automatically via behaving_in_world. Defaults to None.

None

Returns:

Type Description
dict

A dictionary mapping user-hash strings to Stream objects, or an empty dictionary if neither branch is matched.

Source code in unaiverse/agent_basics.py
def get_default_stdin_bindings(self, public: bool | None = None) -> dict:
    """Return the stream dictionary that is bound by default to ``stdin``.

    The returned dictionary maps user-hash strings (``"peer_id:name"``) to
    ``Stream`` objects and corresponds to the agent's own processor input streams
    for the requested network context.

    Args:
        public: If ``True``, return the public processor-input stream dictionary.
            If ``False``, return the private (world) processor-input stream dictionary.
            If ``None``, the context is determined automatically via ``behaving_in_world``.
            Defaults to ``None``.

    Returns:
        A dictionary mapping user-hash strings to ``Stream`` objects, or an empty
        dictionary if neither branch is matched.
    """
    if (public is not None and not public) or (public is None and self.behaving_in_world()):
        return self._proc_in_streams_by_user_hash_prv

    if (public is not None and public) or (public is None and not self.behaving_in_world()):
        return self._proc_in_streams_by_user_hash_pub
    return {}

get_default_stream_bindings

get_default_stream_bindings(public: bool | None = None) -> tuple[dict, dict, dict, dict]

Return the four stream dictionaries that are bound by default to the standard proxies.

Each returned dictionary maps user-hash strings to Stream objects. The four elements correspond to stdin, stdtar, stdext, and stdout respectively. stdtar is always an empty dictionary.

Parameters:

Name Type Description Default
public bool | None

If True, return the public stream dictionaries. If False, return the private (world) stream dictionaries. If None, the context is determined automatically via behaving_in_world. Defaults to None.

None

Returns:

Type Description
tuple[dict, dict, dict, dict]

A four-tuple (stdin_streams, stdtar_streams, stdext_streams, stdout_streams) where each element is a dictionary mapping user-hash strings to Stream objects. All four dictionaries are empty if neither branch is matched.

Source code in unaiverse/agent_basics.py
def get_default_stream_bindings(self, public: bool | None = None) -> tuple[dict, dict, dict, dict]:
    """Return the four stream dictionaries that are bound by default to the standard proxies.

    Each returned dictionary maps user-hash strings to ``Stream`` objects. The four
    elements correspond to ``stdin``, ``stdtar``, ``stdext``, and ``stdout``
    respectively. ``stdtar`` is always an empty dictionary.

    Args:
        public: If ``True``, return the public stream dictionaries. If ``False``,
            return the private (world) stream dictionaries. If ``None``, the context
            is determined automatically via ``behaving_in_world``. Defaults to ``None``.

    Returns:
        A four-tuple ``(stdin_streams, stdtar_streams, stdext_streams, stdout_streams)``
        where each element is a dictionary mapping user-hash strings to ``Stream``
        objects. All four dictionaries are empty if neither branch is matched.
    """
    if (public is not None and not public) or (public is None and self.behaving_in_world()):
        return (self._proc_in_streams_by_user_hash_prv, {},
                self._env_streams_by_user_hash_prv, self._proc_streams_by_user_hash_prv)

    if (public is not None and public) or (public is None and not self.behaving_in_world()):
        return (self._proc_in_streams_by_user_hash_pub, {},
                self._env_streams_by_user_hash_pub, self._proc_streams_by_user_hash_pub)
    return {}, {}, {}, {}
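
As the listing shows, the method returns the agent's internal dictionaries themselves rather than copies, while the `stdtar` element is a new empty dictionary on every call. The sketch below illustrates what that implies for callers. `BindingsSketch` and its attribute names are stand-ins invented for this example; only the tuple order is taken from the documentation above.

```python
class BindingsSketch:
    """Stand-in object that mimics the shape of the returned four-tuple."""

    def __init__(self):
        self._proc_in_pub = {"peer:in": "stream-in"}
        self._env_pub = {}
        self._proc_pub = {"peer:out": "stream-out"}

    def get_default_stream_bindings(self):
        # Order: stdin, stdtar, stdext, stdout (stdtar is always a fresh empty dict)
        return self._proc_in_pub, {}, self._env_pub, self._proc_pub


sketch = BindingsSketch()
stdin_streams, stdtar_streams, stdext_streams, stdout_streams = sketch.get_default_stream_bindings()

# The stdin element is the very same object held by the owner, not a copy
stdin_is_live = stdin_streams is sketch._proc_in_pub

# Mutating the returned stdtar dict does not leak into later calls
stdtar_streams["x"] = "y"
_, fresh_stdtar, _, _ = sketch.get_default_stream_bindings()

# Take a shallow copy when an independent snapshot is needed
snapshot = dict(stdin_streams)
```

Under this reading, code that only needs to inspect the current streams should copy the dictionaries before modifying them.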

get_proc_output_net_hash

get_proc_output_net_hash(public: bool = True) -> str | None

Return the network hash of the processor output streams for the requested context.

The network hash is set by create_proc_output_streams and encodes the owning peer ID, the stream group ("processor"), and the transport type. It is None if no output streams have been created yet.

Parameters:

Name Type Description Default
public bool

If True, return the hash for the public peer's output streams. If False, return the hash for the private (world) peer's output streams. Defaults to True.

True

Returns:

Type Description
str | None

The network hash string, or None if the output streams have not yet been created for the requested context.

Source code in unaiverse/agent_basics.py
def get_proc_output_net_hash(self, public: bool = True) -> str | None:
    """Return the network hash of the processor output streams for the requested context.

    The network hash is set by ``create_proc_output_streams`` and encodes the
    owning peer ID, the stream group (``"processor"``), and the transport type.
    It is ``None`` if no output streams have been created yet.

    Args:
        public: If ``True``, return the hash for the public peer's output streams.
            If ``False``, return the hash for the private (world) peer's output streams.
            Defaults to ``True``.

    Returns:
        The network hash string, or ``None`` if the output streams have not yet been
        created for the requested context.
    """
    return self.proc_net_hash['public'] if public else self.proc_net_hash['private']

get_proc_input_net_hash

get_proc_input_net_hash(public: bool = True) -> str | None

Return the network hash of the processor input streams for the requested context.

The network hash is set by create_proc_input_streams and encodes the owning peer ID, the stream group ("processor_in"), and the transport type. It is None if no input streams have been created yet.

Parameters:

Name Type Description Default
public bool

If True, return the hash for the public peer's input streams. If False, return the hash for the private (world) peer's input streams. Defaults to True.

True

Returns:

Type Description
str | None

The network hash string, or None if the input streams have not yet been created for the requested context.

Source code in unaiverse/agent_basics.py
def get_proc_input_net_hash(self, public: bool = True) -> str | None:
    """Return the network hash of the processor input streams for the requested context.

    The network hash is set by ``create_proc_input_streams`` and encodes the
    owning peer ID, the stream group (``"processor_in"``), and the transport type.
    It is ``None`` if no input streams have been created yet.

    Args:
        public: If ``True``, return the hash for the public peer's input streams.
            If ``False``, return the hash for the private (world) peer's input streams.
            Defaults to ``True``.

    Returns:
        The network hash string, or ``None`` if the input streams have not yet been
        created for the requested context.
    """
    return self.proc_in_net_hash['public'] if public else self.proc_in_net_hash['private']
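
Both network-hash getters can return `None`, so callers need a guard before using the result. A minimal sketch of that guard follows, using a plain dictionary in place of the agent attribute. The initial `None` values follow the description above, and the hash string is a made-up placeholder, not a real network hash.

```python
from typing import Optional

# Stand-in for the agent's proc_net_hash table, keyed by network context
proc_net_hash = {"public": None, "private": None}


def get_proc_output_net_hash(public: bool = True) -> Optional[str]:
    """Same lookup as in the listing, applied to the stand-in table."""
    return proc_net_hash["public"] if public else proc_net_hash["private"]


def describe(public: bool) -> str:
    """Guard against the not-yet-created case before using the hash."""
    net_hash = get_proc_output_net_hash(public)
    if net_hash is None:
        return "output streams not created yet"
    return f"output streams published under {net_hash}"


before = describe(True)
proc_net_hash["public"] = "hypothetical-net-hash"  # placeholder value for illustration
after = describe(True)
print(before)
print(after)
```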

prepare_stdin_if_human

prepare_stdin_if_human(public: bool, peer_id: str)

Prepare the stdin binding for a human-processor agent before a processing step.

Resets the stdin proxy to the default binding for the given network context, then cleans up any completed interactions from proc_human_peer_id_to_interaction. Returns the interaction UUID that should be used to read from stdin: if an active (not completed) interaction exists for the effective peer, its UUID is returned so that the ongoing dialogue is resumed; otherwise the system-level UUID (Custom.SYSTEM_INTERACTION_UUID) is returned.

When operating inside a world (public is False), the peer ID used for interaction lookup is remapped to the world's peer ID via _node_conn.get_world_peer_id.

Parameters:

Name Type Description Default
public bool

True if the agent is currently operating on the public network; False if it is inside a world.

required
peer_id str

The peer ID of the remote party that initiated the current exchange. Remapped to the world peer ID when public is False.

required

Returns:

Type Description

The UUID string of the active interaction for the given peer, or Custom.SYSTEM_INTERACTION_UUID if no active interaction exists.

Source code in unaiverse/agent_basics.py
def prepare_stdin_if_human(self, public: bool, peer_id: str):
    """Prepare the ``stdin`` binding for a human-processor agent before a processing step.

    Resets the ``stdin`` proxy to the default binding for the given network context,
    then cleans up any completed interactions from ``proc_human_peer_id_to_interaction``.
    Returns the interaction UUID that should be used to read from ``stdin``: if an
    active (not completed) interaction exists for the effective peer, its UUID is
    returned so that the ongoing dialogue is resumed; otherwise the system-level
    UUID (``Custom.SYSTEM_INTERACTION_UUID``) is returned.

    When operating inside a world (``public`` is ``False``), the peer ID used for
    interaction lookup is remapped to the world's peer ID via
    ``_node_conn.get_world_peer_id``.

    Args:
        public: ``True`` if the agent is currently operating on the public network;
            ``False`` if it is inside a world.
        peer_id: The peer ID of the remote party that initiated the current exchange.
            Remapped to the world peer ID when ``public`` is ``False``.

    Returns:
        The UUID string of the active interaction for the given peer, or
        ``Custom.SYSTEM_INTERACTION_UUID`` if no active interaction exists.
    """
    self.set_default_stdin_binding(public)

    # Zero-step: if we are in a world, the peer ID used for indexing the interaction cache is the world peer ID
    peer_id = peer_id if public else self._node_conn.get_world_peer_id()

    # First of all, clearing residuals of interactions that were completed in the past
    to_remove = []
    for _peer_id, _interaction in self.proc_human_peer_id_to_interaction.items():
        if _interaction.is_completed():
            to_remove.append(_peer_id)
    for _peer_id in to_remove:
        del self.proc_human_peer_id_to_interaction[_peer_id]

    # Getting existing interaction or defaulting to the system one
    if peer_id in self.proc_human_peer_id_to_interaction:
        interaction = self.proc_human_peer_id_to_interaction[peer_id]
        return interaction.uuid
    else:
        return Custom.SYSTEM_INTERACTION_UUID
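
The cleanup-then-lookup logic can be sketched in isolation as follows. `FakeInteraction`, `pick_interaction_uuid`, and the `"system"` constant are stand-ins written for this example. They mimic only the `uuid` attribute and `is_completed()` method used in the listing and are not the real `Interaction` or `Custom` classes.

```python
SYSTEM_INTERACTION_UUID = "system"  # placeholder for Custom.SYSTEM_INTERACTION_UUID


class FakeInteraction:
    """Minimal stand-in exposing the two members the listing relies on."""

    def __init__(self, uuid: str, completed: bool):
        self.uuid = uuid
        self._completed = completed

    def is_completed(self) -> bool:
        return self._completed


def pick_interaction_uuid(cache: dict, peer_id: str) -> str:
    # Collect keys first, then delete, to avoid mutating the dict while iterating
    for done in [p for p, inter in cache.items() if inter.is_completed()]:
        del cache[done]
    interaction = cache.get(peer_id)
    return interaction.uuid if interaction is not None else SYSTEM_INTERACTION_UUID


cache = {
    "peer-a": FakeInteraction("aaaa1111", completed=False),
    "peer-b": FakeInteraction("bbbb2222", completed=True),
}
uuid_a = pick_interaction_uuid(cache, "peer-a")  # ongoing dialogue is resumed
uuid_b = pick_interaction_uuid(cache, "peer-b")  # completed entry was purged
```

Note that a completed interaction is purged before the lookup, so its peer falls back to the system-level UUID.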

generate_id staticmethod

generate_id() -> str

Generate a random 8-character hexadecimal ID string.

Uses uuid4 as the entropy source and returns the first eight characters of its hex representation. Suitable for generating short unique identifiers for interactions, tags, or other framework objects where full UUID length is not required.

Returns:

Type Description
str

An 8-character lowercase hexadecimal string (e.g., "3f7a2c1b").

Examples:

>>> uid = AgentBasics.generate_id()
>>> len(uid)
8
>>> all(c in '0123456789abcdef' for c in uid)
True
Source code in unaiverse/agent_basics.py
@staticmethod
def generate_id() -> str:
    """Generate a random 8-character hexadecimal ID string.

    Uses ``uuid4`` as the entropy source and returns the first eight characters of
    its hex representation. Suitable for generating short unique identifiers for
    interactions, tags, or other framework objects where full UUID length is not
    required.

    Returns:
        An 8-character lowercase hexadecimal string (e.g., ``"3f7a2c1b"``).

    Examples:
        >>> uid = AgentBasics.generate_id()
        >>> len(uid)
        8
        >>> all(c in '0123456789abcdef' for c in uid)
        True
    """
    return _uuid.uuid4().hex[0:8]
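
Eight hexadecimal characters carry 32 bits, so these IDs are short but not collision-free. The sketch below estimates the collision probability with the standard birthday approximation, to help judge when a short ID is adequate. The function `collision_probability` is an illustration added here and is not part of `unaiverse`.

```python
import math
import uuid


def generate_id() -> str:
    """Same expression as in the listing: first 8 hex characters of a uuid4."""
    return uuid.uuid4().hex[:8]


def collision_probability(n_ids: int, bits: int = 32) -> float:
    """Birthday approximation: p ~ 1 - exp(-n(n-1) / (2 * 2**bits))."""
    return 1.0 - math.exp(-n_ids * (n_ids - 1) / (2.0 * 2 ** bits))


sample = generate_id()
p_small = collision_probability(100)
p_large = collision_probability(10_000)
print(sample, f"{p_small:.2e}", f"{p_large:.2e}")
```

With a few hundred IDs the chance of a clash is negligible; at around ten thousand it is on the order of one percent, so a uniqueness check may be worth adding at that scale.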

inject_received_interaction

inject_received_interaction(sender: str, interaction_dict: dict)

Register an interaction received from a remote peer into the local state machine and interaction manager.

Deserialises interaction_dict into an Interaction object, determines whether the sender is on the public network or a world peer, and selects the corresponding HSM (behav_lone_wolf or behav). If the interaction carries a non-empty action name, it is first enqueued in the HSM via request_action. Unless that enqueue fails, the interaction is then registered with the InteractionManager via im.register_received; interactions without an action name skip the HSM and are registered directly.

Parameters:

Name Type Description Default
sender str

The peer ID of the remote agent that sent the interaction.

required
interaction_dict dict

A dictionary representation of an Interaction, as produced by Interaction.to_dict.

required

Raises:

Type Description
AssertionError

If the resolved behavioral HSM is not a HybridStateMachine instance.

Note

If request_action returns False (the action cannot be enqueued), an error is logged and the interaction is dropped without being registered with the InteractionManager; no exception is raised.

Source code in unaiverse/agent_basics.py
def inject_received_interaction(self, sender: str, interaction_dict: dict):
    """Register an interaction received from a remote peer into the local state machine and interaction manager.

    Deserialises ``interaction_dict`` into an ``Interaction`` object, determines
    whether the sender is on the public network or a world peer, and selects the
    corresponding HSM (``behav_lone_wolf`` or ``behav``). If the interaction carries
    a non-empty action name, it is first enqueued in the HSM via ``request_action``.
    Unless that enqueue fails, the interaction is then registered with the
    ``InteractionManager`` via ``im.register_received``; interactions without an
    action name skip the HSM and are registered directly.

    Args:
        sender: The peer ID of the remote agent that sent the interaction.
        interaction_dict: A dictionary representation of an ``Interaction``, as
            produced by ``Interaction.to_dict``.

    Raises:
        AssertionError: If the resolved behavioral HSM is not a ``HybridStateMachine``
            instance.

    Note:
        If ``request_action`` returns ``False`` (the action cannot be enqueued), an
        error is logged and the interaction is dropped without being registered
        with the ``InteractionManager``; no exception is raised.
    """

    # Creating the interaction object
    interaction = Interaction.from_dict(interaction_dict)

    # Marking
    public = sender in self.public_agents

    # Also register with the HSM for action selection
    behav = self.behav_lone_wolf if public else self.behav
    assert isinstance(behav, HybridStateMachine)

    if (interaction.action_name is not None and len(interaction.action_name) > 0 and
            not behav.request_action(interaction)):  # Here the interaction links to action (IM does not do this!)
        log.error("Cannot enqueue a received interaction!")
    else:
        # Register with the InteractionManager
        log.misc(f"Registering received interaction for action "
                 f"{interaction.action_name}")
        self.im.register_received(interaction, public)

build_augmented_roles_dictionaries staticmethod

build_augmented_roles_dictionaries(custom_roles: list[str] | set[str]) -> tuple[dict[int, str], dict[str, int]]

Build complete role lookup tables by combining default roles with custom roles.

Starts from the three built-in roles ("public_agent", "world_agent", "world_master") and appends each custom role string as a new bitmask entry starting at bit 2. For every custom role, two augmented (compound) roles are generated: one prefixed with "world_agent~" and one with "world_master~". Compound roles are formed by OR-ing the base role bits with the custom role bits, and their string representation uses "~" as a separator (e.g., "world_agent~student").

As a side effect, each custom role (but not the compound roles) is also written into the class-level AgentBasics.ROLE_STR_TO_BITS for global accessibility.

Parameters:

Name Type Description Default
custom_roles list[str] | set[str]

A list (or set) of custom role name strings. Bitmasks are assigned by iteration index, so a list is preferred when reproducibility matters. The maximum supported length is 30.

required

Returns:

Type Description
tuple[dict[int, str], dict[str, int]]

A two-tuple (role_bits_to_str, role_str_to_bits) where:
  • role_bits_to_str maps integer bitmasks to human-readable role strings.
  • role_str_to_bits maps human-readable role strings to integer bitmasks.

Note

Passing more than 30 custom roles logs a critical warning but does not raise an exception; the extra roles are assigned bitmasks that may overlap with bits used by other parts of the framework.

Source code in unaiverse/agent_basics.py
@staticmethod
def build_augmented_roles_dictionaries(custom_roles: list[str] | set[str]) -> tuple[dict[int, str], dict[str, int]]:
    """Build complete role lookup tables by combining default roles with custom roles.

    Starts from the three built-in roles (``"public_agent"``, ``"world_agent"``,
    ``"world_master"``) and appends each custom role string as a new bitmask entry
    starting at bit 2. For every custom role, two augmented (compound) roles are
    generated: one prefixed with ``"world_agent~"`` and one with ``"world_master~"``.
    Compound roles are formed by OR-ing the base role bits with the custom role bits,
    and their string representation uses ``"~"`` as a separator
    (e.g., ``"world_agent~student"``).

    As a side effect, each custom role (but not the compound roles) is also written
    into the class-level ``AgentBasics.ROLE_STR_TO_BITS`` for global accessibility.

    Args:
        custom_roles: A list (or set) of custom role name strings. Bitmasks are
            assigned by iteration index, so a list is preferred when
            reproducibility matters. The maximum supported length is 30.

    Returns:
        A two-tuple ``(role_bits_to_str, role_str_to_bits)`` where:
        - ``role_bits_to_str`` maps integer bitmasks to human-readable role strings.
        - ``role_str_to_bits`` maps human-readable role strings to integer bitmasks.

    Note:
        Passing more than 30 custom roles logs a critical warning but does not raise
        an exception; the extra roles are assigned bitmasks that may overlap with
        bits used by other parts of the framework.
    """

    role_bits_to_str = {k: v for k, v in AgentBasics.ROLE_BITS_TO_STR.items()}
    role_str_to_bits = {k: v for k, v in AgentBasics.ROLE_STR_TO_BITS.items()}

    # Both Agent and World: Fusing basic roles and custom roles
    if len(custom_roles) > 0:
        if len(custom_roles) > 30:  # Safe value, could be increased
            log.critical("Maximum number of custom roles exceeded (max is 30)")
        for i, role_str in enumerate(custom_roles):
            role_int = 1 << (i + 2)  # 00000100, then 00001000, etc. (recall that the first two bits are reserved)
            role_str_to_bits[role_str] = role_int
            role_bits_to_str[role_int] = role_str
            AgentBasics.ROLE_STR_TO_BITS[role_str] = role_int

    # Both Agent and World: Augmenting roles
    roles_not_to_be_augmented = {AgentBasics.ROLE_PUBLIC,
                                 AgentBasics.ROLE_WORLD_AGENT,
                                 AgentBasics.ROLE_WORLD_MASTER}
    role_bits_to_str_original = {k: v for k, v in role_bits_to_str.items()}
    for role_int, role_str in role_bits_to_str_original.items():
        if role_int not in roles_not_to_be_augmented and "~" not in role_str:
            for role_base_int in {AgentBasics.ROLE_WORLD_AGENT, AgentBasics.ROLE_WORLD_MASTER}:
                augmented_role_int = role_base_int | role_int
                augmented_role_str = role_bits_to_str[role_base_int] + "~" + role_str
                if augmented_role_str not in role_str_to_bits:
                    role_str_to_bits[augmented_role_str] = augmented_role_int
                    role_bits_to_str[augmented_role_int] = augmented_role_str

    return role_bits_to_str, role_str_to_bits
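
A worked example makes the bitmask layout concrete. The sketch below is a simplified standalone re-implementation of the same steps, not the library function. The numeric values of the three base roles are not shown in this section, so the constants used here (0, 1, 2) are assumptions chosen to fit the "first two bits are reserved" comment. The side-effect write into the class-level table is omitted.

```python
# Assumed base-role values (placeholders; the real constants are defined elsewhere)
ROLE_PUBLIC, ROLE_WORLD_MASTER, ROLE_WORLD_AGENT = 0, 1, 2
BASE = {
    ROLE_PUBLIC: "public_agent",
    ROLE_WORLD_MASTER: "world_master",
    ROLE_WORLD_AGENT: "world_agent",
}


def build(custom_roles):
    bits_to_str = dict(BASE)
    str_to_bits = {name: bits for bits, name in BASE.items()}

    # Each custom role gets its own bit, starting at bit 2
    for i, name in enumerate(custom_roles):
        bit = 1 << (i + 2)
        bits_to_str[bit] = name
        str_to_bits[name] = bit

    # Compound roles: OR a base role with a custom role, joined by "~"
    for bit, name in list(bits_to_str.items()):
        if bit in BASE or "~" in name:
            continue
        for base in (ROLE_WORLD_AGENT, ROLE_WORLD_MASTER):
            compound = BASE[base] + "~" + name
            if compound not in str_to_bits:
                str_to_bits[compound] = base | bit
                bits_to_str[base | bit] = compound
    return bits_to_str, str_to_bits


bits_to_str, str_to_bits = build(["student", "teacher"])
for name, bits in sorted(str_to_bits.items(), key=lambda kv: kv[1]):
    print(f"{bits:08b}  {name}")
```

Under these assumed base values, "student" takes bit 2 (value 4) and "world_agent~student" is 2 | 4 = 6. Passing a set instead of a list would make the bit assigned to each role depend on iteration order.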

augment_roles

augment_roles(role_bits_to_str: dict[int, str] | None = None, role_str_to_bits: dict[str, int] | None = None)

Update the role lookup tables with default and custom roles.

If pre-built dictionaries are not provided, build_augmented_roles_dictionaries is called with self.CUSTOM_ROLES to generate them. The class-level ROLE_BITS_TO_STR and ROLE_STR_TO_BITS dictionaries are then cleared and repopulated in place so that all references to these objects (held by the HSMs and the network node) see the updated content immediately.

This method is called automatically by set_node_info for world nodes and should not normally be invoked by user code.

Parameters:

Name Type Description Default
role_bits_to_str dict[int, str] | None

A pre-built mapping from integer bitmasks to role strings, as returned by build_augmented_roles_dictionaries. If None, the mapping is generated from self.CUSTOM_ROLES. Defaults to None.

None
role_str_to_bits dict[str, int] | None

A pre-built mapping from role strings to integer bitmasks. Must be provided together with role_bits_to_str or both should be None. Defaults to None.

None
Source code in unaiverse/agent_basics.py
def augment_roles(self,
                  role_bits_to_str: dict[int, str] | None = None,
                  role_str_to_bits: dict[str, int] | None = None):
    """Update the role lookup tables with default and custom roles.

    If pre-built dictionaries are not provided, ``build_augmented_roles_dictionaries``
    is called with ``self.CUSTOM_ROLES`` to generate them. The class-level
    ``ROLE_BITS_TO_STR`` and ``ROLE_STR_TO_BITS`` dictionaries are then cleared and
    repopulated in place so that all references to these objects (held by the HSMs and
    the network node) see the updated content immediately.

    This method is called automatically by ``set_node_info`` for world nodes and
    should not normally be invoked by user code.

    Args:
        role_bits_to_str: A pre-built mapping from integer bitmasks to role strings,
            as returned by ``build_augmented_roles_dictionaries``. If ``None``, the
            mapping is generated from ``self.CUSTOM_ROLES``. Defaults to ``None``.
        role_str_to_bits: A pre-built mapping from role strings to integer bitmasks.
            Must be provided together with ``role_bits_to_str`` or both should be
            ``None``. Defaults to ``None``.
    """

    if role_bits_to_str is None or role_str_to_bits is None:
        role_bits_to_str, role_str_to_bits = AgentBasics.build_augmented_roles_dictionaries(self.CUSTOM_ROLES)

    self.ROLE_BITS_TO_STR.clear()
    for k, v in role_bits_to_str.items():
        self.ROLE_BITS_TO_STR[k] = v
    self.ROLE_STR_TO_BITS.clear()
    for k, v in role_str_to_bits.items():
        self.ROLE_STR_TO_BITS[k] = v

clear_world_related_data async

clear_world_related_data()

Destroy all world-related cached state after leaving a world (async).

Resets agent status attributes, removes all private (world-scoped) streams via the internal __remove_all_world_private_streams helper, removes all peers tracked as world agents or world masters via __remove_all_world_related_agents, and resets the rendezvous tag on the connection pool. This is the canonical cleanup path invoked when the agent disconnects from a world.

Note

Public streams and public-network agents are not removed by this method. Only world-private streams and world-related peer entries are cleared.

Source code in unaiverse/agent_basics.py
async def clear_world_related_data(self):
    """Destroy all world-related cached state after leaving a world (async).

    Resets agent status attributes, removes all private (world-scoped) streams via
    the internal ``__remove_all_world_private_streams`` helper, removes all peers
    tracked as world agents or world masters via ``__remove_all_world_related_agents``,
    and resets the rendezvous tag on the connection pool. This is the canonical
    cleanup path invoked when the agent disconnects from a world.

    Note:
        Public streams and public-network agents are not removed by this method. Only
        world-private streams and world-related peer entries are cleared.
    """

    # Clearing status variables
    self.reset_agent_status_attrs()

    # Clear/reset
    await self.__remove_all_world_private_streams()
    await self.__remove_all_world_related_agents()
    self._node_conn.reset_rendezvous_tag()

load_and_refactor_action_file_and_behav_files

load_and_refactor_action_file_and_behav_files(force_save: bool = False)

Load role-behaviour JSON files and validate them against the world's Python action files.

This method is called during world initialisation (inside set_node_info). It discovers the roles from the .json files in world_folder and performs the following steps (steps 1 and 2 once, steps 3 to 5 for each role):

  1. Collects custom role names from the JSON filenames and stores them in self.CUSTOM_ROLES.
  2. Packs all Python files found in world_folder (excluding pdf and stats sub-directories) into self.packed_agent_files for distribution to joining agents.
  3. For each role, instantiates a temporary dummy agent from the corresponding .py file and verifies that its CUSTOM_ROLES match those of the world.
  4. Loads the role's .json HSM definition into a HybridStateMachine, stores its serialised form in self.role_to_behav, and writes it back to the node profile under world_roles_fsm.
  5. Re-saves the JSON (refactoring) and generates a PDF diagram under <world_folder>/pdf/ if the content has changed or force_save is True.

This method has no effect on non-world instances (self.is_world is False or world_folder is None).

Parameters:

Name Type Description Default
force_save bool

If True, the JSON and PDF files are always re-written even if the behaviour has not changed. Defaults to False.

False

Raises:

Type Description
GenException

If no .json role files are found in world_folder.

GenException

If the Python files in world_folder cannot be collected or packed.

GenException

If a dummy agent cannot be instantiated for a given role.

GenException

If the CUSTOM_ROLES declared in the Python file do not match those derived from the JSON filenames.

GenException

If a behaviour file cannot be loaded or saved.
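
Role discovery and the single-`agent.py` fallback can be sketched in isolation. This is a simplified illustration under stated assumptions: `discover_roles` and `resolve_role_sources` are hypothetical helper names, and `py_files` stands in for the unpacked mapping of file names to source code.

```python
import os
import tempfile


def discover_roles(world_folder: str) -> list[str]:
    """Return the stems of the *.json files located directly in world_folder."""
    return [os.path.splitext(f)[0] for f in os.listdir(world_folder)
            if os.path.isfile(os.path.join(world_folder, f))
            and f.lower().endswith(".json")]


def resolve_role_sources(roles: list[str], py_files: dict[str, str]) -> dict[str, str]:
    """Use <role>.py for every role, or fall back to a shared agent.py."""
    if all(f"{role}.py" in py_files for role in roles):
        return dict(py_files)
    if "agent.py" in py_files:
        # Every role is served by the content of agent.py
        return {f"{role}.py": py_files["agent.py"] for role in roles}
    raise ValueError("Mismatching JSON-file roles and .py files")


with tempfile.TemporaryDirectory() as folder:
    for name in ("student.json", "teacher.json", "agent.py", "notes.txt"):
        with open(os.path.join(folder, name), "w") as fh:
            fh.write("")
    roles = sorted(discover_roles(folder))  # sorted only to make the demo deterministic

sources = resolve_role_sources(roles, {"agent.py": "# shared code"})
print(roles)
print(sorted(sources))
```
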

Source code in unaiverse/agent_basics.py
def load_and_refactor_action_file_and_behav_files(self, force_save: bool = False):
    """Load role-behaviour JSON files and validate them against the world's Python action files.

    This method is called during world initialisation (inside ``set_node_info``). It
    discovers the roles from the ``.json`` files in ``world_folder`` and performs the
    following steps (steps 1 and 2 once, steps 3 to 5 for each role):

    1. Collects custom role names from the JSON filenames and stores them in
       ``self.CUSTOM_ROLES``.
    2. Packs all Python files found in ``world_folder`` (excluding ``pdf`` and ``stats``
       sub-directories) into ``self.packed_agent_files`` for distribution to joining
       agents.
    3. For each role, instantiates a temporary dummy agent from the corresponding
       ``.py`` file and verifies that its ``CUSTOM_ROLES`` match those of the world.
    4. Loads the role's ``.json`` HSM definition into a ``HybridStateMachine``, stores
       its serialised form in ``self.role_to_behav``, and writes it back to the node
       profile under ``world_roles_fsm``.
    5. Re-saves the JSON (refactoring) and generates a PDF diagram under
       ``<world_folder>/pdf/`` if the content has changed or ``force_save`` is
       ``True``.

    This method has no effect on non-world instances (``self.is_world`` is ``False``
    or ``world_folder`` is ``None``).

    Args:
        force_save: If ``True``, the JSON and PDF files are always re-written even if
            the behaviour has not changed. Defaults to ``False``.

    Raises:
        GenException: If no ``.json`` role files are found in ``world_folder``.
        GenException: If the Python files in ``world_folder`` cannot be collected or
            packed.
        GenException: If a dummy agent cannot be instantiated for a given role.
        GenException: If the ``CUSTOM_ROLES`` declared in the Python file do not match
            those derived from the JSON filenames.
        GenException: If a behaviour file cannot be loaded or saved.
    """

    # World only: the world discovers CUSTOM_ROLES from the JSON files in the world folder
    if self.world_folder is not None and self.is_world:

        # Guessing roles from the list of json files
        listdir = os.listdir(self.world_folder)  # Do this only once
        self.CUSTOM_ROLES = [os.path.splitext(f)[0] for f in listdir
                             if os.path.isfile(os.path.join(self.world_folder, f))
                             and f.lower().endswith(".json")]
        if len(self.CUSTOM_ROLES) == 0:
            raise GenException(f"No world-role files (*.json) were found in the world folder {self.world_folder}")

        # Default behaviors (getting roles, that are the names of the files with extension "json")
        default_behav_files = [os.path.join(self.world_folder, f) for f in listdir
                               if os.path.isfile(os.path.join(self.world_folder, f)) and
                               f.lower().endswith(".json")]

        # Loading Python files in the "world" folder
        try:
            self.packed_agent_files = pack_py_files(collect_py_files(self.world_folder,
                                                                     exclude_dirs={'pdf', 'stats'}))
        except Exception as e:
            raise GenException(f'Error while reading the agent-related python files in {self.world_folder} [{e}]')

        # Loading and refactoring behaviors
        agent_files = unpack_py_files(self.packed_agent_files)

        # We keep the compatibility with the case of a single agent.py
        if len(self.CUSTOM_ROLES) > 0:
            found = 0
            found_agent_py = False
            for role in self.CUSTOM_ROLES:
                for file_name in agent_files.keys():
                    if file_name == "agent.py":
                        found_agent_py = True
                    if file_name == f"{role}.py":
                        found += 1
                        break
            if found != len(self.CUSTOM_ROLES):
                if found_agent_py:
                    code_content = agent_files["agent.py"]

                    # All roles go to the content of agent.py
                    agent_files = {f"{k}.py": code_content for k in self.CUSTOM_ROLES}
                    self.packed_agent_files = pack_py_files(agent_files)  # Repacking
                else:
                    raise GenException(f'Mismatching JSON-file roles and .py files in {self.world_folder}')

        for role, default_behav_file in zip(self.CUSTOM_ROLES, default_behav_files):

            # Creating a dummy agent which supports the actions of the following state machines
            try:
                dummy_agent, dummy_agent_memory_finder = load_agent_in_memory(agent_files, role, proc=None)
                dummy_agent.CUSTOM_ROLES = self.CUSTOM_ROLES
            except Exception as e:
                raise GenException(f'Unable to create a valid dummy agent object for agent with role {role} [{e}]')

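            # Checking if the roles you wrote in agent.py are coherent with the JSON files in this folder
            # (CUSTOM_ROLES was assigned from self.CUSTOM_ROLES just above, so as written this
            # comparison cannot fail)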
            if dummy_agent.CUSTOM_ROLES != self.CUSTOM_ROLES:
                raise GenException(f"Mismatching roles. "
                                   f"Roles in JSON files: {self.CUSTOM_ROLES}. "
                                   f"Roles specified in the agent.py file: {dummy_agent.CUSTOM_ROLES}")

            try:
                behav = HybridStateMachine(dummy_agent)
                behav.load(default_behav_file)
                self.role_to_behav[role] = str(behav)

                # Adding roles and machines to profile
                self._node_profile.get_dynamic_profile()['world_roles_fsm'] = self.role_to_behav
            except Exception as e:
                raise GenException(f'Error while loading or handling '
                                   f'behav file {default_behav_file} for role {role} [{e}]')

            # Refactoring and saving PDF
            try:
                if (force_save or
                        behav.save(os.path.join(self.world_folder, f'{role}.json'), only_if_changed=dummy_agent)):
                    os.makedirs(os.path.join(self.world_folder, 'pdf'), exist_ok=True)
                    behav.save_pdf(os.path.join(self.world_folder, 'pdf', f'{role}.pdf'))
            except Exception as e:
                raise GenException(f'Error while saving the behav file {default_behav_file} for role {role} [{e}]')

            # Clearing memory
            dummy_agent_memory_finder.cleanup()

create_behav_files

create_behav_files()

Hook for world subclasses to generate role-behaviour JSON files programmatically.

This method is called by set_node_info before load_and_refactor_action_file_and_behav_files, giving subclasses an opportunity to create or overwrite the .json behaviour files in world_folder at runtime rather than maintaining them as static assets.

The default implementation does nothing. Override this method in a World subclass to generate JSON files dynamically; the files written here will be picked up immediately by the subsequent loading step.

Note

Implementing this hook is entirely optional. If the .json files already exist in world_folder and do not need to be regenerated at startup, there is no need to override this method.
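
A possible shape for such an override is sketched below. Everything here is an assumption made for illustration: `WorldStandIn` is a hypothetical stand-in that models only `world_folder` and the hook, the role names are invented, and the empty JSON payload is a placeholder because the HSM file schema is not described in this section.

```python
import json
import os
import tempfile


class WorldStandIn:
    """Hypothetical stand-in for a world class; only the hook is modelled."""

    def __init__(self, world_folder: str):
        self.world_folder = world_folder

    def create_behav_files(self):
        pass  # default: do nothing


class MyWorld(WorldStandIn):
    ROLES = ("student", "teacher")  # hypothetical role names

    def create_behav_files(self):
        for role in self.ROLES:
            path = os.path.join(self.world_folder, f"{role}.json")
            # Placeholder payload: a real file must contain a valid HSM definition.
            with open(path, "w") as fh:
                json.dump({}, fh)


with tempfile.TemporaryDirectory() as folder:
    MyWorld(folder).create_behav_files()
    created = sorted(os.listdir(folder))
print(created)
```

Because role names are derived from the JSON file names, the files written by the hook also determine which roles the world exposes.
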

Source code in unaiverse/agent_basics.py
def create_behav_files(self):
    """Hook for world subclasses to generate role-behaviour JSON files programmatically.

    This method is called by ``set_node_info`` before
    ``load_and_refactor_action_file_and_behav_files``, giving subclasses an
    opportunity to create or overwrite the ``.json`` behaviour files in
    ``world_folder`` at runtime rather than maintaining them as static assets.

    The default implementation does nothing. Override this method in a ``World``
    subclass to generate JSON files dynamically; the files written here will be
    picked up immediately by the subsequent loading step.

    Note:
        Implementing this hook is entirely optional. If the ``.json`` files already
        exist in ``world_folder`` and do not need to be regenerated at startup,
        there is no need to override this method.
    """
    pass

get_name

get_name() -> str

Return the human-readable name of this agent or world node.

The name is set at node creation time and stored in the static section of the node's profile ("node_name"). It is available as soon as set_node_info has been called; before that it defaults to "unk".

Returns:

Type Description
str

The "node_name" string from the node's static profile.

Source code in unaiverse/agent_basics.py
def get_name(self) -> str:
    """Return the human-readable name of this agent or world node.

    The name is set at node creation time and stored in the static section of the
    node's profile (``"node_name"``). It is available as soon as ``set_node_info``
    has been called; before that it defaults to ``"unk"``.

    Returns:
        The ``"node_name"`` string from the node's static profile.
    """
    return self._node_name

get_profile

get_profile() -> NodeProfile

Return the NodeProfile of the node hosting this agent or world.

The profile provides access to the node's static properties (name, email, node type, node ID) via get_static_profile(), its dynamic properties (peer IDs, role, connected peers, streams) via get_dynamic_profile(), and the node's curriculum vitae (badges) via get_cv(). The returned object is the live profile instance shared with the networking layer; callers should not modify it directly.

Returns:

Type Description
NodeProfile

The NodeProfile instance of this node, or None if set_node_info has not yet been called.

Source code in unaiverse/agent_basics.py
def get_profile(self) -> NodeProfile:
    """Return the ``NodeProfile`` of the node hosting this agent or world.

    The profile provides access to the node's static properties (name, email,
    node type, node ID) via ``get_static_profile()``, its dynamic properties
    (peer IDs, role, connected peers, streams) via ``get_dynamic_profile()``, and
    the node's curriculum vitae (badges) via ``get_cv()``. The returned object is
    the live profile instance shared with the networking layer; callers should not
    modify it directly.

    Returns:
        The ``NodeProfile`` instance of this node, or ``None`` if ``set_node_info``
        has not yet been called.
    """
    return self._node_profile

get_role

get_role(agent: str) -> str | None

Return the base role string of a connected peer.

Reads the integer role bitmask from the connection pool via _node_conn.get_role, then strips the lower two bits (which encode the world-agent/world-master distinction) to obtain the custom role component. The resulting base role integer is looked up in ROLE_BITS_TO_STR.

To retrieve the role of the agent itself rather than a peer, use get_current_role.

Parameters:

Name Type Description Default
agent str

The peer ID of the connected agent whose role is requested.

required

Returns:

Type Description
str | None

The human-readable base role string (e.g., "world_agent" or a custom role name from CUSTOM_ROLES), or None if the role bitmask is not present in ROLE_BITS_TO_STR.
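
The bit manipulation can be checked on its own. In this sketch the bit layout of the sample value is a hypothetical assumption; only the shift expression is taken from the method.

```python
def strip_lower_two_bits(role_int: int) -> int:
    """Clear the two lowest bits, mirroring (role_int >> 2) << 2."""
    return (role_int >> 2) << 2


# Hypothetical bitmask: one custom-role bit (0b0100) plus both lower bits set.
role_int = 0b0111
print(bin(strip_lower_two_bits(role_int)))

# Shifting right then left by two is the same as masking with ~0b11.
print(strip_lower_two_bits(role_int) == role_int & ~0b11)
```
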

Source code in unaiverse/agent_basics.py
def get_role(self, agent: str) -> str | None:
    """Return the base role string of a connected peer.

    Reads the integer role bitmask from the connection pool via ``_node_conn.get_role``,
    then strips the lower two bits (which encode the world-agent/world-master distinction)
    to obtain the custom role component. The resulting base role integer is looked up in
    ``ROLE_BITS_TO_STR``.

    To retrieve the role of the agent itself rather than a peer, use
    ``get_current_role``.

    Args:
        agent: The peer ID of the connected agent whose role is requested.

    Returns:
        The human-readable base role string (e.g., ``"world_agent"`` or a custom role
        name from ``CUSTOM_ROLES``), or ``None`` if the role bitmask is not present in
        ``ROLE_BITS_TO_STR``.
    """
    role_int = self._node_conn.get_role(agent)
    base_role_int = (role_int >> 2) << 2
    return self.ROLE_BITS_TO_STR[base_role_int] if base_role_int in self.ROLE_BITS_TO_STR else None

get_agents_by_role

get_agents_by_role(role: str | list[str], handshake_completed: bool = True)

Return the peer IDs of all connected agents that match one or more roles.

For each role string in role, the method resolves the corresponding integer bitmask from ROLE_STR_TO_BITS. If the role string is not augmented (i.e., does not contain the "~" separator), both the world-agent and world-master variants of that role are searched. If the role string is augmented (e.g., "world_agent~student"), only that exact compound role is searched.

Agents are then collected from the connection pool via _node_conn.find_addrs_by_role. When handshake_completed is True, only peers that are also present in all_agents (meaning the full handshake is done) are included.

Parameters:

Name Type Description Default
role str | list[str]

A single role string or a list of role strings to match against. Each string must be a key in ROLE_STR_TO_BITS.

required
handshake_completed bool

If True, only peers whose handshake has completed (i.e., that appear in all_agents) are returned. If False, all matching peers from the connection pool are returned regardless of handshake state. Defaults to True.

True

Returns:

Type Description

A list of peer ID strings for all agents whose role matches any entry in role. The list may contain duplicates if a peer matches multiple roles.

Examples:

>>> agents = my_agent.get_agents_by_role("world_agent")
>>> agents = my_agent.get_agents_by_role(["world_master", "world_agent~student"])
Source code in unaiverse/agent_basics.py
def get_agents_by_role(self, role: str | list[str], handshake_completed: bool = True):
    """Return the peer IDs of all connected agents that match one or more roles.

    For each role string in ``role``, the method resolves the corresponding integer
    bitmask from ``ROLE_STR_TO_BITS``. If the role string is not augmented (i.e., does
    not contain the ``"~"`` separator), both the world-agent and world-master variants
    of that role are searched. If the role string is augmented (e.g.,
    ``"world_agent~student"``), only that exact compound role is searched.

    Agents are then collected from the connection pool via
    ``_node_conn.find_addrs_by_role``. When ``handshake_completed`` is ``True``, only
    peers that are also present in ``all_agents`` (meaning the full handshake is done)
    are included.

    Args:
        role: A single role string or a list of role strings to match against. Each
            string must be a key in ``ROLE_STR_TO_BITS``.
        handshake_completed: If ``True``, only peers whose handshake has completed
            (i.e., that appear in ``all_agents``) are returned. If ``False``, all
            matching peers from the connection pool are returned regardless of handshake
            state. Defaults to ``True``.

    Returns:
        A list of peer ID strings for all agents whose role matches any entry in
        ``role``. The list may contain duplicates if a peer matches multiple roles.

    Examples:
        >>> agents = my_agent.get_agents_by_role("world_agent")
        >>> agents = my_agent.get_agents_by_role(["world_master", "world_agent~student"])
    """
    role_list = role if isinstance(role, list) else [role]
    searched_roles_int = set()

    for role in role_list:
        is_augmented = "~" in role
        role_int = self.ROLE_STR_TO_BITS[role]
        if not is_augmented:
            # Plain role: search both its world-agent and world-master variants
            for role_prefix_int in {AgentBasics.ROLE_WORLD_AGENT, AgentBasics.ROLE_WORLD_MASTER}:
                searched_roles_int.add(role_prefix_int | role_int)
        else:
            # Augmented role: add that exact bitmask, keeping the roles collected so far
            searched_roles_int.add(role_int)

    found_agents = []
    for role_int in searched_roles_int:
        _, _found_connected_peer_ids = self._node_conn.find_addrs_by_role(role_int, return_peer_ids_too=True)
        if handshake_completed:
            for agent in _found_connected_peer_ids:
                if agent in self.all_agents:
                    found_agents.append(agent)
        else:
            found_agents += _found_connected_peer_ids

    return found_agents

get_current_role

get_current_role(return_int: bool = False, ignore_base_role: bool = True) -> str | int | None

Return the current role of this agent within the world it has joined.

The role is read from the agent's own dynamic profile (connections.role). When ignore_base_role is True, only the custom role segment after the "~" separator is returned (e.g., "student" instead of "world_agent~student"). When return_int is True, the role string is converted back to its integer bitmask via ROLE_STR_TO_BITS.

If the agent is not currently connected to a world (in_world() returns False), None is returned without accessing the profile.

Parameters:

Name Type Description Default
return_int bool

If True, return the integer bitmask of the role instead of the string. Defaults to False.

False
ignore_base_role bool

If True, strip the base role prefix (e.g., "world_agent~") and return only the custom part. Has no effect when the role does not contain "~". Defaults to True.

True

Returns:

Type Description
str | int | None

The role as a string (or integer when return_int is True), or None if the agent is not currently in a world.

Source code in unaiverse/agent_basics.py
def get_current_role(self, return_int: bool = False, ignore_base_role: bool = True) -> str | int | None:
    """Return the current role of this agent within the world it has joined.

    The role is read from the agent's own dynamic profile (``connections.role``). When
    ``ignore_base_role`` is ``True``, only the custom role segment after the ``"~"``
    separator is returned (e.g., ``"student"`` instead of ``"world_agent~student"``).
    When ``return_int`` is ``True``, the role string is converted back to its integer
    bitmask via ``ROLE_STR_TO_BITS``.

    If the agent is not currently connected to a world (``in_world()`` returns
    ``False``), ``None`` is returned without accessing the profile.

    Args:
        return_int: If ``True``, return the integer bitmask of the role instead of the
            string. Defaults to ``False``.
        ignore_base_role: If ``True``, strip the base role prefix (e.g.,
            ``"world_agent~"``) and return only the custom part. Has no effect when the
            role does not contain ``"~"``. Defaults to ``True``.

    Returns:
        The role as a string (or integer when ``return_int`` is ``True``), or ``None``
        if the agent is not currently in a world.
    """
    if self.in_world():
        role_str = self._node_profile.get_dynamic_profile()['connections']['role']
        if ignore_base_role:
            role_str = role_str.split("~")[-1]
        if not return_int:
            return role_str
        else:
            return self.ROLE_STR_TO_BITS[role_str]
    else:
        return None

add_agent async

add_agent(peer_id: str, profile: NodeProfile, add_proc_streams: bool = True, add_env_streams: bool = True, add_pubsub_streams: bool = True) -> bool

Register a new peer agent and import its compatible streams (async).

If the peer is already tracked in all_agents, it is first removed via remove_agent and then re-added so that updated profile data is always reflected. The peer's role bitmask (read from the connection pool) determines which membership dictionary it lands in: public_agents, world_agents, or world_masters. The node type from the static profile further places the peer in human_agents or artificial_agents.

When a processor is configured (proc_outputs and proc_inputs are not None), the method checks the peer's environmental streams (from profile.get_dynamic_profile()['streams']) and processor output streams (from profile.get_dynamic_profile()['proc_outputs']) for compatibility with this agent's processor via add_compatible_streams.

Parameters:

Name Type Description Default
peer_id str

The unique identifier of the peer to add.

required
profile NodeProfile

The NodeProfile object containing the peer's profile information.

required
add_proc_streams bool

If True, the processor output streams of the peer are checked for compatibility and added when compatible. Defaults to True.

True
add_env_streams bool

If True, the environmental streams of the peer are checked for compatibility and added when compatible. Defaults to True.

True
add_pubsub_streams bool

If True, PubSub streams are subscribed to when added. Defaults to True.

True

Returns:

Type Description
bool

True if the agent was successfully added, False if an unknown role was encountered or if a stream compatibility check failed.

Examples:

>>> ok = await my_agent.add_agent(peer_id, profile)
>>> assert ok
Source code in unaiverse/agent_basics.py
async def add_agent(self, peer_id: str, profile: NodeProfile,
                    add_proc_streams: bool = True,
                    add_env_streams: bool = True,
                    add_pubsub_streams: bool = True) -> bool:
    """Register a new peer agent and import its compatible streams (async).

    If the peer is already tracked in ``all_agents``, it is first removed via
    ``remove_agent`` and then re-added so that updated profile data is always
    reflected. The peer's role bitmask (read from the connection pool) determines
    which membership dictionary it lands in: ``public_agents``, ``world_agents``, or
    ``world_masters``. The node type from the static profile further places the peer in
    ``human_agents`` or ``artificial_agents``.

    When a processor is configured (``proc_outputs`` and ``proc_inputs`` are not
    ``None``), the method checks the peer's environmental streams (from
    ``profile.get_dynamic_profile()['streams']``) and processor output streams (from
    ``profile.get_dynamic_profile()['proc_outputs']``) for compatibility with this
    agent's processor via ``add_compatible_streams``.

    Args:
        peer_id: The unique identifier of the peer to add.
        profile: The ``NodeProfile`` object containing the peer's profile information.
        add_proc_streams: If ``True``, the processor output streams of the peer are
            checked for compatibility and added when compatible. Defaults to ``True``.
        add_env_streams: If ``True``, the environmental streams of the peer are checked
            for compatibility and added when compatible. Defaults to ``True``.
        add_pubsub_streams: If ``True``, PubSub streams are subscribed to when added.
            Defaults to ``True``.

    Returns:
        ``True`` if the agent was successfully added, ``False`` if an unknown role was
        encountered or if a stream compatibility check failed.

    Examples:
        >>> ok = await my_agent.add_agent(peer_id, profile)
        >>> assert ok
    """

    # If the agent was already there, we remove it and add it again (in case of changes)
    await self.remove_agent(peer_id)  # No effect if the agent is not known

    # Guessing the type of agent to add (according to the default roles shared by every agent)
    role = self._node_conn.get_role(peer_id)
    self.all_agents[peer_id] = profile
    if role & 1 == self.ROLE_PUBLIC:
        self.public_agents[peer_id] = profile
        public = True
    elif role & 3 == self.ROLE_WORLD_AGENT:
        self.world_agents[peer_id] = profile
        public = False
    elif role & 3 == self.ROLE_WORLD_MASTER:
        self.world_masters[peer_id] = profile
        public = False
    else:
        log.error(f"Cannot add agent with peer ID {peer_id} - unknown role: {role}")
        return False

    # Human or artificial?
    if profile.get_static_profile()["node_type"] == AgentBasics.HUMAN:
        self.human_agents[peer_id] = profile
    else:
        self.artificial_agents[peer_id] = profile

    # Check compatibility of the streams owned by the agent we are adding with our-agent's processor
    if self.proc_outputs is not None and self.proc_inputs is not None:

        # Check compatibility of the environmental streams of the agent we are adding with our-agent's processor
        environmental_streams = profile.get_dynamic_profile()['streams']
        if (add_env_streams and environmental_streams is not None and
                not (await self.add_compatible_streams(peer_id, environmental_streams,
                                                       buffered=False,
                                                       public=public,
                                                       skip_pubsub=not add_pubsub_streams))):
            return False

        # Check compatibility of the generated streams of the agent we are adding with our-agent's processor
        proc_streams = profile.get_dynamic_profile()['proc_outputs']
        if (add_proc_streams and proc_streams is not None and
                not (await self.add_compatible_streams(peer_id, proc_streams,
                                                       buffered=False,
                                                       public=public,
                                                       skip_pubsub=not add_pubsub_streams))):
            return False

    log.misc(f"Successfully added agent {profile.get_static_profile()['node_name']} "
             f"with peer ID {peer_id} (public: {public})")
    return True

remove_agent async

remove_agent(peer_id: str)

Remove a peer agent and clean up all associated state (async).

If the peer is not present in all_agents, the call is a no-op. Otherwise the peer is removed from every membership dictionary (all_agents, world_agents, world_masters, public_agents, human_agents, artificial_agents), from the compatible-stream sets (compat_in_streams, compat_out_streams), from the known-stream catalogue via remove_streams, from per-agent status attributes via remove_peer_from_agent_status_attrs, from the buffered-stream index, and from the interaction manager via im.remove_interactions_of_agent.

This method is the counterpart of add_agent and is also called internally whenever a peer disconnects.

Parameters:

Name Type Description Default
peer_id str

The unique peer ID of the agent to remove.

required
Note

If peer_id is not found in all_agents, the method returns without modifying any state and without raising an exception.
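
The clean-up of the compatible-stream sets follows a collect-then-remove pattern, which avoids mutating a container while iterating over it. The sketch below is an illustration under assumptions: `peer_id_of` is a hypothetical stand-in for DataProps.peer_id_from_net_hash, and the "<peer_id>::<rest>" hash layout and the tuple entries are invented for the demo.

```python
def peer_id_of(net_hash: str) -> str:
    """Hypothetical stand-in: assumes a '<peer_id>::<rest>' layout."""
    return net_hash.split("::", 1)[0]


def drop_peer_streams(compat_streams: list[set[tuple[str, str]]], peer_id: str) -> None:
    """Remove, in place, every entry whose network hash belongs to peer_id."""
    for stream_set in compat_streams:
        # Collect first, then remove, so the set is not changed during iteration
        to_remove = [entry for entry in stream_set if peer_id_of(entry[0]) == peer_id]
        for entry in to_remove:
            stream_set.remove(entry)


compat = [
    {("peerA::x", "s1"), ("peerB::y", "s2")},
    {("peerA::z", "s3")},
]
drop_peer_streams(compat, "peerA")
print(compat)
```
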

Source code in unaiverse/agent_basics.py
async def remove_agent(self, peer_id: str):
    """Remove a peer agent and clean up all associated state (async).

    If the peer is not present in ``all_agents``, the call is a no-op. Otherwise the
    peer is removed from every membership dictionary (``all_agents``, ``world_agents``,
    ``world_masters``, ``public_agents``, ``human_agents``, ``artificial_agents``),
    from the compatible-stream sets (``compat_in_streams``, ``compat_out_streams``),
    from the known-stream catalogue via ``remove_streams``, from per-agent status
    attributes via ``remove_peer_from_agent_status_attrs``, from the buffered-stream
    index, and from the interaction manager via ``im.remove_interactions_of_agent``.

    This method is the counterpart of ``add_agent`` and is also called internally
    whenever a peer disconnects.

    Args:
        peer_id: The unique peer ID of the agent to remove.

    Note:
        If ``peer_id`` is not found in ``all_agents``, the method returns without
        modifying any state and without raising an exception.
    """
    if peer_id in self.all_agents:

        # Removing from agent list
        del self.all_agents[peer_id]
        if peer_id in self.world_agents:
            del self.world_agents[peer_id]
        elif peer_id in self.world_masters:
            del self.world_masters[peer_id]
        elif peer_id in self.public_agents:
            del self.public_agents[peer_id]

        if peer_id in self.artificial_agents:
            del self.artificial_agents[peer_id]
        elif peer_id in self.human_agents:
            del self.human_agents[peer_id]

        # Clearing from the list of processor-input-compatible-streams
        if self.compat_in_streams is not None:
            for i, _ in enumerate(self.compat_in_streams):
                to_remove = []
                for net_hash_name in self.compat_in_streams[i]:
                    if DataProps.peer_id_from_net_hash(net_hash_name[0]) == peer_id:
                        to_remove.append(net_hash_name)
                for net_hash_name in to_remove:
                    self.compat_in_streams[i].remove(net_hash_name)

        # Clearing from the list of processor-output-compatible-streams
        if self.compat_out_streams is not None:
            for i, _ in enumerate(self.compat_out_streams):
                to_remove = []
                for net_hash_name in self.compat_out_streams[i]:
                    if DataProps.peer_id_from_net_hash(net_hash_name[0]) == peer_id:
                        to_remove.append(net_hash_name)
                for net_hash_name in to_remove:
                    self.compat_out_streams[i].remove(net_hash_name)

        # Clearing streams owned by the removed agent from the list of known streams
        await self.remove_streams(peer_id)

        # Removing from the status variables
        self.remove_peer_from_agent_status_attrs(peer_id)

        # Updating buffered stream index
        if peer_id in self.last_buffered_peer_id_to_info:
            del self.last_buffered_peer_id_to_info[peer_id]  # Only if present

        # Clearing pending interactions
        await self.im.remove_interactions_of_agent(peer_id)

        log.misc(f"Successfully removed agent with peer ID {peer_id}")

remove_all_agents

remove_all_agents()

Remove all known peer agents and reset membership and stream state.

All six membership dictionaries (all_agents, public_agents, world_masters, world_agents, human_agents, artificial_agents) are cleared. The compatible-stream sets compat_in_streams and compat_out_streams are re-initialised to empty sets (one per processor input/output entry respectively) so that fresh compatibility tracking can begin. Finally, all non-owned known streams are removed via remove_all_streams(owned_too=False), preserving the agent's own streams.

This method does not unsubscribe from PubSub channels; if graceful unsubscription is needed, prefer iterating over agents and calling remove_agent on each.
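
The "one empty set per processor input" re-initialisation can be sketched in isolation. The `proc_inputs` list below is a hypothetical placeholder (plain strings rather than real stream properties); the point is only that a list comprehension yields independent sets, whereas list multiplication would not.

```python
# Hypothetical placeholders standing in for three processor inputs.
proc_inputs = ["image", "text", "audio"]

# One independent set per processor input, built with a list comprehension.
compat_in_streams = [set() for _ in range(len(proc_inputs))]
compat_in_streams[0].add(("net-hash-a", "camera"))

# Pitfall shown for comparison: list multiplication repeats the SAME set object.
shared = [set()] * len(proc_inputs)
shared[0].add(("net-hash-a", "camera"))
```

In the first list only slot 0 holds the entry; in the second, every slot appears to hold it because all three slots alias one set.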

Source code in unaiverse/agent_basics.py
def remove_all_agents(self):
    """Remove all known peer agents and reset membership and stream state.

    All six membership dictionaries (``all_agents``, ``public_agents``,
    ``world_masters``, ``world_agents``, ``human_agents``, ``artificial_agents``) are
    cleared. The compatible-stream sets ``compat_in_streams`` and ``compat_out_streams``
    are re-initialised to empty sets (one per processor input/output entry respectively)
    so that fresh compatibility tracking can begin. Finally, all non-owned known streams
    are removed via ``remove_all_streams(owned_too=False)``, preserving the agent's own
    streams.

    This method does not unsubscribe from PubSub channels; if graceful unsubscription
    is needed, prefer iterating over agents and calling ``remove_agent`` on each.
    """

    # Clearing all agents
    self.all_agents = {}
    self.public_agents = {}
    self.world_masters = {}
    self.world_agents = {}
    self.human_agents = {}
    self.artificial_agents = {}

    # Clearing the lists of processor-input- and processor-output-compatible streams
    if self.compat_in_streams is not None and self.proc_inputs is not None:
        self.compat_in_streams = [set() for _ in range(len(self.proc_inputs))]
    if self.compat_out_streams is not None and self.proc_outputs is not None:
        self.compat_out_streams = [set() for _ in range(len(self.proc_outputs))]

    # Clearing the list of known streams (not our own streams!)
    self.remove_all_streams(owned_too=False)
    log.misc("Successfully removed all agents")

add_behav_wildcard

add_behav_wildcard(wildcard_from: str, wildcard_to: object)

Register a wildcard substitution for the agent's behavioral state machine.

Wildcard mappings stored in behav_wildcards are used when the agent joins a world and the world's HSM JSON contains placeholder strings that need to be resolved to live Python objects (for example, callable actions defined in the agent). Calling this method before joining a world ensures that the HSM can reference the correct targets by name.

Parameters:

Name Type Description Default
wildcard_from str

The placeholder string as it appears in the HSM JSON.

required
wildcard_to object

The Python object (typically a callable or a constant) that replaces the placeholder at load time.

required
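
As a rough illustration of what a wildcard mapping enables, the sketch below resolves placeholder strings inside a JSON document to live Python objects. The `resolve_wildcards` helper, the placeholder names, and the JSON layout are all hypothetical; how the library actually performs the substitution is not shown in this section.

```python
import json


def resolve_wildcards(node, wildcards):
    """Recursively replace any string that exactly matches a wildcard key."""
    if isinstance(node, dict):
        return {k: resolve_wildcards(v, wildcards) for k, v in node.items()}
    if isinstance(node, list):
        return [resolve_wildcards(v, wildcards) for v in node]
    if isinstance(node, str) and node in wildcards:
        return wildcards[node]
    return node


def greet(name):
    return f"hello {name}"


# Same effect as calling add_behav_wildcard(placeholder, target) twice.
behav_wildcards = {}
behav_wildcards["<greet_action>"] = greet
behav_wildcards["<max_steps>"] = 10

# Hypothetical JSON layout, used only to show the substitution.
hsm_json = '{"states": {"start": {"action": "<greet_action>", "limit": "<max_steps>"}}}'
resolved = resolve_wildcards(json.loads(hsm_json), behav_wildcards)
result = resolved["states"]["start"]["action"]("world")
```

Since the mapping is a plain dictionary, registering the same placeholder twice simply overwrites the earlier target.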
Source code in unaiverse/agent_basics.py
def add_behav_wildcard(self, wildcard_from: str, wildcard_to: object):
    """Register a wildcard substitution for the agent's behavioral state machine.

    Wildcard mappings stored in ``behav_wildcards`` are used when the agent joins a
    world and the world's HSM JSON contains placeholder strings that need to be
    resolved to live Python objects (for example, callable actions defined in the
    agent). Calling this method before joining a world ensures that the HSM can
    reference the correct targets by name.

    Args:
        wildcard_from: The placeholder string as it appears in the HSM JSON.
        wildcard_to: The Python object (typically a callable or a constant) that
            replaces the placeholder at load time.
    """
    self.behav_wildcards[wildcard_from] = wildcard_to

add_stream

add_stream(stream: Stream, owned: bool = True, net_hash: str | None = None) -> dict[str, Stream]

Add a stream to the agent's catalogue of known streams.

The stream is inserted into known_streams (indexed by net_hash) and into known_streams_by_user_hash (indexed by "peer_id:name"). If a stream with the same user hash already exists under a different net hash, a GenException is raised to prevent naming clashes. Existing entries with the same net hash and stream name are silently overwritten, which allows profile updates to propagate correctly.

When owned is True, the stream is additionally placed in the appropriate specialised dictionaries:

  • If the stream matches an entry in proc_outputs (by name and group), it goes into proc_streams / proc_streams_by_user_hash and the relevant public or private variant (_proc_streams_by_user_hash_pub / _prv).
  • Else if it matches proc_inputs, it goes into proc_in_streams / proc_in_streams_by_user_hash and the corresponding pub/prv variant.
  • Otherwise it is treated as an environmental stream and goes into env_streams / env_streams_by_user_hash and the relevant pub/prv variant.
  • In all owned cases, it is also added to owned_streams / owned_streams_by_user_hash.

If merge_flat_stream_labels is True, merge_flat_data_stream_props is called after insertion to keep the shared label superset up to date.

Parameters:

Name Type Description Default
stream Stream

The Stream object to add.

required
owned bool

If True, the stream is treated as owned by this agent and inserted into the specialised owned-stream dictionaries. Defaults to True.

True
net_hash str | None

An explicit network hash to use. If None, the hash is computed from the stream's properties and the current peer ID. Defaults to None.

None

Returns:

Type Description
dict[str, Stream]

A dictionary mapping stream names to Stream objects for the net-hash group that the stream was added to. Returns an empty dict if the stream could not be added due to a property mismatch (e.g., public/pubsub conflict within the group).

Raises:

Type Description
GenException

If a naming clash is detected: the user hash is already known under a different net hash.

Examples:

>>> from unaiverse.streams.streams import Stream
>>> stream = Stream(props=my_data_props)
>>> group = my_agent.add_stream(stream, owned=True)
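
The two-index bookkeeping and the naming-clash rule can be sketched without the library. `StreamCatalogue` and `NamingClash` are hypothetical stand-ins (the latter for GenException), streams are plain strings, and the net-hash strings are invented for the example; only the clash condition mirrors the source listing.

```python
class NamingClash(Exception):
    """Hypothetical stand-in for GenException."""


class StreamCatalogue:
    def __init__(self):
        self.known_streams = {}               # net_hash -> {stream name -> stream}
        self.known_streams_by_user_hash = {}  # "peer_id:name" -> stream

    def add(self, net_hash, peer_id, name, stream):
        user_hash = f"{peer_id}:{name}"
        # A known user hash under an unknown net hash means two streams share a name.
        if net_hash not in self.known_streams and user_hash in self.known_streams_by_user_hash:
            raise NamingClash(f"user hash {user_hash} already registered under another net hash")
        self.known_streams.setdefault(net_hash, {})[name] = stream  # overwrite if existing
        self.known_streams_by_user_hash[user_hash] = stream
        return self.known_streams[net_hash]


catalogue = StreamCatalogue()
catalogue.add("peer-1::dm:sensor", "peer-1", "sensor", "v1")
group = catalogue.add("peer-1::dm:sensor", "peer-1", "sensor", "v2")  # same net hash: overwrite

try:
    catalogue.add("peer-1::ps:sensor", "peer-1", "sensor", "v3")  # new net hash, same user hash
    clashed = False
except NamingClash:
    clashed = True
```

Re-adding under the same net hash silently replaces the entry, while reusing the name under a different net hash raises before any state is changed.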
Source code in unaiverse/agent_basics.py
def add_stream(self, stream: Stream, owned: bool = True, net_hash: str | None = None) -> dict[str, Stream]:
    """Add a stream to the agent's catalogue of known streams.

    The stream is inserted into ``known_streams`` (indexed by ``net_hash``) and into
    ``known_streams_by_user_hash`` (indexed by ``"peer_id:name"``). If a stream with
    the same user hash already exists under a different net hash, a ``GenException`` is
    raised to prevent naming clashes. Existing entries with the same net hash and stream
    name are silently overwritten, which allows profile updates to propagate correctly.

    When ``owned`` is ``True``, the stream is additionally placed in the appropriate
    specialised dictionaries:

    - If the stream matches an entry in ``proc_outputs`` (by name and group), it goes
      into ``proc_streams`` / ``proc_streams_by_user_hash`` and the relevant public or
      private variant (``_proc_streams_by_user_hash_pub`` / ``_prv``).
    - Else if it matches ``proc_inputs``, it goes into ``proc_in_streams`` /
      ``proc_in_streams_by_user_hash`` and the corresponding pub/prv variant.
    - Otherwise it is treated as an environmental stream and goes into ``env_streams`` /
      ``env_streams_by_user_hash`` and the relevant pub/prv variant.
    - In all owned cases, it is also added to ``owned_streams`` /
      ``owned_streams_by_user_hash``.

    If ``merge_flat_stream_labels`` is ``True``, ``merge_flat_data_stream_props`` is
    called after insertion to keep the shared label superset up to date.

    Args:
        stream: The ``Stream`` object to add.
        owned: If ``True``, the stream is treated as owned by this agent and inserted
            into the specialised owned-stream dictionaries. Defaults to ``True``.
        net_hash: An explicit network hash to use. If ``None``, the hash is computed
            from the stream's properties and the current peer ID. Defaults to ``None``.

    Returns:
        A dictionary mapping stream names to ``Stream`` objects for the net-hash group
        that the stream was added to. Returns an empty dict if the stream could not be
        added due to a property mismatch (e.g., public/pubsub conflict within the
        group).

    Raises:
        GenException: If a naming clash is detected: the user hash is already known
            under a different net hash.

    Examples:
        >>> from unaiverse.streams.streams import Stream
        >>> stream = Stream(props=my_data_props)
        >>> group = my_agent.add_stream(stream, owned=True)
    """

    # Stream net hash
    if net_hash is None:
        public_peer_id, private_peer_id = self.get_peer_ids()
        peer_id = public_peer_id if stream.is_public() else private_peer_id
        net_hash = stream.net_hash(peer_id)
    user_hash = stream.user_hash(stream.peer_id_from_net_hash(net_hash))

    # It is fine to overwrite an existing stream, but the net hash cannot change
    if net_hash not in self.known_streams and user_hash in self.known_streams_by_user_hash:
        raise GenException(f"Naming clash in streams. "
                           f"Net hash {net_hash} is not known (stream name: {stream.props.get_name()}), "
                           f"but user hash {user_hash} is already existing. "
                           f"Are you using the same name for two different streams? (not allowed)")

    # Adding the new stream
    if net_hash not in self.known_streams:
        self.known_streams[net_hash] = {}
    else:
        for _stream in self.known_streams[net_hash].values():
            public = _stream.get_props().is_public()
            pubsub = _stream.get_props().is_pubsub()
            if public and not stream.get_props().is_public():
                log.error(f"Cannot add a stream to a group with different properties (public): "
                          f"hash: {net_hash}, name: {stream.get_props().get_name()}, "
                          f"public: {stream.get_props().is_public()}")
                return {}
            if pubsub and not stream.get_props().is_pubsub():
                log.error(f"Cannot add a stream to a group with different properties (pubsub): "
                          f"hash: {net_hash}, name: {stream.get_props().get_name()}, "
                          f"pubsub: {stream.get_props().is_pubsub()}")
                return {}
            break
    self.known_streams[net_hash][stream.get_props().get_name()] = stream  # Overwrite if existing
    self.known_streams_by_user_hash[user_hash] = stream  # Overwrite if existing

    if owned:
        is_proc_outputs_stream = False
        is_proc_inputs_stream = False

        # Adding an 'owned' processor output stream (i.e., the stream coming from OUR OWN processor)
        if self.proc_outputs is not None:
            proc_outputs_name_and_group = set()
            for props in self.proc_outputs:
                proc_outputs_name_and_group.add((props.get_name(), props.get_group()))
            if (stream.get_props().get_name(), stream.get_props().get_group()) in proc_outputs_name_and_group:
                if net_hash not in self.proc_streams:
                    self.proc_streams[net_hash] = {}
                self.proc_streams[net_hash][stream.get_props().get_name()] = stream
                self.proc_streams_by_user_hash[user_hash] = stream
                if stream.is_public():
                    self._proc_streams_by_user_hash_pub[user_hash] = stream
                else:
                    self._proc_streams_by_user_hash_prv[user_hash] = stream
                is_proc_outputs_stream = True

        # Adding an 'owned' processor input stream (i.e., the stream entering OUR OWN processor)
        if self.proc_inputs is not None and not is_proc_outputs_stream:
            proc_inputs_name_and_group = set()
            for props in self.proc_inputs:
                proc_inputs_name_and_group.add((props.get_name(), props.get_group()))
            if (stream.get_props().get_name(), stream.get_props().get_group()) in proc_inputs_name_and_group:
                if net_hash not in self.proc_in_streams:
                    self.proc_in_streams[net_hash] = {}
                self.proc_in_streams[net_hash][stream.get_props().get_name()] = stream
                self.proc_in_streams_by_user_hash[user_hash] = stream
                if stream.is_public():
                    self._proc_in_streams_by_user_hash_pub[user_hash] = stream
                else:
                    self._proc_in_streams_by_user_hash_prv[user_hash] = stream
                is_proc_inputs_stream = True

        if net_hash not in self.owned_streams:
            self.owned_streams[net_hash] = {}
        self.owned_streams[net_hash][stream.get_props().get_name()] = stream
        self.owned_streams_by_user_hash[user_hash] = stream

        if not is_proc_outputs_stream and not is_proc_inputs_stream:
            if net_hash not in self.env_streams:
                self.env_streams[net_hash] = {}
            self.env_streams[net_hash][stream.get_props().get_name()] = stream
            self.env_streams_by_user_hash[user_hash] = stream
            if stream.is_public():
                self._env_streams_by_user_hash_pub[user_hash] = stream
            else:
                self._env_streams_by_user_hash_prv[user_hash] = stream

    # If needed, merging descriptor labels (attribute labels) and sharing them with all streams
    if self.merge_flat_stream_labels:
        self.merge_flat_data_stream_props()

    return self.known_streams[net_hash]

add_streams

add_streams(streams: list[Stream], owned: bool = True, net_hash: str | None = None) -> list[dict[str, Stream]]

Add multiple streams to the agent's catalogue in a single call.

Iterates over streams and delegates each element to add_stream. If any individual add_stream call returns an empty dictionary (indicating a failure such as a property mismatch), the method stops immediately and returns an empty list, leaving already-added streams in place.

Parameters:

Name Type Description Default
streams list[Stream]

A list of Stream objects to add.

required
owned bool

If True, each stream is treated as owned by this agent. Defaults to True.

True
net_hash str | None

An explicit network hash applied to every stream. If None, the hash is computed individually for each stream by add_stream. Defaults to None.

None

Returns:

Type Description
list[dict[str, Stream]]

A list of stream-group dictionaries, one per stream in streams, as returned by add_stream. Returns an empty list if any stream could not be added.

Raises:

Type Description
GenException

Propagated from add_stream if a naming clash is detected for any stream.
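
The stop-at-first-failure behaviour, including the fact that earlier additions are kept, can be sketched with simplified stand-ins. `add_one` and `add_many` are hypothetical helpers; the "visibility conflict" rule is a toy replacement for the real property checks.

```python
def add_one(catalogue, name, public):
    """Hypothetical single-item add: returns {} when visibility conflicts with the group."""
    if catalogue and any(p != public for p in catalogue.values()):
        return {}
    catalogue[name] = public
    return catalogue


def add_many(catalogue, items):
    results = []
    for name, public in items:
        group = add_one(catalogue, name, public)
        if len(group) == 0:
            return []  # stop at the first failure; earlier additions are NOT rolled back
        results.append(group)
    return results


catalogue = {}
results = add_many(catalogue, [("a", True), ("b", False), ("c", True)])
```

Here "a" is added, "b" fails, and "c" is never attempted. Note that an empty input list also yields an empty result, so an empty return value signals failure only when at least one stream was passed.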

Source code in unaiverse/agent_basics.py
def add_streams(self, streams: list[Stream], owned: bool = True, net_hash: str | None = None) \
        -> list[dict[str, Stream]]:
    """Add multiple streams to the agent's catalogue in a single call.

    Iterates over ``streams`` and delegates each element to ``add_stream``. If any
    individual ``add_stream`` call returns an empty dictionary (indicating a failure
    such as a property mismatch), the method stops immediately and returns an empty
    list, leaving already-added streams in place.

    Args:
        streams: A list of ``Stream`` objects to add.
        owned: If ``True``, each stream is treated as owned by this agent. Defaults to
            ``True``.
        net_hash: An explicit network hash applied to every stream. If ``None``, the
            hash is computed individually for each stream by ``add_stream``. Defaults
            to ``None``.

    Returns:
        A list of stream-group dictionaries, one per stream in ``streams``, as returned
        by ``add_stream``. Returns an empty list if any stream could not be added.

    Raises:
        GenException: Propagated from ``add_stream`` if a naming clash is detected for
            any stream.
    """

    # Adding the new stream
    ret = []
    for stream in streams:
        stream_dict = self.add_stream(stream, owned, net_hash)
        if len(stream_dict) == 0:
            return []
        ret.append(stream_dict)
    return ret

remove_streams async

remove_streams(peer_id: str, name: str | None = None, owned_too: bool = False)

Remove streams owned by a given peer from the agent's catalogue (async).

Scans known_streams for all net hashes whose embedded peer ID matches peer_id. For each matching entry, if owned_too is False, any stream whose net hash appears in owned_streams is skipped (preserving the agent's own streams). Streams that are removed are also cleaned up from owned_streams, env_streams, proc_streams, and the corresponding *_by_user_hash dictionaries; proc_in_streams is not touched by this implementation. PubSub channels for removed pubsub streams are unsubscribed via _node_conn.unsubscribe.

Parameters:

Name Type Description Default
peer_id str

The peer ID whose associated streams are to be removed.

required
name str | None

The specific stream name to remove. If None, all streams belonging to peer_id are removed. Defaults to None.

None
owned_too bool

If True, also remove streams that are owned by this agent (environmental and processor streams). Defaults to False.

False
Note

A failed PubSub unsubscription is silently ignored (the error log is commented out in the source) and does not interrupt the removal process.
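
The removal follows a collect-then-delete pattern, which avoids mutating a dictionary while iterating over it. The sketch below shows that pattern on plain dictionaries; the `"<peer_id>/<group>"` hash layout and the `peer_of` helper are hypothetical, since the real net-hash format is defined by DataProps.

```python
known_streams = {
    "peer-1/sensor": {"rgb": "s1", "depth": "s2"},
    "peer-2/sensor": {"rgb": "s3"},
}


def peer_of(net_hash):
    # Hypothetical hash layout "<peer_id>/<group>".
    return net_hash.split("/", 1)[0]


def remove_peer_streams(known, peer_id, name=None):
    # Pass 1: collect targets without mutating the dictionary being iterated.
    targets = [(h, n) for h, group in known.items() if peer_of(h) == peer_id
               for n in group if name is None or n == name]
    # Pass 2: delete, dropping groups that become empty.
    for net_hash, stream_name in targets:
        del known[net_hash][stream_name]
        if not known[net_hash]:
            del known[net_hash]
    return targets


removed_one = remove_peer_streams(known_streams, "peer-1", name="depth")
removed_rest = remove_peer_streams(known_streams, "peer-1")
```

The first call removes a single named stream and keeps the group; the second removes what is left for that peer, so the now-empty group disappears as well.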

Source code in unaiverse/agent_basics.py
async def remove_streams(self, peer_id: str, name: str | None = None, owned_too: bool = False):
    """Remove streams owned by a given peer from the agent's catalogue (async).

    Scans ``known_streams`` for all net hashes whose embedded peer ID matches
    ``peer_id``. For each matching entry, if ``owned_too`` is ``False``, any stream
    whose net hash appears in ``owned_streams`` is skipped (preserving the agent's own
    streams). Streams that are removed are also cleaned up from ``owned_streams``,
    ``env_streams``, ``proc_streams``, and the corresponding ``*_by_user_hash``
    dictionaries; ``proc_in_streams`` is not touched by this implementation. PubSub
    channels for removed pubsub streams are unsubscribed via ``_node_conn.unsubscribe``.

    Args:
        peer_id: The peer ID whose associated streams are to be removed.
        name: The specific stream name to remove. If ``None``, all streams belonging
            to ``peer_id`` are removed. Defaults to ``None``.
        owned_too: If ``True``, also remove streams that are owned by this agent
            (environmental and processor streams). Defaults to ``False``.

    Note:
        A failed PubSub unsubscription is silently ignored (the error log is commented
        out) and does not interrupt the removal process.
    """

    # Identifying what to remove
    to_remove = []
    for net_hash in self.known_streams.keys():
        if DataProps.peer_id_from_net_hash(net_hash) == peer_id:
            for _name, _stream in self.known_streams[net_hash].items():
                if name is None or name == _name:
                    to_remove.append((net_hash, _name))

    # Removing
    for (net_hash, name) in to_remove:
        if not owned_too and net_hash in self.owned_streams:
            continue

        group = DataProps.name_or_group_from_net_hash(net_hash)
        user_hash = DataProps.user_hash_from_net_hash(net_hash, name if name is not None else group)

        del self.known_streams[net_hash][name]
        if len(self.known_streams[net_hash]) == 0:
            del self.known_streams[net_hash]
        if user_hash in self.known_streams_by_user_hash:
            del self.known_streams_by_user_hash[user_hash]

        # Unsubscribing to pubsub
        if DataProps.is_pubsub_from_net_hash(net_hash):
            if peer_id != "<private_peer_id>" and peer_id != "<public_peer_id>":
                if not (await self._node_conn.unsubscribe(peer_id, channel=net_hash)):
                    # log.error(f"Failed in unsubscribing from pubsub, peer_id: {peer_id}, channel: {net_hash}")
                    pass
                else:
                    log.misc(f"Successfully unsubscribed from pubsub, peer_id: {peer_id}, channel: {net_hash}")

        # Removing all the owned streams (environment and processor streams are of course "owned")
        if net_hash in self.owned_streams:
            if name in self.owned_streams[net_hash]:
                del self.owned_streams[net_hash][name]
                if len(self.owned_streams[net_hash]) == 0:
                    del self.owned_streams[net_hash]
        if user_hash in self.owned_streams_by_user_hash:
            del self.owned_streams_by_user_hash[user_hash]
        if net_hash in self.env_streams:
            if name in self.env_streams[net_hash]:
                del self.env_streams[net_hash][name]
                if len(self.env_streams[net_hash]) == 0:
                    del self.env_streams[net_hash]
        if user_hash in self.env_streams_by_user_hash:
            del self.env_streams_by_user_hash[user_hash]
        if net_hash in self.proc_streams:
            if name in self.proc_streams[net_hash]:
                del self.proc_streams[net_hash][name]
                if len(self.proc_streams[net_hash]) == 0:
                    del self.proc_streams[net_hash]
        if user_hash in self.proc_streams_by_user_hash:
            del self.proc_streams_by_user_hash[user_hash]
        log.misc(f"Successfully removed known stream with network hash {net_hash}, stream name: {name}")

remove_all_streams

remove_all_streams(owned_too: bool = False)

Remove all streams from the agent's catalogue, keeping the owned ones unless owned_too is True.

When owned_too is False (the default), known_streams and known_streams_by_user_hash are rebuilt to contain only entries that are also present in owned_streams, effectively discarding all peer-originated streams while preserving the agent's own streams.

When owned_too is True, every stream dictionary is cleared entirely, including owned_streams, env_streams, proc_streams, proc_in_streams, and all *_by_user_hash variants.

This method does not unsubscribe from any PubSub channels. Use remove_streams with individual peer IDs for graceful PubSub teardown.

Parameters:

Name Type Description Default
owned_too bool

If True, also remove the agent's own streams (environmental and processor streams). Defaults to False.

False
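
The documented intent for owned_too=False, rebuilding the "known" indexes so that only owned entries survive, can be sketched on plain dictionaries. All keys and values below are hypothetical placeholders, and `drop_non_owned` is an illustrative helper that assumes both indexes are rebuilt from their owned counterparts.

```python
# Hypothetical catalogue contents; keys are placeholders, not real net hashes.
known_streams = {"me/out": {"y": "own"}, "peer-1/out": {"y": "theirs"}}
owned_streams = {"me/out": {"y": "own"}}
known_by_user_hash = {"me:y": "own", "peer-1:y": "theirs"}
owned_by_user_hash = {"me:y": "own"}


def drop_non_owned(owned, owned_by_user):
    """Return fresh 'known' indexes that contain only the owned entries."""
    # Shallow copies: the outer dicts are new, the inner groups are shared with the owned index.
    return dict(owned), dict(owned_by_user)


known_streams, known_by_user_hash = drop_non_owned(owned_streams, owned_by_user_hash)
```

Rebuilding both indexes from the owned side keeps the net-hash view and the user-hash view consistent with each other.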
Source code in unaiverse/agent_basics.py
def remove_all_streams(self, owned_too: bool = False):
    """Remove all streams from the agent's catalogue, keeping the owned ones unless ``owned_too`` is ``True``.

    When ``owned_too`` is ``False`` (the default), ``known_streams`` and
    ``known_streams_by_user_hash`` are rebuilt to contain only entries that are also
    present in ``owned_streams``, effectively discarding all peer-originated streams
    while preserving the agent's own streams.

    When ``owned_too`` is ``True``, every stream dictionary is cleared entirely,
    including ``owned_streams``, ``env_streams``, ``proc_streams``,
    ``proc_in_streams``, and all ``*_by_user_hash`` variants.

    This method does not unsubscribe from any PubSub channels. Use ``remove_streams``
    with individual peer IDs for graceful PubSub teardown.

    Args:
        owned_too: If ``True``, also remove the agent's own streams (environmental and
            processor streams). Defaults to ``False``.
    """
    if not owned_too:
        self.known_streams = {k: v for k, v in self.owned_streams.items()}
        self.known_streams_by_user_hash = {k: v for k, v in self.owned_streams_by_user_hash.items()}
    else:
        self.known_streams = {}
        self.owned_streams = {}
        self.env_streams = {}
        self.proc_streams = {}
        self.proc_in_streams = {}
        self.known_streams_by_user_hash = {}
        self.owned_streams_by_user_hash = {}
        self.env_streams_by_user_hash = {}
        self.proc_streams_by_user_hash = {}
        self.proc_in_streams_by_user_hash = {}
    log.misc("Successfully removed all streams!")

get_stream

get_stream(name_or_group: str, peer_id: str | None = None, data_type: str | None = None) -> Stream | None

Return a single stream identified by name or group, optionally filtered by data type.

The lookup first tries the known_streams_by_user_hash dictionary using a user hash built from peer_id and name_or_group. If that misses, it falls back to get_streams which searches by group name. When multiple streams are found in the group, a data_type filter may be used to disambiguate; if no filter is provided and more than one stream exists in the group, None is returned (ambiguous case).

Parameters:

Name Type Description Default
name_or_group str

The stream name or group identifier to look up.

required
peer_id str | None

The peer ID of the stream owner. If None, the agent's own current peer ID (from get_peer_id) is used. Defaults to None.

None
data_type str | None

An optional data type string (e.g., "image", "tensor") used to select a specific stream when the group contains multiple streams. Defaults to None.

None

Returns:

Type Description
Stream | None

The matching Stream object, or None if the stream is not found, if multiple streams match and data_type is not specified, or if no stream matches the given data_type.

Examples:

>>> stream = my_agent.get_stream("processor")
>>> stream = my_agent.get_stream("sensor", peer_id=peer_id, data_type="image")
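
The lookup order (exact name first, then group with optional data-type disambiguation) can be sketched with stand-in types. `FakeStream`, `pick_stream`, and the two index dictionaries are hypothetical; they only mirror the decision logic described above.

```python
from dataclasses import dataclass


@dataclass
class FakeStream:  # hypothetical stand-in for Stream
    name: str
    data_type: str


def pick_stream(by_user_hash, by_group, peer_id, name_or_group, data_type=None):
    user_hash = f"{peer_id}:{name_or_group}"
    if user_hash in by_user_hash:  # 1) an exact name match wins; data_type is not consulted
        return by_user_hash[user_hash]
    streams = by_group.get((peer_id, name_or_group), [])  # 2) fall back to the group
    if data_type is not None:
        return next((s for s in streams if s.data_type == data_type), None)
    return streams[0] if len(streams) == 1 else None  # several candidates: ambiguous


rgb, depth = FakeStream("rgb", "image"), FakeStream("depth", "tensor")
by_user_hash = {"peer-1:rgb": rgb, "peer-1:depth": depth}
by_group = {("peer-1", "sensor"): [rgb, depth]}

by_name = pick_stream(by_user_hash, by_group, "peer-1", "rgb")
ambiguous = pick_stream(by_user_hash, by_group, "peer-1", "sensor")
by_type = pick_stream(by_user_hash, by_group, "peer-1", "sensor", data_type="tensor")
```

Asking for a group that holds two streams without a data type gives None rather than an arbitrary pick, so callers should pass data_type whenever a group may contain more than one stream.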
Source code in unaiverse/agent_basics.py
def get_stream(self, name_or_group: str, peer_id: str | None = None, data_type: str | None = None) -> Stream | None:
    """Return a single stream identified by name or group, optionally filtered by data type.

    The lookup first tries the ``known_streams_by_user_hash`` dictionary using a
    user hash built from ``peer_id`` and ``name_or_group``. If that misses, it falls
    back to ``get_streams`` which searches by group name. When multiple streams are
    found in the group, a ``data_type`` filter may be used to disambiguate; if no
    filter is provided and more than one stream exists in the group, ``None`` is
    returned (ambiguous case).

    Args:
        name_or_group: The stream name or group identifier to look up.
        peer_id: The peer ID of the stream owner. If ``None``, the agent's own current
            peer ID (from ``get_peer_id``) is used. Defaults to ``None``.
        data_type: An optional data type string (e.g., ``"image"``, ``"tensor"``) used
            to select a specific stream when the group contains multiple streams.
            Defaults to ``None``.

    Returns:
        The matching ``Stream`` object, or ``None`` if the stream is not found, if
        multiple streams match and ``data_type`` is not specified, or if no stream
        matches the given ``data_type``.

    Examples:
        >>> stream = my_agent.get_stream("processor")
        >>> stream = my_agent.get_stream("sensor", peer_id=peer_id, data_type="image")
    """
    if peer_id is None:
        peer_id = self.get_peer_id()
    user_hash = DataProps.build_user_hash(peer_id, name_or_group)
    if user_hash in self.known_streams_by_user_hash:
        return self.known_streams_by_user_hash[user_hash]
    else:
        streams = self.get_streams(group_name=name_or_group, peer_id=peer_id)
        if streams is not None and len(streams) > 0:
            if data_type is not None:
                for stream in streams:
                    if stream.props.data_type == data_type:
                        return stream
                return None
            elif len(streams) == 1:
                return streams[0]
            else:
                return None  # Ambiguous case
        else:
            return None

get_streams

get_streams(group_name: str, peer_id: str | None = None) -> list[Stream] | None

Return all known streams in the given group owned by a peer.

The pubsub net hash built from peer_id and group_name is looked up in known_streams first; if it is not present, the non-pubsub net hash is tried. Returns None if neither is known. If peer_id is None, the agent's own peer ID (from get_peer_id) is used.

Source code in unaiverse/agent_basics.py
def get_streams(self, group_name: str, peer_id: str | None = None) -> list[Stream] | None:
    """Return all known streams in a group owned by a peer, or ``None`` if the group is unknown."""

    # Defaulting to this agent's own peer ID
    if peer_id is None:
        peer_id = self.get_peer_id()

    # Trying the pubsub net hash first, then falling back to the non-pubsub one
    net_hash_ps = DataProps.build_net_hash(peer_id, pubsub=True, name_or_group=group_name)
    if net_hash_ps in self.known_streams:
        return list(self.known_streams[net_hash_ps].values())
    else:
        net_hash_dm = DataProps.build_net_hash(peer_id, pubsub=False, name_or_group=group_name)
        if net_hash_dm in self.known_streams:
            return list(self.known_streams[net_hash_dm].values())
        else:
            return None

find_streams

find_streams(peer_id: str, name_or_group: str | None = None, discard_owned: bool = False) -> dict[str, dict[str, Stream]]

Find streams associated with a given peer ID and optionally by name or group.

Parameters:

Name Type Description Default
peer_id str

The peer ID of the (owner of the) streams to find.

required
name_or_group str | None

Optional name or group of the streams to find.

None
discard_owned bool

If True, the owned streams are not searched (default False).

False

Returns:

Type Description
dict[str, dict[str, Stream]]

A dictionary where keys are network hashes and values are dictionaries of streams (stream name to Stream object) matching the criteria.
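
The matching rules (whole group when the group name matches or no filter is given, single stream when only a stream name matches) can be sketched on plain dictionaries. The `find` helper and the `"<peer_id>/<group>"` hash layout are hypothetical; the real parsing is done by the Stream helpers.

```python
def find(known, peer_id, name_or_group=None):
    found = {}
    for net_hash, group in known.items():
        owner, group_name = net_hash.split("/", 1)  # hypothetical "<peer_id>/<group>" layout
        if owner != peer_id:
            continue
        if name_or_group is None or name_or_group == group_name:
            found[net_hash] = group  # the whole group matches
        elif name_or_group in group:
            # Only one stream in the group matches by name.
            found[net_hash] = {name_or_group: group[name_or_group]}
    return found


known = {
    "peer-1/sensor": {"rgb": "s1", "depth": "s2"},
    "peer-1/out": {"y": "s3"},
    "peer-2/out": {"y": "s4"},
}
everything = find(known, "peer-1")
by_group = find(known, "peer-1", "sensor")
by_name = find(known, "peer-1", "depth")
```

The result is always keyed by net hash, so a caller can tell which group a name-level match came from.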

Source code in unaiverse/agent_basics.py
def find_streams(self, peer_id: str, name_or_group: str | None = None, discard_owned: bool = False) \
        -> dict[str, dict[str, Stream]]:
    """Find streams associated with a given peer ID and optionally by name or group.

    Args:
        peer_id: The peer ID of the (owner of the) streams to find.
        name_or_group: Optional name or group of the streams to find.
        discard_owned: If True, the owned streams are not searched (default False).

    Returns:
        A dictionary where keys are network hashes and values are dictionaries of streams
        (stream name to Stream object) matching the criteria.
    """
    ret = {}
    for net_hash, streams_dict in self.known_streams.items():
        if discard_owned and net_hash in self.owned_streams:
            continue
        _peer_id = Stream.peer_id_from_net_hash(net_hash)
        _name_or_group = Stream.name_or_group_from_net_hash(net_hash)
        if peer_id == _peer_id:
            if name_or_group is None or name_or_group == _name_or_group:
                ret[net_hash] = streams_dict
            else:
                for _name, _stream in streams_dict.items():
                    if name_or_group == _name:
                        if net_hash not in ret:
                            ret[net_hash] = {}
                        ret[net_hash][name_or_group] = _stream
    return ret
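
The matching rule in find_streams (a whole group matches by group name, otherwise a single stream may match by its own name) can be sketched as follows. The `split_net_hash` parser is hypothetical and assumes a `<peer_id>::<ps|dm>:<name_or_group>` layout; the real code relies on `Stream.peer_id_from_net_hash` and `Stream.name_or_group_from_net_hash`. The `discard_owned` option is left out for brevity.

```python
def split_net_hash(net_hash: str) -> tuple[str, str]:
    # Hypothetical parser: assumes "<peer_id>::<ps|dm>:<name_or_group>"
    peer_id, rest = net_hash.split("::", 1)
    return peer_id, rest.split(":", 1)[1]


def find_streams(known_streams: dict, peer_id: str, name_or_group=None) -> dict:
    ret = {}
    for net_hash, streams in known_streams.items():
        owner, group = split_net_hash(net_hash)
        if owner != peer_id:
            continue
        if name_or_group is None or name_or_group == group:
            ret[net_hash] = streams  # The whole group matches
        elif name_or_group in streams:
            # Only the stream with that exact name matches
            ret[net_hash] = {name_or_group: streams[name_or_group]}
    return ret


known = {
    "p1::ps:processor": {"img": "S1", "txt": "S2"},
    "p1::ps:sensors": {"temp": "S3"},
    "p2::ps:processor": {"img": "S4"},
}
by_group = find_streams(known, "p1", "processor")
by_name = find_streams(known, "p1", "temp")
everything = find_streams(known, "p1")
```

A group-name query returns every stream in the group, a stream-name query returns only that stream, and omitting `name_or_group` returns all groups of the peer.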

get_last_streamed_data

get_last_streamed_data(agent_name: str)

Get the last data samples from the processor streams of a given agent.

Parameters:

Name Type Description Default
agent_name str

The name of the agent.

required

Returns:

Type Description

A list of data samples taken from all the known streams associated to the provided agent.

Source code in unaiverse/agent_basics.py
def get_last_streamed_data(self, agent_name: str):
    """Get the last data samples from the processor streams of a given agent.

    Args:
        agent_name: The name of the agent.

    Returns:
        A list of data samples taken from all the known streams associated to the provided agent.
    """
    data_list = []
    for peer_id, profile in self.all_agents.items():
        if profile.get_static_profile()['node_name'] == agent_name:
            net_hash_to_stream_dict = self.find_streams(peer_id, name_or_group="processor")
            for net_hash, streams_dict in net_hash_to_stream_dict.items():
                for stream_name, stream_obj in streams_dict.items():
                    data_list.append(stream_obj.get())
    return data_list

merge_flat_data_stream_props

merge_flat_data_stream_props()

Merge the labels of the descriptor components, across all streams, sharing them.

Source code in unaiverse/agent_basics.py
def merge_flat_data_stream_props(self):
    """Merge the labels of the descriptor components, across all streams, sharing them."""

    # Set of pivot labels
    superset_labels = []

    # Checking the whole list of streams, but considering only the ones with generic data, flat, and labels
    considered_streams = []

    for stream_dict in self.owned_streams.values():
        for stream in stream_dict.values():

            # Skipping not flat, or not generic, or unlabeled streams
            if not stream.props.is_flat_tensor_with_labels():
                continue

            # Saving list of considered streams
            considered_streams.append(stream)

            # Adding the current stream-labels to the pivot labels
            for label in stream.props.tensor_labels:
                if label not in superset_labels:
                    superset_labels.append(label)

    # Telling each stream in which positions their labels fall, given the pivot labels
    for stream in considered_streams:

        # In the case of BufferedStream, we have to update the data buffer by clearing previously applied
        # adaptation first (I know it looks similar to what is done below, but we must clear first!)
        if isinstance(stream, BufferedStream):
            assert stream.props is not None
            for uuid in stream.data_buffer_by_uuid.keys():
                for data in stream.data_buffer_by_uuid[uuid]:
                    data.data = stream.props.clear_label_adaptation(data.data)

        # Updating labels
        assert stream.props is not None and isinstance(stream.props.tensor_labels, TensorLabels)
        stream.props.tensor_labels.interleave_with(superset_labels)

        # In the case of BufferedStream, we have to update the data buffer with the new labels
        if isinstance(stream, BufferedStream):
            assert stream.props is not None

            for uuid in stream.data_buffer_by_uuid.keys():
                for data in stream.data_buffer_by_uuid[uuid]:
                    data.data = stream.props.adapt_tensor_to_tensor_labels(data.data)

user_stream_hash_to_net_hash

user_stream_hash_to_net_hash(user_stream_hash: str) -> str | None

Converts a user-defined stream hash (peer_id:name_or_group) to a network hash (peer_id::dm:... or peer_id::ps:name_or_group) by searching the known hashes in the known streams.

Parameters:

Name Type Description Default
user_stream_hash str

The user-defined stream hash string (peer_id:name).

required

Returns:

Type Description
str | None

The corresponding network hash string (peer_id::dm:... or peer_id::ps:name_or_group), or None if not found.

Source code in unaiverse/agent_basics.py
def user_stream_hash_to_net_hash(self, user_stream_hash: str) -> str | None:
    """Converts a user-defined stream hash (peer_id:name_or_group) to a network hash
    (peer_id::dm:... or peer_id::ps:name_or_group) by searching the known hashes in the known streams.

    Args:
        user_stream_hash: The user-defined stream hash string (peer_id:name).

    Returns:
        The corresponding network hash string (peer_id::dm:... or peer_id::ps:name_or_group), or None if not found.
    """
    if user_stream_hash is None:
        return None
    if "::" in user_stream_hash:
        return user_stream_hash  # It was already fine
    components = user_stream_hash.split(":")
    peer_id = components[0]
    name = components[-1]
    for net_hash, stream_dict in self.known_streams.items():
        _peer_id = Stream.peer_id_from_net_hash(net_hash)
        _name_or_group = Stream.name_or_group_from_net_hash(net_hash)
        if _peer_id == peer_id and _name_or_group == name:
            return net_hash
        for _name in stream_dict.keys():
            if _peer_id == peer_id and _name == name:
                return net_hash
    return None
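
The conversion logic can be exercised on a toy registry as sketched below. The `split_net_hash` parser is hypothetical (it assumes a `<peer_id>::<ps|dm>:<name_or_group>` layout) and stands in for the `Stream.*_from_net_hash` helpers used by the real method.

```python
def split_net_hash(net_hash: str) -> tuple[str, str]:
    # Hypothetical parser: assumes "<peer_id>::<ps|dm>:<name_or_group>"
    peer_id, rest = net_hash.split("::", 1)
    return peer_id, rest.split(":", 1)[1]


def user_hash_to_net_hash(known_streams: dict, user_hash):
    if user_hash is None:
        return None
    if "::" in user_hash:
        return user_hash  # Already a network hash
    parts = user_hash.split(":")
    peer_id, name = parts[0], parts[-1]
    for net_hash, streams in known_streams.items():
        owner, group = split_net_hash(net_hash)
        # Match either the group name or the name of a stream in the group
        if owner == peer_id and (group == name or name in streams):
            return net_hash
    return None


known = {"p1::ps:processor": {"img": "S1"}}
from_group = user_hash_to_net_hash(known, "p1:processor")
from_stream = user_hash_to_net_hash(known, "p1:img")
unchanged = user_hash_to_net_hash(known, "p1::ps:processor")
unknown = user_hash_to_net_hash(known, "p2:img")
```

Both the group name and the stream name resolve to the same network hash, an input that already contains `::` is returned untouched, and an unknown peer yields `None`.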

create_proc_input_streams

create_proc_input_streams(buffered: bool = False)

Creates the processor input streams based on the proc_inputs defined for the agent.

Parameters:

Name Type Description Default
buffered bool

If True, the created streams will be of type BufferedStream.

False
Source code in unaiverse/agent_basics.py
def create_proc_input_streams(self, buffered: bool = False):
    """Creates the processor input streams based on the `proc_inputs` defined for the agent.

    Args:
        buffered: If True, the created streams will be of type BufferedStream.
    """

    # Adding input streams (grouped together), passing the node clock
    if self.proc_inputs is not None:
        for i, procs in enumerate(self.proc_inputs):
            procs.set_group("processor_in")  # Adding default group info, forced, do not change this!

            # Creating the streams
            for props in procs.props:
                if not buffered:
                    stream = Stream(props=props.clone())
                else:
                    stream = BufferedStream(props=props.clone())

                self.add_stream(stream, owned=True)

                public_peer_id, private_peer_id = self.get_peer_ids()
                peer_id = public_peer_id if stream.is_public() else private_peer_id
                net_hash = stream.net_hash(peer_id)
                if stream.is_public():
                    self.proc_in_net_hash['public'] = net_hash
                else:
                    self.proc_in_net_hash['private'] = net_hash

                # forcing the input stream to be compatible with proc inputs
                self.compat_in_streams[i].add((net_hash, props.get_name()))

create_proc_output_streams

create_proc_output_streams(buffered: bool = False)

Creates the processor output streams based on the proc_outputs defined for the agent.

Parameters:

Name Type Description Default
buffered bool

If True, the created streams will be of type BufferedStream.

False
Source code in unaiverse/agent_basics.py
def create_proc_output_streams(self, buffered: bool = False):
    """Creates the processor output streams based on the `proc_outputs` defined for the agent.

    Args:
        buffered: If True, the created streams will be of type BufferedStream.
    """

    # Adding generated streams (grouped together), passing the node clock
    if self.proc_outputs is not None:
        for i, procs in enumerate(self.proc_outputs):
            procs.set_group("processor")  # Adding default group info, forced, do not change this!

            # Creating the streams
            for props in procs.props:
                if not buffered:
                    stream = Stream(props=props.clone())
                else:
                    stream = BufferedStream(props=props.clone())

                self.add_stream(stream, owned=True)

                public_peer_id, private_peer_id = self.get_peer_ids()
                peer_id = public_peer_id if stream.is_public() else private_peer_id
                net_hash = stream.net_hash(peer_id)
                if stream.is_public():
                    self.proc_net_hash['public'] = net_hash
                else:
                    self.proc_net_hash['private'] = net_hash

add_compatible_streams async

add_compatible_streams(peer_id: str, streams_in_profile: list[dict], buffered: bool = False, add_all: bool = False, public: bool = True, skip_pubsub: bool = False) -> bool

Add to the list of processor-compatible-streams those streams provided as arguments that are actually found to be compatible with the processor (if they are pubsub, it also subscribes to them) (async).

Parameters:

Name Type Description Default
peer_id str

The peer ID of the agent providing the streams.

required
streams_in_profile list[dict]

A list of dictionaries (DataProps to dict) representing the streams in peer's profile.

required
buffered bool

If True, the added streams will be of type BufferedStream.

False
add_all bool

If True, all streams from the profile are added, regardless of processor compatibility.

False
public bool

If True, consider public streams only; if False, consider private streams only.

True
skip_pubsub bool

Skip pubsub streams.

False

Returns:

Type Description
bool

True if compatible streams were successfully added and subscribed to, False otherwise.

Source code in unaiverse/agent_basics.py
async def add_compatible_streams(self, peer_id: str,
                                 streams_in_profile: list[dict], buffered: bool = False,
                                 add_all: bool = False, public: bool = True, skip_pubsub: bool = False) -> bool:
    """Add to the list of processor-compatible-streams those streams provided as arguments that are actually
    found to be compatible with the processor (if they are pubsub, it also subscribes to them) (async).

    Args:
        peer_id: The peer ID of the agent providing the streams.
        streams_in_profile: A list of dictionaries (DataProps to dict) representing the streams in peer's profile.
        buffered: If True, the added streams will be of type BufferedStream.
        add_all: If True, all streams from the profile are added, regardless of processor compatibility.
        public: If True, consider public streams only; if False, consider private streams only.
        skip_pubsub: Skip pubsub streams.

    Returns:
        True if compatible streams were successfully added and subscribed to, False otherwise.
    """
    added_streams = []

    if add_all:

        # This is the case in which we add all streams, storing all pairs (DataProps, net_hash)
        for j in streams_in_profile:
            jj = DataProps.from_dict(j)
            if (public == jj.is_public()
                    and (not jj.is_pubsub() or not skip_pubsub)):
                net_hash = jj.net_hash(peer_id)
                added_streams.append((jj, net_hash))
    else:

        # This is the case in which a processor is present, hence storing pairs (DataProps, net_hash)
        # of the found compatible streams
        added_net_hash_to_prop_name = {}

        # Find streams that are compatible with our 'proc_inputs'
        assert self.proc_inputs is not None
        for i, in_proc in enumerate(self.proc_inputs):
            for j in streams_in_profile:
                jj = DataProps.from_dict(j)
                if (public == jj.is_public() and in_proc.is_compatible(jj)
                        and (not jj.is_pubsub() or not skip_pubsub)):
                    net_hash = jj.net_hash(peer_id)

                    if net_hash not in added_net_hash_to_prop_name:
                        added_net_hash_to_prop_name[net_hash] = set()
                    if jj.name not in added_net_hash_to_prop_name[net_hash]:
                        added_net_hash_to_prop_name[net_hash].add(jj.name)
                        added_streams.append((jj, net_hash))

                    # Saving the position in the proc_input list
                    self.compat_in_streams[i].add((net_hash, jj.get_name()))

        # Find streams that are compatible with our 'proc_outputs'
        has_cross_entropy = []
        assert self.proc_outputs is not None
        if 'losses' in self.proc_opts:
            for i in range(0, len(self.proc_outputs)):
                if self.proc_opts['losses'][i] is not None and \
                        (self.proc_opts['losses'][i] == torch.nn.functional.cross_entropy or
                         isinstance(self.proc_opts['losses'][i], torch.nn.CrossEntropyLoss) or
                         "cross_entropy" in self.proc_opts['losses'][i].__name__):
                    has_cross_entropy.append(True)
                else:
                    has_cross_entropy.append(False)

        for i, out_proc in enumerate(self.proc_outputs):
            out_proc: StreamType
            for j in streams_in_profile:
                jj = DataProps.from_dict(j)
                if ((public == jj.is_public() and
                        (out_proc.is_compatible(jj) or (jj.is_tensor_target_id() and has_cross_entropy[i])))
                        and (not jj.is_pubsub() or not skip_pubsub)):
                    net_hash = jj.net_hash(peer_id)

                    if net_hash not in added_net_hash_to_prop_name:
                        added_net_hash_to_prop_name[net_hash] = set()
                    if jj.name not in added_net_hash_to_prop_name[net_hash]:
                        added_net_hash_to_prop_name[net_hash].add(jj.name)
                        added_streams.append((jj, net_hash))

                    # Saving the position in the proc_output list
                    self.compat_out_streams[i].add((net_hash, jj.get_name()))

    net_hashes_to_subscribe = set()

    # For each compatible stream found...
    for (props, net_hash) in added_streams:

        # Check if it is a new stream or a data stream to add to an already known stream
        already_known_stream = net_hash in self.known_streams

        # Creating the stream object
        if not buffered:
            stream = Stream(props=props.clone())
        else:
            stream = BufferedStream(props=props.clone())

        # Add the data stream to the list of known streams
        # if the stream already exists it will be overwritten (which is fine in case of changes)
        self.add_stream(stream, owned=False, net_hash=net_hash)

        # If the stream is over PubSub, and we are not already subscribed, we will subscribe
        if props.is_pubsub() and not already_known_stream:
            net_hashes_to_subscribe.add(net_hash)

    # Opening PubSubs
    for net_hash in net_hashes_to_subscribe:
        log.misc(f"Opening channel for the not-owned but processor-compatible stream {net_hash}")
        if not (await self._node_conn.subscribe(peer_id, channel=net_hash)):
            log.error(f"Error subscribing to {net_hash}")
            return False

    return True
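
Every branch of this method applies the same visibility filter before any compatibility check: the stream's public flag must equal the `public` argument, and pubsub streams are dropped when `skip_pubsub` is set. A small sketch of that predicate follows; the `Props` dataclass is a hypothetical stand-in for `DataProps`, reduced to the two flags involved.

```python
from dataclasses import dataclass


@dataclass
class Props:
    # Hypothetical stand-in for DataProps, reduced to the relevant flags
    name: str
    public: bool
    pubsub: bool


def passes_filter(props: Props, public: bool = True, skip_pubsub: bool = False) -> bool:
    # Same boolean structure as the condition used in the method above
    return public == props.public and (not props.pubsub or not skip_pubsub)


profile = [
    Props("a", public=True, pubsub=True),
    Props("b", public=True, pubsub=False),
    Props("c", public=False, pubsub=True),
]
public_all = [p.name for p in profile if passes_filter(p)]
public_no_pubsub = [p.name for p in profile if passes_filter(p, skip_pubsub=True)]
private_only = [p.name for p in profile if passes_filter(p, public=False)]
```

Note that `public=False` selects private streams only; it does not mean "both kinds".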

subscribe_to_pubsub_owned_streams async

subscribe_to_pubsub_owned_streams() -> bool

Subscribes to all owned streams that are marked as PubSub (async).

Returns:

Type Description
bool

True if all subscriptions were successful, False otherwise.

Source code in unaiverse/agent_basics.py
async def subscribe_to_pubsub_owned_streams(self) -> bool:
    """Subscribes to all owned streams that are marked as PubSub (async).

    Returns:
        True if all subscriptions were successful, False otherwise.
    """

    # Opening channels for all the (groups of) owned streams (generated and not)
    for net_hash in self.owned_streams.keys():
        is_pubsub = Stream.is_pubsub_from_net_hash(net_hash)

        if is_pubsub:
            log.misc(f"Opening channel for the owned stream {net_hash}")
            peer_id = Stream.peer_id_from_net_hash(net_hash)  # Guessing peer ID from the net hash

            if not (await self._node_conn.subscribe(peer_id, channel=net_hash)):
                log.error(f"Cannot open a channel for owned stream hash {net_hash}")
                return False
    return True

update_streams_in_profile

update_streams_in_profile()

Updates the agent's profile with information about its owned (environmental and processor) streams.

Source code in unaiverse/agent_basics.py
def update_streams_in_profile(self):
    """Updates the agent's profile with information about its owned (environmental and processor) streams."""

    # Filling the information about the streams that can be generated and handled
    dynamic_profile = self._node_profile.get_dynamic_profile()
    if (hasattr(self, 'proc_outputs') and hasattr(self, 'proc_inputs') and
            self.proc_outputs is not None and self.proc_inputs is not None):
        dynamic_profile['proc_outputs'] = \
            [dct for d in self.proc_outputs for dct in d.to_list_of_dicts()]  # List of dict of DataProp
        dynamic_profile['proc_inputs'] = \
            [dct for d in self.proc_inputs for dct in d.to_list_of_dicts()]  # List of dict of DataProp

    # Adding the list of locally-created ("environmental") streams to the profile
    list_of_props = []
    public_peer_id, private_peer_id = self.get_peer_ids()
    for net_hash, streams_dict in self.owned_streams.items():
        if net_hash not in self.proc_streams.keys() and net_hash not in self.proc_in_streams.keys():
            if (DataProps.peer_id_from_net_hash(net_hash) == public_peer_id or
                    DataProps.peer_id_from_net_hash(net_hash) == private_peer_id):
                for stream in streams_dict.values():
                    list_of_props.append(stream.get_props().to_dict())  # DataProp
    if len(list_of_props) > 0:
        dynamic_profile['streams'] = list_of_props

send_profile_to_all async

send_profile_to_all()

Sends the agent's profile to all known agents (async).

Source code in unaiverse/agent_basics.py
async def send_profile_to_all(self):
    """Sends the agent's profile to all known agents (async)."""

    agents = list(self.all_agents.keys())
    for peer_id in agents:
        log.misc(f"Sending profile to {peer_id}")
        if not (await self._node_conn.send(peer_id, channel_trail=None,
                                           content=self._node_profile.get_all_profile(),
                                           content_type=Msg.PROFILE)):
            log.error("Failed to send profile, removing (disconnecting) " + peer_id)
            await self.remove_agent(peer_id)

behave async

behave()

Behave in the current environment, calling the state-machines of the public and private networks (async).

Source code in unaiverse/agent_basics.py
async def behave(self):
    """Behave in the current environment, calling the state-machines of the public and private networks (async)."""

    if self.in_world():
        log.set_sub("prv")
        log.misc("Behaving (world)...")
        if self.behav is None:
            log.error("No behaviour specified")
        else:
            if self.behav_lone_wolf is not None:
                self.behav_lone_wolf.enable(False)
            self.behav.enable(True)
            self.stdin.bind(self._proc_in_streams_by_user_hash_prv, uuid=Custom.SYSTEM_INTERACTION_UUID)
            self.stdout.bind(self._proc_streams_by_user_hash_prv, uuid=Custom.SYSTEM_INTERACTION_UUID)
            self.stdext.bind(self._env_streams_by_user_hash_prv, uuid=Custom.SYSTEM_INTERACTION_UUID)
            await self.on_tick()
            await self.behav.act()
            self.behav.enable(False)

    log.set_sub("pub")
    log.misc("Behaving (public)...")
    if self.behav_lone_wolf is None:
        log.error("No behaviour specified")
    else:
        if self.behav is not None:
            self.behav.enable(False)
        self.behav_lone_wolf.enable(True)
        self.stdin.bind(self._proc_in_streams_by_user_hash_pub, uuid=Custom.SYSTEM_INTERACTION_UUID)
        self.stdout.bind(self._proc_streams_by_user_hash_pub, uuid=Custom.SYSTEM_INTERACTION_UUID)
        self.stdext.bind(self._env_streams_by_user_hash_pub, uuid=Custom.SYSTEM_INTERACTION_UUID)
        await self.behav_lone_wolf.act()
        self.behav_lone_wolf.enable(False)

    log.set_sub("gen")

    # Clear expired interactions at every clock cycle and notify originators
    await self.im.complete_expired()

    # Also drain recently completed interactions for status notification
    completed = self.im.drain_completed()

    # Clear stream-expired data
    self.im.clear_expired_stream_data()

    for interaction in completed:
        if (interaction.requester is not None and not interaction.volatile and
                interaction.requester in self.all_agents):
            log.inter(f"Communicating completion of interaction {interaction.to_code_str(True)}")
            my_public_peer_id, my_private_peer_id = self.get_peer_ids()
            if interaction.requester in self.public_agents:
                my_peer_id: str = my_public_peer_id
            else:
                my_peer_id: str = my_private_peer_id
            ret = await self._node_conn.send(interaction.requester, channel_trail=None,
                                             content=interaction.to_status_dict(status_generated_by=my_peer_id),
                                             content_type=Msg.INTERACTION_STATUS)
            if not ret:
                log.error(
                    f"Failed to send interaction status to peer {interaction.requester}, disconnecting it")

                await self._node_purge_fcn(interaction.requester)

learn_behave

learn_behave(state: int, last_action: int, prev_state: int)

A placeholder method for behavioral learning, intended to be implemented by child classes. It receives state and action information to update a behavioral model.

Parameters:

Name Type Description Default
state int

The current state of the agent.

required
last_action int

The last action taken.

required
prev_state int

The previous state of the agent.

required

Returns:

Type Description

None in the base implementation; subclasses are expected to return state-related feedback.

Source code in unaiverse/agent_basics.py
def learn_behave(self, state: int, last_action: int, prev_state: int):
    """A placeholder method for behavioral learning, intended to be implemented by child classes.
    It receives state and action information to update a behavioral model.

    Args:
        state: The current state of the agent.
        last_action: The last action taken.
        prev_state: The previous state of the agent.

    Returns:
        None in the base implementation; subclasses are expected to return state-related feedback.
    """
    pass

on_tick async

on_tick()
Source code in unaiverse/agent_basics.py
async def on_tick(self):
    pass
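
In the behave listing on this page, on_tick is awaited in the world branch just before the world state machine acts, so it works as a per-cycle hook for subclasses. The sketch below shows the override pattern with a hypothetical `BaseAgent` stand-in; it does not import or reproduce the real AgentBasics class.

```python
import asyncio


class BaseAgent:
    # Hypothetical stand-in for AgentBasics, reduced to the hook mechanism
    async def on_tick(self):
        pass  # No-op by default, as in the base class

    async def behave(self):
        await self.on_tick()  # Hook runs once per behave cycle


class CountingAgent(BaseAgent):
    def __init__(self):
        self.ticks = 0

    async def on_tick(self):
        self.ticks += 1  # Custom per-cycle work goes here


agent = CountingAgent()


async def run_cycles(n: int):
    for _ in range(n):
        await agent.behave()


asyncio.run(run_cycles(3))
```

After three cycles `agent.ticks` equals 3.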

get_peer_id

get_peer_id() -> str

Return the current peer ID of the agent, depending on whether it is behaving in a world or not.

Returns:

Type Description
str

The private peer ID if the agent is behaving in a world, the public peer ID otherwise.

Source code in unaiverse/agent_basics.py
def get_peer_id(self) -> str:
    """Return the current peer ID of the agent, depending on whether it is behaving in a world or not.

    Returns:
        The private peer ID if the agent is behaving in a world, the public peer ID otherwise.
    """
    public_peer_id, private_peer_id = self.get_peer_ids()
    if self.behaving_in_world():
        return private_peer_id
    else:
        return public_peer_id

get_peer_ids

get_peer_ids() -> tuple[str, str]

Retrieve the public and private peer IDs of the agent, from the underlying node's dynamic profile.

Returns:

Type Description
tuple[str, str]

A tuple containing the public peer ID and the private peer ID. If either ID is not available, the corresponding placeholder string (<public_peer_id> or <private_peer_id>) is returned.

Source code in unaiverse/agent_basics.py
def get_peer_ids(self) -> tuple[str, str]:
    """Retrieve the public and private peer IDs of the agent, from the underlying node's dynamic profile.

    Returns:
        A tuple containing the public peer ID and the private peer ID.
        If either ID is not available, a placeholder string is returned <public_peer_id>, <private_peer_id>.
    """
    public_peer_id = None
    private_peer_id = None
    if self._node_profile is not None:
        dynamic_profile = self._node_profile.get_dynamic_profile()
        public_peer_id: str = dynamic_profile['peer_id']  # Public
        private_peer_id: str = dynamic_profile['private_peer_id']  # Private
    public_peer_id: str = '<public_peer_id>' if public_peer_id is None else public_peer_id
    private_peer_id: str = '<private_peer_id>' if private_peer_id is None else private_peer_id
    return public_peer_id, private_peer_id
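
The placeholder fallback, and the way get_peer_id chooses between the two IDs, can be sketched with a plain dictionary in place of the node profile. Two things here are assumptions made for the sketch: the functions take their inputs as arguments instead of reading `self`, and `dict.get` is used so that a missing key also falls back to the placeholder (the real method indexes the dynamic profile directly).

```python
def get_peer_ids(dynamic_profile):
    # Simplified: the profile is a plain dict, or None when no node is attached
    public = private = None
    if dynamic_profile is not None:
        public = dynamic_profile.get("peer_id")
        private = dynamic_profile.get("private_peer_id")
    public = "<public_peer_id>" if public is None else public
    private = "<private_peer_id>" if private is None else private
    return public, private


def get_peer_id(dynamic_profile, behaving_in_world: bool) -> str:
    # Private ID while the world behavior is active, public ID otherwise
    public, private = get_peer_ids(dynamic_profile)
    return private if behaving_in_world else public


no_profile = get_peer_ids(None)
partial = get_peer_ids({"peer_id": "pub-123", "private_peer_id": None})
in_world_id = get_peer_id({"peer_id": "pub-123", "private_peer_id": "prv-456"}, True)
```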

evaluate_profile

evaluate_profile(role: int, profile: NodeProfile) -> bool

Evaluate if a given profile is valid for this agent based on its role. It helps in identifying and filtering out invalid or 'cheating' profiles.

Parameters:

Name Type Description Default
role int

The expected integer role (e.g., ROLE_PUBLIC, ROLE_WORLD_MASTER) for the profile.

required
profile NodeProfile

The NodeProfile object to be evaluated.

required

Returns:

Type Description
bool

True if the profile is considered valid for the specified role, False otherwise.

Source code in unaiverse/agent_basics.py
def evaluate_profile(self, role: int, profile: NodeProfile) -> bool:
    """Evaluate if a given profile is valid for this agent based on its role. It helps in identifying and filtering
     out invalid or 'cheating' profiles.

    Args:
        role: The expected integer role (e.g., ROLE_PUBLIC, ROLE_WORLD_MASTER) for the profile.
        profile: The NodeProfile object to be evaluated.

    Returns:
        True if the profile is considered valid for the specified role, False otherwise.
    """

    # If the role in the profile is not the provided role, a profile-cheater was found
    if (profile.get_dynamic_profile()['connections']['role'] in self.ROLE_STR_TO_BITS and
            self.ROLE_STR_TO_BITS[profile.get_dynamic_profile()['connections']['role']] != role):
        log.user(f"Cheater found: "
                 f"{profile.get_dynamic_profile()['connections']['role']} != {self.ROLE_BITS_TO_STR[role]}")
        return False  # Cheater found

    # These are just examples: you are expected to reimplement this method in your custom agent file
    if (role & 1 == self.ROLE_PUBLIC and
            profile.get_dynamic_profile()['guessed_location'] == 'Some Dummy Location, Just An Example Here'):
        return False
    elif (role & 3 == self.ROLE_WORLD_MASTER and
          profile.get_dynamic_profile()['guessed_location'] == 'Some Other Location, Just Another Example Here'):
        return False
    else:
        return True

accept_new_role

accept_new_role(role: int)

Set the agent's role and optionally load a default behavior (private/world behavior).

Parameters:

Name Type Description Default
role int

The integer role to assign to the agent (e.g., ROLE_PUBLIC, ROLE_WORLD_MASTER).

required
Source code in unaiverse/agent_basics.py
def accept_new_role(self, role: int):
    """Set the agent's role and optionally load a default behavior (private/world behavior).

    Args:
        role: The integer role to assign to the agent (e.g., ROLE_PUBLIC, ROLE_WORLD_MASTER).
    """
    base_role_str = self.ROLE_BITS_TO_STR[(role >> 2) << 2]
    full_role_str = self.ROLE_BITS_TO_STR[role]

    self._node_profile.get_dynamic_profile()['connections']['role'] = full_role_str

    default_behav = None  # A public role will not be found in the world map
    if self.world_profile is not None:
        base_role_to_behav = self.world_profile.get_dynamic_profile()['world_roles_fsm']
        if base_role_str in base_role_to_behav:
            default_behav = self.world_profile.get_dynamic_profile()['world_roles_fsm'][base_role_str]

    if default_behav is not None and isinstance(default_behav, str) and len(default_behav) > 0:
        default_behav_hsm = HybridStateMachine(self)
        default_behav_hsm.load(default_behav)
        self.behav = HybridStateMachine(self, policy=self.policy_default)
        self.behav.include(default_behav_hsm, make_a_copy=True)
        self.behav.set_role(base_role_str)
        self.set_policy_filter(self.policy_filter, public=False)

is_human

is_human()

Check if the agent is marked as human in its node.

Returns:

Type Description

True if the agent is a human, False otherwise.

Source code in unaiverse/agent_basics.py
def is_human(self):
    """Check if the agent is marked as human in its node.

    Returns:
        True if the agent is a human, False otherwise.
    """
    if self._node_profile is not None:
        return self._node_profile.get_static_profile()["node_type"] == self.HUMAN
    else:
        return False

in_world

in_world()

Check if the agent is currently operating within a 'world'.

Returns:

Type Description

True if the agent is in a world, False otherwise.

Source code in unaiverse/agent_basics.py
def in_world(self):
    """Check if the agent is currently operating within a 'world'.

    Returns:
        True if the agent is in a world, False otherwise.
    """
    if self._node_profile is not None:
        return self.ROLE_STR_TO_BITS[self._node_profile.get_dynamic_profile()['connections']['role']] & 1 == 1
    else:
        return False

behaving_in_world

behaving_in_world()

Checks if the agent's world-specific behavior state machine is currently active.

Returns:

Type Description

True if the world behavior is active, False otherwise.

Source code in unaiverse/agent_basics.py
def behaving_in_world(self):
    """Checks if the agent's world-specific behavior state machine is currently active.

    Returns:
        True if the world behavior is active, False otherwise.
    """
    return self.behav.is_enabled()

get_stream_sample

get_stream_sample(net_hash: str, sample_dict: dict[str, dict[str, Tensor | None | int | str]]) -> list[tuple[str, str]]

Receive and process stream samples that were provided by another agent.

Parameters:

Name Type Description Default
net_hash str

The network hash identifying the source of the stream samples.

required
sample_dict dict[str, dict[str, Tensor | None | int | str]]

A dictionary where keys are stream names and values are dictionaries containing 'data', 'data_tag', and 'data_uuid' for each sample.

required

Returns:

Type Description
list[tuple[str, str]]

The list of tuples (stream user hash, data UUID) for each added data sample.

Source code in unaiverse/agent_basics.py
def get_stream_sample(self, net_hash: str,
                      sample_dict: dict[str, dict[str, torch.Tensor | None | int | str]]) \
        -> list[tuple[str, str]]:
    """Receive and process stream samples that were provided by another agent.

    Args:
        net_hash: The network hash identifying the source of the stream samples.
        sample_dict: A dictionary where keys are stream names and values are dictionaries
                     containing 'data', 'data_tag', and 'data_uuid' for each sample.

    Returns:
        The list of tuples (stream user hash, data UUID) for each added data sample.
    """

    # Let's be sure that the net hash is converted from the user's perspective to the one of the code here
    net_hash = DataProps.normalize_net_hash(net_hash)
    added_data = []

    log.misc(f"Got a stream sample from {net_hash}...")
    if sample_dict is None:
        log.error(f"Invalid sample (expected a dictionary, got {type(sample_dict)})")
        return added_data

    if net_hash in self.known_streams:
        for name, data_and_tag_and_uuid in sample_dict.items():
            if ('data' not in data_and_tag_and_uuid or
                    'data_tag' not in data_and_tag_and_uuid or
                    'data_uuid' not in data_and_tag_and_uuid):
                log.error(f"Invalid sample in data stream named {name} (missing one or more keys)")
                continue

            data, data_tag, data_uuid = (data_and_tag_and_uuid['data'],
                                         data_and_tag_and_uuid['data_tag'],
                                         data_and_tag_and_uuid['data_uuid'])

            # - data must not be None
            # - the stream name must be known
            # - if we already hold data for this UUID, the incoming tag must be greater than the stored one
            # - if the UUID is new to our local stream, we don't check tag order
            skip = False
            reason = None
            has_data = False
            if not skip:
                if data is None:
                    skip = True
                    reason = "Data is None"
            if not skip:
                if net_hash not in self.known_streams:
                    skip = True
                    reason = f"The net hash {net_hash} is not a known stream hash"
            if not skip:
                if name not in self.known_streams[net_hash]:
                    skip = True
                    reason = (f"The data stream named {name} is not present for net hash {net_hash} "
                              f"(names: {list(self.known_streams[net_hash].keys())})")
            if not skip:
                has_data = self.known_streams[net_hash][name].has_data(data_uuid)
                if has_data and data_tag <= self.known_streams[net_hash][name].get_tag(data_uuid):
                    skip = True
                    reason = (f"The data tag {data_tag} is less than or equal to the already present one "
                              f"for UUID {data_uuid} "
                              f"({self.known_streams[net_hash][name].get_tag(data_uuid)})")

            # If the sample can be accepted...
            if not skip:
                log.misc(f"Accepted sample named {name}: tag={data_tag}, uuid={data_uuid}" +
                         (": it is a new UUID" if not has_data else ""))

                # Saving the data sample on the known stream objects
                self.known_streams[net_hash][name].set(data, data_tag, uuid=data_uuid)

                # Data to return
                added_data.append((DataProps.user_hash_from_net_hash(net_hash, name), data_uuid))

                # Buffering data, if it was requested and if this sample comes from somebody's processor
                if (self.buffer_generated_by_others != "none" and
                        DataProps.name_or_group_from_net_hash(net_hash) == "processor"):
                    log.debug(f"[get_stream_sample] Buffering others' processor generated data...")

                    # Getting the streams of the processor of the source agent
                    _processor_stream_dict = self.known_streams[net_hash]
                    _peer_id = DataProps.peer_id_from_net_hash(net_hash)

                    # Setting buffered stream counter
                    clear = False
                    if _peer_id in self.last_buffered_peer_id_to_info:
                        if self.buffer_generated_by_others == "one":
                            _buffered_uuid_to_id = self.last_buffered_peer_id_to_info[_peer_id]["uuid_to_id"]
                            if data_uuid not in _buffered_uuid_to_id:
                                _id = next(iter(_buffered_uuid_to_id.values()))
                                _buffered_uuid_to_id.clear()
                                _buffered_uuid_to_id[data_uuid] = _id
                                clear = True
                    else:
                        self.last_buffered_peer_id_to_info[_peer_id] = {"uuid_to_id": {}, "net_hash": None}
                    _buffered_uuid_to_id = self.last_buffered_peer_id_to_info[_peer_id]["uuid_to_id"]
                    if data_uuid not in _buffered_uuid_to_id:
                        _buffered_uuid_to_id[data_uuid] = sum(
                            len(v["uuid_to_id"]) for v in self.last_buffered_peer_id_to_info.values()) + 1
                    _buffered_id = _buffered_uuid_to_id[data_uuid]

                    # Building net hash to retrieve the buffered stream
                    _group = "buffered" + str(_buffered_id)
                    _name = name + "@" + _group
                    _net_hash = DataProps.build_net_hash(
                        _peer_id,
                        pubsub=False,
                        name_or_group=_group)

                    # If the buffered stream was not created before
                    if _net_hash not in self.known_streams:
                        log.debug(f"[get_stream_sample] Adding a new buffered stream to the list of known "
                                  f"streams, hash: {_net_hash}")
                        for stream_obj in _processor_stream_dict.values():
                            # Same properties of the stream of the processor of the source agent
                            props = stream_obj.get_props().clone()
                            props.set_group("buffered" + str(_buffered_id))
                            props.set_name(props.get_name() + "@" + props.get_group())

                            # Adding the newly created stream
                            self.add_stream(BufferedStream(props=props),
                                            owned=False,
                                            net_hash=_net_hash)

                        # Saving hash of the new buffered stream
                        self.last_buffered_peer_id_to_info[_peer_id]["net_hash"] = _net_hash
                    else:
                        if clear:
                            for stream_obj in self.known_streams[_net_hash].values():
                                stream_obj.clear_buffer()

                    # Saving sample
                    self.known_streams[_net_hash][_name].set(data, data_tag, uuid=data_uuid)
                    if self.known_streams[_net_hash][_name].get_interaction(data_uuid) is None:
                        interaction = self.known_streams[net_hash][name].get_interaction(data_uuid)
                        if interaction is not None:
                            self.im.add_lazy_stream_to_interaction(
                                DataProps.user_hash_from_net_hash(_net_hash, _name), interaction)

            # If we decided to skip this sample...
            else:
                log.misc(f"Skipping sample named {name} received in net hash {net_hash}, "
                         f"tag={data_tag}, uuid={data_uuid}: {reason}")

                if net_hash not in self.known_streams:
                    log.debug(f"[get_stream_sample] "
                              f"The net hash {net_hash} was not found in the set of known streams")
                else:
                    if name not in self.known_streams[net_hash]:
                        log.debug(f"[get_stream_sample] The net hash was known, but the data stream "
                                  f"named {name} is not known")
                    else:
                        log.debug(f"[get_stream_sample] "
                                  f"data={self.known_streams[net_hash][name].props.to_text(data)}")
        return added_data

    # If this stream is not known at all...
    else:
        log.misc(f"Skipping sample from {net_hash} (data stream is unknown)")
        return added_data
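
The acceptance rule applied to each entry of sample_dict can be sketched in isolation. The snippet below is a simplified stand-in, not the library code: the `known` dictionary replaces the stream objects of one net hash, and the function name and the stream names are hypothetical.

```python
from __future__ import annotations


def rejection_reason(known: dict[str, dict[str, int]], name: str,
                     data: object, data_tag: int, data_uuid: str) -> str | None:
    """Return why a sample would be skipped, or None if it would be accepted.

    `known` stands in for the streams of one net hash: it maps a stream name
    to the tags already stored there, keyed by data UUID.
    """
    if data is None:
        return "data is None"
    if name not in known:
        return "unknown stream name"
    stored = known[name]
    # Tag order is only enforced for UUIDs we already hold data for.
    if data_uuid in stored and data_tag <= stored[data_uuid]:
        return "stale tag"
    return None


known = {"text_out": {"uuid-a": 3}}
sample_dict = {
    "text_out": {"data": "hello", "data_tag": 3, "data_uuid": "uuid-a"},
    "img_out": {"data": "pixels", "data_tag": 0, "data_uuid": "uuid-a"},
}
reasons = {n: rejection_reason(known, n, s["data"], s["data_tag"], s["data_uuid"])
           for n, s in sample_dict.items()}
```

Here the first entry is rejected because its tag does not exceed the stored one, and the second because the stream name is not known for that net hash.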

send_stream_samples async

send_stream_samples()

Collect and send stream samples from all owned streams to appropriate recipients (async).

Source code in unaiverse/agent_basics.py
async def send_stream_samples(self):
    """Collect and send stream samples from all owned streams to appropriate recipients (async)."""

    # For each owned net hash...
    for net_hash, streams_dict in self.owned_streams.items():

        # Skipping our processor input
        if DataProps.name_or_group_from_net_hash(net_hash) == "processor_in":
            continue

        # Get samples from all the owned streams: taking notes about the recipient and of what to send (contents)
        interactions_by_uuid = {}
        recipients_by_uuid = {}
        contents_by_uuid = {}
        contents_data_by_uuid = {}

        # Preparing content to send
        empty_contents: dict[str, dict | None] = {name: None for name in streams_dict.keys()}
        empty_data = {name: None for name in streams_dict.keys()}
        valid_samples_count = {}

        # For each stream within this net hash...
        for name, stream in streams_dict.items():
            if stream.is_pubsub():
                uuids = stream.get_data_uuids()  # We send by PubSub whenever there is some data
            else:
                uuids = stream.get_interaction_uuids()  # We send direct messages only if there is an interaction

            # For each agent who triggered the interaction that generated this data (recipient)

            # Set None as last UUID (it is present in pre-built BufferedStreams)
            # since it is sometimes replaced by the last set reference,
            # and we prefer to give priority to the not-None cases
            if None in uuids:
                uuids = [x for x in uuids if x is not None] + [None]

            for uuid in uuids:
                interaction = stream.get_interaction(uuid)
                recipient = None

                # Invalid agent? Skip!
                if not stream.is_pubsub():
                    recipient = self.im.get_recipients(interaction)
                    if len(recipient) == 0:
                        continue

                # Skipping system-triggered interactions
                if recipient == [Custom.SYSTEM_INTERACTION_LABEL]:
                    continue

                # Get data
                data = stream.get(requested_by="send_stream_samples", uuid=uuid)
                data_tag = stream.get_tag(uuid=uuid)

                log.debug(f"[send_stream_samples] data from stream {stream.props.get_name()} = {data}")

                if data is not None:
                    log.debug(f"[send_stream_samples] Found something in stream {stream.props.get_name()} "
                              f"uuid={uuid}, data_tag={data_tag}, recipient={recipient}")

                    # Prepare data structures the first time we meet a new recipient
                    if uuid not in contents_by_uuid:
                        interactions_by_uuid[uuid] = interaction
                        recipients_by_uuid[uuid] = recipient
                        contents_by_uuid[uuid] = empty_contents.copy()
                        contents_data_by_uuid[uuid] = empty_data.copy()
                        valid_samples_count[uuid] = 0

                    # Pack into the prepared data structures
                    contents_by_uuid[uuid][name] = {'data': data,
                                                    'data_tag': data_tag,
                                                    'data_uuid': uuid}
                    contents_data_by_uuid[uuid][name] = data
                    valid_samples_count[uuid] += 1
                else:
                    log.debug(
                        f"[send_stream_samples] None data in stream {stream.props.get_name()} uuid={uuid}")

        # Drop the UUIDs whose message does not include a sample for every stream of this net hash
        uuid_to_remove = []
        for uuid, count in valid_samples_count.items():
            if count != len(streams_dict):
                uuid_to_remove.append(uuid)
                log.debug(f"[send_stream_samples] Cannot send data for {net_hash}, uuid {uuid}, "
                          f"since it is incomplete ({valid_samples_count[uuid]} vs {len(streams_dict)})")
        for uuid in uuid_to_remove:
            del interactions_by_uuid[uuid]
            del recipients_by_uuid[uuid]
            del contents_by_uuid[uuid]
            del contents_data_by_uuid[uuid]
            del valid_samples_count[uuid]

        # If direct message...
        if not Stream.is_pubsub_from_net_hash(net_hash):
            name_or_group = DataProps.name_or_group_from_net_hash(net_hash)

            for uuid, recipients in recipients_by_uuid.items():
                content = contents_by_uuid[uuid]
                content_data = contents_data_by_uuid[uuid]

                for recipient in recipients:
                    log.debug(f"[send_stream_samples] " 
                              f"Sending samples of {net_hash} by direct message to {recipient}...")
                    for name in content.keys():
                        content[name]['data'] = self.hook_before_sending_sample(content_data[name],
                                                                                content[name]['data_tag'],
                                                                                net_hash, name, recipient)
                        log.debug(f"[send_stream_samples] "
                                  f"(data_tag={content[name]['data_tag']}, data_uuid={uuid}, "
                                  f"data is None?={content[name]['data'] is None})")

                    ret = await self._node_conn.send(recipient, channel_trail=name_or_group,
                                                     content_type=Msg.STREAM_SAMPLE,
                                                     content=content)

                    log.debug(f"[send_stream_samples] Sending returned: " + str(ret))

        # If pubsub...
        if Stream.is_pubsub_from_net_hash(net_hash):
            for uuid, recipients in recipients_by_uuid.items():
                log.debug(f"[send_stream_samples] Sending stream samples of the whole {net_hash} by pubsub...")

                content = contents_by_uuid[uuid]
                content_data = contents_data_by_uuid[uuid]

                for name in content.keys():
                    content[name]['data'] = self.hook_before_sending_sample(content_data[name],
                                                                            content[name]['data_tag'],
                                                                            net_hash, name, None)
                    log.debug(
                        f"[send_stream_samples] (data_tag={content[name]['data_tag']}, data_uuid={uuid}, "
                        f"data is None?={content[name]['data'] is None})")

                peer_id = Stream.peer_id_from_net_hash(net_hash)  # Guessing agent peer ID from the net hash
                ret = await self._node_conn.publish(peer_id, channel=net_hash,
                                                    content_type=Msg.STREAM_SAMPLE,
                                                    content=content)

                log.debug(f"[send_stream_samples] Sending returned: " + str(ret))
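
Two details of send_stream_samples are easy to miss: the None UUID is visited last, and a UUID is sent only if every stream of the net hash has data for it. A minimal sketch of both rules, using plain dictionaries in place of stream objects (the helper names are hypothetical):

```python
from __future__ import annotations


def order_uuids(uuids: list[str | None]) -> list[str | None]:
    """Move the None UUID (if any) to the end, keeping the others in order."""
    if None in uuids:
        return [u for u in uuids if u is not None] + [None]
    return list(uuids)


def complete_uuids(samples: dict[str, dict[str | None, object]]) -> list[str | None]:
    """Return the UUIDs that have non-None data in every stream.

    `samples` stands in for the streams of one net hash:
    stream name -> {uuid: data}.
    """
    counts: dict[str | None, int] = {}
    for per_uuid in samples.values():
        for uuid in order_uuids(list(per_uuid)):
            if per_uuid[uuid] is not None:
                counts[uuid] = counts.get(uuid, 0) + 1
    # Keep only UUIDs covered by all the streams of this net hash.
    return [u for u, c in counts.items() if c == len(samples)]


samples = {
    "text": {"u1": "hi", "u2": "there"},
    "img": {"u1": "pixels", "u2": None},
}
```

In this example only "u1" would be sent, since "u2" has no image data yet.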

disable_proc_input

disable_proc_input(public: bool)

Disable the processor input stream of this agent.

Parameters:

Name Type Description Default
public bool

If True, disables the public input stream, otherwise the private one.

required

Source code in unaiverse/agent_basics.py
def disable_proc_input(self, public: bool):
    """Disable the processor input stream of this agent.

    Args:
        public: If True, disables the public input stream, otherwise the private one.
    """
    stream_dict = self.owned_streams[self.get_proc_input_net_hash(public=public)]
    for stream_obj in stream_dict.values():
        if stream_obj.is_public() == public:
            stream_obj.disable()

enable_proc_input

enable_proc_input(public: bool)

Enable the processor input stream of this agent.

Parameters:

Name Type Description Default
public bool

If True, enables the public input stream, otherwise the private one.

required

Source code in unaiverse/agent_basics.py
def enable_proc_input(self, public: bool):
    """Enable the processor input stream of this agent.

    Args:
        public: If True, enables the public input stream, otherwise the private one.
    """
    stream_dict = self.owned_streams[self.get_proc_input_net_hash(public=public)]
    for stream_obj in stream_dict.values():
        if stream_obj.is_public() == public:
            stream_obj.enable()

set_proc_input

set_proc_input(data: str | Image | Tensor | None, public: bool = False, uuid: str | None = None, data_type: str = 'auto', data_tag: int = -1) -> bool

Set the data in the processor input stream.

Parameters:

Name Type Description Default
data str | Image | Tensor | None

The data to set in the stream (text, image, or tensor).

required
public bool

If True, targets the public input stream, otherwise the private one.

False
uuid str | None

Optional UUID to associate with the data sample.

None
data_type str

The type of data ("text", "img", "tensor", or "auto" to infer from the data).

'auto'
data_tag int

The tag associated with the sample (default -1).

-1

Returns:

Type Description
bool

True if the data was successfully set, False otherwise.

Source code in unaiverse/agent_basics.py
def set_proc_input(self, data: str | Image | torch.Tensor | None, public: bool = False,
                   uuid: str | None = None, data_type: str = "auto", data_tag: int = -1) -> bool:
    """Set the data in the processor input stream.

    Args:
        data: The data to set in the stream (text, image, or tensor).
        public: If True, targets the public input stream, otherwise the private one.
        uuid: Optional UUID to associate with the data sample.
        data_type: The type of data ("text", "img", "tensor", or "auto" to infer from the data).
        data_tag: The tag associated with the sample (default -1).

    Returns:
        True if the data was successfully set, False otherwise.
    """
    peer_id = self.get_peer_ids()[0] if public else self.get_peer_ids()[1]
    proc_in = self.find_streams(peer_id, "processor_in")
    if proc_in is None or len(proc_in) == 0:
        return False
    for net_hash, stream_dict in proc_in.items():
        if not DataProps.is_pubsub_from_net_hash(net_hash):
            for stream_name, stream_obj in stream_dict.items():
                if stream_obj.props.is_public() == public:
                    if (data is None or
                            ((data_type == "text" or isinstance(data, str)) and stream_obj.props.is_text()) or
                            ((data_type == "img" or isinstance(data, Image)) and stream_obj.props.is_img()) or
                            ((data_type == "tensor" or isinstance(data, torch.Tensor))
                             and stream_obj.props.is_tensor())):
                        stream_obj.set(data, uuid=uuid)  # This might fail if the stream is disabled
                        stream_obj.set_tag(data_tag, uuid=uuid)
                        return True
    return False
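
The matching rule between data, data_type and the kind of stream can be read off the condition above. The sketch below mirrors it on stand-in types, under loud assumptions: `stream_kind` is a hypothetical replacement for the is_text() / is_tensor() checks, a list of floats plays the role of a tensor, and the image case is omitted.

```python
from __future__ import annotations


def accepts(data: object, data_type: str, stream_kind: str) -> bool:
    """Mirror the matching rule of set_proc_input on stand-in types."""
    if data is None:  # None is accepted by any stream
        return True
    if (data_type == "text" or isinstance(data, str)) and stream_kind == "text":
        return True
    if (data_type == "tensor" or isinstance(data, list)) and stream_kind == "tensor":
        return True
    return False
```

Note that an explicit data_type matches a stream of that kind regardless of the runtime type of data, while "auto" relies on the isinstance checks alone.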

get_tag

get_tag(net_hash: str, uuid: str | None = None) -> int

Get the maximum data tag across all streams associated with the given network hash.

Parameters:

Name Type Description Default
net_hash str

The network hash identifying the stream group.

required
uuid str | None

Optional UUID of the data sample. If None, the latest tag is returned.

None

Returns:

Type Description
int

The maximum data tag found, or -1 if the stream is unknown.

Source code in unaiverse/agent_basics.py
def get_tag(self, net_hash: str, uuid: str | None = None) -> int:
    """Get the maximum data tag across all streams associated with the given network hash.

    Args:
        net_hash: The network hash identifying the stream group.
        uuid: Optional UUID of the data sample. If None, the latest tag is returned.

    Returns:
        The maximum data tag found, or -1 if the stream is unknown.
    """
    if net_hash in self.known_streams:
        data_tag = -1
        stream_dict = self.known_streams[net_hash]
        for stream_obj in stream_dict.values():
            t = stream_obj.get_tag(uuid)
            if t is not None:
                data_tag = max(data_tag, t)
        return data_tag
    return -1
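
The aggregation performed by get_tag reduces to a maximum over the per-stream tags, ignoring missing ones and never going below -1. A minimal sketch, where `tags` is a hypothetical stand-in for the values returned by each stream and None plays the part of an unknown net hash:

```python
from __future__ import annotations


def max_tag(tags: list[int | None] | None) -> int:
    """Return the largest non-None tag, with -1 as the floor and fallback."""
    if tags is None:  # unknown net hash
        return -1
    # Starting from -1 reproduces the initial value used in the source.
    return max([-1] + [t for t in tags if t is not None])
```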

set_tag

set_tag(net_hash: str, data_tag: int, uuid: str | None = None)

Set the data tag on all streams associated with the given network hash.

Parameters:

Name Type Description Default
net_hash str

The network hash identifying the stream group.

required
data_tag int

The tag value to set.

required
uuid str | None

Optional UUID of the data sample to tag. If None, the latest sample is tagged.

None

Source code in unaiverse/agent_basics.py
def set_tag(self, net_hash: str, data_tag: int, uuid: str | None = None):
    """Set the data tag on all streams associated with the given network hash.

    Args:
        net_hash: The network hash identifying the stream group.
        data_tag: The tag value to set.
        uuid: Optional UUID of the data sample to tag. If None, the latest sample is tagged.
    """
    if net_hash in self.known_streams:
        stream_dict = self.known_streams[net_hash]
        for stream_obj in stream_dict.values():
            stream_obj.set_tag(data_tag, uuid=uuid)

get_current_interaction

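get_current_interaction()

Return the current interaction from the interaction manager.

Source code in unaiverse/agent_basics.py
def get_current_interaction(self):
    """Return the current interaction from the interaction manager."""
    return self.im.get_current()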

force_action_step

force_action_step(step: int)

Override the current action step index used in the active behavior.

Parameters:

Name Type Description Default
step int

The action step index to force. If negative, the override is cleared.

required

Source code in unaiverse/agent_basics.py
def force_action_step(self, step: int):
    """Override the current action step index used in the active behavior.

    Args:
        step: The action step index to force. If negative, the override is cleared.
    """
    self.overridden_action_step = step if step >= 0 else None

get_action_step

get_action_step()

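Retrieve the current action step index, honoring any override set through force_action_step.

Returns:

Type Description

The step index forced through force_action_step, if any; otherwise the current step index of the active behavior (the private/world one when it is enabled, the lone-wolf one otherwise), or None if no action.

Source code in unaiverse/agent_basics.py
def get_action_step(self):
    """Retrieve the current action step index, honoring any override set through `force_action_step`.

    Returns:
        The step index forced through `force_action_step`, if any; otherwise the current step index of the
            active behavior (the private/world one when it is enabled, the lone-wolf one otherwise), or None
            if no action.
    """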
    behav = self.behav if self.behav.is_enabled() else self.behav_lone_wolf
    return behav.get_action_step_idx() if self.overridden_action_step is None else self.overridden_action_step
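
The interplay between force_action_step and get_action_step can be sketched with a small stand-in class. This is an illustration under assumptions: `StepTracker` and `behavior_step` are hypothetical, with `behavior_step` mimicking the index that the active behavior would report.

```python
from __future__ import annotations


class StepTracker:
    """Stand-in for the override logic of the two methods above."""

    def __init__(self, behavior_step: int) -> None:
        self.behavior_step = behavior_step
        self.overridden_action_step: int | None = None

    def force_action_step(self, step: int) -> None:
        # A negative value clears the override.
        self.overridden_action_step = step if step >= 0 else None

    def get_action_step(self) -> int:
        # The override wins whenever it is set (0 included).
        if self.overridden_action_step is None:
            return self.behavior_step
        return self.overridden_action_step


tracker = StepTracker(behavior_step=2)
before = tracker.get_action_step()
tracker.force_action_step(5)
forced = tracker.get_action_step()
tracker.force_action_step(-1)
after = tracker.get_action_step()
```

The explicit comparison with None matters: a forced step of 0 is a valid override and must not be treated as "no override".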

is_last_action_step

is_last_action_step()

Check if the agent's current action (private/world behavior) is on its last step.

Returns:

Type Description

True if the last step of the current action has been completed, False otherwise. Returns None if there is no active action.

Source code in unaiverse/agent_basics.py
def is_last_action_step(self):
    """Check if the agent's current action (private/world behavior) is on its last step.

    Returns:
        True if the last step of the current action has been completed, False otherwise. Returns None if there is
            no active action.
    """
    if self.im.current is not None:
        return self.im.current.was_last_step_done()
    else:
        return None

is_multi_steps_action

is_multi_steps_action()

Determines if the current action is a multistep action.

Returns:

Type Description

True if the action is multistep, False otherwise.

Source code in unaiverse/agent_basics.py
def is_multi_steps_action(self):
    """Determines if the current action is a multistep action.

    Returns:
        True if the action is multistep, False otherwise.
    """
    if self.im.current is not None:
        return self.im.current.is_multi_steps()
    else:
        return False

set_policy async

set_policy(policy_method_name_or_policy_fcn: str | Callable[[list[Action]], tuple[int, Interaction | None]], public: bool = False) -> bool

Sets the policy to be used in selecting what action to perform in the current state (async).

Parameters:

Name Type Description Default
policy_method_name_or_policy_fcn str | Callable[[list[Action]], tuple[int, Interaction | None]]

The name of a method of the Agent class, or a callable, that implements a policy function. It is a function that takes a list of Action objects that are candidates for execution, and returns the index of the selected action and an Interaction object with the action-requester details (requester, arguments, time, and UUID), or -1 and None if no action is selected. By design, every agent implements a basic policy function named "policy_default".

required
public bool

If True, the policy will be applied to the public HSM, otherwise to the private/world one.

False

Returns:

Type Description
bool

True if the policy was successfully set, False otherwise.

Source code in unaiverse/agent_basics.py
async def set_policy(self,
                     policy_method_name_or_policy_fcn: str | Callable[[list[Action]], tuple[int, Interaction | None]],
                     public: bool = False) -> bool:
    """Sets the policy to be used in selecting what action to perform in the current state (async).

    Args:
        policy_method_name_or_policy_fcn: The name of a method of the Agent class, or a callable, that implements
            a policy function. It is a function that takes a list of `Action` objects that are candidates for
            execution, and returns the index of the selected action and an `Interaction` object with the
            action-requester details (requester, arguments, time, and UUID), or -1 and None if no action is
            selected.
            By design, every agent implements a basic policy function named "policy_default".
        public: If True, the policy will be applied to the public HSM, otherwise to the private/world one.

    Returns:
        True if the policy was successfully set, False otherwise.
    """
    if isinstance(policy_method_name_or_policy_fcn, str):
        policy_fcn = getattr(self, policy_method_name_or_policy_fcn, None)
        if not callable(policy_fcn):
            return False
        behav = self.behav if not public else self.behav_lone_wolf
        assert isinstance(behav, HybridStateMachine)
        behav.set_policy(policy_fcn)
        return True
    elif callable(policy_method_name_or_policy_fcn):
        policy_fcn = policy_method_name_or_policy_fcn
        behav = self.behav if not public else self.behav_lone_wolf
        assert isinstance(behav, HybridStateMachine)
        behav.set_policy(policy_fcn)
        return True
    return False
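
A custom policy only has to respect the documented shape: it receives the candidate actions and returns an index plus an optional interaction. The sketch below is a self-contained illustration, not library code: `FakeAction`, its `ready` attribute, `FakeAgent` and `policy_last_ready` are all hypothetical, and the stand-in set_policy is synchronous while the real one is declared async.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class FakeAction:
    """Hypothetical stand-in for Action; `ready` is an assumed attribute."""
    name: str
    ready: bool


def policy_last_ready(actions_list: list[FakeAction]) -> tuple[int, object | None]:
    """Pick the last ready action; return (-1, None) when none is ready."""
    for i in range(len(actions_list) - 1, -1, -1):
        if actions_list[i].ready:
            return i, None  # no requester is attached in this sketch
    return -1, None


class FakeAgent:
    """Shows the name-or-callable resolution used by set_policy."""

    def __init__(self) -> None:
        self.policy = None

    def policy_last_ready(self, actions_list: list[FakeAction]):
        return policy_last_ready(actions_list)

    def set_policy(self, name_or_fcn) -> bool:
        if isinstance(name_or_fcn, str):
            fcn = getattr(self, name_or_fcn, None)  # look the method up by name
        else:
            fcn = name_or_fcn
        if not callable(fcn):
            return False
        self.policy = fcn
        return True
```

Passing the method name keeps the configuration serializable as a string, while passing a callable allows policies defined outside the agent class.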

set_policy_filter

set_policy_filter(filter_method_name_or_policy_fcn: str | Callable[[int, Interaction | None, list[Action], dict], tuple[int, Interaction | None]] | None, public: bool = False) -> bool

Sets the policy filter function, which overrides the action selected by the policy in the current state.

Parameters:

Name Type Description Default
filter_method_name_or_policy_fcn str | Callable[[int, Interaction | None, list[Action], dict], tuple[int, Interaction | None]] | None

The name of a method of the Agent class or a function that implements a policy filtering function, overriding what the policy decided. It is a function that takes what the policy decided (the selected action index and its Interaction, or -1 and None), a list of Action objects that are candidates for execution, and a dictionary with customizable fields (always including the "agent" key, with a reference to the current agent) and returns the index of the selected action and an Interaction object with the action-requester details (requester, arguments, time, and UUID), or -1 and None if no action is selected. By design, every agent comes with no filtering active.

required
public bool

If True, the filter will be applied to the public HSM, otherwise to the private/world one.

False

Returns:

Type Description
bool

True if the filter was successfully set, False otherwise.

Source code in unaiverse/agent_basics.py
def set_policy_filter(self,
                      filter_method_name_or_policy_fcn: str | Callable[
                          [int, Interaction | None, list[Action], dict], tuple[int, Interaction | None]] | None,
                      public: bool = False) -> bool:
    """Sets the policy filter function, which overrides the action selected by the policy in the current state.

    Args:
        filter_method_name_or_policy_fcn: The name of a method of the Agent class or a function that implements a
            policy filtering function, overriding what the policy decided.
            It is a function that takes what the policy decided (the selected action index and its `Interaction`,
            or -1 and None), a list of `Action` objects that are candidates for execution, and a dictionary with
            customizable fields (always including the "agent" key, with a reference to the current agent) and
            returns the index of the selected action and an `Interaction` object with the action-requester
            details (requester, arguments, time, and UUID), or -1 and None if no action is selected.
            By design, every agent comes with no filtering active.
        public: If True, the filter will be applied to the public HSM, otherwise to the private/world one.

    Returns:
        True if the filter was successfully set, False otherwise.
    """
    if isinstance(filter_method_name_or_policy_fcn, str):
        filter_fcn: Callable | None = getattr(self, filter_method_name_or_policy_fcn, None)
        if not callable(filter_fcn):
            return False
        if public:
            self.policy_filter_lone_wolf = filter_fcn
            assert isinstance(self.behav_lone_wolf, HybridStateMachine)
            self.behav_lone_wolf.set_policy_filter(self.policy_filter_lone_wolf, self.policy_filter_lone_wolf_opts)
            self.policy_filter_lone_wolf_opts['agent'] = self  # Forced (do it *after* set_policy_filter)
            self.policy_filter_lone_wolf_opts['public'] = True
        else:
            self.policy_filter = filter_fcn
            assert isinstance(self.behav, HybridStateMachine)
            self.behav.set_policy_filter(self.policy_filter, self.policy_filter_opts)
            self.policy_filter_opts['agent'] = self  # Forced (do it *after* set_policy_filter)
            self.policy_filter_opts['public'] = False
        return True
    elif callable(filter_method_name_or_policy_fcn):
        if public:
            self.policy_filter_lone_wolf = filter_method_name_or_policy_fcn
            assert isinstance(self.behav_lone_wolf, HybridStateMachine)
            self.behav_lone_wolf.set_policy_filter(self.policy_filter_lone_wolf, self.policy_filter_lone_wolf_opts)
            self.policy_filter_lone_wolf_opts['agent'] = self  # Forced (do it *after* set_policy_filter)
            self.policy_filter_lone_wolf_opts['public'] = True
        else:
            self.policy_filter = filter_method_name_or_policy_fcn
            assert isinstance(self.behav, HybridStateMachine)
            self.behav.set_policy_filter(self.policy_filter, self.policy_filter_opts)
            self.policy_filter_opts['agent'] = self  # Forced (do it *after* set_policy_filter)
            self.policy_filter_opts['public'] = False
        return True
    return False
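
The name-or-callable dispatch above can be sketched in isolation. This is a minimal illustration, not the library's code: `Demo`, `set_filter`, and `my_filter` are hypothetical names, and the filter body is a placeholder because the real filter signature is not shown in this section.

```python
from typing import Callable, Optional, Union


class Demo:
    """Hypothetical stand-in for an agent; not the real AgentBasics."""

    def __init__(self) -> None:
        self.public_filter: Optional[Callable] = None
        self.private_filter: Optional[Callable] = None

    def my_filter(self, *args, **kwargs):
        # Placeholder body: the real filter signature is an unknown here
        return args, kwargs

    def set_filter(self, name_or_fcn: Union[str, Callable], public: bool = False) -> bool:
        # A string is looked up as an attribute of the instance; anything else is used as-is
        if isinstance(name_or_fcn, str):
            fcn = getattr(self, name_or_fcn, None)
        else:
            fcn = name_or_fcn
        if not callable(fcn):
            return False
        if public:
            self.public_filter = fcn
        else:
            self.private_filter = fcn
        return True


demo = Demo()
ok_by_name = demo.set_filter("my_filter", public=True)   # resolved via getattr
ok_by_callable = demo.set_filter(lambda *a, **k: None)    # used directly
bad_name = demo.set_filter("does_not_exist")              # no such attribute
```

Passing a name that does not resolve to a callable leaves both filters unchanged and reports failure through the return value.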

policy_default

policy_default(actions_list: list[Action]) -> tuple[int, Interaction | None]

This is the default policy for selecting which action to execute from a list of feasible actions. It first selects any action flagged as high priority. Otherwise, it picks the first action in the list that has pending requests and serves its oldest request. If no requested actions are found, it selects the first action in the list that is marked as ready.

Parameters:

Name Type Description Default
actions_list list[Action]

A list of Action objects that are candidates for execution.

required

Returns:

Type Description
tuple[int, Interaction | None]

The index of the selected action and the Interaction object with the requester details (requester, arguments, time, and UUID), or -1 and None if no action is selected. The interaction is also None when the selected action has no pending request.

Source code in unaiverse/agent_basics.py
def policy_default(self, actions_list: list[Action]) -> tuple[int, Interaction | None]:
    """This is the default policy for selecting which action to execute from a list of feasible actions.
    It first selects any action flagged as high priority. Otherwise, it picks the first action in the list that
    has pending requests and serves its oldest request. If no requested actions are found, it selects the first
    action in the list that is marked as `ready`.

    Args:
        actions_list: A list of `Action` objects that are candidates for execution.

    Returns:
        The index of the selected action and the Interaction object with the requester details (requester,
            arguments, time, and UUID), or -1 and None if no action is selected. The interaction is also None
            when the selected action has no pending request.
    """
    for i, action in enumerate(actions_list):
        if action.is_high_priority:
            _list_of_requests = action.get_list_of_interactions()
            _selected_action_idx = i
            _selected_interaction = _list_of_requests.get_oldest_interaction() \
                if len(_list_of_requests) > 0 else None
            return _selected_action_idx, _selected_interaction
    for i, action in enumerate(actions_list):
        _list_of_requests = action.get_list_of_interactions()
        if len(_list_of_requests) > 0:  # The process action has more priority
            _selected_action_idx = i
            _selected_interaction = _list_of_requests.get_oldest_interaction()
            return _selected_action_idx, _selected_interaction
    for i, action in enumerate(actions_list):
        if action.is_ready(consider_interactions=False):
            _selected_action_idx = i
            _selected_interaction = None
            return _selected_action_idx, _selected_interaction
    _selected_action_idx = -1
    _selected_interaction = None
    return _selected_action_idx, _selected_interaction
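
The three selection tiers in the listing above can be sketched with stand-in objects. `StubAction` and `pick` are hypothetical names introduced for illustration; the real `Action` and `Interaction` classes are not reproduced here, and requests are modeled as plain strings ordered oldest first.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class StubAction:
    """Hypothetical stand-in for Action; only the fields the policy inspects."""
    name: str
    high_priority: bool = False
    requests: List[str] = field(default_factory=list)  # oldest request first
    ready: bool = False


def pick(actions: List[StubAction]) -> Tuple[int, Optional[str]]:
    # Tier 1: the first high-priority action wins, with its oldest request (if any)
    for i, a in enumerate(actions):
        if a.high_priority:
            return i, (a.requests[0] if a.requests else None)
    # Tier 2: the first action with at least one pending request
    for i, a in enumerate(actions):
        if a.requests:
            return i, a.requests[0]
    # Tier 3: the first action that is ready on its own
    for i, a in enumerate(actions):
        if a.ready:
            return i, None
    # Nothing selectable
    return -1, None


actions = [
    StubAction("idle", ready=True),
    StubAction("answer", requests=["req-1", "req-2"]),
    StubAction("urgent", high_priority=True),
]
choice_all = pick(actions)             # high priority beats everything
choice_no_urgent = pick(actions[:2])   # requested beats merely ready
choice_ready_only = pick(actions[:1])  # falls through to the ready tier
choice_empty = pick([])                # no candidates at all
```

Note that list position, not request age, decides between two different actions that both have pending requests; request age only matters within the selected action.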

hook_proc_tweak_inputs

hook_proc_tweak_inputs(inputs)

A callback method that saves the inputs to the processor right before execution.

Parameters:

Name Type Description Default
inputs

The data inputs for the processor.

required

Returns:

Type Description

The same inputs passed to the function.

Source code in unaiverse/agent_basics.py
def hook_proc_tweak_inputs(self, inputs):
    """A callback method that saves the inputs to the processor right before execution.

    Args:
        inputs: The data inputs for the processor.

    Returns:
        The same inputs passed to the function.
    """
    self.proc_last_inputs = inputs
    return inputs

hook_proc_tweak_outputs

hook_proc_tweak_outputs(outputs)

A callback method that saves the outputs from the processor right after execution.

Parameters:

Name Type Description Default
outputs

The data outputs from the processor.

required

Returns:

Type Description

The same outputs passed to the function.

Source code in unaiverse/agent_basics.py
def hook_proc_tweak_outputs(self, outputs):
    """A callback method that saves the outputs from the processor right after execution.

    Args:
        outputs: The data outputs from the processor.

    Returns:
        The same outputs passed to the function.
    """
    self.proc_last_outputs = outputs
    return outputs
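
A subclass could presumably override these hooks to adjust data on its way in or out of the processor. The sketch below assumes that usage and that the hooks are called in the order inputs, processor, outputs; `StubAgent`, `ClippingAgent`, and `run` are hypothetical names, and only the two hook bodies mirror the listings above.

```python
class StubAgent:
    """Hypothetical stand-in mirroring only the two hooks shown above."""

    def __init__(self):
        self.proc_last_inputs = None
        self.proc_last_outputs = None

    def hook_proc_tweak_inputs(self, inputs):
        self.proc_last_inputs = inputs
        return inputs

    def hook_proc_tweak_outputs(self, outputs):
        self.proc_last_outputs = outputs
        return outputs

    def run(self, proc, inputs):
        # Assumed call order: tweak inputs, run the processor, tweak outputs
        tweaked = self.hook_proc_tweak_inputs(inputs)
        return self.hook_proc_tweak_outputs(proc(*tweaked))


class ClippingAgent(StubAgent):
    def hook_proc_tweak_inputs(self, inputs):
        # Clamp every input to [0, 1], then let the base class record it
        clipped = [min(1.0, max(0.0, x)) for x in inputs]
        return super().hook_proc_tweak_inputs(clipped)


agent = ClippingAgent()
result = agent.run(lambda a, b: a + b, [2.5, -1.0])
```

Calling `super()` keeps the bookkeeping (`proc_last_inputs`) consistent with what the processor actually received.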

hook_before_sending_sample

hook_before_sending_sample(data, data_tag: int, net_hash: str, stream_name: str, recipient: str | None)

A callback method that handles the stream data right before sending it through the network.

Parameters:

Name Type Description Default
data

The stream data sample.

required
data_tag int

The tag of the sample.

required
stream_name str

The name of the data stream.

required
net_hash str

The net hash of the whole stream.

required
recipient str | None

The (planned) recipient of this sample (or None in case of pubsub).

required

Returns:

Type Description

The same data passed to the function.

Source code in unaiverse/agent_basics.py
def hook_before_sending_sample(self, data, data_tag: int,
                               net_hash: str, stream_name: str, recipient: str | None):
    """A callback method that handles the stream data right before sending it through the network.

    Args:
        data: The stream data sample.
        data_tag: The tag of the sample.
        stream_name: The name of the data stream.
        net_hash: The net hash of the whole stream.
        recipient: The (planned) recipient of this sample (or None in case of pubsub).

    Returns:
        The same data passed to the function.
    """
    return data
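
Since the default implementation returns the data untouched, the hook is a natural place for per-sample bookkeeping in a subclass. The sketch below assumes that usage; `StubSender` and `CountingSender` are hypothetical names, and only the hook signature is taken from the listing above.

```python
from collections import Counter
from typing import Optional


class StubSender:
    """Hypothetical stand-in exposing only the hook shown above."""

    def hook_before_sending_sample(self, data, data_tag: int,
                                   net_hash: str, stream_name: str, recipient: Optional[str]):
        return data


class CountingSender(StubSender):
    def __init__(self):
        self.sent = Counter()

    def hook_before_sending_sample(self, data, data_tag, net_hash, stream_name, recipient):
        # Count samples per (stream, recipient); recipient is None for pubsub
        self.sent[(stream_name, recipient)] += 1
        return super().hook_before_sending_sample(data, data_tag, net_hash, stream_name, recipient)


sender = CountingSender()
out1 = sender.hook_before_sending_sample([1, 2], 0, "hash-x", "camera", None)
out2 = sender.hook_before_sending_sample([3, 4], 1, "hash-x", "camera", None)
out3 = sender.hook_before_sending_sample([5, 6], 2, "hash-x", "camera", "peer-a")
```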

remove_peer_from_agent_status_attrs

remove_peer_from_agent_status_attrs(peer_id: str)

Remove a peer ID from the status of the agent, assuming the status is represented by the attributes whose names start with '_'.

Source code in unaiverse/agent_basics.py
def remove_peer_from_agent_status_attrs(self, peer_id: str):
    """Remove a peer ID from the status of the agent, assuming the status is represented by the attributes whose
    names start with '_'."""
    for attr_name in dir(self):
        if attr_name.startswith("_") and (not attr_name.startswith("__") and not attr_name.startswith("_Agent")
                                          and not attr_name.startswith("_WAgent")):
            try:
                value = getattr(self, attr_name)
                if isinstance(value, list):
                    setattr(self, attr_name, [v for v in value if v != peer_id])
                elif isinstance(value, set):
                    value.discard(peer_id)
                elif isinstance(value, dict):
                    if peer_id in value:
                        del value[peer_id]
            except AttributeError:
                continue  # Skip read-only attributes
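
The effect of the loop above on each container type can be seen on a small stand-in. `StubStatus` and its attribute names are hypothetical; the sketch omits the exclusion of `_Agent` and `_WAgent` prefixed names and the `AttributeError` guard for brevity.

```python
class StubStatus:
    """Hypothetical stand-in with a few underscore-prefixed status attributes."""

    def __init__(self):
        self._engaged = ["peer-a", "peer-b", "peer-a"]
        self._found = {"peer-a", "peer-c"}
        self._scores = {"peer-a": 0.9, "peer-b": 0.1}
        self.public_peers = ["peer-a"]  # no leading underscore: left untouched

    def remove_peer(self, peer_id: str) -> None:
        for name in dir(self):
            if name.startswith("_") and not name.startswith("__"):
                value = getattr(self, name)
                if isinstance(value, list):
                    # Lists are rebuilt, dropping every occurrence
                    setattr(self, name, [v for v in value if v != peer_id])
                elif isinstance(value, set):
                    value.discard(peer_id)  # in place, no error if absent
                elif isinstance(value, dict):
                    value.pop(peer_id, None)  # only keys are matched, not values


status = StubStatus()
status.remove_peer("peer-a")
```

Only dictionary keys are checked, so a peer ID stored as a dictionary value, or nested inside another container, survives the cleanup.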

reset_agent_status_attrs

reset_agent_status_attrs()

Resets the attributes that represent the status of the agent, assumed to be the ones whose names start with '_'.

Source code in unaiverse/agent_basics.py
def reset_agent_status_attrs(self):
    """Resets the attributes that represent the status of the agent, assumed to be the ones whose names start
    with '_'."""
    for attr_name in dir(self):
        if attr_name.startswith("_") and (not attr_name.startswith("__") and not attr_name.startswith("_Agent")
                                          and not attr_name.startswith("_WAgent")):
            try:
                value = getattr(self, attr_name)
                if isinstance(value, list):
                    setattr(self, attr_name, [])
                elif isinstance(value, set):
                    setattr(self, attr_name, set())
                elif isinstance(value, dict):
                    setattr(self, attr_name, {})
                elif isinstance(value, bool):  # Check bool first: bool is a subclass of int
                    setattr(self, attr_name, False)
                elif isinstance(value, int):
                    setattr(self, attr_name, 0)
                elif isinstance(value, float):
                    setattr(self, attr_name, 0.)
            except AttributeError:
                continue  # Skip read-only attributes
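
In Python, `bool` is a subclass of `int`, so the order of `isinstance` checks decides which branch a boolean attribute takes during a type-based reset. The helper names below are hypothetical and exist only to contrast the two orderings.

```python
def reset_int_first(value):
    # int is tested first, so True/False are caught here and become the int 0
    if isinstance(value, int):
        return 0
    if isinstance(value, bool):  # never reached for any value
        return False
    return value


def reset_bool_first(value):
    # bool is tested first, so booleans keep their type after the reset
    if isinstance(value, bool):
        return False
    if isinstance(value, int):
        return 0
    return value


from_int_first = reset_int_first(True)
from_bool_first = reset_bool_first(True)
plain_int = reset_bool_first(7)
```

Because `0 == False`, the two results compare equal; the difference only shows up in code that checks the type or prints the value.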

streams_to_str

streams_to_str() -> str

Return a formatted string listing all known streams, marking owned ones with an '@' prefix.

Returns:

Type Description
str

A multi-line string representation of the known streams.

Source code in unaiverse/agent_basics.py
def streams_to_str(self) -> str:
    """Return a formatted string listing all known streams, marking owned ones with an '@' prefix.

    Returns:
        A multi-line string representation of the known streams.
    """
    return "Streams:\n" + "\n".join([(("   @" if user_hash in self.owned_streams_by_user_hash else "   ") +
                                      DataProps.peer_id_from_user_hash(user_hash) + " => " +
                                      str(stream).replace("\n", "\n   "))
                                     for user_hash, stream in self.known_streams_by_user_hash.items()])
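
The layout produced by the expression above can be reproduced on plain dictionaries. This is a simplified sketch: `format_streams` is a hypothetical helper, the keys are used directly as labels (the real code first converts each user hash with `DataProps.peer_id_from_user_hash`), and the stream values are plain strings.

```python
def format_streams(known: dict, owned: set) -> str:
    lines = []
    for key, stream in known.items():
        prefix = "   @" if key in owned else "   "
        # Indent continuation lines so multi-line stream descriptions stay aligned
        lines.append(prefix + key + " => " + str(stream).replace("\n", "\n   "))
    return "Streams:\n" + "\n".join(lines)


text = format_streams(
    {"peer1:cam": "video\nfps=30", "peer2:mic": "audio"},
    owned={"peer1:cam"},
)
print(text)
```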

agent_state_dict

agent_state_dict() -> dict

Returns a dictionary containing the portion of the agent's state that can be saved (currently only world_profile).

Source code in unaiverse/agent_basics.py
def agent_state_dict(self) -> dict:
    """Returns a dictionary containing the portion of the agent's state that can be saved (currently only
    `world_profile`)."""
    save_in_state = ['world_profile', ]
    return {k: getattr(self, k) for k in save_in_state}

save

save(where: str = '') -> bool

Save the agent's state, including its processor and other attributes, to a specified location.

Parameters:

Name Type Description Default
where str

The directory path where the agent's state should be saved. Defaults to "".

''

Returns:

Type Description
bool

True upon successful saving; False if no path is given and the node identity directory is not set.

Raises:

Type Description
IOError

If there is an issue with file operations (e.g., directory creation, writing files).

(TypeError, ValueError, RuntimeError)

For other potential issues during serialization or saving.

Source code in unaiverse/agent_basics.py
def save(self, where: str = "") -> bool:
    """Save the agent's state, including its processor and other attributes, to a specified location.

    Args:
        where: The directory path where the agent's state should be saved. Defaults to "".

    Returns:
        True upon successful saving; False if no path is given and the node identity directory is not set.

    Raises:
        IOError: If there is an issue with file operations (e.g., directory creation, writing files).
        TypeError, ValueError, RuntimeError: For other potential issues during serialization or saving.
    """

    if where == '':
        if self._node_identity_dir is None or len(self._node_identity_dir) == 0:
            return False
        where = os.path.join(self._node_identity_dir, "agent_state")  # Default save path

    os.makedirs(where, exist_ok=True)

    # Saving the processor
    if self.proc is not None and self.proc_updated_since_last_save:
        pt_final = os.path.join(where, f"{self._node_name}.pt")
        pt_tmp = pt_final + ".tmp"
        try:
            checkpoint = {
                'model_state_dict': self.proc.state_dict(),
            }

            # If your agent has an optimizer, save its state too
            if self.proc_opts.get('optimizer') is not None:
                checkpoint['optimizer_state_dict'] = self.proc_opts['optimizer'].state_dict()

            torch.save(checkpoint, pt_tmp)
            os.replace(pt_tmp, pt_final)  # Atomic move
            self.proc_updated_since_last_save = False
        except Exception as e:
            if os.path.exists(pt_tmp):
                os.remove(pt_tmp)
            log.error(f"Error saving processor: {e}")
            raise e

    # Save Agent State
    pkl_final = os.path.join(where, f"{self._node_name}.pkl")
    pkl_tmp = pkl_final + ".tmp"
    try:
        state = self.agent_state_dict()
        with open(pkl_tmp, "wb") as f:
            pickle.dump(state, f)
        os.replace(pkl_tmp, pkl_final)
    except Exception as e:
        log.error(f"Could not save " + ("agent" if not self.is_world else "world") + f": {e}")
        if os.path.exists(pkl_tmp):
            os.remove(pkl_tmp)
        raise e

    return True
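
The write-to-temporary-then-replace pattern used for both files above can be sketched with the standard library alone. `atomic_pickle_dump` is a hypothetical helper, not part of the module; the state dictionary is a placeholder.

```python
import os
import pickle
import tempfile


def atomic_pickle_dump(obj, final_path: str) -> None:
    tmp_path = final_path + ".tmp"
    try:
        with open(tmp_path, "wb") as f:
            pickle.dump(obj, f)
        # os.replace overwrites the destination; on POSIX the rename is atomic
        # when both paths are on the same filesystem
        os.replace(tmp_path, final_path)
    except Exception:
        # Do not leave a partial temporary file behind
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


with tempfile.TemporaryDirectory() as folder:
    target = os.path.join(folder, "agent.pkl")
    atomic_pickle_dump({"world_profile": None}, target)
    with open(target, "rb") as f:
        restored = pickle.load(f)
    leftover_tmp = os.path.exists(target + ".tmp")
```

A reader of the final path therefore sees either the previous complete file or the new complete file, never a half-written one.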

load

load(where: str = '') -> bool

Load the agent's state from a specified location.

Parameters:

Name Type Description Default
where str

The directory path from which the agent's state should be loaded. Defaults to "".

''

Returns:

Type Description
bool

True if loading succeeded; False if no default path is available or the state folder or pickle file is missing.

Source code in unaiverse/agent_basics.py
def load(self, where: str = "") -> bool:
    """Load the agent's state from a specified location.

    Args:
        where: The directory path from which the agent's state should be loaded. Defaults to "".

    Returns:
        True if loading succeeded; False if no default path is available or the state folder or pickle file is
            missing.
    """

    if where == '':
        if self._node_identity_dir is None or len(self._node_identity_dir) == 0:
            return False
        where = os.path.join(self._node_identity_dir, "agent_state")  # Default save path

    # Check if directory exists
    if not os.path.exists(where):
        log.error("No state folder found for " + ("agent" if not self.is_world else "world") +
                  f" {self._node_name}.")
        return False

    # Check if the specific pickle file exists
    pkl_path = os.path.join(where, f"{self._node_name}.pkl")
    if not os.path.exists(pkl_path):
        log.error("No saved state found for " + ("agent" if not self.is_world else "world") +
                  f" {self._node_name}.")
        return False

    # Loading the agent state dictionary
    try:
        with open(pkl_path, "rb") as f:
            agent_state_dict = pickle.load(f)
    except Exception as e:
        raise Exception(f"Error loading pickle file at {pkl_path}: {e}")

    # Update self's attributes with the loaded object's attributes
    self.__dict__.update(agent_state_dict)

    # Check if we also need to load the processor state
    pt_path = os.path.join(where, f"{self._node_name}.pt")
    load_proc = self.proc is not None and os.path.exists(pt_path)
    if load_proc:
        try:
            checkpoint = torch.load(pt_path)
            if isinstance(checkpoint, dict) and 'model_state_dict' in checkpoint:
                self.proc.load_state_dict(checkpoint['model_state_dict'])

                # Restore Optimizer to proc_opts
                if 'optimizer_state_dict' in checkpoint and self.proc_opts.get('optimizer') is not None:
                    self.proc_opts['optimizer'].load_state_dict(checkpoint['optimizer_state_dict'])
            else:
                self.proc.load_state_dict(checkpoint)
        except Exception as e:
            raise Exception(f"Error loading processor state: {e}")

    return True
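
The loader above accepts two checkpoint layouts: a wrapper dictionary with a `model_state_dict` key (and optionally `optimizer_state_dict`), or a bare state dictionary. The branching can be sketched without `torch`; `split_checkpoint` is a hypothetical helper and the state values are placeholder lists rather than tensors.

```python
from typing import Any, Optional, Tuple


def split_checkpoint(checkpoint: Any) -> Tuple[Any, Optional[Any]]:
    """Return (model_state, optimizer_state_or_None) for either layout."""
    if isinstance(checkpoint, dict) and "model_state_dict" in checkpoint:
        # Wrapped layout: model state plus an optional optimizer state
        return checkpoint["model_state_dict"], checkpoint.get("optimizer_state_dict")
    # Bare layout: the object itself is the model state
    return checkpoint, None


wrapped = {
    "model_state_dict": {"layer.weight": [0.1, 0.2]},
    "optimizer_state_dict": {"lr": 0.01},
}
bare = {"layer.weight": [0.1, 0.2]}

model_w, optim_w = split_checkpoint(wrapped)
model_b, optim_b = split_checkpoint(bare)
```

In the listing above the optimizer state is restored only when `proc_opts` already holds an optimizer, so a checkpoint's optimizer entry is ignored otherwise.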