""" ECHO ULTIMATE — OpenEnv-compliant environment. EchoOpenEnv extends BOTH openenv.core.Environment AND gymnasium.Env (via EchoEnv), satisfying the full OpenEnv protocol: reset(seed, episode_id, **kwargs) → EchoObservation step(action: EchoAction, ...) → EchoObservation state → EchoState (property) get_metadata() → EnvironmentMetadata Plus OpenEnv task-listing helpers: info() → environment metadata dict list_tasks() → all TaskSpec dicts get_task(id) → single TaskSpec dict Gymnasium-style callers (server, training) use the _gym_reset / _gym_step helpers which still return (obs_dict, info) / (obs, reward, done, …) tuples. """ from __future__ import annotations from dataclasses import dataclass, asdict from typing import Any, Dict, Optional, List, Tuple try: from openenv.core import Environment try: from openenv.core.env import EnvironmentMetadata except ImportError: EnvironmentMetadata = None except ImportError: # Fallback: plain base class when openenv is not available class Environment: def __init__(self, transform=None, rubric=None, **kwargs): pass EnvironmentMetadata = None from env.echo_env import EchoEnv from env.task_bank import TaskBank from env.reward import RewardHistory from models import EchoAction, EchoObservation, EchoState from core.tasks import TASKS from config import cfg # ── OpenEnv task spec ───────────────────────────────────────────────────────── @dataclass class TaskSpec: id: str name: str description: str pass_threshold: float metric: str n_episodes: int domains: List[str] difficulties: List[str] def to_dict(self) -> dict: return asdict(self) @dataclass class EnvInfo: name: str version: str description: str observation_format: str action_format: str reward_range: Tuple[float, float] domains: List[str] tasks: List[str] def to_dict(self) -> dict: return asdict(self) # ── Main environment ────────────────────────────────────────────────────────── class EchoOpenEnv(Environment[EchoAction, EchoObservation, EchoState], EchoEnv): """ ECHO ULTIMATE: OpenEnv-compliant RL environment for LLM calibration. Extends openenv.core.Environment (OpenEnv protocol) AND EchoEnv (gymnasium.Env). OpenEnv usage — stateless per-request: env = EchoOpenEnv() obs = env.reset() # EchoObservation obs = env.step(EchoAction(response="...")) # EchoObservation s = env.state # EchoState Gymnasium usage — stateful episodes: obs_dict, info = env._gym_reset() obs_dict, r, done, _, info = env._gym_step("72Paris") Training loop: env = EchoOpenEnv(phase=1) for _ in range(n_steps): obs_dict, info = env._gym_reset() prompt = info["formatted_prompt"] response = model.generate(prompt) _, reward, _, _, _ = env._gym_step(response) """ # OpenEnv class attributes SUPPORTS_CONCURRENT_SESSIONS: bool = False OPENENV_PROTOCOL_VERSION: str = "1.0" N_TASKS: int = 3 OBSERVATION_TYPE: str = "dict" ACTION_TYPE: str = "text" def __init__( self, task_id: Optional[str] = None, task_bank: Optional[TaskBank] = None, reward_history: Optional[RewardHistory] = None, phase: int = 1, self_consistency: bool = False, generate_fn=None, render_mode: Optional[str] = None, ) -> None: # Init gymnasium env (EchoEnv sets up task_bank, reward_history, spaces, etc.) EchoEnv.__init__( self, task_bank=task_bank, reward_history=reward_history, phase=phase, self_consistency=self_consistency, generate_fn=generate_fn, render_mode=render_mode, ) # Init openenv.core.Environment (sets transform=None, rubric=None) Environment.__init__(self, transform=None, rubric=None) self._default_task_id = task_id # ── OpenEnv abstract method: reset ──────────────────────────────────────── def reset( self, seed: Optional[int] = None, episode_id: Optional[str] = None, **kwargs, ) -> EchoObservation: """ OpenEnv reset — returns EchoObservation. Accepts kwargs: options={"task_id": "task_hard"} or task_id="task_easy". """ options = kwargs.get("options") task_id = kwargs.get("task_id") or self._default_task_id if options is None and task_id: options = {"task_id": task_id} obs_dict, _ = EchoEnv.reset(self, seed=seed, options=options) return self._obs_from_dict(obs_dict, done=False) # ── OpenEnv abstract method: step ───────────────────────────────────────── def step( self, action: EchoAction | str, timeout_s: Optional[float] = None, **kwargs, ) -> EchoObservation: """OpenEnv step — accepts EchoAction or raw string, returns EchoObservation.""" response = action.response if isinstance(action, EchoAction) else str(action) obs_dict, reward, terminated, truncated, info = EchoEnv.step(self, response) return self._obs_from_step(obs_dict, reward, terminated or truncated, info) # ── OpenEnv abstract property: state ────────────────────────────────────── @property def state(self) -> EchoState: """OpenEnv state property — returns full EchoState snapshot.""" task = self._current_task or {} snap = self.reward_history.get_training_snapshot(last_n=100) profiles = self.reward_history.get_domain_profiles() return EchoState( current_question=task.get("question", ""), domain=task.get("domain", ""), difficulty=task.get("difficulty", ""), phase=self.phase, step_count=self._episode_step, total_reward=self._episode_reward, domain_stats={ d: {"ece": round(p.ece, 3), "accuracy": round(p.accuracy, 3)} for d, p in profiles.items() if p.n_samples > 0 }, ) # ── OpenEnv metadata ────────────────────────────────────────────────────── def get_metadata(self): """OpenEnv environment metadata.""" if EnvironmentMetadata is not None: return EnvironmentMetadata( name="ECHO-ULTIMATE", version="2.0.0", description=( "RL environment for LLM metacognitive calibration. " "Trains models to accurately predict their own probability of " "being correct across 7 domains via GRPO with Brier-score rewards." ), ) return { "name": "ECHO-ULTIMATE", "version": "2.0.0", "description": "OpenEnv RL environment for LLM metacognitive calibration.", } # ── Gymnasium-compatible helpers (for server + training) ────────────────── def _gym_reset( self, seed: Optional[int] = None, options: Optional[dict] = None, ) -> Tuple[dict, dict]: """Gymnasium-style reset returning (obs_dict, info) tuple.""" if options is None and self._default_task_id: options = {"task_id": self._default_task_id} return EchoEnv.reset(self, seed=seed, options=options) def _gym_step(self, response: str) -> Tuple[dict, float, bool, bool, dict]: """Gymnasium-style step returning (obs, reward, terminated, truncated, info).""" return EchoEnv.step(self, response) # ── Task-listing helpers (OpenEnv task bank protocol) ───────────────────── def info(self) -> dict: """Return environment metadata dict.""" return EnvInfo( name="ECHO-ULTIMATE", version="2.0.0", description=( "RL environment for LLM metacognitive calibration. " "Teaches models to accurately predict their own probability of being correct " "across 7 domains via GRPO with Brier-score calibration rewards." ), observation_format=( "EchoObservation: {question, domain, difficulty, reward, done, " "ece, accuracy, confidence, brier_score, is_correct, feedback}" ), action_format="EchoAction: {response='NTEXT'}", reward_range=(cfg.REWARD_CLIP_LOW, cfg.REWARD_CLIP_HIGH), domains=cfg.DOMAINS, tasks=[t.id for t in TASKS], ).to_dict() def list_tasks(self) -> List[dict]: """Return all task specifications.""" return [ TaskSpec( id=t.id, name=t.name, description=t.description, pass_threshold=t.pass_threshold, metric=t.metric, n_episodes=t.n_episodes, domains=cfg.DOMAINS, difficulties=cfg.DIFFICULTIES, ).to_dict() for t in TASKS ] def get_task(self, task_id: str) -> Optional[dict]: """Return a single task spec by ID.""" for t in TASKS: if t.id == task_id: return TaskSpec( id=t.id, name=t.name, description=t.description, pass_threshold=t.pass_threshold, metric=t.metric, n_episodes=t.n_episodes, domains=cfg.DOMAINS, difficulties=cfg.DIFFICULTIES, ).to_dict() return None # ── Evaluation helper ───────────────────────────────────────────────────── def evaluate( self, n_episodes: int = 30, task_id: Optional[str] = None, ) -> dict: """Run n_episodes and return OpenEnv-style evaluation results.""" rewards = [] for _ in range(n_episodes): obs_dict, info = self._gym_reset( options={"task_id": task_id} if task_id else None ) placeholder = "50unknown" _, reward, _, _, _ = self._gym_step(placeholder) rewards.append(reward) metrics = self.get_metrics() task_spec = self.get_task(task_id) if task_id else None threshold = task_spec["pass_threshold"] if task_spec else 0.5 score = max(0.0, 1.0 - metrics.ece) * min(1.0, metrics.accuracy / 0.55) return { "n_episodes": n_episodes, "ece": round(metrics.ece, 4), "accuracy": round(metrics.accuracy, 4), "brier_score": round(metrics.brier, 4), "overconfidence_rate": round(metrics.overconfidence_rate, 4), "mean_reward": round(sum(rewards) / len(rewards), 4), "score": round(score, 4), "pass_threshold": threshold, "passed": score >= threshold, } # ── Internal helpers ────────────────────────────────────────────────────── def _obs_from_dict(self, obs_dict: dict, done: bool = False) -> EchoObservation: """Convert _build_obs() dict → EchoObservation (after reset).""" task = self._current_task or {} return EchoObservation( question=task.get("question", obs_dict.get("question", "")), domain=obs_dict.get("domain", ""), difficulty=obs_dict.get("difficulty", ""), ece=float(obs_dict.get("running_ece", 0.0)), accuracy=float(obs_dict.get("running_accuracy", 0.0)), confidence=int(obs_dict.get("running_mean_confidence", 50)), done=done, ) def _obs_from_step( self, obs_dict: dict, reward: float, done: bool, info: dict, ) -> EchoObservation: """Convert step() outputs → EchoObservation.""" return EchoObservation( question=(self._current_task or {}).get("question", ""), domain=info.get("domain", obs_dict.get("domain", "")), difficulty=info.get("difficulty", obs_dict.get("difficulty", "")), reward=float(reward), done=done, ece=float(obs_dict.get("running_ece", 0.0)), accuracy=float(info.get("accuracy", 0.0)), confidence=int(info.get("parsed_confidence", 50)), brier_score=float(info.get("brier_reward", 0.0)), is_correct=bool(info.get("was_correct", False)), feedback=info.get("breakdown", ""), ) # ── Convenience factory ─────────────────────────────────────────────────────── def make_echo_env( task_id: Optional[str] = None, phase: int = 1, **kwargs, ) -> EchoOpenEnv: """Factory function for creating an ECHO OpenEnv environment.""" return EchoOpenEnv(task_id=task_id, phase=phase, **kwargs)