# debatefloor/app/environment.py
# Mirrored to the Hugging Face Space from git commit d05fcb5 (b4ac377, verified).
from __future__ import annotations
from copy import deepcopy
from typing import Any, Dict, List, Optional
from uuid import uuid4
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import EnvironmentMetadata
from .rubrics import DebateFloorRubric
from .models import (
ClaimStatus,
InsuranceClaimAction,
InsuranceClaimObservation,
InsuranceClaimState,
)
from .tasks import (
ACTION_COSTS,
TASKS,
RuntimeTask,
build_runtime_task,
build_initial_payload,
compute_reward_breakdown,
get_compare_signals,
get_evidence_keyword_hints,
get_task_definition,
)
from server.calibration_grader import calibration_reward as compute_calibration_reward
from .session_store import record_episode_confidence
# Map Literal confidence levels to float for Brier-score compatibility
# ("HIGH" behaves like a 90% probability, "MED" 60%, "LOW" 30%).
_CONFIDENCE_TO_FLOAT: Dict[str, float] = {"HIGH": 0.9, "MED": 0.6, "LOW": 0.3}
# Correct terminal action for each task — used by calibration grader
# (keys are task ids; values are the action_type an ideal agent submits).
_TASK_GROUND_TRUTH: Dict[str, str] = {
    "clean_claim": "approve_claim",
    "contradictory_claim": "deny_claim",
    "coordinated_fraud": "escalate_to_human",
    "identity_fraud": "deny_claim",
    "distribution_shift_claim": "escalate_to_human",
}
class InsuranceClaimEnvironment(
    Environment[InsuranceClaimAction, InsuranceClaimObservation, InsuranceClaimState]
):
    """Insurance-claim fraud-investigation environment (OpenEnv interface).

    One instance holds the full per-episode investigation state: the task
    payload, discovered/flagged fraud signals, exploit penalties, budget,
    debate transcript, and the agent's declared confidence.
    """

    # NOW ACTUALLY TRUE - session-managed via main.py
    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self):
        """Initialize empty episode state; call reset() before stepping."""
        super().__init__(rubric=DebateFloorRubric())
        self._state = InsuranceClaimState(episode_id=str(uuid4()), step_count=0)
        self._payload: Dict[str, Any] = {}          # task payload shown to the agent
        self._action_history: List[Dict[str, Any]] = []
        self._flags_raised: List[str] = []          # all flag_ids the agent raised
        self._found_signals: List[str] = []         # flags matching ground truth
        self._discovered_signals: List[str] = []    # signals surfaced by investigation
        self._false_flags: int = 0
        self._investigation_targets: List[str] = []
        self._evidence_hits: int = 0                # flags with keyword-matching evidence
        self._evidence_total: int = 0
        self._exploit_penalty: float = 0.0          # accumulated anti-gaming penalty
        self._request_info_streak: int = 0
        self._last_progress_step: int = 0           # step at which progress last occurred
        self._runtime_task: RuntimeTask | None = None
        self._last_message = "Environment initialized"
        self._queried_claims: set[str] = set()
        self._visible_linked_claims: list = []      # linked claims currently revealed
        self._policy_history_checked: bool = False
        self._identity_verified: bool = False
        self._agent_confidence: Optional[float] = None
        self._agent_confidence_str: Optional[str] = None  # "HIGH" | "MED" | "LOW"
        self._calibration_score: Optional[float] = None  # from 3x2 matrix
        self._episode_history: List[Dict] = []  # for anti-gaming detection
        self._budget_remaining: int = 0
        self._compared_pairs: set[tuple] = set()    # document pairs already compared
        self._debate_transcript: Optional[Dict[str, Any]] = None
        self._debate_convened: bool = False
        self._last_rubric_components: Dict[str, float] = {}
    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        task_id: Optional[str] = None,
        **kwargs: Any,
    ) -> InsuranceClaimObservation:
        """Start a fresh episode and return the initial observation.

        Clears every piece of per-episode state (mirroring __init__),
        builds a runtime task (defaulting to 'clean_claim'), and resets the
        rubric. `seed` controls task variant generation; `episode_id` lets a
        session manager pin the id.
        """
        self._reset_rubric()
        # task_id may also arrive via kwargs (legacy callers).
        if task_id is None:
            task_id = kwargs.get("task_id")
        selected_task = task_id or "clean_claim"
        task = build_runtime_task(selected_task, seed=seed)
        self._runtime_task = task
        self._payload = build_initial_payload(task)
        # Reset the full per-episode attribute checklist (keep in sync with __init__).
        self._action_history = []
        self._flags_raised = []
        self._found_signals = []
        self._discovered_signals = []
        self._false_flags = 0
        self._investigation_targets = []
        self._evidence_hits = 0
        self._evidence_total = 0
        self._exploit_penalty = 0.0
        self._request_info_streak = 0
        self._last_progress_step = 0
        self._queried_claims = set()
        # Copy so in-episode mutation never leaks back into the payload.
        self._visible_linked_claims = deepcopy(self._payload.get("linked_claims", []))
        self._policy_history_checked = False
        self._identity_verified = False
        self._agent_confidence = None
        self._agent_confidence_str = None
        self._calibration_score = None
        self._budget_remaining = self._payload.get("investigation_budget", 0)
        self._compared_pairs = set()
        self._debate_transcript = None
        self._debate_convened = False
        self._last_rubric_components = {}
        self._last_message = (
            f"Task '{task.task_id}' loaded (variant={task.variant_id}). Start investigation."
        )
        self._state = InsuranceClaimState(
            episode_id=episode_id or str(uuid4()),
            step_count=0,
            task_id=task.task_id,
            claim_id=task.claim_id,
            step_number=0,
            max_steps=task.max_steps,
            status=ClaimStatus.OPEN,
            flags_raised=[],
            discovered_signals=[],
            found_signals=[],
            penalty_total=0.0,
            done=False,
            last_action_error=None,
            payout_estimate_inr=None,
            final_decision=None,
            final_score=0.0,
        )
        return self._apply_transform(self._build_observation(message=self._last_message))
def step(
self,
action: InsuranceClaimAction,
timeout_s: Optional[float] = None,
**kwargs: Any,
) -> InsuranceClaimObservation:
if self._state.task_id == "":
return self.reset(task_id="clean_claim")
if self._state.done:
return self._apply_transform(
self._build_observation(
message="Episode already complete. Call reset() to start a new episode."
)
)
self._state.step_count += 1
self._state.step_number += 1
self._state.status = ClaimStatus.INVESTIGATING
self._state.last_action_error = None
try:
message = self._apply_action(action)
self._last_message = message
except ValueError as exc:
self._state.last_action_error = str(exc)
self._state.penalty_total += 0.05
self._last_message = f"Invalid action: {exc}"
self._action_history.append(
{
"step": self._state.step_number,
"action_type": action.action_type,
"parameters": deepcopy(action.parameters),
"reasoning": action.reasoning,
}
)
if not self._state.done and (self._state.step_number - self._last_progress_step) >= 4:
self._exploit_penalty += 0.01
if self._state.step_number >= self._state.max_steps and not self._state.done:
self._state.done = True
self._state.status = ClaimStatus.CLOSED
self._last_message = "Max steps reached before final adjudication. Episode closed."
observation = self._build_observation(message=self._last_message)
self._sync_rubric_telemetry(action, observation)
self._state.final_score = float(observation.reward)
return self._apply_transform(observation)
    @property
    def state(self) -> InsuranceClaimState:
        """Current episode state (mutable reference, not a copy)."""
        return self._state
def get_metadata(self) -> EnvironmentMetadata:
return EnvironmentMetadata(
name="debatefloor_insurance_calibration_env",
description=(
"OpenEnv insurance claim investigation environment with calibrated "
"confidence rewards and a prosecutor/defender/judge debate panel."
),
version="0.2.3",
author="Team DebateFloor",
documentation_url="https://github.com/AniketAslaliya/debateFloor",
)
def _apply_action(self, action: InsuranceClaimAction) -> str:
task = self._runtime_task or build_runtime_task(self._state.task_id)
# Deduct investigation budget; overage adds 0.02 penalty per unit
cost = ACTION_COSTS.get(action.action_type, 1)
self._budget_remaining -= cost
if self._budget_remaining < 0:
self._state.penalty_total += 0.02 # per unit over budget
if action.action_type == "request_information":
self._request_info_streak += 1
if self._request_info_streak > 2:
self._exploit_penalty += 0.03
if self._request_info_streak > 1:
self._state.penalty_total += 0.02
return "Additional information requested. Useful but consumes time and SLA budget."
self._request_info_streak = 0
if action.action_type == "lookup_policy_history":
task = self._runtime_task or build_runtime_task(self._state.task_id)
if self._policy_history_checked:
# Second lookup is an exploit — no new info
self._exploit_penalty += 0.03
return "Policy history already retrieved. No new information available."
self._policy_history_checked = True
history = task.policy_history
# For contradictory_claim: looking up history reveals the prior similar claim signal
if task.task_id == "contradictory_claim":
self._record_discovered_signals(["prior_similar_claim"])
# For identity_fraud: policy_age_days being very low reveals recent_policy_purchase
if task.task_id == "identity_fraud":
if history.get("policy_age_days", 999) <= 30:
self._record_discovered_signals(["recent_policy_purchase"])
return (
f"Policy history retrieved: {history['prior_claims']} prior claims. "
f"Customer for {history['years_as_customer']} years. "
f"Policy age: {history['policy_age_days']} days. "
f"Risk score: {history['risk_score']}. Note: {history['note']}"
)
if action.action_type == "verify_identity":
task = self._runtime_task or build_runtime_task(self._state.task_id)
if task.task_id != "identity_fraud":
raise ValueError("'verify_identity' is only available for the identity_fraud task")
if self._identity_verified:
self._exploit_penalty += 0.03
return "Identity verification already performed. No new information."
self._identity_verified = True
self._record_discovered_signals(["identity_mismatch", "hospital_no_record"])
return (
"Identity verification FAILED. National registry has no record matching "
"claimant name 'Aarav Mehta' with ID suffix 7821. "
"Hospital records show admission under a different name ('Aarav Kumar') with DOB mismatch. "
"KYC status at policy inception: PENDING — identity was never confirmed."
)
if action.action_type == "compare_documents":
task = self._runtime_task or build_runtime_task(self._state.task_id)
doc_id_a = str(action.parameters.get("doc_id_a", "")).strip()
doc_id_b = str(action.parameters.get("doc_id_b", "")).strip()
if not doc_id_a or not doc_id_b:
raise ValueError("'doc_id_a' and 'doc_id_b' are required for compare_documents")
if doc_id_a == doc_id_b:
raise ValueError("'doc_id_a' and 'doc_id_b' must be different documents")
all_doc_ids = {d["doc_id"] for d in self._payload["documents"]}
for did in (doc_id_a, doc_id_b):
if did not in all_doc_ids:
raise ValueError(f"Unknown doc_id '{did}'")
pair = (doc_id_a, doc_id_b)
pair_rev = (doc_id_b, doc_id_a)
if pair in self._compared_pairs or pair_rev in self._compared_pairs:
self._exploit_penalty += 0.03
return f"Documents {doc_id_a} and {doc_id_b} were already compared. No new findings."
self._compared_pairs.add(pair)
signals = get_compare_signals(task.task_id, doc_id_a, doc_id_b)
if signals:
self._record_discovered_signals(signals)
return (
f"Cross-document comparison of {doc_id_a} vs {doc_id_b} revealed "
f"inconsistencies: {', '.join(signals)}."
)
return f"Cross-document comparison of {doc_id_a} vs {doc_id_b}: documents are consistent."
if action.action_type == "validate_document":
doc_id = str(action.parameters.get("doc_id", "")).strip()
if not doc_id:
raise ValueError("'doc_id' is required for validate_document")
doc = next((d for d in self._payload["documents"] if d.get("doc_id") == doc_id), None)
if doc is None:
raise ValueError(f"Unknown doc_id '{doc_id}'")
discovered = self._discover_signals_from_document(doc_id, task.task_id)
if discovered:
self._record_discovered_signals(discovered)
return f"Validated {doc_id}. Potential inconsistencies detected: {', '.join(discovered)}"
return f"Validated {doc_id}. No direct inconsistency detected."
if action.action_type == "flag_fraud_signal":
flag_id = str(action.parameters.get("flag_id", "")).strip()
evidence = str(action.parameters.get("evidence", "")).strip()
if not flag_id:
raise ValueError("'flag_id' is required for flag_fraud_signal")
if not evidence:
raise ValueError("'evidence' is required for flag_fraud_signal")
if flag_id in self._flags_raised:
self._exploit_penalty += 0.05
if flag_id not in self._flags_raised:
self._flags_raised.append(flag_id)
self._evidence_total += 1
if flag_id in task.expected_signals:
if flag_id not in self._discovered_signals:
self._state.penalty_total += 0.08
self._exploit_penalty += 0.02
return (
f"Fraud signal '{flag_id}' was raised before it was discovered. "
"Investigate first, then flag with grounded evidence."
)
hints = get_evidence_keyword_hints(task.task_id, flag_id)
evidence_lc = evidence.lower()
if not hints or any(h in evidence_lc for h in hints):
self._evidence_hits += 1
else:
self._exploit_penalty += 0.02
if flag_id not in self._found_signals:
self._found_signals.append(flag_id)
self._last_progress_step = self._state.step_number
return f"Fraud signal '{flag_id}' logged with evidence."
self._false_flags += 1
return f"Fraud signal '{flag_id}' logged, but does not match ground-truth indicators."
if action.action_type == "estimate_payout":
amount = action.parameters.get("amount_inr")
if amount is None:
raise ValueError("'amount_inr' is required for estimate_payout")
try:
payout = float(amount)
except (TypeError, ValueError) as exc:
raise ValueError("'amount_inr' must be numeric") from exc
self._state.payout_estimate_inr = payout
return f"Payout estimate set to INR {payout:.2f}."
if action.action_type == "query_linked_claim":
claim_id = str(action.parameters.get("claim_id", "")).strip()
if not claim_id:
raise ValueError("'claim_id' is required for query_linked_claim")
full_linked = self._payload.get("_full_linked_claims", self._payload.get("linked_claims", []))
match = next((c for c in full_linked if c.get("claim_id") == claim_id), None)
if match is None:
raise ValueError(f"Linked claim '{claim_id}' not found")
# Reveal full detail in the visible linked claims list for this session
already_visible = any(
c.get("claim_id") == claim_id and len(c) > 2
for c in self._visible_linked_claims
)
if not already_visible:
self._visible_linked_claims = [
deepcopy(match) if c.get("claim_id") == claim_id else c
for c in self._visible_linked_claims
]
self._queried_claims.add(claim_id)
self._last_progress_step = self._state.step_number
# Dynamic ring expansion: after querying 2 existing claims, the 4th
# hidden claim (CLM-GROUP-304) surfaces in linked_claims.
expansion_hint = ""
if len(self._queried_claims) >= 2:
full_linked = self._payload.get("_full_linked_claims", [])
hidden = [
c for c in full_linked
if c.get("_hidden_until_queries", 0) <= len(self._queried_claims)
and not any(v.get("claim_id") == c["claim_id"] for v in self._visible_linked_claims)
]
for new_claim in hidden:
stub = {"claim_id": new_claim["claim_id"], "claimant": new_claim["claimant"]}
self._visible_linked_claims.append(stub)
expansion_hint = (
f" NEW: A previously unknown linked claim {new_claim['claim_id']} "
f"({new_claim['claimant']}) has surfaced. Query it for full details."
)
# After querying 2+ linked claims, the shared emergency contact becomes detectable.
hint = ""
if len(self._queried_claims) >= 2:
queried_data = [
c for c in self._visible_linked_claims
if c.get("claim_id") in self._queried_claims and len(c) > 2
]
contacts = [c.get("emergency_contact") for c in queried_data if c.get("emergency_contact")]
unique_contacts = set(contacts)
if len(contacts) > 1 and len(unique_contacts) == 1:
# NEW-7 fix: previously this only emitted a hint string but
# never recorded shared_emergency_contact in the discovered
# set, so distribution_shift_claim agents could not safely
# flag the signal (it'd trigger the "raised before
# discovered" penalty). Now we auto-record so cross-claim
# contact-match becomes a first-class discovery — symmetric
# to the broker discovery below.
self._record_discovered_signals(["shared_emergency_contact"])
hint = (
f" Cross-claim pattern detected: all queried claims share "
f"emergency_contact={contacts[0]} (shared_emergency_contact signal recorded)."
)
# Querying CLM-GROUP-304 reveals clustered_policy_broker signal
if match.get("broker_id") and claim_id == "CLM-GROUP-304":
self._record_discovered_signals(["clustered_policy_broker"])
hint += " All queried claims share broker_id=BRK-441 (clustered_policy_broker signal)."
# NEW-7 fix: broaden broker discovery to distribution_shift_claim
# (CLM-DIST-* linked claims). Once 2+ CLM-DIST-* claims have been
# queried and the current match has a broker_id, the broker cluster
# is observable — symmetric to coordinated_fraud's CLM-GROUP-304
# special case. Without this, distribution_shift_claim's
# clustered_policy_broker signal was never discoverable.
if (
match.get("broker_id")
and claim_id.startswith("CLM-DIST-")
and len(self._queried_claims) >= 2
):
self._record_discovered_signals(["clustered_policy_broker"])
hint += (
f" All queried CLM-DIST-* claims share broker_id={match['broker_id']} "
"(clustered_policy_broker signal recorded)."
)
return f"Linked claim detail retrieved for {claim_id}: {match}{hint}{expansion_hint}"
if action.action_type in {
"approve_claim", "deny_claim", "request_investigation", "escalate_to_human"
}:
# Normalise escalate_to_human → request_investigation for legacy grader
canonical_decision = (
"request_investigation"
if action.action_type == "escalate_to_human"
else action.action_type
)
self._state.final_decision = canonical_decision
self._state.done = True
self._state.status = ClaimStatus.DECIDED
# Capture Literal confidence and convert for Brier-score compatibility
if action.confidence is not None:
conf_str = str(action.confidence)
self._agent_confidence_str = conf_str
self._agent_confidence = _CONFIDENCE_TO_FLOAT.get(conf_str)
# Compute DebateFloor calibration reward via 3x2 matrix
ground_truth = _TASK_GROUND_TRUTH.get(self._state.task_id, "deny_claim")
# Map escalate_to_human ground truth to canonical for comparison
effective_decision = action.action_type
effective_ground_truth = (
"escalate_to_human"
if ground_truth == "request_investigation"
else ground_truth
)
# HIGH-2 fix: use the global cross-session history so anti-gaming
# detection actually fires during concurrent GRPO rollouts. The
# per-instance _episode_history is kept only for per-session debug.
global_history = record_episode_confidence(conf_str)
self._calibration_score = compute_calibration_reward(
effective_decision, conf_str, effective_ground_truth,
global_history,
)
self._episode_history.append({"confidence": conf_str})
if canonical_decision == "request_investigation":
targets = action.parameters.get("target_claim_ids", [])
if isinstance(targets, list):
self._investigation_targets = [str(t) for t in targets]
else:
raise ValueError("'target_claim_ids' must be a list for request_investigation")
reason = str(action.parameters.get("reason", "")).strip()
if not reason and action.action_type not in {"approve_claim", "escalate_to_human"}:
self._state.penalty_total += 0.03
self._state.status = ClaimStatus.CLOSED
return f"Final decision submitted: {action.action_type}."
if action.action_type == "query_historical_data":
# Alias for lookup_policy_history — used by distribution_shift_claim task
if self._policy_history_checked:
self._exploit_penalty += 0.03
return "Historical data already retrieved. No new information available."
self._policy_history_checked = True
task = self._runtime_task or build_runtime_task(self._state.task_id)
if task.task_id in {"contradictory_claim", "distribution_shift_claim"}:
self._record_discovered_signals(["prior_similar_claim"])
if task.task_id == "identity_fraud":
history = task.policy_history
if history.get("policy_age_days", 999) <= 30:
self._record_discovered_signals(["recent_policy_purchase"])
return (
"Historical data retrieved. Cross-claim patterns and policy history available. "
"Prior claim activity and linked policy data surfaced."
)
if action.action_type == "verify_provider_registration":
task = self._runtime_task or build_runtime_task(self._state.task_id)
if task.task_id not in {"distribution_shift_claim"}:
raise ValueError("'verify_provider_registration' is only available for distribution_shift_claim")
self._record_discovered_signals(["unregistered_provider", "invalid_gst_registration"])
return "Provider registration check: hospital not found in IRDAI registry. GST number invalid."
if action.action_type == "convene_debate_panel":
if self._debate_convened:
self._exploit_penalty += 0.03
return "Debate panel already convened this episode. Proceed to terminal decision."
self._debate_convened = True
self._debate_transcript = self._generate_debate_transcript()
self._last_progress_step = self._state.step_number
return (
f"Debate panel convened. "
f"Prosecutor: {self._debate_transcript['prosecutor_argument'][:80]}... "
f"Defender: {self._debate_transcript['defender_argument'][:80]}... "
f"Panel verdict: {self._debate_transcript['panel_verdict']}. "
"Review transcript in observation.debate_transcript, then make your final decision."
)
raise ValueError(f"Unsupported action_type '{action.action_type}'")
def _generate_debate_transcript(self) -> Dict[str, Any]:
"""Generate a structured prosecutor vs defender debate based on investigation state."""
task = self._runtime_task
found = self._found_signals
discovered = self._discovered_signals
claimant_name = self._payload.get("claimant", {}).get("name", "the claimant")
incident_type = self._payload.get("incident", {}).get("type", "the incident")
# Prosecutor builds case from discovered and flagged signals
if found:
fraud_signals_str = ", ".join(found)
prosecutor = (
f"PROSECUTOR: The evidence strongly suggests fraud. "
f"Investigation has uncovered {len(found)} fraud signal(s): {fraud_signals_str}. "
f"These signals are consistent with {task.task_id.replace('_', ' ')} fraud patterns. "
f"I recommend denial or escalation — approving this claim would reward deliberate deception."
)
prosecutor_strength = "STRONG" if len(found) >= 2 else "MODERATE"
elif discovered:
prosecutor = (
f"PROSECUTOR: Suspicious indicators have been discovered: {', '.join(discovered)}. "
f"While not yet formally flagged, these anomalies warrant serious scrutiny. "
f"The claim by {claimant_name} regarding {incident_type} shows red flags."
)
prosecutor_strength = "WEAK"
else:
prosecutor = (
f"PROSECUTOR: No fraud signals have been found yet, but the investigation "
f"may be incomplete. More documents should be validated before approval. "
f"Insufficient investigation is itself a risk."
)
prosecutor_strength = "INSUFFICIENT"
# Defender builds case from clean documents and policy context
doc_count = len(self._payload.get("documents", []))
policy_age = self._payload.get("_policy_history", {}).get("policy_age_days", 0)
if task and task.task_id == "clean_claim":
defender = (
f"DEFENDER: All {doc_count} documents are internally consistent. "
f"Claimant {claimant_name} has a clean policy history. "
f"No fraud indicators found. This is a legitimate claim — denial would be unjust."
)
defender_strength = "STRONG"
elif found and len(found) >= len(task.expected_signals if task else []) * 0.6:
defender = (
f"DEFENDER: While anomalies exist, the core claim documentation ({doc_count} docs) "
f"has not been fully discredited. Some apparent inconsistencies may have innocent explanations. "
f"Burden of proof requires clear evidence, not suspicion."
)
defender_strength = "WEAK"
else:
defender = (
f"DEFENDER: The claim has {doc_count} supporting documents submitted on time. "
f"Without confirmed fraud signals, denial would expose the insurer to legal challenge. "
f"Claimant {claimant_name} deserves due process. Standard processing is warranted."
)
defender_strength = "MODERATE"
# Panel verdict: which side has stronger case
strength_rank = {"STRONG": 3, "MODERATE": 2, "WEAK": 1, "INSUFFICIENT": 0}
p_rank = strength_rank.get(prosecutor_strength, 0)
d_rank = strength_rank.get(defender_strength, 0)
if p_rank > d_rank:
verdict = f"Panel leans PROSECUTION ({prosecutor_strength} case). Recommended action: deny_claim or escalate_to_human."
lean = "prosecution"
elif d_rank > p_rank:
verdict = f"Panel leans DEFENSE ({defender_strength} case). Recommended action: approve_claim."
lean = "defense"
else:
verdict = "Panel is SPLIT — both sides have comparable arguments. Judge must use independent judgment and declare LOW confidence."
lean = "split"
return {
"prosecutor_argument": prosecutor,
"prosecutor_strength": prosecutor_strength,
"defender_argument": defender,
"defender_strength": defender_strength,
"panel_verdict": verdict,
"panel_lean": lean,
"signals_at_debate": list(found),
"step_convened": self._state.step_number,
}
def _discover_signals_from_document(self, doc_id: str, task_id: str) -> List[str]:
if task_id == "clean_claim":
return []
mapping: Dict[str, Dict[str, List[str]]] = {
"contradictory_claim": {
"DOC-10": ["date_mismatch"],
"DOC-11": ["date_mismatch"],
"DOC-12": ["cost_inflation"],
"DOC-13": ["signature_mismatch"],
},
"coordinated_fraud": {
"DOC-21": ["shared_repair_shop_far"],
"DOC-22": ["near_identical_descriptions"],
"DOC-23": ["recent_policy_cluster"],
},
"identity_fraud": {
"DOC-31": ["identity_mismatch"],
"DOC-32": ["hospital_no_record"],
# DOC-33 (policy_inception) does NOT reveal recent_policy_purchase here;
# that signal is only discoverable via lookup_policy_history.
"DOC-34": ["dob_inconsistency"],
},
# NEW-7 fix: distribution_shift_claim previously had NO doc-level
# discovery path for any expected_signal. validate_document(...) for
# DOC-41/42/43 returned [], so the only way an honest agent could
# avoid the "raised before discovered" penalty was to skip flagging
# entirely (capping evidence_quality at 0.0 for the task). The
# mapping below mirrors coordinated_fraud:
# DOC-41 (claim_form, declared_cost + claim_date metadata) →
# surfaces recent_policy_cluster (the form's metadata is what
# lets a reviewer notice the recent-policy-window indicator).
# DOC-42 (garage_estimate, "FastRepair Hub Whitefield") →
# surfaces shared_repair_shop_far (the shop name is the
# evidence anchor for the geographic ring indicator).
# DOC-43 (police_report) reveals nothing direct; cross-claim only.
# shared_emergency_contact + clustered_policy_broker are still
# discovered via query_linked_claim (see _apply_action below).
"distribution_shift_claim": {
"DOC-41": ["recent_policy_cluster"],
"DOC-42": ["shared_repair_shop_far"],
},
}
signal_map = mapping.get(task_id, {})
signals = list(signal_map.get(doc_id, []))
# NOTE: shared_emergency_contact is NOT discoverable from primary documents.
# It can only be found by calling query_linked_claim on at least 2 linked claims,
# then flag_fraud_signal with evidence from the queried data. This enforces
# genuine multi-hop reasoning rather than single-step observation reading.
# Keep signal order deterministic and unique.
seen: set[str] = set()
unique_signals: List[str] = []
for signal in signals:
if signal not in seen:
seen.add(signal)
unique_signals.append(signal)
return unique_signals
def _record_discovered_signals(self, signals: List[str]) -> None:
progressed = False
for signal in signals:
if signal not in self._discovered_signals:
self._discovered_signals.append(signal)
progressed = True
if signal not in self._found_signals:
self._found_signals.append(signal)
if progressed:
self._last_progress_step = self._state.step_number
    def _build_observation(self, message: str) -> InsuranceClaimObservation:
        """Assemble the full observation for the current state.

        Syncs signal lists into self._state, derives evidence_quality_score,
        computes the reward breakdown, and packages everything (deep-copied
        so the agent cannot mutate internal state) into an observation.
        """
        task = self._runtime_task or build_runtime_task(self._state.task_id)
        self._state.flags_raised = deepcopy(self._flags_raised)
        self._state.discovered_signals = deepcopy(self._discovered_signals)
        self._state.found_signals = deepcopy(self._found_signals)
        if self._state.step_number == 0:
            # No actions taken yet — reward must be 0.0 so the trajectory is meaningful
            evidence_quality_score = 0.0
        elif len(task.expected_signals) == 0:
            # Clean task: perfect score unless the agent raised any false flag.
            evidence_quality_score = 1.0 if self._false_flags == 0 else 0.0
        else:
            # Fraction of flag attempts whose evidence matched keyword hints.
            evidence_quality_score = (
                float(self._evidence_hits) / float(self._evidence_total)
                if self._evidence_total > 0
                else 0.0
            )
        reward_breakdown = compute_reward_breakdown(
            task_id=task.task_id,
            expected_signals=task.expected_signals,
            found_signals=self._found_signals,
            false_flags=self._false_flags,
            step_number=self._state.step_number,
            max_steps=self._state.max_steps,
            final_decision=self._state.final_decision,
            allowed_decisions=task.allowed_final_decisions,
            payout_estimate_inr=self._state.payout_estimate_inr,
            payout_band=task.payout_band,
            investigation_targets=self._investigation_targets,
            evidence_quality_score=evidence_quality_score,
            # Exploit penalty is capped at 0.5 so it cannot dominate the reward.
            exploit_penalty=min(self._exploit_penalty, 0.5),
            penalty_total=self._state.penalty_total,
            queried_claims=self._queried_claims,
            agent_confidence=self._agent_confidence,
            ground_truth_confidence=task.ground_truth_confidence,
            calibration_override=self._calibration_score,
        )
        return InsuranceClaimObservation(
            claim_id=self._payload["claim_id"],
            task_id=self._payload["task_id"],
            claimant=deepcopy(self._payload["claimant"]),
            incident=deepcopy(self._payload["incident"]),
            documents=deepcopy(self._payload["documents"]),
            # Only the claims revealed so far, never the full hidden list.
            linked_claims=deepcopy(self._visible_linked_claims),
            action_history=deepcopy(self._action_history),
            available_actions=deepcopy(self._payload["available_actions"]),
            step_number=self._state.step_number,
            max_steps=self._state.max_steps,
            investigation_budget=self._payload.get("investigation_budget", 0),
            budget_remaining=self._budget_remaining,
            flags_raised=deepcopy(self._flags_raised),
            discovered_signals=deepcopy(self._discovered_signals),
            status=self._state.status,
            message=message,
            confidence_required=True,
            done=self._state.done,
            reward=reward_breakdown.total,
            # Rubric fields are filled in afterwards by _sync_rubric_telemetry.
            rubric_reward=0.0,
            rubric_components={},
            metadata={
                "last_action_error": self._state.last_action_error,
                "investigation_targets": self._investigation_targets,
                "variant_id": self._payload.get("variant_id", 0),
                "evidence_hits": self._evidence_hits,
                "evidence_total": self._evidence_total,
                "exploit_penalty": round(self._exploit_penalty, 4),
                "policy_history_checked": self._policy_history_checked,
                "identity_verified": self._identity_verified,
                "agent_confidence": self._agent_confidence_str,
                "calibration_score": self._calibration_score,
                "budget_remaining": self._budget_remaining,
                "discovered_signals": deepcopy(self._discovered_signals),
                "compared_pairs": [list(p) for p in self._compared_pairs],
            },
            reward_breakdown=reward_breakdown,
            debate_transcript=deepcopy(self._debate_transcript),
        )
def _sync_rubric_telemetry(
self,
action: InsuranceClaimAction,
observation: InsuranceClaimObservation,
) -> None:
rubric_reward = self._apply_rubric(action, observation)
observation.rubric_reward = float(rubric_reward)
if self.rubric is not None and hasattr(self.rubric, "component_scores"):
component_scores = self.rubric.component_scores()
observation.rubric_components = dict(component_scores)
self._last_rubric_components = dict(component_scores)
observation.metadata["rubric_components"] = dict(component_scores)
else:
self._last_rubric_components = {}
observation.metadata["rubric_components"] = {}
def available_task_ids() -> List[str]:
    """Return the ids of all registered tasks, in registry insertion order."""
    return [task_id for task_id in TASKS]