| """ |
| MathLingua β Adaptive Engine |
| |
| Hybrid adaptive algorithm combining: |
| 1. Elo Rating β overall ability tracking with hint-weighted outcomes |
| 2. Bayesian Knowledge Tracing (BKT) β per-topic mastery estimation |
| 3. Thompson Sampling β intelligent question-level selection with ZPD windowing |
| |
| The orchestrator combines all three to produce progression decisions: |
| SKIP (+2), INCREASE (+1), MAINTAIN (0), DECREASE (-1), RAPID_DECREASE (-2) |
| |
| Reference: MathLingua Technical Specification Β§6 |
| """ |
|
|
| from __future__ import annotations |
|
|
| import math |
| import random |
| from dataclasses import dataclass, field |
| from typing import Optional |
|
|
| from feature_engineering import ( |
| FeatureEngineer, |
| EngineeredFeatures, |
| InteractionSignals, |
| ) |
|
|
|
|
| |
| |
| |
|
|
# 15 sub-levels, "band.step": 3 difficulty bands x 5 steps each, ordered easiest-first.
LEVELS = [
    "1.1", "1.2", "1.3", "1.4", "1.5",
    "2.1", "2.2", "2.3", "2.4", "2.5",
    "3.1", "3.2", "3.3", "3.4", "3.5",
]

# Anchor Elo rating per sub-level; uniform 50-point spacing from 820 to 1520.
LEVEL_TO_ELO: dict[str, int] = {
    "1.1": 820, "1.2": 870, "1.3": 920, "1.4": 970, "1.5": 1020,
    "2.1": 1070, "2.2": 1120, "2.3": 1170, "2.4": 1220, "2.5": 1270,
    "3.1": 1320, "3.2": 1370, "3.3": 1420, "3.4": 1470, "3.5": 1520,
}

# (level, elo) pairs sorted ascending by Elo — scanned for nearest-level lookups.
ELO_TO_LEVEL = sorted(LEVEL_TO_ELO.items(), key=lambda x: x[1])

# Topics tracked independently by the BKT engine.
TOPICS = ["arithmetic", "fractions", "percentages", "algebra", "geometry", "statistics"]

# Default starting rating for a new student (just below the level-1.5 anchor of 1020).
INITIAL_STUDENT_ELO = 1000
|
|
|
|
| |
| |
| |
|
|
class EloEngine:
    """
    Elo rating system adapted for education with hint-weighted outcomes.

    Weighted outcomes: 1.00 (no hint), 0.75 (L1), 0.50 (L2), 0.25 (L3), 0.00 (L4/incorrect)
    K-factor schedule (student): 48 (first 10), 32 (11-30), 24 (30+)

    Stateless: ratings live in the caller; this class only computes updates.
    (The previous no-op ``__init__`` was removed — ``EloEngine()`` still works.)
    """

    @staticmethod
    def expected_score(student_elo: float, question_elo: float) -> float:
        """Standard Elo expectation: E_s = 1 / (1 + 10^((R_q - R_s) / 400))."""
        return 1.0 / (1.0 + math.pow(10.0, (question_elo - student_elo) / 400.0))

    @staticmethod
    def k_factor_student(interaction_count: int) -> float:
        """Student K-factor: large early for fast calibration, smaller once settled."""
        if interaction_count <= 10:
            return 48.0
        if interaction_count <= 30:
            return 32.0
        return 24.0

    @staticmethod
    def k_factor_question(interaction_count: int) -> float:
        """Question K-factor: question ratings drift more slowly than students'."""
        if interaction_count <= 10:
            return 8.0
        if interaction_count <= 30:
            return 6.0
        return 4.0

    def update(
        self,
        student_elo: float,
        question_elo: float,
        weighted_outcome: float,
        student_interactions: int,
    ) -> tuple[float, float]:
        """
        Update student and question Elo ratings for one interaction.

        Args:
            student_elo: Current student rating.
            question_elo: Current question rating.
            weighted_outcome: Hint-weighted outcome in [0, 1].
            student_interactions: Lifetime interaction count (drives both K-factors).

        Returns:
            (new_student_elo, new_question_elo), each rounded to 1 decimal.
        """
        expected = self.expected_score(student_elo, question_elo)
        ks = self.k_factor_student(student_interactions)
        kq = self.k_factor_question(student_interactions)

        # Zero-sum in direction: student gains exactly what the question loses
        # (scaled by their respective K-factors).
        new_student = student_elo + ks * (weighted_outcome - expected)
        new_question = question_elo + kq * (expected - weighted_outcome)

        return round(new_student, 1), round(new_question, 1)
|
|
|
|
| |
| |
| |
|
|
@dataclass
class BKTParams:
    """BKT parameters for one topic (standard 4-parameter model)."""
    p_know: float = 0.10   # P(L): current estimate that the skill is known
    p_learn: float = 0.15  # P(T): chance of learning at each opportunity
    p_slip: float = 0.10   # P(S): chance of answering wrong despite knowing
    p_guess: float = 0.25  # P(G): chance of answering right without knowing
|
|
|
|
class BKTEngine:
    """
    Bayesian Knowledge Tracing with slip adjustment for scaffold usage.

    The slip probability is inflated when hints were used:
        P(S)_adj = P(S) * (1 + 0.5 * hint_depth_normalized)
    which makes the tracer more skeptical of scaffold-assisted correctness.
    """

    def __init__(self, topics: Optional[list[str]] = None):
        # One independent parameter set per topic; unseen topics get a
        # fresh prior lazily inside update().
        self.topics = topics or TOPICS
        self.params: dict[str, BKTParams] = {name: BKTParams() for name in self.topics}

    def get_mastery(self, topic: str) -> float:
        """Current P(know) for *topic* (default prior if the topic is unseen)."""
        return self.params.get(topic, BKTParams()).p_know

    def update(
        self,
        topic: str,
        weighted_outcome: float,
        hint_depth_normalized: float,
    ) -> float:
        """
        Fold one interaction outcome into P(know) for *topic*.

        Args:
            topic: Math topic string (unseen topics get a fresh prior).
            weighted_outcome: 0.0-1.0 hint-weighted outcome.
            hint_depth_normalized: h_i / 4 (0.0-1.0).

        Returns:
            The updated P(know), rounded to 4 decimals.
        """
        params = self.params.setdefault(topic, BKTParams())

        # Inflate slip for scaffold-assisted answers, capped at 0.5.
        slip = min(params.p_slip * (1.0 + 0.5 * hint_depth_normalized), 0.5)

        # Binarize the observation: outcomes at or above 0.5 count as "correct".
        if weighted_outcome >= 0.5:
            # P(know | correct) via Bayes with slip/guess noise.
            evidence = params.p_know * (1.0 - slip)
            total = evidence + (1.0 - params.p_know) * params.p_guess
        else:
            # P(know | incorrect).
            evidence = params.p_know * slip
            total = evidence + (1.0 - params.p_know) * (1.0 - params.p_guess)

        posterior = evidence / total if total > 0 else params.p_know

        # Learning transition, then clamp away from the 0/1 absorbing states.
        learned = posterior + (1.0 - posterior) * params.p_learn
        params.p_know = round(min(0.99, max(0.01, learned)), 4)
        return params.p_know
|
|
|
|
| |
| |
| |
|
|
@dataclass
class BetaPrior:
    # Beta(alpha, beta) belief over a level's success rate; Beta(1, 1) is uniform.
    alpha: float = 1.0  # pseudo-count of (weighted) successes
    beta: float = 1.0   # pseudo-count of (weighted) failures
|
|
|
|
class ThompsonSampler:
    """
    Beta-Bernoulli Thompson Sampling with ZPD window and proximity bonus.

    ZPD window: [current_level - 2, current_level + 3] (asymmetric upward)
    Proximity bonus: Gaussian centered on the student's Elo, sigma = 100
    """

    def __init__(self):
        # One Beta prior per known level; unknown levels get one lazily.
        self.priors: dict[str, BetaPrior] = {lvl: BetaPrior() for lvl in LEVELS}

    def update(self, level: str, weighted_outcome: float) -> None:
        """Fold a weighted outcome into the Beta prior for *level*."""
        if level not in self.priors:
            self.priors[level] = BetaPrior()
        prior = self.priors[level]
        prior.alpha += weighted_outcome
        prior.beta += 1.0 - weighted_outcome

    def select(self, current_level: str, student_elo: float) -> str:
        """Pick the next question level by Thompson Sampling inside the ZPD window."""
        # Unknown levels fall back to the middle of the ladder (index 5 = "2.1").
        idx = LEVELS.index(current_level) if current_level in LEVELS else 5

        # Asymmetric ZPD: two levels below through three levels above.
        window = LEVELS[max(0, idx - 2):min(len(LEVELS), idx + 4)]

        best_level = current_level
        best_score = -1.0

        for candidate in window:
            prior = self.priors.get(candidate, BetaPrior())

            # One posterior draw per candidate (parameters floored away from 0).
            theta = random.betavariate(
                max(prior.alpha, 0.01),
                max(prior.beta, 0.01),
            )

            # Gaussian proximity bonus (sigma = 100 Elo) biases picks toward
            # levels near the student's current ability.
            gap = (LEVEL_TO_ELO.get(candidate, 1000) - student_elo) / 100.0
            score = theta * math.exp(-0.5 * gap * gap)

            if score > best_score:
                best_score = score
                best_level = candidate

        return best_level
|
|
|
|
| |
| |
| |
|
|
class FeaturePredictor:
    """
    Simple logistic model predicting P(isSolved) from engineered features.

    Weights from spec §5.6 (logistic regression on simulated data).
    """

    # Fitted coefficients, kept as class constants so the model is stateless.
    W_MCS: float = 0.42
    W_ELO_GAP: float = 0.28
    W_LDS: float = -0.18
    W_BKT: float = 0.15
    W_STREAK: float = 0.08
    BIAS: float = -0.30

    @staticmethod
    def _sigmoid(x: float) -> float:
        """Logistic squashing function mapping a logit to (0, 1)."""
        return 1.0 / (1.0 + math.exp(-x))

    def predict(
        self,
        mcs_avg: float,
        elo_gap: float,
        lds_avg: float,
        p_know: float,
        streak: int,
    ) -> float:
        """
        Probability the student solves the next problem without an L4 reveal.

        Returns the sigmoid of the weighted feature sum, rounded to 4 decimals.
        """
        # Streak saturates at 5 and is scaled into [0, 1].
        logit = self.BIAS
        logit = logit + self.W_MCS * mcs_avg
        logit = logit + self.W_ELO_GAP * elo_gap
        logit = logit + self.W_LDS * lds_avg
        logit = logit + self.W_BKT * p_know
        logit = logit + self.W_STREAK * min(streak, 5) / 5.0
        return round(self._sigmoid(logit), 4)
|
|
|
|
| |
| |
| |
|
|
@dataclass
class AdaptiveState:
    """Complete adaptive state for one student."""
    student_elo: float = INITIAL_STUDENT_ELO  # current ability rating
    current_level: str = "2.1"                # current sub-level on the ladder
    total_interactions: int = 0               # lifetime count (drives K-factor schedule)
    streak_correct: int = 0                   # consecutive outcomes >= 0.75
    streak_wrong: int = 0                     # consecutive outcomes < 0.40
    recent_lds: list[float] = field(default_factory=list)  # rolling LDS window (last 5)
    recent_mcs: list[float] = field(default_factory=list)  # rolling MCS window (last 5)
    enhanced_scaffold: bool = False           # set when high LDS+MCS overrides a decrease
|
|
|
|
class AdaptiveEngine:
    """
    Main orchestrator combining Elo, BKT, Thompson Sampling, and feature engineering.

    Decision logic as implemented (evaluated top-down, first match wins):
        weighted_outcome >= 0.85 AND streak_correct >= 3   -> SKIP (+2)
        weighted_outcome >= 0.75 AND P(know) >= 0.70       -> INCREASE (+1)
        weighted_outcome >= 0.40                           -> MAINTAIN (0)
        weighted_outcome >= 0.25 OR streak_wrong < 2       -> DECREASE (-1)
        else (outcome < 0.25 AND streak_wrong >= 2)        -> RAPID_DECREASE (-2)

    NOTE(review): spec §6.5 describes the RAPID_DECREASE gate as
    "P(know) < 0.30", but this implementation gates it on streak_wrong >= 2
    and never consults P(know) there — confirm which is intended.
    """

    def __init__(self, seed: Optional[int] = None):
        # Component engines; all operate against the single AdaptiveState below.
        self.elo_engine = EloEngine()
        self.bkt_engine = BKTEngine()
        self.thompson = ThompsonSampler()
        self.feature_eng = FeatureEngineer()
        # NOTE(review): predictor is constructed but not used by
        # process_interaction — presumably consumed by external callers.
        self.predictor = FeaturePredictor()
        self.state = AdaptiveState()

        # Seeds the module-level RNG used by Thompson Sampling draws.
        if seed is not None:
            random.seed(seed)

    def _elo_to_level(self, elo: float) -> str:
        """Map an Elo rating to the nearest sub-level (linear scan over anchors)."""
        best_level = LEVELS[0]
        best_dist = abs(elo - LEVEL_TO_ELO[LEVELS[0]])
        for level, level_elo in ELO_TO_LEVEL:
            dist = abs(elo - level_elo)
            if dist < best_dist:
                best_dist = dist
                best_level = level
        return best_level

    def _shift_level(self, level: str, delta: int) -> str:
        """Shift a level by delta sub-levels, clamped to the valid range."""
        # Unknown levels fall back to index 5 ("2.1"), the default start.
        idx = LEVELS.index(level) if level in LEVELS else 5
        new_idx = max(0, min(len(LEVELS) - 1, idx + delta))
        return LEVELS[new_idx]

    def _update_rolling(self, lst: list[float], value: float, window: int = 5):
        """Append to an in-place rolling window, dropping the oldest past `window`."""
        lst.append(value)
        if len(lst) > window:
            lst.pop(0)

    def process_interaction(
        self,
        signals: InteractionSignals,
        question_elo: float,
        topic: str,
    ) -> dict:
        """
        Process a single student-question interaction.

        Pipeline: feature engineering -> Elo update -> BKT update -> Thompson
        prior update -> streak bookkeeping -> progression decision ->
        final level selection (Thompson sample with a guard rail).

        Returns a dict with:
            - features: EngineeredFeatures
            - weighted_outcome: float
            - new_student_elo: float
            - new_p_know: float
            - decision: str
            - decision_level: str (rule-based target level)
            - next_level: str (actual level served next)
            - enhanced_scaffold: bool
            - avg_lds / avg_mcs: float (rolling-window averages)
            - quadrant: str (from features)
        """
        s = self.state

        # 1) Engineered features + hint-weighted outcome for this interaction.
        features = self.feature_eng.compute(signals)
        weighted_outcome = self.feature_eng.compute_weighted_outcome(
            signals.is_correct, signals.max_hint_level
        )

        # 2) Elo update; the interaction count drives the K-factor schedule.
        s.total_interactions += 1
        new_elo, new_q_elo = self.elo_engine.update(
            s.student_elo, question_elo, weighted_outcome, s.total_interactions
        )
        s.student_elo = new_elo
        # NOTE(review): new_q_elo is discarded here — question-rating
        # persistence presumably happens elsewhere; confirm.

        # 3) BKT mastery update; slip is inflated by normalized hint depth h/4.
        hint_depth = signals.max_hint_level / 4.0
        new_p_know = self.bkt_engine.update(topic, weighted_outcome, hint_depth)

        # 4) Thompson prior update for the level the question was drawn from.
        self.thompson.update(signals.question_level, weighted_outcome)

        # 5) Streak bookkeeping: >= 0.75 extends the correct streak, < 0.40
        #    extends the wrong streak, anything in between resets both.
        if weighted_outcome >= 0.75:
            s.streak_correct += 1
            s.streak_wrong = 0
        elif weighted_outcome < 0.40:
            s.streak_wrong += 1
            s.streak_correct = 0
        else:
            s.streak_correct = 0
            s.streak_wrong = 0

        # 6) Rolling 5-interaction windows of LDS and MCS.
        self._update_rolling(s.recent_lds, features.lds)
        self._update_rolling(s.recent_mcs, features.mcs)

        # 7) Progression decision (first matching rule wins; see class docstring).
        if weighted_outcome >= 0.85 and s.streak_correct >= 3:
            decision = "SKIP"
            level_delta = 2
        elif weighted_outcome >= 0.75 and new_p_know >= 0.70:
            decision = "INCREASE"
            level_delta = 1
        elif weighted_outcome >= 0.40:
            decision = "MAINTAIN"
            level_delta = 0
        elif weighted_outcome >= 0.25 or s.streak_wrong < 2:
            decision = "DECREASE"
            level_delta = -1
        else:
            decision = "RAPID_DECREASE"
            level_delta = -2

        # 8) Scaffold override: high hint dependence AND high struggle means
        #    hold the level with extra scaffolding rather than demote.
        avg_lds = sum(s.recent_lds) / max(len(s.recent_lds), 1)
        avg_mcs = sum(s.recent_mcs) / max(len(s.recent_mcs), 1)
        s.enhanced_scaffold = False

        if avg_lds > 0.6 and avg_mcs > 0.6:
            # Cancel any planned decrease; keep the student at the same level.
            if level_delta < 0:
                decision = "MAINTAIN"
                level_delta = 0
            s.enhanced_scaffold = True

        # 9) Apply the delta to obtain the rule-based target level.
        decision_level = self._shift_level(s.current_level, level_delta)

        # 10) Let Thompson Sampling refine the pick within the ZPD window.
        thompson_level = self.thompson.select(decision_level, s.student_elo)

        dec_idx = LEVELS.index(decision_level) if decision_level in LEVELS else 5
        th_idx = LEVELS.index(thompson_level) if thompson_level in LEVELS else 5

        # Guard rail: when we just decided to go down, don't let the sampler
        # bounce the student more than one level above the decision level.
        if level_delta < 0 and th_idx > dec_idx + 1:
            next_level = decision_level
        else:
            next_level = thompson_level

        s.current_level = next_level

        return {
            "features": features,
            "weighted_outcome": weighted_outcome,
            "new_student_elo": s.student_elo,
            "new_p_know": new_p_know,
            "decision": decision,
            "decision_level": decision_level,
            "next_level": next_level,
            "enhanced_scaffold": s.enhanced_scaffold,
            "avg_lds": round(avg_lds, 4),
            "avg_mcs": round(avg_mcs, 4),
            "quadrant": features.quadrant,
        }
|
|
|
|
| |
| |
| |
|
|
def simulate_student_profile(
    profile_name: str,
    true_level_idx: int,
    base_p_correct: float,
    hint_tendency: float,
    n_interactions: int = 20,
    seed: int = 42,
) -> dict:
    """
    Simulate a student profile through n_interactions and summarize the run.

    Args:
        profile_name: Label for this profile
        true_level_idx: Index into LEVELS of the student's true ability
        base_p_correct: Base probability of getting correct answer
        hint_tendency: Probability of requesting hints (0=never, 1=always)
        n_interactions: Number of practice interactions
        seed: Random seed

    Returns:
        Summary dict: final Elo/level, average outcome/LDS/MCS, and a
        histogram of progression decisions over the run.
    """
    random.seed(seed)
    engine = AdaptiveEngine(seed=seed)
    true_elo = LEVEL_TO_ELO[LEVELS[true_level_idx]]

    results = []

    for i in range(n_interactions):
        current_level = engine.state.current_level
        question_elo = LEVEL_TO_ELO.get(current_level, 1000)

        # Logistic difficulty model (scale 200 Elo): questions well below the
        # student's true ability succeed near base_p_correct, questions well
        # above it bottom out near the 0.1 guessing floor.
        elo_diff = true_elo - question_elo
        difficulty_modifier = 1.0 / (1.0 + math.exp(-elo_diff / 200.0))
        p_correct = base_p_correct * difficulty_modifier + 0.1 * (1 - difficulty_modifier)

        # Sample hint usage: deeper hint levels are progressively less likely.
        if random.random() < hint_tendency:
            max_hint = random.choices(
                [1, 2, 3, 4],
                weights=[0.3, 0.3, 0.25, 0.15],
            )[0]
        else:
            max_hint = 0

        is_correct = random.random() < p_correct
        # An L4 hint reveals the answer, so the attempt counts as incorrect.
        if max_hint == 4:
            is_correct = False

        # Total solve time: baseline grows with level, plus Gaussian noise,
        # floored at 10 seconds.
        base_time = 30 + true_level_idx * 5
        total_time = max(10, base_time + random.gauss(0, 10))

        # Time spent at each scaffold level actually reached (cumulative).
        scaffold_total = 0
        t_l1, t_l2, t_l3, t_l4 = 0.0, 0.0, 0.0, 0.0
        if max_hint >= 1:
            t_l1 = random.uniform(3, 10)
            scaffold_total += t_l1
        if max_hint >= 2:
            t_l2 = random.uniform(5, 15)
            scaffold_total += t_l2
        if max_hint >= 3:
            t_l3 = random.uniform(8, 20)
            scaffold_total += t_l3
        if max_hint >= 4:
            t_l4 = random.uniform(10, 25)
            scaffold_total += t_l4

        # Total time must at least cover scaffold time plus some own work.
        total_time = max(total_time, scaffold_total + 5)

        topic = random.choice(TOPICS)

        signals = InteractionSignals(
            max_hint_level=max_hint,
            time_before_first_hint=random.uniform(2, 15) if max_hint > 0 else 0,
            total_time=total_time,
            time_at_L1=t_l1,
            time_at_L2=t_l2,
            time_at_L3=t_l3,
            time_at_L4=t_l4,
            num_attempts=1 if is_correct and max_hint == 0 else random.randint(1, 3),
            is_correct=is_correct,
            question_level=current_level,
        )

        result = engine.process_interaction(signals, question_elo, topic)
        results.append(result)

    # Aggregate the run into summary statistics.
    final_elo = engine.state.student_elo
    final_level = engine.state.current_level
    avg_wo = sum(r["weighted_outcome"] for r in results) / len(results)
    avg_lds = sum(r["features"].lds for r in results) / len(results)
    avg_mcs = sum(r["features"].mcs for r in results) / len(results)

    # Histogram of progression decisions over the run.
    decisions = {}
    for r in results:
        d = r["decision"]
        decisions[d] = decisions.get(d, 0) + 1

    return {
        "profile": profile_name,
        "true_level": LEVELS[true_level_idx],
        "start_elo": INITIAL_STUDENT_ELO,
        "final_elo": round(final_elo, 1),
        "final_level": final_level,
        "avg_weighted_outcome": round(avg_wo, 3),
        "avg_lds": round(avg_lds, 3),
        "avg_mcs": round(avg_mcs, 3),
        "decisions": decisions,
    }
|
|
|
|
def _run_simulation():
    """Run the three demo student profiles and print a summary for each."""
    banner = "=" * 70
    print(banner)
    print("MathLingua Adaptive Engine β Simulation Results")
    print(banner)

    # (label, true_level_idx, base_p_correct, hint_tendency)
    profiles = [
        ("Strong Student (true ~2.5)", 9, 0.85, 0.15),
        ("Struggling Student (true ~1.2)", 1, 0.45, 0.70),
        ("Average Student (true ~1.5)", 4, 0.65, 0.40),
    ]

    for label, level_idx, p_correct, hint_rate in profiles:
        summary = simulate_student_profile(label, level_idx, p_correct, hint_rate)
        print(f"\n{'β' * 50}")
        print(f"Profile: {summary['profile']}")
        print(f" True level: {summary['true_level']}")
        print(f" Elo: {summary['start_elo']} β {summary['final_elo']}")
        print(f" Level: 2.1 β {summary['final_level']}")
        print(f" Avg weighted outcome: {summary['avg_weighted_outcome']}")
        print(f" Avg LDS: {summary['avg_lds']}")
        print(f" Avg MCS: {summary['avg_mcs']}")
        print(f" Decisions: {summary['decisions']}")

    print(f"\n{banner}")
    print("Simulation completed successfully β")
    print(f"{banner}")
|
|
|
|
| if __name__ == "__main__": |
| _run_simulation() |
|
|