
unaiverse.networking.p2p

What this module does

Package initializer for the P2P layer that re-exports the Go-bridge and P2P classes and performs a developer source-integrity check by hashing the compiled Go library directory.

p2p

A Collectionless AI Project (https://collectionless.ai)
Registration/Login: https://unaiverse.io
Code Repositories: https://github.com/collectionlessai/
Main Developers: Stefano Melacci (Project Leader), Christian Di Maio, Tommaso Guidi

patterns module-attribute

patterns = ['*.so', '*.pyd', '*.dll', '*.dylib']

P2P

P2P(identity_dir: str, port: int = 0, ips: List[str] = None, enable_relay_client: bool = True, enable_relay_service: bool = False, use_broad_limits: bool = False, is_isolated: bool = False, knows_is_public: bool = False, enable_tls: bool = False, domain_name: Optional[str] = None, tls_cert_path: Optional[str] = None, tls_key_path: Optional[str] = None, dht_enabled: bool = True, dht_keep: bool = False, webrtc_enabled: bool = True, flood_sub: bool = False, ice_stun_servers: Optional[List[str]] = None, ice_turn_servers: Optional[List[Dict[str, Any]]] = None, log_sub: str = 'pub')

Python wrapper around the Go libp2p shared library.

Each instance represents one independent libp2p node backed by the Go runtime. The class manages the full node lifecycle: library initialisation (via setup_library), node creation (__init__), peer connectivity (connect_to, disconnect_from), direct and PubSub messaging (send_message_to_peer, broadcast_message, pop_messages), relay operations (reserve_on_relay), and graceful shutdown (close).

The Go library is a shared resource: it is loaded once per process and stored in the class-level libp2p attribute. setup_library must be called once before any instance is created; subsequent calls are skipped after a warning is logged. Up to _MAX_INSTANCES nodes may be active simultaneously; each instance is assigned a slot in _instance_ids and releases it on close.

Attributes:

Name Type Description
libp2p GoLibP2P

Class-level reference to the loaded GoLibP2P instance. Must be set by the package __init__.py before setup_library is called.

log_sub

Controls whether this node logs subscription events. Set to "pub" by default.

Note

The class supports the context-manager protocol. Using a P2P instance inside a with block guarantees that close is called on exit even if an exception is raised.
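The close-on-exit guarantee follows from the standard context-manager protocol. The sketch below illustrates it with a hypothetical stand-in class (FakeNode is not part of unaiverse and models only the close behaviour, not a real node):

```python
class FakeNode:
    """Hypothetical stand-in for P2P; it only models close-on-exit."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True

    def __enter__(self) -> "FakeNode":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # Always release resources, then let any exception propagate.
        self.close()
        return False


def run_and_fail() -> FakeNode:
    """Raise inside the with block and return the node for inspection."""
    node = FakeNode()
    try:
        with node:
            raise RuntimeError("simulated failure inside the with block")
    except RuntimeError:
        pass
    return node
```

After run_and_fail() returns, the node's closed flag is set even though the body of the with block raised.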

Initialize and start a new libp2p node with the given configuration.

Acquires a free instance slot from the shared _instance_ids pool and calls the Go CreateNode function with a JSON configuration blob that encodes all the options listed below. If node creation fails, the slot is released before re-raising the exception so it can be used by future instances.
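The slot handling described above can be sketched as follows. This is a simplified model rather than the library's code: SlotPool and create_with_slot are hypothetical names, and a plain RuntimeError stands in for P2PError.

```python
import threading
from typing import Any, Callable, List, Tuple


class SlotPool:
    """Fixed-size pool of boolean slots guarded by a lock."""

    def __init__(self, size: int) -> None:
        self._slots: List[bool] = [False] * size
        self._lock = threading.Lock()

    def acquire(self) -> int:
        """Mark the first free slot as taken and return its index."""
        with self._lock:
            for index, taken in enumerate(self._slots):
                if not taken:
                    self._slots[index] = True
                    return index
        raise RuntimeError("maximum number of instances reached")

    def release(self, index: int) -> None:
        with self._lock:
            self._slots[index] = False


def create_with_slot(pool: SlotPool, factory: Callable[[int], Any]) -> Tuple[int, Any]:
    """Acquire a slot, run the factory, and free the slot if the factory fails."""
    slot = pool.acquire()
    try:
        return slot, factory(slot)
    except Exception:
        pool.release(slot)  # make the slot reusable by future instances
        raise
```

Releasing the slot in the except branch is what keeps a failed creation from permanently consuming capacity.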

The identity_dir directory is created if it does not already exist. The node's private key and any TLS certificates are persisted there so that the peer ID remains stable across restarts.

The relay client is enabled whenever either relay flag is set: even if enable_relay_client is False, the node still acts as a relay client when enable_relay_service is True (the client flag is ORed with the service flag internally).

Custom TLS (tls_cert_path / tls_key_path / domain_name) and AutoTLS (enable_tls with no custom paths) are mutually exclusive. If any of the custom TLS arguments is supplied, all three must be provided.
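The TLS rules above can be expressed as a small decision function. This is a sketch that mirrors the validation in the constructor source on this page; the function name resolve_tls_mode and the returned labels are hypothetical, and the file names in the usage note are placeholders.

```python
from typing import Optional


def resolve_tls_mode(enable_tls: bool,
                     domain_name: Optional[str] = None,
                     tls_cert_path: Optional[str] = None,
                     tls_key_path: Optional[str] = None) -> str:
    """Return "custom", "auto", or "none" for a given argument combination."""
    custom_args = (domain_name, tls_cert_path, tls_key_path)
    has_custom = any(arg is not None for arg in custom_args)

    # A partial set of custom TLS arguments is rejected.
    if has_custom and any(arg is None for arg in custom_args):
        raise ValueError(
            "Custom TLS requires 'domain_name', 'tls_cert_path' and 'tls_key_path'."
        )
    if has_custom:
        return "custom"
    # AutoTLS applies only when no custom TLS argument was supplied.
    return "auto" if enable_tls else "none"
```

For example, resolve_tls_mode(True) selects AutoTLS, while resolve_tls_mode(False, "node.example.org", "cert.pem", "key.pem") selects custom TLS. Supplying only one or two of the custom arguments raises ValueError.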

Parameters:

Name Type Description Default
identity_dir str

Path to a directory where the node's private key and TLS certificates are loaded from or written to. Created automatically if absent.

required
port int

TCP port number to listen on. Pass 0 for a random available port. Defaults to 0.

0
ips List[str]

List of IP address strings to bind to. When None (the default), the node binds to ["0.0.0.0"] (all interfaces).

None
enable_relay_client bool

If True, the node accepts inbound connections relayed through circuit-relay v2 nodes. Defaults to True.

True
enable_relay_service bool

If True, the node offers circuit-relay v2 service to other peers. Enabling this also forces enable_relay_client to True. Defaults to False.

False
use_broad_limits bool

If True, the relay service uses broad resource limits (higher connection counts). Ignored when enable_relay_service is False. Defaults to False.

False
is_isolated bool

If True, the node operates in an isolated network (no connections to the global DHT bootstrap peers). Useful for local testing. Defaults to False.

False
knows_is_public bool

If True, the node is marked as publicly reachable immediately without going through AutoNAT probing, UPnP, or hole-punching. Defaults to False.

False
enable_tls bool

If True and none of the custom TLS arguments (domain_name, tls_cert_path, tls_key_path) is given, AutoTLS certificate management is enabled (requires internet access). Defaults to False.

False
domain_name Optional[str]

Domain name for custom TLS. Must be provided together with tls_cert_path and tls_key_path. In the constructor source shown on this page, passing domain_name on its own raises ValueError, so it cannot be combined with AutoTLS. Defaults to None.

None
tls_cert_path Optional[str]

Path to a PEM-encoded TLS certificate file for custom TLS. Must be provided together with tls_key_path and domain_name. Defaults to None.

None
tls_key_path Optional[str]

Path to a PEM-encoded TLS private key file for custom TLS. Must be provided together with tls_cert_path and domain_name. Defaults to None.

None
dht_enabled bool

If True, the Kademlia DHT is started for peer discovery. Defaults to True.

True
dht_keep bool

If True (and dht_enabled is True), the DHT routing table is preserved across reconnects. Defaults to False.

False
webrtc_enabled bool

If True, the node registers the /unaiverse/webrtc-signal/1.0.0 protocol for Go-to-browser NAT traversal via WebRTC DataChannels. Defaults to True.

True
flood_sub bool

If True, GossipSub falls back to FloodSub semantics. Defaults to False.

False
ice_stun_servers Optional[List[str]]

List of STUN server URIs in the form "stun:host:port". When None, the Go library uses its built-in default (Google public STUN). Defaults to None.

None
ice_turn_servers Optional[List[Dict[str, Any]]]

List of TURN server configuration dicts, each containing "urls", "username", and "credential" keys. Defaults to None.

None
log_sub str

Subscription logging mode string passed to the Go layer. Defaults to "pub".

'pub'

Raises:

Type Description
P2PError

If setup_library has not been called yet, if no free instance slot is available, if CreateNode returns a null or error result, or if the returned address list is invalid.

ValueError

If a partial set of custom TLS arguments is provided (all three of domain_name, tls_cert_path, and tls_key_path must be given together).
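The ice_stun_servers and ice_turn_servers parameters expect the shapes given in the parameter table. The sketch below checks those shapes before a node is created. The helper check_ice_config is hypothetical (the library is not known to perform this validation), and the host names in the usage note are placeholders.

```python
from typing import Any, Dict, List

REQUIRED_TURN_KEYS = {"urls", "username", "credential"}


def check_ice_config(stun_servers: List[str],
                     turn_servers: List[Dict[str, Any]]) -> List[str]:
    """Return a list of human-readable problems; empty means the shapes look fine."""
    problems: List[str] = []
    for uri in stun_servers:
        # The documented form is "stun:host:port".
        if not uri.startswith("stun:"):
            problems.append(f"STUN URI lacks the 'stun:' scheme: {uri}")
    for index, entry in enumerate(turn_servers):
        missing = REQUIRED_TURN_KEYS - set(entry)
        if missing:
            problems.append(f"TURN entry {index} is missing keys: {sorted(missing)}")
    return problems
```

A call such as check_ice_config(["stun:stun.example.org:3478"], [{"urls": ["turn:turn.example.org:3478"], "username": "user", "credential": "secret"}]) returns an empty list.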

Source code in unaiverse/networking/p2p/p2p.py
def __init__(self,
             identity_dir: str,
             port: int = 0,
             ips: List[str] = None,
             enable_relay_client: bool = True,
             enable_relay_service: bool = False,
             use_broad_limits: bool = False,
             is_isolated: bool = False,
             knows_is_public: bool = False,
             enable_tls: bool = False,
             domain_name: Optional[str] = None,
             tls_cert_path: Optional[str] = None,
             tls_key_path: Optional[str] = None,
             dht_enabled: bool = True,
             dht_keep: bool = False,
             webrtc_enabled: bool = True,
             flood_sub: bool = False,
             ice_stun_servers: Optional[List[str]] = None,
             ice_turn_servers: Optional[List[Dict[str, Any]]] = None,
             log_sub: str = "pub",
             ) -> None:
    """Initialize and start a new libp2p node with the given configuration.

    Acquires a free instance slot from the shared ``_instance_ids`` pool and
    calls the Go ``CreateNode`` function with a JSON configuration blob that
    encodes all the options listed below. If node creation fails, the slot is
    released before re-raising the exception so it can be used by future
    instances.

    The ``identity_dir`` directory is created if it does not already exist. The
    node's private key and any TLS certificates are persisted there so that the
    peer ID remains stable across restarts.

    Even when ``enable_relay_client`` is ``False``, the relay subsystem is still
    enabled as a client if ``enable_relay_service`` is ``True`` (the client flag
    is ORed with the service flag internally).

    Custom TLS (``tls_cert_path`` / ``tls_key_path`` / ``domain_name``) and
    AutoTLS (``enable_tls`` with no custom paths) are mutually exclusive. If any
    of the custom TLS arguments is supplied, all three must be provided.

    Args:
        identity_dir: Path to a directory where the node's private key and TLS
            certificates are loaded from or written to. Created automatically if
            absent.
        port: TCP port number to listen on. Pass ``0`` for a random available
            port. Defaults to ``0``.
        ips: List of IP address strings to bind to. Defaults to ``["0.0.0.0"]``
            (all interfaces) when ``None``. Defaults to ``None``.
        enable_relay_client: If ``True``, the node accepts inbound connections
            relayed through circuit-relay v2 nodes. Defaults to ``True``.
        enable_relay_service: If ``True``, the node offers circuit-relay v2
            service to other peers. Enabling this also forces
            ``enable_relay_client`` to ``True``. Defaults to ``False``.
        use_broad_limits: If ``True``, the relay service uses broad resource
            limits (higher connection counts). Ignored when
            ``enable_relay_service`` is ``False``. Defaults to ``False``.
        is_isolated: If ``True``, the node operates in an isolated network
            (no connections to the global DHT bootstrap peers). Useful for
            local testing. Defaults to ``False``.
        knows_is_public: If ``True``, the node is marked as publicly reachable
            immediately without going through AutoNAT probing, UPnP, or
            hole-punching. Defaults to ``False``.
        enable_tls: If ``True`` and none of the custom TLS arguments
            (``domain_name``, ``tls_cert_path``, ``tls_key_path``) is given,
            AutoTLS certificate management is enabled (requires internet
            access). Defaults to ``False``.
        domain_name: Domain name for custom TLS. Must be provided together with
            ``tls_cert_path`` and ``tls_key_path``; passing it alone raises
            ``ValueError``. Defaults to ``None``.
        tls_cert_path: Path to a PEM-encoded TLS certificate file for custom
            TLS. Must be provided together with ``tls_key_path`` and
            ``domain_name``. Defaults to ``None``.
        tls_key_path: Path to a PEM-encoded TLS private key file for custom
            TLS. Must be provided together with ``tls_cert_path`` and
            ``domain_name``. Defaults to ``None``.
        dht_enabled: If ``True``, the Kademlia DHT is started for peer
            discovery. Defaults to ``True``.
        dht_keep: If ``True`` (and ``dht_enabled`` is ``True``), the DHT
            routing table is preserved across reconnects. Defaults to
            ``False``.
        webrtc_enabled: If ``True``, the node registers the
            ``/unaiverse/webrtc-signal/1.0.0`` protocol for Go-to-browser NAT
            traversal via WebRTC DataChannels. Defaults to ``True``.
        flood_sub: If ``True``, GossipSub falls back to FloodSub semantics.
            Defaults to ``False``.
        ice_stun_servers: List of STUN server URIs in the form
            ``"stun:host:port"``. When ``None``, the Go library uses its built-in
            default (Google public STUN). Defaults to ``None``.
        ice_turn_servers: List of TURN server configuration dicts, each
            containing ``"urls"``, ``"username"``, and ``"credential"`` keys.
            Defaults to ``None``.
        log_sub: Subscription logging mode string passed to the Go layer.
            Defaults to ``"pub"``.

    Raises:
        P2PError: If ``setup_library`` has not been called yet, if no free
            instance slot is available, if ``CreateNode`` returns a null or
            error result, or if the returned address list is invalid.
        ValueError: If a partial set of custom TLS arguments is provided (all
            three of ``domain_name``, ``tls_cert_path``, and ``tls_key_path``
            must be given together).
    """

    # --- CRITICAL: Check if library is initialized ---
    if not P2P._library_initialized:
        raise P2PError("P2P library not set up. Call P2P.setup_library() before creating an instance.")

    # Assign instance ID
    assigned_instance_id = -1
    with P2P._instance_lock:
        for _instance_id, i in enumerate(self._instance_ids):
            if not i:
                self._instance_ids[_instance_id] = True
                assigned_instance_id = _instance_id
                break
        if assigned_instance_id == -1:
            raise P2PError(
                f"Cannot create new P2P instance: Maximum number of instances "
                f"({P2P._MAX_INSTANCES})."
            )

    self._instance: int = assigned_instance_id
    logger.info(f"🚀 Attempting to initialize P2P Node with auto-assigned Instance ID: {self._instance}")

    os.makedirs(identity_dir, exist_ok=True)

    self._enable_relay_client = enable_relay_client or enable_relay_service
    self._peer_id: Optional[str] = None
    self.log_sub = log_sub

    # TLS Validation Logic
    has_custom_tls_args = (tls_cert_path is not None) or (tls_key_path is not None) or (domain_name is not None)
    if has_custom_tls_args:
        if domain_name is None or tls_cert_path is None or tls_key_path is None:
            raise ValueError("Custom TLS requires 'domain_name', 'tls_cert_path' and 'tls_key_path'.")

    use_auto_tls = enable_tls and not has_custom_tls_args

    # --- Build Configuration JSON ---
    node_config = {
        "identity_dir": identity_dir,
        "predefined_port": port,
        "listen_ips": ips,
        "relay": {
            "enable_client": self._enable_relay_client,
            "enable_service": enable_relay_service,
            "with_broad_limits": use_broad_limits,
        },
        "tls": {
            "auto_tls": use_auto_tls,
            "domain": domain_name if domain_name else "",
            "cert_path": tls_cert_path if tls_cert_path else "",
            "key_path": tls_key_path if tls_key_path else "",
        },
        "network": {
            "isolated": is_isolated,
            "force_public": knows_is_public,
            "flood_sub": flood_sub,
        },
        "dht": {
            "enabled": dht_enabled,
            "keep": dht_keep and dht_enabled,
        },
        "webrtc": {
            "enabled": webrtc_enabled,
            "ice_config": {
                "stun_servers": ice_stun_servers,
                "turn_servers": ice_turn_servers,
            } if webrtc_enabled else None,
        },
    }

    logger.info(f"🐍 Creating Node (Instance ID: {self._instance})...")
    try:

        # Call the Go function
        result_ptr = P2P.libp2p.CreateNode(
            P2P._type_interface.to_go_int(self._instance),
            P2P._type_interface.to_go_json(node_config),
        )
        result = P2P._type_interface.from_go_ptr_to_json(result_ptr)

        if result is None:
            err_msg = "Received null result from Go CreateNode."
            logger.error(f"[Instance {self._instance}] {err_msg}")
            raise P2PError(f"[Instance {self._instance}] {err_msg}")
        if result.get('state') == "Error":
            err_msg = result.get('message', 'Unknown Go error on CreateNode')
            logger.error(f"[Instance {self._instance}] Go error: {err_msg}")
            raise P2PError(f"[Instance {self._instance}] Failed to create node: {err_msg}")

        message_data = result.get('message')
        initial_addresses = message_data.get("addresses", [])
        self._is_public = message_data.get("isPublic", False)

        # Check the returned data
        if not isinstance(initial_addresses, list):
            err_msg = "Received invalid addresses list from Go CreateNode."
            logger.error(f"[Instance {self._instance}] {err_msg}")
            raise P2PError(f"[Instance {self._instance}] {err_msg}")
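        elif not initial_addresses:
            err_msg = "Received empty addresses list from Go CreateNode."
            logger.error(f"[Instance {self._instance}] {err_msg}")
            # Raise here; otherwise the indexing below fails with an IndexError.
            raise P2PError(f"[Instance {self._instance}] {err_msg}")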

        self._peer_id = initial_addresses[0].split("/")[-1]

        logger.info(f"✅ [Instance {self._instance}] Node created with ID: {self._peer_id}")
        logger.info(f"👂 [Instance {self._instance}] Listening on: {initial_addresses}")
        logger.info(f"🌐 [Instance {self._instance}] Publicly reachable: {self._is_public}")

        logger.info(f"🎉 [Instance {self._instance}] Node initialized successfully.")

    except Exception as e:
        logger.error(f"❌ [Instance {self._instance}] Node creation failed: {e}")

        # Reclaim the instance ID using the _instance_ids list
        if self._instance != -1:  # Check if an ID was actually assigned
            with P2P._instance_lock:
                P2P._instance_ids[self._instance] = False
                logger.info(f"[Instance {self._instance}] "
                            f"Reclaimed instance ID {self._instance} due to creation failure.")
        raise  # Re-raise the exception that caused the failure

    logger.info("🎉 Node created successfully and background polling started.")

libp2p instance-attribute

libp2p: GoLibP2P

log_sub instance-attribute

log_sub = log_sub

peer_id property

peer_id: Optional[str]

Return the peer ID of the local libp2p node.

The value is set during __init__ by parsing the first multiaddress returned by the Go CreateNode call, and is reset to None after close is called.

Returns:

Type Description
Optional[str]

The peer ID string (e.g. "12D3Koo...") while the node is running, or None if the node has not been successfully initialised or has been closed.
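The peer ID is taken from the last path component of a multiaddress. A minimal sketch of that parsing step follows. The helper name peer_id_from_multiaddr is hypothetical, and the peer ID in the usage note is a placeholder, not a valid ID.

```python
def peer_id_from_multiaddr(multiaddr: str) -> str:
    """Return the final '/'-separated component of a multiaddress string."""
    return multiaddr.split("/")[-1]
```

For "/ip4/1.2.3.4/tcp/4001/p2p/12D3KooWExample" this returns "12D3KooWExample". The approach assumes the address ends with a /p2p/<PeerID> suffix: for "/ip4/1.2.3.4/tcp/4001" it would return "4001", which is not a peer ID.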

addresses property

addresses: List[str]

Return the current list of multiaddresses from the Go engine.

Delegates to get_node_addresses with an empty peer ID to query the local node. The Go layer caches address information via internal events, so the call is effectively O(1). On failure the exception is caught, logged, and an empty list is returned so callers can safely iterate the result without additional error handling.

Returns:

Type Description
List[str]

A list of multiaddress strings including the /p2p/<PeerID> suffix, or an empty list if the Go call fails.
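The catch, log, and return-empty behaviour can be sketched generically. The names safe_addresses and failing_fetch are hypothetical; the real property delegates to get_node_addresses, which is not reproduced here.

```python
import logging
from typing import Callable, List

logger = logging.getLogger("p2p-sketch")


def safe_addresses(fetch: Callable[[], List[str]]) -> List[str]:
    """Return fetch(), or an empty list if it raises (the failure is logged)."""
    try:
        return fetch()
    except Exception as exc:
        logger.error("Failed to get node addresses: %s", exc)
        return []


def failing_fetch() -> List[str]:
    """Simulate a failing call into the Go layer."""
    raise RuntimeError("simulated Go call failure")
```

Callers can then iterate over safe_addresses(failing_fetch) without a try block. The trade-off is that an empty list does not distinguish a failure from a node with no addresses.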

is_public property

is_public: Optional[bool]

Return whether the local node is publicly reachable.

The value is set during __init__ from the "isPublic" field in the Go CreateNode response. It reflects the reachability state at startup (or immediately after if knows_is_public was True); it is not updated dynamically during the node's lifetime.

Returns:

Type Description
Optional[bool]

True if the node is publicly reachable, False if it is behind NAT without public reachability, or None if the node has not been successfully initialised.

relay_is_enabled property

relay_is_enabled: bool

Return whether circuit-relay client functionality is enabled for this node.

The value is set during __init__ as the logical OR of the enable_relay_client and enable_relay_service constructor arguments, because enabling the relay service implicitly requires the relay client.

Returns:

Type Description
bool

True if the relay client is active (either explicitly or because the relay service was enabled), False otherwise.

setup_library classmethod

setup_library(max_instances: Optional[int] = None, max_channels: Optional[int] = None, max_queue_per_channel: Optional[int] = None, max_message_size: Optional[int] = None, enable_logging: bool = False, unai_logger=None) -> None

Initialize the underlying Go libp2p library with capacity and logging settings.

This class method must be called once per process before any P2P instance is created. Subsequent calls while the library is already initialised are skipped after a warning is logged. The method configures the Go runtime with capacity limits, then calls InitializeLibrary on the Go side.

When enable_logging is True and unai_logger is provided and the FdCapture utility is available, all Go stdout/stderr output is intercepted at the OS file-descriptor level before the Go library is initialised, so that every line emitted by the Go runtime (including startup messages) is routed through unai_logger.p2p() rather than appearing raw on the terminal.

Parameters:

Name Type Description Default
max_instances Optional[int]

Maximum number of simultaneous P2P node instances. Falls back to _MAX_INSTANCES (32) when None. Defaults to None.

None
max_channels Optional[int]

Maximum number of channels per instance. Falls back to _MAX_NUM_CAHNNELS (100) when None. Defaults to None.

None
max_queue_per_channel Optional[int]

Maximum number of queued messages per channel. Falls back to _MAX_QUEUE_PER_CHANNEL (50) when None. Defaults to None.

None
max_message_size Optional[int]

Maximum allowed message size in bytes. Falls back to _MAX_MESSAGE_SIZE (50 MB) when None. Defaults to None.

None
enable_logging bool

If True, sets the Python P2P logger to INFO level and passes a Go-level log-config dict to InitializeLibrary. Defaults to False.

False
unai_logger

An instance of unaiverse.utils.logger.Logger. When provided together with enable_logging=True and FdCapture being available, Go stdout/stderr are redirected to unai_logger.p2p(). Defaults to None.

None

Raises:

Type Description
P2PError

If P2P.libp2p has not been set before this call.
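The fallback rule for the capacity arguments (use the supplied value unless it is None) can be sketched as below. The names DEFAULT_LIMITS and resolve_limits are hypothetical, and reading "50 MB" as 50 * 1024 * 1024 bytes is an assumption.

```python
from typing import Dict, Optional

# Defaults as documented above; the exact byte count for "50 MB" is assumed.
DEFAULT_LIMITS: Dict[str, int] = {
    "max_instances": 32,
    "max_channels": 100,
    "max_queue_per_channel": 50,
    "max_message_size": 50 * 1024 * 1024,
}


def resolve_limits(**overrides: Optional[int]) -> Dict[str, int]:
    """Merge caller overrides with the defaults, treating None as 'not given'."""
    unknown = set(overrides) - set(DEFAULT_LIMITS)
    if unknown:
        raise TypeError(f"unknown limit(s): {sorted(unknown)}")
    resolved: Dict[str, int] = {}
    for name, default in DEFAULT_LIMITS.items():
        value = overrides.get(name)
        # Compare against None so that an explicit 0 is not replaced.
        resolved[name] = default if value is None else value
    return resolved
```

Testing against None, rather than relying on truthiness, means an explicit 0 is passed through instead of being silently replaced by the default.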

Source code in unaiverse/networking/p2p/p2p.py
@classmethod
def setup_library(cls,
                  max_instances: Optional[int] = None,
                  max_channels: Optional[int] = None,
                  max_queue_per_channel: Optional[int] = None,
                  max_message_size: Optional[int] = None,
                  enable_logging: bool = False,
                  unai_logger=None) -> None:
    """Initialize the underlying Go libp2p library with capacity and logging settings.

    This class method must be called exactly once per process before any ``P2P``
    instance is created. Subsequent calls while the library is already initialised
    are skipped after a warning is logged. The method configures the Go runtime with
    capacity limits, then calls ``InitializeLibrary`` on the Go side.

    When ``enable_logging`` is ``True`` and ``unai_logger`` is provided and the
    ``FdCapture`` utility is available, all Go stdout/stderr output is intercepted
    at the OS file-descriptor level before the Go library is initialised, so that
    every line emitted by the Go runtime (including startup messages) is routed
    through ``unai_logger.p2p()`` rather than appearing raw on the terminal.

    Args:
        max_instances: Maximum number of simultaneous ``P2P`` node instances. Falls
            back to ``_MAX_INSTANCES`` (32) when ``None``. Defaults to ``None``.
        max_channels: Maximum number of channels per instance. Falls back to
            ``_MAX_NUM_CAHNNELS`` (100) when ``None``. Defaults to ``None``.
        max_queue_per_channel: Maximum number of queued messages per channel. Falls
            back to ``_MAX_QUEUE_PER_CHANNEL`` (50) when ``None``. Defaults to ``None``.
        max_message_size: Maximum allowed message size in bytes. Falls back to
            ``_MAX_MESSAGE_SIZE`` (50 MB) when ``None``. Defaults to ``None``.
        enable_logging: If ``True``, sets the Python ``P2P`` logger to INFO level and
            passes a Go-level log-config dict to ``InitializeLibrary``. Defaults to
            ``False``.
        unai_logger: An instance of ``unaiverse.utils.logger.Logger``. When provided
            together with ``enable_logging=True`` and ``FdCapture`` being available,
            Go stdout/stderr are redirected to ``unai_logger.p2p()``. Defaults to
            ``None``.

    Raises:
        P2PError: If ``P2P.libp2p`` has not been set before this call.
    """
    with cls._initialize_lock:
        if cls._library_initialized:
            logger.warning("P2P library is already initialized. Skipping setup.")
            return

        if not hasattr(cls, 'libp2p') or cls.libp2p is None:
            raise P2PError("Library not loaded before setup. Check package __init__.py")

        # Configure Python logging based on the flag
        if not enable_logging:
            logger.setLevel(logging.CRITICAL)
            _log_config = {}
        else:
            logger.setLevel(logging.INFO)
            _log_config = {
                'net/identify': 'info',
                'unailib': 'debug',
                # 'autotls': 'info',
                # 'p2p-forge': 'info',
                'nat': 'info',
                'basichost': 'info',
                'p2p-circuit': 'debug',
                'relay': 'debug',
                'p2p-holepunch': 'debug',
                'tcp-tpt': 'info',
                'connmgr': 'info',
                'dht': 'info',
                'autorelay': 'debug',
                'autonat': 'info',
                # 'rcmgr': 'info',
                'swarm2': 'info',
                'yamux': 'info'
            }

        # --- Start OS-level fd capture BEFORE InitializeLibrary, so we
        #     catch every line the Go runtime emits from the very start ---
        if enable_logging and unai_logger is not None and _CAPTURE_AVAILABLE:
            def _make_cb(src_label: str):
                def _cb(line: str):
                    unai_logger.p2p(line, src=src_label)
                return _cb
            cls._cap_stdout = FdCapture(1, _make_cb("stdout"))
            cls._cap_stderr = FdCapture(2, _make_cb("stderr"))
            cls._cap_stdout.start()
            cls._cap_stderr.start()
            logger.info("🐍 Go stdout/stderr redirected to P2P log channel.")

        logger.info("🐍 Setting up and initializing P2P library core with user settings...")
        cls._type_interface = TypeInterface(cls.libp2p)

        # Use provided arguments or fall back to class defaults
        _max_instances = max_instances if max_instances is not None else cls._MAX_INSTANCES
        _max_channels = max_channels if max_channels is not None else cls._MAX_NUM_CAHNNELS
        _max_queue = max_queue_per_channel if max_queue_per_channel is not None else cls._MAX_QUEUE_PER_CHANNEL
        _max_msg_size = max_message_size if max_message_size is not None else cls._MAX_MESSAGE_SIZE

        # Update class attributes if they were overridden
        cls._MAX_INSTANCES = _max_instances
        cls._instance_ids = [False, ] * _max_instances  # Resize the tracking list

        # Call the Go function to set up its internal state
        logger.info("🐍 Initializing Go library core...")
        cls.libp2p.InitializeLibrary(
            cls._type_interface.to_go_int(_max_instances),
            cls._type_interface.to_go_int(_max_channels),
            cls._type_interface.to_go_int(_max_queue),
            cls._type_interface.to_go_int(_max_msg_size),
            cls._type_interface.to_go_json(_log_config)
        )

        cls._library_initialized = True
        logger.info("✅ Go library initialized successfully.")

connect_to

connect_to(multiaddrs: list[str]) -> Dict[str, Any]

Establish a connection to a remote peer identified by its multiaddresses.

The Go ConnectTo function is called with the full list of multiaddresses so the runtime can try each one in order until a connection succeeds. The peer ID is extracted from the first multiaddress for logging purposes.

Parameters:

Name Type Description Default
multiaddrs list[str]

Non-empty list of multiaddress strings for the target peer (e.g. ["/ip4/1.2.3.4/tcp/4001/p2p/12D3..."]). The list must contain at least one entry.

required

Returns:

Type Description
Dict[str, Any]

A dictionary representing the connected peer's AddrInfo, typically containing the "ID" (peer ID string) and "Addrs" (list of multiaddress strings) keys as returned by the Go library.

Raises:

Type Description
ValueError

If multiaddrs is empty or not a list.

P2PError

If the Go ConnectTo call returns a null result, reports an error state, or if any other failure occurs during the call.
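Calls into the Go layer return a JSON envelope with a state field and a message field. A minimal sketch of unwrapping that envelope follows. GoCallError is a hypothetical stand-in for P2PError, unwrap_go_result is a hypothetical helper, and the "Success" state value in the usage note is an assumption (only "Error" appears in the source on this page).

```python
from typing import Any, Dict, Optional


class GoCallError(Exception):
    """Hypothetical stand-in for P2PError."""


def unwrap_go_result(result: Optional[Dict[str, Any]], action: str) -> Any:
    """Return the envelope's message, or raise if the call failed."""
    if result is None:
        raise GoCallError(f"{action}: received null result")
    if result.get("state") == "Error":
        detail = result.get("message", "Unknown Go error")
        raise GoCallError(f"{action}: {detail}")
    return result.get("message", {})
```

For an envelope such as {"state": "Success", "message": {"ID": "PEER", "Addrs": []}} the helper returns the inner dictionary; a None result or an "Error" state raises GoCallError.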

Source code in unaiverse/networking/p2p/p2p.py
def connect_to(self, multiaddrs: list[str]) -> Dict[str, Any]:
    """Establish a connection to a remote peer identified by its multiaddresses.

    The Go ``ConnectTo`` function is called with the full list of multiaddresses
    so the runtime can try each one in order until a connection succeeds. The
    peer ID is extracted from the first multiaddress for logging purposes.

    Args:
        multiaddrs: Non-empty list of multiaddress strings for the target peer
            (e.g. ``["/ip4/1.2.3.4/tcp/4001/p2p/12D3..."]``). The list must
            contain at least one entry.

    Returns:
        A dictionary representing the connected peer's ``AddrInfo``, typically
        containing the ``"ID"`` (peer ID string) and ``"Addrs"`` (list of
        multiaddress strings) keys as returned by the Go library.

    Raises:
        ValueError: If ``multiaddrs`` is empty or not a list.
        P2PError: If the Go ``ConnectTo`` call returns a null result, reports
            an error state, or if any other failure occurs during the call.
    """
    if not multiaddrs or not isinstance(multiaddrs, list):
        logger.error("Invalid multiaddr provided.")
        raise ValueError("Invalid multiaddr provided.")
    dest_peer_id = multiaddrs[0].split('/')[-1]
    logger.info(f"📞 Attempting to connect to: {dest_peer_id}...")
    try:
        result_ptr = P2P.libp2p.ConnectTo(
            P2P._type_interface.to_go_int(self._instance),
            P2P._type_interface.to_go_json(multiaddrs)
            )
        result = P2P._type_interface.from_go_ptr_to_json(result_ptr)

        if result is None:
            logger.error("Failed to connect to peer, received null result.")
            raise P2PError("Failed to connect to peer, received null result.")
        if result.get('state') == "Error":
            logger.error(f"Failed to connect to peer '{dest_peer_id}': {result.get('message', 'Unknown Go error')}")
            raise P2PError(f"Failed to connect to peer '{dest_peer_id}': "
                           f"{result.get('message', 'Unknown Go error')}")

        peer_info = result.get('message', {})
        logger.info(f"✅ Connection initiated to peer: {peer_info.get('ID', dest_peer_id)}")  # Use ID if available

        return peer_info

    except Exception as e:
        logger.error(f"❌ Connection to {dest_peer_id} failed: {e}")
        raise P2PError(f"Connection to {dest_peer_id} failed") from e

disconnect_from

disconnect_from(peer_id: str) -> None

Close all connections to a specific peer and remove its tracking state.

The Go DisconnectFrom function is called with the peer ID. A format warning is logged if the ID does not follow the standard libp2p v0 (Qm...) or v1 (12D3...) prefix conventions, but the call is still attempted.

Parameters:

Name Type Description Default
peer_id str

The Peer ID string of the peer to disconnect from.

required

Raises:

Type Description
ValueError

If peer_id is empty or not a string.

P2PError

If the Go DisconnectFrom call returns a null result, reports an error state, or if any other failure occurs during the call.
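
Example (illustrative): the format warning boils down to a prefix test. The sketch below reproduces that test in a standalone, hypothetical helper named `looks_like_peer_id`; it is a heuristic only and does not decode or validate the ID.

```python
def looks_like_peer_id(peer_id: str) -> bool:
    """Heuristic prefix check mirroring the warning condition in disconnect_from.

    Hypothetical helper: returns True when the string starts with one of the
    two prefixes the method looks for ("Qm" or "12D3").
    """
    # str.startswith accepts a tuple of candidate prefixes.
    return peer_id.startswith(("Qm", "12D3"))


for candidate in ("QmExamplePeer", "12D3KooWExample", "not-a-peer-id"):
    print(candidate, looks_like_peer_id(candidate))
```

A `False` result corresponds to the case where `disconnect_from` logs a warning and still attempts the call.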

Source code in unaiverse/networking/p2p/p2p.py
def disconnect_from(self, peer_id: str) -> None:
    """Close all connections to a specific peer and remove its tracking state.

    The Go ``DisconnectFrom`` function is called with the peer ID. A format
    warning is logged if the ID does not follow the standard libp2p v0
    (``Qm...``) or v1 (``12D3...``) prefix conventions, but the call is still
    attempted.

    Args:
        peer_id: The Peer ID string of the peer to disconnect from.

    Raises:
        ValueError: If ``peer_id`` is empty or not a string.
        P2PError: If the Go ``DisconnectFrom`` call returns a null result, reports
            an error state, or if any other failure occurs during the call.
    """
    if not peer_id or not isinstance(peer_id, str):
        logger.error("Invalid Peer ID provided.")
        raise ValueError("Invalid Peer ID provided.")

    # Basic peer ID format check (Qm... or 12D3...)
    if not (peer_id.startswith("Qm") or peer_id.startswith("12D3")):
        logger.warning(f"⚠️ Warning: Peer ID '{peer_id}' does not look like a standard v0 or v1 ID.")

    logger.info(f"🔌 Attempting to disconnect from peer: {peer_id}...")
    try:
        result_ptr = P2P.libp2p.DisconnectFrom(
            P2P._type_interface.to_go_int(self._instance),
            P2P._type_interface.to_go_string(peer_id)
            )
        result = P2P._type_interface.from_go_ptr_to_json(result_ptr)

        if result is None:
            logger.error("Failed to disconnect from peer, received null result.")
            raise P2PError("Failed to disconnect from peer, received null result.")

        if result.get('state') == "Error":
            logger.error(f"Failed to disconnect from peer '{peer_id}': {result.get('message', 'Unknown Go error')}")
            raise P2PError(f"Failed to disconnect from peer '{peer_id}': "
                           f"{result.get('message', 'Unknown Go error')}")

        logger.info(f"✅ Successfully disconnected from {peer_id}")

    except Exception as e:
        logger.error(f"❌ Disconnection from {peer_id} failed: {e}")
        raise P2PError(f"Disconnection from {peer_id} failed") from e

send_message_to_peer

send_message_to_peer(channel: str, msg_bytes: bytes) -> None

Send a direct (unicast) message to a specific peer over the given channel.

The channel string encodes both the transport mode and the target peer ID in the format <owner_peer_id>::dm:<target_peer_id>-<suffix>. The target peer ID is extracted from the channel string for logging. The raw byte payload and its length are forwarded to the Go SendMessageToPeer function.

Parameters:

Name Type Description Default
channel str

Channel identifier string in the direct-message format <owner>::dm:<peer_id>-<suffix>. Must be a non-empty string.

required
msg_bytes bytes

The serialised message payload as a bytes object.

required

Raises:

Type Description
ValueError

If channel is empty or not a string.

P2PError

If the Go SendMessageToPeer call returns a null result, reports an error state, or if any other failure occurs.
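
Example (illustrative): a small sketch of how the target peer ID can be recovered from a direct-message channel string. The helper name `target_peer_from_dm_channel` and the sample IDs are hypothetical; the parsing expression is the one used in `send_message_to_peer`.

```python
def target_peer_from_dm_channel(channel: str) -> str:
    """Extract <target_peer_id> from '<owner>::dm:<target_peer_id>-<suffix>'.

    Hypothetical helper. Raises IndexError when the '::dm:' infix is missing.
    """
    return channel.split("::dm:")[1].split("-")[0]


channel = "12D3Owner::dm:12D3Target-abc"
print(target_peer_from_dm_channel(channel))  # prints 12D3Target
```

In the source listing this extraction runs before the `try` block, so a channel string without the `::dm:` infix would appear to surface as an `IndexError` rather than a `P2PError`. Callers may want to validate the channel format beforehand.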

Source code in unaiverse/networking/p2p/p2p.py
def send_message_to_peer(self, channel: str, msg_bytes: bytes) -> None:
    """Send a direct (unicast) message to a specific peer over the given channel.

    The channel string encodes both the transport mode and the target peer ID in
    the format ``<owner_peer_id>::dm:<target_peer_id>-<suffix>``. The target peer
    ID is extracted from the channel string for logging. The raw byte payload and
    its length are forwarded to the Go ``SendMessageToPeer`` function.

    Args:
        channel: Channel identifier string in the direct-message format
            ``<owner>::dm:<peer_id>-<suffix>``. Must be a non-empty string.
        msg_bytes: The serialised message payload as a ``bytes`` object.

    Raises:
        ValueError: If ``channel`` is empty or not a string.
        P2PError: If the Go ``SendMessageToPeer`` call returns a null result,
            reports an error state, or if any other failure occurs.
    """
    if not channel or not isinstance(channel, str):
        logger.error("Invalid channel provided.")
        raise ValueError("Invalid channel provided.")

    # msg_bytes is already serialized; only its length is needed for the Go call.
    payload_len = len(msg_bytes)
    peer_id = channel.split("::dm:")[1].split('-')[0]  # Extract Peer ID from channel format

    # Call the Go function
    try:
        result_ptr = P2P.libp2p.SendMessageToPeer(
            P2P._type_interface.to_go_int(self._instance),
            P2P._type_interface.to_go_string(channel),
            P2P._type_interface.to_go_bytes(msg_bytes),  # Pass bytes directly
            P2P._type_interface.to_go_int(payload_len),
        )
        result = P2P._type_interface.from_go_ptr_to_json(result_ptr)

        if result is None:
            logger.error(f"Failed to send direct message to {peer_id}, received null result.")
            raise P2PError(f"Failed to send direct message to {peer_id}, received null result.")

        if result.get('state') == "Error":
            logger.error(f"Failed to send direct message to '{peer_id}': "
                         f"{result.get('message', 'Unknown Go error')}")
            raise P2PError(f"Failed to send direct message to '{peer_id}': "
                           f"{result.get('message', 'Unknown Go error')}")

        logger.info(f"✅ Successfully sent direct message to {peer_id[-5:]}.")

    except Exception as e:
        logger.error(f"❌ Sending direct message to {peer_id} failed: {e}")
        raise P2PError(f"Sending direct message to {peer_id} failed") from e

broadcast_message

broadcast_message(channel: str, msg_bytes: bytes) -> None

Broadcast a message via GossipSub/PubSub to all subscribers of a topic.

All peers that have called subscribe_to_topic with the same channel string will receive the message. The underlying Go call reuses SendMessageToPeer with the channel identifying a pubsub topic rather than a direct peer, distinguished by the ::ps: infix in the channel format.

Parameters:

Name Type Description Default
channel str

Channel identifier string in the pubsub format <owner_peer_id>::ps:<topic_name>. Must be a non-empty string.

required
msg_bytes bytes

The serialised message payload as a bytes object.

required

Raises:

Type Description
ValueError

If channel is empty or not a string.

P2PError

If the Go SendMessageToPeer call returns a null result, reports an error state, or if any other failure occurs.
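
Example (illustrative): since direct and broadcast sends share the same Go entry point, the channel string carries the distinction. The sketch below classifies a channel by its infix. The function `channel_kind` is hypothetical, and how the Go layer actually performs this check is not shown in this documentation.

```python
def channel_kind(channel: str) -> str:
    """Classify a channel string by its infix (hypothetical helper).

    Returns "pubsub" for '<owner>::ps:<topic>', "direct" for
    '<owner>::dm:<peer>-<suffix>', and "unknown" otherwise.
    """
    if "::ps:" in channel:
        return "pubsub"
    if "::dm:" in channel:
        return "direct"
    return "unknown"


print(channel_kind("12D3Owner::ps:news"))            # prints pubsub
print(channel_kind("12D3Owner::dm:12D3Target-abc"))  # prints direct
```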

Source code in unaiverse/networking/p2p/p2p.py
def broadcast_message(self, channel: str, msg_bytes: bytes) -> None:
    """Broadcast a message via GossipSub/PubSub to all subscribers of a topic.

    All peers that have called ``subscribe_to_topic`` with the same channel
    string will receive the message. The underlying Go call reuses
    ``SendMessageToPeer`` with the channel identifying a pubsub topic rather than
    a direct peer, distinguished by the ``::ps:`` infix in the channel format.

    Args:
        channel: Channel identifier string in the pubsub format
            ``<owner_peer_id>::ps:<topic_name>``. Must be a non-empty string.
        msg_bytes: The serialised message payload as a ``bytes`` object.

    Raises:
        ValueError: If ``channel`` is empty or not a string.
        P2PError: If the Go ``SendMessageToPeer`` call returns a null result,
            reports an error state, or if any other failure occurs.
    """
    if not channel or not isinstance(channel, str):
        raise ValueError("Invalid channel provided.")

    # msg_bytes is already serialized; only its length is needed for the Go call.
    payload_len = len(msg_bytes)

    # Reuse SendMessageToPeer; the "::ps:" infix in the channel marks this as a broadcast
    try:
        result_ptr = P2P.libp2p.SendMessageToPeer(
            P2P._type_interface.to_go_int(self._instance),
            P2P._type_interface.to_go_string(channel),
            P2P._type_interface.to_go_bytes(msg_bytes),
            P2P._type_interface.to_go_int(payload_len),
        )

        result = P2P._type_interface.from_go_ptr_to_json(result_ptr)

        if result is None:
            logger.error(f"Failed to broadcast message on channel {channel}, received null result.")
            raise P2PError(f"Failed to broadcast message on channel {channel}, received null result.")

        if result.get('state') == "Error":
            logger.error(f"Failed to broadcast message on channel '{channel}': "
                         f"{result.get('message', 'Unknown Go error')}")
            raise P2PError(f"Failed to broadcast message on channel '{channel}': "
                           f"{result.get('message', 'Unknown Go error')}")

    except Exception as e:
        logger.error(f"❌ Broadcasting to {channel} failed: {e}")
        raise P2PError(f"Broadcasting to {channel} failed") from e

    logger.info(f"✅ Successfully broadcasted message on channel {channel}.")

pop_messages

pop_messages() -> List[bytes]

Retrieve and remove the oldest pending message from each channel queue.

Calls the Go PopMessages function which dequeues one message per active channel for this instance. The result is deserialized from the Go pointer to a Python list of raw byte arrays, one entry per message.

The Go layer may return three distinct states:

- "Empty": no messages are waiting; an empty list is returned.
- "Error": a Go-side failure occurred; P2PError is raised.
- A list payload: one or more messages are available and returned.

Returns:

Type Description
List[bytes]

A list of bytes objects, one per received message. Returns an empty list when no messages are currently queued.

Raises:

Type Description
P2PError

If the Go PopMessages call returns a null result, an error state, an unexpected response format, or if any other failure occurs during deserialization.
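
Example (illustrative): the state handling described above can be sketched as a pure function over the already-decoded JSON value. `interpret_pop_result` is a hypothetical name, and `P2PError` is redefined locally as a stand-in so the snippet runs on its own. The element type of the returned list depends on the Go bridge and is left as `Any` here.

```python
from typing import Any, List


class P2PError(Exception):
    """Local stand-in for the module's P2PError, so the sketch is self-contained."""


def interpret_pop_result(raw: Any) -> List[Any]:
    """Map a decoded PopMessages response to a list of messages (sketch)."""
    if raw is None:
        raise P2PError("null result")
    if isinstance(raw, dict):
        state = raw.get("state")
        if state == "Empty":
            return []  # nothing queued
        if state == "Error":
            raise P2PError(raw.get("message", "Unknown Go error"))
        raise P2PError("unexpected dictionary response")
    if not isinstance(raw, list):
        raise P2PError("unexpected response format")
    return raw  # list payload: one entry per message


print(interpret_pop_result({"state": "Empty"}))  # prints []
```

Keeping the decision logic separate from the foreign-function call, as in this sketch, makes each of the three states easy to test without a running node.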

Source code in unaiverse/networking/p2p/p2p.py
def pop_messages(self) -> List[bytes]:
    """Retrieve and remove the oldest pending message from each channel queue.

    Calls the Go ``PopMessages`` function which dequeues one message per
    active channel for this instance. The result is deserialized from the Go
    pointer to a Python list of raw byte arrays, one entry per message.

    The Go layer may return three distinct states:
    - ``"Empty"``: no messages are waiting; an empty list is returned.
    - ``"Error"``: a Go-side failure occurred; ``P2PError`` is raised.
    - A list payload: one or more messages are available and returned.

    Returns:
        A list of ``bytes`` objects, one per received message. Returns an
        empty list when no messages are currently queued.

    Raises:
        P2PError: If the Go ``PopMessages`` call returns a null result, an error
            state, an unexpected response format, or if any other failure occurs
            during deserialization.
    """
    logger.debug(f"[Instance {self._instance}] Popping message(s)...")
    try:
        go_instance_c = P2P._type_interface.to_go_int(self._instance)

        result_ptr = P2P.libp2p.PopMessages(go_instance_c)

        # from_go_ptr_to_json should handle freeing result_ptr
        raw_result = P2P._type_interface.from_go_ptr_to_json(result_ptr)

        if raw_result is None:

            # This indicates an issue with the C call or JSON conversion in TypeInterface
            logger.error(f"[Instance {self._instance}] PopMessages: "
                         f"Received null/invalid result from TypeInterface.")
            raise P2PError(f"[Instance {self._instance}] PopMessages: Failed to get valid JSON response.")

        # Check for Go-side error or empty states first
        if isinstance(raw_result, dict):
            state = raw_result.get('state')
            if state == "Empty":
                logger.debug(f"[Instance {self._instance}] PopMessages: Queue is empty.")
                return []  # No messages available
            if state == "Error":
                error_message = raw_result.get('message', 'Unknown Go error during PopMessages')
                logger.error(f"[Instance {self._instance}] PopMessages: {error_message}")
                raise P2PError(f"[Instance {self._instance}] PopMessages: {error_message}")

            # If it's a dict but not a known state, it's unexpected
            logger.warning(f"[Instance {self._instance}] PopMessages: Unexpected dictionary format: {raw_result}")
            raise P2PError(f"[Instance {self._instance}] PopMessages: Unexpected dictionary response format.")

        # Expecting a list of messages if not an error/empty dict. An empty JSON
        # array ("[]") also decodes to a list and passes this check.
        if not isinstance(raw_result, list):
            # Anything that is not a list at this point is an unexpected format.
            logger.error(f"[Instance {self._instance}] PopMessages: Unexpected response format, expected a list or "
                         f"specific state dictionary. Got: {type(raw_result)}")
            raise P2PError(f"[Instance {self._instance}] PopMessages: Unexpected response format.")

        return raw_result

    except P2PError:  # Re-raise P2PError directly
        raise
    except Exception as e:

        # Catch potential JSON parsing errors from TypeInterface or other unexpected errors
        logger.error(f"[Instance {self._instance}] ❌ Error during pop_messages: {e}")
        raise P2PError(f"[Instance {self._instance}] Unexpected error during pop_messages: {e}") from e

subscribe_to_topic

subscribe_to_topic(channel: str) -> None

Subscribe to a GossipSub/PubSub topic so that future broadcasts are received.

Calls the Go SubscribeToTopic function with the channel string. After a successful subscription, messages published on the same channel by any peer (including via broadcast_message) will appear in subsequent pop_messages calls. Subscribing to a topic the node is already subscribed to is handled by the Go layer.

Parameters:

Name Type Description Default
channel str

Channel identifier string in the pubsub format <owner_peer_id>::ps:<topic_name>. Must be a non-empty string.

required

Raises:

Type Description
ValueError

If channel is empty or not a string.

P2PError

If the Go SubscribeToTopic call returns a null result, reports an error state, or if any other failure occurs.
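
Example (illustrative): a toy, in-memory model of the subscribe, broadcast, and pop flow. `FakePubSub` and all of its methods are hypothetical stand-ins for the Go layer and exist only to make the delivery semantics concrete; the real queueing behaviour (for instance, `pop_messages` returning one message per channel) may differ.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, List, Set


class FakePubSub:
    """In-memory stand-in for the Go pubsub layer (purely illustrative)."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, Set[str]] = defaultdict(set)
        self._queues: Dict[str, Deque[bytes]] = defaultdict(deque)

    def subscribe(self, node: str, channel: str) -> None:
        # Adding to a set is idempotent, so repeated subscriptions are harmless.
        self._subscribers[channel].add(node)

    def unsubscribe(self, node: str, channel: str) -> None:
        self._subscribers[channel].discard(node)

    def broadcast(self, channel: str, msg: bytes) -> None:
        # Deliver to every node currently subscribed to this channel.
        for node in self._subscribers[channel]:
            self._queues[node].append(msg)

    def pop_all(self, node: str) -> List[bytes]:
        # Simplification: drains the whole queue for the node.
        msgs = list(self._queues[node])
        self._queues[node].clear()
        return msgs


bus = FakePubSub()
bus.subscribe("node-a", "owner::ps:news")
bus.broadcast("owner::ps:news", b"hello")
print(bus.pop_all("node-a"))  # prints [b'hello']
```

Messages broadcast before a node subscribes, or after it unsubscribes, never reach its queue in this model.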

Source code in unaiverse/networking/p2p/p2p.py
def subscribe_to_topic(self, channel: str) -> None:
    """Subscribe to a GossipSub/PubSub topic so that future broadcasts are received.

    Calls the Go ``SubscribeToTopic`` function with the channel string. After a
    successful subscription, messages published on the same channel by any peer
    (including via ``broadcast_message``) will appear in subsequent ``pop_messages``
    calls. Subscribing to a topic the node is already subscribed to is handled by
    the Go layer.

    Args:
        channel: Channel identifier string in the pubsub format
            ``<owner_peer_id>::ps:<topic_name>``. Must be a non-empty string.

    Raises:
        ValueError: If ``channel`` is empty or not a string.
        P2PError: If the Go ``SubscribeToTopic`` call returns a null result,
            reports an error state, or if any other failure occurs.
    """
    if not channel or not isinstance(channel, str):
        logger.error("Invalid topic name provided.")
        raise ValueError("Invalid topic name provided.")
    logger.info(f"<sub> Subscribing to topic: {channel}...")
    try:
        result_ptr = P2P.libp2p.SubscribeToTopic(
            P2P._type_interface.to_go_int(self._instance),
            P2P._type_interface.to_go_string(channel)
            )
        result = P2P._type_interface.from_go_ptr_to_json(result_ptr)

        if result is None:
            logger.error("Failed to subscribe to topic, received null result.")
            raise P2PError("Failed to subscribe to topic, received null result.")
        if result.get('state') == "Error":
            logger.error(f"Failed to subscribe to topic '{channel}': {result.get('message', 'Unknown Go error')}")
            raise P2PError(f"Failed to subscribe to topic '{channel}': {result.get('message', 'Unknown Go error')}")

        logger.info(f"✅ Successfully subscribed to {channel}")

    except Exception as e:
        logger.error(f"❌ Subscription to {channel} failed: {e}")
        raise P2PError(f"Subscription to {channel} failed") from e

unsubscribe_from_topic

unsubscribe_from_topic(channel: str) -> None

Unsubscribe from a GossipSub/PubSub topic to stop receiving its messages.

Calls the Go UnsubscribeFromTopic function with the channel string. After a successful call, broadcasts on that channel are no longer delivered to this node's message queue. This is the inverse of subscribe_to_topic.

Parameters:

Name Type Description Default
channel str

Channel identifier string in the pubsub format <owner_peer_id>::ps:<topic_name>. Must be a non-empty string.

required

Raises:

Type Description
ValueError

If channel is empty or not a string.

P2PError

If the Go UnsubscribeFromTopic call returns a null result, reports an error state, or if any other failure occurs.

Source code in unaiverse/networking/p2p/p2p.py
def unsubscribe_from_topic(self, channel: str) -> None:
    """Unsubscribe from a GossipSub/PubSub topic to stop receiving its messages.

    Calls the Go ``UnsubscribeFromTopic`` function with the channel string. After
    a successful call, broadcasts on that channel are no longer delivered to this
    node's message queue. This is the inverse of ``subscribe_to_topic``.

    Args:
        channel: Channel identifier string in the pubsub format
            ``<owner_peer_id>::ps:<topic_name>``. Must be a non-empty string.

    Raises:
        ValueError: If ``channel`` is empty or not a string.
        P2PError: If the Go ``UnsubscribeFromTopic`` call returns a null result,
            reports an error state, or if any other failure occurs.
    """
    if not channel or not isinstance(channel, str):
        logger.error("Invalid topic name provided.")
        raise ValueError("Invalid topic name provided.")
    logger.info(f"</sub> Unsubscribing from topic: {channel}...")
    try:
        result_ptr = P2P.libp2p.UnsubscribeFromTopic(
            P2P._type_interface.to_go_int(self._instance),
            P2P._type_interface.to_go_string(channel)
            )
        result = P2P._type_interface.from_go_ptr_to_json(result_ptr)

        if result is None:
            logger.error("Failed to unsubscribe from topic, received null result.")
            raise P2PError("Failed to unsubscribe from topic, received null result.")
        if result.get('state') == "Error":
            logger.error(f"Failed to unsubscribe from topic '{channel}': "
                         f"{result.get('message', 'Unknown Go error')}")
            raise P2PError(f"Failed to unsubscribe from topic '{channel}': "
                           f"{result.get('message', 'Unknown Go error')}")

        logger.info(f"✅ Successfully unsubscribed from {channel}")

    except Exception as e:
        logger.error(f"❌ Unsubscription from {channel} failed: {e}")
        raise P2PError(f"Unsubscription from {channel} failed") from e

reserve_on_relay

reserve_on_relay(relay_peer_id: str) -> str

Reserve a circuit-relay v2 slot on the specified relay node.

Calls the Go ReserveOnRelay function with the target relay's peer ID. A successful reservation allows this node to be reached through the relay even when it is behind NAT. The reservation expires at the returned timestamp; callers are responsible for renewing it before expiry.

Parameters:

Name Type Description Default
relay_peer_id str

The peer ID string of the relay node to reserve a slot on. Must be a non-empty string.

required

Returns:

Type Description
str

The UTC expiration timestamp of the reservation as an ISO 8601 string (e.g. "2026-01-01T12:00:00Z").

Raises:

Type Description
ValueError

If relay_peer_id is empty or not a string.

P2PError

If the Go ReserveOnRelay call returns a null result, reports an error state, returns an unexpected type for the expiration value, or if any other failure occurs.
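
Example (illustrative): because renewal is left to the caller, one possible approach is to parse the returned timestamp and schedule the next reservation some margin before expiry. The helper names and the five-minute margin below are assumptions, and the sketch assumes the timestamp has the shape shown in the example above (no fractional seconds).

```python
from datetime import datetime, timedelta, timezone


def parse_expiration(expiration_utc: str) -> datetime:
    """Parse an ISO 8601 UTC timestamp such as '2026-01-01T12:00:00Z'."""
    # datetime.fromisoformat accepts a trailing "Z" only from Python 3.11 on,
    # so normalise it to an explicit UTC offset first.
    return datetime.fromisoformat(expiration_utc.replace("Z", "+00:00"))


def seconds_until_renewal(
    expiration_utc: str,
    now: datetime,
    margin: timedelta = timedelta(minutes=5),  # assumed safety margin
) -> float:
    """Seconds to wait before renewing, leaving `margin` before expiry (never negative)."""
    renew_at = parse_expiration(expiration_utc) - margin
    return max(0.0, (renew_at - now).total_seconds())


now = datetime(2026, 1, 1, 11, 0, 0, tzinfo=timezone.utc)
print(seconds_until_renewal("2026-01-01T12:00:00Z", now))  # prints 3300.0
```

A caller could sleep for the returned number of seconds and then call `reserve_on_relay` again; a result of `0.0` means the margin has already been reached.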

Source code in unaiverse/networking/p2p/p2p.py
def reserve_on_relay(self, relay_peer_id: str) -> str:
    """Reserve a circuit-relay v2 slot on the specified relay node.

    Calls the Go ``ReserveOnRelay`` function with the target relay's peer ID.
    A successful reservation allows this node to be reached through the relay
    even when it is behind NAT. The reservation expires at the returned
    timestamp; callers are responsible for renewing it before expiry.

    Args:
        relay_peer_id: The peer ID string of the relay node to reserve a slot on.
            Must be a non-empty string.

    Returns:
        The UTC expiration timestamp of the reservation as an ISO 8601 string
        (e.g. ``"2026-01-01T12:00:00Z"``).

    Raises:
        ValueError: If ``relay_peer_id`` is empty or not a string.
        P2PError: If the Go ``ReserveOnRelay`` call returns a null result,
            reports an error state, returns an unexpected type for the
            expiration value, or if any other failure occurs.
    """
    if not relay_peer_id or not isinstance(relay_peer_id, str):
        logger.error("Invalid relay peer ID provided.")
        raise ValueError("Invalid relay peer ID provided.")
    logger.info(f"🅿️ Attempting to reserve slot on relay: {relay_peer_id}...")
    try:
        result_ptr = P2P.libp2p.ReserveOnRelay(
            P2P._type_interface.to_go_int(self._instance),
            P2P._type_interface.to_go_string(relay_peer_id)
            )
        result = P2P._type_interface.from_go_ptr_to_json(result_ptr)

        if result is None:
            logger.error("Failed to reserve on relay, received null result.")
            raise P2PError("Failed to reserve on relay, received null result.")
        if result.get('state') == "Error":
            logger.error(f"Failed to reserve on relay '{relay_peer_id}': {result.get('message', 'Unknown Go error')}")
            raise P2PError(f"Failed to reserve on relay '{relay_peer_id}': {result.get('message', 'Unknown Go error')}")

        expiration_utc = result.get('message', {})
        if not isinstance(expiration_utc, str):
            raise P2PError(f"Expected expiration timestamp string, but got {type(expiration_utc)}")

        logger.info(f"✅ Reservation successful. Expires at: {expiration_utc}")
        return expiration_utc

    except Exception as e:
        logger.error(f"❌ Reservation on {relay_peer_id} failed: {e}")
        raise P2PError(f"Reservation on {relay_peer_id} failed") from e

get_node_addresses

get_node_addresses(peer_id: str = '') -> List[str]

Return the known multiaddresses for the local node or a specific remote peer.

Calls the Go GetNodeAddresses function. When peer_id is an empty string, the Go layer returns addresses for the local node; otherwise it returns addresses it has learned for the given remote peer. This method is also used internally by the addresses property.

Parameters:

Name Type Description Default
peer_id str

The peer ID string of the target peer. Pass an empty string (the default) to query the local node's own addresses.

''

Returns:

Type Description
List[str]

A list of multiaddress strings, each including the /p2p/<PeerID> suffix, as returned by the Go library.

Raises:

Type Description
P2PError

If the Go GetNodeAddresses call returns a null result, reports an error state, or if any other failure occurs.
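
Example (illustrative): since each returned address ends in `/p2p/<PeerID>`, the transport part and the peer ID can be separated with plain string handling. `split_p2p_suffix` is a hypothetical helper, not part of the module, and the sample addresses are made up.

```python
from typing import Optional, Tuple


def split_p2p_suffix(multiaddr: str) -> Tuple[str, Optional[str]]:
    """Split '<base>/p2p/<PeerID>' into (base, peer_id).

    Hypothetical helper. Splits on the last '/p2p/' so that relayed
    addresses keep their relay part in the base. Returns (multiaddr, None)
    when no '/p2p/' component is present.
    """
    base, sep, peer_id = multiaddr.rpartition("/p2p/")
    if not sep:
        return multiaddr, None
    return base, peer_id


print(split_p2p_suffix("/ip4/1.2.3.4/tcp/4001/p2p/12D3KooWExample"))
```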

Source code in unaiverse/networking/p2p/p2p.py
def get_node_addresses(self, peer_id: str = "") -> List[str]:
    """Return the known multiaddresses for the local node or a specific remote peer.

    Calls the Go ``GetNodeAddresses`` function. When ``peer_id`` is an empty
    string, the Go layer returns addresses for the local node; otherwise it
    returns addresses it has learned for the given remote peer. This method is
    also used internally by the ``addresses`` property.

    Args:
        peer_id: The peer ID string of the target peer. Pass an empty string
            (the default) to query the local node's own addresses.

    Returns:
        A list of multiaddress strings, each including the ``/p2p/<PeerID>``
        suffix, as returned by the Go library.

    Raises:
        P2PError: If the Go ``GetNodeAddresses`` call returns a null result,
            reports an error state, or if any other failure occurs.
    """
    target = "local node" if not peer_id else f"peer {peer_id}"
    logger.info(f"ℹ️ Fetching addresses for {target}...")
    try:
        result_ptr = P2P.libp2p.GetNodeAddresses(
            P2P._type_interface.to_go_int(self._instance),
            P2P._type_interface.to_go_string(peer_id)
            )
        result = P2P._type_interface.from_go_ptr_to_json(result_ptr)

        if result is None:
            logger.error("Failed to get node addresses, received null result.")
            raise P2PError("Failed to get node addresses, received null result.")
        if result.get('state') == "Error":
            logger.error(f"Failed to get addresses for '{target}': {result.get('message', 'Unknown Go error')}")
            raise P2PError(f"Failed to get addresses for '{target}': {result.get('message', 'Unknown Go error')}")

        addr_list = result.get('message', [])
        logger.info(f"✅ Found addresses for {target}: {addr_list}")
        return addr_list

    except Exception as e:
        logger.error(f"❌ Failed to get addresses for {target}: {e}")
        raise P2PError(f"Failed to get addresses for {target}") from e

get_webrtc_connections

get_webrtc_connections() -> List[Dict[str, Any]]

Return the list of peers with an active WebRTC DataChannel connection.

Calls the Go GetWebRTCConnections function. Only relevant when the node was started with webrtc_enabled=True. Each entry describes one peer that currently has an open WebRTC DataChannel with this node.

Returns:

Type Description
List[Dict[str, Any]]

A list of dictionaries, each containing at least "peer_id" (str) and "state" (str, one of "open" or "other") as returned by the Go library.

Raises:

Type Description
P2PError

If the Go GetWebRTCConnections call returns a null result, reports an error state, or if any other failure occurs.
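
Example (illustrative): a short sketch of how a caller might pick out the peers whose DataChannel is reported as open. The function name `open_webrtc_peers` and the sample data are hypothetical; only the `"peer_id"` and `"state"` keys come from the description above.

```python
from typing import Any, Dict, List


def open_webrtc_peers(connections: List[Dict[str, Any]]) -> List[str]:
    """Return the peer IDs of entries whose state is "open" (hypothetical helper)."""
    return [c["peer_id"] for c in connections if c.get("state") == "open"]


sample = [
    {"peer_id": "12D3PeerA", "state": "open"},
    {"peer_id": "12D3PeerB", "state": "other"},
]
print(open_webrtc_peers(sample))  # prints ['12D3PeerA']
```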

Source code in unaiverse/networking/p2p/p2p.py
def get_webrtc_connections(self) -> List[Dict[str, Any]]:
    """Return the list of peers with an active WebRTC DataChannel connection.

    Calls the Go ``GetWebRTCConnections`` function. Only relevant when the node
    was started with ``webrtc_enabled=True``. Each entry describes one peer that
    currently has an open WebRTC DataChannel with this node.

    Returns:
        A list of dictionaries, each containing at least ``"peer_id"`` (str)
        and ``"state"`` (str, one of ``"open"`` or ``"other"``) as returned by
        the Go library.

    Raises:
        P2PError: If the Go ``GetWebRTCConnections`` call returns a null result,
            reports an error state, or if any other failure occurs.
    """
    try:
        result_ptr = P2P.libp2p.GetWebRTCConnections(
            P2P._type_interface.to_go_int(self._instance),
        )
        result = P2P._type_interface.from_go_ptr_to_json(result_ptr)
        if result is None:
            raise P2PError("Null result from GetWebRTCConnections.")
        if result.get('state') == "Error":
            raise P2PError(f"GetWebRTCConnections failed: "
                           f"{result.get('message', 'Unknown Go error')}")
        return result.get('message', [])
    except Exception as e:
        logger.error(f"❌ GetWebRTCConnections failed: {e}")
        raise P2PError("GetWebRTCConnections failed") from e

get_connected_peers_info

get_connected_peers_info() -> List[Dict[str, Any]]

Return information about all peers currently connected to this node.

Calls the Go GetConnectedPeers function. Unlike most other methods, a failure here is caught and logged rather than re-raised, and an empty list is returned instead. This allows the method to be called safely from a polling loop without risk of crashing the caller.

Returns:

Type Description
List[Dict[str, Any]]

A list of dictionaries, each representing a connected peer. Common keys include "addr_info" (a dict with "ID" and "Addrs"), "connected_at" (timestamp string), "direction" ("inbound" or "outbound"), and "misc" (additional metadata). Returns an empty list on error or when no peers are connected.

Note

Errors are logged but not propagated. Callers that need to distinguish between an empty result and a failure should use get_message_queue_length or check the node state independently.
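
Example (illustrative): the log-and-return-empty behaviour can be pictured as a generic wrapper around any fetch function. `poll_safely` is a hypothetical name used only for this sketch; the method itself implements the pattern inline.

```python
import logging
from typing import Any, Callable, Dict, List

logger = logging.getLogger(__name__)


def poll_safely(fetch: Callable[[], List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Call `fetch`; on any exception, log it and return an empty list (sketch)."""
    try:
        return fetch()
    except Exception as exc:
        # Swallow the error so a polling loop keeps running.
        logger.error("Error fetching connected peers info: %s", exc)
        return []


def failing_fetch() -> List[Dict[str, Any]]:
    raise RuntimeError("simulated Go-side failure")


print(poll_safely(failing_fetch))  # prints []
```

The trade-off is visible in the sketch: the caller cannot tell a failure apart from a genuinely empty peer list without consulting the log.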

Source code in unaiverse/networking/p2p/p2p.py
def get_connected_peers_info(self) -> List[Dict[str, Any]]:
    """Return information about all peers currently connected to this node.

    Calls the Go ``GetConnectedPeers`` function. Unlike most other methods, a
    failure here is caught and logged rather than re-raised, and an empty list
    is returned instead. This allows the method to be called safely from a
    polling loop without risk of crashing the caller.

    Returns:
        A list of dictionaries, each representing a connected peer. Common keys
        include ``"addr_info"`` (a dict with ``"ID"`` and ``"Addrs"``),
        ``"connected_at"`` (timestamp string), ``"direction"`` (``"inbound"`` or
        ``"outbound"``), and ``"misc"`` (additional metadata). Returns an empty
        list on error or when no peers are connected.

    Note:
        Errors are logged but not propagated. Callers that need to distinguish
        between an empty result and a failure should use ``get_message_queue_length``
        or check the node state independently.
    """

    # logger.info("ℹ️ Fetching connected peers info...")  # Can be noisy
    try:

        # GetConnectedPeers takes only the instance handle as its argument
        result_ptr = P2P.libp2p.GetConnectedPeers(P2P._type_interface.to_go_int(self._instance))
        result = P2P._type_interface.from_go_ptr_to_json(result_ptr)

        if result is None:
            logger.error("Failed to get connected peers, received null result.")
            raise P2PError("Failed to get connected peers, received null result.")
        if result.get('state') == "Error":
            logger.error(f"Failed to get connected peers: {result.get('message', 'Unknown Go error')}")
            raise P2PError(f"Failed to get connected peers: {result.get('message', 'Unknown Go error')}")

        peers_list = result.get('message', [])

        # Update internal map (optional)
        # logger.info(f"  Connected peers count: {len(peers_list)}") # Can be noisy
        return peers_list

    except Exception as e:

        # Avoid crashing the polling thread, just log the error
        logger.error(f"❌ Error fetching connected peers info: {e}")

        # Optionally raise P2PError(f"Failed to get connected peers info") from e if called directly
        return []  # Return empty list on error during polling
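
As a rough illustration of consuming the list described above, the sketch below tallies peers by direction. The `summarize_peers` helper and the sample records are hypothetical; only the key names (`addr_info`, `ID`, `Addrs`, `direction`) are taken from the docstring.

```python
from collections import Counter
from typing import Any, Dict, List


def summarize_peers(peers: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Tally connected peers by direction (hypothetical helper, not part of P2P)."""
    directions = Counter(p.get("direction", "unknown") for p in peers)
    ids = [p.get("addr_info", {}).get("ID") for p in peers]
    return {"count": len(peers), "directions": dict(directions), "ids": ids}


# Sample records shaped like the documented return value (values are made up).
sample = [
    {"addr_info": {"ID": "peer-a", "Addrs": []}, "direction": "inbound"},
    {"addr_info": {"ID": "peer-b", "Addrs": []}, "direction": "outbound"},
    {"addr_info": {"ID": "peer-c", "Addrs": []}, "direction": "inbound"},
]
summary = summarize_peers(sample)
```

Because an empty list can mean either "no peers" or "the call failed", a summary with `count == 0` should not be read as proof that the node is healthy.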

get_rendezvous_peers_info

get_rendezvous_peers_info() -> Dict[str, Any] | List | None

Return the full rendezvous state including discovered peers and metadata.

Calls the Go GetRendezvousPeers function, which returns one of three states:

- "Empty": no rendezvous updates have been received yet; None is returned.
- "Error": a Go-side failure occurred; a P2PError is raised internally, then caught and logged, and an empty list is returned.
- "Success": a rendezvous update is available; the state dict is returned.

Like get_connected_peers_info, exceptions are caught, logged, and an empty list is returned to protect polling callers.

Returns:

Type Description
Dict[str, Any] | List | None

A dictionary with the rendezvous state containing "peers" (list of discovered peer info dicts), "update_count" (int), and "last_updated" (timestamp string) on success; None if no rendezvous data has arrived yet; or an empty list if an unexpected error occurs.

Note

Errors are logged but not propagated. The empty-list fallback on unexpected errors is intentional to avoid crashing polling threads.

Source code in unaiverse/networking/p2p/p2p.py
def get_rendezvous_peers_info(self) -> Dict[str, Any] | List | None:
    """Return the full rendezvous state including discovered peers and metadata.

    Calls the Go ``GetRendezvousPeers`` function which returns one of three
    states:
    - ``"Empty"``: no rendezvous updates have been received yet; ``None`` is
      returned.
    - ``"Error"``: a Go-side failure occurred; ``P2PError`` is raised internally,
      then caught and logged, and an empty list is returned.
    - ``"Success"``: a rendezvous update is available; the state dict is
      returned.

    Like ``get_connected_peers_info``, exceptions are caught, logged, and an
    empty list is returned to protect polling callers.

    Returns:
        A dictionary with the rendezvous state containing ``"peers"``
        (list of discovered peer info dicts), ``"update_count"`` (int), and
        ``"last_updated"`` (timestamp string) on success; ``None`` if no
        rendezvous data has arrived yet; or an empty list if an unexpected
        error occurs.

    Note:
        Errors are logged but not propagated. The empty-list fallback on
        unexpected errors is intentional to avoid crashing polling threads.
    """
    try:
        result_ptr = P2P.libp2p.GetRendezvousPeers(P2P._type_interface.to_go_int(self._instance))
        result = P2P._type_interface.from_go_ptr_to_json(result_ptr)

        if result is None:
            logger.error("Failed to get rendezvous peers, received null result.")
            raise P2PError("Failed to get rendezvous peers, received null result.")

        state = result.get('state')
        if state == "Empty":
            logger.debug(f"[Instance {self._instance}] GetRendezvousPeers: No rendezvous messages received yet.")
            return None  # Return None for the "empty" state
        elif state == "Error":
            error_msg = result.get('message', 'Unknown Go error')
            logger.error(f"Failed to get rendezvous peers: {error_msg}")
            raise P2PError(f"Failed to get rendezvous peers: {error_msg}")
        elif state == "Success":

            # The message payload is the full RendezvousState object
            rendezvous_state = result.get('message', {})
            return rendezvous_state
        else:
            logger.error(f"[Instance {self._instance}] GetRendezvousPeers: Received invalid state '{state}'.")
            raise P2PError(f"[Instance {self._instance}] GetRendezvousPeers: Received invalid state.")

    except Exception as e:

        # Avoid crashing the polling thread, just log the error
        logger.error(f"❌ Error fetching rendezvous peers info: {e}")

        # Optionally raise P2PError(f"Failed to get rendezvous peers info") from e if called directly
        return []  # Return empty list on error during polling
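
Since the method can hand back three differently shaped values, a caller may want to tell them apart explicitly. The following is a minimal sketch under that assumption; `classify_rendezvous_result` is a hypothetical helper, not part of the library.

```python
from typing import Any


def classify_rendezvous_result(result: Any) -> str:
    """Map the documented return shapes to a label (hypothetical helper)."""
    if result is None:
        return "empty"      # no rendezvous update received yet
    if isinstance(result, list):
        return "error"      # empty-list fallback used when the call failed
    if isinstance(result, dict):
        return "success"    # full rendezvous state
    return "unknown"
```

Checking for `None` before testing truthiness matters here: both `None` and `[]` are falsy, so a plain `if not result` would merge the "empty" and "error" cases.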

get_message_queue_length

get_message_queue_length() -> int

Return the number of messages currently waiting in the incoming queue.

Calls the Go MessageQueueLength function, which returns a C integer directly without the JSON envelope used by other calls. The result is converted via from_go_int. On failure the exception is caught, logged, and -1 is returned to signal an error without raising.

Returns:

Type Description
int

The count of queued messages as a non-negative integer, or -1 if the Go call fails.

Note

A return value of -1 indicates a failure rather than a queue length; callers should handle this case explicitly when the value is used for flow control.

Source code in unaiverse/networking/p2p/p2p.py
def get_message_queue_length(self) -> int:
    """Return the number of messages currently waiting in the incoming queue.

    Calls the Go ``MessageQueueLength`` function, which returns a C integer
    directly without the JSON envelope used by other calls. The result is
    converted via ``from_go_int``. On failure the exception is caught, logged,
    and ``-1`` is returned to signal an error without raising.

    Returns:
        The count of queued messages as a non-negative integer, or ``-1`` if
        the Go call fails.

    Note:
        A return value of ``-1`` indicates a failure rather than a queue length;
        callers should handle this case explicitly when the value is used for
        flow control.
    """
    try:

        # Call Go function, returns C.int directly
        length_cint = P2P.libp2p.MessageQueueLength(P2P._type_interface.to_go_int(self._instance))
        length = P2P._type_interface.from_go_int(length_cint)

        # print(f"  Current Message Queue Len: {length}") # Can be noisy
        return length
    except Exception as e:

        # Avoid crashing polling thread
        logger.error(f"❌ Error fetching message queue length: {e}")
        return -1  # Indicate error
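
The `-1` sentinel is easy to mishandle in comparisons such as `length < threshold`, which would treat a failure as an empty queue. A small sketch of explicit handling follows; `queue_status` and `high_water_mark` are hypothetical names chosen for illustration.

```python
def queue_status(length: int, high_water_mark: int = 100) -> str:
    """Interpret a queue length that may be the -1 error sentinel (hypothetical helper)."""
    if length < 0:
        return "error"      # the Go call failed; this is not a real length
    if length >= high_water_mark:
        return "backlog"    # queue is at or above the chosen threshold
    return "ok"
```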

close

close(close_all: bool = False) -> None | str

Gracefully shut down the libp2p node and release its instance slot.

Calls the Go CloseNode function. When close_all is True, a special instance index of -1 is passed to the Go layer to close every active node at once, and all slots in _instance_ids are reset to False. When close_all is False, only this instance is closed and its slot is freed.

After the Go call succeeds, _peer_id is set to None to signal that the node is no longer active.

Parameters:

Name Type Description Default
close_all bool

If True, shuts down every active node instance in the process and frees all instance slots. Defaults to False.

False

Returns:

Type Description
None | str

A human-readable confirmation string such as "Node closed successfully (instance 0)." or "Node closed successfully (all instances).".

Raises:

Type Description
P2PError

If the Go CloseNode call returns a null result, reports an error state, or if any other failure occurs during closure.

Source code in unaiverse/networking/p2p/p2p.py
def close(self, close_all: bool = False) -> None | str:
    """Gracefully shut down the libp2p node and release its instance slot.

    Calls the Go ``CloseNode`` function. When ``close_all`` is ``True``, a
    special instance index of ``-1`` is passed to the Go layer to close every
    active node at once, and all slots in ``_instance_ids`` are reset to
    ``False``. When ``close_all`` is ``False``, only this instance is closed
    and its slot is freed.

    After the Go call succeeds, ``_peer_id`` is set to ``None`` to signal that
    the node is no longer active.

    Args:
        close_all: If ``True``, shuts down every active node instance in the
            process and frees all instance slots. Defaults to ``False``.

    Returns:
        A human-readable confirmation string such as
        ``"Node closed successfully (instance 0)."`` or
        ``"Node closed successfully (all instances)."``.

    Raises:
        P2PError: If the Go ``CloseNode`` call returns a null result, reports
            an error state, or if any other failure occurs during closure.
    """
    logger.info("🛑 Closing node...")

    # 1. Signal background threads to stop
    logger.info("  - Stopping background threads...")

    # 2. Wait briefly for threads to finish (optional, they are daemons)
    # self._get_connected_peers_thread.join(timeout=2)
    # self._check_message_queue_thread.join(timeout=2)
    # print("  - Background threads signaled.")

    # 3. Call the Go CloseNode function
    try:
        if close_all:
            result_ptr = P2P.libp2p.CloseNode(P2P._type_interface.to_go_int(-1))
        else:
            result_ptr = P2P.libp2p.CloseNode(P2P._type_interface.to_go_int(self._instance))
        result = P2P._type_interface.from_go_ptr_to_json(result_ptr)

        if result is None:
            logger.error("Node closure failed: received null result.")
            raise P2PError("Node closure failed: received null result.")
        if result.get('state') == "Error":
            logger.error(f"Node closure failed: {result.get('message', 'Unknown Go error')}")
            raise P2PError(f"Node closure failed: {result.get('message', 'Unknown Go error')}")

        close_msg = (f"Node closed successfully "
                     f"({'all instances' if close_all else f'instance {str(self._instance)}'}).")
        logger.info(f"✅ {close_msg}")

    except Exception as e:
        logger.error(f"❌ Error closing node: {e}")
        raise P2PError(f"Error closing node: {e}") from e

    # 4. Clear internal state
    self._peer_id = None
    with P2P._instance_lock:
        if close_all:

            # Reset every slot while holding the instance lock
            P2P._instance_ids = [False] * P2P._MAX_INSTANCES
            logger.info("🐍 All instance slots have been marked as free.")
        else:
            if self._instance != -1:  # Ensure instance was set
                P2P._instance_ids[self._instance] = False
                logger.info(f"🐍 Instance slot {self._instance} has been marked as free.")

    logger.info("🐍 Python P2P object state cleared.")

    return close_msg
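
The slot bookkeeping described above (a list of booleans guarded by a lock, freed one at a time or all at once) can be sketched in isolation. `SlotPool` below is a toy stand-in written for illustration; it is not the real `P2P` class, and its method names are assumptions.

```python
import threading
from typing import List


class SlotPool:
    """Toy stand-in for the instance-slot list (not the real P2P class)."""

    def __init__(self, max_instances: int) -> None:
        self._lock = threading.Lock()
        self._slots: List[bool] = [False] * max_instances

    def acquire(self) -> int:
        """Claim the lowest free slot and return its index."""
        with self._lock:
            for index, used in enumerate(self._slots):
                if not used:
                    self._slots[index] = True
                    return index
        raise RuntimeError("no free instance slot")

    def release(self, index: int) -> None:
        """Free a single slot, as close() does for one instance."""
        with self._lock:
            self._slots[index] = False

    def release_all(self) -> None:
        """Free every slot, as close(close_all=True) does."""
        with self._lock:
            self._slots = [False] * len(self._slots)

    def in_use(self) -> int:
        with self._lock:
            return sum(self._slots)


pool = SlotPool(3)
first = pool.acquire()
second = pool.acquire()
pool.release(first)
```

Holding the lock for both the single-slot and the reset path keeps a concurrent `acquire` from observing a half-updated list.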

P2PError

Bases: Exception

Exception raised for errors originating in the Go libp2p layer.

Wraps low-level Go library failures (null results, Go-reported error states, failed connections, failed sends, etc.) so callers can catch a single exception type for all P2P-related problems without inspecting raw C pointers or JSON error payloads.
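
A minimal sketch of the "single exception type" idea, using a local stand-in class so the snippet runs on its own. `fetch_or_default` and `failing_operation` are hypothetical names; the error message is borrowed from the `close` listing.

```python
class P2PError(Exception):
    """Local stand-in mirroring the documented class (Bases: Exception)."""


def fetch_or_default(operation, default):
    """Run a P2P operation and fall back to a default on P2PError (hypothetical helper)."""
    try:
        return operation()
    except P2PError as exc:
        print(f"P2P operation failed: {exc}")
        return default


def failing_operation():
    raise P2PError("Node closure failed: received null result.")
```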

TypeInterface

TypeInterface(libp2p_instance: GoLibP2P)

Low-level bridge for converting between Python types and Go/C types via ctypes.

TypeInterface centralises all marshalling and unmarshalling needed when calling functions exported by a Go shared library through ctypes. Every method either converts a Python value into a C-compatible representation ready to pass into a Go function, or converts a value returned by Go back into a native Python type.

A particular concern is memory safety: Go functions that return heap-allocated C strings transfer ownership of that memory to the caller. from_go_ptr_to_json therefore calls FreeString on the underlying GoLibP2P instance to release the memory, and maintains an internal freed-pointer registry protected by a threading.Lock to guard against accidental double-free errors.

Attributes:

Name Type Description
libp2p GoLibP2P

The GoLibP2P shared-library wrapper used to call Go-exported functions, including FreeString for releasing C memory.

Initialize the type bridge with a loaded Go shared-library instance.

An internal freed-pointer registry (a set guarded by a threading.Lock) is created to detect and prevent double-free errors across concurrent calls to from_go_ptr_to_json.

Parameters:

Name Type Description Default
libp2p_instance GoLibP2P

A fully loaded GoLibP2P instance whose exported functions (including FreeString) will be used for all marshalling operations.

required
Source code in unaiverse/networking/p2p/lib_types.py
def __init__(self, libp2p_instance: GoLibP2P):
    """Initialize the type bridge with a loaded Go shared-library instance.

    An internal freed-pointer registry (a ``set`` guarded by a ``threading.Lock``)
    is created to detect and prevent double-free errors across concurrent calls
    to ``from_go_ptr_to_json``.

    Args:
        libp2p_instance: A fully loaded ``GoLibP2P`` instance whose exported
            functions (including ``FreeString``) will be used for all
            marshalling operations.
    """
    self.__freed_pointers: set[int] = set()  # Track freed pointers to prevent double-free errors
    self.__freed_pointers_lock: Any = Lock()  # A threading lock
    self.libp2p: GoLibP2P = libp2p_instance  # Store the shared library object instance

libp2p instance-attribute

libp2p: GoLibP2P = libp2p_instance

to_go_string

to_go_string(s: str) -> bytes

Convert a Python string to a UTF-8 encoded bytes object for Go/C interop.

The returned bytes value is suitable for direct use with ctypes when passing to a C function that expects a char* (ctypes.c_char_p): ctypes automatically passes a pointer to the byte string's internal data buffer.

If s is None it is treated as an empty string, so callers do not need to guard against None before invoking this method.

Parameters:

Name Type Description Default
s str

The Python string to encode. None is treated as an empty string.

required

Returns:

Type Description
bytes

A bytes object containing the UTF-8 encoded representation of s.

Source code in unaiverse/networking/p2p/lib_types.py
def to_go_string(self, s: str) -> bytes:
    """Convert a Python string to a UTF-8 encoded ``bytes`` object for Go/C interop.

    The returned ``bytes`` value is suitable for direct use with ctypes when passing
    to a C function that expects a ``char*`` (``ctypes.c_char_p``): ctypes
    automatically passes a pointer to the byte string's internal data buffer.

    If ``s`` is ``None`` it is treated as an empty string, so callers do not need
    to guard against ``None`` before invoking this method.

    Args:
        s: The Python string to encode. ``None`` is treated as an empty string.

    Returns:
        A ``bytes`` object containing the UTF-8 encoded representation of ``s``.
    """
    if s is None:
        s = ""
    return s.encode("utf-8")

from_go_string

from_go_string(cstr: bytes) -> str

Convert a null-terminated C string returned by Go into a Python str.

The argument is expected to be the raw value obtained from a ctypes.c_char_p result type, which ctypes exposes as a bytes object (or None for a NULL pointer). A falsy value (None or empty bytes) is mapped to an empty string.

Parameters:

Name Type Description Default
cstr bytes

The raw bytes value from a ctypes.c_char_p result, or None for a NULL pointer.

required

Returns:

Type Description
str

The UTF-8 decoded Python string, or an empty string when cstr is falsy.

Source code in unaiverse/networking/p2p/lib_types.py
def from_go_string(self, cstr: bytes) -> str:
    """Convert a null-terminated C string returned by Go into a Python ``str``.

    The argument is expected to be the raw value obtained from a ``ctypes.c_char_p``
    result type, which ctypes exposes as a ``bytes`` object (or ``None`` for a NULL
    pointer). A falsy value (``None`` or empty bytes) is mapped to an empty string.

    Args:
        cstr: The raw bytes value from a ``ctypes.c_char_p`` result, or ``None``
            for a NULL pointer.

    Returns:
        The UTF-8 decoded Python string, or an empty string when ``cstr`` is falsy.
    """
    if not cstr:
        return ""
    return cstr.decode("utf-8")
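
The two string converters form a simple round trip. The sketch below restates them as free functions (an adaptation for a self-contained example; the originals are methods on `TypeInterface`) and shows the `None` handling on both sides.

```python
from typing import Optional


def to_go_string(s: Optional[str]) -> bytes:
    """Encode a Python string as UTF-8; None becomes an empty byte string."""
    if s is None:
        s = ""
    return s.encode("utf-8")


def from_go_string(cstr: Optional[bytes]) -> str:
    """Decode UTF-8 bytes; None or empty bytes become an empty string."""
    if not cstr:
        return ""
    return cstr.decode("utf-8")


encoded = to_go_string("héllo")
decoded = from_go_string(encoded)
```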

to_go_int

to_go_int(i: int) -> c_int

Convert a Python integer to a Go-compatible ctypes.c_int.

Parameters:

Name Type Description Default
i int

The Python integer to convert.

required

Returns:

Type Description
c_int

A ctypes.c_int wrapping i, suitable for passing to a Go-exported function that expects a C int.

Source code in unaiverse/networking/p2p/lib_types.py
def to_go_int(self, i: int) -> ctypes.c_int:
    """Convert a Python integer to a Go-compatible ``ctypes.c_int``.

    Args:
        i: The Python integer to convert.

    Returns:
        A ``ctypes.c_int`` wrapping ``i``, suitable for passing to a Go-exported
        function that expects a C ``int``.
    """
    return ctypes.c_int(i)

from_go_int

from_go_int(val: c_int) -> int

Convert a ctypes.c_int returned by Go into a Python int.

Parameters:

Name Type Description Default
val c_int

The ctypes.c_int value returned from a Go-exported function.

required

Returns:

Type Description
int

The corresponding Python int.

Source code in unaiverse/networking/p2p/lib_types.py
def from_go_int(self, val: ctypes.c_int) -> int:
    """Convert a ``ctypes.c_int`` returned by Go into a Python ``int``.

    Args:
        val: The ``ctypes.c_int`` value returned from a Go-exported function.

    Returns:
        The corresponding Python ``int``.
    """
    return int(val)

to_go_float

to_go_float(f: float) -> c_float

Convert a Python float to a Go-compatible ctypes.c_float.

Parameters:

Name Type Description Default
f float

The Python float to convert.

required

Returns:

Type Description
c_float

A ctypes.c_float wrapping f, suitable for passing to a Go-exported function that expects a C float.

Source code in unaiverse/networking/p2p/lib_types.py
def to_go_float(self, f: float) -> ctypes.c_float:
    """Convert a Python float to a Go-compatible ``ctypes.c_float``.

    Args:
        f: The Python float to convert.

    Returns:
        A ``ctypes.c_float`` wrapping ``f``, suitable for passing to a Go-exported
        function that expects a C ``float``.
    """
    return ctypes.c_float(f)

from_go_float

from_go_float(val: c_float) -> float

Convert a ctypes.c_float returned by Go into a Python float.

Parameters:

Name Type Description Default
val c_float

The ctypes.c_float value returned from a Go-exported function.

required

Returns:

Type Description
float

The corresponding Python float.

Source code in unaiverse/networking/p2p/lib_types.py
def from_go_float(self, val: ctypes.c_float) -> float:
    """Convert a ``ctypes.c_float`` returned by Go into a Python ``float``.

    Args:
        val: The ``ctypes.c_float`` value returned from a Go-exported function.

    Returns:
        The corresponding Python ``float``.
    """
    return float(val)

to_go_bool

to_go_bool(b: bool) -> c_int

Convert a Python boolean to a Go-compatible integer (1 for True, 0 for False).

Go does not export a dedicated bool type across the C boundary; booleans are conventionally represented as int values where non-zero means true.

Parameters:

Name Type Description Default
b bool

The Python boolean to convert.

required

Returns:

Type Description
c_int

A ctypes.c_int with value 1 when b is True, or 0 when b is False.

Source code in unaiverse/networking/p2p/lib_types.py
def to_go_bool(self, b: bool) -> ctypes.c_int:
    """Convert a Python boolean to a Go-compatible integer (``1`` for True, ``0`` for False).

    Go does not export a dedicated bool type across the C boundary; booleans are
    conventionally represented as ``int`` values where non-zero means true.

    Args:
        b: The Python boolean to convert.

    Returns:
        A ``ctypes.c_int`` with value ``1`` when ``b`` is ``True``, or ``0``
        when ``b`` is ``False``.
    """
    return ctypes.c_int(1 if b else 0)

from_go_bool

from_go_bool(val: c_int) -> bool

Convert a Go-compatible integer (ctypes.c_int) to a Python boolean.

The convention is that a value of exactly 1 means True; any other value (including negative integers) is treated as False.

Parameters:

Name Type Description Default
val c_int

The ctypes.c_int value returned from a Go-exported function.

required

Returns:

Type Description
bool

True if val equals 1, False otherwise.

Source code in unaiverse/networking/p2p/lib_types.py
def from_go_bool(self, val: ctypes.c_int) -> bool:
    """Convert a Go-compatible integer (``ctypes.c_int``) to a Python boolean.

    The convention is that a value of exactly ``1`` means ``True``; any other
    value (including negative integers) is treated as ``False``.

    Args:
        val: The ``ctypes.c_int`` value returned from a Go-exported function.

    Returns:
        ``True`` if ``val`` equals ``1``, ``False`` otherwise.
    """
    return val == 1
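
One detail worth keeping in mind when reading the integer and boolean converters: a `ctypes.c_int` object does not compare equal to a Python `int`; its payload lives in `.value`. When a foreign function's `restype` is `ctypes.c_int`, ctypes hands back a plain Python `int`, which is presumably what these converters receive in practice (an assumption, since the `restype` declarations are not shown here). The `go_bool_to_python` helper below is hypothetical and accepts both forms.

```python
import ctypes

wrapped = ctypes.c_int(1)

# The wrapper object itself is not equal to the Python integer 1 ...
wrapper_equals_one = (wrapped == 1)

# ... but its payload is, via the .value attribute.
payload_equals_one = (wrapped.value == 1)


def go_bool_to_python(val) -> bool:
    """Hypothetical variant accepting either a plain int or a ctypes.c_int."""
    raw = val.value if isinstance(val, ctypes.c_int) else val
    return raw == 1
```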

to_go_bytes

to_go_bytes(b: bytes) -> c_char_p

Convert a Python bytes object to a Go-compatible ctypes.c_char_p.

A fixed-size C buffer is allocated via ctypes.create_string_buffer and then cast to ctypes.c_char_p so that Go receives a stable pointer to the byte data. If b is None, an empty byte string is used instead.

Parameters:

Name Type Description Default
b bytes

The Python bytes to convert. None is treated as empty bytes.

required

Returns:

Type Description
c_char_p

A ctypes.c_char_p pointing to the contents of b.

Source code in unaiverse/networking/p2p/lib_types.py
def to_go_bytes(self, b: bytes) -> ctypes.c_char_p:
    """Convert a Python ``bytes`` object to a Go-compatible ``ctypes.c_char_p``.

    A fixed-size C buffer is allocated via ``ctypes.create_string_buffer`` and
    then cast to ``ctypes.c_char_p`` so that Go receives a stable pointer to the
    byte data. If ``b`` is ``None``, an empty byte string is used instead.

    Args:
        b: The Python ``bytes`` to convert. ``None`` is treated as empty bytes.

    Returns:
        A ``ctypes.c_char_p`` pointing to the contents of ``b``.
    """
    if b is None:
        b = b""
    buf = ctypes.create_string_buffer(b, len(b))
    return ctypes.cast(buf, ctypes.c_char_p)

from_go_bytes

from_go_bytes(cptr: c_char_p, length: int) -> bytes

Convert a raw C byte buffer returned by Go into a Python bytes object.

ctypes.string_at is used to copy exactly length bytes starting at the address given by cptr. If either cptr is a NULL pointer or length is not positive, an empty bytes object is returned immediately without attempting a memory read.

Parameters:

Name Type Description Default
cptr c_char_p

The ctypes.c_char_p (or integer address) pointing to the byte buffer allocated by Go.

required
length int

The number of bytes to read from cptr.

required

Returns:

Type Description
bytes

A bytes object containing the copied data, or an empty bytes when cptr is falsy or length is not positive.

Source code in unaiverse/networking/p2p/lib_types.py
def from_go_bytes(self, cptr: ctypes.c_char_p, length: int) -> bytes:
    """Convert a raw C byte buffer returned by Go into a Python ``bytes`` object.

    ``ctypes.string_at`` is used to copy exactly ``length`` bytes starting at
    the address given by ``cptr``. If either ``cptr`` is a NULL pointer or
    ``length`` is not positive, an empty ``bytes`` object is returned immediately
    without attempting a memory read.

    Args:
        cptr: The ``ctypes.c_char_p`` (or integer address) pointing to the byte
            buffer allocated by Go.
        length: The number of bytes to read from ``cptr``.

    Returns:
        A ``bytes`` object containing the copied data, or an empty ``bytes``
        when ``cptr`` is falsy or ``length`` is not positive.
    """
    if not cptr or length <= 0:
        return bytes()
    return ctypes.string_at(cptr, length)
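
The explicit `length` argument matters for binary payloads, because a NUL byte would otherwise end the read early. The sketch below demonstrates this with a Python-owned buffer standing in for memory that Go would normally provide.

```python
import ctypes

payload = b"ab\x00cd"  # binary data with an embedded NUL byte

# Fixed-size buffer holding exactly len(payload) bytes (no extra terminator).
buf = ctypes.create_string_buffer(payload, len(payload))

# With an explicit length, every byte is copied, including the NUL.
full_copy = ctypes.string_at(buf, len(payload))

# Reading the buffer as a C string stops at the first NUL byte.
truncated = buf.value
```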

from_go_ptr_to_json

from_go_ptr_to_json(c_void_ptr_val: int) -> Any

Convert a Go-allocated C string containing JSON into a Python object, then free the memory.

The full lifecycle is: validate the pointer is not NULL and has not already been freed; cast the integer address to ctypes.c_char_p and read the null-terminated byte string; decode it as UTF-8; parse the resulting string as JSON; and - in a finally block that executes even on error - call libp2p.FreeString to release the C heap memory allocated by Go.

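A freed-pointer registry (protected by an internal threading.Lock) is intended to guard against double-free errors: if a pointer address already in the registry is passed again, the method raises instead of calling FreeString a second time. In the source listed here, the statement that records freed addresses is commented out, so the registry stays empty and this check does not fire.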

Parameters:

Name Type Description Default
c_void_ptr_val int

The integer address of the C string allocated by Go. A value of 0 is interpreted as a NULL pointer and raises immediately.

required

Returns:

Type Description
Any

The Python object produced by parsing the JSON string (typically a dict or a list).

Raises:

Type Description
Exception

If c_void_ptr_val is 0 (NULL pointer).

Exception

If the pointer was already freed (detected via the internal registry), indicating a double-free logic error.

Exception

If reading or UTF-8 decoding the C string fails.

Exception

If the string is not valid JSON (json.JSONDecodeError is re-raised as a plain Exception).

Note

FreeString is always called in the finally block regardless of whether JSON parsing succeeds or fails, to avoid memory leaks. If FreeString itself raises, the error is logged at CRITICAL level but is not re-raised so that the original exception (if any) propagates unchanged.

Source code in unaiverse/networking/p2p/lib_types.py
def from_go_ptr_to_json(self, c_void_ptr_val: int) -> Any:
    """Convert a Go-allocated C string containing JSON into a Python object, then free the memory.

    The full lifecycle is: validate the pointer is not NULL and has not already
    been freed; cast the integer address to ``ctypes.c_char_p`` and read the
    null-terminated byte string; decode it as UTF-8; parse the resulting string
    as JSON; and - in a ``finally`` block that executes even on error - call
    ``libp2p.FreeString`` to release the C heap memory allocated by Go.

    A freed-pointer registry (protected by an internal ``threading.Lock``) is
    intended to guard against double-free errors: if a pointer address already
    in the registry is passed again, the method raises instead of calling
    ``FreeString`` a second time. The statement that records freed addresses
    is currently commented out, so the registry stays empty in this listing.

    Args:
        c_void_ptr_val: The integer address of the C string allocated by Go.
            A value of ``0`` is interpreted as a NULL pointer and raises
            immediately.

    Returns:
        The Python object produced by parsing the JSON string (typically a
        ``dict`` or a ``list``).

    Raises:
        Exception: If ``c_void_ptr_val`` is ``0`` (NULL pointer).
        Exception: If the pointer was already freed (detected via the internal
            registry), indicating a double-free logic error.
        Exception: If reading or UTF-8 decoding the C string fails.
        Exception: If the string is not valid JSON (``json.JSONDecodeError``
            is re-raised as a plain ``Exception``).

    Note:
        ``FreeString`` is always called in the ``finally`` block regardless of
        whether JSON parsing succeeds or fails, to avoid memory leaks. If
        ``FreeString`` itself raises, the error is logged at ``CRITICAL`` level
        but is not re-raised so that the original exception (if any) propagates
        unchanged.
    """

    if not c_void_ptr_val:  # Check if the address is NULL (0)
        logger.error("Received a NULL pointer from Go function")
        raise Exception("Received a NULL pointer from Go function")

    try:

        # --- Double-Free Check (Before Reading/Casting) ---
        # Use a context manager so the lock is released even when we raise;
        # otherwise the finally block below would deadlock re-acquiring it.
        with self.__freed_pointers_lock:
            if c_void_ptr_val in self.__freed_pointers:

                # This indicates a serious logic error elsewhere - the pointer
                # was already freed but somehow passed here again.
                logger.warning(f"🔥🔥🔥 ATTEMPT TO PROCESS ALREADY FREED POINTER {hex(c_void_ptr_val)}! 🔥🔥🔥")

                # Raising an error is safer than trying to read potentially invalid memory.
                logger.error(f"Attempt to process pointer {hex(c_void_ptr_val)} which was already freed")
                raise Exception(f"Attempt to process pointer {hex(c_void_ptr_val)} which was already freed")

        # --- Cast void* to c_char_p and Read String ---
        try:

            # Perform the cast only when needed for reading
            c_char_ptr_for_read = ctypes.cast(c_void_ptr_val, ctypes.c_char_p)
            raw_bytes = ctypes.string_at(c_char_ptr_for_read)
            json_string = raw_bytes.decode('utf-8')

            # logger.debug(f"Read string (len={len(json_string)})
            # from pointer {hex(c_void_ptr_val)}: %.100s...", json_string)
        except (ctypes.ArgumentError, ValueError, UnicodeDecodeError) as read_err:
            logger.error(f"Failed to read/decode string from pointer {hex(c_void_ptr_val)}: "
                         f"{read_err}", exc_info=False)

            # Even if reading fails, the pointer itself *might* still be valid C memory
            # that Go expects us to free. We will proceed to free it in finally.
            raise Exception(f"Failed to read string from pointer {hex(c_void_ptr_val)}: {read_err}") from read_err
        except Exception as unexpected_read_err:  # Catch other potential ctypes issues
            logger.error(f"Unexpected error reading C string from pointer {hex(c_void_ptr_val)}: "
                         f"{unexpected_read_err}", exc_info=True)
            raise Exception(f"Unexpected error reading C string from pointer {hex(c_void_ptr_val)}: "
                            f"{unexpected_read_err}") from unexpected_read_err

        # --- Check for Empty String ---

        # --- Parse JSON ---
        try:

            # Now that we have the string, parse it
            logger.debug(f"Parsing JSON from string: {json_string}")
            parsed_data = json.loads(json_string)
            logger.debug(f"Parsed JSON data: {parsed_data}")

            # logger.debug(f"Successfully parsed JSON from pointer {hex(c_void_ptr_val)}")
            return parsed_data  # Return the parsed Python object

        except json.JSONDecodeError as json_err:
            logger.error(f"Failed to decode JSON from pointer {hex(c_void_ptr_val)}: {json_err}", exc_info=False)

            # Again, the pointer is likely valid C memory, but the content is bad.
            # Let the finally block handle freeing.
            raise Exception(f"Failed to decode JSON from pointer {hex(c_void_ptr_val)}: {json_err}") from json_err

    finally:

        # --- CRITICAL: Free C Memory ---
        # This block executes even if errors occurred during read/parse,
        # ensuring we attempt to free any non-NULL pointer received from Go.
        with self.__freed_pointers_lock:
            if c_void_ptr_val:
                logger.info(f"🐍 FINALLY: Freeing pointer {hex(c_void_ptr_val)}...")
                if c_void_ptr_val in self.__freed_pointers:

                    # This check is technically redundant if the initial check worked,
                    # but provides an extra safety layer in case of concurrency issues
                    # (if freed_pointers is shared without locks - which it shouldn't be).
                    logger.warning(f"πŸ”₯πŸ”₯πŸ”₯ DOUBLE FREE DETECTED in finally block for "
                                   f"{hex(c_void_ptr_val)}! Skipping FreeString call again. πŸ”₯πŸ”₯πŸ”₯")
                else:

                    # Release the C string. Recording the pointer as freed is
                    # currently disabled (see the TODO below).
                    try:
                        self.libp2p.FreeString(c_void_ptr_val)  # Pass the original void* value
                        # self.__freed_pointers.add(c_void_ptr_val)  # TODO do we still need this?
                        logger.info(f"✅ FINALLY: FreeString successful for {hex(c_void_ptr_val)}.")
                    except Exception as free_err:

                        # Log if FreeString fails, but don't raise from finally
                        # as it might hide the original error.
                        logger.critical(f"🚨 FAILED TO FREE C MEMORY for pointer "
                                        f"{hex(c_void_ptr_val)} via FreeString: {free_err}", exc_info=True)
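
The cleanup pattern in the finally block above can be illustrated in isolation. What follows is a minimal sketch, not the library's code: read_json_and_free and free_fn are hypothetical names, free_fn stands in for the Go-exported FreeString, and the buffers are allocated by Python through ctypes.create_string_buffer, so the "free" callback here only records the addresses it receives.

```python
import ctypes
import json
from typing import Any, Callable, List


def read_json_and_free(ptr: int, free_fn: Callable[[int], None]) -> Any:
    """Read a NUL-terminated UTF-8 JSON string at ``ptr``, then always free it."""
    if not ptr:
        return None  # NULL pointer: nothing to read, nothing to free
    try:
        raw = ctypes.string_at(ptr)  # copies the bytes up to the first NUL
        return json.loads(raw.decode("utf-8"))
    finally:
        # Runs on the success path and on the error path alike.
        free_fn(ptr)


# Demo: Python owns these buffers, so "freeing" just records the address.
freed: List[int] = []
good = ctypes.create_string_buffer(b'{"peers": 3}')
bad = ctypes.create_string_buffer(b'{not json')

ok_result = read_json_and_free(ctypes.addressof(good), freed.append)

try:
    read_json_and_free(ctypes.addressof(bad), freed.append)
    parse_failed = False
except json.JSONDecodeError:
    parse_failed = True
```

Both calls reach free_fn, including the one that raises json.JSONDecodeError, which is the property the finally block is meant to guarantee.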

to_go_json

to_go_json(data: Any) -> bytes

Encode a Python object to a JSON string and return it as UTF-8 bytes for Go/C interop.

The object is serialised with json.dumps and the resulting string is then passed through to_go_string to produce a UTF-8 encoded bytes value. The returned value is suitable for direct use with ctypes when passing to a C function that expects a char* (ctypes.c_char_p).

Parameters:

Name Type Description Default
data Any The Python object to serialise (for example a dict or list). required

Returns:

Type Description
bytes A bytes object containing the UTF-8 encoded JSON representation of data, ready to be passed to a Go-exported C function.

Source code in unaiverse/networking/p2p/lib_types.py
def to_go_json(self, data: Any) -> bytes:
    """Encode a Python object to a JSON string and return it as UTF-8 ``bytes`` for Go/C interop.

    The object is serialised with ``json.dumps`` and the resulting string is then
    passed through ``to_go_string`` to produce a UTF-8 encoded ``bytes`` value.
    The returned value is suitable for direct use with ctypes when passing to a C
    function that expects a ``char*`` (``ctypes.c_char_p``).

    Args:
        data: The Python object to serialise (for example a ``dict`` or ``list``).

    Returns:
        A ``bytes`` object containing the UTF-8 encoded JSON representation of
        ``data``, ready to be passed to a Go-exported C function.
    """
    json_str = json.dumps(data)
    return self.to_go_string(json_str)
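
As a rough illustration of the encoding path, the sketch below reproduces the two steps as free functions. It rests on an assumption: to_go_string is not shown in this section, so the stand-in here simply UTF-8 encodes the text, which is what the docstring describes.

```python
import ctypes
import json
from typing import Any


def to_go_string(s: str) -> bytes:
    # Assumption: stand-in for the library's to_go_string (plain UTF-8 encoding).
    return s.encode("utf-8")


def to_go_json(data: Any) -> bytes:
    # Serialise to JSON text first, then encode that text for the C boundary.
    return to_go_string(json.dumps(data))


payload = to_go_json({"peer": "alice", "ports": [4001, 4002]})

# A C function declared with a char* parameter would receive this value.
c_arg = ctypes.c_char_p(payload)

# Reading the bytes back and parsing them recovers the original object.
round_trip = json.loads(c_arg.value.decode("utf-8"))
```

With the json.dumps defaults, non-ASCII characters are written as \uXXXX escapes, so the resulting payload is plain ASCII. Objects that json.dumps cannot serialise raise TypeError before anything reaches the C side.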

from_go_string_to_list

from_go_string_to_list(cstr: c_char_p) -> List[Any]

Decode a JSON-encoded list from a Go C string into a Python list.

The C string is first converted to a Python str via from_go_string, and the resulting JSON text is then parsed with json.loads. The Go-exported function is expected to produce a JSON array. Invalid JSON raises json.JSONDecodeError, but the decoded value is not type-checked: valid JSON of another type (for example an object) is returned unchanged rather than rejected.

Parameters:

Name Type Description Default
cstr c_char_p The raw ctypes.c_char_p value (or equivalent bytes) returned by a Go-exported function, containing a UTF-8 encoded JSON array. required

Returns:

Type Description
List[Any] A Python list parsed from the JSON content of cstr.

Raises:

Type Description
JSONDecodeError If cstr does not contain valid JSON.
UnicodeDecodeError If the byte sequence in cstr is not valid UTF-8.

Source code in unaiverse/networking/p2p/lib_types.py
def from_go_string_to_list(self, cstr: ctypes.c_char_p) -> List[Any]:
    """Decode a JSON-encoded list from a Go C string into a Python ``list``.

    The C string is first converted to a Python ``str`` via ``from_go_string``, and
    the resulting JSON text is then parsed with ``json.loads``. The Go-exported
    function is expected to produce a JSON array. Invalid JSON raises
    ``json.JSONDecodeError``, but the decoded value is not type-checked: valid JSON
    of another type (for example an object) is returned unchanged.

    Args:
        cstr: The raw ``ctypes.c_char_p`` value (or equivalent ``bytes``) returned
            by a Go-exported function, containing a UTF-8 encoded JSON array.

    Returns:
        A Python ``list`` parsed from the JSON content of ``cstr``.

    Raises:
        json.JSONDecodeError: If ``cstr`` does not contain valid JSON.
        UnicodeDecodeError: If the byte sequence in ``cstr`` is not valid UTF-8.
    """
    s = self.from_go_string(cstr)

    return json.loads(s)
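
Because json.loads returns whatever JSON type it finds, a caller that needs a list may want an explicit check. The sketch below shows one way to add it. The names from_go_string_to_list_checked and the local from_go_string are hypothetical stand-ins written for this example, not part of the library.

```python
import ctypes
import json
from typing import Any, List, Union


def from_go_string(cstr: Union[ctypes.c_char_p, bytes]) -> str:
    # Assumption: stand-in for the library's from_go_string.
    raw = cstr.value if isinstance(cstr, ctypes.c_char_p) else cstr
    return raw.decode("utf-8")


def from_go_string_to_list_checked(cstr: Union[ctypes.c_char_p, bytes]) -> List[Any]:
    parsed = json.loads(from_go_string(cstr))
    # json.loads accepts any JSON value, so enforce the list type explicitly.
    if not isinstance(parsed, list):
        raise TypeError(f"expected a JSON array, got {type(parsed).__name__}")
    return parsed


peers = from_go_string_to_list_checked(ctypes.c_char_p(b'["a", "b"]'))
numbers = from_go_string_to_list_checked(b"[1, 2]")
```

Passing b'{"a": 1}' to this variant raises TypeError, whereas the unchecked function above would return a dict.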