# contextflow-env-api / server / contextflow_environment.py
# Uploaded via huggingface_hub by namish10 (commit a2896bf, verified).
import uuid
import numpy as np
from typing import Dict, Any, Optional, Tuple
from datetime import datetime
from models import (
Observation, Action, Reward, State, StepResult,
TaskDifficulty, ActionType, GraderResult
)
from agents import (
ContextFlowAgent,
AgentPrediction,
ConfusionLevel,
InterventionType,
KnowledgeGraphAgent,
PeerLearningAgent,
RecallAgent,
)
class ContextFlowEnvironment:
    """
    OpenEnv environment with full ContextFlow multi-agent system.

    Integrates:
    - RL-based doubt prediction
    - Multi-modal behavioral analysis
    - Gesture recognition
    - Knowledge graphs
    - Peer learning
    - Spaced repetition

    Lifecycle: call ``reset()`` to start an episode, then ``step(action)``
    until the returned ``StepResult.done`` is True. Ground-truth learner
    confusion is synthesized internally each step; the caller's ``Action``
    supplies a confusion prediction and optionally an intervention.
    """

    # NOTE(review): appears unused -- the effective episode length is the
    # per-difficulty config's "max_steps". Kept for backward compatibility.
    MAX_STEPS = 100

    def __init__(self, task_difficulty: TaskDifficulty = TaskDifficulty.MEDIUM):
        """Create an environment for one difficulty; reset() must follow."""
        self.task_difficulty = task_difficulty
        self.episode_id: Optional[str] = None
        self.step_count: int = 0
        self._state: Optional[State] = None
        self._last_observation: Optional[Observation] = None
        self._ground_truth_confusion: float = 0.0
        # Ground-truth confusion value recorded at every generated step.
        self._confusion_trajectory: list = []
        # One record per prediction made via step(); consumed by _grade_task().
        self._prediction_history: list = []
        # One record per triggered intervention (effectiveness filled in later).
        self._intervention_history: list = []
        self._task_config = self._get_task_config()
        self.agent = ContextFlowAgent()
        self.knowledge_graph = KnowledgeGraphAgent()
        self.peer_learning = PeerLearningAgent()
        self.recall_system = RecallAgent()

    def _get_task_config(self) -> Dict[str, Any]:
        """Return tuning parameters for the current difficulty (MEDIUM fallback)."""
        configs = {
            TaskDifficulty.EASY: {
                "prediction_window": 3,
                "noise_level": 0.1,
                "confusion_base": 0.3,
                "intervention_threshold": 0.6,
                "max_steps": 50,
                "confusion_spike_prob": 0.08,
            },
            TaskDifficulty.MEDIUM: {
                "prediction_window": 5,
                "noise_level": 0.2,
                "confusion_base": 0.5,
                "intervention_threshold": 0.5,
                "max_steps": 75,
                "confusion_spike_prob": 0.12,
            },
            TaskDifficulty.HARD: {
                "prediction_window": 7,
                "noise_level": 0.3,
                "confusion_base": 0.6,
                "intervention_threshold": 0.4,
                "max_steps": 100,
                "confusion_spike_prob": 0.15,
            },
        }
        return configs.get(self.task_difficulty, configs[TaskDifficulty.MEDIUM])

    def _generate_synthetic_data(self) -> Tuple[Observation, float]:
        """Synthesize the next Observation and ground-truth confusion.

        Confusion = base level + slow sinusoidal trend + Gaussian noise +
        an occasional spike, clipped to [0, 1]. Sensor feature vectors are
        random; the first two behavioral features leak a scaled copy of the
        ground truth so the signal is learnable.

        Returns:
            (observation, ground_truth_confusion) for the current step.
        """
        step = self.step_count
        config = self._task_config
        base_confusion = config["confusion_base"]
        noise = np.random.normal(0, config["noise_level"])
        confusion_trend = np.sin(step * 0.1) * 0.2
        # NOTE(review): the spike *magnitude* equals the spike *probability*
        # (0.08-0.15) -- looks intentional but worth confirming.
        confusion_spike = config["confusion_spike_prob"] if np.random.random() < config["confusion_spike_prob"] else 0.0
        self._ground_truth_confusion = np.clip(
            base_confusion + confusion_trend + noise + confusion_spike,
            0.0, 1.0
        )
        self._confusion_trajectory.append(self._ground_truth_confusion)
        gaze_features = np.random.randn(16).tolist()
        gesture_features = np.random.randn(63).tolist()
        biometric_features = [
            60 + np.random.randn() * 10,
            0.5 + np.random.randn() * 0.1,
            36.6 + np.random.randn() * 0.5,
            15 + np.random.randn() * 3,
            0.3 + np.random.randn() * 0.1,
            0.7 + np.random.randn() * 0.1,
        ]
        audio_features = [200 + np.random.randn() * 50, 0.3 + np.random.randn() * 0.1]
        behavioral_features = np.random.randn(8).tolist()
        # Correlate the first two behavioral features with the ground truth.
        behavioral_features[0] = self._ground_truth_confusion * 0.5
        behavioral_features[1] = self._ground_truth_confusion * 0.3
        learning_context = {
            "topic": np.random.choice(["math", "science", "programming", "language"]),
            "difficulty": self.task_difficulty.value,
            "time_spent": step * 30,
            "content_length": np.random.randint(100, 1000),
            "subtopic": np.random.choice(["basics", "intermediate", "advanced"]),
        }
        learner_state = {
            "engagement": 1.0 - self._ground_truth_confusion,
            "frustration": self._ground_truth_confusion * 0.8,
            "comprehension": 0.7 - self._ground_truth_confusion * 0.3,
            "confusion_level": self._get_confusion_level(self._ground_truth_confusion).value,
        }
        observation = Observation(
            step=self.step_count,
            episode_id=self.episode_id,
            learning_context=learning_context,
            learner_state=learner_state,
            gaze_features=gaze_features,
            gesture_features=gesture_features,
            biometric_features=biometric_features,
            audio_features=audio_features,
            behavioral_features=behavioral_features,
            confusion_history=self._confusion_trajectory[-10:],
            prediction_window=config["prediction_window"],
            available_interventions=[
                "hint", "simplify", "breakdown", "example", "scaffold",
                "peer_connect", "break", "encourage"
            ],
            multimodal_fused=True,
            metadata={
                "knowledge_graph_mastery": self.knowledge_graph.get_prerequisite_mastery(
                    learning_context["topic"]
                ),
                "similar_learners": len(self.peer_learning.find_similar_learners(
                    learning_context["topic"]
                )),
                "recall_cards": len(self.recall_system.cards),
            }
        )
        return observation, self._ground_truth_confusion

    def _get_confusion_level(self, prob: float) -> ConfusionLevel:
        """Bucket a confusion probability into the four discrete levels."""
        # The redundant local `from agents import ConfusionLevel` was removed;
        # the module-level import already provides it.
        if prob < 0.25:
            return ConfusionLevel.LOW
        elif prob < 0.5:
            return ConfusionLevel.MEDIUM
        elif prob < 0.75:
            return ConfusionLevel.HIGH
        else:
            return ConfusionLevel.CRITICAL

    def reset(self) -> Observation:
        """Start a fresh episode: new id, cleared histories, fresh agent."""
        self.episode_id = str(uuid.uuid4())
        self.step_count = 0
        self._confusion_trajectory = []
        self._prediction_history = []
        self._intervention_history = []
        self._ground_truth_confusion = 0.0
        self.agent = ContextFlowAgent()
        observation, _ = self._generate_synthetic_data()
        self._last_observation = observation
        self._state = State(
            episode_id=self.episode_id,
            step_count=self.step_count,
            max_steps=self._task_config["max_steps"],
            task_difficulty=self.task_difficulty,
            ground_truth_confusion=self._ground_truth_confusion,
            predictions_history=[],
            interventions_history=[],
            episode_reward=0.0,
            task_complete=False,
            task_success=False,
        )
        return observation

    def step(self, action: Action) -> StepResult:
        """Apply one action, advance the simulation, and return the result.

        Raises:
            RuntimeError: if reset() has not been called.

        A completed episode returns a terminal no-op StepResult instead of
        advancing further.
        """
        if self._state is None:
            raise RuntimeError("Must call reset() before step()")
        if self._state.task_complete:
            return StepResult(
                observation=self._create_current_observation(),
                reward=Reward(total=0.0),
                done=True,
                info={"message": "Episode already complete"}
            )
        reward = self._calculate_reward(action)
        self._state.episode_reward += reward.total
        prediction_record = {
            "step": self.step_count,
            "predicted": action.predicted_confusion,
            "ground_truth": self._ground_truth_confusion,
            "action_type": action.action_type.value,
            "confusion_level": self._get_confusion_level(action.predicted_confusion or 0.5).value,
        }
        self._state.predictions_history.append(prediction_record)
        # BUG FIX: the instance-level history was never populated here, so
        # _grade_task() always reported "No predictions made" (score 0.0)
        # and the episode summary always showed zero predictions.
        self._prediction_history.append(prediction_record)
        if action.action_type == ActionType.TRIGGER_INTERVENTION and action.intervention_type:
            self._intervention_history.append({
                "step": self.step_count,
                "type": action.intervention_type,
                "intensity": action.intervention_intensity or 0.5,
                "effectiveness": 0.0,  # scored retroactively below
            })
            if action.intervention_type == "peer_connect":
                topic = self._last_observation.learning_context.get("topic", "general") if self._last_observation else "general"
                peers = self.peer_learning.find_similar_learners(topic)
                # Small bonus for connecting with up to three peers.
                reward.total += 0.1 * min(len(peers), 3)
        self.agent.update(reward.total, self._last_observation.learning_context if self._last_observation else {})
        self.step_count += 1
        self._state.step_count = self.step_count
        observation, new_gt = self._generate_synthetic_data()
        self._last_observation = observation
        self._state.ground_truth_confusion = new_gt
        self._state.interventions_history = self._intervention_history.copy()
        # Retroactively score the most recent intervention: effective (0.8)
        # if confusion dropped versus two steps ago, weak (0.3) otherwise.
        if len(self._intervention_history) > 0:
            last_idx = len(self._intervention_history) - 1
            if len(self._confusion_trajectory) >= 3:
                prev_confusion = self._confusion_trajectory[-3]
                if new_gt < prev_confusion:
                    self._intervention_history[last_idx]["effectiveness"] = 0.8
                else:
                    self._intervention_history[last_idx]["effectiveness"] = 0.3
        grader_result = None
        if self.step_count >= self._task_config["max_steps"]:
            self._state.task_complete = True
            # Grade once and reuse the result (previously graded twice:
            # once for task_success and again for the info dict).
            grader_result = self._grade_task()
            self._state.task_success = grader_result.passed
        done = self._state.task_complete
        return StepResult(
            observation=observation,
            reward=reward,
            done=done,
            info={
                "grader_result": grader_result,
                "episode_summary": {
                    "total_reward": self._state.episode_reward,
                    "predictions_made": len(self._prediction_history),
                    "interventions_triggered": len(self._intervention_history),
                    "knowledge_graph_active": True,
                    "peer_learning_active": True,
                    "recall_system_active": True,
                },
                "agent_state": {
                    "epsilon": self.agent.epsilon,
                    "recent_avg_reward": np.mean(self.agent.episode_rewards[-10:]) if self.agent.episode_rewards else 0.0,
                }
            }
        )

    def _calculate_reward(self, action: Action) -> Reward:
        """Score an action against the current ground-truth confusion.

        Weighted components of ``total``:
        - prediction accuracy (1 - |error|), weight 0.4
        - early detection of rising/high confusion, weight 0.2
        - intervention timing (bonus if warranted, small cost if not), 0.2
        - partial progress when recent confusion stays low, weight 0.1
        - flat penalty for over-intense interventions (> 0.9)
        """
        gt = self._ground_truth_confusion
        # A missing prediction is treated as a maximally uncertain 0.5.
        pred = action.predicted_confusion if action.predicted_confusion is not None else 0.5
        prediction_error = abs(pred - gt)
        confusion_reward = 1.0 - prediction_error
        early_detection = 0.0
        if len(self._confusion_trajectory) > 1:
            prev_confusion = self._confusion_trajectory[-2]
            if gt > prev_confusion and pred > prev_confusion:
                early_detection = 0.2
            if gt > 0.6 and pred > 0.6:
                early_detection = 0.3
        intervention_reward = 0.0
        if action.action_type == ActionType.TRIGGER_INTERVENTION:
            if gt > self._task_config["intervention_threshold"]:
                intervention_reward = 0.3
            elif gt < 0.3:
                # Intervening on an unconfused learner is mildly penalized.
                intervention_reward = -0.1
        partial_progress = 0.0
        if len(self._confusion_trajectory) >= 5:
            recent_trend = np.mean(self._confusion_trajectory[-5:])
            if recent_trend < 0.4:
                partial_progress = 0.1
        penalty = 0.0
        if action.intervention_intensity and action.intervention_intensity > 0.9:
            penalty = -0.2
        total = confusion_reward * 0.4 + early_detection * 0.2 + intervention_reward * 0.2 + partial_progress * 0.1 + penalty
        return Reward(
            total=total,
            confusion_prediction_reward=confusion_reward * 0.4,
            early_detection_reward=early_detection,
            intervention_reward=intervention_reward,
            partial_progress_reward=partial_progress,
            penalty=penalty,
            metadata={
                "prediction_error": prediction_error,
                "ground_truth": gt,
                "predicted": pred,
            }
        )

    def _grade_task(self) -> GraderResult:
        """Grade the episode on MAE, early detection, and intervention effect.

        score = (1 - MAE) * 0.4 + early_detection_rate * 0.3
                + intervention_effectiveness * 0.3,
        compared against the difficulty-specific passing threshold.
        """
        if not self._prediction_history:
            return GraderResult(
                score=0.0,
                feedback="No predictions made",
                metrics={},
                passed=False
            )
        predictions = self._state.predictions_history
        gt_trajectory = self._confusion_trajectory[:len(predictions)]
        errors = [
            abs(p["predicted"] - gt)
            for p, gt in zip(predictions, gt_trajectory)
            if p["predicted"] is not None
        ]
        # BUG FIX: np.mean([]) is NaN -- if every prediction was None,
        # treat accuracy as worst-case instead of propagating NaN.
        mae = float(np.mean(errors)) if errors else 1.0
        confusion_threshold = 0.6
        early_detections = 0
        total_spikes = 0
        for i in range(1, len(gt_trajectory)):
            if gt_trajectory[i] > confusion_threshold:
                total_spikes += 1
                # BUG FIX: guard against None predictions, which previously
                # raised TypeError on the `>` comparison.
                if (
                    i < len(predictions)
                    and predictions[i]["predicted"] is not None
                    and predictions[i]["predicted"] > confusion_threshold
                ):
                    if predictions[i]["confusion_level"] in ["high", "critical"]:
                        early_detections += 1
        early_detection_rate = early_detections / max(total_spikes, 1)
        intervention_effectiveness = 0.0
        if self._intervention_history:
            effective_interventions = sum(1 for i in self._intervention_history if i.get("effectiveness", 0) > 0.5)
            intervention_effectiveness = effective_interventions / len(self._intervention_history)
        score = (1 - mae) * 0.4 + early_detection_rate * 0.3 + intervention_effectiveness * 0.3
        feedback_parts = []
        feedback_parts.append(f"MAE: {mae:.3f}")
        feedback_parts.append(f"Early Detection: {early_detection_rate:.1%}")
        feedback_parts.append(f"Intervention Effect: {intervention_effectiveness:.1%}")
        feedback_parts.append(f"Predictions: {len(predictions)}")
        feedback_parts.append(f"Interventions: {len(self._intervention_history)}")
        passed = score >= self._get_passing_threshold()
        return GraderResult(
            score=score,
            feedback=" | ".join(feedback_parts),
            metrics={
                "mae": float(mae),
                "early_detection_rate": float(early_detection_rate),
                "intervention_effectiveness": float(intervention_effectiveness),
                "total_predictions": len(predictions),
                "total_interventions": len(self._intervention_history),
            },
            passed=passed
        )

    def _get_passing_threshold(self) -> float:
        """Minimum grading score required to pass at this difficulty."""
        thresholds = {
            TaskDifficulty.EASY: 0.5,
            TaskDifficulty.MEDIUM: 0.6,
            TaskDifficulty.HARD: 0.7,
        }
        return thresholds.get(self.task_difficulty, 0.6)

    def _create_current_observation(self) -> Observation:
        """Return a terminal, mostly-empty observation for finished episodes."""
        return Observation(
            step=self.step_count,
            episode_id=self.episode_id,
            learning_context={"topic": "completed"},
            learner_state={"engagement": 0.0},
            gaze_features=[],
            gesture_features=[],
            biometric_features=[],
            audio_features=[],
            behavioral_features=[],
            confusion_history=self._confusion_trajectory,
            prediction_window=self._task_config["prediction_window"],
            available_interventions=[],
            multimodal_fused=True,
        )

    def get_state(self) -> State:
        """Return the mutable episode State.

        Raises:
            RuntimeError: if reset() has not been called.
        """
        if self._state is None:
            raise RuntimeError("Must call reset() before get_state()")
        return self._state

    def get_agent_prediction(self) -> AgentPrediction:
        """Ask the internal RL agent for a prediction on the latest observation."""
        obs_dict = {
            "gaze_features": self._last_observation.gaze_features if self._last_observation else [],
            "gesture_features": self._last_observation.gesture_features if self._last_observation else [],
            "biometric_features": self._last_observation.biometric_features if self._last_observation else [],
            "behavioral_features": self._last_observation.behavioral_features if self._last_observation else [],
            "audio_features": self._last_observation.audio_features if self._last_observation else [],
            "learning_context": {"difficulty": self.task_difficulty.value},
        }
        return self.agent.predict(obs_dict)

    def get_grader(self, difficulty: Optional[TaskDifficulty] = None) -> callable:
        """Build a standalone grading closure for the given (or current) difficulty.

        The closure replays supplied predictions / ground truth /
        interventions through a throwaway environment and returns its
        GraderResult. (The no-op `nonlocal difficulty` was removed; the
        closure only reads the captured variable.)
        """
        difficulty = difficulty or self.task_difficulty

        def grade(predictions: list, ground_truth: list, interventions: list) -> GraderResult:
            temp_env = ContextFlowEnvironment(task_difficulty=difficulty)
            temp_env._confusion_trajectory = ground_truth.copy()
            temp_env._prediction_history = predictions
            temp_env._intervention_history = interventions
            temp_env._state = State(
                episode_id="grader",
                step_count=len(predictions),
                max_steps=temp_env._task_config["max_steps"],
                task_difficulty=difficulty,
                predictions_history=[
                    {"step": i, "predicted": p, "ground_truth": gt, "action_type": "predict", "confusion_level": "medium"}
                    for i, (p, gt) in enumerate(zip(predictions, ground_truth))
                ],
                interventions_history=interventions,
            )
            return temp_env._grade_task()

        return grade
def easy_grader(predictions: list, ground_truth: list, interventions: list) -> GraderResult:
    """Grade a replayed episode against the EASY passing threshold."""
    env = ContextFlowEnvironment(task_difficulty=TaskDifficulty.EASY)
    env._confusion_trajectory = list(ground_truth)
    env._prediction_history = predictions
    env._intervention_history = interventions
    replayed = [
        {
            "step": idx,
            "predicted": pred,
            "ground_truth": truth,
            "action_type": "predict",
            "confusion_level": "medium",
        }
        for idx, (pred, truth) in enumerate(zip(predictions, ground_truth))
    ]
    env._state = State(
        episode_id="easy_grader",
        step_count=len(predictions),
        max_steps=env._task_config["max_steps"],
        task_difficulty=TaskDifficulty.EASY,
        predictions_history=replayed,
        interventions_history=interventions,
    )
    return env._grade_task()
def medium_grader(predictions: list, ground_truth: list, interventions: list) -> GraderResult:
    """Grade a replayed episode against the MEDIUM passing threshold."""
    env = ContextFlowEnvironment(task_difficulty=TaskDifficulty.MEDIUM)
    env._confusion_trajectory = list(ground_truth)
    env._prediction_history = predictions
    env._intervention_history = interventions
    replayed = [
        {
            "step": idx,
            "predicted": pred,
            "ground_truth": truth,
            "action_type": "predict",
            "confusion_level": "medium",
        }
        for idx, (pred, truth) in enumerate(zip(predictions, ground_truth))
    ]
    env._state = State(
        episode_id="medium_grader",
        step_count=len(predictions),
        max_steps=env._task_config["max_steps"],
        task_difficulty=TaskDifficulty.MEDIUM,
        predictions_history=replayed,
        interventions_history=interventions,
    )
    return env._grade_task()
def hard_grader(predictions: list, ground_truth: list, interventions: list) -> GraderResult:
    """Grade a replayed episode against the HARD passing threshold."""
    env = ContextFlowEnvironment(task_difficulty=TaskDifficulty.HARD)
    env._confusion_trajectory = list(ground_truth)
    env._prediction_history = predictions
    env._intervention_history = interventions
    replayed = [
        {
            "step": idx,
            "predicted": pred,
            "ground_truth": truth,
            "action_type": "predict",
            "confusion_level": "medium",
        }
        for idx, (pred, truth) in enumerate(zip(predictions, ground_truth))
    ]
    env._state = State(
        episode_id="hard_grader",
        step_count=len(predictions),
        max_steps=env._task_config["max_steps"],
        task_difficulty=TaskDifficulty.HARD,
        predictions_history=replayed,
        interventions_history=interventions,
    )
    return env._grade_task()
# Public API: the environment class plus the standalone per-difficulty graders.
__all__ = ["ContextFlowEnvironment", "easy_grader", "medium_grader", "hard_grader"]