Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import os | |
| import re | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_core.prompts import PromptTemplate | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from dotenv import load_dotenv | |
| from unstructured.partition.auto import partition | |
| # ==================== ENV SETUP ==================== | |
# Load variables from a local .env file, if one exists, into the environment.
load_dotenv()

# Without GOOGLE_API_KEY (unset or empty) show an error in the page and halt
# the script run before any other part of the app executes.
if not os.environ.get("GOOGLE_API_KEY"):
    st.error("❌ GOOGLE_API_KEY not found. Add it in Hugging Face Secrets.")
    st.stop()

# Disable inference for safety (you can remove this in Docker if you want full inference)
os.environ["UNSTRUCTURED_DISABLE_INFERENCE"] = "true"
| # ==================== QUESTION SPLITTER ==================== | |
def split_questions(text):
    """Split free-form user input into individual questions.

    Newlines are flattened to spaces, then the text is cut after every ``?``
    and after every ``.`` that does not directly follow a digit, so list
    markers such as ``1.`` and decimals such as ``3.14`` stay attached to
    the question they belong to.

    Args:
        text: Raw text typed by the user; may contain several questions.

    Returns:
        A list of non-empty, stripped fragments in their original order.
    """
    flattened = text.replace("\n", " ").strip()
    # The digit check has to look at the character *before* the dot, hence
    # the two-character lookbehind (?<!\d\.). A bare (?<!\d) placed next to
    # (?<=[?.]) inspects the punctuation itself and can never reject a split.
    pieces = re.split(r'(?:(?<=\?)|(?<=\.)(?<!\d\.))\s*', flattened)
    return [piece.strip() for piece in pieces if piece.strip()]
| # ==================== PROMPT ==================== | |
# Instruction text sent to the model; the {context} and {question}
# placeholders are substituted when the template is formatted.
_PROMPT_TEMPLATE = """
Answer the question using ONLY the given context.
Respond in the SAME language as the question.
If the answer is not present, say:
"Answer is not available in the context."
Context:
{context}
Question:
{question}
Answer:
"""

PROMPT = PromptTemplate(
    input_variables=["context", "question"],
    template=_PROMPT_TEMPLATE,
)
| # ==================== DOCUMENT INGESTION ==================== | |
def extract_text_unstructured(uploaded_files):
    """Extract plain text from uploaded files using ``unstructured``.

    Each upload is written to a private temporary file (``partition`` is
    called with a filename), parsed with the "fast" strategy, and the
    temporary file is removed again even when parsing raises.

    Args:
        uploaded_files: Iterable of uploaded-file objects exposing ``.name``
            and ``.getbuffer()`` (presumably Streamlit ``UploadedFile``
            instances; confirm against the caller).

    Returns:
        A single string containing, for every file, a
        ``--- Source: <name> ---`` header followed by its extracted text.
        Empty string when no files are given.
    """
    import tempfile  # local import: only this function needs it

    sections = []
    for uploaded in uploaded_files:
        # Never write to a path derived from the client-supplied name: an
        # upload called e.g. "app.py" would overwrite, and then delete, a
        # file in the working directory. Only the extension is kept so the
        # document type can still be recognised from the temp file's name.
        suffix = os.path.splitext(os.path.basename(uploaded.name))[1]
        tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
        try:
            # Close the handle before parsing so the data is flushed and the
            # path can be reopened by the parser on every platform.
            with tmp:
                tmp.write(uploaded.getbuffer())
            elements = partition(filename=tmp.name, strategy="fast")
        finally:
            os.remove(tmp.name)

        file_text = "\n".join(el.text for el in elements if el.text)
        sections.append(f"\n\n--- Source: {uploaded.name} ---\n\n{file_text}")

    return "".join(sections)
| # ==================== CHUNKING ==================== | |
def get_text_chunks(text, chunk_size=1000, chunk_overlap=200):
    """Split a long text into overlapping chunks for embedding.

    Args:
        text: The full text to split.
        chunk_size: Maximum size of each chunk, passed straight to
            ``RecursiveCharacterTextSplitter``. Defaults to 1000, the value
            that was previously hard-coded.
        chunk_overlap: Overlap between consecutive chunks, passed straight
            to the splitter. Defaults to 200, the previous hard-coded value.

    Returns:
        Whatever ``RecursiveCharacterTextSplitter.split_text`` returns for
        ``text`` (a list of chunk strings).
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    return splitter.split_text(text)
| # ==================== EMBEDDINGS ==================== | |
@st.cache_resource
def load_embeddings():
    """Return the multilingual sentence-embedding model used for indexing and search.

    The function is wrapped in ``st.cache_resource`` so the model object is
    built once and reused across reruns, instead of being constructed again
    for every question and every ingestion. The cached instance is dropped
    when ``st.cache_resource.clear()`` is called.

    Returns:
        A ``HuggingFaceEmbeddings`` instance for
        ``paraphrase-multilingual-MiniLM-L12-v2``.
    """
    return HuggingFaceEmbeddings(
        model_name="paraphrase-multilingual-MiniLM-L12-v2"
    )
| # ==================== VECTOR STORE ==================== | |
def get_vector_store(text_chunks):
    """Embed the chunks and persist a FAISS index in the ``faiss_index`` directory.

    Args:
        text_chunks: Non-empty sequence of text chunks to index.

    Raises:
        ValueError: If ``text_chunks`` is empty. Checked up front so the
            caller gets a clear message (and the embedding model is not
            loaded) rather than an opaque failure inside the FAISS builder.
    """
    if not text_chunks:
        raise ValueError(
            "No text chunks to index; nothing was extracted from the input."
        )
    index = FAISS.from_texts(text_chunks, embedding=load_embeddings())
    index.save_local("faiss_index")
| # ==================== GEMINI ==================== | |
def ask_gemini(context, question):
    """Ask Gemini to answer ``question`` from ``context`` only.

    Args:
        context: Text inserted into the prompt's ``{context}`` slot.
        question: Text inserted into the prompt's ``{question}`` slot.

    Returns:
        The ``content`` attribute of the model's response.
    """
    model = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0.3)
    prompt_text = PROMPT.format(context=context, question=question)
    reply = model.invoke(prompt_text)
    return reply.content
| # ==================== USER QUERY ==================== | |
def user_input(user_question):
    """Answer every question found in ``user_question`` and render the results.

    Shows a warning and returns if no ``faiss_index`` exists yet. Otherwise
    the input is split into individual questions; for each one the three most
    similar chunks are retrieved, joined into a context, and sent to Gemini.
    Question, reply and a divider are written to the Streamlit page.

    Args:
        user_question: Raw text from the question box.
    """
    index_dir = "faiss_index"
    if not os.path.exists(index_dir):
        st.warning("Please upload and process files first.")
        return

    # NOTE(review): the flag below opts in to deserializing the stored index
    # data. That is tolerable only while the directory is the one this app
    # writes itself in get_vector_store; never point it at an untrusted path.
    store = FAISS.load_local(
        index_dir,
        load_embeddings(),
        allow_dangerous_deserialization=True,
    )

    for number, question in enumerate(split_questions(user_question), start=1):
        st.markdown(f"### Question {number}")
        st.write(question)

        hits = store.similarity_search(question, k=3)
        if hits:
            context = "\n\n".join(hit.page_content for hit in hits)
            with st.spinner("Thinking..."):
                answer = ask_gemini(context, question)
            st.markdown("**✅ Reply:**")
            st.write(answer)
        else:
            # Nothing retrieved: skip the model call entirely.
            st.write("Answer is not available in the context.")
        st.divider()
| # ==================== CACHE ==================== | |
def clear_cache():
    """Empty Streamlit's resource cache and then its data cache."""
    for cache in (st.cache_resource, st.cache_data):
        cache.clear()
| # ==================== STREAMLIT UI ==================== | |
def main():
    """Render the page: a question box in the body and an ingestion menu in the sidebar.

    A non-empty question is answered via ``user_input``. The sidebar offers
    a multi-file uploader, a "Submit & Process" button that extracts, chunks
    and indexes the uploads, and a "Clear Cache" button.
    """
    st.set_page_config(page_title="Chat PDF")
    st.header("📘 Syllabus RAG System")

    user_question = st.text_input("Ask a question from the uploaded documents")
    if user_question:
        user_input(user_question)

    with st.sidebar:
        st.title("Menu")
        pdf_docs = st.file_uploader(
            "Upload files",
            type=["pdf", "txt", "md", "docx", "html"],
            accept_multiple_files=True,
        )

        if st.button("Submit & Process"):
            # Branch instead of returning early: a `return` here ends the
            # whole function, so the "Clear Cache" button below would not be
            # drawn on the run where the warning is shown.
            if not pdf_docs:
                st.warning("Please upload at least one file.")
            else:
                with st.spinner("Processing files..."):
                    raw_text = extract_text_unstructured(pdf_docs)
                    chunks = get_text_chunks(raw_text)
                    get_vector_store(chunks)
                st.success("✅ Files processed successfully!")

        if st.button("Clear Cache"):
            clear_cache()
            st.success("Cache cleared successfully!")


if __name__ == "__main__":
    main()