| """Application state, lifespan, and core RAG pipeline.""" |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import json |
| import logging |
| import re |
| import time |
| from contextlib import asynccontextmanager |
| from typing import Literal, Optional |
|
|
| import faiss |
| from fastapi import FastAPI, HTTPException |
| from sentence_transformers import SentenceTransformer |
|
|
| from app.analysis import ( |
| count_occurrences, |
| detect_analysis_intent, |
| detect_surah_info, |
| lookup_surah_info, |
| ) |
| from app.arabic_nlp import detect_language, normalize_arabic |
| from app.config import cfg |
| from app.llm import LLMProvider, get_llm_provider |
| from app.prompts import build_messages, not_found_answer |
| from app.search import ( |
| build_context, |
| hybrid_search, |
| lookup_hadith_references, |
| lookup_quran_verses, |
| merge_search_results, |
| rewrite_query, |
| text_search, |
| ) |
|
|
| logger = logging.getLogger("qmodel.state") |
|
|
|
|
| |
| |
| |
| def _results_fallback(results: list, lang: str) -> str: |
| """Generate a direct answer from search results when LLM returns empty. |
| |
| Presents ALL high-scoring results so the user sees every place |
| a text is mentioned (e.g. same verse in multiple surahs). |
| """ |
| if not results: |
| return not_found_answer(lang) |
|
|
| |
| |
| top_score = results[0].get("_score", 0.0) |
| threshold = max(top_score * 0.5, 2.0) if top_score >= 2.0 else 0.0 |
| relevant = [r for r in results if r.get("_score", 0.0) >= threshold] or results[:1] |
|
|
| quran_hits = [r for r in relevant if r.get("type") == "quran"] |
| hadith_hits = [r for r in relevant if r.get("type") == "hadith"] |
|
|
| lines: list[str] = [] |
|
|
| |
| if quran_hits: |
| if len(quran_hits) == 1: |
| r = quran_hits[0] |
| surah_ar = r.get("surah_name_ar", "") |
| surah_en = r.get("surah_name_en", "") |
| verse_num = r.get("verse_number", "") |
| if lang == "arabic": |
| lines.append(f"هذه الآية الكريمة من سورة {surah_ar} ({surah_en})، الآية {verse_num}.") |
| else: |
| lines.append(f"This noble verse is from Surah {surah_en} ({surah_ar}), verse {verse_num}.") |
| else: |
| if lang == "arabic": |
| refs = [ |
| f"سورة {r.get('surah_name_ar', '')} ({r.get('surah_name_en', '')})، الآية {r.get('verse_number', '')}" |
| for r in quran_hits |
| ] |
| lines.append(f"هذا النص الكريم ذُكِر في {len(quran_hits)} مواضع:") |
| for ref in refs: |
| lines.append(f" • {ref}") |
| else: |
| refs = [ |
| f"Surah {r.get('surah_name_en', '')} ({r.get('surah_name_ar', '')}), verse {r.get('verse_number', '')}" |
| for r in quran_hits |
| ] |
| lines.append(f"This text appears in {len(quran_hits)} places:") |
| for ref in refs: |
| lines.append(f" • {ref}") |
|
|
| for r in quran_hits: |
| ar_text = r.get("arabic", "") |
| en_text = r.get("english", "") |
| surah_ar = r.get("surah_name_ar", "") |
| surah_en = r.get("surah_name_en", "") |
| surah_num = r.get("surah_number", "") |
| verse_num = r.get("verse_number", "") |
| lines.append("") |
| lines.append("┌─────────────────────────────────────────────┐") |
| lines.append(f"│ ❝ {ar_text} ❞") |
| lines.append(f"│ 📝 Translation: {en_text}") |
| lines.append(f"│ 📖 Source: سورة {surah_ar} ({surah_en}) | رقم السورة: {surah_num} | الآية: {verse_num}") |
| lines.append("└─────────────────────────────────────────────┘") |
|
|
| |
| if hadith_hits: |
| for r in hadith_hits: |
| ar_text = r.get("arabic", "") |
| en_text = r.get("english", "") |
| source = r.get("source") or r.get("reference", "") |
| grade = r.get("grade", "") |
| grade_str = f" [{grade}]" if grade else "" |
| if lang == "arabic": |
| lines.append(f"\nالحديث الشريف{grade_str}:") |
| else: |
| lines.append(f"\nHadith{grade_str}:") |
| lines.append("") |
| lines.append("┌─────────────────────────────────────────────┐") |
| lines.append(f"│ ❝ {ar_text} ❞") |
| lines.append(f"│ 📝 Translation: {en_text}") |
| lines.append(f"│ 📖 Source: {source}") |
| lines.append("└─────────────────────────────────────────────┘") |
|
|
| if lang == "arabic": |
| lines.append("\nوالله أعلم.") |
| else: |
| lines.append("\nAnd Allah knows best.") |
|
|
| return "\n".join(lines) |
|
|
|
|
| def _surah_info_fallback(info: dict, lang: str) -> str: |
| """Generate a direct answer from surah metadata when LLM fails.""" |
| name_ar = info.get("surah_name_ar", "") |
| name_en = info.get("surah_name_en", "") |
| number = info.get("surah_number", "") |
| verses = info.get("total_verses", "") |
| rev = info.get("revelation_type", "") |
| translit = info.get("surah_name_transliteration", "") |
|
|
| rev_ar = "مكية" if rev == "meccan" else "مدنية" if rev == "medinan" else rev |
| rev_en = rev.capitalize() if rev else "" |
|
|
| if lang == "arabic": |
| return ( |
| f"سورة {name_ar} ({translit}) هي السورة رقم {number} في القرآن الكريم.\n" |
| f"عدد آياتها: {verses} آية.\n" |
| f"نوعها: {rev_ar}.\n" |
| f"والله أعلم." |
| ) |
| return ( |
| f"Surah {name_en} ({translit} / {name_ar}) is surah number {number} " |
| f"in the Holy Quran.\n" |
| f"Total verses: {verses}.\n" |
| f"Revelation type: {rev_en}.\n" |
| f"And Allah knows best." |
| ) |
|
|
|
|
| |
| |
| |
# Quoted text delimited by ❝ … ❞ (the quote markers the answer templates emit).
_QUOTE_RE = re.compile(r"❝\s*(.+?)\s*❞", re.DOTALL)
# Arabic surah reference: "سورة <name> [الآية] <num>"; group 1 = name, group 2 = verse.
_SURAH_REF_AR = re.compile(
    r"(?:سورة|سوره)\s+([\u0600-\u06FF\u0750-\u077F\s]+?)[\s,،]*"
    r"(?:الآية|آية|الايه|ايه)?\s*(\d+)",
)
# English surah reference: "Surah <name> [verse] <num>" (case-insensitive).
_SURAH_REF_EN = re.compile(
    r"(?:surah|sura)\s+([A-Za-z\-' ]+?)[\s,]*"
    r"(?:ayah|verse|ayat)?\s*(\d+)",
    re.I,
)
# Numeric "surah:verse" citation, e.g. "2:255" (1-3 digits each side).
_SURAH_NUM_REF = re.compile(r"\b(\d{1,3})\s*:\s*(\d{1,3})\b")
|
|
|
|
| def _build_valid_refs(results: list) -> set: |
| """Build a set of valid (surah_number, verse_number) tuples from results.""" |
| refs = set() |
| for r in results: |
| if r.get("type") == "quran": |
| sn = r.get("surah_number") |
| vn = r.get("verse_number") |
| if sn and vn: |
| refs.add((int(sn), int(vn))) |
| return refs |
|
|
|
|
| def _build_surah_name_map(results: list) -> dict: |
| """Build a map from normalized surah names to surah numbers.""" |
| name_map = {} |
| for r in results: |
| if r.get("type") == "quran": |
| sn = r.get("surah_number") |
| if sn: |
| sn = int(sn) |
| for field in ("surah_name_ar", "surah_name_en", "surah_name_transliteration"): |
| name = r.get(field, "").strip().lower() |
| name = re.sub(r"^(ال|al[\-\s']*)", "", name).strip() |
| if name: |
| name_map[name] = sn |
| return name_map |
|
|
|
|
def _verify_references(answer: str, results: list) -> str:
    """Check that surah/verse references in the answer match retrieved results.

    Scans the answer for three citation styles — numeric ``surah:verse``,
    Arabic "سورة <name> … <verse>", and English "Surah <name> … <verse>" —
    and replaces references that do not correspond to any retrieved Quran
    hit. When no safe correction exists, the original text is kept and a
    warning is logged.

    Args:
        answer: LLM-generated answer text (may mix Arabic and English).
        results: Retrieved source records, as produced by the search layer.

    Returns:
        The answer with hallucinated references rewritten where possible.
    """
    valid_refs = _build_valid_refs(results)
    if not valid_refs:
        # No Quran hits to validate against — nothing can be checked.
        return answer


    name_map = _build_surah_name_map(results)


    # --- Numeric "surah:verse" citations (e.g. "2:255") ---
    def _check_num_ref(m: re.Match) -> str:
        sn, vn = int(m.group(1)), int(m.group(2))
        if (sn, vn) in valid_refs:
            return m.group(0)
        
        logger.warning("Hallucinated reference: %d:%d not in sources", sn, vn)
        # Rewrite only when the correction is unambiguous (exactly one source).
        if len(valid_refs) == 1:
            correct = next(iter(valid_refs))
            return f"{correct[0]}:{correct[1]}"
        return m.group(0)


    answer = _SURAH_NUM_REF.sub(_check_num_ref, answer)


    # --- Arabic "سورة <name> … <verse>" citations ---
    def _check_ar_ref(m: re.Match) -> str:
        raw_name = m.group(1).strip()
        verse_num = int(m.group(2))
        # Strip the definite article to align with name_map keys
        # (which are lowercased and article-stripped).
        name_norm = re.sub(r"^(ال)", "", raw_name).strip().lower()


        matched_sn = name_map.get(name_norm)
        if matched_sn and (matched_sn, verse_num) in valid_refs:
            return m.group(0)


        # Mismatch: correct to a reference that DOES exist in the sources.
        # NOTE(review): iterates a set, so with several valid refs the pick
        # is effectively arbitrary — acceptable only as an anti-hallucination
        # best effort, not a semantic match.
        for (sn, vn) in valid_refs:
            # Find the source record for this ref to recover its Arabic name.
            for r in results:
                if r.get("type") == "quran" and int(r.get("surah_number", 0)) == sn and int(r.get("verse_number", 0)) == vn:
                    correct_name = r.get("surah_name_ar", "")
                    logger.warning(
                        "Hallucinated reference: سورة %s آية %d -> correcting to سورة %s آية %d",
                        raw_name, verse_num, correct_name, vn,
                    )
                    return f"سورة {correct_name}، الآية {vn}"
        return m.group(0)


    answer = _SURAH_REF_AR.sub(_check_ar_ref, answer)


    # --- English "Surah <name> … <verse>" citations (same strategy) ---
    def _check_en_ref(m: re.Match) -> str:
        raw_name = m.group(1).strip()
        verse_num = int(m.group(2))
        # Strip a leading "al-"/"al " prefix to align with name_map keys.
        name_norm = re.sub(r"^(al[\-\s']*)", "", raw_name, flags=re.I).strip().lower()


        matched_sn = name_map.get(name_norm)
        if matched_sn and (matched_sn, verse_num) in valid_refs:
            return m.group(0)


        for (sn, vn) in valid_refs:
            for r in results:
                if r.get("type") == "quran" and int(r.get("surah_number", 0)) == sn and int(r.get("verse_number", 0)) == vn:
                    correct_name = r.get("surah_name_en", "")
                    logger.warning(
                        "Hallucinated reference: Surah %s verse %d -> correcting to Surah %s verse %d",
                        raw_name, verse_num, correct_name, vn,
                    )
                    return f"Surah {correct_name}, verse {vn}"
        return m.group(0)


    answer = _SURAH_REF_EN.sub(_check_en_ref, answer)


    return answer
|
|
|
|
def _verify_citations(answer: str, results: list) -> str:
    """Check that quoted text in the answer actually appears in the sources.

    Every ❝ … ❞ block that cannot be matched against a retrieved source
    text is replaced with a warning marker, preventing fabricated hadith
    or verse text from reaching the user.
    """
    # Each candidate source string, whitespace-collapsed, paired with its
    # Arabic-normalized form.
    pairs: list[tuple[str, str]] = []
    for item in results:
        for key in ("arabic", "english", "text"):
            text = item.get(key, "")
            if text:
                collapsed = re.sub(r"\s+", " ", text.strip())
                pairs.append((collapsed, normalize_arabic(collapsed)))

    def _is_supported(raw: str, norm: str) -> bool:
        """True when the quote matches any source exactly, normalized, or fuzzily."""
        for src_raw, src_norm in pairs:
            # Substring containment in either direction on the raw text.
            if raw in src_raw or src_raw in raw:
                return True
            # Same check on the normalized forms (tolerates diacritics etc.).
            if norm in src_norm or src_norm in norm:
                return True
            # Fuzzy: at least half the quoted words occur in this source.
            quote_words = set(norm.split())
            if quote_words and len(quote_words & set(src_norm.split())) / len(quote_words) >= 0.5:
                return True
        return False

    def _check_quote(m: re.Match) -> str:
        raw = re.sub(r"\s+", " ", m.group(1).strip())
        norm = normalize_arabic(raw)

        # Too short to verify meaningfully — leave untouched.
        if len(norm) < 10:
            return m.group(0)

        if _is_supported(raw, norm):
            return m.group(0)

        logger.warning("Hallucination detected: quoted text not in sources: %.80s...", norm)
        return "❝ ⚠️ [تم حذف نص غير موثق — النص غير موجود في قاعدة البيانات] ❞"

    return _QUOTE_RE.sub(_check_quote, answer)
|
|
|
|
| |
| |
| |
| def _verify_surah_info(answer: str, surah_info: dict) -> str: |
| """Verify and correct surah metadata in the LLM answer. |
| |
| Replaces hallucinated surah names and verse counts with the correct |
| values from the authoritative surah_info lookup. |
| """ |
| if not surah_info: |
| return answer |
|
|
| correct_name_ar = surah_info.get("surah_name_ar", "") |
| correct_name_en = surah_info.get("surah_name_en", "") |
| correct_verses = surah_info.get("total_verses") |
| correct_number = surah_info.get("surah_number") |
| correct_type = surah_info.get("revelation_type", "") |
| correct_translit = surah_info.get("surah_name_transliteration", "") |
|
|
| correct_ar_norm = normalize_arabic(correct_name_ar).lower() |
| correct_ar_bare = re.sub(r"^ال", "", correct_ar_norm).strip() |
|
|
| |
| _NOT_SURAH_NAMES = { |
| "مكية", "مكي", "مدنية", "مدني", "باللغة", "من", "في", "هي", |
| "التي", "الكريمة", "المباركة", "هذه", "تلك", |
| } |
| _NOT_SURAH_NAMES_NORM = {normalize_arabic(w).lower() for w in _NOT_SURAH_NAMES} |
|
|
| |
| |
| |
| def _fix_surah_name_ar(m: re.Match) -> str: |
| found_name = m.group(1).strip() |
| found_norm = normalize_arabic(found_name).lower() |
| found_bare = re.sub(r"^ال", "", found_norm).strip() |
| |
| if found_name in _NOT_SURAH_NAMES or found_norm in _NOT_SURAH_NAMES_NORM: |
| return m.group(0) |
| if found_bare == correct_ar_bare or found_norm == correct_ar_norm: |
| return m.group(0) |
| |
| |
| if found_bare.startswith(correct_ar_bare) or found_norm.startswith(correct_ar_norm): |
| return m.group(0) |
| logger.warning( |
| "Surah info hallucination: سورة %s -> correcting to سورة %s", |
| found_name, correct_name_ar, |
| ) |
| return m.group(0).replace(found_name, correct_name_ar) |
|
|
| |
| |
| |
| answer = re.sub( |
| r"(?:سورة|سوره)\s+([\u0621-\u06FF\u0750-\u077F]+(?:\s[\u0621-\u06FF\u0750-\u077F]+)?)" |
| r"(?=[\s,،؛؟\.\n?!]|$)", |
| _fix_surah_name_ar, |
| answer, |
| ) |
|
|
| |
| if correct_name_en: |
| def _fix_surah_name_en(m: re.Match) -> str: |
| found = m.group(1).strip() |
| if found.lower() == correct_name_en.lower(): |
| return m.group(0) |
| |
| if correct_translit and found.lower() == correct_translit.lower(): |
| return m.group(0) |
| logger.warning( |
| "Surah info hallucination: Surah %s -> correcting to Surah %s", |
| found, correct_name_en, |
| ) |
| return m.group(0).replace(found, correct_name_en) |
|
|
| answer = re.sub( |
| r"(?:Surah|sura)\s+([A-Za-z'\-]+(?:[\s\-][A-Za-z'\-]+)*)", |
| _fix_surah_name_en, |
| answer, |
| flags=re.I, |
| ) |
|
|
| |
| if correct_number is not None: |
| def _fix_surah_number(m: re.Match) -> str: |
| num = int(m.group(2)) |
| if num == correct_number: |
| return m.group(0) |
| logger.warning( |
| "Surah info hallucination: surah number %d -> correcting to %d", |
| num, correct_number, |
| ) |
| return m.group(1) + " " + str(correct_number) |
|
|
| |
| answer = re.sub( |
| r"(رقم[ها]*|ترتيب[ها]*)\s+(\d+)", |
| _fix_surah_number, |
| answer, |
| ) |
| |
| answer = re.sub( |
| r"((?:surah\s+)?number)\s+(\d+)", |
| _fix_surah_number, |
| answer, |
| flags=re.I, |
| ) |
|
|
| |
| if correct_verses is not None: |
| def _fix_verse_count(m: re.Match) -> str: |
| num = int(m.group(1)) |
| if num == correct_verses: |
| return m.group(0) |
| logger.warning( |
| "Surah info hallucination: %d verses -> correcting to %d", |
| num, correct_verses, |
| ) |
| return m.group(0).replace(m.group(1), str(correct_verses)) |
|
|
| |
| answer = re.sub( |
| r"(\d+)\s*(?:آية|آيات|آيه)", |
| _fix_verse_count, |
| answer, |
| ) |
| |
| answer = re.sub( |
| r"(الآية|الايه)\s+(\d+)", |
| lambda m: m.group(1) + " " + (str(correct_verses) if int(m.group(2)) != correct_verses else m.group(2)), |
| answer, |
| ) |
| |
| answer = re.sub( |
| r"(\d+)\s+(?:verses|ayat|ayahs)", |
| _fix_verse_count, |
| answer, |
| flags=re.I, |
| ) |
| |
| answer = re.sub( |
| r"(عدد[ها]*\s+)(\d+)", |
| lambda m: m.group(1) + (str(correct_verses) if int(m.group(2)) != correct_verses else m.group(2)), |
| answer, |
| ) |
|
|
| return answer |
|
|
|
|
def infer_hadith_grade(item: dict) -> dict:
    """Infer hadith grade from collection name if not present.

    Mutates and returns *item*: when it is a hadith with no explicit
    grade, assign the conventional grade of its collection, matched by
    substring against the collection and reference fields. Unknown
    collections stay ungraded; non-hadith items pass through untouched.
    """
    if item.get("type") != "hadith" or item.get("grade"):
        return item

    haystack = item.get("collection", "").lower() + " " + item.get("reference", "").lower()

    # Ordered (grade, marker substrings) per collection — first match wins,
    # so more specific collections are listed before looser markers.
    collection_grades = (
        ("Sahih", ("sahih al-bukhari", "sahih bukhari", "bukhari")),
        ("Sahih", ("sahih muslim", "sahih al-muslim")),
        ("Sahih", ("sunan an-nasai", "sunan an-nasa", "nasa'i", "nasa")),
        ("Hasan", ("jami at-tirmidhi", "tirmidhi", "at-tirmidhi")),
        ("Hasan", ("sunan abu dawood", "abu dawood", "abo daud", "abou daoude")),
        ("Hasan", ("sunan ibn majah", "ibn majah", "ibn maja")),
        ("Hasan", ("muwatta malik", "muwatta", "malik")),
        ("Hasan/Sahih", ("musnad ahmad", "ahmad", "ahmed")),
        ("Hasan", ("sunan al-darimi", "darimi", "al-darimi")),
    )
    for grade, markers in collection_grades:
        if any(marker in haystack for marker in markers):
            item["grade"] = grade
            break

    return item
|
|
|
|
| |
| |
| |
class AppState:
    """Mutable holder for all long-lived service resources.

    Populated by the FastAPI lifespan hook at startup; ``ready`` stays
    False until every resource below has been loaded.
    """

    # Sentence-embedding model used to vectorize queries for vector search.
    embed_model: Optional[SentenceTransformer] = None
    # Dense vector index over the dataset embeddings.
    faiss_index: Optional[faiss.Index] = None
    # Full corpus metadata (Quran verse and hadith records) as dicts,
    # aligned with the FAISS index.
    dataset: Optional[list] = None
    # Chat-completion backend chosen via configuration.
    llm: Optional[LLMProvider] = None
    # Flips True once startup completes; gates request handling (see check_ready).
    ready: bool = False




# Module-level singleton shared by the lifespan hook and request handlers.
state = AppState()
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load every long-lived resource on startup; mark not-ready on shutdown.

    Order matters: ``state.ready`` is flipped only after the embedding
    model, FAISS index, metadata, and LLM provider are all in place.
    """
    logger.info("Loading embed model: %s", cfg.EMBED_MODEL)
    state.embed_model = SentenceTransformer(cfg.EMBED_MODEL)

    logger.info("Loading FAISS index: %s", cfg.FAISS_INDEX)
    state.faiss_index = faiss.read_index(cfg.FAISS_INDEX)

    logger.info("Loading metadata: %s", cfg.METADATA_FILE)
    with open(cfg.METADATA_FILE, encoding="utf-8") as fh:
        state.dataset = json.load(fh)

    # Backfill hadith grades inferred from the collection name.
    state.dataset = [infer_hadith_grade(entry) for entry in state.dataset]

    logger.info("Initializing LLM provider: %s", cfg.LLM_BACKEND)
    state.llm = get_llm_provider()

    state.ready = True
    logger.info(
        "QModel v6 ready | backend=%s | dataset=%d | faiss=%d | threshold=%.2f",
        cfg.LLM_BACKEND,
        len(state.dataset) if state.dataset else 0,
        state.faiss_index.ntotal if state.faiss_index else 0,
        cfg.CONFIDENCE_THRESHOLD,
    )
    yield
    state.ready = False
    logger.info("QModel shutdown")
|
|
|
|
def check_ready():
    """Guard for request handlers: raise HTTP 503 while startup is incomplete."""
    if state.ready:
        return
    raise HTTPException(
        status_code=503,
        detail="Service is still initialising. Please retry shortly.",
    )
|
|
|
|
| |
| |
| |
async def run_rag_pipeline(
    question: str,
    # NOTE(review): default is captured at import time; later changes to
    # cfg.TOP_K_RETURN won't affect this default — confirm that's intended.
    top_k: int = cfg.TOP_K_RETURN,
    source_type: Optional[Literal["quran", "hadith"]] = None,
    grade_filter: Optional[str] = None,
) -> dict:
    """Core RAG pipeline: rewrite -> search -> verify -> generate.

    Args:
        question: The user's raw question (Arabic or English).
        top_k: Maximum number of results to retrieve and return.
        source_type: Restrict retrieval to "quran" or "hadith"; None = both.
        grade_filter: Optional hadith-grade filter passed to hybrid search.

    Returns:
        Dict with keys: answer, language, intent, analysis, sources,
        top_score, latency_ms.

    Raises:
        HTTPException: 502 when the LLM backend call fails.
    """
    t0 = time.perf_counter()


    # Stage 1: LLM query rewrite — yields bilingual query variants and an intent.
    rewrite = await rewrite_query(question, state.llm)
    intent = rewrite.get("intent", "general")


    # Stage 2: run surah detection, analysis-keyword detection, and hybrid
    # (vector + keyword) search concurrently.
    surah_task = detect_surah_info(question, rewrite)
    kw_task = detect_analysis_intent(question, rewrite)
    search_task = hybrid_search(
        question, rewrite,
        state.embed_model, state.faiss_index, state.dataset,
        top_k, source_type, grade_filter,
    )
    surah_det, analysis_kw, results = await asyncio.gather(
        surah_task, kw_task, search_task,
    )


    # Stage 3: direct reference lookups (e.g. explicit "2:255" style
    # citations in any query variant); dict.fromkeys deduplicates while
    # preserving order.
    direct_queries = list(dict.fromkeys([
        question,
        rewrite.get("ar_query", ""),
        rewrite.get("en_query", ""),
    ]))
    direct_results = []
    if source_type in (None, "quran"):
        for query in direct_queries:
            direct_results.extend(lookup_quran_verses(query, state.dataset, limit=top_k))
    if source_type in (None, "hadith"):
        for query in direct_queries:
            direct_results.extend(lookup_hadith_references(query, state.dataset, limit=top_k))
    # Direct lookups are passed first — presumably they take precedence in
    # the merge; confirm against merge_search_results.
    results = merge_search_results(direct_results, results, limit=top_k)


    ar_q = rewrite.get("ar_query", "")


    # Stage 4: plain text search, scoped by intent when the caller didn't
    # fix a source type.
    text_src = source_type
    if not text_src and intent in ("tafsir", "count", "surah_info"):
        text_src = "quran"
    elif not text_src and intent in ("hadith", "auth"):
        text_src = "hadith"


    text_limit = top_k * 2 if intent in ("auth", "hadith", "tafsir") else top_k
    text_results = []
    for q in dict.fromkeys([ar_q, question]):
        if not q:
            continue
        for hit in text_search(q, state.dataset, text_src, limit=text_limit):
            # Boost strong text matches for authentication questions
            # (copy the hit — don't mutate the shared dataset record).
            if intent == "auth" and hit.get("_score", 0) > 2.0:
                hit = {**hit, "_score": hit["_score"] + 1.0}
            text_results.append(hit)
    results = merge_search_results(results, text_results, limit=top_k)


    # Stage 5: resolve surah metadata when a surah was detected; a hit
    # overrides the rewrite's intent.
    surah_info = None
    if surah_det:
        surah_info = await lookup_surah_info(surah_det["surah_query"], state.dataset)
        if surah_info:
            intent = "surah_info"
            logger.info(
                "Surah info: %s -> %s (%d verses)",
                surah_det["surah_query"],
                surah_info["surah_name_en"],
                surah_info.get("total_verses", 0),
            )


    # Stage 6: keyword occurrence counting (skipped for surah-info answers).
    analysis = None
    if analysis_kw and not surah_info:
        count_src = "hadith" if intent in ("hadith", "auth") else "quran"
        analysis = await count_occurrences(analysis_kw, state.dataset, source_type=count_src)
        logger.info("Analysis: kw=%s src=%s count=%d", analysis_kw, count_src, analysis["total_count"])


    lang = detect_language(question)
    top_score = results[0].get("_score", 0.0) if results else 0.0


    logger.info(
        "Search done | intent=%s | top_score=%.3f | threshold=%.2f",
        intent, top_score, cfg.CONFIDENCE_THRESHOLD,
    )


    # Stage 7: low-confidence gate — answer honestly rather than hallucinate.
    # A surah-info hit bypasses the gate since its data is authoritative.
    if not surah_info and top_score < cfg.CONFIDENCE_THRESHOLD:
        logger.warning(
            "Low confidence (%.3f < %.2f) — returning safe fallback",
            top_score, cfg.CONFIDENCE_THRESHOLD,
        )
        return {
            "answer": not_found_answer(lang),
            "language": lang,
            "intent": intent,
            "analysis": analysis,
            "sources": results,
            "top_score": top_score,
            "latency_ms": int((time.perf_counter() - t0) * 1000),
        }


    # Stage 8: pure surah-info questions get a deterministic, template-based
    # answer — no LLM call, no hallucination risk.
    if surah_info and intent == "surah_info":
        answer = _surah_info_fallback(surah_info, lang)
        latency = int((time.perf_counter() - t0) * 1000)
        logger.info(
            "Pipeline done (surah_info deterministic) | lang=%s | %d ms",
            lang, latency,
        )
        return {
            "answer": answer,
            "language": lang,
            "intent": intent,
            "analysis": None,
            "sources": results,
            "top_score": top_score,
            "latency_ms": latency,
        }


    # Stage 9: LLM generation from the retrieved context.
    context = build_context(results)
    messages = build_messages(context, question, lang, intent, analysis, surah_info)


    try:
        answer = await state.llm.chat(
            messages,
            max_tokens=cfg.MAX_TOKENS,
            temperature=cfg.TEMPERATURE,
        )
        # Strip chain-of-thought blocks: closed <think>…</think> pairs first,
        # then any unterminated <think> tail.
        answer = re.sub(r"<think>[\s\S]*?</think>", "", answer, flags=re.IGNORECASE)
        answer = re.sub(r"<think>[\s\S]*$", "", answer, flags=re.IGNORECASE)
        answer = answer.strip()


        # Empty answer: fall back to deterministic rendering of what we have.
        if not answer:
            logger.warning("LLM returned empty answer — using results fallback")
            if surah_info:
                answer = _surah_info_fallback(surah_info, lang)
            elif results:
                answer = _results_fallback(results, lang)
            else:
                answer = not_found_answer(lang)
    except Exception as exc:
        # NOTE(review): consider `raise ... from exc` to preserve the chain.
        logger.error("LLM call failed: %s", exc)
        raise HTTPException(status_code=502, detail="LLM service unavailable")


    # Stage 10: post-hoc hallucination checks — quoted text, then references,
    # then (when applicable) surah metadata.
    answer = _verify_citations(answer, results)
    answer = _verify_references(answer, results)
    if surah_info:
        answer = _verify_surah_info(answer, surah_info)


    latency = int((time.perf_counter() - t0) * 1000)
    logger.info(
        "Pipeline done | intent=%s | lang=%s | top_score=%.3f | %d ms",
        intent, lang, top_score, latency,
    )


    return {
        "answer": answer,
        "language": lang,
        "intent": intent,
        "analysis": analysis,
        "sources": results,
        "top_score": top_score,
        "latency_ms": latency,
    }
|
|