🟡 unaiverse.streams.streamproxy
What this module does 🟡
Provides StreamProxy, a dict-like accessor that binds, enables/disables and routes interactions and data to underlying streams, with a defaults-aware subclass.
streamproxy
¶
█████ █████ ██████ █████ █████ █████ █████ ██████████ ███████████ █████████ ██████████ ░░███ ░░███ ░░██████ ░░███ ░░███ ░░███ ░░███ ░░███░░░░░█░░███░░░░░███ ███░░░░░███░░███░░░░░█ ░███ ░███ ░███░███ ░███ ██████ ░███ ░███ ░███ ░███ █ ░ ░███ ░███ ░███ ░░░ ░███ █ ░ ░███ ░███ ░███░░███░███ ░░░░░███ ░███ ░███ ░███ ░██████ ░██████████ ░░█████████ ░██████ ░███ ░███ ░███ ░░██████ ███████ ░███ ░░███ ███ ░███░░█ ░███░░░░░███ ░░░░░░░░███ ░███░░█ ░███ ░███ ░███ ░░█████ ███░░███ ░███ ░░░█████░ ░███ ░ █ ░███ ░███ ███ ░███ ░███ ░ █ ░░████████ █████ ░░█████░░████████ █████ ░░███ ██████████ █████ █████░░█████████ ██████████ ░░░░░░░░ ░░░░░ ░░░░░ ░░░░░░░░ ░░░░░ ░░░ ░░░░░░░░░░ ░░░░░ ░░░░░ ░░░░░░░░░ ░░░░░░░░░░ A Collectionless AI Project (https://collectionless.ai) Registration/Login: https://unaiverse.io Code Repositories: https://github.com/collectionlessai/ Main Developers: Stefano Melacci (Project Leader), Christian Di Maio, Tommaso Guidi
StreamProxy
¶
StreamProxy(streams: dict[str, Stream | object] | None = None)
A proxy that wraps a set of streams and exposes a unified stdin/stdout interface.
StreamProxy provides a consistent interface for reading and writing data on a
collection of Stream objects (or plain default values) identified by name, index,
or implicitly when only one stream is bound. It is the object exposed to processor
implementations as self.stdin, self.stdout, and similar attributes.
Streams are stored in three parallel data structures that allow O(1) lookup by full
hash key (_streams), by position (_stream_list), and by short name stripped of
peer-ID prefix (_streams_by_name_only). All three are kept in sync by every
mutating operation (bind, add_new_bind, set).
A proxy may also hold a default UUID (set via bind_uuid_only or bind) that is
used automatically whenever the caller omits the uuid argument on get and
set.
Attributes:
| Name | Type | Description |
|---|---|---|
default_values |
Not defined on the base class; present only on
|
Examples:
Typical processor usage inside a process step:
>>> # Read from the single input stream
>>> data = self.stdin.get()
>>>
>>> # Write to the single output stream
>>> self.stdout.set(result)
>>>
>>> # Read a specific named input stream
>>> image = self.stdin.get("image")
>>>
>>> # Write to a specific named output stream by index
>>> self.stdout.set(0, processed_image)
Initialize a StreamProxy wrapping the given streams.
All three internal lookup structures are built from streams at construction
time. When the same short name (without peer-ID prefix) maps to more than one
entry, the first encountered entry wins in _streams_by_name_only. The default
UUID is initialised to None and can be set later with bind_uuid_only or
bind.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
streams
|
dict[str, Stream | object] | None
|
A dictionary mapping stream hash keys to |
None
|
Examples:
Source code in unaiverse/streams/streamproxy.py
names
property
¶
Return the full hash keys of all bound streams.
The returned list contains the raw keys stored in _streams (user hashes or
<default_pos_N> placeholders), in insertion order. Use items to obtain
both keys and stream objects simultaneously.
Returns:
| Type | Description |
|---|---|
list[str]
|
A list of stream hash key strings. |
bind
¶
bind(streams: dict[str, Stream | object], uuid: str | None = None)
Rebind this proxy to a different set of streams.
All three internal lookup structures are rebuilt from streams. A shallow copy
of the incoming dictionary is stored so that subsequent external modifications to
the caller's dict do not silently affect the proxy. If uuid is provided, the
proxy's default UUID is also updated; otherwise the previously configured UUID is
preserved.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
streams
|
dict[str, Stream | object]
|
A dictionary mapping stream hash keys to |
required |
uuid
|
str | None
|
The default UUID to use when callers omit the |
None
|
Examples:
Source code in unaiverse/streams/streamproxy.py
add_new_bind
¶
add_new_bind(stream_hash: str, stream: Stream) -> None
Register an additional stream under the given hash, if not already bound.
If stream_hash is already present in the proxy, the call is a no-op. Otherwise
the stream is appended to all three internal lookup structures so that it becomes
reachable by full hash, by position, and by short name. In case of a short-name
collision, the existing entry wins.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
stream_hash
|
str
|
The full hash key (typically a user hash) to associate with the stream. |
required |
stream
|
Stream
|
The |
required |
Source code in unaiverse/streams/streamproxy.py
bind_uuid_only
¶
Set the default UUID applied by get and set when the caller omits one.
Passing None clears the default UUID so that no interaction filtering is
applied implicitly.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
uuid
|
str | None
|
The UUID string to use as default, or |
required |
Source code in unaiverse/streams/streamproxy.py
clear_data
¶
Clear current-interaction data from every bound stream.
Calls clear_data on each stream in the proxy for the specified uuid,
removing only the data associated with that interaction. Stream interaction records
themselves are preserved. This is useful for resetting a single step's output
without discarding the full interaction history.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
uuid
|
str | None
|
The UUID of the interaction whose data should be cleared. Pass |
required |
Source code in unaiverse/streams/streamproxy.py
clear_all_data
¶
Clear all data from every bound stream across all interactions.
Calls clear_all_data on each stream in the proxy, removing every stored
data sample regardless of UUID. Interaction records themselves are not removed.
Use clear_data when only a specific interaction's data should be erased.
Source code in unaiverse/streams/streamproxy.py
enable
¶
Enable every bound stream, allowing data to be written.
Calls enable on each stream object in the proxy. Streams that are already
enabled are unaffected. See disable for the reverse operation.
Source code in unaiverse/streams/streamproxy.py
disable
¶
Disable every bound stream, preventing data from being written.
Calls disable on each stream object in the proxy. Streams that are already
disabled are unaffected. Disabled streams still allow reads and interaction
registration; only set calls are blocked unless force=True is passed.
See enable for the reverse operation.
Source code in unaiverse/streams/streamproxy.py
get
¶
get(key: str | int | None = None, requested_by: str | None = None, uuid: str | None = None, all_uuids: bool = False, data_type: str | None = None)
Return data from one or all bound streams.
When key is None, data is collected from every stream in the proxy and
returned as a list. When key is an int, the stream at that position is
addressed. When key is a str, the full hash key is tried first, then the
short (name-only) key. Non-Stream entries (plain default values) are returned
as-is without UUID or requested_by filtering.
If the proxy's default UUID was set via bind_uuid_only or bind, it is used
automatically when uuid is not provided by the caller.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
str | int | None
|
Stream name ( |
None
|
requested_by
|
str | None
|
Identifier of the caller requesting the data. Forwarded to the
underlying |
None
|
uuid
|
str | None
|
UUID of the interaction to retrieve data for. When |
None
|
all_uuids
|
bool
|
When |
False
|
data_type
|
str | None
|
Short name of a data type. When set (and |
None
|
Returns:
| Type | Description |
|---|---|
|
When |
|
|
if no stream produced a non- |
|
|
|
|
|
default value for non- |
|
|
element is a list of |
Raises:
| Type | Description |
|---|---|
GenException
|
If |
GenException
|
If both |
GenException
|
If |
Examples:
>>> # Read from the only bound stream
>>> sample = self.stdin.get()
>>>
>>> # Read a named stream
>>> image = self.stdin.get("image")
>>>
>>> # Read all samples for a specific UUID across every stream
>>> result = self.stdin.get(all_uuids=True, key="label")
Source code in unaiverse/streams/streamproxy.py
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 | |
add_interaction
¶
Register an interaction on every bound stream.
Calls add_interaction on each entry in the stream list. This makes the
interaction (and its UUID) visible to all streams simultaneously, which is required
before data can be written for that interaction via set.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
interaction
|
The |
required |
Source code in unaiverse/streams/streamproxy.py
has_interaction
¶
Return whether every bound stream has an interaction for the given UUID.
Non-Stream entries are skipped. The check is strict: all remaining Stream
objects must report True from their own has_interaction method.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
uuid
|
str | None
|
The UUID to look up. Pass |
required |
Returns:
| Type | Description |
|---|---|
bool
|
|
bool
|
|
Source code in unaiverse/streams/streamproxy.py
get_interaction
¶
Retrieve the interaction for the given UUID from the first stream that holds one.
has_interaction is called first; if it returns False the method returns
None immediately without iterating. Because has_interaction guarantees that
all streams share the same interaction, the first Stream entry that provides a
non-None result is returned.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
uuid
|
str | None
|
The UUID of the interaction to retrieve. Pass |
required |
Returns:
| Type | Description |
|---|---|
object | None
|
The |
object | None
|
if |
Source code in unaiverse/streams/streamproxy.py
set
¶
Write data to one or all bound streams.
Supports four calling conventions:
proxy.set(data)-- single-stream or broadcast: whenkey_or_datais neither anintnor a known key, it is treated as the data for the only stream. Whenkey_or_datais alistortuple, each element is written to the corresponding stream in positional order.proxy.set(index, data)-- by zero-based integer index.proxy.set(stream_name, data)-- by full hash key or short name.
For non-Stream entries (plain default values), the value is updated in all
three internal lookup structures so they remain consistent. If the proxy's default
UUID was set via bind_uuid_only or bind, it is used automatically when
uuid is not provided by the caller.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key_or_data
|
One of: the data to write (single-stream case), a |
required | |
data
|
The data to write. Required when |
None
|
|
data_tag
|
int
|
Integer tag associated with the sample. Pass |
-1
|
uuid
|
str | None
|
UUID of the interaction for which data is being written. When |
None
|
force
|
bool
|
When |
False
|
Raises:
| Type | Description |
|---|---|
GenException
|
If no streams are bound to this proxy. |
GenException
|
If |
GenException
|
If |
Examples:
>>> # Write to the only bound output stream
>>> self.stdout.set(result)
>>>
>>> # Write to multiple output streams at once
>>> self.stdout.set([image_out, label_out])
>>>
>>> # Write to a named stream with a custom tag
>>> self.stdout.set("prediction", pred_tensor, data_tag=42)
Source code in unaiverse/streams/streamproxy.py
353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 | |
get_tag
¶
Return the data tag for the given stream and interaction UUID.
When key is None, tags are collected from all streams and the maximum is
returned. A tag value of -1 is returned for non-Stream entries and for
streams that have no tag for the given UUID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
str | int | None
|
Stream name ( |
None
|
uuid
|
str | None
|
UUID of the interaction to query. When |
None
|
Returns:
| Type | Description |
|---|---|
int
|
The integer data tag associated with the most recent sample, or |
int
|
no tag is available (non- |
Raises:
| Type | Description |
|---|---|
GenException
|
If no streams are bound to this proxy. |
GenException
|
If |
Source code in unaiverse/streams/streamproxy.py
clear
¶
Reset all bound streams by writing None to each of them.
Equivalent to calling set with a list of None values whose length matches
the number of bound streams. Useful for clearing output state between processing
steps. See clear_data to remove stored data for a specific UUID without
overwriting the stream value.
Source code in unaiverse/streams/streamproxy.py
items
¶
Iterate over (stream_hash, stream) pairs for all bound streams.
Yields each key-value pair from the internal _streams dictionary in insertion
order. Keys are full hash strings (either user hashes or <default_pos_N>
placeholders). Use names to retrieve only the keys as a list.
Returns:
| Type | Description |
|---|---|
|
An iterator of |
Source code in unaiverse/streams/streamproxy.py
values
¶
Iterate over all bound stream objects (or plain default values).
Yields each value from the internal _streams dictionary in insertion order.
Entries may be Stream instances or plain Python objects used as default values.
Returns:
| Type | Description |
|---|---|
|
An iterator over the stream objects and plain values. |
Source code in unaiverse/streams/streamproxy.py
StreamsProxyWithDefaults
¶
Bases: StreamProxy
A StreamProxy that substitutes None results with per-stream default values.
This subclass wraps StreamProxy.get so that when a stream returns None for a
given position, the corresponding entry from default_values is returned instead.
It is useful when a processor needs a guaranteed non-None value for every stream
slot even if data has not yet arrived.
Attributes:
| Name | Type | Description |
|---|---|---|
default_values |
A list of per-stream fallback values applied by |
Initialize a StreamsProxyWithDefaults with optional per-stream fallbacks.
Forwards all positional and keyword arguments to StreamProxy.__init__. The
default_values list is stored separately and is applied during every get
call. The length of default_values should match the number of bound streams;
mismatches do not raise an error but may cause index-out-of-range issues at
runtime.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*args
|
Positional arguments forwarded to |
()
|
|
default_values
|
A list of fallback values, one per stream position. When
|
None
|
|
**kwargs
|
Keyword arguments forwarded to |
{}
|
Source code in unaiverse/streams/streamproxy.py
get
¶
Return data from bound streams, substituting None with default values.
Delegates to StreamProxy.get and then replaces any None entries in the
resulting list with the corresponding element from self.default_values. If the
result from the parent is itself None (no stream produced data) or if
self.default_values is None, the result is returned unchanged.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*args
|
Positional arguments forwarded to |
()
|
|
**kwargs
|
Keyword arguments forwarded to |
{}
|
Returns:
| Type | Description |
|---|---|
|
A list of data values where |
|
|
configured default values, or the unmodified result from |
|
|
if no substitution is applicable. |