from __future__ import annotations from collections import defaultdict from collections.abc import Sequence from typing import Any import numpy as np SIMULATION_SIZE = 100_000 RISK_PRESETS: dict[str, tuple[float, float] | float] = { "uniform": (1.0, 1.0), "bimodal": (0.1, 0.1), "unimodal": (10.0, 10.0), "delta_half": 0.5, } def compute_empirical_probabilities( rows: list[dict[str, Any]], outcome_col: str, strata_features: Sequence[str], ) -> dict[tuple[Any, ...], dict[str, Any]]: """Compute empirical P(Y=1|X) for each feature stratum from true outcomes. Args: rows: List of data rows (dicts) with features and outcome outcome_col: Name of the outcome column (e.g., "PINCP > 50k") strata_features: List of feature names to define strata (e.g., ['AGEP', 'SEX']) Returns: Dictionary mapping stratum key -> { 'probability': empirical P(Y=1|X) = (# Y=1) / (# total), 'count': number of samples in stratum, 'positive_count': number of Y=1 samples, 'features': dict of feature values for this stratum } Example: >>> rows = [ ... {'AGEP': 35, 'SEX': 1, 'PINCP > 50k': True}, ... {'AGEP': 35, 'SEX': 1, 'PINCP > 50k': False}, ... {'AGEP': 35, 'SEX': 1, 'PINCP > 50k': True}, ... ] >>> strata = compute_empirical_probabilities(rows, 'PINCP > 50k', ['AGEP', 'SEX']) >>> strata[(35, 1)]['probability'] 0.6666666666666666 >>> strata[(35, 1)]['count'] 3 """ # Group by strata and count outcomes strata_counts: dict[tuple[Any, ...], dict[str, int]] = defaultdict(lambda: {"total": 0, "positive": 0}) strata_features_map: dict[tuple[Any, ...], dict[str, Any]] = {} for row in rows: # Create stratum key from selected features stratum_key = tuple(row.get(f) for f in strata_features) # Count outcomes outcome_value = row.get(outcome_col) strata_counts[stratum_key]["total"] += 1 # Convert outcome to boolean (handle "True"/"False" strings, True/False, 1/0, etc.) if _is_positive_outcome(outcome_value): strata_counts[stratum_key]["positive"] += 1 # Store feature values for this stratum if stratum_key not in strata_features_map: strata_features_map[stratum_key] = {f: row.get(f) for f in strata_features} # Compute empirical P(Y=1|X) for each stratum result = {} for stratum_key, counts in strata_counts.items(): total = counts["total"] positive = counts["positive"] result[stratum_key] = { "probability": positive / total if total > 0 else 0.0, "count": total, "positive_count": positive, "features": strata_features_map[stratum_key], } return result def _is_positive_outcome(value: Any) -> bool: """Helper to determine if outcome value represents Y=1.""" if value is None: return False if isinstance(value, bool): return value if isinstance(value, (int, float)): return value > 0 if isinstance(value, str): return value.lower() in ("true", "1", "yes", "t", "y") return False def generate_simulation_data( a: float | None = None, b: float | None = None, size: int = SIMULATION_SIZE, seed: int | None = None, point_mass: float | None = None, ) -> tuple[np.ndarray, np.ndarray]: """Generate synthetic risk scores and binary outcomes. Supports two modes: - **Beta-Binomial**: risk_scores ~ Beta(a, b), outcomes ~ Binomial(1, risk_scores). - **Point mass**: all risk_scores = *point_mass*, outcomes ~ Binomial(1, point_mass). Args: a: Alpha parameter of the Beta distribution (ignored when *point_mass* is set). b: Beta parameter of the Beta distribution (ignored when *point_mass* is set). size: Number of samples to generate. seed: Random seed for reproducibility. point_mass: If provided, every risk score is set to this constant value. Returns: Tuple of (risk_scores, outcomes). """ rng = np.random.default_rng(seed) if point_mass is not None: risk_scores = np.full(size, point_mass) else: risk_scores = rng.beta(a, b, size=size) outcomes = rng.binomial(1, risk_scores) return risk_scores, outcomes def _build_rows_with_risk( rows: list[dict[str, Any]], outcome_col: str, strata_features: Sequence[str], prediction_col: str, seed: int | None, use_custom_risk_col: str | None, simulation: str | tuple[float, float] | None, ) -> list[dict[str, Any]]: rows_with_risk = [] if simulation is not None: # Generate synthetic data from a Beta distribution if isinstance(simulation, str): if simulation not in RISK_PRESETS: raise ValueError(f"Unknown simulation preset '{simulation}'. Choose from {list(RISK_PRESETS.keys())}.") preset = RISK_PRESETS[simulation] else: preset = simulation if isinstance(preset, (int, float)): risk_scores, outcomes = generate_simulation_data(size=SIMULATION_SIZE, seed=seed, point_mass=float(preset)) else: a, b = preset risk_scores, outcomes = generate_simulation_data(a, b, size=SIMULATION_SIZE, seed=seed) for i in range(SIMULATION_SIZE): rows_with_risk.append( { "row": {"_sim_index": i, "_sim_feature": 0, outcome_col: bool(outcomes[i])}, "empirical_risk": float(risk_scores[i]), "true_outcome": bool(outcomes[i]), "model_prediction": float(risk_scores[i]), "_input_index": i, } ) elif use_custom_risk_col is not None: # Use custom risk column directly for input_index, row in enumerate(rows): risk = row.get(use_custom_risk_col, 0.5) rows_with_risk.append( { "row": row, "empirical_risk": risk, "true_outcome": _is_positive_outcome(row.get(outcome_col)), "model_prediction": row.get(prediction_col, 0.5), "_input_index": input_index, } ) else: # Compute empirical P(Y=1|X) for each stratum empirical_probs = compute_empirical_probabilities(rows, outcome_col, strata_features) for input_index, row in enumerate(rows): stratum_key = tuple(row.get(f) for f in strata_features) empirical_risk = empirical_probs.get(stratum_key, {}).get("probability", 0.5) rows_with_risk.append( { "row": row, "empirical_risk": empirical_risk, "true_outcome": _is_positive_outcome(row.get(outcome_col)), "model_prediction": row.get(prediction_col, 0.5), "_input_index": input_index, } ) return rows_with_risk def _end_index_for_target_mass(n: int, target_mass: float) -> int: if n <= 0 or target_mass <= 0: return 0 cumulative_mass = 0.0 for i in range(n): cumulative_mass += 1.0 / n if cumulative_mass >= target_mass: return i + 1 return n def _find_optimal_band_indices( rows_with_risk: list[dict[str, Any]], beta: float, alpha: float, max_iterations: int, tolerance: float, ) -> tuple[int, int, int, float]: assert alpha <= beta, f"Screening budget α={alpha} exceeds treatment budget β={beta}" n = len(rows_with_risk) if n == 0: return 0, 0, 0, 0.0 prev_avg_risk = 0.0 band1_end_idx = 0 band2_end_idx = 0 band3_end_idx = 0 avg_risk_band3 = 0.0 for _iteration in range(max_iterations): # Compute target mass: ∫ f(risk) d(risk) = target # Where f(risk) is the density over risk values # For discrete: sum of (count at each risk / total count) = proportion of population at that risk band1_target_mass = beta - alpha # Band 2 size: ∫ 1 × f(risk) d(risk) over Band 2 = ∫ (1 - risk) × f(risk) d(risk) over Band 3 # Since Band 3 has mass α and average risk prev_avg_risk: # ∫ (1 - risk) × f(risk) d(risk) over Band 3 = α × (1 - prev_avg_risk) band2_target_mass = alpha * (1 - prev_avg_risk) band3_target_mass = alpha band1_end_idx = _end_index_for_target_mass(n, band1_target_mass) band2_end_idx = _end_index_for_target_mass(n, band1_target_mass + band2_target_mass) band3_end_idx = _end_index_for_target_mass(n, band1_target_mass + band2_target_mass + band3_target_mass) # Ensure indices are ordered and within bounds band1_end_idx = min(band1_end_idx, n) band2_end_idx = max(band1_end_idx, min(band2_end_idx, n)) band3_end_idx = max(band2_end_idx, min(band3_end_idx, n)) # Compute average risk of Band 3 if band3_end_idx > band2_end_idx: band3_risks = [rows_with_risk[i]["empirical_risk"] for i in range(band2_end_idx, band3_end_idx)] current_avg_risk = np.mean(band3_risks) if band3_risks else 0.0 else: current_avg_risk = 0.0 avg_risk_band3 = current_avg_risk # Check convergence if abs(current_avg_risk - prev_avg_risk) < tolerance: break prev_avg_risk = current_avg_risk return band1_end_idx, band2_end_idx, band3_end_idx, avg_risk_band3 def compute_optimal_screening_actions( rows: list[dict[str, Any]], outcome_col: str, strata_features: Sequence[str], prediction_col: str = "probability", beta: float = 0.5, alpha: float = 0.0, max_iterations: int = 20, tolerance: float = 1e-6, seed: int | None = None, use_custom_risk_col: str | None = None, simulation: str | tuple[float, float] | None = None, ) -> list[int]: """Compute one optimal screening allocation. Returns one action per input row, preserving input order: - 0: ignore - 1: treat directly - 2: screen """ rows_with_risk = _build_rows_with_risk( rows=rows, outcome_col=outcome_col, strata_features=strata_features, prediction_col=prediction_col, seed=seed, use_custom_risk_col=use_custom_risk_col, simulation=simulation, ) rows_with_risk.sort(key=lambda x: x["empirical_risk"], reverse=True) _band1_end_idx, band2_end_idx, band3_end_idx, _avg_risk_band3 = _find_optimal_band_indices( rows_with_risk=rows_with_risk, beta=beta, alpha=alpha, max_iterations=max_iterations, tolerance=tolerance, ) actions_by_input_index: dict[int, int] = {} for sorted_index, item in enumerate(rows_with_risk): if sorted_index < band2_end_idx: action = 1 elif sorted_index < band3_end_idx: action = 2 else: action = 0 actions_by_input_index[item["_input_index"]] = action return [actions_by_input_index[i] for i in range(len(rows_with_risk))] def compute_optimal_screening_curve( rows: list[dict[str, Any]], outcome_col: str, strata_features: Sequence[str], prediction_col: str = "probability", beta: float = 0.5, alpha_quantiles: Sequence[float] | None = None, max_iterations: int = 20, tolerance: float = 1e-6, seed: int | None = None, use_custom_risk_col: str | None = None, simulation: str | tuple[float, float] | None = None, ) -> dict[str, Any]: """Compute optimal screening curve with treatment budget β and screening budget α. Band structure (highest to lowest risk): - Band 1: Top (β - α) - Treated, model predictions - Band 2: Next (α - avg_risk(Band 3)) - Treated, model predictions - Band 3: Next α - Screened (true outcomes) - Band 4: Bottom (1 - β - α + avg_risk) - Untreated (predict 0) Uses iterative method to resolve circular dependency between Band 2 and Band 3. Args: rows: List of data rows with features, outcome, and predictions outcome_col: Name of outcome column strata_features: Features defining strata for computing empirical P(Y=1|X) prediction_col: Column name for model predictions beta: Treatment budget (proportion who can be treated) alpha_quantiles: Screening budget levels to evaluate max_iterations: Maximum iterations for convergence tolerance: Convergence tolerance for avg_risk seed: Random seed for uniform distribution override (for debugging) use_custom_risk_col: If provided, use this column for risk instead of computing empirical probabilities from strata. Useful for comparing LLM predictions with empirical baselines. simulation: If provided, generate synthetic data from a Beta distribution instead of using real data. Pass a preset name ('uniform', 'bimodal', 'unimodal') or a tuple (a, b) of Beta distribution parameters. Uses SIMULATION_SIZE samples. Returns: Dictionary with screening curves and band information """ if alpha_quantiles is None: # Default: 10 equally spaced values from 0 to beta alpha_quantiles = [beta * i / 49 for i in range(50)] # Assign each row its risk (simulation, custom, or empirical) rows_with_risk = _build_rows_with_risk( rows=rows, outcome_col=outcome_col, strata_features=strata_features, prediction_col=prediction_col, seed=seed, use_custom_risk_col=use_custom_risk_col, simulation=simulation, ) # Sort by risk (highest to lowest) rows_with_risk.sort(key=lambda x: x["empirical_risk"], reverse=True) total_positive = sum(1 for r in rows_with_risk if r["true_outcome"]) n = len(rows_with_risk) # Results storage results = { "beta": beta, "alpha_values": [], "true_positives": [], "band_info": [], "total_positive": total_positive, "total_samples": n, } for alpha in alpha_quantiles: band1_end_idx, band2_end_idx, band3_end_idx, avg_risk_band3 = _find_optimal_band_indices( rows_with_risk=rows_with_risk, beta=beta, alpha=alpha, max_iterations=max_iterations, tolerance=tolerance, ) # Compute integrals: ∫ risk × (1/n) dx for each band (for reporting purposes) band1_integral = sum(rows_with_risk[i]["empirical_risk"] / n for i in range(0, band1_end_idx)) band2_integral = sum(rows_with_risk[i]["empirical_risk"] / n for i in range(band1_end_idx, band2_end_idx)) band3_integral = sum(rows_with_risk[i]["empirical_risk"] / n for i in range(band2_end_idx, band3_end_idx)) band4_integral = sum(rows_with_risk[i]["empirical_risk"] / n for i in range(band3_end_idx, n)) # Population proportions = ∫ f(risk) d(risk) for each band # This is the "mass" used for band selection band1_pop_prop = band1_end_idx / n band2_pop_prop = (band2_end_idx - band1_end_idx) / n band3_pop_prop = (band3_end_idx - band2_end_idx) / n band4_pop_prop = (n - band3_end_idx) / n # Expected negatives in Band 3: ∫ (1 - risk) × f(risk) d(risk) over Band 3 band3_expected_negatives = sum( (1 - rows_with_risk[i]["empirical_risk"]) / n for i in range(band2_end_idx, band3_end_idx) ) # Count true positives in each band tp_count = 0 # Band 1: Treated, empirical predictions for i in range(0, band1_end_idx): item = rows_with_risk[i] if item["true_outcome"]: tp_count += 1 # Band 2: Treated, empirical predictions for i in range(band1_end_idx, band2_end_idx): item = rows_with_risk[i] if item["true_outcome"]: tp_count += 1 # Band 3: Screened, use true outcomes for i in range(band2_end_idx, band3_end_idx): item = rows_with_risk[i] if item["true_outcome"]: tp_count += 1 # Band 4: Untreated, predict 0 (no TPs) # (no contribution to tp_count) results["alpha_values"].append(alpha) # Enforce monotonicity: TP can never decrease as screening budget grows tp_count = max(tp_count, results["true_positives"][-1] if results["true_positives"] else 0) results["true_positives"].append(tp_count) results["band_info"].append( { "alpha": alpha, "band1_integral": band1_integral, "band2_integral": band2_integral, "band3_integral": band3_integral, "band4_integral": band4_integral, "band1_pop_prop": band1_pop_prop, "band2_pop_prop": band2_pop_prop, "band3_pop_prop": band3_pop_prop, "band4_pop_prop": band4_pop_prop, "band3_expected_negatives": band3_expected_negatives, "avg_risk_band3": avg_risk_band3, "band1_end_idx": band1_end_idx, "band2_end_idx": band2_end_idx, "band3_end_idx": band3_end_idx, } ) return results def compute_random_screening_curve( rows: list[dict[str, Any]], outcome_col: str, strata_features: Sequence[str], prediction_col: str = "probability", beta: float = 0.5, alpha_quantiles: Sequence[float] | None = None, seed: int = 42, use_custom_risk_col: str | None = None, simulation: str | tuple[float, float] | None = None, ) -> dict[str, Any]: """Compute random screening baseline curve. This baseline screens α proportion of the population at random (instead of targeting low-risk individuals). It treats: 1. All screened individuals with Y=1 (true positive outcome) 2. From unscreened, treats top (β + prop_screened_negatives - prop_screened_positives) by risk The intuition: by randomly screening, we identify some negatives and don't waste treatment budget on them, allowing us to treat more high-risk unscreened individuals. Args: rows: List of data rows with features, outcome, and predictions outcome_col: Name of outcome column strata_features: Features defining strata (used for risk scoring) prediction_col: Column name for model predictions beta: Treatment budget (proportion who can be treated) alpha_quantiles: Screening budget levels to evaluate seed: Random seed for reproducible random screening use_custom_risk_col: If provided, use this column for risk instead of empirical simulation: If provided, generate synthetic data from a Beta distribution instead of using real data. Pass a preset name ('uniform', 'bimodal', 'unimodal') or a tuple (a, b) of Beta distribution parameters. Uses SIMULATION_SIZE samples. Returns: Dictionary with screening curves """ if alpha_quantiles is None: alpha_quantiles = [beta * i / 49 for i in range(50)] # Assign each row its risk (simulation, custom, or empirical) rows_with_risk = [] if simulation is not None: # Generate synthetic data from a Beta distribution if isinstance(simulation, str): if simulation not in RISK_PRESETS: raise ValueError(f"Unknown simulation preset '{simulation}'. Choose from {list(RISK_PRESETS.keys())}.") preset = RISK_PRESETS[simulation] else: preset = simulation if isinstance(preset, (int, float)): risk_scores, outcomes = generate_simulation_data(size=SIMULATION_SIZE, seed=seed, point_mass=float(preset)) else: a, b = preset risk_scores, outcomes = generate_simulation_data(a, b, size=SIMULATION_SIZE, seed=seed) for i in range(SIMULATION_SIZE): rows_with_risk.append( { "row": {"_sim_index": i, "_sim_feature": 0, outcome_col: bool(outcomes[i])}, "empirical_risk": float(risk_scores[i]), "true_outcome": bool(outcomes[i]), "model_prediction": float(risk_scores[i]), } ) elif use_custom_risk_col is not None: # Use custom risk column directly for row in rows: risk = row.get(use_custom_risk_col, 0.5) rows_with_risk.append( { "row": row, "empirical_risk": risk, "true_outcome": _is_positive_outcome(row.get(outcome_col)), "model_prediction": row.get(prediction_col, 0.5), } ) else: # Compute empirical P(Y=1|X) for each stratum empirical_probs = compute_empirical_probabilities(rows, outcome_col, strata_features) for row in rows: stratum_key = tuple(row.get(f) for f in strata_features) empirical_risk = empirical_probs.get(stratum_key, {}).get("probability", 0.5) rows_with_risk.append( { "row": row, "empirical_risk": empirical_risk, "true_outcome": _is_positive_outcome(row.get(outcome_col)), "model_prediction": row.get(prediction_col, 0.5), } ) total_positive = sum(1 for r in rows_with_risk if r["true_outcome"]) n = len(rows_with_risk) # Results storage results = { "beta": beta, "alpha_values": [], "true_positives": [], "total_positive": total_positive, "total_samples": n, } # Set random seed for reproducibility — use a single permutation so that # screened sets are nested (larger α always includes the smaller α set). rng = np.random.RandomState(seed) random_order = rng.permutation(n) for alpha in alpha_quantiles: assert alpha <= beta, f"Screening budget α={alpha} exceeds treatment budget β={beta}" # Screen α proportion uniformly at random n_screen = min(int(alpha * n), n) n_treat = int(beta * n) screened_indices = set(random_order[:n_screen]) # Identify screened positives (gamma mass) screened_positive_indices = {idx for idx in screened_indices if rows_with_risk[idx]["true_outcome"]} gamma_count = len(screened_positive_indices) # Treat screened positives up to budget tp_from_screening = min(gamma_count, n_treat) remaining_budget = max(0, n_treat - tp_from_screening) # Pool for risk-based treatment: everyone except screened positives pool = [(idx, rows_with_risk[idx]) for idx in range(n) if idx not in screened_positive_indices] pool.sort(key=lambda x: x[1]["empirical_risk"], reverse=True) # Treat top (β - γ) mass by risk score n_treat_by_risk = min(remaining_budget, len(pool)) tp_from_risk = sum(1 for i in range(n_treat_by_risk) if pool[i][1]["true_outcome"]) tp_count = tp_from_screening + tp_from_risk results["alpha_values"].append(alpha) results["true_positives"].append(tp_count) return results def compute_intuitive_optimal_curve( rows: list[dict[str, Any]], outcome_col: str, strata_features: Sequence[str], prediction_col: str = "probability", beta: float = 0.5, alpha_quantiles: Sequence[float] | None = None, seed: int | None = None, use_custom_risk_col: str | None = None, simulation: str | tuple[float, float] | None = None, ) -> dict[str, Any]: """Compute intuitive-optimal screening curve. Algorithm (all bands are adjacent slices of the risk-sorted population): 1. Band A: treat the top (β − α) mass by risk (highest risk, no screening). 2. Band B: screen the next α mass. Let γ ≤ α be the mass of screened individuals with Y=0. Screened Y=1 are treated; screened Y=0 are not. 3. Band C: treat the next γ mass below the screened band (replaces the screened negatives, preserving total treatment budget = β). Args: rows: List of data rows (ignored when *simulation* is set). outcome_col: Name of outcome column. strata_features: Features defining strata. prediction_col: Column name for model predictions. beta: Treatment budget (proportion who can be treated). alpha_quantiles: Screening budget levels to evaluate. seed: Random seed for simulation mode. use_custom_risk_col: Use this column for risk instead of empirical. simulation: Preset name or (a, b) Beta parameters for synthetic data. Returns: Dictionary with alpha_values, true_positives, total_positive, total_samples. """ if alpha_quantiles is None: alpha_quantiles = [beta * i / 49 for i in range(50)] # --- Build rows_with_risk (same logic as compute_optimal_screening_curve) --- rows_with_risk = [] if simulation is not None: if isinstance(simulation, str): if simulation not in RISK_PRESETS: raise ValueError(f"Unknown simulation preset '{simulation}'. Choose from {list(RISK_PRESETS.keys())}.") preset = RISK_PRESETS[simulation] else: preset = simulation if isinstance(preset, (int, float)): risk_scores, outcomes = generate_simulation_data(size=SIMULATION_SIZE, seed=seed, point_mass=float(preset)) else: a, b = preset risk_scores, outcomes = generate_simulation_data(a, b, size=SIMULATION_SIZE, seed=seed) for i in range(SIMULATION_SIZE): rows_with_risk.append( { "row": {"_sim_index": i, "_sim_feature": 0, outcome_col: bool(outcomes[i])}, "empirical_risk": float(risk_scores[i]), "true_outcome": bool(outcomes[i]), "model_prediction": float(risk_scores[i]), } ) elif use_custom_risk_col is not None: for row in rows: risk = row.get(use_custom_risk_col, 0.5) rows_with_risk.append( { "row": row, "empirical_risk": risk, "true_outcome": _is_positive_outcome(row.get(outcome_col)), "model_prediction": row.get(prediction_col, 0.5), } ) else: empirical_probs = compute_empirical_probabilities(rows, outcome_col, strata_features) for row in rows: stratum_key = tuple(row.get(f) for f in strata_features) empirical_risk = empirical_probs.get(stratum_key, {}).get("probability", 0.5) rows_with_risk.append( { "row": row, "empirical_risk": empirical_risk, "true_outcome": _is_positive_outcome(row.get(outcome_col)), "model_prediction": row.get(prediction_col, 0.5), } ) # Sort by risk (highest to lowest) rows_with_risk.sort(key=lambda x: x["empirical_risk"], reverse=True) total_positive = sum(1 for r in rows_with_risk if r["true_outcome"]) n = len(rows_with_risk) results = { "beta": beta, "alpha_values": [], "true_positives": [], "total_positive": total_positive, "total_samples": n, } for alpha in alpha_quantiles: assert alpha <= beta, f"Screening budget α={alpha} exceeds treatment budget β={beta}" band_a_end = int((beta - alpha) * n) band_b_end = band_a_end + int(alpha * n) # Band A: treated by risk tp_band_a = 0 for i in range(band_a_end): item = rows_with_risk[i] if item["true_outcome"]: tp_band_a += 1 # Band B: screened — Y=1 treated, Y=0 not treated tp_band_b = 0 gamma_count = 0 # number of screened with Y=0 for i in range(band_a_end, band_b_end): item = rows_with_risk[i] if item["true_outcome"]: tp_band_b += 1 else: gamma_count += 1 # Band C: next gamma_count individuals treated by risk band_c_end = min(band_b_end + gamma_count, n) tp_band_c = 0 for i in range(band_b_end, band_c_end): item = rows_with_risk[i] if item["true_outcome"]: tp_band_c += 1 tp_count = tp_band_a + tp_band_b + tp_band_c results["alpha_values"].append(alpha) results["true_positives"].append(tp_count) return results