Spaces:
Sleeping
Sleeping
File size: 8,657 Bytes
# app/services/gemini_service.py
# Gemini API integration via LangChain for deep analysis reasoning
from app.config import get_settings
from app.observability.logging import get_logger
# Module-level logger, named after this module via __name__.
logger = get_logger(__name__)
class GeminiService:
"""
Google Gemini API client powered by LangChain.
Used in the deep analysis path for contextual reasoning
about flagged content. Replaces the old raw REST calls
with structured LangChain invocations for:
- Reliable structured output (JSON)
- Automatic retry and fallback
- LangSmith trace integration
"""
def __init__(self):
self.settings = get_settings()
self.llm = None
self._current_key_idx = 0
self._initialized = False
def initialize(self) -> None:
"""Initialize the LangChain Gemini client."""
keys = self.settings.gemini_keys_list
if not keys:
logger.warning("gemini_no_keys", reason="No API keys configured")
return
try:
from langchain_google_genai import ChatGoogleGenerativeAI
self.llm = ChatGoogleGenerativeAI(
model=self.settings.gemini_model,
google_api_key=keys[self._current_key_idx],
temperature=0.1,
max_output_tokens=1024,
convert_system_message_to_human=True,
)
self._initialized = True
logger.info("gemini_initialized", model=self.settings.gemini_model)
except Exception as e:
logger.error("gemini_init_failed", error=str(e))
def _rotate_key(self) -> bool:
"""Rotate to the next API key. Returns False if all keys exhausted."""
keys = self.settings.gemini_keys_list
if not keys:
return False
self._current_key_idx = (self._current_key_idx + 1) % len(keys)
try:
from langchain_google_genai import ChatGoogleGenerativeAI
self.llm = ChatGoogleGenerativeAI(
model=self.settings.gemini_model,
google_api_key=keys[self._current_key_idx],
temperature=0.1,
max_output_tokens=1024,
convert_system_message_to_human=True,
)
logger.info("gemini_key_rotated", key_index=self._current_key_idx)
return True
except Exception:
return False
async def analyze_text(self, text: str, context: dict | None = None) -> dict:
"""
Perform deep contextual analysis of flagged text.
Args:
text: The flagged text content.
context: Additional context (fast filter results, categories, etc.).
Returns:
Structured analysis with verdict, reasoning, and severity.
"""
if not self._initialized:
return {"error": "Gemini not initialized", "is_confirmed": False}
from langchain_core.messages import SystemMessage, HumanMessage
system_prompt = """You are an expert content moderator specializing in cyberbullying detection.
You are analyzing content that has been flagged as potentially harmful by automated filters.
Your task: Provide a detailed, contextual analysis. Consider:
- Intent and context (sarcasm, jokes, genuine threats)
- Severity level (mild rudeness vs. serious threats)
- Whether this constitutes cyberbullying
- Impact on minors (under 18)
Respond in this exact JSON format:
{
"is_confirmed": true/false,
"severity": "low" | "medium" | "high" | "critical",
"categories": ["category1", "category2"],
"reasoning": "Detailed explanation of your analysis",
"recommended_action": "allow" | "warn" | "block" | "escalate",
"confidence": 0.0-1.0
}"""
context_str = ""
if context:
context_str = f"\n\nPre-filter context: {context}"
human_message = f"""Analyze this flagged content:{context_str}
Content: "{text}"
Provide your JSON analysis:"""
return await self._invoke(system_prompt, human_message)
async def analyze_image(self, image_base64: str, context: dict | None = None) -> dict:
"""
Perform deep analysis of a flagged image.
Args:
image_base64: Base64-encoded image.
context: Additional context from fast filter.
Returns:
Structured analysis dict.
"""
if not self._initialized:
return {"error": "Gemini not initialized", "is_confirmed": False}
from langchain_core.messages import SystemMessage, HumanMessage
system_prompt = """You are an expert content moderator analyzing images for content harmful to minors.
Analyze the image for:
- Violence, gore, weapons
- Nudity, sexual content
- Drug/alcohol imagery
- Self-harm or suicide content
- Hate symbols, extremist content
- Cyberbullying imagery (humiliating photos, etc.)
Respond in this exact JSON format:
{
"is_confirmed": true/false,
"severity": "low" | "medium" | "high" | "critical",
"categories": ["category1", "category2"],
"reasoning": "Description of what was found",
"recommended_action": "allow" | "warn" | "block" | "escalate",
"confidence": 0.0-1.0
}"""
context_str = f"\nPre-filter flags: {context}" if context else ""
human_content = [
{"type": "text", "text": f"Analyze this flagged image:{context_str}\n\nProvide your JSON analysis:"},
{
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{image_base64}"},
},
]
return await self._invoke_multimodal(system_prompt, human_content)
async def _invoke(self, system_prompt: str, human_message: str) -> dict:
"""Invoke Gemini with retry on rate limits."""
from langchain_core.messages import SystemMessage, HumanMessage
import json
keys = self.settings.gemini_keys_list
attempts = max(len(keys), 1)
for attempt in range(attempts):
try:
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=human_message),
]
response = await self.llm.ainvoke(messages)
return self._parse_response(response.content)
except Exception as e:
error_str = str(e)
if "429" in error_str or "quota" in error_str.lower():
logger.warning("gemini_rate_limited", attempt=attempt)
if not self._rotate_key():
break
else:
logger.error("gemini_invoke_failed", error=error_str)
return {"error": error_str, "is_confirmed": False}
return {"error": "All Gemini API keys exhausted", "is_confirmed": False}
async def _invoke_multimodal(self, system_prompt: str, human_content: list) -> dict:
"""Invoke Gemini with multimodal content."""
from langchain_core.messages import SystemMessage, HumanMessage
keys = self.settings.gemini_keys_list
attempts = max(len(keys), 1)
for attempt in range(attempts):
try:
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=human_content),
]
response = await self.llm.ainvoke(messages)
return self._parse_response(response.content)
except Exception as e:
error_str = str(e)
if "429" in error_str or "quota" in error_str.lower():
if not self._rotate_key():
break
else:
return {"error": error_str, "is_confirmed": False}
return {"error": "All Gemini API keys exhausted", "is_confirmed": False}
def _parse_response(self, text: str) -> dict:
"""Parse JSON from Gemini response text."""
import json, re
try:
# Extract JSON block (handle markdown code fences)
json_match = re.search(r"\{[\s\S]*\}", text)
if json_match:
return json.loads(json_match.group())
except json.JSONDecodeError:
pass
logger.warning("gemini_parse_failed", raw_text=text[:200])
return {
"error": "Failed to parse Gemini response",
"is_confirmed": False,
"raw_response": text[:500],
}
@property
def is_initialized(self) -> bool:
return self._initialized
# Global singleton: one shared GeminiService, constructed at import time
# (which calls get_settings()). It starts uninitialized, so initialize() must
# be called before the analyze_* methods will do anything but return an error.
gemini_service = GeminiService()
|