import json import asyncio import random import re from time import perf_counter from bson import ObjectId from database import get_db, get_redis from models.collections import SESSIONS, USERS, JOB_ROLES, SKILLS, QUESTIONS, TOPICS, TOPIC_QUESTIONS, RESUMES, JD_VERIFICATIONS, ANSWERS from utils.helpers import generate_id, utc_now, str_objectid from utils.skills import normalize_skill_list, build_interview_focus_skills from services.interview_graph import run_interview_graph from utils.gemini import generate_interview_question_batch, analyze_resume_vs_job_description from services.job_description_service import get_job_description_for_user from services.gemini_service import ( evaluate_and_generate_followup, generate_resume_seed_questions, generate_topic_followup_batch, ) from services.queue_service import ( enqueue_question, flush_backlog_to_queue, get_recent_context_items, mark_question_asked, normalize_question_text, peek_next_question, pop_next_question, push_context_item, queue_size, ) from services.tts_service import prefetch_wav from services.latency_service import record_latency MAX_QUESTIONS = 20 RESUME_MAX_QUESTIONS = 10 RESUME_INITIAL_BATCH_SIZE = 2 SESSION_TTL = 7200 # 2 hours BATCH_SIZE = 5 PREGEN_MIN_PENDING = 2 FOLLOWUP_AI_COUNT = 2 FOLLOWUP_BANK_COUNT = 3 MAX_QUEUE_SIZE = 3 CONTEXT_CACHE_ITEMS = 3 TOPIC_INITIAL_DB_QUESTIONS = 5 TOPIC_INITIAL_ASK_COUNT = 4 TOPIC_AI_FOLLOWUPS = 3 TOPIC_DB_FOLLOWUPS = 2 TOPIC_TOTAL_QUESTIONS = 10 MAX_SAME_TOPIC_FOLLOWUPS = 2 THIRD_FOLLOWUP_NEED_SCORE = 95 # Local process memory summary requested in workflow. _LOCAL_SUMMARIES: dict[str, str] = {} _PREGEN_IN_FLIGHT: set[str] = set() _POST_SUBMIT_LOCKS: dict[str, asyncio.Lock] = {} _QUESTION_STOPWORDS = { "a", "an", "and", "are", "as", "at", "be", "by", "for", "from", "how", "if", "in", "into", "is", "it", "of", "on", "or", "that", "the", "this", "to", "using", "what", "when", "with", "would", } _GENERIC_SOFT_SKILL_KEYS = { "problem solving", "analytical skills", "communication", "communication skills", "teamwork", "leadership", "adaptability", "time management", "critical thinking", } def _safe_json_list(value: str) -> list: try: data = json.loads(value or "[]") return data if isinstance(data, list) else [] except Exception: return [] def _question_fingerprint(text: str) -> str: base = (text or "").strip().lower() base = re.sub(r"[^a-z0-9\s]", " ", base) base = re.sub(r"\s+", " ", base).strip() return base def _question_token_set(text: str) -> set[str]: key = _question_fingerprint(text) tokens = [token for token in key.split() if token and token not in _QUESTION_STOPWORDS] return set(tokens) def _is_question_too_similar(candidate: str, recent_questions: list[str]) -> bool: candidate_key = _question_fingerprint(candidate) if not candidate_key: return True candidate_tokens = _question_token_set(candidate) candidate_opening = " ".join(candidate_key.split()[:6]) for text in (recent_questions or [])[-5:]: other_key = _question_fingerprint(text) if not other_key: continue if candidate_key == other_key: return True other_opening = " ".join(other_key.split()[:6]) if candidate_opening and candidate_opening == other_opening: return True other_tokens = _question_token_set(text) if not candidate_tokens or not other_tokens: continue intersection = len(candidate_tokens & other_tokens) union = len(candidate_tokens | other_tokens) if union <= 0: continue jaccard = intersection / union if jaccard >= 0.72: return True return False def _unique_question_items(items: list[dict], *, excluded_questions: list[str], limit: int) -> list[dict]: excluded = {_question_fingerprint(q) for q in excluded_questions if q} unique: list[dict] = [] for item in items or []: text = (item.get("question") or "").strip() if not text: continue key = _question_fingerprint(text) if not key or key in excluded: continue excluded.add(key) unique.append( { "question": text, "difficulty": item.get("difficulty", "medium"), "category": item.get("category", "general"), } ) if len(unique) >= limit: break return unique def _update_local_summary(session_id: str, question: str, answer: str) -> None: existing = _LOCAL_SUMMARIES.get(session_id, "") combined = f"{existing}\nQ: {question}\nA: {answer}".strip() # Keep summary bounded in memory. _LOCAL_SUMMARIES[session_id] = combined[-1500:] def _safe_int(value, default: int = 0) -> int: try: return int(value) except Exception: return default def _safe_score_0_100(value, default: int = 0) -> int: score = _safe_int(value, default) if score < 0: return 0 if score > 100: return 100 return score def _normalize_voice_gender(value: str | None) -> str: return "male" if (value or "").strip().lower() == "male" else "female" def _consume_prefetch_task_result(task: asyncio.Task) -> None: try: task.result() except Exception: # Prefetch is optional; ignore failures to avoid noisy task warnings. pass def _schedule_question_audio_prefetch(questions: list[str], voice_gender: str) -> None: for q in questions: text = (q or "").strip() if not text: continue try: task = asyncio.create_task(prefetch_wav(text, voice_gender)) task.add_done_callback(_consume_prefetch_task_result) except Exception: # Best-effort optimization only. pass def _get_post_submit_lock(session_id: str) -> asyncio.Lock: lock = _POST_SUBMIT_LOCKS.get(session_id) if lock is None: lock = asyncio.Lock() _POST_SUBMIT_LOCKS[session_id] = lock return lock def _consume_post_submit_task_result(task: asyncio.Task) -> None: try: task.result() except Exception: # Background processing is best-effort; ignore task-level failures. pass def _current_generation_stats(session: dict) -> dict: return { "gemini_calls": _safe_int(session.get("metrics_gemini_calls", 0)), "gemini_questions": _safe_int(session.get("metrics_gemini_questions", 0)), "bank_questions": _safe_int(session.get("metrics_bank_questions", 0)), "bank_shortfall": _safe_int(session.get("metrics_bank_shortfall", 0)), "generation_batches": _safe_int(session.get("metrics_generation_batches", 0)), } def _normalize_bank_difficulty(value: str) -> str: difficulty = (value or "medium").strip().lower() if difficulty not in {"easy", "medium", "hard"}: return "medium" if difficulty == "easy": return "medium" return difficulty def _resume_skill_pool(session: dict) -> list[str]: jd_skills = normalize_skill_list(_safe_json_list(session.get("jd_required_skills", "[]"))) focus_skills = normalize_skill_list(_safe_json_list(session.get("skills", "[]"))) ordered: list[str] = [] seen: set[str] = set() for skill in jd_skills + focus_skills: key = _question_fingerprint(skill) if not key or key in seen: continue seen.add(key) ordered.append(skill) concrete = [skill for skill in ordered if _question_fingerprint(skill) not in _GENERIC_SOFT_SKILL_KEYS] if len(concrete) >= 2: return concrete return ordered or ["core technical concepts"] def _infer_focus_skill_from_question(question_text: str, skill_pool: list[str]) -> str | None: normalized_question = _question_fingerprint(question_text) if not normalized_question: return None best_skill = None best_score = 0 for skill in skill_pool: normalized_skill = _question_fingerprint(skill) if not normalized_skill: continue tokens = [token for token in normalized_skill.split() if len(token) >= 3] if not tokens: tokens = normalized_skill.split() score = sum(1 for token in tokens if token and token in normalized_question) if normalized_skill in normalized_question: score = max(score, len(tokens) + 1) if score > best_score: best_score = score best_skill = skill return best_skill if best_score > 0 else None def _recent_focus_streak(question_texts: list[str], skill_pool: list[str]) -> tuple[str | None, int]: active_skill = None streak = 0 for text in reversed(question_texts): skill = _infer_focus_skill_from_question(text, skill_pool) if not skill: break if active_skill is None: active_skill = skill streak = 1 continue if _question_fingerprint(skill) == _question_fingerprint(active_skill): streak += 1 continue break return active_skill, streak def _pick_alternate_focus_skill(skill_pool: list[str], current_skill: str | None, seed: int) -> str | None: if not skill_pool: return None if current_skill: current_key = _question_fingerprint(current_skill) alternatives = [skill for skill in skill_pool if _question_fingerprint(skill) != current_key] if alternatives: return alternatives[max(0, seed) % len(alternatives)] return skill_pool[max(0, seed) % len(skill_pool)] def _apply_resume_followup_policy( *, skill_pool: list[str], recent_focus_topic: str | None, same_topic_streak: int, suggested_question: str, suggested_topic: str | None, followup_need_score: int, answered_count: int, ) -> tuple[str, str | None]: follow_text = (suggested_question or "").strip() topic = (suggested_topic or "").strip() if not topic and follow_text: inferred = _infer_focus_skill_from_question(follow_text, skill_pool) if inferred: topic = inferred topic_key = _question_fingerprint(topic) recent_key = _question_fingerprint(recent_focus_topic or "") if ( same_topic_streak >= MAX_SAME_TOPIC_FOLLOWUPS and topic_key and recent_key and topic_key == recent_key and _safe_score_0_100(followup_need_score) < THIRD_FOLLOWUP_NEED_SCORE ): return "", _pick_alternate_focus_skill(skill_pool, recent_focus_topic, answered_count) return follow_text, None def _avg_recent_answer_words(qa_pairs: list, window: int = 3) -> int: if not qa_pairs: return 0 recent = qa_pairs[-window:] lengths = [len((item.get("answer") or "").split()) for item in recent] if not lengths: return 0 return sum(lengths) // len(lengths) def _plan_followup_mix(target: int, qa_pairs: list, has_bank_source: bool) -> tuple[int, int]: """Decide AI-vs-bank split for the next batch. Baseline: 3 AI + 2 bank. Adaptation: - Short answers -> increase bank ratio for stability. - Rich answers -> increase AI follow-up ratio for personalization. """ if target <= 0: return 0, 0 if not has_bank_source: return target, 0 avg_words = _avg_recent_answer_words(qa_pairs) ai_target = min(FOLLOWUP_AI_COUNT, target) if avg_words < 18: ai_target = min(2, target) elif avg_words > 70: ai_target = min(4, target) # Keep at least one bank question when a bank source exists and batch size allows. if target > 1: ai_target = min(ai_target, target - 1) bank_target = target - ai_target return ai_target, bank_target async def _resolve_role_title(db, role_id: str | None, custom_role: str | None) -> str: if custom_role and custom_role.strip(): return custom_role.strip() if role_id: try: role = await db[JOB_ROLES].find_one({"_id": ObjectId(role_id)}) if role: return role["title"] except Exception: # If it's not a valid ObjectId, treat it as a direct generic title. return role_id return "Software Developer" async def _get_recent_user_questions(db, user_id: str, limit: int = 40) -> list[str]: recent: list[str] = [] seen: set[str] = set() cursor = db[ANSWERS].find({"user_id": user_id}, {"question": 1}).sort("stored_at", -1).limit(limit) async for doc in cursor: text = (doc.get("question") or "").strip() key = _question_fingerprint(text) if not text or not key or key in seen: continue seen.add(key) recent.append(text) return recent def _build_resume_intro_question(role_title: str, jd_title: str) -> str: role = (role_title or "this role").strip() title = (jd_title or "").strip() def _normalized_key(value: str) -> str: key = re.sub(r"[^a-z0-9\s]", " ", (value or "").lower()) key = re.sub(r"\s+", " ", key).strip() for prefix in ("the ", "an ", "a "): if key.startswith(prefix): key = key[len(prefix):].strip() break return key role_clean = re.sub(r"\s+", " ", role).strip() if role_clean.lower().startswith("the "): role_clean = role_clean[4:].strip() role_phrase = f"the {role_clean}" if role_clean.lower().endswith(" role") else f"the {role_clean} role" role_key = _normalized_key(role_clean) title_key = _normalized_key(title) is_generic_title = title_key in { "", "selected job description", role_key, f"{role_key} role", } if is_generic_title: return f"Introduce yourself and explain how your background aligns with {role_phrase}." title_phrase = title if title.lower().startswith(("the ", "an ", "a ")) else f"the {title}" return ( f"Introduce yourself and explain how your background aligns with {role_phrase} " f"in {title_phrase} job description." ) def _build_resume_resilient_followup_question( session: dict, question_number: int, variant: int = 0, focus_skill: str | None = None, ) -> str: role_title = (session.get("role_title") or "this role").strip() skill_pool = _resume_skill_pool(session) index = max(0, question_number - 1) + max(0, variant) skill = (focus_skill or "").strip() or skill_pool[index % len(skill_pool)] templates = [ "Describe a real project where you applied {skill} for {role}. What constraints and trade-offs shaped your design?", "If {skill} failed in production for a {role} workflow, how would you debug it step by step?", "Explain how you would test and validate a solution using {skill} before shipping it for {role}.", "Compare two approaches for {skill} in a {role} context and justify the final choice.", "Design an improvement plan to make your {skill} implementation more scalable and reliable for {role}.", "Your {role} service using {skill} has intermittent latency spikes. How would you investigate and stabilize it?", "During code review, what risks would you look for in a {skill} implementation for {role}, and why?", "How would you design rollback and observability for a feature centered on {skill} in {role}?", "Assume two engineers propose different {skill} strategies for {role}. How would you evaluate and choose between them?", "What failure modes around {skill} are easiest to miss in {role}, and how would you proactively test them?", ] template = templates[index % len(templates)] return template.format(skill=skill, role=role_title) def _build_topic_resilient_followup_question(session: dict, question_number: int, variant: int = 0) -> str: topic_name = (session.get("role_title") or "this topic").strip() index = max(0, question_number - 1) + max(0, variant) templates = [ "Explain {topic} with a practical example from a production-like scenario.", "What are the most common failure patterns in {topic}, and how would you detect them early?", "Design a step-by-step implementation plan for {topic} with measurable checkpoints.", "Compare two approaches in {topic}, including trade-offs in scalability, latency, and maintainability.", "If a {topic} solution regressed after deployment, how would you triage and recover safely?", ] template = templates[index % len(templates)] return template.format(topic=topic_name) async def _enqueue_resume_followup_with_fallback( *, redis, session_id: str, session: dict, answered_count: int, suggested_text: str, suggested_difficulty: str, suggested_category: str, focus_skill_override: str | None = None, ) -> tuple[str | None, bool]: candidates: list[tuple[str, str, str, bool]] = [] existing_questions = await _get_session_question_texts(redis, session_id) primary = (suggested_text or "").strip() if primary: candidates.append((primary, suggested_difficulty or "medium", suggested_category or "follow-up", True)) # Deterministic local fallback prevents early completion when model output is empty/duplicate. base_question_number = max(2, answered_count + 1) for variant in range(6): question_number = base_question_number + variant fallback_text = _build_resume_resilient_followup_question( session=session, question_number=question_number, variant=variant, focus_skill=focus_skill_override, ) candidates.append((fallback_text, "medium", "resume-fallback", False)) seen: set[str] = set() for text, difficulty, category, is_primary in candidates: normalized_text = normalize_question_text(text) if _is_question_too_similar(normalized_text, existing_questions): continue key = _question_fingerprint(normalized_text) if not key or key in seen: continue seen.add(key) qid = await enqueue_question( redis=redis, session_id=session_id, question=normalized_text, difficulty=difficulty, category=category, ttl_seconds=SESSION_TTL, max_queue_size=MAX_QUEUE_SIZE, ) if qid: existing_questions.append(normalized_text) return qid, is_primary return None, False async def _get_session_question_texts(redis, session_id: str) -> list[str]: question_ids = await redis.lrange(f"session:{session_id}:questions", 0, -1) output: list[str] = [] for qid in question_ids: q = await redis.hgetall(f"session:{session_id}:q:{qid}") text = (q.get("question") or "").strip() if text: output.append(text) return output async def _get_answered_question_texts(redis, session_id: str, limit: int = 4) -> list[str]: answer_ids = await redis.lrange(f"session:{session_id}:answers", -max(1, limit), -1) output: list[str] = [] for qid in answer_ids: answer_data = await redis.hgetall(f"session:{session_id}:a:{qid}") text = (answer_data.get("question") or "").strip() if not text: q = await redis.hgetall(f"session:{session_id}:q:{qid}") text = (q.get("question") or "").strip() if text: output.append(text) return output async def _sample_topic_questions( db, topic_id: str, excluded_questions: list[str], limit: int, ) -> list[dict]: if limit <= 0: return [] docs = await db[TOPIC_QUESTIONS].find({"topic_id": topic_id}).to_list(length=500) random.shuffle(docs) excluded = {_question_fingerprint(q) for q in excluded_questions if q} selected: list[dict] = [] for doc in docs: text = (doc.get("question") or "").strip() if not text: continue fp = _question_fingerprint(text) if not fp or fp in excluded: continue excluded.add(fp) selected.append( { "question": text, "difficulty": _normalize_bank_difficulty(doc.get("difficulty") or "medium"), "category": doc.get("category") or "topic", } ) if len(selected) >= limit: break return selected async def _seed_resume_questions_task(session_id: str) -> None: db = get_db() redis = get_redis() session = await redis.hgetall(f"session:{session_id}") if not session or session.get("status") != "in_progress" or session.get("interview_type") != "resume": return try: await flush_backlog_to_queue( redis=redis, session_id=session_id, ttl_seconds=SESSION_TTL, max_queue_size=MAX_QUEUE_SIZE, ) current_q_size = await queue_size(redis, session_id) needed = max(0, RESUME_INITIAL_BATCH_SIZE - current_q_size) if needed > 0: excluded_questions = await _get_session_question_texts(redis, session_id) seed_items = await generate_resume_seed_questions( role_title=session.get("role_title", "Software Developer"), resume_summary=session.get("resume_summary", "No summary available"), resume_skills=_safe_json_list(session.get("skills", "[]")), jd_title=session.get("job_description_title", ""), jd_description=session.get("job_description_text", ""), jd_required_skills=_safe_json_list(session.get("jd_required_skills", "[]")), excluded_questions=excluded_questions, count=needed, ) appended = 0 for item in seed_items: qid = await enqueue_question( redis=redis, session_id=session_id, question=item.get("question", ""), difficulty=item.get("difficulty", "medium"), category=item.get("category", "resume-seed"), ttl_seconds=SESSION_TTL, max_queue_size=MAX_QUEUE_SIZE, ) if qid: appended += 1 await redis.hset( f"session:{session_id}", mapping={ "generated_count": str(_safe_int(session.get("generated_count", 0)) + appended), "metrics_gemini_calls": str(_safe_int(session.get("metrics_gemini_calls", 0)) + 1), "metrics_gemini_questions": str(_safe_int(session.get("metrics_gemini_questions", 0)) + appended), "metrics_generation_batches": str(_safe_int(session.get("metrics_generation_batches", 0)) + 1), }, ) await db[SESSIONS].update_one( {"session_id": session_id}, { "$set": { "metrics_gemini_calls": _safe_int(session.get("metrics_gemini_calls", 0)) + 1, "metrics_gemini_questions": _safe_int(session.get("metrics_gemini_questions", 0)) + appended, "metrics_generation_batches": _safe_int(session.get("metrics_generation_batches", 0)) + 1, } }, ) await flush_backlog_to_queue( redis=redis, session_id=session_id, ttl_seconds=SESSION_TTL, max_queue_size=MAX_QUEUE_SIZE, ) next_qid, next_q = await peek_next_question(redis, session_id) if next_qid and next_q: _schedule_question_audio_prefetch( [next_q.get("question", "")], _normalize_voice_gender(session.get("speech_voice_gender")), ) except Exception: # Non-blocking pre-seed path should never fail interview startup. return def _normalize_role_key(role_title: str) -> str: normalized = re.sub(r"\s+", " ", (role_title or "").strip().lower()) return normalized or "software developer" def _build_verification_cache_key( role_key: str, jd_id: str, jd_updated_at: str, resume_uploaded_at: str, ) -> str: return "||".join([ role_key or "software developer", jd_id or "-", jd_updated_at or "-", resume_uploaded_at or "-", ]) def _verification_doc_to_response(doc: dict, *, message: str, cached: bool) -> dict: return { "verification_id": doc.get("verification_id"), "saved_at": doc.get("saved_at") or doc.get("created_at") or utc_now(), "role_title": doc.get("role_title"), "job_description": doc.get("job_description") or {}, "resume_snapshot": doc.get("resume_snapshot") or {}, "jd_alignment": doc.get("jd_alignment") or {}, "message": message, "cached": cached, } async def verify_resume_job_description( user_id: str, role_id: str = None, custom_role: str = None, job_description_id: str = None, ) -> dict: """Run resume-vs-job-description verification without starting an interview. Reuses a saved verification while the selected role, JD version, and resume upload timestamp are unchanged. """ if not job_description_id: raise ValueError("job_description_id is required for verification") db = get_db() resume_doc = await db[RESUMES].find_one({"user_id": user_id}) if not resume_doc: raise ValueError("Please upload your resume before running verification") skills_doc = await db[SKILLS].find_one({"user_id": user_id}) resume_skills = normalize_skill_list(skills_doc.get("skills", [])) if skills_doc else [] parsed_data = (resume_doc or {}).get("parsed_data", {}) or {} summary_parts = [ parsed_data.get("experience_summary") or "", " ".join(parsed_data.get("recommended_roles", []) or []), ] resume_summary = "\n".join([part for part in summary_parts if part]).strip() or "No summary available" role_title = await _resolve_role_title(db, role_id=role_id, custom_role=custom_role) role_key = _normalize_role_key(role_title) selected_jd = await get_job_description_for_user(user_id, job_description_id) resume_uploaded_at = resume_doc.get("uploaded_at") or "" jd_updated_at = selected_jd.get("updated_at") or "" cache_key = _build_verification_cache_key( role_key=role_key, jd_id=selected_jd.get("id") or job_description_id, jd_updated_at=jd_updated_at, resume_uploaded_at=resume_uploaded_at, ) existing_verification = await db[JD_VERIFICATIONS].find_one( {"user_id": user_id, "cache_key": cache_key}, sort=[("created_at", -1)], ) if not existing_verification: compatibility_query = { "user_id": user_id, "role_title": role_title, "job_description.id": selected_jd.get("id"), "resume_snapshot.uploaded_at": resume_uploaded_at, } if jd_updated_at: compatibility_query["job_description.updated_at"] = jd_updated_at existing_verification = await db[JD_VERIFICATIONS].find_one( compatibility_query, sort=[("created_at", -1)], ) if existing_verification: await db[JD_VERIFICATIONS].update_one( {"_id": existing_verification["_id"]}, { "$set": { "cache_key": cache_key, "role_key": role_key, "saved_at": existing_verification.get("saved_at") or existing_verification.get("created_at") or utc_now(), } }, ) if existing_verification: return _verification_doc_to_response( existing_verification, message="Loaded saved verification", cached=True, ) jd_alignment = await analyze_resume_vs_job_description( role_title=role_title, resume_skills=resume_skills if resume_skills else ["general"], resume_summary=resume_summary, jd_title=selected_jd.get("title", ""), jd_description=selected_jd.get("description", ""), jd_required_skills=selected_jd.get("required_skills", []), ) resume_snapshot = { "filename": resume_doc.get("original_filename") or resume_doc.get("filename") or "", "uploaded_at": resume_uploaded_at, "skills": resume_skills, "parsed_data": { "name": parsed_data.get("name"), "email": parsed_data.get("email"), "phone": parsed_data.get("phone"), "location": parsed_data.get("location"), "recommended_roles": parsed_data.get("recommended_roles", []) or [], "experience_summary": parsed_data.get("experience_summary", "") or "", }, } verification_id = generate_id() saved_at = utc_now() verification_doc = { "verification_id": verification_id, "user_id": user_id, "role_id": role_id, "custom_role": custom_role, "role_title": role_title, "role_key": role_key, "cache_key": cache_key, "job_description": { "id": selected_jd.get("id"), "title": selected_jd.get("title"), "company": selected_jd.get("company"), "description": selected_jd.get("description"), "required_skills": selected_jd.get("required_skills", []) or [], "updated_at": jd_updated_at, }, "resume_snapshot": resume_snapshot, "jd_alignment": jd_alignment, "saved_at": saved_at, "created_at": saved_at, } await db[JD_VERIFICATIONS].insert_one(verification_doc) return _verification_doc_to_response( verification_doc, message="Verification complete", cached=False, ) async def _get_generated_question_texts(redis, session_id: str) -> list[str]: qids = await redis.lrange(f"session:{session_id}:questions", 0, -1) questions = [] for qid in qids: q = await redis.hgetall(f"session:{session_id}:q:{qid}") if q and q.get("question"): questions.append(q["question"]) return questions async def _generate_question_batch( role_title: str, skills: list[str], previous_questions: list[str], generated_count: int, max_questions: int, current_difficulty: str, local_summary: str | None, batch_size: int, ) -> tuple[list[dict], str]: remaining = max(0, max_questions - generated_count) target = min(batch_size, remaining) if target <= 0: return [], current_difficulty # Initial resume seed: generate the full first batch in one Gemini call. if generated_count == 0 and target > 1 and not local_summary: seeded = await generate_interview_question_batch( skills=skills, role_title=role_title, count=target, start_question_number=1, previous_questions=previous_questions, foundation_limit=0, ) if seeded: last = seeded[-1].get("difficulty", current_difficulty) return seeded, last generated: list[dict] = [] rolling_questions = list(previous_questions) rolling_difficulty = current_difficulty rolling_count = generated_count for i in range(target): state = { "role_title": role_title, "skills": skills, "previous_questions": rolling_questions, # Feed the local summary once per batch as extra context. "previous_answer": local_summary if i == 0 else None, "question_count": rolling_count, "max_questions": max_questions, "current_difficulty": rolling_difficulty, } graph_result = await run_interview_graph(state) q_data = graph_result.get("question_data", {}) difficulty = q_data.get("difficulty", graph_result.get("current_difficulty", "medium")) generated.append( { "question": q_data.get("question", "Can you explain your approach?"), "difficulty": difficulty, "category": q_data.get("category", "general"), } ) rolling_questions.append(generated[-1]["question"]) rolling_count += 1 rolling_difficulty = difficulty return generated, rolling_difficulty async def _append_batch_to_redis(redis, session_id: str, batch: list[dict]) -> list[str]: created_ids: list[str] = [] for item in batch: normalized_question = normalize_question_text(item.get("question", "Can you explain your approach?")) if not normalized_question: continue qid = generate_id() created_ids.append(qid) await redis.hset( f"session:{session_id}:q:{qid}", mapping={ "question_id": qid, "question": normalized_question, "difficulty": item.get("difficulty", "medium"), "category": item.get("category", "general"), }, ) await redis.rpush(f"session:{session_id}:questions", qid) await redis.expire(f"session:{session_id}:q:{qid}", SESSION_TTL) if created_ids: await redis.expire(f"session:{session_id}:questions", SESSION_TTL) return created_ids async def _fetch_question_bank_batch( db, role_id: str | None, excluded_questions: list[str], limit: int, skill_hints: list[str] | None = None, ) -> list[dict]: if limit <= 0: return [] query = {"question": {"$exists": True, "$ne": ""}} if role_id: role_candidates = [role_id] try: oid = ObjectId(role_id) role_candidates.append(str(oid)) role_candidates.append(oid) except Exception: pass query["role_id"] = {"$in": role_candidates} normalized_hints = normalize_skill_list(skill_hints or []) if normalized_hints: scope_match = [] for skill in normalized_hints: token = re.escape(skill) scope_match.append({"category": {"$regex": token, "$options": "i"}}) scope_match.append({"question": {"$regex": token, "$options": "i"}}) if scope_match: query["$or"] = scope_match excluded = {q.strip().lower() for q in excluded_questions if q} selected: list[dict] = [] for sample_size in (max(limit * 12, 80), max(limit * 24, 160)): pipeline = [ {"$match": query}, {"$sample": {"size": sample_size}}, ] async for q in db[QUESTIONS].aggregate(pipeline): text = (q.get("question") or "").strip() if not text: continue if text.lower() in excluded: continue selected.append( { "question": text, "difficulty": _normalize_bank_difficulty(q.get("difficulty") or "medium"), "category": q.get("category") or "question-bank", } ) excluded.add(text.lower()) if len(selected) >= limit: break if len(selected) >= limit: break # If role-scoped pool is too small, widen to global random pool. if len(selected) < limit and role_id: fallback = await _fetch_question_bank_batch( db=db, role_id=None, excluded_questions=list(excluded), limit=limit - len(selected), skill_hints=normalized_hints, ) selected.extend(fallback) return selected def _strict_followup_difficulty(answered_count: int) -> str: # After first DB set (Q1-5), follow-ups should feel like real interview pressure. return "hard" if answered_count >= 10 else "medium" def _has_followup_opportunity(qa_pairs: list, window: int = BATCH_SIZE) -> bool: """Decide whether Gemini follow-up questions are needed for the latest batch.""" if not qa_pairs: return False weak_markers = { "i think", "maybe", "not sure", "dont know", "don't know", "etc", "kind of", "sort of", } for qa in qa_pairs[-window:]: answer = (qa.get("answer") or "").strip() if not answer: continue if len(answer.split()) < 30: return True lowered = answer.lower() if any(marker in lowered for marker in weak_markers): return True return False async def _generate_mixed_followup_batch( db, redis, session_id: str, session: dict, generated_count: int, max_questions: int, ) -> tuple[list[dict], str, dict]: remaining = max(0, max_questions - generated_count) target = min(BATCH_SIZE, remaining) if target <= 0: return [], session.get("current_difficulty", "medium"), { "gemini_calls": 0, "gemini_questions": 0, "bank_questions": 0, "bank_shortfall": 0, } previous_questions = await _get_generated_question_texts(redis, session_id) qa_pairs = await get_session_qa(session_id) answered_count = len(qa_pairs) role_title = session.get("role_title", "Software Developer") skills = _safe_json_list(session.get("skills", "[]")) jd_required_skills = _safe_json_list(session.get("jd_required_skills", "[]")) resume_source_mode = (session.get("resume_source_mode") or "db").strip().lower() current_difficulty = _strict_followup_difficulty(answered_count) from utils.gemini import generate_followup_question_batch_from_qa gemini_calls = 0 gemini_questions = 0 if resume_source_mode == "ai": ai_items = await generate_followup_question_batch_from_qa( role_title=role_title, skills=skills, qa_pairs=qa_pairs, previous_questions=previous_questions, count=target, difficulty=current_difficulty, ) gemini_calls = 1 if target > 0 else 0 deduped_ai = [] excluded_lower = {q.strip().lower() for q in previous_questions if q} for item in ai_items: text = (item.get("question") or "").strip() if not text: continue lowered = text.lower() if lowered in excluded_lower: continue deduped_ai.append(item) excluded_lower.add(lowered) if len(deduped_ai) >= target: break if len(deduped_ai) < target: refill, refill_last = await _generate_question_batch( role_title=role_title, skills=skills, previous_questions=previous_questions + [i.get("question", "") for i in deduped_ai], generated_count=generated_count + len(deduped_ai), max_questions=max_questions, current_difficulty=current_difficulty, local_summary=_LOCAL_SUMMARIES.get(session_id), batch_size=target - len(deduped_ai), ) for item in refill: text = (item.get("question") or "").strip() if not text: continue lowered = text.lower() if lowered in excluded_lower: continue deduped_ai.append(item) excluded_lower.add(lowered) if len(deduped_ai) >= target: break if refill: current_difficulty = refill_last final_ai = deduped_ai[:target] last_difficulty = final_ai[-1].get("difficulty", current_difficulty) if final_ai else current_difficulty return final_ai, last_difficulty, { "gemini_calls": gemini_calls, "gemini_questions": len(final_ai), "bank_questions": 0, "bank_shortfall": 0, } # Batch policy: # - If follow-up opportunity exists: 2 AI + 3 DB # - Otherwise: 5 DB ai_target = min(FOLLOWUP_AI_COUNT, target) if _has_followup_opportunity(qa_pairs) else 0 excluded_lower = {q.strip().lower() for q in previous_questions if q} ai_items: list[dict] = [] if ai_target > 0: generated_ai = await generate_followup_question_batch_from_qa( role_title=role_title, skills=skills, qa_pairs=qa_pairs, previous_questions=previous_questions, count=ai_target, difficulty=current_difficulty, ) gemini_calls += 1 for item in generated_ai: text = (item.get("question") or "").strip() if not text: continue lowered = text.lower() if lowered in excluded_lower: continue ai_items.append(item) excluded_lower.add(lowered) if len(ai_items) >= ai_target: break gemini_questions += len(ai_items) bank_target = max(0, target - len(ai_items)) exclude_pool = list(excluded_lower) bank_items = await _fetch_question_bank_batch( db=db, role_id=session.get("role_id"), excluded_questions=exclude_pool, limit=bank_target, skill_hints=jd_required_skills, ) for item in bank_items: text = (item.get("question") or "").strip() if text: excluded_lower.add(text.lower()) if len(bank_items) < bank_target: # Keep total batch size stable if the bank pool is exhausted. refill = bank_target - len(bank_items) refill_ai = [] added_refill_ai = 0 if refill > 0: refill_ai = await generate_followup_question_batch_from_qa( role_title=role_title, skills=skills, qa_pairs=qa_pairs, previous_questions=list(excluded_lower), count=refill, difficulty=current_difficulty, ) gemini_calls += 1 for item in refill_ai: text = (item.get("question") or "").strip() if not text: continue lowered = text.lower() if lowered in excluded_lower: continue ai_items.append(item) added_refill_ai += 1 excluded_lower.add(lowered) if len(ai_items) + len(bank_items) >= target: break gemini_questions += added_refill_ai mixed = (ai_items + bank_items)[:target] if len(mixed) > 1: random.shuffle(mixed) last_difficulty = mixed[-1].get("difficulty", current_difficulty) if mixed else current_difficulty return mixed, last_difficulty, { "gemini_calls": gemini_calls, "gemini_questions": gemini_questions, "bank_questions": len(bank_items), "bank_shortfall": max(0, bank_target - len(bank_items)), } async def _start_topic_interview(user_id: str, topic_id: str) -> dict: """Start topic interview with low-cost DB-first flow and staged AI follow-ups.""" db = get_db() redis = get_redis() topic = await db[TOPICS].find_one({"_id": __import__("bson").ObjectId(topic_id)}) if not topic: raise ValueError("Topic not found") if not topic.get("is_published", False): raise ValueError("This topic interview is not published yet") initial_items = await _sample_topic_questions( db=db, topic_id=topic_id, excluded_questions=[], limit=TOPIC_INITIAL_DB_QUESTIONS, ) if len(initial_items) < TOPIC_INITIAL_ASK_COUNT: raise ValueError("Not enough topic questions to start interview") first_question = initial_items[0] queued_initial = initial_items[1:TOPIC_INITIAL_ASK_COUNT] timer_enabled = bool(topic.get("timer_enabled", False)) timer_seconds = topic.get("timer_seconds") if timer_enabled else None session_id = generate_id() _LOCAL_SUMMARIES[session_id] = "" user_doc = None try: user_doc = await db[USERS].find_one({"_id": ObjectId(user_id)}, {"speech_settings": 1}) except Exception: user_doc = await db[USERS].find_one({"user_id": user_id}, {"speech_settings": 1}) speech_voice_gender = _normalize_voice_gender(((user_doc or {}).get("speech_settings") or {}).get("voice_gender")) first_id = generate_id() await redis.hset( f"session:{session_id}:q:{first_id}", mapping={ "question_id": first_id, "question": normalize_question_text(first_question.get("question", "Can you explain this topic?")), "difficulty": first_question.get("difficulty", "medium"), "category": first_question.get("category", topic.get("name", "topic")), }, ) await redis.expire(f"session:{session_id}:q:{first_id}", SESSION_TTL) await redis.rpush(f"session:{session_id}:questions", first_id) await redis.expire(f"session:{session_id}:questions", SESSION_TTL) await mark_question_asked( redis=redis, session_id=session_id, question_text=first_question.get("question", ""), ttl_seconds=SESSION_TTL, ) queued_count = 0 for item in queued_initial: qid = await enqueue_question( redis=redis, session_id=session_id, question=item.get("question", ""), difficulty=item.get("difficulty", "medium"), category=item.get("category", topic.get("name", "topic")), ttl_seconds=SESSION_TTL, max_queue_size=MAX_QUEUE_SIZE, ) if qid: queued_count += 1 await flush_backlog_to_queue( redis=redis, session_id=session_id, ttl_seconds=SESSION_TTL, max_queue_size=MAX_QUEUE_SIZE, ) session_doc = { "session_id": session_id, "user_id": user_id, "role_id": None, "role_title": topic.get("name", "Topic Interview"), "topic_id": topic_id, "interview_type": "topic", "status": "in_progress", "question_count": 1, "max_questions": TOPIC_TOTAL_QUESTIONS, "current_difficulty": first_question.get("difficulty", "medium"), "metrics_gemini_calls": 0, "metrics_gemini_questions": 0, "metrics_bank_questions": queued_count + 1, "metrics_bank_shortfall": max(0, TOPIC_INITIAL_ASK_COUNT - (queued_count + 1)), "metrics_generation_batches": 1, "speech_voice_gender": speech_voice_gender, "timer_enabled": timer_enabled, "timer_seconds": timer_seconds, "topic_followups_generated": False, "started_at": utc_now(), } await db[SESSIONS].insert_one(session_doc) session_state = { "user_id": user_id, "role_title": topic.get("name", "Topic Interview"), "topic_id": topic_id, "interview_type": "topic", "skills": json.dumps([topic.get("name", "general")]), "user_skills": json.dumps([]), "required_skills": json.dumps([]), "matched_skills": json.dumps([]), "missing_skills": json.dumps([]), "question_count": 1, "answered_count": 0, "served_count": 1, "generated_count": queued_count + 1, "max_questions": TOPIC_TOTAL_QUESTIONS, "current_difficulty": first_question.get("difficulty", "medium"), "timer_enabled": str(timer_enabled), "timer_seconds": str(timer_seconds or ""), "status": "in_progress", "speech_voice_gender": speech_voice_gender, "metrics_gemini_calls": 0, "metrics_gemini_questions": 0, "metrics_bank_questions": queued_count + 1, "metrics_bank_shortfall": max(0, TOPIC_INITIAL_ASK_COUNT - (queued_count + 1)), "metrics_generation_batches": 1, "topic_followups_generated": "0", } await redis.hset(f"session:{session_id}", mapping=session_state) await redis.expire(f"session:{session_id}", SESSION_TTL) next_qid, next_q = await peek_next_question(redis, session_id) prefetch_targets = [next_q.get("question", "")] if next_qid and next_q else [] _schedule_question_audio_prefetch(prefetch_targets, speech_voice_gender) return { "session_id": session_id, "interview_type": "topic", "topic": { "topic_id": topic_id, "name": topic.get("name", "Topic Interview"), "description": topic.get("description", ""), }, "skill_alignment": { "user_skills": [], "required_skills": [topic.get("name", "")], "matched_skills": [], "missing_skills": [], "interview_focus": [topic.get("name", "")], }, "question": { "question_id": first_id, "question": normalize_question_text(first_question.get("question", "Can you explain this topic?")), "difficulty": first_question.get("difficulty", "medium"), "question_number": 1, "total_questions": TOPIC_TOTAL_QUESTIONS, }, "timer": { "enabled": timer_enabled, "seconds": timer_seconds, }, "message": "Topic interview started. Good luck!", } async def _async_pregenerate_next_batch(session_id: str) -> None: db = get_db() redis = get_redis() try: session = await redis.hgetall(f"session:{session_id}") if not session or session.get("status") != "in_progress": return if session.get("interview_type", "resume") != "resume": return pending_len = await redis.llen(f"session:{session_id}:pending_questions") generated_count = int(session.get("generated_count", 0)) max_questions = int(session.get("max_questions", MAX_QUESTIONS)) if pending_len >= PREGEN_MIN_PENDING or generated_count >= max_questions: return batch, last_difficulty, batch_metrics = await _generate_mixed_followup_batch( db=db, redis=redis, session_id=session_id, session=session, generated_count=generated_count, max_questions=max_questions, ) if not batch: return new_ids = await _append_batch_to_redis(redis, session_id, batch) if new_ids: await redis.rpush(f"session:{session_id}:pending_questions", *new_ids) await redis.expire(f"session:{session_id}:pending_questions", SESSION_TTL) prefetch_targets = [] for qid in new_ids[:2]: q = await redis.hgetall(f"session:{session_id}:q:{qid}") prefetch_targets.append(q.get("question", "")) _schedule_question_audio_prefetch( prefetch_targets, _normalize_voice_gender(session.get("speech_voice_gender")), ) await redis.hset( f"session:{session_id}", mapping={ "generated_count": str(generated_count + len(new_ids)), "current_difficulty": last_difficulty, "metrics_gemini_calls": str(_safe_int(session.get("metrics_gemini_calls", 0)) + batch_metrics.get("gemini_calls", 0)), "metrics_gemini_questions": str(_safe_int(session.get("metrics_gemini_questions", 0)) + batch_metrics.get("gemini_questions", 0)), "metrics_bank_questions": str(_safe_int(session.get("metrics_bank_questions", 0)) + batch_metrics.get("bank_questions", 0)), "metrics_bank_shortfall": str(_safe_int(session.get("metrics_bank_shortfall", 0)) + batch_metrics.get("bank_shortfall", 0)), "metrics_generation_batches": str(_safe_int(session.get("metrics_generation_batches", 0)) + 1), }, ) await db[SESSIONS].update_one( {"session_id": session_id}, { "$set": { "metrics_gemini_calls": _safe_int(session.get("metrics_gemini_calls", 0)) + batch_metrics.get("gemini_calls", 0), "metrics_gemini_questions": _safe_int(session.get("metrics_gemini_questions", 0)) + batch_metrics.get("gemini_questions", 0), "metrics_bank_questions": _safe_int(session.get("metrics_bank_questions", 0)) + batch_metrics.get("bank_questions", 0), "metrics_bank_shortfall": _safe_int(session.get("metrics_bank_shortfall", 0)) + batch_metrics.get("bank_shortfall", 0), "metrics_generation_batches": _safe_int(session.get("metrics_generation_batches", 0)) + 1, } }, ) finally: _PREGEN_IN_FLIGHT.discard(session_id) def _schedule_pregen(session_id: str, answered_count: int) -> None: # Start pre-generation as soon as Q1 is answered, while user is on Q2. if answered_count < 1: return if session_id in _PREGEN_IN_FLIGHT: return _PREGEN_IN_FLIGHT.add(session_id) asyncio.create_task(_async_pregenerate_next_batch(session_id)) async def start_interview( user_id: str, role_id: str = None, custom_role: str = None, interview_type: str = "resume", topic_id: str = None, job_description_id: str = None, ) -> dict: """Start a new interview session with low-cost queue-first orchestration.""" interview_type = (interview_type or "resume").strip().lower() if interview_type == "topic": if not topic_id: raise ValueError("topic_id is required for topic interviews") return await _start_topic_interview(user_id=user_id, topic_id=topic_id) db = get_db() redis = get_redis() user_doc = None try: user_doc = await db[USERS].find_one({"_id": ObjectId(user_id)}, {"speech_settings": 1}) except Exception: user_doc = await db[USERS].find_one({"user_id": user_id}, {"speech_settings": 1}) speech_voice_gender = _normalize_voice_gender(((user_doc or {}).get("speech_settings") or {}).get("voice_gender")) skills_doc = await db[SKILLS].find_one({"user_id": user_id}) user_skills = normalize_skill_list(skills_doc.get("skills", [])) if skills_doc else [] resume_doc = await db[RESUMES].find_one({"user_id": user_id}) if not resume_doc: raise ValueError("Please upload your resume before starting a resume interview") parsed_resume = (resume_doc or {}).get("parsed_data", {}) or {} resume_summary_parts = [ parsed_resume.get("experience_summary") or "", " ".join(parsed_resume.get("recommended_roles", []) or []), ] resume_summary = "\n".join([part for part in resume_summary_parts if part]).strip() or "No summary available" if not job_description_id: raise ValueError("Please select a Job Description before starting Resume Interview") role_title = await _resolve_role_title(db, role_id=role_id, custom_role=custom_role) selected_jd = await get_job_description_for_user(user_id, job_description_id) jd_required_skills = normalize_skill_list((selected_jd or {}).get("required_skills", [])) if not jd_required_skills: raise ValueError("Selected Job Description has no required skills. Add required skills first.") user_skill_set = {s.lower() for s in user_skills} matched_role_skills = [s for s in jd_required_skills if s.lower() in user_skill_set] missing_role_skills = [s for s in jd_required_skills if s.lower() not in user_skill_set] base_skills_for_interview = matched_role_skills + [s for s in missing_role_skills if s not in matched_role_skills] skills_for_interview = build_interview_focus_skills(base_skills_for_interview) or list(jd_required_skills) intro_question = _build_resume_intro_question(role_title=role_title, jd_title=selected_jd.get("title", "")) intro_question = normalize_question_text(intro_question) session_id = generate_id() _LOCAL_SUMMARIES[session_id] = "" first_id = generate_id() await redis.hset( f"session:{session_id}:q:{first_id}", mapping={ "question_id": first_id, "question": intro_question, "difficulty": "easy", "category": "intro", }, ) await redis.expire(f"session:{session_id}:q:{first_id}", SESSION_TTL) await redis.rpush(f"session:{session_id}:questions", first_id) await redis.expire(f"session:{session_id}:questions", SESSION_TTL) await mark_question_asked( redis=redis, session_id=session_id, question_text=intro_question, ttl_seconds=SESSION_TTL, ) session_doc = { "session_id": session_id, "user_id": user_id, "role_id": role_id, "role_title": role_title, "job_description_id": selected_jd.get("id"), "job_description_title": selected_jd.get("title"), "status": "in_progress", "interview_type": "resume", "question_count": 1, "max_questions": RESUME_MAX_QUESTIONS, "current_difficulty": "easy", "metrics_gemini_calls": 0, "metrics_gemini_questions": 0, "metrics_bank_questions": 1, "metrics_bank_shortfall": 0, "metrics_generation_batches": 0, "speech_voice_gender": speech_voice_gender, "started_at": utc_now(), "interview_generation_mode": "queue_followup", } await db[SESSIONS].insert_one(session_doc) session_state = { "user_id": user_id, "role_id": role_id or "", "role_title": role_title, "skills": json.dumps(skills_for_interview), "user_skills": json.dumps(user_skills), "required_skills": json.dumps(jd_required_skills), "matched_skills": json.dumps(matched_role_skills), "missing_skills": json.dumps(missing_role_skills), "question_count": 1, "answered_count": 0, "served_count": 1, "generated_count": 1, "max_questions": RESUME_MAX_QUESTIONS, "current_difficulty": "easy", "interview_type": "resume", "status": "in_progress", "speech_voice_gender": speech_voice_gender, "jd_required_skills": json.dumps(jd_required_skills), "job_description_title": selected_jd.get("title", ""), "job_description_text": selected_jd.get("description", ""), "resume_summary": resume_summary, "metrics_gemini_calls": 0, "metrics_gemini_questions": 0, "metrics_bank_questions": 1, "metrics_bank_shortfall": 0, "metrics_generation_batches": 0, "interview_generation_mode": "queue_followup", } await redis.hset(f"session:{session_id}", mapping=session_state) await redis.expire(f"session:{session_id}", SESSION_TTL) # Preload initial queue in background (2 questions) without blocking first question delivery. asyncio.create_task(_seed_resume_questions_task(session_id)) return { "session_id": session_id, "skill_alignment": { "user_skills": user_skills, "required_skills": jd_required_skills, "matched_skills": matched_role_skills, "missing_skills": missing_role_skills, "interview_focus": skills_for_interview, }, "question": { "question_id": first_id, "question": intro_question, "difficulty": "easy", "question_number": 1, "total_questions": RESUME_MAX_QUESTIONS, }, "timer": { "enabled": False, "seconds": None, }, "message": "Interview started. Good luck!", "job_description": selected_jd, "jd_alignment": None, } async def _record_submit_latency(started_at: float) -> float: elapsed_ms = (perf_counter() - started_at) * 1000.0 await record_latency("submit_ms", elapsed_ms) return round(elapsed_ms, 2) async def _apply_generation_metric_delta( *, db, redis, session_id: str, session: dict, metrics_delta: dict, generated_count: int | None = None, extra_redis_fields: dict | None = None, extra_db_fields: dict | None = None, ) -> dict: base_stats = _current_generation_stats(session) effective_stats = { "gemini_calls": base_stats["gemini_calls"] + _safe_int(metrics_delta.get("gemini_calls", 0)), "gemini_questions": base_stats["gemini_questions"] + _safe_int(metrics_delta.get("gemini_questions", 0)), "bank_questions": base_stats["bank_questions"] + _safe_int(metrics_delta.get("bank_questions", 0)), "bank_shortfall": base_stats["bank_shortfall"] + _safe_int(metrics_delta.get("bank_shortfall", 0)), "generation_batches": base_stats["generation_batches"] + _safe_int(metrics_delta.get("generation_batches", 0)), } redis_mapping = { "metrics_gemini_calls": str(effective_stats["gemini_calls"]), "metrics_gemini_questions": str(effective_stats["gemini_questions"]), "metrics_bank_questions": str(effective_stats["bank_questions"]), "metrics_bank_shortfall": str(effective_stats["bank_shortfall"]), "metrics_generation_batches": str(effective_stats["generation_batches"]), } if generated_count is not None: redis_mapping["generated_count"] = str(generated_count) if extra_redis_fields: redis_mapping.update(extra_redis_fields) await redis.hset(f"session:{session_id}", mapping=redis_mapping) db_set = { "metrics_gemini_calls": effective_stats["gemini_calls"], "metrics_gemini_questions": effective_stats["gemini_questions"], "metrics_bank_questions": effective_stats["bank_questions"], "metrics_bank_shortfall": effective_stats["bank_shortfall"], "metrics_generation_batches": effective_stats["generation_batches"], } if generated_count is not None: db_set["generated_count"] = generated_count if extra_db_fields: db_set.update(extra_db_fields) await db[SESSIONS].update_one({"session_id": session_id}, {"$set": db_set}) return effective_stats async def _post_submit_resume_processing( session_id: str, question_id: str, question_text: str, answer: str, answered_count: int, max_questions: int, ) -> None: db = get_db() redis = get_redis() async with _get_post_submit_lock(session_id): session = await redis.hgetall(f"session:{session_id}") if not session: return skill_pool = _resume_skill_pool(session) recent_answered_questions = await _get_answered_question_texts( redis=redis, session_id=session_id, limit=4, ) recent_focus_topic, same_topic_streak = _recent_focus_streak( recent_answered_questions, skill_pool, ) recent_context = await get_recent_context_items( redis=redis, session_id=session_id, max_items=CONTEXT_CACHE_ITEMS, ) excluded_questions = await _get_session_question_texts(redis, session_id) evaluation = await evaluate_and_generate_followup( role_title=session.get("role_title", "Software Developer"), required_skills=_safe_json_list(session.get("jd_required_skills", "[]")), recent_context=recent_context, current_question=question_text, current_answer=answer, excluded_questions=excluded_questions, focus_topic=recent_focus_topic or "", same_topic_streak=same_topic_streak, ) await redis.hset( f"session:{session_id}:a:{question_id}", mapping={ "score": str(_safe_int(evaluation.get("score", 0))), "feedback": evaluation.get("feedback", ""), }, ) metrics_delta = { "gemini_calls": 1, "gemini_questions": 0, "bank_questions": 0, "bank_shortfall": 0, "generation_batches": 1, } generated_count = _safe_int(session.get("generated_count", 0)) follow_text, focus_skill_override = _apply_resume_followup_policy( skill_pool=skill_pool, recent_focus_topic=recent_focus_topic, same_topic_streak=same_topic_streak, suggested_question=(evaluation.get("followup_question") or "").strip(), suggested_topic=(evaluation.get("followup_topic") or "").strip(), followup_need_score=_safe_score_0_100(evaluation.get("followup_need_score", 0)), answered_count=answered_count, ) if answered_count < max_questions and session.get("status") == "in_progress": qid, used_model_followup = await _enqueue_resume_followup_with_fallback( redis=redis, session_id=session_id, session=session, answered_count=answered_count, suggested_text=follow_text, suggested_difficulty=evaluation.get("difficulty", "medium"), suggested_category=evaluation.get("category", "follow-up"), focus_skill_override=focus_skill_override, ) if qid: generated_count += 1 if used_model_followup: metrics_delta["gemini_questions"] += 1 await _apply_generation_metric_delta( db=db, redis=redis, session_id=session_id, session=session, metrics_delta=metrics_delta, generated_count=generated_count, ) await flush_backlog_to_queue( redis=redis, session_id=session_id, ttl_seconds=SESSION_TTL, max_queue_size=MAX_QUEUE_SIZE, ) if session.get("status") == "in_progress": qid, q = await peek_next_question(redis, session_id) if qid and q: _schedule_question_audio_prefetch( [q.get("question", "")], _normalize_voice_gender(session.get("speech_voice_gender")), ) async def _post_submit_topic_processing( session_id: str, answered_count: int, ) -> None: db = get_db() redis = get_redis() if answered_count < TOPIC_INITIAL_ASK_COUNT: return async with _get_post_submit_lock(session_id): session = await redis.hgetall(f"session:{session_id}") if not session: return max_questions = max( TOPIC_TOTAL_QUESTIONS, _safe_int(session.get("max_questions", TOPIC_TOTAL_QUESTIONS)), ) generated_count = _safe_int(session.get("generated_count", 0)) remaining_needed = max(0, max_questions - generated_count) if remaining_needed <= 0: await redis.hset(f"session:{session_id}", mapping={"topic_followups_generated": "1"}) await db[SESSIONS].update_one( {"session_id": session_id}, {"$set": {"topic_followups_generated": True, "max_questions": max_questions}}, ) return if session.get("topic_followups_generated", "0") == "1": return qa_pairs = await get_session_qa(session_id) excluded_questions = await _get_session_question_texts(redis, session_id) ai_target = min(TOPIC_AI_FOLLOWUPS, remaining_needed) ai_items = await generate_topic_followup_batch( topic_name=session.get("role_title", "Topic Interview"), qa_pairs=qa_pairs, excluded_questions=excluded_questions, count=ai_target, ) db_target = max(0, remaining_needed - len(ai_items)) db_items = await _sample_topic_questions( db=db, topic_id=session.get("topic_id", ""), excluded_questions=excluded_questions + [i.get("question", "") for i in ai_items], limit=db_target, ) topic_added = 0 ai_added = 0 db_added = 0 for item in ai_items: qid = await enqueue_question( redis=redis, session_id=session_id, question=item.get("question", ""), difficulty=item.get("difficulty", "medium"), category=item.get("category", session.get("role_title", "topic")), ttl_seconds=SESSION_TTL, max_queue_size=MAX_QUEUE_SIZE, ) if qid: topic_added += 1 ai_added += 1 for item in db_items: qid = await enqueue_question( redis=redis, session_id=session_id, question=item.get("question", ""), difficulty=item.get("difficulty", "medium"), category=item.get("category", session.get("role_title", "topic")), ttl_seconds=SESSION_TTL, max_queue_size=MAX_QUEUE_SIZE, ) if qid: topic_added += 1 db_added += 1 fallback_added = 0 fallback_variants = max(10, remaining_needed * 4) for variant in range(fallback_variants): if topic_added >= remaining_needed: break next_question_number = generated_count + topic_added + 1 fallback_text = _build_topic_resilient_followup_question( session=session, question_number=next_question_number, variant=variant, ) qid = await enqueue_question( redis=redis, session_id=session_id, question=fallback_text, difficulty="medium", category="topic-fallback", ttl_seconds=SESSION_TTL, max_queue_size=MAX_QUEUE_SIZE, ) if qid: topic_added += 1 fallback_added += 1 generated_count += topic_added await _apply_generation_metric_delta( db=db, redis=redis, session_id=session_id, session=session, metrics_delta={ "gemini_calls": 1 if ai_target > 0 else 0, "gemini_questions": ai_added, "bank_questions": db_added + fallback_added, "bank_shortfall": max(0, remaining_needed - topic_added), "generation_batches": 1, }, generated_count=generated_count, extra_redis_fields={ "topic_followups_generated": "1", "max_questions": str(max_questions), }, extra_db_fields={ "topic_followups_generated": True, "max_questions": max_questions, }, ) await flush_backlog_to_queue( redis=redis, session_id=session_id, ttl_seconds=SESSION_TTL, max_queue_size=MAX_QUEUE_SIZE, ) if session.get("status") == "in_progress": qid, q = await peek_next_question(redis, session_id) if qid and q: _schedule_question_audio_prefetch( [q.get("question", "")], _normalize_voice_gender(session.get("speech_voice_gender")), ) def _schedule_post_submit_processing( *, session_id: str, question_id: str, question_text: str, answer: str, answered_count: int, max_questions: int, interview_type: str, ) -> None: try: if interview_type == "resume": task = asyncio.create_task( _post_submit_resume_processing( session_id=session_id, question_id=question_id, question_text=question_text, answer=answer, answered_count=answered_count, max_questions=max_questions, ) ) task.add_done_callback(_consume_post_submit_task_result) return if interview_type == "topic": task = asyncio.create_task( _post_submit_topic_processing( session_id=session_id, answered_count=answered_count, ) ) task.add_done_callback(_consume_post_submit_task_result) except Exception: # Never block request response on scheduler errors. return async def submit_answer(session_id: str, question_id: str, answer: str) -> dict: """Submit answer and return next queued question immediately.""" started_at = perf_counter() db = get_db() redis = get_redis() session = await redis.hgetall(f"session:{session_id}") if not session: raise ValueError("Interview session not found or expired") if session.get("status") != "in_progress": raise ValueError("Interview is not in progress") current_q = await redis.hgetall(f"session:{session_id}:q:{question_id}") current_question_text = current_q.get("question", "") await redis.hset( f"session:{session_id}:a:{question_id}", mapping={ "question_id": question_id, "answer": answer, "question": current_question_text, "difficulty": current_q.get("difficulty", "medium"), "category": current_q.get("category", "general"), "submitted_at": utc_now(), }, ) await redis.rpush(f"session:{session_id}:answers", question_id) await redis.expire(f"session:{session_id}:a:{question_id}", SESSION_TTL) await redis.expire(f"session:{session_id}:answers", SESSION_TTL) await db[ANSWERS].update_one( { "session_id": session_id, "question_id": question_id, "user_id": session.get("user_id"), }, { "$set": { "question": current_question_text, "answer": answer, "difficulty": current_q.get("difficulty", "medium"), "category": current_q.get("category", "general"), "stored_at": utc_now(), } }, upsert=True, ) question_count = _safe_int(session.get("question_count", 1)) answered_count = _safe_int(session.get("answered_count", 0)) + 1 served_count = _safe_int(session.get("served_count", 1)) generated_count = _safe_int(session.get("generated_count", 0)) max_questions = _safe_int(session.get("max_questions", MAX_QUESTIONS)) interview_type = session.get("interview_type", "resume") speech_voice_gender = _normalize_voice_gender(session.get("speech_voice_gender")) if interview_type == "resume" and max_questions < RESUME_MAX_QUESTIONS: max_questions = RESUME_MAX_QUESTIONS await redis.hset(f"session:{session_id}", mapping={"max_questions": str(max_questions)}) await db[SESSIONS].update_one( {"session_id": session_id}, {"$set": {"max_questions": max_questions}}, ) if interview_type == "topic" and max_questions < TOPIC_TOTAL_QUESTIONS: max_questions = TOPIC_TOTAL_QUESTIONS await redis.hset(f"session:{session_id}", mapping={"max_questions": str(max_questions)}) await db[SESSIONS].update_one( {"session_id": session_id}, {"$set": {"max_questions": max_questions}}, ) _update_local_summary(session_id, current_question_text, answer) await push_context_item( redis=redis, session_id=session_id, item={ "question": current_question_text, "answer": answer, }, ttl_seconds=SESSION_TTL, max_items=CONTEXT_CACHE_ITEMS, ) if answered_count >= max_questions: await redis.hset( f"session:{session_id}", mapping={ "status": "completed", "answered_count": str(answered_count), }, ) await db[SESSIONS].update_one( {"session_id": session_id}, {"$set": {"status": "completed", "completed_at": utc_now()}}, ) submit_ms = await _record_submit_latency(started_at) return { "session_id": session_id, "next_question": None, "is_complete": True, "message": "Interview complete! Generating your report...", "submit_ms": submit_ms, } await flush_backlog_to_queue( redis=redis, session_id=session_id, ttl_seconds=SESSION_TTL, max_queue_size=MAX_QUEUE_SIZE, ) next_question_id, q_data = await pop_next_question(redis, session_id) effective_stats = _current_generation_stats(session) fallback_evaluation = None # Emergency fallback for rare queue-empty cases. if not next_question_id and interview_type == "resume": skill_pool = _resume_skill_pool(session) recent_answered_questions = await _get_answered_question_texts( redis=redis, session_id=session_id, limit=4, ) recent_focus_topic, same_topic_streak = _recent_focus_streak( recent_answered_questions, skill_pool, ) recent_context = await get_recent_context_items( redis=redis, session_id=session_id, max_items=CONTEXT_CACHE_ITEMS, ) excluded_questions = await _get_session_question_texts(redis, session_id) fallback_evaluation = await evaluate_and_generate_followup( role_title=session.get("role_title", "Software Developer"), required_skills=_safe_json_list(session.get("jd_required_skills", "[]")), recent_context=recent_context, current_question=current_question_text, current_answer=answer, excluded_questions=excluded_questions, focus_topic=recent_focus_topic or "", same_topic_streak=same_topic_streak, ) await redis.hset( f"session:{session_id}:a:{question_id}", mapping={ "score": str(_safe_int(fallback_evaluation.get("score", 0))), "feedback": fallback_evaluation.get("feedback", ""), }, ) fallback_delta = { "gemini_calls": 1, "gemini_questions": 0, "bank_questions": 0, "bank_shortfall": 0, "generation_batches": 1, } follow_text, focus_skill_override = _apply_resume_followup_policy( skill_pool=skill_pool, recent_focus_topic=recent_focus_topic, same_topic_streak=same_topic_streak, suggested_question=(fallback_evaluation.get("followup_question") or "").strip(), suggested_topic=(fallback_evaluation.get("followup_topic") or "").strip(), followup_need_score=_safe_score_0_100(fallback_evaluation.get("followup_need_score", 0)), answered_count=answered_count, ) if answered_count < max_questions: qid, used_model_followup = await _enqueue_resume_followup_with_fallback( redis=redis, session_id=session_id, session=session, answered_count=answered_count, suggested_text=follow_text, suggested_difficulty=fallback_evaluation.get("difficulty", "medium"), suggested_category=fallback_evaluation.get("category", "follow-up"), focus_skill_override=focus_skill_override, ) if qid: generated_count += 1 if used_model_followup: fallback_delta["gemini_questions"] = 1 effective_stats = await _apply_generation_metric_delta( db=db, redis=redis, session_id=session_id, session=session, metrics_delta=fallback_delta, generated_count=generated_count, ) await flush_backlog_to_queue( redis=redis, session_id=session_id, ttl_seconds=SESSION_TTL, max_queue_size=MAX_QUEUE_SIZE, ) next_question_id, q_data = await pop_next_question(redis, session_id) if ( not next_question_id and interview_type == "topic" and answered_count < max_questions ): # Topic follow-up generation runs in background, so synchronously top-up once # before concluding interview to avoid premature completion around Q4. await _post_submit_topic_processing( session_id=session_id, answered_count=answered_count, ) await flush_backlog_to_queue( redis=redis, session_id=session_id, ttl_seconds=SESSION_TTL, max_queue_size=MAX_QUEUE_SIZE, ) next_question_id, q_data = await pop_next_question(redis, session_id) if not next_question_id or not q_data: await redis.hset( f"session:{session_id}", mapping={"status": "completed", "answered_count": str(answered_count)}, ) await db[SESSIONS].update_one( {"session_id": session_id}, {"$set": {"status": "completed", "completed_at": utc_now()}}, ) submit_ms = await _record_submit_latency(started_at) payload = { "session_id": session_id, "next_question": None, "is_complete": True, "message": "Interview complete! Generating your report...", "submit_ms": submit_ms, } if fallback_evaluation: payload["answer_evaluation"] = { "score": _safe_int(fallback_evaluation.get("score", 0)), "feedback": fallback_evaluation.get("feedback", ""), } return payload await mark_question_asked( redis=redis, session_id=session_id, question_text=q_data.get("question", ""), ttl_seconds=SESSION_TTL, ) await flush_backlog_to_queue( redis=redis, session_id=session_id, ttl_seconds=SESSION_TTL, max_queue_size=MAX_QUEUE_SIZE, ) peek_next_id, peek_q = await peek_next_question(redis, session_id) if peek_next_id and peek_q: _schedule_question_audio_prefetch([peek_q.get("question", "")], speech_voice_gender) next_difficulty = q_data.get("difficulty", session.get("current_difficulty", "medium")) new_question_count = question_count + 1 new_served_count = served_count + 1 await redis.hset( f"session:{session_id}", mapping={ "question_count": str(new_question_count), "answered_count": str(answered_count), "served_count": str(new_served_count), "generated_count": str(generated_count), "current_difficulty": next_difficulty, }, ) response = { "session_id": session_id, "next_question": { "question_id": next_question_id, "question": q_data.get("question", "Can you elaborate further?"), "difficulty": q_data.get("difficulty", "medium"), "question_number": new_served_count, "total_questions": max_questions, }, "is_complete": False, "message": f"Question {new_served_count} of {max_questions}", "generation_stats": effective_stats, } if fallback_evaluation: response["answer_evaluation"] = { "score": _safe_int(fallback_evaluation.get("score", 0)), "feedback": fallback_evaluation.get("feedback", ""), } elif interview_type == "resume": response["answer_evaluation"] = { "status": "processing", } _schedule_post_submit_processing( session_id=session_id, question_id=question_id, question_text=current_question_text, answer=answer, answered_count=answered_count, max_questions=max_questions, interview_type=interview_type, ) submit_ms = await _record_submit_latency(started_at) response["submit_ms"] = submit_ms return response async def get_next_question(session_id: str, user_id: str) -> dict: """Preview next queued question without submitting a new answer.""" db = get_db() redis = get_redis() session_doc = await db[SESSIONS].find_one({"session_id": session_id}) if not session_doc: raise ValueError("Session not found") if session_doc.get("user_id") != user_id: raise ValueError("Unauthorized access to session") session = await redis.hgetall(f"session:{session_id}") if not session: raise ValueError("Interview session not found or expired") if session.get("status") != "in_progress": return { "session_id": session_id, "next_question": None, "is_complete": True, "message": "Interview is not in progress", } await flush_backlog_to_queue( redis=redis, session_id=session_id, ttl_seconds=SESSION_TTL, max_queue_size=MAX_QUEUE_SIZE, ) qid, q = await peek_next_question(redis, session_id) if not qid or not q: return { "session_id": session_id, "next_question": None, "is_complete": False, "message": "No queued question yet", "queue_size": await queue_size(redis, session_id), } return { "session_id": session_id, "next_question": { "question_id": qid, "question": q.get("question", ""), "difficulty": q.get("difficulty", "medium"), "category": q.get("category", "general"), }, "is_complete": False, "queue_size": await queue_size(redis, session_id), "message": "Next question ready", } async def quit_interview(session_id: str, user_id: str) -> dict: """Mark an interview as quit and indicate whether a partial report can be generated.""" db = get_db() redis = get_redis() session = await db[SESSIONS].find_one({"session_id": session_id}) if not session: raise ValueError("Session not found") if session.get("user_id") != user_id: raise ValueError("Unauthorized access to session") if session.get("status") in {"completed", "quit", "quit_with_report"}: return { "session_id": session_id, "report_generated": session.get("status") == "quit_with_report", "message": "Interview already finalized", } quit_at = utc_now() # Update Redis state if still present. redis_session_key = f"session:{session_id}" redis_session = await redis.hgetall(redis_session_key) answered_count = int(redis_session.get("answered_count", 0)) if redis_session else 0 if redis_session: await redis.hset( redis_session_key, mapping={ "status": "quit", "quit_at": quit_at, }, ) await redis.expire(redis_session_key, SESSION_TTL) # Persist quit metadata for admin visibility. await db[SESSIONS].update_one( {"session_id": session_id}, { "$set": { "status": "quit", "quit_at": quit_at, "quit_reason": "user_requested", "answered_count": answered_count, } }, ) has_answers = answered_count > 0 return { "session_id": session_id, "report_generated": has_answers, "message": "Interview quit successfully" if has_answers else "Interview quit. No answers to evaluate yet.", } async def get_session_qa(session_id: str) -> list: """Get all Q&A pairs from Redis for a session.""" redis = get_redis() answer_ids = await redis.lrange(f"session:{session_id}:answers", 0, -1) qa_pairs = [] if answer_ids: for qid in answer_ids: q = await redis.hgetall(f"session:{session_id}:q:{qid}") a = await redis.hgetall(f"session:{session_id}:a:{qid}") if not a: continue question_text = (a.get("question") or q.get("question") or "").strip() answer_text = (a.get("answer") or "").strip() if not question_text or not answer_text: continue qa_pairs.append({ "question_id": qid, "question": question_text, "answer": answer_text, "difficulty": a.get("difficulty") or q.get("difficulty", "medium"), "category": a.get("category") or q.get("category", "general"), }) if qa_pairs: return qa_pairs question_ids = await redis.lrange(f"session:{session_id}:questions", 0, -1) for qid in question_ids: q = await redis.hgetall(f"session:{session_id}:q:{qid}") a = await redis.hgetall(f"session:{session_id}:a:{qid}") if q and a: qa_pairs.append({ "question_id": qid, "question": q.get("question", ""), "answer": a.get("answer", ""), "difficulty": q.get("difficulty", "medium"), "category": q.get("category", "general"), }) return qa_pairs def cleanup_interview_local_state(session_id: str) -> None: """Cleanup process-local state for a completed session.""" _LOCAL_SUMMARIES.pop(session_id, None) _PREGEN_IN_FLIGHT.discard(session_id) _POST_SUBMIT_LOCKS.pop(session_id, None)