# Source path: final-python-env/app/env/runner.py
# Uploaded via huggingface_hub (uvpatel7271, commit 989722c verified).
"""Strict-output inference runtime for OpenEnv validators."""
from __future__ import annotations
from typing import Any
from compat import install_openenv_fastmcp_compat
from app.agents.review_agent import ReviewAgent
from app.models.inference import AgentDecision, InferenceConfig
from app.services.openai_service import OpenAIActionPlanner
from app.utils.runtime import (
compact_text,
format_bool,
format_error,
format_reward,
observation_attr,
parse_task_ids,
suppress_output,
)
install_openenv_fastmcp_compat()
try:
from models import PythonCodeReviewAction
from server.env import PythonCodeReviewEnvironment
except ImportError: # pragma: no cover
from python_env.models import PythonCodeReviewAction # type: ignore[no-redef]
from python_env.server.env import PythonCodeReviewEnvironment # type: ignore[no-redef]
class InferenceRunner:
    """Run benchmark tasks with strict single-line progress output.

    Each task emits exactly one ``[START]`` line, one ``[STEP]`` line per
    environment step, and one ``[END]`` line, all flushed immediately so a
    validator can consume them as a stream.
    """

    def __init__(self, config: InferenceConfig) -> None:
        self.config = config
        self.agent = ReviewAgent(OpenAIActionPlanner(config))

    def run(self) -> int:
        """Run every configured task in order and return process exit code 0."""
        for task_name in parse_task_ids():
            self.run_task(task_name)
        return 0

    def run_task(self, task_name: str) -> None:
        """Run one task episode, emitting the strict progress lines.

        Any exception raised while running the episode is caught at this
        boundary and surfaced on the ``[END]`` line rather than propagating,
        so a single failing task cannot abort the whole run.
        """
        rewards: list[str] = []
        step_count = 0
        success = False
        fatal_error: str | None = None
        final_score = 0.0
        self._emit_start(task_name)
        try:
            env = self._create_env()
            observation = self._reset_env(env, task_name)
            done = bool(observation_attr(observation, "done", False))
            final_score = float(observation_attr(observation, "score", 0.0) or 0.0)
            # Honour the environment's own attempt budget when it reports one,
            # but never exceed the configured cap and always allow >= 1 step.
            max_steps = max(
                1,
                min(
                    self.config.max_episode_steps,
                    int(
                        observation_attr(
                            observation,
                            "attempts_remaining",
                            self.config.max_episode_steps,
                        )
                        or self.config.max_episode_steps
                    ),
                ),
            )
            while not done and step_count < max_steps:
                decision = self.agent.act(observation)
                observation, reward, done, info = self._step_env(env, decision)
                step_count += 1
                final_score = float(
                    observation_attr(observation, "score", final_score) or final_score
                )
                rewards.append(format_reward(reward))
                step_error = self._resolve_step_error(info, observation, decision)
                self._emit_step(step_count, decision.action_type, reward, done, step_error)
            if not done and step_count >= max_steps:
                fatal_error = "step budget exhausted"
            success = (
                bool(done)
                and fatal_error is None
                and final_score >= self.config.success_threshold
            )
        except Exception as exc:  # top-level task boundary; reported, not raised
            fatal_error = compact_text(f"{type(exc).__name__}: {exc}", default="runtime failure")
        finally:
            # BUG FIX: fatal_error was previously captured and then silently
            # dropped; it is now forwarded so the [END] line explains failures.
            self._emit_end(
                success=success,
                step_count=step_count,
                rewards=rewards,
                error=fatal_error,
            )

    def _create_env(self) -> PythonCodeReviewEnvironment:
        """Construct the environment with its console chatter suppressed."""
        with suppress_output():
            return PythonCodeReviewEnvironment(verbose=False)

    def _reset_env(self, env: PythonCodeReviewEnvironment, task_name: str) -> Any:
        """Reset *env* to the named task and return the initial observation."""
        with suppress_output():
            return env.reset(task_id=task_name)

    def _step_env(
        self,
        env: PythonCodeReviewEnvironment,
        decision: AgentDecision,
    ) -> tuple[Any, float, bool, dict[str, Any]]:
        """Apply the agent's decision and normalise the (obs, reward, done, info) types."""
        action = PythonCodeReviewAction(action_type=decision.action_type, code=decision.code)
        with suppress_output():
            observation, reward, done, info = env.step_result(action)
        return observation, float(reward), bool(done), dict(info or {})

    def _resolve_step_error(
        self,
        info: dict[str, Any],
        observation: Any,
        decision: AgentDecision,
    ) -> str | None:
        """Pick the most authoritative error for a step.

        Environment-reported errors (from ``info`` or the observation) take
        precedence over the agent's own planning error; ``None`` means clean.
        """
        env_error = compact_text(
            info.get("last_action_error") or observation_attr(observation, "last_action_error", None),
            default="",
        )
        if env_error:
            return env_error
        if decision.error:
            return compact_text(decision.error, default="")
        return None

    def _emit_start(self, task_name: str) -> None:
        """Print the [START] progress line for *task_name*."""
        print(
            f"[START] task={task_name} env={self.config.benchmark_name} model={self.config.model_name}",
            flush=True,
        )

    def _emit_step(self, step_count: int, action: str, reward: float, done: bool, error: str | None) -> None:
        """Print one [STEP] progress line."""
        print(
            f"[STEP] step={step_count} action={compact_text(action, default='analyze_code')} "
            f"reward={format_reward(reward)} done={format_bool(done)} error={format_error(error)}",
            flush=True,
        )

    def _emit_end(
        self,
        *,
        success: bool,
        step_count: int,
        rewards: list[str],
        error: str | None = None,
    ) -> None:
        """Print the terminal [END] line.

        The ``error=`` field is appended only when a fatal error occurred, so
        the success-path output format is unchanged from earlier versions.
        """
        line = f"[END] success={format_bool(success)} steps={step_count} rewards={','.join(rewards)}"
        if error is not None:
            line += f" error={format_error(error)}"
        print(line, flush=True)
def main() -> int:
    """Entrypoint used by the root-level inference wrapper."""
    config = InferenceConfig.from_env()
    runner = InferenceRunner(config)
    return runner.run()