# app/pipeline/decision_engine.py
# Rule-based decision engine: final moderation verdict
from dataclasses import dataclass, field
from app.pipeline.risk_scorer import RiskScore
from app.pipeline.deep_analyzer import DeepAnalysisResult
from app.observability.logging import get_logger
logger = get_logger(__name__)
@dataclass
class Decision:
"""Final moderation decision."""
action: str # ALLOWED, WARNING, BLOCKED
reason: str
severity: str # low, medium, high, critical
categories: list[str] = field(default_factory=list)
should_alert_parent: bool = False
should_log: bool = True
escalation_notes: str | None = None
class DecisionEngine:
    """
    Rule-based final decision engine.

    Takes a risk score plus an optional deep analysis result and produces the
    final moderation verdict. The engine only builds the ``Decision`` and
    emits one ``decision_made`` log event; it performs no other action itself.

    Rules:
    - LOW risk → ALLOWED (no action)
    - MEDIUM risk → WARNING
    - MEDIUM risk + repeat offender → BLOCKED (alert parent, escalate)
    - HIGH risk + deep analysis confirmed → BLOCKED (alert parent, escalate if critical)
    - HIGH risk + deep analysis not confirmed → WARNING (false positive recovery)
    - HIGH risk + no deep analysis → BLOCKED (alert parent, manual review recommended)
    - Any other risk level → WARNING ("Unclassified risk level")
    """

    def decide(
        self,
        risk: RiskScore,
        deep_result: DeepAnalysisResult | None = None,
        user_history: dict | None = None,
    ) -> Decision:
        """
        Produce final moderation decision.

        Args:
            risk: Composite risk score. Its ``level``, ``score`` and
                ``repeat_offender`` attributes are read.
            deep_result: Optional deep analysis result. Only consulted when
                ``risk.level`` is "HIGH".
            user_history: Optional user moderation history. Currently unused:
                repeat-offender status is taken from ``risk.repeat_offender``.

        Returns:
            Decision with action, reason, and metadata.
        """
        if risk.level == "LOW":
            decision = Decision(
                action="ALLOWED",
                reason="Content passed all safety checks",
                severity="low",
                # Safe content is marked as not needing to be recorded.
                # NOTE(review): the decision_made event below is still emitted
                # for every verdict, including this one - confirm intended.
                should_log=False,
            )
        elif risk.level == "MEDIUM":
            decision = self._decide_medium(risk)
        elif risk.level == "HIGH":
            decision = self._decide_high(risk, deep_result)
        else:
            # Fallback: an unrecognised level is surfaced as a warning rather
            # than silently allowed.
            decision = Decision(
                action="WARNING",
                reason="Unclassified risk level",
                severity="medium",
            )

        logger.info(
            "decision_made",
            action=decision.action,
            severity=decision.severity,
            alert_parent=decision.should_alert_parent,
            risk_score=risk.score,
            risk_level=risk.level,
        )
        return decision

    @staticmethod
    def _decide_medium(risk: RiskScore) -> Decision:
        """Build the verdict for MEDIUM risk, escalating repeat offenders to a block."""
        if risk.repeat_offender:
            return Decision(
                action="BLOCKED",
                reason="Repeat offender with moderately harmful content — escalated to block",
                severity="high",
                should_alert_parent=True,
                escalation_notes="User has repeated violation history. Medium-risk content escalated.",
            )
        return Decision(
            action="WARNING",
            reason=f"Content flagged as potentially harmful (risk score: {risk.score})",
            severity="medium",
            should_alert_parent=False,
        )

    @staticmethod
    def _decide_high(
        risk: RiskScore,
        deep_result: DeepAnalysisResult | None,
    ) -> Decision:
        """Build the verdict for HIGH risk from the (possibly missing) deep analysis."""
        if deep_result is None:
            # No deep analysis available — err on the side of caution.
            return Decision(
                action="BLOCKED",
                reason=f"High-risk content detected (score: {risk.score}). Deep analysis unavailable.",
                severity="high",
                should_alert_parent=True,
                escalation_notes="Deep analysis was not performed. Manual review recommended.",
            )

        if deep_result.is_confirmed:
            # Deep analysis confirms the threat; only "critical" severity
            # carries an escalation note.
            severity = deep_result.severity
            if severity == "critical":
                escalation_notes = (
                    "CRITICAL: Immediate review required. "
                    f"Recommended action: {deep_result.recommended_action}"
                )
            else:
                escalation_notes = None
            return Decision(
                action="BLOCKED",
                reason=deep_result.reasoning,
                severity=severity,
                # Copy so the decision does not share a mutable list with the
                # analysis result.
                categories=list(deep_result.categories or []),
                should_alert_parent=True,
                escalation_notes=escalation_notes,
            )

        # Deep analysis did not confirm the threat: treat as a false positive
        # and downgrade to a low-severity warning.
        return Decision(
            action="WARNING",
            reason=(
                f"Content initially flagged as high-risk (score: {risk.score}) "
                f"but deep analysis did not confirm threat. "
                f"Reasoning: {deep_result.reasoning}"
            ),
            severity="low",
            should_alert_parent=False,
        )
|