"""Cost-Quality Pareto Frontier analysis. Implements RouterBench NDCH (arxiv:2403.12031) and RouteLLM CPT/APGR (arxiv:2406.18665) metrics. """ import numpy as np from typing import Dict, List, Optional, Tuple def compute_ndch(points: np.ndarray) -> np.ndarray: """Non-Decreasing Convex Hull from RouterBench Sec 3.2. Args: points: Nx2 array of (cost, quality) pairs Returns: NDCH vertices sorted by cost """ from scipy.spatial import ConvexHull if len(points) < 3: idx = np.argsort(points[:, 0]) return points[idx] hull = ConvexHull(points) hull_pts = points[hull.vertices] hull_pts = hull_pts[hull_pts[:, 0].argsort()] ndch = [hull_pts[0]] for pt in hull_pts[1:]: if pt[1] >= ndch[-1][1]: ndch.append(pt) return np.array(ndch) def cost_at_quality(ndch: np.ndarray, target_q: float) -> Optional[float]: """Find minimum cost on NDCH achieving target_quality.""" for i in range(len(ndch) - 1): q1, q2 = ndch[i, 1], ndch[i+1, 1] if q1 <= target_q <= q2: c1, c2 = ndch[i, 0], ndch[i+1, 0] return c1 + (c2 - c1) * (target_q - q1) / (q2 - q1) return None def cost_savings_at_iso(ndch_opt, ndch_base, target_q): """Cost reduction at iso-quality: 1 - c_opt/c_base.""" c_opt = cost_at_quality(ndch_opt, target_q) c_base = cost_at_quality(ndch_base, target_q) if c_opt is None or c_base is None: return None return 1.0 - c_opt / c_base def compute_aiq(ndch, c_min, c_max): """Average Improvement in Quality (RouterBench).""" costs = np.linspace(c_min, c_max, 100) quals = np.interp(costs, ndch[:, 0], ndch[:, 1]) return float(np.trapz(quals, costs) / (c_max - c_min)) def compute_apgr(ndch_router, ndch_random, c_max): """Average Performance Gap Recovery (RouteLLM). APGR = integral of PGR over cost fraction [0,1]. PGR(c) = (q_router(c) - q_weak(c)) / (q_strong(c) - q_weak(c)) """ costs = np.linspace(0, c_max, 100) q_r = np.interp(costs, ndch_router[:, 0], ndch_router[:, 1]) q_rand = np.interp(costs, ndch_random[:, 0], ndch_random[:, 1]) q_weak = ndch_random[0, 1] q_strong = ndch_random[-1, 1] pgr = (q_r - q_weak) / max(q_strong - q_weak, 1e-6) return float(np.trapz(pgr, costs) / c_max) def build_frontier_report(policies: Dict[str, Tuple[float, float]]): """Build full Pareto frontier report. Args: policies: {name: (success_rate, cost_per_task)} """ points = np.array([(c, q) for q, c in policies.values()]) names = list(policies.keys()) ndch = compute_ndch(points) c_min, c_max = points[:, 0].min(), points[:, 0].max() aiq = compute_aiq(ndch, c_min, c_max) report = { "policies": {n: {"success": q, "cost": c} for n, (q, c) in policies.items()}, "ndch_vertices": ndch.tolist(), "aiq": round(aiq, 4), "frontier_quality_levels": {}, } for q_target in [0.65, 0.70, 0.75, 0.78, 0.80, 0.85, 0.87]: c = cost_at_quality(ndch, q_target) if c is not None: report["frontier_quality_levels"][f"q={q_target:.2f}"] = round(c, 4) return report