Spaces:
Sleeping
Sleeping
| import json | |
| import asyncio | |
| import random | |
| import re | |
| from time import perf_counter | |
| from bson import ObjectId | |
| from database import get_db, get_redis | |
| from models.collections import SESSIONS, USERS, JOB_ROLES, SKILLS, QUESTIONS, TOPICS, TOPIC_QUESTIONS, RESUMES, JD_VERIFICATIONS, ANSWERS | |
| from utils.helpers import generate_id, utc_now, str_objectid | |
| from utils.skills import normalize_skill_list, build_interview_focus_skills | |
| from services.interview_graph import run_interview_graph | |
| from utils.gemini import generate_interview_question_batch, analyze_resume_vs_job_description | |
| from services.job_description_service import get_job_description_for_user | |
| from services.gemini_service import ( | |
| evaluate_and_generate_followup, | |
| generate_resume_seed_questions, | |
| generate_topic_followup_batch, | |
| ) | |
| from services.queue_service import ( | |
| enqueue_question, | |
| flush_backlog_to_queue, | |
| get_recent_context_items, | |
| mark_question_asked, | |
| normalize_question_text, | |
| peek_next_question, | |
| pop_next_question, | |
| push_context_item, | |
| queue_size, | |
| ) | |
| from services.tts_service import prefetch_wav | |
| from services.latency_service import record_latency | |
| MAX_QUESTIONS = 20 | |
| RESUME_MAX_QUESTIONS = 10 | |
| RESUME_INITIAL_BATCH_SIZE = 2 | |
| SESSION_TTL = 7200 # 2 hours | |
| BATCH_SIZE = 5 | |
| PREGEN_MIN_PENDING = 2 | |
| FOLLOWUP_AI_COUNT = 2 | |
| FOLLOWUP_BANK_COUNT = 3 | |
| MAX_QUEUE_SIZE = 3 | |
| CONTEXT_CACHE_ITEMS = 3 | |
| TOPIC_INITIAL_DB_QUESTIONS = 5 | |
| TOPIC_INITIAL_ASK_COUNT = 4 | |
| TOPIC_AI_FOLLOWUPS = 3 | |
| TOPIC_DB_FOLLOWUPS = 2 | |
| TOPIC_TOTAL_QUESTIONS = 10 | |
| MAX_SAME_TOPIC_FOLLOWUPS = 2 | |
| THIRD_FOLLOWUP_NEED_SCORE = 95 | |
| # Local process memory summary requested in workflow. | |
| _LOCAL_SUMMARIES: dict[str, str] = {} | |
| _PREGEN_IN_FLIGHT: set[str] = set() | |
| _POST_SUBMIT_LOCKS: dict[str, asyncio.Lock] = {} | |
| _QUESTION_STOPWORDS = { | |
| "a", "an", "and", "are", "as", "at", "be", "by", "for", "from", "how", "if", "in", "into", | |
| "is", "it", "of", "on", "or", "that", "the", "this", "to", "using", "what", "when", "with", "would", | |
| } | |
| _GENERIC_SOFT_SKILL_KEYS = { | |
| "problem solving", | |
| "analytical skills", | |
| "communication", | |
| "communication skills", | |
| "teamwork", | |
| "leadership", | |
| "adaptability", | |
| "time management", | |
| "critical thinking", | |
| } | |
| def _safe_json_list(value: str) -> list: | |
| try: | |
| data = json.loads(value or "[]") | |
| return data if isinstance(data, list) else [] | |
| except Exception: | |
| return [] | |
| def _question_fingerprint(text: str) -> str: | |
| base = (text or "").strip().lower() | |
| base = re.sub(r"[^a-z0-9\s]", " ", base) | |
| base = re.sub(r"\s+", " ", base).strip() | |
| return base | |
def _question_token_set(text: str) -> set[str]:
    """Return the distinct fingerprint words of *text*, minus stopwords."""
    return {
        word
        for word in _question_fingerprint(text).split()
        if word not in _QUESTION_STOPWORDS
    }
def _is_question_too_similar(candidate: str, recent_questions: list[str]) -> bool:
    """Decide whether *candidate* near-duplicates one of the recent questions.

    Only the last five entries of *recent_questions* are checked. The
    candidate is "too similar" to an entry when any of these holds:

    - the fingerprints are identical;
    - the first six fingerprint words are identical;
    - the Jaccard similarity of the non-stopword token sets is at least 0.72.

    A candidate whose fingerprint is empty is always rejected.
    """
    cand_key = _question_fingerprint(candidate)
    if not cand_key:
        return True
    cand_words = _question_token_set(candidate)
    cand_prefix = " ".join(cand_key.split()[:6])

    for previous in (recent_questions or [])[-5:]:
        prev_key = _question_fingerprint(previous)
        if not prev_key:
            continue
        if prev_key == cand_key:
            return True
        if cand_prefix and " ".join(prev_key.split()[:6]) == cand_prefix:
            return True
        prev_words = _question_token_set(previous)
        if cand_words and prev_words:
            overlap = len(cand_words & prev_words)
            total = len(cand_words | prev_words)
            if total > 0 and overlap / total >= 0.72:
                return True
    return False
def _unique_question_items(items: list[dict], *, excluded_questions: list[str], limit: int) -> list[dict]:
    """Return up to *limit* de-duplicated, normalized question items.

    Items keep their input order. An item is skipped when any of these holds:

    - it is not a dict;
    - its ``question`` text is blank;
    - its fingerprint is empty;
    - its fingerprint matches one of *excluded_questions* or an item already
      kept.

    Each kept item is rebuilt with only ``question`` (stripped),
    ``difficulty`` (default "medium") and ``category`` (default "general").

    A non-positive *limit* returns an empty list. Previously the size check
    ran after appending, so ``limit=0`` still returned one item.
    """
    if limit <= 0:
        return []
    excluded = {_question_fingerprint(q) for q in (excluded_questions or []) if q}
    unique: list[dict] = []
    for item in items or []:
        if not isinstance(item, dict):
            continue
        text = (item.get("question") or "").strip()
        if not text:
            continue
        key = _question_fingerprint(text)
        if not key or key in excluded:
            continue
        excluded.add(key)
        unique.append(
            {
                "question": text,
                "difficulty": item.get("difficulty", "medium"),
                "category": item.get("category", "general"),
            }
        )
        if len(unique) >= limit:
            break
    return unique
def _update_local_summary(session_id: str, question: str, answer: str) -> None:
    """Append one Q/A exchange to the process-local summary for *session_id*.

    The stored text is trimmed to its last 1500 characters, so each entry
    stays bounded. Entries are never removed from the dict here.
    """
    previous = _LOCAL_SUMMARIES.get(session_id, "")
    updated = (previous + f"\nQ: {question}\nA: {answer}").strip()
    _LOCAL_SUMMARIES[session_id] = updated[-1500:]
| def _safe_int(value, default: int = 0) -> int: | |
| try: | |
| return int(value) | |
| except Exception: | |
| return default | |
def _safe_score_0_100(value, default: int = 0) -> int:
    """Coerce *value* to an int via ``_safe_int`` and clamp it to 0..100."""
    return max(0, min(100, _safe_int(value, default)))
| def _normalize_voice_gender(value: str | None) -> str: | |
| return "male" if (value or "").strip().lower() == "male" else "female" | |
| def _consume_prefetch_task_result(task: asyncio.Task) -> None: | |
| try: | |
| task.result() | |
| except Exception: | |
| # Prefetch is optional; ignore failures to avoid noisy task warnings. | |
| pass | |
# Strong references to in-flight prefetch tasks. The event loop keeps only
# weak references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_PREFETCH_TASKS: set[asyncio.Task] = set()


def _schedule_question_audio_prefetch(questions: list[str], voice_gender: str) -> None:
    """Start a fire-and-forget TTS prefetch task for each non-blank question.

    Must run inside a running event loop. Otherwise ``asyncio.create_task``
    raises and the question is skipped, because prefetch is best-effort.
    Each task stays in ``_PREFETCH_TASKS`` until it finishes. Its outcome is
    then consumed by ``_consume_prefetch_task_result``.
    """
    for q in questions or []:
        text = (q or "").strip()
        if not text:
            continue
        coro = None
        try:
            coro = prefetch_wav(text, voice_gender)
            task = asyncio.create_task(coro)
        except Exception:
            # Best-effort optimization only (e.g. no running loop). Close the
            # never-scheduled coroutine to avoid a "never awaited" warning.
            if asyncio.iscoroutine(coro):
                coro.close()
            continue
        _PREFETCH_TASKS.add(task)
        task.add_done_callback(_PREFETCH_TASKS.discard)
        task.add_done_callback(_consume_prefetch_task_result)
def _get_post_submit_lock(session_id: str) -> asyncio.Lock:
    """Return the process-local asyncio.Lock for *session_id*.

    The lock is created on first use and stored in ``_POST_SUBMIT_LOCKS``.
    It is never removed here.
    """
    existing = _POST_SUBMIT_LOCKS.get(session_id)
    if existing is not None:
        return existing
    created = asyncio.Lock()
    _POST_SUBMIT_LOCKS[session_id] = created
    return created
| def _consume_post_submit_task_result(task: asyncio.Task) -> None: | |
| try: | |
| task.result() | |
| except Exception: | |
| # Background processing is best-effort; ignore task-level failures. | |
| pass | |
def _current_generation_stats(session: dict) -> dict:
    """Read the question-generation counters from a session hash as ints.

    Missing or non-numeric fields become 0.
    """
    field_map = (
        ("gemini_calls", "metrics_gemini_calls"),
        ("gemini_questions", "metrics_gemini_questions"),
        ("bank_questions", "metrics_bank_questions"),
        ("bank_shortfall", "metrics_bank_shortfall"),
        ("generation_batches", "metrics_generation_batches"),
    )
    return {name: _safe_int(session.get(field, 0)) for name, field in field_map}
| def _normalize_bank_difficulty(value: str) -> str: | |
| difficulty = (value or "medium").strip().lower() | |
| if difficulty not in {"easy", "medium", "hard"}: | |
| return "medium" | |
| if difficulty == "easy": | |
| return "medium" | |
| return difficulty | |
def _resume_skill_pool(session: dict) -> list[str]:
    """Build the ordered skill list used to steer resume-interview questions.

    JD-required skills come first, then the session's ``skills``. Duplicates
    (by fingerprint) and entries with an empty fingerprint are dropped.
    Generic soft skills are filtered out when at least two concrete skills
    remain. Otherwise the unfiltered list is returned, or
    ``["core technical concepts"]`` when it is empty.
    """
    jd_skills = normalize_skill_list(_safe_json_list(session.get("jd_required_skills", "[]")))
    focus_skills = normalize_skill_list(_safe_json_list(session.get("skills", "[]")))

    ordered: list[str] = []
    seen_keys: set[str] = set()
    for skill in jd_skills + focus_skills:
        fingerprint = _question_fingerprint(skill)
        if fingerprint and fingerprint not in seen_keys:
            seen_keys.add(fingerprint)
            ordered.append(skill)

    concrete = [
        skill
        for skill in ordered
        if _question_fingerprint(skill) not in _GENERIC_SOFT_SKILL_KEYS
    ]
    if len(concrete) >= 2:
        return concrete
    return ordered if ordered else ["core technical concepts"]
def _infer_focus_skill_from_question(question_text: str, skill_pool: list[str]) -> str | None:
    """Guess which skill from *skill_pool* a question is about.

    Each skill is scored by how many of its fingerprint tokens occur in the
    question's fingerprint. Only tokens of 3+ characters count, unless the
    skill has none, in which case all its tokens count. A skill whose whole
    fingerprint occurs scores ``len(tokens) + 1``. The first skill with the
    highest positive score wins. ``None`` is returned when nothing matches.

    Fragments shorter than 3 characters (skills such as "Go", "C", "R", "AI")
    must match a whole word. Plain substring matching let "go" match
    "algorithm" and "c" match almost every question. Longer fragments keep
    substring matching, so "sql" still matches "postgresql".
    """
    normalized_question = _question_fingerprint(question_text)
    if not normalized_question:
        return None
    padded_question = f" {normalized_question} "

    def _occurs(fragment: str) -> bool:
        # Word-boundary check for short fragments, substring for the rest.
        if len(fragment) >= 3:
            return fragment in normalized_question
        return f" {fragment} " in padded_question

    best_skill = None
    best_score = 0
    for skill in skill_pool or []:
        normalized_skill = _question_fingerprint(skill)
        if not normalized_skill:
            continue
        tokens = [token for token in normalized_skill.split() if len(token) >= 3]
        if not tokens:
            tokens = normalized_skill.split()
        score = sum(1 for token in tokens if _occurs(token))
        if _occurs(normalized_skill):
            score = max(score, len(tokens) + 1)
        if score > best_score:
            best_score = score
            best_skill = skill
    return best_skill if best_score > 0 else None
def _recent_focus_streak(question_texts: list[str], skill_pool: list[str]) -> tuple[str | None, int]:
    """Return the most recent question's skill and its trailing streak length.

    Questions are scanned newest first. The streak stops at the first
    question whose skill cannot be inferred or differs (by fingerprint) from
    the newest question's skill. Returns ``(None, 0)`` when the newest
    question has no inferable skill.
    """
    streak_skill = None
    streak_key = None
    count = 0
    for text in reversed(question_texts):
        inferred = _infer_focus_skill_from_question(text, skill_pool)
        if not inferred:
            break
        inferred_key = _question_fingerprint(inferred)
        if streak_skill is None:
            streak_skill, streak_key, count = inferred, inferred_key, 1
        elif inferred_key == streak_key:
            count += 1
        else:
            break
    return streak_skill, count
def _pick_alternate_focus_skill(skill_pool: list[str], current_skill: str | None, seed: int) -> str | None:
    """Pick a skill, preferring one different from *current_skill*.

    Selection is ``candidates[max(0, seed) % len(candidates)]``. The
    candidates are the pool minus *current_skill* (by fingerprint), or the
    whole pool when nothing else is left. Returns ``None`` for an empty pool.
    """
    if not skill_pool:
        return None
    offset = max(0, seed)
    candidates = skill_pool
    if current_skill:
        current_key = _question_fingerprint(current_skill)
        others = [skill for skill in skill_pool if _question_fingerprint(skill) != current_key]
        if others:
            candidates = others
    return candidates[offset % len(candidates)]
def _apply_resume_followup_policy(
    *,
    skill_pool: list[str],
    recent_focus_topic: str | None,
    same_topic_streak: int,
    suggested_question: str,
    suggested_topic: str | None,
    followup_need_score: int,
    answered_count: int,
) -> tuple[str, str | None]:
    """Decide whether to keep the model's follow-up or switch to a new skill.

    The suggestion's topic is *suggested_topic*, or, when that is blank, the
    skill inferred from the suggested question. The suggestion is dropped
    (``("", alternate_skill)`` is returned) when all of these hold:

    - the current streak has reached ``MAX_SAME_TOPIC_FOLLOWUPS``;
    - the suggestion's topic matches *recent_focus_topic*;
    - the clamped need score is below ``THIRD_FOLLOWUP_NEED_SCORE``.

    Otherwise ``(stripped_suggestion, None)`` is returned.
    """
    follow_text = (suggested_question or "").strip()
    topic = (suggested_topic or "").strip()
    if follow_text and not topic:
        topic = _infer_focus_skill_from_question(follow_text, skill_pool) or ""

    topic_key = _question_fingerprint(topic)
    recent_key = _question_fingerprint(recent_focus_topic or "")
    streak_exhausted = same_topic_streak >= MAX_SAME_TOPIC_FOLLOWUPS
    same_topic_again = bool(topic_key) and topic_key == recent_key
    need_is_low = _safe_score_0_100(followup_need_score) < THIRD_FOLLOWUP_NEED_SCORE

    if streak_exhausted and same_topic_again and need_is_low:
        return "", _pick_alternate_focus_skill(skill_pool, recent_focus_topic, answered_count)
    return follow_text, None
| def _avg_recent_answer_words(qa_pairs: list, window: int = 3) -> int: | |
| if not qa_pairs: | |
| return 0 | |
| recent = qa_pairs[-window:] | |
| lengths = [len((item.get("answer") or "").split()) for item in recent] | |
| if not lengths: | |
| return 0 | |
| return sum(lengths) // len(lengths) | |
def _plan_followup_mix(target: int, qa_pairs: list, has_bank_source: bool) -> tuple[int, int]:
    """Decide the AI-generated vs. question-bank split for the next batch.

    Returns ``(ai_target, bank_target)`` with ``ai_target + bank_target ==
    target``, or ``(0, 0)`` when *target* <= 0. With no bank source every
    slot goes to AI.

    Otherwise the AI share depends on the average word count of the last
    three answers:

    - fewer than 18 words -> ``min(2, target)`` AI questions, leaning on the
      bank for stability;
    - more than 70 words -> ``min(4, target)`` AI questions, for more
      personalised follow-ups;
    - otherwise -> ``min(FOLLOWUP_AI_COUNT, target)``.

    With ``FOLLOWUP_AI_COUNT == 2`` the short-answer case equals the
    baseline. Whenever *target* > 1, at least one slot is left for the bank.
    """
    if target <= 0:
        return 0, 0
    if not has_bank_source:
        return target, 0

    avg_words = _avg_recent_answer_words(qa_pairs)
    if avg_words < 18:
        ai_target = min(2, target)
    elif avg_words > 70:
        ai_target = min(4, target)
    else:
        ai_target = min(FOLLOWUP_AI_COUNT, target)

    if target > 1:
        # Keep at least one bank question when a bank source exists.
        ai_target = min(ai_target, target - 1)
    return ai_target, target - ai_target
| async def _resolve_role_title(db, role_id: str | None, custom_role: str | None) -> str: | |
| if custom_role and custom_role.strip(): | |
| return custom_role.strip() | |
| if role_id: | |
| try: | |
| role = await db[JOB_ROLES].find_one({"_id": ObjectId(role_id)}) | |
| if role: | |
| return role["title"] | |
| except Exception: | |
| # If it's not a valid ObjectId, treat it as a direct generic title. | |
| return role_id | |
| return "Software Developer" | |
async def _get_recent_user_questions(db, user_id: str, limit: int = 40) -> list[str]:
    """Return the user's recent distinct answered-question texts, newest first.

    Reads at most *limit* ANSWERS documents for *user_id*, sorted by
    ``stored_at`` descending. Blank questions and questions whose fingerprint
    was already seen are skipped.
    """
    cursor = (
        db[ANSWERS]
        .find({"user_id": user_id}, {"question": 1})
        .sort("stored_at", -1)
        .limit(limit)
    )
    questions: list[str] = []
    seen_keys: set[str] = set()
    async for doc in cursor:
        question = (doc.get("question") or "").strip()
        key = _question_fingerprint(question)
        if question and key and key not in seen_keys:
            seen_keys.add(key)
            questions.append(question)
    return questions
| def _build_resume_intro_question(role_title: str, jd_title: str) -> str: | |
| role = (role_title or "this role").strip() | |
| title = (jd_title or "").strip() | |
| def _normalized_key(value: str) -> str: | |
| key = re.sub(r"[^a-z0-9\s]", " ", (value or "").lower()) | |
| key = re.sub(r"\s+", " ", key).strip() | |
| for prefix in ("the ", "an ", "a "): | |
| if key.startswith(prefix): | |
| key = key[len(prefix):].strip() | |
| break | |
| return key | |
| role_clean = re.sub(r"\s+", " ", role).strip() | |
| if role_clean.lower().startswith("the "): | |
| role_clean = role_clean[4:].strip() | |
| role_phrase = f"the {role_clean}" if role_clean.lower().endswith(" role") else f"the {role_clean} role" | |
| role_key = _normalized_key(role_clean) | |
| title_key = _normalized_key(title) | |
| is_generic_title = title_key in { | |
| "", | |
| "selected job description", | |
| role_key, | |
| f"{role_key} role", | |
| } | |
| if is_generic_title: | |
| return f"Introduce yourself and explain how your background aligns with {role_phrase}." | |
| title_phrase = title if title.lower().startswith(("the ", "an ", "a ")) else f"the {title}" | |
| return ( | |
| f"Introduce yourself and explain how your background aligns with {role_phrase} " | |
| f"in {title_phrase} job description." | |
| ) | |
def _build_resume_resilient_followup_question(
    session: dict,
    question_number: int,
    variant: int = 0,
    focus_skill: str | None = None,
) -> str:
    """Return a deterministic, template-based resume follow-up question.

    ``slot = max(0, question_number - 1) + max(0, variant)`` selects both the
    template and, unless a non-blank *focus_skill* is given, the skill from
    ``_resume_skill_pool(session)``. The role defaults to "this role".
    """
    templates = (
        "Describe a real project where you applied {skill} for {role}. What constraints and trade-offs shaped your design?",
        "If {skill} failed in production for a {role} workflow, how would you debug it step by step?",
        "Explain how you would test and validate a solution using {skill} before shipping it for {role}.",
        "Compare two approaches for {skill} in a {role} context and justify the final choice.",
        "Design an improvement plan to make your {skill} implementation more scalable and reliable for {role}.",
        "Your {role} service using {skill} has intermittent latency spikes. How would you investigate and stabilize it?",
        "During code review, what risks would you look for in a {skill} implementation for {role}, and why?",
        "How would you design rollback and observability for a feature centered on {skill} in {role}?",
        "Assume two engineers propose different {skill} strategies for {role}. How would you evaluate and choose between them?",
        "What failure modes around {skill} are easiest to miss in {role}, and how would you proactively test them?",
    )
    role_title = (session.get("role_title") or "this role").strip()
    pool = _resume_skill_pool(session)
    slot = max(0, question_number - 1) + max(0, variant)
    chosen_skill = (focus_skill or "").strip()
    if not chosen_skill:
        chosen_skill = pool[slot % len(pool)]
    return templates[slot % len(templates)].format(skill=chosen_skill, role=role_title)
| def _build_topic_resilient_followup_question(session: dict, question_number: int, variant: int = 0) -> str: | |
| topic_name = (session.get("role_title") or "this topic").strip() | |
| index = max(0, question_number - 1) + max(0, variant) | |
| templates = [ | |
| "Explain {topic} with a practical example from a production-like scenario.", | |
| "What are the most common failure patterns in {topic}, and how would you detect them early?", | |
| "Design a step-by-step implementation plan for {topic} with measurable checkpoints.", | |
| "Compare two approaches in {topic}, including trade-offs in scalability, latency, and maintainability.", | |
| "If a {topic} solution regressed after deployment, how would you triage and recover safely?", | |
| ] | |
| template = templates[index % len(templates)] | |
| return template.format(topic=topic_name) | |
async def _enqueue_resume_followup_with_fallback(
    *,
    redis,
    session_id: str,
    session: dict,
    answered_count: int,
    suggested_text: str,
    suggested_difficulty: str,
    suggested_category: str,
    focus_skill_override: str | None = None,
) -> tuple[str | None, bool]:
    """Enqueue one resume follow-up, falling back to local template questions.

    Candidates are tried in order:

    1. the model's *suggested_text*, if non-blank;
    2. six deterministic template questions, starting at question number
       ``max(2, answered_count + 1)``.

    After ``normalize_question_text``, a candidate is skipped when it is too
    similar to the session's existing questions, has an empty fingerprint,
    or repeats an earlier candidate. The first candidate that
    ``enqueue_question`` accepts wins.

    Returns ``(question_id, True)`` when the suggestion itself was enqueued,
    ``(question_id, False)`` for a fallback, and ``(None, False)`` when
    nothing was enqueued.
    """
    existing_questions = await _get_session_question_texts(redis, session_id)

    options: list[tuple[str, str, str, bool]] = []
    suggestion = (suggested_text or "").strip()
    if suggestion:
        options.append(
            (suggestion, suggested_difficulty or "medium", suggested_category or "follow-up", True)
        )

    # Deterministic local fallbacks prevent early completion when the model
    # output is empty or a duplicate.
    first_number = max(2, answered_count + 1)
    options.extend(
        (
            _build_resume_resilient_followup_question(
                session=session,
                question_number=first_number + offset,
                variant=offset,
                focus_skill=focus_skill_override,
            ),
            "medium",
            "resume-fallback",
            False,
        )
        for offset in range(6)
    )

    tried_keys: set[str] = set()
    for text, difficulty, category, from_model in options:
        question = normalize_question_text(text)
        if _is_question_too_similar(question, existing_questions):
            continue
        key = _question_fingerprint(question)
        if not key or key in tried_keys:
            continue
        tried_keys.add(key)
        qid = await enqueue_question(
            redis=redis,
            session_id=session_id,
            question=question,
            difficulty=difficulty,
            category=category,
            ttl_seconds=SESSION_TTL,
            max_queue_size=MAX_QUEUE_SIZE,
        )
        if qid:
            existing_questions.append(question)
            return qid, from_model
    return None, False
async def _get_session_question_texts(redis, session_id: str) -> list[str]:
    """Return the session's question texts in list order.

    Ids come from ``session:<id>:questions``. Each text is read from its
    ``session:<id>:q:<qid>`` hash and stripped; blank texts are skipped.
    """
    prefix = f"session:{session_id}"
    texts: list[str] = []
    for qid in await redis.lrange(f"{prefix}:questions", 0, -1):
        record = await redis.hgetall(f"{prefix}:q:{qid}")
        question = (record.get("question") or "").strip()
        if question:
            texts.append(question)
    return texts
async def _get_answered_question_texts(redis, session_id: str, limit: int = 4) -> list[str]:
    """Return question texts for the last *limit* (at least 1) answered questions.

    The text comes from the answer hash ``session:<id>:a:<qid>``. If that
    hash has none, the question hash is used instead. Blank texts are
    skipped.
    """
    prefix = f"session:{session_id}"
    recent_ids = await redis.lrange(f"{prefix}:answers", -max(1, limit), -1)
    texts: list[str] = []
    for qid in recent_ids:
        answer_record = await redis.hgetall(f"{prefix}:a:{qid}")
        question = (answer_record.get("question") or "").strip()
        if not question:
            question_record = await redis.hgetall(f"{prefix}:q:{qid}")
            question = (question_record.get("question") or "").strip()
        if question:
            texts.append(question)
    return texts
async def _sample_topic_questions(
    db,
    topic_id: str,
    excluded_questions: list[str],
    limit: int,
) -> list[dict]:
    """Randomly pick up to *limit* unseen questions from a topic's question bank.

    Loads at most 500 TOPIC_QUESTIONS documents for *topic_id* and shuffles
    them. A question is kept when its fingerprint is non-empty, not in
    *excluded_questions*, and not already picked. Difficulty goes through
    ``_normalize_bank_difficulty``; category defaults to "topic".
    """
    if limit <= 0:
        return []
    pool = await db[TOPIC_QUESTIONS].find({"topic_id": topic_id}).to_list(length=500)
    random.shuffle(pool)
    blocked = {_question_fingerprint(q) for q in excluded_questions if q}
    picked: list[dict] = []
    for doc in pool:
        question = (doc.get("question") or "").strip()
        fingerprint = _question_fingerprint(question) if question else ""
        if not fingerprint or fingerprint in blocked:
            continue
        blocked.add(fingerprint)
        picked.append(
            {
                "question": question,
                "difficulty": _normalize_bank_difficulty(doc.get("difficulty") or "medium"),
                "category": doc.get("category") or "topic",
            }
        )
        if len(picked) >= limit:
            break
    return picked
async def _enqueue_resume_seed_items(redis, session_id: str, session: dict, needed: int) -> int:
    """Ask Gemini for *needed* resume seed questions, enqueue them, return how many were accepted."""
    excluded_questions = await _get_session_question_texts(redis, session_id)
    seed_items = await generate_resume_seed_questions(
        role_title=session.get("role_title", "Software Developer"),
        resume_summary=session.get("resume_summary", "No summary available"),
        resume_skills=_safe_json_list(session.get("skills", "[]")),
        jd_title=session.get("job_description_title", ""),
        jd_description=session.get("job_description_text", ""),
        jd_required_skills=_safe_json_list(session.get("jd_required_skills", "[]")),
        excluded_questions=excluded_questions,
        count=needed,
    )
    appended = 0
    for item in seed_items or []:
        qid = await enqueue_question(
            redis=redis,
            session_id=session_id,
            question=item.get("question", ""),
            difficulty=item.get("difficulty", "medium"),
            category=item.get("category", "resume-seed"),
            ttl_seconds=SESSION_TTL,
            max_queue_size=MAX_QUEUE_SIZE,
        )
        if qid:
            appended += 1
    return appended


async def _record_resume_seed_metrics(db, redis, session_id: str, appended: int) -> None:
    """Record one Gemini seed call that produced *appended* questions.

    Counters are bumped atomically with HINCRBY. The previous code wrote
    values computed from a snapshot read before the slow Gemini call, so
    concurrent writers could lose updates. MongoDB is then set to the
    resulting Redis counter values.
    """
    session_key = f"session:{session_id}"
    await redis.hincrby(session_key, "generated_count", appended)
    calls = await redis.hincrby(session_key, "metrics_gemini_calls", 1)
    questions = await redis.hincrby(session_key, "metrics_gemini_questions", appended)
    batches = await redis.hincrby(session_key, "metrics_generation_batches", 1)
    await db[SESSIONS].update_one(
        {"session_id": session_id},
        {
            "$set": {
                "metrics_gemini_calls": _safe_int(calls),
                "metrics_gemini_questions": _safe_int(questions),
                "metrics_generation_batches": _safe_int(batches),
            }
        },
    )


async def _seed_resume_questions_task(session_id: str) -> None:
    """Background task that pre-generates the first resume-interview questions.

    Runs only for a session whose ``status`` is "in_progress" and whose
    ``interview_type`` is "resume". Steps:

    1. flush any backlog into the queue;
    2. if fewer than ``RESUME_INITIAL_BATCH_SIZE`` questions are queued, ask
       Gemini for the difference and record generation metrics;
    3. flush the backlog again;
    4. start audio prefetch for the next queued question.

    Every failure is swallowed: this path must never break interview startup.
    """
    db = get_db()
    redis = get_redis()
    session = await redis.hgetall(f"session:{session_id}")
    if not session or session.get("status") != "in_progress" or session.get("interview_type") != "resume":
        return
    try:
        await flush_backlog_to_queue(
            redis=redis,
            session_id=session_id,
            ttl_seconds=SESSION_TTL,
            max_queue_size=MAX_QUEUE_SIZE,
        )
        current_q_size = await queue_size(redis, session_id)
        needed = max(0, RESUME_INITIAL_BATCH_SIZE - current_q_size)
        if needed > 0:
            appended = await _enqueue_resume_seed_items(redis, session_id, session, needed)
            await _record_resume_seed_metrics(db, redis, session_id, appended)
        await flush_backlog_to_queue(
            redis=redis,
            session_id=session_id,
            ttl_seconds=SESSION_TTL,
            max_queue_size=MAX_QUEUE_SIZE,
        )
        next_qid, next_q = await peek_next_question(redis, session_id)
        if next_qid and next_q:
            _schedule_question_audio_prefetch(
                [next_q.get("question", "")],
                _normalize_voice_gender(session.get("speech_voice_gender")),
            )
    except Exception:
        # Non-blocking pre-seed path should never fail interview startup.
        return
| def _normalize_role_key(role_title: str) -> str: | |
| normalized = re.sub(r"\s+", " ", (role_title or "").strip().lower()) | |
| return normalized or "software developer" | |
| def _build_verification_cache_key( | |
| role_key: str, | |
| jd_id: str, | |
| jd_updated_at: str, | |
| resume_uploaded_at: str, | |
| ) -> str: | |
| return "||".join([ | |
| role_key or "software developer", | |
| jd_id or "-", | |
| jd_updated_at or "-", | |
| resume_uploaded_at or "-", | |
| ]) | |
| def _verification_doc_to_response(doc: dict, *, message: str, cached: bool) -> dict: | |
| return { | |
| "verification_id": doc.get("verification_id"), | |
| "saved_at": doc.get("saved_at") or doc.get("created_at") or utc_now(), | |
| "role_title": doc.get("role_title"), | |
| "job_description": doc.get("job_description") or {}, | |
| "resume_snapshot": doc.get("resume_snapshot") or {}, | |
| "jd_alignment": doc.get("jd_alignment") or {}, | |
| "message": message, | |
| "cached": cached, | |
| } | |
def _resume_summary_from_parsed(parsed_data: dict) -> str:
    """Combine a parsed resume's experience summary and recommended roles.

    Non-empty parts are joined with a newline. Returns "No summary
    available" when both parts are empty.
    """
    experience = parsed_data.get("experience_summary") or ""
    roles = " ".join(parsed_data.get("recommended_roles", []) or [])
    combined = "\n".join(part for part in (experience, roles) if part).strip()
    return combined or "No summary available"


async def _find_saved_verification(
    db,
    *,
    user_id: str,
    role_title: str,
    role_key: str,
    cache_key: str,
    selected_jd: dict,
    jd_updated_at: str,
    resume_uploaded_at: str,
):
    """Look up the newest reusable saved verification for this user.

    The lookup first tries ``cache_key``. Failing that, it uses the older
    field-by-field match: role title, JD id, resume upload time, and JD
    ``updated_at`` when known. A document found the second way is
    back-filled with ``cache_key``, ``role_key`` and ``saved_at``.
    Returns the document, or the falsy lookup result when none matches.
    """
    collection = db[JD_VERIFICATIONS]
    newest_first = [("created_at", -1)]
    match = await collection.find_one({"user_id": user_id, "cache_key": cache_key}, sort=newest_first)
    if match:
        return match

    legacy_query = {
        "user_id": user_id,
        "role_title": role_title,
        "job_description.id": selected_jd.get("id"),
        "resume_snapshot.uploaded_at": resume_uploaded_at,
    }
    if jd_updated_at:
        legacy_query["job_description.updated_at"] = jd_updated_at
    match = await collection.find_one(legacy_query, sort=newest_first)
    if match:
        backfilled_saved_at = match.get("saved_at") or match.get("created_at") or utc_now()
        await collection.update_one(
            {"_id": match["_id"]},
            {
                "$set": {
                    "cache_key": cache_key,
                    "role_key": role_key,
                    "saved_at": backfilled_saved_at,
                }
            },
        )
    return match


async def verify_resume_job_description(
    user_id: str,
    role_id: str = None,
    custom_role: str = None,
    job_description_id: str = None,
) -> dict:
    """Run resume-vs-job-description verification without starting an interview.

    Reuses a saved verification while the selected role, JD version, and
    resume upload timestamp are unchanged. Otherwise it runs a fresh Gemini
    analysis and stores the result in JD_VERIFICATIONS.

    Raises:
        ValueError: if *job_description_id* is missing or the user has not
            uploaded a resume.
    """
    if not job_description_id:
        raise ValueError("job_description_id is required for verification")

    db = get_db()
    resume_doc = await db[RESUMES].find_one({"user_id": user_id})
    if not resume_doc:
        raise ValueError("Please upload your resume before running verification")

    skills_doc = await db[SKILLS].find_one({"user_id": user_id})
    resume_skills = normalize_skill_list(skills_doc.get("skills", [])) if skills_doc else []
    parsed_data = resume_doc.get("parsed_data", {}) or {}
    resume_summary = _resume_summary_from_parsed(parsed_data)

    role_title = await _resolve_role_title(db, role_id=role_id, custom_role=custom_role)
    role_key = _normalize_role_key(role_title)
    selected_jd = await get_job_description_for_user(user_id, job_description_id)
    resume_uploaded_at = resume_doc.get("uploaded_at") or ""
    jd_updated_at = selected_jd.get("updated_at") or ""
    cache_key = _build_verification_cache_key(
        role_key=role_key,
        jd_id=selected_jd.get("id") or job_description_id,
        jd_updated_at=jd_updated_at,
        resume_uploaded_at=resume_uploaded_at,
    )

    saved = await _find_saved_verification(
        db,
        user_id=user_id,
        role_title=role_title,
        role_key=role_key,
        cache_key=cache_key,
        selected_jd=selected_jd,
        jd_updated_at=jd_updated_at,
        resume_uploaded_at=resume_uploaded_at,
    )
    if saved:
        return _verification_doc_to_response(saved, message="Loaded saved verification", cached=True)

    jd_alignment = await analyze_resume_vs_job_description(
        role_title=role_title,
        resume_skills=resume_skills if resume_skills else ["general"],
        resume_summary=resume_summary,
        jd_title=selected_jd.get("title", ""),
        jd_description=selected_jd.get("description", ""),
        jd_required_skills=selected_jd.get("required_skills", []),
    )

    snapshot_fields = {
        "name": parsed_data.get("name"),
        "email": parsed_data.get("email"),
        "phone": parsed_data.get("phone"),
        "location": parsed_data.get("location"),
        "recommended_roles": parsed_data.get("recommended_roles", []) or [],
        "experience_summary": parsed_data.get("experience_summary", "") or "",
    }
    resume_snapshot = {
        "filename": resume_doc.get("original_filename") or resume_doc.get("filename") or "",
        "uploaded_at": resume_uploaded_at,
        "skills": resume_skills,
        "parsed_data": snapshot_fields,
    }
    jd_snapshot = {
        "id": selected_jd.get("id"),
        "title": selected_jd.get("title"),
        "company": selected_jd.get("company"),
        "description": selected_jd.get("description"),
        "required_skills": selected_jd.get("required_skills", []) or [],
        "updated_at": jd_updated_at,
    }

    verification_id = generate_id()
    saved_at = utc_now()
    verification_doc = {
        "verification_id": verification_id,
        "user_id": user_id,
        "role_id": role_id,
        "custom_role": custom_role,
        "role_title": role_title,
        "role_key": role_key,
        "cache_key": cache_key,
        "job_description": jd_snapshot,
        "resume_snapshot": resume_snapshot,
        "jd_alignment": jd_alignment,
        "saved_at": saved_at,
        "created_at": saved_at,
    }
    await db[JD_VERIFICATIONS].insert_one(verification_doc)
    return _verification_doc_to_response(verification_doc, message="Verification complete", cached=False)
async def _get_generated_question_texts(redis, session_id: str) -> list[str]:
    """Return the raw text of every generated question, in list order.

    Missing hashes and empty ``question`` fields are skipped. Unlike
    ``_get_session_question_texts``, the text is not stripped.
    """
    base = f"session:{session_id}"
    texts = []
    for question_id in await redis.lrange(f"{base}:questions", 0, -1):
        record = await redis.hgetall(f"{base}:q:{question_id}")
        text = record.get("question") if record else None
        if text:
            texts.append(text)
    return texts
async def _generate_question_batch(
    role_title: str,
    skills: list[str],
    previous_questions: list[str],
    generated_count: int,
    max_questions: int,
    current_difficulty: str,
    local_summary: str | None,
    batch_size: int,
) -> tuple[list[dict], str]:
    """Generate up to *batch_size* questions without exceeding *max_questions*.

    Returns ``(questions, last_difficulty)``.

    On the very first batch (nothing generated yet, no local summary, more
    than one question wanted), one ``generate_interview_question_batch``
    call is tried first. If it returns nothing, or for any later batch,
    questions are produced one at a time via ``run_interview_graph``. Each
    graph call sees the questions generated so far. Only the first call in
    a batch receives the local summary as ``previous_answer``.
    """
    target = min(batch_size, max(0, max_questions - generated_count))
    if target <= 0:
        return [], current_difficulty

    is_first_batch = generated_count == 0 and target > 1 and not local_summary
    if is_first_batch:
        seeded = await generate_interview_question_batch(
            skills=skills,
            role_title=role_title,
            count=target,
            start_question_number=1,
            previous_questions=previous_questions,
            foundation_limit=0,
        )
        if seeded:
            return seeded, seeded[-1].get("difficulty", current_difficulty)

    batch: list[dict] = []
    asked = list(previous_questions)
    difficulty = current_difficulty
    for offset in range(target):
        graph_result = await run_interview_graph(
            {
                "role_title": role_title,
                "skills": skills,
                "previous_questions": asked,
                "previous_answer": local_summary if offset == 0 else None,
                "question_count": generated_count + offset,
                "max_questions": max_questions,
                "current_difficulty": difficulty,
            }
        )
        question_data = graph_result.get("question_data", {})
        difficulty = question_data.get("difficulty", graph_result.get("current_difficulty", "medium"))
        entry = {
            "question": question_data.get("question", "Can you explain your approach?"),
            "difficulty": difficulty,
            "category": question_data.get("category", "general"),
        }
        batch.append(entry)
        asked.append(entry["question"])
    return batch, difficulty
async def _append_batch_to_redis(redis, session_id: str, batch: list[dict]) -> list[str]:
    """Store generated questions in Redis and return their new ids, in order.

    Each non-empty normalized question is written to
    ``session:<id>:q:<qid>`` with a ``SESSION_TTL`` expiry. Its id is then
    appended to ``session:<id>:questions``. The list's TTL is refreshed only
    when at least one question was added.
    """
    prefix = f"session:{session_id}"
    new_ids: list[str] = []
    for entry in batch:
        question = normalize_question_text(entry.get("question", "Can you explain your approach?"))
        if not question:
            continue
        question_id = generate_id()
        new_ids.append(question_id)
        item_key = f"{prefix}:q:{question_id}"
        await redis.hset(
            item_key,
            mapping={
                "question_id": question_id,
                "question": question,
                "difficulty": entry.get("difficulty", "medium"),
                "category": entry.get("category", "general"),
            },
        )
        await redis.rpush(f"{prefix}:questions", question_id)
        await redis.expire(item_key, SESSION_TTL)
    if new_ids:
        await redis.expire(f"{prefix}:questions", SESSION_TTL)
    return new_ids
async def _fetch_question_bank_batch(
    db,
    role_id: str | None,
    excluded_questions: list[str],
    limit: int,
    skill_hints: list[str] | None = None,
) -> list[dict]:
    """Randomly draw up to ``limit`` unseen questions from the question bank.

    The Mongo query requires a non-empty ``question`` field. When ``role_id``
    is given it is matched as the raw string and, if it parses as an
    ObjectId, also as that ObjectId and its string form. When normalized
    ``skill_hints`` exist, a question must mention at least one hint
    (case-insensitive, regex-escaped) in its ``category`` or ``question``.

    Up to two ``$sample`` rounds are run, the second one larger. Questions
    whose stripped, lower-cased text is in ``excluded_questions`` or already
    picked are skipped. If a role-scoped search comes up short, the
    remainder is requested again without the role filter (hints still apply).

    Returns:
        Dicts with ``question``, ``difficulty`` (via
        ``_normalize_bank_difficulty``) and ``category`` keys.
    """
    if limit <= 0:
        return []

    query = {"question": {"$exists": True, "$ne": ""}}
    if role_id:
        id_variants = [role_id]
        try:
            parsed_id = ObjectId(role_id)
            id_variants.extend((str(parsed_id), parsed_id))
        except Exception:
            pass
        query["role_id"] = {"$in": id_variants}

    hints = normalize_skill_list(skill_hints or [])
    if hints:
        # Each hint may appear in either the category or the question text.
        query["$or"] = [
            {field: {"$regex": re.escape(hint), "$options": "i"}}
            for hint in hints
            for field in ("category", "question")
        ]

    seen = {text.strip().lower() for text in excluded_questions if text}
    picked: list[dict] = []
    sample_sizes = (max(limit * 12, 80), max(limit * 24, 160))
    for sample_size in sample_sizes:
        if len(picked) >= limit:
            break
        cursor = db[QUESTIONS].aggregate([
            {"$match": query},
            {"$sample": {"size": sample_size}},
        ])
        async for doc in cursor:
            text = (doc.get("question") or "").strip()
            if not text or text.lower() in seen:
                continue
            picked.append({
                "question": text,
                "difficulty": _normalize_bank_difficulty(doc.get("difficulty") or "medium"),
                "category": doc.get("category") or "question-bank",
            })
            seen.add(text.lower())
            if len(picked) >= limit:
                break

    missing = limit - len(picked)
    if missing > 0 and role_id:
        # The role-scoped pool is too small: widen to the global random pool.
        widened = await _fetch_question_bank_batch(
            db=db,
            role_id=None,
            excluded_questions=list(seen),
            limit=missing,
            skill_hints=hints,
        )
        picked.extend(widened)
    return picked
def _strict_followup_difficulty(answered_count: int) -> str:
    """Return the difficulty to request for generated follow-up questions.

    Follow-ups are "medium" until ten answers have been recorded and "hard"
    from then on, so later rounds carry more interview pressure.
    """
    if answered_count >= 10:
        return "hard"
    return "medium"
# Phrases that signal a hedging or incomplete answer. They are matched on word
# boundaries so that e.g. "etc" does not fire inside "fetch" or "sketch".
_WEAK_ANSWER_MARKERS = (
    "i think",
    "maybe",
    "not sure",
    "dont know",
    "don't know",
    "etc",
    "kind of",
    "sort of",
)
_WEAK_ANSWER_RE = re.compile(
    r"\b(?:" + "|".join(re.escape(marker) for marker in _WEAK_ANSWER_MARKERS) + r")\b"
)


def _has_followup_opportunity(qa_pairs: list, window: int = BATCH_SIZE) -> bool:
    """Decide whether Gemini follow-up questions are needed for the latest batch.

    Inspects the last ``window`` Q/A pairs (each a dict with an ``"answer"``
    key). Returns True as soon as one non-empty answer has fewer than 30
    words or contains a whole-word hedging phrase from
    ``_WEAK_ANSWER_MARKERS``. Empty answers are ignored, and an empty
    ``qa_pairs`` yields False.
    """
    if not qa_pairs:
        return False
    for qa in qa_pairs[-window:]:
        answer = (qa.get("answer") or "").strip()
        if not answer:
            continue
        if len(answer.split()) < 30:
            return True
        # Fold typographic apostrophes so "don’t know" matches "don't know".
        normalized = answer.lower().replace("\u2019", "'")
        if _WEAK_ANSWER_RE.search(normalized):
            return True
    return False
def _take_unique_questions(items, excluded_lower: set[str], limit: int) -> list[dict]:
    """Pick up to ``limit`` items whose question text has not been seen yet.

    An item is skipped when its stripped ``"question"`` text is blank or its
    lower-cased form is already in ``excluded_lower``. Accepted items are
    returned unchanged, and their lower-cased text is added to
    ``excluded_lower`` in place so later calls sharing the set skip them.
    A falsy ``items`` (``None`` or empty) yields an empty list.
    """
    taken: list[dict] = []
    if limit <= 0:
        return taken
    for item in items or ():
        text = (item.get("question") or "").strip()
        if not text:
            continue
        lowered = text.lower()
        if lowered in excluded_lower:
            continue
        taken.append(item)
        excluded_lower.add(lowered)
        if len(taken) >= limit:
            break
    return taken


async def _generate_mixed_followup_batch(
    db,
    redis,
    session_id: str,
    session: dict,
    generated_count: int,
    max_questions: int,
) -> tuple[list[dict], str, dict]:
    """Build the next batch of follow-up questions for a resume interview.

    The batch size is ``min(BATCH_SIZE, max_questions - generated_count)``.
    No item repeats (case-insensitively) a question already generated for
    the session.

    * ``resume_source_mode == "ai"``: one Gemini follow-up call for the whole
      batch. Any shortfall is refilled through ``_generate_question_batch``.
    * otherwise: up to ``FOLLOWUP_AI_COUNT`` Gemini follow-ups when
      ``_has_followup_opportunity`` flags the recent answers, and the rest
      from the question bank. If the bank runs short, one more Gemini call
      tries to fill the gap. The mixed batch is shuffled.

    Returns:
        ``(items, last_difficulty, metrics)``. ``last_difficulty`` is the
        difficulty of the final item, or the computed batch difficulty when
        there are no items. ``metrics`` holds ``gemini_calls``,
        ``gemini_questions``, ``bank_questions`` and ``bank_shortfall``.
    """
    remaining = max(0, max_questions - generated_count)
    target = min(BATCH_SIZE, remaining)
    if target <= 0:
        return [], session.get("current_difficulty", "medium"), {
            "gemini_calls": 0,
            "gemini_questions": 0,
            "bank_questions": 0,
            "bank_shortfall": 0,
        }
    previous_questions = await _get_generated_question_texts(redis, session_id)
    qa_pairs = await get_session_qa(session_id)
    answered_count = len(qa_pairs)
    role_title = session.get("role_title", "Software Developer")
    skills = _safe_json_list(session.get("skills", "[]"))
    jd_required_skills = _safe_json_list(session.get("jd_required_skills", "[]"))
    resume_source_mode = (session.get("resume_source_mode") or "db").strip().lower()
    current_difficulty = _strict_followup_difficulty(answered_count)
    from utils.gemini import generate_followup_question_batch_from_qa

    if resume_source_mode == "ai":
        ai_items = await generate_followup_question_batch_from_qa(
            role_title=role_title,
            skills=skills,
            qa_pairs=qa_pairs,
            previous_questions=previous_questions,
            count=target,
            difficulty=current_difficulty,
        )
        # target > 0 is guaranteed by the early return above.
        gemini_calls = 1
        excluded_lower = {q.strip().lower() for q in previous_questions if q}
        deduped_ai = _take_unique_questions(ai_items, excluded_lower, target)
        if len(deduped_ai) < target:
            # NOTE(review): model calls made inside _generate_question_batch are
            # not added to gemini_calls; confirm whether they should be metered.
            refill, refill_last = await _generate_question_batch(
                role_title=role_title,
                skills=skills,
                previous_questions=previous_questions + [i.get("question", "") for i in deduped_ai],
                generated_count=generated_count + len(deduped_ai),
                max_questions=max_questions,
                current_difficulty=current_difficulty,
                local_summary=_LOCAL_SUMMARIES.get(session_id),
                batch_size=target - len(deduped_ai),
            )
            deduped_ai.extend(
                _take_unique_questions(refill, excluded_lower, target - len(deduped_ai))
            )
            if refill:
                current_difficulty = refill_last
        final_ai = deduped_ai[:target]
        last_difficulty = final_ai[-1].get("difficulty", current_difficulty) if final_ai else current_difficulty
        return final_ai, last_difficulty, {
            "gemini_calls": gemini_calls,
            "gemini_questions": len(final_ai),
            "bank_questions": 0,
            "bank_shortfall": 0,
        }

    # Batch policy:
    # - If a follow-up opportunity exists: FOLLOWUP_AI_COUNT AI + the rest from the bank
    # - Otherwise: the whole batch from the bank
    ai_target = min(FOLLOWUP_AI_COUNT, target) if _has_followup_opportunity(qa_pairs) else 0
    excluded_lower = {q.strip().lower() for q in previous_questions if q}
    gemini_calls = 0
    gemini_questions = 0
    ai_items: list[dict] = []
    if ai_target > 0:
        generated_ai = await generate_followup_question_batch_from_qa(
            role_title=role_title,
            skills=skills,
            qa_pairs=qa_pairs,
            previous_questions=previous_questions,
            count=ai_target,
            difficulty=current_difficulty,
        )
        gemini_calls += 1
        ai_items = _take_unique_questions(generated_ai, excluded_lower, ai_target)
    gemini_questions += len(ai_items)

    bank_target = max(0, target - len(ai_items))
    bank_items = await _fetch_question_bank_batch(
        db=db,
        role_id=session.get("role_id"),
        excluded_questions=list(excluded_lower),
        limit=bank_target,
        skill_hints=jd_required_skills,
    )
    for item in bank_items:
        text = (item.get("question") or "").strip()
        if text:
            excluded_lower.add(text.lower())

    if len(bank_items) < bank_target:
        # Keep the total batch size stable if the bank pool is exhausted.
        refill = bank_target - len(bank_items)
        refill_ai = await generate_followup_question_batch_from_qa(
            role_title=role_title,
            skills=skills,
            qa_pairs=qa_pairs,
            previous_questions=list(excluded_lower),
            count=refill,
            difficulty=current_difficulty,
        )
        gemini_calls += 1
        added_refill_ai = _take_unique_questions(refill_ai, excluded_lower, refill)
        ai_items.extend(added_refill_ai)
        gemini_questions += len(added_refill_ai)

    mixed = (ai_items + bank_items)[:target]
    if len(mixed) > 1:
        random.shuffle(mixed)
    last_difficulty = mixed[-1].get("difficulty", current_difficulty) if mixed else current_difficulty
    return mixed, last_difficulty, {
        "gemini_calls": gemini_calls,
        "gemini_questions": gemini_questions,
        "bank_questions": len(bank_items),
        "bank_shortfall": max(0, bank_target - len(bank_items)),
    }
async def _start_topic_interview(user_id: str, topic_id: str) -> dict:
    """Start topic interview with low-cost DB-first flow and staged AI follow-ups.

    Samples ``TOPIC_INITIAL_DB_QUESTIONS`` topic questions. The first one is
    served immediately and the next ``TOPIC_INITIAL_ASK_COUNT - 1`` are
    enqueued. The session is stored in MongoDB and as a Redis hash that
    expires after ``SESSION_TTL``.

    Raises:
        ValueError: if ``topic_id`` is not a valid ObjectId or names no topic,
            the topic is unpublished, or fewer than ``TOPIC_INITIAL_ASK_COUNT``
            questions could be sampled.
    """
    db = get_db()
    redis = get_redis()
    # ObjectId(...) raises bson's InvalidId/TypeError on malformed input;
    # report that as the same ValueError callers get for a missing topic.
    if not ObjectId.is_valid(topic_id):
        raise ValueError("Topic not found")
    topic = await db[TOPICS].find_one({"_id": ObjectId(topic_id)})
    if not topic:
        raise ValueError("Topic not found")
    if not topic.get("is_published", False):
        raise ValueError("This topic interview is not published yet")
    initial_items = await _sample_topic_questions(
        db=db,
        topic_id=topic_id,
        excluded_questions=[],
        limit=TOPIC_INITIAL_DB_QUESTIONS,
    )
    if len(initial_items) < TOPIC_INITIAL_ASK_COUNT:
        raise ValueError("Not enough topic questions to start interview")
    first_question = initial_items[0]
    queued_initial = initial_items[1:TOPIC_INITIAL_ASK_COUNT]
    timer_enabled = bool(topic.get("timer_enabled", False))
    timer_seconds = topic.get("timer_seconds") if timer_enabled else None
    session_id = generate_id()
    _LOCAL_SUMMARIES[session_id] = ""
    user_doc = None
    try:
        user_doc = await db[USERS].find_one({"_id": ObjectId(user_id)}, {"speech_settings": 1})
    except Exception:
        # user_id is not an ObjectId (or the lookup failed): fall back to the user_id field.
        user_doc = await db[USERS].find_one({"user_id": user_id}, {"speech_settings": 1})
    speech_voice_gender = _normalize_voice_gender(((user_doc or {}).get("speech_settings") or {}).get("voice_gender"))

    # Store and serve the first question right away.
    first_id = generate_id()
    first_text = normalize_question_text(first_question.get("question", "Can you explain this topic?"))
    first_difficulty = first_question.get("difficulty", "medium")
    await redis.hset(
        f"session:{session_id}:q:{first_id}",
        mapping={
            "question_id": first_id,
            "question": first_text,
            "difficulty": first_difficulty,
            "category": first_question.get("category", topic.get("name", "topic")),
        },
    )
    await redis.expire(f"session:{session_id}:q:{first_id}", SESSION_TTL)
    await redis.rpush(f"session:{session_id}:questions", first_id)
    await redis.expire(f"session:{session_id}:questions", SESSION_TTL)
    await mark_question_asked(
        redis=redis,
        session_id=session_id,
        question_text=first_question.get("question", ""),
        ttl_seconds=SESSION_TTL,
    )

    # Queue the remaining initial bank questions.
    queued_count = 0
    for item in queued_initial:
        qid = await enqueue_question(
            redis=redis,
            session_id=session_id,
            question=item.get("question", ""),
            difficulty=item.get("difficulty", "medium"),
            category=item.get("category", topic.get("name", "topic")),
            ttl_seconds=SESSION_TTL,
            max_queue_size=MAX_QUEUE_SIZE,
        )
        if qid:
            queued_count += 1
    await flush_backlog_to_queue(
        redis=redis,
        session_id=session_id,
        ttl_seconds=SESSION_TTL,
        max_queue_size=MAX_QUEUE_SIZE,
    )

    bank_served = queued_count + 1
    bank_shortfall = max(0, TOPIC_INITIAL_ASK_COUNT - bank_served)
    session_doc = {
        "session_id": session_id,
        "user_id": user_id,
        "role_id": None,
        "role_title": topic.get("name", "Topic Interview"),
        "topic_id": topic_id,
        "interview_type": "topic",
        "status": "in_progress",
        "question_count": 1,
        "max_questions": TOPIC_TOTAL_QUESTIONS,
        "current_difficulty": first_difficulty,
        "metrics_gemini_calls": 0,
        "metrics_gemini_questions": 0,
        "metrics_bank_questions": bank_served,
        "metrics_bank_shortfall": bank_shortfall,
        "metrics_generation_batches": 1,
        "speech_voice_gender": speech_voice_gender,
        "timer_enabled": timer_enabled,
        "timer_seconds": timer_seconds,
        "topic_followups_generated": False,
        "started_at": utc_now(),
    }
    await db[SESSIONS].insert_one(session_doc)
    session_state = {
        "user_id": user_id,
        "role_title": topic.get("name", "Topic Interview"),
        "topic_id": topic_id,
        "interview_type": "topic",
        "skills": json.dumps([topic.get("name", "general")]),
        "user_skills": json.dumps([]),
        "required_skills": json.dumps([]),
        "matched_skills": json.dumps([]),
        "missing_skills": json.dumps([]),
        "question_count": 1,
        "answered_count": 0,
        "served_count": 1,
        "generated_count": bank_served,
        "max_questions": TOPIC_TOTAL_QUESTIONS,
        "current_difficulty": first_difficulty,
        # Redis hash values are strings.
        "timer_enabled": str(timer_enabled),
        "timer_seconds": str(timer_seconds or ""),
        "status": "in_progress",
        "speech_voice_gender": speech_voice_gender,
        "metrics_gemini_calls": 0,
        "metrics_gemini_questions": 0,
        "metrics_bank_questions": bank_served,
        "metrics_bank_shortfall": bank_shortfall,
        "metrics_generation_batches": 1,
        "topic_followups_generated": "0",
    }
    await redis.hset(f"session:{session_id}", mapping=session_state)
    await redis.expire(f"session:{session_id}", SESSION_TTL)

    # Warm TTS audio for the next queued question while the user answers Q1.
    next_qid, next_q = await peek_next_question(redis, session_id)
    prefetch_targets = [next_q.get("question", "")] if next_qid and next_q else []
    _schedule_question_audio_prefetch(prefetch_targets, speech_voice_gender)
    return {
        "session_id": session_id,
        "interview_type": "topic",
        "topic": {
            "topic_id": topic_id,
            "name": topic.get("name", "Topic Interview"),
            "description": topic.get("description", ""),
        },
        "skill_alignment": {
            "user_skills": [],
            "required_skills": [topic.get("name", "")],
            "matched_skills": [],
            "missing_skills": [],
            "interview_focus": [topic.get("name", "")],
        },
        "question": {
            "question_id": first_id,
            "question": first_text,
            "difficulty": first_difficulty,
            "question_number": 1,
            "total_questions": TOPIC_TOTAL_QUESTIONS,
        },
        "timer": {
            "enabled": timer_enabled,
            "seconds": timer_seconds,
        },
        "message": "Topic interview started. Good luck!",
    }
async def _async_pregenerate_next_batch(session_id: str) -> None:
    """Background task: top up the pending-question list of a resume session.

    Does nothing unless the Redis session exists, is "in_progress", is a
    resume interview, has fewer than ``PREGEN_MIN_PENDING`` pending questions
    and has not yet generated ``max_questions``. Otherwise it:

    * builds a batch with ``_generate_mixed_followup_batch``;
    * stores the batch and appends the new ids to the pending list;
    * schedules TTS prefetch for the first two new questions;
    * adds the batch metrics to both Redis and MongoDB.

    The session is always removed from ``_PREGEN_IN_FLIGHT`` on exit,
    including on error.
    """
    db = get_db()
    redis = get_redis()
    try:
        session = await redis.hgetall(f"session:{session_id}")
        if not session or session.get("status") != "in_progress":
            return
        if session.get("interview_type", "resume") != "resume":
            return
        pending_key = f"session:{session_id}:pending_questions"
        pending_len = await redis.llen(pending_key)
        generated_count = int(session.get("generated_count", 0))
        max_questions = int(session.get("max_questions", MAX_QUESTIONS))
        if pending_len >= PREGEN_MIN_PENDING or generated_count >= max_questions:
            return
        batch, last_difficulty, batch_metrics = await _generate_mixed_followup_batch(
            db=db,
            redis=redis,
            session_id=session_id,
            session=session,
            generated_count=generated_count,
            max_questions=max_questions,
        )
        if not batch:
            return
        new_ids = await _append_batch_to_redis(redis, session_id, batch)
        if new_ids:
            await redis.rpush(pending_key, *new_ids)
            await redis.expire(pending_key, SESSION_TTL)
            prefetch_targets = []
            for qid in new_ids[:2]:
                q = await redis.hgetall(f"session:{session_id}:q:{qid}")
                prefetch_targets.append(q.get("question", ""))
            _schedule_question_audio_prefetch(
                prefetch_targets,
                _normalize_voice_gender(session.get("speech_voice_gender")),
            )
        # Compute the new totals once so Redis and MongoDB always receive the
        # same numbers (Redis as strings, MongoDB as ints).
        metric_totals = {
            "metrics_gemini_calls": _safe_int(session.get("metrics_gemini_calls", 0)) + batch_metrics.get("gemini_calls", 0),
            "metrics_gemini_questions": _safe_int(session.get("metrics_gemini_questions", 0)) + batch_metrics.get("gemini_questions", 0),
            "metrics_bank_questions": _safe_int(session.get("metrics_bank_questions", 0)) + batch_metrics.get("bank_questions", 0),
            "metrics_bank_shortfall": _safe_int(session.get("metrics_bank_shortfall", 0)) + batch_metrics.get("bank_shortfall", 0),
            "metrics_generation_batches": _safe_int(session.get("metrics_generation_batches", 0)) + 1,
        }
        await redis.hset(
            f"session:{session_id}",
            mapping={
                "generated_count": str(generated_count + len(new_ids)),
                "current_difficulty": last_difficulty,
                **{key: str(value) for key, value in metric_totals.items()},
            },
        )
        await db[SESSIONS].update_one(
            {"session_id": session_id},
            {"$set": metric_totals},
        )
    finally:
        _PREGEN_IN_FLIGHT.discard(session_id)
# Strong references to running pre-generation tasks. The asyncio event loop
# keeps only weak references to tasks, so an unreferenced task may be garbage
# collected before it finishes.
_PREGEN_TASKS: set[asyncio.Task] = set()


def _schedule_pregen(session_id: str, answered_count: int) -> None:
    """Start background pre-generation of the next question batch.

    Does nothing before the first answer has been recorded, or while a
    pre-generation task for the same session is already in flight. The task
    itself removes the session from ``_PREGEN_IN_FLIGHT`` when it ends.
    """
    # Start pre-generation as soon as Q1 is answered, while user is on Q2.
    if answered_count < 1:
        return
    if session_id in _PREGEN_IN_FLIGHT:
        return
    _PREGEN_IN_FLIGHT.add(session_id)
    task = asyncio.create_task(_async_pregenerate_next_batch(session_id))
    _PREGEN_TASKS.add(task)
    task.add_done_callback(_PREGEN_TASKS.discard)
# Strong references to background resume-seeding tasks. The asyncio event loop
# keeps only weak references to tasks, so an unreferenced task may be garbage
# collected before it completes.
_RESUME_SEED_TASKS: set[asyncio.Task] = set()


async def start_interview(
    user_id: str,
    role_id: str = None,
    custom_role: str = None,
    interview_type: str = "resume",
    topic_id: str = None,
    job_description_id: str = None,
) -> dict:
    """Start a new interview session with low-cost queue-first orchestration.

    Topic interviews (``interview_type == "topic"``) are delegated to
    ``_start_topic_interview``. Otherwise a resume interview is created:

    * the user's resume and the selected job description are loaded;
    * the JD's required skills are split into matched and missing against
      the user's skills;
    * an intro question is stored as question 1;
    * the session is written to MongoDB and to Redis;
    * seeding of the next questions is started in the background.

    Raises:
        ValueError: when a topic interview lacks ``topic_id``, the user has no
            resume, no ``job_description_id`` is given, the job description
            lookup returns nothing, or the JD lists no required skills.
    """
    interview_type = (interview_type or "resume").strip().lower()
    if interview_type == "topic":
        if not topic_id:
            raise ValueError("topic_id is required for topic interviews")
        return await _start_topic_interview(user_id=user_id, topic_id=topic_id)
    db = get_db()
    redis = get_redis()
    user_doc = None
    try:
        user_doc = await db[USERS].find_one({"_id": ObjectId(user_id)}, {"speech_settings": 1})
    except Exception:
        # user_id is not an ObjectId (or the lookup failed): fall back to the user_id field.
        user_doc = await db[USERS].find_one({"user_id": user_id}, {"speech_settings": 1})
    speech_voice_gender = _normalize_voice_gender(((user_doc or {}).get("speech_settings") or {}).get("voice_gender"))
    skills_doc = await db[SKILLS].find_one({"user_id": user_id})
    user_skills = normalize_skill_list(skills_doc.get("skills", [])) if skills_doc else []
    resume_doc = await db[RESUMES].find_one({"user_id": user_id})
    if not resume_doc:
        raise ValueError("Please upload your resume before starting a resume interview")
    parsed_resume = resume_doc.get("parsed_data", {}) or {}
    resume_summary_parts = [
        parsed_resume.get("experience_summary") or "",
        " ".join(parsed_resume.get("recommended_roles", []) or []),
    ]
    resume_summary = "\n".join([part for part in resume_summary_parts if part]).strip() or "No summary available"
    if not job_description_id:
        raise ValueError("Please select a Job Description before starting Resume Interview")
    role_title = await _resolve_role_title(db, role_id=role_id, custom_role=custom_role)
    selected_jd = await get_job_description_for_user(user_id, job_description_id)
    if not selected_jd:
        # Without this guard a missing JD surfaced as the misleading
        # "no required skills" error below.
        raise ValueError("Selected Job Description not found")
    jd_required_skills = normalize_skill_list(selected_jd.get("required_skills", []))
    if not jd_required_skills:
        raise ValueError("Selected Job Description has no required skills. Add required skills first.")

    # Split the JD skills into those the user already lists and those they lack.
    user_skill_set = {s.lower() for s in user_skills}
    matched_role_skills = [s for s in jd_required_skills if s.lower() in user_skill_set]
    missing_role_skills = [s for s in jd_required_skills if s.lower() not in user_skill_set]
    base_skills_for_interview = matched_role_skills + [s for s in missing_role_skills if s not in matched_role_skills]
    skills_for_interview = build_interview_focus_skills(base_skills_for_interview) or list(jd_required_skills)

    intro_question = _build_resume_intro_question(role_title=role_title, jd_title=selected_jd.get("title", ""))
    intro_question = normalize_question_text(intro_question)
    session_id = generate_id()
    _LOCAL_SUMMARIES[session_id] = ""
    first_id = generate_id()
    await redis.hset(
        f"session:{session_id}:q:{first_id}",
        mapping={
            "question_id": first_id,
            "question": intro_question,
            "difficulty": "easy",
            "category": "intro",
        },
    )
    await redis.expire(f"session:{session_id}:q:{first_id}", SESSION_TTL)
    await redis.rpush(f"session:{session_id}:questions", first_id)
    await redis.expire(f"session:{session_id}:questions", SESSION_TTL)
    await mark_question_asked(
        redis=redis,
        session_id=session_id,
        question_text=intro_question,
        ttl_seconds=SESSION_TTL,
    )
    session_doc = {
        "session_id": session_id,
        "user_id": user_id,
        "role_id": role_id,
        "role_title": role_title,
        "job_description_id": selected_jd.get("id"),
        "job_description_title": selected_jd.get("title"),
        "status": "in_progress",
        "interview_type": "resume",
        "question_count": 1,
        "max_questions": RESUME_MAX_QUESTIONS,
        "current_difficulty": "easy",
        "metrics_gemini_calls": 0,
        "metrics_gemini_questions": 0,
        "metrics_bank_questions": 1,
        "metrics_bank_shortfall": 0,
        "metrics_generation_batches": 0,
        "speech_voice_gender": speech_voice_gender,
        "started_at": utc_now(),
        "interview_generation_mode": "queue_followup",
    }
    await db[SESSIONS].insert_one(session_doc)
    session_state = {
        "user_id": user_id,
        "role_id": role_id or "",
        "role_title": role_title,
        "skills": json.dumps(skills_for_interview),
        "user_skills": json.dumps(user_skills),
        "required_skills": json.dumps(jd_required_skills),
        "matched_skills": json.dumps(matched_role_skills),
        "missing_skills": json.dumps(missing_role_skills),
        "question_count": 1,
        "answered_count": 0,
        "served_count": 1,
        "generated_count": 1,
        "max_questions": RESUME_MAX_QUESTIONS,
        "current_difficulty": "easy",
        "interview_type": "resume",
        "status": "in_progress",
        "speech_voice_gender": speech_voice_gender,
        "jd_required_skills": json.dumps(jd_required_skills),
        "job_description_title": selected_jd.get("title", ""),
        "job_description_text": selected_jd.get("description", ""),
        "resume_summary": resume_summary,
        "metrics_gemini_calls": 0,
        "metrics_gemini_questions": 0,
        "metrics_bank_questions": 1,
        "metrics_bank_shortfall": 0,
        "metrics_generation_batches": 0,
        "interview_generation_mode": "queue_followup",
    }
    await redis.hset(f"session:{session_id}", mapping=session_state)
    await redis.expire(f"session:{session_id}", SESSION_TTL)
    # Preload the initial queue in the background without blocking delivery
    # of the first question; keep a reference so the task is not collected.
    seed_task = asyncio.create_task(_seed_resume_questions_task(session_id))
    _RESUME_SEED_TASKS.add(seed_task)
    seed_task.add_done_callback(_RESUME_SEED_TASKS.discard)
    return {
        "session_id": session_id,
        "skill_alignment": {
            "user_skills": user_skills,
            "required_skills": jd_required_skills,
            "matched_skills": matched_role_skills,
            "missing_skills": missing_role_skills,
            "interview_focus": skills_for_interview,
        },
        "question": {
            "question_id": first_id,
            "question": intro_question,
            "difficulty": "easy",
            "question_number": 1,
            "total_questions": RESUME_MAX_QUESTIONS,
        },
        "timer": {
            "enabled": False,
            "seconds": None,
        },
        "message": "Interview started. Good luck!",
        "job_description": selected_jd,
        "jd_alignment": None,
    }
async def _record_submit_latency(started_at: float) -> float:
    """Record how long an answer submission took and return it in milliseconds.

    ``started_at`` is a ``perf_counter()`` reading taken when the submission
    began. The unrounded duration is reported as the ``"submit_ms"`` latency
    sample. The returned value is rounded to two decimals.
    """
    duration_ms = 1000.0 * (perf_counter() - started_at)
    await record_latency("submit_ms", duration_ms)
    return round(duration_ms, 2)
async def _apply_generation_metric_delta(
    *,
    db,
    redis,
    session_id: str,
    session: dict,
    metrics_delta: dict,
    generated_count: int | None = None,
    extra_redis_fields: dict | None = None,
    extra_db_fields: dict | None = None,
) -> dict:
    """Add ``metrics_delta`` to the session's generation counters and persist them.

    The new totals are the current stats from ``_current_generation_stats``
    plus each delta coerced with ``_safe_int``. They are written to:

    * the Redis session hash, as strings;
    * the MongoDB session document, as numbers.

    ``generated_count`` is included in both writes when given. Extra fields
    are merged last, so they override on key clashes.

    Returns:
        The new totals keyed by ``gemini_calls``, ``gemini_questions``,
        ``bank_questions``, ``bank_shortfall`` and ``generation_batches``.
    """
    counter_names = (
        "gemini_calls",
        "gemini_questions",
        "bank_questions",
        "bank_shortfall",
        "generation_batches",
    )
    current = _current_generation_stats(session)
    totals = {
        name: current[name] + _safe_int(metrics_delta.get(name, 0))
        for name in counter_names
    }

    redis_fields = {f"metrics_{name}": str(totals[name]) for name in counter_names}
    if generated_count is not None:
        redis_fields["generated_count"] = str(generated_count)
    if extra_redis_fields:
        redis_fields.update(extra_redis_fields)
    await redis.hset(f"session:{session_id}", mapping=redis_fields)

    mongo_fields = {f"metrics_{name}": totals[name] for name in counter_names}
    if generated_count is not None:
        mongo_fields["generated_count"] = generated_count
    if extra_db_fields:
        mongo_fields.update(extra_db_fields)
    await db[SESSIONS].update_one({"session_id": session_id}, {"$set": mongo_fields})
    return totals
async def _post_submit_resume_processing(
    session_id: str,
    question_id: str,
    question_text: str,
    answer: str,
    answered_count: int,
    max_questions: int,
) -> None:
    """Evaluate a submitted resume-interview answer and queue the next follow-up.

    Runs under the per-session post-submit lock and returns early if the
    Redis session is gone. It:

    * evaluates the answer with ``evaluate_and_generate_followup``, using the
      recent context and the current topic streak;
    * stores the score and feedback in ``session:{id}:a:{question_id}``;
    * applies the follow-up policy and, while the interview is still in
      progress and under ``max_questions``, enqueues a follow-up (with
      fallback);
    * records generation metrics, flushes the backlog into the queue, and
      schedules TTS prefetch for the next queued question.
    """
    db = get_db()
    redis = get_redis()
    async with _get_post_submit_lock(session_id):
        session = await redis.hgetall(f"session:{session_id}")
        if not session:
            return
        skill_pool = _resume_skill_pool(session)
        recent_answered_questions = await _get_answered_question_texts(
            redis=redis,
            session_id=session_id,
            limit=4,
        )
        recent_focus_topic, same_topic_streak = _recent_focus_streak(
            recent_answered_questions,
            skill_pool,
        )
        recent_context = await get_recent_context_items(
            redis=redis,
            session_id=session_id,
            max_items=CONTEXT_CACHE_ITEMS,
        )
        excluded_questions = await _get_session_question_texts(redis, session_id)
        evaluation = await evaluate_and_generate_followup(
            role_title=session.get("role_title", "Software Developer"),
            required_skills=_safe_json_list(session.get("jd_required_skills", "[]")),
            recent_context=recent_context,
            current_question=question_text,
            current_answer=answer,
            excluded_questions=excluded_questions,
            focus_topic=recent_focus_topic or "",
            same_topic_streak=same_topic_streak,
        )
        answer_key = f"session:{session_id}:a:{question_id}"
        await redis.hset(
            answer_key,
            mapping={
                "score": str(_safe_int(evaluation.get("score", 0))),
                "feedback": evaluation.get("feedback", ""),
            },
        )
        # Every other per-session key in this module is given SESSION_TTL;
        # refresh it here so the evaluation hash expires with the session even
        # if this hset is what created it.
        await redis.expire(answer_key, SESSION_TTL)
        metrics_delta = {
            "gemini_calls": 1,
            "gemini_questions": 0,
            "bank_questions": 0,
            "bank_shortfall": 0,
            "generation_batches": 1,
        }
        generated_count = _safe_int(session.get("generated_count", 0))
        follow_text, focus_skill_override = _apply_resume_followup_policy(
            skill_pool=skill_pool,
            recent_focus_topic=recent_focus_topic,
            same_topic_streak=same_topic_streak,
            suggested_question=(evaluation.get("followup_question") or "").strip(),
            suggested_topic=(evaluation.get("followup_topic") or "").strip(),
            followup_need_score=_safe_score_0_100(evaluation.get("followup_need_score", 0)),
            answered_count=answered_count,
        )
        if answered_count < max_questions and session.get("status") == "in_progress":
            qid, used_model_followup = await _enqueue_resume_followup_with_fallback(
                redis=redis,
                session_id=session_id,
                session=session,
                answered_count=answered_count,
                suggested_text=follow_text,
                suggested_difficulty=evaluation.get("difficulty", "medium"),
                suggested_category=evaluation.get("category", "follow-up"),
                focus_skill_override=focus_skill_override,
            )
            if qid:
                generated_count += 1
                if used_model_followup:
                    metrics_delta["gemini_questions"] += 1
        await _apply_generation_metric_delta(
            db=db,
            redis=redis,
            session_id=session_id,
            session=session,
            metrics_delta=metrics_delta,
            generated_count=generated_count,
        )
        await flush_backlog_to_queue(
            redis=redis,
            session_id=session_id,
            ttl_seconds=SESSION_TTL,
            max_queue_size=MAX_QUEUE_SIZE,
        )
        if session.get("status") == "in_progress":
            qid, q = await peek_next_question(redis, session_id)
            if qid and q:
                _schedule_question_audio_prefetch(
                    [q.get("question", "")],
                    _normalize_voice_gender(session.get("speech_voice_gender")),
                )
async def _post_submit_topic_processing(
    session_id: str,
    answered_count: int,
) -> None:
    """Top up a topic interview's question supply once the opening round is answered.

    Does nothing until ``answered_count`` reaches ``TOPIC_INITIAL_ASK_COUNT``.
    Everything else runs under the lock returned by
    ``_get_post_submit_lock(session_id)``.

    The number of missing questions is ``max_questions - generated_count``, with
    ``max_questions`` floored at ``TOPIC_TOTAL_QUESTIONS``. When nothing is
    missing, the session is just flagged as ``topic_followups_generated``.
    Otherwise, unless that flag is already set, the gap is filled from three
    sources, in order:

    1. up to ``TOPIC_AI_FOLLOWUPS`` model-generated follow-ups;
    2. questions sampled from the topic's question bank;
    3. synthetic fallback questions, until the gap is closed or the variant
       budget ``max(10, remaining * 4)`` is exhausted.

    Generation metrics, the flag and ``max_questions`` are then persisted, the
    backlog is flushed into the live queue, and audio for the next queued
    question is prefetched while the interview is still in progress.

    NOTE(review): the indentation of this function was reconstructed from a
    flattened listing; queue flushing and prefetch are assumed to run inside
    the lock.
    """
    db = get_db()
    redis = get_redis()
    if answered_count < TOPIC_INITIAL_ASK_COUNT:
        return

    session_key = f"session:{session_id}"
    async with _get_post_submit_lock(session_id):
        session = await redis.hgetall(session_key)
        if not session:
            return

        stored_max = _safe_int(session.get("max_questions", TOPIC_TOTAL_QUESTIONS))
        max_questions = max(TOPIC_TOTAL_QUESTIONS, stored_max)
        generated_count = _safe_int(session.get("generated_count", 0))
        remaining_needed = max(0, max_questions - generated_count)

        if remaining_needed <= 0:
            # Quota already met: only record that no further top-up is needed.
            await redis.hset(session_key, mapping={"topic_followups_generated": "1"})
            await db[SESSIONS].update_one(
                {"session_id": session_id},
                {"$set": {"topic_followups_generated": True, "max_questions": max_questions}},
            )
            return
        if session.get("topic_followups_generated", "0") == "1":
            # The one-time top-up was already performed by an earlier submit.
            return

        qa_pairs = await get_session_qa(session_id)
        excluded_questions = await _get_session_question_texts(redis, session_id)
        ai_target = min(TOPIC_AI_FOLLOWUPS, remaining_needed)
        ai_items = await generate_topic_followup_batch(
            topic_name=session.get("role_title", "Topic Interview"),
            qa_pairs=qa_pairs,
            excluded_questions=excluded_questions,
            count=ai_target,
        )
        ai_question_texts = [item.get("question", "") for item in ai_items]
        db_items = await _sample_topic_questions(
            db=db,
            topic_id=session.get("topic_id", ""),
            # Keep bank picks from duplicating what the model just produced.
            excluded_questions=excluded_questions + ai_question_texts,
            limit=max(0, remaining_needed - len(ai_items)),
        )

        default_category = session.get("role_title", "topic")

        async def _push(text, difficulty, category):
            return await enqueue_question(
                redis=redis,
                session_id=session_id,
                question=text,
                difficulty=difficulty,
                category=category,
                ttl_seconds=SESSION_TTL,
                max_queue_size=MAX_QUEUE_SIZE,
            )

        async def _push_items(items) -> int:
            # Enqueue each item in order; count only the ones actually accepted.
            accepted = 0
            for item in items:
                pushed_id = await _push(
                    item.get("question", ""),
                    item.get("difficulty", "medium"),
                    item.get("category", default_category),
                )
                if pushed_id:
                    accepted += 1
            return accepted

        ai_added = await _push_items(ai_items)
        db_added = await _push_items(db_items)
        topic_added = ai_added + db_added

        fallback_added = 0
        for variant in range(max(10, remaining_needed * 4)):
            if topic_added >= remaining_needed:
                break
            fallback_text = _build_topic_resilient_followup_question(
                session=session,
                question_number=generated_count + topic_added + 1,
                variant=variant,
            )
            if await _push(fallback_text, "medium", "topic-fallback"):
                topic_added += 1
                fallback_added += 1

        generated_count += topic_added
        await _apply_generation_metric_delta(
            db=db,
            redis=redis,
            session_id=session_id,
            session=session,
            metrics_delta={
                "gemini_calls": 1 if ai_target > 0 else 0,
                "gemini_questions": ai_added,
                "bank_questions": db_added + fallback_added,
                "bank_shortfall": max(0, remaining_needed - topic_added),
                "generation_batches": 1,
            },
            generated_count=generated_count,
            extra_redis_fields={
                "topic_followups_generated": "1",
                "max_questions": str(max_questions),
            },
            extra_db_fields={
                "topic_followups_generated": True,
                "max_questions": max_questions,
            },
        )
        await flush_backlog_to_queue(
            redis=redis,
            session_id=session_id,
            ttl_seconds=SESSION_TTL,
            max_queue_size=MAX_QUEUE_SIZE,
        )
        if session.get("status") != "in_progress":
            return
        upcoming_id, upcoming = await peek_next_question(redis, session_id)
        if upcoming_id and upcoming:
            _schedule_question_audio_prefetch(
                [upcoming.get("question", "")],
                _normalize_voice_gender(session.get("speech_voice_gender")),
            )
# Strong references to in-flight post-submit tasks. The asyncio event loop only
# keeps weak references to tasks, so a task nobody else references can be
# garbage-collected before it finishes; each task removes itself when done.
_POST_SUBMIT_BACKGROUND_TASKS: set[asyncio.Task] = set()


def _schedule_post_submit_processing(
    *,
    session_id: str,
    question_id: str,
    question_text: str,
    answer: str,
    answered_count: int,
    max_questions: int,
    interview_type: str,
) -> None:
    """Fire-and-forget the post-submit pipeline for one answered question.

    ``"resume"`` interviews schedule ``_post_submit_resume_processing`` with all
    arguments. ``"topic"`` interviews schedule ``_post_submit_topic_processing``
    with only ``session_id`` and ``answered_count``. Any other type schedules
    nothing.

    The task is kept alive in ``_POST_SUBMIT_BACKGROUND_TASKS`` until it
    completes, and ``_consume_post_submit_task_result`` is attached as a done
    callback.

    This never raises: scheduling must not block the request response.
    Failures (e.g. no running event loop) are logged instead of silently
    dropped, and the un-scheduled coroutine is closed so it does not leak as
    "never awaited".
    """
    coro = None
    task = None
    try:
        if interview_type == "resume":
            coro = _post_submit_resume_processing(
                session_id=session_id,
                question_id=question_id,
                question_text=question_text,
                answer=answer,
                answered_count=answered_count,
                max_questions=max_questions,
            )
        elif interview_type == "topic":
            coro = _post_submit_topic_processing(
                session_id=session_id,
                answered_count=answered_count,
            )
        else:
            return
        task = asyncio.create_task(coro)
        _POST_SUBMIT_BACKGROUND_TASKS.add(task)
        task.add_done_callback(_POST_SUBMIT_BACKGROUND_TASKS.discard)
        task.add_done_callback(_consume_post_submit_task_result)
    except Exception:
        # Best-effort: never fail the request, but leave a trace and don't leak
        # a coroutine object that was created but never handed to a task.
        if task is None and coro is not None:
            coro.close()
        import logging

        logging.getLogger(__name__).exception(
            "Failed to schedule post-submit processing for session %s", session_id
        )
async def _submit_store_answer(
    *,
    db,
    redis,
    session: dict,
    session_id: str,
    question_id: str,
    question_data: dict,
    answer: str,
) -> None:
    """Persist one submitted answer to Redis and upsert it into the ANSWERS collection.

    The per-answer hash and the ordered ``answers`` id list both get
    ``SESSION_TTL``. Question text, difficulty and category are copied from the
    question hash ``question_data``; missing fields default to ``""``,
    ``"medium"`` and ``"general"``.
    """
    question_text = question_data.get("question", "")
    difficulty = question_data.get("difficulty", "medium")
    category = question_data.get("category", "general")
    answer_key = f"session:{session_id}:a:{question_id}"
    answers_list_key = f"session:{session_id}:answers"
    await redis.hset(
        answer_key,
        mapping={
            "question_id": question_id,
            "answer": answer,
            "question": question_text,
            "difficulty": difficulty,
            "category": category,
            "submitted_at": utc_now(),
        },
    )
    await redis.rpush(answers_list_key, question_id)
    await redis.expire(answer_key, SESSION_TTL)
    await redis.expire(answers_list_key, SESSION_TTL)
    await db[ANSWERS].update_one(
        {
            "session_id": session_id,
            "question_id": question_id,
            "user_id": session.get("user_id"),
        },
        {
            "$set": {
                "question": question_text,
                "answer": answer,
                "difficulty": difficulty,
                "category": category,
                "stored_at": utc_now(),
            }
        },
        upsert=True,
    )


async def _submit_apply_max_questions_floor(
    *,
    db,
    redis,
    session_id: str,
    interview_type: str,
    max_questions: int,
) -> int:
    """Return ``max_questions`` raised to the minimum for resume/topic interviews.

    Resume sessions are floored at ``RESUME_MAX_QUESTIONS`` and topic sessions
    at ``TOPIC_TOTAL_QUESTIONS``. A raised value is written to Redis and the
    sessions collection. Other interview types are returned unchanged.
    """
    floors = {"resume": RESUME_MAX_QUESTIONS, "topic": TOPIC_TOTAL_QUESTIONS}
    floor = floors.get(interview_type)
    if floor is None or max_questions >= floor:
        return max_questions
    await redis.hset(f"session:{session_id}", mapping={"max_questions": str(floor)})
    await db[SESSIONS].update_one(
        {"session_id": session_id},
        {"$set": {"max_questions": floor}},
    )
    return floor


def _submit_evaluation_summary(evaluation: dict) -> dict:
    """Reduce an inline answer evaluation to the score/feedback returned to clients."""
    return {
        "score": _safe_int(evaluation.get("score", 0)),
        "feedback": evaluation.get("feedback", ""),
    }


async def _submit_complete_session(
    *,
    db,
    redis,
    session_id: str,
    answered_count: int,
    started_at: float,
    evaluation=None,
) -> dict:
    """Mark the session completed in Redis and Mongo, then build the final submit payload.

    ``evaluation`` (when truthy) is the inline evaluation of the last answer,
    which is included as ``answer_evaluation``.
    """
    await redis.hset(
        f"session:{session_id}",
        mapping={"status": "completed", "answered_count": str(answered_count)},
    )
    await db[SESSIONS].update_one(
        {"session_id": session_id},
        {"$set": {"status": "completed", "completed_at": utc_now()}},
    )
    submit_ms = await _record_submit_latency(started_at)
    payload = {
        "session_id": session_id,
        "next_question": None,
        "is_complete": True,
        "message": "Interview complete! Generating your report...",
        "submit_ms": submit_ms,
    }
    if evaluation:
        payload["answer_evaluation"] = _submit_evaluation_summary(evaluation)
    return payload


async def submit_answer(session_id: str, question_id: str, answer: str) -> dict:
    """Submit an answer and return the next queued question immediately.

    Steps:

    1. Persist the answer (Redis hash + ordered answer list, and the ANSWERS
       collection).
    2. Floor ``max_questions`` for resume/topic sessions.
    3. Complete the session if this answer reaches ``max_questions``.
    4. Otherwise pop the next queued question. If the queue is empty:

       * resume sessions evaluate the answer inline and enqueue a follow-up;
       * topic sessions run the topic top-up inline.

       If the queue is still empty, the session is completed.
    5. Advance the session counters, prefetch audio for the question after
       next, and schedule background post-submit processing.

    Returns:
        A payload containing ``next_question`` (or ``is_complete=True``),
        ``submit_ms`` and, where available, ``answer_evaluation`` and
        ``generation_stats``.

    Raises:
        ValueError: if the Redis session is missing/expired or not in progress.
    """
    started_at = perf_counter()
    db = get_db()
    redis = get_redis()
    session_key = f"session:{session_id}"
    session = await redis.hgetall(session_key)
    if not session:
        raise ValueError("Interview session not found or expired")
    if session.get("status") != "in_progress":
        raise ValueError("Interview is not in progress")

    # NOTE(review): question_id is not checked against the served question, and
    # repeated submissions of the same id each count as a new answer.
    current_q = await redis.hgetall(f"session:{session_id}:q:{question_id}")
    current_question_text = current_q.get("question", "")
    await _submit_store_answer(
        db=db,
        redis=redis,
        session=session,
        session_id=session_id,
        question_id=question_id,
        question_data=current_q,
        answer=answer,
    )

    question_count = _safe_int(session.get("question_count", 1))
    answered_count = _safe_int(session.get("answered_count", 0)) + 1
    served_count = _safe_int(session.get("served_count", 1))
    generated_count = _safe_int(session.get("generated_count", 0))
    # Remember the snapshot value so we only write generated_count back if this
    # call changed it (see the final hset below).
    initial_generated_count = generated_count
    interview_type = session.get("interview_type", "resume")
    speech_voice_gender = _normalize_voice_gender(session.get("speech_voice_gender"))
    max_questions = await _submit_apply_max_questions_floor(
        db=db,
        redis=redis,
        session_id=session_id,
        interview_type=interview_type,
        max_questions=_safe_int(session.get("max_questions", MAX_QUESTIONS)),
    )

    _update_local_summary(session_id, current_question_text, answer)
    await push_context_item(
        redis=redis,
        session_id=session_id,
        item={
            "question": current_question_text,
            "answer": answer,
        },
        ttl_seconds=SESSION_TTL,
        max_items=CONTEXT_CACHE_ITEMS,
    )
    if answered_count >= max_questions:
        return await _submit_complete_session(
            db=db,
            redis=redis,
            session_id=session_id,
            answered_count=answered_count,
            started_at=started_at,
        )

    await flush_backlog_to_queue(
        redis=redis,
        session_id=session_id,
        ttl_seconds=SESSION_TTL,
        max_queue_size=MAX_QUEUE_SIZE,
    )
    next_question_id, q_data = await pop_next_question(redis, session_id)
    effective_stats = _current_generation_stats(session)
    fallback_evaluation = None

    # Emergency fallback for rare queue-empty cases in resume interviews:
    # evaluate inline and enqueue one follow-up before popping again.
    if not next_question_id and interview_type == "resume":
        skill_pool = _resume_skill_pool(session)
        recent_answered_questions = await _get_answered_question_texts(
            redis=redis,
            session_id=session_id,
            limit=4,
        )
        recent_focus_topic, same_topic_streak = _recent_focus_streak(
            recent_answered_questions,
            skill_pool,
        )
        recent_context = await get_recent_context_items(
            redis=redis,
            session_id=session_id,
            max_items=CONTEXT_CACHE_ITEMS,
        )
        excluded_questions = await _get_session_question_texts(redis, session_id)
        fallback_evaluation = await evaluate_and_generate_followup(
            role_title=session.get("role_title", "Software Developer"),
            required_skills=_safe_json_list(session.get("jd_required_skills", "[]")),
            recent_context=recent_context,
            current_question=current_question_text,
            current_answer=answer,
            excluded_questions=excluded_questions,
            focus_topic=recent_focus_topic or "",
            same_topic_streak=same_topic_streak,
        )
        await redis.hset(
            f"session:{session_id}:a:{question_id}",
            mapping={
                "score": str(_safe_int(fallback_evaluation.get("score", 0))),
                "feedback": fallback_evaluation.get("feedback", ""),
            },
        )
        fallback_delta = {
            "gemini_calls": 1,
            "gemini_questions": 0,
            "bank_questions": 0,
            "bank_shortfall": 0,
            "generation_batches": 1,
        }
        follow_text, focus_skill_override = _apply_resume_followup_policy(
            skill_pool=skill_pool,
            recent_focus_topic=recent_focus_topic,
            same_topic_streak=same_topic_streak,
            suggested_question=(fallback_evaluation.get("followup_question") or "").strip(),
            suggested_topic=(fallback_evaluation.get("followup_topic") or "").strip(),
            followup_need_score=_safe_score_0_100(fallback_evaluation.get("followup_need_score", 0)),
            answered_count=answered_count,
        )
        # Defensive: the early return above already guarantees this holds.
        if answered_count < max_questions:
            qid, used_model_followup = await _enqueue_resume_followup_with_fallback(
                redis=redis,
                session_id=session_id,
                session=session,
                answered_count=answered_count,
                suggested_text=follow_text,
                suggested_difficulty=fallback_evaluation.get("difficulty", "medium"),
                suggested_category=fallback_evaluation.get("category", "follow-up"),
                focus_skill_override=focus_skill_override,
            )
            if qid:
                generated_count += 1
                if used_model_followup:
                    fallback_delta["gemini_questions"] = 1
        effective_stats = await _apply_generation_metric_delta(
            db=db,
            redis=redis,
            session_id=session_id,
            session=session,
            metrics_delta=fallback_delta,
            generated_count=generated_count,
        )
        await flush_backlog_to_queue(
            redis=redis,
            session_id=session_id,
            ttl_seconds=SESSION_TTL,
            max_queue_size=MAX_QUEUE_SIZE,
        )
        next_question_id, q_data = await pop_next_question(redis, session_id)

    if (
        not next_question_id
        and interview_type == "topic"
        and answered_count < max_questions
    ):
        # Topic follow-up generation runs in background, so synchronously top-up once
        # before concluding interview to avoid premature completion around Q4.
        await _post_submit_topic_processing(
            session_id=session_id,
            answered_count=answered_count,
        )
        # The top-up updates generation metrics in Redis; report the fresh ones
        # rather than those derived from the snapshot read at entry.
        refreshed_session = await redis.hgetall(session_key)
        if refreshed_session:
            effective_stats = _current_generation_stats(refreshed_session)
        await flush_backlog_to_queue(
            redis=redis,
            session_id=session_id,
            ttl_seconds=SESSION_TTL,
            max_queue_size=MAX_QUEUE_SIZE,
        )
        next_question_id, q_data = await pop_next_question(redis, session_id)

    if not next_question_id or not q_data:
        return await _submit_complete_session(
            db=db,
            redis=redis,
            session_id=session_id,
            answered_count=answered_count,
            started_at=started_at,
            evaluation=fallback_evaluation,
        )

    await mark_question_asked(
        redis=redis,
        session_id=session_id,
        question_text=q_data.get("question", ""),
        ttl_seconds=SESSION_TTL,
    )
    await flush_backlog_to_queue(
        redis=redis,
        session_id=session_id,
        ttl_seconds=SESSION_TTL,
        max_queue_size=MAX_QUEUE_SIZE,
    )
    peek_next_id, peek_q = await peek_next_question(redis, session_id)
    if peek_next_id and peek_q:
        _schedule_question_audio_prefetch([peek_q.get("question", "")], speech_voice_gender)

    next_difficulty = q_data.get("difficulty", session.get("current_difficulty", "medium"))
    new_served_count = served_count + 1
    progress_fields = {
        "question_count": str(question_count + 1),
        "answered_count": str(answered_count),
        "served_count": str(new_served_count),
        "current_difficulty": next_difficulty,
    }
    # Writing the entry snapshot back unconditionally would clobber updates made
    # meanwhile by the inline topic top-up or a background post-submit task.
    if generated_count != initial_generated_count:
        progress_fields["generated_count"] = str(generated_count)
    await redis.hset(session_key, mapping=progress_fields)

    response = {
        "session_id": session_id,
        "next_question": {
            "question_id": next_question_id,
            "question": q_data.get("question", "Can you elaborate further?"),
            "difficulty": q_data.get("difficulty", "medium"),
            "question_number": new_served_count,
            "total_questions": max_questions,
        },
        "is_complete": False,
        "message": f"Question {new_served_count} of {max_questions}",
        "generation_stats": effective_stats,
    }
    if fallback_evaluation:
        response["answer_evaluation"] = _submit_evaluation_summary(fallback_evaluation)
    elif interview_type == "resume":
        # Resume answers are evaluated by the background post-submit task.
        response["answer_evaluation"] = {
            "status": "processing",
        }
    _schedule_post_submit_processing(
        session_id=session_id,
        question_id=question_id,
        question_text=current_question_text,
        answer=answer,
        answered_count=answered_count,
        max_questions=max_questions,
        interview_type=interview_type,
    )
    response["submit_ms"] = await _record_submit_latency(started_at)
    return response
async def get_next_question(session_id: str, user_id: str) -> dict:
    """Peek at the next queued question of *user_id*'s session without consuming it.

    Ownership is checked against the sessions collection first, then the live
    Redis state is read. A session that is no longer ``in_progress`` reports
    ``is_complete=True``. Otherwise the backlog is flushed into the queue and
    its head, if any, is returned together with the current queue size.

    Raises:
        ValueError: if the session does not exist, belongs to another user, or
            its Redis state has expired.
    """
    db = get_db()
    redis = get_redis()

    owner_doc = await db[SESSIONS].find_one({"session_id": session_id})
    if not owner_doc:
        raise ValueError("Session not found")
    if owner_doc.get("user_id") != user_id:
        raise ValueError("Unauthorized access to session")

    live_state = await redis.hgetall(f"session:{session_id}")
    if not live_state:
        raise ValueError("Interview session not found or expired")
    if live_state.get("status") != "in_progress":
        return {
            "session_id": session_id,
            "next_question": None,
            "is_complete": True,
            "message": "Interview is not in progress",
        }

    await flush_backlog_to_queue(
        redis=redis,
        session_id=session_id,
        ttl_seconds=SESSION_TTL,
        max_queue_size=MAX_QUEUE_SIZE,
    )
    head_id, head = await peek_next_question(redis, session_id)
    if not (head_id and head):
        return {
            "session_id": session_id,
            "next_question": None,
            "is_complete": False,
            "message": "No queued question yet",
            "queue_size": await queue_size(redis, session_id),
        }

    preview = {
        "question_id": head_id,
        "question": head.get("question", ""),
        "difficulty": head.get("difficulty", "medium"),
        "category": head.get("category", "general"),
    }
    pending = await queue_size(redis, session_id)
    return {
        "session_id": session_id,
        "next_question": preview,
        "is_complete": False,
        "queue_size": pending,
        "message": "Next question ready",
    }
async def quit_interview(session_id: str, user_id: str) -> dict:
    """Mark an interview as quit and indicate whether a partial report can be generated.

    Sessions already in ``completed``, ``quit`` or ``quit_with_report`` are left
    untouched. Otherwise the live Redis hash (if it still exists) is marked
    ``quit`` with a refreshed ``SESSION_TTL``. The sessions collection records
    the quit time, the reason and the answered count taken from Redis (``0``
    when the Redis state is gone).

    Returns:
        ``session_id``, ``message`` and ``report_generated``. For a fresh quit,
        ``report_generated`` is ``True`` when at least one answer was recorded.

    Raises:
        ValueError: if the session does not exist or belongs to another user.
    """
    db = get_db()
    redis = get_redis()
    session = await db[SESSIONS].find_one({"session_id": session_id})
    if not session:
        raise ValueError("Session not found")
    if session.get("user_id") != user_id:
        raise ValueError("Unauthorized access to session")
    status = session.get("status")
    if status in {"completed", "quit", "quit_with_report"}:
        return {
            "session_id": session_id,
            "report_generated": status == "quit_with_report",
            "message": "Interview already finalized",
        }
    quit_at = utc_now()
    # Update Redis state if still present.
    redis_session_key = f"session:{session_id}"
    redis_session = await redis.hgetall(redis_session_key)
    # Use the same tolerant counter parsing as the rest of this module: a blank
    # or corrupted value must not abort the quit with a misleading ValueError
    # (the exception type this function uses for not-found/unauthorized).
    answered_count = (
        _safe_int(redis_session.get("answered_count", 0)) if redis_session else 0
    )
    if redis_session:
        await redis.hset(
            redis_session_key,
            mapping={
                "status": "quit",
                "quit_at": quit_at,
            },
        )
        await redis.expire(redis_session_key, SESSION_TTL)
    # Persist quit metadata for admin visibility.
    await db[SESSIONS].update_one(
        {"session_id": session_id},
        {
            "$set": {
                "status": "quit",
                "quit_at": quit_at,
                "quit_reason": "user_requested",
                "answered_count": answered_count,
            }
        },
    )
    has_answers = answered_count > 0
    return {
        "session_id": session_id,
        "report_generated": has_answers,
        "message": "Interview quit successfully" if has_answers else "Interview quit. No answers to evaluate yet.",
    }
async def get_session_qa(session_id: str) -> list:
    """Get all Q&A pairs from Redis for a session, in answer order.

    Primary source: the ``session:{id}:answers`` id list. The answer hash
    supplies the answer and takes precedence for question text, difficulty
    and category; the question hash fills any gaps. Entries with a missing
    answer hash, or an empty question or answer, are skipped.

    Each question id is reported at most once. The answers list receives an
    entry per submission, but the answer hash only keeps the latest answer, so
    a re-submitted question would otherwise appear twice with identical
    content.

    Fallback (only when the primary source yields nothing): walk
    ``session:{id}:questions`` and pair every question hash that has a
    matching answer hash.
    """
    redis = get_redis()
    prefix = f"session:{session_id}"
    qa_pairs: list = []

    seen_ids: set = set()
    for qid in await redis.lrange(f"{prefix}:answers", 0, -1) or []:
        if qid in seen_ids:
            continue
        seen_ids.add(qid)
        q = await redis.hgetall(f"{prefix}:q:{qid}")
        a = await redis.hgetall(f"{prefix}:a:{qid}")
        if not a:
            continue
        question_text = (a.get("question") or q.get("question") or "").strip()
        answer_text = (a.get("answer") or "").strip()
        if not question_text or not answer_text:
            continue
        qa_pairs.append({
            "question_id": qid,
            "question": question_text,
            "answer": answer_text,
            "difficulty": a.get("difficulty") or q.get("difficulty", "medium"),
            "category": a.get("category") or q.get("category", "general"),
        })
    if qa_pairs:
        return qa_pairs

    fallback_seen: set = set()
    for qid in await redis.lrange(f"{prefix}:questions", 0, -1) or []:
        if qid in fallback_seen:
            continue
        fallback_seen.add(qid)
        q = await redis.hgetall(f"{prefix}:q:{qid}")
        a = await redis.hgetall(f"{prefix}:a:{qid}")
        if q and a:
            qa_pairs.append({
                "question_id": qid,
                "question": q.get("question", ""),
                "answer": a.get("answer", ""),
                "difficulty": q.get("difficulty", "medium"),
                "category": q.get("category", "general"),
            })
    return qa_pairs
def cleanup_interview_local_state(session_id: str) -> None:
    """Forget the process-local bookkeeping kept for *session_id*.

    Drops its cached summary, its pre-generation in-flight marker and its
    post-submit lock. Keys that are absent are ignored, so calling this more
    than once, or for an unknown session, is harmless.
    """
    for per_session_map in (_LOCAL_SUMMARIES, _POST_SUBMIT_LOCKS):
        per_session_map.pop(session_id, None)
    _PREGEN_IN_FLIGHT.discard(session_id)