🔴 unaiverse.networking.node.connpool
What this module does 🔴
Manages pools of peer connections over multiple P2P transports, routing peers into named pools by role and providing connect/disconnect, messaging, pub/sub, and world-membership tracking for the agent network.
connpool
¶
█████ █████ ██████ █████ █████ █████ █████ ██████████ ███████████ █████████ ██████████
░░███ ░░███ ░░██████ ░░███ ░░███ ░░███ ░░███ ░░███░░░░░█░░███░░░░░███ ███░░░░░███░░███░░░░░█
░███ ░███ ░███░███ ░███ ██████ ░███ ░███ ░███ ░███ █ ░ ░███ ░███ ░███ ░░░ ░███ █ ░
░███ ░███ ░███░░███░███ ░░░░░███ ░███ ░███ ░███ ░██████ ░██████████ ░░█████████ ░██████
░███ ░███ ░███ ░░██████ ███████ ░███ ░░███ ███ ░███░░█ ░███░░░░░███ ░░░░░░░░███ ░███░░█
░███ ░███ ░███ ░░█████ ███░░███ ░███ ░░░█████░ ░███ ░ █ ░███ ░███ ███ ░███ ░███ ░ █
░░████████ █████ ░░█████░░████████ █████ ░░███ ██████████ █████ █████░░█████████ ██████████
░░░░░░░░ ░░░░░ ░░░░░ ░░░░░░░░ ░░░░░ ░░░ ░░░░░░░░░░ ░░░░░ ░░░░░ ░░░░░░░░░ ░░░░░░░░░░
A Collectionless AI Project (https://collectionless.ai)
Registration/Login: https://unaiverse.io
Code Repositories: https://github.com/collectionlessai/
Main Developers: Stefano Melacci (Project Leader), Christian Di Maio, Tommaso Guidi
ConnectionPools
¶
ConnectionPools(max_connections: int, pool_name_to_p2p_name_and_ratio: dict[str, [str, float]], p2p_name_to_p2p: dict[str, P2P], public_key: str | None = None, token: str | None = None)
A multi-pool connection manager that routes peers across named P2P connection pools.
ConnectionPools partitions the total connection budget (max_connections) into
named pools, each associated with a P2P network object and a capacity ratio. Every
connected peer is tracked in exactly one pool and is identified by its peer ID. The
class provides primitives for connecting, disconnecting, adding, removing, querying, and
messaging peers, along with token-based authentication for incoming messages.
Internally each pool is represented as a "pool triple": a set of peer IDs, an
integer maximum size, and the P2P object that owns those connections. Several
cross-referenced dictionaries (peer_id_to_pool_name, peer_id_to_p2p,
pool_name_to_peer_infos, etc.) are maintained in sync so that per-peer lookups
remain O(1).
This class is abstract in the sense that conn_routing_fcn raises
NotImplementedError and must be overridden by a concrete subclass (such as
NodeConn) to define how raw peer-info dictionaries returned by the P2P layer are
mapped to pool names.
Attributes:
| Name | Type | Description |
|---|---|---|
max_con |
Maximum total number of connections across all pools. |
|
pool_count |
Number of named connection pools. |
|
pool_names |
Ordered list of pool names as provided at construction time. |
|
pool_ratios |
Per-pool capacity ratios in the same order as |
|
p2p_name_to_p2p |
Mapping from P2P network name to its |
|
pool_name_to_pool_triple |
Mapping from pool name to its triple
|
|
pool_name_to_peer_infos |
Mapping from pool name to a dict of
|
|
peer_id_to_pool_name |
Reverse mapping from peer ID to the pool it occupies. |
|
peer_id_to_p2p |
Mapping from peer ID to the |
|
peer_id_to_misc |
Mapping from peer ID to an integer "misc" field (role bits, etc.). |
|
peer_id_to_token |
Mapping from peer ID to the last verified authentication token. |
|
pool_name_to_added_in_last_update |
Set of peer IDs added per pool during the most
recent |
|
pool_name_to_removed_in_last_update |
Set of peer IDs removed per pool during the
most recent |
Initialize the connection pools and compute per-pool capacity budgets.
Pool capacities are derived from the provided ratios scaled by max_connections.
A pool whose ratio is exactly 0.0 receives a capacity of 0 (disabled), while
a pool whose ratio is negative is treated as an unlimited-capacity overflow pool.
Any rounding remainder from math.floor is assigned to the last pool so that the
sum of all pool capacities equals max_connections exactly.
The constructor validates that:
- Every P2P name referenced in pool_name_to_p2p_name_and_ratio appears in
p2p_name_to_p2p.
- max_connections is at least as large as the number of pools.
- The positive ratios sum to exactly 1.0.
- The number of zero-ratio (disabled) pools does not exceed max_connections.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
max_connections
|
int
|
Maximum total number of simultaneous connections across all pools. |
required |
pool_name_to_p2p_name_and_ratio
|
dict[str, [str, float]]
|
Ordered mapping from pool name to a two-element
list |
required |
p2p_name_to_p2p
|
dict[str, P2P]
|
Mapping from P2P network name to its |
required |
public_key
|
str | None
|
PEM-encoded public key used to verify tokens piggybacked on
incoming messages. If |
None
|
token
|
str | None
|
Authentication token appended to every outgoing message as a piggyback
payload. Defaults to |
None
|
Raises:
| Type | Description |
|---|---|
AssertionError
|
If any referenced P2P name is absent from |
Source code in unaiverse/networking/node/connpool.py
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 | |
pool_ratios
instance-attribute
¶
p2p_name_and_pool_name_to_pool_triple
instance-attribute
¶
pool_name_to_peer_infos
instance-attribute
¶
pool_name_to_pool_triple
instance-attribute
¶
pool_name_to_pool_triple = {k: [set(), 0, p2p_name_to_p2p[pool_name_to_p2p_name_and_ratio[k][0]]] for k in (pool_names)}
conn_routing_fcn
async
¶
conn_routing_fcn(connected_peer_infos: list, p2p: P2P) -> dict
Route connected peers to the correct pool based on application-specific logic (async).
This method is the primary extension point of ConnectionPools. Concrete
subclasses must override it to implement the routing strategy that maps raw
peer-info dictionaries (as returned by the P2P layer) to named pool buckets.
The base implementation always raises NotImplementedError.
The returned dictionary drives the update method: for each pool it computes
newly arrived peers (to be added) and departed peers (to be removed).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
connected_peer_infos
|
list
|
List of peer-info dicts as returned by
|
required |
p2p
|
P2P
|
The |
required |
Returns:
| Type | Description |
|---|---|
dict
|
A dict mapping each pool name (that belongs to |
dict
|
the form |
Raises:
| Type | Description |
|---|---|
NotImplementedError
|
Always, because the base class provides no routing logic. |
Source code in unaiverse/networking/node/connpool.py
disconnect
async
staticmethod
¶
disconnect(p2p: P2P, peer_id: str) -> bool
Disconnect from a specific peer on a P2P network (async).
Calls P2P.disconnect_from and translates a P2PError into a False
return value so callers do not need to catch the exception. This method does
not remove the peer from any internal pool bookkeeping; use remove for the
full teardown.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
p2p
|
P2P
|
The |
required |
peer_id
|
str
|
The peer ID to disconnect from. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
|
bool
|
|
Source code in unaiverse/networking/node/connpool.py
set_token
¶
Set the authentication token piggybacked on every outgoing message.
The token is appended (along with a trailing inspector-mode bit) to the
piggyback field of each Msg created by send and publish.
Calling this method replaces the token provided at construction time.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
The new authentication token string. |
required |
Source code in unaiverse/networking/node/connpool.py
verify_token
async
¶
Verify a token extracted from an incoming message piggyback field (async).
Delegates to the TokenVerifier that was constructed from the public_key
supplied at instantiation time. If no public key was provided (i.e., the verifier
is None), the method returns (None, None) without performing any check,
effectively treating all tokens as invalid. On a successful verification the
verifier returns the node ID and CV hash encoded in the token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
The raw token string extracted from the message piggyback payload. |
required |
peer_id
|
str
|
The P2P peer ID of the sending peer, used as an additional binding input by the verifier. |
required |
Returns:
| Type | Description |
|---|---|
str | None
|
A two-element tuple |
str | None
|
success, or |
tuple[str | None, str | None]
|
invalid. |
Source code in unaiverse/networking/node/connpool.py
connect
async
¶
Connect to a peer via a named P2P network (async).
Looks up the P2P object for p2p_name and delegates to the internal
__connect method, which filters addresses (removing loopback and pure TCP
entries where possible), sorts by protocol priority (QUIC/UDP first, WebSocket
second, raw TCP last), and attempts the connection. If the preferred address set
fails, a fallback to the discarded set is tried automatically.
This method only establishes the transport-layer connection; it does not add the
peer to any pool. Call add after a successful connect if pool membership is
desired.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
addresses
|
list[str]
|
List of multiaddr strings for the target peer. At least one reachable address must be present for the connection to succeed. |
required |
p2p_name
|
str
|
Name of the P2P network (key in |
required |
Returns:
| Type | Description |
|---|---|
str | None
|
A two-element tuple |
bool
|
string ID of the connected peer (or |
tuple[str | None, bool]
|
is |
Source code in unaiverse/networking/node/connpool.py
reserve
async
¶
Reserve a relay slot on a remote relay node via a named P2P network (async).
Calls P2P.reserve_on_relay to request a circuit-relay reservation that allows
other peers to reach this node through the relay identified by peer_id. A
P2PError is silently caught and translated to a None return value.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
Peer ID of the relay node to reserve a slot on. |
required |
p2p_name
|
str
|
Name of the P2P network (key in |
required |
Returns:
| Type | Description |
|---|---|
str | None
|
A UTC expiration timestamp string for the reservation on success, or |
str | None
|
if the reservation attempt failed. |
Source code in unaiverse/networking/node/connpool.py
add
¶
Add a peer to a named connection pool.
The pool must have available capacity (current size below its maximum). A peer that is already tracked in a different pool is rejected to enforce the invariant that each peer ID appears in at most one pool. A peer that is already in the same pool is silently accepted and its internal state is refreshed.
On success, the method updates all relevant internal indices
(peer_id_to_pool_name, peer_id_to_p2p, pool_name_to_peer_infos) and
sets the 'misc' field of peer_info from peer_id_to_misc (defaulting
to 0 for public peers).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_info
|
dict
|
Peer-info dict containing at least the key |
required |
pool_name
|
str
|
Name of the destination pool (must be a key in
|
required |
Returns:
| Type | Description |
|---|---|
bool
|
|
bool
|
or the peer is already registered in a different pool. |
Source code in unaiverse/networking/node/connpool.py
remove
async
¶
Remove a peer from its pool and disconnect the underlying transport (async).
Looks up the pool that contains peer_id, calls disconnect on the
associated P2P object, and then removes the peer from all internal indices:
the pool set, pool_name_to_peer_infos, peer_id_to_pool_name,
peer_id_to_p2p, and peer_id_to_token. The peer_id_to_misc entry is
intentionally preserved so that role/misc information survives a reconnect.
If peer_id is not tracked in any pool, the method returns False
immediately without attempting a disconnection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
ID of the peer to remove. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
|
bool
|
without raising |
bool
|
disconnection call failed. |
Source code in unaiverse/networking/node/connpool.py
get_all_connected_peer_infos
¶
Return the peer-info dicts for every peer currently in a named pool.
Each dict in the returned list is the same object stored internally in
pool_name_to_peer_infos, so callers must not mutate it unless they
understand the shared-reference implications.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pool_name
|
str
|
Name of the pool to query (must be a key in
|
required |
Returns:
| Type | Description |
|---|---|
list[dict]
|
A list of peer-info dicts, one per connected peer in the pool. The list is |
list[dict]
|
a snapshot copy; subsequent pool changes do not affect it. |
Source code in unaiverse/networking/node/connpool.py
get_pool_status
¶
Return the live set of peer IDs contained in each pool.
The returned sets are the actual internal sets, not copies; mutating them will corrupt the pool state. Use this method for read-only inspection or logging purposes only.
Returns:
| Type | Description |
|---|---|
dict[str, set]
|
A dict mapping each pool name to the |
dict[str, set]
|
registered in that pool. |
Source code in unaiverse/networking/node/connpool.py
get_all_connected_peer_ids
¶
Return a snapshot list of every peer ID connected across all pools.
Returns:
| Type | Description |
|---|---|
list[str]
|
A list of peer ID strings drawn from all pools. The list is a copy of the |
list[str]
|
dictionary key view at the moment of the call; subsequent connect or |
list[str]
|
disconnect events do not affect it. |
Source code in unaiverse/networking/node/connpool.py
update
async
¶
Refresh all connection pools by reconciling live P2P state with internal bookkeeping (async).
For every registered P2P network, this method calls P2P.get_connected_peers_info
to obtain the current set of transport-level connections, passes the result through
conn_routing_fcn to map peers to pools, and then computes the symmetric
difference between the live set and the internal pool set:
- Peers present in the live set but absent from the pool are added via
addand recorded inpool_name_to_added_in_last_update. - Peers absent from the live set but still tracked in the pool are flagged in
pool_name_to_removed_in_last_update(the caller is responsible for callingremoveon them as needed).
The two result dicts are also stored as instance attributes
pool_name_to_added_in_last_update and pool_name_to_removed_in_last_update
and can be retrieved after the call via get_added_after_updating and
get_removed_after_updating.
Returns:
| Type | Description |
|---|---|
dict
|
A two-element tuple |
dict
|
pool names to a |
tuple[dict, dict]
|
during this call. Both dicts contain only pools where a change occurred. |
Source code in unaiverse/networking/node/connpool.py
get_messages
async
¶
get_messages(p2p_name: str, allowed_not_connected_peers: set | None = None) -> list[Msg]
Pop, decode, authenticate, and return all pending messages from a P2P network (async).
The method performs the following pipeline for every raw message dict returned by
P2P.pop_messages:
- Extracts the cryptographically verified sender ID (
'from') and the Base64-encoded payload ('data') from the Go-layer message dict. - Decodes the payload and parses it into a
Msgobject viaMsg.from_bytes. - Enforces a sender-identity check: the
senderfield inside theMsgpayload must match the'from'field from the network layer; mismatches are silently discarded. - Accepts the message only if the sender is either tracked in
peer_id_to_pool_nameor present inallowed_not_connected_peers. - Verifies the piggybacked token via
verify_token; on success, replaces thepiggybackfield with the verifiednode_idconcatenated with the inspector-mode bit, and caches the raw token inpeer_id_to_token. Messages with missing or invalid tokens are discarded.
Updating __last_recv_time is a side effect that occurs when at least one raw
message is present (before filtering).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
p2p_name
|
str
|
Name of the P2P network (key in |
required |
allowed_not_connected_peers
|
set | None
|
Optional set of peer IDs whose messages are
accepted even if those peers are not currently in any pool. Useful for
handling messages from peers that are known but not yet tracked
(for example, world agents on the rendezvous topic). Defaults to |
None
|
Returns:
| Type | Description |
|---|---|
list[Msg]
|
A list of fully authenticated |
list[Msg]
|
been replaced with |
list[Msg]
|
step of the pipeline are silently dropped. |
Source code in unaiverse/networking/node/connpool.py
600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 | |
passed_time_since_last_communication
¶
Return the number of seconds elapsed since the most recent send or receive event.
The reference timestamp is the later of __last_sent_time (updated by
send for non-stats message types) and __last_recv_time (updated by
get_messages when at least one raw message is present). Both markers start at
0.0, so the returned value may be very large before any communication occurs.
Returns:
| Type | Description |
|---|---|
|
A non-negative float representing the elapsed wall-clock seconds since the |
|
|
last send or receive event. Returns |
|
|
negative, which can happen in rare cases of clock skew. |
Source code in unaiverse/networking/node/connpool.py
get_added_after_updating
¶
Return the peer IDs that were added during the most recent update call.
The internal dict pool_name_to_added_in_last_update is populated by update
and contains only pools where at least one peer was added. Querying a pool name
that had no additions will raise a KeyError.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pool_name
|
str | None
|
Name of a specific pool to query. If |
None
|
Returns:
| Type | Description |
|---|---|
set | dict
|
A |
set | dict
|
mapping pool names to such sets when |
Raises:
| Type | Description |
|---|---|
KeyError
|
If |
Source code in unaiverse/networking/node/connpool.py
get_removed_after_updating
¶
Return the peer IDs that were removed during the most recent update call.
The internal dict pool_name_to_removed_in_last_update is populated by
update and contains only pools where at least one peer departed. Note that
update records a departure but does not call remove itself; the caller
is responsible for acting on the returned set.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pool_name
|
str | None
|
Name of a specific pool to query. If |
None
|
Returns:
| Type | Description |
|---|---|
set | dict
|
A |
set | dict
|
mapping pool names to such sets when |
Raises:
| Type | Description |
|---|---|
KeyError
|
If |
Source code in unaiverse/networking/node/connpool.py
get_last_token
¶
Return the last verified authentication token received from a peer.
Tokens are cached in peer_id_to_token by get_messages whenever a message
from a pool-tracked peer passes the full token-verification pipeline. The cached
token is the raw token string (without the trailing inspector-mode bit).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
ID of the peer whose token to retrieve. |
required |
Returns:
| Type | Description |
|---|---|
str | None
|
The raw token string if a token has been received and verified from this peer, |
str | None
|
or |
Source code in unaiverse/networking/node/connpool.py
is_connected
¶
Check whether a peer is currently tracked in the connection pools.
When pool_name is omitted, the check is against all pools: the method
returns True if peer_id appears in peer_id_to_pool_name. When a
specific pool is given, the method additionally verifies that the peer is
registered in that exact pool.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
ID of the peer to look up. |
required |
pool_name
|
str | None
|
Name of a specific pool to restrict the search to. If |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
|
bool
|
given, or in any pool otherwise), |
Source code in unaiverse/networking/node/connpool.py
get_pool_of
¶
Return the name of the pool that currently contains a peer.
Performs an O(1) lookup in peer_id_to_pool_name. This is the inverse
operation of iterating pool_name_to_pool_triple: given a peer ID it
immediately identifies which named bucket the peer occupies, which is useful
before calling send or when checking pool membership without knowing the
direction (inbound vs. outbound) in advance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
ID of the peer to look up. |
required |
Returns:
| Type | Description |
|---|---|
str | None
|
The pool name string if the peer is currently tracked in a pool, or |
str | None
|
|
Source code in unaiverse/networking/node/connpool.py
size
¶
Return the number of connections in a specific pool or across all pools.
When pool_name is given, the count reflects the number of peer IDs currently
in that pool's set. When omitted, all pool sets are summed. The NodeConn
subclass overrides this signature to accept a list[str] instead of a single
name, enabling multi-pool aggregation in a single call.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pool_name
|
str | None
|
Name of the pool to query. If |
None
|
Returns:
| Type | Description |
|---|---|
int
|
The number of connections in the specified pool, or the grand total when |
int
|
|
Source code in unaiverse/networking/node/connpool.py
send
async
¶
send(peer_id: str, channel_trail: str | None, content_type: str, content: bytes | dict | None = None, p2p: P2P | None = None) -> bool
Send a direct message to a specific peer (async).
Constructs a Msg with the local node's peer ID as sender and the
current authentication token (plus a trailing "0" inspector-mode bit)
as the piggyback field, then dispatches it through P2P.send_message_to_peer
offloaded to a worker thread so the event loop is not blocked.
The channel name is derived from the local peer ID, the target peer ID, and
content_type following the pattern
"<local_peer_id>::dm:<peer_id>-<content_type>". When channel_trail is
a non-empty string, it is appended with a "~" separator.
If content_type is not one of Msg.STATS_UPDATE, Msg.STATS_REQUEST,
or Msg.STATS_RESPONSE, the internal __last_sent_time timestamp is
updated, which affects passed_time_since_last_communication.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
ID of the destination peer. |
required |
channel_trail
|
str | None
|
Optional suffix appended to the channel name after a |
required |
content_type
|
str
|
Message type identifier (one of the |
required |
content
|
bytes | dict | None
|
Message payload, either raw bytes, a serialisable dict, or |
None
|
p2p
|
P2P | None
|
|
None
|
Returns:
| Type | Description |
|---|---|
bool
|
|
bool
|
|
bool
|
the underlying send call raised |
Source code in unaiverse/networking/node/connpool.py
subscribe
async
¶
Subscribe to a GossipSub topic on the P2P network that owns a peer (async).
Resolves the P2P object to use via a three-step fallback:
- If
peer_idmatches a local P2P node's own peer ID (i.e., the caller itself owns that ID), thatP2Pobject is used. - Otherwise, if the peer is tracked in
peer_id_to_p2p, its registeredP2Pobject is used. - Finally, if
default_p2p_nameis provided, the namedP2Pobject fromp2p_name_to_p2pis used as a last resort.
If none of the above resolves a P2P object, the method returns False.
Both P2PError and ValueError raised by P2P.subscribe_to_topic are
caught and translated to a False return value.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
Peer ID used to identify which |
required |
channel
|
str
|
GossipSub topic name to subscribe to. |
required |
default_p2p_name
|
str | None
|
Name of the |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
|
bool
|
be resolved or if the subscription call raised an exception. |
Source code in unaiverse/networking/node/connpool.py
unsubscribe
async
¶
Unsubscribe from a GossipSub topic on the P2P network that owns a peer (async).
Applies the same three-step P2P resolution logic used by subscribe:
local peer ID match, then peer_id_to_p2p lookup, then default_p2p_name
fallback. If resolution fails, returns False without raising.
Both P2PError and ValueError raised by P2P.unsubscribe_from_topic
are caught and translated to a False return value so callers do not need
to handle them explicitly.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
Peer ID used to identify which |
required |
channel
|
str
|
GossipSub topic name to unsubscribe from. |
required |
default_p2p_name
|
str | None
|
Name of the |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
|
bool
|
could be resolved or if the unsubscription call raised an exception. |
Source code in unaiverse/networking/node/connpool.py
publish
async
¶
publish(peer_id: str, channel: str, content_type: str, content: bytes | dict | tuple | None = None) -> bool
Publish a message to a GossipSub topic on the P2P network that owns a peer (async).
Resolves the P2P object with a two-step strategy: first checks whether
peer_id is one of the local node's own peer IDs (iterating
p2p_to_pool_names), then falls back to peer_id_to_p2p. If neither
yields a P2P object, the method returns False immediately.
The Msg is constructed with the resolved P2P node's peer ID as sender
and the current authentication token plus "0" inspector-mode bit as the
piggyback field, mirroring the construction used in send. Dispatch is
synchronous (via P2P.broadcast_message). A P2PError is caught and
translated to a False return value.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
Peer ID used to identify which |
required |
channel
|
str
|
GossipSub topic name to publish to. |
required |
content_type
|
str
|
Message type identifier (one of the |
required |
content
|
bytes | dict | tuple | None
|
Message payload - raw bytes, a serialisable dict, a tuple, or
|
None
|
Returns:
| Type | Description |
|---|---|
bool
|
|
bool
|
if the |
bool
|
raised |
Source code in unaiverse/networking/node/connpool.py
NodeConn
¶
NodeConn(max_connections: int, p2p_u: P2P, p2p_w: P2P, is_world_node: bool, public_key: str, token: str)
Bases: ConnectionPools
A UNaIVERSE-aware connection pool that routes peers by role and network type.
NodeConn is a concrete subclass of ConnectionPools that implements the
conn_routing_fcn extension point for the UNaIVERSE node architecture. It
manages two physical P2P networks:
p2p_public(P2P_PUBLIC) - the universally reachable network used for connections with arbitrary peers not yet joined to a world.p2p_world(P2P_WORLD) - the private network shared only with peers that belong to the same world.
Each physical network is split into inbound and outbound pools, and the world
network is further subdivided by peer role: WORLD_AGENTS, WORLD_MASTERS,
and WORLD_NODE. The pool constants (IN_PUBLIC, OUT_PUBLIC,
IN_WORLD_AGENTS, etc.) are class-level strings that can be used as keys
wherever a pool name is expected.
In addition to the inherited pool bookkeeping, NodeConn maintains:
world_agents_setandworld_masters_set: sets of peer IDs that are allowed on the private network, updated viaset_world_agents_listandset_world_masters_list(or incrementally viaadd_to_world_agents_listandadd_to_world_masters_list).world_node_peer_id: the peer ID of the world node when this instance belongs to an agent, orNonewhen operating as a world node itself.peer_id_to_addrs: cached multiaddr lists for known peers, used byfind_addrs_by_roleandget_addrsto locate peers before a transport connection is established.rendezvous_tag: a monotonically increasing counter used to detect and apply only the freshest rendezvous-topic snapshots.
Aggregated pool sets such as PUBLIC, WORLD, ALL, OUTGOING, and
INCOMING are provided as class attributes for convenient multi-pool queries
via is_connected, size, get_all_connected_peer_infos, and related
methods.
Attributes:
| Name | Type | Description |
|---|---|---|
p2p_public |
The public-network |
|
p2p_world |
The world/private-network |
|
world_agents_set |
Set of peer IDs recognised as world agents on the private network. |
|
world_masters_set |
Set of peer IDs recognised as world masters on the private network. |
|
world_masters_first |
Peer ID of the first world master added, or |
|
world_agents_and_world_masters_set |
Union of |
|
world_node_peer_id |
Peer ID of the world node, or |
|
inspector_peer_id |
Peer ID of the inspector node, or |
|
role_to_peer_ids |
Mapping from integer role bitmask to the set of peer IDs currently holding that role. |
|
peer_id_to_addrs |
Mapping from peer ID to the list of known multiaddr strings for that peer. |
|
rendezvous_tag |
Monotonic update counter for the rendezvous topic snapshot.
Starts at |
Initialize a NodeConn with role-aware pool layout for a UNaIVERSE node.
Invokes ConnectionPools.__init__ with a fixed pool configuration that
partitions max_connections across the public and world P2P networks.
Capacity ratios differ between world-node and regular-agent deployments:
- Regular agent: 25% public (in+out equally split), 75% world agents (in+out).
World-node, world-masters, and inspector pools are disabled (ratio
0.0). - World node: 25% public, 50% world agents, 25% world masters; the world-node
pools are set to
-1(unlimited overflow) since a world serves any number of incoming agents.
After the parent constructor, convenience references p2p_public and
p2p_world are stored, and all role/agent tracking attributes are
initialised to their empty/sentinel values.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
max_connections
|
int
|
Maximum total number of simultaneous connections across all pools. |
required |
p2p_u
|
P2P
|
|
required |
p2p_w
|
P2P
|
|
required |
is_world_node
|
bool
|
|
required |
public_key
|
str
|
PEM-encoded public key used to verify tokens in incoming
messages. Passed through to |
required |
token
|
str
|
Authentication token appended as piggyback to every outgoing
message. Passed through to |
required |
Raises:
| Type | Description |
|---|---|
AssertionError
|
Propagated from |
Source code in unaiverse/networking/node/connpool.py
1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 | |
IN_PUBLIC
class-attribute
instance-attribute
¶
OUT_PUBLIC
class-attribute
instance-attribute
¶
IN_WORLD_AGENTS
class-attribute
instance-attribute
¶
OUT_WORLD_AGENTS
class-attribute
instance-attribute
¶
IN_WORLD_NODE
class-attribute
instance-attribute
¶
OUT_WORLD_NODE
class-attribute
instance-attribute
¶
IN_WORLD_MASTERS
class-attribute
instance-attribute
¶
OUT_WORLD_MASTERS
class-attribute
instance-attribute
¶
WORLD_AGENTS
class-attribute
instance-attribute
¶
WORLD_AGENTS = {IN_WORLD_AGENTS, OUT_WORLD_AGENTS}
WORLD_MASTERS
class-attribute
instance-attribute
¶
WORLD_MASTERS = {IN_WORLD_MASTERS, OUT_WORLD_MASTERS}
OUTGOING
class-attribute
instance-attribute
¶
OUTGOING = {OUT_PUBLIC, OUT_WORLD_NODE, OUT_WORLD_AGENTS, OUT_WORLD_MASTERS}
INCOMING
class-attribute
instance-attribute
¶
INCOMING = {IN_PUBLIC, IN_WORLD_NODE, IN_WORLD_AGENTS, IN_WORLD_MASTERS}
reset_rendezvous_tag
¶
Reset the rendezvous tag to its sentinel value of -1.
Setting rendezvous_tag back to -1 causes the next call to
set_world_agents_and_world_masters_lists_from_rendezvous to unconditionally
apply the next rendezvous snapshot, regardless of its update_count. This is
useful after a world reconnect or when the agent needs to force a full refresh
of the peer lists.
Source code in unaiverse/networking/node/connpool.py
conn_routing_fcn
async
¶
conn_routing_fcn(connected_peer_infos: list, p2p: P2P) -> dict
Route each connected peer to a pool based on its network and role (async).
Implements the ConnectionPools.conn_routing_fcn extension point for the
UNaIVERSE node topology. The routing logic differs by network:
- Public network (
p2p == p2p_public): peers are routed toIN_PUBLIC(inbound) orOUT_PUBLIC(outbound) purely based on connection direction. - World/private network: each peer is classified by cross-referencing its ID
against
world_agents_set,world_masters_set,world_node_peer_id, andinspector_peer_id. Inbound peers land inIN_WORLD_AGENTS,IN_WORLD_NODE, orIN_WORLD_MASTERS; outbound peers in theirOUT_*counterparts. Peers that match none of the known sets are disconnected immediately and a diagnostic log is emitted.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
connected_peer_infos
|
list
|
List of peer-info dicts as returned by
|
required |
p2p
|
P2P
|
The |
required |
Returns:
| Type | Description |
|---|---|
dict
|
A dict mapping every pool name associated with |
dict
|
|
dict
|
no matching peers map to an empty dict. |
Source code in unaiverse/networking/node/connpool.py
1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 | |
set_world
¶
Set the peer ID of the world node.
Updates world_node_peer_id, which is consulted by conn_routing_fcn
when classifying private-network connections. Passing None clears the
registration and causes any subsequent connection from the former world node
peer to be treated as unrecognised (and disconnected) by the routing function.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
world_peer_id
|
str | None
|
Peer ID of the world node to register, or |
required |
Source code in unaiverse/networking/node/connpool.py
set_inspector
¶
Set the peer ID of the inspector node.
Updates inspector_peer_id, which is consulted alongside world_agents_set
and world_masters_set in conn_routing_fcn when classifying private-network
connections. Passing None clears the inspector registration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
inspector_peer_id
|
str | None
|
Peer ID of the inspector node to register, or |
required |
Source code in unaiverse/networking/node/connpool.py
get_world_peer_id
¶
Return the peer ID of the world node.
Returns the value stored in world_node_peer_id, which is set via
set_world. For a regular agent this is the peer ID of the world it has
joined; for a world node itself the value is typically None (or its own
world-network peer ID if explicitly set).
Returns:
| Type | Description |
|---|---|
str | None
|
The world node's peer ID string, or |
str | None
|
registered. |
Source code in unaiverse/networking/node/connpool.py
get_first_world_master
¶
Return the peer ID of the first registered world master.
world_masters_first is set to the first element of the list passed to
set_world_masters_list, and updated incrementally when
add_to_world_masters_list is called with the first entry ever added. It
is reset to None whenever set_world_masters_list is called with an
empty or None list.
This value can be used as a preferred contact point when initiating communication with the world before a full masters list is available.
Returns:
| Type | Description |
|---|---|
str | None
|
The peer ID string of the first world master, or |
str | None
|
have been registered. |
Source code in unaiverse/networking/node/connpool.py
set_addresses_in_peer_info
¶
Update the address list stored inside a peer's pool entry.
Retrieves the 'addrs' list from the peer-info dict in
pool_name_to_peer_infos and updates it in place (clear then extend) so
that any other code holding a reference to the same list object sees the
updated values. The operation is a no-op if the peer is not currently tracked
in any connection pool (as determined by in_connection_queues).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
Peer ID of the entry to update. |
required |
addresses
|
list[str]
|
New list of multiaddr strings to store for the peer. |
required |
Note
The update is intentionally in-place (rather than replacing the list reference) because other parts of the framework may hold direct references to the same list object.
Source code in unaiverse/networking/node/connpool.py
set_role
¶
Update the role of a peer and keep role_to_peer_ids consistent.
Stores new_role in peer_id_to_misc (the inherited "misc" field used
throughout the pool layer). If the peer is currently tracked in a pool, the
'misc' field of its cached peer-info dict is updated to match. The
role_to_peer_ids index is then maintained by removing the peer from its
old role set (deleting the set entry if it becomes empty) and inserting it
into the new role set.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
Peer ID whose role is to be changed. |
required |
new_role
|
int
|
New integer role bitmask to assign to the peer. |
required |
Source code in unaiverse/networking/node/connpool.py
set_world_agents_list
¶
Sets the list of all world agents based on a provided list of peer information.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
world_agents_list_peer_infos
|
list[dict] | None
|
A list of dictionaries containing peer information for world agents. |
required |
Source code in unaiverse/networking/node/connpool.py
get_world_masters
¶
set_world_masters_list
¶
Sets the list of all world masters based on a provided list of peer information.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
world_masters_list_peer_infos
|
list[dict] | None
|
A list of dictionaries containing peer information for world masters. |
required |
Source code in unaiverse/networking/node/connpool.py
add_to_world_agents_list
¶
Adds a new world agent to the list.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
The peer ID of the new agent. |
required |
addrs
|
list[str]
|
A list of addresses for the new agent. |
required |
role
|
int
|
The role assigned to the agent. |
-1
|
Source code in unaiverse/networking/node/connpool.py
add_to_world_masters_list
¶
Adds a new world master to the list.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
The peer ID of the new master. |
required |
addrs
|
list[str]
|
A list of addresses for the new master. |
required |
role
|
int
|
The role assigned to the master. |
-1
|
Source code in unaiverse/networking/node/connpool.py
get_added_after_updating
¶
Retrieves the set of peers added after the last update cycle for specified pools.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pool_names
|
list[str] | None
|
A list of pool names to check. If None, checks all pools. |
None
|
Returns:
| Type | Description |
|---|---|
dict[str, set]
|
A dictionary mapping pool names to sets of added peer IDs. |
Source code in unaiverse/networking/node/connpool.py
get_removed_after_updating
¶
Retrieves the set of peers removed after the last update cycle for specified pools.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pool_names
|
list[str] | None
|
A list of pool names to check. If None, checks all pools. |
None
|
Returns:
| Type | Description |
|---|---|
dict[str, set]
|
A dictionary mapping pool names to sets of removed peer IDs. |
Source code in unaiverse/networking/node/connpool.py
size
¶
Returns the total number of connections across all specified pools.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pool_names
|
list[str] | None
|
A list of pool names to sum the size of. If None, returns the total size of all pools. |
None
|
Returns:
| Type | Description |
|---|---|
int
|
The total number of connections. |
Source code in unaiverse/networking/node/connpool.py
is_connected
¶
Checks if a peer is connected in any of the specified pools.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
The peer ID to check. |
required |
pool_names
|
list[str] | None
|
A list of pool names to search within. If None, searches all pools. |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if the peer is found in any of the pools, otherwise False. |
Source code in unaiverse/networking/node/connpool.py
is_public
¶
Checks if a peer is connected via the public network.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
The peer ID to check. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if the peer is in a public pool, otherwise False. |
Source code in unaiverse/networking/node/connpool.py
is_world_master
¶
Checks if a peer is a world master.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
The peer ID to check. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if the peer is in a world master pool, otherwise False. |
Source code in unaiverse/networking/node/connpool.py
is_world_node
¶
Checks if a peer is the world node.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
The peer ID to check. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if the peer is in a world node pool, otherwise False. |
Source code in unaiverse/networking/node/connpool.py
is_in_world
¶
Checks if a peer is connected to the world network.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
The peer ID to check. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if the peer is in any world pool, otherwise False. |
Source code in unaiverse/networking/node/connpool.py
get_role
¶
Retrieves the role of a given peer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
The peer ID to query. |
required |
Returns:
| Type | Description |
|---|---|
int
|
The integer role of the peer. |
Source code in unaiverse/networking/node/connpool.py
get_addrs
¶
Retrieves the list of addresses for a given peer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
The peer ID to query. |
required |
Returns:
| Type | Description |
|---|---|
list[str] | None
|
A list of addresses for the peer. |
Source code in unaiverse/networking/node/connpool.py
in_connection_queues
¶
Checks if a peer ID exists in any connection pool.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
The peer ID to check. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if the peer is found in any pool, otherwise False. |
Source code in unaiverse/networking/node/connpool.py
find_addrs_by_role
¶
find_addrs_by_role(role: int, return_peer_ids_too: bool = False, discard_yourself: bool = False) -> list | tuple[list, list]
Finds all addresses of peers with a specific role.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
role
|
int
|
The integer role to search for. |
required |
return_peer_ids_too
|
bool
|
A boolean to also return the peer IDs. |
False
|
discard_yourself
|
bool
|
True to avoid reporting your info on the returned data. |
False
|
Returns:
| Type | Description |
|---|---|
list | tuple[list, list]
|
A list of lists of addresses, and optionally a list of peer IDs. |
Source code in unaiverse/networking/node/connpool.py
count_by_role
¶
Counts the number of peers with a specific role.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
role
|
int
|
The integer role to count. |
required |
Returns:
| Type | Description |
|---|---|
int
|
The number of peers with that role. |
Source code in unaiverse/networking/node/connpool.py
get_all_connected_peer_infos
¶
Retrieves a list of all peer info dictionaries for the specified pools.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pool_names
|
list[str] | set[str]
|
A list or set of pool names to query. |
required |
Returns:
| Type | Description |
|---|---|
list[dict]
|
A list of dictionaries containing peer information. |
Source code in unaiverse/networking/node/connpool.py
set_world_agents_and_world_masters_lists_from_rendezvous
async
¶
Updates the lists of world agents and masters using data from the rendezvous topic (async).
Source code in unaiverse/networking/node/connpool.py
get_cv_hash_from_last_token
async
¶
Retrieves the CV hash from the last token received from a peer (async).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
The peer ID to query. |
required |
Returns:
| Type | Description |
|---|---|
str | None
|
The CV hash string, or None if not found. |
Source code in unaiverse/networking/node/connpool.py
remove
async
¶
Removes a peer and its associated information from all lists and pools (async).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
The peer ID to remove. |
required |
remove_all_world_agents
async
¶
Removes all connected world agents from the pools and role lists (async).
Source code in unaiverse/networking/node/connpool.py
subscribe
async
¶
Subscribes to a channel, defaulting to the world P2P network if a network is not specified (async).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
peer_id
|
str
|
The peer ID associated with the channel. |
required |
channel
|
str
|
The channel to subscribe to. |
required |
default_p2p_name
|
str | None
|
An optional P2P name to use for the subscription. |
None
|
Returns:
| Type | Description |
|---|---|
bool
|
True if successful, False otherwise. |
Source code in unaiverse/networking/node/connpool.py
get_messages
async
¶
get_messages(p2p_name: str, allowed_not_connected_peers: set | None = None) -> list[Msg]
Retrieves messages, allowing for messages from known world agents and masters even if not in a connection pool (async).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
p2p_name
|
str
|
The name of the P2P network to get messages from. |
required |
allowed_not_connected_peers
|
set | None
|
This parameter is ignored in this implementation. |
None
|
Returns:
| Type | Description |
|---|---|
list[Msg]
|
A list of verified and processed message objects. |