Spaces:
Build error
Build error
#!/usr/bin/env python3
"""Fail-safe inference entrypoint for the Python code review environment."""
from __future__ import annotations
import io
import json
import os
import subprocess
import sys
import time
from collections.abc import Iterable
from contextlib import redirect_stderr, redirect_stdout
from typing import Any, Dict, Optional
from compat import install_openenv_fastmcp_compat
# Optional dependency: guarded so the script still runs (in mock mode)
# when the openai package is missing or broken.
try:
    from openai import OpenAI
except Exception:
    OpenAI = None  # type: ignore[assignment]
# Compat shims are installed before the environment import below.
install_openenv_fastmcp_compat()
# Project imports are also guarded; each falls back to None and the
# run degrades to a deterministic fallback result instead of crashing.
try:
    from server.env import PythonCodeReviewEnvironment
except Exception:
    PythonCodeReviewEnvironment = None  # type: ignore[assignment]
try:
    from models import PythonCodeReviewAction
except Exception:
    PythonCodeReviewAction = None  # type: ignore[assignment]
try:
    from tasks import task_ids
except Exception:
    task_ids = None  # type: ignore[assignment]
# Action types the environment accepts; anything else is coerced to the default.
ALLOWED_ACTIONS = {
    "analyze_code",
    "edit_code",
    "run_tests",
    "submit_solution",
}
DEFAULT_MODEL_NAME = "mock-model"
# Deterministic fallback action used whenever the LLM cannot be consulted.
DEFAULT_ACTION = {"action_type": "analyze_code", "code": None, "fallback_reason": "mock_response"}
API_TIMEOUT_SECONDS = 3.0  # per-request timeout for the chat completion call
API_RETRIES = 1  # extra attempts after the first failure
API_RETRY_DELAY_SECONDS = 0.2  # base back-off, scaled by attempt number
MAX_STEPS = 2  # hard cap on environment steps per run
def safe_env(name: str, default: str = "") -> str:
    """Look up an environment variable, falling back to *default* on any problem."""
    try:
        raw = os.getenv(name)
    except Exception:
        return default
    return default if raw is None else str(raw)
def clamp(value: float, low: float = 0.0, high: float = 1.0) -> float:
    """Bound *value* to [low, high]; non-numeric input collapses to *low*."""
    try:
        numeric = float(value)
    except Exception:
        return low
    if numeric < low:
        return low
    return high if numeric > high else numeric
def safe_float(value: Any, default: float = 0.0) -> float:
    """Coerce *value* to float, substituting *default* when conversion fails."""
    try:
        result = float(value)
    except Exception:
        result = default
    return result
def safe_text(value: Any, default: str = "") -> str:
    """Render *value* as a single whitespace-collapsed line capped at 160 chars."""
    try:
        rendered = str(value)
    except Exception:
        return default
    collapsed = " ".join(rendered.split())
    if not collapsed:
        return default
    return collapsed[:160]
def safe_getattr(obj: Any, name: str, default: Any = None) -> Any:
    """A getattr that never raises: any failure yields *default*."""
    try:
        result = getattr(obj, name, default)
    except Exception:
        result = default
    return result
def parse_json_response(raw_text: str) -> Dict[str, Any]:
    """Extract a validated action dict from LLM text, else return DEFAULT_ACTION.

    Scans for the outermost brace pair, decodes it, and sanitizes the
    action_type/code pair; any decoding or validation failure falls back
    to a copy of the deterministic default action.
    """
    try:
        text = raw_text or ""
        left = text.find("{")
        right = text.rfind("}") + 1
        if left >= 0 and right > left:
            candidate = json.loads(text[left:right])
            if isinstance(candidate, dict):
                kind = candidate.get("action_type", DEFAULT_ACTION["action_type"])
                snippet = candidate.get("code")
                if kind not in ALLOWED_ACTIONS:
                    kind = DEFAULT_ACTION["action_type"]
                # Only edit_code actions may carry a code payload.
                if kind != "edit_code":
                    snippet = None
                return {"action_type": kind, "code": snippet, "fallback_reason": ""}
    except Exception:
        pass
    return dict(DEFAULT_ACTION)
def build_prompt(observation: Any) -> str:
    """Render the current observation into a compact instruction prompt.

    Never raises: any formatting failure yields a minimal static prompt.
    """
    try:
        task_description = safe_text(safe_getattr(observation, "task_description", ""), "No task description.")
        current_code = safe_text(safe_getattr(observation, "current_code", ""), "")
        errors = safe_text(safe_getattr(observation, "errors", ""), "")
        tests = safe_text(safe_getattr(observation, "test_results", ""), "")
        score = clamp(safe_getattr(observation, "score", 0.0))
        visible = safe_getattr(observation, "visible_tests", [])
        # Strings/bytes are iterable but not test lists; treat them as empty.
        if isinstance(visible, (str, bytes)) or not isinstance(visible, Iterable):
            visible = []
        bullet_lines = [f"- {safe_text(entry, 'unknown test')}" for entry in list(visible)[:4]]
        visible_block = "\n".join(bullet_lines) if bullet_lines else "- none"
        return (
            "Return exactly one JSON object with keys action_type and optional code.\n"
            "Allowed action_type values: analyze_code, edit_code, run_tests, submit_solution.\n"
            f"Task: {task_description}\n"
            f"Score: {score:.3f}\n"
            f"Errors: {errors or 'none'}\n"
            f"Tests: {tests or 'not available'}\n"
            f"Visible tests:\n{visible_block}\n"
            f"Code:\n{current_code}\n"
        )
    except Exception:
        return (
            "Return exactly one JSON object with keys action_type and optional code. "
            "Use action_type analyze_code."
        )
def create_client() -> Optional[Any]:
    """Create an OpenAI-compatible client using only the allowed environment variables.

    Returns None when the SDK is unavailable or API_BASE_URL is unset; any
    construction error is swallowed so callers fall back to mock behavior.
    """
    if OpenAI is None:
        return None
    base_url = safe_env("API_BASE_URL", "")
    if not base_url:
        return None
    token = safe_env("HF_TOKEN", "")
    if token:
        try:
            # The SDK reads OPENAI_API_KEY; mirror the HF token into it.
            os.environ["OPENAI_API_KEY"] = token
        except Exception:
            pass
    try:
        # Reuse the validated base_url instead of re-reading the raw env var,
        # so the guard above and the constructor can never disagree.
        return OpenAI(base_url=base_url)
    except Exception:
        return None
def run_llm(client: Optional[Any], model: str, prompt: str) -> Dict[str, Any]:
    """Query the chat endpoint with timeout and retry; always return an action dict.

    Any failure path yields a copy of DEFAULT_ACTION tagged with a short
    machine-readable fallback_reason.
    """
    if client is None:
        unavailable = dict(DEFAULT_ACTION)
        unavailable["fallback_reason"] = "client_unavailable"
        return unavailable
    last_reason = "llm_unavailable"
    attempt = 0
    while attempt <= API_RETRIES:
        try:
            # Suppress any SDK chatter so stdout stays protocol-clean.
            with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()):
                response = client.with_options(timeout=API_TIMEOUT_SECONDS).chat.completions.create(
                    model=model,
                    messages=[{"role": "user", "content": prompt}],
                    temperature=0,
                    max_tokens=300,
                )
                message = safe_getattr(response.choices[0].message, "content", "")
                parsed = parse_json_response(message)
                if parsed.get("fallback_reason"):
                    parsed["fallback_reason"] = "parse_failed"
                return parsed
        except Exception as exc:
            last_reason = safe_text(exc, "llm_error").lower().replace(" ", "_")
            if attempt < API_RETRIES:
                try:
                    # Linear back-off before the next attempt.
                    time.sleep(API_RETRY_DELAY_SECONDS * (attempt + 1))
                except Exception:
                    pass
        attempt += 1
    failed = dict(DEFAULT_ACTION)
    failed["fallback_reason"] = last_reason[:48] or "llm_retry_exhausted"
    return failed
def probe_docker(image_name: str) -> Dict[str, Any]:
    """Check whether a local Docker image is inspectable; never raises.

    An empty name skips the probe entirely; otherwise `docker image inspect`
    is run with a short timeout and its exit code decides availability.
    """
    if not image_name:
        return {"checked": False, "available": False, "reason": "docker_skip"}
    try:
        with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()):
            completed = subprocess.run(
                ["docker", "image", "inspect", image_name],
                capture_output=True,
                text=True,
                timeout=3,
                check=False,
            )
        available = completed.returncode == 0
        reason = "docker_ok" if available else "docker_unreachable"
        return {"checked": True, "available": available, "reason": reason}
    except Exception as exc:
        return {"checked": True, "available": False, "reason": safe_text(exc, "docker_error").lower().replace(" ", "_")}
def fallback_step_result(reason: str, docker_status: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build the canonical zero-reward result emitted when the environment cannot run."""
    status = docker_status or {}
    docker_reason = safe_text(status.get("reason", "docker_skip"), "docker_skip")
    short_reason = safe_text(reason, "env_fallback").lower().replace(" ", "_")
    result = {
        "status": "ok",
        "fallback": True,
        "reason": short_reason[:64],
        "reward": 0.0,
        "improvement": 0.0,
        "score": 0.0,
        "done": True,
        "docker": docker_reason[:32],
    }
    return result
def safe_task_list() -> list[str]:
    """Return the known task ids, or a single fallback id when discovery fails."""
    try:
        if callable(task_ids):
            discovered = list(task_ids())
            if discovered:
                return [safe_text(entry, "fallback-task") for entry in discovered]
    except Exception:
        pass
    return ["fallback-task"]
def make_action(action_payload: Dict[str, Any]) -> Any:
    """Turn a raw payload into an environment action, degrading to plain dicts.

    Unknown action types collapse to the default; code is only kept for
    edit_code actions; a missing or failing action class yields dicts.
    """
    kind = action_payload.get("action_type", DEFAULT_ACTION["action_type"])
    if kind not in ALLOWED_ACTIONS:
        kind = DEFAULT_ACTION["action_type"]
    snippet = action_payload.get("code") if kind == "edit_code" else None
    if PythonCodeReviewAction is None:
        return {"action_type": kind, "code": snippet}
    try:
        return PythonCodeReviewAction(action_type=kind, code=snippet)
    except Exception:
        try:
            return PythonCodeReviewAction(action_type=DEFAULT_ACTION["action_type"], code=None)
        except Exception:
            return {"action_type": DEFAULT_ACTION["action_type"], "code": None}
def compute_reward(
    previous_score: float,
    current_score: float,
    step_reward: float,
    used_fallback: bool,
    done: bool,
) -> Dict[str, float]:
    """Blend score, improvement, and step reward into one bounded reward value.

    Weights: 55% final score, 30% positive improvement, 10% positive step
    reward, +5% completion bonus at a near-perfect score, -5% fallback penalty.
    """
    prev_value = clamp(previous_score)
    curr_value = clamp(current_score)
    improvement = round(curr_value - prev_value, 4)
    bounded = max(-1.0, min(1.0, safe_float(step_reward, 0.0)))
    total = 0.55 * curr_value
    total += 0.30 * max(improvement, 0.0)
    total += 0.10 * max(bounded, 0.0)
    if done and curr_value >= 0.99:
        total += 0.05
    if used_fallback:
        total -= 0.05
    return {"reward": round(clamp(total), 4), "improvement": improvement}
def safe_step(env: Any, action: Any) -> Any:
    """Run env.step(action) with all output suppressed; None signals failure."""
    try:
        with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()):
            outcome = env.step(action)
        return outcome
    except Exception:
        return None
def safe_reset(env: Any, task_id: str) -> Any:
    """Run env.reset(task_id=...) with output suppressed; None signals failure."""
    try:
        with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()):
            outcome = env.reset(task_id=task_id)
        return outcome
    except Exception:
        return None
def run_env(client: Optional[Any], model: str) -> Dict[str, Any]:
    """Run the environment loop safely and return a structured result payload.

    Sequence: probe Docker, construct the environment, reset to the first
    task, then loop up to MAX_STEPS asking the LLM for an action and
    stepping the environment. After the first successful non-terminal step
    the loop force-submits and exits. Every failure path degrades to
    fallback_step_result rather than raising.
    """
    docker_status = probe_docker(safe_env("LOCAL_IMAGE_NAME", ""))
    if PythonCodeReviewEnvironment is None:
        return fallback_step_result("env_import_failed", docker_status)
    try:
        # Environment construction may print; keep stdout protocol-clean.
        with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()):
            env = PythonCodeReviewEnvironment(verbose=False)
    except Exception as exc:
        return fallback_step_result(f"env_init_failed_{safe_text(exc, 'unknown')}", docker_status)
    tasks = safe_task_list()
    task_id = tasks[0] if tasks else "fallback-task"
    observation = safe_reset(env, task_id)
    if observation is None:
        return fallback_step_result("env_reset_failed", docker_status)
    previous_score = clamp(safe_getattr(observation, "score", 0.0))
    total_step_reward = 0.0
    used_fallback = False
    final_status = "ok"
    final_reason = "completed"
    final_observation = observation
    for step_index in range(MAX_STEPS):
        prompt = build_prompt(final_observation)
        action_payload = run_llm(client, model, prompt)
        # Any LLM fallback taints the whole run as a fallback result.
        used_fallback = used_fallback or bool(action_payload.get("fallback_reason"))
        action = make_action(action_payload)
        next_observation = safe_step(env, action)
        if next_observation is None:
            final_status = "ok"
            final_reason = "env_step_fallback"
            used_fallback = True
            break
        final_observation = next_observation
        total_step_reward += safe_float(safe_getattr(final_observation, "reward", 0.0), 0.0)
        done = bool(safe_getattr(final_observation, "done", False))
        # NOTE(review): `score` is computed here but never used inside the
        # loop; the final score is re-read after the loop ends.
        score = clamp(safe_getattr(final_observation, "score", 0.0))
        if safe_getattr(final_observation, "last_action_status", ""):
            final_reason = safe_text(safe_getattr(final_observation, "last_action_status", ""), "step_completed")
        elif action_payload.get("fallback_reason"):
            final_reason = safe_text(action_payload.get("fallback_reason"), "llm_fallback")
        else:
            final_reason = f"step_{step_index + 1}_completed"
        if done:
            break
        if step_index == 0:
            # After the first successful non-terminal step, force a
            # submission so the run always ends within the step budget.
            submit_action = make_action({"action_type": "submit_solution", "code": None})
            submitted_observation = safe_step(env, submit_action)
            if submitted_observation is None:
                final_reason = "submit_fallback"
                used_fallback = True
                break
            final_observation = submitted_observation
            total_step_reward += safe_float(safe_getattr(final_observation, "reward", 0.0), 0.0)
            if safe_getattr(final_observation, "last_action_status", ""):
                final_reason = safe_text(safe_getattr(final_observation, "last_action_status", ""), "submit_completed")
            break
    # Final metrics come from whatever observation the loop ended on.
    current_score = clamp(safe_getattr(final_observation, "score", previous_score))
    done = bool(safe_getattr(final_observation, "done", True))
    metrics = compute_reward(
        previous_score=previous_score,
        current_score=current_score,
        step_reward=total_step_reward,
        used_fallback=used_fallback,
        done=done,
    )
    return {
        "status": final_status,
        "fallback": used_fallback,
        "reason": safe_text(final_reason, "completed").lower().replace(" ", "_")[:64],
        "reward": metrics["reward"],
        "improvement": metrics["improvement"],
        "score": round(current_score, 4),
        "done": done,
        "docker": safe_text(docker_status.get("reason", "docker_skip"), "docker_skip")[:32],
    }
def format_step_message(result: Dict[str, Any]) -> str:
    """Format the only allowed STEP line for stdout.

    Emits one key=value summary line; fallback runs are prefixed with
    "error handled:" plus the reason. Never raises: any formatting failure
    collapses to a static error line.
    """
    try:
        # The numeric fields and status are identical in both branches;
        # compute them once instead of duplicating the four lookups.
        reward = safe_float(result.get("reward", 0.0), 0.0)
        improvement = safe_float(result.get("improvement", 0.0), 0.0)
        score = safe_float(result.get("score", 0.0), 0.0)
        status = safe_text(result.get("status", "ok"), "ok").lower().replace(" ", "_")
        if bool(result.get("fallback", False)):
            reason = safe_text(result.get("reason", "completed"), "completed").lower().replace(" ", "_")
            return (
                f"error handled: {reason} reward={reward:.4f} status={status} "
                f"fallback=true improvement={improvement:.4f} score={score:.4f}"
            )
        return (
            f"reward={reward:.4f} status={status} "
            f"fallback=false improvement={improvement:.4f} score={score:.4f}"
        )
    except Exception:
        return "error handled: formatting_failed"
def main() -> int:
    """Run the inference workflow; always print the protocol lines and return 0."""
    step_message = "error handled: initialization_failed"
    try:
        model_name = safe_env("MODEL_NAME", DEFAULT_MODEL_NAME) or DEFAULT_MODEL_NAME
        result = run_env(create_client(), model_name)
        step_message = format_step_message(result)
    except BaseException as exc:
        reason = safe_text(exc, "unexpected_failure").lower().replace(" ", "_")[:64]
        step_message = f"error handled: {reason}"
    finally:
        # The START/STEP/END trio is the only stdout this program may emit.
        try:
            print("START")
            print(f"STEP: {step_message}")
            print("END")
        except Exception:
            pass
    return 0
if __name__ == "__main__":
    try:
        main()
    except BaseException:
        # Last-resort guard: even a failure inside main()'s own guards
        # must still emit the protocol lines.
        try:
            print("START")
            print("STEP: error handled: fatal_guard")
            print("END")
        except Exception:
            pass
    # Exit code is always 0 by contract, success or not.
    sys.exit(0)