AgentBasics(proc: ModuleWrapper | Module | None, proc_inputs: list[StreamType | str] | None = None, proc_outputs: list[StreamType | str] | None = None, proc_opts: dict | None = None, behav: HybridStateMachine | None = None, behav_lone_wolf: HybridStateMachine | str = 'serve', merge_flat_stream_labels: bool = False, buffer_generated: bool = False, buffer_generated_by_others: str = 'none', world_folder: str | None = None, policy_filter: Callable | None = None, policy_filter_lone_wolf: Callable | None = None)
Base class that encapsulates agent identity, stream management, and peer bookkeeping.
AgentBasics holds everything needed to describe an agent or a world within the
UNaIVERSE network: the processor configuration, all known and owned streams, the
catalogue of connected peers (all_agents, world_masters, world_agents,
human_agents, artificial_agents), role constants and lookup tables, and the
interaction manager used to coordinate multi-step dialogues.
Neither action selection nor execution logic lives here - those are provided by the
Agent subclass. World also inherits from this class and reuses the peer-tracking
machinery while overriding stream and role assignment behaviour.
AgentBasics is not usually instantiated directly; use Agent or World instead.
Attributes:
| Name |
Type |
Description |
ROLE_PUBLIC |
|
Integer bitmask for the public role (value 0).
|
ROLE_WORLD_MASTER |
|
Integer bitmask for the world-master role (value 3).
|
ROLE_WORLD_AGENT |
|
Integer bitmask for the world-agent role (value 1).
|
CUSTOM_ROLES |
|
Class-level list of custom role name strings defined by the world.
|
ROLE_BITS_TO_STR |
|
Mapping from integer role bitmasks to human-readable role strings.
|
ROLE_STR_TO_BITS |
|
Mapping from human-readable role strings to integer bitmasks.
|
BADGE_TYPES |
|
Set of valid badge type strings ("completed", "attended",
"intermediate", "pro").
|
HUMAN |
|
Constant string "human" used to tag human agent nodes.
|
all_agents |
dict[str, NodeProfile]
|
Mapping from peer ID to NodeProfile for every connected peer.
|
world_agents |
dict[str, NodeProfile]
|
Mapping from peer ID to NodeProfile for world-agent peers.
|
world_masters |
dict[str, NodeProfile]
|
Mapping from peer ID to NodeProfile for world-master peers.
|
human_agents |
dict[str, NodeProfile]
|
Mapping from peer ID to NodeProfile for human peers.
|
artificial_agents |
dict[str, NodeProfile]
|
Mapping from peer ID to NodeProfile for artificial peers.
|
im |
|
The InteractionManager that tracks live interactions for this agent.
|
stdin |
|
StreamsProxyWithDefaults bound to processor input streams.
|
stdout |
|
StreamProxy bound to processor output streams.
|
stdtar |
|
StreamProxy bound to target streams.
|
stdext |
|
StreamProxy bound to environmental streams.
|
stats |
Stats | None
|
Optional Stats recorder; None unless explicitly set.
|
Examples:
AgentBasics is normally used through its Agent subclass:
>>> from unaiverse.agent import Agent
>>> import torch
>>> agent = Agent(proc=torch.nn.Linear(10, 2))
>>> print(agent.ROLE_BITS_TO_STR[Agent.ROLE_WORLD_AGENT])
world_agent
Initialize an agent or world with its processor, streams, and behavioral state machines.
This constructor wires together the processor, the two behavioral HSMs (world-scoped
behav and public-network behav_lone_wolf), the stream bookkeeping dictionaries,
the peer membership catalogues, and the InteractionManager. When the concrete class
defines a process method (i.e., it is an Agent rather than a World), a default
empty HSM is created if behav is None, and the lone-wolf HSM is loaded from the
built-in public.json template when behav_lone_wolf is the string "serve" or
"ask".
If the class is a World (no process method present), world_folder is mandatory
and is_world is set to True.
Parameters:
| Name |
Type |
Description |
Default |
proc
|
ModuleWrapper | Module | None
|
The processing module for the agent (e.g., a torch.nn.Module or a
ModuleWrapper). Pass None for a world or for a no-processor agent.
|
required
|
proc_inputs
|
list[StreamType | str] | None
|
List of StreamType objects describing the expected processor inputs.
If None, the inputs are inferred automatically by AgentProcessorChecker.
Defaults to None.
|
None
|
proc_outputs
|
list[StreamType | str] | None
|
List of StreamType objects describing the expected processor outputs.
If None, the outputs are inferred automatically by AgentProcessorChecker.
Defaults to None.
|
None
|
proc_opts
|
dict | None
|
Dictionary of extra options forwarded to the processor (e.g., loss functions
under the "losses" key). Defaults to None.
|
None
|
behav
|
HybridStateMachine | None
|
The HybridStateMachine describing the agent's behavior inside a world.
If None, a default empty HSM is created automatically (agent only).
Defaults to None.
|
None
|
behav_lone_wolf
|
HybridStateMachine | str
|
The HybridStateMachine describing the agent's behavior on the
public network, or one of the strings "serve" or "ask" to load a
pre-designed template HSM. Defaults to "serve".
|
'serve'
|
merge_flat_stream_labels
|
bool
|
If True, flat descriptor labels from all owned streams
are merged into a shared superset so every stream uses the same label space.
Defaults to False.
|
False
|
buffer_generated
|
bool
|
If True, processor output streams are created as
BufferedStream objects. Defaults to False.
|
False
|
buffer_generated_by_others
|
str
|
Controls buffering of streams produced by peer agents.
"one" buffers per-peer, "all" buffers across all peers, "none"
disables buffering. Defaults to "none".
|
'none'
|
world_folder
|
str | None
|
Path to the folder that holds the world's .json role-behaviour
files and agent.py. Required when the instance is a World; ignored
otherwise. Defaults to None.
|
None
|
policy_filter
|
Callable | None
|
A callable (or name of an Agent method) that overrides the
action selected by the HSM policy when the agent is inside a world.
Defaults to None.
|
None
|
policy_filter_lone_wolf
|
Callable | None
|
Same as policy_filter, but applied to the public-
network HSM. Defaults to None.
|
None
|
Raises:
| Type |
Description |
GenException
|
If buffer_generated_by_others is not one of "one", "all",
or "none".
|
GenException
|
If behav is provided but is not a HybridStateMachine instance.
|
GenException
|
If the instance is a World and world_folder is None.
|
ValueError
|
If behav_lone_wolf is a string other than "serve" or "ask".
|
Examples:
Create a minimal agent with a PyTorch linear layer:
>>> import torch
>>> from unaiverse.agent import Agent
>>> agent = Agent(proc=torch.nn.Linear(10, 2))
Create a world-only instance (no processor):
>>> from unaiverse.world import World
>>> world = World(world_folder="./my_world")
Source code in unaiverse/agent_basics.py
| def __init__(self,
proc: ModuleWrapper | torch.nn.Module | None,
proc_inputs: list[StreamType | str] | None = None,
proc_outputs: list[StreamType | str] | None = None,
proc_opts: dict | None = None,
behav: HybridStateMachine | None = None,
behav_lone_wolf: HybridStateMachine | str = "serve",
merge_flat_stream_labels: bool = False,
buffer_generated: bool = False,
buffer_generated_by_others: str = "none",
world_folder: str | None = None,
policy_filter: Callable | None = None,
policy_filter_lone_wolf: Callable | None = None):
"""Initialize an agent or world with its processor, streams, and behavioral state machines.
This constructor wires together the processor, the two behavioral HSMs (world-scoped
``behav`` and public-network ``behav_lone_wolf``), the stream bookkeeping dictionaries,
the peer membership catalogues, and the ``InteractionManager``. When the concrete class
defines a ``process`` method (i.e., it is an ``Agent`` rather than a ``World``), a default
empty HSM is created if ``behav`` is ``None``, and the lone-wolf HSM is loaded from the
built-in ``public.json`` template when ``behav_lone_wolf`` is the string ``"serve"`` or
``"ask"``.
If the class is a ``World`` (no ``process`` method present), ``world_folder`` is mandatory
and ``is_world`` is set to ``True``.
Args:
proc: The processing module for the agent (e.g., a ``torch.nn.Module`` or a
``ModuleWrapper``). Pass ``None`` for a world or for a no-processor agent.
proc_inputs: List of ``StreamType`` objects describing the expected processor inputs.
If ``None``, the inputs are inferred automatically by ``AgentProcessorChecker``.
Defaults to ``None``.
proc_outputs: List of ``StreamType`` objects describing the expected processor outputs.
If ``None``, the outputs are inferred automatically by ``AgentProcessorChecker``.
Defaults to ``None``.
proc_opts: Dictionary of extra options forwarded to the processor (e.g., loss functions
under the ``"losses"`` key). Defaults to ``None``.
behav: The ``HybridStateMachine`` describing the agent's behavior inside a world.
If ``None``, a default empty HSM is created automatically (agent only).
Defaults to ``None``.
behav_lone_wolf: The ``HybridStateMachine`` describing the agent's behavior on the
public network, or one of the strings ``"serve"`` or ``"ask"`` to load a
pre-designed template HSM. Defaults to ``"serve"``.
merge_flat_stream_labels: If ``True``, flat descriptor labels from all owned streams
are merged into a shared superset so every stream uses the same label space.
Defaults to ``False``.
buffer_generated: If ``True``, processor output streams are created as
``BufferedStream`` objects. Defaults to ``False``.
buffer_generated_by_others: Controls buffering of streams produced by peer agents.
``"one"`` buffers per-peer, ``"all"`` buffers across all peers, ``"none"``
disables buffering. Defaults to ``"none"``.
world_folder: Path to the folder that holds the world's ``.json`` role-behaviour
files and ``agent.py``. Required when the instance is a ``World``; ignored
otherwise. Defaults to ``None``.
policy_filter: A callable (or name of an ``Agent`` method) that overrides the
action selected by the HSM policy when the agent is inside a world.
Defaults to ``None``.
policy_filter_lone_wolf: Same as ``policy_filter``, but applied to the public-
network HSM. Defaults to ``None``.
Raises:
GenException: If ``buffer_generated_by_others`` is not one of ``"one"``, ``"all"``,
or ``"none"``.
GenException: If ``behav`` is provided but is not a ``HybridStateMachine`` instance.
GenException: If the instance is a ``World`` and ``world_folder`` is ``None``.
ValueError: If ``behav_lone_wolf`` is a string other than ``"serve"`` or ``"ask"``.
Examples:
Create a minimal agent with a PyTorch linear layer:
>>> import torch
>>> from unaiverse.agent import Agent
>>> agent = Agent(proc=torch.nn.Linear(10, 2))
Create a world-only instance (no processor):
>>> from unaiverse.world import World
>>> world = World(world_folder="./my_world")
"""
# Agent-related features
self.behav = behav # HSM that describes the agent behavior in the private/world net
self.behav_lone_wolf = behav_lone_wolf # HSM that describes the agent behavior in the public net
self.behav_wildcards = {}
self.proc = proc
self.proc_updated_since_last_save = False
self.proc_inputs = proc_inputs
self.proc_outputs = proc_outputs
self.proc_opts = proc_opts
self.proc_last_inputs = None
self.proc_last_outputs = None
self.proc_human_peer_id_to_interaction = {}
self.proc_optional_inputs: list[dict] | None = None # It must be None (do not put an empty list)
self.proc_net_hash: dict[str, None | str] = {'public': None, 'private': None}
self.proc_in_net_hash: dict[str, None | str] = {'public': None, 'private': None}
self.merge_flat_stream_labels = merge_flat_stream_labels
self.buffer_generated = buffer_generated
self.buffer_generated_by_others = buffer_generated_by_others
self.world_folder = world_folder
self.policy_filter = policy_filter
self.policy_filter_opts = {}
self.policy_filter_lone_wolf = policy_filter_lone_wolf
self.policy_filter_lone_wolf_opts = {}
self.clock = clock # Backward compatibility
if self.buffer_generated_by_others not in {"one", "all", "none"}:
raise GenException("Param buffer_generated_by_others can be set to 'one', 'all', or 'none' only.")
# Streams
self.known_streams = {} # All streams that are known to this agent, organized by net hash
self.owned_streams = {} # The streams that are generated/offered by this agent, organized by net hash
self.env_streams = {} # The owned streams that come from environmental sources (e.g., a camera), by net hash
self.proc_streams = {} # The owned streams that are generated by the agent's processor, organized by net hash
self.proc_in_streams = {} # The owned streams that are input to the agent's processor, organized by net hash
self.known_streams_by_user_hash = {} # From peer_id:name to stream object
self.owned_streams_by_user_hash = {} # From stream name to stream object
self.env_streams_by_user_hash = {} # From peer_id:name to stream object
self.proc_streams_by_user_hash = {} # From peer_id:name to stream object
self.proc_in_streams_by_user_hash = {} # From peer_id:name to stream object
self.compat_in_streams = [] # Streams compatible with the processor input (dynamically set)
self.compat_out_streams = [] # Streams compatible with the processor output (dynamically set)
# Agents, world masters, expected world masters
self.all_agents: dict[str, NodeProfile] = {} # ID -> profile (all types of agent)
self.public_agents: dict[str, NodeProfile] = {} # ID -> profile of agents talking in public manner
self.world_agents: dict[str, NodeProfile] = {} # ID -> profile of all agents living in this world
self.world_masters: dict[str, NodeProfile] = {} # ID -> profile of all master-agents living in this world
self.human_agents: dict[str, NodeProfile] = {} # ID -> profile (human agents)
self.artificial_agents: dict[str, NodeProfile] = {} # ID -> profile (artificial agent)
self.world_profile = None
self.is_world = False # If this instance is about a world: it will be discovered at creation time
self.last_sent_interaction = None
# World specific attributes (they are only used if this agent is actually a world)
self.packed_agent_files: str = ""
self.role_to_behav = {}
self.agent_badges: dict[str, list[dict]] = {} # Peer_id -> collected badges for other agents
self.role_changed_by_world: bool = False
self.received_address_update: bool = False
# Internal properties about the way streams are used
self.last_buffered_peer_id_to_info = {} # If buffering was turned on
self.last_ref_uuid = None
self.overridden_action_step = None
self.locked_set_proc_input = False
# Stats
self.stats: Stats | None = None
self.agent_stats_code = None
# Information inherited from the node that hosts this agent
self._node_name = "unk"
self._node_conn = None
self._node_profile = None
self._node_out_fcn = print
self._node_ask_to_get_in_touch_fcn = None
self._node_purge_fcn = None
self._node_agents_waiting = None
self._node_identity_dir = ''
# Utilities
self._proc_in_streams_by_user_hash_pub = {}
self._proc_streams_by_user_hash_pub = {}
self._env_streams_by_user_hash_pub = {}
self._proc_in_streams_by_user_hash_prv = {}
self._proc_streams_by_user_hash_prv = {}
self._env_streams_by_user_hash_prv = {}
# Checking and filling (guessing) missing processor-related info (proc_inputs and proc_outputs)
# and allocating a dummy processor if it was not specified (if None)
AgentProcessorChecker(self)
# Checking HSMs
if not (self.behav is None or isinstance(self.behav, HybridStateMachine)):
raise GenException("Invalid behavior: it must be either None or a HybridStateMachine")
# The stream_hash of compatible streams for each data_props are stored in a set
self.compat_in_streams = [set() for _ in range(len(self.proc_inputs))] \
if self.proc_inputs is not None else None
self.compat_out_streams = [set() for _ in range(len(self.proc_outputs))] \
if self.proc_outputs is not None else None
# Interaction Manager, stdin/stdout
self.im = InteractionManager(self, max_interactions=Custom.MAX_INTERACTIONS)
self.stdin = StreamsProxyWithDefaults(default_values=[item["default_value"] if item["has_default"] else None
for item in self.proc_optional_inputs])
self.stdtar = StreamProxy()
self.stdext = StreamProxy()
self.stdout = StreamProxy()
# Loading default public HSM
if hasattr(self, "process"): # Trick to distinguish if this is an Agent or a World (both sons of this class)
self.is_world = False
# Setting an empty HSM as default is not provided (private/world)
if self.behav is None:
self.behav = HybridStateMachine(self, policy=self.policy_default)
self.behav.add_state("empty")
if self.behav_lone_wolf is not None and isinstance(self.behav_lone_wolf, str):
template_string = self.behav_lone_wolf
if template_string == "serve":
json_to_load = "public.json"
elif template_string == "ask":
json_to_load = "public.json"
else:
raise ValueError("Invalid behav_lone_wolf: it must be an HybridStateMachine or a string "
"in ('serve', 'ask')")
# Safe way to load a file packed in a pip package
self.behav_lone_wolf = HybridStateMachine(self, policy=self.policy_default)
utils_path = importlib.resources.files("unaiverse.utils")
json_file = utils_path.joinpath(json_to_load)
file = json_file.open()
self.behav_lone_wolf.load(file)
file.close()
self.set_policy_filter(self.policy_filter_lone_wolf, public=True)
else:
self.is_world = True
if self.world_folder is None:
raise GenException("No world folder was indicated (world_folder argument)")
|
ROLE_PUBLIC
class-attribute
instance-attribute
ROLE_WORLD_MASTER
class-attribute
instance-attribute
ROLE_WORLD_MASTER = 1 << 0 | 1 << 1
ROLE_WORLD_AGENT
class-attribute
instance-attribute
ROLE_WORLD_AGENT = 1 << 0 | 0 << 1
CUSTOM_ROLES
class-attribute
instance-attribute
ROLE_BITS_TO_STR
class-attribute
instance-attribute
ROLE_STR_TO_BITS
class-attribute
instance-attribute
BADGE_TYPES
class-attribute
instance-attribute
BADGE_TYPES = {'completed', 'attended', 'intermediate', 'pro'}
HUMAN
class-attribute
instance-attribute
behav_lone_wolf
instance-attribute
behav_lone_wolf = behav_lone_wolf
behav_wildcards
instance-attribute
proc_updated_since_last_save
instance-attribute
proc_updated_since_last_save = False
proc_inputs = proc_inputs
proc_outputs
instance-attribute
proc_outputs = proc_outputs
proc_opts
instance-attribute
proc_last_outputs
instance-attribute
proc_human_peer_id_to_interaction
instance-attribute
proc_human_peer_id_to_interaction = {}
proc_optional_inputs: list[dict] | None = None
proc_net_hash
instance-attribute
proc_net_hash: dict[str, None | str] = {'public': None, 'private': None}
proc_in_net_hash
instance-attribute
proc_in_net_hash: dict[str, None | str] = {'public': None, 'private': None}
merge_flat_stream_labels
instance-attribute
merge_flat_stream_labels = merge_flat_stream_labels
buffer_generated
instance-attribute
buffer_generated = buffer_generated
buffer_generated_by_others
instance-attribute
buffer_generated_by_others = buffer_generated_by_others
world_folder
instance-attribute
world_folder = world_folder
policy_filter
instance-attribute
policy_filter = policy_filter
policy_filter_opts
instance-attribute
policy_filter_lone_wolf
instance-attribute
policy_filter_lone_wolf = policy_filter_lone_wolf
policy_filter_lone_wolf_opts
instance-attribute
policy_filter_lone_wolf_opts = {}
known_streams
instance-attribute
owned_streams
instance-attribute
env_streams
instance-attribute
proc_streams
instance-attribute
proc_in_streams
instance-attribute
known_streams_by_user_hash
instance-attribute
known_streams_by_user_hash = {}
owned_streams_by_user_hash
instance-attribute
owned_streams_by_user_hash = {}
env_streams_by_user_hash
instance-attribute
env_streams_by_user_hash = {}
proc_streams_by_user_hash
instance-attribute
proc_streams_by_user_hash = {}
proc_in_streams_by_user_hash
instance-attribute
proc_in_streams_by_user_hash = {}
all_agents
instance-attribute
public_agents
instance-attribute
world_agents
instance-attribute
world_masters
instance-attribute
human_agents
instance-attribute
artificial_agents
instance-attribute
world_profile
instance-attribute
is_world
instance-attribute
last_sent_interaction
instance-attribute
last_sent_interaction = None
packed_agent_files
instance-attribute
packed_agent_files: str = ''
role_to_behav
instance-attribute
agent_badges
instance-attribute
agent_badges: dict[str, list[dict]] = {}
role_changed_by_world
instance-attribute
role_changed_by_world: bool = False
received_address_update
instance-attribute
received_address_update: bool = False
last_buffered_peer_id_to_info
instance-attribute
last_buffered_peer_id_to_info = {}
last_ref_uuid
instance-attribute
overridden_action_step
instance-attribute
overridden_action_step = None
locked_set_proc_input = False
stats
instance-attribute
stats: Stats | None = None
agent_stats_code
instance-attribute
compat_in_streams
instance-attribute
compat_in_streams = [(set()) for _ in (range(len(proc_inputs)))] if proc_inputs is not None else None
compat_out_streams
instance-attribute
compat_out_streams = [(set()) for _ in (range(len(proc_outputs)))] if proc_outputs is not None else None
stdin
instance-attribute
stdin = StreamsProxyWithDefaults(default_values=[(item['default_value'] if item['has_default'] else None) for item in (proc_optional_inputs)])
stdtar
instance-attribute
stdext
instance-attribute
stdout
instance-attribute
set_node_info
set_node_info(conn: NodeConn, profile: NodeProfile, ask_to_get_in_touch_fcn: Callable, purge_fcn: Callable, node_identity_dir: str, agents_waiting: dict) -> bool
Bind node-level resources to this agent after the hosting node is ready.
This method is called by the framework once the underlying network node has been
fully initialised and a peer ID is available. It stores references to the
connection pool, the node profile, and the various node callbacks, then resolves
any placeholder peer IDs ("<public_peer_id>", "<private_peer_id>") that
were embedded in stream objects registered before the node was ready. If the
processor wraps a HumanModule, the node's static profile is updated to mark
the node as a human node.
For world nodes, this method additionally loads and refactors all role-behaviour
JSON files from world_folder (via load_and_refactor_action_file_and_behav_files),
calls create_behav_files to let subclasses generate JSON files dynamically,
invokes augment_roles to build the full role lookup tables, and loads any
custom stats.py found in the world folder.
After all role/stream setup is complete, processor input and output streams are
created (create_proc_input_streams, create_proc_output_streams), the node
profile is updated with stream metadata (update_streams_in_profile), and stdin
is disabled for human agents.
Parameters:
| Name |
Type |
Description |
Default |
conn
|
NodeConn
|
The NodeConn connection pool manager provided by the hosting node.
|
required
|
profile
|
NodeProfile
|
The NodeProfile of the hosting node.
|
required
|
ask_to_get_in_touch_fcn
|
Callable
|
Callback invoked to request a rendezvous with another
peer.
|
required
|
purge_fcn
|
Callable
|
Callback invoked to disconnect and remove a peer from the node.
|
required
|
node_identity_dir
|
str
|
Path to the folder storing the node's identity files.
|
required
|
agents_waiting
|
dict
|
Dictionary of peers that connected but have not yet been
evaluated for addition to all_agents.
|
required
|
Returns:
| Type |
Description |
bool
|
True upon successful setup.
|
Raises:
| Type |
Description |
GenException
|
If this is a world node and world_folder is None.
|
GenException
|
If no .json role files are found in world_folder.
|
GenException
|
If the role Python files and JSON files are inconsistent.
|
GenException
|
If any behaviour file cannot be loaded or saved.
|
GenException
|
If stats.py exists in the world folder but cannot be read.
|
Examples:
set_node_info is called internally by the UNaIVERSE node framework and is
not normally invoked by user code. The following sketch shows the calling
pattern:
>>> # Inside the node's initialisation routine:
>>> ok = agent.set_node_info(conn, profile, ask_fcn, purge_fcn, identity_dir, {})
>>> assert ok
Source code in unaiverse/agent_basics.py
| def set_node_info(self, conn: NodeConn, profile: NodeProfile,
ask_to_get_in_touch_fcn: Callable, purge_fcn: Callable, node_identity_dir: str,
agents_waiting: dict) -> bool:
"""Bind node-level resources to this agent after the hosting node is ready.
This method is called by the framework once the underlying network node has been
fully initialised and a peer ID is available. It stores references to the
connection pool, the node profile, and the various node callbacks, then resolves
any placeholder peer IDs (``"<public_peer_id>"``, ``"<private_peer_id>"``) that
were embedded in stream objects registered before the node was ready. If the
processor wraps a ``HumanModule``, the node's static profile is updated to mark
the node as a human node.
For world nodes, this method additionally loads and refactors all role-behaviour
JSON files from ``world_folder`` (via ``load_and_refactor_action_file_and_behav_files``),
calls ``create_behav_files`` to let subclasses generate JSON files dynamically,
invokes ``augment_roles`` to build the full role lookup tables, and loads any
custom ``stats.py`` found in the world folder.
After all role/stream setup is complete, processor input and output streams are
created (``create_proc_input_streams``, ``create_proc_output_streams``), the node
profile is updated with stream metadata (``update_streams_in_profile``), and stdin
is disabled for human agents.
Args:
conn: The ``NodeConn`` connection pool manager provided by the hosting node.
profile: The ``NodeProfile`` of the hosting node.
ask_to_get_in_touch_fcn: Callback invoked to request a rendezvous with another
peer.
purge_fcn: Callback invoked to disconnect and remove a peer from the node.
node_identity_dir: Path to the folder storing the node's identity files.
agents_waiting: Dictionary of peers that connected but have not yet been
evaluated for addition to ``all_agents``.
Returns:
``True`` upon successful setup.
Raises:
GenException: If this is a world node and ``world_folder`` is ``None``.
GenException: If no ``.json`` role files are found in ``world_folder``.
GenException: If the role Python files and JSON files are inconsistent.
GenException: If any behaviour file cannot be loaded or saved.
GenException: If ``stats.py`` exists in the world folder but cannot be read.
Examples:
``set_node_info`` is called internally by the UNaIVERSE node framework and is
not normally invoked by user code. The following sketch shows the calling
pattern:
>>> # Inside the node's initialisation routine:
>>> ok = agent.set_node_info(conn, profile, ask_fcn, purge_fcn, identity_dir, {})
>>> assert ok
"""
# Getting basic references
self._node_conn = conn
self._node_profile = profile
self._node_name = profile.get_static_profile()['node_name']
self._node_ask_to_get_in_touch_fcn = ask_to_get_in_touch_fcn
self._node_purge_fcn = purge_fcn
self._node_identity_dir = node_identity_dir
self._node_agents_waiting = agents_waiting
# Checking if human, marking the node
if self.proc is not None and isinstance(self.proc.module, HumanModule):
self._node_profile.get_static_profile()["node_type"] = self.HUMAN
# Adding peer_id information into the already existing stream data (if any)
# (initially marked with generic wildcards like <public_peer_id>, ...)
net_hashes = list(self.known_streams.keys())
for net_hash in net_hashes:
if net_hash.startswith("<public_peer_id>") or net_hash.startswith("<private_peer_id>"):
stream_dict = self.known_streams[net_hash]
for stream_obj in stream_dict.values():
self.add_stream(stream_obj, owned=True) # This will also re-add streams using the node clock
# Removing place-holder streams
for peer_id in ["<public_peer_id>", "<private_peer_id>"]:
to_remove = []
for net_hash in self.known_streams.keys():
if DataProps.peer_id_from_net_hash(net_hash) == peer_id:
for _name, _stream in self.known_streams[net_hash].items():
to_remove.append((net_hash, _name))
# Removing
for (net_hash, name) in to_remove:
user_hash = DataProps.user_hash_from_net_hash(net_hash, name)
del self.known_streams[net_hash][name]
if len(self.known_streams[net_hash]) == 0:
del self.known_streams[net_hash]
if user_hash in self.known_streams_by_user_hash:
del self.known_streams_by_user_hash[user_hash]
# Removing all the owned streams (environment and processor streams are of course "owned")
if net_hash in self.owned_streams:
if name in self.owned_streams[net_hash]:
del self.owned_streams[net_hash][name]
if len(self.owned_streams[net_hash]) == 0:
del self.owned_streams[net_hash]
if user_hash in self.owned_streams_by_user_hash:
del self.owned_streams_by_user_hash[user_hash]
if net_hash in self.env_streams:
if name in self.env_streams[net_hash]:
del self.env_streams[net_hash][name]
if len(self.env_streams[net_hash]) == 0:
del self.env_streams[net_hash]
if user_hash in self.env_streams_by_user_hash:
del self.env_streams_by_user_hash[user_hash]
if net_hash in self.proc_streams:
if name in self.proc_streams[net_hash]:
del self.proc_streams[net_hash][name]
if len(self.proc_streams[net_hash]) == 0:
del self.proc_streams[net_hash]
if user_hash in self.proc_streams_by_user_hash:
del self.proc_streams_by_user_hash[user_hash]
if net_hash in self.proc_in_streams:
if name in self.proc_in_streams[net_hash]:
del self.proc_in_streams[net_hash][name]
if len(self.proc_in_streams[net_hash]) == 0:
del self.proc_in_streams[net_hash]
if user_hash in self.proc_in_streams_by_user_hash:
del self.proc_in_streams_by_user_hash[user_hash]
log.misc(f"Successfully removed known stream with network hash {net_hash}, stream name: {name}")
# World only: loading action files and refactoring (or building) JSON files of the different roles.
# This where the world guesses roles.
if self.is_world:
assert self.world_folder is not None
# Check role-JSON files in the world folder
role_json_tracker = FileTracker(self.world_folder, ext=".json")
# This usually does nothing, but if you like to dynamically create JSON files, overload this method
self.create_behav_files()
# Loading and refactoring roles and behaviors
self.load_and_refactor_action_file_and_behav_files(force_save=role_json_tracker.something_changed())
# Building combination of default roles (considering public, world_agent, world_master default roles), and
# agent/world specific roles
self.augment_roles()
# Loading the custom Stats code
if self.world_folder is not None:
stats_file = os.path.join(self.world_folder, 'stats.py')
if os.path.exists(stats_file):
log.misc(f"Found custom stats.py at {stats_file}")
try:
with open(stats_file, 'r', encoding='utf-8') as file:
self.agent_stats_code = file.read()
except Exception as e:
raise GenException(f'Error while reading/loading the stats.py file: {stats_file} [{e}]')
# Creating streams associated to the processor input (right now we assume there is no need to buffer them)
self.create_proc_input_streams(buffered=False)
# Creating streams associated to the processor output
self.create_proc_output_streams(buffered=self.buffer_generated)
# Updating node profile by indicating the processor-related streams
self.update_streams_in_profile()
# Blocking stdin for humans
if has_human_processor(self):
self.set_default_stdin_binding(public=True)
self.stdin.disable()
self.set_default_stdin_binding(public=False)
self.stdin.disable()
return True
|
get_connection_pool_manager
get_connection_pool_manager() -> NodeConn
Return the connection pool manager bound to this agent's hosting node.
The NodeConn object provides low-level access to peer connections, role
assignment, rendezvous management, and publish-subscribe channels. It is set
by set_node_info and is None before that call.
Returns:
| Type |
Description |
NodeConn
|
The NodeConn instance associated with the hosting node, or None if
|
NodeConn
|
set_node_info has not yet been called.
|
Source code in unaiverse/agent_basics.py
| def get_connection_pool_manager(self) -> NodeConn:
"""Return the connection pool manager bound to this agent's hosting node.
The ``NodeConn`` object provides low-level access to peer connections, role
assignment, rendezvous management, and publish-subscribe channels. It is set
by ``set_node_info`` and is ``None`` before that call.
Returns:
The ``NodeConn`` instance associated with the hosting node, or ``None`` if
``set_node_info`` has not yet been called.
"""
return self._node_conn
|
set_default_stdin_binding
set_default_stdin_binding(public: bool | None = None) -> None
Bind the stdin stream proxy to the default processor input streams.
Selects the correct set of processor input streams (public or private) and
calls stdin.bind with the system-level interaction UUID so that the proxy
always points at the agent's own streams for the current network context.
When public is None, the choice is made automatically based on
behaving_in_world: private streams are used inside a world, public streams
are used on the public network.
Parameters:
| Name |
Type |
Description |
Default |
public
|
bool | None
|
If True, bind to the public processor-input streams. If False,
bind to the private (world) processor-input streams. If None, the
network context is detected automatically. Defaults to None.
|
None
|
Source code in unaiverse/agent_basics.py
| def set_default_stdin_binding(self, public: bool | None = None) -> None:
"""Bind the ``stdin`` stream proxy to the default processor input streams.
Selects the correct set of processor input streams (public or private) and
calls ``stdin.bind`` with the system-level interaction UUID so that the proxy
always points at the agent's own streams for the current network context.
When ``public`` is ``None``, the choice is made automatically based on
``behaving_in_world``: private streams are used inside a world, public streams
are used on the public network.
Args:
public: If ``True``, bind to the public processor-input streams. If ``False``,
bind to the private (world) processor-input streams. If ``None``, the
network context is detected automatically. Defaults to ``None``.
"""
if (public is not None and not public) or (public is None and self.behaving_in_world()):
self.stdin.bind(self._proc_in_streams_by_user_hash_prv, uuid=Custom.SYSTEM_INTERACTION_UUID)
if (public is not None and public) or (public is None and not self.behaving_in_world()):
self.stdin.bind(self._proc_in_streams_by_user_hash_pub, uuid=Custom.SYSTEM_INTERACTION_UUID)
|
set_default_stream_binding
set_default_stream_binding(public: bool | None = None) -> None
Bind all four standard stream proxies to the default streams for the current context.
Updates stdin, stdtar, stdext, and stdout in a single call,
selecting public or private stream dictionaries based on public. When
public is None, the context is determined automatically via
behaving_in_world.
stdtar is always bound to an empty dictionary (target streams are not
available by default and must be explicitly set by the agent).
Parameters:
| Name |
Type |
Description |
Default |
public
|
bool | None
|
If True, bind all proxies to the public stream dictionaries. If
False, bind to the private (world) dictionaries. If None, the
network context is detected automatically. Defaults to None.
|
None
|
Source code in unaiverse/agent_basics.py
| def set_default_stream_binding(self, public: bool | None = None) -> None:
"""Bind all four standard stream proxies to the default streams for the current context.
Updates ``stdin``, ``stdtar``, ``stdext``, and ``stdout`` in a single call,
selecting public or private stream dictionaries based on ``public``. When
``public`` is ``None``, the context is determined automatically via
``behaving_in_world``.
``stdtar`` is always bound to an empty dictionary (target streams are not
available by default and must be explicitly set by the agent).
Args:
public: If ``True``, bind all proxies to the public stream dictionaries. If
``False``, bind to the private (world) dictionaries. If ``None``, the
network context is detected automatically. Defaults to ``None``.
"""
if (public is not None and not public) or (public is None and self.behaving_in_world()):
self.stdin.bind(self._proc_in_streams_by_user_hash_prv, uuid=Custom.SYSTEM_INTERACTION_UUID)
self.stdtar.bind({}, uuid=Custom.SYSTEM_INTERACTION_UUID)
self.stdext.bind(self._env_streams_by_user_hash_prv, uuid=Custom.SYSTEM_INTERACTION_UUID)
self.stdout.bind(self._proc_streams_by_user_hash_prv, uuid=Custom.SYSTEM_INTERACTION_UUID)
if (public is not None and public) or (public is None and not self.behaving_in_world()):
self.stdin.bind(self._proc_in_streams_by_user_hash_pub, uuid=Custom.SYSTEM_INTERACTION_UUID)
self.stdtar.bind({}, uuid=Custom.SYSTEM_INTERACTION_UUID)
self.stdext.bind(self._env_streams_by_user_hash_pub, uuid=Custom.SYSTEM_INTERACTION_UUID)
self.stdout.bind(self._proc_streams_by_user_hash_pub, uuid=Custom.SYSTEM_INTERACTION_UUID)
|
get_default_stdin_bindings
get_default_stdin_bindings(public: bool | None = None) -> dict
Return the stream dictionary that is bound by default to stdin.
The returned dictionary maps user-hash strings ("peer_id:name") to
Stream objects and corresponds to the agent's own processor input streams
for the requested network context.
Parameters:
| Name |
Type |
Description |
Default |
public
|
bool | None
|
If True, return the public processor-input stream dictionary.
If False, return the private (world) processor-input stream dictionary.
If None, the context is determined automatically via behaving_in_world.
Defaults to None.
|
None
|
Returns:
| Type |
Description |
dict
|
A dictionary mapping user-hash strings to Stream objects, or an empty
|
dict
|
dictionary if neither branch is matched.
|
Source code in unaiverse/agent_basics.py
| def get_default_stdin_bindings(self, public: bool | None = None) -> dict:
"""Return the stream dictionary that is bound by default to ``stdin``.
The returned dictionary maps user-hash strings (``"peer_id:name"``) to
``Stream`` objects and corresponds to the agent's own processor input streams
for the requested network context.
Args:
public: If ``True``, return the public processor-input stream dictionary.
If ``False``, return the private (world) processor-input stream dictionary.
If ``None``, the context is determined automatically via ``behaving_in_world``.
Defaults to ``None``.
Returns:
A dictionary mapping user-hash strings to ``Stream`` objects, or an empty
dictionary if neither branch is matched.
"""
if (public is not None and not public) or (public is None and self.behaving_in_world()):
return self._proc_in_streams_by_user_hash_prv
if (public is not None and public) or (public is None and not self.behaving_in_world()):
return self._proc_in_streams_by_user_hash_pub
return {}
|
get_default_stream_bindings
get_default_stream_bindings(public: bool | None = None) -> tuple[dict, dict, dict, dict]
Return the four stream dictionaries that are bound by default to the standard proxies.
Each returned dictionary maps user-hash strings to Stream objects. The four
elements correspond to stdin, stdtar, stdext, and stdout
respectively. stdtar is always an empty dictionary.
Parameters:
| Name |
Type |
Description |
Default |
public
|
bool | None
|
If True, return the public stream dictionaries. If False,
return the private (world) stream dictionaries. If None, the context
is determined automatically via behaving_in_world. Defaults to None.
|
None
|
Returns:
| Type |
Description |
dict
|
A four-tuple (stdin_streams, stdtar_streams, stdext_streams, stdout_streams)
|
dict
|
where each element is a dictionary mapping user-hash strings to Stream
|
dict
|
objects. All four dictionaries are empty if neither branch is matched.
|
Source code in unaiverse/agent_basics.py
| def get_default_stream_bindings(self, public: bool | None = None) -> tuple[dict, dict, dict, dict]:
"""Return the four stream dictionaries that are bound by default to the standard proxies.
Each returned dictionary maps user-hash strings to ``Stream`` objects. The four
elements correspond to ``stdin``, ``stdtar``, ``stdext``, and ``stdout``
respectively. ``stdtar`` is always an empty dictionary.
Args:
public: If ``True``, return the public stream dictionaries. If ``False``,
return the private (world) stream dictionaries. If ``None``, the context
is determined automatically via ``behaving_in_world``. Defaults to ``None``.
Returns:
A four-tuple ``(stdin_streams, stdtar_streams, stdext_streams, stdout_streams)``
where each element is a dictionary mapping user-hash strings to ``Stream``
objects. All four dictionaries are empty if neither branch is matched.
"""
if (public is not None and not public) or (public is None and self.behaving_in_world()):
return (self._proc_in_streams_by_user_hash_prv, {},
self._env_streams_by_user_hash_prv, self._proc_streams_by_user_hash_prv)
if (public is not None and public) or (public is None and not self.behaving_in_world()):
return (self._proc_in_streams_by_user_hash_pub, {},
self._env_streams_by_user_hash_pub, self._proc_streams_by_user_hash_pub)
return {}, {}, {}, {}
|
get_proc_output_net_hash
get_proc_output_net_hash(public: bool = True) -> str | None
Return the network hash of the processor output streams for the requested context.
The network hash is set by create_proc_output_streams and encodes the
owning peer ID, the stream group ("processor"), and the transport type.
It is None if no output streams have been created yet.
Parameters:
| Name |
Type |
Description |
Default |
public
|
bool
|
If True, return the hash for the public peer's output streams.
If False, return the hash for the private (world) peer's output streams.
Defaults to True.
|
True
|
Returns:
| Type |
Description |
str | None
|
The network hash string, or None if the output streams have not yet been
|
str | None
|
created for the requested context.
|
Source code in unaiverse/agent_basics.py
| def get_proc_output_net_hash(self, public: bool = True) -> str | None:
"""Return the network hash of the processor output streams for the requested context.
The network hash is set by ``create_proc_output_streams`` and encodes the
owning peer ID, the stream group (``"processor"``), and the transport type.
It is ``None`` if no output streams have been created yet.
Args:
public: If ``True``, return the hash for the public peer's output streams.
If ``False``, return the hash for the private (world) peer's output streams.
Defaults to ``True``.
Returns:
The network hash string, or ``None`` if the output streams have not yet been
created for the requested context.
"""
return self.proc_net_hash['public'] if public else self.proc_net_hash['private']
|
get_proc_input_net_hash(public: bool = True) -> str | None
Return the network hash of the processor input streams for the requested context.
The network hash is set by create_proc_input_streams and encodes the
owning peer ID, the stream group ("processor_in"), and the transport type.
It is None if no input streams have been created yet.
Parameters:
| Name |
Type |
Description |
Default |
public
|
bool
|
If True, return the hash for the public peer's input streams.
If False, return the hash for the private (world) peer's input streams.
Defaults to True.
|
True
|
Returns:
| Type |
Description |
str | None
|
The network hash string, or None if the input streams have not yet been
|
str | None
|
created for the requested context.
|
Source code in unaiverse/agent_basics.py
| def get_proc_input_net_hash(self, public: bool = True) -> str | None:
"""Return the network hash of the processor input streams for the requested context.
The network hash is set by ``create_proc_input_streams`` and encodes the
owning peer ID, the stream group (``"processor_in"``), and the transport type.
It is ``None`` if no input streams have been created yet.
Args:
public: If ``True``, return the hash for the public peer's input streams.
If ``False``, return the hash for the private (world) peer's input streams.
Defaults to ``True``.
Returns:
The network hash string, or ``None`` if the input streams have not yet been
created for the requested context.
"""
return self.proc_in_net_hash['public'] if public else self.proc_in_net_hash['private']
|
prepare_stdin_if_human
prepare_stdin_if_human(public: bool, peer_id: str)
Prepare the stdin binding for a human-processor agent before a processing step.
Resets the stdin proxy to the default binding for the given network context,
then cleans up any completed interactions from proc_human_peer_id_to_interaction.
Returns the interaction UUID that should be used to read from stdin: if an
active (not completed) interaction exists for the effective peer, its UUID is
returned so that the ongoing dialogue is resumed; otherwise the system-level
UUID (Custom.SYSTEM_INTERACTION_UUID) is returned.
When operating inside a world (public is False), the peer ID used for
interaction lookup is remapped to the world's peer ID via
_node_conn.get_world_peer_id.
Parameters:
| Name |
Type |
Description |
Default |
public
|
bool
|
True if the agent is currently operating on the public network;
False if it is inside a world.
|
required
|
peer_id
|
str
|
The peer ID of the remote party that initiated the current exchange.
Remapped to the world peer ID when public is False.
|
required
|
Returns:
| Type |
Description |
|
|
The UUID string of the active interaction for the given peer, or
|
|
|
Custom.SYSTEM_INTERACTION_UUID if no active interaction exists.
|
Source code in unaiverse/agent_basics.py
| def prepare_stdin_if_human(self, public: bool, peer_id: str):
"""Prepare the ``stdin`` binding for a human-processor agent before a processing step.
Resets the ``stdin`` proxy to the default binding for the given network context,
then cleans up any completed interactions from ``proc_human_peer_id_to_interaction``.
Returns the interaction UUID that should be used to read from ``stdin``: if an
active (not completed) interaction exists for the effective peer, its UUID is
returned so that the ongoing dialogue is resumed; otherwise the system-level
UUID (``Custom.SYSTEM_INTERACTION_UUID``) is returned.
When operating inside a world (``public`` is ``False``), the peer ID used for
interaction lookup is remapped to the world's peer ID via
``_node_conn.get_world_peer_id``.
Args:
public: ``True`` if the agent is currently operating on the public network;
``False`` if it is inside a world.
peer_id: The peer ID of the remote party that initiated the current exchange.
Remapped to the world peer ID when ``public`` is ``False``.
Returns:
The UUID string of the active interaction for the given peer, or
``Custom.SYSTEM_INTERACTION_UUID`` if no active interaction exists.
"""
self.set_default_stdin_binding(public)
# Zero-step: if we are in a world, the peer ID used for indexing the interaction cache is the world peer ID
peer_id = peer_id if public else self._node_conn.get_world_peer_id()
# First of all, clearing residuals of interactions that were completed in the past
to_remove = []
for _peer_id, _interaction in self.proc_human_peer_id_to_interaction.items():
if _interaction.is_completed():
to_remove.append(_peer_id)
for _peer_id in to_remove:
del self.proc_human_peer_id_to_interaction[_peer_id]
# Getting existing interaction or defaulting to the system one
if peer_id in self.proc_human_peer_id_to_interaction:
interaction = self.proc_human_peer_id_to_interaction[peer_id]
return interaction.uuid
else:
return Custom.SYSTEM_INTERACTION_UUID
|
generate_id
staticmethod
Generate a random 8-character hexadecimal ID string.
Uses uuid4 as the entropy source and returns the first eight characters of
its hex representation. Suitable for generating short unique identifiers for
interactions, tags, or other framework objects where full UUID length is not
required.
Returns:
| Type |
Description |
str
|
An 8-character lowercase hexadecimal string (e.g., "3f7a2c1b").
|
Examples:
>>> uid = AgentBasics.generate_id()
>>> len(uid)
8
>>> all(c in '0123456789abcdef' for c in uid)
True
Source code in unaiverse/agent_basics.py
| @staticmethod
def generate_id() -> str:
"""Generate a random 8-character hexadecimal ID string.
Uses ``uuid4`` as the entropy source and returns the first eight characters of
its hex representation. Suitable for generating short unique identifiers for
interactions, tags, or other framework objects where full UUID length is not
required.
Returns:
An 8-character lowercase hexadecimal string (e.g., ``"3f7a2c1b"``).
Examples:
>>> uid = AgentBasics.generate_id()
>>> len(uid)
8
>>> all(c in '0123456789abcdef' for c in uid)
True
"""
return _uuid.uuid4().hex[0:8]
|
inject_received_interaction
inject_received_interaction(sender: str, interaction_dict: dict)
Register an interaction received from a remote peer into the local state machine and interaction manager.
Deserialises interaction_dict into an Interaction object, determines
whether the sender is on the public network or a world peer, and selects the
corresponding HSM (behav_lone_wolf or behav). If the interaction carries
an action name, it is enqueued in the HSM via request_action; otherwise it is
registered directly with the InteractionManager via im.register_received.
Parameters:
| Name |
Type |
Description |
Default |
sender
|
str
|
The peer ID of the remote agent that sent the interaction.
|
required
|
interaction_dict
|
dict
|
A dictionary representation of an Interaction, as
produced by Interaction.to_dict.
|
required
|
Raises:
| Type |
Description |
AssertionError
|
If the resolved behavioral HSM is not a HybridStateMachine
instance.
|
Note
If request_action returns False (the action cannot be enqueued), an
error is logged and the interaction is silently dropped rather than raising
an exception.
Source code in unaiverse/agent_basics.py
| def inject_received_interaction(self, sender: str, interaction_dict: dict):
"""Register an interaction received from a remote peer into the local state machine and interaction manager.
Deserialises ``interaction_dict`` into an ``Interaction`` object, determines
whether the sender is on the public network or a world peer, and selects the
corresponding HSM (``behav_lone_wolf`` or ``behav``). If the interaction carries
an action name, it is enqueued in the HSM via ``request_action``; otherwise it is
registered directly with the ``InteractionManager`` via ``im.register_received``.
Args:
sender: The peer ID of the remote agent that sent the interaction.
interaction_dict: A dictionary representation of an ``Interaction``, as
produced by ``Interaction.to_dict``.
Raises:
AssertionError: If the resolved behavioral HSM is not a ``HybridStateMachine``
instance.
Note:
If ``request_action`` returns ``False`` (the action cannot be enqueued), an
error is logged and the interaction is silently dropped rather than raising
an exception.
"""
# Creating the interaction object
interaction = Interaction.from_dict(interaction_dict)
# Marking
public = sender in self.public_agents
# Also register with the HSM for action selection
behav = self.behav_lone_wolf if public else self.behav
assert isinstance(behav, HybridStateMachine)
if (interaction.action_name is not None and len(interaction.action_name) > 0 and
not behav.request_action(interaction)): # Here the interaction links to action (IM does not do this!)
log.error(f"Cannot enqueue a received interaction!")
else:
# Register with the InteractionManager
log.misc(f"Registering received interaction for action "
f"{interaction.action_name}")
self.im.register_received(interaction, public)
|
build_augmented_roles_dictionaries
staticmethod
build_augmented_roles_dictionaries(custom_roles: list[str] | set[str]) -> tuple[dict[int, str], dict[str, int]]
Build complete role lookup tables by combining default roles with custom roles.
Starts from the three built-in roles ("public_agent", "world_agent",
"world_master") and appends each custom role string as a new bitmask entry
starting at bit 2. For every custom role, two augmented (compound) roles are
generated: one prefixed with "world_agent~" and one with "world_master~".
Compound roles are formed by OR-ing the base role bits with the custom role bits,
and their string representation uses "~" as a separator
(e.g., "world_agent~student").
The resulting dictionaries are also side-effect-written into
AgentBasics.ROLE_STR_TO_BITS for global accessibility.
Parameters:
| Name |
Type |
Description |
Default |
custom_roles
|
list[str] | set[str]
|
An ordered list (or set) of custom role name strings. The
assignment of bitmasks is index-based, so a list is preferred when
reproducibility matters. The maximum supported length is 30.
|
required
|
Returns:
| Type |
Description |
dict[int, str]
|
A two-tuple (role_bits_to_str, role_str_to_bits) where:
|
dict[str, int]
|
role_bits_to_str maps integer bitmasks to human-readable role strings.
|
tuple[dict[int, str], dict[str, int]]
|
role_str_to_bits maps human-readable role strings to integer bitmasks.
|
Note
Passing more than 30 custom roles logs a critical warning but does not raise
an exception; the extra roles are assigned bitmasks that may overlap with
bits used by other parts of the framework.
Source code in unaiverse/agent_basics.py
| @staticmethod
def build_augmented_roles_dictionaries(custom_roles: list[str] | set[str]) -> tuple[dict[int, str], dict[str, int]]:
"""Build complete role lookup tables by combining default roles with custom roles.
Starts from the three built-in roles (``"public_agent"``, ``"world_agent"``,
``"world_master"``) and appends each custom role string as a new bitmask entry
starting at bit 2. For every custom role, two augmented (compound) roles are
generated: one prefixed with ``"world_agent~"`` and one with ``"world_master~"``.
Compound roles are formed by OR-ing the base role bits with the custom role bits,
and their string representation uses ``"~"`` as a separator
(e.g., ``"world_agent~student"``).
The resulting dictionaries are also side-effect-written into
``AgentBasics.ROLE_STR_TO_BITS`` for global accessibility.
Args:
custom_roles: An ordered list (or set) of custom role name strings. The
assignment of bitmasks is index-based, so a list is preferred when
reproducibility matters. The maximum supported length is 30.
Returns:
A two-tuple ``(role_bits_to_str, role_str_to_bits)`` where:
- ``role_bits_to_str`` maps integer bitmasks to human-readable role strings.
- ``role_str_to_bits`` maps human-readable role strings to integer bitmasks.
Note:
Passing more than 30 custom roles logs a critical warning but does not raise
an exception; the extra roles are assigned bitmasks that may overlap with
bits used by other parts of the framework.
"""
role_bits_to_str = {k: v for k, v in AgentBasics.ROLE_BITS_TO_STR.items()}
role_str_to_bits = {k: v for k, v in AgentBasics.ROLE_STR_TO_BITS.items()}
# Both Agent and World: Fusing basic roles and custom roles
if len(custom_roles) > 0:
if len(custom_roles) > 30: # Safe value, could be increased
log.critical("Maximum number of custom role overcame (max is 30)")
for i, role_str in enumerate(custom_roles):
role_int = 1 << (i + 2) # 000000100, then 00001000, etc. (recall that the first two bits are reserved)
role_str_to_bits[role_str] = role_int
role_bits_to_str[role_int] = role_str
AgentBasics.ROLE_STR_TO_BITS[role_str] = role_int
# Both Agent and World: Augmenting roles
roles_not_to_be_augmented = {AgentBasics.ROLE_PUBLIC,
AgentBasics.ROLE_WORLD_AGENT,
AgentBasics.ROLE_WORLD_MASTER}
role_bits_to_str_original = {k: v for k, v in role_bits_to_str.items()}
for role_int, role_str in role_bits_to_str_original.items():
if role_int not in roles_not_to_be_augmented and "~" not in role_str:
for role_base_int in {AgentBasics.ROLE_WORLD_AGENT, AgentBasics.ROLE_WORLD_MASTER}:
augmented_role_int = role_base_int | role_int
augmented_role_str = role_bits_to_str[role_base_int] + "~" + role_str
if augmented_role_str not in role_str_to_bits:
role_str_to_bits[augmented_role_str] = augmented_role_int
role_bits_to_str[augmented_role_int] = augmented_role_str
return role_bits_to_str, role_str_to_bits
|
augment_roles
augment_roles(role_bits_to_str: dict[int, str] | None = None, role_str_to_bits: dict[str, int] | None = None)
Update the instance-level role lookup tables with default and custom roles.
If pre-built dictionaries are not provided, build_augmented_roles_dictionaries
is called with self.CUSTOM_ROLES to generate them. The class-level
ROLE_BITS_TO_STR and ROLE_STR_TO_BITS dictionaries are then cleared and
repopulated in place so that all references to these objects (held by the HSMs and
the network node) see the updated content immediately.
This method is called automatically by set_node_info for world nodes and
should not normally be invoked by user code.
Parameters:
| Name |
Type |
Description |
Default |
role_bits_to_str
|
dict[int, str] | None
|
A pre-built mapping from integer bitmasks to role strings,
as returned by build_augmented_roles_dictionaries. If None, the
mapping is generated from self.CUSTOM_ROLES. Defaults to None.
|
None
|
role_str_to_bits
|
dict[str, int] | None
|
A pre-built mapping from role strings to integer bitmasks.
Must be provided together with role_bits_to_str or both should be
None. Defaults to None.
|
None
|
Source code in unaiverse/agent_basics.py
| def augment_roles(self,
role_bits_to_str: dict[int, str] | None = None,
role_str_to_bits: dict[str, int] | None = None):
"""Update the instance-level role lookup tables with default and custom roles.
If pre-built dictionaries are not provided, ``build_augmented_roles_dictionaries``
is called with ``self.CUSTOM_ROLES`` to generate them. The class-level
``ROLE_BITS_TO_STR`` and ``ROLE_STR_TO_BITS`` dictionaries are then cleared and
repopulated in place so that all references to these objects (held by the HSMs and
the network node) see the updated content immediately.
This method is called automatically by ``set_node_info`` for world nodes and
should not normally be invoked by user code.
Args:
role_bits_to_str: A pre-built mapping from integer bitmasks to role strings,
as returned by ``build_augmented_roles_dictionaries``. If ``None``, the
mapping is generated from ``self.CUSTOM_ROLES``. Defaults to ``None``.
role_str_to_bits: A pre-built mapping from role strings to integer bitmasks.
Must be provided together with ``role_bits_to_str`` or both should be
``None``. Defaults to ``None``.
"""
if role_bits_to_str is None or role_str_to_bits is None:
role_bits_to_str, role_str_to_bits = AgentBasics.build_augmented_roles_dictionaries(self.CUSTOM_ROLES)
self.ROLE_BITS_TO_STR.clear()
for k, v in role_bits_to_str.items():
self.ROLE_BITS_TO_STR[k] = v
self.ROLE_STR_TO_BITS.clear()
for k, v in role_str_to_bits.items():
self.ROLE_STR_TO_BITS[k] = v
|
clear_world_related_data()
Destroy all world-related cached state after leaving a world (async).
Resets agent status attributes, removes all private (world-scoped) streams via
the internal __remove_all_world_private_streams helper, removes all peers
tracked as world agents or world masters via __remove_all_world_related_agents,
and resets the rendezvous tag on the connection pool. This is the canonical
cleanup path invoked when the agent disconnects from a world.
Note
Public streams and public-network agents are not removed by this method. Only
world-private streams and world-related peer entries are cleared.
Source code in unaiverse/agent_basics.py
| async def clear_world_related_data(self):
"""Destroy all world-related cached state after leaving a world (async).
Resets agent status attributes, removes all private (world-scoped) streams via
the internal ``__remove_all_world_private_streams`` helper, removes all peers
tracked as world agents or world masters via ``__remove_all_world_related_agents``,
and resets the rendezvous tag on the connection pool. This is the canonical
cleanup path invoked when the agent disconnects from a world.
Note:
Public streams and public-network agents are not removed by this method. Only
world-private streams and world-related peer entries are cleared.
"""
# Clearing status variables
self.reset_agent_status_attrs()
# Clear/reset
await self.__remove_all_world_private_streams()
await self.__remove_all_world_related_agents()
self._node_conn.reset_rendezvous_tag()
|
load_and_refactor_action_file_and_behav_files
load_and_refactor_action_file_and_behav_files(force_save: bool = False)
Load role-behaviour JSON files and validate them against the world's Python action files.
This method is called during world initialisation (inside set_node_info). It
performs the following steps for each role discovered from the .json files in
world_folder:
- Collects custom role names from the JSON filenames and stores them in
self.CUSTOM_ROLES.
- Packs all Python files found in
world_folder (excluding pdf and stats
sub-directories) into self.packed_agent_files for distribution to joining
agents.
- For each role, instantiates a temporary dummy agent from the corresponding
.py file and verifies that its CUSTOM_ROLES match those of the world.
- Loads the role's
.json HSM definition into a HybridStateMachine, stores
its serialised form in self.role_to_behav, and writes it back to the node
profile under world_roles_fsm.
- Re-saves the JSON (refactoring) and generates a PDF diagram under
<world_folder>/pdf/ if the content has changed or force_save is
True.
This method has no effect on non-world instances (self.is_world is False
or world_folder is None).
Parameters:
| Name |
Type |
Description |
Default |
force_save
|
bool
|
If True, the JSON and PDF files are always re-written even if
the behaviour has not changed. Defaults to False.
|
False
|
Raises:
| Type |
Description |
GenException
|
If no .json role files are found in world_folder.
|
GenException
|
If the Python files in world_folder cannot be collected or
packed.
|
GenException
|
If a dummy agent cannot be instantiated for a given role.
|
GenException
|
If the CUSTOM_ROLES declared in the Python file do not match
those derived from the JSON filenames.
|
GenException
|
If a behaviour file cannot be loaded or saved.
|
Source code in unaiverse/agent_basics.py
| def load_and_refactor_action_file_and_behav_files(self, force_save: bool = False):
"""Load role-behaviour JSON files and validate them against the world's Python action files.
This method is called during world initialisation (inside ``set_node_info``). It
performs the following steps for each role discovered from the ``.json`` files in
``world_folder``:
1. Collects custom role names from the JSON filenames and stores them in
``self.CUSTOM_ROLES``.
2. Packs all Python files found in ``world_folder`` (excluding ``pdf`` and ``stats``
sub-directories) into ``self.packed_agent_files`` for distribution to joining
agents.
3. For each role, instantiates a temporary dummy agent from the corresponding
``.py`` file and verifies that its ``CUSTOM_ROLES`` match those of the world.
4. Loads the role's ``.json`` HSM definition into a ``HybridStateMachine``, stores
its serialised form in ``self.role_to_behav``, and writes it back to the node
profile under ``world_roles_fsm``.
5. Re-saves the JSON (refactoring) and generates a PDF diagram under
``<world_folder>/pdf/`` if the content has changed or ``force_save`` is
``True``.
This method has no effect on non-world instances (``self.is_world`` is ``False``
or ``world_folder`` is ``None``).
Args:
force_save: If ``True``, the JSON and PDF files are always re-written even if
the behaviour has not changed. Defaults to ``False``.
Raises:
GenException: If no ``.json`` role files are found in ``world_folder``.
GenException: If the Python files in ``world_folder`` cannot be collected or
packed.
GenException: If a dummy agent cannot be instantiated for a given role.
GenException: If the ``CUSTOM_ROLES`` declared in the Python file do not match
those derived from the JSON filenames.
GenException: If a behaviour file cannot be loaded or saved.
"""
# World only: the world discovers CUSTOM_ROLES from the JSON files in the world folder
if self.world_folder is not None and self.is_world:
# Guessing roles from the list of json files
listdir = os.listdir(self.world_folder) # Do this only once
self.CUSTOM_ROLES = [os.path.splitext(f)[0] for f in listdir
if os.path.isfile(os.path.join(self.world_folder, f))
and f.lower().endswith(".json")]
if len(self.CUSTOM_ROLES) == 0:
raise GenException(f"No world-role files (*.json) were found in the world folder {self.world_folder}")
# Default behaviors (getting roles, that are the names of the files with extension "json")
default_behav_files = [os.path.join(self.world_folder, f) for f in listdir
if os.path.isfile(os.path.join(self.world_folder, f)) and
f.lower().endswith(".json")]
# Loading Python files in the "world" folder
try:
self.packed_agent_files = pack_py_files(collect_py_files(self.world_folder,
exclude_dirs={'pdf', 'stats'}))
except Exception as e:
raise GenException(f'Error while reading the agent-related python files in {self.world_folder} [{e}]')
# Loading and refactoring behaviors
agent_files = unpack_py_files(self.packed_agent_files)
# We keep the compatibility with the case of a single agent.py
if len(self.CUSTOM_ROLES) > 0:
found = 0
found_agent_py = False
for role in self.CUSTOM_ROLES:
for file_name in agent_files.keys():
if file_name == "agent.py":
found_agent_py = True
if file_name == f"{role}.py":
found += 1
break
if found != len(self.CUSTOM_ROLES):
if found_agent_py:
code_content = agent_files["agent.py"]
# All roles go to the content of agent.py
agent_files = {f"{k}.py": code_content for k in self.CUSTOM_ROLES}
self.packed_agent_files = pack_py_files(agent_files) # Repacking
else:
raise GenException(f'Mismatching JSON-file roles and .py files in {self.world_folder}')
for role, default_behav_file in zip(self.CUSTOM_ROLES, default_behav_files):
# Creating a dummy agent which supports the actions of the following state machines
try:
dummy_agent, dummy_agent_memory_finder = load_agent_in_memory(agent_files, role, proc=None)
dummy_agent.CUSTOM_ROLES = self.CUSTOM_ROLES
except Exception as e:
raise GenException(f'Unable to create a valid dummy agent object for agent with role {role} [{e}]')
# Checking if the roles you wrote in agent.py are coherent with the JSON files in this folder
if dummy_agent.CUSTOM_ROLES != self.CUSTOM_ROLES:
raise GenException(f"Mismatching roles. "
f"Roles in JSON files: {self.CUSTOM_ROLES}. "
f"Roles specified in the agent.py file: {dummy_agent.CUSTOM_ROLES}")
try:
behav = HybridStateMachine(dummy_agent)
behav.load(default_behav_file)
self.role_to_behav[role] = str(behav)
# Adding roles and machines to profile
self._node_profile.get_dynamic_profile()['world_roles_fsm'] = self.role_to_behav
except Exception as e:
raise GenException(f'Error while loading or handling '
f'behav file {default_behav_file} for role {role} [{e}]')
# Refactoring and saving PDF
try:
if (force_save or
behav.save(os.path.join(self.world_folder, f'{role}.json'), only_if_changed=dummy_agent)):
os.makedirs(os.path.join(self.world_folder, 'pdf'), exist_ok=True)
behav.save_pdf(os.path.join(self.world_folder, 'pdf', f'{role}.pdf'))
except Exception as e:
raise GenException(f'Error while saving the behav file {default_behav_file} for role {role} [{e}]')
# Clearing memory
dummy_agent_memory_finder.cleanup()
|
create_behav_files
Hook for world subclasses to generate role-behaviour JSON files programmatically.
This method is called by set_node_info before
load_and_refactor_action_file_and_behav_files, giving subclasses an
opportunity to create or overwrite the .json behaviour files in
world_folder at runtime rather than maintaining them as static assets.
The default implementation does nothing. Override this method in a World
subclass to generate JSON files dynamically; the files written here will be
picked up immediately by the subsequent loading step.
Note
Implementing this hook is entirely optional. If the .json files already
exist in world_folder and do not need to be regenerated at startup,
there is no need to override this method.
Source code in unaiverse/agent_basics.py
| def create_behav_files(self):
"""Hook for world subclasses to generate role-behaviour JSON files programmatically.
This method is called by ``set_node_info`` before
``load_and_refactor_action_file_and_behav_files``, giving subclasses an
opportunity to create or overwrite the ``.json`` behaviour files in
``world_folder`` at runtime rather than maintaining them as static assets.
The default implementation does nothing. Override this method in a ``World``
subclass to generate JSON files dynamically; the files written here will be
picked up immediately by the subsequent loading step.
Note:
Implementing this hook is entirely optional. If the ``.json`` files already
exist in ``world_folder`` and do not need to be regenerated at startup,
there is no need to override this method.
"""
pass
|
get_name
Return the human-readable name of this agent or world node.
The name is set at node creation time and stored in the static section of the
node's profile ("node_name"). It is available as soon as set_node_info
has been called; before that it defaults to "unk".
Returns:
| Type |
Description |
str
|
The "node_name" string from the node's static profile.
|
Source code in unaiverse/agent_basics.py
| def get_name(self) -> str:
"""Return the human-readable name of this agent or world node.
The name is set at node creation time and stored in the static section of the
node's profile (``"node_name"``). It is available as soon as ``set_node_info``
has been called; before that it defaults to ``"unk"``.
Returns:
The ``"node_name"`` string from the node's static profile.
"""
return self._node_name
|
get_profile
Return the NodeProfile of the node hosting this agent or world.
The profile provides access to the node's static properties (name, email,
node type, node ID) via get_static_profile(), its dynamic properties
(peer IDs, role, connected peers, streams) via get_dynamic_profile(), and
the node's curriculum vitae (badges) via get_cv(). The returned object is
the live profile instance shared with the networking layer; callers should not
modify it directly.
Returns:
Source code in unaiverse/agent_basics.py
| def get_profile(self) -> NodeProfile:
"""Return the ``NodeProfile`` of the node hosting this agent or world.
The profile provides access to the node's static properties (name, email,
node type, node ID) via ``get_static_profile()``, its dynamic properties
(peer IDs, role, connected peers, streams) via ``get_dynamic_profile()``, and
the node's curriculum vitae (badges) via ``get_cv()``. The returned object is
the live profile instance shared with the networking layer; callers should not
modify it directly.
Returns:
The ``NodeProfile`` instance of this node, or ``None`` if ``set_node_info``
has not yet been called.
"""
return self._node_profile
|
get_role
get_role(agent: str) -> str | None
Return the base role string of a connected peer.
Reads the integer role bitmask from the connection pool via _node_conn.get_role,
then strips the lower two bits (which encode the world-agent/world-master distinction)
to obtain the custom role component. The resulting base role integer is looked up in
ROLE_BITS_TO_STR.
To retrieve the role of the agent itself rather than a peer, use
get_current_role.
Parameters:
| Name |
Type |
Description |
Default |
agent
|
str
|
The peer ID of the connected agent whose role is requested.
|
required
|
Returns:
| Type |
Description |
str | None
|
The human-readable base role string (e.g., "world_agent" or a custom role
|
str | None
|
name from CUSTOM_ROLES), or None if the role bitmask is not present in
|
str | None
|
|
Source code in unaiverse/agent_basics.py
| def get_role(self, agent: str) -> str | None:
"""Return the base role string of a connected peer.
Reads the integer role bitmask from the connection pool via ``_node_conn.get_role``,
then strips the lower two bits (which encode the world-agent/world-master distinction)
to obtain the custom role component. The resulting base role integer is looked up in
``ROLE_BITS_TO_STR``.
To retrieve the role of the agent itself rather than a peer, use
``get_current_role``.
Args:
agent: The peer ID of the connected agent whose role is requested.
Returns:
The human-readable base role string (e.g., ``"world_agent"`` or a custom role
name from ``CUSTOM_ROLES``), or ``None`` if the role bitmask is not present in
``ROLE_BITS_TO_STR``.
"""
role_int = self._node_conn.get_role(agent)
base_role_int = (role_int >> 2) << 2
return self.ROLE_BITS_TO_STR[base_role_int] if base_role_int in self.ROLE_BITS_TO_STR else None
|
get_agents_by_role
get_agents_by_role(role: str | list[str], handshake_completed: bool = True)
Return the peer IDs of all connected agents that match one or more roles.
For each role string in role, the method resolves the corresponding integer
bitmask from ROLE_STR_TO_BITS. If the role string is not augmented (i.e., does
not contain the "~" separator), both the world-agent and world-master variants
of that role are searched. If the role string is augmented (e.g.,
"world_agent~student"), only that exact compound role is searched.
Agents are then collected from the connection pool via
_node_conn.find_addrs_by_role. When handshake_completed is True, only
peers that are also present in all_agents (meaning the full handshake is done)
are included.
Parameters:
| Name |
Type |
Description |
Default |
role
|
str | list[str]
|
A single role string or a list of role strings to match against. Each
string must be a key in ROLE_STR_TO_BITS.
|
required
|
handshake_completed
|
bool
|
If True, only peers whose handshake has completed
(i.e., that appear in all_agents) are returned. If False, all
matching peers from the connection pool are returned regardless of handshake
state. Defaults to True.
|
True
|
Returns:
| Type |
Description |
|
|
A list of peer ID strings for all agents whose role matches any entry in
|
|
|
role. The list may contain duplicates if a peer matches multiple roles.
|
Examples:
>>> agents = my_agent.get_agents_by_role("world_agent")
>>> agents = my_agent.get_agents_by_role(["world_master", "world_agent~student"])
Source code in unaiverse/agent_basics.py
| def get_agents_by_role(self, role: str | list[str], handshake_completed: bool = True):
"""Return the peer IDs of all connected agents that match one or more roles.
For each role string in ``role``, the method resolves the corresponding integer
bitmask from ``ROLE_STR_TO_BITS``. If the role string is not augmented (i.e., does
not contain the ``"~"`` separator), both the world-agent and world-master variants
of that role are searched. If the role string is augmented (e.g.,
``"world_agent~student"``), only that exact compound role is searched.
Agents are then collected from the connection pool via
``_node_conn.find_addrs_by_role``. When ``handshake_completed`` is ``True``, only
peers that are also present in ``all_agents`` (meaning the full handshake is done)
are included.
Args:
role: A single role string or a list of role strings to match against. Each
string must be a key in ``ROLE_STR_TO_BITS``.
handshake_completed: If ``True``, only peers whose handshake has completed
(i.e., that appear in ``all_agents``) are returned. If ``False``, all
matching peers from the connection pool are returned regardless of handshake
state. Defaults to ``True``.
Returns:
A list of peer ID strings for all agents whose role matches any entry in
``role``. The list may contain duplicates if a peer matches multiple roles.
Examples:
>>> agents = my_agent.get_agents_by_role("world_agent")
>>> agents = my_agent.get_agents_by_role(["world_master", "world_agent~student"])
"""
role_list = role if isinstance(role, list) else [role]
searched_roles_int = set()
for role in role_list:
is_augmented = "~" in role
role_int = self.ROLE_STR_TO_BITS[role]
if not is_augmented:
role_int = self.ROLE_STR_TO_BITS[role]
for role_prefix_int in {AgentBasics.ROLE_WORLD_AGENT, AgentBasics.ROLE_WORLD_MASTER}:
searched_roles_int.add(role_prefix_int | role_int)
else:
searched_roles_int = {role_int}
found_agents = []
for role_int in searched_roles_int:
_, _found_connected_peer_ids = self._node_conn.find_addrs_by_role(role_int, return_peer_ids_too=True)
if handshake_completed:
for agent in _found_connected_peer_ids:
if agent in self.all_agents:
found_agents.append(agent)
else:
found_agents += _found_connected_peer_ids
return found_agents
|
get_current_role
get_current_role(return_int: bool = False, ignore_base_role: bool = True) -> str | int | None
Return the current role of this agent within the world it has joined.
The role is read from the agent's own dynamic profile (connections.role). When
ignore_base_role is True, only the custom role segment after the "~"
separator is returned (e.g., "student" instead of "world_agent~student").
When return_int is True, the role string is converted back to its integer
bitmask via ROLE_STR_TO_BITS.
If the agent is not currently connected to a world (in_world() returns
False), None is returned without accessing the profile.
Parameters:
| Name |
Type |
Description |
Default |
return_int
|
bool
|
If True, return the integer bitmask of the role instead of the
string. Defaults to False.
|
False
|
ignore_base_role
|
bool
|
If True, strip the base role prefix (e.g.,
"world_agent~") and return only the custom part. Has no effect when the
role does not contain "~". Defaults to True.
|
True
|
Returns:
| Type |
Description |
str | int | None
|
The role as a string (or integer when return_int is True), or None
|
str | int | None
|
if the agent is not living in any worlds.
|
Source code in unaiverse/agent_basics.py
| def get_current_role(self, return_int: bool = False, ignore_base_role: bool = True) -> str | int | None:
"""Return the current role of this agent within the world it has joined.
The role is read from the agent's own dynamic profile (``connections.role``). When
``ignore_base_role`` is ``True``, only the custom role segment after the ``"~"``
separator is returned (e.g., ``"student"`` instead of ``"world_agent~student"``).
When ``return_int`` is ``True``, the role string is converted back to its integer
bitmask via ``ROLE_STR_TO_BITS``.
If the agent is not currently connected to a world (``in_world()`` returns
``False``), ``None`` is returned without accessing the profile.
Args:
return_int: If ``True``, return the integer bitmask of the role instead of the
string. Defaults to ``False``.
ignore_base_role: If ``True``, strip the base role prefix (e.g.,
``"world_agent~"``) and return only the custom part. Has no effect when the
role does not contain ``"~"``. Defaults to ``True``.
Returns:
The role as a string (or integer when ``return_int`` is ``True``), or ``None``
if the agent is not living in any worlds.
"""
if self.in_world():
role_str = self._node_profile.get_dynamic_profile()['connections']['role']
if ignore_base_role:
role_str = role_str.split("~")[-1]
if not return_int:
return role_str
else:
return self.ROLE_STR_TO_BITS[role_str]
else:
return None
|
add_agent
async
add_agent(peer_id: str, profile: NodeProfile, add_proc_streams: bool = True, add_env_streams: bool = True, add_pubsub_streams: bool = True) -> bool
Register a new peer agent and import its compatible streams (async).
If the peer is already tracked in all_agents, it is first removed via
remove_agent and then re-added so that updated profile data is always
reflected. The peer's role bitmask (read from the connection pool) determines
which membership dictionary it lands in: public_agents, world_agents, or
world_masters. The node type from the static profile further places the peer in
human_agents or artificial_agents.
When a processor is configured (proc_outputs and proc_inputs are not
None), the method checks the peer's environmental streams (from
profile.get_dynamic_profile()['streams']) and processor output streams (from
profile.get_dynamic_profile()['proc_outputs']) for compatibility with this
agent's processor via add_compatible_streams.
Parameters:
| Name |
Type |
Description |
Default |
peer_id
|
str
|
The unique identifier of the peer to add.
|
required
|
profile
|
NodeProfile
|
The NodeProfile object containing the peer's profile information.
|
required
|
add_proc_streams
|
bool
|
If True, the processor output streams of the peer are
checked for compatibility and added when compatible. Defaults to True.
|
True
|
add_env_streams
|
bool
|
If True, the environmental streams of the peer are checked
for compatibility and added when compatible. Defaults to True.
|
True
|
add_pubsub_streams
|
bool
|
If True, PubSub streams are subscribed to when added.
Defaults to True.
|
True
|
Returns:
| Type |
Description |
bool
|
True if the agent was successfully added, False if an unknown role was
|
bool
|
encountered or if a stream compatibility check failed.
|
Examples:
>>> ok = await my_agent.add_agent(peer_id, profile)
>>> assert ok
Source code in unaiverse/agent_basics.py
| async def add_agent(self, peer_id: str, profile: NodeProfile,
add_proc_streams: bool = True,
add_env_streams: bool = True,
add_pubsub_streams: bool = True) -> bool:
"""Register a new peer agent and import its compatible streams (async).
If the peer is already tracked in ``all_agents``, it is first removed via
``remove_agent`` and then re-added so that updated profile data is always
reflected. The peer's role bitmask (read from the connection pool) determines
which membership dictionary it lands in: ``public_agents``, ``world_agents``, or
``world_masters``. The node type from the static profile further places the peer in
``human_agents`` or ``artificial_agents``.
When a processor is configured (``proc_outputs`` and ``proc_inputs`` are not
``None``), the method checks the peer's environmental streams (from
``profile.get_dynamic_profile()['streams']``) and processor output streams (from
``profile.get_dynamic_profile()['proc_outputs']``) for compatibility with this
agent's processor via ``add_compatible_streams``.
Args:
peer_id: The unique identifier of the peer to add.
profile: The ``NodeProfile`` object containing the peer's profile information.
add_proc_streams: If ``True``, the processor output streams of the peer are
checked for compatibility and added when compatible. Defaults to ``True``.
add_env_streams: If ``True``, the environmental streams of the peer are checked
for compatibility and added when compatible. Defaults to ``True``.
add_pubsub_streams: If ``True``, PubSub streams are subscribed to when added.
Defaults to ``True``.
Returns:
``True`` if the agent was successfully added, ``False`` if an unknown role was
encountered or if a stream compatibility check failed.
Examples:
>>> ok = await my_agent.add_agent(peer_id, profile)
>>> assert ok
"""
# If the agent was already there, we remove it and add it again (in case of changes)
await self.remove_agent(peer_id) # It has no effects if the agent is not existing
# Guessing the type of agent to add (accordingly to the default roles shared by every agent)
role = self._node_conn.get_role(peer_id)
self.all_agents[peer_id] = profile
if role & 1 == self.ROLE_PUBLIC:
self.public_agents[peer_id] = profile
public = True
elif role & 3 == self.ROLE_WORLD_AGENT:
self.world_agents[peer_id] = profile
public = False
elif role & 3 == self.ROLE_WORLD_MASTER:
self.world_masters[peer_id] = profile
public = False
else:
log.error(f"Cannot add agent with peer ID {peer_id} - unknown role: {role}")
return False
# Human or artificial?
if profile.get_static_profile()["node_type"] == AgentBasics.HUMAN:
self.human_agents[peer_id] = profile
else:
self.artificial_agents[peer_id] = profile
# Check compatibility of the streams owned by the agent we are adding with our-agent's processor
if self.proc_outputs is not None and self.proc_inputs is not None:
# Check compatibility of the environmental streams of the agent we are adding with our-agent's processor
environmental_streams = profile.get_dynamic_profile()['streams']
if (add_env_streams and environmental_streams is not None and
not (await self.add_compatible_streams(peer_id, environmental_streams,
buffered=False,
public=public,
skip_pubsub=not add_pubsub_streams))):
return False
# Check compatibility of the generated streams of the agent we are adding with our-agent's processor
proc_streams = profile.get_dynamic_profile()['proc_outputs']
if (add_proc_streams and proc_streams is not None and
not (await self.add_compatible_streams(peer_id, profile.get_dynamic_profile()['proc_outputs'],
buffered=False,
public=public,
skip_pubsub=not add_pubsub_streams))):
return False
log.misc(f"Successfully added agent {profile.get_static_profile()['node_name']} "
f"with peer ID {peer_id} (public: {public})")
return True
|
remove_agent
async
remove_agent(peer_id: str)
Remove a peer agent and clean up all associated state (async).
If the peer is not present in all_agents, the call is a no-op. Otherwise the
peer is removed from every membership dictionary (all_agents, world_agents,
world_masters, public_agents, human_agents, artificial_agents),
from the compatible-stream sets (compat_in_streams, compat_out_streams),
from the known-stream catalogue via remove_streams, from per-agent status
attributes via remove_peer_from_agent_status_attrs, from the buffered-stream
index, and from the interaction manager via im.remove_interactions_of_agent.
This method is the counterpart of add_agent and is also called internally
whenever a peer disconnects.
Parameters:
| Name |
Type |
Description |
Default |
peer_id
|
str
|
The unique peer ID of the agent to remove.
|
required
|
Note
If peer_id is not found in all_agents, the method returns without
modifying any state and without raising an exception.
Source code in unaiverse/agent_basics.py
| async def remove_agent(self, peer_id: str):
"""Remove a peer agent and clean up all associated state (async).
If the peer is not present in ``all_agents``, the call is a no-op. Otherwise the
peer is removed from every membership dictionary (``all_agents``, ``world_agents``,
``world_masters``, ``public_agents``, ``human_agents``, ``artificial_agents``),
from the compatible-stream sets (``compat_in_streams``, ``compat_out_streams``),
from the known-stream catalogue via ``remove_streams``, from per-agent status
attributes via ``remove_peer_from_agent_status_attrs``, from the buffered-stream
index, and from the interaction manager via ``im.remove_interactions_of_agent``.
This method is the counterpart of ``add_agent`` and is also called internally
whenever a peer disconnects.
Args:
peer_id: The unique peer ID of the agent to remove.
Note:
If ``peer_id`` is not found in ``all_agents``, the method returns without
modifying any state and without raising an exception.
"""
if peer_id in self.all_agents:
# Removing from agent list
del self.all_agents[peer_id]
if peer_id in self.world_agents:
del self.world_agents[peer_id]
elif peer_id in self.world_masters:
del self.world_masters[peer_id]
elif peer_id in self.public_agents:
del self.public_agents[peer_id]
if peer_id in self.artificial_agents:
del self.artificial_agents[peer_id]
elif peer_id in self.human_agents:
del self.human_agents[peer_id]
# Clearing from the list of processor-input-compatible-streams
if self.compat_in_streams is not None:
for i, _ in enumerate(self.compat_in_streams):
to_remove = []
for net_hash_name in self.compat_in_streams[i]:
if DataProps.peer_id_from_net_hash(net_hash_name[0]) == peer_id:
to_remove.append(net_hash_name)
for net_hash_name in to_remove:
self.compat_in_streams[i].remove(net_hash_name)
# Clearing from the list of processor-output-compatible-streams
if self.compat_out_streams is not None:
for i, _ in enumerate(self.compat_out_streams):
to_remove = []
for net_hash_name in self.compat_out_streams[i]:
if DataProps.peer_id_from_net_hash(net_hash_name[0]) == peer_id:
to_remove.append(net_hash_name)
for net_hash_name in to_remove:
self.compat_out_streams[i].remove(net_hash_name)
# Clearing streams owned by the removed agent from the list of known streams
await self.remove_streams(peer_id)
# Removing from the status variables
self.remove_peer_from_agent_status_attrs(peer_id)
# Updating buffered stream index
if peer_id in self.last_buffered_peer_id_to_info:
del self.last_buffered_peer_id_to_info[peer_id] # Only if present
# Clearing pending interactions
await self.im.remove_interactions_of_agent(peer_id)
log.misc(f"Successfully removed agent with peer ID {peer_id}")
|
remove_all_agents
Remove all known peer agents and reset membership and stream state.
All six membership dictionaries (all_agents, public_agents,
world_masters, world_agents, human_agents, artificial_agents) are
cleared. The compatible-stream sets compat_in_streams and compat_out_streams
are re-initialised to empty sets (one per processor input/output entry respectively)
so that fresh compatibility tracking can begin. Finally, all non-owned known streams
are removed via remove_all_streams(owned_too=False), preserving the agent's own
streams.
This method does not unsubscribe from PubSub channels; if graceful unsubscription
is needed, prefer iterating over agents and calling remove_agent on each.
Source code in unaiverse/agent_basics.py
| def remove_all_agents(self):
"""Remove all known peer agents and reset membership and stream state.
All six membership dictionaries (``all_agents``, ``public_agents``,
``world_masters``, ``world_agents``, ``human_agents``, ``artificial_agents``) are
cleared. The compatible-stream sets ``compat_in_streams`` and ``compat_out_streams``
are re-initialised to empty sets (one per processor input/output entry respectively)
so that fresh compatibility tracking can begin. Finally, all non-owned known streams
are removed via ``remove_all_streams(owned_too=False)``, preserving the agent's own
streams.
This method does not unsubscribe from PubSub channels; if graceful unsubscription
is needed, prefer iterating over agents and calling ``remove_agent`` on each.
"""
# Clearing all agents
self.all_agents = {}
self.public_agents = {}
self.world_masters = {}
self.world_agents = {}
self.human_agents = {}
self.artificial_agents = {}
# Clearing the list of processor-output-compatible-streams
if self.compat_in_streams is not None and self.proc_inputs is not None:
self.compat_in_streams = [set() for _ in range(len(self.proc_inputs))]
if self.compat_out_streams is not None and self.proc_outputs is not None:
self.compat_out_streams = [set() for _ in range(len(self.proc_outputs))]
# Clearing the list of known streams (not our own streams!)
self.remove_all_streams(owned_too=False)
log.misc(f"Successfully removed all agents")
|
add_behav_wildcard
add_behav_wildcard(wildcard_from: str, wildcard_to: object)
Register a wildcard substitution for the agent's behavioral state machine.
Wildcard mappings stored in behav_wildcards are used when the agent joins a
world and the world's HSM JSON contains placeholder strings that need to be
resolved to live Python objects (for example, callable actions defined in the
agent). Calling this method before joining a world ensures that the HSM can
reference the correct targets by name.
Parameters:
| Name |
Type |
Description |
Default |
wildcard_from
|
str
|
The placeholder string as it appears in the HSM JSON.
|
required
|
wildcard_to
|
object
|
The Python object (typically a callable or a constant) that
replaces the placeholder at load time.
|
required
|
Source code in unaiverse/agent_basics.py
| def add_behav_wildcard(self, wildcard_from: str, wildcard_to: object):
"""Register a wildcard substitution for the agent's behavioral state machine.
Wildcard mappings stored in ``behav_wildcards`` are used when the agent joins a
world and the world's HSM JSON contains placeholder strings that need to be
resolved to live Python objects (for example, callable actions defined in the
agent). Calling this method before joining a world ensures that the HSM can
reference the correct targets by name.
Args:
wildcard_from: The placeholder string as it appears in the HSM JSON.
wildcard_to: The Python object (typically a callable or a constant) that
replaces the placeholder at load time.
"""
self.behav_wildcards[wildcard_from] = wildcard_to
|
add_stream
add_stream(stream: Stream, owned: bool = True, net_hash: str | None = None) -> dict[str, Stream]
Add a stream to the agent's catalogue of known streams.
The stream is inserted into known_streams (indexed by net_hash) and into
known_streams_by_user_hash (indexed by "peer_id:name"). If a stream with
the same user hash already exists under a different net hash, a GenException is
raised to prevent naming clashes. Existing entries with the same net hash and stream
name are silently overwritten, which allows profile updates to propagate correctly.
When owned is True, the stream is additionally placed in the appropriate
specialised dictionaries:
- If the stream matches an entry in
proc_outputs (by name and group), it goes
into proc_streams / proc_streams_by_user_hash and the relevant public or
private variant (_proc_streams_by_user_hash_pub / _prv).
- Else if it matches
proc_inputs, it goes into proc_in_streams /
proc_in_streams_by_user_hash and the corresponding pub/prv variant.
- Otherwise it is treated as an environmental stream and goes into
env_streams /
env_streams_by_user_hash and the relevant pub/prv variant.
- In all owned cases, it is also added to
owned_streams /
owned_streams_by_user_hash.
If merge_flat_stream_labels is True, merge_flat_data_stream_props is
called after insertion to keep the shared label superset up to date.
Parameters:
| Name |
Type |
Description |
Default |
stream
|
Stream
|
The Stream object to add.
|
required
|
owned
|
bool
|
If True, the stream is treated as owned by this agent and inserted
into the specialised owned-stream dictionaries. Defaults to True.
|
True
|
net_hash
|
str | None
|
An explicit network hash to use. If None, the hash is computed
from the stream's properties and the current peer ID. Defaults to None.
|
None
|
Returns:
| Type |
Description |
dict[str, Stream]
|
A dictionary mapping stream names to Stream objects for the net-hash group
|
dict[str, Stream]
|
that the stream was added to. Returns an empty dict if the stream could not be
|
dict[str, Stream]
|
added due to a property mismatch (e.g., public/pubsub conflict within the
|
dict[str, Stream]
|
|
Raises:
| Type |
Description |
GenException
|
If a naming clash is detected: the user hash is already known
under a different net hash.
|
Examples:
>>> from unaiverse.streams.streams import Stream
>>> stream = Stream(props=my_data_props)
>>> group = my_agent.add_stream(stream, owned=True)
Source code in unaiverse/agent_basics.py
| def add_stream(self, stream: Stream, owned: bool = True, net_hash: str | None = None) -> dict[str, Stream]:
"""Add a stream to the agent's catalogue of known streams.
The stream is inserted into ``known_streams`` (indexed by ``net_hash``) and into
``known_streams_by_user_hash`` (indexed by ``"peer_id:name"``). If a stream with
the same user hash already exists under a different net hash, a ``GenException`` is
raised to prevent naming clashes. Existing entries with the same net hash and stream
name are silently overwritten, which allows profile updates to propagate correctly.
When ``owned`` is ``True``, the stream is additionally placed in the appropriate
specialised dictionaries:
- If the stream matches an entry in ``proc_outputs`` (by name and group), it goes
into ``proc_streams`` / ``proc_streams_by_user_hash`` and the relevant public or
private variant (``_proc_streams_by_user_hash_pub`` / ``_prv``).
- Else if it matches ``proc_inputs``, it goes into ``proc_in_streams`` /
``proc_in_streams_by_user_hash`` and the corresponding pub/prv variant.
- Otherwise it is treated as an environmental stream and goes into ``env_streams`` /
``env_streams_by_user_hash`` and the relevant pub/prv variant.
- In all owned cases, it is also added to ``owned_streams`` /
``owned_streams_by_user_hash``.
If ``merge_flat_stream_labels`` is ``True``, ``merge_flat_data_stream_props`` is
called after insertion to keep the shared label superset up to date.
Args:
stream: The ``Stream`` object to add.
owned: If ``True``, the stream is treated as owned by this agent and inserted
into the specialised owned-stream dictionaries. Defaults to ``True``.
net_hash: An explicit network hash to use. If ``None``, the hash is computed
from the stream's properties and the current peer ID. Defaults to ``None``.
Returns:
A dictionary mapping stream names to ``Stream`` objects for the net-hash group
that the stream was added to. Returns an empty dict if the stream could not be
added due to a property mismatch (e.g., public/pubsub conflict within the
group).
Raises:
GenException: If a naming clash is detected: the user hash is already known
under a different net hash.
Examples:
>>> from unaiverse.streams.streams import Stream
>>> stream = Stream(props=my_data_props)
>>> group = my_agent.add_stream(stream, owned=True)
"""
# Stream net hash
if net_hash is None:
public_peer_id, private_peer_id = self.get_peer_ids()
peer_id = public_peer_id if stream.is_public() else private_peer_id
net_hash = stream.net_hash(peer_id)
user_hash = stream.user_hash(stream.peer_id_from_net_hash(net_hash))
# It is fine to overwrite an existing stream, but the net hash cannot change
if net_hash not in self.known_streams and user_hash in self.known_streams_by_user_hash:
raise GenException(f"Naming clash in streams. "
f"Net hash {net_hash} is not known (stream name: {stream.props.get_name()}), "
f"but user hash {user_hash} is already existing. "
f"Are you using the same name for two different streams? (not allowed)")
# Adding the new stream
if net_hash not in self.known_streams:
self.known_streams[net_hash] = {}
else:
for _stream in self.known_streams[net_hash].values():
public = _stream.get_props().is_public()
pubsub = _stream.get_props().is_pubsub()
if public and not stream.get_props().is_public():
log.error(f"Cannot add a stream to a group with different properties (public): "
f"hash: {net_hash}, name: {stream.get_props().get_name()}, "
f"public: {stream.get_props().is_public()}")
return {}
if pubsub and not stream.get_props().is_pubsub():
log.error(f"Cannot add a stream to a group with different properties (pubsub): "
f"hash: {net_hash}, name: {stream.get_props().get_name()}, "
f"public: {stream.get_props().is_public()}")
return {}
break
self.known_streams[net_hash][stream.get_props().get_name()] = stream # Overwrite if existing
self.known_streams_by_user_hash[user_hash] = stream # Overwrite if existing
if owned:
is_proc_outputs_stream = False
is_proc_inputs_stream = False
# Adding an 'owned' processor output stream (i.e., the stream coming from OUR OWN processor)
if self.proc_outputs is not None:
proc_outputs_name_and_group = set()
for props in self.proc_outputs:
proc_outputs_name_and_group.add((props.get_name(), props.get_group()))
if (stream.get_props().get_name(), stream.get_props().get_group()) in proc_outputs_name_and_group:
if net_hash not in self.proc_streams:
self.proc_streams[net_hash] = {}
self.proc_streams[net_hash][stream.get_props().get_name()] = stream
self.proc_streams_by_user_hash[user_hash] = stream
if stream.is_public():
self._proc_streams_by_user_hash_pub[user_hash] = stream
else:
self._proc_streams_by_user_hash_prv[user_hash] = stream
is_proc_outputs_stream = True
# Adding an 'owned' processor input stream (i.e., the stream entering OUR OWN processor)
if self.proc_inputs is not None and not is_proc_outputs_stream:
proc_inputs_name_and_group = set()
for props in self.proc_inputs:
proc_inputs_name_and_group.add((props.get_name(), props.get_group()))
if (stream.get_props().get_name(), stream.get_props().get_group()) in proc_inputs_name_and_group:
if net_hash not in self.proc_in_streams:
self.proc_in_streams[net_hash] = {}
self.proc_in_streams[net_hash][stream.get_props().get_name()] = stream
self.proc_in_streams_by_user_hash[user_hash] = stream
if stream.is_public():
self._proc_in_streams_by_user_hash_pub[user_hash] = stream
else:
self._proc_in_streams_by_user_hash_prv[user_hash] = stream
is_proc_inputs_stream = True
if net_hash not in self.owned_streams:
self.owned_streams[net_hash] = {}
self.owned_streams[net_hash][stream.get_props().get_name()] = stream
self.owned_streams_by_user_hash[user_hash] = stream
if not is_proc_outputs_stream and not is_proc_inputs_stream:
if net_hash not in self.env_streams:
self.env_streams[net_hash] = {}
self.env_streams[net_hash][stream.get_props().get_name()] = stream
self.env_streams_by_user_hash[user_hash] = stream
if stream.is_public():
self._env_streams_by_user_hash_pub[user_hash] = stream
else:
self._env_streams_by_user_hash_prv[user_hash] = stream
# If needed, merging descriptor labels (attribute labels) and sharing them with all streams
if self.merge_flat_stream_labels:
self.merge_flat_data_stream_props()
return self.known_streams[net_hash]
|
add_streams
add_streams(streams: list[Stream], owned: bool = True, net_hash: str | None = None) -> list[dict[str, Stream]]
Add multiple streams to the agent's catalogue in a single call.
Iterates over streams and delegates each element to add_stream. If any
individual add_stream call returns an empty dictionary (indicating a failure
such as a property mismatch), the method stops immediately and returns an empty
list, leaving already-added streams in place.
Parameters:
| Name |
Type |
Description |
Default |
streams
|
list[Stream]
|
A list of Stream objects to add.
|
required
|
owned
|
bool
|
If True, each stream is treated as owned by this agent. Defaults to
True.
|
True
|
net_hash
|
str | None
|
An explicit network hash applied to every stream. If None, the
hash is computed individually for each stream by add_stream. Defaults
to None.
|
None
|
Returns:
| Type |
Description |
list[dict[str, Stream]]
|
A list of stream-group dictionaries, one per stream in streams, as returned
|
list[dict[str, Stream]]
|
by add_stream. Returns an empty list if any stream could not be added.
|
Raises:
| Type |
Description |
GenException
|
Propagated from add_stream if a naming clash is detected for
any stream.
|
Source code in unaiverse/agent_basics.py
| def add_streams(self, streams: list[Stream], owned: bool = True, net_hash: str | None = None) \
-> list[dict[str, Stream]]:
"""Add multiple streams to the agent's catalogue in a single call.
Iterates over ``streams`` and delegates each element to ``add_stream``. If any
individual ``add_stream`` call returns an empty dictionary (indicating a failure
such as a property mismatch), the method stops immediately and returns an empty
list, leaving already-added streams in place.
Args:
streams: A list of ``Stream`` objects to add.
owned: If ``True``, each stream is treated as owned by this agent. Defaults to
``True``.
net_hash: An explicit network hash applied to every stream. If ``None``, the
hash is computed individually for each stream by ``add_stream``. Defaults
to ``None``.
Returns:
A list of stream-group dictionaries, one per stream in ``streams``, as returned
by ``add_stream``. Returns an empty list if any stream could not be added.
Raises:
GenException: Propagated from ``add_stream`` if a naming clash is detected for
any stream.
"""
# Adding the new stream
ret = []
for stream in streams:
stream_dict = self.add_stream(stream, owned, net_hash)
if len(stream_dict) == 0:
return []
ret.append(stream_dict)
return ret
|
remove_streams
async
remove_streams(peer_id: str, name: str | None = None, owned_too: bool = False)
Remove streams owned by a given peer from the agent's catalogue (async).
Scans known_streams for all net hashes whose embedded peer ID matches
peer_id. For each matching entry, if owned_too is False, any stream
that belongs to owned_streams is skipped (preserving the agent's own streams).
Streams that are removed are also cleaned up from owned_streams,
env_streams, proc_streams, proc_in_streams, and the corresponding
*_by_user_hash dictionaries. PubSub channels for removed pubsub streams are
unsubscribed via _node_conn.unsubscribe.
Parameters:
| Name |
Type |
Description |
Default |
peer_id
|
str
|
The peer ID whose associated streams are to be removed.
|
required
|
name
|
str | None
|
The specific stream name to remove. If None, all streams belonging
to peer_id are removed. Defaults to None.
|
None
|
owned_too
|
bool
|
If True, also remove streams that are owned by this agent
(environmental and processor streams). Defaults to False.
|
False
|
Note
A failed PubSub unsubscription is logged but does not interrupt the removal
process.
Source code in unaiverse/agent_basics.py
| async def remove_streams(self, peer_id: str, name: str | None = None, owned_too: bool = False):
"""Remove streams owned by a given peer from the agent's catalogue (async).
Scans ``known_streams`` for all net hashes whose embedded peer ID matches
``peer_id``. For each matching entry, if ``owned_too`` is ``False``, any stream
that belongs to ``owned_streams`` is skipped (preserving the agent's own streams).
Streams that are removed are also cleaned up from ``owned_streams``,
``env_streams``, ``proc_streams``, ``proc_in_streams``, and the corresponding
``*_by_user_hash`` dictionaries. PubSub channels for removed pubsub streams are
unsubscribed via ``_node_conn.unsubscribe``.
Args:
peer_id: The peer ID whose associated streams are to be removed.
name: The specific stream name to remove. If ``None``, all streams belonging
to ``peer_id`` are removed. Defaults to ``None``.
owned_too: If ``True``, also remove streams that are owned by this agent
(environmental and processor streams). Defaults to ``False``.
Note:
A failed PubSub unsubscription is logged but does not interrupt the removal
process.
"""
# Identifying what to remove
to_remove = []
for net_hash in self.known_streams.keys():
if DataProps.peer_id_from_net_hash(net_hash) == peer_id:
for _name, _stream in self.known_streams[net_hash].items():
if name is None or name == _name:
to_remove.append((net_hash, _name))
# Removing
for (net_hash, name) in to_remove:
if not owned_too and net_hash in self.owned_streams:
continue
group = DataProps.name_or_group_from_net_hash(net_hash)
user_hash = DataProps.user_hash_from_net_hash(net_hash, name if name is not None else group)
del self.known_streams[net_hash][name]
if len(self.known_streams[net_hash]) == 0:
del self.known_streams[net_hash]
if user_hash in self.known_streams_by_user_hash:
del self.known_streams_by_user_hash[user_hash]
# Unsubscribing to pubsub
if DataProps.is_pubsub_from_net_hash(net_hash):
if peer_id != "<private_peer_id>" and peer_id != "<public_peer_id>":
if not (await self._node_conn.unsubscribe(peer_id, channel=net_hash)):
# log.error(f"Failed in unsubscribing from pubsub, peer_id: {peer_id}, channel: {net_hash}")
pass
else:
log.misc(f"Successfully unsubscribed from pubsub, peer_id: {peer_id}, channel: {net_hash}")
# Removing all the owned streams (environment and processor streams are of course "owned")
if net_hash in self.owned_streams:
if name in self.owned_streams[net_hash]:
del self.owned_streams[net_hash][name]
if len(self.owned_streams[net_hash]) == 0:
del self.owned_streams[net_hash]
if user_hash in self.owned_streams_by_user_hash:
del self.owned_streams_by_user_hash[user_hash]
if net_hash in self.env_streams:
if name in self.env_streams[net_hash]:
del self.env_streams[net_hash][name]
if len(self.env_streams[net_hash]) == 0:
del self.env_streams[net_hash]
if user_hash in self.env_streams_by_user_hash:
del self.env_streams_by_user_hash[user_hash]
if net_hash in self.proc_streams:
if name in self.proc_streams[net_hash]:
del self.proc_streams[net_hash][name]
if len(self.proc_streams[net_hash]) == 0:
del self.proc_streams[net_hash]
if user_hash in self.proc_streams_by_user_hash:
del self.proc_streams_by_user_hash[user_hash]
log.misc(f"Successfully removed known stream with network hash {net_hash}, stream name: {name}")
|
remove_all_streams
remove_all_streams(owned_too: bool = False)
Remove all non-owned streams from the agent's catalogue.
When owned_too is False (the default), known_streams and
known_streams_by_user_hash are rebuilt to contain only entries that are also
present in owned_streams, effectively discarding all peer-originated streams
while preserving the agent's own streams.
When owned_too is True, every stream dictionary is cleared entirely,
including owned_streams, env_streams, proc_streams,
proc_in_streams, and all *_by_user_hash variants.
This method does not unsubscribe from any PubSub channels. Use remove_streams
with individual peer IDs for graceful PubSub teardown.
Parameters:
| Name |
Type |
Description |
Default |
owned_too
|
bool
|
If True, also remove the agent's own streams (environmental and
processor streams). Defaults to False.
|
False
|
Source code in unaiverse/agent_basics.py
| def remove_all_streams(self, owned_too: bool = False):
"""Remove all non-owned streams from the agent's catalogue.
When ``owned_too`` is ``False`` (the default), ``known_streams`` and
``known_streams_by_user_hash`` are rebuilt to contain only entries that are also
present in ``owned_streams``, effectively discarding all peer-originated streams
while preserving the agent's own streams.
When ``owned_too`` is ``True``, every stream dictionary is cleared entirely,
including ``owned_streams``, ``env_streams``, ``proc_streams``,
``proc_in_streams``, and all ``*_by_user_hash`` variants.
This method does not unsubscribe from any PubSub channels. Use ``remove_streams``
with individual peer IDs for graceful PubSub teardown.
Args:
owned_too: If ``True``, also remove the agent's own streams (environmental and
processor streams). Defaults to ``False``.
"""
if not owned_too:
self.known_streams = {k: v for k, v in self.owned_streams.items()}
self.known_streams_by_user_hash = {k: v for k, v in self.known_streams_by_user_hash.items()}
else:
self.known_streams = {}
self.owned_streams = {}
self.env_streams = {}
self.proc_streams = {}
self.proc_in_streams = {}
self.known_streams_by_user_hash = {}
self.owned_streams_by_user_hash = {}
self.env_streams_by_user_hash = {}
self.proc_streams_by_user_hash = {}
self.proc_in_streams_by_user_hash = {}
log.misc(f"Successfully removed all streams!")
|
get_stream
get_stream(name_or_group: str, peer_id: str | None = None, data_type: str | None = None) -> Stream | None
Return a single stream identified by name or group, optionally filtered by data type.
The lookup first tries the known_streams_by_user_hash dictionary using a
user hash built from peer_id and name_or_group. If that misses, it falls
back to get_streams which searches by group name. When multiple streams are
found in the group, a data_type filter may be used to disambiguate; if no
filter is provided and more than one stream exists in the group, None is
returned (ambiguous case).
Parameters:
| Name |
Type |
Description |
Default |
name_or_group
|
str
|
The stream name or group identifier to look up.
|
required
|
peer_id
|
str | None
|
The peer ID of the stream owner. If None, the agent's own current
peer ID (from get_peer_id) is used. Defaults to None.
|
None
|
data_type
|
str | None
|
An optional data type string (e.g., "image", "tensor") used
to select a specific stream when the group contains multiple streams.
Defaults to None.
|
None
|
Returns:
| Type |
Description |
Stream | None
|
The matching Stream object, or None if the stream is not found, if
|
Stream | None
|
multiple streams match and data_type is not specified, or if no stream
|
Stream | None
|
matches the given data_type.
|
Examples:
>>> stream = my_agent.get_stream("processor")
>>> stream = my_agent.get_stream("sensor", peer_id=peer_id, data_type="image")
Source code in unaiverse/agent_basics.py
| def get_stream(self, name_or_group: str, peer_id: str | None = None, data_type: str | None = None) -> Stream | None:
"""Return a single stream identified by name or group, optionally filtered by data type.
The lookup first tries the ``known_streams_by_user_hash`` dictionary using a
user hash built from ``peer_id`` and ``name_or_group``. If that misses, it falls
back to ``get_streams`` which searches by group name. When multiple streams are
found in the group, a ``data_type`` filter may be used to disambiguate; if no
filter is provided and more than one stream exists in the group, ``None`` is
returned (ambiguous case).
Args:
name_or_group: The stream name or group identifier to look up.
peer_id: The peer ID of the stream owner. If ``None``, the agent's own current
peer ID (from ``get_peer_id``) is used. Defaults to ``None``.
data_type: An optional data type string (e.g., ``"image"``, ``"tensor"``) used
to select a specific stream when the group contains multiple streams.
Defaults to ``None``.
Returns:
The matching ``Stream`` object, or ``None`` if the stream is not found, if
multiple streams match and ``data_type`` is not specified, or if no stream
matches the given ``data_type``.
Examples:
>>> stream = my_agent.get_stream("processor")
>>> stream = my_agent.get_stream("sensor", peer_id=peer_id, data_type="image")
"""
if peer_id is None:
peer_id = self.get_peer_id()
user_hash = DataProps.build_user_hash(peer_id, name_or_group)
if user_hash in self.known_streams_by_user_hash:
return self.known_streams_by_user_hash[user_hash]
else:
streams = self.get_streams(group_name=name_or_group, peer_id=peer_id)
if streams is not None and len(streams) > 0:
if data_type is not None:
for stream in streams:
if stream.props.data_type == data_type:
return stream
return None
elif len(streams) == 1:
return streams[0]
else:
return None # Ambiguous case
else:
return None
|
get_streams
get_streams(group_name: str, peer_id: str | None = None) -> list[Stream] | None
Source code in unaiverse/agent_basics.py
| def get_streams(self, group_name: str, peer_id: str | None = None) -> list[Stream] | None:
if peer_id is None:
peer_id = self.get_peer_id()
net_hash_ps = DataProps.build_net_hash(peer_id, pubsub=True, name_or_group=group_name)
if net_hash_ps in self.known_streams:
return list(self.known_streams[net_hash_ps].values())
else:
net_hash_dm = DataProps.build_net_hash(peer_id, pubsub=False, name_or_group=group_name)
if net_hash_dm in self.known_streams:
return list(self.known_streams[net_hash_dm].values())
else:
return None
|
find_streams
find_streams(peer_id: str, name_or_group: str | None = None, discard_owned: bool = False) -> dict[str, dict[str, Stream]]
Find streams associated with a given peer ID and optionally by name or group.
Parameters:
| Name |
Type |
Description |
Default |
peer_id
|
str
|
The peer ID of the (owner of the) streams to find.
|
required
|
name_or_group
|
str | None
|
Optional name or group of the streams to find.
|
None
|
discard_owned
|
bool
|
If True, the owned streams are not searched (default False).
|
False
|
Returns:
| Type |
Description |
dict[str, dict[str, Stream]]
|
A dictionary where keys are network hashes and values are dictionaries of streams
|
dict[str, dict[str, Stream]]
|
(stream name to Stream object) matching the criteria.
|
Source code in unaiverse/agent_basics.py
| def find_streams(self, peer_id: str, name_or_group: str | None = None, discard_owned: bool = False) \
-> dict[str, dict[str, Stream]]:
"""Find streams associated with a given peer ID and optionally by name or group.
Args:
peer_id: The peer ID of the (owner of the) streams to find.
name_or_group: Optional name or group of the streams to find.
discard_owned: If True, the owned streams are not searched (default False).
Returns:
A dictionary where keys are network hashes and values are dictionaries of streams
(stream name to Stream object) matching the criteria.
"""
ret = {}
for net_hash, streams_dict in self.known_streams.items():
if discard_owned and net_hash in self.owned_streams:
continue
_peer_id = Stream.peer_id_from_net_hash(net_hash)
_name_or_group = Stream.name_or_group_from_net_hash(net_hash)
if peer_id == _peer_id:
if name_or_group is None or name_or_group == _name_or_group:
ret[net_hash] = streams_dict
else:
for _name, _stream in streams_dict.items():
if name_or_group == _name:
if net_hash not in ret:
ret[net_hash] = {}
ret[net_hash][name_or_group] = _stream
return ret
|
get_last_streamed_data
get_last_streamed_data(agent_name: str)
Get the last data samples from the processor streams of a given agent.
Parameters:
| Name |
Type |
Description |
Default |
agent_name
|
str
|
|
required
|
Returns:
| Type |
Description |
|
|
A list of data samples taken from all the known streams associated to the provided agent.
|
Source code in unaiverse/agent_basics.py
| def get_last_streamed_data(self, agent_name: str):
"""Get the last data samples from the processor streams of a given agent.
Args:
agent_name: The name of the agent.
Returns:
A list of data samples taken from all the known streams associated to the provided agent.
"""
data_list = []
for peer_id, profile in self.all_agents.items():
if profile.get_static_profile()['node_name'] == agent_name:
net_hash_to_stream_dict = self.find_streams(peer_id, name_or_group="processor")
for net_hash, streams_dict in net_hash_to_stream_dict.items():
for stream_name, stream_obj in streams_dict.items():
data_list.append(stream_obj.get())
return data_list
|
merge_flat_data_stream_props
merge_flat_data_stream_props()
Merge the labels of the descriptor components, across all streams, sharing them.
Source code in unaiverse/agent_basics.py
| def merge_flat_data_stream_props(self):
"""Merge the labels of the descriptor components, across all streams, sharing them."""
# Set of pivot labels
superset_labels = []
# Checking the whole list of streams, but considering only the ones with generic data, flat, and labels
considered_streams = []
for stream_dict in self.owned_streams.values():
for stream in stream_dict.values():
# Skipping not flat, or not generic, or unlabeled streams
if not stream.props.is_flat_tensor_with_labels():
continue
# Saving list of considered streams
considered_streams.append(stream)
# Adding the current stream-labels to the pivot labels
for label in stream.props.tensor_labels:
if label not in superset_labels:
superset_labels.append(label)
# Telling each stream in which positions their labels fall, given the pivot labels
for stream in considered_streams:
# In the case of BufferedStream, we have to update the data buffer by clearing previously applied
# adaptation first (I know it looks similar to what is done below, but we must clear first!)
if isinstance(stream, BufferedStream):
assert stream.props is not None
for uuid in stream.data_buffer_by_uuid.keys():
for data in stream.data_buffer_by_uuid[uuid]:
data.data = stream.props.clear_label_adaptation(data.data)
# Updating labels
assert stream.props is not None and isinstance(stream.props.tensor_labels, TensorLabels)
stream.props.tensor_labels.interleave_with(superset_labels)
# In the case of BufferedStream, we have to update the data buffer with the new labels
if isinstance(stream, BufferedStream):
assert stream.props is not None
for uuid in stream.data_buffer_by_uuid.keys():
for data in stream.data_buffer_by_uuid[uuid]:
data.data = stream.props.adapt_tensor_to_tensor_labels(data.data)
|
user_stream_hash_to_net_hash
user_stream_hash_to_net_hash(user_stream_hash: str) -> str | None
Converts a user-defined stream hash (peer_id:name_or_group) to a network hash
(peer_id:
... or peer_id:
name_or_group) by searching the known hashes in the known streams.
Parameters:
| Name |
Type |
Description |
Default |
user_stream_hash
|
str
|
The user-defined stream hash string (peer_id:name).
|
required
|
Returns:
| Type |
Description |
str | None
|
The corresponding network hash string (peer_id: ... or peer_id: name_or_group), or None if not found.
|
Source code in unaiverse/agent_basics.py
| def user_stream_hash_to_net_hash(self, user_stream_hash: str) -> str | None:
"""Converts a user-defined stream hash (peer_id:name_or_group) to a network hash
(peer_id::dm:... or peer_id::ps:name_or_group) by searching the known hashes in the known streams.
Args:
user_stream_hash: The user-defined stream hash string (peer_id:name).
Returns:
The corresponding network hash string (peer_id::dm:... or peer_id::ps:name_or_group), or None if not found.
"""
if user_stream_hash is None:
return None
if "::" in user_stream_hash:
return user_stream_hash # It was already fine
components = user_stream_hash.split(":")
peer_id = components[0]
name = components[-1]
for net_hash, stream_dict in self.known_streams.items():
_peer_id = Stream.peer_id_from_net_hash(net_hash)
_name_or_group = Stream.name_or_group_from_net_hash(net_hash)
if _peer_id == peer_id and _name_or_group == name:
return net_hash
for _name in stream_dict.keys():
if _peer_id == peer_id and _name == name:
return net_hash
return None
|
create_proc_input_streams(buffered: bool = False)
Creates the processor input streams based on the proc_inputs defined for the agent.
Parameters:
| Name |
Type |
Description |
Default |
buffered
|
bool
|
If True, the created streams will be of type BufferedStream.
|
False
|
Source code in unaiverse/agent_basics.py
| def create_proc_input_streams(self, buffered: bool = False):
"""Creates the processor input streams based on the `proc_inputs` defined for the agent.
Args:
buffered: If True, the created streams will be of type BufferedStream.
"""
# Adding input streams (grouped together), passing the node clock
if self.proc_inputs is not None:
for i, procs in enumerate(self.proc_inputs):
procs.set_group("processor_in") # Adding default group info, forced, do not change this!
# Creating the streams
for props in procs.props:
if not buffered:
stream = Stream(props=props.clone())
else:
stream = BufferedStream(props=props.clone())
self.add_stream(stream, owned=True)
public_peer_id, private_peer_id = self.get_peer_ids()
peer_id = public_peer_id if stream.is_public() else private_peer_id
net_hash = stream.net_hash(peer_id)
if stream.is_public():
self.proc_in_net_hash['public'] = net_hash
else:
self.proc_in_net_hash['private'] = net_hash
# forcing the input stream to be compatible with proc inputs
self.compat_in_streams[i].add((net_hash, props.get_name()))
|
create_proc_output_streams
create_proc_output_streams(buffered: bool = False)
Creates the processor output streams based on the proc_outputs defined for the agent.
Parameters:
| Name |
Type |
Description |
Default |
buffered
|
bool
|
If True, the created streams will be of type BufferedStream.
|
False
|
Source code in unaiverse/agent_basics.py
| def create_proc_output_streams(self, buffered: bool = False):
"""Creates the processor output streams based on the `proc_outputs` defined for the agent.
Args:
buffered: If True, the created streams will be of type BufferedStream.
"""
# Adding generated streams (grouped together), passing the node clock
if self.proc_outputs is not None:
for i, procs in enumerate(self.proc_outputs):
procs.set_group("processor") # Adding default group info, forced, do not change this!
# Creating the streams
for props in procs.props:
if not buffered:
stream = Stream(props=props.clone())
else:
stream = BufferedStream(props=props.clone())
self.add_stream(stream, owned=True)
public_peer_id, private_peer_id = self.get_peer_ids()
peer_id = public_peer_id if stream.is_public() else private_peer_id
net_hash = stream.net_hash(peer_id)
if stream.is_public():
self.proc_net_hash['public'] = net_hash
else:
self.proc_net_hash['private'] = net_hash
|
add_compatible_streams
async
add_compatible_streams(peer_id: str, streams_in_profile: list[dict], buffered: bool = False, add_all: bool = False, public: bool = True, skip_pubsub: bool = False) -> bool
Add to the list of processor-compatible-streams those streams provided as arguments that are actually
found to be compatible with the processor (if they are pubsub, it also subscribes to them) (async).
Parameters:
| Name |
Type |
Description |
Default |
peer_id
|
str
|
The peer ID of the agent providing the streams.
|
required
|
streams_in_profile
|
list[dict]
|
A list of dictionaries (DataProps to dict) representing the streams in peer's profile.
|
required
|
buffered
|
bool
|
If True, the added streams will be of type BufferedStream.
|
False
|
add_all
|
bool
|
If True, all streams from the profile are added, regardless of processor compatibility.
|
False
|
public
|
bool
|
Consider public streams only (or private streams only).
|
True
|
skip_pubsub
|
bool
|
|
False
|
Returns:
| Type |
Description |
bool
|
True if compatible streams were successfully added and subscribed to, False otherwise.
|
Source code in unaiverse/agent_basics.py
| async def add_compatible_streams(self, peer_id: str,
streams_in_profile: list[dict], buffered: bool = False,
add_all: bool = False, public: bool = True, skip_pubsub: bool = False) -> bool:
"""Add to the list of processor-compatible-streams those streams provided as arguments that are actually
found to be compatible with the processor (if they are pubsub, it also subscribes to them) (async).
Args:
peer_id: The peer ID of the agent providing the streams.
streams_in_profile: A list of dictionaries (DataProps to dict) representing the streams in peer's profile.
buffered: If True, the added streams will be of type BufferedStream.
add_all: If True, all streams from the profile are added, regardless of processor compatibility.
public: Consider public streams only (or private streams only).
skip_pubsub: Skip pubsub streams.
Returns:
True if compatible streams were successfully added and subscribed to, False otherwise.
"""
added_streams = []
if add_all:
# This is the case in which we add all streams, storing all pairs (DataProps, net_hash)
for j in streams_in_profile:
jj = DataProps.from_dict(j)
if (public == jj.is_public()
and (not jj.is_pubsub() or not skip_pubsub)):
net_hash = jj.net_hash(peer_id)
added_streams.append((jj, net_hash))
else:
# This is the case in which a processor is present, hence storing pairs (DataProps, net_hash)
# of the found compatible streams
added_net_hash_to_prop_name = {}
# Find streams that are compatible with our 'proc_inputs'
assert self.proc_inputs is not None
for i, in_proc in enumerate(self.proc_inputs):
for j in streams_in_profile:
jj = DataProps.from_dict(j)
if (public == jj.is_public() and in_proc.is_compatible(jj)
and (not jj.is_pubsub() or not skip_pubsub)):
net_hash = jj.net_hash(peer_id)
if net_hash not in added_net_hash_to_prop_name:
added_net_hash_to_prop_name[net_hash] = set()
if jj.name not in added_net_hash_to_prop_name[net_hash]:
added_net_hash_to_prop_name[net_hash].add(jj.name)
added_streams.append((jj, net_hash))
# Saving the position in the proc_input list
self.compat_in_streams[i].add((net_hash, jj.get_name()))
# Find streams that are compatible with our 'proc_outputs'
has_cross_entropy = []
assert self.proc_outputs is not None
if 'losses' in self.proc_opts:
for i in range(0, len(self.proc_outputs)):
if self.proc_opts['losses'][i] is not None and \
(self.proc_opts['losses'][i] == torch.nn.functional.cross_entropy or
isinstance(self.proc_opts['losses'][i], torch.nn.CrossEntropyLoss) or
"cross_entropy" in self.proc_opts['losses'][i].__name__):
has_cross_entropy.append(True)
else:
has_cross_entropy.append(False)
for i, out_proc in enumerate(self.proc_outputs):
out_proc: StreamType
for j in streams_in_profile:
jj = DataProps.from_dict(j)
if ((public == jj.is_public() and
(out_proc.is_compatible(jj) or (jj.is_tensor_target_id() and has_cross_entropy[i])))
and (not jj.is_pubsub() or not skip_pubsub)):
net_hash = jj.net_hash(peer_id)
if net_hash not in added_net_hash_to_prop_name:
added_net_hash_to_prop_name[net_hash] = set()
if jj.name not in added_net_hash_to_prop_name[net_hash]:
added_net_hash_to_prop_name[net_hash].add(jj.name)
added_streams.append((jj, net_hash))
# Saving the position in the proc_output list
self.compat_out_streams[i].add((net_hash, jj.get_name()))
net_hashes_to_subscribe = set()
# For each compatible stream found...
for (props, net_hash) in added_streams:
# Check if it is a new stream or a data stream to add to an already known stream
already_known_stream = net_hash in self.known_streams
# Creating the stream object
if not buffered:
stream = Stream(props=props.clone())
else:
stream = BufferedStream(props=props.clone())
# Add the data stream to the list of known streams
# if the stream already exists it will be overwritten (which is fine in case of changes)
self.add_stream(stream, owned=False, net_hash=net_hash)
# If the stream is over PubSub, and we are not already subscribed, we will subscribe
if props.is_pubsub() and not already_known_stream:
net_hashes_to_subscribe.add(net_hash)
# Opening PubSubs
for net_hash in net_hashes_to_subscribe:
log.misc(f"Opening channel for the not-owned but processor-compatible stream {net_hash}")
if not (await self._node_conn.subscribe(peer_id, channel=net_hash)):
log.error(f"Error subscribing to {net_hash}")
return False
return True
|
subscribe_to_pubsub_owned_streams
async
subscribe_to_pubsub_owned_streams() -> bool
Subscribes to all owned streams that are marked as PubSub (async).
Returns:
| Type |
Description |
bool
|
True if all subscriptions were successful, False otherwise.
|
Source code in unaiverse/agent_basics.py
| async def subscribe_to_pubsub_owned_streams(self) -> bool:
"""Subscribes to all owned streams that are marked as PubSub (async).
Returns:
True if all subscriptions were successful, False otherwise.
"""
# Opening channels for all the (groups of) owned streams (generated and not)
for net_hash in self.owned_streams.keys():
is_pubsub = Stream.is_pubsub_from_net_hash(net_hash)
if is_pubsub:
log.misc(f"Opening channel for the owned stream {net_hash}")
peer_id = Stream.peer_id_from_net_hash(net_hash) # Guessing peer ID from the net hash
if not (await self._node_conn.subscribe(peer_id, channel=net_hash)):
log.error(f"Cannot open a channel for owned stream hash {net_hash}")
return False
return True
|
update_streams_in_profile
update_streams_in_profile()
Updates the agent's profile with information about its owned (environmental and processor) streams.
Source code in unaiverse/agent_basics.py
| def update_streams_in_profile(self):
"""Updates the agent's profile with information about its owned (environmental and processor) streams."""
# Filling the information about the streams that can be generated and handled
dynamic_profile = self._node_profile.get_dynamic_profile()
if (hasattr(self, 'proc_outputs') and hasattr(self, 'proc_inputs') and
self.proc_outputs is not None and self.proc_inputs is not None):
dynamic_profile['proc_outputs'] = \
[dct for d in self.proc_outputs for dct in d.to_list_of_dicts()] # List of dict of DataProp
dynamic_profile['proc_inputs'] = \
[dct for d in self.proc_inputs for dct in d.to_list_of_dicts()] # List of dict of DataProp
# Adding the list of locally-created ("environmental") streams to the profile
list_of_props = []
public_peer_id, private_peer_id = self.get_peer_ids()
for net_hash, streams_dict in self.owned_streams.items():
if net_hash not in self.proc_streams.keys() and net_hash not in self.proc_in_streams.keys():
if (DataProps.peer_id_from_net_hash(net_hash) == public_peer_id or
DataProps.peer_id_from_net_hash(net_hash) == private_peer_id):
for stream in streams_dict.values():
list_of_props.append(stream.get_props().to_dict()) # DataProp
if len(list_of_props) > 0:
dynamic_profile['streams'] = list_of_props
|
send_profile_to_all
async
Sends the agent's profile to all known agents (async).
Source code in unaiverse/agent_basics.py
| async def send_profile_to_all(self):
"""Sends the agent's profile to all known agents (async)."""
agents = list(self.all_agents.keys())
for peer_id in agents:
log.misc(f"Sending profile to {peer_id}")
if not (await self._node_conn.send(peer_id, channel_trail=None,
content=self._node_profile.get_all_profile(),
content_type=Msg.PROFILE)):
log.error("Failed to send profile, removing (disconnecting) " + peer_id)
await self.remove_agent(peer_id)
|
behave
async
Behave in the current environment, calling the state-machines of the public and private networks (async).
Source code in unaiverse/agent_basics.py
| async def behave(self):
"""Behave in the current environment, calling the state-machines of the public and private networks (async)."""
if self.in_world():
log.set_sub("prv")
log.misc("Behaving (world)...")
if self.behav is None:
log.error("No behaviour specified")
else:
if self.behav_lone_wolf is not None:
self.behav_lone_wolf.enable(False)
self.behav.enable(True)
self.stdin.bind(self._proc_in_streams_by_user_hash_prv, uuid=Custom.SYSTEM_INTERACTION_UUID)
self.stdout.bind(self._proc_streams_by_user_hash_prv, uuid=Custom.SYSTEM_INTERACTION_UUID)
self.stdext.bind(self._env_streams_by_user_hash_prv, uuid=Custom.SYSTEM_INTERACTION_UUID)
await self.on_tick()
await self.behav.act()
self.behav.enable(False)
log.set_sub("pub")
log.misc("Behaving (public)...")
if self.behav_lone_wolf is None:
log.error("No behaviour specified")
else:
if self.behav is not None:
self.behav.enable(False)
self.behav_lone_wolf.enable(True)
self.stdin.bind(self._proc_in_streams_by_user_hash_pub, uuid=Custom.SYSTEM_INTERACTION_UUID)
self.stdout.bind(self._proc_streams_by_user_hash_pub, uuid=Custom.SYSTEM_INTERACTION_UUID)
self.stdext.bind(self._env_streams_by_user_hash_pub, uuid=Custom.SYSTEM_INTERACTION_UUID)
await self.behav_lone_wolf.act()
self.behav_lone_wolf.enable(False)
log.set_sub("gen")
# Clear expired interactions at every clock cycle and notify originators
await self.im.complete_expired()
# Also drain recently completed interactions for status notification
completed = self.im.drain_completed()
# Clear stream-expired data
self.im.clear_expired_stream_data()
for interaction in completed:
if (interaction.requester is not None and not interaction.volatile and
interaction.requester in self.all_agents):
log.inter(f"Communicating completion of interaction {interaction.to_code_str(True)}")
my_public_peer_id, my_private_peer_id = self.get_peer_ids()
if interaction.requester in self.public_agents:
my_peer_id: str = my_public_peer_id
else:
my_peer_id: str = my_private_peer_id
ret = await self._node_conn.send(interaction.requester, channel_trail=None,
content=interaction.to_status_dict(status_generated_by=my_peer_id),
content_type=Msg.INTERACTION_STATUS)
if not ret:
log.error(
f"Failed to send interaction status to peer {interaction.requester}, disconnecting it")
await self._node_purge_fcn(interaction.requester)
|
learn_behave
learn_behave(state: int, last_action: int, prev_state: int)
A placeholder method for behavioral learning, intended to be implemented by child classes.
It receives state and action information to update a behavioral model.
Parameters:
| Name |
Type |
Description |
Default |
state
|
int
|
The current state of the agent.
|
required
|
last_action
|
int
|
|
required
|
prev_state
|
int
|
The previous state of the agent.
|
required
|
Returns:
| Type |
Description |
|
|
None in the base implementation; subclasses are expected to return state-related feedback.
|
Source code in unaiverse/agent_basics.py
| def learn_behave(self, state: int, last_action: int, prev_state: int):
"""A placeholder method for behavioral learning, intended to be implemented by child classes.
It receives state and action information to update a behavioral model.
Args:
state: The current state of the agent.
last_action: The last action taken.
prev_state: The previous state of the agent.
Returns:
None in the base implementation; subclasses are expected to return state-related feedback.
"""
pass
|
on_tick
async
Source code in unaiverse/agent_basics.py
| async def on_tick(self):
pass
|
get_peer_id
Return the current peer ID of the agent, depending on whether it is behaving in a world or not.
Returns:
| Type |
Description |
str
|
The private peer ID if the agent is behaving in a world, the public peer ID otherwise.
|
Source code in unaiverse/agent_basics.py
| def get_peer_id(self) -> str:
"""Return the current peer ID of the agent, depending on whether it is behaving in a world or not.
Returns:
The private peer ID if the agent is behaving in a world, the public peer ID otherwise.
"""
public_peer_id, private_peer_id = self.get_peer_ids()
if self.behaving_in_world():
return private_peer_id
else:
return public_peer_id
|
get_peer_ids
get_peer_ids() -> tuple[str, str]
Retrieve the public and private peer IDs of the agent, from the underlying node's dynamic profile.
Returns:
| Type |
Description |
str
|
A tuple containing the public peer ID and the private peer ID.
|
str
|
If either ID is not available, a placeholder string is returned , .
|
Source code in unaiverse/agent_basics.py
| def get_peer_ids(self) -> tuple[str, str]:
"""Retrieve the public and private peer IDs of the agent, from the underlying node's dynamic profile.
Returns:
A tuple containing the public peer ID and the private peer ID.
If either ID is not available, a placeholder string is returned <public_peer_id>, <private_peer_id>.
"""
public_peer_id = None
private_peer_id = None
if self._node_profile is not None:
dynamic_profile = self._node_profile.get_dynamic_profile()
public_peer_id: str = dynamic_profile['peer_id'] # Public
private_peer_id: str = dynamic_profile['private_peer_id'] # Private
public_peer_id: str = '<public_peer_id>' if public_peer_id is None else public_peer_id
private_peer_id: str = '<private_peer_id>' if private_peer_id is None else private_peer_id
return public_peer_id, private_peer_id
|
evaluate_profile
evaluate_profile(role: int, profile: NodeProfile) -> bool
Evaluate if a given profile is valid for this agent based on its role. It helps in identifying and filtering
out invalid or 'cheating' profiles.
Parameters:
| Name |
Type |
Description |
Default |
role
|
int
|
The expected integer role (e.g., ROLE_PUBLIC, ROLE_WORLD_MASTER) for the profile.
|
required
|
profile
|
NodeProfile
|
The NodeProfile object to be evaluated.
|
required
|
Returns:
| Type |
Description |
bool
|
True if the profile is considered valid for the specified role, False otherwise.
|
Source code in unaiverse/agent_basics.py
| def evaluate_profile(self, role: int, profile: NodeProfile) -> bool:
"""Evaluate if a given profile is valid for this agent based on its role. It helps in identifying and filtering
out invalid or 'cheating' profiles.
Args:
role: The expected integer role (e.g., ROLE_PUBLIC, ROLE_WORLD_MASTER) for the profile.
profile: The NodeProfile object to be evaluated.
Returns:
True if the profile is considered valid for the specified role, False otherwise.
"""
# If the role in the profile is not the provided role, a profile-cheater was found
if (profile.get_dynamic_profile()['connections']['role'] in self.ROLE_STR_TO_BITS and
self.ROLE_STR_TO_BITS[profile.get_dynamic_profile()['connections']['role']] != role):
log.user(f"Cheater found: "
f"{profile.get_dynamic_profile()['connections']['role']} != {self.ROLE_BITS_TO_STR[role]}")
return False # Cheater found
# These are just examples: you are expected to reimplement this method in your custom agent file
if (role & 1 == self.ROLE_PUBLIC and
profile.get_dynamic_profile()['guessed_location'] == 'Some Dummy Location, Just An Example Here'):
return False
elif (role & 3 == self.ROLE_WORLD_MASTER and
profile.get_dynamic_profile()['guessed_location'] == 'Some Other Location, Just Another Example Here'):
return False
else:
return True
|
accept_new_role
accept_new_role(role: int)
Set the agent's role and optionally load a default behavior (private/world behavior).
Parameters:
| Name |
Type |
Description |
Default |
role
|
int
|
The integer role to assign to the agent (e.g., ROLE_PUBLIC, ROLE_WORLD_MASTER).
|
required
|
Source code in unaiverse/agent_basics.py
| def accept_new_role(self, role: int):
"""Set the agent's role and optionally load a default behavior (private/world behavior).
Args:
role: The integer role to assign to the agent (e.g., ROLE_PUBLIC, ROLE_WORLD_MASTER).
"""
base_role_str = self.ROLE_BITS_TO_STR[(role >> 2) << 2]
full_role_str = self.ROLE_BITS_TO_STR[role]
self._node_profile.get_dynamic_profile()['connections']['role'] = full_role_str
default_behav = None # A public role will not be found in the world map
if self.world_profile is not None:
base_role_to_behav = self.world_profile.get_dynamic_profile()['world_roles_fsm']
if base_role_str in base_role_to_behav:
default_behav = self.world_profile.get_dynamic_profile()['world_roles_fsm'][base_role_str]
if default_behav is not None and isinstance(default_behav, str) and len(default_behav) > 0:
default_behav_hsm = HybridStateMachine(self)
default_behav_hsm.load(default_behav)
self.behav = HybridStateMachine(self, policy=self.policy_default)
self.behav.include(default_behav_hsm, make_a_copy=True)
self.behav.set_role(base_role_str)
self.set_policy_filter(self.policy_filter, public=False)
|
is_human
Check if the agent is marked as human in its node.
Returns:
| Type |
Description |
|
|
True if the agent is in a human, False otherwise.
|
Source code in unaiverse/agent_basics.py
| def is_human(self):
"""Check if the agent is marked as human in its node.
Returns:
True if the agent is in a human, False otherwise.
"""
if self._node_profile is not None:
return self._node_profile.get_static_profile()["node_type"] == self.HUMAN
else:
return False
|
in_world
Check if the agent is currently operating within a 'world'.
Returns:
| Type |
Description |
|
|
True if the agent is in a world, False otherwise.
|
Source code in unaiverse/agent_basics.py
| def in_world(self):
"""Check if the agent is currently operating within a 'world'.
Returns:
True if the agent is in a world, False otherwise.
"""
if self._node_profile is not None:
return self.ROLE_STR_TO_BITS[self._node_profile.get_dynamic_profile()['connections']['role']] & 1 == 1
else:
return False
|
behaving_in_world
Checks if the agent's world-specific behavior state machine is currently active.
Returns:
| Type |
Description |
|
|
True if the world behavior is active, False otherwise.
|
Source code in unaiverse/agent_basics.py
| def behaving_in_world(self):
"""Checks if the agent's world-specific behavior state machine is currently active.
Returns:
True if the world behavior is active, False otherwise.
"""
return self.behav.is_enabled()
|
get_stream_sample
get_stream_sample(net_hash: str, sample_dict: dict[str, dict[str, Tensor | None | int | str]]) -> list[tuple[str, str]]
Receive and process stream samples that were provided by another agent.
Parameters:
| Name |
Type |
Description |
Default |
net_hash
|
str
|
The network hash identifying the source of the stream samples.
|
required
|
sample_dict
|
dict[str, dict[str, Tensor | None | int | str]]
|
A dictionary where keys are stream names and values are dictionaries
containing 'data', 'data_tag', and 'data_uuid' for each sample.
|
required
|
Returns:
| Type |
Description |
list[tuple[str, str]]
|
The list of tuples (stream user hash, data UUID) for each added data sample.
|
Source code in unaiverse/agent_basics.py
| def get_stream_sample(self, net_hash: str,
sample_dict: dict[str, dict[str, torch.Tensor | None | int | str]]) \
-> list[tuple[str, str]]:
"""Receive and process stream samples that were provided by another agent.
Args:
net_hash: The network hash identifying the source of the stream samples.
sample_dict: A dictionary where keys are stream names and values are dictionaries
containing 'data', 'data_tag', and 'data_uuid' for each sample.
Returns:
The list of tuples (stream user hash, data UUID) for each added data sample.
"""
# Let's be sure that the net hash is converted from the user's perspective to the one of the code here
net_hash = DataProps.normalize_net_hash(net_hash)
added_data = []
log.misc(f"Got a stream sample from {net_hash}...")
if sample_dict is None:
log.error(f"Invalid sample (expected a dictionary, got {type(sample_dict)})")
return added_data
if net_hash in self.known_streams:
for name, data_and_tag_and_uuid in sample_dict.items():
if ('data' not in data_and_tag_and_uuid or
'data_tag' not in data_and_tag_and_uuid or
'data_uuid' not in data_and_tag_and_uuid):
log.error(f"Invalid sample in data stream named {name} (missing one or more keys)")
continue
data, data_tag, data_uuid = (data_and_tag_and_uuid['data'],
data_and_tag_and_uuid['data_tag'],
data_and_tag_and_uuid['data_uuid'])
# - data must be not None
# - the stream name must be known
# - if the UUID associated to our local stream is the same of the data, then we check tag order
# - if the UUID associated to our local stream is the expected one, we don't check tag order
skip = False
reason = None
has_data = False
if not skip:
if data is None:
skip = True
reason = "Data is None"
if not skip:
if net_hash not in self.known_streams:
skip = True
reason = f"The net hash {net_hash} is not a known stream hash"
if not skip:
if name not in self.known_streams[net_hash]:
skip = True
reason = (f"The data stream named {name} is not present for net hash {net_hash} "
f"(names: {list(self.known_streams[net_hash].keys())})")
if not skip:
has_data = self.known_streams[net_hash][name].has_data(data_uuid)
if has_data and data_tag <= self.known_streams[net_hash][name].get_tag(data_uuid):
skip = True
reason = (f"The data tag {data_tag} is less or equal to the already present one "
f"for UUID {data_uuid} "
f"({self.known_streams[net_hash][name].get_tag(data_uuid)})")
# If we sample can be accepted...
if not skip:
log.misc(f"Accepted sample named {name}: tag={data_tag}, uuid={data_uuid}" +
(": it is a new UUID" if not has_data else ""))
# Saving the data sample on the known stream objects
self.known_streams[net_hash][name].set(data, data_tag, uuid=data_uuid)
# Data to return
added_data.append((DataProps.user_hash_from_net_hash(net_hash, name), data_uuid))
# Buffering data, if it was requested and if this sample comes from somebody's processor
if (self.buffer_generated_by_others != "none" and
DataProps.name_or_group_from_net_hash(net_hash) == "processor"):
log.debug(f"[get_stream_sample] Buffering others' processor generated data...")
# Getting the streams of the processor of the source agent
_processor_stream_dict = self.known_streams[net_hash]
_peer_id = DataProps.peer_id_from_net_hash(net_hash)
# Setting buffered stream counter
clear = False
if _peer_id in self.last_buffered_peer_id_to_info:
if self.buffer_generated_by_others == "one":
_buffered_uuid_to_id = self.last_buffered_peer_id_to_info[_peer_id]["uuid_to_id"]
if data_uuid not in _buffered_uuid_to_id:
_id = next(iter(_buffered_uuid_to_id.values()))
_buffered_uuid_to_id.clear()
_buffered_uuid_to_id[data_uuid] = _id
clear = True
else:
self.last_buffered_peer_id_to_info[_peer_id] = {"uuid_to_id": {}, "net_hash": None}
_buffered_uuid_to_id = self.last_buffered_peer_id_to_info[_peer_id]["uuid_to_id"]
if data_uuid not in _buffered_uuid_to_id:
_buffered_uuid_to_id[data_uuid] = sum(
len(v["uuid_to_id"]) for v in self.last_buffered_peer_id_to_info.values()) + 1
_buffered_id = _buffered_uuid_to_id[data_uuid]
# Building net hash to retrieve the buffered stream
_group = "buffered" + str(_buffered_id)
_name = name + "@" + _group
_net_hash = DataProps.build_net_hash(
_peer_id,
pubsub=False,
name_or_group=_group)
# If the buffered stream was not created before
if _net_hash not in self.known_streams:
log.debug(f"[get_stream_sample] Adding a new buffered stream to the list of known "
f"streams, hash: {_net_hash}")
for stream_obj in _processor_stream_dict.values():
# Same properties of the stream of the processor of the source agent
props = stream_obj.get_props().clone()
props.set_group("buffered" + str(_buffered_id))
props.set_name(props.get_name() + "@" + props.get_group())
# Adding the newly created stream
self.add_stream(BufferedStream(props=props),
owned=False,
net_hash=_net_hash)
# Saving hash of the new buffered stream
self.last_buffered_peer_id_to_info[_peer_id]["net_hash"] = _net_hash
else:
if clear:
for stream_obj in self.known_streams[_net_hash].values():
stream_obj.clear_buffer()
# Saving sample
self.known_streams[_net_hash][_name].set(data, data_tag, uuid=data_uuid)
if self.known_streams[_net_hash][_name].get_interaction(data_uuid) is None:
interaction = self.known_streams[net_hash][name].get_interaction(data_uuid)
if interaction is not None:
self.im.add_lazy_stream_to_interaction(
DataProps.user_hash_from_net_hash(_net_hash, _name), interaction)
# If we decided to skip this sample...
else:
log.misc(f"Skipping sample named {name} received in net hash {net_hash}, "
f"tag={data_tag}, uuid={data_uuid}: {reason}")
if net_hash not in self.known_streams:
log.debug(f"[get_stream_sample] "
f"The net hash {net_hash} was not found in the set of known streams")
else:
if name not in self.known_streams[net_hash]:
log.debug(f"[get_stream_sample] The net hash was known, but the data stream "
f"named {name} is not known")
else:
log.debug(f"[get_stream_sample] "
f"data={self.known_streams[net_hash][name].props.to_text(data)}")
return added_data
# If this stream is not known at all...
else:
log.misc(f"Skipping sample from {net_hash} (data stream is unknown)")
return added_data
|
send_stream_samples
async
Collect and send stream samples from all owned streams to appropriate recipients (async).
Source code in unaiverse/agent_basics.py
| async def send_stream_samples(self):
"""Collect and send stream samples from all owned streams to appropriate recipients (async)."""
# For each owned net hash...
for net_hash, streams_dict in self.owned_streams.items():
# Skipping our processor input
if DataProps.name_or_group_from_net_hash(net_hash) == "processor_in":
continue
# Get samples from all the owned streams: taking notes about the recipient and of what to send (contents)
interactions_by_uuid = {}
recipients_by_uuid = {}
contents_by_uuid = {}
contents_data_by_uuid = {}
# Preparing content to send
empty_contents: dict[str, dict | None] = {name: None for name in streams_dict.keys()}
empty_data = {name: None for name in streams_dict.keys()}
valid_samples_count = {}
# For each stream within this net hash...
for name, stream in streams_dict.items():
if stream.is_pubsub():
uuids = stream.get_data_uuids() # We send by PubSub whenever there is some data
else:
uuids = stream.get_interaction_uuids() # We send direct messages only if there is an interaction
# For each agent who triggered the interaction that generated this data (recipient)
# Set None as last UUID (it is present in pre-built BufferedStreams)
# since it is sometimes replaced by the last set reference,
# and we prefer to give priority to the not-None cases
if None in uuids:
uuids = [x for x in uuids if x is not None] + [None]
for uuid in uuids:
interaction = stream.get_interaction(uuid)
recipient = None
# Invalid agent? Skip!
if not stream.is_pubsub():
recipient = self.im.get_recipients(interaction)
if len(recipient) == 0:
continue
# Skipping system-triggered interactions
if recipient == [Custom.SYSTEM_INTERACTION_LABEL]:
continue
# Get data
data = stream.get(requested_by="send_stream_samples", uuid=uuid)
data_tag = stream.get_tag(uuid=uuid)
log.debug(f"[send_stream_sample] data from stream {stream.props.get_name()} = {data}")
if data is not None:
log.debug(f"[send_stream_samples] Found something in stream {stream.props.get_name()} "
f"uuid={uuid}, data_tag={data_tag}, recipient={recipient}")
# Prepare data structures the first time we meet a new recipient
if uuid not in contents_by_uuid:
interactions_by_uuid[uuid] = interaction
recipients_by_uuid[uuid] = recipient
contents_by_uuid[uuid] = empty_contents.copy()
contents_data_by_uuid[uuid] = empty_data.copy()
valid_samples_count[uuid] = 0
# Pack into the prepared data structures
contents_by_uuid[uuid][name] = {'data': data,
'data_tag': data_tag,
'data_uuid': uuid}
contents_data_by_uuid[uuid][name] = data
valid_samples_count[uuid] += 1
else:
log.debug(
f"[send_stream_samples] None data in stream {stream.props.get_name()} uuid={uuid}")
# Remove recipients of not-net-hash-full-of-contents messages
uuid_to_remove = []
for uuid, count in valid_samples_count.items():
if count != len(streams_dict):
uuid_to_remove.append(uuid)
log.debug(f"[send_stream_samples] Cannot send data for {net_hash}, uuid {uuid}, "
f"since it is incomplete ({valid_samples_count[uuid]} vs {len(streams_dict)})")
for uuid in uuid_to_remove:
del interactions_by_uuid[uuid]
del recipients_by_uuid[uuid]
del contents_by_uuid[uuid]
del contents_data_by_uuid[uuid]
del valid_samples_count[uuid]
# If direct message...
if not Stream.is_pubsub_from_net_hash(net_hash):
name_or_group = DataProps.name_or_group_from_net_hash(net_hash)
for uuid, recipients in recipients_by_uuid.items():
content = contents_by_uuid[uuid]
content_data = contents_data_by_uuid[uuid]
for recipient in recipients:
log.debug(f"[send_stream_samples] "
f"Sending samples of {net_hash} by direct message to {recipient}...")
for name in content.keys():
content[name]['data'] = self.hook_before_sending_sample(content_data[name],
content[name]['data_tag'],
net_hash, name, recipient)
log.debug(f"[send_stream_samples] "
f"(data_tag={content[name]['data_tag']}, data_uuid={uuid}, "
f"data is None?={content[name]['data'] is None})")
ret = await self._node_conn.send(recipient, channel_trail=name_or_group,
content_type=Msg.STREAM_SAMPLE,
content=content)
log.debug(f"[send_stream_samples] Sending returned: " + str(ret))
# If pubsub...
if Stream.is_pubsub_from_net_hash(net_hash):
for uuid, recipients in recipients_by_uuid.items():
log.debug(f"[send_stream_samples] Sending stream samples of the whole {net_hash} by pubsub...")
content = contents_by_uuid[uuid]
content_data = contents_data_by_uuid[uuid]
for name in content.keys():
content[name]['data'] = self.hook_before_sending_sample(content_data[name],
content[name]['data_tag'],
net_hash, name, None)
log.debug(
f"[send_stream_samples] (data_tag={content[name]['data_tag']}, data_uuid={uuid}, "
f"data is None?={content[name]['data'] is None}")
peer_id = Stream.peer_id_from_net_hash(net_hash) # Guessing agent peer ID from the net hash
ret = await self._node_conn.publish(peer_id, channel=net_hash,
content_type=Msg.STREAM_SAMPLE,
content=content)
log.debug(f"[send_stream_samples] Sending returned: " + str(ret))
|
disable_proc_input(public: bool)
Disable the processor input stream of this agent.
Parameters:
| Name |
Type |
Description |
Default |
public
|
bool
|
If True, disables the public input stream, otherwise the private one.
|
required
|
Source code in unaiverse/agent_basics.py
| def disable_proc_input(self, public: bool):
"""Disable the processor input stream of this agent.
Args:
public: If True, disables the public input stream, otherwise the private one.
"""
stream_dict = self.owned_streams[self.get_proc_input_net_hash(public=public)]
for stream_obj in stream_dict.values():
if stream_obj.is_public() == public:
stream_obj.disable()
|
enable_proc_input(public: bool)
Enable the processor input stream of this agent.
Parameters:
| Name |
Type |
Description |
Default |
public
|
bool
|
If True, enables the public input stream, otherwise the private one.
|
required
|
Source code in unaiverse/agent_basics.py
| def enable_proc_input(self, public: bool):
"""Enable the processor input stream of this agent.
Args:
public: If True, enables the public input stream, otherwise the private one.
"""
stream_dict = self.owned_streams[self.get_proc_input_net_hash(public=public)]
for stream_obj in stream_dict.values():
if stream_obj.is_public() == public:
stream_obj.enable()
|
set_proc_input(data: str | Image | Tensor | None, public: bool = False, uuid: str | None = None, data_type: str = 'auto', data_tag: int = -1) -> bool
Set the data in the processor input stream.
Parameters:
| Name |
Type |
Description |
Default |
data
|
str | Image | Tensor | None
|
The data to set in the stream (text, image, or tensor).
|
required
|
public
|
bool
|
If True, targets the public input stream, otherwise the private one.
|
False
|
uuid
|
str | None
|
Optional UUID to associate with the data sample.
|
None
|
data_type
|
str
|
The type of data ("text", "img", "tensor", or "auto" to infer from the data).
|
'auto'
|
data_tag
|
int
|
The tag associated with the sample (default -1).
|
-1
|
Returns:
| Type |
Description |
bool
|
True if the data was successfully set, False otherwise.
|
Source code in unaiverse/agent_basics.py
| def set_proc_input(self, data: str | Image | torch.Tensor | None, public: bool = False,
uuid: str | None = None, data_type: str = "auto", data_tag: int = -1) -> bool:
"""Set the data in the processor input stream.
Args:
data: The data to set in the stream (text, image, or tensor).
public: If True, targets the public input stream, otherwise the private one.
uuid: Optional UUID to associate with the data sample.
data_type: The type of data ("text", "img", "tensor", or "auto" to infer from the data).
data_tag: The tag associated with the sample (default -1).
Returns:
True if the data was successfully set, False otherwise.
"""
peer_id = self.get_peer_ids()[0] if public else self.get_peer_ids()[1]
proc_in = self.find_streams(peer_id, "processor_in")
if proc_in is None or len(proc_in) == 0:
return False
for net_hash, stream_dict in proc_in.items():
if not DataProps.is_pubsub_from_net_hash(net_hash):
for stream_name, stream_obj in stream_dict.items():
if stream_obj.props.is_public() == public:
if (data is None or
((data_type == "text" or isinstance(data, str)) and stream_obj.props.is_text()) or
((data_type == "img" or isinstance(data, Image)) and stream_obj.props.is_img()) or
((data_type == "tensor" or isinstance(data, torch.Tensor))
and stream_obj.props.is_tensor())):
stream_obj.set(data, uuid=uuid) # This might fail if the stream is disabled
stream_obj.set_tag(data_tag, uuid=uuid)
return True
return False
|
get_tag
get_tag(net_hash: str, uuid: str | None = None) -> int
Get the maximum data tag across all streams associated with the given network hash.
Parameters:
| Name |
Type |
Description |
Default |
net_hash
|
str
|
The network hash identifying the stream group.
|
required
|
uuid
|
str | None
|
Optional UUID of the data sample. If None, the latest tag is returned.
|
None
|
Returns:
| Type |
Description |
int
|
The maximum data tag found, or -1 if the stream is unknown.
|
Source code in unaiverse/agent_basics.py
| def get_tag(self, net_hash: str, uuid: str | None = None) -> int:
"""Get the maximum data tag across all streams associated with the given network hash.
Args:
net_hash: The network hash identifying the stream group.
uuid: Optional UUID of the data sample. If None, the latest tag is returned.
Returns:
The maximum data tag found, or -1 if the stream is unknown.
"""
if net_hash in self.known_streams:
data_tag = -1
stream_dict = self.known_streams[net_hash]
for stream_obj in stream_dict.values():
t = stream_obj.get_tag(uuid)
if t is not None:
data_tag = max(data_tag, t)
return data_tag
return -1
|
set_tag
set_tag(net_hash: str, data_tag: int, uuid: str | None = None)
Set the data tag on all streams associated with the given network hash.
Parameters:
| Name |
Type |
Description |
Default |
net_hash
|
str
|
The network hash identifying the stream group.
|
required
|
data_tag
|
int
|
|
required
|
uuid
|
str | None
|
Optional UUID of the data sample to tag. If None, the latest sample is tagged.
|
None
|
Source code in unaiverse/agent_basics.py
| def set_tag(self, net_hash: str, data_tag: int, uuid: str | None = None):
"""Set the data tag on all streams associated with the given network hash.
Args:
net_hash: The network hash identifying the stream group.
data_tag: The tag value to set.
uuid: Optional UUID of the data sample to tag. If None, the latest sample is tagged.
"""
if net_hash in self.known_streams:
stream_dict = self.known_streams[net_hash]
for stream_obj in stream_dict.values():
stream_obj.set_tag(data_tag, uuid=uuid)
|
get_current_interaction
get_current_interaction()
Source code in unaiverse/agent_basics.py
| def get_current_interaction(self):
return self.im.get_current()
|
force_action_step
force_action_step(step: int)
Override the current action step index used in the active behavior.
Parameters:
| Name |
Type |
Description |
Default |
step
|
int
|
The action step index to force. If negative, the override is cleared.
|
required
|
Source code in unaiverse/agent_basics.py
| def force_action_step(self, step: int):
"""Override the current action step index used in the active behavior.
Args:
step: The action step index to force. If negative, the override is cleared.
"""
self.overridden_action_step = step if step >= 0 else None
|
get_action_step
Retrieve the current action step from the agent's private/world behavior.
Returns:
| Type |
Description |
|
|
The current action step object from the HybridStateMachine's active action, or None if no action.
|
Source code in unaiverse/agent_basics.py
| def get_action_step(self):
"""Retrieve the current action step from the agent's private/world behavior.
Returns:
The current action step object from the HybridStateMachine's active action, or None if no action.
"""
behav = self.behav if self.behav.is_enabled() else self.behav_lone_wolf
return behav.get_action_step_idx() if self.overridden_action_step is None else self.overridden_action_step
|
is_last_action_step
Check if the agent's current action (private/world behavior) is on its last step.
Returns:
| Type |
Description |
|
|
True if the current action was its last step, False otherwise. Returns None if there is no active action.
|
Source code in unaiverse/agent_basics.py
| def is_last_action_step(self):
"""Check if the agent's current action (private/world behavior) is on its last step.
Returns:
True if the current action was its last step, False otherwise. Returns None if there is no active action.
"""
if self.im.current is not None:
return self.im.current.was_last_step_done()
else:
return None
|
is_multi_steps_action
Determines if the current action is a multistep action.
Returns:
| Type |
Description |
|
|
True if the action is multistep, False otherwise.
|
Source code in unaiverse/agent_basics.py
| def is_multi_steps_action(self):
"""Determines if the current action is a multistep action.
Returns:
True if the action is multistep, False otherwise.
"""
if self.im.current is not None:
return self.im.current.is_multi_steps()
else:
return False
|
set_policy
async
set_policy(policy_method_name_or_policy_fcn: str | Callable[[list[Action]], [int, Interaction | None]], public: bool = False) -> bool
Sets the policy to be used in selecting what action to perform in the current state (async).
Parameters:
| Name |
Type |
Description |
Default |
policy_method_name_or_policy_fcn
|
str | Callable[[list[Action]], [int, Interaction | None]]
|
The name of a method of the Agent class that implements a policy function.
It is a function that takes a list of Action objects that are candidates for execution, and returns
the index of the selected action and an ActionRequest object with the action-requester details
(requester, arguments, time, and UUID), or -1 and None if no action is selected.
By design, every agent implements a basic policy function named "policy_default".
|
required
|
public
|
bool
|
If True, the policy will be applied to the public HSM, otherwise to the private/world one.
|
False
|
Returns:
| Type |
Description |
bool
|
True if the policy was successfully set, False otherwise.
|
Source code in unaiverse/agent_basics.py
| async def set_policy(self,
policy_method_name_or_policy_fcn: str | Callable[[list[Action]], [int, Interaction | None]],
public: bool = False) -> bool:
"""Sets the policy to be used in selecting what action to perform in the current state (async).
Args:
policy_method_name_or_policy_fcn: The name of a method of the Agent class that implements a policy function.
It is a function that takes a list of `Action` objects that are candidates for execution, and returns
the index of the selected action and an ActionRequest object with the action-requester details
(requester, arguments, time, and UUID), or -1 and None if no action is selected.
By design, every agent implements a basic policy function named "policy_default".
public: If True, the policy will be applied to the public HSM, otherwise to the private/world one.
Returns:
True if the policy was successfully set, False otherwise.
"""
if isinstance(policy_method_name_or_policy_fcn, str):
policy_fcn = getattr(self, policy_method_name_or_policy_fcn, None)
if not callable(policy_fcn):
return False
behav = self.behav if not public else self.behav_lone_wolf
assert isinstance(behav, HybridStateMachine)
behav.set_policy(policy_fcn)
return True
elif callable(policy_method_name_or_policy_fcn):
policy_fcn = policy_method_name_or_policy_fcn
behav = self.behav if not public else self.behav_lone_wolf
assert isinstance(behav, HybridStateMachine)
behav.set_policy(policy_fcn)
return True
return False
|
set_policy_filter
set_policy_filter(filter_method_name_or_policy_fcn: str | Callable[[int, Interaction | None, list[Action], dict], [int, Interaction | None]] | None, public: bool = False) -> bool
Sets the policy filter function, which overrides the action selected by the policy in the current state.
Parameters:
| Name |
Type |
Description |
Default |
filter_method_name_or_policy_fcn
|
str | Callable[[int, Interaction | None, list[Action], dict], [int, Interaction | None]] | None
|
The name of a method of the Agent class or a function that implements a
policy filtering function, overriding what the policy decided.
It is a function that takes what the policy decided, a list of Action objects that are candidates
for execution, and a dictionary with customizable field (always including the "agent" key, with a ref
to the current agent) and returns the index of the selected action and an ActionRequest object with the
action-requester details (requester, arguments, time, and UUID), or -1 and None
if no action is selected.
By design, every agent comes with no filtering active.
|
required
|
public
|
bool
|
If True, the filter will be applied to the public HSM, otherwise to the private/world one.
|
False
|
Returns:
| Type |
Description |
bool
|
True if the filter was successfully set, False otherwise.
|
Source code in unaiverse/agent_basics.py
| def set_policy_filter(self,
filter_method_name_or_policy_fcn: str | Callable[
[int, Interaction | None, list[Action], dict], [int, Interaction | None]] | None,
public: bool = False) -> bool:
"""Sets the policy filter function, which overrides the action selected by the policy in the current state.
Args:
filter_method_name_or_policy_fcn: The name of a method of the Agent class or a function that implements a
policy filtering function, overriding what the policy decided.
It is a function that takes what the policy decided, a list of `Action` objects that are candidates
for execution, and a dictionary with customizable field (always including the "agent" key, with a ref
to the current agent) and returns the index of the selected action and an ActionRequest object with the
action-requester details (requester, arguments, time, and UUID), or -1 and None
if no action is selected.
By design, every agent comes with no filtering active.
public: If True, the filter will be applied to the public HSM, otherwise to the private/world one.
Returns:
True if the filter was successfully set, False otherwise.
"""
if isinstance(filter_method_name_or_policy_fcn, str):
filter_fcn: Callable | None = getattr(self, filter_method_name_or_policy_fcn, None)
if not callable(filter_fcn):
return False
if public:
self.policy_filter_lone_wolf = filter_fcn
assert isinstance(self.behav_lone_wolf, HybridStateMachine)
self.behav_lone_wolf.set_policy_filter(self.policy_filter_lone_wolf, self.policy_filter_lone_wolf_opts)
self.policy_filter_lone_wolf_opts['agent'] = self # Forced (do it *after* set_policy_filter)
self.policy_filter_lone_wolf_opts['public'] = True
else:
self.policy_filter = filter_fcn
assert isinstance(self.behav, HybridStateMachine)
self.behav.set_policy_filter(self.policy_filter, self.policy_filter_opts)
self.policy_filter_opts['agent'] = self # Forced (do it *after* set_policy_filter)
self.policy_filter_opts['public'] = False
return True
elif callable(filter_method_name_or_policy_fcn):
if public:
self.policy_filter_lone_wolf = filter_method_name_or_policy_fcn
assert isinstance(self.behav_lone_wolf, HybridStateMachine)
self.behav_lone_wolf.set_policy_filter(self.policy_filter_lone_wolf, self.policy_filter_lone_wolf_opts)
self.policy_filter_lone_wolf_opts['agent'] = self # Forced (do it *after* set_policy_filter)
self.policy_filter_lone_wolf_opts['public'] = True
else:
self.policy_filter = filter_method_name_or_policy_fcn
assert isinstance(self.behav, HybridStateMachine)
self.behav.set_policy_filter(self.policy_filter, self.policy_filter_opts)
self.policy_filter_opts['agent'] = self # Forced (do it *after* set_policy_filter)
self.policy_filter_opts['public'] = False
return True
return False
|
policy_default
This is the default policy for selecting which action to execute from a list of feasible actions.
It prioritizes actions that have been explicitly requested (i.e., have pending requests) on a first-come,
first-served basis. If no requested actions are found, it then selects the first action in the list that is
marked as ready.
Parameters:
| Name |
Type |
Description |
Default |
actions_list
|
list[Action]
|
A list of Action objects that are candidates for execution.
|
required
|
Returns:
| Type |
Description |
tuple[int, Interaction | None]
|
The index of the selected action and an ActionRequest object with the requester details (requester,
arguments, time, and UUID), or -1 and None if no action is selected.
|
Source code in unaiverse/agent_basics.py
| def policy_default(self, actions_list: list[Action]) -> tuple[int, Interaction | None]:
"""This is the default policy for selecting which action to execute from a list of feasible actions.
It prioritizes actions that have been explicitly requested (i.e., have pending requests) on a first-come,
first-served basis. If no requested actions are found, it then selects the first action in the list that is
marked as `ready`.
Args:
actions_list: A list of `Action` objects that are candidates for execution.
Returns:
The index of the selected action and an ActionRequest object with the requester details (requester,
arguments, time, and UUID), or -1 and None if no action is selected.
"""
for i, action in enumerate(actions_list):
if action.is_high_priority:
_list_of_requests = action.get_list_of_interactions()
_selected_action_idx = i
_selected_interaction = _list_of_requests.get_oldest_interaction() \
if len(_list_of_requests) > 0 else None
return _selected_action_idx, _selected_interaction
for i, action in enumerate(actions_list):
_list_of_requests = action.get_list_of_interactions()
if len(_list_of_requests) > 0: # The process action has more priority
_selected_action_idx = i
_selected_interaction = _list_of_requests.get_oldest_interaction()
return _selected_action_idx, _selected_interaction
for i, action in enumerate(actions_list):
if action.is_ready(consider_interactions=False):
_selected_action_idx = i
_selected_interaction = None
return _selected_action_idx, _selected_interaction
_selected_action_idx = -1
_selected_interaction = None
return _selected_action_idx, _selected_interaction
|
hook_proc_tweak_inputs(inputs)
A callback method that saves the inputs to the processor right before execution.
Parameters:
| Name |
Type |
Description |
Default |
inputs
|
|
The data inputs for the processor.
|
required
|
Returns:
| Type |
Description |
|
|
The same inputs passed to the function.
|
Source code in unaiverse/agent_basics.py
| def hook_proc_tweak_inputs(self, inputs):
"""A callback method that saves the inputs to the processor right before execution.
Args:
inputs: The data inputs for the processor.
Returns:
The same inputs passed to the function.
"""
self.proc_last_inputs = inputs
return inputs
|
hook_proc_tweak_outputs
hook_proc_tweak_outputs(outputs)
A callback method that saves the outputs from the processor right after execution.
Parameters:
| Name |
Type |
Description |
Default |
outputs
|
|
The data outputs from the processor.
|
required
|
Returns:
| Type |
Description |
|
|
The same outputs passed to the function.
|
Source code in unaiverse/agent_basics.py
| def hook_proc_tweak_outputs(self, outputs):
"""A callback method that saves the outputs from the processor right after execution.
Args:
outputs: The data outputs from the processor.
Returns:
The same outputs passed to the function.
"""
self.proc_last_outputs = outputs
return outputs
|
hook_before_sending_sample
hook_before_sending_sample(data, data_tag: int, net_hash: str, stream_name: str, recipient: str | None)
A callback method that handles the steam data right before sending it through the network.
Parameters:
| Name |
Type |
Description |
Default |
data
|
|
|
required
|
data_tag
|
int
|
|
required
|
stream_name
|
str
|
The name of the data stream.
|
required
|
net_hash
|
str
|
The net hash of the whole stream.
|
required
|
recipient
|
str | None
|
The (planned) recipient of this sample (or None in case of pubsub).
|
required
|
Returns:
| Type |
Description |
|
|
The same data passed to the function.
|
Source code in unaiverse/agent_basics.py
| def hook_before_sending_sample(self, data, data_tag: int,
net_hash: str, stream_name: str, recipient: str | None):
"""A callback method that handles the steam data right before sending it through the network.
Args:
data: The stream data sample.
data_tag: The tag of the sample.
stream_name: The name of the data stream.
net_hash: The net hash of the whole stream.
recipient: The (planned) recipient of this sample (or None in case of pubsub).
Returns:
The same data passed to the function.
"""
return data
|
remove_peer_from_agent_status_attrs
remove_peer_from_agent_status_attrs(peer_id: str)
Remove a peer ID from the status of the agent, assuming it to be the represented by attributes that start
with '_'.
Source code in unaiverse/agent_basics.py
| def remove_peer_from_agent_status_attrs(self, peer_id: str):
"""Remove a peer ID from the status of the agent, assuming it to be the represented by attributes that start
with '_'."""
for attr_name in dir(self):
if attr_name.startswith("_") and (not attr_name.startswith("__") and not attr_name.startswith("_Agent")
and not attr_name.startswith("_WAgent")):
try:
value = getattr(self, attr_name)
if isinstance(value, list):
setattr(self, attr_name, [v for v in value if v != peer_id])
elif isinstance(value, set):
value.discard(peer_id)
elif isinstance(value, dict):
if peer_id in value:
del value[peer_id]
except AttributeError:
continue # Skip read-only attributes
|
reset_agent_status_attrs
reset_agent_status_attrs()
Resets attributes that represent the status of the agent, assuming to be the ones that start with '_'.
Source code in unaiverse/agent_basics.py
| def reset_agent_status_attrs(self):
"""Resets attributes that represent the status of the agent, assuming to be the ones that start with '_'."""
for attr_name in dir(self):
if attr_name.startswith("_") and (not attr_name.startswith("__") and not attr_name.startswith("_Agent")
and not attr_name.startswith("_WAgent")):
try:
value = getattr(self, attr_name)
if isinstance(value, list):
setattr(self, attr_name, [])
elif isinstance(value, set):
setattr(self, attr_name, set())
elif isinstance(value, dict):
setattr(self, attr_name, {})
elif isinstance(value, int):
setattr(self, attr_name, 0)
elif isinstance(value, float):
setattr(self, attr_name, 0.)
elif isinstance(value, bool):
setattr(self, attr_name, False)
except AttributeError:
continue # Skip read-only attributes
|
streams_to_str
Return a formatted string listing all known streams, marking owned ones with an asterisk.
Returns:
| Type |
Description |
str
|
A multi-line string representation of the known streams.
|
Source code in unaiverse/agent_basics.py
| def streams_to_str(self) -> str:
"""Return a formatted string listing all known streams, marking owned ones with an asterisk.
Returns:
A multi-line string representation of the known streams.
"""
return "Streams:\n" + "\n".join([((" @" if user_hash in self.owned_streams_by_user_hash else " ") +
DataProps.peer_id_from_user_hash(user_hash) + " => " +
str(stream).replace("\n", "\n "))
for user_hash, stream in self.known_streams_by_user_hash.items()])
|
agent_state_dict
agent_state_dict() -> dict
Returns a dictionary containing an instance of the agent's state that can be saved.
Source code in unaiverse/agent_basics.py
| def agent_state_dict(self) -> dict:
"""Returns a dictionary containing an instance of the agent's state that can be saved."""
save_in_state = ['world_profile', ]
return {k: getattr(self, k) for k in save_in_state}
|
save
save(where: str = '') -> bool
Save the agent's state, including its processor and other attributes, to a specified location.
Parameters:
| Name |
Type |
Description |
Default |
where
|
str
|
The directory path where the agent's state should be saved. Defaults to "".
|
''
|
Returns:
| Type |
Description |
bool
|
True upon successful saving.
|
Raises:
| Type |
Description |
IOError
|
If there is an issue with file operations (e.g., directory creation, writing files).
|
(TypeError, ValueError, RuntimeError)
|
For other potential issues during serialization or saving.
|
Source code in unaiverse/agent_basics.py
| def save(self, where: str = "") -> bool:
"""Save the agent's state, including its processor and other attributes, to a specified location.
Args:
where: The directory path where the agent's state should be saved. Defaults to "".
Returns:
True upon successful saving.
Raises:
IOError: If there is an issue with file operations (e.g., directory creation, writing files).
TypeError, ValueError, RuntimeError: For other potential issues during serialization or saving.
"""
if where == '':
if self._node_identity_dir is None or len(self._node_identity_dir) == 0:
return False
where = os.path.join(self._node_identity_dir, "agent_state") # Default save path
os.makedirs(where, exist_ok=True)
# Saving the processor
if self.proc is not None and self.proc_updated_since_last_save:
pt_final = os.path.join(where, f"{self._node_name}.pt")
pt_tmp = pt_final + ".tmp"
try:
checkpoint = {
'model_state_dict': self.proc.state_dict(),
}
# If your agent has an optimizer, save its state too
if self.proc_opts.get('optimizer') is not None:
checkpoint['optimizer_state_dict'] = self.proc_opts['optimizer'].state_dict()
torch.save(checkpoint, pt_tmp)
os.replace(pt_tmp, pt_final) # Atomic move
self.proc_updated_since_last_save = False
except Exception as e:
if os.path.exists(pt_tmp):
os.remove(pt_tmp)
log.error(f"Error saving processor: {e}")
raise e
# Save Agent State
pkl_final = os.path.join(where, f"{self._node_name}.pkl")
pkl_tmp = pkl_final + ".tmp"
try:
state = self.agent_state_dict()
with open(pkl_tmp, "wb") as f:
pickle.dump(state, f)
os.replace(pkl_tmp, pkl_final)
except Exception as e:
log.error(f"Could not save " + ("agent" if not self.is_world else "world") + f": {e}")
if os.path.exists(pkl_tmp):
os.remove(pkl_tmp)
raise e
return True
|
load
load(where: str = '') -> bool
Load the agent's state from a specified location.
Parameters:
| Name |
Type |
Description |
Default |
where
|
str
|
The directory path from which the agent's state should be loaded. Defaults to "".
|
''
|
Returns:
| Type |
Description |
bool
|
True if loading succeeded.
|
Source code in unaiverse/agent_basics.py
| def load(self, where: str = "") -> bool:
"""Load the agent's state from a specified location.
Args:
where: The directory path from which the agent's state should be loaded. Defaults to "".
Returns:
True if loading succeeded.
"""
if where == '':
if self._node_identity_dir is None or len(self._node_identity_dir) == 0:
return False
where = os.path.join(self._node_identity_dir, "agent_state") # Default save path
# Check if directory exists
if not os.path.exists(where):
log.error("No state folder found for " + ("agent" if not self.is_world else "world") +
f" {self._node_name}.")
return False
# Check if the specific pickle file exists
pkl_path = os.path.join(where, f"{self._node_name}.pkl")
if not os.path.exists(pkl_path):
log.error("No saved state found for " + ("agent" if not self.is_world else "world") +
f" {self._node_name}.")
return False
# Loading the agent state dictionary
try:
with open(pkl_path, "rb") as f:
agent_state_dict = pickle.load(f)
except Exception as e:
raise Exception(f"Error loading pickle file at {pkl_path}: {e}")
# Update self's attributes with the loaded object's attributes
self.__dict__.update(agent_state_dict)
# Check if we also need to load the processor state
pt_path = os.path.join(where, f"{self._node_name}.pt")
load_proc = self.proc is not None and os.path.exists(pt_path)
if load_proc:
try:
checkpoint = torch.load(pt_path)
if isinstance(checkpoint, dict) and 'model_state_dict' in checkpoint:
self.proc.load_state_dict(checkpoint['model_state_dict'])
# Restore Optimizer to proc_opts
if 'optimizer_state_dict' in checkpoint and self.proc_opts.get('optimizer') is not None:
self.proc_opts['optimizer'].load_state_dict(checkpoint['optimizer_state_dict'])
else:
self.proc.load_state_dict(checkpoint)
except Exception as e:
raise Exception(f"Error loading processor state: {e}")
return True
|