# FeedRecommender/app.py
import os
from fastapi import FastAPI, Query
from fastapi.middleware.cors import CORSMiddleware
from supabase import create_client
import numpy as np
from collections import defaultdict
import random
from datetime import datetime, timedelta
import asyncio
from typing import List, Dict
from dotenv import load_dotenv
import logging
from zoneinfo import ZoneInfo
from sentence_transformers import SentenceTransformer
from apscheduler.schedulers.background import BackgroundScheduler
import math
from contextlib import asynccontextmanager  # for the FastAPI lifespan handler
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
# Configuration
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
TOP_K = 75  # maximum number of posts returned per feed request
HISTORY_WINDOW = timedelta(days=1000)  # effectively "all history" for the initial sync
TIMEZONE = ZoneInfo("UTC")
UPDATE_INTERVAL = 300  # seconds between background refreshes (5 minutes)
START_TIME = datetime.now(TIMEZONE)
# Global variables
supabase_client = None
user_interactions = defaultdict(set)  # user_id -> set of post_ids the user has interacted with
post_features = {}  # post_id -> unit-normalized embedding vector
post_metadata = {}  # post_id -> post row with parsed created_at
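# Cache the embedding model weights under /tmp so restarts can reuse the download.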
os.makedirs('/tmp/cache', exist_ok=True)
sentence_model = SentenceTransformer('all-MiniLM-L6-v2', cache_folder='/tmp/cache')
SUPABASE_URL = os.getenv('supabaseUrl')
SUPABASE_KEY = os.getenv('supabaseAnonKey')
def get_supabase_client():
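    """Lazily create and cache the Supabase client (module-level singleton)."""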
global supabase_client
if supabase_client is None:
supabase_client = create_client(SUPABASE_URL, SUPABASE_KEY)
return supabase_client
def parse_datetime(dt_str: str) -> datetime:
    """Parse an ISO datetime string, padding fractional seconds to full microsecond precision."""
    try:
        # Normalize a trailing 'Z' to an explicit UTC offset for fromisoformat().
        if dt_str.endswith('Z'):
            dt_str = dt_str[:-1] + '+00:00'
        if '.' in dt_str:
            date_part, time_part = dt_str.split('T')
            time_part, _, tz_part = time_part.partition('+')
            if '.' in time_part:
                time_without_micro, micro = time_part.split('.')
                # Pad to six digits so pre-3.11 datetime.fromisoformat() accepts it.
                time_part = f"{time_without_micro}.{micro.ljust(6, '0')}"
            # Preserve the original offset instead of forcing UTC.
            dt_str = f"{date_part}T{time_part}+{tz_part or '00:00'}"
        return datetime.fromisoformat(dt_str).astimezone(TIMEZONE)
    except Exception as e:
        logger.error(f"Error parsing datetime: {dt_str} - {e}")
        raise
def decay_weight(interaction_time: datetime, current_time: datetime, decay_rate: float = 0.0001) -> float:
"""
Compute an exponential decay weight for an interaction based on its age.
The decay_rate can be tuned to control how fast interactions lose weight.
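    With the default decay_rate of 1e-4/s, a weight halves every ln(2)/1e-4 ≈ 6,931 s (~1.9 hours).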
"""
time_diff = (current_time - interaction_time).total_seconds()
return math.exp(-decay_rate * time_diff)
def normalize_profile(profile: np.ndarray) -> np.ndarray:
norm = np.linalg.norm(profile)
return profile / norm if norm > 0 else profile
def compute_score(sim_score: float, popularity: float, freshness: float) -> float:
"""
Compute a non-linear recommendation score.
- sim_score is squared to emphasize strong similarities.
- Popularity is log-transformed.
- Freshness is combined linearly.
"""
return 0.4 * (sim_score ** 2) + 0.3 * np.log1p(popularity) + 0.3 * freshness
class Recommender:
def __init__(self, like_weight=1.0, comment_weight=0.5, comment_like_weight=0.3,
reply_weight=0.5, reply_like_weight=0.3):
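        # 384 matches the embedding dimension of all-MiniLM-L6-v2.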
self.user_profiles = defaultdict(lambda: np.zeros(384))
self.post_popularity = defaultdict(float)
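        # Start far enough back that the first update_data() ingests all historical interactions.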
self.last_update = datetime.now(TIMEZONE) - HISTORY_WINDOW
# Parameterized weights
self.like_weight = like_weight
self.comment_weight = comment_weight
self.comment_like_weight = comment_like_weight
self.reply_weight = reply_weight
self.reply_like_weight = reply_like_weight
async def fetch_existing_post_ids(self) -> set:
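        """Page through the posts table and return the set of all post ids."""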
supabase = get_supabase_client()
page_size = 1000
page = 0
post_ids = set()
while True:
response = await asyncio.to_thread(
supabase.table('posts')
.select('id')
.range(page * page_size, (page + 1) * page_size - 1)
.execute
)
if not response.data:
break
for row in response.data:
post_ids.add(row['id'])
page += 1
return post_ids
def clean_deleted_posts(self, existing_post_ids: set):
# Determine which posts are missing
all_cached_ids = set(post_metadata.keys())
deleted_ids = all_cached_ids - existing_post_ids
for post_id in deleted_ids:
post_metadata.pop(post_id, None)
post_features.pop(post_id, None)
self.post_popularity.pop(post_id, None)
for user_id in user_interactions:
user_interactions[user_id] -= deleted_ids # remove any deleted post_ids
    async def fetch_all_rows(self, table_name: str, columns: str, last_update: datetime, post_id_not_null: bool):
        """Fetch all rows created after last_update from a table, using pagination."""
        supabase = get_supabase_client()
        page_size = 1000
        page = 0
        all_data = []
        while True:
            query = (supabase.table(table_name)
                     .select(columns)
                     .gt('created_at', last_update.isoformat()))
            if post_id_not_null:
                query = query.not_.is_("post_id", None)
            response = await asyncio.to_thread(
                query.range(page * page_size, (page + 1) * page_size - 1).execute
            )
            if not response.data:
                break
            all_data.extend(response.data)
            page += 1
        return all_data
async def update_data(self):
# Current time for decay calculation
current_time = datetime.now(TIMEZONE)
# Fetch all interaction data since last update
likes = await self.fetch_all_rows('likes', 'user_id, post_id, created_at', self.last_update, True)
comments = await self.fetch_all_rows('comments', 'id, user_id, post_id, created_at, comment', self.last_update, True)
commentlikes = await self.fetch_all_rows('commentlikes', 'user_id, author_id, post_id, created_at, comment_id', self.last_update, True)
replies = await self.fetch_all_rows('replies', 'user_id, to, post_id, created_at, comment, comment_id, reply_id', self.last_update, True)
replylikes = await self.fetch_all_rows('replylikes', 'user_id, author_id, post_id, created_at, reply_id', self.last_update, True)
        # All interaction types share the same update rule; only their weights differ.
        def apply_interactions(rows: List[Dict], interaction_weight: float):
            """Fold a batch of interactions into popularity and user profiles with time decay."""
            for row in rows:
                user_id = row['user_id']
                post_id = row['post_id']
                interaction_time = parse_datetime(row['created_at'])
                weight = decay_weight(interaction_time, current_time)
                user_interactions[user_id].add(post_id)
                self.post_popularity[post_id] += interaction_weight * weight
                if post_id in post_features:
                    self.user_profiles[user_id] += post_features[post_id] * interaction_weight * weight

        apply_interactions(likes, self.like_weight)
        apply_interactions(comments, self.comment_weight)
        apply_interactions(commentlikes, self.comment_like_weight)
        apply_interactions(replies, self.reply_weight)
        apply_interactions(replylikes, self.reply_like_weight)
        # OPTIONAL: negative feedback could be folded in the same way with a
        # negative weight, e.g. apply_interactions(negative_rows, -some_negative_weight).
        # Normalize user profiles after processing interactions. Profiles are
        # re-normalized every cycle, so recent interactions gradually dominate.
        for user_id in self.user_profiles:
            self.user_profiles[user_id] = normalize_profile(self.user_profiles[user_id])
# Fetch and update post features
posts = await self.fetch_all_rows('posts', '*', self.last_update, False)
post_texts, post_ids = [], []
for post in posts:
post_id = post['id']
            text = f"{post.get('movie_name', '')} {post.get('content', '')}".strip()
post_texts.append(text)
post_ids.append(post_id)
post['created_at'] = parse_datetime(post['created_at'])
post['type'] = 'post'
post_metadata[post_id] = post
if post_texts:
embeddings = sentence_model.encode(
post_texts, show_progress_bar=False, convert_to_numpy=True)
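            # Unit-normalize embeddings so dot products in get_recommendations() equal cosine similarity.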
for post_id, embedding in zip(post_ids, embeddings):
post_features[post_id] = embedding / np.linalg.norm(embedding)
# existing_post_ids = await self.fetch_existing_post_ids()
# self.clean_deleted_posts(existing_post_ids)
self.last_update = datetime.now(TIMEZONE)
        total_interactions = (len(likes) + len(comments) + len(commentlikes)
                              + len(replies) + len(replylikes))
        logger.info(f"Data updated: {len(posts)} posts, {total_interactions} interactions")
def get_recommendations(self, user_id: str) -> List[Dict]:
user_profile = self.user_profiles[user_id]
seen_posts = user_interactions[user_id]
scores = {}
now = datetime.now(TIMEZONE)
        for post_id, feature in post_features.items():
            post = post_metadata[post_id]
            if post_id in seen_posts or post.get("author") == user_id:
                continue
            # Dot product of unit vectors = cosine similarity; 0 for cold-start users.
            sim_score = np.dot(user_profile, feature) if np.any(user_profile) else 0
            time_diff = now - post['created_at']
            # Freshness decays exponentially with a three-day time constant.
            freshness = math.exp(-time_diff.days / 3.0)
            score = compute_score(sim_score, self.post_popularity[post_id], freshness)
            # Small random noise encourages exploration.
            scores[post_id] = score + random.uniform(-0.1, 0.1)
        top_posts = sorted(scores.items(), key=lambda x: x[1], reverse=True)[:TOP_K]
        results = [post_metadata[post_id] for post_id, _ in top_posts]
        # Shuffle the top-K so feed order varies between requests while content stays high-scoring.
        random.shuffle(results)
        return results
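# Module-level singletons shared by the scheduler and the request handlers.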
recommender = Recommender()
scheduler = BackgroundScheduler(timezone="UTC")
async def background_update():
    await recommender.update_data()

def sync_background_update():
    # BackgroundScheduler runs jobs in plain threads, so drive the coroutine
    # on a fresh event loop via asyncio.run().
    asyncio.run(background_update())
# Lifespan context manager
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup event
logger.info("Starting up application...")
await recommender.update_data()
scheduler.add_job(sync_background_update, 'interval', seconds=UPDATE_INTERVAL)
scheduler.start()
yield
# Shutdown event
logger.info("Shutting down application...")
scheduler.shutdown()
logger.info("Scheduler shut down")
# FastAPI app setup with lifespan
app = FastAPI(lifespan=lifespan)
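# Wide-open CORS; restrict origins for production deployments.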
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/recommend/feed")
async def get_recommendations_handler(user_id: str = Query(...)):
try:
recommendations = recommender.get_recommendations(user_id)
        if recommendations:
            # Ensure a "suggested accounts" card appears within the first ten items.
            if not any(item.get("type") == "suggestedaccounts" for item in recommendations):
                insert_pos = random.randint(0, min(9, len(recommendations) - 1))
                recommendations.insert(insert_pos, {"type": "suggestedaccounts"})
            # Interleave an ad after every `ad_frequency` feed items.
            ad_frequency = 10
            i = ad_frequency
            while i < len(recommendations):
                recommendations.insert(i, {"type": "ad"})
                i += ad_frequency + 1  # step past the ad just inserted
        return {"status": "success", "recommendations": recommendations}
except Exception as e:
logger.error(f"Error generating recommendations: {str(e)}")
return {"status": "error", "message": str(e)}
@app.get("/")
async def health_check():
return {"status": "success", "message": "Service operational"}