"""Level 1 Deceit environment — factual QA, multi-turn, no adversary. Episode structure (max_turns=3): - Each step where is_final=False: agent pays a -0.05 step penalty and gets their own reasoning appended to the next observation's context. - When is_final=True OR step_count >= max_turns: episode ends, full reward (correctness + calibration) is returned. This multi-turn design gives GRPO meaningful trajectory length and teaches the model to "think more when uncertain" — the core Deceit behavior. """ from __future__ import annotations import json import os import pathlib import random import uuid from typing import Optional from openenv.core.env_server import Environment from deceit_env.models import DeceitAction, DeceitObservation, DeceitState from deceit_env.server.grader import Grader _DEFAULT_DATASET = ( pathlib.Path(__file__).parent.parent / "data" / "level1.jsonl" ) STEP_PENALTY = -0.05 MAX_TURNS = 3 def compute_reward( correct: bool, abstain: bool, confidence: float, ) -> tuple[float, float]: """Return (correctness_reward, calibration_reward) per REWARD_DESIGN.md.""" if abstain: return 0.0, 0.0 correctness = 1.0 if correct else -1.0 if correct: calibration = 0.3 if confidence > 0.7 else 0.1 else: calibration = -0.3 if confidence > 0.7 else -0.1 return correctness, calibration class DeceitEnvironment(Environment[DeceitAction, DeceitObservation, DeceitState]): """OpenEnv-compliant multi-turn environment for the Deceit project. Level 1: factual QA with no distractors or adversary. Up to max_turns=3 steps per episode. Each non-final step costs a small step penalty and feeds the agent's reasoning back as context. """ def __init__( self, dataset_path: str | pathlib.Path = _DEFAULT_DATASET, grader: Optional[Grader] = None, seed: Optional[int] = None, ) -> None: super().__init__() self._dataset = self._load_dataset(pathlib.Path(dataset_path)) self._grader = grader or Grader( openai_api_key=os.environ.get("OPENAI_API_KEY") ) self._rng = random.Random(seed) self._state: DeceitState = DeceitState() self._current_question: str = "" # ------------------------------------------------------------------ # OpenEnv interface # ------------------------------------------------------------------ def reset( self, seed: Optional[int] = None, episode_id: Optional[str] = None, **kwargs, ) -> DeceitObservation: """Pick a random question and initialize a new episode.""" if seed is not None: self._rng = random.Random(seed) question_row = self._rng.choice(self._dataset) self._current_question = question_row["question"] self._state = DeceitState( episode_id=episode_id or str(uuid.uuid4()), step_count=0, level=1, ground_truth=question_row["ground_truth"], current_question_id=question_row["id"], episode_rewards=[], prior_reasoning=[], max_turns=MAX_TURNS, ) return DeceitObservation( question=self._current_question, context=[], turn_index=0, max_turns=MAX_TURNS, level=1, ) def step( self, action: DeceitAction, timeout_s: Optional[float] = None, **kwargs, ) -> DeceitObservation: """Process one agent turn. Non-final step: pay step penalty, append reasoning to context, continue. Final step (is_final=True or turn limit reached): compute full reward. """ self._state.step_count += 1 forced_final = self._state.step_count >= self._state.max_turns is_terminal = action.is_final or forced_final if not is_terminal: # Thinking turn: no grading, just step penalty self._state.prior_reasoning.append(action.reasoning) self._state.episode_rewards.append(STEP_PENALTY) context = [ f"Your previous reasoning (turn {i + 1}): {r}" for i, r in enumerate(self._state.prior_reasoning) ] return DeceitObservation( question=self._current_question, context=context, turn_index=self._state.step_count, max_turns=self._state.max_turns, level=self._state.level, done=False, reward=STEP_PENALTY, metadata={"step_penalty": STEP_PENALTY, "is_final": False}, ) # Terminal turn: grade and compute full reward if action.abstain: correctness_r, calibration_r = 0.0, 0.0 grader_method = "abstain" correct = False else: result = self._grader.check(action.answer, self._state.ground_truth) correct = result.correct correctness_r, calibration_r = compute_reward( correct=correct, abstain=False, confidence=action.confidence, ) grader_method = result.method # Add step penalties already accumulated for non-final turns total_reward = correctness_r + calibration_r self._state.episode_rewards.append(total_reward) return DeceitObservation( question=self._current_question, context=[ f"Your previous reasoning (turn {i + 1}): {r}" for i, r in enumerate(self._state.prior_reasoning) ], turn_index=self._state.step_count, max_turns=self._state.max_turns, level=self._state.level, done=True, reward=total_reward, metadata={ "correctness_reward": correctness_r, "calibration_reward": calibration_r, "grader_method": grader_method, "correct": correct, "is_final": True, "forced_final": forced_final, }, ) @property def state(self) -> DeceitState: """Return the current internal episode state.""" return self._state # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ @staticmethod def _load_dataset(path: pathlib.Path) -> list[dict]: if not path.exists(): raise FileNotFoundError( f"Dataset not found at {path}. " "Run scripts/generate_level1_dataset.py first." ) rows = [] with open(path, encoding="utf-8") as f: for line in f: line = line.strip() if line: rows.append(json.loads(line)) if not rows: raise ValueError(f"Dataset at {path} is empty.") return rows