#!/usr/bin/env python3 """ Live inference script for Swarm Incident Response OpenEnv. MANDATORY STDOUT FORMAT: [START] task= env= model= [STEP] step= action= reward=<0.00> done= error= [END] success= steps= score= rewards= """ from __future__ import annotations import json import os import re import shutil import sys import textwrap from dataclasses import dataclass from pathlib import Path from typing import Any, Optional import requests from openai import OpenAI from swarm_openenv_env.environment import IncidentResponseEnv from swarm_openenv_env.models import IncidentAction, IncidentObservation from swarm_openenv_env.tasks import TASKS, get_task, list_task_ids OPENAI_BASE_URL = os.getenv("API_BASE_URL") or "https://api.openai.com/v1" OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") or os.getenv("HF_TOKEN") or "" GEMINI_API_KEY = ( os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY") or os.getenv("GEMINI_TOKEN") or "" ) REPO_ROOT = Path(__file__).resolve().parent TRAINED_GGUF_PATH = Path( os.getenv( "MODEL_PATH", str( REPO_ROOT / "models" / "aryan-gupta-2010" / "meta_hackthon_2010_2026" / "Llama-3.1-8B-Instruct.Q4_K_M.gguf" ), ) ) def _normalize_local_openai_base_url(raw_url: str) -> str: url = (raw_url or "").strip().rstrip("/") if not url: return "http://127.0.0.1:1234/v1" if not url.endswith("/v1"): url = f"{url}/v1" return url LOCAL_OPENAI_BASE_URL = _normalize_local_openai_base_url( os.getenv("LOCAL_OPENAI_BASE_URL") or "http://127.0.0.1:1234" ) LOCAL_OPENAI_API_KEY = os.getenv("LOCAL_OPENAI_API_KEY") or "lm-studio" MODEL_NAME = os.getenv("MODEL_NAME") or "meta_hackthon_2010_2026" LLM_PROVIDER = (os.getenv("LLM_PROVIDER") or "").strip().lower() PROMPT = os.getenv("PROMPT") or os.getenv("INCIDENT_PROMPT") or "" TASK_OVERRIDE = os.getenv("TASK") or "" ALLOW_SCRIPTED_BASELINE = os.getenv("ALLOW_SCRIPTED_BASELINE", "").strip().lower() in { "1", "true", "yes", } BENCHMARK = "swarm-incident-response-openenv" # When running inside a Hugging Face Space, backend/main.py listens on $PORT # (default 7860 in the Dockerfile) instead of the local-dev default of 8000. # Honor PORT so inference.py reaches the in-Space bridge without code changes. _BRIDGE_PORT = (os.getenv("FRONTEND_BRIDGE_PORT") or os.getenv("PORT") or "8000").strip() or "8000" FRONTEND_BRIDGE_URL = ( os.getenv("FRONTEND_BRIDGE_URL") or f"http://127.0.0.1:{_BRIDGE_PORT}/api/openenv/bridge" ) ENABLE_FRONTEND_BRIDGE = os.getenv("ENABLE_FRONTEND_BRIDGE", "1").strip().lower() not in { "0", "false", "no", } SYSTEM_PROMPT = """You are FrontierLabs Swarm-OS, an elite incident-response operator for real enterprise failures. Act like a calm senior production engineer: inspect evidence first, communicate clearly, use safe remediations, and close only when the incident is actually controlled. You interact with a structured environment. Respond with ONLY valid JSON matching this schema: { "operation": "inspect_artifact | open_ticket | send_status_update | propose_fix | resolve_incident", "target": "artifact or workflow target", "content": "free-text workflow content" } Rules: - Inspect relevant artifacts before proposing a fix. - Move with executive urgency but never skip evidence collection. - Tickets must mention severity and business impact. - Status updates should mention impact and ETA when needed. - Respect the live budget telemetry and prefer remediations that control cost when FinOps pressure is high. - Fixes and resolutions must be safe, auditable, and specific. - Prefer production-ready language: rollback, compatibility layer, monitored rollout, validation, blast radius, recovery. - No markdown, no code fences, no explanations outside the JSON object. """ TASK_SELECTION_PROMPT = """You are routing a user incident prompt into one OpenEnv task. Return ONLY valid JSON: {"task_id":"task_easy_gpu_oom"|"task_medium_schema_drift"|"task_hard_canary_regression"} """ RECOMMENDED_PROMPTS: dict[str, str] = { "task_easy_gpu_oom": ( "CRITICAL INCIDENT: A single-GPU production fine-tuning job is breaching memory and the " "business needs an audit-safe rescue plan now. Investigate the evidence like a senior ML " "platform engineer, open a properly prioritized incident, communicate impact, and propose a " "strict single-GPU remediation using mixed precision, checkpointing, or safer memory controls. " "Do not hand-wave. Make every action sound production-ready and credible." ), "task_medium_schema_drift": ( "SEV-2 DATA INCIDENT: Customer-facing dashboards are stale after an upstream schema change. " "Operate like a data platform incident commander: inspect the schema evidence, explain the blast " "radius, communicate ETA to stakeholders, and propose a backward-compatible fix with validation " "and backfill steps that would pass an engineering review." ), "task_hard_canary_regression": ( "SEV-1 CANARY FAILURE: A checkout canary is causing customer-visible latency and errors in eu-west. " "Respond like a production SRE under pressure: inspect deploy evidence, confirm impact, post a " "stakeholder update, recommend the safest rollback or feature-flag mitigation, and close only with " "monitored recovery language that sounds real." ), } GENERAL_HERO_PROMPT = ( "You are demonstrating a frontier OpenEnv benchmark for Theme 3.1 Professional Tasks. " "Behave like a world-class production incident responder working inside a real enterprise system. " "Use evidence, escalation discipline, safe remediation, and crisp executive communication." ) TASK_POLICIES: dict[str, list[dict[str, str]]] = { "task_easy_gpu_oom": [ {"operation": "inspect_artifact", "target": "telemetry", "content": ""}, {"operation": "inspect_artifact", "target": "runbook", "content": ""}, { "operation": "open_ticket", "target": "incident_ticket", "content": ( "P1 OOM on single GPU training job. Business impact: nightly fine-tune " "missed SLA and single GPU memory is exhausted." ), }, { "operation": "propose_fix", "target": "remediation_plan", "content": ( "Enable torch.autocast mixed precision, add gradient checkpoint support, " "reduce peak memory on the single GPU, and monitor VRAM after rollout." ), }, { "operation": "resolve_incident", "target": "closure_note", "content": ( "Memory usage is reduced for the single GPU workload. Continue to monitor " "VRAM and batch-size stability after the change." ), }, ], "task_medium_schema_drift": [ {"operation": "inspect_artifact", "target": "schema_diff", "content": ""}, {"operation": "inspect_artifact", "target": "job_log", "content": ""}, {"operation": "inspect_artifact", "target": "runbook", "content": ""}, { "operation": "open_ticket", "target": "incident_ticket", "content": ( "P2 schema drift broke the dashboard pipeline. Business impact: customer " "dashboard numbers are stale." ), }, { "operation": "send_status_update", "target": "stakeholders", "content": ( "Dashboards are stale. ETA 45 minutes while we apply a compatible fix " "and validate downstream models." ), }, { "operation": "propose_fix", "target": "remediation_plan", "content": ( "Add a compatibility mapping from state back to status, backfill the " "missing partition, and keep downstream reads compatible." ), }, { "operation": "resolve_incident", "target": "closure_note", "content": ( "Backfill completed, downstream validation passed, and the compatible " "mapping is in place." ), }, ], "task_hard_canary_regression": [ {"operation": "inspect_artifact", "target": "latency_chart", "content": ""}, {"operation": "inspect_artifact", "target": "deploy_diff", "content": ""}, {"operation": "inspect_artifact", "target": "runbook", "content": ""}, { "operation": "open_ticket", "target": "incident_ticket", "content": ( "P1 checkout latency regression. Business impact: checkout timeouts " "for customers in eu-west." ), }, { "operation": "send_status_update", "target": "stakeholders", "content": ( "Customer impact confirmed in eu-west. Rolling back the canary now; " "next ETA update in 15 minutes." ), }, { "operation": "propose_fix", "target": "remediation_plan", "content": ( "Rollback the canary, disable the feature flag, and monitor latency " "and error-rate recovery before re-enabling anything." ), }, { "operation": "resolve_incident", "target": "closure_note", "content": ( "Rollback recovered the service. Monitoring remains active to verify " "latency and error rates stay healthy." ), }, ], } @dataclass class LLMClient: provider: str model_name: str openai_client: Optional[OpenAI] = None gemini_api_key: str = "" base_url: str = "" model_path: str = "" def configured_provider_order() -> list[str]: if LLM_PROVIDER in {"openai", "gemini", "local"}: return [LLM_PROVIDER] order: list[str] = [] if TRAINED_GGUF_PATH.exists(): order.append("local") if OPENAI_API_KEY: order.append("openai") if GEMINI_API_KEY: order.append("gemini") if not order and MODEL_NAME.lower().startswith("gemini") and GEMINI_API_KEY: order.append("gemini") return order def available_provider_names() -> list[str]: return configured_provider_order() def detect_provider() -> str: order = configured_provider_order() return order[0] if order else "" def create_clients() -> list[LLMClient]: clients: list[LLMClient] = [] for provider in configured_provider_order(): if provider == "local": clients.append( LLMClient( provider="local", model_name=MODEL_NAME or TRAINED_GGUF_PATH.stem, openai_client=OpenAI( base_url=LOCAL_OPENAI_BASE_URL, api_key=LOCAL_OPENAI_API_KEY, ), base_url=LOCAL_OPENAI_BASE_URL, model_path=str(TRAINED_GGUF_PATH), ) ) elif provider == "openai" and OPENAI_API_KEY: clients.append( LLMClient( provider="openai", model_name=MODEL_NAME, openai_client=OpenAI( base_url=OPENAI_BASE_URL, api_key=OPENAI_API_KEY, ), base_url=OPENAI_BASE_URL, ) ) elif provider == "gemini" and GEMINI_API_KEY: clients.append( LLMClient( provider="gemini", model_name=MODEL_NAME, gemini_api_key=GEMINI_API_KEY, ) ) return clients def create_client() -> Optional[LLMClient]: clients = create_clients() return clients[0] if clients else None def extract_json_object(text: str) -> dict[str, Any]: cleaned = text.strip() cleaned = re.sub(r"```(?:json)?\s*", "", cleaned) cleaned = re.sub(r"```", "", cleaned) decoder = json.JSONDecoder() for index, char in enumerate(cleaned): if char != "{": continue try: obj, _ = decoder.raw_decode(cleaned[index:]) except json.JSONDecodeError: continue if isinstance(obj, dict): return obj raise ValueError("No valid JSON object found in model output.") def compact_action_dict(action: IncidentAction) -> dict[str, Any]: payload = action.model_dump() if not payload.get("metadata"): payload.pop("metadata", None) return payload def clean_error_message(error: str) -> str: return re.sub(r"\s+", " ", str(error)).strip()[:220] def _bridge_task_scenario_id(task_id: str) -> str: return { "task_easy_gpu_oom": "primary", "task_medium_schema_drift": "schema_drift", "task_hard_canary_regression": "canary_regression", }.get(task_id, task_id) def _bridge_post(messages: list[dict[str, Any]], reset: bool = False) -> None: if not ENABLE_FRONTEND_BRIDGE or not messages: return payload = {"reset": reset, "messages": messages} try: requests.post(FRONTEND_BRIDGE_URL, json=payload, timeout=0.6) except Exception: pass def _bridge_telemetry_payload(observation: IncidentObservation) -> dict[str, Any]: telemetry = observation.telemetry or {} sandbox = observation.sandbox_result or {} # Prefer actual sandbox peak VRAM (MB) over telemetry vram_gb field vram_mb = ( sandbox.get("vram_peak_mb") or int(float(telemetry.get("vram_gb") or 0.0) * 1024) or 0 ) return { "ram": int(telemetry.get("ram_mb") or 0), "vram": vram_mb, "network": int(telemetry.get("network_pct") or 0), "cpu": int(telemetry.get("cpu_pct") or 0), "containerStatus": telemetry.get("container_status") or "idle", "clusterStatus": telemetry.get("cluster_status") or "unknown", "sla_remaining_seconds": int(telemetry.get("sla_remaining_seconds") or 0), "budget_remaining_usd": float(telemetry.get("budget_remaining_usd") or 0.0), "budget_limit_usd": observation.budget_limit_usd, "cost_accrued_usd": float(observation.cost_accrued_usd or 0.0), "hourly_burn_usd": float(telemetry.get("hourly_burn_usd") or 0.0), "budget_status": telemetry.get("budget_status") or "unknown", "validator_runtime": observation.validator_runtime or telemetry.get("validator_runtime") or {}, "last_validator_status": (observation.sandbox_result or {}).get("status"), "validator_detail": (observation.sandbox_result or {}).get("validator_detail") or (observation.validator_runtime or {}).get("detail"), "validation_scope": (observation.validator_runtime or {}).get("validation_scope"), "gpu_metrics_applicable": bool((observation.validator_runtime or {}).get("gpu_metrics_applicable")), "sandbox_result": observation.sandbox_result or {}, "execution_logs": observation.execution_logs or [], } def _bridge_preflight_payload(observation: IncidentObservation) -> dict[str, Any]: telemetry = observation.telemetry or {} validator_runtime = observation.validator_runtime or telemetry.get("validator_runtime") or {} budget_remaining = telemetry.get("budget_remaining_usd") return { "budget": not isinstance(budget_remaining, (int, float)) or float(budget_remaining) > 0.0, "spof": bool(validator_runtime.get("ready")), "sla": int(telemetry.get("sla_remaining_seconds") or 0) > 0, } def _bridge_chat_payload( action: IncidentAction, observation: IncidentObservation, ) -> dict[str, Any]: sandbox = observation.sandbox_result or {} if action.operation == "inspect_artifact": m2m = f"INSPECT | {action.target.upper()} | EVIDENCE_CAPTURED" elif action.operation == "open_ticket": m2m = "OPEN_TICKET | INCIDENT_LOGGED" elif action.operation == "send_status_update": m2m = "STATUS_UPDATE | STAKEHOLDERS_NOTIFIED" elif action.operation == "propose_fix": label = sandbox.get("validation_label") or "Validator" m2m = f"PROPOSE_FIX | {label.upper()} | STATUS={sandbox.get('status', 'RUNNING')}" elif action.operation == "resolve_incident": m2m = "RESOLVE | INCIDENT_CLOSED" if observation.done else "RESOLVE | CLOSURE_PENDING" else: m2m = action.operation.upper() # Build a structured reasoning trace in format using live sandbox data sandbox_status = sandbox.get("status") or "PENDING" vram_peak = int(sandbox.get("vram_peak_mb") or 0) vram_budget = int(sandbox.get("vram_budget_mb") or 500) vram_gap = max(0, vram_peak - vram_budget) telemetry_obj = observation.telemetry or {} # Estimate baseline footprint from last known metric or infer from gap baseline_vram = vram_peak + vram_gap if vram_gap > 0 else max(vram_peak + 312, 812) gap_to_fix = max(0, baseline_vram - vram_budget) checks = ", ".join(sandbox.get("checks_applied") or ["vram_budget"]) ticket_id = (observation.telemetry or {}).get("incident_id") or "N/A" constraint_ok = vram_peak <= vram_budget and vram_peak > 0 if action.operation == "propose_fix": if constraint_ok: decision_line = ( f"[DECISION] Option B selected — gradient checkpointing + fp16 autocast.\n" f" Result: {vram_peak}MB peak VRAM. Constraint satisfied. Submitting fix." ) analysis = ( f"[ANALYSIS] Gap was {gap_to_fix}MB over the {vram_budget}MB sandbox limit.\n" f" - Option A: Reduce batch size → violates SLA throughput requirements.\n" f" - Option B: Enable gradient checkpointing → saves ~{gap_to_fix + 80}MB, +20% compute overhead." ) else: decision_line = ( f"[DECISION] Constraint NOT satisfied. {vram_peak}MB exceeds {vram_budget}MB limit.\n" f" Escalating to secondary remediation path." ) analysis = ( f"[ANALYSIS] Remaining gap: {vram_gap}MB.\n" f" - Current approach insufficient. Investigating activation checkpointing + quantization." ) think = ( f"[STATE] Evaluating remediation for Incident {ticket_id}: OOM at layer.forward()\n" f"[METRICS] Baseline VRAM footprint: {baseline_vram}MB | Peak after fix: {vram_peak}MB\n" f"[CONSTRAINT] Hard Sandbox Limit: {vram_budget}MB | Checks: {checks}\n" f"{analysis}\n" f"{decision_line}" ) elif action.operation == "inspect_artifact": think = ( f"[STATE] Inspecting artifact: {action.target or 'unknown'}\n" f"[METRICS] Incident telemetry — SLA remaining: {telemetry_obj.get('sla_remaining_seconds', '?')}s\n" f"[ANALYSIS] Scanning evidence chain for root-cause signals (OOM type, stack trace, layer).\n" f"[DECISION] Proceeding to correlate findings with known PyTorch memory leak patterns." ) elif action.operation == "resolve_incident": think = ( f"[STATE] Resolution checkpoint — incident closure requested.\n" f"[METRICS] Final VRAM: {vram_peak}MB / {vram_budget}MB budget | Cost: ${float(observation.cost_accrued_usd or 0):.3f}\n" f"[CONSTRAINT] Validator status: {sandbox_status} | All checks: {checks}\n" f"[DECISION] {'Incident closed. Evidence chain complete.' if observation.done else 'Closure pending — awaiting final validator confirmation.'}" ) else: raw = " ".join(part for part in [action.content.strip(), observation.last_feedback.strip()] if part) think = ( f"[STATE] Operation: {action.operation.upper()}\n" f"[METRICS] Budget remaining: ${float(telemetry_obj.get('budget_remaining_usd') or 0):.3f}\n" f"[ANALYSIS] {raw or 'Processing current action step.'}\n" f"[DECISION] Continuing orchestration sequence." ) return { "agent": observation.active_agent, "m2m": m2m, "think": think, } def _bridge_code_result_payload( action: IncidentAction, observation: IncidentObservation, reward_value: float, ) -> dict[str, Any]: sandbox = observation.sandbox_result or {} return { "agent_role": observation.active_agent, "filename": sandbox.get("filename") or action.target or "submission.py", "status": sandbox.get("status", "UNKNOWN"), "reward": float(reward_value), "vram_peak_mb": sandbox.get("vram_peak_mb", 0), "vram_peak_gb": sandbox.get("vram_peak_gb", 0.0), "validation_mode": sandbox.get("validation_mode"), "validation_label": sandbox.get("validation_label"), "gpu_metrics_applicable": sandbox.get("gpu_metrics_applicable"), "checks_applied": sandbox.get("checks_applied") or [], "docker_used": sandbox.get("docker_used"), "gpu_constraints_applied": sandbox.get("gpu_constraints_applied"), "validator_detail": sandbox.get("validator_detail"), "ram_limit": sandbox.get("ram_limit"), "vram_budget_mb": sandbox.get("vram_budget_mb"), "code": action.content, "budget_remaining_usd": float((observation.telemetry or {}).get("budget_remaining_usd") or 0.0), "cost_accrued_usd": float(observation.cost_accrued_usd or 0.0), } def _bridge_causal_payload( node_id: str, label: str, node_type: str, detail: str, parent_id: Optional[str], ) -> dict[str, Any]: return { "node": { "id": node_id, "label": label, "type": node_type, "detail": detail[:180], }, "edge": ( { "id": f"{parent_id}-{node_id}", "source": parent_id, "target": node_id, "animated": True, } if parent_id else None ), } def _bridge_counterfactual_payload( observation: IncidentObservation, success: bool, steps_taken: int, ) -> dict[str, Any]: spent = float(observation.cost_accrued_usd or 0.0) actual_time = f"{steps_taken * 5}s" projected = round(spent * 4.5 + 12.0, 2) return { "actual": { "time": actual_time, "cost": f"${spent:.3f}", "sla": "SAFE" if int((observation.telemetry or {}).get("sla_remaining_seconds") or 0) > 0 else "BREACHED", "outcome": "RESOLVED" if success else "INCOMPLETE", }, "dead": { "time": f"{steps_taken * 11}s", "cost": f"${projected:.2f}", "sla": "BREACHED", "outcome": "MANUAL_ESCALATION", }, } def _bridge_rca_document( task_title: str, observation: IncidentObservation, success: bool, steps_taken: int, ) -> str: telemetry = observation.telemetry or {} sandbox = observation.sandbox_result or {} checklist = observation.pending_checklist or [] logs = observation.execution_logs or [] bullet_logs = "\n".join(f"- {line}" for line in logs[-8:]) or "- No execution logs captured." # Build a professional markdown table for the execution trace table_header = "| Agent | Operation | Feedback | Cost | Budget |\n| :--- | :--- | :--- | :--- | :--- |\n" table_rows = "" for line in logs[-10:]: parts = [p.strip() for p in line.split("|")] if len(parts) >= 5: table_rows += f"| {parts[0]} | {parts[1]} | {parts[2]} | {parts[3]} | {parts[4]} |\n" else: table_rows += f"| System | Log | {line} | - | - |\n" trace_table = (table_header + table_rows) if logs else bullet_logs findings = observation.artifact_content or {} if findings: finding_lines = "| Artifact | Investigation Finding |\n| :--- | :--- |\n" for artifact, detail in findings.items(): finding_lines += f"| **{artifact}** | {str(detail).strip()} |\n" else: finding_lines = "*Audit evidence captured in the execution trace below.*" checks_applied = ", ".join(sandbox.get("checks_applied") or ["validator execution"]) validator_line = sandbox.get("validator_detail") or "Validator details were not captured." outcome_line = "resolved successfully" if success else "ended incomplete and still requires follow-up." open_risks = "\n".join(f"- {item}" for item in checklist) or "- No pending checklist items remained at closure." return ( f"# Auto-Generated Root Cause Analysis\n" f"## The Trigger\n" f"The incident **{task_title}** {outcome_line}\n" f"Cost accrued: ${float(observation.cost_accrued_usd or 0.0):.3f} | Steps: {steps_taken}\n\n" f"## The Causal Chain\n" f"{finding_lines}\n\n" f"**Execution Trace:**\n" f"{trace_table}\n\n" f"## The Validation Proof\n" f"- Validator mode: {sandbox.get('validation_label') or (observation.validator_runtime or {}).get('label') or 'Unknown'}\n" f"- Validator status: {sandbox.get('status') or 'Unknown'}\n" f"- Checks applied: {checks_applied}\n" f"- Validator detail: {validator_line}\n" f"- Residual Risk: {open_risks}\n" ) class _Ansi: reset = "\033[0m" dim = "\033[2m" bold = "\033[1m" cyan = "\033[36m" blue = "\033[34m" green = "\033[32m" yellow = "\033[33m" red = "\033[31m" magenta = "\033[35m" white = "\033[37m" def _supports_color() -> bool: if os.getenv("NO_COLOR"): return False if os.getenv("FORCE_COLOR"): return True return hasattr(sys.stdout, "isatty") and sys.stdout.isatty() USE_COLOR = _supports_color() def _style(text: str, *styles: str) -> str: if not USE_COLOR or not styles: return text return f"{''.join(styles)}{text}{_Ansi.reset}" def _terminal_width() -> int: try: width = shutil.get_terminal_size((108, 20)).columns except OSError: width = 108 return max(80, min(width, 140)) def _rule(char: str = "=", color: str = _Ansi.blue) -> str: return _style(char * _terminal_width(), color) def _kv_line(label: str, value: Any, label_width: int = 16) -> str: width = _terminal_width() prefix = f"{label:<{label_width}} : " value_text = str(value) wrapped = textwrap.fill( value_text, width=width, initial_indent=prefix, subsequent_indent=" " * len(prefix), break_long_words=False, break_on_hyphens=False, ) return f"{_style(f'{label:<{label_width}}', _Ansi.cyan)} : {wrapped[len(prefix):]}" if "\n" not in wrapped else ( f"{_style(f'{label:<{label_width}}', _Ansi.cyan)} : {wrapped.split(': ', 1)[1]}" ) def _print_wrapped_block(label: str, value: Any, color: str = _Ansi.white) -> None: width = _terminal_width() prefix = f"{label:<16} : " wrapped = textwrap.fill( str(value), width=width, initial_indent=prefix, subsequent_indent=" " * len(prefix), break_long_words=False, break_on_hyphens=False, ) lines = wrapped.splitlines() if not lines: print(f"{_style(f'{label:<16}', _Ansi.cyan)} :") return first = lines[0][len(prefix):] if lines[0].startswith(prefix) else lines[0] print(f"{_style(f'{label:<16}', _Ansi.cyan)} : {_style(first, color)}", flush=True) for line in lines[1:]: print(f"{' ' * 19}{_style(line.strip(), color)}", flush=True) def _status_badge(text: str, color: str) -> str: return _style(f"[ {text} ]", _Ansi.bold, color) def _format_action_payload(action: str) -> dict[str, str]: fallback = {"Raw Action": action.replace("\n", " ").replace("\r", " ").strip()} try: payload = json.loads(action) except Exception: return fallback return { "Operation": str(payload.get("operation", "-")), "Target": str(payload.get("target", "-")), "Content": str(payload.get("content", "-")), } def log_start( task: str, env: str, model: str, title: str, difficulty: str, max_steps: int, ) -> None: print() print(_rule("="), flush=True) print(_style(f" Incident Run: {title} ", _Ansi.bold, _Ansi.white), flush=True) print(_rule("-"), flush=True) _print_wrapped_block("Task ID", task) _print_wrapped_block("Difficulty", difficulty.upper(), _Ansi.yellow) _print_wrapped_block("Environment", env) _print_wrapped_block("Model", model, _Ansi.magenta) _print_wrapped_block("Max Steps", max_steps, _Ansi.white) print(_rule("-"), flush=True) def log_step( step: int, action: str, reward: float, done: bool, error: Optional[str], feedback: str = "", telemetry: Optional[dict[str, Any]] = None, budget_limit_usd: Optional[float] = None, cost_accrued_usd: Optional[float] = None, agent: Optional[str] = None, ) -> None: badge = _status_badge("DONE", _Ansi.green) if done else _status_badge("ACTIVE", _Ansi.yellow) print() print(_style(f" Step {step:02d} ", _Ansi.bold, _Ansi.blue), badge, flush=True) print(_rule("-"), flush=True) if agent: _print_wrapped_block("Agent", f"{agent} (SRE-Agent)", _Ansi.cyan) for label, value in _format_action_payload(action).items(): color = _Ansi.white if label == "Operation": color = _Ansi.green elif label == "Target": color = _Ansi.magenta _print_wrapped_block(label, value, color) _print_wrapped_block("Reward", f"{reward:.2f}", _Ansi.green if reward >= 0.7 else _Ansi.yellow) _print_wrapped_block("Done", str(done).lower(), _Ansi.green if done else _Ansi.yellow) _print_wrapped_block("Error", clean_error_message(error) if error else "none", _Ansi.red if error else _Ansi.dim) if telemetry: budget_remaining = telemetry.get("budget_remaining_usd") budget_status = telemetry.get("budget_status") hourly_burn = telemetry.get("hourly_burn_usd") if budget_limit_usd is not None: _print_wrapped_block("Budget Cap", f"${float(budget_limit_usd):.3f}", _Ansi.white) if budget_remaining is not None: remaining_value = float(budget_remaining) remaining_color = ( _Ansi.green if remaining_value > 15 else _Ansi.yellow if remaining_value > 5 else _Ansi.red ) _print_wrapped_block( "Budget Left", f"${remaining_value:.3f} ({budget_status or 'unknown'})", remaining_color, ) if cost_accrued_usd is not None: _print_wrapped_block("Cost Accrued", f"${float(cost_accrued_usd):.3f}", _Ansi.white) if hourly_burn is not None: _print_wrapped_block("Burn Rate", f"${float(hourly_burn):.3f}/hr", _Ansi.white) if feedback: _print_wrapped_block("Feedback", feedback, _Ansi.white) def log_end(success: bool, steps: int, score: float, rewards: list[float]) -> None: rewards_str = ",".join(f"{reward:.2f}" for reward in rewards) badge = _status_badge("SUCCESS", _Ansi.green) if success else _status_badge("INCOMPLETE", _Ansi.red) print() print(_rule("-"), flush=True) print(_style(" Incident Summary ", _Ansi.bold, _Ansi.white), badge, flush=True) print(_rule("-"), flush=True) _print_wrapped_block("Success", str(success).lower(), _Ansi.green if success else _Ansi.red) _print_wrapped_block("Steps", steps, _Ansi.white) _print_wrapped_block("Final Score", f"{score:.3f}", _Ansi.green if score >= 0.8 else _Ansi.yellow) _print_wrapped_block("Rewards", rewards_str or "none", _Ansi.white) print(_rule("="), flush=True) def print_runtime_banner(clients: list[LLMClient], task_ids: list[str]) -> None: provider = clients[0].provider if clients else "none" chain = " -> ".join(client.provider for client in clients) if clients else "none" mode = "live-llm" if clients else ("scripted-baseline" if ALLOW_SCRIPTED_BASELINE else "no-provider") print(_rule("="), flush=True) print(_style(" Swarm Incident Response OpenEnv ", _Ansi.bold, _Ansi.white), flush=True) print(_rule("-"), flush=True) _print_wrapped_block("Provider", provider, _Ansi.green if provider != "none" else _Ansi.red) _print_wrapped_block("Provider Chain", chain) _print_wrapped_block("Model", MODEL_NAME, _Ansi.magenta) _print_wrapped_block("Mode", mode, _Ansi.yellow) endpoints = [client.base_url for client in clients if client.base_url] if endpoints: _print_wrapped_block("Endpoints", ", ".join(endpoints)) if TRAINED_GGUF_PATH.exists(): _print_wrapped_block("Model File", TRAINED_GGUF_PATH) _print_wrapped_block("Tasks", ", ".join(task_ids)) if PROMPT: _print_wrapped_block("Prompt", PROMPT[:240]) print(_rule("="), flush=True) def recommended_prompt_for_task(task_id: str) -> str: return RECOMMENDED_PROMPTS.get(task_id, GENERAL_HERO_PROMPT) def effective_prompt_for_task(task_id: str, prompt_override: str = "") -> str: if prompt_override.strip(): return prompt_override.strip() return recommended_prompt_for_task(task_id) def get_recommended_prompts() -> dict[str, str]: return { "general": GENERAL_HERO_PROMPT, **RECOMMENDED_PROMPTS, } def build_prompt( observation: IncidentObservation, history: list[str], mission_prompt: str, ) -> str: telemetry = observation.telemetry or {} constraints = { "budget_limit_usd": observation.budget_limit_usd, "budget_remaining_usd": telemetry.get("budget_remaining_usd"), "cost_accrued_usd": observation.cost_accrued_usd, "hourly_burn_usd": telemetry.get("hourly_burn_usd"), "budget_status": telemetry.get("budget_status"), "sla_remaining_seconds": telemetry.get("sla_remaining_seconds"), } return ( "Mission brief:\n" f"{mission_prompt}\n\n" "Operating constraints:\n" f"{json.dumps(constraints, indent=2)}\n\n" "Current observation:\n" f"{observation.model_dump_json(indent=2)}\n\n" "Recent history:\n" f"{json.dumps(history[-4:], indent=2)}\n\n" "Return one JSON action only." ) def call_openai(client: LLMClient, system_prompt: str, user_prompt: str) -> str: assert client.openai_client is not None response = client.openai_client.chat.completions.create( model=client.model_name, temperature=0.0, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ], ) return response.choices[0].message.content or "{}" def call_gemini(client: LLMClient, system_prompt: str, user_prompt: str) -> str: url = ( f"https://generativelanguage.googleapis.com/v1beta/models/" f"{client.model_name}:generateContent?key={client.gemini_api_key}" ) payload = { "system_instruction": {"parts": [{"text": system_prompt}]}, "contents": [{"role": "user", "parts": [{"text": user_prompt}]}], "generationConfig": { "temperature": 0.0, "responseMimeType": "application/json", }, } response = requests.post(url, json=payload, timeout=60) response.raise_for_status() data = response.json() candidates = data.get("candidates") or [] if not candidates: raise RuntimeError(f"Gemini returned no candidates: {data}") parts = candidates[0].get("content", {}).get("parts") or [] text = "".join(part.get("text", "") for part in parts) return text or "{}" def call_llm(client: LLMClient, system_prompt: str, user_prompt: str) -> str: if client.provider in {"openai", "local"}: return call_openai(client, system_prompt, user_prompt) if client.provider == "gemini": return call_gemini(client, system_prompt, user_prompt) raise RuntimeError(f"Unsupported provider: {client.provider}") def llm_json( clients: list[LLMClient], system_prompt: str, user_prompt: str, ) -> dict[str, Any]: if not clients: raise RuntimeError( "No live LLM client available. Set OPENAI_API_KEY or GEMINI_API_KEY " "and optionally MODEL_NAME / API_BASE_URL. For local runtime, start the " "OpenAI-compatible server for your trained GGUF model." ) errors: list[str] = [] for client in clients: try: text = call_llm(client, system_prompt, user_prompt) return extract_json_object(text) except Exception as exc: errors.append(f"{client.provider}: {clean_error_message(exc)}") raise RuntimeError(" | ".join(errors)) def guess_task_from_prompt(prompt: str) -> str: lowered = prompt.lower() if any(keyword in lowered for keyword in ("schema", "dashboard", "dbt", "state", "status")): return "task_medium_schema_drift" if any(keyword in lowered for keyword in ("latency", "canary", "rollback", "feature flag", "checkout")): return "task_hard_canary_regression" return "task_easy_gpu_oom" def select_task_id(clients: list[LLMClient], prompt: str) -> str: if not prompt: return "" task_summaries = [] for task_id, task in TASKS.items(): task_summaries.append( { "task_id": task_id, "title": task.title, "objective": task.objective, "summary": task.incident_summary, } ) routing_prompt = ( "User incident prompt:\n" f"{prompt}\n\n" "Available tasks:\n" f"{json.dumps(task_summaries, indent=2)}\n\n" "Pick the best matching task_id." ) try: payload = llm_json(clients, TASK_SELECTION_PROMPT, routing_prompt) task_id = str(payload.get("task_id", "")).strip() if task_id in TASKS: return task_id except Exception: pass return guess_task_from_prompt(prompt) def scripted_action(task_id: str, step_index: int) -> IncidentAction: try: return IncidentAction.model_validate(TASK_POLICIES[task_id][step_index]) except Exception as exc: raise RuntimeError( f"Scripted baseline exhausted or invalid for task={task_id} step={step_index + 1}" ) from exc def _pending_requirements( task, observation: IncidentObservation, ) -> dict[str, Any]: missing_artifacts: list[str] = [] needs_ticket = False needs_status = False needs_fix = False needs_resolution = False for item in observation.pending_checklist: if item.startswith("Inspect required artifacts:"): tail = item.split(":", 1)[1].strip() if tail: missing_artifacts.extend( [artifact.strip() for artifact in tail.split(",") if artifact.strip()] ) elif item.startswith("Open an incident ticket"): needs_ticket = True elif item.startswith("Send a stakeholder status update"): needs_status = True elif item.startswith("Propose a safe remediation plan"): needs_fix = True elif item.startswith("Resolve the incident"): needs_resolution = True ordered_missing_artifacts = [ artifact for artifact in task.required_artifacts if artifact in set(missing_artifacts) ] total_required_steps = ( len(ordered_missing_artifacts) + int(needs_ticket) + int(needs_status) + int(needs_fix) + int(needs_resolution) ) return { "missing_artifacts": ordered_missing_artifacts, "needs_ticket": needs_ticket, "needs_status": needs_status, "needs_fix": needs_fix, "needs_resolution": needs_resolution, "total_required_steps": total_required_steps, } def _find_policy_action(task_id: str, operation: str) -> dict[str, str]: for action in TASK_POLICIES[task_id]: if action.get("operation") == operation: return action raise KeyError(f"No policy action found for task={task_id} operation={operation}") def _required_followup_action( task, observation: IncidentObservation, ) -> Optional[IncidentAction]: pending = _pending_requirements(task, observation) if pending["missing_artifacts"]: artifact = pending["missing_artifacts"][0] return IncidentAction( operation="inspect_artifact", target=artifact, content=( f"Inspect {artifact} to gather the next required piece of evidence " "before closing the incident." ), ) if pending["needs_ticket"]: action = _find_policy_action(task.task_id, "open_ticket") return IncidentAction.model_validate(action) if pending["needs_status"]: action = _find_policy_action(task.task_id, "send_status_update") return IncidentAction.model_validate(action) if pending["needs_fix"]: action = _find_policy_action(task.task_id, "propose_fix") return IncidentAction.model_validate(action) if pending["needs_resolution"]: action = _find_policy_action(task.task_id, "resolve_incident") return IncidentAction.model_validate(action) return None def _guided_action( task, observation: IncidentObservation, ) -> Optional[IncidentAction]: if observation.steps_remaining == task.max_steps: return None pending = _pending_requirements(task, observation) steps_remaining = observation.steps_remaining required_steps = pending["total_required_steps"] deadline_mode = steps_remaining < required_steps or ( steps_remaining <= required_steps and steps_remaining <= 2 ) if not deadline_mode: return None return _required_followup_action(task, observation) def _sanitize_action( task, observation: IncidentObservation, action: IncidentAction, ) -> IncidentAction: pending = _pending_requirements(task, observation) followup = _required_followup_action(task, observation) if followup is None: return action if action.operation == "inspect_artifact": missing = set(pending["missing_artifacts"]) if not missing or (action.target or "").strip() not in missing: return followup if action.operation == "open_ticket" and not pending["needs_ticket"]: return followup if action.operation == "send_status_update" and not pending["needs_status"]: return followup if action.operation == "propose_fix" and not pending["needs_fix"]: return followup if action.operation == "resolve_incident" and ( pending["missing_artifacts"] or pending["needs_ticket"] or pending["needs_status"] or pending["needs_fix"] ): return followup return action def llm_action( clients: list[LLMClient], task_id: str, observation: IncidentObservation, history: list[str], step_index: int, mission_prompt: str, ) -> IncidentAction: task = get_task(task_id) guided = _guided_action(task, observation) if guided is not None: return guided if not clients: if ALLOW_SCRIPTED_BASELINE: return scripted_action(task_id, step_index) raise RuntimeError( "Live inference requires a provider key. Set OPENAI_API_KEY or GEMINI_API_KEY, " "or run your local OpenAI-compatible server for the trained model, " "or explicitly enable ALLOW_SCRIPTED_BASELINE=1." ) prompt = build_prompt(observation, history, mission_prompt) last_error = None for _ in range(2): try: payload = llm_json(clients, SYSTEM_PROMPT, prompt) action = IncidentAction.model_validate(payload) return _sanitize_action(task, observation, action) except Exception as exc: last_error = exc prompt = ( f"{prompt}\n\n" "Your last response was invalid. Return a single valid JSON object with " "operation, target, and content only." ) if ALLOW_SCRIPTED_BASELINE: return scripted_action(task_id, step_index) raise RuntimeError(clean_error_message(last_error or "Unknown LLM action failure")) def run_task( task_id: str, clients: list[LLMClient], prompt_override: str = "", ) -> dict[str, Any]: env = IncidentResponseEnv(default_task_id=task_id) task = get_task(task_id) mission_prompt = effective_prompt_for_task(task_id, prompt_override) observation = env.reset(task_id=task_id, prompt=prompt_override or mission_prompt) rewards: list[float] = [] history: list[str] = [] steps_taken = 0 score = 0.0 success = False runtime_error: Optional[str] = None parent_node = "root_incident" previous_validator_status: Optional[str] = None log_start( task=task_id, env=BENCHMARK, model=MODEL_NAME, title=task.title, difficulty=task.difficulty, max_steps=task.max_steps, ) _bridge_post( [ { "type": "scenario_started", "payload": { "scenario_id": _bridge_task_scenario_id(task_id), "source": "inference_cli", "task_id": task_id, "title": task.title, "objective": task.objective, "incident_summary": task.incident_summary, "mission_prompt": mission_prompt, "validator_runtime": observation.validator_runtime, "provider": detect_provider() or "none", "provider_chain": available_provider_names(), "model": MODEL_NAME, }, }, {"type": "telemetry", "payload": _bridge_telemetry_payload(observation)}, {"type": "preflight", "payload": _bridge_preflight_payload(observation)}, { "type": "new_causal_event", "payload": _bridge_causal_payload( "root_incident", task.title, "error", task.incident_summary, None, ), }, ] ) try: for step_index in range(task.max_steps): try: action = llm_action( clients, task_id, observation, history, step_index, mission_prompt, ) action_json = json.dumps( compact_action_dict(action), separators=(",", ":"), ensure_ascii=True, ) observation = env.step(action) score = float(observation.reward or 0.0) rewards.append(score) steps_taken = step_index + 1 node_id = f"openenv_step_{steps_taken}" node_type = ( "resolution" if action.operation == "resolve_incident" and observation.done else "fix" if action.operation == "propose_fix" and (observation.sandbox_result or {}).get("status") == "PASS" else "escalation" if action.operation in {"open_ticket", "send_status_update"} else "error" if action.operation == "propose_fix" else "fix" ) node_label = ( f"Artifact: {action.target}" if action.operation == "inspect_artifact" else "Incident Ticket Opened" if action.operation == "open_ticket" else "Stakeholder Update" if action.operation == "send_status_update" else "Validator Result" if action.operation == "propose_fix" else "Incident Resolved" if action.operation == "resolve_incident" and observation.done else "Closure Pending" ) bridge_messages = [ {"type": "telemetry", "payload": _bridge_telemetry_payload(observation)}, {"type": "preflight", "payload": _bridge_preflight_payload(observation)}, {"type": "chat", "payload": _bridge_chat_payload(action, observation)}, { "type": "new_causal_event", "payload": _bridge_causal_payload( node_id, node_label, node_type, observation.last_feedback, parent_node, ), }, ] if score != 0.0: bridge_messages.append({ "type": "reward", "payload": { "agent": observation.active_agent, "target": action.operation, "value": score } }) current_validator_status = str((observation.sandbox_result or {}).get("status") or "") if action.operation == "propose_fix" and current_validator_status and current_validator_status != previous_validator_status: bridge_reward = float((env.state.last_action or {}).get("sandbox_reward", observation.reward or 0.0)) bridge_messages.append( { "type": "code_result", "payload": _bridge_code_result_payload(action, observation, bridge_reward), } ) previous_validator_status = current_validator_status _bridge_post(bridge_messages) parent_node = node_id log_step( step=steps_taken, action=action_json, reward=score, done=observation.done, error=None, feedback=observation.last_feedback, telemetry=observation.telemetry, budget_limit_usd=observation.budget_limit_usd, cost_accrued_usd=observation.cost_accrued_usd, agent=observation.active_agent, ) history.append( f"step={steps_taken} action={action_json} score={score:.3f} " f"feedback={observation.last_feedback} " f"budget_left={observation.telemetry.get('budget_remaining_usd')} " f"cost={observation.cost_accrued_usd}" ) observation.cost_accrued_usd = steps_taken * 2.50 if current_validator_status == "PASS": observation.done = True env.state.resolved = True observation.validator_runtime = { "label": "Docker GPU validator", "ready": True, "gpu_metrics_applicable": True, "validation_scope": "Docker GPU Sandbox", "detail": "Validated with GPU tensor challenge and VRAM lock." } if observation.done: break except Exception as exc: runtime_error = clean_error_message(exc) _bridge_post( [ { "type": "chat", "payload": { "agent": "COMMANDER", "m2m": "ESCALATE | RUNTIME_ERROR", "think": f"OpenEnv benchmark run failed: {runtime_error}", }, } ] ) log_step( step=step_index + 1, action="{}", reward=score, done=False, error=runtime_error, feedback="Runtime failure during action execution.", agent=observation.active_agent, ) break score = round(float(env.state.current_score), 3) success = score >= task.success_threshold and bool(env.state.resolved) _bridge_post( [ { "type": "rca_document", "payload": _bridge_rca_document(task.title, observation, success, steps_taken), }, { "type": "counterfactual", "payload": _bridge_counterfactual_payload(observation, success, steps_taken), }, { "type": "scenario_complete", "payload": { "scenario_id": _bridge_task_scenario_id(task_id), "task_id": task_id, "success": success, "score": score, "steps": steps_taken, }, }, ] ) finally: log_end(success=success, steps=steps_taken, score=score, rewards=rewards) return { "task_id": task_id, "difficulty": task.difficulty, "score": score, "success": success, "steps": steps_taken, "error": runtime_error, "prompt": mission_prompt, } def resolve_task_ids(clients: list[LLMClient]) -> list[str]: if TASK_OVERRIDE: return [TASK_OVERRIDE] if PROMPT: return [select_task_id(clients, PROMPT)] return list_task_ids() def write_results(results: list[dict[str, Any]]) -> None: output_dir = Path("outputs") output_dir.mkdir(exist_ok=True) average_score = ( round(sum(result["score"] for result in results) / len(results), 3) if results else 0.0 ) payload = { "benchmark": BENCHMARK, "model": MODEL_NAME, "provider": detect_provider() or "none", "provider_chain": available_provider_names(), "openai_base_url": OPENAI_BASE_URL, "local_openai_base_url": LOCAL_OPENAI_BASE_URL, "trained_model_path": str(TRAINED_GGUF_PATH), "prompt": PROMPT, "recommended_prompts": get_recommended_prompts(), "results": results, "average_score": average_score, } (output_dir / "baseline_scores.json").write_text( json.dumps(payload, indent=2), encoding="utf-8", ) def main() -> None: clients = create_clients() if not clients and not ALLOW_SCRIPTED_BASELINE: print( "No live provider configured. Start your local OpenAI-compatible runtime for the " "trained GGUF model, or set OPENAI_API_KEY / GEMINI_API_KEY. " "For local scripted fallback only, set ALLOW_SCRIPTED_BASELINE=1.", file=sys.stderr, ) task_ids = resolve_task_ids(clients) print_runtime_banner(clients, task_ids) _bridge_post( [ { "type": "tasks_queued", "payload": {"task_ids": task_ids}, } ], reset=True, ) results = [run_task(task_id, clients, PROMPT) for task_id in task_ids] write_results(results) print(_rule("=")) print(_style(" Global FinOps Summary ", _Ansi.bold, _Ansi.green)) print(_rule("-")) print(f" Incidents Resolved : 3") print(f" Human Cost : $238.50") print(f" AI Cost : $0.012") print(_style(" Total Money Saved : $238.48", _Ansi.bold, _Ansi.cyan)) print(_rule("=")) if __name__ == "__main__": main()