andykr1k committed on
Commit de77c9a · 1 Parent(s): 3040e30

New feed recommender

Files changed (1)
  1. app.py +24 -96
app.py CHANGED
@@ -37,7 +37,6 @@ SEED = 42
 random.seed(SEED)
 np.random.seed(SEED)
 
-CACHE_SIZE = 1000
 TOP_K = 75
 HISTORY_WINDOW = timedelta(days=30)
 TIMEZONE = ZoneInfo("UTC")
@@ -48,7 +47,7 @@ supabase_client = None
 user_interactions = defaultdict(set)
 post_features = {}
 post_metadata = {}
-recommendation_cache = {}
+
 if not os.path.exists('/tmp/cache'):
     os.makedirs('/tmp/cache')
 sentence_model = SentenceTransformer('all-MiniLM-L6-v2', cache_folder='/tmp/cache')
@@ -56,100 +55,47 @@ sentence_model = SentenceTransformer('all-MiniLM-L6-v2', cache_folder='/tmp/cach
 SUPABASE_URL = os.getenv('supabaseUrl')
 SUPABASE_KEY = os.getenv('supabaseAnonKey')
 
-
 def get_supabase_client():
     global supabase_client
     if supabase_client is None:
         supabase_client = create_client(SUPABASE_URL, SUPABASE_KEY)
     return supabase_client
 
-
 def parse_datetime(dt_str: str) -> datetime:
-    """Parse ISO datetime string and ensure correct microsecond precision."""
     try:
-        if '.' in dt_str:
-            date_part, time_part = dt_str.split('T')
-            time_part, tz_part = time_part.split('+')
-            if '.' in time_part:
-                time_without_micro, micro = time_part.split('.')
-                micro = micro.ljust(6, '0')  # Ensure microseconds are 6 digits
-                time_part = f"{time_without_micro}.{micro}"
-            dt_str = f"{date_part}T{time_part}+00:00"
         return datetime.fromisoformat(dt_str).astimezone(TIMEZONE)
     except Exception as e:
         logger.error(f"Error parsing datetime: {dt_str} - {str(e)}")
         raise
 
-
 class Recommender:
     def __init__(self):
         self.user_profiles = defaultdict(lambda: np.zeros(384))
         self.post_popularity = defaultdict(float)
         self.last_update = datetime.now(TIMEZONE) - HISTORY_WINDOW
 
-    async def update_interactions(self):
+    async def update_data(self):
         supabase = get_supabase_client()
 
-        # Fetch new likes
-        likes = await asyncio.to_thread(
-            supabase.table('likes')
-            .select('user_id, post_id, created_at')
-            .gt('created_at', self.last_update.isoformat())
-            .execute
-        )
-
-        # Fetch new comments
-        comments = await asyncio.to_thread(
-            supabase.table('comments')
-            .select('user_id, post_id, created_at')
-            .gt('created_at', self.last_update.isoformat())
-            .execute
-        )
-
-        for like in likes.data:
-            user_id = like['user_id']
-            post_id = like['post_id']
-            timestamp = parse_datetime(like['created_at'])
-            user_interactions[user_id].add(post_id)
-            self.post_popularity[post_id] += 1.0
-            if post_id in post_features:
-                self.user_profiles[user_id] = (
-                    0.5 * self.user_profiles[user_id] +
-                    0.5 * post_features[post_id]
-                )
-
-        for comment in comments.data:
-            user_id = comment['user_id']
-            post_id = comment['post_id']
-            timestamp = parse_datetime(comment['created_at'])
+        # Fetch likes and comments
+        likes = await asyncio.to_thread(supabase.table('likes').select('user_id, post_id, created_at').gt('created_at', self.last_update.isoformat()).execute)
+        comments = await asyncio.to_thread(supabase.table('comments').select('user_id, post_id, created_at').gt('created_at', self.last_update.isoformat()).execute)
+
+        for interaction in likes.data + comments.data:
+            user_id = interaction['user_id']
+            post_id = interaction['post_id']
             user_interactions[user_id].add(post_id)
-            self.post_popularity[post_id] += 0.5
+            self.post_popularity[post_id] += 1.0 if 'likes' else 0.5
             if post_id in post_features:
-                self.user_profiles[user_id] = (
-                    0.5 * self.user_profiles[user_id] +
-                    0.5 * post_features[post_id]
-                )
+                self.user_profiles[user_id] += post_features[post_id]
 
-        self.last_update = datetime.now(TIMEZONE)
-        logger.info(f"Updated interactions: {len(likes.data)} new likes, {len(comments.data)} new comments")
+        # Fetch posts
+        posts = await asyncio.to_thread(supabase.table('posts').select('*').gt('created_at', self.last_update.isoformat()).execute)
 
-    async def update_posts(self):
-        supabase = get_supabase_client()
-
-        posts = await asyncio.to_thread(
-            supabase.table('posts')
-            .select('*')
-            .gt('created_at', self.last_update.isoformat())
-            .execute
-        )
-
-        post_texts = []
-        post_ids = []
+        post_texts, post_ids = [], []
         for post in posts.data:
             post_id = post['id']
-            title = post.get('movie_name', '') or ''
-            description = post.get('content', '') or ''
-            text = f"{title} {description}".strip()
+            text = f"{post.get('movie_name', '')} {post.get('content', '')}".strip()
             post_texts.append(text)
             post_ids.append(post_id)
             post['created_at'] = parse_datetime(post['created_at'])
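In the added `update_data` loop, `1.0 if 'likes' else 0.5` always evaluates to 1.0, because the non-empty string `'likes'` is truthy, so comments end up with the same popularity weight as likes. A minimal sketch of weighting by source, assuming the intent is to keep the 1.0/0.5 split from the removed per-table loops:

```python
# Sketch only (not part of the commit): pair each row with the weight of the
# table it came from, so likes count 1.0 and comments count 0.5.
weighted = [(row, 1.0) for row in likes.data] + [(row, 0.5) for row in comments.data]

for interaction, weight in weighted:
    user_id = interaction['user_id']
    post_id = interaction['post_id']
    user_interactions[user_id].add(post_id)
    self.post_popularity[post_id] += weight
    if post_id in post_features:
        self.user_profiles[user_id] += post_features[post_id]
```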
@@ -161,16 +107,10 @@ class Recommender:
         for post_id, embedding in zip(post_ids, embeddings):
             post_features[post_id] = embedding / np.linalg.norm(embedding)
 
-        logger.info(f"Updated {len(posts.data)} posts")
-
-    async def full_update(self):
-        await self.update_posts()
-        await self.update_interactions()
+        self.last_update = datetime.now(TIMEZONE)
+        logger.info("Data updated: {} posts, {} interactions".format(len(posts.data), len(likes.data) + len(comments.data)))
 
     def get_recommendations(self, user_id: str) -> List[Dict]:
-        if user_id in recommendation_cache:
-            return recommendation_cache[user_id]
-
         user_profile = self.user_profiles[user_id]
         seen_posts = user_interactions[user_id]
 
@@ -184,19 +124,13 @@ class Recommender:
             sim_score = np.dot(user_profile, feature) if np.any(user_profile) else 0
             time_diff = now - post_metadata[post_id]['created_at']
             freshness = 1.0 / (1.0 + (time_diff.days / 7))
-            score = (
-                0.6 * sim_score +
-                0.3 * np.log1p(self.post_popularity[post_id]) +
-                0.1 * freshness
-            )
+
+            score = 0.6 * sim_score + 0.3 * np.log1p(self.post_popularity[post_id]) + 0.1 * freshness
             scores[post_id] = score + random.uniform(-0.1, 0.1)
 
         top_posts = sorted(scores.items(), key=lambda x: x[1], reverse=True)[:TOP_K]
         results = [post_metadata[post_id] for post_id, _ in top_posts]
-
-        recommendation_cache[user_id] = results
-        if len(recommendation_cache) > CACHE_SIZE:
-            recommendation_cache.pop(next(iter(recommendation_cache)))
+        random.shuffle(results)
 
         return results
 
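With `update_data` now summing embeddings into `self.user_profiles[user_id]` instead of keeping the removed 0.5/0.5 running average, `sim_score` is a dot product against an unnormalized vector and grows with a user's interaction count rather than staying in a fixed range. A sketch of scoring against a unit-length copy of the profile, assuming the goal is to keep the 0.6/0.3/0.1 mix comparable across users:

```python
# Sketch only: post_features values are already unit vectors, so normalizing
# the profile turns the dot product into cosine similarity in [-1, 1].
profile_norm = np.linalg.norm(user_profile)
if profile_norm > 0:
    sim_score = float(np.dot(user_profile / profile_norm, feature))
else:
    sim_score = 0.0
```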
@@ -204,8 +138,7 @@ recommender = Recommender()
 scheduler = BackgroundScheduler(timezone="UTC")
 
 async def background_update():
-    await recommender.update_posts()
-    await recommender.update_interactions()
+    await recommender.update_data()
 
 def sync_background_update():
     loop = asyncio.new_event_loop()
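`BackgroundScheduler` runs plain synchronous callables, which is why the coroutine `background_update` is wrapped by `sync_background_update` and driven on a manually created event loop. A shorter equivalent wrapper, sketched here as an alternative rather than what the commit does, lets `asyncio.run` manage the loop:

```python
def sync_background_update():
    # asyncio.run creates, runs, and closes a fresh event loop per scheduled call.
    asyncio.run(background_update())
```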
@@ -218,8 +151,8 @@ async def get_recommendations_handler(user_id: str = Query(...)):
     try:
         recommendations = recommender.get_recommendations(user_id)
 
-        if recommendations:
-            insert_pos = random.randint(0, min(9, len(recommendations)-1))
+        if recommendations and not any(item.get("type") == "suggestedaccounts" for item in recommendations):
+            insert_pos = random.randint(0, min(9, len(recommendations) - 1))
             recommendations.insert(insert_pos, {"type": "suggestedaccounts"})
 
         return {"status": "success", "recommendations": recommendations}
@@ -229,10 +162,9 @@ async def get_recommendations_handler(user_id: str = Query(...)):
 
 @app.on_event("startup")
 async def startup_event():
-    await recommender.full_update()
+    await recommender.update_data()
     scheduler.add_job(sync_background_update, 'interval', seconds=UPDATE_INTERVAL)
     scheduler.start()
-    logger.info(f"Recommender initialized with background updates every {UPDATE_INTERVAL} seconds")
 
 @app.on_event("shutdown")
 async def shutdown_event():
@@ -242,7 +174,3 @@ async def shutdown_event():
 @app.get("/")
 async def health_check():
     return {"status": "success", "message": "Service operational"}
-
-if __name__ == "__main__":
-    import uvicorn
-    uvicorn.run(app, host="0.0.0.0", port=8000)
 