contextflow-rl / app /api /main.py
namish10's picture
Upload app/api/main.py with huggingface_hub
e386f0b verified
"""
API Blueprint
Main API routes for ContextFlow Research
"""
from flask import Blueprint, request, jsonify
from ..agents.study_orchestrator import StudyOrchestrator
from ..agents.doubt_predictor import DoubtPredictorAgent
from ..agents.behavioral_agent import BehavioralAgent
from ..agents.knowledge_graph_agent import KnowledgeGraphAgent
from ..agents.recall_agent import RecallAgent
from ..agents.peer_learning_agent import PeerLearningAgent
from ..agents.hand_gesture_agent import HandGestureAgent, GestureSignalMapper
from datetime import datetime
import uuid
import asyncio
def run_async(coro):
"""Run async coroutine in sync context"""
try:
loop = asyncio.get_running_loop()
# If already in async context, create new event loop
return asyncio.run(coro)
except RuntimeError:
# No running loop, safe to use asyncio.run
return asyncio.run(coro)
api = Blueprint('api', __name__)
orchestrators = {}
gesture_agents = {}
gesture_mappers = {}
def get_orchestrator(user_id: str) -> StudyOrchestrator:
"""Get or create orchestrator for user"""
if user_id not in orchestrators:
orchestrators[user_id] = StudyOrchestrator(user_id)
return orchestrators[user_id]
@api.route('/session/start', methods=['POST'])
def start_session():
"""Start a new learning session"""
data = request.json
user_id = data.get('user_id', 'anonymous')
topic = data.get('topic', 'General')
subtopic = data.get('subtopic', '')
orchestrator = get_orchestrator(user_id)
session = run_async(orchestrator.start_session(topic, subtopic))
return jsonify({
'session_id': session.session_id,
'topic': topic,
'predictions': [
{
'doubt': p.predicted_doubt,
'confidence': p.confidence,
'explanation': p.suggested_explanation,
'priority': p.priority
}
for p in session.predictions
],
'pending_reviews': len(orchestrator.state.pending_recalls),
'peer_insights_count': len(orchestrator.state.peer_insights)
})
@api.route('/session/update', methods=['POST'])
def update_session():
"""Update session with behavioral data"""
data = request.json
user_id = data.get('user_id', 'anonymous')
behavioral_data = data.get('behavioral_data', {})
captured_doubt = data.get('captured_doubt')
orchestrator = get_orchestrator(user_id)
run_async(orchestrator.update_session(behavioral_data, captured_doubt))
confusion_score = 0
if orchestrator.state.current_session and orchestrator.state.current_session.behavioral_signals:
signals = orchestrator.state.current_session.behavioral_signals[-5:]
confusion_score = orchestrator.behavioral_agent.calculate_confusion_score(signals)
return jsonify({
'status': 'updated',
'active_predictions': [p.predicted_doubt for p in orchestrator.state.active_predictions[-3:]],
'confusion_level': confusion_score
})
@api.route('/session/end', methods=['POST'])
def end_session():
"""End learning session"""
data = request.json
user_id = data.get('user_id', 'anonymous')
orchestrator = get_orchestrator(user_id)
summary = orchestrator.end_session()
return jsonify(summary)
@api.route('/session/insights', methods=['GET'])
def get_insights():
"""Get current session insights"""
user_id = request.args.get('user_id', 'anonymous')
orchestrator = get_orchestrator(user_id)
insights = orchestrator.get_active_insights()
return jsonify(insights)
@api.route('/predict/doubts', methods=['POST'])
def predict_doubts():
"""Predict doubts for learning context"""
data = request.json
user_id = data.get('user_id', 'anonymous')
context = data.get('context', {})
agent = DoubtPredictorAgent(user_id)
predictions = agent.predict_doubts(context, top_k=5)
return jsonify({
'predictions': [
{
'doubt': p.predicted_doubt,
'confidence': p.confidence,
'explanation': p.suggested_explanation,
'related_concepts': p.related_concepts,
'priority': p.priority,
'estimated_time': p.estimated_resolution_time,
'prerequisites': p.prerequisite_topics
}
for p in predictions
]
})
@api.route('/recommendations', methods=['POST'])
def get_recommendations():
"""Get learning recommendations"""
data = request.json
user_id = data.get('user_id', 'anonymous')
context = data.get('context', {})
agent = DoubtPredictorAgent(user_id)
recommendations = agent.get_learning_recommendations(context)
return jsonify(recommendations)
@api.route('/behavior/track', methods=['POST'])
def track_behavior():
"""Track behavioral signals"""
data = request.json
user_id = data.get('user_id', 'anonymous')
signals = data.get('signals', {})
agent = BehavioralAgent(user_id)
processed = agent.process_signals(signals)
confusion = agent.calculate_confusion_score(processed)
return jsonify({
'signals_processed': len(processed),
'confusion_score': confusion,
'should_capture_doubt': confusion > 0.7,
'summary': agent.get_behavior_summary()
})
@api.route('/behavior/heatmap', methods=['GET'])
def get_heatmap():
"""Get heatmap data"""
user_id = request.args.get('user_id', 'anonymous')
agent = BehavioralAgent(user_id)
heatmap = agent.generate_heatmap_data()
return jsonify({
'heatmap': heatmap
})
@api.route('/graph/add', methods=['POST'])
def add_to_graph():
"""Add doubt to knowledge graph"""
data = request.json
user_id = data.get('user_id', 'anonymous')
doubt_data = data.get('doubt', {})
agent = KnowledgeGraphAgent(user_id)
node = agent.add_doubt_to_graph(doubt_data)
return jsonify({
'node_id': node.node_id,
'type': node.node_type,
'label': node.label
})
@api.route('/graph/query', methods=['POST'])
def query_graph():
"""Query knowledge graph"""
data = request.json
user_id = data.get('user_id', 'anonymous')
query = data.get('query', '')
top_k = data.get('top_k', 5)
agent = KnowledgeGraphAgent(user_id)
results = agent.graphrag_retrieve(query, top_k)
return jsonify({
'results': results
})
@api.route('/graph/stats', methods=['GET'])
def get_graph_stats():
"""Get graph statistics"""
user_id = request.args.get('user_id', 'anonymous')
agent = KnowledgeGraphAgent(user_id)
stats = agent.get_graph_stats()
return jsonify(stats)
@api.route('/graph/path', methods=['GET'])
def find_learning_path():
"""Find learning path between topics"""
from_topic = request.args.get('from', '')
to_topic = request.args.get('to', '')
user_id = request.args.get('user_id', 'anonymous')
agent = KnowledgeGraphAgent(user_id)
path = agent.find_learning_path(from_topic, to_topic)
return jsonify({
'path': path,
'steps': len(path)
})
@api.route('/review/due', methods=['GET'])
def get_due_reviews():
"""Get cards due for review"""
user_id = request.args.get('user_id', 'anonymous')
topic = request.args.get('topic')
agent = RecallAgent(user_id)
recalls = run_async(agent.get_due_recalls(topic))
return jsonify({
'due_count': len(recalls),
'cards': [
{
'card_id': c.card_id,
'front': c.front,
'back': c.back,
'topic': c.topic,
'interval': c.interval,
'ease_factor': c.ease_factor
}
for c in recalls[:20]
]
})
@api.route('/review/complete', methods=['POST'])
def complete_review():
"""Complete a review"""
data = request.json
user_id = data.get('user_id', 'anonymous')
card_id = data.get('card_id')
quality = data.get('quality', 3)
agent = RecallAgent(user_id)
result = run_async(agent.complete_review(card_id, quality))
if result:
return jsonify({
'card_id': result.card_id,
'quality': result.quality,
'xp_earned': result.xp_earned,
'next_interval': result.new_interval,
'next_review': result.next_review.isoformat() if hasattr(result, 'next_review') else None
})
return jsonify({'error': 'Card not found'}), 404
@api.route('/review/stats', methods=['GET'])
def get_review_stats():
"""Get review statistics"""
user_id = request.args.get('user_id', 'anonymous')
agent = RecallAgent(user_id)
stats = agent.get_review_stats()
progress = agent.get_learning_progress()
return jsonify({
'stats': stats,
'progress': progress
})
@api.route('/peer/insights', methods=['GET'])
def get_peer_insights():
"""Get peer insights"""
topic = request.args.get('topic', 'General')
agent = PeerLearningAgent('anonymous')
insights = run_async(agent.get_peer_insights(topic))
return jsonify({
'insights': [
{
'type': i.insight_type if hasattr(i, 'insight_type') else 'general',
'content': i.content if hasattr(i, 'content') else str(i),
'related_topics': i.related_topics if hasattr(i, 'related_topics') else [],
'confidence': i.confidence if hasattr(i, 'confidence') else 0.5,
'peer_count': i.peer_count if hasattr(i, 'peer_count') else 1
}
for i in insights
]
})
@api.route('/peer/doubts', methods=['GET'])
def get_peer_doubts():
"""Get peer doubts"""
topic = request.args.get('topic', 'General')
limit = int(request.args.get('limit', 10))
agent = PeerLearningAgent('anonymous')
doubts = run_async(agent.get_peer_doubts(topic, limit))
return jsonify({
'doubts': [
{
'id': d.doubt_id if hasattr(d, 'doubt_id') else str(d),
'content': d.content if hasattr(d, 'content') else str(d),
'resolved': d.resolved if hasattr(d, 'resolved') else False,
'upvotes': d.upvotes if hasattr(d, 'upvotes') else 0,
'similarity': d.similarity_score if hasattr(d, 'similarity_score') else 0.5
}
for d in doubts
]
})
@api.route('/peer/trending', methods=['GET'])
def get_trending():
"""Get trending topics"""
agent = PeerLearningAgent('anonymous')
trending = run_async(agent.get_trending_topics())
return jsonify({
'trending': trending if isinstance(trending, list) else []
})
@api.route('/health', methods=['GET'])
def health():
"""Health check"""
return jsonify({
'status': 'healthy',
'service': 'ContextFlow Research API',
'timestamp': datetime.now().isoformat(),
'active_sessions': len(orchestrators)
})
# ===== Hand Gesture Training API =====
def get_gesture_agent(user_id: str) -> HandGestureAgent:
"""Get or create gesture agent for user"""
if user_id not in gesture_agents:
gesture_agents[user_id] = HandGestureAgent(user_id)
return gesture_agents[user_id]
def get_gesture_mapper(user_id: str) -> GestureSignalMapper:
"""Get or create gesture mapper for user"""
if user_id not in gesture_mappers:
gesture_mappers[user_id] = GestureSignalMapper()
return gesture_mappers[user_id]
@api.route('/gesture/list', methods=['GET'])
def list_gestures():
"""List all available gestures"""
user_id = request.args.get('user_id', 'anonymous')
agent = get_gesture_agent(user_id)
gestures = agent.get_trained_gestures()
return jsonify({
'gestures': gestures,
'count': len(gestures)
})
@api.route('/gesture/add', methods=['POST'])
def add_gesture():
"""Add a new custom gesture to train"""
data = request.json
user_id = data.get('user_id', 'anonymous')
name = data.get('name', 'Custom Gesture')
description = data.get('description', '')
gesture_type = data.get('type', 'custom')
agent = get_gesture_agent(user_id)
gesture_id = agent.add_custom_gesture(name, description, gesture_type)
return jsonify({
'gesture_id': gesture_id,
'name': name,
'message': f"Added '{name}'. Start training to teach the model."
})
@api.route('/gesture/training/start', methods=['POST'])
def start_gesture_training():
"""Start training a gesture"""
data = request.json
user_id = data.get('user_id', 'anonymous')
gesture_id = data.get('gesture_id')
agent = get_gesture_agent(user_id)
result = agent.start_training(gesture_id)
return jsonify(result)
@api.route('/gesture/training/sample', methods=['POST'])
def add_gesture_sample():
"""Add a training sample (hand landmarks)"""
data = request.json
user_id = data.get('user_id', 'anonymous')
landmarks = data.get('landmarks', [])
agent = get_gesture_agent(user_id)
result = agent.add_training_sample(landmarks)
return jsonify(result)
@api.route('/gesture/training/cancel', methods=['POST'])
def cancel_gesture_training():
"""Cancel current training"""
user_id = request.json.get('user_id', 'anonymous')
agent = get_gesture_agent(user_id)
agent.cancel_training()
return jsonify({'message': 'Training cancelled'})
@api.route('/gesture/recognition/enable', methods=['POST'])
def enable_gesture_recognition():
"""Enable real-time gesture recognition"""
data = request.json
user_id = data.get('user_id', 'anonymous')
agent = get_gesture_agent(user_id)
result = agent.enable_recognition()
return jsonify(result)
@api.route('/gesture/recognition/disable', methods=['POST'])
def disable_gesture_recognition():
"""Disable gesture recognition"""
user_id = request.json.get('user_id', 'anonymous')
agent = get_gesture_agent(user_id)
result = agent.disable_recognition()
return jsonify(result)
@api.route('/gesture/recognize', methods=['POST'])
def recognize_gesture():
"""Recognize gesture from landmarks"""
data = request.json
user_id = data.get('user_id', 'anonymous')
landmarks = data.get('landmarks', [])
agent = get_gesture_agent(user_id)
recognition = agent.recognize(landmarks)
if recognition:
mapper = get_gesture_mapper(user_id)
signal = mapper.map_to_signal(recognition)
return jsonify({
'recognized': True,
'gesture': {
'name': recognition.gesture_name,
'confidence': recognition.confidence,
'type': recognition.gesture_type
},
'signal': signal
})
return jsonify({'recognized': False})
@api.route('/gesture/delete', methods=['POST'])
def delete_gesture():
"""Delete a gesture"""
data = request.json
user_id = data.get('user_id', 'anonymous')
gesture_id = data.get('gesture_id')
agent = get_gesture_agent(user_id)
result = agent.delete_gesture(gesture_id)
return jsonify(result)
@api.route('/gesture/export', methods=['GET'])
def export_gestures():
"""Export gesture model"""
user_id = request.args.get('user_id', 'anonymous')
agent = get_gesture_agent(user_id)
model = agent.export_model()
return jsonify(model)
@api.route('/gesture/import', methods=['POST'])
def import_gestures():
"""Import gesture model"""
data = request.json
user_id = data.get('user_id', 'anonymous')
model_data = data.get('model', {})
agent = get_gesture_agent(user_id)
agent.import_model(model_data)
return jsonify({'success': True, 'message': 'Gesture model imported'})
# ===== LLM Flow API =====
from ..agents.llm_orchestrator_agent import LLMOrchestrator, LLMRequest, LLMProvider
from ..agents.gesture_action_agent import GestureActionMapper, RLLearningLoop, GestureAction
from ..agents.prompt_agent import PromptAgent, AutoSubmitAgent
llm_orchestrators = {}
gesture_action_mappers = {}
rl_loops = {}
prompt_agents = {}
auto_submit_agents = {}
def get_llm_orchestrator(user_id: str) -> LLMOrchestrator:
"""Get or create LLM orchestrator for user"""
if user_id not in llm_orchestrators:
llm_orchestrators[user_id] = LLMOrchestrator()
return llm_orchestrators[user_id]
def get_gesture_action_mapper(user_id: str) -> GestureActionMapper:
"""Get or create gesture action mapper for user"""
if user_id not in gesture_action_mappers:
gesture_action_mappers[user_id] = GestureActionMapper()
return gesture_action_mappers[user_id]
def get_rl_loop(user_id: str) -> RLLearningLoop:
"""Get or create RL learning loop for user"""
if user_id not in rl_loops:
rl_loops[user_id] = RLLearningLoop(user_id)
return rl_loops[user_id]
def get_prompt_agent(user_id: str) -> PromptAgent:
"""Get or create prompt agent for user"""
if user_id not in prompt_agents:
prompt_agents[user_id] = PromptAgent()
return prompt_agents[user_id]
def get_auto_submit_agent(user_id: str) -> AutoSubmitAgent:
"""Get or create auto submit agent for user"""
if user_id not in auto_submit_agents:
prompt_agent = get_prompt_agent(user_id)
auto_submit_agents[user_id] = AutoSubmitAgent(prompt_agent)
return auto_submit_agents[user_id]
@api.route('/llm/query', methods=['POST'])
def llm_query():
"""Query LLM with prompt"""
data = request.json
user_id = data.get('user_id', 'anonymous')
prompt = data.get('prompt', '')
system_prompt = data.get('system_prompt', 'You are a helpful learning assistant.')
providers = data.get('providers', ['chatgpt', 'gemini'])
api_keys = data.get('api_keys', {})
models = data.get('models', {})
llm_provider_map = {
'chatgpt': LLMProvider.CHATGPT,
'gemini': LLMProvider.GEMINI,
'claude': LLMProvider.CLAUDE,
'deepseek': LLMProvider.DEEPSEEK,
'ollama': LLMProvider.OLLAMA,
'groq': LLMProvider.GROQ
}
provider_enums = [llm_provider_map.get(p, LLMProvider.CHATGPT) for p in providers]
orchestrator = get_llm_orchestrator(user_id)
for provider_name, api_key in api_keys.items():
if api_key:
orchestrator.api_keys[provider_name] = api_key
for provider_name, model in models.items():
if provider_name in orchestrator.provider_configs:
config = orchestrator.provider_configs[provider_name]
if isinstance(config, dict):
config['model'] = model
config['model_name'] = model
request_obj = LLMRequest(
prompt=prompt,
system_prompt=system_prompt,
providers=provider_enums,
user_id=user_id,
models=models
)
responses = run_async(orchestrator.query_parallel(request_obj))
return jsonify({
'responses': [
{
'provider': r.provider.value if hasattr(r, 'provider') else 'unknown',
'content': r.content if hasattr(r, 'content') else str(r),
'success': r.success if hasattr(r, 'success') else True,
'error': r.error if hasattr(r, 'error') else None,
'latency_ms': r.latency_ms if hasattr(r, 'latency_ms') else 0
}
for r in responses
],
'rate_limits': orchestrator.get_rate_limit_status()
})
@api.route('/llm/rate-limits', methods=['GET'])
def get_rate_limits():
"""Get current rate limit status"""
user_id = request.args.get('user_id', 'anonymous')
orchestrator = get_llm_orchestrator(user_id)
return jsonify(orchestrator.get_rate_limit_status())
@api.route('/llm/gesture-action', methods=['POST'])
def gesture_action():
"""Process gesture and trigger LLM action"""
data = request.json
user_id = data.get('user_id', 'anonymous')
landmarks = data.get('landmarks', [])
context = data.get('context', {})
mapper = get_gesture_action_mapper(user_id)
events = mapper.process_landmarks(landmarks, context)
results = []
for event in events:
orchestrator = get_llm_orchestrator(user_id)
rl_loop = get_rl_loop(user_id)
executed_event = mapper.execute_action(event, orchestrator, rl_loop)
results.append({
'action': executed_event.action.value,
'gesture': executed_event.gesture_name,
'confidence': executed_event.confidence,
'llm_responses': executed_event.llm_responses,
'rl_feedback': executed_event.rl_feedback
})
return jsonify({
'events': results,
'available_actions': mapper.get_available_actions()
})
@api.route('/llm/gesture-actions', methods=['GET'])
def get_available_gesture_actions():
"""Get available gesture actions"""
user_id = request.args.get('user_id', 'anonymous')
mapper = get_gesture_action_mapper(user_id)
return jsonify({
'actions': mapper.get_available_actions()
})
@api.route('/llm/rl/start', methods=['POST'])
def start_rl_loop():
"""Start RL learning loop"""
data = request.json
user_id = data.get('user_id', 'anonymous')
context = data.get('context', {})
rl_loop = get_rl_loop(user_id)
rl_loop.start_loop(context)
return jsonify({
'status': 'started',
'user_id': user_id
})
@api.route('/llm/rl/interact', methods=['POST'])
def rl_interact():
"""Add interaction to RL loop"""
data = request.json
user_id = data.get('user_id', 'anonymous')
action = data.get('action', '')
response = data.get('response', '')
rl_loop = get_rl_loop(user_id)
rl_loop.add_interaction(action, response)
return jsonify({
'status': 'recorded',
'total_interactions': len(rl_loop.conversation_history)
})
@api.route('/llm/rl/feedback', methods=['POST'])
def rl_feedback():
"""Add feedback to RL loop"""
data = request.json
user_id = data.get('user_id', 'anonymous')
quality = data.get('quality', 3)
comment = data.get('comment')
rl_loop = get_rl_loop(user_id)
rl_loop.add_feedback(quality, comment)
return jsonify({
'status': 'feedback_recorded',
'average_reward': sum(rl_loop.reward_history) / len(rl_loop.reward_history) if rl_loop.reward_history else 0
})
@api.route('/llm/rl/status', methods=['GET'])
def get_rl_status():
"""Get RL loop status"""
user_id = request.args.get('user_id', 'anonymous')
rl_loop = get_rl_loop(user_id)
return jsonify(rl_loop.get_status())
@api.route('/llm/rl/end', methods=['POST'])
def end_rl_loop():
"""End RL learning loop"""
data = request.json
user_id = data.get('user_id', 'anonymous')
rl_loop = get_rl_loop(user_id)
rl_loop.end_loop()
return jsonify({
'status': 'ended',
'summary': rl_loop.get_status()
})
@api.route('/llm/prompt/generate', methods=['POST'])
def generate_prompt():
"""Generate prompt from template"""
data = request.json
user_id = data.get('user_id', 'anonymous')
template = data.get('template', 'learning_explain')
context = data.get('context', {})
prompt_agent = get_prompt_agent(user_id)
prompt_agent.update_context(context)
prompt = prompt_agent.generate_prompt(template, context)
return jsonify({
'prompt': prompt.content,
'template': prompt.template_used,
'llm_targets': prompt.llm_targets,
'auto_submit': prompt.auto_submit
})
@api.route('/llm/prompt/from-gesture', methods=['POST'])
def generate_prompt_from_gesture():
"""Generate prompt based on gesture action"""
data = request.json
user_id = data.get('user_id', 'anonymous')
gesture = data.get('gesture', '')
context = data.get('context', {})
prompt_agent = get_prompt_agent(user_id)
prompt = prompt_agent.generate_from_gesture(gesture, context)
return jsonify({
'prompt': prompt.content,
'template': prompt.template_used,
'llm_targets': prompt.llm_targets,
'auto_submit': prompt.auto_submit
})
@api.route('/llm/prompt/suggest', methods=['GET'])
def suggest_prompts():
"""Get suggested prompts based on context"""
user_id = request.args.get('user_id', 'anonymous')
context_str = request.args.get('context', '{}')
import json
context = json.loads(context_str)
prompt_agent = get_prompt_agent(user_id)
suggestions = prompt_agent.get_suggested_prompts(context)
return jsonify({
'suggestions': [
{
'prompt': s.content,
'template': s.template_used,
'llm_targets': s.llm_targets
}
for s in suggestions
]
})
@api.route('/llm/auto-submit/prepare', methods=['POST'])
def prepare_auto_submit():
"""Prepare prompt for auto submission"""
data = request.json
user_id = data.get('user_id', 'anonymous')
prompt_content = data.get('prompt', '')
template = data.get('template')
prompt_agent = get_prompt_agent(user_id)
from ..agents.prompt_agent import GeneratedPrompt
prompt = GeneratedPrompt(
content=prompt_content,
template_used=template,
context={},
llm_targets=['chatgpt'],
auto_submit=True
)
auto_submit = get_auto_submit_agent(user_id)
submission = auto_submit.prepare_submission(prompt)
return jsonify({
'status': 'ready',
'submission_id': len(submission)
})
@api.route('/llm/auto-submit/status', methods=['GET'])
def get_auto_submit_status():
"""Get auto submit status"""
user_id = request.args.get('user_id', 'anonymous')
auto_submit = get_auto_submit_agent(user_id)
return jsonify(auto_submit.get_submission_status())
@api.route('/llm/gesture-training/swipe', methods=['POST'])
def train_swipe_gesture():
"""Train a swipe gesture"""
data = request.json
user_id = data.get('user_id', 'anonymous')
gesture_name = data.get('gesture_name', 'swipe_right')
action = data.get('action', 'query_multi_llm')
finger_count = data.get('finger_count', 2)
mapper = get_gesture_action_mapper(user_id)
gesture_pattern = {
'type': 'swipe',
'finger_count': finger_count,
'name': gesture_name
}
action_enum = GestureAction(action) if action in [a.value for a in GestureAction] else GestureAction.CUSTOM
mapper.add_custom_mapping(
name=gesture_name,
gesture_pattern=gesture_pattern,
action=action_enum,
parameters={'user_defined': True}
)
return jsonify({
'success': True,
'gesture_name': gesture_name,
'action': action_enum.value
})