
🟡 unaiverse.streams.streamproxy

What this module does 🟡

Provides StreamProxy, a dict-like accessor that binds, enables/disables and routes interactions and data to underlying streams, with a defaults-aware subclass.

streamproxy

A Collectionless AI Project (https://collectionless.ai)
Registration/Login: https://unaiverse.io
Code Repositories: https://github.com/collectionlessai/
Main Developers: Stefano Melacci (Project Leader), Christian Di Maio, Tommaso Guidi

StreamProxy

StreamProxy(streams: dict[str, Stream | object] | None = None)

A proxy that wraps a set of streams and exposes a unified stdin/stdout interface.

StreamProxy provides a consistent interface for reading and writing data on a collection of Stream objects (or plain default values) identified by name, index, or implicitly when only one stream is bound. It is the object exposed to processor implementations as self.stdin, self.stdout, and similar attributes.

Streams are stored in three parallel data structures that allow O(1) lookup by full hash key (_streams), by position (_stream_list), and by short name stripped of peer-ID prefix (_streams_by_name_only). All three are kept in sync by every mutating operation (bind, add_new_bind, set).
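The three-structure layout can be sketched as follows. This is a simplified illustration, not the library's code: `name_only` is a hypothetical stand-in for the private `__get_name_only` helper, and the `<peer-id>/<name>` key format with a `/` separator is assumed purely for the example.

```python
def name_only(stream_hash: str) -> str:
    # Hypothetical helper: assumes keys look like "<peer-id>/<name>".
    return stream_hash.rsplit("/", 1)[-1]


def build_indexes(streams: dict[str, object]):
    by_hash = dict(streams)               # lookup by full hash key
    by_position = list(by_hash.values())  # lookup by position
    by_name: dict[str, object] = {}       # lookup by short name
    for stream_hash, obj in by_hash.items():
        # On a short-name collision the first entry wins.
        by_name.setdefault(name_only(stream_hash), obj)
    return by_hash, by_position, by_name


by_hash, by_position, by_name = build_indexes(
    {"peerA/image": "stream-1", "peerB/image": "stream-2", "peerA/label": "stream-3"}
)
```

Here both `peerA/image` and `peerB/image` reduce to the short name `image`, so only the first one is reachable by short name, while both remain reachable by full key and by position.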

A proxy may also hold a default UUID (set via bind_uuid_only or bind) that is used automatically whenever the caller omits the uuid argument on get and set.

Attributes:

Name Type Description
default_values

Not defined on the base class; present only on instances of the StreamsProxyWithDefaults subclass.

Examples:

Typical processor usage inside a process step:

>>> # Read from the single input stream
>>> data = self.stdin.get()
>>>
>>> # Write to the single output stream
>>> self.stdout.set(result)
>>>
>>> # Read a specific named input stream
>>> image = self.stdin.get("image")
>>>
>>> # Write to a specific output stream by index
>>> self.stdout.set(0, processed_image)

Initialize a StreamProxy wrapping the given streams.

All three internal lookup structures are built from streams at construction time. When the same short name (without peer-ID prefix) maps to more than one entry, the first encountered entry wins in _streams_by_name_only. The default UUID is initialised to None and can be set later with bind_uuid_only or bind.

Parameters:

Name Type Description Default
streams dict[str, Stream | object] | None

A dictionary mapping stream hash keys to Stream objects or plain default values. If None, an empty proxy is created. Defaults to None.

None

Examples:

>>> proxy = StreamProxy()  # empty proxy
>>> proxy = StreamProxy({"<default_pos_0>": my_stream})
Source code in unaiverse/streams/streamproxy.py
def __init__(self, streams: dict[str, Stream | object] | None = None):
    """Initialize a ``StreamProxy`` wrapping the given streams.

    All three internal lookup structures are built from ``streams`` at construction
    time. When the same short name (without peer-ID prefix) maps to more than one
    entry, the first encountered entry wins in ``_streams_by_name_only``. The default
    UUID is initialised to ``None`` and can be set later with ``bind_uuid_only`` or
    ``bind``.

    Args:
        streams: A dictionary mapping stream hash keys to ``Stream`` objects or plain
            default values. If ``None``, an empty proxy is created. Defaults to None.

    Examples:
        >>> proxy = StreamProxy()  # empty proxy
        >>> proxy = StreamProxy({"<default_pos_0>": my_stream})
    """
    self._streams: dict[str, Stream | object] = streams if streams is not None else {}
    self._stream_list: list = list(self._streams.values())
    self._streams_by_name_only: dict[str, Stream | object] = {}
    for stream_hash, stream_obj in self._streams.items():
        name_only = StreamProxy.__get_name_only(stream_hash)
        if name_only not in self._streams_by_name_only:  # In case of collision, the first name wins
            self._streams_by_name_only[name_only] = stream_obj
    self._uuid: str | None = None

names property

names: list[str]

Return the full hash keys of all bound streams.

The returned list contains the raw keys stored in _streams (user hashes or <default_pos_N> placeholders), in insertion order. Use items to obtain both keys and stream objects simultaneously.

Returns:

Type Description
list[str]

A list of stream hash key strings.

bind

bind(streams: dict[str, Stream | object], uuid: str | None = None)

Rebind this proxy to a different set of streams.

All three internal lookup structures are rebuilt from streams. A shallow copy of the incoming dictionary is stored so that subsequent external modifications to the caller's dict do not silently affect the proxy. If uuid is provided, the proxy's default UUID is also updated; otherwise the previously configured UUID is preserved.
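The effect of storing a shallow copy can be illustrated with plain dictionaries. This is only a sketch of the copy semantics; the string values stand in for Stream objects.

```python
caller_streams = {"image": "image-stream", "label": "label-stream"}

# What a shallow copy keeps: a new dict that references the same values.
stored = caller_streams.copy()

# The caller mutates its own dict afterwards.
caller_streams["extra"] = "late-addition"
del caller_streams["label"]
```

After these mutations `stored` still holds exactly the two original entries. Note that, according to the source listing, `__init__` stores the dictionary it receives without copying it, so this isolation applies to `bind` only.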

Parameters:

Name Type Description Default
streams dict[str, Stream | object]

A dictionary mapping stream hash keys to Stream objects or plain default values.

required
uuid str | None

The default UUID to use when callers omit the uuid argument on get and set. If None, the current default UUID is left unchanged. Defaults to None.

None

Examples:

>>> proxy.bind({"image": image_stream, "label": label_stream}, uuid="session-abc")
Source code in unaiverse/streams/streamproxy.py
def bind(self, streams: dict[str, Stream | object], uuid: str | None = None):
    """Rebind this proxy to a different set of streams.

    All three internal lookup structures are rebuilt from ``streams``. A shallow copy
    of the incoming dictionary is stored so that subsequent external modifications to
    the caller's dict do not silently affect the proxy. If ``uuid`` is provided, the
    proxy's default UUID is also updated; otherwise the previously configured UUID is
    preserved.

    Args:
        streams: A dictionary mapping stream hash keys to ``Stream`` objects or plain
            default values.
        uuid: The default UUID to use when callers omit the ``uuid`` argument on
            ``get`` and ``set``. If ``None``, the current default UUID is left
            unchanged. Defaults to None.

    Examples:
        >>> proxy.bind({"image": image_stream, "label": label_stream}, uuid="session-abc")
    """
    self._streams = streams.copy()  # Shallow copy
    self._stream_list = list(streams.values())
    self._streams_by_name_only = {}
    for stream_hash, stream_obj in self._streams.items():
        name_only = StreamProxy.__get_name_only(stream_hash)
        if name_only not in self._streams_by_name_only:  # In case of collision, the first name wins
            self._streams_by_name_only[name_only] = stream_obj
    if uuid is not None:
        self._uuid = uuid

add_new_bind

add_new_bind(stream_hash: str, stream: Stream) -> None

Register an additional stream under the given hash, if not already bound.

If stream_hash is already present in the proxy, the call is a no-op. Otherwise the stream is appended to all three internal lookup structures so that it becomes reachable by full hash, by position, and by short name. In case of a short-name collision, the existing entry wins.

Parameters:

Name Type Description Default
stream_hash str

The full hash key (typically a user hash) to associate with the stream.

required
stream Stream

The Stream object to register.

required
Source code in unaiverse/streams/streamproxy.py
def add_new_bind(self, stream_hash: str, stream: Stream) -> None:
    """Register an additional stream under the given hash, if not already bound.

    If ``stream_hash`` is already present in the proxy, the call is a no-op. Otherwise
    the stream is appended to all three internal lookup structures so that it becomes
    reachable by full hash, by position, and by short name. In case of a short-name
    collision, the existing entry wins.

    Args:
        stream_hash: The full hash key (typically a user hash) to associate with the
            stream.
        stream: The ``Stream`` object to register.
    """
    if stream_hash not in self._streams:
        self._streams[stream_hash] = stream
        self._stream_list.append(stream)
        name_only = StreamProxy.__get_name_only(stream_hash)
        if name_only not in self._streams_by_name_only:  # In case of collision, the first name wins
            self._streams_by_name_only[name_only] = stream

bind_uuid_only

bind_uuid_only(uuid: str | None) -> None

Set the default UUID applied by get and set when the caller omits one.

Passing None clears the default UUID so that no interaction filtering is applied implicitly.
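A minimal model of the default-UUID fallback is shown below. `UuidDefault` and its `resolve` method are hypothetical names used only for illustration; in the real class the same check happens inline at the top of `get` and `set`.

```python
class UuidDefault:
    """Toy model of the default-UUID fallback (not the real StreamProxy)."""

    def __init__(self):
        self._uuid = None

    def bind_uuid_only(self, uuid):
        self._uuid = uuid

    def resolve(self, uuid=None):
        # An explicit uuid wins; otherwise fall back to the bound default.
        return self._uuid if uuid is None else uuid


proxy = UuidDefault()
proxy.bind_uuid_only("session-abc")
implicit = proxy.resolve()               # falls back to "session-abc"
explicit = proxy.resolve("session-xyz")  # explicit argument is used as-is
proxy.bind_uuid_only(None)
cleared = proxy.resolve()                # no default is bound any more
```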

Parameters:

Name Type Description Default
uuid str | None

The UUID string to use as default, or None to clear it.

required
Source code in unaiverse/streams/streamproxy.py
def bind_uuid_only(self, uuid: str | None) -> None:
    """Set the default UUID applied by ``get`` and ``set`` when the caller omits one.

    Passing ``None`` clears the default UUID so that no interaction filtering is
    applied implicitly.

    Args:
        uuid: The UUID string to use as default, or ``None`` to clear it.
    """
    self._uuid = uuid

clear_data

clear_data(uuid: str | None)

Clear current-interaction data from every bound stream.

Calls clear_data on each stream in the proxy for the specified uuid, removing only the data associated with that interaction. Stream interaction records themselves are preserved. This is useful for resetting a single step's output without discarding the full interaction history.

Parameters:

Name Type Description Default
uuid str | None

The UUID of the interaction whose data should be cleared. Pass None to clear data that has no associated UUID.

required
Source code in unaiverse/streams/streamproxy.py
def clear_data(self, uuid: str | None):
    """Clear current-interaction data from every bound stream.

    Calls ``clear_data`` on each stream in the proxy for the specified ``uuid``,
    removing only the data associated with that interaction. Stream interaction records
    themselves are preserved. This is useful for resetting a single step's output
    without discarding the full interaction history.

    Args:
        uuid: The UUID of the interaction whose data should be cleared. Pass ``None``
            to clear data that has no associated UUID.
    """
    for stream_obj in self._stream_list:
        stream_obj.clear_data(uuid=uuid)

clear_all_data

clear_all_data()

Clear all data from every bound stream across all interactions.

Calls clear_all_data on each stream in the proxy, removing every stored data sample regardless of UUID. Interaction records themselves are not removed. Use clear_data when only a specific interaction's data should be erased.
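The difference between the two clearing methods can be sketched with a toy stream. `ToyStream` is a hypothetical stand-in that assumes data is stored per UUID; the real Stream class may organise its storage differently.

```python
class ToyStream:
    """Hypothetical stand-in for Stream: per-UUID data plus interaction records."""

    def __init__(self):
        self.interactions = set()  # UUIDs of registered interactions
        self.data = {}             # uuid -> stored sample

    def add_interaction(self, uuid):
        self.interactions.add(uuid)

    def set(self, value, uuid):
        self.data[uuid] = value

    def clear_data(self, uuid):
        self.data.pop(uuid, None)  # drop one interaction's data only

    def clear_all_data(self):
        self.data.clear()          # drop data for every UUID


stream = ToyStream()
for u in ("u1", "u2", "u3"):
    stream.add_interaction(u)
    stream.set(f"sample-{u}", uuid=u)

stream.clear_data("u1")
after_single = dict(stream.data)   # "u2" and "u3" remain

stream.clear_all_data()
after_all = dict(stream.data)      # nothing remains
```

In both cases the interaction records themselves are left in place, matching the behaviour described above.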

Source code in unaiverse/streams/streamproxy.py
def clear_all_data(self):
    """Clear all data from every bound stream across all interactions.

    Calls ``clear_all_data`` on each stream in the proxy, removing every stored
    data sample regardless of UUID. Interaction records themselves are not removed.
    Use ``clear_data`` when only a specific interaction's data should be erased.
    """
    for stream_obj in self._stream_list:
        stream_obj.clear_all_data()

enable

enable() -> None

Enable every bound stream, allowing data to be written.

Calls enable on each stream object in the proxy. Streams that are already enabled are unaffected. See disable for the reverse operation.

Source code in unaiverse/streams/streamproxy.py
def enable(self) -> None:
    """Enable every bound stream, allowing data to be written.

    Calls ``enable`` on each stream object in the proxy. Streams that are already
    enabled are unaffected. See ``disable`` for the reverse operation.
    """
    for stream_obj in self._stream_list:
        stream_obj.enable()

disable

disable() -> None

Disable every bound stream, preventing data from being written.

Calls disable on each stream object in the proxy. Streams that are already disabled are unaffected. Disabled streams still allow reads and interaction registration; only set calls are blocked unless force=True is passed. See enable for the reverse operation.
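The write gate described here might look like the following sketch. `GatedStream` is a hypothetical stand-in for Stream, and the choice to drop a blocked write and return `False` is an assumption; the documentation above does not say how the real class reports a blocked write.

```python
class GatedStream:
    """Hypothetical stand-in for Stream with an enable/disable write gate."""

    def __init__(self):
        self.enabled = True
        self.data = None

    def enable(self):
        self.enabled = True

    def disable(self):
        self.enabled = False

    def get(self):
        return self.data  # reads are allowed in either state

    def set(self, data, force=False):
        if not self.enabled and not force:
            return False  # assumption: a blocked write is silently dropped
        self.data = data
        return True


s = GatedStream()
first = s.set("a")                # written, stream is enabled
s.disable()
blocked = s.set("b")              # dropped, stream is disabled
still_a = s.get()                 # reading still works
forced = s.set("c", force=True)   # written despite the disabled state
```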

Source code in unaiverse/streams/streamproxy.py
def disable(self) -> None:
    """Disable every bound stream, preventing data from being written.

    Calls ``disable`` on each stream object in the proxy. Streams that are already
    disabled are unaffected. Disabled streams still allow reads and interaction
    registration; only ``set`` calls are blocked unless ``force=True`` is passed.
    See ``enable`` for the reverse operation.
    """
    for stream_obj in self._stream_list:
        stream_obj.disable()

get

get(key: str | int | None = None, requested_by: str | None = None, uuid: str | None = None, all_uuids: bool = False, data_type: str | None = None)

Return data from one or all bound streams.

When key is None, data is collected from every stream in the proxy and returned as a list. When key is an int, the stream at that position is addressed. When key is a str, the full hash key is tried first, then the short (name-only) key. Non-Stream entries (plain default values) are returned as-is without UUID or requested_by filtering.

If the proxy's default UUID was set via bind_uuid_only or bind, it is used automatically when uuid is not provided by the caller.
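The lookup order for a non-None key can be summarised in a small sketch. `resolve_entry` is a hypothetical helper, the key format is invented for the example, and `KeyError` stands in for the library's `GenException`.

```python
def resolve_entry(key, by_hash, by_position, by_name):
    if isinstance(key, int):
        return by_position[key]   # 1. positional lookup
    if key in by_hash:
        return by_hash[key]       # 2. full hash key
    if key in by_name:
        return by_name[key]       # 3. short (name-only) key
    raise KeyError(f"Unknown stream/key: {key}")


by_hash = {"peerA/image": "img", "peerA/label": "lbl"}
by_position = list(by_hash.values())
by_name = {"image": "img", "label": "lbl"}

by_index = resolve_entry(1, by_hash, by_position, by_name)
by_full_key = resolve_entry("peerA/image", by_hash, by_position, by_name)
by_short_name = resolve_entry("label", by_hash, by_position, by_name)
```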

Parameters:

Name Type Description Default
key str | int | None

Stream name (str), zero-based index (int), or None to collect from all streams. Defaults to None.

None
requested_by str | None

Identifier of the caller requesting the data. Forwarded to the underlying Stream.get call for access tracking. Defaults to None.

None
uuid str | None

UUID of the interaction to retrieve data for. When None, the proxy's bound default UUID is used. Defaults to None.

None
all_uuids bool

When True, data from every UUID is returned as a list of (data, data_tag) tuples. Valid only when key or data_type is also provided. Defaults to False.

False
data_type str | None

Short name of a data type. When set (and key is None), only streams whose props.data_type matches this value are considered. Cannot be combined with key. Defaults to None.

None

Returns:

Type Description

When key is None: a list of data values (one per stream), or None if no stream produced a non-None result. When key is an int or str: the data value returned by that stream's get method, or the plain default value for non-Stream entries. When all_uuids is True, each element is a list of (data, data_tag) tuples.

Raises:

Type Description
GenException

If all_uuids is True but neither key nor data_type is specified.

GenException

If both key and data_type are specified simultaneously.

GenException

If key is a string that does not match any bound stream.

Examples:

>>> # Read from the only bound stream
>>> sample = self.stdin.get()
>>>
>>> # Read a named stream
>>> image = self.stdin.get("image")
>>>
>>> # Read the samples of every UUID from the stream named "label"
>>> result = self.stdin.get(all_uuids=True, key="label")
Source code in unaiverse/streams/streamproxy.py
def get(self, key: str | int | None = None, requested_by: str | None = None, uuid: str | None = None,
        all_uuids: bool = False, data_type: str | None = None):
    """Return data from one or all bound streams.

    When ``key`` is ``None``, data is collected from every stream in the proxy and
    returned as a list. When ``key`` is an ``int``, the stream at that position is
    addressed. When ``key`` is a ``str``, the full hash key is tried first, then the
    short (name-only) key. Non-``Stream`` entries (plain default values) are returned
    as-is without UUID or ``requested_by`` filtering.

    If the proxy's default UUID was set via ``bind_uuid_only`` or ``bind``, it is used
    automatically when ``uuid`` is not provided by the caller.

    Args:
        key: Stream name (``str``), zero-based index (``int``), or ``None`` to collect
            from all streams. Defaults to None.
        requested_by: Identifier of the caller requesting the data. Forwarded to the
            underlying ``Stream.get`` call for access tracking. Defaults to None.
        uuid: UUID of the interaction to retrieve data for. When ``None``, the proxy's
            bound default UUID is used. Defaults to None.
        all_uuids: When ``True``, data from every UUID is returned as a list of
            ``(data, data_tag)`` tuples. Valid only when ``key`` or ``data_type`` is
            also provided. Defaults to False.
        data_type: Short name of a data type. When set (and ``key`` is ``None``),
            only streams whose ``props.data_type`` matches this value are considered.
            Cannot be combined with ``key``. Defaults to None.

    Returns:
        When ``key`` is ``None``: a list of data values (one per stream), or ``None``
        if no stream produced a non-``None`` result. When ``key`` is an ``int`` or
        ``str``: the data value returned by that stream's ``get`` method, or the plain
        default value for non-``Stream`` entries. When ``all_uuids`` is ``True``, each
        element is a list of ``(data, data_tag)`` tuples.

    Raises:
        GenException: If ``all_uuids`` is ``True`` but neither ``key`` nor
            ``data_type`` is specified.
        GenException: If both ``key`` and ``data_type`` are specified simultaneously.
        GenException: If ``key`` is a string that does not match any bound stream.

    Examples:
        >>> # Read from the only bound stream
        >>> sample = self.stdin.get()
        >>>
        >>> # Read a named stream
        >>> image = self.stdin.get("image")
        >>>
        >>> # Read the samples of every UUID from the stream named "label"
        >>> result = self.stdin.get(all_uuids=True, key="label")
    """
    if len(self._stream_list) == 0:
        return None

    if uuid is None:
        uuid = self._uuid

    if all_uuids and (not data_type and not key):
        raise GenException("You can only ask for all UUIDs if you also specify a "
                           "stream name (key) or a data type")

    if key and data_type:
        raise GenException("You can only specify a stream name (key) or a data type, not both")

    if key is None:
        found_at_least_one = False
        ret = []
        for s in self._stream_list:
            if isinstance(s, Stream):
                if data_type and s.props.data_type != data_type:
                    continue
                data = s.get(requested_by, uuid, all_uuids)  # This might be None if requested multiple times
                if data is not None:
                    found_at_least_one = True
                ret.append(data)
            else:
                ret.append(s)  # This will always be not-None (well, unless None is the actual value)
        return ret if found_at_least_one else None
    elif isinstance(key, int):
        if isinstance(self._stream_list[key], Stream):
            return self._stream_list[key].get(requested_by, uuid, all_uuids)
        else:
            return self._stream_list[key]  # Default value
    elif key in self._streams:
        if isinstance(self._streams[key], Stream):
            return self._streams[key].get(requested_by, uuid, all_uuids)
        else:
            return self._streams[key]  # Default value
    elif key in self._streams_by_name_only:
        if isinstance(self._streams_by_name_only[key], Stream):
            return self._streams_by_name_only[key].get(requested_by, uuid, all_uuids)
        else:
            return self._streams_by_name_only[key]  # Default value
    else:
        raise GenException(f"Unknown stream/key: {key}"
                           f"\n{self._stream_list}\n{self._streams}\n{self._streams_by_name_only}")

add_interaction

add_interaction(interaction) -> None

Register an interaction on every bound stream.

Calls add_interaction on each entry in the stream list. This makes the interaction (and its UUID) visible to all streams simultaneously, which is required before data can be written for that interaction via set.

Parameters:

Name Type Description Default
interaction

The Interaction object to register on all streams.

required
Source code in unaiverse/streams/streamproxy.py
def add_interaction(self, interaction) -> None:
    """Register an interaction on every bound stream.

    Calls ``add_interaction`` on each entry in the stream list. This makes the
    interaction (and its UUID) visible to all streams simultaneously, which is required
    before data can be written for that interaction via ``set``.

    Args:
        interaction: The ``Interaction`` object to register on all streams.
    """
    for s in self._stream_list:
        s.add_interaction(interaction)

has_interaction

has_interaction(uuid: str | None) -> bool

Return whether every bound stream has an interaction for the given UUID.

Non-Stream entries are skipped. The check is strict: all remaining Stream objects must report True from their own has_interaction method.
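The strict check can be sketched with `all()` over the Stream entries only. `FakeStream` is a hypothetical stand-in for Stream that simply tracks which UUIDs it knows.

```python
class FakeStream:
    """Hypothetical stand-in for Stream, tracking known interaction UUIDs."""

    def __init__(self, uuids):
        self._uuids = set(uuids)

    def has_interaction(self, uuid):
        return uuid in self._uuids


def all_have_interaction(entries, uuid):
    # Plain default values (non-stream entries) are skipped.
    return all(e.has_interaction(uuid) for e in entries if isinstance(e, FakeStream))


entries = [FakeStream({"u1", "u2"}), 0.5, FakeStream({"u1"})]
shared = all_have_interaction(entries, "u1")     # both streams know "u1"
partial = all_have_interaction(entries, "u2")    # the second stream does not
no_streams = all_have_interaction([0.5], "u1")   # nothing to check
```

One consequence worth noting: when the proxy holds no Stream entries at all, the check is vacuously true, which matches the loop in the source listing falling through to `return True`.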

Parameters:

Name Type Description Default
uuid str | None

The UUID to look up. Pass None to check for UUID-less interactions.

required

Returns:

Type Description
bool

True if every Stream in the proxy has an interaction for uuid, False if any one of them does not.

Source code in unaiverse/streams/streamproxy.py
def has_interaction(self, uuid: str | None) -> bool:
    """Return whether every bound stream has an interaction for the given UUID.

    Non-``Stream`` entries are skipped. The check is strict: all remaining ``Stream``
    objects must report ``True`` from their own ``has_interaction`` method.

    Args:
        uuid: The UUID to look up. Pass ``None`` to check for UUID-less interactions.

    Returns:
        ``True`` if every ``Stream`` in the proxy has an interaction for ``uuid``,
        ``False`` if any one of them does not.
    """
    for s in self._stream_list:
        if not isinstance(s, Stream):
            continue
        if not s.has_interaction(uuid):
            return False
    return True

get_interaction

get_interaction(uuid: str | None) -> object | None

Retrieve the interaction for the given UUID from the first stream that holds one.

has_interaction is called first; if it returns False the method returns None immediately without iterating. Otherwise every Stream entry is known to hold an interaction for uuid, so the first Stream entry queried supplies the result.

Parameters:

Name Type Description Default
uuid str | None

The UUID of the interaction to retrieve. Pass None for UUID-less interactions.

required

Returns:

Type Description
object | None

The Interaction object if every stream has one for uuid, or None if has_interaction returns False.

Source code in unaiverse/streams/streamproxy.py
def get_interaction(self, uuid: str | None) -> object | None:
    """Retrieve the interaction for the given UUID from the first stream that holds one.

    ``has_interaction`` is called first; if it returns ``False`` the method returns
    ``None`` immediately without iterating. Otherwise every ``Stream`` entry is known
    to hold an interaction for ``uuid``, so the first ``Stream`` entry queried
    supplies the result.

    Args:
        uuid: The UUID of the interaction to retrieve. Pass ``None`` for UUID-less
            interactions.

    Returns:
        The ``Interaction`` object if every stream has one for ``uuid``, or ``None``
        if ``has_interaction`` returns ``False``.
    """
    if not self.has_interaction(uuid):
        return None

    for s in self._stream_list:
        if not isinstance(s, Stream):
            continue
        interaction = s.get_interaction(uuid)
        if interaction is not None:
            return interaction  # This will happen at the 1st iteration for the way "has_interaction" is implemented
    return None  # This will never happen, since we already ensured that it "has_interaction"

set

set(key_or_data, data=None, data_tag: int = -1, uuid: str | None = None, force: bool = False)

Write data to one or all bound streams.

Supports four calling conventions:

  • proxy.set(data) -- single stream: when key_or_data is neither an int nor a known key, it is treated as the data for the only stream.
  • proxy.set([data_0, data_1, ...]) -- broadcast: when key_or_data is a list or tuple, each element is written to the corresponding stream in positional order. The length must match the number of bound streams.
  • proxy.set(index, data) -- by zero-based integer index.
  • proxy.set(stream_name, data) -- by full hash key or short name.

For non-Stream entries (plain default values), the value is updated in all three internal lookup structures so they remain consistent. If the proxy's default UUID was set via bind_uuid_only or bind, it is used automatically when uuid is not provided by the caller.
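Keeping a plain default value consistent across the three structures when it is addressed by index can be sketched as below. `name_only` is a hypothetical stand-in for the private `__get_name_only` helper (a `/` separator is assumed), and `set_default_at` is an illustrative helper rather than a library function. The `islice` call mirrors the one in the source listing and relies on dictionaries preserving insertion order.

```python
from itertools import islice


def name_only(stream_hash: str) -> str:
    # Hypothetical helper: assumes keys look like "<peer-id>/<name>".
    return stream_hash.rsplit("/", 1)[-1]


def set_default_at(index, value, by_hash, by_position, by_name):
    by_position[index] = value
    # The index-th key of the dict, in insertion order.
    key_at_index = next(islice(by_hash, index, None))
    by_hash[key_at_index] = value
    by_name[name_only(key_at_index)] = value


by_hash = {"peerA/threshold": 0.5, "peerA/mode": "fast"}
by_position = list(by_hash.values())
by_name = {name_only(k): v for k, v in by_hash.items()}

set_default_at(1, "slow", by_hash, by_position, by_name)
```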

Parameters:

Name Type Description Default
key_or_data

One of: the data to write (single-stream case), a list or tuple of values (broadcast case), an int index, or a str stream name / hash key.

required
data

The data to write. Required when key_or_data is an int or str. Defaults to None.

None
data_tag int

Integer tag associated with the sample. Pass -1 to let the stream assign an automatic tag. Defaults to -1.

-1
uuid str | None

UUID of the interaction for which data is being written. When None, the proxy's bound default UUID is used. Defaults to None.

None
force bool

When True, the write is performed even on disabled streams. Defaults to False.

False

Raises:

Type Description
GenException

If no streams are bound to this proxy.

GenException

If key_or_data is a list or tuple whose length does not match the number of bound streams.

GenException

If key_or_data is a string that does not match any bound stream name.

Examples:

>>> # Write to the only bound output stream
>>> self.stdout.set(result)
>>>
>>> # Write to multiple output streams at once
>>> self.stdout.set([image_out, label_out])
>>>
>>> # Write to a named stream with a custom tag
>>> self.stdout.set("prediction", pred_tensor, data_tag=42)
Source code in unaiverse/streams/streamproxy.py
def set(self, key_or_data, data=None, data_tag: int = -1, uuid: str | None = None, force: bool = False):
    """Write data to one or all bound streams.

    Supports four calling conventions:

    - ``proxy.set(data)`` -- single stream: when ``key_or_data`` is neither an
      ``int`` nor a known key, it is treated as the data for the only stream.
    - ``proxy.set([data_0, data_1, ...])`` -- broadcast: when ``key_or_data`` is a
      ``list`` or ``tuple``, each element is written to the corresponding stream in
      positional order. The length must match the number of bound streams.
    - ``proxy.set(index, data)`` -- by zero-based integer index.
    - ``proxy.set(stream_name, data)`` -- by full hash key or short name.

    For non-``Stream`` entries (plain default values), the value is updated in all
    three internal lookup structures so they remain consistent. If the proxy's default
    UUID was set via ``bind_uuid_only`` or ``bind``, it is used automatically when
    ``uuid`` is not provided by the caller.

    Args:
        key_or_data: One of: the data to write (single-stream case), a ``list`` or
            ``tuple`` of values (broadcast case), an ``int`` index, or a ``str``
            stream name / hash key.
        data: The data to write. Required when ``key_or_data`` is an ``int`` or
            ``str``. Defaults to None.
        data_tag: Integer tag associated with the sample. Pass ``-1`` to let the
            stream assign an automatic tag. Defaults to -1.
        uuid: UUID of the interaction for which data is being written. When ``None``,
            the proxy's bound default UUID is used. Defaults to None.
        force: When ``True``, the write is performed even on disabled streams.
            Defaults to False.

    Raises:
        GenException: If no streams are bound to this proxy.
        GenException: If ``key_or_data`` is a list or tuple whose length does not
            match the number of bound streams.
        GenException: If ``key_or_data`` is a string that does not match any bound
            stream name.

    Examples:
        >>> # Write to the only bound output stream
        >>> self.stdout.set(result)
        >>>
        >>> # Write to multiple output streams at once
        >>> self.stdout.set([image_out, label_out])
        >>>
        >>> # Write to a named stream with a custom tag
        >>> self.stdout.set("prediction", pred_tensor, data_tag=42)
    """
    if len(self._stream_list) == 0:
        stack_str = '\n'.join(traceback.format_stack())
        log.error(stack_str)
        raise GenException("No streams bound to this StreamIO")

    if uuid is None:
        uuid = self._uuid

    if isinstance(key_or_data, (list, tuple)):
        if len(key_or_data) != len(self._stream_list):
            raise GenException(f"The list of stream values to set must have the same length as the stream list "
                               f"in this proxy "
                               f"({len(key_or_data)} != {len(self._stream_list)})")
        for i, data in enumerate(key_or_data):
            s = self._stream_list[i]
            if isinstance(s, Stream):
                s.set(data, data_tag, uuid=uuid, force=force)
            else:
                self._stream_list[i] = data
                key_at_index = next(islice(self._streams, i, None))
                self._streams[key_at_index] = data
                name_only = StreamProxy.__get_name_only(key_at_index)
                self._streams_by_name_only[name_only] = data
    elif isinstance(key_or_data, int):
        s = self._stream_list[key_or_data]
        if isinstance(s, Stream):
            s.set(data, data_tag, uuid=uuid, force=force)
        else:
            self._stream_list[key_or_data] = data
            key_at_index = next(islice(self._streams, key_or_data, None))
            self._streams[key_at_index] = data
            name_only = StreamProxy.__get_name_only(key_at_index)
            self._streams_by_name_only[name_only] = data
    elif key_or_data in self._streams:
        s = self._streams[key_or_data]
        if isinstance(s, Stream):
            s.set(data, data_tag, uuid=uuid, force=force)
        else:
            self._streams[key_or_data] = data

            # We need to keep this fresh in the other data structures
            key_to_pos_in_stream_list = {k: i for i, k in enumerate(self._streams)}
            p = key_to_pos_in_stream_list[key_or_data]
            self._stream_list[p] = data
            name_only = StreamProxy.__get_name_only(key_or_data)
            self._streams_by_name_only[name_only] = data
    elif key_or_data in self._streams_by_name_only:
        if isinstance(self._streams_by_name_only[key_or_data], Stream):
            return self._streams_by_name_only[key_or_data].set(data, data_tag, uuid=uuid, force=force)
        else:
            # We guess the full name and call set again using it
            for i, full_name in enumerate(self._streams.keys()):
                name_only = StreamProxy.__get_name_only(full_name)
                if name_only == key_or_data:
                    return self.set(full_name, data, data_tag, uuid, force)
    else:
        raise GenException(f"Unknown stream: {key_or_data}")

get_tag

get_tag(key: str | int | None = None, uuid: str | None = None) -> int

Return the data tag for the given stream and interaction UUID.

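When key is None, tags are collected from all streams and the maximum is returned. Non-Stream entries contribute -1, and if any stream has no tag for the given UUID the result is -1 regardless of the other streams. For a single stream selected by key, -1 is returned for non-Stream entries and for streams that have no tag for the given UUID.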

Parameters:

Name Type Description Default
key str | int | None

Stream name (str), zero-based index (int), or None to return the maximum tag across all streams. Defaults to None.

None
uuid str | None

UUID of the interaction to query. When None, the proxy's bound default UUID is used. Defaults to None.

None

Returns:

Type Description
int

The integer data tag associated with the most recent sample, or -1 when no tag is available (non-Stream entry or missing UUID).

Raises:

Type Description
GenException

If no streams are bound to this proxy.

GenException

If key is a string that does not match any bound stream.

Source code in unaiverse/streams/streamproxy.py
def get_tag(self, key: str | int | None = None, uuid: str | None = None) -> int:
    """Return the data tag for the given stream and interaction UUID.

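    When ``key`` is ``None``, tags are collected from all streams and the maximum is
    returned. Non-``Stream`` entries contribute ``-1``, and if any stream has no tag
    for the given UUID the result is ``-1`` regardless of the other streams. For a
    single stream selected by ``key``, ``-1`` is returned for non-``Stream`` entries
    and for streams that have no tag for the given UUID.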

    Args:
        key: Stream name (``str``), zero-based index (``int``), or ``None`` to return
            the maximum tag across all streams. Defaults to None.
        uuid: UUID of the interaction to query. When ``None``, the proxy's bound
            default UUID is used. Defaults to None.

    Returns:
        The integer data tag associated with the most recent sample, or ``-1`` when
        no tag is available (non-``Stream`` entry or missing UUID).

    Raises:
        GenException: If no streams are bound to this proxy.
        GenException: If ``key`` is a string that does not match any bound stream.
    """
    if len(self._stream_list) == 0:
        raise GenException("No streams bound to this StreamIO")

    if uuid is None:
        uuid = self._uuid

    if key is None:
        ret = []
        for s in self._stream_list:
            if isinstance(s, Stream):
                data_tag = s.get_tag(uuid)
                if data_tag is None:
                    return -1
                ret.append(data_tag)
            else:
                ret.append(-1)
        if len(ret) == 0:
            return -1
        return max(ret)
    elif isinstance(key, int):
        if isinstance(self._stream_list[key], Stream):
            tag = self._stream_list[key].get_tag(uuid)
            return tag if tag is not None else -1  # Keep the declared int return type
        else:
            return -1  # Default value
    elif key in self._streams:
        if self._streams[key] is not None:
            stream = self._streams[key]
            if isinstance(stream, Stream):
                tag = stream.get_tag(uuid)
                return tag if tag is not None else -1
            else:
                return -1
        else:
            return -1  # Default value
    else:
        raise GenException(f"Unknown stream: {key}")
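One detail of the `key is None` branch is easy to miss: a single stream without a tag for the requested UUID makes the whole query return -1, even when other streams do carry a tag. The sketch below reproduces only that branch. `FakeStream` and `max_tag` are hypothetical stand-ins written for this example; they are not part of the library.

```python
from typing import Optional


class FakeStream:
    """Hypothetical stand-in for Stream: maps interaction UUIDs to integer tags."""

    def __init__(self, tags: dict):
        self._tags = tags

    def get_tag(self, uuid: str) -> Optional[int]:
        return self._tags.get(uuid)  # None when this UUID has no tag


def max_tag(entries: list, uuid: str) -> int:
    """Mirror of the `key is None` branch: maximum tag across all entries."""
    tags = []
    for entry in entries:
        if isinstance(entry, FakeStream):
            tag = entry.get_tag(uuid)
            if tag is None:
                return -1  # one untagged stream short-circuits the whole query
            tags.append(tag)
        else:
            tags.append(-1)  # plain default values carry no tag
    return max(tags) if tags else -1


entries = [FakeStream({"u1": 3}), 0.5, FakeStream({"u1": 7})]
print(max_tag(entries, "u1"))  # 7
print(max_tag(entries, "u2"))  # -1, no stream knows "u2"

partially_tagged = [FakeStream({"u1": 3}), FakeStream({})]
print(max_tag(partially_tagged, "u1"))  # -1, although the first stream has tag 3
```

A caller that needs the per-stream tags rather than the aggregate can query each stream individually by name or index.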

clear

clear() -> None

Reset all bound streams by writing None to each of them.

Equivalent to calling set with a list of None values whose length matches the number of bound streams. Useful for clearing output state between processing steps. See clear_data to remove stored data for a specific UUID without overwriting the stream value.

Source code in unaiverse/streams/streamproxy.py
def clear(self) -> None:
    """Reset all bound streams by writing ``None`` to each of them.

    Equivalent to calling ``set`` with a list of ``None`` values whose length matches
    the number of bound streams. Useful for clearing output state between processing
    steps. See ``clear_data`` to remove stored data for a specific UUID without
    overwriting the stream value.
    """
    self.set([None] * len(self))
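The equivalence stated in the docstring can be illustrated with a heavily simplified proxy. `TinyProxy` below is a hypothetical class that holds plain values only and assumes that `set` with a list assigns the i-th item to the i-th slot. The real `StreamProxy.set` also handles `Stream` entries, data tags and UUIDs, none of which are modelled here.

```python
class TinyProxy:
    """Hypothetical, simplified proxy holding plain values only."""

    def __init__(self, values):
        self._stream_list = list(values)

    def __len__(self):
        return len(self._stream_list)

    def set(self, data_list):
        # Assumed positional semantics: the i-th item goes to the i-th slot.
        for i, data in enumerate(data_list):
            self._stream_list[i] = data

    def clear(self):
        # Same one-liner as StreamProxy.clear: one None per bound slot.
        self.set([None] * len(self))


proxy = TinyProxy([1, "a", 3.5])
proxy.clear()
print(proxy._stream_list)  # [None, None, None]
print(len(proxy))          # 3, the slots stay bound and only their values are reset
```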

items

items()

Iterate over (stream_hash, stream) pairs for all bound streams.

Yields each key-value pair from the internal _streams dictionary in insertion order. Keys are full hash strings (either user hashes or <default_pos_N> placeholders). Use names to retrieve only the keys as a list.

Returns:

Type Description

An iterator of (stream_hash, stream_or_value) pairs.

Source code in unaiverse/streams/streamproxy.py
def items(self):
    """Iterate over ``(stream_hash, stream)`` pairs for all bound streams.

    Yields each key-value pair from the internal ``_streams`` dictionary in insertion
    order. Keys are full hash strings (either user hashes or ``<default_pos_N>``
    placeholders). Use ``names`` to retrieve only the keys as a list.

    Returns:
        An iterator of ``(stream_hash, stream_or_value)`` pairs.
    """
    for key, value in self._streams.items():
        yield key, value
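Because `items` is a generator over the live dictionary, it follows the usual Python rule that a dict must not change size while it is being iterated. The sketch below uses a free function with the same body as the method and a hypothetical pair of keys to show the safe pattern (snapshot first, then mutate) next to the failing one.

```python
def items(streams: dict):
    """Same shape as StreamProxy.items: a generator over the live dict."""
    for key, value in streams.items():
        yield key, value


# Hypothetical content: one placeholder key and one user hash.
streams = {"<default_pos_0>": 0.0, "peerA:temperature": 21.5}

# Safe: materialize the pairs, then modify the dict.
snapshot = list(items(streams))
streams["peerA:humidity"] = 40.0
print(snapshot)  # [('<default_pos_0>', 0.0), ('peerA:temperature', 21.5)]

# Unsafe: adding keys while the generator is still running.
error = None
try:
    for key, _ in items(streams):
        streams[key + ":copy"] = None
except RuntimeError as exc:
    error = str(exc)
print(error)  # dictionary changed size during iteration
```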

values

values()

Iterate over all bound stream objects (or plain default values).

Yields each value from the internal _streams dictionary in insertion order. Entries may be Stream instances or plain Python objects used as default values.

Returns:

Type Description

An iterator over the stream objects and plain values.

Source code in unaiverse/streams/streamproxy.py
def values(self):
    """Iterate over all bound stream objects (or plain default values).

    Yields each value from the internal ``_streams`` dictionary in insertion order.
    Entries may be ``Stream`` instances or plain Python objects used as default values.

    Returns:
        An iterator over the stream objects and plain values.
    """
    yield from self._streams.values()

StreamsProxyWithDefaults

StreamsProxyWithDefaults(*args, default_values=None, **kwargs)

Bases: StreamProxy

A StreamProxy that substitutes None results with per-stream default values.

This subclass wraps StreamProxy.get so that when a stream returns None for a given position, the corresponding entry from default_values is returned instead. It is useful when a processor needs a guaranteed non-None value for every stream slot even if data has not yet arrived.

Attributes:

Name Type Description
default_values

A list of per-stream fallback values applied by get. Each element corresponds to the stream at the same position. If an element is itself None, no substitution is made for that slot.

Initialize a StreamsProxyWithDefaults with optional per-stream fallbacks.

Forwards all positional and keyword arguments to StreamProxy.__init__. The default_values list is stored separately and is applied during every get call. The length of default_values should match the number of bound streams. A shorter list is not rejected here, but get raises IndexError when a None result falls at a position beyond the end of the list.

Parameters:

Name Type Description Default
*args

Positional arguments forwarded to StreamProxy.__init__.

()
default_values

A list of fallback values, one per stream position. When get would return None for position i, default_values[i] is substituted. Defaults to None, which disables substitution entirely.

None
**kwargs

Keyword arguments forwarded to StreamProxy.__init__.

{}
Source code in unaiverse/streams/streamproxy.py
def __init__(self, *args, default_values=None, **kwargs):
    """Initialize a ``StreamsProxyWithDefaults`` with optional per-stream fallbacks.

    Forwards all positional and keyword arguments to ``StreamProxy.__init__``. The
    ``default_values`` list is stored separately and is applied during every ``get``
    call. The length of ``default_values`` should match the number of bound streams.
    A shorter list is not rejected here, but ``get`` raises ``IndexError`` when a
    ``None`` result falls at a position beyond the end of the list.

    Args:
        *args: Positional arguments forwarded to ``StreamProxy.__init__``.
        default_values: A list of fallback values, one per stream position. When
            ``get`` would return ``None`` for position ``i``, ``default_values[i]``
            is substituted. Defaults to None, which disables substitution entirely.
        **kwargs: Keyword arguments forwarded to ``StreamProxy.__init__``.
    """
    super().__init__(*args, **kwargs)
    self.default_values = default_values
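The length mismatch mentioned in the docstring only surfaces when a `None` result lands on a position that `default_values` does not cover. The sketch below extracts the substitution loop of `get` into a hypothetical free function, `substitute`, to show one call that succeeds and one that fails with the same too-short list.

```python
def substitute(data_list, default_values):
    """Same loop as StreamsProxyWithDefaults.get, extracted as a plain function."""
    if data_list is not None and default_values is not None:
        for i, data in enumerate(data_list):
            if data is None:
                data_list[i] = default_values[i]
    return data_list


defaults = [0.0]  # too short for a two-stream proxy

# Position 1 holds real data, so defaults[1] is never looked up.
ok = substitute([None, 5.0], defaults)
print(ok)  # [0.0, 5.0]

# Position 1 is None, so defaults[1] is looked up and fails.
try:
    substitute([1.0, None], defaults)
    failed = False
except IndexError:
    failed = True
print(failed)  # True
```

Since the failure depends on which streams happen to be empty at call time, it is worth checking the list length against the number of bound streams when the proxy is built.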

default_values instance-attribute

default_values = default_values

get

get(*args, **kwargs)

Return data from bound streams, substituting None with default values.

Delegates to StreamProxy.get and then replaces any None entries in the resulting list with the corresponding element from self.default_values. If the result from the parent is itself None (no stream produced data) or if self.default_values is None, the result is returned unchanged.

Parameters:

Name Type Description Default
*args

Positional arguments forwarded to StreamProxy.get.

()
**kwargs

Keyword arguments forwarded to StreamProxy.get.

{}

Returns:

Type Description

A list of data values where None positions have been replaced by the configured default values, or the unmodified result from StreamProxy.get if no substitution is applicable.

Source code in unaiverse/streams/streamproxy.py
def get(self, *args, **kwargs):
    """Return data from bound streams, substituting ``None`` with default values.

    Delegates to ``StreamProxy.get`` and then replaces any ``None`` entries in the
    resulting list with the corresponding element from ``self.default_values``. If the
    result from the parent is itself ``None`` (no stream produced data) or if
    ``self.default_values`` is ``None``, the result is returned unchanged.

    Args:
        *args: Positional arguments forwarded to ``StreamProxy.get``.
        **kwargs: Keyword arguments forwarded to ``StreamProxy.get``.

    Returns:
        A list of data values where ``None`` positions have been replaced by the
        configured default values, or the unmodified result from ``StreamProxy.get``
        if no substitution is applicable.
    """
    data_list = super().get(*args, **kwargs)
    if data_list is not None and self.default_values is not None:
        for i, data in enumerate(data_list):
            if data is None:
                data_list[i] = self.default_values[i]
    return data_list
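The override can be tried out without the rest of the library by swapping the parent for a stand-in. In the sketch below, `BaseProxy` is a hypothetical replacement for `StreamProxy` whose `get` simply returns one value per stream. Only the subclass logic matches the listing above.

```python
class BaseProxy:
    """Hypothetical stand-in for StreamProxy: get() returns one value per stream."""

    def __init__(self, values):
        self._values = list(values)

    def get(self):
        return list(self._values)


class ProxyWithDefaults(BaseProxy):
    """Same constructor and get() logic as StreamsProxyWithDefaults."""

    def __init__(self, *args, default_values=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.default_values = default_values

    def get(self, *args, **kwargs):
        data_list = super().get(*args, **kwargs)
        if data_list is not None and self.default_values is not None:
            for i, data in enumerate(data_list):
                if data is None:
                    data_list[i] = self.default_values[i]
        return data_list


proxy = ProxyWithDefaults([None, "frame", None], default_values=[0, "n/a", None])
result = proxy.get()
print(result)  # [0, 'frame', None]

plain = ProxyWithDefaults([None, 2]).get()
print(plain)   # [None, 2], no defaults configured so nothing is substituted
```

The last slot stays `None` because its default is itself `None`, and `"n/a"` is never used because the second stream already produced data. The substitution writes into the list returned by the parent, so a parent that returns an internal list rather than a copy would see its own contents modified.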