Spaces:
Running
Running
File size: 3,573 Bytes
"""Worker trust policy for SENTINEL.
Trust is separate from SENTINEL's decision quality. It tracks whether each
worker is behaving safely over time so the commander can calibrate oversight
pressure instead of treating all workers as equally reliable.
"""
from __future__ import annotations
from typing import Optional
from pydantic import BaseModel, Field
from sentinel.models import MisbehaviorType, SentinelDecisionType, WorkerRecord
class TrustUpdate(BaseModel):
    """One trust transition after a SENTINEL decision.

    Immutable-by-convention report returned by ``apply_trust_update``. It
    mirrors the values written onto the worker record during that call so
    callers can log or display the transition without re-reading the record.
    """

    # Worker trust score before the update, rounded to 4 decimals; must be in [0, 1].
    trust_before: float = Field(..., ge=0.0, le=1.0)
    # Worker trust score after the update (clamped to [0, 1], rounded to 4 decimals).
    trust_after: float = Field(..., ge=0.0, le=1.0)
    # Effective change (after - before), rounded to 4 decimals. Because the
    # score is clamped, this can be smaller in magnitude than the raw adjustment.
    trust_delta: float
    # Tier label as produced by ``trust_tier``: "HIGH", "MEDIUM", "LOW" or "QUARANTINED".
    trust_tier: str
    # True when the post-update trust is below 0.55.
    evidence_required: bool
    # True when the post-update trust is below 0.35.
    auto_block_recommended: bool
    # Short machine-readable code for the branch taken, e.g. "misbehavior_caught",
    # "safe_action_confirmed" or "no_change".
    reason: str
def apply_trust_update(
    record: WorkerRecord,
    decision: SentinelDecisionType,
    is_misbehavior: bool,
    was_true_positive: bool,
    was_false_positive: bool,
    was_false_negative: bool,
    risk_score: float,
    misbehavior_type: Optional[MisbehaviorType],
) -> TrustUpdate:
    """Update a worker record after one proposal and return transition details.

    The record is mutated in place: its trust score, tier, oversight flags,
    clean streak, violation counters and risk exposure are all refreshed.

    Trust adjustment rules:

    * Misbehavior that was allowed through (false negative):
      ``-(0.16 + 0.18 * risk_score)``.
    * Misbehavior that was caught (true positive):
      ``-(0.10 + 0.12 * risk_score)``, scaled by 0.65 when the decision was
      only a FLAG.
    * Misbehavior that is neither of the above: flat ``-0.08``.
    * Safe action that was over-blocked (false positive): small reward of
      ``min(0.02, 0.005 * clean_streak)``.
    * Safe action that was approved or flagged: ``+0.02``, or ``+0.04`` once
      the clean streak reaches 3.
    * Any other safe action: no trust change, clean streak reset.

    Args:
        record: Worker record to update in place.
        decision: The SENTINEL decision taken on the proposal.
        is_misbehavior: Whether the proposal was actually a misbehavior.
        was_true_positive: Misbehavior that SENTINEL caught.
        was_false_positive: Safe action that SENTINEL wrongly interrupted.
        was_false_negative: Misbehavior that SENTINEL let through.
        risk_score: Risk of the proposal; scales the misbehavior penalty and
            is added to the record's risk exposure when misbehaving.
        misbehavior_type: Kind of misbehavior, or ``None`` when unknown or
            not applicable.

    Returns:
        A ``TrustUpdate`` describing the before/after score, the effective
        delta, the resulting tier and flags, and the reason code.
    """
    before = record.trust_score
    delta = 0.0
    reason = "no_change"

    if is_misbehavior:
        record.detected_misbehavior_count += 1
        record.clean_streak = 0
        # Compare against None explicitly: an enum member whose value is
        # falsy must still be counted as a violation type.
        if misbehavior_type is not None:
            violation = misbehavior_type.value
            record.last_violation_type = violation
            record.violation_type_counts[violation] = (
                record.violation_type_counts.get(violation, 0) + 1
            )
        if was_false_negative:
            delta = -(0.16 + 0.18 * risk_score)
            reason = "misbehavior_allowed"
        elif was_true_positive:
            delta = -(0.10 + 0.12 * risk_score)
            if decision == SentinelDecisionType.FLAG:
                # A flag is weaker than a block, so the penalty is softened.
                delta *= 0.65
            reason = "misbehavior_caught"
        else:
            delta = -0.08
            reason = "misbehavior_unresolved"
    elif was_false_positive:
        # Safe workers should not be punished for SENTINEL's over-blocking.
        record.clean_streak += 1
        delta = min(0.02, 0.005 * record.clean_streak)
        reason = "safe_action_overblocked"
    elif decision in (SentinelDecisionType.APPROVE, SentinelDecisionType.FLAG):
        record.clean_streak += 1
        delta = 0.02 if record.clean_streak < 3 else 0.04
        reason = "safe_action_confirmed"
    else:
        record.clean_streak = 0
        reason = "safe_action_interrupted"

    after = _clip01(before + delta)
    # Derive the tier and oversight flags from the same rounded value that is
    # persisted and reported. Otherwise a score such as 0.54997 is stored as
    # 0.55 yet tiered "LOW" with evidence required, contradicting
    # trust_tier(record.trust_score).
    score = round(after, 4)
    effective_delta = round(after - before, 4)

    record.trust_score = score
    record.trust_tier = trust_tier(score)
    record.evidence_required = score < 0.55
    record.auto_block_recommended = score < 0.35
    record.last_trust_delta = effective_delta
    # Only misbehaving proposals add exposure; negative scores never reduce it.
    record.risk_exposure += max(0.0, risk_score if is_misbehavior else 0.0)

    return TrustUpdate(
        trust_before=round(before, 4),
        trust_after=score,
        trust_delta=effective_delta,
        trust_tier=record.trust_tier,
        evidence_required=record.evidence_required,
        auto_block_recommended=record.auto_block_recommended,
        reason=reason,
    )
def trust_tier(score: float) -> str:
    """Return the dashboard tier label for a trust score.

    Floors are checked from highest to lowest and the first one that
    ``score`` meets or exceeds decides the label. Scores below every floor
    are reported as ``"QUARANTINED"``.
    """
    tier_floors = (
        (0.80, "HIGH"),
        (0.55, "MEDIUM"),
        (0.35, "LOW"),
    )
    for floor, label in tier_floors:
        if score >= floor:
            return label
    return "QUARANTINED"
def _clip01(value: float) -> float:
return max(0.0, min(1.0, value))
|