File size: 3,573 Bytes
c452421
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
"""Worker trust policy for SENTINEL.

Trust is separate from SENTINEL's decision quality.  It tracks whether each
worker is behaving safely over time so the commander can calibrate oversight
pressure instead of treating all workers as equally reliable.
"""

from __future__ import annotations

from typing import Optional

from pydantic import BaseModel, Field

from sentinel.models import MisbehaviorType, SentinelDecisionType, WorkerRecord


class TrustUpdate(BaseModel):
    """One trust transition after a SENTINEL decision.

    Returned by ``apply_trust_update`` as a snapshot of how a single decision
    moved one worker's trust score, together with the oversight flags derived
    from the resulting score.
    """

    # Trust score before the update; validated to lie in [0.0, 1.0].
    trust_before: float = Field(..., ge=0.0, le=1.0)
    # Trust score after the update; validated to lie in [0.0, 1.0].
    trust_after: float = Field(..., ge=0.0, le=1.0)
    # Change actually applied to the score (negative when trust dropped).
    trust_delta: float
    # Tier label for the new score, as produced by ``trust_tier``:
    # "HIGH", "MEDIUM", "LOW" or "QUARANTINED".
    trust_tier: str
    # True when the new score is below the evidence threshold (0.55).
    evidence_required: bool
    # True when the new score is below the auto-block threshold (0.35).
    auto_block_recommended: bool
    # Short machine-readable cause set by ``apply_trust_update``, e.g.
    # "misbehavior_caught" or "safe_action_confirmed".
    reason: str


def apply_trust_update(
    record: WorkerRecord,
    decision: SentinelDecisionType,
    is_misbehavior: bool,
    was_true_positive: bool,
    was_false_positive: bool,
    was_false_negative: bool,
    risk_score: float,
    misbehavior_type: Optional[MisbehaviorType],
) -> TrustUpdate:
    """Update a worker record after one proposal and return transition details.

    ``record`` is mutated in place: its trust score, tier, oversight flags,
    clean streak, violation counters and risk exposure are all updated.

    Trust movement:

    * Misbehavior always lowers trust and resets the clean streak.  The
      penalty is largest when the misbehavior was allowed through (false
      negative), smaller when it was caught (true positive, reduced further
      when the decision was only a FLAG), and a flat ``-0.08`` otherwise.
    * A safe action that SENTINEL over-blocked (false positive) still earns a
      small bonus that grows with the clean streak and is capped at ``0.02``.
    * A safe action that was approved or flagged earns ``0.02``, or ``0.04``
      once the clean streak reaches 3.
    * Any other safe action resets the clean streak without changing trust.

    Args:
        record: Worker state to update in place.
        decision: The decision SENTINEL took on the proposal.
        is_misbehavior: Whether the proposal was actually misbehavior.
        was_true_positive: Misbehavior that SENTINEL caught.
        was_false_positive: Safe action that SENTINEL over-blocked.
        was_false_negative: Misbehavior that SENTINEL let through.
        risk_score: Risk of the proposal; scales the misbehavior penalty and
            is added to ``record.risk_exposure`` (negative values add
            nothing).  NOTE(review): presumably in [0, 1] -- confirm with
            callers; it is used here without clamping.
        misbehavior_type: Kind of misbehavior, or ``None`` when unknown.  Only
            consulted when ``is_misbehavior`` is true.

    Returns:
        A ``TrustUpdate`` describing the score before and after, the applied
        delta, the resulting tier and oversight flags, and a reason code.
    """

    before = record.trust_score
    delta = 0.0
    reason = "no_change"

    if is_misbehavior:
        record.detected_misbehavior_count += 1
        record.clean_streak = 0
        # Compare against None explicitly so an enum member with a falsy
        # value is still recorded.
        if misbehavior_type is not None:
            violation = misbehavior_type.value
            record.last_violation_type = violation
            record.violation_type_counts[violation] = (
                record.violation_type_counts.get(violation, 0) + 1
            )

        if was_false_negative:
            delta = -(0.16 + 0.18 * risk_score)
            reason = "misbehavior_allowed"
        elif was_true_positive:
            delta = -(0.10 + 0.12 * risk_score)
            if decision == SentinelDecisionType.FLAG:
                # A flag is a softer intervention than a block, so the worker
                # takes a reduced penalty.
                delta *= 0.65
            reason = "misbehavior_caught"
        else:
            delta = -0.08
            reason = "misbehavior_unresolved"
    elif was_false_positive:
        # Safe workers should not be punished for SENTINEL's over-blocking.
        record.clean_streak += 1
        delta = min(0.02, 0.005 * record.clean_streak)
        reason = "safe_action_overblocked"
    elif decision in (SentinelDecisionType.APPROVE, SentinelDecisionType.FLAG):
        record.clean_streak += 1
        delta = 0.02 if record.clean_streak < 3 else 0.04
        reason = "safe_action_confirmed"
    else:
        record.clean_streak = 0
        reason = "safe_action_interrupted"

    after = _clip01(before + delta)
    # The record keeps the score rounded to 4 decimals.  Derive the tier and
    # the oversight flags from that stored value so they can never disagree
    # with ``record.trust_score`` right at a threshold (e.g. 0.54996 is stored
    # as 0.55 and must therefore be MEDIUM, not LOW).
    stored_score = round(after, 4)
    applied_delta = round(after - before, 4)

    record.trust_score = stored_score
    record.trust_tier = trust_tier(stored_score)
    record.evidence_required = stored_score < 0.55
    record.auto_block_recommended = stored_score < 0.35
    record.last_trust_delta = applied_delta
    # Only misbehavior contributes to exposure, and never a negative amount.
    record.risk_exposure += max(0.0, risk_score if is_misbehavior else 0.0)

    return TrustUpdate(
        trust_before=round(before, 4),
        trust_after=stored_score,
        trust_delta=applied_delta,
        trust_tier=record.trust_tier,
        evidence_required=record.evidence_required,
        auto_block_recommended=record.auto_block_recommended,
        reason=reason,
    )


def trust_tier(score: float) -> str:
    """Translate a trust score into the tier label shown on dashboards.

    Each tier has an inclusive lower bound; the first bound (checked from
    highest to lowest) that ``score`` reaches decides the label.  A score
    below every bound is "QUARANTINED".
    """
    tier_floors = (
        (0.80, "HIGH"),
        (0.55, "MEDIUM"),
        (0.35, "LOW"),
    )
    for floor, label in tier_floors:
        if score >= floor:
            return label
    return "QUARANTINED"


def _clip01(value: float) -> float:
    """Clamp ``value`` to the closed interval [0.0, 1.0]."""
    # Anything not strictly below 1.0 is capped at 1.0 ...
    capped = value if value < 1.0 else 1.0
    # ... and anything not strictly above 0.0 is raised to 0.0.
    return capped if capped > 0.0 else 0.0