File size: 5,306 Bytes
71c1ad2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# app/pipeline/deep_analyzer.py
# Deep analysis layer: CLIP + Gemini reasoning (HIGH risk only)

from dataclasses import dataclass, field
from PIL import Image
from app.models.model_registry import model_registry
from app.services.gemini_service import gemini_service
from app.pipeline.fast_filter import FilterResult
from app.utils.image_utils import image_to_base64
from app.observability.logging import get_logger

logger = get_logger(__name__)


@dataclass
class DeepAnalysisResult:
    """Output from the deep analysis stage."""
    is_confirmed: bool  # Does deep analysis confirm the threat?
    severity: str  # low, medium, high, critical
    reasoning: str  # Explanation from Gemini
    categories: list[str] = field(default_factory=list)
    recommended_action: str = "warn"  # allow, warn, block, escalate
    confidence: float = 0.0
    clip_scores: dict = field(default_factory=dict)
    gemini_raw: dict = field(default_factory=dict)


class DeepAnalyzer:
    """
    Deep analysis layer invoked only for HIGH-risk content.

    Pipeline:
    1. CLIP multimodal alignment (if image present)
    2. Gemini reasoning via LangChain
    3. Combine signals into final assessment

    This layer trades speed for accuracy — expected latency: 1-3 seconds.
    """

    async def analyze_text(
        self,
        text: str,
        filter_result: FilterResult,
    ) -> DeepAnalysisResult:
        """
        Deep analysis for flagged text content.

        Args:
            text: Original text content.
            filter_result: Results from the fast filter stage.

        Returns:
            DeepAnalysisResult with Gemini reasoning.
        """
        logger.info("deep_analysis_text_started")

        # Prepare context from fast filter
        context = {
            "flagged_categories": filter_result.categories,
            "max_score": filter_result.max_score,
            "max_label": filter_result.max_label,
            "all_scores": filter_result.scores,
        }

        # Invoke Gemini for contextual reasoning
        gemini_result = await gemini_service.analyze_text(text, context)

        result = self._build_result(gemini_result)

        logger.info(
            "deep_analysis_text_complete",
            confirmed=result.is_confirmed,
            severity=result.severity,
            action=result.recommended_action,
        )
        return result

    async def analyze_image(
        self,
        image: Image.Image,
        filter_result: FilterResult,
        context_text: str | None = None,
    ) -> DeepAnalysisResult:
        """
        Deep analysis for flagged image content.

        Args:
            image: PIL Image.
            filter_result: Results from the fast filter.
            context_text: Optional text accompanying the image.

        Returns:
            DeepAnalysisResult with CLIP alignment + Gemini reasoning.
        """
        logger.info("deep_analysis_image_started")

        # Step 1: CLIP multimodal alignment
        clip_scores = {}
        if model_registry.clip_available:
            try:
                clip_result = model_registry.clip_model.align_content(image, context_text)
                clip_scores = clip_result
                logger.info("clip_alignment_complete", most_aligned=clip_result.get("most_aligned"))
            except Exception as e:
                logger.warning("clip_alignment_failed", error=str(e))

        # Step 2: Gemini image reasoning
        context = {
            "flagged_categories": filter_result.categories,
            "max_score": filter_result.max_score,
            "clip_alignment": clip_scores.get("most_aligned", "unknown"),
        }

        image_b64 = image_to_base64(image)
        gemini_result = await gemini_service.analyze_image(image_b64, context)

        result = self._build_result(gemini_result, clip_scores)

        logger.info(
            "deep_analysis_image_complete",
            confirmed=result.is_confirmed,
            severity=result.severity,
        )
        return result

    def _build_result(
        self,
        gemini_result: dict,
        clip_scores: dict | None = None,
    ) -> DeepAnalysisResult:
        """Build DeepAnalysisResult from Gemini response."""
        if "error" in gemini_result:
            # Gemini failed — err on the side of caution
            return DeepAnalysisResult(
                is_confirmed=True,  # Assume harmful if we can't verify
                severity="medium",
                reasoning=f"Deep analysis unavailable: {gemini_result['error']}. Defaulting to caution.",
                recommended_action="warn",
                confidence=0.3,
                clip_scores=clip_scores or {},
                gemini_raw=gemini_result,
            )

        return DeepAnalysisResult(
            is_confirmed=gemini_result.get("is_confirmed", False),
            severity=gemini_result.get("severity", "medium"),
            reasoning=gemini_result.get("reasoning", "No reasoning provided"),
            categories=gemini_result.get("categories", []),
            recommended_action=gemini_result.get("recommended_action", "warn"),
            confidence=gemini_result.get("confidence", 0.5),
            clip_scores=clip_scores or {},
            gemini_raw=gemini_result,
        )