# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Python code-review environment implementation."""

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Dict, Iterable, List, Optional
from uuid import uuid4

from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State

try:
    from ..models import (
        Difficulty,
        PythonAction,
        PythonEnvConfig,
        PythonObservation,
        PythonState,
        ReviewFinding,
        TaskDescriptor,
        TaskEvaluation,
        TaskMetadata,
    )
except ImportError:
    from models import (  # type: ignore
        Difficulty,
        PythonAction,
        PythonEnvConfig,
        PythonObservation,
        PythonState,
        ReviewFinding,
        TaskDescriptor,
        TaskEvaluation,
        TaskMetadata,
    )


@dataclass(frozen=True)
class ReferenceFinding:
    """Hidden finding metadata used for deterministic grading."""

    rule_id: str
    title: str
    line: int
    category: str
    severity: str
    rationale: str
    recommendation: str
    weight: float
    keywords: List[str] = field(default_factory=list)
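    # `weight` feeds weighted_recall in _evaluate; `keywords` are matched
    # case-insensitively as substrings of the submitted finding title.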


@dataclass(frozen=True)
class ReviewTask:
    """A visible task plus its hidden grading references."""

    descriptor: TaskDescriptor
    references: tuple[ReferenceFinding, ...]
    hint: str
    patched_code: Optional[str] = None


TASK_BANK: Dict[str, ReviewTask] = {
    "py-review-easy": ReviewTask(
        descriptor=TaskDescriptor(
            task_id="py-review-easy",
            difficulty="easy",
            title="Mutable default argument",
            objective="Find the correctness issue and explain a safe fix.",
            code=(
                "def add_tag(tag, tags=[]):\n"
                "    tags.append(tag)\n"
                "    return tags\n"
            ),
            max_steps=4,
            success_threshold=0.7,
        ),
        references=(
            ReferenceFinding(
                rule_id="mutable-default",
                title="Mutable default list is shared across calls",
                line=1,
                category="bug",
                severity="warning",
                rationale="The list persists between calls and leaks state.",
                recommendation="Use None as the default and create a new list inside the function.",
                weight=1.0,
                keywords=["mutable", "default", "list", "shared", "persists", "leaks"],
            ),
        ),
        hint="Look for state that survives between separate function calls.",
        patched_code=(
            "def add_tag(tag, tags=None):\n"
            "    if tags is None:\n"
            "        tags = []\n"
            "    tags.append(tag)\n"
            "    return tags\n"
        ),
    ),
    "py-review-medium": ReviewTask(
        descriptor=TaskDescriptor(
            task_id="py-review-medium",
            difficulty="medium",
            title="Unsafe shell invocation",
            objective="Review the snippet for security-sensitive behavior.",
            code=(
                "import os\n\n"
                "def run_backup(path):\n"
                "    os.system(f\"tar -czf backup.tgz {path}\")\n"
            ),
            max_steps=4,
            success_threshold=0.72,
        ),
        references=(
            ReferenceFinding(
                rule_id="shell-injection",
                title="User input is interpolated into a shell command",
                line=4,
                category="security",
                severity="critical",
                rationale="An attacker can inject shell metacharacters through the path argument.",
                recommendation="Use subprocess with an argument list instead of os.system.",
                weight=1.0,
                keywords=["shell", "injection", "os.system", "subprocess", "input", "unsantized", "escaping"],
            ),
        ),
        hint="Check how external commands are invoked and whether user input is escaped.",
        patched_code=(
            "import subprocess\n\n"
            "def run_backup(path):\n"
            "    subprocess.run([\"tar\", \"-czf\", \"backup.tgz\", path], check=True)\n"
        ),
    ),
    "py-review-hard": ReviewTask(
        descriptor=TaskDescriptor(
            task_id="py-review-hard",
            difficulty="hard",
            title="Retry helper hides failures",
            objective="Identify correctness and maintainability issues in the retry logic.",
            code=(
                "import time\n\n"
                "def fetch_with_retry(client, url, retries=3):\n"
                "    last_error = None\n"
                "    for _ in range(retries):\n"
                "        try:\n"
                "            return client.get(url, timeout=1)\n"
                "        except Exception as exc:\n"
                "            last_error = exc\n"
                "            time.sleep(0.1)\n"
                "    return None\n"
            ),
            max_steps=4,
            success_threshold=0.74,
        ),
        references=(
            ReferenceFinding(
                rule_id="swallowed-error",
                title="Function swallows the final exception and returns None",
                line=10,
                category="bug",
                severity="warning",
                rationale="Callers cannot distinguish a failed request from a valid None result.",
                recommendation="Re-raise the last exception after retries are exhausted.",
                weight=0.65,
                keywords=["swallowed", "exception", "return none", "error handling"],
            ),
            ReferenceFinding(
                rule_id="broad-except",
                title="Broad exception handler catches unexpected failures",
                line=7,
                category="maintainability",
                severity="info",
                rationale="Catching Exception masks programming errors and interrupts.",
                recommendation="Catch only the client or network exceptions you expect to retry.",
                weight=0.35,
                keywords=["broad", "except", "catch exception"],
            ),
        ),
        hint="Consider what happens to the final error after the retry loop finishes.",
        patched_code=(
            "import time\n\n"
            "def fetch_with_retry(client, url, retries=3):\n"
            "    last_error = None\n"
            "    for _ in range(retries):\n"
            "        try:\n"
            "            return client.get(url, timeout=1)\n"
            "        except client.retryable_exceptions as exc:\n"
            "            last_error = exc\n"
            "            time.sleep(0.1)\n"
            "    if last_error is not None:\n"
            "        raise last_error\n"
        ),
    ),
}
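# To register a new task, add a ReviewTask entry to TASK_BANK above. Grading
# in _evaluate needs at least one ReferenceFinding per task; reference
# weights are normalized by their sum when computing weighted recall.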


def _utc_now() -> str:
    return datetime.now(UTC).isoformat()


def _normalize_text(value: Optional[str]) -> str:
    return " ".join((value or "").strip().lower().split())


def _normalize_code(value: Optional[str]) -> str:
    return "\n".join(line.rstrip() for line in (value or "").strip().splitlines())


class PythonEnvironment(Environment[PythonAction, PythonObservation, State]):
    """Deterministic benchmark environment for Python code review tasks."""

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self, config: Optional[PythonEnvConfig] = None):
        super().__init__()
        self._config = config or PythonEnvConfig()
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._task_cursor = -1
        self._current_task: Optional[ReviewTask] = None
        self._submitted_findings: List[ReviewFinding] = []
        self._hints_used = 0
        self._created_at = _utc_now()

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        **kwargs,
    ) -> PythonObservation:
        """Start the next configured review task."""

        del seed, kwargs
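        # The task bank is deterministic: `seed` is accepted for interface
        # parity but ignored, and tasks cycle round-robin through task_order.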
        self._task_cursor = (self._task_cursor + 1) % len(self._config.task_order)
        task_id = self._config.task_order[self._task_cursor]
        self._current_task = TASK_BANK.get(task_id, TASK_BANK["py-review-easy"])
        self._state = State(
            episode_id=episode_id or str(uuid4()),
            step_count=0,
        )
        self._submitted_findings = []
        self._hints_used = 0
        self._created_at = _utc_now()
        return self._build_observation(
            feedback="New review task loaded. Submit findings or request a hint.",
            reward=0.0,
            done=False,
        )

    def step(
        self,
        action: PythonAction,
        timeout_s: Optional[float] = None,
        **kwargs,
    ) -> PythonObservation:
        """Process one review action and return updated feedback."""

        del timeout_s, kwargs
        if self._current_task is None:
            return self.reset()

        self._state.step_count += 1
        operation = action.operation
        feedback = ""
        reward = 0.0
        done = False

        if operation == "request_hint":
            self._hints_used += 1
            feedback = self._current_task.hint
            evaluation = self._evaluate(self._submitted_findings, action.patched_code)
            reward = evaluation.score
        else:
            if action.findings:
                self._submitted_findings.extend(action.findings)
            evaluation = self._evaluate(self._submitted_findings, action.patched_code)
            reward = evaluation.score
            if operation == "finalize":
                done = True
                feedback = (
                    "Review finalized. "
                    f"Matched {evaluation.matched_findings}/{evaluation.total_findings} "
                    "reference findings."
                )
            else:
                feedback = (
                    f"Progress saved. Matched {evaluation.matched_findings}/"
                    f"{evaluation.total_findings} findings with score {evaluation.score:.2f}."
                )

        if self._state.step_count >= self._max_steps():
            done = True
            if operation != "finalize":
                feedback = (
                    f"{feedback} Maximum steps reached."
                    if feedback
                    else "Maximum steps reached."
                )

        return self._build_observation(
            feedback=feedback,
            reward=reward,
            done=done,
            patched_code=action.patched_code,
        )

    def _build_observation(
        self,
        *,
        feedback: str,
        reward: float,
        done: bool,
        patched_code: Optional[str] = None,
    ) -> PythonObservation:
        assert self._current_task is not None
        evaluation = self._evaluate(self._submitted_findings, patched_code)
        attempts_remaining = max(
            self._max_steps() - self._state.step_count,
            0,
        )
        return PythonObservation(
            task=self._current_task.descriptor,
            feedback=feedback,
            submitted_findings=list(self._submitted_findings),
            hints_used=self._hints_used,
            attempts_remaining=attempts_remaining,
            evaluation=evaluation,
            score=evaluation.score,
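            # Synthetic latency proxy (125 ms per step), not wall-clock time.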
            review_time_ms=float(self._state.step_count * 125),
            done=done,
            reward=reward,
            metadata={
                "episode_id": self._state.episode_id,
                "created_at": self._created_at,
                "updated_at": _utc_now(),
            },
        )

    def _evaluate(
        self,
        findings: Iterable[ReviewFinding],
        patched_code: Optional[str],
    ) -> TaskEvaluation:
        assert self._current_task is not None

        references = self._current_task.references
        matched_reference_ids: List[str] = []
        matched_weight = 0.0
        false_positives = 0
        duplicate_findings = 0

        seen_ids = set()
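        # First match wins per reference: later findings that hit an
        # already-matched reference count as duplicates, not extra credit.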
        for finding in findings:
            ref_id = self._match_reference(finding, references)
            if ref_id is None:
                false_positives += 1
                continue
            if ref_id in seen_ids:
                duplicate_findings += 1
                continue
            seen_ids.add(ref_id)
            matched_reference_ids.append(ref_id)
            matched_weight += next(ref.weight for ref in references if ref.rule_id == ref_id)

        total_weight = sum(ref.weight for ref in references) or 1.0
        weighted_recall = min(matched_weight / total_weight, 1.0)

        patch_score = 0.0
        if self._current_task.patched_code and patched_code:
            patch_score = float(
                _normalize_code(patched_code) == _normalize_code(self._current_task.patched_code)
            )

        raw_score = (
            weighted_recall
            + (self._config.patch_bonus_multiplier * patch_score)
            - (self._config.false_positive_penalty * false_positives)
            - (self._config.duplicate_penalty * duplicate_findings)
            - (self._config.hint_penalty * self._hints_used)
        )
        score = max(0.0, min(raw_score, 1.0))
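
        # Worked example (hypothetical config values): weighted_recall=0.65,
        # patch_score=1.0 with patch_bonus_multiplier=0.3, one false positive
        # with false_positive_penalty=0.1, no duplicates or hints:
        # raw_score = 0.65 + 0.3 - 0.1 = 0.85, already inside [0.0, 1.0].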

        return TaskEvaluation(
            matched_reference_ids=matched_reference_ids,
            matched_findings=len(matched_reference_ids),
            total_findings=len(references),
            false_positives=false_positives,
            duplicate_findings=duplicate_findings,
            weighted_recall=weighted_recall,
            patch_score=patch_score,
            score=score,
            passed=score >= self._current_task.descriptor.success_threshold,
        )

    def _match_reference(
        self,
        finding: ReviewFinding,
        references: Iterable[ReferenceFinding],
    ) -> Optional[str]:
        finding_rule = _normalize_text(finding.rule_id)
        finding_title = _normalize_text(finding.title)
        for reference in references:
            if finding_rule and finding_rule == _normalize_text(reference.rule_id):
                return reference.rule_id
            line_matches = finding.line is not None and finding.line == reference.line
            category_matches = finding.category == reference.category
            title_matches = bool(finding_title) and (
                finding_title in _normalize_text(reference.title)
                or _normalize_text(reference.title) in finding_title
            )

            # Keyword match: any reference keyword appearing in the finding
            # title counts as evidence for this reference.
            keyword_match = bool(finding_title) and any(
                _normalize_text(kw) in finding_title
                for kw in reference.keywords
            )

            # Relaxed matching: a title or keyword match alone is enough; a
            # line match additionally needs a matching category.
            if (line_matches and category_matches) or title_matches or keyword_match:
                return reference.rule_id
        return None

    def _max_steps(self) -> int:
        assert self._current_task is not None
        return min(
            self._current_task.descriptor.max_steps,
            self._config.max_steps_per_task,
        )

    @property
    def state(self) -> State:
        """Return the current environment state."""

        return self._state


# Compatibility bridge:
# keep the old module path importable, but route the actual app/runtime
# through the dataset-backed dense-reward benchmark implementation. The
# assignment at the bottom of this module intentionally shadows the
# dataclass-based PythonEnvironment defined above.
try:
    from .review_runtime import (
        PythonReviewRuntime as _BenchmarkPythonEnvironment,
        get_current_state,
        get_health_response,
        get_metrics_response,
        get_tasks_response,
    )
except ImportError:
    from server.review_runtime import (  # type: ignore
        PythonReviewRuntime as _BenchmarkPythonEnvironment,
        get_current_state,
        get_health_response,
        get_metrics_response,
        get_tasks_response,
    )


_GLOBAL_ENV: Optional[_BenchmarkPythonEnvironment] = None


def get_environment() -> _BenchmarkPythonEnvironment:
    """Return the shared benchmark environment used by the HTTP app."""

    global _GLOBAL_ENV
    if _GLOBAL_ENV is None:
        _GLOBAL_ENV = _BenchmarkPythonEnvironment()
    return _GLOBAL_ENV


PythonEnvironment = _BenchmarkPythonEnvironment
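
# Minimal interaction sketch (illustrative only; assumes the benchmark
# runtime preserves the reset/step interface of the dataclass environment
# defined above):
#
#     env = PythonEnvironment()   # resolves to _BenchmarkPythonEnvironment
#     obs = env.reset()
#     print(obs.task.title, obs.attempts_remaining)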