# ========================================= # 1. IMPORTS # ========================================= import asyncio import os import json import uuid import cloudinary import cloudinary.uploader import firebase_admin from firebase_admin import credentials, firestore from fastapi import FastAPI, HTTPException, BackgroundTasks from pydantic import BaseModel from gradio_client import Client from google.cloud.firestore_v1.base_query import FieldFilter import edge_tts from typing import Optional, List from dotenv import load_dotenv from contextlib import asynccontextmanager # ========================================= # 2. INITIALIZATIONS & CONFIG # ========================================= load_dotenv() if not firebase_admin._apps: fb_json = os.getenv("FIREBASE_JSON") if fb_json: cred_dict = json.loads(fb_json) cred = credentials.Certificate(cred_dict) else: cred = credentials.Certificate("serviceAccountKey.json") firebase_admin.initialize_app(cred) db = firestore.client() cloudinary.config( cloud_name=os.getenv("CLOUD_NAME"), api_key=os.getenv("API_KEY"), api_secret=os.getenv("API_SECRET"), secure=True ) HF_SPACE = "Fayza38/Question_and_answer_model" client = None # ========================================= # 3. 
# MODELS & CONSTANTS
# =========================================

# Track ids sent by clients for technical sessions; each pooled question
# stores its track id under "track_id".
TECH_CATEGORIES = {
    0: "Security", 1: "BackEnd", 2: "Networking", 3: "FrontEnd",
    4: "DataEngineering", 5: "WebDevelopment", 6: "FullStack",
    7: "VersionControl", 8: "SystemDesign", 9: "MachineLearning",
    10: "LanguagesAndFrameworks", 11: "DatabaseSystems",
    12: "ArtificialIntelligence", 13: "SoftwareTesting",
    14: "DistributedSystems", 15: "DevOps", 16: "LowLevelSystems",
    17: "DatabaseAndSql", 18: "GeneralProgramming", 19: "DataStructures",
    20: "Algorithms"
}

DIFFICULTY_MAP = {0: "Easy", 1: "Intermediate", 2: "Hard"}


class GenerateSessionRequest(BaseModel):
    """Payload for POST /generate-session."""
    sessionId: str
    sessionType: int  # 0: Behavioral, 1: Technical
    difficultyLevel: int = 0  # index into DIFFICULTY_MAP (technical sessions only)
    trackName: Optional[int] = None  # index into TECH_CATEGORIES (technical only)


class CleanupRequest(BaseModel):
    """Payload for POST /cleanup-audio: Cloudinary audio URLs to delete."""
    audioUrls: List[str]


# =========================================
# 4. LIFESPAN MANAGEMENT
# =========================================
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Connect to the Hugging Face Space once at startup.

    The gradio Client constructor performs blocking network I/O, so it runs
    in the default executor to keep the event loop responsive. On failure the
    app still starts: `client` stays None and dependent code waits or raises.
    """
    global client
    print("Connecting to Hugging Face Model...")
    try:
        # get_running_loop() is the non-deprecated form inside a coroutine
        # (get_event_loop() emits a DeprecationWarning on Python 3.10+),
        # and matches the idiom already used in safe_generate().
        loop = asyncio.get_running_loop()
        client = await loop.run_in_executor(None, lambda: Client(HF_SPACE))
        print("Model Connected Successfully!")
    except Exception as e:
        print(f"Model Connection Failed: {e}")
    yield
    print("Shutting down Intervision Service...")


app = FastAPI(title="Intervision AI Question Service", lifespan=lifespan)

# =========================================
# 5.
# CORE LOGIC HELPERS
# =========================================
async def generate_audio(text, filename):
    """Synthesize `text` to an mp3 with edge-tts and upload it to Cloudinary.

    Returns the secure URL of the uploaded clip, or None on any failure.
    The local temp file is always removed (single cleanup path in `finally`
    instead of the previous duplicated removal in both branches).
    """
    try:
        communicate = edge_tts.Communicate(text, "en-US-GuyNeural", rate="-15%")
        await communicate.save(filename)
        upload_result = cloudinary.uploader.upload(
            filename,
            resource_type="video",  # Cloudinary stores audio under "video"
            folder="interview_audio"
        )
        return upload_result["secure_url"]
    except Exception as e:
        print(f"Audio Generation Error: {e}")
        return None
    finally:
        if os.path.exists(filename):
            os.remove(filename)


async def safe_generate(prompt, retries=5):
    """Call the HF model, retrying up to `retries` times with a 5s backoff.

    The blocking `client.predict` runs in the default executor. Raises the
    last exception if every attempt fails, or immediately if the client was
    never connected.
    """
    if client is None:
        raise Exception("AI Client is not initialized.")
    for attempt in range(retries):
        try:
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(
                None,
                lambda: client.predict(prompt=prompt, api_name="/generate_questions")
            )
        except Exception:
            if attempt == retries - 1:
                raise
            await asyncio.sleep(5)


def parse_question_output(raw_output: str):
    """Extract a (question, answer) pair from raw model output.

    Expects "Q: ... A: ..." text, possibly wrapped in chat-template noise
    (an "assistant" prefix and an "<|im_end|>" suffix). Returns (None, None)
    whenever the output cannot be parsed.
    """
    if not raw_output:
        return None, None
    text = raw_output.split("assistant")[-1].strip() if "assistant" in raw_output else raw_output
    if "Q:" in text and "A:" in text:
        # Narrowed from a bare `except:` so genuine bugs are not swallowed.
        try:
            parts = text.split("A:")
            q = parts[0].replace("Q:", "").strip()
            a = parts[1].split("<|im_end|>")[0].strip()
            return q, a
        except (IndexError, AttributeError):
            return None, None
    return None, None


async def refill_specific_pool(track_id: int, difficulty: int, count: int, session_type: int):
    """Generate `count` questions (with TTS audio) and store them in Firestore.

    Blocks until the model client is available, then keeps retrying until
    `count` questions have been added; individual failures are logged and
    retried after a short delay.
    """
    while client is None:
        await asyncio.sleep(5)

    if session_type == 0:
        prompt = "Generate ONE unique simple behavioral interview question. Format: Q: [Question] A: [Answer]"
        track_text = "Behavioral"
    else:
        track_text = TECH_CATEGORIES.get(track_id)
        level_text = DIFFICULTY_MAP.get(difficulty)
        prompt = f"Generate ONE unique {track_text} question for {level_text} level. Format: Q: [Question] A: [Answer]"

    success_count = 0
    while success_count < count:
        try:
            raw_output = await safe_generate(prompt)
            q, a = parse_question_output(raw_output)
            if q and a:
                audio_url = await generate_audio(q, f"{uuid.uuid4()}.mp3")
                if audio_url:
                    db.collection("questions_pool").add({
                        "session_type": session_type,
                        # Behavioral questions share sentinel track -1, difficulty 0.
                        "track_id": track_id if session_type == 1 else -1,
                        "difficulty": difficulty if session_type == 1 else 0,
                        "questionText": q,
                        "questionIdealAnswer": a,
                        "audio_url": audio_url,
                        "created_at": firestore.SERVER_TIMESTAMP
                    })
                    success_count += 1
                    print(f"Successfully added {track_text} question {success_count}/{count}")
            await asyncio.sleep(2)  # gentle pacing between model calls
        except Exception as e:
            print(f"Refill error: {e}")
            await asyncio.sleep(5)


# =========================================
# 6. API ENDPOINTS
# =========================================
@app.post("/generate-session")
async def generate_session(request: GenerateSessionRequest, background_tasks: BackgroundTasks):
    """Pop up to 10 questions from the pool for a new session.

    Questions are deleted from the pool as they are handed out; a background
    task afterwards tops the matching pool back up towards 50 entries.
    Raises 400 for a technical session without a track, 503 when empty.
    """
    s_type, t_id = request.sessionType, request.trackName

    # FORCED ADJUSTMENT: Behavioral always uses difficulty 0
    diff = 0 if s_type == 0 else request.difficultyLevel

    query = db.collection("questions_pool").where(filter=FieldFilter("session_type", "==", s_type))
    if s_type == 1:  # Technical
        if t_id is None:
            raise HTTPException(status_code=400, detail="trackName is required for technical sessions.")
        query = query.where(filter=FieldFilter("track_id", "==", t_id)).where(filter=FieldFilter("difficulty", "==", diff))
    else:
        # For behavioral, we filter specifically by the forced difficulty 0
        query = query.where(filter=FieldFilter("difficulty", "==", 0))

    docs = query.limit(10).get()
    final_questions = []
    for index, doc in enumerate(docs, start=1):
        data = doc.to_dict()
        final_questions.append({
            "question_id": index,
            "text": data["questionText"],
            "expected_answer": data["questionIdealAnswer"],
            "audio_url": data.get("audio_url", "")
        })
        # Hand-out is destructive: each pooled question is served at most once.
        db.collection("questions_pool").document(doc.id).delete()

    async def check_and_refill_background():
        snap = query.count().get()
        current_count = snap[0][0].value
        if current_count < 50:
            # .get() with a fallback avoids a KeyError (which would kill the
            # background task) if an unknown track id slips through.
            label = "Behavioral" if s_type == 0 else TECH_CATEGORIES.get(t_id, f"track {t_id}")
            print(f"Stock for {label} is low ({current_count}). Refilling...")
            await refill_specific_pool(t_id if s_type == 1 else -1, diff, 50 - current_count, s_type)

    background_tasks.add_task(check_and_refill_background)

    if not final_questions:
        raise HTTPException(status_code=503, detail="The question pool is empty. Please try again in a few minutes.")
    return {"session_id": request.sessionId, "questions": final_questions}


@app.get("/admin/prefill-all")
async def prefill_all(background_tasks: BackgroundTasks):
    """Checks and refills ALL tracks and behavioral questions to 50 items each."""
    async def run_sync():
        print("Starting Global Smart Prefill...")

        # 1. Sync Behavioral
        beh_snap = db.collection("questions_pool").where(filter=FieldFilter("session_type", "==", 0)).count().get()
        beh_count = beh_snap[0][0].value
        if beh_count < 50:
            print(f"Syncing Behavioral: adding {50-beh_count}")
            await refill_specific_pool(-1, 0, 50 - beh_count, 0)

        # 2. Sync Technical: top up every (track, difficulty) pair to 50.
        for t_id, t_name in TECH_CATEGORIES.items():
            for d_id, d_name in DIFFICULTY_MAP.items():
                query = db.collection("questions_pool")\
                    .where(filter=FieldFilter("session_type", "==", 1))\
                    .where(filter=FieldFilter("track_id", "==", t_id))\
                    .where(filter=FieldFilter("difficulty", "==", d_id))
                snap = query.count().get()
                current = snap[0][0].value
                if current < 50:
                    print(f"Syncing {t_name} ({d_name}): adding {50-current}")
                    await refill_specific_pool(t_id, d_id, 50 - current, 1)
        print("Global Smart Prefill Completed!")

    background_tasks.add_task(run_sync)
    return {"message": "Global prefill process started in the background."}


@app.post("/cleanup-audio")
async def cleanup_audio(request: CleanupRequest, background_tasks: BackgroundTasks):
    """Delete the given Cloudinary audio clips in a background task."""
    def delete_job(urls):
        for url in urls:
            try:
                # Public id is "<folder>/<filename-without-extension>".
                public_id = "interview_audio/" + url.split('/')[-1].split('.')[0]
                cloudinary.uploader.destroy(public_id, resource_type="video")
            except Exception as e:
                # Was a bare `except: pass` — keep best-effort semantics, but log.
                print(f"Cloudinary cleanup error for {url}: {e}")
    background_tasks.add_task(delete_job, request.audioUrls)
    return {"message": "Cloudinary cleanup process initiated."}


@app.get("/health")
async def health():
    """Liveness probe; also reports whether the HF model client is connected."""
    return {"status": "active", "ai_model_connected": client is not None}


@app.get("/")
async def root():
    return {"app": "Intervision AI Engine", "status": "Running"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)