Data streams
Data streams are the typed channels through which agents exchange data.
Every input and output of an Agent is a stream. When two agents
connect, UNaIVERSE inspects their StreamType descriptors, finds compatible
pairs, and wires them together automatically, with built-in type checking,
optional buffering, and either point-to-point delivery or pub/sub broadcast. You
write no networking code.
Why typed?
Two peers must agree on what they exchange before they exchange it. A
StreamType is that contract: kind, shape, and dtype. It's why you can connect
a classifier and a generator with confidence that nobody sends the wrong
thing.
The data types
These are the shorthand values you'll use most. Pass them as plain strings to
proc_inputs / proc_outputs, or build a full StreamType.
| Value | Meaning |
|---|---|
| "tensor" | A torch.Tensor: images, audio, embeddings, signals. |
| "img" | A PIL Image. |
| "text" | A plain Python string. |
| "file" | Any file (a PDF, an audio clip, a zip, a CSV), carried as a FileContainer. |
| "all" | Wildcard, matches any of the others. |
The file type, for whole files
Use "file" to move an actual file between agents, not just text or numbers.
A file travels as a small FileContainer (from unaiverse.streams.dataprops
import FileContainer) with three fields: content (the raw bytes),
filename, and mime_type. When you declare a "file" input, your
forward() receives a FileContainer. To send one out, return a path
string, raw bytes, or a FileContainer. It is the right type for
documents, media, datasets, or checkpoints that are not naturally a tensor,
an image, or text.
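The three accepted return forms (path string, raw bytes, or a container) can be pictured as one normalization step. The sketch below is an illustration only: FileContainerSketch is a stand-in dataclass with the three fields named above, and to_container, its default filename, and the fallback MIME type are hypothetical choices, not the UNaIVERSE implementation.

```python
import mimetypes
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass
class FileContainerSketch:
    """Stand-in with the three documented fields; NOT the real FileContainer."""
    content: bytes
    filename: str
    mime_type: str


def to_container(value, default_name="payload.bin"):
    """Hypothetical helper: wrap a path string, raw bytes, or a container."""
    if isinstance(value, FileContainerSketch):
        return value  # already wrapped, pass through unchanged
    if isinstance(value, (bytes, bytearray)):
        # Raw bytes carry no name or type, so fall back to generic values.
        return FileContainerSketch(bytes(value), default_name,
                                   "application/octet-stream")
    if isinstance(value, str):
        # A string is treated as a path: read the file and guess its type.
        path = Path(value)
        guessed, _ = mimetypes.guess_type(path.name)
        return FileContainerSketch(path.read_bytes(), path.name,
                                   guessed or "application/octet-stream")
    raise TypeError(f"cannot wrap {type(value).__name__} as a file")


# Usage: wrap a temporary text file, then some raw bytes.
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "notes.txt"
    path.write_bytes(b"hello")
    from_path = to_container(str(path))
from_bytes = to_container(b"\x00\x01")
```

Whatever the real conversion looks like, the point is the same: the receiving forward() always sees one shape of object, regardless of which form the sender returned.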
```python
from unaiverse.streams.dataprops import StreamType

# The two are equivalent for simple cases:
proc_inputs = ["text"]
proc_inputs = [StreamType(data_type="text")]
```
Defining I/O with StreamType
For anything beyond the shorthand, describe the stream precisely:
```python
from unaiverse.streams.dataprops import StreamType

# Image input: batch × channels × height × width (None = dynamic)
image_input = StreamType(data_type="tensor",
                         tensor_shape=(None, 3, 224, 224),
                         tensor_dtype="torch.float32")

text_output = StreamType(data_type="text")  # text out
any_stream = StreamType(data_type="all")    # wildcard in
```
UNaIVERSE uses these descriptors to validate incoming data, match compatible streams between peers, and apply transforms automatically.
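To make the "None = dynamic" convention concrete, here is a minimal sketch of a shape check. It assumes that None matches any size in that position and that the number of dimensions must agree; shape_matches is a hypothetical helper, and the library's actual validation may be stricter or more lenient.

```python
def shape_matches(declared, actual):
    """Return True if `actual` fits `declared`; None matches any size."""
    if len(declared) != len(actual):
        return False  # assumption: rank must agree exactly
    return all(d is None or d == a for d, a in zip(declared, actual))


declared = (None, 3, 224, 224)

# Any batch size is accepted because the first dimension is dynamic.
ok_batch = shape_matches(declared, (8, 3, 224, 224))

# A grayscale batch has 1 channel instead of 3, so it is rejected.
wrong_channels = shape_matches(declared, (8, 1, 224, 224))
```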
StreamType properties
- data_type · str · required: "tensor", "img", "text", "file", or "all".
- tensor_shape · tuple: Expected shape for tensor streams; None marks dynamic dims, e.g. (None, 3, 224, 224).
- tensor_dtype · str: Expected dtype, e.g. "torch.float32" or "torch.long".
- tensor_labels · list[str]: Human-readable labels for flat tensor dimensions (handy for tabular data).
- pubsub · bool · default: False. Broadcast writes to all subscribers via a topic, instead of point-to-point.
- public · bool · default: False. Advertise the stream on the public P2P network, not just inside a world.
- private_only · bool · default: False. Transmit only on the private/world P2P layer; never expose publicly.
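The difference between the two delivery modes behind the pubsub flag can be sketched without any networking. Everything below is hypothetical scaffolding (TopicBusSketch, send_point_to_point, plain lists as inboxes); it shows the general pattern, not how UNaIVERSE routes data.

```python
from collections import defaultdict


class TopicBusSketch:
    """Toy pub/sub bus: every subscriber of a topic receives each sample."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, inbox):
        self._subscribers[topic].append(inbox)

    def publish(self, topic, sample):
        for inbox in self._subscribers[topic]:
            inbox.append(sample)
        return len(self._subscribers[topic])  # number of deliveries


def send_point_to_point(inbox, sample):
    """Toy point-to-point delivery: exactly one receiver gets the sample."""
    inbox.append(sample)


bus = TopicBusSketch()
alice, bob, carol = [], [], []
bus.subscribe("temperature", alice)
bus.subscribe("temperature", bob)

delivered = bus.publish("temperature", 21.5)  # reaches alice and bob
send_point_to_point(carol, 21.5)              # reaches carol only
```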
The "all" wildcard
An agent declaring StreamType(data_type="all") as input accepts
connections from any stream. Use it for relay nodes, loggers, or any agent
that should consume everything it can find.
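As a rough illustration of that rule, the sketch below filters a list of offered stream kinds by what an input accepts. The accepts helper is hypothetical and only models the wildcard on the input side, which is the case described above.

```python
def accepts(input_kind, offered_kind):
    """An "all" input takes anything; otherwise the kinds must be equal."""
    return input_kind == "all" or input_kind == offered_kind


offered = ["tensor", "text", "file"]

# A logger declaring "all" picks up every offered stream.
logger_inputs = [kind for kind in offered if accepts("all", kind)]

# A classifier declaring "tensor" picks up only the tensor stream.
classifier_inputs = [kind for kind in offered if accepts("tensor", kind)]
```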
Transforms
A StreamType can pre-/post-process data as it moves between the network and
your processor, so forward() always sees clean inputs.
stream_to_proc_transforms runs on incoming data before forward();
proc_to_stream_transforms runs on the output before it hits the stream. Both
accept any callable, including torchvision.transforms.Compose.
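The order of operations can be sketched in plain Python. run_with_transforms is a hypothetical wrapper written for this illustration; it only mirrors the sequence described above (incoming transform, then forward(), then outgoing transform) and says nothing about how StreamType stores or invokes its callables.

```python
def run_with_transforms(forward, value, stream_to_proc=None, proc_to_stream=None):
    """Apply the incoming transform, call forward(), apply the outgoing one."""
    if stream_to_proc is not None:
        value = stream_to_proc(value)    # clean the data before forward()
    result = forward(value)
    if proc_to_stream is not None:
        result = proc_to_stream(result)  # shape the output for the stream
    return result


def forward(text):
    """Toy processor: reverse the text it receives."""
    return text[::-1]


# Incoming text is stripped before forward(); the result is upper-cased after.
out = run_with_transforms(forward, "  Hello  ",
                          stream_to_proc=str.strip,
                          proc_to_stream=str.upper)
```

Because both hooks are ordinary callables, a composed pipeline (for example a torchvision.transforms.Compose object) slots into the same two positions.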
Stream classes
Beyond single values, UNaIVERSE provides stream classes for common patterns:
| Class | Description |
|---|---|
| DataStream | Base single-value stream, holds the most recent sample. |
| BufferedDataStream | Keeps a history of samples with indexed access and replay. |
| ImageFileStream | Streams images from a disk directory in sequence. |
| LabelStream | Streams classification labels (ints or one-hot tensors). |
| TokensStream | Streams tokenized text sequences. |
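The contrast between the first two rows, latest sample only versus a replayable history, can be sketched with two toy classes. These are stand-ins written for illustration; the method names (set, get, replay) are assumptions and do not reflect the real DataStream or BufferedDataStream interfaces.

```python
class LatestValueSketch:
    """Keeps only the most recent sample."""

    def __init__(self):
        self._latest = None

    def set(self, sample):
        self._latest = sample

    def get(self):
        return self._latest


class BufferedSketch(LatestValueSketch):
    """Also keeps every sample, with indexed access and replay."""

    def __init__(self):
        super().__init__()
        self._history = []

    def set(self, sample):
        super().set(sample)
        self._history.append(sample)

    def __getitem__(self, index):
        return self._history[index]

    def __len__(self):
        return len(self._history)

    def replay(self):
        # Iterate over a snapshot so later writes do not affect the replay.
        return iter(tuple(self._history))


latest = LatestValueSketch()
buffered = BufferedSketch()
for sample in (0.1, 0.2, 0.3):
    latest.set(sample)
    buffered.set(sample)
```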
Built-in generators
unaiverse.streams.streamlib ships ready-made BufferedDataStream subclasses for
synthetic signals, great for testing pipelines or building signal-school
worlds:
```python
from unaiverse.streams.streamlib import Sin, Random, CombSin

noise = Random(std=1.0, shape=(1,))                # Gaussian noise
sine_wave = Sin(freq=0.1, phase=0.0, delta=0.1)    # a sine wave
comb = CombSin(freqs=[0.1, 0.5, 1.0], delta=0.1)   # summed sines
```
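For intuition about what such generators produce, here is a standalone sketch that does not use streamlib. It assumes freq is in cycles per time unit and delta is the time step between samples, so sample k is sin(2π · freq · k · delta + phase); that reading of the parameters is a guess, and sine_samples and comb_samples are hypothetical names.

```python
import math
from itertools import islice


def sine_samples(freq, phase=0.0, delta=0.1):
    """Yield sin(2*pi*freq*t + phase) for t = 0, delta, 2*delta, ..."""
    t = 0.0
    while True:
        yield math.sin(2 * math.pi * freq * t + phase)
        t += delta


def comb_samples(freqs, delta=0.1):
    """Yield the sum of several sine waves sampled on the same time grid."""
    waves = [sine_samples(f, delta=delta) for f in freqs]
    while True:
        yield sum(next(wave) for wave in waves)


# A quarter-cycle per step: the samples trace 0, 1, 0, -1, 0 (up to rounding).
first = list(islice(sine_samples(freq=0.25, delta=1.0), 5))
comb_first = list(islice(comb_samples([0.25, 0.5], delta=1.0), 2))
```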
Where next
- Agents: proc_inputs / proc_outputs in context.
- Interactions: how streams move each tick.
- DataProps API reference.