| |
| """ |
| BioGRPO Post-Training Evaluation Analyzer |
| |
| Diagnoses ECE gap between MVE (0.078) and Full v2 (0.172) using per-sample data. |
| No GPU, no torch — stdlib only (+ optional matplotlib). |
| |
| Usage: |
| python scripts/analyze_eval.py --v2 results/grpo_full_v2_eval_*.json |
| python scripts/analyze_eval.py --v2 results/grpo_full_v2_eval_*.json \\ |
| --mve results/grpo_mve_eval_*.json \\ |
| --plots |
| """ |
|
|
| import argparse |
| import json |
| import statistics |
| from collections import Counter, defaultdict |
| from pathlib import Path |
|
|
|
|
| |
| |
| |
|
|
def parse_args():
    """Parse and return the command-line options for the analyzer."""
    parser = argparse.ArgumentParser(description="Analyze BioGRPO evaluation results")
    parser.add_argument("--v2", required=True, help="Full v2 eval JSON path")
    parser.add_argument("--mve", default=None, help="MVE eval JSON path (optional)")
    parser.add_argument("--plots", action="store_true", help="Generate reliability diagram via matplotlib")
    return parser.parse_args()
|
|
|
|
def load_results(path):
    """Load an evaluation JSON file and return the three sections this script uses.

    Parameters
    ----------
    path : str or Path
        Path to an eval JSON containing "per_sample", "calibration", and "grpo".

    Returns
    -------
    dict
        Keys "per_sample", "calibration", "grpo" copied from the file.

    Raises
    ------
    KeyError
        If any required section is missing (message names the file and sections,
        instead of the bare single-key KeyError the old code produced).
    """
    # Explicit encoding: JSON is UTF-8 by spec; don't depend on the locale default.
    with open(path, encoding="utf-8") as f:
        data = json.load(f)
    missing = [k for k in ("per_sample", "calibration", "grpo") if k not in data]
    if missing:
        raise KeyError(f"{path}: missing required section(s): {', '.join(missing)}")
    return {
        "per_sample": data["per_sample"],
        "calibration": data["calibration"],
        "grpo": data["grpo"],
    }
|
|
|
|
| |
| |
| |
|
|
def header(title, width=70):
    """Print a blank line, then the title framed above and below by '=' rules."""
    rule = "=" * width
    for line in ("", rule, f" {title}", rule):
        print(line)
|
|
|
def subheader(title):
    """Print a '--- title ---' section marker preceded by a blank line."""
    marker = "--- " + title + " ---"
    print("\n" + marker)
|
|
|
| def _stdev(vals): |
| return statistics.stdev(vals) if len(vals) > 1 else 0.0 |
|
|
|
|
| |
| |
| |
|
|
def recompute_ece(samples, n_bins=10):
    """Recompute ECE from per_sample using equal-width bins (matches calibration.py)."""
    # Group (confidence, 0/1-correctness) pairs by equal-width bin index.
    # A sample is "correct" when its total reward exceeds 0.5.
    grouped = defaultdict(list)
    for sample in samples:
        conf = sample["confidence"]
        hit = float(sample["total_reward"] > 0.5)
        idx = min(int(conf * n_bins), n_bins - 1)  # clamp conf == 1.0 into the top bin
        grouped[idx].append((conf, hit))

    total = len(samples)
    ece = 0.0
    # Iterate bins in index order so float accumulation matches bin order.
    for idx in sorted(grouped):
        pairs = grouped[idx]
        avg_conf = statistics.mean(p[0] for p in pairs)
        avg_acc = statistics.mean(p[1] for p in pairs)
        ece += len(pairs) / total * abs(avg_acc - avg_conf)
    return ece
|
|
|
|
| |
| |
| |
|
|
def section_calibration_decomp(cal, label="Full v2"):
    """Print Section 1: break the stored ECE into per-bin contributions.

    Parameters
    ----------
    cal : dict
        A "calibration" section with keys "reliability_bins", "n_samples",
        "ece", "mean_confidence", "mean_accuracy". Each reliability bin has
        "bin_lower", "bin_upper", "count", "mean_confidence", "mean_accuracy",
        "calibration_error".
    label : str
        Run label shown in the section header.
    """
    header(f"SECTION 1: Calibration Decomposition [{label}]")

    bins = cal["reliability_bins"]
    n = cal["n_samples"]
    ece = cal["ece"]

    def pct_of_ece(x):
        # Guard: a perfectly calibrated run has ece == 0; avoid ZeroDivisionError.
        return x / ece * 100 if ece > 0 else 0.0

    print(
        f"\nStored ECE={ece:.4f} N={n} "
        f"mean_conf={cal['mean_confidence']:.4f} mean_acc={cal['mean_accuracy']:.4f}"
    )
    print()

    fmt = "{:<14} {:>6} {:>10} {:>9} {:>8} {:>12} {:>7}"
    print(fmt.format("Bin", "count", "mean_conf", "mean_acc", "error", "ECE_contrib", "%_ECE"))
    print("-" * 72)

    total_contrib = 0.0
    dominant = None

    for b in bins:
        if b["count"] == 0:
            continue
        # Each bin's ECE contribution is (bin weight) x (its |acc - conf|).
        contrib = b["count"] / n * b["calibration_error"]
        pct = pct_of_ece(contrib)
        total_contrib += contrib
        bin_label = f"[{b['bin_lower']:.1f}, {b['bin_upper']:.1f})"
        print(fmt.format(
            bin_label, b["count"], f"{b['mean_confidence']:.3f}",
            f"{b['mean_accuracy']:.3f}", f"{b['calibration_error']:.3f}",
            f"{contrib:.4f}", f"{pct:.1f}%",
        ))
        # Track the bin with the single largest ECE contribution.
        if dominant is None or contrib > dominant["contrib"]:
            dominant = {"bin": b, "contrib": contrib, "pct": pct}

    print("-" * 72)
    print(fmt.format("TOTAL", n, "", "", "", f"{total_contrib:.4f}", "100.0%"))

    if dominant:
        b = dominant["bin"]
        print(
            f"\nDominant bin: [{b['bin_lower']:.1f}, {b['bin_upper']:.1f})"
            f" count={b['count']} contrib={dominant['contrib']:.4f}"
            f" ({dominant['pct']:.1f}% of ECE)"
        )

    # Split ECE into a "structural" part (well-populated bins) and an "outlier"
    # part driven by sparsely populated bins (fewer than 5 samples).
    outlier_contrib = sum(
        b["count"] / n * b["calibration_error"]
        for b in bins if 0 < b["count"] < 5
    )
    structural_contrib = ece - outlier_contrib
    # BUG FIX: these two percentages previously divided by ece unconditionally
    # and raised ZeroDivisionError when the stored ECE was exactly 0; the loop
    # above already guarded the same division, so reuse the guarded helper.
    print(f"\nStructural ECE (bins ≥5 samples): {structural_contrib:.4f} ({pct_of_ece(structural_contrib):.1f}%)")
    print(f"Outlier ECE (bins <5 samples): {outlier_contrib:.4f} ({pct_of_ece(outlier_contrib):.1f}%)")
|
|
|
| |
| |
| |
|
|
def section_confidence_dist(samples, label="Full v2"):
    """Print Section 2: where confidence clusters and whether it tracks correctness.

    Parameters
    ----------
    samples : list[dict]
        Per-sample records; each needs "confidence" and "total_reward",
        optionally "confidence_stated" and "verifier_scores".
    label : str
        Run label shown in the section header.

    NOTE(review): assumes ``samples`` is non-empty — an empty list would hit
    ZeroDivisionError in the percentage math below; confirm callers guarantee this.
    """
    header(f"SECTION 2: Confidence Distribution Analysis [{label}]")

    n = len(samples)
    confs = [s["confidence"] for s in samples]

    # 5-bucket histogram; the top bucket's edge is 1.01 so conf == 1.0 is counted.
    subheader("Confidence histogram (5 buckets)")
    buckets_5 = [(0.0, 0.2), (0.2, 0.4), (0.4, 0.6), (0.6, 0.8), (0.8, 1.01)]
    total_counted = 0
    for lo, hi in buckets_5:
        cnt = sum(1 for c in confs if lo <= c < hi)
        bar = "#" * int(cnt / n * 50)  # scale bar to at most 50 characters
        print(f" [{lo:.1f}, {hi:.1f}) {cnt:4d} ({cnt/n*100:5.1f}%) {bar}")
        total_counted += cnt
    print(f" Total: {total_counted}")

    # Raw counts of the model's stated (verbal) confidence category, if recorded.
    subheader("confidence_stated category counts")
    stated_counts = Counter(s.get("confidence_stated", "?") for s in samples)
    for cat, cnt in sorted(stated_counts.items()):
        print(f" {cat:<14}: {cnt:4d} ({cnt/n*100:.1f}%)")

    # Does confidence separate correct from incorrect answers at all?
    subheader("Mean confidence: correct vs incorrect (threshold: total_reward > 0.5)")
    correct = [s["confidence"] for s in samples if s["total_reward"] > 0.5]
    incorrect = [s["confidence"] for s in samples if s["total_reward"] <= 0.5]

    if correct:
        print(f" Correct (n={len(correct):3d}): mean_conf={statistics.mean(correct):.4f} std={_stdev(correct):.4f}")
    if incorrect:
        print(f" Incorrect (n={len(incorrect):3d}): mean_conf={statistics.mean(incorrect):.4f} std={_stdev(incorrect):.4f}")
    if correct and incorrect:
        diff = abs(statistics.mean(correct) - statistics.mean(incorrect))
        # A separation under 0.05 is treated as "no correctness signal in confidence".
        verdict = "UNIFORM — model NOT differentiating confidence by correctness" if diff < 0.05 else "model differentiates"
        print(f"\n Separation: {diff:.4f} ({verdict})")

    # Quick V4 sanity check: how many scores sit near the default-formula value
    # at conf=0.55 (0.925)?
    v4_pairs = [(s["confidence"], s["verifier_scores"]["V4"])
                for s in samples if "V4" in s["verifier_scores"]]
    if v4_pairs:
        v4_vals = [v for _, v in v4_pairs]
        subheader(f"V4 scores (n={len(v4_vals)} samples with V4)")
        print(f" mean={statistics.mean(v4_vals):.4f} min={min(v4_vals):.4f} max={max(v4_vals):.4f} std={_stdev(v4_vals):.4f}")
        print(f" Expected at conf=0.55: max(0.2, 1-|0.55-0.5|×1.5) = 0.9250")
        near = sum(1 for v in v4_vals if abs(v - 0.925) < 0.05)
        print(f" Near 0.925 (±0.05): {near}/{len(v4_vals)} ({near/len(v4_vals)*100:.1f}%)")
|
|
|
|
| |
| |
| |
|
|
def section_mve_v2_comparison(mve_data, v2_data):
    """Print Section 3: side-by-side calibration/GRPO metrics for MVE vs Full v2.

    Parameters
    ----------
    mve_data : dict | None
        Loaded MVE eval; when None the section prints a skip notice and returns.
    v2_data : dict
        Loaded Full v2 eval.
    """
    header("SECTION 3: MVE vs Full v2 Calibration Comparison")

    if mve_data is None:
        print(" [SKIPPED — MVE data not provided (pass --mve to enable)]")
        return

    mve_cal = mve_data["calibration"]
    v2_cal = v2_data["calibration"]
    mve_grpo = mve_data["grpo"]
    v2_grpo = v2_data["grpo"]

    # Signed gap: positive means accuracy exceeds confidence (underconfidence).
    mve_gap = mve_cal["mean_accuracy"] - mve_cal["mean_confidence"]
    v2_gap = v2_cal["mean_accuracy"] - v2_cal["mean_confidence"]

    fmt = "{:<24} {:>10} {:>10}"
    print()
    print(fmt.format("Metric", "MVE", "Full v2"))
    print("-" * 46)
    print(fmt.format("n_samples", mve_cal["n_samples"], v2_cal["n_samples"]))
    print(fmt.format("mean_reward", f"{mve_grpo['mean_reward']:.4f}", f"{v2_grpo['mean_reward']:.4f}"))
    print(fmt.format("mean_confidence", f"{mve_cal['mean_confidence']:.4f}", f"{v2_cal['mean_confidence']:.4f}"))
    print(fmt.format("mean_accuracy", f"{mve_cal['mean_accuracy']:.4f}", f"{v2_cal['mean_accuracy']:.4f}"))
    print(fmt.format("conf_acc_gap (acc-conf)", f"{mve_gap:.4f}", f"{v2_gap:.4f}"))
    print(fmt.format("ECE", f"{mve_cal['ece']:.4f}", f"{v2_cal['ece']:.4f}"))
    print(fmt.format("brier_score", f"{mve_cal['brier_score']:.4f}", f"{v2_cal['brier_score']:.4f}"))
    print(fmt.format("overconfidence_rate", f"{mve_cal['overconfidence_rate']:.4f}", f"{v2_cal['overconfidence_rate']:.4f}"))
    print(fmt.format("underconfidence_rate", f"{mve_cal['underconfidence_rate']:.4f}", f"{v2_cal['underconfidence_rate']:.4f}"))

    # NOTE(review): the ratio prints below divide by ECE and would raise
    # ZeroDivisionError if either run had ECE == 0 — confirm that cannot occur.
    print(f"\nHypothesis test: conf_acc_gap ≈ ECE (should be ~1.0 if uniformly underconfident)")
    print(f" MVE: gap={mve_gap:.4f} / ECE={mve_cal['ece']:.4f} ratio={mve_gap/mve_cal['ece']:.2f}")
    print(f" Full v2: gap={v2_gap:.4f} / ECE={v2_cal['ece']:.4f} ratio={v2_gap/v2_cal['ece']:.2f}")
    print(f" Gap increased by {v2_gap - mve_gap:+.4f}, ECE increased by {v2_cal['ece'] - mve_cal['ece']:+.4f}")

    # Align the two runs' non-empty bins by bin_lower formatted as "0.x".
    subheader("Reliability bin comparison (non-empty bins)")
    mve_bins = {f"{b['bin_lower']:.1f}": b for b in mve_cal.get("reliability_bins", []) if b["count"] > 0}
    v2_bins = {f"{b['bin_lower']:.1f}": b for b in v2_cal.get("reliability_bins", []) if b["count"] > 0}
    all_keys = sorted(set(list(mve_bins.keys()) + list(v2_bins.keys())), key=float)

    hdr = f"{'Bin':<10} {'MVE_n':>6} {'MVE_acc':>8} {'MVE_err':>8} {'v2_n':>6} {'v2_acc':>8} {'v2_err':>8}"
    print(hdr)
    print("-" * len(hdr))
    for k in all_keys:
        mb = mve_bins.get(k)
        vb = v2_bins.get(k)
        # "--" placeholders when a bin is populated in only one of the two runs.
        ms = f"{mb['count']:>6} {mb['mean_accuracy']:>8.3f} {mb['calibration_error']:>8.3f}" if mb else f"{'--':>6} {'--':>8} {'--':>8}"
        vs = f"{vb['count']:>6} {vb['mean_accuracy']:>8.3f} {vb['calibration_error']:>8.3f}" if vb else f"{'--':>6} {'--':>8} {'--':>8}"
        print(f"[{k},{float(k)+0.1:.1f}){'':<1} {ms} {vs}")
|
|
|
|
| |
| |
| |
|
|
def section_uncertainty_deepdive(samples):
    """Print Section 4: per-sample detail for 'uncertainty'-type questions.

    Parameters
    ----------
    samples : list[dict]
        Per-sample records; filtered here by substring match on "question_type".
    """
    header("SECTION 4: Uncertainty Questions Deep-Dive")

    # Substring match so variant labels containing "uncertainty" are included too.
    unc = [s for s in samples if "uncertainty" in s.get("question_type", "").lower()]

    if not unc:
        print(" [No uncertainty-type samples found]")
        # Show the question_type values that DO exist (most frequent first)
        # to help debug the filter.
        qt_counts = Counter(s.get("question_type", "?") for s in samples)
        print(f" All question_type values: {dict(sorted(qt_counts.items(), key=lambda x: -x[1]))}")
        return

    n = len(unc)
    rewards = [s["total_reward"] for s in unc]
    print(f"\nUncertainty samples: n={n}")
    print(f"mean_reward={statistics.mean(rewards):.4f} min={min(rewards):.4f} max={max(rewards):.4f} std={_stdev(rewards):.4f}")

    subheader("Reward distribution")
    # Top bucket's edge is 1.01 so a reward of exactly 1.0 is counted.
    buckets = [(0.0, 0.2), (0.2, 0.4), (0.4, 0.6), (0.6, 0.8), (0.8, 1.01)]
    for lo, hi in buckets:
        cnt = sum(1 for r in rewards if lo <= r < hi)
        bar = "#" * cnt
        print(f" [{lo:.1f}, {hi:.1f}) {cnt:3d} {bar}")

    subheader("Per-sample details")
    col = "{:>4} {:>8} {:>6} {:<12} {:>7} {}"
    print(col.format("idx", "reward", "conf", "stated", "V4", "prompt[:70]"))
    print("-" * 115)
    for i, s in enumerate(unc):
        v4 = s["verifier_scores"].get("V4")
        v4_str = f"{v4:.3f}" if v4 is not None else " N/A"
        # Flatten newlines so each prompt stays on a single table row.
        prompt_trunc = s["prompt"][:70].replace("\n", " ")
        print(col.format(
            i, f"{s['total_reward']:.4f}", f"{s['confidence']:.3f}",
            s.get("confidence_stated", "?"), v4_str, prompt_trunc,
        ))

    subheader("confidence_stated breakdown for uncertainty samples")
    for cat, cnt in sorted(Counter(s.get("confidence_stated", "?") for s in unc).items()):
        print(f" {cat:<14}: {cnt}")
|
|
|
|
| |
| |
| |
|
|
def section_direction_analysis(samples):
    """Print Section 5: reward distribution for 'direction'-type questions.

    Parameters
    ----------
    samples : list[dict]
        Per-sample records; filtered here by substring match on "question_type".
    """
    header("SECTION 5: Direction Questions Analysis")

    # Substring match so variant labels containing "direction" are included too.
    dir_samples = [s for s in samples if "direction" in s.get("question_type", "").lower()]

    if not dir_samples:
        print(" [No direction-type samples found]")
        # Show the question_type values that DO exist (most frequent first).
        qt_counts = Counter(s.get("question_type", "?") for s in samples)
        print(f" All question_type values: {dict(sorted(qt_counts.items(), key=lambda x: -x[1]))}")
        return

    n = len(dir_samples)
    rewards = [s["total_reward"] for s in dir_samples]
    print(f"\nDirection samples: n={n}")
    print(f"mean_reward={statistics.mean(rewards):.4f} std={_stdev(rewards):.4f} min={min(rewards):.4f} max={max(rewards):.4f}")

    subheader("Reward distribution (bimodal check)")
    # Top bucket's edge is 1.01 so a reward of exactly 1.0 is counted.
    buckets = [(0.0, 0.2), (0.2, 0.4), (0.4, 0.6), (0.6, 0.8), (0.8, 1.01)]
    for lo, hi in buckets:
        cnt = sum(1 for r in rewards if lo <= r < hi)
        bar = "#" * cnt
        pct = cnt / n * 100
        print(f" [{lo:.1f}, {hi:.1f}) {cnt:4d} ({pct:5.1f}%) {bar}")

    # Bimodality heuristic: >70% of mass in the two extreme buckets means the
    # model is essentially either right or wrong about the direction.
    low = sum(1 for r in rewards if r < 0.2)
    high = sum(1 for r in rewards if r >= 0.8)
    print(f"\n Extreme buckets: low(<0.2)={low} high(≥0.8)={high} bimodal_frac={((low+high)/n*100):.1f}%")
    if (low + high) / n > 0.7:
        print(" => BIMODAL distribution confirmed (correct/wrong direction split)")
    else:
        print(" => Distribution NOT strongly bimodal (v2 smoothing may be working)")

    subheader("By tissue")
    tissue_groups = defaultdict(list)
    for s in dir_samples:
        tissue_groups[s.get("tissue", "unknown")].append(s["total_reward"])
    for tissue, rs in sorted(tissue_groups.items()):
        print(f" {tissue:<20}: n={len(rs):4d} mean={statistics.mean(rs):.4f}")

    subheader("By source")
    source_groups = defaultdict(list)
    for s in dir_samples:
        source_groups[s.get("source", "unknown")].append(s["total_reward"])
    for src, rs in sorted(source_groups.items()):
        # Source names are truncated to 35 characters for column alignment.
        print(f" {src[:35]:<35}: n={len(rs):4d} mean={statistics.mean(rs):.4f}")
|
|
|
|
| |
| |
| |
|
|
def section_v4_analysis(samples):
    """Print Section 6: compare actual V4 scores against the default V4 formula.

    The default formula referenced throughout is
    ``score = max(0.2, 1.0 - |conf - 0.5| * 1.5)``, which equals 0.925 at
    conf = 0.55.

    Parameters
    ----------
    samples : list[dict]
        Per-sample records with "confidence", "total_reward" and a
        "verifier_scores" dict that may contain a "V4" entry.
    """
    header("SECTION 6: V4 Score Analysis")

    # Collect (confidence, V4 score, total reward) for samples that have V4.
    v4_samples = [
        (s["confidence"], s["verifier_scores"]["V4"], s["total_reward"])
        for s in samples if "V4" in s["verifier_scores"]
    ]
    n_total = len(samples)
    n_v4 = len(v4_samples)
    n_na = n_total - n_v4

    print(f"\nV4 present: {n_v4}/{n_total} | Missing/N/A: {n_na}")

    if not v4_samples:
        print(" [No V4 scores found in verifier_scores]")
        # List which verifier keys exist at all, to aid debugging.
        all_verifiers = set()
        for s in samples:
            all_verifiers.update(s.get("verifier_scores", {}).keys())
        print(f" Verifiers present: {sorted(all_verifiers)}")
        return

    v4_vals = [v for _, v, _ in v4_samples]
    # NOTE(review): confs_v4 is currently unused in this function.
    confs_v4 = [c for c, _, _ in v4_samples]

    print(f"V4 score stats: mean={statistics.mean(v4_vals):.4f} min={min(v4_vals):.4f}"
          f" max={max(v4_vals):.4f} std={_stdev(v4_vals):.4f}")
    print(f"Expected for conf=0.55: max(0.2, 1.0 - |0.55-0.5|×1.5) = 0.9250")

    subheader("V4 score histogram")
    # Top bucket's edge is 1.01 so a score of exactly 1.0 is counted.
    buckets = [(0.0, 0.2), (0.2, 0.4), (0.4, 0.6), (0.6, 0.8), (0.8, 1.01)]
    for lo, hi in buckets:
        cnt = sum(1 for v in v4_vals if lo <= v < hi)
        bar = "#" * cnt
        print(f" [{lo:.1f}, {hi:.1f}) {cnt:4d} ({cnt/n_v4*100:5.1f}%) {bar}")

    # Does V4 separate correct from incorrect completions?
    subheader("Mean V4: correct vs incorrect (threshold: total_reward > 0.5)")
    correct_v4 = [v for _, v, r in v4_samples if r > 0.5]
    incorrect_v4 = [v for _, v, r in v4_samples if r <= 0.5]
    if correct_v4:
        print(f" Correct (n={len(correct_v4):3d}): mean_V4={statistics.mean(correct_v4):.4f} std={_stdev(correct_v4):.4f}")
    if incorrect_v4:
        print(f" Incorrect (n={len(incorrect_v4):3d}): mean_V4={statistics.mean(incorrect_v4):.4f} std={_stdev(incorrect_v4):.4f}")
    if correct_v4 and incorrect_v4:
        sep = abs(statistics.mean(correct_v4) - statistics.mean(incorrect_v4))
        print(f" Separation: {sep:.4f} {'(V4 not discriminating)' if sep < 0.05 else '(V4 discriminating)'}")

    # Group samples by confidence rounded to one decimal and compare the group's
    # actual mean V4 score against the default-formula prediction.
    subheader("Confidence → mean V4 scatter (grouped by rounded conf)")
    conf_bins = defaultdict(list)
    for c, v, _ in v4_samples:
        key = round(c * 10) / 10  # confidence rounded to one decimal place
        conf_bins[key].append(v)

    print(f" {'conf':>5} {'n':>4} {'mean_V4':>8} {'default_formula':>16} {'match?':>7}")
    mismatches = 0
    for k in sorted(conf_bins.keys()):
        vals = conf_bins[k]
        # What the default V4 scoring formula would give at this confidence.
        expected = max(0.2, 1.0 - abs(k - 0.5) * 1.5)
        actual_mean = statistics.mean(vals)
        diff = abs(actual_mean - expected)
        match = "OK" if diff < 0.10 else "MISMATCH"
        if diff >= 0.10:
            mismatches += 1
        print(f" {k:.1f} {len(vals):>4} {actual_mean:>8.4f} {expected:>16.4f} {match:>7}")

    # 0.925 is the default-formula value at conf=0.55 (the observed modal confidence).
    near_expected = sum(1 for v in v4_vals if abs(v - 0.925) < 0.05)
    print(f"\nV4 near 0.925 (default prediction for conf=0.55): {near_expected}/{n_v4} ({near_expected/n_v4*100:.1f}%)")
    if mismatches > 0:
        # NOTE(review): the mode names below ('correct_behavior',
        # 'expected_confidence') refer to verifier internals not visible in this
        # file — confirm against verifiers/uncertainty.py.
        print(f" => {mismatches} confidence group(s): actual V4 ≠ default formula (>0.10 diff)")
        print(" V4 is routing through non-default modes (likely 'correct_behavior' or")
        print(" 'expected_confidence') based on ground_truth structure per question type.")
        print(" V4 IS discriminating correctness — but model still converged to conf≈0.55.")
    elif near_expected / n_v4 > 0.7:
        print(" => CONFIRMED: V4 gives near-constant high scores (conf≈0.55 → V4≈0.925)")
        print(" V4 is NOT penalizing miscalibration. Default scoring incentivizes conf≈0.5.")
|
|
|
|
| |
| |
| |
|
|
def section_recommendations(v2_cal, v2_grpo, v2_samples, mve_cal=None):
    """Print Section 7: root-cause narrative plus Phase 4 recommendation text.

    Parameters
    ----------
    v2_cal : dict
        "calibration" section of the Full v2 eval.
    v2_grpo : dict
        "grpo" section of the Full v2 eval.
    v2_samples : list[dict]
        Per-sample records of the Full v2 eval.
    mve_cal : dict, optional
        MVE "calibration" section; adds an MVE-vs-v2 gap comparison line.

    NOTE(review): this section divides by ``ece`` and takes ``max()`` over
    non-empty bins — it assumes ece > 0 and at least one populated bin;
    confirm upstream guarantees, else ZeroDivisionError/ValueError.
    """
    header("SECTION 7: Root Cause Summary + Phase 4 Recommendations")

    ece = v2_cal["ece"]
    mean_conf = v2_cal["mean_confidence"]
    mean_acc = v2_cal["mean_accuracy"]
    gap = mean_acc - mean_conf

    # Bin with the largest count/N × |acc - conf| contribution to the ECE.
    bins = v2_cal["reliability_bins"]
    n = v2_cal["n_samples"]
    dominant = max(
        (b for b in bins if b["count"] > 0),
        key=lambda b: b["count"] / n * b["calibration_error"],
    )
    dom_contrib = dominant["count"] / n * dominant["calibration_error"]
    dom_pct = dom_contrib / ece * 100
    dom_frac = dominant["count"] / n * 100

    print(f"""
=== ROOT CAUSE DIAGNOSIS ===

1. [CONFIRMED] Confidence uniformity
- {dom_frac:.0f}% of samples ({dominant['count']}/{n}) cluster in bin [{dominant['bin_lower']:.1f}, {dominant['bin_upper']:.1f})
- mean_confidence = {mean_conf:.4f} (near-constant across question types)
- model outputs ~{mean_conf:.2f} confidence regardless of actual correctness

2. [CONFIRMED] Accuracy-confidence gap
- mean_accuracy = {mean_acc:.4f}, mean_confidence = {mean_conf:.4f}
- gap = {gap:.4f} (cf. ECE = {ece:.4f}, ratio={gap/ece:.2f})
- Full v2 has HIGHER accuracy than MVE, but same low confidence → larger gap""")

    if mve_cal:
        mve_gap = mve_cal["mean_accuracy"] - mve_cal["mean_confidence"]
        print(f" - MVE: gap={mve_gap:.4f}, ECE={mve_cal['ece']:.4f}"
              f" → Full v2: gap={gap:.4f}, ECE={ece:.4f} (gap grew by {gap-mve_gap:+.4f})")

    # NOTE(review): float(unc_stats) assumes by_question_type['uncertainty'] is
    # a numeric scalar; if it is a stats dict this raises TypeError — confirm.
    unc_stats = v2_grpo.get("by_question_type", {}).get("uncertainty")
    unc_str = f"{float(unc_stats):.4f}" if unc_stats is not None else "N/A"

    print(f"""
3. [REVISED] V4 scoring — non-default mode dominates""")

    v4_vals = [s["verifier_scores"]["V4"] for s in v2_samples if "V4" in s["verifier_scores"]]
    v4_mean_str = f"{statistics.mean(v4_vals):.4f}" if v4_vals else "N/A"

    print(f""" - Default formula: score = max(0.2, 1.0 - |conf - 0.5| × 1.5)
- At conf=0.55: default formula predicts 0.9250 — but actual V4 mean = {v4_mean_str}
- V4 actual scores do NOT match default formula (3/4 confidence groups are MISMATCH)
- V4 routes through 'correct_behavior' mode for direction questions (correctness-based)
- V4 routes through strict mode for uncertainty questions (near-zero if wrong)
- V4 IS discriminating (correct vs incorrect separation ≈ 0.28) but
insufficient weight (0.20) to shift model's confidence distribution above 0.55

4. [CONFIRMED] ECE dominated by single bin
- Bin [{dominant['bin_lower']:.1f}, {dominant['bin_upper']:.1f}): {dominant['count']} samples ({dom_frac:.0f}%)
- calibration_error = {dominant['calibration_error']:.4f}
- ECE contribution = {dom_contrib:.4f} ({dom_pct:.1f}% of total ECE={ece:.4f})

5. [CONFIRMED] Uncertainty questions near-zero reward
- by_question_type['uncertainty'] mean_reward = {unc_str}
- All 9 uncertainty samples score in [0.0, 0.2) bucket
- Model gives a direction answer (upregulated/suppressed) with medium confidence
instead of expressing "the pathway is not consistently regulated"
- V4 correct_behavior mode penalizes this with very low scores (0.04-0.12)

=== PHASE 4 RECOMMENDATIONS ===

Option A — Modify V4 to reward accuracy-matched confidence (RECOMMENDED)
- New formula: score = max(0.2, 1 - |conf - v1_correct| × 2.0)
where v1_correct ∈ {{0,1}} is V1 binary correctness for the same completion
- Rewards conf matching actual V1 performance per completion
- Eliminates the "always output 0.5" incentive
- Implementation: modify _score_default() in verifiers/uncertainty.py
to accept v1_correct as an additional argument; pass from composite verifier

Option B — Increase V4 weight (simpler, partial fix)
- V1=0.30, V2=0.15, V3=0.10, V4=0.45 (current: V1=0.35, V2=0.30, V3=0.15, V4=0.20)
- More calibration signal per step
- Does NOT fix V4's flawed incentive (still rewards conf≈0.5)

Option C — Add V5 calibration verifier
- V5: compare stated confidence to rolling accuracy bucket (requires estimator)
- Cleanest signal, but more infrastructure

Option D — Post-hoc temperature scaling
- Train temperature T on held-in eval set to rescale logits
- Fast (no GRPO retraining), but doesn't improve factual accuracy
- Stop-gap / diagnostic tool

RECOMMENDED PHASE 4 CONFIG:
- Option A: modify verifiers/uncertainty.py _score_default()
- 2 epochs (4616 steps), keep G=16, beta=0.02
- Verifier weights: V1=0.35, V2=0.30, V3=0.15, V4=0.20 (same; V4 incentive fixed)
- Monitor: ECE target <0.15, reward target >0.70
""")
|
|
|
|
| |
| |
| |
|
|
def _make_reliability_diagram(v2_cal, v2_path, mve_data):
    """Save a reliability diagram PNG next to the v2 results file.

    matplotlib is imported lazily so the rest of the script works without it;
    the caller wraps this call in try/except ImportError.

    Parameters
    ----------
    v2_cal : dict
        "calibration" section of the Full v2 eval.
    v2_path : str or Path
        Path of the v2 results JSON; the PNG is written to its parent dir.
    mve_data : dict | None
        Optional MVE eval; when present, a second subplot is drawn for it.
    """
    import matplotlib
    matplotlib.use("Agg")  # headless backend — we only write a PNG
    import matplotlib.pyplot as plt

    # One subplot per dataset: always Full v2, plus MVE when provided.
    datasets = [(v2_cal, "Full v2")]
    if mve_data:
        datasets.append((mve_data["calibration"], "MVE"))

    fig, axes = plt.subplots(1, len(datasets), figsize=(6 * len(datasets), 5))
    if len(datasets) == 1:
        axes = [axes]  # plt.subplots returns a bare Axes when ncols == 1

    for ax, (cal, label) in zip(axes, datasets):
        bins = [b for b in cal["reliability_bins"] if b["count"] > 0]
        # NOTE(review): mids is computed but currently unused.
        mids = [(b["bin_lower"] + b["bin_upper"]) / 2 for b in bins]
        mean_acc = [b["mean_accuracy"] for b in bins]
        mean_conf = [b["mean_confidence"] for b in bins]
        counts = [b["count"] for b in bins]

        # Diagonal = perfect calibration; marker area scales with bin population.
        ax.plot([0, 1], [0, 1], "k--", alpha=0.5, label="Perfect calibration")
        ax.scatter(mean_conf, mean_acc, s=[c * 8 for c in counts], alpha=0.7,
                   c="steelblue", zorder=5)
        # Red arrows show each bin's miscalibration: from the diagonal up/down
        # to the observed (conf, acc) point, skipping near-calibrated bins.
        for mc, ma in zip(mean_conf, mean_acc):
            if abs(ma - mc) > 0.02:
                ax.annotate("", xy=(mc, ma), xytext=(mc, mc),
                            arrowprops=dict(arrowstyle="->", color="red", alpha=0.4))
        ax.set_xlabel("Mean confidence")
        ax.set_ylabel("Mean accuracy")
        ax.set_title(f"{label}\nECE={cal['ece']:.4f} mean_conf={cal['mean_confidence']:.3f} mean_acc={cal['mean_accuracy']:.3f}")
        ax.set_xlim(0, 1)
        ax.set_ylim(0, 1)
        ax.legend()

    out_path = Path(v2_path).parent / "reliability_diagram.png"
    plt.tight_layout()
    plt.savefig(out_path, dpi=120)
    print(f"\n[--plots] Saved: {out_path}")
|
|
|
|
| |
| |
| |
|
|
def main():
    """Entry point: load eval JSON(s), sanity-check the stored ECE, print all sections."""
    args = parse_args()

    print(f"Loading v2 results: {args.v2}")
    v2_data = load_results(args.v2)
    v2_samples = v2_data["per_sample"]
    v2_cal = v2_data["calibration"]
    v2_grpo = v2_data["grpo"]

    # MVE is optional; the comparison section prints a skip notice when absent.
    mve_data = None
    if args.mve:
        print(f"Loading MVE results: {args.mve}")
        mve_data = load_results(args.mve)

    print(f"\nv2: N={v2_cal['n_samples']} ECE={v2_cal['ece']:.4f}"
          f" reward={v2_grpo['mean_reward']:.4f}")
    if mve_data:
        mc = mve_data["calibration"]
        mg = mve_data["grpo"]
        print(f"MVE: N={mc['n_samples']} ECE={mc['ece']:.4f}"
              f" reward={mg['mean_reward']:.4f}")

    # Round-trip sanity check: recompute ECE from per-sample data and compare
    # against the stored value (tolerance 0.002).
    recomputed = recompute_ece(v2_samples)
    delta = abs(recomputed - v2_cal["ece"])
    status = "OK" if delta <= 0.002 else "WARNING — mismatch"
    print(f"\nECE round-trip: stored={v2_cal['ece']:.4f} recomputed={recomputed:.4f}"
          f" delta={delta:.4f} [{status}]")

    # Run every analysis section in order.
    section_calibration_decomp(v2_cal, label="Full v2")
    section_confidence_dist(v2_samples, label="Full v2")
    section_mve_v2_comparison(mve_data, v2_data)
    section_uncertainty_deepdive(v2_samples)
    section_direction_analysis(v2_samples)
    section_v4_analysis(v2_samples)
    section_recommendations(v2_cal, v2_grpo, v2_samples, mve_cal=mve_data["calibration"] if mve_data else None)

    # Plot only on request; degrade gracefully when matplotlib is absent.
    if args.plots:
        try:
            _make_reliability_diagram(v2_cal, args.v2, mve_data)
        except ImportError:
            print("\n[--plots] matplotlib not available; skipping reliability diagram")


if __name__ == "__main__":
    main()
|
|