Bc-AI commited on
Commit
63236b6
Β·
verified Β·
1 Parent(s): 0008b67

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +89 -102
app.py CHANGED
@@ -1,12 +1,13 @@
1
  """
2
- SharePUTERβ„’ v2 Worker Node β€” FIXED
 
3
  """
4
 
5
  from fastapi import FastAPI
6
- from fastapi.responses import HTMLResponse, JSONResponse
7
  from fastapi.middleware.cors import CORSMiddleware
8
  import httpx, asyncio, uuid, os, sys, io, json, time, traceback
9
- import subprocess, multiprocessing
10
  from datetime import datetime
11
  from contextlib import redirect_stdout, redirect_stderr
12
  from typing import Optional
@@ -21,18 +22,16 @@ AUTO_REG = os.environ.get("SACCP_AUTO_REGISTER", "true").lower() == "true"
21
  NODE_ID: Optional[str] = None
22
  REGISTERED = False
23
 
 
24
 
25
  def detect_specs():
26
- import psutil
27
  s = {
28
  "cpu_count": multiprocessing.cpu_count(),
29
  "ram_total_gb": round(psutil.virtual_memory().total / (1024**3), 2),
30
  "ram_available_gb": round(psutil.virtual_memory().available / (1024**3), 2),
31
  "platform": sys.platform,
32
  "python": sys.version.split()[0],
33
- "gpu_available": False,
34
- "gpu_name": "",
35
- "gpu_memory_gb": 0,
36
  }
37
  try:
38
  import torch
@@ -40,111 +39,88 @@ def detect_specs():
40
  s["gpu_available"] = True
41
  s["gpu_name"] = torch.cuda.get_device_name(0)
42
  s["gpu_memory_gb"] = round(torch.cuda.get_device_properties(0).total_mem / (1024**3), 2)
43
- except ImportError:
44
  pass
45
  return s
46
 
47
-
48
  def detect_libs():
49
- common = ["numpy", "pandas", "scipy", "scikit-learn", "torch", "transformers",
50
- "tensorflow", "keras", "pillow", "requests", "httpx", "fastapi", "pydantic"]
51
- installed = []
52
  for lib in common:
53
  try:
54
- __import__(lib.replace("-", "_"))
55
- installed.append(lib)
56
- except ImportError:
57
  pass
58
- return installed
59
-
60
 
61
- import psutil
62
  SPECS = detect_specs()
63
  INSTALLED_LIBS = detect_libs()
64
 
65
 
66
  def safe_execute(code: str, timeout: int = 600) -> dict:
67
- """Execute Python code safely."""
68
- result = {
69
- "status": "completed",
70
- "result": None,
71
- "stdout": "",
72
- "error": None,
73
- "execution_time": 0,
74
- "resource_usage": {}
75
- }
76
-
77
  stdout_cap = io.StringIO()
78
  stderr_cap = io.StringIO()
79
  namespace = {}
80
-
81
  start = time.time()
82
  start_mem = psutil.Process().memory_info().rss
83
 
84
  try:
85
  with redirect_stdout(stdout_cap), redirect_stderr(stderr_cap):
86
  exec(compile(code, "<fragment>", "exec"), namespace)
87
-
88
  result["execution_time"] = round(time.time() - start, 3)
89
  result["stdout"] = stdout_cap.getvalue()
90
-
91
  end_mem = psutil.Process().memory_info().rss
92
  result["resource_usage"] = {
93
  "time_seconds": result["execution_time"],
94
  "memory_delta_mb": round((end_mem - start_mem) / (1024**2), 2),
95
  }
96
-
97
- # Capture result variable
98
  for var in ["result", "__saccp_result__", "__saccp_results__", "output"]:
99
  if var in namespace and namespace[var] is not None:
100
  val = namespace[var]
101
  try:
102
  json.dumps(val)
103
  result["result"] = val
104
- except (TypeError, ValueError):
105
  result["result"] = str(val)[:5000]
106
  break
107
-
108
  if result["result"] is None and result["stdout"]:
109
  result["result"] = result["stdout"].strip()[-5000:]
110
-
111
  except Exception as e:
112
  result["status"] = "failed"
113
  result["error"] = f"{type(e).__name__}: {str(e)}\n{traceback.format_exc()[-1500:]}"
114
  result["execution_time"] = round(time.time() - start, 3)
115
  result["stdout"] = stdout_cap.getvalue()
116
-
117
  return result
118
 
119
 
120
  @app.get("/", response_class=HTMLResponse)
121
  async def homepage():
122
- import psutil
123
- ram_avail = round(psutil.virtual_memory().available / (1024**3), 2)
124
- return HTMLResponse(f"""<!DOCTYPE html>
125
- <html><head><title>SharePUTER Worker</title>
126
  <style>body{{font-family:system-ui;background:#0a0a0f;color:#ddd;padding:40px;max-width:600px;margin:0 auto}}
127
- h1{{color:#00ff88}}.stat{{background:#111;padding:12px;margin:8px 0;border-radius:8px}}</style></head>
128
- <body><h1>πŸ€– SharePUTER Worker</h1>
129
- <p>Node ID: <strong>{NODE_ID or 'Not registered'}</strong></p>
130
- <p>Type: <strong>{NODE_TYPE}</strong></p>
131
- <p>Status: <strong style="color:#00ff88">{'ACTIVE' if REGISTERED else 'WAITING'}</strong></p>
132
- <div class="stat">CPUs: {SPECS['cpu_count']}</div>
133
- <div class="stat">RAM: {ram_avail} / {SPECS['ram_total_gb']} GB</div>
134
- <div class="stat">GPU: {SPECS['gpu_name'] or 'None'}</div>
135
- <div class="stat">Libs: {', '.join(INSTALLED_LIBS[:10])}</div>
136
  </body></html>""")
137
 
138
 
139
  @app.get("/health")
140
  async def health():
141
- return {"status": "online", "node_id": NODE_ID, "registered": REGISTERED}
142
 
143
 
144
  async def worker_loop():
145
  global NODE_ID, REGISTERED
146
-
147
- # Wait for registration
148
  while not REGISTERED:
149
  await asyncio.sleep(2)
150
  if AUTO_REG:
@@ -153,83 +129,94 @@ async def worker_loop():
153
  r = await c.post(f"{HEAD_URL}/api/register_node", json={
154
  "node_type": NODE_TYPE,
155
  "node_url": os.environ.get("SPACE_HOST", "http://localhost:7861"),
156
- "owner": NODE_OWNER,
157
- "specs": SPECS,
158
- "installed_libs": INSTALLED_LIBS,
159
  })
160
  if r.status_code == 200:
161
  NODE_ID = r.json().get("node_id")
162
  REGISTERED = True
163
  print(f"[SACCP] Registered as {NODE_ID}")
164
  except Exception as e:
165
- print(f"[SACCP] Registration failed: {e}")
166
 
167
  print(f"[SACCP] Worker active: {NODE_ID} ({NODE_TYPE})")
168
  last_hb = 0
169
 
170
  while True:
171
  try:
172
- # Heartbeat every 25 seconds
173
  if time.time() - last_hb > 25:
174
  try:
175
  async with httpx.AsyncClient(timeout=10) as c:
176
  await c.post(f"{HEAD_URL}/api/node_heartbeat", json={
177
- "node_id": NODE_ID,
178
- "status": "online",
179
  "installed_libs": INSTALLED_LIBS,
180
  })
181
  last_hb = time.time()
182
  except:
183
  pass
184
 
185
- # Get work
 
186
  async with httpx.AsyncClient(timeout=15) as c:
187
  libs_str = ",".join(INSTALLED_LIBS)
188
- r = await c.get(
189
- f"{HEAD_URL}/api/get_work",
190
- params={
191
- "node_id": NODE_ID,
192
- "node_type": NODE_TYPE,
193
- "has_gpu": str(SPECS['gpu_available']).lower(),
194
- "ram_gb": SPECS['ram_available_gb'],
195
- "libs": libs_str,
196
- }
197
- )
198
-
199
- if r.status_code != 200:
200
- await asyncio.sleep(2)
201
- continue
202
-
203
- work = r.json().get("work")
204
- if not work:
205
- await asyncio.sleep(1)
206
- continue
207
-
208
- frag_id = work["fragment_id"]
209
- code = work.get("code", "")
210
- ftype = work.get("fragment_type", "compute")
211
- timeout = work.get("timeout_seconds", 600)
212
-
213
- print(f"[SACCP] Running {frag_id} ({ftype})")
214
-
215
- # Execute the code
216
- exec_result = safe_execute(code, timeout=timeout)
217
-
218
- # Submit result
219
- await c.post(f"{HEAD_URL}/api/submit_result", json={
220
- "fragment_id": frag_id,
221
- "node_id": NODE_ID,
222
- "status": exec_result["status"],
223
- "result": exec_result.get("result"),
224
- "error": exec_result.get("error"),
225
- "stdout": exec_result.get("stdout", ""),
226
- "resource_usage": exec_result.get("resource_usage", {}),
227
  })
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
228
 
229
- print(f"[SACCP] {frag_id} β†’ {exec_result['status']} ({exec_result['execution_time']}s)")
 
230
 
231
  except Exception as e:
232
- print(f"[SACCP] Error: {e}")
233
  await asyncio.sleep(3)
234
 
235
 
 
1
  """
2
+ SharePUTERβ„’ v2 Worker Node β€” v2.0.3 FIXED
3
+ Key fix: separate httpx client for get_work vs submit_result
4
  """
5
 
6
  from fastapi import FastAPI
7
+ from fastapi.responses import HTMLResponse
8
  from fastapi.middleware.cors import CORSMiddleware
9
  import httpx, asyncio, uuid, os, sys, io, json, time, traceback
10
+ import multiprocessing
11
  from datetime import datetime
12
  from contextlib import redirect_stdout, redirect_stderr
13
  from typing import Optional
 
22
  NODE_ID: Optional[str] = None
23
  REGISTERED = False
24
 
25
+ import psutil
26
 
27
  def detect_specs():
 
28
  s = {
29
  "cpu_count": multiprocessing.cpu_count(),
30
  "ram_total_gb": round(psutil.virtual_memory().total / (1024**3), 2),
31
  "ram_available_gb": round(psutil.virtual_memory().available / (1024**3), 2),
32
  "platform": sys.platform,
33
  "python": sys.version.split()[0],
34
+ "gpu_available": False, "gpu_name": "", "gpu_memory_gb": 0,
 
 
35
  }
36
  try:
37
  import torch
 
39
  s["gpu_available"] = True
40
  s["gpu_name"] = torch.cuda.get_device_name(0)
41
  s["gpu_memory_gb"] = round(torch.cuda.get_device_properties(0).total_mem / (1024**3), 2)
42
+ except:
43
  pass
44
  return s
45
 
 
46
  def detect_libs():
47
+ common = ["numpy","pandas","scipy","scikit-learn","torch","transformers",
48
+ "tensorflow","keras","pillow","requests","httpx","fastapi","pydantic"]
49
+ out = []
50
  for lib in common:
51
  try:
52
+ __import__(lib.replace("-","_"))
53
+ out.append(lib)
54
+ except:
55
  pass
56
+ return out
 
57
 
 
58
  SPECS = detect_specs()
59
  INSTALLED_LIBS = detect_libs()
60
 
61
 
62
  def safe_execute(code: str, timeout: int = 600) -> dict:
63
+ result = {"status": "completed", "result": None, "stdout": "", "error": None,
64
+ "execution_time": 0, "resource_usage": {}}
 
 
 
 
 
 
 
 
65
  stdout_cap = io.StringIO()
66
  stderr_cap = io.StringIO()
67
  namespace = {}
 
68
  start = time.time()
69
  start_mem = psutil.Process().memory_info().rss
70
 
71
  try:
72
  with redirect_stdout(stdout_cap), redirect_stderr(stderr_cap):
73
  exec(compile(code, "<fragment>", "exec"), namespace)
 
74
  result["execution_time"] = round(time.time() - start, 3)
75
  result["stdout"] = stdout_cap.getvalue()
 
76
  end_mem = psutil.Process().memory_info().rss
77
  result["resource_usage"] = {
78
  "time_seconds": result["execution_time"],
79
  "memory_delta_mb": round((end_mem - start_mem) / (1024**2), 2),
80
  }
 
 
81
  for var in ["result", "__saccp_result__", "__saccp_results__", "output"]:
82
  if var in namespace and namespace[var] is not None:
83
  val = namespace[var]
84
  try:
85
  json.dumps(val)
86
  result["result"] = val
87
+ except:
88
  result["result"] = str(val)[:5000]
89
  break
 
90
  if result["result"] is None and result["stdout"]:
91
  result["result"] = result["stdout"].strip()[-5000:]
 
92
  except Exception as e:
93
  result["status"] = "failed"
94
  result["error"] = f"{type(e).__name__}: {str(e)}\n{traceback.format_exc()[-1500:]}"
95
  result["execution_time"] = round(time.time() - start, 3)
96
  result["stdout"] = stdout_cap.getvalue()
 
97
  return result
98
 
99
 
100
  @app.get("/", response_class=HTMLResponse)
101
  async def homepage():
102
+ ram = round(psutil.virtual_memory().available / (1024**3), 2)
103
+ return HTMLResponse(f"""<!DOCTYPE html><html><head><title>Worker</title>
 
 
104
  <style>body{{font-family:system-ui;background:#0a0a0f;color:#ddd;padding:40px;max-width:600px;margin:0 auto}}
105
+ h1{{color:#00ff88}}.s{{background:#111;padding:12px;margin:8px 0;border-radius:8px}}</style></head>
106
+ <body><h1>πŸ€– SharePUTER Worker v2.0.3</h1>
107
+ <p>Node: <b>{NODE_ID or 'unregistered'}</b> | Type: <b>{NODE_TYPE}</b> |
108
+ Status: <b style="color:#00ff88">{'ACTIVE' if REGISTERED else 'WAITING'}</b></p>
109
+ <div class="s">CPUs: {SPECS['cpu_count']}</div>
110
+ <div class="s">RAM: {ram}/{SPECS['ram_total_gb']} GB</div>
111
+ <div class="s">GPU: {SPECS['gpu_name'] or 'None'}</div>
112
+ <div class="s">Libs: {', '.join(INSTALLED_LIBS)}</div>
 
113
  </body></html>""")
114
 
115
 
116
  @app.get("/health")
117
  async def health():
118
+ return {"status": "online", "node_id": NODE_ID, "registered": REGISTERED, "version": "2.0.3"}
119
 
120
 
121
  async def worker_loop():
122
  global NODE_ID, REGISTERED
123
+
 
124
  while not REGISTERED:
125
  await asyncio.sleep(2)
126
  if AUTO_REG:
 
129
  r = await c.post(f"{HEAD_URL}/api/register_node", json={
130
  "node_type": NODE_TYPE,
131
  "node_url": os.environ.get("SPACE_HOST", "http://localhost:7861"),
132
+ "owner": NODE_OWNER, "specs": SPECS, "installed_libs": INSTALLED_LIBS,
 
 
133
  })
134
  if r.status_code == 200:
135
  NODE_ID = r.json().get("node_id")
136
  REGISTERED = True
137
  print(f"[SACCP] Registered as {NODE_ID}")
138
  except Exception as e:
139
+ print(f"[SACCP] Reg failed: {e}")
140
 
141
  print(f"[SACCP] Worker active: {NODE_ID} ({NODE_TYPE})")
142
  last_hb = 0
143
 
144
  while True:
145
  try:
146
+ # ─── Heartbeat ───
147
  if time.time() - last_hb > 25:
148
  try:
149
  async with httpx.AsyncClient(timeout=10) as c:
150
  await c.post(f"{HEAD_URL}/api/node_heartbeat", json={
151
+ "node_id": NODE_ID, "status": "online",
 
152
  "installed_libs": INSTALLED_LIBS,
153
  })
154
  last_hb = time.time()
155
  except:
156
  pass
157
 
158
+ # ─── Get Work (separate client) ───
159
+ work = None
160
  async with httpx.AsyncClient(timeout=15) as c:
161
  libs_str = ",".join(INSTALLED_LIBS)
162
+ r = await c.get(f"{HEAD_URL}/api/get_work", params={
163
+ "node_id": NODE_ID, "node_type": NODE_TYPE,
164
+ "has_gpu": str(SPECS['gpu_available']).lower(),
165
+ "ram_gb": SPECS['ram_available_gb'], "libs": libs_str,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
166
  })
167
+ if r.status_code == 200:
168
+ work = r.json().get("work")
169
+
170
+ if not work:
171
+ await asyncio.sleep(1)
172
+ continue
173
+
174
+ frag_id = work["fragment_id"]
175
+ code = work.get("code", "")
176
+ ftype = work.get("fragment_type", "compute")
177
+ timeout = work.get("timeout_seconds", 600)
178
+
179
+ print(f"[SACCP] β–Ά Running {frag_id} ({ftype})")
180
+
181
+ # ─── Execute ───
182
+ exec_result = safe_execute(code, timeout=timeout)
183
+
184
+ print(f"[SACCP] βœ“ Executed {frag_id[:20]}... β†’ {exec_result['status']} ({exec_result['execution_time']}s)")
185
+
186
+ # ─── Submit Result (SEPARATE client, fresh connection) ───
187
+ submit_payload = {
188
+ "fragment_id": frag_id,
189
+ "node_id": NODE_ID,
190
+ "status": exec_result["status"],
191
+ "result": exec_result.get("result"),
192
+ "error": exec_result.get("error") or "",
193
+ "stdout": exec_result.get("stdout") or "",
194
+ "resource_usage": exec_result.get("resource_usage", {}),
195
+ }
196
+
197
+ submit_ok = False
198
+ for attempt in range(3):
199
+ try:
200
+ async with httpx.AsyncClient(timeout=30) as submit_client:
201
+ sr = await submit_client.post(
202
+ f"{HEAD_URL}/api/submit_result",
203
+ json=submit_payload
204
+ )
205
+ if sr.status_code == 200:
206
+ print(f"[SACCP] πŸ“€ Result submitted for {frag_id[:20]}...")
207
+ submit_ok = True
208
+ break
209
+ else:
210
+ print(f"[SACCP] ⚠️ Submit got {sr.status_code}: {sr.text[:200]}")
211
+ except Exception as e:
212
+ print(f"[SACCP] ⚠️ Submit attempt {attempt+1} failed: {e}")
213
+ await asyncio.sleep(1)
214
 
215
+ if not submit_ok:
216
+ print(f"[SACCP] ❌ FAILED to submit result for {frag_id} after 3 attempts!")
217
 
218
  except Exception as e:
219
+ print(f"[SACCP] Loop error: {e}")
220
  await asyncio.sleep(3)
221
 
222