Bc-AI committed on
Commit
eef72cf
·
verified ·
1 Parent(s): 63236b6

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +66 -145
app.py CHANGED
@@ -1,21 +1,18 @@
1
  """
2
- SharePUTER™ v2 Worker Node — v2.0.3 FIXED
3
- Key fix: separate httpx client for get_work vs submit_result
4
  """
5
 
6
  from fastapi import FastAPI
7
  from fastapi.responses import HTMLResponse
8
  from fastapi.middleware.cors import CORSMiddleware
9
- import httpx, asyncio, uuid, os, sys, io, json, time, traceback
10
- import multiprocessing
11
- from datetime import datetime
12
  from contextlib import redirect_stdout, redirect_stderr
13
  from typing import Optional
14
 
15
- app = FastAPI(title="SharePUTER™ v2 Worker")
16
  app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
17
 
18
- HEAD_URL = os.environ.get("SACCP_HEAD_URL", "https://bc-ai-saccp-head.hf.space")
19
  NODE_TYPE = os.environ.get("SACCP_NODE_TYPE", "CPU").upper()
20
  NODE_OWNER = os.environ.get("SACCP_NODE_OWNER", "anonymous")
21
  AUTO_REG = os.environ.get("SACCP_AUTO_REGISTER", "true").lower() == "true"
@@ -25,206 +22,130 @@ REGISTERED = False
25
  import psutil
26
 
27
  def detect_specs():
28
- s = {
29
- "cpu_count": multiprocessing.cpu_count(),
30
- "ram_total_gb": round(psutil.virtual_memory().total / (1024**3), 2),
31
- "ram_available_gb": round(psutil.virtual_memory().available / (1024**3), 2),
32
- "platform": sys.platform,
33
- "python": sys.version.split()[0],
34
- "gpu_available": False, "gpu_name": "", "gpu_memory_gb": 0,
35
- }
36
  try:
37
  import torch
38
  if torch.cuda.is_available():
39
- s["gpu_available"] = True
40
- s["gpu_name"] = torch.cuda.get_device_name(0)
41
- s["gpu_memory_gb"] = round(torch.cuda.get_device_properties(0).total_mem / (1024**3), 2)
42
- except:
43
- pass
44
  return s
45
 
46
  def detect_libs():
47
- common = ["numpy","pandas","scipy","scikit-learn","torch","transformers",
48
- "tensorflow","keras","pillow","requests","httpx","fastapi","pydantic"]
49
  out = []
50
- for lib in common:
51
- try:
52
- __import__(lib.replace("-","_"))
53
- out.append(lib)
54
- except:
55
- pass
56
  return out
57
 
58
  SPECS = detect_specs()
59
- INSTALLED_LIBS = detect_libs()
60
-
61
 
62
- def safe_execute(code: str, timeout: int = 600) -> dict:
63
  result = {"status": "completed", "result": None, "stdout": "", "error": None,
64
  "execution_time": 0, "resource_usage": {}}
65
- stdout_cap = io.StringIO()
66
- stderr_cap = io.StringIO()
67
- namespace = {}
68
  start = time.time()
69
- start_mem = psutil.Process().memory_info().rss
70
-
71
  try:
72
- with redirect_stdout(stdout_cap), redirect_stderr(stderr_cap):
73
- exec(compile(code, "<fragment>", "exec"), namespace)
74
  result["execution_time"] = round(time.time() - start, 3)
75
- result["stdout"] = stdout_cap.getvalue()
76
- end_mem = psutil.Process().memory_info().rss
77
- result["resource_usage"] = {
78
- "time_seconds": result["execution_time"],
79
- "memory_delta_mb": round((end_mem - start_mem) / (1024**2), 2),
80
- }
81
- for var in ["result", "__saccp_result__", "__saccp_results__", "output"]:
82
- if var in namespace and namespace[var] is not None:
83
- val = namespace[var]
84
- try:
85
- json.dumps(val)
86
- result["result"] = val
87
- except:
88
- result["result"] = str(val)[:5000]
89
  break
90
  if result["result"] is None and result["stdout"]:
91
  result["result"] = result["stdout"].strip()[-5000:]
92
  except Exception as e:
93
- result["status"] = "failed"
94
- result["error"] = f"{type(e).__name__}: {str(e)}\n{traceback.format_exc()[-1500:]}"
95
- result["execution_time"] = round(time.time() - start, 3)
96
- result["stdout"] = stdout_cap.getvalue()
97
  return result
98
 
99
-
100
  @app.get("/", response_class=HTMLResponse)
101
- async def homepage():
102
- ram = round(psutil.virtual_memory().available / (1024**3), 2)
103
  return HTMLResponse(f"""<!DOCTYPE html><html><head><title>Worker</title>
104
  <style>body{{font-family:system-ui;background:#0a0a0f;color:#ddd;padding:40px;max-width:600px;margin:0 auto}}
105
  h1{{color:#00ff88}}.s{{background:#111;padding:12px;margin:8px 0;border-radius:8px}}</style></head>
106
- <body><h1>🤖 SharePUTER Worker v2.0.3</h1>
107
- <p>Node: <b>{NODE_ID or 'unregistered'}</b> | Type: <b>{NODE_TYPE}</b> |
108
- Status: <b style="color:#00ff88">{'ACTIVE' if REGISTERED else 'WAITING'}</b></p>
109
- <div class="s">CPUs: {SPECS['cpu_count']}</div>
110
- <div class="s">RAM: {ram}/{SPECS['ram_total_gb']} GB</div>
111
- <div class="s">GPU: {SPECS['gpu_name'] or 'None'}</div>
112
- <div class="s">Libs: {', '.join(INSTALLED_LIBS)}</div>
113
  </body></html>""")
114
 
115
-
116
  @app.get("/health")
117
  async def health():
118
- return {"status": "online", "node_id": NODE_ID, "registered": REGISTERED, "version": "2.0.3"}
119
-
120
 
121
  async def worker_loop():
122
  global NODE_ID, REGISTERED
123
-
124
  while not REGISTERED:
125
  await asyncio.sleep(2)
126
  if AUTO_REG:
127
  try:
128
  async with httpx.AsyncClient(timeout=15) as c:
129
  r = await c.post(f"{HEAD_URL}/api/register_node", json={
130
- "node_type": NODE_TYPE,
131
- "node_url": os.environ.get("SPACE_HOST", "http://localhost:7861"),
132
- "owner": NODE_OWNER, "specs": SPECS, "installed_libs": INSTALLED_LIBS,
133
- })
134
- if r.status_code == 200:
135
- NODE_ID = r.json().get("node_id")
136
- REGISTERED = True
137
- print(f"[SACCP] Registered as {NODE_ID}")
138
- except Exception as e:
139
- print(f"[SACCP] Reg failed: {e}")
140
-
141
- print(f"[SACCP] Worker active: {NODE_ID} ({NODE_TYPE})")
142
- last_hb = 0
143
 
 
 
144
  while True:
145
  try:
146
- # ─── Heartbeat ───
147
  if time.time() - last_hb > 25:
148
  try:
149
  async with httpx.AsyncClient(timeout=10) as c:
150
- await c.post(f"{HEAD_URL}/api/node_heartbeat", json={
151
- "node_id": NODE_ID, "status": "online",
152
- "installed_libs": INSTALLED_LIBS,
153
- })
154
  last_hb = time.time()
155
- except:
156
- pass
157
 
158
- # ─── Get Work (separate client) ───
159
  work = None
160
  async with httpx.AsyncClient(timeout=15) as c:
161
- libs_str = ",".join(INSTALLED_LIBS)
162
  r = await c.get(f"{HEAD_URL}/api/get_work", params={
163
  "node_id": NODE_ID, "node_type": NODE_TYPE,
164
  "has_gpu": str(SPECS['gpu_available']).lower(),
165
- "ram_gb": SPECS['ram_available_gb'], "libs": libs_str,
166
- })
167
- if r.status_code == 200:
168
- work = r.json().get("work")
169
 
170
- if not work:
171
- await asyncio.sleep(1)
172
- continue
173
 
174
- frag_id = work["fragment_id"]
175
  code = work.get("code", "")
176
- ftype = work.get("fragment_type", "compute")
177
- timeout = work.get("timeout_seconds", 600)
178
 
179
- print(f"[SACCP] Running {frag_id} ({ftype})")
 
180
 
181
- # ─── Execute ───
182
- exec_result = safe_execute(code, timeout=timeout)
 
183
 
184
- print(f"[SACCP] ✓ Executed {frag_id[:20]}... → {exec_result['status']} ({exec_result['execution_time']}s)")
185
-
186
- # ─── Submit Result (SEPARATE client, fresh connection) ───
187
- submit_payload = {
188
- "fragment_id": frag_id,
189
- "node_id": NODE_ID,
190
- "status": exec_result["status"],
191
- "result": exec_result.get("result"),
192
- "error": exec_result.get("error") or "",
193
- "stdout": exec_result.get("stdout") or "",
194
- "resource_usage": exec_result.get("resource_usage", {}),
195
- }
196
-
197
- submit_ok = False
198
  for attempt in range(3):
199
  try:
200
- async with httpx.AsyncClient(timeout=30) as submit_client:
201
- sr = await submit_client.post(
202
- f"{HEAD_URL}/api/submit_result",
203
- json=submit_payload
204
- )
205
- if sr.status_code == 200:
206
- print(f"[SACCP] 📤 Result submitted for {frag_id[:20]}...")
207
- submit_ok = True
208
- break
209
- else:
210
- print(f"[SACCP] ⚠️ Submit got {sr.status_code}: {sr.text[:200]}")
211
- except Exception as e:
212
- print(f"[SACCP] ⚠️ Submit attempt {attempt+1} failed: {e}")
213
- await asyncio.sleep(1)
214
-
215
- if not submit_ok:
216
- print(f"[SACCP] ❌ FAILED to submit result for {frag_id} after 3 attempts!")
217
-
218
- except Exception as e:
219
- print(f"[SACCP] Loop error: {e}")
220
- await asyncio.sleep(3)
221
 
 
222
 
223
  @app.on_event("startup")
224
- async def startup():
225
- asyncio.create_task(worker_loop())
226
-
227
 
228
  if __name__ == "__main__":
229
- import uvicorn
230
- uvicorn.run(app, host="0.0.0.0", port=7861)
 
1
  """
2
+ SharePUTER™ v2.1 Worker Node — rank injection for unified execution
 
3
  """
4
 
5
  from fastapi import FastAPI
6
  from fastapi.responses import HTMLResponse
7
  from fastapi.middleware.cors import CORSMiddleware
8
+ import httpx, asyncio, os, sys, io, json, time, traceback, multiprocessing
 
 
9
  from contextlib import redirect_stdout, redirect_stderr
10
  from typing import Optional
11
 
12
app = FastAPI(title="SharePUTER™ v2.1 Worker")
# Wide-open CORS: the dashboard and worker API may be called from any origin.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
14
 
15
# Head-node endpoint and this worker's identity/behavior, all overridable
# via environment variables.  NOTE(review): NODE_ID / REGISTERED globals are
# declared outside this view (see surrounding context) — confirm defaults.
HEAD_URL = os.environ.get("SACCP_HEAD_URL", "http://localhost:7860")
NODE_TYPE = os.environ.get("SACCP_NODE_TYPE", "CPU").upper()
NODE_OWNER = os.environ.get("SACCP_NODE_OWNER", "anonymous")
AUTO_REG = os.environ.get("SACCP_AUTO_REGISTER", "true").lower() == "true"
 
22
  import psutil
23
 
24
def detect_specs():
    """Probe the host hardware once and return a JSON-serializable spec dict.

    Returns:
        dict with CPU count, total/available RAM (GiB), platform string,
        Python version, and GPU fields (populated only when torch reports
        CUDA as available).
    """
    vm = psutil.virtual_memory()
    s = {
        "cpu_count": multiprocessing.cpu_count(),
        "ram_total_gb": round(vm.total / (1024 ** 3), 2),
        "ram_available_gb": round(vm.available / (1024 ** 3), 2),
        "platform": sys.platform,
        "python": sys.version.split()[0],
        "gpu_available": False, "gpu_name": "", "gpu_memory_gb": 0,
    }
    try:
        import torch  # optional; absence simply means "no GPU"
        if torch.cuda.is_available():
            props = torch.cuda.get_device_properties(0)
            s.update(
                gpu_available=True,
                gpu_name=torch.cuda.get_device_name(0),
                # BUG FIX: the attribute is `total_memory`, not `total_mem`.
                # The old spelling raised AttributeError which the bare
                # except swallowed, so GPU specs were never reported.
                gpu_memory_gb=round(props.total_memory / (1024 ** 3), 2),
            )
    except Exception:
        # Best-effort GPU probe: a missing/broken torch must never stop the
        # node from registering as a CPU worker.  (Narrowed from bare
        # `except:` so KeyboardInterrupt/SystemExit still propagate.)
        pass
    return s
37
 
38
def detect_libs():
    """Return the subset of well-known libraries that import successfully.

    The list is advertised to the head node so fragments requiring specific
    libraries are routed only to nodes that actually have them.

    Returns:
        list[str] of PyPI-style distribution names, in candidate order.
    """
    # BUG FIX: PyPI name -> import name where they differ.  The old
    # `lib.replace("-", "_")` produced `scikit_learn` / `pillow`, which can
    # never be imported (real modules are `sklearn` / `PIL`), so those two
    # libraries were never advertised even when installed.
    import_names = {"scikit-learn": "sklearn", "pillow": "PIL"}
    candidates = ["numpy", "pandas", "scipy", "scikit-learn", "torch",
                  "transformers", "tensorflow", "keras", "pillow", "requests",
                  "httpx", "fastapi", "pydantic", "matplotlib", "seaborn"]
    out = []
    for lib in candidates:
        try:
            __import__(import_names.get(lib, lib.replace("-", "_")))
            out.append(lib)
        except Exception:
            # Missing or broken installs are simply not advertised.
            # (Narrowed from bare `except:`.)
            pass
    return out
45
 
46
SPECS = detect_specs()  # hardware snapshot, taken once at import time
LIBS = detect_libs()    # importable libraries, advertised to the head node
 
48
 
49
def safe_execute(code, timeout=600):
    """Execute a work-fragment's Python source and package the outcome.

    SECURITY NOTE: this exec()s code received from the head node with no
    sandboxing whatsoever — only point workers at a trusted head.

    Args:
        code: Python source of the fragment.
        timeout: kept for interface compatibility but NOT enforced — exec()
            runs to completion regardless.  TODO(review): enforce it (e.g.
            run fragments in a subprocess) or remove the parameter.

    Returns:
        dict with keys status, result, stdout, error, execution_time,
        resource_usage.  `result` comes from the first of the fragment's
        `result` / `__saccp_result__` / `output` / `__saccp_results__`
        variables that is set (JSON-serializable values kept as-is, others
        str()-truncated to 5000 chars); falls back to the tail of stdout.
    """
    result = {"status": "completed", "result": None, "stdout": "", "error": None,
              "execution_time": 0, "resource_usage": {}}
    out, err = io.StringIO(), io.StringIO()
    # NOTE(review): stderr is captured in `err` but never reported back;
    # consider folding err.getvalue() into the payload.
    ns = {}
    start = time.time()
    mem0 = psutil.Process().memory_info().rss
    try:
        with redirect_stdout(out), redirect_stderr(err):
            exec(compile(code, "<saccp>", "exec"), ns)  # untrusted-code execution point
        result["execution_time"] = round(time.time() - start, 3)
        result["stdout"] = out.getvalue()
        result["resource_usage"] = {
            "time_s": result["execution_time"],
            "mem_mb": round((psutil.Process().memory_info().rss - mem0) / (1024 ** 2), 2),
        }
        for var in ["result", "__saccp_result__", "output", "__saccp_results__"]:
            if var in ns and ns[var] is not None:
                v = ns[var]
                try:
                    json.dumps(v)  # probe serializability; keep value as-is if OK
                    result["result"] = v
                except Exception:
                    # Narrowed from bare `except:`: only runtime failures fall
                    # back to a truncated repr; interrupts still propagate.
                    result["result"] = str(v)[:5000]
                break
        if result["result"] is None and result["stdout"]:
            result["result"] = result["stdout"].strip()[-5000:]
    except Exception as e:
        result.update(
            status="failed",
            error=f"{type(e).__name__}: {e}\n{traceback.format_exc()[-1500:]}",
            execution_time=round(time.time() - start, 3),
            stdout=out.getvalue(),
        )
    return result
75
 
 
76
@app.get("/", response_class=HTMLResponse)
async def home():
    """Render the one-page status dashboard for this worker node."""
    avail_gb = round(psutil.virtual_memory().available / (1024 ** 3), 2)
    page = f"""<!DOCTYPE html><html><head><title>Worker</title>
<style>body{{font-family:system-ui;background:#0a0a0f;color:#ddd;padding:40px;max-width:600px;margin:0 auto}}
h1{{color:#00ff88}}.s{{background:#111;padding:12px;margin:8px 0;border-radius:8px}}</style></head>
<body><h1>🤖 Worker v2.1</h1><p>ID: <b>{NODE_ID or '—'}</b> | {NODE_TYPE} |
<b style="color:#00ff88">{'ACTIVE' if REGISTERED else 'WAITING'}</b></p>
<div class="s">CPUs: {SPECS['cpu_count']}</div><div class="s">RAM: {avail_gb}/{SPECS['ram_total_gb']}GB</div>
<div class="s">GPU: {SPECS['gpu_name'] or 'None'}</div><div class="s">Libs: {', '.join(LIBS)}</div>
</body></html>"""
    return HTMLResponse(page)
87
 
 
88
@app.get("/health")
async def health():
    """Liveness probe: report this worker's registration state and version."""
    return dict(status="online", node_id=NODE_ID, registered=REGISTERED, v="2.1")
 
91
 
92
async def worker_loop():
    """Background task: register with the head node, then poll for work forever.

    Lifecycle:
      1. Loop until REGISTERED: when SACCP_AUTO_REGISTER is on, POST to
         /api/register_node every ~2 s; otherwise wait for REGISTERED to be
         flipped externally.
      2. Main loop: heartbeat at most every 25 s, fetch a fragment from
         /api/get_work, execute it via safe_execute, and POST the result to
         /api/submit_result with up to 3 retries (fresh client per attempt).
    All network failures are logged and retried; the loop never exits.
    """
    global NODE_ID, REGISTERED
    while not REGISTERED:
        await asyncio.sleep(2)
        if AUTO_REG:
            try:
                async with httpx.AsyncClient(timeout=15) as c:
                    r = await c.post(f"{HEAD_URL}/api/register_node", json={
                        "node_type": NODE_TYPE, "node_url": os.environ.get("SPACE_HOST", ""),
                        "owner": NODE_OWNER, "specs": SPECS, "installed_libs": LIBS})
                if r.status_code == 200:
                    NODE_ID = r.json().get("node_id")
                    REGISTERED = True
                    print(f"[W] Registered {NODE_ID}")
            except Exception as e:
                print(f"[W] Reg fail: {e}")

    print(f"[W] Active: {NODE_ID} ({NODE_TYPE})")
    last_hb = 0
    while True:
        try:
            # Best-effort heartbeat, rate-limited to one per 25 s.
            if time.time() - last_hb > 25:
                try:
                    async with httpx.AsyncClient(timeout=10) as c:
                        await c.post(f"{HEAD_URL}/api/node_heartbeat",
                                     json={"node_id": NODE_ID, "status": "online",
                                           "installed_libs": LIBS})
                    last_hb = time.time()
                except Exception:
                    # BUG FIX: was a bare `except:`, which also swallows
                    # asyncio.CancelledError (a BaseException since 3.8) and
                    # prevents this task from being cancelled cleanly.
                    pass

            # Poll for a fragment matched to this node's capabilities.
            work = None
            async with httpx.AsyncClient(timeout=15) as c:
                r = await c.get(f"{HEAD_URL}/api/get_work", params={
                    "node_id": NODE_ID, "node_type": NODE_TYPE,
                    "has_gpu": str(SPECS['gpu_available']).lower(),
                    "ram_gb": SPECS['ram_available_gb'], "libs": ",".join(LIBS)})
                if r.status_code == 200:
                    work = r.json().get("work")

            if not work:
                await asyncio.sleep(1)
                continue

            fid = work["fragment_id"]
            code = work.get("code", "")
            print(f"[W] {fid[:24]}...")

            result = safe_execute(code, work.get("timeout_seconds", 600))
            print(f"[W] ✓ {fid[:20]}... → {result['status']} ({result['execution_time']}s)")

            payload = {"fragment_id": fid, "node_id": NODE_ID, "status": result["status"],
                       "result": result.get("result"), "error": result.get("error") or "",
                       "stdout": result.get("stdout") or "", "resource_usage": result.get("resource_usage", {})}

            # Submit with a fresh connection per attempt, up to 3 tries.
            for attempt in range(3):
                try:
                    async with httpx.AsyncClient(timeout=30) as sc:
                        sr = await sc.post(f"{HEAD_URL}/api/submit_result", json=payload)
                        if sr.status_code == 200:
                            print(f"[W] 📤 Submitted {fid[:20]}...")
                            break
                        print(f"[W] ⚠ Submit {sr.status_code}: {sr.text[:100]}")
                    # FIX: back off on HTTP error responses too, not only on
                    # raised exceptions, so retries don't hammer the head.
                    await asyncio.sleep(1)
                except Exception as e:
                    print(f"[W] ⚠ Submit attempt {attempt+1}: {e}")
                    await asyncio.sleep(1)
            else:
                # FIX: restore the v2.0.3 diagnostic — a silently dropped
                # result is the worst failure mode for the cluster.
                print(f"[W] ❌ FAILED to submit result for {fid} after 3 attempts!")
        except Exception as e:
            print(f"[W] Err: {e}")
            await asyncio.sleep(3)
146
 
147
@app.on_event("startup")
async def startup():
    """Launch the polling loop as a background task once the app boots."""
    asyncio.create_task(worker_loop())
 
 
149
 
150
if __name__ == "__main__":
    # Local entry point; hosted deployments run uvicorn externally.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7861)