| """ |
| feature_engineering_v2.py β Generate 60+ features from ingested GSK copay data. |
| |
| KEY CAPABILITY: Graceful degradation. Every feature group checks whether its |
| source columns exist before computing. Missing columns β feature is skipped |
| (filled with 0) and a warning is logged. |
| |
| Uses FEATURE_DEPENDENCIES from config.py to determine what each feature needs. |
| |
| HIERARCHICAL FEATURES (v3) |
| ========================== |
| Organized into 4 analytical levels: |
| 1. Transaction Level β per-claim anomalous behavior (gap, qty, DOS, benefit, OOP, NDC) |
| 2. Patient Level β patient behavioral anomalies (one-&-done, avg gap, active duration) |
| 3. HCP Level β prescriber-driven fraud indicators (specialty, avg benefit, concentration) |
| 4. Pharmacy Level β pharmacy-centric fraud patterns (active flag, one-&-done, HCP conc, avg benefit) |
| """ |
|
|
| import logging |
| import numpy as np |
| import pandas as pd |
| from sklearn.preprocessing import StandardScaler, OrdinalEncoder |
|
|
| from config import PRODUCT_CONFIG, FEATURE_DEPENDENCIES, GROUP_BENEFIT_CONFIG, COVERED_NDCS, CLAIM_SCENARIO_MAP |
|
|
| logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") |
| logger = logging.getLogger(__name__) |
|
|
|
|
| def _has_columns(df, cols): |
| """Check if all columns in `cols` exist in df.""" |
| return all(c in df.columns for c in cols) |
|
|
|
|
| def _safe_groupby_transform(df, group_col, value_col, transform, feature_name): |
| """Safely perform a groupby transform; if it fails, return zeros.""" |
| try: |
| return df.groupby(group_col)[value_col].transform(transform) |
| except Exception as e: |
| logger.warning(f"Feature '{feature_name}' groupby failed: {e}. Filling with 0.") |
| return pd.Series(0, index=df.index) |
|
|
|
|
| def _safe_rolling(df, group_col, date_col, value_col, window_days, agg, feature_name): |
| """Safely compute a time-based rolling window per group.""" |
| try: |
| result = ( |
| df.groupby(group_col) |
| .rolling(f"{window_days}D", on=date_col, min_periods=1)[value_col] |
| .agg(agg) |
| .reset_index(level=0, drop=True) |
| ) |
| return result.fillna(0).values |
| except Exception as e: |
| logger.warning(f"Feature '{feature_name}' rolling window failed: {e}. Filling with 0.") |
| return np.zeros(len(df)) |
|
|
|
|
| |
| |
| |
def add_temporal_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add per-claim temporal features: fill gaps, calendar parts, rx lag.

    Degrades gracefully: any feature whose source columns (per
    FEATURE_DEPENDENCIES) are missing is emitted as constant 0.
    Side effect: when gap features are computable, df is re-sorted by
    (patient_id, fill_date) and copied.
    """
    deps = FEATURE_DEPENDENCIES

    # Gap (in days) between consecutive fills for the same patient.
    if _has_columns(df, deps["days_between_fills"]):
        df = df.sort_values(["patient_id", "fill_date"]).copy()
        df["days_between_fills"] = df.groupby("patient_id")["fill_date"].diff().dt.days
        # First fill per patient has no predecessor -> gap of 0.
        df["days_between_fills"] = df["days_between_fills"].fillna(0)
    else:
        logger.warning("Skipping 'days_between_fills' — missing patient_id or fill_date")
        df["days_between_fills"] = 0

    # Early-refill flag. 'days_between_fills' is always present here (set in
    # both branches above), so the previous existence check was dead code; the
    # trailing fillna(0) was also a no-op after astype(int).
    # NOTE(review): first fills carry gap 0 and are therefore flagged as early
    # refills — confirm this is intended.
    threshold = PRODUCT_CONFIG["early_refill_threshold_days"]
    df["early_refill_flag"] = (df["days_between_fills"] < threshold).astype(int)

    # Days elapsed since the patient's first observed fill.
    if _has_columns(df, deps["days_since_first_fill"]):
        first_fill = df.groupby("patient_id")["fill_date"].transform("min")
        df["days_since_first_fill"] = (df["fill_date"] - first_fill).dt.days.fillna(0)
    else:
        df["days_since_first_fill"] = 0

    # Calendar decomposition of the fill date (0 when the date is missing).
    if "fill_date" in df.columns:
        df["claim_month"] = df["fill_date"].dt.month.fillna(0)
        df["claim_dow"] = df["fill_date"].dt.dayofweek.fillna(0)
        df["claim_quarter"] = df["fill_date"].dt.quarter.fillna(0)
    else:
        df["claim_month"] = 0
        df["claim_dow"] = 0
        df["claim_quarter"] = 0

    # Lag between the prescription being written and being filled.
    if _has_columns(df, deps["rx_lag_days"]):
        df["rx_lag_days"] = (df["fill_date"] - df["date_written"]).dt.days.fillna(0)
    else:
        df["rx_lag_days"] = 0

    return df
|
|
|
|
| |
| |
| |
def add_rolling_window_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add trailing time-window activity features for patient / pharmacy / HCP.

    All windows are time-based ("ND" trailing windows over fill_date), not
    row-count windows; df is sorted by fill_date first so the rolling engine
    sees a monotonic date column. Missing source columns degrade to 0.

    BUG FIX: the pharmacy/prescriber counts previously bucketed claims with
    pd.Grouper(freq="ND") and merged the bucket *start* dates back onto raw
    fill_date values. Bucket labels almost never equal actual fill dates, so
    the merge produced NaN -> 0 for nearly every row (and the merge also reset
    df's index). They now use the same _safe_rolling trailing-window path as
    the patient features.
    """
    df = df.sort_values("fill_date").copy()
    deps = FEATURE_DEPENDENCIES

    # Patient-level trailing fill counts.
    for window in [7, 30, 90]:
        col = f"patient_fill_count_{window}d"
        if _has_columns(df, deps[col]):
            df[col] = _safe_rolling(df, "patient_id", "fill_date", "claim_id", window, "count", col)
        else:
            logger.warning(f"Skipping '{col}' — missing patient_id or fill_date")
            df[col] = 0

    # Patient-level trailing monetary sums (copay spend, billed amount, benefit).
    for value_col, name_tpl in [
        ("copay_after", "patient_copay_spend_{w}d"),
        ("usual_customary", "patient_total_claim_{w}d"),
        ("benefit_amount", "patient_benefit_{w}d"),
    ]:
        for window in [7, 30, 90]:
            col = name_tpl.format(w=window)
            if _has_columns(df, ["patient_id", "fill_date", value_col]):
                df[col] = _safe_rolling(df, "patient_id", "fill_date", value_col, window, "sum", col)
            else:
                df[col] = 0

    # Pharmacy- and prescriber-level trailing claim counts.
    for entity_col, prefix in [("pharmacy_npi", "pharmacy"), ("prescriber_npi", "prescriber")]:
        for window in [30, 90]:
            col = f"{prefix}_claim_count_{window}d"
            if _has_columns(df, [entity_col, "fill_date"]):
                # Sum a constant helper column so claim_id is not required.
                df["_one"] = 1.0
                df[col] = _safe_rolling(df, entity_col, "fill_date", "_one", window, "sum", col)
                df.drop(columns=["_one"], inplace=True, errors="ignore")
            else:
                df[col] = 0

    return df
|
|
|
|
| |
| |
| |
def add_patient_behavioral_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add patient-level behavioral aggregates, broadcast onto every claim row.

    Each feature is guarded by its FEATURE_DEPENDENCIES entry; when the source
    columns are absent the feature degrades to a constant (0, or 1 for counts)
    instead of raising. Depends on 'days_between_fills' and
    'patient_fill_count_30d' having been added by the temporal / rolling-window
    steps when their own dependencies were met.
    """
    deps = FEATURE_DEPENDENCIES

    # Distinct pharmacies the patient used (pharmacy-shopping signal).
    if _has_columns(df, deps["unique_pharmacies_overall"]):
        df["unique_pharmacies_overall"] = df.groupby("patient_id")["pharmacy_npi"].transform("nunique")
    else:
        df["unique_pharmacies_overall"] = 1

    # Distinct copay programs per patient.
    if _has_columns(df, deps["unique_programs_per_patient"]):
        df["unique_programs_per_patient"] = df.groupby("patient_id")["program_id"].transform("nunique")
    else:
        df["unique_programs_per_patient"] = 1

    # Distinct prescribers per patient (doctor-shopping signal).
    if _has_columns(df, deps["unique_prescribers_per_patient"]):
        df["unique_prescribers_per_patient"] = df.groupby("patient_id")["prescriber_npi"].transform("nunique")
    else:
        df["unique_prescribers_per_patient"] = 1

    # Lifetime fill count per patient.
    if _has_columns(df, deps["total_fills_per_patient"]):
        df["total_fills_per_patient"] = df.groupby("patient_id")["claim_id"].transform("count")
    else:
        df["total_fills_per_patient"] = 1

    # Mean / std of the gap between consecutive fills (refill regularity).
    if _has_columns(df, deps["avg_days_between_fills"]):
        avg_days = df.groupby("patient_id")["days_between_fills"].transform("mean")
        std_days = df.groupby("patient_id")["days_between_fills"].transform("std")
        df["avg_days_between_fills"] = avg_days.fillna(0)
        # std is NaN for patients with a single fill -> 0.
        df["std_days_between_fills"] = std_days.fillna(0)
    else:
        df["avg_days_between_fills"] = 0
        df["std_days_between_fills"] = 0

    # Peak 30-day fill intensity (max of the rolling 30d count).
    if _has_columns(df, deps["max_fills_any_30d"]):
        df["max_fills_any_30d"] = df.groupby("patient_id")["patient_fill_count_30d"].transform("max")
    else:
        df["max_fills_any_30d"] = 0

    # Absolute deviation of this claim's copay from the patient's own mean.
    if _has_columns(df, deps["copay_deviation_from_patient_mean"]):
        mean = df.groupby("patient_id")["copay_after"].transform("mean")
        df["copay_deviation_from_patient_mean"] = (df["copay_after"] - mean).abs()
    else:
        df["copay_deviation_from_patient_mean"] = 0

    # Total program benefit paid out for the patient.
    if _has_columns(df, deps["total_benefit_per_patient"]):
        df["total_benefit_per_patient"] = df.groupby("patient_id")["benefit_amount"].transform("sum")
    else:
        df["total_benefit_per_patient"] = 0

    # Days between the patient's first and last observed fill.
    if _has_columns(df, deps["patient_active_duration"]):
        first = df.groupby("patient_id")["fill_date"].transform("min")
        last = df.groupby("patient_id")["fill_date"].transform("max")
        df["patient_active_duration"] = (last - first).dt.days.fillna(0)
    else:
        df["patient_active_duration"] = 0

    # Mean refill gap (same computation as avg_days_between_fills above,
    # kept under the v3 hierarchical name).
    if _has_columns(df, deps["patient_avg_gap"]):
        df["patient_avg_gap"] = df.groupby("patient_id")["days_between_fills"].transform("mean").fillna(0)
    else:
        df["patient_avg_gap"] = 0

    # One-and-done: patient filled exactly once.
    if _has_columns(df, deps["patient_one_and_done"]):
        df["patient_one_and_done"] = (df["total_fills_per_patient"] == 1).astype(int)
    else:
        df["patient_one_and_done"] = 0

    # Share of refill gaps shorter than the configured "too soon" threshold.
    short_thresh = PRODUCT_CONFIG["patient_gap_short_threshold"]
    if _has_columns(df, deps["patient_short_gap_pct"]):
        df["patient_short_gap_pct"] = (
            df.groupby("patient_id")["days_between_fills"]
            .transform(lambda x: (x < short_thresh).mean())
            .fillna(0)
        )
    else:
        df["patient_short_gap_pct"] = 0

    # Share of refill gaps longer than the configured "lapsed" threshold.
    long_thresh = PRODUCT_CONFIG["patient_gap_long_threshold"]
    if _has_columns(df, deps["patient_long_gap_pct"]):
        df["patient_long_gap_pct"] = (
            df.groupby("patient_id")["days_between_fills"]
            .transform(lambda x: (x > long_thresh).mean())
            .fillna(0)
        )
    else:
        df["patient_long_gap_pct"] = 0

    return df
|
|
|
|
| |
| |
| |
def add_pharmacy_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add pharmacy-level aggregates and a composite pharmacy fraud-risk score.

    If pharmacy_npi is absent, every pharmacy feature is zero-filled and the
    function returns early. Once pharmacy_npi exists, patient_id and claim_id
    are used without guards — NOTE(review): confirm upstream ingestion
    guarantees them, otherwise the groupbys below raise KeyError.
    """
    deps = FEATURE_DEPENDENCIES  # NOTE(review): unused in this function

    if "pharmacy_npi" not in df.columns:
        logger.warning("Skipping all pharmacy features β pharmacy_npi missing")
        # Zero-fill the full pharmacy feature set so downstream column
        # selection still works.
        for c in [
            "pharmacy_unique_patients", "pharmacy_total_claims", "pharmacy_claims_per_patient_ratio",
            "pharmacy_avg_copay", "pharmacy_mail_order_pct", "pharmacy_reject_rate",
            "pharmacy_paper_submission_rate", "pharmacy_unique_hcps",
            "pharmacy_hcp_concentration", "pharmacy_one_and_done_pct",
            "pharmacy_total_benefit", "pharmacy_avg_benefit", "pharmacy_high_benefit_flag",
            "pharmacy_active_flag", "pharmacy_std_benefit", "pharmacy_max_benefit_per_patient",
            "pharmacy_std_copay", "pharmacy_max_claim_per_patient", "pharmacy_fraud_risk_score",
        ]:
            df[c] = 0
        return df

    # Volume aggregates; ratio denominator guarded against zero.
    df["pharmacy_unique_patients"] = df.groupby("pharmacy_npi")["patient_id"].transform("nunique")
    df["pharmacy_total_claims"] = df.groupby("pharmacy_npi")["claim_id"].transform("count")
    df["pharmacy_claims_per_patient_ratio"] = df["pharmacy_total_claims"] / df["pharmacy_unique_patients"].replace(0, 1)

    # Copay profile per pharmacy (std is NaN for single-claim pharmacies -> 0).
    if "copay_after" in df.columns:
        df["pharmacy_avg_copay"] = df.groupby("pharmacy_npi")["copay_after"].transform("mean")
        df["pharmacy_std_copay"] = df.groupby("pharmacy_npi")["copay_after"].transform("std").fillna(0)
    else:
        df["pharmacy_avg_copay"] = 0
        df["pharmacy_std_copay"] = 0

    # Share of mail-order fills.
    if "mail_order" in df.columns:
        df["pharmacy_mail_order_pct"] = df.groupby("pharmacy_npi")["mail_order"].transform("mean")
    else:
        df["pharmacy_mail_order_pct"] = 0

    # Share of claims carrying a reject code.
    if "has_reject_code" in df.columns:
        df["pharmacy_reject_rate"] = df.groupby("pharmacy_npi")["has_reject_code"].transform("mean")
    else:
        df["pharmacy_reject_rate"] = 0

    # Share of paper (manual) submissions.
    if "paper_submission" in df.columns:
        df["pharmacy_paper_submission_rate"] = df.groupby("pharmacy_npi")["paper_submission"].transform("mean")
    else:
        df["pharmacy_paper_submission_rate"] = 0

    # HCP concentration: share of the pharmacy's claims written by its single
    # busiest prescriber (1.0 == every claim from one HCP).
    if "prescriber_npi" in df.columns:
        df["pharmacy_unique_hcps"] = df.groupby("pharmacy_npi")["prescriber_npi"].transform("nunique")

        hcp_conc = df.groupby(["pharmacy_npi", "prescriber_npi"]).size().groupby(level=0).max()
        total_claims = df.groupby("pharmacy_npi")["claim_id"].count()
        hcp_conc_ratio = (hcp_conc / total_claims).fillna(0)
        df["pharmacy_hcp_concentration"] = df["pharmacy_npi"].map(hcp_conc_ratio).fillna(0)
    else:
        df["pharmacy_unique_hcps"] = 1
        df["pharmacy_hcp_concentration"] = 0

    # Share of the pharmacy's claims from one-and-done patients (requires
    # add_patient_behavioral_features to have run first).
    if "patient_one_and_done" in df.columns:
        df["pharmacy_one_and_done_pct"] = df.groupby("pharmacy_npi")["patient_one_and_done"].transform("mean")
    else:
        df["pharmacy_one_and_done_pct"] = 0

    # Benefit payout profile per pharmacy.
    if "benefit_amount" in df.columns:
        df["pharmacy_total_benefit"] = df.groupby("pharmacy_npi")["benefit_amount"].transform("sum")
        df["pharmacy_avg_benefit"] = df.groupby("pharmacy_npi")["benefit_amount"].transform("mean")
        df["pharmacy_std_benefit"] = df.groupby("pharmacy_npi")["benefit_amount"].transform("std").fillna(0)

        # Largest total benefit the pharmacy channeled through any one patient.
        max_ben = df.groupby(["pharmacy_npi", "patient_id"])["benefit_amount"].sum().groupby(level=0).max()
        df["pharmacy_max_benefit_per_patient"] = df["pharmacy_npi"].map(max_ben).fillna(0)
    else:
        df["pharmacy_total_benefit"] = 0
        df["pharmacy_avg_benefit"] = 0
        df["pharmacy_std_benefit"] = 0
        df["pharmacy_max_benefit_per_patient"] = 0

    # Flag pharmacies whose average benefit exceeds the configured threshold.
    hbt = PRODUCT_CONFIG["pharmacy_high_benefit_threshold"]
    df["pharmacy_high_benefit_flag"] = (df["pharmacy_avg_benefit"] > hbt).astype(int)

    # Active flag from subcategory text (regex OR); unknown subcategory
    # defaults to active (1).
    if "pharmacy_subcategory" in df.columns:
        sub = df["pharmacy_subcategory"].fillna("").str.lower()
        df["pharmacy_active_flag"] = (
            sub.str.contains("active|retail|chain|independent|mail").astype(int)
        )
    else:
        df["pharmacy_active_flag"] = 1

    # Max number of claims the pharmacy filed for any single patient.
    max_claim = df.groupby(["pharmacy_npi", "patient_id"]).size().groupby(level=0).max()
    df["pharmacy_max_claim_per_patient"] = df["pharmacy_npi"].map(max_claim).fillna(0)

    # Composite risk score: fixed weights over the four behavioral rates.
    reject_rate = df["pharmacy_reject_rate"].fillna(0)
    paper_rate = df["pharmacy_paper_submission_rate"].fillna(0)
    hcp_conc = df["pharmacy_hcp_concentration"].fillna(0)
    one_done = df["pharmacy_one_and_done_pct"].fillna(0)
    df["pharmacy_fraud_risk_score"] = (
        0.25 * reject_rate + 0.20 * paper_rate + 0.30 * hcp_conc + 0.25 * one_done
    )

    return df
|
|
|
|
| |
| |
| |
def add_prescriber_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add prescriber (HCP) level aggregates for fraud scoring.

    If prescriber_npi is absent, every HCP feature is zero-filled (with the
    specialty-valid flag defaulting to 1) and the function returns early.
    patient_id / claim_id are used without guards once prescriber_npi exists —
    NOTE(review): confirm ingestion guarantees them.
    """
    deps = FEATURE_DEPENDENCIES  # NOTE(review): unused in this function

    if "prescriber_npi" not in df.columns:
        logger.warning("Skipping prescriber features β prescriber_npi missing")
        for c in [
            "prescriber_unique_patients", "prescriber_total_claims",
            "hcp_total_benefit", "hcp_total_claims", "hcp_unique_patients",
            "hcp_avg_benefit_per_claim", "hcp_avg_benefit_per_patient",
            "hcp_unique_pharmacies", "hcp_one_and_done_pct",
            "hcp_patient_concentration", "hcp_suspicious_specialty",
            "hcp_high_benefit_flag", "hcp_max_benefit_per_patient",
            "hcp_std_benefit", "hcp_patient_share",
        ]:
            df[c] = 0
        df["prescriber_specialty_valid"] = 1
        return df

    # Volume aggregates per prescriber.
    df["prescriber_unique_patients"] = df.groupby("prescriber_npi")["patient_id"].transform("nunique")
    df["prescriber_total_claims"] = df.groupby("prescriber_npi")["claim_id"].transform("count")

    # Specialty whitelist check (case-insensitive); missing column => valid.
    valid_specialties = set(s.lower() for s in PRODUCT_CONFIG["valid_prescriber_specialties"])
    if "prescriber_specialty" in df.columns:
        df["prescriber_specialty_valid"] = (
            df["prescriber_specialty"].fillna("").str.lower().isin(valid_specialties).astype(int)
        )
    else:
        df["prescriber_specialty_valid"] = 1

    # Benefit payout profile per prescriber.
    if "benefit_amount" in df.columns:
        df["hcp_total_benefit"] = df.groupby("prescriber_npi")["benefit_amount"].transform("sum")
        df["hcp_avg_benefit_per_claim"] = df.groupby("prescriber_npi")["benefit_amount"].transform("mean")
        df["hcp_std_benefit"] = df.groupby("prescriber_npi")["benefit_amount"].transform("std").fillna(0)

        # Average benefit per distinct patient (zero-patient guard).
        total_ben = df.groupby("prescriber_npi")["benefit_amount"].transform("sum")
        uniq_pts = df.groupby("prescriber_npi")["patient_id"].transform("nunique")
        df["hcp_avg_benefit_per_patient"] = (total_ben / uniq_pts.replace(0, 1)).fillna(0)

        # Largest total benefit the prescriber drove through any one patient.
        max_ben = df.groupby(["prescriber_npi", "patient_id"])["benefit_amount"].sum().groupby(level=0).max()
        df["hcp_max_benefit_per_patient"] = df["prescriber_npi"].map(max_ben).fillna(0)
    else:
        df["hcp_total_benefit"] = 0
        df["hcp_avg_benefit_per_claim"] = 0
        df["hcp_std_benefit"] = 0
        df["hcp_avg_benefit_per_patient"] = 0
        df["hcp_max_benefit_per_patient"] = 0

    # v3 hierarchical names; same computations as the prescriber_* columns above.
    df["hcp_total_claims"] = df.groupby("prescriber_npi")["claim_id"].transform("count")
    df["hcp_unique_patients"] = df.groupby("prescriber_npi")["patient_id"].transform("nunique")

    # Distinct pharmacies dispensing this prescriber's scripts.
    if "pharmacy_npi" in df.columns:
        df["hcp_unique_pharmacies"] = df.groupby("prescriber_npi")["pharmacy_npi"].transform("nunique")
    else:
        df["hcp_unique_pharmacies"] = 1

    # Share of the prescriber's claims from one-and-done patients (requires
    # add_patient_behavioral_features to have run first).
    if "patient_one_and_done" in df.columns:
        df["hcp_one_and_done_pct"] = df.groupby("prescriber_npi")["patient_one_and_done"].transform("mean")
    else:
        df["hcp_one_and_done_pct"] = 0

    # Patient concentration: share of the prescriber's claims attributable to
    # their single most-frequent patient.
    top_patient = df.groupby(["prescriber_npi", "patient_id"]).size().groupby(level=0).max()
    total_claims = df.groupby("prescriber_npi")["claim_id"].count()
    conc = (top_patient / total_claims).fillna(0)
    df["hcp_patient_concentration"] = df["prescriber_npi"].map(conc).fillna(0)

    # Claims per distinct patient (zero-patient guard).
    df["hcp_patient_share"] = df["hcp_total_claims"] / df["hcp_unique_patients"].replace(0, 1)

    # Specialty blacklist check (case-insensitive); missing column => not flagged.
    sus_spec = set(s.lower() for s in PRODUCT_CONFIG["suspicious_prescriber_specialties"])
    if "prescriber_specialty" in df.columns:
        spec = df["prescriber_specialty"].fillna("").str.lower()
        df["hcp_suspicious_specialty"] = spec.isin(sus_spec).astype(int)
    else:
        df["hcp_suspicious_specialty"] = 0

    # Flag prescribers whose avg benefit per patient exceeds the threshold.
    hbt = PRODUCT_CONFIG["hcp_high_benefit_threshold"]
    df["hcp_high_benefit_flag"] = (df["hcp_avg_benefit_per_patient"] > hbt).astype(int)

    return df
|
|
|
|
| |
| |
| |
def add_reject_code_features(df: pd.DataFrame) -> pd.DataFrame:
    """Aggregate reject-code activity to the patient level (count, rate,
    high-risk and maximizer reject counts), broadcast onto each claim row.
    Missing source columns degrade to constant 0."""
    if "patient_id" not in df.columns:
        for name in ["patient_reject_count", "patient_reject_rate", "patient_highrisk_reject_count", "patient_maximizer_count"]:
            df[name] = 0
        return df

    by_patient = df.groupby("patient_id")

    if "has_reject_code" in df.columns:
        df["patient_reject_count"] = by_patient["has_reject_code"].transform("sum")
        df["patient_reject_rate"] = by_patient["has_reject_code"].transform("mean")
    else:
        df["patient_reject_count"] = 0
        df["patient_reject_rate"] = 0

    if "high_risk_reject" in df.columns:
        df["patient_highrisk_reject_count"] = by_patient["high_risk_reject"].transform("sum")
    else:
        df["patient_highrisk_reject_count"] = 0

    if "maximizer_reject" in df.columns:
        df["patient_maximizer_count"] = by_patient["maximizer_reject"].transform("sum")
    else:
        df["patient_maximizer_count"] = 0

    return df
|
|
|
|
| |
| |
| |
def add_plan_switching_features(df: pd.DataFrame) -> pd.DataFrame:
    """Flag patients appearing under multiple plans or payer BINs.

    Emits a distinct-count column and a >1 switch flag for each of
    primary_plan_id and primary_payer_bin; missing sources default the
    count to 1 (so the flag stays 0).
    """
    if "patient_id" not in df.columns:
        df["unique_plans_per_patient"] = 1
        df["plan_switch_flag"] = 0
        df["unique_bins_per_patient"] = 1
        df["bin_switch_flag"] = 0
        return df

    specs = [
        ("primary_plan_id", "unique_plans_per_patient", "plan_switch_flag"),
        ("primary_payer_bin", "unique_bins_per_patient", "bin_switch_flag"),
    ]
    for source_col, count_col, flag_col in specs:
        if source_col in df.columns:
            df[count_col] = df.groupby("patient_id")[source_col].transform("nunique")
        else:
            df[count_col] = 1
        df[flag_col] = (df[count_col] > 1).astype(int)

    return df
|
|
|
|
| |
| |
| |
def add_submission_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add the patient's mean paper-submission rate (0 when sources missing)."""
    required = FEATURE_DEPENDENCIES["patient_paper_submission_rate"]
    if not _has_columns(df, required):
        df["patient_paper_submission_rate"] = 0
        return df
    df["patient_paper_submission_rate"] = (
        df.groupby("patient_id")["paper_submission"].transform("mean")
    )
    return df
|
|
|
|
| |
| |
| |
def add_daw_features(df: pd.DataFrame) -> pd.DataFrame:
    """Count brand-required (DAW) fills per patient (0 when sources missing)."""
    required = FEATURE_DEPENDENCIES["patient_daw_brand_count"]
    if not _has_columns(df, required):
        df["patient_daw_brand_count"] = 0
        return df
    df["patient_daw_brand_count"] = (
        df.groupby("patient_id")["daw_brand_required"].transform("sum")
    )
    return df
|
|
|
|
| |
| |
| |
def add_linked_claim_features(df: pd.DataFrame) -> pd.DataFrame:
    """Count adjusted and linked claims per patient, broadcast per row.

    Missing source columns degrade each count to constant 0.
    """
    if "patient_id" not in df.columns:
        df["patient_adjusted_count"] = 0
        df["patient_linked_count"] = 0
        return df

    for source_col, target_col in [
        ("is_adjusted", "patient_adjusted_count"),
        ("has_linked_claim", "patient_linked_count"),
    ]:
        if source_col in df.columns:
            df[target_col] = df.groupby("patient_id")[source_col].transform("sum")
        else:
            df[target_col] = 0

    return df
|
|
|
|
| |
| |
| |
def add_drug_specific_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add transaction-level anomaly features for the product's drug profile.

    Covers NDC switching, government-payer exposure, quantity / days-supply /
    age rule violations, early-burst behavior, cross-state fills, per-patient
    z-scores, benefit ratio, and dataset-wide benefit / OOP z-scores.
    Missing inputs degrade to constant 0 (or 1 for the NDC count).
    """
    deps = FEATURE_DEPENDENCIES

    # Distinct NDCs per patient (defaults to 1 when NDC data is absent).
    if _has_columns(df, deps["patient_ndc_count"]):
        df["patient_ndc_count"] = df.groupby("patient_id")["drug_ndc"].transform("nunique")
    else:
        df["patient_ndc_count"] = 1

    # Patient has claims under more than one NDC.
    df["ndc_switch_flag"] = (df["patient_ndc_count"] > 1).astype(int)

    # Government payer flag (exact string match on insurance_type; empty
    # Series fallback keeps the comparison NaN-free when the column is absent).
    df["govt_insurance_flag"] = (df.get("insurance_type", pd.Series("", index=df.index)) == "Government").astype(int)

    # Quantity differs from the product's expected per-fill quantity.
    expected_qty = PRODUCT_CONFIG["quantity_expected"]
    if "quantity" in df.columns:
        df["quantity_anomaly"] = (df["quantity"] != expected_qty).astype(int)
    else:
        df["quantity_anomaly"] = 0

    # Days supply differs from the product's expected days supply.
    expected_ds = PRODUCT_CONFIG["days_supply_expected"]
    if "days_supply" in df.columns:
        df["days_supply_anomaly"] = (df["days_supply"] != expected_ds).astype(int)
    else:
        df["days_supply_anomaly"] = 0

    # Patient below the product's minimum age.
    if "patient_age" in df.columns:
        df["age_violation_flag"] = (df["patient_age"] < PRODUCT_CONFIG["underage_threshold"]).astype(int)
    else:
        df["age_violation_flag"] = 0

    # Multiple fills within the first 7 days of the patient's history.
    if _has_columns(df, deps["new_patient_burst"]):
        df["new_patient_burst"] = (
            (df["days_since_first_fill"] <= 7) & (df["total_fills_per_patient"] > 1)
        ).astype(int)
    else:
        df["new_patient_burst"] = 0

    # Fill dispensed in a different state than the patient's residence.
    if _has_columns(df, deps["cross_state_fill"]):
        df["cross_state_fill"] = (df["patient_state"] != df["pharmacy_state"]).astype(int)
    else:
        df["cross_state_fill"] = 0

    # Per-patient z-scores; zero-std patients produce 0 via the NaN fill.
    # NOTE(review): 'total_claim_zscore' and 'uc_zscore' are both computed
    # from usual_customary and are identical — confirm the duplicate column
    # is intentional (e.g. a legacy name kept for downstream consumers).
    for col, zname in [("copay_after", "copay_zscore"), ("usual_customary", "total_claim_zscore"), ("usual_customary", "uc_zscore")]:
        if col in df.columns and "patient_id" in df.columns:
            mean = df.groupby("patient_id")[col].transform("mean")
            std = df.groupby("patient_id")[col].transform("std").replace(0, np.nan)
            df[zname] = ((df[col] - mean) / std).fillna(0)
        else:
            df[zname] = 0

    # Benefit paid as a fraction of the billed (usual & customary) amount.
    if _has_columns(df, deps["benefit_ratio"]):
        df["benefit_ratio"] = (df["benefit_amount"] / df["usual_customary"].replace(0, np.nan)).fillna(0)
    else:
        df["benefit_ratio"] = 0

    # Dataset-wide z-score of the benefit amount.
    if "benefit_amount" in df.columns:
        ben_mean = df["benefit_amount"].mean()
        ben_std = df["benefit_amount"].std()
        if ben_std > 0:
            df["transaction_benefit_score"] = ((df["benefit_amount"] - ben_mean) / ben_std).fillna(0)
        else:
            df["transaction_benefit_score"] = 0
    else:
        df["transaction_benefit_score"] = 0

    # Dataset-wide z-score of the patient's out-of-pocket (copay after benefit).
    if "copay_after" in df.columns:
        oop_mean = df["copay_after"].mean()
        oop_std = df["copay_after"].std()
        if oop_std > 0:
            df["transaction_oop_score"] = ((df["copay_after"] - oop_mean) / oop_std).fillna(0)
        else:
            df["transaction_oop_score"] = 0
    else:
        df["transaction_oop_score"] = 0

    return df
|
|
|
|
| |
| |
| |
|
|
def get_days_supply_tier(days_supply):
    """Bucket a days-supply value into its benefit-schedule tier label.

    NaN (and anything <= 30 after int truncation) maps to the lowest
    tier, "1-30"; 31-60 and 61+ map to "31-60" and "61-90" respectively.
    """
    if pd.isna(days_supply):
        return "1-30"
    value = int(days_supply)
    if value > 60:
        return "61-90"
    if value > 30:
        return "31-60"
    return "1-30"
|
|
|
|
def lookup_benefit_cap(group_id, scenario, fill_date, days_supply_tier):
    """Return the configured max benefit for (group, scenario, date, tier).

    Returns np.nan when the group is missing/UNKNOWN/unconfigured, the fill
    date is missing, or the date falls outside every configured schedule
    window for the scenario; returns 0.0 when the scenario is not in the
    group's covered set.
    """
    if pd.isna(group_id):
        return np.nan
    gid = str(group_id).strip().upper()
    if gid == "UNKNOWN":
        return np.nan
    if gid not in GROUP_BENEFIT_CONFIG:
        return np.nan
    cfg = GROUP_BENEFIT_CONFIG[gid]
    if scenario not in cfg["covered_scenarios"]:
        return 0.0

    if fill_date is pd.NaT or pd.isna(fill_date):
        return np.nan
    # Walk the schedule for this scenario; the first window containing the
    # fill date wins.
    for (sched_scenario, start_str, end_str), tier_map in cfg["benefit_schedule"].items():
        if sched_scenario != scenario:
            continue
        if pd.Timestamp(start_str) <= fill_date <= pd.Timestamp(end_str):
            return tier_map.get(days_supply_tier, np.nan)
    return np.nan
|
|
|
|
| def add_group_aware_features(df: pd.DataFrame) -> pd.DataFrame: |
| """Generate features based on Group 8141 vs Group 8200 benefit rules.""" |
| deps = FEATURE_DEPENDENCIES |
|
|
| |
| if "days_supply" in df.columns: |
| df["days_supply_tier"] = df["days_supply"].apply(get_days_supply_tier) |
| else: |
| df["days_supply_tier"] = "1-30" |
|
|
| |
| if _has_columns(df, deps["allowed_benefit_cap"]): |
| df["allowed_benefit_cap"] = df.apply( |
| lambda row: lookup_benefit_cap( |
| row.get("group_id", "UNKNOWN"), |
| row.get("claim_scenario_derived", "commercial_approved"), |
| row.get("fill_date", pd.NaT), |
| row.get("days_supply_tier", "1-30"), |
| ), axis=1, |
| ) |
| else: |
| logger.warning("Skipping 'allowed_benefit_cap' β missing group_id, claim_scenario, fill_date, or days_supply") |
| df["allowed_benefit_cap"] = np.nan |
|
|
| |
| if "benefit_amount" in df.columns and "allowed_benefit_cap" in df.columns: |
| df["cap_utilization_ratio"] = ( |
| df["benefit_amount"] / df["allowed_benefit_cap"].replace(0, np.nan) |
| ).fillna(0).clip(lower=0) |
| |
| df["cap_utilization_ratio"] = df["cap_utilization_ratio"].clip(upper=10) |
| else: |
| df["cap_utilization_ratio"] = 0 |
|
|
| |
| if "benefit_amount" in df.columns and "allowed_benefit_cap" in df.columns: |
| df["excess_payment_amount"] = ( |
| (df["benefit_amount"] - df["allowed_benefit_cap"]).clip(lower=0) |
| ).fillna(0) |
| else: |
| df["excess_payment_amount"] = 0 |
|
|
| |
| if _has_columns(df, deps["scenario_not_covered_flag"]): |
| def is_scenario_not_covered(row): |
| gid = str(row.get("group_id", "UNKNOWN")).strip().upper() |
| scenario = row.get("claim_scenario_derived", "commercial_approved") |
| if gid not in GROUP_BENEFIT_CONFIG: |
| return 0 |
| return int(scenario in GROUP_BENEFIT_CONFIG[gid]["non_covered_scenarios"]) |
| df["scenario_not_covered_flag"] = df.apply(is_scenario_not_covered, axis=1) |
| else: |
| df["scenario_not_covered_flag"] = 0 |
|
|
| |
| if _has_columns(df, deps["invalid_period_benefit_flag"]): |
| def is_invalid_period(row): |
| gid = str(row.get("group_id", "UNKNOWN")).strip().upper() |
| scenario = row.get("claim_scenario_derived", "commercial_approved") |
| fill_date = row.get("fill_date", pd.NaT) |
| if gid not in GROUP_BENEFIT_CONFIG: |
| return 0 |
| if scenario not in GROUP_BENEFIT_CONFIG[gid]["covered_scenarios"]: |
| return 0 |
| |
| for (sched_scenario, start_str, end_str), _ in GROUP_BENEFIT_CONFIG[gid]["benefit_schedule"].items(): |
| if sched_scenario == scenario: |
| start_dt = pd.Timestamp(start_str) |
| end_dt = pd.Timestamp(end_str) |
| if start_dt <= fill_date <= end_dt: |
| return 0 |
| return 1 |
| df["invalid_period_benefit_flag"] = df.apply(is_invalid_period, axis=1) |
| else: |
| df["invalid_period_benefit_flag"] = 0 |
|
|
| |
| df["group_benefit_mismatch_flag"] = ( |
| (df["scenario_not_covered_flag"] == 1) | (df["invalid_period_benefit_flag"] == 1) |
| ).astype(int) |
|
|
| |
| if _has_columns(df, deps["annual_fill_count"]): |
| df["claim_year"] = df["fill_date"].dt.year |
| group_key = ["patient_id", "claim_year"] |
| if "card_id" in df.columns: |
| group_key.append("card_id") |
| df["annual_fill_count"] = df.groupby(group_key)["claim_id"].transform("count") |
| else: |
| df["annual_fill_count"] = 1 |
|
|
| |
| if _has_columns(df, deps["annual_days_supply_count"]): |
| group_key = ["patient_id", "claim_year"] |
| if "card_id" in df.columns: |
| group_key.append("card_id") |
| df["annual_days_supply_count"] = df.groupby(group_key)["days_supply"].transform("sum") |
| else: |
| df["annual_days_supply_count"] = df.get("days_supply", pd.Series(30, index=df.index)) |
|
|
| |
| max_fills = PRODUCT_CONFIG["annual_max_fills_per_patient"] |
| df["annual_fill_limit_exceeded"] = (df["annual_fill_count"] > max_fills).astype(int) |
|
|
| |
| max_ds = PRODUCT_CONFIG["annual_max_days_supply_per_patient"] |
| df["annual_days_supply_limit_exceeded"] = (df["annual_days_supply_count"] > max_ds).astype(int) |
|
|
| |
| if _has_columns(df, deps["non_covered_ndc_flag"]): |
| df["non_covered_ndc_flag"] = (~df["drug_ndc"].isin(COVERED_NDCS)).astype(int) |
| else: |
| df["non_covered_ndc_flag"] = 0 |
|
|
| |
| if _has_columns(df, deps["govt_claim_with_benefit_flag"]): |
| df["govt_claim_with_benefit_flag"] = ( |
| (df["insurance_type"] == "Government") & (df["benefit_amount"] > 0) |
| ).astype(int) |
| else: |
| df["govt_claim_with_benefit_flag"] = 0 |
|
|
| |
| if _has_columns(df, deps["quantity_out_of_range_flag"]): |
| qmin = PRODUCT_CONFIG["quantity_min"] |
| qmax = PRODUCT_CONFIG["quantity_max"] |
| df["quantity_out_of_range_flag"] = ( |
| (df["quantity"] < qmin) | (df["quantity"] > qmax) |
| ).astype(int) |
| else: |
| df["quantity_out_of_range_flag"] = 0 |
|
|
| |
| if _has_columns(df, deps["days_supply_out_of_range_flag"]): |
| dsmin = PRODUCT_CONFIG["days_supply_min"] |
| dsmax = PRODUCT_CONFIG["days_supply_max"] |
| df["days_supply_out_of_range_flag"] = ( |
| (df["days_supply"] < dsmin) | (df["days_supply"] > dsmax) |
| ).astype(int) |
| else: |
| df["days_supply_out_of_range_flag"] = 0 |
|
|
| |
| if _has_columns(df, deps["max_benefit_repeat_flag"]): |
| def get_cap(gid, scenario, tier): |
| if pd.isna(gid): |
| return np.nan |
| g = str(gid).strip().upper() |
| if g not in GROUP_BENEFIT_CONFIG: |
| return np.nan |
| for (sched_scenario, start_str, end_str), tier_map in GROUP_BENEFIT_CONFIG[g]["benefit_schedule"].items(): |
| if sched_scenario == scenario: |
| return tier_map.get(tier, np.nan) |
| return np.nan |
|
|
| df["_tmp_cap"] = df.apply( |
| lambda row: get_cap(row.get("group_id"), row.get("claim_scenario_derived"), row.get("days_supply_tier")), |
| axis=1, |
| ) |
| |
| df["_at_cap"] = (df["benefit_amount"] >= (df["_tmp_cap"] - 1)).astype(int) |
| cap_counts = df.groupby(["patient_id", "pharmacy_npi"])["_at_cap"].transform("sum") |
| df["max_benefit_repeat_flag"] = (cap_counts >= 3).astype(int) |
| df.drop(columns=["_tmp_cap", "_at_cap"], inplace=True, errors="ignore") |
| else: |
| df["max_benefit_repeat_flag"] = 0 |
|
|
| logger.info(f"Group-aware features added: cap_utilization_ratio, excess_payment, scenario_not_covered, annual limits, NDC coverage, max_benefit_repeat") |
| return df |
|
|
|
|
| |
| |
| |
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Run the full feature-engineering pipeline over a claims frame.

    Step order matters: temporal gaps feed the behavioral aggregates, which
    in turn feed the pharmacy/prescriber one-and-done features.
    """
    logger.info(f"Feature engineering on {len(df):,} claims...")
    pipeline = (
        add_temporal_features,
        add_rolling_window_features,
        add_patient_behavioral_features,
        add_pharmacy_features,
        add_prescriber_features,
        add_reject_code_features,
        add_plan_switching_features,
        add_submission_features,
        add_daw_features,
        add_linked_claim_features,
        add_drug_specific_features,
        add_group_aware_features,
    )
    for step in pipeline:
        df = step(df)
    logger.info(f"Feature engineering complete: {len(df.columns)} total columns")
    return df
|
|
|
|
| |
| |
| |
def scale_and_encode(df: pd.DataFrame) -> tuple:
    """
    Scale numerics + encode categoricals. Only uses columns that actually exist.

    Side effect: for every categorical column that gets encoded, an
    ``_orig_<col>`` string copy is added to *df* in place so the raw values
    remain recoverable after encoding.

    Returns (df_processed, scaler, encoder, feature_names).
    """
    # Candidate categorical columns; only those present in df are encoded.
    cat_candidates = [
        "claim_type", "refill_indicator", "rx_origin_code", "daw_code",
        "other_coverage", "mail_order_indicator", "pharmacy_subcategory",
        "prescriber_specialty", "primary_payer_bin", "primary_payer_pcn",
        "primary_plan_id", "primary_plan_name", "primary_model_type",
        "submission_method", "submission_type", "payment_method",
        "reject_code", "reject_type", "linked_claim_type",
        "group_number", "group_name", "pharmacy_state", "patient_state",
        "drug_name", "drug_form", "drug_strength", "insurance_type",
        "hcp_suspicious_specialty",
        "pharmacy_active_flag", "pharmacy_high_benefit_flag",
        "hcp_high_benefit_flag",
        "risk_tier",
    ]
    cat_present = [c for c in cat_candidates if c in df.columns]

    # Candidate numeric columns; only those present in df are scaled.
    num_candidates = [

        "days_supply", "quantity", "copay_before", "copay_after",
        "benefit_amount", "usual_customary", "dispensing_fee", "sales_tax",
        "remaining_balance", "number_of_benefits", "total_copay",

        "days_between_fills", "days_since_first_fill", "claim_month",
        "claim_dow", "claim_quarter", "rx_lag_days",

        "patient_fill_count_7d", "patient_fill_count_30d", "patient_fill_count_90d",
        "patient_copay_spend_7d", "patient_copay_spend_30d", "patient_copay_spend_90d",
        "patient_total_claim_7d", "patient_total_claim_30d", "patient_total_claim_90d",
        "patient_benefit_7d", "patient_benefit_30d", "patient_benefit_90d",

        "pharmacy_claim_count_30d", "pharmacy_claim_count_90d",
        "prescriber_claim_count_30d", "prescriber_claim_count_90d",

        "unique_pharmacies_overall", "unique_programs_per_patient",
        "unique_prescribers_per_patient", "total_fills_per_patient",
        "avg_days_between_fills", "std_days_between_fills", "max_fills_any_30d",
        "copay_deviation_from_patient_mean", "total_benefit_per_patient",
        "patient_reject_count", "patient_reject_rate", "patient_highrisk_reject_count",
        "patient_maximizer_count", "unique_plans_per_patient", "unique_bins_per_patient",
        "patient_paper_submission_rate", "patient_daw_brand_count",
        "patient_adjusted_count", "patient_linked_count",
        "patient_ndc_count", "copay_zscore", "total_claim_zscore",
        "uc_zscore", "benefit_ratio",

        "patient_active_duration", "patient_avg_gap",
        "patient_one_and_done", "patient_short_gap_pct", "patient_long_gap_pct",

        "hcp_total_benefit", "hcp_total_claims", "hcp_unique_patients",
        "hcp_avg_benefit_per_claim", "hcp_avg_benefit_per_patient",
        "hcp_unique_pharmacies", "hcp_one_and_done_pct",
        "hcp_patient_concentration", "hcp_max_benefit_per_patient",
        "hcp_std_benefit", "hcp_patient_share",

        "pharmacy_unique_patients", "pharmacy_total_claims",
        "pharmacy_claims_per_patient_ratio", "pharmacy_avg_copay",
        "pharmacy_mail_order_pct", "pharmacy_reject_rate",
        "pharmacy_paper_submission_rate", "pharmacy_unique_hcps",
        "pharmacy_hcp_concentration", "pharmacy_one_and_done_pct",
        "pharmacy_total_benefit", "pharmacy_avg_benefit",
        "pharmacy_std_benefit", "pharmacy_max_benefit_per_patient",
        "pharmacy_std_copay", "pharmacy_max_claim_per_patient",
        "pharmacy_fraud_risk_score",

        "transaction_benefit_score", "transaction_oop_score",

        "cap_utilization_ratio", "excess_payment_amount",
        "annual_fill_count", "annual_days_supply_count",
    ]
    num_present = [c for c in num_candidates if c in df.columns]

    logger.info(f"Encoding {len(cat_present)} categorical, scaling {len(num_present)} numeric features")

    # Preserve the raw categorical values (as strings) before encoding.
    for c in cat_present:
        df[f"_orig_{c}"] = df[c].astype(str)

    # Encoder/scaler are always constructed (even when unused) so the
    # returned tuple shape is stable for callers.
    encoder = OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1)
    if cat_present:
        # fillna first (NaN -> "MISSING"), then cast everything to str.
        # BUG FIX: without the astype(str), a numeric categorical column
        # (e.g. an int-coded daw_code) containing NaN becomes a mixed
        # str/number object column and OrdinalEncoder.fit raises TypeError
        # while sorting its categories.
        cat_values = df[cat_present].fillna("MISSING").astype(str)
        df_cat = pd.DataFrame(
            encoder.fit_transform(cat_values),
            columns=cat_present, index=df.index
        )
    else:
        df_cat = pd.DataFrame(index=df.index)

    scaler = StandardScaler()
    if num_present:
        # Missing numerics are treated as 0 before standardization.
        df_num = pd.DataFrame(
            scaler.fit_transform(df[num_present].fillna(0)),
            columns=num_present, index=df.index
        )
    else:
        df_num = pd.DataFrame(index=df.index)

    df_processed = pd.concat([df_num, df_cat], axis=1)
    feature_names = num_present + cat_present
    return df_processed, scaler, encoder, feature_names
|
|
|
|
if __name__ == "__main__":
    import argparse

    # CLI entry point: read ingested claims, build features, write parquet.
    cli = argparse.ArgumentParser(description="Feature engineering v2")
    cli.add_argument("--input", default="data/ingested.parquet", help="Ingested parquet")
    cli.add_argument("--output", default="data/features.parquet", help="Output parquet")
    opts = cli.parse_args()

    claims = pd.read_parquet(opts.input)
    claims = engineer_features(claims)
    claims.to_parquet(opts.output, index=False)
    logger.info(f"Saved features to {opts.output}")
|
|