# Source: SentinelAI/app/services/mongo_service.py
# Uploaded by sajith-0701 - commit 71c1ad2 ("initial deployment for HF Spaces")
# app/services/mongo_service.py
# Async MongoDB client for moderation logs and user history
from datetime import datetime, timezone
from typing import Any
from app.config import get_settings
from app.observability.logging import get_logger
# Module-level logger, named after this module and obtained from the project's logging helper.
logger = get_logger(__name__)
class MongoService:
"""
Async MongoDB client using Motor.
Collections:
- moderation_logs: Full pipeline run traces
- user_history: Aggregated user moderation stats
"""
def __init__(self):
self.settings = get_settings()
self.client = None
self.db = None
self._connected = False
async def connect(self) -> None:
"""Establish MongoDB connection."""
try:
from motor.motor_asyncio import AsyncIOMotorClient
self.client = AsyncIOMotorClient(
self.settings.mongodb_uri,
serverSelectionTimeoutMS=5000,
)
# Verify connection
await self.client.server_info()
self.db = self.client[self.settings.mongodb_db_name]
self._connected = True
logger.info("mongodb_connected", db=self.settings.mongodb_db_name)
# Create indexes
await self._create_indexes()
except Exception as e:
logger.warning("mongodb_connection_failed", error=str(e))
self._connected = False
async def _create_indexes(self) -> None:
"""Create necessary indexes for performance."""
try:
# moderation_logs indexes
logs = self.db["moderation_logs"]
await logs.create_index("user_id")
await logs.create_index("created_at")
await logs.create_index([("user_id", 1), ("created_at", -1)])
# user_history indexes
history = self.db["user_history"]
await history.create_index("user_id", unique=True)
logger.info("mongodb_indexes_created")
except Exception as e:
logger.warning("mongodb_index_creation_failed", error=str(e))
async def disconnect(self) -> None:
"""Close MongoDB connection."""
if self.client:
self.client.close()
self._connected = False
logger.info("mongodb_disconnected")
# ---- Moderation Logs ----
async def log_moderation(self, log_entry: dict) -> str | None:
"""
Store a complete moderation pipeline run.
Args:
log_entry: Full pipeline result including input, scores, decisions.
Returns:
Inserted document ID or None.
"""
if not self._connected:
return None
try:
log_entry["created_at"] = datetime.now(timezone.utc)
result = await self.db["moderation_logs"].insert_one(log_entry)
logger.debug("moderation_logged", doc_id=str(result.inserted_id))
return str(result.inserted_id)
except Exception as e:
logger.error("moderation_log_failed", error=str(e))
return None
async def get_moderation_history(
self,
user_id: str,
limit: int = 20,
skip: int = 0,
) -> list[dict]:
"""Get moderation history for a user."""
if not self._connected:
return []
try:
cursor = (
self.db["moderation_logs"]
.find({"user_id": user_id})
.sort("created_at", -1)
.skip(skip)
.limit(limit)
)
results = []
async for doc in cursor:
doc["_id"] = str(doc["_id"])
results.append(doc)
return results
except Exception as e:
logger.error("history_fetch_failed", error=str(e))
return []
# ---- User History (aggregated stats) ----
async def get_user_history(self, user_id: str) -> dict | None:
"""Get aggregated moderation stats for a user."""
if not self._connected:
return None
try:
doc = await self.db["user_history"].find_one({"user_id": user_id})
if doc:
doc["_id"] = str(doc["_id"])
return doc
except Exception as e:
logger.error("user_history_fetch_failed", error=str(e))
return None
async def update_user_history(self, user_id: str, moderation_result: dict) -> None:
"""
Update aggregated user history after a moderation event.
Tracks:
- total_scans, total_violations, total_warnings
- violation_categories (count per category)
- last_violation_at
- risk_trend (recent violation rate)
"""
if not self._connected:
return
try:
risk_level = moderation_result.get("risk_level", "LOW")
categories = moderation_result.get("categories", [])
update_ops: dict[str, Any] = {
"$inc": {"total_scans": 1},
"$set": {"last_scan_at": datetime.now(timezone.utc)},
"$setOnInsert": {"user_id": user_id, "created_at": datetime.now(timezone.utc)},
}
if risk_level == "HIGH":
update_ops["$inc"]["total_violations"] = 1
update_ops["$set"]["last_violation_at"] = datetime.now(timezone.utc)
for cat in categories:
update_ops["$inc"][f"violation_categories.{cat}"] = 1
elif risk_level == "MEDIUM":
update_ops["$inc"]["total_warnings"] = 1
await self.db["user_history"].update_one(
{"user_id": user_id},
update_ops,
upsert=True,
)
except Exception as e:
logger.error("user_history_update_failed", error=str(e))
@property
def is_connected(self) -> bool:
return self._connected
# Global singleton, constructed when this module is imported.
# Construction only loads settings and initialises state (client/db are None);
# no connection is opened until connect() is awaited.
mongo_service = MongoService()