| import os
|
| import pandas as pd
|
| import numpy as np
|
| from sentence_transformers import SentenceTransformer
|
| from sklearn.neighbors import NearestNeighbors
|
| from sklearn.decomposition import TruncatedSVD, NMF
|
| from sklearn.metrics.pairwise import cosine_similarity, euclidean_distances
|
| from sklearn.feature_extraction.text import TfidfVectorizer
|
| from sklearn.cluster import KMeans
|
| from sklearn.preprocessing import StandardScaler
|
| from scipy.sparse import csr_matrix
|
| from scipy.spatial.distance import pdist, squareform
|
| import gradio as gr
|
| import json
|
| import re
|
| from collections import defaultdict, Counter
|
| import csv
|
| import time
|
| from datetime import datetime
|
| import warnings
|
| warnings.filterwarnings('ignore')
|
|
|
|
|
| from huggingface_hub import hf_hub_download
|
|
|
|
|
|
|
def download_file_from_hf(filename, repo_id="aegarciaherrera/Sistema_Recomendador_Archivos"):
    """Fetch a single file from a Hugging Face dataset repository.

    Args:
        filename: Name of the file inside the repository.
        repo_id: Repository identifier forwarded to ``hf_hub_download``.  The
            repository is always addressed with ``repo_type="dataset"``.

    Returns:
        The local path returned by ``hf_hub_download``, or ``None`` if
        anything inside the function raises.  The failure is printed, not
        re-raised, so callers must check for ``None``.
    """
    download_args = {
        "repo_id": repo_id,
        "filename": filename,
        "repo_type": "dataset",
    }
    try:
        local_path = hf_hub_download(**download_args)
        print(f"✓ Archivo {filename} descargado exitosamente")
        return local_path
    except Exception as error:  # best-effort by design: report and signal with None
        print(f"✗ Error descargando {filename}: {str(error)}")
        return None
|
|
|
|
|
# --- Mandatory inputs -------------------------------------------------------
# The product catalogue and the embedding-index mapping are required; the
# script aborts with FileNotFoundError when either download fails.
print("Descargando archivos desde Hugging Face...")
productos_path = download_file_from_hf("productos.csv")
mapping_path = download_file_from_hf("embedding_index_mapping.csv")

if not (productos_path and mapping_path):
    raise FileNotFoundError("No se pudieron descargar los archivos principales")

df_productos = pd.read_csv(productos_path).reset_index(drop=True)
df_mapping = pd.read_csv(mapping_path)
print(f"✓ Productos cargados: {len(df_productos):,} registros")
print(f"✓ Mapping cargado: {len(df_mapping):,} registros")
|
|
|
|
|
# --- Optional ratings -------------------------------------------------------
# Preferred source is the "V2" pair (aggregated + detailed).  If anything in
# that path fails, fall back to the single "V1" file; if that fails as well,
# continue with no ratings at all.  HAS_DETAILED_RATINGS is True only for V2.
try:
    ratings_agg_path = download_file_from_hf("ratings_aggregated.csv")
    ratings_det_path = download_file_from_hf("ratings_detailed.csv")
    if not (ratings_agg_path and ratings_det_path):
        raise FileNotFoundError("Archivos V2 no encontrados")

    df_ratings_aggregated = pd.read_csv(ratings_agg_path)
    df_ratings_detailed = pd.read_csv(ratings_det_path)
    # parent_asin -> average_rating lookup used throughout the module.
    ratings_dict = (
        df_ratings_aggregated.set_index('parent_asin')['average_rating'].to_dict()
    )
    print(f"✓ Ratings V2 cargados: {len(ratings_dict):,} productos con ratings")
    HAS_DETAILED_RATINGS = True
except Exception as v2_error:
    print(f"No se pudieron cargar ratings V2: {str(v2_error)}")
    try:
        ratings_path = download_file_from_hf("ratings.csv")
        if not ratings_path:
            raise FileNotFoundError("No se pudo cargar ratings V1")

        df_ratings = pd.read_csv(ratings_path)
        # NOTE(review): if ratings.csv holds several rows per parent_asin,
        # to_dict() keeps only the last one — confirm the file is one row per
        # product.
        ratings_dict = df_ratings.set_index('parent_asin')['rating'].to_dict()
        df_ratings_detailed = df_ratings
        print(f"✓ Ratings V1 cargados: {len(ratings_dict):,} productos con ratings")
        HAS_DETAILED_RATINGS = False
    except Exception as v1_error:
        print(f"✗ Error cargando ratings: {str(v1_error)}")
        ratings_dict = {}
        df_ratings_detailed = pd.DataFrame()
        HAS_DETAILED_RATINGS = False

# Load summary.
print("=" * 50)
print("RESUMEN DE CARGA:")
print(f"- Productos: {len(df_productos):,} registros")
print(f"- Ratings: {len(ratings_dict):,} productos")
print(f"- Ratings detallados: {'Sí' if HAS_DETAILED_RATINGS else 'No'}")
print("=" * 50)
|
|
|
|
|
|
|
# Keep only the products present in the mapping, attach the mapping columns,
# and order the rows by the mapping's "index" column.
# NOTE(review): "index" is presumably each product's row position in the
# precomputed embeddings matrix — confirm against the file that produced it.
df_similars = (
    df_productos[df_productos["parent_asin"].isin(df_mapping["parent_asin"])]
    .reset_index(drop=True)
    .merge(df_mapping, on="parent_asin")
    .sort_values("index")
    .reset_index(drop=True)
)

# Descriptions are always strings from here on (missing values become "").
df_similars["description"] = df_similars["description"].fillna("").astype(str)
df_similars = df_similars.reset_index(drop=True)

print(f"Total de productos en df_similars: {len(df_similars):,}")
print(f"Productos únicos: {df_similars['parent_asin'].nunique():,}")
|
|
|
|
|
# Sentence encoder used for queries and, as a fallback, for the catalogue.
model = SentenceTransformer("all-MiniLM-L6-v2")

# Try the precomputed embeddings first; on any failure encode every
# description locally with the model above.
try:
    embeddings_path = download_file_from_hf("embeddings.npy")
    descriptions_path = download_file_from_hf("descriptions.npy")
    if not (embeddings_path and descriptions_path):
        raise FileNotFoundError("No se pudieron descargar los embeddings precomputados")

    description_embeddings = np.load(embeddings_path)
    # SECURITY NOTE: allow_pickle=True unpickles data from a downloaded file,
    # which can execute arbitrary code; only safe while the repo is trusted.
    descriptions = np.load(descriptions_path, allow_pickle=True)
    print(f"✓ Embeddings precalculados descargados y cargados: {description_embeddings.shape}")

    # A size mismatch is only reported here; nothing is regenerated.
    if len(description_embeddings) == len(df_similars):
        print("✓ Consistencia verificada: embeddings y df_similars coinciden")
    else:
        print("WARNING: Mismatch detectado!")
        print(f" Embeddings: {len(description_embeddings)}")
        print(f" df_similars: {len(df_similars)}")
        print(" Recomiendo regenerar embeddings con el nuevo df_similars")
except Exception as load_error:
    print(f"✗ Error descargando embeddings: {str(load_error)}")
    print("Generando embeddings básicos...")
    description_embeddings = model.encode(df_similars["description"].tolist())
    descriptions = df_similars["description"].values
|
|
|
|
|
class RecommendationMetrics:
    """Collect and compare quality metrics across recommendation methods.

    Each call to :meth:`evaluate_recommendations` appends its numeric metrics
    to ``metrics_history`` under the key ``"<method_name>_<metric>"``.
    :meth:`get_comparison_report` summarises that history per method.

    ``calculate_diversity`` and ``calculate_novelty`` read the module-level
    ``df_similars`` and ``ratings_dict`` objects.
    """

    def __init__(self):
        # "<method>_<metric>" -> list of observed values, one per evaluation.
        self.metrics_history = defaultdict(list)
        # Kept for backward compatibility; nothing in this class writes to it.
        self.execution_times = defaultdict(list)
        # method name -> metric names recorded for it, in first-seen order.
        # Lets the report recover the method name without parsing the keys.
        self._method_metrics = {}

    def calculate_diversity(self, recommendations_asins):
        """Return the ratio of distinct categories to recommended items.

        ASINs that are not found in ``df_similars`` are ignored.  A missing
        (NaN) category counts as ``'Unknown'`` so that several uncategorised
        products are not each counted as a different category.

        Returns:
            A float in ``[0, 1]``; ``0.0`` for empty input or when no ASIN is
            found.
        """
        if not recommendations_asins:
            return 0.0

        categories = []
        for asin in recommendations_asins:
            matches = df_similars[df_similars['parent_asin'] == asin]
            if len(matches) == 0:
                continue
            category = matches.iloc[0].get('main_category', 'Unknown')
            if not isinstance(category, str) and pd.isna(category):
                category = 'Unknown'
            categories.append(category)

        if not categories:
            return 0.0
        return len(set(categories)) / len(categories)

    def calculate_novelty(self, recommendations_asins):
        """Return the mean novelty of the recommendations.

        Novelty of one item is ``max(0, 5 - rating) / 5`` where ``rating``
        comes from ``ratings_dict``; items without a rating use ``0.0`` and
        therefore score the maximum novelty of ``1.0``.

        Returns:
            The mean novelty, or ``0.0`` for empty input.
        """
        if not recommendations_asins:
            return 0.0

        novelty_scores = [
            max(0, 5.0 - ratings_dict.get(asin, 0.0)) / 5.0
            for asin in recommendations_asins
        ]
        return np.mean(novelty_scores) if novelty_scores else 0.0

    def calculate_coverage(self, recommendations_asins, total_available_items):
        """Return unique recommendations divided by the catalogue size.

        The denominator is ``min(total_available_items, 100)``, i.e. the
        catalogue size is capped at 100.

        Returns:
            The coverage ratio, or ``0.0`` when ``total_available_items`` is
            not positive (previously a ``ZeroDivisionError`` for 0).
        """
        if total_available_items <= 0:
            return 0.0
        unique_recommendations = len(set(recommendations_asins))
        return unique_recommendations / min(total_available_items, 100)

    def calculate_precision_at_k(self, recommendations_asins, relevant_items, k=5):
        """Return precision@k of the recommendations.

        The denominator is the number of items actually present in the top
        ``k`` (so short lists are not penalised).

        Returns:
            Precision in ``[0, 1]``; ``0.0`` when either list is empty or
            ``k`` is not positive (previously a ``ZeroDivisionError`` for 0).
        """
        if not recommendations_asins or not relevant_items or k <= 0:
            return 0.0

        top_k = recommendations_asins[:k]
        relevant_in_top_k = len(set(top_k) & set(relevant_items))
        return relevant_in_top_k / min(k, len(top_k))

    def evaluate_recommendations(self, method_name, recommendations_asins, execution_time,
                                 relevant_items=None, total_available=1000):
        """Compute all metrics for one result list and record them.

        Args:
            method_name: Label of the method being evaluated.
            recommendations_asins: Recommended ASINs, best first.
            execution_time: Elapsed seconds measured by the caller.
            relevant_items: Optional ground truth; when given and non-empty,
                ``precision_at_5`` is added.
            total_available: Catalogue size passed to ``calculate_coverage``.

        Returns:
            A dict with ``method``, ``timestamp``, ``execution_time``,
            ``num_recommendations``, ``diversity``, ``novelty``, ``coverage``
            and optionally ``precision_at_5``.
        """
        metrics = {
            'method': method_name,
            'timestamp': datetime.now(),
            'execution_time': execution_time,
            'num_recommendations': len(recommendations_asins),
            'diversity': self.calculate_diversity(recommendations_asins),
            'novelty': self.calculate_novelty(recommendations_asins),
            'coverage': self.calculate_coverage(recommendations_asins, total_available),
        }

        if relevant_items:
            metrics['precision_at_5'] = self.calculate_precision_at_k(
                recommendations_asins, relevant_items, 5
            )

        # Only numeric metrics go into the history.
        recorded = self._method_metrics.setdefault(method_name, [])
        for key, value in metrics.items():
            if key in ('method', 'timestamp'):
                continue
            self.metrics_history[f"{method_name}_{key}"].append(value)
            if key not in recorded:
                recorded.append(key)

        return metrics

    def _group_history_by_method(self):
        """Return ``{method: {metric: values}}`` built from the history."""
        grouped = {}
        claimed = set()

        # Exact grouping for everything recorded by evaluate_recommendations,
        # whatever the number of underscores in the method name.
        for method, metric_names in getattr(self, '_method_metrics', {}).items():
            bucket = grouped.setdefault(method, {})
            for metric in metric_names:
                key = f"{method}_{metric}"
                claimed.add(key)
                bucket[metric] = self.metrics_history.get(key, [])

        # Keys written to metrics_history directly fall back to the legacy
        # convention: the first two "_" tokens are the method name.
        for key, values in self.metrics_history.items():
            if key in claimed:
                continue
            parts = key.split('_')
            if len(parts) < 3:
                continue  # cannot be split into method + metric
            method = '_'.join(parts[:2])
            grouped.setdefault(method, {})['_'.join(parts[2:])] = values

        return grouped

    def get_comparison_report(self):
        """Return a Markdown report with mean ± std per method and metric.

        Returns:
            The report text, or ``"No hay métricas disponibles"`` when nothing
            has been recorded yet.
        """
        if not self.metrics_history:
            return "No hay métricas disponibles"

        grouped = self._group_history_by_method()
        pieces = ["# 📊 REPORTE COMPARATIVO DE MÉTODOS\n\n"]

        for method in sorted(grouped):
            pieces.append(f"## {method.replace('_', ' ').title()}\n")
            for metric, values in grouped[method].items():
                if not values:
                    continue
                pieces.append(
                    f"- **{metric.replace('_', ' ').title()}**: "
                    f"{np.mean(values):.4f} ± {np.std(values):.4f} (n={len(values)})\n"
                )
            pieces.append("\n")

        return "".join(pieces)


# Shared evaluator used by every search/recommendation method in this module.
metrics_evaluator = RecommendationMetrics()
|
|
|
|
|
|
|
class DescriptionSearcher:
    """Free-text product search over descriptions with three strategies.

    * kNN over sentence embeddings (cosine metric),
    * TF-IDF vectors compared with cosine similarity,
    * KMeans cluster lookup followed by embedding similarity inside the
      query's cluster.

    Every search returns ``(results, metrics)``.  ``results`` is a list of
    dicts with ``asin``, ``similarity_score`` and ``method``, containing each
    ASIN at most once.  Metrics are recorded through the module-level
    ``metrics_evaluator``.
    """

    def __init__(self, df_products, embeddings, model):
        """Store the inputs and fit all three indexes.

        Args:
            df_products: DataFrame with ``description`` and ``parent_asin``
                columns; row ``i`` is expected to match ``embeddings[i]``.
            embeddings: 2-D array of description embeddings.
            model: Encoder exposing ``encode(list_of_texts)``.
        """
        self.df_products = df_products
        self.embeddings = embeddings
        self.model = model
        self.setup_methods()

    def setup_methods(self):
        """Fit the kNN index, the TF-IDF matrix and the KMeans clustering."""
        self.knn = NearestNeighbors(n_neighbors=50, metric="cosine")
        self.knn.fit(self.embeddings)

        descriptions_text = self.df_products["description"].fillna("").tolist()
        self.tfidf_vectorizer = TfidfVectorizer(
            max_features=5000,
            stop_words='english',
            ngram_range=(1, 2),
            min_df=2,
        )
        self.tfidf_matrix = self.tfidf_vectorizer.fit_transform(descriptions_text)

        # Roughly one cluster per 10 products, capped at 100.  KMeans needs at
        # least one cluster and no more clusters than samples, so small
        # catalogues (fewer than 10 rows) no longer ask for 0 clusters.
        self.n_clusters = max(
            1, min(100, len(self.df_products) // 10, len(self.embeddings))
        )
        self.kmeans = KMeans(n_clusters=self.n_clusters, random_state=42, n_init=10)
        self.cluster_labels = self.kmeans.fit_predict(self.embeddings)

    def _collect_results(self, ranked_indices, scores, n_results, method_label):
        """Turn ranked row positions into up to ``n_results`` unique hits.

        Args:
            ranked_indices: Row positions into ``df_products``, best first.
            scores: Scores aligned one-to-one with ``ranked_indices``.
            n_results: Maximum number of hits to return.
            method_label: Value stored under ``'method'`` in each hit.

        Returns:
            A list of result dicts.  Positions outside ``df_products`` and
            repeated ASINs are skipped.
        """
        results = []
        seen_asins = set()
        n_rows = len(self.df_products)

        for idx, score in zip(ranked_indices, scores):
            if len(results) >= n_results:
                break
            # Embeddings may have more rows than the DataFrame.
            if idx >= n_rows:
                continue

            asin = self.df_products.iloc[idx].get("parent_asin", "N/A")
            if asin in seen_asins:
                continue
            seen_asins.add(asin)

            results.append({
                'asin': asin,
                'similarity_score': score,
                'method': method_label,
            })

        return results

    def search_method_1_knn(self, query, n_results=5):
        """Method 1: nearest neighbours of the query embedding.

        Only the 50 nearest embeddings are considered; the score is
        ``1 - cosine distance``.

        Returns:
            ``(results, metrics)``.
        """
        start_time = time.time()

        query_embedding = self.model.encode([query])
        # Never request more neighbours than there are fitted samples.
        n_neighbors = min(50, len(self.df_products), len(self.embeddings))
        distances, indices = self.knn.kneighbors(query_embedding, n_neighbors=n_neighbors)

        results = self._collect_results(
            indices[0], 1 - distances[0], n_results, 'KNN_Embeddings'
        )

        execution_time = time.time() - start_time
        metrics = metrics_evaluator.evaluate_recommendations(
            'search_knn', [hit['asin'] for hit in results], execution_time
        )
        return results, metrics

    def search_method_2_tfidf(self, query, n_results=5):
        """Method 2: TF-IDF vectors ranked by cosine similarity.

        Products whose similarity is below ``0.01`` are never returned.

        Returns:
            ``(results, metrics)``.
        """
        start_time = time.time()

        query_tfidf = self.tfidf_vectorizer.transform([query])
        similarities = cosine_similarity(query_tfidf, self.tfidf_matrix).flatten()

        # Drop sub-threshold rows up front instead of visiting every
        # remaining row of the catalogue one by one.
        order = np.argsort(similarities)[::-1]
        order = order[similarities[order] >= 0.01]

        results = self._collect_results(order, similarities[order], n_results, 'TF_IDF')

        execution_time = time.time() - start_time
        metrics = metrics_evaluator.evaluate_recommendations(
            'search_tfidf', [hit['asin'] for hit in results], execution_time
        )
        return results, metrics

    def search_method_3_cluster(self, query, n_results=5):
        """Method 3: rank the members of the query's KMeans cluster.

        Returns:
            ``(results, metrics)``.  When the predicted cluster has no
            members, ``results`` is empty and ``metrics`` only holds
            ``method`` and ``execution_time`` (nothing is recorded).
        """
        start_time = time.time()

        query_embedding = self.model.encode([query])
        query_cluster = self.kmeans.predict(query_embedding)[0]
        cluster_indices = np.where(self.cluster_labels == query_cluster)[0]

        if len(cluster_indices) == 0:
            execution_time = time.time() - start_time
            return [], {'method': 'Cluster_Search', 'execution_time': execution_time}

        similarities = cosine_similarity(
            query_embedding, self.embeddings[cluster_indices]
        ).flatten()

        # Reorder indices and scores together so they stay aligned; this also
        # avoids searching cluster_indices again for every hit.
        order = np.argsort(similarities)[::-1]
        results = self._collect_results(
            cluster_indices[order], similarities[order], n_results, 'Cluster_Search'
        )

        execution_time = time.time() - start_time
        metrics = metrics_evaluator.evaluate_recommendations(
            'search_cluster', [hit['asin'] for hit in results], execution_time
        )
        return results, metrics
|
|
|
|
|
|
|
class CollaborativeRecommender:
    """Item-to-item collaborative filtering with three similarity sources.

    * cosine similarity between TruncatedSVD item factors,
    * cosine similarity between NMF item factors,
    * cosine similarity between raw item rating columns.

    Every ``recommend_method_*`` returns ``(recommendations, metrics)`` where
    ``recommendations`` is a list of dicts with ``asin``,
    ``similarity_score`` and ``method``.  Metrics are recorded through the
    module-level ``metrics_evaluator``.
    """

    def __init__(self, ratings_df, min_ratings_per_user=5, min_ratings_per_item=5):
        """Filter the ratings and build all similarity structures.

        Args:
            ratings_df: DataFrame with ``user_id``, ``parent_asin`` and
                ``rating`` columns.  It is copied, not modified.
            min_ratings_per_user: Users with fewer ratings are dropped.
            min_ratings_per_item: Items with fewer ratings are dropped.

        Raises:
            ValueError: If fewer than two users or two items remain after
                filtering.
        """
        self.ratings_df = ratings_df.copy()
        self.min_ratings_per_user = min_ratings_per_user
        self.min_ratings_per_item = min_ratings_per_item
        self.user_item_matrix = None
        self.item_similarity_matrix = None
        self.svd_model = None
        self.nmf_model = None
        # id <-> matrix position lookups, filled by _prepare_data.
        self.user_encoder = {}
        self.item_encoder = {}
        self.user_decoder = {}
        self.item_decoder = {}

        self._prepare_data()
        self._build_matrices()

    def _prepare_data(self):
        """Drop sparse users/items and build the id <-> position encoders.

        Both thresholds are evaluated on the unfiltered data in a single
        pass; the result is not re-checked after filtering.
        """
        print("Preparando datos para filtrado colaborativo...")

        user_counts = self.ratings_df['user_id'].value_counts()
        valid_users = user_counts[user_counts >= self.min_ratings_per_user].index

        item_counts = self.ratings_df['parent_asin'].value_counts()
        valid_items = item_counts[item_counts >= self.min_ratings_per_item].index

        keep = (
            self.ratings_df['user_id'].isin(valid_users)
            & self.ratings_df['parent_asin'].isin(valid_items)
        )
        self.ratings_df = self.ratings_df[keep]

        print(f"Datos filtrados: {len(self.ratings_df):,} ratings, "
              f"{self.ratings_df['user_id'].nunique():,} usuarios, "
              f"{self.ratings_df['parent_asin'].nunique():,} productos")

        unique_users = self.ratings_df['user_id'].unique()
        unique_items = self.ratings_df['parent_asin'].unique()

        self.user_encoder = {user: idx for idx, user in enumerate(unique_users)}
        self.item_encoder = {item: idx for idx, item in enumerate(unique_items)}
        self.user_decoder = {idx: user for user, idx in self.user_encoder.items()}
        self.item_decoder = {idx: item for item, idx in self.item_encoder.items()}

    def _build_matrices(self):
        """Build the user-item matrix and the SVD / NMF / direct similarities.

        Raises:
            ValueError: If fewer than two users or two items are available;
                the factorizations need at least one component.
        """
        print("Construyendo matrices de interacción...")

        n_users = len(self.user_encoder)
        n_items = len(self.item_encoder)
        if min(n_users, n_items) < 2:
            raise ValueError(
                "Datos insuficientes para filtrado colaborativo: "
                f"{n_users} usuarios y {n_items} productos tras el filtrado"
            )

        # csr_matrix sums duplicate (row, col) entries, so a user who rated
        # the same product twice would get an out-of-scale value.  Average
        # repeated pairs first.
        pair_ratings = (
            self.ratings_df
            .groupby(['user_id', 'parent_asin'], sort=False)['rating']
            .mean()
            .reset_index()
        )
        user_indices = pair_ratings['user_id'].map(self.user_encoder).to_numpy()
        item_indices = pair_ratings['parent_asin'].map(self.item_encoder).to_numpy()
        ratings = pair_ratings['rating'].to_numpy()

        self.user_item_matrix = csr_matrix(
            (ratings, (user_indices, item_indices)),
            shape=(n_users, n_items),
        )

        print("Calculando similitudes SVD...")
        self.svd_model = TruncatedSVD(
            n_components=min(50, min(n_users, n_items) - 1),
            random_state=42,
        )
        # Items are the samples here, hence the transpose.
        item_features_svd = self.svd_model.fit_transform(self.user_item_matrix.T)
        self.item_similarity_matrix = cosine_similarity(item_features_svd)

        print("Calculando factorización NMF...")
        self.nmf_model = NMF(
            n_components=min(30, min(n_users, n_items) - 1),
            random_state=42,
            max_iter=200,
        )
        self.user_features_nmf = self.nmf_model.fit_transform(self.user_item_matrix)
        self.item_features_nmf = self.nmf_model.components_.T

        print("Calculando similitud directa...")
        self.item_similarity_direct = cosine_similarity(self.user_item_matrix.T)

        print("Matrices construidas exitosamente")

    @staticmethod
    def _top_similar_indices(similarities, target_idx, n_recommendations):
        """Return the positions of the most similar items, best first.

        The target is removed explicitly.  Dropping "the first" entry of the
        ranking is wrong whenever the target is not ranked first (ties, or a
        zero vector whose self-similarity is 0).
        """
        order = np.argsort(similarities)[::-1]
        return order[order != target_idx][:n_recommendations]

    def _recommend_from_similarities(self, similarities, target_idx, n_recommendations,
                                     method_label, metrics_label, start_time):
        """Build the result list for one similarity vector and record metrics."""
        recommendations = [
            {
                'asin': self.item_decoder[idx],
                'similarity_score': similarities[idx],
                'method': method_label,
            }
            for idx in self._top_similar_indices(similarities, target_idx, n_recommendations)
        ]

        execution_time = time.time() - start_time
        metrics = metrics_evaluator.evaluate_recommendations(
            metrics_label, [rec['asin'] for rec in recommendations], execution_time
        )
        return recommendations, metrics

    def recommend_method_1_svd(self, target_item, n_recommendations=4):
        """Method 1: neighbours by cosine similarity of SVD item factors.

        Returns:
            ``(recommendations, metrics)``; an empty list and a minimal
            metrics dict when ``target_item`` is unknown.
        """
        start_time = time.time()

        if target_item not in self.item_encoder:
            return [], {'method': 'SVD_Collaborative', 'execution_time': 0}

        target_idx = self.item_encoder[target_item]
        return self._recommend_from_similarities(
            self.item_similarity_matrix[target_idx], target_idx, n_recommendations,
            'SVD_Collaborative', 'collab_svd', start_time,
        )

    def recommend_method_2_nmf(self, target_item, n_recommendations=4):
        """Method 2: neighbours by cosine similarity of NMF item factors.

        Returns:
            ``(recommendations, metrics)``; an empty list and a minimal
            metrics dict when ``target_item`` is unknown.
        """
        start_time = time.time()

        if target_item not in self.item_encoder:
            return [], {'method': 'NMF_Collaborative', 'execution_time': 0}

        target_idx = self.item_encoder[target_item]
        similarities = cosine_similarity(
            [self.item_features_nmf[target_idx]], self.item_features_nmf
        ).flatten()
        return self._recommend_from_similarities(
            similarities, target_idx, n_recommendations,
            'NMF_Collaborative', 'collab_nmf', start_time,
        )

    def recommend_method_3_direct(self, target_item, n_recommendations=4):
        """Method 3: neighbours by cosine similarity of raw rating columns.

        Returns:
            ``(recommendations, metrics)``; an empty list and a minimal
            metrics dict when ``target_item`` is unknown.
        """
        start_time = time.time()

        if target_item not in self.item_encoder:
            return [], {'method': 'Direct_Collaborative', 'execution_time': 0}

        target_idx = self.item_encoder[target_item]
        return self._recommend_from_similarities(
            self.item_similarity_direct[target_idx], target_idx, n_recommendations,
            'Direct_Collaborative', 'collab_direct', start_time,
        )

    def get_available_items(self):
        """Return the ASINs that survived filtering and can be queried."""
        return list(self.item_encoder.keys())
|
|
|
|
|
|
|
class ClientBasedRecommender:
    """Recommend products from the set of products a client has selected.

    Three strategies are offered: embedding profile similarity, rating
    weighted category preference, and a hybrid of embeddings, categories and
    ratings.  Every ``recommend_method_*`` returns
    ``(recommendations, metrics)``; selected products are never recommended
    and each ASIN appears at most once.  Metrics are recorded through the
    module-level ``metrics_evaluator``.
    """

    def __init__(self, df_products, embeddings, ratings_dict):
        """Store the inputs and build the lookup structures.

        Args:
            df_products: DataFrame with a ``parent_asin`` column and,
                optionally, ``main_category``; row ``i`` must correspond to
                ``embeddings[i]``.
            embeddings: 2-D array of product embeddings.
            ratings_dict: Mapping ``asin -> rating``.

        Raises:
            AssertionError: If ``embeddings`` and ``df_products`` differ in
                length.
        """
        self.df_products = df_products
        self.embeddings = embeddings
        self.ratings_dict = ratings_dict
        self.setup_methods()

    def setup_methods(self):
        """Validate the inputs and build the ASIN -> row position map."""
        # Raised explicitly (instead of `assert`) so the check survives
        # `python -O`; the exception type is unchanged.  Checked first so a
        # misaligned input fails before any work is done.
        if len(self.embeddings) != len(self.df_products):
            raise AssertionError(
                f"ERROR: embeddings ({len(self.embeddings)}) y dataframe ({len(self.df_products)}) NO coinciden."
            )

        # For repeated ASINs the last row wins.
        self.asin_to_idx = {
            asin: idx
            for idx, asin in enumerate(self.df_products["parent_asin"].values)
            if pd.notna(asin)
        }

        self.prepare_content_features()

    def prepare_content_features(self):
        """Build a one-hot category matrix, one row per product.

        Sets ``unique_categories`` (in order of first appearance, so the
        column order is reproducible between runs) and ``category_features``.
        Missing categories are treated as ``'Unknown'``.
        """
        n_rows = len(self.df_products)
        categories = self.df_products.get(
            'main_category', pd.Series(['Unknown'] * n_rows)
        ).fillna('Unknown')

        self.unique_categories = list(categories.unique())
        # Dict lookup instead of list.index() for every row.
        column_of = {category: col for col, category in enumerate(self.unique_categories)}

        self.category_features = np.zeros((n_rows, len(self.unique_categories)))
        for row_pos, category in enumerate(categories):
            self.category_features[row_pos, column_of[category]] = 1

    def _candidate_columns(self):
        """Return parallel lists ``(asins, categories)``, one entry per row.

        Missing categories are reported as ``'Unknown'``, matching
        ``prepare_content_features``.
        """
        asins = self.df_products['parent_asin'].tolist()
        if 'main_category' in self.df_products:
            categories = self.df_products['main_category'].fillna('Unknown').tolist()
        else:
            categories = ['Unknown'] * len(asins)
        return asins, categories

    def recommend_method_1_profile_similarity(self, selected_asins, n_recommendations=5):
        """Method 1: rank products by similarity to the mean embedding.

        The user profile is the unweighted mean of the embeddings of the
        selected products that exist in the catalogue.  Only products with a
        strictly positive cosine similarity are returned.

        Returns:
            ``(recommendations, metrics)``; an empty list and a minimal
            metrics dict when nothing usable was selected.
        """
        start_time = time.time()

        if not selected_asins:
            return [], {'method': 'Profile_Similarity', 'execution_time': 0}

        valid_asins = [asin for asin in selected_asins if asin in self.asin_to_idx]
        if not valid_asins:
            return [], {'method': 'Profile_Similarity', 'execution_time': 0}

        selected_embeddings = [self.embeddings[self.asin_to_idx[asin]] for asin in valid_asins]
        user_profile = np.mean(selected_embeddings, axis=0)
        similarities = cosine_similarity([user_profile], self.embeddings).flatten()

        # Exclude by ASIN (as the other two methods do) so that a selected
        # product stored on several rows is never recommended back.
        skip_asins = set(valid_asins)
        asins = self.df_products['parent_asin'].tolist()

        recommendations = []
        for idx in np.argsort(similarities)[::-1]:
            # Sorted descending: nothing useful remains after this point.
            if len(recommendations) >= n_recommendations or similarities[idx] <= 0:
                break
            asin = asins[idx]
            if not asin or asin in skip_asins:
                continue
            skip_asins.add(asin)
            recommendations.append({
                'asin': asin,
                'similarity_score': similarities[idx],
                'method': 'Profile_Similarity',
            })

        execution_time = time.time() - start_time
        metrics = metrics_evaluator.evaluate_recommendations(
            'client_profile', [rec['asin'] for rec in recommendations], execution_time
        )
        return recommendations, metrics

    def recommend_method_2_weighted_categories(self, selected_asins, n_recommendations=5):
        """Method 2: rank products by category preference and rating.

        Each selected product adds ``rating / 5`` (rating defaults to 3.0) to
        the weight of its category; weights are normalised to sum to 1.  A
        candidate scores ``0.7 * category_weight + 0.3 * rating / 5`` (rating
        defaults to 0.0).

        Returns:
            ``(recommendations, metrics)``; an empty list and a minimal
            metrics dict when nothing usable was selected.
        """
        start_time = time.time()

        if not selected_asins:
            return [], {'method': 'Weighted_Categories', 'execution_time': 0}

        asins, categories = self._candidate_columns()

        category_weights = defaultdict(float)
        for asin in selected_asins:
            idx = self.asin_to_idx.get(asin)
            if idx is None:
                continue
            category_weights[categories[idx]] += self.ratings_dict.get(asin, 3.0) / 5.0

        if not category_weights:
            return [], {'method': 'Weighted_Categories', 'execution_time': 0}

        total_weight = sum(category_weights.values())
        if total_weight > 0:
            for category in category_weights:
                category_weights[category] /= total_weight
        else:
            # All selected products are rated 0 (or NaN): fall back to equal
            # weights instead of dividing by zero.
            uniform = 1.0 / len(category_weights)
            for category in category_weights:
                category_weights[category] = uniform

        skip_asins = set(selected_asins)
        product_scores = []
        for asin, category in zip(asins, categories):
            if not asin or asin in skip_asins:
                continue
            skip_asins.add(asin)  # keep only the first row of a repeated ASIN

            category_score = category_weights.get(category, 0.0)
            product_rating = self.ratings_dict.get(asin, 0.0)
            product_scores.append({
                'asin': asin,
                'similarity_score': category_score * 0.7 + (product_rating / 5.0) * 0.3,
                'method': 'Weighted_Categories',
            })

        # Stable sort: equal scores keep catalogue order.
        product_scores.sort(key=lambda item: item['similarity_score'], reverse=True)
        recommendations = product_scores[:n_recommendations]

        execution_time = time.time() - start_time
        metrics = metrics_evaluator.evaluate_recommendations(
            'client_categories', [rec['asin'] for rec in recommendations], execution_time
        )
        return recommendations, metrics

    def recommend_method_3_hybrid_approach(self, selected_asins, n_recommendations=5):
        """Method 3: hybrid of embedding similarity, category and rating.

        The user profile is the rating-weighted mean embedding of the
        selected products (rating defaults to 3.0).  A candidate scores
        ``0.5 * embedding_similarity + 0.3 * category_share + 0.2 * rating / 5``
        where ``category_share`` is the fraction of selected products in the
        candidate's category.

        Returns:
            ``(recommendations, metrics)``.  Each recommendation also carries
            ``embedding_sim``, ``category_score`` and ``rating_score``.
        """
        start_time = time.time()

        if not selected_asins:
            return [], {'method': 'Hybrid_Approach', 'execution_time': 0}

        asins, categories = self._candidate_columns()

        selected_embeddings = []
        selected_categories = []
        selected_ratings = []
        for asin in selected_asins:
            idx = self.asin_to_idx.get(asin)
            if idx is None:
                continue
            selected_embeddings.append(self.embeddings[idx])
            selected_categories.append(categories[idx])
            selected_ratings.append(self.ratings_dict.get(asin, 3.0))

        if not selected_embeddings:
            return [], {'method': 'Hybrid_Approach', 'execution_time': 0}

        weights = np.array(selected_ratings, dtype=float) / 5.0
        weight_sum = np.sum(weights)
        # A zero or NaN sum would turn the whole profile into NaN; use a
        # plain mean in that case (weights=None).
        weights = weights / weight_sum if weight_sum > 0 else None
        user_profile = np.average(selected_embeddings, axis=0, weights=weights)

        category_preferences = Counter(selected_categories)
        total_selections = len(selected_categories)

        # One vectorised call for the whole catalogue, indexed by row
        # position, instead of one call per row.
        embedding_similarities = cosine_similarity(
            [user_profile], self.embeddings
        ).flatten()

        skip_asins = set(selected_asins)
        candidate_scores = []
        for position, (asin, category) in enumerate(zip(asins, categories)):
            if not asin or asin in skip_asins:
                continue
            skip_asins.add(asin)  # keep only the first row of a repeated ASIN

            embedding_similarity = embedding_similarities[position]
            category_score = category_preferences.get(category, 0) / total_selections
            rating_score = self.ratings_dict.get(asin, 0.0) / 5.0

            hybrid_score = (
                embedding_similarity * 0.5
                + category_score * 0.3
                + rating_score * 0.2
            )

            candidate_scores.append({
                'asin': asin,
                'similarity_score': hybrid_score,
                'method': 'Hybrid_Approach',
                'embedding_sim': embedding_similarity,
                'category_score': category_score,
                'rating_score': rating_score,
            })

        candidate_scores.sort(key=lambda item: item['similarity_score'], reverse=True)
        recommendations = candidate_scores[:n_recommendations]

        execution_time = time.time() - start_time
        metrics = metrics_evaluator.evaluate_recommendations(
            'client_hybrid', [rec['asin'] for rec in recommendations], execution_time
        )
        return recommendations, metrics
|
|
|
|
|
def clean_description(description):
    """Return a display-ready version of a product description.

    * A value wrapped entirely in ``[...]`` has only the outer brackets
      removed.
    * Otherwise every ``[...]`` group is removed together with its content.
    * Runs of whitespace are collapsed to single spaces.

    Args:
        description: Usually a string.  ``None``, NaN and other falsy values
            are treated as "no description"; any other non-string value is
            converted with ``str()`` instead of raising ``AttributeError``.

    Returns:
        The cleaned text, or ``"Sin descripción"`` when nothing is left.
    """
    no_description = "Sin descripción"

    # NaN is truthy, so it has to be tested before the generic falsy check.
    if isinstance(description, float) and description != description:
        return no_description
    if not description:
        return no_description

    text = description if isinstance(description, str) else str(description)
    text = text.strip()

    if text.startswith('[') and text.endswith(']'):
        cleaned = text[1:-1].strip()
    else:
        cleaned = re.sub(r'\[.*?\]', '', text)

    cleaned = re.sub(r'\s+', ' ', cleaned).strip()
    return cleaned if cleaned else no_description
|
|
|
def get_best_image_url(row):
    """Pick the first usable image URL of a product row.

    The columns ``image_urls_best``, ``image_urls_large`` and
    ``image_urls_all`` are tried in that order.  A column value may be a list
    or a JSON-encoded list; the first non-empty string starting with
    ``"http"`` wins.  Columns that are missing, unparsable or hold no usable
    URL are skipped.

    Args:
        row: Any mapping-like object supporting ``in`` and ``[]`` (for
            example a pandas Series or a dict).

    Returns:
        The chosen URL, or a placeholder image URL when none is found.
    """
    for column in ('image_urls_best', 'image_urls_large', 'image_urls_all'):
        if column not in row:
            continue
        try:
            raw_value = row[column]
            candidates = json.loads(raw_value) if isinstance(raw_value, str) else raw_value
            if not (isinstance(candidates, list) and candidates):
                continue
            first_usable = next(
                (
                    url for url in candidates
                    if url and isinstance(url, str) and url.startswith("http")
                ),
                None,
            )
            if first_usable is not None:
                return first_usable
        except (json.JSONDecodeError, TypeError, ValueError):
            continue

    return "https://via.placeholder.com/300x300.png?text=No+Image"
|
|
|
def get_product_rating(asin):
    """Look up a product's rating in the module-level ``ratings_dict``.

    Returns:
        The stored rating, or ``0.0`` when the ASIN has no entry.
    """
    unrated = 0.0
    return ratings_dict.get(asin, unrated)
|
|
|
def get_product_info_by_asin(asin):
    """Collect the display fields of one product from ``df_similars``.

    When several rows share the ASIN, the first one is used.

    Returns:
        A dict with ``asin``, ``title``, ``description`` (cleaned),
        ``rating``, ``image_url`` and ``category``; ``None`` when the ASIN is
        not in ``df_similars``.
    """
    matches = df_similars.loc[df_similars['parent_asin'] == asin]
    if not len(matches):
        return None

    first_match = matches.iloc[0]
    title = first_match.get('title', 'Sin título')
    description = clean_description(first_match.get('description', ''))
    rating = get_product_rating(asin)
    image_url = get_best_image_url(first_match)
    category = first_match.get('main_category', 'Unknown')

    return dict(
        asin=asin,
        title=title,
        description=description,
        rating=rating,
        image_url=image_url,
        category=category,
    )
|
|
|
|
|
|
|
|
|
# --- Module-level service objects used by the UI callbacks -------------------
description_searcher = DescriptionSearcher(df_similars, description_embeddings, model)

# The collaborative recommender is built only when HAS_DETAILED_RATINGS is
# set; otherwise it is None and callers must handle that.
if not HAS_DETAILED_RATINGS:
    print("Sistema colaborativo no disponible (requiere ratings detallados)")
    collaborative_recommender = None
else:
    print("Inicializando sistema de recomendación colaborativo...")
    collaborative_recommender = CollaborativeRecommender(df_ratings_detailed)

client_recommender = ClientBasedRecommender(df_similars, description_embeddings, ratings_dict)
|
|
|
|
|
|
|
def search_products_enhanced(descripcion_input, method_choice, max_images_per_product=2, target_products=5):
    """Run a description search and format the hits for a gallery.

    Args:
        descripcion_input: Free-text query.  ``None``, empty or
            whitespace-only input returns a single placeholder entry.
        method_choice: ``"KNN + Embeddings"``, ``"TF-IDF + Cosine"`` or
            ``"Clustering + Embeddings"``.  Any other value runs all three
            methods (two hits each) and merges them by score.
        max_images_per_product: Accepted for interface compatibility; not
            used by this function.
        target_products: Maximum number of products to return.

    Returns:
        A list of ``(image_url, caption)`` tuples.  Products that cannot be
        resolved by ``get_product_info_by_asin`` are omitted.
    """
    if not descripcion_input or not descripcion_input.strip():
        return [("https://via.placeholder.com/300.png?text=Vacío", "Por favor escribe algo para buscar...")]

    # The per-method metrics are recorded by the search methods themselves,
    # so the returned metrics dicts are not needed here.
    if method_choice == "KNN + Embeddings":
        results, _ = description_searcher.search_method_1_knn(descripcion_input, target_products)
    elif method_choice == "TF-IDF + Cosine":
        results, _ = description_searcher.search_method_2_tfidf(descripcion_input, target_products)
    elif method_choice == "Clustering + Embeddings":
        results, _ = description_searcher.search_method_3_cluster(descripcion_input, target_products)
    else:
        results_knn, _ = description_searcher.search_method_1_knn(descripcion_input, 2)
        results_tfidf, _ = description_searcher.search_method_2_tfidf(descripcion_input, 2)
        results_cluster, _ = description_searcher.search_method_3_cluster(descripcion_input, 2)

        ranked = sorted(
            results_knn + results_tfidf + results_cluster,
            key=lambda hit: hit['similarity_score'],
            reverse=True,
        )
        # Several methods can return the same product; keep only its
        # best-scoring hit so the gallery shows each product once.
        seen_asins = set()
        unique_hits = []
        for hit in ranked:
            if hit['asin'] in seen_asins:
                continue
            seen_asins.add(hit['asin'])
            unique_hits.append(hit)
        results = unique_hits[:target_products]

    gallery_results = []
    for result in results:
        product_info = get_product_info_by_asin(result['asin'])
        if not product_info:
            continue

        description = product_info['description']
        preview = description[:200] + ('...' if len(description) > 200 else '')

        texto = f"🔍 Método: {result['method']}\n"
        texto += f"📦 {product_info['title']}\n"
        texto += f"⭐ Rating: {product_info['rating']:.2f}\n"
        texto += f"🎯 Similitud: {result['similarity_score']:.3f}\n"
        texto += f"📂 Categoría: {product_info['category']}\n\n"
        texto += f"📝 {preview}"

        gallery_results.append((product_info['image_url'], texto))

    return gallery_results
|
|
|
def get_collaborative_recommendations_enhanced(selected_product_asin, method_choice):
    """Produce collaborative recommendations for one product as Gradio gallery items.

    Args:
        selected_product_asin: ASIN of the base product chosen in the UI.
        method_choice: "SVD", "NMF" or "Direct Similarity" runs that single
            method; any other value runs all three (2 + 2 + 1 items) and
            concatenates their output.

    Returns:
        A list of ``(image_url, caption)`` tuples, or a single placeholder
        tuple when the recommender is unavailable, no product is selected, or
        no recommendation was produced.
    """
    if not collaborative_recommender:
        return [("https://via.placeholder.com/300.png?text=No+Disponible", "Sistema colaborativo no disponible")]

    if not selected_product_asin:
        return [("https://via.placeholder.com/300.png?text=Vacío", "Por favor selecciona un producto...")]

    # (UI label, recommender method name, quota used when combining all methods)
    method_table = (
        ("SVD", "recommend_method_1_svd", 2),
        ("NMF", "recommend_method_2_nmf", 2),
        ("Direct Similarity", "recommend_method_3_direct", 1),
    )
    selected = next((attr for label, attr, _quota in method_table if label == method_choice), None)

    if selected is not None:
        recommendations, _ = getattr(collaborative_recommender, selected)(selected_product_asin)
    else:
        partials = []
        for _label, attr, quota in method_table:
            items, _ = getattr(collaborative_recommender, attr)(selected_product_asin, quota)
            partials.append(items)
        recommendations = partials[0] + partials[1] + partials[2]

    if not recommendations:
        return [("https://via.placeholder.com/300.png?text=Sin+Recomendaciones", "No se encontraron recomendaciones para este producto.")]

    gallery_results = []
    for item in recommendations:
        product_info = get_product_info_by_asin(item['asin'])
        if not product_info:
            continue

        lines = [
            f"🤝 Método: {item['method']}\n",
            f"📦 {product_info['title']}\n",
            f"⭐ Rating: {product_info['rating']:.2f}\n",
            f"🎯 Similitud: {item['similarity_score']:.3f}\n",
            f"📂 Categoría: {product_info['category']}\n\n",
            f"📝 {product_info['description'][:200]}{'...' if len(product_info['description']) > 200 else ''}",
        ]
        gallery_results.append((product_info['image_url'], "".join(lines)))

    return gallery_results
|
|
|
def _parse_client_asins(raw_text):
    """Split free text into ASIN tokens.

    Tokens are separated by commas, semicolons or any whitespace. Dots around
    a token are stripped because the examples shown in the UI end each ASIN
    list with a period, which would otherwise stick to the last ASIN.
    """
    pieces = (piece.strip('.') for piece in re.split(r'[,;\s]+', raw_text.strip()))
    return [piece for piece in pieces if piece]


def get_client_recommendations(selected_asins_text, method_choice, n_recommendations=5):
    """Recommend products for a client from the ASINs they selected.

    Args:
        selected_asins_text: ASINs separated by commas, semicolons, spaces or
            new lines. ``None`` and blank strings are treated as empty input.
        method_choice: "Profile Similarity", "Weighted Categories" or
            "Hybrid Approach" runs that single method; any other value (the UI
            sends "Comparar Todos") runs all three and concatenates them.
        n_recommendations: Number of recommendations requested. When all
            methods are combined the count is split between them as evenly as
            possible (2 + 2 + 1 for the default of 5).

    Returns:
        A list of ``(image_url, caption)`` tuples, or a single placeholder
        tuple when the input is blank, no ASIN could be parsed, or nothing was
        recommended.
    """
    if not selected_asins_text or not selected_asins_text.strip():
        return [("https://via.placeholder.com/300.png?text=Vacío", "Por favor ingresa ASINs de productos...")]

    selected_asins = _parse_client_asins(selected_asins_text)
    if not selected_asins:
        return [("https://via.placeholder.com/300.png?text=Error", "No se pudieron parsear los ASINs")]

    # Slider values can arrive as floats; the quota arithmetic needs an integer.
    n_recommendations = int(n_recommendations)

    method_table = (
        ("Profile Similarity", "recommend_method_1_profile_similarity"),
        ("Weighted Categories", "recommend_method_2_weighted_categories"),
        ("Hybrid Approach", "recommend_method_3_hybrid_approach"),
    )
    chosen = next((attr for label, attr in method_table if label == method_choice), None)

    if chosen is not None:
        recommendations, _ = getattr(client_recommender, chosen)(selected_asins, n_recommendations)
    else:
        # Spread the requested total over the three methods; the earlier
        # methods receive the remainder.
        base, extra = divmod(n_recommendations, 3)
        recommendations = []
        for position, (_label, attr) in enumerate(method_table):
            quota = base + (1 if position < extra else 0)
            if quota <= 0:
                # Do not call a method just to ask it for zero items.
                continue
            items, _ = getattr(client_recommender, attr)(selected_asins, quota)
            recommendations.extend(items)

    if not recommendations:
        return [("https://via.placeholder.com/300.png?text=Sin+Recomendaciones", "No se encontraron recomendaciones para los productos seleccionados.")]

    gallery_results = []
    for rec in recommendations:
        product_info = get_product_info_by_asin(rec['asin'])
        if not product_info:
            continue

        texto = (
            f"👤 Método: {rec['method']}\n"
            f"📦 {product_info['title']}\n"
            f"⭐ Rating: {product_info['rating']:.2f}\n"
            f"🎯 Score: {rec['similarity_score']:.3f}\n"
            f"📂 Categoría: {product_info['category']}\n\n"
        )

        # Extra score breakdown, present only on records that carry it.
        if 'embedding_sim' in rec:
            texto += (
                f"🔗 Sim. Embedding: {rec['embedding_sim']:.3f}\n"
                f"📂 Score Categoría: {rec['category_score']:.3f}\n"
                f"⭐ Score Rating: {rec['rating_score']:.3f}\n\n"
            )

        description = product_info['description']
        texto += f"📝 {description[:150]}{'...' if len(description) > 150 else ''}"

        gallery_results.append((product_info['image_url'], texto))

    return gallery_results
|
|
|
def get_product_options(max_options=100):
    """Build the ``(label, asin)`` choices for the base-product dropdown.

    Args:
        max_options: How many of the recommender's available ASINs to inspect
            (the first ``max_options`` are taken). Defaults to 100.

    Returns:
        A list of ``(label, asin)`` tuples for products that have catalogue
        info and a rating above zero, or a single
        ``("Sistema no disponible", "")`` entry when the collaborative
        recommender is not available.
    """
    if not collaborative_recommender:
        return [("Sistema no disponible", "")]

    options = []
    for asin in collaborative_recommender.get_available_items()[:max_options]:
        product_info = get_product_info_by_asin(asin)
        if product_info and product_info['rating'] > 0:
            title = product_info['title']
            # Only show an ellipsis when the title was actually cut.
            shown_title = f"{title[:50]}..." if len(title) > 50 else title
            options.append((f"{shown_title} (Rating: {product_info['rating']:.1f})", asin))

    return options
|
|
|
def get_metrics_report():
    """Return the comparison report produced by the module-level metrics evaluator."""
    report = metrics_evaluator.get_comparison_report()
    return report
|
|
|
|
|
def create_enhanced_interface():
    """Assemble the Gradio Blocks app: one tab per recommender plus a metrics tab.

    The collaborative tab and its event handlers are only created when the
    module-level ``collaborative_recommender`` is truthy.

    Returns:
        The ``gr.Blocks`` instance, not yet launched.
    """
    app_title = "🚀 Advanced Product Recommendaation System"
    compare_all = "Comparar Todos"

    search_choices = ["KNN + Embeddings", "TF-IDF + Cosine", "Clustering + Embeddings", compare_all]
    collab_choices = ["SVD", "NMF", "Direct Similarity", compare_all]
    client_choices = ["Profile Similarity", "Weighted Categories", "Hybrid Approach", compare_all]

    intro_md = "\n".join((
        "",
        f"# {app_title}",
        "",
        "**Funcionalidades disponibles:**",
        "- 🔍 **Búsqueda por Descripción** (3 métodos: KNN+Embeddings, TF-IDF+Cosine, Clustering+Embeddings)",
        "- 🤝 **Recomendación Colaborativa** (3 métodos: SVD, NMF, Direct Similarity)",
        "- 👤 **Recomendación Basada en Cliente** (3 métodos: Profile Similarity, Weighted Categories, Hybrid Approach)",
        "- 📊 **Métricas y Comparación** en tiempo real",
        "",
    ))
    client_help_md = "\n".join((
        "",
        "### Ingresa los ASINs de productos que un cliente ha seleccionado",
        "**Formato:** Separa los ASINs con comas, espacios o nuevas líneas",
        "**Ejemplo 1:** B07NTK7T5P, B0751M85FV, B01HYNE114, B0BKBJT5MM.",
        "**Ejemplo 2:** B01BAN3CBE, B0754TWHPT, B079KM6HDM, B097B8WH61.",
        "**Ejemplo 3:** B0B8WK62Z3, B01BYCH44W, B0BGNQ3CLH, B084L4PF4M.",
        "",
        "",
    ))
    client_methods_md = "\n".join((
        "",
        "**Profile Similarity:** Crea un perfil promedio basado en los embeddings de los productos seleccionados",
        "",
        "**Weighted Categories:** Recomienda basándose en las categorías más frecuentes, ponderadas por rating",
        "",
        "**Hybrid Approach:** Combina embeddings, categorías y ratings con pesos optimizados",
        "",
    ))

    with gr.Blocks(title=app_title, theme=gr.themes.Soft()) as demo:
        gr.Markdown(intro_md)

        with gr.Tabs():

            # --- Tab: search by description ---
            with gr.TabItem("🔍 Búsqueda por Descripción"):
                gr.Markdown("### Describe el producto que buscas en inglés y selecciona el método de búsqueda")

                with gr.Row():
                    with gr.Column(scale=1):
                        descripcion_input = gr.Textbox(
                            label="Describe your ideal product",
                            placeholder="exp: Handmade shungite bead bracelet, Silver necklace, etc."
                        )
                        search_method = gr.Dropdown(
                            choices=search_choices,
                            value="KNN + Embeddings",
                            label="Método de búsqueda"
                        )
                        max_images = gr.Slider(
                            minimum=1, maximum=3, value=2, step=1,
                            label="Máximo de imágenes por producto"
                        )
                        num_products = gr.Slider(
                            minimum=1, maximum=10, value=5, step=1,
                            label="Número de productos a mostrar"
                        )
                        search_btn = gr.Button("🔍 Buscar Productos", variant="primary", size="lg")

                    with gr.Column(scale=2):
                        search_gallery = gr.Gallery(
                            label="Productos Encontrados",
                            columns=3,
                            rows=2,
                            height="auto"
                        )

            # --- Tab: collaborative recommendations (only when available) ---
            if collaborative_recommender:
                with gr.TabItem("🤝 Recomendaciones Colaborativas"):
                    gr.Markdown("### Selecciona un producto base y el método de recomendación colaborativa")

                    with gr.Row():
                        with gr.Column(scale=1):
                            product_dropdown = gr.Dropdown(
                                choices=get_product_options(),
                                label="Selecciona un producto base",
                                value=None
                            )
                            collab_method = gr.Dropdown(
                                choices=collab_choices,
                                value="SVD",
                                label="Método colaborativo"
                            )
                            recommend_btn = gr.Button("🤝 Obtener Recomendaciones", variant="primary", size="lg")
                            refresh_products_btn = gr.Button("🔄 Actualizar Lista")

                        with gr.Column(scale=2):
                            recommendations_gallery = gr.Gallery(
                                label="Recomendaciones Colaborativas",
                                columns=2,
                                rows=2,
                                height="auto"
                            )

            # --- Tab: client-based recommendations ---
            with gr.TabItem("👤 Recomendaciones Basadas en Cliente"):
                gr.Markdown(client_help_md)

                with gr.Row():
                    with gr.Column(scale=1):
                        client_asins_input = gr.Textbox(
                            label="ASINs de productos seleccionados por el cliente",
                            placeholder="Insert here the product´s ID´s to get other products you might enjoy!",
                            lines=3
                        )
                        client_method = gr.Dropdown(
                            choices=client_choices,
                            value="Hybrid Approach",
                            label="Método de recomendación"
                        )
                        client_num_recs = gr.Slider(
                            minimum=1, maximum=10, value=5, step=1,
                            label="Número de recomendaciones"
                        )
                        client_recommend_btn = gr.Button("👤 Generar Recomendaciones", variant="primary", size="lg")

                        with gr.Accordion("ℹ️ Información de Métodos", open=False):
                            gr.Markdown(client_methods_md)

                    with gr.Column(scale=2):
                        client_gallery = gr.Gallery(
                            label="Recomendaciones para el Cliente",
                            columns=3,
                            rows=2,
                            height="auto"
                        )

            # --- Tab: metrics and comparison ---
            with gr.TabItem("📊 Métricas y Comparación"):
                gr.Markdown("### Análisis de rendimiento y comparación de métodos")

                with gr.Row():
                    with gr.Column():
                        metrics_btn = gr.Button("📊 Actualizar Métricas", variant="secondary")
                        clear_metrics_btn = gr.Button("🗑️ Limpiar Historial")

                metrics_output = gr.Markdown("Ejecuta algunas recomendaciones para ver las métricas...")

                with gr.Accordion("📈 Estadísticas del Sistema", open=False):
                    # Extra lines shown only when the collaborative recommender exists.
                    collab_stats = ""
                    if collaborative_recommender:
                        collab_stats = "\n".join((
                            "",
                            f"- 🤝 Productos disponibles para recomendación colaborativa: {len(collaborative_recommender.get_available_items()):,}",
                            f"- 🧮 Dimensiones de matriz SVD: {collaborative_recommender.item_similarity_matrix.shape}",
                            "",
                        ))

                    stats_text = "\n".join((
                        "",
                        "**Estadísticas del Sistema Completo:**",
                        f"- 📊 Total de productos: {len(df_similars):,}",
                        f"- ⭐ Productos con ratings: {len(ratings_dict):,}",
                        f"- 🔍 Embeddings precalculados: {len(description_embeddings):,}",
                        f"- ✅ Consistencia verificada: {len(description_embeddings) == len(df_similars)}",
                        "- 🎯 Métodos de búsqueda: 3 implementados",
                        f"- 🤝 Métodos colaborativos: {'3 implementados' if collaborative_recommender else 'No disponible'}",
                        "- 👤 Métodos basados en cliente: 3 implementados",
                        collab_stats,
                        "",
                    ))
                    gr.Markdown(stats_text)

        # --- Event wiring (registration order kept as in the original) ---
        search_btn.click(
            fn=search_products_enhanced,
            inputs=[descripcion_input, search_method, max_images, num_products],
            outputs=search_gallery
        )

        if collaborative_recommender:
            recommend_btn.click(
                fn=get_collaborative_recommendations_enhanced,
                inputs=[product_dropdown, collab_method],
                outputs=recommendations_gallery
            )

            refresh_products_btn.click(
                fn=lambda: gr.Dropdown(choices=get_product_options()),
                outputs=product_dropdown
            )

        client_recommend_btn.click(
            fn=get_client_recommendations,
            inputs=[client_asins_input, client_method, client_num_recs],
            outputs=client_gallery
        )

        metrics_btn.click(
            fn=get_metrics_report,
            outputs=metrics_output
        )

        def clear_metrics():
            """Replace the module-level metrics evaluator with a fresh instance."""
            global metrics_evaluator
            metrics_evaluator = RecommendationMetrics()
            return "Historial de métricas limpiado."

        clear_metrics_btn.click(
            fn=clear_metrics,
            outputs=metrics_output
        )

    return demo
|
|
|
|
|
if __name__ == "__main__":
    # Startup banner: report what was loaded before building and serving the UI.
    print("🚀 Iniciando sistema avanzado de recomendación...")

    print(f"✅ DataFrame: {len(df_similars):,} productos")
    print(f"✅ Embeddings: {description_embeddings.shape}")
    sizes_match = len(description_embeddings) == len(df_similars)
    print(f"✅ Consistencia: {sizes_match}")
    print("✅ Búsqueda por descripción: 3 métodos disponibles")

    collab_status = (
        f"✅ Sistema colaborativo: 3 métodos con {len(collaborative_recommender.get_available_items()):,} productos"
        if collaborative_recommender
        else "⚠️ Sistema colaborativo no disponible"
    )
    print(collab_status)

    print("✅ Sistema basado en cliente: 3 métodos disponibles")
    print("✅ Sistema de métricas: Inicializado")

    launch_options = {
        "share": False,
        "debug": False,
        "show_error": True,
        "server_name": "0.0.0.0",
        "server_port": 7860,
    }
    demo = create_enhanced_interface()
    demo.launch(**launch_options)