"""Strict-output inference runtime for OpenEnv validators.

Runs the review agent against each benchmark task and prints one
``[START]`` line, one ``[STEP]`` line per environment step, and one
``[END]`` line per task.
"""

from __future__ import annotations

from typing import Any

from compat import install_openenv_fastmcp_compat

from app.agents.review_agent import ReviewAgent
from app.models.inference import AgentDecision, InferenceConfig
from app.services.openai_service import OpenAIActionPlanner
from app.utils.runtime import (
    compact_text,
    format_bool,
    format_error,
    format_reward,
    observation_attr,
    parse_task_ids,
    suppress_output,
)

# The compat shim is installed before the environment modules are imported;
# keep this ordering.
install_openenv_fastmcp_compat()

try:
    from models import PythonCodeReviewAction
    from server.env import PythonCodeReviewEnvironment
except ImportError:  # pragma: no cover
    from python_env.models import PythonCodeReviewAction  # type: ignore[no-redef]
    from python_env.server.env import PythonCodeReviewEnvironment  # type: ignore[no-redef]


class InferenceRunner:
    """Drive the review agent through benchmark tasks, printing progress lines.

    Every task produces a ``[START]`` line, zero or more ``[STEP]`` lines and
    exactly one ``[END]`` line (the latter is emitted from a ``finally``).
    """

    def __init__(self, config: InferenceConfig) -> None:
        """Store *config* and build the agent on top of an OpenAI planner."""
        self.config = config
        self.agent = ReviewAgent(OpenAIActionPlanner(config))

    def run(self) -> int:
        """Run every task id yielded by ``parse_task_ids`` and return ``0``.

        The return value is always ``0``, regardless of task outcomes.
        """
        for task_id in parse_task_ids():
            self.run_task(task_id)
        return 0

    def run_task(self, task_name: str) -> None:
        """Run one episode for *task_name* and print its progress lines.

        Any ``Exception`` raised while creating, resetting or stepping the
        environment (or by the agent) is caught; the task then ends with
        ``success=false`` and whatever steps/rewards were recorded so far.
        """
        rewards: list[str] = []
        step_count = 0
        success = False
        fatal_error: str | None = None
        final_score = 0.0

        self._emit_start(task_name)

        try:
            env = self._create_env()
            observation = self._reset_env(env, task_name)
            done = bool(observation_attr(observation, "done", False))
            final_score = float(observation_attr(observation, "score", 0.0) or 0.0)

            # Step budget: the configured cap, tightened by the attempts the
            # observation reports, but never below one step.
            # NOTE(review): a reported attempts_remaining of 0 is falsy and
            # falls back to the configured cap -- confirm that is intended.
            configured_cap = self.config.max_episode_steps
            attempts = (
                observation_attr(observation, "attempts_remaining", configured_cap)
                or configured_cap
            )
            max_steps = max(1, min(configured_cap, int(attempts)))

            while not (done or step_count >= max_steps):
                decision = self.agent.act(observation)
                observation, reward, done, info = self._step_env(env, decision)
                step_count += 1

                # NOTE(review): a falsy score (including 0.0) keeps the
                # previous value -- verify 0.0 is never a meaningful score.
                reported_score = observation_attr(observation, "score", final_score)
                final_score = float(reported_score or final_score)

                rewards.append(format_reward(reward))
                step_error = self._resolve_step_error(info, observation, decision)
                self._emit_step(step_count, decision.action_type, reward, done, step_error)

            if step_count >= max_steps and not done:
                fatal_error = "step budget exhausted"

            # Success requires a finished episode, no fatal error, and a final
            # score at or above the configured threshold.
            if done and fatal_error is None:
                success = final_score >= self.config.success_threshold
        except Exception as exc:
            # NOTE(review): the message is recorded but never printed; the
            # [END] line has no error field.
            fatal_error = compact_text(f"{type(exc).__name__}: {exc}", default="runtime failure")
        finally:
            self._emit_end(success=success, step_count=step_count, rewards=rewards)

    def _create_env(self) -> PythonCodeReviewEnvironment:
        """Construct a non-verbose environment inside ``suppress_output``."""
        # presumably suppress_output silences stray prints from the
        # environment -- confirm against app.utils.runtime
        with suppress_output():
            return PythonCodeReviewEnvironment(verbose=False)

    def _reset_env(self, env: PythonCodeReviewEnvironment, task_name: str) -> Any:
        """Reset *env* to *task_name* and return the initial observation."""
        with suppress_output():
            return env.reset(task_id=task_name)

    def _step_env(
        self,
        env: PythonCodeReviewEnvironment,
        decision: AgentDecision,
    ) -> tuple[Any, float, bool, dict[str, Any]]:
        """Apply *decision* to *env* and normalise the step result.

        Returns ``(observation, reward, done, info)`` with the reward coerced
        to ``float``, ``done`` to ``bool`` and ``info`` to a fresh ``dict``
        (empty when the environment returned a falsy info value).
        """
        action = PythonCodeReviewAction(action_type=decision.action_type, code=decision.code)
        with suppress_output():
            observation, reward, done, info = env.step_result(action)
        return observation, float(reward), bool(done), dict(info or {})

    def _resolve_step_error(
        self,
        info: dict[str, Any],
        observation: Any,
        decision: AgentDecision,
    ) -> str | None:
        """Pick the error text to report for a step, or ``None``.

        Preference order: ``info["last_action_error"]``, then the
        observation's ``last_action_error``, then ``decision.error``.
        """
        raw_env_error = info.get("last_action_error") or observation_attr(
            observation, "last_action_error", None
        )
        env_error = compact_text(raw_env_error, default="")
        if env_error:
            return env_error
        return compact_text(decision.error, default="") if decision.error else None

    def _emit_start(self, task_name: str) -> None:
        """Print the ``[START]`` line for *task_name*."""
        line = f"[START] task={task_name} env={self.config.benchmark_name} model={self.config.model_name}"
        print(line, flush=True)

    def _emit_step(self, step_count: int, action: str, reward: float, done: bool, error: str | None) -> None:
        """Print the ``[STEP]`` line for one environment step."""
        # Fields are formatted in the same order they appear in the line.
        action_text = compact_text(action, default='analyze_code')
        reward_text = format_reward(reward)
        done_text = format_bool(done)
        error_text = format_error(error)
        line = (
            f"[STEP] step={step_count} action={action_text} "
            f"reward={reward_text} done={done_text} error={error_text}"
        )
        print(line, flush=True)

    def _emit_end(self, *, success: bool, step_count: int, rewards: list[str]) -> None:
        """Print the ``[END]`` line with the comma-joined reward strings."""
        joined_rewards = ",".join(rewards)
        line = f"[END] success={format_bool(success)} steps={step_count} rewards={joined_rewards}"
        print(line, flush=True)


def main() -> int:
    """Entrypoint used by the root-level inference wrapper."""
    config = InferenceConfig.from_env()
    return InferenceRunner(config).run()