Spaces:
Build error
Build error
| import gradio as gr | |
| import time | |
| import base64 | |
| from datetime import datetime | |
| import argparse | |
| import os | |
| from agent import BrowserAgent | |
| from computers import BrowserbaseComputer, PlaywrightComputer | |
| import base64 | |
| import sys | |
| sys.path.append(os.path.dirname(__file__)) | |
| from prompt import prompt_options | |
| PLAYWRIGHT_SCREEN_SIZE = (1440, 900) | |
| os.environ["PLAYWRIGHT_HEADLESS"] = "false" | |
| def run_genflow(query:str, prompt:str = ""): | |
| env = PlaywrightComputer( | |
| screen_size=PLAYWRIGHT_SCREEN_SIZE, | |
| initial_url="https://wenku.baidu.com/ndcore/browse/aiunion?fr=options_AIcard_1&_wkts_=1761290807747&bdQuery=genflow&t=1761290807744&tabType=genflow&aiCreat=genflow", | |
| # initial_url="https://www.doubao.com/chat/", | |
| highlight_mouse=True, # 如果指定,代理将尝试在屏幕截图中突出显示鼠标光标的位置。这对于可视化调试很有用。 | |
| ) | |
| # query="在提示文本为'输入问题,交给GenFlow的'搜索条中输入‘百度’,点击发送按钮", | |
| prompt = "GenFlow是一个AI聊天机器人。你需要作为测试员,在它的网页底部搜索栏中输入用户提交的问题, 并且观察其输出结果。" + prompt | |
| prompt += """ | |
| GenFlow 返回的内容可能比较长,你可以多次执行'scroll_at'操作来查看网页中它返回的上下文。 | |
| GenFlow 有时会呈现左边主页面,右边预览区域。在这种情况下执行'scroll_at'的时候你要注意鼠标位置。 | |
| 通过反复的滚动,确保主页面和预览区域已经滚动到底。预览区域往往很长,所以你需要多次执行'scroll_at'操作,确保2次滚动操作看到的网页完全一样为止, 给出最终客观评价. | |
| GenFlow 一定会给出'输出结果',请保持足够的耐心! | |
| **无论输入的语言是什么,你需要输出中文**""" | |
| # print(prompt) | |
| with env as browser_computer: | |
| agent = BrowserAgent( | |
| browser_computer=browser_computer, | |
| query=query, | |
| system_prompt=prompt, | |
| model_name='gemini-2.5-computer-use-preview-10-2025', | |
| ) | |
| # reasoning, status, function_responses_list | |
| for step in agent.agent_loop_yield(): | |
| # print(step) | |
| # input("☕️") | |
| yield step | |
| # === 处理用户查询 === | |
| def process_user_query(query, prompt=""): | |
| if not query.strip(): | |
| yield [], "请输入有效的查询", "<p>无执行步骤</p>" | |
| return | |
| steps = [] | |
| for reasoning, status, function_responses_list in run_genflow(query, prompt): | |
| # 每个 function_responses_list 是若干个 ["screenshot", "action", "response"] | |
| sub_steps = [] | |
| for item in function_responses_list: | |
| if len(item) == 3: | |
| screenshot_base64, action, response = item["screenshot"], item["action"], item["response"] | |
| else: | |
| screenshot_base64, action, response = "", "未知操作", "" | |
| sub_steps.append({ | |
| "screenshot": f"data:image/png;base64,{base64.b64encode(screenshot_base64).decode("utf-8")}" if screenshot_base64 else "", | |
| "action": action, | |
| "response": response, | |
| }) | |
| step = { | |
| "step": len(steps) + 1, | |
| "reasoning": reasoning, | |
| "status": status, | |
| "functions": sub_steps, # ✅ 支持多个函数结果 | |
| } | |
| steps.append(step) | |
| display_html = update_steps_display(steps) | |
| yield steps, f"正在执行第 {len(steps)} 步: {status}", display_html | |
| time.sleep(0.3) | |
| yield steps, f"任务完成!共执行 {len(steps)} 步。", update_steps_display(steps) | |
| # === 更新步骤展示的 HTML === | |
| def update_steps_display(steps): | |
| """以HTML格式显示每一步的推理、函数动作及截图""" | |
| if not steps: | |
| return "<p>暂无执行步骤</p>" | |
| html = "<div style='font-family: Arial, sans-serif;'>" | |
| for step in steps: | |
| status_color = { | |
| "COMPLETE": "green", | |
| "CONTINUE": "orange", | |
| "FAILURE": "red", | |
| }.get(step["status"], "black") | |
| html += f""" | |
| <div style='border:1px solid #ddd; margin:10px 0; padding:15px; border-radius:8px;'> | |
| <div style='display:flex; justify-content:space-between; align-items:center;'> | |
| <h3 style='margin:0;'>步骤 {step['step']}</h3> | |
| <span style='color:{status_color}; font-weight:bold;'>{step['status']}</span> | |
| </div> | |
| <p><strong>推理:</strong> {step['reasoning']}</p> | |
| """ | |
| # ✅ 支持多个函数动作展示 | |
| for idx, func in enumerate(step["functions"], start=1): | |
| # print(func) | |
| html += f""" | |
| <div style="word-wrap: break-word; word-break: break-all; white-space: pre-wrap; border:1px solid #ddd; border-radius:6px; padding:8px; margin-top:6px;"> | |
| <p><strong>函数调用 {idx}:</strong></p> | |
| <p>动作: {func['action']}</p> | |
| <p>返回: {func['response']}</p> | |
| """ | |
| if func["screenshot"]: | |
| html += f"<img src='{func['screenshot']}' style='max-width:100%; border:1px solid #ccc; border-radius:6px;'/>" | |
| html += "</div>" | |
| html += "</div>" # 结束step块 | |
| html += "</div>" | |
| return html | |
| def resolve_prompt(selected_value): | |
| """如果选择的是预定义 key,则返回对应 value,否则直接返回用户输入""" | |
| if selected_value in prompt_options: | |
| return prompt_options[selected_value] | |
| return selected_value or "" | |
| # === 创建 Gradio 界面 === | |
| def create_demo_interface(): | |
| with gr.Blocks(title="CUA 在线试用系统", theme=gr.themes.Soft()) as demo: | |
| gr.Markdown(""" | |
| <div style="font-family: 'Microsoft YaHei', sans-serif; font-size: 18px; line-height: 1.6;"> | |
| <h1 style="color: #2E86AB; font-size: 28px; font-weight: bold; text-align: center;">🧭 CUA 在线试用系统</h1> | |
| <p style="color: #555; font-size: 16px; text-align: center;"> | |
| 输入一个任务描述,Agent 将自动进行 genflow 中浏览、截图、分析并执行下一步操作。 | |
| </p> | |
| </div> | |
| """) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| query_input = gr.Textbox( | |
| label="需要在 genflow 中测试的问题", | |
| placeholder="例如:帮我搜索AI新闻 / 购买一台笔记本电脑", | |
| lines=2) | |
| prompt_dropdown = gr.Dropdown( | |
| label="选择或在右侧系统提示词中输入评估标准", | |
| choices=list(prompt_options.keys()), | |
| value=None, | |
| allow_custom_value=False, # ✅ 允许用户手动输入文本 | |
| ) | |
| submit_btn = gr.Button("开始执行", variant="primary") | |
| with gr.Column(scale=2): | |
| status_output = gr.Textbox(label="执行状态", interactive=False) | |
| final_prompt = gr.Textbox(label="系统提示词", interactive=True,lines=5) | |
| # 按钮或自动触发逻辑 | |
| prompt_dropdown.change(resolve_prompt, inputs=prompt_dropdown, outputs=final_prompt) | |
| # ✅ 把详细执行过程放在 JSON 前面 | |
| with gr.Accordion("详细执行过程", open=True): | |
| steps_display = gr.HTML() | |
| # 再放执行步骤 JSON | |
| steps_output = gr.JSON(label="执行步骤详情(JSON)") | |
| # 流式输出绑定 | |
| submit_btn.click( | |
| fn=process_user_query, | |
| inputs=[query_input, final_prompt], | |
| outputs=[steps_output, status_output, steps_display], | |
| api_name="run_agent", | |
| show_progress=True, | |
| queue=True, # 支持流式输出 | |
| ) | |
| return demo | |
| if __name__ == "__main__": | |
| demo = create_demo_interface() | |
| demo.queue() # 必须启用 queue 才能支持 yield | |
| demo.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=True, | |
| ) | |