andykr1k commited on
Commit
3040e30
·
1 Parent(s): 20560f9

New feed recommender

Browse files
Files changed (1) hide show
  1. app.py +42 -93
app.py CHANGED
@@ -18,7 +18,10 @@ from apscheduler.schedulers.background import BackgroundScheduler
18
  logging.basicConfig(level=logging.INFO)
19
  logger = logging.getLogger(__name__)
20
 
 
21
  load_dotenv()
 
 
22
  app = FastAPI()
23
 
24
  app.add_middleware(
@@ -38,7 +41,7 @@ CACHE_SIZE = 1000
38
  TOP_K = 75
39
  HISTORY_WINDOW = timedelta(days=30)
40
  TIMEZONE = ZoneInfo("UTC")
41
- UPDATE_INTERVAL = 300
42
 
43
  # Global variables
44
  supabase_client = None
@@ -53,36 +56,30 @@ sentence_model = SentenceTransformer('all-MiniLM-L6-v2', cache_folder='/tmp/cach
53
  SUPABASE_URL = os.getenv('supabaseUrl')
54
  SUPABASE_KEY = os.getenv('supabaseAnonKey')
55
 
 
56
  def get_supabase_client():
57
  global supabase_client
58
  if supabase_client is None:
59
  supabase_client = create_client(SUPABASE_URL, SUPABASE_KEY)
60
  return supabase_client
61
 
62
- async def fetch_full_post_records(post_ids: List[str], batch_size=1000):
63
- supabase = get_supabase_client()
64
- if not post_ids:
65
- return []
66
-
67
- records = []
68
- tasks = []
69
- for i in range(0, len(post_ids), batch_size):
70
- batch_ids = post_ids[i:i + batch_size]
71
- task = asyncio.to_thread(
72
- supabase.table('posts').select('*').in_('id', batch_ids).execute
73
- )
74
- tasks.append(task)
75
-
76
- responses = await asyncio.gather(*tasks)
77
- for response in responses:
78
- batch_records = response.data
79
- for record in batch_records:
80
- record['type'] = 'post'
81
- record['created_at'] = datetime.fromisoformat(
82
- record['created_at'].replace('Z', '+00:00')
83
- ).astimezone(TIMEZONE)
84
- records.extend(batch_records)
85
- return records
86
 
87
  class Recommender:
88
  def __init__(self):
@@ -92,45 +89,45 @@ class Recommender:
92
 
93
  async def update_interactions(self):
94
  supabase = get_supabase_client()
95
- last_update = self.last_update
96
 
97
  # Fetch new likes
98
  likes = await asyncio.to_thread(
99
  supabase.table('likes')
100
  .select('user_id, post_id, created_at')
101
- .gt('created_at', last_update.isoformat())
102
  .execute
103
  )
 
104
  # Fetch new comments
105
  comments = await asyncio.to_thread(
106
  supabase.table('comments')
107
  .select('user_id, post_id, created_at')
108
- .gt('created_at', last_update.isoformat())
109
  .execute
110
  )
111
 
112
  for like in likes.data:
113
  user_id = like['user_id']
114
  post_id = like['post_id']
115
- timestamp = datetime.fromisoformat(like['created_at'].replace('Z', '+00:00')).astimezone(TIMEZONE)
116
  user_interactions[user_id].add(post_id)
117
  self.post_popularity[post_id] += 1.0
118
  if post_id in post_features:
119
  self.user_profiles[user_id] = (
120
- 0.5 * self.user_profiles[user_id] +
121
- 0.5 * post_features[post_id]
122
  )
123
 
124
  for comment in comments.data:
125
  user_id = comment['user_id']
126
  post_id = comment['post_id']
127
- timestamp = datetime.fromisoformat(comment['created_at'].replace('Z', '+00:00')).astimezone(TIMEZONE)
128
  user_interactions[user_id].add(post_id)
129
  self.post_popularity[post_id] += 0.5
130
  if post_id in post_features:
131
  self.user_profiles[user_id] = (
132
- 0.5 * self.user_profiles[user_id] +
133
- 0.5 * post_features[post_id]
134
  )
135
 
136
  self.last_update = datetime.now(TIMEZONE)
@@ -138,7 +135,7 @@ class Recommender:
138
 
139
  async def update_posts(self):
140
  supabase = get_supabase_client()
141
- # Fetch new or updated posts since last update
142
  posts = await asyncio.to_thread(
143
  supabase.table('posts')
144
  .select('*')
@@ -155,9 +152,7 @@ class Recommender:
155
  text = f"{title} {description}".strip()
156
  post_texts.append(text)
157
  post_ids.append(post_id)
158
- post['created_at'] = datetime.fromisoformat(
159
- post['created_at'].replace('Z', '+00:00')
160
- ).astimezone(TIMEZONE)
161
  post['type'] = 'post'
162
  post_metadata[post_id] = post
163
 
@@ -169,49 +164,12 @@ class Recommender:
169
  logger.info(f"Updated {len(posts.data)} posts")
170
 
171
  async def full_update(self):
172
- """Full refresh at startup"""
173
- supabase = get_supabase_client()
174
- posts = await asyncio.to_thread(
175
- supabase.table('posts')
176
- .select('*')
177
- .execute
178
- )
179
-
180
- post_texts = []
181
- post_ids = []
182
- for post in posts.data:
183
- post_id = post['id']
184
- title = post.get('movie_name', '') or ''
185
- description = post.get('content', '') or ''
186
- text = f"{title} {description}".strip()
187
- post_texts.append(text)
188
- post_ids.append(post_id)
189
-
190
- # Adjust the timestamp to ensure six digits for microseconds
191
- created_at_str = post['created_at']
192
- if '.' in created_at_str:
193
- date_part, time_part = created_at_str.split('T')
194
- time_part = time_part.split('+')[0] # Remove timezone for now
195
- if '.' in time_part:
196
- time_without_micro, micro = time_part.split('.')
197
- micro = micro.zfill(6) # Pad with zeros to make it six digits
198
- time_part = f"{time_without_micro}.{micro}"
199
- created_at_str = f"{date_part}T{time_part}+00:00"
200
- post['created_at'] = datetime.fromisoformat(created_at_str).astimezone(TIMEZONE)
201
- post['type'] = 'post'
202
- post_metadata[post_id] = post
203
-
204
- if post_texts:
205
- embeddings = sentence_model.encode(post_texts, show_progress_bar=True, convert_to_numpy=True)
206
- for post_id, embedding in zip(post_ids, embeddings):
207
- post_features[post_id] = embedding / np.linalg.norm(embedding)
208
-
209
  await self.update_interactions()
210
 
211
  def get_recommendations(self, user_id: str) -> List[Dict]:
212
  if user_id in recommendation_cache:
213
- cached = recommendation_cache[user_id]
214
- return random.sample(cached, min(TOP_K, len(cached)))
215
 
216
  user_profile = self.user_profiles[user_id]
217
  seen_posts = user_interactions[user_id]
@@ -227,21 +185,14 @@ class Recommender:
227
  time_diff = now - post_metadata[post_id]['created_at']
228
  freshness = 1.0 / (1.0 + (time_diff.days / 7))
229
  score = (
230
- 0.6 * sim_score +
231
- 0.3 * np.log1p(self.post_popularity[post_id]) +
232
- 0.1 * freshness
233
  )
234
  scores[post_id] = score + random.uniform(-0.1, 0.1)
235
 
236
- top_posts = sorted(scores.items(), key=lambda x: x[1], reverse=True)[:TOP_K * 2]
237
- selected = random.sample(top_posts, min(TOP_K, len(top_posts)))
238
-
239
- results = []
240
- for post_id, score in selected:
241
- post_record = post_metadata[post_id].copy()
242
- post_record['score'] = score
243
- post_record['created_at'] = post_record['created_at'].isoformat()
244
- results.append(post_record)
245
 
246
  recommendation_cache[user_id] = results
247
  if len(recommendation_cache) > CACHE_SIZE:
@@ -259,10 +210,8 @@ async def background_update():
259
  def sync_background_update():
260
  loop = asyncio.new_event_loop()
261
  asyncio.set_event_loop(loop)
262
- try:
263
- loop.run_until_complete(background_update())
264
- finally:
265
- loop.close()
266
 
267
  @app.get("/recommend/feed")
268
  async def get_recommendations_handler(user_id: str = Query(...)):
 
18
  logging.basicConfig(level=logging.INFO)
19
  logger = logging.getLogger(__name__)
20
 
21
+ # Load environment variables
22
  load_dotenv()
23
+
24
+ # FastAPI app setup
25
  app = FastAPI()
26
 
27
  app.add_middleware(
 
41
  TOP_K = 75
42
  HISTORY_WINDOW = timedelta(days=30)
43
  TIMEZONE = ZoneInfo("UTC")
44
+ UPDATE_INTERVAL = 300 # In seconds (5 minutes)
45
 
46
  # Global variables
47
  supabase_client = None
 
56
  SUPABASE_URL = os.getenv('supabaseUrl')
57
  SUPABASE_KEY = os.getenv('supabaseAnonKey')
58
 
59
+
60
  def get_supabase_client():
61
  global supabase_client
62
  if supabase_client is None:
63
  supabase_client = create_client(SUPABASE_URL, SUPABASE_KEY)
64
  return supabase_client
65
 
66
+
67
+ def parse_datetime(dt_str: str) -> datetime:
68
+ """Parse ISO datetime string and ensure correct microsecond precision."""
69
+ try:
70
+ if '.' in dt_str:
71
+ date_part, time_part = dt_str.split('T')
72
+ time_part, tz_part = time_part.split('+')
73
+ if '.' in time_part:
74
+ time_without_micro, micro = time_part.split('.')
75
+ micro = micro.ljust(6, '0') # Ensure microseconds are 6 digits
76
+ time_part = f"{time_without_micro}.{micro}"
77
+ dt_str = f"{date_part}T{time_part}+00:00"
78
+ return datetime.fromisoformat(dt_str).astimezone(TIMEZONE)
79
+ except Exception as e:
80
+ logger.error(f"Error parsing datetime: {dt_str} - {str(e)}")
81
+ raise
82
+
 
 
 
 
 
 
 
83
 
84
  class Recommender:
85
  def __init__(self):
 
89
 
90
  async def update_interactions(self):
91
  supabase = get_supabase_client()
 
92
 
93
  # Fetch new likes
94
  likes = await asyncio.to_thread(
95
  supabase.table('likes')
96
  .select('user_id, post_id, created_at')
97
+ .gt('created_at', self.last_update.isoformat())
98
  .execute
99
  )
100
+
101
  # Fetch new comments
102
  comments = await asyncio.to_thread(
103
  supabase.table('comments')
104
  .select('user_id, post_id, created_at')
105
+ .gt('created_at', self.last_update.isoformat())
106
  .execute
107
  )
108
 
109
  for like in likes.data:
110
  user_id = like['user_id']
111
  post_id = like['post_id']
112
+ timestamp = parse_datetime(like['created_at'])
113
  user_interactions[user_id].add(post_id)
114
  self.post_popularity[post_id] += 1.0
115
  if post_id in post_features:
116
  self.user_profiles[user_id] = (
117
+ 0.5 * self.user_profiles[user_id] +
118
+ 0.5 * post_features[post_id]
119
  )
120
 
121
  for comment in comments.data:
122
  user_id = comment['user_id']
123
  post_id = comment['post_id']
124
+ timestamp = parse_datetime(comment['created_at'])
125
  user_interactions[user_id].add(post_id)
126
  self.post_popularity[post_id] += 0.5
127
  if post_id in post_features:
128
  self.user_profiles[user_id] = (
129
+ 0.5 * self.user_profiles[user_id] +
130
+ 0.5 * post_features[post_id]
131
  )
132
 
133
  self.last_update = datetime.now(TIMEZONE)
 
135
 
136
  async def update_posts(self):
137
  supabase = get_supabase_client()
138
+
139
  posts = await asyncio.to_thread(
140
  supabase.table('posts')
141
  .select('*')
 
152
  text = f"{title} {description}".strip()
153
  post_texts.append(text)
154
  post_ids.append(post_id)
155
+ post['created_at'] = parse_datetime(post['created_at'])
 
 
156
  post['type'] = 'post'
157
  post_metadata[post_id] = post
158
 
 
164
  logger.info(f"Updated {len(posts.data)} posts")
165
 
166
  async def full_update(self):
167
+ await self.update_posts()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
168
  await self.update_interactions()
169
 
170
  def get_recommendations(self, user_id: str) -> List[Dict]:
171
  if user_id in recommendation_cache:
172
+ return recommendation_cache[user_id]
 
173
 
174
  user_profile = self.user_profiles[user_id]
175
  seen_posts = user_interactions[user_id]
 
185
  time_diff = now - post_metadata[post_id]['created_at']
186
  freshness = 1.0 / (1.0 + (time_diff.days / 7))
187
  score = (
188
+ 0.6 * sim_score +
189
+ 0.3 * np.log1p(self.post_popularity[post_id]) +
190
+ 0.1 * freshness
191
  )
192
  scores[post_id] = score + random.uniform(-0.1, 0.1)
193
 
194
+ top_posts = sorted(scores.items(), key=lambda x: x[1], reverse=True)[:TOP_K]
195
+ results = [post_metadata[post_id] for post_id, _ in top_posts]
 
 
 
 
 
 
 
196
 
197
  recommendation_cache[user_id] = results
198
  if len(recommendation_cache) > CACHE_SIZE:
 
210
  def sync_background_update():
211
  loop = asyncio.new_event_loop()
212
  asyncio.set_event_loop(loop)
213
+ loop.run_until_complete(background_update())
214
+ loop.close()
 
 
215
 
216
  @app.get("/recommend/feed")
217
  async def get_recommendations_handler(user_id: str = Query(...)):