| """ |
| LLM Client Module - Multi-Provider LLM Integration |
| Author: @mangubee |
| Date: 2026-01-02 |
| |
| Handles all LLM calls for: |
| - Planning (question analysis and execution plan generation) |
| - Tool selection (function calling) |
| - Answer synthesis (factoid answer generation from evidence) |
| - Conflict resolution (evaluating contradictory information) |
| |
| Based on Level 5 decision: Gemini 2.0 Flash (primary/free) + Claude Sonnet 4.5 (fallback/paid) |
| Based on Level 6 decision: LLM function calling for tool selection |
| Pattern: Matches Stage 2 tools (Gemini primary, Claude fallback) |
| """ |
|
|
import datetime
import json
import logging
import os
import time
from pathlib import Path
from typing import List, Dict, Optional, Any, Callable

from anthropic import Anthropic
import google.generativeai as genai
from groq import Groq
from huggingface_hub import InferenceClient
|
|
| |
| |
| |
|
|
| |
# Paid/fallback provider model (Anthropic Messages API) -- see module docstring.
CLAUDE_MODEL = "claude-sonnet-4-5-20250929"

# Free/primary provider model (Google Gemini).
GEMINI_MODEL = "gemini-2.0-flash-exp"

# HuggingFace Inference API model id.
# NOTE(review): the ":scaleway" suffix presumably selects a serving backend -- confirm.
HF_MODEL = "openai/gpt-oss-120b:scaleway"

# Groq provider model id.
GROQ_MODEL = "openai/gpt-oss-120b"

# Shared generation settings: temperature 0 for deterministic factoid answers,
# common token budget for planning and tool selection.
TEMPERATURE = 0
MAX_TOKENS = 4096

# Module-level logger; handlers/levels are configured by the application.
logger = logging.getLogger(__name__)
|
|
| |
| |
| |
|
|
# Lazily-created per-session log file path (set by get_session_log_file()).
_SESSION_LOG_FILE = None
# Whether the synthesis system prompt has already been written to the session
# log for this session (cleared by reset_session_log()).
_SYSTEM_PROMPT_WRITTEN = False
|
|
|
|
def get_session_log_file() -> Path:
    """
    Return the session log file for LLM synthesis context, creating it on first use.

    A single log file is created per session (not per question) so the _log/
    directory does not fill up with one file per question; all questions
    append to this one file.

    Returns:
        Path: Session log file path.
    """
    global _SESSION_LOG_FILE

    # Already initialized for this session -- reuse it.
    if _SESSION_LOG_FILE is not None:
        return _SESSION_LOG_FILE

    log_dir = Path("_log")
    log_dir.mkdir(exist_ok=True)

    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    _SESSION_LOG_FILE = log_dir / f"llm_session_{stamp}.md"

    # Write the markdown header exactly once, when the file is created.
    with open(_SESSION_LOG_FILE, "w", encoding="utf-8") as f:
        f.write("# LLM Synthesis Session Log\n\n")
        f.write(f"**Session Start:** {datetime.datetime.now().isoformat()}\n\n")

    return _SESSION_LOG_FILE
|
|
|
|
def reset_session_log():
    """Clear session-log state so a new log file is created on next use (testing / new run)."""
    global _SESSION_LOG_FILE, _SYSTEM_PROMPT_WRITTEN
    _SYSTEM_PROMPT_WRITTEN = False
    _SESSION_LOG_FILE = None
|
|
|
|
| |
| |
| |
|
|
|
|
def retry_with_backoff(func: Callable, max_retries: int = 3) -> Any:
    """
    Retry a callable with exponential backoff on quota errors.

    Handles:
    - 429 rate limit errors
    - Quota exceeded errors
    - "Too many requests" errors

    Args:
        func: Function to retry (should be a lambda or callable with no args)
        max_retries: Maximum number of attempts (default: 3). Values below 1
            are clamped to 1 so the callable is always invoked at least once
            (previously max_retries <= 0 silently returned None).

    Returns:
        Result of successful function call

    Raises:
        Exception: If all retries exhausted or non-quota error encountered
    """
    # Guarantee at least one attempt regardless of caller input.
    attempts = max(1, max_retries)

    for attempt in range(attempts):
        try:
            return func()
        except Exception as e:
            error_str = str(e).lower()

            # Heuristic match on HTTP 429 / quota / rate-limit wording.
            is_quota_error = (
                "429" in str(e)
                or "quota" in error_str
                or "rate limit" in error_str
                or "too many requests" in error_str
            )

            if is_quota_error and attempt < attempts - 1:
                # Exponential backoff: 1s, 2s, 4s, ...
                wait_time = 2**attempt
                logger.warning(
                    f"Quota/rate limit error (attempt {attempt + 1}/{attempts}): {e}. "
                    f"Retrying in {wait_time}s..."
                )
                time.sleep(wait_time)
                continue

            # Non-retryable error, or retries exhausted: propagate as-is.
            raise
|
|
|
|
| |
| |
| |
|
|
|
|
def _get_provider_function(function_name: str, provider: str) -> Callable:
    """
    Resolve the provider-specific implementation of an LLM operation.

    Args:
        function_name: Base function name ("plan_question", "select_tools", "synthesize_answer")
        provider: Provider name ("gemini", "huggingface", "groq", "claude")

    Returns:
        Callable: Provider-specific function

    Raises:
        ValueError: If the operation or provider name is not recognized
    """
    # Dispatch table: operation -> provider -> implementation.
    dispatch = {
        "plan_question": {
            "gemini": plan_question_gemini,
            "huggingface": plan_question_hf,
            "groq": plan_question_groq,
            "claude": plan_question_claude,
        },
        "select_tools": {
            "gemini": select_tools_gemini,
            "huggingface": select_tools_hf,
            "groq": select_tools_groq,
            "claude": select_tools_claude,
        },
        "synthesize_answer": {
            "gemini": synthesize_answer_gemini,
            "huggingface": synthesize_answer_hf,
            "groq": synthesize_answer_groq,
            "claude": synthesize_answer_claude,
        },
    }

    providers = dispatch.get(function_name)
    if providers is None:
        raise ValueError(f"Unknown function name: {function_name}")

    impl = providers.get(provider)
    if impl is None:
        raise ValueError(
            f"Unknown provider: {provider}. Valid options: gemini, huggingface, groq, claude"
        )

    return impl
|
|
|
|
def _call_with_fallback(function_name: str, *args, **kwargs) -> Any:
    """
    Call LLM function with configured provider.

    NOTE: Fallback mechanism has been archived to reduce complexity.
    Only the primary provider (LLM_PROVIDER env var, default "gemini") is
    used. If it fails, the error is wrapped and re-raised.

    Args:
        function_name: Base function name ("plan_question", "select_tools", "synthesize_answer")
        *args, **kwargs: Arguments to pass to the provider-specific function

    Returns:
        Result from LLM call

    Raises:
        Exception: If the primary provider fails. The original error is
            attached as __cause__ so the full traceback is preserved.
    """
    primary_provider = os.getenv("LLM_PROVIDER", "gemini").lower()

    try:
        primary_func = _get_provider_function(function_name, primary_provider)
        logger.info(f"[{function_name}] Using provider: {primary_provider}")
        # Each provider call gets retry-with-backoff on quota/rate-limit errors.
        return retry_with_backoff(lambda: primary_func(*args, **kwargs))
    except Exception as primary_error:
        logger.error(f"[{function_name}] Provider {primary_provider} failed: {primary_error}")
        # Chain explicitly so callers/debuggers see the underlying cause.
        raise Exception(
            f"{function_name} failed with {primary_provider}: {primary_error}"
        ) from primary_error
|
|
|
|
| |
| |
| |
|
|
|
|
def create_claude_client() -> Anthropic:
    """Build and return an Anthropic client.

    Raises:
        ValueError: If ANTHROPIC_API_KEY is missing from the environment.
    """
    key = os.getenv("ANTHROPIC_API_KEY")
    if not key:
        raise ValueError("ANTHROPIC_API_KEY environment variable not set")

    logger.info(f"Initializing Anthropic client with model: {CLAUDE_MODEL}")
    return Anthropic(api_key=key)
|
|
|
|
def create_gemini_client():
    """Configure the Gemini SDK and return a GenerativeModel handle.

    Raises:
        ValueError: If GOOGLE_API_KEY is missing from the environment.
    """
    key = os.getenv("GOOGLE_API_KEY")
    if not key:
        raise ValueError("GOOGLE_API_KEY environment variable not set")

    # The SDK is configured globally before the model handle is created.
    genai.configure(api_key=key)
    logger.info(f"Initializing Gemini client with model: {GEMINI_MODEL}")
    return genai.GenerativeModel(GEMINI_MODEL)
|
|
|
|
def create_hf_client() -> InferenceClient:
    """Build and return a HuggingFace Inference API client.

    Raises:
        ValueError: If HF_TOKEN is missing from the environment.
    """
    token = os.getenv("HF_TOKEN")
    if not token:
        raise ValueError("HF_TOKEN environment variable not set")

    logger.info(f"Initializing HuggingFace Inference client with model: {HF_MODEL}")
    return InferenceClient(model=HF_MODEL, token=token)
|
|
|
|
def create_groq_client() -> Groq:
    """Build and return a Groq client.

    Raises:
        ValueError: If GROQ_API_KEY is missing from the environment.
    """
    key = os.getenv("GROQ_API_KEY")
    if not key:
        raise ValueError("GROQ_API_KEY environment variable not set")

    logger.info(f"Initializing Groq client with model: {GROQ_MODEL}")
    return Groq(api_key=key)
|
|
|
|
| |
| |
| |
|
|
|
|
def plan_question_claude(
    question: str,
    available_tools: Dict[str, Dict],
    file_paths: Optional[List[str]] = None,
) -> str:
    """Generate a step-by-step execution plan for the question via Claude."""
    client = create_claude_client()

    # One "- name: description (Category: ...)" bullet per registered tool.
    tools_text = "\n".join(
        f"- {name}: {info['description']} (Category: {info['category']})"
        for name, info in available_tools.items()
    )

    # Optional listing of files attached to the question.
    file_context = ""
    if file_paths:
        listing = "\n".join(f"- {fp}" for fp in file_paths)
        file_context = "\n\nAvailable files:\n" + listing

    system_prompt = """You are a planning agent for answering complex questions.

Your task is to analyze the question and create a step-by-step execution plan.

Consider:
1. What information is needed to answer the question?
2. Which tools can provide that information?
3. In what order should tools be executed?
4. What parameters need to be extracted from the question?

Generate a concise plan with numbered steps."""

    user_prompt = f"""Question: {question}{file_context}

Available tools:
{tools_text}

Create an execution plan to answer this question. Format as numbered steps."""

    logger.info("[plan_question_claude] Calling Claude for planning")

    response = client.messages.create(
        model=CLAUDE_MODEL,
        max_tokens=MAX_TOKENS,
        temperature=TEMPERATURE,
        system=system_prompt,
        messages=[{"role": "user", "content": user_prompt}],
    )

    plan = response.content[0].text
    logger.info(f"[plan_question_claude] Generated plan ({len(plan)} chars)")

    return plan
|
|
|
|
| |
| |
| |
|
|
|
|
def plan_question_gemini(
    question: str,
    available_tools: Dict[str, Dict],
    file_paths: Optional[List[str]] = None,
) -> str:
    """Generate a step-by-step execution plan for the question via Gemini."""
    model = create_gemini_client()

    # One "- name: description (Category: ...)" bullet per registered tool.
    tools_text = "\n".join(
        f"- {name}: {info['description']} (Category: {info['category']})"
        for name, info in available_tools.items()
    )

    # Optional listing of files attached to the question.
    file_context = ""
    if file_paths:
        listing = "\n".join(f"- {fp}" for fp in file_paths)
        file_context = "\n\nAvailable files:\n" + listing

    # Gemini has no separate system role here; everything goes in one prompt.
    prompt = f"""You are a planning agent for answering complex questions.

Your task is to analyze the question and create a step-by-step execution plan.

Consider:
1. What information is needed to answer the question?
2. Which tools can provide that information?
3. In what order should tools be executed?
4. What parameters need to be extracted from the question?

Generate a concise plan with numbered steps.

Question: {question}{file_context}

Available tools:
{tools_text}

Create an execution plan to answer this question. Format as numbered steps."""

    logger.info("[plan_question_gemini] Calling Gemini for planning")

    response = model.generate_content(
        prompt,
        generation_config=genai.types.GenerationConfig(
            temperature=TEMPERATURE, max_output_tokens=MAX_TOKENS
        ),
    )

    plan = response.text
    logger.info(f"[plan_question_gemini] Generated plan ({len(plan)} chars)")

    return plan
|
|
|
|
| |
| |
| |
|
|
|
|
def plan_question_hf(
    question: str,
    available_tools: Dict[str, Dict],
    file_paths: Optional[List[str]] = None,
) -> str:
    """Generate a step-by-step execution plan via the HuggingFace Inference API."""
    client = create_hf_client()

    # One "- name: description (Category: ...)" bullet per registered tool.
    tools_text = "\n".join(
        f"- {name}: {info['description']} (Category: {info['category']})"
        for name, info in available_tools.items()
    )

    # Optional listing of files attached to the question.
    file_context = ""
    if file_paths:
        listing = "\n".join(f"- {fp}" for fp in file_paths)
        file_context = "\n\nAvailable files:\n" + listing

    system_prompt = """You are a planning agent for answering complex questions.

Your task is to analyze the question and create a step-by-step execution plan.

Consider:
1. What information is needed to answer the question?
2. Which tools can provide that information?
3. In what order should tools be executed?
4. What parameters need to be extracted from the question?

Generate a concise plan with numbered steps."""

    user_prompt = f"""Question: {question}{file_context}

Available tools:
{tools_text}

Create an execution plan to answer this question. Format as numbered steps."""

    logger.info(f"[plan_question_hf] Calling HuggingFace ({HF_MODEL}) for planning")

    response = client.chat_completion(
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        max_tokens=MAX_TOKENS,
        temperature=TEMPERATURE,
    )

    plan = response.choices[0].message.content
    logger.info(f"[plan_question_hf] Generated plan ({len(plan)} chars)")

    return plan
|
|
|
|
| |
| |
| |
|
|
|
|
def plan_question_groq(
    question: str,
    available_tools: Dict[str, Dict],
    file_paths: Optional[List[str]] = None,
) -> str:
    """Generate a step-by-step execution plan for the question via Groq."""
    client = create_groq_client()

    # One "- name: description (Category: ...)" bullet per registered tool.
    tools_text = "\n".join(
        f"- {name}: {info['description']} (Category: {info['category']})"
        for name, info in available_tools.items()
    )

    # Optional listing of files attached to the question.
    file_context = ""
    if file_paths:
        listing = "\n".join(f"- {fp}" for fp in file_paths)
        file_context = "\n\nAvailable files:\n" + listing

    system_prompt = """You are a planning agent for answering complex questions.

Your task is to analyze the question and create a step-by-step execution plan.

Consider:
1. What information is needed to answer the question?
2. Which tools can provide that information?
3. In what order should tools be executed?
4. What parameters need to be extracted from the question?

Generate a concise plan with numbered steps."""

    user_prompt = f"""Question: {question}{file_context}

Available tools:
{tools_text}

Create an execution plan to answer this question. Format as numbered steps."""

    logger.info(f"[plan_question_groq] Calling Groq ({GROQ_MODEL}) for planning")

    response = client.chat.completions.create(
        model=GROQ_MODEL,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        max_tokens=MAX_TOKENS,
        temperature=TEMPERATURE,
    )

    plan = response.choices[0].message.content
    logger.info(f"[plan_question_groq] Generated plan ({len(plan)} chars)")

    return plan
|
|
|
|
| |
| |
| |
|
|
|
|
def plan_question(
    question: str,
    available_tools: Dict[str, Dict],
    file_paths: Optional[List[str]] = None,
) -> str:
    """
    Analyze question and generate execution plan using LLM.

    Dispatches to the provider selected by the LLM_PROVIDER environment
    variable (default: "gemini"). NOTE: the multi-provider fallback has been
    archived (see _call_with_fallback) -- if the primary provider fails, the
    error is raised. Each provider call is wrapped with retry logic
    (3 attempts with exponential backoff on quota/rate-limit errors).

    Args:
        question: GAIA question text
        available_tools: Tool registry (name -> {description, category, parameters})
        file_paths: Optional list of file paths for file-based questions

    Returns:
        Execution plan as structured text
    """
    return _call_with_fallback("plan_question", question, available_tools, file_paths)
|
|
|
|
| |
| |
| |
|
|
|
|
def select_tools_claude(
    question: str, plan: str, available_tools: Dict[str, Dict], file_paths: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
    """Use Claude function calling to select tools and extract parameters."""
    client = create_claude_client()

    # Translate the tool registry into Anthropic tool schemas.
    tool_schemas = [
        {
            "name": name,
            "description": info["description"],
            "input_schema": {
                "type": "object",
                "properties": info.get("parameters", {}),
                "required": info.get("required_params", []),
            },
        }
        for name, info in available_tools.items()
    ]

    # Extra prompt section listing the real file paths so the model does not
    # invent placeholder paths.
    file_context = ""
    if file_paths:
        file_context = f"""

IMPORTANT: These files are available for this question:
{chr(10).join(f"- {fp}" for fp in file_paths)}

When selecting tools, use the ACTUAL file paths listed above. Do NOT use placeholder paths like "<provided_path>" or "path_to_chess_image.jpg".
For vision tools with images: vision(image_path="<actual_file_path>")
For file parsing tools: parse_file(file_path="<actual_file_path>")"""

    system_prompt = f"""You are a tool selection expert. Based on the question and execution plan, select appropriate tools with correct parameters.

Few-shot examples:
- "How many albums did The Beatles release?" → web_search(query="Beatles discography number of albums")
- "What is 25 * 37 + 100?" → calculator(expression="25 * 37 + 100")
- "Analyze the image at example.com/pic.jpg" → vision(image_path="example.com/pic.jpg")
- "What's in the uploaded Excel file?" → parse_file(file_path="actual_file.xlsx")

Execute the plan step by step. Extract correct parameters from the question.
Use actual file paths when files are provided.{file_context}

Plan:
{plan}"""

    user_prompt = f"""Question: {question}

Select and call the tools needed according to the plan. Use exact parameter names from tool schemas."""

    logger.info(
        f"[select_tools_claude] Calling Claude with function calling for {len(tool_schemas)} tools"
    )

    response = client.messages.create(
        model=CLAUDE_MODEL,
        max_tokens=MAX_TOKENS,
        temperature=TEMPERATURE,
        system=system_prompt,
        messages=[{"role": "user", "content": user_prompt}],
        tools=tool_schemas,
    )

    # Keep only the tool_use content blocks from the response.
    tool_calls = [
        {"tool": block.name, "params": block.input, "id": block.id}
        for block in response.content
        if block.type == "tool_use"
    ]

    logger.info(f"[select_tools_claude] Claude selected {len(tool_calls)} tool(s)")

    return tool_calls
|
|
|
|
| |
| |
| |
|
|
|
|
def select_tools_gemini(
    question: str, plan: str, available_tools: Dict[str, Dict], file_paths: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
    """Use Gemini function calling to select tools and extract parameters.

    Args:
        question: GAIA question text.
        plan: Execution plan produced by the planning phase.
        available_tools: Tool registry (name -> {description, parameters, required_params}).
        file_paths: Optional list of downloaded file paths for file-based questions.

    Returns:
        List of {"tool", "params", "id"} dicts, one per function call Gemini made.
    """
    model = create_gemini_client()

    # Convert the registry into Gemini proto Tool declarations.
    # NOTE(review): every parameter is declared as Type.STRING regardless of the
    # registry's declared type -- confirm this flattening is intentional.
    tools = []
    for name, info in available_tools.items():
        tools.append(
            genai.protos.Tool(
                function_declarations=[
                    genai.protos.FunctionDeclaration(
                        name=name,
                        description=info["description"],
                        parameters=genai.protos.Schema(
                            type=genai.protos.Type.OBJECT,
                            properties={
                                param_name: genai.protos.Schema(
                                    type=genai.protos.Type.STRING,
                                    description=param_info.get("description", ""),
                                )
                                for param_name, param_info in info.get(
                                    "parameters", {}
                                ).items()
                            },
                            required=info.get("required_params", []),
                        ),
                    )
                ]
            )
        )

    # Extra prompt section listing real file paths, so the model does not
    # invent placeholder paths.
    file_context = ""
    if file_paths:
        file_context = f"""

IMPORTANT: These files are available for this question:
{chr(10).join(f"- {fp}" for fp in file_paths)}

When selecting tools, use the ACTUAL file paths listed above. Do NOT use placeholder paths like "<provided_path>" or "path_to_chess_image.jpg".
For vision tools with images: vision(image_path="<actual_file_path>")
For file parsing tools: parse_file(file_path="<actual_file_path>")"""

    # Single combined prompt (no separate system role for this call).
    prompt = f"""You are a tool selection expert. Based on the question and execution plan, select appropriate tools with correct parameters.

Few-shot examples:
- "How many albums did The Beatles release?" → web_search(query="Beatles discography number of albums")
- "What is 25 * 37 + 100?" → calculator(expression="25 * 37 + 100")
- "Analyze the image at example.com/pic.jpg" → vision(image_path="example.com/pic.jpg")
- "What's in the uploaded Excel file?" → parse_file(file_path="actual_file.xlsx")

Execute the plan step by step. Extract correct parameters from the question.
Use actual file paths when files are provided.{file_context}

Plan:
{plan}

Question: {question}

Select and call the tools needed according to the plan. Use exact parameter names from tool schemas."""

    logger.info(
        f"[select_tools_gemini] Calling Gemini with function calling for {len(available_tools)} tools"
    )

    response = model.generate_content(
        prompt,
        tools=tools,
        generation_config=genai.types.GenerationConfig(
            temperature=TEMPERATURE, max_output_tokens=MAX_TOKENS
        ),
    )

    # Extract function_call parts; synthetic ids are generated since Gemini
    # does not assign tool-call ids.
    tool_calls = []
    for part in response.parts:
        if hasattr(part, "function_call") and part.function_call:
            fc = part.function_call
            tool_calls.append(
                {
                    "tool": fc.name,
                    "params": dict(fc.args),
                    "id": f"gemini_{len(tool_calls)}",
                }
            )

    logger.info(f"[select_tools_gemini] Gemini selected {len(tool_calls)} tool(s)")

    return tool_calls
|
|
|
|
| |
| |
| |
|
|
|
|
def select_tools_hf(
    question: str, plan: str, available_tools: Dict[str, Dict], file_paths: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
    """Use HuggingFace Inference API with function calling to select tools and extract parameters.

    Args:
        question: GAIA question text.
        plan: Execution plan produced by the planning phase.
        available_tools: Tool registry (name -> {description, parameters, required_params}).
        file_paths: Optional list of downloaded file paths for file-based questions.

    Returns:
        List of {"tool", "params", "id"} dicts, one per tool call the model made.
    """
    client = create_hf_client()

    # Build OpenAI-style tool schemas from the registry.
    tools = []
    for name, info in available_tools.items():
        properties = {
            param_name: {
                "type": param_info.get("type", "string"),
                "description": param_info.get("description", ""),
            }
            for param_name, param_info in info.get("parameters", {}).items()
        }
        tools.append(
            {
                "type": "function",
                "function": {
                    "name": name,
                    "description": info["description"],
                    "parameters": {
                        "type": "object",
                        "properties": properties,
                        "required": info.get("required_params", []),
                    },
                },
            }
        )

    # Extra prompt section listing real file paths, so the model does not
    # invent placeholder paths.
    file_context = ""
    if file_paths:
        file_context = f"""

IMPORTANT: These files are available for this question:
{chr(10).join(f"- {fp}" for fp in file_paths)}

When selecting tools, use the ACTUAL file paths listed above. Do NOT use placeholder paths like "<provided_path>" or "path_to_chess_image.jpg".
For vision tools with images: vision(image_path="<actual_file_path>")
For file parsing tools: parse_file(file_path="<actual_file_path>")"""

    system_prompt = f"""You are a tool selection expert. Based on the question and execution plan, select appropriate tools with correct parameters.

Few-shot examples:
- "How many albums did The Beatles release?" → web_search(query="Beatles discography number of albums")
- "What is 25 * 37 + 100?" → calculator(expression="25 * 37 + 100")
- "Analyze the image at example.com/pic.jpg" → vision(image_path="example.com/pic.jpg")
- "What's in the uploaded Excel file?" → parse_file(file_path="actual_file.xlsx")

Execute the plan step by step. Extract correct parameters from the question.
Use actual file paths when files are provided.{file_context}

Plan:
{plan}"""

    user_prompt = f"""Question: {question}

Select and call the tools needed according to the plan. Use exact parameter names from tool schemas."""

    logger.info(
        f"[select_tools_hf] Calling HuggingFace with function calling for {len(tools)} tools, file_paths={file_paths}"
    )

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    response = client.chat_completion(
        messages=messages, tools=tools, max_tokens=MAX_TOKENS, temperature=TEMPERATURE
    )

    # Parse tool calls from the response; `arguments` arrives as a JSON string.
    # (json is imported at module level -- previously imported inside the loop.)
    message = response.choices[0].message
    tool_calls = [
        {
            "tool": call.function.name,
            "params": json.loads(call.function.arguments),
            "id": call.id,
        }
        for call in (getattr(message, "tool_calls", None) or [])
    ]

    logger.info(f"[select_tools_hf] HuggingFace selected {len(tool_calls)} tool(s)")

    return tool_calls
|
|
|
|
| |
| |
| |
|
|
|
|
def select_tools_groq(
    question: str, plan: str, available_tools: Dict[str, Dict], file_paths: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
    """Use Groq with function calling to select tools and extract parameters.

    Args:
        question: GAIA question text.
        plan: Execution plan produced by the planning phase.
        available_tools: Tool registry (name -> {description, parameters, required_params}).
        file_paths: Optional list of downloaded file paths for file-based questions.

    Returns:
        List of {"tool", "params", "id"} dicts, one per tool call the model made.
    """
    client = create_groq_client()

    # Build OpenAI-style tool schemas from the registry.
    tools = []
    for name, info in available_tools.items():
        properties = {
            param_name: {
                "type": param_info.get("type", "string"),
                "description": param_info.get("description", ""),
            }
            for param_name, param_info in info.get("parameters", {}).items()
        }
        tools.append(
            {
                "type": "function",
                "function": {
                    "name": name,
                    "description": info["description"],
                    "parameters": {
                        "type": "object",
                        "properties": properties,
                        "required": info.get("required_params", []),
                    },
                },
            }
        )

    # Extra prompt section listing real file paths, so the model does not
    # invent placeholder paths.
    file_context = ""
    if file_paths:
        file_context = f"""

IMPORTANT: These files are available for this question:
{chr(10).join(f"- {fp}" for fp in file_paths)}

When selecting tools, use the ACTUAL file paths listed above. Do NOT use placeholder paths like "<provided_path>" or "path_to_chess_image.jpg".
For vision tools with images: vision(image_path="<actual_file_path>")
For file parsing tools: parse_file(file_path="<actual_file_path>")"""

    system_prompt = f"""You are a tool selection expert. Based on the question and execution plan, select appropriate tools with correct parameters.

Few-shot examples:
- "How many albums did The Beatles release?" → web_search(query="Beatles discography number of albums")
- "What is 25 * 37 + 100?" → calculator(expression="25 * 37 + 100")
- "Analyze the image at example.com/pic.jpg" → vision(image_path="example.com/pic.jpg")
- "What's in the uploaded Excel file?" → parse_file(file_path="actual_file.xlsx")

Execute the plan step by step. Extract correct parameters from the question.
Use actual file paths when files are provided.{file_context}

Plan:
{plan}"""

    user_prompt = f"""Question: {question}

Select and call the tools needed according to the plan. Use exact parameter names from tool schemas."""

    logger.info(
        f"[select_tools_groq] Calling Groq with function calling for {len(tools)} tools"
    )

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    response = client.chat.completions.create(
        model=GROQ_MODEL,
        messages=messages,
        tools=tools,
        max_tokens=MAX_TOKENS,
        temperature=TEMPERATURE,
    )

    # Parse tool calls from the response; `arguments` arrives as a JSON string.
    # (json is imported at module level -- previously imported inside the loop.)
    message = response.choices[0].message
    tool_calls = [
        {
            "tool": call.function.name,
            "params": json.loads(call.function.arguments),
            "id": call.id,
        }
        for call in (getattr(message, "tool_calls", None) or [])
    ]

    logger.info(f"[select_tools_groq] Groq selected {len(tool_calls)} tool(s)")

    return tool_calls
|
|
|
|
| |
| |
| |
|
|
|
|
def select_tools_with_function_calling(
    question: str, plan: str, available_tools: Dict[str, Dict], file_paths: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
    """
    Use LLM function calling to dynamically select tools and extract parameters.

    Dispatches to the provider selected by the LLM_PROVIDER environment
    variable (default: "gemini"). NOTE: the multi-provider fallback has been
    archived (see _call_with_fallback) -- if the primary provider fails, the
    error is raised. Each provider call is wrapped with retry logic
    (3 attempts with exponential backoff on quota/rate-limit errors).

    Args:
        question: GAIA question text
        plan: Execution plan from planning phase
        available_tools: Tool registry
        file_paths: Optional list of downloaded file paths for file-based questions

    Returns:
        List of tool calls with extracted parameters
    """
    return _call_with_fallback("select_tools", question, plan, available_tools, file_paths)
|
|
|
|
| |
| |
| |
|
|
|
|
def synthesize_answer_claude(question: str, evidence: List[str]) -> str:
    """Extract a factoid answer for the question from the collected evidence via Claude."""
    client = create_claude_client()

    # Number each evidence chunk: "Evidence 1:", "Evidence 2:", ...
    evidence_text = "\n\n".join(
        f"Evidence {idx}:\n{item}" for idx, item in enumerate(evidence, start=1)
    )

    system_prompt = """You are an answer synthesis agent for the GAIA benchmark.

Your task is to extract a factoid answer from the provided evidence.

CRITICAL - Response format (two parts):
1. **REASONING** - Show your step-by-step thought process:
- What information is in the evidence?
- What is the question asking for?
- How do you extract the answer from the evidence?
- Any ambiguities or uncertainties?

2. **FINAL ANSWER** - The factoid answer only:
- A number, a few words, or a comma-separated list
- No explanations, just the answer
- If evidence is insufficient, state "Unable to answer"

Response format:
REASONING: [Your step-by-step thought process here]
FINAL ANSWER: [The factoid answer]

Examples:
REASONING: The evidence mentions the population of Tokyo is 13.9 million. The question asks for the city with highest population. Tokyo is listed as the highest.
FINAL ANSWER: Tokyo

REASONING: The transcript mentions "giant petrel", "emperor", and "adelie" (with typo "deli"). These are three different bird species present in the same scene.
FINAL ANSWER: 3
"""

    user_prompt = f"""Question: {question}

{evidence_text}

Extract the factoid answer from the evidence above. Return only the factoid, nothing else."""

    logger.info("[synthesize_answer_claude] Calling Claude for answer synthesis")

    # Short factoid output: a small token budget is enough.
    response = client.messages.create(
        model=CLAUDE_MODEL,
        max_tokens=256,
        temperature=TEMPERATURE,
        system=system_prompt,
        messages=[{"role": "user", "content": user_prompt}],
    )

    answer = response.content[0].text.strip()
    logger.info(f"[synthesize_answer_claude] Generated answer: {answer}")

    return answer
|
|
|
|
| |
| |
| |
|
|
|
|
def synthesize_answer_gemini(question: str, evidence: List[str]) -> str:
    """Extract a factoid answer for the question from the collected evidence via Gemini."""
    model = create_gemini_client()

    # Number each evidence chunk: "Evidence 1:", "Evidence 2:", ...
    evidence_text = "\n\n".join(
        f"Evidence {idx}:\n{item}" for idx, item in enumerate(evidence, start=1)
    )

    prompt = f"""You are an answer synthesis agent for the GAIA benchmark.

Your task is to extract a factoid answer from the provided evidence.

CRITICAL - Answer format requirements:
1. Answers must be factoids: a number, a few words, or a comma-separated list
2. Be concise - no explanations, just the answer
3. If evidence conflicts, evaluate source credibility and recency
4. If evidence is insufficient, state "Unable to answer"

Examples of good factoid answers:
- "42"
- "Paris"
- "Albert Einstein"
- "red, blue, green"
- "1969-07-20"

Examples of bad answers (too verbose):
- "The answer is 42 because..."
- "Based on the evidence, it appears that..."

Question: {question}

{evidence_text}

Extract the factoid answer from the evidence above. Return only the factoid, nothing else."""

    logger.info("[synthesize_answer_gemini] Calling Gemini for answer synthesis")

    # Short factoid output: a small token budget is enough.
    response = model.generate_content(
        prompt,
        generation_config=genai.types.GenerationConfig(
            temperature=TEMPERATURE,
            max_output_tokens=256,
        ),
    )

    answer = response.text.strip()
    logger.info(f"[synthesize_answer_gemini] Generated answer: {answer}")

    return answer
|
|
|
|
| |
| |
| |
|
|
|
|
def synthesize_answer_hf(question: str, evidence: List[str]) -> str:
    """
    Synthesize a factoid answer from evidence using the HuggingFace Inference API.

    Unlike the Claude/Gemini variants, this provider is prompted to emit an
    explicit REASONING / FINAL ANSWER structure; only the factoid after
    "FINAL ANSWER:" is returned. The full exchange (prompt and raw response)
    is appended to the per-session markdown log for later inspection, with
    the static system prompt written only once per session.

    Args:
        question: Original GAIA question.
        evidence: List of evidence strings from tool executions.

    Returns:
        The extracted factoid answer, or the raw response when the model
        did not follow the REASONING / FINAL ANSWER format.
    """
    global _SYSTEM_PROMPT_WRITTEN

    client = create_hf_client()

    # Number evidence items so the model can reference them.
    evidence_text = "\n\n".join(
        [f"Evidence {i + 1}:\n{e}" for i, e in enumerate(evidence)]
    )

    system_prompt = """You are an answer synthesis agent for the GAIA benchmark.

Your task is to extract a factoid answer from the provided evidence.

CRITICAL - Response format (two parts):
1. **REASONING** - Show your step-by-step thought process:
- What information is in the evidence?
- What is the question asking for?
- How do you extract the answer from the evidence?
- Any ambiguities or uncertainties?

2. **FINAL ANSWER** - The factoid answer only:
- A number, a few words, or a comma-separated list
- No explanations, just the answer
- If evidence is insufficient, state "Unable to answer"

Response format:
REASONING: [Your step-by-step thought process here]
FINAL ANSWER: [The factoid answer]

Examples:
REASONING: The evidence mentions the population of Tokyo is 13.9 million. The question asks for the city with highest population. Tokyo is listed as the highest.
FINAL ANSWER: Tokyo

REASONING: The transcript mentions "giant petrel", "emperor", and "adelie" (with typo "deli"). These are three different bird species present in the same scene.
FINAL ANSWER: 3
"""

    user_prompt = f"""Question: {question}

{evidence_text}

Extract the factoid answer from the evidence above. Return only the factoid, nothing else."""

    # Session log bookkeeping: one shared file per run (see get_session_log_file).
    context_file = get_session_log_file()
    question_timestamp = datetime.datetime.now().isoformat()

    # The static system prompt is logged only for the first question of the session.
    system_prompt_section = ""
    if not _SYSTEM_PROMPT_WRITTEN:
        system_prompt_section = f"""

## System Prompt (static - used for all questions)

```text
{system_prompt}
```
"""
        _SYSTEM_PROMPT_WRITTEN = True

    question_header = f"""
## Question [{question_timestamp}]

**Question:** {question}
**Evidence items:** {len(evidence)}
{system_prompt_section}

### Evidence & Prompt

```text
{user_prompt}
```
"""

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    # 1024 tokens: the prompt requests step-by-step reasoning before the factoid.
    response = client.chat_completion(
        messages=messages,
        max_tokens=1024,
        temperature=TEMPERATURE,
    )

    full_response = response.choices[0].message.content.strip()

    # Split out the factoid; fall back to the raw response if the model
    # ignored the REASONING / FINAL ANSWER format.
    if "FINAL ANSWER:" in full_response:
        parts = full_response.split("FINAL ANSWER:")
        answer = parts[-1].strip()
        reasoning = parts[0].replace("REASONING:", "").strip()
    else:
        answer = full_response
        reasoning = "No reasoning provided (format not followed)"

    logger.info("[synthesize_answer_hf] Answer: %s", answer)
    # Fix: `reasoning` was previously computed but never used; surface it at
    # debug level so format-following can be checked without reading the log file.
    logger.debug("[synthesize_answer_hf] Reasoning: %s", reasoning)

    # Append the full exchange to the session log for post-hoc analysis.
    complete_block = f"""{question_header}

### LLM Response

```text
{full_response}
```

**Extracted Answer:** `{answer}`

"""

    with open(context_file, "a", encoding="utf-8") as f:
        f.write(complete_block)

    return answer
|
|
|
|
| |
| |
| |
|
|
|
|
def synthesize_answer_groq(question: str, evidence: List[str]) -> str:
    """
    Synthesize a factoid answer from evidence using Groq.

    The model is prompted to emit a REASONING / FINAL ANSWER structure; only
    the factoid after "FINAL ANSWER:" is returned, mirroring
    synthesize_answer_hf (which uses the identical system prompt).

    Args:
        question: Original GAIA question.
        evidence: List of evidence strings from tool executions.

    Returns:
        The extracted factoid answer, or the raw response when the model
        did not follow the REASONING / FINAL ANSWER format.
    """
    client = create_groq_client()

    # Number evidence items so the model can reference them.
    evidence_text = "\n\n".join(
        [f"Evidence {i + 1}:\n{e}" for i, e in enumerate(evidence)]
    )

    system_prompt = """You are an answer synthesis agent for the GAIA benchmark.

Your task is to extract a factoid answer from the provided evidence.

CRITICAL - Response format (two parts):
1. **REASONING** - Show your step-by-step thought process:
- What information is in the evidence?
- What is the question asking for?
- How do you extract the answer from the evidence?
- Any ambiguities or uncertainties?

2. **FINAL ANSWER** - The factoid answer only:
- A number, a few words, or a comma-separated list
- No explanations, just the answer
- If evidence is insufficient, state "Unable to answer"

Response format:
REASONING: [Your step-by-step thought process here]
FINAL ANSWER: [The factoid answer]

Examples:
REASONING: The evidence mentions the population of Tokyo is 13.9 million. The question asks for the city with highest population. Tokyo is listed as the highest.
FINAL ANSWER: Tokyo

REASONING: The transcript mentions "giant petrel", "emperor", and "adelie" (with typo "deli"). These are three different bird species present in the same scene.
FINAL ANSWER: 3
"""

    user_prompt = f"""Question: {question}

{evidence_text}

Extract the factoid answer from the evidence above. Return only the factoid, nothing else."""

    logger.info("[synthesize_answer_groq] Calling Groq for answer synthesis")

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    # Fix: max_tokens raised from 256 to 1024 (matching the HF provider).
    # The prompt requests step-by-step reasoning before the factoid, and 256
    # tokens could truncate the response before "FINAL ANSWER:" appeared.
    response = client.chat.completions.create(
        model=GROQ_MODEL,
        messages=messages,
        max_tokens=1024,
        temperature=TEMPERATURE,
    )

    full_response = response.choices[0].message.content.strip()

    # Fix: the prompt asks for REASONING + FINAL ANSWER, but the raw
    # response (reasoning included) used to be returned verbatim, producing
    # verbose non-factoid answers. Extract just the factoid, as in
    # synthesize_answer_hf; fall back to the raw text if the format was ignored.
    if "FINAL ANSWER:" in full_response:
        answer = full_response.split("FINAL ANSWER:")[-1].strip()
    else:
        answer = full_response

    logger.info("[synthesize_answer_groq] Generated answer: %s", answer)

    return answer
|
|
|
|
| |
| |
| |
|
|
|
|
def synthesize_answer(question: str, evidence: List[str]) -> str:
    """
    Produce a factoid answer from collected evidence via the configured LLM.

    Provider selection is driven by the LLM_PROVIDER config; when
    ENABLE_LLM_FALLBACK=true the call cascades to the remaining providers on
    failure. Every provider attempt is wrapped in retry logic (3 attempts
    with exponential backoff).

    Args:
        question: Original GAIA question.
        evidence: Evidence strings gathered by tool executions.

    Returns:
        Factoid answer string.
    """
    operation = "synthesize_answer"
    return _call_with_fallback(operation, question, evidence)
|
|
|
|
| |
| |
| |
|
|
|
|
def _parse_conflict_json(raw_text: str) -> Dict[str, Any]:
    """
    Parse the LLM's JSON conflict report into the expected result dict.

    Strips markdown code fences (``` / ```json) that models often wrap JSON
    in before parsing. Falls back to a no-conflict result that carries the
    raw text in "resolution" when the output is not valid JSON, so callers
    always receive the same dict shape.
    """
    import json  # local import: top-of-file import block is elsewhere

    text = raw_text.strip()
    if text.startswith("```"):
        fence_lines = text.split("\n")
        fence_lines = fence_lines[1:]  # drop opening ``` / ```json
        if fence_lines and fence_lines[-1].strip() == "```":
            fence_lines = fence_lines[:-1]  # drop closing fence
        text = "\n".join(fence_lines).strip()

    try:
        parsed = json.loads(text)
        return {
            "has_conflicts": bool(parsed.get("has_conflicts", False)),
            "conflicts": list(parsed.get("conflicts", [])),
            "resolution": str(parsed.get("resolution", "")),
        }
    except (json.JSONDecodeError, AttributeError, TypeError):
        # Unparseable or non-object output: preserve the raw text so the
        # model's analysis is still available to callers.
        return {"has_conflicts": False, "conflicts": [], "resolution": raw_text}


def resolve_conflicts(evidence: List[str]) -> Dict[str, Any]:
    """
    Detect and resolve conflicts in evidence using LLM reasoning.

    Optional function for advanced conflict handling.
    Currently integrated into synthesize_answer().
    Uses same Gemini primary, Claude fallback pattern.

    Args:
        evidence: List of evidence strings that may conflict

    Returns:
        Dictionary with keys "has_conflicts" (bool), "conflicts" (list of
        conflict descriptions), and "resolution" (str)
    """
    try:
        model = create_gemini_client()
        evidence_text = "\n\n".join(
            [f"Evidence {i + 1}:\n{e}" for i, e in enumerate(evidence)]
        )

        prompt = f"""You are a conflict detection agent.

Analyze the provided evidence and identify any contradictions or conflicts.

Evaluate:
1. Are there contradictory facts?
2. Which sources are more credible?
3. Which information is more recent?
4. How should conflicts be resolved?

Analyze this evidence for conflicts:

{evidence_text}

Respond in JSON format:
{{
"has_conflicts": true/false,
"conflicts": ["description of conflict 1", ...],
"resolution": "recommended resolution strategy"
}}"""

        logger.info(f"[resolve_conflicts] Analyzing with Gemini")

        response = model.generate_content(prompt)

        # Fix: actually parse the model's JSON report. Previously
        # "has_conflicts" was hard-coded to False and "conflicts" to [],
        # so every analysis claimed no conflicts regardless of the output.
        return _parse_conflict_json(response.text)

    except Exception as gemini_error:
        logger.warning(
            f"[resolve_conflicts] Gemini failed: {gemini_error}, trying Claude"
        )

        # Fallback path: same analysis via Claude (system + user prompt split).
        client = create_claude_client()
        evidence_text = "\n\n".join(
            [f"Evidence {i + 1}:\n{e}" for i, e in enumerate(evidence)]
        )

        system_prompt = """You are a conflict detection agent.

Analyze the provided evidence and identify any contradictions or conflicts.

Evaluate:
1. Are there contradictory facts?
2. Which sources are more credible?
3. Which information is more recent?
4. How should conflicts be resolved?"""

        user_prompt = f"""Analyze this evidence for conflicts:

{evidence_text}

Respond in JSON format:
{{
"has_conflicts": true/false,
"conflicts": ["description of conflict 1", ...],
"resolution": "recommended resolution strategy"
}}"""

        response = client.messages.create(
            model=CLAUDE_MODEL,
            max_tokens=MAX_TOKENS,
            temperature=TEMPERATURE,
            system=system_prompt,
            messages=[{"role": "user", "content": user_prompt}],
        )

        # Fix: parse Claude's JSON report as well instead of hard-coding
        # the "no conflicts" result.
        return _parse_conflict_json(response.content[0].text)
|
|