
Data streams

Data streams are the typed channels through which agents exchange data. Every input and output of an Agent is a stream. When two agents connect, UNaIVERSE inspects their StreamType descriptors, finds compatible pairs, and wires them together automatically, with built-in type checking, optional buffering, and either point-to-point delivery or pub/sub broadcast. You write no networking code.

Why typed?

Two peers must agree on what they exchange before they exchange it. A StreamType is that contract: the kind of data, its shape, and its dtype. It is why you can connect, for example, a classifier and a generator and be confident that neither side sends data the other cannot handle.

The data types

These are the shorthand values you'll use most. Pass them as plain strings to proc_inputs / proc_outputs, or wrap them in a full StreamType.

Value      Meaning
"tensor"   A torch.Tensor: images, audio, embeddings, signals.
"img"      A PIL Image.
"text"     A plain Python string.
"file"     Any file (a PDF, an audio clip, a zip, a CSV), carried as a FileContainer.
"all"      Wildcard; matches any of the others.

The file type, for whole files

Use "file" to move an actual file between agents, not just text or numbers. A file travels as a small FileContainer (from unaiverse.streams.dataprops import FileContainer) with three fields: content (the raw bytes), filename, and mime_type. When you declare a "file" input, your forward() receives a FileContainer. To send one out, return a path string, raw bytes, or a FileContainer. It is the right type for documents, media, datasets, or checkpoints that are not naturally a tensor, an image, or text.
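To make the three accepted output forms concrete, here is a minimal sketch of how a path string, raw bytes, or an existing container could be normalized into an object with the three documented fields. FileContainerSketch and to_file_container are hypothetical names: this is a local stand-in, not UNaIVERSE's FileContainer or its actual conversion logic, and guessing the MIME type with the standard mimetypes module is an assumption.

```python
import mimetypes
import os
import tempfile
from dataclasses import dataclass


@dataclass
class FileContainerSketch:
    """Hypothetical stand-in mirroring FileContainer's documented fields."""
    content: bytes
    filename: str
    mime_type: str


def to_file_container(value, default_name="data.bin"):
    """Hypothetical helper: normalize a path, raw bytes, or a container."""
    if isinstance(value, FileContainerSketch):
        return value  # already in the container form, pass it through
    if isinstance(value, (bytes, bytearray)):
        # Raw bytes carry no name or type, so fall back to generic values.
        return FileContainerSketch(bytes(value), default_name,
                                   "application/octet-stream")
    if isinstance(value, str):
        # Treat strings as file paths and read the bytes from disk.
        with open(value, "rb") as f:
            content = f.read()
        mime, _ = mimetypes.guess_type(value)
        return FileContainerSketch(content, os.path.basename(value),
                                   mime or "application/octet-stream")
    raise TypeError(f"unsupported file output: {type(value).__name__}")


# Usage: write a small CSV into a temporary directory and normalize its path.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "report.csv")
    with open(path, "wb") as f:
        f.write(b"a,b\n1,2\n")
    fc = to_file_container(path)

print(fc.filename, fc.mime_type, len(fc.content))
```

Reading the bytes eagerly means the container stays valid after the source file is gone, which matters when the file lives in a temporary location.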

from unaiverse.streams.dataprops import StreamType

# Shorthand string vs. full StreamType; the two are equivalent for simple cases:
proc_inputs = ["text"]
proc_inputs = [StreamType(data_type="text")]

Defining I/O with StreamType

For anything beyond the shorthand, describe the stream precisely:

from unaiverse.streams.dataprops import StreamType

# Image input: batch × channels × height × width (None = dynamic)
image_input = StreamType(data_type="tensor",
                         tensor_shape=(None, 3, 224, 224),
                         tensor_dtype="torch.float32")

text_output = StreamType(data_type="text")            # text out
any_stream  = StreamType(data_type="all")             # wildcard in

UNaIVERSE uses these descriptors to validate incoming data, match compatible streams between peers, and apply transforms automatically.
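As an illustration of what validating a tensor stream can involve, here is a minimal sketch of a shape and dtype check in which None acts as a dynamic dimension, following the tensor_shape convention above. shape_matches and check_sample are hypothetical helpers working on plain tuples and dtype strings; this is not UNaIVERSE's validation code.

```python
def shape_matches(expected, actual):
    """Return True if `actual` fits `expected`; None in `expected` is a dynamic dim."""
    if expected is None:
        return True  # no shape constraint declared
    if len(expected) != len(actual):
        return False  # the number of dimensions must agree
    return all(e is None or e == a for e, a in zip(expected, actual))


def check_sample(expected_shape, expected_dtype, shape, dtype):
    """Hypothetical validation step: raise instead of passing bad data along."""
    if not shape_matches(expected_shape, shape):
        raise ValueError(f"shape {shape} does not fit {expected_shape}")
    if expected_dtype is not None and dtype != expected_dtype:
        raise TypeError(f"dtype {dtype} does not match {expected_dtype}")


# A batch of 8 RGB images fits (None, 3, 224, 224); a grayscale batch does not.
print(shape_matches((None, 3, 224, 224), (8, 3, 224, 224)))  # → True
print(shape_matches((None, 3, 224, 224), (8, 1, 224, 224)))  # → False
```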

StreamType properties

data_type · str · required
"tensor", "img", "text", "file", or "all".
tensor_shape · tuple
Expected shape for tensor streams; None marks dynamic dims, e.g. (None, 3, 224, 224).
tensor_dtype · str
Expected dtype, e.g. "torch.float32" or "torch.long".
tensor_labels · list[str]
Human-readable labels for flat tensor dimensions (handy for tabular data).
pubsub · bool · default: False
Broadcast writes to all subscribers via a topic, instead of point-to-point.
public · bool · default: False
Advertise the stream on the public P2P network, not just inside a world.
private_only · bool · default: False
Transmit only on the private/world P2P layer; never expose publicly.
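The difference between the default point-to-point delivery and pubsub=True can be pictured with a toy in-memory model. ToyBus and its send/subscribe/publish methods are hypothetical; the sketch only illustrates who receives a write under each mode and says nothing about how UNaIVERSE implements topics over its P2P layers (the public and private_only flags are not modeled).

```python
from collections import defaultdict


class ToyBus:
    """Hypothetical in-memory bus contrasting the two delivery modes."""

    def __init__(self):
        self.inboxes = defaultdict(list)  # peer name -> received messages
        self.topics = defaultdict(set)    # topic -> subscribed peer names

    def send(self, peer, msg):
        # Point-to-point (pubsub=False): exactly one named recipient.
        self.inboxes[peer].append(msg)

    def subscribe(self, topic, peer):
        self.topics[topic].add(peer)

    def publish(self, topic, msg):
        # Pub/sub (pubsub=True): every current subscriber receives a copy.
        for peer in self.topics[topic]:
            self.inboxes[peer].append(msg)


bus = ToyBus()
bus.send("classifier", "frame-1")
for peer in ("logger", "dashboard", "classifier"):
    bus.subscribe("camera", peer)
bus.publish("camera", "frame-2")
print(dict(bus.inboxes))
```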

The "all" wildcard

An agent declaring StreamType(data_type="all") as input accepts connections from any stream. Use it for relay nodes, loggers, or any agent that should consume everything it can find.
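The wildcard rule can be illustrated with a simplified matcher that compares data_type strings only and treats "all" on the input side as accepting anything. compatible and pair_streams are hypothetical names, and this sketch deliberately ignores shape, dtype, and the other descriptor fields, which a real matcher may also consult.

```python
def compatible(out_type, in_type):
    """Hypothetical rule: an "all" input accepts anything; otherwise kinds must match."""
    return in_type == "all" or out_type == in_type


def pair_streams(outputs, inputs):
    """Return (output_index, input_index) pairs whose data types are compatible."""
    return [(i, j)
            for i, o in enumerate(outputs)
            for j, n in enumerate(inputs)
            if compatible(o, n)]


producer_outputs = ["text", "img"]
logger_inputs = ["all"]     # a logger that consumes everything
captioner_inputs = ["img"]  # an agent that only takes images

print(pair_streams(producer_outputs, logger_inputs))     # → [(0, 0), (1, 0)]
print(pair_streams(producer_outputs, captioner_inputs))  # → [(1, 0)]
```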

Transforms

A StreamType can pre-/post-process data as it moves between the network and your processor, so forward() always sees clean inputs.

import torchvision.transforms as T
from unaiverse.streams.dataprops import StreamType

image_input = StreamType(
    data_type="tensor", tensor_shape=(None, 3, 224, 224),
    stream_to_proc_transforms=T.Compose([
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
)
prob_output = StreamType(
    data_type="tensor", tensor_shape=(None, 1000),
    proc_to_stream_transforms=lambda x: x.softmax(dim=-1),
)

stream_to_proc_transforms runs on incoming data before forward(); proc_to_stream_transforms runs on the output before it hits the stream. Both accept any callable, including torchvision.transforms.Compose.
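The order of operations can be written down as a plain-Python sketch. run_with_transforms and compose are hypothetical helpers (compose mimics the left-to-right chaining of torchvision.transforms.Compose), and the data is a list rather than a tensor so the example runs without PyTorch; it does not reproduce how UNaIVERSE invokes the transforms internally.

```python
def run_with_transforms(sample, forward, stream_to_proc=None, proc_to_stream=None):
    """Hypothetical wrapper showing where each transform sits around forward()."""
    if stream_to_proc is not None:
        sample = stream_to_proc(sample)  # clean up data arriving from the stream
    result = forward(sample)
    if proc_to_stream is not None:
        result = proc_to_stream(result)  # shape the output before it is sent
    return result


def compose(*fns):
    """Chain callables left to right, like torchvision.transforms.Compose."""
    def chained(x):
        for fn in fns:
            x = fn(x)
        return x
    return chained


# Toy example on plain lists: halve the inputs, sum them, then round the output.
out = run_with_transforms(
    [2, 4, 6],
    forward=sum,
    stream_to_proc=compose(lambda xs: [x / 2 for x in xs]),
    proc_to_stream=lambda y: round(y, 1),
)
print(out)  # → 6.0
```

Keeping both transforms outside forward() means the processor itself never needs to know about normalization or output formatting.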

Stream classes

Beyond single values, UNaIVERSE provides stream classes for common patterns:

Class                Description
DataStream           Base single-value stream; holds the most recent sample.
BufferedDataStream   Keeps a history of samples with indexed access and replay.
ImageFileStream      Streams images from a disk directory in sequence.
LabelStream          Streams classification labels (ints or one-hot tensors).
TokensStream         Streams tokenized text sequences.
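The idea behind a buffered stream (a history of samples with indexed access and replay) can be sketched with a bounded deque. BufferedStreamSketch and its set/get/replay methods are hypothetical names, and the fixed-length eviction policy is an assumption, not documented behavior of BufferedDataStream.

```python
from collections import deque


class BufferedStreamSketch:
    """Hypothetical bounded history buffer with indexed access and replay."""

    def __init__(self, max_len=5):
        self._buf = deque(maxlen=max_len)  # oldest samples drop off automatically

    def set(self, sample):
        self._buf.append(sample)

    def get(self, index=-1):
        # -1 is the newest sample, like a plain single-value stream.
        return self._buf[index]

    def __len__(self):
        return len(self._buf)

    def replay(self):
        """Yield the retained history from oldest to newest."""
        yield from list(self._buf)


stream = BufferedStreamSketch(max_len=3)
for t in range(5):
    stream.set(t * 10)

print(stream.get())           # → 40
print(list(stream.replay()))  # → [20, 30, 40]
```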

Built-in generators

unaiverse.streams.streamlib ships ready-made BufferedStream subclasses for synthetic signals, great for testing pipelines or building signal-school worlds:

from unaiverse.streams.streamlib import Sin, Random, CombSin

noise     = Random(std=1.0, shape=(1,))             # Gaussian noise
sine_wave = Sin(freq=0.1, phase=0.0, delta=0.1)     # a sine wave
comb      = CombSin(freqs=[0.1, 0.5, 1.0], delta=0.1)  # summed sines
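For intuition about what such generators produce, here is a plain-Python sketch under explicitly assumed semantics: delta is taken to be the time step between samples, a sine sample is sin(2π · freq · t + phase) with t = k · delta, CombSin is taken to sum one sine per frequency, and Random is taken to draw zero-mean Gaussian values. All function names are hypothetical; the library's classes may define their parameters differently.

```python
import math
import random


def sin_samples(freq, phase, delta, n):
    """Hypothetical: n samples of sin(2*pi*freq*t + phase) with t = k * delta."""
    return [math.sin(2 * math.pi * freq * k * delta + phase) for k in range(n)]


def comb_sin_samples(freqs, delta, n):
    """Hypothetical: element-wise sum of one zero-phase sine per frequency."""
    waves = [sin_samples(f, 0.0, delta, n) for f in freqs]
    return [sum(vals) for vals in zip(*waves)]


def gaussian_samples(std, n, seed=0):
    """Hypothetical: n zero-mean Gaussian draws with the given std."""
    rng = random.Random(seed)  # seeded so test pipelines are reproducible
    return [rng.gauss(0.0, std) for _ in range(n)]


print(sin_samples(freq=0.1, phase=0.0, delta=0.1, n=3))
print(comb_sin_samples([0.1, 0.5, 1.0], delta=0.1, n=3))
print(gaussian_samples(std=1.0, n=3))
```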
