File size: 15,929 Bytes
c29f1fd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
"""OpenEnv environment implementation for Python code review tasks."""

from __future__ import annotations

from typing import Any, Dict, Optional, Tuple
from uuid import uuid4

from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import EnvironmentMetadata

try:
    from ..graders import grade_task
    from ..graders.shared import component_score, safe_ratio, strict_score
    from ..Models import (
        HistoryEntry,
        PythonCodeReviewAction,
        PythonCodeReviewObservation,
        PythonCodeReviewState,
        RewardDetails,
        TaskGrade,
    )
    from ..tasks import ReviewTask, list_tasks, select_task
except ImportError:
    from graders import grade_task
    from graders.shared import component_score, safe_ratio, strict_score
    from Models import (
        HistoryEntry,
        PythonCodeReviewAction,
        PythonCodeReviewObservation,
        PythonCodeReviewState,
        RewardDetails,
        TaskGrade,
    )
    from tasks import ReviewTask, list_tasks, select_task


def _empty_grade() -> TaskGrade:
    """Build the placeholder grade used before any real grading has run.

    All component scores start at the 0.01 floor (passed through
    ``component_score``) and both test counters start at zero.
    """
    floor_fields = ("score", "syntax_score", "quality_score", "runtime_score")
    floors = {name: component_score(0.01) for name in floor_fields}
    return TaskGrade(tests_passed=0, tests_total=0, **floors)


def _reward_value(value: float) -> float:
    """Clamp/normalize a raw shaped reward via the shared ``strict_score`` rule."""
    normalized = strict_score(value)
    return normalized

class PythonCodeReviewEnvironment(
    Environment[PythonCodeReviewAction, PythonCodeReviewObservation, PythonCodeReviewState]
):
    """Structured environment for deterministic Python code review workflows.

    An episode revolves around a single review task. The agent iterates on a
    working copy of the task's code through ``edit_code`` / ``run_tests`` /
    ``analyze_code`` actions and ends the episode with ``submit_solution``,
    which grades against the hidden test suite. Rewards are shaped from the
    delta between consecutive grades (see :meth:`_compute_reward`).
    """

    # Each instance holds its own task/state, so sessions can run concurrently.
    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self, verbose: bool = False, **_: Any) -> None:
        """Create the environment and immediately start a fresh episode.

        Args:
            verbose: Stored on the instance; not consulted by this class itself.
            **_: Extra keyword arguments are accepted and ignored so generic
                launchers can pass configuration through freely.
        """
        super().__init__()
        self.verbose = verbose
        # Seed the attributes with cheap placeholder values so they are always
        # well-typed; reset() below re-populates everything for the first
        # real episode.
        self._task: ReviewTask = list_tasks()[0]
        self._current_code: str = self._task.starter_code
        self._history: list[HistoryEntry] = []
        self._last_reward = RewardDetails(value=0.1, reason="Environment initialized.")
        self._current_grade = _empty_grade()
        self._state = PythonCodeReviewState(episode_id=str(uuid4()), step_count=0)
        self.reset()

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        **kwargs: Any,
    ) -> PythonCodeReviewObservation:
        """Start a new episode and return the initial observation.

        Args:
            seed: Optional seed forwarded to task selection (deterministic
                choice for a fixed seed — presumably; confirm in select_task).
            episode_id: Externally supplied episode id; a fresh UUID is
                generated when omitted.
            **kwargs: May contain ``task_id`` to pin a specific task.

        Returns:
            The first observation of the new episode.
        """
        task_id = kwargs.get("task_id")
        self._task = select_task(seed=seed, task_id=task_id)
        self._current_code = self._task.starter_code
        self._history = []
        self._last_reward = RewardDetails(value=0.1, reason="Environment reset.")
        # Initial grade runs only the public tests; hidden tests are reserved
        # for submit_solution.
        self._current_grade = grade_task(self._task, self._current_code, include_hidden=False)

        self._state = PythonCodeReviewState(
            episode_id=episode_id or str(uuid4()),
            step_count=0,
            task_id=self._task.task_id,
            difficulty=self._task.difficulty,
            task_kind=self._task.task_kind,
            attempts_remaining=self._task.max_steps,
            current_code=self._current_code,
            errors=self._format_errors(self._current_grade),
            test_results=self._format_test_results(self._current_grade),
            history=[],
            score=self._current_grade.score,
            done=False,
        )
        return self._build_observation(
            grade=self._current_grade,
            status=f"Loaded task {self._task.task_id}.",
            reward_details=self._last_reward,
        )

    def step(
        self,
        action: PythonCodeReviewAction,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> PythonCodeReviewObservation:
        """Apply *action* and return only the observation (OpenEnv interface)."""
        observation, _, _, _ = self._step_transition(action, timeout_s=timeout_s, **kwargs)
        return observation

    def step_result(
        self,
        action: PythonCodeReviewAction,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> Tuple[PythonCodeReviewObservation, float, bool, Dict[str, Any]]:
        """Gym-style helper used by local scripts and tests.

        Returns:
            ``(observation, reward, done, info)`` — the full transition tuple.
        """
        return self._step_transition(action, timeout_s=timeout_s, **kwargs)

    def _step_transition(
        self,
        action: PythonCodeReviewAction,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> Tuple[PythonCodeReviewObservation, float, bool, Dict[str, Any]]:
        """Core transition: interpret the action, regrade, shape the reward.

        Args:
            action: The agent's action; ``action_type`` selects the branch.
            timeout_s: Per-grading timeout forwarded to ``grade_task``
                (defaults to 3.0 seconds when falsy).

        Returns:
            ``(observation, reward, done, info)`` where ``info`` carries the
            task id and the current score.
        """
        # Stepping a finished episode is a no-op that still returns a
        # well-formed transition tuple with a small score-scaled reward.
        if self._state.done:
            reward = RewardDetails(
                value=_reward_value(0.05 + 0.25 * self._current_grade.score),
                reason="Episode already finished. Call reset() to continue.",
            )
            observation = self._build_observation(
                grade=self._current_grade,
                status="Episode already finished.",
                reward_details=reward,
            )
            return observation, reward.value, observation.done, {"task_id": observation.task_id, "score": observation.score}

        previous_grade = self._current_grade
        status = ""
        invalid_action = False
        code_changed = False
        use_hidden_grading = False

        if action.action_type == "edit_code":
            if not action.code or not action.code.strip():
                invalid_action = True
                status = "edit_code requires a non-empty code payload."
            else:
                code_changed = action.code != self._current_code
                self._current_code = action.code
                status = "Updated working copy from agent patch."
        elif action.action_type == "submit_solution":
            # A submission may optionally carry a final patch.
            if action.code is not None and action.code.strip():
                code_changed = action.code != self._current_code
                self._current_code = action.code
            use_hidden_grading = True
            status = "Submission received for final grading."
        elif action.action_type == "run_tests":
            status = "Executed public validation suite."
        elif action.action_type == "analyze_code":
            status = "Generated static review summary."
        else:  # pragma: no cover
            invalid_action = True
            status = f"Unsupported action_type: {action.action_type}"

        self._state.step_count += 1

        if invalid_action:
            # Nothing was executed; carry the previous grade forward.
            current_grade = previous_grade
        else:
            current_grade = grade_task(
                self._task,
                self._current_code,
                include_hidden=use_hidden_grading,
                timeout_s=timeout_s or 3.0,
            )
            # Informational actions replace the generic status with details
            # derived from the fresh grade.
            if action.action_type == "analyze_code":
                status = self._analysis_status(current_grade)
            elif action.action_type == "run_tests":
                status = self._run_tests_status(current_grade, use_hidden_grading)
            elif action.action_type == "submit_solution":
                status = self._submission_status(current_grade)

        # Episode ends on submission or when the step budget is exhausted.
        done = use_hidden_grading or self._state.step_count >= self._task.max_steps
        if self._state.step_count >= self._task.max_steps and not use_hidden_grading:
            status = f"{status} Step budget exhausted."

        reward_details = self._compute_reward(
            previous_grade=previous_grade,
            current_grade=current_grade,
            action=action,
            invalid_action=invalid_action,
            timed_out=current_grade.timed_out,
            code_changed=code_changed,
            final_submission=use_hidden_grading,
        )

        self._history.append(
            HistoryEntry(
                step=self._state.step_count,
                action_type=action.action_type,
                status=status,
                reward=reward_details.value,
            )
        )

        self._current_grade = current_grade
        self._last_reward = reward_details
        attempts_remaining = max(self._task.max_steps - self._state.step_count, 0)

        # Mirror everything into the mutable state object exposed via .state.
        self._state.task_id = self._task.task_id
        self._state.difficulty = self._task.difficulty
        self._state.task_kind = self._task.task_kind
        self._state.attempts_remaining = attempts_remaining
        self._state.current_code = self._current_code
        self._state.errors = self._format_errors(current_grade)
        self._state.test_results = self._format_test_results(current_grade)
        self._state.history = list(self._history)
        self._state.score = current_grade.score
        self._state.done = done

        observation = self._build_observation(
            grade=current_grade,
            status=status,
            reward_details=reward_details,
        )
        return observation, reward_details.value, observation.done, {"task_id": observation.task_id, "score": observation.score}

    @property
    def state(self) -> PythonCodeReviewState:
        """Current episode state (updated in place by each transition)."""
        return self._state

    def _build_observation(
        self,
        *,
        grade: TaskGrade,
        status: str,
        reward_details: RewardDetails,
    ) -> PythonCodeReviewObservation:
        """Assemble the observation snapshot returned to the agent.

        History and visible tests are shallow-copied so later mutation of the
        environment's internal lists does not leak into earlier observations.
        """
        return PythonCodeReviewObservation(
            task_id=self._task.task_id,
            title=self._task.title,
            difficulty=self._task.difficulty,
            task_kind=self._task.task_kind,
            task_description=self._task.task_description,
            current_code=self._current_code,
            errors=self._format_errors(grade),
            test_results=self._format_test_results(grade),
            visible_tests=list(self._task.visible_tests),
            history=list(self._history),
            attempts_remaining=self._state.attempts_remaining,
            last_action_status=status,
            score=grade.score,
            reward=reward_details.value,
            done=self._state.done,
            reward_details=reward_details,
            metadata={
                "goal": self._task.goal,
                "repo_summary": self._task.repo_summary,
                "changed_files": self._task.changed_files,
                "available_files": self._task.available_files,
                "grade_details": grade.details,
            },
        )

    def _compute_reward(
        self,
        *,
        previous_grade: TaskGrade,
        current_grade: TaskGrade,
        action: PythonCodeReviewAction,
        invalid_action: bool,
        timed_out: bool,
        code_changed: bool,
        final_submission: bool,
    ) -> RewardDetails:
        """Shape a scalar reward from the grade delta plus bonuses/penalties.

        Components (weights are tuning constants):
          * baseline 0.1 plus 0.45 * current score,
          * bonuses for fixing syntax, passing more public tests, overall
            score progress, quality improvement, and a full-correctness bonus
            on final submission,
          * penalties for invalid actions, grading timeouts, score
            regressions, and no-op ``edit_code`` patches.

        The raw sum is normalized through ``_reward_value`` before being
        returned alongside a human-readable reason string.
        """
        prev_score = previous_grade.score
        curr_score = current_grade.score
        prev_rate = safe_ratio(previous_grade.tests_passed, previous_grade.tests_total)
        curr_rate = safe_ratio(current_grade.tests_passed, current_grade.tests_total)

        # One-shot bonus when the code crosses the 0.9 syntax threshold.
        syntax_reward = 0.14 if previous_grade.syntax_score < 0.9 and current_grade.syntax_score >= 0.9 else 0.0
        test_reward = round(max(curr_rate - prev_rate, 0.0) * 0.22, 3)
        progress_delta = round(max(curr_score - prev_score, 0.0) * 0.35, 3)
        quality_bonus = round(max(current_grade.quality_score - previous_grade.quality_score, 0.0) * 0.08, 3)
        # Full-correctness bonus only fires once, on the submission that
        # first reaches >= 0.94.
        correctness_bonus = 0.12 if final_submission and curr_score >= 0.94 and prev_score < 0.94 else 0.0

        invalid_action_penalty = 0.12 if invalid_action else 0.0
        timeout_penalty = 0.14 if timed_out else 0.0
        regression_penalty = round(max(prev_score - curr_score, 0.0) * 0.2, 3)
        # Penalize edit_code actions that resubmit the identical working copy.
        stagnation_penalty = 0.06 if action.action_type == "edit_code" and not code_changed else 0.0

        raw_value = (
            0.1
            + 0.45 * curr_score
            + syntax_reward
            + test_reward
            + progress_delta
            + quality_bonus
            + correctness_bonus
            - invalid_action_penalty
            - timeout_penalty
            - regression_penalty
            - stagnation_penalty
        )
        value = _reward_value(raw_value)

        # Collect a human-readable explanation of every non-zero component.
        reason_parts = []
        if syntax_reward:
            reason_parts.append("syntax fixed")
        if test_reward:
            reason_parts.append("public test progress")
        if progress_delta:
            reason_parts.append("overall score improved")
        if quality_bonus:
            reason_parts.append("code quality improved")
        if correctness_bonus:
            reason_parts.append("full correctness bonus")
        if invalid_action_penalty:
            reason_parts.append("invalid action penalty")
        if timeout_penalty:
            reason_parts.append("timeout penalty")
        if regression_penalty:
            reason_parts.append("regression penalty")
        if stagnation_penalty:
            reason_parts.append("unchanged patch penalty")
        if not reason_parts:
            reason_parts.append("no meaningful state change")

        return RewardDetails(
            value=value,
            syntax_reward=syntax_reward,
            test_reward=test_reward,
            correctness_bonus=correctness_bonus,
            quality_bonus=quality_bonus,
            progress_delta=progress_delta,
            invalid_action_penalty=invalid_action_penalty,
            timeout_penalty=timeout_penalty,
            regression_penalty=regression_penalty,
            stagnation_penalty=stagnation_penalty,
            reason=", ".join(reason_parts),
            prev_score=prev_score,
            curr_score=curr_score,
            code_changed=code_changed,
        )

    def _format_errors(self, grade: TaskGrade) -> str:
        """Render the grade's compile error, or a success message if none."""
        compile_error = str(grade.details.get("compile_error", "")).strip()
        if compile_error:
            return compile_error
        return "Code parses successfully."

    def _format_test_results(self, grade: TaskGrade) -> str:
        """Combine the test summary with benchmark info (dict or string form)."""
        parts = [grade.details.get("test_summary", "No test feedback available.")]
        benchmark = grade.details.get("benchmark")
        if isinstance(benchmark, dict):
            # NOTE(review): assumes the grader always populates these three
            # keys when benchmark is a dict — confirm against grade_task.
            parts.append(
                "Benchmark: "
                f"candidate {benchmark['candidate_seconds']}s vs baseline {benchmark['baseline_seconds']}s "
                f"(x{benchmark['improvement_ratio']})."
            )
        elif isinstance(benchmark, str) and benchmark:
            parts.append(f"Benchmark: {benchmark}")
        return "\n".join(part for part in parts if part)

    def _analysis_status(self, grade: TaskGrade) -> str:
        """Summarize the static review: syntax, public tests, and quality."""
        notes = grade.details.get("quality_notes", [])
        quality_note = notes[0] if notes else "No major static quality issues detected."
        return (
            f"Syntax score {grade.syntax_score:.2f}; "
            f"public tests {grade.tests_passed}/{grade.tests_total}; "
            f"quality {grade.quality_score:.2f}. {quality_note}"
        )

    def _run_tests_status(self, grade: TaskGrade, include_hidden: bool) -> str:
        """One-line pass/fail summary; 'full' only when hidden tests ran."""
        visibility = "full" if include_hidden else "public"
        return f"Ran {visibility} tests: {grade.tests_passed}/{grade.tests_total} passed."

    def _submission_status(self, grade: TaskGrade) -> str:
        """Final-grading summary; runtime is shown only when benchmarked."""
        runtime_text = ""
        if isinstance(grade.details.get("benchmark"), dict):
            runtime_text = f" runtime {grade.runtime_score:.2f};"
        return (
            f"Submission graded with score {grade.score:.2f}; "
            f"tests {grade.tests_passed}/{grade.tests_total};"
            f"{runtime_text} quality {grade.quality_score:.2f}."
        )

    def get_metadata(self) -> EnvironmentMetadata:
        """Static descriptor used by the OpenEnv server for discovery."""
        return EnvironmentMetadata(
            name="python_code_review_env",
            description="Production-style Python code review environment with deterministic grading.",
            version="1.0.0",
        )