# app/pipeline/risk_scorer.py
# Composite risk scoring engine

from dataclasses import dataclass

from app.config import get_settings
from app.pipeline.fast_filter import FilterResult
from app.observability.logging import get_logger

logger = get_logger(__name__)


@dataclass
class RiskScore:
    """Composite risk assessment."""

    score: float  # 0-100, rounded to one decimal place
    level: str  # LOW, MEDIUM, HIGH
    components: dict[str, float]  # breakdown of scoring factors
    repeat_offender: bool = False


class RiskScorer:
    """
    Computes a composite risk score (0-100) from multiple signals.

    Scoring formula:
    - Base score from model confidence (weighted by category severity)
    - Repeat offender boost (user history)
    - Multi-category penalty (multiple harmful categories = higher risk)

    Thresholds (configurable via env):
    - 0-30: LOW → Allow
    - 31-65: MEDIUM → Warning
    - 66-100: HIGH → Deep Analysis

    The actual cut-offs are read from ``settings.risk_low_max`` and
    ``settings.risk_medium_max`` at classification time.
    """

    # Category severity weights (how dangerous each type is)
    CATEGORY_WEIGHTS = {
        # Text categories (from RoBERTa toxic-bert)
        "toxic": 0.6,
        "severe_toxic": 1.0,
        "obscene": 0.5,
        "threat": 1.0,
        "insult": 0.5,
        "identity_hate": 0.9,
        # Image categories
        "violence": 0.9,
        "nsfw": 0.8,
        "self_harm": 1.0,
        "hate_symbol": 0.9,
        # Generic fallback
        "harassment": 0.7,
        "bullying": 0.7,
    }

    # Weight applied to categories that are not listed in CATEGORY_WEIGHTS
    DEFAULT_CATEGORY_WEIGHT = 0.5

    # Repeat offender thresholds
    REPEAT_OFFENDER_VIOLATIONS = 3
    REPEAT_OFFENDER_BOOST = 15  # points added
    # Points added per prior violation for users below the repeat threshold
    PRIOR_VIOLATION_BOOST = 3.0

    def __init__(self):
        self.settings = get_settings()

    def score(
        self,
        filter_result: FilterResult,
        user_history: dict | None = None,
    ) -> RiskScore:
        """
        Compute composite risk score.

        Args:
            filter_result: Output from fast filter stage.
            user_history: Optional user moderation history.

        Returns:
            RiskScore with level classification.
        """
        # 1. Base score from model confidence
        base_score = self._compute_base_score(filter_result)

        # 2. Multi-category penalty
        multi_cat_penalty = self._multi_category_penalty(filter_result)

        # 3. Repeat offender boost
        repeat_boost, is_repeat = self._repeat_offender_boost(user_history)

        # 4. Combine, clamp to the 0-100 range and round once, so that the
        #    value we report is exactly the value we classify.
        raw_score = base_score + multi_cat_penalty + repeat_boost
        final_score = round(min(100.0, max(0.0, raw_score)), 1)

        # 5. Classify level (on the rounded score: a reported 30.0 must never
        #    be labelled as if it were above a threshold of 30)
        level = self._classify_level(final_score)

        result = RiskScore(
            score=final_score,
            level=level,
            components={
                "base_score": round(base_score, 1),
                "multi_category_penalty": round(multi_cat_penalty, 1),
                "repeat_offender_boost": round(repeat_boost, 1),
            },
            repeat_offender=is_repeat,
        )

        logger.info(
            "risk_scored",
            score=result.score,
            level=result.level,
            components=result.components,
            repeat_offender=is_repeat,
        )

        return result

    def _compute_base_score(self, result: FilterResult) -> float:
        """
        Compute base score from model predictions.

        Unflagged content gets ``max_score * 20``. Flagged content gets the
        severity-weighted average of every entry in ``result.scores`` (not
        only the flagged categories), scaled by 100. If no weighted entries
        are available, falls back to ``max_score * 60``.
        """
        if not result.is_flagged:
            # Even unflagged content gets a small score based on max prediction
            return result.max_score * 20  # Scale 0-1 → 0-20

        weighted_sum = 0.0
        weight_total = 0.0
        for category, score in result.scores.items():
            weight = self.CATEGORY_WEIGHTS.get(
                category.lower(), self.DEFAULT_CATEGORY_WEIGHT
            )
            weighted_sum += score * weight * 100
            weight_total += weight

        if weight_total > 0:
            return weighted_sum / weight_total

        # Flagged but nothing to average over: derive a score from the
        # strongest prediction instead.
        return result.max_score * 60

    def _multi_category_penalty(self, result: FilterResult) -> float:
        """Add penalty when multiple harmful categories are detected."""
        num_categories = len(result.categories)
        if num_categories <= 1:
            return 0.0
        # Each additional category adds 5 points
        return (num_categories - 1) * 5.0

    def _repeat_offender_boost(self, user_history: dict | None) -> tuple[float, bool]:
        """
        Boost score for users with violation history.

        Args:
            user_history: Optional mapping; only ``"total_violations"`` is
                read. A missing or null value counts as zero violations.

        Returns:
            ``(boost_points, is_repeat_offender)``.
        """
        if not user_history:
            return 0.0, False

        # `or 0` also covers a key that is present but set to None, which
        # would otherwise raise TypeError in the comparisons below.
        total_violations = user_history.get("total_violations") or 0
        full_boost = float(self.REPEAT_OFFENDER_BOOST)

        if total_violations >= self.REPEAT_OFFENDER_VIOLATIONS:
            return full_boost, True

        if total_violations > 0:
            # Smaller boost for users with some history; capped so that it
            # can never exceed the boost given to a full repeat offender.
            partial_boost = total_violations * self.PRIOR_VIOLATION_BOOST
            return min(partial_boost, full_boost), False

        return 0.0, False

    def _classify_level(self, score: float) -> str:
        """Map numeric score to risk level."""
        if score <= self.settings.risk_low_max:
            return "LOW"
        if score <= self.settings.risk_medium_max:
            return "MEDIUM"
        return "HIGH"