# Copyright (c) Meta Platforms, Inc. and affiliates. # All rights reserved. # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. """Python code-review environment implementation.""" from __future__ import annotations from dataclasses import dataclass, field from datetime import UTC, datetime from typing import Dict, Iterable, List, Optional from uuid import uuid4 from openenv.core.env_server.interfaces import Environment from openenv.core.env_server.types import State try: from ..models import ( Difficulty, PythonAction, PythonEnvConfig, PythonObservation, PythonState, ReviewFinding, TaskDescriptor, TaskEvaluation, TaskMetadata, ) except ImportError: from models import ( # type: ignore Difficulty, PythonAction, PythonEnvConfig, PythonObservation, PythonState, ReviewFinding, TaskDescriptor, TaskEvaluation, TaskMetadata, ) @dataclass(frozen=True) class ReferenceFinding: """Hidden finding metadata used for deterministic grading.""" rule_id: str title: str line: int category: str severity: str rationale: str recommendation: str weight: float keywords: List[str] = field(default_factory=list) @dataclass(frozen=True) class ReviewTask: """A visible task plus its hidden grading references.""" descriptor: TaskDescriptor references: tuple[ReferenceFinding, ...] 
hint: str patched_code: Optional[str] = None TASK_BANK: Dict[str, ReviewTask] = { "py-review-easy": ReviewTask( descriptor=TaskDescriptor( task_id="py-review-easy", difficulty="easy", title="Mutable default argument", objective="Find the correctness issue and explain a safe fix.", code=( "def add_tag(tag, tags=[]):\n" " tags.append(tag)\n" " return tags\n" ), max_steps=4, success_threshold=0.7, ), references=( ReferenceFinding( rule_id="mutable-default", title="Mutable default list is shared across calls", line=1, category="bug", severity="warning", rationale="The list persists between calls and leaks state.", recommendation="Use None as the default and create a new list inside the function.", weight=1.0, keywords=["mutable", "default", "list", "shared", "persists", "leaks"], ), ), hint="Look for state that survives between separate function calls.", patched_code=( "def add_tag(tag, tags=None):\n" " if tags is None:\n" " tags = []\n" " tags.append(tag)\n" " return tags\n" ), ), "py-review-medium": ReviewTask( descriptor=TaskDescriptor( task_id="py-review-medium", difficulty="medium", title="Unsafe shell invocation", objective="Review the snippet for security-sensitive behavior.", code=( "import os\n\n" "def run_backup(path):\n" " os.system(f\"tar -czf backup.tgz {path}\")\n" ), max_steps=4, success_threshold=0.72, ), references=( ReferenceFinding( rule_id="shell-injection", title="User input is interpolated into a shell command", line=4, category="security", severity="critical", rationale="An attacker can inject shell metacharacters through the path argument.", recommendation="Use subprocess with an argument list instead of os.system.", weight=1.0, keywords=["shell", "injection", "os.system", "subprocess", "input", "unsantized", "escaping"], ), ), hint="Check how external commands are invoked and whether user input is escaped.", patched_code=( "import subprocess\n\n" "def run_backup(path):\n" " subprocess.run([\"tar\", \"-czf\", \"backup.tgz\", path], 
check=True)\n" ), ), "py-review-hard": ReviewTask( descriptor=TaskDescriptor( task_id="py-review-hard", difficulty="hard", title="Retry helper hides failures", objective="Identify correctness and maintainability issues in the retry logic.", code=( "import time\n\n" "def fetch_with_retry(client, url, retries=3):\n" " last_error = None\n" " for _ in range(retries):\n" " try:\n" " return client.get(url, timeout=1)\n" " except Exception as exc:\n" " last_error = exc\n" " time.sleep(0.1)\n" " return None\n" ), max_steps=4, success_threshold=0.74, ), references=( ReferenceFinding( rule_id="swallowed-error", title="Function swallows the final exception and returns None", line=10, category="bug", severity="warning", rationale="Callers cannot distinguish a failed request from a valid None result.", recommendation="Re-raise the last exception after retries are exhausted.", weight=0.65, keywords=["swallowed", "exception", "return none", "error handling"], ), ReferenceFinding( rule_id="broad-except", title="Broad exception handler catches unexpected failures", line=7, category="maintainability", severity="info", rationale="Catching Exception masks programming errors and interrupts.", recommendation="Catch only the client or network exceptions you expect to retry.", weight=0.35, keywords=["broad", "except", "catch exception"], ), ), hint="Consider what happens to the final error after the retry loop finishes.", patched_code=( "import time\n\n" "def fetch_with_retry(client, url, retries=3):\n" " last_error = None\n" " for _ in range(retries):\n" " try:\n" " return client.get(url, timeout=1)\n" " except client.retryable_exceptions as exc:\n" " last_error = exc\n" " time.sleep(0.1)\n" " if last_error is not None:\n" " raise last_error\n" ), ), } def _utc_now() -> str: return datetime.now(UTC).isoformat() def _normalize_text(value: Optional[str]) -> str: return " ".join((value or "").strip().lower().split()) def _normalize_code(value: Optional[str]) -> str: return 
"\n".join(line.rstrip() for line in (value or "").strip().splitlines()) class PythonEnvironment(Environment[PythonAction, PythonObservation, State]): """Deterministic benchmark environment for Python code review tasks.""" SUPPORTS_CONCURRENT_SESSIONS: bool = True def __init__(self, config: Optional[PythonEnvConfig] = None): super().__init__() self._config = config or PythonEnvConfig() self._state = State(episode_id=str(uuid4()), step_count=0) self._task_cursor = -1 self._current_task: Optional[ReviewTask] = None self._submitted_findings: List[ReviewFinding] = [] self._hints_used = 0 self._created_at = _utc_now() def reset( self, seed: Optional[int] = None, episode_id: Optional[str] = None, **kwargs, ) -> PythonObservation: """Start the next configured review task.""" del seed, kwargs self._task_cursor = (self._task_cursor + 1) % len(self._config.task_order) task_id = self._config.task_order[self._task_cursor] self._current_task = TASK_BANK.get(task_id, TASK_BANK["py-review-easy"]) self._state = State( episode_id=episode_id or str(uuid4()), step_count=0, ) self._submitted_findings = [] self._hints_used = 0 self._created_at = _utc_now() return self._build_observation( feedback="New review task loaded. 
Submit findings or request a hint.", reward=0.0, done=False, ) def step( self, action: PythonAction, timeout_s: Optional[float] = None, **kwargs, ) -> PythonObservation: """Process one review action and return updated feedback.""" del timeout_s, kwargs if self._current_task is None: return self.reset() self._state.step_count += 1 operation = action.operation feedback = "" reward = 0.0 done = False if operation == "request_hint": self._hints_used += 1 feedback = self._current_task.hint evaluation = self._evaluate(self._submitted_findings, action.patched_code) reward = evaluation.score else: if action.findings: self._submitted_findings.extend(action.findings) evaluation = self._evaluate(self._submitted_findings, action.patched_code) reward = evaluation.score if operation == "finalize": done = True feedback = ( "Review finalized. " f"Matched {evaluation.matched_findings}/{evaluation.total_findings} " "reference findings." ) else: feedback = ( f"Progress saved. Matched {evaluation.matched_findings}/" f"{evaluation.total_findings} findings with score {evaluation.score:.2f}." ) if self._state.step_count >= self._max_steps(): done = True if operation != "finalize": feedback = ( f"{feedback} Maximum steps reached." if feedback else "Maximum steps reached." 
) return self._build_observation( feedback=feedback, reward=reward, done=done, patched_code=action.patched_code, ) def _build_observation( self, *, feedback: str, reward: float, done: bool, patched_code: Optional[str] = None, ) -> PythonObservation: assert self._current_task is not None evaluation = self._evaluate(self._submitted_findings, patched_code) attempts_remaining = max( self._max_steps() - self._state.step_count, 0, ) return PythonObservation( task=self._current_task.descriptor, feedback=feedback, submitted_findings=list(self._submitted_findings), hints_used=self._hints_used, attempts_remaining=attempts_remaining, evaluation=evaluation, score=evaluation.score, review_time_ms=float(self._state.step_count * 125), done=done, reward=reward, metadata={ "episode_id": self._state.episode_id, "created_at": self._created_at, "updated_at": _utc_now(), }, ) def _evaluate( self, findings: Iterable[ReviewFinding], patched_code: Optional[str], ) -> TaskEvaluation: assert self._current_task is not None references = self._current_task.references matched_reference_ids: List[str] = [] matched_weight = 0.0 false_positives = 0 duplicate_findings = 0 seen_ids = set() for finding in findings: ref_id = self._match_reference(finding, references) if ref_id is None: false_positives += 1 continue if ref_id in seen_ids: duplicate_findings += 1 continue seen_ids.add(ref_id) matched_reference_ids.append(ref_id) matched_weight += next(ref.weight for ref in references if ref.rule_id == ref_id) total_weight = sum(ref.weight for ref in references) or 1.0 weighted_recall = min(matched_weight / total_weight, 1.0) patch_score = 0.0 if self._current_task.patched_code and patched_code: patch_score = float( _normalize_code(patched_code) == _normalize_code(self._current_task.patched_code) ) raw_score = ( weighted_recall + (self._config.patch_bonus_multiplier * patch_score) - (self._config.false_positive_penalty * false_positives) - (self._config.duplicate_penalty * duplicate_findings) - 
(self._config.hint_penalty * self._hints_used) ) score = max(0.0, min(raw_score, 1.0)) return TaskEvaluation( matched_reference_ids=matched_reference_ids, matched_findings=len(matched_reference_ids), total_findings=len(references), false_positives=false_positives, duplicate_findings=duplicate_findings, weighted_recall=weighted_recall, patch_score=patch_score, score=score, passed=score >= self._current_task.descriptor.success_threshold, ) def _match_reference( self, finding: ReviewFinding, references: Iterable[ReferenceFinding], ) -> Optional[str]: finding_rule = _normalize_text(finding.rule_id) finding_title = _normalize_text(finding.title) for reference in references: if finding_rule and finding_rule == _normalize_text(reference.rule_id): return reference.rule_id line_matches = finding.line is not None and finding.line == reference.line category_matches = finding.category == reference.category title_matches = finding_title and ( finding_title in _normalize_text(reference.title) or _normalize_text(reference.title) in finding_title ) # Keyword match: check if any reference keywords are in the finding text keyword_match = any( _normalize_text(kw) in finding_title for kw in getattr(reference, "keywords", []) ) if finding_title else False # Relaxed matching: allow matching if the title or keywords match even if the line is missing if (line_matches and (category_matches or title_matches)) or title_matches or keyword_match: return reference.rule_id return None def _max_steps(self) -> int: assert self._current_task is not None return min( self._current_task.descriptor.max_steps, self._config.max_steps_per_task, ) @property def state(self) -> State: """Return the current environment state.""" return self._state # Compatibility bridge: # keep the old module path, but route the actual app/runtime through the # dataset-backed dense-reward benchmark implementation. 
try:
    # Package-relative import when loaded as part of the server package.
    from .review_runtime import (
        PythonReviewRuntime as _BenchmarkPythonEnvironment,
        get_current_state,
        get_health_response,
        get_metrics_response,
        get_tasks_response,
    )
except ImportError:
    # Fallback for flat execution outside the package layout.
    from server.review_runtime import (  # type: ignore
        PythonReviewRuntime as _BenchmarkPythonEnvironment,
        get_current_state,
        get_health_response,
        get_metrics_response,
        get_tasks_response,
    )

# Lazily-created process-wide singleton shared by the HTTP app.
_GLOBAL_ENV: Optional[_BenchmarkPythonEnvironment] = None


def get_environment() -> _BenchmarkPythonEnvironment:
    """Return the shared benchmark environment used by the HTTP app."""
    global _GLOBAL_ENV
    if _GLOBAL_ENV is not None:
        return _GLOBAL_ENV
    _GLOBAL_ENV = _BenchmarkPythonEnvironment()
    return _GLOBAL_ENV


# NOTE(review): this deliberately rebinds the name defined earlier in the
# module so existing importers of PythonEnvironment get the runtime-backed
# implementation.
PythonEnvironment = _BenchmarkPythonEnvironment