| """ |
| pipeline.py — 9-phase AI orchestrator for Immersive Vibe Development Studio. |
| |
| Phases 1–7: sequential, one agent per phase. |
| Phase 8: round-robin coding session (Lead Coder + Geom Builder, 4 rounds). |
| Phase 9: auto-lint QA + one auto-fix round if needed. |
| |
| Yields JSON strings (chunk protocol) throughout; app.py forwards to the streaming bridge. |
| """ |
| import json |
| import re |
| import tempfile |
| import threading |
| import time |
| import uuid |
| import zipfile |
| from pathlib import Path |
| from typing import Generator |
|
|
| from huggingface_hub import InferenceClient |
|
|
| SOULS_DIR = Path(__file__).parent / "souls" |
| _REGISTRY_PATH = Path(__file__).parent / "sandbox_cache" / "characters_registry.json" |
|
|
| MODELS = [ |
| "deepseek-ai/DeepSeek-V4-Flash", |
| "deepseek-ai/DeepSeek-V4-Pro", |
| "MiniMaxAI/MiniMax-M2.7", |
| "tencent/Hy3-preview", |
| "moonshotai/Kimi-K2.6", |
| ] |
|
|
| SOUL_PATHS: dict[str, Path] = { |
| "Art Director": SOULS_DIR / "creative/theme-asset-director/SOUL.md", |
| "Char Designer": SOULS_DIR / "creative/blocky-character-designer/SOUL.md", |
| "Geom Builder": SOULS_DIR / "development/geometry-builder/SOUL.md", |
| "Texture Director": SOULS_DIR / "creative/texture-director/SOUL.md", |
| "Game Architect": SOULS_DIR / "development/platformer-architect/SOUL.md", |
| "Scene Composer": SOULS_DIR / "creative/3d-scene-composer/SOUL.md", |
| "Lead Coder": SOULS_DIR / "development/threejs-developer/SOUL.md", |
| } |
|
|
| PHASE_ROLES: list[tuple[int, str, str | None]] = [ |
| (1, "Asset Manifest", "Art Director"), |
| (2, "Char Design", "Char Designer"), |
| (3, "Geometry", "Geom Builder"), |
| (4, "Textures", "Texture Director"), |
| (5, "Mechanics", "Game Architect"), |
| (6, "Atmosphere", "Scene Composer"), |
| (7, "Initial Build", "Lead Coder"), |
| (9, "QA", None), |
| ] |
|
|
|
|
| def _load_soul(role: str) -> str: |
| path = SOUL_PATHS.get(role) |
| if path and path.exists(): |
| return path.read_text() |
| return f"You are a {role} building a Three.js platformer game." |
|
|
|
|
| def _load_character_pool() -> list[str]: |
| """Load character function names from pre-extracted registry at module import.""" |
| if _REGISTRY_PATH.exists(): |
| registry: dict[str, str] = json.loads(_REGISTRY_PATH.read_text()) |
| |
| char_tags = [ |
| "Character", "Songoku", "Raticate", "Persian", "Tauros", "Snorlax", |
| "Graveler", "Onix", "Zubat", "Golbat", "Pidgey", "Fearow", "Beedrill", |
| "Butterfree", "Voltorb", "Electrode", "Jigglypuff", "Abra", "Alakazam", |
| "Gengar", "Doduo", "Rapidash", |
| ] |
| named = [k for k in registry if any(t in k for t in char_tags)] |
| heroes = [k for k in registry if "buildHero" in k] |
| pool = named + [h for h in heroes if h not in named] |
| return pool if pool else ["buildHero_jungle"] |
| return ["buildHero_jungle"] |
|
|
|
|
| CHARACTER_POOL: list[str] = _load_character_pool() |
|
|
|
|
| def _assign_models(selected: list[str]) -> dict[str, str]: |
| """ |
| Assign roles to selected models by capability heuristic. |
| Returns: role_name → model_id |
| """ |
| flash_first = sorted(selected, key=lambda m: (0 if "Flash" in m else 1)) |
| pro_first = sorted(selected, key=lambda m: (0 if ("Pro" in m or "K2" in m) else 1)) |
|
|
| assignments: dict[str, str] = {} |
|
|
| if len(selected) == 2: |
| a, b = selected[0], selected[1] |
| for role in ("Art Director", "Geom Builder", "Game Architect", "Lead Coder"): |
| assignments[role] = a |
| for role in ("Char Designer", "Texture Director", "Scene Composer"): |
| assignments[role] = b |
| assignments["Geom Builder (Phase 8)"] = b |
| return assignments |
|
|
| assignments["Art Director"] = flash_first[0] |
| assignments["Lead Coder"] = pro_first[0] |
|
|
| filler_roles = ["Char Designer", "Geom Builder", "Texture Director", |
| "Game Architect", "Scene Composer"] |
| remaining = [m for m in selected |
| if m != assignments["Art Director"] and m != assignments["Lead Coder"]] |
| if not remaining: |
| remaining = selected[:] |
| for i, role in enumerate(filler_roles): |
| if role not in assignments: |
| assignments[role] = remaining[i % len(remaining)] |
|
|
| assignments["Geom Builder (Phase 8)"] = assignments.get("Geom Builder", selected[0]) |
| return assignments |
|
|
|
|
| class Pipeline: |
| def __init__( |
| self, |
| trigger: str, |
| selected_models: list[str], |
| token: str, |
| cancel_flag: threading.Event, |
| ): |
| parts = re.sub(r"[-_]+", " ", trigger.strip()).split() |
| self.theme = parts[0].lower() if len(parts) > 0 else "jungle" |
| self.hero = parts[1].lower() if len(parts) > 1 else "monkey" |
| self.selected_models = selected_models |
| self.token = token |
| self.cancel_flag = cancel_flag |
| self.assignments = _assign_models(selected_models) |
| self.context: list[dict] = [] |
| self.phase_start_times: dict[int, float] = {} |
| self._final_code: str = "" |
| self._qa_warnings: list[str] = [] |
|
|
| |
|
|
| def run(self) -> Generator[str, None, None]: |
| yield from self._run_phases_1_to_7() |
| if self.cancel_flag.is_set(): |
| yield self._cancelled() |
| return |
| yield from self._run_phase_8() |
| if self.cancel_flag.is_set(): |
| yield self._cancelled() |
| return |
| yield from self._run_phase_9() |
|
|
| def get_model_assignments(self) -> list[dict]: |
| """Return list suitable for build_studio_html().""" |
| colors = ["#f5c542", "#4a90e2", "#7ed321", "#e94f37", "#9b59b6"] |
| seen: dict[str, dict] = {} |
| desk = 1 |
| for _, _, role in PHASE_ROLES[:-1]: |
| if role is None or role in seen: |
| continue |
| model_id = self.assignments.get(role, self.selected_models[0]) |
| char_fn = CHARACTER_POOL[(desk - 1) % len(CHARACTER_POOL)] |
| seen[role] = { |
| "model_id": model_id, |
| "role": role, |
| "character_fn": char_fn, |
| "color": colors[(desk - 1) % len(colors)], |
| "desk": desk, |
| } |
| desk += 1 |
| if desk > 5: |
| break |
| return list(seen.values()) |
|
|
| def build_zip(self) -> tuple[bytes, str]: |
| """Build downloadable ZIP. Call after run() completes.""" |
| trace_html = self._build_trace_html() |
| zip_name = f"{self.theme}_{self.hero}_{uuid.uuid4().hex[:8]}" |
| tmp_dir = tempfile.mkdtemp(prefix="ivds_") |
| zip_path = Path(tmp_dir) / f"{zip_name}.zip" |
| with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: |
| zf.writestr("index.html", self._final_code) |
| zf.writestr("trace.html", trace_html) |
| return zip_path.read_bytes(), zip_name |
|
|
| |
|
|
| def _cancelled(self) -> str: |
| return json.dumps({"type": "cancelled"}) |
|
|
| def _phase_header(self, phase_num: int, phase_name: str, role: str | None) -> str: |
| self.phase_start_times[phase_num] = time.time() |
| return json.dumps({ |
| "type": "phase_start", |
| "phase": phase_num, |
| "phase_name": phase_name, |
| "role": role, |
| }) |
|
|
| def _phase_footer(self, phase_num: int, phase_name: str) -> str: |
| return json.dumps({"type": "phase_complete", "phase": phase_num, |
| "phase_name": phase_name}) |
|
|
| def _commit(self, phase_num: int, text: str) -> str: |
| return json.dumps({"type": "commit", "phase": phase_num, "text": text}) |
|
|
| def _context_summary(self, max_chars: int = 12000) -> str: |
| summary = "\n\n".join( |
| f"=== Phase {c['phase']} ({c['role']}) ===\n{c['content']}" |
| for c in self.context |
| ) |
| return summary[-max_chars:] if len(summary) > max_chars else summary |
|
|
| |
|
|
| def _call_model( |
| self, role: str, messages: list[dict], phase_num: int |
| ) -> Generator[str, None, None]: |
| model_id = self.assignments.get(role, self.selected_models[0]) |
| system = _load_soul(role) |
| full_messages = [{"role": "system", "content": system}] + messages |
| full_response: list[str] = [] |
|
|
| client = InferenceClient(model=model_id, token=self.token) |
| for attempt in range(3): |
| if self.cancel_flag.is_set(): |
| return |
| try: |
| for chunk in client.chat_completion( |
| messages=full_messages, stream=True, max_tokens=4096 |
| ): |
| if self.cancel_flag.is_set(): |
| return |
| text = chunk.choices[0].delta.content or "" |
| if text: |
| full_response.append(text) |
| yield json.dumps({ |
| "type": "text", |
| "model": model_id, |
| "role": role, |
| "text": text, |
| "phase": phase_num, |
| }) |
| break |
| except Exception as e: |
| err_str = str(e) |
| is_retryable = any(c in err_str for c in ["429", "503", "timeout"]) |
| if attempt < 2 and is_retryable: |
| wait = 4 ** attempt |
| time.sleep(wait) |
| continue |
| yield json.dumps({ |
| "type": "error", |
| "model": model_id, |
| "role": role, |
| "text": f"Model error: {err_str[:120]}", |
| "phase": phase_num, |
| }) |
| return |
|
|
| response_text = "".join(full_response) |
| self.context.append({"phase": phase_num, "role": role, "content": response_text}) |
|
|
| |
|
|
| def _run_phases_1_to_7(self) -> Generator[str, None, None]: |
| prompts = { |
| 1: ( |
| f"Create an asset manifest for a '{self.theme} {self.hero}' platformer. " |
| f"List: hero design, 2 obstacles (1 ground, 1 aerial), 1 collectible, " |
| f"1 platform tile, 1 background prop, 3 decoratives. " |
| f"Be specific about blocky 3D geometry and provide a 5-color hex palette." |
| ), |
| 2: ( |
| f"Design the blocky hero character '{self.hero}' for theme '{self.theme}'. " |
| f"Describe head, body, limbs using box primitives only. " |
| f"Reference:\n{self._context_summary()}" |
| ), |
| 3: ( |
| f"Write Three.js geometry specifications (using BoxGeometry + MeshToonMaterial) " |
| f"for all assets in the manifest. Share materials by hex color. " |
| f"flatShading: true on all.\n" |
| f"Reference:\n{self._context_summary()}" |
| ), |
| 4: ( |
| f"Design the texture and color approach for '{self.theme}'. " |
| f"All textures MUST be canvas2D procedural (no image files). " |
| f"Provide makeXxxTexture() function sketches.\n" |
| f"Reference:\n{self._context_summary()}" |
| ), |
| 5: ( |
| f"Design platformer mechanics for '{self.theme} {self.hero}': " |
| f"movement speed, jump force, gravity (-22), maxFallSpeed (-22), laneWidth (1.5), " |
| f"platform tile recycling at z<-40 / z>5, collectible scoring, obstacle collision. " |
| f"Hero z MUST always be 0.\n" |
| f"Reference:\n{self._context_summary()}" |
| ), |
| 6: ( |
| f"Design the atmosphere for '{self.theme}': ambient/directional light colors, " |
| f"scene.background hex, fog color (MUST match background exactly), fog density. " |
| f"Reference:\n{self._context_summary()}" |
| ), |
| 7: ( |
| f"Write the COMPLETE Three.js platformer game as a single self-contained HTML file. " |
| f"HARD CONSTRAINTS:\n" |
| f"- CDN Three.js r167 from unpkg, no importmap, no <script type='module'>\n" |
| f"- Canvas2D procedural textures only (no TextureLoader with local paths)\n" |
| f"- Web Audio API oscillators for sound\n" |
| f"- hero.position.z ALWAYS 0\n" |
| f"- antialias: false\n" |
| f"- scene.fog color EXACTLY matches scene.background\n" |
| f"- Platform tiles recycle at z<-40 / z>5\n" |
| f"- All Vector3/Color pre-allocated before animation loop\n" |
| f"Theme: '{self.theme}', Hero: '{self.hero}'. " |
| f"Full spec:\n{self._context_summary()}" |
| ), |
| } |
|
|
| for phase_num, phase_name, role in PHASE_ROLES[:7]: |
| if self.cancel_flag.is_set(): |
| return |
| assert role is not None |
| yield self._phase_header(phase_num, phase_name, role) |
| yield from self._call_model(role, [{"role": "user", "content": prompts[phase_num]}], phase_num) |
| yield self._phase_footer(phase_num, phase_name) |
| yield self._commit(phase_num, f"Phase {phase_num} {phase_name} ✓") |
|
|
| def _run_phase_8(self) -> Generator[str, None, None]: |
| yield self._phase_header(8, "Coding Session", "Lead Coder + Geom Builder") |
| current_code = next( |
| (c["content"] for c in self.context if c["phase"] == 7), "" |
| ) |
|
|
| for round_num in range(1, 5): |
| if self.cancel_flag.is_set(): |
| return |
| if round_num % 2 == 1: |
| role = "Lead Coder" |
| prompt = ( |
| f"Round {round_num}/4 — Review and improve this Three.js platformer. " |
| f"Focus: game logic polish, hero animation smoothness, collectible feedback. " |
| f"If satisfied after round 3, emit [CODING_COMPLETE] at the end. " |
| f"Return the FULL updated HTML:\n\n{current_code}" |
| ) |
| else: |
| role = "Geom Builder" |
| prompt = ( |
| f"Round {round_num}/4 — Fix geometry/material sharing, " |
| f"texture application, and performance in this Three.js code. " |
| f"Return the FULL updated HTML:\n\n{current_code}" |
| ) |
|
|
| response_parts: list[str] = [] |
| for chunk_json in self._call_model(role, [{"role": "user", "content": prompt}], 8): |
| chunk = json.loads(chunk_json) |
| if chunk.get("type") == "text": |
| response_parts.append(chunk.get("text", "")) |
| yield chunk_json |
|
|
| current_code = "".join(response_parts) |
| yield self._commit(8, f"Round {round_num}/4 complete") |
| if "[CODING_COMPLETE]" in current_code and round_num >= 3: |
| break |
|
|
| self.context.append({"phase": 8, "role": "Coding Session", "content": current_code}) |
| self._final_code = current_code |
| yield self._phase_footer(8, "Coding Session") |
|
|
| def _run_phase_9(self) -> Generator[str, None, None]: |
| yield self._phase_header(9, "QA", None) |
| code = self._final_code |
| violations = self._qa_check(code) |
|
|
| if violations: |
| fix_brief = "Fix the following QA violations:\n" + "\n".join(f"- {v}" for v in violations) |
| prompt = f"{fix_brief}\n\nFull code:\n{code}" |
| response_parts: list[str] = [] |
| for chunk_json in self._call_model("Lead Coder", [{"role": "user", "content": prompt}], 9): |
| chunk = json.loads(chunk_json) |
| if chunk.get("type") == "text": |
| response_parts.append(chunk.get("text", "")) |
| yield chunk_json |
| code = "".join(response_parts) |
| self._final_code = code |
| remaining = self._qa_check(code) |
| else: |
| remaining = [] |
|
|
| self._qa_warnings = remaining |
| yield self._phase_footer(9, "QA") |
| yield json.dumps({"type": "done", "qa_warnings": remaining}) |
|
|
| def _qa_check(self, code: str) -> list[str]: |
| violations: list[str] = [] |
| if re.search(r'\bimport\s+', code): |
| violations.append("Contains ES module import statement") |
| if "importmap" in code: |
| violations.append("Contains importmap") |
| if '<script type="module"' in code or "<script type='module'" in code: |
| violations.append("Contains <script type='module'>") |
| if "new THREE.TextureLoader" in code and re.search(r'TextureLoader.*load\([\'"](?!https?://)', code): |
| violations.append("TextureLoader references local path (must use canvas2D or CDN)") |
| if re.search(r'hero\.position\.z\s*=\s*[^0]', code): |
| violations.append("hero.position.z is being set to non-zero (must always be 0)") |
| return violations |
|
|
| def _build_trace_html(self) -> str: |
| rows = "" |
| for c in self.context: |
| escaped = c["content"].replace("&", "&").replace("<", "<").replace(">", ">") |
| elapsed = "" |
| if c["phase"] in self.phase_start_times: |
| next_start = min( |
| (t for p, t in self.phase_start_times.items() if p > c["phase"]), |
| default=None, |
| ) |
| if next_start: |
| secs = int(next_start - self.phase_start_times[c["phase"]]) |
| elapsed = f" ({secs}s)" |
| rows += ( |
| f"<tr><td>{c['phase']}</td><td>{c['role']}{elapsed}</td>" |
| f"<td><pre>{escaped[:3000]}</pre></td></tr>\n" |
| ) |
| warnings_html = "" |
| if self._qa_warnings: |
| warnings_html = ( |
| "<h2 style='color:#c0392b'>QA Warnings</h2><ul>" |
| + "".join(f"<li>{w}</li>" for w in self._qa_warnings) |
| + "</ul>" |
| ) |
| return f"""<!DOCTYPE html> |
| <html><head><meta charset="utf-8"> |
| <title>IVDS Trace — {self.theme} {self.hero}</title> |
| <style> |
| body{{font-family:monospace;padding:1rem;background:#0d0d0d;color:#e0e0e0}} |
| h1{{color:#7c3aed}} h2{{color:#e67e22}} |
| table{{border-collapse:collapse;width:100%}} |
| td,th{{border:1px solid #333;padding:.5rem;vertical-align:top}} |
| th{{background:#1a1a2e;color:#7c3aed}} |
| pre{{white-space:pre-wrap;margin:0;font-size:0.78rem;color:#b0e0b0}} |
| </style> |
| </head><body> |
| <h1>Immersive Vibe Development Studio — Trace</h1> |
| <p>Theme: <strong>{self.theme}</strong> | Hero: <strong>{self.hero}</strong></p> |
| {warnings_html} |
| <table> |
| <tr><th>Phase</th><th>Role</th><th>Output (first 3000 chars)</th></tr> |
| {rows} |
| </table> |
| </body></html>""" |
|
|