""" feature_engineering_v2.py — Generate 60+ features from ingested GSK copay data. KEY CAPABILITY: Graceful degradation. Every feature group checks whether its source columns exist before computing. Missing columns → feature is skipped (filled with 0) and a warning is logged. Uses FEATURE_DEPENDENCIES from config.py to determine what each feature needs. HIERARCHICAL FEATURES (v3) ========================== Organized into 4 analytical levels: 1. Transaction Level — per-claim anomalous behavior (gap, qty, DOS, benefit, OOP, NDC) 2. Patient Level — patient behavioral anomalies (one-&-done, avg gap, active duration) 3. HCP Level — prescriber-driven fraud indicators (specialty, avg benefit, concentration) 4. Pharmacy Level — pharmacy-centric fraud patterns (active flag, one-&-done, HCP conc, avg benefit) """ import logging import numpy as np import pandas as pd from sklearn.preprocessing import StandardScaler, OrdinalEncoder from config import PRODUCT_CONFIG, FEATURE_DEPENDENCIES, GROUP_BENEFIT_CONFIG, COVERED_NDCS, CLAIM_SCENARIO_MAP logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") logger = logging.getLogger(__name__) def _has_columns(df, cols): """Check if all columns in `cols` exist in df.""" return all(c in df.columns for c in cols) def _safe_groupby_transform(df, group_col, value_col, transform, feature_name): """Safely perform a groupby transform; if it fails, return zeros.""" try: return df.groupby(group_col)[value_col].transform(transform) except Exception as e: logger.warning(f"Feature '{feature_name}' groupby failed: {e}. 
Filling with 0.") return pd.Series(0, index=df.index) def _safe_rolling(df, group_col, date_col, value_col, window_days, agg, feature_name): """Safely compute a time-based rolling window per group.""" try: result = ( df.groupby(group_col) .rolling(f"{window_days}D", on=date_col, min_periods=1)[value_col] .agg(agg) .reset_index(level=0, drop=True) ) return result.fillna(0).values except Exception as e: logger.warning(f"Feature '{feature_name}' rolling window failed: {e}. Filling with 0.") return np.zeros(len(df)) # ═══════════════════════════════════════════════════════════════════════════════ # 4.1 Temporal Features (Transaction Level) # ═══════════════════════════════════════════════════════════════════════════════ def add_temporal_features(df: pd.DataFrame) -> pd.DataFrame: deps = FEATURE_DEPENDENCIES # days_between_fills if _has_columns(df, deps["days_between_fills"]): df = df.sort_values(["patient_id", "fill_date"]).copy() df["days_between_fills"] = df.groupby("patient_id")["fill_date"].diff().dt.days df["days_between_fills"] = df["days_between_fills"].fillna(0) else: logger.warning("Skipping 'days_between_fills' — missing patient_id or fill_date") df["days_between_fills"] = 0 # early_refill_flag if "days_between_fills" in df.columns: threshold = PRODUCT_CONFIG["early_refill_threshold_days"] df["early_refill_flag"] = (df["days_between_fills"] < threshold).astype(int).fillna(0) else: df["early_refill_flag"] = 0 # days_since_first_fill if _has_columns(df, deps["days_since_first_fill"]): first_fill = df.groupby("patient_id")["fill_date"].transform("min") df["days_since_first_fill"] = (df["fill_date"] - first_fill).dt.days.fillna(0) else: df["days_since_first_fill"] = 0 # Calendar features if "fill_date" in df.columns: df["claim_month"] = df["fill_date"].dt.month.fillna(0) df["claim_dow"] = df["fill_date"].dt.dayofweek.fillna(0) df["claim_quarter"] = df["fill_date"].dt.quarter.fillna(0) else: df["claim_month"] = 0 df["claim_dow"] = 0 df["claim_quarter"] = 0 # 
rx_lag_days if _has_columns(df, deps["rx_lag_days"]): df["rx_lag_days"] = (df["fill_date"] - df["date_written"]).dt.days.fillna(0) else: df["rx_lag_days"] = 0 return df # ═══════════════════════════════════════════════════════════════════════════════ # 4.2 Rolling Windows (Patient Level) # ═══════════════════════════════════════════════════════════════════════════════ def add_rolling_window_features(df: pd.DataFrame) -> pd.DataFrame: df = df.sort_values("fill_date").copy() deps = FEATURE_DEPENDENCIES # Patient-level rolling counts for window in [7, 30, 90]: col = f"patient_fill_count_{window}d" if _has_columns(df, deps[col]): df[col] = _safe_rolling(df, "patient_id", "fill_date", "claim_id", window, "count", col) else: logger.warning(f"Skipping '{col}' — missing patient_id or fill_date") df[col] = 0 # Patient-level copay spend for window in [7, 30, 90]: col = f"patient_copay_spend_{window}d" if _has_columns(df, ["patient_id", "fill_date", "copay_after"]): df[col] = _safe_rolling(df, "patient_id", "fill_date", "copay_after", window, "sum", col) else: df[col] = 0 # Patient-level total claim amount (usual_customary proxy) for window in [7, 30, 90]: col = f"patient_total_claim_{window}d" if _has_columns(df, ["patient_id", "fill_date", "usual_customary"]): df[col] = _safe_rolling(df, "patient_id", "fill_date", "usual_customary", window, "sum", col) else: df[col] = 0 # Patient-level benefit amount for window in [7, 30, 90]: col = f"patient_benefit_{window}d" if _has_columns(df, ["patient_id", "fill_date", "benefit_amount"]): df[col] = _safe_rolling(df, "patient_id", "fill_date", "benefit_amount", window, "sum", col) else: df[col] = 0 # Pharmacy-level rolling counts for window in [30, 90]: col = f"pharmacy_claim_count_{window}d" if _has_columns(df, ["pharmacy_npi", "fill_date"]): try: counts = df.groupby(["pharmacy_npi", pd.Grouper(key="fill_date", freq=f"{window}D")]).size().reset_index(name="c") df = df.merge(counts, on=["pharmacy_npi", "fill_date"], how="left") df[col] 
= df["c"].fillna(0) df.drop(columns=["c"], inplace=True, errors="ignore") except Exception as e: logger.warning(f"'{col}' pharmacy rolling failed: {e}") df[col] = 0 else: df[col] = 0 # Prescriber-level rolling counts for window in [30, 90]: col = f"prescriber_claim_count_{window}d" if _has_columns(df, ["prescriber_npi", "fill_date"]): try: counts = df.groupby(["prescriber_npi", pd.Grouper(key="fill_date", freq=f"{window}D")]).size().reset_index(name="c") df = df.merge(counts, on=["prescriber_npi", "fill_date"], how="left") df[col] = df["c"].fillna(0) df.drop(columns=["c"], inplace=True, errors="ignore") except Exception as e: logger.warning(f"'{col}' prescriber rolling failed: {e}") df[col] = 0 else: df[col] = 0 return df # ═══════════════════════════════════════════════════════════════════════════════ # 4.3 Patient Behavioral Features (Patient Level) # ═══════════════════════════════════════════════════════════════════════════════ def add_patient_behavioral_features(df: pd.DataFrame) -> pd.DataFrame: deps = FEATURE_DEPENDENCIES # unique_pharmacies_overall if _has_columns(df, deps["unique_pharmacies_overall"]): df["unique_pharmacies_overall"] = df.groupby("patient_id")["pharmacy_npi"].transform("nunique") else: df["unique_pharmacies_overall"] = 1 # unique_programs_per_patient if _has_columns(df, deps["unique_programs_per_patient"]): df["unique_programs_per_patient"] = df.groupby("patient_id")["program_id"].transform("nunique") else: df["unique_programs_per_patient"] = 1 # unique_prescribers_per_patient if _has_columns(df, deps["unique_prescribers_per_patient"]): df["unique_prescribers_per_patient"] = df.groupby("patient_id")["prescriber_npi"].transform("nunique") else: df["unique_prescribers_per_patient"] = 1 # total_fills_per_patient if _has_columns(df, deps["total_fills_per_patient"]): df["total_fills_per_patient"] = df.groupby("patient_id")["claim_id"].transform("count") else: df["total_fills_per_patient"] = 1 # avg / std days_between_fills if _has_columns(df, 
deps["avg_days_between_fills"]): avg_days = df.groupby("patient_id")["days_between_fills"].transform("mean") std_days = df.groupby("patient_id")["days_between_fills"].transform("std") df["avg_days_between_fills"] = avg_days.fillna(0) df["std_days_between_fills"] = std_days.fillna(0) else: df["avg_days_between_fills"] = 0 df["std_days_between_fills"] = 0 # max_fills_any_30d if _has_columns(df, deps["max_fills_any_30d"]): df["max_fills_any_30d"] = df.groupby("patient_id")["patient_fill_count_30d"].transform("max") else: df["max_fills_any_30d"] = 0 # copay_deviation_from_patient_mean if _has_columns(df, deps["copay_deviation_from_patient_mean"]): mean = df.groupby("patient_id")["copay_after"].transform("mean") df["copay_deviation_from_patient_mean"] = (df["copay_after"] - mean).abs() else: df["copay_deviation_from_patient_mean"] = 0 # total_benefit_per_patient if _has_columns(df, deps["total_benefit_per_patient"]): df["total_benefit_per_patient"] = df.groupby("patient_id")["benefit_amount"].transform("sum") else: df["total_benefit_per_patient"] = 0 # ── Patient Level v3 ── # patient_active_duration: days from first fill to last fill per patient if _has_columns(df, deps["patient_active_duration"]): first = df.groupby("patient_id")["fill_date"].transform("min") last = df.groupby("patient_id")["fill_date"].transform("max") df["patient_active_duration"] = (last - first).dt.days.fillna(0) else: df["patient_active_duration"] = 0 # patient_avg_gap: average gap between fills per patient if _has_columns(df, deps["patient_avg_gap"]): df["patient_avg_gap"] = df.groupby("patient_id")["days_between_fills"].transform("mean").fillna(0) else: df["patient_avg_gap"] = 0 # patient_one_and_done: patient has exactly 1 fill total if _has_columns(df, deps["patient_one_and_done"]): df["patient_one_and_done"] = (df["total_fills_per_patient"] == 1).astype(int) else: df["patient_one_and_done"] = 0 # patient_short_gap_pct: % of fills with gap < threshold short_thresh = 
PRODUCT_CONFIG["patient_gap_short_threshold"] if _has_columns(df, deps["patient_short_gap_pct"]): df["patient_short_gap_pct"] = ( df.groupby("patient_id")["days_between_fills"] .transform(lambda x: (x < short_thresh).mean()) .fillna(0) ) else: df["patient_short_gap_pct"] = 0 # patient_long_gap_pct: % of fills with gap > threshold long_thresh = PRODUCT_CONFIG["patient_gap_long_threshold"] if _has_columns(df, deps["patient_long_gap_pct"]): df["patient_long_gap_pct"] = ( df.groupby("patient_id")["days_between_fills"] .transform(lambda x: (x > long_thresh).mean()) .fillna(0) ) else: df["patient_long_gap_pct"] = 0 return df # ═══════════════════════════════════════════════════════════════════════════════ # 4.4 Pharmacy Features (Pharmacy Level) # ═══════════════════════════════════════════════════════════════════════════════ def add_pharmacy_features(df: pd.DataFrame) -> pd.DataFrame: deps = FEATURE_DEPENDENCIES if "pharmacy_npi" not in df.columns: logger.warning("Skipping all pharmacy features — pharmacy_npi missing") for c in [ "pharmacy_unique_patients", "pharmacy_total_claims", "pharmacy_claims_per_patient_ratio", "pharmacy_avg_copay", "pharmacy_mail_order_pct", "pharmacy_reject_rate", "pharmacy_paper_submission_rate", "pharmacy_unique_hcps", "pharmacy_hcp_concentration", "pharmacy_one_and_done_pct", "pharmacy_total_benefit", "pharmacy_avg_benefit", "pharmacy_high_benefit_flag", "pharmacy_active_flag", "pharmacy_std_benefit", "pharmacy_max_benefit_per_patient", "pharmacy_std_copay", "pharmacy_max_claim_per_patient", "pharmacy_fraud_risk_score", ]: df[c] = 0 return df df["pharmacy_unique_patients"] = df.groupby("pharmacy_npi")["patient_id"].transform("nunique") df["pharmacy_total_claims"] = df.groupby("pharmacy_npi")["claim_id"].transform("count") df["pharmacy_claims_per_patient_ratio"] = df["pharmacy_total_claims"] / df["pharmacy_unique_patients"].replace(0, 1) if "copay_after" in df.columns: df["pharmacy_avg_copay"] = 
df.groupby("pharmacy_npi")["copay_after"].transform("mean") df["pharmacy_std_copay"] = df.groupby("pharmacy_npi")["copay_after"].transform("std").fillna(0) else: df["pharmacy_avg_copay"] = 0 df["pharmacy_std_copay"] = 0 if "mail_order" in df.columns: df["pharmacy_mail_order_pct"] = df.groupby("pharmacy_npi")["mail_order"].transform("mean") else: df["pharmacy_mail_order_pct"] = 0 if "has_reject_code" in df.columns: df["pharmacy_reject_rate"] = df.groupby("pharmacy_npi")["has_reject_code"].transform("mean") else: df["pharmacy_reject_rate"] = 0 if "paper_submission" in df.columns: df["pharmacy_paper_submission_rate"] = df.groupby("pharmacy_npi")["paper_submission"].transform("mean") else: df["pharmacy_paper_submission_rate"] = 0 # Pharmacy v3 features if "prescriber_npi" in df.columns: df["pharmacy_unique_hcps"] = df.groupby("pharmacy_npi")["prescriber_npi"].transform("nunique") # HCP concentration: % of claims from top HCP hcp_conc = df.groupby(["pharmacy_npi", "prescriber_npi"]).size().groupby(level=0).max() total_claims = df.groupby("pharmacy_npi")["claim_id"].count() hcp_conc_ratio = (hcp_conc / total_claims).fillna(0) df["pharmacy_hcp_concentration"] = df["pharmacy_npi"].map(hcp_conc_ratio).fillna(0) else: df["pharmacy_unique_hcps"] = 1 df["pharmacy_hcp_concentration"] = 0 if "patient_one_and_done" in df.columns: df["pharmacy_one_and_done_pct"] = df.groupby("pharmacy_npi")["patient_one_and_done"].transform("mean") else: df["pharmacy_one_and_done_pct"] = 0 if "benefit_amount" in df.columns: df["pharmacy_total_benefit"] = df.groupby("pharmacy_npi")["benefit_amount"].transform("sum") df["pharmacy_avg_benefit"] = df.groupby("pharmacy_npi")["benefit_amount"].transform("mean") df["pharmacy_std_benefit"] = df.groupby("pharmacy_npi")["benefit_amount"].transform("std").fillna(0) # max benefit per patient at this pharmacy max_ben = df.groupby(["pharmacy_npi", "patient_id"])["benefit_amount"].sum().groupby(level=0).max() df["pharmacy_max_benefit_per_patient"] = 
df["pharmacy_npi"].map(max_ben).fillna(0) else: df["pharmacy_total_benefit"] = 0 df["pharmacy_avg_benefit"] = 0 df["pharmacy_std_benefit"] = 0 df["pharmacy_max_benefit_per_patient"] = 0 # pharmacy_high_benefit_flag hbt = PRODUCT_CONFIG["pharmacy_high_benefit_threshold"] df["pharmacy_high_benefit_flag"] = (df["pharmacy_avg_benefit"] > hbt).astype(int) # pharmacy_active_flag: from subcategory or default 1 if "pharmacy_subcategory" in df.columns: sub = df["pharmacy_subcategory"].fillna("").str.lower() df["pharmacy_active_flag"] = ( sub.str.contains("active|retail|chain|independent|mail").astype(int) ) else: df["pharmacy_active_flag"] = 1 # default active # pharmacy_max_claim_per_patient max_claim = df.groupby(["pharmacy_npi", "patient_id"]).size().groupby(level=0).max() df["pharmacy_max_claim_per_patient"] = df["pharmacy_npi"].map(max_claim).fillna(0) # pharmacy_fraud_risk_score: composite reject_rate = df["pharmacy_reject_rate"].fillna(0) paper_rate = df["pharmacy_paper_submission_rate"].fillna(0) hcp_conc = df["pharmacy_hcp_concentration"].fillna(0) one_done = df["pharmacy_one_and_done_pct"].fillna(0) df["pharmacy_fraud_risk_score"] = ( 0.25 * reject_rate + 0.20 * paper_rate + 0.30 * hcp_conc + 0.25 * one_done ) return df # ═══════════════════════════════════════════════════════════════════════════════ # 4.5 Prescriber / HCP Features (HCP Level) # ═══════════════════════════════════════════════════════════════════════════════ def add_prescriber_features(df: pd.DataFrame) -> pd.DataFrame: deps = FEATURE_DEPENDENCIES if "prescriber_npi" not in df.columns: logger.warning("Skipping prescriber features — prescriber_npi missing") for c in [ "prescriber_unique_patients", "prescriber_total_claims", "hcp_total_benefit", "hcp_total_claims", "hcp_unique_patients", "hcp_avg_benefit_per_claim", "hcp_avg_benefit_per_patient", "hcp_unique_pharmacies", "hcp_one_and_done_pct", "hcp_patient_concentration", "hcp_suspicious_specialty", "hcp_high_benefit_flag", 
"hcp_max_benefit_per_patient", "hcp_std_benefit", "hcp_patient_share", ]: df[c] = 0 df["prescriber_specialty_valid"] = 1 # default to valid if unknown return df df["prescriber_unique_patients"] = df.groupby("prescriber_npi")["patient_id"].transform("nunique") df["prescriber_total_claims"] = df.groupby("prescriber_npi")["claim_id"].transform("count") valid_specialties = set(s.lower() for s in PRODUCT_CONFIG["valid_prescriber_specialties"]) if "prescriber_specialty" in df.columns: df["prescriber_specialty_valid"] = ( df["prescriber_specialty"].fillna("").str.lower().isin(valid_specialties).astype(int) ) else: df["prescriber_specialty_valid"] = 1 # assume valid if unknown # ── HCP Level v3 Features ── if "benefit_amount" in df.columns: df["hcp_total_benefit"] = df.groupby("prescriber_npi")["benefit_amount"].transform("sum") df["hcp_avg_benefit_per_claim"] = df.groupby("prescriber_npi")["benefit_amount"].transform("mean") df["hcp_std_benefit"] = df.groupby("prescriber_npi")["benefit_amount"].transform("std").fillna(0) # avg benefit per patient = total benefit / unique patients total_ben = df.groupby("prescriber_npi")["benefit_amount"].transform("sum") uniq_pts = df.groupby("prescriber_npi")["patient_id"].transform("nunique") df["hcp_avg_benefit_per_patient"] = (total_ben / uniq_pts.replace(0, 1)).fillna(0) # max benefit concentrated on single patient max_ben = df.groupby(["prescriber_npi", "patient_id"])["benefit_amount"].sum().groupby(level=0).max() df["hcp_max_benefit_per_patient"] = df["prescriber_npi"].map(max_ben).fillna(0) else: df["hcp_total_benefit"] = 0 df["hcp_avg_benefit_per_claim"] = 0 df["hcp_std_benefit"] = 0 df["hcp_avg_benefit_per_patient"] = 0 df["hcp_max_benefit_per_patient"] = 0 df["hcp_total_claims"] = df.groupby("prescriber_npi")["claim_id"].transform("count") df["hcp_unique_patients"] = df.groupby("prescriber_npi")["patient_id"].transform("nunique") if "pharmacy_npi" in df.columns: df["hcp_unique_pharmacies"] = 
df.groupby("prescriber_npi")["pharmacy_npi"].transform("nunique") else: df["hcp_unique_pharmacies"] = 1 # hcp_one_and_done_pct: % of patients with exactly 1 fill from this HCP if "patient_one_and_done" in df.columns: df["hcp_one_and_done_pct"] = df.groupby("prescriber_npi")["patient_one_and_done"].transform("mean") else: df["hcp_one_and_done_pct"] = 0 # hcp_patient_concentration: % of claims from top patient top_patient = df.groupby(["prescriber_npi", "patient_id"]).size().groupby(level=0).max() total_claims = df.groupby("prescriber_npi")["claim_id"].count() conc = (top_patient / total_claims).fillna(0) df["hcp_patient_concentration"] = df["prescriber_npi"].map(conc).fillna(0) # hcp_patient_share: total claims per patient, averaged (proxy for volume) df["hcp_patient_share"] = df["hcp_total_claims"] / df["hcp_unique_patients"].replace(0, 1) # hcp_suspicious_specialty sus_spec = set(s.lower() for s in PRODUCT_CONFIG["suspicious_prescriber_specialties"]) if "prescriber_specialty" in df.columns: spec = df["prescriber_specialty"].fillna("").str.lower() df["hcp_suspicious_specialty"] = spec.isin(sus_spec).astype(int) else: df["hcp_suspicious_specialty"] = 0 # hcp_high_benefit_flag hbt = PRODUCT_CONFIG["hcp_high_benefit_threshold"] df["hcp_high_benefit_flag"] = (df["hcp_avg_benefit_per_patient"] > hbt).astype(int) return df # ═══════════════════════════════════════════════════════════════════════════════ # 4.6 Reject Code Features # ═══════════════════════════════════════════════════════════════════════════════ def add_reject_code_features(df: pd.DataFrame) -> pd.DataFrame: if "patient_id" not in df.columns: for c in ["patient_reject_count", "patient_reject_rate", "patient_highrisk_reject_count", "patient_maximizer_count"]: df[c] = 0 return df if "has_reject_code" in df.columns: df["patient_reject_count"] = df.groupby("patient_id")["has_reject_code"].transform("sum") df["patient_reject_rate"] = df.groupby("patient_id")["has_reject_code"].transform("mean") else: 
df["patient_reject_count"] = 0 df["patient_reject_rate"] = 0 if "high_risk_reject" in df.columns: df["patient_highrisk_reject_count"] = df.groupby("patient_id")["high_risk_reject"].transform("sum") else: df["patient_highrisk_reject_count"] = 0 if "maximizer_reject" in df.columns: df["patient_maximizer_count"] = df.groupby("patient_id")["maximizer_reject"].transform("sum") else: df["patient_maximizer_count"] = 0 return df # ═══════════════════════════════════════════════════════════════════════════════ # 4.7 Plan Switching Features # ═══════════════════════════════════════════════════════════════════════════════ def add_plan_switching_features(df: pd.DataFrame) -> pd.DataFrame: if "patient_id" not in df.columns: df["unique_plans_per_patient"] = 1 df["plan_switch_flag"] = 0 df["unique_bins_per_patient"] = 1 df["bin_switch_flag"] = 0 return df if "primary_plan_id" in df.columns: df["unique_plans_per_patient"] = df.groupby("patient_id")["primary_plan_id"].transform("nunique") else: df["unique_plans_per_patient"] = 1 df["plan_switch_flag"] = (df["unique_plans_per_patient"] > 1).astype(int) if "primary_payer_bin" in df.columns: df["unique_bins_per_patient"] = df.groupby("patient_id")["primary_payer_bin"].transform("nunique") else: df["unique_bins_per_patient"] = 1 df["bin_switch_flag"] = (df["unique_bins_per_patient"] > 1).astype(int) return df # ═══════════════════════════════════════════════════════════════════════════════ # 4.8 Submission Features # ═══════════════════════════════════════════════════════════════════════════════ def add_submission_features(df: pd.DataFrame) -> pd.DataFrame: if _has_columns(df, FEATURE_DEPENDENCIES["patient_paper_submission_rate"]): df["patient_paper_submission_rate"] = df.groupby("patient_id")["paper_submission"].transform("mean") else: df["patient_paper_submission_rate"] = 0 return df # ═══════════════════════════════════════════════════════════════════════════════ # 4.9 DAW Features # 
# ═══════════════════════════════════════════════════════════════════════════════

def add_daw_features(df: pd.DataFrame) -> pd.DataFrame:
    """Count DAW (dispense-as-written) brand-required claims per patient."""
    if _has_columns(df, FEATURE_DEPENDENCIES["patient_daw_brand_count"]):
        df["patient_daw_brand_count"] = df.groupby("patient_id")["daw_brand_required"].transform("sum")
    else:
        df["patient_daw_brand_count"] = 0
    return df


# ═══════════════════════════════════════════════════════════════════════════════
# 4.10 Linked Claim Features
# ═══════════════════════════════════════════════════════════════════════════════

def add_linked_claim_features(df: pd.DataFrame) -> pd.DataFrame:
    """Per-patient counts of adjusted and linked claims."""
    if "patient_id" not in df.columns:
        df["patient_adjusted_count"] = 0
        df["patient_linked_count"] = 0
        return df

    if "is_adjusted" in df.columns:
        df["patient_adjusted_count"] = df.groupby("patient_id")["is_adjusted"].transform("sum")
    else:
        df["patient_adjusted_count"] = 0

    if "has_linked_claim" in df.columns:
        df["patient_linked_count"] = df.groupby("patient_id")["has_linked_claim"].transform("sum")
    else:
        df["patient_linked_count"] = 0

    return df


# ═══════════════════════════════════════════════════════════════════════════════
# 4.11 Drug-Specific Features (Transaction Level)
# ═══════════════════════════════════════════════════════════════════════════════

def add_drug_specific_features(df: pd.DataFrame) -> pd.DataFrame:
    """Transaction-level drug/claim anomaly flags and per-patient z-scores."""
    deps = FEATURE_DEPENDENCIES

    # patient_ndc_count + switch flag.
    if _has_columns(df, deps["patient_ndc_count"]):
        df["patient_ndc_count"] = df.groupby("patient_id")["drug_ndc"].transform("nunique")
    else:
        df["patient_ndc_count"] = 1
    df["ndc_switch_flag"] = (df["patient_ndc_count"] > 1).astype(int)

    # govt_insurance_flag (empty-string default keeps the flag 0 when absent).
    df["govt_insurance_flag"] = (
        df.get("insurance_type", pd.Series("", index=df.index)) == "Government"
    ).astype(int)

    # quantity_anomaly: any deviation from the product's expected quantity.
    expected_qty = PRODUCT_CONFIG["quantity_expected"]
    if "quantity" in df.columns:
        df["quantity_anomaly"] = (df["quantity"] != expected_qty).astype(int)
    else:
        df["quantity_anomaly"] = 0

    # days_supply_anomaly
    expected_ds = PRODUCT_CONFIG["days_supply_expected"]
    if "days_supply" in df.columns:
        df["days_supply_anomaly"] = (df["days_supply"] != expected_ds).astype(int)
    else:
        df["days_supply_anomaly"] = 0

    # age_violation_flag
    if "patient_age" in df.columns:
        df["age_violation_flag"] = (df["patient_age"] < PRODUCT_CONFIG["underage_threshold"]).astype(int)
    else:
        df["age_violation_flag"] = 0

    # new_patient_burst: multiple fills within the first week of tenure.
    if _has_columns(df, deps["new_patient_burst"]):
        df["new_patient_burst"] = (
            (df["days_since_first_fill"] <= 7) & (df["total_fills_per_patient"] > 1)
        ).astype(int)
    else:
        df["new_patient_burst"] = 0

    # cross_state_fill
    if _has_columns(df, deps["cross_state_fill"]):
        df["cross_state_fill"] = (df["patient_state"] != df["pharmacy_state"]).astype(int)
    else:
        df["cross_state_fill"] = 0

    # Z-scores relative to the patient's own history. NOTE(review):
    # 'total_claim_zscore' and 'uc_zscore' are both sourced from
    # usual_customary and are therefore identical columns — presumably kept
    # for downstream name compatibility; confirm before consolidating.
    for col, zname in [
        ("copay_after", "copay_zscore"),
        ("usual_customary", "total_claim_zscore"),
        ("usual_customary", "uc_zscore"),
    ]:
        if col in df.columns and "patient_id" in df.columns:
            mean = df.groupby("patient_id")[col].transform("mean")
            std = df.groupby("patient_id")[col].transform("std").replace(0, np.nan)
            df[zname] = ((df[col] - mean) / std).fillna(0)
        else:
            df[zname] = 0

    # benefit_ratio: benefit as a share of the total claim amount.
    if _has_columns(df, deps["benefit_ratio"]):
        df["benefit_ratio"] = (df["benefit_amount"] / df["usual_customary"].replace(0, np.nan)).fillna(0)
    else:
        df["benefit_ratio"] = 0

    # Transaction Level: global z-scores for benefit and out-of-pocket.
    if "benefit_amount" in df.columns:
        ben_mean = df["benefit_amount"].mean()
        ben_std = df["benefit_amount"].std()
        if ben_std > 0:
            df["transaction_benefit_score"] = ((df["benefit_amount"] - ben_mean) / ben_std).fillna(0)
        else:
            df["transaction_benefit_score"] = 0
    else:
        df["transaction_benefit_score"] = 0

    if "copay_after" in df.columns:
        oop_mean = df["copay_after"].mean()
        oop_std = df["copay_after"].std()
        if oop_std > 0:
            df["transaction_oop_score"] = ((df["copay_after"] - oop_mean) / oop_std).fillna(0)
        else:
            df["transaction_oop_score"] = 0
    else:
        df["transaction_oop_score"] = 0

    return df


# ═══════════════════════════════════════════════════════════════════════════════
# 4.12 Group-Aware Features (v4 — Group 8141 vs Group 8200)
# ═══════════════════════════════════════════════════════════════════════════════

def get_days_supply_tier(days_supply):
    """Map days_supply to tier string for benefit lookup (NaN → lowest tier)."""
    if pd.isna(days_supply):
        return "1-30"
    ds = int(days_supply)
    if ds <= 30:
        return "1-30"
    elif ds <= 60:
        return "31-60"
    else:
        return "61-90"


def lookup_benefit_cap(group_id, scenario, fill_date, days_supply_tier):
    """Look up the max allowed benefit for a claim based on group config.

    Returns np.nan when the group or date cannot be resolved, and 0.0 when
    the scenario is explicitly not covered by the group.
    """
    if pd.isna(group_id) or str(group_id).strip().upper() == "UNKNOWN":
        return np.nan
    gid = str(group_id).strip().upper()
    if gid not in GROUP_BENEFIT_CONFIG:
        return np.nan
    cfg = GROUP_BENEFIT_CONFIG[gid]
    if scenario not in cfg["covered_scenarios"]:
        return 0.0  # non-covered scenario → cap is $0
    # Find matching date range in benefit_schedule.
    if fill_date is pd.NaT or pd.isna(fill_date):
        return np.nan
    for (sched_scenario, start_str, end_str), tier_map in cfg["benefit_schedule"].items():
        if sched_scenario != scenario:
            continue
        start_dt = pd.Timestamp(start_str)
        end_dt = pd.Timestamp(end_str)
        if start_dt <= fill_date <= end_dt:
            return tier_map.get(days_supply_tier, np.nan)
    return np.nan


def add_group_aware_features(df: pd.DataFrame) -> pd.DataFrame:
    """Generate features based on Group 8141 vs Group 8200 benefit rules."""
    deps = FEATURE_DEPENDENCIES

    # days_supply_tier
    if "days_supply" in df.columns:
        df["days_supply_tier"] = df["days_supply"].apply(get_days_supply_tier)
    else:
        df["days_supply_tier"] = "1-30"

    # allowed_benefit_cap
    if _has_columns(df, deps["allowed_benefit_cap"]):
        df["allowed_benefit_cap"] = df.apply(
            lambda row: lookup_benefit_cap(
                row.get("group_id", "UNKNOWN"),
                row.get("claim_scenario_derived", "commercial_approved"),
                row.get("fill_date", pd.NaT),
                row.get("days_supply_tier", "1-30"),
            ),
            axis=1,
        )
    else:
        logger.warning("Skipping 'allowed_benefit_cap' — missing group_id, claim_scenario, fill_date, or days_supply")
        df["allowed_benefit_cap"] = np.nan

    # cap_utilization_ratio = benefit_amount / allowed_benefit_cap
    if "benefit_amount" in df.columns and "allowed_benefit_cap" in df.columns:
        df["cap_utilization_ratio"] = (
            df["benefit_amount"] / df["allowed_benefit_cap"].replace(0, np.nan)
        ).fillna(0).clip(lower=0)
        # Cap at 10x to avoid extreme outliers
        df["cap_utilization_ratio"] = df["cap_utilization_ratio"].clip(upper=10)
    else:
        df["cap_utilization_ratio"] = 0

    # excess_payment_amount = max(0, benefit_amount - allowed_benefit_cap)
    if "benefit_amount" in df.columns and "allowed_benefit_cap" in df.columns:
        df["excess_payment_amount"] = (
            (df["benefit_amount"] - df["allowed_benefit_cap"]).clip(lower=0)
        ).fillna(0)
    else:
        df["excess_payment_amount"] = 0

    # scenario_not_covered_flag: claim scenario not covered by group.
    if _has_columns(df, deps["scenario_not_covered_flag"]):
        def is_scenario_not_covered(row):
            gid = str(row.get("group_id", "UNKNOWN")).strip().upper()
            scenario = row.get("claim_scenario_derived", "commercial_approved")
            if gid not in GROUP_BENEFIT_CONFIG:
                return 0  # unknown group → cannot flag
            return int(scenario in GROUP_BENEFIT_CONFIG[gid]["non_covered_scenarios"])
        df["scenario_not_covered_flag"] = df.apply(is_scenario_not_covered, axis=1)
    else:
        df["scenario_not_covered_flag"] = 0

    # invalid_period_benefit_flag: group+scenario+date combo has no valid
    # benefit schedule. NOTE(review): a NaT fill_date matches no period and is
    # therefore flagged invalid — confirm this is the intended treatment.
    if _has_columns(df, deps["invalid_period_benefit_flag"]):
        def is_invalid_period(row):
            gid = str(row.get("group_id", "UNKNOWN")).strip().upper()
            scenario = row.get("claim_scenario_derived", "commercial_approved")
            fill_date = row.get("fill_date", pd.NaT)
            if gid not in GROUP_BENEFIT_CONFIG:
                return 0
            if scenario not in GROUP_BENEFIT_CONFIG[gid]["covered_scenarios"]:
                return 0
            # Check if any schedule entry matches this scenario+date
            for (sched_scenario, start_str, end_str), _ in GROUP_BENEFIT_CONFIG[gid]["benefit_schedule"].items():
                if sched_scenario == scenario:
                    start_dt = pd.Timestamp(start_str)
                    end_dt = pd.Timestamp(end_str)
                    if start_dt <= fill_date <= end_dt:
                        return 0
            return 1  # no matching period found
        df["invalid_period_benefit_flag"] = df.apply(is_invalid_period, axis=1)
    else:
        df["invalid_period_benefit_flag"] = 0

    # group_benefit_mismatch_flag: scenario not covered OR invalid period.
    df["group_benefit_mismatch_flag"] = (
        (df["scenario_not_covered_flag"] == 1) | (df["invalid_period_benefit_flag"] == 1)
    ).astype(int)

    # annual_fill_count per patient per calendar year (per card when present).
    if _has_columns(df, deps["annual_fill_count"]):
        df["claim_year"] = df["fill_date"].dt.year
        group_key = ["patient_id", "claim_year"]
        if "card_id" in df.columns:
            group_key.append("card_id")
        df["annual_fill_count"] = df.groupby(group_key)["claim_id"].transform("count")
    else:
        df["annual_fill_count"] = 1

    # annual_days_supply_count per patient per calendar year.
    if _has_columns(df, deps["annual_days_supply_count"]):
        # FIX: claim_year was only created in the annual_fill_count branch
        # above; if those deps were missing while these are present, the
        # groupby below raised KeyError. Derive it here when absent.
        if "claim_year" not in df.columns:
            df["claim_year"] = df["fill_date"].dt.year
        group_key = ["patient_id", "claim_year"]
        if "card_id" in df.columns:
            group_key.append("card_id")
        df["annual_days_supply_count"] = df.groupby(group_key)["days_supply"].transform("sum")
    else:
        df["annual_days_supply_count"] = df.get("days_supply", pd.Series(30, index=df.index))

    # annual_fill_limit_exceeded
    max_fills = PRODUCT_CONFIG["annual_max_fills_per_patient"]
    df["annual_fill_limit_exceeded"] = (df["annual_fill_count"] > max_fills).astype(int)

    # annual_days_supply_limit_exceeded
    max_ds = PRODUCT_CONFIG["annual_max_days_supply_per_patient"]
    df["annual_days_supply_limit_exceeded"] = (df["annual_days_supply_count"] > max_ds).astype(int)

    # non_covered_ndc_flag
    if _has_columns(df, deps["non_covered_ndc_flag"]):
        df["non_covered_ndc_flag"] = (~df["drug_ndc"].isin(COVERED_NDCS)).astype(int)
    else:
        df["non_covered_ndc_flag"] = 0

    # govt_claim_with_benefit_flag: government insurance AND benefit paid.
    if _has_columns(df, deps["govt_claim_with_benefit_flag"]):
        df["govt_claim_with_benefit_flag"] = (
            (df["insurance_type"] == "Government") & (df["benefit_amount"] > 0)
        ).astype(int)
    else:
        df["govt_claim_with_benefit_flag"] = 0

    # quantity_out_of_range_flag
    if _has_columns(df, deps["quantity_out_of_range_flag"]):
        qmin = PRODUCT_CONFIG["quantity_min"]
        qmax = PRODUCT_CONFIG["quantity_max"]
        df["quantity_out_of_range_flag"] = (
            (df["quantity"] < qmin) | (df["quantity"] > qmax)
        ).astype(int)
    else:
        df["quantity_out_of_range_flag"] = 0

    # days_supply_out_of_range_flag
    if _has_columns(df, deps["days_supply_out_of_range_flag"]):
        dsmin = PRODUCT_CONFIG["days_supply_min"]
        dsmax = PRODUCT_CONFIG["days_supply_max"]
        df["days_supply_out_of_range_flag"] = (
            (df["days_supply"] < dsmin) | (df["days_supply"] > dsmax)
        ).astype(int)
    else:
        df["days_supply_out_of_range_flag"] = 0

    # max_benefit_repeat_flag: same patient+pharmacy hits the max cap >= 3 times.
    if _has_columns(df, deps["max_benefit_repeat_flag"]):
        def get_cap(gid, scenario, tier):
            # First schedule entry for the scenario, ignoring date ranges.
            if pd.isna(gid):
                return np.nan
            g = str(gid).strip().upper()
            if g not in GROUP_BENEFIT_CONFIG:
                return np.nan
            for (sched_scenario, start_str, end_str), tier_map in GROUP_BENEFIT_CONFIG[g]["benefit_schedule"].items():
                if sched_scenario == scenario:
                    return tier_map.get(tier, np.nan)
            return np.nan

        df["_tmp_cap"] = df.apply(
            lambda row: get_cap(row.get("group_id"), row.get("claim_scenario_derived"), row.get("days_supply_tier")),
            axis=1,
        )
        # Flag if benefit equals cap (within $1 tolerance) for same patient+pharmacy
        df["_at_cap"] = (df["benefit_amount"] >= (df["_tmp_cap"] - 1)).astype(int)
        cap_counts = df.groupby(["patient_id", "pharmacy_npi"])["_at_cap"].transform("sum")
        df["max_benefit_repeat_flag"] = (cap_counts >= 3).astype(int)
        df.drop(columns=["_tmp_cap", "_at_cap"], inplace=True, errors="ignore")
    else:
        df["max_benefit_repeat_flag"] = 0

    logger.info(f"Group-aware features added: cap_utilization_ratio, excess_payment, scenario_not_covered, annual limits, NDC coverage, max_benefit_repeat")
    return df
═══════════════════════════════════════════════════════════════════════════════ # MASTER FEATURE ENGINEERING # ═══════════════════════════════════════════════════════════════════════════════ def engineer_features(df: pd.DataFrame) -> pd.DataFrame: logger.info(f"Feature engineering on {len(df):,} claims...") df = add_temporal_features(df) df = add_rolling_window_features(df) df = add_patient_behavioral_features(df) df = add_pharmacy_features(df) df = add_prescriber_features(df) df = add_reject_code_features(df) df = add_plan_switching_features(df) df = add_submission_features(df) df = add_daw_features(df) df = add_linked_claim_features(df) df = add_drug_specific_features(df) df = add_group_aware_features(df) logger.info(f"Feature engineering complete: {len(df.columns)} total columns") return df # ═══════════════════════════════════════════════════════════════════════════════ # SCALING & ENCODING # ═══════════════════════════════════════════════════════════════════════════════ def scale_and_encode(df: pd.DataFrame) -> tuple: """ Scale numerics + encode categoricals. Only uses columns that actually exist. Returns (df_processed, scaler, encoder, feature_names). 
""" # Auto-detect categorical columns present cat_candidates = [ "claim_type", "refill_indicator", "rx_origin_code", "daw_code", "other_coverage", "mail_order_indicator", "pharmacy_subcategory", "prescriber_specialty", "primary_payer_bin", "primary_payer_pcn", "primary_plan_id", "primary_plan_name", "primary_model_type", "submission_method", "submission_type", "payment_method", "reject_code", "reject_type", "linked_claim_type", "group_number", "group_name", "pharmacy_state", "patient_state", "drug_name", "drug_form", "drug_strength", "insurance_type", "hcp_suspicious_specialty", "pharmacy_active_flag", "pharmacy_high_benefit_flag", "hcp_high_benefit_flag", "risk_tier", ] cat_present = [c for c in cat_candidates if c in df.columns] # Auto-detect numeric columns present num_candidates = [ # Core fields "days_supply", "quantity", "copay_before", "copay_after", "benefit_amount", "usual_customary", "dispensing_fee", "sales_tax", "remaining_balance", "number_of_benefits", "total_copay", # Temporal "days_between_fills", "days_since_first_fill", "claim_month", "claim_dow", "claim_quarter", "rx_lag_days", # Patient rolling windows "patient_fill_count_7d", "patient_fill_count_30d", "patient_fill_count_90d", "patient_copay_spend_7d", "patient_copay_spend_30d", "patient_copay_spend_90d", "patient_total_claim_7d", "patient_total_claim_30d", "patient_total_claim_90d", "patient_benefit_7d", "patient_benefit_30d", "patient_benefit_90d", # Pharmacy rolling "pharmacy_claim_count_30d", "pharmacy_claim_count_90d", "prescriber_claim_count_30d", "prescriber_claim_count_90d", # Patient behavioral "unique_pharmacies_overall", "unique_programs_per_patient", "unique_prescribers_per_patient", "total_fills_per_patient", "avg_days_between_fills", "std_days_between_fills", "max_fills_any_30d", "copay_deviation_from_patient_mean", "total_benefit_per_patient", "patient_reject_count", "patient_reject_rate", "patient_highrisk_reject_count", "patient_maximizer_count", "unique_plans_per_patient", 
"unique_bins_per_patient", "patient_paper_submission_rate", "patient_daw_brand_count", "patient_adjusted_count", "patient_linked_count", "patient_ndc_count", "copay_zscore", "total_claim_zscore", "uc_zscore", "benefit_ratio", # Patient level v3 "patient_active_duration", "patient_avg_gap", "patient_one_and_done", "patient_short_gap_pct", "patient_long_gap_pct", # HCP level v3 "hcp_total_benefit", "hcp_total_claims", "hcp_unique_patients", "hcp_avg_benefit_per_claim", "hcp_avg_benefit_per_patient", "hcp_unique_pharmacies", "hcp_one_and_done_pct", "hcp_patient_concentration", "hcp_max_benefit_per_patient", "hcp_std_benefit", "hcp_patient_share", # Pharmacy level v3 "pharmacy_unique_patients", "pharmacy_total_claims", "pharmacy_claims_per_patient_ratio", "pharmacy_avg_copay", "pharmacy_mail_order_pct", "pharmacy_reject_rate", "pharmacy_paper_submission_rate", "pharmacy_unique_hcps", "pharmacy_hcp_concentration", "pharmacy_one_and_done_pct", "pharmacy_total_benefit", "pharmacy_avg_benefit", "pharmacy_std_benefit", "pharmacy_max_benefit_per_patient", "pharmacy_std_copay", "pharmacy_max_claim_per_patient", "pharmacy_fraud_risk_score", # Transaction level v3 "transaction_benefit_score", "transaction_oop_score", # Group-aware v4 "cap_utilization_ratio", "excess_payment_amount", "annual_fill_count", "annual_days_supply_count", ] num_present = [c for c in num_candidates if c in df.columns] logger.info(f"Encoding {len(cat_present)} categorical, scaling {len(num_present)} numeric features") # Save original categorical values for rule evaluation for c in cat_present: df[f"_orig_{c}"] = df[c].astype(str) # Encode categoricals if cat_present: encoder = OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1) df_cat = pd.DataFrame( encoder.fit_transform(df[cat_present].fillna("MISSING")), columns=cat_present, index=df.index ) else: encoder = OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1) df_cat = pd.DataFrame(index=df.index) # Scale numerics if 
num_present: scaler = StandardScaler() df_num = pd.DataFrame( scaler.fit_transform(df[num_present].fillna(0)), columns=num_present, index=df.index ) else: scaler = StandardScaler() df_num = pd.DataFrame(index=df.index) df_processed = pd.concat([df_num, df_cat], axis=1) feature_names = num_present + cat_present return df_processed, scaler, encoder, feature_names if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Feature engineering v2") parser.add_argument("--input", default="data/ingested.parquet", help="Ingested parquet") parser.add_argument("--output", default="data/features.parquet", help="Output parquet") args = parser.parse_args() df = pd.read_parquet(args.input) df = engineer_features(df) df.to_parquet(args.output, index=False) logger.info(f"Saved features to {args.output}")