# claims-env / server / mock_systems.py
# akhiilll's picture
# Deploy ClaimSense adjudication gym
# 1cfeb15 verified
"""Backend stubs for the ClaimSense adjudication gym.
Each class mimics one corner of an insurer's IT estate (policy admin
system, history mart, fraud-scoring API, document repository, coverage
oracle, settlement maths, retail bank feed). Together they create the
*partial-observability* surface the agent must explore.
The data lives in ``CASE_LIBRARY`` — eight hand-crafted cases that span
clean approvals, partial pay-outs, denials, escalations, and two flavours
of fraud.
For backwards compatibility the original ``Mock*`` class names and the
``CLAIM_SCENARIOS`` constant are re-exported at the bottom of the module.
"""
from __future__ import annotations
import random
from dataclasses import dataclass, field
from typing import Any
# =============================================================================
# Case schema
# =============================================================================
@dataclass
class CaseFile:
    """A single claim record bundled with its hidden answer key.

    Field order is significant: it is the positional order of the
    generated ``__init__``. The groups below mirror how each piece of
    information is surfaced.
    """

    # -- Claim header (public-facing) ---------------------------------
    claim_id: str
    claim_type: str
    claim_amount: float
    claimant_name: str
    incident_date: str
    description: str

    # -- Answer key (server-only ground truth) ------------------------
    true_verdict: str
    correct_payout: float
    is_fraud: bool
    fraud_type: str | None  # None when the claim is not fraudulent

    # -- Policy facts (revealed only via query_policy) ----------------
    policy_id: str
    policy_coverage_limit: float
    policy_deductible: float
    policy_status: str
    coverage_exclusions: list[str]

    # -- Workflow shape -----------------------------------------------
    complexity: str
    requires_documents: list[str]
    requires_escalation: bool

    # -- Claim-history profile (revealed via query_claim_history) -----
    past_claims_count: int
    past_claims_total: float
    recent_claims_30_days: int
# =============================================================================
# The eight curated cases
# =============================================================================
def _build_library() -> list[CaseFile]:
    """Assemble the canonical eight-case library.

    Keeping the definitions inside a function keeps the module body short
    and lets tests regenerate the list cheaply. Every call constructs new
    ``CaseFile`` objects (and new list fields).
    """
    # 1. Routine fender-bender → straight approval.
    fender_bender = CaseFile(
        claim_id="CLM-2024-001",
        claim_type="auto_collision",
        claim_amount=3500.0,
        claimant_name="John Smith",
        incident_date="2024-03-01",
        description="Rear-ended at stoplight. Bumper and taillight damage.",
        true_verdict="approve",
        correct_payout=3000.0,
        is_fraud=False,
        fraud_type=None,
        policy_id="POL-AUTO-78234",
        policy_coverage_limit=50000.0,
        policy_deductible=500.0,
        policy_status="active",
        coverage_exclusions=[],
        complexity="simple",
        requires_documents=["photos"],
        requires_escalation=False,
        past_claims_count=1,
        past_claims_total=1200.0,
        recent_claims_30_days=0,
    )

    # 2. Burst pipe with a low coverage cap → partial settlement.
    # NOTE(review): SettlementMath.calculate_payout(45000.0) subtracts the
    # deductible before applying the cap and therefore reports 25000.0,
    # while the answer key below is 24000.0 (cap minus deductible).
    # Confirm which of the two is intended.
    burst_pipe = CaseFile(
        claim_id="CLM-2024-002",
        claim_type="home_water",
        claim_amount=45000.0,
        claimant_name="Sarah Johnson",
        incident_date="2024-02-28",
        description="Burst pipe caused flooding in basement. Extensive water damage.",
        true_verdict="partial_approve",
        correct_payout=24000.0,
        is_fraud=False,
        fraud_type=None,
        policy_id="POL-HOME-45123",
        policy_coverage_limit=25000.0,
        policy_deductible=1000.0,
        policy_status="active",
        coverage_exclusions=["flood_external"],
        complexity="standard",
        requires_documents=["photos", "repair_estimates"],
        requires_escalation=False,
        past_claims_count=0,
        past_claims_total=0.0,
        recent_claims_30_days=0,
    )

    # 3. Staged accident → outright denial.
    staged_accident = CaseFile(
        claim_id="CLM-2024-003",
        claim_type="auto_collision",
        claim_amount=12000.0,
        claimant_name="Mike Thompson",
        incident_date="2024-03-03",
        description="T-bone collision at intersection. Major damage to driver side.",
        true_verdict="deny",
        correct_payout=0.0,
        is_fraud=True,
        fraud_type="staged_accident",
        policy_id="POL-AUTO-91827",
        policy_coverage_limit=75000.0,
        policy_deductible=500.0,
        policy_status="active",
        coverage_exclusions=[],
        complexity="fraud",
        requires_documents=["photos", "police_report"],
        requires_escalation=True,
        past_claims_count=4,
        past_claims_total=28000.0,
        recent_claims_30_days=2,
    )

    # 4. External flood, which the policy excludes → denial.
    river_flood = CaseFile(
        claim_id="CLM-2024-004",
        claim_type="home_water",
        claim_amount=18000.0,
        claimant_name="Emily Chen",
        incident_date="2024-03-02",
        description="Flooding from nearby river after heavy rains.",
        true_verdict="deny",
        correct_payout=0.0,
        is_fraud=False,
        fraud_type=None,
        policy_id="POL-HOME-67890",
        policy_coverage_limit=100000.0,
        policy_deductible=1000.0,
        policy_status="active",
        coverage_exclusions=["flood_external", "earthquake"],
        complexity="standard",
        requires_documents=["photos"],
        requires_escalation=False,
        past_claims_count=1,
        past_claims_total=5000.0,
        recent_claims_30_days=0,
    )

    # 5. Six-figure house fire → escalate, then approve.
    house_fire = CaseFile(
        claim_id="CLM-2024-005",
        claim_type="home_fire",
        claim_amount=150000.0,
        claimant_name="Robert Williams",
        incident_date="2024-02-25",
        description="Kitchen fire spread to living room. Significant structural damage.",
        true_verdict="approve",
        correct_payout=147500.0,
        is_fraud=False,
        fraud_type=None,
        policy_id="POL-HOME-34521",
        policy_coverage_limit=200000.0,
        policy_deductible=2500.0,
        policy_status="active",
        coverage_exclusions=["intentional_damage"],
        complexity="complex",
        requires_documents=["photos", "fire_report", "repair_estimates", "inventory_list"],
        requires_escalation=True,
        past_claims_count=0,
        past_claims_total=0.0,
        recent_claims_30_days=0,
    )

    # 6. Inflated stolen-vehicle claim → fraud denial.
    inflated_theft = CaseFile(
        claim_id="CLM-2024-006",
        claim_type="auto_theft",
        claim_amount=35000.0,
        claimant_name="David Miller",
        incident_date="2024-03-04",
        description="Vehicle stolen from parking lot. Claims vehicle had $10k in upgrades.",
        true_verdict="deny",
        correct_payout=0.0,
        is_fraud=True,
        fraud_type="inflated_claim",
        policy_id="POL-AUTO-55432",
        policy_coverage_limit=40000.0,
        policy_deductible=1000.0,
        policy_status="active",
        coverage_exclusions=[],
        complexity="fraud",
        requires_documents=["police_report", "purchase_receipts"],
        requires_escalation=True,
        past_claims_count=2,
        past_claims_total=15000.0,
        recent_claims_30_days=1,
    )

    # 7. Slip-and-fall liability → clean approval.
    slip_and_fall = CaseFile(
        claim_id="CLM-2024-007",
        claim_type="liability",
        claim_amount=8500.0,
        claimant_name="Jennifer Davis",
        incident_date="2024-02-20",
        description="Visitor slipped on icy walkway. Medical bills for sprained ankle.",
        true_verdict="approve",
        correct_payout=8500.0,
        is_fraud=False,
        fraud_type=None,
        policy_id="POL-HOME-78901",
        policy_coverage_limit=100000.0,
        policy_deductible=0.0,
        policy_status="active",
        coverage_exclusions=[],
        complexity="standard",
        requires_documents=["medical_records", "incident_report"],
        requires_escalation=False,
        past_claims_count=0,
        past_claims_total=0.0,
        recent_claims_30_days=0,
    )

    # 8. Lapsed policy → denial.
    lapsed_policy = CaseFile(
        claim_id="CLM-2024-008",
        claim_type="auto_collision",
        claim_amount=5500.0,
        claimant_name="Amanda Wilson",
        incident_date="2024-03-05",
        description="Hit deer on highway. Front end damage.",
        true_verdict="deny",
        correct_payout=0.0,
        is_fraud=False,
        fraud_type=None,
        policy_id="POL-AUTO-12345",
        policy_coverage_limit=50000.0,
        policy_deductible=500.0,
        policy_status="lapsed",
        coverage_exclusions=[],
        complexity="simple",
        requires_documents=["photos"],
        requires_escalation=False,
        past_claims_count=2,
        past_claims_total=3000.0,
        recent_claims_30_days=0,
    )

    # The list order is what ``case_at`` indexes into.
    return [
        fender_bender,
        burst_pipe,
        staged_accident,
        river_flood,
        house_fire,
        inflated_theft,
        slip_and_fall,
        lapsed_policy,
    ]


# Built once at import time; the selectors below read from this list.
CASE_LIBRARY: list[CaseFile] = _build_library()
# =============================================================================
# Backend stubs — one per imaginary upstream system
# =============================================================================
@dataclass
class PolicyRegistryStub:
    """Stand-in for the policy administration system."""

    case: CaseFile

    def lookup_policy(self) -> dict[str, Any]:
        """Return the policy record for the wrapped case.

        The effective date is fixed; the expiration date depends only on
        whether the policy status is ``"active"``.
        """
        policy = self.case
        if policy.policy_status == "active":
            expires_on = "2024-12-31"
        else:
            expires_on = "2024-01-15"

        return {
            "policy_id": policy.policy_id,
            "policy_status": policy.policy_status,
            "coverage_type": self._coverage_type(),
            "coverage_limit": policy.policy_coverage_limit,
            "deductible": policy.policy_deductible,
            "effective_date": "2023-01-01",
            "expiration_date": expires_on,
        }

    def _coverage_type(self) -> str:
        """Map the claim-type prefix to a coverage product name."""
        products_by_prefix = (
            ("auto", "comprehensive_auto"),
            ("home", "homeowners_standard"),
        )
        claim_type = self.case.claim_type
        for prefix, product in products_by_prefix:
            if claim_type.startswith(prefix):
                return product
        # Anything that is neither auto nor home falls back to liability.
        return "liability_general"
@dataclass
class HistoryLedgerStub:
    """Mart of past claims used to surface claim-frequency signals."""

    case: CaseFile

    def get_claim_history(self) -> dict[str, Any]:
        """Summarise the claimant's prior claims for the wrapped case."""
        record = self.case
        prior_count = record.past_claims_count
        prior_total = record.past_claims_total

        # More than three prior claims is labelled "high".
        if prior_count > 3:
            frequency = "high"
        else:
            frequency = "normal"

        # Dividing by at least 1 avoids a ZeroDivisionError when there is
        # no claim history.
        mean_amount = prior_total / max(1, prior_count)

        return {
            "claimant_name": record.claimant_name,
            "total_past_claims": prior_count,
            "total_claimed_amount": prior_total,
            "claims_last_30_days": record.recent_claims_30_days,
            # The stub reports the lifetime count as the one-year count.
            "claims_last_year": prior_count,
            "average_claim_amount": mean_amount,
            "claim_frequency": frequency,
        }
@dataclass
class RiskSignalEngine:
    """Lightweight fraud-risk scorer driven by per-case heuristics.

    The score combines a small base rate with feature contributions so the
    agent observes a realistic, non-binary signal.

    The upper-case attributes are annotated without ``ClassVar``, so the
    dataclass treats them as fields with defaults: they can be overridden
    per instance through the constructor.
    """

    case: CaseFile
    BASE_RISK: float = 0.10
    RECENT_CLAIMS_WEIGHT: float = 0.20
    HIGH_FREQUENCY_WEIGHT: float = 0.15
    NEAR_LIMIT_WEIGHT: float = 0.10
    FRAUD_PATTERN_WEIGHT: float = 0.40
    NOISE_PROBABILITY: float = 0.10
    SCORE_CEILING: float = 0.95

    def check_fraud_signals(self) -> dict[str, Any]:
        """Score the wrapped case and list the signals that fired.

        Returns:
            A dict with ``risk_score`` (rounded to two decimals and capped
            at ``SCORE_CEILING``), ``flags`` (names of the signals that
            fired), ``recommendation`` (derived from the same rounded
            score) and ``confidence``.

        For non-fraud cases the result is not deterministic: a false
        positive flag is added with probability ``NOISE_PROBABILITY`` using
        the module-level ``random`` generator.
        """
        flags: list[str] = []
        score = self.BASE_RISK

        if self.case.recent_claims_30_days > 0:
            flags.append("multiple_claims_30_days")
            score += self.RECENT_CLAIMS_WEIGHT

        if self.case.past_claims_count > 3:
            flags.append("high_claim_frequency")
            score += self.HIGH_FREQUENCY_WEIGHT

        # A claim above 80% of the coverage limit counts as "near limit".
        if self.case.claim_amount > self.case.policy_coverage_limit * 0.8:
            flags.append("near_coverage_limit")
            score += self.NEAR_LIMIT_WEIGHT

        if self.case.is_fraud:
            flags.append("pattern_match_known_fraud")
            score += self.FRAUD_PATTERN_WEIGHT
            if self.case.fraud_type == "staged_accident":
                flags.append("inconsistent_damage_pattern")
            elif self.case.fraud_type == "inflated_claim":
                flags.append("claim_amount_anomaly")
        elif random.random() < self.NOISE_PROBABILITY:
            # Realistic false-positive
            flags.append("minor_documentation_gap")
            score += 0.05

        # Round once and use the rounded value for both the reported score
        # and the recommendation. Summing the weights as floats can leave
        # residue (0.10 + 0.20 + 0.40 == 0.7000000000000001); classifying
        # the raw sum would report 0.7 yet recommend "deny_high_risk" even
        # though the threshold is a strict "> 0.70".
        score = round(min(self.SCORE_CEILING, score), 2)

        return {
            "risk_score": score,
            "flags": flags,
            "recommendation": _risk_to_recommendation(score),
            "confidence": 0.85 if self.case.is_fraud else 0.75,
        }


def _risk_to_recommendation(score: float) -> str:
    """Translate a risk score into a recommendation label.

    Both thresholds are strict: exactly 0.70 is still a manual review and
    exactly 0.40 still proceeds normally.
    """
    if score > 0.70:
        return "deny_high_risk"
    if score > 0.40:
        return "manual_review_required"
    return "proceed_normal"
@dataclass
class EvidenceVault:
    """Document management front-end.

    Every requested document type gets a small dossier, and required
    documents that were not requested are reported as missing so the agent
    can detect incomplete submissions.
    """

    case: CaseFile

    def request_documents(self, doc_types: list[str]) -> dict[str, Any]:
        """Build a dossier per requested type plus a completeness summary.

        Args:
            doc_types: Document type names being requested.

        Returns:
            A dict with ``documents`` (dossier per requested type),
            ``all_required_received`` (True when every required document
            was among those requested) and ``missing_documents`` (the
            required ones that were not requested).
        """
        dossiers: dict[str, dict[str, Any]] = {
            requested: self._evaluate_doc(requested) for requested in doc_types
        }

        # Fraud cases sneak in a metadata mismatch on photo evidence.
        photo_dossier = dossiers.get("photos")
        if photo_dossier is not None and self.case.is_fraud:
            photo_dossier["notes"] = (
                "Photos received but metadata shows inconsistencies."
            )
            photo_dossier["verified"] = False

        outstanding = [
            needed
            for needed in self.case.requires_documents
            if needed not in doc_types
        ]
        return {
            "documents": dossiers,
            "all_required_received": not outstanding,
            "missing_documents": outstanding,
        }

    def _evaluate_doc(self, doc_type: str) -> dict[str, Any]:
        """Return the dossier for one document type."""
        label = doc_type.replace("_", " ").title()
        if doc_type not in self.case.requires_documents:
            return {
                "status": "not_required",
                "verified": False,
                "notes": f"{label} not required for this claim type.",
            }
        return {
            "status": "received",
            "verified": True,
            "notes": f"{label} verified and matches claim.",
        }
@dataclass
class CoverageOracle:
    """Resolves whether a particular damage type is covered.

    ``DAMAGE_MAP`` maps a claim type to the damage types it covers. It is a
    dataclass field built by a default factory, so each instance gets its
    own dict and a caller may pass a different map to the constructor.
    """

    case: CaseFile
    DAMAGE_MAP: dict[str, list[str]] = field(
        default_factory=lambda: {
            "auto_collision": ["collision", "vehicle_damage", "property_damage"],
            "auto_theft": ["theft", "stolen_vehicle", "stolen_contents"],
            "home_water": ["water_damage", "pipe_burst", "plumbing"],
            "home_fire": ["fire", "smoke_damage", "structural"],
            "liability": ["bodily_injury", "property_damage", "medical"],
        }
    )

    def verify_coverage(self, damage_type: str) -> dict[str, Any]:
        """Report whether ``damage_type`` is covered for the wrapped case.

        Policy exclusions are checked first and win over the catalogue.
        Both checks ignore letter case, so ``"Earthquake"`` matches an
        ``"earthquake"`` exclusion just as ``"Pipe_Burst"`` matches the
        ``"pipe_burst"`` catalogue entry.

        Args:
            damage_type: Damage type name to look up; echoed back unchanged
                in the result.

        Returns:
            For an excluded damage type, a dict with ``is_covered`` False,
            a ``reason`` and an ``exclusion_clause`` numbered by the
            exclusion's 1-based position. Otherwise a dict with
            ``is_covered``, a ``reason`` and ``coverage_section`` (None
            when not covered).
        """
        wanted = damage_type.lower()

        # The first matching exclusion decides the clause number.
        for position, excluded in enumerate(self.case.coverage_exclusions, start=1):
            if excluded.lower() == wanted:
                return {
                    "damage_type": damage_type,
                    "is_covered": False,
                    "reason": f"Excluded by policy: {damage_type}",
                    "exclusion_clause": f"Section 4.{position}",
                }

        # Unknown claim types have an empty catalogue, i.e. nothing covered.
        catalogue = self.DAMAGE_MAP.get(self.case.claim_type, [])
        is_covered = any(item.lower() == wanted for item in catalogue)
        return {
            "damage_type": damage_type,
            "is_covered": is_covered,
            "reason": (
                "Covered under policy" if is_covered else "Not covered under this policy type"
            ),
            "coverage_section": "Section 2.1" if is_covered else None,
        }
@dataclass
class SettlementMath:
    """Applies deductible and coverage cap to produce a payout figure."""

    case: CaseFile

    def calculate_payout(self, claimed_amount: float) -> dict[str, Any]:
        """Compute the payout for ``claimed_amount`` under the case's policy.

        The deductible is subtracted first (never going below zero), the
        result is capped at the coverage limit, and a policy whose status
        is not ``"active"`` pays nothing.

        NOTE(review): with this ordering the CLM-2024-002 figures
        (45000 claimed, 1000 deductible, 25000 limit) give 25000.0, whereas
        that case's ``correct_payout`` is 24000.0. Confirm which ordering
        is intended.
        """
        deductible = self.case.policy_deductible
        limit = self.case.policy_coverage_limit

        after_ded = max(0.0, claimed_amount - deductible)
        if self.case.policy_status == "active":
            final = min(after_ded, limit)
        else:
            final = 0.0

        breakdown = {
            "base": claimed_amount,
            "deductible": -deductible,
            # Zero unless the post-deductible amount exceeds the limit, in
            # which case it is the (negative) amount cut off by the cap.
            "limit_adjustment": min(0.0, limit - after_ded),
        }
        return {
            "claimed_amount": claimed_amount,
            "deductible_applied": deductible,
            "after_deductible": after_ded,
            "coverage_limit": limit,
            "final_payout": final,
            "payout_breakdown": breakdown,
            "notes": self._explain(final, after_ded),
        }

    def _explain(self, final: float, after_ded: float) -> str:
        """Return the human-readable note that accompanies a payout."""
        if self.case.policy_status != "active":
            return "Policy is not active. No payout eligible."
        if final >= after_ded:
            return "Standard calculation applied."
        # The payout is below the post-deductible amount, so the cap applied.
        limit_text = f"${self.case.policy_coverage_limit:,.2f}"
        return f"Payout capped at coverage limit of {limit_text}"
# =============================================================================
# Selectors
# =============================================================================
def pick_random_case(seed: int | None = None) -> CaseFile:
    """Return one case from ``CASE_LIBRARY`` chosen at random.

    With a ``seed`` the choice comes from a dedicated ``random.Random(seed)``
    and is therefore repeatable; without one the module-level generator is
    used.
    """
    if seed is None:
        return random.choice(CASE_LIBRARY)
    return random.Random(seed).choice(CASE_LIBRARY)
def case_by_id(claim_id: str) -> CaseFile | None:
    """Return the first case whose ``claim_id`` matches, or None if absent."""
    matches = (entry for entry in CASE_LIBRARY if entry.claim_id == claim_id)
    return next(matches, None)
def case_at(index: int) -> CaseFile:
    """Return the case at ``index``, wrapping around the library length.

    Any integer is accepted: the index is reduced modulo the number of
    cases before the lookup.
    """
    wrapped = index % len(CASE_LIBRARY)
    return CASE_LIBRARY[wrapped]
# =============================================================================
# Backwards-compatible aliases
# =============================================================================
# Older callers used these names; keep them so the public surface area
# does not regress. Each alias is bound to the very same object as its
# current counterpart (not a copy or a subclass).

# Case schema.
ClaimScenario = CaseFile

# Backend stubs.
MockPolicyDB = PolicyRegistryStub
MockClaimsHistoryDB = HistoryLedgerStub
MockFraudAPI = RiskSignalEngine
MockDocumentSystem = EvidenceVault
MockCoverageVerifier = CoverageOracle
MockPayoutCalculator = SettlementMath

# Case library and its selectors.
CLAIM_SCENARIOS = CASE_LIBRARY
get_random_scenario = pick_random_case
get_scenario_by_id = case_by_id
get_scenario_by_index = case_at
# Explicit public API: the current names first, then the legacy aliases.
# Underscore-prefixed helpers are deliberately left out.
__all__ = [
    "CaseFile",
    "PolicyRegistryStub",
    "HistoryLedgerStub",
    "RiskSignalEngine",
    "EvidenceVault",
    "CoverageOracle",
    "SettlementMath",
    "CASE_LIBRARY",
    "pick_random_case",
    "case_by_id",
    "case_at",
    # legacy
    "ClaimScenario",
    "MockPolicyDB",
    "MockClaimsHistoryDB",
    "MockFraudAPI",
    "MockDocumentSystem",
    "MockCoverageVerifier",
    "MockPayoutCalculator",
    "CLAIM_SCENARIOS",
    "get_random_scenario",
    "get_scenario_by_id",
    "get_scenario_by_index",
]