mathlingua-spec / adaptive_engine.py
cosmicmicra's picture
Add adaptive engine (Elo + BKT + Thompson Sampling orchestrator)
e34f815 verified
"""
MathLingua β€” Adaptive Engine
Hybrid adaptive algorithm combining:
1. Elo Rating β€” overall ability tracking with hint-weighted outcomes
2. Bayesian Knowledge Tracing (BKT) β€” per-topic mastery estimation
3. Thompson Sampling β€” intelligent question-level selection with ZPD windowing
The orchestrator combines all three to produce progression decisions:
SKIP (+2), INCREASE (+1), MAINTAIN (0), DECREASE (-1), RAPID_DECREASE (-2)
Reference: MathLingua Technical Specification Β§6
"""
from __future__ import annotations
import math
import random
from dataclasses import dataclass, field
from typing import Optional
from feature_engineering import (
FeatureEngineer,
EngineeredFeatures,
InteractionSignals,
)
# ────────────────────────────────────────────────────────
# Constants
# ────────────────────────────────────────────────────────
# Sub-levels "1.1" .. "3.5": three tiers of five steps each, listed from
# easiest to hardest.
LEVELS = [f"{tier}.{step}" for tier in (1, 2, 3) for step in range(1, 6)]

# Nominal question Elo per sub-level: starts at 820 for "1.1" and rises by 50
# per sub-level, ending at 1520 for "3.5".
LEVEL_TO_ELO: dict[str, int] = {
    name: 820 + 50 * position for position, name in enumerate(LEVELS)
}

# (level, elo) pairs ordered by ascending Elo.
ELO_TO_LEVEL = sorted(LEVEL_TO_ELO.items(), key=lambda pair: pair[1])

# Topics tracked by default.
TOPICS = [
    "arithmetic",
    "fractions",
    "percentages",
    "algebra",
    "geometry",
    "statistics",
]

# Elo rating assigned to a student before any interaction.
INITIAL_STUDENT_ELO = 1000
# ────────────────────────────────────────────────────────
# Elo Engine
# ────────────────────────────────────────────────────────
class EloEngine:
    """
    Elo rating system adapted for education with hint-weighted outcomes.

    Weighted outcomes: 1.00 (no hint), 0.75 (L1), 0.50 (L2), 0.25 (L3),
    0.00 (L4/incorrect).
    Student K-factor schedule: 48 (interactions 1-10), 32 (11-30), 24 (31+).
    Question K-factor schedule: 8 (interactions 1-10), 6 (11-30), 4 (31+).
    """

    @staticmethod
    def expected_score(student_elo: float, question_elo: float) -> float:
        """E_s = 1 / (1 + 10^((R_q - R_s) / 400))"""
        return 1.0 / (1.0 + math.pow(10.0, (question_elo - student_elo) / 400.0))

    @staticmethod
    def k_factor_student(interaction_count: int) -> float:
        """Return the student K-factor for the given interaction count."""
        if interaction_count <= 10:
            return 48.0
        if interaction_count <= 30:
            return 32.0
        return 24.0

    @staticmethod
    def k_factor_question(interaction_count: int) -> float:
        """Return the question K-factor for the given interaction count."""
        if interaction_count <= 10:
            return 8.0
        if interaction_count <= 30:
            return 6.0
        return 4.0

    def update(
        self,
        student_elo: float,
        question_elo: float,
        weighted_outcome: float,
        student_interactions: int,
        question_interactions: Optional[int] = None,
    ) -> tuple[float, float]:
        """
        Update student and question Elo ratings.

        Args:
            student_elo: Student rating before this interaction.
            question_elo: Question rating before this interaction.
            weighted_outcome: Hint-weighted outcome in [0, 1].
            student_interactions: Interaction count used to pick the
                student K-factor.
            question_interactions: Interaction count used to pick the
                question K-factor. When omitted, ``student_interactions`` is
                used, which is the historical behaviour of this method.

        Returns: (new_student_elo, new_question_elo), each rounded to 1 decimal.
        """
        if question_interactions is None:
            # Backward-compatible fallback: callers that do not track how often
            # a question was seen keep getting the previous result.
            question_interactions = student_interactions

        expected = self.expected_score(student_elo, question_elo)
        ks = self.k_factor_student(student_interactions)
        kq = self.k_factor_question(question_interactions)

        # The two ratings move in opposite directions: the student gains what
        # the outcome exceeds expectation by, the question loses rating.
        new_student = student_elo + ks * (weighted_outcome - expected)
        new_question = question_elo + kq * (expected - weighted_outcome)
        return round(new_student, 1), round(new_question, 1)
# ────────────────────────────────────────────────────────
# Bayesian Knowledge Tracing (BKT)
# ────────────────────────────────────────────────────────
@dataclass
class BKTParams:
    """BKT parameters for one topic.

    ``BKTEngine.update`` overwrites ``p_know`` after every interaction, so the
    field starts as the prior and then holds the running mastery estimate.
    The other three fields are only read by that method.
    """
    p_know: float = 0.10 # P(L_0) — prior knowledge
    p_learn: float = 0.15 # P(T) — learn rate
    p_slip: float = 0.10 # P(S) — slip
    p_guess: float = 0.25 # P(G) — guess
class BKTEngine:
    """
    Bayesian Knowledge Tracing with slip adjustment for scaffold usage.

    P(S)_adj = P(S) × (1 + 0.5 × hint_depth_normalized), capped at 0.5.
    This makes BKT more skeptical of scaffold-assisted correctness.
    """

    def __init__(self, topics: Optional[list[str]] = None):
        # An empty or missing topic list falls back to the module-level TOPICS.
        self.topics = topics or TOPICS
        self.params: dict[str, BKTParams] = {}
        for name in self.topics:
            self.params[name] = BKTParams()

    def get_mastery(self, topic: str) -> float:
        """Return P(know) for a topic (the default prior if it is untracked)."""
        fallback = BKTParams()
        return self.params.get(topic, fallback).p_know

    def update(
        self,
        topic: str,
        weighted_outcome: float,
        hint_depth_normalized: float,
    ) -> float:
        """
        Update P(know) for a topic given an interaction outcome.

        Args:
            topic: Math topic string; an unseen topic is registered with
                default parameters first.
            weighted_outcome: 0.0–1.0 hint-weighted outcome; values >= 0.5
                count as a correct observation.
            hint_depth_normalized: h_i / 4 (0.0–1.0)

        Returns: New P(know), clamped to [0.01, 0.99] and rounded to 4 places.
        """
        if topic not in self.params:
            self.params[topic] = BKTParams()
        record = self.params[topic]
        prior = record.p_know

        # Deeper hints inflate the slip probability, never beyond 0.5.
        slip = min(record.p_slip * (1.0 + 0.5 * hint_depth_normalized), 0.5)

        if weighted_outcome >= 0.5:
            # Correct: P(L)(1-S) / [P(L)(1-S) + (1-P(L)) G]
            known_term = prior * (1.0 - slip)
            unknown_term = (1.0 - prior) * record.p_guess
        else:
            # Incorrect: P(L) S / [P(L) S + (1-P(L))(1-G)]
            known_term = prior * slip
            unknown_term = (1.0 - prior) * (1.0 - record.p_guess)

        evidence = known_term + unknown_term
        # With no evidence mass the observation is uninformative: keep the prior.
        posterior = known_term / evidence if evidence > 0 else prior

        # Learning transition: P(L_n) = P(L_n|O) + (1 - P(L_n|O)) * P(T)
        learned = posterior + (1.0 - posterior) * record.p_learn
        record.p_know = round(max(0.01, min(0.99, learned)), 4)
        return record.p_know
# ────────────────────────────────────────────────────────
# Thompson Sampling
# ────────────────────────────────────────────────────────
@dataclass
class BetaPrior:
    # Shape parameters of a Beta distribution; both default to 1.0.
    # ThompsonSampler.update adds the weighted outcome to alpha and its
    # complement (1 - outcome) to beta.
    alpha: float = 1.0
    beta: float = 1.0
class ThompsonSampler:
    """
    Beta-Bernoulli Thompson Sampling with ZPD window and proximity bonus.

    ZPD window: [current_level - 2, current_level + 3] (asymmetric upward)
    Proximity bonus: Gaussian centered on student Elo, σ = 100
    """

    def __init__(self):
        # One independent Beta prior per known sub-level.
        self.priors: dict[str, BetaPrior] = {}
        for name in LEVELS:
            self.priors[name] = BetaPrior()

    def update(self, level: str, weighted_outcome: float) -> None:
        """Fold a weighted outcome into the Beta prior of ``level``."""
        if level not in self.priors:
            self.priors[level] = BetaPrior()
        entry = self.priors[level]
        entry.alpha += weighted_outcome
        entry.beta += (1.0 - weighted_outcome)

    def select(self, current_level: str, student_elo: float) -> str:
        """
        Select next question level via Thompson Sampling within ZPD window.

        Each candidate gets one Beta draw multiplied by a Gaussian proximity
        weight; the first candidate with the highest product is returned.
        """
        try:
            centre = LEVELS.index(current_level)
        except ValueError:
            # Unknown level: fall back to index 5.
            centre = 5

        # Window spans two levels below to three above (slice end is exclusive).
        first = max(0, centre - 2)
        stop = min(len(LEVELS), centre + 4)

        chosen = current_level
        top = -1.0
        for candidate in LEVELS[first:stop]:
            prior = self.priors.get(candidate, BetaPrior())
            # betavariate needs strictly positive shapes, hence the 0.01 floor.
            theta = random.betavariate(
                max(prior.alpha, 0.01),
                max(prior.beta, 0.01),
            )
            # Distance from the student's rating in units of sigma (100 Elo).
            offset = (LEVEL_TO_ELO.get(candidate, 1000) - student_elo) / 100.0
            weight = math.exp(-0.5 * offset ** 2)
            value = theta * weight
            if value > top:
                top = value
                chosen = candidate
        return chosen
# ────────────────────────────────────────────────────────
# Feature Predictor (for P(isSolved))
# ────────────────────────────────────────────────────────
class FeaturePredictor:
    """
    Simple logistic model predicting P(isSolved) from features.
    Weights from spec §5.6 (logistic regression on simulated data).
    """
    # Feature importance weights (from spec)
    W_MCS: float = 0.42
    W_ELO_GAP: float = 0.28
    W_LDS: float = -0.18
    W_BKT: float = 0.15
    W_STREAK: float = 0.08
    BIAS: float = -0.30

    @staticmethod
    def _sigmoid(x: float) -> float:
        """Return 1 / (1 + e^-x) without overflowing for large |x|."""
        if x >= 0:
            # exp(-x) <= 1 here, so it cannot overflow.
            return 1.0 / (1.0 + math.exp(-x))
        # For negative x, exp(-x) can exceed the float range and raise
        # OverflowError; the algebraically equal form below only ever
        # evaluates exp of a negative number.
        ex = math.exp(x)
        return ex / (1.0 + ex)

    def predict(
        self,
        mcs_avg: float,
        elo_gap: float,  # student_elo - question_elo (normalized by /400)
        lds_avg: float,
        p_know: float,
        streak: int,
    ) -> float:
        """
        Predict probability that the student solves the next problem without L4.

        Args:
            mcs_avg: Recent average MCS feature value.
            elo_gap: Student Elo minus question Elo, divided by 400.
            lds_avg: Recent average LDS feature value.
            p_know: BKT mastery estimate for the topic.
            streak: Current correct streak; contributions saturate at 5.

        Returns: Probability rounded to 4 decimal places.
        """
        z = (
            self.BIAS
            + self.W_MCS * mcs_avg
            + self.W_ELO_GAP * elo_gap
            + self.W_LDS * lds_avg
            + self.W_BKT * p_know
            + self.W_STREAK * min(streak, 5) / 5.0
        )
        return round(self._sigmoid(z), 4)
# ────────────────────────────────────────────────────────
# Adaptive Engine (Orchestrator)
# ────────────────────────────────────────────────────────
@dataclass
class AdaptiveState:
    """Complete adaptive state for one student.

    AdaptiveEngine.process_interaction mutates an instance of this class in
    place on every call.
    """
    student_elo: float = INITIAL_STUDENT_ELO
    current_level: str = "2.1" # start at center
    total_interactions: int = 0
    streak_correct: int = 0 # consecutive weighted_outcome >= 0.75
    streak_wrong: int = 0 # consecutive weighted_outcome < 0.40
    # The two rolling windows use default_factory so each instance gets its
    # own list rather than sharing one mutable default.
    recent_lds: list[float] = field(default_factory=list) # last 5
    recent_mcs: list[float] = field(default_factory=list) # last 5
    enhanced_scaffold: bool = False
class AdaptiveEngine:
    """
    Main orchestrator combining Elo, BKT, Thompson Sampling, and feature engineering.

    Decision logic (from spec §6.5), evaluated top to bottom, first match wins:
        weighted_outcome ≥ 0.85 AND streak_correct ≥ 3 → SKIP (+2)
        weighted_outcome ≥ 0.75 AND P(know) ≥ 0.70     → INCREASE (+1)
        weighted_outcome ≥ 0.40                        → MAINTAIN (0)
        weighted_outcome ≥ 0.25 OR streak_wrong < 2    → DECREASE (-1)
        else (outcome < 0.25 AND streak_wrong ≥ 2)     → RAPID_DECREASE (-2)

    NOTE(review): an earlier description of the last rule mentioned
    P(know) < 0.30; the implementation does not test P(know) there —
    confirm against the spec.
    """

    def __init__(self, seed: Optional[int] = None):
        self.elo_engine = EloEngine()
        self.bkt_engine = BKTEngine()
        self.thompson = ThompsonSampler()
        self.feature_eng = FeatureEngineer()
        self.predictor = FeaturePredictor()
        self.state = AdaptiveState()
        if seed is not None:
            # Seeds the module-level generator of ``random`` (the one
            # ThompsonSampler draws from), so this is a process-wide effect.
            random.seed(seed)

    def _elo_to_level(self, elo: float) -> str:
        """Map an Elo rating to the nearest sub-level (lower level wins ties)."""
        nearest = min(ELO_TO_LEVEL, key=lambda pair: abs(elo - pair[1]))
        return nearest[0]

    def _shift_level(self, level: str, delta: int) -> str:
        """Shift a level by delta sub-levels, clamped to valid range."""
        # Unknown levels are treated as index 5, i.e. "2.1".
        idx = LEVELS.index(level) if level in LEVELS else 5
        new_idx = max(0, min(len(LEVELS) - 1, idx + delta))
        return LEVELS[new_idx]

    def _update_rolling(self, lst: list[float], value: float, window: int = 5):
        """Append ``value`` to ``lst`` and drop the oldest entry past ``window``."""
        lst.append(value)
        if len(lst) > window:
            lst.pop(0)

    def process_interaction(
        self,
        signals: InteractionSignals,
        question_elo: float,
        topic: str,
    ) -> dict:
        """
        Process a single student-question interaction.

        Updates ``self.state`` and the Elo, BKT and Thompson components in
        place, then decides the next question level.

        Returns a dict with:
            - features: EngineeredFeatures
            - weighted_outcome: float
            - new_student_elo: float
            - new_question_elo: float (updated rating of the answered question)
            - new_p_know: float
            - decision: str
            - decision_level: str (level implied by the decision alone)
            - next_level: str (level actually selected)
            - enhanced_scaffold: bool
            - avg_lds / avg_mcs: rolling averages, rounded to 4 places
            - quadrant: taken from the engineered features
        """
        s = self.state

        # 1. Compute engineered features
        features = self.feature_eng.compute(signals)
        weighted_outcome = self.feature_eng.compute_weighted_outcome(
            signals.is_correct, signals.max_hint_level
        )

        # 2. Update Elo (the count is bumped first, so the first call uses 1)
        s.total_interactions += 1
        new_elo, new_q_elo = self.elo_engine.update(
            s.student_elo, question_elo, weighted_outcome, s.total_interactions
        )
        s.student_elo = new_elo

        # 3. Update BKT
        hint_depth = signals.max_hint_level / 4.0
        new_p_know = self.bkt_engine.update(topic, weighted_outcome, hint_depth)

        # 4. Update Thompson priors
        self.thompson.update(signals.question_level, weighted_outcome)

        # 5. Update streaks; a middling outcome resets both
        if weighted_outcome >= 0.75:
            s.streak_correct += 1
            s.streak_wrong = 0
        elif weighted_outcome < 0.40:
            s.streak_wrong += 1
            s.streak_correct = 0
        else:
            s.streak_correct = 0
            s.streak_wrong = 0

        # 6. Update rolling averages
        self._update_rolling(s.recent_lds, features.lds)
        self._update_rolling(s.recent_mcs, features.mcs)

        # 7. Progression decision
        if weighted_outcome >= 0.85 and s.streak_correct >= 3:
            decision = "SKIP"
            level_delta = 2
        elif weighted_outcome >= 0.75 and new_p_know >= 0.70:
            decision = "INCREASE"
            level_delta = 1
        elif weighted_outcome >= 0.40:
            decision = "MAINTAIN"
            level_delta = 0
        elif weighted_outcome >= 0.25 or s.streak_wrong < 2:
            decision = "DECREASE"
            level_delta = -1
        else:
            decision = "RAPID_DECREASE"
            level_delta = -2

        # 8. LDS/MCS diagnostic overlay
        avg_lds = sum(s.recent_lds) / max(len(s.recent_lds), 1)
        avg_mcs = sum(s.recent_mcs) / max(len(s.recent_mcs), 1)
        s.enhanced_scaffold = False
        if avg_lds > 0.6 and avg_mcs > 0.6:
            # Language gap: knows math, needs scaffold — don't decrease
            if level_delta < 0:
                decision = "MAINTAIN"
                level_delta = 0
            s.enhanced_scaffold = True

        # 9. Apply level change
        decision_level = self._shift_level(s.current_level, level_delta)

        # 10. Thompson sampling for fine-grained selection
        thompson_level = self.thompson.select(decision_level, s.student_elo)

        # 11. Override if Thompson and decision disagree strongly
        dec_idx = LEVELS.index(decision_level) if decision_level in LEVELS else 5
        th_idx = LEVELS.index(thompson_level) if thompson_level in LEVELS else 5
        if level_delta < 0 and th_idx > dec_idx + 1:
            # Decision says decrease but Thompson wants to increase significantly
            next_level = decision_level
        else:
            next_level = thompson_level
        s.current_level = next_level

        return {
            "features": features,
            "weighted_outcome": weighted_outcome,
            "new_student_elo": s.student_elo,
            # Previously computed and dropped; exposed so callers can persist
            # the question's updated rating.
            "new_question_elo": new_q_elo,
            "new_p_know": new_p_know,
            "decision": decision,
            "decision_level": decision_level,
            "next_level": next_level,
            "enhanced_scaffold": s.enhanced_scaffold,
            "avg_lds": round(avg_lds, 4),
            "avg_mcs": round(avg_mcs, 4),
            "quadrant": features.quadrant,
        }
# ────────────────────────────────────────────────────────
# Simulation
# ────────────────────────────────────────────────────────
def simulate_student_profile(
    profile_name: str,
    true_level_idx: int,
    base_p_correct: float,
    hint_tendency: float,
    n_interactions: int = 20,
    seed: int = 42,
) -> dict:
    """
    Simulate a student profile through n_interactions.

    Args:
        profile_name: Label for this profile
        true_level_idx: Index into LEVELS of the student's true ability
        base_p_correct: Base probability of getting correct answer
        hint_tendency: Probability of requesting hints (0=never, 1=always)
        n_interactions: Number of practice interactions; 0 yields a summary
            with zeroed averages and no decisions
        seed: Random seed

    Returns:
        Summary dict with keys: profile, true_level, start_elo, final_elo,
        final_level, avg_weighted_outcome, avg_lds, avg_mcs, decisions
        (a mapping of decision name to how often it was taken).
    """
    random.seed(seed)
    engine = AdaptiveEngine(seed=seed)
    true_elo = LEVEL_TO_ELO[LEVELS[true_level_idx]]
    results = []

    for _ in range(n_interactions):
        current_level = engine.state.current_level
        question_elo = LEVEL_TO_ELO.get(current_level, 1000)

        # Simulate difficulty effect on correctness: a logistic blend between
        # base_p_correct (easy question) and 0.1 (far too hard).
        elo_diff = true_elo - question_elo
        difficulty_modifier = 1.0 / (1.0 + math.exp(-elo_diff / 200.0))
        p_correct = base_p_correct * difficulty_modifier + 0.1 * (1 - difficulty_modifier)

        # Simulate hint usage
        if random.random() < hint_tendency:
            max_hint = random.choices(
                [1, 2, 3, 4],
                weights=[0.3, 0.3, 0.25, 0.15],
            )[0]
        else:
            max_hint = 0

        # The draw happens before the L4 override so the random stream is the
        # same whatever hint level was chosen.
        is_correct = random.random() < p_correct
        if max_hint == 4:
            is_correct = False  # L4 = solution reveal

        # Generate plausible timing
        base_time = 30 + true_level_idx * 5
        total_time = max(10, base_time + random.gauss(0, 10))
        scaffold_total = 0
        t_l1, t_l2, t_l3, t_l4 = 0.0, 0.0, 0.0, 0.0
        if max_hint >= 1:
            t_l1 = random.uniform(3, 10)
            scaffold_total += t_l1
        if max_hint >= 2:
            t_l2 = random.uniform(5, 15)
            scaffold_total += t_l2
        if max_hint >= 3:
            t_l3 = random.uniform(8, 20)
            scaffold_total += t_l3
        if max_hint >= 4:
            t_l4 = random.uniform(10, 25)
            scaffold_total += t_l4
        # Total time always exceeds time spent in scaffolds by at least 5.
        total_time = max(total_time, scaffold_total + 5)

        topic = random.choice(TOPICS)
        signals = InteractionSignals(
            max_hint_level=max_hint,
            time_before_first_hint=random.uniform(2, 15) if max_hint > 0 else 0,
            total_time=total_time,
            time_at_L1=t_l1,
            time_at_L2=t_l2,
            time_at_L3=t_l3,
            time_at_L4=t_l4,
            num_attempts=1 if is_correct and max_hint == 0 else random.randint(1, 3),
            is_correct=is_correct,
            question_level=current_level,
        )
        result = engine.process_interaction(signals, question_elo, topic)
        results.append(result)

    # Summary
    final_elo = engine.state.student_elo
    final_level = engine.state.current_level
    # Guard the averages: with no interactions len(results) is 0 and the
    # plain division used to raise ZeroDivisionError.
    divisor = max(len(results), 1)
    avg_wo = sum(r["weighted_outcome"] for r in results) / divisor
    avg_lds = sum(r["features"].lds for r in results) / divisor
    avg_mcs = sum(r["features"].mcs for r in results) / divisor

    decisions = {}
    for r in results:
        d = r["decision"]
        decisions[d] = decisions.get(d, 0) + 1

    return {
        "profile": profile_name,
        "true_level": LEVELS[true_level_idx],
        "start_elo": INITIAL_STUDENT_ELO,
        "final_elo": round(final_elo, 1),
        "final_level": final_level,
        "avg_weighted_outcome": round(avg_wo, 3),
        "avg_lds": round(avg_lds, 3),
        "avg_mcs": round(avg_mcs, 3),
        "decisions": decisions,
    }
def _run_simulation():
    """Run three canned student profiles and print a summary of each.

    Each profile tuple is (label, true level index into LEVELS, base
    probability of a correct answer, hint tendency) and is passed to
    simulate_student_profile with its default interaction count and seed.
    """
    print("=" * 70)
    print("MathLingua Adaptive Engine — Simulation Results")
    print("=" * 70)

    profiles = [
        ("Strong Student (true ~2.5)", 9, 0.85, 0.15),
        ("Struggling Student (true ~1.2)", 1, 0.45, 0.70),
        ("Average Student (true ~1.5)", 4, 0.65, 0.40),
    ]
    # Every simulation starts from a fresh AdaptiveState, so read the starting
    # level from its default instead of repeating the literal here.
    start_level = AdaptiveState().current_level

    for name, true_idx, p_correct, hint_tend in profiles:
        result = simulate_student_profile(name, true_idx, p_correct, hint_tend)
        print(f"\n{'─' * 50}")
        print(f"Profile: {result['profile']}")
        print(f" True level: {result['true_level']}")
        print(f" Elo: {result['start_elo']} → {result['final_elo']}")
        print(f" Level: {start_level} → {result['final_level']}")
        print(f" Avg weighted outcome: {result['avg_weighted_outcome']}")
        print(f" Avg LDS: {result['avg_lds']}")
        print(f" Avg MCS: {result['avg_mcs']}")
        print(f" Decisions: {result['decisions']}")

    print(f"\n{'=' * 70}")
    print("Simulation completed successfully ✓")
    print(f"{'=' * 70}")
# Script entry point: run the demo simulation only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    _run_simulation()