Skip to content

🔴 unaiverse.networking.node.connpool

What this module does 🔴

Manages pools of peer connections over multiple P2P transports, routing peers into named pools by role and providing connect/disconnect, messaging, pub/sub, and world-membership tracking for the agent network.

connpool

█████ █████ ██████ █████ █████ █████ █████ ██████████ ███████████ █████████ ██████████ ░░███ ░░███ ░░██████ ░░███ ░░███ ░░███ ░░███ ░░███░░░░░█░░███░░░░░███ ███░░░░░███░░███░░░░░█ ░███ ░███ ░███░███ ░███ ██████ ░███ ░███ ░███ ░███ █ ░ ░███ ░███ ░███ ░░░ ░███ █ ░ ░███ ░███ ░███░░███░███ ░░░░░███ ░███ ░███ ░███ ░██████ ░██████████ ░░█████████ ░██████
░███ ░███ ░███ ░░██████ ███████ ░███ ░░███ ███ ░███░░█ ░███░░░░░███ ░░░░░░░░███ ░███░░█
░███ ░███ ░███ ░░█████ ███░░███ ░███ ░░░█████░ ░███ ░ █ ░███ ░███ ███ ░███ ░███ ░ █ ░░████████ █████ ░░█████░░████████ █████ ░░███ ██████████ █████ █████░░█████████ ██████████ ░░░░░░░░ ░░░░░ ░░░░░ ░░░░░░░░ ░░░░░ ░░░ ░░░░░░░░░░ ░░░░░ ░░░░░ ░░░░░░░░░ ░░░░░░░░░░ A Collectionless AI Project (https://collectionless.ai) Registration/Login: https://unaiverse.io Code Repositories: https://github.com/collectionlessai/ Main Developers: Stefano Melacci (Project Leader), Christian Di Maio, Tommaso Guidi

ConnectionPools

ConnectionPools(max_connections: int, pool_name_to_p2p_name_and_ratio: dict[str, [str, float]], p2p_name_to_p2p: dict[str, P2P], public_key: str | None = None, token: str | None = None)

A multi-pool connection manager that routes peers across named P2P connection pools.

ConnectionPools partitions the total connection budget (max_connections) into named pools, each associated with a P2P network object and a capacity ratio. Every connected peer is tracked in exactly one pool and is identified by its peer ID. The class provides primitives for connecting, disconnecting, adding, removing, querying, and messaging peers, along with token-based authentication for incoming messages.

Internally each pool is represented as a "pool triple": a set of peer IDs, an integer maximum size, and the P2P object that owns those connections. Several cross-referenced dictionaries (peer_id_to_pool_name, peer_id_to_p2p, pool_name_to_peer_infos, etc.) are maintained in sync so that per-peer lookups remain O(1).

This class is abstract in the sense that conn_routing_fcn raises NotImplementedError and must be overridden by a concrete subclass (such as NodeConn) to define how raw peer-info dictionaries returned by the P2P layer are mapped to pool names.

Attributes:

Name Type Description
max_con

Maximum total number of connections across all pools.

pool_count

Number of named connection pools.

pool_names

Ordered list of pool names as provided at construction time.

pool_ratios

Per-pool capacity ratios in the same order as pool_names.

p2p_name_to_p2p

Mapping from P2P network name to its P2P object.

pool_name_to_pool_triple

Mapping from pool name to its triple [peer_id_set, max_size, p2p].

pool_name_to_peer_infos

Mapping from pool name to a dict of {peer_id: peer_info_dict}.

peer_id_to_pool_name

Reverse mapping from peer ID to the pool it occupies.

peer_id_to_p2p

Mapping from peer ID to the P2P object that owns its connection.

peer_id_to_misc

Mapping from peer ID to an integer "misc" field (role bits, etc.).

peer_id_to_token

Mapping from peer ID to the last verified authentication token.

pool_name_to_added_in_last_update

Set of peer IDs added per pool during the most recent update call.

pool_name_to_removed_in_last_update

Set of peer IDs removed per pool during the most recent update call.

Initialize the connection pools and compute per-pool capacity budgets.

Pool capacities are derived from the provided ratios scaled by max_connections. A pool whose ratio is exactly 0.0 receives a capacity of 0 (disabled), while a pool whose ratio is negative is treated as an unlimited-capacity overflow pool. Any rounding remainder from math.floor is assigned to the last pool so that the sum of all pool capacities equals max_connections exactly.

The constructor validates that: - Every P2P name referenced in pool_name_to_p2p_name_and_ratio appears in p2p_name_to_p2p. - max_connections is at least as large as the number of pools. - The positive ratios sum to exactly 1.0. - The number of zero-ratio (disabled) pools does not exceed max_connections.

Parameters:

Name Type Description Default
max_connections int

Maximum total number of simultaneous connections across all pools.

required
pool_name_to_p2p_name_and_ratio dict[str, [str, float]]

Ordered mapping from pool name to a two-element list [p2p_network_name, ratio]. The ratio must be a non-negative float, or negative to mark a pool as dynamically sized. Positive ratios must sum to 1.0.

required
p2p_name_to_p2p dict[str, P2P]

Mapping from P2P network name to its P2P object.

required
public_key str | None

PEM-encoded public key used to verify tokens piggybacked on incoming messages. If None, token verification is disabled and verify_token always returns (None, None). Defaults to None.

None
token str | None

Authentication token appended to every outgoing message as a piggyback payload. Defaults to None (empty string token).

None

Raises:

Type Description
AssertionError

If any referenced P2P name is absent from p2p_name_to_p2p, if max_connections is smaller than the pool count, if positive ratios do not sum to 1.0, or if the computed per-pool sizes are inconsistent with max_connections.

Source code in unaiverse/networking/node/connpool.py
def __init__(self, max_connections: int, pool_name_to_p2p_name_and_ratio: dict[str, [str, float]],
             p2p_name_to_p2p: dict[str, P2P],
             public_key: str | None = None, token: str | None = None) -> None:
    """Initialize the connection pools and compute per-pool capacity budgets.

    Pool capacities are derived from the provided ratios scaled by ``max_connections``.
    A pool whose ratio is exactly ``0.0`` receives a capacity of ``0`` (disabled), while
    a pool whose ratio is negative is treated as an unlimited-capacity overflow pool.
    Any rounding remainder from ``math.floor`` is assigned to the last pool so that the
    sum of all pool capacities equals ``max_connections`` exactly.

    The constructor validates that:
    - Every P2P name referenced in ``pool_name_to_p2p_name_and_ratio`` appears in
      ``p2p_name_to_p2p``.
    - ``max_connections`` is at least as large as the number of pools.
    - The positive ratios sum to exactly ``1.0``.
    - The number of zero-ratio (disabled) pools does not exceed ``max_connections``.

    Args:
        max_connections: Maximum total number of simultaneous connections across all
            pools.
        pool_name_to_p2p_name_and_ratio: Ordered mapping from pool name to a two-element
            list ``[p2p_network_name, ratio]``. The ratio must be a non-negative float,
            or negative to mark a pool as dynamically sized. Positive ratios must sum
            to ``1.0``.
        p2p_name_to_p2p: Mapping from P2P network name to its ``P2P`` object.
        public_key: PEM-encoded public key used to verify tokens piggybacked on
            incoming messages. If ``None``, token verification is disabled and
            ``verify_token`` always returns ``(None, None)``. Defaults to ``None``.
        token: Authentication token appended to every outgoing message as a piggyback
            payload. Defaults to ``None`` (empty string token).

    Raises:
        AssertionError: If any referenced P2P name is absent from ``p2p_name_to_p2p``,
            if ``max_connections`` is smaller than the pool count, if positive ratios do
            not sum to ``1.0``, or if the computed per-pool sizes are inconsistent with
            ``max_connections``.
    """
    # Common terms: a "pool triple" is [pool_contents, max_connections_in_such_a_pool, p2p_object_of_the_pool
    self.max_con = max_connections
    self.pool_count = len(pool_name_to_p2p_name_and_ratio)
    self.pool_names = list(pool_name_to_p2p_name_and_ratio.keys())
    self.pool_ratios = [p2p_name_and_ratio[1] for p2p_name_and_ratio in pool_name_to_p2p_name_and_ratio.values()]

    # Indices involving the P2P object or its name
    self.p2p_name_to_p2p = p2p_name_to_p2p
    self.p2p_name_and_pool_name_to_pool_triple = {}
    self.p2p_to_pool_names = {}

    # Indices rooted around the pool name
    self.pool_name_to_pool_triple = {}
    self.pool_name_to_added_in_last_update = {}
    self.pool_name_to_removed_in_last_update = {}
    self.pool_name_to_peer_infos = {p: {} for p in self.pool_names}

    # Indices rooted around the peer ID
    self.peer_id_to_pool_name = {}
    self.peer_id_to_p2p = {}
    self.peer_id_to_misc = {}
    self.peer_id_to_token = {}

    # Token-related stuff, super private
    self.__token = token if token is not None else ""
    self.__token_verifier = TokenVerifier(public_key) if public_key is not None else None

    # Checking
    for p2p_name_and_ratio in pool_name_to_p2p_name_and_ratio.values():
        assert p2p_name_and_ratio[0] in self.p2p_name_to_p2p, f"Cannot find p2p named {p2p_name_and_ratio[0]} "
    assert self.max_con >= len(self.pool_names), "Too small number of max connections"
    assert sum([x for x in self.pool_ratios if x > 0]) == 1.0, "Pool ratios must sum to 1.0"

    # Preparing the pool triples
    self.pool_name_to_pool_triple = \
        {k: [set(), 0, self.p2p_name_to_p2p[pool_name_to_p2p_name_and_ratio[k][0]]] for k in self.pool_names}
    num_zero_ratio_pools = len([x for x in self.pool_ratios if x == 0])
    assert num_zero_ratio_pools <= self.max_con, "Cannot create pools given the provided max connection count"

    # Edit: to solve the teacher not engaging with more than two students.
    pools_max_sizes = {k: max(math.floor(self.pool_ratios[i] * (self.max_con - num_zero_ratio_pools)),
                              1 if self.pool_ratios[i] >= 0. else 0)
                       for i, k in enumerate(self.pool_names)}

    # Pools_max_sizes = {k: self.max_con for k in self.pool_names}

    # Fixing sizes
    tot = 0
    for i, (k, v) in enumerate(pools_max_sizes.items()):
        assert v > 0 or self.pool_ratios[i] < 0, "Cannot create pools given the provided max connection count"

        # Edit: to solve the teacher not engaging with more than two students.
        tot += v
    assert tot <= self.max_con, \
        "Cannot create pools given the provided max connection count"
    pools_max_sizes[self.pool_names[-1]] += (self.max_con - tot)

    # Storing fixed sizes in the previously created pool triples & building additional index
    for pool_name, pool_contents_max_con_and_p2p in self.pool_name_to_pool_triple.items():
        pool_contents_max_con_and_p2p[1] = pools_max_sizes[pool_name]  # Fixing the second element of the triple

        pool, _, p2p = pool_contents_max_con_and_p2p
        p2p_name = None
        for k, v in self.p2p_name_to_p2p.items():
            if v == p2p:
                p2p_name = k
                break
        if p2p_name not in self.p2p_name_and_pool_name_to_pool_triple:
            self.p2p_name_and_pool_name_to_pool_triple[p2p_name] = {}
            self.p2p_to_pool_names[p2p] = []
        self.p2p_name_and_pool_name_to_pool_triple[p2p_name][pool_name] = (
            pool_contents_max_con_and_p2p)
        self.p2p_to_pool_names[p2p].append(pool_name)

    # Time markers
    self.__last_sent_time = 0.
    self.__last_recv_time = 0.

max_con instance-attribute

max_con = max_connections

pool_count instance-attribute

pool_count = len(pool_name_to_p2p_name_and_ratio)

pool_names instance-attribute

pool_names = list(keys())

pool_ratios instance-attribute

pool_ratios = [(p2p_name_and_ratio[1]) for p2p_name_and_ratio in (values())]

p2p_name_to_p2p instance-attribute

p2p_name_to_p2p = p2p_name_to_p2p

p2p_name_and_pool_name_to_pool_triple instance-attribute

p2p_name_and_pool_name_to_pool_triple = {}

p2p_to_pool_names instance-attribute

p2p_to_pool_names = {}

pool_name_to_added_in_last_update instance-attribute

pool_name_to_added_in_last_update = {}

pool_name_to_removed_in_last_update instance-attribute

pool_name_to_removed_in_last_update = {}

pool_name_to_peer_infos instance-attribute

pool_name_to_peer_infos = {p: {} for p in (pool_names)}

peer_id_to_pool_name instance-attribute

peer_id_to_pool_name = {}

peer_id_to_p2p instance-attribute

peer_id_to_p2p = {}

peer_id_to_misc instance-attribute

peer_id_to_misc = {}

peer_id_to_token instance-attribute

peer_id_to_token = {}

pool_name_to_pool_triple instance-attribute

pool_name_to_pool_triple = {k: [set(), 0, p2p_name_to_p2p[pool_name_to_p2p_name_and_ratio[k][0]]] for k in (pool_names)}

conn_routing_fcn async

conn_routing_fcn(connected_peer_infos: list, p2p: P2P) -> dict

Route connected peers to the correct pool based on application-specific logic (async).

This method is the primary extension point of ConnectionPools. Concrete subclasses must override it to implement the routing strategy that maps raw peer-info dictionaries (as returned by the P2P layer) to named pool buckets. The base implementation always raises NotImplementedError.

The returned dictionary drives the update method: for each pool it computes newly arrived peers (to be added) and departed peers (to be removed).

Parameters:

Name Type Description Default
connected_peer_infos list

List of peer-info dicts as returned by P2P.get_connected_peers_info. Each dict typically contains at least 'id', 'direction', 'addrs', and 'connected_at' fields.

required
p2p P2P

The P2P object whose currently connected peers are being routed.

required

Returns:

Type Description
dict

A dict mapping each pool name (that belongs to p2p) to an inner dict of

dict

the form {peer_id: peer_info_dict} for all peers routed to that pool.

Raises:

Type Description
NotImplementedError

Always, because the base class provides no routing logic.

Source code in unaiverse/networking/node/connpool.py
async def conn_routing_fcn(self, connected_peer_infos: list, p2p: P2P) -> dict:
    """Route connected peers to the correct pool based on application-specific logic (async).

    This method is the primary extension point of ``ConnectionPools``. Concrete
    subclasses must override it to implement the routing strategy that maps raw
    peer-info dictionaries (as returned by the P2P layer) to named pool buckets.
    The base implementation always raises ``NotImplementedError``.

    The returned dictionary drives the ``update`` method: for each pool it computes
    newly arrived peers (to be added) and departed peers (to be removed).

    Args:
        connected_peer_infos: List of peer-info dicts as returned by
            ``P2P.get_connected_peers_info``. Each dict typically contains at least
            ``'id'``, ``'direction'``, ``'addrs'``, and ``'connected_at'`` fields.
        p2p: The ``P2P`` object whose currently connected peers are being routed.

    Returns:
        A dict mapping each pool name (that belongs to ``p2p``) to an inner dict of
        the form ``{peer_id: peer_info_dict}`` for all peers routed to that pool.

    Raises:
        NotImplementedError: Always, because the base class provides no routing logic.
    """
    raise NotImplementedError("You must implement conn_routing_fcn!")

disconnect async staticmethod

disconnect(p2p: P2P, peer_id: str) -> bool

Disconnect from a specific peer on a P2P network (async).

Calls P2P.disconnect_from and translates a P2PError into a False return value so callers do not need to catch the exception. This method does not remove the peer from any internal pool bookkeeping; use remove for the full teardown.

Parameters:

Name Type Description Default
p2p P2P

The P2P network object through which the peer is connected.

required
peer_id str

The peer ID to disconnect from.

required

Returns:

Type Description
bool

True if the disconnection call succeeded without raising P2PError,

bool

False otherwise.

Source code in unaiverse/networking/node/connpool.py
@staticmethod
async def disconnect(p2p: P2P, peer_id: str) -> bool:
    """Disconnect from a specific peer on a P2P network (async).

    Calls ``P2P.disconnect_from`` and translates a ``P2PError`` into a ``False``
    return value so callers do not need to catch the exception. This method does
    not remove the peer from any internal pool bookkeeping; use ``remove`` for the
    full teardown.

    Args:
        p2p: The ``P2P`` network object through which the peer is connected.
        peer_id: The peer ID to disconnect from.

    Returns:
        ``True`` if the disconnection call succeeded without raising ``P2PError``,
        ``False`` otherwise.
    """
    try:
        p2p.disconnect_from(peer_id)
    except P2PError:
        return False
    return True

set_token

set_token(token: str) -> None

Set the authentication token piggybacked on every outgoing message.

The token is appended (along with a trailing inspector-mode bit) to the piggyback field of each Msg created by send and publish. Calling this method replaces the token provided at construction time.

Parameters:

Name Type Description Default
token str

The new authentication token string.

required
Source code in unaiverse/networking/node/connpool.py
def set_token(self, token: str) -> None:
    """Set the authentication token piggybacked on every outgoing message.

    The token is appended (along with a trailing inspector-mode bit) to the
    ``piggyback`` field of each ``Msg`` created by ``send`` and ``publish``.
    Calling this method replaces the token provided at construction time.

    Args:
        token: The new authentication token string.
    """
    self.__token = token

verify_token async

verify_token(token: str, peer_id: str) -> tuple[str | None, str | None]

Verify a token extracted from an incoming message piggyback field (async).

Delegates to the TokenVerifier that was constructed from the public_key supplied at instantiation time. If no public key was provided (i.e., the verifier is None), the method returns (None, None) without performing any check, effectively treating all tokens as invalid. On a successful verification the verifier returns the node ID and CV hash encoded in the token.

Parameters:

Name Type Description Default
token str

The raw token string extracted from the message piggyback payload.

required
peer_id str

The P2P peer ID of the sending peer, used as an additional binding input by the verifier.

required

Returns:

Type Description
str | None

A two-element tuple (node_id, cv_hash) where both elements are strings on

str | None

success, or (None, None) if no verifier is configured or the token is

tuple[str | None, str | None]

invalid.

Source code in unaiverse/networking/node/connpool.py
async def verify_token(self, token: str, peer_id: str) -> tuple[str | None, str | None]:
    """Verify a token extracted from an incoming message piggyback field (async).

    Delegates to the ``TokenVerifier`` that was constructed from the ``public_key``
    supplied at instantiation time. If no public key was provided (i.e., the verifier
    is ``None``), the method returns ``(None, None)`` without performing any check,
    effectively treating all tokens as invalid. On a successful verification the
    verifier returns the node ID and CV hash encoded in the token.

    Args:
        token: The raw token string extracted from the message piggyback payload.
        peer_id: The P2P peer ID of the sending peer, used as an additional binding
            input by the verifier.

    Returns:
        A two-element tuple ``(node_id, cv_hash)`` where both elements are strings on
        success, or ``(None, None)`` if no verifier is configured or the token is
        invalid.
    """
    if self.__token_verifier is None:
        return None, None
    else:
        node_id, cv_hash = self.__token_verifier.verify_token(token, p2p_peer=peer_id)
        return node_id, cv_hash  # If the verification fails, this is None, None

connect async

connect(addresses: list[str], p2p_name: str) -> tuple[str | None, bool]

Connect to a peer via a named P2P network (async).

Looks up the P2P object for p2p_name and delegates to the internal __connect method, which filters addresses (removing loopback and pure TCP entries where possible), sorts by protocol priority (QUIC/UDP first, WebSocket second, raw TCP last), and attempts the connection. If the preferred address set fails, a fallback to the discarded set is tried automatically.

This method only establishes the transport-layer connection; it does not add the peer to any pool. Call add after a successful connect if pool membership is desired.

Parameters:

Name Type Description Default
addresses list[str]

List of multiaddr strings for the target peer. At least one reachable address must be present for the connection to succeed.

required
p2p_name str

Name of the P2P network (key in p2p_name_to_p2p) to use for the outgoing connection.

required

Returns:

Type Description
str | None

A two-element tuple (peer_id, through_relay) where peer_id is the

bool

string ID of the connected peer (or None on failure) and through_relay

tuple[str | None, bool]

is True when the connection was established via a circuit-relay hop.

Source code in unaiverse/networking/node/connpool.py
async def connect(self, addresses: list[str], p2p_name: str) -> tuple[str | None, bool]:
    """Connect to a peer via a named P2P network (async).

    Looks up the ``P2P`` object for ``p2p_name`` and delegates to the internal
    ``__connect`` method, which filters addresses (removing loopback and pure TCP
    entries where possible), sorts by protocol priority (QUIC/UDP first, WebSocket
    second, raw TCP last), and attempts the connection. If the preferred address set
    fails, a fallback to the discarded set is tried automatically.

    This method only establishes the transport-layer connection; it does not add the
    peer to any pool. Call ``add`` after a successful connect if pool membership is
    desired.

    Args:
        addresses: List of multiaddr strings for the target peer. At least one
            reachable address must be present for the connection to succeed.
        p2p_name: Name of the P2P network (key in ``p2p_name_to_p2p``) to use for
            the outgoing connection.

    Returns:
        A two-element tuple ``(peer_id, through_relay)`` where ``peer_id`` is the
        string ID of the connected peer (or ``None`` on failure) and ``through_relay``
        is ``True`` when the connection was established via a circuit-relay hop.
    """
    p2p = self.p2p_name_to_p2p[p2p_name]

    # Connecting
    peer_id, through_relay = await self.__connect(p2p, addresses)
    return peer_id, through_relay

reserve async

reserve(peer_id: str, p2p_name: str) -> str | None

Reserve a relay slot on a remote relay node via a named P2P network (async).

Calls P2P.reserve_on_relay to request a circuit-relay reservation that allows other peers to reach this node through the relay identified by peer_id. A P2PError is silently caught and translated to a None return value.

Parameters:

Name Type Description Default
peer_id str

Peer ID of the relay node to reserve a slot on.

required
p2p_name str

Name of the P2P network (key in p2p_name_to_p2p) through which the relay is reachable.

required

Returns:

Type Description
str | None

A UTC expiration timestamp string for the reservation on success, or None

str | None

if the reservation attempt failed.

Source code in unaiverse/networking/node/connpool.py
async def reserve(self, peer_id: str, p2p_name: str) -> str | None:
    """Reserve a relay slot on a remote relay node via a named P2P network (async).

    Calls ``P2P.reserve_on_relay`` to request a circuit-relay reservation that allows
    other peers to reach this node through the relay identified by ``peer_id``. A
    ``P2PError`` is silently caught and translated to a ``None`` return value.

    Args:
        peer_id: Peer ID of the relay node to reserve a slot on.
        p2p_name: Name of the P2P network (key in ``p2p_name_to_p2p``) through which
            the relay is reachable.

    Returns:
        A UTC expiration timestamp string for the reservation on success, or ``None``
        if the reservation attempt failed.
    """
    p2p = self.p2p_name_to_p2p[p2p_name]
    try:
        return p2p.reserve_on_relay(peer_id)
    except P2PError:
        return None

add

add(peer_info: dict, pool_name: str) -> bool

Add a peer to a named connection pool.

The pool must have available capacity (current size below its maximum). A peer that is already tracked in a different pool is rejected to enforce the invariant that each peer ID appears in at most one pool. A peer that is already in the same pool is silently accepted and its internal state is refreshed.

On success, the method updates all relevant internal indices (peer_id_to_pool_name, peer_id_to_p2p, pool_name_to_peer_infos) and sets the 'misc' field of peer_info from peer_id_to_misc (defaulting to 0 for public peers).

Parameters:

Name Type Description Default
peer_info dict

Peer-info dict containing at least the key 'id' with the peer's ID string. Other fields ('addrs', 'direction', etc.) are stored as-is and returned by get_all_connected_peer_infos.

required
pool_name str

Name of the destination pool (must be a key in pool_name_to_pool_triple).

required

Returns:

Type Description
bool

True if the peer was accepted into the pool, False if the pool is full

bool

or the peer is already registered in a different pool.

Source code in unaiverse/networking/node/connpool.py
def add(self, peer_info: dict, pool_name: str) -> bool:
    """Add a peer to a named connection pool.

    The pool must have available capacity (current size below its maximum). A peer
    that is already tracked in a *different* pool is rejected to enforce the invariant
    that each peer ID appears in at most one pool. A peer that is already in the same
    pool is silently accepted and its internal state is refreshed.

    On success, the method updates all relevant internal indices
    (``peer_id_to_pool_name``, ``peer_id_to_p2p``, ``pool_name_to_peer_infos``) and
    sets the ``'misc'`` field of ``peer_info`` from ``peer_id_to_misc`` (defaulting
    to ``0`` for public peers).

    Args:
        peer_info: Peer-info dict containing at least the key ``'id'`` with the peer's
            ID string. Other fields (``'addrs'``, ``'direction'``, etc.) are stored
            as-is and returned by ``get_all_connected_peer_infos``.
        pool_name: Name of the destination pool (must be a key in
            ``pool_name_to_pool_triple``).

    Returns:
        ``True`` if the peer was accepted into the pool, ``False`` if the pool is full
        or the peer is already registered in a different pool.
    """
    peer_id = peer_info['id']
    pool, max_size, p2p = self.pool_name_to_pool_triple[pool_name]
    if len(pool) < max_size:

        # "hoping" peer IDs are unique, and stopping duplicate cases
        if peer_id in self.peer_id_to_pool_name and self.peer_id_to_pool_name[peer_id] != pool_name:
            return False

        self.peer_id_to_pool_name[peer_id] = pool_name
        self.peer_id_to_p2p[peer_id] = p2p

        # Setting 'misc' field (default is 0, where 0 means public)
        peer_info['misc'] = self.peer_id_to_misc.get(peer_id, 0)

        # Storing (only)
        pool.add(peer_id)
        self.pool_name_to_peer_infos[pool_name][peer_id] = peer_info
        return True
    else:
        return False

remove async

remove(peer_id: str) -> bool

Remove a peer from its pool and disconnect the underlying transport (async).

Looks up the pool that contains peer_id, calls disconnect on the associated P2P object, and then removes the peer from all internal indices: the pool set, pool_name_to_peer_infos, peer_id_to_pool_name, peer_id_to_p2p, and peer_id_to_token. The peer_id_to_misc entry is intentionally preserved so that role/misc information survives a reconnect.

If peer_id is not tracked in any pool, the method returns False immediately without attempting a disconnection.

Parameters:

Name Type Description Default
peer_id str

ID of the peer to remove.

required

Returns:

Type Description
bool

True if the peer was found and the P2P.disconnect_from call succeeded

bool

without raising P2PError, False if the peer was not in any pool or the

bool

disconnection call failed.

Source code in unaiverse/networking/node/connpool.py
async def remove(self, peer_id: str) -> bool:
    """Remove a peer from its pool and disconnect the underlying transport (async).

    Looks up the pool that contains ``peer_id``, calls ``disconnect`` on the
    associated ``P2P`` object, and then removes the peer from all internal indices:
    the pool set, ``pool_name_to_peer_infos``, ``peer_id_to_pool_name``,
    ``peer_id_to_p2p``, and ``peer_id_to_token``. The ``peer_id_to_misc`` entry is
    intentionally preserved so that role/misc information survives a reconnect.

    If ``peer_id`` is not tracked in any pool, the method returns ``False``
    immediately without attempting a disconnection.

    Args:
        peer_id: ID of the peer to remove.

    Returns:
        ``True`` if the peer was found and the ``P2P.disconnect_from`` call succeeded
        without raising ``P2PError``, ``False`` if the peer was not in any pool or the
        disconnection call failed.
    """
    if peer_id in self.peer_id_to_pool_name:
        pool_name = self.peer_id_to_pool_name[peer_id]
        pool, _, p2p = self.pool_name_to_pool_triple[pool_name]

        # Disconnecting
        disc = await ConnectionPools.disconnect(p2p, peer_id)
        pool.remove(peer_id)
        del self.pool_name_to_peer_infos[pool_name][peer_id]
        del self.peer_id_to_pool_name[peer_id]
        del self.peer_id_to_p2p[peer_id]
        if peer_id in self.peer_id_to_token:
            del self.peer_id_to_token[peer_id]

        # Remember to NOT del peer_id_to_misc[peer_id]!
        return disc
    else:
        return False

get_all_connected_peer_infos

get_all_connected_peer_infos(pool_name: str) -> list[dict]

Return the peer-info dicts for every peer currently in a named pool.

Each dict in the returned list is the same object stored internally in pool_name_to_peer_infos, so callers must not mutate it unless they understand the shared-reference implications.

Parameters:

Name Type Description Default
pool_name str

Name of the pool to query (must be a key in pool_name_to_peer_infos).

required

Returns:

Type Description
list[dict]

A list of peer-info dicts, one per connected peer in the pool. The list is

list[dict]

a snapshot copy; subsequent pool changes do not affect it.

Source code in unaiverse/networking/node/connpool.py
def get_all_connected_peer_infos(self, pool_name: str) -> list[dict]:
    """Return the peer-info dicts for every peer currently in a named pool.

    Each dict in the returned list is the same object stored internally in
    ``pool_name_to_peer_infos``, so callers must not mutate it unless they
    understand the shared-reference implications.

    Args:
        pool_name: Name of the pool to query (must be a key in
            ``pool_name_to_peer_infos``).

    Returns:
        A list of peer-info dicts, one per connected peer in the pool. The list is
        a snapshot copy; subsequent pool changes do not affect it.
    """
    return list(self.pool_name_to_peer_infos[pool_name].values())

get_pool_status

get_pool_status() -> dict[str, set]

Return the live set of peer IDs contained in each pool.

The returned sets are the actual internal sets, not copies; mutating them will corrupt the pool state. Use this method for read-only inspection or logging purposes only.

Returns:

Type Description
dict[str, set]

A dict mapping each pool name to the set of peer IDs currently

dict[str, set]

registered in that pool.

Source code in unaiverse/networking/node/connpool.py
def get_pool_status(self) -> dict[str, set]:
    """Return the live set of peer IDs contained in each pool.

    The returned sets are the *actual* internal sets, not copies; mutating them
    will corrupt the pool state. Use this method for read-only inspection or
    logging purposes only.

    Returns:
        A dict mapping each pool name to the ``set`` of peer IDs currently
        registered in that pool.
    """
    return {k: v[0] for k, v in self.pool_name_to_pool_triple.items()}

get_all_connected_peer_ids

get_all_connected_peer_ids() -> list[str]

Return a snapshot list of every peer ID connected across all pools.

Returns:

Type Description
list[str]

A list of peer ID strings drawn from all pools. The list is a copy of the

list[str]

dictionary key view at the moment of the call; subsequent connect or

list[str]

disconnect events do not affect it.

Source code in unaiverse/networking/node/connpool.py
def get_all_connected_peer_ids(self) -> list[str]:
    """Return a snapshot list of every peer ID connected across all pools.

    Returns:
        A list of peer ID strings drawn from all pools. The list is a copy of the
        dictionary key view at the moment of the call; subsequent connect or
        disconnect events do not affect it.
    """
    return list(self.peer_id_to_pool_name.keys())

update async

update() -> tuple[dict, dict]

Refresh all connection pools by reconciling live P2P state with internal bookkeeping (async).

For every registered P2P network, this method calls P2P.get_connected_peers_info to obtain the current set of transport-level connections, passes the result through conn_routing_fcn to map peers to pools, and then computes the symmetric difference between the live set and the internal pool set:

  • Peers present in the live set but absent from the pool are added via add and recorded in pool_name_to_added_in_last_update.
  • Peers absent from the live set but still tracked in the pool are flagged in pool_name_to_removed_in_last_update (the caller is responsible for calling remove on them as needed).

The two result dicts are also stored as instance attributes pool_name_to_added_in_last_update and pool_name_to_removed_in_last_update and can be retrieved after the call via get_added_after_updating and get_removed_after_updating.

Returns:

Type Description
dict

A two-element tuple (added, removed) where each element is a dict mapping

dict

pool names to a set of peer IDs that were added or removed, respectively,

tuple[dict, dict]

during this call. Both dicts contain only pools where a change occurred.

Source code in unaiverse/networking/node/connpool.py
async def update(self) -> tuple[dict, dict]:
    """Refresh all connection pools by reconciling live P2P state with internal bookkeeping (async).

    For every registered P2P network, this method calls ``P2P.get_connected_peers_info``
    to obtain the current set of transport-level connections, passes the result through
    ``conn_routing_fcn`` to map peers to pools, and then computes the symmetric
    difference between the live set and the internal pool set:

    - Peers present in the live set but absent from the pool are added via ``add``
      and recorded in ``pool_name_to_added_in_last_update``.
    - Peers absent from the live set but still tracked in the pool are flagged in
      ``pool_name_to_removed_in_last_update`` (the caller is responsible for calling
      ``remove`` on them as needed).

    The two result dicts are also stored as instance attributes
    ``pool_name_to_added_in_last_update`` and ``pool_name_to_removed_in_last_update``
    and can be retrieved after the call via ``get_added_after_updating`` and
    ``get_removed_after_updating``.

    Returns:
        A two-element tuple ``(added, removed)`` where each element is a dict mapping
        pool names to a ``set`` of peer IDs that were added or removed, respectively,
        during this call. Both dicts contain only pools where a change occurred.
    """
    self.pool_name_to_added_in_last_update = {}
    self.pool_name_to_removed_in_last_update = {}

    for p2p_name, p2p in self.p2p_name_to_p2p.items():
        connected_peer_infos = p2p.get_connected_peers_info()

        if connected_peer_infos is not None:

            # Routing to the right queue / filtering
            pool_name_and_peer_ids_to_peer_info = await self.conn_routing_fcn(connected_peer_infos, p2p)

            # Parsing the generated index
            for pool_name, connected_peer_ids_to_connected_peer_infos \
                    in pool_name_and_peer_ids_to_peer_info.items():
                pool, _, pool_p2p = self.p2p_name_and_pool_name_to_pool_triple[p2p_name][pool_name]
                connected_peer_ids = connected_peer_ids_to_connected_peer_infos.keys()
                new_peer_ids = connected_peer_ids - pool
                lost_peer_ids = pool - connected_peer_ids

                # Clearing disconnected agents
                for lost_peer_id in lost_peer_ids:
                    self.pool_name_to_removed_in_last_update.setdefault(pool_name, set()).add(lost_peer_id)

                # Adding new agents
                for new_peer_id in new_peer_ids:
                    peer_info = connected_peer_ids_to_connected_peer_infos[new_peer_id]
                    if not self.add(peer_info, pool_name=pool_name):
                        break
                    self.pool_name_to_added_in_last_update.setdefault(pool_name, set()).add(new_peer_id)

    return self.pool_name_to_added_in_last_update, self.pool_name_to_removed_in_last_update

get_messages async

get_messages(p2p_name: str, allowed_not_connected_peers: set | None = None) -> list[Msg]

Pop, decode, authenticate, and return all pending messages from a P2P network (async).

The method performs the following pipeline for every raw message dict returned by P2P.pop_messages:

  1. Extracts the cryptographically verified sender ID ('from') and the Base64-encoded payload ('data') from the Go-layer message dict.
  2. Decodes the payload and parses it into a Msg object via Msg.from_bytes.
  3. Enforces a sender-identity check: the sender field inside the Msg payload must match the 'from' field from the network layer; mismatches are silently discarded.
  4. Accepts the message only if the sender is either tracked in peer_id_to_pool_name or present in allowed_not_connected_peers.
  5. Verifies the piggybacked token via verify_token; on success, replaces the piggyback field with the verified node_id concatenated with the inspector-mode bit, and caches the raw token in peer_id_to_token. Messages with missing or invalid tokens are discarded.

Updating __last_recv_time is a side effect that occurs when at least one raw message is present (before filtering).

Parameters:

Name Type Description Default
p2p_name str

Name of the P2P network (key in p2p_name_to_p2p) from which to pop messages.

required
allowed_not_connected_peers set | None

Optional set of peer IDs whose messages are accepted even if those peers are not currently in any pool. Useful for handling messages from peers that are known but not yet tracked (for example, world agents on the rendezvous topic). Defaults to None.

None

Returns:

Type Description
list[Msg]

A list of fully authenticated Msg objects whose piggyback field has

list[Msg]

been replaced with node_id + inspector_mode_bit. Messages that fail any

list[Msg]

step of the pipeline are silently dropped.

Source code in unaiverse/networking/node/connpool.py
async def get_messages(self, p2p_name: str, allowed_not_connected_peers: set | None = None) -> list[Msg]:
    """Pop, decode, authenticate, and return all pending messages from a P2P network (async).

    The method performs the following pipeline for every raw message dict returned by
    ``P2P.pop_messages``:

    1. Extracts the cryptographically verified sender ID (``'from'``) and the
       Base64-encoded payload (``'data'``) from the Go-layer message dict.
    2. Decodes the payload and parses it into a ``Msg`` object via ``Msg.from_bytes``.
    3. Enforces a sender-identity check: the ``sender`` field inside the ``Msg``
       payload must match the ``'from'`` field from the network layer; mismatches are
       silently discarded.
    4. Accepts the message only if the sender is either tracked in
       ``peer_id_to_pool_name`` or present in ``allowed_not_connected_peers``.
    5. Verifies the piggybacked token via ``verify_token``; on success, replaces the
       ``piggyback`` field with the verified ``node_id`` concatenated with the
       inspector-mode bit, and caches the raw token in ``peer_id_to_token``. Messages
       with missing or invalid tokens are discarded.

    Updating ``__last_recv_time`` is a side effect that occurs when at least one raw
    message is present (before filtering).

    Args:
        p2p_name: Name of the P2P network (key in ``p2p_name_to_p2p``) from which
            to pop messages.
        allowed_not_connected_peers: Optional set of peer IDs whose messages are
            accepted even if those peers are not currently in any pool. Useful for
            handling messages from peers that are known but not yet tracked
            (for example, world agents on the rendezvous topic). Defaults to ``None``.

    Returns:
        A list of fully authenticated ``Msg`` objects whose ``piggyback`` field has
        been replaced with ``node_id + inspector_mode_bit``. Messages that fail any
        step of the pipeline are silently dropped.
    """
    # Pop all messages
    p2p = self[p2p_name]
    byte_messages = p2p.pop_messages()  # Pop all messages
    # Process the list of message dictionaries
    processed_messages: list[Msg] = []

    if len(byte_messages) > 0:
        self.__last_recv_time = time.time()

    for i, msg_dict in enumerate(byte_messages):
        msg_dict: dict
        try:
            # Extract and validate required fields from the Go message structure
            # Go structure: {"from":"Qm...", "data":"BASE64_ENCODED_DATA"}
            verified_sender_id = msg_dict.get("from")
            base64_data = msg_dict.get("data")

            # Decode data
            decoded_data = base64.b64decode(base64_data)

            # Attempt to create the higher-level Msg object
            # This assumes Msg.from_bytes can parse your message protocol from decoded_data
            # and that Msg objects store sender, type, channel intrinsically or can be set.
            msg_obj = Msg.from_bytes(decoded_data)

            log.network(f"<<< RECEIVED {msg_obj.content_type} through channel {msg_obj.channel}", sub=p2p.log_sub)

            # --- CRITICAL SECURITY CHECK ---
            # Verify that the sender claimed inside the message payload
            # matches the cryptographically verified sender from the network layer.
            if msg_obj.sender != verified_sender_id:
                log.error(f"SENDER MISMATCH! Network sender '{verified_sender_id}' "
                          f"does not match payload sender '{msg_obj.sender}'. Discarding message.",
                          sub=p2p.log_sub)

                # In a real-world scenario, you might also want to penalize or disconnect
                # from a peer that sends such malformed/spoofed messages.
                continue  # Discard this message

            # filter only valid messages to return
            if (msg_obj.sender in self.peer_id_to_pool_name or  # Check if expected sender
                    (allowed_not_connected_peers is not None and msg_obj.sender in allowed_not_connected_peers)):

                try:
                    token_with_inspector_final_bit = msg_obj.piggyback
                    token = token_with_inspector_final_bit[0:-1]
                    inspector_mode = token_with_inspector_final_bit[-1]
                    node_id, _ = await self.verify_token(token, msg_obj.sender)
                    if node_id is not None:
                        # Replacing piggyback with the node ID and the flag telling if it is inspector
                        msg_obj.piggyback = node_id + inspector_mode
                        processed_messages.append(msg_obj)
                        if msg_obj.sender in self.peer_id_to_pool_name:
                            self.peer_id_to_token[msg_obj.sender] = token
                    else:
                        log.error("Received a message missing expected info in the token payload "
                                  "(discarding it)", sub=p2p.log_sub)
                except Exception as e:
                    log.error(f"Received a message with an invalid piggyback token! (discarding it) [{e}]",
                              sub=p2p.log_sub)

        except ValueError as ve:
            log.error(f"Invalid message created, stopping. Error: {ve}", sub=p2p.log_sub)
            continue  # Skip problematic message
        except (TypeError, binascii.Error) as decode_err:
            log.error(f"Failed to decode Base64 data for a message in batch: {decode_err}. "
                      f"Message dict: {msg_dict}", sub=p2p.log_sub)
            continue  # Skip problematic message
        except Exception as msg_proc_err:  # Catch errors from Msg.from_bytes or attribute setting
            log.error(f"Error processing popped message item {i}: {msg_proc_err}. "
                      f"Message dict: {msg_dict}", sub=p2p.log_sub)
            continue  # Skip problematic message

    return processed_messages

passed_time_since_last_communication

passed_time_since_last_communication()

Return the number of seconds elapsed since the most recent send or receive event.

The reference timestamp is the later of __last_sent_time (updated by send for non-stats message types) and __last_recv_time (updated by get_messages when at least one raw message is present). Both markers start at 0.0, so the returned value may be very large before any communication occurs.

Returns:

Type Description

A non-negative float representing the elapsed wall-clock seconds since the

last send or receive event. Returns 0.0 if the computed difference is

negative, which can happen in rare cases of clock skew.

Source code in unaiverse/networking/node/connpool.py
def passed_time_since_last_communication(self):
    """Return the number of seconds elapsed since the most recent send or receive event.

    The reference timestamp is the later of ``__last_sent_time`` (updated by
    ``send`` for non-stats message types) and ``__last_recv_time`` (updated by
    ``get_messages`` when at least one raw message is present). Both markers start at
    ``0.0``, so the returned value may be very large before any communication occurs.

    Returns:
        A non-negative float representing the elapsed wall-clock seconds since the
        last send or receive event. Returns ``0.0`` if the computed difference is
        negative, which can happen in rare cases of clock skew.
    """
    passed = time.time() - max(self.__last_sent_time, self.__last_recv_time)
    return 0. if passed < 0. else passed

get_added_after_updating

get_added_after_updating(pool_name: str | None = None) -> set | dict

Return the peer IDs that were added during the most recent update call.

The internal dict pool_name_to_added_in_last_update is populated by update and contains only pools where at least one peer was added. Querying a pool name that had no additions will raise a KeyError.

Parameters:

Name Type Description Default
pool_name str | None

Name of a specific pool to query. If None, the entire pool_name_to_added_in_last_update dict is returned. Defaults to None.

None

Returns:

Type Description
set | dict

A set of added peer ID strings when pool_name is given, or a dict

set | dict

mapping pool names to such sets when pool_name is None.

Raises:

Type Description
KeyError

If pool_name is not None and no peers were added to that pool in the last update.

Source code in unaiverse/networking/node/connpool.py
def get_added_after_updating(self, pool_name: str | None = None) -> set | dict:
    """Return the peer IDs that were added during the most recent ``update`` call.

    The internal dict ``pool_name_to_added_in_last_update`` is populated by ``update``
    and contains only pools where at least one peer was added. Querying a pool name
    that had no additions will raise a ``KeyError``.

    Args:
        pool_name: Name of a specific pool to query. If ``None``, the entire
            ``pool_name_to_added_in_last_update`` dict is returned. Defaults to
            ``None``.

    Returns:
        A ``set`` of added peer ID strings when ``pool_name`` is given, or a dict
        mapping pool names to such sets when ``pool_name`` is ``None``.

    Raises:
        KeyError: If ``pool_name`` is not ``None`` and no peers were added to that
            pool in the last update.
    """
    if pool_name is not None:
        return self.pool_name_to_added_in_last_update[pool_name]
    else:
        return self.pool_name_to_added_in_last_update

get_removed_after_updating

get_removed_after_updating(pool_name: str | None = None) -> set | dict

Return the peer IDs that were removed during the most recent update call.

The internal dict pool_name_to_removed_in_last_update is populated by update and contains only pools where at least one peer departed. Note that update records a departure but does not call remove itself; the caller is responsible for acting on the returned set.

Parameters:

Name Type Description Default
pool_name str | None

Name of a specific pool to query. If None, the entire pool_name_to_removed_in_last_update dict is returned. Defaults to None.

None

Returns:

Type Description
set | dict

A set of removed peer ID strings when pool_name is given, or a dict

set | dict

mapping pool names to such sets when pool_name is None.

Raises:

Type Description
KeyError

If pool_name is not None and no peers were removed from that pool in the last update.

Source code in unaiverse/networking/node/connpool.py
def get_removed_after_updating(self, pool_name: str | None = None) -> set | dict:
    """Return the peer IDs that were removed during the most recent ``update`` call.

    The internal dict ``pool_name_to_removed_in_last_update`` is populated by
    ``update`` and contains only pools where at least one peer departed. Note that
    ``update`` records a departure but does not call ``remove`` itself; the caller
    is responsible for acting on the returned set.

    Args:
        pool_name: Name of a specific pool to query. If ``None``, the entire
            ``pool_name_to_removed_in_last_update`` dict is returned. Defaults to
            ``None``.

    Returns:
        A ``set`` of removed peer ID strings when ``pool_name`` is given, or a dict
        mapping pool names to such sets when ``pool_name`` is ``None``.

    Raises:
        KeyError: If ``pool_name`` is not ``None`` and no peers were removed from that
            pool in the last update.
    """
    if pool_name is not None:
        return self.pool_name_to_removed_in_last_update[pool_name]
    else:
        return self.pool_name_to_removed_in_last_update

get_last_token

get_last_token(peer_id: str) -> str | None

Return the last verified authentication token received from a peer.

Tokens are cached in peer_id_to_token by get_messages whenever a message from a pool-tracked peer passes the full token-verification pipeline. The cached token is the raw token string (without the trailing inspector-mode bit).

Parameters:

Name Type Description Default
peer_id str

ID of the peer whose token to retrieve.

required

Returns:

Type Description
str | None

The raw token string if a token has been received and verified from this peer,

str | None

or None if no token is cached for the given peer ID.

Source code in unaiverse/networking/node/connpool.py
def get_last_token(self, peer_id: str) -> str | None:
    """Return the last verified authentication token received from a peer.

    Tokens are cached in ``peer_id_to_token`` by ``get_messages`` whenever a message
    from a pool-tracked peer passes the full token-verification pipeline. The cached
    token is the raw token string (without the trailing inspector-mode bit).

    Args:
        peer_id: ID of the peer whose token to retrieve.

    Returns:
        The raw token string if a token has been received and verified from this peer,
        or ``None`` if no token is cached for the given peer ID.
    """
    return self.peer_id_to_token[peer_id] if peer_id in self.peer_id_to_token else None

is_connected

is_connected(peer_id: str, pool_name: str | None = None) -> bool

Check whether a peer is currently tracked in the connection pools.

When pool_name is omitted, the check is against all pools: the method returns True if peer_id appears in peer_id_to_pool_name. When a specific pool is given, the method additionally verifies that the peer is registered in that exact pool.

Parameters:

Name Type Description Default
peer_id str

ID of the peer to look up.

required
pool_name str | None

Name of a specific pool to restrict the search to. If None, all pools are searched. Defaults to None.

None

Returns:

Type Description
bool

True if the peer is found (in the specified pool when pool_name is

bool

given, or in any pool otherwise), False if the peer is not connected.

Source code in unaiverse/networking/node/connpool.py
def is_connected(self, peer_id: str, pool_name: str | None = None) -> bool:
    """Check whether a peer is currently tracked in the connection pools.

    When ``pool_name`` is omitted, the check is against all pools: the method
    returns ``True`` if ``peer_id`` appears in ``peer_id_to_pool_name``. When a
    specific pool is given, the method additionally verifies that the peer is
    registered in that exact pool.

    Args:
        peer_id: ID of the peer to look up.
        pool_name: Name of a specific pool to restrict the search to. If ``None``,
            all pools are searched. Defaults to ``None``.

    Returns:
        ``True`` if the peer is found (in the specified pool when ``pool_name`` is
        given, or in any pool otherwise), ``False`` if the peer is not connected.
    """
    if pool_name is None:
        return peer_id in self.peer_id_to_pool_name
    else:
        return peer_id in self.peer_id_to_pool_name and pool_name == self.peer_id_to_pool_name[peer_id]

get_pool_of

get_pool_of(peer_id: str) -> str | None

Return the name of the pool that currently contains a peer.

Performs an O(1) lookup in peer_id_to_pool_name. This is the inverse operation of iterating pool_name_to_pool_triple: given a peer ID it immediately identifies which named bucket the peer occupies, which is useful before calling send or when checking pool membership without knowing the direction (inbound vs. outbound) in advance.

Parameters:

Name Type Description Default
peer_id str

ID of the peer to look up.

required

Returns:

Type Description
str | None

The pool name string if the peer is currently tracked in a pool, or

str | None

None if the peer is not connected.

Source code in unaiverse/networking/node/connpool.py
def get_pool_of(self, peer_id: str) -> str | None:
    """Return the name of the pool that currently contains a peer.

    Performs an O(1) lookup in ``peer_id_to_pool_name``. This is the inverse
    operation of iterating ``pool_name_to_pool_triple``: given a peer ID it
    immediately identifies which named bucket the peer occupies, which is useful
    before calling ``send`` or when checking pool membership without knowing the
    direction (inbound vs. outbound) in advance.

    Args:
        peer_id: ID of the peer to look up.

    Returns:
        The pool name string if the peer is currently tracked in a pool, or
        ``None`` if the peer is not connected.
    """
    if peer_id in self.peer_id_to_pool_name:
        return self.peer_id_to_pool_name[peer_id]
    else:
        return None

size

size(pool_name: str | None = None) -> int

Return the number of connections in a specific pool or across all pools.

When pool_name is given, the count reflects the number of peer IDs currently in that pool's set. When omitted, all pool sets are summed. The NodeConn subclass overrides this signature to accept a list[str] instead of a single name, enabling multi-pool aggregation in a single call.

Parameters:

Name Type Description Default
pool_name str | None

Name of the pool to query. If None, the total connection count across every pool is returned. Defaults to None.

None

Returns:

Type Description
int

The number of connections in the specified pool, or the grand total when

int

pool_name is None.

Source code in unaiverse/networking/node/connpool.py
def size(self, pool_name: str | None = None) -> int:
    """Return the number of connections in a specific pool or across all pools.

    When ``pool_name`` is given, the count reflects the number of peer IDs currently
    in that pool's set. When omitted, all pool sets are summed. The ``NodeConn``
    subclass overrides this signature to accept a ``list[str]`` instead of a single
    name, enabling multi-pool aggregation in a single call.

    Args:
        pool_name: Name of the pool to query. If ``None``, the total connection count
            across every pool is returned. Defaults to ``None``.

    Returns:
        The number of connections in the specified pool, or the grand total when
        ``pool_name`` is ``None``.
    """
    if pool_name is not None:
        return len(self.pool_name_to_pool_triple[pool_name][0])
    else:
        c = 0
        for v in self.pool_name_to_pool_triple.values():
            c += len(v[0])
        return c

send async

send(peer_id: str, channel_trail: str | None, content_type: str, content: bytes | dict | None = None, p2p: P2P | None = None) -> bool

Send a direct message to a specific peer (async).

Constructs a Msg with the local node's peer ID as sender and the current authentication token (plus a trailing "0" inspector-mode bit) as the piggyback field, then dispatches it through P2P.send_message_to_peer offloaded to a worker thread so the event loop is not blocked.

The channel name is derived from the local peer ID, the target peer ID, and content_type following the pattern "<local_peer_id>::dm:<peer_id>-<content_type>". When channel_trail is a non-empty string, it is appended with a "~" separator.

If content_type is not one of Msg.STATS_UPDATE, Msg.STATS_REQUEST, or Msg.STATS_RESPONSE, the internal __last_sent_time timestamp is updated, which affects passed_time_since_last_communication.

Parameters:

Name Type Description Default
peer_id str

ID of the destination peer.

required
channel_trail str | None

Optional suffix appended to the channel name after a "~" separator. Pass None or an empty string for no suffix.

required
content_type str

Message type identifier (one of the Msg.* constants).

required
content bytes | dict | None

Message payload, either raw bytes, a serialisable dict, or None. Defaults to None.

None
p2p P2P | None

P2P object to use for dispatch. If None, the object is resolved from peer_id_to_p2p; if the peer is not found, the call returns False immediately. Defaults to None.

None

Returns:

Type Description
bool

True if the message was dispatched without raising P2PError,

bool

False if the target peer's P2P object could not be resolved or if

bool

the underlying send call raised P2PError.

Source code in unaiverse/networking/node/connpool.py
async def send(self, peer_id: str, channel_trail: str | None,
               content_type: str, content: bytes | dict | None = None, p2p: P2P | None = None) -> bool:
    """Send a direct message to a specific peer (async).

    Constructs a ``Msg`` with the local node's peer ID as sender and the
    current authentication token (plus a trailing ``"0"`` inspector-mode bit)
    as the piggyback field, then dispatches it through ``P2P.send_message_to_peer``
    offloaded to a worker thread so the event loop is not blocked.

    The channel name is derived from the local peer ID, the target peer ID, and
    ``content_type`` following the pattern
    ``"<local_peer_id>::dm:<peer_id>-<content_type>"``. When ``channel_trail`` is
    a non-empty string, it is appended with a ``"~"`` separator.

    If ``content_type`` is not one of ``Msg.STATS_UPDATE``, ``Msg.STATS_REQUEST``,
    or ``Msg.STATS_RESPONSE``, the internal ``__last_sent_time`` timestamp is
    updated, which affects ``passed_time_since_last_communication``.

    Args:
        peer_id: ID of the destination peer.
        channel_trail: Optional suffix appended to the channel name after a ``"~"``
            separator. Pass ``None`` or an empty string for no suffix.
        content_type: Message type identifier (one of the ``Msg.*`` constants).
        content: Message payload, either raw bytes, a serialisable dict, or ``None``.
            Defaults to ``None``.
        p2p: ``P2P`` object to use for dispatch. If ``None``, the object is resolved
            from ``peer_id_to_p2p``; if the peer is not found, the call returns
            ``False`` immediately. Defaults to ``None``.

    Returns:
        ``True`` if the message was dispatched without raising ``P2PError``,
        ``False`` if the target peer's ``P2P`` object could not be resolved or if
        the underlying send call raised ``P2PError``.
    """
    if content_type not in [Msg.STATS_UPDATE, Msg.STATS_REQUEST, Msg.STATS_RESPONSE]:
        self.__last_sent_time = time.time()

    # Getting the right p2p object
    if p2p is None:
        p2p = self.peer_id_to_p2p[peer_id] if peer_id in self.peer_id_to_p2p else None
        if p2p is None:
            log.error("P2P non found for peer id: " + str(peer_id))
            return False

    # Defining channel
    if channel_trail is not None and len(channel_trail) > 0:
        channel = f"{p2p.peer_id}::dm:{peer_id}-{content_type}~{channel_trail}"
    else:
        channel = f"{p2p.peer_id}::dm:{peer_id}-{content_type}"

    log.network(f">>> SENDING {content_type} to {peer_id}", sub=p2p.log_sub)

    # Adding sender info here
    msg = Msg(sender=p2p.peer_id,
              content_type=content_type,
              content=content,
              channel=channel,
              piggyback=self.__token + "0")  # Adding inspector-mode bit (dummy bit here)

    # Sending direct message (offloaded to a thread so the event loop stays unblocked)
    try:
        await asyncio.to_thread(p2p.send_message_to_peer, channel, msg_bytes=msg.to_bytes())
        return True
    except P2PError as e:
        log.error("Sending error is: " + str(e), sub=p2p.log_sub)
        return False

subscribe async

subscribe(peer_id: str, channel: str, default_p2p_name: str | None = None) -> bool

Subscribe to a GossipSub topic on the P2P network that owns a peer (async).

Resolves the P2P object to use via a three-step fallback:

  1. If peer_id matches a local P2P node's own peer ID (i.e., the caller itself owns that ID), that P2P object is used.
  2. Otherwise, if the peer is tracked in peer_id_to_p2p, its registered P2P object is used.
  3. Finally, if default_p2p_name is provided, the named P2P object from p2p_name_to_p2p is used as a last resort.

If none of the above resolves a P2P object, the method returns False. Both P2PError and ValueError raised by P2P.subscribe_to_topic are caught and translated to a False return value.

Parameters:

Name Type Description Default
peer_id str

Peer ID used to identify which P2P network to subscribe on.

required
channel str

GossipSub topic name to subscribe to.

required
default_p2p_name str | None

Name of the P2P network to use as a fallback when the peer cannot be resolved from the internal indices. Defaults to None.

None

Returns:

Type Description
bool

True if the subscription succeeded, False if no P2P object could

bool

be resolved or if the subscription call raised an exception.

Source code in unaiverse/networking/node/connpool.py
async def subscribe(self, peer_id: str, channel: str, default_p2p_name: str | None = None) -> bool:
    """Subscribe to a GossipSub topic on the P2P network that owns a peer (async).

    Resolves the ``P2P`` object to use via a three-step fallback:

    1. If ``peer_id`` matches a local P2P node's own peer ID (i.e., the caller
       itself owns that ID), that ``P2P`` object is used.
    2. Otherwise, if the peer is tracked in ``peer_id_to_p2p``, its registered
       ``P2P`` object is used.
    3. Finally, if ``default_p2p_name`` is provided, the named ``P2P`` object from
       ``p2p_name_to_p2p`` is used as a last resort.

    If none of the above resolves a ``P2P`` object, the method returns ``False``.
    Both ``P2PError`` and ``ValueError`` raised by ``P2P.subscribe_to_topic`` are
    caught and translated to a ``False`` return value.

    Args:
        peer_id: Peer ID used to identify which ``P2P`` network to subscribe on.
        channel: GossipSub topic name to subscribe to.
        default_p2p_name: Name of the ``P2P`` network to use as a fallback when the
            peer cannot be resolved from the internal indices. Defaults to ``None``.

    Returns:
        ``True`` if the subscription succeeded, ``False`` if no ``P2P`` object could
        be resolved or if the subscription call raised an exception.
    """

    # Getting the right p2p object
    p2p = None
    for _p2p in self.p2p_to_pool_names.keys():
        if _p2p.peer_id == peer_id:
            p2p = _p2p
            break
    if p2p is None and peer_id in self.peer_id_to_p2p:
        p2p = self.peer_id_to_p2p[peer_id]
    if p2p is None:
        if default_p2p_name is not None:
            p2p = self.p2p_name_to_p2p[default_p2p_name]
        else:
            return False

    try:
        p2p.subscribe_to_topic(channel)
    except (P2PError, ValueError):
        return False
    return True

unsubscribe async

unsubscribe(peer_id: str, channel: str, default_p2p_name: str | None = None) -> bool

Unsubscribe from a GossipSub topic on the P2P network that owns a peer (async).

Applies the same three-step P2P resolution logic used by subscribe: local peer ID match, then peer_id_to_p2p lookup, then default_p2p_name fallback. If resolution fails, returns False without raising.

Both P2PError and ValueError raised by P2P.unsubscribe_from_topic are caught and translated to a False return value so callers do not need to handle them explicitly.

Parameters:

Name Type Description Default
peer_id str

Peer ID used to identify which P2P network to unsubscribe on.

required
channel str

GossipSub topic name to unsubscribe from.

required
default_p2p_name str | None

Name of the P2P network to use as a fallback when the peer cannot be resolved from the internal indices. Defaults to None.

None

Returns:

Type Description
bool

True if the unsubscription succeeded, False if no P2P object

bool

could be resolved or if the unsubscription call raised an exception.

Source code in unaiverse/networking/node/connpool.py
async def unsubscribe(self, peer_id: str, channel: str, default_p2p_name: str | None = None) -> bool:
    """Unsubscribe from a GossipSub topic on the P2P network that owns a peer (async).

    Applies the same three-step ``P2P`` resolution logic used by ``subscribe``:
    local peer ID match, then ``peer_id_to_p2p`` lookup, then ``default_p2p_name``
    fallback. If resolution fails, returns ``False`` without raising.

    Both ``P2PError`` and ``ValueError`` raised by ``P2P.unsubscribe_from_topic``
    are caught and translated to a ``False`` return value so callers do not need
    to handle them explicitly.

    Args:
        peer_id: Peer ID used to identify which ``P2P`` network to unsubscribe on.
        channel: GossipSub topic name to unsubscribe from.
        default_p2p_name: Name of the ``P2P`` network to use as a fallback when the
            peer cannot be resolved from the internal indices. Defaults to ``None``.

    Returns:
        ``True`` if the unsubscription succeeded, ``False`` if no ``P2P`` object
        could be resolved or if the unsubscription call raised an exception.
    """

    # Getting the right p2p object
    p2p = None
    for _p2p in self.p2p_to_pool_names.keys():
        if _p2p.peer_id == peer_id:
            p2p = _p2p
            break
    if p2p is None and peer_id in self.peer_id_to_p2p:
        p2p = self.peer_id_to_p2p[peer_id]
    if p2p is None:
        if default_p2p_name is not None:
            p2p = self.p2p_name_to_p2p[default_p2p_name]
        else:
            return False

    try:
        p2p.unsubscribe_from_topic(channel)
    except (P2PError, ValueError):
        return False
    return True

publish async

publish(peer_id: str, channel: str, content_type: str, content: bytes | dict | tuple | None = None) -> bool

Publish a message to a GossipSub topic on the P2P network that owns a peer (async).

Resolves the P2P object with a two-step strategy: first checks whether peer_id is one of the local node's own peer IDs (iterating p2p_to_pool_names), then falls back to peer_id_to_p2p. If neither yields a P2P object, the method returns False immediately.

The Msg is constructed with the resolved P2P node's peer ID as sender and the current authentication token plus "0" inspector-mode bit as the piggyback field, mirroring the construction used in send. Dispatch is synchronous (via P2P.broadcast_message). A P2PError is caught and translated to a False return value.

Parameters:

Name Type Description Default
peer_id str

Peer ID used to identify which P2P network to publish on. This is typically the local node's own public or world peer ID.

required
channel str

GossipSub topic name to publish to.

required
content_type str

Message type identifier (one of the Msg.* constants).

required
content bytes | dict | tuple | None

Message payload - raw bytes, a serialisable dict, a tuple, or None. Defaults to None.

None

Returns:

Type Description
bool

True if the broadcast succeeded without raising P2PError, False

bool

if the P2P object could not be resolved or if broadcast_message

bool

raised P2PError.

Source code in unaiverse/networking/node/connpool.py
async def publish(self, peer_id: str, channel: str,
                  content_type: str, content: bytes | dict | tuple | None = None) -> bool:
    """Publish a message to a GossipSub topic on the P2P network that owns a peer (async).

    Resolves the ``P2P`` object with a two-step strategy: first checks whether
    ``peer_id`` is one of the local node's own peer IDs (iterating
    ``p2p_to_pool_names``), then falls back to ``peer_id_to_p2p``. If neither
    yields a ``P2P`` object, the method returns ``False`` immediately.

    The ``Msg`` is constructed with the resolved ``P2P`` node's peer ID as sender
    and the current authentication token plus ``"0"`` inspector-mode bit as the
    piggyback field, mirroring the construction used in ``send``. Dispatch is
    synchronous (via ``P2P.broadcast_message``). A ``P2PError`` is caught and
    translated to a ``False`` return value.

    Args:
        peer_id: Peer ID used to identify which ``P2P`` network to publish on.
            This is typically the local node's own public or world peer ID.
        channel: GossipSub topic name to publish to.
        content_type: Message type identifier (one of the ``Msg.*`` constants).
        content: Message payload - raw bytes, a serialisable dict, a tuple, or
            ``None``. Defaults to ``None``.

    Returns:
        ``True`` if the broadcast succeeded without raising ``P2PError``, ``False``
        if the ``P2P`` object could not be resolved or if ``broadcast_message``
        raised ``P2PError``.
    """

    # Getting the right p2p object
    p2p = None
    for _p2p in self.p2p_to_pool_names.keys():
        if _p2p.peer_id == peer_id:
            p2p = _p2p
            break
    if p2p is None:
        p2p = self.peer_id_to_p2p[peer_id]
    if p2p is None:
        return False

    log.network(f">>> PUBLISHING {content_type} to {channel}", sub=p2p.log_sub)

    # Adding sender info here
    msg = Msg(sender=p2p.peer_id,
              content_type=content_type,
              content=content,
              channel=channel,
              piggyback=self.__token + "0")  # Adding inspector-mode bit (dummy bit here)

    # Sending message via GossipSub
    try:
        p2p.broadcast_message(channel, msg_bytes=msg.to_bytes())

        # If the line above executes without raising an error, it was successful.
        return True
    except P2PError:

        # If send_message_to_peer fails, it will raise a P2PError. We catch it here.
        return False

NodeConn

NodeConn(max_connections: int, p2p_u: P2P, p2p_w: P2P, is_world_node: bool, public_key: str, token: str)

Bases: ConnectionPools

A UNaIVERSE-aware connection pool that routes peers by role and network type.

NodeConn is a concrete subclass of ConnectionPools that implements the conn_routing_fcn extension point for the UNaIVERSE node architecture. It manages two physical P2P networks:

  • p2p_public (P2P_PUBLIC) - the universally reachable network used for connections with arbitrary peers not yet joined to a world.
  • p2p_world (P2P_WORLD) - the private network shared only with peers that belong to the same world.

Each physical network is split into inbound and outbound pools, and the world network is further subdivided by peer role: WORLD_AGENTS, WORLD_MASTERS, and WORLD_NODE. The pool constants (IN_PUBLIC, OUT_PUBLIC, IN_WORLD_AGENTS, etc.) are class-level strings that can be used as keys wherever a pool name is expected.

In addition to the inherited pool bookkeeping, NodeConn maintains:

  • world_agents_set and world_masters_set: sets of peer IDs that are allowed on the private network, updated via set_world_agents_list and set_world_masters_list (or incrementally via add_to_world_agents_list and add_to_world_masters_list).
  • world_node_peer_id: the peer ID of the world node when this instance belongs to an agent, or None when operating as a world node itself.
  • peer_id_to_addrs: cached multiaddr lists for known peers, used by find_addrs_by_role and get_addrs to locate peers before a transport connection is established.
  • rendezvous_tag: a monotonically increasing counter used to detect and apply only the freshest rendezvous-topic snapshots.

Aggregated pool sets such as PUBLIC, WORLD, ALL, OUTGOING, and INCOMING are provided as class attributes for convenient multi-pool queries via is_connected, size, get_all_connected_peer_infos, and related methods.

Attributes:

Name Type Description
p2p_public

The public-network P2P object (shortcut for p2p_name_to_p2p[P2P_PUBLIC]).

p2p_world

The world/private-network P2P object (shortcut for p2p_name_to_p2p[P2P_WORLD]).

world_agents_set

Set of peer IDs recognised as world agents on the private network.

world_masters_set

Set of peer IDs recognised as world masters on the private network.

world_masters_first

Peer ID of the first world master added, or None if no masters are registered.

world_agents_and_world_masters_set

Union of world_agents_set and world_masters_set, kept in sync by every list-mutation method.

world_node_peer_id

Peer ID of the world node, or None if not yet set.

inspector_peer_id

Peer ID of the inspector node, or None if not present.

role_to_peer_ids

Mapping from integer role bitmask to the set of peer IDs currently holding that role.

peer_id_to_addrs

Mapping from peer ID to the list of known multiaddr strings for that peer.

rendezvous_tag

Monotonic update counter for the rendezvous topic snapshot. Starts at -1 and is updated by set_world_agents_and_world_masters_lists_from_rendezvous.

Initialize a NodeConn with role-aware pool layout for a UNaIVERSE node.

Invokes ConnectionPools.__init__ with a fixed pool configuration that partitions max_connections across the public and world P2P networks. Capacity ratios differ between world-node and regular-agent deployments:

  • Regular agent: 25% public (in+out equally split), 75% world agents (in+out). World-node, world-masters, and inspector pools are disabled (ratio 0.0).
  • World node: 25% public, 50% world agents, 25% world masters; the world-node pools are set to -1 (unlimited overflow) since a world serves any number of incoming agents.

After the parent constructor, convenience references p2p_public and p2p_world are stored, and all role/agent tracking attributes are initialised to their empty/sentinel values.

Parameters:

Name Type Description Default
max_connections int

Maximum total number of simultaneous connections across all pools.

required
p2p_u P2P

P2P object for the public network (mapped to P2P_PUBLIC).

required
p2p_w P2P

P2P object for the world/private network (mapped to P2P_WORLD).

required
is_world_node bool

True when this node operates as a world node, which enables the world-masters and world-node pools and adjusts pool ratios accordingly.

required
public_key str

PEM-encoded public key used to verify tokens in incoming messages. Passed through to ConnectionPools.__init__ and stored in the internal TokenVerifier.

required
token str

Authentication token appended as piggyback to every outgoing message. Passed through to ConnectionPools.__init__.

required

Raises:

Type Description
AssertionError

Propagated from ConnectionPools.__init__ if the pool ratios or connection count constraints are violated.

Source code in unaiverse/networking/node/connpool.py
def __init__(self, max_connections: int, p2p_u: P2P, p2p_w: P2P,
             is_world_node: bool, public_key: str, token: str) -> None:
    """Initialize a ``NodeConn`` with role-aware pool layout for a UNaIVERSE node.

    Invokes ``ConnectionPools.__init__`` with a fixed pool configuration that
    partitions ``max_connections`` across the public and world P2P networks.
    Capacity ratios differ between world-node and regular-agent deployments:

    - Regular agent: 25% public (in+out equally split), 75% world agents (in+out).
      World-node, world-masters, and inspector pools are disabled (ratio ``0.0``).
    - World node: 25% public, 50% world agents, 25% world masters; the world-node
      pools are set to ``-1`` (unlimited overflow) since a world serves any number
      of incoming agents.

    After the parent constructor, convenience references ``p2p_public`` and
    ``p2p_world`` are stored, and all role/agent tracking attributes are
    initialised to their empty/sentinel values.

    Args:
        max_connections: Maximum total number of simultaneous connections across all
            pools.
        p2p_u: ``P2P`` object for the public network (mapped to ``P2P_PUBLIC``).
        p2p_w: ``P2P`` object for the world/private network (mapped to
            ``P2P_WORLD``).
        is_world_node: ``True`` when this node operates as a world node, which
            enables the world-masters and world-node pools and adjusts pool ratios
            accordingly.
        public_key: PEM-encoded public key used to verify tokens in incoming
            messages. Passed through to ``ConnectionPools.__init__`` and stored in
            the internal ``TokenVerifier``.
        token: Authentication token appended as piggyback to every outgoing
            message. Passed through to ``ConnectionPools.__init__``.

    Raises:
        AssertionError: Propagated from ``ConnectionPools.__init__`` if the pool
            ratios or connection count constraints are violated.
    """
    super().__init__(max_connections=max_connections,
                     p2p_name_to_p2p={
                         NodeConn.P2P_PUBLIC: p2p_u,
                         NodeConn.P2P_WORLD: p2p_w,
                     },
                     pool_name_to_p2p_name_and_ratio={
                         NodeConn.IN_PUBLIC: [NodeConn.P2P_PUBLIC, 0.25 / 2. if not is_world_node else 0.25 / 2.],
                         NodeConn.OUT_PUBLIC: [NodeConn.P2P_PUBLIC, 0.25 / 2. if not is_world_node else 0.25 / 2.],
                         NodeConn.IN_WORLD_AGENTS: [NodeConn.P2P_WORLD, .75 / 2 if not is_world_node else 0.5 / 2],
                         NodeConn.OUT_WORLD_AGENTS: [NodeConn.P2P_WORLD, .75 / 2 if not is_world_node else 0.5 / 2],
                         NodeConn.IN_WORLD_NODE: [NodeConn.P2P_WORLD, 0. if not is_world_node else -1.],
                         NodeConn.OUT_WORLD_NODE: [NodeConn.P2P_WORLD, 0. if not is_world_node else -1],
                         NodeConn.IN_WORLD_MASTERS: [NodeConn.P2P_WORLD, 0. if not is_world_node else 0.25 / 2.],
                         NodeConn.OUT_WORLD_MASTERS: [NodeConn.P2P_WORLD, 0. if not is_world_node else 0.25 / 2.]
                     },
                     public_key=public_key, token=token)

    # Just for convenience
    self.p2p_public = p2p_u
    self.p2p_world = p2p_w

    # These are the list of all the possible agents that might try to connect when we are in world
    self.world_agents_set = set()
    self.world_masters_set = set()
    self.world_masters_first = None
    self.world_agents_and_world_masters_set = set()
    self.world_node_peer_id = None
    self.inspector_peer_id = None
    self.role_to_peer_ids = {}
    self.peer_id_to_addrs = {}

    # Rendezvous
    self.rendezvous_tag = -1

P2P_PUBLIC class-attribute instance-attribute

P2P_PUBLIC = 'p2p_public'

P2P_WORLD class-attribute instance-attribute

P2P_WORLD = 'p2p_world'

IN_PUBLIC class-attribute instance-attribute

IN_PUBLIC = __INBOUND + __ALL_UNIVERSE + __PUBLIC_NET

OUT_PUBLIC class-attribute instance-attribute

OUT_PUBLIC = __OUTBOUND + __ALL_UNIVERSE + __PUBLIC_NET

IN_WORLD_AGENTS class-attribute instance-attribute

IN_WORLD_AGENTS = __INBOUND + __WORLD_AGENTS_ONLY + __PRIVATE_NET

OUT_WORLD_AGENTS class-attribute instance-attribute

OUT_WORLD_AGENTS = __OUTBOUND + __WORLD_AGENTS_ONLY + __PRIVATE_NET

IN_WORLD_NODE class-attribute instance-attribute

IN_WORLD_NODE = __INBOUND + __WORLD_NODE_ONLY + __PRIVATE_NET

OUT_WORLD_NODE class-attribute instance-attribute

OUT_WORLD_NODE = __OUTBOUND + __WORLD_NODE_ONLY + __PRIVATE_NET

IN_WORLD_MASTERS class-attribute instance-attribute

IN_WORLD_MASTERS = __INBOUND + __WORLD_MASTERS_ONLY + __PRIVATE_NET

OUT_WORLD_MASTERS class-attribute instance-attribute

OUT_WORLD_MASTERS = __OUTBOUND + __WORLD_MASTERS_ONLY + __PRIVATE_NET

PUBLIC class-attribute instance-attribute

PUBLIC = {IN_PUBLIC, OUT_PUBLIC}

WORLD_NODE class-attribute instance-attribute

WORLD_NODE = {IN_WORLD_NODE, OUT_WORLD_NODE}

WORLD_AGENTS class-attribute instance-attribute

WORLD_MASTERS class-attribute instance-attribute

WORLD class-attribute instance-attribute

ALL class-attribute instance-attribute

ALL = PUBLIC | WORLD

OUTGOING class-attribute instance-attribute

INCOMING class-attribute instance-attribute

p2p_public instance-attribute

p2p_public = p2p_u

p2p_world instance-attribute

p2p_world = p2p_w

world_agents_set instance-attribute

world_agents_set = set()

world_masters_set instance-attribute

world_masters_set = set()

world_masters_first instance-attribute

world_masters_first = None

world_agents_and_world_masters_set instance-attribute

world_agents_and_world_masters_set = set()

world_node_peer_id instance-attribute

world_node_peer_id = None

inspector_peer_id instance-attribute

inspector_peer_id = None

role_to_peer_ids instance-attribute

role_to_peer_ids = {}

peer_id_to_addrs instance-attribute

peer_id_to_addrs = {}

rendezvous_tag instance-attribute

rendezvous_tag = -1

reset_rendezvous_tag

reset_rendezvous_tag() -> None

Reset the rendezvous tag to its sentinel value of -1.

Setting rendezvous_tag back to -1 causes the next call to set_world_agents_and_world_masters_lists_from_rendezvous to unconditionally apply the next rendezvous snapshot, regardless of its update_count. This is useful after a world reconnect or when the agent needs to force a full refresh of the peer lists.

Source code in unaiverse/networking/node/connpool.py
def reset_rendezvous_tag(self) -> None:
    """Reset the rendezvous tag to its sentinel value of ``-1``.

    Setting ``rendezvous_tag`` back to ``-1`` causes the next call to
    ``set_world_agents_and_world_masters_lists_from_rendezvous`` to unconditionally
    apply the next rendezvous snapshot, regardless of its ``update_count``. This is
    useful after a world reconnect or when the agent needs to force a full refresh
    of the peer lists.
    """
    self.rendezvous_tag = -1

conn_routing_fcn async

conn_routing_fcn(connected_peer_infos: list, p2p: P2P) -> dict

Route each connected peer to a pool based on its network and role (async).

Implements the ConnectionPools.conn_routing_fcn extension point for the UNaIVERSE node topology. The routing logic differs by network:

  • Public network (p2p == p2p_public): peers are routed to IN_PUBLIC (inbound) or OUT_PUBLIC (outbound) purely based on connection direction.
  • World/private network: each peer is classified by cross-referencing its ID against world_agents_set, world_masters_set, world_node_peer_id, and inspector_peer_id. Inbound peers land in IN_WORLD_AGENTS, IN_WORLD_NODE, or IN_WORLD_MASTERS; outbound peers in their OUT_* counterparts. Peers that match none of the known sets are disconnected immediately and a diagnostic log is emitted.

Parameters:

Name Type Description Default
connected_peer_infos list

List of peer-info dicts as returned by P2P.get_connected_peers_info. Each dict must contain at minimum the keys 'id' (peer ID string) and 'direction' ("inbound" or "outbound").

required
p2p P2P

The P2P object whose connected peer list is being routed.

required

Returns:

Type Description
dict

A dict mapping every pool name associated with p2p to an inner dict of

dict

{peer_id: peer_info_dict} for all peers routed to that pool. Pools with

dict

no matching peers map to an empty dict.

Source code in unaiverse/networking/node/connpool.py
async def conn_routing_fcn(self, connected_peer_infos: list, p2p: P2P) -> dict:
    """Route each connected peer to a pool based on its network and role (async).

    Implements the ``ConnectionPools.conn_routing_fcn`` extension point for the
    UNaIVERSE node topology. The routing logic differs by network:

    - Public network (``p2p == p2p_public``): peers are routed to ``IN_PUBLIC``
      (inbound) or ``OUT_PUBLIC`` (outbound) purely based on connection direction.
    - World/private network: each peer is classified by cross-referencing its ID
      against ``world_agents_set``, ``world_masters_set``, ``world_node_peer_id``,
      and ``inspector_peer_id``. Inbound peers land in ``IN_WORLD_AGENTS``,
      ``IN_WORLD_NODE``, or ``IN_WORLD_MASTERS``; outbound peers in their ``OUT_*``
      counterparts. Peers that match none of the known sets are disconnected
      immediately and a diagnostic log is emitted.

    Args:
        connected_peer_infos: List of peer-info dicts as returned by
            ``P2P.get_connected_peers_info``. Each dict must contain at minimum
            the keys ``'id'`` (peer ID string) and ``'direction'`` (``"inbound"``
            or ``"outbound"``).
        p2p: The ``P2P`` object whose connected peer list is being routed.

    Returns:
        A dict mapping every pool name associated with ``p2p`` to an inner dict of
        ``{peer_id: peer_info_dict}`` for all peers routed to that pool. Pools with
        no matching peers map to an empty dict.
    """
    pool_name_and_peer_id_to_peer_info = {k: {} for k in self.p2p_to_pool_names[p2p]}
    public = p2p == self.p2p_public

    for c in connected_peer_infos:
        inbound = c['direction'] == "inbound"
        outbound = c['direction'] == "outbound"
        peer_id = c['id']  # Other fields are: c['addrs'], c['connected_at']

        if public:
            if inbound:
                pool_name_and_peer_id_to_peer_info[NodeConn.IN_PUBLIC][peer_id] = c
            elif outbound:
                pool_name_and_peer_id_to_peer_info[NodeConn.OUT_PUBLIC][peer_id] = c
            else:
                log.critical(f"Connection direction is undefined: {c['direction']}", sub=p2p.log_sub)
        else:
            is_world_agent = peer_id in self.world_agents_set
            is_world_master = peer_id in self.world_masters_set
            is_world_node = self.world_node_peer_id is not None and peer_id == self.world_node_peer_id
            is_inspector = self.inspector_peer_id is not None and peer_id == self.inspector_peer_id
            if not is_world_node and not is_world_master and not is_world_agent and not is_inspector:
                log.cpool("World agents list:  " + str(self.world_agents_set), sub=p2p.log_sub)
                log.cpool("World masters list: " + str(self.world_masters_set), sub=p2p.log_sub)
                log.cpool("World node peer id: " + str(self.world_node_peer_id), sub=p2p.log_sub)
                log.cpool("Inspector peer id: " + str(self.inspector_peer_id), sub=p2p.log_sub)
                log.cpool(f"Unable to determine the peer type for {peer_id}: "
                          f"cannot say if world agent, master, world node, inspector (disconnecting it)",
                          sub=p2p.log_sub)
                await ConnectionPools.disconnect(p2p, peer_id)
                continue

            if inbound:
                pool_name_and_peer_id_to_peer_info[NodeConn.IN_WORLD_AGENTS if is_world_agent else (
                    NodeConn.IN_WORLD_NODE if is_world_node else
                    NodeConn.IN_WORLD_MASTERS)][peer_id] = c
            elif outbound:
                pool_name_and_peer_id_to_peer_info[NodeConn.OUT_WORLD_AGENTS if is_world_agent else (
                    NodeConn.OUT_WORLD_NODE if is_world_node else
                    NodeConn.OUT_WORLD_MASTERS)][peer_id] = c
            else:
                log.critical(f"Connection direction is undefined: {c}", sub=p2p.log_sub)

    return pool_name_and_peer_id_to_peer_info

set_world

set_world(world_peer_id: str | None) -> None

Set the peer ID of the world node.

Updates world_node_peer_id, which is consulted by conn_routing_fcn when classifying private-network connections. Passing None clears the registration and causes any subsequent connection from the former world node peer to be treated as unrecognised (and disconnected) by the routing function.

Parameters:

Name Type Description Default
world_peer_id str | None

Peer ID of the world node to register, or None to clear the current registration.

required
Source code in unaiverse/networking/node/connpool.py
def set_world(self, world_peer_id: str | None) -> None:
    """Set the peer ID of the world node.

    Updates ``world_node_peer_id``, which is consulted by ``conn_routing_fcn``
    when classifying private-network connections. Passing ``None`` clears the
    registration and causes any subsequent connection from the former world node
    peer to be treated as unrecognised (and disconnected) by the routing function.

    Args:
        world_peer_id: Peer ID of the world node to register, or ``None`` to
            clear the current registration.
    """
    self.world_node_peer_id = world_peer_id

set_inspector

set_inspector(inspector_peer_id: str | None) -> None

Set the peer ID of the inspector node.

Updates inspector_peer_id, which is consulted alongside world_agents_set and world_masters_set in conn_routing_fcn when classifying private-network connections. Passing None clears the inspector registration.

Parameters:

Name Type Description Default
inspector_peer_id str | None

Peer ID of the inspector node to register, or None to clear the current registration.

required
Source code in unaiverse/networking/node/connpool.py
def set_inspector(self, inspector_peer_id: str | None) -> None:
    """Set the peer ID of the inspector node.

    Updates ``inspector_peer_id``, which is consulted alongside ``world_agents_set``
    and ``world_masters_set`` in ``conn_routing_fcn`` when classifying private-network
    connections. Passing ``None`` clears the inspector registration.

    Args:
        inspector_peer_id: Peer ID of the inspector node to register, or ``None``
            to clear the current registration.
    """
    self.inspector_peer_id = inspector_peer_id

get_world_peer_id

get_world_peer_id() -> str | None

Return the peer ID of the world node.

Returns the value stored in world_node_peer_id, which is set via set_world. For a regular agent this is the peer ID of the world it has joined; for a world node itself the value is typically None (or its own world-network peer ID if explicitly set).

Returns:

Type Description
str | None

The world node's peer ID string, or None if no world has been

str | None

registered.

Source code in unaiverse/networking/node/connpool.py
def get_world_peer_id(self) -> str | None:
    """Return the peer ID of the world node.

    Returns the value stored in ``world_node_peer_id``, which is set via
    ``set_world``. For a regular agent this is the peer ID of the world it has
    joined; for a world node itself the value is typically ``None`` (or its own
    world-network peer ID if explicitly set).

    Returns:
        The world node's peer ID string, or ``None`` if no world has been
        registered.
    """
    return self.world_node_peer_id

get_first_world_master

get_first_world_master() -> str | None

Return the peer ID of the first registered world master.

world_masters_first is set to the first element of the list passed to set_world_masters_list, and updated incrementally when add_to_world_masters_list is called with the first entry ever added. It is reset to None whenever set_world_masters_list is called with an empty or None list.

This value can be used as a preferred contact point when initiating communication with the world before a full masters list is available.

Returns:

Type Description
str | None

The peer ID string of the first world master, or None if no masters

str | None

have been registered.

Source code in unaiverse/networking/node/connpool.py
def get_first_world_master(self) -> str | None:
    """Return the peer ID of the first registered world master.

    ``world_masters_first`` is set to the first element of the list passed to
    ``set_world_masters_list``, and updated incrementally when
    ``add_to_world_masters_list`` is called with the first entry ever added. It
    is reset to ``None`` whenever ``set_world_masters_list`` is called with an
    empty or ``None`` list.

    This value can be used as a preferred contact point when initiating
    communication with the world before a full masters list is available.

    Returns:
        The peer ID string of the first world master, or ``None`` if no masters
        have been registered.
    """
    return self.world_masters_first

set_addresses_in_peer_info

set_addresses_in_peer_info(peer_id: str, addresses: list[str]) -> None

Update the address list stored inside a peer's pool entry.

Retrieves the 'addrs' list from the peer-info dict in pool_name_to_peer_infos and updates it in place (clear then extend) so that any other code holding a reference to the same list object sees the updated values. The operation is a no-op if the peer is not currently tracked in any connection pool (as determined by in_connection_queues).

Parameters:

Name Type Description Default
peer_id str

Peer ID of the entry to update.

required
addresses list[str]

New list of multiaddr strings to store for the peer.

required
Note

The update is intentionally in-place (rather than replacing the list reference) because other parts of the framework may hold direct references to the same list object.

Source code in unaiverse/networking/node/connpool.py
def set_addresses_in_peer_info(self, peer_id: str, addresses: list[str]) -> None:
    """Update the address list stored inside a peer's pool entry.

    Retrieves the ``'addrs'`` list from the peer-info dict in
    ``pool_name_to_peer_infos`` and updates it in place (clear then extend) so
    that any other code holding a reference to the same list object sees the
    updated values. The operation is a no-op if the peer is not currently tracked
    in any connection pool (as determined by ``in_connection_queues``).

    Args:
        peer_id: Peer ID of the entry to update.
        addresses: New list of multiaddr strings to store for the peer.

    Note:
        The update is intentionally in-place (rather than replacing the list
        reference) because other parts of the framework may hold direct references
        to the same list object.
    """
    if self.in_connection_queues(peer_id):
        addrs = self.pool_name_to_peer_infos[self.get_pool_of(peer_id)][peer_id]['addrs']
        addrs.clear()  # Warning: do not allocate a new list, keep the current one (it is referenced by others)
        for _addrs in addresses:
            addrs.append(_addrs)

set_role

set_role(peer_id: str, new_role: int) -> None

Update the role of a peer and keep role_to_peer_ids consistent.

Stores new_role in peer_id_to_misc (the inherited "misc" field used throughout the pool layer). If the peer is currently tracked in a pool, the 'misc' field of its cached peer-info dict is updated to match. The role_to_peer_ids index is then maintained by removing the peer from its old role set (deleting the set entry if it becomes empty) and inserting it into the new role set.

Parameters:

Name Type Description Default
peer_id str

Peer ID whose role is to be changed.

required
new_role int

New integer role bitmask to assign to the peer.

required
Source code in unaiverse/networking/node/connpool.py
def set_role(self, peer_id: str, new_role: int) -> None:
    """Update the role of a peer and keep ``role_to_peer_ids`` consistent.

    Stores ``new_role`` in ``peer_id_to_misc`` (the inherited "misc" field used
    throughout the pool layer). If the peer is currently tracked in a pool, the
    ``'misc'`` field of its cached peer-info dict is updated to match. The
    ``role_to_peer_ids`` index is then maintained by removing the peer from its
    old role set (deleting the set entry if it becomes empty) and inserting it
    into the new role set.

    Args:
        peer_id: Peer ID whose role is to be changed.
        new_role: New integer role bitmask to assign to the peer.
    """
    cur_role = self.get_role(peer_id)

    # Updating
    self.peer_id_to_misc[peer_id] = new_role

    if self.in_connection_queues(peer_id):
        self.pool_name_to_peer_infos[self.get_pool_of(peer_id)][peer_id]['misc'] = new_role

    # Updating
    if cur_role in self.role_to_peer_ids:
        if peer_id in self.role_to_peer_ids[cur_role]:
            self.role_to_peer_ids[cur_role].remove(peer_id)
        if len(self.role_to_peer_ids[cur_role]) == 0:
            del self.role_to_peer_ids[cur_role]
    if new_role not in self.role_to_peer_ids:
        self.role_to_peer_ids[new_role] = set()
    self.role_to_peer_ids[new_role].add(peer_id)

set_world_agents_list

set_world_agents_list(world_agents_list_peer_infos: list[dict] | None) -> None

Sets the list of all world agents based on a provided list of peer information.

Parameters:

Name Type Description Default
world_agents_list_peer_infos list[dict] | None

A list of dictionaries containing peer information for world agents.

required
Source code in unaiverse/networking/node/connpool.py
def set_world_agents_list(self, world_agents_list_peer_infos: list[dict] | None) -> None:
    """Sets the list of all world agents based on a provided list of peer information.

    Args:
        world_agents_list_peer_infos: A list of dictionaries containing peer information for world agents.
    """

    # Clearing previous information
    to_remove = []
    for peer_id, misc in self.peer_id_to_misc.items():
        if misc & 1 == 1 and misc & 2 == 0:
            to_remove.append((peer_id, misc))

    for peer_id, misc in to_remove:
        del self.peer_id_to_misc[peer_id]
        if peer_id in self.peer_id_to_addrs:
            del self.peer_id_to_addrs[peer_id]
        self.role_to_peer_ids[misc].discard(peer_id)

    # Setting new information
    if world_agents_list_peer_infos is not None and len(world_agents_list_peer_infos) > 0:
        self.world_agents_set = {x['id'] for x in world_agents_list_peer_infos}
        for x in world_agents_list_peer_infos:
            self.peer_id_to_addrs[x['id']] = x['addrs']
            self.set_role(x['id'], x['misc'])
    else:
        self.world_agents_set = set()

    self.world_agents_and_world_masters_set = self.world_agents_set | self.world_masters_set

get_world_masters

get_world_masters() -> set[str]

Returns the set of world masters.

Source code in unaiverse/networking/node/connpool.py
def get_world_masters(self) -> set[str]:
    """Returns the set of world masters."""
    return self.world_masters_set

set_world_masters_list

set_world_masters_list(world_masters_list_peer_infos: list[dict] | None) -> None

Sets the list of all world masters based on a provided list of peer information.

Parameters:

Name Type Description Default
world_masters_list_peer_infos list[dict] | None

A list of dictionaries containing peer information for world masters.

required
Source code in unaiverse/networking/node/connpool.py
def set_world_masters_list(self, world_masters_list_peer_infos: list[dict] | None) -> None:
    """Sets the list of all world masters based on a provided list of peer information.

    Args:
        world_masters_list_peer_infos: A list of dictionaries containing peer information for world masters.
    """

    # Clearing previous information
    to_remove = []
    for peer_id, misc in self.peer_id_to_misc.items():
        if misc & 1 == 1 and misc & 2 == 2:
            to_remove.append((peer_id, misc))

    for peer_id, misc in to_remove:
        del self.peer_id_to_misc[peer_id]
        if peer_id in self.peer_id_to_addrs:
            del self.peer_id_to_addrs[peer_id]
        self.role_to_peer_ids[misc].discard(peer_id)

    # Setting new information
    if world_masters_list_peer_infos is not None and len(world_masters_list_peer_infos) > 0:
        self.world_masters_first = world_masters_list_peer_infos[0]['id']
        self.world_masters_set = {x['id'] for x in world_masters_list_peer_infos}
        for x in world_masters_list_peer_infos:
            self.peer_id_to_addrs[x['id']] = x['addrs']
            self.set_role(x['id'], x['misc'])
    else:
        self.world_masters_first = None
        self.world_masters_set = set()

    self.world_agents_and_world_masters_set = self.world_agents_set | self.world_masters_set

add_to_world_agents_list

add_to_world_agents_list(peer_id: str, addrs: list[str], role: int = -1) -> None

Adds a new world agent to the list.

Parameters:

Name Type Description Default
peer_id str

The peer ID of the new agent.

required
addrs list[str]

A list of addresses for the new agent.

required
role int

The role assigned to the agent.

-1
Source code in unaiverse/networking/node/connpool.py
def add_to_world_agents_list(self, peer_id: str, addrs: list[str], role: int = -1) -> None:
    """Adds a new world agent to the list.

    Args:
        peer_id: The peer ID of the new agent.
        addrs: A list of addresses for the new agent.
        role: The role assigned to the agent.
    """
    self.world_agents_set.add(peer_id)

    # This assumes that the WORLD MASTER/AGENT BIT is the first one
    assert role & 1 == 1, "Expecting the first bit of the role to be 1 for world agents"
    assert role & 2 == 0, "Expecting the second bit of the role to be 0 for world agents"
    self.peer_id_to_addrs[peer_id] = addrs
    self.set_role(peer_id, role)
    self.world_agents_and_world_masters_set = self.world_agents_set | self.world_masters_set

add_to_world_masters_list

add_to_world_masters_list(peer_id: str, addrs: list[str], role: int = -1) -> None

Adds a new world master to the list.

Parameters:

Name Type Description Default
peer_id str

The peer ID of the new master.

required
addrs list[str]

A list of addresses for the new master.

required
role int

The role assigned to the master.

-1
Source code in unaiverse/networking/node/connpool.py
def add_to_world_masters_list(self, peer_id: str, addrs: list[str], role: int = -1) -> None:
    """Adds a new world master to the list.

    Args:
        peer_id: The peer ID of the new master.
        addrs: A list of addresses for the new master.
        role: The role assigned to the master.
    """
    self.world_masters_set.add(peer_id)

    if len(self.world_masters_set) == 1:
        self.world_masters_first = peer_id

    # This assumes that the WORLD MASTER/AGENT BIT is the first one
    assert role & 1 == 1, "Expecting the first bit of the role to be 1 for world masters"
    assert role & 2 == 2, "Expecting the second bit of the role to be 1 for world masters"
    self.peer_id_to_addrs[peer_id] = addrs
    self.set_role(peer_id, role)
    self.world_agents_and_world_masters_set = self.world_agents_set | self.world_masters_set

get_added_after_updating

get_added_after_updating(pool_names: list[str] | None = None) -> dict[str, set]

Retrieves the set of peers added after the last update cycle for specified pools.

Parameters:

Name Type Description Default
pool_names list[str] | None

A list of pool names to check. If None, checks all pools.

None

Returns:

Type Description
dict[str, set]

A dictionary mapping pool names to sets of added peer IDs.

Source code in unaiverse/networking/node/connpool.py
def get_added_after_updating(self, pool_names: list[str] | None = None) -> dict[str, set]:
    """Retrieves the set of peers added after the last update cycle for specified pools.

    Args:
        pool_names: A list of pool names to check. If None, checks all pools.

    Returns:
        A dictionary mapping pool names to sets of added peer IDs.
    """
    if pool_names is not None:
        ret = {}
        for p in pool_names:
            ret[p] = super().get_added_after_updating(p)
        return ret
    else:
        return super().get_added_after_updating()

get_removed_after_updating

get_removed_after_updating(pool_names: list[str] | None = None) -> dict[str, set]

Retrieves the set of peers removed after the last update cycle for specified pools.

Parameters:

Name Type Description Default
pool_names list[str] | None

A list of pool names to check. If None, checks all pools.

None

Returns:

Type Description
dict[str, set]

A dictionary mapping pool names to sets of removed peer IDs.

Source code in unaiverse/networking/node/connpool.py
def get_removed_after_updating(self, pool_names: list[str] | None = None) -> dict[str, set]:
    """Retrieves the set of peers removed after the last update cycle for specified pools.

    Args:
        pool_names: A list of pool names to check. If None, checks all pools.

    Returns:
        A dictionary mapping pool names to sets of removed peer IDs.
    """
    if pool_names is not None:
        ret = {}
        for p in pool_names:
            ret[p] = super().get_removed_after_updating(p)
        return ret
    else:
        return super().get_removed_after_updating()

size

size(pool_names: list[str] | None = None) -> int

Returns the total number of connections across all specified pools.

Parameters:

Name Type Description Default
pool_names list[str] | None

A list of pool names to sum the size of. If None, returns the total size of all pools.

None

Returns:

Type Description
int

The total number of connections.

Source code in unaiverse/networking/node/connpool.py
def size(self, pool_names: list[str] | None = None) -> int:
    """Returns the total number of connections across all specified pools.

    Args:
        pool_names: A list of pool names to sum the size of. If None, returns the total size of all pools.

    Returns:
        The total number of connections.
    """
    if pool_names is None:
        return super().size()
    else:
        c = 0
        for p in pool_names:
            c += super().size(p)
        return c

is_connected

is_connected(peer_id: str, pool_names: list[str] | None = None) -> bool

Checks if a peer is connected in any of the specified pools.

Parameters:

Name Type Description Default
peer_id str

The peer ID to check.

required
pool_names list[str] | None

A list of pool names to search within. If None, searches all pools.

None

Returns:

Type Description
bool

True if the peer is found in any of the pools, otherwise False.

Source code in unaiverse/networking/node/connpool.py
def is_connected(self, peer_id: str, pool_names: list[str] | None = None) -> bool:
    """Checks if a peer is connected in any of the specified pools.

    Args:
        peer_id: The peer ID to check.
        pool_names: A list of pool names to search within. If None, searches all pools.

    Returns:
        True if the peer is found in any of the pools, otherwise False.
    """
    if pool_names is None:
        return super().is_connected(peer_id)
    else:
        for p in pool_names:
            if super().is_connected(peer_id, p):
                return True
        return False

is_public

is_public(peer_id: str) -> bool

Checks if a peer is connected via the public network.

Parameters:

Name Type Description Default
peer_id str

The peer ID to check.

required

Returns:

Type Description
bool

True if the peer is in a public pool, otherwise False.

Source code in unaiverse/networking/node/connpool.py
def is_public(self, peer_id: str) -> bool:
    """Checks if a peer is connected via the public network.

    Args:
        peer_id: The peer ID to check.

    Returns:
        True if the peer is in a public pool, otherwise False.
    """
    pool_name = self.get_pool_of(peer_id)
    return pool_name in NodeConn.PUBLIC

is_world_master

is_world_master(peer_id: str) -> bool

Checks if a peer is a world master.

Parameters:

Name Type Description Default
peer_id str

The peer ID to check.

required

Returns:

Type Description
bool

True if the peer is in a world master pool, otherwise False.

Source code in unaiverse/networking/node/connpool.py
def is_world_master(self, peer_id: str) -> bool:
    """Checks if a peer is a world master.

    Args:
        peer_id: The peer ID to check.

    Returns:
        True if the peer is in a world master pool, otherwise False.
    """
    pool_name = self.get_pool_of(peer_id)
    return pool_name in NodeConn.WORLD_MASTERS

is_world_node

is_world_node(peer_id: str) -> bool

Checks if a peer is the world node.

Parameters:

Name Type Description Default
peer_id str

The peer ID to check.

required

Returns:

Type Description
bool

True if the peer is in a world node pool, otherwise False.

Source code in unaiverse/networking/node/connpool.py
def is_world_node(self, peer_id: str) -> bool:
    """Checks if a peer is the world node.

    Args:
        peer_id: The peer ID to check.

    Returns:
        True if the peer is in a world node pool, otherwise False.
    """
    pool_name = self.get_pool_of(peer_id)
    return pool_name in NodeConn.WORLD_NODE

is_in_world

is_in_world(peer_id: str) -> bool

Checks if a peer is connected to the world network.

Parameters:

Name Type Description Default
peer_id str

The peer ID to check.

required

Returns:

Type Description
bool

True if the peer is in any world pool, otherwise False.

Source code in unaiverse/networking/node/connpool.py
def is_in_world(self, peer_id: str) -> bool:
    """Checks if a peer is connected to the world network.

    Args:
        peer_id: The peer ID to check.

    Returns:
        True if the peer is in any world pool, otherwise False.
    """
    pool_name = self.get_pool_of(peer_id)
    return pool_name in NodeConn.WORLD

get_role

get_role(peer_id: str) -> int

Retrieves the role of a given peer.

Parameters:

Name Type Description Default
peer_id str

The peer ID to query.

required

Returns:

Type Description
int

The integer role of the peer.

Source code in unaiverse/networking/node/connpool.py
def get_role(self, peer_id: str) -> int:
    """Retrieves the role of a given peer.

    Args:
        peer_id: The peer ID to query.

    Returns:
        The integer role of the peer.
    """
    role = self.peer_id_to_misc.get(peer_id, 0)  # 0 means public
    assert role >= 0, "Expecting role to be >= 0"
    assert role & 1 != 0 or role == 0, "Expecting public role to be zero (all-zero-bits)"
    return role

get_addrs

get_addrs(peer_id: str) -> list[str] | None

Retrieves the list of addresses for a given peer.

Parameters:

Name Type Description Default
peer_id str

The peer ID to query.

required

Returns:

Type Description
list[str] | None

A list of addresses for the peer.

Source code in unaiverse/networking/node/connpool.py
def get_addrs(self, peer_id: str) -> list[str] | None:
    """Retrieves the list of addresses for a given peer.

    Args:
        peer_id: The peer ID to query.

    Returns:
        A list of addresses for the peer.
    """
    if peer_id in self.peer_id_to_addrs:
        return self.peer_id_to_addrs.get(peer_id)
    else:
        return None

in_connection_queues

in_connection_queues(peer_id: str) -> bool

Checks if a peer ID exists in any connection pool.

Parameters:

Name Type Description Default
peer_id str

The peer ID to check.

required

Returns:

Type Description
bool

True if the peer is found in any pool, otherwise False.

Source code in unaiverse/networking/node/connpool.py
def in_connection_queues(self, peer_id: str) -> bool:
    """Checks if a peer ID exists in any connection pool.

    Args:
        peer_id: The peer ID to check.

    Returns:
        True if the peer is found in any pool, otherwise False.
    """
    return peer_id in self.peer_id_to_pool_name

find_addrs_by_role

find_addrs_by_role(role: int, return_peer_ids_too: bool = False, discard_yourself: bool = False) -> list | tuple[list, list]

Finds all addresses of peers with a specific role.

Parameters:

Name Type Description Default
role int

The integer role to search for.

required
return_peer_ids_too bool

A boolean to also return the peer IDs.

False
discard_yourself bool

True to avoid reporting your info on the returned data.

False

Returns:

Type Description
list | tuple[list, list]

A list of lists of addresses, and optionally a list of peer IDs.

Source code in unaiverse/networking/node/connpool.py
def find_addrs_by_role(self, role: int, return_peer_ids_too: bool = False,
                       discard_yourself: bool = False) -> list | tuple[list, list]:
    """Finds all addresses of peers with a specific role.

    Args:
        role: The integer role to search for.
        return_peer_ids_too: A boolean to also return the peer IDs.
        discard_yourself: True to avoid reporting your info on the returned data.

    Returns:
        A list of lists of addresses, and optionally a list of peer IDs.
    """
    if role in self.role_to_peer_ids:
        peer_ids = self.role_to_peer_ids[role]
    else:
        if not return_peer_ids_too:
            return []
        else:
            return [], []
    ret_addrs = []
    ret_peer_ids = []
    for peer_id in peer_ids:
        if discard_yourself and (peer_id == self.p2p_public.peer_id or peer_id == self.p2p_world.peer_id):
            continue
        addrs = self.get_addrs(peer_id)
        if addrs is not None:
            ret_addrs.append(addrs)
            ret_peer_ids.append(peer_id)
    if not return_peer_ids_too:
        return ret_addrs
    else:
        return ret_addrs, ret_peer_ids

count_by_role

count_by_role(role: int) -> int

Counts the number of peers with a specific role.

Parameters:

Name Type Description Default
role int

The integer role to count.

required

Returns:

Type Description
int

The number of peers with that role.

Source code in unaiverse/networking/node/connpool.py
def count_by_role(self, role: int) -> int:
    """Counts the number of peers with a specific role.

    Args:
        role: The integer role to count.

    Returns:
        The number of peers with that role.
    """
    if role in self.role_to_peer_ids:
        return len(self.role_to_peer_ids[role])
    else:
        return 0

get_all_connected_peer_infos

get_all_connected_peer_infos(pool_names: list[str] | set[str]) -> list[dict]

Retrieves a list of all peer info dictionaries for the specified pools.

Parameters:

Name Type Description Default
pool_names list[str] | set[str]

A list or set of pool names to query.

required

Returns:

Type Description
list[dict]

A list of dictionaries containing peer information.

Source code in unaiverse/networking/node/connpool.py
def get_all_connected_peer_infos(self, pool_names: list[str] | set[str]) -> list[dict]:
    """Retrieves a list of all peer info dictionaries for the specified pools.

    Args:
        pool_names: A list or set of pool names to query.

    Returns:
        A list of dictionaries containing peer information.
    """
    ret = []
    for p in pool_names:
        ret += super().get_all_connected_peer_infos(p)
    return ret

set_world_agents_and_world_masters_lists_from_rendezvous async

set_world_agents_and_world_masters_lists_from_rendezvous() -> None

Updates the lists of world agents and masters using data from the rendezvous topic (async).

Source code in unaiverse/networking/node/connpool.py
async def set_world_agents_and_world_masters_lists_from_rendezvous(self) -> None:
    """Updates the lists of world agents and masters using data from the rendezvous topic (async)."""
    rendezvous_state = self.p2p_world.get_rendezvous_peers_info()

    if rendezvous_state is not None:
        tag = rendezvous_state.get('update_count', -1)

        if tag > self.rendezvous_tag:
            self.rendezvous_tag = tag
            rendezvous_peer_infos = rendezvous_state.get('peers', [])

            world_agents_peer_infos = []
            world_masters_peer_infos = []

            log.cpool(f"Rendezvous peer infos (tag: {tag}, peers: "
                      f"{len(rendezvous_peer_infos)} peers)", sub=self.p2p_world.log_sub)
            log.cpool(f"Rendezvous message included peer IDs: "
                      f"{[p['id'] for p in rendezvous_peer_infos]})", sub="prv")

            for c in rendezvous_peer_infos:
                if c['addrs'] is None:
                    log.cpool(f"Skipping a peer with None addrs (unexpected)", sub=self.p2p_world.log_sub)
                    continue
                # if len(c['addrs']) == 0:
                #    log.cpool(f"Skipping a peer with zero-length addrs-list (unexpected)")
                #    continue
                if (c['misc'] & 1) == 1 and (c['misc'] & 2) == 0:
                    world_agents_peer_infos.append(c)
                elif (c['misc'] & 1) == 1 and (c['misc'] & 2) == 2:
                    world_masters_peer_infos.append(c)
                else:
                    log.critical("Unexpected value of the 'misc' field: " + str(c), sub=self.p2p_world.log_sub)

            # Updating lists
            self.set_world_agents_list(world_agents_peer_infos)
            self.set_world_masters_list(world_masters_peer_infos)

get_cv_hash_from_last_token async

get_cv_hash_from_last_token(peer_id: str) -> str | None

Retrieves the CV hash from the last token received from a peer (async).

Parameters:

Name Type Description Default
peer_id str

The peer ID to query.

required

Returns:

Type Description
str | None

The CV hash string, or None if not found.

Source code in unaiverse/networking/node/connpool.py
async def get_cv_hash_from_last_token(self, peer_id: str) -> str | None:
    """Retrieves the CV hash from the last token received from a peer (async).

    Args:
        peer_id: The peer ID to query.

    Returns:
        The CV hash string, or None if not found.
    """
    token = self.get_last_token(peer_id)
    if token is not None:
        _, cv_hash = await self.verify_token(token, peer_id)
        return cv_hash
    else:
        return None

remove async

remove(peer_id: str) -> None

Removes a peer and its associated information from all lists and pools (async).

Parameters:

Name Type Description Default
peer_id str

The peer ID to remove.

required
Source code in unaiverse/networking/node/connpool.py
async def remove(self, peer_id: str) -> None:
    """Removes a peer and its associated information from all lists and pools (async).

    Args:
        peer_id: The peer ID to remove.
    """
    await super().remove(peer_id)

remove_all_world_agents async

remove_all_world_agents() -> None

Removes all connected world agents from the pools and role lists (async).

Source code in unaiverse/networking/node/connpool.py
async def remove_all_world_agents(self) -> None:
    """Removes all connected world agents from the pools and role lists (async)."""
    peer_infos = self.get_all_connected_peer_infos(NodeConn.WORLD)
    for c in peer_infos:
        peer_id = c['id']
        await self.remove(peer_id)
        if peer_id in self.peer_id_to_addrs:
            del self.peer_id_to_addrs[peer_id]
        for role, peer_ids in self.role_to_peer_ids.items():
            if peer_id not in peer_ids:
                continue
            if role & 1 == 1:
                peer_ids.remove(peer_id)

subscribe async

subscribe(peer_id: str, channel: str, default_p2p_name: str | None = None) -> bool

Subscribes to a channel, defaulting to the world P2P network if a network is not specified (async).

Parameters:

Name Type Description Default
peer_id str

The peer ID associated with the channel.

required
channel str

The channel to subscribe to.

required
default_p2p_name str | None

An optional P2P name to use for the subscription.

None

Returns:

Type Description
bool

True if successful, False otherwise.

Source code in unaiverse/networking/node/connpool.py
async def subscribe(self, peer_id: str, channel: str, default_p2p_name: str | None = None) -> bool:
    """Subscribes to a channel, defaulting to the world P2P network if a network is not specified (async).

    Args:
        peer_id: The peer ID associated with the channel.
        channel: The channel to subscribe to.
        default_p2p_name: An optional P2P name to use for the subscription.

    Returns:
        True if successful, False otherwise.
    """
    return await super().subscribe(peer_id, channel,
                                   default_p2p_name=NodeConn.P2P_WORLD
                                   if default_p2p_name is None else default_p2p_name)

get_messages async

get_messages(p2p_name: str, allowed_not_connected_peers: set | None = None) -> list[Msg]

Retrieves messages, allowing for messages from known world agents and masters even if not in a connection pool (async).

Parameters:

Name Type Description Default
p2p_name str

The name of the P2P network to get messages from.

required
allowed_not_connected_peers set | None

This parameter is ignored in this implementation.

None

Returns:

Type Description
list[Msg]

A list of verified and processed message objects.

Source code in unaiverse/networking/node/connpool.py
async def get_messages(self, p2p_name: str, allowed_not_connected_peers: set | None = None) -> list[Msg]:
    """Retrieves messages, allowing for messages from known world agents and masters even if not in a
    connection pool (async).

    Args:
        p2p_name: The name of the P2P network to get messages from.
        allowed_not_connected_peers: This parameter is ignored in this implementation.

    Returns:
        A list of verified and processed message objects.
    """
    assert allowed_not_connected_peers is None, "This param (allowed_not_connected_peers is ignored in NodeConn"
    return await super().get_messages(p2p_name,
                                      allowed_not_connected_peers=self.world_agents_and_world_masters_set)