"""
SharePUTER™ v2.1 Worker Node — rank injection for unified execution

Registers with the head node, polls it for work fragments, executes them
in-process and submits the results back.
"""
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
import httpx, asyncio, os, sys, io, json, time, traceback, multiprocessing
from contextlib import redirect_stdout, redirect_stderr
from typing import Optional
# ASGI application object; CORS is opened to every origin, method and header.
app = FastAPI(title="SharePUTER™ v2.1 Worker")
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
# Configuration is read from the environment once, at import time.
# Base URL of the head node that this worker registers with and polls.
HEAD_URL = os.environ.get("SACCP_HEAD_URL", "https://bc-ai-saccp-head.hf.space")
# Node type label sent to the head node, upper-cased (defaults to "CPU").
NODE_TYPE = os.environ.get("SACCP_NODE_TYPE", "CPU").upper()
# Owner name reported at registration.
NODE_OWNER = os.environ.get("SACCP_NODE_OWNER", "Keeby1237")
# Only the exact string "true" (case-insensitive) enables auto-registration.
AUTO_REG = os.environ.get("SACCP_AUTO_REGISTER", "true").lower() == "true"
# Mutable module state, updated by worker_loop() once registration succeeds.
NODE_ID: Optional[str] = None
REGISTERED = False
import psutil
def detect_specs():
    """Collect a snapshot of this machine's hardware and runtime details.

    Returns:
        dict with the keys ``cpu_count``, ``ram_total_gb``,
        ``ram_available_gb``, ``platform``, ``python``, ``gpu_available``,
        ``gpu_name`` and ``gpu_memory_gb``. The GPU fields keep their
        "no GPU" defaults unless torch is importable and reports CUDA.
    """
    vm = psutil.virtual_memory()
    gib = 1024 ** 3
    s = {
        "cpu_count": multiprocessing.cpu_count(),
        "ram_total_gb": round(vm.total / gib, 2),
        "ram_available_gb": round(vm.available / gib, 2),
        "platform": sys.platform,
        "python": sys.version.split()[0],
        "gpu_available": False,
        "gpu_name": "",
        "gpu_memory_gb": 0,
    }
    try:
        import torch
        if torch.cuda.is_available():
            props = torch.cuda.get_device_properties(0)
            # The attribute is `total_memory`; the previous `total_mem` raised
            # AttributeError, which was swallowed, so GPUs were never reported.
            s.update(
                gpu_available=True,
                gpu_name=torch.cuda.get_device_name(0),
                gpu_memory_gb=round(props.total_memory / gib, 2),
            )
    except ImportError:
        # torch is optional; its absence just means "no GPU info".
        pass
    except Exception as e:
        # Keep detection best-effort, but do not hide why it failed.
        print(f"[W] GPU detection failed: {type(e).__name__}: {e}")
    return s
def detect_libs(candidates=None):
    """Return the subset of well-known libraries that can be imported here.

    Args:
        candidates: optional iterable of distribution names to probe. When
            omitted, the built-in list of common data/ML/web libraries is used.

    Returns:
        list of the candidate names (as given) whose module imported
        successfully, in probe order.
    """
    import importlib

    # Distribution names whose importable module name is not simply the
    # name with "-" replaced by "_". Without this mapping these two could
    # never be detected.
    import_names = {"scikit-learn": "sklearn", "pillow": "PIL"}
    if candidates is None:
        candidates = [
            "numpy", "pandas", "scipy", "scikit-learn", "torch", "transformers",
            "tensorflow", "keras", "pillow", "requests", "httpx", "fastapi",
            "pydantic", "matplotlib", "seaborn",
        ]
    out = []
    for lib in candidates:
        module_name = import_names.get(lib, lib.replace("-", "_"))
        try:
            importlib.import_module(module_name)
        except Exception:
            # Missing or broken installs both count as "not available".
            continue
        out.append(lib)
    return out
# Probe the host once at import time; the worker loop and the status page
# reuse these snapshots rather than re-detecting on every request.
SPECS = detect_specs()
LIBS = detect_libs()
def safe_execute(code, timeout=600):
    """Run a fragment of Python source in-process and report the outcome.

    SECURITY NOTE(review): ``code`` is executed with ``exec`` inside this
    process, with the worker's full privileges and no sandbox. Only run
    fragments from a trusted source.

    Args:
        code: Python source text to execute in a fresh, empty namespace.
        timeout: accepted for interface compatibility. It is NOT enforced;
            a fragment that never returns blocks the caller indefinitely.

    Returns:
        dict with keys ``status`` ("completed" or "failed"), ``result``,
        ``stdout``, ``error``, ``execution_time`` (seconds) and
        ``resource_usage`` (``time_s`` and ``mem_mb``, filled on success).
        ``result`` is taken from the first non-None variable among
        ``result``, ``__saccp_result__``, ``output``, ``__saccp_results__``
        in the fragment's namespace, falling back to the tail of stdout.
    """
    result = {"status": "completed", "result": None, "stdout": "", "error": None,
              "execution_time": 0, "resource_usage": {}}
    # stderr is captured so it does not leak into the worker's own output;
    # it is not included in the returned dict.
    out, err = io.StringIO(), io.StringIO()
    ns = {}
    start = time.time()
    mem0 = psutil.Process().memory_info().rss
    try:
        with redirect_stdout(out), redirect_stderr(err):
            try:
                exec(compile(code, "<saccp>", "exec"), ns)
            except SystemExit as stop:
                # SystemExit is not an Exception subclass; left unhandled it
                # would escape this function and could stop the worker.
                # A zero/None exit code is treated as a normal finish.
                if stop.code not in (None, 0):
                    raise RuntimeError(f"fragment called exit({stop.code!r})") from None
        result["execution_time"] = round(time.time() - start, 3)
        result["stdout"] = out.getvalue()
        result["resource_usage"] = {
            "time_s": result["execution_time"],
            "mem_mb": round((psutil.Process().memory_info().rss - mem0) / (1024**2), 2),
        }
        for var in ("result", "__saccp_result__", "output", "__saccp_results__"):
            v = ns.get(var)
            if v is None:
                continue
            try:
                # Probe serializability only; the value itself is returned.
                json.dumps(v)
                result["result"] = v
            except Exception:
                # Not JSON-serializable: fall back to a truncated repr.
                result["result"] = str(v)[:5000]
            break
        if result["result"] is None and result["stdout"]:
            result["result"] = result["stdout"].strip()[-5000:]
    except Exception as e:
        result.update(status="failed",
                      error=f"{type(e).__name__}: {e}\n{traceback.format_exc()[-1500:]}",
                      execution_time=round(time.time() - start, 3),
                      stdout=out.getvalue())
    return result
@app.get("/", response_class=HTMLResponse)
async def home():
    """Render a small HTML status page for this worker.

    Shows the node id and type, whether the node is registered, CPU count,
    currently available vs. total RAM, GPU name and the detected libraries.
    Values that originate outside this process (the node id assigned by the
    head node, environment-derived settings, library and GPU names) are
    HTML-escaped before being interpolated into the page.
    """
    from html import escape

    ram = round(psutil.virtual_memory().available/(1024**3), 2)
    node_id = escape(str(NODE_ID)) if NODE_ID else '—'
    node_type = escape(str(NODE_TYPE))
    state = 'ACTIVE' if REGISTERED else 'WAITING'
    gpu = escape(str(SPECS['gpu_name'])) if SPECS['gpu_name'] else 'None'
    libs = escape(', '.join(LIBS))
    return HTMLResponse(f"""<!DOCTYPE html><html><head><title>Worker</title>
<style>body{{font-family:system-ui;background:#0a0a0f;color:#ddd;padding:40px;max-width:600px;margin:0 auto}}
h1{{color:#00ff88}}.s{{background:#111;padding:12px;margin:8px 0;border-radius:8px}}</style></head>
<body><h1>🤖 Worker v2.1</h1><p>ID: <b>{node_id}</b> | {node_type} |
<b style="color:#00ff88">{state}</b></p>
<div class="s">CPUs: {SPECS['cpu_count']}</div><div class="s">RAM: {ram}/{SPECS['ram_total_gb']}GB</div>
<div class="s">GPU: {gpu}</div><div class="s">Libs: {libs}</div>
</body></html>""")
@app.get("/health")
async def health():
    """Report liveness plus the current registration state as JSON."""
    status_report = {
        "status": "online",
        "node_id": NODE_ID,
        "registered": REGISTERED,
        "v": "2.1",
    }
    return status_report
async def worker_loop():
    """Background task: register with the head node, then poll for work forever.

    Phase 1 retries registration every 2 seconds until ``REGISTERED`` is set.
    When ``AUTO_REG`` is false this loop does not register by itself and just
    waits for ``REGISTERED`` to become true.

    Phase 2 loops indefinitely: sends a heartbeat when more than 25 seconds
    have passed since the last successful one, asks the head node for a work
    fragment, executes it and submits the result (up to 3 attempts).

    Fragment execution is blocking, so it is run in the default executor to
    keep the event loop (and therefore the HTTP endpoints) responsive.
    NOTE(review): heartbeats are still not sent while a fragment is running,
    because the loop awaits the execution before its next iteration.
    """
    global NODE_ID, REGISTERED

    while not REGISTERED:
        await asyncio.sleep(2)
        if not AUTO_REG:
            continue
        try:
            async with httpx.AsyncClient(timeout=15) as c:
                r = await c.post(f"{HEAD_URL}/api/register_node", json={
                    "node_type": NODE_TYPE, "node_url": os.environ.get("SPACE_HOST", ""),
                    "owner": NODE_OWNER, "specs": SPECS, "installed_libs": LIBS})
            if r.status_code == 200:
                NODE_ID = r.json().get("node_id")
                REGISTERED = True
                print(f"[W] Registered {NODE_ID}")
            else:
                # Previously a rejected registration was retried silently.
                print(f"[W] Reg fail: HTTP {r.status_code}")
        except Exception as e:
            print(f"[W] Reg fail: {e}")

    print(f"[W] Active: {NODE_ID} ({NODE_TYPE})")
    loop = asyncio.get_running_loop()
    last_hb = 0
    while True:
        try:
            if time.time() - last_hb > 25:
                try:
                    async with httpx.AsyncClient(timeout=10) as c:
                        await c.post(f"{HEAD_URL}/api/node_heartbeat", json={"node_id": NODE_ID, "status": "online", "installed_libs": LIBS})
                    last_hb = time.time()
                except Exception as e:
                    # Best-effort, but visible. A bare `except` here would
                    # also swallow task cancellation.
                    print(f"[W] Heartbeat fail: {e}")

            work = None
            async with httpx.AsyncClient(timeout=15) as c:
                r = await c.get(f"{HEAD_URL}/api/get_work", params={
                    "node_id": NODE_ID, "node_type": NODE_TYPE,
                    "has_gpu": str(SPECS['gpu_available']).lower(),
                    "ram_gb": SPECS['ram_available_gb'], "libs": ",".join(LIBS)})
            if r.status_code == 200:
                work = r.json().get("work")
            if not work:
                await asyncio.sleep(1)
                continue

            fid = work["fragment_id"]
            code = work.get("code", "")
            print(f"[W] ▶ {fid[:24]}...")
            # safe_execute blocks for the whole run of the fragment; calling
            # it directly would freeze every request handler in this process.
            result = await loop.run_in_executor(
                None, safe_execute, code, work.get("timeout_seconds", 600))
            print(f"[W] ✓ {fid[:20]}... → {result['status']} ({result['execution_time']}s)")

            payload = {"fragment_id": fid, "node_id": NODE_ID, "status": result["status"],
                       "result": result.get("result"), "error": result.get("error") or "",
                       "stdout": result.get("stdout") or "", "resource_usage": result.get("resource_usage", {})}
            for attempt in range(3):
                try:
                    async with httpx.AsyncClient(timeout=30) as sc:
                        sr = await sc.post(f"{HEAD_URL}/api/submit_result", json=payload)
                    if sr.status_code == 200:
                        print(f"[W] 📤 Submitted {fid[:20]}...")
                        break
                    print(f"[W] ⚠ Submit {sr.status_code}: {sr.text[:100]}")
                except Exception as e:
                    print(f"[W] ⚠ Submit attempt {attempt+1}: {e}")
                # Pause before the next attempt, whether the failure was an
                # exception or a non-200 response.
                await asyncio.sleep(1)
            else:
                # No attempt succeeded: make the lost result visible.
                print(f"[W] ✗ Submit gave up for {fid[:20]}... after 3 attempts")
        except Exception as e:
            print(f"[W] Err: {e}")
            await asyncio.sleep(3)
@app.on_event("startup")
async def startup():
    """Start the background worker loop when the application starts."""
    # The event loop keeps only weak references to tasks. Store a strong
    # reference so the worker task cannot be garbage-collected mid-run.
    app.state.worker_task = asyncio.create_task(worker_loop())
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 7861.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7861)