""" pipeline.py — 9-phase AI orchestrator for Immersive Vibe Development Studio. Phases 1–7: sequential, one agent per phase. Phase 8: round-robin coding session (Lead Coder + Geom Builder, 4 rounds). Phase 9: auto-lint QA + one auto-fix round if needed. Yields JSON strings (chunk protocol) throughout; app.py forwards to the streaming bridge. """ import json import re import tempfile import threading import time import uuid import zipfile from pathlib import Path from typing import Generator from huggingface_hub import InferenceClient SOULS_DIR = Path(__file__).parent / "souls" _REGISTRY_PATH = Path(__file__).parent / "sandbox_cache" / "characters_registry.json" MODELS = [ "deepseek-ai/DeepSeek-V4-Flash", "deepseek-ai/DeepSeek-V4-Pro", "MiniMaxAI/MiniMax-M2.7", "tencent/Hy3-preview", "moonshotai/Kimi-K2.6", ] SOUL_PATHS: dict[str, Path] = { "Art Director": SOULS_DIR / "creative/theme-asset-director/SOUL.md", "Char Designer": SOULS_DIR / "creative/blocky-character-designer/SOUL.md", "Geom Builder": SOULS_DIR / "development/geometry-builder/SOUL.md", "Texture Director": SOULS_DIR / "creative/texture-director/SOUL.md", "Game Architect": SOULS_DIR / "development/platformer-architect/SOUL.md", "Scene Composer": SOULS_DIR / "creative/3d-scene-composer/SOUL.md", "Lead Coder": SOULS_DIR / "development/threejs-developer/SOUL.md", } PHASE_ROLES: list[tuple[int, str, str | None]] = [ (1, "Asset Manifest", "Art Director"), (2, "Char Design", "Char Designer"), (3, "Geometry", "Geom Builder"), (4, "Textures", "Texture Director"), (5, "Mechanics", "Game Architect"), (6, "Atmosphere", "Scene Composer"), (7, "Initial Build", "Lead Coder"), (9, "QA", None), ] def _load_soul(role: str) -> str: path = SOUL_PATHS.get(role) if path and path.exists(): return path.read_text() return f"You are a {role} building a Three.js platformer game." def _load_character_pool() -> list[str]: """Load character function names from pre-extracted registry at module import.""" if _REGISTRY_PATH.exists(): registry: dict[str, str] = json.loads(_REGISTRY_PATH.read_text()) # Named characters (not generic world-builders) char_tags = [ "Character", "Songoku", "Raticate", "Persian", "Tauros", "Snorlax", "Graveler", "Onix", "Zubat", "Golbat", "Pidgey", "Fearow", "Beedrill", "Butterfree", "Voltorb", "Electrode", "Jigglypuff", "Abra", "Alakazam", "Gengar", "Doduo", "Rapidash", ] named = [k for k in registry if any(t in k for t in char_tags)] heroes = [k for k in registry if "buildHero" in k] pool = named + [h for h in heroes if h not in named] return pool if pool else ["buildHero_jungle"] return ["buildHero_jungle"] CHARACTER_POOL: list[str] = _load_character_pool() def _assign_models(selected: list[str]) -> dict[str, str]: """ Assign roles to selected models by capability heuristic. Returns: role_name → model_id """ flash_first = sorted(selected, key=lambda m: (0 if "Flash" in m else 1)) pro_first = sorted(selected, key=lambda m: (0 if ("Pro" in m or "K2" in m) else 1)) assignments: dict[str, str] = {} if len(selected) == 2: a, b = selected[0], selected[1] for role in ("Art Director", "Geom Builder", "Game Architect", "Lead Coder"): assignments[role] = a for role in ("Char Designer", "Texture Director", "Scene Composer"): assignments[role] = b assignments["Geom Builder (Phase 8)"] = b return assignments assignments["Art Director"] = flash_first[0] assignments["Lead Coder"] = pro_first[0] filler_roles = ["Char Designer", "Geom Builder", "Texture Director", "Game Architect", "Scene Composer"] remaining = [m for m in selected if m != assignments["Art Director"] and m != assignments["Lead Coder"]] if not remaining: remaining = selected[:] for i, role in enumerate(filler_roles): if role not in assignments: assignments[role] = remaining[i % len(remaining)] assignments["Geom Builder (Phase 8)"] = assignments.get("Geom Builder", selected[0]) return assignments class Pipeline: def __init__( self, trigger: str, selected_models: list[str], token: str, cancel_flag: threading.Event, ): parts = re.sub(r"[-_]+", " ", trigger.strip()).split() self.theme = parts[0].lower() if len(parts) > 0 else "jungle" self.hero = parts[1].lower() if len(parts) > 1 else "monkey" self.selected_models = selected_models self.token = token self.cancel_flag = cancel_flag self.assignments = _assign_models(selected_models) self.context: list[dict] = [] self.phase_start_times: dict[int, float] = {} self._final_code: str = "" self._qa_warnings: list[str] = [] # ── Public ─────────────────────────────────────────────────────────────── def run(self) -> Generator[str, None, None]: yield from self._run_phases_1_to_7() if self.cancel_flag.is_set(): yield self._cancelled() return yield from self._run_phase_8() if self.cancel_flag.is_set(): yield self._cancelled() return yield from self._run_phase_9() def get_model_assignments(self) -> list[dict]: """Return list suitable for build_studio_html().""" colors = ["#f5c542", "#4a90e2", "#7ed321", "#e94f37", "#9b59b6"] seen: dict[str, dict] = {} desk = 1 for _, _, role in PHASE_ROLES[:-1]: # exclude QA if role is None or role in seen: continue model_id = self.assignments.get(role, self.selected_models[0]) char_fn = CHARACTER_POOL[(desk - 1) % len(CHARACTER_POOL)] seen[role] = { "model_id": model_id, "role": role, "character_fn": char_fn, "color": colors[(desk - 1) % len(colors)], "desk": desk, } desk += 1 if desk > 5: break return list(seen.values()) def build_zip(self) -> tuple[bytes, str]: """Build downloadable ZIP. Call after run() completes.""" trace_html = self._build_trace_html() zip_name = f"{self.theme}_{self.hero}_{uuid.uuid4().hex[:8]}" tmp_dir = tempfile.mkdtemp(prefix="ivds_") zip_path = Path(tmp_dir) / f"{zip_name}.zip" with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: zf.writestr("index.html", self._final_code) zf.writestr("trace.html", trace_html) return zip_path.read_bytes(), zip_name # ── Private: streaming helpers ──────────────────────────────────────────── def _cancelled(self) -> str: return json.dumps({"type": "cancelled"}) def _phase_header(self, phase_num: int, phase_name: str, role: str | None) -> str: self.phase_start_times[phase_num] = time.time() return json.dumps({ "type": "phase_start", "phase": phase_num, "phase_name": phase_name, "role": role, }) def _phase_footer(self, phase_num: int, phase_name: str) -> str: return json.dumps({"type": "phase_complete", "phase": phase_num, "phase_name": phase_name}) def _commit(self, phase_num: int, text: str) -> str: return json.dumps({"type": "commit", "phase": phase_num, "text": text}) def _context_summary(self, max_chars: int = 12000) -> str: summary = "\n\n".join( f"=== Phase {c['phase']} ({c['role']}) ===\n{c['content']}" for c in self.context ) return summary[-max_chars:] if len(summary) > max_chars else summary # ── Private: model call ─────────────────────────────────────────────────── def _call_model( self, role: str, messages: list[dict], phase_num: int ) -> Generator[str, None, None]: model_id = self.assignments.get(role, self.selected_models[0]) system = _load_soul(role) full_messages = [{"role": "system", "content": system}] + messages full_response: list[str] = [] client = InferenceClient(model=model_id, token=self.token) for attempt in range(3): if self.cancel_flag.is_set(): return try: for chunk in client.chat_completion( messages=full_messages, stream=True, max_tokens=4096 ): if self.cancel_flag.is_set(): return text = chunk.choices[0].delta.content or "" if text: full_response.append(text) yield json.dumps({ "type": "text", "model": model_id, "role": role, "text": text, "phase": phase_num, }) break # success — exit retry loop except Exception as e: err_str = str(e) is_retryable = any(c in err_str for c in ["429", "503", "timeout"]) if attempt < 2 and is_retryable: wait = 4 ** attempt # 1s, 4s, 16s time.sleep(wait) continue yield json.dumps({ "type": "error", "model": model_id, "role": role, "text": f"Model error: {err_str[:120]}", "phase": phase_num, }) return response_text = "".join(full_response) self.context.append({"phase": phase_num, "role": role, "content": response_text}) # ── Private: phases ─────────────────────────────────────────────────────── def _run_phases_1_to_7(self) -> Generator[str, None, None]: prompts = { 1: ( f"Create an asset manifest for a '{self.theme} {self.hero}' platformer. " f"List: hero design, 2 obstacles (1 ground, 1 aerial), 1 collectible, " f"1 platform tile, 1 background prop, 3 decoratives. " f"Be specific about blocky 3D geometry and provide a 5-color hex palette." ), 2: ( f"Design the blocky hero character '{self.hero}' for theme '{self.theme}'. " f"Describe head, body, limbs using box primitives only. " f"Reference:\n{self._context_summary()}" ), 3: ( f"Write Three.js geometry specifications (using BoxGeometry + MeshToonMaterial) " f"for all assets in the manifest. Share materials by hex color. " f"flatShading: true on all.\n" f"Reference:\n{self._context_summary()}" ), 4: ( f"Design the texture and color approach for '{self.theme}'. " f"All textures MUST be canvas2D procedural (no image files). " f"Provide makeXxxTexture() function sketches.\n" f"Reference:\n{self._context_summary()}" ), 5: ( f"Design platformer mechanics for '{self.theme} {self.hero}': " f"movement speed, jump force, gravity (-22), maxFallSpeed (-22), laneWidth (1.5), " f"platform tile recycling at z<-40 / z>5, collectible scoring, obstacle collision. " f"Hero z MUST always be 0.\n" f"Reference:\n{self._context_summary()}" ), 6: ( f"Design the atmosphere for '{self.theme}': ambient/directional light colors, " f"scene.background hex, fog color (MUST match background exactly), fog density. " f"Reference:\n{self._context_summary()}" ), 7: ( f"Write the COMPLETE Three.js platformer game as a single self-contained HTML file. " f"HARD CONSTRAINTS:\n" f"- CDN Three.js r167 from unpkg, no importmap, no