Spaces:
Sleeping
Sleeping
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """Python code-review environment implementation.""" | |
| from __future__ import annotations | |
| from dataclasses import dataclass, field | |
| from datetime import UTC, datetime | |
| from typing import Dict, Iterable, List, Optional | |
| from uuid import uuid4 | |
| from openenv.core.env_server.interfaces import Environment | |
| from openenv.core.env_server.types import State | |
| try: | |
| from ..models import ( | |
| Difficulty, | |
| PythonAction, | |
| PythonEnvConfig, | |
| PythonObservation, | |
| PythonState, | |
| ReviewFinding, | |
| TaskDescriptor, | |
| TaskEvaluation, | |
| TaskMetadata, | |
| ) | |
| except ImportError: | |
| from models import ( # type: ignore | |
| Difficulty, | |
| PythonAction, | |
| PythonEnvConfig, | |
| PythonObservation, | |
| PythonState, | |
| ReviewFinding, | |
| TaskDescriptor, | |
| TaskEvaluation, | |
| TaskMetadata, | |
| ) | |
@dataclass
class ReferenceFinding:
    """Hidden finding metadata used for deterministic grading.

    Instances describe one issue the reviewer is expected to report. They are
    never shown to the agent; they only drive scoring.

    NOTE: the ``@dataclass`` decorator is required — instances are built with
    keyword arguments in TASK_BANK and ``keywords`` uses ``field(...)``.
    """

    # Stable identifier; an exact (normalized) rule_id match is authoritative.
    rule_id: str
    # Human-readable summary; matched by bidirectional substring against titles.
    title: str
    # 1-indexed line in the task's code snippet where the issue occurs.
    line: int
    category: str
    severity: str
    rationale: str
    recommendation: str
    # Relative contribution of this finding to the weighted-recall score.
    weight: float
    # Fallback match terms checked as substrings of the submitted title.
    keywords: List[str] = field(default_factory=list)
@dataclass
class ReviewTask:
    """A visible task plus its hidden grading references.

    Only ``descriptor`` is exposed to the agent; ``references``, ``hint`` and
    ``patched_code`` are consumed by the grader.

    NOTE: the ``@dataclass`` decorator is required — TASK_BANK constructs
    these with keyword arguments.
    """

    # The public task definition sent to the client.
    descriptor: TaskDescriptor
    # Hidden reference findings used for deterministic grading.
    references: tuple[ReferenceFinding, ...]
    # Text returned when the agent performs a "request_hint" action.
    hint: str
    # Reference fix; an exactly-matching (normalized) patch earns a bonus.
    patched_code: Optional[str] = None
# Static, deterministic task bank keyed by task_id. Each entry pairs the
# visible TaskDescriptor with hidden grading references and a reference patch.
# All reference `line` values are 1-indexed positions within the snippet
# stored in `code` (counting the blank line after the import).
TASK_BANK: Dict[str, ReviewTask] = {
    "py-review-easy": ReviewTask(
        descriptor=TaskDescriptor(
            task_id="py-review-easy",
            difficulty="easy",
            title="Mutable default argument",
            objective="Find the correctness issue and explain a safe fix.",
            code=(
                "def add_tag(tag, tags=[]):\n"
                "    tags.append(tag)\n"
                "    return tags\n"
            ),
            max_steps=4,
            success_threshold=0.7,
        ),
        references=(
            ReferenceFinding(
                rule_id="mutable-default",
                title="Mutable default list is shared across calls",
                line=1,  # the `def` line carrying the mutable default
                category="bug",
                severity="warning",
                rationale="The list persists between calls and leaks state.",
                recommendation="Use None as the default and create a new list inside the function.",
                weight=1.0,
                keywords=["mutable", "default", "list", "shared", "persists", "leaks"],
            ),
        ),
        hint="Look for state that survives between separate function calls.",
        patched_code=(
            "def add_tag(tag, tags=None):\n"
            "    if tags is None:\n"
            "        tags = []\n"
            "    tags.append(tag)\n"
            "    return tags\n"
        ),
    ),
    "py-review-medium": ReviewTask(
        descriptor=TaskDescriptor(
            task_id="py-review-medium",
            difficulty="medium",
            title="Unsafe shell invocation",
            objective="Review the snippet for security-sensitive behavior.",
            code=(
                "import os\n\n"
                "def run_backup(path):\n"
                "    os.system(f\"tar -czf backup.tgz {path}\")\n"
            ),
            max_steps=4,
            success_threshold=0.72,
        ),
        references=(
            ReferenceFinding(
                rule_id="shell-injection",
                title="User input is interpolated into a shell command",
                line=4,  # the os.system(...) call
                category="security",
                severity="critical",
                rationale="An attacker can inject shell metacharacters through the path argument.",
                recommendation="Use subprocess with an argument list instead of os.system.",
                # FIX: "unsantized" was a typo that silently broke keyword
                # matching for findings mentioning "unsanitized".
                keywords=["shell", "injection", "os.system", "subprocess", "input", "unsanitized", "escaping"],
                weight=1.0,
            ),
        ),
        hint="Check how external commands are invoked and whether user input is escaped.",
        patched_code=(
            "import subprocess\n\n"
            "def run_backup(path):\n"
            "    subprocess.run([\"tar\", \"-czf\", \"backup.tgz\", path], check=True)\n"
        ),
    ),
    "py-review-hard": ReviewTask(
        descriptor=TaskDescriptor(
            task_id="py-review-hard",
            difficulty="hard",
            title="Retry helper hides failures",
            objective="Identify correctness and maintainability issues in the retry logic.",
            code=(
                "import time\n\n"
                "def fetch_with_retry(client, url, retries=3):\n"
                "    last_error = None\n"
                "    for _ in range(retries):\n"
                "        try:\n"
                "            return client.get(url, timeout=1)\n"
                "        except Exception as exc:\n"
                "            last_error = exc\n"
                "            time.sleep(0.1)\n"
                "    return None\n"
            ),
            max_steps=4,
            success_threshold=0.74,
        ),
        references=(
            ReferenceFinding(
                rule_id="swallowed-error",
                title="Function swallows the final exception and returns None",
                # FIX: was 10 (off by one); "return None" is line 11 of the
                # snippet, so exact line matches never fired for this finding.
                line=11,
                category="bug",
                severity="warning",
                rationale="Callers cannot distinguish a failed request from a valid None result.",
                recommendation="Re-raise the last exception after retries are exhausted.",
                weight=0.65,
                keywords=["swallowed", "exception", "return none", "error handling"],
            ),
            ReferenceFinding(
                rule_id="broad-except",
                title="Broad exception handler catches unexpected failures",
                # FIX: was 7 (off by one); the `except Exception` is line 8.
                line=8,
                category="maintainability",
                severity="info",
                rationale="Catching Exception masks programming errors and interrupts.",
                recommendation="Catch only the client or network exceptions you expect to retry.",
                weight=0.35,
                keywords=["broad", "except", "catch exception"],
            ),
        ),
        hint="Consider what happens to the final error after the retry loop finishes.",
        patched_code=(
            "import time\n\n"
            "def fetch_with_retry(client, url, retries=3):\n"
            "    last_error = None\n"
            "    for _ in range(retries):\n"
            "        try:\n"
            "            return client.get(url, timeout=1)\n"
            "        except client.retryable_exceptions as exc:\n"
            "            last_error = exc\n"
            "            time.sleep(0.1)\n"
            "    if last_error is not None:\n"
            "        raise last_error\n"
        ),
    ),
}
| def _utc_now() -> str: | |
| return datetime.now(UTC).isoformat() | |
| def _normalize_text(value: Optional[str]) -> str: | |
| return " ".join((value or "").strip().lower().split()) | |
| def _normalize_code(value: Optional[str]) -> str: | |
| return "\n".join(line.rstrip() for line in (value or "").strip().splitlines()) | |
class PythonEnvironment(Environment[PythonAction, PythonObservation, State]):
    """Deterministic benchmark environment for Python code review tasks."""

    # NOTE(review): presumably read by the serving layer to permit parallel
    # sessions -- confirm against the Environment base-class contract.
    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self, config: Optional[PythonEnvConfig] = None):
        """Initialize the environment.

        Args:
            config: Scoring/limit configuration; defaults to PythonEnvConfig().
        """
        super().__init__()
        self._config = config or PythonEnvConfig()
        # Generic episode state; step_count drives the attempt budget.
        self._state = State(episode_id=str(uuid4()), step_count=0)
        # Starts at -1 so the first reset() selects task_order[0].
        self._task_cursor = -1
        self._current_task: Optional[ReviewTask] = None
        # Findings accumulate across steps within one episode.
        self._submitted_findings: List[ReviewFinding] = []
        self._hints_used = 0
        self._created_at = _utc_now()

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        **kwargs,
    ) -> PythonObservation:
        """Start the next configured review task.

        Task selection is deterministic (cycles through config.task_order),
        so `seed` is intentionally ignored.
        """
        del seed, kwargs
        # Advance cyclically through the configured task order.
        self._task_cursor = (self._task_cursor + 1) % len(self._config.task_order)
        task_id = self._config.task_order[self._task_cursor]
        # Unknown task ids silently fall back to the easy task.
        self._current_task = TASK_BANK.get(task_id, TASK_BANK["py-review-easy"])
        self._state = State(
            episode_id=episode_id or str(uuid4()),
            step_count=0,
        )
        # Clear per-episode accumulators.
        self._submitted_findings = []
        self._hints_used = 0
        self._created_at = _utc_now()
        return self._build_observation(
            feedback="New review task loaded. Submit findings or request a hint.",
            reward=0.0,
            done=False,
        )

    def step(
        self,
        action: PythonAction,
        timeout_s: Optional[float] = None,
        **kwargs,
    ) -> PythonObservation:
        """Process one review action and return updated feedback."""
        del timeout_s, kwargs
        # Stepping before reset() implicitly starts the first task.
        if self._current_task is None:
            return self.reset()
        self._state.step_count += 1
        operation = action.operation
        feedback = ""
        reward = 0.0
        done = False
        if operation == "request_hint":
            # The hint is returned as feedback; its cost is applied inside
            # _evaluate via the configured hint_penalty. Any findings
            # attached to a hint action are ignored on this branch.
            self._hints_used += 1
            feedback = self._current_task.hint
            evaluation = self._evaluate(self._submitted_findings, action.patched_code)
            reward = evaluation.score
        else:
            # Findings accumulate across steps; duplicates are detected and
            # penalized by the grader rather than deduplicated here.
            if action.findings:
                self._submitted_findings.extend(action.findings)
            evaluation = self._evaluate(self._submitted_findings, action.patched_code)
            reward = evaluation.score
            if operation == "finalize":
                done = True
                feedback = (
                    "Review finalized. "
                    f"Matched {evaluation.matched_findings}/{evaluation.total_findings} "
                    "reference findings."
                )
            else:
                feedback = (
                    f"Progress saved. Matched {evaluation.matched_findings}/"
                    f"{evaluation.total_findings} findings with score {evaluation.score:.2f}."
                )
        # Enforce the per-task step budget regardless of the operation.
        if self._state.step_count >= self._max_steps():
            done = True
            if operation != "finalize":
                feedback = (
                    f"{feedback} Maximum steps reached."
                    if feedback
                    else "Maximum steps reached."
                )
        return self._build_observation(
            feedback=feedback,
            reward=reward,
            done=done,
            patched_code=action.patched_code,
        )

    def _build_observation(
        self,
        *,
        feedback: str,
        reward: float,
        done: bool,
        patched_code: Optional[str] = None,
    ) -> PythonObservation:
        """Assemble the observation returned to the client.

        Re-runs the grader so the observation always carries a fresh
        evaluation snapshot for the current submissions.
        """
        assert self._current_task is not None
        evaluation = self._evaluate(self._submitted_findings, patched_code)
        # Never report a negative remaining-attempt count.
        attempts_remaining = max(
            self._max_steps() - self._state.step_count,
            0,
        )
        return PythonObservation(
            task=self._current_task.descriptor,
            feedback=feedback,
            submitted_findings=list(self._submitted_findings),
            hints_used=self._hints_used,
            attempts_remaining=attempts_remaining,
            evaluation=evaluation,
            score=evaluation.score,
            # Synthetic wall-clock proxy: 125 ms per step keeps runs deterministic.
            review_time_ms=float(self._state.step_count * 125),
            done=done,
            reward=reward,
            metadata={
                "episode_id": self._state.episode_id,
                "created_at": self._created_at,
                "updated_at": _utc_now(),
            },
        )

    def _evaluate(
        self,
        findings: Iterable[ReviewFinding],
        patched_code: Optional[str],
    ) -> TaskEvaluation:
        """Grade submitted findings (and an optional patch) against the
        hidden references, returning a full TaskEvaluation."""
        assert self._current_task is not None
        references = self._current_task.references
        matched_reference_ids: List[str] = []
        matched_weight = 0.0
        false_positives = 0
        duplicate_findings = 0
        seen_ids = set()
        for finding in findings:
            ref_id = self._match_reference(finding, references)
            if ref_id is None:
                # Unmatched findings count against the score.
                false_positives += 1
                continue
            if ref_id in seen_ids:
                # Repeated matches of the same reference are penalized, not rewarded.
                duplicate_findings += 1
                continue
            seen_ids.add(ref_id)
            matched_reference_ids.append(ref_id)
            matched_weight += next(ref.weight for ref in references if ref.rule_id == ref_id)
        # Guard against a zero total weight so the division below is safe.
        total_weight = sum(ref.weight for ref in references) or 1.0
        weighted_recall = min(matched_weight / total_weight, 1.0)
        patch_score = 0.0
        if self._current_task.patched_code and patched_code:
            # All-or-nothing: the patch must equal the reference after
            # whitespace normalization.
            patch_score = float(
                _normalize_code(patched_code) == _normalize_code(self._current_task.patched_code)
            )
        raw_score = (
            weighted_recall
            + (self._config.patch_bonus_multiplier * patch_score)
            - (self._config.false_positive_penalty * false_positives)
            - (self._config.duplicate_penalty * duplicate_findings)
            - (self._config.hint_penalty * self._hints_used)
        )
        # Clamp the final score to [0, 1].
        score = max(0.0, min(raw_score, 1.0))
        return TaskEvaluation(
            matched_reference_ids=matched_reference_ids,
            matched_findings=len(matched_reference_ids),
            total_findings=len(references),
            false_positives=false_positives,
            duplicate_findings=duplicate_findings,
            weighted_recall=weighted_recall,
            patch_score=patch_score,
            score=score,
            passed=score >= self._current_task.descriptor.success_threshold,
        )

    def _match_reference(
        self,
        finding: ReviewFinding,
        references: Iterable[ReferenceFinding],
    ) -> Optional[str]:
        """Return the rule_id of the first reference this finding matches,
        or None if it matches nothing (i.e. it is a false positive)."""
        finding_rule = _normalize_text(finding.rule_id)
        finding_title = _normalize_text(finding.title)
        for reference in references:
            # An exact (normalized) rule-id match is authoritative.
            if finding_rule and finding_rule == _normalize_text(reference.rule_id):
                return reference.rule_id
            line_matches = finding.line is not None and finding.line == reference.line
            category_matches = finding.category == reference.category
            # Bidirectional substring match so partial titles still count.
            title_matches = finding_title and (
                finding_title in _normalize_text(reference.title)
                or _normalize_text(reference.title) in finding_title
            )
            # Keyword match: check if any reference keywords are in the finding text
            keyword_match = any(
                _normalize_text(kw) in finding_title
                for kw in getattr(reference, "keywords", [])
            ) if finding_title else False
            # Relaxed matching: allow matching if the title or keywords match even if the line is missing
            if (line_matches and (category_matches or title_matches)) or title_matches or keyword_match:
                return reference.rule_id
        return None

    def _max_steps(self) -> int:
        """Effective step budget: the tighter of the task's own limit and
        the environment-wide cap."""
        assert self._current_task is not None
        return min(
            self._current_task.descriptor.max_steps,
            self._config.max_steps_per_task,
        )

    def state(self) -> State:
        """Return the current environment state."""
        return self._state
| # Compatibility bridge: | |
| # keep the old module path, but route the actual app/runtime through the | |
| # dataset-backed dense-reward benchmark implementation. | |
| try: | |
| from .review_runtime import ( | |
| PythonReviewRuntime as _BenchmarkPythonEnvironment, | |
| get_current_state, | |
| get_health_response, | |
| get_metrics_response, | |
| get_tasks_response, | |
| ) | |
| except ImportError: | |
| from server.review_runtime import ( # type: ignore | |
| PythonReviewRuntime as _BenchmarkPythonEnvironment, | |
| get_current_state, | |
| get_health_response, | |
| get_metrics_response, | |
| get_tasks_response, | |
| ) | |
# Lazily created singleton shared by all HTTP handlers.
_GLOBAL_ENV: Optional[_BenchmarkPythonEnvironment] = None


def get_environment() -> _BenchmarkPythonEnvironment:
    """Return the shared benchmark environment used by the HTTP app."""
    global _GLOBAL_ENV
    if _GLOBAL_ENV is not None:
        return _GLOBAL_ENV
    _GLOBAL_ENV = _BenchmarkPythonEnvironment()
    return _GLOBAL_ENV


PythonEnvironment = _BenchmarkPythonEnvironment