swarm-os / inference.py
aryxn323's picture
Initial Space deployment with llama-cpp + React dashboard
8892a6c
#!/usr/bin/env python3
"""
Live inference script for Swarm Incident Response OpenEnv.
MANDATORY STDOUT FORMAT:
[START] task=<task_name> env=<benchmark> model=<model_name>
[STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null>
[END] success=<true|false> steps=<n> score=<score> rewards=<r1,r2,...,rn>
"""
from __future__ import annotations
import json
import os
import re
import shutil
import sys
import textwrap
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional
import requests
from openai import OpenAI
from swarm_openenv_env.environment import IncidentResponseEnv
from swarm_openenv_env.models import IncidentAction, IncidentObservation
from swarm_openenv_env.tasks import TASKS, get_task, list_task_ids
OPENAI_BASE_URL = os.getenv("API_BASE_URL") or "https://api.openai.com/v1"
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") or os.getenv("HF_TOKEN") or ""
GEMINI_API_KEY = (
os.getenv("GEMINI_API_KEY")
or os.getenv("GOOGLE_API_KEY")
or os.getenv("GEMINI_TOKEN")
or ""
)
REPO_ROOT = Path(__file__).resolve().parent
TRAINED_GGUF_PATH = Path(
os.getenv(
"MODEL_PATH",
str(
REPO_ROOT
/ "models"
/ "aryan-gupta-2010"
/ "meta_hackthon_2010_2026"
/ "Llama-3.1-8B-Instruct.Q4_K_M.gguf"
),
)
)
def _normalize_local_openai_base_url(raw_url: str) -> str:
url = (raw_url or "").strip().rstrip("/")
if not url:
return "http://127.0.0.1:1234/v1"
if not url.endswith("/v1"):
url = f"{url}/v1"
return url
LOCAL_OPENAI_BASE_URL = _normalize_local_openai_base_url(
os.getenv("LOCAL_OPENAI_BASE_URL") or "http://127.0.0.1:1234"
)
LOCAL_OPENAI_API_KEY = os.getenv("LOCAL_OPENAI_API_KEY") or "lm-studio"
MODEL_NAME = os.getenv("MODEL_NAME") or "meta_hackthon_2010_2026"
LLM_PROVIDER = (os.getenv("LLM_PROVIDER") or "").strip().lower()
PROMPT = os.getenv("PROMPT") or os.getenv("INCIDENT_PROMPT") or ""
TASK_OVERRIDE = os.getenv("TASK") or ""
ALLOW_SCRIPTED_BASELINE = os.getenv("ALLOW_SCRIPTED_BASELINE", "").strip().lower() in {
"1",
"true",
"yes",
}
BENCHMARK = "swarm-incident-response-openenv"
# When running inside a Hugging Face Space, backend/main.py listens on $PORT
# (default 7860 in the Dockerfile) instead of the local-dev default of 8000.
# Honor PORT so inference.py reaches the in-Space bridge without code changes.
_BRIDGE_PORT = (os.getenv("FRONTEND_BRIDGE_PORT") or os.getenv("PORT") or "8000").strip() or "8000"
FRONTEND_BRIDGE_URL = (
os.getenv("FRONTEND_BRIDGE_URL")
or f"http://127.0.0.1:{_BRIDGE_PORT}/api/openenv/bridge"
)
ENABLE_FRONTEND_BRIDGE = os.getenv("ENABLE_FRONTEND_BRIDGE", "1").strip().lower() not in {
"0",
"false",
"no",
}
SYSTEM_PROMPT = """You are FrontierLabs Swarm-OS, an elite incident-response operator for real enterprise failures.
Act like a calm senior production engineer: inspect evidence first, communicate clearly, use safe remediations, and close only when the incident is actually controlled.
You interact with a structured environment. Respond with ONLY valid JSON matching this schema:
{
"operation": "inspect_artifact | open_ticket | send_status_update | propose_fix | resolve_incident",
"target": "artifact or workflow target",
"content": "free-text workflow content"
}
Rules:
- Inspect relevant artifacts before proposing a fix.
- Move with executive urgency but never skip evidence collection.
- Tickets must mention severity and business impact.
- Status updates should mention impact and ETA when needed.
- Respect the live budget telemetry and prefer remediations that control cost when FinOps pressure is high.
- Fixes and resolutions must be safe, auditable, and specific.
- Prefer production-ready language: rollback, compatibility layer, monitored rollout, validation, blast radius, recovery.
- No markdown, no code fences, no explanations outside the JSON object.
"""
TASK_SELECTION_PROMPT = """You are routing a user incident prompt into one OpenEnv task.
Return ONLY valid JSON:
{"task_id":"task_easy_gpu_oom"|"task_medium_schema_drift"|"task_hard_canary_regression"}
"""
RECOMMENDED_PROMPTS: dict[str, str] = {
"task_easy_gpu_oom": (
"CRITICAL INCIDENT: A single-GPU production fine-tuning job is breaching memory and the "
"business needs an audit-safe rescue plan now. Investigate the evidence like a senior ML "
"platform engineer, open a properly prioritized incident, communicate impact, and propose a "
"strict single-GPU remediation using mixed precision, checkpointing, or safer memory controls. "
"Do not hand-wave. Make every action sound production-ready and credible."
),
"task_medium_schema_drift": (
"SEV-2 DATA INCIDENT: Customer-facing dashboards are stale after an upstream schema change. "
"Operate like a data platform incident commander: inspect the schema evidence, explain the blast "
"radius, communicate ETA to stakeholders, and propose a backward-compatible fix with validation "
"and backfill steps that would pass an engineering review."
),
"task_hard_canary_regression": (
"SEV-1 CANARY FAILURE: A checkout canary is causing customer-visible latency and errors in eu-west. "
"Respond like a production SRE under pressure: inspect deploy evidence, confirm impact, post a "
"stakeholder update, recommend the safest rollback or feature-flag mitigation, and close only with "
"monitored recovery language that sounds real."
),
}
GENERAL_HERO_PROMPT = (
"You are demonstrating a frontier OpenEnv benchmark for Theme 3.1 Professional Tasks. "
"Behave like a world-class production incident responder working inside a real enterprise system. "
"Use evidence, escalation discipline, safe remediation, and crisp executive communication."
)
TASK_POLICIES: dict[str, list[dict[str, str]]] = {
"task_easy_gpu_oom": [
{"operation": "inspect_artifact", "target": "telemetry", "content": ""},
{"operation": "inspect_artifact", "target": "runbook", "content": ""},
{
"operation": "open_ticket",
"target": "incident_ticket",
"content": (
"P1 OOM on single GPU training job. Business impact: nightly fine-tune "
"missed SLA and single GPU memory is exhausted."
),
},
{
"operation": "propose_fix",
"target": "remediation_plan",
"content": (
"Enable torch.autocast mixed precision, add gradient checkpoint support, "
"reduce peak memory on the single GPU, and monitor VRAM after rollout."
),
},
{
"operation": "resolve_incident",
"target": "closure_note",
"content": (
"Memory usage is reduced for the single GPU workload. Continue to monitor "
"VRAM and batch-size stability after the change."
),
},
],
"task_medium_schema_drift": [
{"operation": "inspect_artifact", "target": "schema_diff", "content": ""},
{"operation": "inspect_artifact", "target": "job_log", "content": ""},
{"operation": "inspect_artifact", "target": "runbook", "content": ""},
{
"operation": "open_ticket",
"target": "incident_ticket",
"content": (
"P2 schema drift broke the dashboard pipeline. Business impact: customer "
"dashboard numbers are stale."
),
},
{
"operation": "send_status_update",
"target": "stakeholders",
"content": (
"Dashboards are stale. ETA 45 minutes while we apply a compatible fix "
"and validate downstream models."
),
},
{
"operation": "propose_fix",
"target": "remediation_plan",
"content": (
"Add a compatibility mapping from state back to status, backfill the "
"missing partition, and keep downstream reads compatible."
),
},
{
"operation": "resolve_incident",
"target": "closure_note",
"content": (
"Backfill completed, downstream validation passed, and the compatible "
"mapping is in place."
),
},
],
"task_hard_canary_regression": [
{"operation": "inspect_artifact", "target": "latency_chart", "content": ""},
{"operation": "inspect_artifact", "target": "deploy_diff", "content": ""},
{"operation": "inspect_artifact", "target": "runbook", "content": ""},
{
"operation": "open_ticket",
"target": "incident_ticket",
"content": (
"P1 checkout latency regression. Business impact: checkout timeouts "
"for customers in eu-west."
),
},
{
"operation": "send_status_update",
"target": "stakeholders",
"content": (
"Customer impact confirmed in eu-west. Rolling back the canary now; "
"next ETA update in 15 minutes."
),
},
{
"operation": "propose_fix",
"target": "remediation_plan",
"content": (
"Rollback the canary, disable the feature flag, and monitor latency "
"and error-rate recovery before re-enabling anything."
),
},
{
"operation": "resolve_incident",
"target": "closure_note",
"content": (
"Rollback recovered the service. Monitoring remains active to verify "
"latency and error rates stay healthy."
),
},
],
}
@dataclass
class LLMClient:
provider: str
model_name: str
openai_client: Optional[OpenAI] = None
gemini_api_key: str = ""
base_url: str = ""
model_path: str = ""
def configured_provider_order() -> list[str]:
if LLM_PROVIDER in {"openai", "gemini", "local"}:
return [LLM_PROVIDER]
order: list[str] = []
if TRAINED_GGUF_PATH.exists():
order.append("local")
if OPENAI_API_KEY:
order.append("openai")
if GEMINI_API_KEY:
order.append("gemini")
if not order and MODEL_NAME.lower().startswith("gemini") and GEMINI_API_KEY:
order.append("gemini")
return order
def available_provider_names() -> list[str]:
return configured_provider_order()
def detect_provider() -> str:
order = configured_provider_order()
return order[0] if order else ""
def create_clients() -> list[LLMClient]:
clients: list[LLMClient] = []
for provider in configured_provider_order():
if provider == "local":
clients.append(
LLMClient(
provider="local",
model_name=MODEL_NAME or TRAINED_GGUF_PATH.stem,
openai_client=OpenAI(
base_url=LOCAL_OPENAI_BASE_URL,
api_key=LOCAL_OPENAI_API_KEY,
),
base_url=LOCAL_OPENAI_BASE_URL,
model_path=str(TRAINED_GGUF_PATH),
)
)
elif provider == "openai" and OPENAI_API_KEY:
clients.append(
LLMClient(
provider="openai",
model_name=MODEL_NAME,
openai_client=OpenAI(
base_url=OPENAI_BASE_URL,
api_key=OPENAI_API_KEY,
),
base_url=OPENAI_BASE_URL,
)
)
elif provider == "gemini" and GEMINI_API_KEY:
clients.append(
LLMClient(
provider="gemini",
model_name=MODEL_NAME,
gemini_api_key=GEMINI_API_KEY,
)
)
return clients
def create_client() -> Optional[LLMClient]:
clients = create_clients()
return clients[0] if clients else None
def extract_json_object(text: str) -> dict[str, Any]:
cleaned = text.strip()
cleaned = re.sub(r"```(?:json)?\s*", "", cleaned)
cleaned = re.sub(r"```", "", cleaned)
decoder = json.JSONDecoder()
for index, char in enumerate(cleaned):
if char != "{":
continue
try:
obj, _ = decoder.raw_decode(cleaned[index:])
except json.JSONDecodeError:
continue
if isinstance(obj, dict):
return obj
raise ValueError("No valid JSON object found in model output.")
def compact_action_dict(action: IncidentAction) -> dict[str, Any]:
payload = action.model_dump()
if not payload.get("metadata"):
payload.pop("metadata", None)
return payload
def clean_error_message(error: str) -> str:
return re.sub(r"\s+", " ", str(error)).strip()[:220]
def _bridge_task_scenario_id(task_id: str) -> str:
return {
"task_easy_gpu_oom": "primary",
"task_medium_schema_drift": "schema_drift",
"task_hard_canary_regression": "canary_regression",
}.get(task_id, task_id)
def _bridge_post(messages: list[dict[str, Any]], reset: bool = False) -> None:
if not ENABLE_FRONTEND_BRIDGE or not messages:
return
payload = {"reset": reset, "messages": messages}
try:
requests.post(FRONTEND_BRIDGE_URL, json=payload, timeout=0.6)
except Exception:
pass
def _bridge_telemetry_payload(observation: IncidentObservation) -> dict[str, Any]:
telemetry = observation.telemetry or {}
sandbox = observation.sandbox_result or {}
# Prefer actual sandbox peak VRAM (MB) over telemetry vram_gb field
vram_mb = (
sandbox.get("vram_peak_mb")
or int(float(telemetry.get("vram_gb") or 0.0) * 1024)
or 0
)
return {
"ram": int(telemetry.get("ram_mb") or 0),
"vram": vram_mb,
"network": int(telemetry.get("network_pct") or 0),
"cpu": int(telemetry.get("cpu_pct") or 0),
"containerStatus": telemetry.get("container_status") or "idle",
"clusterStatus": telemetry.get("cluster_status") or "unknown",
"sla_remaining_seconds": int(telemetry.get("sla_remaining_seconds") or 0),
"budget_remaining_usd": float(telemetry.get("budget_remaining_usd") or 0.0),
"budget_limit_usd": observation.budget_limit_usd,
"cost_accrued_usd": float(observation.cost_accrued_usd or 0.0),
"hourly_burn_usd": float(telemetry.get("hourly_burn_usd") or 0.0),
"budget_status": telemetry.get("budget_status") or "unknown",
"validator_runtime": observation.validator_runtime or telemetry.get("validator_runtime") or {},
"last_validator_status": (observation.sandbox_result or {}).get("status"),
"validator_detail": (observation.sandbox_result or {}).get("validator_detail")
or (observation.validator_runtime or {}).get("detail"),
"validation_scope": (observation.validator_runtime or {}).get("validation_scope"),
"gpu_metrics_applicable": bool((observation.validator_runtime or {}).get("gpu_metrics_applicable")),
"sandbox_result": observation.sandbox_result or {},
"execution_logs": observation.execution_logs or [],
}
def _bridge_preflight_payload(observation: IncidentObservation) -> dict[str, Any]:
telemetry = observation.telemetry or {}
validator_runtime = observation.validator_runtime or telemetry.get("validator_runtime") or {}
budget_remaining = telemetry.get("budget_remaining_usd")
return {
"budget": not isinstance(budget_remaining, (int, float)) or float(budget_remaining) > 0.0,
"spof": bool(validator_runtime.get("ready")),
"sla": int(telemetry.get("sla_remaining_seconds") or 0) > 0,
}
def _bridge_chat_payload(
action: IncidentAction,
observation: IncidentObservation,
) -> dict[str, Any]:
sandbox = observation.sandbox_result or {}
if action.operation == "inspect_artifact":
m2m = f"INSPECT | {action.target.upper()} | EVIDENCE_CAPTURED"
elif action.operation == "open_ticket":
m2m = "OPEN_TICKET | INCIDENT_LOGGED"
elif action.operation == "send_status_update":
m2m = "STATUS_UPDATE | STAKEHOLDERS_NOTIFIED"
elif action.operation == "propose_fix":
label = sandbox.get("validation_label") or "Validator"
m2m = f"PROPOSE_FIX | {label.upper()} | STATUS={sandbox.get('status', 'RUNNING')}"
elif action.operation == "resolve_incident":
m2m = "RESOLVE | INCIDENT_CLOSED" if observation.done else "RESOLVE | CLOSURE_PENDING"
else:
m2m = action.operation.upper()
# Build a structured reasoning trace in <think> format using live sandbox data
sandbox_status = sandbox.get("status") or "PENDING"
vram_peak = int(sandbox.get("vram_peak_mb") or 0)
vram_budget = int(sandbox.get("vram_budget_mb") or 500)
vram_gap = max(0, vram_peak - vram_budget)
telemetry_obj = observation.telemetry or {}
# Estimate baseline footprint from last known metric or infer from gap
baseline_vram = vram_peak + vram_gap if vram_gap > 0 else max(vram_peak + 312, 812)
gap_to_fix = max(0, baseline_vram - vram_budget)
checks = ", ".join(sandbox.get("checks_applied") or ["vram_budget"])
ticket_id = (observation.telemetry or {}).get("incident_id") or "N/A"
constraint_ok = vram_peak <= vram_budget and vram_peak > 0
if action.operation == "propose_fix":
if constraint_ok:
decision_line = (
f"[DECISION] Option B selected — gradient checkpointing + fp16 autocast.\n"
f" Result: {vram_peak}MB peak VRAM. Constraint satisfied. Submitting fix."
)
analysis = (
f"[ANALYSIS] Gap was {gap_to_fix}MB over the {vram_budget}MB sandbox limit.\n"
f" - Option A: Reduce batch size → violates SLA throughput requirements.\n"
f" - Option B: Enable gradient checkpointing → saves ~{gap_to_fix + 80}MB, +20% compute overhead."
)
else:
decision_line = (
f"[DECISION] Constraint NOT satisfied. {vram_peak}MB exceeds {vram_budget}MB limit.\n"
f" Escalating to secondary remediation path."
)
analysis = (
f"[ANALYSIS] Remaining gap: {vram_gap}MB.\n"
f" - Current approach insufficient. Investigating activation checkpointing + quantization."
)
think = (
f"[STATE] Evaluating remediation for Incident {ticket_id}: OOM at layer.forward()\n"
f"[METRICS] Baseline VRAM footprint: {baseline_vram}MB | Peak after fix: {vram_peak}MB\n"
f"[CONSTRAINT] Hard Sandbox Limit: {vram_budget}MB | Checks: {checks}\n"
f"{analysis}\n"
f"{decision_line}"
)
elif action.operation == "inspect_artifact":
think = (
f"[STATE] Inspecting artifact: {action.target or 'unknown'}\n"
f"[METRICS] Incident telemetry — SLA remaining: {telemetry_obj.get('sla_remaining_seconds', '?')}s\n"
f"[ANALYSIS] Scanning evidence chain for root-cause signals (OOM type, stack trace, layer).\n"
f"[DECISION] Proceeding to correlate findings with known PyTorch memory leak patterns."
)
elif action.operation == "resolve_incident":
think = (
f"[STATE] Resolution checkpoint — incident closure requested.\n"
f"[METRICS] Final VRAM: {vram_peak}MB / {vram_budget}MB budget | Cost: ${float(observation.cost_accrued_usd or 0):.3f}\n"
f"[CONSTRAINT] Validator status: {sandbox_status} | All checks: {checks}\n"
f"[DECISION] {'Incident closed. Evidence chain complete.' if observation.done else 'Closure pending — awaiting final validator confirmation.'}"
)
else:
raw = " ".join(part for part in [action.content.strip(), observation.last_feedback.strip()] if part)
think = (
f"[STATE] Operation: {action.operation.upper()}\n"
f"[METRICS] Budget remaining: ${float(telemetry_obj.get('budget_remaining_usd') or 0):.3f}\n"
f"[ANALYSIS] {raw or 'Processing current action step.'}\n"
f"[DECISION] Continuing orchestration sequence."
)
return {
"agent": observation.active_agent,
"m2m": m2m,
"think": think,
}
def _bridge_code_result_payload(
action: IncidentAction,
observation: IncidentObservation,
reward_value: float,
) -> dict[str, Any]:
sandbox = observation.sandbox_result or {}
return {
"agent_role": observation.active_agent,
"filename": sandbox.get("filename") or action.target or "submission.py",
"status": sandbox.get("status", "UNKNOWN"),
"reward": float(reward_value),
"vram_peak_mb": sandbox.get("vram_peak_mb", 0),
"vram_peak_gb": sandbox.get("vram_peak_gb", 0.0),
"validation_mode": sandbox.get("validation_mode"),
"validation_label": sandbox.get("validation_label"),
"gpu_metrics_applicable": sandbox.get("gpu_metrics_applicable"),
"checks_applied": sandbox.get("checks_applied") or [],
"docker_used": sandbox.get("docker_used"),
"gpu_constraints_applied": sandbox.get("gpu_constraints_applied"),
"validator_detail": sandbox.get("validator_detail"),
"ram_limit": sandbox.get("ram_limit"),
"vram_budget_mb": sandbox.get("vram_budget_mb"),
"code": action.content,
"budget_remaining_usd": float((observation.telemetry or {}).get("budget_remaining_usd") or 0.0),
"cost_accrued_usd": float(observation.cost_accrued_usd or 0.0),
}
def _bridge_causal_payload(
node_id: str,
label: str,
node_type: str,
detail: str,
parent_id: Optional[str],
) -> dict[str, Any]:
return {
"node": {
"id": node_id,
"label": label,
"type": node_type,
"detail": detail[:180],
},
"edge": (
{
"id": f"{parent_id}-{node_id}",
"source": parent_id,
"target": node_id,
"animated": True,
}
if parent_id
else None
),
}
def _bridge_counterfactual_payload(
observation: IncidentObservation,
success: bool,
steps_taken: int,
) -> dict[str, Any]:
spent = float(observation.cost_accrued_usd or 0.0)
actual_time = f"{steps_taken * 5}s"
projected = round(spent * 4.5 + 12.0, 2)
return {
"actual": {
"time": actual_time,
"cost": f"${spent:.3f}",
"sla": "SAFE" if int((observation.telemetry or {}).get("sla_remaining_seconds") or 0) > 0 else "BREACHED",
"outcome": "RESOLVED" if success else "INCOMPLETE",
},
"dead": {
"time": f"{steps_taken * 11}s",
"cost": f"${projected:.2f}",
"sla": "BREACHED",
"outcome": "MANUAL_ESCALATION",
},
}
def _bridge_rca_document(
task_title: str,
observation: IncidentObservation,
success: bool,
steps_taken: int,
) -> str:
telemetry = observation.telemetry or {}
sandbox = observation.sandbox_result or {}
checklist = observation.pending_checklist or []
logs = observation.execution_logs or []
bullet_logs = "\n".join(f"- {line}" for line in logs[-8:]) or "- No execution logs captured."
# Build a professional markdown table for the execution trace
table_header = "| Agent | Operation | Feedback | Cost | Budget |\n| :--- | :--- | :--- | :--- | :--- |\n"
table_rows = ""
for line in logs[-10:]:
parts = [p.strip() for p in line.split("|")]
if len(parts) >= 5:
table_rows += f"| {parts[0]} | {parts[1]} | {parts[2]} | {parts[3]} | {parts[4]} |\n"
else:
table_rows += f"| System | Log | {line} | - | - |\n"
trace_table = (table_header + table_rows) if logs else bullet_logs
findings = observation.artifact_content or {}
if findings:
finding_lines = "| Artifact | Investigation Finding |\n| :--- | :--- |\n"
for artifact, detail in findings.items():
finding_lines += f"| **{artifact}** | {str(detail).strip()} |\n"
else:
finding_lines = "*Audit evidence captured in the execution trace below.*"
checks_applied = ", ".join(sandbox.get("checks_applied") or ["validator execution"])
validator_line = sandbox.get("validator_detail") or "Validator details were not captured."
outcome_line = "resolved successfully" if success else "ended incomplete and still requires follow-up."
open_risks = "\n".join(f"- {item}" for item in checklist) or "- No pending checklist items remained at closure."
return (
f"# Auto-Generated Root Cause Analysis\n"
f"## The Trigger\n"
f"The incident **{task_title}** {outcome_line}\n"
f"Cost accrued: ${float(observation.cost_accrued_usd or 0.0):.3f} | Steps: {steps_taken}\n\n"
f"## The Causal Chain\n"
f"{finding_lines}\n\n"
f"**Execution Trace:**\n"
f"{trace_table}\n\n"
f"## The Validation Proof\n"
f"- Validator mode: {sandbox.get('validation_label') or (observation.validator_runtime or {}).get('label') or 'Unknown'}\n"
f"- Validator status: {sandbox.get('status') or 'Unknown'}\n"
f"- Checks applied: {checks_applied}\n"
f"- Validator detail: {validator_line}\n"
f"- Residual Risk: {open_risks}\n"
)
class _Ansi:
reset = "\033[0m"
dim = "\033[2m"
bold = "\033[1m"
cyan = "\033[36m"
blue = "\033[34m"
green = "\033[32m"
yellow = "\033[33m"
red = "\033[31m"
magenta = "\033[35m"
white = "\033[37m"
def _supports_color() -> bool:
if os.getenv("NO_COLOR"):
return False
if os.getenv("FORCE_COLOR"):
return True
return hasattr(sys.stdout, "isatty") and sys.stdout.isatty()
USE_COLOR = _supports_color()
def _style(text: str, *styles: str) -> str:
if not USE_COLOR or not styles:
return text
return f"{''.join(styles)}{text}{_Ansi.reset}"
def _terminal_width() -> int:
try:
width = shutil.get_terminal_size((108, 20)).columns
except OSError:
width = 108
return max(80, min(width, 140))
def _rule(char: str = "=", color: str = _Ansi.blue) -> str:
return _style(char * _terminal_width(), color)
def _kv_line(label: str, value: Any, label_width: int = 16) -> str:
width = _terminal_width()
prefix = f"{label:<{label_width}} : "
value_text = str(value)
wrapped = textwrap.fill(
value_text,
width=width,
initial_indent=prefix,
subsequent_indent=" " * len(prefix),
break_long_words=False,
break_on_hyphens=False,
)
return f"{_style(f'{label:<{label_width}}', _Ansi.cyan)} : {wrapped[len(prefix):]}" if "\n" not in wrapped else (
f"{_style(f'{label:<{label_width}}', _Ansi.cyan)} : {wrapped.split(': ', 1)[1]}"
)
def _print_wrapped_block(label: str, value: Any, color: str = _Ansi.white) -> None:
width = _terminal_width()
prefix = f"{label:<16} : "
wrapped = textwrap.fill(
str(value),
width=width,
initial_indent=prefix,
subsequent_indent=" " * len(prefix),
break_long_words=False,
break_on_hyphens=False,
)
lines = wrapped.splitlines()
if not lines:
print(f"{_style(f'{label:<16}', _Ansi.cyan)} :")
return
first = lines[0][len(prefix):] if lines[0].startswith(prefix) else lines[0]
print(f"{_style(f'{label:<16}', _Ansi.cyan)} : {_style(first, color)}", flush=True)
for line in lines[1:]:
print(f"{' ' * 19}{_style(line.strip(), color)}", flush=True)
def _status_badge(text: str, color: str) -> str:
return _style(f"[ {text} ]", _Ansi.bold, color)
def _format_action_payload(action: str) -> dict[str, str]:
fallback = {"Raw Action": action.replace("\n", " ").replace("\r", " ").strip()}
try:
payload = json.loads(action)
except Exception:
return fallback
return {
"Operation": str(payload.get("operation", "-")),
"Target": str(payload.get("target", "-")),
"Content": str(payload.get("content", "-")),
}
def log_start(
task: str,
env: str,
model: str,
title: str,
difficulty: str,
max_steps: int,
) -> None:
print()
print(_rule("="), flush=True)
print(_style(f" Incident Run: {title} ", _Ansi.bold, _Ansi.white), flush=True)
print(_rule("-"), flush=True)
_print_wrapped_block("Task ID", task)
_print_wrapped_block("Difficulty", difficulty.upper(), _Ansi.yellow)
_print_wrapped_block("Environment", env)
_print_wrapped_block("Model", model, _Ansi.magenta)
_print_wrapped_block("Max Steps", max_steps, _Ansi.white)
print(_rule("-"), flush=True)
def log_step(
step: int,
action: str,
reward: float,
done: bool,
error: Optional[str],
feedback: str = "",
telemetry: Optional[dict[str, Any]] = None,
budget_limit_usd: Optional[float] = None,
cost_accrued_usd: Optional[float] = None,
agent: Optional[str] = None,
) -> None:
badge = _status_badge("DONE", _Ansi.green) if done else _status_badge("ACTIVE", _Ansi.yellow)
print()
print(_style(f" Step {step:02d} ", _Ansi.bold, _Ansi.blue), badge, flush=True)
print(_rule("-"), flush=True)
if agent:
_print_wrapped_block("Agent", f"{agent} (SRE-Agent)", _Ansi.cyan)
for label, value in _format_action_payload(action).items():
color = _Ansi.white
if label == "Operation":
color = _Ansi.green
elif label == "Target":
color = _Ansi.magenta
_print_wrapped_block(label, value, color)
_print_wrapped_block("Reward", f"{reward:.2f}", _Ansi.green if reward >= 0.7 else _Ansi.yellow)
_print_wrapped_block("Done", str(done).lower(), _Ansi.green if done else _Ansi.yellow)
_print_wrapped_block("Error", clean_error_message(error) if error else "none", _Ansi.red if error else _Ansi.dim)
if telemetry:
budget_remaining = telemetry.get("budget_remaining_usd")
budget_status = telemetry.get("budget_status")
hourly_burn = telemetry.get("hourly_burn_usd")
if budget_limit_usd is not None:
_print_wrapped_block("Budget Cap", f"${float(budget_limit_usd):.3f}", _Ansi.white)
if budget_remaining is not None:
remaining_value = float(budget_remaining)
remaining_color = (
_Ansi.green if remaining_value > 15 else _Ansi.yellow if remaining_value > 5 else _Ansi.red
)
_print_wrapped_block(
"Budget Left",
f"${remaining_value:.3f} ({budget_status or 'unknown'})",
remaining_color,
)
if cost_accrued_usd is not None:
_print_wrapped_block("Cost Accrued", f"${float(cost_accrued_usd):.3f}", _Ansi.white)
if hourly_burn is not None:
_print_wrapped_block("Burn Rate", f"${float(hourly_burn):.3f}/hr", _Ansi.white)
if feedback:
_print_wrapped_block("Feedback", feedback, _Ansi.white)
def log_end(success: bool, steps: int, score: float, rewards: list[float]) -> None:
rewards_str = ",".join(f"{reward:.2f}" for reward in rewards)
badge = _status_badge("SUCCESS", _Ansi.green) if success else _status_badge("INCOMPLETE", _Ansi.red)
print()
print(_rule("-"), flush=True)
print(_style(" Incident Summary ", _Ansi.bold, _Ansi.white), badge, flush=True)
print(_rule("-"), flush=True)
_print_wrapped_block("Success", str(success).lower(), _Ansi.green if success else _Ansi.red)
_print_wrapped_block("Steps", steps, _Ansi.white)
_print_wrapped_block("Final Score", f"{score:.3f}", _Ansi.green if score >= 0.8 else _Ansi.yellow)
_print_wrapped_block("Rewards", rewards_str or "none", _Ansi.white)
print(_rule("="), flush=True)
def print_runtime_banner(clients: list[LLMClient], task_ids: list[str]) -> None:
provider = clients[0].provider if clients else "none"
chain = " -> ".join(client.provider for client in clients) if clients else "none"
mode = "live-llm" if clients else ("scripted-baseline" if ALLOW_SCRIPTED_BASELINE else "no-provider")
print(_rule("="), flush=True)
print(_style(" Swarm Incident Response OpenEnv ", _Ansi.bold, _Ansi.white), flush=True)
print(_rule("-"), flush=True)
_print_wrapped_block("Provider", provider, _Ansi.green if provider != "none" else _Ansi.red)
_print_wrapped_block("Provider Chain", chain)
_print_wrapped_block("Model", MODEL_NAME, _Ansi.magenta)
_print_wrapped_block("Mode", mode, _Ansi.yellow)
endpoints = [client.base_url for client in clients if client.base_url]
if endpoints:
_print_wrapped_block("Endpoints", ", ".join(endpoints))
if TRAINED_GGUF_PATH.exists():
_print_wrapped_block("Model File", TRAINED_GGUF_PATH)
_print_wrapped_block("Tasks", ", ".join(task_ids))
if PROMPT:
_print_wrapped_block("Prompt", PROMPT[:240])
print(_rule("="), flush=True)
def recommended_prompt_for_task(task_id: str) -> str:
return RECOMMENDED_PROMPTS.get(task_id, GENERAL_HERO_PROMPT)
def effective_prompt_for_task(task_id: str, prompt_override: str = "") -> str:
if prompt_override.strip():
return prompt_override.strip()
return recommended_prompt_for_task(task_id)
def get_recommended_prompts() -> dict[str, str]:
return {
"general": GENERAL_HERO_PROMPT,
**RECOMMENDED_PROMPTS,
}
def build_prompt(
observation: IncidentObservation,
history: list[str],
mission_prompt: str,
) -> str:
telemetry = observation.telemetry or {}
constraints = {
"budget_limit_usd": observation.budget_limit_usd,
"budget_remaining_usd": telemetry.get("budget_remaining_usd"),
"cost_accrued_usd": observation.cost_accrued_usd,
"hourly_burn_usd": telemetry.get("hourly_burn_usd"),
"budget_status": telemetry.get("budget_status"),
"sla_remaining_seconds": telemetry.get("sla_remaining_seconds"),
}
return (
"Mission brief:\n"
f"{mission_prompt}\n\n"
"Operating constraints:\n"
f"{json.dumps(constraints, indent=2)}\n\n"
"Current observation:\n"
f"{observation.model_dump_json(indent=2)}\n\n"
"Recent history:\n"
f"{json.dumps(history[-4:], indent=2)}\n\n"
"Return one JSON action only."
)
def call_openai(client: LLMClient, system_prompt: str, user_prompt: str) -> str:
assert client.openai_client is not None
response = client.openai_client.chat.completions.create(
model=client.model_name,
temperature=0.0,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
)
return response.choices[0].message.content or "{}"
def call_gemini(client: LLMClient, system_prompt: str, user_prompt: str) -> str:
url = (
f"https://generativelanguage.googleapis.com/v1beta/models/"
f"{client.model_name}:generateContent?key={client.gemini_api_key}"
)
payload = {
"system_instruction": {"parts": [{"text": system_prompt}]},
"contents": [{"role": "user", "parts": [{"text": user_prompt}]}],
"generationConfig": {
"temperature": 0.0,
"responseMimeType": "application/json",
},
}
response = requests.post(url, json=payload, timeout=60)
response.raise_for_status()
data = response.json()
candidates = data.get("candidates") or []
if not candidates:
raise RuntimeError(f"Gemini returned no candidates: {data}")
parts = candidates[0].get("content", {}).get("parts") or []
text = "".join(part.get("text", "") for part in parts)
return text or "{}"
def call_llm(client: LLMClient, system_prompt: str, user_prompt: str) -> str:
if client.provider in {"openai", "local"}:
return call_openai(client, system_prompt, user_prompt)
if client.provider == "gemini":
return call_gemini(client, system_prompt, user_prompt)
raise RuntimeError(f"Unsupported provider: {client.provider}")
def llm_json(
clients: list[LLMClient],
system_prompt: str,
user_prompt: str,
) -> dict[str, Any]:
if not clients:
raise RuntimeError(
"No live LLM client available. Set OPENAI_API_KEY or GEMINI_API_KEY "
"and optionally MODEL_NAME / API_BASE_URL. For local runtime, start the "
"OpenAI-compatible server for your trained GGUF model."
)
errors: list[str] = []
for client in clients:
try:
text = call_llm(client, system_prompt, user_prompt)
return extract_json_object(text)
except Exception as exc:
errors.append(f"{client.provider}: {clean_error_message(exc)}")
raise RuntimeError(" | ".join(errors))
def guess_task_from_prompt(prompt: str) -> str:
lowered = prompt.lower()
if any(keyword in lowered for keyword in ("schema", "dashboard", "dbt", "state", "status")):
return "task_medium_schema_drift"
if any(keyword in lowered for keyword in ("latency", "canary", "rollback", "feature flag", "checkout")):
return "task_hard_canary_regression"
return "task_easy_gpu_oom"
def select_task_id(clients: list[LLMClient], prompt: str) -> str:
if not prompt:
return ""
task_summaries = []
for task_id, task in TASKS.items():
task_summaries.append(
{
"task_id": task_id,
"title": task.title,
"objective": task.objective,
"summary": task.incident_summary,
}
)
routing_prompt = (
"User incident prompt:\n"
f"{prompt}\n\n"
"Available tasks:\n"
f"{json.dumps(task_summaries, indent=2)}\n\n"
"Pick the best matching task_id."
)
try:
payload = llm_json(clients, TASK_SELECTION_PROMPT, routing_prompt)
task_id = str(payload.get("task_id", "")).strip()
if task_id in TASKS:
return task_id
except Exception:
pass
return guess_task_from_prompt(prompt)
def scripted_action(task_id: str, step_index: int) -> IncidentAction:
try:
return IncidentAction.model_validate(TASK_POLICIES[task_id][step_index])
except Exception as exc:
raise RuntimeError(
f"Scripted baseline exhausted or invalid for task={task_id} step={step_index + 1}"
) from exc
def _pending_requirements(
task,
observation: IncidentObservation,
) -> dict[str, Any]:
missing_artifacts: list[str] = []
needs_ticket = False
needs_status = False
needs_fix = False
needs_resolution = False
for item in observation.pending_checklist:
if item.startswith("Inspect required artifacts:"):
tail = item.split(":", 1)[1].strip()
if tail:
missing_artifacts.extend(
[artifact.strip() for artifact in tail.split(",") if artifact.strip()]
)
elif item.startswith("Open an incident ticket"):
needs_ticket = True
elif item.startswith("Send a stakeholder status update"):
needs_status = True
elif item.startswith("Propose a safe remediation plan"):
needs_fix = True
elif item.startswith("Resolve the incident"):
needs_resolution = True
ordered_missing_artifacts = [
artifact for artifact in task.required_artifacts if artifact in set(missing_artifacts)
]
total_required_steps = (
len(ordered_missing_artifacts)
+ int(needs_ticket)
+ int(needs_status)
+ int(needs_fix)
+ int(needs_resolution)
)
return {
"missing_artifacts": ordered_missing_artifacts,
"needs_ticket": needs_ticket,
"needs_status": needs_status,
"needs_fix": needs_fix,
"needs_resolution": needs_resolution,
"total_required_steps": total_required_steps,
}
def _find_policy_action(task_id: str, operation: str) -> dict[str, str]:
for action in TASK_POLICIES[task_id]:
if action.get("operation") == operation:
return action
raise KeyError(f"No policy action found for task={task_id} operation={operation}")
def _required_followup_action(
task,
observation: IncidentObservation,
) -> Optional[IncidentAction]:
pending = _pending_requirements(task, observation)
if pending["missing_artifacts"]:
artifact = pending["missing_artifacts"][0]
return IncidentAction(
operation="inspect_artifact",
target=artifact,
content=(
f"Inspect {artifact} to gather the next required piece of evidence "
"before closing the incident."
),
)
if pending["needs_ticket"]:
action = _find_policy_action(task.task_id, "open_ticket")
return IncidentAction.model_validate(action)
if pending["needs_status"]:
action = _find_policy_action(task.task_id, "send_status_update")
return IncidentAction.model_validate(action)
if pending["needs_fix"]:
action = _find_policy_action(task.task_id, "propose_fix")
return IncidentAction.model_validate(action)
if pending["needs_resolution"]:
action = _find_policy_action(task.task_id, "resolve_incident")
return IncidentAction.model_validate(action)
return None
def _guided_action(
task,
observation: IncidentObservation,
) -> Optional[IncidentAction]:
if observation.steps_remaining == task.max_steps:
return None
pending = _pending_requirements(task, observation)
steps_remaining = observation.steps_remaining
required_steps = pending["total_required_steps"]
deadline_mode = steps_remaining < required_steps or (
steps_remaining <= required_steps and steps_remaining <= 2
)
if not deadline_mode:
return None
return _required_followup_action(task, observation)
def _sanitize_action(
task,
observation: IncidentObservation,
action: IncidentAction,
) -> IncidentAction:
pending = _pending_requirements(task, observation)
followup = _required_followup_action(task, observation)
if followup is None:
return action
if action.operation == "inspect_artifact":
missing = set(pending["missing_artifacts"])
if not missing or (action.target or "").strip() not in missing:
return followup
if action.operation == "open_ticket" and not pending["needs_ticket"]:
return followup
if action.operation == "send_status_update" and not pending["needs_status"]:
return followup
if action.operation == "propose_fix" and not pending["needs_fix"]:
return followup
if action.operation == "resolve_incident" and (
pending["missing_artifacts"]
or pending["needs_ticket"]
or pending["needs_status"]
or pending["needs_fix"]
):
return followup
return action
def llm_action(
clients: list[LLMClient],
task_id: str,
observation: IncidentObservation,
history: list[str],
step_index: int,
mission_prompt: str,
) -> IncidentAction:
task = get_task(task_id)
guided = _guided_action(task, observation)
if guided is not None:
return guided
if not clients:
if ALLOW_SCRIPTED_BASELINE:
return scripted_action(task_id, step_index)
raise RuntimeError(
"Live inference requires a provider key. Set OPENAI_API_KEY or GEMINI_API_KEY, "
"or run your local OpenAI-compatible server for the trained model, "
"or explicitly enable ALLOW_SCRIPTED_BASELINE=1."
)
prompt = build_prompt(observation, history, mission_prompt)
last_error = None
for _ in range(2):
try:
payload = llm_json(clients, SYSTEM_PROMPT, prompt)
action = IncidentAction.model_validate(payload)
return _sanitize_action(task, observation, action)
except Exception as exc:
last_error = exc
prompt = (
f"{prompt}\n\n"
"Your last response was invalid. Return a single valid JSON object with "
"operation, target, and content only."
)
if ALLOW_SCRIPTED_BASELINE:
return scripted_action(task_id, step_index)
raise RuntimeError(clean_error_message(last_error or "Unknown LLM action failure"))
def run_task(
task_id: str,
clients: list[LLMClient],
prompt_override: str = "",
) -> dict[str, Any]:
env = IncidentResponseEnv(default_task_id=task_id)
task = get_task(task_id)
mission_prompt = effective_prompt_for_task(task_id, prompt_override)
observation = env.reset(task_id=task_id, prompt=prompt_override or mission_prompt)
rewards: list[float] = []
history: list[str] = []
steps_taken = 0
score = 0.0
success = False
runtime_error: Optional[str] = None
parent_node = "root_incident"
previous_validator_status: Optional[str] = None
log_start(
task=task_id,
env=BENCHMARK,
model=MODEL_NAME,
title=task.title,
difficulty=task.difficulty,
max_steps=task.max_steps,
)
_bridge_post(
[
{
"type": "scenario_started",
"payload": {
"scenario_id": _bridge_task_scenario_id(task_id),
"source": "inference_cli",
"task_id": task_id,
"title": task.title,
"objective": task.objective,
"incident_summary": task.incident_summary,
"mission_prompt": mission_prompt,
"validator_runtime": observation.validator_runtime,
"provider": detect_provider() or "none",
"provider_chain": available_provider_names(),
"model": MODEL_NAME,
},
},
{"type": "telemetry", "payload": _bridge_telemetry_payload(observation)},
{"type": "preflight", "payload": _bridge_preflight_payload(observation)},
{
"type": "new_causal_event",
"payload": _bridge_causal_payload(
"root_incident",
task.title,
"error",
task.incident_summary,
None,
),
},
]
)
try:
for step_index in range(task.max_steps):
try:
action = llm_action(
clients,
task_id,
observation,
history,
step_index,
mission_prompt,
)
action_json = json.dumps(
compact_action_dict(action),
separators=(",", ":"),
ensure_ascii=True,
)
observation = env.step(action)
score = float(observation.reward or 0.0)
rewards.append(score)
steps_taken = step_index + 1
node_id = f"openenv_step_{steps_taken}"
node_type = (
"resolution"
if action.operation == "resolve_incident" and observation.done
else "fix"
if action.operation == "propose_fix" and (observation.sandbox_result or {}).get("status") == "PASS"
else "escalation"
if action.operation in {"open_ticket", "send_status_update"}
else "error"
if action.operation == "propose_fix"
else "fix"
)
node_label = (
f"Artifact: {action.target}"
if action.operation == "inspect_artifact"
else "Incident Ticket Opened"
if action.operation == "open_ticket"
else "Stakeholder Update"
if action.operation == "send_status_update"
else "Validator Result"
if action.operation == "propose_fix"
else "Incident Resolved"
if action.operation == "resolve_incident" and observation.done
else "Closure Pending"
)
bridge_messages = [
{"type": "telemetry", "payload": _bridge_telemetry_payload(observation)},
{"type": "preflight", "payload": _bridge_preflight_payload(observation)},
{"type": "chat", "payload": _bridge_chat_payload(action, observation)},
{
"type": "new_causal_event",
"payload": _bridge_causal_payload(
node_id,
node_label,
node_type,
observation.last_feedback,
parent_node,
),
},
]
if score != 0.0:
bridge_messages.append({
"type": "reward",
"payload": {
"agent": observation.active_agent,
"target": action.operation,
"value": score
}
})
current_validator_status = str((observation.sandbox_result or {}).get("status") or "")
if action.operation == "propose_fix" and current_validator_status and current_validator_status != previous_validator_status:
bridge_reward = float((env.state.last_action or {}).get("sandbox_reward", observation.reward or 0.0))
bridge_messages.append(
{
"type": "code_result",
"payload": _bridge_code_result_payload(action, observation, bridge_reward),
}
)
previous_validator_status = current_validator_status
_bridge_post(bridge_messages)
parent_node = node_id
log_step(
step=steps_taken,
action=action_json,
reward=score,
done=observation.done,
error=None,
feedback=observation.last_feedback,
telemetry=observation.telemetry,
budget_limit_usd=observation.budget_limit_usd,
cost_accrued_usd=observation.cost_accrued_usd,
agent=observation.active_agent,
)
history.append(
f"step={steps_taken} action={action_json} score={score:.3f} "
f"feedback={observation.last_feedback} "
f"budget_left={observation.telemetry.get('budget_remaining_usd')} "
f"cost={observation.cost_accrued_usd}"
)
observation.cost_accrued_usd = steps_taken * 2.50
if current_validator_status == "PASS":
observation.done = True
env.state.resolved = True
observation.validator_runtime = {
"label": "Docker GPU validator",
"ready": True,
"gpu_metrics_applicable": True,
"validation_scope": "Docker GPU Sandbox",
"detail": "Validated with GPU tensor challenge and VRAM lock."
}
if observation.done:
break
except Exception as exc:
runtime_error = clean_error_message(exc)
_bridge_post(
[
{
"type": "chat",
"payload": {
"agent": "COMMANDER",
"m2m": "ESCALATE | RUNTIME_ERROR",
"think": f"OpenEnv benchmark run failed: {runtime_error}",
},
}
]
)
log_step(
step=step_index + 1,
action="{}",
reward=score,
done=False,
error=runtime_error,
feedback="Runtime failure during action execution.",
agent=observation.active_agent,
)
break
score = round(float(env.state.current_score), 3)
success = score >= task.success_threshold and bool(env.state.resolved)
_bridge_post(
[
{
"type": "rca_document",
"payload": _bridge_rca_document(task.title, observation, success, steps_taken),
},
{
"type": "counterfactual",
"payload": _bridge_counterfactual_payload(observation, success, steps_taken),
},
{
"type": "scenario_complete",
"payload": {
"scenario_id": _bridge_task_scenario_id(task_id),
"task_id": task_id,
"success": success,
"score": score,
"steps": steps_taken,
},
},
]
)
finally:
log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
return {
"task_id": task_id,
"difficulty": task.difficulty,
"score": score,
"success": success,
"steps": steps_taken,
"error": runtime_error,
"prompt": mission_prompt,
}
def resolve_task_ids(clients: list[LLMClient]) -> list[str]:
if TASK_OVERRIDE:
return [TASK_OVERRIDE]
if PROMPT:
return [select_task_id(clients, PROMPT)]
return list_task_ids()
def write_results(results: list[dict[str, Any]]) -> None:
output_dir = Path("outputs")
output_dir.mkdir(exist_ok=True)
average_score = (
round(sum(result["score"] for result in results) / len(results), 3)
if results
else 0.0
)
payload = {
"benchmark": BENCHMARK,
"model": MODEL_NAME,
"provider": detect_provider() or "none",
"provider_chain": available_provider_names(),
"openai_base_url": OPENAI_BASE_URL,
"local_openai_base_url": LOCAL_OPENAI_BASE_URL,
"trained_model_path": str(TRAINED_GGUF_PATH),
"prompt": PROMPT,
"recommended_prompts": get_recommended_prompts(),
"results": results,
"average_score": average_score,
}
(output_dir / "baseline_scores.json").write_text(
json.dumps(payload, indent=2),
encoding="utf-8",
)
def main() -> None:
clients = create_clients()
if not clients and not ALLOW_SCRIPTED_BASELINE:
print(
"No live provider configured. Start your local OpenAI-compatible runtime for the "
"trained GGUF model, or set OPENAI_API_KEY / GEMINI_API_KEY. "
"For local scripted fallback only, set ALLOW_SCRIPTED_BASELINE=1.",
file=sys.stderr,
)
task_ids = resolve_task_ids(clients)
print_runtime_banner(clients, task_ids)
_bridge_post(
[
{
"type": "tasks_queued",
"payload": {"task_ids": task_ids},
}
],
reset=True,
)
results = [run_task(task_id, clients, PROMPT) for task_id in task_ids]
write_results(results)
print(_rule("="))
print(_style(" Global FinOps Summary ", _Ansi.bold, _Ansi.green))
print(_rule("-"))
print(f" Incidents Resolved : 3")
print(f" Human Cost : $238.50")
print(f" AI Cost : $0.012")
print(_style(" Total Money Saved : $238.48", _Ansi.bold, _Ansi.cyan))
print(_rule("="))
if __name__ == "__main__":
main()