Spaces:
Sleeping
Sleeping
| import json | |
| import re | |
| from typing import Optional, Tuple | |
| from utils.helpers import generate_id | |
# Redis key suffixes; _key() joins each one with a session id.
ASKED_SET_SUFFIX = "asked_questions_set"
CONTEXT_CACHE_SUFFIX = "context_cache"
QUESTION_BACKLOG_SUFFIX = "question_backlog"
QUESTION_QUEUE_SUFFIX = "question_queue"

# Matches a leading ordinal label such as "Question 2:", "q3)" or
# "Question #4 of 10 -" (case-insensitive), plus surrounding whitespace, so the
# label can be stripped from generated question text.
QUESTION_PREFIX_RE = re.compile(
    r"^\s*(?:question|q)\s*#?\s*\d+(?:\s*of\s*\d+)?\s*[\:\-\)\.]\s*",
    flags=re.I,
)
def _key(session_id: str, suffix: str) -> str:
    """Build the namespaced Redis key ``session:<session_id>:<suffix>``."""
    return "session:{}:{}".format(session_id, suffix)
def normalize_question_text(text: str) -> str:
    """Trim *text* and strip any leading "Question N:"-style labels.

    Labels matched by QUESTION_PREFIX_RE are removed repeatedly until the text
    stops changing, so stacked prefixes such as "Q1: Question 1: ..." are fully
    stripped. ``None`` or blank input yields an empty string.
    """
    current = (text or "").strip()
    while current:
        # The pattern is anchored with ^, so each pass removes at most one label.
        stripped = QUESTION_PREFIX_RE.sub("", current).strip()
        if stripped == current:
            break
        current = stripped
    return current
def question_fingerprint(text: str) -> str:
    """Return a canonical string used to detect duplicate questions.

    The text is passed through normalize_question_text and lower-cased. Every
    character that is not a letter, digit or whitespace (underscore included)
    becomes a space, and whitespace runs collapse to single spaces.

    Letters and digits from any script are preserved. Questions written in
    non-Latin scripts therefore keep a meaningful fingerprint instead of being
    reduced to their ASCII fragments, which could be empty or could collide
    with unrelated questions. An empty result means the text carries no
    comparable content; callers in this module treat that as "cannot
    deduplicate".
    """
    value = normalize_question_text(text).lower()
    # For str patterns \w is Unicode-aware. "_" is also a \w character, so it is
    # listed separately to keep treating it as punctuation, exactly as the
    # ASCII-only filter did.
    value = re.sub(r"[^\w\s]|_", " ", value)
    return re.sub(r"\s+", " ", value).strip()
async def mark_question_asked(redis, session_id: str, question_text: str, ttl_seconds: int) -> None:
    """Record *question_text* in the session's asked-questions Redis set.

    The stored member is the question's fingerprint. Text whose fingerprint is
    empty is ignored. The set's TTL is reset to *ttl_seconds* on every
    successful add.
    """
    fingerprint = question_fingerprint(question_text)
    if fingerprint:
        asked_key = _key(session_id, ASKED_SET_SUFFIX)
        await redis.sadd(asked_key, fingerprint)
        await redis.expire(asked_key, ttl_seconds)
async def is_question_asked(redis, session_id: str, question_text: str) -> bool:
    """Return True if the question's fingerprint is in the session's asked set.

    Text with an empty fingerprint is never considered asked.
    """
    fingerprint = question_fingerprint(question_text)
    if fingerprint:
        asked_key = _key(session_id, ASKED_SET_SUFFIX)
        return bool(await redis.sismember(asked_key, fingerprint))
    return False
async def _has_in_list(redis, session_id: str, list_key: str, question_text: str) -> bool:
    """Return True if a question referenced by the Redis list *list_key* matches.

    Each id in the list is resolved to its ``session:<session_id>:q:<id>`` hash.
    The hash's "question" field is compared by fingerprint against
    *question_text*, and the scan stops at the first match. This issues one
    HGETALL per id, sequentially. Text with an empty fingerprint never matches.
    """
    target = question_fingerprint(question_text)
    if not target:
        return False
    hash_prefix = f"session:{session_id}:q:"
    for question_id in await redis.lrange(list_key, 0, -1):
        stored = await redis.hgetall(f"{hash_prefix}{question_id}")
        if question_fingerprint(stored.get("question", "")) == target:
            return True
    return False
async def _append_question_object(
    redis,
    session_id: str,
    question: str,
    difficulty: str,
    category: str,
    ttl_seconds: int,
) -> str:
    """Persist a question hash and append its id to the session's question list.

    Writes the hash ``session:<session_id>:q:<qid>`` with these fields:
    question_id, the normalized question text, difficulty (falsy values become
    "medium") and category (falsy values become "general"). The id is then
    RPUSHed onto ``session:<session_id>:questions``. Both keys get a TTL of
    *ttl_seconds*.

    Returns:
        The newly generated question id.
    """
    clean_text = normalize_question_text(question)
    question_id = generate_id()

    record_key = f"session:{session_id}:q:{question_id}"
    record = {
        "question_id": question_id,
        "question": clean_text,
        "difficulty": difficulty or "medium",
        "category": category or "general",
    }
    await redis.hset(record_key, mapping=record)
    await redis.expire(record_key, ttl_seconds)

    all_questions_key = f"session:{session_id}:questions"
    await redis.rpush(all_questions_key, question_id)
    await redis.expire(all_questions_key, ttl_seconds)
    return question_id
async def enqueue_question(
    redis,
    session_id: str,
    question: str,
    difficulty: str = "medium",
    category: str = "general",
    ttl_seconds: int = 7200,
    max_queue_size: int = 3,
) -> Optional[str]:
    """Store a new question and put its id on the session's queue or backlog.

    Nothing is stored, and None is returned, when either of these holds:
    - the normalized text is empty;
    - a question with the same fingerprint was already asked, or is already
      waiting in the queue or the backlog.

    Otherwise the question record is created. Its id is appended to the queue
    when the queue held fewer than *max_queue_size* entries before the record
    was created; if not, it goes to the backlog. The TTL of whichever list
    received the id is refreshed.

    The duplicate checks and the writes are separate Redis calls, not a single
    transaction.

    Returns:
        The new question id, or None if nothing was enqueued.
    """
    text = normalize_question_text(question)
    if not text:
        return None

    queue_key = _key(session_id, QUESTION_QUEUE_SUFFIX)
    backlog_key = _key(session_id, QUESTION_BACKLOG_SUFFIX)

    # `or` short-circuits, so later (more expensive) checks only run when the
    # earlier ones found nothing.
    is_duplicate = (
        await is_question_asked(redis, session_id, text)
        or await _has_in_list(redis, session_id, queue_key, text)
        or await _has_in_list(redis, session_id, backlog_key, text)
    )
    if is_duplicate:
        return None

    queued_count = await redis.llen(queue_key)
    question_id = await _append_question_object(
        redis=redis,
        session_id=session_id,
        question=text,
        difficulty=difficulty,
        category=category,
        ttl_seconds=ttl_seconds,
    )

    destination = queue_key if queued_count < max_queue_size else backlog_key
    await redis.rpush(destination, question_id)
    await redis.expire(destination, ttl_seconds)
    return question_id
async def flush_backlog_to_queue(
    redis,
    session_id: str,
    ttl_seconds: int = 7200,
    max_queue_size: int = 3,
) -> None:
    """Move question ids from the backlog into the queue until the queue is full.

    Ids are taken from the head of the backlog (LPOP) and appended to the tail
    of the queue (RPUSH). This repeats while the queue holds fewer than
    *max_queue_size* entries and the backlog still yields an id. The queue's
    TTL is refreshed after every move, and the backlog's TTL once at the end.
    """
    queue_key = _key(session_id, QUESTION_QUEUE_SUFFIX)
    backlog_key = _key(session_id, QUESTION_BACKLOG_SUFFIX)

    while True:
        if await redis.llen(queue_key) >= max_queue_size:
            break
        moved_id = await redis.lpop(backlog_key)
        if not moved_id:
            break
        await redis.rpush(queue_key, moved_id)
        await redis.expire(queue_key, ttl_seconds)

    await redis.expire(backlog_key, ttl_seconds)
async def queue_size(redis, session_id: str) -> int:
    """Return how many question ids are currently in the session's queue."""
    queue_key = _key(session_id, QUESTION_QUEUE_SUFFIX)
    length = await redis.llen(queue_key)
    return int(length)
async def pop_next_question(redis, session_id: str) -> Tuple[Optional[str], Optional[dict]]:
    """Remove the id at the head of the session's queue and load its record.

    Returns:
        ``(question_id, record)``, where *record* is the HGETALL result for
        ``session:<session_id>:q:<question_id>``. Returns ``(None, None)`` when
        LPOP yields nothing.
    """
    head_id = await redis.lpop(_key(session_id, QUESTION_QUEUE_SUFFIX))
    if head_id:
        return head_id, await redis.hgetall(f"session:{session_id}:q:{head_id}")
    return None, None
async def peek_next_question(redis, session_id: str) -> Tuple[Optional[str], Optional[dict]]:
    """Load the record for the id at the head of the session's queue without removing it.

    Returns:
        ``(question_id, record)``, where *record* is the HGETALL result for
        ``session:<session_id>:q:<question_id>``. Returns ``(None, None)`` when
        LINDEX 0 yields nothing.
    """
    head_id = await redis.lindex(_key(session_id, QUESTION_QUEUE_SUFFIX), 0)
    if head_id:
        return head_id, await redis.hgetall(f"session:{session_id}:q:{head_id}")
    return None, None
async def push_context_item(
    redis,
    session_id: str,
    item: dict,
    ttl_seconds: int = 7200,
    max_items: int = 3,
) -> None:
    """Prepend *item*, JSON-encoded, to the session's context cache and cap its length.

    The newest item is stored at index 0. The list is then trimmed to its first
    *max_items* entries and its TTL is refreshed. The trim end index is clamped
    at 0, so at least one entry is kept even when *max_items* is 0 or negative.
    """
    cache_key = _key(session_id, CONTEXT_CACHE_SUFFIX)
    payload = json.dumps(item, ensure_ascii=True)
    last_index = max(0, max_items - 1)
    await redis.lpush(cache_key, payload)
    await redis.ltrim(cache_key, 0, last_index)
    await redis.expire(cache_key, ttl_seconds)
async def get_recent_context_items(redis, session_id: str, max_items: int = 3) -> list[dict]:
    """Return up to *max_items* cached context items, oldest first.

    Reads entries from the head of the session's context-cache list. Entries
    are pushed to the head, so this reads newest-first. The end index is
    clamped at 0, so at least one stored entry is read when any exist. Entries
    are JSON-decoded and reversed into chronological order for prompting.

    An entry is skipped when it is not valid JSON or cannot be decoded, or when
    it decodes to something other than a JSON object. This guarantees every
    returned element is a ``dict``, as the annotation promises.
    """
    key = _key(session_id, CONTEXT_CACHE_SUFFIX)
    raw_items = await redis.lrange(key, 0, max(0, max_items - 1))
    parsed: list[dict] = []
    for raw in raw_items:
        try:
            item = json.loads(raw)
        except (TypeError, ValueError):
            # Best-effort read: a corrupt entry must not break prompting.
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            continue
        if isinstance(item, dict):
            parsed.append(item)
    # Convert newest-first storage into chronological order for prompting.
    parsed.reverse()
    return parsed