File size: 3,230 Bytes
ee673f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
"""Cost-Quality Pareto Frontier analysis.

Implements RouterBench NDCH (arxiv:2403.12031) and
RouteLLM CPT/APGR (arxiv:2406.18665) metrics.
"""
import numpy as np
from typing import Dict, List, Optional, Tuple


def compute_ndch(points: np.ndarray) -> np.ndarray:
    """Non-Decreasing Convex Hull from RouterBench Sec 3.2.
    
    Args:
        points: Nx2 array of (cost, quality) pairs
    Returns:
        NDCH vertices sorted by cost
    """
    from scipy.spatial import ConvexHull
    if len(points) < 3:
        idx = np.argsort(points[:, 0])
        return points[idx]
    hull = ConvexHull(points)
    hull_pts = points[hull.vertices]
    hull_pts = hull_pts[hull_pts[:, 0].argsort()]
    ndch = [hull_pts[0]]
    for pt in hull_pts[1:]:
        if pt[1] >= ndch[-1][1]:
            ndch.append(pt)
    return np.array(ndch)


def cost_at_quality(ndch: np.ndarray, target_q: float) -> Optional[float]:
    """Find minimum cost on NDCH achieving target_quality."""
    for i in range(len(ndch) - 1):
        q1, q2 = ndch[i, 1], ndch[i+1, 1]
        if q1 <= target_q <= q2:
            c1, c2 = ndch[i, 0], ndch[i+1, 0]
            return c1 + (c2 - c1) * (target_q - q1) / (q2 - q1)
    return None


def cost_savings_at_iso(ndch_opt, ndch_base, target_q):
    """Cost reduction at iso-quality: 1 - c_opt/c_base."""
    c_opt = cost_at_quality(ndch_opt, target_q)
    c_base = cost_at_quality(ndch_base, target_q)
    if c_opt is None or c_base is None:
        return None
    return 1.0 - c_opt / c_base


def compute_aiq(ndch, c_min, c_max):
    """Average Improvement in Quality (RouterBench)."""
    costs = np.linspace(c_min, c_max, 100)
    quals = np.interp(costs, ndch[:, 0], ndch[:, 1])
    return float(np.trapz(quals, costs) / (c_max - c_min))


def compute_apgr(ndch_router, ndch_random, c_max):
    """Average Performance Gap Recovery (RouteLLM).
    APGR = integral of PGR over cost fraction [0,1].
    PGR(c) = (q_router(c) - q_weak(c)) / (q_strong(c) - q_weak(c))
    """
    costs = np.linspace(0, c_max, 100)
    q_r = np.interp(costs, ndch_router[:, 0], ndch_router[:, 1])
    q_rand = np.interp(costs, ndch_random[:, 0], ndch_random[:, 1])
    q_weak = ndch_random[0, 1]
    q_strong = ndch_random[-1, 1]
    pgr = (q_r - q_weak) / max(q_strong - q_weak, 1e-6)
    return float(np.trapz(pgr, costs) / c_max)


def build_frontier_report(policies: Dict[str, Tuple[float, float]]):
    """Build full Pareto frontier report.
    
    Args:
        policies: {name: (success_rate, cost_per_task)}
    """
    points = np.array([(c, q) for q, c in policies.values()])
    names = list(policies.keys())
    
    ndch = compute_ndch(points)
    c_min, c_max = points[:, 0].min(), points[:, 0].max()
    aiq = compute_aiq(ndch, c_min, c_max)
    
    report = {
        "policies": {n: {"success": q, "cost": c} for n, (q, c) in policies.items()},
        "ndch_vertices": ndch.tolist(),
        "aiq": round(aiq, 4),
        "frontier_quality_levels": {},
    }
    
    for q_target in [0.65, 0.70, 0.75, 0.78, 0.80, 0.85, 0.87]:
        c = cost_at_quality(ndch, q_target)
        if c is not None:
            report["frontier_quality_levels"][f"q={q_target:.2f}"] = round(c, 4)
    
    return report