""" server/claim_generator.py DebateFloor — Procedural Claim Generator Transforms DebateFloor from a fixed benchmark into a training environment. Same (seed, fraud_type, coverage, difficulty) always produces the same episode. Different seeds produce different claimant names, amounts, dates, and signal strengths. 5 fraud types x 4 coverage types x 3 jurisdictions x seed variation = 500+ unique episodes. """ from __future__ import annotations import random from typing import Any, Dict, List, Literal, Optional from pydantic import BaseModel, Field # ───────────────────────────────────────────────────────────── # CONSTANTS # ───────────────────────────────────────────────────────────── FRAUD_TYPES = [ "staged_accident", "medical_inflation", "identity_fraud", "coordinated_ring", "phantom_provider", ] COVERAGE_TYPES = ["auto", "health", "property", "life"] JURISDICTIONS = ["MH", "DL", "KA"] # Maharashtra, Delhi, Karnataka DIFFICULTY_SIGNAL_STRENGTH = { "easy": 0.90, "medium": 0.55, "hard": 0.20, } DIFFICULTY_AMBIGUITY = { "easy": 0.10, "medium": 0.45, "hard": 0.80, } FRAUD_GROUND_TRUTH = { "staged_accident": "deny_claim", "medical_inflation": "deny_claim", "identity_fraud": "deny_claim", "coordinated_ring": "escalate_to_human", "phantom_provider": "deny_claim", "none": "approve_claim", } _FIRST_NAMES = [ "Arjun", "Priya", "Rahul", "Sunita", "Vikram", "Meena", "Rohit", "Kavita", "Sanjay", "Anjali", "Deepak", "Pooja", "Nikhil", "Rekha", "Amit", "Divya", "Suresh", "Nisha", "Kiran", "Manoj", "Sneha", "Rajesh", "Lata", "Arun", ] _LAST_NAMES = [ "Sharma", "Patel", "Singh", "Kumar", "Joshi", "Verma", "Gupta", "Mehta", "Nair", "Reddy", "Das", "Iyer", "Bhat", "Rao", "Pillai", "Saxena", "Tiwari", "Mishra", ] _HOSPITALS = [ "Apollo Hospital", "Fortis Healthcare", "Manipal Hospital", "Max Super Speciality", "Narayana Health", "Medanta", "Kokilaben Dhirubhai Ambani", "Aster CMI", "Lilavati Hospital", ] _GARAGES = [ "Tata Authorised Service", "Maruti True Value Workshop", "Hyundai Care Centre", "Popular Motors", "City Auto Works", "Highway Motors", "Star Auto Repair", ] _INSURERS = ["HDFC ERGO", "ICICI Lombard", "Bajaj Allianz", "New India Assurance", "United India"] # ───────────────────────────────────────────────────────────── # DATA MODELS # ───────────────────────────────────────────────────────────── class ClaimScenario(BaseModel): claim_id: str seed: int fraud_type: str coverage_type: str jurisdiction: str difficulty: str claimant: Dict[str, Any] incident: Dict[str, Any] documents: List[Dict[str, Any]] ground_truth: str ambiguity_score: float = Field(ge=0.0, le=1.0) payout_amount_inr: float expected_fraud_signals: List[str] linked_claims: List[Dict[str, Any]] = Field(default_factory=list) available_actions: List[str] = Field(default_factory=list) max_steps: int = 10 task_id: str = "" # ───────────────────────────────────────────────────────────── # HELPERS # ───────────────────────────────────────────────────────────── def _make_claimant(rng: random.Random, jurisdiction: str) -> Dict[str, Any]: first = rng.choice(_FIRST_NAMES) last = rng.choice(_LAST_NAMES) return { "name": f"{first} {last}", "age": rng.randint(24, 62), "policy_number": f"POL-{jurisdiction}-{rng.randint(100000, 999999)}", "policy_start_date": f"202{rng.randint(1,4)}-{rng.randint(1,12):02d}-01", "insurer": rng.choice(_INSURERS), "jurisdiction": jurisdiction, "phone": f"+91-{rng.randint(7000000000, 9999999999)}", } def _incident_date(rng: random.Random) -> str: return f"2025-{rng.randint(1,12):02d}-{rng.randint(1,28):02d}" def _base_payout(coverage: str, rng: random.Random) -> float: ranges = { "auto": (80_000, 450_000), "health": (120_000, 800_000), "property": (200_000, 2_000_000), "life": (500_000, 5_000_000), } lo, hi = ranges[coverage] return round(rng.uniform(lo, hi), -3) # ───────────────────────────────────────────────────────────── # FRAUD TYPE BUILDERS # ───────────────────────────────────────────────────────────── def _build_staged_accident(rng: random.Random, claimant: Dict, coverage: str, ss: float) -> Dict: payout = _base_payout(coverage, rng) inflated = round(payout * rng.uniform(1.4, 2.1), -3) garage = rng.choice(_GARAGES) date = _incident_date(rng) cost_mismatch = ss > 0.5 docs = [ { "doc_id": "DOC-001", "doc_type": "FIR", "content": f"FIR filed {date}. Vehicle collision at NH-48. Minor scratches and bumper dent.", "is_tampered": False, "tamper_signal": None, }, { "doc_id": "DOC-002", "doc_type": "repair_estimate", "content": ( f"Estimate from {garage}: Rs {inflated:,.0f}. " f"{'Engine replacement, full front assembly, airbag deployment.' if cost_mismatch else 'Bumper repair, paint job.'}" ), "is_tampered": cost_mismatch, "tamper_signal": "cost_mismatch_with_damage" if cost_mismatch else None, }, { "doc_id": "DOC-003", "doc_type": "witness_statement", "content": ( f"Witness {rng.choice(_FIRST_NAMES)} {rng.choice(_LAST_NAMES)}: " f"'Vehicle was {'stationary when struck' if ss > 0.6 else 'moving normally'}.'" ), "is_tampered": ss > 0.75, "tamper_signal": "witness_inconsistency" if ss > 0.75 else None, }, ] signals = [] if cost_mismatch: signals.append("cost_mismatch_with_damage") if ss > 0.75: signals.append("witness_inconsistency") if ss > 0.85: signals.append("no_third_party_damage") return { "incident": { "date": date, "type": "vehicle_collision", "location": f"NH-48, {claimant['jurisdiction']}", "description": "Collision reported on national highway.", "claimed_amount_inr": inflated, }, "documents": docs, "payout_amount_inr": inflated, "expected_fraud_signals": signals, "linked_claims": [], } def _build_medical_inflation(rng: random.Random, claimant: Dict, coverage: str, ss: float) -> Dict: actual = _base_payout("health", rng) claimed = round(actual * rng.uniform(2.0, 4.5), -3) hospital = rng.choice(_HOSPITALS) date = _incident_date(rng) real_proc = rng.choice(["appendectomy", "knee arthroscopy", "cataract surgery"]) fake_proc = rng.choice(["cardiac bypass", "spinal fusion", "liver transplant"]) inflated = ss > 0.4 docs = [ { "doc_id": "DOC-001", "doc_type": "discharge_summary", "content": ( f"Patient {claimant['name']} admitted {date}. " f"Procedure: {fake_proc if inflated else real_proc}. Hospital: {hospital}." ), "is_tampered": inflated, "tamper_signal": "procedure_mismatch" if inflated else None, }, { "doc_id": "DOC-002", "doc_type": "hospital_bill", "content": f"Total bill: Rs {claimed:,.0f}. ICU: Rs {claimed*0.4:,.0f}. Procedure: Rs {claimed*0.5:,.0f}.", "is_tampered": ss > 0.6, "tamper_signal": "billing_code_mismatch" if ss > 0.6 else None, }, { "doc_id": "DOC-003", "doc_type": "prescription", "content": ( f"Post-procedure medication for {real_proc}. " f"{'Inconsistent with discharge summary procedure.' if inflated else 'As prescribed.'}" ), "is_tampered": inflated, "tamper_signal": "prescription_procedure_mismatch" if inflated else None, }, ] signals = [] if inflated: signals.append("procedure_mismatch") if ss > 0.6: signals.append("billing_code_mismatch") if ss > 0.8: signals.append("hospital_no_record") return { "incident": { "date": date, "type": "medical_procedure", "location": hospital, "description": f"Hospitalisation claim for {fake_proc if inflated else real_proc}.", "claimed_amount_inr": claimed, }, "documents": docs, "payout_amount_inr": claimed, "expected_fraud_signals": signals, "linked_claims": [], } def _build_identity_fraud(rng: random.Random, claimant: Dict, coverage: str, ss: float) -> Dict: date = _incident_date(rng) payout = _base_payout(coverage, rng) age_delta = rng.randint(8, 25) docs = [ { "doc_id": "DOC-001", "doc_type": "identity_proof", "content": ( f"Aadhaar: {rng.randint(1000,9999)}-{rng.randint(1000,9999)}-{rng.randint(1000,9999)}. " f"Name: {claimant['name']}. DOB mismatch: recorded age {claimant['age']}, Aadhaar age {claimant['age']+age_delta}." ), "is_tampered": ss > 0.5, "tamper_signal": "identity_mismatch" if ss > 0.5 else None, }, { "doc_id": "DOC-002", "doc_type": "policy_document", "content": f"Policy {claimant['policy_number']} issued 5 days before incident. Claimant age discrepancy noted.", "is_tampered": True, "tamper_signal": "recent_policy_purchase", }, { "doc_id": "DOC-003", "doc_type": "hospital_admission", "content": f"{'No record of admission for this Aadhaar.' if ss > 0.4 else 'Admission confirmed.'} Hospital: {rng.choice(_HOSPITALS)}.", "is_tampered": ss > 0.4, "tamper_signal": "hospital_no_record" if ss > 0.4 else None, }, ] signals = ["identity_mismatch", "recent_policy_purchase"] if ss > 0.4: signals.append("hospital_no_record") if ss > 0.7: signals.append("dob_inconsistency") return { "incident": { "date": date, "type": "identity_verified_claim", "location": claimant["jurisdiction"], "description": "Claim filed under suspected ghost identity.", "claimed_amount_inr": payout, }, "documents": docs, "payout_amount_inr": payout, "expected_fraud_signals": signals, "linked_claims": [], } def _build_coordinated_ring(rng: random.Random, claimant: Dict, coverage: str, ss: float) -> Dict: date = _incident_date(rng) payout = _base_payout(coverage, rng) broker = f"BRK-{rng.randint(1000, 9999)}" linked = [ { "claim_id": f"CLM-RING-{rng.randint(10000,99999)}", "claimant_name": f"{rng.choice(_FIRST_NAMES)} {rng.choice(_LAST_NAMES)}", "policy_number": f"POL-{claimant['jurisdiction']}-{rng.randint(100000,999999)}", "amount_inr": round(payout * rng.uniform(0.7, 1.3), -3), "broker_code": broker, "incident_date": date, "fraud_signal": "clustered_policy_broker" if ss > 0.3 else None, } for _ in range(rng.randint(3, 5)) ] docs = [ { "doc_id": "DOC-001", "doc_type": "claim_form", "content": f"Claim filed {date}. Amount: Rs {payout:,.0f}. Broker: {broker}.", "is_tampered": False, "tamper_signal": None, }, { "doc_id": "DOC-002", "doc_type": "policy_document", "content": f"Policy {claimant['policy_number']}. Broker: {broker}. Same broker across multiple simultaneous claims.", "is_tampered": ss > 0.4, "tamper_signal": "clustered_policy_broker" if ss > 0.4 else None, }, ] signals = [] if ss > 0.3: signals.append("clustered_policy_broker") if ss > 0.5: signals.append("coordinated_incident_timing") if ss > 0.7: signals.append("shared_witness_across_claims") return { "incident": { "date": date, "type": "coordinated_fraud_ring", "location": claimant["jurisdiction"], "description": f"Claim linked to fraud ring via broker {broker}.", "claimed_amount_inr": payout, }, "documents": docs, "payout_amount_inr": payout, "expected_fraud_signals": signals, "linked_claims": linked, } def _build_phantom_provider(rng: random.Random, claimant: Dict, coverage: str, ss: float) -> Dict: date = _incident_date(rng) payout = _base_payout("health", rng) fake_hospital = f"Sri {rng.choice(_LAST_NAMES)} Medical Centre" docs = [ { "doc_id": "DOC-001", "doc_type": "discharge_summary", "content": f"Discharged from {fake_hospital if ss > 0.4 else rng.choice(_HOSPITALS)}. Date: {date}.", "is_tampered": ss > 0.4, "tamper_signal": "unregistered_provider" if ss > 0.4 else None, }, { "doc_id": "DOC-002", "doc_type": "hospital_registration", "content": f"{'Hospital not found in IRDAI registry.' if ss > 0.5 else 'Registered provider.'} GST: {'INVALID' if ss > 0.6 else 'VALID'}.", "is_tampered": ss > 0.5, "tamper_signal": "invalid_gst_registration" if ss > 0.6 else None, }, { "doc_id": "DOC-003", "doc_type": "receipt", "content": f"Payment Rs {payout:,.0f}. {'No bank transfer record found.' if ss > 0.55 else 'Bank transfer confirmed.'}", "is_tampered": ss > 0.55, "tamper_signal": "no_payment_trail" if ss > 0.55 else None, }, ] signals = [] if ss > 0.4: signals.append("unregistered_provider") if ss > 0.5: signals.append("invalid_gst_registration") if ss > 0.55: signals.append("no_payment_trail") if ss > 0.8: signals.append("cloned_discharge_template") return { "incident": { "date": date, "type": "phantom_provider_claim", "location": claimant["jurisdiction"], "description": f"Medical claim from provider {fake_hospital} — registration unverifiable.", "claimed_amount_inr": payout, }, "documents": docs, "payout_amount_inr": payout, "expected_fraud_signals": signals, "linked_claims": [], } def _build_clean_claim(rng: random.Random, claimant: Dict, coverage: str, ss: float) -> Dict: date = _incident_date(rng) payout = _base_payout(coverage, rng) return { "incident": { "date": date, "type": f"{coverage}_claim", "location": claimant["jurisdiction"], "description": "Legitimate claim with all documents in order.", "claimed_amount_inr": payout, }, "documents": [ { "doc_id": "DOC-001", "doc_type": "claim_form", "content": f"Claim filed {date}. Amount: Rs {payout:,.0f}. Coverage: {coverage}.", "is_tampered": False, "tamper_signal": None, }, { "doc_id": "DOC-002", "doc_type": "supporting_document", "content": f"All documents verified. Policy active since {claimant['policy_start_date']}.", "is_tampered": False, "tamper_signal": None, }, ], "payout_amount_inr": payout, "expected_fraud_signals": [], "linked_claims": [], } # ───────────────────────────────────────────────────────────── # ACTION + TASK MAPPINGS # ───────────────────────────────────────────────────────────── _BASE_ACTIONS = [ "validate_document", "flag_fraud_signal", "request_information", "query_historical_data", "estimate_payout", "approve_claim", "deny_claim", "escalate_to_human", ] _EXTRA_ACTIONS: Dict[str, List[str]] = { "coordinated_ring": ["query_linked_claim"], "identity_fraud": ["verify_identity"], "phantom_provider": ["verify_provider_registration"], "staged_accident": [], "medical_inflation": [], "none": [], } _TASK_ID_MAP: Dict[str, str] = { "none": "clean_claim", "medical_inflation": "contradictory_claim", "staged_accident": "contradictory_claim", "identity_fraud": "contradictory_claim", "coordinated_ring": "distribution_shift_claim", "phantom_provider": "distribution_shift_claim", } _MAX_STEPS: Dict[str, int] = {"easy": 10, "medium": 18, "hard": 28} _BUILDERS = { "staged_accident": _build_staged_accident, "medical_inflation": _build_medical_inflation, "identity_fraud": _build_identity_fraud, "coordinated_ring": _build_coordinated_ring, "phantom_provider": _build_phantom_provider, "none": _build_clean_claim, } # ───────────────────────────────────────────────────────────── # PUBLIC API # ───────────────────────────────────────────────────────────── def generate_claim( seed: int, fraud_type: str = "medical_inflation", coverage_type: str = "health", difficulty: Literal["easy", "medium", "hard"] = "medium", jurisdiction: Optional[str] = None, ) -> ClaimScenario: """ Generate a deterministic insurance claim episode. Same (seed, fraud_type, coverage_type, difficulty) always returns the same episode. Vary seed across [0, 9999] for 500+ unique training episodes per combination. """ if fraud_type not in FRAUD_TYPES + ["none"]: raise ValueError(f"Invalid fraud_type '{fraud_type}'. Choose from {FRAUD_TYPES + ['none']}") if coverage_type not in COVERAGE_TYPES: raise ValueError(f"Invalid coverage_type '{coverage_type}'. Choose from {COVERAGE_TYPES}") if difficulty not in _MAX_STEPS: raise ValueError(f"Invalid difficulty '{difficulty}'. Choose from easy, medium, hard") rng = random.Random(seed) jur = jurisdiction or rng.choice(JURISDICTIONS) ss = DIFFICULTY_SIGNAL_STRENGTH[difficulty] * rng.uniform(0.85, 1.0) ambiguity = float(max(0.0, min(1.0, DIFFICULTY_AMBIGUITY[difficulty] * rng.uniform(0.9, 1.1)))) claimant = _make_claimant(rng, jur) episode = _BUILDERS[fraud_type](rng, claimant, coverage_type, ss) return ClaimScenario( claim_id=f"CLM-{seed:04d}-{fraud_type[:3].upper()}-{jur}", seed=seed, fraud_type=fraud_type, coverage_type=coverage_type, jurisdiction=jur, difficulty=difficulty, claimant=claimant, incident=episode["incident"], documents=episode["documents"], ground_truth=FRAUD_GROUND_TRUTH[fraud_type], ambiguity_score=ambiguity, payout_amount_inr=episode["payout_amount_inr"], expected_fraud_signals=episode["expected_fraud_signals"], linked_claims=episode.get("linked_claims", []), available_actions=_BASE_ACTIONS + _EXTRA_ACTIONS.get(fraud_type, []), max_steps=_MAX_STEPS[difficulty], task_id=_TASK_ID_MAP.get(fraud_type, "contradictory_claim"), ) def generate_episode_pool( count: int = 500, fraud_types: Optional[List[str]] = None, coverage_types: Optional[List[str]] = None, difficulties: Optional[List[str]] = None, ) -> List[ClaimScenario]: """Generate a pool of training episodes across all fraud/coverage/difficulty combinations.""" fraud_types = fraud_types or FRAUD_TYPES coverage_types = coverage_types or COVERAGE_TYPES difficulties = difficulties or list(_MAX_STEPS.keys()) episodes: List[ClaimScenario] = [] seed = 0 while len(episodes) < count: for ft in fraud_types: for ct in coverage_types: for diff in difficulties: if len(episodes) >= count: break episodes.append(generate_claim(seed, ft, ct, diff)) seed += 1 return episodes