interviewbot/backend/services/queue_service.py
sajith-0701's picture
v4.1
e39cad1
import json
import re
from typing import Optional, Tuple
from utils.helpers import generate_id
# Key suffixes; _key() combines each with a session id to form
# "session:<session_id>:<suffix>".
# Redis list of question ids that are ready to be served (see enqueue_question).
QUESTION_QUEUE_SUFFIX = "question_queue"
# Redis list of question ids that did not fit into the ready queue.
QUESTION_BACKLOG_SUFFIX = "question_backlog"
# Redis list of JSON-encoded context items, newest first (see push_context_item).
CONTEXT_CACHE_SUFFIX = "context_cache"
# Redis set of fingerprints of questions already asked (see mark_question_asked).
ASKED_SET_SUFFIX = "asked_questions_set"
# Matches a leading numbering label such as "Question 3:", "Q#2 of 5 -" or
# "q1." (case-insensitive). A number and one trailing delimiter out of
# ":", "-", ")" or "." are required; surrounding whitespace is consumed too.
QUESTION_PREFIX_RE = re.compile(
r"^\s*(?:question|q)\s*#?\s*\d+(?:\s*of\s*\d+)?\s*[\:\-\)\.]\s*",
re.IGNORECASE,
)
def _key(session_id: str, suffix: str) -> str:
return f"session:{session_id}:{suffix}"
def normalize_question_text(text: str) -> str:
    """Return *text* trimmed and without leading "Question N:"-style labels.

    Falsy input (``None`` or ``""``) yields ``""``. Labels matched by
    ``QUESTION_PREFIX_RE`` are removed repeatedly, so stacked prefixes such
    as ``"Q1: Question 2 - ..."`` are stripped completely.
    """
    current = (text or "").strip()
    # Keep peeling prefixes until a pass changes nothing (or nothing is left).
    while current:
        stripped = QUESTION_PREFIX_RE.sub("", current).strip()
        if stripped == current:
            break
        current = stripped
    return current
def question_fingerprint(text: str) -> str:
    """Return a canonical form of *text* used to detect duplicate questions.

    The text is normalized with ``normalize_question_text``, lower-cased,
    stripped of punctuation and underscores, and has runs of whitespace
    collapsed to single spaces. An empty string is returned when nothing
    usable remains.

    Alphanumeric characters of every script are kept. An ASCII-only
    character class would blank out non-Latin text entirely and produce an
    empty fingerprint, which callers treat as "cannot dedupe". For pure
    ASCII input the result equals that of an ASCII-only class.
    """
    value = normalize_question_text(text).lower()
    # For str patterns ``\w`` is Unicode-aware; ``_`` is listed separately
    # because ``\w`` includes it but it should act as a separator here.
    value = re.sub(r"[^\w\s]|_", " ", value)
    value = re.sub(r"\s+", " ", value).strip()
    return value
async def mark_question_asked(redis, session_id: str, question_text: str, ttl_seconds: int) -> None:
    """Record the fingerprint of *question_text* in the session's asked set.

    Does nothing when the fingerprint is empty. Otherwise the fingerprint is
    added to the set and the set's expiry is reset to *ttl_seconds*.
    """
    fingerprint = question_fingerprint(question_text)
    if fingerprint:
        asked_key = _key(session_id, ASKED_SET_SUFFIX)
        await redis.sadd(asked_key, fingerprint)
        await redis.expire(asked_key, ttl_seconds)
async def is_question_asked(redis, session_id: str, question_text: str) -> bool:
    """Return whether *question_text*'s fingerprint is in the asked set.

    A question whose fingerprint is empty is always reported as not asked.
    """
    fingerprint = question_fingerprint(question_text)
    if fingerprint:
        asked_key = _key(session_id, ASKED_SET_SUFFIX)
        is_member = await redis.sismember(asked_key, fingerprint)
        return bool(is_member)
    return False
async def _has_in_list(redis, session_id: str, list_key: str, question_text: str) -> bool:
    """Return whether the id list at *list_key* references an equivalent question.

    Each id in the list is resolved to its ``session:<id>:q:<qid>`` hash and
    the stored ``question`` field is compared by fingerprint. An empty
    fingerprint for *question_text* never matches.
    """
    target = question_fingerprint(question_text)
    if target:
        for question_id in await redis.lrange(list_key, 0, -1):
            stored = await redis.hgetall(f"session:{session_id}:q:{question_id}")
            candidate = stored.get("question", "")
            if question_fingerprint(candidate) == target:
                return True
    return False
async def _append_question_object(
    redis,
    session_id: str,
    question: str,
    difficulty: str,
    category: str,
    ttl_seconds: int,
) -> str:
    """Store a new question hash for the session and return its generated id.

    The question text is normalized before storage; falsy *difficulty* and
    *category* fall back to ``"medium"`` and ``"general"``. The new id is
    also appended to the ``session:<session_id>:questions`` list. Both keys
    get their expiry set to *ttl_seconds*.
    """
    cleaned = normalize_question_text(question)
    qid = generate_id()
    record = {
        "question_id": qid,
        "question": cleaned,
        "difficulty": difficulty or "medium",
        "category": category or "general",
    }

    session_prefix = f"session:{session_id}"

    # Per-question hash.
    hash_key = f"{session_prefix}:q:{qid}"
    await redis.hset(hash_key, mapping=record)
    await redis.expire(hash_key, ttl_seconds)

    # Session-wide list of every stored question id.
    index_key = f"{session_prefix}:questions"
    await redis.rpush(index_key, qid)
    await redis.expire(index_key, ttl_seconds)

    return qid
async def enqueue_question(
    redis,
    session_id: str,
    question: str,
    difficulty: str = "medium",
    category: str = "general",
    ttl_seconds: int = 7200,
    max_queue_size: int = 3,
) -> Optional[str]:
    """Store *question* and place it in the ready queue or the backlog.

    Returns ``None`` without storing anything when the normalized text is
    empty or when an equivalent question was already asked, is already in
    the ready queue, or is already in the backlog. Otherwise the question is
    stored and its id returned. The id goes to the ready queue if that queue
    held fewer than *max_queue_size* entries when checked, else to the
    backlog. The receiving list's expiry is reset to *ttl_seconds*.
    """
    text = normalize_question_text(question)
    if not text:
        return None

    queue_key = _key(session_id, QUESTION_QUEUE_SUFFIX)
    backlog_key = _key(session_id, QUESTION_BACKLOG_SUFFIX)

    # Duplicate checks: asked set first, then the ready queue, then the backlog.
    if await is_question_asked(redis, session_id, text):
        return None
    for list_key in (queue_key, backlog_key):
        if await _has_in_list(redis, session_id, list_key, text):
            return None

    # The queue length is sampled before the question object is stored.
    current_len = await redis.llen(queue_key)
    qid = await _append_question_object(
        redis, session_id, text, difficulty, category, ttl_seconds
    )

    target_key = queue_key if current_len < max_queue_size else backlog_key
    await redis.rpush(target_key, qid)
    await redis.expire(target_key, ttl_seconds)
    return qid
async def flush_backlog_to_queue(
    redis,
    session_id: str,
    ttl_seconds: int = 7200,
    max_queue_size: int = 3,
) -> None:
    """Move backlog ids into the ready queue until it is full or the backlog is empty.

    Ids are taken from the head of the backlog and appended to the tail of
    the ready queue. Afterwards the expiry of both lists is set to
    *ttl_seconds*.
    """
    queue_key = _key(session_id, QUESTION_QUEUE_SUFFIX)
    backlog_key = _key(session_id, QUESTION_BACKLOG_SUFFIX)

    while True:
        if await redis.llen(queue_key) >= max_queue_size:
            break  # ready queue is full
        next_id = await redis.lpop(backlog_key)
        if not next_id:
            break  # backlog exhausted
        await redis.rpush(queue_key, next_id)

    for list_key in (queue_key, backlog_key):
        await redis.expire(list_key, ttl_seconds)
async def queue_size(redis, session_id: str) -> int:
    """Return the number of ids currently in the session's ready queue."""
    queue_key = _key(session_id, QUESTION_QUEUE_SUFFIX)
    length = await redis.llen(queue_key)
    return int(length)
async def pop_next_question(redis, session_id: str) -> Tuple[Optional[str], Optional[dict]]:
    """Remove the head of the ready queue and return ``(qid, question_hash)``.

    Returns ``(None, None)`` when the queue yields no id. The hash is read
    from ``session:<session_id>:q:<qid>`` and returned as fetched.
    """
    head_id = await redis.lpop(_key(session_id, QUESTION_QUEUE_SUFFIX))
    if head_id:
        payload = await redis.hgetall(f"session:{session_id}:q:{head_id}")
        return head_id, payload
    return None, None
async def peek_next_question(redis, session_id: str) -> Tuple[Optional[str], Optional[dict]]:
    """Return ``(qid, question_hash)`` for the head of the ready queue without removing it.

    Returns ``(None, None)`` when the queue has no head element. The hash is
    read from ``session:<session_id>:q:<qid>`` and returned as fetched.
    """
    head_id = await redis.lindex(_key(session_id, QUESTION_QUEUE_SUFFIX), 0)
    if head_id:
        payload = await redis.hgetall(f"session:{session_id}:q:{head_id}")
        return head_id, payload
    return None, None
async def push_context_item(
    redis,
    session_id: str,
    item: dict,
    ttl_seconds: int = 7200,
    max_items: int = 3,
) -> None:
    """Prepend *item* (JSON-encoded) to the session's context cache.

    The list is then trimmed to indices ``0 .. max(0, max_items - 1)`` so
    only the newest entries remain, and its expiry is set to *ttl_seconds*.
    """
    cache_key = _key(session_id, CONTEXT_CACHE_SUFFIX)
    encoded = json.dumps(item, ensure_ascii=True)
    await redis.lpush(cache_key, encoded)
    last_index = max(0, max_items - 1)
    await redis.ltrim(cache_key, 0, last_index)
    await redis.expire(cache_key, ttl_seconds)
async def get_recent_context_items(redis, session_id: str, max_items: int = 3) -> list[dict]:
    """Return the most recent cached context items, oldest first.

    Reads indices ``0 .. max(0, max_items - 1)`` of the session's context
    cache list, JSON-decodes each entry and returns them in chronological
    order. Entries that cannot be decoded are skipped; the rest are still
    returned.
    """
    key = _key(session_id, CONTEXT_CACHE_SUFFIX)
    raw_items = await redis.lrange(key, 0, max(0, max_items - 1))
    parsed: list[dict] = []
    for raw in raw_items:
        try:
            item = json.loads(raw)
        except (ValueError, TypeError):
            # ValueError covers json.JSONDecodeError and undecodable bytes;
            # TypeError covers payloads that are not str/bytes. Only these
            # decoding failures are skipped; unrelated errors propagate.
            continue
        parsed.append(item)
    # Convert newest-first storage into chronological order for prompting.
    parsed.reverse()
    return parsed