# QModel — app/analysis.py
# Uploaded via huggingface_hub by aelgendy (revision 6ab1c8e).
"""Quran / Hadith analytics β€” occurrence counting, surah metadata, dataset stats."""
from __future__ import annotations
import re
from typing import Dict, List, Literal, Optional
from app.arabic_nlp import light_stem, normalize_arabic, tokenize_ar
from app.cache import analysis_cache
from app.config import cfg
# ═══════════════════════════════════════════════════════════════════════
# INTENT DETECTION β€” frequency / surah info queries
# ═══════════════════════════════════════════════════════════════════════
# English frequency-question triggers: "how many", "count", "number of",
# "frequency", "occurrences of", "how often", "times does/is/appears".
_COUNT_EN = re.compile(
    r"\b(how many|count|number of|frequency|occurrences? of|how often|"
    r"times? (does|is|appears?))\b",
    re.I,
)
# Arabic frequency-question triggers (e.g. "ΩƒΩ… Ω…Ψ±Ψ©" β‰ˆ "how many times",
# "ΩƒΩ… ذُكر" β‰ˆ "how many times was it mentioned").
_COUNT_AR = re.compile(
    r"(ΩƒΩ… Ω…Ψ±Ψ©|ΩƒΩ… ΨΉΨ―Ψ―|ΩƒΩ… ΨͺΩƒΨ±Ψ±|ΨΉΨ―Ψ― Ω…Ψ±Ψ§Ψͺ|ΨͺΩƒΨ±Ψ§Ψ±|ΩƒΩ… ذُكر|ΩƒΩ… وردΨͺ?)"
)
# Arabic "how many verses in surah X" phrasings, covering several word orders
# (including the surah name appearing before the question words).
_SURAH_VERSES_AR = re.compile(
    r"ΩƒΩ…\s+(?:ΨΉΨ―Ψ―\s+)?ؒياΨͺ?\s*(?:في\s+|فى\s+)?(?:سورة|Ψ³ΩˆΨ±Ω‡)"
    r"|ΨΉΨ―Ψ―\s+ؒياΨͺ?\s+(?:سورة|Ψ³ΩˆΨ±Ω‡)"
    r"|ΩƒΩ…\s+ؒية\s+(?:في|فى)\s+(?:سورة|Ψ³ΩˆΨ±Ω‡)"
    r"|(?:سورة|Ψ³ΩˆΨ±Ω‡)\s+[\u0600-\u06FF\s]+\s+(?:ΩƒΩ…\s+ؒية|ΨΉΨ―Ψ―\s+ؒياΨͺ?)"
)
# English "how many verses in/of surah ..." phrasings, in either order.
_SURAH_VERSES_EN = re.compile(
    r"(?:how many|number of)\s+(?:verses?|ayat|ayahs?)\s+(?:in|of|does)\b"
    r"|\bsurah?\b.*\b(?:how many|number of)\s+(?:verses?|ayat|ayahs?)",
    re.I,
)
# Arabic "is surah X Meccan or Medinan" phrasings.
_SURAH_TYPE_AR = re.compile(
    r"(?:سورة|Ψ³ΩˆΨ±Ω‡)\s+[\u0600-\u06FF\s]+\s+(?:Ω…ΩƒΩŠΨ©|Ω…Ψ―Ω†ΩŠΨ©|Ω…ΩƒΩŠ|Ω…Ψ―Ω†ΩŠ)"
    r"|(?:Ω‡Ω„|Ω…Ψ§\s+Ω†ΩˆΨΉ)\s+(?:سورة|Ψ³ΩˆΨ±Ω‡)\s+[\u0600-\u06FF\s]+\s+(?:Ω…ΩƒΩŠΨ©|Ω…Ψ―Ω†ΩŠΨ©)"
)
# Captures the surah name following the Arabic word "سورة" ("surah").
_SURAH_NAME_AR = re.compile(
    r"(?:سورة|Ψ³ΩˆΨ±Ω‡)\s+([\u0600-\u06FF\u0750-\u077F\s]+)"
)
# Captures the surah name following English "sura"/"surah"
# (multi-word names joined by spaces or hyphens are allowed).
_SURAH_NAME_EN = re.compile(
    r"\bsurah?\s+([a-zA-Z'\-]+(?:[\s\-][a-zA-Z'\-]+)*)",
    re.I,
)
def _extract_surah_name(query: str) -> Optional[str]:
    """Pull a surah name out of *query*, trying the Arabic pattern first.

    The captured name is stripped of trailing punctuation and of a trailing
    Arabic question/stop word. Returns ``None`` when nothing usable remains.
    """
    for pattern in (_SURAH_NAME_AR, _SURAH_NAME_EN):
        match = pattern.search(query)
        if not match:
            continue
        candidate = match.group(1).strip()
        # Trim trailing punctuation, then a dangling Arabic function word.
        candidate = re.sub(r'[\s؟?!]+$', '', candidate)
        candidate = re.sub(r'\s+(ΩƒΩ…|ΨΉΨ―Ψ―|Ω‡Ω„|Ω…Ψ§|في|فى)$', '', candidate)
        if candidate:
            return candidate
    return None
# ═══════════════════════════════════════════════════════════════════════
# SURAH INFO DETECTION & LOOKUP
# ═══════════════════════════════════════════════════════════════════════
async def detect_surah_info(query: str, rewrite: dict) -> Optional[dict]:
    """Decide whether *query* asks for surah metadata (verse count or type).

    Falls back on the rewriter's intent classification when no regex matches.
    Returns ``{"surah_query": ..., "query_type": ...}`` or ``None``.
    """
    asks_verses = bool(
        _SURAH_VERSES_AR.search(query) or _SURAH_VERSES_EN.search(query)
    )
    asks_type = _SURAH_TYPE_AR.search(query) is not None
    if not asks_verses and not asks_type:
        intent = rewrite.get("intent")
        if intent == "surah_info":
            asks_verses = True
        elif intent == "count":
            joined = " ".join(rewrite.get("keywords", []))
            # A "count" intent only qualifies when it mentions verses/ayat.
            if not any(w in joined for w in ("ؒياΨͺ", "ؒية", "verses", "ayat")):
                return None
            asks_verses = True
        else:
            return None
    name = _extract_surah_name(query)
    if not name:
        return None
    return {
        "surah_query": name,
        "query_type": "verses" if asks_verses else "type",
    }
async def lookup_surah_info(surah_query: str, dataset: list) -> Optional[dict]:
    """Return metadata of the first Quran entry whose surah name matches.

    Matching is fuzzy: both sides are normalized and lowercased, and tried
    again with a leading definite article ("Ψ§Ω„" / "al-") stripped.
    """
    wanted = normalize_arabic(surah_query, aggressive=True).lower()
    wanted_bare = re.sub(r"^(Ψ§Ω„|al[\-\s']*)", "", wanted, flags=re.I).strip()
    name_fields = ("surah_name_ar", "surah_name_en", "surah_name_transliteration")
    for entry in dataset:
        if entry.get("type") != "quran":
            continue
        for field in name_fields:
            raw = entry.get(field, "")
            if not raw:
                continue
            candidate = normalize_arabic(raw, aggressive=True).lower()
            candidate_bare = re.sub(
                r"^(Ψ§Ω„|al[\-\s']*)", "", candidate, flags=re.I
            ).strip()
            matched = (
                wanted in candidate
                or candidate in wanted
                or (
                    wanted_bare
                    and candidate_bare
                    and (wanted_bare in candidate_bare or candidate_bare in wanted_bare)
                )
                or (wanted_bare and wanted_bare in candidate)
            )
            if not matched:
                continue
            return {
                "surah_number": entry.get("surah_number"),
                "surah_name_ar": entry.get("surah_name_ar", ""),
                "surah_name_en": entry.get("surah_name_en", ""),
                "surah_name_transliteration": entry.get("surah_name_transliteration", ""),
                "total_verses": entry.get("total_verses"),
                "revelation_type": entry.get("revelation_type", ""),
            }
    return None
# ═══════════════════════════════════════════════════════════════════════
# ANALYSIS INTENT (word frequency detection)
# ═══════════════════════════════════════════════════════════════════════
async def detect_analysis_intent(query: str, rewrite: dict) -> Optional[str]:
    """Return the word to count when *query* is a word-frequency question.

    Surah-metadata questions are excluded first so they are never treated
    as frequency requests. Returns the target word or ``None``.
    """
    is_surah_metadata = (
        _SURAH_VERSES_AR.search(query)
        or _SURAH_VERSES_EN.search(query)
        or _SURAH_TYPE_AR.search(query)
        or rewrite.get("intent") == "surah_info"
    )
    if is_surah_metadata:
        return None
    if rewrite.get("intent") == "count":
        keywords = rewrite.get("keywords", [])
        if any(w in " ".join(keywords) for w in ("ؒياΨͺ", "ؒية", "verses", "ayat")):
            # "How many verses"-style question, not a word-frequency one.
            return None
        # The rewriter puts the target word first; otherwise parse the query.
        return keywords[0] if keywords else _extract_count_keyword(query)
    if _COUNT_EN.search(query) or _COUNT_AR.search(query):
        return _extract_count_keyword(query)
    return None
def _extract_count_keyword(query: str) -> Optional[str]:
"""Extract the keyword being counted from various question patterns."""
# Arabic patterns: ΩƒΩ… Ω…Ψ±Ψ© Ψ°ΩƒΨ±Ψͺ ΩƒΩ„Ω…Ψ© X / ΩƒΩ… Ω…Ψ±Ψ© وردΨͺ X / ΨΉΨ―Ψ― Ω…Ψ±Ψ§Ψͺ Ψ°ΩƒΨ± X
ar_patterns = [
re.compile(r"(?:ΩƒΩ„Ω…Ψ©|لفظ|لفظة)\s+([\u0600-\u06FF\u0750-\u077F]+)"),
re.compile(r"(?:Ψ°[ُ]?ΩƒΨ±(?:Ψͺ|)|وردΨͺ?|ΨͺΩƒΨ±Ψ±(?:Ψͺ|))\s+(?:ΩƒΩ„Ω…Ψ©\s+)?([\u0600-\u06FF\u0750-\u077F]+)"),
re.compile(r"(?:ΨΉΨ―Ψ―\s+Ω…Ψ±Ψ§Ψͺ\s+(?:Ψ°ΩƒΨ±|ورود))\s+(?:ΩƒΩ„Ω…Ψ©\s+)?([\u0600-\u06FF\u0750-\u077F]+)"),
re.compile(r"ΩƒΩ…\s+(?:Ω…Ψ±Ψ©|Ω…Ψ±Ω‡)\s+(?:Ψ°[ُ]?ΩƒΨ±(?:Ψͺ|)|وردΨͺ?)\s+(?:ΩƒΩ„Ω…Ψ©\s+)?([\u0600-\u06FF\u0750-\u077F]+)"),
]
for pat in ar_patterns:
m = pat.search(query)
if m:
word = m.group(1).strip()
# Skip common non-keyword words
if word not in ("في", "Ω…Ω†", "ΨΉΩ†", "Ψ₯Ω„Ω‰", "ΨΉΩ„Ω‰", "Ω‡Ω„", "Ω…Ψ§", "ΩƒΩ…"):
return word
# English patterns: how many times is X mentioned / count of X / occurrences of X
en_patterns = [
re.compile(r"(?:word|term)\s+['\"]?(\w+)['\"]?", re.I),
re.compile(r"(?:times?\s+(?:is|does|has)\s+)(\w+)", re.I),
re.compile(r"(?:occurrences?\s+of|frequency\s+of|count\s+of)\s+['\"]?(\w+)['\"]?", re.I),
re.compile(r"(?:mentioned|appear[s]?|occur[s]?)\s+.*?['\"]?(\w+)['\"]?\s+(?:in|throughout)", re.I),
re.compile(r"(?:how many times)\s+(?:is\s+)?['\"]?(\w+)['\"]?", re.I),
]
for pat in en_patterns:
m = pat.search(query)
if m:
word = m.group(1).strip()
if word.lower() not in ("the", "a", "an", "in", "of", "is", "are", "was", "how", "many", "quran"):
return word
# Last resort: find the first meaningful word after count-related keywords
for pat in (_COUNT_EN, _COUNT_AR):
m = pat.search(query)
if m:
tail = query[m.end():].strip().split()
for word in tail:
clean = re.sub(r"[؟?!.,،]", "", word).strip()
if clean and clean.lower() not in (
"في", "Ω…Ω†", "ΨΉΩ†", "Ψ§Ω„Ω‚Ψ±Ψ’Ω†", "Ψ§Ω„ΩƒΨ±ΩŠΩ…", "the", "quran", "in", "of",
"ΩƒΩ„Ω…Ψ©", "لفظ", "word", "term",
):
return clean
return None
# ═══════════════════════════════════════════════════════════════════════
# OCCURRENCE COUNTING
# ═══════════════════════════════════════════════════════════════════════
async def count_occurrences(
    keyword: str,
    dataset: list,
    source_type: Optional[str] = "quran",
) -> dict:
    """Count how often *keyword* appears in *dataset*, grouped per surah.

    Counts the exact normalized form plus additional light-stem matches,
    collects up to ``cfg.MAX_EXAMPLES`` sample passages, and caches the
    result keyed by keyword and source type.
    """
    cache_scope = source_type or "all"
    cached_result = await analysis_cache.get(keyword, cache_scope)
    if cached_result:
        return cached_result
    normalized = normalize_arabic(keyword, aggressive=True).lower()
    stem = light_stem(normalized)
    total = 0
    per_surah: Dict[int, Dict] = {}
    samples: list = []
    for entry in dataset:
        if source_type and entry.get("type") != source_type:
            continue
        arabic_norm = normalize_arabic(entry.get("arabic", ""), aggressive=True).lower()
        haystack = f"{arabic_norm} {entry.get('english', '')}".lower()
        exact_hits = haystack.count(normalized)
        # When the stem differs, add stem matches beyond the exact ones.
        extra_hits = haystack.count(stem) - exact_hits if stem != normalized else 0
        hits = exact_hits + extra_hits
        if hits <= 0:
            continue
        total += hits
        surah_no = entry.get("surah_number", 0)
        bucket = per_surah.setdefault(
            surah_no,
            {"name": entry.get("surah_name_en", f"Surah {surah_no}"), "count": 0},
        )
        bucket["count"] += hits
        if len(samples) < cfg.MAX_EXAMPLES:
            samples.append({
                "reference": entry.get("source", ""),
                "arabic": entry.get("arabic", ""),
                "english": entry.get("english", ""),
            })
    result = {
        "keyword": keyword,
        "kw_stemmed": stem,
        "total_count": total,
        "by_surah": dict(sorted(per_surah.items())),
        "examples": samples,
    }
    await analysis_cache.set(result, keyword, cache_scope)
    return result
# ═══════════════════════════════════════════════════════════════════════
# DATASET ANALYTICS β€” aggregate statistics
# ═══════════════════════════════════════════════════════════════════════
def get_quran_analytics(dataset: list) -> dict:
    """Summarize the Quran portion of *dataset*.

    Returns dataset-wide verse and surah totals, Meccan/Medinan surah
    counts, and a per-surah breakdown ordered by surah number.
    """
    per_surah: Dict[int, dict] = {}
    verse_total = 0
    for item in dataset:
        if item.get("type") != "quran":
            continue
        verse_total += 1
        number = item.get("surah_number", 0)
        entry = per_surah.get(number)
        if entry is None:
            # First verse seen for this surah — record its metadata.
            entry = per_surah[number] = {
                "surah_number": number,
                "surah_name_ar": item.get("surah_name_ar", ""),
                "surah_name_en": item.get("surah_name_en", ""),
                "surah_name_transliteration": item.get("surah_name_transliteration", ""),
                "revelation_type": item.get("revelation_type", ""),
                "total_verses": item.get("total_verses", 0),
                "verses_in_dataset": 0,
            }
        entry["verses_in_dataset"] += 1
    revelation_kinds = [s.get("revelation_type", "").lower() for s in per_surah.values()]
    return {
        "total_verses_in_dataset": verse_total,
        "total_surahs": len(per_surah),
        "meccan_surahs": revelation_kinds.count("meccan"),
        "medinan_surahs": revelation_kinds.count("medinan"),
        "surahs": [per_surah[n] for n in sorted(per_surah)],
    }
def get_hadith_analytics(dataset: list) -> dict:
    """Compute aggregate Hadith statistics from *dataset*.

    Returns the total hadith count, per-collection counts with grade
    breakdowns (largest collections first), and an overall grade summary
    sorted by frequency, descending.
    """
    # Renamed from `collections`, which shadowed the stdlib module name.
    by_collection: Dict[str, dict] = {}
    grade_totals: Dict[str, int] = {}
    total = 0
    for item in dataset:
        if item.get("type") != "hadith":
            continue
        total += 1
        name = item.get("collection", "Unknown")
        bucket = by_collection.setdefault(
            name, {"collection": name, "count": 0, "grades": {}}
        )
        bucket["count"] += 1
        grade = item.get("grade", "Ungraded")
        grade_totals[grade] = grade_totals.get(grade, 0) + 1
        bucket["grades"][grade] = bucket["grades"].get(grade, 0) + 1
    return {
        "total_hadiths": total,
        "collections": sorted(
            by_collection.values(), key=lambda c: c["count"], reverse=True
        ),
        "grade_summary": dict(
            sorted(grade_totals.items(), key=lambda x: x[1], reverse=True)
        ),
    }
def get_chapter_info(chapter_number: int, dataset: list) -> Optional[dict]:
    """Collect metadata and all available verses for one surah.

    Metadata comes from the first matching dataset entry; verses are
    returned sorted by ayah number. Returns ``None`` if the surah has no
    entries in *dataset*.
    """
    collected: list = []
    header: Optional[dict] = None
    for entry in dataset:
        if entry.get("type") != "quran" or entry.get("surah_number") != chapter_number:
            continue
        if header is None:
            header = {
                "surah_number": entry.get("surah_number"),
                "surah_name_ar": entry.get("surah_name_ar", ""),
                "surah_name_en": entry.get("surah_name_en", ""),
                "surah_name_transliteration": entry.get("surah_name_transliteration", ""),
                "revelation_type": entry.get("revelation_type", ""),
                "total_verses": entry.get("total_verses", 0),
            }
        collected.append({
            "ayah": entry.get("ayah_number") or entry.get("verse_number"),
            "arabic": entry.get("arabic", ""),
            "english": entry.get("english", ""),
            "source": entry.get("source", ""),
        })
    if header is None:
        return None
    # Missing ayah numbers sort first (treated as 0).
    collected.sort(key=lambda v: v.get("ayah") or 0)
    return {**header, "verses": collected}
def get_verse(surah: int, ayah: int, dataset: list) -> Optional[dict]:
    """Fetch one verse addressed by surah and ayah number.

    Entries may carry the ayah under either ``ayah_number`` or
    ``verse_number``. Returns the full verse record or ``None``.
    """
    for entry in dataset:
        if entry.get("type") != "quran" or entry.get("surah_number") != surah:
            continue
        entry_ayah = entry.get("ayah_number") or entry.get("verse_number")
        if entry_ayah != ayah:
            continue
        return {
            "surah_number": entry.get("surah_number"),
            "surah_name_ar": entry.get("surah_name_ar", ""),
            "surah_name_en": entry.get("surah_name_en", ""),
            "surah_name_transliteration": entry.get("surah_name_transliteration", ""),
            "ayah": entry_ayah,
            "arabic": entry.get("arabic", ""),
            "english": entry.get("english", ""),
            "transliteration": entry.get("transliteration", ""),
            "tafsir_en": entry.get("tafsir_en", ""),
            "tafsir_ar": entry.get("tafsir_ar", ""),
            "source": entry.get("source", ""),
            "revelation_type": entry.get("revelation_type", ""),
        }
    return None