"""Quran / Hadith analytics — occurrence counting, surah metadata, dataset stats.""" from __future__ import annotations import re from typing import Dict, List, Literal, Optional from app.arabic_nlp import light_stem, normalize_arabic, tokenize_ar from app.cache import analysis_cache from app.config import cfg # ═══════════════════════════════════════════════════════════════════════ # INTENT DETECTION — frequency / surah info queries # ═══════════════════════════════════════════════════════════════════════ _COUNT_EN = re.compile( r"\b(how many|count|number of|frequency|occurrences? of|how often|" r"times? (does|is|appears?))\b", re.I, ) _COUNT_AR = re.compile( r"(كم مرة|كم عدد|كم تكرر|عدد مرات|تكرار|كم ذُكر|كم وردت?)" ) _SURAH_VERSES_AR = re.compile( r"كم\s+(?:عدد\s+)?آيات?\s*(?:في\s+|فى\s+)?(?:سورة|سوره)" r"|عدد\s+آيات?\s+(?:سورة|سوره)" r"|كم\s+آية\s+(?:في|فى)\s+(?:سورة|سوره)" r"|(?:سورة|سوره)\s+[\u0600-\u06FF\s]+\s+(?:كم\s+آية|عدد\s+آيات?)" ) _SURAH_VERSES_EN = re.compile( r"(?:how many|number of)\s+(?:verses?|ayat|ayahs?)\s+(?:in|of|does)\b" r"|\bsurah?\b.*\b(?:how many|number of)\s+(?:verses?|ayat|ayahs?)", re.I, ) _SURAH_TYPE_AR = re.compile( r"(?:سورة|سوره)\s+[\u0600-\u06FF\s]+\s+(?:مكية|مدنية|مكي|مدني)" r"|(?:هل|ما\s+نوع)\s+(?:سورة|سوره)\s+[\u0600-\u06FF\s]+\s+(?:مكية|مدنية)" ) _SURAH_NAME_AR = re.compile( r"(?:سورة|سوره)\s+([\u0600-\u06FF\u0750-\u077F\s]+)" ) _SURAH_NAME_EN = re.compile( r"\bsurah?\s+([a-zA-Z'\-]+(?:[\s\-][a-zA-Z'\-]+)*)", re.I, ) def _extract_surah_name(query: str) -> Optional[str]: """Extract surah name from a query string.""" for pat in (_SURAH_NAME_AR, _SURAH_NAME_EN): m = pat.search(query) if m: name = m.group(1).strip() name = re.sub(r'[\s؟?!]+$', '', name) name = re.sub(r'\s+(كم|عدد|هل|ما|في|فى)$', '', name) if name: return name return None # ═══════════════════════════════════════════════════════════════════════ # SURAH INFO DETECTION & LOOKUP # ═══════════════════════════════════════════════════════════════════════ async def detect_surah_info(query: str, rewrite: dict) -> Optional[dict]: """Detect if query asks about surah metadata (verse count, type, etc.).""" is_verse_q = bool(_SURAH_VERSES_AR.search(query) or _SURAH_VERSES_EN.search(query)) is_type_q = bool(_SURAH_TYPE_AR.search(query)) if not (is_verse_q or is_type_q): if rewrite.get("intent") == "surah_info": is_verse_q = True elif rewrite.get("intent") == "count": kw_text = " ".join(rewrite.get("keywords", [])) if any(w in kw_text for w in ("آيات", "آية", "verses", "ayat")): is_verse_q = True else: return None else: return None surah_name = _extract_surah_name(query) if not surah_name: return None return { "surah_query": surah_name, "query_type": "verses" if is_verse_q else "type", } async def lookup_surah_info(surah_query: str, dataset: list) -> Optional[dict]: """Look up surah metadata from dataset entries.""" query_norm = normalize_arabic(surah_query, aggressive=True).lower() query_clean = re.sub(r"^(ال|al[\-\s']*)", "", query_norm, flags=re.I).strip() for item in dataset: if item.get("type") != "quran": continue for field in ("surah_name_ar", "surah_name_en", "surah_name_transliteration"): val = item.get(field, "") if not val: continue val_norm = normalize_arabic(val, aggressive=True).lower() val_clean = re.sub(r"^(ال|al[\-\s']*)", "", val_norm, flags=re.I).strip() if (query_norm in val_norm or val_norm in query_norm or (query_clean and val_clean and (query_clean in val_clean or val_clean in query_clean)) or (query_clean and query_clean in val_norm)): return { "surah_number": item.get("surah_number"), "surah_name_ar": item.get("surah_name_ar", ""), "surah_name_en": item.get("surah_name_en", ""), "surah_name_transliteration": item.get("surah_name_transliteration", ""), "total_verses": item.get("total_verses"), "revelation_type": item.get("revelation_type", ""), } return None # ═══════════════════════════════════════════════════════════════════════ # ANALYSIS INTENT (word frequency detection) # ═══════════════════════════════════════════════════════════════════════ async def detect_analysis_intent(query: str, rewrite: dict) -> Optional[str]: """Detect if query is asking for word frequency analysis.""" if (_SURAH_VERSES_AR.search(query) or _SURAH_VERSES_EN.search(query) or _SURAH_TYPE_AR.search(query) or rewrite.get("intent") == "surah_info"): return None if rewrite.get("intent") == "count": kws = rewrite.get("keywords", []) kw_text = " ".join(kws) if any(w in kw_text for w in ("آيات", "آية", "verses", "ayat")): return None # The rewriter is instructed to put the target word as first keyword if kws: return kws[0] # Fallback: extract from query keyword = _extract_count_keyword(query) return keyword if not (_COUNT_EN.search(query) or _COUNT_AR.search(query)): return None keyword = _extract_count_keyword(query) return keyword def _extract_count_keyword(query: str) -> Optional[str]: """Extract the keyword being counted from various question patterns.""" # Arabic patterns: كم مرة ذكرت كلمة X / كم مرة وردت X / عدد مرات ذكر X ar_patterns = [ re.compile(r"(?:كلمة|لفظ|لفظة)\s+([\u0600-\u06FF\u0750-\u077F]+)"), re.compile(r"(?:ذ[ُ]?كر(?:ت|)|وردت?|تكرر(?:ت|))\s+(?:كلمة\s+)?([\u0600-\u06FF\u0750-\u077F]+)"), re.compile(r"(?:عدد\s+مرات\s+(?:ذكر|ورود))\s+(?:كلمة\s+)?([\u0600-\u06FF\u0750-\u077F]+)"), re.compile(r"كم\s+(?:مرة|مره)\s+(?:ذ[ُ]?كر(?:ت|)|وردت?)\s+(?:كلمة\s+)?([\u0600-\u06FF\u0750-\u077F]+)"), ] for pat in ar_patterns: m = pat.search(query) if m: word = m.group(1).strip() # Skip common non-keyword words if word not in ("في", "من", "عن", "إلى", "على", "هل", "ما", "كم"): return word # English patterns: how many times is X mentioned / count of X / occurrences of X en_patterns = [ re.compile(r"(?:word|term)\s+['\"]?(\w+)['\"]?", re.I), re.compile(r"(?:times?\s+(?:is|does|has)\s+)(\w+)", re.I), re.compile(r"(?:occurrences?\s+of|frequency\s+of|count\s+of)\s+['\"]?(\w+)['\"]?", re.I), re.compile(r"(?:mentioned|appear[s]?|occur[s]?)\s+.*?['\"]?(\w+)['\"]?\s+(?:in|throughout)", re.I), re.compile(r"(?:how many times)\s+(?:is\s+)?['\"]?(\w+)['\"]?", re.I), ] for pat in en_patterns: m = pat.search(query) if m: word = m.group(1).strip() if word.lower() not in ("the", "a", "an", "in", "of", "is", "are", "was", "how", "many", "quran"): return word # Last resort: find the first meaningful word after count-related keywords for pat in (_COUNT_EN, _COUNT_AR): m = pat.search(query) if m: tail = query[m.end():].strip().split() for word in tail: clean = re.sub(r"[؟?!.,،]", "", word).strip() if clean and clean.lower() not in ( "في", "من", "عن", "القرآن", "الكريم", "the", "quran", "in", "of", "كلمة", "لفظ", "word", "term", ): return clean return None # ═══════════════════════════════════════════════════════════════════════ # OCCURRENCE COUNTING # ═══════════════════════════════════════════════════════════════════════ async def count_occurrences( keyword: str, dataset: list, source_type: Optional[str] = "quran", ) -> dict: """Count keyword occurrences with surah/collection grouping.""" cached = await analysis_cache.get(keyword, source_type or "all") if cached: return cached kw_norm = normalize_arabic(keyword, aggressive=True).lower() kw_stem = light_stem(kw_norm) count = 0 by_surah: Dict[int, Dict] = {} examples: list = [] for item in dataset: if source_type and item.get("type") != source_type: continue ar_norm = normalize_arabic(item.get("arabic", ""), aggressive=True).lower() combined = f"{ar_norm} {item.get('english', '')}".lower() exact = combined.count(kw_norm) stemmed = combined.count(kw_stem) - exact if kw_stem != kw_norm else 0 occ = exact + stemmed if occ > 0: count += occ surah_num = item.get("surah_number", 0) if surah_num not in by_surah: by_surah[surah_num] = { "name": item.get("surah_name_en", f"Surah {surah_num}"), "count": 0, } by_surah[surah_num]["count"] += occ if len(examples) < cfg.MAX_EXAMPLES: examples.append({ "reference": item.get("source", ""), "arabic": item.get("arabic", ""), "english": item.get("english", ""), }) result = { "keyword": keyword, "kw_stemmed": kw_stem, "total_count": count, "by_surah": dict(sorted(by_surah.items())), "examples": examples, } await analysis_cache.set(result, keyword, source_type or "all") return result # ═══════════════════════════════════════════════════════════════════════ # DATASET ANALYTICS — aggregate statistics # ═══════════════════════════════════════════════════════════════════════ def get_quran_analytics(dataset: list) -> dict: """Compute aggregate Quran statistics from dataset.""" surahs: Dict[int, dict] = {} total_verses = 0 for item in dataset: if item.get("type") != "quran": continue total_verses += 1 sn = item.get("surah_number", 0) if sn not in surahs: surahs[sn] = { "surah_number": sn, "surah_name_ar": item.get("surah_name_ar", ""), "surah_name_en": item.get("surah_name_en", ""), "surah_name_transliteration": item.get("surah_name_transliteration", ""), "revelation_type": item.get("revelation_type", ""), "total_verses": item.get("total_verses", 0), "verses_in_dataset": 0, } surahs[sn]["verses_in_dataset"] += 1 meccan = sum(1 for s in surahs.values() if s.get("revelation_type", "").lower() == "meccan") medinan = sum(1 for s in surahs.values() if s.get("revelation_type", "").lower() == "medinan") return { "total_verses_in_dataset": total_verses, "total_surahs": len(surahs), "meccan_surahs": meccan, "medinan_surahs": medinan, "surahs": [surahs[k] for k in sorted(surahs)], } def get_hadith_analytics(dataset: list) -> dict: """Compute aggregate Hadith statistics from dataset.""" collections: Dict[str, dict] = {} grades: Dict[str, int] = {} total = 0 for item in dataset: if item.get("type") != "hadith": continue total += 1 col = item.get("collection", "Unknown") if col not in collections: collections[col] = {"collection": col, "count": 0, "grades": {}} collections[col]["count"] += 1 grade = item.get("grade", "Ungraded") grades[grade] = grades.get(grade, 0) + 1 collections[col]["grades"][grade] = collections[col]["grades"].get(grade, 0) + 1 return { "total_hadiths": total, "collections": sorted(collections.values(), key=lambda c: c["count"], reverse=True), "grade_summary": dict(sorted(grades.items(), key=lambda x: x[1], reverse=True)), } def get_chapter_info(chapter_number: int, dataset: list) -> Optional[dict]: """Get all verses and metadata for a specific surah/chapter.""" verses = [] meta = None for item in dataset: if item.get("type") != "quran": continue if item.get("surah_number") != chapter_number: continue if meta is None: meta = { "surah_number": item.get("surah_number"), "surah_name_ar": item.get("surah_name_ar", ""), "surah_name_en": item.get("surah_name_en", ""), "surah_name_transliteration": item.get("surah_name_transliteration", ""), "revelation_type": item.get("revelation_type", ""), "total_verses": item.get("total_verses", 0), } verses.append({ "ayah": item.get("ayah_number") or item.get("verse_number"), "arabic": item.get("arabic", ""), "english": item.get("english", ""), "source": item.get("source", ""), }) if not meta: return None verses.sort(key=lambda v: v.get("ayah") or 0) return {**meta, "verses": verses} def get_verse(surah: int, ayah: int, dataset: list) -> Optional[dict]: """Get a specific verse by surah and ayah number.""" for item in dataset: if item.get("type") != "quran": continue if item.get("surah_number") != surah: continue item_ayah = item.get("ayah_number") or item.get("verse_number") if item_ayah == ayah: return { "surah_number": item.get("surah_number"), "surah_name_ar": item.get("surah_name_ar", ""), "surah_name_en": item.get("surah_name_en", ""), "surah_name_transliteration": item.get("surah_name_transliteration", ""), "ayah": item_ayah, "arabic": item.get("arabic", ""), "english": item.get("english", ""), "transliteration": item.get("transliteration", ""), "tafsir_en": item.get("tafsir_en", ""), "tafsir_ar": item.get("tafsir_ar", ""), "source": item.get("source", ""), "revelation_type": item.get("revelation_type", ""), } return None