Spaces:
Sleeping
Sleeping
| # app/services/mongo_service.py | |
| # Async MongoDB client for moderation logs and user history | |
| from datetime import datetime, timezone | |
| from typing import Any | |
| from app.config import get_settings | |
| from app.observability.logging import get_logger | |
# Module-level logger named after this module; the service below passes
# event names plus keyword fields to it.
logger = get_logger(__name__)
| class MongoService: | |
| """ | |
| Async MongoDB client using Motor. | |
| Collections: | |
| - moderation_logs: Full pipeline run traces | |
| - user_history: Aggregated user moderation stats | |
| """ | |
| def __init__(self): | |
| self.settings = get_settings() | |
| self.client = None | |
| self.db = None | |
| self._connected = False | |
| async def connect(self) -> None: | |
| """Establish MongoDB connection.""" | |
| try: | |
| from motor.motor_asyncio import AsyncIOMotorClient | |
| self.client = AsyncIOMotorClient( | |
| self.settings.mongodb_uri, | |
| serverSelectionTimeoutMS=5000, | |
| ) | |
| # Verify connection | |
| await self.client.server_info() | |
| self.db = self.client[self.settings.mongodb_db_name] | |
| self._connected = True | |
| logger.info("mongodb_connected", db=self.settings.mongodb_db_name) | |
| # Create indexes | |
| await self._create_indexes() | |
| except Exception as e: | |
| logger.warning("mongodb_connection_failed", error=str(e)) | |
| self._connected = False | |
| async def _create_indexes(self) -> None: | |
| """Create necessary indexes for performance.""" | |
| try: | |
| # moderation_logs indexes | |
| logs = self.db["moderation_logs"] | |
| await logs.create_index("user_id") | |
| await logs.create_index("created_at") | |
| await logs.create_index([("user_id", 1), ("created_at", -1)]) | |
| # user_history indexes | |
| history = self.db["user_history"] | |
| await history.create_index("user_id", unique=True) | |
| logger.info("mongodb_indexes_created") | |
| except Exception as e: | |
| logger.warning("mongodb_index_creation_failed", error=str(e)) | |
| async def disconnect(self) -> None: | |
| """Close MongoDB connection.""" | |
| if self.client: | |
| self.client.close() | |
| self._connected = False | |
| logger.info("mongodb_disconnected") | |
| # ---- Moderation Logs ---- | |
| async def log_moderation(self, log_entry: dict) -> str | None: | |
| """ | |
| Store a complete moderation pipeline run. | |
| Args: | |
| log_entry: Full pipeline result including input, scores, decisions. | |
| Returns: | |
| Inserted document ID or None. | |
| """ | |
| if not self._connected: | |
| return None | |
| try: | |
| log_entry["created_at"] = datetime.now(timezone.utc) | |
| result = await self.db["moderation_logs"].insert_one(log_entry) | |
| logger.debug("moderation_logged", doc_id=str(result.inserted_id)) | |
| return str(result.inserted_id) | |
| except Exception as e: | |
| logger.error("moderation_log_failed", error=str(e)) | |
| return None | |
| async def get_moderation_history( | |
| self, | |
| user_id: str, | |
| limit: int = 20, | |
| skip: int = 0, | |
| ) -> list[dict]: | |
| """Get moderation history for a user.""" | |
| if not self._connected: | |
| return [] | |
| try: | |
| cursor = ( | |
| self.db["moderation_logs"] | |
| .find({"user_id": user_id}) | |
| .sort("created_at", -1) | |
| .skip(skip) | |
| .limit(limit) | |
| ) | |
| results = [] | |
| async for doc in cursor: | |
| doc["_id"] = str(doc["_id"]) | |
| results.append(doc) | |
| return results | |
| except Exception as e: | |
| logger.error("history_fetch_failed", error=str(e)) | |
| return [] | |
| # ---- User History (aggregated stats) ---- | |
| async def get_user_history(self, user_id: str) -> dict | None: | |
| """Get aggregated moderation stats for a user.""" | |
| if not self._connected: | |
| return None | |
| try: | |
| doc = await self.db["user_history"].find_one({"user_id": user_id}) | |
| if doc: | |
| doc["_id"] = str(doc["_id"]) | |
| return doc | |
| except Exception as e: | |
| logger.error("user_history_fetch_failed", error=str(e)) | |
| return None | |
| async def update_user_history(self, user_id: str, moderation_result: dict) -> None: | |
| """ | |
| Update aggregated user history after a moderation event. | |
| Tracks: | |
| - total_scans, total_violations, total_warnings | |
| - violation_categories (count per category) | |
| - last_violation_at | |
| - risk_trend (recent violation rate) | |
| """ | |
| if not self._connected: | |
| return | |
| try: | |
| risk_level = moderation_result.get("risk_level", "LOW") | |
| categories = moderation_result.get("categories", []) | |
| update_ops: dict[str, Any] = { | |
| "$inc": {"total_scans": 1}, | |
| "$set": {"last_scan_at": datetime.now(timezone.utc)}, | |
| "$setOnInsert": {"user_id": user_id, "created_at": datetime.now(timezone.utc)}, | |
| } | |
| if risk_level == "HIGH": | |
| update_ops["$inc"]["total_violations"] = 1 | |
| update_ops["$set"]["last_violation_at"] = datetime.now(timezone.utc) | |
| for cat in categories: | |
| update_ops["$inc"][f"violation_categories.{cat}"] = 1 | |
| elif risk_level == "MEDIUM": | |
| update_ops["$inc"]["total_warnings"] = 1 | |
| await self.db["user_history"].update_one( | |
| {"user_id": user_id}, | |
| update_ops, | |
| upsert=True, | |
| ) | |
| except Exception as e: | |
| logger.error("user_history_update_failed", error=str(e)) | |
| def is_connected(self) -> bool: | |
| return self._connected | |
# Shared module-level instance, created at import time. Its data methods
# return None / [] until connect() has succeeded.
mongo_service: MongoService = MongoService()