| |
| |
import asyncio
import os
import shutil
import time
import uuid
from pathlib import Path
from typing import Optional, List, Tuple

import gradio as gr

from agents import (
    AnalysisAgent,
    CollaborationAgent,
    ConversationAgent,
    ResearchAnalystAgent,
    MasterOrchestrator,
)
from utils import load_pdf_text
from utils.session import make_user_session
from utils.validation import validate_file_size
from utils.prompts import PromptManager
from utils.export import ExportManager
from config import Config
|
|
| |
| |
| |
# Run the configuration's directory setup before any agent or manager is built.
Config.ensure_directories()

# One shared instance of every agent, keyed by the short name handed to the
# orchestrator. All agents use the model configured in Config.OPENAI_MODEL.
AGENTS = dict(
    analysis=AnalysisAgent(
        name="AnalysisAgent",
        model=Config.OPENAI_MODEL,
        tasks_completed=0,
    ),
    collab=CollaborationAgent(
        name="CollaborationAgent",
        model=Config.OPENAI_MODEL,
        tasks_completed=0,
    ),
    conversation=ConversationAgent(
        name="ConversationAgent",
        model=Config.OPENAI_MODEL,
        tasks_completed=0,
    ),
    research=ResearchAnalystAgent(
        name="ResearchAnalystAgent",
        model=Config.OPENAI_MODEL,
        tasks_completed=0,
    ),
)
ORCHESTRATOR = MasterOrchestrator(agents=AGENTS)

# Module-wide helpers used by the prompt and export handlers below.
PROMPT_MANAGER = PromptManager()
EXPORT_MANAGER = ExportManager()
|
|
| |
| |
| |
def save_uploaded_file(uploaded, username: str = "anonymous", session_dir: Optional[str] = None) -> str:
    """Copy an uploaded PDF into a session directory and return the new path.

    Args:
        uploaded: The upload in one of the supported shapes:
            * a path (``str`` or ``os.PathLike``) to an existing file,
            * a binary file-like object exposing ``read()``,
            * a dict whose ``"name"`` entry is the path of an existing file.
        username: Forwarded to ``make_user_session`` when ``session_dir`` is
            not supplied.
        session_dir: Directory that receives the copy; it is created
            (including parents) when missing.

    Returns:
        The path of the stored copy, named ``upload_<random hex>.pdf``.

    Raises:
        RuntimeError: If ``uploaded`` matches none of the supported shapes,
            e.g. a path that does not exist.
    """
    if session_dir is None:
        session_dir = make_user_session(username)
    target_dir = Path(session_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    # A random name keeps concurrent uploads in one session from colliding.
    dst = target_dir / f"upload_{uuid.uuid4().hex}.pdf"

    if isinstance(uploaded, (str, os.PathLike)) and os.path.exists(uploaded):
        shutil.copyfile(uploaded, dst)
        return str(dst)
    if hasattr(uploaded, "read"):
        with open(dst, "wb") as out:
            # Stream in chunks instead of loading the whole upload in memory.
            shutil.copyfileobj(uploaded, out)
        return str(dst)
    if isinstance(uploaded, dict) and "name" in uploaded and os.path.exists(uploaded["name"]):
        shutil.copyfile(uploaded["name"], dst)
        return str(dst)
    raise RuntimeError("Unable to save uploaded file.")
|
|
| |
| |
| |
def run_async(func, *args, **kwargs):
    """Run an async callable to completion from synchronous code.

    A private event loop is created for the call, installed as the current
    loop while it runs, and always torn down afterwards so repeated calls do
    not accumulate unclosed loops.

    Args:
        func: Callable returning an awaitable when invoked.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever the awaitable produced.

    Raises:
        Any exception raised by ``func`` or by the awaitable it returns.
    """
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(func(*args, **kwargs))
    finally:
        try:
            # Give async generators started on this loop a chance to clean up.
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            # Do not leave a closed loop installed as the current one.
            asyncio.set_event_loop(None)
            loop.close()
|
|
| |
| |
| |
def handle_analysis(file, prompt, username="anonymous", use_streaming=False):
    """Analyze a single uploaded PDF with the ``analysis`` target.

    Args:
        file: The uploaded file as delivered by the UI, or ``None``.
        prompt: Instructions passed to the orchestrator.
        username: User identifier; blank values fall back to ``"anonymous"``.
        use_streaming: When true, delegate to ``handle_analysis_streaming``.

    Returns:
        A 3-tuple whose first item is the analysis text (or a message asking
        for a file) and whose remaining two items are ``None``.

    Raises:
        Whatever ``validate_file_size`` or ``save_uploaded_file`` raise.
    """
    if file is None:
        return "Please upload a PDF.", None, None

    # A blank Gradio textbox delivers "" rather than triggering the default,
    # so the fallback has to be applied here.
    username = (username or "").strip() or "anonymous"

    validate_file_size(file)
    path = save_uploaded_file(file, username)

    if use_streaming:
        # NOTE(review): this hands back a generator object inside the tuple.
        # Gradio only streams when the handler itself is a generator function,
        # so this path probably does not stream in the UI - confirm and rework.
        return handle_analysis_streaming(path, prompt, username)

    result = run_async(
        ORCHESTRATOR.handle_user_prompt,
        user_id=username,
        prompt=prompt,
        file_path=path,
        targets=["analysis"],
    )
    return result.get("analysis", "No analysis result."), None, None
|
|
def handle_analysis_streaming(file_path, prompt, username="anonymous"):
    """Handle analysis with streaming output.

    Wraps ``ORCHESTRATOR.handle_user_prompt_streaming`` (an async generator)
    in a plain synchronous generator driven by a private event loop.

    Args:
        file_path: Path of the already-saved PDF.
        prompt: Instructions passed to the orchestrator.
        username: User identifier passed to the orchestrator as ``user_id``.

    Returns:
        A 3-tuple ``(generator, None, None)``. Nothing is executed until the
        generator is first iterated.
    """
    def stream_generator():
        async def async_stream():
            async for chunk in ORCHESTRATOR.handle_user_prompt_streaming(
                user_id=username,
                prompt=prompt,
                file_path=file_path,
                targets=["analysis"],
            ):
                yield chunk

        loop = asyncio.new_event_loop()
        async_gen = async_stream()
        try:
            asyncio.set_event_loop(loop)
            while True:
                try:
                    chunk = loop.run_until_complete(async_gen.__anext__())
                except StopAsyncIteration:
                    break
                yield chunk
        finally:
            try:
                # If the consumer stopped early the async generator is still
                # suspended; close it while its loop is still usable.
                loop.run_until_complete(async_gen.aclose())
                loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                # Do not leave a closed loop installed as the current one.
                asyncio.set_event_loop(None)
                loop.close()

    return stream_generator(), None, None
|
|
def handle_batch_analysis(files, prompt, username="anonymous"):
    """Handle batch analysis of multiple PDFs.

    Every file is validated and saved first; the saved paths are then sent to
    ``ORCHESTRATOR.handle_batch_analysis`` in one call and the result is
    rendered as a plain-text report.

    Args:
        files: Uploaded files as delivered by the UI (may be ``None`` or empty).
        prompt: Instructions applied to every document.
        username: User identifier; blank values fall back to ``"anonymous"``.

    Returns:
        A 3-tuple whose first item is the report (or an error / usage message)
        and whose remaining two items are ``None``.
    """
    if not files:
        return "Please upload at least one PDF.", None, None

    # A blank Gradio textbox delivers "" rather than triggering the default.
    username = (username or "").strip() or "anonymous"

    file_paths = []
    for file in files:
        try:
            validate_file_size(file)
            file_paths.append(save_uploaded_file(file, username))
        except Exception as e:
            # Deliberately broad: any problem with one upload aborts the batch
            # and is reported to the user instead of crashing the handler.
            return f"Error with file {file}: {str(e)}", None, None

    result = run_async(
        ORCHESTRATOR.handle_batch_analysis,
        user_id=username,
        prompt=prompt,
        file_paths=file_paths,
        targets=["analysis"],
    )

    batch_summary = result.get("summary", {})
    batch_results = result.get("batch_results", [])
    stats = batch_summary.get("processing_stats", {})

    # NOTE(review): the leading glyphs ("π", "β") look like emoji that were
    # mis-decoded upstream; they are kept exactly as found - TODO restore.
    report = [
        "π Batch Analysis Results\n",
        f"Total files: {stats.get('total_files', 0)}\n",
        f"Successful: {stats.get('successful', 0)}\n",
        f"Failed: {stats.get('failed', 0)}\n",
        f"Success rate: {stats.get('success_rate', '0%')}\n\n",
    ]

    if batch_summary.get("batch_analysis"):
        report.append(f"π Batch Summary:\n{batch_summary['batch_analysis']}\n\n")

    report.append("π Individual Results:\n")
    for index, file_result in enumerate(batch_results, start=1):
        file_name = Path(file_result.get("file_path", "Unknown")).name
        report.append(f"\n--- File {index}: {file_name} ---\n")
        if "error" in file_result:
            report.append(f"β Error: {file_result['error']}\n")
        else:
            report.append(f"β {file_result.get('analysis', 'No analysis')}\n")

    return "".join(report), None, None
|
|
def handle_research_analysis(file, prompt, username="anonymous", use_streaming=False):
    """Handle research analysis with R&D pipeline focus.

    Args:
        file: The uploaded file as delivered by the UI, or ``None``.
        prompt: Instructions passed to the orchestrator.
        username: User identifier; blank values fall back to ``"anonymous"``.
        use_streaming: Accepted for signature parity with ``handle_analysis``
            but not used; the research path always runs to completion.

    Returns:
        A 3-tuple whose first item is the research analysis text (or a message
        asking for a file) and whose remaining two items are ``None``.

    Raises:
        Whatever ``validate_file_size`` or ``save_uploaded_file`` raise.
    """
    if file is None:
        return "Please upload a PDF.", None, None

    # A blank Gradio textbox delivers "" rather than triggering the default.
    username = (username or "").strip() or "anonymous"

    validate_file_size(file)
    path = save_uploaded_file(file, username)

    result = run_async(
        ORCHESTRATOR.handle_user_prompt,
        user_id=username,
        prompt=prompt,
        file_path=path,
        targets=["research"],
    )
    return result.get("research_analysis", "No research analysis result."), None, None
|
|
def handle_export(result_text, export_format, username="anonymous"):
    """Handle export of analysis results.

    Args:
        result_text: Text to export; ``None``, empty or whitespace-only text
            is rejected.
        export_format: One of ``"txt"``, ``"json"`` or ``"pdf"``.
        username: User identifier; blank values fall back to ``"anonymous"``.

    Returns:
        A 2-tuple ``(status message, file path)``. The path is ``None`` when
        nothing was exported (no content, unsupported format, or a failure).
    """
    if not result_text or not result_text.strip():
        return "No content to export.", None

    # A blank Gradio textbox delivers "" rather than triggering the default.
    username = (username or "").strip() or "anonymous"

    try:
        if export_format == "txt":
            filepath = EXPORT_MANAGER.export_text(result_text, username=username)
        elif export_format == "json":
            data = {"analysis": result_text, "exported_by": username, "timestamp": time.time()}
            filepath = EXPORT_MANAGER.export_json(data, username=username)
        elif export_format == "pdf":
            filepath = EXPORT_MANAGER.export_pdf(result_text, username=username)
        else:
            return f"Unsupported export format: {export_format}", None

        # NOTE(review): the leading "β" glyphs look like emoji that were
        # mis-decoded upstream; they are kept exactly as found - TODO restore.
        return f"β Export successful! File saved to: {filepath}", filepath
    except Exception as e:
        # Deliberately broad: export problems are reported in the status box.
        return f"β Export failed: {str(e)}", None
|
|
def get_custom_prompts():
    """Return the IDs of every prompt known to ``PROMPT_MANAGER`` as a list."""
    return [*PROMPT_MANAGER.get_all_prompts().keys()]
|
|
def load_custom_prompt(prompt_id):
    """Look up ``prompt_id`` in ``PROMPT_MANAGER``; return "" when the lookup is falsy."""
    found = PROMPT_MANAGER.get_prompt(prompt_id)
    if found:
        return found
    return ""
|
|
| |
| |
| |
# Gradio UI: four tabs (single document, research analyst, batch, prompt
# management) followed by the event wiring and example instructions.
# NOTE(review): glyphs such as "π" / "β" in labels look like emoji that were
# mis-decoded upstream; they are kept exactly as found - TODO restore.
with gr.Blocks(title="PDF Analysis & Orchestrator", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# π PDF Analysis & Orchestrator - Intelligent Document Processing")
    gr.Markdown("Upload PDFs and provide instructions for analysis, summarization, or explanation. Now with enhanced features!")

    with gr.Tabs():
        # ---- Tab 1: single document analysis -------------------------------
        with gr.Tab("π Single Document Analysis"):
            with gr.Row():
                with gr.Column(scale=1):
                    pdf_in = gr.File(label="Upload PDF", file_types=[".pdf"], elem_id="file_upload")
                    username_input = gr.Textbox(label="Username (optional)", placeholder="anonymous", elem_id="username")

                    with gr.Accordion("π― Custom Prompts", open=False):
                        prompt_dropdown = gr.Dropdown(
                            choices=get_custom_prompts(),
                            label="Select Custom Prompt",
                            value=None
                        )
                        load_prompt_btn = gr.Button("Load Prompt", size="sm")

                    with gr.Accordion("βοΈ Analysis Options", open=False):
                        use_streaming = gr.Checkbox(label="Enable Streaming Output", value=False)
                        # NOTE(review): chunk_size is not passed to any handler
                        # in the visible code - confirm whether it should be.
                        chunk_size = gr.Slider(
                            minimum=5000, maximum=30000, value=15000, step=1000,
                            label="Chunk Size (for large documents)"
                        )

                with gr.Column(scale=2):
                    gr.Markdown("### Analysis Instructions")
                    prompt_input = gr.Textbox(
                        lines=4,
                        placeholder="Describe what you want to do with the document...\nExamples:\n- Summarize this document in 3 key points\n- Explain this technical paper for a 10-year-old\n- Segment this document by themes\n- Analyze the key findings",
                        label="Instructions"
                    )

                    with gr.Row():
                        submit_btn = gr.Button("π Analyze & Orchestrate", variant="primary", size="lg")
                        clear_btn = gr.Button("ποΈ Clear", size="sm")

            with gr.Row():
                with gr.Column(scale=2):
                    output_box = gr.Textbox(label="Analysis Result", lines=15, max_lines=25, show_copy_button=True)
                    status_box = gr.Textbox(label="Status", value="Ready to analyze documents", interactive=False)

                with gr.Column(scale=1):
                    with gr.Accordion("πΎ Export Results", open=False):
                        export_format = gr.Dropdown(
                            choices=["txt", "json", "pdf"],
                            label="Export Format",
                            value="txt"
                        )
                        export_btn = gr.Button("π₯ Export", variant="secondary")
                        export_status = gr.Textbox(label="Export Status", interactive=False)

                    with gr.Accordion("π Document Info", open=False):
                        doc_info = gr.Textbox(label="Document Information", interactive=False, lines=6)

        # ---- Tab 2: senior research analyst --------------------------------
        with gr.Tab("π¬ Senior Research Analyst"):
            gr.Markdown("### π― R&D Pipeline Analysis")
            gr.Markdown("Act as a senior research analyst: extract high-value, novel ideas and convert them into concrete R&D pipeline outcomes (experiments β prototypes β product decisions)")

            with gr.Row():
                with gr.Column(scale=1):
                    research_pdf_in = gr.File(label="Upload Research Document", file_types=[".pdf"], elem_id="research_file_upload")
                    research_username_input = gr.Textbox(label="Username (optional)", placeholder="anonymous", elem_id="research_username")

                    with gr.Accordion("π― Research Prompts", open=False):
                        research_prompt_dropdown = gr.Dropdown(
                            choices=[pid for pid, prompt in PROMPT_MANAGER.get_all_prompts().items() if prompt.get("category") == "research"],
                            label="Select Research Prompt",
                            value="research_pipeline"
                        )
                        load_research_prompt_btn = gr.Button("Load Research Prompt", size="sm")

                    with gr.Accordion("βοΈ Research Options", open=False):
                        gr.Markdown("Research analysis uses comprehensive processing for detailed R&D pipeline insights.")

                with gr.Column(scale=2):
                    gr.Markdown("### Research Analysis Instructions")
                    research_prompt_input = gr.Textbox(
                        lines=4,
                        placeholder="Focus on extracting novel ideas with high product/engineering impact...\nExamples:\n- Identify breakthrough concepts for R&D pipeline\n- Assess commercial viability of technical innovations\n- Design experimental frameworks for validation\n- Create prototype development roadmaps",
                        label="Research Instructions"
                    )

                    with gr.Row():
                        research_submit_btn = gr.Button("π¬ Research Analysis", variant="primary", size="lg")
                        research_clear_btn = gr.Button("ποΈ Clear", size="sm")

            with gr.Row():
                with gr.Column(scale=2):
                    research_output_box = gr.Textbox(label="Research Analysis Result", lines=20, max_lines=30, show_copy_button=True)
                    research_status_box = gr.Textbox(label="Research Status", value="Ready for research analysis", interactive=False)

                with gr.Column(scale=1):
                    with gr.Accordion("πΎ Export Research Results", open=False):
                        research_export_format = gr.Dropdown(
                            choices=["txt", "json", "pdf"],
                            label="Export Format",
                            value="txt"
                        )
                        research_export_btn = gr.Button("π₯ Export Research", variant="secondary")
                        research_export_status = gr.Textbox(label="Export Status", interactive=False)

                    with gr.Accordion("π Research Insights", open=False):
                        research_insights = gr.Textbox(label="Key Insights Summary", interactive=False, lines=8)

        # ---- Tab 3: batch processing ---------------------------------------
        with gr.Tab("π Batch Processing"):
            with gr.Row():
                with gr.Column(scale=1):
                    batch_files = gr.File(
                        label="Upload Multiple PDFs",
                        file_count="multiple",
                        file_types=[".pdf"]
                    )
                    batch_username = gr.Textbox(label="Username (optional)", placeholder="anonymous")

                with gr.Column(scale=2):
                    batch_prompt = gr.Textbox(
                        lines=3,
                        placeholder="Enter analysis instructions for all documents...",
                        label="Batch Analysis Instructions"
                    )
                    batch_submit = gr.Button("π Process Batch", variant="primary", size="lg")

            batch_output = gr.Textbox(label="Batch Results", lines=20, max_lines=30, show_copy_button=True)
            batch_status = gr.Textbox(label="Batch Status", interactive=False)

        # ---- Tab 4: prompt management --------------------------------------
        with gr.Tab("π― Manage Prompts"):
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("### Add New Prompt")
                    new_prompt_id = gr.Textbox(label="Prompt ID", placeholder="my_custom_prompt")
                    new_prompt_name = gr.Textbox(label="Prompt Name", placeholder="My Custom Analysis")
                    new_prompt_desc = gr.Textbox(label="Description", placeholder="What this prompt does")
                    new_prompt_template = gr.Textbox(
                        lines=4,
                        label="Prompt Template",
                        placeholder="Enter your custom prompt template..."
                    )
                    new_prompt_category = gr.Dropdown(
                        choices=["custom", "business", "technical", "explanation", "analysis"],
                        label="Category",
                        value="custom"
                    )
                    add_prompt_btn = gr.Button("β Add Prompt", variant="primary")

                with gr.Column(scale=1):
                    gr.Markdown("### Existing Prompts")
                    prompt_list = gr.Dataframe(
                        headers=["ID", "Name", "Category", "Description"],
                        datatype=["str", "str", "str", "str"],
                        interactive=False,
                        label="Available Prompts"
                    )
                    refresh_prompts_btn = gr.Button("π Refresh List")
                    delete_prompt_id = gr.Textbox(label="Prompt ID to Delete", placeholder="prompt_id")
                    delete_prompt_btn = gr.Button("ποΈ Delete Prompt", variant="stop")

    # ---- Event wiring: single document tab ---------------------------------
    submit_btn.click(
        fn=handle_analysis,
        inputs=[pdf_in, prompt_input, username_input, use_streaming],
        outputs=[output_box, status_box, doc_info]
    )

    load_prompt_btn.click(
        fn=load_custom_prompt,
        inputs=[prompt_dropdown],
        outputs=[prompt_input]
    )

    export_btn.click(
        fn=handle_export,
        inputs=[output_box, export_format, username_input],
        outputs=[export_status, gr.State()]
    )

    # The file input is cleared with None; an empty string is not a valid
    # "no file" value for gr.File.
    clear_btn.click(
        fn=lambda: (None, "", "", "Ready"),
        inputs=[],
        outputs=[pdf_in, prompt_input, output_box, status_box]
    )

    # ---- Event wiring: research tab ----------------------------------------
    research_submit_btn.click(
        fn=handle_research_analysis,
        inputs=[research_pdf_in, research_prompt_input, research_username_input],
        outputs=[research_output_box, research_status_box, research_insights]
    )

    load_research_prompt_btn.click(
        fn=load_custom_prompt,
        inputs=[research_prompt_dropdown],
        outputs=[research_prompt_input]
    )

    research_export_btn.click(
        fn=handle_export,
        inputs=[research_output_box, research_export_format, research_username_input],
        outputs=[research_export_status, gr.State()]
    )

    # Same as above: reset the file input with None rather than "".
    research_clear_btn.click(
        fn=lambda: (None, "", "", "Ready for research analysis", ""),
        inputs=[],
        outputs=[research_pdf_in, research_prompt_input, research_output_box, research_status_box, research_insights]
    )

    # ---- Event wiring: batch tab -------------------------------------------
    batch_submit.click(
        fn=handle_batch_analysis,
        inputs=[batch_files, batch_prompt, batch_username],
        outputs=[batch_output, batch_status, gr.State()]
    )

    # ---- Event wiring: prompt management tab -------------------------------
    add_prompt_btn.click(
        # Parameter renamed from "id" so the builtin is not shadowed.
        fn=lambda prompt_id, name, desc, template, cat: PROMPT_MANAGER.add_prompt(prompt_id, name, desc, template, cat),
        inputs=[new_prompt_id, new_prompt_name, new_prompt_desc, new_prompt_template, new_prompt_category],
        outputs=[]
    )

    refresh_prompts_btn.click(
        fn=lambda: [[pid, prompt["name"], prompt["category"], prompt["description"]]
                    for pid, prompt in PROMPT_MANAGER.get_all_prompts().items()],
        inputs=[],
        outputs=[prompt_list]
    )

    delete_prompt_btn.click(
        fn=lambda pid: PROMPT_MANAGER.delete_prompt(pid),
        inputs=[delete_prompt_id],
        outputs=[]
    )

    # ---- Clickable example instructions ------------------------------------
    gr.Examples(
        examples=[
            ["Summarize this document in 3 key points"],
            ["Explain this technical content for a general audience"],
            ["Segment this document by main themes or topics"],
            ["Analyze the key findings and recommendations"],
            ["Create an executive summary of this document"],
        ],
        inputs=prompt_input,
        label="Example Instructions"
    )

    gr.Examples(
        examples=[
            ["Identify breakthrough concepts with high product/engineering impact and design specific experiments to validate them"],
            ["Assess the commercial viability of technical innovations and create prototype development roadmaps"],
            ["Extract novel methodologies and convert them into concrete R&D pipeline outcomes"],
            ["Analyze technical concepts for transformative potential and generate strategic product decisions"],
            ["Design experimental frameworks to validate key hypotheses with measurable success criteria"],
        ],
        inputs=research_prompt_input,
        label="Research Analysis Examples"
    )
|
|
if __name__ == "__main__":
    # Listen on all interfaces; the port comes from $PORT, defaulting to 7860.
    listen_port = int(os.environ.get("PORT", 7860))
    demo.launch(server_name="0.0.0.0", server_port=listen_port)