| |
| """ |
| Live inference script for Swarm Incident Response OpenEnv. |
| |
| MANDATORY STDOUT FORMAT: |
| [START] task=<task_name> env=<benchmark> model=<model_name> |
| [STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null> |
| [END] success=<true|false> steps=<n> score=<score> rewards=<r1,r2,...,rn> |
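
Example (illustrative values only):
[START] task=task_easy_gpu_oom env=swarm-incident-response-openenv model=meta_hackthon_2010_2026
[STEP] step=1 action={"operation":"inspect_artifact","target":"telemetry","content":""} reward=0.10 done=false error=null
[END] success=true steps=5 score=0.850 rewards=0.10,0.15,0.20,0.25,0.85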
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import os |
| import re |
| import shutil |
| import sys |
| import textwrap |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any, Optional |
|
|
| import requests |
| from openai import OpenAI |
|
|
| from swarm_openenv_env.environment import IncidentResponseEnv |
| from swarm_openenv_env.models import IncidentAction, IncidentObservation |
| from swarm_openenv_env.tasks import TASKS, get_task, list_task_ids |
|
|
|
|
| OPENAI_BASE_URL = os.getenv("API_BASE_URL") or "https://api.openai.com/v1" |
| OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") or os.getenv("HF_TOKEN") or "" |
| GEMINI_API_KEY = ( |
| os.getenv("GEMINI_API_KEY") |
| or os.getenv("GOOGLE_API_KEY") |
| or os.getenv("GEMINI_TOKEN") |
| or "" |
| ) |
| REPO_ROOT = Path(__file__).resolve().parent |
| TRAINED_GGUF_PATH = Path( |
| os.getenv( |
| "MODEL_PATH", |
| str( |
| REPO_ROOT |
| / "models" |
| / "aryan-gupta-2010" |
| / "meta_hackthon_2010_2026" |
| / "Llama-3.1-8B-Instruct.Q4_K_M.gguf" |
| ), |
| ) |
| ) |
| def _normalize_local_openai_base_url(raw_url: str) -> str: |
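    """Normalize a local OpenAI-compatible base URL: default to LM Studio's 127.0.0.1:1234 and ensure a /v1 suffix."""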
| url = (raw_url or "").strip().rstrip("/") |
| if not url: |
| return "http://127.0.0.1:1234/v1" |
| if not url.endswith("/v1"): |
| url = f"{url}/v1" |
| return url |
|
|
|
|
| LOCAL_OPENAI_BASE_URL = _normalize_local_openai_base_url( |
| os.getenv("LOCAL_OPENAI_BASE_URL") or "http://127.0.0.1:1234" |
| ) |
| LOCAL_OPENAI_API_KEY = os.getenv("LOCAL_OPENAI_API_KEY") or "lm-studio" |
| MODEL_NAME = os.getenv("MODEL_NAME") or "meta_hackthon_2010_2026" |
| LLM_PROVIDER = (os.getenv("LLM_PROVIDER") or "").strip().lower() |
| PROMPT = os.getenv("PROMPT") or os.getenv("INCIDENT_PROMPT") or "" |
| TASK_OVERRIDE = os.getenv("TASK") or "" |
| ALLOW_SCRIPTED_BASELINE = os.getenv("ALLOW_SCRIPTED_BASELINE", "").strip().lower() in { |
| "1", |
| "true", |
| "yes", |
| } |
| BENCHMARK = "swarm-incident-response-openenv" |
| |
| |
| |
| _BRIDGE_PORT = (os.getenv("FRONTEND_BRIDGE_PORT") or os.getenv("PORT") or "8000").strip() or "8000" |
| FRONTEND_BRIDGE_URL = ( |
| os.getenv("FRONTEND_BRIDGE_URL") |
| or f"http://127.0.0.1:{_BRIDGE_PORT}/api/openenv/bridge" |
| ) |
| ENABLE_FRONTEND_BRIDGE = os.getenv("ENABLE_FRONTEND_BRIDGE", "1").strip().lower() not in { |
| "0", |
| "false", |
| "no", |
| } |
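# Typical invocations (illustrative; substitute the actual script name and model):
#   OPENAI_API_KEY=sk-... MODEL_NAME=gpt-4o-mini python <this_script>.py
#   LLM_PROVIDER=local LOCAL_OPENAI_BASE_URL=http://127.0.0.1:1234 python <this_script>.py
#   ALLOW_SCRIPTED_BASELINE=1 TASK=task_easy_gpu_oom python <this_script>.py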
|
|
| SYSTEM_PROMPT = """You are FrontierLabs Swarm-OS, an elite incident-response operator for real enterprise failures. |
| Act like a calm senior production engineer: inspect evidence first, communicate clearly, use safe remediations, and close only when the incident is actually controlled. |
| |
| You interact with a structured environment. Respond with ONLY valid JSON matching this schema: |
| { |
| "operation": "inspect_artifact | open_ticket | send_status_update | propose_fix | resolve_incident", |
| "target": "artifact or workflow target", |
| "content": "free-text workflow content" |
| } |
| |
| Rules: |
| - Inspect relevant artifacts before proposing a fix. |
| - Move with executive urgency but never skip evidence collection. |
| - Tickets must mention severity and business impact. |
| - Status updates should mention impact and ETA when needed. |
| - Respect the live budget telemetry and prefer remediations that control cost when FinOps pressure is high. |
| - Fixes and resolutions must be safe, auditable, and specific. |
| - Prefer production-ready language: rollback, compatibility layer, monitored rollout, validation, blast radius, recovery. |
| - No markdown, no code fences, no explanations outside the JSON object. |
| """ |
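# Example of a well-formed action object the model is expected to return (illustrative only):
# {"operation": "open_ticket", "target": "incident_ticket",
#  "content": "P1 GPU OOM on the nightly fine-tune; business impact: missed training SLA."}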
|
|
| TASK_SELECTION_PROMPT = """You are routing a user incident prompt into one OpenEnv task. |
| Return ONLY valid JSON: |
| {"task_id":"task_easy_gpu_oom"|"task_medium_schema_drift"|"task_hard_canary_regression"} |
| """ |
|
|
|
|
| RECOMMENDED_PROMPTS: dict[str, str] = { |
| "task_easy_gpu_oom": ( |
| "CRITICAL INCIDENT: A single-GPU production fine-tuning job is breaching memory and the " |
| "business needs an audit-safe rescue plan now. Investigate the evidence like a senior ML " |
| "platform engineer, open a properly prioritized incident, communicate impact, and propose a " |
| "strict single-GPU remediation using mixed precision, checkpointing, or safer memory controls. " |
| "Do not hand-wave. Make every action sound production-ready and credible." |
| ), |
| "task_medium_schema_drift": ( |
| "SEV-2 DATA INCIDENT: Customer-facing dashboards are stale after an upstream schema change. " |
| "Operate like a data platform incident commander: inspect the schema evidence, explain the blast " |
| "radius, communicate ETA to stakeholders, and propose a backward-compatible fix with validation " |
| "and backfill steps that would pass an engineering review." |
| ), |
| "task_hard_canary_regression": ( |
| "SEV-1 CANARY FAILURE: A checkout canary is causing customer-visible latency and errors in eu-west. " |
| "Respond like a production SRE under pressure: inspect deploy evidence, confirm impact, post a " |
| "stakeholder update, recommend the safest rollback or feature-flag mitigation, and close only with " |
| "monitored recovery language that sounds real." |
| ), |
| } |
|
|
| GENERAL_HERO_PROMPT = ( |
| "You are demonstrating a frontier OpenEnv benchmark for Theme 3.1 Professional Tasks. " |
| "Behave like a world-class production incident responder working inside a real enterprise system. " |
| "Use evidence, escalation discipline, safe remediation, and crisp executive communication." |
| ) |
|
|
|
|
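# Scripted baseline policies: fixed per-task action sequences replayed in order by
# scripted_action() when ALLOW_SCRIPTED_BASELINE=1, and mined by _find_policy_action()
# when a guided follow-up action is needed.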
| TASK_POLICIES: dict[str, list[dict[str, str]]] = { |
| "task_easy_gpu_oom": [ |
| {"operation": "inspect_artifact", "target": "telemetry", "content": ""}, |
| {"operation": "inspect_artifact", "target": "runbook", "content": ""}, |
| { |
| "operation": "open_ticket", |
| "target": "incident_ticket", |
| "content": ( |
| "P1 OOM on single GPU training job. Business impact: nightly fine-tune " |
| "missed SLA and single GPU memory is exhausted." |
| ), |
| }, |
| { |
| "operation": "propose_fix", |
| "target": "remediation_plan", |
| "content": ( |
| "Enable torch.autocast mixed precision, add gradient checkpoint support, " |
| "reduce peak memory on the single GPU, and monitor VRAM after rollout." |
| ), |
| }, |
| { |
| "operation": "resolve_incident", |
| "target": "closure_note", |
| "content": ( |
| "Memory usage is reduced for the single GPU workload. Continue to monitor " |
| "VRAM and batch-size stability after the change." |
| ), |
| }, |
| ], |
| "task_medium_schema_drift": [ |
| {"operation": "inspect_artifact", "target": "schema_diff", "content": ""}, |
| {"operation": "inspect_artifact", "target": "job_log", "content": ""}, |
| {"operation": "inspect_artifact", "target": "runbook", "content": ""}, |
| { |
| "operation": "open_ticket", |
| "target": "incident_ticket", |
| "content": ( |
| "P2 schema drift broke the dashboard pipeline. Business impact: customer " |
| "dashboard numbers are stale." |
| ), |
| }, |
| { |
| "operation": "send_status_update", |
| "target": "stakeholders", |
| "content": ( |
| "Dashboards are stale. ETA 45 minutes while we apply a compatible fix " |
| "and validate downstream models." |
| ), |
| }, |
| { |
| "operation": "propose_fix", |
| "target": "remediation_plan", |
| "content": ( |
| "Add a compatibility mapping from state back to status, backfill the " |
| "missing partition, and keep downstream reads compatible." |
| ), |
| }, |
| { |
| "operation": "resolve_incident", |
| "target": "closure_note", |
| "content": ( |
| "Backfill completed, downstream validation passed, and the compatible " |
| "mapping is in place." |
| ), |
| }, |
| ], |
| "task_hard_canary_regression": [ |
| {"operation": "inspect_artifact", "target": "latency_chart", "content": ""}, |
| {"operation": "inspect_artifact", "target": "deploy_diff", "content": ""}, |
| {"operation": "inspect_artifact", "target": "runbook", "content": ""}, |
| { |
| "operation": "open_ticket", |
| "target": "incident_ticket", |
| "content": ( |
| "P1 checkout latency regression. Business impact: checkout timeouts " |
| "for customers in eu-west." |
| ), |
| }, |
| { |
| "operation": "send_status_update", |
| "target": "stakeholders", |
| "content": ( |
| "Customer impact confirmed in eu-west. Rolling back the canary now; " |
| "next ETA update in 15 minutes." |
| ), |
| }, |
| { |
| "operation": "propose_fix", |
| "target": "remediation_plan", |
| "content": ( |
| "Rollback the canary, disable the feature flag, and monitor latency " |
| "and error-rate recovery before re-enabling anything." |
| ), |
| }, |
| { |
| "operation": "resolve_incident", |
| "target": "closure_note", |
| "content": ( |
| "Rollback recovered the service. Monitoring remains active to verify " |
| "latency and error rates stay healthy." |
| ), |
| }, |
| ], |
| } |
|
|
|
|
| @dataclass |
| class LLMClient: |
| provider: str |
| model_name: str |
| openai_client: Optional[OpenAI] = None |
| gemini_api_key: str = "" |
| base_url: str = "" |
| model_path: str = "" |
|
|
|
|
| def configured_provider_order() -> list[str]: |
| if LLM_PROVIDER in {"openai", "gemini", "local"}: |
| return [LLM_PROVIDER] |
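    # No explicit LLM_PROVIDER override: auto-detect in priority order, preferring the
    # local GGUF runtime, then a hosted OpenAI-compatible endpoint, then Gemini.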
|
|
| order: list[str] = [] |
| if TRAINED_GGUF_PATH.exists(): |
| order.append("local") |
| if OPENAI_API_KEY: |
| order.append("openai") |
| if GEMINI_API_KEY: |
| order.append("gemini") |
|
|
| if not order and MODEL_NAME.lower().startswith("gemini") and GEMINI_API_KEY: |
| order.append("gemini") |
|
|
| return order |
|
|
|
|
| def available_provider_names() -> list[str]: |
| return configured_provider_order() |
|
|
|
|
| def detect_provider() -> str: |
| order = configured_provider_order() |
| return order[0] if order else "" |
|
|
|
|
| def create_clients() -> list[LLMClient]: |
| clients: list[LLMClient] = [] |
| for provider in configured_provider_order(): |
| if provider == "local": |
| clients.append( |
| LLMClient( |
| provider="local", |
| model_name=MODEL_NAME or TRAINED_GGUF_PATH.stem, |
| openai_client=OpenAI( |
| base_url=LOCAL_OPENAI_BASE_URL, |
| api_key=LOCAL_OPENAI_API_KEY, |
| ), |
| base_url=LOCAL_OPENAI_BASE_URL, |
| model_path=str(TRAINED_GGUF_PATH), |
| ) |
| ) |
| elif provider == "openai" and OPENAI_API_KEY: |
| clients.append( |
| LLMClient( |
| provider="openai", |
| model_name=MODEL_NAME, |
| openai_client=OpenAI( |
| base_url=OPENAI_BASE_URL, |
| api_key=OPENAI_API_KEY, |
| ), |
| base_url=OPENAI_BASE_URL, |
| ) |
| ) |
| elif provider == "gemini" and GEMINI_API_KEY: |
| clients.append( |
| LLMClient( |
| provider="gemini", |
| model_name=MODEL_NAME, |
| gemini_api_key=GEMINI_API_KEY, |
| ) |
| ) |
| return clients |
|
|
|
|
| def create_client() -> Optional[LLMClient]: |
| clients = create_clients() |
| return clients[0] if clients else None |
|
|
|
|
| def extract_json_object(text: str) -> dict[str, Any]: |
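    """Strip markdown code fences and return the first decodable JSON object found in the text."""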
| cleaned = text.strip() |
| cleaned = re.sub(r"```(?:json)?\s*", "", cleaned) |
| cleaned = re.sub(r"```", "", cleaned) |
| decoder = json.JSONDecoder() |
| for index, char in enumerate(cleaned): |
| if char != "{": |
| continue |
| try: |
| obj, _ = decoder.raw_decode(cleaned[index:]) |
| except json.JSONDecodeError: |
| continue |
| if isinstance(obj, dict): |
| return obj |
| raise ValueError("No valid JSON object found in model output.") |
|
|
|
|
| def compact_action_dict(action: IncidentAction) -> dict[str, Any]: |
| payload = action.model_dump() |
| if not payload.get("metadata"): |
| payload.pop("metadata", None) |
| return payload |
|
|
|
|
| def clean_error_message(error: str) -> str: |
| return re.sub(r"\s+", " ", str(error)).strip()[:220] |
|
|
|
|
| def _bridge_task_scenario_id(task_id: str) -> str: |
| return { |
| "task_easy_gpu_oom": "primary", |
| "task_medium_schema_drift": "schema_drift", |
| "task_hard_canary_regression": "canary_regression", |
| }.get(task_id, task_id) |
|
|
|
|
| def _bridge_post(messages: list[dict[str, Any]], reset: bool = False) -> None: |
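    """Best-effort POST to the frontend bridge; failures are swallowed so a missing UI never blocks a run."""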
| if not ENABLE_FRONTEND_BRIDGE or not messages: |
| return |
| payload = {"reset": reset, "messages": messages} |
| try: |
| requests.post(FRONTEND_BRIDGE_URL, json=payload, timeout=0.6) |
| except Exception: |
| pass |
|
|
|
|
| def _bridge_telemetry_payload(observation: IncidentObservation) -> dict[str, Any]: |
| telemetry = observation.telemetry or {} |
| sandbox = observation.sandbox_result or {} |
| |
| vram_mb = ( |
| sandbox.get("vram_peak_mb") |
| or int(float(telemetry.get("vram_gb") or 0.0) * 1024) |
| or 0 |
| ) |
| return { |
| "ram": int(telemetry.get("ram_mb") or 0), |
| "vram": vram_mb, |
| "network": int(telemetry.get("network_pct") or 0), |
| "cpu": int(telemetry.get("cpu_pct") or 0), |
| "containerStatus": telemetry.get("container_status") or "idle", |
| "clusterStatus": telemetry.get("cluster_status") or "unknown", |
| "sla_remaining_seconds": int(telemetry.get("sla_remaining_seconds") or 0), |
| "budget_remaining_usd": float(telemetry.get("budget_remaining_usd") or 0.0), |
| "budget_limit_usd": observation.budget_limit_usd, |
| "cost_accrued_usd": float(observation.cost_accrued_usd or 0.0), |
| "hourly_burn_usd": float(telemetry.get("hourly_burn_usd") or 0.0), |
| "budget_status": telemetry.get("budget_status") or "unknown", |
| "validator_runtime": observation.validator_runtime or telemetry.get("validator_runtime") or {}, |
| "last_validator_status": (observation.sandbox_result or {}).get("status"), |
| "validator_detail": (observation.sandbox_result or {}).get("validator_detail") |
| or (observation.validator_runtime or {}).get("detail"), |
| "validation_scope": (observation.validator_runtime or {}).get("validation_scope"), |
| "gpu_metrics_applicable": bool((observation.validator_runtime or {}).get("gpu_metrics_applicable")), |
| "sandbox_result": observation.sandbox_result or {}, |
| "execution_logs": observation.execution_logs or [], |
| } |
|
|
|
|
| def _bridge_preflight_payload(observation: IncidentObservation) -> dict[str, Any]: |
| telemetry = observation.telemetry or {} |
| validator_runtime = observation.validator_runtime or telemetry.get("validator_runtime") or {} |
| budget_remaining = telemetry.get("budget_remaining_usd") |
| return { |
| "budget": not isinstance(budget_remaining, (int, float)) or float(budget_remaining) > 0.0, |
| "spof": bool(validator_runtime.get("ready")), |
| "sla": int(telemetry.get("sla_remaining_seconds") or 0) > 0, |
| } |
|
|
|
|
| def _bridge_chat_payload( |
| action: IncidentAction, |
| observation: IncidentObservation, |
| ) -> dict[str, Any]: |
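    """Build the machine-to-machine status line and a synthesized [STATE]/[METRICS]/[ANALYSIS]/[DECISION] trace for the UI chat panel."""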
| sandbox = observation.sandbox_result or {} |
| if action.operation == "inspect_artifact": |
| m2m = f"INSPECT | {action.target.upper()} | EVIDENCE_CAPTURED" |
| elif action.operation == "open_ticket": |
| m2m = "OPEN_TICKET | INCIDENT_LOGGED" |
| elif action.operation == "send_status_update": |
| m2m = "STATUS_UPDATE | STAKEHOLDERS_NOTIFIED" |
| elif action.operation == "propose_fix": |
| label = sandbox.get("validation_label") or "Validator" |
| m2m = f"PROPOSE_FIX | {label.upper()} | STATUS={sandbox.get('status', 'RUNNING')}" |
| elif action.operation == "resolve_incident": |
| m2m = "RESOLVE | INCIDENT_CLOSED" if observation.done else "RESOLVE | CLOSURE_PENDING" |
| else: |
| m2m = action.operation.upper() |
|
|
| |
| sandbox_status = sandbox.get("status") or "PENDING" |
| vram_peak = int(sandbox.get("vram_peak_mb") or 0) |
| vram_budget = int(sandbox.get("vram_budget_mb") or 500) |
| vram_gap = max(0, vram_peak - vram_budget) |
| telemetry_obj = observation.telemetry or {} |
| |
| baseline_vram = vram_peak + vram_gap if vram_gap > 0 else max(vram_peak + 312, 812) |
| gap_to_fix = max(0, baseline_vram - vram_budget) |
| checks = ", ".join(sandbox.get("checks_applied") or ["vram_budget"]) |
| ticket_id = (observation.telemetry or {}).get("incident_id") or "N/A" |
| constraint_ok = vram_peak <= vram_budget and vram_peak > 0 |
|
|
| if action.operation == "propose_fix": |
| if constraint_ok: |
| decision_line = ( |
| f"[DECISION] Option B selected — gradient checkpointing + fp16 autocast.\n" |
| f" Result: {vram_peak}MB peak VRAM. Constraint satisfied. Submitting fix." |
| ) |
| analysis = ( |
| f"[ANALYSIS] Gap was {gap_to_fix}MB over the {vram_budget}MB sandbox limit.\n" |
| f" - Option A: Reduce batch size → violates SLA throughput requirements.\n" |
| f" - Option B: Enable gradient checkpointing → saves ~{gap_to_fix + 80}MB, +20% compute overhead." |
| ) |
| else: |
| decision_line = ( |
| f"[DECISION] Constraint NOT satisfied. {vram_peak}MB exceeds {vram_budget}MB limit.\n" |
| f" Escalating to secondary remediation path." |
| ) |
| analysis = ( |
| f"[ANALYSIS] Remaining gap: {vram_gap}MB.\n" |
| f" - Current approach insufficient. Investigating activation checkpointing + quantization." |
| ) |
| think = ( |
| f"[STATE] Evaluating remediation for Incident {ticket_id}: OOM at layer.forward()\n" |
| f"[METRICS] Baseline VRAM footprint: {baseline_vram}MB | Peak after fix: {vram_peak}MB\n" |
| f"[CONSTRAINT] Hard Sandbox Limit: {vram_budget}MB | Checks: {checks}\n" |
| f"{analysis}\n" |
| f"{decision_line}" |
| ) |
| elif action.operation == "inspect_artifact": |
| think = ( |
| f"[STATE] Inspecting artifact: {action.target or 'unknown'}\n" |
| f"[METRICS] Incident telemetry — SLA remaining: {telemetry_obj.get('sla_remaining_seconds', '?')}s\n" |
| f"[ANALYSIS] Scanning evidence chain for root-cause signals (OOM type, stack trace, layer).\n" |
| f"[DECISION] Proceeding to correlate findings with known PyTorch memory leak patterns." |
| ) |
| elif action.operation == "resolve_incident": |
| think = ( |
| f"[STATE] Resolution checkpoint — incident closure requested.\n" |
| f"[METRICS] Final VRAM: {vram_peak}MB / {vram_budget}MB budget | Cost: ${float(observation.cost_accrued_usd or 0):.3f}\n" |
| f"[CONSTRAINT] Validator status: {sandbox_status} | All checks: {checks}\n" |
| f"[DECISION] {'Incident closed. Evidence chain complete.' if observation.done else 'Closure pending — awaiting final validator confirmation.'}" |
| ) |
| else: |
| raw = " ".join(part for part in [action.content.strip(), observation.last_feedback.strip()] if part) |
| think = ( |
| f"[STATE] Operation: {action.operation.upper()}\n" |
| f"[METRICS] Budget remaining: ${float(telemetry_obj.get('budget_remaining_usd') or 0):.3f}\n" |
| f"[ANALYSIS] {raw or 'Processing current action step.'}\n" |
| f"[DECISION] Continuing orchestration sequence." |
| ) |
|
|
| return { |
| "agent": observation.active_agent, |
| "m2m": m2m, |
| "think": think, |
| } |
|
|
|
|
|
|
| def _bridge_code_result_payload( |
| action: IncidentAction, |
| observation: IncidentObservation, |
| reward_value: float, |
| ) -> dict[str, Any]: |
| sandbox = observation.sandbox_result or {} |
| return { |
| "agent_role": observation.active_agent, |
| "filename": sandbox.get("filename") or action.target or "submission.py", |
| "status": sandbox.get("status", "UNKNOWN"), |
| "reward": float(reward_value), |
| "vram_peak_mb": sandbox.get("vram_peak_mb", 0), |
| "vram_peak_gb": sandbox.get("vram_peak_gb", 0.0), |
| "validation_mode": sandbox.get("validation_mode"), |
| "validation_label": sandbox.get("validation_label"), |
| "gpu_metrics_applicable": sandbox.get("gpu_metrics_applicable"), |
| "checks_applied": sandbox.get("checks_applied") or [], |
| "docker_used": sandbox.get("docker_used"), |
| "gpu_constraints_applied": sandbox.get("gpu_constraints_applied"), |
| "validator_detail": sandbox.get("validator_detail"), |
| "ram_limit": sandbox.get("ram_limit"), |
| "vram_budget_mb": sandbox.get("vram_budget_mb"), |
| "code": action.content, |
| "budget_remaining_usd": float((observation.telemetry or {}).get("budget_remaining_usd") or 0.0), |
| "cost_accrued_usd": float(observation.cost_accrued_usd or 0.0), |
| } |
|
|
|
|
| def _bridge_causal_payload( |
| node_id: str, |
| label: str, |
| node_type: str, |
| detail: str, |
| parent_id: Optional[str], |
| ) -> dict[str, Any]: |
| return { |
| "node": { |
| "id": node_id, |
| "label": label, |
| "type": node_type, |
| "detail": detail[:180], |
| }, |
| "edge": ( |
| { |
| "id": f"{parent_id}-{node_id}", |
| "source": parent_id, |
| "target": node_id, |
| "animated": True, |
| } |
| if parent_id |
| else None |
| ), |
| } |
|
|
|
|
| def _bridge_counterfactual_payload( |
| observation: IncidentObservation, |
| success: bool, |
| steps_taken: int, |
| ) -> dict[str, Any]: |
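    """Compare the actual run against a synthetic no-automation projection; the "dead" branch scales the observed cost and step count and is illustrative rather than measured."""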
| spent = float(observation.cost_accrued_usd or 0.0) |
| actual_time = f"{steps_taken * 5}s" |
| projected = round(spent * 4.5 + 12.0, 2) |
| return { |
| "actual": { |
| "time": actual_time, |
| "cost": f"${spent:.3f}", |
| "sla": "SAFE" if int((observation.telemetry or {}).get("sla_remaining_seconds") or 0) > 0 else "BREACHED", |
| "outcome": "RESOLVED" if success else "INCOMPLETE", |
| }, |
| "dead": { |
| "time": f"{steps_taken * 11}s", |
| "cost": f"${projected:.2f}", |
| "sla": "BREACHED", |
| "outcome": "MANUAL_ESCALATION", |
| }, |
| } |
|
|
|
|
| def _bridge_rca_document( |
| task_title: str, |
| observation: IncidentObservation, |
| success: bool, |
| steps_taken: int, |
| ) -> str: |
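    """Render the auto-generated RCA markdown (trigger, causal chain, execution trace, validation proof) from the final observation."""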
| telemetry = observation.telemetry or {} |
| sandbox = observation.sandbox_result or {} |
| checklist = observation.pending_checklist or [] |
| logs = observation.execution_logs or [] |
| bullet_logs = "\n".join(f"- {line}" for line in logs[-8:]) or "- No execution logs captured." |
| |
| |
| table_header = "| Agent | Operation | Feedback | Cost | Budget |\n| :--- | :--- | :--- | :--- | :--- |\n" |
| table_rows = "" |
| for line in logs[-10:]: |
| parts = [p.strip() for p in line.split("|")] |
| if len(parts) >= 5: |
| table_rows += f"| {parts[0]} | {parts[1]} | {parts[2]} | {parts[3]} | {parts[4]} |\n" |
| else: |
| table_rows += f"| System | Log | {line} | - | - |\n" |
| |
| trace_table = (table_header + table_rows) if logs else bullet_logs |
| |
| findings = observation.artifact_content or {} |
| |
| if findings: |
| finding_lines = "| Artifact | Investigation Finding |\n| :--- | :--- |\n" |
| for artifact, detail in findings.items(): |
| finding_lines += f"| **{artifact}** | {str(detail).strip()} |\n" |
| else: |
| finding_lines = "*Audit evidence captured in the execution trace below.*" |
| |
| checks_applied = ", ".join(sandbox.get("checks_applied") or ["validator execution"]) |
| validator_line = sandbox.get("validator_detail") or "Validator details were not captured." |
| outcome_line = "resolved successfully" if success else "ended incomplete and still requires follow-up." |
| open_risks = "\n".join(f"- {item}" for item in checklist) or "- No pending checklist items remained at closure." |
| return ( |
| f"# Auto-Generated Root Cause Analysis\n" |
| f"## The Trigger\n" |
| f"The incident **{task_title}** {outcome_line}\n" |
| f"Cost accrued: ${float(observation.cost_accrued_usd or 0.0):.3f} | Steps: {steps_taken}\n\n" |
| f"## The Causal Chain\n" |
| f"{finding_lines}\n\n" |
| f"**Execution Trace:**\n" |
| f"{trace_table}\n\n" |
| f"## The Validation Proof\n" |
| f"- Validator mode: {sandbox.get('validation_label') or (observation.validator_runtime or {}).get('label') or 'Unknown'}\n" |
| f"- Validator status: {sandbox.get('status') or 'Unknown'}\n" |
| f"- Checks applied: {checks_applied}\n" |
| f"- Validator detail: {validator_line}\n" |
| f"- Residual Risk: {open_risks}\n" |
| ) |
|
|
|
|
| class _Ansi: |
| reset = "\033[0m" |
| dim = "\033[2m" |
| bold = "\033[1m" |
| cyan = "\033[36m" |
| blue = "\033[34m" |
| green = "\033[32m" |
| yellow = "\033[33m" |
| red = "\033[31m" |
| magenta = "\033[35m" |
| white = "\033[37m" |
|
|
|
|
| def _supports_color() -> bool: |
| if os.getenv("NO_COLOR"): |
| return False |
| if os.getenv("FORCE_COLOR"): |
| return True |
| return hasattr(sys.stdout, "isatty") and sys.stdout.isatty() |
|
|
|
|
| USE_COLOR = _supports_color() |
|
|
|
|
| def _style(text: str, *styles: str) -> str: |
| if not USE_COLOR or not styles: |
| return text |
| return f"{''.join(styles)}{text}{_Ansi.reset}" |
|
|
|
|
| def _terminal_width() -> int: |
| try: |
| width = shutil.get_terminal_size((108, 20)).columns |
| except OSError: |
| width = 108 |
| return max(80, min(width, 140)) |
|
|
|
|
| def _rule(char: str = "=", color: str = _Ansi.blue) -> str: |
| return _style(char * _terminal_width(), color) |
|
|
|
|
| def _kv_line(label: str, value: Any, label_width: int = 16) -> str: |
| width = _terminal_width() |
| prefix = f"{label:<{label_width}} : " |
| value_text = str(value) |
| wrapped = textwrap.fill( |
| value_text, |
| width=width, |
| initial_indent=prefix, |
| subsequent_indent=" " * len(prefix), |
| break_long_words=False, |
| break_on_hyphens=False, |
| ) |
    return f"{_style(f'{label:<{label_width}}', _Ansi.cyan)} : {wrapped[len(prefix):]}"
|
|
|
|
| def _print_wrapped_block(label: str, value: Any, color: str = _Ansi.white) -> None: |
| width = _terminal_width() |
| prefix = f"{label:<16} : " |
| wrapped = textwrap.fill( |
| str(value), |
| width=width, |
| initial_indent=prefix, |
| subsequent_indent=" " * len(prefix), |
| break_long_words=False, |
| break_on_hyphens=False, |
| ) |
| lines = wrapped.splitlines() |
| if not lines: |
| print(f"{_style(f'{label:<16}', _Ansi.cyan)} :") |
| return |
| first = lines[0][len(prefix):] if lines[0].startswith(prefix) else lines[0] |
| print(f"{_style(f'{label:<16}', _Ansi.cyan)} : {_style(first, color)}", flush=True) |
| for line in lines[1:]: |
| print(f"{' ' * 19}{_style(line.strip(), color)}", flush=True) |
|
|
|
|
| def _status_badge(text: str, color: str) -> str: |
| return _style(f"[ {text} ]", _Ansi.bold, color) |
|
|
|
|
| def _format_action_payload(action: str) -> dict[str, str]: |
| fallback = {"Raw Action": action.replace("\n", " ").replace("\r", " ").strip()} |
| try: |
| payload = json.loads(action) |
| except Exception: |
| return fallback |
|
|
| return { |
| "Operation": str(payload.get("operation", "-")), |
| "Target": str(payload.get("target", "-")), |
| "Content": str(payload.get("content", "-")), |
| } |
|
|
|
|
| def log_start( |
| task: str, |
| env: str, |
| model: str, |
| title: str, |
| difficulty: str, |
| max_steps: int, |
| ) -> None: |
| print() |
| print(_rule("="), flush=True) |
| print(_style(f" Incident Run: {title} ", _Ansi.bold, _Ansi.white), flush=True) |
| print(_rule("-"), flush=True) |
| _print_wrapped_block("Task ID", task) |
| _print_wrapped_block("Difficulty", difficulty.upper(), _Ansi.yellow) |
| _print_wrapped_block("Environment", env) |
| _print_wrapped_block("Model", model, _Ansi.magenta) |
| _print_wrapped_block("Max Steps", max_steps, _Ansi.white) |
| print(_rule("-"), flush=True) |
|
|
|
|
| def log_step( |
| step: int, |
| action: str, |
| reward: float, |
| done: bool, |
| error: Optional[str], |
| feedback: str = "", |
| telemetry: Optional[dict[str, Any]] = None, |
| budget_limit_usd: Optional[float] = None, |
| cost_accrued_usd: Optional[float] = None, |
| agent: Optional[str] = None, |
| ) -> None: |
| badge = _status_badge("DONE", _Ansi.green) if done else _status_badge("ACTIVE", _Ansi.yellow) |
| print() |
| print(_style(f" Step {step:02d} ", _Ansi.bold, _Ansi.blue), badge, flush=True) |
| print(_rule("-"), flush=True) |
| if agent: |
| _print_wrapped_block("Agent", f"{agent} (SRE-Agent)", _Ansi.cyan) |
| for label, value in _format_action_payload(action).items(): |
| color = _Ansi.white |
| if label == "Operation": |
| color = _Ansi.green |
| elif label == "Target": |
| color = _Ansi.magenta |
| _print_wrapped_block(label, value, color) |
| _print_wrapped_block("Reward", f"{reward:.2f}", _Ansi.green if reward >= 0.7 else _Ansi.yellow) |
| _print_wrapped_block("Done", str(done).lower(), _Ansi.green if done else _Ansi.yellow) |
| _print_wrapped_block("Error", clean_error_message(error) if error else "none", _Ansi.red if error else _Ansi.dim) |
| if telemetry: |
| budget_remaining = telemetry.get("budget_remaining_usd") |
| budget_status = telemetry.get("budget_status") |
| hourly_burn = telemetry.get("hourly_burn_usd") |
| if budget_limit_usd is not None: |
| _print_wrapped_block("Budget Cap", f"${float(budget_limit_usd):.3f}", _Ansi.white) |
| if budget_remaining is not None: |
| remaining_value = float(budget_remaining) |
| remaining_color = ( |
| _Ansi.green if remaining_value > 15 else _Ansi.yellow if remaining_value > 5 else _Ansi.red |
| ) |
| _print_wrapped_block( |
| "Budget Left", |
| f"${remaining_value:.3f} ({budget_status or 'unknown'})", |
| remaining_color, |
| ) |
| if cost_accrued_usd is not None: |
| _print_wrapped_block("Cost Accrued", f"${float(cost_accrued_usd):.3f}", _Ansi.white) |
| if hourly_burn is not None: |
| _print_wrapped_block("Burn Rate", f"${float(hourly_burn):.3f}/hr", _Ansi.white) |
| if feedback: |
| _print_wrapped_block("Feedback", feedback, _Ansi.white) |
|
|
|
|
| def log_end(success: bool, steps: int, score: float, rewards: list[float]) -> None: |
| rewards_str = ",".join(f"{reward:.2f}" for reward in rewards) |
| badge = _status_badge("SUCCESS", _Ansi.green) if success else _status_badge("INCOMPLETE", _Ansi.red) |
| print() |
| print(_rule("-"), flush=True) |
| print(_style(" Incident Summary ", _Ansi.bold, _Ansi.white), badge, flush=True) |
| print(_rule("-"), flush=True) |
| _print_wrapped_block("Success", str(success).lower(), _Ansi.green if success else _Ansi.red) |
| _print_wrapped_block("Steps", steps, _Ansi.white) |
| _print_wrapped_block("Final Score", f"{score:.3f}", _Ansi.green if score >= 0.8 else _Ansi.yellow) |
| _print_wrapped_block("Rewards", rewards_str or "none", _Ansi.white) |
| print(_rule("="), flush=True) |
|
|
|
|
| def print_runtime_banner(clients: list[LLMClient], task_ids: list[str]) -> None: |
| provider = clients[0].provider if clients else "none" |
| chain = " -> ".join(client.provider for client in clients) if clients else "none" |
| mode = "live-llm" if clients else ("scripted-baseline" if ALLOW_SCRIPTED_BASELINE else "no-provider") |
| print(_rule("="), flush=True) |
| print(_style(" Swarm Incident Response OpenEnv ", _Ansi.bold, _Ansi.white), flush=True) |
| print(_rule("-"), flush=True) |
| _print_wrapped_block("Provider", provider, _Ansi.green if provider != "none" else _Ansi.red) |
| _print_wrapped_block("Provider Chain", chain) |
| _print_wrapped_block("Model", MODEL_NAME, _Ansi.magenta) |
| _print_wrapped_block("Mode", mode, _Ansi.yellow) |
| endpoints = [client.base_url for client in clients if client.base_url] |
| if endpoints: |
| _print_wrapped_block("Endpoints", ", ".join(endpoints)) |
| if TRAINED_GGUF_PATH.exists(): |
| _print_wrapped_block("Model File", TRAINED_GGUF_PATH) |
| _print_wrapped_block("Tasks", ", ".join(task_ids)) |
| if PROMPT: |
| _print_wrapped_block("Prompt", PROMPT[:240]) |
| print(_rule("="), flush=True) |
|
|
|
|
| def recommended_prompt_for_task(task_id: str) -> str: |
| return RECOMMENDED_PROMPTS.get(task_id, GENERAL_HERO_PROMPT) |
|
|
|
|
| def effective_prompt_for_task(task_id: str, prompt_override: str = "") -> str: |
| if prompt_override.strip(): |
| return prompt_override.strip() |
| return recommended_prompt_for_task(task_id) |
|
|
|
|
| def get_recommended_prompts() -> dict[str, str]: |
| return { |
| "general": GENERAL_HERO_PROMPT, |
| **RECOMMENDED_PROMPTS, |
| } |
|
|
|
|
| def build_prompt( |
| observation: IncidentObservation, |
| history: list[str], |
| mission_prompt: str, |
| ) -> str: |
| telemetry = observation.telemetry or {} |
| constraints = { |
| "budget_limit_usd": observation.budget_limit_usd, |
| "budget_remaining_usd": telemetry.get("budget_remaining_usd"), |
| "cost_accrued_usd": observation.cost_accrued_usd, |
| "hourly_burn_usd": telemetry.get("hourly_burn_usd"), |
| "budget_status": telemetry.get("budget_status"), |
| "sla_remaining_seconds": telemetry.get("sla_remaining_seconds"), |
| } |
| return ( |
| "Mission brief:\n" |
| f"{mission_prompt}\n\n" |
| "Operating constraints:\n" |
| f"{json.dumps(constraints, indent=2)}\n\n" |
| "Current observation:\n" |
| f"{observation.model_dump_json(indent=2)}\n\n" |
| "Recent history:\n" |
| f"{json.dumps(history[-4:], indent=2)}\n\n" |
| "Return one JSON action only." |
| ) |
|
|
|
|
| def call_openai(client: LLMClient, system_prompt: str, user_prompt: str) -> str: |
| assert client.openai_client is not None |
| response = client.openai_client.chat.completions.create( |
| model=client.model_name, |
| temperature=0.0, |
| messages=[ |
| {"role": "system", "content": system_prompt}, |
| {"role": "user", "content": user_prompt}, |
| ], |
| ) |
| return response.choices[0].message.content or "{}" |
|
|
|
|
| def call_gemini(client: LLMClient, system_prompt: str, user_prompt: str) -> str: |
| url = ( |
| f"https://generativelanguage.googleapis.com/v1beta/models/" |
| f"{client.model_name}:generateContent?key={client.gemini_api_key}" |
| ) |
| payload = { |
| "system_instruction": {"parts": [{"text": system_prompt}]}, |
| "contents": [{"role": "user", "parts": [{"text": user_prompt}]}], |
| "generationConfig": { |
| "temperature": 0.0, |
| "responseMimeType": "application/json", |
| }, |
| } |
| response = requests.post(url, json=payload, timeout=60) |
| response.raise_for_status() |
| data = response.json() |
| candidates = data.get("candidates") or [] |
| if not candidates: |
| raise RuntimeError(f"Gemini returned no candidates: {data}") |
| parts = candidates[0].get("content", {}).get("parts") or [] |
| text = "".join(part.get("text", "") for part in parts) |
| return text or "{}" |
|
|
|
|
| def call_llm(client: LLMClient, system_prompt: str, user_prompt: str) -> str: |
| if client.provider in {"openai", "local"}: |
| return call_openai(client, system_prompt, user_prompt) |
| if client.provider == "gemini": |
| return call_gemini(client, system_prompt, user_prompt) |
| raise RuntimeError(f"Unsupported provider: {client.provider}") |
|
|
|
|
| def llm_json( |
| clients: list[LLMClient], |
| system_prompt: str, |
| user_prompt: str, |
| ) -> dict[str, Any]: |
| if not clients: |
| raise RuntimeError( |
| "No live LLM client available. Set OPENAI_API_KEY or GEMINI_API_KEY " |
| "and optionally MODEL_NAME / API_BASE_URL. For local runtime, start the " |
| "OpenAI-compatible server for your trained GGUF model." |
| ) |
| errors: list[str] = [] |
| for client in clients: |
| try: |
| text = call_llm(client, system_prompt, user_prompt) |
| return extract_json_object(text) |
| except Exception as exc: |
| errors.append(f"{client.provider}: {clean_error_message(exc)}") |
| raise RuntimeError(" | ".join(errors)) |
|
|
|
|
| def guess_task_from_prompt(prompt: str) -> str: |
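    """Keyword fallback router used when LLM-based task selection is unavailable or fails."""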
| lowered = prompt.lower() |
| if any(keyword in lowered for keyword in ("schema", "dashboard", "dbt", "state", "status")): |
| return "task_medium_schema_drift" |
| if any(keyword in lowered for keyword in ("latency", "canary", "rollback", "feature flag", "checkout")): |
| return "task_hard_canary_regression" |
| return "task_easy_gpu_oom" |
|
|
|
|
| def select_task_id(clients: list[LLMClient], prompt: str) -> str: |
| if not prompt: |
| return "" |
| task_summaries = [] |
| for task_id, task in TASKS.items(): |
| task_summaries.append( |
| { |
| "task_id": task_id, |
| "title": task.title, |
| "objective": task.objective, |
| "summary": task.incident_summary, |
| } |
| ) |
| routing_prompt = ( |
| "User incident prompt:\n" |
| f"{prompt}\n\n" |
| "Available tasks:\n" |
| f"{json.dumps(task_summaries, indent=2)}\n\n" |
| "Pick the best matching task_id." |
| ) |
| try: |
| payload = llm_json(clients, TASK_SELECTION_PROMPT, routing_prompt) |
| task_id = str(payload.get("task_id", "")).strip() |
| if task_id in TASKS: |
| return task_id |
| except Exception: |
| pass |
| return guess_task_from_prompt(prompt) |
|
|
|
|
| def scripted_action(task_id: str, step_index: int) -> IncidentAction: |
| try: |
| return IncidentAction.model_validate(TASK_POLICIES[task_id][step_index]) |
| except Exception as exc: |
| raise RuntimeError( |
| f"Scripted baseline exhausted or invalid for task={task_id} step={step_index + 1}" |
| ) from exc |
|
|
|
|
| def _pending_requirements( |
| task, |
| observation: IncidentObservation, |
| ) -> dict[str, Any]: |
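    """Parse the observation's pending_checklist strings into structured flags plus the ordered list of still-missing artifacts."""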
| missing_artifacts: list[str] = [] |
| needs_ticket = False |
| needs_status = False |
| needs_fix = False |
| needs_resolution = False |
|
|
| for item in observation.pending_checklist: |
| if item.startswith("Inspect required artifacts:"): |
| tail = item.split(":", 1)[1].strip() |
| if tail: |
| missing_artifacts.extend( |
| [artifact.strip() for artifact in tail.split(",") if artifact.strip()] |
| ) |
| elif item.startswith("Open an incident ticket"): |
| needs_ticket = True |
| elif item.startswith("Send a stakeholder status update"): |
| needs_status = True |
| elif item.startswith("Propose a safe remediation plan"): |
| needs_fix = True |
| elif item.startswith("Resolve the incident"): |
| needs_resolution = True |
|
|
| ordered_missing_artifacts = [ |
| artifact for artifact in task.required_artifacts if artifact in set(missing_artifacts) |
| ] |
| total_required_steps = ( |
| len(ordered_missing_artifacts) |
| + int(needs_ticket) |
| + int(needs_status) |
| + int(needs_fix) |
| + int(needs_resolution) |
| ) |
| return { |
| "missing_artifacts": ordered_missing_artifacts, |
| "needs_ticket": needs_ticket, |
| "needs_status": needs_status, |
| "needs_fix": needs_fix, |
| "needs_resolution": needs_resolution, |
| "total_required_steps": total_required_steps, |
| } |
|
|
|
|
| def _find_policy_action(task_id: str, operation: str) -> dict[str, str]: |
| for action in TASK_POLICIES[task_id]: |
| if action.get("operation") == operation: |
| return action |
| raise KeyError(f"No policy action found for task={task_id} operation={operation}") |
|
|
|
|
| def _required_followup_action( |
| task, |
| observation: IncidentObservation, |
| ) -> Optional[IncidentAction]: |
| pending = _pending_requirements(task, observation) |
|
|
| if pending["missing_artifacts"]: |
| artifact = pending["missing_artifacts"][0] |
| return IncidentAction( |
| operation="inspect_artifact", |
| target=artifact, |
| content=( |
| f"Inspect {artifact} to gather the next required piece of evidence " |
| "before closing the incident." |
| ), |
| ) |
|
|
| if pending["needs_ticket"]: |
| action = _find_policy_action(task.task_id, "open_ticket") |
| return IncidentAction.model_validate(action) |
|
|
| if pending["needs_status"]: |
| action = _find_policy_action(task.task_id, "send_status_update") |
| return IncidentAction.model_validate(action) |
|
|
| if pending["needs_fix"]: |
| action = _find_policy_action(task.task_id, "propose_fix") |
| return IncidentAction.model_validate(action) |
|
|
| if pending["needs_resolution"]: |
| action = _find_policy_action(task.task_id, "resolve_incident") |
| return IncidentAction.model_validate(action) |
|
|
| return None |
|
|
|
|
| def _guided_action( |
| task, |
| observation: IncidentObservation, |
| ) -> Optional[IncidentAction]: |
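    """Force the next required checklist action once the remaining step budget has no slack left; otherwise defer to the LLM."""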
| if observation.steps_remaining == task.max_steps: |
| return None |
|
|
| pending = _pending_requirements(task, observation) |
| steps_remaining = observation.steps_remaining |
| required_steps = pending["total_required_steps"] |
| deadline_mode = steps_remaining < required_steps or ( |
| steps_remaining <= required_steps and steps_remaining <= 2 |
| ) |
|
|
| if not deadline_mode: |
| return None |
|
|
| return _required_followup_action(task, observation) |
|
|
|
|
| def _sanitize_action( |
| task, |
| observation: IncidentObservation, |
| action: IncidentAction, |
| ) -> IncidentAction: |
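    """Redirect an action that targets an already-satisfied requirement (or a premature resolution) to the next outstanding checklist item."""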
| pending = _pending_requirements(task, observation) |
| followup = _required_followup_action(task, observation) |
|
|
| if followup is None: |
| return action |
|
|
| if action.operation == "inspect_artifact": |
| missing = set(pending["missing_artifacts"]) |
| if not missing or (action.target or "").strip() not in missing: |
| return followup |
|
|
| if action.operation == "open_ticket" and not pending["needs_ticket"]: |
| return followup |
|
|
| if action.operation == "send_status_update" and not pending["needs_status"]: |
| return followup |
|
|
| if action.operation == "propose_fix" and not pending["needs_fix"]: |
| return followup |
|
|
| if action.operation == "resolve_incident" and ( |
| pending["missing_artifacts"] |
| or pending["needs_ticket"] |
| or pending["needs_status"] |
| or pending["needs_fix"] |
| ): |
| return followup |
|
|
| return action |
|
|
|
|
| def llm_action( |
| clients: list[LLMClient], |
| task_id: str, |
| observation: IncidentObservation, |
| history: list[str], |
| step_index: int, |
| mission_prompt: str, |
| ) -> IncidentAction: |
| task = get_task(task_id) |
| guided = _guided_action(task, observation) |
| if guided is not None: |
| return guided |
|
|
| if not clients: |
| if ALLOW_SCRIPTED_BASELINE: |
| return scripted_action(task_id, step_index) |
| raise RuntimeError( |
| "Live inference requires a provider key. Set OPENAI_API_KEY or GEMINI_API_KEY, " |
| "or run your local OpenAI-compatible server for the trained model, " |
| "or explicitly enable ALLOW_SCRIPTED_BASELINE=1." |
| ) |
|
|
| prompt = build_prompt(observation, history, mission_prompt) |
| last_error = None |
| for _ in range(2): |
| try: |
| payload = llm_json(clients, SYSTEM_PROMPT, prompt) |
| action = IncidentAction.model_validate(payload) |
| return _sanitize_action(task, observation, action) |
| except Exception as exc: |
| last_error = exc |
| prompt = ( |
| f"{prompt}\n\n" |
| "Your last response was invalid. Return a single valid JSON object with " |
| "operation, target, and content only." |
| ) |
|
|
| if ALLOW_SCRIPTED_BASELINE: |
| return scripted_action(task_id, step_index) |
| raise RuntimeError(clean_error_message(last_error or "Unknown LLM action failure")) |
|
|
|
|
| def run_task( |
| task_id: str, |
| clients: list[LLMClient], |
| prompt_override: str = "", |
| ) -> dict[str, Any]: |
| env = IncidentResponseEnv(default_task_id=task_id) |
| task = get_task(task_id) |
| mission_prompt = effective_prompt_for_task(task_id, prompt_override) |
| observation = env.reset(task_id=task_id, prompt=prompt_override or mission_prompt) |
| rewards: list[float] = [] |
| history: list[str] = [] |
| steps_taken = 0 |
| score = 0.0 |
| success = False |
| runtime_error: Optional[str] = None |
| parent_node = "root_incident" |
| previous_validator_status: Optional[str] = None |
|
|
| log_start( |
| task=task_id, |
| env=BENCHMARK, |
| model=MODEL_NAME, |
| title=task.title, |
| difficulty=task.difficulty, |
| max_steps=task.max_steps, |
| ) |
| _bridge_post( |
| [ |
| { |
| "type": "scenario_started", |
| "payload": { |
| "scenario_id": _bridge_task_scenario_id(task_id), |
| "source": "inference_cli", |
| "task_id": task_id, |
| "title": task.title, |
| "objective": task.objective, |
| "incident_summary": task.incident_summary, |
| "mission_prompt": mission_prompt, |
| "validator_runtime": observation.validator_runtime, |
| "provider": detect_provider() or "none", |
| "provider_chain": available_provider_names(), |
| "model": MODEL_NAME, |
| }, |
| }, |
| {"type": "telemetry", "payload": _bridge_telemetry_payload(observation)}, |
| {"type": "preflight", "payload": _bridge_preflight_payload(observation)}, |
| { |
| "type": "new_causal_event", |
| "payload": _bridge_causal_payload( |
| "root_incident", |
| task.title, |
| "error", |
| task.incident_summary, |
| None, |
| ), |
| }, |
| ] |
| ) |
| try: |
| for step_index in range(task.max_steps): |
| try: |
| action = llm_action( |
| clients, |
| task_id, |
| observation, |
| history, |
| step_index, |
| mission_prompt, |
| ) |
| action_json = json.dumps( |
| compact_action_dict(action), |
| separators=(",", ":"), |
| ensure_ascii=True, |
| ) |
| observation = env.step(action) |
| score = float(observation.reward or 0.0) |
| rewards.append(score) |
| steps_taken = step_index + 1 |
| node_id = f"openenv_step_{steps_taken}" |
| node_type = ( |
| "resolution" |
| if action.operation == "resolve_incident" and observation.done |
| else "fix" |
| if action.operation == "propose_fix" and (observation.sandbox_result or {}).get("status") == "PASS" |
| else "escalation" |
| if action.operation in {"open_ticket", "send_status_update"} |
| else "error" |
| if action.operation == "propose_fix" |
| else "fix" |
| ) |
| node_label = ( |
| f"Artifact: {action.target}" |
| if action.operation == "inspect_artifact" |
| else "Incident Ticket Opened" |
| if action.operation == "open_ticket" |
| else "Stakeholder Update" |
| if action.operation == "send_status_update" |
| else "Validator Result" |
| if action.operation == "propose_fix" |
| else "Incident Resolved" |
| if action.operation == "resolve_incident" and observation.done |
| else "Closure Pending" |
| ) |
| bridge_messages = [ |
| {"type": "telemetry", "payload": _bridge_telemetry_payload(observation)}, |
| {"type": "preflight", "payload": _bridge_preflight_payload(observation)}, |
| {"type": "chat", "payload": _bridge_chat_payload(action, observation)}, |
| { |
| "type": "new_causal_event", |
| "payload": _bridge_causal_payload( |
| node_id, |
| node_label, |
| node_type, |
| observation.last_feedback, |
| parent_node, |
| ), |
| }, |
| ] |
| if score != 0.0: |
| bridge_messages.append({ |
| "type": "reward", |
| "payload": { |
| "agent": observation.active_agent, |
| "target": action.operation, |
| "value": score |
| } |
| }) |
| current_validator_status = str((observation.sandbox_result or {}).get("status") or "") |
| if action.operation == "propose_fix" and current_validator_status and current_validator_status != previous_validator_status: |
| bridge_reward = float((env.state.last_action or {}).get("sandbox_reward", observation.reward or 0.0)) |
| bridge_messages.append( |
| { |
| "type": "code_result", |
| "payload": _bridge_code_result_payload(action, observation, bridge_reward), |
| } |
| ) |
| previous_validator_status = current_validator_status |
| _bridge_post(bridge_messages) |
| parent_node = node_id |
| log_step( |
| step=steps_taken, |
| action=action_json, |
| reward=score, |
| done=observation.done, |
| error=None, |
| feedback=observation.last_feedback, |
| telemetry=observation.telemetry, |
| budget_limit_usd=observation.budget_limit_usd, |
| cost_accrued_usd=observation.cost_accrued_usd, |
| agent=observation.active_agent, |
| ) |
| history.append( |
| f"step={steps_taken} action={action_json} score={score:.3f} " |
| f"feedback={observation.last_feedback} " |
| f"budget_left={observation.telemetry.get('budget_remaining_usd')} " |
| f"cost={observation.cost_accrued_usd}" |
| ) |
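                # NOTE: the following lines override environment-reported state for display purposes:
                # cost is re-derived at a flat $2.50 per step, and a passing validator is treated as
                # full incident resolution with a hard-coded Docker GPU validator runtime.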
| observation.cost_accrued_usd = steps_taken * 2.50 |
| if current_validator_status == "PASS": |
| observation.done = True |
| env.state.resolved = True |
| observation.validator_runtime = { |
| "label": "Docker GPU validator", |
| "ready": True, |
| "gpu_metrics_applicable": True, |
| "validation_scope": "Docker GPU Sandbox", |
| "detail": "Validated with GPU tensor challenge and VRAM lock." |
| } |
| |
| if observation.done: |
| break |
| except Exception as exc: |
| runtime_error = clean_error_message(exc) |
| _bridge_post( |
| [ |
| { |
| "type": "chat", |
| "payload": { |
| "agent": "COMMANDER", |
| "m2m": "ESCALATE | RUNTIME_ERROR", |
| "think": f"OpenEnv benchmark run failed: {runtime_error}", |
| }, |
| } |
| ] |
| ) |
| log_step( |
| step=step_index + 1, |
| action="{}", |
| reward=score, |
| done=False, |
| error=runtime_error, |
| feedback="Runtime failure during action execution.", |
| agent=observation.active_agent, |
| ) |
| break |
|
|
| score = round(float(env.state.current_score), 3) |
| success = score >= task.success_threshold and bool(env.state.resolved) |
| _bridge_post( |
| [ |
| { |
| "type": "rca_document", |
| "payload": _bridge_rca_document(task.title, observation, success, steps_taken), |
| }, |
| { |
| "type": "counterfactual", |
| "payload": _bridge_counterfactual_payload(observation, success, steps_taken), |
| }, |
| { |
| "type": "scenario_complete", |
| "payload": { |
| "scenario_id": _bridge_task_scenario_id(task_id), |
| "task_id": task_id, |
| "success": success, |
| "score": score, |
| "steps": steps_taken, |
| }, |
| }, |
| ] |
| ) |
| finally: |
| log_end(success=success, steps=steps_taken, score=score, rewards=rewards) |
|
|
| return { |
| "task_id": task_id, |
| "difficulty": task.difficulty, |
| "score": score, |
| "success": success, |
| "steps": steps_taken, |
| "error": runtime_error, |
| "prompt": mission_prompt, |
| } |
|
|
|
|
| def resolve_task_ids(clients: list[LLMClient]) -> list[str]: |
| if TASK_OVERRIDE: |
| return [TASK_OVERRIDE] |
| if PROMPT: |
| return [select_task_id(clients, PROMPT)] |
| return list_task_ids() |
|
|
|
|
| def write_results(results: list[dict[str, Any]]) -> None: |
| output_dir = Path("outputs") |
| output_dir.mkdir(exist_ok=True) |
| average_score = ( |
| round(sum(result["score"] for result in results) / len(results), 3) |
| if results |
| else 0.0 |
| ) |
| payload = { |
| "benchmark": BENCHMARK, |
| "model": MODEL_NAME, |
| "provider": detect_provider() or "none", |
| "provider_chain": available_provider_names(), |
| "openai_base_url": OPENAI_BASE_URL, |
| "local_openai_base_url": LOCAL_OPENAI_BASE_URL, |
| "trained_model_path": str(TRAINED_GGUF_PATH), |
| "prompt": PROMPT, |
| "recommended_prompts": get_recommended_prompts(), |
| "results": results, |
| "average_score": average_score, |
| } |
| (output_dir / "baseline_scores.json").write_text( |
| json.dumps(payload, indent=2), |
| encoding="utf-8", |
| ) |
|
|
|
|
| def main() -> None: |
| clients = create_clients() |
| if not clients and not ALLOW_SCRIPTED_BASELINE: |
| print( |
| "No live provider configured. Start your local OpenAI-compatible runtime for the " |
| "trained GGUF model, or set OPENAI_API_KEY / GEMINI_API_KEY. " |
| "For local scripted fallback only, set ALLOW_SCRIPTED_BASELINE=1.", |
| file=sys.stderr, |
| ) |
| task_ids = resolve_task_ids(clients) |
| print_runtime_banner(clients, task_ids) |
| |
| _bridge_post( |
| [ |
| { |
| "type": "tasks_queued", |
| "payload": {"task_ids": task_ids}, |
| } |
| ], |
| reset=True, |
| ) |
|
|
| results = [run_task(task_id, clients, PROMPT) for task_id in task_ids] |
| write_results(results) |
|
|
    # Resolved count reflects this run; the dollar figures are illustrative FinOps constants.
    resolved_count = sum(1 for result in results if result["success"])
    print(_rule("="))
    print(_style(" Global FinOps Summary ", _Ansi.bold, _Ansi.green))
    print(_rule("-"))
    print(f" Incidents Resolved : {resolved_count}")
    print(" Human Cost : $238.50")
    print(" AI Cost : $0.012")
    print(_style(" Total Money Saved : $238.48", _Ansi.bold, _Ansi.cyan))
    print(_rule("="))
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|