# python_env/server/python_env_environment.py
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""Python code-review environment implementation."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Dict, Iterable, List, Optional
from uuid import uuid4
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State
try:
from ..models import (
Difficulty,
PythonAction,
PythonEnvConfig,
PythonObservation,
PythonState,
ReviewFinding,
TaskDescriptor,
TaskEvaluation,
TaskMetadata,
)
except ImportError:
from models import ( # type: ignore
Difficulty,
PythonAction,
PythonEnvConfig,
PythonObservation,
PythonState,
ReviewFinding,
TaskDescriptor,
TaskEvaluation,
TaskMetadata,
)
@dataclass(frozen=True)
class ReferenceFinding:
    """Hidden finding metadata used for deterministic grading."""

    # Stable identifier (e.g. "mutable-default") matched first against an
    # agent finding's rule_id.
    rule_id: str
    # Short human-readable summary; matched by substring either direction.
    title: str
    # 1-based line number of the issue within the task's code snippet.
    line: int
    # Issue class used as a secondary match signal ("bug", "security", ...).
    category: str
    # Severity label ("info", "warning", "critical").
    severity: str
    # Explanation of why the issue matters.
    rationale: str
    # Suggested remediation for the issue.
    recommendation: str
    # Relative contribution of this finding to the task's weighted recall.
    weight: float
    # Lowercased phrases checked for containment in the agent's finding title.
    keywords: List[str] = field(default_factory=list)
@dataclass(frozen=True)
class ReviewTask:
    """A visible task plus its hidden grading references."""

    # Agent-visible task definition (code snippet, objective, step budget).
    descriptor: TaskDescriptor
    # Hidden reference findings used to grade submitted findings.
    references: tuple[ReferenceFinding, ...]
    # Text returned verbatim when the agent requests a hint.
    hint: str
    # Canonical fixed snippet; a whitespace-insensitive exact match against a
    # submitted patch earns the patch bonus.
    patched_code: Optional[str] = None
# Static bank of review tasks keyed by task id.  Each entry pairs the
# agent-visible TaskDescriptor with hidden ReferenceFinding data used for
# deterministic grading.  ReferenceFinding.line values are 1-based line
# numbers within the descriptor's code string (blank lines included).
TASK_BANK: Dict[str, ReviewTask] = {
    "py-review-easy": ReviewTask(
        descriptor=TaskDescriptor(
            task_id="py-review-easy",
            difficulty="easy",
            title="Mutable default argument",
            objective="Find the correctness issue and explain a safe fix.",
            code=(
                "def add_tag(tag, tags=[]):\n"
                "    tags.append(tag)\n"
                "    return tags\n"
            ),
            max_steps=4,
            success_threshold=0.7,
        ),
        references=(
            ReferenceFinding(
                rule_id="mutable-default",
                title="Mutable default list is shared across calls",
                line=1,
                category="bug",
                severity="warning",
                rationale="The list persists between calls and leaks state.",
                recommendation="Use None as the default and create a new list inside the function.",
                weight=1.0,
                keywords=["mutable", "default", "list", "shared", "persists", "leaks"],
            ),
        ),
        hint="Look for state that survives between separate function calls.",
        patched_code=(
            "def add_tag(tag, tags=None):\n"
            "    if tags is None:\n"
            "        tags = []\n"
            "    tags.append(tag)\n"
            "    return tags\n"
        ),
    ),
    "py-review-medium": ReviewTask(
        descriptor=TaskDescriptor(
            task_id="py-review-medium",
            difficulty="medium",
            title="Unsafe shell invocation",
            objective="Review the snippet for security-sensitive behavior.",
            code=(
                "import os\n\n"
                "def run_backup(path):\n"
                "    os.system(f\"tar -czf backup.tgz {path}\")\n"
            ),
            max_steps=4,
            success_threshold=0.72,
        ),
        references=(
            ReferenceFinding(
                rule_id="shell-injection",
                title="User input is interpolated into a shell command",
                line=4,
                category="security",
                severity="critical",
                rationale="An attacker can inject shell metacharacters through the path argument.",
                recommendation="Use subprocess with an argument list instead of os.system.",
                weight=1.0,
                # Fixed keyword typo ("unsantized" -> "unsanitized") so agents
                # using the correct spelling still get keyword-matched.
                keywords=["shell", "injection", "os.system", "subprocess", "input", "unsanitized", "escaping"],
            ),
        ),
        hint="Check how external commands are invoked and whether user input is escaped.",
        patched_code=(
            "import subprocess\n\n"
            "def run_backup(path):\n"
            "    subprocess.run([\"tar\", \"-czf\", \"backup.tgz\", path], check=True)\n"
        ),
    ),
    "py-review-hard": ReviewTask(
        descriptor=TaskDescriptor(
            task_id="py-review-hard",
            difficulty="hard",
            title="Retry helper hides failures",
            objective="Identify correctness and maintainability issues in the retry logic.",
            code=(
                "import time\n\n"
                "def fetch_with_retry(client, url, retries=3):\n"
                "    last_error = None\n"
                "    for _ in range(retries):\n"
                "        try:\n"
                "            return client.get(url, timeout=1)\n"
                "        except Exception as exc:\n"
                "            last_error = exc\n"
                "            time.sleep(0.1)\n"
                "    return None\n"
            ),
            max_steps=4,
            success_threshold=0.74,
        ),
        references=(
            ReferenceFinding(
                rule_id="swallowed-error",
                title="Function swallows the final exception and returns None",
                # Line 11 is "    return None" (line 10 is time.sleep); the
                # previous value 10 was off by one and broke line matching.
                line=11,
                category="bug",
                severity="warning",
                rationale="Callers cannot distinguish a failed request from a valid None result.",
                recommendation="Re-raise the last exception after retries are exhausted.",
                weight=0.65,
                keywords=["swallowed", "exception", "return none", "error handling"],
            ),
            ReferenceFinding(
                rule_id="broad-except",
                title="Broad exception handler catches unexpected failures",
                # Line 8 is "        except Exception as exc:"; the previous
                # value 7 pointed at the client.get call instead.
                line=8,
                category="maintainability",
                severity="info",
                rationale="Catching Exception masks programming errors and interrupts.",
                recommendation="Catch only the client or network exceptions you expect to retry.",
                weight=0.35,
                keywords=["broad", "except", "catch exception"],
            ),
        ),
        hint="Consider what happens to the final error after the retry loop finishes.",
        patched_code=(
            "import time\n\n"
            "def fetch_with_retry(client, url, retries=3):\n"
            "    last_error = None\n"
            "    for _ in range(retries):\n"
            "        try:\n"
            "            return client.get(url, timeout=1)\n"
            "        except client.retryable_exceptions as exc:\n"
            "            last_error = exc\n"
            "            time.sleep(0.1)\n"
            "    if last_error is not None:\n"
            "        raise last_error\n"
        ),
    ),
}
def _utc_now() -> str:
return datetime.now(UTC).isoformat()
def _normalize_text(value: Optional[str]) -> str:
return " ".join((value or "").strip().lower().split())
def _normalize_code(value: Optional[str]) -> str:
return "\n".join(line.rstrip() for line in (value or "").strip().splitlines())
class PythonEnvironment(Environment[PythonAction, PythonObservation, State]):
    """Deterministic benchmark environment for Python code review tasks.

    Episodes cycle round-robin through the configured task order; grading is
    fully deterministic (see ``_evaluate``), so identical action sequences
    always yield identical scores.
    """

    # Each session may hold its own instance; no shared mutable module state.
    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self, config: Optional[PythonEnvConfig] = None):
        super().__init__()
        self._config = config or PythonEnvConfig()
        # Per-episode bookkeeping; reset() reinitializes all of it.
        self._state = State(episode_id=str(uuid4()), step_count=0)
        # Index into config.task_order; starts at -1 so the first reset()
        # selects task 0.
        self._task_cursor = -1
        self._current_task: Optional[ReviewTask] = None
        self._submitted_findings: List[ReviewFinding] = []
        self._hints_used = 0
        self._created_at = _utc_now()

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        **kwargs,
    ) -> PythonObservation:
        """Start the next configured review task.

        Args:
            seed: Ignored; task selection is deterministic round-robin.
            episode_id: Optional externally supplied episode id; a fresh
                UUID is generated when omitted.

        Returns:
            The initial observation for the newly loaded task.
        """
        del seed, kwargs
        # Advance round-robin through the configured task order.
        self._task_cursor = (self._task_cursor + 1) % len(self._config.task_order)
        task_id = self._config.task_order[self._task_cursor]
        # Unknown task ids fall back to the easy task rather than raising.
        self._current_task = TASK_BANK.get(task_id, TASK_BANK["py-review-easy"])
        self._state = State(
            episode_id=episode_id or str(uuid4()),
            step_count=0,
        )
        self._submitted_findings = []
        self._hints_used = 0
        self._created_at = _utc_now()
        return self._build_observation(
            feedback="New review task loaded. Submit findings or request a hint.",
            reward=0.0,
            done=False,
        )

    def step(
        self,
        action: PythonAction,
        timeout_s: Optional[float] = None,
        **kwargs,
    ) -> PythonObservation:
        """Process one review action and return updated feedback.

        Args:
            action: ``operation`` selects the behavior: "request_hint"
                returns the task hint (at a score penalty), "finalize" ends
                the episode, anything else saves progress; submitted
                ``findings`` accumulate across steps.
            timeout_s: Ignored; processing is synchronous.

        Returns:
            An observation carrying feedback, a fresh evaluation, and the
            current score as the reward.
        """
        del timeout_s, kwargs
        # Defensive: stepping before reset() loads the first task instead.
        if self._current_task is None:
            return self.reset()
        self._state.step_count += 1
        operation = action.operation
        feedback = ""
        reward = 0.0
        done = False
        if operation == "request_hint":
            # Hints cost score later via the hint penalty in _evaluate().
            self._hints_used += 1
            feedback = self._current_task.hint
            evaluation = self._evaluate(self._submitted_findings, action.patched_code)
            reward = evaluation.score
        else:
            # Findings accumulate; duplicates are penalized during grading
            # rather than filtered here.
            if action.findings:
                self._submitted_findings.extend(action.findings)
            evaluation = self._evaluate(self._submitted_findings, action.patched_code)
            reward = evaluation.score
            if operation == "finalize":
                done = True
                feedback = (
                    "Review finalized. "
                    f"Matched {evaluation.matched_findings}/{evaluation.total_findings} "
                    "reference findings."
                )
            else:
                feedback = (
                    f"Progress saved. Matched {evaluation.matched_findings}/"
                    f"{evaluation.total_findings} findings with score {evaluation.score:.2f}."
                )
        # Force-terminate once the step budget is exhausted.
        if self._state.step_count >= self._max_steps():
            done = True
            if operation != "finalize":
                feedback = (
                    f"{feedback} Maximum steps reached."
                    if feedback
                    else "Maximum steps reached."
                )
        return self._build_observation(
            feedback=feedback,
            reward=reward,
            done=done,
            patched_code=action.patched_code,
        )

    def _build_observation(
        self,
        *,
        feedback: str,
        reward: float,
        done: bool,
        patched_code: Optional[str] = None,
    ) -> PythonObservation:
        """Assemble the observation returned to the agent after any action.

        Note: only the patch submitted with the CURRENT action contributes to
        the evaluation snapshot here — patches are not persisted across steps.
        """
        assert self._current_task is not None
        evaluation = self._evaluate(self._submitted_findings, patched_code)
        attempts_remaining = max(
            self._max_steps() - self._state.step_count,
            0,
        )
        return PythonObservation(
            task=self._current_task.descriptor,
            feedback=feedback,
            # Copy so later mutation of the internal list cannot leak out.
            submitted_findings=list(self._submitted_findings),
            hints_used=self._hints_used,
            attempts_remaining=attempts_remaining,
            evaluation=evaluation,
            score=evaluation.score,
            # Synthetic timing: a flat 125 ms per step keeps it deterministic.
            review_time_ms=float(self._state.step_count * 125),
            done=done,
            reward=reward,
            metadata={
                "episode_id": self._state.episode_id,
                "created_at": self._created_at,
                "updated_at": _utc_now(),
            },
        )

    def _evaluate(
        self,
        findings: Iterable[ReviewFinding],
        patched_code: Optional[str],
    ) -> TaskEvaluation:
        """Deterministically grade *findings* (and an optional patch).

        Score = weighted recall of matched references
              + patch bonus (exact whitespace-insensitive patch match)
              - penalties for false positives, duplicates, and hints used,
        clamped to [0, 1].
        """
        assert self._current_task is not None
        references = self._current_task.references
        matched_reference_ids: List[str] = []
        matched_weight = 0.0
        false_positives = 0
        duplicate_findings = 0
        seen_ids = set()
        for finding in findings:
            ref_id = self._match_reference(finding, references)
            if ref_id is None:
                false_positives += 1
                continue
            if ref_id in seen_ids:
                # Re-reporting the same reference is penalized, not re-credited.
                duplicate_findings += 1
                continue
            seen_ids.add(ref_id)
            matched_reference_ids.append(ref_id)
            matched_weight += next(ref.weight for ref in references if ref.rule_id == ref_id)
        # "or 1.0" guards against division by zero on zero-weight references.
        total_weight = sum(ref.weight for ref in references) or 1.0
        weighted_recall = min(matched_weight / total_weight, 1.0)
        patch_score = 0.0
        if self._current_task.patched_code and patched_code:
            # All-or-nothing: the patch must match the canonical fix exactly
            # after whitespace normalization.
            patch_score = float(
                _normalize_code(patched_code) == _normalize_code(self._current_task.patched_code)
            )
        raw_score = (
            weighted_recall
            + (self._config.patch_bonus_multiplier * patch_score)
            - (self._config.false_positive_penalty * false_positives)
            - (self._config.duplicate_penalty * duplicate_findings)
            - (self._config.hint_penalty * self._hints_used)
        )
        score = max(0.0, min(raw_score, 1.0))
        return TaskEvaluation(
            matched_reference_ids=matched_reference_ids,
            matched_findings=len(matched_reference_ids),
            total_findings=len(references),
            false_positives=false_positives,
            duplicate_findings=duplicate_findings,
            weighted_recall=weighted_recall,
            patch_score=patch_score,
            score=score,
            passed=score >= self._current_task.descriptor.success_threshold,
        )

    def _match_reference(
        self,
        finding: ReviewFinding,
        references: Iterable[ReferenceFinding],
    ) -> Optional[str]:
        """Return the rule_id of the first reference matching *finding*, else None.

        Matching tiers, strongest first: exact (normalized) rule_id; correct
        line plus matching category or title; title substring in either
        direction; any reference keyword contained in the finding title.
        """
        finding_rule = _normalize_text(finding.rule_id)
        finding_title = _normalize_text(finding.title)
        for reference in references:
            if finding_rule and finding_rule == _normalize_text(reference.rule_id):
                return reference.rule_id
            line_matches = finding.line is not None and finding.line == reference.line
            category_matches = finding.category == reference.category
            title_matches = finding_title and (
                finding_title in _normalize_text(reference.title)
                or _normalize_text(reference.title) in finding_title
            )
            # Keyword match: check if any reference keywords are in the finding text
            keyword_match = any(
                _normalize_text(kw) in finding_title
                for kw in getattr(reference, "keywords", [])
            ) if finding_title else False
            # Relaxed matching: allow matching if the title or keywords match even if the line is missing
            if (line_matches and (category_matches or title_matches)) or title_matches or keyword_match:
                return reference.rule_id
        return None

    def _max_steps(self) -> int:
        """Effective step budget: the tighter of the task and global limits."""
        assert self._current_task is not None
        return min(
            self._current_task.descriptor.max_steps,
            self._config.max_steps_per_task,
        )

    @property
    def state(self) -> State:
        """Return the current environment state."""
        return self._state
# Compatibility bridge:
# keep the old module path, but route the actual app/runtime through the
# dataset-backed dense-reward benchmark implementation.
try:
from .review_runtime import (
PythonReviewRuntime as _BenchmarkPythonEnvironment,
get_current_state,
get_health_response,
get_metrics_response,
get_tasks_response,
)
except ImportError:
from server.review_runtime import ( # type: ignore
PythonReviewRuntime as _BenchmarkPythonEnvironment,
get_current_state,
get_health_response,
get_metrics_response,
get_tasks_response,
)
# Lazily-created singleton shared by all HTTP handlers in this process.
_GLOBAL_ENV: Optional[_BenchmarkPythonEnvironment] = None


def get_environment() -> _BenchmarkPythonEnvironment:
    """Return the shared benchmark environment used by the HTTP app."""
    global _GLOBAL_ENV
    env = _GLOBAL_ENV
    if env is None:
        env = _BenchmarkPythonEnvironment()
        _GLOBAL_ENV = env
    return env


# Route the legacy class name to the dataset-backed benchmark implementation.
PythonEnvironment = _BenchmarkPythonEnvironment