| """Execution Agent - Handles code execution and computational tasks""" |
| from typing import Dict, Any, List |
| from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, ToolMessage |
| from langchain_core.tools import tool |
| from langchain_groq import ChatGroq |
| from code_agent import run_agent |
| from src.tracing import get_langfuse_callback_handler |
|
|
|
|
| @tool |
| def run_python(input: str) -> str: |
| """Execute Python code in a restricted sandbox (code-interpreter). |
| |
| Pass **any** coding or file-manipulation task here and the agent will |
| compute the answer by running Python. The entire standard library is NOT |
| available; heavy networking is disabled. Suitable for: math, data-frames, |
| small file parsing, algorithmic questions. |
| """ |
| return run_agent(input) |
|
|
|
|
| def load_execution_prompt() -> str: |
| """Load the execution prompt from file""" |
| try: |
| with open("./prompts/execution_prompt.txt", "r", encoding="utf-8") as f: |
| return f.read().strip() |
| except FileNotFoundError: |
| return """You are a specialized execution agent. Use the run_python tool to execute code and solve computational problems.""" |
|
|
|
|
| def get_execution_tools() -> List: |
| """Get list of tools available to the execution agent""" |
| return [run_python] |
|
|
|
|
| def execute_tool_calls(tool_calls: list, tools: list) -> list: |
| """Execute tool calls and return results""" |
| tool_messages = [] |
| |
| |
| tool_map = {tool.name: tool for tool in tools} |
| |
| for tool_call in tool_calls: |
| tool_name = tool_call['name'] |
| tool_args = tool_call['args'] |
| tool_call_id = tool_call['id'] |
| |
| if tool_name in tool_map: |
| try: |
| print(f"Execution Agent: Executing {tool_name} with args: {str(tool_args)[:200]}...") |
| result = tool_map[tool_name].invoke(tool_args) |
| tool_messages.append( |
| ToolMessage( |
| content=str(result), |
| tool_call_id=tool_call_id |
| ) |
| ) |
| except Exception as e: |
| print(f"Error executing {tool_name}: {e}") |
| tool_messages.append( |
| ToolMessage( |
| content=f"Error executing {tool_name}: {e}", |
| tool_call_id=tool_call_id |
| ) |
| ) |
| else: |
| tool_messages.append( |
| ToolMessage( |
| content=f"Unknown tool: {tool_name}", |
| tool_call_id=tool_call_id |
| ) |
| ) |
| |
| return tool_messages |
|
|
|
|
| def needs_code_execution(query: str) -> bool: |
| """Heuristic to determine if a query requires code execution""" |
| code_indicators = [ |
| "calculate", "compute", "algorithm", "fibonacci", "math", "data", |
| "programming", "code", "function", "sort", "csv", "json", "pandas", |
| "plot", "graph", "analyze", "process", "file", "manipulation" |
| ] |
| query_lower = query.lower() |
| return any(indicator in query_lower for indicator in code_indicators) |
|
|
|
|
| def execution_agent(state: Dict[str, Any]) -> Dict[str, Any]: |
| """ |
| Execution agent that handles computational and code execution tasks |
| """ |
| print("Execution Agent: Processing computational request") |
| |
| try: |
| |
| execution_prompt = load_execution_prompt() |
| |
| |
| llm = ChatGroq(model="qwen-qwq-32b", temperature=0.1) |
| tools = get_execution_tools() |
| llm_with_tools = llm.bind_tools(tools) |
| |
| |
| callback_handler = get_langfuse_callback_handler() |
| callbacks = [callback_handler] if callback_handler else [] |
| |
| |
| messages = state.get("messages", []) |
| |
| |
| execution_messages = [SystemMessage(content=execution_prompt)] |
| |
| |
| user_query = None |
| for msg in reversed(messages): |
| if msg.type == "human": |
| user_query = msg.content |
| break |
| |
| |
| if user_query and needs_code_execution(user_query): |
| guidance_msg = HumanMessage( |
| content=f"""Task requiring code execution: {user_query} |
| |
| Please analyze this computational task and use the run_python tool to solve it step by step. |
| Break down complex problems into smaller steps and provide clear explanations.""" |
| ) |
| execution_messages.append(guidance_msg) |
| |
| |
| for msg in messages: |
| if msg.type != "system": |
| execution_messages.append(msg) |
| |
| |
| response = llm_with_tools.invoke(execution_messages, config={"callbacks": callbacks}) |
| |
| |
| if response.tool_calls: |
| print(f"Execution Agent: LLM requested {len(response.tool_calls)} tool calls") |
| |
| |
| tool_messages = execute_tool_calls(response.tool_calls, tools) |
| |
| |
| execution_messages.extend([response] + tool_messages) |
| |
| |
| final_response = llm.invoke(execution_messages, config={"callbacks": callbacks}) |
| |
| return { |
| **state, |
| "messages": execution_messages + [final_response], |
| "agent_response": final_response, |
| "current_step": "verification" |
| } |
| else: |
| |
| return { |
| **state, |
| "messages": execution_messages + [response], |
| "agent_response": response, |
| "current_step": "verification" |
| } |
| |
| except Exception as e: |
| print(f"Execution Agent Error: {e}") |
| error_response = AIMessage(content=f"I encountered an error while processing your computational request: {e}") |
| return { |
| **state, |
| "messages": state.get("messages", []) + [error_response], |
| "agent_response": error_response, |
| "current_step": "verification" |
| } |