andykr1k committed
Commit 54acd78 · 1 Parent(s): 717f4a5

added scheduler, logging and optimization updates

Files changed (1):
  1. app.py +21 -18

app.py CHANGED
@@ -18,11 +18,10 @@ import json
 from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.triggers.cron import CronTrigger
 import logging
-import uvicorn
 import pytz
 
 # Configure logging
-logging.basicConfig(level=logging.INFO)
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 logger = logging.getLogger(__name__)
 
 load_dotenv()
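
Note: with the new format string, every record carries a timestamp, logger name, and level. A rebuild log line rendered under this configuration looks roughly like this (the logger name assumes the module is imported as app):

    2025-06-01 03:30:00,412 - app - INFO - Starting model rebuild at 3:30 AM Pacific Time
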
@@ -56,6 +55,7 @@ def get_supabase_client():
 
 def load_and_preprocess_data():
     supabase = get_supabase_client()
+    logger.info("Loading data from Supabase")
 
     def fetch_table(table, columns, chunk_size=1000):
         offset = 0
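
Note: only the first lines of fetch_table appear in this hunk. A minimal sketch of the chunked pagination it implies, assuming the standard supabase-py query builder and that the nested function closes over the supabase client defined above:

    def fetch_table(table, columns, chunk_size=1000):
        # Page through the table; .range() bounds are inclusive on both ends.
        rows, offset = [], 0
        while True:
            resp = supabase.table(table).select(columns).range(offset, offset + chunk_size - 1).execute()
            rows.extend(resp.data)
            if len(resp.data) < chunk_size:   # short page means we reached the end
                break
            offset += chunk_size
        return rows
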
@@ -84,6 +84,7 @@ def load_and_preprocess_data():
     bipartite.add_edges_from((p['author'], p['id']) for p in posts)
     bipartite.add_edges_from((l['user_id'], l['post_id']) for l in likes)
 
+    logger.info(f"Loaded graph with {len(user_set)} users and {len(post_set)} posts")
     return bipartite
 
 class GraphRecommender(nn.Module):
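
Note: the bipartite graph links users to the posts they authored or liked. A minimal sketch of the construction, assuming user_set and post_set (referenced by the new log line) hold the node ids and that the type attribute matches what rebuild_model filters on:

    import networkx as nx

    bipartite = nx.Graph()
    bipartite.add_nodes_from(user_set, type='user')   # users on one side
    bipartite.add_nodes_from(post_set, type='post')   # posts on the other
    bipartite.add_edges_from((p['author'], p['id']) for p in posts)        # authorship edges
    bipartite.add_edges_from((l['user_id'], l['post_id']) for l in likes)  # like edges
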
@@ -109,11 +110,19 @@ def prepare_training_data(G, node2idx, user_nodes, post_nodes):
     neg_sample_size = min(len(pos_edges), len(neg_candidates))
     neg_edges = random.sample(neg_candidates, neg_sample_size)
 
-    return torch.tensor(pos_edges).T, torch.tensor(neg_edges).T
+    logger.info(f"Prepared {len(pos_edges)} positive and {len(neg_edges)} negative edges")
+    return torch.tensor(pos_edges, dtype=torch.long).T, torch.tensor(neg_edges, dtype=torch.long).T
 
 def train_model(model, data, pos_edges, neg_edges, epochs=200):
+    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
+    model = model.to(device)
+    data = data.to(device)
+    pos_edges = pos_edges.to(device)
+    neg_edges = neg_edges.to(device)
+
     optimizer = optim.Adam(model.parameters(), lr=0.005, weight_decay=1e-4)
 
+    logger.info("Starting model training")
     for epoch in range(epochs):
         model.train()
         optimizer.zero_grad()
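
Note: pos_edges and neg_candidates are built earlier in prepare_training_data, outside this hunk. A hypothetical reconstruction, treating observed user-post links as positives and absent pairs as negative candidates:

    pos_edges = []
    for u, v in G.edges():
        if G.nodes[u]['type'] != 'user':
            u, v = v, u                              # normalize to (user, post) order
        pos_edges.append((node2idx[u], node2idx[v]))

    neg_candidates = [(node2idx[u], node2idx[p])
                      for u in user_nodes for p in post_nodes
                      if not G.has_edge(u, p)]       # O(users x posts); a sketch, not tuned
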
@@ -131,11 +140,12 @@ def train_model(model, data, pos_edges, neg_edges, epochs=200):
         total_loss.backward()
         optimizer.step()
 
-    return model
+    logger.info("Model training completed")
+    return model.to('cpu')
 
 def rebuild_model():
     global G, features, user_nodes, post_nodes, node2idx, pyg_data, trained_model
-    logger.info("Starting model rebuild at 3:30 AM")
+    logger.info("Starting model rebuild at 3:30 AM Pacific Time")
     try:
         G = load_and_preprocess_data()
         user_nodes = sorted(n for n, attr in G.nodes(data=True) if attr['type'] == 'user')
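
Note: the loss computed between optimizer.zero_grad() and total_loss.backward() is not part of this diff. A common formulation for this kind of link prediction, offered only as a sketch with an assumed model(x, edge_index) forward signature, scores each edge by a dot product of its endpoint embeddings and applies binary cross-entropy:

    import torch.nn.functional as F

    z = model(data.x, data.edge_index)                    # node embeddings
    pos_scores = (z[pos_edges[0]] * z[pos_edges[1]]).sum(dim=1)
    neg_scores = (z[neg_edges[0]] * z[neg_edges[1]]).sum(dim=1)
    total_loss = (F.binary_cross_entropy_with_logits(pos_scores, torch.ones_like(pos_scores))
                  + F.binary_cross_entropy_with_logits(neg_scores, torch.zeros_like(neg_scores)))
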
@@ -144,11 +154,8 @@ def rebuild_model():
         all_nodes = user_nodes + post_nodes
         node2idx = {node: i for i, node in enumerate(all_nodes)}
 
-        features = torch.sparse_coo_tensor(
-            torch.arange(len(all_nodes)).repeat(2, 1),
-            torch.ones(len(all_nodes)),
-            (len(all_nodes), len(all_nodes))
-        )
+        # Use dense features instead of sparse (SAGEConv requires dense input)
+        features = torch.eye(len(all_nodes))
         pyg_data = from_networkx(G)
         pyg_data.x = features
 
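Note: the removed sparse_coo_tensor built a sparse identity matrix (ones on the diagonal), so torch.eye(len(all_nodes)) is its dense equivalent, at an O(N^2) memory cost. For large graphs a trainable embedding table is a common alternative; a hypothetical sketch, assuming GraphRecommender could accept 64-dimensional inputs:

    import torch.nn as nn

    # Trainable lookup instead of an N x N one-hot matrix; input_dim becomes 64.
    embedding = nn.Embedding(num_embeddings=len(all_nodes), embedding_dim=64)
    features = embedding(torch.arange(len(all_nodes)))    # shape (N, 64)
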
@@ -156,14 +163,7 @@
 
         input_dim = features.shape[1]
         model = GraphRecommender(input_dim)
-        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
-        model = model.to(device)
-        pyg_data = pyg_data.to(device)
-        pos_edges = pos_edges.to(device)
-        neg_edges = neg_edges.to(device)
-
         trained_model = train_model(model, pyg_data, pos_edges, neg_edges)
-        trained_model = trained_model.to('cpu')
         logger.info("Model rebuild completed successfully")
     except Exception as e:
         logger.error(f"Error during model rebuild: {str(e)}")
@@ -187,6 +187,7 @@ def get_recommendations(user_id, model, data, G, user_nodes, post_nodes, node2id
     post_scores = [(post_nodes[i], score.item()) for i, score in zip(post_indices, scores)]
     post_scores = sorted(post_scores, key=lambda x: x[1], reverse=True)
 
+    logger.info(f"Generated {len(post_scores)} recommendations for user {user_id}")
     return [{"post_id": post, "score": score} for post, score in post_scores]
 
 def fetch_full_post_records(post_ids, batch_size=1000):
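
Note: the scoring that produces post_indices and scores happens above this hunk. Hedged as a sketch with an assumed model(x, edge_index) forward signature, and relying on all_nodes = user_nodes + post_nodes so post embeddings occupy the tail of z:

    model.eval()
    with torch.no_grad():
        z = model(data.x, data.edge_index)
    user_vec = z[node2idx[user_id]]
    post_vecs = z[len(user_nodes):]          # posts follow users in all_nodes
    post_indices = range(len(post_nodes))    # positions into post_nodes
    scores = post_vecs @ user_vec            # one dot-product score per post
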
@@ -231,6 +232,7 @@ async def get_recommendations_handler(user_id: str = Query(...)):
         post_record["score"] = post["score"]
         ordered_recommendations.append(post_record)
 
+    # Stream the response
     def generate():
         yield '{"status": "success", "recommendations": ['
         for i, rec in enumerate(ordered_recommendations):
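
Note: the generator is truncated in this hunk. A sketch of the complete streaming pattern, assuming FastAPI's StreamingResponse and that each rec is JSON-serializable (json is already imported at the top of app.py):

    from fastapi.responses import StreamingResponse

    def generate():
        yield '{"status": "success", "recommendations": ['
        for i, rec in enumerate(ordered_recommendations):
            if i:
                yield ','                    # comma between array elements
            yield json.dumps(rec)
        yield ']}'

    return StreamingResponse(generate(), media_type="application/json")  # inside the handler
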
@@ -257,7 +259,7 @@ scheduler.add_job(
 async def startup_event():
     rebuild_model()
     scheduler.start()
-    logger.info("Scheduler started, model will rebuild daily at 3:30 AM")
+    logger.info("Scheduler started, model will rebuild daily at 3:30 AM Pacific Time")
 
 @app.on_event("shutdown")
 async def shutdown_event():
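
Note: the scheduler.add_job( call named in this hunk's header is elided from the diff. Given the BackgroundScheduler, CronTrigger, and pytz imports plus the 3:30 AM Pacific log line, it plausibly reads:

    scheduler = BackgroundScheduler()
    scheduler.add_job(
        rebuild_model,
        CronTrigger(hour=3, minute=30, timezone=pytz.timezone('America/Los_Angeles')),
    )
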
@@ -265,4 +267,5 @@ async def shutdown_event():
     logger.info("Scheduler shut down")
 
 if __name__ == "__main__":
+    import uvicorn
     uvicorn.run(app, host="0.0.0.0", port=8000)