"""Backend stubs for the ClaimSense adjudication gym. Each class mimics one corner of an insurer's IT estate (policy admin system, history mart, fraud-scoring API, document repository, coverage oracle, settlement maths, retail bank feed). Together they create the *partial-observability* surface the agent must explore. The data lives in ``CASE_LIBRARY`` — eight hand-crafted cases that span clean approvals, partial pay-outs, denials, escalations, and two flavours of fraud. For backwards compatibility the original ``Mock*`` class names and the ``CLAIM_SCENARIOS`` constant are re-exported at the bottom of the module. """ from __future__ import annotations import random from dataclasses import dataclass, field from typing import Any # ============================================================================= # Case schema # ============================================================================= @dataclass class CaseFile: """One concrete claim, including the hidden answer key.""" # Public-facing claim header claim_id: str claim_type: str claim_amount: float claimant_name: str incident_date: str description: str # Ground truth (server-only) true_verdict: str correct_payout: float is_fraud: bool fraud_type: str | None # Policy facts revealed only via query_policy policy_id: str policy_coverage_limit: float policy_deductible: float policy_status: str coverage_exclusions: list[str] # Workflow shape complexity: str requires_documents: list[str] requires_escalation: bool # History profile (revealed via query_claim_history) past_claims_count: int past_claims_total: float recent_claims_30_days: int # ============================================================================= # The eight curated cases # ============================================================================= def _build_library() -> list[CaseFile]: """Define the canonical case set in one place. 
Wrapping in a function keeps the top-level module body short and lets us regenerate the list cheaply in tests. """ return [ # --- 1. Routine fender-bender → straight approval ---------------- CaseFile( claim_id="CLM-2024-001", claim_type="auto_collision", claim_amount=3500.0, claimant_name="John Smith", incident_date="2024-03-01", description="Rear-ended at stoplight. Bumper and taillight damage.", true_verdict="approve", correct_payout=3000.0, is_fraud=False, fraud_type=None, policy_id="POL-AUTO-78234", policy_coverage_limit=50000.0, policy_deductible=500.0, policy_status="active", coverage_exclusions=[], complexity="simple", requires_documents=["photos"], requires_escalation=False, past_claims_count=1, past_claims_total=1200.0, recent_claims_30_days=0, ), # --- 2. Burst pipe with a low cap → partial settlement ----------- CaseFile( claim_id="CLM-2024-002", claim_type="home_water", claim_amount=45000.0, claimant_name="Sarah Johnson", incident_date="2024-02-28", description="Burst pipe caused flooding in basement. Extensive water damage.", true_verdict="partial_approve", correct_payout=24000.0, is_fraud=False, fraud_type=None, policy_id="POL-HOME-45123", policy_coverage_limit=25000.0, policy_deductible=1000.0, policy_status="active", coverage_exclusions=["flood_external"], complexity="standard", requires_documents=["photos", "repair_estimates"], requires_escalation=False, past_claims_count=0, past_claims_total=0.0, recent_claims_30_days=0, ), # --- 3. Staged accident → outright denial ----------------------- CaseFile( claim_id="CLM-2024-003", claim_type="auto_collision", claim_amount=12000.0, claimant_name="Mike Thompson", incident_date="2024-03-03", description="T-bone collision at intersection. 
Major damage to driver side.", true_verdict="deny", correct_payout=0.0, is_fraud=True, fraud_type="staged_accident", policy_id="POL-AUTO-91827", policy_coverage_limit=75000.0, policy_deductible=500.0, policy_status="active", coverage_exclusions=[], complexity="fraud", requires_documents=["photos", "police_report"], requires_escalation=True, past_claims_count=4, past_claims_total=28000.0, recent_claims_30_days=2, ), # --- 4. External flood — excluded → denial ---------------------- CaseFile( claim_id="CLM-2024-004", claim_type="home_water", claim_amount=18000.0, claimant_name="Emily Chen", incident_date="2024-03-02", description="Flooding from nearby river after heavy rains.", true_verdict="deny", correct_payout=0.0, is_fraud=False, fraud_type=None, policy_id="POL-HOME-67890", policy_coverage_limit=100000.0, policy_deductible=1000.0, policy_status="active", coverage_exclusions=["flood_external", "earthquake"], complexity="standard", requires_documents=["photos"], requires_escalation=False, past_claims_count=1, past_claims_total=5000.0, recent_claims_30_days=0, ), # --- 5. Six-figure house fire → escalate then approve ----------- CaseFile( claim_id="CLM-2024-005", claim_type="home_fire", claim_amount=150000.0, claimant_name="Robert Williams", incident_date="2024-02-25", description="Kitchen fire spread to living room. Significant structural damage.", true_verdict="approve", correct_payout=147500.0, is_fraud=False, fraud_type=None, policy_id="POL-HOME-34521", policy_coverage_limit=200000.0, policy_deductible=2500.0, policy_status="active", coverage_exclusions=["intentional_damage"], complexity="complex", requires_documents=["photos", "fire_report", "repair_estimates", "inventory_list"], requires_escalation=True, past_claims_count=0, past_claims_total=0.0, recent_claims_30_days=0, ), # --- 6. 
Inflated stolen-vehicle → fraud denial ------------------ CaseFile( claim_id="CLM-2024-006", claim_type="auto_theft", claim_amount=35000.0, claimant_name="David Miller", incident_date="2024-03-04", description="Vehicle stolen from parking lot. Claims vehicle had $10k in upgrades.", true_verdict="deny", correct_payout=0.0, is_fraud=True, fraud_type="inflated_claim", policy_id="POL-AUTO-55432", policy_coverage_limit=40000.0, policy_deductible=1000.0, policy_status="active", coverage_exclusions=[], complexity="fraud", requires_documents=["police_report", "purchase_receipts"], requires_escalation=True, past_claims_count=2, past_claims_total=15000.0, recent_claims_30_days=1, ), # --- 7. Slip-and-fall liability → clean approval ---------------- CaseFile( claim_id="CLM-2024-007", claim_type="liability", claim_amount=8500.0, claimant_name="Jennifer Davis", incident_date="2024-02-20", description="Visitor slipped on icy walkway. Medical bills for sprained ankle.", true_verdict="approve", correct_payout=8500.0, is_fraud=False, fraud_type=None, policy_id="POL-HOME-78901", policy_coverage_limit=100000.0, policy_deductible=0.0, policy_status="active", coverage_exclusions=[], complexity="standard", requires_documents=["medical_records", "incident_report"], requires_escalation=False, past_claims_count=0, past_claims_total=0.0, recent_claims_30_days=0, ), # --- 8. Lapsed policy → denial ---------------------------------- CaseFile( claim_id="CLM-2024-008", claim_type="auto_collision", claim_amount=5500.0, claimant_name="Amanda Wilson", incident_date="2024-03-05", description="Hit deer on highway. 
Front end damage.", true_verdict="deny", correct_payout=0.0, is_fraud=False, fraud_type=None, policy_id="POL-AUTO-12345", policy_coverage_limit=50000.0, policy_deductible=500.0, policy_status="lapsed", coverage_exclusions=[], complexity="simple", requires_documents=["photos"], requires_escalation=False, past_claims_count=2, past_claims_total=3000.0, recent_claims_30_days=0, ), ] CASE_LIBRARY: list[CaseFile] = _build_library() # ============================================================================= # Backend stubs — one per imaginary upstream system # ============================================================================= @dataclass class PolicyRegistryStub: """Stand-in for the policy administration system.""" case: CaseFile def lookup_policy(self) -> dict[str, Any]: return { "policy_id": self.case.policy_id, "policy_status": self.case.policy_status, "coverage_type": self._coverage_type(), "coverage_limit": self.case.policy_coverage_limit, "deductible": self.case.policy_deductible, "effective_date": "2023-01-01", "expiration_date": ( "2024-12-31" if self.case.policy_status == "active" else "2024-01-15" ), } def _coverage_type(self) -> str: kind = self.case.claim_type if kind.startswith("auto"): return "comprehensive_auto" if kind.startswith("home"): return "homeowners_standard" return "liability_general" @dataclass class HistoryLedgerStub: """Mart of past claims used to surface claim-frequency signals.""" case: CaseFile def get_claim_history(self) -> dict[str, Any]: n = self.case.past_claims_count return { "claimant_name": self.case.claimant_name, "total_past_claims": n, "total_claimed_amount": self.case.past_claims_total, "claims_last_30_days": self.case.recent_claims_30_days, "claims_last_year": n, "average_claim_amount": self.case.past_claims_total / max(1, n), "claim_frequency": "high" if n > 3 else "normal", } @dataclass class RiskSignalEngine: """Lightweight fraud-risk scorer driven by per-case heuristics. 
The score combines a small base rate with feature contributions so the agent observes a realistic, non-binary signal. """ case: CaseFile BASE_RISK: float = 0.10 RECENT_CLAIMS_WEIGHT: float = 0.20 HIGH_FREQUENCY_WEIGHT: float = 0.15 NEAR_LIMIT_WEIGHT: float = 0.10 FRAUD_PATTERN_WEIGHT: float = 0.40 NOISE_PROBABILITY: float = 0.10 SCORE_CEILING: float = 0.95 def check_fraud_signals(self) -> dict[str, Any]: flags: list[str] = [] score = self.BASE_RISK if self.case.recent_claims_30_days > 0: flags.append("multiple_claims_30_days") score += self.RECENT_CLAIMS_WEIGHT if self.case.past_claims_count > 3: flags.append("high_claim_frequency") score += self.HIGH_FREQUENCY_WEIGHT if self.case.claim_amount > self.case.policy_coverage_limit * 0.8: flags.append("near_coverage_limit") score += self.NEAR_LIMIT_WEIGHT if self.case.is_fraud: flags.append("pattern_match_known_fraud") score += self.FRAUD_PATTERN_WEIGHT if self.case.fraud_type == "staged_accident": flags.append("inconsistent_damage_pattern") elif self.case.fraud_type == "inflated_claim": flags.append("claim_amount_anomaly") elif random.random() < self.NOISE_PROBABILITY: # Realistic false-positive flags.append("minor_documentation_gap") score += 0.05 score = min(self.SCORE_CEILING, score) return { "risk_score": round(score, 2), "flags": flags, "recommendation": _risk_to_recommendation(score), "confidence": 0.85 if self.case.is_fraud else 0.75, } def _risk_to_recommendation(score: float) -> str: if score > 0.70: return "deny_high_risk" if score > 0.40: return "manual_review_required" return "proceed_normal" @dataclass class EvidenceVault: """Document management front-end. Each requested document gets a small dossier; missing documents are flagged so the agent can detect incomplete submissions. 
""" case: CaseFile def request_documents(self, doc_types: list[str]) -> dict[str, Any]: results: dict[str, dict[str, Any]] = {} for doc_type in doc_types: results[doc_type] = self._evaluate_doc(doc_type) # Fraud cases sneak in a metadata mismatch on photo evidence if self.case.is_fraud and "photos" in results: results["photos"]["notes"] = ( "Photos received but metadata shows inconsistencies." ) results["photos"]["verified"] = False return { "documents": results, "all_required_received": all( doc in doc_types for doc in self.case.requires_documents ), "missing_documents": [ doc for doc in self.case.requires_documents if doc not in doc_types ], } def _evaluate_doc(self, doc_type: str) -> dict[str, Any]: nice_name = doc_type.replace("_", " ").title() if doc_type in self.case.requires_documents: return { "status": "received", "verified": True, "notes": f"{nice_name} verified and matches claim.", } return { "status": "not_required", "verified": False, "notes": f"{nice_name} not required for this claim type.", } @dataclass class CoverageOracle: """Resolves whether a particular damage type is covered.""" case: CaseFile DAMAGE_MAP: dict[str, list[str]] = field( default_factory=lambda: { "auto_collision": ["collision", "vehicle_damage", "property_damage"], "auto_theft": ["theft", "stolen_vehicle", "stolen_contents"], "home_water": ["water_damage", "pipe_burst", "plumbing"], "home_fire": ["fire", "smoke_damage", "structural"], "liability": ["bodily_injury", "property_damage", "medical"], } ) def verify_coverage(self, damage_type: str) -> dict[str, Any]: if damage_type in self.case.coverage_exclusions: idx = self.case.coverage_exclusions.index(damage_type) + 1 return { "damage_type": damage_type, "is_covered": False, "reason": f"Excluded by policy: {damage_type}", "exclusion_clause": f"Section 4.{idx}", } catalogue = self.DAMAGE_MAP.get(self.case.claim_type, []) is_covered = damage_type.lower() in (item.lower() for item in catalogue) return { "damage_type": damage_type, 
"is_covered": is_covered, "reason": ( "Covered under policy" if is_covered else "Not covered under this policy type" ), "coverage_section": "Section 2.1" if is_covered else None, } @dataclass class SettlementMath: """Applies deductible and coverage cap to produce a payout figure.""" case: CaseFile def calculate_payout(self, claimed_amount: float) -> dict[str, Any]: after_ded = max(0.0, claimed_amount - self.case.policy_deductible) capped = min(after_ded, self.case.policy_coverage_limit) final = 0.0 if self.case.policy_status != "active" else capped return { "claimed_amount": claimed_amount, "deductible_applied": self.case.policy_deductible, "after_deductible": after_ded, "coverage_limit": self.case.policy_coverage_limit, "final_payout": final, "payout_breakdown": { "base": claimed_amount, "deductible": -self.case.policy_deductible, "limit_adjustment": min( 0.0, self.case.policy_coverage_limit - after_ded ), }, "notes": self._explain(final, after_ded), } def _explain(self, final: float, after_ded: float) -> str: if self.case.policy_status != "active": return "Policy is not active. No payout eligible." if final < after_ded: return ( f"Payout capped at coverage limit of " f"${self.case.policy_coverage_limit:,.2f}" ) return "Standard calculation applied." 
# =============================================================================
# Selectors
# =============================================================================


def pick_random_case(seed: int | None = None) -> CaseFile:
    """Return one case drawn at random from ``CASE_LIBRARY``.

    With ``seed`` supplied, a private ``random.Random(seed)`` makes the draw,
    so the same seed always selects the same case. Without it, the shared
    module-level generator is used.
    """
    if seed is None:
        return random.choice(CASE_LIBRARY)
    return random.Random(seed).choice(CASE_LIBRARY)


def case_by_id(claim_id: str) -> CaseFile | None:
    """Return the first case whose ``claim_id`` matches, or ``None``."""
    matches = (entry for entry in CASE_LIBRARY if entry.claim_id == claim_id)
    return next(matches, None)


def case_at(index: int) -> CaseFile:
    """Return the case at ``index``, wrapping around the library length."""
    slot = index % len(CASE_LIBRARY)
    return CASE_LIBRARY[slot]


# =============================================================================
# Backwards-compatible aliases
# =============================================================================

# Legacy names from earlier revisions; retained so existing callers keep
# working against the same public surface.
ClaimScenario = CaseFile
MockPolicyDB = PolicyRegistryStub
MockClaimsHistoryDB = HistoryLedgerStub
MockFraudAPI = RiskSignalEngine
MockDocumentSystem = EvidenceVault
MockCoverageVerifier = CoverageOracle
MockPayoutCalculator = SettlementMath

CLAIM_SCENARIOS = CASE_LIBRARY

get_random_scenario = pick_random_case
get_scenario_by_id = case_by_id
get_scenario_by_index = case_at


__all__ = [
    # current names
    "CaseFile",
    "PolicyRegistryStub",
    "HistoryLedgerStub",
    "RiskSignalEngine",
    "EvidenceVault",
    "CoverageOracle",
    "SettlementMath",
    "CASE_LIBRARY",
    "pick_random_case",
    "case_by_id",
    "case_at",
    # legacy
    "ClaimScenario",
    "MockPolicyDB",
    "MockClaimsHistoryDB",
    "MockFraudAPI",
    "MockDocumentSystem",
    "MockCoverageVerifier",
    "MockPayoutCalculator",
    "CLAIM_SCENARIOS",
    "get_random_scenario",
    "get_scenario_by_id",
    "get_scenario_by_index",
]