interviewbot / backend /services /evaluation_service.py
sajith-0701's picture
v4.1
e39cad1
from database import get_db, get_redis
from bson import ObjectId
from models.collections import RESULTS, ANSWERS, SESSIONS
from utils.helpers import utc_now
from utils.gemini import evaluate_interview
from services.interview_service import get_session_qa, cleanup_interview_local_state
def _json_safe(value):
if isinstance(value, ObjectId):
return str(value)
if isinstance(value, dict):
return {k: _json_safe(v) for k, v in value.items()}
if isinstance(value, list):
return [_json_safe(item) for item in value]
return value
def _safe_int(value, default: int = 0) -> int:
try:
return int(value)
except Exception:
return default
def _is_placeholder_report(report: dict) -> bool:
strengths = [str(item).strip().lower() for item in (report.get("strengths") or []) if str(item).strip()]
weaknesses = [str(item).strip().lower() for item in (report.get("weaknesses") or []) if str(item).strip()]
recommendations = [str(item).strip().lower() for item in (report.get("recommendations") or []) if str(item).strip()]
if any("unable to evaluate" in item for item in strengths + weaknesses):
return True
if any("please retry the interview" in item for item in recommendations):
return True
if not (report.get("detailed_scores") or []):
return True
return False
async def generate_report(session_id: str, user_id: str) -> dict:
"""Generate final evaluation report from Redis Q&A data using Gemini."""
db = get_db()
redis = get_redis()
# Check if report already exists
existing = await db[RESULTS].find_one({"session_id": session_id})
if existing and not _is_placeholder_report(existing):
existing["id"] = str(existing["_id"])
del existing["_id"]
return _json_safe(existing)
# Get session info
session = await db[SESSIONS].find_one({"session_id": session_id})
if not session:
raise ValueError("Session not found")
if session.get("user_id") != user_id:
raise ValueError("Unauthorized access to session")
role_title = session.get("role_title", "Software Developer")
session_status = session.get("status", "completed")
quit_at = session.get("quit_at")
redis_session = await redis.hgetall(f"session:{session_id}")
# Get all Q&A from Redis
qa_pairs = await get_session_qa(session_id)
if not qa_pairs:
archived_answers = await db[ANSWERS].find(
{"session_id": session_id, "user_id": user_id}
).sort("stored_at", 1).to_list(length=200)
for item in archived_answers:
question = (item.get("question") or "").strip()
answer = (item.get("answer") or "").strip()
if not question or not answer:
continue
qa_pairs.append(
{
"question_id": item.get("question_id") or "",
"question": question,
"answer": answer,
"difficulty": item.get("difficulty", "medium"),
"category": item.get("category", "general"),
}
)
if not qa_pairs:
raise ValueError("No Q&A data found for this session")
# Batch evaluate with Gemini
evaluation = await evaluate_interview(qa_pairs, role_title)
# Store results in MongoDB
result_doc = {
"session_id": session_id,
"user_id": user_id,
"role_title": role_title,
"session_status": session_status,
"is_quit": session_status in {"quit", "quit_with_report"},
"quit_at": quit_at,
"overall_score": evaluation.get("overall_score", 0),
"total_questions": len(qa_pairs),
"detailed_scores": evaluation.get("detailed_scores", []),
"strengths": evaluation.get("strengths", []),
"weaknesses": evaluation.get("weaknesses", []),
"recommendations": evaluation.get("recommendations", []),
"generation_stats": {
"gemini_calls": _safe_int((redis_session or {}).get("metrics_gemini_calls", 0)),
"gemini_questions": _safe_int((redis_session or {}).get("metrics_gemini_questions", 0)),
"bank_questions": _safe_int((redis_session or {}).get("metrics_bank_questions", 0)),
"bank_shortfall": _safe_int((redis_session or {}).get("metrics_bank_shortfall", 0)),
"generation_batches": _safe_int((redis_session or {}).get("metrics_generation_batches", 0)),
},
"completed_at": utc_now(),
}
if existing:
await db[RESULTS].update_one(
{"_id": existing["_id"]},
{"$set": result_doc},
)
result_doc_id = str(existing["_id"])
else:
inserted = await db[RESULTS].insert_one(result_doc)
result_doc_id = str(inserted.inserted_id)
# Store final answers in MongoDB
for qa in qa_pairs:
question_id = (qa.get("question_id") or "").strip()
upsert_filter = {
"session_id": session_id,
"user_id": user_id,
}
if question_id:
upsert_filter["question_id"] = question_id
else:
upsert_filter["question"] = qa.get("question", "")
await db[ANSWERS].update_one(
upsert_filter,
{
"$set": {
"question_id": question_id,
"question": qa.get("question", ""),
"answer": qa.get("answer", ""),
"difficulty": qa.get("difficulty", "medium"),
"category": qa.get("category", "general"),
"stored_at": utc_now(),
}
},
upsert=True,
)
# Clean up Redis session data
question_ids = await redis.lrange(f"session:{session_id}:questions", 0, -1)
keys_to_delete = [
f"session:{session_id}",
f"session:{session_id}:questions",
f"session:{session_id}:pending_questions",
f"session:{session_id}:question_queue",
f"session:{session_id}:question_backlog",
f"session:{session_id}:context_cache",
f"session:{session_id}:asked_questions_set",
f"session:{session_id}:answers",
]
for qid in question_ids:
keys_to_delete.append(f"session:{session_id}:q:{qid}")
keys_to_delete.append(f"session:{session_id}:a:{qid}")
if keys_to_delete:
await redis.delete(*keys_to_delete)
if session_status in {"quit", "quit_with_report"}:
await db[SESSIONS].update_one(
{"session_id": session_id},
{
"$set": {
"status": "quit_with_report",
"report_generated_at": utc_now(),
}
},
)
elif session_status == "completed":
await db[SESSIONS].update_one(
{"session_id": session_id},
{"$set": {"status": "completed_with_report", "report_generated_at": utc_now()}},
)
cleanup_interview_local_state(session_id)
result_doc["id"] = result_doc_id
return _json_safe(result_doc)