| """Tools for tuning pairs of scalar thresholds to trade off sensitivity, specificity, and indeterminate rate. |
| |
| See `IndetSnSpArray` and subclass docstrings for details. |
| |
| """ |
| from dataclasses import asdict, dataclass |
| from typing import NamedTuple, Optional |
| from typing_extensions import Self |
|
|
| import numpy as np |
|
|
|
|
def running_argmax_indices(a):
    """Return the indices at which the running maximum of `a` strictly increases.

    The first index is always included; a later index is included exactly when
    its value exceeds every earlier value.

    >>> running_argmax_indices([1, 0, 3, 4, 4, 2, 5, 7, 1])
    array([0, 2, 3, 6, 7])

    """
    running_max = np.maximum.accumulate(a)
    # The first position is a new maximum by convention; afterwards, a new
    # maximum occurs wherever the accumulated maximum grows.
    is_new_max = np.concatenate(([True], running_max[1:] > running_max[:-1]))
    return np.flatnonzero(is_new_max)
|
|
|
|
def pareto_2d_indices(x, y):
    """Compute indices of the Pareto frontier maximizing x and y, sorted in increasing x and decreasing y.

    e.g. the Pareto frontier of the point set below is [A, G]

         B     A
             C
          E    D
        F    G
           H

    >>> u = [2, 0, 1, 2, 0, 0, 3, 1]
    >>> v = [4, 4, 3, 2, 2, 1, 1, 0]
    >>> pareto_2d_indices(np.array(u), np.array(v))
    array([0, 6])

    """
    # Sort by y descending, breaking ties by x descending (lexsort keys are
    # listed least-significant first).
    order = np.lexsort((-x, -y))
    # Walking in that order, a point is on the frontier exactly where x sets a
    # new strict running maximum: no earlier point (all with y >= this y) beats
    # it in x.
    sorted_x = x[order]
    running_max = np.maximum.accumulate(sorted_x)
    on_frontier = np.concatenate(([True], running_max[1:] > running_max[:-1]))
    return order[np.flatnonzero(on_frontier)]
|
|
|
|
def midpoints_with_infs(x):
    """Return the midpoints between the sorted unique elements of x, bracketed by -inf and +inf."""
    # Appending the infinities before np.unique both sorts and dedupes, so
    # consecutive pairs below are distinct and increasing.
    points = np.unique(np.concatenate(([-np.inf], np.ravel(x), [np.inf])))
    return 0.5 * (points[:-1] + points[1:])
|
|
|
|
def kde_disc_mass(
    data: np.ndarray,
    edges: np.ndarray,
    bandwidth: float,
    weights: Optional[np.ndarray] = None,
):
    """Perform Kernel Density Estimation (KDE) on data & weights, then compute the probability mass between edges.

    Args:
        data: 1-d array of sample points; each becomes the center of one Gaussian kernel
        edges: 1-d array of interval boundaries; the output has ``len(edges) - 1`` entries
        bandwidth: standard deviation of the Gaussian kernel
        weights: optional per-sample weights (normalized internally); defaults to uniform

    Returns:
        Array of KDE probability masses between consecutive entries of `edges`.

    """
    # Import the submodule explicitly: a bare `import scipy` only exposes
    # `scipy.stats` through lazy loading on scipy >= 1.8, so this is more
    # version-robust.
    import scipy.stats
    if weights is None:
        weights = np.ones_like(data, dtype=float)
    # CDF of each Gaussian component evaluated at every edge:
    # shape (len(edges), len(data)).
    z_score = (edges[:, None] - data[None, :]) / bandwidth
    component_cdfs = scipy.stats.norm.cdf(z_score)
    # Mixture CDF at the edges (weights normalized to sum to 1), then the mass
    # in each interval via differencing.
    cdf = component_cdfs @ (weights / weights.sum())
    return np.diff(cdf)
|
|
|
|
class BinaryLabeledScores(NamedTuple):
    """An array of numeric scores along with associated 0/1 ground truth and optional numeric weights."""

    y_score: np.ndarray
    y_true: np.ndarray
    weights: Optional[np.ndarray] = None

    def smooth(
        self, num_points: int, bandwidth: float, padding_bandwidths: float = 5.0
    ) -> "BinaryLabeledScores":
        """KDE-smooth positive and negative scores separately and discretize each to equally spaced weighted points.

        Args:
            num_points: number of points to use each for positive and negative score discretizations
            bandwidth: bandwidth of kernel density estimation, i.e. standard deviation of noise to be added
            padding_bandwidths: number of bandwidths to extend past lowest and highest scores when selecting
                discretization endpoints

        Returns:
            `BinaryLabeledScores` object representing the smoothed and re-discretized weighted labeled scores

        """
        is_pos = self.y_true == 1
        is_neg = self.y_true == 0
        pos_w = self.weights[is_pos] if self.weights is not None else None
        neg_w = self.weights[is_neg] if self.weights is not None else None
        # One shared grid for both classes: 2 * num_points + 1 points so that
        # every other point is a bin edge and the ones in between are centers.
        pad = padding_bandwidths * bandwidth
        grid = np.linspace(
            self.y_score.min() - pad, self.y_score.max() + pad, 2 * num_points + 1
        )
        edges, centers = grid[::2], grid[1::2]
        pos_mass = kde_disc_mass(self.y_score[is_pos], edges, bandwidth, pos_w)
        neg_mass = kde_disc_mass(self.y_score[is_neg], edges, bandwidth, neg_w)
        # Each center appears twice: once as a negative and once as a positive
        # example, weighted by the respective KDE mass.
        return BinaryLabeledScores(
            y_score=np.r_[centers, centers],
            y_true=np.r_[np.zeros_like(centers), np.ones_like(centers)],
            weights=np.r_[neg_mass, pos_mass],
        )

    def indet_sn_sp_array(self) -> "IndetSnSpArray":
        """Build `IndetSnSpArray`."""
        return IndetSnSpArray.build(**self._asdict())
|
|
|
|
| def fake_vectorized_binom_ci( |
| k: np.ndarray, n: np.ndarray, p: float | np.ndarray = 0.95 |
| ) -> tuple[np.ndarray, np.ndarray]: |
| """Compute binomial confidence intervals on arrays of parameters inefficiently.""" |
| import scipy |
| k, n, p = np.broadcast_arrays(k, n, p) |
| |
| flat_out = [ |
| scipy.stats.binomtest(k_, n_).proportion_ci(p_) |
| for k_, n_, p_ in zip(k.flatten(), n.flatten(), p.flatten()) |
| ] |
| low = np.array([ci.low for ci in flat_out]).reshape(k.shape) |
| high = np.array([ci.high for ci in flat_out]).reshape(k.shape) |
| return low, high |
|
|
|
|
@dataclass
class IndetSnSpArray:
    """An array of metrics at different lower and upper threshold values.

    This class and subclasses are for selecting pairs of model thresholds based on sensitivity,
    specificity, and indeterminate rate. Throughout we assume scores are scalars and ground truth is binary.

    Each member `lower_thresh`, `upper_thresh`, `sn`, `sp`, and `indet_frac` must be a numpy array, and they all must
    have the same shape. Corresponding entries of these arrays specify a pair of thresholds and the metrics when a
    common dataset is evaluated using those thresholds. The thresholding logic is that scores less than the lower
    threshold count as negative outputs, scores greater than or equal to the upper threshold count as positive outputs,
    and scores in between are indeterminate outputs.

    Indeterminate fraction is defined as the proportion of scores in between the two thresholds. All other
    metrics are interpreted as conditioned on the scores not being indeterminate. For example, sensitivity
    is defined as usual as (true positives) / (total positives) *except that examples with indeterminate
    scores do not count towards the numerator or the denominator*.

    """

    # Threshold pairs; same shape as the confusion-count arrays below.
    lower_thresh: np.ndarray
    upper_thresh: np.ndarray
    # (Possibly weighted) confusion counts per threshold pair. Examples whose
    # score falls between the two thresholds are excluded from tp/fp/tn/fn and
    # accumulated in `indet` instead.
    tp: np.ndarray
    fp: np.ndarray
    tn: np.ndarray
    fn: np.ndarray
    indet: np.ndarray
    # True when any example weight differs from 1.0; exact binomial confidence
    # intervals are refused in that case (see confidence_interval_bound).
    weighted: bool = False
    # Floor applied to metric denominators in __post_init__; `build` sets this
    # to the smallest positive example weight so that an empty denominator
    # yields a metric of 0 rather than a division by zero.
    min_weight: float = 1.0
    # Tolerance for the numerical-precision sanity checks in __post_init__.
    eps: float = 1e-8

    def __post_init__(self):
        """Derive sn, sp, ppv, npv, and indet_frac from the confusion counts and sanity-check their ranges."""
        self.sn = self.tp / np.maximum(self.tp + self.fn, self.min_weight)
        self.sp = self.tn / np.maximum(self.tn + self.fp, self.min_weight)
        self.ppv = self.tp / np.maximum(self.tp + self.fp, self.min_weight)
        self.npv = self.tn / np.maximum(self.tn + self.fn, self.min_weight)
        total = self.indet + self.fn + self.fp + self.tn + self.tp
        self.indet_frac = self.indet / np.maximum(total, self.min_weight)
        # Guard against floating-point drift: fail loudly if any rate falls
        # outside [0, 1] by more than eps, otherwise clip it back into range.
        for attr in ("sn", "sp", "ppv", "npv", "indet_frac"):
            value = getattr(self, attr)
            if value.size:
                if value.max() > 1 + self.eps:
                    raise ValueError(
                        f"Numerical precision issues produced invalid value {attr} = {value.max()}."
                    )
                if value.min() < -self.eps:
                    raise ValueError(
                        f"Numerical precision issues produced invalid value {attr} = {value.min()}."
                    )
                setattr(self, attr, np.clip(value, 0.0, 1.0))

    @property
    def min_sn_sp(self):
        """Elementwise minimum of sensitivity and specificity (the worse of the two at each threshold pair)."""
        return np.minimum(self.sn, self.sp)

    @classmethod
    def build(
        cls,
        lower_thresh: Optional[np.ndarray] = None,
        upper_thresh: Optional[np.ndarray] = None,
        *,
        y_true: np.ndarray,
        y_score: np.ndarray,
        weights: Optional[np.ndarray] = None,
        eps: float = 1e-8,
    ) -> Self:
        """Find `IndetSnSpArray` values for given truth and scores as thresholds vary (à la sklearn.metrics.roc_curve).

        The output object contains arrays for `sn`, `sp`, `indet_frac`, `lower_thresh`, and `upper_thresh`, all with the
        same shape. What these arrays contain and what their common shape is depends on the input as follows.

        If both lower_thresh and upper_thresh are provided, they must have the same shape and this method computes
        metrics at the pairs given by corresponding entries in these arrays. The common output shape will be the same as
        this common input shape.

        If only one set of thresholds is provided, this method computes metrics at all sorted pairs of these thresholds
        (along with +/- inf). If neither is provided, sort scores and allow thresholds between each pair (along with
        +/- inf). In both of these cases, the common output shape is a 1-d vector of length equal to the number of such
        pairs.

        """
        # Default to uniform weights, then drop zero-weight examples so that
        # weights.min() below is strictly positive.
        weights = weights if weights is not None else np.ones_like(y_true)
        y_true = y_true[weights > 0]
        y_score = y_score[weights > 0]
        weights = weights[weights > 0]

        # Assemble the sorted, deduplicated grid of candidate thresholds and
        # the (lower, upper) index pairs into that grid to evaluate.
        if lower_thresh is not None and upper_thresh is not None:
            threshes = np.unique(np.r_[-np.inf, lower_thresh, upper_thresh, np.inf])
            # Each requested threshold is an element of `threshes`, so
            # searchsorted recovers its exact position in the grid.
            lower_indices = np.searchsorted(threshes, lower_thresh)
            upper_indices = np.searchsorted(threshes, upper_thresh)
        else:
            if lower_thresh is not None:
                threshes = np.unique(np.r_[-np.inf, lower_thresh, np.inf])
            elif upper_thresh is not None:
                threshes = np.unique(np.r_[-np.inf, upper_thresh, np.inf])
            else:
                # No thresholds given: candidate thresholds are the midpoints
                # between adjacent unique scores, padded with +/- inf.
                unique_scores = np.unique(np.r_[-np.inf, y_score, np.inf])
                threshes = (unique_scores[1:] + unique_scores[:-1]) / 2
                threshes = np.unique(threshes)
            # All sorted pairs with lower <= upper.
            lower_indices, upper_indices = np.triu_indices(len(threshes))

        # Cumulative (weighted) example and positive counts strictly below each
        # threshold; confusion counts for any threshold pair then follow from
        # O(1) lookups into these prefix sums.
        count_by_bin = np.histogram(y_score, bins=threshes, weights=weights)[0]
        pos_by_bin = np.histogram(y_score, bins=threshes, weights=y_true * weights)[0]
        count_by_thresh = np.pad(np.cumsum(count_by_bin), (1, 0))
        pos_by_thresh = np.pad(np.cumsum(pos_by_bin), (1, 0))
        # Scores below the lower threshold are predicted negative.
        tn_plus_fn = count_by_thresh[lower_indices]
        # Scores at or above the upper threshold are predicted positive.
        total_minus_tp_minus_fp = count_by_thresh[upper_indices]
        tp_plus_fp = count_by_thresh[-1] - total_minus_tp_minus_fp
        fn = pos_by_thresh[lower_indices]
        total_pos = pos_by_thresh[-1]
        tp = total_pos - pos_by_thresh[upper_indices]
        fp = tp_plus_fp - tp
        tn = tn_plus_fn - fn
        min_weight = weights.min()
        # Everything between the two thresholds is indeterminate.
        indet = total_minus_tp_minus_fp - tn_plus_fn
        return cls(
            lower_thresh=threshes[lower_indices],
            upper_thresh=threshes[upper_indices],
            tp=tp,
            fp=fp,
            tn=tn,
            fn=fn,
            indet=indet,
            weighted=not all(weights == 1.0),
            min_weight=min_weight,
            eps=eps,
        )

    def eval(
        self,
        *,
        y_true,
        y_score,
        weights: Optional[np.ndarray] = None,
    ) -> "IndetSnSpArray":
        """Evaluate the given data on the thresholds of `self` (e.g. a held-out set on tuned thresholds)."""
        return IndetSnSpArray.build(
            lower_thresh=self.lower_thresh,
            upper_thresh=self.upper_thresh,
            y_true=y_true,
            y_score=y_score,
            weights=weights,
        )

    def __getitem__(self, item) -> "IndetSnSpArray":
        """Extract a subarray with numpy-style indexing.

        Note that the result is rebuilt from the raw counts, so __post_init__ recomputes its derived
        metrics; any values previously written onto sn/sp/ppv/npv/indet_frac are not carried over.
        """
        return IndetSnSpArray(
            lower_thresh=self.lower_thresh[item],
            upper_thresh=self.upper_thresh[item],
            tp=self.tp[item],
            fp=self.fp[item],
            fn=self.fn[item],
            tn=self.tn[item],
            indet=self.indet[item],
            weighted=self.weighted,
            min_weight=self.min_weight,
            eps=self.eps,
        )

    def __add__(self, other: "IndetSnSpArray") -> "IndetSnSpArray":
        """Add confusion counts elementwise; thresholds are kept where the operands agree, NaN where they differ."""
        if not isinstance(other, IndetSnSpArray):
            raise TypeError(f"Cannot add {type(other)} to IndetSnSpArray.")
        tp = self.tp + other.tp
        if np.array_equal(self.lower_thresh, other.lower_thresh):
            lower_thresh = self.lower_thresh
        else:
            lower_thresh = np.nan * np.ones_like(tp)
        if np.array_equal(self.upper_thresh, other.upper_thresh):
            upper_thresh = self.upper_thresh
        else:
            upper_thresh = np.nan * np.ones_like(tp)
        return IndetSnSpArray(
            lower_thresh=lower_thresh,
            upper_thresh=upper_thresh,
            tp=tp,
            fp=self.fp + other.fp,
            fn=self.fn + other.fn,
            tn=self.tn + other.tn,
            indet=self.indet + other.indet,
            # The sum is weighted if either operand is; keep the smaller
            # denominator floor so derived metrics stay conservative.
            weighted=self.weighted or other.weighted,
            min_weight=min(self.min_weight, other.min_weight),
            eps=self.eps,
        )

    def confidence_interval_bound(self, p: float = 0.95) -> "IndetSnSpArray":
        """Compute two-sided confidence interval bounds: upper for indet_frac and lower for other metrics.

        The bounds are written directly onto the derived-metric attributes of the returned copy; slicing
        that copy with `__getitem__` rebuilds from the raw counts and therefore discards the CI values.
        """
        if self.weighted:
            raise NotImplementedError(
                "Confidence intervals only implemented for unweighted confusion matrices."
            )
        # asdict() contains only the declared fields (thresholds, counts,
        # flags); the copy's metrics are recomputed by __post_init__ and then
        # overwritten below with the CI bounds.
        copy = IndetSnSpArray(**asdict(self))
        copy.sn, _ = fake_vectorized_binom_ci(self.tp, self.tp + self.fn, p=p)
        copy.sp, _ = fake_vectorized_binom_ci(self.tn, self.tn + self.fp, p=p)
        copy.ppv, _ = fake_vectorized_binom_ci(self.tp, self.fp + self.tp, p=p)
        copy.npv, _ = fake_vectorized_binom_ci(self.tn, self.tn + self.fn, p=p)
        _, copy.indet_frac = fake_vectorized_binom_ci(
            self.indet, self.tp + self.fn + self.fp + self.tn + self.indet, p=p
        )
        return copy

    def roc_curve(self, indet_budget=0.0) -> "IndetRocCurve":
        """Compute ROC curve with indeterminate budget, sorted by increasing sn and decreasing sp.

        Restrict `self` to Pareto-optimal pairs (sn, sp) for which `indet_frac <= indet_budget`. Other points are worse
        than the points on the curve in the sense of having worse Sn, worse Sp, or not meeting the indeterminate budget.

        """
        within_budget = self[self.indet_frac <= indet_budget]
        # Keep only (sn, sp) pairs not dominated by another within-budget pair.
        frontier = pareto_2d_indices(within_budget.sn, within_budget.sp)
        return IndetRocCurve(**asdict(within_budget[frontier]))

    def sn_eq_sp_graph(self) -> "IndetSnEqSpGraph":
        """Compute sn=sp as a function of indet_frac, returning both sorted in increasing order.

        Method: restrict to Pareto-optimal pairs (s, indet_frac) where s = min(sn, sp).

        Pareto-optimality means that if (s, indet_frac) is in the output, there is no point (sn', sp', indet_frac') in
        the input with indet_frac' <= indet_frac and sn', sp' > s. In other words, s is the maximum value such that
        the quadrant { sn, sp >= s} intersects `self.roc_curve(indet_frac)`. This maximum occurs where the ROC curve
        intersects the diagonal, up to an error bounded by the distance between points on the ROC curve.

        """
        # Maximize min(sn, sp) while minimizing indet_frac (hence the negation).
        frontier = pareto_2d_indices(self.min_sn_sp, -self.indet_frac)
        return IndetSnEqSpGraph(**asdict(self[frontier]))
|
|
|
|
class IndetRocCurve(IndetSnSpArray):
    """Sn, Sp achievable within some indeterminate budget, with the corresponding lower and upper thresholds.

    Entries are assumed sorted so that `sn` increases and `sp` decreases.

    """

    def sn_eq_sp(self) -> IndetSnSpArray:
        """Locate the point on the ROC curve closest to the diagonal"""
        best_index = np.argmax(self.min_sn_sp)
        return self[best_index]

    def auc(self) -> float:
        """Compute the area under the ROC curve."""
        from sklearn.metrics import auc

        # Pad with the (0, 0) and (1, 1) corners so the curve spans the full
        # false-positive-rate range before integrating.
        fpr = 1 - np.r_[1.0, self.sp, 0.0]
        tpr = np.r_[0.0, self.sn, 1.0]
        return auc(fpr, tpr)

    @classmethod
    def build(
        cls,
        thresh=None,
        *,
        y_true: np.ndarray,
        y_score: np.ndarray,
        weights: Optional[np.ndarray] = None,
    ) -> Self:
        """Build an indeterminate=0 ROC curve in n log n time (vs n**2 for IndetSnSpArray.build().roc_curve())."""
        if thresh is None:
            # Reversed so thresholds decrease, which puts sn in increasing and
            # sp in decreasing order as the class docstring assumes.
            thresh = midpoints_with_infs(y_score)[::-1]
        # With lower == upper there are no indeterminate outputs by construction.
        pointwise = IndetSnSpArray.build(
            lower_thresh=thresh,
            upper_thresh=thresh,
            y_true=y_true,
            y_score=y_score,
            weights=weights,
        )
        return cls(**asdict(pointwise))
|
|
|
|
class IndetSnEqSpGraph(IndetSnSpArray):
    """Achievable Sn=Sp as a function of indeterminate budget, with the corresponding lower and upper thresholds.

    Both min(self.sn, self.sp) and self.indet_frac are assumed to be sorted in non-decreasing order.

    """

    def at_budget(self, indet_budget: float = 0.0) -> IndetSnSpArray:
        """Locate the best point on the graph within the given budget."""
        # Rightmost entry with indet_frac <= indet_budget; since min(sn, sp)
        # is sorted in the same order, that entry is the best within budget.
        # NOTE(review): if every entry exceeds the budget this evaluates to
        # index -1, i.e. wraps to the last point — confirm that is intended.
        index = np.searchsorted(self.indet_frac, indet_budget, side="right") - 1
        return self[index]
|