unaiverse.modules.utils
What this module does
Shared infrastructure for the modules package: the ModuleWrapper base class that all networks extend, processor/module helpers (HumanModule, LoggerModule, MultiIdentity), agent processor validation, image/transform factories, training utilities, and an API gateway server/scheduler with Featherless integration.
A Collectionless AI Project (https://collectionless.ai)
Registration/Login: https://unaiverse.io
Code Repositories: https://github.com/collectionlessai/
Main Developers: Stefano Melacci (Project Leader), Christian Di Maio, Tommaso Guidi
OPENAI_NATIVE_SAMPLER_PARAMS
module-attribute
OPENAI_NATIVE_SAMPLER_PARAMS: frozenset[str] = frozenset({'max_tokens', 'temperature', 'top_p', 'frequency_penalty', 'presence_penalty', 'stop', 'seed', 'n', 'logit_bias', 'logprobs', 'top_logprobs', 'response_format'})
MultiIdentity
Bases: Module
Identity module that passes one or more inputs through unchanged.
Used by the framework as the default processor when no explicit module is supplied. It accepts any number of positional arguments and returns either the single input unchanged (when called with exactly one argument) or the full argument tuple (when called with more than one argument).
Examples:
>>> m = MultiIdentity()
>>> import torch
>>> t = torch.tensor([1.0, 2.0])
>>> m(t) is t # single input: returned as-is
True
>>> out = m(t, t) # multiple inputs: returned as a tuple
>>> len(out)
2
Initialize the MultiIdentity module with no learnable parameters.
Source code in unaiverse/modules/utils.py
forward
Return the single input unchanged, or all positional inputs as a tuple.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `*args` | `object` | One or more input tensors or arbitrary objects to pass through. | `()` |
| `**kwargs` | `object` | Accepted but ignored; present for interface compatibility. | `{}` |
Returns:
| Type | Description |
|---|---|
| | The single input when exactly one positional argument is provided, otherwise the full tuple of positional arguments. |
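The pass-through rule can be sketched in plain Python. This is a minimal illustration, not the library's code: `multi_identity` is a hypothetical free function standing in for `MultiIdentity.forward`, and the behavior with zero arguments is not covered by the description above.

```python
def multi_identity(*args, **kwargs):
    """Hypothetical stand-in: one input is returned as-is, several as a tuple."""
    # Keyword arguments are accepted only for interface compatibility.
    if len(args) == 1:
        return args[0]
    return args


value = [1.0, 2.0]
single = multi_identity(value)                # the very same object comes back
pair = multi_identity(value, "b", flag=True)  # a tuple of both positional inputs
```

Returning the identical object (rather than a copy) is what makes `m(t) is t` hold in the doctest above.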
Source code in unaiverse/modules/utils.py
HumanModule
Bases: Module
Placeholder module representing a human-in-the-loop processor.
When an agent is configured with proc="human", the framework wraps this module
in a ModuleWrapper and uses it as the processor. It has no learnable parameters
and simply echoes its text and image inputs back unchanged, acting as a neutral
stand-in until the real human interaction layer overrides the output at a higher level.
See has_human_processor to check whether an agent uses this module.
Initialize the HumanModule with no learnable parameters.
Source code in unaiverse/modules/utils.py
forward
forward(text: str | None = None, img: Image | None = None, whatever: object | None = None) -> tuple[str | None, Image | None]
Return the text and image inputs unchanged.
Acts as a transparent pass-through. The whatever argument is accepted for
interface compatibility but is not forwarded to the output.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `text` | `str \| None` | Optional text string to pass through. Defaults to `None`. | `None` |
| `img` | `Image \| None` | Optional PIL image to pass through. Defaults to `None`. | `None` |
| `whatever` | `object \| None` | Optional additional input; accepted but ignored. Defaults to `None`. | `None` |
Returns:
| Type | Description |
|---|---|
| `tuple[str \| None, Image \| None]` | A two-element tuple `(text, img)` containing the inputs unchanged. |
Source code in unaiverse/modules/utils.py
LoggerModule
Bases: Module
A processor module that logs inputs and outputs to a file and returns cycling dummy responses.
LoggerModule is intended for debugging and development: it records every
forward call (the received text, the image size if present, and the generated
response) to an optional log file, while producing a deterministic but varied stream
of dummy text outputs by cycling through an internally shuffled vocabulary of words.
No image is ever returned.
The file handler is created lazily on the first forward call, so the log file is
only opened when the module is actually invoked.
Examples:
>>> import tempfile, os
>>> with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as f:
... log_path = f.name
>>> m = LoggerModule(log_file=log_path)
>>> response_text, response_img = m("hello")
>>> response_img is None
True
>>> os.unlink(log_path)
Initialize the LoggerModule.
Sets up the internal cycling vocabulary (randomly shuffled) and, if a log file
path is given, prepares the logging infrastructure (the file handler is opened
lazily on the first forward call).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `log_file` | `str \| None` | Path to the log file that records inputs and outputs. The parent directory is created automatically if it does not exist. If `None`, nothing is written to a file. | `None` |
Source code in unaiverse/modules/utils.py
forward
Log the received inputs and return a cycling dummy text response.
On each call the received text and the size of img (or None if no
image was provided) are written to the log file (if configured). The method then
returns the next word from the internal cycling vocabulary as the response text,
advancing the internal index modulo the vocabulary size. The image output is
always None.
If the log file path was provided at construction time but the file handler has not yet been opened, it is initialized during this call (the parent directory is created if necessary).
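The cycling behavior described above (shuffle once, then advance an index modulo the vocabulary size) can be sketched as follows. `CyclingResponder` and its word list are hypothetical stand-ins; the real vocabulary, the log format, and the file handling of `LoggerModule` are not reproduced here.

```python
import random


class CyclingResponder:
    """Hypothetical stand-in for the word-cycling part of LoggerModule."""

    def __init__(self, words, seed=None):
        self.words = list(words)
        # Shuffled once at construction; the order is fixed afterwards.
        random.Random(seed).shuffle(self.words)
        self.index = 0

    def next_word(self):
        word = self.words[self.index]
        # Advance modulo the vocabulary size so the stream wraps around.
        self.index = (self.index + 1) % len(self.words)
        return word


responder = CyclingResponder(["alpha", "beta", "gamma"], seed=0)
first_pass = [responder.next_word() for _ in range(3)]
second_pass = [responder.next_word() for _ in range(3)]
```

After one full pass the same sequence repeats, which is why the output is varied yet deterministic once the shuffle has happened.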
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `text` | `str` | The text input to log and acknowledge. | required |
| `img` | `Image \| None` | An optional PIL image whose size is logged when present. | `None` |
Returns:
| Type | Description |
|---|---|
| `tuple[str \| None, Image \| None]` | A two-element tuple: the response text taken from the internal vocabulary cycle, and `None` for the image. |
Source code in unaiverse/modules/utils.py
ModuleWrapper
ModuleWrapper(module: Module | Callable, proc_inputs: list[StreamType] | None = None, proc_outputs: list[StreamType] | None = None, proc_opts: dict | None = None, seed: int = -1, agent: object = None, device=None)
Bases: Module
Wrap a callable or torch.nn.Module with stream-based pre/post-processing and optional learning.
ModuleWrapper is the core adapter between the UNaIVERSE streaming pipeline and
a bare PyTorch module or any callable. It holds the StreamType descriptors for
each input and output slot, applies check_and_preprocess on incoming data and
check_and_postprocess on outgoing data in forward, and can run a full
supervised or unsupervised learning step (optimizer + backward pass) via
learn_backward.
The framework-controlled keyword arguments first and last are intercepted
inside forward and stripped before the wrapped module is called, unless the
module's own signature declares them.
The device is resolved in the following priority order: the device argument
(if given), then the PROC_DEVICE environment variable, and finally "cpu".
Attributes:
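| Name | Type | Description |
|---|---|---|
| `device` | | The compute device resolved by `guess_device`. |
| `module` | `Callable \| None` | The wrapped callable or `torch.nn.Module`. |
| `proc_inputs` | | List of `StreamType` descriptors, one per input slot. |
| `proc_outputs` | | List of `StreamType` descriptors, one per output slot. |
| `proc_opts` | | Dictionary containing at least the keys `"optimizer"` and `"losses"`. |
| `proc_optional_inputs` | | Per-input default info derived from the module's signature. |
| `agent` | | The agent that owns this wrapper. |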
Examples:
>>> import torch
>>> linear = torch.nn.Linear(4, 2)
>>> from unaiverse.streams.dataprops import StreamType
>>> proc_in = [StreamType(data_type="tensor",
... tensor_shape=(None, 4),
... tensor_dtype=torch.float32,
... pubsub=False, private_only=False)]
>>> proc_out = [StreamType(data_type="tensor",
... tensor_shape=(None, 2),
... tensor_dtype=torch.float32,
... pubsub=False, private_only=False)]
>>> wrapper = ModuleWrapper(module=linear, proc_inputs=proc_in, proc_outputs=proc_out)
>>> x = torch.randn(1, 4)
>>> out = wrapper(x)
>>> out[0].shape
torch.Size([1, 2])
Initialize the ModuleWrapper.
Resolves the target device, wraps the given module (moving it to the device if
it is a torch.nn.Module), inspects the module's forward signature to
determine which framework kwargs (first, last) it accepts and to cache
per-argument default information, then sets the stream descriptors and
optimization options.
Parameters:
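| Name | Type | Description | Default |
|---|---|---|---|
| `module` | `Module \| Callable` | The `torch.nn.Module` or callable to wrap. | required |
| `proc_inputs` | `list[StreamType] \| None` | List of `StreamType` descriptors, one per input slot. | `None` |
| `proc_outputs` | `list[StreamType] \| None` | List of `StreamType` descriptors, one per output slot. | `None` |
| `proc_opts` | `dict \| None` | Dictionary with at least the keys `"optimizer"` and `"losses"`. | `None` |
| `seed` | `int` | Random seed applied via `set_seed`; a negative value skips seeding. | `-1` |
| `agent` | `object` | The agent that owns this processor; stored as `agent`. | `None` |
| `device` | | The device to use; when `None`, it is resolved from `PROC_DEVICE` and then `"cpu"`. | `None` |
Raises:
| Type | Description |
|---|---|
| `GenException` | If the module does not expose a callable `forward` and is not itself callable (see `set_module`). |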
Source code in unaiverse/modules/utils.py
guess_device
Resolve and set the compute device for the wrapped module.
Determines self.device using the following priority order: the device
argument (if not None), then the PROC_DEVICE environment variable, and
finally "cpu" as the default fallback.
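The priority order above can be illustrated with a small standalone helper. This is a sketch under stated assumptions: `resolve_device` is a hypothetical function (the real method stores the result on `self.device`), and the `environ` parameter exists only so the example does not touch the real process environment.

```python
import os


def resolve_device(device=None, environ=None):
    """Hypothetical helper: explicit argument, then PROC_DEVICE, then "cpu"."""
    env = os.environ if environ is None else environ
    if device is not None:
        return device
    return env.get("PROC_DEVICE", "cpu")


explicit = resolve_device("cuda:1", environ={"PROC_DEVICE": "cuda:0"})
from_env = resolve_device(environ={"PROC_DEVICE": "cuda:0"})
fallback = resolve_device(environ={})
```

An explicit argument always wins, so a caller can override a machine-wide `PROC_DEVICE` setting for a single wrapper.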
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `device` | | A device specification, or `None` to fall back to `PROC_DEVICE` and then `"cpu"`. | required |
Source code in unaiverse/modules/utils.py
set_agent
Set the agent that owns this processor wrapper.
Stores a reference to the owning agent so that downstream components (such as
learn_backward and AgentProcessorChecker) can reach back to the agent
context when needed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `agent` | | The agent object that owns this processor wrapper. | required |
Source code in unaiverse/modules/utils.py
set_proc_inputs_and_outputs
Update the input and/or output stream descriptors.
When proc_inputs is not None, replaces self.proc_inputs with the
provided list and rebuilds proc_optional_inputs so it stays aligned with the
new input count (see __refresh_proc_optional_inputs). When proc_outputs
is not None, replaces self.proc_outputs. Either argument may be None
to leave the corresponding attribute unchanged.
Parameters:
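| Name | Type | Description | Default |
|---|---|---|---|
| `proc_inputs` | | A list of `StreamType` descriptors for the inputs, or `None` to leave them unchanged. | required |
| `proc_outputs` | | A list of `StreamType` descriptors for the outputs, or `None` to leave them unchanged. | required |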
Source code in unaiverse/modules/utils.py
set_proc_opts
Update the optimization options dictionary.
Replaces self.proc_opts with the provided dictionary. The call is a no-op
when proc_opts is None, preserving any previously assigned options.
The dictionary is expected to contain at least the keys "optimizer" (a
torch.optim.Optimizer or None) and "losses" (a list of loss
callables, one per output slot). See learn_backward for how these entries
are consumed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `proc_opts` | | A dictionary of optimization options, or `None` to keep the current options. | required |
Source code in unaiverse/modules/utils.py
set_module
Set the wrapped module and cache its forward-signature metadata.
Inspects the callable's signature to determine whether the framework-controlled
keyword arguments first and last are declared (stored in _has_first
and _has_last respectively). The per-parameter default information is cached
in _sig_defaults so that proc_optional_inputs can be rebuilt efficiently
whenever proc_inputs changes.
If mod is a torch.nn.Module it is moved to self.device via .to().
If mod has a forward method it is inspected via that method's signature;
otherwise mod itself must be directly callable.
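The signature inspection described above can be approximated with the standard `inspect` module. This is a rough sketch, not the library's implementation: `inspect_callable` and `filter_framework_kwargs` are hypothetical helpers, `TypeError` stands in for `GenException`, and the returned values only mirror the roles of `_has_first`, `_has_last`, and `_sig_defaults`.

```python
import inspect


def inspect_callable(mod):
    """Return (has_first, has_last, defaults) for mod.forward or mod itself."""
    target = getattr(mod, "forward", mod)
    if not callable(target):
        # Stand-in for the GenException raised by the real method.
        raise TypeError("module must expose a callable forward or be callable")
    params = inspect.signature(target).parameters
    defaults = {
        name: p.default
        for name, p in params.items()
        if p.default is not inspect.Parameter.empty
    }
    return "first" in params, "last" in params, defaults


def filter_framework_kwargs(kwargs, has_first, has_last):
    """Drop 'first'/'last' unless the wrapped callable declares them."""
    allowed = {"first": has_first, "last": has_last}
    return {k: v for k, v in kwargs.items() if allowed.get(k, True)}


def step(x, scale=2.0, first=False):
    return x * scale


has_first, has_last, defaults = inspect_callable(step)
kwargs = filter_framework_kwargs({"first": True, "last": False}, has_first, has_last)
result = step(3, **kwargs)
```

Caching the inspection result once, as the method does, avoids calling `inspect.signature` on every forward pass.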
Parameters:
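| Name | Type | Description | Default |
|---|---|---|---|
| `mod` | `Callable \| Module` | A `torch.nn.Module` or callable to wrap. | required |
Raises:
| Type | Description |
|---|---|
| `GenException` | If `mod` has no `forward` method and is not directly callable. |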
Source code in unaiverse/modules/utils.py
forward
Preprocess inputs, run the wrapped module, and post-process the outputs.
Each positional argument is preprocessed by the corresponding StreamType
entry in proc_inputs (via check_and_preprocess), then the wrapped module
is called with the preprocessed data. The raw outputs are stored in
_last_raw_outputs (used later by learn_backward) and then passed through
the corresponding proc_outputs post-processing step
(check_and_postprocess) before being returned.
The framework-controlled keyword arguments "first" and "last" are
intercepted and stripped unless the wrapped module's own signature declares them
(see _has_first and _has_last).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `*args` | | Positional inputs, one per entry in `proc_inputs`. | `()` |
| `**kwargs` | | Additional keyword arguments forwarded to the wrapped module. The keys `first` and `last` are stripped unless the module's signature declares them. | `{}` |
Returns:
| Type | Description |
|---|---|
| `tuple` | A tuple of post-processed output values, one per entry in `proc_outputs`. |
Raises:
| Type | Description |
|---|---|
AssertionError
|
If |
Examples:
>>> import torch
>>> from unaiverse.streams.dataprops import StreamType
>>> linear = torch.nn.Linear(4, 2)
>>> proc_in = [StreamType(data_type="tensor", tensor_shape=(None, 4),
... tensor_dtype=torch.float32,
... pubsub=False, private_only=False)]
>>> proc_out = [StreamType(data_type="tensor", tensor_shape=(None, 2),
... tensor_dtype=torch.float32,
... pubsub=False, private_only=False)]
>>> wrapper = ModuleWrapper(module=linear, proc_inputs=proc_in, proc_outputs=proc_out)
>>> x = torch.randn(1, 4)
>>> out = wrapper(x)
>>> out[0].shape
torch.Size([1, 2])
Source code in unaiverse/modules/utils.py
learn_backward
Run a supervised or unsupervised backward pass and apply an optimizer step.
Uses the raw outputs stored by the most recent forward call
(_last_raw_outputs) together with the provided targets to compute the
per-slot losses defined in proc_opts["losses"], sums them into a scalar,
calls loss.backward(), and steps the optimizer from
proc_opts["optimizer"].
When targets is None or all entries are None, the method runs each
loss function with only the corresponding raw output as argument (unsupervised
mode). When at least one target is provided, each slot whose target is not
None is treated as supervised (the target is preprocessed via
proc_outputs[i].check_and_preprocess); slots with a None target
contribute a zero loss to the sum.
If the wrapped module has a y attribute and targets is not None,
the first target tensor is written to module.y after the optimizer step
(autoregressive-teacher-forcing support).
The method is a no-op and returns [] when proc_opts is absent or does
not contain both "losses" and "optimizer" keys, or when proc_outputs
is None.
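The supervised/unsupervised split described above can be sketched with plain floats in place of tensors. This is only an illustration of the aggregation rule: `aggregate_losses` and `squared_error` are hypothetical, and the target preprocessing, `loss.backward()`, the optimizer step, and the no-op cases of the real method are deliberately left out.

```python
def squared_error(output, target=0.0):
    """Toy loss that works with or without a target."""
    return (output - target) ** 2


def aggregate_losses(raw_outputs, losses, targets=None):
    """Return (per-slot losses, summed loss) following the rules above."""
    if targets is None or all(t is None for t in targets):
        # Unsupervised mode: each loss sees only its raw output.
        per_slot = [loss(out) for loss, out in zip(losses, raw_outputs)]
    else:
        # Supervised mode: slots with a None target contribute zero.
        per_slot = [
            loss(out, tgt) if tgt is not None else 0.0
            for loss, out, tgt in zip(losses, raw_outputs, targets)
        ]
    return per_slot, sum(per_slot)


raw_outputs = [1.0, 3.0]
losses = [squared_error, squared_error]
per_sup, total_sup = aggregate_losses(raw_outputs, losses, targets=[2.0, None])
per_unsup, total_unsup = aggregate_losses(raw_outputs, losses, targets=None)
```

In the real method the summed value is a scalar tensor on which the backward pass is run.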
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `targets` | `list \| None` | A list of target tensors aligned with `proc_outputs`; individual entries may be `None`. | `None` |
Returns:
| Type | Description |
|---|---|
| `list` | A list of per-output loss values if a backward pass was performed, or an empty list when the method is a no-op (no optimizer configured). |
Source code in unaiverse/modules/utils.py
AgentProcessorChecker
Validate and normalize the processor-related attributes of an agent-like container.
AgentProcessorChecker is a one-shot validation object: constructing it
immediately validates the processor configuration of the supplied agent, performs
all necessary type conversions and heuristic inference, and writes the normalized
values back onto the agent. No state is kept after construction.
The checker enforces the following invariants after it completes:
- agent.proc is a fully configured ModuleWrapper (never None, never a raw torch.nn.Module).
- agent.proc_inputs and agent.proc_outputs are non-empty lists of StreamType objects with canonical names ("proc_input_N" / "proc_output_N").
- agent.proc_opts contains both "optimizer" and "losses" keys.
- agent.proc_optional_inputs is aligned with agent.proc_inputs.
The special string value "human" for proc is recognized and converted into
a ModuleWrapper around a HumanModule with text and image stream types.
When proc is None, a ModuleWrapper around MultiIdentity is created.
See ModuleWrapper, HumanModule, and MultiIdentity for details on the
wrapped modules.
Validate and normalize the processor attributes of the given agent.
Checks that agent exposes the required processor attributes, performs type
validation, converts string StreamType shorthands to proper StreamType
objects, auto-wraps plain torch.nn.Module or callable processors in
ModuleWrapper, and heuristically infers missing proc_inputs,
proc_outputs, and proc_opts. The validated and completed values are
written back onto agent so callers always receive a fully configured
processor on agent.proc.
The heuristic inference pipeline runs in this order:
1. __guess_proc_inputs: infers the input StreamType from the first layer of the wrapped torch.nn.Module (Conv2d, Linear, Conv1d, or Embedding).
2. __fix_proc_inputs: assigns canonical names to inputs.
3. __guess_proc_outputs: runs a dummy forward pass to infer output shapes.
4. __fix_proc_outputs: assigns canonical names to outputs.
5. __guess_proc_opts: fills in missing "optimizer" and "losses" entries.
6. __fix_proc_opts: normalizes the options dict and replaces cross_entropy with target_shape_fixed_cross_entropy.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `agent` | `object` | Any object that exposes the processor attributes validated by this checker. | required |
Raises:
| Type | Description |
|---|---|
AssertionError
|
If |
GenException
|
If |
Source code in unaiverse/modules/utils.py
APIGatewayServer
APIGatewayServer(call: Callable = featherless_call)
A shared LLM API gateway that owns the Featherless connection and runs the scheduler.
APIGatewayServer is a single-process asyncio TCP server that acts as a
multiplexing proxy between multiple agent processes and the Featherless AI
chat-completions API. It enforces a shared concurrency budget across all clients
using the APIGatewayScheduler (strict round-robin, unit-based admission).
The server speaks a newline-delimited JSON protocol over TCP (HOST:PORT,
localhost by default). The message contract is:
- Client registration (one persistent connection per client process, kept alive for the process lifetime)::
      client -> server: {"op": "hello"}
- Generation request (sent on a separate short-lived connection)::
      client -> server: {"op": "generate", "process_id": "...", "sys_prompt": "...", "prompt": "...", "cost": ..., "model": "...", "sampler": {...}}
      server -> client: {"ok": true, "result": "..."} | {"ok": false, "error": "..."}
The open registration socket is the client's liveness token. When a client process dies for any reason the OS closes the socket, the server detects the drop, and when the last registered client disconnects the server shuts itself down cleanly.
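A client for the generation half of this protocol might look like the sketch below. It assumes only what the message contract above states: the helper names (`encode_message`, `build_generate_request`, `generate_once`) are hypothetical, the persistent "hello" registration connection is not shown, and the default host and port are the documented defaults.

```python
import asyncio
import json


def encode_message(message: dict) -> bytes:
    """Serialize one protocol message as a newline-terminated JSON line."""
    return (json.dumps(message) + "\n").encode("utf-8")


def build_generate_request(process_id, prompt, cost, sys_prompt="", model=None, sampler=None):
    """Assemble a generate request with the fields listed in the contract."""
    return {"op": "generate", "process_id": process_id, "sys_prompt": sys_prompt,
            "prompt": prompt, "cost": cost, "model": model, "sampler": sampler}


async def generate_once(request, host="127.0.0.1", port=11321):
    """Send one request on a short-lived connection and return the result text."""
    reader, writer = await asyncio.open_connection(host, port)
    try:
        writer.write(encode_message(request))
        await writer.drain()
        line = await reader.readline()
    finally:
        writer.close()
        await writer.wait_closed()
    if not line:
        raise ConnectionError("gateway closed the connection without replying")
    reply = json.loads(line)
    if not reply.get("ok"):
        raise RuntimeError(reply.get("error", "unknown gateway error"))
    return reply["result"]


request = build_generate_request("proc-1", "Hello", cost=2)
wire = encode_message(request)
# asyncio.run(generate_once(request))  # requires a running gateway server
```

Keeping generation on its own short-lived connection means the registration socket carries no traffic and serves purely as the liveness token.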
Startup arbitration is handled via the TCP bind: if the port is already taken,
asyncio.start_server raises and the caller knows another server is already
running. A configurable grace period (GW_GRACE env var, default 10 s) causes
the server to exit if no client registers shortly after boot.
Class attributes (all configurable via environment variables):
HOST: Listening address. Defaults to GW_HOST or "127.0.0.1".
PORT: Listening port. Defaults to GW_PORT or 11321.
TOTAL_UNITS: Total concurrency budget shared across all clients. Defaults to
GW_UNITS or 8.
VALID_COSTS: Tuple of accepted per-query unit costs. Defaults to
GW_VALID_COSTS or (1, 2, 4).
LOCKFILE: Path to an advisory lock file. Defaults to GW_LOCKFILE or a
temp-directory path llm_api_gateway.lock.
Examples:
Starting the gateway server programmatically (normally done via
serve_api_gateway)::
>>> import asyncio
>>> from unaiverse.modules.utils import APIGatewayServer, featherless_call
>>> async def main():
... server = APIGatewayServer(call=featherless_call)
... await server.run()
>>> # asyncio.run(main()) # Blocks until all clients disconnect.
Initialize the gateway server with a scheduler and an empty client registry.
Creates the APIGatewayScheduler, stores the model-call callable, initializes
the set of live registration writers (live), and creates the asyncio shutdown
event that run waits on.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `call` | `Callable` | An asynchronous callable that performs the actual model call. | `featherless_call` |
Source code in unaiverse/modules/utils.py
VALID_COSTS
class-attribute
instance-attribute
LOCKFILE
class-attribute
instance-attribute
handle
async
Serve a single client TCP connection until the peer closes it (async).
Reads newline-delimited JSON messages in a loop. Two "op" values are
recognized:
- "hello": marks this connection as the client's liveness registration. The writer is added to self.live. When the connection drops, the writer is removed from self.live and, if the set is now empty, self.shutdown is set to trigger server teardown.
- "generate": extracts process_id, sys_prompt, prompt, cost, model, and sampler from the message, validates the cost against VALID_COSTS, submits the query to the scheduler via self.sched.submit, and writes the result back as a JSON line {"ok": true, "result": "..."} or {"ok": false, "error": "..."}.
Malformed JSON lines are silently skipped. ConnectionResetError and
asyncio.IncompleteReadError are swallowed so that abrupt client
disconnections do not propagate as unhandled exceptions.
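The per-line handling on the server side (skip malformed JSON, reject unknown costs) can be sketched without any networking. The helpers below are hypothetical and the error wording is invented for illustration; `VALID_COSTS` uses the documented default of (1, 2, 4).

```python
import json

VALID_COSTS = (1, 2, 4)  # documented default; configurable via GW_VALID_COSTS


def parse_request(line: bytes):
    """Return the decoded message, or None when the line should be skipped."""
    try:
        message = json.loads(line)
    except ValueError:  # covers JSONDecodeError and invalid UTF-8
        return None
    return message if isinstance(message, dict) else None


def check_cost(message: dict):
    """Return an error reply for an invalid cost, or None when it is accepted."""
    cost = message.get("cost")
    if cost not in VALID_COSTS:
        return {"ok": False, "error": f"invalid cost: {cost!r}"}
    return None


skipped = parse_request(b"not json\n")
bad = check_cost(parse_request(b'{"op": "generate", "cost": 3}\n'))
good = check_cost(parse_request(b'{"op": "generate", "cost": 2}\n'))
```

Rejecting a bad cost before submission keeps the scheduler's budget arithmetic limited to known unit sizes.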
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `reader` | `StreamReader` | The asyncio stream reader for the client connection. | required |
| `writer` | `StreamWriter` | The asyncio stream writer for the client connection. | required |
Source code in unaiverse/modules/utils.py
run
async
Bind the listening socket, start the dispatcher, and run until shutdown is requested.
Binding also arbitrates the startup race: if the port is already taken, start_server raises and this process
is not the server. A grace period guards against an orphan server whose starter died before registering.
Source code in unaiverse/modules/utils.py
APIGatewayScheduler
Strict round-robin scheduler with a shared unit-budget admission, used by the gateway server.
The scheduler serves one query per process per turn (round-robin), with equal importance regardless of how many units a query costs: the cost only affects the budget admission check, never the turn order. Within a process, queries are served in submission order (FIFO).
Two design decisions are worth highlighting:
- Strict hold, never skip. When it is a process's turn but its next query does not fit the remaining budget, the
dispatcher STALLS and holds the free units until that query can run; it does not skip ahead to another process.
This is intentional (chosen over skip-to-fill), accepting the throughput cost.
- At most one held "head" per process. The dispatcher pops a single query and keeps it in the held dict instead
of peeking the private asyncio.Queue deque (which caused an IndexError race across await boundaries).
A single asyncio.Condition drives every wakeup (new work submitted, or units freed by a finished job).
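The strict-hold admission rule can be illustrated with a synchronous toy model. This is a simplified sketch, not the scheduler itself: `admit_batch` is hypothetical, it replaces the asyncio.Condition-driven dispatcher with a plain loop, and it assumes that a process with nothing queued simply passes its turn.

```python
from collections import deque


def admit_batch(queues, order, free_units):
    """Admit queries in strict round-robin until the next head does not fit.

    queues: dict pid -> deque of costs (FIFO); order: deque of pids (turn order).
    Returns (admitted, free_units), where admitted is a list of (pid, cost).
    """
    admitted = []
    while any(queues[pid] for pid in order):
        pid = order[0]
        if not queues[pid]:
            order.rotate(-1)  # nothing queued: the turn passes on (assumption)
            continue
        cost = queues[pid][0]
        if cost > free_units:
            break  # strict hold: stall here, never skip ahead to fill the budget
        queues[pid].popleft()
        free_units -= cost
        admitted.append((pid, cost))
        order.rotate(-1)
    return admitted, free_units


queues = {"a": deque([4]), "b": deque([4]), "c": deque([1])}
order = deque(["a", "b", "c"])
first, free = admit_batch(queues, order, free_units=6)
# Process "a" finishes and returns its 4 units to the budget.
second, free_after = admit_batch(queues, order, free + 4)
```

In the first round only "a" is admitted: "b" does not fit in the remaining 2 units, and "c" is not served early even though its cost of 1 would fit. That is the throughput cost accepted by the strict-hold design.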
Create a Scheduler.
Source code in unaiverse/modules/utils.py
submit
async
submit(pid: str, sys_prompt: str, prompt: str, cost: int, model: str | None, sampler: dict | None = None) -> str
Enqueue a query for a process and wait until the dispatcher runs it.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `pid` | `str` | The process ID the query belongs to (round-robin fairness is per process). | required |
| `sys_prompt` | `str` | The system prompt string (can be ""). | required |
| `prompt` | `str` | The prompt string to send to the model. | required |
| `cost` | `int` | The query's unit cost (one of VALID_COSTS). | required |
| `model` | `str \| None` | The model identifier to use (None lets the call fall back to its own default). | required |
| `sampler` | `dict \| None` | Sampling parameters to forward to the model call (None means use API defaults). | `None` |
Returns:
| Type | Description |
|---|---|
| `str` | The model's textual result. |
Source code in unaiverse/modules/utils.py
dispatcher
async
Forever serve queued queries in strict round-robin order under the unit budget.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `call` | `Callable` | The asynchronous function performing the actual model call (passed on to each job). | required |
Source code in unaiverse/modules/utils.py
has_human_processor
Return True if the given agent's processor wraps a HumanModule.
Inspects the proc attribute of the agent, checks that it is not None,
and tests whether its inner module is an instance of HumanModule.
This is the canonical check used by the framework to decide whether a human
is expected to supply the agent's output interactively.
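The check reduces to an isinstance test on the wrapped module. The sketch below uses hypothetical stand-in classes (`FakeHumanModule`, `FakeWrapper`, `FakeAgent`) so that it runs without the framework; the attribute names `proc` and `module` follow the descriptions in this page.

```python
class FakeHumanModule:
    """Stand-in for HumanModule."""


class FakeWrapper:
    """Stand-in for ModuleWrapper: keeps the wrapped object in .module."""

    def __init__(self, module):
        self.module = module


class FakeAgent:
    def __init__(self, proc):
        self.proc = proc


def wraps_human_module(agent) -> bool:
    """Return True when the agent's processor wraps a FakeHumanModule."""
    assert agent.proc is not None, "agent has no processor"
    return isinstance(agent.proc.module, FakeHumanModule)


human_agent = FakeAgent(FakeWrapper(FakeHumanModule()))
other_agent = FakeAgent(FakeWrapper(lambda x: x))
is_human = wraps_human_module(human_agent)
is_other = wraps_human_module(other_agent)
```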
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `agent` | `object` | The agent object to inspect. Must expose a `proc` attribute. | required |
Returns:
| Type | Description |
|---|---|
| `bool` | `True` if the agent's processor wraps a `HumanModule`, `False` otherwise. |
Raises:
| Type | Description |
|---|---|
AssertionError
|
If |
Source code in unaiverse/modules/utils.py
transforms_factory
transforms_factory(trans_type: str, add_batch_dim: bool = True, return_inverse: bool = False) -> Compose | None
Build and return a torchvision transform pipeline for the requested image type.
Each trans_type identifier maps to a fixed, opinionated pipeline:
- "rgb<N>" / "gray<N>": convert color mode, resize and center-crop to <N> pixels, convert to tensor, then apply ImageNet (or grayscale) normalization. <N> must be a positive integer suffix (e.g. "rgb224").
- "rgb-no_norm<N>" / "gray-no_norm<N>": same as above but without normalization; the output tensor is uint8 in [0, 255].
- "rgb" / "gray": no resize or crop; normalize with ImageNet statistics.
- "rgb-no_norm" / "gray-no_norm": no resize, crop, or normalization.
- "gray_mnist": MNIST-specific pipeline: convert to grayscale, resize and center-crop to 28x28, convert to tensor, normalize with MNIST mean (0.1307) and standard deviation (0.3081).
When add_batch_dim=True, a batch dimension is added by unsqueeze(0) at the
end of the forward pipeline and removed by squeeze(0) at the start of the inverse
pipeline, so the output is a 4-D tensor ready for direct model inference.
When return_inverse=True, the inverse (de-normalization and to-PIL) pipeline is
returned instead of the forward one, which is useful for visualization.
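The naming scheme of trans_type can be made concrete with a small parser. This sketch covers only how an identifier decomposes into mode, normalization flag, and size; `parse_trans_type` is a hypothetical helper, it builds no torchvision pipeline, and raising ValueError for unknown names is an assumption of the example.

```python
import re

# Base name, optional "-no_norm" marker, optional positive integer size.
_TRANS_TYPE = re.compile(r"(rgb|gray)(-no_norm)?(\d+)?")


def parse_trans_type(trans_type: str) -> dict:
    """Split an identifier such as "rgb224" into its documented components."""
    if trans_type == "gray_mnist":
        return {"mode": "gray", "normalize": True, "size": 28}
    match = _TRANS_TYPE.fullmatch(trans_type)
    if match is None:
        raise ValueError(f"unsupported trans_type: {trans_type!r}")
    mode, no_norm, size = match.groups()
    if size is not None and int(size) <= 0:
        raise ValueError("the size suffix must be a positive integer")
    return {
        "mode": mode,
        "normalize": no_norm is None,
        "size": int(size) if size is not None else None,
    }


sized = parse_trans_type("rgb224")
plain = parse_trans_type("gray-no_norm")
mnist = parse_trans_type("gray_mnist")
```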
Parameters:
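| Name | Type | Description | Default |
|---|---|---|---|
| `trans_type` | `str` | A string identifying the desired transform pipeline. Supported base names are `"rgb"`, `"gray"`, `"rgb-no_norm"`, `"gray-no_norm"` (each optionally followed by a size suffix `<N>`), and `"gray_mnist"`. | required |
| `add_batch_dim` | `bool` | If `True`, add a batch dimension at the end of the forward pipeline (and remove it at the start of the inverse pipeline). | `True` |
| `return_inverse` | `bool` | If `True`, return the inverse pipeline instead of the forward one. | `False` |
Returns:
| Type | Description |
|---|---|
| `Compose \| None` | The requested `Compose` pipeline, or `None` if the combination of arguments produces no pipeline (should not occur for valid inputs). |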
Raises:
| Type | Description |
|---|---|
ValueError
|
If |
Examples:
>>> fwd = transforms_factory("rgb224")
>>> from PIL import Image
>>> img = Image.new("RGB", (300, 300))
>>> tensor = fwd(img) # shape: (1, 3, 224, 224)
>>> inv = transforms_factory("rgb224", return_inverse=True)
>>> recovered = inv(tensor.squeeze(0)) # PIL Image
Source code in unaiverse/modules/utils.py
hard_tanh
Return the element-wise hard tanh of the input tensor.
Equivalent to torch.clamp(x, min=-1., max=1.). Unlike the smooth hyperbolic
tangent, this activation saturates exactly at -1 and 1 and is linear in between,
making it fast and gradient-friendly for bounded representations.
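For intuition, the same clamp can be written for a single Python float. `hard_tanh_scalar` is a hypothetical scalar analogue used only for illustration; the documented function operates element-wise on tensors.

```python
def hard_tanh_scalar(x: float) -> float:
    """Scalar analogue of the clamp: linear inside [-1, 1], saturated outside."""
    return max(-1.0, min(1.0, x))


values = [hard_tanh_scalar(v) for v in (-2.5, -0.3, 0.0, 0.7, 4.0)]
```

Inputs inside the interval pass through unchanged, while everything outside is pinned to the nearest bound.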
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `x` | `Tensor` | Input tensor of any shape and dtype. | required |
Returns:
| Type | Description |
|---|---|
| `Tensor` | A tensor of the same shape as `x`, with values clamped to the interval `[-1, 1]`. |
Source code in unaiverse/modules/utils.py
target_shape_fixed_cross_entropy
Compute cross-entropy loss after squeezing a spurious leading dimension from the target.
When targets arrive from the framework pipeline they may carry an extra batch-size-1
leading dimension (shape (1, N) instead of (N,)). This wrapper calls
target.squeeze(0) when target.ndim > 1 before delegating to
torch.nn.functional.cross_entropy, so the loss computation always receives a
correctly-shaped target regardless of how the batch dimension was added upstream.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `output` | `Tensor` | The model output logits tensor. | required |
| `target` | `Tensor` | The target class-index tensor. If it has more than one dimension, the first (leading) dimension is squeezed before computing the loss. | required |
| `*args` | | Additional positional arguments forwarded to `torch.nn.functional.cross_entropy`. | `()` |
| `**kwargs` | | Additional keyword arguments forwarded to `torch.nn.functional.cross_entropy`. | `{}` |
Returns:
| Type | Description |
|---|---|
| `Tensor` | The scalar cross-entropy loss tensor. |
Source code in unaiverse/modules/utils.py
set_seed
Seed all relevant random-number generators for reproducible runs.
Sets the seed for PyTorch (torch.manual_seed), Python's built-in random
module, and NumPy (numpy.random.seed). The NumPy seed is always fixed to 0
regardless of the value of seed; this is intentional for the current version.
When seed is negative the function is a no-op.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `seed` | `int` | The integer seed value. Pass a negative integer to skip seeding entirely (useful when reproducibility is not required). | required |
Note
CUDA randomness is not explicitly seeded here. For fully deterministic GPU
training, also call torch.cuda.manual_seed_all and set
torch.backends.cudnn.deterministic = True in the calling code.
Source code in unaiverse/modules/utils.py
get_proc_inputs_and_proc_outputs_for_rnn
get_proc_inputs_and_proc_outputs_for_rnn(u_shape: Size | tuple, du_dim: int, y_dim: int) -> tuple[list, list]
Build StreamType descriptors for the inputs and output of an RNN-style processor.
Constructs the two-input, one-output stream specification expected by a recurrent
processor: the first input is the main observation tensor of shape
(batch, *u_shape); the second input is a delta-u tensor of shape
(batch, du_dim); the single output is a tensor of shape (batch, y_dim).
All tensors use torch.float32 dtype and are non-pubsub, non-private streams.
If u_shape is a torch.Size object it is converted to a plain tuple
before building the descriptor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| u_shape | Size \| tuple | Shape of the main input tensor, excluding the batch dimension (e.g. | required |
| du_dim | int | Dimensionality of the secondary delta-u input tensor. | required |
| y_dim | int | Dimensionality of the output tensor. | required |
Returns:
| Type | Description |
|---|---|
| tuple[list, list] | A two-element tuple: a list of the two input StreamType descriptors, followed by a list holding the single output StreamType descriptor. |
Source code in unaiverse/modules/utils.py
get_proc_inputs_and_proc_outputs_for_image_classification
get_proc_inputs_and_proc_outputs_for_image_classification(y_dim: int = -1, trans_forms=None) -> tuple[list[StreamType], list[StreamType]]
Build StreamType descriptors for the inputs and output of an image-classification processor.
Constructs the canonical single-image-in, single-logit-vector-out specification for
an image classification model. The input stream is of type "img" (no fixed tensor
shape) and carries the optional stream_to_proc_transforms transform. The output
stream is a float tensor of shape (batch, y_dim).
When y_dim=-1, the output dimensionality defaults to 1000, matching the class
count of models pre-trained on ImageNet.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| y_dim | int | Number of output classes. Pass -1 to use the default of 1000 (the ImageNet class count). | -1 |
| trans_forms | | A torchvision transform (or | None |
Returns:
| Type | Description |
|---|---|
| tuple[list[StreamType], list[StreamType]] | A two-element tuple: a list containing one input StreamType descriptor (the image stream), followed by a list containing one output StreamType descriptor (the logits tensor). |
Source code in unaiverse/modules/utils.py
isinstance_fcn
Return whether obj is an instance of the given type or tuple of types.
Thin wrapper around the built-in isinstance that can be passed as a
first-class callable in places where a plain isinstance call cannot be
used directly (e.g., as a function argument or in a lambda).
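The first-class-callable use described above can be sketched as follows. So that the snippet runs without the package installed, it defines a local stand-in with the documented parameter names (obj, class_to_check); the stand-in body and the names is_number and values are assumptions for illustration, not the module's source.

```python
from functools import partial


def isinstance_fcn(obj, class_to_check):
    # Local stand-in (assumption) mirroring the documented signature.
    return isinstance(obj, class_to_check)


# Bind the type check once, then reuse it as a predicate.
is_number = partial(isinstance_fcn, class_to_check=(int, float))

values = [1, "two", 3.0, None]
numbers = list(filter(is_number, values))
print(numbers)  # [1, 3.0]
```

Binding class_to_check by keyword leaves obj as the only remaining positional argument, which is the shape that filter and map expect.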
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| obj | object | The object to test. | required |
| class_to_check | type \| tuple | A single type or a tuple of types to test against. | required |
Returns:
| Type | Description |
|---|---|
| bool | Whether obj is an instance of class_to_check. |
Source code in unaiverse/modules/utils.py
error_rate_mnist_test_set
Evaluate and return a network's top-1 error rate on the full MNIST test set.
Downloads the MNIST test split if it is not already present at
mnist_data_save_path, then iterates over all 10,000 samples in batches of 200.
The network is temporarily switched to eval mode (network.eval()) for the
duration of the evaluation; its original training flag is restored afterwards.
All inputs are moved to the device inferred from the first parameter of network,
so the function works transparently on both CPU and CUDA models.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| network | Module | The network to evaluate. | required |
| mnist_data_save_path | str | Filesystem path where the MNIST dataset is stored or will be downloaded to. | required |
Returns:
| Type | Description |
|---|---|
| float | The fraction of misclassified test samples, as a float. |
Note
torch.no_grad() is NOT called explicitly here; callers that require gradient
isolation should wrap the call themselves. The network's training flag is
restored to its original value after the call.
Source code in unaiverse/modules/utils.py
featherless_call
async
featherless_call(sys_prompt: str, prompt: str, sampler: dict | None = None, model: str | None = None) -> str | None
Call the Featherless AI chat-completions API and return the generated text (async).
Sends a chat-completions request to the Featherless endpoint using the process-wide
shared AsyncOpenAI client (see _get_featherless_client). Sampler parameters
are split into two groups: keys present in OPENAI_NATIVE_SAMPLER_PARAMS are
forwarded as top-level fields; all other keys (e.g. top_k,
repetition_penalty, min_p) are forwarded via extra_body so that
non-standard vLLM/Featherless extensions reach the backend without triggering an SDK
validation error.
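The split described above can be sketched in a few lines. The frozenset is copied from the OPENAI_NATIVE_SAMPLER_PARAMS attribute documented on this page; the helper name split_sampler and the way the request keyword arguments are assembled are assumptions for illustration, not the module's code.

```python
OPENAI_NATIVE_SAMPLER_PARAMS = frozenset({
    "max_tokens", "temperature", "top_p", "frequency_penalty",
    "presence_penalty", "stop", "seed", "n", "logit_bias",
    "logprobs", "top_logprobs", "response_format",
})


def split_sampler(sampler):
    """Hypothetical helper: separate native keys from backend extensions."""
    sampler = sampler or {}
    native = {k: v for k, v in sampler.items()
              if k in OPENAI_NATIVE_SAMPLER_PARAMS}
    extra = {k: v for k, v in sampler.items()
             if k not in OPENAI_NATIVE_SAMPLER_PARAMS}
    return native, extra


native, extra = split_sampler(
    {"temperature": 0.7, "max_tokens": 128, "top_k": 40, "min_p": 0.05}
)

# Native keys become top-level fields; the rest travel in extra_body.
request_kwargs = dict(native)
if extra:
    request_kwargs["extra_body"] = extra
print(request_kwargs)
```

Here temperature and max_tokens stay at the top level, while top_k and min_p end up under extra_body.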
The call is retried up to three times with exponential backoff (1 s, 2 s, 4 s).
An empty or blank response is treated as a transient failure and also triggers a
retry. Diagnostic information (model, finish reason, completion token count, wall
time, tokens per second) is logged via log.user on each successful API response.
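A minimal sketch of this retry policy follows. It assumes one initial attempt plus up to three retries, with delays of 1 s, 2 s and 4 s, and it treats a blank response like a failure. The names call_with_backoff, make_request and sleep are hypothetical, and the behavior after the last attempt (re-raise the last exception, otherwise return an empty string) is an assumption rather than a statement about the module's code.

```python
import asyncio


async def call_with_backoff(make_request, retries=3, base_delay=1.0,
                            sleep=asyncio.sleep):
    """Hypothetical helper: retry an async call with exponential backoff."""
    last_error = None
    for attempt in range(retries + 1):
        try:
            text = await make_request()
            if text and text.strip():
                return text           # first non-blank response wins
            last_error = None         # blank response: treated as transient
        except Exception as exc:      # any failure is retried in this sketch
            last_error = exc
        if attempt < retries:
            await sleep(base_delay * 2 ** attempt)  # 1 s, 2 s, 4 s
    if last_error is not None:
        raise last_error
    return ""


async def _demo():
    replies = iter(["", "   ", "hello"])
    waited = []

    async def fake_request():
        return next(replies)

    async def fake_sleep(seconds):    # record the delay instead of waiting
        waited.append(seconds)

    result = await call_with_backoff(fake_request, sleep=fake_sleep)
    return result, waited


demo_result, demo_waits = asyncio.run(_demo())
print(demo_result, demo_waits)
```

Passing the sleep function as a parameter keeps the demo instantaneous; with the default asyncio.sleep the same call would wait for the real delays.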
If the MODEL_ID environment variable is not set and model is None, a
critical log message is emitted but the call still proceeds with the placeholder
value "MOD". Similarly, if FEATHERLESS_API_KEY is not set, a critical
message is logged.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sys_prompt | str | The system prompt string. Pass an empty string | required |
| prompt | str | The user prompt string to send to the model. | required |
| sampler | dict \| None | A dictionary of sampling parameters. Recognized keys include | None |
| model | str \| None | The Featherless model identifier (e.g. | None |
Returns:
| Type | Description |
|---|---|
| str \| None | The non-empty text of the first valid response choice, or an empty string … before the empty string is returned). |
Raises:
| Type | Description |
|---|---|
| Exception | Any exception raised by the |
Note
This function is a coroutine and must be awaited. It is the default call
argument to APIGatewayServer and serve_api_gateway.
Source code in unaiverse/modules/utils.py
serve_api_gateway
Process entrypoint for the detached gateway server (see the APIGatewayServer class).
Spawned by the client via
python -c "from unaiverse.modules.utils import serve_api_gateway; serve_api_gateway()". We deliberately do
NOT use python -m unaiverse.modules.utils here: importing the parent package unaiverse.modules already
imports this module, so running it again via runpy would execute a second copy of it under the name
__main__ (RuntimeWarning, and two distinct APIGatewayServer classes). Calling this function avoids that.
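The spawn command quoted above can be assembled as an argument list, which avoids shell quoting issues. This is a sketch only: the names GATEWAY_SNIPPET and gateway_command are hypothetical, and the detached Popen call in the comment is one possible way a client might launch it, not necessarily what the package does.

```python
import sys

# The -c payload quoted in the text above.
GATEWAY_SNIPPET = (
    "from unaiverse.modules.utils import serve_api_gateway; "
    "serve_api_gateway()"
)


def gateway_command(python=sys.executable):
    """Hypothetical helper: argv list for launching the gateway entrypoint."""
    return [python, "-c", GATEWAY_SNIPPET]


cmd = gateway_command()
print(cmd)

# A client could then start the server detached, for example (assumption):
#   import subprocess
#   subprocess.Popen(cmd, start_new_session=True)
```

Using sys.executable keeps the child process on the same interpreter, and therefore the same installed unaiverse package, as the client.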