"""Strict model-only inference runner for the queue operations benchmark. This variant intentionally removes heuristic fallback paths. Every decision must come from either: 1) replay trace input (ACTION_TRACE_FILE), or 2) model output. If model output is invalid/unavailable, the seed run is marked failed. """ import asyncio import csv import json import os import statistics import textwrap from typing import Any, List, Optional from urllib.parse import urlparse, urlunparse from dotenv import load_dotenv from openai import OpenAI load_dotenv() from cloud_queue_env import CloudQueueAction, CloudQueueEnv, CloudQueueObservation IMAGE_NAME = os.getenv("IMAGE_NAME") BASE_URL = os.getenv("BASE_URL") API_BASE_URL = os.getenv("API_BASE_URL") or "https://router.huggingface.co/v1" MODEL_NAME = os.getenv("MODEL_NAME") or "Qwen/Qwen2.5-72B-Instruct" API_KEY = os.getenv("API_KEY") or os.getenv("HF_TOKEN") BENCHMARK = os.getenv("BENCHMARK", "queueops-openenv") TASKS = ["easy", "medium", "hard"] TASK_SEEDS_JSON = os.getenv("TASK_SEEDS_JSON") SEEDS = [11, 23, 37] TEMPERATURE = 0.2 MAX_TOKENS = 780 SUCCESS_SCORE_THRESHOLD = 0.60 # Test-friendly default. Set MAX_STEPS_OVERRIDE=0 for full horizon. MAX_STEPS_OVERRIDE = int(os.getenv("MAX_STEPS_OVERRIDE", "8") or "8") ACTION_TRACE_FILE = os.getenv("ACTION_TRACE_FILE") REPORT_JSON_PATH = os.getenv("REPORT_JSON_PATH") REPORT_CSV_PATH = os.getenv("REPORT_CSV_PATH") OPEN_SCORE_MIN = 0.001 OPEN_SCORE_MAX = 0.999 SYSTEM_PROMPT = textwrap.dedent( """ You are an agent controlling a cloud queue scheduling environment. Your goal: minimize wait times, SLA violations, and cost while maximizing throughput. OUTPUT FORMAT (strict): - Return exactly one JSON object. - No markdown, no code fences, no explanations, no extra keys. - Always include all fields below. Required JSON schema: { "action_type": "admit|reject|route|dispatch|scale|reprioritize|noop", "target_queue": integer or null, "target_server": integer or null, "scale_delta": integer or null, "new_priority": integer or null } Task constraints: - easy: only admit/reject/dispatch/noop - medium: only admit/reject/route/dispatch/reprioritize/noop - hard: only admit/reject/route/dispatch/reprioritize/scale/noop """ ).strip() ACTION_TYPES = ( "configure_task", "admit", "reject", "route", "dispatch", "scale", "reprioritize", "noop", ) TASK_ALLOWED_ACTIONS = { "easy": {"admit", "reject", "dispatch", "noop"}, "medium": {"admit", "reject", "route", "dispatch", "reprioritize", "noop"}, "hard": {"admit", "reject", "route", "dispatch", "reprioritize", "scale", "noop"}, } def clamp_open_score(value: float) -> float: if not isinstance(value, (int, float)) or not (value == value): return OPEN_SCORE_MIN return max(OPEN_SCORE_MIN, min(OPEN_SCORE_MAX, float(value))) def log_start(task: str, env: str, model: str) -> None: print(f"[START] task={task} env={env} model={model}", flush=True) def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None: error_val = error if error else "null" done_val = str(done).lower() print( f"[STEP] step={step} action={action} reward={reward:.2f} done={done_val} error={error_val}", flush=True, ) def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None: rewards_str = ",".join(f"{r:.2f}" for r in rewards) print(f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}", flush=True) def parse_task_seed_map() -> dict[str, list[int]]: if TASK_SEEDS_JSON: try: data = json.loads(TASK_SEEDS_JSON) task_map: dict[str, list[int]] = {} for task_name, seeds in data.items(): parsed = [int(s) for s in seeds] if parsed: task_map[str(task_name)] = parsed if task_map: return task_map except Exception: pass return { "easy": [SEEDS[0]], "medium": [SEEDS[1]], "hard": [SEEDS[2]], } def _action_from_dict(data: dict) -> CloudQueueAction: return CloudQueueAction( action_type=str(data.get("action_type", "noop")), target_queue=data.get("target_queue"), target_server=data.get("target_server"), scale_delta=data.get("scale_delta"), new_priority=data.get("new_priority"), ) def load_replay_actions() -> dict[str, list[CloudQueueAction]]: if not ACTION_TRACE_FILE: return {} try: with open(ACTION_TRACE_FILE, "r", encoding="utf-8") as f: payload = json.load(f) except Exception: return {} replay: dict[str, list[CloudQueueAction]] = {} if isinstance(payload, dict): for key, action_list in payload.items(): if not isinstance(action_list, list): continue parsed = [] for item in action_list: if isinstance(item, dict): parsed.append(_action_from_dict(item)) if parsed: replay[str(key)] = parsed return replay def ci95(values: list[float]) -> float: if len(values) <= 1: return 0.0 std = statistics.pstdev(values) return 1.96 * std / (len(values) ** 0.5) def write_reports(seed_rows: list[dict], task_score_table: dict[str, list[float]]) -> None: if REPORT_JSON_PATH: report_payload = { "seed_rows": seed_rows, "task_summary": { task: { "mean": statistics.mean(scores) if scores else 0.0, "std": statistics.pstdev(scores) if len(scores) > 1 else 0.0, "ci95": ci95(scores), "count": len(scores), } for task, scores in task_score_table.items() }, } try: with open(REPORT_JSON_PATH, "w", encoding="utf-8") as f: json.dump(report_payload, f, indent=2) except Exception: pass if REPORT_CSV_PATH: try: with open(REPORT_CSV_PATH, "w", encoding="utf-8", newline="") as f: writer = csv.DictWriter( f, fieldnames=[ "task", "seed", "score", "steps", "success", "trace_digest", "invalid_actions", "harmful_scale_down", "failure_reason", ], ) writer.writeheader() for row in seed_rows: writer.writerow(row) except Exception: pass def build_obs_summary(obs: CloudQueueObservation, task_name: str) -> str: max_sizes = {"easy": 28, "medium": 42, "hard": 64} max_q = max_sizes.get(task_name, 30) fills = [f"{l}/{max_q}({100*l//max_q}%)" for l in obs.queue_lengths] busy_count = sum(obs.server_busy) total_servers = len(obs.server_busy) servers_str = f"{busy_count}/{total_servers} busy" if obs.incoming_job_present: urgency = "URGENT" if obs.incoming_job_priority >= 2 else "normal" incoming_str = f"YES [{urgency} size={obs.incoming_job_size:.1f} deadline={obs.incoming_job_deadline:.0f}]" else: incoming_str = "none" return ( f"task={task_name} | " f"queues={fills} | " f"servers={servers_str} | " f"incoming={incoming_str} | " f"sla_breach={obs.sla_violation_rate:.3f} | " f"abandonment={obs.abandonment_rate:.3f} | " f"cost_rate={obs.energy_cost_rate:.3f}" ) def build_user_prompt(step: int, obs_summary: str, last_reward: float, history: List[str]) -> str: history_block = "\n".join(history[-4:]) if history else "None" return textwrap.dedent( f""" Step {step} | Last reward: {last_reward:.2f} State: {obs_summary} Recent actions: {history_block} Choose the best action now. """ ).strip() def _coerce_optional_int(value: Any) -> Optional[int]: if value is None: return None if isinstance(value, bool): return int(value) if isinstance(value, int): return value if isinstance(value, float): return int(value) if isinstance(value, str): txt = value.strip().lower() if txt in {"", "null", "none"}: return None try: return int(txt) except ValueError: try: return int(float(txt)) except ValueError: return None return None def _extract_json_object(text: str) -> Optional[dict[str, Any]]: cleaned = (text or "").strip() if not cleaned: return None if cleaned.startswith("```"): chunks = [chunk.strip() for chunk in cleaned.split("```") if chunk.strip()] for chunk in chunks: candidate = chunk if candidate.lower().startswith("json"): candidate = candidate[4:].strip() try: parsed = json.loads(candidate) if isinstance(parsed, dict): return parsed if isinstance(parsed, list) and parsed and isinstance(parsed[0], dict): return parsed[0] except Exception: continue try: parsed = json.loads(cleaned) if isinstance(parsed, dict): return parsed if isinstance(parsed, list) and parsed and isinstance(parsed[0], dict): return parsed[0] except Exception: pass start = 0 while True: open_idx = cleaned.find("{", start) if open_idx < 0: return None depth = 0 for i in range(open_idx, len(cleaned)): ch = cleaned[i] if ch == "{": depth += 1 elif ch == "}": depth -= 1 if depth == 0: candidate = cleaned[open_idx : i + 1] try: parsed = json.loads(candidate) if isinstance(parsed, dict): return parsed except Exception: break start = open_idx + 1 def _normalize_action_payload(data: dict[str, Any], task_name: str) -> Optional[dict[str, Any]]: action_type = str(data.get("action_type", "noop")).strip().lower() if action_type not in ACTION_TYPES: return None if action_type not in TASK_ALLOWED_ACTIONS.get(task_name, set(ACTION_TYPES)): return None target_queue = _coerce_optional_int(data.get("target_queue")) target_server = _coerce_optional_int(data.get("target_server")) scale_delta = _coerce_optional_int(data.get("scale_delta")) new_priority = _coerce_optional_int(data.get("new_priority")) if action_type in {"admit", "route", "dispatch"} and target_queue is None: target_queue = 0 if action_type in {"reject", "noop"}: target_queue = None target_server = None if action_type == "scale": if scale_delta is None: return None scale_delta = max(-2, min(2, scale_delta)) else: scale_delta = None if action_type == "reprioritize": if new_priority is None: new_priority = 2 else: new_priority = None return { "action_type": action_type, "target_queue": target_queue, "target_server": target_server, "scale_delta": scale_delta, "new_priority": new_priority, } def parse_model_action(text: str, task_name: str) -> Optional[CloudQueueAction]: data = _extract_json_object(text) if data is None: return None payload = _normalize_action_payload(data, task_name) if payload is None: return None try: return CloudQueueAction(**payload) except Exception: return None def _single_line(text: str) -> str: return " ".join((text or "").split()) def get_model_action( client: OpenAI, task_name: str, step: int, obs_summary: str, last_reward: float, history: List[str], ) -> tuple[Optional[CloudQueueAction], Optional[str]]: user_prompt = build_user_prompt(step, obs_summary, last_reward, history) messages = [ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": user_prompt}, ] try: completion = client.chat.completions.create( model=MODEL_NAME, messages=messages, temperature=TEMPERATURE, max_tokens=MAX_TOKENS, stream=False, ) text = (completion.choices[0].message.content or "").strip() action = parse_model_action(text, task_name) if action is None: preview = " ".join(text.split())[:180] return None, f"invalid_model_action_payload: {preview}" return action, None except Exception as exc: return None, str(exc) def get_model_action_with_retry( client: OpenAI, task_name: str, step: int, obs_summary: str, last_reward: float, history: List[str], retries: int = 2, ) -> tuple[Optional[CloudQueueAction], Optional[str]]: last_error: Optional[str] = None for attempt in range(1, retries + 2): action, error = get_model_action( client=client, task_name=task_name, step=step, obs_summary=obs_summary, last_reward=last_reward, history=history, ) if action is not None: return action, None last_error = error return None, last_error def normalize_base_url(base_url: Optional[str]) -> Optional[str]: if not base_url: return base_url cleaned = base_url.strip().rstrip("/") parsed = urlparse(cleaned) if parsed.netloc.lower() == "huggingface.co": parts = [p for p in parsed.path.strip("/").split("/") if p] if len(parts) >= 3 and parts[0] == "spaces": owner, space = parts[1], parts[2] owner = owner.lower().replace("_", "-") space = space.lower().replace("_", "-") return f"https://{owner}-{space}.hf.space" if cleaned.endswith("/web"): cleaned = cleaned[:-4] parsed = urlparse(cleaned) host = (parsed.hostname or "").lower() if host.endswith(".hf.space"): safe_host = host.replace("_", "-") if safe_host != host or (parsed.netloc and parsed.netloc != parsed.netloc.lower()): port_part = f":{parsed.port}" if parsed.port else "" parsed = parsed._replace(netloc=f"{safe_host}{port_part}") cleaned = urlunparse(parsed) return cleaned def _smoke_test_model(client: OpenAI) -> bool: test_question = ( "You are a cloud scheduling agent. " "A job queue is 80% full and a new urgent job just arrived. " "Should you admit the job, reject it, or route it to another queue? " "Answer with exactly one JSON object containing action_type and optional fields." ) try: resp = client.chat.completions.create( model=MODEL_NAME, messages=[{"role": "user", "content": test_question}], temperature=0.0, max_tokens=80, ) reply = (resp.choices[0].message.content or "").strip() if not reply: return False return True except Exception: return False async def main() -> None: if not API_KEY: raise ValueError("API_KEY or HF_TOKEN is required for strict model inference.") client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY) if not _smoke_test_model(client): raise RuntimeError("Model smoke test failed. Aborting strict model-only run.") runtime_base_url = normalize_base_url(BASE_URL) if runtime_base_url: env = CloudQueueEnv(base_url=runtime_base_url) else: if not IMAGE_NAME: raise ValueError("Set BASE_URL for deployed env, or IMAGE_NAME for local docker env.") env = await CloudQueueEnv.from_docker_image(IMAGE_NAME) try: task_seed_map = parse_task_seed_map() replay_map = load_replay_actions() task_score_table: dict[str, list[float]] = {} seed_rows: list[dict] = [] for task_name in TASKS: seeds = task_seed_map.get(task_name, []) if not seeds: continue task_score_table[task_name] = [] for seed in seeds: history: List[str] = [] rewards: List[float] = [] steps_taken = 0 score = OPEN_SCORE_MIN success = False failure_reason: Optional[str] = None log_start(task=task_name, env=BENCHMARK, model=MODEL_NAME) await env.reset() await env.step(CloudQueueAction(action_type="configure_task", task_id=task_name, seed=seed)) result = await env.reset() last_reward = 0.0 max_steps = max(1, int(result.observation.horizon)) if MAX_STEPS_OVERRIDE > 0: max_steps = min(max_steps, MAX_STEPS_OVERRIDE) replay_key = f"{task_name}:{seed}" replay_actions = replay_map.get(replay_key, []) for step in range(1, max_steps + 1): if result.done: break obs = result.observation obs_summary = build_obs_summary(obs, task_name) action: Optional[CloudQueueAction] = None model_error: Optional[str] = None if step - 1 < len(replay_actions): action = replay_actions[step - 1] else: action, model_error = get_model_action_with_retry( client=client, task_name=task_name, step=step, obs_summary=obs_summary, last_reward=last_reward, history=history, retries=2, ) if action is None: failure_reason = f"model_action_unavailable: {model_error}" log_step( step=step, action="model_action_error", reward=0.0, done=True, error=failure_reason, ) steps_taken = step break result = await env.step(action) reward = float(result.reward or 0.0) done = bool(result.done) error = None meta = result.observation.metadata or {} info = meta.get("info", {}) if isinstance(meta, dict) else {} if isinstance(info, dict) and info.get("valid_action") is False: error = str(info.get("note", "invalid_action")) rewards.append(reward) steps_taken = step last_reward = reward action_str = ( f"{action.action_type}(q={action.target_queue},s={action.target_server}," f"d={action.scale_delta},p={action.new_priority})" ) log_step(step=step, action=action_str, reward=reward, done=done, error=error) history.append(f"step={step} action={action_str} reward={reward:.2f}") if done: break if failure_reason is None and isinstance(result.observation.metadata, dict): score = float(result.observation.metadata.get("episode_score", OPEN_SCORE_MIN) or OPEN_SCORE_MIN) elif failure_reason is not None: score = OPEN_SCORE_MIN if failure_reason is None and not bool(result.done): failure_reason = "episode_not_done_within_max_steps" score = OPEN_SCORE_MIN score = clamp_open_score(score) task_score_table[task_name].append(score) success = failure_reason is None and score >= SUCCESS_SCORE_THRESHOLD log_end(success=success, steps=steps_taken, score=score, rewards=rewards) meta = result.observation.metadata or {} metrics = meta.get("metrics", {}) if isinstance(meta, dict) else {} seed_row = { "task": task_name, "seed": int(seed), "score": score, "steps": int(steps_taken), "success": bool(success), "trace_digest": str(meta.get("trace_digest", "")), "invalid_actions": float(metrics.get("invalid_actions", 0.0)), "harmful_scale_down": float(metrics.get("harmful_scale_down", 0.0)), "failure_reason": failure_reason or "", } seed_rows.append(seed_row) task_scores = task_score_table[task_name] task_mean = statistics.mean(task_scores) if task_scores else OPEN_SCORE_MIN task_std = statistics.pstdev(task_scores) if len(task_scores) > 1 else 0.0 task_ci = ci95(task_scores) all_task_means = [] for task_name in TASKS: scores = task_score_table.get(task_name, []) if scores: all_task_means.append(statistics.mean(scores)) if all_task_means: final_score = clamp_open_score(sum(all_task_means) / len(all_task_means)) easy_mean = clamp_open_score(statistics.mean(task_score_table.get("easy", [OPEN_SCORE_MIN]))) medium_mean = clamp_open_score(statistics.mean(task_score_table.get("medium", [OPEN_SCORE_MIN]))) hard_mean = clamp_open_score(statistics.mean(task_score_table.get("hard", [OPEN_SCORE_MIN]))) write_reports(seed_rows=seed_rows, task_score_table=task_score_table) finally: try: await env.close() except Exception: pass if __name__ == "__main__": asyncio.run(main())