echo-ultimate/env/reward.py
"""
ECHO ULTIMATE β€” All reward components.
Brier score formula: BS = (p - o)^2 where p = conf/100, o = 1 if correct
brier_reward = 1 - 2*BS β†’ range [-1, 1]
Verification:
conf=100, correct β†’ BS=0 β†’ reward=+1.0 βœ…
conf=0, wrong β†’ BS=0 β†’ reward=+1.0 βœ…
conf=100, wrong β†’ BS=1 β†’ reward=-1.0 βœ…
conf=50, either β†’ BS=0.25 β†’ reward=+0.5 βœ…
"""
import difflib
import logging
import re
from dataclasses import dataclass
from typing import Optional
import numpy as np
import pandas as pd
from config import cfg
from core.metrics import CalibrationReport, compute_report
logger = logging.getLogger(__name__)
_NUM_RE = re.compile(r"-?\d[\d,]*(?:\.\d+)?")
# ── Number parsing ────────────────────────────────────────────────────────────
def _parse_num(text: str) -> Optional[float]:
"""Extract first number from text, handling commas and currency symbols."""
if not text:
return None
    cleaned = re.sub(r"[$€£¥,]", "", str(text))
m = _NUM_RE.search(cleaned)
if m:
try:
return float(m.group().replace(",", ""))
except ValueError:
pass
return None
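# Illustrative behavior of _parse_num, traced by hand from the regex above
# (these examples are additions, not an original test suite):
#   _parse_num("$1,234.56")         → 1234.56
#   _parse_num("roughly -42 units") → -42.0
#   _parse_num("no digits here")    → None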
def _norm_choice(text: str) -> str:
    """Normalize a multiple-choice letter: '(A)', 'A.', 'A)' → 'A'."""
    if not text:
        return ""
    s = text.strip().upper()
    # s is already upper-cased, so matching A-D alone is sufficient here.
    m = re.match(r"^\(?([A-D])\)?\.?\s*", s)
    if m:
        return m.group(1)
    return s[0] if s and s[0] in "ABCD" else s
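# Illustrative behavior of _norm_choice (hand-traced additions):
#   _norm_choice("(b)")    → "B"
#   _norm_choice("C.")     → "C"
#   _norm_choice("D) foo") → "D"
#   _norm_choice("true")   → "TRUE"  (no letter found; falls through unchanged)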
def _fuzzy(a: str, b: str) -> float:
"""SequenceMatcher similarity ratio in [0, 1]."""
return difflib.SequenceMatcher(None, a.lower().strip(), b.lower().strip()).ratio()
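# Boundary values of _fuzzy (exact for these degenerate cases; added for clarity):
#   _fuzzy("H2O", " h2o ") → 1.0  (identical after lower/strip)
#   _fuzzy("abc", "xyz")   → 0.0  (no characters in common)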
# ── Accuracy reward ───────────────────────────────────────────────────────────
def accuracy_reward(
predicted: str,
ground_truth: str,
answer_aliases: list[str],
domain: str,
) -> float:
"""
Domain-aware accuracy score in [0.0, 1.0].
    - math: numeric tolerance (exact=1.0, ±1%=0.8, ±5%=0.5)
- logic: exact letter match after normalization
- factual: alias list + substring matching
- science/medical/coding/creative: fuzzy string matching
"""
if not predicted:
return 0.0
try:
if domain == "math":
p = _parse_num(predicted)
t = _parse_num(ground_truth)
if p is None or t is None:
return 0.0
if p == t:
return 1.0
denom = abs(t) if t != 0 else 1.0
rel = abs(p - t) / denom
if rel <= 0.01:
return 0.8
if rel <= 0.05:
return 0.5
return 0.0
elif domain == "logic":
return 1.0 if _norm_choice(predicted) == _norm_choice(ground_truth) else 0.0
elif domain in ("factual",):
aliases = [ground_truth] + (answer_aliases or [])
pred_low = predicted.strip().lower()
for alias in aliases:
if not alias:
continue
al = alias.strip().lower()
if pred_low == al:
return 1.0
for alias in aliases:
if not alias:
continue
al = alias.strip().lower()
if al in pred_low or pred_low in al:
return 0.5
return 0.0
elif domain in ("science", "medical"):
# Multiple choice first
pn = _norm_choice(predicted)
tn = _norm_choice(ground_truth)
if pn in "ABCD" and tn in "ABCD":
return 1.0 if pn == tn else 0.0
# Fuzzy fallback
score = _fuzzy(predicted, ground_truth)
if score > 0.85:
return 1.0
if score > 0.65:
return 0.7
if score > 0.45:
return 0.4
return 0.0
elif domain in ("coding", "creative"):
aliases = [ground_truth] + (answer_aliases or [])
for alias in aliases:
if not alias:
continue
score = _fuzzy(predicted, alias)
if score > 0.85:
return 1.0
if score > 0.65:
return 0.7
if score > 0.45:
return 0.4
return 0.0
else:
return 1.0 if predicted.strip().lower() == ground_truth.strip().lower() else 0.0
except Exception as exc:
logger.warning("accuracy_reward error: %s", exc)
return 0.0
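# Hedged usage sketch for accuracy_reward; expected values are hand-traced from
# the branches above rather than taken from an original test file:
#   accuracy_reward("3,600", "3600", [], "math")                → 1.0  (numeric equal)
#   accuracy_reward("(B)", "B.", [], "logic")                   → 1.0  (letters match)
#   accuracy_reward("NYC", "New York City", ["NYC"], "factual") → 1.0  (alias hit)
#   accuracy_reward("New York", "New York City", [], "factual") → 0.5  (substring)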
# ── Brier reward ──────────────────────────────────────────────────────────────
def brier_reward(confidence: int, was_correct: bool) -> float:
"""
Convert Brier score to reward signal in [-1, 1].
BS = (p - o)^2 [0 = perfect, 1 = worst]
reward = 1 - 2*BS
"""
p = max(0.0, min(1.0, confidence / 100.0))
o = 1.0 if was_correct else 0.0
bs = (p - o) ** 2 # Brier score
return float(1.0 - 2.0 * bs)
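# Worked example, following directly from the formula above:
#   brier_reward(70, True)  → p=0.70, o=1 → BS=0.09 → 1 - 0.18 = +0.82
#   brier_reward(70, False) → p=0.70, o=0 → BS=0.49 → 1 - 0.98 = +0.02
# A 70%-confident answer is rewarded when right and left nearly neutral when wrong.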
# ── Penalties ─────────────────────────────────────────────────────────────────
def overconfidence_penalty(confidence: int, was_correct: bool) -> float:
"""
Graduated overconfidence penalty.
conf >= 95 AND wrong β†’ HALLUCINATION_PENALTY (-0.80)
conf >= 80 AND wrong β†’ OVERCONFIDENCE_PENALTY (-0.60)
"""
if was_correct:
return 0.0
if confidence >= 95:
return float(cfg.HALLUCINATION_PENALTY)
if confidence >= cfg.OVERCONFIDENCE_THRESHOLD:
return float(cfg.OVERCONFIDENCE_PENALTY)
return 0.0
def underconfidence_penalty(confidence: int, was_correct: bool) -> float:
"""Small penalty for falsely humble correct answers."""
if was_correct and confidence <= cfg.UNDERCONFIDENCE_THRESHOLD:
return float(cfg.UNDERCONFIDENCE_PENALTY)
return 0.0
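# Hedged examples for both penalty functions, assuming the cfg values quoted in
# the docstrings above (HALLUCINATION_PENALTY=-0.80, OVERCONFIDENCE_PENALTY=-0.60,
# OVERCONFIDENCE_THRESHOLD=80); the authoritative values live in config.py:
#   overconfidence_penalty(97, was_correct=False) → -0.80
#   overconfidence_penalty(85, was_correct=False) → -0.60
#   overconfidence_penalty(85, was_correct=True)  →  0.0
#   underconfidence_penalty(20, was_correct=True) → cfg.UNDERCONFIDENCE_PENALTY,
#     provided 20 <= cfg.UNDERCONFIDENCE_THRESHOLD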
# ── Combined reward ───────────────────────────────────────────────────────────
@dataclass
class RewardBreakdown:
"""Full reward breakdown for one episode."""
accuracy_score: float = 0.0
brier_reward_val: float = 0.0
overconfidence_penalty_val: float = 0.0
underconfidence_penalty_val: float = 0.0
total: float = 0.0
was_correct: bool = False
breakdown_str: str = ""
def compute_reward(
confidence: int,
predicted: str,
ground_truth: str,
aliases: list[str],
domain: str,
) -> RewardBreakdown:
"""Compute full reward breakdown for one episode."""
acc = accuracy_reward(predicted, ground_truth, aliases, domain)
    was_correct = acc >= 0.5  # partial credit of 0.5+ counts as "correct" for calibration
br = brier_reward(confidence, was_correct)
oc = overconfidence_penalty(confidence, was_correct)
uc = underconfidence_penalty(confidence, was_correct)
raw = cfg.W_ACCURACY * acc + cfg.W_CALIBRATION * br + oc + uc
total = float(np.clip(raw, cfg.REWARD_CLIP_LOW, cfg.REWARD_CLIP_HIGH))
icon = "βœ…" if was_correct else "❌"
breakdown_str = (
f"{icon} acc={acc:.2f} brier={br:.2f} "
f"oc_pen={oc:.2f} uc_pen={uc:.2f} β†’ total={total:.3f}"
)
return RewardBreakdown(
accuracy_score=acc,
brier_reward_val=br,
overconfidence_penalty_val=oc,
underconfidence_penalty_val=uc,
total=total,
was_correct=was_correct,
breakdown_str=breakdown_str,
)
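# Hedged usage sketch for compute_reward. The exact total depends on weights and
# clip bounds in config.py that are not shown in this file; the arithmetic below
# assumes, purely for illustration, W_ACCURACY=0.5, W_CALIBRATION=0.5, clipping
# to [-1, 1], and that confidence=90 sits above cfg.UNDERCONFIDENCE_THRESHOLD:
#   rb = compute_reward(confidence=90, predicted="42", ground_truth="42",
#                       aliases=[], domain="math")
#   acc = 1.0; brier = 1 - 2*(0.9 - 1)^2 = 0.98; no penalties
#   raw = 0.5*1.0 + 0.5*0.98 = 0.99 → rb.total = 0.99 under the assumed weights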
# ── RewardHistory ─────────────────────────────────────────────────────────────
class RewardHistory:
"""
Rolling record of all episode outcomes.
Feeds into calibration metrics and training logs.
"""
def __init__(self) -> None:
self._records: list[dict] = []
def append(
self,
confidence: int,
was_correct: bool,
domain: str,
difficulty: str,
reward: float,
is_abstention: bool = False,
) -> None:
self._records.append({
"confidence": confidence,
"was_correct": was_correct,
"domain": domain,
"difficulty": difficulty,
"reward": reward,
"is_abstention": is_abstention,
})
def get_calibration_report(
self, domain: Optional[str] = None
) -> CalibrationReport:
records = self._records
if domain:
records = [r for r in records if r["domain"] == domain]
if not records:
return CalibrationReport(domain=domain)
confs = [r["confidence"] for r in records]
corrs = [r["was_correct"] for r in records]
absts = [r["is_abstention"] for r in records]
return compute_report(confs, corrs, absts, domain=domain)
def get_domain_profiles(self) -> dict[str, CalibrationReport]:
return {d: self.get_calibration_report(domain=d) for d in cfg.DOMAINS}
def get_training_snapshot(self, last_n: int = 100) -> dict:
records = self._records[-last_n:]
if not records:
return {
"ece": 1.0, "accuracy": 0.0, "mean_confidence": 50.0,
"overconfidence_rate": 0.5, "brier_score": 0.25, "mean_reward": 0.0,
}
confs = [r["confidence"] for r in records]
corrs = [r["was_correct"] for r in records]
rewards = [r["reward"] for r in records]
rep = compute_report(confs, corrs)
return {
"ece": rep.ece,
"accuracy": rep.accuracy,
"mean_confidence": rep.mean_confidence,
"overconfidence_rate": rep.overconfidence_rate,
"brier_score": rep.brier_score,
"mean_reward": float(np.mean(rewards)),
}
def to_dataframe(self) -> "pd.DataFrame":
return pd.DataFrame(self._records)
def __len__(self) -> int:
return len(self._records)
def reset(self) -> None:
self._records.clear()
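# Minimal smoke-test sketch (an addition, not part of the original module). It
# exercises the reward path end to end and assumes config.cfg and core.metrics
# import cleanly, exactly as the imports at the top of this file already require.
if __name__ == "__main__":
    history = RewardHistory()
    for conf, pred, truth, domain in [
        (90, "42", "42", "math"),
        (95, "A", "B", "logic"),
        (40, "Paris", "Paris", "factual"),
    ]:
        rb = compute_reward(conf, pred, truth, [], domain)
        history.append(conf, rb.was_correct, domain, "easy", rb.total)
        print(rb.breakdown_str)
    print(history.get_training_snapshot(last_n=10))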