| """ |
| LangGraph Agent Core - StateGraph Definition |
| Author: @mangubee |
| Date: 2026-01-01 |
| |
| Stage 1: Skeleton with placeholder nodes |
| Stage 2: Tool integration (CURRENT) |
| Stage 3: Planning and reasoning logic implementation |
| |
| Based on: |
| - Level 3: Sequential workflow with dynamic planning |
| - Level 4: Goal-based reasoning, coarse-grained generalist |
| - Level 6: LangGraph framework |
| """ |
|
|
| import logging |
| import os |
| from pathlib import Path |
| from typing import TypedDict, List, Optional |
| from langgraph.graph import StateGraph, END |
| from src.config import Settings |
| from src.tools import ( |
| TOOLS, |
| search, |
| parse_file, |
| safe_eval, |
| analyze_image, |
| youtube_transcript, |
| transcribe_audio, |
| ) |
| from src.agent.llm_client import ( |
| plan_question, |
| select_tools_with_function_calling, |
| synthesize_answer, |
| ) |
|
|
| |
| |
| |
| logger = logging.getLogger(__name__) |
|
|
| |
| |
| |
|
|
|
|
def is_vision_question(question: str) -> bool:
    """
    Detect whether a question likely needs the vision analysis tool.

    The check is a simple case-insensitive keyword scan for visual-content
    vocabulary (images, videos, YouTube links, screenshots, ...).

    Args:
        question: GAIA question text

    Returns:
        True if question likely requires vision tool, False otherwise
    """
    lowered = question.lower()
    keywords = (
        "image",
        "video",
        "youtube",
        "photo",
        "picture",
        "watch",
        "screenshot",
        "visual",
    )
    for keyword in keywords:
        if keyword in lowered:
            return True
    return False
|
|
|
|
| |
| |
| |
|
|
|
|
class AgentState(TypedDict):
    """
    State structure for GAIA agent workflow.

    Tracks question processing from input through planning, execution, to final answer.
    Passed through the StateGraph nodes (plan -> execute -> answer), each of
    which reads and mutates the relevant keys.
    """

    # Original GAIA question text.
    question: str
    # Paths of downloaded file attachments, or None when there are none.
    file_paths: Optional[List[str]]
    # Step-by-step execution plan from plan_node (None until planning runs).
    plan: Optional[str]
    # Selected tool invocations: [{"tool": <name>, "params": {...}}, ...].
    tool_calls: List[dict]
    # Per-tool outcomes: dicts with "tool", "params", "result"/"error", "status".
    tool_results: List[dict]
    # Text snippets extracted from tool results, fed to answer synthesis.
    evidence: List[str]
    # Final factoid answer from answer_node (None until synthesis runs).
    answer: Optional[str]
    # Accumulated human-readable error descriptions from all nodes.
    errors: List[str]
|
|
|
|
| |
| |
| |
|
|
|
|
def validate_environment() -> List[str]:
    """
    Check which API keys are available at startup.

    Returns:
        List of missing API key names (empty if all present)
    """
    # Environment variable -> human-readable label reported when it is absent.
    # Insertion order matches the reporting order.
    required = {
        "GOOGLE_API_KEY": "GOOGLE_API_KEY (Gemini)",
        "HF_TOKEN": "HF_TOKEN (HuggingFace)",
        "ANTHROPIC_API_KEY": "ANTHROPIC_API_KEY (Claude)",
        "TAVILY_API_KEY": "TAVILY_API_KEY (Search)",
    }
    return [label for var, label in required.items() if not os.getenv(var)]
|
|
|
|
| |
| |
| |
|
|
|
|
def fallback_tool_selection(
    question: str, plan: str, file_paths: Optional[List[str]] = None
) -> List[dict]:
    """
    MVP Fallback: Simple keyword-based tool selection when LLM fails.
    Enhanced to use actual file paths when available.

    This is a temporary hack to get basic functionality working.
    Uses simple keyword matching to select tools.

    Args:
        question: The user question
        plan: The execution plan
        file_paths: Optional list of downloaded file paths

    Returns:
        List of tool calls, each shaped {"tool": <name>, "params": {...}}.
        Always returns at least one call (defaults to web_search).
    """
    logger.info(
        "[fallback_tool_selection] Using keyword-based fallback for tool selection"
    )

    tool_calls = []
    question_lower = question.lower()
    plan_lower = plan.lower()
    combined = f"{question_lower} {plan_lower}"

    # --- Web search: triggered by interrogative/search phrasing ---
    search_keywords = [
        "search",
        "find",
        "look up",
        "who is",
        "what is",
        "when",
        "where",
        "google",
    ]
    if any(keyword in combined for keyword in search_keywords):
        # Use only the first sentence so the search query stays short.
        query = question.split(".")[0] if "." in question else question
        tool_calls.append({"tool": "web_search", "params": {"query": query}})
        logger.info(
            f"[fallback_tool_selection] Added web_search tool with query: {query}"
        )

    # --- Calculator: triggered by math vocabulary or operator characters ---
    math_keywords = [
        "calculate",
        "compute",
        "math",
        "sum",
        "multiply",
        "divide",
        "+",
        "-",
        "*",
        "/",
        "=",
    ]
    if any(keyword in combined for keyword in math_keywords):
        import re

        # BUGFIX: anchor the match on a digit. The previous pattern
        # ([\d\s\+\-\*/\(\)\.]+) matched the first bare space/punctuation in
        # the question, yielding an empty expression after strip() and thus
        # a useless calculator call. Allow optional leading "(" or unary "-".
        expr_match = re.search(r"[\(\-]*\d[\d\s\+\-\*/\(\)\.]*", question)
        if expr_match:
            expression = expr_match.group().strip()
            if expression:  # guard against degenerate matches
                tool_calls.append(
                    {"tool": "calculator", "params": {"expression": expression}}
                )
                logger.info(
                    f"[fallback_tool_selection] Added calculator tool with expression: {expression}"
                )

    # --- File-based tools: dispatch on each attachment's extension ---
    if file_paths:
        for file_path in file_paths:
            file_ext = Path(file_path).suffix.lower()
            if file_ext in [".png", ".jpg", ".jpeg"]:
                tool_calls.append(
                    {"tool": "vision", "params": {"image_path": file_path}}
                )
                logger.info(
                    f"[fallback_tool_selection] Added vision tool for image: {file_path}"
                )
            elif file_ext in [
                ".pdf",
                ".xlsx",
                ".xls",
                ".csv",
                ".json",
                ".txt",
                ".docx",
                ".doc",
            ]:
                tool_calls.append(
                    {"tool": "parse_file", "params": {"file_path": file_path}}
                )
                logger.info(
                    f"[fallback_tool_selection] Added parse_file tool for: {file_path}"
                )
    else:
        # No attachments: warn only when the text implies a file was expected.
        file_keywords = ["file", "parse", "read", "csv", "json", "txt", "document"]
        if any(keyword in combined for keyword in file_keywords):
            logger.warning(
                "[fallback_tool_selection] File operation detected but no file_paths available"
            )

    # Image vocabulary without attachments: nothing to analyze, just warn.
    image_keywords = ["image", "picture", "photo", "analyze image", "vision"]
    if any(keyword in combined for keyword in image_keywords):
        if not file_paths:
            logger.warning(
                "[fallback_tool_selection] Image operation detected but no file_paths available"
            )

    # Safety net: never return an empty tool list.
    if not tool_calls:
        logger.warning(
            "[fallback_tool_selection] No tools selected by fallback - adding default search"
        )
        tool_calls.append({"tool": "web_search", "params": {"query": question}})

    logger.info(
        f"[fallback_tool_selection] Fallback selected {len(tool_calls)} tool(s)"
    )
    return tool_calls
|
|
|
|
| |
| |
| |
|
|
|
|
def plan_node(state: AgentState) -> AgentState:
    """
    Planning node: Analyze question and generate execution plan.

    Stage 3: Dynamic planning with LLM
    - LLM analyzes question and available tools
    - Generates step-by-step execution plan
    - Identifies which tools to use and in what order

    On any failure the error is recorded in state["errors"] and a sentinel
    plan string is stored so downstream nodes can still run.

    Args:
        state: Current agent state with question

    Returns:
        Updated state with execution plan
    """
    try:
        generated = plan_question(
            question=state["question"],
            available_tools=TOOLS,
            file_paths=state.get("file_paths"),
        )
    except Exception as exc:
        logger.error(f"[plan] ✗ {type(exc).__name__}: {str(exc)}")
        state["errors"].append(f"Planning error: {type(exc).__name__}: {str(exc)}")
        state["plan"] = "Error: Unable to create plan"
    else:
        state["plan"] = generated
        logger.info(f"[plan] ✓ {len(generated)} chars")
    return state
|
|
|
|
def _extract_evidence(result) -> str:
    """Flatten a raw tool result into a text snippet for answer synthesis.

    Handles three result shapes:
    - dict with an "answer" key: use that value directly
    - dict with a "results" key (web search): format the top-3 hits
    - anything else: stringify

    Previously this logic was duplicated verbatim in execute_node's main
    loop and its fallback-retry loop.
    """
    if isinstance(result, dict):
        if "answer" in result:
            return result["answer"]
        if "results" in result:
            hits = result.get("results", [])
            if hits:
                formatted = []
                for hit in hits[:3]:
                    title = hit.get("title", "")[:100]
                    url = hit.get("url", "")[:100]
                    snippet = hit.get("snippet", "")[:200]
                    formatted.append(
                        f"Title: {title}\nURL: {url}\nSnippet: {snippet}"
                    )
                return "\n\n".join(formatted)
            return str(result)
        return str(result)
    if isinstance(result, str):
        return result
    return str(result)


def execute_node(state: AgentState) -> AgentState:
    """Execution node: select tools for the plan, run them, collect evidence.

    Flow:
    1. Ask the LLM to select tool calls for the question/plan.
    2. If selection returns nothing or a malformed value, fall back to
       keyword-based selection.
    3. Run each tool, recording per-tool results and extracted evidence.
    4. If selection itself crashed before any tool ran, retry once with the
       keyword fallback (best-effort).

    Args:
        state: Current agent state with question and plan.

    Returns:
        Updated state with tool_calls, tool_results, and evidence populated.
    """
    # Single registry mapping tool names (as the LLM emits them) to
    # implementations. Previously defined twice in this function.
    TOOL_FUNCTIONS = {
        "web_search": search,
        "parse_file": parse_file,
        "calculator": safe_eval,
        "vision": analyze_image,
        "youtube_transcript": youtube_transcript,
        "transcribe_audio": transcribe_audio,
    }

    tool_results: List[dict] = []
    evidence: List[str] = []
    tool_calls: List[dict] = []

    try:
        tool_calls = select_tools_with_function_calling(
            question=state["question"],
            plan=state["plan"],
            available_tools=TOOLS,
            file_paths=state.get("file_paths"),
        )

        # Validate the LLM's selection; fall back to keyword matching on
        # empty or malformed output.
        if not tool_calls:
            logger.warning("[execute] No tools selected, using fallback")
            state["errors"].append("Tool selection returned no tools - using fallback")
            tool_calls = fallback_tool_selection(
                state["question"], state["plan"], state.get("file_paths")
            )
        elif not isinstance(tool_calls, list):
            logger.error(f"[execute] Invalid type: {type(tool_calls)}, using fallback")
            state["errors"].append(
                f"Tool selection returned invalid type: {type(tool_calls)}"
            )
            tool_calls = fallback_tool_selection(
                state["question"], state["plan"], state.get("file_paths")
            )
        else:
            logger.info(f"[execute] {len(tool_calls)} tool(s) selected")

        # Execute each selected tool; a failure in one tool does not stop
        # the others.
        for idx, tool_call in enumerate(tool_calls, 1):
            tool_name = tool_call["tool"]
            params = tool_call["params"]

            try:
                tool_func = TOOL_FUNCTIONS.get(tool_name)
                if not tool_func:
                    raise ValueError(f"Tool '{tool_name}' not found in TOOL_FUNCTIONS")

                result = tool_func(**params)
                logger.info(f"[{idx}/{len(tool_calls)}] {tool_name} ✓")

                tool_results.append(
                    {
                        "tool": tool_name,
                        "params": params,
                        "result": result,
                        "status": "success",
                    }
                )
                evidence.append(_extract_evidence(result))

            except Exception as tool_error:
                logger.error(f"[execute] ✗ {tool_name}: {tool_error}")
                tool_results.append(
                    {
                        "tool": tool_name,
                        "params": params,
                        "error": str(tool_error),
                        "status": "failed",
                    }
                )
                # Surface vision quota exhaustion distinctly so answer_node
                # can report a clearer error.
                if tool_name == "vision" and (
                    "quota" in str(tool_error).lower() or "429" in str(tool_error)
                ):
                    state["errors"].append("Vision failed: LLM quota exhausted")
                else:
                    state["errors"].append(f"{tool_name}: {type(tool_error).__name__}")

        logger.info(f"[execute] {len(tool_results)} tools, {len(evidence)} evidence")

    except Exception as e:
        logger.error(f"[execute] ✗ {type(e).__name__}: {str(e)}")

        if is_vision_question(state["question"]) and (
            "quota" in str(e).lower() or "429" in str(e)
        ):
            state["errors"].append("Vision unavailable (quota exhausted)")
        else:
            state["errors"].append(f"Execution error: {type(e).__name__}")

    # Last-resort recovery: only reachable when tool selection crashed before
    # any tool_calls were assigned. Best-effort — failures here are logged
    # but not added to state["errors"].
    if not tool_calls:
        try:
            tool_calls = fallback_tool_selection(
                state["question"], state.get("plan", ""), state.get("file_paths")
            )
            for tool_call in tool_calls:
                try:
                    tool_name = tool_call["tool"]
                    params = tool_call["params"]
                    tool_func = TOOL_FUNCTIONS.get(tool_name)
                    if tool_func:
                        result = tool_func(**params)
                        tool_results.append(
                            {
                                "tool": tool_name,
                                "params": params,
                                "result": result,
                                "status": "success",
                            }
                        )
                        evidence.append(_extract_evidence(result))
                        logger.info(f"[execute] Fallback {tool_name} ✓")
                except Exception as tool_error:
                    logger.error(f"[execute] Fallback {tool_name} ✗ {tool_error}")
        except Exception as fallback_error:
            logger.error(f"[execute] Fallback failed: {fallback_error}")

    state["tool_calls"] = tool_calls
    state["tool_results"] = tool_results
    state["evidence"] = evidence
    return state
|
|
|
|
def answer_node(state: AgentState) -> AgentState:
    """Answer synthesis node: Generate final factoid answer from evidence.

    With no evidence available, stores an ERROR string summarizing recorded
    errors instead of calling the LLM. Synthesis failures are likewise
    recorded in state["errors"] and reflected in the answer.
    """
    if state["errors"]:
        logger.warning(f"[answer] Errors: {state['errors']}")

    try:
        # Guard clause: nothing to synthesize without evidence.
        if not state["evidence"]:
            recorded = state["errors"]
            error_summary = "; ".join(recorded) if recorded else "No errors logged"
            state["answer"] = f"ERROR: No evidence. {error_summary}"
            logger.error(f"[answer] ✗ No evidence - {error_summary}")
            return state

        final_answer = synthesize_answer(
            question=state["question"], evidence=state["evidence"]
        )
        state["answer"] = final_answer
        logger.info(f"[answer] ✓ {final_answer}")

    except Exception as exc:
        logger.error(f"[answer] ✗ {type(exc).__name__}: {str(exc)}")
        state["errors"].append(f"Answer synthesis error: {type(exc).__name__}: {str(exc)}")
        state["answer"] = (
            f"ERROR: Answer synthesis failed - {type(exc).__name__}: {str(exc)}"
        )

    return state
|
|
|
|
| |
| |
| |
|
|
|
|
def create_gaia_graph() -> StateGraph:
    """
    Create LangGraph StateGraph for GAIA agent.

    Implements sequential workflow (Level 3 decision):
    question → plan → execute → answer

    Returns:
        Compiled StateGraph ready for execution
    """
    # NOTE(review): Settings() was previously bound to an unused local
    # (`settings`). The constructor call is kept in case it validates
    # configuration/env as a side effect — confirm and delete this line
    # if Settings() is side-effect-free.
    Settings()

    graph = StateGraph(AgentState)

    # One node per workflow stage.
    graph.add_node("plan", plan_node)
    graph.add_node("execute", execute_node)
    graph.add_node("answer", answer_node)

    # Strictly sequential edges: plan → execute → answer → END.
    graph.set_entry_point("plan")
    graph.add_edge("plan", "execute")
    graph.add_edge("execute", "answer")
    graph.add_edge("answer", END)

    compiled_graph = graph.compile()

    print("[create_gaia_graph] StateGraph compiled successfully")
    return compiled_graph
|
|
|
|
| |
| |
| |
|
|
|
|
class GAIAAgent:
    """
    GAIA Benchmark Agent - Main interface.

    Wraps LangGraph StateGraph and provides simple call interface.
    Compatible with existing BasicAgent interface in app.py.
    """

    def __init__(self):
        """Initialize agent and compile StateGraph."""
        print("GAIAAgent initializing...")

        # Startup diagnostics: warn early about missing API keys instead of
        # failing later inside a graph node. Missing keys do not abort init.
        missing_keys = validate_environment()
        if missing_keys:
            warning_msg = f"⚠️ WARNING: Missing API keys: {', '.join(missing_keys)}"
            print(warning_msg)
            logger.warning(warning_msg)
            print(
                " Agent may fail to answer questions. Set keys in environment variables."
            )
        else:
            print("✓ All API keys present")

        # Compiled plan → execute → answer workflow.
        self.graph = create_gaia_graph()
        # Final AgentState of the most recent __call__, kept for debugging.
        self.last_state: Optional[AgentState] = None
        print("GAIAAgent initialized successfully")

    def __call__(self, question: str, file_path: Optional[str] = None) -> str:
        """
        Process question and return answer.
        Supports optional file attachment for file-based questions.

        Args:
            question: GAIA question text
            file_path: Optional path to downloaded file attachment

        Returns:
            Factoid answer string
        """
        print(f"GAIAAgent processing question (first 50 chars): {question[:50]}...")
        if file_path:
            print(f"GAIAAgent processing file: {file_path}")

        # Seed the workflow state; a single attachment is wrapped in a list
        # to match AgentState's file_paths shape.
        initial_state: AgentState = {
            "question": question,
            "file_paths": [file_path] if file_path else None,
            "plan": None,
            "tool_calls": [],
            "tool_results": [],
            "evidence": [],
            "answer": None,
            "errors": [],
        }

        # Run the full plan → execute → answer graph synchronously.
        final_state = self.graph.invoke(initial_state)

        # Retained for post-hoc inspection of tool calls/evidence/errors.
        self.last_state = final_state

        answer = final_state.get("answer", "Error: No answer generated")
        print(f"GAIAAgent returning answer: {answer}")

        return answer
|
|