"""Safe OpenEnv environment for deterministic Python code repair tasks.""" from __future__ import annotations from typing import Any, Optional from uuid import uuid4 try: from compat import Environment from graders import grade_task from models import ( HealthResponse, HistoryEntry, PythonCodeReviewAction, PythonCodeReviewObservation, PythonCodeReviewState, RewardDetails, TaskGrade, ) from tasks import TaskSpec, get_task as load_task, list_task_summaries, task_ids except Exception: from .compat import Environment from .graders import grade_task from .models import ( HealthResponse, HistoryEntry, PythonCodeReviewAction, PythonCodeReviewObservation, PythonCodeReviewState, RewardDetails, TaskGrade, ) from .tasks import TaskSpec, get_task as load_task, list_task_summaries, task_ids INVALID_ACTION_PENALTY = 0.10 NO_PROGRESS_PENALTY = 0.20 REPEATED_ACTION_PENALTY = 0.10 VALID_ACTIONS = {"analyze_code", "edit_code", "run_tests", "submit_solution"} def _clamp(value: float, low: float = 0.0, high: float = 1.0) -> float: """Clamp a scalar to a bounded numeric interval.""" try: return max(low, min(high, float(value))) except Exception: return low def _safe_text(value: Any, default: str = "") -> str: """Convert values into short stable strings.""" try: text = str(value) except Exception: return default text = " ".join(text.split()) return text[:240] if text else default class PythonCodeReviewEnvironment( Environment[PythonCodeReviewAction, PythonCodeReviewObservation, PythonCodeReviewState] ): """Deterministic, bounded, evaluator-safe environment for code repair tasks.""" SUPPORTS_CONCURRENT_SESSIONS = True def __init__(self, verbose: bool = False) -> None: super().__init__() self._verbose = bool(verbose) self._task_order = self._safe_task_order() self._task_cursor = -1 self._task: Optional[TaskSpec] = None self._state = PythonCodeReviewState(episode_id=str(uuid4())) self._done = False self._last_status = "Call reset() to start." self._last_reward = RewardDetails(value=0.0, reason="Environment initialized.") self.reward_history: list[float] = [] self.previous_score = 0.0 self.last_code = "" self._last_reward_components: dict[str, float] = {} self._metrics = self._blank_metrics() self._last_action_type = "" def reset( self, seed: Optional[int] = None, episode_id: Optional[str] = None, task_id: Optional[str] = None, **_: object, ) -> PythonCodeReviewObservation: """Reset the environment for a deterministic task and return an observation.""" del seed try: self._reset_rubric() except Exception: pass task = self._select_task(task_id) self._task = task self._done = False self._metrics = self._blank_metrics() self.reward_history = [] self.previous_score = 0.0 self.last_code = "" self._last_reward_components = {} self._last_action_type = "" self._last_status = "Inspect the code, run checks, edit the code, then submit." self._last_reward = RewardDetails( value=0.0, reason="Episode reset.", prev_score=0.0, curr_score=0.0, ) self._state = PythonCodeReviewState( episode_id=episode_id or str(uuid4()), step_count=0, task_id=task.task_id, difficulty=task.difficulty, task_kind=task.task_kind, attempts_remaining=max(int(task.max_steps), 1), current_code=task.starter_code, errors="", test_results="No checks run yet.", history=[], score=0.0, done=False, ) return self._build_observation() def step( self, action: PythonCodeReviewAction, timeout_s: Optional[float] = None, **_: object, ) -> PythonCodeReviewObservation: """Execute one safe environment step and always return a valid observation.""" del timeout_s try: if self._task is None: return self.reset() if self._done: self._last_status = "Episode already completed. Call reset() to continue." self._last_reward = RewardDetails( value=-INVALID_ACTION_PENALTY, invalid_action_penalty=INVALID_ACTION_PENALTY, reason="Episode already completed.", prev_score=self._metrics["score"], curr_score=self._metrics["score"], code_changed=False, ) return self._build_observation() self._state.step_count += 1 action_type = _safe_text(getattr(action, "action_type", "analyze_code"), "analyze_code") code = getattr(action, "code", None) if action_type == "analyze_code": self._handle_scored_action(action_type=action_type, candidate_code=self._state.current_code, include_hidden=False) elif action_type == "run_tests": self._handle_scored_action(action_type=action_type, candidate_code=self._state.current_code, include_hidden=False) elif action_type == "edit_code": self._handle_edit(code) elif action_type == "submit_solution": self._handle_scored_action(action_type=action_type, candidate_code=self._state.current_code, include_hidden=True) else: self._apply_invalid_action(f"Unsupported action_type '{action_type}'.") self._state.attempts_remaining = max(self._task.max_steps - self._state.step_count, 0) self._done = self._done or self._state.score >= 1.0 if self._state.attempts_remaining == 0 and not self._done: self._auto_submit() self._state.done = self._done return self._build_observation() except Exception as exc: self._apply_invalid_action(f"Step failure handled: {_safe_text(exc, 'unknown_error')}") self._state.done = self._done return self._build_observation() @property def state(self) -> PythonCodeReviewState: """Return a deep copy of the current environment state.""" try: return self._state.model_copy(deep=True) except Exception: return PythonCodeReviewState(episode_id=str(uuid4())) def list_task_summaries(self) -> list[object]: """Return public task summaries.""" try: return list_task_summaries() except Exception: return [] def get_task(self, task_id: str) -> object: """Return a single public task descriptor.""" return self._select_task(task_id).to_descriptor() def health(self) -> HealthResponse: """Return a simple health response.""" return HealthResponse(task_count=len(self._task_order)) def grade_task_submission(self, task_id: str, code: str) -> TaskGrade: """Grade a task submission outside an episode without raising.""" try: task = self._select_task(task_id) return self._safe_grade(task=task, candidate_code=code, include_hidden=True) except Exception as exc: return TaskGrade(score=0.0, details={"error": _safe_text(exc, "grading_failed")}) def run_tests(self, code: str, include_hidden: bool = False) -> tuple[float, dict[str, int], TaskGrade]: """Run deterministic grading and return score plus test summary.""" task = self._task or self._select_task(None) grade = self._safe_grade(task=task, candidate_code=code, include_hidden=include_hidden) return ( _clamp(grade.score), {"passed": int(grade.tests_passed), "total": int(grade.tests_total)}, grade, ) def apply_action(self, action: PythonCodeReviewAction) -> str: """Return the candidate code implied by the action.""" if getattr(action, "action_type", "") == "edit_code": code = getattr(action, "code", None) return str(code) if code is not None else self._state.current_code return self._state.current_code def compute_reward( self, old_code: str, new_code: str, prev_score: float, curr_score: float, test_results: dict[str, int], ) -> float: """Compute the requested bounded reward from score delta and action outcome.""" progress = curr_score - prev_score passed = int(test_results["passed"]) total = int(test_results["total"]) test_ratio = passed / total if total > 0 else 0.0 try: compile(new_code, "", "exec") syntax_score = 1.0 except Exception: syntax_score = 0.0 stagnation_penalty = NO_PROGRESS_PENALTY if new_code.strip() == old_code.strip() else 0.0 regression_penalty = max(0.0, prev_score - curr_score) repetition_penalty = REPEATED_ACTION_PENALTY if new_code == getattr(self, "last_code", "") else 0.0 length_penalty = 0.1 if len(new_code) > len(old_code) * 1.5 else 0.0 reward = ( 0.4 * progress + 0.3 * test_ratio + 0.2 * syntax_score - stagnation_penalty - regression_penalty - repetition_penalty - length_penalty ) reward = max(-1.0, min(1.0, reward)) self.reward_history.append(round(reward, 6)) if len(self.reward_history) >= 3 and len(set(self.reward_history[-3:])) == 1: import random reward = max(-1.0, min(1.0, reward + random.uniform(-0.05, 0.05))) self.reward_history[-1] = round(reward, 6) self._last_reward_components = { "syntax_reward": round(0.2 * syntax_score, 6), "test_reward": round(0.3 * test_ratio, 6), "progress_delta": round(0.4 * progress, 6), "stagnation_penalty": round(stagnation_penalty, 6), "regression_penalty": round(regression_penalty, 6), "repetition_penalty": round(repetition_penalty, 6), "length_penalty": round(length_penalty, 6), } return round(reward, 6) def _safe_task_order(self) -> list[str]: """Load deterministic task ids with a hard fallback.""" try: loaded = list(task_ids()) if loaded: return [str(task_id) for task_id in loaded] except Exception: pass return ["syntax-fix-easy", "bug-fix-medium", "optimization-hard"] def _blank_metrics(self) -> dict[str, float]: """Return an empty metric snapshot.""" return { "score": 0.0, "test_fraction": 0.0, "syntax_score": 0.0, "quality_score": 0.0, } def _select_task(self, task_id: Optional[str]) -> TaskSpec: """Select the requested task or advance deterministically.""" try: if task_id: task = load_task(task_id) if task.task_id in self._task_order: self._task_cursor = self._task_order.index(task.task_id) return task except Exception: pass try: self._task_cursor = (self._task_cursor + 1) % len(self._task_order) return load_task(self._task_order[self._task_cursor]) except Exception: return load_task("syntax-fix-easy") def _safe_grade(self, task: TaskSpec, candidate_code: str, include_hidden: bool) -> TaskGrade: """Run grading without allowing exceptions to escape.""" try: return grade_task(candidate_code, task, include_hidden=include_hidden) except Exception as exc: return TaskGrade( score=0.0, syntax_score=0.0, tests_passed=0, tests_total=max(len(task.visible_tests), 1), details={"compile_error": "", "error": _safe_text(exc, "grading_failed")}, ) def _metrics_from_grade(self, grade: TaskGrade) -> dict[str, float]: """Derive normalized reward metrics from a grading result.""" tests_total = max(int(grade.tests_total), 0) tests_passed = max(int(grade.tests_passed), 0) test_fraction = (tests_passed / tests_total) if tests_total else _clamp(grade.syntax_score) return { "score": _clamp(grade.score), "test_fraction": _clamp(test_fraction), "syntax_score": _clamp(grade.syntax_score), "quality_score": _clamp(grade.quality_score), } def _format_test_results(self, grade: TaskGrade, include_hidden: bool) -> str: """Format test execution results for the observation.""" compile_error = _safe_text(grade.details.get("compile_error", ""), "") scope = "all checks" if include_hidden else "visible checks" if compile_error: return f"{scope}: compile error: {compile_error}" if grade.timed_out: return f"{scope}: execution timed out" if self._task and self._task.task_kind == "syntax_fix": return "visible checks: code compiles successfully" return f"{scope}: {int(grade.tests_passed)}/{int(grade.tests_total)} passing" def _build_status(self, action_type: str, grade: TaskGrade) -> str: """Build a human-readable status message.""" if action_type == "submit_solution": return f"Solution submitted. Final score: {_clamp(grade.score):.3f}" if action_type == "edit_code": if grade.details.get("compile_error"): return "Code updated, but syntax issues remain." return "Code updated and evaluated." if action_type == "run_tests": return "Test run completed." if action_type == "analyze_code": return "Analysis completed." return "Action handled safely." def _apply_grade_to_state(self, grade: TaskGrade, include_hidden: bool) -> None: """Update environment state from the latest grading result.""" compile_error = _safe_text(grade.details.get("compile_error", ""), "") self._state.score = _clamp(grade.score) self._state.errors = compile_error self._state.test_results = self._format_test_results(grade, include_hidden=include_hidden) def _handle_scored_action(self, action_type: str, candidate_code: str, include_hidden: bool) -> None: """Grade code, update state, and compute reward for a valid action.""" old_code = self._state.current_code new_code = candidate_code curr_score, test_results, grade = self.run_tests(new_code, include_hidden=include_hidden) current_metrics = self._metrics_from_grade(grade) reward_value = self.compute_reward( old_code=old_code, new_code=new_code, prev_score=self.previous_score, curr_score=curr_score, test_results=test_results, ) self._state.current_code = new_code self._apply_grade_to_state(grade, include_hidden=include_hidden) self._last_reward = RewardDetails( value=reward_value, syntax_reward=self._last_reward_components.get("syntax_reward", 0.0), test_reward=self._last_reward_components.get("test_reward", 0.0), quality_bonus=0.0, correctness_bonus=0.0, progress_delta=self._last_reward_components.get("progress_delta", 0.0), stagnation_penalty=self._last_reward_components.get("stagnation_penalty", 0.0), regression_penalty=round( self._last_reward_components.get("regression_penalty", 0.0) + self._last_reward_components.get("repetition_penalty", 0.0) + self._last_reward_components.get("length_penalty", 0.0), 6, ), invalid_action_penalty=0.0, timeout_penalty=0.0, reason=f"{action_type} reward computed safely", prev_score=round(self.previous_score, 6), curr_score=round(curr_score, 6), code_changed=bool(new_code.strip() != old_code.strip()), ) self._last_status = self._build_status(action_type, grade) self._metrics = current_metrics self.previous_score = curr_score self.last_code = new_code self._last_action_type = action_type self._append_history(action_type, self._last_status, self._last_reward.value) def _handle_edit(self, code: Optional[str]) -> None: """Validate edit input and evaluate the new candidate code.""" safe_code = (code or "").strip() if not safe_code: self._apply_invalid_action("edit_code requires code parameter.") return self._handle_scored_action(action_type="edit_code", candidate_code=safe_code, include_hidden=False) def _apply_invalid_action(self, reason: str) -> None: """Record an invalid action without crashing the episode.""" current_score = _clamp(self.previous_score) reward_value = -INVALID_ACTION_PENALTY self.reward_history.append(round(reward_value, 6)) self._last_reward = RewardDetails( value=reward_value, invalid_action_penalty=INVALID_ACTION_PENALTY, reason=reason, prev_score=current_score, curr_score=current_score, code_changed=False, ) self._last_status = reason self._append_history("analyze_code", reason, self._last_reward.value) def _auto_submit(self) -> None: """Finalize the episode when attempts are exhausted.""" task = self._task or self._select_task(None) grade = self._safe_grade(task=task, candidate_code=self._state.current_code, include_hidden=True) self._apply_grade_to_state(grade, include_hidden=True) self.previous_score = _clamp(grade.score) self._done = True self._state.done = True self._last_status = f"Auto-submitted. Final score: {_clamp(grade.score):.3f}" def _append_history(self, action_type: str, status: str, reward: float) -> None: """Append one action record to the episode history.""" try: stable_action = action_type if action_type in VALID_ACTIONS else "analyze_code" self._state.history.append( HistoryEntry( step=max(int(self._state.step_count), 0), action_type=stable_action, status=_safe_text(status, "handled"), reward=float(reward), ) ) except Exception: pass def _build_observation(self) -> PythonCodeReviewObservation: """Build a valid observation from current state.""" task = self._task try: return PythonCodeReviewObservation( task_id=self._state.task_id or "", title=task.title if task else "", difficulty=self._state.difficulty or "easy", task_kind=self._state.task_kind, task_description=task.task_description if task else "", current_code=self._state.current_code, errors=self._state.errors, test_results=self._state.test_results, visible_tests=list(task.visible_tests) if task else [], history=list(self._state.history), attempts_remaining=max(int(self._state.attempts_remaining), 0), last_action_status=self._last_status, score=_clamp(self._state.score), reward_details=self._last_reward, reward=self._last_reward.value, done=bool(self._state.done), metadata={ "prev_score": self._last_reward.prev_score, "curr_score": self._last_reward.curr_score, }, ) except Exception as exc: return PythonCodeReviewObservation( task_id=self._state.task_id or "", title="", difficulty="easy", task_kind=None, task_description="", current_code=getattr(self._state, "current_code", ""), errors=_safe_text(exc, "observation_build_failed"), test_results="visible checks: unavailable", visible_tests=[], history=[], attempts_remaining=0, last_action_status="Observation fallback returned safely.", score=0.0, reward_details=RewardDetails(value=0.0, reason="Observation fallback."), reward=0.0, done=bool(getattr(self._state, "done", False)), metadata={}, ) PythonEnvironment = PythonCodeReviewEnvironment CodeReviewEnvironment = PythonCodeReviewEnvironment