Spaces:
Running
Running
File size: 5,173 Bytes
# app/pipeline/risk_scorer.py
# Composite risk scoring engine
from dataclasses import dataclass
from app.config import get_settings
from app.pipeline.fast_filter import FilterResult
from app.observability.logging import get_logger
logger = get_logger(__name__)
@dataclass
class RiskScore:
    """Result of one composite risk assessment.

    Bundles the final numeric score, the risk band it falls into, the
    per-factor breakdown used to reach it, and a repeat-offender flag.
    """

    # Final numeric score on a 0-100 scale.
    score: float
    # Risk band label: "LOW", "MEDIUM" or "HIGH".
    level: str
    # Breakdown of the individual scoring factors.
    components: dict
    # True when the user was treated as a repeat offender; off by default.
    repeat_offender: bool = False
class RiskScorer:
    """
    Computes a composite risk score (0-100) from multiple signals.

    Scoring formula:
    - Base score from model confidence (weighted by category severity)
    - Repeat offender boost (user history)
    - Multi-category penalty (multiple harmful categories = higher risk)

    Thresholds (read from settings ``risk_low_max`` / ``risk_medium_max``;
    the bands below are the documented configuration):
    - 0-30: LOW → Allow
    - 31-65: MEDIUM → Warning
    - 66-100: HIGH → Deep Analysis
    """

    # Category severity weights (how dangerous each type is)
    CATEGORY_WEIGHTS = {
        # Text categories (from RoBERTa toxic-bert)
        "toxic": 0.6,
        "severe_toxic": 1.0,
        "obscene": 0.5,
        "threat": 1.0,
        "insult": 0.5,
        "identity_hate": 0.9,
        # Image categories
        "violence": 0.9,
        "nsfw": 0.8,
        "self_harm": 1.0,
        "hate_symbol": 0.9,
        # Generic fallback
        "harassment": 0.7,
        "bullying": 0.7,
    }

    # Repeat offender thresholds
    REPEAT_OFFENDER_VIOLATIONS = 3
    REPEAT_OFFENDER_BOOST = 15  # points added

    def __init__(self):
        """Load the application settings that hold the level thresholds."""
        self.settings = get_settings()

    def score(
        self,
        filter_result: FilterResult,
        user_history: dict | None = None,
    ) -> RiskScore:
        """
        Compute composite risk score.

        Args:
            filter_result: Output from fast filter stage.
            user_history: Optional user moderation history.

        Returns:
            RiskScore with level classification. The level is derived from
            the same rounded score that is reported, so the two never
            disagree at a threshold boundary.
        """
        # 1. Base score from model confidence
        base_score = self._compute_base_score(filter_result)

        # 2. Multi-category penalty
        multi_cat_penalty = self._multi_category_penalty(filter_result)

        # 3. Repeat offender boost
        repeat_boost, is_repeat = self._repeat_offender_boost(user_history)

        # 4. Combine, clamp to 0-100, and round ONCE. Classifying the
        #    unrounded value could report e.g. score=30.0 with level MEDIUM
        #    (raw 30.04), which contradicts the documented thresholds.
        raw_score = base_score + multi_cat_penalty + repeat_boost
        final_score = round(min(100.0, max(0.0, raw_score)), 1)

        # 5. Classify level from the value that is actually reported
        level = self._classify_level(final_score)

        result = RiskScore(
            score=final_score,
            level=level,
            components={
                "base_score": round(base_score, 1),
                "multi_category_penalty": round(multi_cat_penalty, 1),
                "repeat_offender_boost": round(repeat_boost, 1),
            },
            repeat_offender=is_repeat,
        )
        logger.info(
            "risk_scored",
            score=result.score,
            level=result.level,
            components=result.components,
            repeat_offender=is_repeat,
        )
        return result

    def _compute_base_score(self, result: FilterResult) -> float:
        """
        Compute base score from model predictions.

        Unflagged content scores ``max_score * 20``. Flagged content scores
        the severity-weighted average of the entries in ``result.scores``,
        scaled to 0-100; categories missing from ``CATEGORY_WEIGHTS`` use a
        weight of 0.5. If there is nothing to average, falls back to
        ``max_score * 60``.
        """
        if not result.is_flagged:
            # Even unflagged content gets a small score based on max prediction
            return result.max_score * 20  # Scale 0-1 → 0-20

        # NOTE(review): this averages every entry in `result.scores`, not only
        # the ones listed in `result.categories`. If `scores` carries a
        # prediction for every label, low-scoring labels pull the average
        # down — confirm against FilterResult that this is intended.
        weighted_sum = 0.0
        weight_total = 0.0
        for category, score in result.scores.items():
            weight = self.CATEGORY_WEIGHTS.get(category.lower(), 0.5)
            weighted_sum += score * weight * 100
            weight_total += weight

        if weight_total > 0:
            return weighted_sum / weight_total
        return result.max_score * 60

    def _multi_category_penalty(self, result: FilterResult) -> float:
        """Add penalty when multiple harmful categories are detected."""
        num_categories = len(result.categories)
        if num_categories <= 1:
            return 0.0
        # Each additional category adds 5 points
        return (num_categories - 1) * 5.0

    def _repeat_offender_boost(self, user_history: dict | None) -> tuple[float, bool]:
        """
        Boost score for users with violation history.

        Returns:
            ``(boost, is_repeat)``. ``boost`` is always a float. Users at or
            above ``REPEAT_OFFENDER_VIOLATIONS`` get the full
            ``REPEAT_OFFENDER_BOOST``; users with fewer violations get 3
            points per violation, never more than the full boost.
        """
        if not user_history:
            return 0.0, False

        # `.get(key, 0)` only applies its default when the key is absent; a
        # stored null would reach the comparison below and raise TypeError.
        total_violations = user_history.get("total_violations") or 0
        full_boost = float(self.REPEAT_OFFENDER_BOOST)

        if total_violations >= self.REPEAT_OFFENDER_VIOLATIONS:
            return full_boost, True
        if total_violations > 0:
            # Smaller boost for users with some history; capped so it can
            # never exceed what a confirmed repeat offender receives.
            return min(total_violations * 3.0, full_boost), False
        return 0.0, False

    def _classify_level(self, score: float) -> str:
        """Map numeric score to risk level using the configured thresholds."""
        if score <= self.settings.risk_low_max:
            return "LOW"
        elif score <= self.settings.risk_medium_max:
            return "MEDIUM"
        else:
            return "HIGH"
|