Spaces:
Sleeping
Sleeping
| # app/api/v1/analyze.py | |
| # Core analysis endpoints β text, image, video | |
| import time | |
| import uuid | |
| from dataclasses import asdict | |
| from fastapi import APIRouter, UploadFile, File, Form, HTTPException | |
| from app.api.schemas.requests import TextAnalysisRequest | |
| from app.api.schemas.responses import ( | |
| AnalysisResponse, | |
| DecisionDetail, | |
| RiskDetail, | |
| FilterDetail, | |
| DeepAnalysisDetail, | |
| ErrorResponse, | |
| ) | |
| from app.services.mongo_service import mongo_service | |
| from app.services.redis_service import redis_service | |
| from app.pipeline.workflow import get_workflow, PipelineState | |
| from app.observability.logging import get_logger | |
| from app.config import get_settings | |
| logger = get_logger(__name__) | |
| settings = get_settings() | |
| router = APIRouter(tags=["Analysis"]) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Helper: Convert pipeline state to API response | |
| # ββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _build_response( | |
| state: dict, | |
| input_type: str, | |
| request_id: str, | |
| start_time: float, | |
| cached: bool = False, | |
| ) -> AnalysisResponse: | |
| """Convert pipeline output state into the unified AnalysisResponse.""" | |
| risk = state.get("risk_score") | |
| decision = state.get("decision") | |
| filter_result = state.get("filter_result") | |
| deep_result = state.get("deep_result") | |
| # Build nested models | |
| decision_detail = DecisionDetail( | |
| action=decision.action if decision else "WARNING", | |
| reason=decision.reason if decision else "Pipeline incomplete", | |
| severity=decision.severity if decision else "medium", | |
| should_alert_parent=decision.should_alert_parent if decision else False, | |
| escalation_notes=decision.escalation_notes if decision else None, | |
| ) | |
| risk_detail = RiskDetail( | |
| score=risk.score if risk else 0.0, | |
| level=risk.level if risk else "LOW", | |
| components=risk.components if risk else {}, | |
| repeat_offender=risk.repeat_offender if risk else False, | |
| ) | |
| filter_detail = FilterDetail( | |
| is_flagged=filter_result.is_flagged if filter_result else False, | |
| scores=filter_result.scores if filter_result else {}, | |
| max_score=filter_result.max_score if filter_result else 0.0, | |
| max_label=filter_result.max_label if filter_result else "", | |
| categories=filter_result.categories if filter_result else [], | |
| ) | |
| deep_analysis = None | |
| if deep_result: | |
| deep_analysis = DeepAnalysisDetail( | |
| is_confirmed=deep_result.is_confirmed, | |
| severity=deep_result.severity, | |
| reasoning=deep_result.reasoning, | |
| categories=deep_result.categories, | |
| recommended_action=deep_result.recommended_action, | |
| confidence=deep_result.confidence, | |
| clip_scores=deep_result.clip_scores, | |
| ) | |
| processing_time = int((time.time() - start_time) * 1000) | |
| return AnalysisResponse( | |
| request_id=request_id, | |
| input_type=input_type, | |
| status=decision_detail.action, | |
| risk_level=risk_detail.level, | |
| risk_score=risk_detail.score, | |
| categories=filter_detail.categories, | |
| confidence=filter_detail.max_score, | |
| decision=decision_detail, | |
| risk_detail=risk_detail, | |
| filter_detail=filter_detail, | |
| deep_analysis=deep_analysis, | |
| processing_time_ms=processing_time, | |
| trace_id=None, # TODO: capture from LangSmith | |
| cached=cached, | |
| ) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββ | |
| # POST /analyze/text | |
| # ββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def analyze_text(request: TextAnalysisRequest): | |
| """ | |
| Full moderation pipeline for text content. | |
| Pipeline: Preprocess β RoBERTa Filter β Risk Score β [Deep Analysis] β Decision | |
| """ | |
| request_id = f"req_{uuid.uuid4().hex[:12]}" | |
| start_time = time.time() | |
| logger.info("analyze_text_started", request_id=request_id, text_length=len(request.text)) | |
| # Validate text length | |
| if len(request.text) > settings.max_text_length: | |
| raise HTTPException(status_code=400, detail=f"Text too long (max {settings.max_text_length} chars)") | |
| # Check cache | |
| cached_result = await redis_service.get_cached_result(request.text, "text") | |
| if cached_result: | |
| logger.info("analyze_text_cached", request_id=request_id) | |
| cached_result["request_id"] = request_id | |
| cached_result["cached"] = True | |
| cached_result["processing_time_ms"] = int((time.time() - start_time) * 1000) | |
| return AnalysisResponse(**cached_result) | |
| # Run pipeline | |
| try: | |
| workflow = get_workflow() | |
| initial_state: PipelineState = { | |
| "input_type": "text", | |
| "raw_content": request.text, | |
| "user_id": request.user_id, | |
| } | |
| result_state = await workflow.ainvoke(initial_state) | |
| # Check for pipeline errors | |
| if result_state.get("error"): | |
| raise HTTPException(status_code=500, detail=result_state["error"]) | |
| response = _build_response(result_state, "text", request_id, start_time) | |
| # Cache the result | |
| await redis_service.cache_result( | |
| request.text, "text", response.model_dump() | |
| ) | |
| # Log to MongoDB | |
| await _log_moderation(request_id, "text", request.user_id, response) | |
| return response | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error("analyze_text_failed", request_id=request_id, error=str(e)) | |
| raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}") | |
| # ββββββββββββββββββββββββββββββββββββββββββββββ | |
| # POST /analyze/image | |
| # ββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def analyze_image( | |
| file: UploadFile = File(..., description="Image file to analyze"), | |
| user_id: str | None = Form(None, description="Optional user ID"), | |
| source_app: str | None = Form(None, description="Source application"), | |
| ): | |
| """ | |
| Full moderation pipeline for image content. | |
| Pipeline: Preprocess β EfficientNet Filter β Risk Score β [CLIP + Gemini] β Decision | |
| """ | |
| request_id = f"req_{uuid.uuid4().hex[:12]}" | |
| start_time = time.time() | |
| # Validate file type | |
| if not file.content_type or not file.content_type.startswith("image/"): | |
| raise HTTPException(status_code=400, detail="File must be an image (JPEG, PNG, etc.)") | |
| logger.info("analyze_image_started", request_id=request_id, filename=file.filename) | |
| # Validate file size | |
| if file.size and file.size > settings.max_image_size: | |
| raise HTTPException(status_code=400, detail=f"Image too large (max {settings.max_image_size / 1024 / 1024}MB)") | |
| try: | |
| image_bytes = await file.read() | |
| workflow = get_workflow() | |
| initial_state: PipelineState = { | |
| "input_type": "image", | |
| "raw_content": image_bytes, | |
| "user_id": user_id, | |
| } | |
| result_state = await workflow.ainvoke(initial_state) | |
| if result_state.get("error"): | |
| raise HTTPException(status_code=500, detail=result_state["error"]) | |
| response = _build_response(result_state, "image", request_id, start_time) | |
| # Log to MongoDB | |
| await _log_moderation(request_id, "image", user_id, response) | |
| return response | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error("analyze_image_failed", request_id=request_id, error=str(e)) | |
| raise HTTPException(status_code=500, detail=f"Image analysis failed: {str(e)}") | |
| # ββββββββββββββββββββββββββββββββββββββββββββββ | |
| # POST /analyze/video | |
| # ββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def analyze_video( | |
| file: UploadFile = File(..., description="Video file to analyze"), | |
| user_id: str | None = Form(None, description="Optional user ID"), | |
| source_app: str | None = Form(None, description="Source application"), | |
| ): | |
| """ | |
| Full moderation pipeline for video content. | |
| Pipeline: Extract frames β Per-frame EfficientNet β Aggregate risk β [Deep Analysis] β Decision | |
| """ | |
| request_id = f"req_{uuid.uuid4().hex[:12]}" | |
| start_time = time.time() | |
| # Validate file type | |
| if not file.content_type or not file.content_type.startswith("video/"): | |
| raise HTTPException(status_code=400, detail="File must be a video (MP4, AVI, etc.)") | |
| logger.info("analyze_video_started", request_id=request_id, filename=file.filename) | |
| # Validate file size | |
| if file.size and file.size > settings.max_video_size: | |
| raise HTTPException(status_code=400, detail=f"Video too large (max {settings.max_video_size / 1024 / 1024}MB)") | |
| try: | |
| video_bytes = await file.read() | |
| workflow = get_workflow() | |
| initial_state: PipelineState = { | |
| "input_type": "video", | |
| "raw_content": video_bytes, | |
| "user_id": user_id, | |
| } | |
| result_state = await workflow.ainvoke(initial_state) | |
| if result_state.get("error"): | |
| raise HTTPException(status_code=500, detail=result_state["error"]) | |
| response = _build_response(result_state, "video", request_id, start_time) | |
| # Log to MongoDB | |
| await _log_moderation(request_id, "video", user_id, response) | |
| return response | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error("analyze_video_failed", request_id=request_id, error=str(e)) | |
| raise HTTPException(status_code=500, detail=f"Video analysis failed: {str(e)}") | |
| # ββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Helper: Log moderation result to MongoDB | |
| # ββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def _log_moderation( | |
| request_id: str, | |
| input_type: str, | |
| user_id: str | None, | |
| response: AnalysisResponse, | |
| ) -> None: | |
| """Log the moderation result and update user history.""" | |
| try: | |
| log_entry = { | |
| "request_id": request_id, | |
| "input_type": input_type, | |
| "user_id": user_id, | |
| "status": response.status, | |
| "risk_level": response.risk_level, | |
| "risk_score": response.risk_score, | |
| "categories": response.categories, | |
| "processing_time_ms": response.processing_time_ms, | |
| } | |
| await mongo_service.log_moderation(log_entry) | |
| # Update user history | |
| if user_id: | |
| await mongo_service.update_user_history(user_id, { | |
| "risk_level": response.risk_level, | |
| "categories": response.categories, | |
| }) | |
| # Invalidate cached history | |
| await redis_service.invalidate_user_history(user_id) | |
| except Exception as e: | |
| logger.warning("moderation_logging_failed", error=str(e)) | |