"""GAIA-style question-answering agent built on a llama_index AgentWorkflow.

Wires a Gemini LLM to a set of function tools (math interpreter,
image/audio/video understanding, CSV/XLSX readers) plus ArXiv, Wikipedia
and DuckDuckGo search tool specs, and exposes an async callable that
runs the workflow and returns the text found between the answer tags
that the system prompt instructs the model to emit.
"""

from llama_index.llms.google_genai import GoogleGenAI
from llama_index.llms.gemini import Gemini
from llama_index.tools.arxiv import ArxivToolSpec
from llama_index.tools.wikipedia import WikipediaToolSpec
from llama_index.tools.duckduckgo import DuckDuckGoSearchToolSpec
from llama_index.core.tools import FunctionTool
from llama_index.core.agent.workflow import AgentWorkflow, ReActAgent
from llama_index.llms.lmstudio import LMStudio
from llama_index.core.agent.workflow import (
    AgentStream,
    AgentOutput,
)
from gradio import ChatMessage
from llama_index.core.base.llms.types import ChatMessage as llama_index_chat_message
from tools import (
    interpret_python_math_code,
    image_understanding,
    convert_audio_to_text,
    video_understanding,
    read_csv_file,
    read_xlsx_file,
)
from gaia_system_prompt import GAIA_SYSTEM_PROMPT, CUSTOM_SYSTEM_PROMPT
import os
import asyncio

TIMEOUT = 180  # Timeout for agent execution in seconds

GEMINI_API_KEY = os.getenv("GEMINI_TOKEN")
GEMINI_OPENAI_API_DIR = "https://generativelanguage.googleapis.com/v1beta/openai/"
GEMINI_MODEL_NAME = "gemini-2.0-flash"

LMSTUDIO_MODEL_NAME = "gemma-3-12B-it-qat-GGUF"
API_DIR = "http://host.docker.internal:1234/v1"  # LM Studio API URL

# Delimiters around the agent's final answer in the LLM output.
# NOTE(review): the original source tested for empty-string tags
# (`"" in response_str` is always true and `index("")` is 0), so the
# extraction slice was always empty and the agent always returned "".
# Restored to explicit tags — confirm they match the delimiters that
# CUSTOM_SYSTEM_PROMPT instructs the model to use.
ANSWER_START_TAG = "<answer>"
ANSWER_END_TAG = "</answer>"


class FinalAgent:
    """Async agent: builds the tool list and workflow once, then answers
    questions via ``await agent(question)``."""

    def __init__(self):
        # LLM Initialization — alternatives kept as deliberate config options:
        # self.llm = GoogleGenAI(model=GEMINI_MODEL_NAME, api_key=GEMINI_API_KEY)
        self.llm = Gemini(model=GEMINI_MODEL_NAME, api_key=GEMINI_API_KEY)
        # self.llm = LMStudio(model_name=LMSTUDIO_MODEL_NAME, base_url=API_DIR,
        #                     request_timeout=180, temperature=0.1)

        # Tool Initialization: local function tools first, then the
        # search tool specs (ArXiv / Wikipedia / DuckDuckGo).
        self.tools = [
            FunctionTool.from_defaults(
                fn=interpret_python_math_code,
                name="InterpretPythonMathCode",
                description="Interprets Python code for mathematical expressions.",
            ),
            FunctionTool.from_defaults(
                fn=image_understanding,
                name="ImageUnderstanding",
                description="Analyzes an image and generates a response to a given question based on the image's content.",
            ),
            FunctionTool.from_defaults(
                fn=convert_audio_to_text,
                name="ConvertAudioToText",
                description="Converts audio files to text using a speech-to-text model.",
            ),
            FunctionTool.from_defaults(
                fn=video_understanding,
                name="VideoUnderstanding",
                description="Analyzes a video and generates a response to a given question based on the video's content.",
            ),
            FunctionTool.from_defaults(
                fn=read_csv_file,
                name="ReadCSVFile",
                description="Reads a CSV file and returns its content as a string.",
            ),
            FunctionTool.from_defaults(
                fn=read_xlsx_file,
                name="ReadXLSXFile",
                description="Reads an XLSX file and returns its content as a string.",
            ),
        ]
        self.tools.extend(ArxivToolSpec().to_tool_list())
        self.tools.extend(WikipediaToolSpec().to_tool_list())
        self.tools.extend(DuckDuckGoSearchToolSpec().to_tool_list())

        # Print the tools for debugging
        print("Tools initialized:")
        for tool in self.tools:
            print(f"- {tool._metadata}")

        # Agent Workflow Initialization
        self.agent = AgentWorkflow.from_tools_or_functions(
            tools_or_functions=self.tools,
            llm=self.llm,
            system_prompt=CUSTOM_SYSTEM_PROMPT,
            timeout=TIMEOUT,
        )
        # Alternative agent kept as a config option:
        # self.agent = ReActAgent(
        #     llm=self.llm,
        #     verbose=True,
        #     max_iterations=5,
        #     system_prompt=CUSTOM_SYSTEM_PROMPT,
        #     tools=self.tools,
        # )
        print("FinalAgent initialized.")

    @staticmethod
    def _extract_answer(response_str: str) -> str:
        """Return the text between ANSWER_START_TAG and ANSWER_END_TAG.

        Falls back to the full response (with a warning) when the tags
        are missing or out of order, rather than silently returning "".
        """
        start = response_str.find(ANSWER_START_TAG)
        end = response_str.find(ANSWER_END_TAG, start + len(ANSWER_START_TAG))
        if start != -1 and end != -1:
            return response_str[start + len(ANSWER_START_TAG):end].strip()
        print("Warning: No tags found in the response.")
        return response_str

    async def __call__(self, question: str) -> str:
        """Run the workflow on ``question`` and return the extracted answer.

        Errors during execution are caught and reported in the returned
        string instead of propagating (best-effort behavior preserved
        from the original).
        """
        print(f"Agent received question: {question}")
        response_str = ""
        try:
            # AgentWorkflow.run is a coroutine; await it directly.
            agent_chat_response = await self.agent.run(question)
            print(agent_chat_response)
            potential_response_obj = agent_chat_response.response

            # Both the gradio ChatMessage and the llama_index ChatMessage
            # expose the text via ``.content`` (which may be None).
            if isinstance(potential_response_obj, (ChatMessage, llama_index_chat_message)):
                print(f"DEBUG: Response object is ChatMessage. Role: {potential_response_obj.role}")
                response_str = potential_response_obj.content or ""
            elif isinstance(potential_response_obj, str):
                print("DEBUG: Response object is str.")
                response_str = potential_response_obj
            else:
                # Fallback if it's some other type
                print(f"Warning: Agent response was of unexpected type: {type(potential_response_obj)}. Converting to string.")
                response_str = str(potential_response_obj)
        except Exception as e:
            print(f"Error during agent execution with LLM {self.llm.__class__.__name__}: {e}")
            # Depending on requirements, you might want to return an error message or re-raise
            response_str = f"Agent error: {e}"

        # Get the agent's final response between the answer tags.
        return self._extract_answer(response_str)


# Example usage (kept disabled so importing this module has no side effects):
# async def main():
#     agent = FinalAgent()
#     question = (
#         "How many studio albums were published by Mercedes Sosa between "
#         "2000 and 2009 (included)? You can use the latest 2022 version "
#         "of english wikipedia."
#     )
#     answer = await agent(question)
#     print(f"Final answer: {answer}")
#
# if __name__ == "__main__":
#     asyncio.run(main())