Spaces:
Sleeping
Sleeping
import os
import gradio as gr
import json
import logging
import time
from typing import List, Dict, Generator
from dotenv import load_dotenv

# Try to use truststore for corporate network compatibility (local only).
# SPACE_ID is set by Hugging Face Spaces, so this branch never runs in a Space.
if not os.getenv("SPACE_ID"):
    try:
        import truststore
        truststore.inject_into_ssl()
        print("💡 Truststore injected (Corporate SSL mode)")
    except ImportError:
        # truststore is optional; fall back to default SSL handling.
        pass

# Import our agents from the src directory
from src.clarifier import Clarifier
from src.planner import Planner
from src.splitter import Splitter
from src.coordinator import Coordinator
from src.reviewer import Reviewer

# Configuration and Secrets: read a local .env file (if any) into the environment.
load_dotenv()
def get_secret(key):
    """Return the environment variable *key* with surrounding whitespace
    stripped, or the empty string when the variable is unset."""
    raw = os.getenv(key)
    if raw is None:
        return ""
    return raw.strip()
# Secrets are read once at import time; the UI textboxes below default to them.
HF_KEY = get_secret("HF_KEY")
TAVILY_API_KEY = get_secret("TAVILY_API_KEY")

# Logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- MODELS ---
# Using the most stable and powerful mid-sized model for all tasks.
# Every pipeline stage points at the same model; change STABLE_MODEL to swap all.
STABLE_MODEL = 'Qwen/Qwen2.5-7B-Instruct'
CLARIFIER_MODEL = STABLE_MODEL
PLANNER_MODEL = STABLE_MODEL
SPLITTER_MODEL = STABLE_MODEL
COORDINATOR_MODEL = STABLE_MODEL
SUBAGENT_MODEL = STABLE_MODEL
REVIEWER_MODEL = STABLE_MODEL
# --- GRADIO THEME ---
# Soft theme with a blue primary and slate accents; Inter as the UI font.
theme = gr.themes.Soft(
    primary_hue="blue",
    secondary_hue="slate",
    neutral_hue="slate",
    font=[gr.themes.GoogleFont('Inter'), 'sans-serif'],
)
def start_clarification(topic, hf_key, state):
    """Step 1 -> 2: fetch topic-refinement suggestions from the Clarifier.

    Args:
        topic: Raw research topic typed by the user.
        hf_key: Hugging Face token from the configuration field.
        state: Per-session Gradio state dict, mutated in place.

    Returns:
        Tuple of (step-2 column update, markdown message, state,
        step-1 column update).
    """
    # Reject empty AND whitespace-only topics (the previous `if not topic`
    # check let "   " through to the model unchanged).
    clean_topic = topic.strip() if topic else ""
    if not clean_topic:
        return gr.update(), "### ⚠️ Warning\nPlease enter a topic.", state, gr.update()
    clean_key = hf_key.strip() if hf_key else ""
    if not clean_key:
        return gr.update(), "### ⚠️ Warning\nPlease provide a valid Hugging Face Token.", state, gr.update()
    state["initial_topic"] = clean_topic
    clarifier = Clarifier(model_name=CLARIFIER_MODEL, hf_key=clean_key)
    try:
        suggestions = clarifier.get_suggestions(clean_topic)
        state["suggestions"] = suggestions
        if not suggestions:
            return gr.update(), "### ❌ Error\nNo suggestions received. This could be due to model traffic. Please try again in 5 seconds.", state, gr.update()
        suggestion_md = "### 💡 Refine Your Topic\n\nChoose one of the suggested directions or enter a custom one below:\n\n"
        for i, s in enumerate(suggestions):
            suggestion_md += f"**Option {i+1}: {s['title']}**\n{s['description']}\n\n"
        # Show step 2, hide step 1.
        return gr.update(visible=True), suggestion_md, state, gr.update(visible=False)
    except Exception as e:
        error_msg = str(e)
        # The raw HF 401 message is cryptic; translate it into an actionable hint.
        if "401" in error_msg:
            error_msg = "401 Unauthorized: Your Hugging Face Token is invalid for the Inference API. Ensure it is a 'Write' token or has 'Inference' scope enabled."
        return gr.update(), f"### ❌ Error\n{error_msg}", state, gr.update()
def select_suggestion(index, custom_topic, state):
    """Step 2 -> 3: lock in the final research topic.

    A non-empty custom topic wins over the dropdown selection; otherwise the
    chosen suggestion (1-based dropdown value) is used.

    Args:
        index: Dropdown value ("1"/"2"/"3" or None).
        custom_topic: Optional free-text override.
        state: Per-session Gradio state dict, mutated in place.

    Returns:
        Tuple of (step-3 column update, markdown message, state,
        step-2 column update).
    """
    custom = custom_topic.strip() if custom_topic else ""
    if custom:
        # Store the stripped topic (previously the raw, unstripped text was kept).
        state["final_topic"] = custom
    else:
        # Guard int() — a non-numeric dropdown value previously raised an
        # uncaught ValueError; now it falls through to the warning branch.
        try:
            idx = int(index) - 1
        except (TypeError, ValueError):
            idx = -1
        suggestions = state.get("suggestions", [])
        if 0 <= idx < len(suggestions):
            sug = suggestions[idx]
            state["final_topic"] = f"{sug['title']}: {sug['description']}"
        else:
            return gr.update(), "### ⚠️ Warning\nPlease select an option or enter a custom topic.", state, gr.update()
    # Show step 3, hide step 2.
    return gr.update(visible=True), f"### 🎯 Target Topic\n**{state['final_topic']}**", state, gr.update(visible=False)
def generate_strategy(hf_key, state):
    """Step 3a: produce a research plan for the final topic via the Planner.

    Returns a tuple of (plan-or-error markdown, state, split-button update);
    the split button is revealed only on success.
    """
    token = (hf_key or "").strip()
    if not token:
        return "### ⚠️ Warning\nHF Key missing.", state, gr.update()
    planner = Planner(model_name=PLANNER_MODEL, hf_key=token)
    try:
        state["research_plan"] = planner.plan(state["final_topic"])
        return state["research_plan"], state, gr.update(visible=True)
    except Exception as e:
        return f"### ❌ Error\n{str(e)}", state, gr.update()
def decompose_tasks(hf_key, state):
    """Step 3b: break the research plan into subtasks via the Splitter.

    Returns a tuple of (task-list-or-error markdown, state, execute-button
    update); the execute button is revealed only on success.
    """
    token = (hf_key or "").strip()
    if not token:
        return "### ⚠️ Warning\nHF Key missing.", state, gr.update()
    splitter = Splitter(model_name=SPLITTER_MODEL, hf_key=token)
    try:
        subtasks = splitter.split(state["research_plan"])
        state["subtasks"] = subtasks
        tasks_md = "### 📋 Generated Subtasks\n\n" + "".join(
            f"- **{task['title']}** (ID: `{task['id']}`)\n" for task in subtasks
        )
        return tasks_md, state, gr.update(visible=True)
    except Exception as e:
        return f"### ❌ Error\n{str(e)}", state, gr.update()
def run_research(hf_key, tavily_key, state):
    """Step 4: run one web-searching CodeAgent per subtask, then synthesize
    the findings with the coordinator model and polish with the Reviewer.

    Generator used as a streaming Gradio callback; every ``yield`` is
    (log markdown, state, step-5 visibility update, final report markdown).
    """
    # Imported lazily so the heavy smolagents/tavily deps load only when needed.
    from smolagents import CodeAgent, tool, InferenceClientModel
    from src.prompts import SUBAGENT_DIRECTION, COORDINATOR_DIRECTION
    from tavily import TavilyClient
    clean_hf = hf_key.strip() if hf_key else ""
    clean_tavily = tavily_key.strip() if tavily_key else ""
    if not clean_hf:
        yield "### ❌ Error\nHF Token missing.", state, gr.update(), ""
        return
    if not clean_tavily:
        yield "### ❌ Error\nTavily API Key missing.", state, gr.update(), ""
        return
    tavily_client = TavilyClient(api_key=clean_tavily)

    # NOTE(review): `tool` is imported above but never applied; web_search is
    # handed to CodeAgent as a plain function. Confirm smolagents accepts this
    # or whether an @tool decorator was lost.
    def web_search(query: str) -> str:
        """
        Search the web for real-time information using Tavily.
        Args:
            query: The search query to look up.
        """
        try:
            response = tavily_client.search(query=query, search_depth="advanced", max_results=5)
            results = response.get("results", [])
            formatted = [f"Title: {r['title']}\nURL: {r['url']}\nContent: {r['content']}\n" for r in results]
            return "\n---\n".join(formatted) if formatted else "No results."
        except Exception as e:
            # Best-effort: report the failure to the agent instead of crashing the run.
            return f"Search failed: {e}"

    coordinator_model = InferenceClientModel(model_id=COORDINATOR_MODEL, api_key=clean_hf)
    subagent_model = InferenceClientModel(model_id=SUBAGENT_MODEL, api_key=clean_hf)
    current_findings = []
    log_content = "### 🔍 Agentic Research Progress\n\n"
    subtasks = state.get("subtasks", [])
    if not subtasks:
        yield "### ❌ Error\nNo subtasks found.", state, gr.update(), ""
        return
    # One fresh sub-agent per subtask; individual failures are logged but do
    # not abort the overall run.
    for i, task in enumerate(subtasks):
        t_id = task['id']
        t_title = task['title']
        t_desc = task['description']
        log_content += f"**Agent {i+1} working on:** {t_title}...\n"
        yield log_content, state, gr.update(), ""
        subagent = CodeAgent(
            tools=[web_search],
            model=subagent_model,
            add_base_tools=False,
            max_steps=2  # keep each agent cheap: at most two reasoning/tool steps
        )
        prompt = SUBAGENT_DIRECTION.format(
            user_query=state.get("final_topic", ""),
            research_plan=state.get("research_plan", ""),
            subtask_id=t_id,
            subtask_title=t_title,
            subtask_description=t_desc
        )
        try:
            finding = subagent.run(prompt)
            current_findings.append(f"FINDINGS FOR TASK {t_id}: {t_title}\n\n{finding}")
            log_content += f"✅ {t_title} complete!\n\n"
            yield log_content, state, gr.update(), ""
        except Exception as e:
            log_content += f"❌ {t_title} failed: {e}\n\n"
            yield log_content, state, gr.update(), ""
    # Synthesis phase: the coordinator model merges all findings into one report.
    log_content += "### ✨ Synthesis: Generating Final Report...\n"
    yield log_content, state, gr.update(), ""
    sys_prompt = COORDINATOR_DIRECTION.format(
        user_query=state.get("final_topic", ""),
        research_plan=state.get("research_plan", ""),
        subtasks_json=json.dumps(subtasks, indent=2)
    )
    user_prompt = f"Synthesize these findings:\n\n" + "\n\n".join(current_findings)
    try:
        response = coordinator_model(messages=[
            {"role": "system", "content": sys_prompt},
            {"role": "user", "content": user_prompt}
        ])
        final_report = response.content
        # Strip chain-of-thought if the model emits a <think>...</think> block.
        if "<think>" in final_report and "</think>" in final_report:
            final_report = final_report.split("</think>")[-1].strip()
        log_content += "### 🖋️ Review: Polishing and Finalizing...\n"
        yield log_content, state, gr.update(), ""
        reviewer = Reviewer(model_name=REVIEWER_MODEL, hf_key=clean_hf)
        polished_report = reviewer.review(final_report)
        state["final_report"] = polished_report
        # Persist artifacts for the download widgets; PDF generation is
        # best-effort (the path is stored only when generate_pdf succeeds).
        os.makedirs("temp_outputs", exist_ok=True)
        ts = int(time.time())
        pdf_name = f"research_{ts}.pdf"
        pdf_path = os.path.join("temp_outputs", pdf_name)
        if reviewer.generate_pdf(polished_report, pdf_path):
            state["pdf_path"] = pdf_path
        md_name = f"research_{ts}.md"
        md_path = os.path.join("temp_outputs", md_name)
        with open(md_path, "w", encoding="utf-8") as f:
            f.write(polished_report)
        state["md_path"] = md_path
        # Final yield: reveal step 5 and show the polished report.
        yield log_content + "✅ Research mission accomplished!", state, gr.update(visible=True), polished_report
    except Exception as e:
        yield log_content + f"❌ Synthesis failed: {e}", state, gr.update(), ""
def reset_all():
    """Return the UI to its initial state: clear and show the topic box, hide
    steps 2-5, wipe session state, and reset the status and report panels."""
    def hide():
        return gr.update(visible=False)

    return (
        gr.update(value="", visible=True),    # topic input: cleared, shown
        hide(),                               # step 2 column
        hide(),                               # step 3 column
        hide(),                               # step 4 column
        hide(),                               # step 5 column
        {},                                   # fresh session state
        "### 🧬 Status\nReady to research.",  # status/log panel
        "",                                   # final report display
    )
with gr.Blocks(theme=theme, title="Deep Research Agent") as demo:
    # Per-session state dict shared by all callbacks.
    state = gr.State({})
    gr.Markdown("# 🧬 Deep Research Agent")
    gr.Markdown("### The ultimate AI research pipeline that browses the web for you.")
    with gr.Row():
        # Left panel: credentials, reset button, and the live status/log area.
        with gr.Column(scale=1, variant="panel"):
            gr.Markdown("## ⚙️ Configuration")
            hf_key_input = gr.Textbox(label="Hugging Face Token", type="password", value=HF_KEY)
            tavily_key_input = gr.Textbox(label="Tavily API Key", type="password", value=TAVILY_API_KEY)
            reset_btn = gr.Button("🔄 New Research", variant="secondary")
            gr.Markdown("---")
            log_output = gr.Markdown("### 🧬 Status\nReady to research.")
        # Right panel: the five wizard steps, shown/hidden as the flow advances.
        with gr.Column(scale=4):
            # STEP 1: Introduction
            with gr.Column(visible=True) as step1_col:
                gr.Markdown("## 1️⃣ What are you researching?")
                topic_input = gr.Textbox(label="Enter a broad topic or research question:", placeholder="e.g., Target market for sustainable polymers in Europe")
                start_btn = gr.Button("Clarify Topic ➡️", variant="primary")
            # STEP 2: Refinement
            with gr.Column(visible=False) as step2_col:
                gr.Markdown("## 2️⃣ Refine Your Topic")
                suggestion_display = gr.Markdown()
                with gr.Row():
                    opt_index = gr.Dropdown(choices=["1", "2", "3"], label="Select Option (Optional)")
                    custom_topic_input = gr.Textbox(label="Or type a custom refined topic:")
                refine_btn = gr.Button("Set Strategic Topic 🎯", variant="primary")
            # STEP 3: Planning & Splitting
            with gr.Column(visible=False) as step3_col:
                gr.Markdown("## 3️⃣ Strategy & Task Splitting")
                target_display = gr.Markdown()
                strat_btn = gr.Button("Generate Strategy 📋", variant="primary")
                strategy_display = gr.Markdown()
                # Revealed by generate_strategy on success.
                split_btn = gr.Button("Decompose into Subtasks 🧩", variant="primary", visible=False)
                tasks_display = gr.Markdown()
                # Revealed by decompose_tasks on success.
                execute_btn = gr.Button("🚀 Launch Research Agents", variant="primary", visible=False)
            # STEP 4: Execution
            with gr.Column(visible=False) as step4_col:
                gr.Markdown("## 4️⃣ Agentic Research in Progress")
                gr.Markdown("Monitoring agent fleet... check the status panel on the left.")
            # STEP 5: Results
            with gr.Column(visible=False) as step5_col:
                gr.Markdown("## 🏁 Final Research Report")
                final_md_display = gr.Markdown()
                with gr.Row():
                    download_md = gr.File(label="Download Markdown")
                    download_pdf = gr.File(label="Download PDF")
                new_research_btn = gr.Button("Start Over", variant="primary")
    # --- Callbacks ---
    # Step 1 -> 2: clarify the topic.
    start_btn.click(
        start_clarification,
        inputs=[topic_input, hf_key_input, state],
        outputs=[step2_col, suggestion_display, state, step1_col]
    )
    # Step 2 -> 3: lock in the final topic.
    refine_btn.click(
        select_suggestion,
        inputs=[opt_index, custom_topic_input, state],
        outputs=[step3_col, target_display, state, step2_col]
    )
    strat_btn.click(
        generate_strategy,
        inputs=[hf_key_input, state],
        outputs=[strategy_display, state, split_btn]
    )
    split_btn.click(
        decompose_tasks,
        inputs=[hf_key_input, state],
        outputs=[tasks_display, state, execute_btn]
    )
    # Step 4: reveal the progress panel, stream the research run, then wire
    # the produced file paths into the download widgets.
    execute_btn.click(
        lambda: gr.update(visible=True), outputs=step4_col
    ).then(
        run_research,
        inputs=[hf_key_input, tavily_key_input, state],
        outputs=[log_output, state, step5_col, final_md_display]
    ).then(
        lambda s: (s.get("md_path"), s.get("pdf_path")),
        inputs=[state],
        outputs=[download_md, download_pdf]
    )
    # Both reset buttons perform the same full UI reset.
    reset_btn.click(
        reset_all,
        outputs=[topic_input, step2_col, step3_col, step4_col, step5_col, state, log_output, final_md_display]
    )
    new_research_btn.click(
        reset_all,
        outputs=[topic_input, step2_col, step3_col, step4_col, step5_col, state, log_output, final_md_display]
    )
if __name__ == "__main__":
    # Queuing is required so the streaming generator callback can yield
    # incremental progress updates; queue() returns the Blocks instance.
    demo.queue().launch()