Skip to content

11 · Build your first world, step by step

Build worlds · Chapter 11 of 12 · Path home

You have seen every piece and read real worlds. Now build one from nothing. We will do it with no AI at all, to drive home the point that a world is a society of agents of any kind. Our world is a smart greenhouse: a moisture sensor, a rule-based controller, and a water valve, coordinating so the plants get watered when the soil is dry.

What you'll build

Three kinds of agent and a world that hosts them:

  • a sensor that reads soil moisture and publishes it;
  • a controller that reads the moisture and decides;
  • a valve that opens or closes on command.

Not a single neural network in sight, and yet a complete UNaIVERSE world.

Step 0 · Design it first

Before code, answer five questions. This is how you design any world.

  1. Who takes part? A sensor, a controller, a valve.
  2. What does each do? Sensor: read and publish. Controller: read, decide, command. Valve: open or close.
  3. What data flows? A moisture reading (sensor to controller); an open/close command (controller to valve).
  4. Who leads? The controller. It is the master: it finds the devices and drives the cycle.
  5. What is each one's behavior? A short state machine per role (below).

That is the whole design. Notice it is the same five questions whether you are building a greenhouse, a classroom, or a marketplace.

Step 1 · The folder

greenhouse/
├── src/
│   ├── world.py          # the World subclass
│   ├── sensor.py         # the "sensor" role
│   ├── controller.py     # the "controller" role (the master)
│   └── valve.py          # the "valve" role
├── run_w.py              # hosts the world
├── run_sensor.py         # a sensor joins
├── run_controller.py     # a controller joins (the master)
└── run_valve.py          # a valve joins

Step 2 · The role agent classes

Each role is a small Agent subclass carrying its custom actions. The hardware calls (read_soil_moisture, set_irrigation) are yours: here they are tiny simulations so the world runs end to end on one machine, and you swap them for a real sensor and valve driver later. UNaIVERSE only cares about the actions around them.

src/sensor.py
import random
from unaiverse.agent import Agent, action


def read_soil_moisture() -> float:
    # Your hardware. Swap this for a real sensor (a GPIO/ADC read); here we simulate
    # soil moisture on a 0.0 (bone dry) to 1.0 (soaked) scale.
    return round(random.uniform(0.1, 0.6), 2)


class WAgent(Agent):
    @action
    async def publish_moisture(self) -> bool:
        self.stdout.set(read_soil_moisture())   # one reading -> our output stream
        return True

Focus · producing data with self.stdout.set(...)

Every agent owns a few named streams. self.stdout is the proxy to your output stream, named proc_output_0 (it exists because the sensor declared proc_outputs when it joins, Step 4). stdout.set(value) publishes one sample that anyone connected to you can read. New to these slots? An agent's own streams explains the whole convention in one place.

This sensor has no model (proc=None), so it fills stdout by hand. An agent with a proc would not: its process action reads stdin, runs forward(), and writes the result to stdout for you. Same output stream, either way, that is what makes a sensor and a neural network interchangeable to the rest of the world.

src/valve.py
from unaiverse.agent import Agent, action
from unaiverse.interaction import Interaction


def set_irrigation(open_valve: bool) -> None:
    # Your hardware. Swap this for a real valve driver; here we just report it.
    print(f"💧 valve {'OPEN' if open_valve else 'closed'}")


class WAgent(Agent):
    @action
    async def set_valve(self, interaction: Interaction | None = None) -> bool:
        cmd = self.stdin.get(uuid=interaction.uuid, requested_by="set_valve")
        set_irrigation(bool(cmd[0] if isinstance(cmd, (list, tuple)) else cmd))  # actuate
        return True

Focus · reading what an interaction carried with self.stdin.get(...)

When someone calls a named action on you and attaches data, that data lands on your stdin (the input slot proc_input_0), tagged with the interaction's uuid. The interaction argument is handed to your action by the framework; pass uuid=interaction.uuid so you read the sample for this request and not a stale one.

requested_by is not cosmetic: it gives single-delivery semantics. Each fresh sample is handed to a given reader once; call get again with the same requested_by and you get None until a new value is written (pass requested_by=None to always re-read the latest). That is exactly why the controller's self._sensor_stream.get(requested_by="decide") returns None on ticks where the sensor has not published anything new, and why both actions guard with if ... is None: return False to retry on the next tick.

Why the cmd[0] if ... else cmd dance? A stream read can hand you back a single value or a short list of samples; this unwraps the first one. Read your input, act on it, return True to tell the caller the request is done (return False would ask the framework to retry on the next tick).

src/controller.py
from unaiverse.agent import Agent, action


class WAgent(Agent):
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self._sensor_peer_id = None       # who is the sensor
        self._valve_peer_id = None        # who is the valve
        self._sensor_stream = None        # the sensor's published reading stream

    @action
    async def connect_to_devices(self) -> bool:
        # connect_by_role refills self._found_agents on every call, so resolve one
        # role at a time and remember each peer (Chapter 7).
        if not await self.connect_by_role("sensor"):
            return False
        self._sensor_peer_id = next(iter(self._found_agents))   # the first sensor

        if not await self.connect_by_role("valve"):
            return False
        self._valve_peer_id = next(iter(self._found_agents))    # the first valve
        return True

    @action
    async def decide(self, dry_below: float = 0.3) -> bool:
        # Resolve the sensor's published stream once (same get_stream(...).get(...)
        # pattern the chat user uses to read the broadcaster), then read its latest
        # value every tick.
        if self._sensor_stream is None:
            self._sensor_stream = self.get_stream("processor",
                                                  peer_id=self._sensor_peer_id,
                                                  data_type="all")
        if self._sensor_stream is None:
            return False                                   # not connected yet, retry

        reading = self._sensor_stream.get(requested_by="decide")
        if reading is None:
            return False                                   # no reading yet, retry

        # Dry soil -> open the valve; otherwise close it.
        open_valve = float(reading) < dry_below
        return await self.send(action_name="set_valve",
                               target=[self._valve_peer_id],
                               data_samples={"proc_output_0": open_valve})

Focus · two ways to move data: pull and push

The controller never stores the sensor's reading in a shared place, there is no shared place. It moves data in two complementary ways:

Pull, with get_stream(...).get(...). get_stream("processor", peer_id=sensor) reaches into the sensor's output stream (the one it publishes on) and .get(...) reads its latest sample. No request, no permission: you read a peer's stream whenever you like. Resolve the stream once and cache it, then read every tick.

Push, with send(..., data_samples=...). data_samples={"proc_output_0": open_valve} loads the value onto your own output stream and ships it with the request; on the valve it arrives as stdin, exactly what set_valve reads above. The key is always a stream you own (here the canonical proc_output_0, the same name the chat broadcaster uses); the framework routes it to the target's input. Drop the action_name and the very same call becomes a pure broadcast that just deposits the value, triggering nothing.

Pull when you want the freshest value on your own schedule; push when you want someone to act on data you hand them. The controller does both: pulls the sensor, pushes the valve.

Three tiny classes, every action fully written out. The sensor publishes its reading on its own output stream (Chapter 5); the valve receives a command and acts; the controller finds the two devices, reads the sensor, decides, and sends (Chapter 6). Two details worth naming: connect_by_role clears self._found_agents each time, so the controller connects to sensor and valve in separate calls and keeps each peer id; and reading another agent's stream is exactly get_stream("processor", peer_id=...) then .get(...), the same two lines the chat user uses on the broadcaster. The set_valve send is a named-action request carrying one boolean; you could also push raw data. That is the whole logic.

Step 3 · The World subclass

The world assigns roles and builds each role's behavior. We assign by a simple rule: the controller node is the master; the others are devices that declare which kind they are through their profile (role_preference).

src/world.py
import os
from unaiverse.world import World
from unaiverse.hsm import HybridStateMachine
from unaiverse.networking.node.profile import NodeProfile


class GreenhouseWorld(World):
    def __init__(self, **kwargs):
        world_folder = os.path.dirname(os.path.abspath(__file__))
        super().__init__(world_folder=world_folder, **kwargs)

    def assign_role(self, profile: NodeProfile, is_world_master: bool):
        if is_world_master:
            return "controller"                 # the leader
        # each device states what it is when it joins (role_preference, read here
        # from the profile as tmp_role_preference); see Chapter 3 on profiles. You
        # could instead infer it from the streams it declares, like info_extraction.
        pref = profile.get_dynamic_profile().get("tmp_role_preference")
        return pref if pref in ("sensor", "valve") else "valve"

    def create_behav_files(self):
        # --- sensor: just keep publishing ---
        from .sensor import WAgent as Sensor
        b = HybridStateMachine(Sensor(proc=None))
        b.set_role("sensor")
        b.add_state("ready", action="publish_moisture", blocking=True)
        b.save(os.path.join(self.world_folder, "sensor.json"))

        # --- valve: open/close on request ---
        from .valve import WAgent as Valve
        b = HybridStateMachine(Valve(proc=None))
        b.set_role("valve")
        b.add_state("ready", blocking=True)
        b.add_transit("ready", "ready", action="set_valve", args={}, ready=False)
        b.save(os.path.join(self.world_folder, "valve.json"))

        # --- controller: connect, then decide forever ---
        from .controller import WAgent as Controller
        b = HybridStateMachine(Controller(proc=None))
        b.set_role("controller")
        b.add_state("init", blocking=True)
        b.add_state("ready", action="decide", args={"dry_below": 0.3}, blocking=True)
        b.add_transit("init", "ready",
                      action="connect_to_devices", args={},
                      msg="🌱 Connecting to the greenhouse devices...")
        b.save(os.path.join(self.world_folder, "controller.json"))

Read the three behaviors: the sensor loops publish_moisture; the valve sits in ready and runs set_valve whenever a command arrives (a ready=False self-loop, exactly like the chat broadcaster, Chapter 4); the controller connects once, then loops decide. That is the entire greenhouse.

Step 4 · Run it

Host the world, then start the three devices. The controller is the master.

run_w.py
from src.world import GreenhouseWorld
from unaiverse.networking.node.node import Node

world = GreenhouseWorld()
Node(world, node_name="Greenhouse", hidden=True, clock_delta=1. / 10.,
     world_masters_node_names=["Controller"]).run()
run_sensor.py / run_valve.py / run_controller.py (idea)
from unaiverse.agent import Agent
from unaiverse.networking.node.node import Node

# each device declares what it is with role_preference; the world's assign_role
# reads it back (Chapter 3). The controller node is named to match the world's
# world_masters_node_names, so it joins as the master and skips the preference.
agent = Agent(proc=None, proc_inputs=["all"], proc_outputs=["all"])
Node(agent, node_name="Sensor", hidden=True).run(join_world="Greenhouse",
                                                 role_preference="sensor")

This greenhouse is not a shipped example, it is the folder you just built. Drop your greenhouse/ next to the example worlds and run the whole thing on one machine with the synchronizer (Chapter 2):

python worlds/run_synch.py greenhouse

See it run

graph LR
    S[Sensor<br/>publish_moisture] -->|moisture reading| C[Controller · master<br/>decide]
    C -->|set_valve: open/close| V[Valve<br/>set_irrigation]

The controller connects to both devices, reads the sensor every tick, and tells the valve to open when the soil is dry. No model, no training, no human, and yet every concept from this path is in it: a World subclass assigning roles, behaviors per role, custom actions, streams of data, and interactions carrying commands.

Beyond one-shot samples

Everything above moves one sample at a time: the controller pulls the latest reading and pushes a single command. That is the right default. A few variations cover almost everything else you will need.

Reading a window, not just the latest

A single noisy reading is a bad reason to open a valve. Pass all_uuids=True to a stream read and instead of the latest value you get every buffered sample, as a list of (value, tag, timestamp) tuples, so the controller can smooth a window instead of reacting to one spike:

samples = self._sensor_stream.get(requested_by="decide", all_uuids=True)
readings = [float(v) for v, _tag, _t in samples]      # the recent window
if readings:
    avg = sum(readings) / len(readings)
    open_valve = avg < dry_below

The tag is a monotonic counter the producer stamps on each sample (so you can tell fresh from stale, or spot a gap), and timestamp is when it was written. The chat user uses exactly this call to keep a rolling history of the last few messages (get(..., all_uuids=True) looped into a short list).

Agreeing on a continuous stream

Polling one reading per tick is fine for a slow sensor. For a steady flow, a camera at many frames a second, a microphone, a sensor that never stops, you do not want to poll: you agree on the stream once and let samples arrive continuously over a dedicated channel. That channel is a Pub/Sub stream.

It takes three pieces:

  1. The producer marks its output stream as a channel, with pubsub=True:

    # run_sensor.py: a continuous reading channel instead of one-shot samples
    from unaiverse.streams.dataprops import StreamType
    
    agent = Agent(proc=None,
                  proc_outputs=[StreamType(data_type="all", pubsub=True)])
    
  2. The world master wires the subscription once (the consumer follows the producer), with the send_subscribe built-in. Say you added a logger device that records every reading. The controller (the master) connects it to the sensor's channel like this:

    # self._logger_peer_id was resolved when the controller joined the logger,
    # exactly like the sensor and valve: connect_by_role("logger") then
    # next(iter(self._found_agents)). (Chapter 7.)
    
    # 1) Get the sensor's published stream by its group name. find_streams returns
    #    a {network-hash: stream} dict; that network-hash IS the stream's
    #    unambiguous, global id, the thing send_subscribe wants.
    sensor_streams = self.find_streams(self._sensor_peer_id, "processor")
    sensor_stream_hash = next(iter(sensor_streams), None)
    
    # 2) Ask the logger to subscribe to it. The two arguments read plainly:
    #    agent=        -> WHO subscribes (the consumer, here the logger)
    #    stream_hashes -> WHICH stream(s) it follows (the sensor's hash from above)
    await self.send_subscribe(agent=self._logger_peer_id,
                              stream_hashes=[sensor_stream_hash])
    

    A stream hash is just a stream's address on the network: every stream an agent publishes has one, and find_streams(owner, group) is how you look it up once you are connected to the owner. You never type it by hand.

  3. The consumer accepts it by running the subscribe built-in in its behavior (one transit, action="subscribe"). After that the samples are simply there to read, no request per sample.

That handshake (master wires it, consumer runs subscribe) is why the logger, not the controller, is the subscriber here: an agent cannot subscribe itself, the subscribe action only runs in response to a send_subscribe from someone else. social_learning does the identical dance to stream the best student's predictions to its classmates. One-shot send is a knock on the door; a Pub/Sub stream is leaving the line open.

Commanding many devices at once

One valve was easy. With several, you do not loop and send one by one: target takes a list, so one send reaches them all. Keep every valve when you connect (connect_by_role leaves the whole set in self._found_agents), then command them together:

# in connect_to_devices: keep every valve, not just the first
await self.connect_by_role("valve")
self._valve_peer_ids = list(self._found_agents)

# ...later, in decide: one send to all of them
await self.send(action_name="set_valve",
                target=self._valve_peer_ids,
                data_samples={"proc_output_0": open_valve})

To wait until every valve has acted before moving on, gate the next state with the all_sent_completed built-in:

b.add_transit("commanding", "ready",
              action="all_sent_completed", args={"action_name": "set_valve"})

This is the "tell every worker, then wait" idiom: class_incremental_learning teaches a whole class with a single send(target=[all students]) and waits on all_sent_completed before the exam.

Hearing back from many

The mirror image: ask every sensor to report and collect answers as they trickle in. A state that loops on the received_some_asked_data built-in calls a method of yours once per sample received:

async def collect_reading(self, agent, props, data, data_tag):
    self._readings[agent] = float(data)         # one reply -> stash it by sender

# behavior: stay here draining replies until you have what you need
b.add_state("collecting", action="received_some_asked_data",
            args={"processing_fcn": "collect_reading"}, blocking=True)

processing_fcn is the name of a method, invoked as fcn(agent, props, data, data_tag) for each incoming sample. info_extraction drains its extractors' results this way; class_incremental_learning collects every student's predictions before grading.

Forwarding what you produced

Our controller is a plain rule, so it writes the command by hand. Swap it for a learned controller (the next idea below) and it produces the command by running its model: process() reads stdin, runs forward(), and writes the result to stdout. To forward that freshly produced output to the valve you do not read it back and re-stage it, you pass copy_sys=True and the framework carries it into the send:

await self.process()                                  # the model writes stdout
await self.send(action_name="set_valve",
                target=[self._valve_peer_id],
                copy_sys=True)                         # forward what I just produced

That is the chat user's exact move: generate a reply with process(), then send(..., copy_sys=True) to hand it to the broadcaster, no manual copy in between.

Sending raw JSON, the blunt approach

Our command is a single boolean. The moment it grows into something structured, an open/close plus a duration, a whole config, the crudest thing that works is to serialize it to JSON and ship the text. A stream of data_type="all" accepts any value, so a JSON string travels with no stream types to match and no schema to declare:

import json

# controller: send a structured order as raw JSON text
await self.send(action_name="set_valve", target=[self._valve_peer_id],
                data_samples={"proc_output_0":
                              json.dumps({"open": open_valve, "seconds": 30})})
import json

# valve: parse it back and act
async def set_valve(self, interaction: Interaction | None = None) -> bool:
    cmd = self.stdin.get(uuid=interaction.uuid, requested_by="set_valve")
    order = json.loads(cmd[0] if isinstance(cmd, (list, tuple)) else cmd)
    set_irrigation(order["open"])      # ...and honour order["seconds"]
    return True

No types, no negotiation, just text you both agree to read the same way. It is how turing ships its votes and chat between agents (json.dumps on one side, json.loads on the other). Blunt, but perfectly fine to start: tighten it into a typed stream later if you ever need to.

Make it yours

Small changes turn this into very different worlds, the same skeleton each time:

  • Add a weather agent; have the controller skip watering when rain is coming.
  • Make the valve report back how much water it released (a stream the controller logs).
  • Swap the rule for a learned controller, now it is a teaching world (Chapter 8), and the structure barely changes.
  • Add badges (Chapter 10) for a device that ran a full season without fault.

What just happened

You built a complete world from an empty folder, with zero AI, and in doing so used every piece of this path. If you can build a greenhouse, you can build a classroom, a market, or a game: change the participants and what they do; the shape stays the same.

Where next