File size: 5,188 Bytes
71c1ad2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# app/pipeline/decision_engine.py
# Rule-based decision engine: final moderation verdict

from dataclasses import dataclass, field
from app.pipeline.risk_scorer import RiskScore
from app.pipeline.deep_analyzer import DeepAnalysisResult
from app.observability.logging import get_logger

logger = get_logger(__name__)


@dataclass
class Decision:
    """Final moderation decision."""
    action: str  # ALLOWED, WARNING, BLOCKED
    reason: str
    severity: str  # low, medium, high, critical
    categories: list[str] = field(default_factory=list)
    should_alert_parent: bool = False
    should_log: bool = True
    escalation_notes: str | None = None


class DecisionEngine:
    """
    Rule-based final decision engine.

    Takes risk score + optional deep analysis and produces a final verdict.

    Rules:
    - LOW risk → ALLOWED (no action, not logged)
    - MEDIUM risk → WARNING
    - MEDIUM risk + repeat offender → BLOCKED (alert parent, escalate)
    - HIGH risk + deep_confirmed → BLOCKED (alert parent, escalate if critical)
    - HIGH risk + deep_not_confirmed → WARNING (false positive recovery)
    - HIGH risk + no deep analysis → BLOCKED (alert parent, manual review)
    - Any other risk level → WARNING ("Unclassified risk level")
    """

    def decide(
        self,
        risk: RiskScore,
        deep_result: DeepAnalysisResult | None = None,
        user_history: dict | None = None,
    ) -> Decision:
        """
        Produce final moderation decision.

        Args:
            risk: Composite risk score from the scoring engine. Its ``level``,
                ``score`` and ``repeat_offender`` attributes are read here.
            deep_result: Optional deep analysis result (only consulted for
                HIGH risk).
            user_history: Optional user moderation history. Currently unused
                by the rules; repeat-offender escalation is driven by
                ``risk.repeat_offender``.

        Returns:
            Decision with action, reason, and metadata.
        """
        level = risk.level

        if level == "LOW":
            decision = Decision(
                action="ALLOWED",
                reason="Content passed all safety checks",
                severity="low",
                should_log=False,  # Don't clutter logs with safe content
            )
        elif level == "MEDIUM":
            decision = self._decide_medium(risk)
        elif level == "HIGH":
            decision = self._decide_high(risk, deep_result)
        else:
            # Unknown level: stay cautious without blocking outright.
            decision = Decision(
                action="WARNING",
                reason="Unclassified risk level",
                severity="medium",
            )

        logger.info(
            "decision_made",
            action=decision.action,
            severity=decision.severity,
            alert_parent=decision.should_alert_parent,
            risk_score=risk.score,
            risk_level=risk.level,
        )
        return decision

    def _decide_medium(self, risk: RiskScore) -> Decision:
        """Return the verdict for MEDIUM risk, escalating repeat offenders to a block."""
        if risk.repeat_offender:
            return Decision(
                action="BLOCKED",
                reason="Repeat offender with moderately harmful content — escalated to block",
                severity="high",
                should_alert_parent=True,
                escalation_notes="User has repeated violation history. Medium-risk content escalated.",
            )

        return Decision(
            action="WARNING",
            reason=f"Content flagged as potentially harmful (risk score: {risk.score})",
            severity="medium",
            should_alert_parent=False,
        )

    def _decide_high(
        self,
        risk: RiskScore,
        deep_result: DeepAnalysisResult | None,
    ) -> Decision:
        """Return the verdict for HIGH risk based on the (optional) deep analysis."""
        # Compare against None explicitly so that a result object which happens
        # to evaluate falsy is not mistaken for "analysis unavailable".
        if deep_result is None:
            # No deep analysis available — err on caution
            return Decision(
                action="BLOCKED",
                reason=f"High-risk content detected (score: {risk.score}). Deep analysis unavailable.",
                severity="high",
                should_alert_parent=True,
                escalation_notes="Deep analysis was not performed. Manual review recommended.",
            )

        if not deep_result.is_confirmed:
            # Deep analysis says it's a false positive
            return Decision(
                action="WARNING",
                reason=(
                    f"Content initially flagged as high-risk (score: {risk.score}) "
                    f"but deep analysis did not confirm threat. "
                    f"Reasoning: {deep_result.reasoning}"
                ),
                severity="low",
                should_alert_parent=False,
            )

        # Deep analysis confirms the threat
        severity = deep_result.severity
        if severity == "critical":
            escalation_notes = (
                "CRITICAL: Immediate review required. "
                f"Recommended action: {deep_result.recommended_action}"
            )
        else:
            escalation_notes = None

        return Decision(
            action="BLOCKED",
            reason=deep_result.reasoning,
            severity=severity,
            # Copy so the decision does not share a mutable list with the
            # analysis result; a missing value becomes an empty list.
            categories=list(deep_result.categories or []),
            should_alert_parent=True,
            escalation_notes=escalation_notes,
        )