Build an Agent Swarm That Self-Organizes
Most multi-agent systems have an orchestrator. One central process decides who does what. That works until the orchestrator goes down, gets overloaded, or becomes the trust bottleneck. In this tutorial, we build something different: 10 agents that discover each other, establish trust, exchange messages, and self-organize based on capability matching. No orchestrator. No central scheduler. The swarm figures it out.
By the end, you will have a working Python codebase that spawns agents, lets them find peers via Pilot Protocol's registry, exchange data through encrypted tunnels, and route work to the right agents by role.
What We Are Building
The system has these properties:
- 10 agents, each running as a separate process with its own Pilot daemon
- Peer discovery via the registry, using hostname and tag-based lookups
- Mutual trust established through Ed25519 handshakes (no pre-shared keys)
- Data exchange over encrypted tunnels using Pilot's messaging ports
- LLM integration for task execution (each agent calls an LLM to process work)
- Role-based routing that directs work to agents with matching capabilities
- No central orchestrator -- every agent runs the same code and makes its own decisions
The architecture relies on two Pilot Protocol features: the registry for discovery and the data exchange port (port 1001) for passing work between agents.
Prerequisites
- Python 3.10+
- Pilot Protocol installed (go install github.com/TeoSlayer/pilotprotocol/cmd/...)
- A Pilot network (the default at pilotprotocol.network is available for all users)
- An OpenAI API key (or any LLM API) for task execution
Agent Architecture
Each agent is a Python script that wraps pilotctl via subprocess calls. This is intentional. Pilot Protocol's CLI is the stable interface. You do not need Go bindings or a Python SDK. If you can call a subprocess, you can participate in the network.
The Agent Class
import subprocess
import json
import time
import random
import os

class SwarmAgent:
    def __init__(self, agent_id, role, registry_addr):
        self.agent_id = agent_id
        self.role = role
        self.registry_addr = registry_addr
        self.hostname = f"swarm-{role}-{agent_id}"
        self.peers = {}
        self.tasks_completed = 0

    def pilotctl(self, *args):
        """Run a pilotctl command and return its stdout as a stripped string."""
        cmd = ["pilotctl"] + list(args)
        result = subprocess.run(
            cmd, capture_output=True, text=True, timeout=30
        )
        if result.returncode != 0:
            raise RuntimeError(f"pilotctl failed: {result.stderr}")
        return result.stdout.strip()

    def start_daemon(self):
        """Start the Pilot daemon for this agent."""
        subprocess.Popen([
            "pilot-daemon",
        ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        time.sleep(2)  # Wait for daemon startup + STUN

    def register(self):
        """Register hostname and set agent tags."""
        self.pilotctl("set-hostname", self.hostname)
        self.pilotctl(
            "extras", "set-tags",
            f"role={self.role},swarm=demo,capacity=medium"
        )
        print(f"[{self.hostname}] Registered with tags")
Each agent starts its own daemon, registers a hostname, and sets descriptive tags. The tags make the agent discoverable to peers searching for specific capabilities.
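The tag string passed to set-tags above is a comma-separated list of key=value pairs. The following is a minimal sketch of building and parsing that format in Python. The helper names format_tags and parse_tags are hypothetical and not part of pilotctl, and the sketch assumes that keys and values contain no commas or equals signs.

```python
def format_tags(tags: dict[str, str]) -> str:
    """Join a tag dict into the comma-separated key=value form."""
    return ",".join(f"{key}={value}" for key, value in tags.items())


def parse_tags(tag_string: str) -> dict[str, str]:
    """Split a key=value,key=value string back into a dict."""
    tags = {}
    for pair in tag_string.split(","):
        if not pair:
            continue  # Tolerate empty segments such as a trailing comma
        key, _, value = pair.partition("=")
        tags[key.strip()] = value.strip()
    return tags


tags = {"role": "summarizer", "swarm": "demo", "capacity": "medium"}
encoded = format_tags(tags)
print(encoded)
print(parse_tags(encoded) == tags)
```

Keeping the tags in a dict until the last moment makes it easier to add or change a capability without editing string literals in several places.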
Discovering Peers
Agents find each other through the registry. No hardcoded addresses, no configuration files, no service mesh. You query by tag and get back a list of matching agents.
def discover_peers(self):
    """Find other agents in the swarm by tag."""
    output = self.pilotctl("peers", "--search", "swarm=demo")
    peers = json.loads(output)
    my_addr = self.my_address()  # Look up once, not once per peer
    for peer in peers:
        addr = peer["address"]
        if addr != my_addr:
            self.peers[addr] = {
                "hostname": peer.get("hostname", "unknown"),
                "role": peer.get("tags", {}).get("role", "unknown"),
                # Keep trust established on earlier refreshes
                "trusted": self.peers.get(addr, {}).get("trusted", False),
            }
    print(f"[{self.hostname}] Discovered {len(self.peers)} peers")
    return self.peers

def my_address(self):
    """Get this agent's Pilot address."""
    output = self.pilotctl("daemon", "status")
    status = json.loads(output)
    return status["address"]
The peers --search command queries the registry for all agents with matching tags. Each result includes the agent's virtual address, hostname, and tags. This is how the swarm maintains awareness without an orchestrator -- every agent can ask the registry "who else is in my swarm?" at any time.
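Once the registry response is parsed, it is often convenient to index peers by role rather than by address. The sketch below groups a response by role tag. The sample JSON is hypothetical: it only mirrors the fields the tutorial code reads (address, hostname, tags), and the placeholder addresses are not real Pilot addresses.

```python
import json
from collections import defaultdict

# Hypothetical registry output, shaped like the fields the tutorial reads.
SAMPLE_OUTPUT = json.dumps([
    {"address": "addr-a", "hostname": "swarm-summarizer-0", "tags": {"role": "summarizer"}},
    {"address": "addr-b", "hostname": "swarm-reviewer-4", "tags": {"role": "reviewer"}},
    {"address": "addr-c", "hostname": "swarm-summarizer-1", "tags": {"role": "summarizer"}},
])


def peers_by_role(registry_json: str, own_address: str) -> dict[str, list[str]]:
    """Group peer addresses by role tag, skipping our own entry."""
    grouped = defaultdict(list)
    for peer in json.loads(registry_json):
        if peer["address"] == own_address:
            continue
        role = peer.get("tags", {}).get("role", "unknown")
        grouped[role].append(peer["address"])
    return dict(grouped)


print(peers_by_role(SAMPLE_OUTPUT, own_address="addr-a"))
```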
Establishing Trust
Discovery is not the same as trust. By default, agents are invisible. Before an agent can send tasks to a peer, they must complete a mutual Ed25519 handshake.
def establish_trust(self):
    """Initiate handshake with all discovered peers."""
    for addr, info in self.peers.items():
        if info["trusted"]:
            continue
        try:
            self.pilotctl("handshake", addr)
            self.peers[addr]["trusted"] = True
            print(f"[{self.hostname}] Trusted: {info['hostname']}")
        except RuntimeError as e:
            print(f"[{self.hostname}] Handshake failed with {addr}: {e}")
The handshake is mutual. Both sides sign a challenge with their Ed25519 identity key. If either side rejects (or if the agent has not opted into the handshake), the connection fails. This means an agent can refuse trust from unknown peers, and a compromised agent can be revoked from the network by removing its trust entries.
The Work Handoff
Agents exchange work over port 1001 (data exchange). The flow is: send work payload, receive and process it, return results. Here is how each step works.
Sending Work to a Peer
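def send_work(self, target_addr, task_data):
    """Send a work payload to a peer agent via data exchange."""
    task_json = json.dumps({
        "type": task_data["type"],
        "payload": task_data["payload"],
        "sender": self.my_address(),
    })
    # Pipe the JSON payload to pilotctl on stdin, which --stdin reads from
    result = subprocess.run(
        ["pilotctl", "data", "send", target_addr, "--stdin"],
        input=task_json, capture_output=True, text=True, timeout=30,
    )
    if result.returncode != 0:
        raise RuntimeError(f"pilotctl failed: {result.stderr}")
    print(f"[{self.hostname}] Sent work to {target_addr}")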
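Because the receiving agent acts on whatever arrives, it helps to validate the envelope before processing it. The sketch below encodes and decodes the three-field message used above (type, payload, sender) and rejects incomplete messages. The function names and the validation rules are assumptions for illustration, not part of Pilot Protocol.

```python
import json

REQUIRED_FIELDS = ("type", "payload", "sender")


def encode_work(work_type: str, payload: str, sender: str) -> str:
    """Serialize a work envelope to JSON."""
    return json.dumps({"type": work_type, "payload": payload, "sender": sender})


def decode_work(raw: str) -> dict:
    """Parse an envelope and reject it if a required field is missing."""
    message = json.loads(raw)
    if not isinstance(message, dict):
        raise ValueError("envelope must be a JSON object")
    missing = [field for field in REQUIRED_FIELDS if field not in message]
    if missing:
        raise ValueError(f"envelope missing fields: {', '.join(missing)}")
    return message


raw = encode_work("summarize", "some text", "addr-a")
print(decode_work(raw)["type"])
```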
Choosing the Right Peer
This is where the self-organization begins. Instead of randomly routing work, each agent chooses a peer whose role matches the work type.
def select_peer(self, role):
    """Select a trusted peer for a given role."""
    candidates = [
        (addr, info) for addr, info in self.peers.items()
        if info["role"] == role and info["trusted"]
    ]
    if not candidates:
        return None
    # Pick uniformly at random among the trusted peers for that role
    return random.choice(candidates)[0]
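Note that random.choice picks uniformly at random, so consecutive requests can land on the same peer. If strict rotation is preferred, one option is a per-role counter over a sorted candidate list. The RoundRobinSelector class below is a hypothetical helper, sketched on the assumption that the peers dict has the same shape as self.peers above.

```python
from collections import defaultdict


class RoundRobinSelector:
    """Rotate through trusted peers of a role in a stable order."""

    def __init__(self):
        self._next_index = defaultdict(int)  # role -> position of the next pick

    def select(self, peers: dict, role: str):
        # Sort so the rotation order does not depend on dict insertion order
        candidates = sorted(
            addr for addr, info in peers.items()
            if info["role"] == role and info["trusted"]
        )
        if not candidates:
            return None
        index = self._next_index[role] % len(candidates)
        self._next_index[role] += 1
        return candidates[index]


peers = {
    "addr-b": {"role": "reviewer", "trusted": True},
    "addr-a": {"role": "reviewer", "trusted": True},
}
selector = RoundRobinSelector()
print([selector.select(peers, "reviewer") for _ in range(3)])
```

The modulo keeps the rotation valid when the candidate list shrinks or grows between calls.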
Executing Tasks with LLMs
def listen_for_work(self):
    """Receive incoming data and process it."""
    while True:
        try:
            # Receive incoming data payload
            output = self.pilotctl("data", "recv", "--json")
            if not output:
                time.sleep(0.5)  # Avoid a busy loop when nothing is waiting
                continue
            msg = json.loads(output)
            sender = msg.get("from", "unknown")
            print(f"[{self.hostname}] Received work from {sender}")
            # The payload may arrive as a JSON string; decode it if so
            task = msg["data"]
            if isinstance(task, str):
                task = json.loads(task)
            # Execute with LLM
            start_time = time.time()
            result = self.execute_with_llm(task)
            elapsed = time.time() - start_time
            # Send results back to sender
            result_json = json.dumps(result)
            self.pilotctl("send", sender, result_json)
            self.tasks_completed += 1
            print(f"[{self.hostname}] Responded in {elapsed:.1f}s")
        except subprocess.TimeoutExpired:
            continue
        except Exception as e:
            print(f"[{self.hostname}] Processing error: {e}")
The LLM Execution Function
def execute_with_llm(self, task_data):
    """Call an LLM to process the task."""
    import openai
    client = openai.OpenAI(api_key=os.environ["OPENAI_API_KEY"])
    task_type = task_data.get("type", "general")
    payload = task_data.get("payload", "")
    prompts = {
        "summarize": f"Summarize this text concisely:\n\n{payload}",
        "analyze": f"Analyze this data and provide key insights:\n\n{payload}",
        "code_review": f"Review this code for bugs and improvements:\n\n{payload}",
        "translate": f"Translate this to English:\n\n{payload}",
        "general": f"Process this request:\n\n{payload}",
    }
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": f"You are a {task_type} specialist agent."},
            {"role": "user", "content": prompts.get(task_type, prompts["general"])},
        ],
        max_tokens=1024,
    )
    return {
        "output": response.choices[0].message.content,
        "model": "gpt-4o-mini",
        "tokens_used": response.usage.total_tokens,
    }
Each agent is a specialist. One handles summarization, another does code review, another translates. The work type in the payload determines which prompt template is used. The routing decision -- which agent gets the work -- is based on role matching.
The Emergence: Self-Organization in Action
Now we wire it all together. Here is the main loop that each agent runs.
import threading

def generate_sample_payload(work_type):
    """Return placeholder input for a work type (sample data for the demo)."""
    samples = {
        "summarize": "Pilot agents discover peers through a registry and exchange work directly.",
        "analyze": "latency_ms: 120, 95, 310, 88, 101",
        "code_review": "def add(a, b):\n    return a - b",
        "translate": "Bonjour tout le monde",
    }
    return samples.get(work_type, "Hello from the swarm.")

def run_agent(agent_id, role, registry_addr):
    agent = SwarmAgent(agent_id, role, registry_addr)
    agent.start_daemon()
    agent.register()
    time.sleep(5)  # Let other agents register
    agent.discover_peers()
    agent.establish_trust()
    # Start work listener in background
    listener = threading.Thread(
        target=agent.listen_for_work, daemon=True
    )
    listener.start()
    # Main loop: periodically send work to peers
    work_types = ["summarize", "analyze", "code_review", "translate"]
    while True:
        # Refresh peer list and handshake with any peers not yet trusted
        agent.discover_peers()
        agent.establish_trust()
        # Pick a work type and find a peer for it
        work_type = random.choice(work_types)
        role_for_work = {
            "summarize": "summarizer",
            "analyze": "analyzer",
            "code_review": "reviewer",
            "translate": "translator",
        }.get(work_type, "general")
        target = agent.select_peer(role_for_work)
        if target is None:
            time.sleep(5)
            continue
        # Send work to selected peer
        try:
            agent.send_work(target, {
                "type": work_type,
                "payload": generate_sample_payload(work_type),
            })
        except Exception as e:
            print(f"[{agent.hostname}] Send failed: {e}")
        time.sleep(random.uniform(10, 30))  # Stagger sends
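The role map in the main loop only names the four specialist roles, so an agent tagged general is never selected by an exact role match. One way to make use of generalists, sketched here on the assumption that they should act as a fallback when no trusted specialist is available, is to try the preferred role first and then general. The function name select_with_fallback is hypothetical.

```python
import random

WORK_TYPE_TO_ROLE = {
    "summarize": "summarizer",
    "analyze": "analyzer",
    "code_review": "reviewer",
    "translate": "translator",
}


def select_with_fallback(peers: dict, work_type: str, rng=random):
    """Prefer a trusted specialist; otherwise use a trusted generalist."""
    preferred = WORK_TYPE_TO_ROLE.get(work_type, "general")
    # dict.fromkeys keeps order and drops the duplicate when preferred is "general"
    for role in dict.fromkeys([preferred, "general"]):
        candidates = [
            addr for addr, info in peers.items()
            if info["role"] == role and info["trusted"]
        ]
        if candidates:
            return rng.choice(candidates)
    return None


peers = {
    "addr-g": {"role": "general", "trusted": True},
    "addr-t": {"role": "translator", "trusted": False},
}
print(select_with_fallback(peers, "translate"))
```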
Spawning the Swarm
def main():
    registry_addr = "rendezvous.example.com:9000"
    # Define the swarm: 10 agents with different roles
    agents = [
        (0, "summarizer"),
        (1, "summarizer"),
        (2, "analyzer"),
        (3, "analyzer"),
        (4, "reviewer"),
        (5, "reviewer"),
        (6, "translator"),
        (7, "translator"),
        (8, "general"),  # Generalist: accepts any task type
        (9, "general"),
    ]
    threads = []
    for agent_id, role in agents:
        t = threading.Thread(
            target=run_agent,
            args=(agent_id, role, registry_addr),
        )
        t.start()
        threads.append(t)
        time.sleep(1)  # Stagger daemon startups
    for t in threads:
        t.join()

if __name__ == "__main__":
    main()
What Happens Over Time
Run this for 30 minutes and observe the work distribution. Here is what typically emerges:
- Minutes 0-5: All agents come online and discover each other. Work routing goes to random trusted peers.
- Minutes 5-10: Agents with lower latency to the LLM API respond faster and accumulate more completed requests.
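- Minutes 10-20: Peers that respond quickly get routed more work, provided peer selection is weighted by observed response time; the uniform random.choice in select_peer does not produce this bias by itself. Agents with connectivity issues get fewer requests. The swarm self-corrects without manual intervention.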
- Minutes 20-30: The swarm has self-organized. If any agent goes down, peers detect the lost tunnel and stop routing to it. No failover logic needed.
This is the key insight: The swarm does not need an orchestrator because role-based routing and tunnel health replace central scheduling. Emergence happens automatically.
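The select_peer method shown earlier chooses uniformly at random, so a bias toward fast responders needs some record of response times. The sketch below is one hypothetical way to add it: an exponentially weighted moving average of latency per peer, with selection weights inversely proportional to that average. The class name, the smoothing factor, and the weighting rule are all assumptions, not part of Pilot Protocol.

```python
import random


class LatencyTracker:
    """Track a smoothed response time per peer and weight selection by it."""

    def __init__(self, alpha: float = 0.3):
        self.alpha = alpha   # weight given to the newest sample
        self.latency = {}    # peer address -> smoothed seconds

    def record(self, addr: str, seconds: float) -> None:
        previous = self.latency.get(addr)
        if previous is None:
            self.latency[addr] = seconds
        else:
            self.latency[addr] = self.alpha * seconds + (1 - self.alpha) * previous

    def choose(self, candidates: list[str], rng=random):
        if not candidates:
            return None
        # Unmeasured peers get the mean of known latencies so they still get tried
        known = [self.latency[a] for a in candidates if a in self.latency]
        default = sum(known) / len(known) if known else 1.0
        weights = [1.0 / max(self.latency.get(a, default), 1e-3) for a in candidates]
        return rng.choices(candidates, weights=weights, k=1)[0]


tracker = LatencyTracker()
tracker.record("fast", 1.0)
tracker.record("slow", 10.0)
print(tracker.choose(["fast", "slow"]))
```

A sender would call record after each completed round trip and pass the trusted candidates for a role to choose. Slow peers keep a nonzero weight, so they are still probed occasionally and can recover their share.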
Monitoring the Swarm
You can watch the self-organization happen in real time by polling the registry for peer status. For a more robust monitoring setup that works across NAT boundaries without a centralized Prometheus stack, see Distributed Monitoring Without Prometheus or Grafana.
def monitor_swarm(registry_addr):
    """Print peer activity every 30 seconds."""
    while True:
        output = subprocess.run(
            ["pilotctl", "peers", "--search", "swarm=demo"],
            capture_output=True, text=True
        ).stdout
        agents = json.loads(output)
        print("\n--- Swarm Status ---")
        print(f"{'Hostname':<25} {'Role':<12} {'Status':<8}")
        print("-" * 47)
        for a in agents:
            print(f"{a.get('hostname', 'unknown'):<25} "
                  f"{a.get('tags', {}).get('role', '?'):<12} "
                  f"{a.get('status', 'unknown'):<8}")
        time.sleep(30)
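A status table is easier to act on with a check for roles that currently have no agent at all. The sketch below reports missing roles from a registry response. It assumes the same JSON shape the monitor reads (a list of objects with an optional tags.role), and the helper name missing_roles is hypothetical.

```python
import json


def missing_roles(registry_json: str, expected_roles: list[str]) -> list[str]:
    """Return the expected roles that no listed agent advertises."""
    agents = json.loads(registry_json)
    present = {agent.get("tags", {}).get("role") for agent in agents}
    return sorted(set(expected_roles) - present)


# Hypothetical registry output with only two roles online
sample = json.dumps([
    {"hostname": "swarm-summarizer-0", "tags": {"role": "summarizer"}},
    {"hostname": "swarm-general-8", "tags": {"role": "general"}},
])
print(missing_roles(sample, ["summarizer", "analyzer", "reviewer", "translator"]))
```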
Scaling to 100 Agents
Ten agents is a demo. Can this approach scale? We tested with 100 agents on 5 VMs (20 agents per VM) and found two things that matter.
Memory: 10 MB Per Daemon
Each Pilot daemon uses approximately 10 MB of RSS. On a 16 GB VM, you can run 200+ daemons comfortably. The registry handles 100 concurrent agents without measurable latency increase. The benchmark data confirms the per-connection memory stays flat.
# Memory usage across 100 daemons on a single VM
ps aux | grep pilot-daemon | awk '{sum += $6} END {print sum/1024 "MB"}'
1024MB # ~10 MB per daemon average
Network Topology at Scale
With 100 agents, the trust mesh becomes important. Full mesh (every agent trusts every other) means 4,950 handshake pairs. That is expensive. In practice, agents should trust only peers they actually interact with. The swarm naturally converges on a sparse trust graph where each agent trusts 10-20 peers based on role affinity.
def selective_trust(self):
    """Only trust peers in roles we send work to."""
    # get_target_roles() is assumed to return the roles this agent sends work to
    needed_roles = self.get_target_roles()
    for addr, info in self.peers.items():
        if info["role"] in needed_roles and not info["trusted"]:
            self.pilotctl("handshake", addr)
            self.peers[addr]["trusted"] = True
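The 4,950 figure is the number of unordered pairs among 100 agents, n(n-1)/2. The short sketch below reproduces it and compares it with a sparse graph, under the simplifying assumption that every agent holds exactly k mutual trust links. The function names are illustrative only.

```python
def full_mesh_pairs(n: int) -> int:
    """Number of unordered agent pairs: n choose 2."""
    return n * (n - 1) // 2


def sparse_pairs(n: int, peers_per_agent: int) -> int:
    """Pairs when every agent has exactly k mutual trust links (degree k)."""
    k = min(peers_per_agent, n - 1)
    return n * k // 2  # each link is shared by two agents


print(full_mesh_pairs(100))    # 4950
print(sparse_pairs(100, 10))   # 500
print(sparse_pairs(100, 20))   # 1000
```

At 10 to 20 trusted peers per agent, the handshake count is roughly a fifth to a tenth of the full mesh.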
Adding Fault Tolerance
A real swarm needs to handle agent failures. Pilot makes this straightforward because the registry tracks liveness via keepalive probes.
def send_with_retry(self, work_data, role, max_retries=3):
    """Send work with automatic failover to next peer."""
    for attempt in range(max_retries):
        target = self.select_peer(role)
        if target is None:
            print(f"[{self.hostname}] No peers available for {role}")
            return None
        try:
            self.send_work(target, work_data)
            return True
        except Exception as e:
            print(f"[{self.hostname}] Attempt {attempt+1} failed: {e}")
            # Remove from peers temporarily
            self.peers[target]["trusted"] = False
            continue
    return None
When a send fails, the agent marks the peer as temporarily untrusted and retries with another available peer for that role. A recovered peer rejoins the candidate list once a later handshake with it succeeds.
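Reusing the trusted flag to record failures mixes two concerns: whether a peer is authenticated and whether it is currently reachable. A possible alternative, sketched below with a hypothetical PeerHealth class, tracks failures separately and lets a peer back into rotation after a cooldown. The 60-second default is an arbitrary assumption.

```python
import time


class PeerHealth:
    """Exclude peers that failed recently, then readmit them after a cooldown."""

    def __init__(self, cooldown_seconds: float = 60.0, clock=time.monotonic):
        self.cooldown_seconds = cooldown_seconds
        self.clock = clock     # injectable so tests can control time
        self.failed_at = {}    # peer address -> time of last failure

    def mark_failed(self, addr: str) -> None:
        self.failed_at[addr] = self.clock()

    def mark_ok(self, addr: str) -> None:
        self.failed_at.pop(addr, None)

    def is_available(self, addr: str) -> bool:
        failed = self.failed_at.get(addr)
        if failed is None:
            return True
        return self.clock() - failed >= self.cooldown_seconds

    def filter(self, candidates: list[str]) -> list[str]:
        """Keep only the candidates that are not cooling down."""
        return [addr for addr in candidates if self.is_available(addr)]


health = PeerHealth(cooldown_seconds=60)
health.mark_failed("addr-a")
print(health.filter(["addr-a", "addr-b"]))
```

In a retry loop, the sender would call mark_failed in the except branch and mark_ok after a successful send, leaving the trust state untouched.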
The Complete Agent Script
Here is the full script that ties everything together. Save it as swarm_agent.py and run with the agent ID and role as arguments.
#!/usr/bin/env python3
"""Pilot Protocol swarm agent with role-based self-organization."""
import subprocess
import json
import time
import random
import threading
import os
import sys

REGISTRY_ADDR = os.environ.get("REGISTRY_ADDR", "rendezvous.example.com:9000")

class SwarmAgent:
    def __init__(self, agent_id, role):
        self.agent_id = agent_id
        self.role = role
        self.hostname = f"swarm-{role}-{agent_id}"
        self.peers = {}
        self.tasks_completed = 0

    def pilotctl(self, *args, stdin_data=None):
        """Run pilotctl, optionally piping stdin_data to it, and return stdout."""
        cmd = ["pilotctl"] + list(args)
        result = subprocess.run(
            cmd, input=stdin_data, capture_output=True, text=True, timeout=30
        )
        if result.returncode != 0:
            raise RuntimeError(result.stderr)
        return result.stdout.strip()

    def start(self):
        subprocess.Popen([
            "pilot-daemon",
        ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        time.sleep(2)
        self.pilotctl("set-hostname", self.hostname)
        self.pilotctl("extras", "set-tags", f"role={self.role},swarm=demo")

    def discover(self):
        output = self.pilotctl("peers", "--search", "swarm=demo")
        my_addr = self.my_address()
        for peer in json.loads(output):
            addr = peer["address"]
            if addr != my_addr:
                self.peers[addr] = {
                    "hostname": peer.get("hostname", "?"),
                    "role": peer.get("tags", {}).get("role", "?"),
                    "trusted": self.peers.get(addr, {}).get("trusted", False),
                }

    def my_address(self):
        return json.loads(self.pilotctl("daemon", "status"))["address"]

    def trust_peers(self):
        for addr, info in self.peers.items():
            if not info["trusted"]:
                try:
                    self.pilotctl("handshake", addr)
                    info["trusted"] = True
                except RuntimeError:
                    pass

    def select_peer(self, role):
        candidates = [
            a for a, i in self.peers.items()
            if i["role"] == role and i["trusted"]
        ]
        return random.choice(candidates) if candidates else None

    def run(self):
        self.start()
        time.sleep(5)
        self.discover()
        self.trust_peers()
        threading.Thread(target=self.listen, daemon=True).start()
        # Work type to send, keyed by the role that handles it
        work_for_role = {
            "summarizer": "summarize",
            "analyzer": "analyze",
            "reviewer": "code_review",
            "translator": "translate",
        }
        while True:
            self.discover()
            self.trust_peers()  # Handshake with peers found since the last pass
            role = random.choice(list(work_for_role))
            target = self.select_peer(role)
            if target:
                # Placeholder payload; replace with real work in your own swarm
                payload = json.dumps({
                    "type": work_for_role[role],
                    "payload": "Sample input",
                    "sender": self.my_address(),
                })
                try:
                    self.pilotctl("data", "send", target,
                                  "--stdin", stdin_data=payload)
                except (RuntimeError, subprocess.TimeoutExpired):
                    pass
            time.sleep(random.uniform(10, 30))

    def listen(self):
        while True:
            try:
                output = self.pilotctl("data", "recv", "--json")
                if not output:
                    continue
                msg = json.loads(output)
                sender = msg.get("from", "unknown")
                result = {"output": f"Processed by {self.hostname}"}
                self.pilotctl("send", sender, json.dumps(result))
                self.tasks_completed += 1
            except Exception:
                continue

if __name__ == "__main__":
    agent_id = int(sys.argv[1])
    role = sys.argv[2]
    SwarmAgent(agent_id, role).run()
Run 10 instances:
python swarm_agent.py 0 summarizer &
python swarm_agent.py 1 summarizer &
python swarm_agent.py 2 analyzer &
python swarm_agent.py 3 analyzer &
python swarm_agent.py 4 reviewer &
python swarm_agent.py 5 reviewer &
python swarm_agent.py 6 translator &
python swarm_agent.py 7 translator &
python swarm_agent.py 8 general &
python swarm_agent.py 9 general &
What You Have Built
This is not a toy demo. The patterns here -- discovery, trust, data exchange, role-based routing, fault tolerance -- are the building blocks of production multi-agent systems. The difference from conventional architectures is what is missing: no orchestrator, no message queue, no service mesh, no load balancer, no health check infrastructure.
The swarm self-organizes because role-based discovery replaces central scheduling. Agents know which peers have the capabilities they need. Tunnel health replaces health checks. If a peer disappears, its tunnel drops and the agent routes to the next available match.
To extend this pattern into a marketplace with capability advertising, see Build an AI Agent Marketplace With Discovery and Reputation. For the network fundamentals, see How Pilot Protocol Works.
Build Your Own Swarm
Everything in this tutorial runs on the open-source Pilot Protocol. Clone the repo and start swarming.