# NOTE: page-banner residue from the web file viewer ("Spaces: Sleeping");
# it is not part of the module source.
import uuid
from datetime import datetime
from typing import Any, Callable, Dict, Optional, Tuple

import numpy as np

from agents import (
    ContextFlowAgent,
    AgentPrediction,
    ConfusionLevel,
    InterventionType,
    KnowledgeGraphAgent,
    PeerLearningAgent,
    RecallAgent,
)
from models import (
    Observation, Action, Reward, State, StepResult,
    TaskDifficulty, ActionType, GraderResult
)
class ContextFlowEnvironment:
    """
    OpenEnv environment with full ContextFlow multi-agent system.

    Integrates:
    - RL-based doubt prediction
    - Multi-modal behavioral analysis
    - Gesture recognition
    - Knowledge graphs
    - Peer learning
    - Spaced repetition

    An episode is a synthetic learner session. ``reset()`` starts it and
    returns the first observation; ``step(action)`` scores the caller's
    confusion prediction against the simulated ground-truth confusion,
    generates the next observation, and ends the episode once the
    difficulty-specific ``max_steps`` from the task config is reached.
    """

    # Not referenced inside this class: episode length is taken from the
    # per-difficulty "max_steps" entry of the task config instead.
    MAX_STEPS = 100

    def __init__(self, task_difficulty: TaskDifficulty = TaskDifficulty.MEDIUM):
        """Create an environment for one difficulty tier.

        Args:
            task_difficulty: Selects the task config (noise level, base
                confusion, episode length, ...) and the passing threshold.
        """
        self.task_difficulty = task_difficulty
        self.episode_id: Optional[str] = None
        self.step_count: int = 0
        self._state: Optional[State] = None
        self._last_observation: Optional[Observation] = None
        self._ground_truth_confusion: float = 0.0
        # One ground-truth confusion value per generated observation.
        self._confusion_trajectory: list = []
        # Mirrors ``self._state.predictions_history`` during an episode.
        self._prediction_history: list = []
        self._intervention_history: list = []
        self._task_config = self._get_task_config()
        self.agent = ContextFlowAgent()
        self.knowledge_graph = KnowledgeGraphAgent()
        self.peer_learning = PeerLearningAgent()
        self.recall_system = RecallAgent()

    def _get_task_config(self) -> Dict[str, Any]:
        """Return the simulation parameters for ``self.task_difficulty``.

        Falls back to the MEDIUM config for an unrecognised difficulty.
        """
        configs = {
            TaskDifficulty.EASY: {
                "prediction_window": 3,
                "noise_level": 0.1,
                "confusion_base": 0.3,
                "intervention_threshold": 0.6,
                "max_steps": 50,
                "confusion_spike_prob": 0.08,
            },
            TaskDifficulty.MEDIUM: {
                "prediction_window": 5,
                "noise_level": 0.2,
                "confusion_base": 0.5,
                "intervention_threshold": 0.5,
                "max_steps": 75,
                "confusion_spike_prob": 0.12,
            },
            TaskDifficulty.HARD: {
                "prediction_window": 7,
                "noise_level": 0.3,
                "confusion_base": 0.6,
                "intervention_threshold": 0.4,
                "max_steps": 100,
                "confusion_spike_prob": 0.15,
            },
        }
        return configs.get(self.task_difficulty, configs[TaskDifficulty.MEDIUM])

    def _generate_synthetic_data(self) -> Tuple[Observation, float]:
        """Sample the next ground-truth confusion value and its observation.

        Updates ``self._ground_truth_confusion`` and appends the new value to
        ``self._confusion_trajectory``.

        Returns:
            ``(observation, ground_truth_confusion)``; the confusion value is
            clipped to ``[0.0, 1.0]``.
        """
        step = self.step_count
        config = self._task_config

        base_confusion = config["confusion_base"]
        noise = np.random.normal(0, config["noise_level"])
        confusion_trend = np.sin(step * 0.1) * 0.2
        # NOTE(review): the spike *magnitude* reuses the spike *probability*
        # value - confirm this is intended rather than a separate magnitude.
        spike_prob = config["confusion_spike_prob"]
        confusion_spike = spike_prob if np.random.random() < spike_prob else 0.0

        # float(...) keeps NumPy scalars out of the state/observation payload.
        self._ground_truth_confusion = float(np.clip(
            base_confusion + confusion_trend + noise + confusion_spike,
            0.0, 1.0
        ))
        confusion = self._ground_truth_confusion
        self._confusion_trajectory.append(confusion)

        # Feature vectors are random draws; only the first two behavioral
        # features are tied to the ground-truth confusion.
        gaze_features = np.random.randn(16).tolist()
        gesture_features = np.random.randn(63).tolist()
        biometric_features = [
            60 + np.random.randn() * 10,
            0.5 + np.random.randn() * 0.1,
            36.6 + np.random.randn() * 0.5,
            15 + np.random.randn() * 3,
            0.3 + np.random.randn() * 0.1,
            0.7 + np.random.randn() * 0.1,
        ]
        audio_features = [200 + np.random.randn() * 50, 0.3 + np.random.randn() * 0.1]
        behavioral_features = np.random.randn(8).tolist()
        behavioral_features[0] = confusion * 0.5
        behavioral_features[1] = confusion * 0.3

        # str()/int() convert np.str_/np.int64 to builtin types so the context
        # dict holds plain Python values.
        learning_context = {
            "topic": str(np.random.choice(["math", "science", "programming", "language"])),
            "difficulty": self.task_difficulty.value,
            "time_spent": step * 30,
            "content_length": int(np.random.randint(100, 1000)),
            "subtopic": str(np.random.choice(["basics", "intermediate", "advanced"])),
        }
        learner_state = {
            "engagement": 1.0 - confusion,
            "frustration": confusion * 0.8,
            "comprehension": 0.7 - confusion * 0.3,
            "confusion_level": self._get_confusion_level(confusion).value,
        }

        topic = learning_context["topic"]
        metadata = {
            "knowledge_graph_mastery": self.knowledge_graph.get_prerequisite_mastery(topic),
            "similar_learners": len(self.peer_learning.find_similar_learners(topic)),
            "recall_cards": len(self.recall_system.cards),
        }

        observation = Observation(
            step=self.step_count,
            episode_id=self.episode_id,
            learning_context=learning_context,
            learner_state=learner_state,
            gaze_features=gaze_features,
            gesture_features=gesture_features,
            biometric_features=biometric_features,
            audio_features=audio_features,
            behavioral_features=behavioral_features,
            confusion_history=self._confusion_trajectory[-10:],
            prediction_window=config["prediction_window"],
            available_interventions=[
                "hint", "simplify", "breakdown", "example", "scaffold",
                "peer_connect", "break", "encourage"
            ],
            multimodal_fused=True,
            metadata=metadata,
        )
        return observation, confusion

    def _get_confusion_level(self, prob: float) -> ConfusionLevel:
        """Bucket a confusion probability into a ``ConfusionLevel``.

        Buckets: ``< 0.25`` LOW, ``< 0.5`` MEDIUM, ``< 0.75`` HIGH,
        otherwise CRITICAL.
        """
        if prob < 0.25:
            return ConfusionLevel.LOW
        if prob < 0.5:
            return ConfusionLevel.MEDIUM
        if prob < 0.75:
            return ConfusionLevel.HIGH
        return ConfusionLevel.CRITICAL

    def reset(self) -> Observation:
        """Start a new episode and return its first observation.

        Assigns a fresh episode id, clears all per-episode histories,
        replaces ``self.agent`` with a new ``ContextFlowAgent`` and rebuilds
        ``self._state``.
        """
        self.episode_id = str(uuid.uuid4())
        self.step_count = 0
        self._confusion_trajectory = []
        self._prediction_history = []
        self._intervention_history = []
        self._ground_truth_confusion = 0.0
        self.agent = ContextFlowAgent()

        observation, _ = self._generate_synthetic_data()
        self._last_observation = observation
        self._state = State(
            episode_id=self.episode_id,
            step_count=self.step_count,
            max_steps=self._task_config["max_steps"],
            task_difficulty=self.task_difficulty,
            ground_truth_confusion=self._ground_truth_confusion,
            predictions_history=[],
            interventions_history=[],
            episode_reward=0.0,
            task_complete=False,
            task_success=False,
        )
        return observation

    def step(self, action: Action) -> StepResult:
        """Score ``action`` against the current ground truth and advance.

        Args:
            action: The caller's prediction and optional intervention.

        Returns:
            A ``StepResult`` with the next observation, this step's reward,
            the ``done`` flag, and an ``info`` dict. ``info["grader_result"]``
            is set only on the step that finishes the episode.

        Raises:
            RuntimeError: If ``reset()`` has not been called yet.
        """
        if self._state is None:
            raise RuntimeError("Must call reset() before step()")
        if self._state.task_complete:
            return StepResult(
                observation=self._create_current_observation(),
                reward=Reward(total=0.0),
                done=True,
                info={"message": "Episode already complete"}
            )

        context = self._last_observation.learning_context if self._last_observation else {}
        reward = self._calculate_reward(action)

        # Use an explicit None check: ``or 0.5`` would relabel a legitimate
        # prediction of 0.0 as 0.5 (HIGH).
        predicted = action.predicted_confusion
        label_input = 0.5 if predicted is None else predicted
        record = {
            "step": self.step_count,
            "predicted": predicted,
            "ground_truth": self._ground_truth_confusion,
            "action_type": action.action_type.value,
            "confusion_level": self._get_confusion_level(label_input).value,
        }
        self._state.predictions_history.append(record)
        # Keep the env-level history in sync so it reflects predictions made.
        self._prediction_history.append(record)

        if action.action_type == ActionType.TRIGGER_INTERVENTION and action.intervention_type:
            self._intervention_history.append({
                "step": self.step_count,
                "type": action.intervention_type,
                "intensity": action.intervention_intensity or 0.5,
                "effectiveness": 0.0,
            })
            if action.intervention_type == "peer_connect":
                peers = self.peer_learning.find_similar_learners(context.get("topic", "general"))
                reward.total += 0.1 * min(len(peers), 3)

        # Accumulate only after the peer bonus has been applied, so the
        # episode total equals the sum of the rewards actually returned.
        self._state.episode_reward += reward.total
        self.agent.update(reward.total, context)

        self.step_count += 1
        self._state.step_count = self.step_count
        observation, new_gt = self._generate_synthetic_data()
        self._last_observation = observation
        self._state.ground_truth_confusion = new_gt

        # NOTE(review): the most recent intervention is re-scored on every
        # later step, not only the step right after it - confirm intended.
        if self._intervention_history and len(self._confusion_trajectory) >= 3:
            baseline = self._confusion_trajectory[-3]
            self._intervention_history[-1]["effectiveness"] = 0.8 if new_gt < baseline else 0.3
        self._state.interventions_history = self._intervention_history.copy()

        grader_result = None
        if self.step_count >= self._task_config["max_steps"]:
            # Grade once and reuse the result for both state and info.
            grader_result = self._grade_task()
            self._state.task_complete = True
            self._state.task_success = grader_result.passed
        done = self._state.task_complete

        recent_rewards = self.agent.episode_rewards[-10:] if self.agent.episode_rewards else None
        return StepResult(
            observation=observation,
            reward=reward,
            done=done,
            info={
                "grader_result": grader_result,
                "episode_summary": {
                    "total_reward": self._state.episode_reward,
                    "predictions_made": len(self._state.predictions_history),
                    "interventions_triggered": len(self._intervention_history),
                    "knowledge_graph_active": True,
                    "peer_learning_active": True,
                    "recall_system_active": True,
                },
                "agent_state": {
                    "epsilon": self.agent.epsilon,
                    "recent_avg_reward": float(np.mean(recent_rewards)) if recent_rewards else 0.0,
                }
            }
        )

    def _calculate_reward(self, action: Action) -> Reward:
        """Compute the reward for ``action`` against the current ground truth.

        The total is a weighted sum of prediction accuracy (0.4), early
        detection (0.2), intervention timing (0.2) and partial progress
        (0.1), plus an unweighted penalty for intensity above 0.9. A missing
        prediction is scored as 0.5.
        """
        gt = self._ground_truth_confusion
        pred = action.predicted_confusion if action.predicted_confusion is not None else 0.5
        prediction_error = abs(pred - gt)
        confusion_reward = 1.0 - prediction_error

        # Reward predicting a rise; a jointly high value overrides it.
        early_detection = 0.0
        if len(self._confusion_trajectory) > 1:
            prev_confusion = self._confusion_trajectory[-2]
            if gt > prev_confusion and pred > prev_confusion:
                early_detection = 0.2
            if gt > 0.6 and pred > 0.6:
                early_detection = 0.3

        # Intervening helps above the task threshold and costs below 0.3.
        intervention_reward = 0.0
        if action.action_type == ActionType.TRIGGER_INTERVENTION:
            if gt > self._task_config["intervention_threshold"]:
                intervention_reward = 0.3
            elif gt < 0.3:
                intervention_reward = -0.1

        partial_progress = 0.0
        if len(self._confusion_trajectory) >= 5:
            if np.mean(self._confusion_trajectory[-5:]) < 0.4:
                partial_progress = 0.1

        penalty = 0.0
        if action.intervention_intensity and action.intervention_intensity > 0.9:
            penalty = -0.2

        total = (
            confusion_reward * 0.4
            + early_detection * 0.2
            + intervention_reward * 0.2
            + partial_progress * 0.1
            + penalty
        )
        return Reward(
            total=total,
            confusion_prediction_reward=confusion_reward * 0.4,
            early_detection_reward=early_detection,
            intervention_reward=intervention_reward,
            partial_progress_reward=partial_progress,
            penalty=penalty,
            metadata={
                "prediction_error": prediction_error,
                "ground_truth": gt,
                "predicted": pred,
            }
        )

    def _grade_task(self) -> GraderResult:
        """Grade the episode recorded in ``self._state.predictions_history``.

        Score = ``(1 - MAE) * 0.4 + early_detection_rate * 0.3 +
        intervention_effectiveness * 0.3``; the run passes when the score
        reaches the difficulty's passing threshold. Returns a zero-score
        "No predictions made" result when there is nothing numeric to grade.
        """
        predictions = self._state.predictions_history if self._state is not None else []
        gt_trajectory = self._confusion_trajectory[:len(predictions)]

        # Predictions of None are skipped rather than compared.
        errors = [
            abs(p["predicted"] - gt)
            for p, gt in zip(predictions, gt_trajectory)
            if p["predicted"] is not None
        ]
        if not errors:
            return GraderResult(
                score=0.0,
                feedback="No predictions made",
                metrics={},
                passed=False
            )
        mae = float(np.mean(errors))

        confusion_threshold = 0.6
        early_detections = 0
        total_spikes = 0
        # The first step is excluded from spike counting, as before.
        for record, actual in zip(predictions[1:], gt_trajectory[1:]):
            if actual <= confusion_threshold:
                continue
            total_spikes += 1
            predicted = record["predicted"]
            # A prediction above 0.6 already falls in the HIGH/CRITICAL
            # buckets of _get_confusion_level, so the stored label (which
            # external graders used to hard-code) is not consulted.
            if predicted is not None and predicted > confusion_threshold:
                early_detections += 1
        early_detection_rate = early_detections / max(total_spikes, 1)

        intervention_effectiveness = 0.0
        if self._intervention_history:
            effective = sum(
                1 for entry in self._intervention_history
                if entry.get("effectiveness", 0) > 0.5
            )
            intervention_effectiveness = effective / len(self._intervention_history)

        score = (1 - mae) * 0.4 + early_detection_rate * 0.3 + intervention_effectiveness * 0.3
        feedback_parts = [
            f"MAE: {mae:.3f}",
            f"Early Detection: {early_detection_rate:.1%}",
            f"Intervention Effect: {intervention_effectiveness:.1%}",
            f"Predictions: {len(predictions)}",
            f"Interventions: {len(self._intervention_history)}",
        ]
        passed = score >= self._get_passing_threshold()
        return GraderResult(
            score=score,
            feedback=" | ".join(feedback_parts),
            metrics={
                "mae": float(mae),
                "early_detection_rate": float(early_detection_rate),
                "intervention_effectiveness": float(intervention_effectiveness),
                "total_predictions": len(predictions),
                "total_interventions": len(self._intervention_history),
            },
            passed=passed
        )

    def _get_passing_threshold(self) -> float:
        """Return the minimum passing score for the current difficulty.

        Defaults to 0.6 for an unrecognised difficulty.
        """
        thresholds = {
            TaskDifficulty.EASY: 0.5,
            TaskDifficulty.MEDIUM: 0.6,
            TaskDifficulty.HARD: 0.7,
        }
        return thresholds.get(self.task_difficulty, 0.6)

    def _create_current_observation(self) -> Observation:
        """Build the placeholder observation returned after the episode ends.

        All feature vectors are empty; only the confusion history and the
        prediction window carry data.
        """
        return Observation(
            step=self.step_count,
            episode_id=self.episode_id,
            learning_context={"topic": "completed"},
            learner_state={"engagement": 0.0},
            gaze_features=[],
            gesture_features=[],
            biometric_features=[],
            audio_features=[],
            behavioral_features=[],
            # Copy so the caller cannot mutate the environment's trajectory.
            confusion_history=list(self._confusion_trajectory),
            prediction_window=self._task_config["prediction_window"],
            available_interventions=[],
            multimodal_fused=True,
        )

    def get_state(self) -> State:
        """Return the current episode state.

        Raises:
            RuntimeError: If ``reset()`` has not been called yet.
        """
        if self._state is None:
            raise RuntimeError("Must call reset() before get_state()")
        return self._state

    def get_agent_prediction(self) -> AgentPrediction:
        """Ask the built-in agent for a prediction on the last observation.

        Feature vectors default to empty lists when no observation exists.
        """
        last = self._last_observation

        def features(name: str) -> list:
            return getattr(last, name) if last else []

        obs_dict = {
            "gaze_features": features("gaze_features"),
            "gesture_features": features("gesture_features"),
            "biometric_features": features("biometric_features"),
            "behavioral_features": features("behavioral_features"),
            "audio_features": features("audio_features"),
            "learning_context": {"difficulty": self.task_difficulty.value},
        }
        return self.agent.predict(obs_dict)

    def get_grader(
        self, difficulty: Optional[TaskDifficulty] = None
    ) -> Callable[[list, list, list], GraderResult]:
        """Return a standalone grading function for ``difficulty``.

        Args:
            difficulty: Difficulty whose passing threshold is applied;
                defaults to this environment's difficulty.

        Returns:
            ``grade(predictions, ground_truth, interventions)``, which grades
            the supplied run on a throwaway environment and returns a
            ``GraderResult``.
        """
        difficulty = difficulty or self.task_difficulty

        def grade(predictions: list, ground_truth: list, interventions: list) -> GraderResult:
            temp_env = ContextFlowEnvironment(task_difficulty=difficulty)
            temp_env._confusion_trajectory = list(ground_truth)
            temp_env._prediction_history = list(predictions)
            temp_env._intervention_history = interventions
            temp_env._state = State(
                episode_id="grader",
                step_count=len(predictions),
                max_steps=temp_env._task_config["max_steps"],
                task_difficulty=difficulty,
                predictions_history=[
                    {
                        "step": i,
                        "predicted": p,
                        "ground_truth": gt,
                        "action_type": "predict",
                        # Label derived from the prediction instead of a
                        # fixed "medium" placeholder.
                        "confusion_level": temp_env._get_confusion_level(
                            0.5 if p is None else p
                        ).value,
                    }
                    for i, (p, gt) in enumerate(zip(predictions, ground_truth))
                ],
                interventions_history=interventions,
            )
            return temp_env._grade_task()

        return grade
def easy_grader(predictions: list, ground_truth: list, interventions: list) -> GraderResult:
    """Grade a recorded run against the EASY passing threshold.

    Args:
        predictions: Predicted confusion values, one per step.
        ground_truth: Ground-truth confusion values aligned with
            ``predictions``.
        interventions: Intervention records; passed through unchanged to the
            environment's intervention history.

    Returns:
        The ``GraderResult`` produced by ``ContextFlowEnvironment._grade_task``.
    """
    env = ContextFlowEnvironment(task_difficulty=TaskDifficulty.EASY)
    env._confusion_trajectory = list(ground_truth)
    env._prediction_history = predictions
    env._intervention_history = interventions
    env._state = State(
        episode_id="easy_grader",
        step_count=len(predictions),
        max_steps=env._task_config["max_steps"],
        task_difficulty=TaskDifficulty.EASY,
        predictions_history=[
            {
                "step": i,
                "predicted": p,
                "ground_truth": gt,
                "action_type": "predict",
                # Derive the label from the prediction. A fixed "medium"
                # could never satisfy the grader's high/critical check, so
                # the early-detection rate was always 0. A missing
                # prediction is labelled as 0.5.
                "confusion_level": env._get_confusion_level(0.5 if p is None else p).value,
            }
            for i, (p, gt) in enumerate(zip(predictions, ground_truth))
        ],
        interventions_history=interventions,
    )
    return env._grade_task()
def medium_grader(predictions: list, ground_truth: list, interventions: list) -> GraderResult:
    """Grade a recorded run against the MEDIUM passing threshold.

    Args:
        predictions: Predicted confusion values, one per step.
        ground_truth: Ground-truth confusion values aligned with
            ``predictions``.
        interventions: Intervention records; passed through unchanged to the
            environment's intervention history.

    Returns:
        The ``GraderResult`` produced by ``ContextFlowEnvironment._grade_task``.
    """
    env = ContextFlowEnvironment(task_difficulty=TaskDifficulty.MEDIUM)
    env._confusion_trajectory = list(ground_truth)
    env._prediction_history = predictions
    env._intervention_history = interventions
    env._state = State(
        episode_id="medium_grader",
        step_count=len(predictions),
        max_steps=env._task_config["max_steps"],
        task_difficulty=TaskDifficulty.MEDIUM,
        predictions_history=[
            {
                "step": i,
                "predicted": p,
                "ground_truth": gt,
                "action_type": "predict",
                # Derive the label from the prediction. A fixed "medium"
                # could never satisfy the grader's high/critical check, so
                # the early-detection rate was always 0. A missing
                # prediction is labelled as 0.5.
                "confusion_level": env._get_confusion_level(0.5 if p is None else p).value,
            }
            for i, (p, gt) in enumerate(zip(predictions, ground_truth))
        ],
        interventions_history=interventions,
    )
    return env._grade_task()
def hard_grader(predictions: list, ground_truth: list, interventions: list) -> GraderResult:
    """Grade a recorded run against the HARD passing threshold.

    Args:
        predictions: Predicted confusion values, one per step.
        ground_truth: Ground-truth confusion values aligned with
            ``predictions``.
        interventions: Intervention records; passed through unchanged to the
            environment's intervention history.

    Returns:
        The ``GraderResult`` produced by ``ContextFlowEnvironment._grade_task``.
    """
    env = ContextFlowEnvironment(task_difficulty=TaskDifficulty.HARD)
    env._confusion_trajectory = list(ground_truth)
    env._prediction_history = predictions
    env._intervention_history = interventions
    env._state = State(
        episode_id="hard_grader",
        step_count=len(predictions),
        max_steps=env._task_config["max_steps"],
        task_difficulty=TaskDifficulty.HARD,
        predictions_history=[
            {
                "step": i,
                "predicted": p,
                "ground_truth": gt,
                "action_type": "predict",
                # Derive the label from the prediction. A fixed "medium"
                # could never satisfy the grader's high/critical check, so
                # the early-detection rate was always 0. A missing
                # prediction is labelled as 0.5.
                "confusion_level": env._get_confusion_level(0.5 if p is None else p).value,
            }
            for i, (p, gt) in enumerate(zip(predictions, ground_truth))
        ],
        interventions_history=interventions,
    )
    return env._grade_task()
# Public API of this module: the environment class and the three
# difficulty-specific grading functions.
__all__ = [
    "ContextFlowEnvironment",
    "easy_grader",
    "medium_grader",
    "hard_grader",
]