| |
| |
| """ |
| LoongFlow HuggingFace Spaces Demo |
| 展示 PEES (Plan-Execute-Evaluate-Summary) 进化式 Agent 工作流程 |
| """ |
|
|
| import gradio as gr |
| import pandas as pd |
| import time |
| import random |
| from typing import List, Dict, Any, Tuple |
|
|
| |
| |
| |
|
|
def simulate_planner(task: str) -> Dict[str, Any]:
    """Simulate the Plan phase by producing a canned strategic plan.

    Sleeps briefly to mimic work, picks one stock strategy sentence at
    random, and embeds ``task`` in a fixed Markdown plan outline.

    Args:
        task: Task description supplied by the user; echoed into the plan.

    Returns:
        A dict with the keys ``role``, ``thought``, ``plan`` and
        ``timestamp`` (local time formatted as ``HH:MM:SS``).
    """
    time.sleep(0.3)  # artificial latency so the UI shows visible progress

    candidate_thoughts = (
        "我将采用分治策略,把任务分解为多个子问题分别解决。",
        "首先进行需求分析,然后设计系统架构,最后逐步实现。",
        "使用迭代式开发,从最小可行产品开始,逐步添加功能。",
        "采用自顶向下的方法,先定义接口,再实现具体逻辑。",
    )
    chosen_thought = random.choice(candidate_thoughts)

    # The plan is a fixed outline; only the task line varies.
    plan_lines = [
        "## 任务分析",
        f"- 用户需求: {task}",
        "",
        "## 战略规划",
        "1. 理解任务本质和目标",
        "2. 设计整体架构方案",
        "3. 制定分步实施计划",
        "4. 预留扩展和优化空间",
    ]
    plan_text = "\n".join(plan_lines).strip()

    outcome: Dict[str, Any] = {}
    outcome["role"] = "Planner"
    outcome["thought"] = chosen_thought
    outcome["plan"] = plan_text
    outcome["timestamp"] = time.strftime("%H:%M:%S")
    return outcome
|
|
|
|
# Canned implementation snippets shown in the "Execute" tab, keyed by topic.
_IMPLEMENTATION_SAMPLES: Dict[str, str] = {
    "todo": '''```python
# Todo List App - 实现
from datetime import datetime


class TodoList:
    def __init__(self):
        self.tasks = []

    def add_task(self, title, priority="medium"):
        task = {
            "id": len(self.tasks) + 1,
            "title": title,
            "priority": priority,
            "done": False,
            "created_at": datetime.now()
        }
        self.tasks.append(task)
        return task

    def complete_task(self, task_id):
        for task in self.tasks:
            if task["id"] == task_id:
                task["done"] = True
                return True
        return False

    def get_pending(self):
        return [t for t in self.tasks if not t["done"]]
```''',
    "file": '''```python
# File Processor - 实现
import os
import shutil
from pathlib import Path

class FileProcessor:
    def __init__(self, input_dir, output_dir):
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)

    def process_all(self):
        results = []
        for filepath in self.input_dir.rglob("*"):
            if filepath.is_file():
                dest = self.output_dir / filepath.relative_to(self.input_dir)
                dest.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(filepath, dest)
                results.append({"file": str(filepath), "status": "copied"})
        return results
```''',
    "default": '''```python
# Solution Implementation - 实现
class Solution:
    def __init__(self, task):
        self.task = task
        self.components = {}

    def analyze(self):
        """分析任务需求"""
        return {"requirements": "...", "constraints": "..."}

    def design(self):
        """设计解决方案"""
        return {"architecture": "...", "flow": "..."}

    def implement(self):
        """实现代码"""
        return {"code": "...", "tests": "..."}

    def run(self):
        return self.implement()
```''',
}

# Keywords (matched against the lower-cased task) that select each sample.
# The Chinese aliases make the UI's own example prompts hit the right sample.
# Order matters: the first topic with a matching keyword wins.
_IMPLEMENTATION_KEYWORDS: Dict[str, Tuple[str, ...]] = {
    "todo": ("todo", "to-do", "待办"),
    "file": ("file", "文件"),
}


def simulate_executor(task: str, plan: str, *, delay: float = 0.5) -> Dict[str, Any]:
    """Simulate the first Execute phase by returning a canned implementation.

    The snippet is chosen by looking for topic keywords (English or Chinese)
    in ``task``; when nothing matches, a generic ``Solution`` skeleton is
    returned.

    Args:
        task: Task description supplied by the user.
        plan: Plan text from the Plan phase. Accepted for interface symmetry;
            the simulation does not read it.
        delay: Seconds of artificial latency before returning. Values <= 0
            skip the sleep entirely.

    Returns:
        A dict with the keys ``role``, ``action``, ``code`` (a fenced Markdown
        code block), ``result`` and ``timestamp`` (``HH:MM:SS``).
    """
    if delay > 0:
        time.sleep(delay)

    lowered_task = task.lower()  # lower-case once, not once per keyword
    code = _IMPLEMENTATION_SAMPLES["default"]
    for sample_key, keywords in _IMPLEMENTATION_KEYWORDS.items():
        if any(keyword in lowered_task for keyword in keywords):
            code = _IMPLEMENTATION_SAMPLES[sample_key]
            break

    return {
        "role": "Executor",
        "action": "编写并执行实现代码",
        "code": code,
        "result": "代码实现完成",
        "timestamp": time.strftime("%H:%M:%S"),
    }
|
|
|
|
| def simulate_executor2(task: str, previous_result: str) -> Dict[str, Any]: |
| """模拟第二个 Execute 阶段 - 验证测试""" |
| time.sleep(0.4) |
| |
| test_samples = { |
| "todo": '''```python |
| # 测试用例 |
| def test_todo_list(): |
| todo = TodoList() |
| |
| # 测试添加任务 |
| task = todo.add_task("完成报告", "high") |
| assert task["title"] == "完成报告" |
| assert task["priority"] == "high" |
| |
| # 测试完成任务 |
| todo.complete_task(task["id"]) |
| assert task["done"] == True |
| |
| # 测试获取待办 |
| pending = todo.get_pending() |
| assert len(pending) == 0 |
| |
| print("所有测试通过!") |
| ```''', |
| "file": '''```python |
| # 测试用例 |
| def test_file_processor(): |
| processor = FileProcessor("input", "output") |
| |
| # 创建测试文件 |
| os.makedirs("input", exist_ok=True) |
| with open("input/test.txt", "w") as f: |
| f.write("test") |
| |
| # 执行处理 |
| results = processor.process_all() |
| |
| # 验证结果 |
| assert os.path.exists("output/test.txt") |
| assert len(results) == 1 |
| |
| print("所有测试通过!") |
| ```''', |
| "default": '''```python |
| # 验证测试 |
| def test_solution(): |
| solution = Solution("task") |
| |
| # 测试各个组件 |
| analysis = solution.analyze() |
| assert analysis is not None |
| |
| design = solution.design() |
| assert design is not None |
| |
| result = solution.run() |
| assert result is not None |
| |
| print("所有测试通过!") |
| ```''' |
| } |
| |
| test_code = test_samples.get("default") |
| for key, c in test_samples.items(): |
| if key in task.lower(): |
| test_code = c |
| break |
| |
| return { |
| "role": "Executor2", |
| "action": "编写并运行测试用例", |
| "code": test_code, |
| "result": "测试执行完成", |
| "timestamp": time.strftime("%H:%M:%S") |
| } |
|
|
|
|
def simulate_summary(iteration: int, score: float, target: float) -> Dict[str, Any]:
    """Simulate the Summary phase: derive a new score and a canned reflection.

    The new score depends on how far ``score`` is below ``target``:

    * more than 0.3 below: a random gain in [0.18, 0.28];
    * more than 0.1 below: 50-70% of the gap plus a jitter in [-0.05, 0.05];
    * otherwise: ``target`` plus a random overshoot in [0.02, 0.08].

    The result is clamped to [0.15, 1.0]. A reflection/improvement pair is
    then drawn from the "positive" texts when the score did not drop, and
    from the "negative" texts otherwise.

    Args:
        iteration: Iteration number; not read by the simulation.
        score: Score before this iteration.
        target: Score the run is trying to reach.

    Returns:
        A dict with the keys ``role``, ``reflection``, ``improvement``,
        ``score`` (the new score) and ``timestamp`` (``HH:MM:SS``).
    """
    time.sleep(0.3)  # artificial latency so the UI shows visible progress

    remaining = target - score
    if remaining > 0.3:
        # Far from the target: take a large step.
        candidate = score + random.uniform(0.18, 0.28)
    elif remaining > 0.1:
        # Getting close: close part of the gap, with a little noise.
        step = remaining * random.uniform(0.5, 0.7)
        jitter = random.uniform(-0.05, 0.05)
        candidate = score + step + jitter
    else:
        # Within reach: land slightly above the target.
        candidate = target + random.uniform(0.02, 0.08)

    new_score = max(0.15, min(1.0, candidate))

    # Text pools as (reflections, improvements) pairs.
    upbeat = (
        [
            "本次迭代成功实现了核心功能,分数有明显提升。",
            "代码结构良好,解决方案更优雅。",
            "测试覆盖完整,边界情况处理得当。",
            "验证通过,性能达到预期。",
        ],
        [
            "继续保持当前良好的实现方式",
            "建议扩展更多功能",
            "可以尝试更多边界情况",
        ],
    )
    downbeat = (
        [
            "本次迭代遇到一些问题,分数略有下降。",
            "实现方案有缺陷,需要重新调整。",
            "某些边界情况未处理好,导致扣分。",
            "测试未完全通过,需要修复。",
        ],
        [
            "需要修复实现的bug",
            "建议优化代码结构",
            "需要添加更多的错误处理",
            "考虑性能优化",
        ],
    )

    reflection_pool, improvement_pool = upbeat if new_score >= score else downbeat
    # Draw the reflection first, then the improvement, to keep the RNG order.
    chosen_reflection = random.choice(reflection_pool)
    chosen_improvement = random.choice(improvement_pool)

    return {
        "role": "Summary",
        "reflection": chosen_reflection,
        "improvement": chosen_improvement,
        "score": new_score,
        "timestamp": time.strftime("%H:%M:%S"),
    }
|
|
|
|
def run_pees_iteration(task: str, iteration: int, current_score: float, target: float) -> Tuple[List[Dict[str, Any]], float]:
    """Run one full PEES iteration and collect a display record per phase.

    Calls the four phase simulators in order (plan, execute, evaluate,
    summary) and reshapes each result into a uniform record.

    Args:
        task: Task description supplied by the user.
        iteration: 1-based iteration number, forwarded to the summary phase.
        current_score: Score before this iteration.
        target: Score the run is trying to reach.

    Returns:
        A ``(records, new_score)`` tuple. ``records`` holds four dicts (Plan,
        Execute, Evaluate, Summary, in that order), each with the keys
        ``phase``, ``phase_name``, ``content``, ``detail`` and ``timestamp``.
        ``new_score`` is the score reported by the summary phase.
    """
    # Run the phases first, strictly in order; each feeds the next.
    planned = simulate_planner(task)
    implemented = simulate_executor(task, planned["plan"])
    verified = simulate_executor2(task, implemented["result"])
    summarized = simulate_summary(iteration, current_score, target)

    updated_score = summarized["score"]

    records = [
        {
            "phase": "Plan",
            "phase_name": "计划",
            "content": planned["thought"],
            "detail": planned["plan"],
            "timestamp": planned["timestamp"],
        },
        {
            "phase": "Execute",
            "phase_name": "执行",
            "content": implemented["action"],
            "detail": f"{implemented['code']}\n\n执行结果: {implemented['result']}",
            "timestamp": implemented["timestamp"],
        },
        {
            "phase": "Evaluate",
            "phase_name": "验证",
            "content": verified["action"],
            "detail": f"{verified['code']}\n\n验证结果: {verified['result']}",
            "timestamp": verified["timestamp"],
        },
        {
            "phase": "Summary",
            "phase_name": "总结",
            "content": summarized["reflection"],
            "detail": f"改进建议: {summarized['improvement']}\n\n当前分数: {updated_score:.2f}",
            "timestamp": summarized["timestamp"],
        },
    ]
    return records, updated_score
|
|
|
|
| |
| |
| |
|
|
# Placeholder shown in each phase tab before the first iteration finishes.
_EMPTY_PHASE_MD = "*等待开始...*"

# Placeholder chart shown before the first score is available.
_EMPTY_CHART_SVG = '<svg width="400" height="250"><text x="200" y="130" text-anchor="middle" fill="#999">等待开始...</text></svg>'

# Intro text rendered at the top of the page.
_INTRO_MD = """
# LoongFlow PEES Agent Demo

**LoongFlow** 是一个进化式 Agent 开发框架,采用 **PEES (Plan-Execute-Evaluate-Summary)** 思考范式。

---

### PEES 工作流程

```
┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│   Plan   │ →  │ Execute  │ →  │ Evaluate │ →  │ Summary  │
│   计划   │    │   执行   │    │   验证   │    │   总结   │
└──────────┘    └──────────┘    └──────────┘    └──────────┘
     ▲                                               │
     └─────────────── ◀ 迭代改进 ────────────────────┤
                                                     │
                                                ┌──────────┐
                                                │ 目标达成 │
                                                └──────────┘
```

- **Plan (P)**: 分析任务,制定战略计划
- **Execute (E1)**: 编写代码,实现功能
- **Evaluate (E2)**: 编写测试,验证功能
- **Summary (S)**: 反思结果,提取改进建议
"""

# Footer text rendered at the bottom of the page.
_ABOUT_MD = """
---

### 关于 LoongFlow

LoongFlow 是一个面向复杂任务的进化式 Agent 框架,特别适用于:

- **数学推理**: 开放式数学问题求解
- **机器学习**: AutoML 和算法优化
- **代码生成**: 复杂编程任务
- **科学研究**: 实验设计和分析

了解更多: [GitHub](https://github.com/baidu-baige/LoongFlow)
"""


def _format_phase_markdown(iteration: int, heading: str, summary_label: str, result: Dict[str, Any]) -> str:
    """Format one phase record as Markdown with a collapsible detail section."""
    return f"""### 迭代 {iteration} - {heading}
**时间**: {result['timestamp']}

{result['content']}

<details>
<summary>{summary_label}</summary>

{result['detail']}

</details>
"""


def _render_score_svg(chart_data: List[Dict[str, Any]]) -> str:
    """Render the score history as inline SVG: a lone dot, or a line chart."""
    if not chart_data:
        return _EMPTY_CHART_SVG

    if len(chart_data) == 1:
        # A single point has no line to draw; show the value and one marker.
        only_score = chart_data[0]["score"]
        return f'''
<svg width="400" height="250" style="border:1px solid #ccc; background:white;">
<text x="200" y="130" text-anchor="middle" fill="#666">分数: {only_score:.2f}</text>
<circle cx="50" cy="{200 - only_score*180}" r="8" fill="#22c55e"/>
</svg>
'''

    width = 400
    height = 250
    padding = 40
    plot_width = width - padding * 2
    plot_height = height - padding * 2
    # Horizontal distance between consecutive points (computed once).
    x_step = plot_width / (len(chart_data) - 1)

    # Scores map linearly onto the plot height; 1.0 is the top edge.
    coords = [
        (padding + idx * x_step, padding + plot_height - item["score"] * plot_height)
        for idx, item in enumerate(chart_data)
    ]

    point_parts = []
    for (x, y), item in zip(coords, chart_data):
        point_parts.append(
            f'<circle cx="{x}" cy="{y}" r="6" fill="#22c55e" stroke="white" stroke-width="2"/>'
        )
        point_parts.append(
            f'<text x="{x}" y="{y-15}" text-anchor="middle" font-size="12" fill="#333">{item["score"]:.2f}</text>'
        )
    points_svg = "".join(point_parts)

    lines_svg = "".join(
        f'<line x1="{prev_x}" y1="{prev_y}" x2="{x}" y2="{y}" stroke="#22c55e" stroke-width="3"/>'
        for (prev_x, prev_y), (x, y) in zip(coords, coords[1:])
    )

    return f'''
<svg width="{width}" height="{height}" style="border:1px solid #ccc; background:white; border-radius:8px;">
<!-- Y轴标签 -->
<text x="15" y="50" font-size="12" fill="#666">1.0</text>
<text x="15" y="{padding + plot_height/2}" font-size="12" fill="#666">0.5</text>
<text x="15" y="{height-20}" font-size="12" fill="#666">0.0</text>
<!-- X轴标签 -->
<text x="{width/2}" y="{height-5}" font-size="12" fill="#666" text-anchor="middle">迭代次数</text>
<!-- 折线 -->
{lines_svg}
{points_svg}
</svg>
'''


def create_demo():
    """Build the Gradio Blocks UI for the PEES demo.

    The page has a task input, sliders for the iteration cap and target
    score, a status box, an SVG score chart, and one tab per PEES phase.
    Clicking the run button streams updates from a generator that runs
    simulated iterations until the target score is reached or the iteration
    cap is hit.

    Returns:
        The constructed ``gr.Blocks`` app (not yet launched).
    """
    with gr.Blocks(title="LoongFlow PEES Demo", theme=gr.themes.Soft()) as demo:
        gr.Markdown(_INTRO_MD)

        with gr.Row():
            with gr.Column(scale=2):
                task_input = gr.Textbox(
                    label="输入任务描述",
                    placeholder="例如: 帮我写一个待办事项应用 / 创建一个文件处理工具",
                    lines=3
                )

                with gr.Row():
                    max_iterations = gr.Slider(
                        minimum=1, maximum=10, value=5, step=1,
                        label="最大迭代次数"
                    )
                    target_score = gr.Slider(
                        minimum=0.5, maximum=1.0, value=0.85, step=0.05,
                        label="目标分数"
                    )

                run_btn = gr.Button("开始执行任务", variant="primary")

            with gr.Column(scale=1):
                status_output = gr.Textbox(
                    label="执行状态",
                    lines=5,
                    interactive=False
                )

        score_display = gr.HTML(label="分数演进")

        # Hidden component kept from the original layout; nothing writes to it.
        score_list = gr.JSON(label="分数历史", visible=False)

        gr.Markdown("### 迭代详情")

        with gr.Tabs():
            with gr.Tab("Plan 计划"):
                plan_output = gr.Markdown(_EMPTY_PHASE_MD)
            with gr.Tab("Execute 执行"):
                execute1_output = gr.Markdown(_EMPTY_PHASE_MD)
            with gr.Tab("Evaluate 验证"):
                execute2_output = gr.Markdown(_EMPTY_PHASE_MD)
            with gr.Tab("Summary 总结"):
                summary_output = gr.Markdown(_EMPTY_PHASE_MD)

        def run_task(task: str, max_iter: int, target: float):
            """Stream (status, chart, plan, execute, evaluate, summary) updates."""
            if not task or not task.strip():
                yield "错误: 请输入任务描述", "", "", "", "", ""
                return

            total_iterations = int(max_iter)
            chart_data = []
            current_score = 0.0

            # Start every output at its placeholder so the final yield below
            # is well-defined even if the loop body never runs.
            svg = _EMPTY_CHART_SVG
            plan_md = exec1_md = exec2_md = summary_md = _EMPTY_PHASE_MD

            yield "状态: 准备执行任务...", svg, plan_md, exec1_md, exec2_md, summary_md

            for i in range(1, total_iterations + 1):
                results, current_score = run_pees_iteration(task, i, current_score, target)
                plan_result, execute1_result, execute2_result, summary_result = results

                plan_md = _format_phase_markdown(i, "Plan 计划", "查看计划详情", plan_result)
                exec1_md = _format_phase_markdown(i, "Execute 执行", "查看实现代码", execute1_result)
                exec2_md = _format_phase_markdown(i, "Evaluate 验证", "查看测试代码", execute2_result)
                summary_md = _format_phase_markdown(i, "Summary 总结", "查看改进建议", summary_result)

                chart_data.append({"iteration": i, "score": round(current_score, 2)})
                svg = _render_score_svg(chart_data)

                status = f"状态: 第 {i}/{total_iterations} 次迭代完成 (分数: {current_score:.2f})"
                yield status, svg, plan_md, exec1_md, exec2_md, summary_md

                # Stop as soon as the target is reached.
                if current_score >= target:
                    break

                time.sleep(0.3)  # brief pause between iterations

            final_status = f"状态: 任务完成\n最终分数: {current_score:.2f}\n总迭代次数: {len(chart_data)}"
            yield final_status, svg, plan_md, exec1_md, exec2_md, summary_md

        run_btn.click(
            fn=run_task,
            inputs=[task_input, max_iterations, target_score],
            outputs=[status_output, score_display, plan_output, execute1_output, execute2_output, summary_output]
        )

        gr.Markdown(_ABOUT_MD)

    return demo
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the UI, then serve it on all network
    # interfaces (0.0.0.0) at port 7860.
    demo = create_demo()
    demo.launch(server_name="0.0.0.0", server_port=7860)
|
|