# research_agent / app.py
# Author: Luis Vizcaya
# Commit 915ebc8: Use Qwen2.5-7B for all agents to resolve stability issues
import os
import gradio as gr
import json
import logging
import time
from typing import List, Dict, Generator
from dotenv import load_dotenv
# Try to use truststore for corporate network compatibility (local only).
# SPACE_ID is set by Hugging Face Spaces, so this branch only runs locally.
if not os.getenv("SPACE_ID"):
    try:
        import truststore
        truststore.inject_into_ssl()
        print("💡 Truststore injected (Corporate SSL mode)")
    except ImportError:
        # truststore not installed — fall back to default SSL certificate handling.
        pass
# Import our agents from the src directory
from src.clarifier import Clarifier
from src.planner import Planner
from src.splitter import Splitter
from src.coordinator import Coordinator
from src.reviewer import Reviewer
# Configuration and Secrets
load_dotenv()  # pull values from a local .env file into the process environment
def get_secret(key):
    """Return the environment variable *key*, stripped, or "" when unset."""
    raw = os.getenv(key)
    if raw is None:
        return ""
    return raw.strip()
# Secrets default to "" (never None) so later .strip()/truthiness checks are safe.
HF_KEY = get_secret("HF_KEY")
TAVILY_API_KEY = get_secret("TAVILY_API_KEY")
# Logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- MODELS ---
# Using the most stable and powerful mid-sized model for all tasks
STABLE_MODEL = 'Qwen/Qwen2.5-7B-Instruct'
# Per-role aliases are kept separate so individual agents can be re-pointed
# at different models later without touching the agent code.
CLARIFIER_MODEL = STABLE_MODEL
PLANNER_MODEL = STABLE_MODEL
SPLITTER_MODEL = STABLE_MODEL
COORDINATOR_MODEL = STABLE_MODEL
SUBAGENT_MODEL = STABLE_MODEL
REVIEWER_MODEL = STABLE_MODEL
# --- GRADIO THEME ---
# Soft blue/slate palette with the Inter font for a clean dashboard look.
theme = gr.themes.Soft(
    primary_hue="blue",
    secondary_hue="slate",
    neutral_hue="slate",
    font=[gr.themes.GoogleFont('Inter'), 'sans-serif'],
)
def start_clarification(topic, hf_key, state):
    """Step 1 -> 2: validate inputs and fetch topic-refinement suggestions.

    Returns a 4-tuple matching the callback outputs:
    (step2 visibility update, markdown message, state, step1 visibility update).
    """
    # Fix: reject whitespace-only topics, and store the stripped topic so
    # stray whitespace never reaches the downstream prompts.
    clean_topic = topic.strip() if topic else ""
    if not clean_topic:
        return gr.update(), "### ⚠️ Warning\nPlease enter a topic.", state, gr.update()
    clean_key = hf_key.strip() if hf_key else ""
    if not clean_key:
        return gr.update(), "### ⚠️ Warning\nPlease provide a valid Hugging Face Token.", state, gr.update()
    state["initial_topic"] = clean_topic
    clarifier = Clarifier(model_name=CLARIFIER_MODEL, hf_key=clean_key)
    try:
        suggestions = clarifier.get_suggestions(clean_topic)
        state["suggestions"] = suggestions
        if not suggestions:
            return gr.update(), "### ❌ Error\nNo suggestions received. This could be due to model traffic. Please try again in 5 seconds.", state, gr.update()
        suggestion_md = "### 💡 Refine Your Topic\n\nChoose one of the suggested directions or enter a custom one below:\n\n"
        for i, s in enumerate(suggestions):
            suggestion_md += f"**Option {i+1}: {s['title']}**\n{s['description']}\n\n"
        # Success: reveal step 2, hide step 1.
        return gr.update(visible=True), suggestion_md, state, gr.update(visible=False)
    except Exception as e:
        # Fix: record the full traceback instead of silently discarding it.
        logger.exception("Clarification failed for topic %r", clean_topic)
        error_msg = str(e)
        if "401" in error_msg:
            error_msg = "401 Unauthorized: Your Hugging Face Token is invalid for the Inference API. Ensure it is a 'Write' token or has 'Inference' scope enabled."
        return gr.update(), f"### ❌ Error\n{error_msg}", state, gr.update()
def select_suggestion(index, custom_topic, state):
    """Step 2 -> 3: lock in the final research topic.

    A non-empty custom topic always wins over the dropdown selection.
    Returns (step3 visibility, target markdown, state, step2 visibility).
    """
    suggestions = state.get("suggestions", [])
    custom = custom_topic.strip() if custom_topic else ""
    if custom:
        # Fix: store the *stripped* topic (the original stored the raw text
        # even though it validated with .strip()).
        state["final_topic"] = custom
    else:
        # Fix: guard the int() conversion — an unexpected dropdown value
        # previously raised ValueError instead of showing the warning.
        try:
            pos = int(index) - 1
        except (TypeError, ValueError):
            pos = -1
        if 0 <= pos < len(suggestions):
            sug = suggestions[pos]
            state["final_topic"] = f"{sug['title']}: {sug['description']}"
        else:
            return gr.update(), "### ⚠️ Warning\nPlease select an option or enter a custom topic.", state, gr.update()
    return gr.update(visible=True), f"### 🎯 Target Topic\n**{state['final_topic']}**", state, gr.update(visible=False)
def generate_strategy(hf_key, state):
    """Step 3a: ask the Planner for a research strategy on the final topic.

    Returns (strategy markdown, state, visibility update for the split button).
    """
    token = (hf_key or "").strip()
    if not token:
        return "### ⚠️ Warning\nHF Key missing.", state, gr.update()
    planner = Planner(model_name=PLANNER_MODEL, hf_key=token)
    try:
        strategy = planner.plan(state["final_topic"])
    except Exception as e:
        return f"### ❌ Error\n{str(e)}", state, gr.update()
    state["research_plan"] = strategy
    return strategy, state, gr.update(visible=True)
def decompose_tasks(hf_key, state):
    """Step 3b: split the research plan into subtasks via the Splitter.

    Returns (subtask-list markdown, state, visibility update for execute button).
    """
    token = (hf_key or "").strip()
    if not token:
        return "### ⚠️ Warning\nHF Key missing.", state, gr.update()
    splitter = Splitter(model_name=SPLITTER_MODEL, hf_key=token)
    try:
        items = splitter.split(state["research_plan"])
    except Exception as e:
        return f"### ❌ Error\n{str(e)}", state, gr.update()
    state["subtasks"] = items
    # Render one bullet per subtask; joined output matches the original format.
    bullets = ["### 📋 Generated Subtasks\n"]
    for item in items:
        bullets.append(f"- **{item['title']}** (ID: `{item['id']}`)")
    return "\n".join(bullets) + "\n", state, gr.update(visible=True)
def run_research(hf_key, tavily_key, state):
    """Step 4: run one web-searching sub-agent per subtask, synthesize the
    findings into a report, polish it, and export Markdown + PDF artifacts.

    Generator: yields (log markdown, state, step5 visibility update, report
    markdown) after each progress step so Gradio can stream the status panel.
    """
    # Heavy dependencies are imported lazily so the UI starts fast.
    from smolagents import CodeAgent, tool, InferenceClientModel
    from src.prompts import SUBAGENT_DIRECTION, COORDINATOR_DIRECTION
    from tavily import TavilyClient
    clean_hf = hf_key.strip() if hf_key else ""
    clean_tavily = tavily_key.strip() if tavily_key else ""
    if not clean_hf:
        yield "### ❌ Error\nHF Token missing.", state, gr.update(), ""
        return
    if not clean_tavily:
        yield "### ❌ Error\nTavily API Key missing.", state, gr.update(), ""
        return
    tavily_client = TavilyClient(api_key=clean_tavily)

    @tool
    def web_search(query: str) -> str:
        """
        Search the web for real-time information using Tavily.
        Args:
            query: The search query to look up.
        """
        try:
            response = tavily_client.search(query=query, search_depth="advanced", max_results=5)
            results = response.get("results", [])
            formatted = [f"Title: {r['title']}\nURL: {r['url']}\nContent: {r['content']}\n" for r in results]
            return "\n---\n".join(formatted) if formatted else "No results."
        except Exception as e:
            # Best-effort tool: report the failure as text so the agent can react.
            return f"Search failed: {e}"

    coordinator_model = InferenceClientModel(model_id=COORDINATOR_MODEL, api_key=clean_hf)
    subagent_model = InferenceClientModel(model_id=SUBAGENT_MODEL, api_key=clean_hf)
    current_findings = []
    log_content = "### 🔍 Agentic Research Progress\n\n"
    subtasks = state.get("subtasks", [])
    if not subtasks:
        yield "### ❌ Error\nNo subtasks found.", state, gr.update(), ""
        return
    for i, task in enumerate(subtasks):
        t_id = task['id']
        t_title = task['title']
        t_desc = task['description']
        log_content += f"**Agent {i+1} working on:** {t_title}...\n"
        yield log_content, state, gr.update(), ""
        # Fresh agent per subtask so no search context bleeds between tasks.
        subagent = CodeAgent(
            tools=[web_search],
            model=subagent_model,
            add_base_tools=False,
            max_steps=2
        )
        prompt = SUBAGENT_DIRECTION.format(
            user_query=state.get("final_topic", ""),
            research_plan=state.get("research_plan", ""),
            subtask_id=t_id,
            subtask_title=t_title,
            subtask_description=t_desc
        )
        try:
            finding = subagent.run(prompt)
            current_findings.append(f"FINDINGS FOR TASK {t_id}: {t_title}\n\n{finding}")
            log_content += f"✅ {t_title} complete!\n\n"
            yield log_content, state, gr.update(), ""
        except Exception as e:
            # Fix: keep the traceback in the server log, not just the UI string.
            logger.exception("Subtask %s (%s) failed", t_id, t_title)
            log_content += f"❌ {t_title} failed: {e}\n\n"
            yield log_content, state, gr.update(), ""
    # Fix: abort instead of synthesizing from an empty findings list — the
    # coordinator would otherwise fabricate a report with no source material.
    if not current_findings:
        yield log_content + "❌ All subtasks failed; nothing to synthesize.", state, gr.update(), ""
        return
    log_content += "### ✨ Synthesis: Generating Final Report...\n"
    yield log_content, state, gr.update(), ""
    sys_prompt = COORDINATOR_DIRECTION.format(
        user_query=state.get("final_topic", ""),
        research_plan=state.get("research_plan", ""),
        subtasks_json=json.dumps(subtasks, indent=2)
    )
    # Fix: plain string — the original used an f-string with no placeholders.
    user_prompt = "Synthesize these findings:\n\n" + "\n\n".join(current_findings)
    try:
        response = coordinator_model(messages=[
            {"role": "system", "content": sys_prompt},
            {"role": "user", "content": user_prompt}
        ])
        final_report = response.content
        # Strip any chain-of-thought block emitted by reasoning models.
        if "<think>" in final_report and "</think>" in final_report:
            final_report = final_report.split("</think>")[-1].strip()
        log_content += "### 🖋️ Review: Polishing and Finalizing...\n"
        yield log_content, state, gr.update(), ""
        reviewer = Reviewer(model_name=REVIEWER_MODEL, hf_key=clean_hf)
        polished_report = reviewer.review(final_report)
        state["final_report"] = polished_report
        # Persist downloadable artifacts, timestamped to avoid collisions.
        os.makedirs("temp_outputs", exist_ok=True)
        ts = int(time.time())
        pdf_path = os.path.join("temp_outputs", f"research_{ts}.pdf")
        if reviewer.generate_pdf(polished_report, pdf_path):
            state["pdf_path"] = pdf_path
        md_path = os.path.join("temp_outputs", f"research_{ts}.md")
        with open(md_path, "w", encoding="utf-8") as f:
            f.write(polished_report)
        state["md_path"] = md_path
        yield log_content + "✅ Research mission accomplished!", state, gr.update(visible=True), polished_report
    except Exception as e:
        logger.exception("Synthesis failed")
        yield log_content + f"❌ Synthesis failed: {e}", state, gr.update(), ""
def reset_all():
    """Reset the wizard: clear the topic box, hide steps 2-5, wipe state/outputs."""
    # One fresh update object per hidden step column (steps 2-5).
    hidden_steps = tuple(gr.update(visible=False) for _ in range(4))
    return (
        gr.update(value="", visible=True),  # topic input: cleared and visible again
        *hidden_steps,
        {},                                 # brand-new session state
        "### 🧬 Status\nReady to research.",
        "",
    )
# UI layout: a left config panel plus a five-step wizard on the right.
# Step columns are toggled visible/hidden by the callbacks below.
with gr.Blocks(theme=theme, title="Deep Research Agent") as demo:
    # Per-session pipeline state: topic, suggestions, plan, subtasks, report paths.
    state = gr.State({})
    gr.Markdown("# 🧬 Deep Research Agent")
    gr.Markdown("### The ultimate AI research pipeline that browses the web for you.")
    with gr.Row():
        # Left panel: credentials (pre-filled from env), reset, live status log.
        with gr.Column(scale=1, variant="panel"):
            gr.Markdown("## ⚙️ Configuration")
            hf_key_input = gr.Textbox(label="Hugging Face Token", type="password", value=HF_KEY)
            tavily_key_input = gr.Textbox(label="Tavily API Key", type="password", value=TAVILY_API_KEY)
            reset_btn = gr.Button("🔄 New Research", variant="secondary")
            gr.Markdown("---")
            log_output = gr.Markdown("### 🧬 Status\nReady to research.")
        with gr.Column(scale=4):
            # STEP 1: Introduction
            with gr.Column(visible=True) as step1_col:
                gr.Markdown("## 1️⃣ What are you researching?")
                topic_input = gr.Textbox(label="Enter a broad topic or research question:", placeholder="e.g., Target market for sustainable polymers in Europe")
                start_btn = gr.Button("Clarify Topic ➡️", variant="primary")
            # STEP 2: Refinement
            with gr.Column(visible=False) as step2_col:
                gr.Markdown("## 2️⃣ Refine Your Topic")
                suggestion_display = gr.Markdown()
                with gr.Row():
                    opt_index = gr.Dropdown(choices=["1", "2", "3"], label="Select Option (Optional)")
                    custom_topic_input = gr.Textbox(label="Or type a custom refined topic:")
                refine_btn = gr.Button("Set Strategic Topic 🎯", variant="primary")
            # STEP 3: Planning & Splitting
            with gr.Column(visible=False) as step3_col:
                gr.Markdown("## 3️⃣ Strategy & Task Splitting")
                target_display = gr.Markdown()
                strat_btn = gr.Button("Generate Strategy 📋", variant="primary")
                strategy_display = gr.Markdown()
                split_btn = gr.Button("Decompose into Subtasks 🧩", variant="primary", visible=False)
                tasks_display = gr.Markdown()
                execute_btn = gr.Button("🚀 Launch Research Agents", variant="primary", visible=False)
            # STEP 4: Execution
            with gr.Column(visible=False) as step4_col:
                gr.Markdown("## 4️⃣ Agentic Research in Progress")
                gr.Markdown("Monitoring agent fleet... check the status panel on the left.")
            # STEP 5: Results
            with gr.Column(visible=False) as step5_col:
                gr.Markdown("## 🏁 Final Research Report")
                final_md_display = gr.Markdown()
                with gr.Row():
                    download_md = gr.File(label="Download Markdown")
                    download_pdf = gr.File(label="Download PDF")
                new_research_btn = gr.Button("Start Over", variant="primary")
    # --- Callbacks ---
    # Step 1 -> 2: fetch suggestions, reveal step 2, hide step 1.
    start_btn.click(
        start_clarification,
        inputs=[topic_input, hf_key_input, state],
        outputs=[step2_col, suggestion_display, state, step1_col]
    )
    # Step 2 -> 3: lock in the final topic.
    refine_btn.click(
        select_suggestion,
        inputs=[opt_index, custom_topic_input, state],
        outputs=[step3_col, target_display, state, step2_col]
    )
    # Step 3a: generate the strategy, then reveal the split button.
    strat_btn.click(
        generate_strategy,
        inputs=[hf_key_input, state],
        outputs=[strategy_display, state, split_btn]
    )
    # Step 3b: split into subtasks, then reveal the execute button.
    split_btn.click(
        decompose_tasks,
        inputs=[hf_key_input, state],
        outputs=[tasks_display, state, execute_btn]
    )
    # Step 4: show the progress column, stream run_research (a generator),
    # then bind the produced file paths to the download widgets.
    execute_btn.click(
        lambda: gr.update(visible=True), outputs=step4_col
    ).then(
        run_research,
        inputs=[hf_key_input, tavily_key_input, state],
        outputs=[log_output, state, step5_col, final_md_display]
    ).then(
        lambda s: (s.get("md_path"), s.get("pdf_path")),
        inputs=[state],
        outputs=[download_md, download_pdf]
    )
    # Both reset buttons restore the initial wizard state.
    reset_btn.click(
        reset_all,
        outputs=[topic_input, step2_col, step3_col, step4_col, step5_col, state, log_output, final_md_display]
    )
    new_research_btn.click(
        reset_all,
        outputs=[topic_input, step2_col, step3_col, step4_col, step5_col, state, log_output, final_md_display]
    )
if __name__ == "__main__":
    # Enable the request queue so generator callbacks (run_research) can stream.
    demo.queue()
    demo.launch()