import json
import os
import random
from collections import defaultdict
from datetime import datetime

import docx
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import PyPDF2
import streamlit as st
from groq import Groq
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import PromptTemplate


class DocumentProcessor:
    """Extract text from a PDF/DOCX file and index it in a Chroma vector store.

    The embedding model and the text splitter are created once per instance
    in ``__init__`` and reused by every ``process_document`` call.
    """

    def __init__(self):
        # Embedding model applied to every chunk that is indexed.
        self.embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
        # 1000-character chunks, each sharing 200 characters with its neighbour.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )

    def extract_text_from_pdf(self, pdf_path):
        """Return the text of every page of the PDF at *pdf_path*.

        Pages are joined with a newline so the last word of one page is not
        fused with the first word of the next.
        """
        with open(pdf_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            # `or ""` keeps the join safe if a page yields no text object.
            return "\n".join(page.extract_text() or "" for page in pdf_reader.pages)

    def extract_text_from_docx(self, docx_path):
        """Return the text of every paragraph of the DOCX at *docx_path*.

        Each paragraph is followed by a newline.
        """
        doc = docx.Document(docx_path)
        return "".join(f"{paragraph.text}\n" for paragraph in doc.paragraphs)

    def process_document(self, file_path, file_type):
        """Extract, chunk and embed a document.

        Args:
            file_path: Path of the document to read.
            file_type: File extension without the dot; matched
                case-insensitively against ``pdf``, ``docx`` and ``doc``.

        Returns:
            A ``(vectorstore, chunk_count)`` tuple.

        Raises:
            ValueError: If *file_type* is not supported, or if no text could
                be extracted from the document.
        """
        kind = file_type.lower()
        if kind == 'pdf':
            text = self.extract_text_from_pdf(file_path)
        elif kind in ('docx', 'doc'):
            # NOTE(review): legacy ``.doc`` files are routed to the DOCX
            # reader; confirm that reader can actually open them.
            text = self.extract_text_from_docx(file_path)
        else:
            raise ValueError("Unsupported file type")

        # Fail with a clear message instead of handing an empty chunk list to
        # the vector store (e.g. a scanned PDF with no text layer).
        if not text.strip():
            raise ValueError("No extractable text found in document")

        chunks = self.text_splitter.split_text(text)
        vectorstore = Chroma.from_texts(
            texts=chunks,
            embedding=self.embeddings
        )
        return vectorstore, len(chunks)
class RAGLearningSystem:
    """Retrieval-augmented tutor backed by a vector store and a Groq chat model.

    For a topic, the three most similar chunks are retrieved from the vector
    store, inserted into one of four prompt templates and sent to the model.
    The story prompt returns free text; the three exercise prompts request a
    JSON object and return the parsed ``dict``.
    """

    def __init__(self, vectorstore, model="llama3-8b-8192"):
        """Create the Groq client, the retriever and the prompt templates.

        Args:
            vectorstore: Vector store exposing ``as_retriever``.
            model: Groq model id used for every completion. Defaults to the
                id this class has always used.

        Shows a Streamlit error and stops the script when ``GROQ_API_KEY`` is
        not set in the environment.
        """
        if "GROQ_API_KEY" not in os.environ:
            st.error("Groq API key is required for generating responses.")
            st.stop()
        self.llm = Groq(api_key=os.environ["GROQ_API_KEY"])
        self.model = model

        self.vectorstore = vectorstore
        self.retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

        self.story_prompt = PromptTemplate(
            input_variables=["context", "topic"],
            template="""
        Based on the following context from the book, explain {topic} as an engaging story.
        Make it educational yet entertaining, using metaphors, analogies, and narrative elements.

        Context: {context}

        Create a story explanation for {topic}:
        """
        )

        self.mcq_prompt = PromptTemplate(
            input_variables=["context", "topic"],
            template="""
        Based on this context about {topic}, create 3 multiple choice questions.
        Format as JSON with structure:
        {{
            "questions": [
                {{
                    "question": "Question text",
                    "options": ["A. Option 1", "B. Option 2", "C. Option 3", "D. Option 4"],
                    "correct": "A",
                    "explanation": "Why this answer is correct"
                }}
            ]
        }}

        Context: {context}
        """
        )

        self.fill_blank_prompt = PromptTemplate(
            input_variables=["context", "topic"],
            template="""
        Based on this context about {topic}, create 3 fill-in-the-blank questions.
        Format as JSON with structure:
        {{
            "questions": [
                {{
                    "question": "Question with _____ blank",
                    "answer": "correct answer",
                    "hint": "helpful hint"
                }}
            ]
        }}

        Context: {context}
        """
        )

        self.match_prompt = PromptTemplate(
            input_variables=["context", "topic"],
            template="""
        Based on this context about {topic}, create a matching exercise with 4 pairs.
        Format as JSON with structure:
        {{
            "left_items": ["Item 1", "Item 2", "Item 3", "Item 4"],
            "right_items": ["Match A", "Match B", "Match C", "Match D"],
            "correct_matches": {{"Item 1": "Match A", "Item 2": "Match B", "Item 3": "Match C", "Item 4": "Match D"}}
        }}

        Context: {context}
        """
        )

    def _retrieve_context(self, topic):
        """Return the retrieved chunks for *topic*, joined by newlines."""
        retriever = self.retriever
        # Prefer `invoke`; fall back to the older method name for retrievers
        # that do not expose it.
        fetch = getattr(retriever, "invoke", None) or retriever.get_relevant_documents
        return "\n".join(doc.page_content for doc in fetch(topic))

    def _complete(self, prompt, json_mode=False):
        """Send *prompt* as a single user message and return the reply text."""
        request = {
            "messages": [{"role": "user", "content": prompt}],
            "model": self.model,
        }
        if json_mode:
            request["response_format"] = {"type": "json_object"}
        response = self.llm.chat.completions.create(**request)
        return response.choices[0].message.content

    def _generate_json(self, prompt_template, topic, fallback):
        """Run a JSON-mode prompt and return the parsed object.

        *fallback* supplies the expected keys with empty values. It is
        returned as-is when the reply is missing, is not valid JSON or is not
        a JSON object; otherwise the reply is laid over it so every expected
        key is always present.
        """
        context = self._retrieve_context(topic)
        raw = self._complete(
            prompt_template.format(context=context, topic=topic),
            json_mode=True,
        )
        try:
            payload = json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            # TypeError covers a reply whose content is None.
            return fallback
        if not isinstance(payload, dict):
            return fallback
        return {**fallback, **payload}

    def get_story_explanation(self, topic):
        """Return a story-style explanation of *topic* as plain text."""
        context = self._retrieve_context(topic)
        return self._complete(self.story_prompt.format(context=context, topic=topic))

    def generate_mcq_questions(self, topic):
        """Return ``{"questions": [...]}`` with multiple-choice questions.

        The list is empty when the model reply cannot be used.
        """
        return self._generate_json(self.mcq_prompt, topic, {"questions": []})

    def generate_fill_blank_questions(self, topic):
        """Return ``{"questions": [...]}`` with fill-in-the-blank questions.

        The list is empty when the model reply cannot be used.
        """
        return self._generate_json(self.fill_blank_prompt, topic, {"questions": []})

    def generate_matching_questions(self, topic):
        """Return a matching exercise.

        The result always has the keys ``left_items``, ``right_items`` and
        ``correct_matches``; they are empty when the model reply cannot be
        used.
        """
        return self._generate_json(
            self.match_prompt,
            topic,
            {"left_items": [], "right_items": [], "correct_matches": {}},
        )
class LearningGames:
    """Streamlit quiz games whose results are logged in ``st.session_state``.

    Each ``play_*`` method renders a form. Only on the run in which that form
    was submitted does it grade the answers, append a record to
    ``st.session_state.game_scores[<game>]`` and return the percentage score.
    On every other run it returns ``None``.
    """

    # NOTE(review): several UI strings contain glyphs such as "๐ฏ" and "โ"
    # that look like mis-decoded emoji. They are kept as found; confirm the
    # intended characters before shipping.

    def __init__(self):
        self.init_session_state()

    def init_session_state(self):
        """Create the score log and the current-topic slot if they are absent."""
        if 'game_scores' not in st.session_state:
            st.session_state.game_scores = {
                'mcq': [],
                'fill_blank': [],
                'matching': []
            }

        if 'current_topic' not in st.session_state:
            st.session_state.current_topic = ""

    def _record_score(self, game_type, topic, score, total):
        """Show the final score, append it to the log and return the percentage."""
        percentage = (score / total) * 100
        st.write(f"**Final Score: {score}/{total} ({percentage:.1f}%)**")
        st.session_state.game_scores[game_type].append({
            'topic': topic,
            'score': percentage,
            'timestamp': datetime.now(),
            'questions_attempted': total
        })
        return percentage

    @staticmethod
    def _mcq_is_correct(selected, correct):
        """Return True when the chosen option starts with the correct letter.

        A missing selection or an empty answer key never counts as correct;
        without the key check ``startswith("")`` would accept every option.
        The comparison ignores case and surrounding whitespace.
        """
        if selected is None or not correct:
            return False
        return str(selected).strip().upper().startswith(correct.upper())

    def play_mcq_game(self, questions, topic):
        """Render and grade a multiple-choice quiz.

        Args:
            questions: Dict with a ``questions`` list; each entry provides
                ``question``, ``options``, ``correct`` and optionally
                ``explanation``.
            topic: Topic label shown in the heading and stored with the score.

        Returns:
            The percentage score on the run in which the form was submitted,
            otherwise ``None``.
        """
        st.subheader(f"๐ฏ Multiple Choice Quiz: {topic}")
        items = questions.get('questions')
        if not items:
            st.error("No questions available for this topic.")
            return None

        with st.form("mcq_form"):
            answers = {}
            for i, q in enumerate(items):
                st.write(f"**Question {i+1}:** {q['question']}")
                answers[i] = st.radio(
                    f"Select answer for Q{i+1}:",
                    q['options'],
                    key=f"mcq_{i}"
                )
                st.write("---")

            submitted = st.form_submit_button("Submit Quiz")
            if not submitted:
                return None

            score = 0
            for i, q in enumerate(items):
                correct = str(q.get('correct', '')).strip()
                if self._mcq_is_correct(answers[i], correct):
                    score += 1
                    st.success(f"Q{i+1}: Correct! โ")
                else:
                    st.error(f"Q{i+1}: Wrong. Correct answer: {correct}")
                st.info(f"Explanation: {q.get('explanation', 'No explanation provided')}")

            return self._record_score('mcq', topic, score, len(items))

    def play_fill_blank_game(self, questions, topic):
        """Render and grade a fill-in-the-blank quiz.

        Answers are compared after stripping whitespace and lower-casing.

        Args:
            questions: Dict with a ``questions`` list; each entry provides
                ``question``, ``answer`` and optionally ``hint``.
            topic: Topic label shown in the heading and stored with the score.

        Returns:
            The percentage score on the run in which the form was submitted,
            otherwise ``None``.
        """
        st.subheader(f"๐ Fill in the Blanks: {topic}")
        items = questions.get('questions')
        if not items:
            st.error("No questions available for this topic.")
            return None

        with st.form("fill_blank_form"):
            answers = {}
            for i, q in enumerate(items):
                st.write(f"**Question {i+1}:** {q['question']}")
                st.write(f"๐ก Hint: {q.get('hint', 'No hint available')}")
                answers[i] = st.text_input(
                    f"Your answer for Q{i+1}:",
                    key=f"fill_{i}"
                )
                st.write("---")

            submitted = st.form_submit_button("Submit Answers")
            if not submitted:
                return None

            score = 0
            for i, q in enumerate(items):
                user_answer = answers[i].strip().lower()
                # str() so a numeric answer in the JSON does not break .strip().
                correct_answer = str(q['answer']).strip().lower()
                if user_answer == correct_answer:
                    score += 1
                    st.success(f"Q{i+1}: Correct! โ")
                else:
                    st.error(f"Q{i+1}: Wrong. Correct answer: {q['answer']}")

            return self._record_score('fill_blank', topic, score, len(items))

    def play_matching_game(self, questions, topic):
        """Render and grade a match-the-pairs exercise.

        Args:
            questions: Dict with ``left_items``, ``right_items`` and
                ``correct_matches`` (left item -> right item).
            topic: Topic label shown in the heading and stored with the score.

        Returns:
            The percentage score on the run in which the form was submitted,
            otherwise ``None``.
        """
        st.subheader(f"๐ Match the Following: {topic}")
        if not questions.get('left_items') or not questions.get('right_items'):
            st.error("No matching pairs available for this topic.")
            return None

        left_items = questions['left_items']
        right_items = list(questions['right_items'])
        correct_matches = questions.get('correct_matches') or {}
        # Seed the shuffle from the exercise itself: the order is still
        # scrambled, but identical on every rerun, so the options graded
        # after a submit are the ones the user actually saw.
        random.Random(repr((left_items, right_items))).shuffle(right_items)

        with st.form("matching_form"):
            matches = {}
            st.write("Match each item on the left with the correct item on the right:")
            for i, left_item in enumerate(left_items):
                matches[left_item] = st.selectbox(
                    f"**{left_item}** matches with:",
                    ["Select..."] + right_items,
                    key=f"match_{i}"
                )

            submitted = st.form_submit_button("Submit Matches")
            if not submitted:
                return None

            score = 0
            for left_item, user_match in matches.items():
                correct_match = correct_matches.get(left_item, "")
                if user_match == correct_match:
                    score += 1
                    st.success(f"โ {left_item} โ {user_match} (Correct!)")
                else:
                    st.error(f"โ {left_item} โ {user_match} (Wrong! Correct: {correct_match})")

            return self._record_score('matching', topic, score, len(left_items))
class LearningDashboard:
    """Analytics page built from ``st.session_state.game_scores``.

    The score log maps a game type to a list of records, each carrying
    ``topic``, ``score`` (a percentage) and ``timestamp``.
    """

    # Fixed colour per game type, so a bar keeps its colour even when another
    # game type has no scores yet.
    _GAME_COLORS = {
        'mcq': '#FF6B6B',
        'fill_blank': '#4ECDC4',
        'matching': '#45B7D1',
    }
    _DEFAULT_GAME_COLOR = '#96CEB4'

    def __init__(self):
        pass

    @staticmethod
    def _game_scores():
        """Return the score log, or an empty dict if none has been created."""
        return st.session_state.get('game_scores', {})

    def _score_rows(self):
        """Flatten the score log into one dict per played game."""
        return [
            {
                'game_type': game_type,
                'score': entry['score'],
                'topic': entry['topic'],
                'timestamp': entry['timestamp'],
            }
            for game_type, entries in self._game_scores().items()
            for entry in entries
        ]

    def _game_averages(self):
        """Return the mean score per game type, skipping unplayed types."""
        return {
            game_type: sum(entry['score'] for entry in entries) / len(entries)
            for game_type, entries in self._game_scores().items()
            if entries
        }

    def _topic_averages(self):
        """Return the mean score per topic across all game types."""
        by_topic = defaultdict(list)
        for entries in self._game_scores().values():
            for entry in entries:
                by_topic[entry['topic']].append(entry['score'])
        return {topic: sum(values) / len(values) for topic, values in by_topic.items()}

    def show_dashboard(self):
        """Render the whole dashboard, or a hint when nothing was played yet."""
        st.title("๐ Learning Analytics Dashboard")
        if not any(self._game_scores().values()):
            st.info("No learning data available yet. Complete some games to see your analytics!")
            return

        self.show_overall_stats()
        col1, col2 = st.columns(2)
        with col1:
            self.show_game_type_performance()
        with col2:
            self.show_topic_performance()
        self.show_progress_over_time()
        self.show_strengths_weaknesses()

    def show_overall_stats(self):
        """Show average score, games played, best score and topic count."""
        st.subheader("๐ฏ Overall Performance")
        rows = self._score_rows()
        if not rows:
            return
        df = pd.DataFrame(rows)
        col1, col2, col3, col4 = st.columns(4)
        with col1:
            st.metric("Average Score", f"{df['score'].mean():.1f}%")
        with col2:
            st.metric("Games Played", len(df))
        with col3:
            st.metric("Best Score", f"{df['score'].max():.1f}%")
        with col4:
            st.metric("Topics Studied", df['topic'].nunique())

    def show_game_type_performance(self):
        """Show a bar chart of the average score per game type."""
        st.subheader("๐ฎ Performance by Game Type")
        game_averages = self._game_averages()
        if not game_averages:
            return
        fig = go.Figure(data=[
            go.Bar(
                x=list(game_averages.keys()),
                y=list(game_averages.values()),
                marker_color=[
                    self._GAME_COLORS.get(game_type, self._DEFAULT_GAME_COLOR)
                    for game_type in game_averages
                ]
            )
        ])
        fig.update_layout(
            title="Average Score by Game Type",
            xaxis_title="Game Type",
            yaxis_title="Average Score (%)",
            showlegend=False
        )
        st.plotly_chart(fig, use_container_width=True)

    def show_topic_performance(self):
        """Show a bar chart of the average score per topic."""
        st.subheader("๐ Performance by Topic")
        topic_averages = self._topic_averages()
        if not topic_averages:
            return
        fig = go.Figure(data=[
            go.Bar(
                x=list(topic_averages.keys()),
                y=list(topic_averages.values()),
                marker_color='#96CEB4'
            )
        ])
        fig.update_layout(
            title="Average Score by Topic",
            xaxis_title="Topic",
            yaxis_title="Average Score (%)",
            showlegend=False
        )
        st.plotly_chart(fig, use_container_width=True)

    def show_progress_over_time(self):
        """Show a line chart of scores ordered by timestamp, one line per game type."""
        st.subheader("๐ Progress Over Time")
        rows = self._score_rows()
        if not rows:
            return
        df = pd.DataFrame(rows).sort_values('timestamp')
        fig = px.line(df, x='timestamp', y='score',
                      color='game_type',
                      title="Score Progress Over Time",
                      labels={'timestamp': 'Time', 'score': 'Score (%)'})
        st.plotly_chart(fig, use_container_width=True)

    def show_strengths_weaknesses(self):
        """Show best/weakest game type and topic plus an overall recommendation.

        A weakness is only listed when its average is below 80. The
        recommendation uses the mean of the per-game-type averages:
        at least 90, at least 75, or lower.
        """
        st.subheader("๐ช Strengths & Areas for Improvement")
        game_averages = self._game_averages()
        topic_averages = self._topic_averages()

        col1, col2 = st.columns(2)
        with col1:
            st.write("**๐ฏ Strengths:**")
            if game_averages:
                best_game = max(game_averages, key=game_averages.get)
                st.success(f"โข Excellent at {best_game} games ({game_averages[best_game]:.1f}% avg)")
            if topic_averages:
                best_topic = max(topic_averages, key=topic_averages.get)
                st.success(f"โข Strong understanding of {best_topic} ({topic_averages[best_topic]:.1f}% avg)")
        with col2:
            st.write("**๐ Areas for Improvement:**")
            if game_averages:
                weak_game = min(game_averages, key=game_averages.get)
                if game_averages[weak_game] < 80:
                    st.warning(f"โข Practice {weak_game} games more ({game_averages[weak_game]:.1f}% avg)")
            if topic_averages:
                weak_topic = min(topic_averages, key=topic_averages.get)
                if topic_averages[weak_topic] < 80:
                    st.warning(f"โข Review {weak_topic} concepts ({topic_averages[weak_topic]:.1f}% avg)")

        st.subheader("๐ Personalized Recommendations")
        if not game_averages:
            return
        overall_avg = sum(game_averages.values()) / len(game_averages)
        if overall_avg >= 90:
            st.success("๐ Excellent performance! You're mastering the material well.")
        elif overall_avg >= 75:
            st.info("๐ Good progress! Focus on your weaker areas to improve further.")
        else:
            st.warning("๐ Keep practicing! Consider reviewing the story explanations before attempting games.")
def upload_and_process_page(doc_processor, file_path="ragdatascience.pdf", file_extension=None):
    """Render the page that indexes the bundled learning document.

    Args:
        doc_processor: Object providing
            ``process_document(file_path, file_type)``.
        file_path: Document to index. Defaults to the file this page has
            always processed.
        file_extension: File type passed to the processor. When ``None`` it
            is taken from the suffix of *file_path*.

    On success the vector store and the document name are stored in
    ``st.session_state.vectorstore`` and ``st.session_state.document_name``.
    """
    st.header("๐ Process Your Learning Material")

    if file_extension is None:
        file_extension = os.path.splitext(file_path)[1].lstrip(".")

    st.info(f"Processing the pre-uploaded file: `{file_path}`")

    if not st.button("Process Document"):
        return

    # Report a missing file explicitly rather than through a raw OS error.
    if not os.path.isfile(file_path):
        st.error(f"Error processing document: file not found: {file_path}")
        return

    with st.spinner("Processing document..."):
        try:
            vectorstore, chunk_count = doc_processor.process_document(
                file_path, file_extension
            )
        except Exception as e:
            # UI boundary: show any processing failure instead of crashing.
            st.error(f"Error processing document: {str(e)}")
        else:
            st.session_state.vectorstore = vectorstore
            st.session_state.document_name = file_path
            st.success(f"Document processed successfully! Created {chunk_count} text chunks.")
            st.info("You can now go to 'Learn Topic' to start learning!")
def learn_topic_page(rag_system):
    """Render the page that turns a topic into a story explanation.

    Args:
        rag_system: Object providing ``get_story_explanation(topic)``.

    On success the topic is stored in ``st.session_state.current_topic``.
    """
    st.header("๐ Learn About Any Topic")
    topic = st.text_input("What would you like to learn about?",
                          placeholder="e.g., machine learning algorithms, statistics, data visualization")
    if not st.button("Get Story Explanation"):
        return

    # A blank or whitespace-only topic gets feedback instead of a silent
    # no-op or a wasted model call.
    topic = topic.strip()
    if not topic:
        st.warning("Please enter a topic first.")
        return

    with st.spinner("Generating story explanation..."):
        try:
            story = rag_system.get_story_explanation(topic)
        except Exception as e:
            # UI boundary: show any generation failure instead of crashing.
            st.error(f"Error generating explanation: {str(e)}")
        else:
            st.session_state.current_topic = topic
            st.subheader(f"๐ Story: {topic}")
            st.write(story)
            st.success("Story generated! Now you can test your understanding with games.")
def play_games_page(rag_system, games):
    """Render the page that generates questions and runs the chosen game.

    Args:
        rag_system: Object providing the three ``generate_*_questions``
            methods.
        games: Object providing the three ``play_*_game`` methods.

    The generated questions are kept in ``st.session_state['active_game']``.
    A button is only truthy on the run right after its click, so a game drawn
    solely inside the button branch would vanish on the rerun caused by
    submitting the game's form, and the answers would never be graded.
    Drawing the game from the stored questions on every run avoids that.
    """
    st.header("๐ฎ Test Your Knowledge")
    topic = st.text_input("Enter topic to test:",
                          value=st.session_state.get('current_topic', ''))
    if not topic:
        return

    # Menu label -> (question generator, game renderer).
    game_table = {
        "Multiple Choice": (rag_system.generate_mcq_questions, games.play_mcq_game),
        "Fill in the Blanks": (rag_system.generate_fill_blank_questions, games.play_fill_blank_game),
        "Matching": (rag_system.generate_matching_questions, games.play_matching_game),
    }
    game_type = st.selectbox("Choose game type:", list(game_table))
    generate, play = game_table[game_type]

    if st.button("Generate Questions"):
        with st.spinner("Generating questions..."):
            try:
                questions = generate(topic)
            except Exception as e:
                # Drop any stale game so old questions are not shown under
                # the error message.
                if 'active_game' in st.session_state:
                    del st.session_state['active_game']
                st.error(f"Error generating questions: {str(e)}")
            else:
                st.session_state['active_game'] = {
                    'game_type': game_type,
                    'topic': topic,
                    'questions': questions,
                }

    active = st.session_state.get('active_game')
    # Only show a game generated for the currently selected type and topic.
    if not active or active['game_type'] != game_type or active['topic'] != topic:
        return

    try:
        play(active['questions'], topic)
    except Exception as e:
        # Malformed question data surfaces here; report it like the original
        # page did instead of crashing.
        st.error(f"Error generating questions: {str(e)}")
def main():
    """Configure the Streamlit page and dispatch to the selected sub-page."""
    st.set_page_config(
        page_title="RAG Learning System",
        page_icon="๐ค",
        layout="wide"
    )
    st.title("๐ค RAG Learning System")
    st.write("Upload your learning materials and start your interactive learning journey!")

    # Only the Groq key is read anywhere in this file, so it is the only
    # secret required to start.
    if "GROQ_API_KEY" not in os.environ:
        st.error("API key not found. Please add `GROQ_API_KEY` as a secret in the Hugging Face Space settings.")
        st.stop()

    # Keep one DocumentProcessor per session: constructing it builds the
    # embedding model, which should not be repeated on every rerun.
    if 'doc_processor' not in st.session_state:
        st.session_state.doc_processor = DocumentProcessor()
    doc_processor = st.session_state.doc_processor
    games = LearningGames()
    dashboard = LearningDashboard()

    st.sidebar.title("Navigation")
    page = st.sidebar.selectbox("Choose a page:",
                                ["Process Document", "Learn Topic", "Play Games", "Dashboard"])

    if page == "Process Document":
        upload_and_process_page(doc_processor)
    elif page == "Learn Topic":
        if 'vectorstore' in st.session_state:
            learn_topic_page(RAGLearningSystem(st.session_state.vectorstore))
        else:
            st.warning("Please process a document first!")
    elif page == "Play Games":
        if 'vectorstore' in st.session_state:
            play_games_page(RAGLearningSystem(st.session_state.vectorstore), games)
        else:
            st.warning("Please process a document first!")
    elif page == "Dashboard":
        dashboard.show_dashboard()


if __name__ == "__main__":
    main()