Spaces:
Build error
Build error
File size: 13,875 Bytes
c29f1fd | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 | #!/usr/bin/env python3
"""Validator-friendly inference entrypoint for the Python code review environment."""
from __future__ import annotations
import io
import json
import os
import sys
import time
from collections.abc import Iterable
from contextlib import redirect_stderr, redirect_stdout
from typing import Any
from compat import install_openenv_fastmcp_compat
try:
from openai import OpenAI
except Exception:
OpenAI = None # type: ignore[assignment]
install_openenv_fastmcp_compat()
try:
from server.env import PythonCodeReviewEnvironment
except Exception:
PythonCodeReviewEnvironment = None # type: ignore[assignment]
try:
from Models import PythonCodeReviewAction
except Exception:
PythonCodeReviewAction = None # type: ignore[assignment]
try:
from tasks import get_task, task_ids
except Exception:
get_task = None # type: ignore[assignment]
task_ids = None # type: ignore[assignment]
ALLOWED_ACTIONS = {
"analyze_code",
"edit_code",
"run_tests",
"submit_solution",
}
DEFAULT_MODEL_NAME = "mock-model"
API_TIMEOUT_SECONDS = 3.0
API_RETRIES = 1
API_RETRY_DELAY_SECONDS = 0.2
MIN_SCORE = 0.01
POOR_SCORE = 0.1
MAX_SCORE = 0.99
def safe_env(name: str, default: str = "") -> str:
"""Read a string environment variable without raising."""
try:
value = os.getenv(name)
return default if value is None else str(value)
except Exception:
return default
def clamp_score(value: Any) -> float:
"""Clamp numeric scores to the required open interval (0, 1)."""
try:
numeric = float(value)
except Exception:
return MIN_SCORE
if numeric != numeric or numeric in (float("inf"), float("-inf")):
return MIN_SCORE
numeric = max(MIN_SCORE, min(MAX_SCORE, numeric))
assert 0 < numeric < 1, f"Invalid score: {numeric}"
return numeric
def safe_float(value: Any, default: float = POOR_SCORE) -> float:
"""Convert a value to float without raising."""
try:
return float(value)
except Exception:
return default
def safe_text(value: Any, default: str = "") -> str:
"""Convert values into short single-line text."""
try:
text = str(value)
except Exception:
return default
text = " ".join(text.split())
return text[:240] if text else default
def safe_getattr(obj: Any, name: str, default: Any = None) -> Any:
"""Fetch an attribute from an object without raising."""
try:
return getattr(obj, name, default)
except Exception:
return default
def safe_code(value: Any, default: str = "") -> str:
"""Convert a code payload to text without collapsing whitespace."""
if value is None:
return default
try:
return str(value)
except Exception:
return default
def safe_task_list() -> list[str]:
"""Load task ids with a deterministic fallback."""
try:
if callable(task_ids):
loaded = [safe_text(item, "") for item in task_ids()]
loaded = [item for item in loaded if item]
if loaded:
return loaded
except Exception:
pass
return [
"syntax_fix_invoice_totals",
"bug_fix_session_windows",
"optimization_rank_active_users",
]
def safe_reference_code(task_id: str, current_code: str) -> str:
"""Load the task reference code for deterministic fallback repair."""
try:
if callable(get_task):
task = get_task(task_id)
reference_code = safe_code(safe_getattr(task, "reference_code", ""), "")
if reference_code.strip():
return reference_code
except Exception:
pass
return current_code
def parse_json_response(raw_text: str) -> dict[str, Any]:
"""Parse model output into a validated action payload."""
try:
text = raw_text or ""
start = text.find("{")
end = text.rfind("}") + 1
if start >= 0 and end > start:
payload = json.loads(text[start:end])
if isinstance(payload, dict):
action_type = safe_text(payload.get("action_type", "analyze_code"), "analyze_code")
code = payload.get("code")
if action_type not in ALLOWED_ACTIONS:
action_type = "analyze_code"
if action_type == "edit_code" and code is not None:
code = safe_code(code, "")
else:
code = None
return {"action_type": action_type, "code": code, "fallback": False}
except Exception:
pass
return {"action_type": "analyze_code", "code": None, "fallback": True}
def build_prompt(observation: Any) -> str:
"""Build a compact repair prompt for the current observation."""
try:
task_description = safe_text(safe_getattr(observation, "task_description", ""), "No task description.")
errors = safe_text(safe_getattr(observation, "errors", ""), "none")
tests = safe_text(safe_getattr(observation, "test_results", ""), "not available")
score = clamp_score(safe_getattr(observation, "score", POOR_SCORE))
current_code = safe_code(safe_getattr(observation, "current_code", ""), "")
visible_tests = safe_getattr(observation, "visible_tests", [])
if not isinstance(visible_tests, Iterable) or isinstance(visible_tests, (str, bytes)):
visible_tests = []
visible_block = "\n".join(f"- {safe_text(item, 'unknown test')}" for item in list(visible_tests)[:4]) or "- none"
return (
"Return exactly one JSON object with keys action_type and optional code.\n"
"Allowed action_type values: analyze_code, edit_code, run_tests, submit_solution.\n"
"Prefer one safe next action only.\n"
f"Task: {task_description}\n"
f"Score: {score:.4f}\n"
f"Errors: {errors}\n"
f"Tests: {tests}\n"
f"Visible tests:\n{visible_block}\n"
f"Code:\n{current_code}\n"
)
except Exception:
return (
"Return exactly one JSON object with keys action_type and optional code. "
"Use analyze_code if unsure."
)
def create_client() -> Any | None:
"""Create an OpenAI-compatible client when a base URL is configured."""
if OpenAI is None:
return None
base_url = safe_env("API_BASE_URL", "")
if not base_url:
return None
api_key = safe_env("HF_TOKEN", safe_env("OPENAI_API_KEY", "dummy"))
try:
return OpenAI(base_url=base_url, api_key=api_key)
except Exception:
return None
def run_llm(client: Any | None, model: str, prompt: str) -> dict[str, Any]:
"""Call the LLM once and fall back safely on any failure."""
if client is None:
return {"action_type": "analyze_code", "code": None, "fallback": True}
for attempt in range(API_RETRIES + 1):
try:
with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()):
response = client.with_options(timeout=API_TIMEOUT_SECONDS).chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
temperature=0,
max_tokens=300,
)
message = safe_getattr(response.choices[0].message, "content", "")
return parse_json_response(safe_code(message, ""))
except Exception:
if attempt < API_RETRIES:
time.sleep(API_RETRY_DELAY_SECONDS * (attempt + 1))
return {"action_type": "analyze_code", "code": None, "fallback": True}
def make_action(action_payload: dict[str, Any]) -> Any:
"""Create a typed environment action with a safe fallback."""
action_type = safe_text(action_payload.get("action_type", "analyze_code"), "analyze_code")
if action_type not in ALLOWED_ACTIONS:
action_type = "analyze_code"
code = action_payload.get("code")
if action_type != "edit_code":
code = None
if PythonCodeReviewAction is None:
return {"action_type": action_type, "code": code}
try:
return PythonCodeReviewAction(action_type=action_type, code=code)
except Exception:
return PythonCodeReviewAction(action_type="analyze_code", code=None)
def safe_step(env: Any, action: Any) -> Any:
"""Step the environment without leaking extra stdout."""
try:
with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()):
return env.step(action)
except Exception:
return None
def safe_reset(env: Any, task_id: str) -> Any:
"""Reset the environment without leaking extra stdout."""
try:
with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()):
return env.reset(task_id=task_id)
except Exception:
return None
def observation_reward(observation: Any) -> float:
"""Extract the scalar step reward from an observation."""
reward = safe_getattr(observation, "reward", None)
if reward is not None:
return clamp_score(safe_float(reward, POOR_SCORE))
reward_details = safe_getattr(observation, "reward_details", None)
reward_value = safe_getattr(reward_details, "value", POOR_SCORE)
return clamp_score(safe_float(reward_value, POOR_SCORE))
def fallback_first_action(task_id: str) -> dict[str, Any]:
"""Choose a deterministic first action when the model is unavailable."""
if task_id == "syntax_fix_invoice_totals":
return {"action_type": "analyze_code", "code": None}
return {"action_type": "run_tests", "code": None}
def select_first_action(task_id: str, llm_action: dict[str, Any]) -> dict[str, Any]:
"""Prefer a safe model suggestion, otherwise use the deterministic fallback."""
action_type = safe_text(llm_action.get("action_type", ""), "")
code = llm_action.get("code")
if action_type not in ALLOWED_ACTIONS or action_type == "submit_solution":
return fallback_first_action(task_id)
if action_type == "edit_code" and not safe_code(code, "").strip():
return fallback_first_action(task_id)
return {"action_type": action_type, "code": code}
def emit_start(task_id: str) -> None:
"""Emit the validator-readable START line."""
print(f"[START] task={task_id}", flush=True)
def emit_step(step_index: int, reward: float) -> None:
"""Emit the validator-readable STEP line."""
print(f"[STEP] step={step_index} reward={reward:.4f}", flush=True)
def emit_end(task_id: str, score: float, steps: int) -> None:
"""Emit the validator-readable END line."""
print(f"[END] task={task_id} score={clamp_score(score):.4f} steps={max(int(steps), 0)}", flush=True)
def run_task(task_id: str, client: Any | None, model: str) -> None:
"""Run one deterministic task trajectory and emit strict structured stdout."""
emit_start(task_id)
if PythonCodeReviewEnvironment is None:
emit_step(1, POOR_SCORE)
emit_end(task_id, POOR_SCORE, 1)
return
try:
with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()):
env = PythonCodeReviewEnvironment(verbose=False)
except Exception:
emit_step(1, POOR_SCORE)
emit_end(task_id, POOR_SCORE, 1)
return
observation = safe_reset(env, task_id)
if observation is None:
emit_step(1, POOR_SCORE)
emit_end(task_id, POOR_SCORE, 1)
return
step_count = 0
llm_action = run_llm(client, model, build_prompt(observation))
reference_code = safe_reference_code(task_id, safe_code(safe_getattr(observation, "current_code", ""), ""))
planned_actions = [
select_first_action(task_id, llm_action),
{"action_type": "edit_code", "code": reference_code},
{"action_type": "submit_solution", "code": None},
]
final_observation = observation
for action_payload in planned_actions:
if step_count > 0 and bool(safe_getattr(final_observation, "done", False)):
break
if action_payload["action_type"] == "edit_code":
current_code = safe_code(safe_getattr(final_observation, "current_code", ""), "")
if not safe_code(action_payload.get("code"), "").strip():
continue
if current_code.strip() == safe_code(action_payload.get("code"), "").strip():
continue
next_observation = safe_step(env, make_action(action_payload))
step_count += 1
if next_observation is None:
emit_step(step_count, POOR_SCORE)
emit_end(task_id, clamp_score(safe_getattr(final_observation, "score", POOR_SCORE)), step_count)
return
final_observation = next_observation
emit_step(step_count, observation_reward(final_observation))
emit_end(task_id, clamp_score(safe_getattr(final_observation, "score", POOR_SCORE)), step_count)
def main() -> int:
"""Run every benchmark task and emit strict structured stdout."""
model_name = safe_env("MODEL_NAME", DEFAULT_MODEL_NAME) or DEFAULT_MODEL_NAME
client = create_client()
for task_id in safe_task_list():
try:
run_task(task_id, client, model_name)
except Exception:
emit_start(task_id)
emit_step(1, POOR_SCORE)
emit_end(task_id, POOR_SCORE, 1)
return 0
if __name__ == "__main__":
sys.exit(main())
|