""" Plexi-Assistant.py - Plexi RAG Assistant (GitHub Index) ====================================================== The Streamlit frontend. Retrieval uses the pre-built LlamaIndex committed to the plexi-materials repo by GitHub Actions. Flow per user message: 1. On app start -> fetch pre-built index from GitHub (cached) 2. On each message -> embed query locally, retrieve top-k chunks 3. Build focused prompt with retrieved chunks -> call user's LLM """ import json import os import requests import streamlit as st try: from streamlit_cookies_manager_ext import EncryptedCookieManager COOKIES_MANAGER_AVAILABLE = True except ImportError: EncryptedCookieManager = None COOKIES_MANAGER_AVAILABLE = False from utils import ( APP_ICON_PATH, fetch_rag_index, get_manifest, inject_theme, load_subject_context, render_page_header, render_panel, render_sidebar_footer, render_sidebar_intro, render_stat_cards, summarize_subject_catalog, ) st.set_page_config(page_title="Plexi Assistant", page_icon=APP_ICON_PATH, layout="wide") inject_theme() TOP_K = 5 PROMPT_SUGGESTIONS = [ ( "Summarize this subject", "Give me a clean revision summary of this subject using only the loaded materials.", ), ( "Important exam topics", "List the most important exam topics covered in these materials.", ), ( "Explain like a beginner", "Explain the most important concepts in simple terms using only the loaded materials.", ), ( "Make viva questions", "Create 10 viva-style questions and short answers from the loaded materials.", ), ] PROVIDERS = { "Gemini (Google)": { "base_url": "https://generativelanguage.googleapis.com/v1beta/openai", "models": [ "gemini-2.0-flash", "gemini-2.0-flash-lite", "gemini-1.5-flash", "gemini-1.5-pro", ], "key_help": "Get a key at [Google AI Studio](https://aistudio.google.com/app/apikey)", }, "OpenAI": { "base_url": "https://api.openai.com/v1", "models": ["gpt-4o-mini", "gpt-4o", "gpt-4.1-mini", "gpt-4.1-nano"], "key_help": "Get a key at [OpenAI Platform](https://platform.openai.com/api-keys)", }, "Mistral": { "base_url": "https://api.mistral.ai/v1", "models": [ "mistral-small-latest", "mistral-medium-latest", "mistral-large-latest", "open-mistral-nemo", ], "key_help": "Get a key at [Mistral Console](https://console.mistral.ai/api-keys)", }, "Groq": { "base_url": "https://api.groq.com/openai/v1", "models": [ "llama-3.3-70b-versatile", "llama-3.1-8b-instant", "gemma2-9b-it", "mixtral-8x7b-32768", ], "key_help": "Get a key at [Groq Console](https://console.groq.com/keys)", }, "OpenRouter": { "base_url": "https://openrouter.ai/api/v1", "models": [ "google/gemini-2.0-flash-exp:free", "meta-llama/llama-3.3-70b-instruct:free", "mistralai/mistral-small-3.1-24b-instruct:free", "qwen/qwen3-8b:free", ], "key_help": "Get a key at [OpenRouter](https://openrouter.ai/keys)", }, "Custom (self-hosted)": { "base_url": "", "models": [], "key_help": "For Ollama, LM Studio, or any OpenAI-compatible server", }, } PROVIDER_NAMES = list(PROVIDERS.keys()) PLEXI_GPT_URL = "https://chatgpt.com/g/g-69caa671910481919ce71d19952e34e5-plexi" PLEXI_MCP_GUIDE_URL = "https://lazyhuman.notion.site/Setting-Up-Plexi-MCP-for-Claude-and-ChatGPT-336e3502f0918090b69fdbed148e8e55" PLEXI_MCP_ENDPOINT = "https://plexi-mcp.vercel.app/api/mcp" SAVED_CONFIG_COOKIE = "assistant_config" COOKIE_PASSWORD = os.getenv("PLEXI_COOKIE_PASSWORD") or os.getenv("COOKIES_PASSWORD") cookies = None if COOKIE_PASSWORD and COOKIES_MANAGER_AVAILABLE: cookies = EncryptedCookieManager( prefix="plexi/assistant/", password=COOKIE_PASSWORD, ) if not cookies.ready(): st.stop() def _matches_scope(node, semester: str, subject: str) -> bool: """Return True when a retrieved node belongs to the active semester + subject.""" metadata = getattr(node.node, "metadata", {}) or {} return metadata.get("semester") == semester and metadata.get("subject") == subject def queue_prompt(prompt: str): """Store a prompt and rerun so the chat input flow can process it.""" st.session_state["_pending_prompt"] = prompt st.rerun() def _saved_config_available(): return cookies is not None def _load_saved_config(): """Load saved assistant settings from the browser cookie.""" if not _saved_config_available(): return None raw_config = cookies.get(SAVED_CONFIG_COOKIE) if not raw_config: return None try: return json.loads(raw_config) except (TypeError, json.JSONDecodeError): del cookies[SAVED_CONFIG_COOKIE] cookies.save() return None def _save_config(config): """Persist assistant settings in the browser cookie.""" if not _saved_config_available(): return cookies[SAVED_CONFIG_COOKIE] = json.dumps(config) cookies.save() def _clear_saved_config(): """Remove the saved browser-side assistant settings.""" if not _saved_config_available(): return if SAVED_CONFIG_COOKIE in cookies: del cookies[SAVED_CONFIG_COOKIE] cookies.save() def _current_config(selected_semester=None, selected_subject=None, api_key=None): """Build the current assistant configuration payload.""" return { "cfg_provider": st.session_state.get("cfg_provider"), "cfg_base_url": st.session_state.get("cfg_base_url"), "cfg_model": st.session_state.get("cfg_model"), "api_key": api_key if api_key is not None else st.session_state.get("api_key"), "asst_semester": selected_semester if selected_semester is not None else st.session_state.get("asst_semester"), "asst_subject": selected_subject if selected_subject is not None else st.session_state.get("asst_subject"), } def _hydrate_saved_config(): """Hydrate session state from a remembered browser config once per load.""" if st.session_state.get("_saved_config_hydrated"): return saved_config = _load_saved_config() if saved_config: for key, value in saved_config.items(): if value and key not in st.session_state: st.session_state[key] = value st.session_state["remember_device"] = True st.session_state["_saved_config_hydrated"] = True def render_external_access(): """Render low-emphasis outbound access actions.""" st.markdown( '
Use Plexi Elsewhere
', unsafe_allow_html=True, ) st.caption( "Open the GPT directly or use the MCP endpoint in any compatible client." ) button_cols = st.columns([1, 1], gap="small") with button_cols[0]: st.link_button("Open Plexi GPT", PLEXI_GPT_URL, use_container_width=True) with button_cols[1]: st.link_button("Open MCP Guide", PLEXI_MCP_GUIDE_URL, use_container_width=True) with st.expander("MCP endpoint", expanded=False): st.code(PLEXI_MCP_ENDPOINT, language=None) def local_retrieve(index, query: str, semester: str, subject: str, top_k: int = TOP_K): """Retrieve top-k relevant chunks scoped to the active semester + subject.""" if index is None: return [] try: retriever = index.as_retriever(similarity_top_k=max(top_k * 5, 10)) nodes = retriever.retrieve(query) scoped_nodes = [ node for node in nodes if _matches_scope(node, semester, subject) ] return [ { "text": node.node.get_content(), "score": round(float(node.score), 4) if node.score is not None else None, "filename": (getattr(node.node, "metadata", {}) or {}).get("filename"), "subject": (getattr(node.node, "metadata", {}) or {}).get("subject"), } for node in scoped_nodes[:top_k] ] except Exception as err: st.warning(f"Retrieval error: {err}") return [] def format_context(chunks): """Format retrieved chunks for the system prompt.""" if not chunks: return "(No relevant context retrieved.)" parts = [] for index, chunk in enumerate(chunks, start=1): score = f" [relevance: {chunk['score']}]" if chunk.get("score") else "" parts.append(f"--- Chunk {index}{score} ---\n{chunk['text']}\n") return "\n".join(parts) def _send_message(endpoint_url, api_key, model, system_prompt, history, user_prompt): messages = [{"role": "system", "content": system_prompt}] for message in history: messages.append({"role": message["role"], "content": message["content"]}) messages.append({"role": "user", "content": user_prompt}) headers = {"Content-Type": "application/json"} if api_key: headers["Authorization"] = f"Bearer {api_key}" response = requests.post( f"{endpoint_url}/chat/completions", headers=headers, json={"model": model, "messages": messages, "temperature": 0.3}, timeout=120, ) if response.status_code == 429: detail = response.json().get("error", {}).get("message", "Rate limit exceeded.") raise Exception(f"RATE_LIMITED: {detail}") if response.status_code == 413: raise Exception( "PAYLOAD_TOO_LARGE: The study materials are too large for this model's context window. " "Please try asking a more specific question (e.g., 'Tell me viva questions for Unit 1' instead of the whole subject), " "or switch to a model with a larger context window." ) if response.status_code == 401: raise Exception( "AUTH_ERROR: Invalid API key. Please check your key and try again." ) response.raise_for_status() return response.json()["choices"][0]["message"]["content"] def _is_configured(): return ( "cfg_provider" in st.session_state and st.session_state.get("cfg_model") and ( st.session_state.get("cfg_provider") == "Custom (self-hosted)" or st.session_state.get("api_key") ) ) def render_onboarding(manifest): """Render the setup flow before chat becomes available.""" render_page_header( "Plexi assistant", "Ask course questions with grounded context", ( "Choose a provider, bring your own API key, and Plexi will answer only " "from the materials loaded for the subject you pick." ), badges=["Cited answers", "Scoped retrieval", "OpenAI-compatible"], ) left_col, right_col = st.columns([1.1, 0.9], gap="large") with left_col: st.markdown( """
Set up your model endpoint
Pick a hosted provider or connect a local OpenAI-compatible server.
""", unsafe_allow_html=True, ) provider_name = st.selectbox("Provider", PROVIDER_NAMES, key="ob_provider") provider = PROVIDERS[provider_name] if provider_name == "Custom (self-hosted)": base_url = st.text_input("Base URL", value="http://localhost:11434/v1") model_name = st.text_input( "Model", placeholder="e.g. llama3, mistral, phi3" ) else: base_url = provider["base_url"] model_options = provider["models"] + ["Custom"] model_choice = st.selectbox("Model", model_options) model_name = ( st.text_input("Custom model ID", placeholder="Enter model identifier") if model_choice == "Custom" else model_choice ) needs_key = provider_name != "Custom (self-hosted)" api_key = "" if needs_key: st.info(provider["key_help"]) api_key = st.text_input( "API Key", type="password", value=st.session_state.get("api_key", ""), placeholder="Paste your API key here", ) remember_default = bool( st.session_state.get("remember_device") or _load_saved_config() ) remember_device = st.checkbox( "Remember these settings on this device", value=remember_default, disabled=not _saved_config_available(), help=( "Saves your provider settings and API key in this browser only." if _saved_config_available() else ( "Install the optional cookie dependency and set " "PLEXI_COOKIE_PASSWORD to enable saved browser settings." ) ), ) if not _saved_config_available(): st.caption( "Saved browser settings are disabled until " "`streamlit-cookies-manager-ext` is installed and " "`PLEXI_COOKIE_PASSWORD` is set." ) can_start = bool( model_name and (not needs_key or api_key) ) if st.button( "Continue", type="primary", disabled=not can_start, use_container_width=True, ): st.session_state.cfg_provider = provider_name st.session_state.cfg_base_url = base_url st.session_state.cfg_model = model_name st.session_state.remember_device = remember_device if api_key: st.session_state.api_key = api_key elif "api_key" in st.session_state: del st.session_state.api_key if remember_device: _save_config( { "cfg_provider": provider_name, "cfg_base_url": base_url, "cfg_model": model_name, "api_key": api_key, } ) else: _clear_saved_config() st.session_state.pop("messages", None) st.rerun() with right_col: render_external_access() render_panel( "What Plexi does", "Keeps answers grounded in the currently loaded course materials instead of drifting into generic knowledge.", ) render_panel( "Provider model", "Bring your own endpoint. Use hosted providers or connect a local OpenAI-compatible server.", ) render_panel( "Best use case", "Use Plexi for revision summaries, topic breakdowns, viva practice, and quick concept explanations.", tone="callout", ) st.markdown( """
Good prompts to start with
""", unsafe_allow_html=True, ) def render_scope_selection(manifest): """Render the subject selection flow before loading materials.""" render_page_header( "Plexi assistant", "Select study materials", "Choose a semester and subject to load the corresponding materials for the chat.", badges=[st.session_state.cfg_provider, st.session_state.cfg_model], ) left_col, right_col = st.columns([1.1, 0.9], gap="large") with left_col: st.markdown( """
Choose your subject
Materials for this subject will be loaded into the AI's context.
""", unsafe_allow_html=True, ) semester_names = sorted(manifest.keys()) default_semester = st.session_state.get("asst_semester") semester_index = ( semester_names.index(default_semester) if default_semester in semester_names else 0 ) selected_semester = st.selectbox( "Semester", semester_names, index=semester_index, key="asst_semester", ) subject_names = sorted(manifest[selected_semester].keys()) default_subject = st.session_state.get("asst_subject") subject_index = ( subject_names.index(default_subject) if default_subject in subject_names else 0 ) selected_subject = st.selectbox( "Subject", subject_names, index=subject_index, key="asst_subject", ) if st.button( "Load Materials & Start Chat", type="primary", use_container_width=True ): st.session_state._scope_confirmed = True if st.session_state.get("remember_device"): _save_config( { "cfg_provider": st.session_state.cfg_provider, "cfg_base_url": st.session_state.cfg_base_url, "cfg_model": st.session_state.cfg_model, "api_key": st.session_state.get("api_key", ""), "asst_semester": selected_semester, "asst_subject": selected_subject, } ) st.session_state.pop("messages", None) st.rerun() with right_col: render_external_access() _hydrate_saved_config() render_sidebar_intro() try: manifest = get_manifest() except Exception as err: st.error(f"Failed to load materials catalog: {err}") st.stop() if not manifest: st.info("No study materials are available yet.") st.stop() if not _is_configured(): render_onboarding(manifest) st.stop() if not st.session_state.get("_scope_confirmed"): render_scope_selection(manifest) st.stop() provider_name = st.session_state.cfg_provider base_url = st.session_state.cfg_base_url model_name = st.session_state.cfg_model api_key = st.session_state.get("api_key", "") rag_index, rag_error = fetch_rag_index() rag_active = rag_index is not None mode_label = "RAG retrieval" if rag_active else "Full-context fallback" with st.sidebar: st.markdown( '
Study Scope
', unsafe_allow_html=True, ) semester_names = sorted(manifest.keys()) selected_semester = st.selectbox("Semester", semester_names, key="asst_semester") subjects = sorted(manifest[selected_semester].keys()) selected_subject = st.selectbox("Subject", subjects, key="asst_subject") scope_key = f"{selected_semester}|{selected_subject}" if st.session_state.get("_scope_key") != scope_key: st.session_state._scope_key = scope_key st.session_state.pop("messages", None) @st.cache_data(show_spinner=False, ttl=300) def _get_subject_context(semester, subject): return load_subject_context(manifest, semester, subject) with st.spinner("Loading study materials..."): subject_text, source_list = _get_subject_context(selected_semester, selected_subject) if not subject_text.strip(): st.warning("No readable text was found for this subject. Try another selection.") st.stop() subject_summary = summarize_subject_catalog( manifest[selected_semester][selected_subject] ) with st.sidebar: st.markdown( f"""
{selected_subject}
{selected_semester}
""", unsafe_allow_html=True, ) if rag_active: st.success("RAG is active. Answers use subject-scoped retrieved chunks.") elif rag_error: st.warning(f"RAG is unavailable: {rag_error}") else: st.warning("RAG index is unavailable. Using full-context fallback mode.") with st.expander(f"Loaded sources ({len(source_list)})", expanded=False): for source in source_list: st.caption(f"[{source['id']}] {source['name']} ({source['type']})") with st.expander("Change LLM settings", expanded=False): new_provider = st.selectbox( "Provider", PROVIDER_NAMES, index=PROVIDER_NAMES.index(provider_name), key="sb_provider", ) selected_provider = PROVIDERS[new_provider] if new_provider == "Custom (self-hosted)": new_base_url = st.text_input("Base URL", value=base_url, key="sb_base_url") new_model = st.text_input( "Model", value=model_name if provider_name == "Custom (self-hosted)" else "", key="sb_model_custom", placeholder="e.g. llama3, mistral, phi3", ) else: new_base_url = selected_provider["base_url"] model_options = selected_provider["models"] + ["Custom"] default_index = ( model_options.index(model_name) if model_name in model_options else 0 ) selected_model = st.selectbox( "Model", model_options, index=default_index, key="sb_model_select", ) new_model = ( st.text_input("Custom model ID", key="sb_model_id") if selected_model == "Custom" else selected_model ) new_needs_key = new_provider != "Custom (self-hosted)" new_key = api_key if new_needs_key: new_key = st.text_input( "API Key", type="password", value=api_key, key="sb_api_key", ) remember_sidebar = st.checkbox( "Remember these settings on this device", value=bool(st.session_state.get("remember_device")), disabled=not _saved_config_available(), key="sb_remember_device", help=( "Keeps the selected provider, key, and scope in this browser." if _saved_config_available() else ( "Install the optional cookie dependency and set " "PLEXI_COOKIE_PASSWORD to enable saved browser settings." ) ), ) changed = ( new_provider != provider_name or new_base_url != base_url or new_model != model_name or new_key != api_key or remember_sidebar != bool(st.session_state.get("remember_device")) ) if changed and new_model: if st.button("Apply Changes", use_container_width=True, type="primary"): st.session_state.cfg_provider = new_provider st.session_state.cfg_base_url = new_base_url st.session_state.cfg_model = new_model st.session_state.remember_device = remember_sidebar if new_key: st.session_state.api_key = new_key elif "api_key" in st.session_state: del st.session_state.api_key if remember_sidebar: _save_config( { "cfg_provider": new_provider, "cfg_base_url": new_base_url, "cfg_model": new_model, "api_key": new_key, "asst_semester": selected_semester, "asst_subject": selected_subject, } ) else: _clear_saved_config() st.session_state.pop("messages", None) st.rerun() if _load_saved_config(): if st.button("Forget Saved Settings", use_container_width=True): _clear_saved_config() st.session_state.remember_device = False for key in ( "cfg_provider", "cfg_base_url", "cfg_model", "api_key", "asst_semester", "asst_subject", ): st.session_state.pop(key, None) st.session_state.pop("messages", None) st.rerun() if st.button("New Chat", use_container_width=True): st.session_state.pop("messages", None) st.rerun() render_sidebar_footer() render_page_header( "Plexi assistant", f"Ask anything from {selected_subject}", ( "The assistant is currently grounded to the selected subject. It will stay inside " "that scope and answer only from the loaded materials." ), badges=[selected_semester, selected_subject, provider_name, mode_label], ) render_external_access() if st.session_state.get("remember_device") and _saved_config_available(): desired_config = _current_config( selected_semester=selected_semester, selected_subject=selected_subject, api_key=api_key, ) if desired_config != _load_saved_config(): _save_config(desired_config) render_stat_cards( [ { "label": "Loaded sources", "value": len(source_list), "note": "Readable files currently available to cite in this chat.", }, { "label": "Subject files", "value": subject_summary["file_count"], "note": "Assets available for the selected subject in the catalog.", }, { "label": "Provider", "value": provider_name, "note": model_name, }, { "label": "Answer mode", "value": "RAG" if rag_active else "Fallback", "note": rag_error or "Top matching chunks are injected into the prompt.", }, ] ) st.markdown( '
Prompt Starters
', unsafe_allow_html=True, ) prompt_cols = st.columns(len(PROMPT_SUGGESTIONS), gap="medium") for column, (label, prompt_text) in zip(prompt_cols, PROMPT_SUGGESTIONS): with column: if st.button(label, use_container_width=True, key=f"prompt_{label}"): queue_prompt(prompt_text) st.markdown( """
Study scope loaded
Ask for summaries, examples, key differences, exam revision prompts, or viva-style questions. Plexi will stay inside the loaded subject context.
""", unsafe_allow_html=True, ) source_index = "\n".join( f" [{src['id']}] {src['name']} ({src['type']})" for src in source_list ) def build_system_prompt(retrieved_chunks): if rag_active and retrieved_chunks: context_section = ( "## RETRIEVED CONTEXT (most relevant chunks for this query)\n" f"{format_context(retrieved_chunks)}\n" "## END OF RETRIEVED CONTEXT" ) else: context_section = f"## SOURCE MATERIALS\n{subject_text}\n## END OF MATERIALS" return ( "Your name is Plexi. You are an academic assistant for Parul University CS students.\n\n" "## STRICT GROUNDING RULES\n" "1. Answer ONLY using information found in the provided context below.\n" "2. Do NOT include inline citation markers like [Source 1] in the response.\n" "3. If the answer is NOT in the context, say: 'This information is not covered in the loaded materials.'\n" " Do NOT guess or use general knowledge.\n" "4. Use clear structure: headings, bullet points, bold key terms.\n\n" "## INTERACTION STYLE\n" "- Greet users and list covered topics when greeted.\n" "- If asked about your creator, say: Kunal Gupta (LazyHuman).\n" "- Write natural, clean answers without a sources footer unless the user explicitly asks for sources.\n\n" f"## SOURCE INDEX\n{source_index}\n\n" f"{context_section}" ) if "messages" not in st.session_state: st.session_state.messages = [ { "role": "assistant", "content": ( f"Hi! I am loaded with **{selected_subject}** from **{selected_semester}**. " f"Mode: *{mode_label}*. Ask me for summaries, explanations, or revision help." ), } ] for message in st.session_state.messages: with st.chat_message(message["role"]): st.markdown(message["content"]) pending_prompt = st.session_state.pop("_pending_prompt", None) prompt = pending_prompt or st.chat_input("Ask about your loaded study materials") if prompt: if not pending_prompt: st.session_state.messages.append({"role": "user", "content": prompt}) with st.chat_message("user"): st.markdown(prompt) with st.chat_message("assistant"): with st.spinner("Thinking..."): retrieved = ( local_retrieve(rag_index, prompt, selected_semester, selected_subject) if rag_active else [] ) system_prompt = build_system_prompt(retrieved) history = [ message for message in st.session_state.messages[1:-1] if message["role"] in ("user", "assistant") ] try: answer = _send_message( base_url, api_key, model_name, system_prompt, history, prompt ) except Exception as err: err_text = str(err) if "RATE_LIMITED" in err_text: st.session_state["_pending_prompt"] = prompt st.error("API rate limit reached. Wait a minute and press Retry.") if st.button("Retry", type="primary"): st.rerun() st.stop() if "PAYLOAD_TOO_LARGE" in err_text: err_msg = err_text.split(": ", 1)[1] st.session_state.messages.append({ "role": "assistant", "content": f"**System Error:** {err_msg}" }) st.rerun() if "AUTH_ERROR" in err_text: err_msg = err_text.split(": ", 1)[1] _clear_saved_config() st.session_state.remember_device = False if "api_key" in st.session_state: del st.session_state.api_key st.session_state.messages.append({ "role": "assistant", "content": f"**System Error:** {err_msg}" }) st.rerun() st.session_state.messages.append({ "role": "assistant", "content": f"**System Error:** {err_text}" }) st.rerun() st.markdown(answer) if retrieved: with st.expander("Retrieved context chunks", expanded=False): for index, chunk in enumerate(retrieved, start=1): label = ( chunk.get("filename") or chunk.get("subject") or "Unknown source" ) st.caption( f"Chunk {index} - {label} - relevance: {chunk.get('score')}" ) preview = chunk["text"][:400] + ( "..." if len(chunk["text"]) > 400 else "" ) st.text(preview) st.session_state.messages.append({"role": "assistant", "content": answer})