""" MathLingua — Question Database Module Loads the question_database.json file and provides indexing, selection, and readability validation utilities. 130 hand-crafted math word problems across 15 sub-levels (1.1–3.5). Difficulty is LINGUISTIC (readability), not mathematical. Each question includes 4 scaffold levels (L1–L4) and readability metrics. Distribution: Levels 1.1–3.1 have 10 questions each; Levels 3.2–3.5 have 5 each. Total: 11×10 + 4×5 = 130 questions. Reference: MathLingua Technical Specification §4 """ from __future__ import annotations import json import os import random from dataclasses import dataclass, asdict from typing import Optional try: import textstat HAS_TEXTSTAT = True except ImportError: HAS_TEXTSTAT = False # ──────────────────────────────────────────────────────── # Constants # ──────────────────────────────────────────────────────── LEVELS = [ "1.1", "1.2", "1.3", "1.4", "1.5", "2.1", "2.2", "2.3", "2.4", "2.5", "3.1", "3.2", "3.3", "3.4", "3.5", ] LEVEL_TO_ELO = { "1.1": 820, "1.2": 870, "1.3": 920, "1.4": 970, "1.5": 1020, "2.1": 1070, "2.2": 1120, "2.3": 1170, "2.4": 1220, "2.5": 1270, "3.1": 1320, "3.2": 1370, "3.3": 1420, "3.4": 1470, "3.5": 1520, } DB_PATH = os.path.join(os.path.dirname(__file__), "question_database.json") # ──────────────────────────────────────────────────────── # Question dataclass # ──────────────────────────────────────────────────────── @dataclass class Question: id: str level: str topic: str subtopic: str grade: int problem_text: str answer: str answer_numeric: float solution_steps: list[str] scaffolds: dict[str, str] readability: dict[str, float] elo_rating: int metadata: dict[str, str] # ──────────────────────────────────────────────────────── # QuestionDatabase # ──────────────────────────────────────────────────────── class QuestionDatabase: """ Manages the question pool with indexing by level, topic, and Elo range. Usage: db = QuestionDatabase() # auto-loads from question_database.json db = QuestionDatabase(path="custom.json") questions = db.get_by_level("2.1") q = db.select_question("2.1", topic="fractions", exclude_ids=["2.1.03"]) stats = db.level_stats("2.1") """ def __init__(self, path: Optional[str] = None): self.path = path or DB_PATH self.questions: list[Question] = [] self._by_id: dict[str, Question] = {} self._by_level: dict[str, list[Question]] = {l: [] for l in LEVELS} self._by_topic: dict[str, list[Question]] = {} self._load() def _load(self): """Load questions from JSON file.""" if not os.path.exists(self.path): raise FileNotFoundError( f"Question database not found at: {self.path}\n" f"Run this module directly to generate it, or provide a valid path." ) with open(self.path, "r", encoding="utf-8") as f: raw = json.load(f) for entry in raw: q = Question( id=entry["id"], level=entry["level"], topic=entry["topic"], subtopic=entry["subtopic"], grade=entry["grade"], problem_text=entry["problem_text"], answer=entry["answer"], answer_numeric=entry["answer_numeric"], solution_steps=entry["solution_steps"], scaffolds=entry["scaffolds"], readability=entry.get("readability", {}), elo_rating=entry["elo_rating"], metadata=entry.get("metadata", {"source": "curated", "created_at": "2026-04-27"}), ) self.questions.append(q) self._by_id[q.id] = q self._by_level[q.level].append(q) if q.topic not in self._by_topic: self._by_topic[q.topic] = [] self._by_topic[q.topic].append(q) def __len__(self) -> int: return len(self.questions) def get_by_id(self, question_id: str) -> Optional[Question]: """Get a question by its ID.""" return self._by_id.get(question_id) def get_by_level(self, level: str) -> list[Question]: """Get all questions at a given level.""" return self._by_level.get(level, []) def get_by_topic(self, topic: str) -> list[Question]: """Get all questions for a given topic.""" return self._by_topic.get(topic, []) def select_question( self, level: str, topic: Optional[str] = None, exclude_ids: Optional[set[str]] = None, ) -> Optional[Question]: """ Select a random question at the given level, optionally filtered by topic. Args: level: Target sub-level (e.g., "2.1") topic: Optional topic filter (e.g., "fractions") exclude_ids: Set of question IDs to exclude (recently served) Returns: Question or None if no match found """ candidates = self._by_level.get(level, []) if topic: candidates = [q for q in candidates if q.topic == topic] if exclude_ids: candidates = [q for q in candidates if q.id not in exclude_ids] if not candidates: return None return random.choice(candidates) def select_batch( self, level_distribution: dict[str, int], exclude_ids: Optional[set[str]] = None, topic_weights: Optional[dict[str, float]] = None, ) -> list[Question]: """ Select a batch of questions according to level distribution. Args: level_distribution: {level: count} e.g., {"2.1": 5, "2.2": 8, "2.3": 5, "2.4": 2} exclude_ids: Questions to exclude topic_weights: Optional topic preference weights (favor weaker topics) Returns: List of selected questions """ exclude = exclude_ids or set() batch = [] for level, count in level_distribution.items(): candidates = [q for q in self._by_level.get(level, []) if q.id not in exclude] if topic_weights: # Weight candidates by topic preference weighted = [] for q in candidates: w = topic_weights.get(q.topic, 1.0) weighted.append((q, w)) # Weighted sample if weighted: questions_only = [qw[0] for qw in weighted] weights_only = [qw[1] for qw in weighted] selected = random.choices(questions_only, weights=weights_only, k=min(count, len(candidates))) batch.extend(selected) else: selected = random.sample(candidates, min(count, len(candidates))) batch.extend(selected) random.shuffle(batch) return batch def level_stats(self, level: str) -> dict: """Get statistics for a level's questions.""" questions = self._by_level.get(level, []) if not questions: return {"count": 0} fk_scores = [q.readability.get("flesch_kincaid", 0) for q in questions if q.readability] word_counts = [q.readability.get("word_count", 0) for q in questions if q.readability] diff_words = [q.readability.get("difficult_words", 0) for q in questions if q.readability] return { "count": len(questions), "topics": list(set(q.topic for q in questions)), "avg_fk": round(sum(fk_scores) / max(len(fk_scores), 1), 2), "avg_words": round(sum(word_counts) / max(len(word_counts), 1), 1), "avg_difficult_words": round(sum(diff_words) / max(len(diff_words), 1), 1), "elo_range": (min(q.elo_rating for q in questions), max(q.elo_rating for q in questions)), } def compute_readability(self, text: str) -> dict[str, float]: """Compute readability metrics for a problem text using textstat.""" if not HAS_TEXTSTAT: return {"error": "textstat not installed"} return { "flesch_kincaid": round(textstat.flesch_kincaid_grade(text), 2), "word_count": textstat.lexicon_count(text, removepunct=True), "difficult_words": textstat.difficult_words(text), "avg_syllables_per_word": round( textstat.syllable_count(text) / max(textstat.lexicon_count(text, removepunct=True), 1), 3 ), } def validate_all(self) -> dict: """Validate the full database: check counts, readability ordering, etc.""" results = { "total_questions": len(self.questions), "expected_total": 130, "level_counts": {}, "level_stats": {}, "monotonic_fk": True, "issues": [], } expected_counts = {l: 10 for l in LEVELS} for l in ["3.2", "3.3", "3.4", "3.5"]: expected_counts[l] = 5 prev_fk = 0.0 for level in LEVELS: count = len(self._by_level[level]) results["level_counts"][level] = count stats = self.level_stats(level) results["level_stats"][level] = stats if count != expected_counts[level]: results["issues"].append( f"Level {level}: expected {expected_counts[level]} questions, got {count}" ) if stats.get("avg_fk", 0) < prev_fk: results["monotonic_fk"] = False results["issues"].append( f"Level {level}: FK grade {stats.get('avg_fk')} is less than previous {prev_fk}" ) prev_fk = stats.get("avg_fk", 0) results["valid"] = len(results["issues"]) == 0 return results def to_dict_list(self) -> list[dict]: """Export all questions as a list of dicts (for JSON serialization).""" return [asdict(q) for q in self.questions] def summary(self) -> str: """Print a summary table of the database.""" lines = [ "MathLingua Question Database Summary", "=" * 60, f"Total questions: {len(self.questions)}", f"Topics: {sorted(self._by_topic.keys())}", "", f"{'Level':<8}{'Count':<8}{'Avg FK':<10}{'Avg Words':<12}{'Topics':<30}", "-" * 60, ] for level in LEVELS: stats = self.level_stats(level) lines.append( f"{level:<8}{stats['count']:<8}{stats.get('avg_fk', 'N/A'):<10}" f"{stats.get('avg_words', 'N/A'):<12}{', '.join(stats.get('topics', [])):<30}" ) return "\n".join(lines) # ──────────────────────────────────────────────────────── # Main # ──────────────────────────────────────────────────────── if __name__ == "__main__": try: db = QuestionDatabase() print(db.summary()) print("\n") validation = db.validate_all() print(f"Validation: {'PASS ✓' if validation['valid'] else 'FAIL ✗'}") if validation["issues"]: for issue in validation["issues"]: print(f" ⚠ {issue}") except FileNotFoundError as e: print(f"Database file not found: {e}") print("The question_database.json file should be in the same directory.")