")
]
if len(tool_indices) > keep_tail:
for i in tool_indices[:-keep_tail]:
if messages[i]["content"] != "[hidden]":
messages[i] = {
"role": "user",
"content": "[hidden]",
}
def _agent_coerce_int(value, default: int, lower: int, upper: int) -> int:
    """Convert a model-supplied tool argument to an int clamped to [lower, upper].

    Tool arguments are written by the LLM, so they can be missing, null or
    non-numeric (e.g. ``"ten"``). Anything `int()` cannot convert falls back
    to `default` instead of raising and aborting the research run.
    """
    try:
        number = int(value)
    except (TypeError, ValueError, OverflowError):
        number = default
    return max(lower, min(number, upper))


def _agent_as_str_list(raw) -> list:
    """Normalise a scalar-or-list tool argument into a list of non-empty strings.

    Every item is stringified and stripped; blank results are dropped.
    ``None`` items are dropped as well, so a JSON ``null`` is not turned into
    the literal text "None" and then searched/visited.
    """
    items = raw if isinstance(raw, list) else [raw]
    cleaned = (str(item).strip() for item in items if item is not None)
    return [text for text in cleaned if text]


def _agent_safe_len(value) -> int:
    """Return `len(value)`, or 0 when the value has no length (e.g. None, int)."""
    try:
        return len(value)
    except TypeError:
        return 0


def build_research_agent(
    question: str,
    model: str,
    max_turns: int,
    temperature: float,
    memory_strategy: str = "condenser",
):
    """Run the ReAct research loop as a generator.

    Each `yield` emits a `(markdown_for_answer_panel, json_for_record_panel)`
    tuple. Intermediate yields show progress so that Gradio streams the
    status lines into the UI as work happens. The last yield contains the
    final answer and the final trace.

    Args:
        question: The research question; sent as the first user message.
        model: Model id handed to `_build_client_for_model`.
        max_turns: Maximum number of model calls (turns are numbered from 1).
        temperature: Sampling temperature forwarded to `call_model`.
        memory_strategy: Context-management strategy. It is normalised via
            `_normalize_memory_strategy` and also written to the
            `MEMORY_STRATEGY` environment variable.

    Yields:
        `(answer_markdown, trace_json)` tuples: progress snapshots while the
        loop runs, then one final tuple whose answer is prefixed with the
        model that was used.
    """
    client, primary_model, fallback_models = _build_client_for_model(model)
    # Display label: the real HF repo id is nicer than the TGI shim name.
    display_primary = model if (model == QUEST_MODEL_ID) else primary_model
    state = AgentState()
    used_model = display_primary
    status_lines: List[str] = []

    def _emit():
        """Return the current progress snapshot; the caller yields it to Gradio."""
        # `used_model` is read at call time, so later reassignments in the
        # enclosing function are reflected in subsequent snapshots.
        return (
            _render_progress(status_lines, used_model, question),
            _trace_to_json(state, used_model),
        )

    messages: List[Dict[str, str]] = [
        {"role": "system", "content": build_system_prompt()},
        {"role": "user", "content": question},
    ]
    final_answer: Optional[str] = None
    # `prev_state` holds the JSON returned by the State Summarizer LLM. It is
    # refreshed each time the context tokens cross MEMORY_TOKEN_THRESHOLD and
    # then injected into the model's next user message as a RESEARCH STATE
    # SUMMARY block. Matches inference/react_agent.py + inference/tool_memory.py
    # behaviour.
    prev_state: Optional[Dict[str, Any]] = None
    condenser_runs = 0
    status_lines.append("🚀 Starting research agent")
    yield _emit()

    strategy = _normalize_memory_strategy(memory_strategy)
    # NOTE(review): os.environ is process-wide, so concurrent runs that pick
    # different strategies overwrite each other's value here — confirm that
    # whatever reads MEMORY_STRATEGY tolerates this.
    os.environ["MEMORY_STRATEGY"] = strategy

    # Loop invariants: parse the token budget once and decide once whether the
    # LLM-based condenser has the credentials it needs.
    max_new_tokens = int(os.getenv("QUEST_MAX_NEW_TOKENS", "4096"))
    memory_llm_ready = bool(
        (MEMORY_MODEL_NAME or AZURE_OPENAI_DEPLOYMENT) and MEMORY_API_KEY
    )

    for turn in range(1, max_turns + 1):
        _apply_memory_strategy(messages, strategy, turn)
        condenser_turn = strategy == "condenser" and turn > 1

        # Real LLM-based condenser: when tokens cross the threshold, call the
        # MEMORY model to produce the structured state JSON, then rebuild the
        # context as [system, original_question, RESEARCH_STATE_SUMMARY].
        if (
            condenser_turn
            and memory_llm_ready
            and _messages_token_count(messages) > MEMORY_TOKEN_THRESHOLD
        ):
            status_lines.append(
                f"🗜️ turn {turn}: condensing context (tokens > {MEMORY_TOKEN_THRESHOLD})"
            )
            yield _emit()
            events_text = "\n\n".join(
                f"[{m.get('role')}] {str(m.get('content',''))[:2000]}"
                for m in messages[2:]  # skip system + original question
            )
            new_state = _llm_condense(events_text, prev_state)
            if new_state:
                prev_state = new_state
                condenser_runs += 1
                state.trace.append(
                    {"turn": turn, "condenser_run": condenser_runs, "prev_state": prev_state}
                )
                # Reset history to system + question + state summary
                summary_block = (
                    "RESEARCH STATE SUMMARY (prev_state)\n"
                    + json.dumps(prev_state, ensure_ascii=False, indent=2)
                    + "\n\nUse this summary to avoid redundant work and "
                    "follow `information_state.uncertain.need` for next steps."
                )
                messages[:] = [messages[0], messages[1], {"role": "user", "content": summary_block}]
                # The summary is LLM-written JSON, so tolerate a missing or
                # oddly-typed `information_state` when building the status line.
                info_state = (
                    prev_state.get("information_state")
                    if isinstance(prev_state, dict)
                    else None
                )
                if not isinstance(info_state, dict):
                    info_state = {}
                trusted_count = _agent_safe_len(info_state.get("trusted", []))
                uncertain_count = _agent_safe_len(info_state.get("uncertain", []))
                status_lines[-1] = (
                    f"🗜️ turn {turn}: condensed → "
                    f"{trusted_count} trusted, "
                    f"{uncertain_count} uncertain"
                )
                yield _emit()
        elif (
            condenser_turn
            and not memory_llm_ready
            and state.trusted_notes
            and turn % 3 == 0
        ):
            # Fallback heuristic when the MEMORY model is not configured.
            summary_lines = "\n".join(f"- {n}" for n in state.trusted_notes[-6:])
            messages.append(
                {
                    "role": "user",
                    "content": f"RESEARCH STATE SUMMARY\n{summary_lines}\nUse this summary to avoid repeating work.",
                }
            )

        status_lines.append(f"🧠 turn {turn}: thinking…")
        yield _emit()
        t0 = time.time()
        raw_output, endpoint_model = call_model(
            client=client,
            messages=messages,
            preferred_model=primary_model,
            candidate_models=fallback_models,
            temperature=temperature,
            max_new_tokens=max_new_tokens,
        )
        dt = time.time() - t0
        model_output = raw_output
        # Preserve the human-friendly model id for the trace even if the
        # endpoint ignores the "model" param and returns the TGI shim name.
        used_model = display_primary if endpoint_model == primary_model == QUEST_ENDPOINT_MODEL else endpoint_model
        messages.append({"role": "assistant", "content": model_output})
        state.trace.append({"turn": turn, "assistant": model_output, "elapsed_s": round(dt, 2)})
        # Overwrite the "thinking…" line in place instead of adding a new one.
        status_lines[-1] = f"🧠 turn {turn}: model reply in {dt:.1f}s"
        yield _emit()

        extracted_answer = extract_answer(model_output)
        if extracted_answer:
            final_answer = extracted_answer
            status_lines.append("✍️ writing final answer")
            yield _emit()
            break

        tool_name, tool_args, tool_err = parse_tool_call(model_output)
        if tool_err:
            tool_response = {"ok": False, "error": tool_err}
            status_lines.append(f"⚠️ turn {turn}: malformed tool call — {tool_err}")
            yield _emit()
        elif not tool_name:
            # No explicit tool call and no final answer: force finalization.
            # IMPORTANT: do not put a literal `...` placeholder in this
            # prompt. Some models (notably the Qwen3 family that QUEST-35B is
            # built on) will echo the template verbatim, which means the
            # extracted answer ends up being the three-dot placeholder `...`
            # and the user sees an empty-looking result.
            messages.append(
                {
                    "role": "user",
                    "content": (
                        "You did not call a tool and did not produce a final "
                        "answer. Please now write your best final answer, "
                        "wrapped between an opening tag and a "
                        "closing tag. Put the real answer text "
                        "between those tags; do not write a literal ellipsis "
                        "or other placeholder. If the question asks for "
                        "tabular data, use GitHub-Flavored Markdown pipe "
                        "tables (`| col1 | col2 |` + `|---|---|`) and put a "
                        "blank line before the first row so the table renders."
                    ),
                }
            )
            status_lines.append(f"🙃 turn {turn}: model stalled; asking for an answer")
            yield _emit()
            # Nothing to record as a tool response; go straight to the next turn.
            continue
        else:
            if tool_name == "search":
                queries: List[str] = _agent_as_str_list(tool_args.get("query", ""))
                max_results = _agent_coerce_int(
                    tool_args.get("max_results", DEFAULT_MAX_SEARCH_RESULTS),
                    DEFAULT_MAX_SEARCH_RESULTS,
                    1,
                    DEFAULT_MAX_SEARCH_RESULTS,
                )
                queries_preview = ", ".join(f"`{q}`" for q in queries) or "_(empty)_"
                status_lines.append(f"🔍 turn {turn}: searching {queries_preview}")
                yield _emit()
                per_query: List[Dict[str, Any]] = []
                backend_labels: List[str] = []
                hits_total = 0
                for q in queries:
                    if q in state.searched_query_set:
                        # Repeat query: answer with a stub instead of
                        # calling the search backend again.
                        per_query.append({
                            "ok": True,
                            "query": q,
                            "cached": True,
                            "note": "Already searched; reusing cached result.",
                            "results": [],
                        })
                        backend_labels.append("cache")
                        continue
                    state.searched_queries.append(q)
                    state.searched_query_set.add(q)
                    single = _run_search_single(q, max_results)
                    per_query.append(single)
                    backend_labels.append(single.get("backend", "unknown"))
                    if single.get("ok"):
                        hits_total += len(single.get("results", []))
                        first_titles = [r.get("title", "") for r in single.get("results", [])[:2]]
                        if first_titles:
                            state.trusted_notes.append(
                                f"Searched '{q}' and found leads: {', '.join(t for t in first_titles if t)}"
                            )
                    else:
                        status_lines.append(
                            f"⚠️ search failed on `{q}` via {single.get('backend', 'unknown')}: "
                            f"{single.get('error', 'no results')}"
                        )
                # A single query returns its result dict directly; several
                # queries are wrapped in an envelope.
                tool_response = (
                    per_query[0]
                    if len(per_query) == 1
                    else {"ok": True, "queries": queries, "results": per_query}
                )
                unique_backends = sorted(set(backend_labels))
                backend_str = "/".join(unique_backends) if unique_backends else "?"
                status_lines.append(
                    f"✅ turn {turn}: got {hits_total} hit(s) via {backend_str}"
                )
                yield _emit()
            elif tool_name == "visit":
                urls: List[str] = _agent_as_str_list(tool_args.get("url", ""))
                goal = str(tool_args.get("goal", "")).strip()
                max_chars = _agent_coerce_int(
                    tool_args.get("max_chars", 6000), 6000, 500, 20000
                )
                urls_preview = ", ".join(f"`{u[:60]}`" for u in urls) or "_(empty)_"
                status_lines.append(f"🌐 turn {turn}: visiting {urls_preview}")
                yield _emit()
                per_url: List[Dict[str, Any]] = []
                visit_ok = 0
                for u in urls:
                    if u in state.visited_url_set:
                        # Repeat URL: counted as a success, page not re-fetched.
                        per_url.append({
                            "ok": True,
                            "url": u,
                            "cached": True,
                            "note": "Already visited; reusing cached result.",
                        })
                        visit_ok += 1
                        continue
                    state.visited_urls.append(u)
                    state.visited_url_set.add(u)
                    single = _run_visit_single(u, max_chars, goal)
                    per_url.append(single)
                    if single.get("ok"):
                        visit_ok += 1
                        snippet = str(single.get("content", ""))[:180]
                        if snippet:
                            state.trusted_notes.append(
                                f"Visited {u} and extracted key context: {snippet}"
                            )
                tool_response = (
                    per_url[0]
                    if len(per_url) == 1
                    else {"ok": True, "goal": goal, "results": per_url}
                )
                status_lines.append(
                    f"✅ turn {turn}: read {visit_ok}/{len(urls)} page(s)"
                )
                yield _emit()
            elif tool_name in ("google_scholar", "scholar"):
                queries = _agent_as_str_list(tool_args.get("query", ""))
                queries_preview = ", ".join(f"`{q}`" for q in queries) or "_(empty)_"
                status_lines.append(f"🎓 turn {turn}: scholar {queries_preview}")
                yield _emit()
                per_q = [_run_scholar_single(q) for q in queries]
                tool_response = (
                    per_q[0] if len(per_q) == 1 else {"ok": True, "results": per_q}
                )
                ok_count = sum(1 for r in per_q if r.get("ok"))
                status_lines.append(
                    f"📚 turn {turn}: scholar {ok_count}/{len(per_q)} ok"
                )
                yield _emit()
            else:
                tool_response = {"ok": False, "error": f"Unknown tool: {tool_name}"}
                status_lines.append(f"⚠️ turn {turn}: unknown tool `{tool_name}`")
                yield _emit()

        # Reached for malformed calls and for every executed/unknown tool:
        # record the response and hand it back to the model as a user message.
        state.trace.append({"turn": turn, "tool": tool_name, "tool_response": tool_response})
        messages.append(
            {
                "role": "user",
                "content": TOOL_RESPONSE_TEMPLATE.format(
                    payload=json.dumps(tool_response, ensure_ascii=False)
                ),
            }
        )

    if final_answer is None:
        # The loop ran out of turns without the model producing an answer.
        final_answer = (
            "I could not finish a complete research answer within the configured turns. "
            "Try increasing max turns or switching to a stronger model."
        )
    else:
        final_answer = ensure_markdown_table_blank_lines(final_answer)
    final_answer = f"**Model used:** `{used_model}`\n\n{final_answer}"
    trace_text = _trace_to_json(state, used_model)
    yield (final_answer, trace_text)
def run_ui(
    question: str,
    max_turns: int,
    memory_strategy: str,
    temperature: float,
):
    """Gradio handler: check preconditions, then stream the research agent.

    This is a generator. Every item it yields is an
    `(answer_markdown, trace_json)` pair for the answer and trace outputs.

    Args:
        question: Text from the question box. Empty, whitespace-only or
            `None` input is rejected with a prompt to enter a question.
        max_turns: Turn budget from the slider; coerced with `int()` because
            the agent feeds it to `range()`.
        memory_strategy: Strategy name passed through to the agent.
        temperature: Sampling temperature passed through to the agent.

    Yields:
        A single warning pair when the question is blank, `HF_TOKEN` is not
        set, or `QUEST_BASE_URL` is empty; otherwise every pair produced by
        `build_research_agent`. Any exception raised while the agent runs is
        reported as a final `"Error: ..."` pair instead of propagating.
    """
    # `or ""` keeps a None value from raising AttributeError on .strip().
    if not (question or "").strip():
        yield "Please input a question.", "{}"
        return
    if not os.getenv("HF_TOKEN"):
        warning = (
            "HF_TOKEN is not configured in Space Secrets. "
            "Go to Settings -> Secrets -> add `HF_TOKEN`, then retry."
        )
        yield warning, json.dumps({"error": warning}, ensure_ascii=False, indent=2)
        return
    if not QUEST_BASE_URL:
        warning = (
            f"`{QUEST_MODEL_ID}` needs a private HF Inference Endpoint. "
            "Create one at https://ui.endpoints.huggingface.co/, then set "
            "`QUEST_BASE_URL` in Space Secrets to the endpoint's `/v1/` URL."
        )
        yield warning, json.dumps({"error": warning}, ensure_ascii=False, indent=2)
        return
    try:
        # The int() coercion lives inside the try so that a bad slider value
        # is shown to the user like any other run-time error.
        yield from build_research_agent(
            question=question,
            model=QUEST_MODEL_ID,
            max_turns=int(max_turns),
            temperature=temperature,
            memory_strategy=memory_strategy,
        )
    except Exception as exc:
        # Top-level UI boundary: show the failure in both panels.
        yield f"Error: {exc}", json.dumps({"error": str(exc)}, ensure_ascii=False, indent=2)
# Sample questions offered in the UI. Each entry is a dict with the keys
# "category", "icon" and "text", in that order.
EXAMPLES = [
    dict(zip(("category", "icon", "text"), row))
    for row in (
        (
            "Multi-hop facts",
            "🎯",
            "Who was the first person to walk on the Moon, and which U.S. President set that goal in his famous 1962 “Moon speech”?",
        ),
        (
            "Time-varying + multi-hop",
            "📈",
            "Who is the current CEO of the company that acquired GitHub in 2018, and what was that company's market capitalization at the close of the most recent quarter?",
        ),
        (
            "Multi-constraint",
            "🧩",
            "Find a 2-day itinerary in Tokyo under $250 focused on contemporary art museums and vegetarian restaurants, including transit between sites.",
        ),
        (
            "Research Report",
            "📚",
            "Compare the LLM-safety research approaches of Anthropic, OpenAI, and Google DeepMind over the past 18 months, focusing on alignment techniques and red-teaming methodologies.",
        ),
    )
]
def _example_label(ex: Dict[str, str]) -> str:
return f"{ex['icon']} {ex['category']} — {ex['text']}"
# Gradio app definition: builds the page layout (question box, example
# buttons, output tabs, settings) and wires the buttons to their handlers.
# NOTE(review): in this view the file has lost its indentation and the HTML
# string literals passed to gr.HTML look truncated (markup missing, quotes
# split across lines) — verify them against the real file before editing.
with gr.Blocks(
title="QUEST · Deep Research by OSU NLP",
theme=APP_THEME,
css=CUSTOM_CSS,
fill_width=True,
) as demo:
# --- Quest-style header (Q mark + title + byline) ---
gr.HTML(
"""
"""
)
# --- Main two-column layout ---
with gr.Row(elem_classes="layout-gap"):
with gr.Column(scale=6, min_width=420):
with gr.Group(elem_classes="section-card"):
gr.HTML(
'Ask the agent
'
'QUEST: What can I research for you?
'
)
# Free-text question box: first input of run_ui, and the component that the
# example buttons and the Clear button write to.
question = gr.Textbox(
show_label=False,
placeholder="Ask anything you want to research in depth...",
lines=6,
)
with gr.Row(elem_classes="action-row"):
run_btn = gr.Button("Run Research", variant="primary", size="lg")
stop_btn = gr.Button("Stop", variant="stop", size="lg")
clear_btn = gr.Button("Clear", variant="secondary", size="lg")
with gr.Group(elem_classes="section-card"):
gr.HTML(
'Try examples
'
'QUEST can handle multiple types of queries as shown below.
'
)
with gr.Column(elem_classes="example-buttons"):
# One button per EXAMPLES entry, created in EXAMPLES order; the zip() in the
# event wiring below relies on that ordering.
example_buttons = [
gr.Button(_example_label(ex), variant="secondary", elem_classes="example-btn")
for ex in EXAMPLES
]
with gr.Group(elem_classes="section-card"):
gr.HTML(
''
'Output'
''
'If you see a connection error, please wait a moment and retry.'
''
'
'
)
# Two views of a run: the Markdown answer and the JSON execution trace.
with gr.Tabs():
with gr.TabItem("Result"):
answer = gr.Markdown(label="Final Answer")
with gr.TabItem("Record"):
trace = gr.Code(label="Execution Trace (JSON)", language="json")
with gr.Column(scale=4, min_width=340, elem_classes="right-stack"):
with gr.Group(elem_classes="section-card"):
gr.HTML(
f"""
Open release
"""
)
with gr.Group(elem_classes="section-card"):
gr.HTML('Settings
')
# Read-only display of the model id; it is not one of run_ui's inputs.
gr.Textbox(
label="Model",
value=QUEST_MODEL_ID,
interactive=False,
elem_id="quest-model",
)
# Each choice is a (label shown in the UI, value passed to run_ui) pair.
memory_strategy = gr.Radio(
label="Memory Strategy",
choices=[
("Condenser (default)", "condenser"),
("Vanilla", "vanilla"),
("Discard-all", "discard_all"),
("Hide-tool-result", "hide_tool_result"),
],
value="condenser",
elem_id="quest-memory-strategy",
)
gr.HTML(
''
'Condenser (default) — when context grows large, a State Summarizer LLM compresses earlier turns into a structured JSON of trusted/untrusted/uncertain claims, visited sources, and prior search queries; the agent continues with that compact state.
'
'Vanilla — memory management disabled; the full conversation history is kept.
'
'Discard-all — when context grows large, the entire message history is reset, restarting the agent from the original question with no accumulated context.
'
'Hide-tool-result — when context grows large, older tool responses are pruned; only the most recent tool result is kept.'
'
'
)
max_turns = gr.Slider(
label="Max Turns",
minimum=2,
maximum=50,
value=15,
step=1,
elem_id="quest-max-turns",
)
temperature = gr.Slider(
label="Temperature",
minimum=0.0,
maximum=1.5,
value=1.0,
step=0.1,
elem_id="quest-temperature",
)
gr.HTML(
"""
"""
)
# IMPORTANT: keep `run_event` pointing at the .click() (the long-running
# generator), not at the chained .then(). stop_btn.cancels=[run_event]
# must target the generator for Stop to actually interrupt it; if we
# capture the .then() result instead, Stop cancels the instant clear
# lambda and the agent keeps running.
# The `inputs` order below must match run_ui's parameter order:
# (question, max_turns, memory_strategy, temperature).
run_event = run_btn.click(
fn=run_ui,
inputs=[question, max_turns, memory_strategy, temperature],
outputs=[answer, trace],
)
# .success() (not .then()) so the textbox is cleared ONLY on a clean run.
# If Stop cancels the generator we leave the question intact, so the user
# can tweak it and re-run without retyping.
run_event.success(
fn=lambda: "",
inputs=[],
outputs=[question],
)
# Clicking an example copies its text into the question box. The text is
# bound as a lambda default argument so each button keeps its own example
# (a plain closure over `ex` would see only the last loop value).
for btn, ex in zip(example_buttons, EXAMPLES):
btn.click(
fn=(lambda text=ex["text"]: text),
inputs=[],
outputs=[question],
)
# Stop has no handler of its own; its click only cancels `run_event`.
stop_btn.click(fn=None, cancels=[run_event])
# Clear resets the question and answer to empty strings and the trace to an
# empty JSON object.
clear_btn.click(
fn=lambda: ("", "", "{}"),
inputs=[],
outputs=[question, answer, trace],
)
# Queue sizing. A research run is almost entirely I/O-bound: it waits over
# HTTP on the OSC vLLM endpoint, Serper, Jina and Azure, so many runs can
# proceed in parallel even on a small CPU box. Gradio's
# default_concurrency_limit is 1, which serialises every run_ui call and is
# what produces the "long queue of requests pending" warning, so it is raised
# here. The real ceiling is the 8 OSC vLLM instances behind the nginx load
# balancer. Both values are tunable through Space variables
# (QUEST_CONCURRENCY, QUEST_QUEUE_MAX) without a code change.
QUEST_CONCURRENCY = int(os.environ.get("QUEST_CONCURRENCY", "12"))
QUEST_QUEUE_MAX = int(os.environ.get("QUEST_QUEUE_MAX", "80"))
demo.queue(max_size=QUEST_QUEUE_MAX, default_concurrency_limit=QUEST_CONCURRENCY)

if __name__ == "__main__":
    # Thread pool: three threads per concurrent run, never fewer than 40.
    demo.launch(max_threads=max(QUEST_CONCURRENCY * 3, 40))