andykr1k committed 100c733 · 1 parent: d693ac3

small error handling and logging

Files changed (2):
  1. app.py +94 -69
  2. requirements.txt +2 -1
app.py CHANGED
@@ -1,7 +1,8 @@
 import os
 import random
+import asyncio
+import aiohttp
 import numpy as np
-import pandas as pd
 import networkx as nx
 import torch
 import torch.nn as nn
@@ -19,6 +20,7 @@ from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.triggers.cron import CronTrigger
 import logging
 import pytz
+from collections import defaultdict
 
 # Configure logging
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
@@ -27,7 +29,6 @@ logger = logging.getLogger(__name__)
 load_dotenv()
 
 app = FastAPI()
-
 app.add_middleware(
     CORSMiddleware,
     allow_origins=["*"],
@@ -44,8 +45,8 @@ if torch.cuda.is_available():
     torch.cuda.manual_seed_all(SEED)
 
 # Global Variables
-global G, features, user_nodes, post_nodes, node2idx, pyg_data, trained_model
-G = features = user_nodes = post_nodes = node2idx = pyg_data = trained_model = None
+global G, features, user_nodes, post_nodes, node2idx, pyg_data, trained_model, post_embeddings
+G = features = user_nodes = post_nodes = node2idx = pyg_data = trained_model = post_embeddings = None
 
 SUPABASE_URL = os.getenv('supabaseUrl')
 SUPABASE_KEY = os.getenv('supabaseAnonKey')
@@ -53,39 +54,49 @@ SUPABASE_KEY = os.getenv('supabaseAnonKey')
 def get_supabase_client():
     return create_client(SUPABASE_URL, SUPABASE_KEY)
 
-def load_and_preprocess_data():
+async def fetch_chunk(session, table, columns, offset, chunk_size):
     supabase = get_supabase_client()
-    logger.info("Loading data from Supabase")
-
-    def fetch_table(table, columns, chunk_size=1000):
+    response = await asyncio.to_thread(
+        supabase.table(table).select(columns).range(offset, offset + chunk_size - 1).execute
+    )
+    return response.data
+
+async def fetch_table_async(table, columns, chunk_size=1000):
+    async with aiohttp.ClientSession() as session:
+        tasks = []
         offset = 0
-        all_data = []
-        while True:
-            response = supabase.table(table).select(columns).range(offset, offset + chunk_size - 1).execute()
-            data = response.data
-            if not data:
-                break
-            all_data.extend(data)
+        max_rows = 100000  # Adjust based on expected data size
+        while offset < max_rows:
+            tasks.append(fetch_chunk(session, table, columns, offset, chunk_size))
             offset += chunk_size
-        return all_data
+        chunks = await asyncio.gather(*tasks)
+        return [item for chunk in chunks if chunk for item in chunk]
 
-    profiles = fetch_table('profiles', 'id')
-    posts = fetch_table('posts', 'id, author')
-    likes = fetch_table('likes', 'user_id, post_id')
+def load_and_preprocess_data():
+    logger.info("Loading data from Supabase")
+    profiles = asyncio.run(fetch_table_async('profiles', 'id'))
+    posts = asyncio.run(fetch_table_async('posts', 'id, author'))
+    likes = asyncio.run(fetch_table_async('likes', 'user_id, post_id'))
 
-    bipartite = nx.DiGraph()
+    # Use adjacency list for graph
+    graph = defaultdict(set)
+    node_types = {}
 
     user_set = {p['author'] for p in posts} | {l['user_id'] for l in likes}
     post_set = {p['id'] for p in posts}
 
-    bipartite.add_nodes_from(user_set, type='user')
-    bipartite.add_nodes_from(post_set, type='post')
+    for user in user_set:
+        node_types[user] = 'user'
+    for post in post_set:
+        node_types[post] = 'post'
 
-    bipartite.add_edges_from((p['author'], p['id']) for p in posts)
-    bipartite.add_edges_from((l['user_id'], l['post_id']) for l in likes)
+    for p in posts:
+        graph[p['author']].add(p['id'])
+    for l in likes:
+        graph[l['user_id']].add(l['post_id'])
 
     logger.info(f"Loaded graph with {len(user_set)} users and {len(post_set)} posts")
-    return bipartite
+    return graph, node_types
 
 class GraphRecommender(nn.Module):
     def __init__(self, input_dim, hidden_dim=128, output_dim=64):
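
Note: the rewritten loader swaps the old fetch-until-empty loop for a fan-out of range queries; each blocking supabase-py call is pushed to a worker thread via asyncio.to_thread and the results are gathered concurrently (the aiohttp session it opens is not actually used for the requests). A self-contained sketch of that pattern, with fetch_page and the row counts purely illustrative:

import asyncio

PAGE = 1000        # rows per range request (illustrative)
MAX_ROWS = 100000  # fixed upper bound, mirroring the commit's max_rows guard

def fetch_page(offset):
    # Stand-in for a blocking client call such as
    # table.select(...).range(offset, offset + PAGE - 1).execute();
    # returns [] once offset passes the end of the data.
    data = list(range(2500))  # pretend the table holds 2500 rows
    return data[offset:offset + PAGE]

async def fetch_all():
    # Schedule every page request up front, each on a worker thread,
    # then gather and flatten the results, skipping empty pages.
    tasks = [asyncio.to_thread(fetch_page, off) for off in range(0, MAX_ROWS, PAGE)]
    pages = await asyncio.gather(*tasks)
    return [row for page in pages if page for row in page]

print(len(asyncio.run(fetch_all())))  # 2500

One trade-off worth noting: the old loop stopped at the first empty page, while this version always issues MAX_ROWS / chunk_size requests and silently drops any rows beyond MAX_ROWS.
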
@@ -101,94 +112,107 @@ class GraphRecommender(nn.Module):
         return x
 
 def prepare_training_data(G, node2idx, user_nodes, post_nodes):
-    pos_edges = [(node2idx[u], node2idx[v]) for u, v in G.edges() if G.nodes[u]['type'] == 'user' and G.nodes[v]['type'] == 'post']
-
-    all_possible = [(node2idx[u], node2idx[p]) for u in user_nodes for p in post_nodes]
+    pos_edges = [(node2idx[u], node2idx[v]) for u in user_nodes for v in G[u]]
     pos_set = set(pos_edges)
-    neg_candidates = [pair for pair in all_possible if pair not in pos_set]
 
-    neg_sample_size = min(len(pos_edges), len(neg_candidates))
-    neg_edges = random.sample(neg_candidates, neg_sample_size)
+    neg_edges = []
+    num_neg = len(pos_edges)
+    post_indices = [node2idx[p] for p in post_nodes]
+    for _ in range(num_neg):
+        u_idx = random.choice([node2idx[u] for u in user_nodes])
+        p_idx = random.choice(post_indices)
+        while (u_idx, p_idx) in pos_set:
+            p_idx = random.choice(post_indices)
+        neg_edges.append((u_idx, p_idx))
 
     logger.info(f"Prepared {len(pos_edges)} positive and {len(neg_edges)} negative edges")
-    return torch.tensor(pos_edges, dtype=torch.long).T, torch.tensor(neg_edges, dtype=torch.long).T
+    return torch.tensor(pos_edges, dtype=torch.long), torch.tensor(neg_edges, dtype=torch.long)
 
-def train_model(model, data, pos_edges, neg_edges, epochs=200):
+def train_model(model, data, pos_edges, neg_edges, epochs=50, batch_size=1024):
     device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
     model = model.to(device)
     data = data.to(device)
-    pos_edges = pos_edges.to(device)
-    neg_edges = neg_edges.to(device)
-
     optimizer = optim.Adam(model.parameters(), lr=0.005, weight_decay=1e-4)
 
+    pos_dataset = torch.utils.data.TensorDataset(pos_edges)
+    neg_dataset = torch.utils.data.TensorDataset(neg_edges)
+    pos_loader = torch.utils.data.DataLoader(pos_dataset, batch_size=batch_size, shuffle=True)
+    neg_loader = torch.utils.data.DataLoader(neg_dataset, batch_size=batch_size, shuffle=True)
+
     logger.info("Starting model training")
     for epoch in range(epochs):
         model.train()
-        optimizer.zero_grad()
-
-        embeddings = model(data.x, data.edge_index)
+        total_loss = 0
+        for (pos_batch,), (neg_batch,) in zip(pos_loader, neg_loader):
+            pos_batch, neg_batch = pos_batch.to(device), neg_batch.to(device)
+            optimizer.zero_grad()
 
-        pos_scores = (embeddings[pos_edges[0]] * embeddings[pos_edges[1]]).sum(1)
-        neg_scores = (embeddings[neg_edges[0]] * embeddings[neg_edges[1]]).sum(1)
+            embeddings = model(data.x, data.edge_index)
+            pos_scores = (embeddings[pos_batch[:, 0]] * embeddings[pos_batch[:, 1]]).sum(1)
+            neg_scores = (embeddings[neg_batch[:, 0]] * embeddings[neg_batch[:, 1]]).sum(1)
 
-        pos_loss = F.binary_cross_entropy_with_logits(pos_scores, torch.ones_like(pos_scores))
-        neg_loss = F.binary_cross_entropy_with_logits(neg_scores, torch.zeros_like(neg_scores))
+            pos_loss = F.binary_cross_entropy_with_logits(pos_scores, torch.ones_like(pos_scores))
+            neg_loss = F.binary_cross_entropy_with_logits(neg_scores, torch.zeros_like(neg_scores))
+            loss = pos_loss + neg_loss
 
-        total_loss = pos_loss + neg_loss
+            loss.backward()
+            optimizer.step()
+            total_loss += loss.item()
 
-        total_loss.backward()
-        optimizer.step()
+        logger.info(f"Epoch {epoch+1}, Loss: {total_loss}")
 
+    model.eval()
+    with torch.no_grad():
+        embeddings = model(data.x, data.edge_index).to('cpu')
     logger.info("Model training completed")
-    return model.to('cpu')
+    return model.to('cpu'), embeddings
 
 def rebuild_model():
-    global G, features, user_nodes, post_nodes, node2idx, pyg_data, trained_model
+    global G, features, user_nodes, post_nodes, node2idx, pyg_data, trained_model, post_embeddings
     logger.info("Starting model rebuild at 3:30 AM Pacific Time")
     try:
-        G = load_and_preprocess_data()
-        user_nodes = sorted(n for n, attr in G.nodes(data=True) if attr['type'] == 'user')
-        post_nodes = sorted(n for n, attr in G.nodes(data=True) if attr['type'] == 'post')
+        G, node_types = load_and_preprocess_data()
+        user_nodes = sorted(n for n in G if node_types[n] == 'user')
+        post_nodes = sorted(n for n in node_types if node_types[n] == 'post')
 
         all_nodes = user_nodes + post_nodes
         node2idx = {node: i for i, node in enumerate(all_nodes)}
 
-        # Use dense features instead of sparse (SAGEConv requires dense input)
+        # Convert to PyTorch Geometric format
+        nx_G = nx.DiGraph()  # Temporary for conversion
+        nx_G.add_nodes_from(user_nodes + post_nodes)
+        for u in G:
+            for v in G[u]:
+                nx_G.add_edge(u, v)
         features = torch.eye(len(all_nodes))
-        pyg_data = from_networkx(G)
+        pyg_data = from_networkx(nx_G)
         pyg_data.x = features
 
         pos_edges, neg_edges = prepare_training_data(G, node2idx, user_nodes, post_nodes)
-
        input_dim = features.shape[1]
         model = GraphRecommender(input_dim)
-        trained_model = train_model(model, pyg_data, pos_edges, neg_edges)
+        trained_model, post_embeddings = train_model(model, pyg_data, pos_edges, neg_edges)
         logger.info("Model rebuild completed successfully")
     except Exception as e:
         logger.error(f"Error during model rebuild: {str(e)}")
         raise
 
-def get_recommendations(user_id, model, data, G, user_nodes, post_nodes, node2idx):
+def get_recommendations(user_id, model, data, G, user_nodes, post_nodes, node2idx, precomputed_embeds):
     if user_id not in user_nodes:
         return []
 
     user_idx = node2idx[user_id]
-    user_interacted = {v for _, v in G.out_edges(user_id) if G.nodes[v]['type'] == 'post'}
+    user_interacted = set(G[user_id])
     post_indices = [node2idx[p] for p in post_nodes if p not in user_interacted]
 
     with torch.no_grad():
-        embeddings = model(data.x, data.edge_index)
-        user_embed = embeddings[user_idx].unsqueeze(0)
-        post_embeds = embeddings[post_indices]
-
+        user_embed = model(data.x, data.edge_index)[user_idx].unsqueeze(0)
+        post_embeds = precomputed_embeds[post_indices]
         scores = torch.matmul(user_embed, post_embeds.T).squeeze(0)
 
-    # Create an inverse mapping from index to post ID
     idx2node = {idx: node for node, idx in node2idx.items()}
-    # Map post_indices back to original post IDs
     post_scores = [(idx2node[i], score.item()) for i, score in zip(post_indices, scores)]
-    post_scores = sorted(post_scores, key=lambda x: x[1], reverse=True)
+    post_scores = sorted(post_scores, key=lambda x: x[1], reverse=True)[:10]  # Top-10
 
     logger.info(f"Generated {len(post_scores)} recommendations for user {user_id}")
     return [{"post_id": post, "score": score} for post, score in post_scores]
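
Note: prepare_training_data no longer materializes every (user, post) pair to pick negatives; it draws them by rejection sampling, so memory scales with the number of positives rather than with |users| x |posts|. The idea in isolation (names and data illustrative):

import random

def sample_negatives(pos_set, user_ids, post_ids, num_neg):
    # Rejection sampling: draw random (user, post) pairs, retrying on
    # collision with an observed positive. Memory stays O(num_neg)
    # rather than the O(|users| * |posts|) of enumerating all pairs.
    # Caveat (also true of the committed code): if some user has
    # interacted with every post, the retry loop cannot terminate.
    neg = []
    for _ in range(num_neg):
        u = random.choice(user_ids)
        p = random.choice(post_ids)
        while (u, p) in pos_set:
            p = random.choice(post_ids)
        neg.append((u, p))
    return neg

pos = {(0, 10), (0, 11), (1, 10)}
print(sample_negatives(pos, user_ids=[0, 1], post_ids=[10, 11, 12], num_neg=3))
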
@@ -197,16 +221,14 @@ def fetch_full_post_records(post_ids, batch_size=1000):
     supabase = get_supabase_client()
     if not post_ids:
         return []
-
     records = []
     for i in range(0, len(post_ids), batch_size):
         batch_ids = post_ids[i:i + batch_size]
-        response = supabase.table('posts').select('*').in_('id', batch_ids).execute()
+        response = supabase.table('posts').select('id, title, author').in_('id', batch_ids).execute()
         batch_records = response.data
         for record in batch_records:
             record['type'] = 'post'
         records.extend(batch_records)
-
     return records
 
 @app.post("/rebuild")
@@ -219,7 +241,7 @@ async def get_recommendations_handler(user_id: str = Query(...)):
     if trained_model is None:
         raise HTTPException(status_code=500, detail="Model not initialized, please rebuild first.")
 
-    recommended_posts = get_recommendations(user_id, trained_model, pyg_data, G, user_nodes, post_nodes, node2idx)
+    recommended_posts = get_recommendations(user_id, trained_model, pyg_data, G, user_nodes, post_nodes, node2idx, post_embeddings)
     if not recommended_posts:
         return {"status": "success", "recommendations": []}
 
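
Note: the extra post_embeddings argument threaded through here lets the handler score against embeddings cached at training time instead of re-running the GNN over every post per request; only the user embedding is computed fresh. A minimal sketch of that scoring step, with illustrative shapes:

import torch

# Pretend 4 candidate posts with 8-dim embeddings cached by train_model.
cached = torch.randn(4, 8)
user_embed = torch.randn(1, 8)

# Dot-product scoring against the cached matrix, then top-k selection,
# mirroring what get_recommendations does at serve time.
scores = torch.matmul(user_embed, cached.T).squeeze(0)
top = torch.topk(scores, k=2)
print(top.indices.tolist(), top.values.tolist())
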
@@ -235,7 +257,10 @@ async def get_recommendations_handler(user_id: str = Query(...)):
             post_record["score"] = post["score"]
             ordered_recommendations.append(post_record)
 
-    # Stream the response
+    if ordered_recommendations:
+        insert_position = random.randint(0, min(9, len(ordered_recommendations) - 1))
+        ordered_recommendations.insert(insert_position, {"type": "suggestedaccounts"})
+
     def generate():
         yield '{"status": "success", "recommendations": ['
         for i, rec in enumerate(ordered_recommendations):
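
Note: the body of generate() is truncated in this view, so the comma handling below is an assumed completion; it is shown only to illustrate the splice-and-stream shape the handler uses (a marker record spliced into one of the first ten slots, then the JSON array emitted incrementally):

import json
import random

recs = [{"post_id": i, "score": 1.0 - i / 10} for i in range(5)]  # illustrative

# Splice the suggested-accounts marker in, as the handler does.
pos = random.randint(0, min(9, len(recs) - 1))
recs.insert(pos, {"type": "suggestedaccounts"})

def generate():
    # Emit the JSON array piece by piece so a StreamingResponse client
    # can start consuming before the whole payload is serialized.
    yield '{"status": "success", "recommendations": ['
    for i, rec in enumerate(recs):
        yield ("," if i else "") + json.dumps(rec)
    yield "]}"

print("".join(generate()))
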
requirements.txt CHANGED
@@ -8,4 +8,5 @@ fastapi
 python-dotenv
 uvicorn
 apscheduler
-pytz
+pytz
+aiohttp