| """ |
| Real Learning Data Collection Module for ContextFlow |
| |
| Collects real behavioral signals from actual learning sessions for model improvement. |
| Addresses: Synthetic Data Bias limitation |
| """ |
|
|
import json
import os
import time
import uuid
import zlib
from collections import defaultdict
from dataclasses import dataclass, asdict, field
from datetime import datetime
from typing import Dict, List, Optional, Any

import numpy as np
|
|
|
|
@dataclass
class LearningSession:
    """A real learning session captured from an actual student."""
    session_id: str
    user_id: str
    topic: str
    start_time: datetime
    end_time: Optional[datetime] = None
    events: List[Dict] = field(default_factory=list)
    confusion_scores: List[float] = field(default_factory=list)
    actual_doubts: List[str] = field(default_factory=list)
    gesture_signals: Dict[str, int] = field(default_factory=dict)
    completion_status: str = "in_progress"

    def to_dict(self) -> Dict:
        """Serialize the session to a JSON-compatible dictionary.

        Timestamps become ISO-8601 strings; duration_minutes is 0 while the
        session has no end_time yet.
        """
        if self.end_time is None:
            end_iso = None
            duration = 0
        else:
            end_iso = self.end_time.isoformat()
            duration = (self.end_time - self.start_time).total_seconds() / 60
        return {
            'session_id': self.session_id,
            'user_id': self.user_id,
            'topic': self.topic,
            'start_time': self.start_time.isoformat(),
            'end_time': end_iso,
            'events': self.events,
            'confusion_scores': self.confusion_scores,
            'actual_doubts': self.actual_doubts,
            'gesture_signals': self.gesture_signals,
            'completion_status': self.completion_status,
            'duration_minutes': duration,
        }
|
|
|
|
@dataclass
class BehavioralEvent:
    """A single behavioral event from a real session.

    Instances are flattened with dataclasses.asdict() before being stored
    in LearningSession.events, so all fields must stay JSON-serializable.
    """
    # Unix time in seconds (time.time()) at which the event occurred.
    timestamp: float
    # Event category, e.g. 'mouse_hesitation' or 'gesture_<name>'.
    event_type: str
    # Arbitrary caller-supplied payload, e.g. {'duration_ms': 2000}.
    data: Dict[str, Any]
    # Session and user this event belongs to.
    session_id: str
    user_id: str
| |
| |
| |
| |
| |
| |
|
|
|
|
class RealDataCollector:
    """
    Collects real learning data from user sessions.

    Usage:
        collector = RealDataCollector(user_id='student123')
        collector.start_session('machine learning')
        collector.record_event('mouse_hesitation', {'duration_ms': 2000})
        collector.report_doubt('how_gradient_descent_works')
        collector.end_session()
    """

    def __init__(self, user_id: str):
        """
        Args:
            user_id: Identifier of the student whose sessions are recorded.
        """
        self.user_id = user_id
        self.current_session: Optional["LearningSession"] = None
        self.sessions: List["LearningSession"] = []
        # Directory where save_session() writes one JSON file per session.
        self.data_dir = 'collected_data'

    def start_session(self, topic: str) -> str:
        """Start a new learning session and return its generated session id.

        Any previously active session is discarded without being archived.
        """
        session_id = str(uuid.uuid4())
        self.current_session = LearningSession(
            session_id=session_id,
            user_id=self.user_id,
            topic=topic,
            start_time=datetime.now()
        )
        return session_id

    def record_event(self, event_type: str, data: Dict[str, Any]):
        """Record a behavioral event; silently a no-op when no session is active.

        Events whose type starts with 'gesture_' are additionally tallied in
        the session's gesture_signals counter under the stripped name.
        """
        if not self.current_session:
            return

        event = BehavioralEvent(
            timestamp=time.time(),
            event_type=event_type,
            data=data,
            session_id=self.current_session.session_id,
            user_id=self.user_id
        )
        # Stored as a plain dict so the session serializes to JSON directly.
        self.current_session.events.append(asdict(event))

        if event_type.startswith('gesture_'):
            gesture_name = event_type.replace('gesture_', '')
            self.current_session.gesture_signals[gesture_name] = \
                self.current_session.gesture_signals.get(gesture_name, 0) + 1

    def record_confusion(self, score: float):
        """Record a confusion score observation for the active session."""
        if not self.current_session:
            return
        self.current_session.confusion_scores.append(score)

    def report_doubt(self, doubt_type: str):
        """Record an actual doubt the student had during the active session."""
        if not self.current_session:
            return
        self.current_session.actual_doubts.append(doubt_type)

    def end_session(self, status: str = "completed"):
        """End the current session, archiving it into self.sessions.

        Args:
            status: Final completion status, e.g. 'completed' or 'abandoned'.
        """
        if not self.current_session:
            return

        self.current_session.end_time = datetime.now()
        self.current_session.completion_status = status
        self.sessions.append(self.current_session)
        self.current_session = None

    def save_session(self) -> Optional[str]:
        """Save the active session — or, if none is active, the most recently
        ended one — to '<data_dir>/<session_id>.json' and return the path.

        Bug fix: previously only self.current_session was considered, so the
        natural call order start_session() -> end_session() -> save_session()
        (used by the __main__ demo) silently saved nothing, because
        end_session() clears current_session. Returns None only when no
        session has ever been started.
        """
        session = self.current_session or (self.sessions[-1] if self.sessions else None)
        if session is None:
            return None

        filename = f"{self.data_dir}/{session.session_id}.json"
        os.makedirs(self.data_dir, exist_ok=True)
        with open(filename, 'w') as f:
            json.dump(session.to_dict(), f, indent=2)
        return filename

    def get_training_data(self) -> List[Dict]:
        """Get collected data formatted for RL training.

        Only completed sessions contribute. Each event that has an associated
        doubt yields one sample with a 64-dim 'state' vector and the
        'actual_doubt' label.
        """
        training_samples = []

        for session in self.sessions:
            if session.completion_status != "completed":
                continue

            for i, event in enumerate(session.events):
                state = self._extract_state_from_session(session, i)
                actual_doubt = self._get_doubt_at_time(session, event['timestamp'])

                if actual_doubt:
                    training_samples.append({
                        'state': state,
                        'actual_doubt': actual_doubt,
                        'session_id': session.session_id,
                        'topic': session.topic
                    })

        return training_samples

    def _extract_state_from_session(self, session: "LearningSession", event_idx: int) -> np.ndarray:
        """Extract the 64-dim state vector for the event at event_idx.

        Layout: 32 topic-embedding dims + 1 progress + 16 confusion features
        + 14 gesture slots + 1 normalized time-spent.
        """
        # Deterministic pseudo-embedding of the topic. Two fixes over the
        # original: a dedicated RandomState avoids clobbering the global
        # NumPy RNG (np.random.seed made unrelated code deterministic), and
        # zlib.crc32 replaces hash(), whose per-process randomization
        # (PYTHONHASHSEED) made embeddings differ between runs.
        topic_hash = zlib.crc32(session.topic.encode('utf-8')) % 1000
        rng = np.random.RandomState(topic_hash)
        topic_emb = rng.randn(32) * 0.1

        # Fraction of the session's events seen so far, clamped to [0, 1].
        progress = min(event_idx / max(len(session.events), 1), 1.0)

        # Summary stats of the last 10 confusion scores, tiled to 16 dims.
        recent_confusion = session.confusion_scores[-10:] if session.confusion_scores else [0]
        confusion_features = [
            np.mean(recent_confusion),
            np.std(recent_confusion) if len(recent_confusion) > 1 else 0,
            recent_confusion[-1] if recent_confusion else 0,
        ] * 5 + [0] * 1

        # Gesture counts bucketed into 14 slots (stable CRC instead of the
        # randomized built-in hash); collisions can merge gestures.
        gesture_features = np.zeros(14)
        for g, count in session.gesture_signals.items():
            idx = zlib.crc32(g.encode('utf-8')) % 14
            gesture_features[idx] = min(count / 20, 1.0)

        if session.end_time:
            time_spent = (session.end_time - session.start_time).total_seconds()
        else:
            time_spent = time.time() - session.start_time.timestamp()

        state = np.concatenate([
            topic_emb,
            [progress],
            confusion_features[:16],
            gesture_features,
            [min(time_spent / 1800, 1.0)]  # normalized against a 30-minute cap
        ])

        return state

    def _get_doubt_at_time(self, session: "LearningSession", timestamp: float) -> Optional[str]:
        """Get a doubt associated with this timestamp.

        Limitation: doubts are stored without timestamps, so this returns the
        first recorded doubt (or None). The timestamp argument is currently
        unused but kept for the intended time-based pairing.
        """
        for doubt in session.actual_doubts:
            return doubt
        return None
|
|
|
|
class DataAugmentor:
    """
    Augment collected data to improve model generalization.
    Addresses: Synthetic Data Bias, Real-world Generalization
    """

    @staticmethod
    def add_noise(state: np.ndarray, noise_level: float = 0.1) -> np.ndarray:
        """Return a new array: state plus Gaussian noise with std noise_level."""
        noise = np.random.randn(*state.shape) * noise_level
        return state + noise

    @staticmethod
    def scale_features(state: np.ndarray, scale_range: tuple = (0.8, 1.2)) -> np.ndarray:
        """Return a new array with each feature scaled by an independent
        uniform random factor drawn from scale_range."""
        scale = np.random.uniform(scale_range[0], scale_range[1], state.shape)
        return state * scale

    @staticmethod
    def shuffle_confusion_order(state: np.ndarray, n_shuffle: int = 3) -> np.ndarray:
        """Overwrite the first n_shuffle confusion features with randomly
        chosen values from the confusion block.

        Bug fix: the original shuffled the absolute indices 33..48 and used
        them to index the 16-element confusion slice, which always raised
        IndexError. Indices are now relative to the slice.
        """
        augmented = state.copy()
        # Confusion block within the 64-dim state vector.
        confusion_start, confusion_end = 33, 49

        # Relative indices into the confusion slice, randomly permuted.
        indices = list(range(confusion_end - confusion_start))
        np.random.shuffle(indices)

        original = augmented[confusion_start:confusion_end].copy()
        for i, j in enumerate(indices[:n_shuffle]):
            augmented[confusion_start + i] = original[j]

        return augmented

    @staticmethod
    def augment_batch(states: np.ndarray, labels: np.ndarray,
                      augment_ratio: float = 0.5) -> tuple:
        """Return (states, labels) extended with ~augment_ratio*len(states)
        augmented copies of randomly chosen samples.

        Each selected sample independently gets noise (p=0.5), feature
        scaling (p=0.3) and confusion shuffling (p=0.2).
        """
        n_augment = int(len(states) * augment_ratio)
        if n_augment == 0:
            # Edge case: np.vstack with an empty (0,)-shaped array would fail.
            return states, labels
        indices = np.random.choice(len(states), n_augment, replace=False)

        augmented_states = []
        augmented_labels = []

        for idx in indices:
            state = states[idx]
            label = labels[idx]

            if np.random.random() < 0.5:
                state = DataAugmentor.add_noise(state)
            if np.random.random() < 0.3:
                state = DataAugmentor.scale_features(state)
            if np.random.random() < 0.2:
                state = DataAugmentor.shuffle_confusion_order(state)

            augmented_states.append(state)
            augmented_labels.append(label)

        return np.vstack([states, np.array(augmented_states)]), \
               np.concatenate([labels, np.array(augmented_labels)])
|
|
|
|
class DataValidator:
    """
    Validate collected data quality.
    Addresses: Validation Gap
    """

    @staticmethod
    def validate_session(session: "LearningSession") -> Dict[str, Any]:
        """Check a session has sufficient data to be usable for training.

        Returns a dict with 'valid', 'session_id', a list of human-readable
        'issues' (empty when valid), and summary 'metrics'.
        """
        issues = []

        if len(session.events) < 10:
            issues.append("Insufficient events (need at least 10)")

        if not session.actual_doubts:
            issues.append("No actual doubts recorded")

        if session.completion_status != "completed":
            issues.append("Session not completed")

        if len(session.confusion_scores) < 3:
            issues.append("Insufficient confusion observations")

        return {
            'valid': len(issues) == 0,
            'session_id': session.session_id,
            'issues': issues,
            'metrics': {
                'n_events': len(session.events),
                'n_doubts': len(session.actual_doubts),
                'n_confusion_scores': len(session.confusion_scores),
                'duration_minutes': (session.end_time - session.start_time).total_seconds() / 60
                if session.end_time else 0
            }
        }

    @staticmethod
    def get_benchmark_metrics(sessions: List["LearningSession"]) -> Dict:
        """Generate benchmark metrics from collected data.

        Bug fix: 'avg_confusion' previously called np.mean on a ragged
        list of per-session score lists, which fails when sessions have
        different numbers of observations; it now averages the flattened
        scores across all valid sessions (identical result in the
        equal-length case that used to work).
        """
        valid_sessions = [s for s in sessions
                          if DataValidator.validate_session(s)['valid']]

        if not valid_sessions:
            return {'error': 'No valid sessions for benchmarking'}

        all_doubts = []
        for s in valid_sessions:
            all_doubts.extend(s.actual_doubts)

        doubt_counts = defaultdict(int)
        for d in all_doubts:
            doubt_counts[d] += 1

        # Flatten scores across sessions before averaging.
        all_scores = [c for s in valid_sessions for c in s.confusion_scores]

        return {
            'n_valid_sessions': len(valid_sessions),
            'total_doubts': len(all_doubts),
            'unique_doubts': len(doubt_counts),
            'top_doubts': sorted(doubt_counts.items(), key=lambda x: -x[1])[:10],
            'avg_confusion': float(np.mean(all_scores)) if all_scores else 0.0,
            'completion_rate': len(valid_sessions) / len(sessions) if sessions else 0
        }
|
|
|
|
| |
if __name__ == "__main__":
    import argparse

    # CLI entry point: start a session for the given user/topic, optionally
    # filling it with simulated events for a quick end-to-end check.
    parser = argparse.ArgumentParser(description='ContextFlow Real Data Collector')
    parser.add_argument('--user_id', required=True, help='User ID for this session')
    parser.add_argument('--topic', required=True, help='Learning topic')
    parser.add_argument('--simulate', action='store_true', help='Simulate data collection')
    args = parser.parse_args()

    collector = RealDataCollector(args.user_id)
    collector.start_session(args.topic)

    if args.simulate:
        print("Simulating session...")
        for step in range(50):
            collector.record_event('mouse_move', {'x': step, 'y': step * 2})
            if step % 10 == 0:
                collector.record_confusion(np.random.random())
            if step == 25:
                collector.report_doubt('how_backpropagation_works')

        collector.end_session('completed')
        collector.save_session()
        n_events = len(collector.sessions[0].events)
        print(f"Session saved with {n_events} events")

    print("Data collector ready. Use collector.record_event() to log events.")
|
|