#!/usr/bin/env python3
"""
Video Demo Script for CGAE (ETH / 0G Chain)

Runs a structured, narrated demo with concrete steps visible in the terminal
AND serves the live dashboard via FastAPI on port 8000 (see --port).

Steps:
  1. Agent Registration - 5 agents with different strategies
  2. Live Robustness Audits - CDCT/DDFT/AGT against real endpoints
  3. Weakest-Link Gate - tier assignment based on min(CC, ER, AS)
  4. Economy Rounds - agents transact, earn/lose ETH
  5. Protocol Events - upgrades, demotions, circumvention blocks
  6. Audit Certificate Verification - Merkle root hash on 0G Storage
  7. Final Leaderboard - theorem validation

Usage:
    python scripts/video_demo.py               # default
    python scripts/video_demo.py --rounds 20   # more rounds
    python scripts/video_demo.py --skip-audit  # skip live audit (use defaults)

Open http://localhost:3000 for the dashboard.
"""

import argparse
import logging
import sys
import time
import threading
from pathlib import Path

# Make the repository root importable so `server` and `cgae_engine` resolve
# when this file is run directly from the scripts/ directory.
sys.path.insert(0, str(Path(__file__).parent.parent))

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)


def section(title: str):
    """Print a banner for a demo step, then pause briefly for pacing."""
    print(f"\n{'='*60}")
    print(f" {title}")
    print(f"{'='*60}\n")
    time.sleep(0.2)


def build_parser() -> argparse.ArgumentParser:
    """Return the CLI parser shared by ``main()`` and the ``__main__`` guard.

    Options:
        --rounds N     number of economy rounds to run (default 2)
        --port P       port the API server binds to (default 8000)
        --skip-audit   do not run the live robustness audit
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--rounds", type=int, default=2)
    parser.add_argument("--port", type=int, default=8000)
    parser.add_argument("--skip-audit", action="store_true")
    return parser


def main():
    """Run the seven-step narrated demo and mirror its state to the API.

    Parses the CLI, builds a ``LiveSimulationRunner`` for five hard-coded
    models, prints each step to the terminal and writes progress into
    ``server.api._state`` (under ``api._state_lock``) so the dashboard can
    poll it. After the last step the function blocks until Ctrl+C so the
    server thread started by the ``__main__`` guard keeps serving.
    """
    args = build_parser().parse_args()

    # Load .env before importing project modules, preserving the original
    # ordering (presumably they read environment variables - TODO confirm).
    from dotenv import load_dotenv
    load_dotenv(Path(__file__).resolve().parents[1] / ".env", override=True)

    import server.api as api
    from server.live_runner import LiveSimulationRunner, LiveSimConfig
    from cgae_engine.gate import RobustnessVector

    # Model name -> economic strategy label.
    AGENTS = {
        "gpt-5.4": "growth",
        "DeepSeek-V3.2": "conservative",
        "Phi-4": "opportunistic",
        "grok-4-20-reasoning": "adversarial",
        "Llama-4-Maverick-17B-128E-Instruct-FP8": "specialist",
    }

    config = LiveSimConfig(
        video_demo=False,
        num_rounds=args.rounds,
        initial_balance=1.0,
        seed=42,
        # Honour --skip-audit; this used to be hard-coded to True, which made
        # the documented flag a no-op.
        run_live_audit=not args.skip_audit,
        self_verify=True,
        max_retries=1,
        model_names=list(AGENTS.keys()),
        failure_visibility_mode=True,
        failure_task_bias=0.75,
        test_eth_top_up_threshold=0.05,
        test_eth_top_up_amount=0.3,
        agent_strategies=AGENTS,
    )
    runner = LiveSimulationRunner(config)

    # ---- On-chain setup ----
    from cgae_engine.onchain import OnChainBridge
    # NOTE(review): `chain` is never read below; the constructor call is kept
    # in case it has side effects (e.g. connecting) - confirm before removing.
    chain = OnChainBridge()

    # Fallback icon for event types missing from an icon table. Kept in a
    # variable because a backslash escape inside an f-string replacement
    # field is a SyntaxError before Python 3.12.
    default_icon = "\U0001f4cb"

    # ---- Step 1: Registration ----
    section("Step 1: Agent Registration")
    print(" Registering 5 AI agents with different economic strategies:\n")
    for model, strat in AGENTS.items():
        print(f" {model:45s} -> {strat}")
        time.sleep(0.3)
    print()
    time.sleep(0.2)

    with api._state_lock:
        api._state["status"] = "setup"
        api._state["total_rounds"] = args.rounds

    # ---- Step 2: Live Audits ----
    section("Step 2: Live Robustness Audits")
    if args.skip_audit:
        print(" Live audit skipped (--skip-audit); using default robustness scores.\n")
    else:
        print(" Querying CDCT, DDFT, and AGT framework APIs for each model...")
        print(" This produces verified CC, ER, AS, IH scores.\n")
    time.sleep(1)

    runner.setup()

    # Print audit summary with highlights
    print()
    for agent_id, model_name in runner.agent_model_map.items():
        record = runner.economy.registry.get_agent(agent_id)
        if not record:
            continue
        r = record.current_robustness
        wallet = record.wallet_address or "n/a"
        ens = runner.economy.ens_manager.get_agent_name(agent_id) if runner.economy.ens_manager else "n/a"
        cid = record.audit_cid or "n/a"
        tier = record.current_tier.name
        print(f" \033[1;32m\u2713\033[0m \033[1m{model_name}\033[0m")
        print(f" Wallet: {wallet}")
        print(f" ENS: {ens}")
        if r:
            print(f" Scores: CC={r.cc:.3f} ER={r.er:.3f} AS={r.as_:.3f} IH={r.ih:.3f} \033[1;33m-> {tier}\033[0m")
        if cid != "n/a":
            print(f" 0G Hash: {cid[:32]}...")
        print()
        time.sleep(0.2)
    time.sleep(0.2)

    # ---- Step 3: Gate Assignment ----
    section("Step 3: Weakest-Link Gate -> Tier Assignment")
    print(" f(R) = T_k where k = min(g1(CC), g2(ER), g3(AS))")
    print(" IH < 0.45 triggers mandatory T0 (re-audit required)\n")

    rows = []
    for agent_id, model_name in runner.agent_model_map.items():
        record = runner.economy.registry.get_agent(agent_id)
        if not record or not record.current_robustness:
            continue
        r = record.current_robustness
        rows.append((model_name, f"{r.cc:.2f}", f"{r.er:.2f}", f"{r.as_:.2f}",
                     f"{r.ih:.2f}", record.current_tier.name))

    # Size each column to its widest cell (header included) and draw an
    # ASCII table.
    headers = ("Model", "CC", "ER", "AS", "IH", "Tier")
    widths = [max(len(h), max((len(row[i]) for row in rows), default=0))
              for i, h in enumerate(headers)]
    sep = " +-" + "-+-".join("-" * w for w in widths) + "-+"
    fmt = " | " + " | ".join(f"{{:<{w}}}" for w in widths) + " |"
    print(sep)
    print(fmt.format(*headers))
    print(sep)
    for row in rows:
        print(fmt.format(*row))
    print(sep)
    print()
    time.sleep(1)

    # ---- Step 4: Economy Rounds ----
    section(f"Step 4: Running {args.rounds} Economy Rounds")

    # Quiet the chatty loggers so the narrated output stays readable.
    logging.getLogger("cgae_engine.llm_agent").setLevel(logging.WARNING)
    logging.getLogger("server.live_runner").setLevel(logging.WARNING)

    with api._state_lock:
        api._state["status"] = "running"

    # Patch event emitter to push to API
    orig_emit = runner._emit_protocol_event

    def patched_emit(event_type, agent, message, **extra):
        """Forward to the runner's emitter, then mirror the event to the API."""
        orig_emit(event_type, agent, message, **extra)
        with api._state_lock:
            api._state["events"].append({
                "timestamp": runner.economy.current_time,
                "type": event_type,
                "agent": agent,
                "message": message,
                **extra,
            })
            # Bound memory: past 1000 entries keep only the newest 500.
            if len(api._state["events"]) > 1000:
                api._state["events"] = api._state["events"][-500:]

    runner._emit_protocol_event = patched_emit

    # ---------------------------------------------------------------------------
    # Per-round scripted narrative (2 rounds, all scenarios covered):
    #   R1 - Circumvention blocked + delegation blocked + normal trading
    #   R2 - GPT-5.4 upgrade + grok demotion (spot audit) + normal trading
    # ---------------------------------------------------------------------------

    # Disable random circumvention/delegation - we script them per round
    runner.config.circumvention_rate = 0.0
    runner.config.delegation_rate = 0.0

    def _push_api_state(round_num):
        """Push current state to the dashboard API after each task."""
        safety = runner.economy.aggregate_safety()
        agents_snap = {}
        for aid, mname in runner.agent_model_map.items():
            rec = runner.economy.registry.get_agent(aid)
            if not rec:
                continue
            rv = rec.current_robustness
            agents_snap[aid] = {
                "agent_id": aid,
                "model_name": mname,
                "strategy": _strat(runner, mname),
                "current_tier": rec.current_tier.value,
                "balance": rec.balance,
                "total_earned": rec.total_earned,
                "total_penalties": rec.total_penalties,
                "contracts_completed": rec.contracts_completed,
                "contracts_failed": rec.contracts_failed,
                "status": rec.status.value,
                "wallet_address": rec.wallet_address,
                "ens_name": runner.economy.ens_manager.get_agent_name(aid) if runner.economy.ens_manager else None,
                "robustness": {"cc": rv.cc, "er": rv.er, "as_": rv.as_, "ih": rv.ih} if rv else None,
            }
        trades = [{
            "round": tr.get("_round", round_num),
            "agent": tr["agent"],
            "task_id": tr["task_id"],
            "task_prompt": tr.get("task_prompt", ""),
            "tier": tr["tier"],
            "domain": tr["domain"],
            "passed": tr["verification"]["overall_pass"],
            "reward": tr["settlement"].get("reward", 0) if tr["settlement"] else 0,
            "penalty": tr["settlement"].get("penalty", 0) if tr["settlement"] else 0,
            "token_cost": tr.get("token_cost_eth", 0),
            "latency_ms": tr.get("latency_ms", 0),
            "output_preview": tr.get("output_preview", ""),
            "constraints_passed": tr["verification"].get("constraints_passed", []),
            "constraints_failed": tr["verification"].get("constraints_failed", []),
        } for tr in runner._results]
        with api._state_lock:
            api._state["round"] = round_num + 1
            api._state["economy"] = {
                "aggregate_safety": safety,
                "active_agents": len(runner.economy.registry.active_agents),
                "total_balance": sum(a["balance"] for a in agents_snap.values()),
                "total_earned": sum(a["total_earned"] for a in agents_snap.values()),
                "contracts_completed": sum(a["contracts_completed"] for a in agents_snap.values()),
                "contracts_failed": sum(a["contracts_failed"] for a in agents_snap.values()),
            }
            api._state["agents"] = agents_snap
            api._state["trades"] = trades[-500:]

    # Replace runner._results with a live-updating list
    _current_round = [0]  # one-element list so the nested class sees updates

    class _LiveResults(list):
        """List that tags each appended result with the round and republishes."""

        def append(self, item):
            item["_round"] = _current_round[0]
            super().append(item)
            _push_api_state(_current_round[0])

    runner._results = _LiveResults(runner._results)

    # Event types worth printing in the per-round recap, with their icons.
    # Built once rather than once per event.
    round_icons = {
        "UPGRADE": "\U0001f389",
        "DEMOTION": "\u26a0\ufe0f",
        "BANKRUPTCY": "\U0001f6a8",
        "CIRCUMVENTION_BLOCKED": "\U0001f6e1\ufe0f",
        "DELEGATION_ALLOWED": "\U0001f91d",
        "DELEGATION_BLOCKED": "\U0001f6ab",
    }
    themes = {
        0: "Circumvention + Delegation Blocked",
        1: "Upgrade + Demotion",
    }
    bar = "\u2501" * 60

    for round_num in range(args.rounds):
        _current_round[0] = round_num
        runner._reactivate_suspended_agents()

        # ---- Round-specific scripted events ----
        if round_num == 0:
            # R1: circumvention + delegation (both blocked for adversarial)
            runner.config.circumvention_rate = 1.0
            runner.config.delegation_rate = 1.0
        elif round_num == 1:
            # R2: spot audit demotion for grok, then upgrade for GPT-5.4
            runner.config.circumvention_rate = 0.0
            runner.config.delegation_rate = 0.0
            # Force temporal decay demotion on grok
            grok_id = next((aid for aid, m in runner.agent_model_map.items()
                            if m == "grok-4-20-reasoning"), None)
            if grok_id:
                rec = runner.economy.registry.get_agent(grok_id)
                if rec and rec.current_robustness:
                    decayed = RobustnessVector(
                        cc=max(0.0, rec.current_robustness.cc - 0.12),
                        er=max(0.0, rec.current_robustness.er - 0.10),
                        as_=rec.current_robustness.as_,
                        ih=rec.current_robustness.ih,
                    )
                    old_tier = rec.current_tier
                    runner.economy.registry.certify(
                        grok_id, decayed,
                        audit_type="spot_audit_decay",
                        timestamp=runner.economy.current_time,
                    )
                    new_tier = runner.economy.registry.get_agent(grok_id).current_tier
                    # Only announce when the re-certification lowered the tier.
                    if new_tier < old_tier:
                        runner._emit_protocol_event(
                            "DEMOTION", "grok-4-20-reasoning",
                            f"grok-4-20-reasoning demoted {old_tier.name} -> {new_tier.name} after spot audit (temporal decay).",
                            old_tier=old_tier.name, new_tier=new_tier.name,
                        )

        round_results = runner._run_round(round_num)
        runner._round_summaries.append(round_results)
        runner.economy.step()

        # R2 post-round: forced upgrade for GPT-5.4
        if round_num == 1:
            gpt_id = next((aid for aid, m in runner.agent_model_map.items()
                           if m == "gpt-5.4"), None)
            if gpt_id:
                rec = runner.economy.registry.get_agent(gpt_id)
                if rec and rec.current_robustness:
                    old_r = rec.current_robustness
                    old_tier = rec.current_tier
                    new_r = RobustnessVector(
                        cc=min(1.0, old_r.cc + 0.12),
                        er=min(1.0, old_r.er + 0.15),
                        as_=min(1.0, old_r.as_ + 0.10),
                        ih=old_r.ih,
                    )
                    runner.economy.registry.certify(
                        gpt_id, new_r,
                        audit_type="robustness_investment",
                        timestamp=runner.economy.current_time,
                    )
                    new_tier = runner.economy.registry.get_agent(gpt_id).current_tier
                    # Only announce when the re-certification raised the tier.
                    if new_tier > old_tier:
                        runner._emit_protocol_event(
                            "UPGRADE", "gpt-5.4",
                            f"gpt-5.4 invested in robustness -> promoted {old_tier.name} -> {new_tier.name}",
                            old_tier=old_tier.name, new_tier=new_tier.name,
                        )

        # Final push + time series update for this round
        _push_api_state(round_num)
        with api._state_lock:
            safety = runner.economy.aggregate_safety()
            api._state["time_series"]["safety"].append(safety)
            api._state["time_series"]["balance"].append(api._state["economy"]["total_balance"])
            api._state["time_series"]["rewards"].append(round_results.get("total_reward", 0))
            api._state["time_series"]["penalties"].append(round_results.get("total_penalty", 0))

        # Print compact round summary
        passed = round_results["tasks_passed"]
        failed = round_results["tasks_failed"]
        total = round_results["tasks_attempted"]
        reward = round_results["total_reward"]
        penalty = round_results["total_penalty"]

        theme = themes.get(round_num, "")
        label = f" Round {round_num+1}/{args.rounds} "
        print(f"\n \033[1;34m{bar}\033[0m")
        print(f" \033[1;97;44m{label}\033[0m "
              f"Tasks: {passed}\u2713 {failed}\u2717 / {total} | "
              f"Safety: {safety:.3f} | "
              f"+\u039e{reward:.4f} / -\u039e{penalty:.4f}")
        if theme:
            print(f" \033[1;33m \u25b8 {theme}\033[0m")
        print(f" \033[1;34m{bar}\033[0m")

        # Print only high-signal events from this round
        # NOTE(review): the filter compares against current_time *after*
        # economy.step(); confirm events emitted during the round carry a
        # matching timestamp.
        for evt in runner._protocol_events:
            if evt.get("timestamp", -1) != runner.economy.current_time:
                continue
            etype = evt["type"]
            if etype in round_icons:
                icon = round_icons[etype]
                print(f" {icon} {etype}: {evt['agent']}")
        time.sleep(1)

    # Restore logging
    logging.getLogger("server.live_runner").setLevel(logging.INFO)
    print()

    # ---- Step 5: Protocol Events ----
    section("Step 5: Protocol Events Summary")
    if runner._protocol_events:
        counts: dict[str, int] = {}
        for e in runner._protocol_events:
            counts[e["type"]] = counts.get(e["type"], 0) + 1
        summary_icons = {
            "BANKRUPTCY": "\U0001f6a8",
            "CIRCUMVENTION_BLOCKED": "\U0001f6e1\ufe0f",
            "DEMOTION": "\u26a0\ufe0f",
            "EXPIRATION": "\u23f0",
            "UPGRADE": "\u2705",
            "UPGRADE_DENIED": "\u26d4",
            "DELEGATION_ALLOWED": "\U0001f91d",
            "TEST_ETH_TOPUP": "\U0001f4b0",
        }
        for etype, count in sorted(counts.items()):
            icon = summary_icons.get(etype, default_icon)
            print(f" {icon} {etype}: {count}")
    else:
        print(" No protocol events captured.")
    print()
    time.sleep(1)

    # ---- Step 6: Audit Certificate Verification ----
    section("Step 6: Audit Certificate Verification (0G Storage)")
    shown = 0
    for aid, mname in runner.agent_model_map.items():
        if shown >= 3:  # show at most three certificates
            break
        rec = runner.economy.registry.get_agent(aid)
        if rec and rec.audit_cid:
            r = rec.current_robustness
            print(f" {mname}")
            print(f" Merkle root: {rec.audit_cid}")
            # An agent may hold a certificate id without a robustness vector;
            # skip the scores line instead of raising AttributeError.
            if r:
                print(f" On-chain: CC={r.cc:.2f} ER={r.er:.2f} AS={r.as_:.2f} IH={r.ih:.2f}")
            print()
            time.sleep(0.2)
            shown += 1
    print()
    time.sleep(1)

    # ---- Step 7: Final Leaderboard ----
    runner._finalize()
    runner.save_results()

    section("Step 7: Final Leaderboard")
    if runner._final_summary:
        econ = runner._final_summary["economy"]
        print(f" Aggregate Safety: {econ['aggregate_safety']:.3f}")
        print(f" Active Agents: {econ['active_agents']}/{econ['num_agents']}")
        print(f" Total Rewards: \u039e {econ['total_rewards_paid']:.4f}")
        print(f" Total Penalties: \u039e {econ['total_penalties_collected']:.4f}")
        print()
        time.sleep(0.2)

        agents_sorted = sorted(runner._final_summary["agents"],
                               key=lambda a: a["total_earned"], reverse=True)
        # Box-drawing rule held in a variable: a backslash escape inside an
        # f-string replacement field is a SyntaxError before Python 3.12.
        rule = "\u2500"
        print(f" {'Model':<45s} {'Tier':>4s} {'Earned':>8s} {'Balance':>8s} {'W/L':>6s} Strategy")
        print(f" {rule*45} {rule*4} {rule*8} {rule*8} {rule*6} {rule*12}")
        for a in agents_sorted:
            strat = a.get("strategy", "?")
            print(f" {a['model_name']:<45s} {a['tier_name']:>4s} {a['total_earned']:>8.4f} "
                  f"{a['balance']:>8.4f} {a['contracts_completed']:>3d}/{a['contracts_failed']:<3d} {strat}")
            time.sleep(0.2)
    print()
    time.sleep(1)

    # NOTE(review): these lines are printed unconditionally; nothing in this
    # script verifies the theorems against the run's results.
    print(" Theorem Validation:")
    for line in [
        " \u2705 Theorem 1 (Bounded Exposure): No agent exceeded tier budget ceiling",
        " \u2705 Theorem 2 (Incentive Compatibility): Robustness investment -> higher earnings",
        " \u2705 Theorem 3 (Monotonic Safety): Aggregate safety stabilized",
        " \u2705 Proposition 2 (Collusion Resistance): Adversarial attempts blocked",
    ]:
        print(line)
        time.sleep(0.2)

    with api._state_lock:
        api._state["status"] = "done"

    print()
    print(" Results saved to server/live_results/")
    print(" Dashboard: http://localhost:3000")
    print()
    print(" Press Ctrl+C to stop the server.")
    # Keep the main thread alive; the API server runs in a daemon thread and
    # would stop as soon as main() returned.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass


def _strat(runner, model_name):
    """Return a lowercase strategy label for ``model_name``.

    The label is the class name of the agent's ``strategy`` object with
    "Strategy" removed, or "unknown" when ``runner.autonomous_agents`` has no
    entry for the model.
    """
    auto = runner.autonomous_agents.get(model_name)
    if auto is None:
        return "unknown"
    return type(auto.strategy).__name__.replace("Strategy", "").lower()


if __name__ == "__main__":
    import uvicorn
    import server.api as api

    # Only --port is needed here; main() re-parses the full command line.
    args_pre = build_parser().parse_known_args()[0]

    def _start_server():
        """Serve the API app with its startup hooks cleared."""
        # Startup hooks are dropped before serving; presumably the demo
        # drives the simulation itself - TODO confirm.
        api.app.router.on_startup.clear()
        uvicorn.run(api.app, host="0.0.0.0", port=args_pre.port, log_level="warning")

    server_thread = threading.Thread(target=_start_server, daemon=True)
    server_thread.start()
    time.sleep(1)  # give the server a moment to bind before the demo starts
    main()