Spaces:
Running
Running
| """Worker trust policy for SENTINEL. | |
| Trust is separate from SENTINEL's decision quality. It tracks whether each | |
| worker is behaving safely over time so the commander can calibrate oversight | |
| pressure instead of treating all workers as equally reliable. | |
| """ | |
| from __future__ import annotations | |
| from typing import Optional | |
| from pydantic import BaseModel, Field | |
| from sentinel.models import MisbehaviorType, SentinelDecisionType, WorkerRecord | |
| class TrustUpdate(BaseModel): | |
| """One trust transition after a SENTINEL decision.""" | |
| trust_before: float = Field(..., ge=0.0, le=1.0) | |
| trust_after: float = Field(..., ge=0.0, le=1.0) | |
| trust_delta: float | |
| trust_tier: str | |
| evidence_required: bool | |
| auto_block_recommended: bool | |
| reason: str | |
| def apply_trust_update( | |
| record: WorkerRecord, | |
| decision: SentinelDecisionType, | |
| is_misbehavior: bool, | |
| was_true_positive: bool, | |
| was_false_positive: bool, | |
| was_false_negative: bool, | |
| risk_score: float, | |
| misbehavior_type: Optional[MisbehaviorType], | |
| ) -> TrustUpdate: | |
| """Update a worker record after one proposal and return transition details.""" | |
| before = record.trust_score | |
| delta = 0.0 | |
| reason = "no_change" | |
| if is_misbehavior: | |
| record.detected_misbehavior_count += 1 | |
| record.clean_streak = 0 | |
| if misbehavior_type: | |
| record.last_violation_type = misbehavior_type.value | |
| record.violation_type_counts[misbehavior_type.value] = ( | |
| record.violation_type_counts.get(misbehavior_type.value, 0) + 1 | |
| ) | |
| if was_false_negative: | |
| delta = -(0.16 + 0.18 * risk_score) | |
| reason = "misbehavior_allowed" | |
| elif was_true_positive: | |
| delta = -(0.10 + 0.12 * risk_score) | |
| if decision == SentinelDecisionType.FLAG: | |
| delta *= 0.65 | |
| reason = "misbehavior_caught" | |
| else: | |
| delta = -0.08 | |
| reason = "misbehavior_unresolved" | |
| else: | |
| if was_false_positive: | |
| # Safe workers should not be punished for SENTINEL's over-blocking. | |
| record.clean_streak += 1 | |
| delta = min(0.02, 0.005 * record.clean_streak) | |
| reason = "safe_action_overblocked" | |
| elif decision in (SentinelDecisionType.APPROVE, SentinelDecisionType.FLAG): | |
| record.clean_streak += 1 | |
| delta = 0.02 if record.clean_streak < 3 else 0.04 | |
| reason = "safe_action_confirmed" | |
| else: | |
| record.clean_streak = 0 | |
| reason = "safe_action_interrupted" | |
| after = _clip01(before + delta) | |
| record.trust_score = round(after, 4) | |
| record.trust_tier = trust_tier(after) | |
| record.evidence_required = after < 0.55 | |
| record.auto_block_recommended = after < 0.35 | |
| record.last_trust_delta = round(after - before, 4) | |
| record.risk_exposure += max(0.0, risk_score if is_misbehavior else 0.0) | |
| return TrustUpdate( | |
| trust_before=round(before, 4), | |
| trust_after=round(after, 4), | |
| trust_delta=round(after - before, 4), | |
| trust_tier=record.trust_tier, | |
| evidence_required=record.evidence_required, | |
| auto_block_recommended=record.auto_block_recommended, | |
| reason=reason, | |
| ) | |
| def trust_tier(score: float) -> str: | |
| """Map trust score to a dashboard-friendly tier.""" | |
| if score >= 0.80: | |
| return "HIGH" | |
| if score >= 0.55: | |
| return "MEDIUM" | |
| if score >= 0.35: | |
| return "LOW" | |
| return "QUARANTINED" | |
| def _clip01(value: float) -> float: | |
| return max(0.0, min(1.0, value)) | |