unaiverse.interaction

What this module does 🔴

Models inter-agent interactions and their lifecycle, with an InteractionManager that registers, matches, resolves and completes sent/received/lazy interactions across streams.

interaction

A Collectionless AI Project (https://collectionless.ai)
Registration/Login: https://unaiverse.io
Code Repositories: https://github.com/collectionlessai/
Main Developers: Stefano Melacci (Project Leader), Christian Di Maio, Tommaso Guidi

Interaction

Interaction(action_name: str | None = None, action_kwargs: dict | None = None, streams: dict[str, str] | list[str] | None = None, data_samples: list | dict | None = None, num_steps: int = -1, requester: str | None = None, target: str | list[str] | None = None, from_state: str | None = None, to_state: str | None = None, timeout: float = -1.0, callback: str | None = None, volatile: bool = False, forced_uuid: str | None = 'do_not_force', id: str | None = 'random')

A single interaction between agents in the UNaIVERSE network.

An Interaction is the fundamental unit of communication between agents. It encapsulates the requested action (by name and keyword arguments), the set of data streams or concrete data samples to be exchanged, timing and timeout information, status tracking, and the stream-based data-access interface exposed to the processor.

Interactions can be created by the local agent (and then registered as InteractionType.SENT or InteractionType.LAZY) or received from a remote agent (InteractionType.RECEIVED). In all cases an Interaction goes through the state machine defined by InteractionStatus: CREATED -> REQUESTED/RECEIVED/LAZY -> RUNNING <-> PAUSED -> COMPLETED.
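
The lifecycle above can be sketched as a small transition table. This is an illustrative model only: the enum mirrors the status names listed in the text, but `Status`, `ALLOWED`, and `can_transition` are hypothetical names that are not part of `unaiverse`, and the real manager may permit transitions that the diagram does not show.

```python
from enum import Enum, auto


class Status(Enum):
    """Stand-in for InteractionStatus (member names taken from the text above)."""
    CREATED = auto()
    REQUESTED = auto()
    RECEIVED = auto()
    LAZY = auto()
    RUNNING = auto()
    PAUSED = auto()
    COMPLETED = auto()


# Hypothetical table derived only from the diagram:
# CREATED -> REQUESTED/RECEIVED/LAZY -> RUNNING <-> PAUSED -> COMPLETED
ALLOWED = {
    Status.CREATED: {Status.REQUESTED, Status.RECEIVED, Status.LAZY},
    Status.REQUESTED: {Status.RUNNING},
    Status.RECEIVED: {Status.RUNNING},
    Status.LAZY: {Status.RUNNING},
    Status.RUNNING: {Status.PAUSED, Status.COMPLETED},
    Status.PAUSED: {Status.RUNNING, Status.COMPLETED},
    Status.COMPLETED: set(),  # terminal state
}


def can_transition(src: Status, dst: Status) -> bool:
    """Return True if the diagram allows moving from src to dst."""
    return dst in ALLOWED[src]


print(can_transition(Status.CREATED, Status.LAZY))       # True
print(can_transition(Status.COMPLETED, Status.RUNNING))  # False
```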

Streams attached to an interaction are organized in three categories:

  • stdin_streams: processor input streams.
  • stdtar_streams: processor target (output reference) streams.
  • stdext_streams: extra streams not consumed by the processor directly.

A StreamProxy provides unified read/write access across all attached streams.

Attributes:

Name Type Description
requester str | None

Peer ID of the agent that originated this interaction, or None for system-generated interactions.

target list[str | None]

List of peer IDs of the agents that should handle the interaction.

id str

Short identifier string (not necessarily unique; used to build uuid).

uuid str

Fully-qualified unique identifier built from id and requester via build_uuid.

action_name str

Name of the action being requested (e.g. "process").

action_kwargs dict

Keyword arguments forwarded to the action.

streams dict

Parsed stream specification dict with keys "stdin", "stdtar", "stdext", and "stdunk".

data_samples list

List of concrete data objects attached to the interaction when no stream-based specification is used.

num_steps int

Total number of execution steps for the interaction, or -1 when the interaction carries no data-based steps.

from_state str | None

Name of the HSM state from which the interaction is expected to originate.

to_state str | None

Name of the HSM state the interaction is expected to yield.

timeout float

Maximum lifetime of the interaction in seconds; -1 means no per-interaction limit (the InteractionManager applies its own global timeout).

status InteractionStatus

Current InteractionStatus of the interaction.

volatile

When True, the recipient is asked not to send back any completion status.

callback

Name of the agent method to invoke when the interaction completes.

completion_reason CompletionReason | None

CompletionReason that caused the interaction to complete, or None while still active.

destination_state str | None

Name of the HSM state reached after completion (set by the action handler).

stream_proxy

StreamProxy providing unified access to all attached streams.

Examples:

Create a minimal interaction requesting a "process" action on a remote agent:

>>> from unaiverse.interaction import Interaction
>>> inter = Interaction(
...     action_name="process",
...     action_kwargs={"mode": "fast"},
...     requester="peer_abc",
...     target="peer_xyz",
... )
>>> inter.uuid
'..._peer_abc'

Create a data-carrying interaction using concrete samples:

>>> import torch
>>> tensor = torch.zeros(3, 224, 224)
>>> inter = Interaction(
...     action_name="infer",
...     data_samples=[tensor],
...     requester="peer_abc",
...     target="peer_xyz",
... )
>>> inter.num_steps
1

Initialize a new Interaction with all its identity, data, and timing fields.

Builds the unique identifier (uuid) from id and requester via build_uuid, then parses and normalizes streams or data_samples into the internal stream dictionaries. If both streams and data_samples are provided, data_samples is ignored in favour of the stream specification.

When data_samples is a JSON string it is deserialized from the base-64-encoded payload format; any other value (typically a plain Python list) is stored as-is. In either case num_steps is forced to 1, provided that streams is None.

A GenException is raised immediately if both callback and volatile are set, because a volatile interaction never sends back a completion status and therefore the callback would never be invoked.

Parameters:

Name Type Description Default
action_name str | None

Name of the action being requested (e.g. "process" or "learn").

None
action_kwargs dict | None

Keyword arguments for the action. Must not contain stream objects. Defaults to None, which is replaced by an empty dict ({}).

None
streams dict[str, str] | list[str] | None

Stream specification. Either a plain list of stream hashes (user hash, net hash, name, or group), or a dict with keys "stdin", "stdtar", "stdext", and/or "stdunk" mapping to lists of stream hashes. Defaults to None.

None
data_samples list | dict | None

Concrete data samples to attach (used when no stream specification is given). Can be a list of objects (e.g. torch.Tensor, PIL.Image, str), a JSON string representing such a list in the serialized payload format, or a dict mapping stream user hashes to individual samples. Defaults to None.

None
num_steps int

Total number of execution steps (-1 for non-data-based interactions). Overridden to 1 when data_samples is provided. Defaults to -1.

-1
requester str | None

Peer ID of the agent originating this interaction. Defaults to None, which indicates a system-generated interaction.

None
target str | list[str] | None

Peer ID of the target agent, or a list of peer IDs. Defaults to None.

None
from_state str | None

Name of the HSM state from which this interaction is expected to originate. Defaults to None.

None
to_state str | None

Name of the HSM state this interaction is expected to yield. Defaults to None.

None
timeout float

Maximum lifetime in seconds; -1 means no per-interaction limit (the InteractionManager applies its own global timeout). Defaults to -1.

-1.0
callback str | None

Name of an agent method to call when this interaction completes. The method must accept a single interaction keyword argument and have default values for all other parameters. Cannot be combined with volatile=True. Defaults to None.

None
volatile bool

When True, the recipient is asked not to send back any completion status. Cannot be combined with a non-None callback. Defaults to False.

False
forced_uuid str | None

Overrides the automatically built UUID. When set to anything other than "do_not_force", the given value is stored in uuid as-is and id is reset to None. None is a valid value (it denotes the system interaction UUID). Only use in exceptional cases where explicit UUID control is required. Defaults to "do_not_force", which means the UUID is built normally.

'do_not_force'
id str | None

Short identifier string (not guaranteed to be unique across agents). "random" (default) generates a random 8-character hex string. None assigns the system interaction UUID. The final uuid appends the requester to this id, separated by "_". Defaults to "random".

'random'

Raises:

Type Description
GenException

If callback is not None and volatile is True at the same time.
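
The callback requirement described above (a single interaction keyword argument, defaults for everything else) can be checked ahead of time with the standard inspect module. The following is a sketch under assumptions: `is_valid_callback` and `DemoAgent` are hypothetical names, and this page does not show whether or how unaiverse itself validates callbacks.

```python
import inspect


def is_valid_callback(method) -> bool:
    """True if `method` can be called as method(interaction=...) with nothing else."""
    params = inspect.signature(method).parameters
    target = params.get("interaction")
    # `interaction` must exist and be passable by keyword.
    if target is None or target.kind is inspect.Parameter.POSITIONAL_ONLY:
        return False
    for name, p in params.items():
        if name == "interaction":
            continue
        # *args / **kwargs never require a value.
        if p.kind in (p.VAR_POSITIONAL, p.VAR_KEYWORD):
            continue
        # Every other parameter needs a default.
        if p.default is inspect.Parameter.empty:
            return False
    return True


class DemoAgent:
    """Hypothetical agent with one compliant and one non-compliant method."""

    def on_done(self, interaction=None, verbose: bool = False):
        return interaction

    def needs_extra(self, interaction, retries):
        return interaction


agent = DemoAgent()
print(is_valid_callback(agent.on_done))      # True
print(is_valid_callback(agent.needs_extra))  # False
```

Because callback is passed by name (a str), a check like this would be applied to `getattr(agent, callback)` before the interaction is created.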

Source code in unaiverse/interaction.py
def __init__(self,
             action_name: str | None = None,
             action_kwargs: dict | None = None,
             streams: dict[str, str] | list[str] | None = None,
             data_samples: list | dict | None = None,
             num_steps: int = -1,
             requester: str | None = None,
             target: str | list[str] | None = None,
             from_state: str | None = None,
             to_state: str | None = None,
             timeout: float = -1.,
             callback: str | None = None,
             volatile: bool = False,
             forced_uuid: str | None = "do_not_force",
             id: str | None = "random"):
    """Initialize a new Interaction with all its identity, data, and timing fields.

    Builds the unique identifier (``uuid``) from ``id`` and ``requester`` via
    ``build_uuid``, then parses and normalizes ``streams`` or ``data_samples``
    into the internal stream dictionaries. If both ``streams`` and ``data_samples``
    are provided, ``data_samples`` is ignored in favour of the stream specification.

    When ``data_samples`` is a JSON string it is deserialized from base-64-encoded
    payload format. When it is a plain Python list it is stored as-is. In either
    case ``num_steps`` is forced to ``1``.

    A ``GenException`` is raised immediately if both ``callback`` and ``volatile``
    are set, because a volatile interaction never sends back a completion status and
    therefore the callback would never be invoked.

    Args:
        action_name: Name of the action being requested (e.g. ``"process"`` or
            ``"learn"``).
        action_kwargs: Keyword arguments for the action. Must not contain stream
            objects. Defaults to ``{}``.
        streams: Stream specification. Either a plain list of stream hashes
            (user hash, net hash, name, or group), or a dict with keys
            ``"stdin"``, ``"stdtar"``, ``"stdext"``, and/or ``"stdunk"``
            mapping to lists of stream hashes. Defaults to None.
        data_samples: Concrete data samples to attach (used when no stream
            specification is given). Can be a list of objects (e.g.
            ``torch.Tensor``, ``PIL.Image``, ``str``), a JSON string representing
            such a list in the serialized payload format, or a dict mapping stream
            user hashes to individual samples. Defaults to None.
        num_steps: Total number of execution steps (-1 for non-data-based
            interactions). Overridden to ``1`` when ``data_samples`` is provided.
            Defaults to -1.
        requester: Peer ID of the agent originating this interaction. Defaults to
            None, which indicates a system-generated interaction.
        target: Peer ID of the target agent, or a list of peer IDs. Defaults to
            None.
        from_state: Name of the HSM state from which this interaction is expected
            to originate. Defaults to None.
        to_state: Name of the HSM state this interaction is expected to yield.
            Defaults to None.
        timeout: Maximum lifetime in seconds; ``-1`` means no per-interaction
            limit (the ``InteractionManager`` applies its own global timeout).
            Defaults to -1.
        callback: Name of an agent method to call when this interaction completes.
            The method must accept a single ``interaction`` keyword argument and
            have default values for all other parameters. Cannot be combined with
            ``volatile=True``. Defaults to None.
        volatile: When ``True``, the recipient is asked not to send back any
            completion status. Cannot be combined with a non-None ``callback``.
            Defaults to False.
        forced_uuid: Overrides the automatically built UUID. ``None`` is a valid
            value (maps to the system interaction UUID). Only use in exceptional
            cases where explicit UUID control is required. Defaults to
            ``"do_not_force"``, which means the UUID is built normally.
        id: Short identifier string (not guaranteed to be unique across agents).
            ``"random"`` (default) generates a random 8-character hex string.
            ``None`` assigns the system interaction UUID. The final ``uuid``
            appends the ``requester`` to this ``id``, separated by ``"_"``.
            Defaults to ``"random"``.

    Raises:
        GenException: If both ``callback`` and ``volatile`` are set to non-None/
            True values simultaneously.
    """
    # Participants
    self.requester: str | None = requester
    self.target: list[str | None] = target if isinstance(target, list) else [target]

    # Identity
    self.id: str = _uuid.uuid4().hex[0:8] if (id is not None and id == 'random') else id \
        if id is not None else Custom.SYSTEM_INTERACTION_UUID

    # Indexing string (the actual unique identifier of the interaction, that also involves the requester)
    self.uuid: str = Interaction.build_uuid(self.id, str(self.requester))

    # Forcing a specific UUID (if needed)
    if forced_uuid != "do_not_force":
        self.id = None
        self.uuid = forced_uuid

    # Action requested
    self.action_name: str = action_name
    self.action_kwargs: dict = action_kwargs if action_kwargs is not None else {}

    # Timing information
    self.timestamp_created: float = -1.
    self.timestamp_started: float = -1.
    self.timeout: float = timeout
    self.cycle_created: int = -1
    self.cycle_started: int = -1
    self.timestamp_completed: float = -1.
    self.cycle_completed: int = -1

    # States
    self.from_state: str | None = from_state
    self.to_state: str | None = to_state

    # Samples specification: list of (stream_name, num_samples)
    self.streams: dict = {}
    self.data_samples: list = []
    self.num_steps: int = num_steps  # Generic not-data-based interactions have -1 here

    # Stream-based specification
    if streams is not None and len(streams) > 0:
        self.parse_streams(streams)  # This will create a specific stream structure

    # Actual data (alternative to samples above)
    if streams is None and data_samples is not None:
        if isinstance(data_samples, str):  # If it is JSON string of a list of dicts, each is type_name->base64...
            self.data_samples: list = deserialize_payload(data_samples)  # List of PIL.Image, torch.Tensor, str, ...
        else:
            self.data_samples = data_samples  # List of PIL.Image, torch.Tensor, str, ...
        self.num_steps = 1

    # Status
    self.status: InteractionStatus = InteractionStatus.CREATED
    self.data_sent_after_completion = False
    self.buffered_stream_restarted = False
    self.volatile = volatile

    # Pointers
    self.action_ref: Callable[Interaction] | None = None  # Reference to the actual Action object

    # Data tags
    self.data_tags: dict[str, list[int]] = {}  # Streams could have different data tags

    # Completion reason
    self.completion_reason: CompletionReason | None = None

    # Destination state reached, if the action was completed (for status messages)
    self.destination_state: str | None = None

    # Params that depends on the way it is stored in the action
    self.by_insertion_order_id = -1
    self.by_requester_insertion_order_id = -1

    # Status of the running action
    self.__step_idx = -1  # The first done step will have index 0, while -1 means "no steps done so far"
    self.__starting_time = 0.
    self.__timeout_starting_time = 0.
    self.__mark = None
    self.stdin_streams = {}  # User hash to stream object
    self.stdtar_streams = {}  # User hash to stream object
    self.stdext_streams = {}  # User hash to stream object
    self.owned_streams = {}  # User hash to stream object
    self.lazy_streams = {}  # User hash to stream object
    self.stream_proxy = StreamProxy()  # Virtual stream IO associated to all streams above (not the lazy ones)

    # Target-related info
    self.target_data_tags: list[dict[str, list[int]]] = [{} for _ in range(len(self.target))]
    self.target_cycle_completed: list[int] = [-1] * len(self.target)
    self.target_timestamp_completed: list[float] = [-1.] * len(self.target)
    self.target_completion_reason: list[CompletionReason | None] = [None] * len(self.target)
    self.target_destination_state: list[str | None] = [None] * len(self.target)

    # Back-reference to the InteractionManager (set when registered)
    self.__im: InteractionManager | None = None

    # Interaction type (from Interaction Manager)
    self.type: InteractionType | None = None

    # Callback method name
    self.callback = callback

    # Checking
    if self.callback is not None and self.volatile:
        raise GenException("You cannot set a callback on a volatile interaction")
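
The identity logic in the listing above uses the string "do_not_force" as a sentinel default, so that None remains available as a legitimate forced value. A minimal standalone sketch of that logic follows. `resolve_identity` is a hypothetical helper, and `SYSTEM_ID` is an assumed stand-in for Custom.SYSTEM_INTERACTION_UUID, whose real value is not shown on this page.

```python
import uuid as _uuid

SYSTEM_ID = "system"  # assumption: placeholder for Custom.SYSTEM_INTERACTION_UUID


def resolve_identity(id="random", requester=None, forced_uuid="do_not_force"):
    """Return (id, uuid) following the same branches as the listing above."""
    if id == "random":
        short_id = _uuid.uuid4().hex[:8]  # random 8-character hex string
    elif id is None:
        short_id = SYSTEM_ID
    else:
        short_id = id
    # The listing passes str(requester), so a None requester becomes "None" here.
    full = short_id + "_" + str(requester)
    if forced_uuid != "do_not_force":
        # Sentinel check: any other value, including None, overrides the uuid
        # and clears the short id.
        return None, forced_uuid
    return short_id, full


print(resolve_identity(id="abc12345", requester="peer_xyz"))
# ('abc12345', 'abc12345_peer_xyz')
print(resolve_identity(forced_uuid=None))
# (None, None)
```

A plain None default could not distinguish "do not force" from "force the value None", which is why a distinct sentinel string is used.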

requester instance-attribute

requester: str | None = requester

target instance-attribute

target: list[str | None] = target if isinstance(target, list) else [target]

id instance-attribute

id: str = _uuid.uuid4().hex[0:8] if id is not None and id == 'random' else id if id is not None else Custom.SYSTEM_INTERACTION_UUID

uuid instance-attribute

uuid: str = build_uuid(id, str(requester))

action_name instance-attribute

action_name: str = action_name

action_kwargs instance-attribute

action_kwargs: dict = action_kwargs if action_kwargs is not None else {}

timestamp_created instance-attribute

timestamp_created: float = -1.0

timestamp_started instance-attribute

timestamp_started: float = -1.0

timeout instance-attribute

timeout: float = timeout

cycle_created instance-attribute

cycle_created: int = -1

cycle_started instance-attribute

cycle_started: int = -1

timestamp_completed instance-attribute

timestamp_completed: float = -1.0

cycle_completed instance-attribute

cycle_completed: int = -1

from_state instance-attribute

from_state: str | None = from_state

to_state instance-attribute

to_state: str | None = to_state

streams instance-attribute

streams: dict = {}

data_samples instance-attribute

data_samples: list = []

num_steps instance-attribute

num_steps: int = num_steps

status instance-attribute

status: InteractionStatus = InteractionStatus.CREATED

data_sent_after_completion instance-attribute

data_sent_after_completion = False

buffered_stream_restarted instance-attribute

buffered_stream_restarted = False

volatile instance-attribute

volatile = volatile

action_ref instance-attribute

action_ref: Callable[Interaction] | None = None

data_tags instance-attribute

data_tags: dict[str, list[int]] = {}

completion_reason instance-attribute

completion_reason: CompletionReason | None = None

destination_state instance-attribute

destination_state: str | None = None

by_insertion_order_id instance-attribute

by_insertion_order_id = -1

by_requester_insertion_order_id instance-attribute

by_requester_insertion_order_id = -1

stdin_streams instance-attribute

stdin_streams = {}

stdtar_streams instance-attribute

stdtar_streams = {}

stdext_streams instance-attribute

stdext_streams = {}

owned_streams instance-attribute

owned_streams = {}

lazy_streams instance-attribute

lazy_streams = {}

stream_proxy instance-attribute

stream_proxy = StreamProxy()

target_data_tags instance-attribute

target_data_tags: list[dict[str, list[int]]] = [{} for _ in range(len(target))]

target_cycle_completed instance-attribute

target_cycle_completed: list[int] = [-1] * len(target)

target_timestamp_completed instance-attribute

target_timestamp_completed: list[float] = [-1.0] * len(target)

target_completion_reason instance-attribute

target_completion_reason: list[CompletionReason | None] = [None] * len(target)

target_destination_state instance-attribute

target_destination_state: list[str | None] = [None] * len(target)

type instance-attribute

type: InteractionType | None = None

callback instance-attribute

callback = callback

created property

created: bool

Return True when the interaction has been created but not yet dispatched.

An interaction is in the CREATED state immediately after __init__ and before any InteractionManager registration step transitions it to REQUESTED, RECEIVED, or LAZY.

Returns:

Type Description
bool

True if status is InteractionStatus.CREATED.

completed property

completed: bool

Return True when the interaction has finished executing.

An interaction enters the COMPLETED state via mark_completed (called directly) or via InteractionManager.complete. Once completed it is moved to the recently-completed set and eventually drained by drain_completed. The completion_reason attribute records the cause.

Returns:

Type Description
bool

True if status is InteractionStatus.COMPLETED.

running property

running: bool

Return True when the interaction is actively being executed.

An interaction enters the RUNNING state when it is set as the current interaction by InteractionManager.set_current, which calls mark_running. It moves to PAUSED if another interaction preempts it, or to COMPLETED when the action finishes.

Returns:

Type Description
bool

True if status is InteractionStatus.RUNNING.

paused property

paused: bool

Return True when the interaction has been temporarily suspended.

An interaction is paused when another interaction is set as current by InteractionManager.set_current_as_paused, which calls mark_paused. A paused interaction may be resumed later by setting it as the current one again.

Returns:

Type Description
bool

True if status is InteractionStatus.PAUSED.

registered_as_sent property

registered_as_sent: bool

Return True when this interaction was registered as sent by the local agent.

The type attribute is set to InteractionType.SENT by InteractionManager.register_sent. A sent interaction tracks a request that the local agent dispatched to a remote target.

Returns:

Type Description
bool

True if type is InteractionType.SENT.

registered_as_received property

registered_as_received: bool

Return True when this interaction was registered as received from another agent.

The type attribute is set to InteractionType.RECEIVED by InteractionManager.register_received. A received interaction represents a request that a remote agent sent to the local agent.

Returns:

Type Description
bool

True if type is InteractionType.RECEIVED.

build_uuid staticmethod

build_uuid(id: str | None, requester: str | None = None) -> str

Build the canonical UUID string for an interaction from its id and requester.

The UUID is the concatenation of id and requester separated by "_". If id is None it is replaced by Custom.SYSTEM_INTERACTION_ID; if requester is None it is replaced by Custom.SYSTEM_INTERACTION_LABEL. The resulting UUID uniquely identifies the interaction within the session because it encodes both the short local identifier and the originating peer.

Parameters:

Name Type Description Default
id str | None

Short identifier part of the UUID. None is treated as the system interaction ID constant.

required
requester str | None

Peer ID part of the UUID. None is treated as the system interaction label constant.

None

Returns:

Type Description
str

A string of the form "<id>_<requester>".

Examples:

>>> Interaction.build_uuid("abc12345", "peer_xyz")
'abc12345_peer_xyz'
>>> Interaction.build_uuid(None, None)  # system interaction UUID
'system_system'

Source code in unaiverse/interaction.py
@staticmethod
def build_uuid(id: str | None, requester: str | None = None) -> str:
    """Build the canonical UUID string for an interaction from its ``id`` and ``requester``.

    The UUID is the concatenation of ``id`` and ``requester`` separated by ``"_"``.
    If ``id`` is ``None`` it is replaced by ``Custom.SYSTEM_INTERACTION_ID``; if
    ``requester`` is ``None`` it is replaced by ``Custom.SYSTEM_INTERACTION_LABEL``.
    The resulting UUID uniquely identifies the interaction within the session because
    it encodes both the short local identifier and the originating peer.

    Args:
        id: Short identifier part of the UUID. ``None`` is treated as the system
            interaction ID constant.
        requester: Peer ID part of the UUID. ``None`` is treated as the system
            interaction label constant.

    Returns:
        A string of the form ``"<id>_<requester>"``.

    Examples:
        >>> Interaction.build_uuid("abc12345", "peer_xyz")
        'abc12345_peer_xyz'
        >>> Interaction.build_uuid(None, None)  # system interaction UUID
        'system_system'
    """
    if id is None:
        id = Custom.SYSTEM_INTERACTION_ID
    if requester is None:
        requester = Custom.SYSTEM_INTERACTION_LABEL
    return id + "_" + requester

get_id_from_uuid staticmethod

get_id_from_uuid(uuid: str) -> str

Extract the short id component from a full interaction UUID.

The UUID format is "<id>_<requester>". This method returns everything before the first underscore, which recovers the id originally passed to build_uuid as long as that id contains no underscore itself.

Parameters:

Name Type Description Default
uuid str

A UUID string previously produced by build_uuid.

required

Returns:

Type Description
str

The id component (everything before the first "_").

Examples:

>>> Interaction.get_id_from_uuid("abc12345_peer_xyz")
'abc12345'

Source code in unaiverse/interaction.py
@staticmethod
def get_id_from_uuid(uuid: str) -> str:
    """Extract the short ``id`` component from a full interaction UUID.

    The UUID format is ``"<id>_<requester>"``. This method splits on the first
    underscore and returns the left part. It is the inverse of the first argument
    passed to ``build_uuid``.

    Args:
        uuid: A UUID string previously produced by ``build_uuid``.

    Returns:
        The ``id`` component (everything before the first ``"_"``).

    Examples:
        >>> Interaction.get_id_from_uuid("abc12345_peer_xyz")
        'abc12345'
    """
    return uuid.split("_")[0]
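
The round trip between the two static methods can be illustrated with simplified standalone copies (the None handling of build_uuid is omitted here). `get_requester_from_uuid` is a hypothetical helper added only for illustration; it is not part of the module.

```python
def build_uuid(id: str, requester: str) -> str:
    """Simplified copy: join id and requester with an underscore."""
    return id + "_" + requester


def get_id_from_uuid(uuid: str) -> str:
    """Same expression as in the listing: first piece before any underscore."""
    return uuid.split("_")[0]


def get_requester_from_uuid(uuid: str) -> str:
    """Hypothetical counterpart: everything after the first underscore."""
    return uuid.split("_", 1)[1]


u = build_uuid("abc12345", "peer_xyz")
print(get_id_from_uuid(u))         # abc12345
print(get_requester_from_uuid(u))  # peer_xyz

# An id that itself contains "_" is not recovered in full.
print(get_id_from_uuid(build_uuid("my_id", "peer_xyz")))  # my
```

The default random id is an 8-character hex string, which never contains an underscore, so the limitation only matters for custom ids.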

registered_as_lazy

registered_as_lazy() -> bool

Return True when this interaction was registered as lazy by the local agent.

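The type attribute is set to InteractionType.LAZY by InteractionManager.register_lazy. A lazy interaction is one that the local agent generated internally (not dispatched from, or received from, a remote peer) and that targets either a local action or a remote agent. Unlike registered_as_sent and registered_as_received, which are properties, this is a regular method and must be called with parentheses.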

Returns:

Type Description
bool

True if type is InteractionType.LAZY.

Source code in unaiverse/interaction.py
def registered_as_lazy(self) -> bool:
    """Return ``True`` when this interaction was registered as lazy by the local agent.

    The ``type`` attribute is set to ``InteractionType.LAZY`` by
    ``InteractionManager.register_lazy``. A lazy interaction is one that the local
    agent generated internally (not dispatched from, or received from, a remote peer)
    and that targets either a local action or a remote agent.

    Returns:
        ``True`` if ``type`` is ``InteractionType.LAZY``.
    """
    return self.type == InteractionType.LAZY
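
Since the listing above defines registered_as_lazy without a @property decorator, it behaves differently from registered_as_sent and registered_as_received in a boolean context. The sketch below uses a hypothetical `Demo` class (not the real Interaction) to show the difference.

```python
class Demo:
    """Hypothetical stand-in with one property and one plain method."""

    def __init__(self, kind: str):
        self.kind = kind

    @property
    def registered_as_sent(self) -> bool:
        return self.kind == "sent"

    def registered_as_lazy(self) -> bool:
        return self.kind == "lazy"


d = Demo("sent")
print(d.registered_as_sent)        # True  (property: no parentheses)
print(d.registered_as_lazy())      # False (method: call it)
print(bool(d.registered_as_lazy))  # True  (a bound method object is always truthy)
```

Writing `if inter.registered_as_lazy:` without the call would therefore always take the branch.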

update_target

update_target(target: str | list[str])

Replace the target list and reset all per-target tracking arrays.

When the destination of an interaction changes after construction (for example when the InteractionManager resolves an agent name to its peer ID), this method updates target and reinitializes every array that is indexed in parallel with it: target_completion_reason, target_data_tags, target_timestamp_completed, target_destination_state, and target_cycle_completed.

Parameters:

Name Type Description Default
target str | list[str]

New target peer ID, or a list of peer IDs. A bare string is automatically wrapped in a list.

required

Source code in unaiverse/interaction.py
def update_target(self, target: str | list[str]):
    """Replace the target list and reset all per-target tracking arrays.

    When the destination of an interaction changes after construction (for example
    when the ``InteractionManager`` resolves an agent name to its peer ID), this
    method updates ``target`` and reinitializes every array that is indexed in
    parallel with it: ``target_completion_reason``, ``target_data_tags``,
    ``target_timestamp_completed``, ``target_destination_state``, and
    ``target_cycle_completed``.

    Args:
        target: New target peer ID, or a list of peer IDs. A bare string is
            automatically wrapped in a list.
    """
    if isinstance(target, str):
        target = [target]
    self.target = target
    self.target_completion_reason = [None] * len(self.target)
    self.target_data_tags = [{} for _ in range(len(self.target))]
    self.target_timestamp_completed = [-1.] * len(self.target)
    self.target_destination_state = [None] * len(self.target)
    self.target_cycle_completed = [-1] * len(self.target)
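
The listing above rebuilds target_data_tags with a list comprehension while the other arrays use list multiplication. The sketch below shows why that distinction matters for mutable entries. `reset_target_tracking` is a hypothetical free function that mirrors update_target without depending on the Interaction class.

```python
def reset_target_tracking(target):
    """Wrap a bare string, then rebuild every per-target list in parallel."""
    if isinstance(target, str):
        target = [target]
    n = len(target)
    return {
        "target": target,
        "target_completion_reason": [None] * n,      # immutable: sharing is harmless
        "target_data_tags": [{} for _ in range(n)],  # one fresh dict per target
        "target_timestamp_completed": [-1.0] * n,
        "target_destination_state": [None] * n,
        "target_cycle_completed": [-1] * n,
    }


state = reset_target_tracking(["peer_a", "peer_b"])
state["target_data_tags"][0]["stream_1"] = [7]
print(state["target_data_tags"])  # [{'stream_1': [7]}, {}]

# Counter-example: list multiplication repeats the same dict object.
aliased = [{}] * 2
aliased[0]["stream_1"] = [7]
print(aliased)  # [{'stream_1': [7]}, {'stream_1': [7]}]
```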

set_manager

set_manager(im: InteractionManager, stdin_streams: dict[str, Stream | object], stdtar_streams: dict[str, Stream | object], stdext_streams: dict[str, Stream | object], owned_streams: dict[str, Stream | object]) -> None

Bind this interaction to its owning InteractionManager and initialize its stream references.

Records the creation time (timestamp_created and cycle_created) from the global clock, stores the four stream dictionaries, and binds the stream_proxy to the union of all provided streams plus any concrete data samples. This method is called by InteractionManager.register_sent and InteractionManager.__register_received_or_lazy immediately after the streams have been matched and validated.

After this call the interaction's stream_proxy provides unified access to all attached streams and data samples.

Parameters:

Name Type Description Default
im InteractionManager

The InteractionManager that owns this interaction.

required
stdin_streams dict[str, Stream | object]

Mapping from user hash to stream objects used as processor input arguments.

required
stdtar_streams dict[str, Stream | object]

Mapping from user hash to stream objects used as processor target (output reference) arguments.

required
stdext_streams dict[str, Stream | object]

Mapping from user hash to stream objects that are extra (not consumed directly by the processor).

required
owned_streams dict[str, Stream | object]

Mapping from user hash to stream objects owned by the local agent (a subset of the union of the three dicts above).

required

Note

Data samples stored in data_samples are also exposed through the stream_proxy under synthetic keys of the form "<data_sample_i>", where i is the zero-based index of the sample in data_samples.

Source code in unaiverse/interaction.py
def set_manager(self,
                im: 'InteractionManager',
                stdin_streams: dict[str, Stream | object],
                stdtar_streams: dict[str, Stream | object],
                stdext_streams: dict[str, Stream | object],
                owned_streams: dict[str, Stream | object]) -> None:
    """Bind this interaction to its owning ``InteractionManager`` and initialize its stream references.

    Records the creation time (``timestamp_created`` and ``cycle_created``) from the
    global clock, stores the four stream dictionaries, and binds the ``stream_proxy``
    to the union of all provided streams plus any concrete data samples. This method
    is called by ``InteractionManager.register_sent`` and
    ``InteractionManager.__register_received_or_lazy`` immediately after the streams
    have been matched and validated.

    After this call the interaction's ``stream_proxy`` provides unified access to
    all attached streams and data samples.

    Args:
        im: The ``InteractionManager`` that owns this interaction.
        stdin_streams: Mapping from user hash to stream objects used as processor
            input arguments.
        stdtar_streams: Mapping from user hash to stream objects used as processor
            target (output reference) arguments.
        stdext_streams: Mapping from user hash to stream objects that are extra
            (not consumed directly by the processor).
        owned_streams: Mapping from user hash to stream objects owned by the local
            agent (a subset of the union of the three dicts above).

    Note:
        Data samples stored in ``data_samples`` are also exposed through the
        ``stream_proxy`` under synthetic keys of the form ``"<data_sample_i>"``.
    """
    self.__im = im
    self.timestamp_created = clock.get_time()
    self.cycle_created = clock.get_cycle()

    self.stdin_streams = stdin_streams
    self.stdtar_streams = stdtar_streams
    self.stdext_streams = stdext_streams
    self.owned_streams = owned_streams
    data_samples_dict = {f"<data_sample_{i}>": self.data_samples[i]
                         for i in range(len(self.data_samples))}
    self.stream_proxy.bind(self.stdin_streams | self.stdtar_streams | self.stdext_streams | self.owned_streams |
                           data_samples_dict, uuid=self.uuid)

reset_state

reset_state()

Reset the execution state of the interaction so it can be re-run from the beginning.

Clears the step index (set to -1, meaning no steps done yet), the starting-time record, and the timeout-baseline time. The interaction status and stream bindings are not touched, so the caller is responsible for setting the status back to a suitable value before re-scheduling the interaction.

Note

This is intended for internal framework use (e.g., when an action retries after a transient failure). User code should rarely need to call it directly.
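
The split of responsibilities described above (counters are reset, status is not) can be sketched with a stand-in object. InteractionStub and the status strings "RUNNING" and "PENDING" are hypothetical placeholders, not actual InteractionStatus members.

```python
class InteractionStub:
    """Hypothetical stand-in that mirrors only the fields reset_state touches."""

    def __init__(self):
        self.status = "RUNNING"  # placeholder for an InteractionStatus value
        self.step_idx = 2
        self.starting_time = 12.5
        self.timeout_starting_time = 13.0

    def reset_state(self):
        # Same three assignments as the documented method; status is left alone.
        self.step_idx = -1
        self.starting_time = 0.
        self.timeout_starting_time = 0.


stub = InteractionStub()
stub.reset_state()
status_after_reset = stub.status  # still "RUNNING": reset_state did not touch it
stub.status = "PENDING"           # the caller restores a suitable status itself
```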

Source code in unaiverse/interaction.py
def reset_state(self):
    """Reset the execution state of the interaction so it can be re-run from the beginning.

    Clears the step index (set to ``-1``, meaning no steps done yet), the
    starting-time record, and the timeout-baseline time. The interaction ``status``
    and stream bindings are not touched, so the caller is responsible for setting
    the status back to a suitable value before re-scheduling the interaction.

    Note:
        This is intended for internal framework use (e.g., when an action retries
        after a transient failure). User code should rarely need to call it directly.
    """
    self.__step_idx = -1
    self.__starting_time = 0.
    self.__timeout_starting_time = 0.

set_mark

set_mark(mark: object) -> None

Store an arbitrary marker object on the interaction.

The mark is a free-form annotation slot that action handlers can use to attach bookkeeping data to the interaction without adding new attributes. It is initialized to None and is not serialized by to_dict. Use get_mark to retrieve the value and clear_mark to remove it.

Parameters:

Name Type Description Default
mark object

Any Python object to associate with this interaction.

required
Source code in unaiverse/interaction.py
def set_mark(self, mark: object) -> None:
    """Store an arbitrary marker object on the interaction.

    The mark is a free-form annotation slot that action handlers can use to
    attach bookkeeping data to the interaction without adding new attributes.
    It is initialized to ``None`` and is not serialized by ``to_dict``.
    Use ``get_mark`` to retrieve the value and ``clear_mark`` to remove it.

    Args:
        mark: Any Python object to associate with this interaction.
    """
    self.__mark = mark

get_mark

get_mark() -> object

Return the marker object previously stored by set_mark.

Returns:

Type Description
object

The marker object, or None if no mark has been set or it was cleared by clear_mark.

Source code in unaiverse/interaction.py
def get_mark(self) -> object:
    """Return the marker object previously stored by ``set_mark``.

    Returns:
        The marker object, or ``None`` if no mark has been set or it was
        cleared by ``clear_mark``.
    """
    return self.__mark

clear_mark

clear_mark() -> None

Remove the marker object previously stored by set_mark.

Sets the internal mark to None. Equivalent to calling set_mark(None).
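
A short sketch of the mark lifecycle follows. MarkStub is a hypothetical stand-in that reproduces only the three mark methods, and the bookkeeping dictionary is an invented example of what a handler might attach.

```python
class MarkStub:
    """Hypothetical stand-in exposing only the mark slot of an interaction."""

    def __init__(self):
        self._mark = None  # initialized to None, as documented

    def set_mark(self, mark):
        self._mark = mark

    def get_mark(self):
        return self._mark

    def clear_mark(self):
        self._mark = None


inter = MarkStub()
inter.set_mark({"retries": 1})  # attach free-form bookkeeping data
seen = inter.get_mark()         # the same object that was stored
inter.clear_mark()              # same effect as inter.set_mark(None)
```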

Source code in unaiverse/interaction.py
def clear_mark(self) -> None:
    """Remove the marker object previously stored by ``set_mark``.

    Sets the internal mark to ``None``. Equivalent to calling
    ``set_mark(None)``.
    """
    self.__mark = None

get_step_idx

get_step_idx() -> int

Return the index of the last completed execution step.

The step index starts at -1 (no steps done yet) and is incremented by inc_step_idx each time the action completes a step. The final step index is num_steps - 1 for data-carrying interactions, or 0 for interactions with no data steps (num_steps < 0).

Returns:

Type Description
int

An integer step index. -1 means no steps have been executed yet.

Source code in unaiverse/interaction.py
def get_step_idx(self) -> int:
    """Return the index of the last completed execution step.

    The step index starts at ``-1`` (no steps done yet) and is incremented by
    ``inc_step_idx`` each time the action completes a step. The final step index
    is ``num_steps - 1`` for data-carrying interactions, or ``0`` for interactions
    with no data steps (``num_steps < 0``).

    Returns:
        An integer step index. ``-1`` means no steps have been executed yet.
    """
    return self.__step_idx

set_step_idx

set_step_idx(steps: int) -> None

Directly assign the current execution step index.

Prefer inc_step_idx for normal step advancement. Use this method only when the framework needs to restore or override the step counter (for example when retrying or replaying an interaction).

Parameters:

Name Type Description Default
steps int

The step index to assign. -1 means no steps done yet.

required
Source code in unaiverse/interaction.py
def set_step_idx(self, steps: int) -> None:
    """Directly assign the current execution step index.

    Prefer ``inc_step_idx`` for normal step advancement. Use this method only
    when the framework needs to restore or override the step counter (for example
    when retrying or replaying an interaction).

    Args:
        steps: The step index to assign. ``-1`` means no steps done yet.
    """
    self.__step_idx = steps

inc_step_idx

inc_step_idx() -> None

Increment the execution step index by one.

Called by the action handler after each successful step. The step index transitions from -1 to 0 on the first step, then increases until was_last_step_done returns True.

Note

The complementary dec_step_idx method decrements the counter by one and is available for special retry scenarios.
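
The interplay of the two counters in a retry can be sketched as follows. StepCounterStub is a hypothetical stand-in that holds only the step index; the retry scenario itself is an assumed illustration.

```python
class StepCounterStub:
    """Hypothetical stand-in that keeps only the step index."""

    def __init__(self):
        self.step_idx = -1  # no steps done yet

    def inc_step_idx(self):
        self.step_idx += 1

    def dec_step_idx(self):
        self.step_idx -= 1


counter = StepCounterStub()
history = [counter.step_idx]

counter.inc_step_idx()            # first step done: -1 -> 0
history.append(counter.step_idx)
counter.inc_step_idx()            # second step done: 0 -> 1
history.append(counter.step_idx)
counter.dec_step_idx()            # result of step 1 was lost: roll back to 0
history.append(counter.step_idx)
counter.inc_step_idx()            # step 1 executed again: 0 -> 1
history.append(counter.step_idx)
```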

Source code in unaiverse/interaction.py
def inc_step_idx(self) -> None:
    """Increment the execution step index by one.

    Called by the action handler after each successful step. The step index
    transitions from ``-1`` to ``0`` on the first step, then increases until
    ``was_last_step_done`` returns ``True``.

    Note:
        The complementary ``dec_step_idx`` method decrements the counter by one
        and is available for special retry scenarios.
    """
    self.__step_idx += 1

dec_step_idx

dec_step_idx() -> None

Decrement the execution step index by one.

Reverses the effect of inc_step_idx. Intended for special retry scenarios where the framework needs to re-execute the last step (for example when a transient transmission error means the step result was not delivered and must be regenerated). Normal action handlers should rely on inc_step_idx instead.

Note

The complementary inc_step_idx method increments the counter by one and is the standard way to advance through interaction steps.

Source code in unaiverse/interaction.py
def dec_step_idx(self) -> None:
    """Decrement the execution step index by one.

    Reverses the effect of ``inc_step_idx``. Intended for special retry scenarios where the
    framework needs to re-execute the last step (for example when a transient transmission
    error means the step result was not delivered and must be regenerated). Normal action
    handlers should rely on ``inc_step_idx`` instead.

    Note:
        The complementary ``inc_step_idx`` method increments the counter by one and is the
        standard way to advance through interaction steps.
    """
    self.__step_idx -= 1

set_starting_time

set_starting_time(t: float) -> None

Record the time.perf_counter() timestamp at which execution of this interaction started.

The stored value is later read by is_timed_out to evaluate the global total_time limit of the associated action: if more than action_ref.get_total_time() seconds have elapsed since this timestamp, the interaction is considered timed out. Retrieve the stored value with get_starting_time.

Parameters:

Name Type Description Default
t float

Starting timestamp in seconds, as returned by time.perf_counter().

required
Note

This method is called by the action scheduling machinery and is not meant to be called from user code.

Source code in unaiverse/interaction.py
def set_starting_time(self, t: float) -> None:
    """Record the ``time.perf_counter()`` timestamp at which execution of this interaction started.

    The stored value is later read by ``is_timed_out`` to evaluate the global
    ``total_time`` limit of the associated action: if more than ``action_ref.get_total_time()``
    seconds have elapsed since this timestamp, the interaction is considered timed out.
    Retrieve the stored value with ``get_starting_time``.

    Args:
        t: Starting timestamp in seconds, as returned by ``time.perf_counter()``.

    Note:
        This method is called by the action scheduling machinery and is not meant to be
        called from user code.
    """
    self.__starting_time = t

set_timeout_starting_time

set_timeout_starting_time(t: float) -> None

Record the time.perf_counter() timestamp used as the baseline for per-step timeout checks.

The stored value is read by is_timed_out to evaluate the next-step retry timeout limit of the associated action: if more than action_ref.get_timeout() seconds have elapsed since this timestamp, the interaction is considered timed out at the step level. Retrieve the stored value with get_timeout_starting_time.

Parameters:

Name Type Description Default
t float

Timeout baseline timestamp in seconds, as returned by time.perf_counter(). A value of 0. or negative disables the per-step timeout check.

required
Note

This is distinct from the global starting_time (set by set_starting_time), which guards the total lifetime of the interaction. The per-step timeout resets each time the action scheduler records a new attempt, whereas the global timeout is set once when execution first begins.

Source code in unaiverse/interaction.py
def set_timeout_starting_time(self, t: float) -> None:
    """Record the ``time.perf_counter()`` timestamp used as the baseline for per-step timeout checks.

    The stored value is read by ``is_timed_out`` to evaluate the next-step retry timeout
    limit of the associated action: if more than ``action_ref.get_timeout()`` seconds have
    elapsed since this timestamp, the interaction is considered timed out at the step level.
    Retrieve the stored value with ``get_timeout_starting_time``.

    Args:
        t: Timeout baseline timestamp in seconds, as returned by ``time.perf_counter()``.
            A value of ``0.`` or negative disables the per-step timeout check.

    Note:
        This is distinct from the global ``starting_time`` (set by ``set_starting_time``),
        which guards the total lifetime of the interaction. The per-step timeout resets each
        time the action scheduler records a new attempt, whereas the global timeout is set
        once when execution first begins.
    """
    self.__timeout_starting_time = t

get_total_steps

get_total_steps() -> int

Return the total number of execution steps configured for this interaction.

The value mirrors the num_steps attribute, which is set at construction time and may be overridden to 1 when data_samples is provided. A value of -1 means the interaction carries no data-based steps (e.g. a pure control message). A positive integer indicates the exact number of steps the action handler must complete before was_last_step_done returns True.

Returns:

Type Description
int

The total step count. -1 for non-data-based interactions, 1 for single-sample interactions, or a larger positive integer for multi-step ones.

Source code in unaiverse/interaction.py
def get_total_steps(self) -> int:
    """Return the total number of execution steps configured for this interaction.

    The value mirrors the ``num_steps`` attribute, which is set at construction time and
    may be overridden to ``1`` when ``data_samples`` is provided. A value of ``-1`` means
    the interaction carries no data-based steps (e.g. a pure control message). A positive
    integer indicates the exact number of steps the action handler must complete before
    ``was_last_step_done`` returns ``True``.

    Returns:
        The total step count. ``-1`` for non-data-based interactions, ``1`` for
        single-sample interactions, or a larger positive integer for multi-step ones.
    """
    return self.num_steps

get_starting_time

get_starting_time() -> float

Return the time.perf_counter() timestamp at which execution of this interaction started.

The value is set by set_starting_time when the action scheduling machinery first begins executing this interaction. is_timed_out uses this timestamp to evaluate the global total_time limit defined by the associated action. A value of 0. means execution has not yet begun.

Returns:

Type Description
float

A float timestamp in seconds (time.perf_counter() origin). 0. if execution has not started yet.

Source code in unaiverse/interaction.py
def get_starting_time(self) -> float:
    """Return the ``time.perf_counter()`` timestamp at which execution of this interaction started.

    The value is set by ``set_starting_time`` when the action scheduling machinery first
    begins executing this interaction. ``is_timed_out`` uses this timestamp to evaluate the
    global ``total_time`` limit defined by the associated action. A value of ``0.`` means
    execution has not yet begun.

    Returns:
        A float timestamp in seconds (``time.perf_counter()`` origin). ``0.`` if execution
        has not started yet.
    """
    return self.__starting_time

get_timeout_starting_time

get_timeout_starting_time() -> float

Return the time.perf_counter() timestamp used as the baseline for per-step timeout checks.

The value is set by set_timeout_starting_time each time the action scheduler records a new execution attempt. is_timed_out compares the elapsed time since this baseline against the per-step retry timeout defined by the associated action. A value of 0. means no per-step timeout baseline has been recorded yet.

Returns:

Type Description
float

A float timestamp in seconds (time.perf_counter() origin). 0. if no per-step timeout baseline has been set yet.

Source code in unaiverse/interaction.py
def get_timeout_starting_time(self) -> float:
    """Return the ``time.perf_counter()`` timestamp used as the baseline for per-step timeout checks.

    The value is set by ``set_timeout_starting_time`` each time the action scheduler records
    a new execution attempt. ``is_timed_out`` compares the elapsed time since this baseline
    against the per-step retry timeout defined by the associated action. A value of ``0.``
    means no per-step timeout baseline has been recorded yet.

    Returns:
        A float timestamp in seconds (``time.perf_counter()`` origin). ``0.`` if no
        per-step timeout baseline has been set yet.
    """
    return self.__timeout_starting_time

is_multi_steps

is_multi_steps() -> bool

Return True if this interaction requires more than one execution step.

An interaction is considered multi-step when num_steps is a plain integer greater than 1, OR when num_steps is a string (a wildcard placeholder such as "<eval_steps>" that will be resolved at runtime by the action handler before execution begins). Single-sample interactions (num_steps == 1) and data-free interactions (num_steps < 0) return False.

Returns:

Type Description
bool

True if the interaction is multi-step, False otherwise.

Note

The string case occurs when the behaviour JSON specifies a wildcard number of steps. The action scheduling machinery resolves such placeholders before the first step is attempted, at which point num_steps becomes a plain integer.
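
The step predicates can be explored in isolation. In the sketch below the two free functions copy the boolean expressions of is_multi_steps and is_single_step, while classify is a hypothetical helper added for illustration. It checks the multi-step case first, because comparing an unresolved wildcard string with an integer raises TypeError.

```python
def is_multi_steps(num_steps):
    # Same expression as the documented method, applied to a bare value.
    return isinstance(num_steps, str) or num_steps > 1


def is_single_step(num_steps):
    # Same expression as the documented method; not safe for str inputs.
    return num_steps == 1 or num_steps < 0


def classify(num_steps):
    """Hypothetical helper: label a num_steps value."""
    # Test the wildcard-tolerant predicate first so a str never reaches `< 0`.
    if is_multi_steps(num_steps):
        return "multi"
    return "single" if is_single_step(num_steps) else "neither"


labels = {n: classify(n) for n in (-1, 1, 5, "<eval_steps>")}
```

Here labels maps -1 and 1 to "single", and 5 and "<eval_steps>" to "multi".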

Source code in unaiverse/interaction.py
def is_multi_steps(self) -> bool:
    """Return ``True`` if this interaction requires more than one execution step.

    An interaction is considered multi-step when ``num_steps`` is a plain integer greater
    than ``1``, OR when ``num_steps`` is a string (a wildcard placeholder such as
    ``"<eval_steps>"`` that will be resolved at runtime by the action handler before
    execution begins). Single-sample interactions (``num_steps == 1``) and data-free
    interactions (``num_steps < 0``) return ``False``.

    Returns:
        ``True`` if the interaction is multi-step, ``False`` otherwise.

    Note:
        The string case occurs when the behaviour JSON specifies a wildcard number of
        steps. The action scheduling machinery resolves such placeholders before the
        first step is attempted, at which point ``num_steps`` becomes a plain integer.
    """

    # num_steps can still be an unresolved wildcard string such as <eval_steps>;
    # such interactions are treated as multi-step
    return isinstance(self.num_steps, str) or self.num_steps > 1

is_single_step

is_single_step() -> bool

Return True if this interaction completes in a single execution step.

An interaction is single-step when num_steps equals 1 (exactly one data sample attached) or when num_steps is negative (a pure control or data-free interaction). In both cases was_last_step_done will return True after the first step increment.

Returns:

Type Description
bool

True if num_steps == 1 or num_steps < 0, False otherwise.

Source code in unaiverse/interaction.py
def is_single_step(self) -> bool:
    """Return ``True`` if this interaction completes in a single execution step.

    An interaction is single-step when ``num_steps`` equals ``1`` (exactly one data
    sample attached) or when ``num_steps`` is negative (a pure control or data-free
    interaction). In both cases ``was_last_step_done`` will return ``True`` after the
    first step increment.

    Returns:
        ``True`` if ``num_steps == 1`` or ``num_steps < 0``, ``False`` otherwise.
    """
    return self.num_steps == 1 or self.num_steps < 0  # Action with a single data sample or no data samples at all

is_valid

is_valid() -> bool

Return True if the interaction has been fully registered with its action.

An interaction is valid once the action has assigned both by_insertion_order_id (a global monotonic index among all interactions handled by the action) and by_requester_insertion_order_id (a per-requester monotonic index). Both start at -1 and are set to non-negative values during action registration.

Returns:

Type Description
bool

True if both by_insertion_order_id and by_requester_insertion_order_id are greater than or equal to 0.

Source code in unaiverse/interaction.py
def is_valid(self) -> bool:
    """Return ``True`` if the interaction has been fully registered with its action.

    An interaction is valid once the action has assigned both ``by_insertion_order_id``
    (a global monotonic index among all interactions handled by the action) and
    ``by_requester_insertion_order_id`` (a per-requester monotonic index). Both start at
    ``-1`` and are set to non-negative values during action registration.

    Returns:
        ``True`` if both ``by_insertion_order_id`` and ``by_requester_insertion_order_id``
        are greater than or equal to ``0``.
    """
    return self.by_insertion_order_id >= 0 and self.by_requester_insertion_order_id >= 0

is_system

is_system() -> bool

Return True if this interaction was generated by the framework itself.

System interactions are those whose requester equals Custom.SYSTEM_INTERACTION_LABEL. They are created internally by the UNaIVERSE framework (for example to drive lifecycle transitions) rather than by a remote peer. System interactions have no owning InteractionManager and are always considered doable as long as they are not yet completed (see check_if_doable).

Returns:

Type Description
bool

True if requester matches Custom.SYSTEM_INTERACTION_LABEL.

Source code in unaiverse/interaction.py
def is_system(self) -> bool:
    """Return ``True`` if this interaction was generated by the framework itself.

    System interactions are those whose ``requester`` equals
    ``Custom.SYSTEM_INTERACTION_LABEL``. They are created internally by the UNaIVERSE
    framework (for example to drive lifecycle transitions) rather than by a remote peer.
    System interactions have no owning ``InteractionManager`` and are always considered
    doable as long as they are not yet completed (see ``check_if_doable``).

    Returns:
        ``True`` if ``requester`` matches ``Custom.SYSTEM_INTERACTION_LABEL``.
    """
    return self.requester == Custom.SYSTEM_INTERACTION_LABEL

is_completed

is_completed() -> bool

Return True if this interaction has reached the COMPLETED status.

Equivalent to the completed property, provided as a regular method for contexts where a callable is required (for example when passing as a predicate). Once completed, completion_reason and destination_state are populated and the interaction is moved to the recently-completed set by the InteractionManager.

Returns:

Type Description
bool

True if status is InteractionStatus.COMPLETED.
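
Passing the method as a predicate might look like the sketch below. StatusStub and InteractionStub are hypothetical stand-ins; in particular, the PENDING member is invented here and may not exist in the real InteractionStatus.

```python
from enum import Enum, auto


class StatusStub(Enum):
    """Hypothetical stand-in for InteractionStatus."""
    PENDING = auto()    # invented member, for illustration only
    COMPLETED = auto()


class InteractionStub:
    """Hypothetical stand-in carrying only a name and a status."""

    def __init__(self, name, status):
        self.name = name
        self.status = status

    def is_completed(self):
        return self.status == StatusStub.COMPLETED


items = [
    InteractionStub("a", StatusStub.COMPLETED),
    InteractionStub("b", StatusStub.PENDING),
    InteractionStub("c", StatusStub.COMPLETED),
]

# The unbound method works directly as a filter predicate.
done = [item.name for item in filter(InteractionStub.is_completed, items)]
```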

Source code in unaiverse/interaction.py
def is_completed(self) -> bool:
    """Return ``True`` if this interaction has reached the ``COMPLETED`` status.

    Equivalent to the ``completed`` property, provided as a regular method for
    contexts where a callable is required (for example when passing as a predicate).
    Once completed, ``completion_reason`` and ``destination_state`` are populated and
    the interaction is moved to the recently-completed set by the
    ``InteractionManager``.

    Returns:
        ``True`` if ``status`` is ``InteractionStatus.COMPLETED``.
    """
    return self.status == InteractionStatus.COMPLETED

was_data_sent_after_completion

was_data_sent_after_completion() -> bool

Return True if data was generated and sent after this interaction completed.

The data_sent_after_completion flag is set by the agent's output machinery when it detects that a stream write occurred after the interaction's status transitioned to COMPLETED. This typically happens in edge cases where the action handler produces a final output sample concurrently with the completion event.

Returns:

Type Description
bool

True if data_sent_after_completion is set, False otherwise.

Note

This flag is informational and is used for logging and diagnostics. It does not affect the completion status of the interaction itself.

Source code in unaiverse/interaction.py
def was_data_sent_after_completion(self) -> bool:
    """Return ``True`` if data was generated and sent after this interaction completed.

    The ``data_sent_after_completion`` flag is set by the agent's output machinery when it
    detects that a stream write occurred after the interaction's status transitioned to
    ``COMPLETED``. This typically happens in edge cases where the action handler produces
    a final output sample concurrently with the completion event.

    Returns:
        ``True`` if ``data_sent_after_completion`` is set, ``False`` otherwise.

    Note:
        This flag is informational and is used for logging and diagnostics. It does not
        affect the completion status of the interaction itself.
    """
    return self.data_sent_after_completion

has_dummy_requester

has_dummy_requester() -> bool

Return True if no requester peer ID is set on this interaction.

A None requester indicates that the interaction was created without an explicit originating peer, for example as a placeholder or an internally generated trigger. This is distinct from a system interaction (is_system), where requester is set to Custom.SYSTEM_INTERACTION_LABEL rather than None.

Returns:

Type Description
bool

True if requester is None.
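
The distinction between a dummy requester and a system requester can be sketched with a small classifier. SYSTEM_LABEL is a hypothetical value standing in for Custom.SYSTEM_INTERACTION_LABEL, whose real value is not shown on this page, and describe_requester is an invented helper.

```python
# Hypothetical placeholder; the real label is Custom.SYSTEM_INTERACTION_LABEL.
SYSTEM_LABEL = "<system>"


def describe_requester(requester):
    """Hypothetical helper: classify a requester value."""
    if requester is None:
        return "dummy"   # what has_dummy_requester() reports
    if requester == SYSTEM_LABEL:
        return "system"  # what is_system() reports
    return "peer"        # any other value is treated as a peer ID


kinds = [describe_requester(r) for r in (None, SYSTEM_LABEL, "peer_abc")]
```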

Source code in unaiverse/interaction.py
def has_dummy_requester(self) -> bool:
    """Return ``True`` if no requester peer ID is set on this interaction.

    A ``None`` requester indicates that the interaction was created without an explicit
    originating peer, for example as a placeholder or an internally generated trigger.
    This is distinct from a system interaction (``is_system``), where ``requester`` is
    set to ``Custom.SYSTEM_INTERACTION_LABEL`` rather than ``None``.

    Returns:
        ``True`` if ``requester`` is ``None``.
    """
    return self.requester is None

set_arg

set_arg(arg_name: str, arg_value: object) -> None

Set or overwrite a keyword argument in action_kwargs.

Inserts or replaces the entry arg_name in action_kwargs. Unlike alter_arg, this method always writes the value regardless of whether the key was previously present.

Parameters:

Name Type Description Default
arg_name str

The name of the argument to set.

required
arg_value object

The value to assign to the argument.

required

Examples:

>>> inter = Interaction(action_name="process", requester="peer_abc")
>>> inter.set_arg("mode", "fast")
>>> inter.get_arg("mode")
'fast'
Source code in unaiverse/interaction.py
def set_arg(self, arg_name: str, arg_value: object) -> None:
    """Set or overwrite a keyword argument in ``action_kwargs``.

    Inserts or replaces the entry ``arg_name`` in ``action_kwargs``. Unlike
    ``alter_arg``, this method always writes the value regardless of whether the key
    was previously present.

    Args:
        arg_name: The name of the argument to set.
        arg_value: The value to assign to the argument.

    Examples:
        >>> inter = Interaction(action_name="process", requester="peer_abc")
        >>> inter.set_arg("mode", "fast")
        >>> inter.get_arg("mode")
        'fast'
    """
    self.action_kwargs[arg_name] = arg_value

get_arg

get_arg(arg_name: str) -> object

Retrieve a keyword argument from action_kwargs by name.

Looks up arg_name in action_kwargs and returns the associated value. If the key is absent, returns None rather than raising KeyError.

Parameters:

Name Type Description Default
arg_name str

The name of the argument to retrieve.

required

Returns:

Type Description
object

The argument value, or None if arg_name is not present in action_kwargs.

Examples:

>>> inter = Interaction(action_name="process", action_kwargs={"mode": "fast"},
...                     requester="peer_abc")
>>> inter.get_arg("mode")
'fast'
>>> inter.get_arg("missing") is None
True
Source code in unaiverse/interaction.py
def get_arg(self, arg_name: str) -> object:
    """Retrieve a keyword argument from ``action_kwargs`` by name.

    Looks up ``arg_name`` in ``action_kwargs`` and returns the associated value.
    If the key is absent, returns ``None`` rather than raising ``KeyError``.

    Args:
        arg_name: The name of the argument to retrieve.

    Returns:
        The argument value, or ``None`` if ``arg_name`` is not present in
        ``action_kwargs``.

    Examples:
        >>> inter = Interaction(action_name="process", action_kwargs={"mode": "fast"},
        ...                     requester="peer_abc")
        >>> inter.get_arg("mode")
        'fast'
        >>> inter.get_arg("missing") is None
        True
    """
    return self.action_kwargs.get(arg_name)

was_at_least_one_step_done

was_at_least_one_step_done() -> bool

Return True if at least one execution step has been completed.

The internal step index starts at -1 (no steps done). After the first successful call to inc_step_idx the index becomes 0 and this method returns True. Use get_step_idx to retrieve the exact index of the last completed step.

Returns:

Type Description
bool

True if the step index is greater than or equal to 0.

Source code in unaiverse/interaction.py
def was_at_least_one_step_done(self) -> bool:
    """Return ``True`` if at least one execution step has been completed.

    The internal step index starts at ``-1`` (no steps done). After the first successful
    call to ``inc_step_idx`` the index becomes ``0`` and this method returns ``True``.
    Use ``get_step_idx`` to retrieve the exact index of the last completed step.

    Returns:
        ``True`` if the step index is greater than or equal to ``0``.
    """
    return self.__step_idx >= 0

was_last_step_done

was_last_step_done() -> bool

Return True if all execution steps for this interaction have been completed.

The completion criterion depends on the step model:

  • For data-free interactions (num_steps < 0): returns True when the step index reaches 0, meaning a single "no-data" cycle was executed.
  • For data-carrying interactions (num_steps > 0): returns True when the step index equals num_steps - 1, meaning every data sample has been processed.

This is the primary predicate used by action handlers to decide whether to trigger completion via InteractionManager.complete or InteractionManager.complete_current.

Returns:

Type Description
bool

True if the interaction's final step has been executed, False otherwise.
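
The two completion criteria can be checked with a small loop. The free function below copies the boolean expression of was_last_step_done; steps_until_done is a hypothetical driver that imitates a handler calling inc_step_idx until the predicate holds.

```python
def was_last_step_done(num_steps, step_idx):
    # Same expression as the documented method, applied to bare values.
    return ((num_steps < 0 and step_idx == 0) or
            (num_steps > 0 and step_idx == num_steps - 1))


def steps_until_done(num_steps):
    """Hypothetical driver: count the increments needed to reach completion.

    Do not pass num_steps == 0: neither branch of the predicate can become
    true for that value, so this loop would never end.
    """
    step_idx = -1  # no steps done yet
    executed = 0
    while not was_last_step_done(num_steps, step_idx):
        step_idx += 1  # what inc_step_idx does
        executed += 1
    return executed


counts = {n: steps_until_done(n) for n in (-1, 1, 3)}
```

A data-free interaction and a single-sample interaction both finish after one increment, while num_steps == 3 needs three.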

Source code in unaiverse/interaction.py
def was_last_step_done(self) -> bool:
    """Return ``True`` if all execution steps for this interaction have been completed.

    The completion criterion depends on the step model:

    - For data-free interactions (``num_steps < 0``): returns ``True`` when the step
      index reaches ``0``, meaning a single "no-data" cycle was executed.
    - For data-carrying interactions (``num_steps > 0``): returns ``True`` when the step
      index equals ``num_steps - 1``, meaning every data sample has been processed.

    This is the primary predicate used by action handlers to decide whether to trigger
    completion via ``InteractionManager.complete`` or ``InteractionManager.complete_current``.

    Returns:
        ``True`` if the interaction's final step has been executed, ``False`` otherwise.
    """
    return ((self.num_steps < 0 and self.__step_idx == 0) or  # Action with no data (no steps)
            (self.num_steps > 0 and self.__step_idx == self.num_steps - 1))  # Action with one or more steps

is_delayed

is_delayed(starting_time: float)

Return True if the action associated with this interaction is still in its mandatory delay period.

The delay is defined on the action object (action_ref.get_delay()). If the delay is greater than zero and the elapsed time since starting_time has not yet exceeded that delay, the interaction cannot be executed and this method returns True. When no delay is configured (action_ref.get_delay() <= 0), this method always returns False.

Parameters:

Name Type Description Default
starting_time float

The time.perf_counter() timestamp at which the delay period began (typically the moment the scheduler last attempted to execute this interaction).

required

Returns:

Type Description
bool

True if a delay is configured and the delay period has not yet elapsed; False otherwise.

Note

action_ref must have been set via set_action_ref before this method is called; calling it on an interaction without a bound action will raise AttributeError.
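
The delay check can be reproduced as a free function for experimentation. The sketch below takes the delay as a plain number instead of reading it from action_ref, and adds a `now` parameter (an assumption made here so the check can be evaluated deterministically); the comparison itself follows the method's source.

```python
import time


def is_delayed(delay, starting_time, now=None):
    """Sketch of the delay check with the clock reading made injectable."""
    # `now` is not part of the real method, which always reads the clock.
    if now is None:
        now = time.perf_counter()
    return delay > 0 and (now - starting_time) <= delay


inside = is_delayed(2.0, starting_time=10.0, now=11.0)      # 1 s elapsed
boundary = is_delayed(2.0, starting_time=10.0, now=12.0)    # exactly 2 s elapsed
elapsed = is_delayed(2.0, starting_time=10.0, now=12.5)     # 2.5 s elapsed
no_delay = is_delayed(0.0, starting_time=10.0, now=10.0)    # no delay configured
```

Note that the comparison is inclusive: at exactly `delay` seconds the interaction still counts as delayed.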

Source code in unaiverse/interaction.py
def is_delayed(self, starting_time: float):
    """Return ``True`` if the action associated with this interaction is still in its mandatory delay period.

    The delay is defined on the action object (``action_ref.get_delay()``). If the delay is
    greater than zero and the elapsed time since ``starting_time`` has not yet exceeded that
    delay, the interaction cannot be executed and this method returns ``True``. When no delay
    is configured (``action_ref.get_delay() <= 0``), this method always returns ``False``.

    Args:
        starting_time: The ``time.perf_counter()`` timestamp at which the delay period began
            (typically the moment the scheduler last attempted to execute this interaction).

    Returns:
        ``True`` if a delay is configured and the delay period has not yet elapsed;
        ``False`` otherwise.

    Note:
        ``action_ref`` must have been set via ``set_action_ref`` before this method is
        called; calling it on an interaction without a bound action will raise
        ``AttributeError``.
    """
    return self.action_ref.get_delay() > 0 and (time.perf_counter() - starting_time) <= self.action_ref.get_delay()
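
The delay rule can be tried without an Action object. This is a sketch under stated assumptions: `delay` stands in for `action_ref.get_delay()`, and the optional `now` argument is a hypothetical addition that makes the check deterministic.

```python
import time
from typing import Optional


def is_delayed(delay: float, starting_time: float, now: Optional[float] = None) -> bool:
    """Hypothetical standalone version; `delay` replaces action_ref.get_delay()."""
    if now is None:
        now = time.perf_counter()
    # Delayed only when a positive delay is configured and it has not been exceeded yet
    return delay > 0 and (now - starting_time) <= delay


start = time.perf_counter()
print(is_delayed(5.0, start))  # checked right after `start` was taken
print(is_delayed(0.0, start))  # no delay configured
```

Because the comparison is `<=`, an elapsed time exactly equal to the delay still counts as delayed.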

is_timed_out

is_timed_out()

Return True if this interaction has exceeded its configured timeout limits.

Two independent timeout checks are performed in order:

  1. Global timeout (action_ref.get_total_time()): if total_time is positive and at least total_time seconds have elapsed since __starting_time, the interaction is timed out regardless of per-step progress.
  2. Per-step timeout (action_ref.get_timeout()): if both __timeout_starting_time and timeout are positive and at least timeout seconds have elapsed since that baseline, the interaction is timed out at the step level.

If __starting_time is zero or negative (execution has not started yet), this method immediately returns False without performing any check.

Returns:

Type Description

True if either timeout condition is exceeded, False otherwise.

Note

action_ref must have been set via set_action_ref before this method is called. Timeout events are logged at the inter level when they fire.

Source code in unaiverse/interaction.py
def is_timed_out(self):
    """Return ``True`` if this interaction has exceeded its configured timeout limits.

    Two independent timeout checks are performed in order:

    1. **Global timeout** (``action_ref.get_total_time()``): if ``total_time`` is positive and
       at least ``total_time`` seconds have elapsed since ``__starting_time``, the interaction
       is timed out regardless of per-step progress.
    2. **Per-step timeout** (``action_ref.get_timeout()``): if both ``__timeout_starting_time``
       and ``timeout`` are positive and at least ``timeout`` seconds have elapsed since that
       baseline, the interaction is timed out at the step level.

    If ``__starting_time`` is zero or negative (execution has not started yet), this method
    immediately returns ``False`` without performing any check.

    Returns:
        ``True`` if either timeout condition is exceeded, ``False`` otherwise.

    Note:
        ``action_ref`` must have been set via ``set_action_ref`` before this method is
        called. Timeout events are logged at the ``inter`` level when they fire.
    """

    # If the action was never started (not even with a failed attempt), there is nothing to check
    if self.__starting_time <= 0.:
        return False

    # Check the global timeout: if too much time has passed, whether or not the action actually started, it timed out
    if self.action_ref.get_total_time() > 0:
        if self.action_ref.get_total_time() <= (time.perf_counter() - self.__starting_time):
            log.inter(f"Timeout for {self.action_name}! "
                      f"({(time.perf_counter() - self.__starting_time)}/"
                      f"{self.action_ref.get_total_time()})!")
            return True
        else:
            log.debug(f"Running retry_timeout for {self.action_name}! "
                      f"({(time.perf_counter() - self.__starting_time)}/"
                      f"{self.action_ref.get_total_time()})!")

    # Check the per-step timeout
    if self.__timeout_starting_time > 0. and self.action_ref.get_timeout() > 0:
        if self.action_ref.get_timeout() <= (time.perf_counter() - self.__timeout_starting_time):
            log.inter(
                f"Hot retry_timeout for {self.action_name}! "
                f"({(time.perf_counter() - self.__timeout_starting_time)}/"
                f"{self.action_ref.get_timeout()})!")
            return True
        else:
            log.debug(f"Running hot retry_timeout for {self.action_name} "
                      f"({(time.perf_counter() - self.__timeout_starting_time)}/"
                      f"{self.action_ref.get_timeout()})!")
            return False
    else:
        return False
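
The order of the checks can be sketched as a pure function. The names below are hypothetical: `total_time` and `step_timeout` stand in for `action_ref.get_total_time()` and `action_ref.get_timeout()`, `now` replaces `time.perf_counter()`, and logging is left out.

```python
def is_timed_out(now: float, started_at: float, step_started_at: float,
                 total_time: float, step_timeout: float) -> bool:
    """Hypothetical pure-function version of the checks above (logging omitted)."""
    # Never started: nothing to check
    if started_at <= 0.0:
        return False
    # Global timeout, only when a positive total time is configured
    if total_time > 0 and total_time <= now - started_at:
        return True
    # Per-step timeout, only when both the baseline and the timeout are positive
    if step_started_at > 0.0 and step_timeout > 0:
        return step_timeout <= now - step_started_at
    return False


# Started 2 s ago, current step started 1.5 s ago, limits: 5 s total, 1 s per step
print(is_timed_out(now=100.0, started_at=98.0, step_started_at=98.5,
                   total_time=5.0, step_timeout=1.0))
```

Passing `now` explicitly is only a convenience of this sketch; it keeps the two thresholds easy to test.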

set_action_ref

set_action_ref(action_ref: object) -> None

Bind the interaction to its Action object and validate/augment its argument list.

Parameters:

Name Type Description Default
action_ref object

The Action instance that will execute this interaction.

required
Source code in unaiverse/interaction.py
def set_action_ref(self, action_ref: object) -> None:
    """Bind the interaction to its Action object and validate/augment its argument list.

    Args:
        action_ref: The Action instance that will execute this interaction.
    """
    self.action_ref = action_ref

    # Augmenting argument list, by fusing with the arguments defined in the HSM
    action_kwargs = self.action_kwargs if self.action_kwargs is not None else {}
    self.action_ref.check_provided_args(action_kwargs, exception=True)

    # Force a default timeout on multi-step actions, to avoid retrying forever
    if self.is_multi_steps() and self.action_ref.get_timeout() <= 0:
        self.action_ref.set_default_timeout()

has_stream

has_stream(user_hash: str) -> bool

Return True if a stream with the given user hash is already registered.

Parameters:

Name Type Description Default
user_hash str

The stream's user-level hash identifier.

required
Source code in unaiverse/interaction.py
def has_stream(self, user_hash: str) -> bool:
    """Return True if a stream with the given user hash is already registered.

    Args:
        user_hash: The stream's user-level hash identifier.
    """
    return user_hash in self.stream_proxy or user_hash in self.lazy_streams

add_lazy_stream

add_lazy_stream(user_hash: str, stream_obj: Stream, is_owned: bool = False) -> None

Lazily attach an additional stream to this interaction after it was created.

Parameters:

Name Type Description Default
user_hash str

The stream's user-level hash identifier.

required
stream_obj Stream

The stream object to attach.

required
is_owned bool

Whether the stream is owned by the current agent.

False
Source code in unaiverse/interaction.py
def add_lazy_stream(self, user_hash: str, stream_obj: Stream, is_owned: bool = False) -> None:
    """Lazily attach an additional stream to this interaction after it was created.

    Args:
        user_hash: The stream's user-level hash identifier.
        stream_obj: The stream object to attach.
        is_owned: Whether the stream is owned by the current agent.
    """
    if not self.has_stream(user_hash):
        self.lazy_streams[user_hash] = stream_obj
        if is_owned:
            self.owned_streams[user_hash] = stream_obj
        self.stream_proxy.add_new_bind(user_hash, stream_obj)

mark_running

mark_running() -> None

Mark this interaction as running, recording the start time and clock cycle on the first call.

Source code in unaiverse/interaction.py
def mark_running(self) -> None:
    """Mark this interaction as running, recording the start time and clock cycle on the first call."""
    self.status = InteractionStatus.RUNNING
    if self.timestamp_started <= 0:
        self.timestamp_started = clock.get_time()
        self.cycle_started = clock.get_cycle()

mark_paused

mark_paused() -> None

Mark this interaction as currently paused.

Source code in unaiverse/interaction.py
def mark_paused(self) -> None:
    """Mark this interaction as currently paused."""
    self.status = InteractionStatus.PAUSED

mark_completed

mark_completed(reason: CompletionReason, dest_state: str | None = None, target: str | None = None) -> None

Mark this interaction as completed.

Parameters:

Name Type Description Default
reason CompletionReason

The reason for completion.

required
dest_state str | None

The destination state reached by completing this action.

None
target str | None

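The target agent who completed this interaction. If None, the whole interaction is marked completed; otherwise only that target's record is updated, and the whole interaction is marked completed once every target has a completion reason (Default: None).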

None
Source code in unaiverse/interaction.py
def mark_completed(self, reason: 'CompletionReason', dest_state: str | None = None,
                   target: str | None = None) -> None:
    """Mark this interaction as completed.

    Args:
        reason: The reason for completion.
        dest_state: The destination state reached by completing this action.
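        target: The target agent who completed this interaction. If None, the whole interaction is marked
            completed; otherwise only that target's record is updated, and the whole interaction is marked
            completed once every target has a completion reason (Default: None).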
    """
    if target is None:
        self.destination_state = dest_state
        self.completion_reason = reason
        self.timestamp_completed = clock.get_time()
        self.cycle_completed = clock.get_cycle()
        self.status = InteractionStatus.COMPLETED

        _time = clock.get_time()
        _cycle = clock.get_cycle()
        for i, tar in enumerate(self.target):
            if self.target_completion_reason[i] is None:
                self.target_completion_reason[i] = reason  # The completion reason of the whole interaction
                self.target_timestamp_completed[i] = _time
                self.target_cycle_completed[i] = _cycle
    else:
        num_completed = 0
        _time = clock.get_time()
        _cycle = clock.get_cycle()
        for i, tar in enumerate(self.target):
            if tar == target:
                self.target_destination_state[i] = dest_state
                self.target_completion_reason[i] = reason
                self.target_timestamp_completed[i] = _time
                self.target_cycle_completed[i] = _cycle
            if self.target_completion_reason[i] is not None:
                num_completed += 1
        if num_completed == len(self.target):
            self.mark_completed(reason)  # The completion reason of the last target who completed
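
The per-target bookkeeping can be illustrated with a reduced model. `CompletionTracker` is a hypothetical class that keeps only the completion reasons; timestamps, cycles, destination states and the status enum of the real method are omitted.

```python
class CompletionTracker:
    """Hypothetical reduced model of the per-target completion logic."""

    def __init__(self, targets):
        self.targets = list(targets)
        self.reasons = [None] * len(self.targets)  # one completion reason per target
        self.completed = False
        self.overall_reason = None

    def mark_completed(self, reason, target=None):
        if target is None:
            # Whole-interaction completion: pending targets inherit the overall reason
            self.completed = True
            self.overall_reason = reason
            for i in range(len(self.targets)):
                if self.reasons[i] is None:
                    self.reasons[i] = reason
        else:
            # Single-target completion: update the matching entries only
            for i, tar in enumerate(self.targets):
                if tar == target:
                    self.reasons[i] = reason
            # Once every target has a reason, complete the whole interaction
            if all(r is not None for r in self.reasons):
                self.mark_completed(reason)


tracker = CompletionTracker(["agent_a", "agent_b"])
tracker.mark_completed("done", target="agent_a")
print(tracker.completed, tracker.reasons)
tracker.mark_completed("timeout", target="agent_b")
print(tracker.completed, tracker.reasons)
```

As in the source, the overall reason ends up being the one reported by the last target to complete.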

clear_from_action

clear_from_action() -> None

Deregister this interaction from its action's interaction list.

Source code in unaiverse/interaction.py
def clear_from_action(self) -> None:
    """Deregister this interaction from its action's interaction list."""

    # Clearing interaction from action list
    if self.action_ref is not None:
        self.action_ref.get_list_of_interactions().remove(self)

is_expired

is_expired(timeout_secs: float | None = None) -> bool

Check whether this interaction has expired, combining an external timeout with its own timeout.

Parameters:

Name Type Description Default
timeout_secs float | None

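External timeout in seconds. When both it and the interaction's own timeout are positive, the smaller of the two applies; when it is None or non-positive, the interaction's own timeout is used (default: None).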

None

Returns:

Type Description
bool

True if the time elapsed since creation is at least the effective timeout; False otherwise, including when no positive timeout applies.

Source code in unaiverse/interaction.py
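def is_expired(self, timeout_secs: float | None = None) -> bool:
    """Check whether this interaction has expired, combining an external timeout with its own timeout.

    Args:
        timeout_secs: External timeout in seconds. When both it and the interaction's own timeout are
            positive, the smaller of the two applies; when it is None or non-positive, the interaction's
            own timeout is used (default: None).

    Returns:
        True if the time elapsed since creation is at least the effective timeout; False otherwise,
            including when no positive timeout applies.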
    """

    # Decide the effective value of timeout_secs, also taking into account the interaction-specific timeout (if given)
    if timeout_secs is not None and timeout_secs > 0. and self.timeout is not None and self.timeout > 0.:
        timeout_secs = min(timeout_secs, self.timeout)
    elif timeout_secs is None or timeout_secs <= 0.:
        timeout_secs = self.timeout  # It could still be None or < 0.

    if timeout_secs is None or timeout_secs <= 0.:  # Perpetual interaction
        return False
    else:
        return (clock.get_time() - self.timestamp_created) >= timeout_secs
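
How the two timeouts combine is easier to see when the selection is pulled out on its own. The helper names below are hypothetical, and `age` stands in for `clock.get_time() - self.timestamp_created`.

```python
from typing import Optional


def effective_timeout(external: Optional[float], own: Optional[float]) -> Optional[float]:
    """Hypothetical helper mirroring the selection logic above."""
    if external is not None and external > 0 and own is not None and own > 0:
        return min(external, own)  # both usable: the stricter one wins
    if external is None or external <= 0:
        return own                 # may still be None or non-positive
    return external                # only the external timeout is usable


def is_expired(age: float, external: Optional[float], own: Optional[float]) -> bool:
    timeout = effective_timeout(external, own)
    if timeout is None or timeout <= 0:
        return False               # perpetual interaction
    return age >= timeout


print(effective_timeout(10.0, 3.0))
print(is_expired(age=4.0, external=10.0, own=3.0))
print(is_expired(age=100.0, external=None, own=-1.0))
```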

get_new_stream_data_tags

get_new_stream_data_tags(all_fresh_or_fail: bool = False) -> dict[str, list[int]] | None

Checks all streams involved in this interaction, and if they have fresh data it returns their tags.

Parameters:

Name Type Description Default
all_fresh_or_fail bool

If True, return None if there is at least one stream with no fresh data (Default: False).

False

Returns:

Type Description
dict[str, list[int]] | None

A dictionary mapping each stream user hash to a one-element list holding its current data tag, restricted to streams with fresh data. Returns None only when all_fresh_or_fail is True and at least one stream has no fresh data.

Source code in unaiverse/interaction.py
def get_new_stream_data_tags(self, all_fresh_or_fail: bool = False) -> dict[str, list[int]] | None:
    """Checks all streams involved in this interaction, and if they have fresh data it returns their tags.

    Args:
        all_fresh_or_fail: If True, return None if there is at least one stream with no fresh data (Default: False).

    Returns:
        A dictionary mapping each stream user hash to a one-element list holding its current data tag, restricted
            to streams with fresh data. Returns None only when all_fresh_or_fail is True and at least one stream
            has no fresh data.
    """
    data_tags = {}
    for stream_user_hash, stream_obj in self.stream_proxy.items():

        # Skipping data samples and default input values
        if not isinstance(stream_obj, Stream):
            continue

        if (stream_user_hash in self.data_tags and
                len(self.data_tags[stream_user_hash]) > 0):
            last_handled_data_tag = self.data_tags[stream_user_hash][-1]
        else:
            last_handled_data_tag = None

        # Check if stream has data with a tag not yet consumed by this interaction
        current_tag = stream_obj.get_tag(self.uuid)

        if last_handled_data_tag is not None and current_tag == last_handled_data_tag:
            if all_fresh_or_fail:
                return None
            else:
                continue

        # Storing
        data_tags[stream_user_hash] = [current_tag]
    return data_tags
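
The freshness test compares each stream's current tag with the last tag already handled for that stream. A minimal sketch, assuming a plain dict `current_tags` in place of the `stream_obj.get_tag(...)` calls and a dict `handled` in place of `self.data_tags` (both names are hypothetical):

```python
from typing import Dict, List, Optional


def fresh_tags(current_tags: Dict[str, int], handled: Dict[str, List[int]],
               all_fresh_or_fail: bool = False) -> Optional[Dict[str, List[int]]]:
    """Hypothetical stand-in: `current_tags` replaces the per-stream get_tag() calls."""
    fresh: Dict[str, List[int]] = {}
    for name, tag in current_tags.items():
        history = handled.get(name, [])
        last_handled = history[-1] if history else None
        # A stream is stale only when its current tag equals the last handled one
        if last_handled is not None and tag == last_handled:
            if all_fresh_or_fail:
                return None
            continue
        fresh[name] = [tag]
    return fresh


handled = {"cam": [4, 5]}
print(fresh_tags({"cam": 5, "mic": 7}, handled))
print(fresh_tags({"cam": 5, "mic": 7}, handled, all_fresh_or_fail=True))
```

A stream with no handled tags yet is always reported as fresh, since there is nothing to compare against.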

record_data_tags

record_data_tags(data_tags: dict[str, list[int]] | None = None, target: str | None = None)

Record that data with the given tag was received for this interaction.

Parameters:

Name Type Description Default
data_tags dict[str, list[int]] | None

The data tags/sequence numbers, keyed by stream user hash. If None, they are obtained from get_new_stream_data_tags().

None
target str | None

The agent who handled the data tags (if None, then it's the current agent).

None
Source code in unaiverse/interaction.py
def record_data_tags(self, data_tags: dict[str, list[int]] | None = None, target: str | None = None):
    """Record that data with the given tag was received for this interaction.

    Args:
        data_tags: The data tags/sequence numbers, keyed by stream user hash. If None, they are obtained
            from get_new_stream_data_tags().
        target: The agent who handled the data tags (if None, then it's the current agent).
    """
    if data_tags is None:
        data_tags: dict[str, list[int]] = self.get_new_stream_data_tags()

    if len(data_tags) == 0:
        return

    if target is None:
        handled_data_tags = self.data_tags
    else:
        if target in self.target:
            i = self.target.index(target)
            handled_data_tags = self.target_data_tags[i]
        else:
            log.error(f"Invalid target {target} specified to record data tags: "
                      f"it does not exist in the list of targets ({self.target}) of "
                      f"interaction with UUID {self.uuid}")
            return

    _time = clock.get_time()
    for stream_name, list_of_data_tag in data_tags.items():
        if stream_name not in handled_data_tags:
            handled_data_tags[stream_name] = []
        handled_data_tags[stream_name] += list_of_data_tag

check_if_doable

check_if_doable() -> bool

Return True if the interaction can currently be executed.

Delegates to the owning InteractionManager when present; for system interactions without a manager, returns True as long as the interaction is not yet completed.

Source code in unaiverse/interaction.py
def check_if_doable(self) -> bool:
    """Return True if the interaction can currently be executed.

    Delegates to the owning InteractionManager when present; for system interactions without a manager,
    returns True as long as the interaction is not yet completed.
    """
    if self.__im is not None:
        return self.__im.check_if_doable(self)
    else:
        return not self.completed  # System interactions do not have an interaction manager

alter_arg

alter_arg(arg_name: str, arg_value: object) -> bool

Overwrite an existing action keyword argument, leaving the dict unchanged if the key is absent.

Parameters:

Name Type Description Default
arg_name str

The argument name to update.

required
arg_value object

The new value to assign.

required

Returns:

Type Description
bool

True if the argument existed and was updated; False otherwise.

Source code in unaiverse/interaction.py
def alter_arg(self, arg_name: str, arg_value: object) -> bool:
    """Overwrite an existing action keyword argument, leaving the dict unchanged if the key is absent.

    Args:
        arg_name: The argument name to update.
        arg_value: The new value to assign.

    Returns:
        True if the argument existed and was updated; False otherwise.
    """
    if arg_name in self.action_kwargs:
        self.action_kwargs[arg_name] = arg_value
        return True
    else:
        return False

to_dict

to_dict() -> dict

Serialize this interaction for network transmission.

Returns:

Type Description
dict

A dictionary representation of this interaction.

Source code in unaiverse/interaction.py
def to_dict(self) -> dict:
    """Serialize this interaction for network transmission.

    Returns:
        A dictionary representation of this interaction.
    """
    return {
        'id': self.id,
        'uuid': self.uuid,
        'action_name': self.action_name,
        'requester': self.requester,
        'target': self.target,
        'action_kwargs': self.action_kwargs,
        'streams': self.streams,
        'data_samples': serialize_payload(self.data_samples),
        'num_steps': self.num_steps,
        'completion_reason': self.completion_reason.value if self.completion_reason else None,
        'destination_state': self.destination_state,
        'from_state': self.from_state,
        'to_state': self.to_state,
        'retry_timeout': self.timeout,
        'volatile': self.volatile,
        'status': self.status.value
    }

from_dict classmethod

from_dict(d: dict) -> Interaction

Deserialize an Interaction from a dictionary.

Parameters:

Name Type Description Default
d dict

Dictionary from to_dict().

required

Returns:

Type Description
Interaction

A new Interaction instance.

Source code in unaiverse/interaction.py
@classmethod
def from_dict(cls, d: dict) -> 'Interaction':
    """Deserialize an Interaction from a dictionary.

    Args:
        d: Dictionary from to_dict().

    Returns:
        A new Interaction instance.
    """
    interaction = cls(
        id=d['id'],
        action_name=d['action_name'],
        action_kwargs=d.get('action_kwargs', {}),
        streams=d.get('streams', {}),
        data_samples=d.get('data_samples', None),
        num_steps=d.get('num_steps'),
        requester=d.get('requester'),
        target=d.get('target'),
        from_state=d.get('from_state'),
        to_state=d.get('to_state'),
        volatile=d.get('volatile'),
        timeout=d.get('retry_timeout', -1.),
    )
    interaction.uuid = d['uuid']
    interaction.status = InteractionStatus(d['status'])
    if d.get('completion_reason'):
        interaction.completion_reason = CompletionReason(d['completion_reason'])
    interaction.destination_state = d.get('destination_state')
    return interaction
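
One detail of the round trip is that the `timeout` attribute travels under the `'retry_timeout'` key. The sketch below reproduces only that mapping and the enum handling; `MiniInteraction` and the `Status` values are hypothetical stand-ins, not the library's classes.

```python
from dataclasses import dataclass
from enum import Enum


class Status(Enum):  # hypothetical values, for illustration only
    PENDING = "pending"
    COMPLETED = "completed"


@dataclass
class MiniInteraction:
    action_name: str
    timeout: float = -1.0
    status: Status = Status.PENDING

    def to_dict(self) -> dict:
        # The attribute `timeout` is stored under the key 'retry_timeout'
        return {"action_name": self.action_name,
                "retry_timeout": self.timeout,
                "status": self.status.value}

    @classmethod
    def from_dict(cls, d: dict) -> "MiniInteraction":
        obj = cls(action_name=d["action_name"], timeout=d.get("retry_timeout", -1.0))
        obj.status = Status(d["status"])  # enum rebuilt from its serialized value
        return obj


original = MiniInteraction("ask", timeout=2.5, status=Status.COMPLETED)
restored = MiniInteraction.from_dict(original.to_dict())
print(restored == original)
```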

to_status_dict

to_status_dict(status_generated_by: str) -> dict

Build a minimal status dict for status update messages.

Parameters:

Name Type Description Default
status_generated_by str

The agent who generated this dict.

required

Returns:

Type Description
dict

A dictionary with status_generated_by, uuid, status, completion_reason, destination_state, and data_tags.

Source code in unaiverse/interaction.py
def to_status_dict(self, status_generated_by: str) -> dict:
    """Build a minimal status dict for status update messages.

    Args:
        status_generated_by: The agent who generated this dict.

    Returns:
        A dictionary with status_generated_by, uuid, status, completion_reason, destination_state, and data_tags.
    """
    return {
        'status_generated_by': status_generated_by,
        'uuid': self.uuid,
        'status': self.status.value,
        'completion_reason': self.completion_reason.value if self.completion_reason else None,
        'destination_state': self.destination_state,
        'data_tags': self.data_tags if len(self.data_tags) > 0 else None,
    }

to_code_str

to_code_str(include_id: bool = False, include_received_tags: bool = False) -> str

Return a compact single-line representation for logs and debugging.

Parameters:

Name Type Description Default
include_id bool

Prepend the interaction UUID to the output when True.

False
include_received_tags bool

Append the received data tags when True.

False

Returns:

Type Description
str

A compact string describing the interaction.

Source code in unaiverse/interaction.py
def to_code_str(self, include_id: bool = False, include_received_tags: bool = False) -> str:
    """Return a compact single-line representation for logs and debugging.

    Args:
        include_id: Prepend the interaction UUID to the output when True.
        include_received_tags: Append the received data tags when True.

    Returns:
        A compact string describing the interaction.
    """
    s = ""
    t: list[str] = self.target

    if include_id:
        s = f"{self.uuid} => "
    if include_received_tags:
        t: list[str] = []
        for i, tar in enumerate(self.target):
            all_tags = [str(num) for sublist in self.target_data_tags[i].values() for num in sublist]
            if all_tags:
                t.append(f"{tar}_(" + ",".join(all_tags) + ")")
            else:
                t.append(f"{tar}")

    return s + (f"artss:{self.action_name}|{self.requester}|{t}|"
                f"{self.stream_proxy}|{self.status.value[0:3]}" +
                (("_" + self.completion_reason.value[0:3]) if self.completion_reason is not None else ""))
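
The `include_received_tags` branch flattens, for each target, the tags recorded on all its streams. A small sketch of that formatting step alone, with `format_targets` as a hypothetical helper name:

```python
from typing import Dict, List


def format_targets(targets: List[str], target_data_tags: List[Dict[str, List[int]]]) -> List[str]:
    """Hypothetical helper reproducing the target/tag formatting shown above."""
    formatted = []
    for tar, tags_by_stream in zip(targets, target_data_tags):
        # Flatten the tags of every stream handled by this target
        all_tags = [str(num) for tags in tags_by_stream.values() for num in tags]
        if all_tags:
            formatted.append(f"{tar}_(" + ",".join(all_tags) + ")")
        else:
            formatted.append(f"{tar}")
    return formatted


print(format_targets(["agent_a", "agent_b"], [{"s1": [1, 2], "s2": [3]}, {}]))
```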

to_str

to_str() -> str

Return a JSON-encoded string with the core interaction fields for logging.

Returns:

Type Description
str

A JSON string containing requester, action_kwargs, timestamp_created, and uuid.

Source code in unaiverse/interaction.py
def to_str(self) -> str:
    """Return a JSON-encoded string with the core interaction fields for logging.

    Returns:
        A JSON string containing requester, action_kwargs, timestamp_created, and uuid.
    """
    return json.dumps([self.requester, self.action_kwargs, self.timestamp_created, self.uuid])

parse_streams

parse_streams(streams: list | dict)

Parse the list of streams provided by the user, standardizing its format and saving it.

The user can specify streams in these two ways, where "stream_hash_j" can be just the stream name, the stream group, the net hash, or the user hash.

(1) [..., stream_hash_j (str), ...]
(2) {
        "stdin": [..., stream_hash_j (str), ...],
        "stdtar": [..., stream_hash_k (str), ...],
        "stdext": [..., stream_hash_z (str), ...],
        "stdunk": [..., stream_hash_z (str), ...],
    }

The second form (2) optionally specifies whether each stream is expected to become an input of the processor ('stdin'), a target ('stdtar'), an extra stream ('stdext'), or has no preference ('stdunk'). A plain list (1) is stored entirely under 'stdunk'. Note that this choice may be rearranged by the Interaction Manager, depending on the actual capabilities of the processor.

Parameters:

Name Type Description Default
streams list | dict

The list or dict of streams (see the example above).

required
Source code in unaiverse/interaction.py
def parse_streams(self, streams: list | dict):
    """Parse the list of streams provided by the user, standardizing its format and saving it.

    The user can specify streams in these two ways, where "stream_hash_j" can be just the stream name, the stream
    group, the net hash, or the user hash.
        (1) [..., stream_hash_j (str), ...]
        (2) {
                "stdin": [..., stream_hash_j (str), ...],
                "stdtar": [..., stream_hash_k (str), ...],
                "stdext": [..., stream_hash_z (str), ...],
                "stdunk": [..., stream_hash_z (str), ...],
            }

        The second form (2) optionally specifies whether each stream is expected to become an input of the
        processor ('stdin'), a target ('stdtar'), an extra stream ('stdext'), or has no preference ('stdunk').
        A plain list (1) is stored entirely under 'stdunk'. Note that this choice may be rearranged by the
        Interaction Manager, depending on the actual capabilities of the processor.

    Args:
        streams (list | dict): The list or dict of streams (see the example above).
    """
    invalid_msg = f'Invalid syntax for streams involved in an interaction: {streams}'
    self.streams = {"stdin": [], "stdtar": [], "stdext": [], "stdunk": []}

    if isinstance(streams, list):
        for stream in streams:
            if not isinstance(stream, str):
                raise GenException(invalid_msg)
            self.streams["stdunk"].append(stream)
    elif isinstance(streams, dict):
        for redirect, streams_list in streams.items():
            if redirect not in self.streams:
                raise GenException(invalid_msg)
            for stream in streams_list:
                if not isinstance(stream, str):
                    raise GenException(invalid_msg)
            self.streams[redirect] = list(streams_list)  # Shallow copy (that's why we have "list(...)" - NEEDED!)
    else:
        raise GenException(invalid_msg)
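
The normalization can be tried outside the class. This sketch follows the same rules but is not the library's code: `normalize_streams` is a hypothetical free function, and it raises the built-in `ValueError` where the source raises `GenException`.

```python
from typing import Dict, List, Union


def normalize_streams(streams: Union[list, dict]) -> Dict[str, List[str]]:
    """Hypothetical standalone version; ValueError replaces GenException."""
    invalid_msg = f"Invalid syntax for streams involved in an interaction: {streams}"
    normalized = {"stdin": [], "stdtar": [], "stdext": [], "stdunk": []}

    if isinstance(streams, list):
        # Form (1): no preference expressed, everything goes under 'stdunk'
        if not all(isinstance(s, str) for s in streams):
            raise ValueError(invalid_msg)
        normalized["stdunk"] = list(streams)
    elif isinstance(streams, dict):
        # Form (2): only the four known redirections are accepted
        for redirect, names in streams.items():
            if redirect not in normalized or not all(isinstance(s, str) for s in names):
                raise ValueError(invalid_msg)
            normalized[redirect] = list(names)  # copy, so the caller's list is not shared
    else:
        raise ValueError(invalid_msg)
    return normalized


print(normalize_streams(["camera", "mic"]))
print(normalize_streams({"stdin": ["camera"], "stdtar": ["labels"]}))
```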

InteractionManager

InteractionManager(agent: object, max_interactions: int = -1)

Manages all interactions for an agent.

Responsibilities:

- Maintains lists of sent and received interactions
- Sets stdin/stdout to the streams of the current interaction
- Sets the recipients of the generation
- Clears expired interactions at every clock cycle
- Sends back interaction status
- Waits for all involved streams to have sent data before marking action ready

Create an InteractionManager.

Parameters:

Name Type Description Default
agent object

Back-reference to the owning agent (AgentBasics instance).

required
max_interactions int

Maximum number of simultaneously tracked interactions (sent + received + lazy). Non-positive values fall back to Custom.MAX_INTERACTIONS.

-1
Source code in unaiverse/interaction.py
def __init__(self, agent: object, max_interactions: int = -1):
    """Create an InteractionManager.

    Args:
        agent: Back-reference to the owning agent (AgentBasics instance).
        max_interactions: Maximum number of simultaneously tracked interactions (sent + received + lazy).
            Non-positive values fall back to Custom.MAX_INTERACTIONS.
    """
    self.agent = agent
    self.max_interactions = max_interactions if max_interactions > 0 else Custom.MAX_INTERACTIONS
    self.sent: dict[str, Interaction] = {}       # uuid -> Interaction (sent by this agent)
    self.received: dict[str, Interaction] = {}   # uuid -> Interaction (received from others)
    self.lazy: dict[str, Interaction] = {}   # uuid -> Interaction (added by you)
    self.current: Interaction | None = None      # Currently executing interaction
    self.last_registered: Interaction | None = None
    self.sent_recently_completed: set[Interaction] = set()  # Completed interactions
    self.received_recently_completed: set[Interaction] = set()  # Completed interactions
    self.lazy_recently_completed: set[Interaction] = set()  # Completed interactions

agent instance-attribute

agent = agent

max_interactions instance-attribute

max_interactions = max_interactions if max_interactions > 0 else MAX_INTERACTIONS

sent instance-attribute

sent: dict[str, Interaction] = {}

received instance-attribute

received: dict[str, Interaction] = {}

lazy instance-attribute

lazy: dict[str, Interaction] = {}

current instance-attribute

current: Interaction | None = None

last_registered instance-attribute

last_registered: Interaction | None = None

sent_recently_completed instance-attribute

sent_recently_completed: set[Interaction] = set()

received_recently_completed instance-attribute

received_recently_completed: set[Interaction] = set()

lazy_recently_completed instance-attribute

lazy_recently_completed: set[Interaction] = set()

room_for_registration

room_for_registration() -> bool

Return True if there is capacity to register another interaction.

Source code in unaiverse/interaction.py
def room_for_registration(self) -> bool:
    """Return True if there is capacity to register another interaction."""
    return len(self.sent) + len(self.received) + len(self.lazy) < self.max_interactions

count_interactions

count_interactions() -> int

Return the total number of tracked interactions, including recently completed ones.

Source code in unaiverse/interaction.py
def count_interactions(self) -> int:
    """Return the total number of tracked interactions, including recently completed ones."""
    return (len(self.sent) + len(self.received) + len(self.lazy) +
            len(self.sent_recently_completed) + len(self.received_recently_completed) +
            len(self.lazy_recently_completed))
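
The two counters differ in what they include: the capacity check looks only at active interactions, while the total also counts recently completed ones. A reduced sketch, where `Capacity` is a hypothetical class and the three `*_recently_completed` sets are collapsed into one for brevity:

```python
class Capacity:
    """Hypothetical reduced model of the two counters above."""

    def __init__(self, max_interactions: int):
        self.max_interactions = max_interactions
        self.sent, self.received, self.lazy = {}, {}, {}
        self.recently_completed = set()  # stands in for the three *_recently_completed sets

    def active(self) -> int:
        return len(self.sent) + len(self.received) + len(self.lazy)

    def room_for_registration(self) -> bool:
        # Only active interactions count against the limit
        return self.active() < self.max_interactions

    def count_interactions(self) -> int:
        # The total also includes recently completed interactions
        return self.active() + len(self.recently_completed)


cap = Capacity(max_interactions=2)
cap.sent["u1"] = "interaction-1"
cap.recently_completed.add("interaction-0")
print(cap.room_for_registration(), cap.count_interactions())
```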

clear_from_all_streams

clear_from_all_streams(interaction: Interaction, clear_data: bool = False) -> None

Remove the given interaction from every stream that references it.

Parameters:

Name Type Description Default
interaction Interaction

The interaction to deregister from all the known streams.

required
clear_data bool

If True, also remove the stream data associated with this interaction (default: False).

False
Source code in unaiverse/interaction.py
def clear_from_all_streams(self, interaction: Interaction, clear_data: bool = False) -> None:
    """Remove the given interaction from every stream that references it.

    Args:
        interaction: The interaction to deregister from all the known streams.
        clear_data: If True, also remove the stream data associated with this interaction (default: False).
    """
    for stream_obj in self.agent.known_streams_by_user_hash.values():
        if stream_obj.has_interaction(interaction.uuid):
            stream_obj.remove_interaction(interaction)
            if clear_data:
                stream_obj.remove_data(interaction.uuid)

unregister_all

unregister_all()

Deregister all tracked interactions (sent, received and lazy), except those currently running.

Source code in unaiverse/interaction.py
def unregister_all(self):
    """Deregister all tracked interactions (sent, received and lazy), except those currently running."""
    all_interactions = (set(self.sent.values()) | set(self.received.values()) | set(self.lazy.values()))
    for interaction in all_interactions:
        self.unregister(interaction)

unregister

unregister(interaction: Interaction) -> bool

Deregister an interaction from all tracking dicts and dissociate it from streams and actions.

Parameters:

Name Type Description Default
interaction Interaction

The interaction to remove.

required

Returns:

Type Description
bool

True if the interaction was found and removed from at least one tracking dict; False if it was not found or is currently running (running interactions are never deregistered).

Source code in unaiverse/interaction.py
def unregister(self, interaction: Interaction) -> bool:
    """Deregister an interaction from all tracking dicts and dissociate it from streams and actions.

    Args:
        interaction: The interaction to remove.

    Returns:
        True if the interaction was found and removed from at least one tracking dict; False if it was not
            found or is currently running (running interactions are never deregistered).
    """
    if interaction.status == InteractionStatus.RUNNING:  # Avoid de-registering a currently running interaction
        return False

    interaction.clear_from_action()
    self.clear_from_all_streams(interaction)
    found = False
    if interaction.uuid in self.sent and self.sent[interaction.uuid].requester == interaction.requester:
        del self.sent[interaction.uuid]
        found = True
    if interaction.uuid in self.received and self.received[interaction.uuid].requester == interaction.requester:
        del self.received[interaction.uuid]
        found = True
    if interaction.uuid in self.lazy and self.lazy[interaction.uuid].requester == interaction.requester:
        del self.lazy[interaction.uuid]
        found = True
    return found
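
The removal rule used by unregister (skip running interactions, then delete the entry from every tracking dict that holds the same UUID and the same requester) can be sketched in isolation. This is a minimal illustration, not the library's code: `Item` and the free function `unregister` below are hypothetical stand-ins for Interaction and the manager method.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Item:
    """Hypothetical stand-in for an Interaction (only the fields used here)."""
    uuid: str
    requester: str
    running: bool = False


def unregister(item: Item, *tracking: dict[str, Item]) -> bool:
    """Remove `item` from every dict that holds the same uuid AND requester."""
    if item.running:  # a running item is never removed
        return False
    found = False
    for table in tracking:
        stored = table.get(item.uuid)
        if stored is not None and stored.requester == item.requester:
            del table[item.uuid]
            found = True
    return found


sent = {"u1": Item("u1", "alice")}
received = {"u1": Item("u1", "bob")}  # same uuid, different requester

removed = unregister(Item("u1", "alice"), sent, received)
```

The requester comparison is what keeps the entry in `received` untouched here, even though it shares the UUID with the removed one.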

register_sent

register_sent(interaction: Interaction, public: bool) -> bool

Register an interaction that this agent has sent.

Parameters:

Name Type Description Default
interaction Interaction

The Interaction to register.

required
public bool

Whether the interaction is about a public agent or a private/world one.

required

Returns:

Type Description
bool

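True if registration succeeded; False if there is no room, the action kwargs are inconsistent with the interaction fields, a target or stream reference cannot be resolved, or the streams are invalid.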

Source code in unaiverse/interaction.py
def register_sent(self, interaction: Interaction, public: bool) -> bool:
    """Register an interaction that this agent has sent.

    Args:
        interaction: The Interaction to register.
        public: Whether the interaction is about a public agent or a private/world one.

    Returns:
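        True if registration succeeded; False if there is no room, the action kwargs are inconsistent with
        the interaction fields, a target or stream reference cannot be resolved, or the streams are invalid.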
    """
    self.__check_callback_method(interaction)
    if not self.room_for_registration():
        log.error(f"No more room for interactions (limit: {self.max_interactions})")
        return False

    if not InteractionManager.check_consistency_between_action_kwargs_and_interaction_fields(interaction):
        return False  # The error message is printed inside the method

    # This will resolve target names, data samples, stream names
    if not self.resolve(interaction, public):
        return False

    # Ensuring all the streams mentioned in the interaction are known, and normalizing them
    _, expanded_owned_streams = self.expand_and_normalize_streams(interaction)
    if expanded_owned_streams is None:
        log.error(f"Invalid stream field in interaction: {interaction}")
        return False

    # Converting format for owned streams (no matter what the matching routine decided)
    owned_user_hashes_to_stream_objs = {_stream_user_hash: self.agent.known_streams_by_user_hash[_stream_user_hash]
                                        for _streams_list in expanded_owned_streams.values()
                                        for _stream_user_hash in _streams_list}

    for stream_obj in owned_user_hashes_to_stream_objs.values():
        stream_obj.add_interaction(interaction)

    interaction.set_manager(self, stdin_streams={}, stdtar_streams={}, stdext_streams={},
                            owned_streams=owned_user_hashes_to_stream_objs)

    # No matter what the interaction does: the processor output streams of the target must be aware of the
    # possibility that this interaction might yield new data
    for target_agent in interaction.target:
        streams_of_target_agent = self.agent.find_streams(target_agent, 'processor', discard_owned=True)
        for stream_dict in streams_of_target_agent.values():
            for stream in stream_dict.values():
                stream.add_interaction(interaction)

    # Registering
    self.sent[interaction.uuid] = interaction

    interaction.status = InteractionStatus.REQUESTED
    interaction.type = InteractionType.SENT
    self.last_registered = interaction
    return True

register_received

register_received(interaction: Interaction, public: bool) -> bool

Register an interaction received from another agent.

Parameters:

Name Type Description Default
interaction Interaction

The Interaction to register.

required
public bool

Whether the interaction is about a public agent or a private/world one.

required

Returns:

Type Description
bool

True if registration succeeded; False if there is no room, streams are invalid, or matching failed.

Source code in unaiverse/interaction.py
def register_received(self, interaction: Interaction, public: bool) -> bool:
    """Register an interaction received from another agent.

    Args:
        interaction: The Interaction to register.
        public: Whether the interaction is about a public agent or a private/world one.

    Returns:
        True if registration succeeded; False if there is no room, streams are invalid, or matching failed.
    """
    if self.__register_received_or_lazy(interaction, public):
        self.received[interaction.uuid] = interaction

        interaction.status = InteractionStatus.RECEIVED
        interaction.type = InteractionType.RECEIVED
        return True
    else:
        return False

register_lazy

register_lazy(interaction: Interaction, public: bool) -> bool

Register an interaction that you manually generated within this agent.

Parameters:

Name Type Description Default
interaction Interaction

The Interaction to register.

required
public bool

Whether the interaction is about a public agent or a private/world one.

required

Returns:

Type Description
bool

True if registration succeeded; False if there is no room or the streams are invalid.

Source code in unaiverse/interaction.py
def register_lazy(self, interaction: Interaction, public: bool) -> bool:
    """Register an interaction that you manually generated within this agent.

    Args:
        interaction: The Interaction to register.
        public: Whether the interaction is about a public agent or a private/world one.

    Returns:
        True if registration succeeded; False if there is no room or the streams are invalid.
    """
    if self.__register_received_or_lazy(interaction, public):
        self.lazy[interaction.uuid] = interaction

        interaction.status = InteractionStatus.LAZY
        interaction.type = InteractionType.LAZY
        return True
    else:
        return False

resolve

resolve(interaction: Interaction, public: bool, resolve_target: bool = True) -> bool
Source code in unaiverse/interaction.py
def resolve(self, interaction: Interaction, public: bool, resolve_target: bool = True) -> bool:

    # A. Resolving targets
    if resolve_target:
        target = interaction.target
        if target is not None:
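            for i, _target in enumerate(target):
                # Keeping the original reference, so that it can be reported if the resolution fails
                resolved_target = self.agent.resolve_agent_ref(_target)
                if resolved_target is not None:
                    target[i] = resolved_target
                else:
                    log.error(f"Unknown target specified in an interaction ({_target})")
                    return False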

    # B. Moving data samples to streams, if needed (when data_samples is a dictionary)
    # In this case, "streams" must be None (if not None, then the data_samples field is ignored)
    already_resolved = False
    if isinstance(interaction.data_samples, dict):

        # This is already an empty list (otherwise the data_samples field would not be there), but better
        # be extra safe
        streams = []
        for stream_hash, data_sample in interaction.data_samples.items():
            stream_user_hash_or_net_hash = self.agent.resolve_stream_ref(stream_hash, public)
            if Stream.is_net_hash(stream_user_hash_or_net_hash):
                log.error("When using a stream name in the data sample field of an interaction, "
                          "it MUST be provided in the form of a stream hash or something that can be unambiguously "
                          "resolved to a stream hash")
                return False
            if stream_user_hash_or_net_hash is None:
                log.error(f"Unknown stream hash specified in an interaction ({stream_hash})")
                return False
            stream_user_hash = stream_user_hash_or_net_hash
            stream: Stream = self.agent.known_streams_by_user_hash[stream_user_hash]
            stream.set(data_sample, uuid=interaction.uuid)

            # Appending the stream to the interaction, that now looks like a stream-based interaction
            streams.append(stream_user_hash)

        # Purging
        interaction.data_samples.clear()
        interaction.parse_streams(streams)  # Rebuilding the stream dictionaries
        already_resolved = True

    # C. Resolving streams
    streams = interaction.streams
    if streams is not None and not already_resolved:
        for redirect, streams_list in streams.items():
            for j, stream_hash in enumerate(streams_list):
                stream_user_hash_or_net_hash = self.agent.resolve_stream_ref(stream_hash, public)
                if stream_user_hash_or_net_hash is None:
                    log.error(f"Unknown stream hash specified in an interaction ({stream_hash})")
                    return False
                streams_list[j] = stream_user_hash_or_net_hash  # If it is a net hash, it will be expanded later
    return True

expand_and_normalize_streams

expand_and_normalize_streams(interaction: Interaction) -> tuple[dict[str | None, list[str]], dict[str | None, list[str]]]

Resolve all stream references in an interaction to fully-populated stream dicts.

Translates user-level and net-level hashes into detailed dicts containing user_hash, net_hash, name, group, and the stream object itself, grouped by their suggested redirection ('stdin', 'stdtar', 'stdext', or None).

Parameters:

Name Type Description Default
interaction Interaction

The interaction whose stream list should be expanded.

required

Returns:

Type Description
tuple[dict[str | None, list[str]], dict[str | None, list[str]]]

A 2-tuple (expanded_streams, expanded_owned_streams) where each element is a dict keyed by redirection hint ('stdin', 'stdtar', 'stdext', or 'stdunk' for streams with no hint) mapping to lists of stream user hashes. Returns (None, None) if any stream hash cannot be resolved.

Source code in unaiverse/interaction.py
def expand_and_normalize_streams(self, interaction: Interaction) -> (
        tuple[dict[str | None, list[str]], dict[str | None, list[str]]]):
    """Resolve all stream references in an interaction to fully-populated stream dicts.

    Translates user-level and net-level hashes into detailed dicts containing ``user_hash``,
    ``net_hash``, ``name``, ``group``, and the stream object itself, grouped by their suggested
    redirection (``'stdin'``, ``'stdtar'``, ``'stdext'``, or ``None``).

    Args:
        interaction: The interaction whose stream list should be expanded.

    Returns:
        A 2-tuple ``(expanded_streams, expanded_owned_streams)`` where each element is a dict
        keyed by redirection hint (``'stdin'``, ``'stdtar'``, ``'stdext'``, ``None``) mapping to
        lists of fully-populated stream dicts.  Returns ``(None, None)`` if any stream hash cannot
        be resolved.
    """

    # Guessing net hash and specific name of each stream: if the name was not provided, then all the streams of the
    # group are considered. Generating a full list of streams, distinguishing them in function of their suggested
    # redirection, i.e., 'stdin', 'stdtar', 'stdext', None (meaning 'no suggestions').
    expanded_streams = {'stdin': [], 'stdtar': [], 'stdext': [], "stdunk": []}
    expanded_owned_streams = {'stdin': [], 'stdtar': [], 'stdext': [], "stdunk": []}

    # Checking
    if len(interaction.streams) == 0:
        return expanded_streams, expanded_owned_streams

    for redirect, streams_list in interaction.streams.items():
        for stream_hash in streams_list:

            if Stream.is_user_hash(stream_hash):
                expanded_streams[redirect].append(stream_hash)
                if stream_hash in self.agent.owned_streams_by_user_hash:
                    expanded_owned_streams[redirect].append(stream_hash)

            elif Stream.is_net_hash(stream_hash):
                streams = self.agent.known_streams[stream_hash]
                for stream_obj in streams.values():
                    _stream_hash = DataProps.user_hash_from_net_hash(stream_hash, stream_obj.props.get_name())
                    expanded_streams[redirect].append(_stream_hash)
                    if _stream_hash in self.agent.owned_streams_by_user_hash:
                        expanded_owned_streams[redirect].append(_stream_hash)
    return expanded_streams, expanded_owned_streams
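
The expansion performed above can be illustrated with plain dictionaries: a reference to a whole group is replaced by all of its member streams, a reference to a single stream is kept as it is, and the owned subset is tracked in parallel. This is a simplified sketch under stated assumptions: the hash strings, the `groups` mapping, and the `expand` function are invented for the example and do not reflect the real hash format.

```python
REDIRECTS = ("stdin", "stdtar", "stdext", "stdunk")


def expand(streams: dict[str, list[str]],
           groups: dict[str, list[str]],
           owned: set[str]) -> tuple[dict[str, list[str]], dict[str, list[str]]]:
    """Expand group references into member streams, grouped by redirection hint."""
    expanded = {r: [] for r in REDIRECTS}
    expanded_owned = {r: [] for r in REDIRECTS}
    for redirect, refs in streams.items():
        for ref in refs:
            # A group reference expands to all its members; anything else is kept as-is
            for user_hash in groups.get(ref, [ref]):
                expanded[redirect].append(user_hash)
                if user_hash in owned:
                    expanded_owned[redirect].append(user_hash)
    return expanded, expanded_owned


groups = {"peerA/sensors": ["peerA/sensors#temp", "peerA/sensors#hum"]}
owned = {"peerA/sensors#temp"}
expanded, expanded_owned = expand(
    {"stdin": ["peerA/sensors"], "stdunk": ["peerB/cam#rgb"]}, groups, owned)
```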

match_streams

match_streams(expanded_streams: dict) -> tuple[bool, dict, dict, dict, dict]

Assign expanded stream dicts to stdin, stdtar, or stdext based on processor compatibility.

Attempts to match streams to the processor's input and output argument slots, following any redirection hints. Streams that do not fit a processor slot are placed in stdext.

Parameters:

Name Type Description Default
expanded_streams dict

The output of expand_and_normalize_streams; a dict keyed by redirection hint ('stdin', 'stdtar', 'stdext', None) containing lists of fully-populated stream dicts.

required

Returns:

Type Description
tuple[bool, dict, dict, dict, dict]

A 5-tuple (valid, stdin_streams, stdtar_streams, stdext_streams, owned_streams) where valid is False if required processor inputs have no matching stream and no default value, or if a stream is unknown to the agent, and the remaining four elements are {user_hash: stream_obj} dicts for each category.

Source code in unaiverse/interaction.py
def match_streams(self, expanded_streams: dict) -> tuple[bool, dict, dict, dict, dict]:
    """Assign expanded stream dicts to stdin, stdtar, or stdext based on processor compatibility.

    Attempts to match streams to the processor's input and output argument slots, following any
    redirection hints.  Streams that do not fit a processor slot are placed in stdext.

    Args:
        expanded_streams: The output of ``expand_and_normalize_streams``; a dict keyed by
            redirection hint (``'stdin'``, ``'stdtar'``, ``'stdext'``, ``None``) containing lists
            of fully-populated stream dicts.

    Returns:
        A 5-tuple ``(valid, stdin_streams, stdtar_streams, stdext_streams, owned_streams)`` where
        ``valid`` is False if required processor inputs have no matching stream and no default value,
        or if a stream is unknown to the agent, and the remaining four elements are
        ``{user_hash: stream_obj}`` dicts for each category.
    """

    # Assigning streams to 'stdin' or 'stdext', following the given suggestions, when possible, and being a bit
    # heuristic for all the not-well-defined cases
    processor_will_be_used = len(expanded_streams['stdin']) > 0 or len(expanded_streams["stdunk"]) > 0
    stdin_streams = {}
    stdtar_streams = {}
    stdext_streams = {}
    owned_streams = {}

    for user_hash in expanded_streams['stdext']:
        if user_hash not in self.agent.known_streams_by_user_hash:
            return False, {}, {}, {}, {}
        stdext_streams[user_hash] = self.agent.known_streams_by_user_hash[user_hash]
        if user_hash in self.agent.owned_streams_by_user_hash:
            owned_streams[user_hash] = stdext_streams[user_hash]

    # We try to match the 'stdin' suggestions with the different input arguments of the processors.
    # We also consider the streams with no specific suggestions (lower priority).
    if processor_will_be_used:
        sources = ['stdin', 'stdunk']
        pos_stdin_streams = [None] * len(self.agent.proc_inputs)
        for i in range(len(self.agent.proc_inputs)):
            found_match = -1
            for source in sources:
                for j, user_hash in enumerate(expanded_streams[source]):
                    if user_hash not in self.agent.known_streams_by_user_hash:
                        return False, {}, {}, {}, {}
                    stream_obj = self.agent.known_streams_by_user_hash[user_hash]
                    net_hash = DataProps.build_net_hash(DataProps.peer_id_from_user_hash(user_hash),
                                                        stream_obj.is_pubsub(),
                                                        stream_obj.props.name_or_group())

                    name = stream_obj.props.get_name()

                    # If the current input stream is compatible with the i-th input slot...
                    if (net_hash, name) in self.agent.compat_in_streams[i]:
                        pos_stdin_streams[i] = user_hash
                        found_match = j
                        break

                if found_match >= 0:
                    del expanded_streams[source][found_match]  # Removing the stream from the suggestions
                    break

        # Ensuring that every unfilled processor input has a default value; otherwise the match fails
        for i in range(len(self.agent.proc_inputs)):
            if pos_stdin_streams[i] is None:
                if not self.agent.proc_optional_inputs[i]["has_default"]:
                    return False, {}, {}, {}, {}
                else:
                    stdin_streams["<default_input_pos_" + str(i) + ">"] = (
                        self.agent.proc_optional_inputs)[i]["default_value"]
            else:
                stdin_streams[pos_stdin_streams[i]] = self.agent.known_streams_by_user_hash[pos_stdin_streams[i]]
                if pos_stdin_streams[i] in self.agent.owned_streams_by_user_hash:
                    owned_streams[pos_stdin_streams[i]] = stdin_streams[pos_stdin_streams[i]]

        # Matching targets
        sources = ['stdtar', 'stdunk']
        pos_stdtar_streams = [None] * len(self.agent.proc_outputs)
        for i in range(len(self.agent.proc_outputs)):
            found_match = -1
            for source in sources:
                for j, user_hash in enumerate(expanded_streams[source]):
                    if user_hash not in self.agent.known_streams_by_user_hash:
                        return False, {}, {}, {}, {}
                    stream_obj = self.agent.known_streams_by_user_hash[user_hash]
                    net_hash = DataProps.build_net_hash(DataProps.peer_id_from_user_hash(user_hash),
                                                        stream_obj.is_pubsub(),
                                                        stream_obj.props.get_group())

                    name = stream_obj.props.get_name()

                    # If the current stream is compatible with the i-th output slot...
                    if (net_hash, name) in self.agent.compat_out_streams[i]:
                        pos_stdtar_streams[i] = user_hash
                        found_match = j
                        break

                if found_match >= 0:
                    del expanded_streams[source][found_match]  # Removing the stream from the suggestions
                    break

        # Collecting the matched output streams (unfilled output slots need no default and are skipped)
        for i in range(len(self.agent.proc_outputs)):
            if pos_stdtar_streams[i] is not None:
                stdtar_streams[pos_stdtar_streams[i]] = self.agent.known_streams_by_user_hash[pos_stdtar_streams[i]]
                if pos_stdtar_streams[i] in self.agent.owned_streams_by_user_hash:
                    owned_streams[pos_stdtar_streams[i]] = stdtar_streams[pos_stdtar_streams[i]]

    # We add to 'stdext' all the streams that did not fit the processor (both coming from suggestions in 'stdin' or
    # not suggested at all)
    sources = ['stdin', 'stdtar', 'stdunk']
    for source in sources:
        for user_hash in expanded_streams[source]:
            if user_hash not in self.agent.known_streams_by_user_hash:
                return False, {}, {}, {}, {}
            stdext_streams[user_hash] = self.agent.known_streams_by_user_hash[user_hash]
            if user_hash in self.agent.owned_streams_by_user_hash:
                owned_streams[user_hash] = stdext_streams[user_hash]

    return True, stdin_streams, stdtar_streams, stdext_streams, owned_streams

check_if_doable

check_if_doable(interaction: Interaction) -> bool

Return True if the given interaction can be executed right now.

An interaction is doable when it is not yet completed, the requester is a known agent, and either all involved streams have fresh data or a deprecated completion step is pending.

Parameters:

Name Type Description Default
interaction Interaction

The interaction to evaluate.

required
Source code in unaiverse/interaction.py
def check_if_doable(self, interaction: Interaction) -> bool:
    """Return True if the given interaction can be executed right now.

    An interaction is doable when it is not yet completed, the requester is a known agent, and
    either all involved streams have fresh data or a deprecated completion step is pending.

    Args:
        interaction: The interaction to evaluate.
    """
    requester_is_known = interaction.requester in self.agent.all_agents
    return (not interaction.completed and requester_is_known and
            (interaction.get_new_stream_data_tags(all_fresh_or_fail=True) is not None))

check_consistency_between_action_kwargs_and_interaction_fields staticmethod

check_consistency_between_action_kwargs_and_interaction_fields(interaction: Interaction)
Source code in unaiverse/interaction.py
@staticmethod
def check_consistency_between_action_kwargs_and_interaction_fields(interaction: Interaction):
    for k, v in interaction.action_kwargs.items():
        if k in Custom.RESERVED_IN_ACTION_KWARGS:
            log.error(f"Tried to register an interaction that includes a "
                      f"private/not-allowed argument: {k} not allowed in action_kwargs")
            return False
        if k in Custom.INTERACTION_FIELD_NAMES and v != getattr(interaction, k):
            log.error(f"Tried to register an interaction that includes a "
                      f"private/not-allowed argument: {k} echoed inconsistently,"
                      f" {v} vs {getattr(interaction, k)}")
            return False
    return True
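
The check above rejects two kinds of keyword arguments: names that are reserved, and names that duplicate an interaction field with a different value. A minimal sketch of the same rule follows. The contents of `RESERVED` and `FIELD_NAMES` are made up for the example (the real sets live in Custom and are not shown on this page), and the interaction is reduced to a plain dict of fields.

```python
RESERVED = {"self", "interaction"}   # hypothetical reserved argument names
FIELD_NAMES = {"target", "timeout"}  # hypothetical interaction field names


def kwargs_are_consistent(action_kwargs: dict, fields: dict) -> bool:
    """Reject reserved keys, and keys that echo a field with a different value."""
    for key, value in action_kwargs.items():
        if key in RESERVED:
            return False
        if key in FIELD_NAMES and value != fields.get(key):
            return False
    return True


fields = {"timeout": 5.0, "target": ["bob"]}
ok = kwargs_are_consistent({"timeout": 5.0, "lr": 0.1}, fields)
mismatch = kwargs_are_consistent({"timeout": 1.0}, fields)
reserved = kwargs_are_consistent({"interaction": 1}, fields)
```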

get_current

get_current() -> Interaction | None

Return the currently executing interaction, or None if no interaction is running.

Source code in unaiverse/interaction.py
def get_current(self) -> Interaction | None:
    """Return the currently executing interaction, or None if no interaction is running."""
    return self.current

get_last_registered

get_last_registered() -> Interaction | None

Return the most recently registered interaction, or None if none have been registered.

Source code in unaiverse/interaction.py
def get_last_registered(self) -> Interaction | None:
    """Return the most recently registered interaction, or None if none have been registered."""
    return self.last_registered

set_current_as_paused

set_current_as_paused() -> None

Mark the current interaction as paused: it was started, but must now wait while other interactions are handled. Does nothing if there is no current interaction.

Source code in unaiverse/interaction.py
def set_current_as_paused(self) -> None:
    """Set the current interaction as something that was started but now must be paused because some other
    interactions are handled."""
    if self.current is not None:
        self.current.mark_paused()

set_current

set_current(interaction: Interaction | None) -> None

Set the given interaction as the currently executing one.

This marks the interaction as running and configures the agent's stdin/stdout to point to the correct streams for this interaction.

Parameters:

Name Type Description Default
interaction Interaction | None

The Interaction to set as current.

required
Source code in unaiverse/interaction.py
def set_current(self, interaction: Interaction | None) -> None:
    """Set the given interaction as the currently executing one.

    This marks the interaction as running and configures the agent's
    stdin/stdout to point to the correct streams for this interaction.

    Args:
        interaction: The Interaction to set as current.
    """
    self.current = interaction

    # Default stream bindings (this also automatically switches from private to public and vice-versa)
    self.agent.set_default_stream_binding()

    if interaction is not None:
        interaction.mark_running()

        if len(interaction.stream_proxy) > 0:

            # Restart buffered streams and activate them if they were off
            for stream in interaction.stream_proxy:
                if isinstance(stream, BufferedStream):
                    stream.restart(interaction.uuid)

            # Every interaction with some-streams-specified forces the interaction-described bindings
            self.agent.stdin.bind(interaction.stdin_streams, uuid=interaction.uuid)
            self.agent.stdtar.bind(interaction.stdtar_streams, uuid=interaction.uuid)
            self.agent.stdext.bind(interaction.stdext_streams, uuid=interaction.uuid)
            self.agent.stdout.bind_uuid_only(interaction.uuid)

has_data

has_data(interaction: Interaction) -> bool

Return True if any owned stream has data available for the given interaction.

Parameters:

Name Type Description Default
interaction Interaction

The interaction to check data availability for.

required
Source code in unaiverse/interaction.py
def has_data(self, interaction: 'Interaction') -> bool:
    """Return True if any owned stream has data available for the given interaction.

    Args:
        interaction: The interaction to check data availability for.
    """
    for stream_obj in self.agent.owned_streams_by_user_hash.values():
        if stream_obj.has_data(interaction.uuid):
            return True
    return False

get_recipients

get_recipients(interaction: Interaction) -> list[str]

Return the list of peer IDs that should receive data generated by the interaction.

For sent interactions the recipients are the interaction targets; for received interactions it is the original requester; for lazy interactions it is the interaction targets. Unknown (unregistered) interactions fall back to the interaction's target list.

Parameters:

Name Type Description Default
interaction Interaction

The interaction whose recipients should be determined.

required

Returns:

Type Description
list[str]

A list of peer ID strings, filtering out any None entries.

Source code in unaiverse/interaction.py
def get_recipients(self, interaction: 'Interaction') -> list[str]:
    """Return the list of peer IDs that should receive data generated by the interaction.

    For sent interactions the recipients are the interaction targets; for received interactions it
    is the original requester; for lazy interactions it is the interaction targets.  Unknown
    (unregistered) interactions fall back to the interaction's target list.

    Args:
        interaction: The interaction whose recipients should be determined.

    Returns:
        A list of peer ID strings, filtering out any ``None`` entries.
    """
    if self.is_known(interaction):
        if self.is_sent(interaction):
            recipients = interaction.target  # This is always a list, even when with 1 element only
        elif self.is_received(interaction):
            recipients = [interaction.requester]  # This is always 1 element, we make it a list
        elif self.is_lazy(interaction):
            recipients = interaction.target  # This is always a list, even when with 1 element only
        else:
            raise GenException("Unexpected case of a known interaction that is neither sent, received, nor lazy")
    else:

        # This is the case of a not-registered interaction.
        recipients = interaction.target  # This is always a list, even when with 1 element only
    return [x for x in recipients if x is not None]

complete async

complete(interaction: Interaction, reason: CompletionReason, dest_state: str | None = None, target: str | None = None) -> None

Mark an interaction as completed and move it to the recently-completed set, removing it from its action (async).

Parameters:

Name Type Description Default
interaction Interaction

The interaction to complete.

required
reason CompletionReason

The reason for completion.

required
dest_state str | None

The destination state reached, if any.

None
target str | None

The target agent who completed the interaction (Default: None).

None
Source code in unaiverse/interaction.py
async def complete(self, interaction: 'Interaction', reason: 'CompletionReason',
                   dest_state: str | None = None, target: str | None = None) -> None:
    """Mark an interaction as completed and move it to the recently-completed set, removing it from its action
    (async).

    Args:
        interaction: The interaction to complete.
        reason: The reason for completion.
        dest_state: The destination state reached, if any.
        target: The target agent who completed the interaction (Default: None).
    """
    if interaction is not None:
        interaction.mark_completed(reason, dest_state=dest_state, target=target)
        interaction.clear_from_action()  # We do not remove it from streams yet - it might be needed for sending
        if interaction.completed:
            if (interaction.uuid in self.sent and
                    interaction.requester == self.sent[interaction.uuid].requester):  # Distinguish chained
                self.sent_recently_completed.add(interaction)
                del self.sent[interaction.uuid]
            if (interaction.uuid in self.received and
                    interaction.requester == self.received[interaction.uuid].requester):  # Distinguish chained
                self.received_recently_completed.add(interaction)
                del self.received[interaction.uuid]
            if (interaction.uuid in self.lazy and
                    interaction.requester == self.lazy[interaction.uuid].requester):  # Distinguish chained
                self.lazy_recently_completed.add(interaction)
                del self.lazy[interaction.uuid]

            # Running callback method, if any
            if interaction.callback is not None:
                callback_method = getattr(self.agent, interaction.callback)

                self.agent.behav_lone_wolf.enable(False)
                self.agent.behav.enable(False)
                if interaction.requester in self.agent.public_agents:
                    self.agent.behav_lone_wolf.enable(True)
                else:
                    if self.agent.in_world():
                        self.agent.behav.enable(True)

                        await callback_method(interaction=interaction)  # "Calling callback!"

                self.agent.behav_lone_wolf.enable(False)
                self.agent.behav.enable(False)

complete_current async

complete_current(dest_state: str, reason: CompletionReason) -> None

Mark the current interaction as completed (async).

Parameters:

Name Type Description Default
dest_state str

The destination state reached by completing this action.

required
reason CompletionReason

The reason for completion.

required
Source code in unaiverse/interaction.py
async def complete_current(self, dest_state: str, reason: 'CompletionReason') -> None:
    """Mark the current interaction as completed (async).

    Args:
        dest_state: The destination state reached by completing this action.
        reason: The reason for completion.
    """
    await self.complete(self.current, dest_state=dest_state, reason=reason)
    self.current = None

drain_completed

drain_completed() -> list[Interaction]

Return and clear the list of recently completed interactions.

Used by the agent's behave() loop to send status notifications.

Returns:

Type Description
list[Interaction]

List of Interaction objects that were recently completed.

Source code in unaiverse/interaction.py
def drain_completed(self) -> list[Interaction]:
    """Return and clear the list of recently completed interactions.

    Used by the agent's ``behave()`` loop to send status notifications.

    Returns:
        List of Interaction objects that were recently completed.
    """
    cur_time = clock.get_time()
    cur_clock_cycle = clock.get_cycle()
    drained = []

    for recently_completed in [self.sent_recently_completed,
                               self.received_recently_completed,
                               self.lazy_recently_completed]:
        to_remove = []
        for i, interaction in enumerate(recently_completed):

            # Wait AT LEAST 1 clock cycle, to allow sending samples generated during the interaction
            # Of course, "disconnected"-agent-related interactions are immediately drained
            if (interaction.completion_reason == CompletionReason.DISCONNECTED or
                    (interaction.cycle_completed < cur_clock_cycle and
                     (cur_time - interaction.timestamp_completed) > Custom.DRAIN_TIMEOUT)):
                log.inter(
                    f"Draining {interaction.to_code_str(True)} "
                    f"(cycle_completed={interaction.cycle_completed})")
                to_remove.append(interaction)
                drained.append(interaction)
        for interaction in to_remove:
            self.sent_recently_completed.discard(interaction)
            self.received_recently_completed.discard(interaction)
            self.lazy_recently_completed.discard(interaction)

            # Clear the interaction from all streams, owned or not, that might have been used for output
            # by this agent or by others.
            # In principle only the processor streams are involved, since they are the only ones this class
            # plugs the interaction into; however, the user might have added it to other owned streams.
            if not self.is_known(interaction):
                # If, meanwhile, another interaction with the same UUID (e.g., None) was received,
                # do not clear it from the streams
                self.clear_from_all_streams(interaction)

    return drained
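
The drain condition can be sketched in isolation as follows. The `Done` record, the `can_drain` helper, and the value of `DRAIN_TIMEOUT` are assumptions made for illustration; only the shape of the condition is taken from the listing above.

```python
from dataclasses import dataclass

DRAIN_TIMEOUT = 0.5  # assumed value, standing in for Custom.DRAIN_TIMEOUT


@dataclass(frozen=True)
class Done:
    """Hypothetical record of a completed interaction."""
    name: str
    reason: str
    cycle_completed: int
    timestamp_completed: float


def can_drain(item: Done, cur_cycle: int, cur_time: float) -> bool:
    # Disconnected peers are drained right away; everything else must have
    # completed in an earlier cycle AND be older than the drain timeout.
    if item.reason == "disconnected":
        return True
    return (item.cycle_completed < cur_cycle
            and (cur_time - item.timestamp_completed) > DRAIN_TIMEOUT)


recently_completed = {
    Done("a", "ok", cycle_completed=7, timestamp_completed=10.0),
    Done("b", "ok", cycle_completed=8, timestamp_completed=11.9),
    Done("c", "disconnected", cycle_completed=8, timestamp_completed=11.9),
}

drained = {d for d in recently_completed if can_drain(d, cur_cycle=8, cur_time=12.0)}
recently_completed -= drained
```

Here `a` and `c` are drained, while `b` stays in the set because it completed in the current cycle.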

complete_expired async

complete_expired() -> None

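Complete interactions that have expired, carry no action, or involve a disconnected peer (async). Nothing is returned.

Checks all sent, received and lazy interactions that are not yet completed. Interactions for which is_expired(Custom.DEFAULT_INTER_TIMEOUT) holds, or whose action_name is None, are marked as COMPLETED with the TIMEOUT reason, as are volatile sent interactions. Received interactions whose requester is no longer a known agent, and sent interactions with a target that is no longer known, are completed with the DISCONNECTED reason. Completed interactions are removed from the tracking dicts.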

Source code in unaiverse/interaction.py
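async def complete_expired(self) -> None:
    """Complete expired, no-action, volatile-sent, or disconnected-peer interactions (async).

    Checks all sent, received and lazy interactions that are not yet completed.
    Expired or no-action interactions, and volatile sent ones, are marked as COMPLETED with TIMEOUT reason;
    those whose requester or target is no longer a known agent are completed with DISCONNECTED reason.
    Completed interactions are removed from the tracking dicts.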
    """
    for interaction in list(self.sent.values()) + list(self.received.values()) + list(self.lazy.values()):
        if interaction.status == InteractionStatus.COMPLETED:
            continue
        elif interaction.is_expired(Custom.DEFAULT_INTER_TIMEOUT) or interaction.action_name is None:
            await self.complete(interaction, reason=CompletionReason.TIMEOUT)
        elif interaction.volatile and self.is_sent(interaction):
            await self.complete(interaction, reason=CompletionReason.TIMEOUT)
        else:
            if self.is_received(interaction) and interaction.requester not in self.agent.all_agents:
                await self.complete(interaction, reason=CompletionReason.DISCONNECTED)
            if self.is_sent(interaction):
                for target in interaction.target:
                    if target not in self.agent.all_agents:
                        await self.complete(interaction, reason=CompletionReason.DISCONNECTED, target=target)
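
The order in which the loop above picks a completion reason can be summarised with a small pure function. `expiry_reason` and its boolean parameters are hypothetical simplifications: in particular, the real code checks each target of a sent interaction separately, whereas this sketch collapses that into a single `peers_online` flag.

```python
from typing import Optional


def expiry_reason(*, completed: bool, expired: bool, has_action: bool,
                  volatile: bool, sent: bool, received: bool,
                  peers_online: bool) -> Optional[str]:
    """Return the completion reason value, or None to leave the interaction alone."""
    if completed:
        return None                 # already completed: skipped
    if expired or not has_action:
        return "retry_timeout"      # CompletionReason.TIMEOUT
    if volatile and sent:
        return "retry_timeout"      # volatile sent interactions are completed on this check
    if (sent or received) and not peers_online:
        return "disconnected"       # requester (received) or a target (sent) is gone
    return None


base = dict(completed=False, expired=False, has_action=True,
            volatile=False, sent=True, received=False, peers_online=True)
checks = {
    "healthy": expiry_reason(**base),
    "expired": expiry_reason(**{**base, "expired": True}),
    "volatile": expiry_reason(**{**base, "volatile": True}),
    "peer_gone": expiry_reason(**{**base, "peers_online": False}),
}
```

The timeout checks come first, so an interaction that is both expired and tied to a disconnected peer is reported as a timeout.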

clear_expired_stream_data

clear_expired_stream_data() -> None

Purge stale buffered data from streams whose associated interaction is done or gone.

Source code in unaiverse/interaction.py
def clear_expired_stream_data(self) -> None:
    """Purge stale buffered data from streams whose associated interaction is done or gone."""
    all_interactions = list(self.received.values()) + list(self.sent.values()) + list(self.lazy.values())
    for interaction in all_interactions:
        for stream in interaction.stream_proxy:

            # In case of data samples or default input values, skip
            if not isinstance(stream, Stream):
                continue

            # We only consider streams whose interactions were either removed, or are already completed, or
            # were just created (artificial interaction, for example the ones created just to send a sample)
            if (not stream.has_interaction(interaction.uuid)
                    or stream.get_interaction(interaction.uuid).completed
                    or stream.get_interaction(interaction.uuid).created):

                # This will also clear the associated (completed) interaction, if still there
                stream.clear_expired_data(Custom.DEFAULT_INTER_TIMEOUT)
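
The condition that decides whether a stream is purged can be sketched as a predicate over a per-stream mapping. The `Entry` class and the dict layout are assumptions for illustration and do not reflect how `Stream` stores its interactions.

```python
from dataclasses import dataclass


@dataclass
class Entry:
    """Hypothetical per-stream view of an interaction's state."""
    completed: bool = False
    created: bool = False


def should_purge(stream_entries: dict, uuid) -> bool:
    # Purge when the stream no longer knows the interaction, or knows it
    # only as completed or as just created.
    entry = stream_entries.get(uuid)
    return entry is None or entry.completed or entry.created


# None is used as a key here because a UUID of None is treated as valid.
entries = {"u1": Entry(completed=True), "u2": Entry(), None: Entry(created=True)}
```

With these entries, only `u2` (known, neither completed nor just created) is left untouched.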

update_sent_status async

update_sent_status(status_dict: dict) -> None

Update a previously sent interaction's status from a received status message (async).

Parameters:

Name         Type  Description                              Default
status_dict  dict  Dict from Interaction.to_status_dict().  required

Source code in unaiverse/interaction.py
async def update_sent_status(self, status_dict: dict) -> None:
    """Update a previously sent interaction's status from a received status message (async).

    Args:
        status_dict: Dict from Interaction.to_status_dict().
    """
    sender = status_dict.get('status_generated_by')
    uuid = status_dict.get('uuid')  # UUID can be None (meaning generic UUID, still valid!), recall this
    if uuid in self.sent:

        # Collecting data
        interaction = self.sent[uuid]
        interaction_status = InteractionStatus(status_dict['status'])
        if status_dict.get('completion_reason'):
            completion_reason = CompletionReason(status_dict['completion_reason'])
        else:
            completion_reason = CompletionReason.OK
        interaction_destination_state = status_dict.get('destination_state')

        # Recording data tags
        data_tags: dict[str, list[int]] | None = status_dict.get('data_tags', None)
        if data_tags is not None:
            interaction.record_data_tags(data_tags,
                                         target=sender)

        # Mark the completion reported by the sender of the status message
        if interaction_status == InteractionStatus.COMPLETED:
            await self.complete(interaction, dest_state=interaction_destination_state, reason=completion_reason,
                                target=sender)
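
How the status fields are decoded can be sketched on its own. The enums below are trimmed local copies of the values listed on this page, `parse_status` is a hypothetical helper, and the example message only contains keys that the listing above reads; the real `to_status_dict()` output may carry more.

```python
from enum import Enum


class InteractionStatus(Enum):
    RUNNING = 'running'
    COMPLETED = 'completed'


class CompletionReason(Enum):
    OK = 'ok'
    TIMEOUT = 'retry_timeout'


def parse_status(status_dict: dict):
    """Decode the fields read by update_sent_status (sketch only)."""
    status = InteractionStatus(status_dict['status'])
    raw_reason = status_dict.get('completion_reason')
    # A missing or empty reason falls back to OK, as in the listing above.
    reason = CompletionReason(raw_reason) if raw_reason else CompletionReason.OK
    return status_dict.get('uuid'), status, reason


# Hypothetical message; a UUID of None is still a valid (generic) UUID.
message = {'uuid': None, 'status': 'completed', 'status_generated_by': 'peer-1'}
uuid, status, reason = parse_status(message)
```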

is_received

is_received(interaction: Interaction) -> bool

Return True if the interaction was received from another agent (active or recently completed).

Parameters:

Name         Type         Description                Default
interaction  Interaction  The interaction to check.  required

Source code in unaiverse/interaction.py
def is_received(self, interaction: Interaction) -> bool:
    """Return True if the interaction was received from another agent (active or recently completed).

    Args:
        interaction: The interaction to check.
    """
    return interaction.uuid in self.received or interaction in self.received_recently_completed

is_sent

is_sent(interaction: Interaction) -> bool

Return True if the interaction was sent by this agent (active or recently completed).

Parameters:

Name         Type         Description                Default
interaction  Interaction  The interaction to check.  required

Source code in unaiverse/interaction.py
def is_sent(self, interaction: Interaction) -> bool:
    """Return True if the interaction was sent by this agent (active or recently completed).

    Args:
        interaction: The interaction to check.
    """
    return interaction.uuid in self.sent or interaction in self.sent_recently_completed

is_lazy

is_lazy(interaction: Interaction) -> bool

Return True if the interaction was lazily generated within this agent (active or recently completed).

Parameters:

Name         Type         Description                Default
interaction  Interaction  The interaction to check.  required

Source code in unaiverse/interaction.py
def is_lazy(self, interaction: Interaction) -> bool:
    """Return True if the interaction was lazily generated within this agent (active or recently completed).

    Args:
        interaction: The interaction to check.
    """
    return interaction.uuid in self.lazy or interaction in self.lazy_recently_completed

is_known

is_known(interaction: Interaction) -> bool

Return True if the interaction is tracked as sent, received, or lazy, either in the active dicts or in the recently-completed sets.

Parameters:

Name         Type         Description                Default
interaction  Interaction  The interaction to check.  required

Source code in unaiverse/interaction.py
def is_known(self, interaction: Interaction) -> bool:
    """Return True if the interaction is tracked as sent, received, or lazy (active or recently completed).

    Args:
        interaction: The interaction to check.
    """
    return self.is_received(interaction) or self.is_sent(interaction) or self.is_lazy(interaction)
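
The membership checks above mix two kinds of lookup: active interactions are matched by UUID, recently completed ones by set membership of the object itself. A minimal sketch, assuming a stand-in `Item` class with default identity-based hashing (the real `Interaction` may define equality differently):

```python
class Item:
    """Hypothetical interaction stand-in with only a UUID."""

    def __init__(self, uuid):
        self.uuid = uuid


active = {"u1": Item("u1")}      # plays the role of self.sent / self.received / self.lazy
done = Item("u2")
recently_completed = {done}      # plays the role of the *_recently_completed sets


def is_tracked(item) -> bool:
    # Active entries match by UUID; recently completed ones by set membership.
    return item.uuid in active or item in recently_completed
```

Under this assumption, a fresh object carrying the UUID of an active interaction is reported as tracked, while a fresh object carrying the UUID of a completed one is not.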

get_interaction

get_interaction(uuid: str | None, consider_completed_too: bool = False) -> Interaction | None

Look up an interaction by UUID across the active tracking dicts, checked in the order received, sent, lazy.

Parameters:

Name                    Type        Description  Default
uuid                    str | None  The UUID to look up; None is a valid (generic) UUID.  required
consider_completed_too  bool        When True, also search the recently-completed sets and return the most recently completed match (there may be more than one with the same UUID).  False

Returns:

Type                Description
Interaction | None  The matching Interaction, or None if not found.

Source code in unaiverse/interaction.py
def get_interaction(self, uuid: str | None, consider_completed_too: bool = False) -> Interaction | None:
    """Look up an interaction by UUID across the active tracking dicts.

    Args:
        uuid: The UUID string to look up.
        consider_completed_too: When True, also search the recently-completed sets and return the
            most recently completed match (there may be more than one with the same UUID).

    Returns:
        The matching Interaction, or None if not found.
    """
    if uuid in self.received:
        return self.received[uuid]
    elif uuid in self.sent:
        return self.sent[uuid]
    elif uuid in self.lazy:
        return self.lazy[uuid]
    else:
        if consider_completed_too:
            found_so_far = None
            for inter in self.received_recently_completed:
                if inter.uuid == uuid:
                    if found_so_far is None or found_so_far.timestamp_completed < inter.timestamp_completed:
                        found_so_far = inter
            for inter in self.sent_recently_completed:
                if inter.uuid == uuid:
                    if found_so_far is None or found_so_far.timestamp_completed < inter.timestamp_completed:
                        found_so_far = inter
            for inter in self.lazy_recently_completed:
                if inter.uuid == uuid:
                    if found_so_far is None or found_so_far.timestamp_completed < inter.timestamp_completed:
                        found_so_far = inter
            return found_so_far  # It returns the last completed, there could be more than one with the same UUID
        else:
            return None
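
The fallback search over the recently-completed sets amounts to picking the match with the largest `timestamp_completed`. A compact sketch of that selection, using a hypothetical `Done` record and module-level sets in place of the manager's attributes:

```python
from dataclasses import dataclass
from itertools import chain
from typing import Optional


@dataclass(frozen=True)
class Done:
    """Hypothetical completed-interaction record."""
    uuid: Optional[str]
    timestamp_completed: float


received_done = {Done(None, 1.0), Done("u1", 2.0)}
sent_done = {Done(None, 5.0)}
lazy_done = set()


def latest_completed(uuid: Optional[str]) -> Optional[Done]:
    # Gather every completed record with this UUID, then keep the newest one.
    matches = [d for d in chain(received_done, sent_done, lazy_done) if d.uuid == uuid]
    return max(matches, key=lambda d: d.timestamp_completed, default=None)
```

Two records share the UUID `None` here, and the one completed at time 5.0 wins.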

add_lazy_stream_to_interaction

add_lazy_stream_to_interaction(stream_hash: str, interaction: Interaction) -> None

Attach a stream (identified by hash) to an already-registered interaction as a lazy stream.

Parameters:

Name         Type         Description                                                   Default
stream_hash  str          A user-level or net-level stream hash to resolve and attach.  required
interaction  Interaction  The interaction to attach the stream to.                      required

Source code in unaiverse/interaction.py
def add_lazy_stream_to_interaction(self, stream_hash: str, interaction: Interaction) -> None:
    """Attach a stream (identified by hash) to an already-registered interaction as a lazy stream.

    Args:
        stream_hash: A user-level or net-level stream hash to resolve and attach.
        interaction: The interaction to attach the stream to.
    """
    user_hashes = self.__normalize_to_user_hashes(stream_hash)
    log.debug(f"[add_lazy_stream_to_interaction] stream_hash={stream_hash}, user_hashes={user_hashes}, "
              f"interaction={interaction}")
    for user_hash in user_hashes:
        stream_obj = self.agent.known_streams_by_user_hash[user_hash]
        stream_obj.add_interaction(interaction)
        interaction.add_lazy_stream(user_hash, stream_obj,
                                    is_owned=user_hash in self.agent.owned_streams_by_user_hash)

remove_interactions_of_agent async

remove_interactions_of_agent(agent: str) -> None

Complete and deregister all interactions that involve the given agent as requester or target (async).

Parameters:

Name   Type  Description                          Default
agent  str   Peer ID of the agent being removed.  required

Source code in unaiverse/interaction.py
async def remove_interactions_of_agent(self, agent: str) -> None:
    """Complete and deregister all interactions that involve the given agent as requester or target (async).

    Args:
        agent: Peer ID of the agent being removed.
    """
    interaction_dicts = [self.sent, self.received, self.lazy]
    for interaction_dict in interaction_dicts:
        for uuid, inter in list(interaction_dict.items()):  # Do not remove list(...)
            if inter.requester == agent:
                await self.complete(inter, reason=CompletionReason.DISCONNECTED)
            if agent in inter.target:
                await self.complete(inter, reason=CompletionReason.DISCONNECTED, target=agent)
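
The `list(...)` wrapper flagged in the comment above matters if completing an interaction removes it from the dict being iterated, which this sketch assumes. The plain dicts below are hypothetical stand-ins for the tracking dicts.

```python
sent = {"u1": {"requester": "me", "target": ["peer-1"]},
        "u2": {"requester": "me", "target": ["peer-2"]}}


def remove_for(agent: str) -> None:
    # list(...) snapshots the items, so deleting from `sent` inside the loop is safe.
    for uuid, inter in list(sent.items()):
        if inter["requester"] == agent or agent in inter["target"]:
            del sent[uuid]


remove_for("peer-1")

# Without the snapshot, deleting while iterating raises RuntimeError.
unsafe = {"u1": 1, "u2": 2}
try:
    for key in unsafe:
        del unsafe[key]
    raised = False
except RuntimeError:
    raised = True
```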

InteractionStatus

Bases: Enum

CREATED class-attribute instance-attribute

CREATED = 'created'

REQUESTED class-attribute instance-attribute

REQUESTED = 'requested'

LAZY class-attribute instance-attribute

LAZY = 'lazy'

RECEIVED class-attribute instance-attribute

RECEIVED = 'received'

RUNNING class-attribute instance-attribute

RUNNING = 'running'

PAUSED class-attribute instance-attribute

PAUSED = 'paused'

COMPLETED class-attribute instance-attribute

COMPLETED = 'completed'

CompletionReason

Bases: Enum

OK class-attribute instance-attribute

OK = 'ok'

TIMEOUT class-attribute instance-attribute

TIMEOUT = 'retry_timeout'

REJECTED class-attribute instance-attribute

REJECTED = 'rejected'

DISCONNECTED class-attribute instance-attribute

DISCONNECTED = 'disconnected'

ERROR class-attribute instance-attribute

ERROR = 'error'

DISCARDED class-attribute instance-attribute

DISCARDED = 'discarded'
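
Note that the TIMEOUT member carries the value 'retry_timeout', so name and value differ. The sketch below re-declares the enum locally, with the values listed above, to show lookup by value and by name using the standard `enum` module.

```python
from enum import Enum


class CompletionReason(Enum):
    # Local copy of the values documented above, for illustration.
    OK = 'ok'
    TIMEOUT = 'retry_timeout'
    REJECTED = 'rejected'
    DISCONNECTED = 'disconnected'
    ERROR = 'error'
    DISCARDED = 'discarded'


by_value = CompletionReason('retry_timeout')   # lookup by value, as done for status messages
by_name = CompletionReason['TIMEOUT']          # lookup by member name

try:
    CompletionReason('timeout')                # not a valid value
    rejected_unknown = False
except ValueError:
    rejected_unknown = True
```

Both lookups yield the same member, while the string 'timeout' is rejected with a `ValueError`.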

InteractionType

Bases: Enum

SENT class-attribute instance-attribute

SENT = 'sent'

RECEIVED class-attribute instance-attribute

RECEIVED = 'received'

LAZY class-attribute instance-attribute

LAZY = 'lazy'