# Anshviradiya's picture — Update app.py (d51476d, verified)
import streamlit as st
import os
import re
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import PromptTemplate
from langchain_community.embeddings import HuggingFaceEmbeddings
from dotenv import load_dotenv
from unstructured.partition.auto import partition
# ==================== ENV SETUP ====================
load_dotenv()

# Abort the whole app early when no Gemini key is configured.
_api_key = os.getenv("GOOGLE_API_KEY")
if not _api_key:
    st.error("❌ GOOGLE_API_KEY not found. Add it in Hugging Face Secrets.")
    st.stop()

# Disable inference for safety (you can remove this in Docker if you want full inference)
os.environ["UNSTRUCTURED_DISABLE_INFERENCE"] = "true"
# ==================== QUESTION SPLITTER ====================
def split_questions(text):
    """Split free-form user input into individual questions.

    Splits after every '?' or '.', EXCEPT when the '.' directly follows a
    digit (ordinal markers such as "1."), so numbered lists stay intact.

    Args:
        text: Raw user input; may span multiple lines.

    Returns:
        List of non-empty, stripped question strings (empty list for blank input).
    """
    # Collapse newlines so punctuation is the only sentence delimiter.
    text = text.replace("\n", " ").strip()
    # Split right after '?' or '.'. The (?<!\d\.) lookbehind vetoes the split
    # when the preceding '.' is part of a list number like "1.".
    # (The original (?<!\d)(?<=[?.]) was a no-op: the char before the split
    # point is always the punctuation itself, never a digit, so numbered
    # markers were split anyway.)
    questions = re.split(r'(?<=[?.])(?<!\d\.)\s*', text)
    return [q.strip() for q in questions if q.strip()]
# ==================== PROMPT ====================
# Grounded-QA prompt: restricts the model to the retrieved context and makes
# it answer in the question's own language. The fixed fallback sentence gives
# the UI a consistent "not found" message. Template text must not be edited —
# downstream code formats it with {context} and {question}.
PROMPT = PromptTemplate(
    template="""
Answer the question using ONLY the given context.
Respond in the SAME language as the question.
If the answer is not present, say:
"Answer is not available in the context."
Context:
{context}
Question:
{question}
Answer:
""",
    input_variables=["context", "question"]
)
# ==================== DOCUMENT INGESTION ====================
def extract_text_unstructured(uploaded_files):
    """Extract plain text from Streamlit uploads via unstructured's partition().

    Each upload is persisted to disk briefly because partition() works from a
    filename, then removed again.

    Args:
        uploaded_files: Iterable of Streamlit UploadedFile objects.

    Returns:
        One string concatenating every file's text, each section prefixed
        with a "--- Source: <name> ---" marker.
    """
    full_text = ""
    for file in uploaded_files:
        # Persist the upload so partition() can detect the format by filename.
        with open(file.name, "wb") as f:
            f.write(file.getbuffer())
        try:
            elements = partition(
                filename=file.name,
                strategy="fast"
            )
            file_text = "\n".join(el.text for el in elements if el.text)
            full_text += f"\n\n--- Source: {file.name} ---\n\n{file_text}"
        finally:
            # Always delete the temp copy — the original code leaked the file
            # whenever partition() raised.
            os.remove(file.name)
    return full_text
# ==================== CHUNKING ====================
def get_text_chunks(text):
    """Split raw document text into overlapping chunks for embedding."""
    chunker = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    return chunker.split_text(text)
# ==================== EMBEDDINGS ====================
@st.cache_resource
def load_embeddings():
    """Load (and cache for the session) the multilingual sentence-embedding model."""
    model = "paraphrase-multilingual-MiniLM-L12-v2"
    return HuggingFaceEmbeddings(model_name=model)
# ==================== VECTOR STORE ====================
def get_vector_store(text_chunks):
    """Embed the chunks and persist a FAISS index to ./faiss_index."""
    index = FAISS.from_texts(text_chunks, embedding=load_embeddings())
    index.save_local("faiss_index")
# ==================== GEMINI ====================
def ask_gemini(context, question):
    """Ask Gemini to answer `question` grounded in `context`; returns the reply text."""
    prompt_text = PROMPT.format(context=context, question=question)
    model = ChatGoogleGenerativeAI(
        model="gemini-2.5-flash",
        temperature=0.3
    )
    return model.invoke(prompt_text).content
# ==================== USER QUERY ====================
def user_input(user_question):
    """Answer each question found in the input against the saved FAISS index.

    Renders one section per question: the question itself, then either the
    LLM reply or the fixed "not available" fallback when retrieval is empty.
    """
    # Guard: no index yet means the user never ran "Submit & Process".
    if not os.path.exists("faiss_index"):
        st.warning("Please upload and process files first.")
        return

    store = FAISS.load_local(
        "faiss_index",
        load_embeddings(),
        allow_dangerous_deserialization=True
    )

    for idx, query in enumerate(split_questions(user_question), start=1):
        st.markdown(f"### Question {idx}")
        st.write(query)

        hits = store.similarity_search(query, k=3)
        if not hits:
            st.write("Answer is not available in the context.")
            st.divider()
            continue

        merged_context = "\n\n".join(doc.page_content for doc in hits)
        with st.spinner("Thinking..."):
            reply = ask_gemini(merged_context, query)
        st.markdown("**✅ Reply:**")
        st.write(reply)
        st.divider()
# ==================== CACHE ====================
def clear_cache():
    """Clear both Streamlit caches (cached embeddings model and any cached data)."""
    st.cache_resource.clear()
    st.cache_data.clear()
# ==================== STREAMLIT UI ====================
def main():
    """Render the app: a query box up top, upload/processing controls in the sidebar."""
    st.set_page_config(page_title="Chat PDF")
    st.header("📘 Syllabus RAG System")

    user_question = st.text_input("Ask a question from the uploaded documents")
    if user_question:
        user_input(user_question)

    with st.sidebar:
        st.title("Menu")
        uploads = st.file_uploader(
            "Upload files",
            type=["pdf", "txt", "md", "docx", "html"],
            accept_multiple_files=True
        )
        if st.button("Submit & Process"):
            if not uploads:
                st.warning("Please upload at least one file.")
                return
            with st.spinner("Processing files..."):
                # extract -> chunk -> embed & persist
                get_vector_store(get_text_chunks(extract_text_unstructured(uploads)))
            st.success("✅ Files processed successfully!")
        if st.button("Clear Cache"):
            clear_cache()
            st.success("Cache cleared successfully!")
# Script entry point.
if __name__ == "__main__":
    main()