| """ |
| Criação e configuração do agente SQL |
| """ |
| import logging |
| import time |
| import asyncio |
| from typing import Optional, Dict, Any, List |
| from langchain_openai import ChatOpenAI |
| from langchain_anthropic import ChatAnthropic |
| from langchain_google_genai import ChatGoogleGenerativeAI |
| from langchain_community.agent_toolkits import create_sql_agent |
| from langchain_community.utilities import SQLDatabase |
| from langchain.callbacks.base import BaseCallbackHandler |
| from langchain.schema import AgentAction, AgentFinish |
|
|
|
|
| from utils.config import ( |
| MAX_ITERATIONS, |
| TEMPERATURE, |
| AVAILABLE_MODELS, |
| OPENAI_MODELS, |
| ANTHROPIC_MODELS, |
| GOOGLE_MODELS |
| ) |
|
|
| class SQLQueryCaptureHandler(BaseCallbackHandler): |
| """ |
| Handler para capturar queries SQL executadas pelo agente |
| """ |
|
|
| def __init__(self): |
| super().__init__() |
| self.sql_queries: List[str] = [] |
| self.agent_actions: List[Dict[str, Any]] = [] |
| self.step_count = 0 |
|
|
| def on_agent_action(self, action: AgentAction, **kwargs) -> None: |
| """ |
| Captura ações do agente, especialmente queries SQL |
| |
| Args: |
| action: Ação do agente |
| """ |
| try: |
| self.step_count += 1 |
| tool_name = action.tool |
| tool_input = action.tool_input |
|
|
| |
| if tool_name == 'sql_db_query' and isinstance(tool_input, dict): |
| sql_query = tool_input.get('query', '') |
| if sql_query and sql_query.strip(): |
| clean_query = sql_query.strip() |
| self.sql_queries.append(clean_query) |
|
|
| |
| logging.info(f"[SQL_HANDLER] 🔍 Query SQL capturada:\n{clean_query}") |
|
|
| |
| self.agent_actions.append({ |
| "step": self.step_count, |
| "tool": tool_name, |
| "input": tool_input, |
| "timestamp": time.time() |
| }) |
|
|
| except Exception as e: |
| logging.error(f"[SQL_HANDLER] Erro ao capturar ação: {e}") |
|
|
| def get_last_sql_query(self) -> Optional[str]: |
| """ |
| Retorna a última query SQL capturada |
| |
| Returns: |
| Última query SQL ou None se não houver |
| """ |
| return self.sql_queries[-1] if self.sql_queries else None |
|
|
| def get_all_sql_queries(self) -> List[str]: |
| """ |
| Retorna todas as queries SQL capturadas |
| |
| Returns: |
| Lista de queries SQL |
| """ |
| return self.sql_queries.copy() |
|
|
| def reset(self): |
| """Reseta o handler para nova execução""" |
| self.sql_queries.clear() |
| self.agent_actions.clear() |
| self.step_count = 0 |
|
|
| async def retry_with_backoff(func, max_retries=3, base_delay=1.0): |
| """ |
| Executa função com retry e backoff exponencial para lidar com rate limiting |
| |
| Args: |
| func: Função a ser executada |
| max_retries: Número máximo de tentativas |
| base_delay: Delay base em segundos |
| |
| Returns: |
| Resultado da função ou levanta exceção após esgotar tentativas |
| """ |
| for attempt in range(max_retries + 1): |
| try: |
| return func() |
| except Exception as e: |
| error_str = str(e) |
|
|
| |
| if any(keyword in error_str.lower() for keyword in ['overloaded', 'rate_limit', 'too_many_requests', 'quota']): |
| if attempt < max_retries: |
| delay = base_delay * (2 ** attempt) |
| logging.warning(f"API sobrecarregada (tentativa {attempt + 1}/{max_retries + 1}). Aguardando {delay}s...") |
| await asyncio.sleep(delay) |
| continue |
| else: |
| logging.error(f"API continua sobrecarregada após {max_retries + 1} tentativas") |
| raise Exception(f"API da Anthropic sobrecarregada. Tente novamente em alguns minutos. Erro original: {e}") |
| else: |
| |
| raise e |
|
|
| |
| raise Exception("Número máximo de tentativas excedido") |
|
|
|
|
|
|
| def create_sql_agent_executor(db: SQLDatabase, model_name: str = "gpt-4o-mini", single_table_mode: bool = False, selected_table: str = None): |
| """ |
| Cria um agente SQL usando LangChain com suporte a diferentes provedores |
| |
| Args: |
| db: Objeto SQLDatabase do LangChain |
| model_name: Nome do modelo a usar (OpenAI, Anthropic) |
| single_table_mode: Se deve restringir a uma única tabela |
| selected_table: Tabela específica para modo único |
| |
| Returns: |
| Agente SQL configurado |
| """ |
| try: |
| |
| if single_table_mode and selected_table: |
| |
| restricted_db = SQLDatabase.from_uri( |
| db._engine.url, |
| include_tables=[selected_table] |
| ) |
| logging.info(f"[SQL_AGENT] Criando agente em modo tabela única: {selected_table}") |
| db_to_use = restricted_db |
| else: |
| |
| logging.info("[SQL_AGENT] Criando agente em modo multi-tabela") |
| db_to_use = db |
|
|
| |
| model_id = AVAILABLE_MODELS.get(model_name, model_name) |
|
|
| |
| if model_id in OPENAI_MODELS: |
| |
| if model_id == "o3-mini": |
| |
| llm = ChatOpenAI(model=model_id) |
| else: |
| |
| llm = ChatOpenAI(model=model_id, temperature=TEMPERATURE) |
|
|
| agent_type = "openai-tools" |
|
|
| elif model_id in ANTHROPIC_MODELS: |
| |
| llm = ChatAnthropic( |
| model=model_id, |
| temperature=TEMPERATURE, |
| max_tokens=4096, |
| max_retries=2, |
| timeout=60.0 |
| ) |
| agent_type = "tool-calling" |
|
|
| elif model_id in GOOGLE_MODELS: |
| |
| llm = ChatGoogleGenerativeAI( |
| model=model_id, |
| temperature=TEMPERATURE, |
| max_tokens=4096, |
| max_retries=2, |
| timeout=60.0 |
| ) |
| agent_type = "tool-calling" |
|
|
| else: |
| |
| llm = ChatOpenAI( |
| model="gpt-4o-mini", |
| temperature=TEMPERATURE |
| ) |
| agent_type = "openai-tools" |
| logging.warning(f"Modelo {model_name} não reconhecido, usando gpt-4o-mini como fallback") |
|
|
| |
| sql_agent = create_sql_agent( |
| llm=llm, |
| db=db_to_use, |
| agent_type=agent_type, |
| verbose=True, |
| max_iterations=MAX_ITERATIONS, |
| return_intermediate_steps=True, |
| top_k=10 |
| ) |
|
|
| logging.info(f"Agente SQL criado com sucesso usando modelo {model_name} ({model_id}) com agent_type={agent_type}") |
| return sql_agent |
|
|
| except Exception as e: |
| logging.error(f"Erro ao criar agente SQL: {e}") |
| raise |
|
|
| class SQLAgentManager: |
| """ |
| Gerenciador do agente SQL com funcionalidades avançadas |
| """ |
|
|
| def __init__(self, db: SQLDatabase, model_name: str = "gpt-4o-mini", single_table_mode: bool = False, selected_table: str = None): |
| self.db = db |
| self.model_name = model_name |
| self.single_table_mode = single_table_mode |
| self.selected_table = selected_table |
| self.agent = None |
| self._initialize_agent() |
|
|
| def _initialize_agent(self): |
| """Inicializa o agente SQL""" |
| self.agent = create_sql_agent_executor(self.db, self.model_name, self.single_table_mode, self.selected_table) |
| |
| def recreate_agent(self, new_db: SQLDatabase = None, new_model: str = None, single_table_mode: bool = None, selected_table: str = None): |
| """ |
| Recria o agente com novos parâmetros |
| |
| Args: |
| new_db: Novo banco de dados (opcional) |
| new_model: Novo modelo (opcional) |
| single_table_mode: Novo modo de tabela (opcional) |
| selected_table: Nova tabela selecionada (opcional) |
| """ |
| if new_db: |
| self.db = new_db |
| if new_model: |
| self.model_name = new_model |
| if single_table_mode is not None: |
| self.single_table_mode = single_table_mode |
| if selected_table is not None: |
| self.selected_table = selected_table |
|
|
| self._initialize_agent() |
| mode_info = f"modo {'tabela única' if self.single_table_mode else 'multi-tabela'}" |
| logging.info(f"Agente SQL recriado com modelo {self.model_name} em {mode_info}") |
| |
| def _extract_text_from_claude_response(self, output) -> str: |
| """ |
| Extrai texto limpo da resposta do Claude que pode vir em formato complexo |
| |
| Args: |
| output: Resposta do agente (pode ser string, lista ou dict) |
| |
| Returns: |
| String limpa com o texto da resposta |
| """ |
| try: |
| |
| if isinstance(output, str): |
| return output |
|
|
| |
| if isinstance(output, list): |
| text_parts = [] |
| for item in output: |
| if isinstance(item, dict) and 'text' in item: |
| text_parts.append(item['text']) |
| elif isinstance(item, str): |
| text_parts.append(item) |
|
|
| if text_parts: |
| return '\n'.join(text_parts) |
|
|
| |
| if isinstance(output, dict): |
| if 'text' in output: |
| return output['text'] |
| elif 'content' in output: |
| return str(output['content']) |
|
|
| |
| return str(output) |
|
|
| except Exception as e: |
| logging.warning(f"Erro ao extrair texto da resposta: {e}") |
| return str(output) |
|
|
| async def execute_query(self, instruction: str) -> dict: |
| """ |
| Executa uma query através do agente SQL com retry para rate limiting |
| |
| Args: |
| instruction: Instrução para o agente |
| |
| Returns: |
| Resultado da execução |
| """ |
| try: |
| logging.info("------- Agent SQL: Executando query -------") |
|
|
| |
| sql_handler = SQLQueryCaptureHandler() |
|
|
| |
| model_id = getattr(self, 'model_name', '') |
| is_claude = any(claude_model in model_id for claude_model in ANTHROPIC_MODELS) |
| is_gemini = any(gemini_model in model_id for gemini_model in GOOGLE_MODELS) |
|
|
| if is_claude or is_gemini: |
| |
| response = await retry_with_backoff( |
| lambda: self.agent.invoke( |
| {"input": instruction}, |
| {"callbacks": [sql_handler]} |
| ), |
| max_retries=3, |
| base_delay=2.0 |
| ) |
| else: |
| |
| response = self.agent.invoke( |
| {"input": instruction}, |
| {"callbacks": [sql_handler]} |
| ) |
|
|
| |
| raw_output = response.get("output", "Erro ao obter a resposta do agente.") |
| clean_output = self._extract_text_from_claude_response(raw_output) |
|
|
| |
| sql_query = sql_handler.get_last_sql_query() |
|
|
| result = { |
| "output": clean_output, |
| "intermediate_steps": response.get("intermediate_steps", []), |
| "success": True, |
| "sql_query": sql_query, |
| "all_sql_queries": sql_handler.get_all_sql_queries() |
| } |
|
|
| logging.info(f"Query executada com sucesso: {result['output'][:100]}...") |
| return result |
|
|
| except Exception as e: |
| error_str = str(e) |
|
|
| |
| if any(keyword in error_str.lower() for keyword in ['overloaded', 'rate_limit', 'too_many_requests', 'quota']): |
| error_msg = ( |
| "🚫 **API da Anthropic temporariamente sobrecarregada**\n\n" |
| "A API do Claude está com muitas solicitações no momento. " |
| "Por favor, aguarde alguns minutos e tente novamente.\n\n" |
| "**Sugestões:**\n" |
| "- Aguarde 2-3 minutos antes de tentar novamente\n" |
| "- Considere usar um modelo OpenAI temporariamente\n" |
| "- Tente novamente em horários de menor movimento\n\n" |
| f"*Erro técnico: {e}*" |
| ) |
| else: |
| error_msg = f"Erro ao consultar o agente SQL: {e}" |
|
|
| logging.error(error_msg) |
| return { |
| "output": error_msg, |
| "intermediate_steps": [], |
| "success": False |
| } |
|
|
| def get_agent_info(self) -> dict: |
| """ |
| Retorna informações sobre o agente atual |
| |
| Returns: |
| Dicionário com informações do agente |
| """ |
| return { |
| "model_name": self.model_name, |
| "max_iterations": MAX_ITERATIONS, |
| "temperature": TEMPERATURE, |
| "database_tables": self.db.get_usable_table_names() if self.db else [], |
| "agent_type": "openai-tools" |
| } |
| |
| def validate_agent(self) -> bool: |
| """ |
| Valida se o agente está funcionando corretamente |
| |
| Returns: |
| True se válido, False caso contrário |
| """ |
| try: |
| |
| test_result = self.agent.invoke({ |
| "input": "Quantas linhas existem na tabela?" |
| }) |
| |
| success = "output" in test_result and test_result["output"] |
| logging.info(f"Validação do agente: {'Sucesso' if success else 'Falha'}") |
| return success |
| |
| except Exception as e: |
| logging.error(f"Erro na validação do agente: {e}") |
| return False |
|
|
| def get_default_sql_agent(db: SQLDatabase) -> SQLAgentManager: |
| """ |
| Cria um agente SQL com configurações padrão |
| |
| Args: |
| db: Objeto SQLDatabase |
| |
| Returns: |
| SQLAgentManager configurado |
| """ |
| return SQLAgentManager(db) |
|
|