import os import math import random import asyncio import logging from collections import defaultdict from datetime import datetime, timedelta from zoneinfo import ZoneInfo from typing import List, Dict import numpy as np from dotenv import load_dotenv from supabase import create_client from sentence_transformers import SentenceTransformer import pdb # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Load environment variables load_dotenv() # Configuration SEED = 42 random.seed(SEED) np.random.seed(SEED) TOP_K = 75 HISTORY_WINDOW = timedelta(days=1000) TIMEZONE = ZoneInfo("UTC") # Global variables supabase_client = None user_interactions = defaultdict(set) post_features = {} post_metadata = {} if not os.path.exists('/tmp/cache'): os.makedirs('/tmp/cache') sentence_model = SentenceTransformer( 'all-MiniLM-L6-v2', cache_folder='/tmp/cache') SUPABASE_URL = os.getenv('supabaseUrl') SUPABASE_KEY = os.getenv('supabaseAnonKey') def get_supabase_client(): global supabase_client if supabase_client is None: supabase_client = create_client(SUPABASE_URL, SUPABASE_KEY) return supabase_client def parse_datetime(dt_str: str) -> datetime: """Parse ISO datetime string and ensure correct microsecond precision.""" try: if '.' in dt_str: date_part, time_part = dt_str.split('T') time_part, tz_part = time_part.split('+') if '.' in time_part: time_without_micro, micro = time_part.split('.') micro = micro.ljust(6, '0') time_part = f"{time_without_micro}.{micro}" dt_str = f"{date_part}T{time_part}+00:00" return datetime.fromisoformat(dt_str).astimezone(TIMEZONE) except Exception as e: logger.error(f"Error parsing datetime: {dt_str} - {str(e)}") raise def decay_weight(interaction_time: datetime, current_time: datetime, decay_rate: float = 0.0001) -> float: """ Compute an exponential decay weight for an interaction based on its age. The decay_rate can be tuned to control how fast interactions lose weight. """ time_diff = (current_time - interaction_time).total_seconds() return math.exp(-decay_rate * time_diff) def normalize_profile(profile: np.ndarray) -> np.ndarray: norm = np.linalg.norm(profile) return profile / norm if norm > 0 else profile def compute_score(sim_score: float, popularity: float, freshness: float) -> float: """ Compute a non-linear recommendation score. - sim_score is squared to emphasize strong similarities. - Popularity is log-transformed. - Freshness is combined linearly. """ return 0.6 * (sim_score ** 2) + 0.3 * np.log1p(popularity) + 0.1 * freshness class Recommender: def __init__(self, like_weight=1.0, comment_weight=0.5, comment_like_weight=0.3, reply_weight=0.5, reply_like_weight=0.3): self.user_profiles = defaultdict(lambda: np.zeros(384)) self.post_popularity = defaultdict(float) self.last_update = datetime.now(TIMEZONE) - HISTORY_WINDOW # Parameterized weights self.like_weight = like_weight self.comment_weight = comment_weight self.comment_like_weight = comment_like_weight self.reply_weight = reply_weight self.reply_like_weight = reply_like_weight async def fetch_all_rows(self, table_name: str, columns: str, last_update: datetime, post_id_not_null: bool): """Fetch all rows from a table using pagination.""" supabase = get_supabase_client() page_size = 1000 page = 0 all_data = [] while True: if post_id_not_null: response = await asyncio.to_thread( supabase.table(table_name) .select(columns) .gt('created_at', last_update.isoformat()) .not_.is_("post_id", None) .range(page * page_size, (page + 1) * page_size - 1) .execute ) else: response = await asyncio.to_thread( supabase.table(table_name) .select(columns) .gt('created_at', last_update.isoformat()) .range(page * page_size, (page + 1) * page_size - 1) .execute ) if not response.data: break all_data.extend(response.data) page += 1 return all_data async def update_data(self): # Current time for decay calculation current_time = datetime.now(TIMEZONE) # Fetch all interaction data since last update likes = await self.fetch_all_rows('likes', 'user_id, post_id, created_at', self.last_update, True) comments = await self.fetch_all_rows('comments', 'id, user_id, post_id, created_at, comment', self.last_update, True) commentlikes = await self.fetch_all_rows('commentlikes', 'user_id, author_id, post_id, created_at, comment_id', self.last_update, True) replies = await self.fetch_all_rows('replies', 'user_id, to, post_id, created_at, comment, comment_id, reply_id', self.last_update, True) replylikes = await self.fetch_all_rows('replylikes', 'user_id, author_id, post_id, created_at, reply_id', self.last_update, True) # Process likes with time decay for like in likes: user_id = like['user_id'] post_id = like['post_id'] interaction_time = parse_datetime(like['created_at']) weight = decay_weight(interaction_time, current_time) user_interactions[user_id].add(post_id) self.post_popularity[post_id] += self.like_weight * weight if post_id in post_features: self.user_profiles[user_id] += post_features[post_id] * \ self.like_weight * weight # Process comments with time decay for comment in comments: user_id = comment['user_id'] post_id = comment['post_id'] interaction_time = parse_datetime(comment['created_at']) weight = decay_weight(interaction_time, current_time) user_interactions[user_id].add(post_id) self.post_popularity[post_id] += self.comment_weight * weight if post_id in post_features: self.user_profiles[user_id] += post_features[post_id] * \ self.comment_weight * weight # Process comment likes with time decay for clike in commentlikes: user_id = clike['user_id'] # User who liked the comment post_id = clike['post_id'] interaction_time = parse_datetime(clike['created_at']) weight = decay_weight(interaction_time, current_time) user_interactions[user_id].add(post_id) self.post_popularity[post_id] += self.comment_like_weight * weight if post_id in post_features: self.user_profiles[user_id] += post_features[post_id] * \ self.comment_like_weight * weight # Process replies with time decay for reply in replies: user_id = reply['user_id'] post_id = reply['post_id'] interaction_time = parse_datetime(reply['created_at']) weight = decay_weight(interaction_time, current_time) user_interactions[user_id].add(post_id) self.post_popularity[post_id] += self.reply_weight * weight if post_id in post_features: self.user_profiles[user_id] += post_features[post_id] * \ self.reply_weight * weight # Process reply likes with time decay for rlike in replylikes: user_id = rlike['user_id'] post_id = rlike['post_id'] interaction_time = parse_datetime(rlike['created_at']) weight = decay_weight(interaction_time, current_time) user_interactions[user_id].add(post_id) self.post_popularity[post_id] += self.reply_like_weight * weight if post_id in post_features: self.user_profiles[user_id] += post_features[post_id] * \ self.reply_like_weight * weight # OPTIONAL: Process negative feedback if available # for negative in negative_interactions: # user_id = negative['user_id'] # post_id = negative['post_id'] # interaction_time = parse_datetime(negative['created_at']) # weight = decay_weight(interaction_time, current_time) # user_interactions[user_id].add(post_id) # self.post_popularity[post_id] -= some_negative_weight * weight # if post_id in post_features: # self.user_profiles[user_id] -= post_features[post_id] * some_negative_weight * weight # Normalize user profiles after processing interactions for user_id in self.user_profiles: self.user_profiles[user_id] = normalize_profile( self.user_profiles[user_id]) # Fetch and update post features posts = await self.fetch_all_rows('posts', '*', self.last_update, False) post_texts, post_ids = [], [] for post in posts: post_id = post['id'] text = f"{post.get('movie_name', '')} {post.get('content', '')}".strip( ) post_texts.append(text) post_ids.append(post_id) post['created_at'] = parse_datetime(post['created_at']) post['type'] = 'post' post_metadata[post_id] = post if post_texts: embeddings = sentence_model.encode( post_texts, show_progress_bar=False, convert_to_numpy=True) for post_id, embedding in zip(post_ids, embeddings): post_features[post_id] = embedding / np.linalg.norm(embedding) self.last_update = datetime.now(TIMEZONE) total_interactions = len(likes) + len(comments) + \ len(commentlikes) + len(replies) + len(replylikes) logger.info( f"Data updated: {len(posts)} posts, {total_interactions} interactions") def get_recommendations(self, user_id: str) -> List[Dict]: user_profile = self.user_profiles[user_id] seen_posts = user_interactions[user_id] scores = {} now = datetime.now(TIMEZONE) for post_id, feature in post_features.items(): if post_id in seen_posts: continue sim_score = np.dot(user_profile, feature) if np.any( user_profile) else 0 time_diff = now - post_metadata[post_id]['created_at'] # Freshness is defined as an exponential decay based on weeks old freshness = math.exp(-time_diff.days / 7.0) score = compute_score( sim_score, self.post_popularity[post_id], freshness) # Adding a small random noise for exploration scores[post_id] = score + random.uniform(-0.1, 0.1) top_posts = sorted( scores.items(), key=lambda x: x[1], reverse=True)[:TOP_K] results = [post_metadata[post_id] for post_id, _ in top_posts] random.shuffle(results) return results recommender = Recommender() async def main(): # pdb.set_trace() user_id = "d7411324-c8ea-42cb-ae59-7b8cbc61594c" await recommender.update_data() recommendations = recommender.get_recommendations(user_id) print(f"Recommendations for user {user_id}:") for post in recommendations: print( f"- Post {post['id']}: Movie: {post.get('movie_name', '')} Caption: {post.get('content', '')}") if __name__ == "__main__": asyncio.run(main())