# debatefloor / server /claim_generator.py
# AniketAsla's picture
# sync: mirror git d05fcb5 to Space
# b4ac377 verified
"""
server/claim_generator.py
DebateFloor — Procedural Claim Generator
Transforms DebateFloor from a fixed benchmark into a training environment.
Same (seed, fraud_type, coverage_type, difficulty, jurisdiction) always produces the same episode.
Different seeds produce different claimant names, amounts, dates, and signal strengths.
5 fraud types x 4 coverage types x 3 jurisdictions x seed variation = 500+ unique episodes.
"""
from __future__ import annotations
import random
from typing import Any, Dict, List, Literal, Optional
from pydantic import BaseModel, Field
# ─────────────────────────────────────────────────────────────
# CONSTANTS
# ─────────────────────────────────────────────────────────────
# Fraud categories the generator can synthesise. The clean (non-fraud) case is
# keyed separately as "none" in the lookup tables.
FRAUD_TYPES = [
    "staged_accident",
    "medical_inflation",
    "identity_fraud",
    "coordinated_ring",
    "phantom_provider",
]

# Insurance product lines a claim can be filed under.
COVERAGE_TYPES = [
    "auto",
    "health",
    "property",
    "life",
]

# State codes used in policy numbers and locations.
JURISDICTIONS = [
    "MH",
    "DL",
    "KA",
]  # Maharashtra, Delhi, Karnataka

# Base signal strength per difficulty; builders compare it against fixed
# thresholds to decide which fraud signals appear in an episode.
DIFFICULTY_SIGNAL_STRENGTH = {"easy": 0.90, "medium": 0.55, "hard": 0.20}

# Base ambiguity score per difficulty.
DIFFICULTY_AMBIGUITY = {"easy": 0.10, "medium": 0.45, "hard": 0.80}

# Expected terminal decision for each fraud type.
FRAUD_GROUND_TRUTH = {
    "staged_accident":   "deny_claim",
    "medical_inflation": "deny_claim",
    "identity_fraud":    "deny_claim",
    "coordinated_ring":  "escalate_to_human",
    "phantom_provider":  "deny_claim",
    "none":              "approve_claim",
}

# Name pools. Order matters: rng.choice indexes into these lists, so
# reordering them would change the episode produced by a given seed.
_FIRST_NAMES = [
    "Arjun", "Priya", "Rahul", "Sunita",
    "Vikram", "Meena", "Rohit", "Kavita",
    "Sanjay", "Anjali", "Deepak", "Pooja",
    "Nikhil", "Rekha", "Amit", "Divya",
    "Suresh", "Nisha", "Kiran", "Manoj",
    "Sneha", "Rajesh", "Lata", "Arun",
]

_LAST_NAMES = [
    "Sharma", "Patel", "Singh",
    "Kumar", "Joshi", "Verma",
    "Gupta", "Mehta", "Nair",
    "Reddy", "Das", "Iyer",
    "Bhat", "Rao", "Pillai",
    "Saxena", "Tiwari", "Mishra",
]

_HOSPITALS = [
    "Apollo Hospital",
    "Fortis Healthcare",
    "Manipal Hospital",
    "Max Super Speciality",
    "Narayana Health",
    "Medanta",
    "Kokilaben Dhirubhai Ambani",
    "Aster CMI",
    "Lilavati Hospital",
]

_GARAGES = [
    "Tata Authorised Service",
    "Maruti True Value Workshop",
    "Hyundai Care Centre",
    "Popular Motors",
    "City Auto Works",
    "Highway Motors",
    "Star Auto Repair",
]

_INSURERS = [
    "HDFC ERGO",
    "ICICI Lombard",
    "Bajaj Allianz",
    "New India Assurance",
    "United India",
]
# ─────────────────────────────────────────────────────────────
# DATA MODELS
# ─────────────────────────────────────────────────────────────
class ClaimScenario(BaseModel):
    """A single generated claim episode, including its labels and action space."""

    # -- Episode identity / generation parameters --------------------------
    claim_id: str
    seed: int
    fraud_type: str
    coverage_type: str
    jurisdiction: str
    difficulty: str

    # -- Generated content --------------------------------------------------
    claimant: Dict[str, Any]
    incident: Dict[str, Any]
    documents: List[Dict[str, Any]]

    # -- Labels -------------------------------------------------------------
    ground_truth: str
    # Validated by pydantic to lie in the closed interval [0, 1].
    ambiguity_score: float = Field(ge=0.0, le=1.0)
    payout_amount_inr: float
    expected_fraud_signals: List[str]

    # -- Optional fields (each instance gets its own fresh list) ------------
    linked_claims: List[Dict[str, Any]] = Field(default_factory=list)
    available_actions: List[str] = Field(default_factory=list)
    max_steps: int = 10
    task_id: str = ""
# ─────────────────────────────────────────────────────────────
# HELPERS
# ─────────────────────────────────────────────────────────────
def _make_claimant(rng: random.Random, jurisdiction: str) -> Dict[str, Any]:
    """Draw a synthetic policy holder for *jurisdiction* from *rng*.

    Returns a dict with the keys ``name``, ``age``, ``policy_number``,
    ``policy_start_date``, ``insurer``, ``jurisdiction`` and ``phone``.
    """
    # The draws below are consumed in a fixed order; reordering them would
    # change the claimant produced by a given RNG state.
    given_name = rng.choice(_FIRST_NAMES)
    family_name = rng.choice(_LAST_NAMES)
    age = rng.randint(24, 62)
    policy_serial = rng.randint(100000, 999999)
    start_year_digit = rng.randint(1, 4)
    start_month = rng.randint(1, 12)
    insurer = rng.choice(_INSURERS)
    phone_digits = rng.randint(7000000000, 9999999999)

    profile: Dict[str, Any] = {
        "name": f"{given_name} {family_name}",
        "age": age,
        "policy_number": f"POL-{jurisdiction}-{policy_serial}",
        # Policies always start on the first of the month, 2021-2024.
        "policy_start_date": f"202{start_year_digit}-{start_month:02d}-01",
        "insurer": insurer,
        "jurisdiction": jurisdiction,
        "phone": f"+91-{phone_digits}",
    }
    return profile
def _incident_date(rng: random.Random) -> str:
    """Return a random ``YYYY-MM-DD`` date in 2025, drawn from *rng*.

    The day is capped at 28 so every month/day combination is a valid date.
    """
    month = rng.randint(1, 12)
    day = rng.randint(1, 28)
    return f"2025-{month:02d}-{day:02d}"
def _base_payout(coverage: str, rng: random.Random) -> float:
    """Draw a payout in INR for *coverage*, rounded to the nearest thousand.

    Raises:
        KeyError: if *coverage* is not one of auto/health/property/life.
    """
    # (lower, upper) bounds in INR for each coverage type.
    payout_bounds = {
        "auto": (80_000, 450_000),
        "health": (120_000, 800_000),
        "property": (200_000, 2_000_000),
        "life": (500_000, 5_000_000),
    }
    lower, upper = payout_bounds[coverage]
    raw_amount = rng.uniform(lower, upper)
    return round(raw_amount, -3)
# ─────────────────────────────────────────────────────────────
# FRAUD TYPE BUILDERS
# ─────────────────────────────────────────────────────────────
def _build_staged_accident(rng: random.Random, claimant: Dict, coverage: str, ss: float) -> Dict:
    """Build a staged-accident episode: a minor collision with an inflated repair estimate.

    Args:
        rng: seeded random source; draws are consumed in a fixed order.
        claimant: claimant dict; only ``jurisdiction`` is read here.
        coverage: coverage type used to pick the base payout range.
        ss: signal strength; compared against fixed thresholds to decide
            which documents are tampered and which signals are expected.

    Returns:
        Dict with ``incident``, ``documents``, ``payout_amount_inr``,
        ``expected_fraud_signals`` and ``linked_claims``.
    """
    # --- random draws (order is significant for reproducibility) ---
    baseline = _base_payout(coverage, rng)
    # The claimed amount is 1.4x-2.1x the base payout, rounded to the nearest thousand.
    claimed_amount = round(baseline * rng.uniform(1.4, 2.1), -3)
    workshop = rng.choice(_GARAGES)
    incident_day = _incident_date(rng)
    witness_first = rng.choice(_FIRST_NAMES)
    witness_last = rng.choice(_LAST_NAMES)

    # --- threshold decisions ---
    cost_mismatch = ss > 0.5
    witness_flagged = ss > 0.75

    if cost_mismatch:
        repair_scope = "Engine replacement, full front assembly, airbag deployment."
    else:
        repair_scope = "Bumper repair, paint job."
    vehicle_state = "stationary when struck" if ss > 0.6 else "moving normally"

    fir_doc = {
        "doc_id": "DOC-001",
        "doc_type": "FIR",
        "content": f"FIR filed {incident_day}. Vehicle collision at NH-48. Minor scratches and bumper dent.",
        "is_tampered": False,
        "tamper_signal": None,
    }
    estimate_doc = {
        "doc_id": "DOC-002",
        "doc_type": "repair_estimate",
        "content": f"Estimate from {workshop}: Rs {claimed_amount:,.0f}. {repair_scope}",
        "is_tampered": cost_mismatch,
        "tamper_signal": "cost_mismatch_with_damage" if cost_mismatch else None,
    }
    witness_doc = {
        "doc_id": "DOC-003",
        "doc_type": "witness_statement",
        "content": f"Witness {witness_first} {witness_last}: 'Vehicle was {vehicle_state}.'",
        "is_tampered": witness_flagged,
        "tamper_signal": "witness_inconsistency" if witness_flagged else None,
    }

    # Each signal is expected once ss exceeds its cut-off.
    expected = [
        label
        for cutoff, label in (
            (0.5, "cost_mismatch_with_damage"),
            (0.75, "witness_inconsistency"),
            (0.85, "no_third_party_damage"),
        )
        if ss > cutoff
    ]

    incident = {
        "date": incident_day,
        "type": "vehicle_collision",
        "location": f"NH-48, {claimant['jurisdiction']}",
        "description": "Collision reported on national highway.",
        "claimed_amount_inr": claimed_amount,
    }
    return {
        "incident": incident,
        "documents": [fir_doc, estimate_doc, witness_doc],
        "payout_amount_inr": claimed_amount,
        "expected_fraud_signals": expected,
        "linked_claims": [],
    }
def _build_medical_inflation(rng: random.Random, claimant: Dict, coverage: str, ss: float) -> Dict:
    """Build a medical-inflation episode: a minor procedure billed as a major one.

    Args:
        rng: seeded random source; draws are consumed in a fixed order.
        claimant: claimant dict; only ``name`` is read here.
        coverage: not used by this builder; the base amount is always drawn
            from the "health" payout range.
        ss: signal strength; compared against fixed thresholds to decide
            which documents are tampered and which signals are expected.

    Returns:
        Dict with ``incident``, ``documents``, ``payout_amount_inr``,
        ``expected_fraud_signals`` and ``linked_claims``.
    """
    # --- random draws (order is significant for reproducibility) ---
    true_cost = _base_payout("health", rng)
    # The billed amount is 2.0x-4.5x the base amount, rounded to the nearest thousand.
    billed = round(true_cost * rng.uniform(2.0, 4.5), -3)
    hospital = rng.choice(_HOSPITALS)
    admitted_on = _incident_date(rng)
    real_proc = rng.choice(["appendectomy", "knee arthroscopy", "cataract surgery"])
    fake_proc = rng.choice(["cardiac bypass", "spinal fusion", "liver transplant"])

    # --- threshold decisions ---
    inflated = ss > 0.4
    bill_flagged = ss > 0.6

    # The procedure shown on the paperwork switches to the fake one once inflated.
    stated_proc = fake_proc if inflated else real_proc
    if inflated:
        prescription_note = "Inconsistent with discharge summary procedure."
    else:
        prescription_note = "As prescribed."

    discharge_doc = {
        "doc_id": "DOC-001",
        "doc_type": "discharge_summary",
        "content": f"Patient {claimant['name']} admitted {admitted_on}. Procedure: {stated_proc}. Hospital: {hospital}.",
        "is_tampered": inflated,
        "tamper_signal": "procedure_mismatch" if inflated else None,
    }
    bill_doc = {
        "doc_id": "DOC-002",
        "doc_type": "hospital_bill",
        "content": f"Total bill: Rs {billed:,.0f}. ICU: Rs {billed*0.4:,.0f}. Procedure: Rs {billed*0.5:,.0f}.",
        "is_tampered": bill_flagged,
        "tamper_signal": "billing_code_mismatch" if bill_flagged else None,
    }
    prescription_doc = {
        "doc_id": "DOC-003",
        "doc_type": "prescription",
        "content": f"Post-procedure medication for {real_proc}. {prescription_note}",
        "is_tampered": inflated,
        "tamper_signal": "prescription_procedure_mismatch" if inflated else None,
    }

    # Each signal is expected once ss exceeds its cut-off.
    expected = [
        label
        for cutoff, label in (
            (0.4, "procedure_mismatch"),
            (0.6, "billing_code_mismatch"),
            (0.8, "hospital_no_record"),
        )
        if ss > cutoff
    ]

    incident = {
        "date": admitted_on,
        "type": "medical_procedure",
        "location": hospital,
        "description": f"Hospitalisation claim for {stated_proc}.",
        "claimed_amount_inr": billed,
    }
    return {
        "incident": incident,
        "documents": [discharge_doc, bill_doc, prescription_doc],
        "payout_amount_inr": billed,
        "expected_fraud_signals": expected,
        "linked_claims": [],
    }
def _build_identity_fraud(rng: random.Random, claimant: Dict, coverage: str, ss: float) -> Dict:
    """Build an identity-fraud episode: mismatched identity proof on a freshly issued policy.

    Args:
        rng: seeded random source; draws are consumed in a fixed order.
        claimant: claimant dict; ``name``, ``age``, ``policy_number`` and
            ``jurisdiction`` are read here.
        coverage: coverage type used to pick the base payout range.
        ss: signal strength; compared against fixed thresholds to decide
            which optional signals are expected.

    Returns:
        Dict with ``incident``, ``documents``, ``payout_amount_inr``,
        ``expected_fraud_signals`` and ``linked_claims``.
    """
    # --- random draws (order is significant for reproducibility) ---
    incident_day = _incident_date(rng)
    payout = _base_payout(coverage, rng)
    age_delta = rng.randint(8, 25)
    aadhaar = f"{rng.randint(1000,9999)}-{rng.randint(1000,9999)}-{rng.randint(1000,9999)}"
    hospital = rng.choice(_HOSPITALS)

    no_admission_record = ss > 0.4

    identity_doc = {
        "doc_id": "DOC-001",
        "doc_type": "identity_proof",
        "content": (
            f"Aadhaar: {aadhaar}. "
            f"Name: {claimant['name']}. DOB mismatch: recorded age {claimant['age']}, "
            f"Aadhaar age {claimant['age']+age_delta}."
        ),
        # The text above always states the DOB mismatch and "identity_mismatch"
        # is always an expected signal, so the document is always flagged.
        # Previously it was flagged only when ss > 0.5, which left the
        # document flag disagreeing with both its content and the label.
        "is_tampered": True,
        "tamper_signal": "identity_mismatch",
    }
    policy_doc = {
        "doc_id": "DOC-002",
        "doc_type": "policy_document",
        "content": f"Policy {claimant['policy_number']} issued 5 days before incident. Claimant age discrepancy noted.",
        "is_tampered": True,
        "tamper_signal": "recent_policy_purchase",
    }
    admission_doc = {
        "doc_id": "DOC-003",
        "doc_type": "hospital_admission",
        "content": (
            f"{'No record of admission for this Aadhaar.' if no_admission_record else 'Admission confirmed.'} "
            f"Hospital: {hospital}."
        ),
        "is_tampered": no_admission_record,
        "tamper_signal": "hospital_no_record" if no_admission_record else None,
    }

    # Two signals are always present; the others appear above their cut-offs.
    expected = ["identity_mismatch", "recent_policy_purchase"]
    if no_admission_record:
        expected.append("hospital_no_record")
    if ss > 0.7:
        expected.append("dob_inconsistency")

    return {
        "incident": {
            "date": incident_day,
            "type": "identity_verified_claim",
            "location": claimant["jurisdiction"],
            "description": "Claim filed under suspected ghost identity.",
            "claimed_amount_inr": payout,
        },
        "documents": [identity_doc, policy_doc, admission_doc],
        "payout_amount_inr": payout,
        "expected_fraud_signals": expected,
        "linked_claims": [],
    }
def _build_coordinated_ring(rng: random.Random, claimant: Dict, coverage: str, ss: float) -> Dict:
    """Build a coordinated-ring episode: several claims sharing one broker and date.

    Args:
        rng: seeded random source; draws are consumed in a fixed order.
        claimant: claimant dict; ``jurisdiction`` and ``policy_number`` are read here.
        coverage: coverage type used to pick the base payout range.
        ss: signal strength; compared against fixed thresholds to decide
            which documents are tampered and which signals are expected.

    Returns:
        Dict with ``incident``, ``documents``, ``payout_amount_inr``,
        ``expected_fraud_signals`` and ``linked_claims`` (3 to 5 entries).
    """
    # --- random draws (order is significant for reproducibility) ---
    incident_day = _incident_date(rng)
    payout = _base_payout(coverage, rng)
    broker = f"BRK-{rng.randint(1000, 9999)}"

    # The ring size is drawn before any member, then each member's fields
    # are drawn in the order they appear in the dict.
    ring_size = rng.randint(3, 5)
    ring_members = []
    for _ in range(ring_size):
        member_claim_no = rng.randint(10000, 99999)
        member_first = rng.choice(_FIRST_NAMES)
        member_last = rng.choice(_LAST_NAMES)
        member_policy_no = rng.randint(100000, 999999)
        member_amount = round(payout * rng.uniform(0.7, 1.3), -3)
        ring_members.append(
            {
                "claim_id": f"CLM-RING-{member_claim_no}",
                "claimant_name": f"{member_first} {member_last}",
                "policy_number": f"POL-{claimant['jurisdiction']}-{member_policy_no}",
                "amount_inr": member_amount,
                "broker_code": broker,
                "incident_date": incident_day,
                "fraud_signal": "clustered_policy_broker" if ss > 0.3 else None,
            }
        )

    policy_flagged = ss > 0.4
    claim_form = {
        "doc_id": "DOC-001",
        "doc_type": "claim_form",
        "content": f"Claim filed {incident_day}. Amount: Rs {payout:,.0f}. Broker: {broker}.",
        "is_tampered": False,
        "tamper_signal": None,
    }
    policy_doc = {
        "doc_id": "DOC-002",
        "doc_type": "policy_document",
        "content": f"Policy {claimant['policy_number']}. Broker: {broker}. Same broker across multiple simultaneous claims.",
        "is_tampered": policy_flagged,
        "tamper_signal": "clustered_policy_broker" if policy_flagged else None,
    }

    # Each signal is expected once ss exceeds its cut-off.
    expected = [
        label
        for cutoff, label in (
            (0.3, "clustered_policy_broker"),
            (0.5, "coordinated_incident_timing"),
            (0.7, "shared_witness_across_claims"),
        )
        if ss > cutoff
    ]

    incident = {
        "date": incident_day,
        "type": "coordinated_fraud_ring",
        "location": claimant["jurisdiction"],
        "description": f"Claim linked to fraud ring via broker {broker}.",
        "claimed_amount_inr": payout,
    }
    return {
        "incident": incident,
        "documents": [claim_form, policy_doc],
        "payout_amount_inr": payout,
        "expected_fraud_signals": expected,
        "linked_claims": ring_members,
    }
def _build_phantom_provider(rng: random.Random, claimant: Dict, coverage: str, ss: float) -> Dict:
    """Build a phantom-provider episode: a medical claim from an unverifiable hospital.

    Args:
        rng: seeded random source; draws are consumed in a fixed order.
        claimant: claimant dict; only ``jurisdiction`` is read here.
        coverage: not used by this builder; the payout is always drawn
            from the "health" payout range.
        ss: signal strength; compared against fixed thresholds to decide
            which documents are tampered and which signals are expected.

    Returns:
        Dict with ``incident``, ``documents``, ``payout_amount_inr``,
        ``expected_fraud_signals`` and ``linked_claims``.
    """
    # --- random draws (order is significant for reproducibility) ---
    incident_day = _incident_date(rng)
    payout = _base_payout("health", rng)
    fake_hospital = f"Sri {rng.choice(_LAST_NAMES)} Medical Centre"

    # --- threshold decisions ---
    provider_unregistered = ss > 0.4
    # One cut-off drives everything about the registration document: its
    # tamper flag, its tamper signal, its GST text and the expected signal.
    # Previously the flag and expected signal used 0.5 while the signal name
    # and GST text used 0.6, so for ss in (0.5, 0.6] the document was
    # "tampered" with no signal and read "GST: VALID" while the label still
    # expected "invalid_gst_registration".
    registration_invalid = ss > 0.5
    payment_missing = ss > 0.55

    # A real hospital name is only drawn when the fake one is not shown.
    if provider_unregistered:
        discharging_provider = fake_hospital
    else:
        discharging_provider = rng.choice(_HOSPITALS)

    discharge_doc = {
        "doc_id": "DOC-001",
        "doc_type": "discharge_summary",
        "content": f"Discharged from {discharging_provider}. Date: {incident_day}.",
        "is_tampered": provider_unregistered,
        "tamper_signal": "unregistered_provider" if provider_unregistered else None,
    }
    registration_doc = {
        "doc_id": "DOC-002",
        "doc_type": "hospital_registration",
        "content": (
            f"{'Hospital not found in IRDAI registry.' if registration_invalid else 'Registered provider.'} "
            f"GST: {'INVALID' if registration_invalid else 'VALID'}."
        ),
        "is_tampered": registration_invalid,
        "tamper_signal": "invalid_gst_registration" if registration_invalid else None,
    }
    receipt_doc = {
        "doc_id": "DOC-003",
        "doc_type": "receipt",
        "content": (
            f"Payment Rs {payout:,.0f}. "
            f"{'No bank transfer record found.' if payment_missing else 'Bank transfer confirmed.'}"
        ),
        "is_tampered": payment_missing,
        "tamper_signal": "no_payment_trail" if payment_missing else None,
    }

    # Each signal is expected once ss exceeds its cut-off.
    expected = [
        label
        for cutoff, label in (
            (0.4, "unregistered_provider"),
            (0.5, "invalid_gst_registration"),
            (0.55, "no_payment_trail"),
            (0.8, "cloned_discharge_template"),
        )
        if ss > cutoff
    ]

    return {
        "incident": {
            "date": incident_day,
            "type": "phantom_provider_claim",
            "location": claimant["jurisdiction"],
            "description": f"Medical claim from provider {fake_hospital} — registration unverifiable.",
            "claimed_amount_inr": payout,
        },
        "documents": [discharge_doc, registration_doc, receipt_doc],
        "payout_amount_inr": payout,
        "expected_fraud_signals": expected,
        "linked_claims": [],
    }
def _build_clean_claim(rng: random.Random, claimant: Dict, coverage: str, ss: float) -> Dict:
    """Build a legitimate claim episode with no tampered documents and no fraud signals.

    Args:
        rng: seeded random source; the date is drawn first, then the payout.
        claimant: claimant dict; ``jurisdiction`` and ``policy_start_date`` are read here.
        coverage: coverage type used for the payout range and the incident type.
        ss: not used by this builder.

    Returns:
        Dict with ``incident``, ``documents``, ``payout_amount_inr``,
        ``expected_fraud_signals`` and ``linked_claims``.
    """
    filed_on = _incident_date(rng)
    amount = _base_payout(coverage, rng)

    claim_form = {
        "doc_id": "DOC-001",
        "doc_type": "claim_form",
        "content": f"Claim filed {filed_on}. Amount: Rs {amount:,.0f}. Coverage: {coverage}.",
        "is_tampered": False,
        "tamper_signal": None,
    }
    supporting_doc = {
        "doc_id": "DOC-002",
        "doc_type": "supporting_document",
        "content": f"All documents verified. Policy active since {claimant['policy_start_date']}.",
        "is_tampered": False,
        "tamper_signal": None,
    }
    incident = {
        "date": filed_on,
        "type": f"{coverage}_claim",
        "location": claimant["jurisdiction"],
        "description": "Legitimate claim with all documents in order.",
        "claimed_amount_inr": amount,
    }
    return {
        "incident": incident,
        "documents": [claim_form, supporting_doc],
        "payout_amount_inr": amount,
        "expected_fraud_signals": [],
        "linked_claims": [],
    }
# ─────────────────────────────────────────────────────────────
# ACTION + TASK MAPPINGS
# ─────────────────────────────────────────────────────────────
# Actions offered in every episode, regardless of fraud type.
_BASE_ACTIONS = [
    "validate_document",
    "flag_fraud_signal",
    "request_information",
    "query_historical_data",
    "estimate_payout",
    "approve_claim",
    "deny_claim",
    "escalate_to_human",
]

# Additional actions appended to the base list for specific fraud types.
_EXTRA_ACTIONS: Dict[str, List[str]] = {
    "coordinated_ring": ["query_linked_claim"],
    "identity_fraud": ["verify_identity"],
    "phantom_provider": ["verify_provider_registration"],
    "staged_accident": [],
    "medical_inflation": [],
    "none": [],
}

# Task identifier reported for each fraud type.
_TASK_ID_MAP: Dict[str, str] = {
    "none": "clean_claim",
    "medical_inflation": "contradictory_claim",
    "staged_accident": "contradictory_claim",
    "identity_fraud": "contradictory_claim",
    "coordinated_ring": "distribution_shift_claim",
    "phantom_provider": "distribution_shift_claim",
}

# Step budget per difficulty; its keys also define the valid difficulty names.
_MAX_STEPS: Dict[str, int] = {
    "easy": 10,
    "medium": 18,
    "hard": 28,
}
# Dispatch table: fraud type -> episode builder. Every builder is called as
# builder(rng, claimant, coverage_type, ss) and returns a dict with the keys
# incident / documents / payout_amount_inr / expected_fraud_signals / linked_claims.
_BUILDERS = {
    "staged_accident":   _build_staged_accident,
    "medical_inflation": _build_medical_inflation,
    "identity_fraud":    _build_identity_fraud,
    "coordinated_ring":  _build_coordinated_ring,
    "phantom_provider":  _build_phantom_provider,
    "none":              _build_clean_claim,  # legitimate claim, no fraud
}
# ─────────────────────────────────────────────────────────────
# PUBLIC API
# ─────────────────────────────────────────────────────────────
def generate_claim(
    seed: int,
    fraud_type: str = "medical_inflation",
    coverage_type: str = "health",
    difficulty: Literal["easy", "medium", "hard"] = "medium",
    jurisdiction: Optional[str] = None,
) -> ClaimScenario:
    """
    Generate a deterministic insurance claim episode.

    All randomness comes from ``random.Random(seed)``, so the same
    (seed, fraud_type, coverage_type, difficulty, jurisdiction) always returns
    the same episode. Vary seed across [0, 9999] for 500+ unique training
    episodes per combination.

    Args:
        seed: integer seed for the episode's private random generator.
        fraud_type: one of FRAUD_TYPES, or "none" for a legitimate claim.
        coverage_type: one of COVERAGE_TYPES.
        difficulty: "easy", "medium" or "hard"; scales signal strength,
            ambiguity and the step budget.
        jurisdiction: state code to use; when falsy, one is drawn from
            JURISDICTIONS using the seeded generator.

    Raises:
        ValueError: if fraud_type, coverage_type or difficulty is not recognised.
    """
    # --- argument validation ---
    allowed_fraud_types = FRAUD_TYPES + ["none"]
    if fraud_type not in allowed_fraud_types:
        raise ValueError(f"Invalid fraud_type '{fraud_type}'. Choose from {allowed_fraud_types}")
    if coverage_type not in COVERAGE_TYPES:
        raise ValueError(f"Invalid coverage_type '{coverage_type}'. Choose from {COVERAGE_TYPES}")
    if difficulty not in _MAX_STEPS:
        raise ValueError(f"Invalid difficulty '{difficulty}'. Choose from easy, medium, hard")

    rng = random.Random(seed)

    # A jurisdiction is only drawn (consuming one random value) when the
    # caller did not supply one.
    if jurisdiction:
        jur = jurisdiction
    else:
        jur = rng.choice(JURISDICTIONS)

    # Jitter the difficulty's base signal strength down by up to 15%.
    ss = DIFFICULTY_SIGNAL_STRENGTH[difficulty] * rng.uniform(0.85, 1.0)

    # Jitter the base ambiguity by +/-10%, then clamp it into [0, 1].
    jittered_ambiguity = DIFFICULTY_AMBIGUITY[difficulty] * rng.uniform(0.9, 1.1)
    ambiguity = float(max(0.0, min(1.0, jittered_ambiguity)))

    claimant = _make_claimant(rng, jur)
    build_episode = _BUILDERS[fraud_type]
    episode = build_episode(rng, claimant, coverage_type, ss)

    scenario_fields = {
        "claim_id": f"CLM-{seed:04d}-{fraud_type[:3].upper()}-{jur}",
        "seed": seed,
        "fraud_type": fraud_type,
        "coverage_type": coverage_type,
        "jurisdiction": jur,
        "difficulty": difficulty,
        "claimant": claimant,
        "incident": episode["incident"],
        "documents": episode["documents"],
        "ground_truth": FRAUD_GROUND_TRUTH[fraud_type],
        "ambiguity_score": ambiguity,
        "payout_amount_inr": episode["payout_amount_inr"],
        "expected_fraud_signals": episode["expected_fraud_signals"],
        "linked_claims": episode.get("linked_claims", []),
        "available_actions": _BASE_ACTIONS + _EXTRA_ACTIONS.get(fraud_type, []),
        "max_steps": _MAX_STEPS[difficulty],
        "task_id": _TASK_ID_MAP.get(fraud_type, "contradictory_claim"),
    }
    return ClaimScenario(**scenario_fields)
def generate_episode_pool(
    count: int = 500,
    fraud_types: Optional[List[str]] = None,
    coverage_types: Optional[List[str]] = None,
    difficulties: Optional[List[str]] = None,
) -> List[ClaimScenario]:
    """Generate *count* training episodes, cycling through every combination.

    Combinations are visited in fraud-type -> coverage-type -> difficulty order
    and the cycle restarts once exhausted. The seed starts at 0 and increases
    by one per episode, so episode ``k`` always uses seed ``k``.

    Args:
        count: number of episodes to return.
        fraud_types: fraud types to include; defaults to FRAUD_TYPES when
            omitted or empty (which does not contain "none").
        coverage_types: coverage types to include; defaults to COVERAGE_TYPES.
        difficulties: difficulty names to include; defaults to all keys of
            the step-budget table.
    """
    if not fraud_types:
        fraud_types = FRAUD_TYPES
    if not coverage_types:
        coverage_types = COVERAGE_TYPES
    if not difficulties:
        difficulties = list(_MAX_STEPS.keys())

    # Flatten the three nested loops into one ordered list of combinations.
    combinations = [
        (fraud, coverage, level)
        for fraud in fraud_types
        for coverage in coverage_types
        for level in difficulties
    ]

    pool: List[ClaimScenario] = []
    seed = 0
    while len(pool) < count:
        # One seed per episode means the combination index is seed modulo
        # the number of combinations.
        fraud, coverage, level = combinations[seed % len(combinations)]
        pool.append(generate_claim(seed, fraud, coverage, level))
        seed += 1
    return pool