| """Strict-output inference runtime for OpenEnv validators.""" | |
| from __future__ import annotations | |
| from typing import Any | |
| from compat import install_openenv_fastmcp_compat | |
| from app.agents.review_agent import ReviewAgent | |
| from app.models.inference import AgentDecision, InferenceConfig | |
| from app.services.openai_service import OpenAIActionPlanner | |
| from app.utils.runtime import ( | |
| compact_text, | |
| format_bool, | |
| format_error, | |
| format_reward, | |
| observation_attr, | |
| parse_task_ids, | |
| suppress_output, | |
| ) | |

install_openenv_fastmcp_compat()
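
# Prefer the flat module layout; fall back to the namespaced python_env
# package when the flat modules are not importable.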
try:
    from models import PythonCodeReviewAction
    from server.env import PythonCodeReviewEnvironment
except ImportError:  # pragma: no cover
    from python_env.models import PythonCodeReviewAction  # type: ignore[no-redef]
    from python_env.server.env import PythonCodeReviewEnvironment  # type: ignore[no-redef]


class InferenceRunner:
    """Run benchmark tasks with strict single-line progress output."""

    def __init__(self, config: InferenceConfig) -> None:
        self.config = config
        self.agent = ReviewAgent(OpenAIActionPlanner(config))

    def run(self) -> int:
        for task_name in parse_task_ids():
            self.run_task(task_name)
        return 0
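
    # One episode per task: reset the environment, step the agent until the
    # episode is done or the step budget is exhausted, and emit exactly one
    # [START] line, one [STEP] line per action, and one [END] line.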
    def run_task(self, task_name: str) -> None:
        rewards: list[str] = []
        step_count = 0
        success = False
        fatal_error: str | None = None
        final_score = 0.0
        self._emit_start(task_name)
        try:
            env = self._create_env()
            observation = self._reset_env(env, task_name)
            done = bool(observation_attr(observation, "done", False))
            final_score = float(observation_attr(observation, "score", 0.0) or 0.0)
            # Respect the environment's attempt budget when it is tighter than
            # the configured cap, but always allow at least one step.
            max_steps = max(
                1,
                min(
                    self.config.max_episode_steps,
                    int(
                        observation_attr(observation, "attempts_remaining", self.config.max_episode_steps)
                        or self.config.max_episode_steps
                    ),
                ),
            )
            while not done and step_count < max_steps:
                decision = self.agent.act(observation)
                observation, reward, done, info = self._step_env(env, decision)
                step_count += 1
                final_score = float(observation_attr(observation, "score", final_score) or final_score)
                rewards.append(format_reward(reward))
                step_error = self._resolve_step_error(info, observation, decision)
                self._emit_step(step_count, decision.action_type, reward, done, step_error)
            if not done and step_count >= max_steps:
                fatal_error = "step budget exhausted"
            # A task only counts as successful when the episode finished
            # cleanly and the final score clears the configured threshold.
            success = bool(done) and fatal_error is None and final_score >= self.config.success_threshold
        except Exception as exc:
            fatal_error = compact_text(f"{type(exc).__name__}: {exc}", default="runtime failure")
        finally:
            self._emit_end(success=success, step_count=step_count, rewards=rewards)
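
    # The helpers below wrap every environment call in suppress_output() so
    # that only the [START]/[STEP]/[END] protocol lines reach stdout.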
    def _create_env(self) -> PythonCodeReviewEnvironment:
        with suppress_output():
            return PythonCodeReviewEnvironment(verbose=False)

    def _reset_env(self, env: PythonCodeReviewEnvironment, task_name: str) -> Any:
        with suppress_output():
            return env.reset(task_id=task_name)

    def _step_env(
        self,
        env: PythonCodeReviewEnvironment,
        decision: AgentDecision,
    ) -> tuple[Any, float, bool, dict[str, Any]]:
        action = PythonCodeReviewAction(action_type=decision.action_type, code=decision.code)
        with suppress_output():
            observation, reward, done, info = env.step_result(action)
        return observation, float(reward), bool(done), dict(info or {})
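
    # Error precedence: an error reported by the environment (via `info` or
    # the observation) wins over a planner-side error on the decision.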
    def _resolve_step_error(
        self,
        info: dict[str, Any],
        observation: Any,
        decision: AgentDecision,
    ) -> str | None:
        env_error = compact_text(
            info.get("last_action_error") or observation_attr(observation, "last_action_error", None),
            default="",
        )
        if env_error:
            return env_error
        if decision.error:
            return compact_text(decision.error, default="")
        return None

    def _emit_start(self, task_name: str) -> None:
        print(
            f"[START] task={task_name} env={self.config.benchmark_name} model={self.config.model_name}",
            flush=True,
        )

    def _emit_step(self, step_count: int, action: str, reward: float, done: bool, error: str | None) -> None:
        print(
            f"[STEP] step={step_count} action={compact_text(action, default='analyze_code')} "
            f"reward={format_reward(reward)} done={format_bool(done)} error={format_error(error)}",
            flush=True,
        )

    def _emit_end(self, *, success: bool, step_count: int, rewards: list[str]) -> None:
        print(
            f"[END] success={format_bool(success)} steps={step_count} rewards={','.join(rewards)}",
            flush=True,
        )


def main() -> int:
    """Entrypoint used by the root-level inference wrapper."""
    return InferenceRunner(InferenceConfig.from_env()).run()
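

# A minimal convenience guard, assuming the module may also be run directly;
# the root-level wrapper mentioned above remains the primary entrypoint.
if __name__ == "__main__":
    raise SystemExit(main())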