Spaces:
Running
Running
| """ | |
| Database connection helpers for Supabase. | |
| Uses Supabase RPC for pgvector similarity search (works over HTTPS), | |
| and supabase-py for standard CRUD operations. | |
| """ | |
| import os | |
| import numpy as np | |
| from supabase import create_client, Client | |
| # --- Supabase Client (for CRUD operations) --- | |
# Module-level cache holding the single shared client; None until first use.
_supabase_client: Client = None


def get_supabase() -> Client:
    """Return the shared Supabase client, building it on first call.

    The client is created from the ``SUPABASE_URL`` and ``SUPABASE_KEY``
    environment variables (surrounding whitespace is stripped) and cached
    in the module-level ``_supabase_client`` so later calls reuse it.

    Raises:
        KeyError: if either environment variable is not set.
    """
    global _supabase_client
    if _supabase_client is not None:
        return _supabase_client

    url = os.environ["SUPABASE_URL"].strip()
    key = os.environ["SUPABASE_KEY"].strip()
    _supabase_client = create_client(url, key)
    return _supabase_client
def close_db_pool():
    """Do nothing; kept as a placeholder for shutdown compatibility."""
    return None
| # --- Helper: embedding <-> database conversion --- | |
def embedding_to_pgvector(embedding: np.ndarray) -> str:
    """Serialize a numpy embedding into pgvector text form.

    Every component is rendered with exactly eight digits after the
    decimal point; components are joined by commas (no spaces) and the
    whole thing is wrapped in square brackets, e.g.
    ``'[0.10000000,0.20000000]'``.
    """
    components = []
    for value in embedding.tolist():
        components.append(format(value, ".8f"))
    return "[{}]".format(",".join(components))
def pgvector_to_embedding(pgvector_str: str) -> np.ndarray:
    """Parse a pgvector text value such as ``'[0.1,0.2]'`` into a numpy array.

    Args:
        pgvector_str: Comma-separated numbers, optionally wrapped in square
            brackets and/or surrounded by whitespace.

    Returns:
        A 1-D ``float32`` array. An empty vector (``'[]'`` or a blank
        string) yields an empty array instead of failing.

    Raises:
        ValueError: if any component cannot be parsed as a float.
    """
    # Strip outer whitespace first so the bracket strip can actually see the
    # brackets, then strip again for padding inside them.
    inner = pgvector_str.strip().strip("[]").strip()
    if not inner:
        # "".split(",") would produce [""] and float("") raises ValueError.
        return np.array([], dtype=np.float32)
    return np.array([float(part) for part in inner.split(",")], dtype=np.float32)
| # --- pgvector similarity search (via Supabase RPC) --- | |
def _rows_to_products(rows) -> list:
    """Convert raw RPC result rows into normalized product dicts.

    Each row must provide the keys ``id``, ``seller_id``, ``title``,
    ``description``, ``price``, ``stock``, ``images``, ``embedding_text``
    and ``similarity``. Ids are stringified, ``price``/``similarity`` are
    cast to float, ``stock`` to int, a falsy ``images`` becomes ``[]``, and
    ``embedding_text`` is parsed into a numpy array under ``"embedding"``.

    A ``None`` (or otherwise empty) ``rows`` value yields an empty list.
    """
    return [
        {
            "id": str(row["id"]),
            "seller_id": str(row["seller_id"]),
            "title": row["title"],
            "description": row["description"],
            "price": float(row["price"]),
            "stock": int(row["stock"]),
            "images": row["images"] or [],
            "embedding": pgvector_to_embedding(row["embedding_text"]),
            "similarity": float(row["similarity"]),
        }
        for row in rows or []
    ]


def search_similar_products(query_embedding: np.ndarray, top_k: int = 50):
    """Return up to ``top_k`` products most similar to ``query_embedding``.

    Calls the ``search_products_by_embedding`` Supabase RPC; the ranking
    itself is done by that database function (intended to be a pgvector
    cosine-similarity search).

    Args:
        query_embedding: Query vector; sent to the RPC as a plain list.
        top_k: Passed to the RPC as ``match_count``.

    Returns:
        A list of product dicts with keys ``id``, ``seller_id``, ``title``,
        ``description``, ``price``, ``stock``, ``images``, ``embedding``
        (numpy array) and ``similarity``. Empty if the RPC returns no rows.
    """
    sb = get_supabase()
    params = {
        "query_embedding": query_embedding.tolist(),
        "match_count": top_k,
    }
    response = sb.rpc("search_products_by_embedding", params).execute()
    return _rows_to_products(response.data)


def search_similar_products_filtered(
    query_embedding: np.ndarray,
    top_k: int = 50,
    price_min: float = None,
    price_max: float = None,
    brand: str = None,
    color: str = None,
):
    """Return up to ``top_k`` similar products, with optional filters.

    Calls the ``search_products_by_embedding_filtered`` Supabase RPC. Every
    filter is forwarded as-is, so an omitted filter reaches the RPC as
    ``None``.

    Args:
        query_embedding: Query vector; sent to the RPC as a plain list.
        top_k: Passed to the RPC as ``match_count``.
        price_min: Passed as ``filter_price_min``.
        price_max: Passed as ``filter_price_max``.
        brand: Passed as ``filter_brand``.
        color: Passed as ``filter_color``.

    Returns:
        A list of product dicts with keys ``id``, ``seller_id``, ``title``,
        ``description``, ``price``, ``stock``, ``images``, ``embedding``
        (numpy array) and ``similarity``. Empty if the RPC returns no rows.
    """
    sb = get_supabase()
    params = {
        "query_embedding": query_embedding.tolist(),
        "match_count": top_k,
        "filter_price_min": price_min,
        "filter_price_max": price_max,
        "filter_brand": brand,
        "filter_color": color,
    }
    response = sb.rpc("search_products_by_embedding_filtered", params).execute()
    return _rows_to_products(response.data)
def store_product_embedding(product_id: str, embedding: np.ndarray):
    """Persist a product's embedding through the ``update_product_embedding`` RPC.

    The embedding is serialized to pgvector text form and sent together
    with the product id; nothing is returned.
    """
    client = get_supabase()
    payload = {
        "p_product_id": product_id,
        "p_embedding": embedding_to_pgvector(embedding),
    }
    client.rpc("update_product_embedding", payload).execute()