| """Hybrid search engine — dense FAISS + BM25 re-ranking + text search.""" |
|
|
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import re |
| from collections import Counter |
| from itertools import chain |
| from difflib import SequenceMatcher |
| from typing import Dict, List, Literal, Optional |
|
|
| import faiss |
| import numpy as np |
| from sentence_transformers import SentenceTransformer |
|
|
| from app.arabic_nlp import light_stem, normalize_arabic, tokenize_ar |
| from app.cache import rewrite_cache, search_cache |
| from app.config import cfg |
| from app.llm import LLMProvider |
|
|
| logger = logging.getLogger("qmodel.search") |
|
|
|
|
| |
| |
| |
# System prompt for the LLM query-rewriting step (consumed by rewrite_query).
# The model must reply with a single JSON object: rewritten Arabic/English
# queries, extracted keywords, and one of six intent labels.
# NOTE: this string is runtime data — rewrite_query parses the JSON schema
# and intent names it promises, so edit the wording with care.
_REWRITE_SYSTEM = """\
You are an Islamic-scholarship search query optimizer.
Your ONLY job: rewrite the user's question to maximise retrieval from a Quranic + Hadith dataset.

Reply ONLY with a valid JSON object — no markdown, no preamble:
{
"ar_query": "<query in clear Arabic فصحى, ≤25 words>",
"en_query": "<query in clear English, ≤25 words>",
"keywords": ["<3-7 key Arabic or English terms from the question>"],
"intent": "<one of: tafsir | hadith | count | surah_info | auth | general>"
}

Intent Detection Rules (CRITICAL):
- 'tafsir' intent = looking up Quranic verse(s) by partial text, topic, word, or asking about meaning
(ابحث عن آية, find verse, ما تفسير, verse about X, آية عن, الآية التي فيها, verse that says)
IMPORTANT: When user provides Arabic verse text to find, put that text in ar_query verbatim.
- 'hadith' intent = looking up Hadith by text, topic, or asking about meaning (NOT authenticity)
(ابحث عن حديث, find hadith, hadith about, حديث عن, ما معنى حديث, hadith that says)
IMPORTANT: When user provides Arabic hadith text to find, put that text in ar_query verbatim.
- 'auth' intent = asking about Hadith authenticity/grade/verification
(صحيح؟, هل صحيح, is it authentic, verify hadith, درجة الحديث, is this hadith real, هل هذا حديث صحيح)
IMPORTANT: Include the hadith text fragment in ar_query for matching.
- 'surah_info' intent = asking about surah metadata: verse count, revelation type, surah number
(كم عدد آيات سورة, كم آية في سورة, how many verses in surah, is surah X meccan/medinan)
- 'count' intent = asking for WORD frequency/occurrence count
(كم مرة ذُكرت كلمة, how many times is word X mentioned, عدد مرات ذكر كلمة)
NOTE: "كم عدد آيات سورة" is surah_info NOT count!
IMPORTANT: The word being counted MUST be the first keyword.
- 'general' intent = other Islamic questions

Rewriting Rules:
- For verse/hadith text lookups: include the EXACT Arabic text fragment in ar_query
- For topic searches: expand the topic with Arabic synonyms and related terms in keywords
- For word frequency: extract the EXACT keyword being counted as the FIRST keyword
- keywords MUST include core Arabic terms for matching (e.g. صبر, رحمة, صلاة)

Examples:
- "ابحث عن الآية التي فيها إنا أعطيناك الكوثر" → intent: tafsir, ar_query: "إنا أعطيناك الكوثر"
- "Find the verse about patience" → intent: tafsir, keywords: ["صبر", "patience", "الصبر"]
- "ما الآية التي تتحدث عن الصدقة" → intent: tafsir, keywords: ["صدقة", "الصدقة", "إنفاق"]
- "كم عدد آيات سورة آل عمران" → intent: surah_info
- "كم آية في سورة البقرة" → intent: surah_info
- "how many verses in surah al-baqara" → intent: surah_info
- "هل سورة الفاتحة مكية أم مدنية" → intent: surah_info
- "كم مرة ذُكرت كلمة مريم في القرآن" → intent: count, keywords: ["مريم", ...]
- "how many times is mercy mentioned in Quran" → intent: count, keywords: ["رحمة", "mercy", "الرحمة"]
- "هل حديث إنما الأعمال بالنيات صحيح" → intent: auth, ar_query: "إنما الأعمال بالنيات"
- "is the hadith about actions by intentions authentic" → intent: auth, keywords: ["إنما الأعمال بالنيات", "actions", "intentions"]
- "ما معنى حديث إنما الأعمال" → intent: hadith, ar_query: "إنما الأعمال"
- "ابحث عن حديث عن الصبر" → intent: hadith, keywords: ["صبر", "الصبر", "patience"]
- "find hadith about fasting" → intent: hadith, keywords: ["صيام", "صوم", "fasting"]
- "ما حكم الربا في الإسلام" → intent: general, keywords: ["ربا", "الربا", "usury"]
- "هل الحديث ده صحيح: من كان يؤمن بالله" → intent: auth, ar_query: "من كان يؤمن بالله"
"""
|
|
|
|
# Pre-compiled patterns that recognise direct Quran references in a query.
# Each captures the surah (name or number) and an ayah number 1-999.

# Numeric "surah:verse" form, e.g. "2:255".
_QURAN_REF_NUMERIC = re.compile(r"\b(\d{1,3})\s*:\s*(\d{1,3})\b")
# Arabic, surah name first, e.g. "سورة البقرة آية 255" → groups (name, verse).
_QURAN_REF_AR_NAME_FIRST = re.compile(
    r"(?:سورة|سوره)\s+([\u0621-\u06FF\u0750-\u077F\s]+?)"
    r"(?:\s+(?:الآية|آية|الايه|ايه)\s*|[\s,:،-]+)(\d{1,3})",
)
# Arabic, verse number first, e.g. "آية 255 من سورة البقرة" → groups (verse, name).
_QURAN_REF_AR_VERSE_FIRST = re.compile(
    r"(?:الآية|آية|الايه|ايه)\s*(\d{1,3})\s+(?:من|في)\s+(?:سورة|سوره)\s+([\u0621-\u06FF\u0750-\u077F\s]+)",
)
# English, surah name first, e.g. "surah al-baqara verse 255" → groups (name, verse).
_QURAN_REF_EN_NAME_FIRST = re.compile(
    r"(?:surah|sura)\s+([A-Za-z'\- ]+?)"
    r"(?:\s+(?:ayah|verse|ayat)\s*|[\s,:-]+)(\d{1,3})",
    re.I,
)
# English, verse number first, e.g. "verse 255 of surah al-baqara" → groups (verse, name).
_QURAN_REF_EN_VERSE_FIRST = re.compile(
    r"(?:ayah|verse|ayat)\s*(\d{1,3})\s+(?:of|in)\s+(?:surah|sura)\s+([A-Za-z'\- ]+)",
    re.I,
)
|
|
| _COLLECTION_ALIASES = { |
| "sahih al-bukhari": "Sahih al-Bukhari", |
| "sahih bukhari": "Sahih al-Bukhari", |
| "al bukhari": "Sahih al-Bukhari", |
| "bukhari": "Sahih al-Bukhari", |
| "البخاري": "Sahih al-Bukhari", |
| "صحيح البخاري": "Sahih al-Bukhari", |
| "sahih muslim": "Sahih Muslim", |
| "muslim": "Sahih Muslim", |
| "مسلم": "Sahih Muslim", |
| "صحيح مسلم": "Sahih Muslim", |
| "sunan an nasai": "Sunan an-Nasai", |
| "sunan an-nasai": "Sunan an-Nasai", |
| "nasai": "Sunan an-Nasai", |
| "nasa'i": "Sunan an-Nasai", |
| "نسائي": "Sunan an-Nasai", |
| "النسائي": "Sunan an-Nasai", |
| "sunan abu dawood": "Sunan Abu Dawood", |
| "sunan abu dawood": "Sunan Abu Dawood", |
| "abu dawood": "Sunan Abu Dawood", |
| "abu dawood": "Sunan Abu Dawood", |
| "أبو داود": "Sunan Abu Dawood", |
| "ابو داود": "Sunan Abu Dawood", |
| "jami at tirmidhi": "Jami' at-Tirmidhi", |
| "jami at-tirmidhi": "Jami' at-Tirmidhi", |
| "tirmidhi": "Jami' at-Tirmidhi", |
| "الترمذي": "Jami' at-Tirmidhi", |
| "ترمذي": "Jami' at-Tirmidhi", |
| "sunan ibn majah": "Sunan Ibn Majah", |
| "ibn majah": "Sunan Ibn Majah", |
| "ابن ماجه": "Sunan Ibn Majah", |
| "sunan al darimi": "Sunan al-Darimi", |
| "sunan al-darimi": "Sunan al-Darimi", |
| "darimi": "Sunan al-Darimi", |
| "الدارمي": "Sunan al-Darimi", |
| "muwatta malik": "Muwatta Malik", |
| "muwatta": "Muwatta Malik", |
| "موطأ مالك": "Muwatta Malik", |
| "موطا مالك": "Muwatta Malik", |
| "malik": "Muwatta Malik", |
| "musnad ahmad": "Musnad Ahmad", |
| "ahmad": "Musnad Ahmad", |
| "ahmed": "Musnad Ahmad", |
| "أحمد": "Musnad Ahmad", |
| "مسند أحمد": "Musnad Ahmad", |
| } |
| _SORTED_COLLECTION_ALIASES = sorted(_COLLECTION_ALIASES.items(), key=lambda item: len(item[0]), reverse=True) |
|
|
|
|
| def _find_balanced_json(text: str) -> Optional[str]: |
| """Find the first balanced {...} in *text*; return it or None.""" |
| start = text.find("{") |
| if start == -1: |
| return None |
| depth = 0 |
| in_string = False |
| escaped = False |
| for idx in range(start, len(text)): |
| ch = text[idx] |
| if escaped: |
| escaped = False |
| continue |
| if ch == "\\": |
| escaped = True |
| continue |
| if ch == '"': |
| in_string = not in_string |
| continue |
| if in_string: |
| continue |
| if ch == "{": |
| depth += 1 |
| elif ch == "}": |
| depth -= 1 |
| if depth == 0: |
| return text[start:idx + 1] |
| return None |
|
|
|
|
| def _extract_json_object(text: str) -> Optional[str]: |
| """Extract the first balanced JSON object from model output.""" |
| if not text: |
| return None |
|
|
| |
| cleaned = re.sub(r"```(?:json)?", "", text, flags=re.IGNORECASE) |
| cleaned = cleaned.replace("```", "") |
| |
| cleaned = re.sub(r"<think>[\s\S]*?</think>", "", cleaned, flags=re.IGNORECASE) |
| cleaned = cleaned.strip() |
|
|
| |
| |
| |
| if "{" not in cleaned: |
| |
| cleaned = re.sub(r"<think>[\s\S]*$", "", cleaned, flags=re.IGNORECASE).strip() |
| if not cleaned: |
| return None |
| else: |
| |
| candidate = _find_balanced_json(cleaned) |
| if candidate: |
| return candidate |
| cleaned = re.sub(r"<think>[\s\S]*$", "", cleaned, flags=re.IGNORECASE).strip() |
| if not cleaned: |
| return None |
|
|
| if cleaned.startswith("{") and cleaned.endswith("}"): |
| return cleaned |
|
|
| return _find_balanced_json(cleaned) |
|
|
|
|
| def _detect_intent_regex(query: str) -> str: |
| """Detect intent from raw query using regex when LLM rewrite is unavailable.""" |
| |
| if re.search( |
| r"كم\s+(?:عدد\s+)?آيات?|عدد\s+آيات?|كم\s+آية|how many\s+verses?|number of\s+verses?", |
| query, re.I, |
| ): |
| return "surah_info" |
| if re.search( |
| r"(?:هل|ما\s+نوع)\s+(?:سورة|سوره)\s+.+\s+(?:مكية|مدنية)", |
| query, |
| ): |
| return "surah_info" |
|
|
| |
| if re.search( |
| r"كم مرة|كم تكرر|عدد مرات|تكرار|كم ذُكر|كم وردت?", |
| query, |
| ): |
| return "count" |
| if re.search( |
| r"\b(how many times?|count|frequency|occurrences? of)\b", |
| query, re.I, |
| ): |
| return "count" |
|
|
| |
| if re.search( |
| r"صحيح[؟?]|هل صحيح|درجة الحديث|هل هذا حديث|is.+authentic|verify hadith|hadith.+grade", |
| query, re.I, |
| ): |
| return "auth" |
|
|
| |
| if re.search( |
| r"حديث\s+عن|ابحث عن حديث|ما معنى حديث|find hadith|hadith about", |
| query, re.I, |
| ): |
| return "hadith" |
|
|
| |
| if re.search( |
| r"ابحث عن آية|ما تفسير|تفسير آية|آية عن|الآية التي|find verse|verse about|tafsir", |
| query, re.I, |
| ): |
| return "tafsir" |
|
|
| |
| |
| ar_chars = len(re.findall(r"[\u0600-\u06FF]", query)) |
| diacritics = len(re.findall(r"[\u064B-\u0655\u0670\u06D6-\u06ED\u06E1-\u06E9\u0610-\u061A]", query)) |
| if ar_chars > 10 and diacritics >= 3: |
| return "tafsir" |
|
|
| return "general" |
|
|
|
|
async def rewrite_query(raw: str, llm: LLMProvider) -> Dict:
    """Rewrite *raw* into a retrieval-optimised query via the LLM.

    Returns a dict with keys ``ar_query``, ``en_query``, ``keywords`` and
    ``intent`` (validated/sanitised).  On any LLM or parsing failure a
    regex-based fallback built from the raw query is returned, so callers
    always receive a usable dict.  Successful rewrites are cached.
    """
    cached = await rewrite_cache.get(raw)
    if cached:
        return cached

    # Fallback used when the LLM is unavailable or returns unparseable output.
    detected_intent = _detect_intent_regex(raw)
    fallback = {
        "ar_query": normalize_arabic(raw),
        "en_query": raw,
        "keywords": [light_stem(t) for t in tokenize_ar(raw)][:7],
        "intent": detected_intent,
    }
    try:
        messages = [
            {"role": "system", "content": _REWRITE_SYSTEM},
            {"role": "user", "content": raw},
        ]
        # Retry once at a higher temperature if the model returns nothing.
        temperatures = (0.0, 0.3)
        text = ""
        for attempt, temp in enumerate(temperatures):
            text = await llm.chat(
                messages=messages, max_tokens=1024, temperature=temp,
            )
            if text.strip():
                break
            # BUG FIX: only claim "retrying" when a retry will actually
            # happen, and report the temperature the retry will use.
            if attempt + 1 < len(temperatures):
                logger.warning(
                    "Empty rewrite response (attempt %d), retrying with temperature=%.1f",
                    attempt + 1, temperatures[attempt + 1],
                )
        logger.debug("Raw rewrite response (%d chars): %.300s", len(text), text)
        json_payload = _extract_json_object(text)
        if not json_payload:
            raise ValueError(
                f"Model did not return a JSON object (got {len(text)} chars: {text[:120]!r})"
            )
        result = json.loads(json_payload)
        # Backfill any missing keys from the fallback before sanitising.
        for k in ("ar_query", "en_query", "keywords", "intent"):
            result.setdefault(k, fallback[k])

        # keywords: must be a non-empty list of non-blank strings, capped at 7.
        if not isinstance(result.get("keywords"), list):
            result["keywords"] = fallback["keywords"]
        else:
            result["keywords"] = [str(x).strip() for x in result["keywords"] if str(x).strip()][:7]
            if not result["keywords"]:
                result["keywords"] = fallback["keywords"]

        # intent: lowercase, map the retired 'fatwa' label, reject unknowns.
        result["intent"] = str(result.get("intent") or fallback["intent"]).strip().lower()
        if result["intent"] == "fatwa":
            result["intent"] = "general"
        if result["intent"] not in {"tafsir", "hadith", "count", "surah_info", "auth", "general"}:
            result["intent"] = fallback["intent"]

        # Cap query lengths to keep downstream embedding inputs bounded.
        result["ar_query"] = str(result.get("ar_query") or fallback["ar_query"]).strip()[:400]
        result["en_query"] = str(result.get("en_query") or fallback["en_query"]).strip()[:400]

        await rewrite_cache.set(result, raw)
        logger.info("Rewrite: intent=%s ar=%s", result["intent"], result["ar_query"][:60])
        return result
    except Exception as exc:
        logger.warning("Query rewrite failed (%s) — using fallback (intent=%s)", exc, fallback["intent"])
        return fallback
|
|
|
|
def result_key(item: dict) -> tuple:
    """Build a stable key for deduplicating search results."""
    kind = item.get("type", "")

    if kind == "quran":
        # Verse identity: (surah, ayah); some items use "verse_number".
        ayah = item.get("ayah_number") or item.get("verse_number") or 0
        return ("quran", int(item.get("surah_number") or 0), int(ayah))

    if kind == "hadith":
        collection = normalize_arabic(item.get("collection", ""), aggressive=True).lower()
        reference = normalize_arabic(item.get("reference", ""), aggressive=True).lower()
        return ("hadith", collection, int(item.get("hadith_number") or 0), reference)

    # Anything else: key on source plus text prefixes.
    source = item.get("source") or item.get("reference", "")
    return (
        kind,
        normalize_arabic(source, aggressive=True).lower(),
        normalize_arabic(item.get("arabic", "")[:80], aggressive=True).lower(),
        item.get("english", "")[:80].lower(),
    )
|
|
|
|
def merge_search_results(*result_groups: list, limit: Optional[int] = None) -> list:
    """Merge multiple ranked result groups, deduplicating by stable content key."""
    best_by_key: dict[tuple, dict] = {}
    for candidate in chain.from_iterable(result_groups):
        key = result_key(candidate)
        incumbent = best_by_key.get(key)
        # Keep whichever duplicate carries the higher score.
        if incumbent is None or candidate.get("_score", 0.0) > incumbent.get("_score", 0.0):
            best_by_key[key] = candidate

    ranked = sorted(best_by_key.values(), key=lambda row: row.get("_score", 0.0), reverse=True)
    if limit is None:
        return ranked
    return ranked[:limit]
|
|
|
|
def normalize_collection_name(text: str) -> Optional[str]:
    """Resolve a collection alias to the canonical dataset collection name."""
    if not text:
        return None

    # Reduce the input to the same lowercase, punctuation-free form the
    # alias table keys use.
    simplified = normalize_arabic(text, aggressive=True).lower().replace("_", " ")
    simplified = re.sub(r"[^a-z0-9\u0600-\u06FF\s'\-]+", " ", simplified)
    simplified = re.sub(r"\s+", " ", simplified).strip()

    # Aliases are pre-sorted longest-first, so specific names win.
    return next(
        (canonical for alias, canonical in _SORTED_COLLECTION_ALIASES if alias in simplified),
        None,
    )
|
|
|
|
def filter_results_by_collection(results: list, collection: Optional[str]) -> list:
    """Filter hadith results by canonical or fuzzy collection name."""
    if not collection:
        return list(results)

    canonical = normalize_collection_name(collection)
    needle = normalize_arabic(collection, aggressive=True).lower().strip()

    kept = []
    for entry in results:
        # Exact canonical match is accepted immediately.
        if canonical and entry.get("collection", "") == canonical:
            kept.append(entry)
            continue
        # Otherwise fall back to a fuzzy substring match over the combined
        # collection + reference text.
        haystack = normalize_arabic(
            f"{entry.get('collection', '')} {entry.get('reference', '')}",
            aggressive=True,
        ).lower()
        if needle and needle in haystack:
            kept.append(entry)
    return kept
|
|
|
|
def _surah_matches(item: dict, surah_query: str) -> bool:
    """True if *surah_query* names the surah of *item* (article-insensitive)."""
    # Compare both with and without a leading definite article ("ال" / "al-").
    wanted = normalize_arabic(surah_query, aggressive=True).lower().strip()
    wanted_bare = re.sub(r"^(ال|al[\-\s']*)", "", wanted, flags=re.I).strip()

    for field in ("surah_name_ar", "surah_name_en", "surah_name_transliteration"):
        name = item.get(field, "")
        if not name:
            continue
        have = normalize_arabic(name, aggressive=True).lower().strip()
        have_bare = re.sub(r"^(ال|al[\-\s']*)", "", have, flags=re.I).strip()
        if wanted == have or wanted_bare == have_bare:
            return True
        # Partial containment either way covers abbreviated names.
        if wanted_bare and have_bare and (wanted_bare in have_bare or have_bare in wanted_bare):
            return True
    return False
|
|
|
|
def lookup_quran_verses(query: str, dataset: list, limit: int = 5) -> list:
    """Resolve direct Quran references like ``2:255`` or "Surah Al-Baqarah 255".

    Returns up to *limit* matching verse items (dataset dict copies with a
    ``_score`` key attached), or an empty list when the query contains no
    recognisable reference.
    """
    if not query:
        return []

    matches = []
    numeric = _QURAN_REF_NUMERIC.search(query)
    if numeric:
        surah_num, verse_num = int(numeric.group(1)), int(numeric.group(2))
        for item in dataset:
            if item.get("type") != "quran":
                continue
            if item.get("surah_number") == surah_num and (item.get("ayah_number") or item.get("verse_number")) == verse_num:
                matches.append({**item, "_score": 9.5})
        # BUG FIX: the numeric branch previously returned all matches,
        # ignoring *limit* (unlike the named-reference branch below).
        return matches[:limit]

    # Named references: each extractor yields (surah name, verse number)
    # regardless of which order the pattern captures them in.
    named_patterns = (
        (_QURAN_REF_AR_NAME_FIRST, lambda m: (m.group(1), int(m.group(2)))),
        (_QURAN_REF_AR_VERSE_FIRST, lambda m: (m.group(2), int(m.group(1)))),
        (_QURAN_REF_EN_NAME_FIRST, lambda m: (m.group(1), int(m.group(2)))),
        (_QURAN_REF_EN_VERSE_FIRST, lambda m: (m.group(2), int(m.group(1)))),
    )
    for pattern, extractor in named_patterns:
        match = pattern.search(query)
        if not match:
            continue
        surah_query, verse_num = extractor(match)
        for item in dataset:
            if item.get("type") != "quran":
                continue
            if _surah_matches(item, surah_query) and (item.get("ayah_number") or item.get("verse_number")) == verse_num:
                matches.append({**item, "_score": 9.0})
        if matches:
            break

    return matches[:limit]
|
|
|
|
def lookup_hadith_references(query: str, dataset: list, collection: Optional[str] = None, limit: int = 5) -> list:
    """Resolve direct hadith references like Bukhari 1 or مسلم 1907."""
    if not query and not collection:
        return []

    # The collection may come from the explicit argument or the query itself.
    canonical = normalize_collection_name(collection or "") or normalize_collection_name(query)
    number = re.search(r"\b(\d{1,5})\b", query)
    if not canonical or not number:
        return []

    wanted_number = int(number.group(1))
    found = [
        {**item, "_score": 9.0}
        for item in dataset
        if item.get("type") == "hadith"
        and item.get("collection") == canonical
        and int(item.get("hadith_number") or 0) == wanted_number
    ]
    return found[:limit]
|
|
|
|
| |
| |
| |
def _bm25_score(
    query_terms: List[str],
    doc_text: str,
    avg_dl: float,
    k1: float = 1.5,
    b: float = 0.75,
) -> float:
    """BM25 term-frequency scoring (saturation + length norm; no IDF term)."""
    tokens = tokenize_ar(doc_text)
    term_freq = Counter(tokens)
    # Length-normalisation factor is identical for every query term, so it
    # is computed once for the document.
    norm = k1 * (1 - b + b * len(tokens) / max(avg_dl, 1))
    total = 0.0
    for term in query_terms:
        freq = term_freq.get(term, 0)
        total += (freq * (k1 + 1)) / (freq + norm)
    return total
|
|
|
|
| |
| |
| |
async def hybrid_search(
    raw_query: str,
    rewrite: Dict,
    embed_model: SentenceTransformer,
    index: faiss.Index,
    dataset: list,
    top_n: int = cfg.TOP_K_RETURN,
    source_type: Optional[Literal["quran", "hadith"]] = None,
    grade_filter: Optional[str] = None,
) -> list:
    """Hybrid search: dense + sparse with optional filtering.

    Pipeline:
      1. Embed the rewritten Arabic and English queries and fuse the two
         vectors into a single unit-norm query vector.
      2. Retrieve cfg.TOP_K_SEARCH candidates from the FAISS index, applying
         the optional source_type / grade_filter filters.
      3. Re-score candidates with BM25 over the stemmed rewrite keywords,
         plus a flat phrase-match bonus for hadith text.
      4. Blend dense and max-normalised sparse scores and return the top
         *top_n* items, each a dataset dict copy with ``_dense``, ``_sparse``
         and ``_score`` attached.

    Results are cached per (raw_query, top_n, source_type, grade_filter).
    """
    cache_key = (raw_query, top_n, source_type, grade_filter)
    cached = await search_cache.get(*cache_key)
    if cached:
        return cached

    # "query: " prefix — NOTE(review): presumably the embedder was trained
    # with this instruction prefix (E5-style); confirm against the model cfg.
    ar_q = "query: " + rewrite["ar_query"]
    en_q = "query: " + rewrite["en_query"]

    embeddings = embed_model.encode(
        [ar_q, en_q], normalize_embeddings=True, batch_size=2
    ).astype("float32")

    # Fuse the bilingual query vectors and re-normalise to unit length.
    fused = embeddings[0] + embeddings[1]
    fused /= np.linalg.norm(fused)

    distances, indices = index.search(fused.reshape(1, -1), cfg.TOP_K_SEARCH)

    # Collect unique, in-range candidates (FAISS pads missing hits with -1).
    # Filtered-out indices are still marked seen so they aren't re-examined.
    seen: set = set()
    candidates = []
    for dist, idx in zip(distances[0], indices[0]):
        item_idx = int(idx)
        if item_idx not in seen and 0 <= item_idx < len(dataset):
            seen.add(item_idx)
            item = dataset[item_idx]

            # Optional filter: restrict to quran-only or hadith-only items.
            if source_type and item.get("type") != source_type:
                continue

            # Optional filter: case-insensitive substring match on grade.
            if grade_filter and item.get("type") == "hadith":
                item_grade = item.get("grade", "").lower()
                if grade_filter.lower() not in item_grade:
                    continue

            candidates.append({**item, "_dense": float(dist)})

    if not candidates:
        return []

    # Sparse re-scoring: BM25 over stemmed rewrite keywords (falling back to
    # the raw query's whitespace tokens when the rewrite has no keywords).
    query_terms = [
        light_stem(kw) for kw in rewrite.get("keywords", raw_query.split())
    ]
    avg_dl = sum(
        len(tokenize_ar(c.get("arabic", "") + " " + c.get("english", "")))
        for c in candidates
    ) / max(len(candidates), 1)

    for c in candidates:
        doc = c.get("arabic", "") + " " + c.get("english", "")
        c["_sparse"] = _bm25_score(query_terms, doc, avg_dl)

    # Phrase bonus: any 3-word fragment of the (lightly normalised) raw query
    # appearing verbatim in a hadith's Arabic text earns a flat +2.0.
    query_norm = normalize_arabic(raw_query, aggressive=False).lower()
    for c in candidates:
        if c.get("type") == "hadith":
            ar_norm = normalize_arabic(c.get("arabic", ""), aggressive=False).lower()
            query_fragments = query_norm.split()
            for i in range(len(query_fragments) - 2):
                phrase = " ".join(query_fragments[i:i+3])
                if len(phrase) > 5 and phrase in ar_norm:
                    c["_sparse"] += 2.0
                    break

    # Blend: score = α·dense + (1-α)·(sparse / max_sparse).
    α = cfg.RERANK_ALPHA
    intent = rewrite.get("intent", "general")

    # Authenticity questions override the configured blend weight.
    if intent == "auth":
        α = 0.75

    # max() guards empty, `or 1.0` guards an all-zero sparse column.
    max_sparse = max((c["_sparse"] for c in candidates), default=1.0) or 1.0

    for c in candidates:
        base_score = α * c["_dense"] + (1 - α) * c["_sparse"] / max_sparse
        if intent == "hadith" and c.get("type") == "hadith":
            base_score += cfg.HADITH_BOOST
        c["_score"] = base_score

    candidates.sort(key=lambda x: x["_score"], reverse=True)
    results = candidates[:top_n]

    await search_cache.set(results, *cache_key)
    return results
|
|
|
|
| |
| |
| |
def text_search(
    query: str,
    dataset: list,
    source_type: Optional[Literal["quran", "hadith"]] = None,
    limit: int = 10,
) -> list:
    """Search dataset by exact text match (Arabic or English).

    Returns items sorted by relevance: exact matches first, then partial.

    Scoring tiers (the highest applicable tier wins per item):
      5.0   raw query found verbatim in the raw Arabic (diacritics intact)
      3.0+  normalized-Arabic substring (shorter documents rank higher)
      2.0+  lowercase English substring (shorter documents rank higher)
      2.0   3-word phrase found in normalized Arabic (+0.5 if the phrase is
            the start of the query)
      1.5   2-word phrase found in Arabic or English
      <1.0  token-overlap ratio, when at least half (min 2) of the query
            tokens appear in the document
      1.0+  fuzzy whole-text similarity (SequenceMatcher ratio >= 0.80)
    """
    q_norm = normalize_arabic(query, aggressive=True).lower()
    q_lower = query.lower().strip()
    # PERF: these derivations were previously recomputed inside the per-item
    # loop although they depend only on the query — hoisted out.
    q_raw = query.strip()
    q_words = q_norm.split()
    q_tokens = set(q_words)
    q_len = len(q_norm)

    results = []
    for item in dataset:
        if source_type and item.get("type") != source_type:
            continue

        ar_raw = item.get("arabic", "")
        en_raw = item.get("english", "")
        ar_norm = normalize_arabic(ar_raw, aggressive=True).lower()
        en_lower = en_raw.lower()

        score = 0.0

        # Tier 1: verbatim Arabic match (diacritics intact).
        if q_raw in ar_raw:
            score = max(score, 5.0)

        # Tier 2: normalized Arabic substring; shorter docs score higher.
        if q_norm and q_norm in ar_norm:
            score = max(score, 3.0 + (1.0 / max(len(ar_norm), 1)) * 100)

        # Tier 3: English substring; shorter docs score higher.
        if q_lower and q_lower in en_lower:
            score = max(score, 2.0 + (1.0 / max(len(en_lower), 1)) * 100)

        # Tier 4: phrase (trigram, then bigram) matches for longer queries.
        if score == 0.0 and q_norm:
            if len(q_words) >= 3:
                for i in range(len(q_words) - 2):
                    trigram = " ".join(q_words[i:i+3])
                    if trigram in ar_norm:
                        # Small bonus when the matching phrase starts the query.
                        score = max(score, 2.0 + (i == 0) * 0.5)
                        break
            if score == 0.0 and len(q_words) >= 2:
                for i in range(len(q_words) - 1):
                    bigram = " ".join(q_words[i:i+2])
                    if bigram in ar_norm or bigram in en_lower:
                        score = max(score, 1.5)
                        break

        # Tier 5: token-overlap ratio (Arabic or English, whichever is higher).
        if score == 0.0:
            ar_overlap = len(q_tokens & set(ar_norm.split()))
            en_overlap = len(q_tokens & set(en_lower.split()))
            best_overlap = max(ar_overlap, en_overlap)
            if best_overlap >= max(2, len(q_tokens) * 0.5):
                score = best_overlap / max(len(q_tokens), 1)

        # Tier 6: fuzzy similarity as a last resort.  Texts longer than 3x
        # the query are skipped: their ratio cannot reach the 0.80 cutoff.
        if score == 0.0 and q_len >= 10:
            for text in (ar_norm, en_lower):
                if not text:
                    continue
                if len(text) > q_len * 3:
                    continue
                ratio = SequenceMatcher(None, q_norm, text).ratio()
                if ratio >= 0.80:
                    score = max(score, 1.0 + ratio)

        if score > 0:
            results.append({**item, "_score": score})

    results.sort(key=lambda x: x["_score"], reverse=True)
    return results[:limit]
|
|
|
|
def build_context(results: list) -> str:
    """Format search results into a numbered context block for the LLM.

    Each entry shows the item type, optional hadith grade, source, score and
    both texts; Quran entries get an extra surah/ayah reference line.
    Returns an empty string for an empty result list.
    """
    lines = []
    for i, r in enumerate(results, 1):
        source = r.get("source") or r.get("reference") or "Unknown Source"
        item_type = "Quranic Verse" if r.get("type") == "quran" else "Hadith"
        grade_str = f" [Grade: {r.get('grade')}]" if r.get("grade") else ""

        block = (
            f"[{i}] 📌 {item_type}{grade_str} | {source} | score: {r.get('_score', 0):.3f}\n"
            f" Arabic : {r.get('arabic', '')}\n"
            f" English: {r.get('english', '')}"
        )

        if r.get("type") == "quran":
            # BUG FIX: some dataset items store the verse under "ayah_number"
            # (the key result_key/lookup_quran_verses check first); fall back
            # to it instead of printing an empty Ayah reference.
            ayah = r.get("ayah_number") or r.get("verse_number", "")
            block += (
                f"\n [REF] Surah: {r.get('surah_name_ar', '')} ({r.get('surah_name_en', '')}) "
                f"| Surah Number: {r.get('surah_number', '')} "
                f"| Ayah: {ayah}"
            )

        lines.append(block)
    return "\n\n".join(lines)
|
|