unaiverse.streams.dataprops
What this module does
Defines stream data descriptors (DataProps, StreamType, TensorLabels, FileContainer) including net/user hashing, tensor label adaptation, and pre/post-processing of stream payloads.
dataprops
¶
UNaIVERSE
A Collectionless AI Project (https://collectionless.ai)
Registration/Login: https://unaiverse.io
Code Repositories: https://github.com/collectionlessai/
Main Developers: Stefano Melacci (Project Leader), Christian Di Maio, Tommaso Guidi
FileContainer
dataclass
¶
A typed wrapper for file data that distinguishes files from raw bytes or text strings.
FileContainer carries a file's raw content together with its name and MIME type so
that stream processing code can handle binary files (e.g. Protobuf payloads, images,
PDFs) without ambiguity. It is the canonical data format expected and produced by
DataProps instances whose data_type is "file".
Attributes:
| Name | Type | Description |
|---|---|---|
| content | bytes \| str | Raw bytes (or, rarely, a string) holding the file's payload. |
| filename | str | Base name of the file. |
| mime_type | str | IANA MIME type string. |
Examples:
>>> fc = FileContainer(content=b"\x89PNG...", filename="image.png", mime_type="image/png")
>>> fc.mime_type
'image/png'
from_path
classmethod
¶
from_path(file_path: str) -> FileContainer
Create a FileContainer by reading a file from disk.
The MIME type is guessed from the file extension via mimetypes.guess_type.
If the extension is unknown the type defaults to "application/octet-stream".
The file is always read in binary mode, which is required for binary formats
such as Protobuf messages.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| file_path | str | Absolute or relative path to the file to read. | required |
Returns:
| Type | Description |
|---|---|
| FileContainer | A FileContainer whose filename holds the base name of the file and whose mime_type holds the guessed MIME type string. |
Raises:
| Type | Description |
|---|---|
| FileNotFoundError | If file_path does not exist. |
| PermissionError | If the process lacks read permission on the file. |
Examples:
>>> fc = FileContainer.from_path("/tmp/data.json")
>>> fc.filename
'data.json'
>>> fc.mime_type
'application/json'
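The behaviour described for from_path can be sketched with the standard library alone. This is a minimal illustration, not the library's source: FileContainerSketch is a hypothetical stand-in class, and only mimetypes.guess_type, the "application/octet-stream" fallback, and the binary read come from the description above.

```python
import mimetypes
import os
from dataclasses import dataclass


@dataclass
class FileContainerSketch:
    """Hypothetical stand-in for FileContainer (illustration only)."""
    content: bytes
    filename: str
    mime_type: str

    @classmethod
    def from_path(cls, file_path: str) -> "FileContainerSketch":
        # Guess the MIME type from the extension; fall back when it is unknown.
        mime_type, _encoding = mimetypes.guess_type(file_path)
        if mime_type is None:
            mime_type = "application/octet-stream"
        # Always read in binary mode so binary payloads are preserved as-is.
        with open(file_path, "rb") as handle:
            content = handle.read()
        return cls(content=content,
                   filename=os.path.basename(file_path),
                   mime_type=mime_type)
```

Because open() is called directly, a missing file or missing read permission surfaces as the FileNotFoundError or PermissionError listed above.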
Source code in unaiverse/streams/dataprops.py
StreamType
¶
A paired container that holds one or two DataProps instances for a stream.
A StreamType groups a private DataProps and, optionally, a public DataProps
for the same logical data stream. This mirrors the UNaIVERSE networking model where a
stream may be accessible over both the private peer-to-peer channel and the public
network.
By default, both a private and a public DataProps are created. Use private_only
or public_only to restrict creation to a single variant. set_* methods
dispatched through __getattr__ are automatically forwarded to every contained
DataProps, allowing batch configuration in a single call.
Attributes:
| Name | Type | Description |
|---|---|---|
| props | | Ordered list of the contained DataProps descriptors (private first, then public when present). |
Examples:
>>> from unaiverse.streams.dataprops import StreamType
>>> st = StreamType("text") # shorthand: creates private + public text stream
>>> len(st.props)
2
>>> st_private = StreamType("tensor", private_only=True,
... tensor_shape=(1, 128), tensor_dtype="torch.float32")
>>> len(st_private.props)
1
Initialize a StreamType with one or two DataProps descriptors.
When exactly one positional argument is given and no keyword arguments are
provided, it is treated as the data_type shorthand (e.g.
StreamType("text")). In all other cases the positional and keyword arguments
are forwarded verbatim to the DataProps constructor.
Unless private_only=True, a second DataProps is created with
public=True appended to the same arguments, representing the public variant
of the stream.
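The construction and forwarding rules above can be sketched as follows. Everything here is a hypothetical stand-in (PropsSketch, StreamTypeSketch), written only to illustrate the described behaviour; the mutual-exclusion check on the two flags is an assumption, not something the text above confirms.

```python
class PropsSketch:
    """Hypothetical stand-in for DataProps with just enough state for the demo."""

    def __init__(self, data_type: str = "text", public: bool = False, **_ignored):
        self.data_type = data_type
        self.public = public
        self.delta = -1.0

    def set_delta(self, delta: float) -> None:
        self.delta = delta


class StreamTypeSketch:
    """Hypothetical stand-in for StreamType (illustration only)."""

    def __init__(self, *args, private_only: bool = False,
                 public_only: bool = False, **kwargs):
        # Assumption: asking for both restrictions at once is an error.
        if private_only and public_only:
            raise ValueError("private_only and public_only are mutually exclusive")
        # One positional argument and no keywords: data_type shorthand.
        if len(args) == 1 and not kwargs:
            args, kwargs = (), {"data_type": args[0]}
        self.props = []
        if not public_only:
            self.props.append(PropsSketch(*args, **kwargs))
        if not private_only:
            # Same arguments, with public=True added for the public variant.
            self.props.append(PropsSketch(*args, **{**kwargs, "public": True}))

    def __getattr__(self, name):
        # Reached only when normal lookup fails: forward set_* to every descriptor.
        if name.startswith("set_"):
            def forward(*a, **kw):
                for p in self.props:
                    getattr(p, name)(*a, **kw)
            return forward
        raise AttributeError(name)
```

With this sketch, a single call such as st.set_delta(0.5) updates both the private and the public descriptor.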
Parameters:
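| Name | Type | Description | Default |
|---|---|---|---|
| *args | object | Positional arguments forwarded to the DataProps constructor. | () |
| private_only | bool | When True, only the private DataProps is created. | False |
| public_only | bool | When True, only the public DataProps is created. | False |
| **kwargs | object | Keyword arguments forwarded to the DataProps constructor. | {} |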
Raises:
| Type | Description |
|---|---|
ValueError
|
If both |
ValueError
|
If |
to_list_of_dicts
¶
Convert each contained DataProps to a dictionary and return them as a list.
Calls DataProps.to_dict on every element of props and collects the
results. This is the serialization entry point when a StreamType is
transmitted over the network or persisted to storage.
Returns:
| Type | Description |
|---|---|
| list[dict] | A list of dictionaries, one per DataProps in props, in the same order (private first, then public when present). |
to_dict
¶
Raise RuntimeError unconditionally - use to_list_of_dicts instead.
to_dict is defined on DataProps for single-descriptor serialization.
StreamType may contain more than one descriptor, so a single-dict
representation is ambiguous and therefore intentionally unsupported here.
Call to_list_of_dicts to serialize all contained DataProps objects.
Raises:
| Type | Description |
|---|---|
| RuntimeError | Always, because this method is not supported on StreamType. |
from_dict
¶
Raise RuntimeError unconditionally - use DataProps.from_dict instead.
from_dict is defined on DataProps to reconstruct a single descriptor
from a dictionary. Reconstructing a StreamType from a single dict is
ambiguous because the type may contain multiple DataProps objects.
Use DataProps.from_dict on each dictionary produced by to_list_of_dicts
and then assemble the StreamType manually.
Raises:
| Type | Description |
|---|---|
| RuntimeError | Always, because this method is not supported on StreamType. |
clone
¶
clone() -> StreamType
Return a deep copy of this StreamType with independently cloned descriptors.
Creates a new StreamType with an empty props list and then appends a
DataProps.clone of every descriptor in this instance. The cloned object
shares no mutable state with the original, including transform objects, which
are preserved via their __original_* references inside DataProps.
Returns:
| Type | Description |
|---|---|
| StreamType | A new StreamType containing an independent clone of each DataProps in props. |
is_public
¶
Raise RuntimeError unconditionally - call is_public on a DataProps instance instead.
A StreamType may hold both a private and a public DataProps, so
returning a single boolean for the whole container is ambiguous. Access
props[0].is_public() or props[1].is_public() directly when the
public/private distinction matters.
Raises:
| Type | Description |
|---|---|
| RuntimeError | Always, because this method is not supported on StreamType. |
DataProps
¶
DataProps(name: str = 'unk', group: str = 'none', data_type: str = 'text', data_desc: str = 'unk', tensor_shape: tuple[int | None, ...] | None = None, tensor_labels: list[str] | str | None = None, tensor_dtype: dtype | str | None = None, tensor_labeling_rule: str = 'max', stream_to_proc_transforms: Callable[..., Any] | PreTrainedTokenizerBase | str | dict | tuple[dict | Callable[..., Any] | PreTrainedTokenizerBase | str | None, dict | Callable[..., Any] | PreTrainedTokenizerBase | str | None] | None = None, proc_to_stream_transforms: Callable[..., Any] | PreTrainedTokenizerBase | str | list | None = None, delta: float = -1, pubsub: bool = False, public: bool = False)
Descriptor for a single UNaIVERSE data stream, carrying type, shape, labels, and transforms.
DataProps captures everything the framework needs to know about a stream in order
to validate, transform, and route its data. It supports four concrete data types
plus a wildcard:
- "tensor" - a fixed or partially-variable-shaped torch.Tensor.
- "img" - a PIL.Image.Image that may optionally be converted to a tensor.
- "text" - a Python str that may optionally be tokenized.
- "file" - a binary FileContainer (e.g. a PDF or Protobuf payload).
- "all" - a wildcard type that accepts any data format without validation.
Stream-to-processor and processor-to-stream transforms can be plain callables,
HuggingFace PreTrainedTokenizerBase instances, AutoTokenizer model ID strings
(prefixed with "AutoTokenizer:"), or vocabulary dictionaries. For tensor streams,
integer tensor labels can be attached via TensorLabels.
Attributes:
| Name | Type | Description |
|---|---|---|
| VALID_DATA_TYPES | | Tuple of accepted data_type values. |
Examples:
Create a text stream with a HuggingFace tokenizer for encoding:
>>> from unaiverse.streams.dataprops import DataProps
>>> dp = DataProps(name="input_text", group="nlp", data_type="text",
... data_desc="Raw tokenized sentence",
... stream_to_proc_transforms="AutoTokenizer:bert-base-uncased")
Create a flat float tensor stream with class labels:
>>> import torch
>>> dp = DataProps(name="features", group="none", data_type="tensor",
... data_desc="Image classification logits",
... tensor_shape=(1, 10), tensor_dtype=torch.float32,
... tensor_labels=["cat", "dog", "bird", "fish", "car",
... "plane", "ship", "truck", "deer", "horse"])
Initialize a DataProps descriptor for a single data stream.
Validates the combination of arguments against the chosen data_type and
stores all attributes. For "tensor" streams the shape, dtype, and optional
labels are validated and stored. For "text" streams, AutoTokenizer model IDs
are resolved eagerly by downloading the tokenizer. For "img" and "file"
streams, tokenizer-related arguments are explicitly rejected.
When stream_to_proc_transforms is a non-list, non-tuple value it is
duplicated internally into a two-element list [transform, transform] where
index 0 is used for inputs and index 1 is used for targets.
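The duplication rule above can be sketched as a small normalisation helper. The function name normalize_transforms and the length check are assumptions made for illustration; only the [input_transform, target_transform] layout comes from the description.

```python
from typing import Any, List


def normalize_transforms(transforms: Any) -> List[Any]:
    """Return a two-element list [input_transform, target_transform]."""
    if isinstance(transforms, (list, tuple)):
        # Assumption: an explicit pair must have exactly two entries.
        if len(transforms) != 2:
            raise ValueError("expected (input_transform, target_transform)")
        return [transforms[0], transforms[1]]
    # A single value (including None) is used for both inputs and targets.
    return [transforms, transforms]
```

For example, normalize_transforms(str.lower) applies the same callable to inputs and targets, while normalize_transforms((str.lower, None)) leaves targets untouched.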
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Human-readable identifier for this stream. Must not contain "~". | 'unk' |
| group | str | Group name used to multiplex several streams over a single network channel. Must not contain "~". | 'none' |
| data_type | str | One of the accepted data types ("tensor", "img", "text", "file", "all"). | 'text' |
| data_desc | str | Free-text human-readable description of the stream contents. | 'unk' |
| tensor_shape | tuple[int \| None, ...] \| None | Required for "tensor" streams. | None |
| tensor_labels | list[str] \| str \| None | Optional class or token labels for the feature dimension of a 2-D tensor. | None |
| tensor_dtype | dtype \| str \| None | Required for "tensor" streams. | None |
| tensor_labeling_rule | str | Rule for mapping a tensor to a label string. | 'max' |
| stream_to_proc_transforms | Callable[..., Any] \| PreTrainedTokenizerBase \| str \| dict \| tuple[dict \| Callable[..., Any] \| PreTrainedTokenizerBase \| str \| None, dict \| Callable[..., Any] \| PreTrainedTokenizerBase \| str \| None] \| None | Transform applied to incoming stream data before the processor sees it. Accepts a callable, a PreTrainedTokenizerBase instance, an "AutoTokenizer:"-prefixed model ID string, a vocabulary dictionary, or a two-element tuple of these. | None |
| proc_to_stream_transforms | Callable[..., Any] \| PreTrainedTokenizerBase \| str \| list \| None | Transform applied to processor output before it is emitted back into the stream. Accepts a callable, a PreTrainedTokenizerBase instance, an "AutoTokenizer:"-prefixed model ID string, or a reverse-vocabulary list. | None |
| delta | float | Minimum time interval in seconds between consecutive stream samples. Values <= 0 indicate real-time (as-fast-as-possible) streaming. | -1 |
| pubsub | bool | When True, the stream is routed through a Pub/Sub topic instead of a direct peer-to-peer channel. | False |
| public | bool | When True, the descriptor targets the public network interface. | False |
Raises:
| Type | Description |
|---|---|
AssertionError
|
If |
AssertionError
|
If |
AssertionError
|
If |
AssertionError
|
If |
AssertionError
|
If |
AssertionError
|
If |
AssertionError
|
If |
AssertionError
|
If tensor-related arguments are non-None for non-tensor types. |
AssertionError
|
If tokenizer transforms are used with image or file streams. |
AssertionError
|
If |
VALID_DATA_TYPES
class-attribute
instance-attribute
¶
stream_to_proc_transforms
instance-attribute
¶
proc_to_stream_transforms
instance-attribute
¶
to_dict
¶
Serialize this DataProps to a JSON-compatible dictionary.
Converts non-serializable attributes to safe primitive types: torch.dtype
values are stringified (e.g. "torch.float32"), and TensorLabels
objects are replaced by the dictionary returned from TensorLabels.to_dict.
Transform callables are intentionally excluded because they cannot be reliably
serialized; they must be re-supplied on deserialization via from_dict and
then configured separately.
Returns:
| Type | Description |
|---|---|
dict
|
A |
dict
|
|
dict
|
|
from_dict
staticmethod
¶
from_dict(d_props: dict) -> DataProps
Reconstruct a DataProps instance from a serialized dictionary.
This is the inverse of to_dict. It reads the tensor_labels sub-dict
to restore the label list and labeling rule, then passes all remaining fields
to the DataProps constructor. Transform functions are not serialized, so the
returned instance has stream_to_proc_transforms and
proc_to_stream_transforms both set to None and must be re-attached
manually when required.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| d_props | dict | Dictionary previously produced by to_dict. | required |
Returns:
| Type | Description |
|---|---|
| DataProps | A new DataProps instance. |
clone
¶
clone() -> DataProps
Return an independent deep copy of this DataProps instance.
Constructs a new DataProps using the same arguments as the original,
including the original transform objects stored in the private
__original_stream_to_proc_transforms and
__original_proc_to_stream_transforms attributes. This avoids re-downloading
AutoTokenizer models or re-evaluating transforms. The cloned object shares no
mutable state with the original.
Returns:
| Type | Description |
|---|---|
| DataProps | A new DataProps with independently copied tensor shape and label information. |
get_name
¶
Return the name of this data stream.
Returns:
| Type | Description |
|---|---|
| str | The stream name. |
get_group
¶
Return the group name of this data stream.
The group name is used by the network layer to multiplex several related streams
over a single channel. When no grouping is desired the value is "none".
Returns:
| Type | Description |
|---|---|
| str | The group name. |
get_description
¶
Return the human-readable description of this stream's data.
Returns:
| Type | Description |
|---|---|
| str | The stream description. |
get_tensor_labels
¶
Return the list of tensor label strings, or None if no labels are set.
Labels correspond to the feature dimension of a 2-D tensor stream and are stored
internally in a TensorLabels wrapper. This method unwraps that wrapper and
returns the raw label list.
Returns:
| Type | Description |
|---|---|
| list[str] \| None | A list of label strings, or None when the stream has no labels (e.g. for non-tensor or unlabeled tensor streams). |
set_name
¶
Set the name of this data stream.
The "~" character is reserved by the networking layer for internal hash
construction and is therefore forbidden in stream names.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | New name for the stream. Must not contain "~". | required |
Raises:
| Type | Description |
|---|---|
| AssertionError | If name contains "~". |
set_group
¶
Set the group name of this data stream.
The "~" character is reserved by the networking layer and is therefore
forbidden in group names. Use "none" to indicate that the stream does not
belong to any group.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| group | str | New group name for the stream. Must not contain "~". | required |
Raises:
| Type | Description |
|---|---|
| AssertionError | If group contains "~". |
set_description
¶
Set the human-readable description of this stream's data.
Replaces the current data_desc value in place. The description is
included in the dictionary produced by to_dict and is intended for
display in dashboards and API documentation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| desc | str | New free-text description of the stream contents. Must be a string. | required |
Raises:
| Type | Description |
|---|---|
| AssertionError | If desc is not a string. |
set_public
¶
Set whether this stream descriptor targets the public network interface.
When public is True, the stream is announced on the public-facing
network channel of the UNaIVERSE peer. When False, it remains private
(peer-to-peer only). This flag is used by the networking layer when
constructing stream advertisements and routing messages.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| public | bool | True to target the public network channel, False to keep the stream private. | required |
set_pubsub
¶
Set whether this stream is routed through a Pub/Sub topic.
When pubsub is True, data for this stream is exchanged via a
publish-subscribe topic rather than a direct peer-to-peer channel. This
affects the network hash produced by net_hash, which embeds "::ps:"
for Pub/Sub streams and "::dm:" for direct-message streams.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| pubsub | bool | True to route the stream through a Pub/Sub topic, False to use a direct peer-to-peer channel. | required |
set_delta
¶
Set the minimum time interval between consecutive stream samples.
The delta controls how frequently data is emitted on this stream. Values less than or equal to zero indicate real-time (as-fast-as-possible) delivery with no enforced pacing. Positive values specify the minimum number of seconds that must elapse between two successive samples.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| delta | float | Minimum inter-sample interval in seconds. Values <= 0 disable rate limiting. | required |
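The pacing rule described for delta can be illustrated with a small predicate. This is a sketch under assumptions: should_emit and its arguments are hypothetical names, and how the framework actually tracks the last emission time is not stated above.

```python
from typing import Optional


def should_emit(last_emit_time: Optional[float], now: float, delta: float) -> bool:
    """Decide whether a new sample may be emitted at time `now` (in seconds)."""
    # delta <= 0 means real-time streaming: no pacing is enforced.
    if delta <= 0:
        return True
    # Nothing has been emitted yet, so the first sample always goes through.
    if last_emit_time is None:
        return True
    # Otherwise at least `delta` seconds must have elapsed since the last sample.
    return (now - last_emit_time) >= delta
```

With delta=2.0, a sample arriving one second after the previous one would be held back, while one arriving two seconds later would pass.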
set_stream_to_proc_transforms
¶
Set the transform applied to incoming stream data before the processor receives it.
Replaces both the active transform (stream_to_proc_transforms) and the
original transform reference (__original_stream_to_proc_transforms) used
by clone to preserve the correct transform when duplicating the descriptor.
Accepted transform types mirror those supported at construction time:
- None - no transformation; raw data is passed to the processor unchanged.
- A callable - applied directly to each sample.
- A PreTrainedTokenizerBase instance - encodes text to token-ID tensors.
- A str starting with "AutoTokenizer:" - the tokenizer is resolved lazily by the networking layer.
- A dict (str -> int) vocabulary mapping for text streams.
- A 2-element tuple or list (input_transform, target_transform) to supply different transforms for inputs and targets.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| t | | New stream-to-processor transform, or None to disable the transformation. | required |
set_proc_to_stream_transforms
¶
Set the transform applied to processor output before it is emitted back into the stream.
Replaces both the active transform (proc_to_stream_transforms) and the
original transform reference (__original_proc_to_stream_transforms) used
by clone to preserve the correct transform when duplicating the descriptor.
Accepted transform types mirror those supported at construction time:
- None - no transformation; processor output is forwarded unchanged.
- A callable - applied directly to each output sample.
- A PreTrainedTokenizerBase instance - decodes token-ID tensors to text.
- A str starting with "AutoTokenizer:" - resolved lazily at postprocess time.
- A list (int -> str) reverse-vocabulary mapping for text streams.
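To make the two vocabulary forms concrete, the sketch below pairs a dict (str -> int) vocabulary with a list (int -> str) reverse vocabulary. The whitespace tokenization, the "<unk>" fallback token, and the helper names encode and decode are assumptions for illustration; the framework's own handling of these mappings may differ.

```python
from typing import Dict, List

# Forward vocabulary (str -> int), the shape accepted on the stream-to-processor side.
vocab: Dict[str, int] = {"hello": 0, "world": 1, "<unk>": 2}

# Reverse vocabulary (int -> str): index i holds the token with ID i.
reverse_vocab: List[str] = ["hello", "world", "<unk>"]


def encode(text: str, vocab: Dict[str, int], unk: str = "<unk>") -> List[int]:
    """Map whitespace-separated tokens to IDs, using `unk` for unknown tokens."""
    return [vocab.get(token, vocab[unk]) for token in text.split()]


def decode(ids: List[int], reverse_vocab: List[str]) -> str:
    """Map token IDs back to a space-joined string."""
    return " ".join(reverse_vocab[i] for i in ids)
```

The two structures are inverses of each other, so decode(encode(text, vocab), reverse_vocab) returns the original text whenever every token is in the vocabulary.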
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| t | | New processor-to-stream transform, or None to disable the transformation. | required |
is_tensor
¶
Return True if this stream carries torch.Tensor data.
Returns:
| Type | Description |
|---|---|
bool
|
|
is_img
¶
Return True if this stream carries PIL.Image.Image data.
Returns:
| Type | Description |
|---|---|
bool
|
|
is_text
¶
Return True if this stream carries plain string data.
Returns:
| Type | Description |
|---|---|
bool
|
|
is_file
¶
Return True if this stream carries binary FileContainer data.
Returns:
| Type | Description |
|---|---|
bool
|
|
is_tensor_long
¶
Return True if the tensor dtype is torch.long.
Returns False when tensor_dtype is None (i.e. for non-tensor
streams) rather than raising an exception.
Returns:
| Type | Description |
|---|---|
bool
|
|
is_tensor_float
¶
Return True if the tensor dtype is a floating-point type.
Detects any torch.float* variant (e.g. torch.float16,
torch.float32, torch.float64) by checking the string representation
of tensor_dtype. Returns False when tensor_dtype is None
(i.e. for non-tensor streams) rather than raising an exception.
Returns:
| Type | Description |
|---|---|
bool
|
|
bool
|
|
is_tensor_img
¶
Return True if the tensor shape follows a standard image layout.
A tensor is considered image-shaped when it is 4-dimensional
(batch, channels, height, width) and the channel dimension is either
1 (grayscale) or 3 (RGB). Returns False when tensor_shape
is None (i.e. for non-tensor streams) rather than raising an exception.
Returns:
| Type | Description |
|---|---|
bool
|
|
bool
|
|
is_tensor_token_ids
¶
Return True if the tensor represents a sequence of token IDs.
A tensor is considered a token-ID sequence when its dtype is torch.long
and its shape is 2-D (batch, seq_len) where seq_len >= 1 or is
None (variable-length). Returns False when tensor_shape is
None (i.e. for non-tensor streams) rather than raising an exception.
Returns:
| Type | Description |
|---|---|
bool
|
|
bool
|
two dimensions, and |
bool
|
|
is_tensor_target_id
¶
Return True if the tensor represents a single scalar target ID.
A tensor is considered a target ID when its dtype is torch.long and its
shape is 1-D (i.e. len(tensor_shape) == 1). This is the typical layout
for a class-label target vector without an explicit batch dimension.
Returns False when tensor_shape is None (i.e. for non-tensor
streams) rather than raising an exception.
Returns:
| Type | Description |
|---|---|
bool
|
|
bool
|
exactly one dimension; |
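The dtype and shape predicates described above (is_tensor_float, is_tensor_img, is_tensor_token_ids, is_tensor_target_id) can be sketched without importing torch by working on dtype strings and shape tuples. These free functions are hypothetical stand-ins for the methods, and the exact string test used for floats is an assumption. Note that torch.long is an alias of torch.int64, so its string form is "torch.int64".

```python
from typing import Optional, Tuple

Shape = Optional[Tuple[Optional[int], ...]]
LONG = "torch.int64"  # str(torch.long) prints as "torch.int64"


def is_tensor_float(dtype: Optional[object]) -> bool:
    # Assumption: any dtype whose string form starts with "torch.float" counts.
    return dtype is not None and str(dtype).startswith("torch.float")


def is_tensor_img(shape: Shape) -> bool:
    # (batch, channels, height, width) with 1 (grayscale) or 3 (RGB) channels.
    return shape is not None and len(shape) == 4 and shape[1] in (1, 3)


def is_tensor_token_ids(dtype: Optional[object], shape: Shape) -> bool:
    # Long dtype and (batch, seq_len), where seq_len is >= 1 or None (variable).
    if shape is None or str(dtype) != LONG or len(shape) != 2:
        return False
    return shape[1] is None or shape[1] >= 1


def is_tensor_target_id(dtype: Optional[object], shape: Shape) -> bool:
    # Long dtype and exactly one dimension.
    return shape is not None and str(dtype) == LONG and len(shape) == 1
```

Each predicate returns False rather than raising when the shape or dtype is None, which mirrors the behaviour described for non-tensor streams.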
is_all
¶
Return True if this stream uses the wildcard "all" data type.
Streams of type "all" bypass data-type validation and accept any Python
object in check_and_preprocess and check_and_postprocess. This is
useful for pass-through or debugging streams where the exact data type is
unknown or variable.
Returns:
| Type | Description |
|---|---|
bool
|
|
net_hash
¶
Return the network-level routing hash for this stream.
Delegates to build_net_hash using the stream's pubsub flag and the
result of name_or_group. The hash embeds whether the channel is a
publish-subscribe topic ("::ps:") or a direct-message channel
("::dm:") so the networking layer can route messages without inspecting
the full descriptor.
The inverse operations are available via peer_id_from_net_hash and
name_or_group_from_net_hash.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| prefix | str | The local peer's ID, used as the leading component of the hash. | required |
Returns:
| Type | Description |
|---|---|
| str | A string of the form "<prefix>::ps:<name_or_group>" for Pub/Sub streams or "<prefix>::dm:<name_or_group>" for direct-message streams. |
Examples:
>>> dp = DataProps(name="sensor", group="none", data_type="text", pubsub=False)
>>> dp.net_hash("peer123")
'peer123::dm:sensor'
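The way net_hash combines the Pub/Sub flag with the name or group can be sketched with two free functions. They are hypothetical stand-ins (pick_name_or_group in particular is an invented name for the name_or_group logic), and treating "none" as "no group set" is an assumption based on the default group value.

```python
def pick_name_or_group(name: str, group: str) -> str:
    """Use the group when one is set, otherwise fall back to the stream name."""
    # Assumption: the default group "none" means that no group is set.
    return group if group != "none" else name


def build_net_hash(prefix: str, pubsub: bool, name_or_group: str) -> str:
    """Assemble '<prefix>::ps:<x>' for Pub/Sub or '<prefix>::dm:<x>' otherwise."""
    channel = "ps" if pubsub else "dm"
    return f"{prefix}::{channel}:{name_or_group}"
```

Applied to the documented example (name="sensor", group="none", pubsub=False, prefix "peer123"), the sketch yields 'peer123::dm:sensor'.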
user_hash
¶
Return the user-facing identifier hash for this stream.
Delegates to build_user_hash using the stream's name. Unlike
net_hash, the user hash always uses the stream name (never the group)
and does not encode the Pub/Sub flag. It is intended for display and
subscription lookups visible to end users.
The inverse operations are available via peer_id_from_user_hash and
name_from_user_hash.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| prefix | str | The local peer's ID, used as the leading component of the hash. | required |
Returns:
| Type | Description |
|---|---|
| str | A string of the form "<prefix>:<name>". |
Examples:
>>> dp = DataProps(name="sensor", group="none", data_type="text")
>>> dp.user_hash("peer123")
'peer123:sensor'
peer_id_from_net_hash
staticmethod
¶
Extract the peer ID from a network hash string.
Network hashes have the form "<peer_id>::ps:<name>" or
"<peer_id>::dm:<name>". This method splits on "::" and returns the
first component, which is the originating peer ID.
See also net_hash and name_or_group_from_net_hash.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| net_hash | str | A network hash previously produced by net_hash. | required |
Returns:
| Type | Description |
|---|---|
| str | The peer ID string embedded at the beginning of the hash. |
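Given the "<peer_id>::ps:<name>" and "<peer_id>::dm:<name>" layouts described above, the parsing helpers can be sketched as plain string operations. Only the split on "::" for the peer ID is stated in the text; the other two functions are assumptions about how the remaining fields could be recovered.

```python
def peer_id_from_net_hash(net_hash: str) -> str:
    # The peer ID is everything before the first "::".
    return net_hash.split("::")[0]


def is_pubsub_from_net_hash(net_hash: str) -> bool:
    # Assumption: the channel tag right after "::" is "ps" for Pub/Sub streams.
    return net_hash.split("::", 1)[1].startswith("ps:")


def name_or_group_from_net_hash(net_hash: str) -> str:
    # Assumption: the name or group follows the channel tag and a single ":".
    return net_hash.split("::", 1)[1].split(":", 1)[1]
```

For the hash 'peer123::dm:sensor' the three helpers give "peer123", False, and "sensor" respectively.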
peer_id_from_user_hash
staticmethod
¶
Extract the peer ID from a user hash string.
User hashes have the form "<peer_id>:<stream_name>". This method
splits on the last ":" (using rsplit with maxsplit=1) and
returns the first component, which is the originating peer ID. Using the
rightmost separator correctly handles peer IDs that themselves contain
colons.
See also user_hash and name_from_user_hash.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| user_hash | str | A user hash previously produced by user_hash. | required |
Returns:
| Type | Description |
|---|---|
| str | The peer ID string embedded at the beginning of the hash. |
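The "<peer_id>:<stream_name>" layout and the rsplit-based parsing described above can be sketched as follows. The free functions are stand-ins for the static methods of the same names, written for illustration rather than copied from the library.

```python
def build_user_hash(prefix: str, name: str) -> str:
    """Join the peer ID and the stream name with a single colon."""
    return f"{prefix}:{name}"


def peer_id_from_user_hash(user_hash: str) -> str:
    # Split on the LAST colon so peer IDs containing colons stay intact.
    return user_hash.rsplit(":", 1)[0]


def name_from_user_hash(user_hash: str) -> str:
    # The stream name is whatever follows the last colon.
    return user_hash.rsplit(":", 1)[1]
```

Splitting from the right is what keeps a peer ID such as "a:b:c" whole; it also implies that the stream name itself must not contain a colon for the round trip to work.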
name_or_group_from_net_hash
staticmethod
¶
Extract the name or group from a network hash string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| net_hash | str | The network hash string. | required |
Returns:
| Type | Description |
|---|---|
| str | The name or group embedded in the hash. |
name_from_user_hash
staticmethod
¶
Extract the stream name from a user hash string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| user_hash | str | The user hash string. | required |
Returns:
| Type | Description |
|---|---|
| str | The stream name embedded in the hash. |
is_pubsub_from_net_hash
staticmethod
¶
Check whether a network hash belongs to a Pub/Sub stream.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| net_hash | str | The network hash string. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the hash is for a Pub/Sub stream, False otherwise. |
name_or_group
¶
Return the group name if one is set; otherwise return the stream name.
Returns:
| Type | Description |
|---|---|
| str | The group name when set, otherwise the stream name. |
build_net_hash
staticmethod
¶
Construct a complete network hash from a prefix, the Pub/Sub status, and a name or group.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| prefix | str | The peer ID prefix. | required |
| pubsub | bool | The Pub/Sub status. | required |
| name_or_group | str | The name or group of the stream. | required |
Returns:
| Type | Description |
|---|---|
| str | The constructed network hash string. |
build_user_hash
staticmethod
¶
Construct a complete user hash from a prefix and a stream name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| prefix | str | The peer ID prefix. | required |
| name | str | The name of the stream. | required |
Returns:
| Type | Description |
|---|---|
| str | The constructed user hash string. |
user_hash_from_net_hash
staticmethod
¶
Build the user hash of a stream from a network hash and the stream name.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| net_hash | str | The network hash. | required |
| name | str | Stream name. | required |
Returns:
| Type | Description |
|---|---|
| str | The user hash string. |
normalize_net_hash
staticmethod
¶
Normalize a network hash string to a canonical format, particularly for direct messages.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| not_normalized_net_hash | str | The network hash to normalize. | required |
Returns:
| Type | Description |
|---|---|
| str | The normalized network hash string. |
is_pubsub
¶
Checks if the stream is set to use Pub/Sub.
Returns:
| Type | Description |
|---|---|
| bool | True if it's a Pub/Sub stream, False otherwise. |
is_public
¶
Checks if the stream is set to be public.
Returns:
| Type | Description |
|---|---|
| bool | True if it's a public stream, False otherwise. |
set_tensor_labels_from_auto_tokenizer
¶
Initializes and sets the tensor labels by fetching the vocabulary from a Hugging Face AutoTokenizer
model ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| model_id | str | The ID of the tokenizer model. | required |
set_tensor_labels
¶
Sets the labels for the data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| labels | list[str] or None | List of labels to associate with the data. | required |
| labeling_rule | str | The labeling rule for the labels. | 'max' |
Returns:
| Type | Description |
|---|---|
| | None |
adapt_tensor_to_tensor_labels
¶
Interleaves the data according to its corresponding labels and the current superset of labels.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | Tensor | The data tensor to interleave. | required |
Returns:
| Type | Description |
|---|---|
| Tensor | The interleaved data tensor. |
clear_label_adaptation
¶
Removes the padding and returns the original data from an adapted tensor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | Tensor | The adapted tensor. | required |
Returns:
| Type | Description |
|---|---|
| | The original, un-padded tensor. |
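The relationship between adapting a tensor to a super-set of labels and clearing that adaptation can be pictured as a scatter followed by a gather. The sketch below is only an illustration under stated assumptions: it uses plain Python lists instead of `torch.Tensor`, handles a single one-dimensional sample, pads with zeros, and the names `adapt` and `clear_adaptation` are hypothetical rather than part of this module.

```python
def adapt(values: list[float], indices: list[int], superset_size: int) -> list[float]:
    """Scatter values into a zero-filled vector as long as the super-set."""
    adapted = [0.0] * superset_size
    for value, index in zip(values, indices):
        adapted[index] = value
    return adapted


def clear_adaptation(adapted: list[float], indices: list[int]) -> list[float]:
    """Gather the original entries back, dropping the padding."""
    return [adapted[index] for index in indices]


# Stream labels ["cat", "dog"] placed inside the super-set
# ["bird", "cat", "dog", "fish"], i.e. at positions 1 and 2.
indices = [1, 2]
original = [0.9, 0.1]
padded = adapt(original, indices, superset_size=4)
print(padded)                             # [0.0, 0.9, 0.1, 0.0]
print(clear_adaptation(padded, indices))  # [0.9, 0.1]
```

Applying the gather to the output of the scatter returns the original values, which is the round trip the two methods describe.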
Source code in unaiverse/streams/dataprops.py
is_flat_tensor_with_labels
¶
Checks if the tensor is a 2D array and has labels, which is a common structure for general feature data.
Returns:
| Type | Description |
|---|---|
| bool | True if it is, False otherwise. |
Source code in unaiverse/streams/dataprops.py
has_tensor_labels
¶
Checks if any tensor labels are associated with the stream.
Returns:
| Type | Description |
|---|---|
| bool | True if labels exist, False otherwise. |
Source code in unaiverse/streams/dataprops.py
to_text
¶
Converts the tensor data into a text-based representation exploiting the given labels and the labeling rule.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | Tensor or Image or str | The data to convert into text (a string is passed through unchanged). | required |
| ignore_raw_tensors | bool | If True, only tensors with labels are considered. | False |
Returns:
| Type | Description |
|---|---|
| str or None | The corresponding text representation of the data. |
Raises:
| Type | Description |
|---|---|
| ValueError | If the data type is not supported for conversion. |
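To make the role of the labeling rule concrete, here is a minimal sketch of how per-label scores might be turned into text. It is not the module's implementation: the function name `scores_to_text`, the use of plain lists instead of tensors, the reading of 'max' as "label with the highest score" and of 'geq' as "every label whose score reaches the threshold", and the comma-separated output format are all assumptions.

```python
def scores_to_text(
    scores: list[float],
    labels: list[str],
    rule: str = "max",
    threshold: float = 0.5,
) -> str:
    """Convert one score per label into text under the assumed rule semantics."""
    if rule == "max":
        # Pick the label with the highest score (first one wins on ties).
        best = max(range(len(scores)), key=lambda i: scores[i])
        return labels[best]
    if rule == "geq":
        # Keep every label whose score is greater than or equal to the threshold.
        picked = [label for label, score in zip(labels, scores) if score >= threshold]
        return ", ".join(picked)
    raise ValueError(f"Unsupported labeling rule: {rule}")


labels = ["cat", "dog", "bird"]
scores = [0.2, 0.7, 0.6]
print(scores_to_text(scores, labels, rule="max"))                 # dog
print(scores_to_text(scores, labels, rule="geq", threshold=0.5))  # dog, bird
```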
Source code in unaiverse/streams/dataprops.py
check_and_preprocess
¶
check_and_preprocess(data: str | Image | Tensor | None, allow_class_ids: bool = False, targets: bool = False, device: device = device('cpu'))
Prepares incoming data for a processor by validating its type and applying necessary transformations.
It handles different data types, including tensors, text (strings), and images, raising ValueError if
the data type is unexpected or incompatible with the stream's properties. For text and images, it can apply a
pre-configured transformation (like a tokenizer or a standard image transform) to convert the data into a
tensor format suitable for processing. For tensors, it performs validation on shape and data type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | str \| Image \| Tensor \| None | The data sample to check and preprocess. | required |
| allow_class_ids | bool | A boolean to allow single-element long tensors, typically for class IDs. | False |
| targets | bool | A boolean to indicate if the data is a target (used to select the correct transformation in a dual-transform setup). | False |
| device | device | The PyTorch device (e.g., 'cpu' or 'cuda') to which the tensor should be moved. | device('cpu') |
Returns:
| Type | Description |
|---|---|
| | The preprocessed data, typically a tensor on the specified device. |
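The validate-then-transform flow described above can be sketched in a few lines. This is a simplified illustration, not the method itself: `check_and_preprocess_sketch`, `expected_type`, and `toy_tokenizer` are hypothetical names, tensors and images are left out so the example runs without PyTorch or PIL, and letting `None` pass through unchanged is an assumption.

```python
from typing import Any, Callable, Optional


def check_and_preprocess_sketch(
    data: Any,
    expected_type: type,
    transform: Optional[Callable[[Any], Any]] = None,
) -> Any:
    """Validate the sample type, then apply the optional transform."""
    if data is None:
        return None  # assumption: missing samples pass through unchanged
    if not isinstance(data, expected_type):
        raise ValueError(
            f"Expected {expected_type.__name__}, got {type(data).__name__}"
        )
    return transform(data) if transform is not None else data


# Toy whitespace tokenizer standing in for a pre-configured text transform.
toy_vocab = {"hello": 0, "world": 1}


def toy_tokenizer(text: str) -> list[int]:
    return [toy_vocab[word] for word in text.split()]


print(check_and_preprocess_sketch("hello world", str, toy_tokenizer))  # [0, 1]
```

Passing a value of the wrong type, for example an integer where a string is expected, raises `ValueError` in this sketch, mirroring the behavior described for unexpected data types.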
Source code in unaiverse/streams/dataprops.py
check_and_postprocess
¶
Takes a processor's output and validates it before converting it back into a stream-compatible format.
It handles torch.Tensor data, applying a proc_to_stream_transform (if one exists) to convert the tensor
into an appropriate format for the stream, such as a string for text or a PIL Image for images. It performs
a final check on the data's format (shape, dtype, etc.) to ensure consistency with the stream's properties.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | str \| Image \| Tensor \| None | The output from the processor, typically a tensor. | required |
Returns:
| Type | Description |
|---|---|
| | The post-processed data, in a stream-compatible format (e.g., a string, image, or CPU tensor). |
Source code in unaiverse/streams/dataprops.py
is_compatible
¶
is_compatible(props_to_compare: DataProps) -> bool
Checks if the current DataProps instance is compatible with another DataProps instance. Checks include data type, shape, and labels.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| props_to_compare | DataProps | The DataProps instance to check compatibility with. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if compatible, False otherwise. |
Source code in unaiverse/streams/dataprops.py
TensorLabels
¶
TensorLabels(data_props: DataProps, labels: list[str] | None, labeling_rule: str = 'max')
A class to manage labels associated with data and perform operations on them.
Attributes:
| Name | Type | Description |
|---|---|---|
| VALID_LABELING_RULES | tuple | Tuple of valid labeling rules ('max', 'geq'). |
Initializes the TensorLabels instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data_props | DataProps | The DataProps instance that owns these labels. | required |
| labels | list[str] or None | List of labels. | required |
| labeling_rule | str | The rule for labeling (either 'max' or 'geqX', where X is a number). | 'max' |
Returns:
| Type | Description |
|---|---|
| | None |
Raises:
| Type | Description |
|---|---|
| AssertionError | If the labels or labeling_rule are invalid. |
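The 'max' / 'geqX' convention suggests that the rule string is split into a rule name and an optional numeric threshold. The following sketch shows one way such parsing could look. The function `parse_labeling_rule` is hypothetical and the exact validation performed by `TensorLabels` is not documented here, so this is an assumption about the format rather than a description of the class.

```python
from typing import Optional

VALID_LABELING_RULES = ("max", "geq")


def parse_labeling_rule(labeling_rule: str) -> tuple[str, Optional[float]]:
    """Split 'max' or 'geqX' into (rule_name, threshold)."""
    if labeling_rule == "max":
        return "max", None
    assert labeling_rule.startswith("geq"), f"Invalid labeling rule: {labeling_rule}"
    suffix = labeling_rule[len("geq"):]
    try:
        threshold = float(suffix)
    except ValueError:
        # Mirror the documented AssertionError for invalid rules.
        raise AssertionError(f"Invalid threshold in labeling rule: {labeling_rule}") from None
    return "geq", threshold


print(parse_labeling_rule("max"))     # ('max', None)
print(parse_labeling_rule("geq0.5"))  # ('geq', 0.5)
```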
Source code in unaiverse/streams/dataprops.py
to_dict
¶
Serializes the TensorLabels instance into a dictionary, which includes the list of labels and the original
labeling rule.
Returns:
| Type | Description |
|---|---|
| dict | A dictionary containing the labels and the original labeling rule. |
Source code in unaiverse/streams/dataprops.py
clear_indices
¶
Resets the internal indices attribute to None. This effectively clears any previous label adaptation
that was performed and allows the object to revert to its original, non-interleaved state.
Source code in unaiverse/streams/dataprops.py
interleave_with
¶
Interleaves the current labels with a super-set of labels, determining how to index them.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| superset_labels | list[str] | The super-set of labels to interleave with. | required |
Raises:
| Type | Description |
|---|---|
| AssertionError | If the super-set of labels is not compatible. |
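Determining "how to index" the current labels within a super-set can be pictured as a position lookup. The sketch below is an illustration under assumptions: `interleave_indices` is a hypothetical helper, and "not compatible" is taken to mean that at least one current label is missing from the super-set, which this page does not confirm.

```python
def interleave_indices(labels: list[str], superset_labels: list[str]) -> list[int]:
    """Return, for each label, its position in superset_labels."""
    position = {label: i for i, label in enumerate(superset_labels)}
    missing = [label for label in labels if label not in position]
    # Assumed compatibility check: every label must appear in the super-set.
    assert not missing, f"Labels not found in the super-set: {missing}"
    return [position[label] for label in labels]


print(interleave_indices(["dog", "cat"], ["bird", "cat", "dog"]))  # [2, 1]
```

Indices of this kind are what a later adaptation step would need in order to place each value at the right position of a super-set-sized tensor.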