| """ |
| Intelligent AI Agent using LlamaIndex with websearch capabilities |
| This module contains the agent class with advanced tools and reasoning. |
| """ |
|
|
| import os |
| import asyncio |
| import io |
| import contextlib |
| import ast |
| import traceback |
| from typing import Any, Dict, Tuple, List |
|
|
| |
# Best-effort loading of environment variables from a local .env file.
# A missing python-dotenv package or a failing load is reported on stdout
# and never stops the module from importing.
try:
    from dotenv import load_dotenv

    load_dotenv()
    print("✅ .env file loaded successfully")
except ImportError:
    # python-dotenv is optional; fall back to the process environment.
    print("⚠️ python-dotenv not available, .env file not loaded")
except Exception as dotenv_error:
    print(f"⚠️ Error loading .env file: {dotenv_error}")
|
|
| |
# Optional LlamaIndex stack.  All of these imports succeed or fail together;
# LLAMA_INDEX_AVAILABLE records the outcome so the rest of the module can
# degrade to "basic mode" instead of failing at import time.
try:
    from llama_index.core.agent.workflow import (
        ToolCall,
        ToolCallResult,
        FunctionAgent,
        AgentStream,
    )
    from llama_index.core.tools import FunctionTool
    from llama_index.tools.wikipedia import WikipediaToolSpec
    from llama_index.tools.tavily_research.base import TavilyToolSpec
    from llama_index.llms.bedrock_converse import BedrockConverse
except ImportError as import_error:
    print(f"LlamaIndex imports not available: {import_error}")
    LLAMA_INDEX_AVAILABLE = False
else:
    LLAMA_INDEX_AVAILABLE = True
|
|
| |
|
|
class BasicAgent:
    """Question-answering agent built on a LlamaIndex ``FunctionAgent``.

    The agent uses a ``BedrockConverse`` LLM and is given arithmetic helper
    tools plus Tavily and Wikipedia search tools.  Construction never raises
    because of a missing dependency or a failing tool: each failure is
    printed and the instance falls back to a mode in which calling it returns
    a fixed "not properly initialized" answer.

    Instances are async callables::

        answer = await agent("What is 25% of 200?")
    """

    # Marker the system prompt asks the model to put before its final answer.
    _ANSWER_MARKER = "FINAL ANSWER:"

    # System prompt handed to the FunctionAgent.
    # NOTE(review): the prompt advertises ``tavily_search(query)``, but tools
    # produced by ``TavilyToolSpec.to_tool_list()`` are presumably named after
    # the spec's own methods -- confirm the advertised name matches.
    _SYSTEM_PROMPT = """
You are an intelligent AI assistant equipped with powerful tools to help solve problems. You must think step-by-step and use the available tools when needed.

## AVAILABLE TOOLS:
You have access to the following tools - use them strategically:

### Mathematical Tools:
- add_numbers(a, b): Add two numbers together
- subtract_numbers(a, b): Subtract second number from first number
- multiply_numbers(a, b): Multiply two numbers
- divide_numbers(a, b): Divide first number by second number
- power_numbers(a, b): Raise first number to the power of second number
- calculate_percentage(value, percentage): Calculate percentage of a value
- get_modulus(a, b): Get the modulus of two numbers

### Research Tools:
- tavily_search(query): Search the web for current information
- wikipedia_search(query): Search Wikipedia for factual information

## INSTRUCTIONS:
1. **Read the question carefully** and identify what type of answer is needed
2. **Think step-by-step** - break down complex problems into smaller parts
3. **Use tools when necessary** - don't try to guess calculations or current information
4. **For mathematical problems**: Use the math tools for accuracy
5. **For factual questions**: Use search tools to get current/accurate information
6. **Show your reasoning** - explain your thought process before the final answer

## ANSWER FORMAT:
Always end your response with exactly this format:
FINAL ANSWER: [YOUR ANSWER]

## FORMATTING RULES FOR FINAL ANSWER:
- **Numbers**: Write plain numbers without commas, units, or symbols (unless specifically asked)
- **Text**: Use simple words, no articles (a, an, the), no abbreviations
- **Lists**: Comma-separated, following the above rules for each element
- **Calculations**: Show the result as a plain number

## EXAMPLES:
Question: "What is 25% of 200?"
Reasoning: I need to calculate 25% of 200 using the percentage tool.
[Use calculate_percentage(200, 25)]
FINAL ANSWER: 50

Question: "What is the capital of France?"
Reasoning: This is a factual question about geography.
[Use wikipedia_search("France capital")]
FINAL ANSWER: Paris

Remember: Think carefully, use tools when helpful, and always provide your final answer in the specified format.
"""

    def __init__(self):
        """Set up the LLM, the tool list and the deferred agent parameters."""
        print("Initializing Advanced AI Agent with LlamaIndex...")

        # The token is only stored and reported here; the Bedrock LLM built in
        # _initialize_llm() does not read it.
        self.hf_token = os.getenv("HUGGINGFACE_TOKEN")
        if not self.hf_token:
            print("Warning: HUGGINGFACE_TOKEN not found. Using default model.")

        self._initialize_llm()
        self._initialize_tools()
        self._initialize_agent()

        print("Advanced AI Agent initialized successfully.")

    def _initialize_llm(self):
        """Create the ``BedrockConverse`` LLM and store it on ``self.llm``.

        Credentials and region come from the ``AWS_ACCESS_KEY_ID``,
        ``AWS_SECRET_ACCESS_KEY`` and ``AWS_REGION`` environment variables.
        ``self.llm`` is set to ``None`` when LlamaIndex is not importable or
        when constructing the client raises.
        """
        if not LLAMA_INDEX_AVAILABLE:
            print("LlamaIndex not available, using basic mode")
            self.llm = None
            return

        try:
            self.llm = BedrockConverse(
                model="amazon.nova-pro-v1:0",
                temperature=0.5,
                aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
                aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
                region_name=os.getenv("AWS_REGION"),
            )
            print("✅ LLM initialized successfully")
        except Exception as e:
            # Best effort: a broken LLM setup degrades to basic mode.
            print(f"Error initializing LLM: {e}")
            self.llm = None

    def _initialize_tools(self):
        """Populate ``self.tools`` with math, Tavily and Wikipedia tools.

        ``self.math_functions`` (plain callables keyed by operation name) is
        always created.  The LlamaIndex tools are only built when LlamaIndex
        is importable; each tool group is set up independently so that one
        failing group is reported and skipped without affecting the others.
        """
        self.tools = []

        self.math_functions = {
            'add': lambda a, b: a + b,
            'subtract': lambda a, b: a - b,
            'multiply': lambda a, b: a * b,
            'divide': lambda a, b: a / b if b != 0 else "Error: Division by zero",
            'power': lambda a, b: a ** b,
            'percentage': lambda v, p: (v * p) / 100,
        }

        if not LLAMA_INDEX_AVAILABLE:
            print("Tools initialization skipped - LlamaIndex not available")
            return

        def add_numbers(a: float, b: float) -> float:
            """Add two numbers together."""
            return a + b

        def subtract_numbers(a: float, b: float) -> float:
            """Subtract second number from first number."""
            return a - b

        def multiply_numbers(a: float, b: float) -> float:
            """Multiply two numbers."""
            return a * b

        def divide_numbers(a: float, b: float) -> float:
            """Divide first number by second number.

            Returns an error string instead of raising when ``b`` is zero.
            """
            if b == 0:
                return "Error: Division by zero"
            return a / b

        def power_numbers(a: float, b: float) -> float:
            """Raise first number to the power of second number."""
            return a ** b

        def calculate_percentage(value: float, percentage: float) -> float:
            """Calculate percentage of a value."""
            return (value * percentage) / 100

        def get_modulus(a: float, b: float) -> float:
            """Get the modulus of two numbers.

            Returns an error string instead of raising when ``b`` is zero,
            mirroring ``divide_numbers``.
            """
            if b == 0:
                return "Error: Modulo by zero"
            return a % b

        # (callable, description) pairs; each tool is named after its function.
        math_tool_specs = [
            (add_numbers, "Add two numbers together"),
            (subtract_numbers, "Subtract second number from first number"),
            (multiply_numbers, "Multiply two numbers"),
            (divide_numbers, "Divide first number by second number"),
            (power_numbers, "Raise first number to the power of second number"),
            (calculate_percentage, "Calculate percentage of a value"),
            (get_modulus, "Get the modulus of two numbers"),
        ]
        try:
            # Build the whole list first so a failure registers no math tool
            # at all rather than a partial set.
            math_tools = [
                FunctionTool.from_defaults(fn=fn, name=fn.__name__, description=description)
                for fn, description in math_tool_specs
            ]
            self.tools.extend(math_tools)
            print("✅ Math tools initialized")
        except Exception as e:
            print(f"Warning: Could not initialize math tools: {e}")

        try:
            search_spec = TavilyToolSpec(
                api_key=os.getenv("TAVILY_API_KEY"),
            )
            self.tools.extend(search_spec.to_tool_list())
            print("✅ Tavily search tool initialized")
        except Exception as e:
            print(f"Warning: Could not initialize Tavily tool: {e}")

        try:
            wiki_spec = WikipediaToolSpec()
            # NOTE(review): the published WikipediaToolSpec appears to expose
            # ``search_data`` rather than ``wikipedia_search``; use the
            # original attribute when it exists and fall back otherwise.
            wiki_search = getattr(wiki_spec, "wikipedia_search", None) or wiki_spec.search_data
            wiki_tool = FunctionTool.from_defaults(
                wiki_search,
                name="wikipedia_search",
                description="Search Wikipedia for information",
            )
            # from_defaults() returns one tool, so append it; extend() would
            # try to iterate the tool object itself.
            self.tools.append(wiki_tool)
            print("✅ Wikipedia tool initialized")
        except Exception as e:
            print(f"Warning: Could not initialize Wikipedia tool: {e}")

        print(f"✅ Total {len(self.tools)} tools initialized")

    def _initialize_agent(self):
        """Record what is needed to build the agent later.

        The ``FunctionAgent`` itself is created lazily by
        ``_ensure_agent_initialized``.  Without an LLM no parameters are
        stored, which keeps the agent permanently uninitialized.
        """
        if not self.llm:
            print("Warning: No LLM available, using basic mode")
            self.agent = None
            self.context = None
            return

        self._agent_params = {
            'llm': self.llm,
            'tools': self.tools
        }
        self.agent = None
        self.context = None
        print("✅ CodeAct Agent parameters prepared (deferred initialization)")

    def _ensure_agent_initialized(self):
        """Build the ``FunctionAgent`` on first use.

        Returns:
            bool: ``True`` when ``self.agent`` is available afterwards,
            ``False`` when it could not be created or when no agent
            parameters were prepared.
        """
        if self.agent is None and hasattr(self, '_agent_params'):
            try:
                # Drop any stale context before creating a fresh agent.
                if getattr(self, 'context', None):
                    self.context = None

                self.agent = FunctionAgent(
                    tools=self.tools,
                    llm=self.llm,
                    system_prompt=self._SYSTEM_PROMPT,
                )
                print("✅ CodeAct Agent initialized (deferred)")
            except Exception as e:
                print(f"Error in deferred agent initialization: {e}")
                print("Continuing with fallback mode...")
                return False
        return self.agent is not None

    async def __call__(self, question: str) -> str:
        """Answer ``question`` and return the extracted final answer.

        Args:
            question: The user question passed to the agent.

        Returns:
            The answer text with the ``FINAL ANSWER:`` marker removed on
            success.  When the agent is unavailable or fails, a string that
            starts with ``"FINAL ANSWER: "`` and describes the problem.
        """
        print(f"Agent received question (first 100 chars): {question[:100]}...")

        self._ensure_agent_initialized()

        if not self.agent:
            return "FINAL ANSWER: Agent not properly initialized"

        try:
            return await self._async_agent_run(question)
        except Exception as e:
            print(f"Error with agent: {e}")
            return f"FINAL ANSWER: Error processing question - {str(e)}"

    @classmethod
    def _extract_final_answer(cls, content) -> str:
        """Return the text following the final-answer marker in ``content``.

        When the marker occurs, the segment right after its first occurrence
        is returned.  When it is missing, the whole reply is returned so a
        usable answer is not replaced by an indexing error.  Surrounding
        whitespace is trimmed in both cases; ``None`` yields ``""``.
        """
        if content is None:
            return ""
        text = str(content)
        segments = text.split(cls._ANSWER_MARKER)
        if len(segments) > 1:
            return segments[1].strip()
        return text.strip()

    async def _async_agent_run(self, question: str) -> str:
        """Run the agent on ``question`` while echoing its progress.

        Tool calls and streamed text deltas are printed as they arrive; the
        final reply is then reduced to the answer after ``FINAL ANSWER:``.

        Returns:
            The extracted answer, or a ``"FINAL ANSWER: Error in agent
            processing - ..."`` string when anything raises.
        """
        try:
            print("Agent running...")
            print(self.agent)
            handler = self.agent.run(question)

            async for event in handler.stream_events():
                if isinstance(event, ToolCall):
                    print(f"\n-----------\nevent:\n{event}")
                elif isinstance(event, AgentStream):
                    print(f"{event.delta}", end="", flush=True)

            response = await handler
            content = response.response.content
            print(f'response.response: {content}')
            return self._extract_final_answer(content)
        except Exception as e:
            print(f"Async agent error: {e}")
            return f"FINAL ANSWER: Error in agent processing - {str(e)}"
| |
| |