SentinelAI / app /pipeline /decision_engine.py
sajith-0701's picture
initial deployment for HF Spaces
71c1ad2
# app/pipeline/decision_engine.py
# Rule-based decision engine: final moderation verdict
from dataclasses import dataclass, field
from app.pipeline.risk_scorer import RiskScore
from app.pipeline.deep_analyzer import DeepAnalysisResult
from app.observability.logging import get_logger
logger = get_logger(__name__)
@dataclass
class Decision:
"""Final moderation decision."""
action: str # ALLOWED, WARNING, BLOCKED
reason: str
severity: str # low, medium, high, critical
categories: list[str] = field(default_factory=list)
should_alert_parent: bool = False
should_log: bool = True
escalation_notes: str | None = None
class DecisionEngine:
"""
Rule-based final decision engine.
Takes risk score + optional deep analysis and produces a final verdict.
Rules:
- LOW risk β†’ ALLOWED (no action)
- MEDIUM risk β†’ WARNING (increment user warning count, log)
- HIGH risk + deep_confirmed β†’ BLOCKED (alert parent, log, escalate if critical)
- HIGH risk + deep_not_confirmed β†’ WARNING (false positive recovery)
- Repeat offender with MEDIUM β†’ BLOCKED (escalate)
"""
def decide(
self,
risk: RiskScore,
deep_result: DeepAnalysisResult | None = None,
user_history: dict | None = None,
) -> Decision:
"""
Produce final moderation decision.
Args:
risk: Composite risk score from the scoring engine.
deep_result: Optional deep analysis result (only for HIGH risk).
user_history: Optional user moderation history.
Returns:
Decision with action, reason, and metadata.
"""
# === LOW RISK ===
if risk.level == "LOW":
decision = Decision(
action="ALLOWED",
reason="Content passed all safety checks",
severity="low",
should_log=False, # Don't clutter logs with safe content
)
# === MEDIUM RISK ===
elif risk.level == "MEDIUM":
# Check for repeat offender escalation
if risk.repeat_offender:
decision = Decision(
action="BLOCKED",
reason="Repeat offender with moderately harmful content β€” escalated to block",
severity="high",
should_alert_parent=True,
escalation_notes="User has repeated violation history. Medium-risk content escalated.",
)
else:
decision = Decision(
action="WARNING",
reason=f"Content flagged as potentially harmful (risk score: {risk.score})",
severity="medium",
should_alert_parent=False,
)
# === HIGH RISK ===
elif risk.level == "HIGH":
if deep_result and deep_result.is_confirmed:
# Deep analysis confirms the threat
severity = deep_result.severity
should_escalate = severity == "critical"
decision = Decision(
action="BLOCKED",
reason=deep_result.reasoning,
severity=severity,
categories=deep_result.categories,
should_alert_parent=True,
escalation_notes=(
"CRITICAL: Immediate review required. "
f"Recommended action: {deep_result.recommended_action}"
if should_escalate
else None
),
)
elif deep_result and not deep_result.is_confirmed:
# Deep analysis says it's a false positive
decision = Decision(
action="WARNING",
reason=(
f"Content initially flagged as high-risk (score: {risk.score}) "
f"but deep analysis did not confirm threat. "
f"Reasoning: {deep_result.reasoning}"
),
severity="low",
should_alert_parent=False,
)
else:
# No deep analysis available β€” err on caution
decision = Decision(
action="BLOCKED",
reason=f"High-risk content detected (score: {risk.score}). Deep analysis unavailable.",
severity="high",
should_alert_parent=True,
escalation_notes="Deep analysis was not performed. Manual review recommended.",
)
else:
# Fallback
decision = Decision(
action="WARNING",
reason="Unclassified risk level",
severity="medium",
)
logger.info(
"decision_made",
action=decision.action,
severity=decision.severity,
alert_parent=decision.should_alert_parent,
risk_score=risk.score,
risk_level=risk.level,
)
return decision