| """Conformal calibration for escalation thresholds. |
| |
| Implements RouteNLP-style conformal risk control: |
| P(failure AND no escalation) <= alpha |
| |
| Method: |
| 1. Compute nonconformity scores from calibrated P(success) |
| 2. Find conformal quantile threshold |
| 3. Guarantee coverage under exchangeability |
| """ |
| import numpy as np |
| from typing import Dict, List, Optional, Tuple |
|
|
|
|
class ConformalEscalationCalibrator:
    """Calibrate per-tier escalation thresholds from held-out outcomes.

    Based on RouteNLP (arxiv 2604.23577) and Conformal Risk Control
    (Angelopoulos et al., arxiv 2208.02814).

    Target guarantee: P(y=fail AND no_escalation) <= alpha

    For each tier the threshold is a conformal quantile of the calibrated
    P(success) scores of the *failed* calibration examples. A request is
    escalated when its score is strictly below the tier's threshold.
    """

    # Threshold used before calibration and for tiers that were never
    # calibrated.
    DEFAULT_THRESHOLD = 0.65

    def __init__(self, alpha: float = 0.05):
        """Create an uncalibrated calibrator.

        Args:
            alpha: tolerated rate of "failed but not escalated" requests.
        """
        self.alpha = alpha
        self.thresholds: Dict[int, float] = {}
        self.calibrated = False

    def calibrate(
        self,
        psuccess: Dict[int, np.ndarray],
        outcomes: Dict[int, np.ndarray],
    ) -> Dict[int, float]:
        """Calibrate per-tier escalation thresholds.

        Args:
            psuccess: {tier: array of calibrated P(success)}
            outcomes: {tier: array of binary outcomes (1=success, 0=fail)}

        Returns:
            {tier: conformal_threshold}. This is the calibrator's own
            ``thresholds`` dict, which is updated in place; tiers from
            earlier calls that are absent from ``psuccess`` are kept.

        Raises:
            KeyError: if a tier in ``psuccess`` is missing from ``outcomes``.
        """
        for tier in sorted(psuccess):
            scores = np.asarray(psuccess[tier], dtype=float)
            labels = np.asarray(outcomes[tier])

            failed_scores = np.sort(scores[labels == 0])
            n_failed = int(failed_scores.size)

            if n_failed == 0:
                # No failures observed: nothing to take a quantile of, so
                # fall back to the most conservative threshold.
                self.thresholds[tier] = 1.0
                continue

            # Split-conformal rank over the failed scores:
            # ceil((1 - alpha) * (m + 1)) with m = number of failures.
            # (The rank must be an index into the m failed scores; dividing
            # by the sample size first collapses it to 1.)
            rank = int(np.ceil((1 - self.alpha) * (n_failed + 1)))
            # With too few failures the rank exceeds m; clamp to the largest
            # failed score. In that regime the threshold is best-effort
            # rather than a strict finite-sample bound.
            rank = min(rank, n_failed)

            threshold = failed_scores[rank - 1] if rank > 0 else 0.0
            self.thresholds[tier] = float(threshold)

        self.calibrated = True
        return self.thresholds

    def should_escalate(self, tier: int, psuccess: float) -> bool:
        """Decide whether to escalate from this tier.

        Returns True if ``psuccess`` is strictly below the tier's threshold.
        Before calibration, and for tiers without a calibrated threshold,
        ``DEFAULT_THRESHOLD`` is used instead.
        """
        if not self.calibrated:
            return bool(psuccess < self.DEFAULT_THRESHOLD)
        threshold = self.thresholds.get(tier, self.DEFAULT_THRESHOLD)
        return bool(psuccess < threshold)

    def coverage_check(
        self,
        psuccess: Dict[int, np.ndarray],
        outcomes: Dict[int, np.ndarray],
    ) -> Dict[int, Dict[str, float]]:
        """Measure escalation behaviour and guarantee violations on test data.

        Args:
            psuccess: {tier: array of calibrated P(success)}
            outcomes: {tier: array of binary outcomes (1=success, 0=fail)}

        Returns:
            {tier: stats} where stats contains:
                violation_rate: failures among NON-escalated samples
                    (conditional rate, P(fail | no escalation)).
                joint_violation_rate: non-escalated failures over ALL
                    samples (P(fail AND no escalation)), the quantity the
                    class-level guarantee is stated for.
                escalation_rate: fraction of samples escalated
                    (0.0 when the tier has no samples).
                threshold: threshold applied to this tier.
                n_no_escalate: number of non-escalated samples.
                n_violations: number of non-escalated failures.
                covered: True when joint_violation_rate <= alpha.
        """
        results = {}
        for tier in sorted(psuccess):
            scores = np.asarray(psuccess[tier], dtype=float)
            labels = np.asarray(outcomes[tier])
            threshold = self.thresholds.get(tier, self.DEFAULT_THRESHOLD)

            # Mirror should_escalate: escalate iff score < threshold.
            kept = scores >= threshold
            n_total = int(scores.size)
            n_kept = int(kept.sum())
            n_violations = int(((labels == 0) & kept).sum())

            # max(..., 1) avoids division by zero on empty denominators.
            conditional_rate = n_violations / max(n_kept, 1)
            joint_rate = n_violations / max(n_total, 1)
            escalation_rate = 1.0 - n_kept / n_total if n_total else 0.0

            results[tier] = {
                "violation_rate": float(conditional_rate),
                "joint_violation_rate": float(joint_rate),
                "escalation_rate": float(escalation_rate),
                "threshold": float(threshold),
                "n_no_escalate": n_kept,
                "n_violations": n_violations,
                "covered": bool(joint_rate <= self.alpha),
            }
        return results
|
|