# Repository path: SentinelAI/app/services/gemini_service.py
# Last commit: "initial deployment for HF Spaces" (71c1ad2), by sajith-0701
# app/services/gemini_service.py
# Gemini API integration via LangChain for deep analysis reasoning
from app.config import get_settings
from app.observability.logging import get_logger
# Module-level logger named after this module; the calls in this file pass an
# event name followed by keyword fields (e.g. reason=..., error=...).
logger = get_logger(__name__)
class GeminiService:
"""
Google Gemini API client powered by LangChain.
Used in the deep analysis path for contextual reasoning
about flagged content. Replaces the old raw REST calls
with structured LangChain invocations for:
- Reliable structured output (JSON)
- Automatic retry and fallback
- LangSmith trace integration
"""
def __init__(self):
self.settings = get_settings()
self.llm = None
self._current_key_idx = 0
self._initialized = False
def initialize(self) -> None:
"""Initialize the LangChain Gemini client."""
keys = self.settings.gemini_keys_list
if not keys:
logger.warning("gemini_no_keys", reason="No API keys configured")
return
try:
from langchain_google_genai import ChatGoogleGenerativeAI
self.llm = ChatGoogleGenerativeAI(
model=self.settings.gemini_model,
google_api_key=keys[self._current_key_idx],
temperature=0.1,
max_output_tokens=1024,
convert_system_message_to_human=True,
)
self._initialized = True
logger.info("gemini_initialized", model=self.settings.gemini_model)
except Exception as e:
logger.error("gemini_init_failed", error=str(e))
def _rotate_key(self) -> bool:
"""Rotate to the next API key. Returns False if all keys exhausted."""
keys = self.settings.gemini_keys_list
if not keys:
return False
self._current_key_idx = (self._current_key_idx + 1) % len(keys)
try:
from langchain_google_genai import ChatGoogleGenerativeAI
self.llm = ChatGoogleGenerativeAI(
model=self.settings.gemini_model,
google_api_key=keys[self._current_key_idx],
temperature=0.1,
max_output_tokens=1024,
convert_system_message_to_human=True,
)
logger.info("gemini_key_rotated", key_index=self._current_key_idx)
return True
except Exception:
return False
async def analyze_text(self, text: str, context: dict | None = None) -> dict:
"""
Perform deep contextual analysis of flagged text.
Args:
text: The flagged text content.
context: Additional context (fast filter results, categories, etc.).
Returns:
Structured analysis with verdict, reasoning, and severity.
"""
if not self._initialized:
return {"error": "Gemini not initialized", "is_confirmed": False}
from langchain_core.messages import SystemMessage, HumanMessage
system_prompt = """You are an expert content moderator specializing in cyberbullying detection.
You are analyzing content that has been flagged as potentially harmful by automated filters.
Your task: Provide a detailed, contextual analysis. Consider:
- Intent and context (sarcasm, jokes, genuine threats)
- Severity level (mild rudeness vs. serious threats)
- Whether this constitutes cyberbullying
- Impact on minors (under 18)
Respond in this exact JSON format:
{
"is_confirmed": true/false,
"severity": "low" | "medium" | "high" | "critical",
"categories": ["category1", "category2"],
"reasoning": "Detailed explanation of your analysis",
"recommended_action": "allow" | "warn" | "block" | "escalate",
"confidence": 0.0-1.0
}"""
context_str = ""
if context:
context_str = f"\n\nPre-filter context: {context}"
human_message = f"""Analyze this flagged content:{context_str}
Content: "{text}"
Provide your JSON analysis:"""
return await self._invoke(system_prompt, human_message)
async def analyze_image(self, image_base64: str, context: dict | None = None) -> dict:
"""
Perform deep analysis of a flagged image.
Args:
image_base64: Base64-encoded image.
context: Additional context from fast filter.
Returns:
Structured analysis dict.
"""
if not self._initialized:
return {"error": "Gemini not initialized", "is_confirmed": False}
from langchain_core.messages import SystemMessage, HumanMessage
system_prompt = """You are an expert content moderator analyzing images for content harmful to minors.
Analyze the image for:
- Violence, gore, weapons
- Nudity, sexual content
- Drug/alcohol imagery
- Self-harm or suicide content
- Hate symbols, extremist content
- Cyberbullying imagery (humiliating photos, etc.)
Respond in this exact JSON format:
{
"is_confirmed": true/false,
"severity": "low" | "medium" | "high" | "critical",
"categories": ["category1", "category2"],
"reasoning": "Description of what was found",
"recommended_action": "allow" | "warn" | "block" | "escalate",
"confidence": 0.0-1.0
}"""
context_str = f"\nPre-filter flags: {context}" if context else ""
human_content = [
{"type": "text", "text": f"Analyze this flagged image:{context_str}\n\nProvide your JSON analysis:"},
{
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{image_base64}"},
},
]
return await self._invoke_multimodal(system_prompt, human_content)
async def _invoke(self, system_prompt: str, human_message: str) -> dict:
"""Invoke Gemini with retry on rate limits."""
from langchain_core.messages import SystemMessage, HumanMessage
import json
keys = self.settings.gemini_keys_list
attempts = max(len(keys), 1)
for attempt in range(attempts):
try:
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=human_message),
]
response = await self.llm.ainvoke(messages)
return self._parse_response(response.content)
except Exception as e:
error_str = str(e)
if "429" in error_str or "quota" in error_str.lower():
logger.warning("gemini_rate_limited", attempt=attempt)
if not self._rotate_key():
break
else:
logger.error("gemini_invoke_failed", error=error_str)
return {"error": error_str, "is_confirmed": False}
return {"error": "All Gemini API keys exhausted", "is_confirmed": False}
async def _invoke_multimodal(self, system_prompt: str, human_content: list) -> dict:
"""Invoke Gemini with multimodal content."""
from langchain_core.messages import SystemMessage, HumanMessage
keys = self.settings.gemini_keys_list
attempts = max(len(keys), 1)
for attempt in range(attempts):
try:
messages = [
SystemMessage(content=system_prompt),
HumanMessage(content=human_content),
]
response = await self.llm.ainvoke(messages)
return self._parse_response(response.content)
except Exception as e:
error_str = str(e)
if "429" in error_str or "quota" in error_str.lower():
if not self._rotate_key():
break
else:
return {"error": error_str, "is_confirmed": False}
return {"error": "All Gemini API keys exhausted", "is_confirmed": False}
def _parse_response(self, text: str) -> dict:
"""Parse JSON from Gemini response text."""
import json, re
try:
# Extract JSON block (handle markdown code fences)
json_match = re.search(r"\{[\s\S]*\}", text)
if json_match:
return json.loads(json_match.group())
except json.JSONDecodeError:
pass
logger.warning("gemini_parse_failed", raw_text=text[:200])
return {
"error": "Failed to parse Gemini response",
"is_confirmed": False,
"raw_response": text[:500],
}
@property
def is_initialized(self) -> bool:
return self._initialized
# Global singleton, constructed at import time (the constructor calls
# get_settings()). initialize() must be called separately; until then the
# analyze_* methods return a "Gemini not initialized" error dict.
gemini_service = GeminiService()