Skip to content

🔴 unaiverse.utils.misc

What this module does 🔴

Grab-bag of utility helpers: node-address file IO, ID building, countdowns, JSON readiness checks, in-memory agent loading/packing, app dir prep, policies and a code analyzer.

misc

█████ █████ ██████ █████ █████ █████ █████ ██████████ ███████████ █████████ ██████████ ░░███ ░░███ ░░██████ ░░███ ░░███ ░░███ ░░███ ░░███░░░░░█░░███░░░░░███ ███░░░░░███░░███░░░░░█ ░███ ░███ ░███░███ ░███ ██████ ░███ ░███ ░███ ░███ █ ░ ░███ ░███ ░███ ░░░ ░███ █ ░ ░███ ░███ ░███░░███░███ ░░░░░███ ░███ ░███ ░███ ░██████ ░██████████ ░░█████████ ░██████
░███ ░███ ░███ ░░██████ ███████ ░███ ░░███ ███ ░███░░█ ░███░░░░░███ ░░░░░░░░███ ░███░░█
░███ ░███ ░███ ░░█████ ███░░███ ░███ ░░░█████░ ░███ ░ █ ░███ ░███ ███ ░███ ░███ ░ █ ░░████████ █████ ░░█████░░████████ █████ ░░███ ██████████ █████ █████░░█████████ ██████████ ░░░░░░░░ ░░░░░ ░░░░░ ░░░░░░░░ ░░░░░ ░░░ ░░░░░░░░░░ ░░░░░ ░░░░░ ░░░░░░░░░ ░░░░░░░░░░ A Collectionless AI Project (https://collectionless.ai) Registration/Login: https://unaiverse.io Code Repositories: https://github.com/collectionlessai/ Main Developers: Stefano Melacci (Project Leader), Christian Di Maio, Tommaso Guidi

Silent

Silent(ignore: bool = False)

Context manager that suppresses stdout by redirecting it to /dev/null.

On entry, sys.stdout is replaced with a handle to /dev/null so that any print or other stdout writes are silently discarded. On exit, the original sys.stdout is restored regardless of whether an exception occurred.

If ignore=True is passed to the constructor, the context manager becomes a no-op and stdout is left unchanged. This is useful when callers want to toggle suppression without restructuring control flow.

Initialize the Silent context manager.

Parameters:

Name Type Description Default
ignore bool

If True, the context manager acts as a no-op and stdout is never redirected. Defaults to False.

False
Source code in unaiverse/utils/misc.py
def __init__(self, ignore: bool = False) -> None:
    """Initialize the Silent context manager.

    Args:
        ignore: If ``True``, the context manager acts as a no-op and stdout is never
            redirected. Defaults to ``False``.
    """
    self.ignore = ignore

ignore instance-attribute

ignore = ignore

FileTracker

FileTracker(folder: str, ext: str = '.json', prefix: str | None = None, skip: str | None = None)

Monitor a folder for file creation, modification, and deletion events.

FileTracker works by comparing nanosecond-precision modification timestamps (st_mtime_ns) between successive calls to something_changed. Only files that match the configured extension, optional prefix, and optional skip-name filter are considered.

The initial snapshot is taken at construction time, so the first call to something_changed reflects changes that occurred after the object was created.

Attributes:

Name Type Description
folder

pathlib.Path object for the monitored directory.

ext

Lowercase file extension that is tracked (e.g. ".json").

skip

Exact file name to exclude from tracking, or None.

prefix

File-name prefix filter, or None if no prefix filtering is applied.

last_state

Most recent snapshot dict mapping file names to st_mtime_ns values; updated on every call to something_changed.

Initialize the FileTracker and take an initial snapshot of the folder.

The snapshot records the st_mtime_ns of every file in folder that satisfies all active filters. Subsequent calls to something_changed compare against this baseline.

Parameters:

Name Type Description Default
folder str

Path to the folder to monitor.

required
ext str

File extension (including the leading dot) to watch. The comparison is case-insensitive. Defaults to ".json".

'.json'
prefix str | None

If set, only files whose names start with this string are tracked. Defaults to None (no prefix filtering).

None
skip str | None

If set, the file with this exact name is excluded from tracking. Defaults to None (no skip filtering).

None
Source code in unaiverse/utils/misc.py
def __init__(self, folder: str, ext: str = ".json",
             prefix: str | None = None, skip: str | None = None) -> None:
    """Initialize the FileTracker and take an initial snapshot of the folder.

    The snapshot records the ``st_mtime_ns`` of every file in ``folder`` that
    satisfies all active filters.  Subsequent calls to ``something_changed`` compare
    against this baseline.

    Args:
        folder: Path to the folder to monitor.
        ext: File extension (including the leading dot) to watch. The comparison is
            case-insensitive. Defaults to ``".json"``.
        prefix: If set, only files whose names start with this string are tracked.
            Defaults to ``None`` (no prefix filtering).
        skip: If set, the file with this exact name is excluded from tracking.
            Defaults to ``None`` (no skip filtering).
    """
    self.folder = Path(folder)
    self.ext = ext.lower()
    self.skip = skip
    self.prefix = prefix
    self.last_state = self.__scan_files()

folder instance-attribute

folder = Path(folder)

ext instance-attribute

ext = lower()

skip instance-attribute

skip = skip

prefix instance-attribute

prefix = prefix

last_state instance-attribute

last_state = __scan_files()

something_changed

something_changed() -> bool

Check whether any tracked file has been created, modified, or deleted since the last call.

A new folder snapshot is taken on every invocation and compared against last_state. Changes are detected in three categories: new files that did not exist in the previous snapshot (created), files whose st_mtime_ns differs (modified), and files that were present before but are gone now (deleted). last_state is updated to the new snapshot regardless of whether a change was detected.

Returns:

Type Description
bool

True if at least one file was created, modified, or deleted;

bool

False if the folder contents are unchanged.

Source code in unaiverse/utils/misc.py
def something_changed(self) -> bool:
    """Check whether any tracked file has been created, modified, or deleted since the last call.

    A new folder snapshot is taken on every invocation and compared against
    ``last_state``.  Changes are detected in three categories: new files that did not
    exist in the previous snapshot (created), files whose ``st_mtime_ns`` differs
    (modified), and files that were present before but are gone now (deleted).
    ``last_state`` is updated to the new snapshot regardless of whether a change was
    detected.

    Returns:
        ``True`` if at least one file was created, modified, or deleted;
        ``False`` if the folder contents are unchanged.
    """
    new_state = self.__scan_files()

    created = [f for f in new_state if f not in self.last_state]
    modified = [f for f in new_state if f in self.last_state and new_state[f] != self.last_state[f]]
    deleted = [f for f in self.last_state if f not in new_state]  # Track deletions

    has_changed = bool(created or modified or deleted)
    self.last_state = new_state
    return has_changed

InMemoryFinder

InMemoryFinder(py_files: dict[str, str], namespace: str)

Bases: MetaPathFinder

MetaPathFinder that resolves imports from an in-memory source dict.

Each instance owns a unique namespace (e.g. _dyn_abc12345) so that concurrent worlds loaded by different agents never collide in sys.modules.

Parameters:

Name Type Description Default
py_files dict[str, str]

Dict mapping relative paths (forward-slash, e.g. 'role1.py', 'utils/helper.py') to Python source text.

required
namespace str

Unique string used as the top-level package name.

required
Source code in unaiverse/utils/misc.py
def __init__(self, py_files: dict[str, str], namespace: str) -> None:
    self._ns = namespace

    # fullname -> (source, virtual_path, is_package)
    self._mods: dict[str, tuple[str, str, bool]] = {}

    # Discover all directories that contain at least one .py file,
    # so we can auto-generate implicit namespace packages for them.
    dirs: set[str] = set()
    for rel in py_files:
        parts = rel.split('/')
        for depth in range(1, len(parts)):
            dirs.add('/'.join(parts[:depth]))

    for rel, source in py_files.items():
        parts = rel.split('/')
        filename = parts[-1]
        sub_dirs = parts[:-1]

        if filename == '__init__.py':
            mod_name = namespace + ('.' + '.'.join(sub_dirs) if sub_dirs else '')
            is_pkg = True
        else:
            stem = filename[:-3]  # strip .py
            mod_name = namespace + '.' + '.'.join(sub_dirs + [stem])
            is_pkg = False

        v_path = f'<world:{namespace}>/{rel}'
        self._mods[mod_name] = (source, v_path, is_pkg)

    # Auto-create empty namespace packages for directories with no __init__.py
    for d in dirs:
        pkg_name = namespace + '.' + d.replace('/', '.')
        if pkg_name not in self._mods:
            self._mods[pkg_name] = ('', f'<world:{namespace}>/{d}/__init__.py', True)

find_spec

find_spec(fullname: str, path: object, target: object = None) -> ModuleSpec | None
Source code in unaiverse/utils/misc.py
def find_spec(self, fullname: str, path: object, target: object = None) -> importlib.machinery.ModuleSpec | None:

    # Root namespace package
    if fullname == self._ns:
        spec = importlib.machinery.ModuleSpec(fullname, loader=None, origin=f'<world:{self._ns}>', is_package=True)
        spec.submodule_search_locations = []
        return spec

    if fullname not in self._mods:
        return None

    source, v_path, is_pkg = self._mods[fullname]
    loader = _InMemoryLoader(source, v_path)
    spec = importlib.machinery.ModuleSpec(fullname, loader, origin=v_path, is_package=is_pkg)
    if is_pkg:
        spec.submodule_search_locations = []
    return spec

cleanup

cleanup() -> None

Remove all owned modules from "sys.modules" and unregister this finder.

Call this when the agent that was built from these sources is destroyed.

Source code in unaiverse/utils/misc.py
def cleanup(self) -> None:
    """Remove all owned modules from "sys.modules" and unregister this finder.

    Call this when the agent that was built from these sources is destroyed.
    """
    for name in list(self._mods):
        sys.modules.pop(name, None)
    sys.modules.pop(self._ns, None)
    try:
        sys.meta_path.remove(self)
    except ValueError:
        pass

PolicyFilterDelayAction

PolicyFilterDelayAction(action_names: set[str], wait: float, add_random_up_to: float = 0.0)

Policy filter that delays self-generation or learning actions by a configurable wait time.

Initialises the PolicyFilterSelfGen.

Parameters:

Name Type Description Default
action_names set[str]

The name of the actions to delay (set).

required
wait float

Minimum number of seconds to wait before allowing action_name action to proceed.

required
add_random_up_to float

Upper bound of an additional random delay in seconds added to wait (default: 0.0; negative values are clamped to 0.0).

0.0

Raises:

Type Description
GenException

If wait is not greater than zero.

Source code in unaiverse/utils/misc.py
def __init__(self, action_names: set[str], wait: float, add_random_up_to: float = 0.) -> None:
    """Initialises the PolicyFilterSelfGen.

    Args:
        action_names: The name of the actions to delay (set).
        wait: Minimum number of seconds to wait before allowing ``action_name`` action to proceed.
        add_random_up_to: Upper bound of an additional random delay in seconds
            added to ``wait`` (default: 0.0; negative values are clamped to 0.0).

    Raises:
        GenException: If ``wait`` is not greater than zero.
    """
    self.action_names = action_names
    self.wait = wait
    self.add_random_up_to = max(add_random_up_to, 0.)
    if wait <= 0.:
        raise GenException("Invalid number of seconds ('wait' must be > 0)")

action_names instance-attribute

action_names = action_names

wait instance-attribute

wait = wait

add_random_up_to instance-attribute

add_random_up_to = max(add_random_up_to, 0.0)

PolicyHumanLikeDelay

PolicyHumanLikeDelay(action_names: set[str], median_delay: float = 5.0, variability: float = 0.6)

Policy filter that delays actions with human-like variable timing.

Uses a log-normal distribution (which naturally models human reaction times) combined with momentum (bursts of fast/slow behavior) and occasional distraction spikes. All internal dynamics are derived from the three constructor parameters.

Initialises the PolicyHumanLikeDelay.

Parameters:

Name Type Description Default
action_names set[str]

The name of the actions to delay (set).

required
median_delay float

Median delay in seconds (the "typical" human response time).

5.0
variability float

How spread out the delays are (0.0 = nearly constant, 1.0+ = very erratic). Values around 0.4-0.8 are realistic. Also controls momentum strength and distraction probability internally.

0.6

Raises:

Type Description
GenException

If median_delay is not greater than zero.

Source code in unaiverse/utils/misc.py
def __init__(self, action_names: set[str], median_delay: float = 5.0, variability: float = 0.6) -> None:
    """Initialises the PolicyHumanLikeDelay.

    Args:
        action_names: The name of the actions to delay (set).
        median_delay: Median delay in seconds (the "typical" human response time).
        variability: How spread out the delays are (0.0 = nearly constant,
            1.0+ = very erratic). Values around 0.4-0.8 are realistic.
            Also controls momentum strength and distraction probability internally.

    Raises:
        GenException: If ``median_delay`` is not greater than zero.
    """
    if median_delay <= 0.:
        raise GenException("Invalid median_delay (must be > 0)")
    self.action_names = action_names
    self._mu = math.log(median_delay)
    self._sigma = max(variability, 0.)
    # Derived internals: momentum blends prev delay into the next one (more variability = more momentum)
    self._momentum = min(self._sigma * 0.4, 0.6)
    # Distraction: rare long pauses (probability scales with variability)
    self._distraction_prob = min(self._sigma * 0.08, 0.12)
    self._distraction_mult = 2.0 + self._sigma

action_names instance-attribute

action_names = action_names

MultiPartLimitedDict

MultiPartLimitedDict(part_name_to_limit: dict)

Bases: dict

A dict subclass that keeps at most a configurable number of entries per named partition.

Initialises the MultiPartLimitedDict.

Parameters:

Name Type Description Default
part_name_to_limit dict

A dictionary mapping partition names to their maximum entry counts.

required
Source code in unaiverse/utils/misc.py
def __init__(self, part_name_to_limit: dict) -> None:
    """Initialises the MultiPartLimitedDict.

    Args:
        part_name_to_limit: A dictionary mapping partition names to their maximum entry counts.
    """
    super().__init__()
    self._limits = part_name_to_limit
    self._trackers = {k: deque() for k in part_name_to_limit}
    self._current_part = next(iter(part_name_to_limit.keys()))  # Default part

set_part

set_part(part_label: str) -> None

Switches the active partition so that subsequent __setitem__ calls are attributed to it.

Parameters:

Name Type Description Default
part_label str

The name of the partition to activate. Unknown labels are silently ignored.

required
Source code in unaiverse/utils/misc.py
def set_part(self, part_label: str) -> None:
    """Switches the active partition so that subsequent ``__setitem__`` calls are attributed to it.

    Args:
        part_label: The name of the partition to activate.  Unknown labels are silently ignored.
    """
    if part_label in self._limits:
        self._current_part = part_label

build_unaid

build_unaid(profile)

Return the canonical UNaIVERSE agent identifier (UNAID) for a node profile.

Combines the node's email address and node name into a single slash-separated string of the form "<email>/<node_name>". This identifier is used throughout the framework as a stable, human-readable key for a specific agent instance.

Parameters:

Name Type Description Default
profile

A node profile object that exposes a get_static_profile() method returning a dict with at least the keys "email" and "node_name".

required

Returns:

Type Description

A string of the form "<email>/<node_name>" uniquely identifying the agent.

Source code in unaiverse/utils/misc.py
def build_unaid(profile):
    """Return the canonical UNaIVERSE agent identifier (UNAID) for a node profile.

    Combines the node's email address and node name into a single slash-separated
    string of the form ``"<email>/<node_name>"``.  This identifier is used throughout
    the framework as a stable, human-readable key for a specific agent instance.

    Args:
        profile: A node profile object that exposes a ``get_static_profile()`` method
            returning a dict with at least the keys ``"email"`` and ``"node_name"``.

    Returns:
        A string of the form ``"<email>/<node_name>"`` uniquely identifying the agent.
    """
    return profile.get_static_profile()['email'] + '/' + profile.get_static_profile()['node_name']

save_node_addresses_to_file

save_node_addresses_to_file(node: object, dir_path: str, public: bool, filename: str = 'addresses.txt', append: bool = False) -> None

Write the hosted node's name and its public or world addresses to a file.

Each call writes exactly one line in the format "<node_name>;<address_list>", where <address_list> is the string representation of the list returned by get_public_addresses() or get_world_addresses(). The file is flushed immediately after writing so that concurrent readers see fresh data.

This format is the counterpart to get_node_addresses_from_file, which parses the same name;[addr, ...] structure.

Parameters:

Name Type Description Default
node object

The node object whose addresses are to be saved. Expected to expose hosted.get_name(), get_public_addresses(), and get_world_addresses().

required
dir_path str

Directory in which the output file is created or updated.

required
public bool

If True, saves the public addresses; if False, saves the world addresses.

required
filename str

Name of the output file. Defaults to "addresses.txt".

'addresses.txt'
append bool

If True, appends to an existing file instead of overwriting it. Defaults to False.

False
Source code in unaiverse/utils/misc.py
def save_node_addresses_to_file(node: object, dir_path: str, public: bool,
                                filename: str = "addresses.txt", append: bool = False) -> None:
    """Write the hosted node's name and its public or world addresses to a file.

    Each call writes exactly one line in the format ``"<node_name>;<address_list>"``,
    where ``<address_list>`` is the string representation of the list returned by
    ``get_public_addresses()`` or ``get_world_addresses()``.  The file is flushed
    immediately after writing so that concurrent readers see fresh data.

    This format is the counterpart to ``get_node_addresses_from_file``, which parses
    the same ``name;[addr, ...]`` structure.

    Args:
        node: The node object whose addresses are to be saved.  Expected to expose
            ``hosted.get_name()``, ``get_public_addresses()``, and
            ``get_world_addresses()``.
        dir_path: Directory in which the output file is created or updated.
        public: If ``True``, saves the public addresses; if ``False``, saves the world
            addresses.
        filename: Name of the output file. Defaults to ``"addresses.txt"``.
        append: If ``True``, appends to an existing file instead of overwriting it.
            Defaults to ``False``.
    """
    address_file = os.path.join(dir_path, filename)
    with open(address_file, "w" if not append else "a") as file:
        file.write(node.hosted.get_name() + ";" +
                   str(node.get_public_addresses() if public else node.get_world_addresses()) + "\n")
        file.flush()

get_node_addresses_from_file

get_node_addresses_from_file(dir_path: str, filename: str = 'addresses.txt') -> dict[str, list[str]]

Read node names and their address lists from a file.

Supports two file formats:

  • Legacy format: each non-empty line is a single address string starting with "/". All addresses are collected under the key "unk".
  • Current format: each line is "<node_name>;<python_list_literal>", where the list literal is evaluated with ast.literal_eval. Lines starting with "***" are treated as header markers and skipped. If the same node name appears on multiple lines, only the last entry is kept.

The current format is produced by save_node_addresses_to_file.

Parameters:

Name Type Description Default
dir_path str

Directory containing the file.

required
filename str

Name of the file to read. Defaults to "addresses.txt".

'addresses.txt'

Returns:

Type Description
dict[str, list[str]]

A dict mapping node names to lists of address strings. Returns an empty dict if

dict[str, list[str]]

the file exists but contains no non-empty lines.

Raises:

Type Description
FileNotFoundError

If the file does not exist in dir_path.

ValueError

If ast.literal_eval cannot parse an address list in the current format (malformed line).

Source code in unaiverse/utils/misc.py
def get_node_addresses_from_file(dir_path: str, filename: str = "addresses.txt") -> dict[str, list[str]]:
    """Read node names and their address lists from a file.

    Supports two file formats:

    - Legacy format: each non-empty line is a single address string starting with ``"/"``.
      All addresses are collected under the key ``"unk"``.
    - Current format: each line is ``"<node_name>;<python_list_literal>"``, where the list
      literal is evaluated with ``ast.literal_eval``.  Lines starting with ``"***"`` are
      treated as header markers and skipped.  If the same node name appears on multiple
      lines, only the last entry is kept.

    The current format is produced by ``save_node_addresses_to_file``.

    Args:
        dir_path: Directory containing the file.
        filename: Name of the file to read. Defaults to ``"addresses.txt"``.

    Returns:
        A dict mapping node names to lists of address strings.  Returns an empty dict if
        the file exists but contains no non-empty lines.

    Raises:
        FileNotFoundError: If the file does not exist in ``dir_path``.
        ValueError: If ``ast.literal_eval`` cannot parse an address list in the current
            format (malformed line).
    """
    ret = {}
    with open(os.path.join(dir_path, filename)) as file:
        lines = file.readlines()
        if not lines or len(lines) == 0:
            return ret

        # Old file format
        if lines[0].strip() == "/":
            addresses = []
            for line in lines:
                _line = line.strip()
                if len(_line) > 0:
                    addresses.append(_line)
            ret["unk"] = addresses
            return ret

        # New file format
        for line in lines:
            if line.strip().startswith("***"):  # Header marker
                continue
            comma_separated_values = [v.strip() for v in line.split(';')]
            node_name, addresses_str = comma_separated_values
            ret[node_name] = ast.literal_eval(addresses_str)  # Name appearing multiple times? the last entry is kept

    return ret

countdown_start

countdown_start(seconds: int, msg: str) -> Thread

Start a tqdm progress-bar countdown in a background thread.

A non-daemon thread is created that displays a tqdm progress bar over seconds ticks, sleeping one second between each tick. During the countdown, sys.stdout inside the thread is redirected through a TqdmPrintRedirector so that ordinary print calls are routed to tqdm.write and do not break the bar layout. The original sys.stdout is restored when the countdown completes.

The caller should eventually pass the returned handle to countdown_wait to block until the countdown finishes.

Parameters:

Name Type Description Default
seconds int

Total number of seconds to count down.

required
msg str

Description label displayed to the left of the progress bar.

required

Returns:

Type Description
Thread

A threading.Thread handle for the running countdown. Pass it to

Thread

countdown_wait to block until the countdown completes.

Source code in unaiverse/utils/misc.py
def countdown_start(seconds: int, msg: str) -> threading.Thread:
    """Start a tqdm progress-bar countdown in a background thread.

    A non-daemon thread is created that displays a ``tqdm`` progress bar over
    ``seconds`` ticks, sleeping one second between each tick.  During the countdown,
    ``sys.stdout`` inside the thread is redirected through a ``TqdmPrintRedirector``
    so that ordinary ``print`` calls are routed to ``tqdm.write`` and do not break
    the bar layout.  The original ``sys.stdout`` is restored when the countdown
    completes.

    The caller should eventually pass the returned handle to ``countdown_wait`` to
    block until the countdown finishes.

    Args:
        seconds: Total number of seconds to count down.
        msg: Description label displayed to the left of the progress bar.

    Returns:
        A ``threading.Thread`` handle for the running countdown.  Pass it to
        ``countdown_wait`` to block until the countdown completes.
    """
    class TqdmPrintRedirector:
        def __init__(self, tqdm_instance: object) -> None:
            self.tqdm_instance = tqdm_instance
            self.original_stdout = sys.__stdout__

        def write(self, s: str) -> None:
            if s.strip():  # Ignore empty lines (needed for the way tqdm works)
                self.tqdm_instance.write(s, file=self.original_stdout)

        def flush(self) -> None:
            pass  # Tqdm handles flushing

    def drawing(secs: int, message: str) -> None:
        with tqdm(total=secs, desc=message, file=sys.__stdout__) as t:
            sys.stdout = TqdmPrintRedirector(t)  # Redirect prints to tqdm.write
            for i in range(secs):
                time.sleep(1)
                t.update(1.)
            sys.stdout = sys.__stdout__  # Restore original stdout

    sys.stdout.flush()
    handle = threading.Thread(target=drawing, args=(seconds, msg))
    handle.start()
    return handle

countdown_wait

countdown_wait(handle: Thread) -> None

Block until the countdown thread created by countdown_start finishes.

Calls join() on the provided thread handle, suspending the caller until the countdown completes. This is the intended companion to countdown_start.

Parameters:

Name Type Description Default
handle Thread

The threading.Thread handle returned by countdown_start.

required
Source code in unaiverse/utils/misc.py
def countdown_wait(handle: threading.Thread) -> None:
    """Block until the countdown thread created by ``countdown_start`` finishes.

    Calls ``join()`` on the provided thread handle, suspending the caller until the
    countdown completes.  This is the intended companion to ``countdown_start``.

    Args:
        handle: The ``threading.Thread`` handle returned by ``countdown_start``.
    """
    handle.join()

check_json_start

check_json_start(file: str, msg: str, delete_existing: bool = False) -> Thread

Start a background daemon thread that monitors a JSON file for changes and pretty-prints it.

The watcher loop polls the file every second. Whenever the parsed JSON content differs from the previous snapshot, the current timestamp and the full JSON body are rendered to the console via rich. Parse errors and other transient exceptions are silently ignored so that the watcher survives temporary write conflicts.

Because the thread is a daemon, it is automatically stopped when the main program exits. The caller may also pass the returned handle to check_json_start_wait to block explicitly (though in practice the daemon loop runs until the process ends or a KeyboardInterrupt is raised inside the thread).

Parameters:

Name Type Description Default
file str

Path to the JSON file to monitor.

required
msg str

A header message printed once to stdout when monitoring begins.

required
delete_existing bool

If True, removes the file before starting the monitor so that the first snapshot is always empty. Defaults to False.

False

Returns:

Type Description
Thread

A daemon threading.Thread handle for the running watcher. Pass it to

Thread

check_json_start_wait to join the thread.

Source code in unaiverse/utils/misc.py
def check_json_start(file: str, msg: str, delete_existing: bool = False) -> threading.Thread:
    """Start a background daemon thread that monitors a JSON file for changes and pretty-prints it.

    The watcher loop polls the file every second.  Whenever the parsed JSON content
    differs from the previous snapshot, the current timestamp and the full JSON body
    are rendered to the console via ``rich``.  Parse errors and other transient
    exceptions are silently ignored so that the watcher survives temporary write
    conflicts.

    Because the thread is a daemon, it is automatically stopped when the main program
    exits.  The caller may also pass the returned handle to ``check_json_start_wait``
    to block explicitly (though in practice the daemon loop runs until the process ends
    or a ``KeyboardInterrupt`` is raised inside the thread).

    Args:
        file: Path to the JSON file to monitor.
        msg: A header message printed once to stdout when monitoring begins.
        delete_existing: If ``True``, removes the file before starting the monitor so
            that the first snapshot is always empty. Defaults to ``False``.

    Returns:
        A daemon ``threading.Thread`` handle for the running watcher.  Pass it to
        ``check_json_start_wait`` to join the thread.
    """
    from rich.json import JSON
    from rich.console import Console
    cons = Console(file=sys.__stdout__)

    if delete_existing:
        if os.path.exists(file):
            os.remove(file)

    def checking(file_path: str, console: Console):
        print(msg)
        prev_dict = {}
        while True:
            if os.path.exists(file_path):
                try:
                    with open(file_path, "r", encoding='utf-8') as f:
                        json_dict = json.load(f)
                        if json_dict != prev_dict:
                            now = datetime.now()
                            console.print("─" * 80)
                            console.print("Printing updated file "
                                          "(print time: " + now.strftime("%Y-%m-%d %H:%M:%S") + ")")
                            console.print("─" * 80)
                            console.print(JSON.from_data(json_dict))
                        prev_dict = json_dict
                except KeyboardInterrupt:
                    break
                except Exception:
                    pass
            time.sleep(1)

    handle = threading.Thread(target=checking, args=(file, cons), daemon=True)
    handle.start()
    return handle

check_json_start_wait

check_json_start_wait(handle: Thread) -> None

Block until the JSON-watcher thread created by check_json_start finishes.

Calls join() on the provided daemon thread handle. Because the watcher loop runs indefinitely, this call will block until the process exits or the thread is stopped externally.

Parameters:

Name Type Description Default
handle Thread

The daemon threading.Thread handle returned by check_json_start.

required
Source code in unaiverse/utils/misc.py
def check_json_start_wait(handle: threading.Thread) -> None:
    """Block until the JSON-watcher thread created by ``check_json_start`` finishes.

    Calls ``join()`` on the provided daemon thread handle.  Because the watcher loop
    runs indefinitely, this call will block until the process exits or the thread is
    stopped externally.

    Args:
        handle: The daemon ``threading.Thread`` handle returned by ``check_json_start``.
    """
    handle.join()

show_images_grid

show_images_grid(image_paths: list[str], max_cols: int = 3) -> None

Display a grid of images from the given file paths using matplotlib.

The figure size is derived from the average pixel dimensions of the loaded images, scaled to a figure-coordinate unit of 100 pixels. Each image is shown without axes and labelled with its zero-based index. Unused grid cells are hidden.

Matplotlib interactive mode is enabled (plt.ion()) before displaying the figure so that the call does not block the caller.

Parameters:

Name Type Description Default
image_paths list[str]

List of file-path strings pointing to the images to display. All paths must be readable by matplotlib.image.imread.

required
max_cols int

Maximum number of columns in the image grid. The actual number of columns is min(max_cols, len(image_paths)). Defaults to 3.

3

Raises:

Type Description
ValueError

If image_paths is empty (cols would be zero, causing a division-by-zero in the layout calculation).

Source code in unaiverse/utils/misc.py
def show_images_grid(image_paths: list[str], max_cols: int = 3) -> None:
    """Display a grid of images from the given file paths using matplotlib.

    The figure size is derived from the average pixel dimensions of the loaded images,
    scaled to a figure-coordinate unit of 100 pixels.  Each image is shown without
    axes and labelled with its zero-based index.  Unused grid cells are hidden.

    Matplotlib interactive mode is enabled (``plt.ion()``) before displaying the
    figure so that the call does not block the caller.

    Args:
        image_paths: List of file-path strings pointing to the images to display.
            All paths must be readable by ``matplotlib.image.imread``.
        max_cols: Maximum number of columns in the image grid. The actual number of
            columns is ``min(max_cols, len(image_paths))``. Defaults to ``3``.

    Raises:
        ValueError: If ``image_paths`` is empty (``cols`` would be zero, causing a
            division-by-zero in the layout calculation).
    """
    import matplotlib.pyplot as plt
    import matplotlib.image as mpimg

    n = len(image_paths)
    cols = min(max_cols, n)
    rows = math.ceil(n / cols)

    # Load images
    images = [mpimg.imread(p) for p in image_paths]

    # Determine figure size based on image sizes
    widths, heights = zip(*[(img.shape[1], img.shape[0]) for img in images])

    # Use average width/height for scaling
    avg_width = sum(widths) / len(widths)
    avg_height = sum(heights) / len(heights)

    fig_width = cols * avg_width / 100
    fig_height = rows * avg_height / 100

    fig, axes = plt.subplots(rows, cols, figsize=(fig_width, fig_height))
    axes = axes.flatten() if n > 1 else [axes]

    fig.canvas.manager.set_window_title("Image Grid")

    # Hide unused axes
    for ax in axes[n:]:
        ax.axis('off')

    for idx, (ax, img) in enumerate(zip(axes, images)):
        ax.imshow(img)
        ax.axis('off')
        ax.set_title(str(idx), fontsize=12, fontweight='bold')

    plt.subplots_adjust(wspace=0, hspace=0)

    # Turn on interactive mode
    plt.ion()
    plt.show()

    fig.canvas.draw()
    plt.pause(0.1)

pack_py_files

pack_py_files(py_files: dict[str, str]) -> str

Serialize, gzip-compress, and base64-encode a py-files dict.

The dict is serialized to JSON, compressed with gzip, and then encoded as a URL-safe-alphabet Base64 ASCII string. The result can be safely embedded in any text-based message or protocol field.

The inverse operation is unpack_py_files.

Parameters:

Name Type Description Default
py_files dict[str, str]

Dict mapping relative file paths (forward-slash separated) to Python source text strings, as produced by collect_py_files.

required

Returns:

Type Description
str

A plain ASCII string containing the compressed, Base64-encoded representation

str

of py_files.

Source code in unaiverse/utils/misc.py
def pack_py_files(py_files: dict[str, str]) -> str:
    """Serialize, gzip-compress, and base64-encode a py-files dict.

    The dict is serialized to JSON, compressed with ``gzip``, and then encoded as a
    URL-safe-alphabet Base64 ASCII string.  The result can be safely embedded in any
    text-based message or protocol field.

    The inverse operation is ``unpack_py_files``.

    Args:
        py_files: Dict mapping relative file paths (forward-slash separated) to Python
            source text strings, as produced by ``collect_py_files``.

    Returns:
        A plain ASCII string containing the compressed, Base64-encoded representation
        of ``py_files``.
    """
    raw = json.dumps(py_files).encode('utf-8')
    return base64.b64encode(gzip.compress(raw)).decode('ascii')

collect_py_files

collect_py_files(folder: str, exclude_dirs: set[str] | None = None) -> dict[str, str]

Collect all .py sources under folder, skipping named subdirectories.

Walks the directory tree recursively using Path.rglob. Any file whose path contains a component (other than the filename itself) that appears in exclude_dirs is skipped. Back-slashes in relative paths are normalised to forward slashes so the result is consistent across operating systems.

The output dict is the canonical input format for pack_py_files and InMemoryFinder.

Parameters:

Name Type Description Default
folder str

Root directory to walk.

required
exclude_dirs set[str] | None

Set of sub-directory names to skip entirely. Files directly under folder (no intermediate directory) are never excluded. Defaults to None (no directories excluded).

None

Returns:

Type Description
dict[str, str]

A dict mapping each file's path relative to folder (forward-slash

dict[str, str]

separated, e.g. "role1.py", "utils/helper.py") to its UTF-8

dict[str, str]

source text.

Source code in unaiverse/utils/misc.py
def collect_py_files(folder: str, exclude_dirs: set[str] | None = None) -> dict[str, str]:
    """Collect all ``.py`` sources under ``folder``, skipping named subdirectories.

    Walks the directory tree recursively using ``Path.rglob``.  Any file whose path
    contains a component (other than the filename itself) that appears in
    ``exclude_dirs`` is skipped.  Back-slashes in relative paths are normalised to
    forward slashes so the result is consistent across operating systems.

    The output dict is the canonical input format for ``pack_py_files`` and
    ``InMemoryFinder``.

    Args:
        folder: Root directory to walk.
        exclude_dirs: Set of sub-directory names to skip entirely.  Files directly
            under ``folder`` (no intermediate directory) are never excluded.
            Defaults to ``None`` (no directories excluded).

    Returns:
        A dict mapping each file's path relative to ``folder`` (forward-slash
        separated, e.g. ``"role1.py"``, ``"utils/helper.py"``) to its UTF-8
        source text.
    """
    if exclude_dirs is None:
        exclude_dirs = set()
    root = Path(folder)
    result: dict[str, str] = {}
    for py_file in root.rglob('*.py'):
        rel = py_file.relative_to(root)
        if any(part in exclude_dirs for part in rel.parts[:-1]):
            continue
        file_name_with_relative_path = str(rel).replace('\\', '/')
        result[file_name_with_relative_path] = py_file.read_text(encoding='utf-8')
    return result

unpack_py_files

unpack_py_files(packed: str) -> dict[str, str]

Base64-decode, decompress, and deserialize a packed py-files string.

Reverses the three-step transformation applied by pack_py_files: Base64 decoding, gzip decompression, and JSON deserialization.

Parameters:

Name Type Description Default
packed str

A Base64-encoded, gzip-compressed string produced by pack_py_files.

required

Returns:

Type Description
dict[str, str]

The original dict mapping relative file paths (forward-slash separated) to

dict[str, str]

Python source text strings.

Raises:

Type Description
Exception

Any exception raised by base64.b64decode, gzip.decompress, or json.loads if packed is malformed or truncated.

Source code in unaiverse/utils/misc.py
def unpack_py_files(packed: str) -> dict[str, str]:
    """Base64-decode, decompress, and deserialize a packed py-files string.

    Reverses the three-step transformation applied by ``pack_py_files``: Base64
    decoding, gzip decompression, and JSON deserialization.

    Args:
        packed: A Base64-encoded, gzip-compressed string produced by ``pack_py_files``.

    Returns:
        The original dict mapping relative file paths (forward-slash separated) to
        Python source text strings.

    Raises:
        Exception: Any exception raised by ``base64.b64decode``, ``gzip.decompress``,
            or ``json.loads`` if ``packed`` is malformed or truncated.
    """
    return json.loads(gzip.decompress(base64.b64decode(packed)).decode('utf-8'))

load_agent_in_memory

load_agent_in_memory(py_files: dict[str, str], role: str, **init_kwargs: Any) -> tuple[Any, InMemoryFinder]

Instantiate a WAgent from an in-memory Python source tree.

The file {role}.py (at the root of py_files) is imported as the agent's main module. Any import statements inside it are resolved against the other entries in py_files through a temporary InMemoryFinder registered on sys.meta_path.

Parameters:

Name Type Description Default
py_files dict[str, str]

Dict mapping relative paths to source text (as produced by collect_py_files).

required
role str

Role name; {role}.py is the entry-point module.

required
**init_kwargs Any

Keyword arguments forwarded to WAgent.__init__ (e.g. proc=None).

{}

Returns:

Type Description
Any

A (agent_instance, finder) tuple. The caller must keep

InMemoryFinder

finder alive as long as the agent is in use, and call

tuple[Any, InMemoryFinder]

finder.cleanup() when the agent is destroyed.

Raises:

Type Description
KeyError

If {role}.py is not present in py_files.

Exception

Any exception raised during module execution or WAgent.__init__.

Source code in unaiverse/utils/misc.py
def load_agent_in_memory(py_files: dict[str, str], role: str, **init_kwargs: Any) -> tuple[Any, InMemoryFinder]:
    """Instantiate a ``WAgent`` from an in-memory Python source tree.

    The file ``{role}.py`` (at the root of *py_files*) is imported as the
    agent's main module.  Any ``import`` statements inside it are resolved
    against the other entries in *py_files* through a temporary
    ``InMemoryFinder`` registered on ``sys.meta_path``.

    Args:
        py_files: Dict mapping relative paths to source text (as produced by
            ``collect_py_files``).
        role: Role name; ``{role}.py`` is the entry-point module.
        **init_kwargs: Keyword arguments forwarded to ``WAgent.__init__``
            (e.g. ``proc=None``).

    Returns:
        A ``(agent_instance, finder)`` tuple.  The caller **must** keep
        *finder* alive as long as the agent is in use, and call
        ``finder.cleanup()`` when the agent is destroyed.

    Raises:
        KeyError: If ``{role}.py`` is not present in *py_files*.
        Exception: Any exception raised during module execution or
            ``WAgent.__init__``.
    """
    if f'{role}.py' not in py_files:
        raise GenException(f"No source file '{role}.py' found in the received module set "
                           f"(available: {list(py_files)})")

    namespace = f'_dyn_{_uuid.uuid4().hex[:8]}'
    finder = InMemoryFinder(py_files, namespace)
    sys.meta_path.insert(0, finder)  # insert first so it wins over real filesystem

    try:
        mod = importlib.import_module(f'{namespace}.{role}')
        agent = mod.WAgent(**init_kwargs)
    except Exception:
        finder.cleanup()
        raise

    return agent, finder

prepare_app_dir

prepare_app_dir(app_name: str = 'unaiverse') -> str

Resolves and creates the platform-appropriate application data directory.

Parameters:

Name Type Description Default
app_name str

Name of the application subdirectory (default: "unaiverse").

'unaiverse'

Returns:

Type Description
str

The absolute path to the created directory.

Source code in unaiverse/utils/misc.py
def prepare_app_dir(app_name: str = "unaiverse") -> str:
    """Resolves and creates the platform-appropriate application data directory.

    Args:
        app_name: Name of the application subdirectory (default: ``"unaiverse"``).

    Returns:
        The absolute path to the created directory.
    """
    app_name = app_name.lower()
    if os.name == "nt":  # Windows
        if os.getenv("APPDATA") is not None:
            key_dir = os.path.join(os.getenv("APPDATA"), "Local", app_name)  # Expected
        else:
            key_dir = os.path.join(str(Path.home()), f".{app_name}")  # Fallback
    else:  # Linux/macOS
        key_dir = os.path.join(str(Path.home()), f".{app_name}")
    os.makedirs(key_dir, exist_ok=True)
    return key_dir

get_key_considering_multiple_sources

get_key_considering_multiple_sources(key_variable: str | None) -> str

Retrieves the UNaIVERSE authentication key from one of several sources.

Checks, in priority order: (1) the key_variable argument, (2) the NODE_KEY environment variable, and (3) a cached key file. If multiple sources are found, a warning is printed. If no source is found, the user is prompted interactively and the entered key is saved to the cache file for future use.

Parameters:

Name Type Description Default
key_variable str | None

A key string passed directly from code, or None (and placeholder strings enclosed in <...> are also treated as None).

required

Returns:

Type Description
str

The resolved authentication key string.

Source code in unaiverse/utils/misc.py
def get_key_considering_multiple_sources(key_variable: str | None) -> str:
    """Retrieves the UNaIVERSE authentication key from one of several sources.

    Checks, in priority order: (1) the ``key_variable`` argument, (2) the ``NODE_KEY``
    environment variable, and (3) a cached key file.  If multiple sources are found,
    a warning is printed.  If no source is found, the user is prompted interactively and
    the entered key is saved to the cache file for future use.

    Args:
        key_variable: A key string passed directly from code, or None (and placeholder
            strings enclosed in ``<...>`` are also treated as None).

    Returns:
        The resolved authentication key string.
    """

    # Creating folder (if needed) to store the key
    try:
        key_dir = prepare_app_dir(app_name="UNaIVERSE")
    except Exception:
        raise GenException("Cannot create folder to store the key file")
    key_file = os.path.join(key_dir, "key")

    # Getting from an existing file
    key_from_file = None
    if os.path.exists(key_file):
        with open(key_file, "r") as f:
            key_from_file = f.read().strip()

    # Getting from env variable
    key_from_env = os.getenv("NODE_KEY", None)

    # Getting from code-specified option
    if key_variable is not None and len(key_variable.strip()) > 0:
        key_from_var = key_variable.strip()
        if key_from_var.startswith("<") and key_from_var.endswith(">"):  # Something like <UNAIVERSE_KEY_GOES_HERE>
            key_from_var = None
    else:
        key_from_var = None

    # Finding valid sources and checking if multiple keys were provided
    _keys = [key_from_var, key_from_env, key_from_file]
    _source_names = ["your code", "env variable 'NODE_KEY'", f"cache file {key_file}"]
    source_names = []
    mismatching = False
    multiple_source = False
    first_key = None
    first_source = None
    _prev_key = None
    for i, (_key, _source_name) in enumerate(zip(_keys, _source_names)):
        if _key is not None:
            source_names.append(_source_name)
            if _prev_key is not None:
                if _key != _prev_key:
                    mismatching = True
                multiple_source = True
                _prev_key = _key
            else:
                _prev_key = _key
                first_key = _key
                first_source = _source_name

    if len(source_names) > 0:
        if multiple_source and not mismatching:
            msg = "UNaIVERSE key (the exact same key) present in multiple locations: " + ", ".join(source_names)
            print(msg)
        if multiple_source and mismatching:
            msg = "UNaIVERSE keys (different keys) present in multiple locations: " + ", ".join(source_names)
            msg += "\nLoaded the one stored in " + first_source
            print(msg)
        # if not multiple_source:
        #    msg = f"UNaIVERSE key loaded from {first_source}"
        #    print(msg)
        return first_key
    else:

        # If no key present, ask user and save to file
        print("UNaIVERSE key not present in " + ", ".join(_source_names))
        print("If you did not already do it, go to https://unaiverse.io, login, and generate a key")
        key = input("Enter your UNaIVERSE key, that will be saved to the cache file: ").strip()
        with open(key_file, "w") as f:
            f.write(key)
        return key

analyze_code

analyze_code(files_in_memory: dict[str, str]) -> bool

Analyzes a string of Python code for dangerous or unsafe functions and modules.

Parameters:

Name Type Description Default
files_in_memory dict[str, str]

The dict "file name to code string with contents".

required

Returns:

Type Description
bool

True if the code is considered safe, otherwise False.

Source code in unaiverse/utils/misc.py
def analyze_code(files_in_memory: dict[str, str]) -> bool:
    """Analyzes a string of Python code for dangerous or unsafe functions and modules.

        Args:
            files_in_memory: The dict "file name to code string with contents".

        Returns:
            True if the code is considered safe, otherwise False.
        """
    dangerous_functions = {"eval", "exec", "compile", "system", "__import__", "input"}
    dangerous_modules = {"subprocess"}

    def is_suspicious(ast_node):

        # Detect bare function calls like eval(...)
        if isinstance(ast_node, ast.Call):
            # case: eval(...)  (ast.Name)
            if isinstance(ast_node.func, ast.Name):
                return ast_node.func.id in dangerous_functions
            # case: something.eval(...)  (ast.Attribute)
            elif isinstance(ast_node.func, ast.Attribute):
                attr_name = ast_node.func.attr
                # 1) If attribute name is one of the dangerous_functions, only flag it
                #    if the object is a suspicious module (os, subprocess, etc.)
                if attr_name in dangerous_functions:
                    value = ast_node.func.value
                    # example: os.system(...)  => ast.Name(id='os')
                    if isinstance(value, ast.Name):
                        if value.id in dangerous_modules:
                            return True
                    # example: package.subpackage.func(...) => ast.Attribute
                    # check top-level name if possible: walk down to the leftmost Name
                    left = value
                    while isinstance(left, ast.Attribute):
                        left = left.value
                    if isinstance(left, ast.Name) and left.id in dangerous_modules:
                        return True
                # 2) Also catch explicit module imports used directly:
                #    subprocess.run(...), os.system(...), etc.
                if isinstance(ast_node.func.value, ast.Name):
                    if ast_node.func.value.id in dangerous_modules:
                        # if the module is suspicious, any attribute call is risky
                        return True

        # Detect imports
        if isinstance(ast_node, (ast.Import, ast.ImportFrom)):
            for alias in ast_node.names:
                if alias.name.split('.')[0] in dangerous_modules:
                    return True

        return False

    for file_in_memory in files_in_memory.values():
        try:
            tree = ast.parse(file_in_memory)
        except SyntaxError:
            return False

        for _ast_node in ast.walk(tree):
            if is_suspicious(_ast_node):
                return False

    return True