"""Deep Research Agent — Gradio front-end for a multi-agent research pipeline.

Pipeline: Clarifier (topic refinement) -> Planner (strategy) -> Splitter
(subtask decomposition) -> per-subtask smolagents CodeAgent with Tavily web
search -> Coordinator model (synthesis) -> Reviewer (polish + PDF/MD export).
"""

import os
import json
import logging
import time
from typing import Dict, Generator, List

import gradio as gr
from dotenv import load_dotenv

# Try to use truststore for corporate network compatibility (local only).
# SPACE_ID is set on Hugging Face Spaces, where injection must be skipped.
if not os.getenv("SPACE_ID"):
    try:
        import truststore

        truststore.inject_into_ssl()
        print("💡 Truststore injected (Corporate SSL mode)")
    except ImportError:
        pass

# Import our agents from the src directory
from src.clarifier import Clarifier
from src.planner import Planner
from src.splitter import Splitter
from src.coordinator import Coordinator
from src.reviewer import Reviewer

# Configuration and Secrets
load_dotenv()


def get_secret(key: str) -> str:
    """Return environment variable *key* with whitespace stripped, or "" if unset."""
    val = os.getenv(key)
    return val.strip() if val is not None else ""


HF_KEY = get_secret("HF_KEY")
TAVILY_API_KEY = get_secret("TAVILY_API_KEY")

# Logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- MODELS ---
# Using the most stable and powerful mid-sized model for all tasks
STABLE_MODEL = 'Qwen/Qwen2.5-7B-Instruct'
CLARIFIER_MODEL = STABLE_MODEL
PLANNER_MODEL = STABLE_MODEL
SPLITTER_MODEL = STABLE_MODEL
COORDINATOR_MODEL = STABLE_MODEL
SUBAGENT_MODEL = STABLE_MODEL
REVIEWER_MODEL = STABLE_MODEL

# --- GRADIO THEME ---
theme = gr.themes.Soft(
    primary_hue="blue",
    secondary_hue="slate",
    neutral_hue="slate",
    font=[gr.themes.GoogleFont('Inter'), 'sans-serif'],
)


def start_clarification(topic, hf_key, state):
    """Step 1 -> 2: ask the Clarifier for refinement suggestions on *topic*.

    Returns (step2 visibility update, suggestions markdown, state,
    step1 visibility update).
    """
    if not topic:
        return gr.update(), "### ⚠️ Warning\nPlease enter a topic.", state, gr.update()
    clean_key = hf_key.strip() if hf_key else ""
    if not clean_key:
        return gr.update(), "### ⚠️ Warning\nPlease provide a valid Hugging Face Token.", state, gr.update()

    state["initial_topic"] = topic
    clarifier = Clarifier(model_name=CLARIFIER_MODEL, hf_key=clean_key)
    try:
        suggestions = clarifier.get_suggestions(topic)
        state["suggestions"] = suggestions
        if not suggestions:
            return gr.update(), "### ❌ Error\nNo suggestions received. \nThis could be due to model traffic. Please try again in 5 seconds.", state, gr.update()

        suggestion_md = "### 💡 Refine Your Topic\n\nChoose one of the suggested directions or enter a custom one below:\n\n"
        for i, s in enumerate(suggestions):
            suggestion_md += f"**Option {i+1}: {s['title']}**\n{s['description']}\n\n"
        # Reveal step 2, hide step 1.
        return gr.update(visible=True), suggestion_md, state, gr.update(visible=False)
    except Exception as e:
        error_msg = str(e)
        # Translate the most common auth failure into actionable guidance.
        if "401" in error_msg:
            error_msg = "401 Unauthorized: Your Hugging Face Token is invalid for the Inference API. Ensure it is a 'Write' token or has 'Inference' scope enabled."
        return gr.update(), f"### ❌ Error\n{error_msg}", state, gr.update()


def select_suggestion(index, custom_topic, state):
    """Step 2 -> 3: lock in the final topic from a suggestion or custom text.

    A non-empty *custom_topic* wins over the dropdown *index* (1-based string).
    Returns (step3 update, target markdown, state, step2 update).
    """
    if custom_topic and custom_topic.strip():
        # Store the stripped value so stray whitespace never reaches the prompts.
        state["final_topic"] = custom_topic.strip()
    elif index is not None and 0 <= int(index) - 1 < len(state.get("suggestions", [])):
        sug = state["suggestions"][int(index) - 1]
        state["final_topic"] = f"{sug['title']}: {sug['description']}"
    else:
        return gr.update(), "### ⚠️ Warning\nPlease select an option or enter a custom topic.", state, gr.update()
    return gr.update(visible=True), f"### 🎯 Target Topic\n**{state['final_topic']}**", state, gr.update(visible=False)


def generate_strategy(hf_key, state):
    """Step 3a: produce a research plan for the chosen topic.

    Returns (plan markdown, state, split-button visibility update).
    """
    clean_key = hf_key.strip() if hf_key else ""
    if not clean_key:
        return "### ⚠️ Warning\nHF Key missing.", state, gr.update()
    planner = Planner(model_name=PLANNER_MODEL, hf_key=clean_key)
    try:
        plan = planner.plan(state["final_topic"])
        state["research_plan"] = plan
        return plan, state, gr.update(visible=True)
    except Exception as e:
        return f"### ❌ Error\n{str(e)}", state, gr.update()


def decompose_tasks(hf_key, state):
    """Step 3b: split the research plan into subtasks for the agent fleet.

    Returns (tasks markdown, state, execute-button visibility update).
    """
    clean_key = hf_key.strip() if hf_key else ""
    if not clean_key:
        return "### ⚠️ Warning\nHF Key missing.", state, gr.update()
    splitter = Splitter(model_name=SPLITTER_MODEL, hf_key=clean_key)
    try:
        subtasks = splitter.split(state["research_plan"])
        state["subtasks"] = subtasks
        tasks_md = "### 📋 Generated Subtasks\n\n"
        for task in subtasks:
            tasks_md += f"- **{task['title']}** (ID: `{task['id']}`)\n"
        return tasks_md, state, gr.update(visible=True)
    except Exception as e:
        return f"### ❌ Error\n{str(e)}", state, gr.update()


def run_research(hf_key, tavily_key, state):
    """Step 4: run one CodeAgent per subtask, then synthesize and review.

    Generator: yields (log markdown, state, step5 visibility update,
    final report markdown) so the UI streams progress.
    """
    # Heavy deps are imported lazily so the app loads fast and these are only
    # required once the user actually launches a research run.
    from smolagents import CodeAgent, tool, InferenceClientModel
    from src.prompts import SUBAGENT_DIRECTION, COORDINATOR_DIRECTION
    from tavily import TavilyClient

    clean_hf = hf_key.strip() if hf_key else ""
    clean_tavily = tavily_key.strip() if tavily_key else ""
    if not clean_hf:
        yield "### ❌ Error\nHF Token missing.", state, gr.update(), ""
        return
    if not clean_tavily:
        yield "### ❌ Error\nTavily API Key missing.", state, gr.update(), ""
        return

    tavily_client = TavilyClient(api_key=clean_tavily)

    @tool
    def web_search(query: str) -> str:
        """
        Search the web for real-time information using Tavily.

        Args:
            query: The search query to look up.
        """
        try:
            response = tavily_client.search(query=query, search_depth="advanced", max_results=5)
            results = response.get("results", [])
            formatted = [f"Title: {r['title']}\nURL: {r['url']}\nContent: {r['content']}\n" for r in results]
            return "\n---\n".join(formatted) if formatted else "No results."
        except Exception as e:
            # Surface the failure to the agent as text rather than crashing the run.
            return f"Search failed: {e}"

    coordinator_model = InferenceClientModel(model_id=COORDINATOR_MODEL, api_key=clean_hf)
    subagent_model = InferenceClientModel(model_id=SUBAGENT_MODEL, api_key=clean_hf)

    current_findings = []
    log_content = "### 🔍 Agentic Research Progress\n\n"
    subtasks = state.get("subtasks", [])
    if not subtasks:
        yield "### ❌ Error\nNo subtasks found.", state, gr.update(), ""
        return

    # One fresh agent per subtask; failures are logged and the run continues.
    for i, task in enumerate(subtasks):
        t_id = task['id']
        t_title = task['title']
        t_desc = task['description']
        log_content += f"**Agent {i+1} working on:** {t_title}...\n"
        yield log_content, state, gr.update(), ""

        subagent = CodeAgent(
            tools=[web_search],
            model=subagent_model,
            add_base_tools=False,
            max_steps=2,
        )
        prompt = SUBAGENT_DIRECTION.format(
            user_query=state.get("final_topic", ""),
            research_plan=state.get("research_plan", ""),
            subtask_id=t_id,
            subtask_title=t_title,
            subtask_description=t_desc,
        )
        try:
            finding = subagent.run(prompt)
            current_findings.append(f"FINDINGS FOR TASK {t_id}: {t_title}\n\n{finding}")
            log_content += f"✅ {t_title} complete!\n\n"
            yield log_content, state, gr.update(), ""
        except Exception as e:
            log_content += f"❌ {t_title} failed: {e}\n\n"
            yield log_content, state, gr.update(), ""

    log_content += "### ✨ Synthesis: Generating Final Report...\n"
    yield log_content, state, gr.update(), ""

    sys_prompt = COORDINATOR_DIRECTION.format(
        user_query=state.get("final_topic", ""),
        research_plan=state.get("research_plan", ""),
        subtasks_json=json.dumps(subtasks, indent=2),
    )
    user_prompt = "Synthesize these findings:\n\n" + "\n\n".join(current_findings)

    try:
        response = coordinator_model(messages=[
            {"role": "system", "content": sys_prompt},
            {"role": "user", "content": user_prompt},
        ])
        final_report = response.content
        # Strip a leading <think>...</think> reasoning block if the model
        # emitted one. BUGFIX: the original tested '"" in final_report' (always
        # True) and called split(""), which raises ValueError: empty separator,
        # so synthesis could never succeed.
        if "<think>" in final_report and "</think>" in final_report:
            final_report = final_report.split("</think>")[-1].strip()

        log_content += "### 🖋️ Review: Polishing and Finalizing...\n"
        yield log_content, state, gr.update(), ""

        reviewer = Reviewer(model_name=REVIEWER_MODEL, hf_key=clean_hf)
        polished_report = reviewer.review(final_report)
        state["final_report"] = polished_report

        # Persist Markdown + (best-effort) PDF for the download widgets.
        os.makedirs("temp_outputs", exist_ok=True)
        ts = int(time.time())
        pdf_name = f"research_{ts}.pdf"
        pdf_path = os.path.join("temp_outputs", pdf_name)
        if reviewer.generate_pdf(polished_report, pdf_path):
            state["pdf_path"] = pdf_path
        md_name = f"research_{ts}.md"
        md_path = os.path.join("temp_outputs", md_name)
        with open(md_path, "w", encoding="utf-8") as f:
            f.write(polished_report)
        state["md_path"] = md_path

        yield log_content + "✅ Research mission accomplished!", state, gr.update(visible=True), polished_report
    except Exception as e:
        yield log_content + f"❌ Synthesis failed: {e}", state, gr.update(), ""


def reset_all():
    """Reset the UI to step 1: clear topic/state/report and hide steps 2-5.

    Return order matches the outputs list wired in the callbacks below:
    (topic_input, step2_col, step3_col, step4_col, step5_col, state,
    log_output, final_md_display).
    """
    return (
        gr.update(value="", visible=True),
        gr.update(visible=False),
        gr.update(visible=False),
        gr.update(visible=False),
        gr.update(visible=False),
        {},
        "### 🧬 Status\nReady to research.",
        "",
    )


with gr.Blocks(theme=theme, title="Deep Research Agent") as demo:
    state = gr.State({})

    gr.Markdown("# 🧬 Deep Research Agent")
    gr.Markdown("### The ultimate AI research pipeline that browses the web for you.")

    with gr.Row():
        # Left panel: credentials, reset, and the streaming status log.
        with gr.Column(scale=1, variant="panel"):
            gr.Markdown("## ⚙️ Configuration")
            hf_key_input = gr.Textbox(label="Hugging Face Token", type="password", value=HF_KEY)
            tavily_key_input = gr.Textbox(label="Tavily API Key", type="password", value=TAVILY_API_KEY)
            reset_btn = gr.Button("🔄 New Research", variant="secondary")
            gr.Markdown("---")
            log_output = gr.Markdown("### 🧬 Status\nReady to research.")

        # Right panel: the five wizard steps, revealed one after another.
        with gr.Column(scale=4):
            # STEP 1: Introduction
            with gr.Column(visible=True) as step1_col:
                gr.Markdown("## 1️⃣ What are you researching?")
                topic_input = gr.Textbox(label="Enter a broad topic or research question:", placeholder="e.g., Target market for sustainable polymers in Europe")
                start_btn = gr.Button("Clarify Topic ➡️", variant="primary")

            # STEP 2: Refinement
            with gr.Column(visible=False) as step2_col:
                gr.Markdown("## 2️⃣ Refine Your Topic")
                suggestion_display = gr.Markdown()
                with gr.Row():
                    opt_index = gr.Dropdown(choices=["1", "2", "3"], label="Select Option (Optional)")
                    custom_topic_input = gr.Textbox(label="Or type a custom refined topic:")
                refine_btn = gr.Button("Set Strategic Topic 🎯", variant="primary")

            # STEP 3: Planning & Splitting
            with gr.Column(visible=False) as step3_col:
                gr.Markdown("## 3️⃣ Strategy & Task Splitting")
                target_display = gr.Markdown()
                strat_btn = gr.Button("Generate Strategy 📋", variant="primary")
                strategy_display = gr.Markdown()
                split_btn = gr.Button("Decompose into Subtasks 🧩", variant="primary", visible=False)
                tasks_display = gr.Markdown()
                execute_btn = gr.Button("🚀 Launch Research Agents", variant="primary", visible=False)

            # STEP 4: Execution
            with gr.Column(visible=False) as step4_col:
                gr.Markdown("## 4️⃣ Agentic Research in Progress")
                gr.Markdown("Monitoring agent fleet... check the status panel on the left.")

            # STEP 5: Results
            with gr.Column(visible=False) as step5_col:
                gr.Markdown("## 🏁 Final Research Report")
                final_md_display = gr.Markdown()
                with gr.Row():
                    download_md = gr.File(label="Download Markdown")
                    download_pdf = gr.File(label="Download PDF")
                new_research_btn = gr.Button("Start Over", variant="primary")

    # --- Callbacks ---
    start_btn.click(
        start_clarification,
        inputs=[topic_input, hf_key_input, state],
        outputs=[step2_col, suggestion_display, state, step1_col]
    )
    refine_btn.click(
        select_suggestion,
        inputs=[opt_index, custom_topic_input, state],
        outputs=[step3_col, target_display, state, step2_col]
    )
    strat_btn.click(
        generate_strategy,
        inputs=[hf_key_input, state],
        outputs=[strategy_display, state, split_btn]
    )
    split_btn.click(
        decompose_tasks,
        inputs=[hf_key_input, state],
        outputs=[tasks_display, state, execute_btn]
    )
    # Show the progress panel, stream the research run, then expose the files.
    execute_btn.click(
        lambda: gr.update(visible=True),
        outputs=step4_col
    ).then(
        run_research,
        inputs=[hf_key_input, tavily_key_input, state],
        outputs=[log_output, state, step5_col, final_md_display]
    ).then(
        lambda s: (s.get("md_path"), s.get("pdf_path")),
        inputs=[state],
        outputs=[download_md, download_pdf]
    )
    reset_btn.click(
        reset_all,
        outputs=[topic_input, step2_col, step3_col, step4_col, step5_col, state, log_output, final_md_display]
    )
    new_research_btn.click(
        reset_all,
        outputs=[topic_input, step2_col, step3_col, step4_col, step5_col, state, log_output, final_md_display]
    )


if __name__ == "__main__":
    demo.queue()
    demo.launch()