narcolepticchicken's picture
Upload aco/pareto.py
ee673f9 verified
"""Cost-Quality Pareto Frontier analysis.
Implements RouterBench NDCH (arxiv:2403.12031) and
RouteLLM CPT/APGR (arxiv:2406.18665) metrics.
"""
import numpy as np
from typing import Dict, List, Optional, Tuple
def compute_ndch(points: np.ndarray) -> np.ndarray:
"""Non-Decreasing Convex Hull from RouterBench Sec 3.2.
Args:
points: Nx2 array of (cost, quality) pairs
Returns:
NDCH vertices sorted by cost
"""
from scipy.spatial import ConvexHull
if len(points) < 3:
idx = np.argsort(points[:, 0])
return points[idx]
hull = ConvexHull(points)
hull_pts = points[hull.vertices]
hull_pts = hull_pts[hull_pts[:, 0].argsort()]
ndch = [hull_pts[0]]
for pt in hull_pts[1:]:
if pt[1] >= ndch[-1][1]:
ndch.append(pt)
return np.array(ndch)
def cost_at_quality(ndch: np.ndarray, target_q: float) -> Optional[float]:
"""Find minimum cost on NDCH achieving target_quality."""
for i in range(len(ndch) - 1):
q1, q2 = ndch[i, 1], ndch[i+1, 1]
if q1 <= target_q <= q2:
c1, c2 = ndch[i, 0], ndch[i+1, 0]
return c1 + (c2 - c1) * (target_q - q1) / (q2 - q1)
return None
def cost_savings_at_iso(ndch_opt, ndch_base, target_q):
"""Cost reduction at iso-quality: 1 - c_opt/c_base."""
c_opt = cost_at_quality(ndch_opt, target_q)
c_base = cost_at_quality(ndch_base, target_q)
if c_opt is None or c_base is None:
return None
return 1.0 - c_opt / c_base
def compute_aiq(ndch, c_min, c_max):
"""Average Improvement in Quality (RouterBench)."""
costs = np.linspace(c_min, c_max, 100)
quals = np.interp(costs, ndch[:, 0], ndch[:, 1])
return float(np.trapz(quals, costs) / (c_max - c_min))
def compute_apgr(ndch_router, ndch_random, c_max):
"""Average Performance Gap Recovery (RouteLLM).
APGR = integral of PGR over cost fraction [0,1].
PGR(c) = (q_router(c) - q_weak(c)) / (q_strong(c) - q_weak(c))
"""
costs = np.linspace(0, c_max, 100)
q_r = np.interp(costs, ndch_router[:, 0], ndch_router[:, 1])
q_rand = np.interp(costs, ndch_random[:, 0], ndch_random[:, 1])
q_weak = ndch_random[0, 1]
q_strong = ndch_random[-1, 1]
pgr = (q_r - q_weak) / max(q_strong - q_weak, 1e-6)
return float(np.trapz(pgr, costs) / c_max)
def build_frontier_report(policies: Dict[str, Tuple[float, float]]):
"""Build full Pareto frontier report.
Args:
policies: {name: (success_rate, cost_per_task)}
"""
points = np.array([(c, q) for q, c in policies.values()])
names = list(policies.keys())
ndch = compute_ndch(points)
c_min, c_max = points[:, 0].min(), points[:, 0].max()
aiq = compute_aiq(ndch, c_min, c_max)
report = {
"policies": {n: {"success": q, "cost": c} for n, (q, c) in policies.items()},
"ndch_vertices": ndch.tolist(),
"aiq": round(aiq, 4),
"frontier_quality_levels": {},
}
for q_target in [0.65, 0.70, 0.75, 0.78, 0.80, 0.85, 0.87]:
c = cost_at_quality(ndch, q_target)
if c is not None:
report["frontier_quality_levels"][f"q={q_target:.2f}"] = round(c, 4)
return report