| """Cost-Quality Pareto Frontier analysis. |
| |
| Implements RouterBench NDCH (arxiv:2403.12031) and |
| RouteLLM CPT/APGR (arxiv:2406.18665) metrics. |
| """ |
| import numpy as np |
| from typing import Dict, List, Optional, Tuple |
|
|
|
|
def compute_ndch(points: np.ndarray) -> np.ndarray:
    """Non-Decreasing Convex Hull from RouterBench Sec 3.2.

    Builds the upper frontier of a set of (cost, quality) points: the
    concave upper hull that starts at the cheapest point and ends at the
    cheapest point attaining the maximum quality. Points on the lower side
    of the convex hull and dominated points (some other point is no more
    expensive and at least as good) are never returned.

    Args:
        points: Nx2 array-like of (cost, quality) pairs.

    Returns:
        Kx2 float array of NDCH vertices with strictly increasing cost and
        strictly increasing quality. Empty input yields a (0, 2) array.

    Raises:
        ValueError: if ``points`` is non-empty and not of shape (N, 2).
    """
    pts = np.asarray(points, dtype=float)
    if pts.size == 0:
        return np.empty((0, 2), dtype=float)
    if pts.ndim != 2 or pts.shape[1] != 2:
        raise ValueError("points must be an Nx2 array of (cost, quality) pairs")

    # Cost ascending; for equal cost the best quality comes first so the
    # remaining same-cost points are discarded as dominated below.
    order = np.lexsort((-pts[:, 1], pts[:, 0]))

    hull = []
    best_quality = -np.inf
    for cost, quality in pts[order]:
        # Dominated: a cheaper (or equally cheap) point is already as good.
        if quality <= best_quality:
            continue
        best_quality = quality
        # Monotone-chain upper hull: drop the previous vertex while it lies
        # on or below the segment from its predecessor to the new point
        # (cross product >= 0 means a non-clockwise turn).
        while len(hull) >= 2:
            (ax, ay), (bx, by) = hull[-2], hull[-1]
            cross = (bx - ax) * (quality - ay) - (by - ay) * (cost - ax)
            if cross < 0:
                break
            hull.pop()
        hull.append((cost, quality))

    return np.array(hull, dtype=float)
|
|
|
|
def cost_at_quality(ndch: np.ndarray, target_q: float) -> Optional[float]:
    """Find minimum cost on NDCH achieving target_quality.

    Args:
        ndch: Kx2 array-like of (cost, quality) frontier vertices, ordered
            by increasing cost with non-decreasing quality.
        target_q: quality level that must be reached.

    Returns:
        The cost of the first vertex when it already meets ``target_q``;
        otherwise the cost linearly interpolated along the first frontier
        segment whose quality range contains ``target_q``. ``None`` when
        the frontier is empty or never reaches ``target_q``.
    """
    verts = np.asarray(ndch, dtype=float)
    if verts.size == 0:
        return None
    verts = verts.reshape(-1, 2)

    # The cheapest vertex already achieves the target (this also covers a
    # single-vertex frontier, which has no segment to interpolate on).
    if target_q <= verts[0, 1]:
        return float(verts[0, 0])

    for (c1, q1), (c2, q2) in zip(verts[:-1], verts[1:]):
        if q1 <= target_q <= q2:
            if q2 == q1:
                # Flat segment: avoid 0/0 and take its cheaper end.
                return float(c1)
            return float(c1 + (c2 - c1) * (target_q - q1) / (q2 - q1))
    return None
|
|
|
|
def cost_savings_at_iso(
    ndch_opt: np.ndarray, ndch_base: np.ndarray, target_q: float
) -> Optional[float]:
    """Cost reduction at iso-quality: 1 - c_opt/c_base.

    Args:
        ndch_opt: frontier vertices of the optimized policy set.
        ndch_base: frontier vertices of the baseline policy set.
        target_q: quality level at which both frontiers are compared.

    Returns:
        Fractional saving of the optimized frontier relative to the
        baseline at ``target_q`` (negative when the optimized frontier is
        more expensive). ``None`` when ``cost_at_quality`` returns ``None``
        for either frontier, or when the baseline cost is zero and the
        ratio is therefore undefined.
    """
    c_opt = cost_at_quality(ndch_opt, target_q)
    c_base = cost_at_quality(ndch_base, target_q)
    if c_opt is None or c_base is None:
        return None
    # A zero-cost baseline makes the relative saving undefined; report it
    # as unavailable instead of dividing by zero.
    if c_base == 0:
        return None
    return float(1.0 - c_opt / c_base)
|
|
|
|
def compute_aiq(ndch, c_min, c_max):
    """Average Improvement in Quality (RouterBench).

    Averages the frontier quality over the cost interval
    ``[c_min, c_max]`` by sampling 100 evenly spaced costs, linearly
    interpolating the frontier at each, and integrating with the
    trapezoidal rule.

    Args:
        ndch: Kx2 array-like of (cost, quality) vertices with increasing
            cost (required by ``np.interp``).
        c_min: lower bound of the cost interval.
        c_max: upper bound of the cost interval.

    Returns:
        Mean frontier quality over the interval as a float. When
        ``c_min == c_max`` the interpolated quality at that single cost is
        returned.
    """
    verts = np.asarray(ndch, dtype=float)
    if c_max == c_min:
        # Zero-width interval: the average is the point value (avoids 0/0).
        return float(np.interp(c_min, verts[:, 0], verts[:, 1]))

    costs = np.linspace(c_min, c_max, 100)
    quals = np.interp(costs, verts[:, 0], verts[:, 1])
    # NumPy 2.x renames trapz to trapezoid; use whichever is available.
    integrate = getattr(np, "trapezoid", None) or np.trapz
    return float(integrate(quals, costs) / (c_max - c_min))
|
|
|
|
def compute_apgr(ndch_router, ndch_random, c_max):
    """Average Performance Gap Recovery (RouteLLM).

    APGR = integral of PGR over cost fraction [0,1].
    PGR(c) = (q_router(c) - q_weak) / (q_strong - q_weak)

    ``q_weak`` and ``q_strong`` are the qualities of the first and last
    vertices of ``ndch_random``. The gap in the denominator is floored at
    1e-6 so a degenerate baseline does not divide by zero.

    Args:
        ndch_router: Kx2 array-like of (cost, quality) vertices for the
            router, with increasing cost (required by ``np.interp``).
        ndch_random: Mx2 array-like of (cost, quality) vertices for the
            random baseline; only its first and last qualities are used.
        c_max: upper end of the cost range; PGR is sampled at 100 evenly
            spaced costs in ``[0, c_max]``.

    Returns:
        Mean PGR over ``[0, c_max]`` as a float. When ``c_max == 0`` the
        PGR at cost 0 is returned.
    """
    router = np.asarray(ndch_router, dtype=float)
    baseline = np.asarray(ndch_random, dtype=float)

    q_weak = baseline[0, 1]
    q_strong = baseline[-1, 1]
    gap = max(q_strong - q_weak, 1e-6)

    if c_max == 0:
        # Zero-width cost range: the average is the point value.
        q_at_zero = np.interp(0.0, router[:, 0], router[:, 1])
        return float((q_at_zero - q_weak) / gap)

    costs = np.linspace(0, c_max, 100)
    q_r = np.interp(costs, router[:, 0], router[:, 1])
    pgr = (q_r - q_weak) / gap
    # NumPy 2.x renames trapz to trapezoid; use whichever is available.
    integrate = getattr(np, "trapezoid", None) or np.trapz
    return float(integrate(pgr, costs) / c_max)
|
|
|
|
def build_frontier_report(
    policies: Dict[str, Tuple[float, float]],
    quality_levels: Optional[List[float]] = None,
):
    """Build full Pareto frontier report.

    Args:
        policies: {name: (success_rate, cost_per_task)}
        quality_levels: quality targets for which the frontier cost is
            reported. Defaults to
            ``[0.65, 0.70, 0.75, 0.78, 0.80, 0.85, 0.87]``.

    Returns:
        Dict with keys:
          - ``"policies"``: ``{name: {"success": q, "cost": c}}``
          - ``"ndch_vertices"``: frontier vertices as nested lists of
            (cost, quality)
          - ``"aiq"``: average frontier quality over the observed cost
            range, rounded to 4 decimals; ``None`` when ``policies`` is
            empty
          - ``"frontier_quality_levels"``: ``{"q=0.80": cost, ...}`` for
            every target where ``cost_at_quality`` returns a cost
    """
    if quality_levels is None:
        quality_levels = [0.65, 0.70, 0.75, 0.78, 0.80, 0.85, 0.87]

    report = {
        "policies": {
            name: {"success": q, "cost": c} for name, (q, c) in policies.items()
        },
        "ndch_vertices": [],
        "aiq": None,
        "frontier_quality_levels": {},
    }
    # Nothing to analyse: return the empty skeleton instead of failing on
    # column indexing of an empty array.
    if not policies:
        return report

    # Policies are (quality, cost); the frontier helpers expect (cost, quality).
    points = np.array([(c, q) for q, c in policies.values()], dtype=float)

    ndch = compute_ndch(points)
    c_min, c_max = points[:, 0].min(), points[:, 0].max()
    if c_max > c_min:
        aiq = compute_aiq(ndch, c_min, c_max)
    else:
        # All policies share one cost, so the cost interval has zero width;
        # use the best frontier quality rather than averaging over it.
        aiq = float(ndch[:, 1].max())

    report["ndch_vertices"] = ndch.tolist()
    report["aiq"] = round(float(aiq), 4)

    for q_target in quality_levels:
        cost = cost_at_quality(ndch, q_target)
        if cost is not None:
            report["frontier_quality_levels"][f"q={q_target:.2f}"] = round(
                float(cost), 4
            )

    return report
|
|