| |
| import os |
| import asyncio |
| import uuid |
| from pathlib import Path |
| from typing import Optional, List, Tuple |
| import time |
|
|
| import gradio as gr |
| from agents import ( |
| AnalysisAgent, |
| CollaborationAgent, |
| ConversationAgent, |
| MasterOrchestrator, |
| ) |
| from utils import load_pdf_text |
| from utils.session import make_user_session |
| from utils.validation import validate_file_size |
| from utils.prompts import PromptManager |
| from utils.export import ExportManager |
| from config import Config |
|
|
| |
| |
| |
# Best-effort creation of the application's working directories at import
# time. A failure is reported on stdout and the app keeps starting.
try:
    Config.ensure_directories()
except Exception as exc:
    print(f"Warning: Could not ensure directories: {exc}")
|
|
| |
# Keyword arguments shared by every agent instance created below.
_AGENT_DEFAULTS = {"model": Config.OPENAI_MODEL, "tasks_completed": 0}

# Registry of agents, keyed by the short target name that is passed to the
# orchestrator (e.g. ``targets=["analysis"]``).
AGENTS = {
    "analysis": AnalysisAgent(name="AnalysisAgent", **_AGENT_DEFAULTS),
    "collab": CollaborationAgent(name="CollaborationAgent", **_AGENT_DEFAULTS),
    "conversation": ConversationAgent(name="ConversationAgent", **_AGENT_DEFAULTS),
}

# Single orchestrator instance shared by all request handlers in this module.
ORCHESTRATOR = MasterOrchestrator(agents=AGENTS)
|
|
| |
# Optional helpers: the UI degrades gracefully when either one is missing
# (handlers check for ``None``). Each manager is initialised on its own so
# that a failure in one does not discard a working instance of the other.
try:
    PROMPT_MANAGER = PromptManager()
except Exception as e:
    print(f"Warning: Could not initialize PromptManager: {e}")
    PROMPT_MANAGER = None

try:
    EXPORT_MANAGER = ExportManager()
except Exception as e:
    print(f"Warning: Could not initialize ExportManager: {e}")
    EXPORT_MANAGER = None
|
|
| |
| |
| |
def save_uploaded_file(uploaded, username: str = "anonymous", session_dir: Optional[str] = None) -> str:
    """Copy an uploaded file into a session directory and return its new path.

    Args:
        uploaded: The upload in one of the supported shapes, tried in order:
            1. a filesystem path (``str`` or ``os.PathLike``) that exists,
            2. a file-like object exposing ``read``,
            3. a ``dict`` whose ``"name"`` entry is an existing path.
        username: Passed to ``make_user_session`` when ``session_dir`` is
            not supplied.
        session_dir: Directory to store the copy in. Created if missing.

    Returns:
        The destination path as a string. The file is always named
        ``upload_<random hex>.pdf``.

    Raises:
        RuntimeError: If ``uploaded`` matches none of the supported shapes.
    """
    # Function-scope import, as in the original code (shutil is not imported
    # at module level).
    import shutil

    if session_dir is None:
        session_dir = make_user_session(username)

    target_dir = Path(session_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    # Random name so repeated uploads in one session never collide.
    dst = target_dir / f"upload_{uuid.uuid4().hex}.pdf"

    # 1) Plain path on disk; pathlib.Path and other PathLike objects are
    #    accepted as well as strings.
    if isinstance(uploaded, (str, os.PathLike)) and os.path.exists(uploaded):
        shutil.copyfile(uploaded, dst)
        return str(dst)

    # 2) File-like object: stream it in chunks instead of loading the whole
    #    upload into memory first.
    if hasattr(uploaded, "read"):
        with open(dst, "wb") as out:
            shutil.copyfileobj(uploaded, out)
        return str(dst)

    # 3) Dict payload carrying the temp-file path under "name".
    if isinstance(uploaded, dict) and "name" in uploaded and os.path.exists(uploaded["name"]):
        shutil.copyfile(uploaded["name"], dst)
        return str(dst)

    raise RuntimeError("Unable to save uploaded file.")
|
|
| |
| |
| |
def run_async(func, *args, **kwargs):
    """Run the coroutine function ``func`` to completion from synchronous code.

    A fresh event loop is created for every call, installed as the current
    loop while the coroutine runs, and always closed afterwards so that
    repeated calls do not leak loops (and their selectors / file descriptors).

    Args:
        func: Callable returning an awaitable when called with ``*args`` and
            ``**kwargs``.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever the awaitable resolves to.

    Raises:
        Any exception raised by the awaitable is propagated unchanged.
    """
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(func(*args, **kwargs))
    finally:
        try:
            # Finalise async generators that are still open, as asyncio.run does.
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            # Do not leave a closed loop registered as the thread's current loop.
            asyncio.set_event_loop(None)
            loop.close()
|
|
| |
| |
| |
def handle_analysis(file, prompt, username="anonymous", use_streaming=False):
    """Analyse a single uploaded PDF and return values for the UI outputs.

    Args:
        file: The uploaded file as delivered by the UI, or ``None``.
        prompt: Free-text instructions forwarded to the orchestrator.
        username: User identifier. Blank values fall back to ``"anonymous"``
            because an empty textbox submits ``""`` rather than omitting the
            argument, so the parameter default alone never applies.
        use_streaming: Accepted for interface compatibility; currently
            unused by this function.

    Returns:
        A 3-tuple ``(result_text, status_text, None)``. The third slot is a
        placeholder for the state output wired up in the UI.
    """
    if file is None:
        return "Please upload a PDF.", "No file uploaded", None

    username = str(username or "").strip() or "anonymous"

    try:
        validate_file_size(file)
        path = save_uploaded_file(file, username)

        result = run_async(
            ORCHESTRATOR.handle_user_prompt,
            user_id=username,
            prompt=prompt,
            file_path=path,
            targets=["analysis"],
        )
        return result.get("analysis", "No analysis result."), "Analysis complete", None
    except Exception as e:
        # UI boundary: report the failure as text instead of raising.
        return f"Error during analysis: {str(e)}", "Analysis failed", None
|
|
def handle_batch_analysis(files, prompt, username="anonymous"):
    """Analyse several uploaded PDFs and format one combined text report.

    Args:
        files: Sequence of uploaded files as delivered by the UI; ``None``
            or an empty sequence yields a prompt to upload something.
        prompt: Instructions applied to every document.
        username: User identifier. Blank values fall back to ``"anonymous"``.

    Returns:
        A 3-tuple ``(report_text, status_text, None)``. The third slot is a
        placeholder for the state output wired up in the UI.
    """
    if not files:
        return "Please upload at least one PDF.", "No files uploaded", None

    username = str(username or "").strip() or "anonymous"

    try:
        # Validate and persist every upload before starting the batch.
        file_paths = []
        for file in files:
            validate_file_size(file)
            file_paths.append(save_uploaded_file(file, username))

        result = run_async(
            ORCHESTRATOR.handle_batch_analysis,
            user_id=username,
            prompt=prompt,
            file_paths=file_paths,
            targets=["analysis"],
        )

        batch_summary = result.get("summary") or {}
        batch_results = result.get("batch_results") or []
        stats = batch_summary.get("processing_stats") or {}

        # NOTE(review): the emoji below replace mis-encoded characters in the
        # original literals. The check mark is recoverable from the bytes;
        # the others are best guesses and should be confirmed.
        lines = [
            "📊 Batch Analysis Results",
            f"Total files: {stats.get('total_files', 0)}",
            f"Successful: {stats.get('successful', 0)}",
            f"Failed: {stats.get('failed', 0)}",
            f"Success rate: {stats.get('success_rate', '0%')}",
            "",
        ]

        if batch_summary.get("batch_analysis"):
            lines += [f"📋 Batch Summary:\n{batch_summary['batch_analysis']}", ""]

        lines.append("📄 Individual Results:")
        for index, file_result in enumerate(batch_results, start=1):
            # A missing or None path must not break the whole report.
            file_name = Path(file_result.get("file_path") or "Unknown").name
            lines.append(f"\n--- File {index}: {file_name} ---")
            if "error" in file_result:
                lines.append(f"❌ Error: {file_result['error']}")
            else:
                lines.append(f"✅ {file_result.get('analysis', 'No analysis')}")

        formatted_output = "\n".join(lines) + "\n"
        return formatted_output, f"Processed {len(batch_results)} file(s)", None
    except Exception as e:
        # UI boundary: report the failure as text instead of raising.
        return f"Error during batch analysis: {str(e)}", "Batch analysis failed", None
|
|
def handle_export(result_text, export_format, username="anonymous"):
    """Export analysis text through ``EXPORT_MANAGER`` in the chosen format.

    Args:
        result_text: The text to export; empty or whitespace-only text is
            rejected.
        export_format: One of ``"txt"``, ``"json"`` or ``"pdf"``.
        username: User identifier passed to the export manager. Blank values
            fall back to ``"anonymous"``.

    Returns:
        A 2-tuple ``(status_message, filepath)``; ``filepath`` is ``None``
        whenever nothing was written.
    """
    if not result_text or result_text.strip() == "":
        return "No content to export.", None

    if not EXPORT_MANAGER:
        return "Export functionality not available.", None

    username = str(username or "").strip() or "anonymous"

    try:
        if export_format == "txt":
            filepath = EXPORT_MANAGER.export_text(result_text, username=username)
        elif export_format == "json":
            data = {"analysis": result_text, "exported_by": username, "timestamp": time.time()}
            filepath = EXPORT_MANAGER.export_json(data, username=username)
        elif export_format == "pdf":
            filepath = EXPORT_MANAGER.export_pdf(result_text, username=username)
        else:
            return f"Unsupported export format: {export_format}", None

        # The emoji replace mis-encoded characters in the original literals.
        return f"✅ Export successful! File saved to: {filepath}", filepath
    except Exception as e:
        # UI boundary: report the failure as text instead of raising.
        return f"❌ Export failed: {str(e)}", None
|
|
def get_custom_prompts():
    """Return the identifiers of all stored prompts, or ``[]`` without a manager."""
    if PROMPT_MANAGER:
        return list(PROMPT_MANAGER.get_all_prompts().keys())
    return []
|
|
def load_custom_prompt(prompt_id):
    """Return the prompt stored under ``prompt_id``, or ``""`` if unavailable."""
    if PROMPT_MANAGER:
        template = PROMPT_MANAGER.get_prompt(prompt_id)
        if template:
            return template
    return ""
|
|
| |
| |
| |
# Gradio UI: a two-tab interface (single document / batch) wired to the
# handler functions defined in this module.
# NOTE(review): the nesting below is reconstructed, because the original
# indentation was lost. The emoji replace mis-encoded characters: 🎯, 💾, 📥
# and 🗑️ are recoverable from the surviving bytes, while the title, tab and
# 🚀 emoji are best guesses. Confirm both against the intended layout.
with gr.Blocks(title="PDF Analysis & Orchestrator", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 📊 PDF Analysis & Orchestrator - Intelligent Document Processing")
    gr.Markdown("Upload PDFs and provide instructions for analysis, summarization, or explanation.")

    with gr.Tabs():
        # ---- Tab 1: single document ---------------------------------------
        with gr.Tab("📄 Single Document Analysis"):
            with gr.Row():
                with gr.Column(scale=1):
                    pdf_in = gr.File(label="Upload PDF", file_types=[".pdf"], elem_id="file_upload")
                    username_input = gr.Textbox(
                        label="Username (optional)",
                        placeholder="anonymous",
                        elem_id="username",
                    )

                    with gr.Accordion("🎯 Custom Prompts", open=False):
                        prompt_dropdown = gr.Dropdown(
                            choices=get_custom_prompts(),
                            label="Select Custom Prompt",
                            value=None,
                        )
                        load_prompt_btn = gr.Button("Load Prompt", size="sm")

                with gr.Column(scale=2):
                    gr.Markdown("### Analysis Instructions")
                    prompt_input = gr.Textbox(
                        lines=4,
                        placeholder=(
                            "Describe what you want to do with the document...\n"
                            "Examples:\n"
                            "- Summarize this document in 3 key points\n"
                            "- Explain this technical paper for a 10-year-old\n"
                            "- Segment this document by themes\n"
                            "- Analyze the key findings"
                        ),
                        label="Instructions",
                    )

                    with gr.Row():
                        submit_btn = gr.Button("🚀 Analyze & Orchestrate", variant="primary", size="lg")
                        clear_btn = gr.Button("🗑️ Clear", size="sm")

            with gr.Row():
                with gr.Column(scale=2):
                    output_box = gr.Textbox(
                        label="Analysis Result",
                        lines=15,
                        max_lines=25,
                        show_copy_button=True,
                    )
                    status_box = gr.Textbox(
                        label="Status",
                        value="Ready to analyze documents",
                        interactive=False,
                    )

                with gr.Column(scale=1):
                    with gr.Accordion("💾 Export Results", open=False):
                        export_format = gr.Dropdown(
                            choices=["txt", "json", "pdf"],
                            label="Export Format",
                            value="txt",
                        )
                        export_btn = gr.Button("📥 Export", variant="secondary")
                        export_status = gr.Textbox(label="Export Status", interactive=False)

        # ---- Tab 2: batch processing --------------------------------------
        with gr.Tab("📚 Batch Processing"):
            with gr.Row():
                with gr.Column(scale=1):
                    batch_files = gr.File(
                        label="Upload Multiple PDFs",
                        file_count="multiple",
                        file_types=[".pdf"],
                    )
                    batch_username = gr.Textbox(label="Username (optional)", placeholder="anonymous")

                with gr.Column(scale=2):
                    batch_prompt = gr.Textbox(
                        lines=3,
                        placeholder="Enter analysis instructions for all documents...",
                        label="Batch Analysis Instructions",
                    )
                    batch_submit = gr.Button("🚀 Process Batch", variant="primary", size="lg")

            batch_output = gr.Textbox(label="Batch Results", lines=20, max_lines=30, show_copy_button=True)
            batch_status = gr.Textbox(label="Batch Status", interactive=False)

    # ---- Event wiring -----------------------------------------------------
    # The handlers return one extra value that is routed into a throwaway
    # gr.State output.
    submit_btn.click(
        fn=handle_analysis,
        inputs=[pdf_in, prompt_input, username_input, gr.State(False)],
        outputs=[output_box, status_box, gr.State()],
    )

    load_prompt_btn.click(
        fn=load_custom_prompt,
        inputs=[prompt_dropdown],
        outputs=[prompt_input],
    )

    export_btn.click(
        fn=handle_export,
        inputs=[output_box, export_format, username_input],
        outputs=[export_status, gr.State()],
    )

    # A file component is cleared with None; an empty string is not a valid
    # file value.
    clear_btn.click(
        fn=lambda: (None, "", "", "Ready"),
        inputs=[],
        outputs=[pdf_in, prompt_input, output_box, status_box],
    )

    batch_submit.click(
        fn=handle_batch_analysis,
        inputs=[batch_files, batch_prompt, batch_username],
        outputs=[batch_output, batch_status, gr.State()],
    )

    # Clickable sample instructions that populate the single-document prompt.
    gr.Examples(
        examples=[
            ["Summarize this document in 3 key points"],
            ["Explain this technical content for a general audience"],
            ["Segment this document by main themes or topics"],
            ["Analyze the key findings and recommendations"],
            ["Create an executive summary of this document"],
        ],
        inputs=prompt_input,
        label="Example Instructions",
    )
|
|
if __name__ == "__main__":
    # Listen on all interfaces; the port is taken from $PORT, defaulting to 7860.
    port = int(os.environ.get("PORT", 7860))
    demo.launch(server_name="0.0.0.0", server_port=port)
|
|