Spaces:
Sleeping
Sleeping
File size: 7,515 Bytes
9f586bd | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 | """
Inference Script — SRE Incident Response Environment
===================================
MANDATORY
- Before submitting, ensure the following variables are defined in your environment configuration:
API_BASE_URL The API endpoint for the LLM.
MODEL_NAME The model identifier to use for inference.
HF_TOKEN Your Hugging Face / API key.
LOCAL_IMAGE_NAME The name of the local image to use for the environment if you are using from_docker_image()
STDOUT FORMAT
- The script must emit exactly three line types to stdout, in this order:
[START] task=<task_name> env=<benchmark> model=<model_name>
[STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null>
[END] success=<true|false> steps=<n> score=<score> rewards=<r1,r2,...,rn>
"""
import asyncio
import json
import os
import textwrap
from typing import List, Optional
from openai import OpenAI
from sre_incident_env import SREAction, SREIncidentEnv
IMAGE_NAME = os.getenv("IMAGE_NAME")
API_KEY = os.getenv("HF_TOKEN") or os.getenv("API_KEY")
API_BASE_URL = os.getenv("API_BASE_URL") or "https://router.huggingface.co/v1"
MODEL_NAME = os.getenv("MODEL_NAME") or "Qwen/Qwen2.5-72B-Instruct"
BENCHMARK = "sre_incident_env"
TEMPERATURE = 0.7
MAX_TOKENS = 300
SUCCESS_SCORE_THRESHOLD = 0.3
TASKS = [
{"task_id": "easy", "max_steps": 15, "max_reward": 1.0},
{"task_id": "medium", "max_steps": 20, "max_reward": 1.0},
{"task_id": "hard", "max_steps": 25, "max_reward": 1.0},
]
SYSTEM_PROMPT = textwrap.dedent("""
You are an expert SRE (Site Reliability Engineer) responding to a production incident.
You interact with a simulated microservices cluster through commands.
Available commands:
- check_logs <service>: View recent logs
- get_metrics <service>: Get CPU, memory, latency, error rate
- list_alerts: List all active alerts
- check_dependencies <service>: Show dependency graph
- check_network <service>: Show network connections
- check_processes <service>: List running processes
- restart_service <service>: Restart a service
- scale_service <service> replicas=<n>: Scale a service
- rollback_service <service>: Rollback to previous deployment
- kill_process <service> pid=<pid>: Kill a specific process
- update_config <service> key=<k> value=<v>: Update config
- rotate_credentials <service>: Rotate credentials
- clear_disk <service> path=<path>: Clear disk space
- submit_diagnosis root_cause=<text> affected_services=<svc1,svc2>: Submit root cause
Respond with ONLY a JSON object: {"command": "...", "target": "...", "parameters": {...}}
Do not include any other text.
""").strip()
def log_start(task: str, env: str, model: str) -> None:
print(f"[START] task={task} env={env} model={model}", flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
error_val = error if error else "null"
done_val = str(done).lower()
print(
f"[STEP] step={step} action={action} reward={reward:.2f} done={done_val} error={error_val}",
flush=True,
)
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
rewards_str = ",".join(f"{r:.2f}" for r in rewards)
print(f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}", flush=True)
def parse_action(text: str) -> SREAction:
"""Parse LLM response into SREAction."""
text = text.strip()
# Try JSON parse first
try:
data = json.loads(text)
return SREAction(
command=data.get("command", "list_alerts"),
target=data.get("target", ""),
parameters=data.get("parameters", {}),
)
except (json.JSONDecodeError, KeyError):
pass
# Fallback: parse text like "check_logs api-gateway"
parts = text.split()
if len(parts) >= 2:
return SREAction(command=parts[0], target=parts[1])
elif len(parts) == 1:
return SREAction(command=parts[0])
return SREAction(command="list_alerts")
def build_user_prompt(step: int, last_output: str, last_reward: float, history: List[str]) -> str:
history_block = "\n".join(history[-4:]) if history else "None"
return textwrap.dedent(f"""
Step: {step}
Last command output:
{last_output[:1500]}
Last reward: {last_reward:.2f}
Previous actions:
{history_block}
Analyze the situation and decide your next action.
Respond with ONLY a JSON object: {{"command": "...", "target": "...", "parameters": {{...}}}}
""").strip()
def get_model_action(client: OpenAI, step: int, last_output: str, last_reward: float, history: List[str]) -> SREAction:
user_prompt = build_user_prompt(step, last_output, last_reward, history)
try:
completion = client.chat.completions.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_prompt},
],
temperature=TEMPERATURE,
max_tokens=MAX_TOKENS,
stream=False,
)
text = (completion.choices[0].message.content or "").strip()
return parse_action(text) if text else SREAction(command="list_alerts")
except Exception as exc:
print(f"[DEBUG] Model request failed: {exc}", flush=True)
return SREAction(command="list_alerts")
async def run_task(client: OpenAI, env: SREIncidentEnv, task: dict) -> None:
task_id = task["task_id"]
max_steps = task["max_steps"]
max_total_reward = task["max_reward"]
history: List[str] = []
rewards: List[float] = []
steps_taken = 0
score = 0.0
success = False
log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME)
try:
result = await env.reset(task_id=task_id)
last_output = result.observation.output
last_reward = 0.0
for step in range(1, max_steps + 1):
if result.done:
break
action = get_model_action(client, step, last_output, last_reward, history)
action_str = f"{action.command}({action.target})"
result = await env.step(action)
obs = result.observation
reward = result.reward or 0.0
done = result.done
error = None
rewards.append(reward)
steps_taken = step
last_output = obs.output
last_reward = reward
log_step(step=step, action=action_str, reward=reward, done=done, error=error)
history.append(f"Step {step}: {action_str} -> reward {reward:+.2f}")
if done:
break
score = sum(rewards) / max_total_reward if max_total_reward > 0 else 0.0
score = min(max(score, 0.0), 1.0)
success = score >= SUCCESS_SCORE_THRESHOLD
finally:
log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
async def main() -> None:
client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)
env = await SREIncidentEnv.from_docker_image(IMAGE_NAME)
try:
for task in TASKS:
await run_task(client, env, task)
finally:
try:
await env.close()
except Exception as e:
print(f"[DEBUG] env.close() error (container cleanup): {e}", flush=True)
if __name__ == "__main__":
asyncio.run(main())
|