| import json |
| import os |
| import re |
| from dataclasses import dataclass, field |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional, Set, Tuple |
|
|
| import gradio as gr |
| import requests |
| from bs4 import BeautifulSoup |
| from duckduckgo_search import DDGS |
| from huggingface_hub import InferenceClient |
|
|
|
|
# Model ids offered in the UI dropdown; the first entry is the built-in default.
DEFAULT_FREE_MODELS = [
    "Qwen/Qwen3-8B",
    "google/gemma-3-12b-it",
    "deepseek-ai/DeepSeek-R1-Distill-Qwen-7B",
    "Qwen/Qwen2.5-7B-Instruct",
    "meta-llama/Llama-3.1-8B-Instruct",
]

# Each setting below can be overridden by an environment variable of the same name.
DEFAULT_MODEL = os.environ.get("DEFAULT_MODEL", DEFAULT_FREE_MODELS[0])

# Targets of the resource links shown in the UI; "#" is the fallback link target.
PAPER_URL = os.environ.get("PAPER_URL", "#")
CODE_URL = os.environ.get("CODE_URL", "#")
DATASET_URL = os.environ.get("DATASET_URL", "#")
MODEL_URL = os.environ.get("MODEL_URL", "#")
|
|
# System message sent as the first chat turn. It defines the text protocol the
# agent loop parses: one <tool_call> JSON block per turn, or a final <answer>.
SYSTEM_PROMPT = """You are a Deep Research assistant.
You can think step by step, use tools, and then return a final answer.

Tool protocol:
- To call a tool, output exactly one block:
<tool_call>
{"name":"search","arguments":{"query":"...","max_results":5}}
</tool_call>
or
<tool_call>
{"name":"visit","arguments":{"url":"...","max_chars":6000}}
</tool_call>

- When you are done, output:
<answer>
...final answer...
</answer>

Rules:
- Use tools when needed, but avoid repeated calls to the same URL/query.
- Cite useful URLs in your final answer.
- If a tool fails, recover and continue.
"""
|
|
|
|
# Wrapper for tool output fed back to the model; ``{payload}`` receives a JSON string.
TOOL_RESPONSE_TEMPLATE = """<tool_response>
{payload}
</tool_response>"""

# Process-wide memo tables for the search and visit tools, keyed by
# "<normalised input>::<limit>" strings built inside each tool function.
SEARCH_CACHE: Dict[str, Dict[str, Any]] = {}
VISIT_CACHE: Dict[str, Dict[str, Any]] = {}

# Absolute path of the logo image that sits in ./assets next to this file.
LOGO_PATH = str(Path(__file__).resolve().parent.joinpath("assets", "quest-logo.png"))
|
|
# Stylesheet handed to ``gr.Blocks(css=...)``: page font and background, the
# banner/section/icon-link classes used via ``elem_classes``, and overrides
# that remove borders and backgrounds from Gradio's default containers.
CUSTOM_CSS = """
@import url('https://fonts.googleapis.com/css2?family=Manrope:wght@400;500;600;700&display=swap');

.gradio-container {
max-width: 1200px !important;
font-family: 'Manrope', 'Inter', -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif !important;
background: #ffffff !important;
}

.gradio-container * {
font-family: inherit !important;
}

.banner-card {
padding: 18px 20px;
border-radius: 18px;
background: linear-gradient(100deg, #f8fbff 0%, #eef5ff 100%);
color: #0f172a;
border: 1px solid #dbe7ff;
box-shadow: 0 8px 24px rgba(15, 23, 42, 0.08);
margin-bottom: 14px;
}

.banner-inner {
display: flex;
align-items: center;
gap: 18px;
}

.banner-logo-wrap {
min-width: 250px;
}

.banner-logo-image img {
width: 100%;
max-width: 280px;
height: auto;
object-fit: contain;
border-radius: 10px;
background: #ffffff;
border: 1px solid #e5e7eb;
}

.banner-title {
font-size: 24px;
font-weight: 700;
line-height: 1.15;
color: #1e3a8a;
}

.banner-subtitle {
margin-top: 6px;
opacity: 0.9;
font-size: 14px;
color: #1f2937;
}

.section-card {
border: none !important;
border-radius: 0 !important;
padding: 0 !important;
background: transparent !important;
box-shadow: none !important;
}

.section-title {
font-size: 13px;
font-weight: 700;
color: #6b7280;
margin-bottom: 6px;
letter-spacing: 0.02em;
}

.layout-gap {
gap: 12px;
}

.right-stack > * {
margin-bottom: 8px;
}

.icon-grid {
display: grid;
grid-template-columns: repeat(2, minmax(0, 1fr));
gap: 8px;
}

.icon-link {
display: flex;
align-items: center;
justify-content: center;
gap: 6px;
padding: 7px 8px;
border: 1px solid #f1f5f9;
border-radius: 10px;
text-decoration: none !important;
color: #334155 !important;
background: #ffffff;
font-weight: 600;
font-size: 12px;
}

.icon-link:hover {
background: #f8fafc;
border-color: #e2e8f0;
}

.gradio-container .gr-group,
.gradio-container .gr-box,
.gradio-container .gr-panel {
border: none !important;
box-shadow: none !important;
background: transparent !important;
}

.gradio-container .tabs {
border: none !important;
background: transparent !important;
}

.gradio-container .tabitem {
border: none !important;
background: transparent !important;
}

.gradio-container .gr-form,
.gradio-container .form {
background: #ffffff !important;
}

.gradio-container textarea,
.gradio-container input {
background: #ffffff !important;
}
"""
|
|
|
|
@dataclass
class AgentState:
    """Mutable bookkeeping for a single research run.

    Every field starts empty and gets its own container per instance, so
    separate runs never share state.
    """

    # Queries passed to the search tool, in call order.
    searched_queries: List[str] = field(default_factory=list)
    # URLs passed to the visit tool, in call order.
    visited_urls: List[str] = field(default_factory=list)
    # Set twin of ``searched_queries`` used for constant-time duplicate checks.
    searched_query_set: Set[str] = field(default_factory=set)
    # Set twin of ``visited_urls`` used for constant-time duplicate checks.
    visited_url_set: Set[str] = field(default_factory=set)
    # One-line findings that are periodically replayed to the model.
    trusted_notes: List[str] = field(default_factory=list)
    # Chronological log of assistant outputs and tool responses.
    trace: List[Dict[str, Any]] = field(default_factory=list)
|
|
|
|
def extract_answer(text: str) -> Optional[str]:
    """Return the body of the first ``<answer>...</answer>`` block in *text*.

    Tag matching ignores case and the body may span several lines. The
    result has surrounding whitespace removed; ``None`` means no complete
    block was found.
    """
    found = re.search(
        r"<answer>\s*(.*?)\s*</answer>",
        text,
        flags=re.IGNORECASE | re.DOTALL,
    )
    if found is None:
        return None
    return found.group(1).strip()
|
|
|
|
def parse_tool_call(text: str) -> Tuple[Optional[str], Optional[Dict[str, Any]], Optional[str]]:
    """Extract the first ``<tool_call>`` block from *text* and decode it.

    Args:
        text: Raw model output.

    Returns:
        A ``(name, arguments, error)`` triple:

        * ``(None, None, None)`` when *text* contains no tool-call block;
        * ``(None, None, message)`` when a block exists but is malformed;
        * ``(name, arguments, None)`` on success. ``arguments`` defaults to
          an empty dict when the key is absent.
    """
    match = re.search(r"<tool_call>\s*(.*?)\s*</tool_call>", text, flags=re.DOTALL | re.IGNORECASE)
    if not match:
        return None, None, None

    payload = match.group(1).strip()
    try:
        data = json.loads(payload)
    except json.JSONDecodeError:
        return None, None, "Invalid JSON in <tool_call> block."

    format_error = "Invalid tool format. Expect name(str) and arguments(dict)."
    # Valid JSON is not necessarily an object: a list, string, number or null
    # has no .get() and must be reported as a format error, not crash the run.
    if not isinstance(data, dict):
        return None, None, format_error

    name = data.get("name")
    arguments = data.get("arguments", {})
    if not isinstance(name, str) or not isinstance(arguments, dict):
        return None, None, format_error
    return name, arguments, None
|
|
|
|
def run_search(query: str, max_results: int = 5) -> Dict[str, Any]:
    """Run a DuckDuckGo text search and return a JSON-serialisable result dict.

    Successful results are memoised in ``SEARCH_CACHE`` under a key built from
    the stripped, lower-cased query and ``max_results``. Failures are never
    cached, so a later call can retry.

    Args:
        query: Search string; blank input is rejected.
        max_results: Upper bound forwarded to ``DDGS.text``.

    Returns:
        On success ``{"ok": True, "query", "results", "cached"}`` where each
        result has ``title``, ``href`` and ``body`` keys. On failure
        ``{"ok": False, "error": ...}`` (plus ``"query"`` when the backend
        call itself failed). This function does not raise for search errors.
    """
    if not query.strip():
        return {"ok": False, "error": "Search query cannot be empty."}
    cache_key = f"{query.strip().lower()}::{max_results}"
    if cache_key in SEARCH_CACHE:
        return {**SEARCH_CACHE[cache_key], "cached": True}

    try:
        with DDGS() as ddgs:
            # ``or []`` tolerates a backend that returns None instead of a list.
            hits = ddgs.text(query, max_results=max_results) or []
            rows: List[Dict[str, str]] = [
                {
                    "title": item.get("title", ""),
                    "href": item.get("href", ""),
                    "body": item.get("body", ""),
                }
                for item in hits
            ]
    except Exception as exc:
        # Tool boundary: report the failure as data (same shape run_visit
        # uses) so the agent loop can continue instead of aborting the run.
        return {"ok": False, "query": query, "error": str(exc)}

    payload = {"ok": True, "query": query, "results": rows, "cached": False}
    SEARCH_CACHE[cache_key] = payload
    return payload
|
|
|
|
def _clean_html_to_text(html: str, max_chars: int) -> str:
    """Reduce an HTML document to plain text cut to ``max_chars`` characters.

    ``<script>``, ``<style>`` and ``<noscript>`` elements are removed before
    text extraction, and every run of whitespace becomes a single space.
    """
    document = BeautifulSoup(html, "html.parser")
    # These elements carry code or fallbacks rather than readable page text.
    for node in document.find_all(["script", "style", "noscript"]):
        node.decompose()
    visible = document.get_text(separator=" ", strip=True)
    collapsed = re.sub(r"\s+", " ", visible)
    return collapsed[:max_chars]
|
|
|
|
def run_visit(url: str, max_chars: int = 6000) -> Dict[str, Any]:
    """Fetch *url* and return up to ``max_chars`` characters of its text.

    HTML responses are reduced to visible text via ``_clean_html_to_text``;
    any other body is truncated as-is. Successful fetches are memoised in
    ``VISIT_CACHE`` under "<stripped url>::<max_chars>".

    Returns:
        ``{"ok": True, "url", "content", "cached"}`` on success, otherwise
        ``{"ok": False, "error": ...}`` (with ``"url"`` when the request or
        its processing failed). Fetch errors are returned, not raised.
    """
    trimmed = url.strip()
    if not trimmed:
        return {"ok": False, "error": "URL cannot be empty."}

    cache_key = f"{trimmed}::{max_chars}"
    remembered = VISIT_CACHE.get(cache_key)
    if remembered is not None:
        return {**remembered, "cached": True}

    request_headers = {"User-Agent": "Mozilla/5.0 (compatible; DeepResearchSpace/1.0)"}
    try:
        resp = requests.get(url, timeout=20, headers=request_headers)
        resp.raise_for_status()
        content_type = resp.headers.get("content-type", "")
        # Trust the header first; otherwise sniff the start of the body for
        # an <html tag in case the server mislabels the page.
        is_html = "text/html" in content_type or "<html" in resp.text[:200].lower()
        if is_html:
            text = _clean_html_to_text(resp.text, max_chars=max_chars)
        else:
            text = resp.text[:max_chars]
        payload = {"ok": True, "url": url, "content": text, "cached": False}
        VISIT_CACHE[cache_key] = payload
        return payload
    except Exception as exc:
        return {"ok": False, "url": url, "error": str(exc)}
|
|
|
|
def call_model(
    client: InferenceClient,
    messages: List[Dict[str, str]],
    preferred_model: str,
    candidate_models: List[str],
    temperature: float,
    max_new_tokens: int,
) -> Tuple[str, str]:
    """Request a chat completion, falling back through alternative models.

    Models are tried in order: ``preferred_model`` first, then each entry of
    ``candidate_models``. Empty ids and duplicates are skipped. The first
    model that answers wins.

    Args:
        client: Inference client exposing ``chat_completion``.
        messages: Chat history as ``{"role", "content"}`` dicts.
        preferred_model: Model id to try first.
        candidate_models: Fallback model ids, in priority order.
        temperature: Sampling temperature forwarded to the API.
        max_new_tokens: Forwarded to the API as ``max_tokens``.

    Returns:
        ``(text, model_name)``: the reply text ("" when the API returned no
        content) and the id of the model that produced it.

    Raises:
        RuntimeError: If no usable model id was supplied, or if every model
            failed. In the latter case the last underlying exception is
            attached as ``__cause__``.
    """
    # dict.fromkeys de-duplicates while preserving first-seen order.
    model_order = list(dict.fromkeys(m for m in [preferred_model, *candidate_models] if m))
    if not model_order:
        raise RuntimeError("No model candidates were provided.")

    last_error: Optional[Exception] = None
    for model_name in model_order:
        try:
            completion = client.chat_completion(
                model=model_name,
                messages=messages,
                temperature=temperature,
                max_tokens=max_new_tokens,
            )
            return completion.choices[0].message.content or "", model_name
        except Exception as exc:
            # Best-effort fallback: remember the failure and try the next model.
            last_error = exc

    # Chain the cause so the original traceback is not lost.
    raise RuntimeError(f"All model candidates failed. Last error: {last_error}") from last_error
|
|
|
|
def _coerce_int(value: Any, default: int, low: int, high: int) -> int:
    """Return *value* as an int clamped to ``[low, high]``; use *default* if unparsable."""
    try:
        number = int(value)
    except (TypeError, ValueError, OverflowError):
        # Model-written arguments may be null, words, or non-finite numbers.
        number = int(default)
    return max(low, min(number, high))


def _handle_search(state: AgentState, tool_args: Dict[str, Any], default_max_results: int) -> Dict[str, Any]:
    """Execute the ``search`` tool once per distinct query and note its top leads."""
    query = str(tool_args.get("query", "")).strip()
    max_results = _coerce_int(tool_args.get("max_results"), default_max_results, 1, 10)
    if query in state.searched_query_set:
        # The earlier successful response is still in the conversation, so
        # only a reminder is sent back instead of the full results.
        return {
            "ok": True,
            "query": query,
            "cached": True,
            "note": "This query was already searched. Reusing cached result to avoid duplicate work.",
            "results": [],
        }

    response = run_search(query=query, max_results=max_results)
    if not response.get("ok"):
        # Not recorded as done: a failed query may be retried on a later turn.
        return response

    state.searched_queries.append(query)
    state.searched_query_set.add(query)
    titles = [t for t in (r.get("title", "") for r in response.get("results", [])[:2]) if t]
    if titles:
        state.trusted_notes.append(f"Searched '{query}' and found leads: {', '.join(titles)}")
    return response


def _handle_visit(state: AgentState, tool_args: Dict[str, Any]) -> Dict[str, Any]:
    """Execute the ``visit`` tool once per distinct URL and note a short excerpt."""
    url = str(tool_args.get("url", "")).strip()
    max_chars = _coerce_int(tool_args.get("max_chars"), 6000, 500, 20000)
    if url in state.visited_url_set:
        return {
            "ok": True,
            "url": url,
            "cached": True,
            "note": "This URL was already visited. Reusing cached result to avoid duplicate work.",
        }

    response = run_visit(url=url, max_chars=max_chars)
    if not response.get("ok"):
        # Failed fetches stay retryable and are kept out of the source list.
        return response

    state.visited_urls.append(url)
    state.visited_url_set.add(url)
    snippet = str(response.get("content", ""))[:180]
    if snippet:
        state.trusted_notes.append(f"Visited {url} and extracted key context: {snippet}")
    return response


def _run_tool(
    state: AgentState,
    tool_name: str,
    tool_args: Dict[str, Any],
    default_max_results: int,
) -> Dict[str, Any]:
    """Dispatch a parsed tool call to its handler; unknown names yield an error dict."""
    if tool_name == "search":
        return _handle_search(state, tool_args, default_max_results)
    if tool_name == "visit":
        return _handle_visit(state, tool_args)
    return {"ok": False, "error": f"Unknown tool: {tool_name}"}


def build_research_agent(
    question: str,
    model: str,
    max_turns: int,
    max_search_results: int,
    temperature: float,
) -> Tuple[str, str]:
    """Run the tool-using research loop for *question*.

    Each turn asks the model for output. An ``<answer>`` block ends the loop,
    a ``<tool_call>`` block is executed and its JSON result is fed back as a
    user message, and output with neither prompts the model to answer. Before
    every third turn the latest notes are replayed as a summary message.

    Args:
        question: The user's research question.
        model: Preferred model id; the other ``DEFAULT_FREE_MODELS`` entries
            serve as fallbacks.
        max_turns: Maximum number of model calls.
        max_search_results: Result count used when a search call omits or
            garbles ``max_results`` (clamped to 1..10).
        temperature: Sampling temperature for every model call.

    Returns:
        ``(final_answer, trace_text)``: Markdown naming the model used, the
        answer (or a fallback notice) and the successfully visited URLs, plus
        a pretty-printed JSON trace of the run.

    Raises:
        RuntimeError: Propagated from ``call_model`` when every model fails.
    """
    token = os.getenv("HF_TOKEN")
    client = InferenceClient(token=token)
    state = AgentState()
    used_model = model
    recent_model_candidates = [m for m in DEFAULT_FREE_MODELS if m != model]

    messages: List[Dict[str, str]] = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": question},
    ]

    final_answer: Optional[str] = None

    # int() guards against a float turn count (e.g. from a UI slider);
    # range() rejects floats.
    for turn in range(1, int(max_turns) + 1):
        if state.trusted_notes and turn > 1 and turn % 3 == 0:
            summary_lines = "\n".join(f"- {n}" for n in state.trusted_notes[-6:])
            messages.append(
                {
                    "role": "user",
                    "content": f"RESEARCH STATE SUMMARY\n{summary_lines}\nUse this summary to avoid repeating work.",
                }
            )

        model_output, used_model = call_model(
            client=client,
            messages=messages,
            preferred_model=model,
            candidate_models=recent_model_candidates,
            temperature=temperature,
            max_new_tokens=1400,
        )
        messages.append({"role": "assistant", "content": model_output})
        state.trace.append({"turn": turn, "assistant": model_output})

        extracted_answer = extract_answer(model_output)
        if extracted_answer:
            final_answer = extracted_answer
            break

        tool_name, tool_args, tool_err = parse_tool_call(model_output)
        if tool_err:
            tool_response = {"ok": False, "error": tool_err}
        elif not tool_name:
            # Neither an answer nor a tool call: push the model to conclude.
            messages.append(
                {
                    "role": "user",
                    "content": "No tool call detected. Provide your best final answer in <answer>...</answer> now.",
                }
            )
            continue
        else:
            try:
                tool_response = _run_tool(state, tool_name, tool_args, max_search_results)
            except Exception as exc:
                # Tool boundary: hand the failure to the model as data so one
                # bad call cannot abort the whole run.
                tool_response = {"ok": False, "error": f"Tool '{tool_name}' failed: {exc}"}

        state.trace.append({"turn": turn, "tool": tool_name, "tool_response": tool_response})
        messages.append(
            {
                "role": "user",
                "content": TOOL_RESPONSE_TEMPLATE.format(
                    payload=json.dumps(tool_response, ensure_ascii=False)
                ),
            }
        )

    if final_answer is None:
        final_answer = (
            "I could not finish a complete research answer within the configured turns. "
            "Try increasing max turns or switching to a stronger model."
        )

    citations = "\n".join(f"- {url}" for url in sorted(set(state.visited_urls)))
    final_answer = f"**Model used:** `{used_model}`\n\n{final_answer}"
    if citations:
        final_answer = f"{final_answer}\n\n### Visited Sources\n{citations}"

    trace_text = json.dumps(
        {
            "used_model": used_model,
            "searched_queries": state.searched_queries,
            "visited_urls": state.visited_urls,
            "trusted_notes": state.trusted_notes[-10:],
            "trace": state.trace,
        },
        ensure_ascii=False,
        indent=2,
    )
    return final_answer, trace_text
|
|
|
|
def run_ui(
    question: str,
    model: str,
    max_turns: int,
    max_search_results: int,
    temperature: float,
):
    """UI callback: validate input, run the agent, and always return a pair.

    Returns ``(answer_markdown, trace_json)``. A blank question, a missing
    ``HF_TOKEN`` environment variable, or any exception raised by the agent
    is turned into a readable message instead of propagating.
    """
    if question.strip() == "":
        return "Please input a question.", "{}"

    if not os.getenv("HF_TOKEN"):
        warning = (
            "HF_TOKEN is not configured in Space Secrets. "
            "Go to Settings -> Secrets -> add `HF_TOKEN`, then retry."
        )
        return warning, json.dumps({"error": warning}, ensure_ascii=False, indent=2)

    try:
        outcome = build_research_agent(
            question=question,
            model=model,
            max_turns=max_turns,
            max_search_results=max_search_results,
            temperature=temperature,
        )
    except Exception as exc:
        # Surface the failure in both output panes rather than crashing the UI.
        failure = {"error": str(exc)}
        return f"Error: {exc}", json.dumps(failure, ensure_ascii=False, indent=2)
    return outcome
|
|
|
|
# Gradio UI: question box, action buttons and result tabs in a wide column;
# logo, resource links, settings and example prompts in a narrow column.
# NOTE(review): the nesting of the `with` blocks below was reconstructed from
# a source whose indentation had been flattened — confirm the intended layout.
with gr.Blocks(
    title="DeepResearch Space Starter",
    theme=gr.themes.Default(
        text_size="md",
        radius_size="md",
        spacing_size="md",
    ),
    css=CUSTOM_CSS,
) as demo:
    with gr.Row(elem_classes="layout-gap"):
        with gr.Column(scale=7):
            with gr.Group(elem_classes="section-card"):
                gr.HTML('<div class="section-title">Chat</div>')
                question = gr.Textbox(
                    show_label=False,
                    placeholder="Ask anything you want to research...",
                    lines=6,
                )
                with gr.Row():
                    run_btn = gr.Button("Run Research", variant="primary", size="lg")
                    stop_btn = gr.Button("Stop", variant="stop", size="lg")
                    clear_btn = gr.Button("Clear", variant="secondary", size="lg")

            with gr.Group(elem_classes="section-card"):
                with gr.Tabs():
                    with gr.TabItem("Result"):
                        answer = gr.Markdown(label="Final Answer")
                    with gr.TabItem("Record"):
                        trace = gr.Code(label="Execution Trace (JSON)", language="json")

        with gr.Column(scale=3, elem_classes="right-stack"):
            with gr.Group(elem_classes="section-card"):
                gr.Image(
                    value=LOGO_PATH,
                    show_label=False,
                    container=False,
                    interactive=False,
                    show_download_button=False,
                    show_fullscreen_button=False,
                    elem_classes="banner-logo-image",
                )

            with gr.Group(elem_classes="section-card"):
                # The link icons are written as HTML character references
                # (page, laptop, card-index dividers, brain) so the labels
                # cannot be corrupted by a wrong file encoding.
                # NOTE(review): the "Paper" icon was unrecoverable from the
                # garbled source; U+1F4C4 is a best guess — confirm.
                gr.HTML(
                    f"""
                    <div class="icon-grid">
                    <a class="icon-link" href="{PAPER_URL}" target="_blank" rel="noopener noreferrer">&#x1F4C4; Paper</a>
                    <a class="icon-link" href="{CODE_URL}" target="_blank" rel="noopener noreferrer">&#x1F4BB; Code</a>
                    <a class="icon-link" href="{DATASET_URL}" target="_blank" rel="noopener noreferrer">&#x1F5C2;&#xFE0F; Dataset</a>
                    <a class="icon-link" href="{MODEL_URL}" target="_blank" rel="noopener noreferrer">&#x1F9E0; Model</a>
                    </div>
                    """
                )

            with gr.Group(elem_classes="section-card"):
                gr.HTML('<div class="section-title">Settings</div>')
                model = gr.Dropdown(
                    label="Model",
                    choices=DEFAULT_FREE_MODELS,
                    # An env-provided default outside the list falls back to
                    # the first listed model.
                    value=DEFAULT_MODEL if DEFAULT_MODEL in DEFAULT_FREE_MODELS else DEFAULT_FREE_MODELS[0],
                    allow_custom_value=True,
                    info="You can type any model id supported by HF Inference API.",
                )
                max_turns = gr.Slider(label="Max Turns", minimum=2, maximum=20, value=8, step=1)
                max_search_results = gr.Slider(
                    label="Search Results Per Query", minimum=1, maximum=10, value=5, step=1
                )
                temperature = gr.Slider(
                    label="Temperature", minimum=0.0, maximum=1.5, value=0.4, step=0.1
                )

            with gr.Group(elem_classes="section-card"):
                gr.HTML('<div class="section-title">Recommended Dialogues</div>')
                gr.Examples(
                    examples=[
                        ["Compare RAG and fine-tuning: trade-offs, cost, and when to use each."],
                        ["Summarize the differences between Qwen2.5, Llama 3.1, and Mistral 7B for agent tasks."],
                        ["What are the key design patterns for long-context research agents?"],
                    ],
                    inputs=question,
                    label="",
                )

    # Event wiring: Run starts the agent, Stop cancels that pending run, and
    # Clear resets the question, the answer pane and the trace pane.
    run_event = run_btn.click(
        fn=run_ui,
        inputs=[question, model, max_turns, max_search_results, temperature],
        outputs=[answer, trace],
    )
    stop_btn.click(fn=None, cancels=[run_event])
    clear_btn.click(
        fn=lambda: ("", "", "{}"),
        inputs=[],
        outputs=[question, answer, trace],
    )


if __name__ == "__main__":
    demo.launch()
|
|