"""Offline aggregator: reads turns.jsonl, evals.jsonl, ratings.jsonl and prints per-persona metrics. Run: python -m backend.evals.aggregate """ import argparse import json import statistics import sys from collections import defaultdict from pathlib import Path from backend.config.settings import settings # Mean pairwise cosine distance below this means the picker showed near-paraphrases. _DIVERSITY_FLOOR = 0.10 def _load(path: Path) -> list[dict]: if not path.exists(): return [] out = [] skipped = 0 with open(path, encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue try: out.append(json.loads(line)) except json.JSONDecodeError: skipped += 1 if skipped: print( f"[aggregate] skipped {skipped} malformed lines in {path}", file=sys.stderr, ) return out def _quantile(values: list[float], q: float) -> float: if not values: return 0.0 if len(values) == 1: return values[0] idx = max(0, min(len(values) - 1, int(round(q * (len(values) - 1))))) return sorted(values)[idx] def _fmt_ms(s: float) -> str: return f"{s * 1000:.0f}ms" def report_latency(turns: list[dict]) -> None: print("\n=== Communication Efficiency (latency) ===") by_group: dict[tuple[str, str], list[float]] = defaultdict(list) for t in turns: key = (t.get("user_id", "?"), t.get("llm_tier", "?")) by_group[key].append(t.get("latency", {}).get("t_total", 0.0)) slo = settings.slo_target_s print(f"SLO target: < {slo}s") print( f"{'user_id':<18} {'tier':<10} {'n':>5} {'p50':>8} {'p95':>8} {'p99':>8} {'pass%':>7}" ) for (uid, tier), lats in sorted(by_group.items()): if not lats: continue p50 = _quantile(lats, 0.5) p95 = _quantile(lats, 0.95) p99 = _quantile(lats, 0.99) passed = sum(1 for x in lats if x < slo) / len(lats) * 100 print( f"{uid:<18} {tier:<10} {len(lats):>5} " f"{_fmt_ms(p50):>8} {_fmt_ms(p95):>8} {_fmt_ms(p99):>8} {passed:>6.1f}%" ) def report_faithfulness(evals: list[dict]) -> None: print("\n=== Factual Faithfulness ===") scored = [e for e in evals if not e.get("no_evidence")] if not scored: print("(no turns with retrieved evidence)") return by_user: dict[str, list[dict]] = defaultdict(list) for e in scored: by_user[e.get("user_id", "?")].append(e) print(f"{'user_id':<18} {'n':>5} {'groundedness':>14} {'hallucination':>14}") for uid, rows in sorted(by_user.items()): g = statistics.mean(r["groundedness"] for r in rows) h = statistics.mean(r["hallucination_rate"] for r in rows) print(f"{uid:<18} {len(rows):>5} {g:>13.2%} {h:>13.2%}") def _mean_nonzero(rows: list[dict], key: str) -> tuple[float, float]: # Coverage % undercounts real zeros (a genuinely 0.0-aligned response looks # identical to one where the signal was absent). Fixable by serializing # null for absent signals in compute_multimodal_alignment. vals = [float(r.get(key, 0.0)) for r in rows] nonzero = [v for v in vals if v > 0] if not nonzero: return 0.0, 0.0 return statistics.mean(nonzero), len(nonzero) / len(vals) def _fmt_mean_cov(rows: list[dict], key: str) -> str: mean, cov = _mean_nonzero(rows, key) return f"{mean:>5.0%}|{cov:>5.0%}" def report_multimodal(evals: list[dict]) -> None: print("\n=== Multimodal Alignment (mean among non-zero | coverage) ===") if not evals: print("(no evals logged)") return by_user: dict[str, list[dict]] = defaultdict(list) for e in evals: by_user[e.get("user_id", "?")].append(e) print(f"{'user_id':<18} {'n':>5} {'affect':>16} {'gesture':>16} {'gaze':>16}") for uid, rows in sorted(by_user.items()): print( f"{uid:<18} {len(rows):>5} " f"{_fmt_mean_cov(rows, 'affect_alignment'):>16} " f"{_fmt_mean_cov(rows, 'gesture_alignment'):>16} " f"{_fmt_mean_cov(rows, 'gaze_alignment'):>16}" ) def report_authenticity(ratings: list[dict]) -> None: print("\n=== Perceived Authenticity (Likert 1-5) ===") by_user: dict[str, list[int]] = defaultdict(list) for r in ratings: raw = r.get("authenticity") try: score = int(raw) except (TypeError, ValueError): continue if not 1 <= score <= 5: continue by_user[r.get("user_id", "?")].append(score) if not by_user: print("(no valid ratings logged yet)") return print(f"{'user_id':<18} {'n':>5} {'mean':>6} {'dist (1..5)':>22}") for uid, scores in sorted(by_user.items()): mean = statistics.mean(scores) dist = [scores.count(i) for i in range(1, 6)] dist_str = "/".join(str(x) for x in dist) print(f"{uid:<18} {len(scores):>5} {mean:>6.2f} {dist_str:>22}") def report_picker(turns: list[dict], picks: list[dict], evals: list[dict]) -> None: """Picker behaviour: pick rate, regenerate rate, strategy win rate, and whether the user's pick beat candidate 0 on grounded/relevance. Sources: - turns.jsonl one row per turn, includes `candidates` and `n_candidates` - picks.jsonl one row per /chat/pick — strategy, picked_idx, run_id - evals.jsonl candidates_eval[] with per-candidate grounded + relevance """ print("\n=== Picker Behaviour ===") multi = [t for t in turns if (t.get("n_candidates") or 0) >= 2] if not multi: print( "(no multi-candidate turns logged — older format or single-candidate runs)" ) return picks_by_run = {p["run_id"]: p for p in picks if p.get("run_id")} evals_by_run = {e["run_id"]: e for e in evals if e.get("run_id")} n_multi = len(multi) n_picked = sum(1 for t in multi if t["run_id"] in picks_by_run) # A (user_id, turn_id) seen more than once means the planner re-ran for # the same partner query — that's a regenerate. The denominator is the # number of distinct (user, turn) conversations that had at least one # multi-candidate run, not the raw row count. seen: dict[tuple[str, int], int] = defaultdict(int) for t in multi: seen[(t.get("user_id", "?"), t.get("turn_id", -1))] += 1 n_regenerated_turns = sum(1 for c in seen.values() if c > 1) n_distinct_turns = max(1, len(seen)) print( f"multi-candidate turns: {n_multi} ({n_distinct_turns} distinct) " f"pick rate: {n_picked / n_multi:.0%} " f"regenerate rate: {n_regenerated_turns / n_distinct_turns:.0%} " f"(% of distinct turns that re-ran)" ) # Strategy win rate — among multi-candidate picks only, how often does # each strategy win. Picks on single-candidate turns aren't a real "win" # (no alternative to lose to) so we filter them out. multi_run_ids = {t["run_id"] for t in multi} strategy_count: dict[str, int] = defaultdict(int) for run_id, p in picks_by_run.items(): if run_id in multi_run_ids: strategy_count[p.get("strategy", "unknown")] += 1 if strategy_count: total = sum(strategy_count.values()) print(f"\nStrategy win rate (n={total} picks):") print(f" {'strategy':<16} {'picks':>6} {'pct':>6}") for s, n in sorted(strategy_count.items(), key=lambda x: -x[1]): print(f" {s:<16} {n:>6} {n / total:>5.0%}") # Did the picker beat candidate 0? Only meaningful when we have per-candidate # eval scores AND the user picked a non-zero index. A "win" = picked # candidate scored strictly higher on the metric than candidate 0. head_to_head = [] for run_id, pick in picks_by_run.items(): ev = evals_by_run.get(run_id) if not ev or not ev.get("candidates_eval"): continue cands = ev["candidates_eval"] if len(cands) < 2: continue picked_idx = pick.get("picked_idx", 0) if picked_idx == 0 or picked_idx >= len(cands): continue head_to_head.append( { "picked_grounded": cands[picked_idx]["groundedness"], "cand0_grounded": cands[0]["groundedness"], "picked_relevance": cands[picked_idx].get("relevance", 0.0), "cand0_relevance": cands[0].get("relevance", 0.0), } ) if head_to_head: n = len(head_to_head) beat_grounded = sum( 1 for h in head_to_head if h["picked_grounded"] > h["cand0_grounded"] ) tied_grounded = sum( 1 for h in head_to_head if h["picked_grounded"] == h["cand0_grounded"] ) beat_rel = sum( 1 for h in head_to_head if h["picked_relevance"] > h["cand0_relevance"] ) print(f"\nDid picker beat candidate 0? (n={n} picks where picked_idx > 0)") print( f" groundedness: picker > cand0 = {beat_grounded}/{n} ({beat_grounded / n:.0%}), " f"tied = {tied_grounded}/{n}" ) print(f" relevance: picker > cand0 = {beat_rel}/{n} ({beat_rel / n:.0%})") else: print( "\n(no picks of candidate 1+ with per-candidate eval data — can't measure picker quality yet)" ) # Diversity: among multi-candidate turns with eval data, how often is the # picker showing near-paraphrases (the "aloha" problem)? div_scored = [ ev for ev in evals_by_run.values() if ev.get("n_candidates", 0) >= 2 and "candidate_diversity" in ev ] if div_scored: diversities = [float(e["candidate_diversity"]) for e in div_scored] low = sum(1 for d in diversities if d < _DIVERSITY_FLOOR) print( f"\nCandidate diversity (n={len(div_scored)} turns): " f"mean={statistics.mean(diversities):.2f} " f"low (<{_DIVERSITY_FLOOR:.2f}): {low}/{len(div_scored)} ({low / len(div_scored):.0%})" ) def main() -> None: parser = argparse.ArgumentParser(description="Aggregate AAC eval metrics") parser.add_argument("--logs", type=Path, default=settings.logs_dir) args = parser.parse_args() turns = _load(args.logs / "turns.jsonl") evals = _load(args.logs / "evals.jsonl") ratings = _load(args.logs / "ratings.jsonl") picks = _load(args.logs / "picks.jsonl") print( f"Loaded: {len(turns)} turns, {len(evals)} evals, " f"{len(picks)} picks, {len(ratings)} ratings" ) report_latency(turns) report_faithfulness(evals) report_multimodal(evals) report_picker(turns, picks, evals) report_authenticity(ratings) if __name__ == "__main__": main()