Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| from collections import defaultdict | |
| from collections.abc import Sequence | |
| from typing import Any | |
| import numpy as np | |
| SIMULATION_SIZE = 100_000 | |
| RISK_PRESETS: dict[str, tuple[float, float] | float] = { | |
| "uniform": (1.0, 1.0), | |
| "bimodal": (0.1, 0.1), | |
| "unimodal": (10.0, 10.0), | |
| "delta_half": 0.5, | |
| } | |
| def compute_empirical_probabilities( | |
| rows: list[dict[str, Any]], | |
| outcome_col: str, | |
| strata_features: Sequence[str], | |
| ) -> dict[tuple[Any, ...], dict[str, Any]]: | |
| """Compute empirical P(Y=1|X) for each feature stratum from true outcomes. | |
| Args: | |
| rows: List of data rows (dicts) with features and outcome | |
| outcome_col: Name of the outcome column (e.g., "PINCP > 50k") | |
| strata_features: List of feature names to define strata (e.g., ['AGEP', 'SEX']) | |
| Returns: | |
| Dictionary mapping stratum key -> { | |
| 'probability': empirical P(Y=1|X) = (# Y=1) / (# total), | |
| 'count': number of samples in stratum, | |
| 'positive_count': number of Y=1 samples, | |
| 'features': dict of feature values for this stratum | |
| } | |
| Example: | |
| >>> rows = [ | |
| ... {'AGEP': 35, 'SEX': 1, 'PINCP > 50k': True}, | |
| ... {'AGEP': 35, 'SEX': 1, 'PINCP > 50k': False}, | |
| ... {'AGEP': 35, 'SEX': 1, 'PINCP > 50k': True}, | |
| ... ] | |
| >>> strata = compute_empirical_probabilities(rows, 'PINCP > 50k', ['AGEP', 'SEX']) | |
| >>> strata[(35, 1)]['probability'] | |
| 0.6666666666666666 | |
| >>> strata[(35, 1)]['count'] | |
| 3 | |
| """ | |
| # Group by strata and count outcomes | |
| strata_counts: dict[tuple[Any, ...], dict[str, int]] = defaultdict(lambda: {"total": 0, "positive": 0}) | |
| strata_features_map: dict[tuple[Any, ...], dict[str, Any]] = {} | |
| for row in rows: | |
| # Create stratum key from selected features | |
| stratum_key = tuple(row.get(f) for f in strata_features) | |
| # Count outcomes | |
| outcome_value = row.get(outcome_col) | |
| strata_counts[stratum_key]["total"] += 1 | |
| # Convert outcome to boolean (handle "True"/"False" strings, True/False, 1/0, etc.) | |
| if _is_positive_outcome(outcome_value): | |
| strata_counts[stratum_key]["positive"] += 1 | |
| # Store feature values for this stratum | |
| if stratum_key not in strata_features_map: | |
| strata_features_map[stratum_key] = {f: row.get(f) for f in strata_features} | |
| # Compute empirical P(Y=1|X) for each stratum | |
| result = {} | |
| for stratum_key, counts in strata_counts.items(): | |
| total = counts["total"] | |
| positive = counts["positive"] | |
| result[stratum_key] = { | |
| "probability": positive / total if total > 0 else 0.0, | |
| "count": total, | |
| "positive_count": positive, | |
| "features": strata_features_map[stratum_key], | |
| } | |
| return result | |
| def _is_positive_outcome(value: Any) -> bool: | |
| """Helper to determine if outcome value represents Y=1.""" | |
| if value is None: | |
| return False | |
| if isinstance(value, bool): | |
| return value | |
| if isinstance(value, (int, float)): | |
| return value > 0 | |
| if isinstance(value, str): | |
| return value.lower() in ("true", "1", "yes", "t", "y") | |
| return False | |
| def generate_simulation_data( | |
| a: float | None = None, | |
| b: float | None = None, | |
| size: int = SIMULATION_SIZE, | |
| seed: int | None = None, | |
| point_mass: float | None = None, | |
| ) -> tuple[np.ndarray, np.ndarray]: | |
| """Generate synthetic risk scores and binary outcomes. | |
| Supports two modes: | |
| - **Beta-Binomial**: risk_scores ~ Beta(a, b), outcomes ~ Binomial(1, risk_scores). | |
| - **Point mass**: all risk_scores = *point_mass*, outcomes ~ Binomial(1, point_mass). | |
| Args: | |
| a: Alpha parameter of the Beta distribution (ignored when *point_mass* is set). | |
| b: Beta parameter of the Beta distribution (ignored when *point_mass* is set). | |
| size: Number of samples to generate. | |
| seed: Random seed for reproducibility. | |
| point_mass: If provided, every risk score is set to this constant value. | |
| Returns: | |
| Tuple of (risk_scores, outcomes). | |
| """ | |
| rng = np.random.default_rng(seed) | |
| if point_mass is not None: | |
| risk_scores = np.full(size, point_mass) | |
| else: | |
| risk_scores = rng.beta(a, b, size=size) | |
| outcomes = rng.binomial(1, risk_scores) | |
| return risk_scores, outcomes | |
| def _build_rows_with_risk( | |
| rows: list[dict[str, Any]], | |
| outcome_col: str, | |
| strata_features: Sequence[str], | |
| prediction_col: str, | |
| seed: int | None, | |
| use_custom_risk_col: str | None, | |
| simulation: str | tuple[float, float] | None, | |
| ) -> list[dict[str, Any]]: | |
| rows_with_risk = [] | |
| if simulation is not None: | |
| # Generate synthetic data from a Beta distribution | |
| if isinstance(simulation, str): | |
| if simulation not in RISK_PRESETS: | |
| raise ValueError(f"Unknown simulation preset '{simulation}'. Choose from {list(RISK_PRESETS.keys())}.") | |
| preset = RISK_PRESETS[simulation] | |
| else: | |
| preset = simulation | |
| if isinstance(preset, (int, float)): | |
| risk_scores, outcomes = generate_simulation_data(size=SIMULATION_SIZE, seed=seed, point_mass=float(preset)) | |
| else: | |
| a, b = preset | |
| risk_scores, outcomes = generate_simulation_data(a, b, size=SIMULATION_SIZE, seed=seed) | |
| for i in range(SIMULATION_SIZE): | |
| rows_with_risk.append( | |
| { | |
| "row": {"_sim_index": i, "_sim_feature": 0, outcome_col: bool(outcomes[i])}, | |
| "empirical_risk": float(risk_scores[i]), | |
| "true_outcome": bool(outcomes[i]), | |
| "model_prediction": float(risk_scores[i]), | |
| "_input_index": i, | |
| } | |
| ) | |
| elif use_custom_risk_col is not None: | |
| # Use custom risk column directly | |
| for input_index, row in enumerate(rows): | |
| risk = row.get(use_custom_risk_col, 0.5) | |
| rows_with_risk.append( | |
| { | |
| "row": row, | |
| "empirical_risk": risk, | |
| "true_outcome": _is_positive_outcome(row.get(outcome_col)), | |
| "model_prediction": row.get(prediction_col, 0.5), | |
| "_input_index": input_index, | |
| } | |
| ) | |
| else: | |
| # Compute empirical P(Y=1|X) for each stratum | |
| empirical_probs = compute_empirical_probabilities(rows, outcome_col, strata_features) | |
| for input_index, row in enumerate(rows): | |
| stratum_key = tuple(row.get(f) for f in strata_features) | |
| empirical_risk = empirical_probs.get(stratum_key, {}).get("probability", 0.5) | |
| rows_with_risk.append( | |
| { | |
| "row": row, | |
| "empirical_risk": empirical_risk, | |
| "true_outcome": _is_positive_outcome(row.get(outcome_col)), | |
| "model_prediction": row.get(prediction_col, 0.5), | |
| "_input_index": input_index, | |
| } | |
| ) | |
| return rows_with_risk | |
| def _end_index_for_target_mass(n: int, target_mass: float) -> int: | |
| if n <= 0 or target_mass <= 0: | |
| return 0 | |
| cumulative_mass = 0.0 | |
| for i in range(n): | |
| cumulative_mass += 1.0 / n | |
| if cumulative_mass >= target_mass: | |
| return i + 1 | |
| return n | |
| def _find_optimal_band_indices( | |
| rows_with_risk: list[dict[str, Any]], | |
| beta: float, | |
| alpha: float, | |
| max_iterations: int, | |
| tolerance: float, | |
| ) -> tuple[int, int, int, float]: | |
| assert alpha <= beta, f"Screening budget α={alpha} exceeds treatment budget β={beta}" | |
| n = len(rows_with_risk) | |
| if n == 0: | |
| return 0, 0, 0, 0.0 | |
| prev_avg_risk = 0.0 | |
| band1_end_idx = 0 | |
| band2_end_idx = 0 | |
| band3_end_idx = 0 | |
| avg_risk_band3 = 0.0 | |
| for _iteration in range(max_iterations): | |
| # Compute target mass: ∫ f(risk) d(risk) = target | |
| # Where f(risk) is the density over risk values | |
| # For discrete: sum of (count at each risk / total count) = proportion of population at that risk | |
| band1_target_mass = beta - alpha | |
| # Band 2 size: ∫ 1 × f(risk) d(risk) over Band 2 = ∫ (1 - risk) × f(risk) d(risk) over Band 3 | |
| # Since Band 3 has mass α and average risk prev_avg_risk: | |
| # ∫ (1 - risk) × f(risk) d(risk) over Band 3 = α × (1 - prev_avg_risk) | |
| band2_target_mass = alpha * (1 - prev_avg_risk) | |
| band3_target_mass = alpha | |
| band1_end_idx = _end_index_for_target_mass(n, band1_target_mass) | |
| band2_end_idx = _end_index_for_target_mass(n, band1_target_mass + band2_target_mass) | |
| band3_end_idx = _end_index_for_target_mass(n, band1_target_mass + band2_target_mass + band3_target_mass) | |
| # Ensure indices are ordered and within bounds | |
| band1_end_idx = min(band1_end_idx, n) | |
| band2_end_idx = max(band1_end_idx, min(band2_end_idx, n)) | |
| band3_end_idx = max(band2_end_idx, min(band3_end_idx, n)) | |
| # Compute average risk of Band 3 | |
| if band3_end_idx > band2_end_idx: | |
| band3_risks = [rows_with_risk[i]["empirical_risk"] for i in range(band2_end_idx, band3_end_idx)] | |
| current_avg_risk = np.mean(band3_risks) if band3_risks else 0.0 | |
| else: | |
| current_avg_risk = 0.0 | |
| avg_risk_band3 = current_avg_risk | |
| # Check convergence | |
| if abs(current_avg_risk - prev_avg_risk) < tolerance: | |
| break | |
| prev_avg_risk = current_avg_risk | |
| return band1_end_idx, band2_end_idx, band3_end_idx, avg_risk_band3 | |
| def compute_optimal_screening_actions( | |
| rows: list[dict[str, Any]], | |
| outcome_col: str, | |
| strata_features: Sequence[str], | |
| prediction_col: str = "probability", | |
| beta: float = 0.5, | |
| alpha: float = 0.0, | |
| max_iterations: int = 20, | |
| tolerance: float = 1e-6, | |
| seed: int | None = None, | |
| use_custom_risk_col: str | None = None, | |
| simulation: str | tuple[float, float] | None = None, | |
| ) -> list[int]: | |
| """Compute one optimal screening allocation. | |
| Returns one action per input row, preserving input order: | |
| - 0: ignore | |
| - 1: treat directly | |
| - 2: screen | |
| """ | |
| rows_with_risk = _build_rows_with_risk( | |
| rows=rows, | |
| outcome_col=outcome_col, | |
| strata_features=strata_features, | |
| prediction_col=prediction_col, | |
| seed=seed, | |
| use_custom_risk_col=use_custom_risk_col, | |
| simulation=simulation, | |
| ) | |
| rows_with_risk.sort(key=lambda x: x["empirical_risk"], reverse=True) | |
| _band1_end_idx, band2_end_idx, band3_end_idx, _avg_risk_band3 = _find_optimal_band_indices( | |
| rows_with_risk=rows_with_risk, | |
| beta=beta, | |
| alpha=alpha, | |
| max_iterations=max_iterations, | |
| tolerance=tolerance, | |
| ) | |
| actions_by_input_index: dict[int, int] = {} | |
| for sorted_index, item in enumerate(rows_with_risk): | |
| if sorted_index < band2_end_idx: | |
| action = 1 | |
| elif sorted_index < band3_end_idx: | |
| action = 2 | |
| else: | |
| action = 0 | |
| actions_by_input_index[item["_input_index"]] = action | |
| return [actions_by_input_index[i] for i in range(len(rows_with_risk))] | |
| def compute_optimal_screening_curve( | |
| rows: list[dict[str, Any]], | |
| outcome_col: str, | |
| strata_features: Sequence[str], | |
| prediction_col: str = "probability", | |
| beta: float = 0.5, | |
| alpha_quantiles: Sequence[float] | None = None, | |
| max_iterations: int = 20, | |
| tolerance: float = 1e-6, | |
| seed: int | None = None, | |
| use_custom_risk_col: str | None = None, | |
| simulation: str | tuple[float, float] | None = None, | |
| ) -> dict[str, Any]: | |
| """Compute optimal screening curve with treatment budget β and screening budget α. | |
| Band structure (highest to lowest risk): | |
| - Band 1: Top (β - α) - Treated, model predictions | |
| - Band 2: Next (α - avg_risk(Band 3)) - Treated, model predictions | |
| - Band 3: Next α - Screened (true outcomes) | |
| - Band 4: Bottom (1 - β - α + avg_risk) - Untreated (predict 0) | |
| Uses iterative method to resolve circular dependency between Band 2 and Band 3. | |
| Args: | |
| rows: List of data rows with features, outcome, and predictions | |
| outcome_col: Name of outcome column | |
| strata_features: Features defining strata for computing empirical P(Y=1|X) | |
| prediction_col: Column name for model predictions | |
| beta: Treatment budget (proportion who can be treated) | |
| alpha_quantiles: Screening budget levels to evaluate | |
| max_iterations: Maximum iterations for convergence | |
| tolerance: Convergence tolerance for avg_risk | |
| seed: Random seed for uniform distribution override (for debugging) | |
| use_custom_risk_col: If provided, use this column for risk instead of computing | |
| empirical probabilities from strata. Useful for comparing LLM predictions | |
| with empirical baselines. | |
| simulation: If provided, generate synthetic data from a Beta distribution instead | |
| of using real data. Pass a preset name ('uniform', 'bimodal', 'unimodal') or | |
| a tuple (a, b) of Beta distribution parameters. Uses SIMULATION_SIZE samples. | |
| Returns: | |
| Dictionary with screening curves and band information | |
| """ | |
| if alpha_quantiles is None: | |
| # Default: 10 equally spaced values from 0 to beta | |
| alpha_quantiles = [beta * i / 49 for i in range(50)] | |
| # Assign each row its risk (simulation, custom, or empirical) | |
| rows_with_risk = _build_rows_with_risk( | |
| rows=rows, | |
| outcome_col=outcome_col, | |
| strata_features=strata_features, | |
| prediction_col=prediction_col, | |
| seed=seed, | |
| use_custom_risk_col=use_custom_risk_col, | |
| simulation=simulation, | |
| ) | |
| # Sort by risk (highest to lowest) | |
| rows_with_risk.sort(key=lambda x: x["empirical_risk"], reverse=True) | |
| total_positive = sum(1 for r in rows_with_risk if r["true_outcome"]) | |
| n = len(rows_with_risk) | |
| # Results storage | |
| results = { | |
| "beta": beta, | |
| "alpha_values": [], | |
| "true_positives": [], | |
| "band_info": [], | |
| "total_positive": total_positive, | |
| "total_samples": n, | |
| } | |
| for alpha in alpha_quantiles: | |
| band1_end_idx, band2_end_idx, band3_end_idx, avg_risk_band3 = _find_optimal_band_indices( | |
| rows_with_risk=rows_with_risk, | |
| beta=beta, | |
| alpha=alpha, | |
| max_iterations=max_iterations, | |
| tolerance=tolerance, | |
| ) | |
| # Compute integrals: ∫ risk × (1/n) dx for each band (for reporting purposes) | |
| band1_integral = sum(rows_with_risk[i]["empirical_risk"] / n for i in range(0, band1_end_idx)) | |
| band2_integral = sum(rows_with_risk[i]["empirical_risk"] / n for i in range(band1_end_idx, band2_end_idx)) | |
| band3_integral = sum(rows_with_risk[i]["empirical_risk"] / n for i in range(band2_end_idx, band3_end_idx)) | |
| band4_integral = sum(rows_with_risk[i]["empirical_risk"] / n for i in range(band3_end_idx, n)) | |
| # Population proportions = ∫ f(risk) d(risk) for each band | |
| # This is the "mass" used for band selection | |
| band1_pop_prop = band1_end_idx / n | |
| band2_pop_prop = (band2_end_idx - band1_end_idx) / n | |
| band3_pop_prop = (band3_end_idx - band2_end_idx) / n | |
| band4_pop_prop = (n - band3_end_idx) / n | |
| # Expected negatives in Band 3: ∫ (1 - risk) × f(risk) d(risk) over Band 3 | |
| band3_expected_negatives = sum( | |
| (1 - rows_with_risk[i]["empirical_risk"]) / n for i in range(band2_end_idx, band3_end_idx) | |
| ) | |
| # Count true positives in each band | |
| tp_count = 0 | |
| # Band 1: Treated, empirical predictions | |
| for i in range(0, band1_end_idx): | |
| item = rows_with_risk[i] | |
| if item["true_outcome"]: | |
| tp_count += 1 | |
| # Band 2: Treated, empirical predictions | |
| for i in range(band1_end_idx, band2_end_idx): | |
| item = rows_with_risk[i] | |
| if item["true_outcome"]: | |
| tp_count += 1 | |
| # Band 3: Screened, use true outcomes | |
| for i in range(band2_end_idx, band3_end_idx): | |
| item = rows_with_risk[i] | |
| if item["true_outcome"]: | |
| tp_count += 1 | |
| # Band 4: Untreated, predict 0 (no TPs) | |
| # (no contribution to tp_count) | |
| results["alpha_values"].append(alpha) | |
| # Enforce monotonicity: TP can never decrease as screening budget grows | |
| tp_count = max(tp_count, results["true_positives"][-1] if results["true_positives"] else 0) | |
| results["true_positives"].append(tp_count) | |
| results["band_info"].append( | |
| { | |
| "alpha": alpha, | |
| "band1_integral": band1_integral, | |
| "band2_integral": band2_integral, | |
| "band3_integral": band3_integral, | |
| "band4_integral": band4_integral, | |
| "band1_pop_prop": band1_pop_prop, | |
| "band2_pop_prop": band2_pop_prop, | |
| "band3_pop_prop": band3_pop_prop, | |
| "band4_pop_prop": band4_pop_prop, | |
| "band3_expected_negatives": band3_expected_negatives, | |
| "avg_risk_band3": avg_risk_band3, | |
| "band1_end_idx": band1_end_idx, | |
| "band2_end_idx": band2_end_idx, | |
| "band3_end_idx": band3_end_idx, | |
| } | |
| ) | |
| return results | |
| def compute_random_screening_curve( | |
| rows: list[dict[str, Any]], | |
| outcome_col: str, | |
| strata_features: Sequence[str], | |
| prediction_col: str = "probability", | |
| beta: float = 0.5, | |
| alpha_quantiles: Sequence[float] | None = None, | |
| seed: int = 42, | |
| use_custom_risk_col: str | None = None, | |
| simulation: str | tuple[float, float] | None = None, | |
| ) -> dict[str, Any]: | |
| """Compute random screening baseline curve. | |
| This baseline screens α proportion of the population at random (instead of targeting | |
| low-risk individuals). It treats: | |
| 1. All screened individuals with Y=1 (true positive outcome) | |
| 2. From unscreened, treats top (β + prop_screened_negatives - prop_screened_positives) by risk | |
| The intuition: by randomly screening, we identify some negatives and don't waste treatment | |
| budget on them, allowing us to treat more high-risk unscreened individuals. | |
| Args: | |
| rows: List of data rows with features, outcome, and predictions | |
| outcome_col: Name of outcome column | |
| strata_features: Features defining strata (used for risk scoring) | |
| prediction_col: Column name for model predictions | |
| beta: Treatment budget (proportion who can be treated) | |
| alpha_quantiles: Screening budget levels to evaluate | |
| seed: Random seed for reproducible random screening | |
| use_custom_risk_col: If provided, use this column for risk instead of empirical | |
| simulation: If provided, generate synthetic data from a Beta distribution instead | |
| of using real data. Pass a preset name ('uniform', 'bimodal', 'unimodal') or | |
| a tuple (a, b) of Beta distribution parameters. Uses SIMULATION_SIZE samples. | |
| Returns: | |
| Dictionary with screening curves | |
| """ | |
| if alpha_quantiles is None: | |
| alpha_quantiles = [beta * i / 49 for i in range(50)] | |
| # Assign each row its risk (simulation, custom, or empirical) | |
| rows_with_risk = [] | |
| if simulation is not None: | |
| # Generate synthetic data from a Beta distribution | |
| if isinstance(simulation, str): | |
| if simulation not in RISK_PRESETS: | |
| raise ValueError(f"Unknown simulation preset '{simulation}'. Choose from {list(RISK_PRESETS.keys())}.") | |
| preset = RISK_PRESETS[simulation] | |
| else: | |
| preset = simulation | |
| if isinstance(preset, (int, float)): | |
| risk_scores, outcomes = generate_simulation_data(size=SIMULATION_SIZE, seed=seed, point_mass=float(preset)) | |
| else: | |
| a, b = preset | |
| risk_scores, outcomes = generate_simulation_data(a, b, size=SIMULATION_SIZE, seed=seed) | |
| for i in range(SIMULATION_SIZE): | |
| rows_with_risk.append( | |
| { | |
| "row": {"_sim_index": i, "_sim_feature": 0, outcome_col: bool(outcomes[i])}, | |
| "empirical_risk": float(risk_scores[i]), | |
| "true_outcome": bool(outcomes[i]), | |
| "model_prediction": float(risk_scores[i]), | |
| } | |
| ) | |
| elif use_custom_risk_col is not None: | |
| # Use custom risk column directly | |
| for row in rows: | |
| risk = row.get(use_custom_risk_col, 0.5) | |
| rows_with_risk.append( | |
| { | |
| "row": row, | |
| "empirical_risk": risk, | |
| "true_outcome": _is_positive_outcome(row.get(outcome_col)), | |
| "model_prediction": row.get(prediction_col, 0.5), | |
| } | |
| ) | |
| else: | |
| # Compute empirical P(Y=1|X) for each stratum | |
| empirical_probs = compute_empirical_probabilities(rows, outcome_col, strata_features) | |
| for row in rows: | |
| stratum_key = tuple(row.get(f) for f in strata_features) | |
| empirical_risk = empirical_probs.get(stratum_key, {}).get("probability", 0.5) | |
| rows_with_risk.append( | |
| { | |
| "row": row, | |
| "empirical_risk": empirical_risk, | |
| "true_outcome": _is_positive_outcome(row.get(outcome_col)), | |
| "model_prediction": row.get(prediction_col, 0.5), | |
| } | |
| ) | |
| total_positive = sum(1 for r in rows_with_risk if r["true_outcome"]) | |
| n = len(rows_with_risk) | |
| # Results storage | |
| results = { | |
| "beta": beta, | |
| "alpha_values": [], | |
| "true_positives": [], | |
| "total_positive": total_positive, | |
| "total_samples": n, | |
| } | |
| # Set random seed for reproducibility — use a single permutation so that | |
| # screened sets are nested (larger α always includes the smaller α set). | |
| rng = np.random.RandomState(seed) | |
| random_order = rng.permutation(n) | |
| for alpha in alpha_quantiles: | |
| assert alpha <= beta, f"Screening budget α={alpha} exceeds treatment budget β={beta}" | |
| # Screen α proportion uniformly at random | |
| n_screen = min(int(alpha * n), n) | |
| n_treat = int(beta * n) | |
| screened_indices = set(random_order[:n_screen]) | |
| # Identify screened positives (gamma mass) | |
| screened_positive_indices = {idx for idx in screened_indices if rows_with_risk[idx]["true_outcome"]} | |
| gamma_count = len(screened_positive_indices) | |
| # Treat screened positives up to budget | |
| tp_from_screening = min(gamma_count, n_treat) | |
| remaining_budget = max(0, n_treat - tp_from_screening) | |
| # Pool for risk-based treatment: everyone except screened positives | |
| pool = [(idx, rows_with_risk[idx]) for idx in range(n) if idx not in screened_positive_indices] | |
| pool.sort(key=lambda x: x[1]["empirical_risk"], reverse=True) | |
| # Treat top (β - γ) mass by risk score | |
| n_treat_by_risk = min(remaining_budget, len(pool)) | |
| tp_from_risk = sum(1 for i in range(n_treat_by_risk) if pool[i][1]["true_outcome"]) | |
| tp_count = tp_from_screening + tp_from_risk | |
| results["alpha_values"].append(alpha) | |
| results["true_positives"].append(tp_count) | |
| return results | |
| def compute_intuitive_optimal_curve( | |
| rows: list[dict[str, Any]], | |
| outcome_col: str, | |
| strata_features: Sequence[str], | |
| prediction_col: str = "probability", | |
| beta: float = 0.5, | |
| alpha_quantiles: Sequence[float] | None = None, | |
| seed: int | None = None, | |
| use_custom_risk_col: str | None = None, | |
| simulation: str | tuple[float, float] | None = None, | |
| ) -> dict[str, Any]: | |
| """Compute intuitive-optimal screening curve. | |
| Algorithm (all bands are adjacent slices of the risk-sorted population): | |
| 1. Band A: treat the top (β − α) mass by risk (highest risk, no screening). | |
| 2. Band B: screen the next α mass. Let γ ≤ α be the mass of screened | |
| individuals with Y=0. Screened Y=1 are treated; screened Y=0 are not. | |
| 3. Band C: treat the next γ mass below the screened band (replaces the | |
| screened negatives, preserving total treatment budget = β). | |
| Args: | |
| rows: List of data rows (ignored when *simulation* is set). | |
| outcome_col: Name of outcome column. | |
| strata_features: Features defining strata. | |
| prediction_col: Column name for model predictions. | |
| beta: Treatment budget (proportion who can be treated). | |
| alpha_quantiles: Screening budget levels to evaluate. | |
| seed: Random seed for simulation mode. | |
| use_custom_risk_col: Use this column for risk instead of empirical. | |
| simulation: Preset name or (a, b) Beta parameters for synthetic data. | |
| Returns: | |
| Dictionary with alpha_values, true_positives, total_positive, total_samples. | |
| """ | |
| if alpha_quantiles is None: | |
| alpha_quantiles = [beta * i / 49 for i in range(50)] | |
| # --- Build rows_with_risk (same logic as compute_optimal_screening_curve) --- | |
| rows_with_risk = [] | |
| if simulation is not None: | |
| if isinstance(simulation, str): | |
| if simulation not in RISK_PRESETS: | |
| raise ValueError(f"Unknown simulation preset '{simulation}'. Choose from {list(RISK_PRESETS.keys())}.") | |
| preset = RISK_PRESETS[simulation] | |
| else: | |
| preset = simulation | |
| if isinstance(preset, (int, float)): | |
| risk_scores, outcomes = generate_simulation_data(size=SIMULATION_SIZE, seed=seed, point_mass=float(preset)) | |
| else: | |
| a, b = preset | |
| risk_scores, outcomes = generate_simulation_data(a, b, size=SIMULATION_SIZE, seed=seed) | |
| for i in range(SIMULATION_SIZE): | |
| rows_with_risk.append( | |
| { | |
| "row": {"_sim_index": i, "_sim_feature": 0, outcome_col: bool(outcomes[i])}, | |
| "empirical_risk": float(risk_scores[i]), | |
| "true_outcome": bool(outcomes[i]), | |
| "model_prediction": float(risk_scores[i]), | |
| } | |
| ) | |
| elif use_custom_risk_col is not None: | |
| for row in rows: | |
| risk = row.get(use_custom_risk_col, 0.5) | |
| rows_with_risk.append( | |
| { | |
| "row": row, | |
| "empirical_risk": risk, | |
| "true_outcome": _is_positive_outcome(row.get(outcome_col)), | |
| "model_prediction": row.get(prediction_col, 0.5), | |
| } | |
| ) | |
| else: | |
| empirical_probs = compute_empirical_probabilities(rows, outcome_col, strata_features) | |
| for row in rows: | |
| stratum_key = tuple(row.get(f) for f in strata_features) | |
| empirical_risk = empirical_probs.get(stratum_key, {}).get("probability", 0.5) | |
| rows_with_risk.append( | |
| { | |
| "row": row, | |
| "empirical_risk": empirical_risk, | |
| "true_outcome": _is_positive_outcome(row.get(outcome_col)), | |
| "model_prediction": row.get(prediction_col, 0.5), | |
| } | |
| ) | |
| # Sort by risk (highest to lowest) | |
| rows_with_risk.sort(key=lambda x: x["empirical_risk"], reverse=True) | |
| total_positive = sum(1 for r in rows_with_risk if r["true_outcome"]) | |
| n = len(rows_with_risk) | |
| results = { | |
| "beta": beta, | |
| "alpha_values": [], | |
| "true_positives": [], | |
| "total_positive": total_positive, | |
| "total_samples": n, | |
| } | |
| for alpha in alpha_quantiles: | |
| assert alpha <= beta, f"Screening budget α={alpha} exceeds treatment budget β={beta}" | |
| band_a_end = int((beta - alpha) * n) | |
| band_b_end = band_a_end + int(alpha * n) | |
| # Band A: treated by risk | |
| tp_band_a = 0 | |
| for i in range(band_a_end): | |
| item = rows_with_risk[i] | |
| if item["true_outcome"]: | |
| tp_band_a += 1 | |
| # Band B: screened — Y=1 treated, Y=0 not treated | |
| tp_band_b = 0 | |
| gamma_count = 0 # number of screened with Y=0 | |
| for i in range(band_a_end, band_b_end): | |
| item = rows_with_risk[i] | |
| if item["true_outcome"]: | |
| tp_band_b += 1 | |
| else: | |
| gamma_count += 1 | |
| # Band C: next gamma_count individuals treated by risk | |
| band_c_end = min(band_b_end + gamma_count, n) | |
| tp_band_c = 0 | |
| for i in range(band_b_end, band_c_end): | |
| item = rows_with_risk[i] | |
| if item["true_outcome"]: | |
| tp_band_c += 1 | |
| tp_count = tp_band_a + tp_band_b + tp_band_c | |
| results["alpha_values"].append(alpha) | |
| results["true_positives"].append(tp_count) | |
| return results | |