| """ | |
| ECHO ULTIMATE β All reward components. | |
| Brier score formula: BS = (p - o)^2 where p = conf/100, o = 1 if correct | |
| brier_reward = 1 - 2*BS β range [-1, 1] | |
| Verification: | |
| conf=100, correct β BS=0 β reward=+1.0 β | |
| conf=0, wrong β BS=0 β reward=+1.0 β | |
| conf=100, wrong β BS=1 β reward=-1.0 β | |
| conf=50, either β BS=0.25 β reward=+0.5 β | |
| """ | |
| import difflib | |
| import logging | |
| import re | |
| from dataclasses import dataclass, field | |
| from typing import Optional | |
| import numpy as np | |
| import pandas as pd | |
| from config import cfg | |
| from core.metrics import CalibrationReport, compute_report | |
| logger = logging.getLogger(__name__) | |
| _NUM_RE = re.compile(r"-?\d[\d,]*(?:\.\d+)?") | |
# ── Number parsing ────────────────────────────────────────────────────────────


def _parse_num(text: str) -> Optional[float]:
    """Extract first number from text, handling commas and currency symbols."""
    if not text:
        return None
    cleaned = re.sub(r"[$€£¥,]", "", str(text))
    m = _NUM_RE.search(cleaned)
    if m:
        try:
            return float(m.group().replace(",", ""))
        except ValueError:
            pass
    return None


def _norm_choice(text: str) -> str:
    """Normalize a multiple-choice letter: '(A)', 'A.', 'A)' → 'A'."""
    if not text:
        return ""
    s = text.strip().upper()
    m = re.match(r"^\(?([A-Da-d])\)?\.?\s*", s)
    if m:
        return m.group(1).upper()
    return s[0] if s and s[0] in "ABCD" else s


def _fuzzy(a: str, b: str) -> float:
    """SequenceMatcher similarity ratio in [0, 1]."""
    return difflib.SequenceMatcher(None, a.lower().strip(), b.lower().strip()).ratio()
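
# Illustrative helper behaviour (hand-checked against the code above, not part
# of an automated test suite):
#   _parse_num("$1,234.50")   → 1234.5   (currency symbol and commas stripped)
#   _norm_choice("(b).")      → "B"      (letter extracted and upper-cased)
#   _fuzzy("Paris", "paris")  → 1.0      (case-insensitive exact match)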
# ── Accuracy reward ───────────────────────────────────────────────────────────


def accuracy_reward(
    predicted: str,
    ground_truth: str,
    answer_aliases: list[str],
    domain: str,
) -> float:
    """
    Domain-aware accuracy score in [0.0, 1.0].

    - math: numeric tolerance (exact=1.0, ±1%=0.8, ±5%=0.5)
    - logic: exact letter match after normalization
    - factual: alias list + substring matching
    - science/medical/coding/creative: fuzzy string matching
    """
    if not predicted:
        return 0.0
    try:
        if domain == "math":
            p = _parse_num(predicted)
            t = _parse_num(ground_truth)
            if p is None or t is None:
                return 0.0
            if p == t:
                return 1.0
            denom = abs(t) if t != 0 else 1.0
            rel = abs(p - t) / denom
            if rel <= 0.01:
                return 0.8
            if rel <= 0.05:
                return 0.5
            return 0.0

        elif domain == "logic":
            return 1.0 if _norm_choice(predicted) == _norm_choice(ground_truth) else 0.0

        elif domain in ("factual",):
            aliases = [ground_truth] + (answer_aliases or [])
            pred_low = predicted.strip().lower()
            for alias in aliases:
                if not alias:
                    continue
                al = alias.strip().lower()
                if pred_low == al:
                    return 1.0
            for alias in aliases:
                if not alias:
                    continue
                al = alias.strip().lower()
                if al in pred_low or pred_low in al:
                    return 0.5
            return 0.0

        elif domain in ("science", "medical"):
            # Multiple choice first
            pn = _norm_choice(predicted)
            tn = _norm_choice(ground_truth)
            if pn in "ABCD" and tn in "ABCD":
                return 1.0 if pn == tn else 0.0
            # Fuzzy fallback
            score = _fuzzy(predicted, ground_truth)
            if score > 0.85:
                return 1.0
            if score > 0.65:
                return 0.7
            if score > 0.45:
                return 0.4
            return 0.0

        elif domain in ("coding", "creative"):
            aliases = [ground_truth] + (answer_aliases or [])
            for alias in aliases:
                if not alias:
                    continue
                score = _fuzzy(predicted, alias)
                if score > 0.85:
                    return 1.0
                if score > 0.65:
                    return 0.7
                if score > 0.45:
                    return 0.4
            return 0.0

        else:
            return 1.0 if predicted.strip().lower() == ground_truth.strip().lower() else 0.0
    except Exception as exc:
        logger.warning("accuracy_reward error: %s", exc)
        return 0.0
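
# Illustrative calls, worked out by hand from the rules above (not executed here):
#   accuracy_reward("3.14159", "3.14", [], "math")                      → 0.8  (within ±1%)
#   accuracy_reward("(b)", "B.", [], "logic")                           → 1.0  (both normalize to "B")
#   accuracy_reward("the Nile river", "Nile", ["the Nile"], "factual")  → 0.5  (substring match)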
# ── Brier reward ──────────────────────────────────────────────────────────────


def brier_reward(confidence: int, was_correct: bool) -> float:
    """
    Convert Brier score to reward signal in [-1, 1].

    BS = (p - o)^2      [0 = perfect, 1 = worst]
    reward = 1 - 2*BS
    """
    p = max(0.0, min(1.0, confidence / 100.0))
    o = 1.0 if was_correct else 0.0
    bs = (p - o) ** 2  # Brier score
    return float(1.0 - 2.0 * bs)
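
# Worked example at the overconfidence threshold (conf=80), computed by hand:
#   correct: p=0.8, o=1 → BS=0.04 → reward = 1 - 0.08 = +0.92
#   wrong:   p=0.8, o=0 → BS=0.64 → reward = 1 - 1.28 = -0.28
# A wrong answer at 80% confidence therefore already loses reward before the
# explicit overconfidence penalty below is applied.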
# ── Penalties ─────────────────────────────────────────────────────────────────


def overconfidence_penalty(confidence: int, was_correct: bool) -> float:
    """
    Graduated overconfidence penalty.

    conf >= 95 AND wrong → HALLUCINATION_PENALTY (-0.80)
    conf >= 80 AND wrong → OVERCONFIDENCE_PENALTY (-0.60)
    """
    if was_correct:
        return 0.0
    if confidence >= 95:
        return float(cfg.HALLUCINATION_PENALTY)
    if confidence >= cfg.OVERCONFIDENCE_THRESHOLD:
        return float(cfg.OVERCONFIDENCE_PENALTY)
    return 0.0


def underconfidence_penalty(confidence: int, was_correct: bool) -> float:
    """Small penalty for falsely humble correct answers."""
    if was_correct and confidence <= cfg.UNDERCONFIDENCE_THRESHOLD:
        return float(cfg.UNDERCONFIDENCE_PENALTY)
    return 0.0
# ── Combined reward ───────────────────────────────────────────────────────────


@dataclass
class RewardBreakdown:
    """Full reward breakdown for one episode."""

    accuracy_score: float = 0.0
    brier_reward_val: float = 0.0
    overconfidence_penalty_val: float = 0.0
    underconfidence_penalty_val: float = 0.0
    total: float = 0.0
    was_correct: bool = False
    breakdown_str: str = ""

def compute_reward(
    confidence: int,
    predicted: str,
    ground_truth: str,
    aliases: list[str],
    domain: str,
) -> RewardBreakdown:
    """Compute full reward breakdown for one episode."""
    acc = accuracy_reward(predicted, ground_truth, aliases, domain)
    was_correct = acc >= 0.5
    br = brier_reward(confidence, was_correct)
    oc = overconfidence_penalty(confidence, was_correct)
    uc = underconfidence_penalty(confidence, was_correct)
    raw = cfg.W_ACCURACY * acc + cfg.W_CALIBRATION * br + oc + uc
    total = float(np.clip(raw, cfg.REWARD_CLIP_LOW, cfg.REWARD_CLIP_HIGH))
    icon = "✓" if was_correct else "✗"
    breakdown_str = (
        f"{icon} acc={acc:.2f} brier={br:.2f} "
        f"oc_pen={oc:.2f} uc_pen={uc:.2f} → total={total:.3f}"
    )
    return RewardBreakdown(
        accuracy_score=acc,
        brier_reward_val=br,
        overconfidence_penalty_val=oc,
        underconfidence_penalty_val=uc,
        total=total,
        was_correct=was_correct,
        breakdown_str=breakdown_str,
    )
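
# Illustrative total, assuming hypothetical weights W_ACCURACY=0.5 and
# W_CALIBRATION=0.5 with a clip range of [-1, 1] (the real values come from config):
#   correct answer at conf=90 → acc=1.0, brier=+0.98, no penalties
#   raw = 0.5*1.0 + 0.5*0.98 = 0.99 → total ≈ 0.99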
# ── RewardHistory ─────────────────────────────────────────────────────────────


class RewardHistory:
    """
    Rolling record of all episode outcomes.

    Feeds into calibration metrics and training logs.
    """

    def __init__(self) -> None:
        self._records: list[dict] = []

    def append(
        self,
        confidence: int,
        was_correct: bool,
        domain: str,
        difficulty: str,
        reward: float,
        is_abstention: bool = False,
    ) -> None:
        self._records.append({
            "confidence": confidence,
            "was_correct": was_correct,
            "domain": domain,
            "difficulty": difficulty,
            "reward": reward,
            "is_abstention": is_abstention,
        })

    def get_calibration_report(
        self, domain: Optional[str] = None
    ) -> CalibrationReport:
        records = self._records
        if domain:
            records = [r for r in records if r["domain"] == domain]
        if not records:
            return CalibrationReport(domain=domain)
        confs = [r["confidence"] for r in records]
        corrs = [r["was_correct"] for r in records]
        absts = [r["is_abstention"] for r in records]
        return compute_report(confs, corrs, absts, domain=domain)

    def get_domain_profiles(self) -> dict[str, CalibrationReport]:
        return {d: self.get_calibration_report(domain=d) for d in cfg.DOMAINS}

    def get_training_snapshot(self, last_n: int = 100) -> dict:
        records = self._records[-last_n:]
        if not records:
            return {
                "ece": 1.0, "accuracy": 0.0, "mean_confidence": 50.0,
                "overconfidence_rate": 0.5, "brier_score": 0.25, "mean_reward": 0.0,
            }
        confs = [r["confidence"] for r in records]
        corrs = [r["was_correct"] for r in records]
        rewards = [r["reward"] for r in records]
        rep = compute_report(confs, corrs)
        return {
            "ece": rep.ece,
            "accuracy": rep.accuracy,
            "mean_confidence": rep.mean_confidence,
            "overconfidence_rate": rep.overconfidence_rate,
            "brier_score": rep.brier_score,
            "mean_reward": float(np.mean(rewards)),
        }

    def to_dataframe(self) -> "pd.DataFrame":
        return pd.DataFrame(self._records)

    def __len__(self) -> int:
        return len(self._records)

    def reset(self) -> None:
        self._records.clear()
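
# Minimal usage sketch (illustrative only; assumes `config` and `core.metrics`
# are importable and populated as elsewhere in this module, and uses made-up
# episode values):
if __name__ == "__main__":
    history = RewardHistory()
    result = compute_reward(
        confidence=85,
        predicted="42",
        ground_truth="42",
        aliases=[],
        domain="math",
    )
    history.append(
        confidence=85,
        was_correct=result.was_correct,
        domain="math",
        difficulty="easy",
        reward=result.total,
    )
    print(result.breakdown_str)
    print(history.get_training_snapshot(last_n=10))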