"""Quran / Hadith analytics — occurrence counting, surah metadata, dataset stats."""
|
|
from __future__ import annotations

import re
from collections import Counter
from typing import Dict, List, Literal, Optional

from app.arabic_nlp import light_stem, normalize_arabic, tokenize_ar
from app.cache import analysis_cache
from app.config import cfg
|
|
| |
| |
| |
| _COUNT_EN = re.compile( |
| r"\b(how many|count|number of|frequency|occurrences? of|how often|" |
| r"times? (does|is|appears?))\b", |
| re.I, |
| ) |
| _COUNT_AR = re.compile( |
| r"(ΩΩ
Ω
Ψ±Ψ©|ΩΩ
ΨΉΨ―Ψ―|ΩΩ
ΨͺΩΨ±Ψ±|ΨΉΨ―Ψ― Ω
Ψ±Ψ§Ψͺ|ΨͺΩΨ±Ψ§Ψ±|ΩΩ
Ψ°ΩΩΨ±|ΩΩ
ΩΨ±Ψ―Ψͺ?)" |
| ) |
|
|
| _SURAH_VERSES_AR = re.compile( |
| r"ΩΩ
\s+(?:ΨΉΨ―Ψ―\s+)?Ψ’ΩΨ§Ψͺ?\s*(?:ΩΩ\s+|ΩΩ\s+)?(?:Ψ³ΩΨ±Ψ©|Ψ³ΩΨ±Ω)" |
| r"|ΨΉΨ―Ψ―\s+Ψ’ΩΨ§Ψͺ?\s+(?:Ψ³ΩΨ±Ψ©|Ψ³ΩΨ±Ω)" |
| r"|ΩΩ
\s+Ψ’ΩΨ©\s+(?:ΩΩ|ΩΩ)\s+(?:Ψ³ΩΨ±Ψ©|Ψ³ΩΨ±Ω)" |
| r"|(?:Ψ³ΩΨ±Ψ©|Ψ³ΩΨ±Ω)\s+[\u0600-\u06FF\s]+\s+(?:ΩΩ
\s+Ψ’ΩΨ©|ΨΉΨ―Ψ―\s+Ψ’ΩΨ§Ψͺ?)" |
| ) |
| _SURAH_VERSES_EN = re.compile( |
| r"(?:how many|number of)\s+(?:verses?|ayat|ayahs?)\s+(?:in|of|does)\b" |
| r"|\bsurah?\b.*\b(?:how many|number of)\s+(?:verses?|ayat|ayahs?)", |
| re.I, |
| ) |
| _SURAH_TYPE_AR = re.compile( |
| r"(?:Ψ³ΩΨ±Ψ©|Ψ³ΩΨ±Ω)\s+[\u0600-\u06FF\s]+\s+(?:Ω
ΩΩΨ©|Ω
Ψ―ΩΩΨ©|Ω
ΩΩ|Ω
Ψ―ΩΩ)" |
| r"|(?:ΩΩ|Ω
Ψ§\s+ΩΩΨΉ)\s+(?:Ψ³ΩΨ±Ψ©|Ψ³ΩΨ±Ω)\s+[\u0600-\u06FF\s]+\s+(?:Ω
ΩΩΨ©|Ω
Ψ―ΩΩΨ©)" |
| ) |
| _SURAH_NAME_AR = re.compile( |
| r"(?:Ψ³ΩΨ±Ψ©|Ψ³ΩΨ±Ω)\s+([\u0600-\u06FF\u0750-\u077F\s]+)" |
| ) |
| _SURAH_NAME_EN = re.compile( |
| r"\bsurah?\s+([a-zA-Z'\-]+(?:[\s\-][a-zA-Z'\-]+)*)", |
| re.I, |
| ) |
|
|
|
|
def _extract_surah_name(query: str) -> Optional[str]:
    """Extract the surah name that follows the word "surah" in *query*.

    The Arabic name pattern is tried first, then the English one. The captured
    text is trimmed of trailing whitespace and question/exclamation marks, and
    of one trailing Arabic question/function word.

    Args:
        query: Raw user query.

    Returns:
        The cleaned name, or ``None`` when neither pattern yields a non-empty
        name.
    """
    for pattern in (_SURAH_NAME_AR, _SURAH_NAME_EN):
        match = pattern.search(query)
        if match is None:
            continue
        name = match.group(1).strip()
        # \u061F is the Arabic question mark; the Arabic capture class includes
        # it, so it can end up at the tail of the captured name.
        name = re.sub(r"[\s\u061F?!]+$", "", name)
        # Drop a single trailing function word ("how many", "number", "is",
        # "what", "in").
        # NOTE(review): word list restored from mis-encoded text; "هل", "في"
        # and "فى" are inferred — confirm.
        name = re.sub(r"\s+(كم|عدد|هل|ما|في|فى)$", "", name)
        if name:
            return name
    return None
|
|
|
|
| |
| |
| |
async def detect_surah_info(query: str, rewrite: dict) -> Optional[dict]:
    """Detect whether *query* asks for surah metadata (verse count or type).

    The regex patterns are checked first. If none matches, the ``rewrite``
    dict is consulted: intent ``"surah_info"`` counts as a verse question, and
    intent ``"count"`` does too when its keywords mention verses.

    Args:
        query: Raw user query.
        rewrite: Query-rewrite result; only the ``"intent"`` and ``"keywords"``
            keys are read. ``None`` is treated as an empty dict.

    Returns:
        ``{"surah_query": <name>, "query_type": "verses" | "type"}``, or
        ``None`` when the query is not a surah-metadata question or no surah
        name can be extracted.
    """
    rewrite = rewrite or {}
    asks_verses = bool(
        _SURAH_VERSES_AR.search(query) or _SURAH_VERSES_EN.search(query)
    )
    asks_type = bool(_SURAH_TYPE_AR.search(query))

    if not (asks_verses or asks_type):
        intent = rewrite.get("intent")
        if intent == "surah_info":
            asks_verses = True
        elif intent == "count":
            # A "count" intent is only a surah question when the thing being
            # counted is verses. `or []` also covers an explicit None value.
            kw_text = " ".join(rewrite.get("keywords") or [])
            if not any(w in kw_text for w in ("آيات", "آية", "verses", "ayat")):
                return None
            asks_verses = True
        else:
            return None

    surah_name = _extract_surah_name(query)
    if not surah_name:
        return None

    return {
        "surah_query": surah_name,
        # A verse-count match takes precedence when both kinds matched.
        "query_type": "verses" if asks_verses else "type",
    }
|
|
|
|
# Leading definite article: Arabic "al" or Latin "al" plus optional
# hyphen/space/apostrophe separators.
_ARTICLE_PREFIX = re.compile(r"^(ال|al[\-\s']*)", re.I)


async def lookup_surah_info(surah_query: str, dataset: list) -> Optional[dict]:
    """Look up surah metadata by (fuzzy) name among the Quran dataset entries.

    The query and each candidate name are normalised with
    ``normalize_arabic(..., aggressive=True)`` and lower-cased. A candidate
    matches when either string contains the other, with or without the leading
    article.

    Args:
        surah_query: Surah name as typed by the user (Arabic or Latin script).
        dataset: Iterable of item dicts; only items whose ``"type"`` is
            ``"quran"`` are considered.

    Returns:
        A dict with ``surah_number``, the three name fields, ``total_verses``
        and ``revelation_type`` taken from the first matching item, or ``None``
        when nothing matches or the query normalises to an empty string.
    """
    query_norm = normalize_arabic(surah_query, aggressive=True).lower()
    if not query_norm:
        # "" is a substring of every string, so an empty query would otherwise
        # "match" the first Quran entry.
        return None
    query_clean = _ARTICLE_PREFIX.sub("", query_norm).strip()

    # Raw names already compared without success. Every verse of a surah
    # repeats the same names, so this avoids re-normalising them.
    rejected = set()

    for item in dataset:
        if item.get("type") != "quran":
            continue
        for field in ("surah_name_ar", "surah_name_en", "surah_name_transliteration"):
            val = item.get(field, "")
            if not val or val in rejected:
                continue
            val_norm = normalize_arabic(val, aggressive=True).lower()
            val_clean = _ARTICLE_PREFIX.sub("", val_norm).strip()
            # An empty val_norm is excluded for the same reason as an empty
            # query: it is contained in everything.
            matched = bool(val_norm) and (
                query_norm in val_norm
                or val_norm in query_norm
                or bool(
                    query_clean
                    and val_clean
                    and (query_clean in val_clean or val_clean in query_clean)
                )
                or bool(query_clean and query_clean in val_norm)
            )
            if matched:
                return {
                    "surah_number": item.get("surah_number"),
                    "surah_name_ar": item.get("surah_name_ar", ""),
                    "surah_name_en": item.get("surah_name_en", ""),
                    "surah_name_transliteration": item.get("surah_name_transliteration", ""),
                    "total_verses": item.get("total_verses"),
                    "revelation_type": item.get("revelation_type", ""),
                }
            rejected.add(val)
    return None
|
|
|
|
| |
| |
| |
async def detect_analysis_intent(query: str, rewrite: dict) -> Optional[str]:
    """Detect a word-frequency question and return the word to count.

    Surah-metadata questions (verse count / revelation type) are excluded
    first, so that "how many verses ..." is not treated as a word count.

    Args:
        query: Raw user query.
        rewrite: Query-rewrite result; only the ``"intent"`` and ``"keywords"``
            keys are read. ``None`` is treated as an empty dict.

    Returns:
        The keyword to count, or ``None`` when the query is not a frequency
        question or no keyword can be extracted.
    """
    rewrite = rewrite or {}
    intent = rewrite.get("intent")

    if (
        intent == "surah_info"
        or _SURAH_VERSES_AR.search(query)
        or _SURAH_VERSES_EN.search(query)
        or _SURAH_TYPE_AR.search(query)
    ):
        return None

    if intent == "count":
        # `or []` also covers an explicit None value.
        kws = rewrite.get("keywords") or []
        kw_text = " ".join(kws)
        # Counting verses is a surah-metadata question, not a word count.
        if any(w in kw_text for w in ("آيات", "آية", "verses", "ayat")):
            return None
        if kws:
            return kws[0]
        return _extract_count_keyword(query)

    if not (_COUNT_EN.search(query) or _COUNT_AR.search(query)):
        return None

    return _extract_count_keyword(query)
|
|
|
|
| def _extract_count_keyword(query: str) -> Optional[str]: |
| """Extract the keyword being counted from various question patterns.""" |
| |
| ar_patterns = [ |
| re.compile(r"(?:ΩΩΩ
Ψ©|ΩΩΨΈ|ΩΩΨΈΨ©)\s+([\u0600-\u06FF\u0750-\u077F]+)"), |
| re.compile(r"(?:Ψ°[Ω]?ΩΨ±(?:Ψͺ|)|ΩΨ±Ψ―Ψͺ?|ΨͺΩΨ±Ψ±(?:Ψͺ|))\s+(?:ΩΩΩ
Ψ©\s+)?([\u0600-\u06FF\u0750-\u077F]+)"), |
| re.compile(r"(?:ΨΉΨ―Ψ―\s+Ω
Ψ±Ψ§Ψͺ\s+(?:Ψ°ΩΨ±|ΩΨ±ΩΨ―))\s+(?:ΩΩΩ
Ψ©\s+)?([\u0600-\u06FF\u0750-\u077F]+)"), |
| re.compile(r"ΩΩ
\s+(?:Ω
Ψ±Ψ©|Ω
Ψ±Ω)\s+(?:Ψ°[Ω]?ΩΨ±(?:Ψͺ|)|ΩΨ±Ψ―Ψͺ?)\s+(?:ΩΩΩ
Ψ©\s+)?([\u0600-\u06FF\u0750-\u077F]+)"), |
| ] |
| for pat in ar_patterns: |
| m = pat.search(query) |
| if m: |
| word = m.group(1).strip() |
| |
| if word not in ("ΩΩ", "Ω
Ω", "ΨΉΩ", "Ψ₯ΩΩ", "ΨΉΩΩ", "ΩΩ", "Ω
Ψ§", "ΩΩ
"): |
| return word |
|
|
| |
| en_patterns = [ |
| re.compile(r"(?:word|term)\s+['\"]?(\w+)['\"]?", re.I), |
| re.compile(r"(?:times?\s+(?:is|does|has)\s+)(\w+)", re.I), |
| re.compile(r"(?:occurrences?\s+of|frequency\s+of|count\s+of)\s+['\"]?(\w+)['\"]?", re.I), |
| re.compile(r"(?:mentioned|appear[s]?|occur[s]?)\s+.*?['\"]?(\w+)['\"]?\s+(?:in|throughout)", re.I), |
| re.compile(r"(?:how many times)\s+(?:is\s+)?['\"]?(\w+)['\"]?", re.I), |
| ] |
| for pat in en_patterns: |
| m = pat.search(query) |
| if m: |
| word = m.group(1).strip() |
| if word.lower() not in ("the", "a", "an", "in", "of", "is", "are", "was", "how", "many", "quran"): |
| return word |
|
|
| |
| for pat in (_COUNT_EN, _COUNT_AR): |
| m = pat.search(query) |
| if m: |
| tail = query[m.end():].strip().split() |
| for word in tail: |
| clean = re.sub(r"[Ψ?!.,Ψ]", "", word).strip() |
| if clean and clean.lower() not in ( |
| "ΩΩ", "Ω
Ω", "ΨΉΩ", "Ψ§ΩΩΨ±Ψ’Ω", "Ψ§ΩΩΨ±ΩΩ
", "the", "quran", "in", "of", |
| "ΩΩΩ
Ψ©", "ΩΩΨΈ", "word", "term", |
| ): |
| return clean |
| return None |
|
|
|
|
| |
| |
| |
async def count_occurrences(
    keyword: str,
    dataset: list,
    source_type: Optional[str] = "quran",
) -> dict:
    """Count substring occurrences of *keyword* across the dataset.

    The keyword is normalised with ``normalize_arabic(..., aggressive=True)``,
    lower-cased and stemmed with ``light_stem``. Each item's normalised Arabic
    text and its English text are searched together. Results are grouped by
    the item's ``"surah_number"`` (``0`` when missing) and cached in
    ``analysis_cache`` under ``(keyword, source_type or "all")``.

    Args:
        keyword: Word to count.
        dataset: Iterable of item dicts.
        source_type: Only items whose ``"type"`` equals this value are
            searched; a falsy value searches every item.

    Returns:
        A dict with ``keyword``, ``kw_stemmed``, ``total_count``, ``by_surah``
        (``{surah_number: {"name", "count"}}`` sorted by number) and
        ``examples`` (at most ``cfg.MAX_EXAMPLES`` matching items).
    """
    cache_scope = source_type or "all"
    cached = await analysis_cache.get(keyword, cache_scope)
    if cached:
        return cached

    kw_norm = normalize_arabic(keyword, aggressive=True).lower()
    kw_stem = light_stem(kw_norm)
    # The stem is only counted when it is non-empty and adds something
    # beyond the exact form.
    use_stem = bool(kw_stem) and kw_stem != kw_norm

    total = 0
    by_surah: Dict[int, Dict] = {}
    examples: list = []

    # str.count("") returns len(text) + 1, so an empty keyword must not be
    # searched at all.
    items = dataset if kw_norm else ()
    for item in items:
        if source_type and item.get("type") != source_type:
            continue

        ar_norm = normalize_arabic(item.get("arabic") or "", aggressive=True).lower()
        combined = f"{ar_norm} {item.get('english') or ''}".lower()
        exact = combined.count(kw_norm)
        # Stem hits beyond the exact hits. Clamped at zero because the stem
        # may occur fewer times than the exact form.
        # NOTE(review): assumes light_stem usually returns a substring of its
        # input — confirm.
        stemmed = max(0, combined.count(kw_stem) - exact) if use_stem else 0
        occ = exact + stemmed
        if occ <= 0:
            continue

        total += occ
        # `or 0` keeps the keys sortable when "surah_number" is present but None.
        surah_num = item.get("surah_number") or 0
        bucket = by_surah.get(surah_num)
        if bucket is None:
            bucket = by_surah[surah_num] = {
                "name": item.get("surah_name_en", f"Surah {surah_num}"),
                "count": 0,
            }
        bucket["count"] += occ

        if len(examples) < cfg.MAX_EXAMPLES:
            examples.append({
                "reference": item.get("source", ""),
                "arabic": item.get("arabic", ""),
                "english": item.get("english", ""),
            })

    result = {
        "keyword": keyword,
        "kw_stemmed": kw_stem,
        "total_count": total,
        "by_surah": dict(sorted(by_surah.items())),
        "examples": examples,
    }
    await analysis_cache.set(result, keyword, cache_scope)
    return result
|
|
|
|
| |
| |
| |
def get_quran_analytics(dataset: list) -> dict:
    """Compute aggregate Quran statistics from the dataset.

    Only items whose ``"type"`` is ``"quran"`` are counted; each such item is
    treated as one verse. Per-surah metadata is taken from the first item seen
    for that surah number.

    Args:
        dataset: Iterable of item dicts.

    Returns:
        A dict with ``total_verses_in_dataset``, ``total_surahs``,
        ``meccan_surahs``, ``medinan_surahs`` and ``surahs`` (per-surah dicts
        ordered by surah number, each including ``verses_in_dataset``).
    """
    surahs: Dict[int, dict] = {}
    total_verses = 0

    for item in dataset:
        if item.get("type") != "quran":
            continue
        total_verses += 1
        # `or 0` keeps the keys sortable when "surah_number" is present but None.
        number = item.get("surah_number") or 0
        entry = surahs.get(number)
        if entry is None:
            entry = surahs[number] = {
                "surah_number": number,
                "surah_name_ar": item.get("surah_name_ar", ""),
                "surah_name_en": item.get("surah_name_en", ""),
                "surah_name_transliteration": item.get("surah_name_transliteration", ""),
                "revelation_type": item.get("revelation_type", ""),
                "total_verses": item.get("total_verses", 0),
                "verses_in_dataset": 0,
            }
        entry["verses_in_dataset"] += 1

    # `or ""` tolerates a revelation_type that is present but None.
    kinds = [(s.get("revelation_type") or "").lower() for s in surahs.values()]

    return {
        "total_verses_in_dataset": total_verses,
        "total_surahs": len(surahs),
        "meccan_surahs": kinds.count("meccan"),
        "medinan_surahs": kinds.count("medinan"),
        "surahs": [surahs[k] for k in sorted(surahs)],
    }
|
|
|
|
def get_hadith_analytics(dataset: list) -> dict:
    """Compute aggregate Hadith statistics from the dataset.

    Only items whose ``"type"`` is ``"hadith"`` are counted. A missing, empty
    or ``None`` collection is reported as ``"Unknown"``, and a missing, empty
    or ``None`` grade as ``"Ungraded"``.

    Args:
        dataset: Iterable of item dicts.

    Returns:
        A dict with ``total_hadiths``, ``collections`` (a list of
        ``{"collection", "count", "grades"}`` dicts, largest first) and
        ``grade_summary`` (``{grade: count}``, most frequent first).
    """
    per_collection: Dict[str, Counter] = {}
    grades: Counter = Counter()
    total = 0

    for item in dataset:
        if item.get("type") != "hadith":
            continue
        total += 1
        # `or` also replaces explicit None / "" values, not just missing keys.
        collection = item.get("collection") or "Unknown"
        grade = item.get("grade") or "Ungraded"
        grades[grade] += 1
        per_collection.setdefault(collection, Counter())[grade] += 1

    # Every hadith contributes exactly one grade, so a collection's size is
    # the sum of its grade counts.
    collections = [
        {"collection": name, "count": sum(by_grade.values()), "grades": dict(by_grade)}
        for name, by_grade in per_collection.items()
    ]
    collections.sort(key=lambda c: c["count"], reverse=True)

    return {
        "total_hadiths": total,
        "collections": collections,
        "grade_summary": dict(grades.most_common()),
    }
|
|
|
|
def get_chapter_info(chapter_number: int, dataset: list) -> Optional[dict]:
    """Get the metadata and all verses of one surah/chapter.

    Args:
        chapter_number: Value compared (with ``!=``) against each item's
            ``"surah_number"``.
        dataset: Iterable of item dicts; only items whose ``"type"`` is
            ``"quran"`` are considered.

    Returns:
        The surah metadata (taken from the first matching item) plus a
        ``"verses"`` list sorted by ayah number, or ``None`` when no item
        belongs to the chapter.
    """
    verses = []
    meta = None

    for item in dataset:
        if item.get("type") != "quran" or item.get("surah_number") != chapter_number:
            continue
        if meta is None:
            meta = {
                "surah_number": item.get("surah_number"),
                "surah_name_ar": item.get("surah_name_ar", ""),
                "surah_name_en": item.get("surah_name_en", ""),
                "surah_name_transliteration": item.get("surah_name_transliteration", ""),
                "revelation_type": item.get("revelation_type", ""),
                "total_verses": item.get("total_verses", 0),
            }
        verses.append({
            # The verse number is read from "ayah_number", falling back to
            # "verse_number".
            "ayah": item.get("ayah_number") or item.get("verse_number"),
            "arabic": item.get("arabic", ""),
            "english": item.get("english", ""),
            "source": item.get("source", ""),
        })

    if meta is None:
        return None

    # Verses without a number sort first (treated as 0).
    verses.sort(key=lambda v: v["ayah"] or 0)
    return {**meta, "verses": verses}
|
|
|
|
def get_verse(surah: int, ayah: int, dataset: list) -> Optional[dict]:
    """Get a single verse by surah and ayah number.

    Args:
        surah: Value compared against each item's ``"surah_number"``.
        ayah: Value compared against the item's ``"ayah_number"`` (or
            ``"verse_number"`` when that is missing or falsy).
        dataset: Iterable of item dicts; only items whose ``"type"`` is
            ``"quran"`` are considered.

    Returns:
        A dict with the verse text, transliteration, tafsir and surah metadata
        of the first matching item, or ``None`` when there is no match.
    """
    for item in dataset:
        if item.get("type") != "quran" or item.get("surah_number") != surah:
            continue
        item_ayah = item.get("ayah_number") or item.get("verse_number")
        if item_ayah != ayah:
            continue
        return {
            "surah_number": item.get("surah_number"),
            "surah_name_ar": item.get("surah_name_ar", ""),
            "surah_name_en": item.get("surah_name_en", ""),
            "surah_name_transliteration": item.get("surah_name_transliteration", ""),
            "ayah": item_ayah,
            "arabic": item.get("arabic", ""),
            "english": item.get("english", ""),
            "transliteration": item.get("transliteration", ""),
            "tafsir_en": item.get("tafsir_en", ""),
            "tafsir_ar": item.get("tafsir_ar", ""),
            "source": item.get("source", ""),
            "revelation_type": item.get("revelation_type", ""),
        }
    return None
|
|