File size: 4,540 Bytes
71c1ad2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# app/services/redis_service.py
# Async Redis client for caching analysis results and user history

import json
import hashlib
from typing import Any
from app.config import get_settings
from app.observability.logging import get_logger

logger = get_logger(__name__)


class RedisService:
    """
    Async Redis client for caching.

    Caches:
    - Analysis results by content hash (avoid re-processing identical content)
    - User history summaries (avoid repeated MongoDB queries)
    """

    def __init__(self):
        self.settings = get_settings()
        self.client = None
        self._connected = False

    async def connect(self) -> None:
        """Establish Redis connection."""
        try:
            import redis.asyncio as aioredis

            self.client = aioredis.from_url(
                self.settings.redis_url,
                decode_responses=True,
                socket_connect_timeout=5,
            )
            # Test connection
            await self.client.ping()
            self._connected = True
            logger.info("redis_connected", url=self.settings.redis_url)
        except Exception as e:
            logger.warning("redis_connection_failed", error=str(e))
            self._connected = False

    async def disconnect(self) -> None:
        """Close Redis connection."""
        if self.client:
            await self.client.close()
            self._connected = False
            logger.info("redis_disconnected")

    @staticmethod
    def _content_hash(content: str) -> str:
        """Generate a deterministic hash for content-based cache keys."""
        return hashlib.sha256(content.encode("utf-8")).hexdigest()[:16]

    async def get_cached_result(self, content: str, input_type: str) -> dict | None:
        """
        Look up a cached analysis result.

        Args:
            content: The raw content (text, or image hash).
            input_type: 'text', 'image', or 'video'.

        Returns:
            Cached result dict or None.
        """
        if not self._connected:
            return None

        try:
            key = f"hubble:result:{input_type}:{self._content_hash(content)}"
            cached = await self.client.get(key)
            if cached:
                logger.debug("cache_hit", key=key)
                return json.loads(cached)
            return None
        except Exception as e:
            logger.warning("cache_get_failed", error=str(e))
            return None

    async def cache_result(self, content: str, input_type: str, result: dict) -> None:
        """
        Cache an analysis result.

        Args:
            content: The raw content (text, or image hash).
            input_type: 'text', 'image', or 'video'.
            result: The analysis result dict.
        """
        if not self._connected:
            return

        try:
            key = f"hubble:result:{input_type}:{self._content_hash(content)}"
            await self.client.setex(
                key,
                self.settings.redis_cache_ttl,
                json.dumps(result, default=str),
            )
            logger.debug("cache_set", key=key, ttl=self.settings.redis_cache_ttl)
        except Exception as e:
            logger.warning("cache_set_failed", error=str(e))

    async def get_user_history(self, user_id: str) -> dict | None:
        """Get cached user moderation history summary."""
        if not self._connected:
            return None

        try:
            key = f"hubble:user_history:{user_id}"
            cached = await self.client.get(key)
            return json.loads(cached) if cached else None
        except Exception:
            return None

    async def cache_user_history(self, user_id: str, history: dict) -> None:
        """Cache user moderation history summary (10 minute TTL)."""
        if not self._connected:
            return

        try:
            key = f"hubble:user_history:{user_id}"
            await self.client.setex(key, 600, json.dumps(history, default=str))
        except Exception:
            pass

    async def invalidate_user_history(self, user_id: str) -> None:
        """Invalidate cached user history after a new moderation event."""
        if not self._connected:
            return

        try:
            key = f"hubble:user_history:{user_id}"
            await self.client.delete(key)
        except Exception:
            pass

    @property
    def is_connected(self) -> bool:
        return self._connected


# Global singleton
redis_service = RedisService()