#!/usr/bin/env python3
"""Surrogate-1 V13 — multi-agent runtime parser (ONLY external piece).

After the V13 trainer bakes the multi-agent control tokens INTO the model
weights (via 8 special tokens registered + multi-agent training data, 60K+
traces), the model EMITS these tokens during generation. This small async
dispatcher parses them, calls the same model again with the spawned role's
system prompt, gathers results in parallel via asyncio, and feeds them back
into the parent context.

Usage:
    # Hosted on the surrogate-1 ZeroGPU Space as a tool the orchestrator
    # invokes when generation contains a spawn block:
    runtime = MultiAgentRuntime(
        endpoint="https://surrogate1-surrogate-1-zero-gpu.hf.space")
    final = await runtime.run(prompt="Build a feature that does X",
                              max_depth=3, max_fanout=8)

Hard limits (research recommended):
    MAX_DEPTH = 3   (recursion cap)
    MAX_FANOUT = 8  (parallel sub-agents per spawn)
"""
from __future__ import annotations

import asyncio
import json
import os
import re
import sys
from typing import Optional

try:
    import httpx  # pip install httpx
except ImportError:
    # Only _generate() needs the HTTP client; keeping the import failure
    # soft lets the parsing/dispatch logic be imported and tested without it.
    httpx = None

# NOTE(review): this file appears to have passed through a filter that
# stripped angle-bracket markup. SPAWN_RE was left as an invalid pattern and
# has been rebuilt from its surviving fragments (tag name "spawn" inferred
# from the identifiers — confirm against the tokens the model was trained
# on). AWAIT_RE, DEFAULT_SYSTEM and the result/error rendering in _dispatch
# are kept exactly as found because the lost text cannot be recovered here;
# restore them from the original source if it is available.
SPAWN_RE = re.compile(r'<spawn(?:\s+[^>]*)?>(.*?)</spawn>', re.S)
AWAIT_RE = re.compile(r'', re.S)  # NOTE(review): empty pattern, unused below
ROLE_RE = re.compile(r'role="([^"]+)"')
ID_RE = re.compile(r'id="([^"]+)"')
PARALLEL_RE = re.compile(r'parallel="([^"]+)"')


class MultiAgentRuntime:
    """Expand spawn blocks in model output by calling the model recursively.

    Attributes are plain configuration: ``endpoint`` (base URL of the model
    service), ``max_depth`` (recursion cap), ``max_fanout`` (maximum number
    of spawn blocks expanded per text) and ``hf_token`` (bearer token, taken
    from the ``HF_TOKEN`` environment variable when not passed explicitly).
    """

    def __init__(self, endpoint: str, max_depth: int = 3,
                 max_fanout: int = 8, hf_token: Optional[str] = None):
        self.endpoint = endpoint
        self.max_depth = max_depth
        self.max_fanout = max_fanout
        self.hf_token = hf_token or os.environ.get("HF_TOKEN")

    async def _generate(self, prompt: str, system: Optional[str] = None,
                        max_tokens: int = 2048,
                        temperature: float = 0.5) -> str:
        """Single call to the model (same endpoint, different system prompt).

        POSTs ``{"data": [prompt, system, max_tokens, temperature]}`` to
        ``/api/predict`` and then ``/run/predict`` under ``self.endpoint``,
        returning the first element of the response's ``data`` list from the
        first path that answers 200 with a non-empty ``data``. Non-string
        payloads are returned JSON-encoded.

        Raises:
            ImportError: httpx is not installed.
            RuntimeError: neither path produced a usable response.
        """
        if httpx is None:
            raise ImportError(
                "httpx is required for model calls: pip install httpx")
        body = {"data": [prompt, system or "", max_tokens, temperature]}
        headers = {"Content-Type": "application/json"}
        if self.hf_token:
            headers["Authorization"] = f"Bearer {self.hf_token}"
        base = self.endpoint.rstrip("/")
        async with httpx.AsyncClient(timeout=180) as cx:
            for path in ("/api/predict", "/run/predict"):
                r = await cx.post(base + path, json=body, headers=headers)
                if r.status_code == 200:
                    j = r.json()
                    if "data" in j and j["data"]:
                        first = j["data"][0]
                        if isinstance(first, str):
                            return first
                        return json.dumps(first)
        raise RuntimeError(f"model call failed at {self.endpoint}")

    def _extract_spawns(self, text: str) -> list[dict]:
        """Find all spawn blocks in *text* and parse role/id/parallel.

        Returns one dict per block, in document order, with keys ``role``
        (default ``"default"``), ``id`` (default ``"anon"``), ``parallel``
        (True only when the attribute is exactly ``"true"``), ``body`` (the
        stripped block content) and ``raw_span`` (start/end offsets of the
        whole block in *text*).
        """
        out = []
        for m in SPAWN_RE.finditer(text):
            # Everything before the captured body is the opening tag;
            # attributes are only looked up there, never in the body.
            tag = text[m.start():m.start(1)]
            role = ROLE_RE.search(tag)
            sid = ID_RE.search(tag)
            par = PARALLEL_RE.search(tag)
            out.append({
                "role": role[1] if role else "default",
                "id": sid[1] if sid else "anon",
                "parallel": par is not None and par[1] == "true",
                "body": m.group(1).strip(),
                "raw_span": (m.start(), m.end()),
            })
        return out

    async def _dispatch(self, parent_text: str, depth: int) -> str:
        """Recursively expand spawn blocks until none remain or depth cap.

        At most ``self.max_fanout`` blocks (the first ones in the text) are
        expanded; any further blocks are left in the text untouched. Blocks
        marked parallel run concurrently first, then the remaining blocks
        run one after another. A worker that raises contributes its
        exception message instead of output. Each expanded block is replaced
        in place by its worker's result.
        """
        if depth >= self.max_depth:
            return parent_text
        spawns = self._extract_spawns(parent_text)
        if not spawns:
            return parent_text
        spawns = spawns[:self.max_fanout]

        # Results are tracked by position, not by spawn id: ids are optional
        # (defaulting to "anon") and may repeat, and keying on them would
        # hand several blocks the same result.
        results: list[str] = [""] * len(spawns)

        # Parallel-tagged spawns run via gather; serial ones sequence
        parallel_idx = [i for i, s in enumerate(spawns) if s["parallel"]]
        if parallel_idx:
            outs = await asyncio.gather(
                *(self._run_worker(spawns[i], depth + 1)
                  for i in parallel_idx),
                return_exceptions=True,
            )
            for i, o in zip(parallel_idx, outs):
                # NOTE(review): failures are rendered as the bare exception
                # message, same as successes are rendered as bare output —
                # any wrapping markup seems to have been lost; confirm.
                results[i] = str(o)
        for i, s in enumerate(spawns):
            if s["parallel"]:
                continue
            try:
                results[i] = await self._run_worker(s, depth + 1)
            except Exception as e:
                results[i] = f"{e}"

        # Splice by recorded span rather than str.replace(): a textual
        # search could match inside a result that was already inserted.
        pieces = []
        cursor = 0
        for s, res in zip(spawns, results):
            start, end = s["raw_span"]
            pieces.append(parent_text[cursor:start])
            pieces.append(res)
            cursor = end
        pieces.append(parent_text[cursor:])
        return "".join(pieces)

    async def _run_worker(self, spawn: dict, depth: int) -> str:
        """Dispatch one sub-agent: call the model with the role system prompt.

        Unknown roles fall back to ``DEFAULT_SYSTEM``. The worker's output is
        itself passed through ``_dispatch`` at *depth*, so spawn blocks it
        emits are expanded too (subject to the depth cap).
        """
        role_prompt = ROLE_SYSTEM_PROMPTS.get(spawn["role"], DEFAULT_SYSTEM)
        worker_out = await self._generate(spawn["body"], system=role_prompt,
                                          max_tokens=2048)
        # Recursive expansion if the worker also emits spawn blocks
        return await self._dispatch(worker_out, depth)

    async def run(self, prompt: str, max_depth: Optional[int] = None,
                  max_fanout: Optional[int] = None) -> str:
        """Entry: generate from root, then recursively dispatch any spawns.

        ``max_depth`` / ``max_fanout``, when given, overwrite the instance's
        limits and stay in effect for later calls as well.
        """
        if max_depth is not None:
            self.max_depth = max_depth
        if max_fanout is not None:
            self.max_fanout = max_fanout
        root = await self._generate(prompt, system=DEFAULT_SYSTEM,
                                    max_tokens=4096)
        return await self._dispatch(root, depth=0)


# Role system prompts — the model is trained to recognize these via Anthropic
# 5-component XML template (research §v13-role-comprehensive)
# NOTE(review): the token examples in DEFAULT_SYSTEM look stripped ("emit
# tokens ...", "Use + to gather"); the text is kept byte-for-byte as found.
DEFAULT_SYSTEM = (
    "You are Surrogate-1, a senior polymath engineer. When a task requires "
    "multiple roles, emit "
    "tokens to dispatch sub-agents. Use + "
    "to gather results. Hard limits: depth ≤ 3, fanout ≤ 8."
)

ROLE_SYSTEM_PROMPTS = {
    "PM": "You are PM (Product Manager). Output PRD with JTBD/OKRs.",
    "PO": "You are PO. Backlog grooming, sprint planning, acceptance criteria.",
    "BA": "You are BA. BRD + process modeling + verifiable requirements.",
    "SA": "You are SA. Multi-system design + ADRs + trade-off analysis.",
    "principal": "You are Principal Engineer. Cross-cutting tech leadership.",
    "BE": "You are Backend Engineer. Python/Go/Rust/Node API + data layer.",
    "FE": "You are Frontend Engineer. React/Vue/Svelte + a11y + perf.",
    "mobile": "You are Mobile Engineer. iOS/Android/RN/Flutter.",
    "data": "You are Data Engineer. Pipelines + warehousing.",
    "ml": "You are ML Engineer. Training + eval + MLOps.",
    "ai-eng": "You are AI Engineer. RAG + agents + fine-tuning.",
    "sre": "You are SRE. SLOs + oncall + postmortems + 5-Whys.",
    "devsecops": "You are DevSecOps. CI/CD security + IaC scanning + supply chain.",
    "platform": "You are Platform Engineer. IDP + golden paths.",
    "cloud": "You are Cloud Engineer. AWS/GCP/Azure + cost-aware.",
    "o11y": "You are Observability Engineer. PromQL/LogQL/TraceQL + SLOs.",
    "sec": "You are Security Engineer. Threat modeling + AppSec + IR.",
    "qa": "You are QA. Test strategy + manual + exploratory.",
    "sdet": "You are SDET. Selenium/Playwright/Cypress + perf via k6.",
    "sec-test": "You are Security Tester. OWASP + Burp + fuzzing.",
    "BD": "You are BD. Partnership scouting + deal structuring.",
    "sales": "You are Sales Engineer. Technical pitch + POC + ROI.",
    "CS": "You are Customer Success. Onboarding + escalations + expansion.",
    "founder": "You are Founder/CEO. Vision + fundraising + board.",
    "growth": "You are Growth Engineer. A/B + funnels + attribution.",
    "seo": "You are SEO/Content. Keyword research + technical SEO.",
    "brand": "You are Brand. ICP + messaging + competitive positioning.",
    "PMM": "You are Product Marketing Manager. Launch + positioning.",
    "PM-proj": "You are Project Manager. Agile/Scrum/Kanban/SAFe ceremonies.",
    "techwriter": "You are Tech Writer. RFCs + ADRs + runbooks + postmortems.",
    "EM": "You are Engineering Manager. 1:1s + perf review + hiring.",
}


if __name__ == "__main__":
    # Smoke test
    async def _smoke():
        rt = MultiAgentRuntime(
            endpoint=os.environ.get(
                "SURROGATE_ENDPOINT",
                "https://surrogate1-surrogate-1-zero-gpu.hf.space"),
        )
        out = await rt.run(
            prompt="Ship a feature that adds OAuth2 PKCE login to the Vanguard API. "
                   "Spawn PM/SA/BE/SDET/DevSecOps as needed.",
            max_depth=2,
            max_fanout=5,
        )
        print(out)

    asyncio.run(_smoke())