Spaces:
Running
Running
| from __future__ import annotations | |
| from copy import deepcopy | |
| from typing import Any, Dict, List, Optional | |
| from uuid import uuid4 | |
| from openenv.core.env_server.interfaces import Environment | |
| from openenv.core.env_server.types import EnvironmentMetadata | |
| from .rubrics import DebateFloorRubric | |
| from .models import ( | |
| ClaimStatus, | |
| InsuranceClaimAction, | |
| InsuranceClaimObservation, | |
| InsuranceClaimState, | |
| ) | |
| from .tasks import ( | |
| ACTION_COSTS, | |
| TASKS, | |
| RuntimeTask, | |
| build_runtime_task, | |
| build_initial_payload, | |
| compute_reward_breakdown, | |
| get_compare_signals, | |
| get_evidence_keyword_hints, | |
| get_task_definition, | |
| ) | |
| from server.calibration_grader import calibration_reward as compute_calibration_reward | |
| from .session_store import record_episode_confidence | |
| # Map Literal confidence levels to float for Brier-score compatibility | |
| _CONFIDENCE_TO_FLOAT = {"HIGH": 0.9, "MED": 0.6, "LOW": 0.3} | |
| # Correct terminal action for each task — used by calibration grader | |
| _TASK_GROUND_TRUTH = { | |
| "clean_claim": "approve_claim", | |
| "contradictory_claim": "deny_claim", | |
| "coordinated_fraud": "escalate_to_human", | |
| "identity_fraud": "deny_claim", | |
| "distribution_shift_claim": "escalate_to_human", | |
| } | |
class InsuranceClaimEnvironment(
    Environment[InsuranceClaimAction, InsuranceClaimObservation, InsuranceClaimState]
):
    """Insurance-claim investigation environment with calibration-aware rewards.

    Tracks per-episode investigation telemetry (discovered fraud signals,
    exploit penalties, investigation budget, linked-claim exploration) and
    grades terminal decisions through both a rule-based reward breakdown and
    a debate-panel rubric.
    """

    # Session isolation is handled by the server layer (main.py), so
    # concurrent rollouts against separate instances are safe.
    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self) -> None:
        super().__init__(rubric=DebateFloorRubric())

        # Core episode state and the payload presented to the agent.
        self._state = InsuranceClaimState(episode_id=str(uuid4()), step_count=0)
        self._runtime_task: RuntimeTask | None = None
        self._payload: Dict[str, Any] = {}
        self._last_message = "Environment initialized"

        # Investigation bookkeeping.
        self._action_history: List[Dict[str, Any]] = []
        self._flags_raised: List[str] = []
        self._found_signals: List[str] = []
        self._discovered_signals: List[str] = []
        self._false_flags: int = 0
        self._investigation_targets: List[str] = []
        self._evidence_hits: int = 0
        self._evidence_total: int = 0
        self._exploit_penalty: float = 0.0
        self._request_info_streak: int = 0
        self._last_progress_step: int = 0
        self._budget_remaining: int = 0

        # Linked-claim and document exploration state.
        self._queried_claims: set[str] = set()
        self._visible_linked_claims: list = []
        self._compared_pairs: set[tuple] = set()
        self._policy_history_checked: bool = False
        self._identity_verified: bool = False

        # Confidence calibration: the raw "HIGH" | "MED" | "LOW" literal plus
        # its float mapping, and the score produced by the 3x2 matrix grader.
        self._agent_confidence: Optional[float] = None
        self._agent_confidence_str: Optional[str] = None
        self._calibration_score: Optional[float] = None
        self._episode_history: List[Dict] = []  # per-session anti-gaming debug

        # Debate panel and rubric telemetry.
        self._debate_transcript: Optional[Dict[str, Any]] = None
        self._debate_convened: bool = False
        self._last_rubric_components: Dict[str, float] = {}
| def reset( | |
| self, | |
| seed: Optional[int] = None, | |
| episode_id: Optional[str] = None, | |
| task_id: Optional[str] = None, | |
| **kwargs: Any, | |
| ) -> InsuranceClaimObservation: | |
| self._reset_rubric() | |
| if task_id is None: | |
| task_id = kwargs.get("task_id") | |
| selected_task = task_id or "clean_claim" | |
| task = build_runtime_task(selected_task, seed=seed) | |
| self._runtime_task = task | |
| self._payload = build_initial_payload(task) | |
| self._action_history = [] | |
| self._flags_raised = [] | |
| self._found_signals = [] | |
| self._discovered_signals = [] | |
| self._false_flags = 0 | |
| self._investigation_targets = [] | |
| self._evidence_hits = 0 | |
| self._evidence_total = 0 | |
| self._exploit_penalty = 0.0 | |
| self._request_info_streak = 0 | |
| self._last_progress_step = 0 | |
| self._queried_claims = set() | |
| self._visible_linked_claims = deepcopy(self._payload.get("linked_claims", [])) | |
| self._policy_history_checked = False | |
| self._identity_verified = False | |
| self._agent_confidence = None | |
| self._agent_confidence_str = None | |
| self._calibration_score = None | |
| self._budget_remaining = self._payload.get("investigation_budget", 0) | |
| self._compared_pairs = set() | |
| self._debate_transcript = None | |
| self._debate_convened = False | |
| self._last_rubric_components = {} | |
| self._last_message = ( | |
| f"Task '{task.task_id}' loaded (variant={task.variant_id}). Start investigation." | |
| ) | |
| self._state = InsuranceClaimState( | |
| episode_id=episode_id or str(uuid4()), | |
| step_count=0, | |
| task_id=task.task_id, | |
| claim_id=task.claim_id, | |
| step_number=0, | |
| max_steps=task.max_steps, | |
| status=ClaimStatus.OPEN, | |
| flags_raised=[], | |
| discovered_signals=[], | |
| found_signals=[], | |
| penalty_total=0.0, | |
| done=False, | |
| last_action_error=None, | |
| payout_estimate_inr=None, | |
| final_decision=None, | |
| final_score=0.0, | |
| ) | |
| return self._apply_transform(self._build_observation(message=self._last_message)) | |
    def step(
        self,
        action: InsuranceClaimAction,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> InsuranceClaimObservation:
        """Apply one agent action and return the resulting observation.

        Invalid actions raise ``ValueError`` inside ``_apply_action`` and are
        converted here into a flat 0.05 penalty rather than propagated. The
        episode auto-closes when ``max_steps`` is reached without a terminal
        decision.
        """
        # Lazy-init guard: calling step() before reset() boots the default task.
        if self._state.task_id == "":
            return self.reset(task_id="clean_claim")
        if self._state.done:
            return self._apply_transform(
                self._build_observation(
                    message="Episode already complete. Call reset() to start a new episode."
                )
            )
        self._state.step_count += 1
        self._state.step_number += 1
        self._state.status = ClaimStatus.INVESTIGATING
        self._state.last_action_error = None
        try:
            message = self._apply_action(action)
            self._last_message = message
        except ValueError as exc:
            # Malformed/unsupported actions cost a small flat penalty instead
            # of terminating the episode.
            self._state.last_action_error = str(exc)
            self._state.penalty_total += 0.05
            self._last_message = f"Invalid action: {exc}"
        self._action_history.append(
            {
                "step": self._state.step_number,
                "action_type": action.action_type,
                "parameters": deepcopy(action.parameters),
                "reasoning": action.reasoning,
            }
        )
        # Stall penalty: 4+ steps without any recorded progress.
        if not self._state.done and (self._state.step_number - self._last_progress_step) >= 4:
            self._exploit_penalty += 0.01
        # Hard episode cap: close without a final decision.
        if self._state.step_number >= self._state.max_steps and not self._state.done:
            self._state.done = True
            self._state.status = ClaimStatus.CLOSED
            self._last_message = "Max steps reached before final adjudication. Episode closed."
        observation = self._build_observation(message=self._last_message)
        self._sync_rubric_telemetry(action, observation)
        self._state.final_score = float(observation.reward)
        return self._apply_transform(observation)
| def state(self) -> InsuranceClaimState: | |
| return self._state | |
| def get_metadata(self) -> EnvironmentMetadata: | |
| return EnvironmentMetadata( | |
| name="debatefloor_insurance_calibration_env", | |
| description=( | |
| "OpenEnv insurance claim investigation environment with calibrated " | |
| "confidence rewards and a prosecutor/defender/judge debate panel." | |
| ), | |
| version="0.2.3", | |
| author="Team DebateFloor", | |
| documentation_url="https://github.com/AniketAslaliya/debateFloor", | |
| ) | |
| def _apply_action(self, action: InsuranceClaimAction) -> str: | |
| task = self._runtime_task or build_runtime_task(self._state.task_id) | |
| # Deduct investigation budget; overage adds 0.02 penalty per unit | |
| cost = ACTION_COSTS.get(action.action_type, 1) | |
| self._budget_remaining -= cost | |
| if self._budget_remaining < 0: | |
| self._state.penalty_total += 0.02 # per unit over budget | |
| if action.action_type == "request_information": | |
| self._request_info_streak += 1 | |
| if self._request_info_streak > 2: | |
| self._exploit_penalty += 0.03 | |
| if self._request_info_streak > 1: | |
| self._state.penalty_total += 0.02 | |
| return "Additional information requested. Useful but consumes time and SLA budget." | |
| self._request_info_streak = 0 | |
| if action.action_type == "lookup_policy_history": | |
| task = self._runtime_task or build_runtime_task(self._state.task_id) | |
| if self._policy_history_checked: | |
| # Second lookup is an exploit — no new info | |
| self._exploit_penalty += 0.03 | |
| return "Policy history already retrieved. No new information available." | |
| self._policy_history_checked = True | |
| history = task.policy_history | |
| # For contradictory_claim: looking up history reveals the prior similar claim signal | |
| if task.task_id == "contradictory_claim": | |
| self._record_discovered_signals(["prior_similar_claim"]) | |
| # For identity_fraud: policy_age_days being very low reveals recent_policy_purchase | |
| if task.task_id == "identity_fraud": | |
| if history.get("policy_age_days", 999) <= 30: | |
| self._record_discovered_signals(["recent_policy_purchase"]) | |
| return ( | |
| f"Policy history retrieved: {history['prior_claims']} prior claims. " | |
| f"Customer for {history['years_as_customer']} years. " | |
| f"Policy age: {history['policy_age_days']} days. " | |
| f"Risk score: {history['risk_score']}. Note: {history['note']}" | |
| ) | |
| if action.action_type == "verify_identity": | |
| task = self._runtime_task or build_runtime_task(self._state.task_id) | |
| if task.task_id != "identity_fraud": | |
| raise ValueError("'verify_identity' is only available for the identity_fraud task") | |
| if self._identity_verified: | |
| self._exploit_penalty += 0.03 | |
| return "Identity verification already performed. No new information." | |
| self._identity_verified = True | |
| self._record_discovered_signals(["identity_mismatch", "hospital_no_record"]) | |
| return ( | |
| "Identity verification FAILED. National registry has no record matching " | |
| "claimant name 'Aarav Mehta' with ID suffix 7821. " | |
| "Hospital records show admission under a different name ('Aarav Kumar') with DOB mismatch. " | |
| "KYC status at policy inception: PENDING — identity was never confirmed." | |
| ) | |
| if action.action_type == "compare_documents": | |
| task = self._runtime_task or build_runtime_task(self._state.task_id) | |
| doc_id_a = str(action.parameters.get("doc_id_a", "")).strip() | |
| doc_id_b = str(action.parameters.get("doc_id_b", "")).strip() | |
| if not doc_id_a or not doc_id_b: | |
| raise ValueError("'doc_id_a' and 'doc_id_b' are required for compare_documents") | |
| if doc_id_a == doc_id_b: | |
| raise ValueError("'doc_id_a' and 'doc_id_b' must be different documents") | |
| all_doc_ids = {d["doc_id"] for d in self._payload["documents"]} | |
| for did in (doc_id_a, doc_id_b): | |
| if did not in all_doc_ids: | |
| raise ValueError(f"Unknown doc_id '{did}'") | |
| pair = (doc_id_a, doc_id_b) | |
| pair_rev = (doc_id_b, doc_id_a) | |
| if pair in self._compared_pairs or pair_rev in self._compared_pairs: | |
| self._exploit_penalty += 0.03 | |
| return f"Documents {doc_id_a} and {doc_id_b} were already compared. No new findings." | |
| self._compared_pairs.add(pair) | |
| signals = get_compare_signals(task.task_id, doc_id_a, doc_id_b) | |
| if signals: | |
| self._record_discovered_signals(signals) | |
| return ( | |
| f"Cross-document comparison of {doc_id_a} vs {doc_id_b} revealed " | |
| f"inconsistencies: {', '.join(signals)}." | |
| ) | |
| return f"Cross-document comparison of {doc_id_a} vs {doc_id_b}: documents are consistent." | |
| if action.action_type == "validate_document": | |
| doc_id = str(action.parameters.get("doc_id", "")).strip() | |
| if not doc_id: | |
| raise ValueError("'doc_id' is required for validate_document") | |
| doc = next((d for d in self._payload["documents"] if d.get("doc_id") == doc_id), None) | |
| if doc is None: | |
| raise ValueError(f"Unknown doc_id '{doc_id}'") | |
| discovered = self._discover_signals_from_document(doc_id, task.task_id) | |
| if discovered: | |
| self._record_discovered_signals(discovered) | |
| return f"Validated {doc_id}. Potential inconsistencies detected: {', '.join(discovered)}" | |
| return f"Validated {doc_id}. No direct inconsistency detected." | |
| if action.action_type == "flag_fraud_signal": | |
| flag_id = str(action.parameters.get("flag_id", "")).strip() | |
| evidence = str(action.parameters.get("evidence", "")).strip() | |
| if not flag_id: | |
| raise ValueError("'flag_id' is required for flag_fraud_signal") | |
| if not evidence: | |
| raise ValueError("'evidence' is required for flag_fraud_signal") | |
| if flag_id in self._flags_raised: | |
| self._exploit_penalty += 0.05 | |
| if flag_id not in self._flags_raised: | |
| self._flags_raised.append(flag_id) | |
| self._evidence_total += 1 | |
| if flag_id in task.expected_signals: | |
| if flag_id not in self._discovered_signals: | |
| self._state.penalty_total += 0.08 | |
| self._exploit_penalty += 0.02 | |
| return ( | |
| f"Fraud signal '{flag_id}' was raised before it was discovered. " | |
| "Investigate first, then flag with grounded evidence." | |
| ) | |
| hints = get_evidence_keyword_hints(task.task_id, flag_id) | |
| evidence_lc = evidence.lower() | |
| if not hints or any(h in evidence_lc for h in hints): | |
| self._evidence_hits += 1 | |
| else: | |
| self._exploit_penalty += 0.02 | |
| if flag_id not in self._found_signals: | |
| self._found_signals.append(flag_id) | |
| self._last_progress_step = self._state.step_number | |
| return f"Fraud signal '{flag_id}' logged with evidence." | |
| self._false_flags += 1 | |
| return f"Fraud signal '{flag_id}' logged, but does not match ground-truth indicators." | |
| if action.action_type == "estimate_payout": | |
| amount = action.parameters.get("amount_inr") | |
| if amount is None: | |
| raise ValueError("'amount_inr' is required for estimate_payout") | |
| try: | |
| payout = float(amount) | |
| except (TypeError, ValueError) as exc: | |
| raise ValueError("'amount_inr' must be numeric") from exc | |
| self._state.payout_estimate_inr = payout | |
| return f"Payout estimate set to INR {payout:.2f}." | |
| if action.action_type == "query_linked_claim": | |
| claim_id = str(action.parameters.get("claim_id", "")).strip() | |
| if not claim_id: | |
| raise ValueError("'claim_id' is required for query_linked_claim") | |
| full_linked = self._payload.get("_full_linked_claims", self._payload.get("linked_claims", [])) | |
| match = next((c for c in full_linked if c.get("claim_id") == claim_id), None) | |
| if match is None: | |
| raise ValueError(f"Linked claim '{claim_id}' not found") | |
| # Reveal full detail in the visible linked claims list for this session | |
| already_visible = any( | |
| c.get("claim_id") == claim_id and len(c) > 2 | |
| for c in self._visible_linked_claims | |
| ) | |
| if not already_visible: | |
| self._visible_linked_claims = [ | |
| deepcopy(match) if c.get("claim_id") == claim_id else c | |
| for c in self._visible_linked_claims | |
| ] | |
| self._queried_claims.add(claim_id) | |
| self._last_progress_step = self._state.step_number | |
| # Dynamic ring expansion: after querying 2 existing claims, the 4th | |
| # hidden claim (CLM-GROUP-304) surfaces in linked_claims. | |
| expansion_hint = "" | |
| if len(self._queried_claims) >= 2: | |
| full_linked = self._payload.get("_full_linked_claims", []) | |
| hidden = [ | |
| c for c in full_linked | |
| if c.get("_hidden_until_queries", 0) <= len(self._queried_claims) | |
| and not any(v.get("claim_id") == c["claim_id"] for v in self._visible_linked_claims) | |
| ] | |
| for new_claim in hidden: | |
| stub = {"claim_id": new_claim["claim_id"], "claimant": new_claim["claimant"]} | |
| self._visible_linked_claims.append(stub) | |
| expansion_hint = ( | |
| f" NEW: A previously unknown linked claim {new_claim['claim_id']} " | |
| f"({new_claim['claimant']}) has surfaced. Query it for full details." | |
| ) | |
| # After querying 2+ linked claims, the shared emergency contact becomes detectable. | |
| hint = "" | |
| if len(self._queried_claims) >= 2: | |
| queried_data = [ | |
| c for c in self._visible_linked_claims | |
| if c.get("claim_id") in self._queried_claims and len(c) > 2 | |
| ] | |
| contacts = [c.get("emergency_contact") for c in queried_data if c.get("emergency_contact")] | |
| unique_contacts = set(contacts) | |
| if len(contacts) > 1 and len(unique_contacts) == 1: | |
| # NEW-7 fix: previously this only emitted a hint string but | |
| # never recorded shared_emergency_contact in the discovered | |
| # set, so distribution_shift_claim agents could not safely | |
| # flag the signal (it'd trigger the "raised before | |
| # discovered" penalty). Now we auto-record so cross-claim | |
| # contact-match becomes a first-class discovery — symmetric | |
| # to the broker discovery below. | |
| self._record_discovered_signals(["shared_emergency_contact"]) | |
| hint = ( | |
| f" Cross-claim pattern detected: all queried claims share " | |
| f"emergency_contact={contacts[0]} (shared_emergency_contact signal recorded)." | |
| ) | |
| # Querying CLM-GROUP-304 reveals clustered_policy_broker signal | |
| if match.get("broker_id") and claim_id == "CLM-GROUP-304": | |
| self._record_discovered_signals(["clustered_policy_broker"]) | |
| hint += " All queried claims share broker_id=BRK-441 (clustered_policy_broker signal)." | |
| # NEW-7 fix: broaden broker discovery to distribution_shift_claim | |
| # (CLM-DIST-* linked claims). Once 2+ CLM-DIST-* claims have been | |
| # queried and the current match has a broker_id, the broker cluster | |
| # is observable — symmetric to coordinated_fraud's CLM-GROUP-304 | |
| # special case. Without this, distribution_shift_claim's | |
| # clustered_policy_broker signal was never discoverable. | |
| if ( | |
| match.get("broker_id") | |
| and claim_id.startswith("CLM-DIST-") | |
| and len(self._queried_claims) >= 2 | |
| ): | |
| self._record_discovered_signals(["clustered_policy_broker"]) | |
| hint += ( | |
| f" All queried CLM-DIST-* claims share broker_id={match['broker_id']} " | |
| "(clustered_policy_broker signal recorded)." | |
| ) | |
| return f"Linked claim detail retrieved for {claim_id}: {match}{hint}{expansion_hint}" | |
| if action.action_type in { | |
| "approve_claim", "deny_claim", "request_investigation", "escalate_to_human" | |
| }: | |
| # Normalise escalate_to_human → request_investigation for legacy grader | |
| canonical_decision = ( | |
| "request_investigation" | |
| if action.action_type == "escalate_to_human" | |
| else action.action_type | |
| ) | |
| self._state.final_decision = canonical_decision | |
| self._state.done = True | |
| self._state.status = ClaimStatus.DECIDED | |
| # Capture Literal confidence and convert for Brier-score compatibility | |
| if action.confidence is not None: | |
| conf_str = str(action.confidence) | |
| self._agent_confidence_str = conf_str | |
| self._agent_confidence = _CONFIDENCE_TO_FLOAT.get(conf_str) | |
| # Compute DebateFloor calibration reward via 3x2 matrix | |
| ground_truth = _TASK_GROUND_TRUTH.get(self._state.task_id, "deny_claim") | |
| # Map escalate_to_human ground truth to canonical for comparison | |
| effective_decision = action.action_type | |
| effective_ground_truth = ( | |
| "escalate_to_human" | |
| if ground_truth == "request_investigation" | |
| else ground_truth | |
| ) | |
| # HIGH-2 fix: use the global cross-session history so anti-gaming | |
| # detection actually fires during concurrent GRPO rollouts. The | |
| # per-instance _episode_history is kept only for per-session debug. | |
| global_history = record_episode_confidence(conf_str) | |
| self._calibration_score = compute_calibration_reward( | |
| effective_decision, conf_str, effective_ground_truth, | |
| global_history, | |
| ) | |
| self._episode_history.append({"confidence": conf_str}) | |
| if canonical_decision == "request_investigation": | |
| targets = action.parameters.get("target_claim_ids", []) | |
| if isinstance(targets, list): | |
| self._investigation_targets = [str(t) for t in targets] | |
| else: | |
| raise ValueError("'target_claim_ids' must be a list for request_investigation") | |
| reason = str(action.parameters.get("reason", "")).strip() | |
| if not reason and action.action_type not in {"approve_claim", "escalate_to_human"}: | |
| self._state.penalty_total += 0.03 | |
| self._state.status = ClaimStatus.CLOSED | |
| return f"Final decision submitted: {action.action_type}." | |
| if action.action_type == "query_historical_data": | |
| # Alias for lookup_policy_history — used by distribution_shift_claim task | |
| if self._policy_history_checked: | |
| self._exploit_penalty += 0.03 | |
| return "Historical data already retrieved. No new information available." | |
| self._policy_history_checked = True | |
| task = self._runtime_task or build_runtime_task(self._state.task_id) | |
| if task.task_id in {"contradictory_claim", "distribution_shift_claim"}: | |
| self._record_discovered_signals(["prior_similar_claim"]) | |
| if task.task_id == "identity_fraud": | |
| history = task.policy_history | |
| if history.get("policy_age_days", 999) <= 30: | |
| self._record_discovered_signals(["recent_policy_purchase"]) | |
| return ( | |
| "Historical data retrieved. Cross-claim patterns and policy history available. " | |
| "Prior claim activity and linked policy data surfaced." | |
| ) | |
| if action.action_type == "verify_provider_registration": | |
| task = self._runtime_task or build_runtime_task(self._state.task_id) | |
| if task.task_id not in {"distribution_shift_claim"}: | |
| raise ValueError("'verify_provider_registration' is only available for distribution_shift_claim") | |
| self._record_discovered_signals(["unregistered_provider", "invalid_gst_registration"]) | |
| return "Provider registration check: hospital not found in IRDAI registry. GST number invalid." | |
| if action.action_type == "convene_debate_panel": | |
| if self._debate_convened: | |
| self._exploit_penalty += 0.03 | |
| return "Debate panel already convened this episode. Proceed to terminal decision." | |
| self._debate_convened = True | |
| self._debate_transcript = self._generate_debate_transcript() | |
| self._last_progress_step = self._state.step_number | |
| return ( | |
| f"Debate panel convened. " | |
| f"Prosecutor: {self._debate_transcript['prosecutor_argument'][:80]}... " | |
| f"Defender: {self._debate_transcript['defender_argument'][:80]}... " | |
| f"Panel verdict: {self._debate_transcript['panel_verdict']}. " | |
| "Review transcript in observation.debate_transcript, then make your final decision." | |
| ) | |
| raise ValueError(f"Unsupported action_type '{action.action_type}'") | |
| def _generate_debate_transcript(self) -> Dict[str, Any]: | |
| """Generate a structured prosecutor vs defender debate based on investigation state.""" | |
| task = self._runtime_task | |
| found = self._found_signals | |
| discovered = self._discovered_signals | |
| claimant_name = self._payload.get("claimant", {}).get("name", "the claimant") | |
| incident_type = self._payload.get("incident", {}).get("type", "the incident") | |
| # Prosecutor builds case from discovered and flagged signals | |
| if found: | |
| fraud_signals_str = ", ".join(found) | |
| prosecutor = ( | |
| f"PROSECUTOR: The evidence strongly suggests fraud. " | |
| f"Investigation has uncovered {len(found)} fraud signal(s): {fraud_signals_str}. " | |
| f"These signals are consistent with {task.task_id.replace('_', ' ')} fraud patterns. " | |
| f"I recommend denial or escalation — approving this claim would reward deliberate deception." | |
| ) | |
| prosecutor_strength = "STRONG" if len(found) >= 2 else "MODERATE" | |
| elif discovered: | |
| prosecutor = ( | |
| f"PROSECUTOR: Suspicious indicators have been discovered: {', '.join(discovered)}. " | |
| f"While not yet formally flagged, these anomalies warrant serious scrutiny. " | |
| f"The claim by {claimant_name} regarding {incident_type} shows red flags." | |
| ) | |
| prosecutor_strength = "WEAK" | |
| else: | |
| prosecutor = ( | |
| f"PROSECUTOR: No fraud signals have been found yet, but the investigation " | |
| f"may be incomplete. More documents should be validated before approval. " | |
| f"Insufficient investigation is itself a risk." | |
| ) | |
| prosecutor_strength = "INSUFFICIENT" | |
| # Defender builds case from clean documents and policy context | |
| doc_count = len(self._payload.get("documents", [])) | |
| policy_age = self._payload.get("_policy_history", {}).get("policy_age_days", 0) | |
| if task and task.task_id == "clean_claim": | |
| defender = ( | |
| f"DEFENDER: All {doc_count} documents are internally consistent. " | |
| f"Claimant {claimant_name} has a clean policy history. " | |
| f"No fraud indicators found. This is a legitimate claim — denial would be unjust." | |
| ) | |
| defender_strength = "STRONG" | |
| elif found and len(found) >= len(task.expected_signals if task else []) * 0.6: | |
| defender = ( | |
| f"DEFENDER: While anomalies exist, the core claim documentation ({doc_count} docs) " | |
| f"has not been fully discredited. Some apparent inconsistencies may have innocent explanations. " | |
| f"Burden of proof requires clear evidence, not suspicion." | |
| ) | |
| defender_strength = "WEAK" | |
| else: | |
| defender = ( | |
| f"DEFENDER: The claim has {doc_count} supporting documents submitted on time. " | |
| f"Without confirmed fraud signals, denial would expose the insurer to legal challenge. " | |
| f"Claimant {claimant_name} deserves due process. Standard processing is warranted." | |
| ) | |
| defender_strength = "MODERATE" | |
| # Panel verdict: which side has stronger case | |
| strength_rank = {"STRONG": 3, "MODERATE": 2, "WEAK": 1, "INSUFFICIENT": 0} | |
| p_rank = strength_rank.get(prosecutor_strength, 0) | |
| d_rank = strength_rank.get(defender_strength, 0) | |
| if p_rank > d_rank: | |
| verdict = f"Panel leans PROSECUTION ({prosecutor_strength} case). Recommended action: deny_claim or escalate_to_human." | |
| lean = "prosecution" | |
| elif d_rank > p_rank: | |
| verdict = f"Panel leans DEFENSE ({defender_strength} case). Recommended action: approve_claim." | |
| lean = "defense" | |
| else: | |
| verdict = "Panel is SPLIT — both sides have comparable arguments. Judge must use independent judgment and declare LOW confidence." | |
| lean = "split" | |
| return { | |
| "prosecutor_argument": prosecutor, | |
| "prosecutor_strength": prosecutor_strength, | |
| "defender_argument": defender, | |
| "defender_strength": defender_strength, | |
| "panel_verdict": verdict, | |
| "panel_lean": lean, | |
| "signals_at_debate": list(found), | |
| "step_convened": self._state.step_number, | |
| } | |
| def _discover_signals_from_document(self, doc_id: str, task_id: str) -> List[str]: | |
| if task_id == "clean_claim": | |
| return [] | |
| mapping: Dict[str, Dict[str, List[str]]] = { | |
| "contradictory_claim": { | |
| "DOC-10": ["date_mismatch"], | |
| "DOC-11": ["date_mismatch"], | |
| "DOC-12": ["cost_inflation"], | |
| "DOC-13": ["signature_mismatch"], | |
| }, | |
| "coordinated_fraud": { | |
| "DOC-21": ["shared_repair_shop_far"], | |
| "DOC-22": ["near_identical_descriptions"], | |
| "DOC-23": ["recent_policy_cluster"], | |
| }, | |
| "identity_fraud": { | |
| "DOC-31": ["identity_mismatch"], | |
| "DOC-32": ["hospital_no_record"], | |
| # DOC-33 (policy_inception) does NOT reveal recent_policy_purchase here; | |
| # that signal is only discoverable via lookup_policy_history. | |
| "DOC-34": ["dob_inconsistency"], | |
| }, | |
| # NEW-7 fix: distribution_shift_claim previously had NO doc-level | |
| # discovery path for any expected_signal. validate_document(...) for | |
| # DOC-41/42/43 returned [], so the only way an honest agent could | |
| # avoid the "raised before discovered" penalty was to skip flagging | |
| # entirely (capping evidence_quality at 0.0 for the task). The | |
| # mapping below mirrors coordinated_fraud: | |
| # DOC-41 (claim_form, declared_cost + claim_date metadata) → | |
| # surfaces recent_policy_cluster (the form's metadata is what | |
| # lets a reviewer notice the recent-policy-window indicator). | |
| # DOC-42 (garage_estimate, "FastRepair Hub Whitefield") → | |
| # surfaces shared_repair_shop_far (the shop name is the | |
| # evidence anchor for the geographic ring indicator). | |
| # DOC-43 (police_report) reveals nothing direct; cross-claim only. | |
| # shared_emergency_contact + clustered_policy_broker are still | |
| # discovered via query_linked_claim (see _apply_action below). | |
| "distribution_shift_claim": { | |
| "DOC-41": ["recent_policy_cluster"], | |
| "DOC-42": ["shared_repair_shop_far"], | |
| }, | |
| } | |
| signal_map = mapping.get(task_id, {}) | |
| signals = list(signal_map.get(doc_id, [])) | |
| # NOTE: shared_emergency_contact is NOT discoverable from primary documents. | |
| # It can only be found by calling query_linked_claim on at least 2 linked claims, | |
| # then flag_fraud_signal with evidence from the queried data. This enforces | |
| # genuine multi-hop reasoning rather than single-step observation reading. | |
| # Keep signal order deterministic and unique. | |
| seen: set[str] = set() | |
| unique_signals: List[str] = [] | |
| for signal in signals: | |
| if signal not in seen: | |
| seen.add(signal) | |
| unique_signals.append(signal) | |
| return unique_signals | |
| def _record_discovered_signals(self, signals: List[str]) -> None: | |
| progressed = False | |
| for signal in signals: | |
| if signal not in self._discovered_signals: | |
| self._discovered_signals.append(signal) | |
| progressed = True | |
| if signal not in self._found_signals: | |
| self._found_signals.append(signal) | |
| if progressed: | |
| self._last_progress_step = self._state.step_number | |
    def _build_observation(self, message: str) -> InsuranceClaimObservation:
        """Snapshot current episode state into an observation, including the
        rule-based reward breakdown for the trajectory so far.

        Also mirrors the internal signal/flag telemetry onto the serialisable
        state object before computing the reward.
        """
        task = self._runtime_task or build_runtime_task(self._state.task_id)
        self._state.flags_raised = deepcopy(self._flags_raised)
        self._state.discovered_signals = deepcopy(self._discovered_signals)
        self._state.found_signals = deepcopy(self._found_signals)
        if self._state.step_number == 0:
            # No actions taken yet — reward must be 0.0 so the trajectory is meaningful
            evidence_quality_score = 0.0
        elif len(task.expected_signals) == 0:
            # Clean task: any false flag zeroes evidence quality outright.
            evidence_quality_score = 1.0 if self._false_flags == 0 else 0.0
        else:
            # Fraction of raised flags backed by matching evidence keywords.
            evidence_quality_score = (
                float(self._evidence_hits) / float(self._evidence_total)
                if self._evidence_total > 0
                else 0.0
            )
        reward_breakdown = compute_reward_breakdown(
            task_id=task.task_id,
            expected_signals=task.expected_signals,
            found_signals=self._found_signals,
            false_flags=self._false_flags,
            step_number=self._state.step_number,
            max_steps=self._state.max_steps,
            final_decision=self._state.final_decision,
            allowed_decisions=task.allowed_final_decisions,
            payout_estimate_inr=self._state.payout_estimate_inr,
            payout_band=task.payout_band,
            investigation_targets=self._investigation_targets,
            evidence_quality_score=evidence_quality_score,
            # Exploit penalty is capped so gaming detection can't dominate.
            exploit_penalty=min(self._exploit_penalty, 0.5),
            penalty_total=self._state.penalty_total,
            queried_claims=self._queried_claims,
            agent_confidence=self._agent_confidence,
            ground_truth_confidence=task.ground_truth_confidence,
            calibration_override=self._calibration_score,
        )
        # deepcopy everywhere so callers cannot mutate internal state via the
        # observation. rubric_reward/rubric_components are filled in later by
        # _sync_rubric_telemetry.
        return InsuranceClaimObservation(
            claim_id=self._payload["claim_id"],
            task_id=self._payload["task_id"],
            claimant=deepcopy(self._payload["claimant"]),
            incident=deepcopy(self._payload["incident"]),
            documents=deepcopy(self._payload["documents"]),
            linked_claims=deepcopy(self._visible_linked_claims),
            action_history=deepcopy(self._action_history),
            available_actions=deepcopy(self._payload["available_actions"]),
            step_number=self._state.step_number,
            max_steps=self._state.max_steps,
            investigation_budget=self._payload.get("investigation_budget", 0),
            budget_remaining=self._budget_remaining,
            flags_raised=deepcopy(self._flags_raised),
            discovered_signals=deepcopy(self._discovered_signals),
            status=self._state.status,
            message=message,
            confidence_required=True,
            done=self._state.done,
            reward=reward_breakdown.total,
            rubric_reward=0.0,
            rubric_components={},
            metadata={
                "last_action_error": self._state.last_action_error,
                "investigation_targets": self._investigation_targets,
                "variant_id": self._payload.get("variant_id", 0),
                "evidence_hits": self._evidence_hits,
                "evidence_total": self._evidence_total,
                "exploit_penalty": round(self._exploit_penalty, 4),
                "policy_history_checked": self._policy_history_checked,
                "identity_verified": self._identity_verified,
                "agent_confidence": self._agent_confidence_str,
                "calibration_score": self._calibration_score,
                "budget_remaining": self._budget_remaining,
                "discovered_signals": deepcopy(self._discovered_signals),
                "compared_pairs": [list(p) for p in self._compared_pairs],
            },
            reward_breakdown=reward_breakdown,
            debate_transcript=deepcopy(self._debate_transcript),
        )
| def _sync_rubric_telemetry( | |
| self, | |
| action: InsuranceClaimAction, | |
| observation: InsuranceClaimObservation, | |
| ) -> None: | |
| rubric_reward = self._apply_rubric(action, observation) | |
| observation.rubric_reward = float(rubric_reward) | |
| if self.rubric is not None and hasattr(self.rubric, "component_scores"): | |
| component_scores = self.rubric.component_scores() | |
| observation.rubric_components = dict(component_scores) | |
| self._last_rubric_components = dict(component_scores) | |
| observation.metadata["rubric_components"] = dict(component_scores) | |
| else: | |
| self._last_rubric_components = {} | |
| observation.metadata["rubric_components"] = {} | |
def available_task_ids() -> List[str]:
    """Return every task id registered in the TASKS catalogue, in registry order."""
    return [task_name for task_name in TASKS]