interviewbot / backend /services /group_test_service.py
sajith-0701's picture
v5
15145f6
from bson import ObjectId
from database import get_db
from models.collections import GROUP_TESTS, GROUP_TEST_RESULTS, TOPICS, USERS, RESULTS
from utils.helpers import utc_now, str_objectid, str_objectids
# ─── helpers ─────────────────────────────────────────────────────────────────
async def _enrich_group_test(doc: dict, db) -> dict:
"""Attach topic names to a group test document."""
enriched = str_objectid(doc)
topics = []
for tid in (doc.get("topic_ids") or []):
try:
t = await db[TOPICS].find_one({"_id": ObjectId(tid)})
except Exception:
t = None
if t:
topics.append({"id": str(t["_id"]), "name": t.get("name", "")})
enriched["topics"] = topics
return enriched
async def _refresh_topic_statuses(result: dict, db) -> dict:
"""Check RESULTS collection for each topic session and update statuses in-place."""
topic_results = result.get("topic_results") or []
updated = False
for tr in topic_results:
if tr.get("status") == "completed":
continue
session_id = tr.get("session_id")
if not session_id:
continue
report = await db[RESULTS].find_one({"session_id": session_id})
if report:
tr["status"] = "completed"
tr["overall_score"] = report.get("overall_score", 0)
tr["total_questions"] = report.get("total_questions", 0)
tr["completed_at"] = report.get("completed_at")
updated = True
if updated:
result_id = str(result.get("_id") or result.get("id", ""))
all_done = all(tr.get("status") == "completed" for tr in topic_results)
if all_done:
scores = [tr.get("overall_score") or 0 for tr in topic_results]
overall = round(sum(scores) / len(scores), 1) if scores else 0.0
await db[GROUP_TEST_RESULTS].update_one(
{"_id": ObjectId(result_id)},
{
"$set": {
"topic_results": topic_results,
"status": "completed",
"overall_score": overall,
"completed_at": utc_now(),
}
},
)
result["status"] = "completed"
result["overall_score"] = overall
else:
await db[GROUP_TEST_RESULTS].update_one(
{"_id": ObjectId(result_id)},
{"$set": {"topic_results": topic_results}},
)
result["topic_results"] = topic_results
return result
# ─── Admin CRUD ───────────────────────────────────────────────────────────────
async def create_group_test(
name: str,
description: str | None,
topic_ids: list[str],
time_limit_minutes: int | None,
max_attempts: int,
created_by: str,
) -> dict:
db = get_db()
if not topic_ids:
raise ValueError("At least one topic is required")
# Validate every topic exists
for tid in topic_ids:
try:
t = await db[TOPICS].find_one({"_id": ObjectId(tid)})
except Exception:
raise ValueError(f"Invalid topic ID: {tid}")
if not t:
raise ValueError(
f"Topic with ID '{tid}' does not exist. Create it in Topics before adding here."
)
doc = {
"name": name.strip(),
"description": (description or "").strip() or None,
"topic_ids": topic_ids,
"time_limit_minutes": time_limit_minutes,
"max_attempts": max(1, int(max_attempts)),
"is_published": False,
"created_by": created_by,
"created_at": utc_now(),
}
result = await db[GROUP_TESTS].insert_one(doc)
doc["_id"] = result.inserted_id
return await _enrich_group_test(doc, db)
async def list_group_tests(only_published: bool = False) -> list:
db = get_db()
query = {"is_published": True} if only_published else {}
cursor = db[GROUP_TESTS].find(query).sort("created_at", -1)
docs = await cursor.to_list(length=200)
return [await _enrich_group_test(d, db) for d in docs]
async def get_group_test(group_test_id: str) -> dict:
db = get_db()
doc = await db[GROUP_TESTS].find_one({"_id": ObjectId(group_test_id)})
if not doc:
raise ValueError("Group test not found")
return await _enrich_group_test(doc, db)
async def update_group_test(group_test_id: str, data: dict) -> dict:
db = get_db()
# Validate topic IDs if provided
if "topic_ids" in data and data["topic_ids"] is not None:
if not data["topic_ids"]:
raise ValueError("At least one topic is required")
for tid in data["topic_ids"]:
try:
t = await db[TOPICS].find_one({"_id": ObjectId(tid)})
except Exception:
raise ValueError(f"Invalid topic ID: {tid}")
if not t:
raise ValueError(
f"Topic with ID '{tid}' does not exist. Create it in Topics before adding here."
)
update_data = {k: v for k, v in data.items() if v is not None}
if "max_attempts" in update_data:
update_data["max_attempts"] = max(1, int(update_data["max_attempts"]))
update_data["updated_at"] = utc_now()
await db[GROUP_TESTS].update_one(
{"_id": ObjectId(group_test_id)}, {"$set": update_data}
)
return await get_group_test(group_test_id)
async def delete_group_test(group_test_id: str) -> bool:
db = get_db()
result = await db[GROUP_TESTS].delete_one({"_id": ObjectId(group_test_id)})
return result.deleted_count > 0
async def set_group_test_publish(group_test_id: str, is_published: bool) -> dict:
db = get_db()
await db[GROUP_TESTS].update_one(
{"_id": ObjectId(group_test_id)},
{"$set": {"is_published": is_published, "updated_at": utc_now()}},
)
return await get_group_test(group_test_id)
async def get_group_test_results_admin(group_test_id: str) -> list:
"""All student results for a given group test (admin view)."""
db = get_db()
cursor = db[GROUP_TEST_RESULTS].find({"group_test_id": group_test_id}).sort(
"started_at", -1
)
docs = await cursor.to_list(length=500)
return str_objectids(docs)
# ─── Student ──────────────────────────────────────────────────────────────────
async def start_group_test_attempt(group_test_id: str, user_id: str) -> dict:
db = get_db()
group_test = await db[GROUP_TESTS].find_one({"_id": ObjectId(group_test_id)})
if not group_test:
raise ValueError("Group test not found")
if not group_test.get("is_published"):
raise ValueError("This group test is not available")
max_attempts = int(group_test.get("max_attempts") or 1)
attempt_count = await db[GROUP_TEST_RESULTS].count_documents(
{"group_test_id": group_test_id, "user_id": user_id}
)
if attempt_count >= max_attempts:
raise ValueError(
f"You have reached the maximum number of attempts ({max_attempts}) for this group test."
)
user = await db[USERS].find_one({"_id": ObjectId(user_id)})
topic_results = []
for tid in (group_test.get("topic_ids") or []):
t = await db[TOPICS].find_one({"_id": ObjectId(tid)})
topic_results.append(
{
"topic_id": tid,
"topic_name": t.get("name", "") if t else "",
"session_id": None,
"status": "pending",
"overall_score": None,
"total_questions": None,
"completed_at": None,
}
)
doc = {
"group_test_id": group_test_id,
"group_test_name": group_test.get("name", ""),
"user_id": user_id,
"user_name": (user or {}).get("name", ""),
"user_email": (user or {}).get("email", ""),
"attempt_number": attempt_count + 1,
"topic_results": topic_results,
"overall_score": None,
"status": "in_progress",
"started_at": utc_now(),
"completed_at": None,
"time_limit_minutes": group_test.get("time_limit_minutes"),
}
result = await db[GROUP_TEST_RESULTS].insert_one(doc)
doc["_id"] = result.inserted_id
return str_objectid(doc)
async def get_group_test_result(result_id: str, user_id: str) -> dict:
db = get_db()
result = await db[GROUP_TEST_RESULTS].find_one({"_id": ObjectId(result_id)})
if not result:
raise ValueError("Group test result not found")
if result.get("user_id") != user_id:
raise ValueError("Unauthorized")
result = await _refresh_topic_statuses(result, db)
return str_objectid(result)
async def link_topic_session(
result_id: str, user_id: str, topic_id: str, session_id: str
) -> dict:
"""Store session_id for a topic so its completion can be tracked."""
db = get_db()
result = await db[GROUP_TEST_RESULTS].find_one({"_id": ObjectId(result_id)})
if not result:
raise ValueError("Group test result not found")
if result.get("user_id") != user_id:
raise ValueError("Unauthorized")
topic_results = result.get("topic_results") or []
found = False
for tr in topic_results:
if tr.get("topic_id") == topic_id:
tr["session_id"] = session_id
if tr.get("status") == "pending":
tr["status"] = "in_progress"
found = True
break
if not found:
raise ValueError("Topic not found in this group test")
await db[GROUP_TEST_RESULTS].update_one(
{"_id": ObjectId(result_id)},
{"$set": {"topic_results": topic_results}},
)
return await get_group_test_result(result_id, user_id)
async def get_my_group_test_results(user_id: str) -> list:
db = get_db()
cursor = db[GROUP_TEST_RESULTS].find({"user_id": user_id}).sort("started_at", -1)
docs = await cursor.to_list(length=100)
return str_objectids(docs)
async def get_my_group_test_attempt(group_test_id: str, user_id: str) -> dict | None:
"""Return latest attempt result for a group test, or None."""
db = get_db()
doc = await db[GROUP_TEST_RESULTS].find_one(
{"group_test_id": group_test_id, "user_id": user_id},
sort=[("attempt_number", -1)],
)
if not doc:
return None
doc = await _refresh_topic_statuses(doc, db)
return str_objectid(doc)