andykr1k commited on
Commit
b15e91c
·
1 Parent(s): 8b72f40

Added a fix for 1000 batch reqs

Browse files
Files changed (1) hide show
  1. app.py +35 -9
app.py CHANGED
@@ -38,7 +38,7 @@ random.seed(SEED)
38
  np.random.seed(SEED)
39
 
40
  TOP_K = 75
41
- HISTORY_WINDOW = timedelta(days=365)
42
  TIMEZONE = ZoneInfo("UTC")
43
  UPDATE_INTERVAL = 300 # In seconds (5 minutes)
44
 
@@ -83,14 +83,36 @@ class Recommender:
83
  self.post_popularity = defaultdict(float)
84
  self.last_update = datetime.now(TIMEZONE) - HISTORY_WINDOW
85
 
86
- async def update_data(self):
 
87
  supabase = get_supabase_client()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
88
 
89
- # Fetch likes and comments
90
- likes = await asyncio.to_thread(supabase.table('likes').select('user_id, post_id, created_at').gt('created_at', self.last_update.isoformat()).execute)
91
- comments = await asyncio.to_thread(supabase.table('comments').select('user_id, post_id, created_at').gt('created_at', self.last_update.isoformat()).execute)
 
92
 
93
- for interaction in likes.data + comments.data:
94
  user_id = interaction['user_id']
95
  post_id = interaction['post_id']
96
  user_interactions[user_id].add(post_id)
@@ -99,10 +121,10 @@ class Recommender:
99
  self.user_profiles[user_id] += post_features[post_id]
100
 
101
  # Fetch posts
102
- posts = await asyncio.to_thread(supabase.table('posts').select('*').gt('created_at', self.last_update.isoformat()).execute)
103
 
104
  post_texts, post_ids = [], []
105
- for post in posts.data:
106
  post_id = post['id']
107
  text = f"{post.get('movie_name', '')} {post.get('content', '')}".strip()
108
  post_texts.append(text)
@@ -117,7 +139,7 @@ class Recommender:
117
  post_features[post_id] = embedding / np.linalg.norm(embedding)
118
 
119
  self.last_update = datetime.now(TIMEZONE)
120
- logger.info("Data updated: {} posts, {} interactions".format(len(posts.data), len(likes.data) + len(comments.data)))
121
 
122
  def get_recommendations(self, user_id: str) -> List[Dict]:
123
  user_profile = self.user_profiles[user_id]
@@ -183,3 +205,7 @@ async def shutdown_event():
183
  @app.get("/")
184
  async def health_check():
185
  return {"status": "success", "message": "Service operational"}
 
 
 
 
 
38
  np.random.seed(SEED)
39
 
40
  TOP_K = 75
41
+ HISTORY_WINDOW = timedelta(days=1000)
42
  TIMEZONE = ZoneInfo("UTC")
43
  UPDATE_INTERVAL = 300 # In seconds (5 minutes)
44
 
 
83
  self.post_popularity = defaultdict(float)
84
  self.last_update = datetime.now(TIMEZONE) - HISTORY_WINDOW
85
 
86
+ async def fetch_all_rows(self, table_name: str, columns: str, last_update: datetime):
87
+ """Fetch all rows from a table using pagination."""
88
  supabase = get_supabase_client()
89
+ page_size = 1000
90
+ page = 0
91
+ all_data = []
92
+
93
+ while True:
94
+ response = await asyncio.to_thread(
95
+ supabase.table(table_name)
96
+ .select(columns)
97
+ .gt('created_at', last_update.isoformat())
98
+ .range(page * page_size, (page + 1) * page_size - 1)
99
+ .execute
100
+ )
101
+
102
+ if not response.data:
103
+ break
104
+
105
+ all_data.extend(response.data)
106
+ page += 1
107
+
108
+ return all_data
109
 
110
+ async def update_data(self):
111
+ # Fetch all likes and comments with pagination
112
+ likes = await recommender.fetch_all_rows('likes', 'user_id, post_id, created_at', self.last_update)
113
+ comments = await recommender.fetch_all_rows('comments', 'user_id, post_id, created_at', self.last_update)
114
 
115
+ for interaction in likes + comments:
116
  user_id = interaction['user_id']
117
  post_id = interaction['post_id']
118
  user_interactions[user_id].add(post_id)
 
121
  self.user_profiles[user_id] += post_features[post_id]
122
 
123
  # Fetch posts
124
+ posts = await recommender.fetch_all_rows('posts', '*', self.last_update)
125
 
126
  post_texts, post_ids = [], []
127
+ for post in posts:
128
  post_id = post['id']
129
  text = f"{post.get('movie_name', '')} {post.get('content', '')}".strip()
130
  post_texts.append(text)
 
139
  post_features[post_id] = embedding / np.linalg.norm(embedding)
140
 
141
  self.last_update = datetime.now(TIMEZONE)
142
+ logger.info("Data updated: {} posts, {} interactions".format(len(posts), len(likes) + len(comments)))
143
 
144
  def get_recommendations(self, user_id: str) -> List[Dict]:
145
  user_profile = self.user_profiles[user_id]
 
205
  @app.get("/")
206
  async def health_check():
207
  return {"status": "success", "message": "Service operational"}
208
+
209
+ if __name__ == "__main__":
210
+ import uvicorn
211
+ uvicorn.run(app, host="0.0.0.0", port=8000)