# QModel — app/search.py (uploaded via huggingface_hub, revision efd73f6, by aelgendy)
"""Hybrid search engine — dense FAISS + BM25 re-ranking + text search."""
from __future__ import annotations
import json
import logging
import re
from collections import Counter
from itertools import chain
from difflib import SequenceMatcher
from typing import Dict, List, Literal, Optional
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
from app.arabic_nlp import light_stem, normalize_arabic, tokenize_ar
from app.cache import rewrite_cache, search_cache
from app.config import cfg
from app.llm import LLMProvider
logger = logging.getLogger("qmodel.search")

# ═══════════════════════════════════════════════════════════════════════
# QUERY REWRITING
# ═══════════════════════════════════════════════════════════════════════

# System prompt for the LLM query-rewrite step. The model must reply with a
# bare JSON object carrying ar_query / en_query / keywords / intent; the six
# intent labels defined here match the whitelist enforced in rewrite_query().
_REWRITE_SYSTEM: str = """\
You are an Islamic-scholarship search query optimizer.
Your ONLY job: rewrite the user's question to maximise retrieval from a Quranic + Hadith dataset.
Reply ONLY with a valid JSON object — no markdown, no preamble:
{
"ar_query": "<query in clear Arabic فصحى, ≤25 words>",
"en_query": "<query in clear English, ≤25 words>",
"keywords": ["<3-7 key Arabic or English terms from the question>"],
"intent": "<one of: tafsir | hadith | count | surah_info | auth | general>"
}
Intent Detection Rules (CRITICAL):
- 'tafsir' intent = looking up Quranic verse(s) by partial text, topic, word, or asking about meaning
(ابحث عن آية, find verse, ما تفسير, verse about X, آية عن, الآية التي فيها, verse that says)
IMPORTANT: When user provides Arabic verse text to find, put that text in ar_query verbatim.
- 'hadith' intent = looking up Hadith by text, topic, or asking about meaning (NOT authenticity)
(ابحث عن حديث, find hadith, hadith about, حديث عن, ما معنى حديث, hadith that says)
IMPORTANT: When user provides Arabic hadith text to find, put that text in ar_query verbatim.
- 'auth' intent = asking about Hadith authenticity/grade/verification
(صحيح؟, هل صحيح, is it authentic, verify hadith, درجة الحديث, is this hadith real, هل هذا حديث صحيح)
IMPORTANT: Include the hadith text fragment in ar_query for matching.
- 'surah_info' intent = asking about surah metadata: verse count, revelation type, surah number
(كم عدد آيات سورة, كم آية في سورة, how many verses in surah, is surah X meccan/medinan)
- 'count' intent = asking for WORD frequency/occurrence count
(كم مرة ذُكرت كلمة, how many times is word X mentioned, عدد مرات ذكر كلمة)
NOTE: "كم عدد آيات سورة" is surah_info NOT count!
IMPORTANT: The word being counted MUST be the first keyword.
- 'general' intent = other Islamic questions
Rewriting Rules:
- For verse/hadith text lookups: include the EXACT Arabic text fragment in ar_query
- For topic searches: expand the topic with Arabic synonyms and related terms in keywords
- For word frequency: extract the EXACT keyword being counted as the FIRST keyword
- keywords MUST include core Arabic terms for matching (e.g. صبر, رحمة, صلاة)
Examples:
- "ابحث عن الآية التي فيها إنا أعطيناك الكوثر" → intent: tafsir, ar_query: "إنا أعطيناك الكوثر"
- "Find the verse about patience" → intent: tafsir, keywords: ["صبر", "patience", "الصبر"]
- "ما الآية التي تتحدث عن الصدقة" → intent: tafsir, keywords: ["صدقة", "الصدقة", "إنفاق"]
- "كم عدد آيات سورة آل عمران" → intent: surah_info
- "كم آية في سورة البقرة" → intent: surah_info
- "how many verses in surah al-baqara" → intent: surah_info
- "هل سورة الفاتحة مكية أم مدنية" → intent: surah_info
- "كم مرة ذُكرت كلمة مريم في القرآن" → intent: count, keywords: ["مريم", ...]
- "how many times is mercy mentioned in Quran" → intent: count, keywords: ["رحمة", "mercy", "الرحمة"]
- "هل حديث إنما الأعمال بالنيات صحيح" → intent: auth, ar_query: "إنما الأعمال بالنيات"
- "is the hadith about actions by intentions authentic" → intent: auth, keywords: ["إنما الأعمال بالنيات", "actions", "intentions"]
- "ما معنى حديث إنما الأعمال" → intent: hadith, ar_query: "إنما الأعمال"
- "ابحث عن حديث عن الصبر" → intent: hadith, keywords: ["صبر", "الصبر", "patience"]
- "find hadith about fasting" → intent: hadith, keywords: ["صيام", "صوم", "fasting"]
- "ما حكم الربا في الإسلام" → intent: general, keywords: ["ربا", "الربا", "usury"]
- "هل الحديث ده صحيح: من كان يؤمن بالله" → intent: auth, ar_query: "من كان يؤمن بالله"
"""
# "2:255"-style reference: surah and ayah numbers separated by a colon.
_QURAN_REF_NUMERIC = re.compile(r"\b(\d{1,3})\s*:\s*(\d{1,3})\b")
# Arabic reference with the surah name before the verse number,
# e.g. "سورة البقرة آية 255" (group 1 = surah name, group 2 = ayah).
_QURAN_REF_AR_NAME_FIRST = re.compile(
    r"(?:سورة|سوره)\s+([\u0621-\u06FF\u0750-\u077F\s]+?)"
    r"(?:\s+(?:الآية|آية|الايه|ايه)\s*|[\s,:،-]+)(\d{1,3})",
)
# Arabic reference with the verse number first,
# e.g. "آية 255 من سورة البقرة" (group 1 = ayah, group 2 = surah name).
_QURAN_REF_AR_VERSE_FIRST = re.compile(
    r"(?:الآية|آية|الايه|ايه)\s*(\d{1,3})\s+(?:من|في)\s+(?:سورة|سوره)\s+([\u0621-\u06FF\u0750-\u077F\s]+)",
)
# English reference with the surah name first, e.g. "surah al-baqarah 255"
# or "surah al-baqarah verse 255" (group 1 = name, group 2 = verse).
_QURAN_REF_EN_NAME_FIRST = re.compile(
    r"(?:surah|sura)\s+([A-Za-z'\- ]+?)"
    r"(?:\s+(?:ayah|verse|ayat)\s*|[\s,:-]+)(\d{1,3})",
    re.I,
)
# English reference with the verse number first,
# e.g. "verse 255 of surah al-baqarah" (group 1 = verse, group 2 = name).
_QURAN_REF_EN_VERSE_FIRST = re.compile(
    r"(?:ayah|verse|ayat)\s*(\d{1,3})\s+(?:of|in)\s+(?:surah|sura)\s+([A-Za-z'\- ]+)",
    re.I,
)
_COLLECTION_ALIASES = {
"sahih al-bukhari": "Sahih al-Bukhari",
"sahih bukhari": "Sahih al-Bukhari",
"al bukhari": "Sahih al-Bukhari",
"bukhari": "Sahih al-Bukhari",
"البخاري": "Sahih al-Bukhari",
"صحيح البخاري": "Sahih al-Bukhari",
"sahih muslim": "Sahih Muslim",
"muslim": "Sahih Muslim",
"مسلم": "Sahih Muslim",
"صحيح مسلم": "Sahih Muslim",
"sunan an nasai": "Sunan an-Nasai",
"sunan an-nasai": "Sunan an-Nasai",
"nasai": "Sunan an-Nasai",
"nasa'i": "Sunan an-Nasai",
"نسائي": "Sunan an-Nasai",
"النسائي": "Sunan an-Nasai",
"sunan abu dawood": "Sunan Abu Dawood",
"sunan abu dawood": "Sunan Abu Dawood",
"abu dawood": "Sunan Abu Dawood",
"abu dawood": "Sunan Abu Dawood",
"أبو داود": "Sunan Abu Dawood",
"ابو داود": "Sunan Abu Dawood",
"jami at tirmidhi": "Jami' at-Tirmidhi",
"jami at-tirmidhi": "Jami' at-Tirmidhi",
"tirmidhi": "Jami' at-Tirmidhi",
"الترمذي": "Jami' at-Tirmidhi",
"ترمذي": "Jami' at-Tirmidhi",
"sunan ibn majah": "Sunan Ibn Majah",
"ibn majah": "Sunan Ibn Majah",
"ابن ماجه": "Sunan Ibn Majah",
"sunan al darimi": "Sunan al-Darimi",
"sunan al-darimi": "Sunan al-Darimi",
"darimi": "Sunan al-Darimi",
"الدارمي": "Sunan al-Darimi",
"muwatta malik": "Muwatta Malik",
"muwatta": "Muwatta Malik",
"موطأ مالك": "Muwatta Malik",
"موطا مالك": "Muwatta Malik",
"malik": "Muwatta Malik",
"musnad ahmad": "Musnad Ahmad",
"ahmad": "Musnad Ahmad",
"ahmed": "Musnad Ahmad",
"أحمد": "Musnad Ahmad",
"مسند أحمد": "Musnad Ahmad",
}
_SORTED_COLLECTION_ALIASES = sorted(_COLLECTION_ALIASES.items(), key=lambda item: len(item[0]), reverse=True)
def _find_balanced_json(text: str) -> Optional[str]:
"""Find the first balanced {...} in *text*; return it or None."""
start = text.find("{")
if start == -1:
return None
depth = 0
in_string = False
escaped = False
for idx in range(start, len(text)):
ch = text[idx]
if escaped:
escaped = False
continue
if ch == "\\":
escaped = True
continue
if ch == '"':
in_string = not in_string
continue
if in_string:
continue
if ch == "{":
depth += 1
elif ch == "}":
depth -= 1
if depth == 0:
return text[start:idx + 1]
return None
def _extract_json_object(text: str) -> Optional[str]:
    """Extract the first balanced JSON object from model output."""
    if not text:
        return None
    # Drop markdown code-fence wrappers some models emit around JSON.
    stripped = re.sub(r"```(?:json)?", "", text, flags=re.IGNORECASE)
    stripped = stripped.replace("```", "")
    # Closed <think>...</think> reasoning blocks never hold the answer.
    stripped = re.sub(r"<think>[\s\S]*?</think>", "", stripped, flags=re.IGNORECASE)
    stripped = stripped.strip()
    if "{" in stripped:
        # The JSON may sit inside a truncated (unclosed) <think> block, so
        # attempt extraction before discarding everything after "<think>".
        found = _find_balanced_json(stripped)
        if found:
            return found
    # No braces at all, or extraction failed: drop any unclosed think tail.
    stripped = re.sub(r"<think>[\s\S]*$", "", stripped, flags=re.IGNORECASE).strip()
    if not stripped:
        return None
    if stripped.startswith("{") and stripped.endswith("}"):
        return stripped
    return _find_balanced_json(stripped)
def _detect_intent_regex(query: str) -> str:
"""Detect intent from raw query using regex when LLM rewrite is unavailable."""
# surah_info: asking about surah metadata (verse count, type, etc.)
if re.search(
r"كم\s+(?:عدد\s+)?آيات?|عدد\s+آيات?|كم\s+آية|how many\s+verses?|number of\s+verses?",
query, re.I,
):
return "surah_info"
if re.search(
r"(?:هل|ما\s+نوع)\s+(?:سورة|سوره)\s+.+\s+(?:مكية|مدنية)",
query,
):
return "surah_info"
# count: word frequency
if re.search(
r"كم مرة|كم تكرر|عدد مرات|تكرار|كم ذُكر|كم وردت?",
query,
):
return "count"
if re.search(
r"\b(how many times?|count|frequency|occurrences? of)\b",
query, re.I,
):
return "count"
# auth: hadith authenticity check
if re.search(
r"صحيح[؟?]|هل صحيح|درجة الحديث|هل هذا حديث|is.+authentic|verify hadith|hadith.+grade",
query, re.I,
):
return "auth"
# hadith: hadith lookup (check before tafsir since both can match Arabic text)
if re.search(
r"حديث\s+عن|ابحث عن حديث|ما معنى حديث|find hadith|hadith about",
query, re.I,
):
return "hadith"
# tafsir: Quranic verse lookup — Arabic verse text or explicit tafsir request
if re.search(
r"ابحث عن آية|ما تفسير|تفسير آية|آية عن|الآية التي|find verse|verse about|tafsir",
query, re.I,
):
return "tafsir"
# If query contains substantial Arabic with Quranic markers (diacritics, special chars),
# treat as tafsir (verse text lookup)
ar_chars = len(re.findall(r"[\u0600-\u06FF]", query))
diacritics = len(re.findall(r"[\u064B-\u0655\u0670\u06D6-\u06ED\u06E1-\u06E9\u0610-\u061A]", query))
if ar_chars > 10 and diacritics >= 3:
return "tafsir"
return "general"
async def rewrite_query(raw: str, llm: LLMProvider) -> Dict:
    """Rewrite query for better retrieval.

    Asks the LLM (primed with ``_REWRITE_SYSTEM``) for a JSON object with
    ``ar_query`` / ``en_query`` / ``keywords`` / ``intent``.  On any failure
    (empty completion, unparseable JSON, unknown intent) a fallback built
    from regex intent detection plus light Arabic stemming of *raw* is
    returned instead.  Successful rewrites are cached by *raw*.
    """
    cached = await rewrite_cache.get(raw)
    if cached:
        return cached
    # Regex intent detection runs up front so the fallback is always ready.
    detected_intent = _detect_intent_regex(raw)
    fallback = {
        "ar_query": normalize_arabic(raw),
        "en_query": raw,
        "keywords": [light_stem(t) for t in tokenize_ar(raw)][:7],
        "intent": detected_intent,
    }
    try:
        messages = [
            {"role": "system", "content": _REWRITE_SYSTEM},
            {"role": "user", "content": raw},
        ]
        text = ""
        # Up to two attempts: deterministic first, then a warmer retry if
        # the model returned an empty completion.
        for _attempt, temp in enumerate((0.0, 0.3)):
            text = await llm.chat(
                messages=messages, max_tokens=1024, temperature=temp,
            )
            if text.strip():
                break
            logger.warning("Empty rewrite response (attempt %d), retrying with temperature=%.1f",
                           _attempt + 1, 0.3)
        logger.debug("Raw rewrite response (%d chars): %.300s", len(text), text)
        json_payload = _extract_json_object(text)
        if not json_payload:
            raise ValueError(
                f"Model did not return a JSON object (got {len(text)} chars: {text[:120]!r})"
            )
        result = json.loads(json_payload)
        # Fill any missing keys from the fallback, then sanitize each field.
        for k in ("ar_query", "en_query", "keywords", "intent"):
            result.setdefault(k, fallback[k])
        if not isinstance(result.get("keywords"), list):
            result["keywords"] = fallback["keywords"]
        else:
            result["keywords"] = [str(x).strip() for x in result["keywords"] if str(x).strip()][:7]
            if not result["keywords"]:
                result["keywords"] = fallback["keywords"]
        result["intent"] = str(result.get("intent") or fallback["intent"]).strip().lower()
        # The model sometimes invents a "fatwa" intent; fold it into general.
        if result["intent"] == "fatwa":
            result["intent"] = "general"
        if result["intent"] not in {"tafsir", "hadith", "count", "surah_info", "auth", "general"}:
            result["intent"] = fallback["intent"]
        # Clamp query lengths to keep downstream embedding inputs bounded.
        result["ar_query"] = str(result.get("ar_query") or fallback["ar_query"]).strip()[:400]
        result["en_query"] = str(result.get("en_query") or fallback["en_query"]).strip()[:400]
        await rewrite_cache.set(result, raw)
        logger.info("Rewrite: intent=%s ar=%s", result["intent"], result["ar_query"][:60])
        return result
    except Exception as exc:
        logger.warning("Query rewrite failed (%s) — using fallback (intent=%s)", exc, fallback["intent"])
        return fallback
def result_key(item: dict) -> tuple:
    """Build a stable key for deduplicating search results."""
    kind = item.get("type", "")
    if kind == "quran":
        ayah = item.get("ayah_number") or item.get("verse_number") or 0
        return ("quran", int(item.get("surah_number") or 0), int(ayah))
    if kind == "hadith":
        collection = normalize_arabic(item.get("collection", ""), aggressive=True).lower()
        reference = normalize_arabic(item.get("reference", ""), aggressive=True).lower()
        return ("hadith", collection, int(item.get("hadith_number") or 0), reference)
    # Unknown types: key on source plus text prefixes so near-identical
    # entries still collapse.
    origin = item.get("source") or item.get("reference", "")
    return (
        kind,
        normalize_arabic(origin, aggressive=True).lower(),
        normalize_arabic(item.get("arabic", "")[:80], aggressive=True).lower(),
        item.get("english", "")[:80].lower(),
    )
def merge_search_results(*result_groups: list, limit: Optional[int] = None) -> list:
    """Merge multiple ranked result groups, deduplicating by stable content key."""
    best_by_key: dict[tuple, dict] = {}
    for candidate in chain.from_iterable(result_groups):
        key = result_key(candidate)
        existing = best_by_key.get(key)
        # On duplicates, keep whichever entry carries the higher score.
        if existing is None or candidate.get("_score", 0.0) > existing.get("_score", 0.0):
            best_by_key[key] = candidate
    ranked = sorted(best_by_key.values(), key=lambda row: row.get("_score", 0.0), reverse=True)
    if limit is None:
        return ranked
    return ranked[:limit]
def normalize_collection_name(text: str) -> Optional[str]:
    """Resolve a collection alias to the canonical dataset collection name."""
    if not text:
        return None
    cleaned = normalize_arabic(text, aggressive=True).lower().replace("_", " ")
    # Keep only letters, digits, Arabic characters, apostrophes and hyphens.
    cleaned = re.sub(r"[^a-z0-9\u0600-\u06FF\s'\-]+", " ", cleaned)
    cleaned = re.sub(r"\s+", " ", cleaned).strip()
    # Longest aliases first, so specific names win over their substrings.
    return next(
        (canonical for alias, canonical in _SORTED_COLLECTION_ALIASES if alias in cleaned),
        None,
    )
def filter_results_by_collection(results: list, collection: Optional[str]) -> list:
    """Filter hadith results by canonical or fuzzy collection name."""
    if not collection:
        return list(results)
    canonical = normalize_collection_name(collection)
    needle = normalize_arabic(collection, aggressive=True).lower().strip()
    kept = []
    for entry in results:
        # Exact canonical match wins outright.
        if canonical and entry.get("collection", "") == canonical:
            kept.append(entry)
            continue
        # Otherwise fall back to a normalized substring match against the
        # entry's collection + reference text.
        haystack = normalize_arabic(
            f"{entry.get('collection', '')} {entry.get('reference', '')}",
            aggressive=True,
        ).lower()
        if needle and needle in haystack:
            kept.append(entry)
    return kept
def _surah_matches(item: dict, surah_query: str) -> bool:
    """Return True when *surah_query* names this item's surah.

    Compares against each stored name variant, both as-is and with a
    leading Arabic/English definite article ("ال" / "al-") stripped.
    """
    wanted = normalize_arabic(surah_query, aggressive=True).lower().strip()
    wanted_bare = re.sub(r"^(ال|al[\-\s']*)", "", wanted, flags=re.I).strip()
    for field in ("surah_name_ar", "surah_name_en", "surah_name_transliteration"):
        name = item.get(field, "")
        if not name:
            continue
        candidate = normalize_arabic(name, aggressive=True).lower().strip()
        candidate_bare = re.sub(r"^(ال|al[\-\s']*)", "", candidate, flags=re.I).strip()
        if wanted == candidate or wanted_bare == candidate_bare:
            return True
        # Partial containment in either direction also counts as a match.
        if wanted_bare and candidate_bare and (
            wanted_bare in candidate_bare or candidate_bare in wanted_bare
        ):
            return True
    return False
def lookup_quran_verses(query: str, dataset: list, limit: int = 5) -> list:
    """Resolve direct Quran references like ``2:255`` or ``Surah Al-Baqarah 255``.

    Tries the numeric ``surah:ayah`` form first, then the four named
    surah/verse regex patterns (Arabic and English, name-first and
    verse-first).  Returns at most *limit* matching verse items, each
    tagged with a high ``_score`` so they outrank fuzzy-search results.
    """
    if not query:
        return []
    matches = []
    numeric = _QURAN_REF_NUMERIC.search(query)
    if numeric:
        surah_num, verse_num = int(numeric.group(1)), int(numeric.group(2))
        for item in dataset:
            if item.get("type") != "quran":
                continue
            if item.get("surah_number") == surah_num and (item.get("ayah_number") or item.get("verse_number")) == verse_num:
                matches.append({**item, "_score": 9.5})
        # BUGFIX: honor *limit* on the numeric path too — it previously
        # returned every match, unlike the named-pattern path below.
        return matches[:limit]
    # Each entry: (compiled pattern, extractor returning (surah name, verse)).
    named_patterns = (
        (_QURAN_REF_AR_NAME_FIRST, lambda m: (m.group(1), int(m.group(2)))),
        (_QURAN_REF_AR_VERSE_FIRST, lambda m: (m.group(2), int(m.group(1)))),
        (_QURAN_REF_EN_NAME_FIRST, lambda m: (m.group(1), int(m.group(2)))),
        (_QURAN_REF_EN_VERSE_FIRST, lambda m: (m.group(2), int(m.group(1)))),
    )
    for pattern, extractor in named_patterns:
        match = pattern.search(query)
        if not match:
            continue
        surah_query, verse_num = extractor(match)
        for item in dataset:
            if item.get("type") != "quran":
                continue
            if _surah_matches(item, surah_query) and (item.get("ayah_number") or item.get("verse_number")) == verse_num:
                matches.append({**item, "_score": 9.0})
        # First pattern that yields matches wins; don't mix patterns.
        if matches:
            break
    return matches[:limit]
def lookup_hadith_references(query: str, dataset: list, collection: Optional[str] = None, limit: int = 5) -> list:
    """Resolve direct hadith references like Bukhari 1 or مسلم 1907."""
    if not query and not collection:
        return []
    # Prefer the explicit collection argument; otherwise look for an alias
    # embedded in the query text itself.
    target = normalize_collection_name(collection or "") or normalize_collection_name(query)
    number = re.search(r"\b(\d{1,5})\b", query)
    if not target or not number:
        return []
    wanted_number = int(number.group(1))
    hits = []
    for entry in dataset:
        if entry.get("type") != "hadith":
            continue
        if entry.get("collection") != target:
            continue
        if int(entry.get("hadith_number") or 0) == wanted_number:
            hits.append({**entry, "_score": 9.0})
    return hits[:limit]
# ═══════════════════════════════════════════════════════════════════════
# BM25 SCORING
# ═══════════════════════════════════════════════════════════════════════
def _bm25_score(
    query_terms: List[str],
    doc_text: str,
    avg_dl: float,
    k1: float = 1.5,
    b: float = 0.75,
) -> float:
    """BM25 term-frequency scoring."""
    tokens = tokenize_ar(doc_text)
    term_freq = Counter(tokens)
    # Document-length normalization term, identical for every query term.
    length_norm = k1 * (1 - b + b * len(tokens) / max(avg_dl, 1))
    total = 0.0
    for term in query_terms:
        freq = term_freq.get(term, 0)
        total += (freq * (k1 + 1)) / (freq + length_norm)
    return total
# ═══════════════════════════════════════════════════════════════════════
# HYBRID SEARCH — dense FAISS + BM25 re-ranking + filtering
# ═══════════════════════════════════════════════════════════════════════
async def hybrid_search(
    raw_query: str,
    rewrite: Dict,
    embed_model: SentenceTransformer,
    index: faiss.Index,
    dataset: list,
    top_n: int = cfg.TOP_K_RETURN,
    source_type: Optional[Literal["quran", "hadith"]] = None,
    grade_filter: Optional[str] = None,
) -> list:
    """Hybrid search: dense + sparse with optional filtering.

    Pipeline: (1) embed the Arabic and English rewrites, fuse the two
    vectors and query the FAISS index; (2) de-duplicate hits and apply
    ``source_type`` / ``grade_filter``; (3) BM25-score each candidate's
    Arabic+English text, plus a phrase-match bonus for hadith; (4) fuse the
    dense and normalized sparse scores and return the top ``top_n``.
    Results are cached per (raw_query, top_n, source_type, grade_filter).
    """
    cache_key = (raw_query, top_n, source_type, grade_filter)
    cached = await search_cache.get(*cache_key)
    if cached:
        return cached
    # ── 1. Dual-language dense retrieval ──────────────────────────────
    # NOTE(review): the "query: " prefix presumably matches the embedding
    # model's query/passage convention (E5-style) — confirm with its card.
    ar_q = "query: " + rewrite["ar_query"]
    en_q = "query: " + rewrite["en_query"]
    embeddings = embed_model.encode(
        [ar_q, en_q], normalize_embeddings=True, batch_size=2
    ).astype("float32")
    # Fuse both language queries into a single unit vector.
    fused = embeddings[0] + embeddings[1]
    fused /= np.linalg.norm(fused)
    distances, indices = index.search(fused.reshape(1, -1), cfg.TOP_K_SEARCH)
    # ── 2. De-duplicate candidates & apply filters ─────────────────────
    seen: set = set()
    candidates = []
    for dist, idx in zip(distances[0], indices[0]):
        item_idx = int(idx)
        # The bounds check drops FAISS's -1 padding for missing hits.
        if item_idx not in seen and 0 <= item_idx < len(dataset):
            seen.add(item_idx)
            item = dataset[item_idx]
            if source_type and item.get("type") != source_type:
                continue
            # Grade filter only applies to hadith items (substring match).
            if grade_filter and item.get("type") == "hadith":
                item_grade = item.get("grade", "").lower()
                if grade_filter.lower() not in item_grade:
                    continue
            candidates.append({**item, "_dense": float(dist)})
    if not candidates:
        return []
    # ── 3. BM25 sparse scoring ─────────────────────────────────────────
    query_terms = [
        light_stem(kw) for kw in rewrite.get("keywords", raw_query.split())
    ]
    # Average document length over the candidate pool (BM25 normalizer).
    avg_dl = sum(
        len(tokenize_ar(c.get("arabic", "") + " " + c.get("english", "")))
        for c in candidates
    ) / max(len(candidates), 1)
    for c in candidates:
        doc = c.get("arabic", "") + " " + c.get("english", "")
        c["_sparse"] = _bm25_score(query_terms, doc, avg_dl)
    # ── 3.5. Phrase matching boost for exact snippets ───────────────────
    # Any 3-word window of the normalized query found verbatim in a hadith's
    # Arabic text earns a flat sparse bonus (applied at most once per item).
    query_norm = normalize_arabic(raw_query, aggressive=False).lower()
    for c in candidates:
        if c.get("type") == "hadith":
            ar_norm = normalize_arabic(c.get("arabic", ""), aggressive=False).lower()
            query_fragments = query_norm.split()
            for i in range(len(query_fragments) - 2):
                phrase = " ".join(query_fragments[i:i+3])
                if len(phrase) > 5 and phrase in ar_norm:
                    c["_sparse"] += 2.0
                    break
    # ── 4. Score fusion ────────────────────────────────────────────────
    # α weights the dense score; (1 - α) weights the normalized sparse score.
    α = cfg.RERANK_ALPHA
    intent = rewrite.get("intent", "general")
    # Authenticity lookups use a fixed dense/sparse blend (α = 0.75).
    if intent == "auth":
        α = 0.75
    # Normalize sparse scores to [0, 1]; guard against an all-zero column.
    max_sparse = max((c["_sparse"] for c in candidates), default=1.0) or 1.0
    for c in candidates:
        base_score = α * c["_dense"] + (1 - α) * c["_sparse"] / max_sparse
        if intent == "hadith" and c.get("type") == "hadith":
            base_score += cfg.HADITH_BOOST
        c["_score"] = base_score
    candidates.sort(key=lambda x: x["_score"], reverse=True)
    results = candidates[:top_n]
    await search_cache.set(results, *cache_key)
    return results
# ═══════════════════════════════════════════════════════════════════════
# TEXT-BASED SEARCH (exact substring + fuzzy matching)
# ═══════════════════════════════════════════════════════════════════════
def text_search(
    query: str,
    dataset: list,
    source_type: Optional[Literal["quran", "hadith"]] = None,
    limit: int = 10,
) -> list:
    """Search dataset by exact text match (Arabic or English).

    Returns items sorted by relevance: exact matches first, then partial.
    Scoring tiers (later tiers only run while the score is still 0):
      5.0     exact substring in raw (diacritized) Arabic
      3.0+    exact substring in normalized Arabic (shorter docs boosted)
      2.0+    exact substring in English (shorter docs boosted)
      2.0/2.5 a 3-word query window found in normalized Arabic
      1.5     a 2-word query window found in Arabic or English
      <1.0    token-overlap ratio (needs >= half the query tokens, min 2)
      1.8-2.0 fuzzy SequenceMatcher ratio >= 0.80
    """
    q_norm = normalize_arabic(query, aggressive=True).lower()
    q_lower = query.lower().strip()
    results = []
    for item in dataset:
        if source_type and item.get("type") != source_type:
            continue
        ar_raw = item.get("arabic", "")
        en_raw = item.get("english", "")
        ar_norm = normalize_arabic(ar_raw, aggressive=True).lower()
        en_lower = en_raw.lower()
        score = 0.0
        # Exact substring in raw Arabic (with diacritics) — highest priority
        if query.strip() in ar_raw:
            score = max(score, 5.0)
        # Exact substring in normalized Arabic
        if q_norm and q_norm in ar_norm:
            # Boost for shorter docs (more specific match)
            score = max(score, 3.0 + (1.0 / max(len(ar_norm), 1)) * 100)
        # Exact substring in English
        if q_lower and q_lower in en_lower:
            score = max(score, 2.0 + (1.0 / max(len(en_lower), 1)) * 100)
        # N-gram phrase matching for partial Arabic text (3+ word sequences)
        if score == 0.0 and q_norm:
            q_words = q_norm.split()
            if len(q_words) >= 3:
                # Check sliding windows of 3 words from query against doc
                for i in range(len(q_words) - 2):
                    trigram = " ".join(q_words[i:i+3])
                    if trigram in ar_norm:
                        # A match on the query's leading window scores higher.
                        score = max(score, 2.0 + (i == 0) * 0.5)
                        break
            if score == 0.0 and len(q_words) >= 2:
                for i in range(len(q_words) - 1):
                    bigram = " ".join(q_words[i:i+2])
                    if bigram in ar_norm or bigram in en_lower:
                        score = max(score, 1.5)
                        break
        # Word-level overlap for lower-confidence matches
        if score == 0.0:
            q_tokens = set(q_norm.split())
            ar_tokens = set(ar_norm.split())
            en_tokens = set(en_lower.split())
            ar_overlap = len(q_tokens & ar_tokens)
            en_overlap = len(q_tokens & en_tokens)
            best_overlap = max(ar_overlap, en_overlap)
            # Require at least half the query tokens (and no fewer than 2).
            if best_overlap >= max(2, len(q_tokens) * 0.5):
                score = best_overlap / max(len(q_tokens), 1)
        # Fuzzy similarity — catch 80%+ similar text (typos, slight differences)
        if score == 0.0 and len(q_norm) >= 10:
            q_len = len(q_norm)
            for text in (ar_norm, en_lower):
                if not text:
                    continue
                # Only compare when lengths are comparable (within 3x)
                if len(text) > q_len * 3:
                    continue
                ratio = SequenceMatcher(None, q_norm, text).ratio()
                if ratio >= 0.80:
                    score = max(score, 1.0 + ratio)  # 1.80–2.0 range
        if score > 0:
            results.append({**item, "_score": score})
    results.sort(key=lambda x: x["_score"], reverse=True)
    return results[:limit]
def build_context(results: list) -> str:
    """Format search results into context block for LLM."""
    entries = []
    for rank, row in enumerate(results, 1):
        origin = row.get("source") or row.get("reference") or "Unknown Source"
        label = "Quranic Verse" if row.get("type") == "quran" else "Hadith"
        grade = f" [Grade: {row.get('grade')}]" if row.get("grade") else ""
        # Header line plus the Arabic/English text pair.
        entry = "\n".join([
            f"[{rank}] 📌 {label}{grade} | {origin} | score: {row.get('_score', 0):.3f}",
            f" Arabic : {row.get('arabic', '')}",
            f" English: {row.get('english', '')}",
        ])
        # Quran verses additionally get explicit structured reference metadata.
        if row.get("type") == "quran":
            entry += (
                f"\n [REF] Surah: {row.get('surah_name_ar', '')} ({row.get('surah_name_en', '')}) "
                f"| Surah Number: {row.get('surah_number', '')} "
                f"| Ayah: {row.get('verse_number', '')}"
            )
        entries.append(entry)
    return "\n\n".join(entries)