| """ |
| LangGraph Code‑Interpreter Agent |
| ================================ |
| A minimal, production‑ready example that wires a Python code‑execution tool into |
| a LangGraph workflow with an *LLM → plan → execute → reflect* loop. |
| |
| Key changes (2025‑06‑20) |
| ----------------------- |
| * **Whitelisted built‑ins** for safer `python_exec`. |
| * **Timeout guard** – aborts if the workflow exceeds a wall‑clock limit (default |
| 30 s, configurable via `LANGGRAPH_TIMEOUT_SEC`). |
| * **Dataclass state** – replaced untyped `Dict[str, Any]` with a typed |
| `@dataclass AgentState` for clearer intent and static‑analysis friendliness. |
| |
| Dependencies |
| ------------ |
| ```bash |
pip install langgraph langchain-groq opencv-python pandas
| ``` |
| |
Set the environment variable `GROQ_API_KEY` before running (the agent uses
`ChatGroq`). Optionally, you can swap `python_exec` with a sandboxed runner
such as `e2b` or `codeinterpreter-api`.
| """ |
| from __future__ import annotations |
|
|
| import contextlib |
| import io |
| import os |
| import textwrap |
| import time |
| import traceback |
| from dataclasses import dataclass, replace |
| from typing import Any, Optional |
| import re |
| import cv2 |
| import pandas as pd |
|
|
| from langchain_groq import ChatGroq |
| from langchain_core.messages import AIMessage, HumanMessage, SystemMessage |
| from langchain_core.tools import tool |
| from langgraph.graph import END, StateGraph |
|
|
| |
| |
| |
|
|
# Chat model served by Groq; override via LANGGRAPH_MODEL.
MODEL_NAME = os.getenv("LANGGRAPH_MODEL", "qwen-qwq-32b")
# Wall-clock budget in seconds, enforced in the reflect node.
TIMEOUT_SEC = int(os.getenv("LANGGRAPH_TIMEOUT_SEC", "30"))
|
|
| |
| |
| |
|
|
| |
| |
| |
|
|
| def _safe_import(name, globals=None, locals=None, fromlist=(), level=0): |
| """Whitelisted __import__ permitting just `cv2` and `pandas`.""" |
| if name in {"cv2", "pandas"}: |
| return __import__(name, globals, locals, fromlist, level) |
| raise ImportError(f"Import of module '{name}' is disabled in this sandbox.") |
|
|
# Builtins exposed to sandboxed user code in `python_exec`.
ALLOWED_BUILTINS: dict[str, Any] = {
    "print": print,
    "range": range,
    "len": len,
    "abs": abs,
    "sum": sum,
    "min": min,
    "max": max,
    # NOTE(review): exposing `open` grants arbitrary filesystem access,
    # which largely defeats the sandbox — confirm this is intentional.
    "open": open,
    # Import hook restricted to cv2/pandas (see `_safe_import`).
    "__import__": _safe_import,
}
|
|
@tool
def python_exec(code: str) -> str:
    """Execute **Python** inside a restricted namespace and capture STDOUT.

    Args:
        code: Source code to run; it is dedented first so indented
            triple-quoted snippets execute cleanly.

    Returns:
        Captured stdout, a success placeholder when nothing was printed,
        or an ``ERROR:``-prefixed traceback on failure.
    """
    code = textwrap.dedent(code)
    # Use ONE dict for both globals and locals. With separate dicts,
    # top-level names land in the locals dict while functions defined in
    # the snippet resolve free names against the globals dict, so helper
    # functions could not see each other or module-level variables.
    exec_ns: dict[str, Any] = {
        "__builtins__": ALLOWED_BUILTINS,
        "cv2": cv2,
        "pd": pd,
        "pandas": pd,
    }
    stdout = io.StringIO()
    try:
        with contextlib.redirect_stdout(stdout):
            exec(code, exec_ns)  # deliberately sandboxed via ALLOWED_BUILTINS
        return stdout.getvalue() or "Code executed successfully, no output."
    except Exception:
        return "ERROR:\n" + traceback.format_exc()
|
|
| |
| |
| |
|
|
# Shared Groq chat model used by both the plan and reflect nodes.
llm = ChatGroq(model=MODEL_NAME, temperature=0.6)
|
|
| |
| |
| |
|
|
@dataclass
class AgentState:
    """Typed state object carried through the graph."""


    # Original user request.
    input: str
    # Epoch seconds at run start; read by the reflect-node timeout guard.
    start_time: float
    # Most recent code produced by the plan/reflect nodes.
    code: Optional[str] = None
    # Captured stdout, or an "ERROR:"-prefixed traceback, from exec_node.
    exec_result: Optional[str] = None
    # Number of repair attempts made by reflect_node so far.
    tries: int = 0
    # Set True to terminate the graph (timeout or retry budget spent).
    done: bool = False
|
|
|
|
# Workflow graph keyed on the typed AgentState schema.
graph = StateGraph(AgentState)
|
|
| |
|
|
def plan_node(state: AgentState) -> AgentState:
    """Ask the LLM to write code answering the user request.

    Returns a copy of *state* with ``code`` set to the first extracted
    fenced code block (or the raw reply if no fence is present).
    """
    prompt = [
        SystemMessage(
            content=(
                "You are an expert Python developer. Given a user request, "
                "write self‑contained Python code that prints ONLY the final "
                "answer via `print()`. Always avoid network calls."
            )
        ),
        HumanMessage(content=state.input),
    ]
    # `llm(prompt)` relies on BaseChatModel.__call__, which is deprecated
    # and removed in recent langchain-core; `.invoke` is the Runnable API.
    code_block = _extract_code(llm.invoke(prompt).content)
    return replace(state, code=code_block)
|
|
| |
|
|
def exec_node(state: AgentState) -> AgentState:
    """Run the planned code through the sandboxed tool and record its output."""
    # Calling a tool directly (``python_exec(...)``) is deprecated for
    # langchain tools; ``.invoke`` is the supported entry point and accepts
    # the bare string for a single-argument tool.
    output = python_exec.invoke(state.code or "")
    return replace(state, exec_result=output)
|
|
| |
|
|
def reflect_node(state: AgentState) -> AgentState:
    """Repair failing code while enforcing the time and retry budgets.

    Marks the run done on timeout or after the retry limit; otherwise asks
    the LLM for corrected code and bumps the attempt counter.
    """
    # Hard wall-clock guard so a pathological repair loop cannot run forever.
    if time.time() - state.start_time > TIMEOUT_SEC:
        return replace(
            state,
            done=True,
            exec_result=f"ERROR:\nTimeout: exceeded {TIMEOUT_SEC}s budget",
        )


    tries = state.tries + 1
    # Give up after the retry budget is spent; the last error is surfaced.
    if tries >= 2:
        return replace(state, done=True, tries=tries)


    prompt = [
        SystemMessage(
            content=(
                "You are an expert Python debugger. Your job is to fix the "
                "given code so it runs without errors and still answers the "
                "original question. Return ONLY the corrected code."
            )
        ),
        HumanMessage(content="Code:\n" + (state.code or "")),
        AIMessage(content="Error:\n" + (state.exec_result or "")),
    ]
    # `.invoke` replaces the deprecated `llm(prompt)` __call__ API.
    fixed_code = _extract_code(llm.invoke(prompt).content)
    return replace(state, code=fixed_code, tries=tries)
|
|
| |
|
|
# Wire the plan → execute pipeline; reflect is reached conditionally below.
graph.add_node("plan", plan_node)


graph.add_node("execute", exec_node)


graph.add_node("reflect", reflect_node)


graph.set_entry_point("plan")


graph.add_edge("plan", "execute")
|
|
|
|
def needs_fix(state: AgentState) -> bool:
    """Routing predicate: True when the last execution produced an error."""
    result = state.exec_result or ""
    return result.startswith("ERROR")
|
|
# After execution: repair on error, otherwise finish.
graph.add_conditional_edges(
    "execute",
    needs_fix,
    {True: "reflect", False: END},
)
|
|
| |
|
|
def should_continue(state: AgentState) -> bool:
    """Return True to stop, False to continue executing."""
    # `done` is set by reflect_node on timeout or retry exhaustion.
    return state.done
|
|
# After reflection: stop when done, otherwise retry execution.
graph.add_conditional_edges(
    "reflect",
    should_continue,
    {True: END, False: "execute"},
)


# Compiled, runnable agent.
agent = graph.compile()
|
|
| |
| |
| |
|
|
| def run_agent(query: str) -> str: |
| """Run the agent end‑to‑end and return the printed answer (or error).""" |
| init_state = AgentState(input=query, start_time=time.time()) |
| final_state = agent.invoke(init_state) |
| |
| |
| return final_state.get("exec_result", "No result") |
|
|
| |
| |
| |
|
|
| def _extract_code(text: str) -> str: |
| """Return the first code block in *text* or the raw text if none found.""" |
| match = re.search(r"```(?:python|py)?\s*(.*?)```", text, flags=re.S | re.I) |
| return match.group(1).strip() if match else text.strip() |
|
|
if __name__ == "__main__":
    import sys

    # Take an optional question from the command line; fall back to a demo.
    if len(sys.argv) > 1:
        question = sys.argv[1]
    else:
        question = "What is the 10th Fibonacci number?"

    print(run_agent(question))
|
|