
unaiverse.utils.os_fd_capture

What this module does

Provides the FdCapture utility that redirects and reads OS-level file descriptors (stdout/stderr) via a background reader thread, used to surface native Go library output through the Python logger.

os_fd_capture

A Collectionless AI Project (https://collectionless.ai)
Registration/Login: https://unaiverse.io
Code Repositories: https://github.com/collectionlessai/
Main Developers: Stefano Melacci (Project Leader), Christian Di Maio, Tommaso Guidi

FdCapture

FdCapture(fd: int, callback: Callable[[str], None], encoding: str = 'utf-8', errors: str = 'replace')

OS-level file-descriptor redirector for capturing C and Go library output.

The Go libp2p library writes its log lines directly to the OS stdout/stderr file descriptors (fd 1 and fd 2), bypassing Python's sys.stdout/sys.stderr entirely. A simple sys.stdout reassignment therefore has no effect on that output.

FdCapture solves this by operating at the OS level using POSIX dup/dup2 primitives:

  1. An os.pipe is created, producing a (read_fd, write_fd) pair.
  2. The original target file descriptor (e.g. fd 1 for stdout) is saved via os.dup and exposed as real_fd.
  3. The pipe's write end is installed over the target fd with os.dup2, so every write to that fd -- from any language or library -- flows into the pipe instead.
  4. sys.stdout or sys.stderr is replaced with a wrapper around real_fd so that Python's own logging and print statements still reach the real terminal.
  5. A background daemon thread reads from the pipe line-by-line and invokes the caller-supplied callback for each non-blank line.
  6. On stop, the original fd is restored and the pipe's write end is closed (signalling EOF to the reader thread); the reader then drains any remaining data before exiting.
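
The redirect-and-restore sequence above can be sketched with the same os primitives. This is a minimal illustration under stated assumptions, not the module's code: it redirects a scratch descriptor (a temporary file standing in for fd 1 or fd 2, so the demo does not disturb the real stdout) and reads the pipe synchronously instead of in a background thread. The function name capture_writes is hypothetical.

```python
import os
import tempfile


def capture_writes(target_fd: int, payload: bytes) -> bytes:
    """Redirect target_fd into a pipe, write payload to it, return what the pipe received."""
    read_fd, write_fd = os.pipe()          # step 1: create the pipe
    saved_fd = os.dup(target_fd)           # step 2: keep a handle on the original destination
    try:
        os.dup2(write_fd, target_fd)       # step 3: target_fd now refers to the pipe's write end
        os.write(target_fd, payload)       # any writer using target_fd lands in the pipe
    finally:
        os.dup2(saved_fd, target_fd)       # restore the original destination
        os.close(saved_fd)
        os.close(write_fd)                 # last write-end descriptor closed: reader sees EOF
    chunks = []
    while chunk := os.read(read_fd, 4096): # drain until EOF (empty bytes)
        chunks.append(chunk)
    os.close(read_fd)
    return b"".join(chunks)


# Scratch target (assumption): a temporary file stands in for fd 1 / fd 2.
with tempfile.TemporaryFile() as scratch:
    captured = capture_writes(scratch.fileno(), b"hello from native code\n")
    os.write(scratch.fileno(), b"after restore\n")  # reaches the file again
    scratch.seek(0)
    file_contents = scratch.read()
```

The payload is kept small so that the single write fits in the pipe buffer; with a synchronous reader like this one, a large write would block, which is one reason a dedicated reader thread is a natural fit for the real class.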

Both stdout (fd 1) and stderr (fd 2) can be captured simultaneously by creating two independent FdCapture instances.

FdCapture supports the context manager protocol: start is called on entry and stop is called on exit (see __enter__/__exit__).

Attributes:

Name Type Description
real_fd int | None

The duplicated copy of the original file descriptor, available while capture is active (see real_fd).

Note

start and stop are idempotent and protected by an internal threading.Lock, making them safe to call from multiple threads.
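
The lock-guarded, idempotent start/stop pattern described in this note can be illustrated in isolation. The Toggle class below is a hypothetical stand-in that only counts transitions; it performs no fd work, and returning self from __enter__ is an assumption about the context manager, not something the source confirms.

```python
import threading


class Toggle:
    """Hypothetical stand-in showing lock-guarded, idempotent start/stop."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._active = False
        self.starts = 0
        self.stops = 0

    def start(self) -> None:
        with self._lock:
            if self._active:       # already running: no-op
                return
            self._active = True
            self.starts += 1

    def stop(self) -> None:
        with self._lock:
            if not self._active:   # not running: no-op
                return
            self._active = False
            self.stops += 1

    def __enter__(self) -> "Toggle":
        self.start()
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.stop()


toggle = Toggle()
threads = [threading.Thread(target=toggle.start) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
with toggle:          # start() is a no-op here: already active
    pass              # stop() runs on exit
toggle.stop()         # a second stop is a no-op
```

Only one of the eight concurrent start calls performs the transition, and only the first stop undoes it.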

Initialize an FdCapture instance without starting the capture.

Validates that fd is either 1 (stdout) or 2 (stderr) and stores all configuration for later use by start. No OS-level operations are performed here; the pipe and dup machinery are set up only when start is called.

Parameters:

Name Type Description Default
fd int

The OS file descriptor to capture. Must be 1 (stdout) or 2 (stderr).

required
callback Callable[[str], None]

Called for every complete line read from the captured fd. Receives the decoded line as a str with the trailing newline stripped.

required
encoding str

Byte encoding used to decode captured bytes. Defaults to "utf-8".

'utf-8'
errors str

Error handler name passed to bytes.decode for decode failures. Defaults to "replace".

'replace'

Raises:

Type Description
ValueError

If fd is not 1 or 2.

Source code in unaiverse/utils/os_fd_capture.py
def __init__(
        self,
        fd: int,
        callback: Callable[[str], None],
        encoding: str = "utf-8",
        errors: str = "replace",
) -> None:
    """Initialize an ``FdCapture`` instance without starting the capture.

    Validates that ``fd`` is either 1 (stdout) or 2 (stderr) and stores all
    configuration for later use by ``start``. No OS-level operations are performed
    here; the pipe and dup machinery are set up only when ``start`` is called.

    Args:
        fd: The OS file descriptor to capture. Must be ``1`` (stdout) or ``2``
            (stderr).
        callback: Called for every complete line read from the captured fd. Receives
            the decoded line as a ``str`` with the trailing newline stripped.
        encoding: Byte encoding used to decode captured bytes. Defaults to
            ``"utf-8"``.
        errors: Error handler name passed to ``bytes.decode`` for decode failures.
            Defaults to ``"replace"``.

    Raises:
        ValueError: If ``fd`` is not ``1`` or ``2``.
    """
    if fd not in (1, 2):
        raise ValueError(f"FdCapture only supports fd 1 (stdout) or fd 2 (stderr), got {fd}")
    self._target_fd = fd
    self._callback = callback
    self._encoding = encoding
    self._errors = errors
    self._lock = threading.Lock()
    self._active = False
    self._py_stream = None

    # Set when start() is called; cleared on stop()
    self._saved_fd: int | None = None  # dup of original fd
    self._read_fd: int | None = None  # read end of the pipe
    self._write_fd: int | None = None  # write end (dup2'd over target)
    self._thread: threading.Thread | None = None

real_fd property

real_fd: int | None

The duplicated copy of the original fd, valid only while capture is active.

After start is called, the original OS file descriptor (e.g. fd 1 for stdout) is saved by calling os.dup before it is overwritten with the pipe's write end. This saved descriptor is exposed here so that callers can write directly to the real terminal without the output being intercepted by the pipe.
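
To illustrate why the saved descriptor is useful, the sketch below redirects a scratch descriptor into a pipe and then writes once through the redirected descriptor and once through the saved duplicate. A temporary file stands in for the terminal (an assumption made to keep the demo self-contained), and the local variable saved_fd plays the role of real_fd.

```python
import os
import tempfile

with tempfile.TemporaryFile() as terminal:      # stand-in for the real terminal
    target_fd = terminal.fileno()
    saved_fd = os.dup(target_fd)                # what real_fd would expose
    read_fd, write_fd = os.pipe()
    os.dup2(write_fd, target_fd)                # target_fd is now captured

    os.write(target_fd, b"captured\n")          # intercepted by the pipe
    os.write(saved_fd, b"direct\n")             # bypasses the pipe

    os.dup2(saved_fd, target_fd)                # undo the redirection
    os.close(saved_fd)
    os.close(write_fd)

    from_pipe = os.read(read_fd, 4096)          # what a reader thread would have received
    os.close(read_fd)
    terminal.seek(0)
    on_terminal = terminal.read()               # what actually reached the "terminal"
```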

Returns:

Type Description
int | None

The duplicated file descriptor integer when the capture is active, or None if start has not been called or stop has already been called.

start

start() -> None

Begin capturing output written to the target file descriptor.

Performs the following sequence under an internal lock, so concurrent calls are safe and the method is idempotent (a second call while already active is a no-op):

  1. Flushes the Python-level stream (sys.stdout or sys.stderr) to avoid interleaving buffered data with the new pipe.
  2. Saves the original OS file descriptor via os.dup (accessible via real_fd).
  3. Creates a new os.pipe (read end and write end).
  4. Redirects the target fd to the write end of the pipe with os.dup2, so that all future writes to that fd (including from C and Go libraries) flow into the pipe. sys.stdout/sys.stderr are then reassigned to wrap real_fd directly, ensuring that Python-level logging still reaches the terminal.
  5. Launches a background daemon thread (_reader_loop) that reads lines from the pipe and invokes the callback for each non-blank line.
Note

After start returns, any write to fd 1 or fd 2 (depending on which fd was captured) goes into the pipe and is delivered to the callback. Direct writes to the Python sys.stdout/sys.stderr objects bypass the pipe and reach the real terminal through real_fd.
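
The body of _reader_loop is not shown on this page. As a rough idea of what step 5 involves, the following hypothetical reader_loop function reads lines from a pipe until EOF, decodes them with the configured encoding and error handler, strips the trailing newline, and skips blank lines. The function name, signature, and blank-line test are assumptions for illustration only.

```python
import os
import threading
from typing import Callable, List


def reader_loop(read_fd: int, callback: Callable[[str], None],
                encoding: str = "utf-8", errors: str = "replace") -> None:
    """Read lines from read_fd until EOF and pass each non-blank line to callback."""
    # closefd=False: the caller stays responsible for closing read_fd.
    with os.fdopen(read_fd, "rb", closefd=False) as pipe:
        for raw in pipe:                         # iteration ends at EOF
            line = raw.decode(encoding, errors).rstrip("\r\n")
            if line.strip():                     # skip blank lines
                callback(line)


lines: List[str] = []
read_fd, write_fd = os.pipe()
thread = threading.Thread(target=reader_loop, args=(read_fd, lines.append), daemon=True)
thread.start()

os.write(write_fd, b"first line\n\n  \nsecond line\n")
os.close(write_fd)                               # EOF for the reader
thread.join(timeout=2.0)
os.close(read_fd)
```

Because the loop ends only at EOF, the thread keeps running until every descriptor that refers to the pipe's write end has been closed or redirected elsewhere.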

Source code in unaiverse/utils/os_fd_capture.py
def start(self) -> None:
    """Begin capturing output written to the target file descriptor.

    Performs the following sequence under an internal lock, so concurrent calls are
    safe and the method is idempotent (a second call while already active is a no-op):

    1. Flushes the Python-level stream (``sys.stdout`` or ``sys.stderr``) to avoid
       interleaving buffered data with the new pipe.
    2. Saves the original OS file descriptor via ``os.dup`` (accessible via
       ``real_fd``).
    3. Creates a new ``os.pipe`` (read end and write end).
    4. Redirects the target fd to the write end of the pipe with ``os.dup2``, so that
       all future writes to that fd (including from C and Go libraries) flow into the
       pipe. ``sys.stdout``/``sys.stderr`` are then reassigned to wrap ``real_fd``
       directly, ensuring that Python-level logging still reaches the terminal.
    5. Launches a background daemon thread (``_reader_loop``) that reads lines from
       the pipe and invokes the ``callback`` for each non-blank line.

    Note:
        After ``start`` returns, any write to fd 1 or fd 2 (depending on which fd was
        captured) goes into the pipe and is delivered to the callback. Direct writes to
        the Python ``sys.stdout``/``sys.stderr`` objects bypass the pipe and reach the
        real terminal through ``real_fd``.
    """
    with self._lock:
        if self._active:
            return

        # 1. Flush the Python-level stream before touching the fd
        if self._target_fd == 1:
            sys.stdout.flush()
        else:
            sys.stderr.flush()

        # 2. Save a copy of the real FD, so we can restore it (and write to it)
        self._saved_fd = os.dup(self._target_fd)

        # 3. Create pipe
        self._read_fd, self._write_fd = os.pipe()

        # 4. Redirect target fd → write end of pipe.
        #    Python's sys.stdout/stderr still point to their original file
        #    objects, which internally hold the original fd number.  After
        #    dup2, those file objects now also write into the pipe.  We
        #    therefore reassign sys.stdout/stderr to write through saved_fd
        #    so that Logger screen output bypasses the pipe.
        os.dup2(self._write_fd, self._target_fd)

        if self._target_fd == 1:
            self._py_stream = sys.stdout
            sys.stdout = open(self._saved_fd, "w",
                              encoding=self._encoding, errors=self._errors,
                              closefd=False)  # closefd=False: we own saved_fd
        else:
            self._py_stream = sys.stderr
            sys.stderr = open(self._saved_fd, "w",
                              encoding=self._encoding, errors=self._errors,
                              closefd=False)

        # 5. Start reader thread
        self._thread = threading.Thread(
            target=self._reader_loop,
            name=f"FdCapture-fd{self._target_fd}",
            daemon=True,
        )
        self._active = True
        self._thread.start()
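
The closefd=False argument in step 4 of the listing above is what lets the replacement stream be discarded without closing the saved descriptor. A small sketch of that behaviour, using a temporary file in place of the terminal (an assumption for a self-contained demo) and an extra newline argument that is not in the source and is only there to keep the bytes identical across platforms:

```python
import os
import tempfile

with tempfile.TemporaryFile() as scratch:
    saved_fd = os.dup(scratch.fileno())          # plays the role of the saved descriptor

    # Text wrapper over the raw descriptor; closefd=False leaves saved_fd open on close().
    stream = open(saved_fd, "w", encoding="utf-8", errors="replace",
                  newline="\n", closefd=False)
    stream.write("log line via wrapper\n")
    stream.close()                               # flushes, but does not close saved_fd

    os.write(saved_fd, b"still open\n")          # would raise OSError if the fd were closed
    os.close(saved_fd)                           # the owner closes it explicitly

    scratch.seek(0)
    written = scratch.read()
```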

stop

stop(timeout: float = 2.0) -> None

Stop capturing and restore the original file descriptor.

Idempotent: a call while capture is not active returns immediately without performing any OS operations. The shutdown sequence is:

  1. Flush and restore sys.stdout/sys.stderr to the original Python stream object saved during start.
  2. Restore the original OS-level fd with os.dup2(saved_fd, target_fd), then close saved_fd.
  3. Close the write end of the pipe so that the reader thread sees an EOF and exits naturally.
  4. Wait up to timeout seconds for the reader thread to drain any remaining buffered data and terminate.
  5. Close the read end of the pipe.

Parameters:

Name Type Description Default
timeout float

Maximum number of seconds to wait for the background reader thread to drain and exit before returning. Defaults to 2.0.

2.0
Note

Any lines already in the pipe buffer when stop is called are flushed to the callback by the reader thread before it exits (subject to the timeout limit). Lines still unread when the thread is abandoned are discarded.
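
Steps 2 and 3 of the shutdown sequence are both needed for the reader to see EOF: after os.dup2, the pipe's write end is referenced by two descriptors (the target fd and the original write fd), and EOF is reported only once both references are gone. The sketch below makes this visible with a non-blocking read on a scratch descriptor (an assumption, so that fd 1 and fd 2 stay untouched). The helper at_eof is hypothetical, and it deliberately closes the write fd first to show that this alone is not enough.

```python
import os
import tempfile


def at_eof(fd: int) -> bool:
    """Return True if a non-blocking read on fd reports EOF, False if a writer still exists."""
    try:
        return os.read(fd, 1) == b""
    except BlockingIOError:                      # pipe is empty but a writer is still attached
        return False


with tempfile.TemporaryFile() as scratch:
    target_fd = scratch.fileno()
    saved_fd = os.dup(target_fd)
    read_fd, write_fd = os.pipe()
    os.set_blocking(read_fd, False)
    os.dup2(write_fd, target_fd)                 # two descriptors now refer to the write end

    os.close(write_fd)                           # closing only one of them...
    eof_while_redirected = at_eof(read_fd)       # ...does not produce EOF

    os.dup2(saved_fd, target_fd)                 # restoring the target drops the last reference
    os.close(saved_fd)
    eof_after_restore = at_eof(read_fd)
    os.close(read_fd)
```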

Source code in unaiverse/utils/os_fd_capture.py
def stop(self, timeout: float = 2.0) -> None:
    """Stop capturing and restore the original file descriptor.

    Idempotent: a call while capture is not active returns immediately without
    performing any OS operations. The shutdown sequence is:

    1. Flush and restore ``sys.stdout``/``sys.stderr`` to the original Python stream
       object saved during ``start``.
    2. Restore the original OS-level fd with ``os.dup2(saved_fd, target_fd)``, then
       close ``saved_fd``.
    3. Close the write end of the pipe so that the reader thread sees an EOF and
       exits naturally.
    4. Wait up to ``timeout`` seconds for the reader thread to drain any remaining
       buffered data and terminate.
    5. Close the read end of the pipe.

    Args:
        timeout: Maximum number of seconds to wait for the background reader thread
            to drain and exit before returning. Defaults to ``2.0``.

    Note:
        Any lines already in the pipe buffer when ``stop`` is called are flushed to
        the callback by the reader thread before it exits (subject to the ``timeout``
        limit). Lines still unread when the thread is abandoned are discarded.
    """
    with self._lock:
        if not self._active:
            return

        # 1. Restore Python-level stream
        if self._target_fd == 1:
            try:
                sys.stdout.flush()
            except Exception:
                pass
            sys.stdout = self._py_stream
        else:
            try:
                sys.stderr.flush()
            except Exception:
                pass
            sys.stderr = self._py_stream

        # 2. Restore the original OS-level fd
        os.dup2(self._saved_fd, self._target_fd)
        os.close(self._saved_fd)
        self._saved_fd = None

        # 3. Close write end of pipe → reader thread sees EOF
        os.close(self._write_fd)
        self._write_fd = None

        self._active = False

    # 4. Wait for reader thread to drain (outside lock)
    if self._thread is not None:
        self._thread.join(timeout=timeout)
        self._thread = None

    # 5. Close read end
    if self._read_fd is not None:
        try:
            os.close(self._read_fd)
        except OSError:
            pass
        self._read_fd = None