import json import re from typing import Optional, Tuple from utils.helpers import generate_id QUESTION_QUEUE_SUFFIX = "question_queue" QUESTION_BACKLOG_SUFFIX = "question_backlog" CONTEXT_CACHE_SUFFIX = "context_cache" ASKED_SET_SUFFIX = "asked_questions_set" QUESTION_PREFIX_RE = re.compile( r"^\s*(?:question|q)\s*#?\s*\d+(?:\s*of\s*\d+)?\s*[\:\-\)\.]\s*", re.IGNORECASE, ) def _key(session_id: str, suffix: str) -> str: return f"session:{session_id}:{suffix}" def normalize_question_text(text: str) -> str: value = (text or "").strip() if not value: return "" while True: updated = QUESTION_PREFIX_RE.sub("", value).strip() if updated == value: break value = updated return value def question_fingerprint(text: str) -> str: value = normalize_question_text(text).lower() value = re.sub(r"[^a-z0-9\s]", " ", value) value = re.sub(r"\s+", " ", value).strip() return value async def mark_question_asked(redis, session_id: str, question_text: str, ttl_seconds: int) -> None: fp = question_fingerprint(question_text) if not fp: return key = _key(session_id, ASKED_SET_SUFFIX) await redis.sadd(key, fp) await redis.expire(key, ttl_seconds) async def is_question_asked(redis, session_id: str, question_text: str) -> bool: fp = question_fingerprint(question_text) if not fp: return False key = _key(session_id, ASKED_SET_SUFFIX) return bool(await redis.sismember(key, fp)) async def _has_in_list(redis, session_id: str, list_key: str, question_text: str) -> bool: wanted = question_fingerprint(question_text) if not wanted: return False ids = await redis.lrange(list_key, 0, -1) for qid in ids: q = await redis.hgetall(f"session:{session_id}:q:{qid}") if question_fingerprint(q.get("question", "")) == wanted: return True return False async def _append_question_object( redis, session_id: str, question: str, difficulty: str, category: str, ttl_seconds: int, ) -> str: normalized_question = normalize_question_text(question) qid = generate_id() q_key = f"session:{session_id}:q:{qid}" await redis.hset( q_key, mapping={ "question_id": qid, "question": normalized_question, "difficulty": difficulty or "medium", "category": category or "general", }, ) await redis.expire(q_key, ttl_seconds) questions_key = f"session:{session_id}:questions" await redis.rpush(questions_key, qid) await redis.expire(questions_key, ttl_seconds) return qid async def enqueue_question( redis, session_id: str, question: str, difficulty: str = "medium", category: str = "general", ttl_seconds: int = 7200, max_queue_size: int = 3, ) -> Optional[str]: text = normalize_question_text(question) if not text: return None queue_key = _key(session_id, QUESTION_QUEUE_SUFFIX) backlog_key = _key(session_id, QUESTION_BACKLOG_SUFFIX) if await is_question_asked(redis, session_id, text): return None if await _has_in_list(redis, session_id, queue_key, text): return None if await _has_in_list(redis, session_id, backlog_key, text): return None q_len = await redis.llen(queue_key) qid = await _append_question_object( redis=redis, session_id=session_id, question=text, difficulty=difficulty, category=category, ttl_seconds=ttl_seconds, ) if q_len < max_queue_size: await redis.rpush(queue_key, qid) await redis.expire(queue_key, ttl_seconds) return qid await redis.rpush(backlog_key, qid) await redis.expire(backlog_key, ttl_seconds) return qid async def flush_backlog_to_queue( redis, session_id: str, ttl_seconds: int = 7200, max_queue_size: int = 3, ) -> None: queue_key = _key(session_id, QUESTION_QUEUE_SUFFIX) backlog_key = _key(session_id, QUESTION_BACKLOG_SUFFIX) while await redis.llen(queue_key) < max_queue_size: qid = await redis.lpop(backlog_key) if not qid: break await redis.rpush(queue_key, qid) await redis.expire(queue_key, ttl_seconds) await redis.expire(backlog_key, ttl_seconds) async def queue_size(redis, session_id: str) -> int: return int(await redis.llen(_key(session_id, QUESTION_QUEUE_SUFFIX))) async def pop_next_question(redis, session_id: str) -> Tuple[Optional[str], Optional[dict]]: queue_key = _key(session_id, QUESTION_QUEUE_SUFFIX) qid = await redis.lpop(queue_key) if not qid: return None, None q = await redis.hgetall(f"session:{session_id}:q:{qid}") return qid, q async def peek_next_question(redis, session_id: str) -> Tuple[Optional[str], Optional[dict]]: queue_key = _key(session_id, QUESTION_QUEUE_SUFFIX) qid = await redis.lindex(queue_key, 0) if not qid: return None, None q = await redis.hgetall(f"session:{session_id}:q:{qid}") return qid, q async def push_context_item( redis, session_id: str, item: dict, ttl_seconds: int = 7200, max_items: int = 3, ) -> None: key = _key(session_id, CONTEXT_CACHE_SUFFIX) await redis.lpush(key, json.dumps(item, ensure_ascii=True)) await redis.ltrim(key, 0, max(0, max_items - 1)) await redis.expire(key, ttl_seconds) async def get_recent_context_items(redis, session_id: str, max_items: int = 3) -> list[dict]: key = _key(session_id, CONTEXT_CACHE_SUFFIX) raw_items = await redis.lrange(key, 0, max(0, max_items - 1)) parsed: list[dict] = [] for raw in raw_items: try: parsed.append(json.loads(raw)) except Exception: continue # Convert newest-first storage into chronological order for prompting. parsed.reverse() return parsed