# QModel / app/state.py
# (Hugging Face viewer header: uploaded by aelgendy via huggingface_hub,
#  commit c566217 verified — kept as a comment so the module parses.)
"""Application state, lifespan, and core RAG pipeline."""
from __future__ import annotations
import asyncio
import json
import logging
import re
import time
from contextlib import asynccontextmanager
from typing import Literal, Optional
import faiss
from fastapi import FastAPI, HTTPException
from sentence_transformers import SentenceTransformer
from app.analysis import (
count_occurrences,
detect_analysis_intent,
detect_surah_info,
lookup_surah_info,
)
from app.arabic_nlp import detect_language, normalize_arabic
from app.config import cfg
from app.llm import LLMProvider, get_llm_provider
from app.prompts import build_messages, not_found_answer
from app.search import (
build_context,
hybrid_search,
lookup_hadith_references,
lookup_quran_verses,
merge_search_results,
rewrite_query,
text_search,
)
logger = logging.getLogger("qmodel.state")
# ═══════════════════════════════════════════════════════════════════════
# SURAH INFO PROGRAMMATIC FALLBACK
# ═══════════════════════════════════════════════════════════════════════
def _results_fallback(results: list, lang: str) -> str:
"""Generate a direct answer from search results when LLM returns empty.
Presents ALL high-scoring results so the user sees every place
a text is mentioned (e.g. same verse in multiple surahs).
"""
if not results:
return not_found_answer(lang)
# Include all results whose score is at least 50% of the top score,
# or score >= 2.0 (strong match), whichever captures more.
top_score = results[0].get("_score", 0.0)
threshold = max(top_score * 0.5, 2.0) if top_score >= 2.0 else 0.0
relevant = [r for r in results if r.get("_score", 0.0) >= threshold] or results[:1]
quran_hits = [r for r in relevant if r.get("type") == "quran"]
hadith_hits = [r for r in relevant if r.get("type") == "hadith"]
lines: list[str] = []
# --- Quran results ---
if quran_hits:
if len(quran_hits) == 1:
r = quran_hits[0]
surah_ar = r.get("surah_name_ar", "")
surah_en = r.get("surah_name_en", "")
verse_num = r.get("verse_number", "")
if lang == "arabic":
lines.append(f"هذه الآية الكريمة من سورة {surah_ar} ({surah_en})، الآية {verse_num}.")
else:
lines.append(f"This noble verse is from Surah {surah_en} ({surah_ar}), verse {verse_num}.")
else:
if lang == "arabic":
refs = [
f"سورة {r.get('surah_name_ar', '')} ({r.get('surah_name_en', '')})، الآية {r.get('verse_number', '')}"
for r in quran_hits
]
lines.append(f"هذا النص الكريم ذُكِر في {len(quran_hits)} مواضع:")
for ref in refs:
lines.append(f" • {ref}")
else:
refs = [
f"Surah {r.get('surah_name_en', '')} ({r.get('surah_name_ar', '')}), verse {r.get('verse_number', '')}"
for r in quran_hits
]
lines.append(f"This text appears in {len(quran_hits)} places:")
for ref in refs:
lines.append(f" • {ref}")
for r in quran_hits:
ar_text = r.get("arabic", "")
en_text = r.get("english", "")
surah_ar = r.get("surah_name_ar", "")
surah_en = r.get("surah_name_en", "")
surah_num = r.get("surah_number", "")
verse_num = r.get("verse_number", "")
lines.append("")
lines.append("┌─────────────────────────────────────────────┐")
lines.append(f"│ ❝ {ar_text} ❞")
lines.append(f"│ 📝 Translation: {en_text}")
lines.append(f"│ 📖 Source: سورة {surah_ar} ({surah_en}) | رقم السورة: {surah_num} | الآية: {verse_num}")
lines.append("└─────────────────────────────────────────────┘")
# --- Hadith results ---
if hadith_hits:
for r in hadith_hits:
ar_text = r.get("arabic", "")
en_text = r.get("english", "")
source = r.get("source") or r.get("reference", "")
grade = r.get("grade", "")
grade_str = f" [{grade}]" if grade else ""
if lang == "arabic":
lines.append(f"\nالحديث الشريف{grade_str}:")
else:
lines.append(f"\nHadith{grade_str}:")
lines.append("")
lines.append("┌─────────────────────────────────────────────┐")
lines.append(f"│ ❝ {ar_text} ❞")
lines.append(f"│ 📝 Translation: {en_text}")
lines.append(f"│ 📖 Source: {source}")
lines.append("└─────────────────────────────────────────────┘")
if lang == "arabic":
lines.append("\nوالله أعلم.")
else:
lines.append("\nAnd Allah knows best.")
return "\n".join(lines)
def _surah_info_fallback(info: dict, lang: str) -> str:
"""Generate a direct answer from surah metadata when LLM fails."""
name_ar = info.get("surah_name_ar", "")
name_en = info.get("surah_name_en", "")
number = info.get("surah_number", "")
verses = info.get("total_verses", "")
rev = info.get("revelation_type", "")
translit = info.get("surah_name_transliteration", "")
rev_ar = "مكية" if rev == "meccan" else "مدنية" if rev == "medinan" else rev
rev_en = rev.capitalize() if rev else ""
if lang == "arabic":
return (
f"سورة {name_ar} ({translit}) هي السورة رقم {number} في القرآن الكريم.\n"
f"عدد آياتها: {verses} آية.\n"
f"نوعها: {rev_ar}.\n"
f"والله أعلم."
)
return (
f"Surah {name_en} ({translit} / {name_ar}) is surah number {number} "
f"in the Holy Quran.\n"
f"Total verses: {verses}.\n"
f"Revelation type: {rev_en}.\n"
f"And Allah knows best."
)
# ═══════════════════════════════════════════════════════════════════════
# POST-GENERATION HALLUCINATION CHECK
# ═══════════════════════════════════════════════════════════════════════
# Quoted text between the ornate quote marks (❝ … ❞) used in rendered answers.
_QUOTE_RE = re.compile(r"❝\s*(.+?)\s*❞", re.DOTALL)
# Arabic "سورة <name> … <verse>" reference; group 1 = surah name, group 2 = verse number.
_SURAH_REF_AR = re.compile(
    r"(?:سورة|سوره)\s+([\u0600-\u06FF\u0750-\u077F\s]+?)[\s,،]*"
    r"(?:الآية|آية|الايه|ايه)?\s*(\d+)",
)
# English "Surah <name> … <verse>" reference (case-insensitive); same group layout.
_SURAH_REF_EN = re.compile(
    r"(?:surah|sura)\s+([A-Za-z\-' ]+?)[\s,]*"
    r"(?:ayah|verse|ayat)?\s*(\d+)",
    re.I,
)
# Numeric "surah:verse" references such as "16:53".
_SURAH_NUM_REF = re.compile(r"\b(\d{1,3})\s*:\s*(\d{1,3})\b")
def _build_valid_refs(results: list) -> set:
"""Build a set of valid (surah_number, verse_number) tuples from results."""
refs = set()
for r in results:
if r.get("type") == "quran":
sn = r.get("surah_number")
vn = r.get("verse_number")
if sn and vn:
refs.add((int(sn), int(vn)))
return refs
def _build_surah_name_map(results: list) -> dict:
"""Build a map from normalized surah names to surah numbers."""
name_map = {}
for r in results:
if r.get("type") == "quran":
sn = r.get("surah_number")
if sn:
sn = int(sn)
for field in ("surah_name_ar", "surah_name_en", "surah_name_transliteration"):
name = r.get(field, "").strip().lower()
name = re.sub(r"^(ال|al[\-\s']*)", "", name).strip()
if name:
name_map[name] = sn
return name_map
def _verify_references(answer: str, results: list) -> str:
    """Check that surah/verse references in the answer match retrieved results.

    Replaces hallucinated references with corrected ones or warnings.
    Three reference styles are scanned in order: numeric "16:53", Arabic
    "سورة <name> الآية <n>", and English "Surah <name> verse <n>".
    """
    valid_refs = _build_valid_refs(results)
    if not valid_refs:
        return answer  # No quran results — nothing to verify
    name_map = _build_surah_name_map(results)

    # Check numeric references like "16:53"
    def _check_num_ref(m: re.Match) -> str:
        sn, vn = int(m.group(1)), int(m.group(2))
        if (sn, vn) in valid_refs:
            return m.group(0)
        # Check if any valid ref exists — if so, the LLM hallucinated a different ref
        logger.warning("Hallucinated reference: %d:%d not in sources", sn, vn)
        # Find closest valid reference to suggest
        if len(valid_refs) == 1:
            correct = next(iter(valid_refs))
            return f"{correct[0]}:{correct[1]}"
        return m.group(0)  # Multiple refs — can't auto-correct

    answer = _SURAH_NUM_REF.sub(_check_num_ref, answer)

    # Check Arabic surah name references like "سورة إبراهيم، الآية 7"
    def _check_ar_ref(m: re.Match) -> str:
        raw_name = m.group(1).strip()
        verse_num = int(m.group(2))
        name_norm = re.sub(r"^(ال)", "", raw_name).strip().lower()
        matched_sn = name_map.get(name_norm)
        if matched_sn and (matched_sn, verse_num) in valid_refs:
            return m.group(0)  # Valid reference
        # Check if the combined reference is wrong.
        # NOTE(review): unlike the numeric check, this corrects to the FIRST
        # valid ref found even when several exist; with multiple candidates
        # the choice follows set iteration order — confirm that is intended.
        for (sn, vn) in valid_refs:
            # Find the correct surah name for this ref
            for r in results:
                if r.get("type") == "quran" and int(r.get("surah_number", 0)) == sn and int(r.get("verse_number", 0)) == vn:
                    correct_name = r.get("surah_name_ar", "")
                    logger.warning(
                        "Hallucinated reference: سورة %s آية %d -> correcting to سورة %s آية %d",
                        raw_name, verse_num, correct_name, vn,
                    )
                    return f"سورة {correct_name}، الآية {vn}"
        return m.group(0)

    answer = _SURAH_REF_AR.sub(_check_ar_ref, answer)

    # Check English surah name references (same strategy as the Arabic check,
    # but the article stripped is "al-" and the replacement uses English names).
    def _check_en_ref(m: re.Match) -> str:
        raw_name = m.group(1).strip()
        verse_num = int(m.group(2))
        name_norm = re.sub(r"^(al[\-\s']*)", "", raw_name, flags=re.I).strip().lower()
        matched_sn = name_map.get(name_norm)
        if matched_sn and (matched_sn, verse_num) in valid_refs:
            return m.group(0)
        for (sn, vn) in valid_refs:
            for r in results:
                if r.get("type") == "quran" and int(r.get("surah_number", 0)) == sn and int(r.get("verse_number", 0)) == vn:
                    correct_name = r.get("surah_name_en", "")
                    logger.warning(
                        "Hallucinated reference: Surah %s verse %d -> correcting to Surah %s verse %d",
                        raw_name, verse_num, correct_name, vn,
                    )
                    return f"Surah {correct_name}, verse {vn}"
        return m.group(0)

    answer = _SURAH_REF_EN.sub(_check_en_ref, answer)
    return answer
def _verify_citations(answer: str, results: list) -> str:
    """Check that quoted Arabic text in the answer actually appears in retrieved results.

    If a quoted block doesn't match any source, replace it with a warning.
    This prevents the model from fabricating hadith or verse text.
    """
    # Whitespace-collapsed source texts from every field we may quote from.
    source_texts_raw = []
    for r in results:
        for field in ("arabic", "english", "text"):
            val = r.get(field, "")
            if val:
                source_texts_raw.append(re.sub(r"\s+", " ", val.strip()))
    # Pre-compute normalized versions for diacritics-insensitive comparison
    source_texts_norm = [normalize_arabic(s) for s in source_texts_raw]

    # Replacement callback for each ❝ … ❞ quotation found in the answer.
    def _check_quote(m: re.Match) -> str:
        quoted_raw = re.sub(r"\s+", " ", m.group(1).strip())
        quoted_norm = normalize_arabic(quoted_raw)
        if len(quoted_norm) < 10:
            return m.group(0)  # too short to verify
        # Match cascade, strictest first; containment is tested both ways
        # since either the quote or the source may be the longer fragment.
        for src_raw, src_norm in zip(source_texts_raw, source_texts_norm):
            # 1. Exact substring match (raw — preserves diacritics)
            if quoted_raw in src_raw or src_raw in quoted_raw:
                return m.group(0)
            # 2. Normalized substring match (strips diacritics/punctuation)
            if quoted_norm in src_norm or src_norm in quoted_norm:
                return m.group(0)
            # 3. Word overlap on normalized text (≥50% of quoted words found)
            q_words = set(quoted_norm.split())
            s_words = set(src_norm.split())
            if q_words and len(q_words & s_words) / len(q_words) >= 0.5:
                return m.group(0)
        # Quote not found in any source — flag it
        logger.warning("Hallucination detected: quoted text not in sources: %.80s...", quoted_norm)
        return "❝ ⚠️ [تم حذف نص غير موثق — النص غير موجود في قاعدة البيانات] ❞"

    return _QUOTE_RE.sub(_check_quote, answer)
# ═══════════════════════════════════════════════════════════════════════
# SURAH INFO VERIFICATION & HADITH GRADE INFERENCE
# ═══════════════════════════════════════════════════════════════════════
def _verify_surah_info(answer: str, surah_info: dict) -> str:
    """Verify and correct surah metadata in the LLM answer.

    Replaces hallucinated surah names, surah numbers, and verse counts with
    the correct values from the authoritative ``surah_info`` lookup.

    Args:
        answer: Generated answer text to scrub.
        surah_info: Authoritative metadata record for the surah in question.

    Returns:
        The answer with any mismatching surah name/number/verse-count
        occurrences rewritten; unchanged if ``surah_info`` is falsy.
    """
    if not surah_info:
        return answer
    correct_name_ar = surah_info.get("surah_name_ar", "")
    correct_name_en = surah_info.get("surah_name_en", "")
    correct_verses = surah_info.get("total_verses")
    correct_number = surah_info.get("surah_number")
    correct_translit = surah_info.get("surah_name_transliteration", "")
    # Pre-normalize the correct Arabic name once; "bare" strips the article.
    correct_ar_norm = normalize_arabic(correct_name_ar).lower()
    correct_ar_bare = re.sub(r"^ال", "", correct_ar_norm).strip()
    # Words that can follow "سورة" but aren't surah names
    _NOT_SURAH_NAMES = {
        "مكية", "مكي", "مدنية", "مدني", "باللغة", "من", "في", "هي",
        "التي", "الكريمة", "المباركة", "هذه", "تلك",
    }
    _NOT_SURAH_NAMES_NORM = {normalize_arabic(w).lower() for w in _NOT_SURAH_NAMES}

    # ── Fix wrong surah names ───────────────────────────────────────
    # Match "سورة <name>" — capture one Arabic word (letters + diacritics only,
    # excluding Arabic punctuation like ، ؛ ؟ which sit in U+060C-U+061F).
    def _fix_surah_name_ar(m: re.Match) -> str:
        found_name = m.group(1).strip()
        found_norm = normalize_arabic(found_name).lower()
        found_bare = re.sub(r"^ال", "", found_norm).strip()
        # Skip non-surah-name words (check both raw and normalized)
        if found_name in _NOT_SURAH_NAMES or found_norm in _NOT_SURAH_NAMES_NORM:
            return m.group(0)
        if found_bare == correct_ar_bare or found_norm == correct_ar_norm:
            return m.group(0)  # already correct
        # Handle 2-word capture where 1st word is the correct surah name
        # (e.g., "النحل من" starts with "النحل")
        if found_bare.startswith(correct_ar_bare) or found_norm.startswith(correct_ar_norm):
            return m.group(0)  # already correct
        logger.warning(
            "Surah info hallucination: سورة %s -> correcting to سورة %s",
            found_name, correct_name_ar,
        )
        return m.group(0).replace(found_name, correct_name_ar)

    # Use \u0621-\u06FF to capture Arabic letters/diacritics but exclude
    # Arabic punctuation (،؛؟ etc. at U+060C-U+061F). Allow an optional
    # second word for 2-word names like آل عمران.
    answer = re.sub(
        r"(?:سورة|سوره)\s+([\u0621-\u06FF\u0750-\u077F]+(?:\s[\u0621-\u06FF\u0750-\u077F]+)?)"
        r"(?=[\s,،؛؟\.\n?!]|$)",
        _fix_surah_name_ar,
        answer,
    )

    # Fix English surah names: "Surah <name>"
    if correct_name_en:
        def _fix_surah_name_en(m: re.Match) -> str:
            found = m.group(1).strip()
            if found.lower() == correct_name_en.lower():
                return m.group(0)
            # Also allow transliteration match
            if correct_translit and found.lower() == correct_translit.lower():
                return m.group(0)
            logger.warning(
                "Surah info hallucination: Surah %s -> correcting to Surah %s",
                found, correct_name_en,
            )
            return m.group(0).replace(found, correct_name_en)

        answer = re.sub(
            r"(?:Surah|sura)\s+([A-Za-z'\-]+(?:[\s\-][A-Za-z'\-]+)*)",
            _fix_surah_name_en,
            answer,
            flags=re.I,
        )

    # ── Fix wrong surah numbers ─────────────────────────────────────
    if correct_number is not None:
        def _fix_surah_number(m: re.Match) -> str:
            num = int(m.group(2))
            if num == correct_number:
                return m.group(0)
            logger.warning(
                "Surah info hallucination: surah number %d -> correcting to %d",
                num, correct_number,
            )
            return m.group(1) + " " + str(correct_number)

        # Arabic: "رقم X" / "رقمها X" / "ترتيبها X"
        answer = re.sub(
            r"(رقم[ها]*|ترتيب[ها]*)\s+(\d+)",
            _fix_surah_number,
            answer,
        )
        # English: "surah number X", "number X"
        answer = re.sub(
            r"((?:surah\s+)?number)\s+(\d+)",
            _fix_surah_number,
            answer,
            flags=re.I,
        )

    # ── Fix wrong verse counts ──────────────────────────────────────
    if correct_verses is not None:
        def _fix_verse_count(m: re.Match) -> str:
            num = int(m.group(1))
            if num == correct_verses:
                return m.group(0)
            logger.warning(
                "Surah info hallucination: %d verses -> correcting to %d",
                num, correct_verses,
            )
            return m.group(0).replace(m.group(1), str(correct_verses))

        # Arabic: "34 آية" / "34 آيات"
        answer = re.sub(
            r"(\d+)\s*(?:آية|آيات|آيه)",
            _fix_verse_count,
            answer,
        )
        # "الآية 34" used as count context (after "عدد" or near "آيات")
        answer = re.sub(
            r"(الآية|الايه)\s+(\d+)",
            lambda m: m.group(1) + " " + (str(correct_verses) if int(m.group(2)) != correct_verses else m.group(2)),
            answer,
        )
        # English: "34 verses" / "has 34 verses"
        answer = re.sub(
            r"(\d+)\s+(?:verses|ayat|ayahs)",
            _fix_verse_count,
            answer,
            flags=re.I,
        )
        # "عددها 34" / "عدد 34"
        answer = re.sub(
            r"(عدد[ها]*\s+)(\d+)",
            lambda m: m.group(1) + (str(correct_verses) if int(m.group(2)) != correct_verses else m.group(2)),
            answer,
        )
    return answer
def infer_hadith_grade(item: dict) -> dict:
    """Infer a hadith's grade from its collection name when none is recorded.

    Mutates and returns *item*: if it is a hadith record without a ``grade``,
    the collection and reference strings are scanned for known collection
    names and the conventional default grade for that collection is filled
    in. Non-hadith items and already-graded items are returned untouched.

    Args:
        item: A dataset record; expected keys include ``type``, ``grade``,
            ``collection``, and ``reference``.

    Returns:
        The same dict, possibly with ``grade`` set.
    """
    if item.get("type") != "hadith" or item.get("grade"):
        return item
    # ``or ""`` guards against fields stored as explicit None (the old
    # ``.get(..., "").lower()`` form crashed on those).
    collection = (item.get("collection") or "").lower()
    reference = (item.get("reference") or "").lower()
    combined = f"{collection} {reference}"
    # Ordered keyword -> grade table; the FIRST matching entry wins, mirroring
    # the original elif chain (specific names like "sahih muslim" are listed
    # before generic substrings like "malik" or "ahmad").
    rules = (
        (("sahih al-bukhari", "sahih bukhari", "bukhari"), "Sahih"),
        (("sahih muslim", "sahih al-muslim"), "Sahih"),
        (("sunan an-nasai", "sunan an-nasa", "nasa'i", "nasa"), "Sahih"),
        (("jami at-tirmidhi", "tirmidhi", "at-tirmidhi"), "Hasan"),
        (("sunan abu dawood", "abu dawood", "abo daud", "abou daoude"), "Hasan"),
        (("sunan ibn majah", "ibn majah", "ibn maja"), "Hasan"),
        (("muwatta malik", "muwatta", "malik"), "Hasan"),
        (("musnad ahmad", "ahmad", "ahmed"), "Hasan/Sahih"),
        (("sunan al-darimi", "darimi", "al-darimi"), "Hasan"),
    )
    for keywords, grade in rules:
        if any(k in combined for k in keywords):
            item["grade"] = grade
            break
    return item
# ═══════════════════════════════════════════════════════════════════════
# APP STATE
# ═══════════════════════════════════════════════════════════════════════
class AppState:
    """Container for process-wide singletons loaded once at startup."""

    # Sentence-embedding model used to encode queries for vector search.
    embed_model: Optional[SentenceTransformer] = None
    # FAISS index read from cfg.FAISS_INDEX at startup.
    faiss_index: Optional[faiss.Index] = None
    # Records loaded from the metadata JSON file (grades backfilled on load).
    dataset: Optional[list] = None
    # Chat-completion backend selected by get_llm_provider().
    llm: Optional[LLMProvider] = None
    # True once everything above is loaded; check_ready() gates requests on it.
    ready: bool = False


# Module-level singleton shared by the FastAPI app and the pipeline below.
state = AppState()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load heavyweight resources at startup; mark not-ready on shutdown."""
    logger.info("Loading embed model: %s", cfg.EMBED_MODEL)
    state.embed_model = SentenceTransformer(cfg.EMBED_MODEL)

    logger.info("Loading FAISS index: %s", cfg.FAISS_INDEX)
    state.faiss_index = faiss.read_index(cfg.FAISS_INDEX)

    logger.info("Loading metadata: %s", cfg.METADATA_FILE)
    with open(cfg.METADATA_FILE, "r", encoding="utf-8") as fh:
        raw_records = json.load(fh)
    # Backfill missing hadith grades before serving any requests.
    state.dataset = [infer_hadith_grade(item) for item in raw_records]

    logger.info("Initializing LLM provider: %s", cfg.LLM_BACKEND)
    state.llm = get_llm_provider()

    state.ready = True
    logger.info(
        "QModel v6 ready | backend=%s | dataset=%d | faiss=%d | threshold=%.2f",
        cfg.LLM_BACKEND,
        len(state.dataset) if state.dataset else 0,
        state.faiss_index.ntotal if state.faiss_index else 0,
        cfg.CONFIDENCE_THRESHOLD,
    )
    yield
    state.ready = False
    logger.info("QModel shutdown")
def check_ready():
    """Raise 503 if service isn't ready."""
    if state.ready:
        return
    raise HTTPException(
        status_code=503,
        detail="Service is still initialising. Please retry shortly.",
    )
# ═══════════════════════════════════════════════════════════════════════
# CORE RAG PIPELINE
# ═══════════════════════════════════════════════════════════════════════
async def run_rag_pipeline(
    question: str,
    top_k: int = cfg.TOP_K_RETURN,  # NOTE(review): default captured once at import time
    source_type: Optional[Literal["quran", "hadith"]] = None,
    grade_filter: Optional[str] = None,
) -> dict:
    """Core RAG pipeline: rewrite -> search -> verify -> generate.

    Args:
        question: Raw user question (Arabic or English).
        top_k: Maximum number of results retained after each merge step.
        source_type: Optional hard filter restricting results to one corpus.
        grade_filter: Optional hadith-grade filter passed to hybrid search.

    Returns:
        dict with keys: answer, language, intent, analysis, sources,
        top_score, latency_ms.

    Raises:
        HTTPException: 502 when the LLM call fails.
    """
    t0 = time.perf_counter()
    # 1. Query rewriting
    rewrite = await rewrite_query(question, state.llm)
    intent = rewrite.get("intent", "general")
    # 2. Concurrent: surah info + analysis intent + hybrid search + text search
    surah_task = detect_surah_info(question, rewrite)
    kw_task = detect_analysis_intent(question, rewrite)
    search_task = hybrid_search(
        question, rewrite,
        state.embed_model, state.faiss_index, state.dataset,
        top_k, source_type, grade_filter,
    )
    surah_det, analysis_kw, results = await asyncio.gather(
        surah_task, kw_task, search_task,
    )
    # 2b. Direct reference lookup — catches Quran/Hadith references early.
    # dict.fromkeys() deduplicates while preserving order.
    direct_queries = list(dict.fromkeys([
        question,
        rewrite.get("ar_query", ""),
        rewrite.get("en_query", ""),
    ]))
    direct_results = []
    if source_type in (None, "quran"):
        for query in direct_queries:
            direct_results.extend(lookup_quran_verses(query, state.dataset, limit=top_k))
    if source_type in (None, "hadith"):
        for query in direct_queries:
            direct_results.extend(lookup_hadith_references(query, state.dataset, limit=top_k))
    results = merge_search_results(direct_results, results, limit=top_k)
    # 2c. Text search fallback — catches exact matches missed by FAISS.
    # For auth/hadith/tafsir intents, also search with the rewritten ar_query
    # which should contain the actual text fragment to look up.
    ar_q = rewrite.get("ar_query", "")
    # Determine text search source filter based on intent
    text_src = source_type
    if not text_src and intent in ("tafsir", "count", "surah_info"):
        text_src = "quran"
    elif not text_src and intent in ("hadith", "auth"):
        text_src = "hadith"
    text_limit = top_k * 2 if intent in ("auth", "hadith", "tafsir") else top_k
    text_results = []
    for q in dict.fromkeys([ar_q, question]):  # deduplicated, ar_query first
        if not q:
            continue
        for hit in text_search(q, state.dataset, text_src, limit=text_limit):
            # Boost text search hits for auth intent (exact text match is crucial).
            # A copy is made so the cached dataset record is never mutated.
            if intent == "auth" and hit.get("_score", 0) > 2.0:
                hit = {**hit, "_score": hit["_score"] + 1.0}
            text_results.append(hit)
    results = merge_search_results(results, text_results, limit=top_k)
    # 3a. Surah metadata lookup
    surah_info = None
    if surah_det:
        surah_info = await lookup_surah_info(surah_det["surah_query"], state.dataset)
        if surah_info:
            intent = "surah_info"
            logger.info(
                "Surah info: %s -> %s (%d verses)",
                surah_det["surah_query"],
                surah_info["surah_name_en"],
                surah_info.get("total_verses", 0),
            )
    # 3b. Word frequency count (skipped when a surah-info answer takes priority)
    analysis = None
    if analysis_kw and not surah_info:
        count_src = "hadith" if intent in ("hadith", "auth") else "quran"
        analysis = await count_occurrences(analysis_kw, state.dataset, source_type=count_src)
        logger.info("Analysis: kw=%s src=%s count=%d", analysis_kw, count_src, analysis["total_count"])
    # 4. Language detection
    lang = detect_language(question)
    top_score = results[0].get("_score", 0.0) if results else 0.0
    logger.info(
        "Search done | intent=%s | top_score=%.3f | threshold=%.2f",
        intent, top_score, cfg.CONFIDENCE_THRESHOLD,
    )
    # 5. Confidence gate (skip for surah_info)
    if not surah_info and top_score < cfg.CONFIDENCE_THRESHOLD:
        logger.warning(
            "Low confidence (%.3f < %.2f) — returning safe fallback",
            top_score, cfg.CONFIDENCE_THRESHOLD,
        )
        return {
            "answer": not_found_answer(lang),
            "language": lang,
            "intent": intent,
            "analysis": analysis,
            "sources": results,
            "top_score": top_score,
            "latency_ms": int((time.perf_counter() - t0) * 1000),
        }
    # 5b. Surah metadata: deterministic answer (faster & more reliable than LLM)
    if surah_info and intent == "surah_info":
        answer = _surah_info_fallback(surah_info, lang)
        latency = int((time.perf_counter() - t0) * 1000)
        logger.info(
            "Pipeline done (surah_info deterministic) | lang=%s | %d ms",
            lang, latency,
        )
        return {
            "answer": answer,
            "language": lang,
            "intent": intent,
            "analysis": None,
            "sources": results,
            "top_score": top_score,
            "latency_ms": latency,
        }
    # 6. Build context + prompt + LLM call
    context = build_context(results)
    messages = build_messages(context, question, lang, intent, analysis, surah_info)
    try:
        answer = await state.llm.chat(
            messages,
            max_tokens=cfg.MAX_TOKENS,
            temperature=cfg.TEMPERATURE,
        )
        # Strip residual <think> blocks that Qwen3 may emit
        answer = re.sub(r"<think>[\s\S]*?</think>", "", answer, flags=re.IGNORECASE)
        answer = re.sub(r"<think>[\s\S]*$", "", answer, flags=re.IGNORECASE)
        answer = answer.strip()
        if not answer:
            # Degrade gracefully: deterministic renderers instead of a 5xx.
            logger.warning("LLM returned empty answer — using results fallback")
            if surah_info:
                answer = _surah_info_fallback(surah_info, lang)
            elif results:
                answer = _results_fallback(results, lang)
            else:
                answer = not_found_answer(lang)
    except Exception as exc:
        logger.error("LLM call failed: %s", exc)
        raise HTTPException(status_code=502, detail="LLM service unavailable")
    # 7. Post-generation hallucination check
    answer = _verify_citations(answer, results)
    answer = _verify_references(answer, results)
    if surah_info:
        answer = _verify_surah_info(answer, surah_info)
    latency = int((time.perf_counter() - t0) * 1000)
    logger.info(
        "Pipeline done | intent=%s | lang=%s | top_score=%.3f | %d ms",
        intent, lang, top_score, latency,
    )
    return {
        "answer": answer,
        "language": lang,
        "intent": intent,
        "analysis": analysis,
        "sources": results,
        "top_score": top_score,
        "latency_ms": latency,
    }