| """Level 1 Deceit environment — factual QA, multi-turn, no adversary. |
| |
| Episode structure (max_turns=3): |
| - Each step where is_final=False: agent pays a -0.05 step penalty and gets |
| their own reasoning appended to the next observation's context. |
| - When is_final=True OR step_count >= max_turns: episode ends, full reward |
| (correctness + calibration) is returned. |
| |
| This multi-turn design gives GRPO meaningful trajectory length and teaches the |
| model to "think more when uncertain" — the core Deceit behavior. |
| """ |

from __future__ import annotations

import json
import os
import pathlib
import random
import uuid
from typing import Optional

from openenv.core.env_server import Environment

from deceit_env.models import DeceitAction, DeceitObservation, DeceitState
from deceit_env.server.grader import Grader

_DEFAULT_DATASET = (
    pathlib.Path(__file__).parent.parent / "data" / "level1.jsonl"
)
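
# Dataset format: JSON Lines, one object per row. reset() reads the keys
# "id", "question", and "ground_truth", so each row needs at least those,
# e.g. (illustrative values only):
#   {"id": "q-001", "question": "What is the capital of France?",
#    "ground_truth": "Paris"}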

STEP_PENALTY = -0.05
MAX_TURNS = 3


def compute_reward(
    correct: bool,
    abstain: bool,
    confidence: float,
) -> tuple[float, float]:
    """Return (correctness_reward, calibration_reward) per REWARD_DESIGN.md."""
    if abstain:
        return 0.0, 0.0

    correctness = 1.0 if correct else -1.0

    if correct:
        calibration = 0.3 if confidence > 0.7 else 0.1
    else:
        calibration = -0.3 if confidence > 0.7 else -0.1

    return correctness, calibration
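
# Example payoffs, derived directly from the branches above:
#   compute_reward(correct=True,  abstain=False, confidence=0.9) -> (1.0, 0.3)
#   compute_reward(correct=True,  abstain=False, confidence=0.5) -> (1.0, 0.1)
#   compute_reward(correct=False, abstain=False, confidence=0.9) -> (-1.0, -0.3)
#   compute_reward(correct=False, abstain=False, confidence=0.5) -> (-1.0, -0.1)
#   compute_reward(correct=True,  abstain=True,  confidence=0.9) -> (0.0, 0.0)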


class DeceitEnvironment(Environment[DeceitAction, DeceitObservation, DeceitState]):
    """OpenEnv-compliant multi-turn environment for the Deceit project.

    Level 1: factual QA with no distractors or adversary.
    Up to max_turns=3 steps per episode. Each non-final step costs a small
    step penalty and feeds the agent's reasoning back as context.
    """

    def __init__(
        self,
        dataset_path: str | pathlib.Path = _DEFAULT_DATASET,
        grader: Optional[Grader] = None,
        seed: Optional[int] = None,
    ) -> None:
        super().__init__()
        self._dataset = self._load_dataset(pathlib.Path(dataset_path))
        self._grader = grader or Grader(
            openai_api_key=os.environ.get("OPENAI_API_KEY")
        )
        self._rng = random.Random(seed)
        self._state: DeceitState = DeceitState()
        self._current_question: str = ""

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        **kwargs,
    ) -> DeceitObservation:
        """Pick a random question and initialize a new episode."""
        if seed is not None:
            self._rng = random.Random(seed)

        question_row = self._rng.choice(self._dataset)
        self._current_question = question_row["question"]
        self._state = DeceitState(
            episode_id=episode_id or str(uuid.uuid4()),
            step_count=0,
            level=1,
            ground_truth=question_row["ground_truth"],
            current_question_id=question_row["id"],
            episode_rewards=[],
            prior_reasoning=[],
            max_turns=MAX_TURNS,
        )
        return DeceitObservation(
            question=self._current_question,
            context=[],
            turn_index=0,
            max_turns=MAX_TURNS,
            level=1,
        )
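
    # A typical episode from the client's side (sketch; DeceitAction field
    # names are inferred from their usage in step() below):
    #   obs = env.reset()                                   # turn_index 0
    #   obs = env.step(DeceitAction(..., is_final=False))   # reward -0.05
    #   obs = env.step(DeceitAction(..., is_final=True))    # done, full reward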

    def step(
        self,
        action: DeceitAction,
        timeout_s: Optional[float] = None,
        **kwargs,
    ) -> DeceitObservation:
        """Process one agent turn.

        Non-final step: pay the step penalty, append reasoning to context,
        and continue. Final step (is_final=True or turn limit reached):
        compute the full reward.
        """
        self._state.step_count += 1
        forced_final = self._state.step_count >= self._state.max_turns
        is_terminal = action.is_final or forced_final

        if not is_terminal:
            # Non-terminal turn: charge the step penalty and feed all prior
            # reasoning back so the agent can refine its answer.
            self._state.prior_reasoning.append(action.reasoning)
            self._state.episode_rewards.append(STEP_PENALTY)
            context = [
                f"Your previous reasoning (turn {i + 1}): {r}"
                for i, r in enumerate(self._state.prior_reasoning)
            ]
            return DeceitObservation(
                question=self._current_question,
                context=context,
                turn_index=self._state.step_count,
                max_turns=self._state.max_turns,
                level=self._state.level,
                done=False,
                reward=STEP_PENALTY,
                metadata={"step_penalty": STEP_PENALTY, "is_final": False},
            )

        # Terminal turn: grade the final answer (or abstention) and emit the
        # full correctness + calibration reward.
        if action.abstain:
            # Matches compute_reward's abstain branch: abstaining is neutral.
            correctness_r, calibration_r = 0.0, 0.0
            grader_method = "abstain"
            correct = False
        else:
            result = self._grader.check(action.answer, self._state.ground_truth)
            correct = result.correct
            correctness_r, calibration_r = compute_reward(
                correct=correct,
                abstain=False,
                confidence=action.confidence,
            )
            grader_method = result.method

        total_reward = correctness_r + calibration_r
        self._state.episode_rewards.append(total_reward)

        return DeceitObservation(
            question=self._current_question,
            context=[
                f"Your previous reasoning (turn {i + 1}): {r}"
                for i, r in enumerate(self._state.prior_reasoning)
            ],
            turn_index=self._state.step_count,
            max_turns=self._state.max_turns,
            level=self._state.level,
            done=True,
            reward=total_reward,
            metadata={
                "correctness_reward": correctness_r,
                "calibration_reward": calibration_r,
                "grader_method": grader_method,
                "correct": correct,
                "is_final": True,
                "forced_final": forced_final,
            },
        )

    @property
    def state(self) -> DeceitState:
        """Return the current internal episode state."""
        return self._state

    @staticmethod
    def _load_dataset(path: pathlib.Path) -> list[dict]:
        """Load the JSONL dataset, skipping blank lines."""
        if not path.exists():
            raise FileNotFoundError(
                f"Dataset not found at {path}. "
                "Run scripts/generate_level1_dataset.py first."
            )
        rows = []
        with open(path, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if line:
                    rows.append(json.loads(line))
        if not rows:
            raise ValueError(f"Dataset at {path} is empty.")
        return rows
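

# --- Minimal smoke test (illustrative sketch, not part of the library) ---
# Assumes the default dataset file exists. Substitutes a duck-typed stub
# grader so no OpenAI key is needed; the stub only has to provide a check()
# returning an object with .correct and .method, as used in step() above.
# DeceitAction field names are inferred from their usage in step().
if __name__ == "__main__":
    from types import SimpleNamespace

    class _StubGrader:
        """Exact-string-match stand-in for Grader (assumed interface)."""

        def check(self, answer: str, ground_truth: str):
            return SimpleNamespace(
                correct=answer.strip().lower() == ground_truth.strip().lower(),
                method="exact_match",
            )

    env = DeceitEnvironment(grader=_StubGrader(), seed=0)
    obs = env.reset()
    print("Q:", obs.question)
    obs = env.step(
        DeceitAction(
            answer="",
            reasoning="Let me think this through first.",
            confidence=0.0,
            abstain=False,
            is_final=False,
        )
    )
    print("turn reward:", obs.reward)  # -0.05 step penalty
    obs = env.step(
        DeceitAction(
            answer="my best guess",
            reasoning="Committing to an answer.",
            confidence=0.8,
            abstain=False,
            is_final=True,
        )
    )
    print("final reward:", obs.reward, obs.metadata)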