import os import numpy as np import pandas as pd import torch from sentence_transformers import SentenceTransformer from sklearn.metrics.pairwise import cosine_similarity from .utils import ( bonus_lexical, inferir_categoria_consulta, limpar_texto, mapear_categoria, ) BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) DATA_DIR = os.path.join(BASE_DIR, "data") LOGS_DIR = os.path.join(BASE_DIR, "logs") PATH_PRODUCTS = os.path.join(DATA_DIR, "produtos_finetunado.csv") PATH_EMBEDDINGS = os.path.join(DATA_DIR, "embeddings_produtos_finetunado.npy") PATH_NEGATIVE_MEMORY = os.path.join(LOGS_DIR, "negative_memory.csv") MODEL_NAME = os.getenv("HF_MODEL_REPO", "Ana2012/bertimbau-buscador").strip() HF_API_TOKEN = os.getenv("HF_API_TOKEN", "").strip() class SearchEngine: def __init__(self): self.device = "cuda" if torch.cuda.is_available() else "cpu" self.model = None self.df_produtos = None self.emb_produtos = None self.df_negative_memory = pd.DataFrame() self.negative_memory_mtime = None def load(self): self._load_products() self._load_model() self._load_embeddings() self._refresh_negative_memory(force=True) def _load_products(self): df = pd.read_csv(PATH_PRODUCTS) df.columns = df.columns.str.strip().str.lower() df["product_name"] = df["product_name"].fillna("").astype(str) df["description"] = df["description"].fillna("").astype(str) df["categoria_principal"] = df["categoria_principal"].fillna("").astype(str) df["category_names_text"] = df["category_names_text"].fillna("").astype(str) df["region"] = df["region"].fillna("").astype(str) df["neighborhood"] = df["neighborhood"].fillna("").astype(str) df["product_name_limpo"] = df["product_name"].apply(limpar_texto) df["description_limpa"] = df["description"].apply(limpar_texto) df["categoria_principal_limpa"] = df["categoria_principal"].apply(limpar_texto) df["category_names_text_limpo"] = df["category_names_text"].apply(limpar_texto) df["region_limpa"] = df["region"].apply(limpar_texto) df["neighborhood_limpo"] = df["neighborhood"].apply(limpar_texto) df["texto_busca_reforcado"] = ( "produto " + df["product_name_limpo"] + " " + "categoria " + df["categoria_principal_limpa"] + " " + "categorias " + df["category_names_text_limpo"] + " " + "bairro " + df["neighborhood_limpo"] + " " + "regiao " + df["region_limpa"] + " " + "descricao " + df["description_limpa"] ).str.strip() df["categoria_grupo"] = df["categoria_principal"].apply(mapear_categoria) self.df_produtos = df def _load_model(self): kwargs = {"device": self.device} if HF_API_TOKEN: kwargs["token"] = HF_API_TOKEN # Usa o mesmo pipeline validado localmente com SentenceTransformer. self.model = SentenceTransformer(MODEL_NAME, **kwargs) def _load_embeddings(self): self.emb_produtos = np.load(PATH_EMBEDDINGS) # Se estes embeddings .npy foram gerados com outro pipeline # (por exemplo, AutoModel + mean pooling manual), os scores podem ficar inconsistentes. # Nesse caso, regenere os embeddings dos produtos com o mesmo SentenceTransformer. if self.emb_produtos.ndim != 2: raise RuntimeError("O arquivo de embeddings precisa conter uma matriz 2D.") def runtime_info(self): return { "model_repo": MODEL_NAME, "device": self.device, "products_loaded": 0 if self.df_produtos is None else int(len(self.df_produtos)), "embeddings_loaded": 0 if self.emb_produtos is None else int(len(self.emb_produtos)), "embedding_dim": 0 if self.emb_produtos is None else int(self.emb_produtos.shape[1]), } def _refresh_negative_memory(self, force=False): if not os.path.exists(PATH_NEGATIVE_MEMORY): self.df_negative_memory = pd.DataFrame() self.negative_memory_mtime = None return current_mtime = os.path.getmtime(PATH_NEGATIVE_MEMORY) if not force and self.negative_memory_mtime == current_mtime: return df = pd.read_csv(PATH_NEGATIVE_MEMORY) df.columns = df.columns.str.strip().str.lower() for col in ["query", "product_id", "product_name", "motivo", "rating"]: if col not in df.columns: df[col] = "" df["query"] = df["query"].fillna("").astype(str) df["query_limpa"] = df["query"].apply(limpar_texto) df["product_id"] = df["product_id"].fillna("").astype(str) df["product_name"] = df["product_name"].fillna("").astype(str) df["motivo"] = df["motivo"].fillna("").astype(str) df["rating_num"] = pd.to_numeric(df["rating"], errors="coerce") self.df_negative_memory = df self.negative_memory_mtime = current_mtime def _similaridade_consulta(self, query_atual, query_memoria): if not query_atual or not query_memoria: return 0.0 if query_atual == query_memoria: return 1.0 termos_atuais = set(query_atual.split()) termos_memoria = set(query_memoria.split()) if not termos_atuais or not termos_memoria: return 0.0 intersecao = len(termos_atuais & termos_memoria) if intersecao == 0: return 0.0 return intersecao / max(len(termos_atuais), len(termos_memoria)) def _calcular_penalidade_feedback(self, query_text, df_filtrado): self._refresh_negative_memory() if self.df_negative_memory.empty: return np.zeros(len(df_filtrado)) query_limpa = limpar_texto(query_text) memorias = self.df_negative_memory[ self.df_negative_memory["product_id"].isin(df_filtrado["product_id"].astype(str)) ] if memorias.empty: return np.zeros(len(df_filtrado)) penalidades = {} for _, memoria in memorias.iterrows(): similaridade = self._similaridade_consulta(query_limpa, memoria["query_limpa"]) if similaridade <= 0: continue penalidade = 0.08 + (0.12 * similaridade) if memoria["motivo"] == "nao_foi_util": penalidade += 0.04 if pd.notna(memoria["rating_num"]) and memoria["rating_num"] <= 2: penalidade += 0.04 product_id = memoria["product_id"] penalidades[product_id] = min(penalidades.get(product_id, 0.0) + penalidade, 0.45) return df_filtrado["product_id"].astype(str).map(lambda x: penalidades.get(x, 0.0)).values def gerar_embedding_unico(self, texto): embedding = self.model.encode( texto, convert_to_numpy=True, normalize_embeddings=False, show_progress_bar=False, ) return np.asarray(embedding, dtype=np.float32) def buscar(self, query_text, top_k=5): query_limpa = limpar_texto(query_text) categoria = inferir_categoria_consulta(query_limpa) if categoria is not None: mask = self.df_produtos["categoria_grupo"] == categoria df_filtrado = self.df_produtos[mask].copy() idx_filtrado = df_filtrado.index.tolist() else: df_filtrado = self.df_produtos.copy() idx_filtrado = df_filtrado.index.tolist() if len(df_filtrado) == 0: df_filtrado = self.df_produtos.copy() idx_filtrado = df_filtrado.index.tolist() emb_query = self.gerar_embedding_unico(query_text).reshape(1, -1) emb_base = self.emb_produtos[idx_filtrado] if emb_base.shape[1] != emb_query.shape[1]: raise RuntimeError( "Dimensao incompatível entre os embeddings salvos e o embedding da consulta. " "Regenere o arquivo .npy com o mesmo modelo SentenceTransformer." ) sims = cosine_similarity(emb_query, emb_base)[0] bonus = df_filtrado.apply( lambda row: bonus_lexical( query_text, row["product_name"], row["categoria_principal"], row["neighborhood"], row["region"], row["description"], row["texto_busca_reforcado"], ), axis=1, ).values penalidade_feedback = self._calcular_penalidade_feedback(query_text, df_filtrado) score_final = sims + bonus - penalidade_feedback top_idx_local = np.argsort(score_final)[::-1][:top_k] resultados = [] for rank, idx_local in enumerate(top_idx_local, start=1): idx_global = idx_filtrado[idx_local] prod = self.df_produtos.iloc[idx_global] resultados.append({ "rank": rank, "establishment_id": str(prod["establishment_id"]), "product_id": str(prod["product_id"]), "product_name": prod["product_name"], "categoria_principal": prod["categoria_principal"], "categoria_grupo": prod["categoria_grupo"], "region": prod["region"], "neighborhood": prod["neighborhood"], "score_semantico": float(sims[idx_local]), "bonus_lexical": float(bonus[idx_local]), "penalidade_feedback": float(penalidade_feedback[idx_local]), "score_final": float(score_final[idx_local]), }) return { "query": query_text, "categoria_inferida": categoria, "resultados": resultados, }