final_project / app /search.py
Ana2012
Deploy backend FastAPI para HF Spaces
614aa6b
import os
import numpy as np
import pandas as pd
import torch
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from .utils import (
bonus_lexical,
inferir_categoria_consulta,
limpar_texto,
mapear_categoria,
)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
DATA_DIR = os.path.join(BASE_DIR, "data")
LOGS_DIR = os.path.join(BASE_DIR, "logs")
PATH_PRODUCTS = os.path.join(DATA_DIR, "produtos_finetunado.csv")
PATH_EMBEDDINGS = os.path.join(DATA_DIR, "embeddings_produtos_finetunado.npy")
PATH_NEGATIVE_MEMORY = os.path.join(LOGS_DIR, "negative_memory.csv")
MODEL_NAME = os.getenv("HF_MODEL_REPO", "Ana2012/bertimbau-buscador").strip()
HF_API_TOKEN = os.getenv("HF_API_TOKEN", "").strip()
class SearchEngine:
def __init__(self):
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.model = None
self.df_produtos = None
self.emb_produtos = None
self.df_negative_memory = pd.DataFrame()
self.negative_memory_mtime = None
def load(self):
self._load_products()
self._load_model()
self._load_embeddings()
self._refresh_negative_memory(force=True)
def _load_products(self):
df = pd.read_csv(PATH_PRODUCTS)
df.columns = df.columns.str.strip().str.lower()
df["product_name"] = df["product_name"].fillna("").astype(str)
df["description"] = df["description"].fillna("").astype(str)
df["categoria_principal"] = df["categoria_principal"].fillna("").astype(str)
df["category_names_text"] = df["category_names_text"].fillna("").astype(str)
df["region"] = df["region"].fillna("").astype(str)
df["neighborhood"] = df["neighborhood"].fillna("").astype(str)
df["product_name_limpo"] = df["product_name"].apply(limpar_texto)
df["description_limpa"] = df["description"].apply(limpar_texto)
df["categoria_principal_limpa"] = df["categoria_principal"].apply(limpar_texto)
df["category_names_text_limpo"] = df["category_names_text"].apply(limpar_texto)
df["region_limpa"] = df["region"].apply(limpar_texto)
df["neighborhood_limpo"] = df["neighborhood"].apply(limpar_texto)
df["texto_busca_reforcado"] = (
"produto " + df["product_name_limpo"] + " "
+ "categoria " + df["categoria_principal_limpa"] + " "
+ "categorias " + df["category_names_text_limpo"] + " "
+ "bairro " + df["neighborhood_limpo"] + " "
+ "regiao " + df["region_limpa"] + " "
+ "descricao " + df["description_limpa"]
).str.strip()
df["categoria_grupo"] = df["categoria_principal"].apply(mapear_categoria)
self.df_produtos = df
def _load_model(self):
kwargs = {"device": self.device}
if HF_API_TOKEN:
kwargs["token"] = HF_API_TOKEN
# Usa o mesmo pipeline validado localmente com SentenceTransformer.
self.model = SentenceTransformer(MODEL_NAME, **kwargs)
def _load_embeddings(self):
self.emb_produtos = np.load(PATH_EMBEDDINGS)
# Se estes embeddings .npy foram gerados com outro pipeline
# (por exemplo, AutoModel + mean pooling manual), os scores podem ficar inconsistentes.
# Nesse caso, regenere os embeddings dos produtos com o mesmo SentenceTransformer.
if self.emb_produtos.ndim != 2:
raise RuntimeError("O arquivo de embeddings precisa conter uma matriz 2D.")
def runtime_info(self):
return {
"model_repo": MODEL_NAME,
"device": self.device,
"products_loaded": 0 if self.df_produtos is None else int(len(self.df_produtos)),
"embeddings_loaded": 0 if self.emb_produtos is None else int(len(self.emb_produtos)),
"embedding_dim": 0 if self.emb_produtos is None else int(self.emb_produtos.shape[1]),
}
def _refresh_negative_memory(self, force=False):
if not os.path.exists(PATH_NEGATIVE_MEMORY):
self.df_negative_memory = pd.DataFrame()
self.negative_memory_mtime = None
return
current_mtime = os.path.getmtime(PATH_NEGATIVE_MEMORY)
if not force and self.negative_memory_mtime == current_mtime:
return
df = pd.read_csv(PATH_NEGATIVE_MEMORY)
df.columns = df.columns.str.strip().str.lower()
for col in ["query", "product_id", "product_name", "motivo", "rating"]:
if col not in df.columns:
df[col] = ""
df["query"] = df["query"].fillna("").astype(str)
df["query_limpa"] = df["query"].apply(limpar_texto)
df["product_id"] = df["product_id"].fillna("").astype(str)
df["product_name"] = df["product_name"].fillna("").astype(str)
df["motivo"] = df["motivo"].fillna("").astype(str)
df["rating_num"] = pd.to_numeric(df["rating"], errors="coerce")
self.df_negative_memory = df
self.negative_memory_mtime = current_mtime
def _similaridade_consulta(self, query_atual, query_memoria):
if not query_atual or not query_memoria:
return 0.0
if query_atual == query_memoria:
return 1.0
termos_atuais = set(query_atual.split())
termos_memoria = set(query_memoria.split())
if not termos_atuais or not termos_memoria:
return 0.0
intersecao = len(termos_atuais & termos_memoria)
if intersecao == 0:
return 0.0
return intersecao / max(len(termos_atuais), len(termos_memoria))
def _calcular_penalidade_feedback(self, query_text, df_filtrado):
self._refresh_negative_memory()
if self.df_negative_memory.empty:
return np.zeros(len(df_filtrado))
query_limpa = limpar_texto(query_text)
memorias = self.df_negative_memory[
self.df_negative_memory["product_id"].isin(df_filtrado["product_id"].astype(str))
]
if memorias.empty:
return np.zeros(len(df_filtrado))
penalidades = {}
for _, memoria in memorias.iterrows():
similaridade = self._similaridade_consulta(query_limpa, memoria["query_limpa"])
if similaridade <= 0:
continue
penalidade = 0.08 + (0.12 * similaridade)
if memoria["motivo"] == "nao_foi_util":
penalidade += 0.04
if pd.notna(memoria["rating_num"]) and memoria["rating_num"] <= 2:
penalidade += 0.04
product_id = memoria["product_id"]
penalidades[product_id] = min(penalidades.get(product_id, 0.0) + penalidade, 0.45)
return df_filtrado["product_id"].astype(str).map(lambda x: penalidades.get(x, 0.0)).values
def gerar_embedding_unico(self, texto):
embedding = self.model.encode(
texto,
convert_to_numpy=True,
normalize_embeddings=False,
show_progress_bar=False,
)
return np.asarray(embedding, dtype=np.float32)
def buscar(self, query_text, top_k=5):
query_limpa = limpar_texto(query_text)
categoria = inferir_categoria_consulta(query_limpa)
if categoria is not None:
mask = self.df_produtos["categoria_grupo"] == categoria
df_filtrado = self.df_produtos[mask].copy()
idx_filtrado = df_filtrado.index.tolist()
else:
df_filtrado = self.df_produtos.copy()
idx_filtrado = df_filtrado.index.tolist()
if len(df_filtrado) == 0:
df_filtrado = self.df_produtos.copy()
idx_filtrado = df_filtrado.index.tolist()
emb_query = self.gerar_embedding_unico(query_text).reshape(1, -1)
emb_base = self.emb_produtos[idx_filtrado]
if emb_base.shape[1] != emb_query.shape[1]:
raise RuntimeError(
"Dimensao incompatível entre os embeddings salvos e o embedding da consulta. "
"Regenere o arquivo .npy com o mesmo modelo SentenceTransformer."
)
sims = cosine_similarity(emb_query, emb_base)[0]
bonus = df_filtrado.apply(
lambda row: bonus_lexical(
query_text,
row["product_name"],
row["categoria_principal"],
row["neighborhood"],
row["region"],
row["description"],
row["texto_busca_reforcado"],
),
axis=1,
).values
penalidade_feedback = self._calcular_penalidade_feedback(query_text, df_filtrado)
score_final = sims + bonus - penalidade_feedback
top_idx_local = np.argsort(score_final)[::-1][:top_k]
resultados = []
for rank, idx_local in enumerate(top_idx_local, start=1):
idx_global = idx_filtrado[idx_local]
prod = self.df_produtos.iloc[idx_global]
resultados.append({
"rank": rank,
"establishment_id": str(prod["establishment_id"]),
"product_id": str(prod["product_id"]),
"product_name": prod["product_name"],
"categoria_principal": prod["categoria_principal"],
"categoria_grupo": prod["categoria_grupo"],
"region": prod["region"],
"neighborhood": prod["neighborhood"],
"score_semantico": float(sims[idx_local]),
"bonus_lexical": float(bonus[idx_local]),
"penalidade_feedback": float(penalidade_feedback[idx_local]),
"score_final": float(score_final[idx_local]),
})
return {
"query": query_text,
"categoria_inferida": categoria,
"resultados": resultados,
}