Spaces:
Sleeping
Sleeping
File size: 5,976 Bytes
5837391 e39cad1 5837391 e39cad1 5837391 e39cad1 5837391 e39cad1 5837391 e39cad1 5837391 e39cad1 5837391 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 | import json
import re
from typing import Optional, Tuple
from utils.helpers import generate_id
# Suffixes of the per-session Redis keys; _key() expands each one to
# "session:<session_id>:<suffix>".
QUESTION_QUEUE_SUFFIX = "question_queue"
QUESTION_BACKLOG_SUFFIX = "question_backlog"
CONTEXT_CACHE_SUFFIX = "context_cache"
ASKED_SET_SUFFIX = "asked_questions_set"

# Matches a leading numbering label such as "Question 3:", "Q#2 -" or
# "q 1 of 5)" (case-insensitive), including surrounding whitespace.
QUESTION_PREFIX_RE = re.compile(
    r"^\s*(?:question|q)\s*#?\s*\d+(?:\s*of\s*\d+)?\s*[\:\-\)\.]\s*",
    flags=re.IGNORECASE,
)
def _key(session_id: str, suffix: str) -> str:
    """Build the Redis key of one per-session structure.

    Args:
        session_id: Identifier of the session the key belongs to.
        suffix: Name of the structure within the session.

    Returns:
        The key in the form ``session:<session_id>:<suffix>``.
    """
    return "session:{}:{}".format(session_id, suffix)
def normalize_question_text(text: str) -> str:
    """Strip whitespace and leading numbering labels from a question.

    Prefixes matched by ``QUESTION_PREFIX_RE`` are removed repeatedly, so
    stacked labels are all dropped.

    Args:
        text: Raw question text; a falsy value is treated as empty.

    Returns:
        The cleaned question, or ``""`` when nothing remains.
    """
    current = (text or "").strip()
    # Keep peeling prefixes until a pass leaves the text unchanged.
    while current:
        stripped = QUESTION_PREFIX_RE.sub("", current).strip()
        if stripped == current:
            return current
        current = stripped
    return ""
def question_fingerprint(text: str) -> str:
    """Reduce a question to a canonical form used for duplicate detection.

    The text is normalized with ``normalize_question_text``, lower-cased,
    every character other than ``a-z``, ``0-9`` or whitespace is replaced
    by a space, and whitespace runs are collapsed to single spaces.

    Args:
        text: Raw question text.

    Returns:
        The fingerprint string; empty when no letters or digits remain.
    """
    lowered = normalize_question_text(text).lower()
    without_punctuation = re.sub(r"[^a-z0-9\s]", " ", lowered)
    return re.sub(r"\s+", " ", without_punctuation).strip()
async def mark_question_asked(
    redis,
    session_id: str,
    question_text: str,
    ttl_seconds: int,
) -> None:
    """Record a question's fingerprint in the session's asked-questions set.

    Does nothing when the fingerprint is empty.

    Args:
        redis: Async Redis client providing ``sadd`` and ``expire``.
        session_id: Session whose asked set is updated.
        question_text: Question to record.
        ttl_seconds: Expiry applied to the asked set after the update.
    """
    fingerprint = question_fingerprint(question_text)
    if fingerprint:
        asked_key = _key(session_id, ASKED_SET_SUFFIX)
        await redis.sadd(asked_key, fingerprint)
        await redis.expire(asked_key, ttl_seconds)
async def is_question_asked(redis, session_id: str, question_text: str) -> bool:
    """Report whether a question's fingerprint is in the asked-questions set.

    Args:
        redis: Async Redis client providing ``sismember``.
        session_id: Session whose asked set is consulted.
        question_text: Question to look up.

    Returns:
        ``True`` when the fingerprint is a member of the set; ``False``
        otherwise, including when the fingerprint is empty.
    """
    fingerprint = question_fingerprint(question_text)
    if not fingerprint:
        return False
    asked_key = _key(session_id, ASKED_SET_SUFFIX)
    is_member = await redis.sismember(asked_key, fingerprint)
    return bool(is_member)
async def _has_in_list(redis, session_id: str, list_key: str, question_text: str) -> bool:
    """Check whether a Redis list of question ids already holds a question.

    Each id in ``list_key`` is resolved to its hash at
    ``session:<session_id>:q:<id>`` and the stored ``question`` field is
    compared by fingerprint.

    Args:
        redis: Async Redis client providing ``lrange`` and ``hgetall``.
        session_id: Session that owns the question hashes.
        list_key: Full Redis key of the list of question ids to scan.
        question_text: Question to search for.

    Returns:
        ``True`` on the first fingerprint match; ``False`` when there is
        none or when the searched fingerprint is empty.
    """
    target = question_fingerprint(question_text)
    if not target:
        return False
    # One hash lookup per queued id; a missing "question" field compares as "".
    for question_id in await redis.lrange(list_key, 0, -1):
        stored = await redis.hgetall(f"session:{session_id}:q:{question_id}")
        if question_fingerprint(stored.get("question", "")) == target:
            return True
    return False
async def _append_question_object(
    redis,
    session_id: str,
    question: str,
    difficulty: str,
    category: str,
    ttl_seconds: int,
) -> str:
    """Store a new question hash and register its id with the session.

    The hash is written to ``session:<session_id>:q:<id>`` and the id is
    appended to the ``session:<session_id>:questions`` list; both keys get
    ``ttl_seconds`` as their expiry.

    Args:
        redis: Async Redis client providing ``hset``, ``rpush``, ``expire``.
        session_id: Session that owns the question.
        question: Question text; normalized before being stored.
        difficulty: Difficulty label; a falsy value is stored as "medium".
        category: Category label; a falsy value is stored as "general".
        ttl_seconds: Expiry applied to the hash and to the id list.

    Returns:
        The id produced by ``generate_id`` for the new question.
    """
    cleaned_question = normalize_question_text(question)
    question_id = generate_id()

    record = {
        "question_id": question_id,
        "question": cleaned_question,
        "difficulty": difficulty or "medium",
        "category": category or "general",
    }
    record_key = f"session:{session_id}:q:{question_id}"
    await redis.hset(record_key, mapping=record)
    await redis.expire(record_key, ttl_seconds)

    all_questions_key = f"session:{session_id}:questions"
    await redis.rpush(all_questions_key, question_id)
    await redis.expire(all_questions_key, ttl_seconds)
    return question_id
async def enqueue_question(
    redis,
    session_id: str,
    question: str,
    difficulty: str = "medium",
    category: str = "general",
    ttl_seconds: int = 7200,
    max_queue_size: int = 3,
) -> Optional[str]:
    """Add a question to the session queue, or to the backlog when full.

    The question is rejected when it is empty after normalization, has
    already been asked, or is already present in the queue or the backlog.

    Args:
        redis: Async Redis client.
        session_id: Session the question belongs to.
        question: Raw question text.
        difficulty: Difficulty label stored with the question.
        category: Category label stored with the question.
        ttl_seconds: Expiry applied to every key that is written.
        max_queue_size: Queue length at which new questions go to the
            backlog instead.

    Returns:
        The id of the stored question, or ``None`` when it was rejected.
    """
    text = normalize_question_text(question)
    if not text:
        return None

    queue_key = _key(session_id, QUESTION_QUEUE_SUFFIX)
    backlog_key = _key(session_id, QUESTION_BACKLOG_SUFFIX)

    if await is_question_asked(redis, session_id, text):
        return None
    # Reject duplicates already waiting in either list (queue checked first).
    for pending_key in (queue_key, backlog_key):
        if await _has_in_list(redis, session_id, pending_key, text):
            return None

    # The queue length is read before the question object is stored.
    current_length = await redis.llen(queue_key)
    qid = await _append_question_object(
        redis=redis,
        session_id=session_id,
        question=text,
        difficulty=difficulty,
        category=category,
        ttl_seconds=ttl_seconds,
    )

    target_key = queue_key if current_length < max_queue_size else backlog_key
    await redis.rpush(target_key, qid)
    await redis.expire(target_key, ttl_seconds)
    return qid
async def flush_backlog_to_queue(
    redis,
    session_id: str,
    ttl_seconds: int = 7200,
    max_queue_size: int = 3,
) -> None:
    """Move question ids from the backlog into the queue while there is room.

    Ids are taken from the head of the backlog and appended to the tail of
    the queue until the queue holds ``max_queue_size`` ids or the backlog
    yields nothing. Both keys then get ``ttl_seconds`` as their expiry.

    Args:
        redis: Async Redis client providing ``llen``, ``lpop``, ``rpush``
            and ``expire``.
        session_id: Session whose lists are rebalanced.
        ttl_seconds: Expiry applied to the queue and backlog keys.
        max_queue_size: Maximum number of ids the queue should hold.
    """
    queue_key = _key(session_id, QUESTION_QUEUE_SUFFIX)
    backlog_key = _key(session_id, QUESTION_BACKLOG_SUFFIX)

    while True:
        # Re-read the length every pass rather than counting locally.
        if await redis.llen(queue_key) >= max_queue_size:
            break
        moved_id = await redis.lpop(backlog_key)
        if not moved_id:
            break
        await redis.rpush(queue_key, moved_id)

    await redis.expire(queue_key, ttl_seconds)
    await redis.expire(backlog_key, ttl_seconds)
async def queue_size(redis, session_id: str) -> int:
    """Return the number of question ids in the session's queue.

    Args:
        redis: Async Redis client providing ``llen``.
        session_id: Session whose queue is measured.

    Returns:
        The queue length converted to ``int``.
    """
    queue_key = _key(session_id, QUESTION_QUEUE_SUFFIX)
    length = await redis.llen(queue_key)
    return int(length)
async def pop_next_question(redis, session_id: str) -> Tuple[Optional[str], Optional[dict]]:
    """Remove the id at the head of the queue and load its question hash.

    Args:
        redis: Async Redis client providing ``lpop`` and ``hgetall``.
        session_id: Session whose queue is consumed.

    Returns:
        ``(question_id, question_hash)``, or ``(None, None)`` when the
        queue yields no id.
    """
    next_id = await redis.lpop(_key(session_id, QUESTION_QUEUE_SUFFIX))
    if next_id:
        details = await redis.hgetall(f"session:{session_id}:q:{next_id}")
        return next_id, details
    return None, None
async def peek_next_question(redis, session_id: str) -> Tuple[Optional[str], Optional[dict]]:
    """Read the id at the head of the queue without removing it.

    Args:
        redis: Async Redis client providing ``lindex`` and ``hgetall``.
        session_id: Session whose queue is inspected.

    Returns:
        ``(question_id, question_hash)``, or ``(None, None)`` when the
        queue has no head element.
    """
    head_id = await redis.lindex(_key(session_id, QUESTION_QUEUE_SUFFIX), 0)
    if head_id:
        details = await redis.hgetall(f"session:{session_id}:q:{head_id}")
        return head_id, details
    return None, None
async def push_context_item(
    redis,
    session_id: str,
    item: dict,
    ttl_seconds: int = 7200,
    max_items: int = 3,
) -> None:
    """Prepend a JSON-encoded item to the session's context cache.

    The list is stored newest-first and trimmed after each push.

    Args:
        redis: Async Redis client providing ``lpush``, ``ltrim``, ``expire``.
        session_id: Session whose context cache is updated.
        item: JSON-serializable mapping to store.
        ttl_seconds: Expiry applied to the cache key.
        max_items: Number of newest entries to keep.
    """
    cache_key = _key(session_id, CONTEXT_CACHE_SUFFIX)
    payload = json.dumps(item, ensure_ascii=True)
    # NOTE(review): the clamp to 0 means max_items <= 0 still keeps one entry.
    last_kept_index = max(0, max_items - 1)

    await redis.lpush(cache_key, payload)
    await redis.ltrim(cache_key, 0, last_kept_index)
    await redis.expire(cache_key, ttl_seconds)
async def get_recent_context_items(redis, session_id: str, max_items: int = 3) -> list[dict]:
    """Return the most recent context items in chronological order.

    Args:
        redis: Async Redis client providing ``lrange``.
        session_id: Session whose context cache is read.
        max_items: Maximum number of items to return; a value of zero or
            less yields an empty list.

    Returns:
        The decoded items, oldest first. Entries that cannot be decoded
        as JSON are skipped.
    """
    if max_items <= 0:
        # LRANGE's stop index is inclusive, so clamping it to 0 would still
        # return one entry; a request for no items must return none.
        return []

    key = _key(session_id, CONTEXT_CACHE_SUFFIX)
    raw_items = await redis.lrange(key, 0, max_items - 1)

    parsed: list[dict] = []
    for raw in raw_items:
        try:
            parsed.append(json.loads(raw))
        except (TypeError, ValueError):
            # Best-effort read: json.JSONDecodeError is a ValueError, and
            # TypeError covers non-text payloads. Skip the bad entry.
            continue

    # Convert newest-first storage into chronological order for prompting.
    parsed.reverse()
    return parsed
|