from llama_index.llms.google_genai import GoogleGenAI
from llama_index.llms.gemini import Gemini
from llama_index.tools.arxiv import ArxivToolSpec
from llama_index.tools.wikipedia import WikipediaToolSpec
from llama_index.tools.duckduckgo import DuckDuckGoSearchToolSpec
from llama_index.core.tools import FunctionTool
from llama_index.core.agent.workflow import AgentWorkflow, ReActAgent
from llama_index.llms.lmstudio import LMStudio
from llama_index.core.agent.workflow import (
AgentStream,
AgentOutput
)
from gradio import ChatMessage
from llama_index.core.base.llms.types import ChatMessage as llama_index_chat_message
from tools import interpret_python_math_code, image_understanding, convert_audio_to_text, video_understanding, read_csv_file, read_xlsx_file
from gaia_system_prompt import GAIA_SYSTEM_PROMPT, CUSTOM_SYSTEM_PROMPT
import os
import asyncio
TIMEOUT=180 # Timeout for agent execution in seconds
# NOTE(review): the env var is named "GEMINI_TOKEN", not "GEMINI_API_KEY" —
# confirm the deployment sets that exact name, otherwise this is None.
GEMINI_API_KEY = os.getenv("GEMINI_TOKEN")
# Gemini's OpenAI-compatible REST endpoint (unused by the active code path;
# kept for the commented-out alternative LLM configurations).
GEMINI_OPENAI_API_DIR = "https://generativelanguage.googleapis.com/v1beta/openai/"
GEMINI_MODEL_NAME = "gemini-2.0-flash"  # model id passed to the Gemini wrapper
LMSTUDIO_MODEL_NAME = "gemma-3-12B-it-qat-GGUF"  # local model for the LMStudio alternative
API_DIR = "http://host.docker.internal:1234/v1" # LM Studio API URL
class FinalAgent:
    """Tool-using agent for GAIA-style question answering.

    Builds a llama_index ``AgentWorkflow`` around a Gemini LLM plus a mix of
    custom function tools (math interpreter, image/audio/video understanding,
    CSV/XLSX readers) and community tool specs (arXiv, Wikipedia, DuckDuckGo).
    Await an instance with a question string to get the final answer string.
    """

    # Tags the system prompt asks the model to wrap its final answer in.
    # NOTE(review): reconstructed — the original literals were empty strings,
    # which made extraction always return "". Confirm these tags match the
    # wording in CUSTOM_SYSTEM_PROMPT.
    ANSWER_OPEN_TAG = "<answer>"
    ANSWER_CLOSE_TAG = "</answer>"

    def __init__(self):
        # LLM initialization. Alternative backends are kept as one-line
        # commented toggles.
        # self.llm = GoogleGenAI(model=GEMINI_MODEL_NAME, api_key=GEMINI_API_KEY)
        self.llm = Gemini(model=GEMINI_MODEL_NAME, api_key=GEMINI_API_KEY)
        # self.llm = LMStudio(model_name=LMSTUDIO_MODEL_NAME, base_url=API_DIR, request_timeout=180, temperature=0.1)

        # Custom function tools wrapping the helpers imported from tools.py.
        self.tools = [
            FunctionTool.from_defaults(
                fn=interpret_python_math_code,
                name="InterpretPythonMathCode",
                description="Interprets Python code for mathematical expressions."
            ),
            FunctionTool.from_defaults(
                fn=image_understanding,
                name="ImageUnderstanding",
                description="Analyzes an image and generates a response to a given question based on the image's content."
            ),
            FunctionTool.from_defaults(
                fn=convert_audio_to_text,
                name="ConvertAudioToText",
                description="Converts audio files to text using a speech-to-text model."
            ),
            FunctionTool.from_defaults(
                fn=video_understanding,
                name="VideoUnderstanding",
                description="Analyzes a video and generates a response to a given question based on the video's content."
            ),
            FunctionTool.from_defaults(
                fn=read_csv_file,
                name="ReadCSVFile",
                description="Reads a CSV file and returns its content as a string."
            ),
            FunctionTool.from_defaults(
                fn=read_xlsx_file,
                name="ReadXLSXFile",
                description="Reads an XLSX file and returns its content as a string."
            )
        ]
        # Community tool specs each expand into one or more FunctionTools.
        self.tools.extend(ArxivToolSpec().to_tool_list())
        self.tools.extend(WikipediaToolSpec().to_tool_list())
        self.tools.extend(DuckDuckGoSearchToolSpec().to_tool_list())

        # Print the tools for debugging (``metadata`` is the public accessor
        # for the previously used private ``_metadata``).
        print("Tools initialized:")
        for tool in self.tools:
            print(f"- {tool.metadata}")

        # Agent workflow initialization.
        self.agent = AgentWorkflow.from_tools_or_functions(
            tools_or_functions=self.tools,
            llm=self.llm,
            system_prompt=CUSTOM_SYSTEM_PROMPT,
            timeout=TIMEOUT
        )
        print("FinalAgent initialized.")

    async def __call__(self, question: str) -> str:
        """Run the agent workflow on *question* and return the answer text.

        Any exception raised by the workflow is caught and returned as an
        ``"Agent error: ..."`` string instead of propagating (deliberate
        best-effort boundary for the calling UI).
        """
        print(f"Agent received question: {question}")
        response_str = ""
        try:
            # AgentWorkflow.run is awaitable; this is the async entry point.
            agent_chat_response = await self.agent.run(question)
            print(agent_chat_response)
            payload = agent_chat_response.response
            if isinstance(payload, (ChatMessage, llama_index_chat_message)):
                # Both gradio and llama_index ChatMessage expose ``.content``,
                # which may be None — normalize that to "".
                print(f"DEBUG: Response object is {type(payload).__name__}. Role: {payload.role}")
                response_str = payload.content or ""
            elif isinstance(payload, str):
                print("DEBUG: Response object is str.")
                response_str = payload
            else:
                # Fallback for any other response type.
                print(f"Warning: Agent response was of unexpected type: {type(payload)}. Converting to string.")
                response_str = str(payload)
        except Exception as e:
            print(f"Error during agent execution with LLM {self.llm.__class__.__name__}: {e}")
            response_str = f"Agent error: {e}"
        # Extract the final answer from between the answer tags.
        return self._extract_answer(response_str)

    @staticmethod
    def _extract_answer(response_str: str) -> str:
        """Return the stripped text between the answer tags.

        Falls back to the full *response_str* (with a warning) when the tag
        pair is absent or malformed. This fixes the previous implementation,
        which searched for empty-string tags and therefore sliced
        ``response_str[0:0]`` — always returning "".
        """
        open_tag = FinalAgent.ANSWER_OPEN_TAG
        close_tag = FinalAgent.ANSWER_CLOSE_TAG
        start = response_str.find(open_tag)
        if start != -1:
            # Only look for the closing tag after the opening one.
            end = response_str.find(close_tag, start + len(open_tag))
            if end != -1:
                return response_str[start + len(open_tag):end].strip()
        print("Warning: No tags found in the response.")
        return response_str
# async def main():
# # Example usage
# agent = FinalAgent()
# question = "How many studio albums were published by Mercedes Sosa between 2000 and 2009 (included)? You can use the latest 2022 version of english wikipedia."
# answer = await agent(question)
# print(f"Final answer: {answer}")
# if __name__ == "__main__":
# asyncio.run(main())