import uuid
from datetime import datetime
from typing import Any, Callable, Dict, Optional, Tuple

import numpy as np

from models import (
    Observation, Action, Reward, State, StepResult,
    TaskDifficulty, ActionType, GraderResult
)
from agents import (
    ContextFlowAgent,
    AgentPrediction,
    ConfusionLevel,
    InterventionType,
    KnowledgeGraphAgent,
    PeerLearningAgent,
    RecallAgent,
)


class ContextFlowEnvironment:
    """
    OpenEnv environment with full ContextFlow multi-agent system.

    Integrates:
    - RL-based doubt prediction
    - Multi-modal behavioral analysis
    - Gesture recognition
    - Knowledge graphs
    - Peer learning
    - Spaced repetition
    """

    # Upper bound on episode length; per-difficulty limits come from
    # _get_task_config()["max_steps"] and never exceed this.
    MAX_STEPS = 100

    def __init__(self, task_difficulty: TaskDifficulty = TaskDifficulty.MEDIUM):
        """Create an environment for one learner session at the given difficulty."""
        self.task_difficulty = task_difficulty
        self.episode_id: Optional[str] = None
        self.step_count: int = 0
        self._state: Optional[State] = None
        self._last_observation: Optional[Observation] = None
        self._ground_truth_confusion: float = 0.0
        # Per-step ground-truth confusion values for the current episode.
        self._confusion_trajectory: list = []
        # Mirror of State.predictions_history; _grade_task() guards on this.
        self._prediction_history: list = []
        self._intervention_history: list = []
        self._task_config = self._get_task_config()
        self.agent = ContextFlowAgent()
        self.knowledge_graph = KnowledgeGraphAgent()
        self.peer_learning = PeerLearningAgent()
        self.recall_system = RecallAgent()

    def _get_task_config(self) -> Dict[str, Any]:
        """Return the simulation parameters for the configured difficulty.

        Falls back to MEDIUM if the difficulty is not recognized.
        """
        configs = {
            TaskDifficulty.EASY: {
                "prediction_window": 3,
                "noise_level": 0.1,
                "confusion_base": 0.3,
                "intervention_threshold": 0.6,
                "max_steps": 50,
                "confusion_spike_prob": 0.08,
            },
            TaskDifficulty.MEDIUM: {
                "prediction_window": 5,
                "noise_level": 0.2,
                "confusion_base": 0.5,
                "intervention_threshold": 0.5,
                "max_steps": 75,
                "confusion_spike_prob": 0.12,
            },
            TaskDifficulty.HARD: {
                "prediction_window": 7,
                "noise_level": 0.3,
                "confusion_base": 0.6,
                "intervention_threshold": 0.4,
                "max_steps": 100,
                "confusion_spike_prob": 0.15,
            },
        }
        return configs.get(self.task_difficulty, configs[TaskDifficulty.MEDIUM])

    def _generate_synthetic_data(self) -> Tuple[Observation, float]:
        """Synthesize one step of learner data and return (observation, ground truth).

        Ground-truth confusion = base + slow sinusoidal trend + Gaussian noise
        + occasional spike, clipped to [0, 1]. Feature vectors are random;
        only the first two behavioral features are correlated with confusion.
        """
        step = self.step_count
        config = self._task_config
        base_confusion = config["confusion_base"]
        noise = np.random.normal(0, config["noise_level"])
        confusion_trend = np.sin(step * 0.1) * 0.2
        # NOTE(review): the spike magnitude equals the spike *probability*
        # (0.08-0.15), which is quite small — confirm this is intentional.
        confusion_spike = (
            config["confusion_spike_prob"]
            if np.random.random() < config["confusion_spike_prob"]
            else 0.0
        )
        self._ground_truth_confusion = np.clip(
            base_confusion + confusion_trend + noise + confusion_spike, 0.0, 1.0
        )
        self._confusion_trajectory.append(self._ground_truth_confusion)

        gaze_features = np.random.randn(16).tolist()
        gesture_features = np.random.randn(63).tolist()
        biometric_features = [
            60 + np.random.randn() * 10,      # heart-rate-like signal
            0.5 + np.random.randn() * 0.1,
            36.6 + np.random.randn() * 0.5,   # temperature-like signal
            15 + np.random.randn() * 3,
            0.3 + np.random.randn() * 0.1,
            0.7 + np.random.randn() * 0.1,
        ]
        audio_features = [200 + np.random.randn() * 50, 0.3 + np.random.randn() * 0.1]
        behavioral_features = np.random.randn(8).tolist()
        # Leak a weak confusion signal into the first two behavioral features
        # so the prediction task is learnable.
        behavioral_features[0] = self._ground_truth_confusion * 0.5
        behavioral_features[1] = self._ground_truth_confusion * 0.3

        learning_context = {
            "topic": np.random.choice(["math", "science", "programming", "language"]),
            "difficulty": self.task_difficulty.value,
            "time_spent": step * 30,
            "content_length": np.random.randint(100, 1000),
            "subtopic": np.random.choice(["basics", "intermediate", "advanced"]),
        }
        learner_state = {
            "engagement": 1.0 - self._ground_truth_confusion,
            "frustration": self._ground_truth_confusion * 0.8,
            "comprehension": 0.7 - self._ground_truth_confusion * 0.3,
            "confusion_level": self._get_confusion_level(self._ground_truth_confusion).value,
        }

        observation = Observation(
            step=self.step_count,
            episode_id=self.episode_id,
            learning_context=learning_context,
            learner_state=learner_state,
            gaze_features=gaze_features,
            gesture_features=gesture_features,
            biometric_features=biometric_features,
            audio_features=audio_features,
            behavioral_features=behavioral_features,
            confusion_history=self._confusion_trajectory[-10:],
            prediction_window=config["prediction_window"],
            available_interventions=[
                "hint", "simplify", "breakdown", "example",
                "scaffold", "peer_connect", "break", "encourage"
            ],
            multimodal_fused=True,
            metadata={
                "knowledge_graph_mastery": self.knowledge_graph.get_prerequisite_mastery(
                    learning_context["topic"]
                ),
                "similar_learners": len(self.peer_learning.find_similar_learners(
                    learning_context["topic"]
                )),
                "recall_cards": len(self.recall_system.cards),
            }
        )
        return observation, self._ground_truth_confusion

    def _get_confusion_level(self, prob: float) -> ConfusionLevel:
        """Bucket a confusion probability into a discrete ConfusionLevel."""
        if prob < 0.25:
            return ConfusionLevel.LOW
        elif prob < 0.5:
            return ConfusionLevel.MEDIUM
        elif prob < 0.75:
            return ConfusionLevel.HIGH
        else:
            return ConfusionLevel.CRITICAL

    def reset(self) -> Observation:
        """Start a new episode: fresh agent, fresh histories, first observation."""
        self.episode_id = str(uuid.uuid4())
        self.step_count = 0
        self._confusion_trajectory = []
        self._prediction_history = []
        self._intervention_history = []
        self._ground_truth_confusion = 0.0
        # Re-create the RL agent so learning does not carry across episodes.
        self.agent = ContextFlowAgent()

        observation, _ = self._generate_synthetic_data()
        self._last_observation = observation
        self._state = State(
            episode_id=self.episode_id,
            step_count=self.step_count,
            max_steps=self._task_config["max_steps"],
            task_difficulty=self.task_difficulty,
            ground_truth_confusion=self._ground_truth_confusion,
            predictions_history=[],
            interventions_history=[],
            episode_reward=0.0,
            task_complete=False,
            task_success=False,
        )
        return observation

    def step(self, action: Action) -> StepResult:
        """Apply an action, advance the simulation one step, and return the result.

        Raises:
            RuntimeError: if called before reset().
        """
        if self._state is None:
            raise RuntimeError("Must call reset() before step()")
        if self._state.task_complete:
            return StepResult(
                observation=self._create_current_observation(),
                reward=Reward(total=0.0),
                done=True,
                info={"message": "Episode already complete"}
            )

        reward = self._calculate_reward(action)
        self._state.episode_reward += reward.total

        prediction_record = {
            "step": self.step_count,
            "predicted": action.predicted_confusion,
            "ground_truth": self._ground_truth_confusion,
            "action_type": action.action_type.value,
            "confusion_level": self._get_confusion_level(
                action.predicted_confusion or 0.5
            ).value,
        }
        self._state.predictions_history.append(prediction_record)
        # BUG FIX: previously only State.predictions_history was updated, so
        # _grade_task() always saw an empty _prediction_history and reported
        # "No predictions made" (score 0) for every live episode. Keep both
        # histories in sync.
        self._prediction_history.append(prediction_record)

        if action.action_type == ActionType.TRIGGER_INTERVENTION and action.intervention_type:
            self._intervention_history.append({
                "step": self.step_count,
                "type": action.intervention_type,
                "intensity": action.intervention_intensity or 0.5,
                "effectiveness": 0.0,  # back-filled after the next observation
            })
            if action.intervention_type == "peer_connect":
                topic = (
                    self._last_observation.learning_context.get("topic", "general")
                    if self._last_observation else "general"
                )
                peers = self.peer_learning.find_similar_learners(topic)
                # Small bonus for connecting to peers, capped at 3 matches.
                reward.total += 0.1 * min(len(peers), 3)

        self.agent.update(
            reward.total,
            self._last_observation.learning_context if self._last_observation else {},
        )

        self.step_count += 1
        self._state.step_count = self.step_count
        observation, new_gt = self._generate_synthetic_data()
        self._last_observation = observation
        self._state.ground_truth_confusion = new_gt
        self._state.interventions_history = self._intervention_history.copy()

        # Back-fill effectiveness of the most recent intervention: did
        # confusion drop relative to two steps before the new observation?
        if len(self._intervention_history) > 0:
            last_idx = len(self._intervention_history) - 1
            if len(self._confusion_trajectory) >= 3:
                prev_confusion = self._confusion_trajectory[-3]
                if new_gt < prev_confusion:
                    self._intervention_history[last_idx]["effectiveness"] = 0.8
                else:
                    self._intervention_history[last_idx]["effectiveness"] = 0.3

        if self.step_count >= self._task_config["max_steps"]:
            self._state.task_complete = True
            self._state.task_success = self._grade_task().passed

        done = self._state.task_complete
        return StepResult(
            observation=observation,
            reward=reward,
            done=done,
            info={
                "grader_result": self._grade_task() if done else None,
                "episode_summary": {
                    "total_reward": self._state.episode_reward,
                    "predictions_made": len(self._prediction_history),
                    "interventions_triggered": len(self._intervention_history),
                    "knowledge_graph_active": True,
                    "peer_learning_active": True,
                    "recall_system_active": True,
                },
                "agent_state": {
                    "epsilon": self.agent.epsilon,
                    "recent_avg_reward": (
                        np.mean(self.agent.episode_rewards[-10:])
                        if self.agent.episode_rewards else 0.0
                    ),
                }
            }
        )

    def _calculate_reward(self, action: Action) -> Reward:
        """Score an action against the current ground-truth confusion.

        Components (weights applied in `total`):
        - prediction accuracy (0.4): 1 - |predicted - ground truth|
        - early detection (0.2): bonus for tracking a rising confusion trend
        - intervention timing (0.2): bonus above threshold, penalty when calm
        - partial progress (0.1): bonus for a low recent confusion average
        - penalty: flat -0.2 for overly intense interventions (> 0.9)
        """
        gt = self._ground_truth_confusion
        # Missing predictions are treated as a neutral 0.5 guess.
        pred = action.predicted_confusion if action.predicted_confusion is not None else 0.5
        prediction_error = abs(pred - gt)
        confusion_reward = 1.0 - prediction_error

        early_detection = 0.0
        if len(self._confusion_trajectory) > 1:
            prev_confusion = self._confusion_trajectory[-2]
            if gt > prev_confusion and pred > prev_confusion:
                early_detection = 0.2
                if gt > 0.6 and pred > 0.6:
                    early_detection = 0.3

        intervention_reward = 0.0
        if action.action_type == ActionType.TRIGGER_INTERVENTION:
            if gt > self._task_config["intervention_threshold"]:
                intervention_reward = 0.3
            elif gt < 0.3:
                # Intervening on a calm learner is mildly penalized.
                intervention_reward = -0.1

        partial_progress = 0.0
        if len(self._confusion_trajectory) >= 5:
            recent_trend = np.mean(self._confusion_trajectory[-5:])
            if recent_trend < 0.4:
                partial_progress = 0.1

        penalty = 0.0
        if action.intervention_intensity and action.intervention_intensity > 0.9:
            penalty = -0.2

        total = (
            confusion_reward * 0.4
            + early_detection * 0.2
            + intervention_reward * 0.2
            + partial_progress * 0.1
            + penalty
        )
        return Reward(
            total=total,
            confusion_prediction_reward=confusion_reward * 0.4,
            early_detection_reward=early_detection,
            intervention_reward=intervention_reward,
            partial_progress_reward=partial_progress,
            penalty=penalty,
            metadata={
                "prediction_error": prediction_error,
                "ground_truth": gt,
                "predicted": pred,
            }
        )

    def _grade_task(self) -> GraderResult:
        """Grade the episode: MAE (0.4) + early detection (0.3) + intervention
        effectiveness (0.3), pass/fail against the difficulty threshold."""
        if not self._prediction_history:
            return GraderResult(
                score=0.0, feedback="No predictions made", metrics={}, passed=False
            )

        predictions = self._state.predictions_history
        gt_trajectory = self._confusion_trajectory[:len(predictions)]
        errors = [
            abs(p["predicted"] - gt)
            for p, gt in zip(predictions, gt_trajectory)
            if p["predicted"] is not None
        ]
        # BUG FIX: np.mean([]) is nan; if no numeric predictions exist,
        # score accuracy as worst-case (MAE = 1.0) instead.
        mae = float(np.mean(errors)) if errors else 1.0

        confusion_threshold = 0.6
        early_detections = 0
        total_spikes = 0
        for i in range(1, len(gt_trajectory)):
            if gt_trajectory[i] > confusion_threshold:
                total_spikes += 1
                # BUG FIX: guard against predicted=None before comparing.
                # NOTE(review): offline graders hard-code confusion_level
                # "medium", so this branch is unreachable for them — confirm
                # whether the level should be derived from the prediction.
                if (
                    i < len(predictions)
                    and predictions[i]["predicted"] is not None
                    and predictions[i]["predicted"] > confusion_threshold
                    and predictions[i]["confusion_level"] in ["high", "critical"]
                ):
                    early_detections += 1
        early_detection_rate = early_detections / max(total_spikes, 1)

        intervention_effectiveness = 0.0
        if self._intervention_history:
            effective_interventions = sum(
                1 for i in self._intervention_history if i.get("effectiveness", 0) > 0.5
            )
            intervention_effectiveness = (
                effective_interventions / len(self._intervention_history)
            )

        score = (
            (1 - mae) * 0.4
            + early_detection_rate * 0.3
            + intervention_effectiveness * 0.3
        )

        feedback_parts = [
            f"MAE: {mae:.3f}",
            f"Early Detection: {early_detection_rate:.1%}",
            f"Intervention Effect: {intervention_effectiveness:.1%}",
            f"Predictions: {len(predictions)}",
            f"Interventions: {len(self._intervention_history)}",
        ]
        passed = score >= self._get_passing_threshold()
        return GraderResult(
            score=score,
            feedback=" | ".join(feedback_parts),
            metrics={
                "mae": float(mae),
                "early_detection_rate": float(early_detection_rate),
                "intervention_effectiveness": float(intervention_effectiveness),
                "total_predictions": len(predictions),
                "total_interventions": len(self._intervention_history),
            },
            passed=passed
        )

    def _get_passing_threshold(self) -> float:
        """Minimum grading score required to pass at the current difficulty."""
        thresholds = {
            TaskDifficulty.EASY: 0.5,
            TaskDifficulty.MEDIUM: 0.6,
            TaskDifficulty.HARD: 0.7,
        }
        return thresholds.get(self.task_difficulty, 0.6)

    def _create_current_observation(self) -> Observation:
        """Build a terminal/placeholder observation for a finished episode."""
        return Observation(
            step=self.step_count,
            episode_id=self.episode_id,
            learning_context={"topic": "completed"},
            learner_state={"engagement": 0.0},
            gaze_features=[],
            gesture_features=[],
            biometric_features=[],
            audio_features=[],
            behavioral_features=[],
            confusion_history=self._confusion_trajectory,
            prediction_window=self._task_config["prediction_window"],
            available_interventions=[],
            multimodal_fused=True,
        )

    def get_state(self) -> State:
        """Return the current episode State.

        Raises:
            RuntimeError: if called before reset().
        """
        if self._state is None:
            raise RuntimeError("Must call reset() before get_state()")
        return self._state

    def get_agent_prediction(self) -> AgentPrediction:
        """Run the internal RL agent on the latest observation's features."""
        last = self._last_observation
        obs_dict = {
            "gaze_features": last.gaze_features if last else [],
            "gesture_features": last.gesture_features if last else [],
            "biometric_features": last.biometric_features if last else [],
            "behavioral_features": last.behavioral_features if last else [],
            "audio_features": last.audio_features if last else [],
            "learning_context": {"difficulty": self.task_difficulty.value},
        }
        return self.agent.predict(obs_dict)

    def get_grader(self, difficulty: Optional[TaskDifficulty] = None) -> Callable:
        """Return a standalone grading function bound to a difficulty.

        The returned callable takes (predictions, ground_truth, interventions)
        and grades them offline with a throwaway environment.
        """
        difficulty = difficulty or self.task_difficulty

        def grade(predictions: list, ground_truth: list, interventions: list) -> GraderResult:
            return _run_offline_grader(
                difficulty, "grader", predictions, ground_truth, interventions
            )

        return grade


def _run_offline_grader(
    difficulty: TaskDifficulty,
    episode_id: str,
    predictions: list,
    ground_truth: list,
    interventions: list,
) -> GraderResult:
    """Grade pre-recorded trajectories with a throwaway environment.

    Shared implementation behind get_grader() and the easy/medium/hard
    module-level graders.
    """
    env = ContextFlowEnvironment(task_difficulty=difficulty)
    env._confusion_trajectory = ground_truth.copy()
    env._prediction_history = predictions
    env._intervention_history = interventions
    env._state = State(
        episode_id=episode_id,
        step_count=len(predictions),
        max_steps=env._task_config["max_steps"],
        task_difficulty=difficulty,
        predictions_history=[
            {"step": i, "predicted": p, "ground_truth": gt,
             "action_type": "predict", "confusion_level": "medium"}
            for i, (p, gt) in enumerate(zip(predictions, ground_truth))
        ],
        interventions_history=interventions,
    )
    return env._grade_task()


def easy_grader(predictions: list, ground_truth: list, interventions: list) -> GraderResult:
    """Grade recorded trajectories against the EASY difficulty rubric."""
    return _run_offline_grader(
        TaskDifficulty.EASY, "easy_grader", predictions, ground_truth, interventions
    )


def medium_grader(predictions: list, ground_truth: list, interventions: list) -> GraderResult:
    """Grade recorded trajectories against the MEDIUM difficulty rubric."""
    return _run_offline_grader(
        TaskDifficulty.MEDIUM, "medium_grader", predictions, ground_truth, interventions
    )


def hard_grader(predictions: list, ground_truth: list, interventions: list) -> GraderResult:
    """Grade recorded trajectories against the HARD difficulty rubric."""
    return _run_offline_grader(
        TaskDifficulty.HARD, "hard_grader", predictions, ground_truth, interventions
    )


__all__ = ["ContextFlowEnvironment", "easy_grader", "medium_grader", "hard_grader"]