Spaces:
Sleeping
Sleeping
| """Safe OpenEnv environment for deterministic Python code repair tasks.""" | |
| from __future__ import annotations | |
| from typing import Any, Optional | |
| from uuid import uuid4 | |
| try: | |
| from compat import Environment | |
| from graders import grade_task | |
| from models import ( | |
| HealthResponse, | |
| HistoryEntry, | |
| PythonCodeReviewAction, | |
| PythonCodeReviewObservation, | |
| PythonCodeReviewState, | |
| RewardDetails, | |
| TaskGrade, | |
| ) | |
| from tasks import TaskSpec, get_task as load_task, list_task_summaries, task_ids | |
| except Exception: | |
| from .compat import Environment | |
| from .graders import grade_task | |
| from .models import ( | |
| HealthResponse, | |
| HistoryEntry, | |
| PythonCodeReviewAction, | |
| PythonCodeReviewObservation, | |
| PythonCodeReviewState, | |
| RewardDetails, | |
| TaskGrade, | |
| ) | |
| from .tasks import TaskSpec, get_task as load_task, list_task_summaries, task_ids | |
# Reward deducted when an action is rejected (unsupported action type, empty
# edit, step after the episode finished, or an exception inside step()).
INVALID_ACTION_PENALTY = 0.10
# Deducted by compute_reward() when the candidate code equals the previous
# code after stripping surrounding whitespace.
NO_PROGRESS_PENALTY = 0.20
# Deducted by compute_reward() when the candidate code is exactly the code
# that was graded by the previous scored action.
REPEATED_ACTION_PENALTY = 0.10
# Action types dispatched by step(); anything else is recorded in the history
# under "analyze_code".
VALID_ACTIONS = {"analyze_code", "edit_code", "run_tests", "submit_solution"}
| def _clamp(value: float, low: float = 0.0, high: float = 1.0) -> float: | |
| """Clamp a scalar to a bounded numeric interval.""" | |
| try: | |
| return max(low, min(high, float(value))) | |
| except Exception: | |
| return low | |
| def _safe_text(value: Any, default: str = "") -> str: | |
| """Convert values into short stable strings.""" | |
| try: | |
| text = str(value) | |
| except Exception: | |
| return default | |
| text = " ".join(text.split()) | |
| return text[:240] if text else default | |
class PythonCodeReviewEnvironment(
    Environment[PythonCodeReviewAction, PythonCodeReviewObservation, PythonCodeReviewState]
):
    """Deterministic, bounded, evaluator-safe environment for code repair tasks.

    An episode starts with ``reset()``, which loads a task and its starter
    code. Each ``step()`` handles one action (``analyze_code``, ``run_tests``,
    ``edit_code`` or ``submit_solution``), grades the current code, and returns
    an observation carrying the reward. The episode finishes when the score
    reaches 1.0 or the task's step budget is used up, in which case the
    current code is graded with hidden checks included.

    Public entry points other than ``get_task``, ``run_tests`` and
    ``compute_reward`` catch exceptions and return a fallback value instead of
    raising.
    """

    SUPPORTS_CONCURRENT_SESSIONS = True

    def __init__(self, verbose: bool = False) -> None:
        """Create an idle environment; call ``reset()`` before stepping.

        Args:
            verbose: Stored as ``self._verbose``; not read elsewhere in this
                class.
        """
        super().__init__()
        self._verbose = bool(verbose)
        self._task_order = self._safe_task_order()
        # -1 so that the first cursor advance lands on index 0.
        self._task_cursor = -1
        self._task: Optional[TaskSpec] = None
        self._state = PythonCodeReviewState(episode_id=str(uuid4()))
        self._done = False
        self._last_status = "Call reset() to start."
        self._last_reward = RewardDetails(value=0.0, reason="Environment initialized.")
        self.reward_history: list[float] = []
        self.previous_score = 0.0
        self.last_code = ""
        self._last_reward_components: dict[str, float] = {}
        self._metrics = self._blank_metrics()
        self._last_action_type = ""

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        task_id: Optional[str] = None,
        **_: object,
    ) -> PythonCodeReviewObservation:
        """Reset the environment for a deterministic task and return an observation.

        Args:
            seed: Ignored; task selection does not depend on it.
            episode_id: Identifier for the new episode; a fresh UUID is used
                when omitted or empty.
            task_id: Task to load. When omitted (or when loading it fails) the
                next task in the rotation is used.
            **_: Extra keyword arguments are accepted and ignored.

        Returns:
            The initial observation for the selected task.
        """
        del seed
        try:
            # Presumably supplied by the Environment base class; failures
            # (including the method being absent) are deliberately ignored.
            self._reset_rubric()
        except Exception:
            pass
        task = self._select_task(task_id)
        self._task = task
        self._done = False
        self._metrics = self._blank_metrics()
        self.reward_history = []
        self.previous_score = 0.0
        self.last_code = ""
        self._last_reward_components = {}
        self._last_action_type = ""
        self._last_status = "Inspect the code, run checks, edit the code, then submit."
        self._last_reward = RewardDetails(
            value=0.0,
            reason="Episode reset.",
            prev_score=0.0,
            curr_score=0.0,
        )
        self._state = PythonCodeReviewState(
            episode_id=episode_id or str(uuid4()),
            step_count=0,
            task_id=task.task_id,
            difficulty=task.difficulty,
            task_kind=task.task_kind,
            # Always allow at least one step, even if the task declares none.
            attempts_remaining=max(int(task.max_steps), 1),
            current_code=task.starter_code,
            errors="",
            test_results="No checks run yet.",
            history=[],
            score=0.0,
            done=False,
        )
        return self._build_observation()

    def step(
        self,
        action: PythonCodeReviewAction,
        timeout_s: Optional[float] = None,
        **_: object,
    ) -> PythonCodeReviewObservation:
        """Execute one safe environment step and always return a valid observation.

        Dispatch by ``action.action_type``:

        * ``analyze_code`` / ``run_tests``: grade the current code on visible
          checks.
        * ``edit_code``: replace the current code with ``action.code`` and
          grade it on visible checks.
        * ``submit_solution``: grade the current code with hidden checks
          included.
        * anything else: recorded as an invalid action with a penalty.

        Args:
            action: The action to apply.
            timeout_s: Ignored.
            **_: Extra keyword arguments are accepted and ignored.

        Returns:
            The observation after the action. If no task is loaded the
            environment is reset instead and the action is not applied. If the
            episode is already finished a penalty observation is returned.
            Any exception raised while handling the action is converted into
            an invalid-action observation.
        """
        del timeout_s
        try:
            if self._task is None:
                return self.reset()
            if self._done:
                self._last_status = "Episode already completed. Call reset() to continue."
                self._last_reward = RewardDetails(
                    value=-INVALID_ACTION_PENALTY,
                    invalid_action_penalty=INVALID_ACTION_PENALTY,
                    reason="Episode already completed.",
                    prev_score=self._metrics["score"],
                    curr_score=self._metrics["score"],
                    code_changed=False,
                )
                return self._build_observation()

            self._state.step_count += 1
            action_type = _safe_text(getattr(action, "action_type", "analyze_code"), "analyze_code")
            code = getattr(action, "code", None)

            if action_type in ("analyze_code", "run_tests"):
                self._handle_scored_action(
                    action_type=action_type,
                    candidate_code=self._state.current_code,
                    include_hidden=False,
                )
            elif action_type == "edit_code":
                self._handle_edit(code)
            elif action_type == "submit_solution":
                # NOTE(review): submitting does not by itself end the episode;
                # it only ends below when the score reaches 1.0 or the step
                # budget runs out. The status text says "Final score", so
                # confirm whether submit was meant to terminate.
                self._handle_scored_action(
                    action_type=action_type,
                    candidate_code=self._state.current_code,
                    include_hidden=True,
                )
            else:
                self._apply_invalid_action(f"Unsupported action_type '{action_type}'.")

            self._state.attempts_remaining = max(self._task.max_steps - self._state.step_count, 0)
            self._done = self._done or self._state.score >= 1.0
            if self._state.attempts_remaining == 0 and not self._done:
                self._auto_submit()
            self._state.done = self._done
            return self._build_observation()
        except Exception as exc:
            self._apply_invalid_action(f"Step failure handled: {_safe_text(exc, 'unknown_error')}")
            self._state.done = self._done
            return self._build_observation()

    def state(self) -> PythonCodeReviewState:
        """Return a deep copy of the current environment state.

        If copying fails, a fresh state with a new episode id is returned.
        """
        try:
            return self._state.model_copy(deep=True)
        except Exception:
            return PythonCodeReviewState(episode_id=str(uuid4()))

    def list_task_summaries(self) -> list[object]:
        """Return public task summaries, or an empty list if loading fails."""
        try:
            # Resolves to the module-level ``list_task_summaries`` import, not
            # to this method.
            return list_task_summaries()
        except Exception:
            return []

    def get_task(self, task_id: str) -> object:
        """Return a single public task descriptor.

        The lookup leaves the task rotation untouched, so it does not change
        which task a later ``reset()`` without ``task_id`` will load. If
        ``task_id`` cannot be loaded, the descriptor of the task that is next
        in the rotation is returned.
        """
        return self._lookup_task(task_id).to_descriptor()

    def health(self) -> HealthResponse:
        """Return a simple health response carrying the number of known tasks."""
        return HealthResponse(task_count=len(self._task_order))

    def grade_task_submission(self, task_id: str, code: str) -> TaskGrade:
        """Grade a task submission outside an episode without raising.

        Hidden checks are included. The task rotation used by ``reset()`` is
        not affected.

        Returns:
            The grade, or a zero-score ``TaskGrade`` whose ``details["error"]``
            describes the failure.
        """
        try:
            task = self._lookup_task(task_id)
            return self._safe_grade(task=task, candidate_code=code, include_hidden=True)
        except Exception as exc:
            return TaskGrade(score=0.0, details={"error": _safe_text(exc, "grading_failed")})

    def run_tests(self, code: str, include_hidden: bool = False) -> tuple[float, dict[str, int], TaskGrade]:
        """Run deterministic grading and return score plus test summary.

        Uses the current task; if none is loaded, the next task in the
        rotation is selected (which advances the rotation).

        Returns:
            ``(score, {"passed": int, "total": int}, grade)`` where ``score``
            is clamped to ``[0, 1]``.
        """
        task = self._task or self._select_task(None)
        grade = self._safe_grade(task=task, candidate_code=code, include_hidden=include_hidden)
        summary = {"passed": int(grade.tests_passed), "total": int(grade.tests_total)}
        return _clamp(grade.score), summary, grade

    def apply_action(self, action: PythonCodeReviewAction) -> str:
        """Return the candidate code implied by the action.

        For ``edit_code`` with a non-``None`` ``code`` this is ``str(code)``;
        otherwise it is the current code. State is not modified.
        """
        if getattr(action, "action_type", "") == "edit_code":
            code = getattr(action, "code", None)
            if code is not None:
                return str(code)
        return self._state.current_code

    def compute_reward(
        self,
        old_code: str,
        new_code: str,
        prev_score: float,
        curr_score: float,
        test_results: dict[str, int],
    ) -> float:
        """Compute the requested bounded reward from score delta and action outcome.

        The reward is::

            0.4 * (curr_score - prev_score)
            + 0.3 * (passed / total)
            + 0.2 * (1 if new_code compiles else 0)
            - stagnation - regression - repetition - length penalties

        clamped to ``[-1, 1]``. When this reward would make the last three
        recorded rewards identical, a small jitter in ``[-0.05, 0.05]`` is
        added. The jitter is derived from the step index and the reward value,
        so identical episodes produce identical rewards.

        Side effects: appends to ``self.reward_history`` and replaces
        ``self._last_reward_components``.

        Args:
            old_code: Code before the action.
            new_code: Code being evaluated.
            prev_score: Score before the action.
            curr_score: Score of ``new_code``.
            test_results: Mapping with ``"passed"`` and ``"total"`` counts;
                missing keys count as 0.

        Returns:
            The reward rounded to six decimals.
        """
        progress = curr_score - prev_score
        passed = int(test_results.get("passed", 0))
        total = int(test_results.get("total", 0))
        test_ratio = passed / total if total > 0 else 0.0

        # Only compiled, never executed.
        try:
            compile(new_code, "<string>", "exec")
            syntax_score = 1.0
        except Exception:
            syntax_score = 0.0

        stagnation_penalty = NO_PROGRESS_PENALTY if new_code.strip() == old_code.strip() else 0.0
        regression_penalty = max(0.0, prev_score - curr_score)
        repetition_penalty = REPEATED_ACTION_PENALTY if new_code == getattr(self, "last_code", "") else 0.0
        length_penalty = 0.1 if len(new_code) > len(old_code) * 1.5 else 0.0

        reward = (
            0.4 * progress
            + 0.3 * test_ratio
            + 0.2 * syntax_score
            - stagnation_penalty
            - regression_penalty
            - repetition_penalty
            - length_penalty
        )
        reward = max(-1.0, min(1.0, reward))
        self.reward_history.append(round(reward, 6))

        if len(self.reward_history) >= 3 and len(set(self.reward_history[-3:])) == 1:
            import random

            # Seeding with a string is reproducible across processes, unlike
            # the shared module-level generator, which keeps the environment
            # deterministic.
            jitter_rng = random.Random(f"{len(self.reward_history)}:{reward:.6f}")
            reward = max(-1.0, min(1.0, reward + jitter_rng.uniform(-0.05, 0.05)))
            self.reward_history[-1] = round(reward, 6)

        self._last_reward_components = {
            "syntax_reward": round(0.2 * syntax_score, 6),
            "test_reward": round(0.3 * test_ratio, 6),
            "progress_delta": round(0.4 * progress, 6),
            "stagnation_penalty": round(stagnation_penalty, 6),
            "regression_penalty": round(regression_penalty, 6),
            "repetition_penalty": round(repetition_penalty, 6),
            "length_penalty": round(length_penalty, 6),
        }
        return round(reward, 6)

    def _safe_task_order(self) -> list[str]:
        """Load deterministic task ids with a hard fallback.

        Falls back to three built-in ids when ``task_ids()`` raises or
        returns nothing.
        """
        try:
            loaded = list(task_ids())
            if loaded:
                return [str(task_id) for task_id in loaded]
        except Exception:
            pass
        return ["syntax-fix-easy", "bug-fix-medium", "optimization-hard"]

    def _blank_metrics(self) -> dict[str, float]:
        """Return an empty metric snapshot with every metric set to 0.0."""
        return {
            "score": 0.0,
            "test_fraction": 0.0,
            "syntax_score": 0.0,
            "quality_score": 0.0,
        }

    def _select_task(self, task_id: Optional[str]) -> TaskSpec:
        """Select the requested task or advance deterministically.

        With a loadable ``task_id`` the rotation cursor jumps to that task
        (when it is part of the rotation). Otherwise the cursor advances by
        one, wrapping around, and that task is loaded. As a last resort
        ``"syntax-fix-easy"`` is loaded; a failure there propagates.
        """
        try:
            if task_id:
                task = load_task(task_id)
                if task.task_id in self._task_order:
                    self._task_cursor = self._task_order.index(task.task_id)
                return task
        except Exception:
            # Unknown or unloadable id: fall through to the rotation.
            pass
        try:
            self._task_cursor = (self._task_cursor + 1) % len(self._task_order)
            return load_task(self._task_order[self._task_cursor])
        except Exception:
            return load_task("syntax-fix-easy")

    def _lookup_task(self, task_id: Optional[str]) -> TaskSpec:
        """Resolve a task like ``_select_task`` without moving the rotation cursor.

        Used by read-only entry points so that inspecting or grading a task
        does not change which task the next ``reset()`` loads.
        """
        saved_cursor = self._task_cursor
        try:
            return self._select_task(task_id)
        finally:
            self._task_cursor = saved_cursor

    def _safe_grade(self, task: TaskSpec, candidate_code: str, include_hidden: bool) -> TaskGrade:
        """Run grading without allowing exceptions to escape.

        On failure a zero-score grade is returned with the error text in
        ``details["error"]`` and an empty ``details["compile_error"]``.
        """
        try:
            return grade_task(candidate_code, task, include_hidden=include_hidden)
        except Exception as exc:
            return TaskGrade(
                score=0.0,
                syntax_score=0.0,
                tests_passed=0,
                tests_total=max(len(task.visible_tests), 1),
                details={"compile_error": "", "error": _safe_text(exc, "grading_failed")},
            )

    def _metrics_from_grade(self, grade: TaskGrade) -> dict[str, float]:
        """Derive normalized reward metrics from a grading result.

        When the grade reports no tests, the syntax score stands in for the
        test fraction. Every value is clamped to ``[0, 1]``.
        """
        tests_total = max(int(grade.tests_total), 0)
        tests_passed = max(int(grade.tests_passed), 0)
        test_fraction = (tests_passed / tests_total) if tests_total else _clamp(grade.syntax_score)
        return {
            "score": _clamp(grade.score),
            "test_fraction": _clamp(test_fraction),
            "syntax_score": _clamp(grade.syntax_score),
            "quality_score": _clamp(grade.quality_score),
        }

    def _format_test_results(self, grade: TaskGrade, include_hidden: bool) -> str:
        """Format test execution results for the observation.

        Precedence: compile error, then timeout, then the fixed message for
        ``syntax_fix`` tasks, then the passed/total count.
        """
        compile_error = _safe_text(grade.details.get("compile_error", ""), "")
        scope = "all checks" if include_hidden else "visible checks"
        if compile_error:
            return f"{scope}: compile error: {compile_error}"
        if grade.timed_out:
            return f"{scope}: execution timed out"
        if self._task and self._task.task_kind == "syntax_fix":
            return "visible checks: code compiles successfully"
        return f"{scope}: {int(grade.tests_passed)}/{int(grade.tests_total)} passing"

    def _build_status(self, action_type: str, grade: TaskGrade) -> str:
        """Build a human-readable status message for the handled action."""
        if action_type == "submit_solution":
            return f"Solution submitted. Final score: {_clamp(grade.score):.3f}"
        if action_type == "edit_code":
            if grade.details.get("compile_error"):
                return "Code updated, but syntax issues remain."
            return "Code updated and evaluated."
        if action_type == "run_tests":
            return "Test run completed."
        if action_type == "analyze_code":
            return "Analysis completed."
        return "Action handled safely."

    def _apply_grade_to_state(self, grade: TaskGrade, include_hidden: bool) -> None:
        """Update the state's score, errors and test summary from a grade."""
        compile_error = _safe_text(grade.details.get("compile_error", ""), "")
        self._state.score = _clamp(grade.score)
        self._state.errors = compile_error
        self._state.test_results = self._format_test_results(grade, include_hidden=include_hidden)

    def _handle_scored_action(self, action_type: str, candidate_code: str, include_hidden: bool) -> None:
        """Grade code, update state, and compute reward for a valid action.

        ``candidate_code`` becomes the current code. The reward is computed
        before any state is updated, so it compares against the previous code
        and score.
        """
        old_code = self._state.current_code
        new_code = candidate_code
        curr_score, test_results, grade = self.run_tests(new_code, include_hidden=include_hidden)
        current_metrics = self._metrics_from_grade(grade)
        reward_value = self.compute_reward(
            old_code=old_code,
            new_code=new_code,
            prev_score=self.previous_score,
            curr_score=curr_score,
            test_results=test_results,
        )
        self._state.current_code = new_code
        self._apply_grade_to_state(grade, include_hidden=include_hidden)

        components = self._last_reward_components
        self._last_reward = RewardDetails(
            value=reward_value,
            syntax_reward=components.get("syntax_reward", 0.0),
            test_reward=components.get("test_reward", 0.0),
            quality_bonus=0.0,
            correctness_bonus=0.0,
            progress_delta=components.get("progress_delta", 0.0),
            stagnation_penalty=components.get("stagnation_penalty", 0.0),
            # The regression, repetition and length penalties are reported as
            # one combined figure in this field.
            regression_penalty=round(
                components.get("regression_penalty", 0.0)
                + components.get("repetition_penalty", 0.0)
                + components.get("length_penalty", 0.0),
                6,
            ),
            invalid_action_penalty=0.0,
            timeout_penalty=0.0,
            reason=f"{action_type} reward computed safely",
            prev_score=round(self.previous_score, 6),
            curr_score=round(curr_score, 6),
            code_changed=bool(new_code.strip() != old_code.strip()),
        )
        self._last_status = self._build_status(action_type, grade)
        self._metrics = current_metrics
        self.previous_score = curr_score
        self.last_code = new_code
        self._last_action_type = action_type
        self._append_history(action_type, self._last_status, self._last_reward.value)

    def _handle_edit(self, code: Optional[str]) -> None:
        """Validate edit input and evaluate the new candidate code.

        The code is stripped of surrounding whitespace; an empty result is
        recorded as an invalid action.
        """
        safe_code = (code or "").strip()
        if not safe_code:
            self._apply_invalid_action("edit_code requires code parameter.")
            return
        self._handle_scored_action(action_type="edit_code", candidate_code=safe_code, include_hidden=False)

    def _apply_invalid_action(self, reason: str) -> None:
        """Record an invalid action without crashing the episode.

        The reward is ``-INVALID_ACTION_PENALTY`` and the score is left
        unchanged. The history entry is filed under ``"analyze_code"``.
        """
        current_score = _clamp(self.previous_score)
        reward_value = -INVALID_ACTION_PENALTY
        self.reward_history.append(round(reward_value, 6))
        self._last_reward = RewardDetails(
            value=reward_value,
            invalid_action_penalty=INVALID_ACTION_PENALTY,
            reason=reason,
            prev_score=current_score,
            curr_score=current_score,
            code_changed=False,
        )
        self._last_status = reason
        self._append_history("analyze_code", reason, self._last_reward.value)

    def _auto_submit(self) -> None:
        """Finalize the episode when attempts are exhausted.

        Grades the current code with hidden checks included and marks the
        episode done. The last reward and the history are not modified.
        """
        task = self._task or self._select_task(None)
        grade = self._safe_grade(task=task, candidate_code=self._state.current_code, include_hidden=True)
        self._apply_grade_to_state(grade, include_hidden=True)
        self.previous_score = _clamp(grade.score)
        self._done = True
        self._state.done = True
        self._last_status = f"Auto-submitted. Final score: {_clamp(grade.score):.3f}"

    def _append_history(self, action_type: str, status: str, reward: float) -> None:
        """Append one action record to the episode history.

        Action types outside ``VALID_ACTIONS`` are stored as
        ``"analyze_code"``. Failures are ignored so that history bookkeeping
        cannot break a step.
        """
        try:
            stable_action = action_type if action_type in VALID_ACTIONS else "analyze_code"
            self._state.history.append(
                HistoryEntry(
                    step=max(int(self._state.step_count), 0),
                    action_type=stable_action,
                    status=_safe_text(status, "handled"),
                    reward=float(reward),
                )
            )
        except Exception:
            pass

    def _build_observation(self) -> PythonCodeReviewObservation:
        """Build a valid observation from current state.

        If building the full observation raises, a minimal fallback
        observation is returned with the error text in ``errors``.
        """
        task = self._task
        try:
            return PythonCodeReviewObservation(
                task_id=self._state.task_id or "",
                title=task.title if task else "",
                difficulty=self._state.difficulty or "easy",
                task_kind=self._state.task_kind,
                task_description=task.task_description if task else "",
                current_code=self._state.current_code,
                errors=self._state.errors,
                test_results=self._state.test_results,
                visible_tests=list(task.visible_tests) if task else [],
                history=list(self._state.history),
                attempts_remaining=max(int(self._state.attempts_remaining), 0),
                last_action_status=self._last_status,
                score=_clamp(self._state.score),
                reward_details=self._last_reward,
                reward=self._last_reward.value,
                done=bool(self._state.done),
                metadata={
                    "prev_score": self._last_reward.prev_score,
                    "curr_score": self._last_reward.curr_score,
                },
            )
        except Exception as exc:
            return PythonCodeReviewObservation(
                task_id=self._state.task_id or "",
                title="",
                difficulty="easy",
                task_kind=None,
                task_description="",
                current_code=getattr(self._state, "current_code", ""),
                errors=_safe_text(exc, "observation_build_failed"),
                test_results="visible checks: unavailable",
                visible_tests=[],
                history=[],
                attempts_remaining=0,
                last_action_status="Observation fallback returned safely.",
                score=0.0,
                reward_details=RewardDetails(value=0.0, reason="Observation fallback."),
                reward=0.0,
                done=bool(getattr(self._state, "done", False)),
                metadata={},
            )
# Alternate module-level names bound to the same environment class.
PythonEnvironment = PythonCodeReviewEnvironment
CodeReviewEnvironment = PythonCodeReviewEnvironment