# gsk-copay-fraud-detection / feature_engineering_v2.py
# Source: uploaded by Harsh2396 via huggingface_hub (revision 4688357, verified).
"""
feature_engineering_v2.py β€” Generate 60+ features from ingested GSK copay data.
KEY CAPABILITY: Graceful degradation. Every feature group checks whether its
source columns exist before computing. Missing columns β†’ feature is skipped
(filled with 0) and a warning is logged.
Uses FEATURE_DEPENDENCIES from config.py to determine what each feature needs.
HIERARCHICAL FEATURES (v3)
==========================
Organized into 4 analytical levels:
1. Transaction Level β€” per-claim anomalous behavior (gap, qty, DOS, benefit, OOP, NDC)
2. Patient Level β€” patient behavioral anomalies (one-&-done, avg gap, active duration)
3. HCP Level β€” prescriber-driven fraud indicators (specialty, avg benefit, concentration)
4. Pharmacy Level β€” pharmacy-centric fraud patterns (active flag, one-&-done, HCP conc, avg benefit)
"""
import logging
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, OrdinalEncoder
from config import PRODUCT_CONFIG, FEATURE_DEPENDENCIES, GROUP_BENEFIT_CONFIG, COVERED_NDCS, CLAIM_SCENARIO_MAP
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)
def _has_columns(df, cols):
"""Check if all columns in `cols` exist in df."""
return all(c in df.columns for c in cols)
def _safe_groupby_transform(df, group_col, value_col, transform, feature_name):
"""Safely perform a groupby transform; if it fails, return zeros."""
try:
return df.groupby(group_col)[value_col].transform(transform)
except Exception as e:
logger.warning(f"Feature '{feature_name}' groupby failed: {e}. Filling with 0.")
return pd.Series(0, index=df.index)
def _safe_rolling(df, group_col, date_col, value_col, window_days, agg, feature_name):
"""Safely compute a time-based rolling window per group."""
try:
result = (
df.groupby(group_col)
.rolling(f"{window_days}D", on=date_col, min_periods=1)[value_col]
.agg(agg)
.reset_index(level=0, drop=True)
)
return result.fillna(0).values
except Exception as e:
logger.warning(f"Feature '{feature_name}' rolling window failed: {e}. Filling with 0.")
return np.zeros(len(df))
# ═══════════════════════════════════════════════════════════════════════════════
# 4.1 Temporal Features (Transaction Level)
# ═══════════════════════════════════════════════════════════════════════════════
def add_temporal_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add per-claim temporal features (fill gaps, calendar parts, rx lag).

    Expects 'fill_date' (and 'date_written' for rx_lag_days) to already be
    datetime columns. Any feature whose source columns are missing degrades
    to a constant 0 so the output schema stays stable.
    """
    deps = FEATURE_DEPENDENCIES
    # days_between_fills: gap to the previous fill for the same patient.
    if _has_columns(df, deps["days_between_fills"]):
        df = df.sort_values(["patient_id", "fill_date"]).copy()
        gaps = df.groupby("patient_id")["fill_date"].diff().dt.days
        df["days_between_fills"] = gaps.fillna(0)
        # early_refill_flag: refilled sooner than the configured threshold.
        # BUG FIX: each patient's first fill has no previous fill (gap is NaN,
        # stored as 0 above); the old code compared that 0 against the
        # threshold and flagged every first fill as an early refill (the
        # trailing .fillna(0) after .astype(int) was a no-op). Only genuine
        # gaps are flagged now; a same-day second fill (gap 0) still counts.
        threshold = PRODUCT_CONFIG["early_refill_threshold_days"]
        df["early_refill_flag"] = (gaps.notna() & (gaps < threshold)).astype(int)
    else:
        logger.warning("Skipping 'days_between_fills' β€” missing patient_id or fill_date")
        df["days_between_fills"] = 0
        # BUG FIX: previously the flag was still computed from the constant-0
        # column and came out 1 for every row; a missing source now yields 0.
        df["early_refill_flag"] = 0
    # days_since_first_fill: this claim's offset from the patient's first fill.
    if _has_columns(df, deps["days_since_first_fill"]):
        first_fill = df.groupby("patient_id")["fill_date"].transform("min")
        df["days_since_first_fill"] = (df["fill_date"] - first_fill).dt.days.fillna(0)
    else:
        df["days_since_first_fill"] = 0
    # Calendar features (month, day-of-week, quarter of the fill date).
    if "fill_date" in df.columns:
        df["claim_month"] = df["fill_date"].dt.month.fillna(0)
        df["claim_dow"] = df["fill_date"].dt.dayofweek.fillna(0)
        df["claim_quarter"] = df["fill_date"].dt.quarter.fillna(0)
    else:
        df["claim_month"] = 0
        df["claim_dow"] = 0
        df["claim_quarter"] = 0
    # rx_lag_days: days between the prescription being written and filled.
    if _has_columns(df, deps["rx_lag_days"]):
        df["rx_lag_days"] = (df["fill_date"] - df["date_written"]).dt.days.fillna(0)
    else:
        df["rx_lag_days"] = 0
    return df
# ═══════════════════════════════════════════════════════════════════════════════
# 4.2 Rolling Windows (Patient Level)
# ═══════════════════════════════════════════════════════════════════════════════
def add_rolling_window_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add trailing time-window activity features at the patient, pharmacy
    and prescriber level.

    Sorts by fill_date first so the time-based rolling windows are valid.
    Features whose source columns are missing degrade to constant 0.
    """
    df = df.sort_values("fill_date").copy()
    deps = FEATURE_DEPENDENCIES
    # Patient-level rolling fill counts over 7/30/90-day trailing windows.
    for window in [7, 30, 90]:
        col = f"patient_fill_count_{window}d"
        if _has_columns(df, deps[col]):
            df[col] = _safe_rolling(df, "patient_id", "fill_date", "claim_id", window, "count", col)
        else:
            logger.warning(f"Skipping '{col}' β€” missing patient_id or fill_date")
            df[col] = 0
    # Patient-level rolling sums over the money columns.
    for value_col, prefix in [
        ("copay_after", "patient_copay_spend"),
        ("usual_customary", "patient_total_claim"),
        ("benefit_amount", "patient_benefit"),
    ]:
        for window in [7, 30, 90]:
            col = f"{prefix}_{window}d"
            if _has_columns(df, ["patient_id", "fill_date", value_col]):
                df[col] = _safe_rolling(df, "patient_id", "fill_date", value_col, window, "sum", col)
            else:
                df[col] = 0
    # Pharmacy- and prescriber-level rolling claim counts.
    # BUG FIX: these were previously computed by bucketing fill_date with
    # pd.Grouper(freq=f"{window}D") and merging the bucket counts back on the
    # *raw* fill_date. Grouper bin labels only equal a raw fill date when a
    # claim lands exactly on a bin edge, so the merge produced NaN (filled to
    # 0) for nearly every row. Use the same trailing rolling-window count as
    # the patient features instead.
    for npi_col, prefix in [
        ("pharmacy_npi", "pharmacy_claim_count"),
        ("prescriber_npi", "prescriber_claim_count"),
    ]:
        for window in [30, 90]:
            col = f"{prefix}_{window}d"
            if _has_columns(df, [npi_col, "fill_date", "claim_id"]):
                df[col] = _safe_rolling(df, npi_col, "fill_date", "claim_id", window, "count", col)
            else:
                df[col] = 0
    return df
# ═══════════════════════════════════════════════════════════════════════════════
# 4.3 Patient Behavioral Features (Patient Level)
# ═══════════════════════════════════════════════════════════════════════════════
def add_patient_behavioral_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add patient-level behavioral features (entity fan-out, fill gaps,
    one-&-done, benefit totals).

    Depends on columns produced by earlier pipeline stages:
    'days_between_fills' (add_temporal_features) and 'patient_fill_count_30d'
    (add_rolling_window_features). Each feature degrades to a neutral
    constant (0, or 1 for count-like features) when its sources are missing.
    """
    deps = FEATURE_DEPENDENCIES
    # unique_pharmacies_overall: distinct pharmacies the patient ever used.
    if _has_columns(df, deps["unique_pharmacies_overall"]):
        df["unique_pharmacies_overall"] = df.groupby("patient_id")["pharmacy_npi"].transform("nunique")
    else:
        df["unique_pharmacies_overall"] = 1
    # unique_programs_per_patient: distinct copay programs per patient.
    if _has_columns(df, deps["unique_programs_per_patient"]):
        df["unique_programs_per_patient"] = df.groupby("patient_id")["program_id"].transform("nunique")
    else:
        df["unique_programs_per_patient"] = 1
    # unique_prescribers_per_patient: distinct prescribers per patient.
    if _has_columns(df, deps["unique_prescribers_per_patient"]):
        df["unique_prescribers_per_patient"] = df.groupby("patient_id")["prescriber_npi"].transform("nunique")
    else:
        df["unique_prescribers_per_patient"] = 1
    # total_fills_per_patient: total claim count per patient.
    if _has_columns(df, deps["total_fills_per_patient"]):
        df["total_fills_per_patient"] = df.groupby("patient_id")["claim_id"].transform("count")
    else:
        df["total_fills_per_patient"] = 1
    # avg / std days_between_fills (std is NaN for single-fill patients → 0).
    if _has_columns(df, deps["avg_days_between_fills"]):
        avg_days = df.groupby("patient_id")["days_between_fills"].transform("mean")
        std_days = df.groupby("patient_id")["days_between_fills"].transform("std")
        df["avg_days_between_fills"] = avg_days.fillna(0)
        df["std_days_between_fills"] = std_days.fillna(0)
    else:
        df["avg_days_between_fills"] = 0
        df["std_days_between_fills"] = 0
    # max_fills_any_30d: the patient's busiest trailing 30-day window.
    if _has_columns(df, deps["max_fills_any_30d"]):
        df["max_fills_any_30d"] = df.groupby("patient_id")["patient_fill_count_30d"].transform("max")
    else:
        df["max_fills_any_30d"] = 0
    # copay_deviation_from_patient_mean: per-claim absolute deviation from
    # the patient's own average copay.
    if _has_columns(df, deps["copay_deviation_from_patient_mean"]):
        mean = df.groupby("patient_id")["copay_after"].transform("mean")
        df["copay_deviation_from_patient_mean"] = (df["copay_after"] - mean).abs()
    else:
        df["copay_deviation_from_patient_mean"] = 0
    # total_benefit_per_patient: lifetime benefit paid out per patient.
    if _has_columns(df, deps["total_benefit_per_patient"]):
        df["total_benefit_per_patient"] = df.groupby("patient_id")["benefit_amount"].transform("sum")
    else:
        df["total_benefit_per_patient"] = 0
    # ── Patient Level v3 ──
    # patient_active_duration: days from first fill to last fill per patient.
    if _has_columns(df, deps["patient_active_duration"]):
        first = df.groupby("patient_id")["fill_date"].transform("min")
        last = df.groupby("patient_id")["fill_date"].transform("max")
        df["patient_active_duration"] = (last - first).dt.days.fillna(0)
    else:
        df["patient_active_duration"] = 0
    # patient_avg_gap: average gap between fills per patient.
    # NOTE(review): identical computation to avg_days_between_fills above;
    # kept as a separate v3 feature name.
    if _has_columns(df, deps["patient_avg_gap"]):
        df["patient_avg_gap"] = df.groupby("patient_id")["days_between_fills"].transform("mean").fillna(0)
    else:
        df["patient_avg_gap"] = 0
    # patient_one_and_done: patient has exactly 1 fill total.
    if _has_columns(df, deps["patient_one_and_done"]):
        df["patient_one_and_done"] = (df["total_fills_per_patient"] == 1).astype(int)
    else:
        df["patient_one_and_done"] = 0
    # patient_short_gap_pct: % of the patient's fills with gap < threshold.
    short_thresh = PRODUCT_CONFIG["patient_gap_short_threshold"]
    if _has_columns(df, deps["patient_short_gap_pct"]):
        df["patient_short_gap_pct"] = (
            df.groupby("patient_id")["days_between_fills"]
            .transform(lambda x: (x < short_thresh).mean())
            .fillna(0)
        )
    else:
        df["patient_short_gap_pct"] = 0
    # patient_long_gap_pct: % of the patient's fills with gap > threshold.
    long_thresh = PRODUCT_CONFIG["patient_gap_long_threshold"]
    if _has_columns(df, deps["patient_long_gap_pct"]):
        df["patient_long_gap_pct"] = (
            df.groupby("patient_id")["days_between_fills"]
            .transform(lambda x: (x > long_thresh).mean())
            .fillna(0)
        )
    else:
        df["patient_long_gap_pct"] = 0
    return df
# ═══════════════════════════════════════════════════════════════════════════════
# 4.4 Pharmacy Features (Pharmacy Level)
# ═══════════════════════════════════════════════════════════════════════════════
def add_pharmacy_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add pharmacy-level fraud-pattern features keyed on pharmacy_npi.

    If pharmacy_npi is absent, every pharmacy feature is emitted as the
    constant 0 so the downstream feature matrix keeps a stable schema.
    """
    deps = FEATURE_DEPENDENCIES  # NOTE(review): not used in this function
    if "pharmacy_npi" not in df.columns:
        logger.warning("Skipping all pharmacy features β€” pharmacy_npi missing")
        for c in [
            "pharmacy_unique_patients", "pharmacy_total_claims", "pharmacy_claims_per_patient_ratio",
            "pharmacy_avg_copay", "pharmacy_mail_order_pct", "pharmacy_reject_rate",
            "pharmacy_paper_submission_rate", "pharmacy_unique_hcps",
            "pharmacy_hcp_concentration", "pharmacy_one_and_done_pct",
            "pharmacy_total_benefit", "pharmacy_avg_benefit", "pharmacy_high_benefit_flag",
            "pharmacy_active_flag", "pharmacy_std_benefit", "pharmacy_max_benefit_per_patient",
            "pharmacy_std_copay", "pharmacy_max_claim_per_patient", "pharmacy_fraud_risk_score",
        ]:
            df[c] = 0
        return df
    # Volume: distinct patients, total claims, and claims-per-patient density.
    df["pharmacy_unique_patients"] = df.groupby("pharmacy_npi")["patient_id"].transform("nunique")
    df["pharmacy_total_claims"] = df.groupby("pharmacy_npi")["claim_id"].transform("count")
    df["pharmacy_claims_per_patient_ratio"] = df["pharmacy_total_claims"] / df["pharmacy_unique_patients"].replace(0, 1)
    # Copay level and spread per pharmacy (std is NaN for 1-claim groups → 0).
    if "copay_after" in df.columns:
        df["pharmacy_avg_copay"] = df.groupby("pharmacy_npi")["copay_after"].transform("mean")
        df["pharmacy_std_copay"] = df.groupby("pharmacy_npi")["copay_after"].transform("std").fillna(0)
    else:
        df["pharmacy_avg_copay"] = 0
        df["pharmacy_std_copay"] = 0
    # Share of mail-order claims at the pharmacy.
    if "mail_order" in df.columns:
        df["pharmacy_mail_order_pct"] = df.groupby("pharmacy_npi")["mail_order"].transform("mean")
    else:
        df["pharmacy_mail_order_pct"] = 0
    # Share of claims carrying a reject code.
    if "has_reject_code" in df.columns:
        df["pharmacy_reject_rate"] = df.groupby("pharmacy_npi")["has_reject_code"].transform("mean")
    else:
        df["pharmacy_reject_rate"] = 0
    # Share of paper (non-electronic) submissions.
    if "paper_submission" in df.columns:
        df["pharmacy_paper_submission_rate"] = df.groupby("pharmacy_npi")["paper_submission"].transform("mean")
    else:
        df["pharmacy_paper_submission_rate"] = 0
    # Pharmacy v3 features
    if "prescriber_npi" in df.columns:
        df["pharmacy_unique_hcps"] = df.groupby("pharmacy_npi")["prescriber_npi"].transform("nunique")
        # HCP concentration: share of the pharmacy's claims coming from its
        # single busiest prescriber (1.0 = all claims from one HCP).
        hcp_conc = df.groupby(["pharmacy_npi", "prescriber_npi"]).size().groupby(level=0).max()
        total_claims = df.groupby("pharmacy_npi")["claim_id"].count()
        hcp_conc_ratio = (hcp_conc / total_claims).fillna(0)
        df["pharmacy_hcp_concentration"] = df["pharmacy_npi"].map(hcp_conc_ratio).fillna(0)
    else:
        df["pharmacy_unique_hcps"] = 1
        df["pharmacy_hcp_concentration"] = 0
    # Share of claims at this pharmacy from one-&-done patients
    # (patient_one_and_done is set upstream by add_patient_behavioral_features).
    if "patient_one_and_done" in df.columns:
        df["pharmacy_one_and_done_pct"] = df.groupby("pharmacy_npi")["patient_one_and_done"].transform("mean")
    else:
        df["pharmacy_one_and_done_pct"] = 0
    # Benefit totals/spread per pharmacy.
    if "benefit_amount" in df.columns:
        df["pharmacy_total_benefit"] = df.groupby("pharmacy_npi")["benefit_amount"].transform("sum")
        df["pharmacy_avg_benefit"] = df.groupby("pharmacy_npi")["benefit_amount"].transform("mean")
        df["pharmacy_std_benefit"] = df.groupby("pharmacy_npi")["benefit_amount"].transform("std").fillna(0)
        # Largest total benefit any single patient collected at this pharmacy.
        max_ben = df.groupby(["pharmacy_npi", "patient_id"])["benefit_amount"].sum().groupby(level=0).max()
        df["pharmacy_max_benefit_per_patient"] = df["pharmacy_npi"].map(max_ben).fillna(0)
    else:
        df["pharmacy_total_benefit"] = 0
        df["pharmacy_avg_benefit"] = 0
        df["pharmacy_std_benefit"] = 0
        df["pharmacy_max_benefit_per_patient"] = 0
    # pharmacy_high_benefit_flag: average benefit above configured threshold.
    hbt = PRODUCT_CONFIG["pharmacy_high_benefit_threshold"]
    df["pharmacy_high_benefit_flag"] = (df["pharmacy_avg_benefit"] > hbt).astype(int)
    # pharmacy_active_flag: derived from the subcategory text, default 1.
    if "pharmacy_subcategory" in df.columns:
        sub = df["pharmacy_subcategory"].fillna("").str.lower()
        # Regex alternation: any of these substrings marks the pharmacy active.
        df["pharmacy_active_flag"] = (
            sub.str.contains("active|retail|chain|independent|mail").astype(int)
        )
    else:
        df["pharmacy_active_flag"] = 1  # default active when subcategory absent
    # pharmacy_max_claim_per_patient: largest claim count any single patient
    # generated at this pharmacy.
    max_claim = df.groupby(["pharmacy_npi", "patient_id"]).size().groupby(level=0).max()
    df["pharmacy_max_claim_per_patient"] = df["pharmacy_npi"].map(max_claim).fillna(0)
    # pharmacy_fraud_risk_score: fixed-weight composite of the four rate
    # features above (weights sum to 1.0).
    reject_rate = df["pharmacy_reject_rate"].fillna(0)
    paper_rate = df["pharmacy_paper_submission_rate"].fillna(0)
    hcp_conc = df["pharmacy_hcp_concentration"].fillna(0)
    one_done = df["pharmacy_one_and_done_pct"].fillna(0)
    df["pharmacy_fraud_risk_score"] = (
        0.25 * reject_rate + 0.20 * paper_rate + 0.30 * hcp_conc + 0.25 * one_done
    )
    return df
# ═══════════════════════════════════════════════════════════════════════════════
# 4.5 Prescriber / HCP Features (HCP Level)
# ═══════════════════════════════════════════════════════════════════════════════
def add_prescriber_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add prescriber/HCP-level fraud-indicator features keyed on prescriber_npi.

    If prescriber_npi is absent, every HCP feature is emitted as 0 (specialty
    validity defaults to 1 = valid) so the feature matrix keeps a stable schema.
    """
    deps = FEATURE_DEPENDENCIES  # NOTE(review): not used in this function
    if "prescriber_npi" not in df.columns:
        logger.warning("Skipping prescriber features β€” prescriber_npi missing")
        for c in [
            "prescriber_unique_patients", "prescriber_total_claims",
            "hcp_total_benefit", "hcp_total_claims", "hcp_unique_patients",
            "hcp_avg_benefit_per_claim", "hcp_avg_benefit_per_patient",
            "hcp_unique_pharmacies", "hcp_one_and_done_pct",
            "hcp_patient_concentration", "hcp_suspicious_specialty",
            "hcp_high_benefit_flag", "hcp_max_benefit_per_patient",
            "hcp_std_benefit", "hcp_patient_share",
        ]:
            df[c] = 0
        df["prescriber_specialty_valid"] = 1  # default to valid if unknown
        return df
    # Volume per prescriber.
    df["prescriber_unique_patients"] = df.groupby("prescriber_npi")["patient_id"].transform("nunique")
    df["prescriber_total_claims"] = df.groupby("prescriber_npi")["claim_id"].transform("count")
    # Specialty whitelist check (case-insensitive).
    valid_specialties = set(s.lower() for s in PRODUCT_CONFIG["valid_prescriber_specialties"])
    if "prescriber_specialty" in df.columns:
        df["prescriber_specialty_valid"] = (
            df["prescriber_specialty"].fillna("").str.lower().isin(valid_specialties).astype(int)
        )
    else:
        df["prescriber_specialty_valid"] = 1  # assume valid if unknown
    # ── HCP Level v3 Features ──
    if "benefit_amount" in df.columns:
        df["hcp_total_benefit"] = df.groupby("prescriber_npi")["benefit_amount"].transform("sum")
        df["hcp_avg_benefit_per_claim"] = df.groupby("prescriber_npi")["benefit_amount"].transform("mean")
        df["hcp_std_benefit"] = df.groupby("prescriber_npi")["benefit_amount"].transform("std").fillna(0)
        # avg benefit per patient = total benefit / unique patients
        total_ben = df.groupby("prescriber_npi")["benefit_amount"].transform("sum")
        uniq_pts = df.groupby("prescriber_npi")["patient_id"].transform("nunique")
        df["hcp_avg_benefit_per_patient"] = (total_ben / uniq_pts.replace(0, 1)).fillna(0)
        # Largest total benefit concentrated on a single patient of this HCP.
        max_ben = df.groupby(["prescriber_npi", "patient_id"])["benefit_amount"].sum().groupby(level=0).max()
        df["hcp_max_benefit_per_patient"] = df["prescriber_npi"].map(max_ben).fillna(0)
    else:
        df["hcp_total_benefit"] = 0
        df["hcp_avg_benefit_per_claim"] = 0
        df["hcp_std_benefit"] = 0
        df["hcp_avg_benefit_per_patient"] = 0
        df["hcp_max_benefit_per_patient"] = 0
    # NOTE(review): hcp_total_claims / hcp_unique_patients recompute the same
    # values as prescriber_total_claims / prescriber_unique_patients above.
    df["hcp_total_claims"] = df.groupby("prescriber_npi")["claim_id"].transform("count")
    df["hcp_unique_patients"] = df.groupby("prescriber_npi")["patient_id"].transform("nunique")
    # Distinct pharmacies this prescriber's claims were filled at.
    if "pharmacy_npi" in df.columns:
        df["hcp_unique_pharmacies"] = df.groupby("prescriber_npi")["pharmacy_npi"].transform("nunique")
    else:
        df["hcp_unique_pharmacies"] = 1
    # hcp_one_and_done_pct: % of claims from one-&-done patients
    # (patient_one_and_done is set upstream by add_patient_behavioral_features).
    if "patient_one_and_done" in df.columns:
        df["hcp_one_and_done_pct"] = df.groupby("prescriber_npi")["patient_one_and_done"].transform("mean")
    else:
        df["hcp_one_and_done_pct"] = 0
    # hcp_patient_concentration: share of this HCP's claims from its single
    # busiest patient (1.0 = all claims from one patient).
    top_patient = df.groupby(["prescriber_npi", "patient_id"]).size().groupby(level=0).max()
    total_claims = df.groupby("prescriber_npi")["claim_id"].count()
    conc = (top_patient / total_claims).fillna(0)
    df["hcp_patient_concentration"] = df["prescriber_npi"].map(conc).fillna(0)
    # hcp_patient_share: claims per unique patient (proxy for volume).
    df["hcp_patient_share"] = df["hcp_total_claims"] / df["hcp_unique_patients"].replace(0, 1)
    # hcp_suspicious_specialty: specialty appears on the configured watchlist.
    sus_spec = set(s.lower() for s in PRODUCT_CONFIG["suspicious_prescriber_specialties"])
    if "prescriber_specialty" in df.columns:
        spec = df["prescriber_specialty"].fillna("").str.lower()
        df["hcp_suspicious_specialty"] = spec.isin(sus_spec).astype(int)
    else:
        df["hcp_suspicious_specialty"] = 0
    # hcp_high_benefit_flag: avg benefit per patient above configured threshold.
    hbt = PRODUCT_CONFIG["hcp_high_benefit_threshold"]
    df["hcp_high_benefit_flag"] = (df["hcp_avg_benefit_per_patient"] > hbt).astype(int)
    return df
# ═══════════════════════════════════════════════════════════════════════════════
# 4.6 Reject Code Features
# ═══════════════════════════════════════════════════════════════════════════════
def add_reject_code_features(df: pd.DataFrame) -> pd.DataFrame:
    """Aggregate reject-code activity up to the patient level."""
    output_cols = [
        "patient_reject_count", "patient_reject_rate",
        "patient_highrisk_reject_count", "patient_maximizer_count",
    ]
    # Without a patient key none of these can be computed.
    if "patient_id" not in df.columns:
        for name in output_cols:
            df[name] = 0
        return df
    by_patient = df.groupby("patient_id")
    if "has_reject_code" in df.columns:
        df["patient_reject_count"] = by_patient["has_reject_code"].transform("sum")
        df["patient_reject_rate"] = by_patient["has_reject_code"].transform("mean")
    else:
        df["patient_reject_count"] = 0
        df["patient_reject_rate"] = 0
    # Per-patient sums of the specialized reject flags.
    for src, dest in [
        ("high_risk_reject", "patient_highrisk_reject_count"),
        ("maximizer_reject", "patient_maximizer_count"),
    ]:
        if src in df.columns:
            df[dest] = by_patient[src].transform("sum")
        else:
            df[dest] = 0
    return df
# ═══════════════════════════════════════════════════════════════════════════════
# 4.7 Plan Switching Features
# ═══════════════════════════════════════════════════════════════════════════════
def add_plan_switching_features(df: pd.DataFrame) -> pd.DataFrame:
    """Flag patients whose claims span multiple plans or payer BINs."""
    # Without a patient key, emit neutral defaults (1 plan/bin, no switch).
    if "patient_id" not in df.columns:
        df["unique_plans_per_patient"] = 1
        df["plan_switch_flag"] = 0
        df["unique_bins_per_patient"] = 1
        df["bin_switch_flag"] = 0
        return df
    specs = [
        ("primary_plan_id", "unique_plans_per_patient", "plan_switch_flag"),
        ("primary_payer_bin", "unique_bins_per_patient", "bin_switch_flag"),
    ]
    for src, uniq_col, flag_col in specs:
        if src in df.columns:
            df[uniq_col] = df.groupby("patient_id")[src].transform("nunique")
        else:
            df[uniq_col] = 1
        # More than one distinct value per patient means a switch occurred.
        df[flag_col] = (df[uniq_col] > 1).astype(int)
    return df
# ═══════════════════════════════════════════════════════════════════════════════
# 4.8 Submission Features
# ═══════════════════════════════════════════════════════════════════════════════
def add_submission_features(df: pd.DataFrame) -> pd.DataFrame:
    """Per-patient paper-submission rate (mean of the paper_submission flag)."""
    required = FEATURE_DEPENDENCIES["patient_paper_submission_rate"]
    if _has_columns(df, required):
        rate = df.groupby("patient_id")["paper_submission"].transform("mean")
        df["patient_paper_submission_rate"] = rate
    else:
        # Source columns absent: degrade to a constant 0.
        df["patient_paper_submission_rate"] = 0
    return df
# ═══════════════════════════════════════════════════════════════════════════════
# 4.9 DAW Features
# ═══════════════════════════════════════════════════════════════════════════════
def add_daw_features(df: pd.DataFrame) -> pd.DataFrame:
    """Per-patient count of DAW brand-required fills."""
    required = FEATURE_DEPENDENCIES["patient_daw_brand_count"]
    if _has_columns(df, required):
        counts = df.groupby("patient_id")["daw_brand_required"].transform("sum")
        df["patient_daw_brand_count"] = counts
    else:
        # Source columns absent: degrade to a constant 0.
        df["patient_daw_brand_count"] = 0
    return df
# ═══════════════════════════════════════════════════════════════════════════════
# 4.10 Linked Claim Features
# ═══════════════════════════════════════════════════════════════════════════════
def add_linked_claim_features(df: pd.DataFrame) -> pd.DataFrame:
    """Per-patient counts of adjusted claims and claims with a linked claim."""
    source_to_feature = {
        "is_adjusted": "patient_adjusted_count",
        "has_linked_claim": "patient_linked_count",
    }
    # Without a patient key, both counts degrade to 0.
    if "patient_id" not in df.columns:
        for feature in source_to_feature.values():
            df[feature] = 0
        return df
    for src, feature in source_to_feature.items():
        if src in df.columns:
            df[feature] = df.groupby("patient_id")[src].transform("sum")
        else:
            df[feature] = 0
    return df
# ═══════════════════════════════════════════════════════════════════════════════
# 4.11 Drug-Specific Features (Transaction Level)
# ═══════════════════════════════════════════════════════════════════════════════
def add_drug_specific_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add transaction-level drug/claim anomaly features.

    Depends on columns created earlier in the pipeline
    ('days_since_first_fill', 'total_fills_per_patient') for new_patient_burst.
    Each feature degrades to a constant when its source columns are missing.
    """
    deps = FEATURE_DEPENDENCIES
    # patient_ndc_count: distinct NDCs per patient; >1 ⇒ the patient switched drugs.
    if _has_columns(df, deps["patient_ndc_count"]):
        df["patient_ndc_count"] = df.groupby("patient_id")["drug_ndc"].transform("nunique")
    else:
        df["patient_ndc_count"] = 1
    df["ndc_switch_flag"] = (df["patient_ndc_count"] > 1).astype(int)
    # govt_insurance_flag (empty-string Series fallback keeps the comparison vectorized)
    df["govt_insurance_flag"] = (df.get("insurance_type", pd.Series("", index=df.index)) == "Government").astype(int)
    # quantity_anomaly: any quantity different from the configured expected value.
    expected_qty = PRODUCT_CONFIG["quantity_expected"]
    if "quantity" in df.columns:
        df["quantity_anomaly"] = (df["quantity"] != expected_qty).astype(int)
    else:
        df["quantity_anomaly"] = 0
    # days_supply_anomaly: any days-supply different from the expected value.
    expected_ds = PRODUCT_CONFIG["days_supply_expected"]
    if "days_supply" in df.columns:
        df["days_supply_anomaly"] = (df["days_supply"] != expected_ds).astype(int)
    else:
        df["days_supply_anomaly"] = 0
    # age_violation_flag: patient below the configured minimum age.
    if "patient_age" in df.columns:
        df["age_violation_flag"] = (df["patient_age"] < PRODUCT_CONFIG["underage_threshold"]).astype(int)
    else:
        df["age_violation_flag"] = 0
    # new_patient_burst: more than one fill within 7 days of the first fill.
    if _has_columns(df, deps["new_patient_burst"]):
        df["new_patient_burst"] = (
            (df["days_since_first_fill"] <= 7) & (df["total_fills_per_patient"] > 1)
        ).astype(int)
    else:
        df["new_patient_burst"] = 0
    # cross_state_fill: claim filled in a different state than the patient's.
    if _has_columns(df, deps["cross_state_fill"]):
        df["cross_state_fill"] = (df["patient_state"] != df["pharmacy_state"]).astype(int)
    else:
        df["cross_state_fill"] = 0
    # Z-scores relative to the patient's own history (std 0 → NaN → 0).
    # NOTE(review): 'total_claim_zscore' and 'uc_zscore' are both computed
    # from usual_customary, so they come out identical; presumably
    # total_claim_zscore was meant to use a total-claim column — confirm.
    for col, zname in [("copay_after", "copay_zscore"), ("usual_customary", "total_claim_zscore"), ("usual_customary", "uc_zscore")]:
        if col in df.columns and "patient_id" in df.columns:
            mean = df.groupby("patient_id")[col].transform("mean")
            std = df.groupby("patient_id")[col].transform("std").replace(0, np.nan)
            df[zname] = ((df[col] - mean) / std).fillna(0)
        else:
            df[zname] = 0
    # benefit_ratio: benefit paid as a share of the usual & customary price.
    if _has_columns(df, deps["benefit_ratio"]):
        df["benefit_ratio"] = (df["benefit_amount"] / df["usual_customary"].replace(0, np.nan)).fillna(0)
    else:
        df["benefit_ratio"] = 0
    # Transaction Level: whole-dataset z-scores of benefit and out-of-pocket.
    # Guarded against zero (or NaN, on degenerate inputs) standard deviation.
    if "benefit_amount" in df.columns:
        ben_mean = df["benefit_amount"].mean()
        ben_std = df["benefit_amount"].std()
        if ben_std > 0:
            df["transaction_benefit_score"] = ((df["benefit_amount"] - ben_mean) / ben_std).fillna(0)
        else:
            df["transaction_benefit_score"] = 0
    else:
        df["transaction_benefit_score"] = 0
    if "copay_after" in df.columns:
        oop_mean = df["copay_after"].mean()
        oop_std = df["copay_after"].std()
        if oop_std > 0:
            df["transaction_oop_score"] = ((df["copay_after"] - oop_mean) / oop_std).fillna(0)
        else:
            df["transaction_oop_score"] = 0
    else:
        df["transaction_oop_score"] = 0
    return df
# ═══════════════════════════════════════════════════════════════════════════════
# 4.12 Group-Aware Features (v4 β€” Group 8141 vs Group 8200)
# ═══════════════════════════════════════════════════════════════════════════════
def get_days_supply_tier(days_supply):
    """Bucket a days-supply value into its benefit-schedule tier label."""
    if pd.isna(days_supply):
        return "1-30"  # missing values default to the lowest tier
    value = int(days_supply)
    for upper_bound, label in [(30, "1-30"), (60, "31-60")]:
        if value <= upper_bound:
            return label
    return "61-90"
def lookup_benefit_cap(group_id, scenario, fill_date, days_supply_tier):
    """Return the max allowed benefit for a claim from the group config.

    Returns NaN when the group or fill date is unknown/unconfigured, and 0.0
    when the scenario is not covered by the group at all.
    """
    if pd.isna(group_id):
        return np.nan
    gid = str(group_id).strip().upper()
    if gid == "UNKNOWN" or gid not in GROUP_BENEFIT_CONFIG:
        return np.nan
    cfg = GROUP_BENEFIT_CONFIG[gid]
    if scenario not in cfg["covered_scenarios"]:
        return 0.0  # non-covered scenario → cap is $0
    if fill_date is pd.NaT or pd.isna(fill_date):
        return np.nan
    # Scan the schedule for an entry covering this scenario and fill date.
    for (sched_scenario, start_str, end_str), tier_map in cfg["benefit_schedule"].items():
        if sched_scenario != scenario:
            continue
        if pd.Timestamp(start_str) <= fill_date <= pd.Timestamp(end_str):
            return tier_map.get(days_supply_tier, np.nan)
    return np.nan
def add_group_aware_features(df: pd.DataFrame) -> pd.DataFrame:
    """Generate features based on Group 8141 vs Group 8200 benefit rules.

    Adds benefit-cap lookups, cap utilization / excess payment amounts,
    coverage-mismatch flags, annual utilization counts and limit flags, NDC
    coverage flags and a repeated max-benefit flag. Each feature degrades to
    a neutral value when its source columns are missing.
    """
    deps = FEATURE_DEPENDENCIES
    # days_supply_tier: bucket used as the key into the benefit schedule.
    if "days_supply" in df.columns:
        df["days_supply_tier"] = df["days_supply"].apply(get_days_supply_tier)
    else:
        df["days_supply_tier"] = "1-30"
    # allowed_benefit_cap: per-claim max allowed benefit from the group config.
    if _has_columns(df, deps["allowed_benefit_cap"]):
        df["allowed_benefit_cap"] = df.apply(
            lambda row: lookup_benefit_cap(
                row.get("group_id", "UNKNOWN"),
                row.get("claim_scenario_derived", "commercial_approved"),
                row.get("fill_date", pd.NaT),
                row.get("days_supply_tier", "1-30"),
            ), axis=1,
        )
    else:
        logger.warning("Skipping 'allowed_benefit_cap' β€” missing group_id, claim_scenario, fill_date, or days_supply")
        df["allowed_benefit_cap"] = np.nan
    # cap_utilization_ratio = benefit_amount / allowed_benefit_cap
    # (0 or NaN cap → 0; capped at 10x to avoid extreme outliers).
    if "benefit_amount" in df.columns and "allowed_benefit_cap" in df.columns:
        df["cap_utilization_ratio"] = (
            df["benefit_amount"] / df["allowed_benefit_cap"].replace(0, np.nan)
        ).fillna(0).clip(lower=0)
        df["cap_utilization_ratio"] = df["cap_utilization_ratio"].clip(upper=10)
    else:
        df["cap_utilization_ratio"] = 0
    # excess_payment_amount = max(0, benefit_amount - allowed_benefit_cap)
    if "benefit_amount" in df.columns and "allowed_benefit_cap" in df.columns:
        df["excess_payment_amount"] = (
            (df["benefit_amount"] - df["allowed_benefit_cap"]).clip(lower=0)
        ).fillna(0)
    else:
        df["excess_payment_amount"] = 0
    # scenario_not_covered_flag: scenario explicitly non-covered by the group.
    if _has_columns(df, deps["scenario_not_covered_flag"]):
        def is_scenario_not_covered(row):
            # 1 if the row's scenario is on the group's non-covered list.
            gid = str(row.get("group_id", "UNKNOWN")).strip().upper()
            scenario = row.get("claim_scenario_derived", "commercial_approved")
            if gid not in GROUP_BENEFIT_CONFIG:
                return 0  # unknown group → cannot flag
            return int(scenario in GROUP_BENEFIT_CONFIG[gid]["non_covered_scenarios"])
        df["scenario_not_covered_flag"] = df.apply(is_scenario_not_covered, axis=1)
    else:
        df["scenario_not_covered_flag"] = 0
    # invalid_period_benefit_flag: covered scenario but no schedule period
    # matches the fill date (a NaT fill date never matches, so it is flagged).
    if _has_columns(df, deps["invalid_period_benefit_flag"]):
        def is_invalid_period(row):
            gid = str(row.get("group_id", "UNKNOWN")).strip().upper()
            scenario = row.get("claim_scenario_derived", "commercial_approved")
            fill_date = row.get("fill_date", pd.NaT)
            if gid not in GROUP_BENEFIT_CONFIG:
                return 0
            if scenario not in GROUP_BENEFIT_CONFIG[gid]["covered_scenarios"]:
                return 0
            # Check if any schedule entry matches this scenario+date.
            for (sched_scenario, start_str, end_str), _ in GROUP_BENEFIT_CONFIG[gid]["benefit_schedule"].items():
                if sched_scenario == scenario:
                    start_dt = pd.Timestamp(start_str)
                    end_dt = pd.Timestamp(end_str)
                    if start_dt <= fill_date <= end_dt:
                        return 0
            return 1  # no matching period found
        df["invalid_period_benefit_flag"] = df.apply(is_invalid_period, axis=1)
    else:
        df["invalid_period_benefit_flag"] = 0
    # group_benefit_mismatch_flag: scenario not covered OR invalid period.
    df["group_benefit_mismatch_flag"] = (
        (df["scenario_not_covered_flag"] == 1) | (df["invalid_period_benefit_flag"] == 1)
    ).astype(int)
    # annual_fill_count per patient (and card, when present) per calendar year.
    if _has_columns(df, deps["annual_fill_count"]):
        df["claim_year"] = df["fill_date"].dt.year
        group_key = ["patient_id", "claim_year"]
        if "card_id" in df.columns:
            group_key.append("card_id")
        df["annual_fill_count"] = df.groupby(group_key)["claim_id"].transform("count")
    else:
        df["annual_fill_count"] = 1
    # annual_days_supply_count per patient per calendar year.
    if _has_columns(df, deps["annual_days_supply_count"]):
        # BUG FIX: 'claim_year' was previously created only inside the
        # annual_fill_count branch above; if that branch was skipped, the
        # groupby below raised KeyError. Derive it here when absent.
        if "claim_year" not in df.columns:
            if "fill_date" in df.columns:
                df["claim_year"] = df["fill_date"].dt.year
            else:
                df["claim_year"] = 0  # single unknown-year bucket
        group_key = ["patient_id", "claim_year"]
        if "card_id" in df.columns:
            group_key.append("card_id")
        df["annual_days_supply_count"] = df.groupby(group_key)["days_supply"].transform("sum")
    else:
        df["annual_days_supply_count"] = df.get("days_supply", pd.Series(30, index=df.index))
    # annual_fill_limit_exceeded: more fills this year than the program allows.
    max_fills = PRODUCT_CONFIG["annual_max_fills_per_patient"]
    df["annual_fill_limit_exceeded"] = (df["annual_fill_count"] > max_fills).astype(int)
    # annual_days_supply_limit_exceeded: total days supply over the annual cap.
    max_ds = PRODUCT_CONFIG["annual_max_days_supply_per_patient"]
    df["annual_days_supply_limit_exceeded"] = (df["annual_days_supply_count"] > max_ds).astype(int)
    # non_covered_ndc_flag: NDC not on the covered-products list.
    if _has_columns(df, deps["non_covered_ndc_flag"]):
        df["non_covered_ndc_flag"] = (~df["drug_ndc"].isin(COVERED_NDCS)).astype(int)
    else:
        df["non_covered_ndc_flag"] = 0
    # govt_claim_with_benefit_flag: government insurance AND benefit_amount > 0.
    if _has_columns(df, deps["govt_claim_with_benefit_flag"]):
        df["govt_claim_with_benefit_flag"] = (
            (df["insurance_type"] == "Government") & (df["benefit_amount"] > 0)
        ).astype(int)
    else:
        df["govt_claim_with_benefit_flag"] = 0
    # quantity_out_of_range_flag: quantity outside the configured [min, max].
    if _has_columns(df, deps["quantity_out_of_range_flag"]):
        qmin = PRODUCT_CONFIG["quantity_min"]
        qmax = PRODUCT_CONFIG["quantity_max"]
        df["quantity_out_of_range_flag"] = (
            (df["quantity"] < qmin) | (df["quantity"] > qmax)
        ).astype(int)
    else:
        df["quantity_out_of_range_flag"] = 0
    # days_supply_out_of_range_flag: days supply outside the configured range.
    if _has_columns(df, deps["days_supply_out_of_range_flag"]):
        dsmin = PRODUCT_CONFIG["days_supply_min"]
        dsmax = PRODUCT_CONFIG["days_supply_max"]
        df["days_supply_out_of_range_flag"] = (
            (df["days_supply"] < dsmin) | (df["days_supply"] > dsmax)
        ).astype(int)
    else:
        df["days_supply_out_of_range_flag"] = 0
    # max_benefit_repeat_flag: a patient+pharmacy pair hits the max cap >= 3 times.
    if _has_columns(df, deps["max_benefit_repeat_flag"]):
        def get_cap(gid, scenario, tier):
            # Cap for a group/scenario/tier. NOTE(review): unlike
            # lookup_benefit_cap this ignores the schedule's date range — the
            # first entry for the scenario wins. Confirm that is intended.
            if pd.isna(gid):
                return np.nan
            g = str(gid).strip().upper()
            if g not in GROUP_BENEFIT_CONFIG:
                return np.nan
            for (sched_scenario, start_str, end_str), tier_map in GROUP_BENEFIT_CONFIG[g]["benefit_schedule"].items():
                if sched_scenario == scenario:
                    return tier_map.get(tier, np.nan)
            return np.nan
        df["_tmp_cap"] = df.apply(
            lambda row: get_cap(row.get("group_id"), row.get("claim_scenario_derived"), row.get("days_supply_tier")),
            axis=1,
        )
        # Flag if benefit equals cap (within $1 tolerance) for same patient+pharmacy.
        df["_at_cap"] = (df["benefit_amount"] >= (df["_tmp_cap"] - 1)).astype(int)
        cap_counts = df.groupby(["patient_id", "pharmacy_npi"])["_at_cap"].transform("sum")
        df["max_benefit_repeat_flag"] = (cap_counts >= 3).astype(int)
        df.drop(columns=["_tmp_cap", "_at_cap"], inplace=True, errors="ignore")
    else:
        df["max_benefit_repeat_flag"] = 0
    logger.info(f"Group-aware features added: cap_utilization_ratio, excess_payment, scenario_not_covered, annual limits, NDC coverage, max_benefit_repeat")
    return df
# ═══════════════════════════════════════════════════════════════════════════════
# MASTER FEATURE ENGINEERING
# ═══════════════════════════════════════════════════════════════════════════════
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
logger.info(f"Feature engineering on {len(df):,} claims...")
df = add_temporal_features(df)
df = add_rolling_window_features(df)
df = add_patient_behavioral_features(df)
df = add_pharmacy_features(df)
df = add_prescriber_features(df)
df = add_reject_code_features(df)
df = add_plan_switching_features(df)
df = add_submission_features(df)
df = add_daw_features(df)
df = add_linked_claim_features(df)
df = add_drug_specific_features(df)
df = add_group_aware_features(df)
logger.info(f"Feature engineering complete: {len(df.columns)} total columns")
return df
# ═══════════════════════════════════════════════════════════════════════════════
# SCALING & ENCODING
# ═══════════════════════════════════════════════════════════════════════════════
def scale_and_encode(df: pd.DataFrame) -> tuple:
    """
    Scale numerics + encode categoricals. Only uses columns that actually exist.

    Side effect: adds a ``_orig_<col>`` string copy of every encoded categorical
    column to ``df`` (in place) so downstream rule evaluation can read the raw,
    un-encoded values.

    Returns (df_processed, scaler, encoder, feature_names) where df_processed
    contains the scaled numeric columns followed by the encoded categoricals.
    """
    # Auto-detect categorical columns present
    cat_candidates = [
        "claim_type", "refill_indicator", "rx_origin_code", "daw_code",
        "other_coverage", "mail_order_indicator", "pharmacy_subcategory",
        "prescriber_specialty", "primary_payer_bin", "primary_payer_pcn",
        "primary_plan_id", "primary_plan_name", "primary_model_type",
        "submission_method", "submission_type", "payment_method",
        "reject_code", "reject_type", "linked_claim_type",
        "group_number", "group_name", "pharmacy_state", "patient_state",
        "drug_name", "drug_form", "drug_strength", "insurance_type",
        "hcp_suspicious_specialty",
        "pharmacy_active_flag", "pharmacy_high_benefit_flag",
        "hcp_high_benefit_flag",
        "risk_tier",
    ]
    cat_present = [c for c in cat_candidates if c in df.columns]
    # Auto-detect numeric columns present
    num_candidates = [
        # Core fields
        "days_supply", "quantity", "copay_before", "copay_after",
        "benefit_amount", "usual_customary", "dispensing_fee", "sales_tax",
        "remaining_balance", "number_of_benefits", "total_copay",
        # Temporal
        "days_between_fills", "days_since_first_fill", "claim_month",
        "claim_dow", "claim_quarter", "rx_lag_days",
        # Patient rolling windows
        "patient_fill_count_7d", "patient_fill_count_30d", "patient_fill_count_90d",
        "patient_copay_spend_7d", "patient_copay_spend_30d", "patient_copay_spend_90d",
        "patient_total_claim_7d", "patient_total_claim_30d", "patient_total_claim_90d",
        "patient_benefit_7d", "patient_benefit_30d", "patient_benefit_90d",
        # Pharmacy rolling
        "pharmacy_claim_count_30d", "pharmacy_claim_count_90d",
        "prescriber_claim_count_30d", "prescriber_claim_count_90d",
        # Patient behavioral
        "unique_pharmacies_overall", "unique_programs_per_patient",
        "unique_prescribers_per_patient", "total_fills_per_patient",
        "avg_days_between_fills", "std_days_between_fills", "max_fills_any_30d",
        "copay_deviation_from_patient_mean", "total_benefit_per_patient",
        "patient_reject_count", "patient_reject_rate", "patient_highrisk_reject_count",
        "patient_maximizer_count", "unique_plans_per_patient", "unique_bins_per_patient",
        "patient_paper_submission_rate", "patient_daw_brand_count",
        "patient_adjusted_count", "patient_linked_count",
        "patient_ndc_count", "copay_zscore", "total_claim_zscore",
        "uc_zscore", "benefit_ratio",
        # Patient level v3
        "patient_active_duration", "patient_avg_gap",
        "patient_one_and_done", "patient_short_gap_pct", "patient_long_gap_pct",
        # HCP level v3
        "hcp_total_benefit", "hcp_total_claims", "hcp_unique_patients",
        "hcp_avg_benefit_per_claim", "hcp_avg_benefit_per_patient",
        "hcp_unique_pharmacies", "hcp_one_and_done_pct",
        "hcp_patient_concentration", "hcp_max_benefit_per_patient",
        "hcp_std_benefit", "hcp_patient_share",
        # Pharmacy level v3
        "pharmacy_unique_patients", "pharmacy_total_claims",
        "pharmacy_claims_per_patient_ratio", "pharmacy_avg_copay",
        "pharmacy_mail_order_pct", "pharmacy_reject_rate",
        "pharmacy_paper_submission_rate", "pharmacy_unique_hcps",
        "pharmacy_hcp_concentration", "pharmacy_one_and_done_pct",
        "pharmacy_total_benefit", "pharmacy_avg_benefit",
        "pharmacy_std_benefit", "pharmacy_max_benefit_per_patient",
        "pharmacy_std_copay", "pharmacy_max_claim_per_patient",
        "pharmacy_fraud_risk_score",
        # Transaction level v3
        "transaction_benefit_score", "transaction_oop_score",
        # Group-aware v4
        "cap_utilization_ratio", "excess_payment_amount",
        "annual_fill_count", "annual_days_supply_count",
    ]
    num_present = [c for c in num_candidates if c in df.columns]
    logger.info(f"Encoding {len(cat_present)} categorical, scaling {len(num_present)} numeric features")
    # Save original categorical values for rule evaluation (intentionally
    # mutates the caller's frame in place).
    for c in cat_present:
        df[f"_orig_{c}"] = df[c].astype(str)
    # Encoder/scaler are always constructed (even when unused) so the returned
    # tuple shape is stable for callers that persist them.
    encoder = OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1)
    scaler = StandardScaler()
    # Encode categoricals
    if cat_present:
        # Cast to str AFTER fillna: a numeric code column containing NaN would
        # otherwise become mixed float/str and break OrdinalEncoder's
        # per-column category sort with a TypeError.
        cat_matrix = df[cat_present].fillna("MISSING").astype(str)
        df_cat = pd.DataFrame(
            encoder.fit_transform(cat_matrix),
            columns=cat_present, index=df.index
        )
    else:
        df_cat = pd.DataFrame(index=df.index)
    # Scale numerics (NaN treated as 0 before standardization)
    if num_present:
        df_num = pd.DataFrame(
            scaler.fit_transform(df[num_present].fillna(0)),
            columns=num_present, index=df.index
        )
    else:
        df_num = pd.DataFrame(index=df.index)
    df_processed = pd.concat([df_num, df_cat], axis=1)
    feature_names = num_present + cat_present
    return df_processed, scaler, encoder, feature_names
def _main() -> None:
    """CLI entry point: load ingested claims, engineer features, write parquet."""
    import argparse
    parser = argparse.ArgumentParser(description="Feature engineering v2")
    parser.add_argument("--input", default="data/ingested.parquet", help="Ingested parquet")
    parser.add_argument("--output", default="data/features.parquet", help="Output parquet")
    args = parser.parse_args()
    df = engineer_features(pd.read_parquet(args.input))
    df.to_parquet(args.output, index=False)
    logger.info(f"Saved features to {args.output}")


if __name__ == "__main__":
    _main()