File size: 5,173 Bytes
71c1ad2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# app/pipeline/risk_scorer.py
# Composite risk scoring engine

from dataclasses import dataclass
from app.config import get_settings
from app.pipeline.fast_filter import FilterResult
from app.observability.logging import get_logger

# Module-level logger named after this module; RiskScorer.score() uses it to
# emit the "risk_scored" event.
logger = get_logger(__name__)


@dataclass
class RiskScore:
    """
    Result of a composite risk assessment.

    Attributes:
        score: Final risk score on a 0-100 scale.
        level: Risk band label: "LOW", "MEDIUM" or "HIGH".
        components: Breakdown of the individual scoring factors that were
            combined into ``score``.
        repeat_offender: Whether the user was treated as a repeat offender.
            Defaults to False.
    """

    score: float
    level: str
    components: dict
    repeat_offender: bool = False


class RiskScorer:
    """
    Computes a composite risk score (0-100) from multiple signals.

    Scoring formula:
    - Base score from model confidence (weighted by category severity)
    - Repeat offender boost (user history)
    - Multi-category penalty (multiple harmful categories = higher risk)

    Thresholds (configurable via env):
    - 0-30:  LOW    → Allow
    - 31-65: MEDIUM → Warning
    - 66-100: HIGH  → Deep Analysis
    """

    # Category severity weights (how dangerous each type is)
    CATEGORY_WEIGHTS = {
        # Text categories (from RoBERTa toxic-bert)
        "toxic": 0.6,
        "severe_toxic": 1.0,
        "obscene": 0.5,
        "threat": 1.0,
        "insult": 0.5,
        "identity_hate": 0.9,
        # Image categories
        "violence": 0.9,
        "nsfw": 0.8,
        "self_harm": 1.0,
        "hate_symbol": 0.9,
        # Generic fallback
        "harassment": 0.7,
        "bullying": 0.7,
    }

    # Repeat offender thresholds
    REPEAT_OFFENDER_VIOLATIONS = 3
    REPEAT_OFFENDER_BOOST = 15  # points added

    def __init__(self):
        self.settings = get_settings()

    def score(
        self,
        filter_result: FilterResult,
        user_history: dict | None = None,
    ) -> RiskScore:
        """
        Compute composite risk score.

        Args:
            filter_result: Output from fast filter stage.
            user_history: Optional user moderation history.

        Returns:
            RiskScore with level classification.
        """
        # 1. Base score from model confidence
        base_score = self._compute_base_score(filter_result)

        # 2. Multi-category penalty
        multi_cat_penalty = self._multi_category_penalty(filter_result)

        # 3. Repeat offender boost
        repeat_boost, is_repeat = self._repeat_offender_boost(user_history)

        # 4. Combine
        raw_score = base_score + multi_cat_penalty + repeat_boost
        final_score = min(100.0, max(0.0, raw_score))

        # 5. Classify level
        level = self._classify_level(final_score)

        result = RiskScore(
            score=round(final_score, 1),
            level=level,
            components={
                "base_score": round(base_score, 1),
                "multi_category_penalty": round(multi_cat_penalty, 1),
                "repeat_offender_boost": round(repeat_boost, 1),
            },
            repeat_offender=is_repeat,
        )

        logger.info(
            "risk_scored",
            score=result.score,
            level=result.level,
            components=result.components,
            repeat_offender=is_repeat,
        )
        return result

    def _compute_base_score(self, result: FilterResult) -> float:
        """
        Compute base score from model predictions.

        Uses weighted sum of flagged category scores.
        """
        if not result.is_flagged:
            # Even unflagged content gets a small score based on max prediction
            return result.max_score * 20  # Scale 0-1 → 0-20

        # Weighted sum of flagged category scores
        weighted_sum = 0.0
        weight_total = 0.0

        for category, score in result.scores.items():
            weight = self.CATEGORY_WEIGHTS.get(category.lower(), 0.5)
            weighted_sum += score * weight * 100
            weight_total += weight

        if weight_total > 0:
            return weighted_sum / weight_total
        return result.max_score * 60

    def _multi_category_penalty(self, result: FilterResult) -> float:
        """Add penalty when multiple harmful categories are detected."""
        num_categories = len(result.categories)
        if num_categories <= 1:
            return 0.0
        # Each additional category adds 5 points
        return (num_categories - 1) * 5.0

    def _repeat_offender_boost(self, user_history: dict | None) -> tuple[float, bool]:
        """Boost score for users with violation history."""
        if not user_history:
            return 0.0, False

        total_violations = user_history.get("total_violations", 0)
        is_repeat = total_violations >= self.REPEAT_OFFENDER_VIOLATIONS

        if is_repeat:
            return self.REPEAT_OFFENDER_BOOST, True
        elif total_violations > 0:
            # Smaller boost for users with some history
            return total_violations * 3.0, False
        return 0.0, False

    def _classify_level(self, score: float) -> str:
        """Map numeric score to risk level."""
        if score <= self.settings.risk_low_max:
            return "LOW"
        elif score <= self.settings.risk_medium_max:
            return "MEDIUM"
        else:
            return "HIGH"