| import os |
|
|
| print(">>> DEBUG: Environment Variables at Startup <<<") |
| for var in ("OPENAI_API_KEY", "LLAMA_CLOUD_API_KEY"): |
| |
| print(f"{var} = {os.getenv(var)!r}") |
|
|
| |
import shutil
import asyncio
from pathlib import Path
import nest_asyncio
# Patch the running event loop so nested asyncio.run() calls work — needed
# because llama_parse/llama_index drive asyncio internally while Gradio may
# already own a loop. Must run before any parsing/agent call.
nest_asyncio.apply()
|
|
| import gradio as gr |
| from PyPDF2 import PdfReader |
|
|
| from llama_parse import LlamaParse |
| from llama_index.core import ( |
| Settings, VectorStoreIndex, StorageContext, load_index_from_storage |
| ) |
| from llama_index.llms.openai import OpenAI |
| from llama_index.embeddings.openai import OpenAIEmbedding |
| from llama_index.core.tools import QueryEngineTool |
| from llama_index.core.query_engine import SubQuestionQueryEngine |
| from llama_index.core.agent.workflow import FunctionAgent |
| from llama_index.core.workflow import Context |
|
|
| |
|
|
# API keys are read once at import time; both must be set in the environment
# before this module is imported. (A module-level `global` statement is a
# no-op in Python, so the original one was removed.)
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
LLAMA_CLOUD_API_KEY = os.getenv("LLAMA_CLOUD_API_KEY")


# Global llama_index configuration shared by every index and query engine
# built in this app.
Settings.llm = OpenAI(model="gpt-4o")
# NOTE(review): some llama_index versions spell this keyword `model` rather
# than `model_name` — confirm against the installed OpenAIEmbedding.
Settings.embed_model = OpenAIEmbedding(model_name="text-embedding-3-large")
Settings.chunk_size = 512     # tokens per chunk when splitting documents
Settings.chunk_overlap = 64   # overlap between consecutive chunks
|
|
|
|
| |
| print(">>> DEBUG: About to init LlamaParse with key:", os.getenv("LLAMA_CLOUD_API_KEY") is not None) |
| |
| parser = LlamaParse( |
| api_key = LLAMA_CLOUD_API_KEY, |
| |
| result_type = "markdown", |
| content_guideline_instruction = ( |
| "You are processing a PDF slide deck. " |
| "Produce Markdown with slide metadata, cleaned bullets, tables, " |
| "charts summaries, figures captions, metrics, and a 1β2 sentence takeaway." |
| ), |
| verbose=True |
| ) |
|
|
| |
| Path("./user_data").mkdir(exist_ok=True) |
| Path("./index_data").mkdir(exist_ok=True) |
|
|
| |
async def answer(uploaded_files: list[gr.FileData], question: str) -> str:
    """Answer *question* over up to five uploaded PDF slide decks.

    Each PDF (max 50 pages) is copied into ./user_data/, parsed with
    LlamaParse ONLY when no persisted index exists yet, and indexed into a
    per-file vector index persisted under ./index_data/<stem>/. A
    FunctionAgent gets one query tool per file plus a SubQuestionQueryEngine
    tool for cross-file comparative questions.

    Args:
        uploaded_files: file objects from gr.UploadButton (each has a .name
            path to a temp file). NOTE(review): confirm gr.FileData exists in
            the installed Gradio version — UploadButton may hand back plain
            tempfile wrappers instead.
        question: the user's natural-language question.

    Returns:
        The agent's answer as a string, or an error message prefixed with
        a cross mark for invalid input.
    """
    print(f">>> DEBUG: answer() called. OPENAI key set? {os.getenv('OPENAI_API_KEY') is not None}")
    print(f">>> DEBUG: answer() called. LLAMA key set? {os.getenv('LLAMA_CLOUD_API_KEY') is not None}")

    if not uploaded_files:
        return "❌ Please upload at least one PDF."
    if len(uploaded_files) > 5:
        return "❌ You can upload up to 5 PDF files."

    tools = []
    for file_obj in uploaded_files:
        # Validate the PDF (readable, <= 50 pages) before any expensive work.
        try:
            reader = PdfReader(file_obj.name)
        except Exception as e:
            return f"❌ Error reading {file_obj.name}: {e}"
        if len(reader.pages) > 50:
            return f"❌ {Path(file_obj.name).name} has {len(reader.pages)} pages (>50)."

        # Keep a copy of the upload so indexes can be rebuilt or removed later.
        dest = Path("./user_data") / Path(file_obj.name).name
        shutil.copyfile(file_obj.name, dest)

        stem = dest.stem
        idx_dir = Path(f"./index_data/{stem}")

        if idx_dir.exists() and any(idx_dir.iterdir()):
            # Cache hit: load the persisted index and skip LlamaParse
            # entirely. (The original parsed every file on every request,
            # even when the index was already persisted — a slow, billable
            # API call for nothing.)
            sc = StorageContext.from_defaults(persist_dir=str(idx_dir))
            idx = load_index_from_storage(sc)
        else:
            docs = parser.load_data(dest)
            sc = StorageContext.from_defaults()
            idx = VectorStoreIndex.from_documents(docs, storage_context=sc)
            sc.persist(persist_dir=str(idx_dir))

        tools.append(
            QueryEngineTool.from_defaults(
                query_engine=idx.as_query_engine(),
                name=f"vector_index_{stem}",
                description=f"Query engine for {stem}.pdf"
            )
        )

    # Extra tool that decomposes multi-document questions into per-document
    # sub-questions answered by the tools above.
    subq = SubQuestionQueryEngine.from_defaults(query_engine_tools=tools)
    tools.append(
        QueryEngineTool.from_defaults(
            query_engine=subq,
            name="sub_question_query_engine",
            description="Multi-file comparative queries"
        )
    )

    agent = FunctionAgent(tools=tools, llm=OpenAI(model="gpt-4o"))
    ctx = Context(agent)  # fresh conversation context per request

    resp = await agent.run(question, ctx=ctx)
    return str(resp)
|
|
| |
def remove_docs(filenames: str) -> str:
    """Remove uploaded PDFs and their persisted vector indexes.

    Args:
        filenames: comma-separated list of exact PDF filenames (with .pdf).
            Each is deleted from ./user_data/ and its index folder removed
            from ./index_data/<stem>/.

    Returns:
        A status message listing which names were removed and which were not
        found. A name counts as removed only when BOTH the PDF and its index
        directory existed (matching the original semantics).
    """
    if not filenames.strip():
        return "❌ Enter at least one filename to remove."

    removed, not_found = [], []
    for name in (part.strip() for part in filenames.split(",")):
        pdf_path = Path("./user_data") / name
        idx_path = Path("./index_data") / Path(name).stem

        # Delete whichever artifacts exist; track each separately so a
        # half-present document is still cleaned up but reported "not found".
        had_pdf = pdf_path.exists()
        if had_pdf:
            pdf_path.unlink()

        had_idx = idx_path.exists()
        if had_idx:
            shutil.rmtree(idx_path)

        # The original computed this as ok=True, then ok=False / ok=ok and False
        # on each miss — i.e. success requires both artifacts present.
        if had_pdf and had_idx:
            removed.append(name)
        else:
            not_found.append(name)

    # The original split this f-string across two physical lines, which is a
    # syntax error; rebuilt as a single literal with an explicit newline.
    msg = ""
    if removed:
        msg += f"✅ Removed: {', '.join(removed)}.\n"
    if not_found:
        msg += f"⚠️ Not found: {', '.join(not_found)}."
    return msg.strip()
|
|
| |
# ---- Gradio UI -----------------------------------------------------------
# Two tabs: upload PDFs and ask questions, or delete previously uploaded
# documents (and their persisted indexes) by filename.
with gr.Blocks() as demo:
    # NOTE(review): "π" in this heading looks mis-encoded (probably an emoji)
    # — confirm the source file's encoding. Left untouched: it is a runtime
    # string rendered in the UI.
    gr.Markdown("# π PDF Slide Deck Q&A Bot")

    with gr.Tab("Ask Questions"):
        with gr.Row():
            # UploadButton passes the list of uploaded-file objects straight
            # into `answer` as its first input.
            file_input = gr.UploadButton(
                "Upload up to 5 PDFs",
                file_types=[".pdf"],
                file_count="multiple"
            )
            question = gr.Textbox(
                lines=2,
                placeholder="Ask your question about the uploaded slide decks..."
            )
        output = gr.Textbox(label="Answer")
        ask_btn = gr.Button("Ask")
        # `answer` is an async def; Gradio awaits coroutine handlers natively.
        ask_btn.click(
            fn=answer,
            inputs=[file_input, question],
            outputs=output
        )

    with gr.Tab("Remove Documents"):
        remove_input = gr.Textbox(
            lines=1,
            placeholder="e.g. Q1-Slides.pdf, Q2-Slides.pdf"
        )
        remove_output = gr.Textbox(label="Removal Status")
        remove_btn = gr.Button("Remove Docs")
        remove_btn.click(
            fn=remove_docs,
            inputs=remove_input,
            outputs=remove_output
        )

# Launch only when run as a script, not when imported.
if __name__ == "__main__":
    demo.launch()
|
|