"""Hybrid search engine — dense FAISS + BM25 re-ranking + text search.""" from __future__ import annotations import json import logging import re from collections import Counter from itertools import chain from difflib import SequenceMatcher from typing import Dict, List, Literal, Optional import faiss import numpy as np from sentence_transformers import SentenceTransformer from app.arabic_nlp import light_stem, normalize_arabic, tokenize_ar from app.cache import rewrite_cache, search_cache from app.config import cfg from app.llm import LLMProvider logger = logging.getLogger("qmodel.search") # ═══════════════════════════════════════════════════════════════════════ # QUERY REWRITING # ═══════════════════════════════════════════════════════════════════════ _REWRITE_SYSTEM = """\ You are an Islamic-scholarship search query optimizer. Your ONLY job: rewrite the user's question to maximise retrieval from a Quranic + Hadith dataset. Reply ONLY with a valid JSON object — no markdown, no preamble: { "ar_query": "", "en_query": "", "keywords": ["<3-7 key Arabic or English terms from the question>"], "intent": "" } Intent Detection Rules (CRITICAL): - 'tafsir' intent = looking up Quranic verse(s) by partial text, topic, word, or asking about meaning (ابحث عن آية, find verse, ما تفسير, verse about X, آية عن, الآية التي فيها, verse that says) IMPORTANT: When user provides Arabic verse text to find, put that text in ar_query verbatim. - 'hadith' intent = looking up Hadith by text, topic, or asking about meaning (NOT authenticity) (ابحث عن حديث, find hadith, hadith about, حديث عن, ما معنى حديث, hadith that says) IMPORTANT: When user provides Arabic hadith text to find, put that text in ar_query verbatim. - 'auth' intent = asking about Hadith authenticity/grade/verification (صحيح؟, هل صحيح, is it authentic, verify hadith, درجة الحديث, is this hadith real, هل هذا حديث صحيح) IMPORTANT: Include the hadith text fragment in ar_query for matching. - 'surah_info' intent = asking about surah metadata: verse count, revelation type, surah number (كم عدد آيات سورة, كم آية في سورة, how many verses in surah, is surah X meccan/medinan) - 'count' intent = asking for WORD frequency/occurrence count (كم مرة ذُكرت كلمة, how many times is word X mentioned, عدد مرات ذكر كلمة) NOTE: "كم عدد آيات سورة" is surah_info NOT count! IMPORTANT: The word being counted MUST be the first keyword. - 'general' intent = other Islamic questions Rewriting Rules: - For verse/hadith text lookups: include the EXACT Arabic text fragment in ar_query - For topic searches: expand the topic with Arabic synonyms and related terms in keywords - For word frequency: extract the EXACT keyword being counted as the FIRST keyword - keywords MUST include core Arabic terms for matching (e.g. صبر, رحمة, صلاة) Examples: - "ابحث عن الآية التي فيها إنا أعطيناك الكوثر" → intent: tafsir, ar_query: "إنا أعطيناك الكوثر" - "Find the verse about patience" → intent: tafsir, keywords: ["صبر", "patience", "الصبر"] - "ما الآية التي تتحدث عن الصدقة" → intent: tafsir, keywords: ["صدقة", "الصدقة", "إنفاق"] - "كم عدد آيات سورة آل عمران" → intent: surah_info - "كم آية في سورة البقرة" → intent: surah_info - "how many verses in surah al-baqara" → intent: surah_info - "هل سورة الفاتحة مكية أم مدنية" → intent: surah_info - "كم مرة ذُكرت كلمة مريم في القرآن" → intent: count, keywords: ["مريم", ...] - "how many times is mercy mentioned in Quran" → intent: count, keywords: ["رحمة", "mercy", "الرحمة"] - "هل حديث إنما الأعمال بالنيات صحيح" → intent: auth, ar_query: "إنما الأعمال بالنيات" - "is the hadith about actions by intentions authentic" → intent: auth, keywords: ["إنما الأعمال بالنيات", "actions", "intentions"] - "ما معنى حديث إنما الأعمال" → intent: hadith, ar_query: "إنما الأعمال" - "ابحث عن حديث عن الصبر" → intent: hadith, keywords: ["صبر", "الصبر", "patience"] - "find hadith about fasting" → intent: hadith, keywords: ["صيام", "صوم", "fasting"] - "ما حكم الربا في الإسلام" → intent: general, keywords: ["ربا", "الربا", "usury"] - "هل الحديث ده صحيح: من كان يؤمن بالله" → intent: auth, ar_query: "من كان يؤمن بالله" """ _QURAN_REF_NUMERIC = re.compile(r"\b(\d{1,3})\s*:\s*(\d{1,3})\b") _QURAN_REF_AR_NAME_FIRST = re.compile( r"(?:سورة|سوره)\s+([\u0621-\u06FF\u0750-\u077F\s]+?)" r"(?:\s+(?:الآية|آية|الايه|ايه)\s*|[\s,:،-]+)(\d{1,3})", ) _QURAN_REF_AR_VERSE_FIRST = re.compile( r"(?:الآية|آية|الايه|ايه)\s*(\d{1,3})\s+(?:من|في)\s+(?:سورة|سوره)\s+([\u0621-\u06FF\u0750-\u077F\s]+)", ) _QURAN_REF_EN_NAME_FIRST = re.compile( r"(?:surah|sura)\s+([A-Za-z'\- ]+?)" r"(?:\s+(?:ayah|verse|ayat)\s*|[\s,:-]+)(\d{1,3})", re.I, ) _QURAN_REF_EN_VERSE_FIRST = re.compile( r"(?:ayah|verse|ayat)\s*(\d{1,3})\s+(?:of|in)\s+(?:surah|sura)\s+([A-Za-z'\- ]+)", re.I, ) _COLLECTION_ALIASES = { "sahih al-bukhari": "Sahih al-Bukhari", "sahih bukhari": "Sahih al-Bukhari", "al bukhari": "Sahih al-Bukhari", "bukhari": "Sahih al-Bukhari", "البخاري": "Sahih al-Bukhari", "صحيح البخاري": "Sahih al-Bukhari", "sahih muslim": "Sahih Muslim", "muslim": "Sahih Muslim", "مسلم": "Sahih Muslim", "صحيح مسلم": "Sahih Muslim", "sunan an nasai": "Sunan an-Nasai", "sunan an-nasai": "Sunan an-Nasai", "nasai": "Sunan an-Nasai", "nasa'i": "Sunan an-Nasai", "نسائي": "Sunan an-Nasai", "النسائي": "Sunan an-Nasai", "sunan abu dawood": "Sunan Abu Dawood", "sunan abu dawood": "Sunan Abu Dawood", "abu dawood": "Sunan Abu Dawood", "abu dawood": "Sunan Abu Dawood", "أبو داود": "Sunan Abu Dawood", "ابو داود": "Sunan Abu Dawood", "jami at tirmidhi": "Jami' at-Tirmidhi", "jami at-tirmidhi": "Jami' at-Tirmidhi", "tirmidhi": "Jami' at-Tirmidhi", "الترمذي": "Jami' at-Tirmidhi", "ترمذي": "Jami' at-Tirmidhi", "sunan ibn majah": "Sunan Ibn Majah", "ibn majah": "Sunan Ibn Majah", "ابن ماجه": "Sunan Ibn Majah", "sunan al darimi": "Sunan al-Darimi", "sunan al-darimi": "Sunan al-Darimi", "darimi": "Sunan al-Darimi", "الدارمي": "Sunan al-Darimi", "muwatta malik": "Muwatta Malik", "muwatta": "Muwatta Malik", "موطأ مالك": "Muwatta Malik", "موطا مالك": "Muwatta Malik", "malik": "Muwatta Malik", "musnad ahmad": "Musnad Ahmad", "ahmad": "Musnad Ahmad", "ahmed": "Musnad Ahmad", "أحمد": "Musnad Ahmad", "مسند أحمد": "Musnad Ahmad", } _SORTED_COLLECTION_ALIASES = sorted(_COLLECTION_ALIASES.items(), key=lambda item: len(item[0]), reverse=True) def _find_balanced_json(text: str) -> Optional[str]: """Find the first balanced {...} in *text*; return it or None.""" start = text.find("{") if start == -1: return None depth = 0 in_string = False escaped = False for idx in range(start, len(text)): ch = text[idx] if escaped: escaped = False continue if ch == "\\": escaped = True continue if ch == '"': in_string = not in_string continue if in_string: continue if ch == "{": depth += 1 elif ch == "}": depth -= 1 if depth == 0: return text[start:idx + 1] return None def _extract_json_object(text: str) -> Optional[str]: """Extract the first balanced JSON object from model output.""" if not text: return None # Remove common wrappers some models add around structured responses. cleaned = re.sub(r"```(?:json)?", "", text, flags=re.IGNORECASE) cleaned = cleaned.replace("```", "") # Strip closed blocks cleaned = re.sub(r"[\s\S]*?", "", cleaned, flags=re.IGNORECASE) cleaned = cleaned.strip() # Before stripping unclosed (which removes everything after it), # check if a JSON object exists anywhere in the remaining text — # the model may have emitted JSON inside a truncated think block. if "{" not in cleaned: # No JSON at all, strip unclosed think and give up cleaned = re.sub(r"[\s\S]*$", "", cleaned, flags=re.IGNORECASE).strip() if not cleaned: return None else: # Try to extract JSON first; only strip unclosed if that fails candidate = _find_balanced_json(cleaned) if candidate: return candidate cleaned = re.sub(r"[\s\S]*$", "", cleaned, flags=re.IGNORECASE).strip() if not cleaned: return None if cleaned.startswith("{") and cleaned.endswith("}"): return cleaned return _find_balanced_json(cleaned) def _detect_intent_regex(query: str) -> str: """Detect intent from raw query using regex when LLM rewrite is unavailable.""" # surah_info: asking about surah metadata (verse count, type, etc.) if re.search( r"كم\s+(?:عدد\s+)?آيات?|عدد\s+آيات?|كم\s+آية|how many\s+verses?|number of\s+verses?", query, re.I, ): return "surah_info" if re.search( r"(?:هل|ما\s+نوع)\s+(?:سورة|سوره)\s+.+\s+(?:مكية|مدنية)", query, ): return "surah_info" # count: word frequency if re.search( r"كم مرة|كم تكرر|عدد مرات|تكرار|كم ذُكر|كم وردت?", query, ): return "count" if re.search( r"\b(how many times?|count|frequency|occurrences? of)\b", query, re.I, ): return "count" # auth: hadith authenticity check if re.search( r"صحيح[؟?]|هل صحيح|درجة الحديث|هل هذا حديث|is.+authentic|verify hadith|hadith.+grade", query, re.I, ): return "auth" # hadith: hadith lookup (check before tafsir since both can match Arabic text) if re.search( r"حديث\s+عن|ابحث عن حديث|ما معنى حديث|find hadith|hadith about", query, re.I, ): return "hadith" # tafsir: Quranic verse lookup — Arabic verse text or explicit tafsir request if re.search( r"ابحث عن آية|ما تفسير|تفسير آية|آية عن|الآية التي|find verse|verse about|tafsir", query, re.I, ): return "tafsir" # If query contains substantial Arabic with Quranic markers (diacritics, special chars), # treat as tafsir (verse text lookup) ar_chars = len(re.findall(r"[\u0600-\u06FF]", query)) diacritics = len(re.findall(r"[\u064B-\u0655\u0670\u06D6-\u06ED\u06E1-\u06E9\u0610-\u061A]", query)) if ar_chars > 10 and diacritics >= 3: return "tafsir" return "general" async def rewrite_query(raw: str, llm: LLMProvider) -> Dict: """Rewrite query for better retrieval.""" cached = await rewrite_cache.get(raw) if cached: return cached detected_intent = _detect_intent_regex(raw) fallback = { "ar_query": normalize_arabic(raw), "en_query": raw, "keywords": [light_stem(t) for t in tokenize_ar(raw)][:7], "intent": detected_intent, } try: messages = [ {"role": "system", "content": _REWRITE_SYSTEM}, {"role": "user", "content": raw}, ] text = "" for _attempt, temp in enumerate((0.0, 0.3)): text = await llm.chat( messages=messages, max_tokens=1024, temperature=temp, ) if text.strip(): break logger.warning("Empty rewrite response (attempt %d), retrying with temperature=%.1f", _attempt + 1, 0.3) logger.debug("Raw rewrite response (%d chars): %.300s", len(text), text) json_payload = _extract_json_object(text) if not json_payload: raise ValueError( f"Model did not return a JSON object (got {len(text)} chars: {text[:120]!r})" ) result = json.loads(json_payload) for k in ("ar_query", "en_query", "keywords", "intent"): result.setdefault(k, fallback[k]) if not isinstance(result.get("keywords"), list): result["keywords"] = fallback["keywords"] else: result["keywords"] = [str(x).strip() for x in result["keywords"] if str(x).strip()][:7] if not result["keywords"]: result["keywords"] = fallback["keywords"] result["intent"] = str(result.get("intent") or fallback["intent"]).strip().lower() if result["intent"] == "fatwa": result["intent"] = "general" if result["intent"] not in {"tafsir", "hadith", "count", "surah_info", "auth", "general"}: result["intent"] = fallback["intent"] result["ar_query"] = str(result.get("ar_query") or fallback["ar_query"]).strip()[:400] result["en_query"] = str(result.get("en_query") or fallback["en_query"]).strip()[:400] await rewrite_cache.set(result, raw) logger.info("Rewrite: intent=%s ar=%s", result["intent"], result["ar_query"][:60]) return result except Exception as exc: logger.warning("Query rewrite failed (%s) — using fallback (intent=%s)", exc, fallback["intent"]) return fallback def result_key(item: dict) -> tuple: """Build a stable key for deduplicating search results.""" item_type = item.get("type", "") if item_type == "quran": return ( "quran", int(item.get("surah_number") or 0), int(item.get("ayah_number") or item.get("verse_number") or 0), ) if item_type == "hadith": return ( "hadith", normalize_arabic(item.get("collection", ""), aggressive=True).lower(), int(item.get("hadith_number") or 0), normalize_arabic(item.get("reference", ""), aggressive=True).lower(), ) return ( item_type, normalize_arabic(item.get("source") or item.get("reference", ""), aggressive=True).lower(), normalize_arabic(item.get("arabic", "")[:80], aggressive=True).lower(), item.get("english", "")[:80].lower(), ) def merge_search_results(*result_groups: list, limit: Optional[int] = None) -> list: """Merge multiple ranked result groups, deduplicating by stable content key.""" merged: dict[tuple, dict] = {} for item in chain.from_iterable(result_groups): key = result_key(item) current = merged.get(key) if current is None or item.get("_score", 0.0) > current.get("_score", 0.0): merged[key] = item results = sorted(merged.values(), key=lambda row: row.get("_score", 0.0), reverse=True) return results[:limit] if limit is not None else results def normalize_collection_name(text: str) -> Optional[str]: """Resolve a collection alias to the canonical dataset collection name.""" if not text: return None normalized = normalize_arabic(text, aggressive=True).lower() normalized = normalized.replace("_", " ") normalized = re.sub(r"[^a-z0-9\u0600-\u06FF\s'\-]+", " ", normalized) normalized = re.sub(r"\s+", " ", normalized).strip() for alias, canonical in _SORTED_COLLECTION_ALIASES: if alias in normalized: return canonical return None def filter_results_by_collection(results: list, collection: Optional[str]) -> list: """Filter hadith results by canonical or fuzzy collection name.""" if not collection: return list(results) canonical = normalize_collection_name(collection) collection_norm = normalize_arabic(collection, aggressive=True).lower().strip() filtered = [] for item in results: haystack = normalize_arabic( f"{item.get('collection', '')} {item.get('reference', '')}", aggressive=True, ).lower() if canonical and item.get("collection", "") == canonical: filtered.append(item) continue if collection_norm and collection_norm in haystack: filtered.append(item) return filtered def _surah_matches(item: dict, surah_query: str) -> bool: query_norm = normalize_arabic(surah_query, aggressive=True).lower().strip() query_clean = re.sub(r"^(ال|al[\-\s']*)", "", query_norm, flags=re.I).strip() for field in ("surah_name_ar", "surah_name_en", "surah_name_transliteration"): value = item.get(field, "") if not value: continue value_norm = normalize_arabic(value, aggressive=True).lower().strip() value_clean = re.sub(r"^(ال|al[\-\s']*)", "", value_norm, flags=re.I).strip() if query_norm == value_norm or query_clean == value_clean: return True if query_clean and value_clean and (query_clean in value_clean or value_clean in query_clean): return True return False def lookup_quran_verses(query: str, dataset: list, limit: int = 5) -> list: """Resolve direct Quran references like 2:255 or Surah Al-Baqarah 255.""" if not query: return [] matches = [] numeric = _QURAN_REF_NUMERIC.search(query) if numeric: surah_num, verse_num = int(numeric.group(1)), int(numeric.group(2)) for item in dataset: if item.get("type") != "quran": continue if item.get("surah_number") == surah_num and (item.get("ayah_number") or item.get("verse_number")) == verse_num: matches.append({**item, "_score": 9.5}) return matches named_patterns = ( (_QURAN_REF_AR_NAME_FIRST, lambda m: (m.group(1), int(m.group(2)))), (_QURAN_REF_AR_VERSE_FIRST, lambda m: (m.group(2), int(m.group(1)))), (_QURAN_REF_EN_NAME_FIRST, lambda m: (m.group(1), int(m.group(2)))), (_QURAN_REF_EN_VERSE_FIRST, lambda m: (m.group(2), int(m.group(1)))), ) for pattern, extractor in named_patterns: match = pattern.search(query) if not match: continue surah_query, verse_num = extractor(match) for item in dataset: if item.get("type") != "quran": continue if _surah_matches(item, surah_query) and (item.get("ayah_number") or item.get("verse_number")) == verse_num: matches.append({**item, "_score": 9.0}) if matches: break return matches[:limit] def lookup_hadith_references(query: str, dataset: list, collection: Optional[str] = None, limit: int = 5) -> list: """Resolve direct hadith references like Bukhari 1 or مسلم 1907.""" if not query and not collection: return [] canonical_collection = normalize_collection_name(collection or "") or normalize_collection_name(query) number_match = re.search(r"\b(\d{1,5})\b", query) if not canonical_collection or not number_match: return [] hadith_number = int(number_match.group(1)) matches = [] for item in dataset: if item.get("type") != "hadith": continue if item.get("collection") != canonical_collection: continue if int(item.get("hadith_number") or 0) != hadith_number: continue matches.append({**item, "_score": 9.0}) return matches[:limit] # ═══════════════════════════════════════════════════════════════════════ # BM25 SCORING # ═══════════════════════════════════════════════════════════════════════ def _bm25_score( query_terms: List[str], doc_text: str, avg_dl: float, k1: float = 1.5, b: float = 0.75, ) -> float: """BM25 term-frequency scoring.""" doc_tokens = tokenize_ar(doc_text) dl = len(doc_tokens) tf = Counter(doc_tokens) score = 0.0 for term in query_terms: f = tf.get(term, 0) score += (f * (k1 + 1)) / (f + k1 * (1 - b + b * dl / max(avg_dl, 1))) return score # ═══════════════════════════════════════════════════════════════════════ # HYBRID SEARCH — dense FAISS + BM25 re-ranking + filtering # ═══════════════════════════════════════════════════════════════════════ async def hybrid_search( raw_query: str, rewrite: Dict, embed_model: SentenceTransformer, index: faiss.Index, dataset: list, top_n: int = cfg.TOP_K_RETURN, source_type: Optional[Literal["quran", "hadith"]] = None, grade_filter: Optional[str] = None, ) -> list: """Hybrid search: dense + sparse with optional filtering.""" cache_key = (raw_query, top_n, source_type, grade_filter) cached = await search_cache.get(*cache_key) if cached: return cached # ── 1. Dual-language dense retrieval ────────────────────────────── ar_q = "query: " + rewrite["ar_query"] en_q = "query: " + rewrite["en_query"] embeddings = embed_model.encode( [ar_q, en_q], normalize_embeddings=True, batch_size=2 ).astype("float32") fused = embeddings[0] + embeddings[1] fused /= np.linalg.norm(fused) distances, indices = index.search(fused.reshape(1, -1), cfg.TOP_K_SEARCH) # ── 2. De-duplicate candidates & apply filters ───────────────────── seen: set = set() candidates = [] for dist, idx in zip(distances[0], indices[0]): item_idx = int(idx) if item_idx not in seen and 0 <= item_idx < len(dataset): seen.add(item_idx) item = dataset[item_idx] if source_type and item.get("type") != source_type: continue if grade_filter and item.get("type") == "hadith": item_grade = item.get("grade", "").lower() if grade_filter.lower() not in item_grade: continue candidates.append({**item, "_dense": float(dist)}) if not candidates: return [] # ── 3. BM25 sparse scoring ───────────────────────────────────────── query_terms = [ light_stem(kw) for kw in rewrite.get("keywords", raw_query.split()) ] avg_dl = sum( len(tokenize_ar(c.get("arabic", "") + " " + c.get("english", ""))) for c in candidates ) / max(len(candidates), 1) for c in candidates: doc = c.get("arabic", "") + " " + c.get("english", "") c["_sparse"] = _bm25_score(query_terms, doc, avg_dl) # ── 3.5. Phrase matching boost for exact snippets ─────────────────── query_norm = normalize_arabic(raw_query, aggressive=False).lower() for c in candidates: if c.get("type") == "hadith": ar_norm = normalize_arabic(c.get("arabic", ""), aggressive=False).lower() query_fragments = query_norm.split() for i in range(len(query_fragments) - 2): phrase = " ".join(query_fragments[i:i+3]) if len(phrase) > 5 and phrase in ar_norm: c["_sparse"] += 2.0 break # ── 4. Score fusion ──────────────────────────────────────────────── α = cfg.RERANK_ALPHA intent = rewrite.get("intent", "general") if intent == "auth": α = 0.75 max_sparse = max((c["_sparse"] for c in candidates), default=1.0) or 1.0 for c in candidates: base_score = α * c["_dense"] + (1 - α) * c["_sparse"] / max_sparse if intent == "hadith" and c.get("type") == "hadith": base_score += cfg.HADITH_BOOST c["_score"] = base_score candidates.sort(key=lambda x: x["_score"], reverse=True) results = candidates[:top_n] await search_cache.set(results, *cache_key) return results # ═══════════════════════════════════════════════════════════════════════ # TEXT-BASED SEARCH (exact substring + fuzzy matching) # ═══════════════════════════════════════════════════════════════════════ def text_search( query: str, dataset: list, source_type: Optional[Literal["quran", "hadith"]] = None, limit: int = 10, ) -> list: """Search dataset by exact text match (Arabic or English). Returns items sorted by relevance: exact matches first, then partial. """ q_norm = normalize_arabic(query, aggressive=True).lower() q_lower = query.lower().strip() results = [] for item in dataset: if source_type and item.get("type") != source_type: continue ar_raw = item.get("arabic", "") en_raw = item.get("english", "") ar_norm = normalize_arabic(ar_raw, aggressive=True).lower() en_lower = en_raw.lower() score = 0.0 # Exact substring in raw Arabic (with diacritics) — highest priority if query.strip() in ar_raw: score = max(score, 5.0) # Exact substring in normalized Arabic if q_norm and q_norm in ar_norm: # Boost for shorter docs (more specific match) score = max(score, 3.0 + (1.0 / max(len(ar_norm), 1)) * 100) # Exact substring in English if q_lower and q_lower in en_lower: score = max(score, 2.0 + (1.0 / max(len(en_lower), 1)) * 100) # N-gram phrase matching for partial Arabic text (3+ word sequences) if score == 0.0 and q_norm: q_words = q_norm.split() if len(q_words) >= 3: # Check sliding windows of 3 words from query against doc for i in range(len(q_words) - 2): trigram = " ".join(q_words[i:i+3]) if trigram in ar_norm: score = max(score, 2.0 + (i == 0) * 0.5) break if score == 0.0 and len(q_words) >= 2: for i in range(len(q_words) - 1): bigram = " ".join(q_words[i:i+2]) if bigram in ar_norm or bigram in en_lower: score = max(score, 1.5) break # Word-level overlap for lower-confidence matches if score == 0.0: q_tokens = set(q_norm.split()) ar_tokens = set(ar_norm.split()) en_tokens = set(en_lower.split()) ar_overlap = len(q_tokens & ar_tokens) en_overlap = len(q_tokens & en_tokens) best_overlap = max(ar_overlap, en_overlap) if best_overlap >= max(2, len(q_tokens) * 0.5): score = best_overlap / max(len(q_tokens), 1) # Fuzzy similarity — catch 80%+ similar text (typos, slight differences) if score == 0.0 and len(q_norm) >= 10: q_len = len(q_norm) for text in (ar_norm, en_lower): if not text: continue # Only compare when lengths are comparable (within 3x) if len(text) > q_len * 3: continue ratio = SequenceMatcher(None, q_norm, text).ratio() if ratio >= 0.80: score = max(score, 1.0 + ratio) # 1.80–2.0 range if score > 0: results.append({**item, "_score": score}) results.sort(key=lambda x: x["_score"], reverse=True) return results[:limit] def build_context(results: list) -> str: """Format search results into context block for LLM.""" lines = [] for i, r in enumerate(results, 1): source = r.get("source") or r.get("reference") or "Unknown Source" item_type = "Quranic Verse" if r.get("type") == "quran" else "Hadith" grade_str = f" [Grade: {r.get('grade')}]" if r.get("grade") else "" block = ( f"[{i}] 📌 {item_type}{grade_str} | {source} | score: {r.get('_score', 0):.3f}\n" f" Arabic : {r.get('arabic', '')}\n" f" English: {r.get('english', '')}" ) # Add explicit structured metadata for Quran verses if r.get("type") == "quran": block += ( f"\n [REF] Surah: {r.get('surah_name_ar', '')} ({r.get('surah_name_en', '')}) " f"| Surah Number: {r.get('surah_number', '')} " f"| Ayah: {r.get('verse_number', '')}" ) lines.append(block) return "\n\n".join(lines)