unaiverse.agent
What this module does 🔴
Defines the high-level Agent class that orchestrates peer engagement, data streaming, learning, evaluation, and connection management on top of AgentBasics within the decentralized network.
agent
¶
█████ █████ ██████ █████ █████ █████ █████ ██████████ ███████████ █████████ ██████████ ░░███ ░░███ ░░██████ ░░███ ░░███ ░░███ ░░███ ░░███░░░░░█░░███░░░░░███ ███░░░░░███░░███░░░░░█ ░███ ░███ ░███░███ ░███ ██████ ░███ ░███ ░███ ░███ █ ░ ░███ ░███ ░███ ░░░ ░███ █ ░ ░███ ░███ ░███░░███░███ ░░░░░███ ░███ ░███ ░███ ░██████ ░██████████ ░░█████████ ░██████ ░███ ░███ ░███ ░░██████ ███████ ░███ ░░███ ███ ░███░░█ ░███░░░░░███ ░░░░░░░░███ ░███░░█ ░███ ░███ ░███ ░░█████ ███░░███ ░███ ░░░█████░ ░███ ░ █ ░███ ░███ ███ ░███ ░███ ░ █ ░░████████ █████ ░░█████░░████████ █████ ░░███ ██████████ █████ █████░░█████████ ██████████ ░░░░░░░░ ░░░░░ ░░░░░ ░░░░░░░░ ░░░░░ ░░░ ░░░░░░░░░░ ░░░░░ ░░░░░ ░░░░░░░░░ ░░░░░░░░░░ A Collectionless AI Project (https://collectionless.ai) Registration/Login: https://unaiverse.io Code Repositories: https://github.com/collectionlessai/ Main Developers: Stefano Melacci (Project Leader), Christian Di Maio, Tommaso Guidi
Agent
¶
Bases: AgentBasics
An agent node: the primary participant in a UNaIVERSE world.
Agent extends AgentBasics with a rich set of pre-built action methods
that cover the full agent lifecycle - from joining a world and exchanging data
streams to engaging and disengaging with peers, running inference, learning,
and reporting statistics.
Custom agents are built by subclassing Agent and overriding hooks such as
hook_proc_tweak_inputs and hook_proc_tweak_outputs, or by adding new
@action-decorated coroutines that can be wired into a
HybridStateMachine behaviour JSON.
Because Agent is also an AgentBasics instance, it inherits all
membership bookkeeping (all_agents, world_agents, world_masters,
human_agents, artificial_agents) and the role constants
(ROLE_WORLD_MASTER, ROLE_WORLD_AGENT, ROLE_BITS_TO_STR) used
throughout the framework.
Attributes:
| Name | Type | Description |
|---|---|---|
stats |
The |
|
overwrite_stats |
When True, the next |
Examples:
A minimal custom agent that processes incoming data and reports to the world:
>>> from unaiverse.agent import Agent, action
>>> class Recogniser(Agent):
... @action
... async def on_result(self, interaction=None) -> bool:
... print("Interaction completed:", interaction)
... return True
>>>
>>> agent = Recogniser(proc=my_proc, proc_inputs=inputs, proc_outputs=outputs,
... proc_opts=opts, behav=my_behav, world_folder=None)
Initialize the agent, setting up status tracking and the stats recorder.
All positional and keyword arguments are forwarded unchanged to the parent
AgentBasics.__init__. After the parent is initialised, agent-level
status tracking sets (engaged agents, found agents, evaluation results,
playlist state) and a fresh Stats recorder are set up.
The Stats instance created here is agent-scoped (is_world=False)
and starts empty. It will be populated as the agent runs actions and
receives statistics responses from the world.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*args
|
Positional arguments forwarded to |
()
|
|
**kwargs
|
Keyword arguments forwarded to |
{}
|
Examples:
>>> from unaiverse.agent import Agent
>>> agent = Agent(proc=my_proc, proc_inputs=inputs, proc_outputs=outputs,
... proc_opts=opts, behav=my_behav, world_folder="./my_world")
Source code in unaiverse/agent.py
collect_and_store_own_stats
¶
Collect this agent's own runtime metrics and push them to the stats recorder.
Three categories of metrics are sampled at the current wall-clock time and stored under the agent's own private peer ID:
- Network connectivity: the list of peer IDs currently connected through
the
p2p_worldconnection pool is stored under the key"connected_peers". If the connection pool raises an exception (for example while the node is still bootstrapping), an empty list is stored instead and the error is logged. - HSM state and action: the current state name, the currently executing
action name, and the last completed action name are read from
self.behavand stored under"state","action", and"last_action"respectively.
If self.stats is None, the method returns immediately without
performing any work. Errors from the stats recorder are caught and logged
rather than propagated, so a stats failure never interrupts the agent.
Note
This method is called automatically by send_stats_to_world before
assembling the payload. It can also be called manually to force a
stats snapshot at any time.
Source code in unaiverse/agent.py
send_stats_to_world
async
¶
Collect and transmit the agent's buffered statistics to the world node (async).
The method first verifies that the agent is currently connected to a world.
If not, it returns early. Otherwise it calls collect_and_store_own_stats
to snapshot the latest runtime metrics, then assembles the payload via
stats.get_payload_for_world() and sends it to the world peer using the
Msg.STATS_UPDATE message type.
If the payload is empty (no stats have accumulated since the last send) the transmission is skipped. If the network send fails, an error is logged but no exception is raised.
Note
The stats payload is consumed (cleared) by get_payload_for_world
after being assembled, so repeated calls will not re-send the same data.
Source code in unaiverse/agent.py
update_stats_view
¶
Merge a stats view received from the world into the agent's local stats.
The received_view dictionary is passed directly to
stats.update_view. Existing entries in the local stats are preserved
unless overwrite is True, in which case any key present in the incoming
view replaces the locally held value.
This method is typically invoked by the networking layer when the world
replies to a STATS_REQUEST message with a STATS_RESPONSE.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
received_view
|
dict
|
The stats view dictionary received from the world. Its
structure mirrors the internal |
required |
overwrite
|
bool
|
When True, existing local entries are replaced by the incoming values. When False (default), local entries are kept and only absent keys are added. Defaults to False. |
False
|
Source code in unaiverse/agent.py
suggest_role_to_world
async
¶
Suggest a role change for one or more agents to the world master (async).
The method resolves agent to the set of involved peer IDs (see
__involved_agents), converts role to its integer bitmask, and
filters out any peer whose current role already matches the requested role.
For the remaining peers a ROLE_SUGGESTION message is sent to the world
master carrying a list of {"peer_id": ..., "role": ...} entries.
If all involved agents already have the requested role, no message is sent
and True is returned immediately.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
agent
|
str | None
|
The peer ID of the target agent, a wildcard string (such as
|
required |
role
|
str
|
The desired role as a human-readable string, matching a key in
|
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if the suggestion message was sent successfully (or if no message |
bool
|
was needed), False if the network send failed. |
Source code in unaiverse/agent.py
suggest_badges_to_world
async
¶
suggest_badges_to_world(agent: str | None = None, score: float = -1.0, badge_type: str = 'completed', badge_description: str | None = None) -> bool
Suggest one or more performance badges to the world master (async).
This action is typically called by an evaluator agent to reward other agents
after a competition round. For each involved peer it constructs a badge
dictionary containing the peer ID, score, badge type, optional description,
and the agent's last known token retrieved via the connection pool, then
sends all dictionaries in a single Msg.BADGE_SUGGESTIONS message to
the world master.
The score must be non-negative and the badge type must be one of
Agent.BADGE_TYPES; both are validated before any message is sent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
agent
|
str | None
|
The peer ID of the target agent, a wildcard (such as
|
None
|
score
|
float
|
A non-negative float representing the agent's performance score.
Defaults to |
-1.0
|
badge_type
|
str
|
The category of the badge. Must be one of
|
'completed'
|
badge_description
|
str | None
|
An optional free-text description for the badge. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if the badge suggestion message was sent successfully, |
bool
|
False if the score is invalid, the badge type is unknown, or the |
bool
|
network send failed. |
Source code in unaiverse/agent.py
remove_peer_from_agent_status_attrs
¶
Remove a peer from all agent-level status tracking sets.
Delegates to the parent AgentBasics.remove_peer_from_agent_status_attrs
first, which removes peer_id from _found_agents,
_valid_cmp_agents, _engaged_agents, and related bookkeeping
structures. Afterwards, _available is recomputed: the agent is
considered available again only when _engaged_agents is empty.
This override ensures that disconnecting a peer automatically restores availability if the peer was the last engagement partner.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
The private peer ID to remove from all status tracking structures. |
required |
Source code in unaiverse/agent.py
reset_agent_status_attrs
¶
Reset all agent-level status attributes to their initial values.
Delegates to AgentBasics.reset_agent_status_attrs first, which zeros
or clears generic status variables (lists, dicts, counters, and boolean
flags). The following Agent-specific attributes are then explicitly
restored to their construction-time defaults:
_availableis set to True (the agent starts as available)._repeatis set to 1 (single-pass playlist)._last_recorded_stream_numis set to 1._last_recorded_countis set to 0.
This method is called by the framework when the agent re-enters a clean state, for example after a world disconnection.
Source code in unaiverse/agent.py
send
async
¶
send(action_name: str | None = None, target: str | list[str] | None = None, action_kwargs: dict | None = None, streams: list[tuple[str, int]] | list[str] | None = None, data_samples: list[str | Image | Tensor] | dict[str, str | Image | Tensor] | None = None, num_steps: int = -1, timeout: float = -1.0, from_state: str | None = None, to_state: str | None = None, callback: str | None = None, forced_uuid: str | None = 'do_not_force', id: str | None = 'random', copy_sys: bool = False, wait_completion: bool = False, volatile: bool = False, interaction: Interaction | None = None) -> bool
Send an interaction request to one or more target agents (async).
Accepts either a pre-built Interaction object supplied via
interaction or raw keyword arguments from which one will be created
internally. When interaction carries a non-system interaction, the call
is rejected immediately.
On the first invocation (interaction.get_mark() is None) the target
agents are resolved, a new interaction is dispatched via _send, and
(if wait_completion=True) the interaction is stored as a mark on the
system interaction so the HSM can re-enter this action on subsequent ticks
until completion.
On re-entry (interaction.get_mark() is not None), the stored mark is
retrieved and True is returned only when the tracked interaction has
been completed by the interaction manager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
action_name
|
str | None
|
The action name to invoke on the target agent(s). Ignored
when a pre-built |
None
|
target
|
str | list[str] | None
|
Peer ID, wildcard (such as |
None
|
action_kwargs
|
dict | None
|
Keyword arguments to pass to the target action. Ignored
when a pre-built |
None
|
streams
|
list[tuple[str, int]] | list[str] | None
|
List of |
None
|
data_samples
|
list[str | Image | Tensor] | dict[str, str | Image | Tensor] | None
|
Actual data samples to attach to the interaction. May be
a list of raw values or a dict mapping stream user hashes to
samples. Ignored when a pre-built |
None
|
num_steps
|
int
|
Number of time steps the interaction spans. Ignored when a
pre-built |
-1
|
timeout
|
float
|
Maximum wall-clock seconds allowed for the interaction to
complete. A value of |
-1.0
|
from_state
|
str | None
|
The HSM state from which the interaction originates.
Ignored when a pre-built |
None
|
to_state
|
str | None
|
The HSM state the sender will transition to after the
interaction completes. Ignored when a pre-built |
None
|
callback
|
str | None
|
Name of an action method to call on this agent once the interaction is completed by the recipient. |
None
|
forced_uuid
|
str | None
|
An explicit UUID to assign to the new interaction. Use
with care to avoid UUID collisions. Defaults to
|
'do_not_force'
|
id
|
str | None
|
A human-readable identifier attached to the interaction for
logging. Ignored when a pre-built |
'random'
|
copy_sys
|
bool
|
When True, the last stream data published under the system UUID is copied into the new interaction to seed its initial data. |
False
|
wait_completion
|
bool
|
When True, the action keeps the HSM in the current state until all involved agents confirm completion (pedantic mode). Defaults to False. |
False
|
volatile
|
bool
|
When True, the recipient is instructed not to send back any
completion status for this interaction. Incompatible with
|
False
|
interaction
|
Interaction | None
|
The system |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if at least one target agent was reached (or if completion was |
bool
|
confirmed when |
Raises:
| Type | Description |
|---|---|
GenException
|
If both |
Examples:
Trigger process on an engaged agent and wait for it to finish:
Source code in unaiverse/agent.py
478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 | |
all_sent_completed
async
¶
Return True if every sent interaction with the given action name has been completed (async).
Iterates over self.im.sent and checks whether each interaction that
matches action_name (or all interactions when action_name is
None) has been marked as completed by the interaction manager.
This is the negation of "are there still-pending sends outstanding?" and
is useful as an HSM exit predicate after a send call that did not use
wait_completion=True.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
action_name
|
str | None
|
The action name to filter by, or |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if every matching sent interaction is completed, False if at |
bool
|
least one is still pending. |
Source code in unaiverse/agent.py
process
async
¶
process(interaction: Interaction | None = None) -> bool
Run one inference step: read from stdin, call the processor, and write to stdout (async).
The method follows these steps in order:
- For human processors,
__rebind_stdin_if_humanis called to redirect stdin to the appropriate interaction-specific binding; if it returns False the action aborts early. - Input data is read from
self.stdinunder the interaction's UUID. If no data is available,Falseis returned. - The data tag is read from
self.stdin. If it is missing,Falseis returned. hook_proc_tweak_inputsis invoked so subclasses can pre-process inputs before forwarding them to the processor.self.procis called with the input data, passingfirst=Trueon step 0 andlast=Trueon the final step.hook_proc_tweak_outputsis invoked so subclasses can post-process the outputs.- The output data is written to
self.stdoutunder the same UUID and data tag.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
interaction
|
Interaction | None
|
The driving interaction injected automatically by the
dispatch layer. When |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if the full inference pipeline completed successfully, |
bool
|
False if any step failed (missing data, processor error, hook error). |
Source code in unaiverse/agent.py
633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 | |
learn
async
¶
learn(interaction: Interaction | None = None) -> bool
Run one supervised learning step: inference followed by a backward pass (async).
First verifies that the processor has learning capabilities (a configured
optimiser and at least one loss function). If not, the action returns
False immediately.
Then process(interaction) is called to perform the forward pass.
If it succeeds, target data is read from self.stdtar under the same
UUID, and proc.learn_backward is invoked with those targets. Loss
values are printed to the user log.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
interaction
|
Interaction | None
|
The driving interaction injected automatically by the
dispatch layer. Must not be |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if both the inference step and the backward pass completed |
bool
|
without errors, False otherwise. |
Raises:
| Type | Description |
|---|---|
AssertionError
|
If |
AssertionError
|
If |
Source code in unaiverse/agent.py
show
async
¶
show(interaction: Interaction | None = None)
Log the completed interaction for inspection purposes (async).
A generic action intended to be used as a completion callback in HSM behaviours. It logs the interaction as a code string at the debug level. Subclasses frequently override this method to implement custom post-completion logic such as storing results or triggering follow-up actions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
interaction
|
Interaction | None
|
The interaction object that triggered this action. Injected automatically by the dispatch layer. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
|
Always True. |
Source code in unaiverse/agent.py
set_engaged_partner
async
¶
Force the engagement set to a specific agent or group of agents (async).
All existing engagement partners are cleared first. If agent is not
None, each element in the provided string, list, or set is added to
_engaged_agents. The availability flag _available is set to
True when the resulting engagement set is empty, and False
otherwise.
This is a programmatic shortcut to bypass the normal
send_engage / engage handshake when the pairing is already known
at design time.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
agent
|
str | list[str] | set[str] | None
|
The peer ID, list of peer IDs, or set of peer IDs to force as
the active engagement partners. Pass |
required |
clear_found
|
bool
|
When True (default), |
True
|
Returns:
| Type | Description |
|---|---|
bool
|
Always True. |
Source code in unaiverse/agent.py
send_engage
async
¶
Offer engagement to all agents currently in _found_agents (async).
Sends an "engage" interaction to every peer ID in _found_agents,
passing the local agent's current role string as the sender_role
argument. The send is performed with wait_completion=True, so on the
first call the action returns False and re-registers itself; on the second
call (after all targets replied) it checks which agents actually confirmed.
For each agent that confirmed engagement:
- its peer ID is added to _engaged_agents.
- _available is set to False.
- its peer ID is removed from _found_agents.
If no confirmed agents are found in last_sent_interaction.target,
or if the underlying send call fails, the method returns False.
Returns:
| Type | Description |
|---|---|
bool
|
True if at least one agent successfully confirmed engagement, |
bool
|
False otherwise. |
Source code in unaiverse/agent.py
engage
async
¶
engage(acceptable_role: str | None = None, sender_role: str | None = None, interaction: Interaction | None = None) -> bool
Accept or reject an incoming engagement request from another agent (async).
Called on the recipient side of a send_engage / engage handshake.
The method validates the requester in four steps:
- The requester is looked up in
world_agentsandworld_masters; an unknown requester causes an immediate False return. sender_rolemust not be None.acceptable_rolemust not be None.- If the agent is currently available (
_available is True), the sender's role bitmask is compared againstacceptable_role. On a match the requester is added to_engaged_agents,_availableis set to False, and True is returned.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
acceptable_role
|
str | None
|
The role the sender must have for the engagement to be
accepted, as a string from |
None
|
sender_role
|
str | None
|
The role string advertised by the sender in the engagement request. Defaults to None. |
None
|
interaction
|
Interaction | None
|
The interaction injected automatically by the dispatch layer; used to retrieve the requester's peer ID. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if the engagement was accepted and confirmed, False if the agent |
bool
|
is unavailable, the role does not match, or any validation check fails. |
Source code in unaiverse/agent.py
send_disengage
async
¶
Send a disengagement request to all currently engaged agents (async).
Dispatches a "disengage" interaction to every peer ID in
_engaged_agents, optionally requesting that each recipient also
disconnects. On success, _engaged_agents is cleared immediately (the
local agent does not wait for a disengage callback to confirm).
If the underlying send fails, an error is logged and False is returned
without clearing the engagement set.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
send_disconnection_too
|
bool
|
When True, the |
False
|
Returns:
| Type | Description |
|---|---|
bool
|
True if the disengagement message was delivered to at least one |
bool
|
engaged agent and |
bool
|
failed. |
Source code in unaiverse/agent.py
disengage
async
¶
disengage(disconnect_too: bool = False, interaction: Interaction | None = None) -> bool
Process an incoming disengagement request from another agent (async).
Called on the recipient side of a send_disengage / disengage
handshake. The requester is validated against world_agents and
world_masters; an unknown or not-previously-engaged requester causes
an early False return.
When validated, the requester is removed from _engaged_agents. If
disconnect_too is True, the status confirmation for this interaction
is suppressed (interaction.send_status = False) and
_node_purge_fcn is awaited to disconnect the requester from the
network. Finally, _available is recomputed as True if no other
agents remain engaged.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
disconnect_too
|
bool
|
When True, the agent that sent the disengagement is
also disconnected from the network via |
False
|
interaction
|
Interaction | None
|
The interaction injected automatically by the dispatch layer; used to retrieve the requester's peer ID. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if the disengagement was processed successfully, False if the |
bool
|
requester is unknown or was not previously engaged. |
Source code in unaiverse/agent.py
disengage_all
async
¶
Clear the local engagement set without notifying the remote peers (async).
Unlike send_disengage, this action does not send any message to the engaged agents. It
simply empties _engaged_agents in place and restores _available to True. Use this
when the remote side has already been notified (or when the communication channel is gone)
and only the local bookkeeping needs to be reset.
Returns:
| Type | Description |
|---|---|
bool
|
Always True. |
Source code in unaiverse/agent.py
disconnect
async
¶
Disconnect a single peer from the network and clean up its local state (async).
Invokes _node_purge_fcn on the given peer ID. The purge function is responsible
for removing the peer from the node's connection pool, which in turn calls
remove_agent and remove_streams to clean up all bookkeeping structures
associated with that agent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
agent
|
str
|
The peer ID of the agent to disconnect. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
Always True. |
Source code in unaiverse/agent.py
disconnect_by_role
async
¶
Disconnect every connected agent whose role matches one of the specified roles (async).
The method first optionally sends a disengagement message (with disconnect_too=True) to
all currently engaged agents. It then calls find_agents to populate _found_agents
with peer IDs whose role matches role. For each found agent it calls
_node_purge_fcn, which removes the peer from the connection pool and triggers
remove_agent and remove_streams on the local state.
A deep copy of _found_agents is taken before iterating so that the purge callbacks
that modify _found_agents do not interfere with the loop.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
role
|
str | list[str]
|
A role string (e.g. |
required |
disengage_too
|
bool
|
When True, |
False
|
Returns:
| Type | Description |
|---|---|
bool
|
Always True. |
Source code in unaiverse/agent.py
disconnected
async
¶
Return True if all specified agents are no longer connected (async).
The set of agents to check is resolved via __involved_agents: a concrete peer ID
produces a single-element list, a known wildcard such as "<valid_cmp>" expands to
_valid_cmp_agents, and None defaults to _engaged_agents (falling back to
_found_agents when the engagement set is empty). If the resolved set is empty,
False is returned immediately.
When handshake_completed is True the check uses all_agents (agents that
completed the handshake). When False, both all_agents and the raw connection pool
(_node_conn.is_connected) are consulted so that peers in mid-handshake are also
considered connected.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
agent
|
str | None
|
The peer ID, wildcard string, or |
None
|
handshake_completed
|
bool
|
When True, only agents that completed the handshake are considered still connected. Defaults to False. |
False
|
Returns:
| Type | Description |
|---|---|
bool
|
True if all involved agents are disconnected, False if at least one is still |
bool
|
connected or if the resolved agent set is empty. |
Source code in unaiverse/agent.py
received_some_asked_data
async
¶
Return True if any previously asked agent has published a stream sample (async).
Iterates over all interactions that were sent (both currently active and recently
completed) and, for each target agent, scans the agent's processor streams. A
non-public stream that contains data under the interaction's UUID is considered a
match. If processing_fcn is provided and callable on self, it is called with
(agent, stream_props, data, data_tag) for every matching sample; in that case the
method returns True only after collecting all hits. Without processing_fcn it
returns True on the very first hit found.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
processing_fcn
|
str | None
|
The name of an instance method on this agent that will be called
with |
None
|
data_type
|
str | None
|
When provided, only streams whose |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if at least one stream sample was received from an asked agent, |
bool
|
False otherwise. |
Source code in unaiverse/agent.py
nop
async
¶
Perform no operation and optionally log a message (async).
A convenience placeholder action for HSM transitions that require an action slot but
have no meaningful work to perform. If message is provided it is emitted at the
misc log level.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message
|
str | None
|
An optional message to log. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
Always True. |
Source code in unaiverse/agent.py
connected
async
¶
Return True if all specified agents are currently connected (async).
The set of agents to check is resolved via __involved_agents: a concrete peer ID or
list of peer IDs, a wildcard such as "<valid_cmp>", or None to default to
_engaged_agents (falling back to _found_agents). If the resolved set is empty,
False is returned immediately.
When handshake_completed is True, a peer is considered connected only if it appears
in all_agents (i.e., the handshake finished). When False, the lower-level
_node_conn.is_connected is used, which also returns True for peers in mid-handshake.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
agent
|
str | list[str] | None
|
The peer ID, list of peer IDs, wildcard string, or |
None
|
handshake_completed
|
bool
|
When True, only agents that completed the handshake are considered connected. Defaults to False. |
False
|
Returns:
| Type | Description |
|---|---|
bool
|
True if all involved agents are connected, False if at least one is missing or if |
bool
|
the resolved agent set is empty. |
Source code in unaiverse/agent.py
all_asked_finished
async
¶
Return True if every asked agent has confirmed task completion (async).
Compares _agents_who_were_asked with
_agents_who_completed_what_they_were_asked. These sets are populated and cleared by
actions such as send_subscribe (and related ask-style actions) as agents are
contacted and as they report back. The check is a simple set equality test, so it
returns True when both sets are empty as well (no agents were asked).
Returns:
| Type | Description |
|---|---|
bool
|
True if all previously asked agents have sent a completion confirmation, |
bool
|
False if at least one is still pending. |
Source code in unaiverse/agent.py
all_engagements_completed
async
¶
Return True if no agents remain in _found_agents (async).
After send_engage is called, each agent that confirmed engagement is moved from
_found_agents to _engaged_agents. This predicate becomes True when
_found_agents is empty, meaning every discovered agent has either confirmed
engagement or has been discarded.
Returns:
| Type | Description |
|---|---|
bool
|
True if |
bool
|
not yet been engaged. |
Source code in unaiverse/agent.py
agents_are_waiting
async
¶
Return True if any found agent is still pending full registration (async).
Peers that connect to the node are held in _node_agents_waiting until the
handshake and profile exchange complete and they are moved into all_agents. This
action checks whether any peer ID in _found_agents is also present in
_node_agents_waiting, which would mean the agent connected but has not yet been
fully registered. The current waiting set is also logged at the misc level.
Returns:
| Type | Description |
|---|---|
bool
|
True if at least one found agent is still waiting to be fully registered, |
bool
|
False if all found agents have completed registration or if |
bool
|
empty. |
Source code in unaiverse/agent.py
send_subscribe
async
¶
send_subscribe(agent: str | None = None, stream_hashes: list[str] | None = None, unsubscribe: bool = False) -> bool
Ask one or more remote agents to subscribe or unsubscribe from PubSub streams (async).
The involved agents are resolved via __involved_agents from agent. The
stream_hashes list is normalised to full network hashes with
__normalize_user_hash, the DataProps for each hash are read from
known_streams, and a "subscribe" action request is sent to every involved
agent carrying the stream owners and JSON-serialised stream properties. Both
_agents_who_were_asked and _agents_who_completed_what_they_were_asked are
reset before dispatching.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
agent
|
str | None
|
The peer ID, wildcard string, or |
None
|
stream_hashes
|
list[str] | None
|
A list of stream hashes (net hashes, user hashes, or
|
None
|
unsubscribe
|
bool
|
When True, asks the remote agents to unsubscribe rather than subscribe. Defaults to False. |
False
|
Returns:
| Type | Description |
|---|---|
bool
|
True if at least one agent was successfully contacted, False if no agents were |
bool
|
resolved or all sends failed. |
Source code in unaiverse/agent.py
1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 | |
subscribe
async
¶
subscribe(stream_owners: list[str] | None = None, stream_props: list[str] | None = None, unsubscribe: bool = False, interaction: Interaction | None = None) -> bool
Process a subscription or unsubscription request received from a remote agent (async).
Called on the recipient side of a send_subscribe / subscribe handshake. The
requester is validated against world_agents and world_masters (when behaving
in a world) or public_agents (when operating outside a world). If validation
passes, each entry in stream_props is deserialized from JSON and verified to be a
PubSub stream; non-PubSub streams cause an immediate False return.
For each valid (stream_owner, stream_props) pair:
- When subscribing,
add_compatible_streamsis called to register the stream and set up the underlying PubSub topic subscription. - When unsubscribing,
remove_streamsis called to deregister the stream.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
stream_owners
|
list[str] | None
|
Parallel list of peer IDs that own the streams. Must have the same
length as |
None
|
stream_props
|
list[str] | None
|
Parallel list of JSON-serialized |
None
|
unsubscribe
|
bool
|
When True, the streams are removed rather than added. Defaults to False. |
False
|
interaction
|
Interaction | None
|
The interaction injected automatically by the dispatch layer; used to retrieve the requester's peer ID. Defaults to None. |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if all streams were processed successfully, False if the requester is |
bool
|
unknown, any stream is not a PubSub stream, or any other validation check fails. |
Source code in unaiverse/agent.py
1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 | |
record
async
¶
record(interaction: Interaction | None = None, record_uuid: str | None = 'see_interaction_uuid') -> bool
Records interaction.num_steps samples from the interaction's stdin stream group into a
new owned BufferedDataStream group and exposes it via pubsub. Multistep by virtue of the
interaction's num_steps; the framework's readiness gate already pauses execution on
no-fresh-data cycles.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
interaction
|
Interaction | None
|
The driving interaction (auto-injected by the dispatch layer). |
None
|
record_uuid
|
str | None
|
Which |
'see_interaction_uuid'
|
Triggered from HSM transits via:
behav.add_transit("init", "next_state", action="record",
args={"streams": {"stdin": ["<world>:group"]},
"num_steps": "<wildcard_or_int>",
"record_uuid": None})
Source code in unaiverse/agent.py
1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 | |
connect_to
async
¶
Source code in unaiverse/agent.py
connect_by_role
async
¶
Finds and attempts to connect with agents whose profiles match a specific role. It can be optionally filtered by a custom function. It returns True if at least one valid agent is found (async).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
role
|
str | list[str]
|
The role or list of roles to search for. |
required |
filter_fcn
|
str | None
|
The name of an optional filter function. |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if at least one agent is found and a NEW connection request is made, False otherwise. |
Source code in unaiverse/agent.py
find_agents
async
¶
Locally searches through the agent's known peers (world and public agents) to find agents with a specific
role. It populates the _found_agents set with the peer IDs of matching agents (async).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
role
|
str | list[str]
|
The role or list of roles to search for. |
required |
handshake_completed
|
bool
|
If True, only consider agents that have completed the handshake. |
False
|
engage
|
bool
|
If you want to force the found agents to be the ones that you are engaged with. |
False
|
Returns:
| Type | Description |
|---|---|
bool
|
True if at least one agent is found, False otherwise. |
Source code in unaiverse/agent.py
next_pref_stream
async
¶
Moves the internal pointer to the next stream in the list of preferred streams, which is often used for playlist-like operations. It wraps around to the beginning if it reaches the end (async).
Returns:
| Type | Description |
|---|---|
bool
|
True if the move is successful, False if the list is empty. |
Source code in unaiverse/agent.py
first_pref_stream
async
¶
Resets the internal pointer to the first stream in the list of preferred streams. This is useful for restarting a playback or processing loop (async).
Returns:
| Type | Description |
|---|---|
bool
|
True if the move is successful, False if the list is empty. |
Source code in unaiverse/agent.py
check_pref_stream
async
¶
Checks the position of the current preferred stream within the list. It can check if it's the first, last, or if it has completed a full round, among other checks (async).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
what
|
str
|
A string specifying the type of check to perform (e.g., 'first', 'last', 'last_round'). |
'last'
|
Returns:
| Type | Description |
|---|---|
bool
|
True if the condition is met, False otherwise. |
Source code in unaiverse/agent.py
set_pref_streams
async
¶
Fills the agent's list of preferred streams (a playlist). It can repeat the playlist a specified number of times and resolves user-provided stream hashes to their full network hashes (async).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
net_hashes
|
list[str]
|
A list of stream hashes to add to the playlist. |
required |
repeat
|
int
|
The number of times to repeat the playlist. |
1
|
Returns:
| Type | Description |
|---|---|
bool
|
Always True. |
Source code in unaiverse/agent.py
evaluate
async
¶
evaluate(stream_hash: str, how: str, steps: int = 100, re_offset: bool = False, agents_to_evaluate: str | list[str] | None = None) -> bool
Evaluates the performance of agents that have completed a generation task. It compares the generated data from each agent with a local stream (which can be a ground truth or reference stream) using a specified comparison method (async).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
stream_hash
|
str
|
The hash of the local stream to use for comparison. |
required |
how
|
str
|
The name of the comparison method to use. |
required |
steps
|
int
|
The number of steps to perform the evaluation. |
100
|
re_offset
|
bool
|
A boolean to indicate whether to re-offset the streams. |
False
|
agents_to_evaluate
|
str | list[str] | None
|
peer ID or list of peer IDs of agents that returned streams that were buffered, and that we are going to evaluate (default is None, meaning the agents in self._agents_who_completed_what_they_were_asked are considered). |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if the evaluation is successful, False otherwise. |
Source code in unaiverse/agent.py
compare_eval
async
¶
Compares the results of a previous evaluation to a given threshold or finds the best result among all agents. It can check for minimum, maximum, or simple threshold-based comparisons, and it populates a list of 'valid' agents that passed the comparison (async).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
cmp
|
str
|
The comparison operator (e.g., '<', '>', 'min'). |
required |
thres
|
float
|
The threshold value for comparison. |
required |
good_if_true
|
bool
|
A boolean to invert the pass/fail logic. |
True
|
Returns:
| Type | Description |
|---|---|
bool
|
True if at least one agent passed the comparison, False otherwise. |
Source code in unaiverse/agent.py
1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 | |
resolve_agent_ref
¶
Resolve an agent name or peer_id to a peer_id.
If ref is already a known peer_id, it is returned as-is.
Otherwise, the method searches all known agents by node_name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ref
|
str
|
A peer_id or agent name. |
required |
Returns:
| Type | Description |
|---|---|
str | None
|
The peer_id if found, None otherwise. |
Source code in unaiverse/agent.py
resolve_stream_ref
¶
Resolve a stream name to a stream user or net hash (heuristic, gives priority to owned streams).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ref
|
str
|
A stream name, without peer IDs, just the name. |
required |
public
|
bool
|
A boolean to inform the resolve procedure whether it must look for streams among public agents (True) or world-related agents (False). |
required |
Returns:
| Type | Description |
|---|---|
str | None
|
The stream user hash or net hash if found, None otherwise. |
Source code in unaiverse/agent.py
1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 | |
action
¶
Decorator that marks and wraps a coroutine as a UNaIVERSE action method.
Applying @action to an async method on an Agent subclass does three
things automatically at call time:
- Agent reference resolution - any keyword argument whose name appears in
Custom.AGENT_ARG_NAMESis resolved from a human-readable name or"email@node_name"notation to the underlying private peer ID viaresolve_agent_ref. Lists and tuples of references are resolved element by element. - Stream reference resolution - any keyword argument whose name appears in
Custom.STREAM_ARG_NAMESis resolved from a user-friendly hash or stream name to the canonical net hash viaresolve_stream_ref. Public vs. world streams are selected automatically based on whether the agent is currently behaving in a world. - Exception isolation - if the wrapped coroutine raises any exception, the
error is logged and
Falseis returned. The HSM will see a failed action without crashing the agent.
Methods listed in Custom.SPECIAL_ACTION_NAMES (such as send) bypass
steps 1 and 2 because they perform their own argument resolution internally.
After the decorator is applied, the resulting callable has
_is_action = True set on it, which allows the framework to discover all
action methods through introspection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
func
|
Callable
|
The async action method to wrap. It must be a coroutine function
defined on an |
required |
Returns:
| Type | Description |
|---|---|
Callable
|
An async wrapper with the same signature and |
Callable
|
enriched with the resolution and isolation logic described above. |
Examples:
>>> from unaiverse.agent import Agent, action
>>> class MyAgent(Agent):
... @action
... async def greet(self, agent: str, message: str) -> bool:
... # "agent" is resolved automatically from name to peer ID
... print(f"Hello {agent}: {message}")
... return True
Source code in unaiverse/agent.py
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 | |