| """ |
| Gerenciador de objetos não-serializáveis para LangGraph |
| """ |
| import uuid |
| from typing import Dict, Any, Optional |
| import logging |
|
|
| class ObjectManager: |
| """ |
| Gerencia objetos não-serializáveis que não podem ser incluídos no estado do LangGraph |
| """ |
| |
| def __init__(self): |
| self._objects: Dict[str, Any] = {} |
| self._sql_agents: Dict[str, Any] = {} |
| self._processing_agents: Dict[str, Any] = {} |
| self._engines: Dict[str, Any] = {} |
| self._databases: Dict[str, Any] = {} |
| self._cache_managers: Dict[str, Any] = {} |
| |
| self._agent_db_mapping: Dict[str, str] = {} |
| |
| self._connection_metadata: Dict[str, Dict[str, Any]] = {} |
| |
| def store_sql_agent(self, agent: Any, db_id: str = None) -> str: |
| """Armazena agente SQL e retorna ID""" |
| agent_id = str(uuid.uuid4()) |
| self._sql_agents[agent_id] = agent |
|
|
| |
| if db_id: |
| self._agent_db_mapping[agent_id] = db_id |
|
|
| logging.info(f"Agente SQL armazenado com ID: {agent_id}") |
| return agent_id |
| |
| def get_sql_agent(self, agent_id: str) -> Optional[Any]: |
| """Recupera agente SQL pelo ID""" |
| return self._sql_agents.get(agent_id) |
|
|
| def store_processing_agent(self, agent: Any) -> str: |
| """Armazena Processing Agent e retorna ID""" |
| agent_id = str(uuid.uuid4()) |
| self._processing_agents[agent_id] = agent |
| logging.info(f"Processing Agent armazenado com ID: {agent_id}") |
| return agent_id |
|
|
| def get_processing_agent(self, agent_id: str) -> Optional[Any]: |
| """Recupera Processing Agent pelo ID""" |
| return self._processing_agents.get(agent_id) |
| |
| def store_engine(self, engine: Any) -> str: |
| """Armazena engine e retorna ID""" |
| engine_id = str(uuid.uuid4()) |
| self._engines[engine_id] = engine |
| logging.info(f"Engine armazenada com ID: {engine_id}") |
| return engine_id |
| |
| def get_engine(self, engine_id: str) -> Optional[Any]: |
| """Recupera engine pelo ID""" |
| return self._engines.get(engine_id) |
|
|
| def store_database(self, database: Any) -> str: |
| """Armazena banco de dados e retorna ID""" |
| db_id = str(uuid.uuid4()) |
| self._databases[db_id] = database |
| logging.info(f"Banco de dados armazenado com ID: {db_id}") |
| return db_id |
|
|
| def get_database(self, db_id: str) -> Optional[Any]: |
| """Recupera banco de dados pelo ID""" |
| return self._databases.get(db_id) |
|
|
| def get_db_id_for_agent(self, agent_id: str) -> Optional[str]: |
| """Recupera ID do banco associado ao agente""" |
| return self._agent_db_mapping.get(agent_id) |
| |
| def store_cache_manager(self, cache_manager: Any) -> str: |
| """Armazena cache manager e retorna ID""" |
| cache_id = str(uuid.uuid4()) |
| self._cache_managers[cache_id] = cache_manager |
| logging.info(f"Cache manager armazenado com ID: {cache_id}") |
| return cache_id |
| |
| def get_cache_manager(self, cache_id: str) -> Optional[Any]: |
| """Recupera cache manager pelo ID""" |
| return self._cache_managers.get(cache_id) |
| |
| def store_object(self, obj: Any, category: str = "general") -> str: |
| """Armazena objeto genérico e retorna ID""" |
| obj_id = str(uuid.uuid4()) |
| self._objects[obj_id] = {"object": obj, "category": category} |
| logging.info(f"Objeto {category} armazenado com ID: {obj_id}") |
| return obj_id |
| |
| def get_object(self, obj_id: str) -> Optional[Any]: |
| """Recupera objeto pelo ID""" |
| obj_data = self._objects.get(obj_id) |
| return obj_data["object"] if obj_data else None |
| |
| def update_sql_agent(self, agent_id: str, new_agent: Any) -> bool: |
| """Atualiza agente SQL existente""" |
| if agent_id in self._sql_agents: |
| self._sql_agents[agent_id] = new_agent |
| logging.info(f"Agente SQL atualizado: {agent_id}") |
| return True |
| return False |
| |
| def update_engine(self, engine_id: str, new_engine: Any) -> bool: |
| """Atualiza engine existente""" |
| if engine_id in self._engines: |
| self._engines[engine_id] = new_engine |
| logging.info(f"Engine atualizada: {engine_id}") |
| return True |
| return False |
| |
| def update_cache_manager(self, cache_id: str, new_cache_manager: Any) -> bool: |
| """Atualiza cache manager existente""" |
| if cache_id in self._cache_managers: |
| self._cache_managers[cache_id] = new_cache_manager |
| logging.info(f"Cache manager atualizado: {cache_id}") |
| return True |
| return False |
| |
| def clear_all(self): |
| """Limpa todos os objetos armazenados""" |
| self._objects.clear() |
| self._sql_agents.clear() |
| self._engines.clear() |
| self._databases.clear() |
| self._cache_managers.clear() |
| self._agent_db_mapping.clear() |
| self._connection_metadata.clear() |
| logging.info("Todos os objetos foram limpos do gerenciador") |
|
|
| def store_connection_metadata(self, connection_id: str, metadata: Dict[str, Any]) -> str: |
| """ |
| Armazena metadados de conexão |
| |
| Args: |
| connection_id: ID da conexão |
| metadata: Metadados da conexão |
| |
| Returns: |
| ID dos metadados armazenados |
| """ |
| self._connection_metadata[connection_id] = metadata |
| logging.info(f"Metadados de conexão armazenados com ID: {connection_id}") |
| return connection_id |
|
|
| def get_connection_metadata(self, connection_id: str) -> Optional[Dict[str, Any]]: |
| """ |
| Recupera metadados de conexão pelo ID |
| |
| Args: |
| connection_id: ID da conexão |
| |
| Returns: |
| Metadados da conexão ou None se não encontrado |
| """ |
| return self._connection_metadata.get(connection_id) |
|
|
| def get_all_connection_metadata(self) -> Dict[str, Dict[str, Any]]: |
| """ |
| Retorna todos os metadados de conexão |
| |
| Returns: |
| Dicionário com todos os metadados |
| """ |
| return self._connection_metadata.copy() |
|
|
| def get_stats(self) -> Dict[str, int]: |
| """Retorna estatísticas dos objetos armazenados""" |
| return { |
| "sql_agents": len(self._sql_agents), |
| "engines": len(self._engines), |
| "databases": len(self._databases), |
| "cache_managers": len(self._cache_managers), |
| "general_objects": len(self._objects), |
| "agent_db_mappings": len(self._agent_db_mapping), |
| "connection_metadata": len(self._connection_metadata) |
| } |
|
|
| |
| _object_manager: Optional[ObjectManager] = None |
|
|
| def get_object_manager() -> ObjectManager: |
| """Retorna instância singleton do gerenciador de objetos""" |
| global _object_manager |
| if _object_manager is None: |
| _object_manager = ObjectManager() |
| return _object_manager |
|
|
| def reset_object_manager(): |
| """Reseta o gerenciador de objetos""" |
| global _object_manager |
| if _object_manager: |
| _object_manager.clear_all() |
| _object_manager = ObjectManager() |
|
|