Spaces:
Sleeping
Sleeping
| # app/services/gemini_service.py | |
| # Gemini API integration via LangChain for deep analysis reasoning | |
| from app.config import get_settings | |
| from app.observability.logging import get_logger | |
| logger = get_logger(__name__) | |
| class GeminiService: | |
| """ | |
| Google Gemini API client powered by LangChain. | |
| Used in the deep analysis path for contextual reasoning | |
| about flagged content. Replaces the old raw REST calls | |
| with structured LangChain invocations for: | |
| - Reliable structured output (JSON) | |
| - Automatic retry and fallback | |
| - LangSmith trace integration | |
| """ | |
| def __init__(self): | |
| self.settings = get_settings() | |
| self.llm = None | |
| self._current_key_idx = 0 | |
| self._initialized = False | |
| def initialize(self) -> None: | |
| """Initialize the LangChain Gemini client.""" | |
| keys = self.settings.gemini_keys_list | |
| if not keys: | |
| logger.warning("gemini_no_keys", reason="No API keys configured") | |
| return | |
| try: | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| self.llm = ChatGoogleGenerativeAI( | |
| model=self.settings.gemini_model, | |
| google_api_key=keys[self._current_key_idx], | |
| temperature=0.1, | |
| max_output_tokens=1024, | |
| convert_system_message_to_human=True, | |
| ) | |
| self._initialized = True | |
| logger.info("gemini_initialized", model=self.settings.gemini_model) | |
| except Exception as e: | |
| logger.error("gemini_init_failed", error=str(e)) | |
| def _rotate_key(self) -> bool: | |
| """Rotate to the next API key. Returns False if all keys exhausted.""" | |
| keys = self.settings.gemini_keys_list | |
| if not keys: | |
| return False | |
| self._current_key_idx = (self._current_key_idx + 1) % len(keys) | |
| try: | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| self.llm = ChatGoogleGenerativeAI( | |
| model=self.settings.gemini_model, | |
| google_api_key=keys[self._current_key_idx], | |
| temperature=0.1, | |
| max_output_tokens=1024, | |
| convert_system_message_to_human=True, | |
| ) | |
| logger.info("gemini_key_rotated", key_index=self._current_key_idx) | |
| return True | |
| except Exception: | |
| return False | |
| async def analyze_text(self, text: str, context: dict | None = None) -> dict: | |
| """ | |
| Perform deep contextual analysis of flagged text. | |
| Args: | |
| text: The flagged text content. | |
| context: Additional context (fast filter results, categories, etc.). | |
| Returns: | |
| Structured analysis with verdict, reasoning, and severity. | |
| """ | |
| if not self._initialized: | |
| return {"error": "Gemini not initialized", "is_confirmed": False} | |
| from langchain_core.messages import SystemMessage, HumanMessage | |
| system_prompt = """You are an expert content moderator specializing in cyberbullying detection. | |
| You are analyzing content that has been flagged as potentially harmful by automated filters. | |
| Your task: Provide a detailed, contextual analysis. Consider: | |
| - Intent and context (sarcasm, jokes, genuine threats) | |
| - Severity level (mild rudeness vs. serious threats) | |
| - Whether this constitutes cyberbullying | |
| - Impact on minors (under 18) | |
| Respond in this exact JSON format: | |
| { | |
| "is_confirmed": true/false, | |
| "severity": "low" | "medium" | "high" | "critical", | |
| "categories": ["category1", "category2"], | |
| "reasoning": "Detailed explanation of your analysis", | |
| "recommended_action": "allow" | "warn" | "block" | "escalate", | |
| "confidence": 0.0-1.0 | |
| }""" | |
| context_str = "" | |
| if context: | |
| context_str = f"\n\nPre-filter context: {context}" | |
| human_message = f"""Analyze this flagged content:{context_str} | |
| Content: "{text}" | |
| Provide your JSON analysis:""" | |
| return await self._invoke(system_prompt, human_message) | |
| async def analyze_image(self, image_base64: str, context: dict | None = None) -> dict: | |
| """ | |
| Perform deep analysis of a flagged image. | |
| Args: | |
| image_base64: Base64-encoded image. | |
| context: Additional context from fast filter. | |
| Returns: | |
| Structured analysis dict. | |
| """ | |
| if not self._initialized: | |
| return {"error": "Gemini not initialized", "is_confirmed": False} | |
| from langchain_core.messages import SystemMessage, HumanMessage | |
| system_prompt = """You are an expert content moderator analyzing images for content harmful to minors. | |
| Analyze the image for: | |
| - Violence, gore, weapons | |
| - Nudity, sexual content | |
| - Drug/alcohol imagery | |
| - Self-harm or suicide content | |
| - Hate symbols, extremist content | |
| - Cyberbullying imagery (humiliating photos, etc.) | |
| Respond in this exact JSON format: | |
| { | |
| "is_confirmed": true/false, | |
| "severity": "low" | "medium" | "high" | "critical", | |
| "categories": ["category1", "category2"], | |
| "reasoning": "Description of what was found", | |
| "recommended_action": "allow" | "warn" | "block" | "escalate", | |
| "confidence": 0.0-1.0 | |
| }""" | |
| context_str = f"\nPre-filter flags: {context}" if context else "" | |
| human_content = [ | |
| {"type": "text", "text": f"Analyze this flagged image:{context_str}\n\nProvide your JSON analysis:"}, | |
| { | |
| "type": "image_url", | |
| "image_url": {"url": f"data:image/jpeg;base64,{image_base64}"}, | |
| }, | |
| ] | |
| return await self._invoke_multimodal(system_prompt, human_content) | |
| async def _invoke(self, system_prompt: str, human_message: str) -> dict: | |
| """Invoke Gemini with retry on rate limits.""" | |
| from langchain_core.messages import SystemMessage, HumanMessage | |
| import json | |
| keys = self.settings.gemini_keys_list | |
| attempts = max(len(keys), 1) | |
| for attempt in range(attempts): | |
| try: | |
| messages = [ | |
| SystemMessage(content=system_prompt), | |
| HumanMessage(content=human_message), | |
| ] | |
| response = await self.llm.ainvoke(messages) | |
| return self._parse_response(response.content) | |
| except Exception as e: | |
| error_str = str(e) | |
| if "429" in error_str or "quota" in error_str.lower(): | |
| logger.warning("gemini_rate_limited", attempt=attempt) | |
| if not self._rotate_key(): | |
| break | |
| else: | |
| logger.error("gemini_invoke_failed", error=error_str) | |
| return {"error": error_str, "is_confirmed": False} | |
| return {"error": "All Gemini API keys exhausted", "is_confirmed": False} | |
| async def _invoke_multimodal(self, system_prompt: str, human_content: list) -> dict: | |
| """Invoke Gemini with multimodal content.""" | |
| from langchain_core.messages import SystemMessage, HumanMessage | |
| keys = self.settings.gemini_keys_list | |
| attempts = max(len(keys), 1) | |
| for attempt in range(attempts): | |
| try: | |
| messages = [ | |
| SystemMessage(content=system_prompt), | |
| HumanMessage(content=human_content), | |
| ] | |
| response = await self.llm.ainvoke(messages) | |
| return self._parse_response(response.content) | |
| except Exception as e: | |
| error_str = str(e) | |
| if "429" in error_str or "quota" in error_str.lower(): | |
| if not self._rotate_key(): | |
| break | |
| else: | |
| return {"error": error_str, "is_confirmed": False} | |
| return {"error": "All Gemini API keys exhausted", "is_confirmed": False} | |
| def _parse_response(self, text: str) -> dict: | |
| """Parse JSON from Gemini response text.""" | |
| import json, re | |
| try: | |
| # Extract JSON block (handle markdown code fences) | |
| json_match = re.search(r"\{[\s\S]*\}", text) | |
| if json_match: | |
| return json.loads(json_match.group()) | |
| except json.JSONDecodeError: | |
| pass | |
| logger.warning("gemini_parse_failed", raw_text=text[:200]) | |
| return { | |
| "error": "Failed to parse Gemini response", | |
| "is_confirmed": False, | |
| "raw_response": text[:500], | |
| } | |
| def is_initialized(self) -> bool: | |
| return self._initialized | |
| # Global singleton | |
| gemini_service = GeminiService() | |