""" MathLingua — Adaptive Engine Hybrid adaptive algorithm combining: 1. Elo Rating — overall ability tracking with hint-weighted outcomes 2. Bayesian Knowledge Tracing (BKT) — per-topic mastery estimation 3. Thompson Sampling — intelligent question-level selection with ZPD windowing The orchestrator combines all three to produce progression decisions: SKIP (+2), INCREASE (+1), MAINTAIN (0), DECREASE (-1), RAPID_DECREASE (-2) Reference: MathLingua Technical Specification §6 """ from __future__ import annotations import math import random from dataclasses import dataclass, field from typing import Optional from feature_engineering import ( FeatureEngineer, EngineeredFeatures, InteractionSignals, ) # ──────────────────────────────────────────────────────── # Constants # ──────────────────────────────────────────────────────── LEVELS = [ "1.1", "1.2", "1.3", "1.4", "1.5", "2.1", "2.2", "2.3", "2.4", "2.5", "3.1", "3.2", "3.3", "3.4", "3.5", ] LEVEL_TO_ELO: dict[str, int] = { "1.1": 820, "1.2": 870, "1.3": 920, "1.4": 970, "1.5": 1020, "2.1": 1070, "2.2": 1120, "2.3": 1170, "2.4": 1220, "2.5": 1270, "3.1": 1320, "3.2": 1370, "3.3": 1420, "3.4": 1470, "3.5": 1520, } ELO_TO_LEVEL = sorted(LEVEL_TO_ELO.items(), key=lambda x: x[1]) TOPICS = ["arithmetic", "fractions", "percentages", "algebra", "geometry", "statistics"] INITIAL_STUDENT_ELO = 1000 # ──────────────────────────────────────────────────────── # Elo Engine # ──────────────────────────────────────────────────────── class EloEngine: """ Elo rating system adapted for education with hint-weighted outcomes. Weighted outcomes: 1.00 (no hint), 0.75 (L1), 0.50 (L2), 0.25 (L3), 0.00 (L4/incorrect) K-factor schedule: 48 (first 10), 32 (11–30), 24 (30+) """ def __init__(self): pass @staticmethod def expected_score(student_elo: float, question_elo: float) -> float: """E_s = 1 / (1 + 10^((R_q - R_s) / 400))""" return 1.0 / (1.0 + math.pow(10.0, (question_elo - student_elo) / 400.0)) @staticmethod def k_factor_student(interaction_count: int) -> float: if interaction_count <= 10: return 48.0 elif interaction_count <= 30: return 32.0 else: return 24.0 @staticmethod def k_factor_question(interaction_count: int) -> float: if interaction_count <= 10: return 8.0 elif interaction_count <= 30: return 6.0 else: return 4.0 def update( self, student_elo: float, question_elo: float, weighted_outcome: float, student_interactions: int, ) -> tuple[float, float]: """ Update student and question Elo ratings. Returns: (new_student_elo, new_question_elo) """ expected = self.expected_score(student_elo, question_elo) ks = self.k_factor_student(student_interactions) kq = self.k_factor_question(student_interactions) new_student = student_elo + ks * (weighted_outcome - expected) new_question = question_elo + kq * (expected - weighted_outcome) return round(new_student, 1), round(new_question, 1) # ──────────────────────────────────────────────────────── # Bayesian Knowledge Tracing (BKT) # ──────────────────────────────────────────────────────── @dataclass class BKTParams: """BKT parameters for one topic.""" p_know: float = 0.10 # P(L_0) — prior knowledge p_learn: float = 0.15 # P(T) — learn rate p_slip: float = 0.10 # P(S) — slip p_guess: float = 0.25 # P(G) — guess class BKTEngine: """ Bayesian Knowledge Tracing with slip adjustment for scaffold usage. P(S)_adj = P(S) × (1 + 0.5 × hint_depth_normalized) This makes BKT more skeptical of scaffold-assisted correctness. """ def __init__(self, topics: Optional[list[str]] = None): self.topics = topics or TOPICS self.params: dict[str, BKTParams] = { t: BKTParams() for t in self.topics } def get_mastery(self, topic: str) -> float: """Return P(know) for a topic.""" return self.params.get(topic, BKTParams()).p_know def update( self, topic: str, weighted_outcome: float, hint_depth_normalized: float, ) -> float: """ Update P(know) for a topic given an interaction outcome. Args: topic: Math topic string weighted_outcome: 0.0–1.0 hint-weighted outcome hint_depth_normalized: h_i / 4 (0.0–1.0) Returns: New P(know) """ if topic not in self.params: self.params[topic] = BKTParams() p = self.params[topic] # Adjust slip probability based on hint depth p_slip_adj = p.p_slip * (1.0 + 0.5 * hint_depth_normalized) p_slip_adj = min(p_slip_adj, 0.5) # cap at 0.5 # Determine if "correct" or "incorrect" for BKT purposes is_correct = weighted_outcome >= 0.5 if is_correct: # P(L_n | correct) = P(L) * (1-P(S)_adj) / [P(L)*(1-P(S)_adj) + (1-P(L))*P(G)] numerator = p.p_know * (1.0 - p_slip_adj) denominator = numerator + (1.0 - p.p_know) * p.p_guess else: # P(L_n | incorrect) = P(L) * P(S)_adj / [P(L)*P(S)_adj + (1-P(L))*(1-P(G))] numerator = p.p_know * p_slip_adj denominator = numerator + (1.0 - p.p_know) * (1.0 - p.p_guess) if denominator > 0: p_know_given_obs = numerator / denominator else: p_know_given_obs = p.p_know # Learning transition: P(L_n) = P(L_n|O) + (1 - P(L_n|O)) * P(T) new_p_know = p_know_given_obs + (1.0 - p_know_given_obs) * p.p_learn new_p_know = max(0.01, min(0.99, new_p_know)) # clamp p.p_know = round(new_p_know, 4) return p.p_know # ──────────────────────────────────────────────────────── # Thompson Sampling # ──────────────────────────────────────────────────────── @dataclass class BetaPrior: alpha: float = 1.0 beta: float = 1.0 class ThompsonSampler: """ Beta-Bernoulli Thompson Sampling with ZPD window and proximity bonus. ZPD window: [current_level - 2, current_level + 3] (asymmetric upward) Proximity bonus: Gaussian centered on student Elo, σ = 100 """ def __init__(self): self.priors: dict[str, BetaPrior] = { level: BetaPrior() for level in LEVELS } def update(self, level: str, weighted_outcome: float) -> None: """Update Beta prior for a level based on weighted outcome.""" if level not in self.priors: self.priors[level] = BetaPrior() self.priors[level].alpha += weighted_outcome self.priors[level].beta += (1.0 - weighted_outcome) def select(self, current_level: str, student_elo: float) -> str: """ Select next question level via Thompson Sampling within ZPD window. """ current_idx = LEVELS.index(current_level) if current_level in LEVELS else 5 # ZPD window: -2 to +3 lo = max(0, current_idx - 2) hi = min(len(LEVELS), current_idx + 4) # +4 because slice is exclusive candidate_levels = LEVELS[lo:hi] best_score = -1.0 best_level = current_level for level in candidate_levels: prior = self.priors.get(level, BetaPrior()) # Sample from Beta distribution sampled_theta = random.betavariate( max(prior.alpha, 0.01), max(prior.beta, 0.01), ) # Gaussian proximity bonus level_elo = LEVEL_TO_ELO.get(level, 1000) proximity = math.exp( -0.5 * ((level_elo - student_elo) / 100.0) ** 2 ) score = sampled_theta * proximity if score > best_score: best_score = score best_level = level return best_level # ──────────────────────────────────────────────────────── # Feature Predictor (for P(isSolved)) # ──────────────────────────────────────────────────────── class FeaturePredictor: """ Simple logistic model predicting P(isSolved) from features. Weights from spec §5.6 (logistic regression on simulated data). """ # Feature importance weights (from spec) W_MCS: float = 0.42 W_ELO_GAP: float = 0.28 W_LDS: float = -0.18 W_BKT: float = 0.15 W_STREAK: float = 0.08 BIAS: float = -0.30 @staticmethod def _sigmoid(x: float) -> float: return 1.0 / (1.0 + math.exp(-x)) def predict( self, mcs_avg: float, elo_gap: float, # student_elo - question_elo (normalized by /400) lds_avg: float, p_know: float, streak: int, ) -> float: """ Predict probability that the student solves the next problem without L4. """ z = ( self.BIAS + self.W_MCS * mcs_avg + self.W_ELO_GAP * elo_gap + self.W_LDS * lds_avg + self.W_BKT * p_know + self.W_STREAK * min(streak, 5) / 5.0 ) return round(self._sigmoid(z), 4) # ──────────────────────────────────────────────────────── # Adaptive Engine (Orchestrator) # ──────────────────────────────────────────────────────── @dataclass class AdaptiveState: """Complete adaptive state for one student.""" student_elo: float = INITIAL_STUDENT_ELO current_level: str = "2.1" # start at center total_interactions: int = 0 streak_correct: int = 0 # consecutive weighted_outcome >= 0.75 streak_wrong: int = 0 # consecutive weighted_outcome < 0.40 recent_lds: list[float] = field(default_factory=list) # last 5 recent_mcs: list[float] = field(default_factory=list) # last 5 enhanced_scaffold: bool = False class AdaptiveEngine: """ Main orchestrator combining Elo, BKT, Thompson Sampling, and feature engineering. Decision logic (from spec §6.5): weighted_outcome ≥ 0.85 AND streak ≥ 3 → SKIP (+2) weighted_outcome ≥ 0.75 AND P(know) ≥ 0.7 → INCREASE (+1) weighted_outcome ≥ 0.40 → MAINTAIN (0) weighted_outcome ≥ 0.25 OR streak_wrong < 2 → DECREASE (-1) else (outcome < 0.25 AND P(know) < 0.30) → RAPID_DECREASE (-2) """ def __init__(self, seed: Optional[int] = None): self.elo_engine = EloEngine() self.bkt_engine = BKTEngine() self.thompson = ThompsonSampler() self.feature_eng = FeatureEngineer() self.predictor = FeaturePredictor() self.state = AdaptiveState() if seed is not None: random.seed(seed) def _elo_to_level(self, elo: float) -> str: """Map an Elo rating to the nearest sub-level.""" best_level = LEVELS[0] best_dist = abs(elo - LEVEL_TO_ELO[LEVELS[0]]) for level, level_elo in ELO_TO_LEVEL: dist = abs(elo - level_elo) if dist < best_dist: best_dist = dist best_level = level return best_level def _shift_level(self, level: str, delta: int) -> str: """Shift a level by delta sub-levels, clamped to valid range.""" idx = LEVELS.index(level) if level in LEVELS else 5 new_idx = max(0, min(len(LEVELS) - 1, idx + delta)) return LEVELS[new_idx] def _update_rolling(self, lst: list[float], value: float, window: int = 5): lst.append(value) if len(lst) > window: lst.pop(0) def process_interaction( self, signals: InteractionSignals, question_elo: float, topic: str, ) -> dict: """ Process a single student-question interaction. Returns a dict with: - features: EngineeredFeatures - weighted_outcome: float - new_student_elo: float - new_p_know: float - decision: str - next_level: str - enhanced_scaffold: bool """ s = self.state # 1. Compute engineered features features = self.feature_eng.compute(signals) weighted_outcome = self.feature_eng.compute_weighted_outcome( signals.is_correct, signals.max_hint_level ) # 2. Update Elo s.total_interactions += 1 new_elo, new_q_elo = self.elo_engine.update( s.student_elo, question_elo, weighted_outcome, s.total_interactions ) s.student_elo = new_elo # 3. Update BKT hint_depth = signals.max_hint_level / 4.0 new_p_know = self.bkt_engine.update(topic, weighted_outcome, hint_depth) # 4. Update Thompson priors self.thompson.update(signals.question_level, weighted_outcome) # 5. Update streaks if weighted_outcome >= 0.75: s.streak_correct += 1 s.streak_wrong = 0 elif weighted_outcome < 0.40: s.streak_wrong += 1 s.streak_correct = 0 else: s.streak_correct = 0 s.streak_wrong = 0 # 6. Update rolling averages self._update_rolling(s.recent_lds, features.lds) self._update_rolling(s.recent_mcs, features.mcs) # 7. Progression decision if weighted_outcome >= 0.85 and s.streak_correct >= 3: decision = "SKIP" level_delta = 2 elif weighted_outcome >= 0.75 and new_p_know >= 0.70: decision = "INCREASE" level_delta = 1 elif weighted_outcome >= 0.40: decision = "MAINTAIN" level_delta = 0 elif weighted_outcome >= 0.25 or s.streak_wrong < 2: decision = "DECREASE" level_delta = -1 else: decision = "RAPID_DECREASE" level_delta = -2 # 8. LDS/MCS diagnostic overlay avg_lds = sum(s.recent_lds) / max(len(s.recent_lds), 1) avg_mcs = sum(s.recent_mcs) / max(len(s.recent_mcs), 1) s.enhanced_scaffold = False if avg_lds > 0.6 and avg_mcs > 0.6: # Language gap: knows math, needs scaffold — don't decrease if level_delta < 0: decision = "MAINTAIN" level_delta = 0 s.enhanced_scaffold = True # 9. Apply level change decision_level = self._shift_level(s.current_level, level_delta) # 10. Thompson sampling for fine-grained selection thompson_level = self.thompson.select(decision_level, s.student_elo) # 11. Override if Thompson and decision disagree strongly dec_idx = LEVELS.index(decision_level) if decision_level in LEVELS else 5 th_idx = LEVELS.index(thompson_level) if thompson_level in LEVELS else 5 if level_delta < 0 and th_idx > dec_idx + 1: # Decision says decrease but Thompson wants to increase significantly next_level = decision_level else: next_level = thompson_level s.current_level = next_level return { "features": features, "weighted_outcome": weighted_outcome, "new_student_elo": s.student_elo, "new_p_know": new_p_know, "decision": decision, "decision_level": decision_level, "next_level": next_level, "enhanced_scaffold": s.enhanced_scaffold, "avg_lds": round(avg_lds, 4), "avg_mcs": round(avg_mcs, 4), "quadrant": features.quadrant, } # ──────────────────────────────────────────────────────── # Simulation # ──────────────────────────────────────────────────────── def simulate_student_profile( profile_name: str, true_level_idx: int, base_p_correct: float, hint_tendency: float, n_interactions: int = 20, seed: int = 42, ) -> dict: """ Simulate a student profile through n_interactions. Args: profile_name: Label for this profile true_level_idx: Index into LEVELS of the student's true ability base_p_correct: Base probability of getting correct answer hint_tendency: Probability of requesting hints (0=never, 1=always) n_interactions: Number of practice interactions seed: Random seed """ random.seed(seed) engine = AdaptiveEngine(seed=seed) true_elo = LEVEL_TO_ELO[LEVELS[true_level_idx]] results = [] for i in range(n_interactions): current_level = engine.state.current_level question_elo = LEVEL_TO_ELO.get(current_level, 1000) # Simulate difficulty effect on correctness elo_diff = true_elo - question_elo difficulty_modifier = 1.0 / (1.0 + math.exp(-elo_diff / 200.0)) p_correct = base_p_correct * difficulty_modifier + 0.1 * (1 - difficulty_modifier) # Simulate hint usage if random.random() < hint_tendency: max_hint = random.choices( [1, 2, 3, 4], weights=[0.3, 0.3, 0.25, 0.15], )[0] else: max_hint = 0 is_correct = random.random() < p_correct if max_hint == 4: is_correct = False # L4 = solution reveal # Generate plausible timing base_time = 30 + true_level_idx * 5 total_time = max(10, base_time + random.gauss(0, 10)) scaffold_total = 0 t_l1, t_l2, t_l3, t_l4 = 0.0, 0.0, 0.0, 0.0 if max_hint >= 1: t_l1 = random.uniform(3, 10) scaffold_total += t_l1 if max_hint >= 2: t_l2 = random.uniform(5, 15) scaffold_total += t_l2 if max_hint >= 3: t_l3 = random.uniform(8, 20) scaffold_total += t_l3 if max_hint >= 4: t_l4 = random.uniform(10, 25) scaffold_total += t_l4 total_time = max(total_time, scaffold_total + 5) topic = random.choice(TOPICS) signals = InteractionSignals( max_hint_level=max_hint, time_before_first_hint=random.uniform(2, 15) if max_hint > 0 else 0, total_time=total_time, time_at_L1=t_l1, time_at_L2=t_l2, time_at_L3=t_l3, time_at_L4=t_l4, num_attempts=1 if is_correct and max_hint == 0 else random.randint(1, 3), is_correct=is_correct, question_level=current_level, ) result = engine.process_interaction(signals, question_elo, topic) results.append(result) # Summary final_elo = engine.state.student_elo final_level = engine.state.current_level avg_wo = sum(r["weighted_outcome"] for r in results) / len(results) avg_lds = sum(r["features"].lds for r in results) / len(results) avg_mcs = sum(r["features"].mcs for r in results) / len(results) decisions = {} for r in results: d = r["decision"] decisions[d] = decisions.get(d, 0) + 1 return { "profile": profile_name, "true_level": LEVELS[true_level_idx], "start_elo": INITIAL_STUDENT_ELO, "final_elo": round(final_elo, 1), "final_level": final_level, "avg_weighted_outcome": round(avg_wo, 3), "avg_lds": round(avg_lds, 3), "avg_mcs": round(avg_mcs, 3), "decisions": decisions, } def _run_simulation(): print("=" * 70) print("MathLingua Adaptive Engine — Simulation Results") print("=" * 70) profiles = [ ("Strong Student (true ~2.5)", 9, 0.85, 0.15), ("Struggling Student (true ~1.2)", 1, 0.45, 0.70), ("Average Student (true ~1.5)", 4, 0.65, 0.40), ] for name, true_idx, p_correct, hint_tend in profiles: result = simulate_student_profile(name, true_idx, p_correct, hint_tend) print(f"\n{'─' * 50}") print(f"Profile: {result['profile']}") print(f" True level: {result['true_level']}") print(f" Elo: {result['start_elo']} → {result['final_elo']}") print(f" Level: 2.1 → {result['final_level']}") print(f" Avg weighted outcome: {result['avg_weighted_outcome']}") print(f" Avg LDS: {result['avg_lds']}") print(f" Avg MCS: {result['avg_mcs']}") print(f" Decisions: {result['decisions']}") print(f"\n{'=' * 70}") print("Simulation completed successfully ✓") print(f"{'=' * 70}") if __name__ == "__main__": _run_simulation()