🔴 unaiverse.streams.dataprops

What this module does 🔴

Defines stream data descriptors (DataProps, StreamType, TensorLabels, FileContainer) including net/user hashing, tensor label adaptation, and pre/post-processing of stream payloads.

dataprops

A Collectionless AI Project (https://collectionless.ai)
Registration/Login: https://unaiverse.io
Code Repositories: https://github.com/collectionlessai/
Main Developers: Stefano Melacci (Project Leader), Christian Di Maio, Tommaso Guidi

FileContainer dataclass

FileContainer(content: bytes | str, filename: str, mime_type: str)

A typed wrapper for file data that distinguishes files from raw bytes or text strings.

FileContainer carries a file's raw content together with its name and MIME type so that stream processing code can handle binary files (e.g. Protobuf payloads, images, PDFs) without ambiguity. It is the canonical data format expected and produced by DataProps instances whose data_type is "file".

Attributes:

Name Type Description
content bytes | str

Raw bytes (or, rarely, a string) holding the file's payload.

filename str

Base name of the file (e.g. "report.pdf").

mime_type str

IANA MIME type string (e.g. "application/pdf"). Falls back to "application/octet-stream" when the type cannot be inferred.

Examples:

>>> fc = FileContainer(content=b"\x89PNG...", filename="image.png", mime_type="image/png")
>>> fc.mime_type
'image/png'

content instance-attribute

content: bytes | str

filename instance-attribute

filename: str

mime_type instance-attribute

mime_type: str

from_path classmethod

from_path(file_path: str) -> FileContainer

Create a FileContainer by reading a file from disk.

The MIME type is guessed from the file extension via mimetypes.guess_type. If the extension is unknown the type defaults to "application/octet-stream". The file is always read in binary mode, which is required for binary formats such as Protobuf messages.

Parameters:

Name Type Description Default
file_path str

Absolute or relative path to the file to read.

required

Returns:

Type Description
FileContainer

A FileContainer whose content holds the raw bytes, filename holds the base name of the file, and mime_type holds the inferred MIME type string.

Raises:

Type Description
FileNotFoundError

If file_path does not point to an existing file.

PermissionError

If the process lacks read permission on the file.

Examples:

>>> fc = FileContainer.from_path("/tmp/data.json")
>>> fc.filename
'data.json'
>>> fc.mime_type
'application/json'
Source code in unaiverse/streams/dataprops.py
@classmethod
def from_path(cls, file_path: str) -> 'FileContainer':
    """Create a ``FileContainer`` by reading a file from disk.

    The MIME type is guessed from the file extension via ``mimetypes.guess_type``.
    If the extension is unknown the type defaults to ``"application/octet-stream"``.
    The file is always read in binary mode, which is required for binary formats
    such as Protobuf messages.

    Args:
        file_path: Absolute or relative path to the file to read.

    Returns:
        A ``FileContainer`` whose ``content`` holds the raw bytes, ``filename``
        holds the base name of the file, and ``mime_type`` holds the inferred MIME
        type string.

    Raises:
        FileNotFoundError: If ``file_path`` does not point to an existing file.
        PermissionError: If the process lacks read permission on the file.

    Examples:
        >>> fc = FileContainer.from_path("/tmp/data.json")
        >>> fc.filename
        'data.json'
        >>> fc.mime_type
        'application/json'
    """
    # 1. Guess MIME type based on extension
    mime_type, _ = mimetypes.guess_type(file_path)
    if mime_type is None:
        mime_type = "application/octet-stream"  # Safe fallback for binary

    # 2. Extract clean filename
    filename = os.path.basename(file_path)

    # 3. Read file safely as BYTES (crucial for Protobuf)
    with open(file_path, "rb") as f:
        file_bytes = f.read()

    return cls(content=file_bytes, filename=filename, mime_type=mime_type)
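
The fallback behaviour described above can be tried without installing the package. The following is a minimal sketch, assuming a stand-in dataclass (FileContainerSketch) and a helper (read_file) that are hypothetical names introduced here for illustration. They mirror the three documented steps (guess the MIME type, take the base name, read bytes) and are not the library's own objects.

```python
import mimetypes
import os
import tempfile
from dataclasses import dataclass


@dataclass
class FileContainerSketch:
    """Hypothetical stand-in with the three documented fields."""
    content: bytes
    filename: str
    mime_type: str


def read_file(file_path: str) -> FileContainerSketch:
    # guess_type returns (type, encoding); type is None for unknown extensions.
    mime_type, _ = mimetypes.guess_type(file_path)
    if mime_type is None:
        mime_type = "application/octet-stream"
    # Binary mode keeps the payload as bytes, with no text decoding applied.
    with open(file_path, "rb") as f:
        payload = f.read()
    return FileContainerSketch(payload, os.path.basename(file_path), mime_type)


with tempfile.TemporaryDirectory() as tmp:
    known = os.path.join(tmp, "data.json")
    unknown = os.path.join(tmp, "blob.zzqx")  # extension with no registered type
    for path in (known, unknown):
        with open(path, "wb") as f:
            f.write(b"{}")
    known_fc = read_file(known)
    unknown_fc = read_file(unknown)

print(known_fc.filename, known_fc.mime_type)
print(unknown_fc.filename, unknown_fc.mime_type)
```

Only the extension drives the guess: the file contents are never inspected, so a mislabeled file keeps the MIME type implied by its name.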

StreamType

StreamType(*args: object, private_only: bool = False, public_only: bool = False, **kwargs: object)

A paired container that holds one or two DataProps instances for a stream.

A StreamType groups a private DataProps and, optionally, a public DataProps for the same logical data stream. This mirrors the UNaIVERSE networking model where a stream may be accessible over both the private peer-to-peer channel and the public network.

By default, both a private and a public DataProps are created. Use private_only or public_only to restrict creation to a single variant. set_* methods dispatched through __getattr__ are automatically forwarded to every contained DataProps, allowing batch configuration in a single call.

Attributes:

Name Type Description
props

Ordered list of DataProps instances. When both variants are created, the private stream descriptor comes first and the public one second.

Examples:

>>> from unaiverse.streams.dataprops import StreamType
>>> st = StreamType("text")  # shorthand: creates private + public text stream
>>> len(st.props)
2
>>> st_private = StreamType(data_type="tensor", private_only=True,
...                         tensor_shape=(1, 128), tensor_dtype="torch.float32")
>>> len(st_private.props)
1

Initialize a StreamType with one or two DataProps descriptors.

When exactly one positional argument is given and no keyword arguments are provided, it is treated as the data_type shorthand (e.g. StreamType("text")). In all other cases the positional and keyword arguments are forwarded verbatim to the DataProps constructor.

Unless private_only=True, a second DataProps is created with public=True appended to the same arguments, representing the public variant of the stream.

Parameters:

Name Type Description Default
*args object

Positional arguments forwarded to DataProps. A single positional argument with no keyword arguments is interpreted as the data_type shorthand.

()
private_only bool

When True, only the private DataProps is created and the list has one element. Defaults to False.

False
public_only bool

When True, only the public DataProps is created and the list has one element. Defaults to False.

False
**kwargs object

Keyword arguments forwarded to DataProps. The public key must not be set here - it is managed internally.

{}

Raises:

Type Description
ValueError

If both private_only and public_only are True.

ValueError

If public is passed explicitly in kwargs, since it is controlled internally by this class.

Source code in unaiverse/streams/dataprops.py
def __init__(self, *args: object, private_only: bool = False, public_only: bool = False, **kwargs: object) -> None:
    """Initialize a ``StreamType`` with one or two ``DataProps`` descriptors.

    When exactly one positional argument is given and no keyword arguments are
    provided, it is treated as the ``data_type`` shorthand (e.g.
    ``StreamType("text")``). In all other cases the positional and keyword arguments
    are forwarded verbatim to the ``DataProps`` constructor.

    Unless ``private_only=True``, a second ``DataProps`` is created with
    ``public=True`` appended to the same arguments, representing the public variant
    of the stream.

    Args:
        *args: Positional arguments forwarded to ``DataProps``. A single positional
            argument with no keyword arguments is interpreted as the ``data_type``
            shorthand.
        private_only: When ``True``, only the private ``DataProps`` is created and
            the list has one element. Defaults to False.
        public_only: When ``True``, only the public ``DataProps`` is created and
            the list has one element. Defaults to False.
        **kwargs: Keyword arguments forwarded to ``DataProps``. The ``public`` key
            must not be set here - it is managed internally.

    Raises:
        ValueError: If both ``private_only`` and ``public_only`` are ``True``.
        ValueError: If ``public`` is passed explicitly in ``kwargs``, since it is
            controlled internally by this class.
    """
    self.props = []

    if public_only and private_only:
        raise ValueError("Cannot set both private_only and public_only to True (it does not make any sense)")
    if 'public' in kwargs:
        raise ValueError("Invalid argument was provided to Stream: 'public' (it is an argument of DataProps)")

    # Shorthand creation: Stream("text")
    if len(args) == 1 and len(kwargs) == 0:
        kwargs['data_type'] = args[0]
        args = []

    # Honor public_only by skipping the private descriptor
    if not public_only:
        kwargs['public'] = False
        self.props.append(DataProps(*args, **kwargs))
    if not private_only:
        kwargs['public'] = True
        self.props.append(DataProps(*args, **kwargs))
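
The shorthand rule is easy to misread, so here is a minimal sketch of the argument handling in isolation. The function build_descriptor_kwargs is a hypothetical helper written for this illustration. It returns the arguments that would be forwarded to each descriptor instead of constructing real DataProps objects, and it leaves out the private_only and public_only flags.

```python
def build_descriptor_kwargs(*args, **kwargs):
    """Return the (args, kwargs) pairs that would reach each descriptor."""
    if "public" in kwargs:
        raise ValueError("'public' is managed internally")
    # The shorthand applies only to a lone positional argument with no keywords.
    if len(args) == 1 and not kwargs:
        kwargs = {"data_type": args[0]}
        args = ()
    private = (args, {**kwargs, "public": False})
    public = (args, {**kwargs, "public": True})
    return [private, public]


shorthand = build_descriptor_kwargs("text")
mixed = build_descriptor_kwargs("tensor", tensor_shape=(1, 128))
print(shorthand[0])  # "text" became the data_type keyword
print(mixed[0])      # "tensor" stayed positional
```

In the second call the string stays positional, so it would bind to the first DataProps parameter, which is name, not data_type. Passing data_type as a keyword avoids that surprise whenever other keyword arguments are present.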

props instance-attribute

props = []

to_list_of_dicts

to_list_of_dicts() -> list[dict]

Convert each contained DataProps to a dictionary and return them as a list.

Calls DataProps.to_dict on every element of props and collects the results. This is the serialization entry point when a StreamType is transmitted over the network or persisted to storage.

Returns:

Type Description
list[dict]

A list of dictionaries, one per DataProps instance in props, in the same order (private first, then public when present).

Source code in unaiverse/streams/dataprops.py
def to_list_of_dicts(self) -> list[dict]:
    """Convert each contained ``DataProps`` to a dictionary and return them as a list.

    Calls ``DataProps.to_dict`` on every element of ``props`` and collects the
    results. This is the serialization entry point when a ``StreamType`` is
    transmitted over the network or persisted to storage.

    Returns:
        A list of dictionaries, one per ``DataProps`` instance in ``props``, in the
        same order (private first, then public when present).
    """
    return [props.to_dict() for props in self.props]

to_dict

to_dict() -> None

Raise RuntimeError unconditionally - use to_list_of_dicts instead.

to_dict is defined on DataProps for single-descriptor serialization. StreamType may contain more than one descriptor, so a single-dict representation is ambiguous and therefore intentionally unsupported here. Call to_list_of_dicts to serialize all contained DataProps objects.

Raises:

Type Description
RuntimeError

Always, because this method is not supported on StreamType.

Source code in unaiverse/streams/dataprops.py
def to_dict(self) -> None:
    """Raise ``RuntimeError`` unconditionally - use ``to_list_of_dicts`` instead.

    ``to_dict`` is defined on ``DataProps`` for single-descriptor serialization.
    ``StreamType`` may contain more than one descriptor, so a single-dict
    representation is ambiguous and therefore intentionally unsupported here.
    Call ``to_list_of_dicts`` to serialize all contained ``DataProps`` objects.

    Raises:
        RuntimeError: Always, because this method is not supported on ``StreamType``.
    """
    raise RuntimeError("This method can only be called on a DataProps object and not on Stream")

from_dict

from_dict() -> None

Raise RuntimeError unconditionally - use DataProps.from_dict instead.

from_dict is defined on DataProps to reconstruct a single descriptor from a dictionary. Reconstructing a StreamType from a single dict is ambiguous because the type may contain multiple DataProps objects. Use DataProps.from_dict on each dictionary produced by to_list_of_dicts and then assemble the StreamType manually.

Raises:

Type Description
RuntimeError

Always, because this method is not supported on StreamType.

Source code in unaiverse/streams/dataprops.py
def from_dict(self) -> None:
    """Raise ``RuntimeError`` unconditionally - use ``DataProps.from_dict`` instead.

    ``from_dict`` is defined on ``DataProps`` to reconstruct a single descriptor
    from a dictionary. Reconstructing a ``StreamType`` from a single dict is
    ambiguous because the type may contain multiple ``DataProps`` objects.
    Use ``DataProps.from_dict`` on each dictionary produced by ``to_list_of_dicts``
    and then assemble the ``StreamType`` manually.

    Raises:
        RuntimeError: Always, because this method is not supported on ``StreamType``.
    """
    raise RuntimeError("This method can only be called on a DataProps object and not on Stream")
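
The recommended round trip (serialize with to_list_of_dicts, rebuild each descriptor, reassemble the container) can be sketched with stand-ins. PropsSketch and PairSketch below are hypothetical classes invented for this illustration. The real DataProps.from_dict signature is not shown on this page, so treating it as a classmethod that takes one dictionary is an assumption.

```python
from dataclasses import asdict, dataclass


@dataclass
class PropsSketch:
    """Hypothetical stand-in for a single stream descriptor."""
    name: str
    data_type: str
    public: bool

    def to_dict(self) -> dict:
        return asdict(self)

    @classmethod
    def from_dict(cls, d: dict) -> "PropsSketch":
        # Assumed shape: one dictionary in, one descriptor out.
        return cls(**d)


class PairSketch:
    """Hypothetical stand-in for the private/public container."""

    def __init__(self, props):
        self.props = list(props)

    def to_list_of_dicts(self) -> list:
        return [p.to_dict() for p in self.props]


original = PairSketch([PropsSketch("mic", "text", False),
                       PropsSketch("mic", "text", True)])
payload = original.to_list_of_dicts()

# Rebuild descriptor by descriptor, then reassemble the container manually.
restored = PairSketch(PropsSketch.from_dict(d) for d in payload)
print(restored.props == original.props)
```

Keeping the list order intact matters, because the position in props is what distinguishes the private descriptor from the public one.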

clone

clone() -> StreamType

Return a deep copy of this StreamType with independently cloned descriptors.

Creates a new StreamType with an empty props list and then appends a DataProps.clone of every descriptor in this instance. The cloned object shares no mutable state with the original, including transform objects, which are preserved via their __original_* references inside DataProps.

Returns:

Type Description
StreamType

A new StreamType whose props list contains independent clones of each DataProps descriptor in the same order as the original.

Source code in unaiverse/streams/dataprops.py
def clone(self) -> 'StreamType':
    """Return a deep copy of this ``StreamType`` with independently cloned descriptors.

    Creates a new ``StreamType`` with an empty ``props`` list and then appends a
    ``DataProps.clone`` of every descriptor in this instance. The cloned object
    shares no mutable state with the original, including transform objects, which
    are preserved via their ``__original_*`` references inside ``DataProps``.

    Returns:
        A new ``StreamType`` whose ``props`` list contains independent clones of
        each ``DataProps`` descriptor in the same order as the original.
    """
    ret = StreamType()
    ret.props = []
    for p in self.props:
        ret.props.append(p.clone())
    return ret

is_public

is_public() -> None

Raise RuntimeError unconditionally - call is_public on a DataProps instance instead.

A StreamType may hold both a private and a public DataProps, so returning a single boolean for the whole container is ambiguous. Access props[0].is_public() or props[1].is_public() directly when the public/private distinction matters.

Raises:

Type Description
RuntimeError

Always, because this method is not supported on StreamType.

Source code in unaiverse/streams/dataprops.py
def is_public(self) -> None:
    """Raise ``RuntimeError`` unconditionally - call ``is_public`` on a ``DataProps`` instance instead.

    A ``StreamType`` may hold both a private and a public ``DataProps``, so
    returning a single boolean for the whole container is ambiguous. Access
    ``props[0].is_public()`` or ``props[1].is_public()`` directly when the
    public/private distinction matters.

    Raises:
        RuntimeError: Always, because this method is not supported on ``StreamType``.
    """
    raise RuntimeError("This method can only be called on a DataProps object and not on Stream")

DataProps

DataProps(name: str = 'unk', group: str = 'none', data_type: str = 'text', data_desc: str = 'unk', tensor_shape: tuple[int | None, ...] | None = None, tensor_labels: list[str] | str | None = None, tensor_dtype: dtype | str | None = None, tensor_labeling_rule: str = 'max', stream_to_proc_transforms: Callable[..., Any] | PreTrainedTokenizerBase | str | dict | tuple[dict | Callable[..., Any] | PreTrainedTokenizerBase | str | None, dict | Callable[..., Any] | PreTrainedTokenizerBase | str | None] | None = None, proc_to_stream_transforms: Callable[..., Any] | PreTrainedTokenizerBase | str | list | None = None, delta: float = -1, pubsub: bool = False, public: bool = False)

Descriptor for a single UNaIVERSE data stream, carrying type, shape, labels, and transforms.

DataProps captures everything the framework needs to know about a stream in order to validate, transform, and route its data. It supports four concrete data types plus a wildcard:

  • "tensor" - a fixed or partially-variable-shaped torch.Tensor.
  • "img" - a PIL.Image.Image that may optionally be converted to a tensor.
  • "text" - a Python str that may optionally be tokenized.
  • "file" - a binary FileContainer (e.g. a PDF or Protobuf payload).
  • "all" - a wildcard type that accepts any data format without validation.

Stream-to-processor and processor-to-stream transforms can be plain callables, HuggingFace PreTrainedTokenizerBase instances, AutoTokenizer model ID strings (prefixed with "AutoTokenizer:"), or vocabulary dictionaries. For tensor streams, integer tensor labels can be attached via TensorLabels.
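
To make the "AutoTokenizer:" convention concrete, the sketch below separates the prefix from the model ID. The helper tokenizer_model_id is a hypothetical name used only here. It does not download anything and only shows how such a string can be recognized.

```python
PREFIX = "AutoTokenizer:"


def tokenizer_model_id(spec):
    """Return the model ID if spec uses the documented prefix, else None."""
    if isinstance(spec, str) and spec.startswith(PREFIX):
        # Slice off the prefix so the rest of the string is kept as-is.
        return spec[len(PREFIX):]
    return None


print(tokenizer_model_id("AutoTokenizer:bert-base-uncased"))
print(tokenizer_model_id("bert-base-uncased"))
print(tokenizer_model_id(str.lower))
```

Slicing off the prefix is a design choice of this sketch: it keeps everything after the first colon, so an identifier that itself contained a colon would survive intact.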

Attributes:

Name Type Description
VALID_DATA_TYPES

Tuple of accepted data_type values: ('tensor', 'img', 'text', 'file', 'all').

Examples:

Create a text stream with a HuggingFace tokenizer for encoding:

>>> from unaiverse.streams.dataprops import DataProps
>>> dp = DataProps(name="input_text", group="nlp", data_type="text",
...                data_desc="Raw tokenized sentence",
...                stream_to_proc_transforms="AutoTokenizer:bert-base-uncased")

Create a flat float tensor stream with class labels:

>>> import torch
>>> dp = DataProps(name="features", group="none", data_type="tensor",
...                data_desc="Image classification logits",
...                tensor_shape=(1, 10), tensor_dtype=torch.float32,
...                tensor_labels=["cat", "dog", "bird", "fish", "car",
...                               "plane", "ship", "truck", "deer", "horse"])

Initialize a DataProps descriptor for a single data stream.

Validates the combination of arguments against the chosen data_type and stores all attributes. For "tensor" streams the shape, dtype, and optional labels are validated and stored. For "text" streams, AutoTokenizer model IDs are resolved eagerly by downloading the tokenizer. For "img" and "file" streams, tokenizer-related arguments are explicitly rejected.

When stream_to_proc_transforms is a non-list, non-tuple value it is duplicated internally into a two-element list [transform, transform] where index 0 is used for inputs and index 1 is used for targets.
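
The duplication rule can be summarized in a few lines. This is a minimal sketch of the normalization only, with normalize_transforms as a hypothetical helper name. It raises ValueError for a wrong-length pair, whereas the constructor itself relies on an assertion.

```python
def normalize_transforms(transforms):
    """Return None, or a two-slot (input, target) sequence of transforms."""
    if transforms is None:
        return None
    if isinstance(transforms, (list, tuple)):
        if len(transforms) != 2:
            raise ValueError("expected exactly two entries: (input, target)")
        return transforms
    # A single transform serves both the input path and the target path.
    return [transforms, transforms]


single = normalize_transforms(str.lower)
pair = normalize_transforms((str.lower, None))
print(single)
print(pair)
```

Index 0 is the input transform and index 1 the target transform, so passing a pair such as (tokenizer, None) transforms inputs while leaving targets untouched.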

Parameters:

Name Type Description Default
name str

Human-readable identifier for this stream. Must not contain "~". Defaults to "unk".

'unk'
group str

Group name used to multiplex several streams over a single network channel. Must not contain "~". Use "none" to disable grouping. Defaults to "none".

'none'
data_type str

One of "tensor", "img", "text", "file", or "all". Defaults to "text".

'text'
data_desc str

Free-text human-readable description of the stream contents. Defaults to "unk".

'unk'
tensor_shape tuple[int | None, ...] | None

Required for data_type="tensor". Shape of the tensor including the batch dimension (e.g. (1, 128)). Dimensions that are variable-length may be set to None (e.g. (1, None, None)). Must be None for non-tensor types.

None
tensor_labels list[str] | str | None

Optional class or token labels for the feature dimension of a 2-D tensor. Accepted values: a list[str] of label names, or a string of the form "AutoTokenizer:<model_id>" which triggers eager download of the tokenizer vocabulary. Must be None for non-tensor types. Defaults to None.

None
tensor_dtype dtype | str | None

Required for data_type="tensor". Either a torch.dtype object or a string such as "torch.float32". Must be None for non-tensor types. Defaults to None.

None
tensor_labeling_rule str

Rule for mapping a tensor to a label string. Use "max" to select the argmax label, or "geqX" (where X is a float threshold) to select all labels whose value is >= X. Defaults to "max".

'max'
stream_to_proc_transforms Callable[..., Any] | PreTrainedTokenizerBase | str | dict | tuple[dict | Callable[..., Any] | PreTrainedTokenizerBase | str | None, dict | Callable[..., Any] | PreTrainedTokenizerBase | str | None] | None

Transform applied to incoming stream data before the processor sees it. Accepts a callable, a PreTrainedTokenizerBase, an "AutoTokenizer:<model_id>" string, a vocabulary dict (str->int), or a 2-element tuple/list (input_transform, target_transform) to differentiate input and target paths. Defaults to None.

None
proc_to_stream_transforms Callable[..., Any] | PreTrainedTokenizerBase | str | list | None

Transform applied to processor output before it is emitted back into the stream. Accepts a callable, a PreTrainedTokenizerBase, an "AutoTokenizer:<model_id>" string, or a vocabulary list (int->str). Defaults to None.

None
delta float

Minimum time interval in seconds between consecutive stream samples. Values <= 0 indicate real-time (as-fast-as-possible) streaming. Defaults to -1.

-1
pubsub bool

When True, the stream is routed through a Pub/Sub topic rather than a direct peer-to-peer channel. Defaults to False.

False
public bool

When True, the stream descriptor targets the public network interface. Defaults to False.

False
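
The two values of tensor_labeling_rule can be illustrated on a plain list of scores. The sketch below is an assumption-laden approximation: labels_for is a hypothetical helper, it works on Python lists instead of tensors, and parsing the threshold as the text after "geq" is a guess at the format, not the library's parser.

```python
def labels_for(values, labels, rule="max"):
    """Map a score vector to label names under the given rule."""
    if rule == "max":
        # Pick the single label with the highest score (argmax).
        best = max(range(len(values)), key=values.__getitem__)
        return [labels[best]]
    if rule.startswith("geq"):
        # Assumed format: "geq" followed by a float threshold, e.g. "geq0.5".
        threshold = float(rule[3:])
        return [lab for lab, v in zip(labels, values) if v >= threshold]
    raise ValueError(f"unknown labeling rule: {rule}")


scores = [0.1, 0.7, 0.2]
names = ["cat", "dog", "bird"]
print(labels_for(scores, names))
print(labels_for(scores, names, "geq0.2"))
```

The "max" rule suits single-label classification, while a "geqX" rule can return several labels, or none, which fits multi-label outputs.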

Raises:

Type Description
AssertionError

If data_type is not in VALID_DATA_TYPES.

AssertionError

If data_desc is not a string.

AssertionError

If stream_to_proc_transforms has an unsupported type.

AssertionError

If proc_to_stream_transforms has an unsupported type.

AssertionError

If tensor_shape is missing or invalid for tensor streams.

AssertionError

If tensor_dtype is missing or invalid for tensor streams.

AssertionError

If tensor_labels has an unsupported format.

AssertionError

If tensor-related arguments are non-None for non-tensor types.

AssertionError

If tokenizer transforms are used with image or file streams.

AssertionError

If name or group contain the reserved character "~".
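
A declared tensor_shape with None entries leaves those dimensions free. The comparison below is a minimal sketch of how such a shape could be matched against a concrete one. The helper shape_matches is hypothetical, and the library's own check is not shown on this page.

```python
def shape_matches(declared, actual):
    """True when actual fits declared, treating None as a free dimension."""
    if len(declared) != len(actual):
        return False
    return all(d is None or d == a for d, a in zip(declared, actual))


print(shape_matches((1, None, None), (1, 17, 3)))
print(shape_matches((1, 128), (1, 64)))
print(shape_matches((1, None), (1, 5, 5)))
```

Under this reading, None relaxes the size of a dimension but not the number of dimensions, so the rank still has to agree.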

Source code in unaiverse/streams/dataprops.py
def __init__(self,
             name: str = "unk",
             group: str = "none",
             data_type: str = "text",  # Do not set tensor as default
             data_desc: str = "unk",
             tensor_shape: tuple[int | None, ...] | None = None,
             tensor_labels: list[str] | str | None = None,
             tensor_dtype: torch.dtype | str | None = None,
             tensor_labeling_rule: str = "max",
             stream_to_proc_transforms: Callable[..., Any] | PreTrainedTokenizerBase | str | dict | tuple[
                 dict | Callable[..., Any] | PreTrainedTokenizerBase | str | None,
                 dict | Callable[..., Any] | PreTrainedTokenizerBase | str | None] | None = None,
             proc_to_stream_transforms: Callable[..., Any] | PreTrainedTokenizerBase | str | list | None = None,
             delta: float = -1,
             pubsub: bool = False,
             public: bool = False):
    """Initialize a ``DataProps`` descriptor for a single data stream.

    Validates the combination of arguments against the chosen ``data_type`` and
    stores all attributes. For ``"tensor"`` streams the shape, dtype, and optional
    labels are validated and stored. For ``"text"`` streams, AutoTokenizer model IDs
    are resolved eagerly by downloading the tokenizer. For ``"img"`` and ``"file"``
    streams, tokenizer-related arguments are explicitly rejected.

    When ``stream_to_proc_transforms`` is a non-list, non-tuple value it is
    duplicated internally into a two-element list ``[transform, transform]`` where
    index ``0`` is used for inputs and index ``1`` is used for targets.

    Args:
        name: Human-readable identifier for this stream. Must not contain ``"~"``.
            Defaults to ``"unk"``.
        group: Group name used to multiplex several streams over a single network
            channel. Must not contain ``"~"``. Use ``"none"`` to disable grouping.
            Defaults to ``"none"``.
        data_type: One of ``"tensor"``, ``"img"``, ``"text"``, ``"file"``, or
            ``"all"``. Defaults to ``"text"``.
        data_desc: Free-text human-readable description of the stream contents.
            Defaults to ``"unk"``.
        tensor_shape: Required for ``data_type="tensor"``. Shape of the tensor
            including the batch dimension (e.g. ``(1, 128)``). Dimensions that are
            variable-length may be set to ``None`` (e.g. ``(1, None, None)``).
            Must be ``None`` for non-tensor types.
        tensor_labels: Optional class or token labels for the feature dimension of
            a 2-D tensor. Accepted values: a ``list[str]`` of label names, or a
            string of the form ``"AutoTokenizer:<model_id>"`` which triggers eager
            download of the tokenizer vocabulary. Must be ``None`` for non-tensor
            types. Defaults to ``None``.
        tensor_dtype: Required for ``data_type="tensor"``. Either a ``torch.dtype``
            object or a string such as ``"torch.float32"``. Must be ``None`` for
            non-tensor types. Defaults to ``None``.
        tensor_labeling_rule: Rule for mapping a tensor to a label string. Use
            ``"max"`` to select the argmax label, or ``"geqX"`` (where ``X`` is a
            float threshold) to select all labels whose value is >= X.
            Defaults to ``"max"``.
        stream_to_proc_transforms: Transform applied to incoming stream data before
            the processor sees it. Accepts a callable, a
            ``PreTrainedTokenizerBase``, an ``"AutoTokenizer:<model_id>"`` string, a
            vocabulary ``dict`` (``str->int``), or a 2-element tuple/list
            ``(input_transform, target_transform)`` to differentiate input and
            target paths. Defaults to ``None``.
        proc_to_stream_transforms: Transform applied to processor output before it
            is emitted back into the stream. Accepts a callable, a
            ``PreTrainedTokenizerBase``, an ``"AutoTokenizer:<model_id>"`` string,
            or a vocabulary ``list`` (``int->str``). Defaults to ``None``.
        delta: Minimum time interval in seconds between consecutive stream samples.
            Values <= 0 indicate real-time (as-fast-as-possible) streaming.
            Defaults to ``-1``.
        pubsub: When ``True``, the stream is routed through a Pub/Sub topic rather
            than a direct peer-to-peer channel. Defaults to ``False``.
        public: When ``True``, the stream descriptor targets the public network
            interface. Defaults to ``False``.

    Raises:
        AssertionError: If ``data_type`` is not in ``VALID_DATA_TYPES``.
        AssertionError: If ``data_desc`` is not a string.
        AssertionError: If ``stream_to_proc_transforms`` has an unsupported type.
        AssertionError: If ``proc_to_stream_transforms`` has an unsupported type.
        AssertionError: If ``tensor_shape`` is missing or invalid for tensor streams.
        AssertionError: If ``tensor_dtype`` is missing or invalid for tensor streams.
        AssertionError: If ``tensor_labels`` has an unsupported format.
        AssertionError: If tensor-related arguments are non-None for non-tensor types.
        AssertionError: If tokenizer transforms are used with image or file streams.
        AssertionError: If ``name`` or ``group`` contain the reserved character ``"~"``.
    """

    # Checking data type
    assert data_type in DataProps.VALID_DATA_TYPES, "Invalid data type"
    assert isinstance(data_desc, str), "Invalid data description"

    # Checking transformations
    assert (stream_to_proc_transforms is None or
            isinstance(stream_to_proc_transforms, str) or
            isinstance(stream_to_proc_transforms, PreTrainedTokenizerBase) or
            callable(stream_to_proc_transforms) or
            isinstance(stream_to_proc_transforms, dict) or
            isinstance(stream_to_proc_transforms, tuple) or
            isinstance(stream_to_proc_transforms, list)), \
        "Invalid stream to processor transforms"

    if stream_to_proc_transforms is not None:
        if not isinstance(stream_to_proc_transforms, list) and not isinstance(stream_to_proc_transforms, tuple):
            self.stream_to_proc_transforms = [stream_to_proc_transforms, stream_to_proc_transforms]
        else:
            assert len(stream_to_proc_transforms) == 2, \
                "Expected a list with two sets of transforms (input, target)"
            self.stream_to_proc_transforms = stream_to_proc_transforms
        self.__original_stream_to_proc_transforms = stream_to_proc_transforms
    else:
        self.stream_to_proc_transforms = None
        self.__original_stream_to_proc_transforms = None

    assert (proc_to_stream_transforms is None or
            isinstance(proc_to_stream_transforms, str) or
            isinstance(proc_to_stream_transforms, PreTrainedTokenizerBase) or
            callable(proc_to_stream_transforms) or
            isinstance(proc_to_stream_transforms, list)), \
        "Invalid processor to stream transforms"

    self.proc_to_stream_transforms = proc_to_stream_transforms
    self.__original_proc_to_stream_transforms = proc_to_stream_transforms

    # Setting data type and description
    self.data_type = data_type
    self.data_desc = data_desc

    # Setting empty attributes
    self.tensor_shape = None
    self.tensor_dtype = None
    self.tensor_labels = None

    # Checking data in function of its type
    if self.is_tensor():

        # Checking shape
        assert (tensor_shape is not None and
                isinstance(tensor_shape, (tuple, list))), f"Invalid shape for DataProps: {tensor_shape}"
        assert all(x is None or isinstance(x, int) for x in tensor_shape), \
            f"Invalid shape for DataProps: {tensor_shape}"

        # Setting shape
        self.tensor_shape = tuple(tensor_shape)  # Forcing (important)

        # Checking dtype
        assert (tensor_dtype is not None and
                (isinstance(tensor_dtype, torch.dtype) or isinstance(tensor_dtype, str)
                 and tensor_dtype.startswith("torch."))), \
            f"Invalid tensor type: {tensor_dtype}"

        # Setting dtype
        self.tensor_dtype = tensor_dtype if isinstance(tensor_dtype, torch.dtype) else eval(tensor_dtype)

        # Checking labels
        assert tensor_labels is None or (isinstance(tensor_labels, list) or
                                         (isinstance(tensor_labels, str) and
                                          tensor_labels.startswith("AutoTokenizer:"))), \
            f"Invalid tensor labels: {tensor_labels}"

        # Setting labels
        if tensor_labels is not None:
            if not (isinstance(tensor_labels, str) and tensor_labels.startswith("AutoTokenizer:")):
                self.tensor_labels = TensorLabels(self, labels=tensor_labels, labeling_rule=tensor_labeling_rule)
            else:
                self.set_tensor_labels_from_auto_tokenizer(tensor_labels.split(":")[1])

    elif self.is_img():

        # Ensuring other type-related tools are not set
        assert tensor_shape is None and tensor_labels is None and tensor_dtype is None, \
            f"Tensor-related arguments must be None when using a DataProps of type {data_type}"
        assert (self.stream_to_proc_transforms is None or (not isinstance(self.stream_to_proc_transforms, str)
                                                           and not isinstance(self.stream_to_proc_transforms,
                                                                              PreTrainedTokenizerBase))), \
            "Non-image-related transforms were selected"
        assert (self.proc_to_stream_transforms is None or (not isinstance(self.proc_to_stream_transforms, str)
                                                           and not isinstance(self.proc_to_stream_transforms,
                                                                              PreTrainedTokenizerBase)
                                                           and not isinstance(self.proc_to_stream_transforms,
                                                                              list))), \
            "Non-image-related transforms were selected"

    elif self.is_text():

        # Ensuring other type-related tools are not set
        assert tensor_shape is None and tensor_labels is None and tensor_dtype is None, \
            f"Tensor/image-related arguments must be None when using a DataProps of type {data_type}"

        # Setting text to tensor transform (tokenizer in encode mode) (if given)
        if self.stream_to_proc_transforms is not None:
            for j, _tttt in enumerate(self.stream_to_proc_transforms):
                assert ((isinstance(_tttt, str) and _tttt.startswith("AutoTokenizer:")) or
                        isinstance(_tttt, PreTrainedTokenizerBase) or
                        isinstance(_tttt, dict) or
                        callable(_tttt)), \
                    ("Invalid text tokenizer: expected object of type PreTrainedTokenizerBase or a "
                     "string starting with 'AutoTokenizer:' or a callable object or a dictionary "
                     "(vocabulary str->int)")
                if isinstance(_tttt, str) and _tttt.startswith("AutoTokenizer:"):
                    self.stream_to_proc_transforms[j] = AutoTokenizer.from_pretrained(_tttt.split(":")[1])

        # Setting tensor to text transform (tokenizer in decode mode OR a given vocabulary int->str) (if given)
        if self.proc_to_stream_transforms is not None:
            assert ((isinstance(self.proc_to_stream_transforms, str) and
                     self.proc_to_stream_transforms.startswith("AutoTokenizer:")) or
                    isinstance(self.proc_to_stream_transforms, PreTrainedTokenizerBase) or
                    isinstance(self.proc_to_stream_transforms, list) or
                    callable(self.proc_to_stream_transforms)), \
                ("Invalid text tokenizer: expected object of type PreTrainedTokenizerBase or a "
                 "string starting with 'AutoTokenizer:' or a callable object or a list "
                 "(vocabulary int->str)")
            if (isinstance(self.proc_to_stream_transforms, str) and
                    self.proc_to_stream_transforms.startswith("AutoTokenizer:")):
                self.proc_to_stream_transforms = (
                    AutoTokenizer.from_pretrained(self.proc_to_stream_transforms.split(":")[1]))

    elif self.is_file():
        # Ensuring other type-related tools are not set
        assert tensor_shape is None and tensor_labels is None and tensor_dtype is None, \
            f"Tensor-related arguments must be None when using a DataProps of type {data_type}"

        # Files usually don't use standard transforms, but we allow custom callables if needed.
        # We strictly forbid PreTrainedTokenizerBase as it makes no sense for binary files.
        assert (self.stream_to_proc_transforms is None or 
                (not isinstance(self.stream_to_proc_transforms, PreTrainedTokenizerBase) and 
                 not isinstance(self.stream_to_proc_transforms, str))), \
            "Tokenizers cannot be used as transforms for file streams"

    # Checking name and group
    assert "~" not in name, "Invalid chars in stream name"
    assert "~" not in group, "Invalid chars in group name"

    # Initialize properties
    self.name = name
    self.group = group
    self.delta = delta
    self.pubsub = pubsub
    self.public = public
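
The constructor above recognizes strings of the form "AutoTokenizer:<model id>" and takes the part after the first colon with split(":")[1]. The sketch below isolates that parsing step in plain Python. The helper name model_id_from_spec is hypothetical and not part of the module, and no tokenizer is loaded.

```python
PREFIX = "AutoTokenizer:"


def model_id_from_spec(spec: str) -> str:
    """Hypothetical helper: return the model id from an 'AutoTokenizer:<id>' string."""
    if not spec.startswith(PREFIX):
        raise ValueError(f"expected a string starting with {PREFIX!r}")
    # Slicing off the prefix keeps everything after it, including any later colon.
    return spec[len(PREFIX):]


spec = "AutoTokenizer:bert-base-uncased"
model_id = model_id_from_spec(spec)

# For ids without a colon, this matches the split(":")[1] used in the constructor.
same_as_split = model_id == spec.split(":")[1]
print(model_id, same_as_split)
```

Slicing and split(":")[1] agree whenever the model id contains no colon; they differ only if it does, in which case split keeps just the segment before the second colon.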

VALID_DATA_TYPES class-attribute instance-attribute

VALID_DATA_TYPES = ('tensor', 'img', 'text', 'file', 'all')

stream_to_proc_transforms instance-attribute

stream_to_proc_transforms = [stream_to_proc_transforms, stream_to_proc_transforms]

proc_to_stream_transforms instance-attribute

proc_to_stream_transforms = proc_to_stream_transforms

data_type instance-attribute

data_type = data_type

data_desc instance-attribute

data_desc = data_desc

tensor_shape instance-attribute

tensor_shape = None

tensor_dtype instance-attribute

tensor_dtype = None

tensor_labels instance-attribute

tensor_labels = None

name instance-attribute

name = name

group instance-attribute

group = group

delta instance-attribute

delta = delta

pubsub instance-attribute

pubsub = pubsub

public instance-attribute

public = public

to_dict

to_dict() -> dict

Serialize this DataProps to a JSON-compatible dictionary.

Converts non-serializable attributes to primitive types: torch.dtype values are stringified (e.g. "torch.float32"), and TensorLabels objects are replaced by the dictionary returned from TensorLabels.to_dict. Transform callables are intentionally excluded because they cannot be reliably serialized. An instance rebuilt with from_dict therefore carries no transforms; re-attach them afterwards, for example with set_stream_to_proc_transforms and set_proc_to_stream_transforms.

Returns:

Type Description
dict

A dict with the following keys: name, group, data_type, data_desc, tensor_shape, tensor_dtype, tensor_labels, delta, pubsub, and public.

Source code in unaiverse/streams/dataprops.py
def to_dict(self) -> dict:
    """Serialize this ``DataProps`` to a JSON-compatible dictionary.

    Converts non-serializable attributes to primitive types: ``torch.dtype``
    values are stringified (e.g. ``"torch.float32"``), and ``TensorLabels``
    objects are replaced by the dictionary returned from ``TensorLabels.to_dict``.
    Transform callables are intentionally excluded because they cannot be reliably
    serialized. An instance rebuilt with ``from_dict`` therefore carries no
    transforms; re-attach them afterwards, for example with
    ``set_stream_to_proc_transforms`` and ``set_proc_to_stream_transforms``.

    Returns:
        A ``dict`` with the following keys: ``name``, ``group``, ``data_type``,
        ``data_desc``, ``tensor_shape``, ``tensor_dtype``, ``tensor_labels``,
        ``delta``, ``pubsub``, and ``public``.
    """
    return {
        'name': self.name,
        'group': self.group,
        'data_type': self.data_type,
        'data_desc': self.data_desc,
        'tensor_shape': self.tensor_shape,
        'tensor_dtype': str(self.tensor_dtype) if self.tensor_dtype is not None else None,
        'tensor_labels': self.tensor_labels.to_dict() if self.tensor_labels is not None else None,
        'delta': self.delta,
        'pubsub': self.pubsub,
        'public': self.public
    }
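
As a rough illustration of what "JSON-compatible" means for this dictionary, the sketch below sends a hand-written dict with the same keys through the standard json module. The values are invented for the example and no DataProps object is involved.

```python
import json

# Hand-written stand-in for the output of to_dict(); the values are illustrative only.
props = {
    "name": "camera_features",
    "group": "none",
    "data_type": "tensor",
    "data_desc": "feature vectors",
    "tensor_shape": (1, 3),
    "tensor_dtype": "torch.float32",  # the dtype travels as a string
    "tensor_labels": {"labels": ["a", "b", "c"], "labeling_rule": "max"},
    "delta": 0.5,
    "pubsub": False,
    "public": True,
}

restored = json.loads(json.dumps(props))

# JSON has no tuple type, so the shape comes back as a list.
print(type(restored["tensor_shape"]).__name__, restored["tensor_shape"])
```

The constructor forces tensor_shape back to a tuple with tuple(tensor_shape), so the list form is accepted when such a dictionary is turned back into a descriptor.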

from_dict staticmethod

from_dict(d_props: dict) -> DataProps

Reconstruct a DataProps instance from a serialized dictionary.

This is the inverse of to_dict. It reads the tensor_labels sub-dict to restore the label list and labeling rule, then passes all remaining fields to the DataProps constructor. Transform functions are not serialized, so the returned instance has stream_to_proc_transforms and proc_to_stream_transforms both set to None and must be re-attached manually when required.

Parameters:

Name Type Description Default
d_props dict

Dictionary previously produced by to_dict, containing at minimum the keys name, group, data_type, data_desc, tensor_shape, tensor_dtype, tensor_labels, delta, pubsub, and public.

required

Returns:

Type Description
DataProps

A new DataProps instance whose attributes match the serialized state.

Source code in unaiverse/streams/dataprops.py
@staticmethod
def from_dict(d_props: dict) -> 'DataProps':
    """Reconstruct a ``DataProps`` instance from a serialized dictionary.

    This is the inverse of ``to_dict``. It reads the ``tensor_labels`` sub-dict
    to restore the label list and labeling rule, then passes all remaining fields
    to the ``DataProps`` constructor. Transform functions are not serialized, so the
    returned instance has ``stream_to_proc_transforms`` and
    ``proc_to_stream_transforms`` both set to ``None`` and must be re-attached
    manually when required.

    Args:
        d_props: Dictionary previously produced by ``to_dict``, containing at
            minimum the keys ``name``, ``group``, ``data_type``, ``data_desc``,
            ``tensor_shape``, ``tensor_dtype``, ``tensor_labels``, ``delta``,
            ``pubsub``, and ``public``.

    Returns:
        A new ``DataProps`` instance whose attributes match the serialized state.
    """
    d_labels = d_props['tensor_labels']
    return DataProps(name=d_props['name'],
                     group=d_props['group'],
                     data_type=d_props['data_type'],
                     data_desc=d_props['data_desc'],
                     tensor_shape=d_props['tensor_shape'],
                     tensor_dtype=d_props['tensor_dtype'],
                     tensor_labels=d_labels['labels'] if d_labels is not None else None,
                     tensor_labeling_rule=d_labels['labeling_rule'] if d_labels is not None else "max",
                     delta=d_props['delta'],
                     pubsub=d_props['pubsub'],
                     public=d_props['public'])
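
The only non-trivial step in from_dict is unpacking the tensor_labels entry, which may be None. A minimal sketch of that step, written as a standalone function so it runs without the library, is given below; unpack_labels is a hypothetical name used only for illustration.

```python
def unpack_labels(d_props: dict):
    """Hypothetical helper mirroring how from_dict reads the 'tensor_labels' entry."""
    d_labels = d_props["tensor_labels"]
    if d_labels is None:
        # Same fallbacks as in from_dict: no labels, labeling rule "max".
        return None, "max"
    return d_labels["labels"], d_labels["labeling_rule"]


labeled = {"tensor_labels": {"labels": ["cat", "dog"], "labeling_rule": "max"}}
unlabeled = {"tensor_labels": None}

print(unpack_labels(labeled))
print(unpack_labels(unlabeled))
```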

clone

clone() -> DataProps

Return an independent deep copy of this DataProps instance.

Constructs a new DataProps using the same arguments as the original, including the original transform objects stored in the private __original_stream_to_proc_transforms and __original_proc_to_stream_transforms attributes. Passing the originals is intended to avoid re-downloading AutoTokenizer models or re-evaluating transforms. Note that the transform objects are handed to the new instance by reference, not copied, so a stateful transform is shared between the original and the clone; the remaining attributes are rebuilt by the constructor.

Returns:

Type Description
DataProps

A new DataProps instance that is functionally identical to the original, with independently copied tensor shape and label information.

Source code in unaiverse/streams/dataprops.py
def clone(self) -> 'DataProps':
    """Return an independent deep copy of this ``DataProps`` instance.

    Constructs a new ``DataProps`` using the same arguments as the original,
    including the original transform objects stored in the private
    ``__original_stream_to_proc_transforms`` and
    ``__original_proc_to_stream_transforms`` attributes. Passing the originals is
    intended to avoid re-downloading AutoTokenizer models or re-evaluating
    transforms. Note that the transform objects are handed to the new instance by
    reference, not copied, so a stateful transform is shared between the original
    and the clone; the remaining attributes are rebuilt by the constructor.

    Returns:
        A new ``DataProps`` instance that is functionally identical to the original,
        with independently copied tensor shape and label information.
    """
    return DataProps(name=self.name,
                     group=self.group,
                     data_type=self.data_type,
                     data_desc=self.data_desc,
                     tensor_shape=self.tensor_shape,
                     tensor_dtype=self.tensor_dtype,
                     tensor_labels=self.tensor_labels.labels if self.tensor_labels is not None else None,
                     tensor_labeling_rule=self.tensor_labels.original_labeling_rule
                     if self.tensor_labels is not None else "max",
                     stream_to_proc_transforms=self.__original_stream_to_proc_transforms,
                     proc_to_stream_transforms=self.__original_proc_to_stream_transforms,
                     delta=self.delta,
                     pubsub=self.pubsub,
                     public=self.public)
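
The cloning pattern used here (rebuild through the constructor, pass the transform objects along unchanged) can be sketched with a small stand-in class. MiniProps is hypothetical and models only that pattern, not the real DataProps.

```python
class MiniProps:
    """Hypothetical stand-in for DataProps; only the cloning pattern is modelled."""

    def __init__(self, name, shape, transform=None):
        self.name = name
        self.shape = tuple(shape)
        self.transform = transform

    def clone(self):
        # Rebuild through the constructor, passing the transform object as-is.
        return MiniProps(self.name, self.shape, self.transform)


vocab = {"hello": 0}
original = MiniProps("words", [1, 4], transform=vocab)
duplicate = original.clone()

duplicate.name = "renamed"   # plain attributes are independent
vocab["world"] = 1           # the transform object is the same one in both

print(original.name, duplicate.name, duplicate.transform is original.transform)
```

Under this pattern, renaming the duplicate leaves the original untouched, while a change to a mutable transform is visible from both objects.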

get_name

get_name() -> str

Return the name of this data stream.

Returns:

Type Description
str

The name string assigned at construction or via set_name.

Source code in unaiverse/streams/dataprops.py
def get_name(self) -> str:
    """Return the name of this data stream.

    Returns:
        The ``name`` string assigned at construction or via ``set_name``.
    """

    return self.name

get_group

get_group() -> str

Return the group name of this data stream.

The group name is used by the network layer to multiplex several related streams over a single channel. When no grouping is desired the value is "none".

Returns:

Type Description
str

The group string assigned at construction or via set_group.

Source code in unaiverse/streams/dataprops.py
def get_group(self) -> str:
    """Return the group name of this data stream.

    The group name is used by the network layer to multiplex several related streams
    over a single channel. When no grouping is desired the value is ``"none"``.

    Returns:
        The ``group`` string assigned at construction or via ``set_group``.
    """

    return self.group

get_description

get_description() -> str

Return the human-readable description of this stream's data.

Returns:

Type Description
str

The data_desc string assigned at construction or via set_description.

Source code in unaiverse/streams/dataprops.py
def get_description(self) -> str:
    """Return the human-readable description of this stream's data.

    Returns:
        The ``data_desc`` string assigned at construction or via ``set_description``.
    """
    return self.data_desc

get_tensor_labels

get_tensor_labels() -> list[str] | None

Return the list of tensor label strings, or None if no labels are set.

Labels correspond to the feature dimension of a 2-D tensor stream and are stored internally in a TensorLabels wrapper. This method unwraps that wrapper and returns the raw label list.

Returns:

Type Description
list[str] | None

A list[str] of label names when tensor_labels is set, or None when the stream has no labels (e.g. for non-tensor or unlabeled tensor streams).

Source code in unaiverse/streams/dataprops.py
def get_tensor_labels(self) -> list[str] | None:
    """Return the list of tensor label strings, or ``None`` if no labels are set.

    Labels correspond to the feature dimension of a 2-D tensor stream and are stored
    internally in a ``TensorLabels`` wrapper. This method unwraps that wrapper and
    returns the raw label list.

    Returns:
        A ``list[str]`` of label names when ``tensor_labels`` is set, or ``None``
        when the stream has no labels (e.g. for non-tensor or unlabeled tensor
        streams).
    """
    return self.tensor_labels.labels if self.tensor_labels is not None else None

set_name

set_name(name: str) -> None

Set the name of this data stream.

The "~" character is reserved by the networking layer for internal hash construction and is therefore forbidden in stream names.

Parameters:

Name Type Description Default
name str

New name for the stream. Must not contain "~".

required

Raises:

Type Description
AssertionError

If name contains the reserved character "~".

Source code in unaiverse/streams/dataprops.py
def set_name(self, name: str) -> None:
    """Set the name of this data stream.

    The ``"~"`` character is reserved by the networking layer for internal hash
    construction and is therefore forbidden in stream names.

    Args:
        name: New name for the stream. Must not contain ``"~"``.

    Raises:
        AssertionError: If ``name`` contains the reserved character ``"~"``.
    """
    assert "~" not in name, "Invalid chars in stream name"
    self.name = name
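
Because the check is an assert statement, it is skipped when Python runs with the -O flag. A caller that wants the "~" rule enforced in every mode could validate names before passing them in. The sketch below shows one way to do that; check_stream_name is a hypothetical helper, not part of the module.

```python
def check_stream_name(name: str) -> str:
    """Hypothetical explicit variant of the '~' check; raises instead of asserting."""
    if "~" in name:
        raise ValueError("Invalid chars in stream name")
    return name


for candidate in ("sensor_1", "bad~name"):
    try:
        print("accepted:", check_stream_name(candidate))
    except ValueError as err:
        print("rejected:", candidate, "-", err)
```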

set_group

set_group(group: str) -> None

Set the group name of this data stream.

The "~" character is reserved by the networking layer and is therefore forbidden in group names. Use "none" to indicate that the stream does not belong to any group.

Parameters:

Name Type Description Default
group str

New group name for the stream. Must not contain "~".

required

Raises:

Type Description
AssertionError

If group contains the reserved character "~".

Source code in unaiverse/streams/dataprops.py
def set_group(self, group: str) -> None:
    """Set the group name of this data stream.

    The ``"~"`` character is reserved by the networking layer and is therefore
    forbidden in group names. Use ``"none"`` to indicate that the stream does not
    belong to any group.

    Args:
        group: New group name for the stream. Must not contain ``"~"``.

    Raises:
        AssertionError: If ``group`` contains the reserved character ``"~"``.
    """
    assert "~" not in group, "Invalid chars in group name"
    self.group = group

set_description

set_description(desc: str) -> None

Set the human-readable description of this stream's data.

Replaces the current data_desc value in place. The description is included in the dictionary produced by to_dict and is intended for display in dashboards and API documentation.

Parameters:

Name Type Description Default
desc str

New free-text description of the stream contents. Must be a string.

required

Note:

This setter does not validate desc. The string check runs only in the constructor, so no AssertionError is raised here.

Source code in unaiverse/streams/dataprops.py
def set_description(self, desc: str) -> None:
    """Set the human-readable description of this stream's data.

    Replaces the current ``data_desc`` value in place. The description is
    included in the dictionary produced by ``to_dict`` and is intended for
    display in dashboards and API documentation.

    Args:
        desc: New free-text description of the stream contents. Must be a string.

    Note:
        This setter does not validate ``desc``. The string check runs only in the
        constructor, so no ``AssertionError`` is raised here.
    """
    self.data_desc = desc

set_public

set_public(public: bool) -> None

Set whether this stream descriptor targets the public network interface.

When public is True, the stream is announced on the public-facing network channel of the UNaIVERSE peer. When False, it remains private (peer-to-peer only). This flag is used by the networking layer when constructing stream advertisements and routing messages.

Parameters:

Name Type Description Default
public bool

True to mark the stream as public; False for private.

required
Source code in unaiverse/streams/dataprops.py
def set_public(self, public: bool) -> None:
    """Set whether this stream descriptor targets the public network interface.

    When ``public`` is ``True``, the stream is announced on the public-facing
    network channel of the UNaIVERSE peer. When ``False``, it remains private
    (peer-to-peer only). This flag is used by the networking layer when
    constructing stream advertisements and routing messages.

    Args:
        public: ``True`` to mark the stream as public; ``False`` for private.
    """
    self.public = public

set_pubsub

set_pubsub(pubsub: bool) -> None

Set whether this stream is routed through a Pub/Sub topic.

When pubsub is True, data for this stream is exchanged via a publish-subscribe topic rather than a direct peer-to-peer channel. This affects the network hash produced by net_hash, which embeds "::ps:" for Pub/Sub streams and "::dm:" for direct-message streams.

Parameters:

Name Type Description Default
pubsub bool

True to route through a Pub/Sub topic; False for direct peer-to-peer messaging.

required
Source code in unaiverse/streams/dataprops.py
def set_pubsub(self, pubsub: bool) -> None:
    """Set whether this stream is routed through a Pub/Sub topic.

    When ``pubsub`` is ``True``, data for this stream is exchanged via a
    publish-subscribe topic rather than a direct peer-to-peer channel. This
    affects the network hash produced by ``net_hash``, which embeds ``"::ps:"``
    for Pub/Sub streams and ``"::dm:"`` for direct-message streams.

    Args:
        pubsub: ``True`` to route through a Pub/Sub topic; ``False`` for direct
            peer-to-peer messaging.
    """
    self.pubsub = pubsub

set_delta

set_delta(delta: float) -> None

Set the minimum time interval between consecutive stream samples.

The delta controls how frequently data is emitted on this stream. Values less than or equal to zero indicate real-time (as-fast-as-possible) delivery with no enforced pacing. Positive values specify the minimum number of seconds that must elapse between two successive samples.

Parameters:

Name Type Description Default
delta float

Minimum inter-sample interval in seconds. Values <= 0 disable rate limiting.

required
Source code in unaiverse/streams/dataprops.py
def set_delta(self, delta: float) -> None:
    """Set the minimum time interval between consecutive stream samples.

    The delta controls how frequently data is emitted on this stream. Values
    less than or equal to zero indicate real-time (as-fast-as-possible) delivery
    with no enforced pacing. Positive values specify the minimum number of seconds
    that must elapse between two successive samples.

    Args:
        delta: Minimum inter-sample interval in seconds. Values <= 0 disable
            rate limiting.
    """
    self.delta = delta
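
The pacing rule described for delta can be written down as a small predicate. This is only a sketch of the stated semantics: is_due and its arguments are hypothetical, and the code that actually enforces the interval is not shown on this page.

```python
def is_due(last_emit: float, now: float, delta: float) -> bool:
    """Hypothetical pacing check: True when a new sample may be emitted."""
    if delta <= 0:
        return True  # no enforced pacing
    return (now - last_emit) >= delta


print(is_due(last_emit=10.0, now=10.1, delta=0.0))
print(is_due(last_emit=10.0, now=10.1, delta=0.5))
print(is_due(last_emit=10.0, now=10.5, delta=0.5))
```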

set_stream_to_proc_transforms

set_stream_to_proc_transforms(t) -> None

Set the transform applied to incoming stream data before the processor receives it.

Replaces both the active transform (stream_to_proc_transforms) and the original transform reference (__original_stream_to_proc_transforms), which clone uses to carry the transform over to the duplicate. The setter stores t as given and runs none of the constructor's type checks.

Accepted transform types mirror those supported at construction time:

  • None - no transformation; raw data is passed to the processor unchanged.
  • A callable - applied directly to each sample.
  • A PreTrainedTokenizerBase instance - encodes text to token-ID tensors.
  • A str starting with "AutoTokenizer:" - the tokenizer is resolved lazily by the networking layer.
  • A dict (str -> int) vocabulary mapping for text streams.
  • A 2-element tuple or list (input_transform, target_transform) to supply different transforms for inputs and targets.

Parameters:

Name Type Description Default
t

New stream-to-processor transform, or None to disable transformation.

required
Source code in unaiverse/streams/dataprops.py
def set_stream_to_proc_transforms(self, t) -> None:
    """Set the transform applied to incoming stream data before the processor receives it.

    Replaces both the active transform (``stream_to_proc_transforms``) and the
    original transform reference (``__original_stream_to_proc_transforms``), which
    ``clone`` uses to carry the transform over to the duplicate. The setter stores
    ``t`` as given and runs none of the constructor's type checks.

    Accepted transform types mirror those supported at construction time:

    - ``None`` - no transformation; raw data is passed to the processor unchanged.
    - A callable - applied directly to each sample.
    - A ``PreTrainedTokenizerBase`` instance - encodes text to token-ID tensors.
    - A ``str`` starting with ``"AutoTokenizer:"`` - the tokenizer is resolved
      lazily by the networking layer.
    - A ``dict`` (``str -> int``) vocabulary mapping for text streams.
    - A 2-element ``tuple`` or ``list`` ``(input_transform, target_transform)``
      to supply different transforms for inputs and targets.

    Args:
        t: New stream-to-processor transform, or ``None`` to disable transformation.
    """
    self.stream_to_proc_transforms = t
    self.__original_stream_to_proc_transforms = t

set_proc_to_stream_transforms

set_proc_to_stream_transforms(t) -> None

Set the transform applied to processor output before it is emitted back into the stream.

Replaces both the active transform (proc_to_stream_transforms) and the original transform reference (__original_proc_to_stream_transforms), which clone uses to carry the transform over to the duplicate. The setter stores t as given and runs none of the constructor's type checks.

Accepted transform types mirror those supported at construction time:

  • None - no transformation; processor output is forwarded unchanged.
  • A callable - applied directly to each output sample.
  • A PreTrainedTokenizerBase instance - decodes token-ID tensors to text.
  • A str starting with "AutoTokenizer:" - resolved lazily at postprocess time.
  • A list (int -> str) reverse-vocabulary mapping for text streams.

Parameters:

Name Type Description Default
t

New processor-to-stream transform, or None to disable transformation.

required
Source code in unaiverse/streams/dataprops.py
def set_proc_to_stream_transforms(self, t) -> None:
    """Set the transform applied to processor output before it is emitted back into the stream.

    Replaces both the active transform (``proc_to_stream_transforms``) and the
    original transform reference (``__original_proc_to_stream_transforms``), which
    ``clone`` uses to carry the transform over to the duplicate. The setter stores
    ``t`` as given and runs none of the constructor's type checks.

    Accepted transform types mirror those supported at construction time:

    - ``None`` - no transformation; processor output is forwarded unchanged.
    - A callable - applied directly to each output sample.
    - A ``PreTrainedTokenizerBase`` instance - decodes token-ID tensors to text.
    - A ``str`` starting with ``"AutoTokenizer:"`` - resolved lazily at
      postprocess time.
    - A ``list`` (``int -> str``) reverse-vocabulary mapping for text streams.

    Args:
        t: New processor-to-stream transform, or ``None`` to disable transformation.
    """
    self.proc_to_stream_transforms = t
    self.__original_proc_to_stream_transforms = t
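
The two vocabulary forms mentioned for text streams (a str -> int dict towards the processor, an int -> str list back towards the stream) are inverses of each other. How the library applies them internally is not shown on this page, so the sketch below only illustrates the idea with two plain callables; encode and decode are hypothetical names, and whitespace splitting is an assumption made for the example.

```python
vocab = {"hello": 0, "world": 1}   # str -> int, stream-to-processor direction
inverse = ["hello", "world"]       # int -> str, the list index is the token id


def encode(text: str) -> list:
    """Hypothetical callable transform: whitespace-split text to token ids."""
    return [vocab[word] for word in text.split()]


def decode(ids) -> str:
    """Hypothetical callable transform: token ids back to text."""
    return " ".join(inverse[i] for i in ids)


ids = encode("hello world")
print(ids, decode(ids))
```

Since callables are among the accepted transform types, functions of this kind could be passed to the two setters directly, subject to whatever input and output types the surrounding pipeline expects.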

is_tensor

is_tensor() -> bool

Return True if this stream carries torch.Tensor data.

Returns:

Type Description
bool

True when data_type is "tensor"; False otherwise.

Source code in unaiverse/streams/dataprops.py
def is_tensor(self) -> bool:
    """Return ``True`` if this stream carries ``torch.Tensor`` data.

    Returns:
        ``True`` when ``data_type`` is ``"tensor"``; ``False`` otherwise.
    """
    return self.data_type == "tensor"

is_img

is_img() -> bool

Return True if this stream carries PIL.Image.Image data.

Returns:

Type Description
bool

True when data_type is "img"; False otherwise.

Source code in unaiverse/streams/dataprops.py
def is_img(self) -> bool:
    """Return ``True`` if this stream carries ``PIL.Image.Image`` data.

    Returns:
        ``True`` when ``data_type`` is ``"img"``; ``False`` otherwise.
    """
    return self.data_type == "img"

is_text

is_text() -> bool

Return True if this stream carries plain string data.

Returns:

Type Description
bool

True when data_type is "text"; False otherwise.

Source code in unaiverse/streams/dataprops.py
def is_text(self) -> bool:
    """Return ``True`` if this stream carries plain string data.

    Returns:
        ``True`` when ``data_type`` is ``"text"``; ``False`` otherwise.
    """
    return self.data_type == "text"

is_file

is_file() -> bool

Return True if this stream carries binary FileContainer data.

Returns:

Type Description
bool

True when data_type is "file"; False otherwise.

Source code in unaiverse/streams/dataprops.py
def is_file(self) -> bool:
    """Return ``True`` if this stream carries binary ``FileContainer`` data.

    Returns:
        ``True`` when ``data_type`` is ``"file"``; ``False`` otherwise.
    """
    return self.data_type == "file"

is_tensor_long

is_tensor_long() -> bool

Return True if the tensor dtype is torch.long.

Returns False when tensor_dtype is None (i.e. for non-tensor streams) rather than raising an exception.

Returns:

Type Description
bool

True if tensor_dtype equals torch.long; False otherwise.

Source code in unaiverse/streams/dataprops.py
def is_tensor_long(self) -> bool:
    """Return ``True`` if the tensor dtype is ``torch.long``.

    Returns ``False`` when ``tensor_dtype`` is ``None`` (i.e. for non-tensor
    streams) rather than raising an exception.

    Returns:
        ``True`` if ``tensor_dtype`` equals ``torch.long``; ``False`` otherwise.
    """
    return self.tensor_dtype == torch.long if self.tensor_dtype is not None else False

is_tensor_float

is_tensor_float() -> bool

Return True if the tensor dtype is a floating-point type.

Detects any torch.float* variant (e.g. torch.float16, torch.float32, torch.float64) by checking the string representation of tensor_dtype. Returns False when tensor_dtype is None (i.e. for non-tensor streams) rather than raising an exception.

Returns:

Type Description
bool

True if the string form of tensor_dtype starts with "torch.float"; False otherwise.

Source code in unaiverse/streams/dataprops.py
def is_tensor_float(self) -> bool:
    """Return ``True`` if the tensor dtype is a floating-point type.

    Detects any ``torch.float*`` variant (e.g. ``torch.float16``,
    ``torch.float32``, ``torch.float64``) by checking the string representation
    of ``tensor_dtype``. Returns ``False`` when ``tensor_dtype`` is ``None``
    (i.e. for non-tensor streams) rather than raising an exception.

    Returns:
        ``True`` if the string form of ``tensor_dtype`` starts with
        ``"torch.float"``; ``False`` otherwise.
    """
    return str(self.tensor_dtype).startswith("torch.float") if self.tensor_dtype is not None else False
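
The check is a plain string-prefix test, which can be tried out on dtype names without importing torch. The sketch below applies the same expression to a few names; looks_float is a hypothetical helper, and the strings are the standard printed forms of the corresponding torch dtypes.

```python
def looks_float(dtype_name) -> bool:
    """Hypothetical mirror of the prefix test used by is_tensor_float."""
    return str(dtype_name).startswith("torch.float") if dtype_name is not None else False


names = ["torch.float16", "torch.float32", "torch.float64", "torch.bfloat16", "torch.int64", None]
results = {name: looks_float(name) for name in names}
print(results)
```

One consequence of the prefix test is that "torch.bfloat16" does not match, because its name starts with "torch.bfloat" rather than "torch.float".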

is_tensor_img

is_tensor_img() -> bool

Return True if the tensor shape follows a standard image layout.

A tensor is considered image-shaped when it is 4-dimensional (batch, channels, height, width) and the channel dimension is either 1 (grayscale) or 3 (RGB). Returns False when tensor_shape is None (i.e. for non-tensor streams) rather than raising an exception.

Returns:

Type Description
bool

True if tensor_shape has four dimensions and tensor_shape[1] is 1 or 3; False otherwise.

Source code in unaiverse/streams/dataprops.py
def is_tensor_img(self) -> bool:
    """Return ``True`` if the tensor shape follows a standard image layout.

    A tensor is considered image-shaped when it is 4-dimensional
    ``(batch, channels, height, width)`` and the channel dimension is either
    ``1`` (grayscale) or ``3`` (RGB). Returns ``False`` when ``tensor_shape``
    is ``None`` (i.e. for non-tensor streams) rather than raising an exception.

    Returns:
        ``True`` if ``tensor_shape`` has four dimensions and
        ``tensor_shape[1]`` is ``1`` or ``3``; ``False`` otherwise.
    """
    return len(self.tensor_shape) == 4 and (self.tensor_shape[1] == 1 or self.tensor_shape[1] == 3) \
        if self.tensor_shape is not None else False
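
The shape rule can be exercised on bare tuples. The following sketch restates it as a standalone function (is_image_shape is a hypothetical name) and applies it to a few layouts.

```python
def is_image_shape(shape) -> bool:
    """Hypothetical mirror of is_tensor_img: 4-D shape with 1 or 3 channels at index 1."""
    if shape is None:
        return False
    return len(shape) == 4 and shape[1] in (1, 3)


checks = {
    "channels_first_rgb": is_image_shape((1, 3, 224, 224)),
    "channels_first_gray": is_image_shape((1, 1, 28, 28)),
    "channels_last_rgb": is_image_shape((1, 224, 224, 3)),
    "no_batch_dim": is_image_shape((3, 224, 224)),
    "no_shape": is_image_shape(None),
}
print(checks)
```

Only the channels-first layouts pass: a channels-last shape such as (1, 224, 224, 3) has 224 at index 1, and a shape without a batch dimension is not 4-dimensional.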

is_tensor_token_ids

is_tensor_token_ids() -> bool

Return True if the tensor represents a sequence of token IDs.

A tensor is considered a token-ID sequence when its dtype is torch.long and its shape is 2-D (batch, seq_len) where seq_len >= 1 or is None (variable-length). Returns False when tensor_shape is None (i.e. for non-tensor streams) rather than raising an exception.

Returns:

Type Description
bool

True when tensor_dtype is torch.long, the shape has exactly two dimensions, and tensor_shape[1] is at least 1 or None; False otherwise.

Source code in unaiverse/streams/dataprops.py
def is_tensor_token_ids(self) -> bool:
    """Return ``True`` if the tensor represents a sequence of token IDs.

    A tensor is considered a token-ID sequence when its dtype is ``torch.long``
    and its shape is 2-D ``(batch, seq_len)`` where ``seq_len >= 1`` or is
    ``None`` (variable-length). Returns ``False`` when ``tensor_shape`` is
    ``None`` (i.e. for non-tensor streams) rather than raising an exception.

    Returns:
        ``True`` when ``tensor_dtype`` is ``torch.long``, the shape has exactly
        two dimensions, and ``tensor_shape[1]`` is at least ``1`` or ``None``;
        ``False`` otherwise.
    """
    return (self.tensor_dtype == torch.long and
            len(self.tensor_shape) == 2 and (self.tensor_shape[1] is None or self.tensor_shape[1] >= 1)) \
        if self.tensor_shape is not None else False
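
When a variable-length sequence is encoded as None in the shape, the None test has to come before the numeric comparison, because None >= 1 raises a TypeError in Python 3. The sketch below restates the token-ID rule with that ordering. It works on plain values, with the dtype given by its printed name ("torch.int64" is how torch.long prints); is_token_id_shape is a hypothetical helper.

```python
def is_token_id_shape(dtype_name, shape) -> bool:
    """Hypothetical mirror of is_tensor_token_ids, safe for a None sequence length."""
    if shape is None:
        return False
    return (dtype_name == "torch.int64"
            and len(shape) == 2
            # Test for None first: comparing None with an int would raise TypeError.
            and (shape[1] is None or shape[1] >= 1))


cases = {
    "variable_length": is_token_id_shape("torch.int64", (1, None)),
    "fixed_length": is_token_id_shape("torch.int64", (1, 16)),
    "empty_sequence": is_token_id_shape("torch.int64", (1, 0)),
    "float_dtype": is_token_id_shape("torch.float32", (1, 16)),
    "no_shape": is_token_id_shape("torch.int64", None),
}
print(cases)
```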

is_tensor_target_id

is_tensor_target_id() -> bool

Return True if the tensor represents a single scalar target ID.

A tensor is considered a target ID when its dtype is torch.long and its shape is 1-D (i.e. len(tensor_shape) == 1). This is the typical layout for a class-label target vector without an explicit batch dimension. Returns False when tensor_shape is None (i.e. for non-tensor streams) rather than raising an exception.

Returns:

Type Description
bool

True when tensor_dtype is torch.long and the shape has exactly one dimension; False otherwise.

Source code in unaiverse/streams/dataprops.py
def is_tensor_target_id(self) -> bool:
    """Return ``True`` if the tensor represents a single scalar target ID.

    A tensor is considered a target ID when its dtype is ``torch.long`` and its
    shape is 1-D (i.e. ``len(tensor_shape) == 1``). This is the typical layout
    for a class-label target vector without an explicit batch dimension.
    Returns ``False`` when ``tensor_shape`` is ``None`` (i.e. for non-tensor
    streams) rather than raising an exception.

    Returns:
        ``True`` when ``tensor_dtype`` is ``torch.long`` and the shape has
        exactly one dimension; ``False`` otherwise.
    """
    return (self.tensor_dtype == torch.long and
            len(self.tensor_shape) == 1) \
        if self.tensor_shape is not None else False
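
The two shape predicates above can be sketched without PyTorch. This is a minimal illustration, not the module's code: dtypes are represented by plain strings, and the names looks_like_token_ids and looks_like_target_id are hypothetical. The sketch tests for None before comparing, since comparing None with an integer raises TypeError in Python 3.

```python
from typing import Optional, Sequence

# Hypothetical stand-ins: a dtype is a plain string here, not a torch dtype.
Shape = Optional[Sequence[Optional[int]]]


def looks_like_token_ids(dtype: str, shape: Shape) -> bool:
    """2-D long tensor whose second dimension is None or at least 1."""
    if shape is None:  # non-tensor stream
        return False
    if dtype != "long" or len(shape) != 2:
        return False
    seq_len = shape[1]
    # Check for None first: evaluating `None >= 1` raises TypeError.
    return seq_len is None or seq_len >= 1


def looks_like_target_id(dtype: str, shape: Shape) -> bool:
    """1-D long tensor, i.e. no explicit batch dimension."""
    return shape is not None and dtype == "long" and len(shape) == 1


print(looks_like_token_ids("long", (1, None)))  # True: variable-length sequence
print(looks_like_token_ids("long", (1, 0)))     # False: empty sequence
print(looks_like_target_id("long", (8,)))       # True
print(looks_like_target_id("float32", (8,)))    # False: wrong dtype
print(looks_like_token_ids("long", None))       # False: no tensor shape
```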

is_all

is_all() -> bool

Return True if this stream uses the wildcard "all" data type.

Streams of type "all" bypass data-type validation and accept any Python object in check_and_preprocess and check_and_postprocess. This is useful for pass-through or debugging streams where the exact data type is unknown or variable.

Returns:

Type Description
bool

True when data_type is "all"; False otherwise.

Source code in unaiverse/streams/dataprops.py
def is_all(self) -> bool:
    """Return ``True`` if this stream uses the wildcard ``"all"`` data type.

    Streams of type ``"all"`` bypass data-type validation and accept any Python
    object in ``check_and_preprocess`` and ``check_and_postprocess``. This is
    useful for pass-through or debugging streams where the exact data type is
    unknown or variable.

    Returns:
        ``True`` when ``data_type`` is ``"all"``; ``False`` otherwise.
    """
    return self.data_type == "all"

net_hash

net_hash(prefix: str) -> str

Return the network-level routing hash for this stream.

Delegates to build_net_hash using the stream's pubsub flag and the result of name_or_group. The hash embeds whether the channel is a publish-subscribe topic ("::ps:") or a direct-message channel ("::dm:") so the networking layer can route messages without inspecting the full descriptor.

The inverse operations are available via peer_id_from_net_hash and name_or_group_from_net_hash.

Parameters:

Name Type Description Default
prefix str

The local peer's ID, used as the leading component of the hash.

required

Returns:

Type Description
str

A string of the form "<prefix>::ps:<name_or_group>" for Pub/Sub streams or "<prefix>::dm:<name_or_group>" for direct-message streams.

Examples:

>>> dp = DataProps(name="sensor", group="none", data_type="text", pubsub=False)
>>> dp.net_hash("peer123")
'peer123::dm:sensor'
Source code in unaiverse/streams/dataprops.py
def net_hash(self, prefix: str) -> str:
    """Return the network-level routing hash for this stream.

    Delegates to ``build_net_hash`` using the stream's ``pubsub`` flag and the
    result of ``name_or_group``. The hash embeds whether the channel is a
    publish-subscribe topic (``"::ps:"``) or a direct-message channel
    (``"::dm:"``) so the networking layer can route messages without inspecting
    the full descriptor.

    The inverse operations are available via ``peer_id_from_net_hash`` and
    ``name_or_group_from_net_hash``.

    Args:
        prefix: The local peer's ID, used as the leading component of the hash.

    Returns:
        A string of the form ``"<prefix>::ps:<name_or_group>"`` for Pub/Sub
        streams or ``"<prefix>::dm:<name_or_group>"`` for direct-message streams.

    Examples:
        >>> dp = DataProps(name="sensor", group="none", data_type="text", pubsub=False)
        >>> dp.net_hash("peer123")
        'peer123::dm:sensor'
    """
    return DataProps.build_net_hash(prefix, self.pubsub, self.name_or_group())

user_hash

user_hash(prefix: str) -> str

Return the user-facing identifier hash for this stream.

Delegates to build_user_hash using the stream's name. Unlike net_hash, the user hash always uses the stream name (never the group) and does not encode the Pub/Sub flag. It is intended for display and subscription lookups visible to end users.

The inverse operations are available via peer_id_from_user_hash and name_from_user_hash.

Parameters:

Name Type Description Default
prefix str

The local peer's ID, used as the leading component of the hash.

required

Returns:

Type Description
str

A string of the form "<prefix>:<name>".

Examples:

>>> dp = DataProps(name="sensor", group="none", data_type="text")
>>> dp.user_hash("peer123")
'peer123:sensor'
Source code in unaiverse/streams/dataprops.py
def user_hash(self, prefix: str) -> str:
    """Return the user-facing identifier hash for this stream.

    Delegates to ``build_user_hash`` using the stream's ``name``. Unlike
    ``net_hash``, the user hash always uses the stream name (never the group)
    and does not encode the Pub/Sub flag. It is intended for display and
    subscription lookups visible to end users.

    The inverse operations are available via ``peer_id_from_user_hash`` and
    ``name_from_user_hash``.

    Args:
        prefix: The local peer's ID, used as the leading component of the hash.

    Returns:
        A string of the form ``"<prefix>:<name>"``.

    Examples:
        >>> dp = DataProps(name="sensor", group="none", data_type="text")
        >>> dp.user_hash("peer123")
        'peer123:sensor'
    """
    return DataProps.build_user_hash(prefix, self.name)
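
To make the difference between the two hashes concrete, here is a small standalone sketch. The three functions re-implement the documented string formats and are not imported from unaiverse; the stream with name "sensor" and group "lab" is a made-up example.

```python
def build_net_hash(prefix: str, pubsub: bool, name_or_group: str) -> str:
    # Mirrors the documented format: "::ps:" for Pub/Sub, "::dm:" otherwise.
    return f"{prefix}::{'ps' if pubsub else 'dm'}:{name_or_group}"


def build_user_hash(prefix: str, name: str) -> str:
    return f"{prefix}:{name}"


def name_or_group(name: str, group: str) -> str:
    # "none" is the sentinel meaning "no group".
    return group if group != "none" else name


# Hypothetical stream: name "sensor", group "lab", Pub/Sub enabled.
net = build_net_hash("peer123", True, name_or_group("sensor", "lab"))
user = build_user_hash("peer123", "sensor")
print(net)   # peer123::ps:lab
print(user)  # peer123:sensor
```

The network hash carries the group and the transport marker, while the user hash carries only the stream name.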

peer_id_from_net_hash staticmethod

peer_id_from_net_hash(net_hash: str) -> str

Extract the peer ID from a network hash string.

Network hashes have the form "<peer_id>::ps:<name>" or "<peer_id>::dm:<name>". This method splits on "::" and returns the first component, which is the originating peer ID.

See also net_hash and name_or_group_from_net_hash.

Parameters:

Name Type Description Default
net_hash str

A network hash previously produced by build_net_hash or net_hash.

required

Returns:

Type Description
str

The peer ID string embedded at the beginning of the hash.

Source code in unaiverse/streams/dataprops.py
@staticmethod
def peer_id_from_net_hash(net_hash: str) -> str:
    """Extract the peer ID from a network hash string.

    Network hashes have the form ``"<peer_id>::ps:<name>"`` or
    ``"<peer_id>::dm:<name>"``. This method splits on ``"::"`` and returns the
    first component, which is the originating peer ID.

    See also ``net_hash`` and ``name_or_group_from_net_hash``.

    Args:
        net_hash: A network hash previously produced by ``build_net_hash`` or
            ``net_hash``.

    Returns:
        The peer ID string embedded at the beginning of the hash.
    """
    return net_hash.split("::")[0]

peer_id_from_user_hash staticmethod

peer_id_from_user_hash(user_hash: str) -> str

Extract the peer ID from a user hash string.

User hashes have the form "<peer_id>:<stream_name>". This method splits on the last ":" (using rsplit with maxsplit=1) and returns the first component, which is the originating peer ID. Using the rightmost separator correctly handles peer IDs that themselves contain colons.

See also user_hash and name_from_user_hash.

Parameters:

Name Type Description Default
user_hash str

A user hash previously produced by build_user_hash or user_hash.

required

Returns:

Type Description
str

The peer ID string embedded at the beginning of the hash.

Source code in unaiverse/streams/dataprops.py
@staticmethod
def peer_id_from_user_hash(user_hash: str) -> str:
    """Extract the peer ID from a user hash string.

    User hashes have the form ``"<peer_id>:<stream_name>"``. This method
    splits on the last ``":"`` (using ``rsplit`` with ``maxsplit=1``) and
    returns the first component, which is the originating peer ID. Using the
    rightmost separator correctly handles peer IDs that themselves contain
    colons.

    See also ``user_hash`` and ``name_from_user_hash``.

    Args:
        user_hash: A user hash previously produced by ``build_user_hash`` or
            ``user_hash``.

    Returns:
        The peer ID string embedded at the beginning of the hash.
    """
    return user_hash.rsplit(":", 1)[0]
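
The effect of splitting on the rightmost colon can be checked with plain string operations. The peer ID "node:42" below is a hypothetical value chosen only because it contains a colon.

```python
user_hash = "node:42:sensor"  # hypothetical peer ID "node:42", stream "sensor"

peer_id, name = user_hash.rsplit(":", 1)
print(peer_id)  # node:42
print(name)     # sensor

# Splitting from the left instead would cut the peer ID short.
print(user_hash.split(":", 1)[0])  # node
```

The same reasoning implies that this scheme relies on stream names being free of colons: a name containing ":" would be split in the wrong place.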

name_or_group_from_net_hash staticmethod

name_or_group_from_net_hash(net_hash: str) -> str

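Extract the stream name or group from a network hash string.

Returns the text after "::ps:" for Pub/Sub hashes (as detected by is_pubsub_from_net_hash), or after "::dm:" otherwise. For hashes produced by net_hash, this is the group when the stream has one and the stream name otherwise.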

Parameters:

Name Type Description Default
net_hash str

The network hash string.

required

Returns:

Type Description
str

A string representing the name or group.

Source code in unaiverse/streams/dataprops.py
@staticmethod
def name_or_group_from_net_hash(net_hash: str) -> str:
    """Extract the stream name or group from a network hash string.

    Returns the text after ``"::ps:"`` for Pub/Sub hashes, or after ``"::dm:"`` otherwise.

    Args:
        net_hash: The network hash string.

    Returns:
        A string representing the name or group.
    """
    return net_hash.split("::ps:")[1] if DataProps.is_pubsub_from_net_hash(net_hash) else net_hash.split("::dm:")[1]
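
The three inverse helpers (peer_id_from_net_hash, is_pubsub_from_net_hash, name_or_group_from_net_hash) can be combined into a single parsing step. The sketch below is illustrative only; ParsedNetHash and parse_net_hash are hypothetical names that do not exist in the module.

```python
from typing import NamedTuple


class ParsedNetHash(NamedTuple):  # hypothetical helper type, not part of the module
    peer_id: str
    pubsub: bool
    name_or_group: str


def parse_net_hash(net_hash: str) -> ParsedNetHash:
    pubsub = "::ps:" in net_hash                # same test as is_pubsub_from_net_hash
    marker = "::ps:" if pubsub else "::dm:"
    peer_id = net_hash.split("::")[0]           # same rule as peer_id_from_net_hash
    return ParsedNetHash(peer_id, pubsub, net_hash.split(marker)[1])


print(parse_net_hash("peer123::ps:lab"))
print(parse_net_hash("peer123::dm:sensor"))
```

A string containing neither marker makes split(marker)[1] raise IndexError, so callers would need to validate untrusted input first.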

name_from_user_hash staticmethod

name_from_user_hash(user_hash: str) -> str

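Extract the stream name from a user hash string.

Splits on the last ":" and returns the trailing component. See also user_hash and peer_id_from_user_hash.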

Parameters:

Name Type Description Default
user_hash str

The user hash string.

required

Returns:

Type Description
str

The stream name (user hashes never carry the group).

Source code in unaiverse/streams/dataprops.py
@staticmethod
def name_from_user_hash(user_hash: str) -> str:
    """Extract the stream name from a user hash string.

    Args:
        user_hash: The user hash string.

    Returns:
        The stream name (the component after the last ``":"``).
    """
    return user_hash.rsplit(":", 1)[1]

is_pubsub_from_net_hash staticmethod

is_pubsub_from_net_hash(net_hash: str) -> bool

A static method to check if a network hash belongs to a Pub/Sub stream.

Parameters:

Name Type Description Default
net_hash str

The network hash string.

required

Returns:

Type Description
bool

True if the hash is for a Pub/Sub stream, False otherwise.

Source code in unaiverse/streams/dataprops.py
@staticmethod
def is_pubsub_from_net_hash(net_hash: str) -> bool:
    """A static method to check if a network hash belongs to a Pub/Sub stream.

    Args:
        net_hash: The network hash string.

    Returns:
        True if the hash is for a Pub/Sub stream, False otherwise.
    """
    return "::ps:" in net_hash

name_or_group

name_or_group() -> str

Retrieves the group name if it is set (i.e. not the sentinel 'none'), otherwise the stream name.

Returns:

Type Description
str

A string representing the name or group.

Source code in unaiverse/streams/dataprops.py
def name_or_group(self) -> str:
    """Retrieves the group name if it is set (i.e. not ``'none'``), otherwise the stream name.

    Returns:
        A string representing the name or group.
    """
    group = self.get_group()
    return group if group != 'none' else self.get_name()

build_net_hash staticmethod

build_net_hash(prefix: str, pubsub: bool, name_or_group: str) -> str

A static method to construct a complete network hash from a prefix, Pub/Sub status, and name/group.

Parameters:

Name Type Description Default
prefix str

The peer ID prefix.

required
pubsub bool

True for a Pub/Sub stream (the hash uses "::ps:"), False for a direct-message stream (the hash uses "::dm:").

required
name_or_group str

The name or group of the stream.

required

Returns:

Type Description
str

The constructed network hash string.

Source code in unaiverse/streams/dataprops.py
@staticmethod
def build_net_hash(prefix: str, pubsub: bool, name_or_group: str) -> str:
    """A static method to construct a complete network hash from a prefix, Pub/Sub status, and name/group.

    Args:
        prefix: The peer ID prefix.
        pubsub: True for a Pub/Sub stream (``"::ps:"``), False for a direct-message stream (``"::dm:"``).
        name_or_group: The name or group of the stream.

    Returns:
        The constructed network hash string.
    """
    if pubsub:
        return f"{prefix}::ps:{name_or_group}"
    else:
        return f"{prefix}::dm:{name_or_group}"

build_user_hash staticmethod

build_user_hash(prefix: str, name: str) -> str

A static method to construct a complete user hash from a prefix and name.

Parameters:

Name Type Description Default
prefix str

The peer ID prefix.

required
name str

The name of the stream.

required

Returns:

Type Description
str

The constructed user hash string.

Source code in unaiverse/streams/dataprops.py
@staticmethod
def build_user_hash(prefix: str, name: str) -> str:
    """A static method to construct a complete user hash from a prefix and name.

    Args:
        prefix: The peer ID prefix.
        name: The name of the stream.

    Returns:
        The constructed user hash string.
    """
    return f"{prefix}:{name}"

user_hash_from_net_hash staticmethod

user_hash_from_net_hash(net_hash: str, name: str) -> str

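Builds the user hash for a stream from a network hash and the stream name.

The peer ID is taken from the network hash; the name is passed separately because the network hash may carry the group instead of the stream name.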

Parameters:

Name Type Description Default
net_hash str

The network hash.

required
name str

Stream name.

required

Returns:

Type Description
str

A string representing the user hash.

Source code in unaiverse/streams/dataprops.py
@staticmethod
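def user_hash_from_net_hash(net_hash: str, name: str) -> str:
    """Builds the user hash for a stream from a network hash and the stream name.

    The peer ID is taken from ``net_hash``; ``name`` is passed separately because the network hash may carry
    the group instead of the stream name.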

    Args:
        net_hash: The network hash.
        name: Stream name.

    Returns:
        A string representing the user hash.
    """
    prefix = DataProps.peer_id_from_net_hash(net_hash)
    return DataProps.build_user_hash(prefix, name)
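
A short standalone sketch of this conversion follows. It inlines the two string rules rather than importing DataProps, and the hash and name are made-up values.

```python
def user_hash_from_net_hash(net_hash: str, name: str) -> str:
    peer_id = net_hash.split("::")[0]  # same rule as peer_id_from_net_hash
    return f"{peer_id}:{name}"         # same format as build_user_hash


# The net hash below carries the group "lab"; the stream name is passed in.
print(user_hash_from_net_hash("peer123::ps:lab", "sensor"))  # peer123:sensor
```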

normalize_net_hash staticmethod

normalize_net_hash(not_normalized_net_hash: str) -> str

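Normalizes a network hash string to its canonical form.

Pub/Sub hashes are returned unchanged. For direct-message hashes, the part after "::dm:" is reduced to the text following the first "~" if one is present, or otherwise to the second "-"-separated field.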

Parameters:

Name Type Description Default
not_normalized_net_hash str

The network hash to normalize.

required

Returns:

Type Description
str

The normalized network hash string.

Source code in unaiverse/streams/dataprops.py
@staticmethod
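def normalize_net_hash(not_normalized_net_hash: str) -> str:
    """Normalizes a network hash string to its canonical form.

    Pub/Sub hashes are returned unchanged. For direct-message hashes, the part after ``"::dm:"`` is reduced to
    the text following the first ``"~"`` if one is present, or otherwise to the second ``"-"``-separated field.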

    Args:
        not_normalized_net_hash: The network hash to normalize.

    Returns:
        The normalized network hash string.
    """
    if not DataProps.is_pubsub_from_net_hash(not_normalized_net_hash):
        if "~" in not_normalized_net_hash:
            return not_normalized_net_hash.split("::dm:")[0] + "::dm:" + not_normalized_net_hash.split("~")[1]
        else:
            parts = not_normalized_net_hash.split("::dm:")
            return parts[0] + "::dm:" + parts[1].split("-")[1]
    else:
        return not_normalized_net_hash
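
The branches above are easier to follow on concrete strings. The sketch re-implements the same logic as a free function; the input hashes are invented for illustration, since the page does not document what a non-normalized hash looks like in practice.

```python
def normalize_net_hash(net_hash: str) -> str:
    if "::ps:" in net_hash:  # Pub/Sub hashes are returned as-is
        return net_hash
    parts = net_hash.split("::dm:")
    if "~" in net_hash:
        return parts[0] + "::dm:" + net_hash.split("~")[1]
    return parts[0] + "::dm:" + parts[1].split("-")[1]


# Hypothetical inputs, assumed only for this example.
print(normalize_net_hash("peerA::dm:peerB~sensor"))     # peerA::dm:sensor
print(normalize_net_hash("peerA::dm:peerB-sensor"))     # peerA::dm:sensor
print(normalize_net_hash("peerA::ps:lab"))              # peerA::ps:lab
print(normalize_net_hash("peerA::dm:peerB-my-sensor"))  # peerA::dm:my
```

Two consequences follow from the code as written: in the "-" branch only the second field survives, so a name containing "-" is truncated, and a direct-message hash with neither "~" nor "-" raises IndexError, so the function cannot be applied twice to the same hash.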

is_pubsub

is_pubsub() -> bool

Checks if the stream is set to use Pub/Sub.

Returns:

Type Description
bool

True if it's a Pub/Sub stream, False otherwise.

Source code in unaiverse/streams/dataprops.py
def is_pubsub(self) -> bool:
    """Checks if the stream is set to use Pub/Sub.

    Returns:
        True if it's a Pub/Sub stream, False otherwise.
    """
    return self.pubsub

is_public

is_public() -> bool

Checks if the stream is set to be public.

Returns:

Type Description
bool

True if it's a public stream, False otherwise.

Source code in unaiverse/streams/dataprops.py
def is_public(self) -> bool:
    """Checks if the stream is set to be public.

    Returns:
        True if it's a public stream, False otherwise.
    """
    return self.public

set_tensor_labels_from_auto_tokenizer

set_tensor_labels_from_auto_tokenizer(model_id: str) -> None

Initializes and sets the tensor labels by fetching the vocabulary from a Hugging Face AutoTokenizer model ID.

Parameters:

Name Type Description Default
model_id str

The ID of the tokenizer model.

required
Source code in unaiverse/streams/dataprops.py
def set_tensor_labels_from_auto_tokenizer(self, model_id: str) -> None:
    """Initializes and sets the tensor labels by fetching the vocabulary from a Hugging Face `AutoTokenizer`
    model ID.

    Args:
        model_id: The ID of the tokenizer model.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    vocab_size = len(tokenizer.vocab)
    reverse_vocab_list: list[str | None] = [None] * vocab_size
    for i in range(vocab_size):
        reverse_vocab_list[i] = tokenizer.convert_ids_to_tokens(i)
    self.set_tensor_labels(reverse_vocab_list)
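
The vocabulary inversion performed above can be tried without downloading a model. StubTokenizer below is a hypothetical stand-in that exposes only the two members the method uses (vocab and convert_ids_to_tokens); it is not a Hugging Face class.

```python
from typing import Dict, List


class StubTokenizer:
    """Hypothetical stand-in exposing only the two members used above."""

    def __init__(self, vocab: Dict[str, int]):
        self.vocab = vocab
        self._id_to_token = {i: t for t, i in vocab.items()}

    def convert_ids_to_tokens(self, token_id: int) -> str:
        return self._id_to_token[token_id]


tokenizer = StubTokenizer({"[PAD]": 0, "hello": 1, "world": 2})

# Index i of the list holds the token string for ID i.
labels: List[str] = [tokenizer.convert_ids_to_tokens(i) for i in range(len(tokenizer.vocab))]
print(labels)  # ['[PAD]', 'hello', 'world']
```

Like the original loop, this assumes token IDs are contiguous and run from 0 to len(vocab) - 1.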

set_tensor_labels

set_tensor_labels(labels: list[str] | None, labeling_rule: str = 'max')

Sets the labels for the data.

Parameters:

Name Type Description Default
labels list[str] or None

List of labels to associate with the data.

required
labeling_rule str

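Rule used to turn tensor values into labels. to_text handles "max" (the label at the arg-max position) and "geq" (every label whose value reaches the threshold).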

'max'

Returns:

Type Description

None

Source code in unaiverse/streams/dataprops.py
def set_tensor_labels(self, labels: list[str] | None, labeling_rule: str = "max"):
    """Sets the labels for the data.

    Args:
        labels (list[str] or None): List of labels to associate with the data.
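        labeling_rule (str): Rule used to turn tensor values into labels. ``to_text`` handles ``"max"``
            (the label at the arg-max position) and ``"geq"`` (every label whose value reaches the threshold).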

    Returns:
        None
    """
    self.tensor_labels = TensorLabels(self, labels=labels, labeling_rule=labeling_rule)

adapt_tensor_to_tensor_labels

adapt_tensor_to_tensor_labels(data: Tensor) -> Tensor

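Scatters the columns of data into a zero-filled tensor sized to the current super-set of labels, placing them at tensor_labels.indices. The data is returned unchanged when the stream is not a tensor stream, has no labels or indices, or already has at least as many columns as labels.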

Parameters:

Name Type Description Default
data Tensor

The data tensor to interleave.

required

Returns:

Type Description
Tensor

torch.Tensor: The interleaved data tensor.

Source code in unaiverse/streams/dataprops.py
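def adapt_tensor_to_tensor_labels(self, data: torch.Tensor) -> torch.Tensor:
    """Scatters the columns of ``data`` into a zero-filled tensor sized to the current super-set of labels.

    Columns are placed at ``tensor_labels.indices``. ``data`` is returned unchanged when the stream is not a
    tensor stream, has no labels or indices, or already has at least as many columns as labels.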

    Args:
        data (torch.Tensor): The data tensor to interleave.

    Returns:
        torch.Tensor: The interleaved data tensor.
    """
    if self.is_tensor():
        num_labels = len(self.tensor_labels) if self.tensor_labels is not None else 0
        if num_labels > 0 and data.shape[1] < num_labels and self.tensor_labels.indices is not None:
            data_padded = torch.zeros((data.shape[0], num_labels), device=data.device, dtype=data.dtype)
            data_padded[:, self.tensor_labels.indices] = data
            return data_padded
        else:
            return data  # Do nothing
    else:
        return data  # Do nothing

clear_label_adaptation

clear_label_adaptation(data: Tensor)

Removes the padding and returns the original data from an adapted tensor.

Parameters:

Name Type Description Default
data Tensor

The adapted tensor.

required

Returns:

Type Description

The original, un-padded tensor.

Source code in unaiverse/streams/dataprops.py
def clear_label_adaptation(self, data: torch.Tensor):
    """Removes the padding and returns the original data from an adapted tensor.

    Args:
        data: The adapted tensor.

    Returns:
        The original, un-padded tensor.
    """
    return data[:, self.tensor_labels.indices] if self.tensor_labels.indices is not None else data
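
The padding done by adapt_tensor_to_tensor_labels and its reversal by clear_label_adaptation form a scatter/gather pair. The sketch below imitates the two tensor indexing expressions with nested Python lists so it runs without PyTorch; adapt, clear, and the index values are hypothetical.

```python
from typing import List


def adapt(rows: List[List[float]], indices: List[int], num_labels: int) -> List[List[float]]:
    """Scatter each row's values into a zero-filled row of width num_labels."""
    padded = [[0.0] * num_labels for _ in rows]
    for r, row in enumerate(rows):
        for value, col in zip(row, indices):
            padded[r][col] = value
    return padded


def clear(rows: List[List[float]], indices: List[int]) -> List[List[float]]:
    """Gather the original columns back out of the padded rows."""
    return [[row[col] for col in indices] for row in rows]


# Hypothetical case: the stream's 2 labels sit at positions 3 and 0 of a 4-label super-set.
data = [[0.9, 0.1]]
indices = [3, 0]
padded = adapt(data, indices, num_labels=4)
print(padded)                  # [[0.1, 0.0, 0.0, 0.9]]
print(clear(padded, indices))  # [[0.9, 0.1]]
```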

is_flat_tensor_with_labels

is_flat_tensor_with_labels() -> bool

Checks if the tensor is a 2D array and has labels, which is a common structure for general feature data.

Returns:

Type Description
bool

True if it is, False otherwise.

Source code in unaiverse/streams/dataprops.py
def is_flat_tensor_with_labels(self) -> bool:
    """Checks if the tensor is a 2D array and has labels, which is a common structure for general feature data.

    Returns:
        True if it is, False otherwise.
    """
    return self.is_tensor() and len(self.tensor_shape) == 2 and self.has_tensor_labels()

has_tensor_labels

has_tensor_labels() -> bool

Checks if any tensor labels are associated with the stream.

Returns:

Type Description
bool

True if labels exist, False otherwise.

Source code in unaiverse/streams/dataprops.py
def has_tensor_labels(self) -> bool:
    """Checks if any tensor labels are associated with the stream.

    Returns:
        True if labels exist, False otherwise.
    """
    return self.tensor_labels is not None and len(self.tensor_labels) > 0

to_text

to_text(data: Tensor | Image | str, ignore_raw_tensors: bool = False)

Converts the tensor data into a text-based representation exploiting the given labels and the labeling rule.

Parameters:

Name Type Description Default
data Tensor or Image or str

The data to convert into text (if a string, then pass-through).

required
ignore_raw_tensors bool

If True, tensors without labels yield None instead of a raw textual dump. Defaults to False.

False

Returns:

Type Description

str or None: The corresponding text representation of the data.

Raises:

Type Description
ValueError

If the data type is not supported for conversion.

Source code in unaiverse/streams/dataprops.py
def to_text(self, data: torch.Tensor | Image.Image | str, ignore_raw_tensors: bool = False):
    """Converts the tensor data into a text-based representation exploiting the given labels and the labeling rule.

    Args:
        data (torch.Tensor or Image.Image or str): The data to convert into text (if a string, then pass-through).
        ignore_raw_tensors (bool): If True, tensors without labels yield None instead of a raw textual dump.
            Defaults to False.

    Returns:
        str or None: The corresponding text representation of the data.

    Raises:
        ValueError: If the data type is not supported for conversion.
    """
    if isinstance(data, str):
        return data
    elif not isinstance(data, torch.Tensor):
        return '<non-tensor and non-text>' if data is not None else None

    if self.is_tensor():
        if not self.has_tensor_labels():
            if ignore_raw_tensors:
                return None
            str_rep = f"{'x'.join(map(str, data.shape))} tensor ({data.dtype})\n{data.detach().cpu().numpy()}"
            if len(str_rep) > 110:
                str_rep = str_rep[0:(110-3)] + "..."
            return str_rep

        if len(data.shape) > 2:  # Can only print 1d data (recall that 1d data has 2 dimensions, due to batch size)
            return None

        if data.shape[0] != 1:
            return None  # Code designed for a batch of only 1 element

        if self.is_tensor_token_ids():

            # This is the case in which we assume to have a vector of token IDs
            text = ""
            for i in range(0, data.shape[1]):
                if i > 0:
                    text += " "
                text += self.tensor_labels[data[0][i].item()]
            return text

        elif self.is_tensor_float():

            # This is the generic case of a 1d tensor
            if self.tensor_labels.labeling_rule == "max":
                j = torch.argmax(data, dim=1)
                return self.tensor_labels[j.item()]
            elif self.tensor_labels.labeling_rule == "geq":

                # Warning: does not work for mini-batches
                jj = torch.where(data >= self.tensor_labels.labeling_rule_thres)[1]
                return ", ".join(self.tensor_labels[j] for j in jj.tolist())
            else:
                return None
        return None

    elif self.is_text():
        if self.proc_to_stream_transforms is None:
            return None
        if isinstance(self.proc_to_stream_transforms, PreTrainedTokenizerBase):
            return self.proc_to_stream_transforms.decode(data[0])
        elif isinstance(self.proc_to_stream_transforms, dict):
            if data.dtype != torch.long:

                # This is the case of probabilities
                j = torch.argmax(data, dim=1)  # Warning: does not work for mini-batches
                return self.proc_to_stream_transforms[j.item()]
            else:

                # This is the case in which we assume to have a vector of token IDs
                text = ""
                for i in range(0, data.shape[1]):
                    if i > 0:
                        text += " "
                    text += self.proc_to_stream_transforms[data[0][i].item()]
                return text
        else:
            return self.proc_to_stream_transforms(data)
    else:
        return None
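
For labeled float tensors, the two labeling rules handled above reduce to an arg-max lookup and a threshold filter. The sketch expresses both over plain Python lists for a single sample; label_max, label_geq, and the example labels are hypothetical, and thres plays the role of tensor_labels.labeling_rule_thres.

```python
from typing import List


def label_max(scores: List[float], labels: List[str]) -> str:
    """'max' rule: the label at the arg-max position."""
    best = max(range(len(scores)), key=scores.__getitem__)
    return labels[best]


def label_geq(scores: List[float], labels: List[str], thres: float) -> str:
    """'geq' rule: every label whose score reaches the threshold, comma-joined."""
    return ", ".join(lab for s, lab in zip(scores, labels) if s >= thres)


labels = ["cat", "dog", "bird"]  # hypothetical labels
scores = [0.2, 0.7, 0.6]         # one sample, i.e. a batch of size 1
print(label_max(scores, labels))       # dog
print(label_geq(scores, labels, 0.5))  # dog, bird
```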

check_and_preprocess

check_and_preprocess(data: str | Image | Tensor | None, allow_class_ids: bool = False, targets: bool = False, device: device = device('cpu'))

Prepares incoming data for a processor by validating its type and applying necessary transformations. It handles different data types, including tensors, text (strings), and images, raising ValueError if the data type is unexpected or incompatible with the stream's properties. For text and images, it can apply a pre-configured transformation (like a tokenizer or a standard image transform) to convert the data into a tensor format suitable for processing. For tensors, it performs validation on shape and data type.

Parameters:

Name Type Description Default
data str | Image | Tensor | None

The data sample to check and preprocess.

required
allow_class_ids bool

A boolean to allow single-element long tensors, typically for class IDs.

False
targets bool

A boolean to indicate if the data is a target (used to select the correct transformation in a dual-transform setup).

False
device device

The PyTorch device (e.g., 'cpu' or 'cuda') to which the tensor should be moved.

device('cpu')

Returns:

Type Description

The preprocessed data, typically a tensor on the specified device.

Source code in unaiverse/streams/dataprops.py
def check_and_preprocess(self, data: str | Image.Image | torch.Tensor | None,
                         allow_class_ids: bool = False, targets: bool = False,
                         device: torch.device = torch.device("cpu")):
    """Prepares incoming data for a processor by validating its type and applying necessary transformations.
    It handles different data types, including tensors, text (strings), and images, raising `ValueError` if
    the data type is unexpected or incompatible with the stream's properties. For text and images, it can apply a
    pre-configured transformation (like a tokenizer or a standard image transform) to convert the data into a
    tensor format suitable for processing. For tensors, it performs validation on shape and data type.

    Args:
        data: The data sample to check and preprocess.
        allow_class_ids: A boolean to allow single-element long tensors, typically for class IDs.
        targets: A boolean to indicate if the data is a target (used to select the correct transformation in a
            dual-transform setup).
        device: The PyTorch device (e.g., 'cpu' or 'cuda') to which the tensor should be moved.

    Returns:
        The preprocessed data, typically a tensor on the specified device.
    """
    if data is None:
        return data

    if self.is_tensor():
        if isinstance(data, torch.Tensor):

            # Skipping all checks, it is enough to know it is a tensor
            if allow_class_ids and data.dtype == torch.long and len(data.shape) == 1:
                return data.to(device)

            # Checking dtype
            if self.tensor_dtype != data.dtype:
                raise ValueError(f"Expected data of type {self.tensor_dtype}, got {data.dtype} ("
                                 f"shape {data.shape})")

            # Checking shape
            if len(self.tensor_shape) != len(data.shape):
                raise ValueError(f"Expected data with shape {self.tensor_shape}, got {data.shape}")
            for i, s in enumerate(self.tensor_shape):
                if s is not None:
                    if s != data.shape[i]:
                        raise ValueError(f"Expected data with shape {self.tensor_shape}, got {data.shape}")

            # Checking labels
            if self.has_tensor_labels():
                if data.ndim != 2:
                    raise ValueError("Only 2d tensors are expected for "
                                     "labeled attributes (1st dimension is batch dim)")
                if not (self.is_tensor_token_ids() or data.shape[1] == self.tensor_labels.num_labels):
                    raise ValueError(f"Expected data with {self.tensor_labels.num_labels} "
                                     f"components (ignoring the 1st dimension), "
                                     f"got {data[0].numel()}")

            return data.to(device)
        else:
            raise ValueError(f"Expecting tensor data, got {type(data)}")
    elif self.is_text():
        if isinstance(data, str):
            if self.stream_to_proc_transforms is not None:
                text_to_tensor_transform = self.stream_to_proc_transforms[int(targets)]
                if text_to_tensor_transform is not None:
                    if isinstance(text_to_tensor_transform, PreTrainedTokenizerBase):
                        return text_to_tensor_transform(data, return_tensors='pt')['input_ids'].to(device)  # Tok
                    elif isinstance(text_to_tensor_transform, dict):
                        tensor = torch.tensor(
                            text_to_tensor_transform[data] if data in text_to_tensor_transform else len(
                                text_to_tensor_transform), dtype=torch.long, device=device)
                        if not targets or tensor.ndim > 1:
                            return tensor.view(1, -1)  # Warning batch size 1
                        else:
                            return tensor.view(-1)  # No batch size for targets, just a 1D vector with IDs
                    else:
                        return text_to_tensor_transform(data).to(device)  # Custom callable function
                else:
                    return data
            else:
                return data
        else:
            raise ValueError(f"Expecting text (string) data, got {type(data)}")
    elif self.is_img():
        if isinstance(data, Image.Image):
            if self.stream_to_proc_transforms is not None:
                img_to_tensor_transform = self.stream_to_proc_transforms[int(targets)]
                if img_to_tensor_transform is not None:
                    return img_to_tensor_transform(data).to(device)
                else:
                    return data
            else:
                return data
        else:
            raise ValueError(f"Expecting image (PIL.Image) data, got {type(data)}")
    elif self.is_file():
        if isinstance(data, FileContainer):
            return data

        elif isinstance(data, str):
            return FileContainer.from_path(data)

        elif isinstance(data, bytes):
            return FileContainer(content=data, filename="raw_bytes", mime_type="application/octet-stream")

        # Fallback if FileContainer is not imported but the object looks like one (duck typing)
        elif hasattr(data, 'content') and hasattr(data, 'filename') and hasattr(data, 'mime_type'):
            return data

        else:
            raise ValueError(f"Expecting FileContainer, str (path), or bytes for file stream, got {type(data)}")
    elif self.is_all():
        return data
    else:
        raise ValueError(f"Unexpected data type, {self.data_type}")
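
The shape check in the tensor branch treats None entries of tensor_shape as wildcards. A minimal standalone sketch of that rule, using tuples in place of tensor shapes, could look as follows; check_shape is a hypothetical helper name.

```python
from typing import Optional, Sequence


def check_shape(expected: Sequence[Optional[int]], actual: Sequence[int]) -> None:
    """Raise ValueError unless `actual` matches `expected`; None matches any size."""
    if len(expected) != len(actual):
        raise ValueError(f"Expected data with shape {tuple(expected)}, got {tuple(actual)}")
    for want, got in zip(expected, actual):
        if want is not None and want != got:
            raise ValueError(f"Expected data with shape {tuple(expected)}, got {tuple(actual)}")


check_shape((None, 3, 32, 32), (8, 3, 32, 32))  # passes: the batch size is free
try:
    check_shape((None, 3, 32, 32), (8, 1, 32, 32))
except ValueError as err:
    print(err)  # Expected data with shape (None, 3, 32, 32), got (8, 1, 32, 32)
```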

check_and_postprocess

check_and_postprocess(data: str | Image | Tensor | None)

Takes a processor's output and validates it before converting it back into a stream-compatible format. It handles torch.Tensor data, applying proc_to_stream_transforms (if set) to convert the tensor into an appropriate format for the stream, such as a string for text or a PIL Image for images. It performs a final check on the data's format (shape, dtype, etc.) to ensure consistency with the stream's properties.

Parameters:

Name Type Description Default
data str | Image | Tensor | None

The output from the processor, typically a tensor.

required

Returns:

Type Description

The post-processed data, in a stream-compatible format (e.g., a string, image, or CPU tensor).

Source code in unaiverse/streams/dataprops.py
def check_and_postprocess(self, data: str | Image.Image | torch.Tensor | None):
    """Takes a processor's output and validates it before converting it back into a stream-compatible format.
    It handles `torch.Tensor` data, applying `proc_to_stream_transforms` (if set) to convert the tensor
    into an appropriate format for the stream, such as a string for text or a PIL `Image` for images. It performs
    a final check on the data's format (shape, dtype, etc.) to ensure consistency with the stream's properties.

    Args:
        data: The output from the processor, typically a tensor.

    Returns:
        The post-processed data, in a stream-compatible format (e.g., a string, image, or CPU tensor).
    """
    if data is None:
        return None

    if self.is_tensor():
        if isinstance(data, torch.Tensor):
            if self.proc_to_stream_transforms is not None:
                data = self.proc_to_stream_transforms(data)
            data = data.cpu()

            # Checking dtype
            if self.tensor_dtype != data.dtype:
                raise ValueError(f"Expected data of type {self.tensor_dtype}, got {data.dtype}")

            # Checking shape
            if len(self.tensor_shape) != len(data.shape):
                raise ValueError(f"Expected data with shape {self.tensor_shape}, got {data.shape}")
            for i, s in enumerate(self.tensor_shape):
                if s is not None:
                    if s != data.shape[i]:
                        raise ValueError(f"Expected data with shape {self.tensor_shape}, got {data.shape}")

            # Checking labels
            if self.has_tensor_labels():
                if data.ndim != 2:
                    raise ValueError("Only 2d tensors are expected for "
                                     "labeled attributes (1st dimension is batch dim)")
                if not (self.is_tensor_token_ids() or data.shape[1] == self.tensor_labels.num_labels):
                    raise ValueError(f"Expected data with {self.tensor_labels.num_labels} "
                                     f"components (ignoring the 1st dimension), "
                                     f"got {data[0].numel()}")

            return data
        else:
            raise ValueError(f"Expecting tensor data, got {type(data)}")
    elif self.is_text():
        if isinstance(data, str):
            return data
        elif isinstance(data, torch.Tensor):
            data = data.cpu()
            if self.proc_to_stream_transforms is not None:
                assert data.shape[0] == 1, f"Code designed for a batch of only 1 element, got {data.shape[0]}"
                if isinstance(self.proc_to_stream_transforms, PreTrainedTokenizerBase):
                    return self.proc_to_stream_transforms.decode(data[0])  # Tokenizer
                elif isinstance(self.proc_to_stream_transforms, list):
                    if data.dtype != torch.long:

                        # This is the case of probabilities
                        j = torch.argmax(data, dim=1)  # Warning: does not work for mini-batches
                        return self.proc_to_stream_transforms[j.item()]
                    else:

                        # This is the case in which we assume to have a vector of token IDs
                        text = ""
                        for i in range(0, data.shape[1]):
                            if i > 0:
                                text += " "
                            text += self.proc_to_stream_transforms[data[0][i].item()]
                        return text
                else:
                    return self.proc_to_stream_transforms(data)  # Custom callable function
            else:
                raise ValueError("Cannot decode torch.Tensor to text, since proc_to_stream_transforms is None")
        else:
            raise ValueError(f"Expecting text (string) or tensor data, got {type(data)}")
    elif self.is_img():
        if isinstance(data, Image.Image):
            return data
        elif isinstance(data, torch.Tensor):
            data = data.cpu()
            if self.proc_to_stream_transforms is not None:
                return self.proc_to_stream_transforms(data)
            else:
                raise ValueError("Cannot convert a tensor to PIL.Image, since proc_to_stream_transforms is None")
        else:
            raise ValueError(f"Expecting image (PIL.Image) data or torch.Tensor, got {type(data)}")
    elif self.is_file():
        if isinstance(data, FileContainer) or \
           (hasattr(data, 'content') and hasattr(data, 'filename') and hasattr(data, 'mime_type')):
            return data
        else:
            raise ValueError(f"Expecting FileContainer for file stream output, got {type(data)}")
    elif self.is_all():
        return data
    else:
        raise ValueError(f"Unexpected data type, {self.data_type}")
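
The vocabulary-list branch of the text case can be illustrated without torch. This sketch assumes a batch of one row, represents the tensor as a nested Python list, and uses an `is_token_ids` flag in place of the `data.dtype != torch.long` test in the listing; `decode_with_vocab` and the sample vocabulary are hypothetical.

```python
def decode_with_vocab(batch, vocab, is_token_ids):
    """Decode a batch of one row into text using a vocabulary list.

    batch: nested list standing in for a tensor of shape (1, n).
    is_token_ids: stands in for the `dtype == torch.long` case in the source.
    """
    if len(batch) != 1:
        raise ValueError(f"Designed for a batch of one element, got {len(batch)}")
    row = batch[0]
    if not is_token_ids:
        # Scores or probabilities: pick the entry with the highest value (argmax).
        best = max(range(len(row)), key=row.__getitem__)
        return vocab[best]
    # Token IDs: look up each word and join the words with single spaces.
    return " ".join(vocab[i] for i in row)


vocab = ["cat", "dog", "bird"]
print(decode_with_vocab([[0.1, 0.7, 0.2]], vocab, is_token_ids=False))  # dog
print(decode_with_vocab([[2, 0, 1]], vocab, is_token_ids=True))         # bird cat dog
```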

is_compatible

is_compatible(props_to_compare: DataProps) -> bool

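Checks whether this DataProps instance is compatible with another one. The data types must match unless this instance's data_type is "all". For tensor data, the two shapes must have the same number of dimensions and agree on every dimension that both specify (None matches any size); if both instances define tensor labels, the labels must also be equal. Non-tensor data is compatible once the data-type check passes.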

Parameters:

props_to_compare (DataProps): The DataProps instance to check compatibility with. Required.

Returns:

bool: True if compatible, False otherwise.

Source code in unaiverse/streams/dataprops.py
def is_compatible(self, props_to_compare: 'DataProps') -> bool:
    """Checks if the current DataProps instance is compatible with another DataProps instance.
    Checks include data type, shape, and labels.

    Args:
        props_to_compare (DataProps): The DataProps instance to check compatibility with.

    Returns:
        bool: True if compatible, False otherwise.
    """

    # Checking data type
    if self.data_type != props_to_compare.data_type and self.data_type != "all":
        return False

    # In the case of tensors...
    if self.is_tensor():

        # Checking shape
        if len(self.tensor_shape) == len(props_to_compare.tensor_shape):
            for s, p in zip(self.tensor_shape, props_to_compare.tensor_shape):
                if s is not None and p is not None and s != p:
                    return False
        else:
            return False

        # Checking labels (if possible)
        if (not self.has_tensor_labels()) or (not props_to_compare.has_tensor_labels()):
            return True
        else:
            return self.tensor_labels == props_to_compare.tensor_labels
    else:
        return True
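
The shape comparison above treats None as a wildcard on either side. A minimal standalone sketch of that rule follows, using plain tuples; `shapes_compatible` is a hypothetical helper, not part of the module.

```python
def shapes_compatible(shape_a, shape_b):
    """Return True when two shapes have the same rank and agree on every
    dimension that both of them specify (None means "any size")."""
    if len(shape_a) != len(shape_b):
        return False
    return all(a is None or b is None or a == b
               for a, b in zip(shape_a, shape_b))


print(shapes_compatible((None, 10), (1, 10)))   # True
print(shapes_compatible((None, 10), (1, 12)))   # False
print(shapes_compatible((1, 10), (1, 10, 3)))   # False
```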

TensorLabels

TensorLabels(data_props: DataProps, labels: list[str] | None, labeling_rule: str = 'max')

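Manages the labels attached to the features of a tensor stream, together with the labeling rule ('max' or 'geqX') and the index mapping produced when the labels are interleaved with a super-set.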

Attributes:

VALID_LABELING_RULES (tuple): Tuple of valid labeling rules ('max', 'geq').

Initializes the TensorLabels instance.

Parameters:

data_props (DataProps): The DataProps instance that owns these labels. Required.

labels (list[str] or None): List of labels. Required.

labeling_rule (str): The rule for labeling (either 'max' or 'geqX', where X is a number). Default: 'max'.

Returns:

None

Raises:

AssertionError: If the labels or labeling_rule are invalid.

Source code in unaiverse/streams/dataprops.py
def __init__(self, data_props: DataProps, labels: list[str] | None, labeling_rule: str = "max"):
    """Initializes the TensorLabels instance.

    Args:
        data_props (DataProps): The DataProps instance that owns these labels.
        labels (list[str] or None): List of labels.
        labeling_rule (str): The rule for labeling (either 'max' or 'geqX', where X is a number).

    Returns:
        None

    Raises:
        AssertionError: If the labels or labeling_rule are invalid.
    """
    assert data_props.is_tensor(), "Tensor labels can only be attached to tensor data properties"
    num_labels = len(labels) if labels is not None else 0
    assert num_labels == 0 or (data_props.is_tensor() and len(data_props.tensor_shape) == 2), \
        "Data attribute labels can only be specified for 2d arrays (batch size + data features)"
    assert len(labeling_rule) >= 3 and labeling_rule[0:3] in TensorLabels.VALID_LABELING_RULES, \
        "Invalid labeling rule"
    try:
        original_labeling_rule = labeling_rule
        if len(labeling_rule) > 3:
            labeling_rule_thres = float(labeling_rule[3:])
            labeling_rule = labeling_rule[0:3]
        else:
            labeling_rule_thres = None
    except ValueError:
        assert False, "Invalid labeling rule"

    # Basic attributes
    self.data_props = data_props
    self.labels = labels
    self.labeling_rule = labeling_rule
    self.labeling_rule_thres = labeling_rule_thres
    self.original_labeling_rule = original_labeling_rule

    # These are mostly operational attributes, similar to private info (but it could be useful to expose them)
    self.num_labels = num_labels
    self.indices = None
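
The constructor splits a rule string into a three-letter name and an optional numeric threshold. A minimal standalone sketch of that parsing follows; `parse_labeling_rule` is a hypothetical helper, and it raises ValueError where the constructor above uses assertions.

```python
VALID_LABELING_RULES = ("max", "geq")


def parse_labeling_rule(rule):
    """Split a rule such as 'geq0.5' into its name and optional threshold."""
    if len(rule) < 3 or rule[:3] not in VALID_LABELING_RULES:
        raise ValueError(f"Invalid labeling rule: {rule!r}")
    suffix = rule[3:]
    if not suffix:
        # No threshold given, e.g. plain 'max'.
        return rule, None
    try:
        return rule[:3], float(suffix)
    except ValueError:
        # The part after the name is not a number, e.g. 'geqabc'.
        raise ValueError(f"Invalid labeling rule: {rule!r}") from None


print(parse_labeling_rule("max"))      # ('max', None)
print(parse_labeling_rule("geq0.5"))   # ('geq', 0.5)
```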

VALID_LABELING_RULES class-attribute instance-attribute

VALID_LABELING_RULES = ('max', 'geq')

data_props instance-attribute

data_props = data_props

labels instance-attribute

labels = labels

labeling_rule instance-attribute

labeling_rule = labeling_rule

labeling_rule_thres instance-attribute

labeling_rule_thres = labeling_rule_thres

original_labeling_rule instance-attribute

original_labeling_rule = original_labeling_rule

num_labels instance-attribute

num_labels = num_labels

indices instance-attribute

indices = None

to_dict

to_dict() -> dict

Serializes the TensorLabels instance into a dictionary, which includes the list of labels and the original labeling rule.

Returns:

dict: A dictionary containing the labels and the original labeling rule.

Source code in unaiverse/streams/dataprops.py
def to_dict(self) -> dict:
    """Serializes the `TensorLabels` instance into a dictionary, which includes the list of labels and the original
    labeling rule.

    Returns:
        A dictionary containing the labels and the original labeling rule.
    """
    return {
        'labels': self.labels,
        'labeling_rule': self.original_labeling_rule
    }
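
Because the returned dictionary holds only a list of strings (or None) and a string, it can be sent as JSON. A small sketch with hypothetical sample values, using the same two keys as the listing above:

```python
import json

# Hypothetical values; the dict layout matches what to_dict() returns above.
serialized = {"labels": ["cat", "dog"], "labeling_rule": "geq0.5"}

# The payload holds only strings and lists, so it survives a JSON round trip.
restored = json.loads(json.dumps(serialized))

# The rule is stored in its original form, threshold included.
print(restored["labeling_rule"])  # geq0.5
```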

clear_indices

clear_indices() -> None

Resets the internal indices attribute to None, discarding the index mapping computed by a previous interleave_with call. Only indices is reset: labels, num_labels, and the owner's tensor_shape keep the values they had before the call.

Source code in unaiverse/streams/dataprops.py
def clear_indices(self) -> None:
    """Resets the internal `indices` attribute to `None`. This effectively clears any previous label adaptation
    that was performed and allows the object to revert to its original, non-interleaved state.
    """
    self.indices = None

interleave_with

interleave_with(superset_labels: list[str]) -> None

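Maps the current labels onto a super-set of labels. For each current label, the method records its position in superset_labels. If the two lists differ in content or order, labels is replaced by the super-set, the positions are stored in indices as a torch.long tensor, and the second dimension of data_props.tensor_shape is set to the new number of labels; otherwise indices is reset to None.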

Parameters:

superset_labels (list[str]): The super-set of labels to interleave with. Required.

Raises:

AssertionError: If the super-set of labels is not compatible.

Source code in unaiverse/streams/dataprops.py
def interleave_with(self, superset_labels: list[str]) -> None:
    """Interleaves the current labels with a super-set of labels, determining how to index them.

    Args:
        superset_labels: The super-set of labels to interleave with.

    Raises:
        AssertionError: If the super-set of labels is not compatible.
    """
    assert superset_labels is not None and self.labels is not None, \
        f"Can only interleave non-empty sets of attribute labels"
    assert len(superset_labels) >= len(self), f"You must provide a super-set of attribute labels"

    # Ensuring it is a super-set of the current labels and finding its position
    if self.indices is not None:
        labels = []
        indices_list = self.indices.tolist()
        for i in indices_list:
            labels.append(self.labels[i])
    else:
        labels = self.labels

    indices = []
    for label in labels:
        assert label in superset_labels, \
            f"Cannot find attribute label {label} in (expected) super-set {superset_labels}"
        indices.append(superset_labels.index(label))

    if len(indices) == len(superset_labels):
        same_labels_and_order = True
        for j, i in enumerate(indices):
            if j != i:
                same_labels_and_order = False
                break
    else:
        same_labels_and_order = False

    if not same_labels_and_order:
        self.labels = superset_labels
        self.num_labels = len(self.labels)
        self.indices = torch.tensor(indices, dtype=torch.long)

        # Altering shape
        self.data_props.tensor_shape = (self.data_props.tensor_shape[0], self.num_labels)
    else:
        self.indices = None
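
The index computation at the core of interleave_with can be sketched with plain lists. `superset_indices` is a hypothetical helper; it returns a list (or None) instead of storing a torch.long tensor, and raises ValueError where the method above uses assertions.

```python
def superset_indices(labels, superset_labels):
    """Return the position of each current label inside the super-set,
    or None when both lists already hold the same labels in the same order."""
    indices = []
    for label in labels:
        if label not in superset_labels:
            raise ValueError(f"Cannot find label {label!r} in super-set {superset_labels}")
        indices.append(superset_labels.index(label))
    if indices == list(range(len(superset_labels))):
        # Same labels, same order: no remapping is needed.
        return None
    return indices


print(superset_indices(["dog", "cat"], ["cat", "bird", "dog"]))  # [2, 0]
print(superset_indices(["cat", "dog"], ["cat", "dog"]))          # None
```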