File size: 7,108 Bytes
7955aea
eef72cf
7955aea
 
efc752a
63236b6
efc752a
eef72cf
7955aea
 
 
eef72cf
efc752a
7955aea
b7cf7c2
efc752a
f7228a7
efc752a
7955aea
 
 
63236b6
712d1a7
7955aea
eef72cf
 
 
 
 
7955aea
 
 
eef72cf
 
 
efc752a
 
 
63236b6
eef72cf
 
 
 
63236b6
712d1a7
7955aea
eef72cf
7955aea
eef72cf
63236b6
 
eef72cf
 
efc752a
eef72cf
7955aea
eef72cf
 
efc752a
eef72cf
 
 
 
 
 
 
 
7955aea
 
712d1a7
7955aea
eef72cf
 
7955aea
 
 
eef72cf
 
63236b6
712d1a7
63236b6
eef72cf
 
 
 
712d1a7
7955aea
 
 
eef72cf
7955aea
 
 
 
712d1a7
efc752a
7955aea
efc752a
 
eef72cf
 
 
 
7955aea
eef72cf
 
7955aea
 
efc752a
7955aea
efc752a
eef72cf
efc752a
eef72cf
efc752a
63236b6
efc752a
63236b6
 
 
eef72cf
 
63236b6
eef72cf
63236b6
eef72cf
63236b6
eef72cf
63236b6
eef72cf
 
63236b6
eef72cf
 
 
63236b6
 
 
eef72cf
 
 
 
 
7955aea
eef72cf
712d1a7
7955aea
eef72cf
712d1a7
7955aea
eef72cf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
"""
SharePUTER™ v2.1 Worker Node — rank injection for unified execution
"""

from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
import httpx, asyncio, os, sys, io, json, time, traceback, multiprocessing
from contextlib import redirect_stdout, redirect_stderr
from typing import Optional

# ASGI application. CORS is configured wide open: any origin, method and header.
app = FastAPI(title="SharePUTER™ v2.1 Worker")
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])

# Runtime configuration, read once from the environment at import time.
# Base URL of the head node that worker_loop() registers with and polls.
HEAD_URL = os.environ.get("SACCP_HEAD_URL", "https://bc-ai-saccp-head.hf.space")
# Node type reported to the head; always upper-cased (default "CPU").
NODE_TYPE = os.environ.get("SACCP_NODE_TYPE", "CPU").upper()
# Owner name sent in the registration request.
NODE_OWNER = os.environ.get("SACCP_NODE_OWNER", "Keeby1237")
# Only the string "true" (case-insensitive) enables automatic registration.
AUTO_REG = os.environ.get("SACCP_AUTO_REGISTER", "true").lower() == "true"
# Filled in by worker_loop() from the head's registration response.
NODE_ID: Optional[str] = None
# Flipped to True by worker_loop() after a successful registration.
REGISTERED = False

import psutil

def detect_specs():
    """Collect a snapshot of this machine's hardware and runtime details.

    Returns:
        dict with the keys ``cpu_count``, ``ram_total_gb``,
        ``ram_available_gb``, ``platform``, ``python``, ``gpu_available``,
        ``gpu_name`` and ``gpu_memory_gb``. The GPU fields keep their
        "no GPU" defaults unless PyTorch can be imported and reports a
        CUDA device.
    """
    gib = 1024 ** 3
    vm = psutil.virtual_memory()
    s = {
        "cpu_count": multiprocessing.cpu_count(),
        "ram_total_gb": round(vm.total / gib, 2),
        "ram_available_gb": round(vm.available / gib, 2),
        "platform": sys.platform,
        "python": sys.version.split()[0],
        "gpu_available": False,
        "gpu_name": "",
        "gpu_memory_gb": 0,
    }
    try:
        import torch
    except ImportError:
        # PyTorch is optional; without it the node simply reports no GPU.
        return s
    try:
        if torch.cuda.is_available():
            props = torch.cuda.get_device_properties(0)
            # The attribute is `total_memory`; the former `total_mem` raised
            # AttributeError, which was silently swallowed and made every
            # GPU node report gpu_available=False.
            s.update(gpu_available=True, gpu_name=torch.cuda.get_device_name(0),
                     gpu_memory_gb=round(props.total_memory / gib, 2))
    except Exception as e:
        # GPU probing stays best-effort, but the reason is no longer hidden.
        print(f"[W] GPU detection failed: {e}")
    return s

def detect_libs(candidates=None):
    """Return the libraries from a candidate list that can be imported here.

    Args:
        candidates: optional iterable of distribution names to probe. When
            omitted, a built-in list of common data/ML/web libraries is used.

    Returns:
        list of the candidate names whose module imported successfully, in
        probe order.
    """
    import importlib

    # Distributions whose importable module name differs from the package
    # name. Without this mapping "scikit-learn" and "pillow" were probed as
    # `scikit_learn` / `pillow` and therefore never detected.
    import_names = {"scikit-learn": "sklearn", "pillow": "PIL"}
    if candidates is None:
        candidates = ["numpy", "pandas", "scipy", "scikit-learn", "torch", "transformers",
                      "tensorflow", "keras", "pillow", "requests", "httpx", "fastapi",
                      "pydantic", "matplotlib", "seaborn"]
    out = []
    for lib in candidates:
        module_name = import_names.get(lib, lib.replace("-", "_"))
        try:
            importlib.import_module(module_name)
        except Exception:
            # A missing or broken install (which may raise something other
            # than ImportError) both mean the library is unusable here.
            continue
        out.append(lib)
    return out

# Both values are computed once at import time and reused for the whole
# process lifetime; they are sent to the head at registration and (in part)
# with every work request, so e.g. "ram_available_gb" is a startup snapshot.
SPECS = detect_specs()
LIBS = detect_libs()

def safe_execute(code, timeout=600):
    """Run a code fragment with ``exec`` and report what happened.

    NOTE(security): despite the name, nothing here is sandboxed. The fragment
    runs inside this process with full access to its globals, filesystem and
    network; it is only as trustworthy as whoever supplies ``code``.

    Args:
        code: Python source text to execute in a fresh, empty namespace.
        timeout: wall-clock limit in seconds. Enforced best-effort through
            ``SIGALRM``, which is only possible on platforms that provide it
            and when called from the main thread; otherwise the fragment runs
            unbounded. A falsy or non-numeric value disables the limit.

    Returns:
        dict with ``status`` ("completed" or "failed"), ``result``,
        ``stdout``, ``stderr``, ``error``, ``execution_time`` (seconds) and
        ``resource_usage``. ``result`` is the first non-None value among the
        namespace variables ``result``, ``__saccp_result__``, ``output`` and
        ``__saccp_results__`` (stringified and truncated to 5000 chars when
        not JSON-serialisable); failing that, the last 5000 chars of stdout.
    """
    import signal
    import threading

    result = {"status": "completed", "result": None, "stdout": "", "stderr": "", "error": None,
              "execution_time": 0, "resource_usage": {}}
    out, err = io.StringIO(), io.StringIO()
    ns = {}

    try:
        limit = float(timeout) if timeout else 0.0
    except (TypeError, ValueError):
        limit = 0.0
    # The chained comparison also rejects NaN and infinity.
    use_alarm = (0 < limit < float("inf")
                 and hasattr(signal, "SIGALRM") and hasattr(signal, "setitimer")
                 and threading.current_thread() is threading.main_thread())

    def _on_alarm(signum, frame):
        # Raised inside the running fragment; surfaces below as a failure.
        raise TimeoutError(f"fragment exceeded timeout of {limit:g}s")

    proc = psutil.Process()
    start = time.time()
    mem0 = proc.memory_info().rss
    try:
        previous = None
        if use_alarm:
            previous = signal.signal(signal.SIGALRM, _on_alarm)
            signal.setitimer(signal.ITIMER_REAL, limit)
        try:
            with redirect_stdout(out), redirect_stderr(err):
                exec(compile(code, "<saccp>", "exec"), ns)
        finally:
            if use_alarm:
                # Disarm first so the alarm cannot fire after the handler is restored.
                signal.setitimer(signal.ITIMER_REAL, 0)
                signal.signal(signal.SIGALRM, signal.SIG_DFL if previous is None else previous)

        result["execution_time"] = round(time.time() - start, 3)
        result["stdout"] = out.getvalue()
        result["stderr"] = err.getvalue()
        result["resource_usage"] = {"time_s": result["execution_time"],
            "mem_mb": round((proc.memory_info().rss - mem0)/(1024**2), 2)}
        for var in ("result", "__saccp_result__", "output", "__saccp_results__"):
            v = ns.get(var)
            if v is None:
                continue
            try:
                json.dumps(v)
                result["result"] = v
            except Exception:
                # Not JSON-serialisable: fall back to a bounded string form.
                result["result"] = str(v)[:5000]
            break
        if result["result"] is None and result["stdout"]:
            result["result"] = result["stdout"].strip()[-5000:]
    except (Exception, SystemExit) as e:
        # SystemExit is caught on purpose: a fragment calling sys.exit() must
        # fail that fragment, not terminate the worker.
        result.update(status="failed", error=f"{type(e).__name__}: {e}\n{traceback.format_exc()[-1500:]}",
                      execution_time=round(time.time()-start, 3), stdout=out.getvalue(),
                      stderr=err.getvalue())
    return result

@app.get("/", response_class=HTMLResponse)
async def home():
    # Human-readable status page: node identity, registration state and the
    # hardware/library snapshot taken at startup. Available RAM is the only
    # value re-measured on every request.
    free_gb = round(psutil.virtual_memory().available/(1024**3), 2)
    node_label = NODE_ID or '—'
    state = 'ACTIVE' if REGISTERED else 'WAITING'
    cpu_count = SPECS['cpu_count']
    total_gb = SPECS['ram_total_gb']
    gpu_label = SPECS['gpu_name'] or 'None'
    lib_list = ', '.join(LIBS)
    page = f"""<!DOCTYPE html><html><head><title>Worker</title>
<style>body{{font-family:system-ui;background:#0a0a0f;color:#ddd;padding:40px;max-width:600px;margin:0 auto}}
h1{{color:#00ff88}}.s{{background:#111;padding:12px;margin:8px 0;border-radius:8px}}</style></head>
<body><h1>🤖 Worker v2.1</h1><p>ID: <b>{node_label}</b> | {NODE_TYPE} |
<b style="color:#00ff88">{state}</b></p>
<div class="s">CPUs: {cpu_count}</div><div class="s">RAM: {free_gb}/{total_gb}GB</div>
<div class="s">GPU: {gpu_label}</div><div class="s">Libs: {lib_list}</div>
</body></html>"""
    return HTMLResponse(page)

@app.get("/health")
async def health():
    # Liveness probe: always reports "online" together with the current
    # node id, registration flag and worker version.
    report = dict(status="online", node_id=NODE_ID, registered=REGISTERED, v="2.1")
    return report

async def worker_loop():
    """Register with the head node, then poll it for work forever.

    Phase 1 retries registration every 2 seconds until the head answers with
    HTTP 200 (only attempted when AUTO_REG is enabled). Phase 2 loops
    indefinitely: send a heartbeat roughly every 25 seconds, ask for a work
    fragment, execute it with safe_execute() and submit the outcome (up to
    three attempts).

    Any error inside one iteration is logged and followed by a short pause;
    the loop itself never returns. Task cancellation is allowed to propagate.
    """
    global NODE_ID, REGISTERED

    # ---- Phase 1: registration ----
    while not REGISTERED:
        await asyncio.sleep(2)
        if not AUTO_REG:
            continue
        try:
            async with httpx.AsyncClient(timeout=15) as c:
                r = await c.post(f"{HEAD_URL}/api/register_node", json={
                    "node_type": NODE_TYPE, "node_url": os.environ.get("SPACE_HOST", ""),
                    "owner": NODE_OWNER, "specs": SPECS, "installed_libs": LIBS})
                if r.status_code == 200:
                    NODE_ID = r.json().get("node_id")
                    REGISTERED = True
                    print(f"[W] Registered {NODE_ID}")
                else:
                    # Previously a rejected registration retried in silence.
                    print(f"[W] Reg fail: HTTP {r.status_code}")
        except Exception as e:
            print(f"[W] Reg fail: {e}")

    # ---- Phase 2: heartbeat / fetch / execute / submit ----
    print(f"[W] Active: {NODE_ID} ({NODE_TYPE})")
    last_hb = 0
    while True:
        try:
            if time.time() - last_hb > 25:
                try:
                    async with httpx.AsyncClient(timeout=10) as c:
                        await c.post(f"{HEAD_URL}/api/node_heartbeat",
                                     json={"node_id": NODE_ID, "status": "online", "installed_libs": LIBS})
                    last_hb = time.time()
                except Exception as e:
                    # Heartbeats are best-effort. Catch Exception rather than
                    # using a bare `except`, which also swallowed
                    # asyncio.CancelledError and made the task uncancellable
                    # while a heartbeat was in flight.
                    print(f"[W] Heartbeat fail: {e}")

            work = None
            async with httpx.AsyncClient(timeout=15) as c:
                r = await c.get(f"{HEAD_URL}/api/get_work", params={
                    "node_id": NODE_ID, "node_type": NODE_TYPE,
                    "has_gpu": str(SPECS['gpu_available']).lower(),
                    "ram_gb": SPECS['ram_available_gb'], "libs": ",".join(LIBS)})
                if r.status_code == 200:
                    work = r.json().get("work")

            if not work:
                await asyncio.sleep(1)
                continue

            fid = work["fragment_id"]
            code = work.get("code", "")
            print(f"[W] ▶ {fid[:24]}...")

            # safe_execute() is synchronous, so the event loop (including the
            # HTTP endpoints) is blocked while the fragment runs.
            result = safe_execute(code, work.get("timeout_seconds", 600))
            print(f"[W] ✓ {fid[:20]}... → {result['status']} ({result['execution_time']}s)")

            payload = {"fragment_id": fid, "node_id": NODE_ID, "status": result["status"],
                       "result": result.get("result"), "error": result.get("error") or "",
                       "stdout": result.get("stdout") or "", "resource_usage": result.get("resource_usage", {})}

            for attempt in range(3):
                try:
                    async with httpx.AsyncClient(timeout=30) as sc:
                        sr = await sc.post(f"{HEAD_URL}/api/submit_result", json=payload)
                    if sr.status_code == 200:
                        print(f"[W] 📤 Submitted {fid[:20]}...")
                        break
                    print(f"[W] ⚠ Submit {sr.status_code}: {sr.text[:100]}")
                except Exception as e:
                    print(f"[W] ⚠ Submit attempt {attempt+1}: {e}")
                # Pause before every retry, including after a non-200 reply,
                # which used to be retried immediately.
                await asyncio.sleep(1)
            else:
                # All attempts failed: say so instead of dropping the result silently.
                print(f"[W] ✗ Giving up on {fid[:20]}... after 3 submit attempts")

        except Exception as e:
            print(f"[W] Err: {e}")
            await asyncio.sleep(3)

@app.on_event("startup")
async def startup():
    # Launch the background worker when the server starts. The task is stored
    # on app.state because the event loop keeps only weak references to
    # tasks; an unreferenced task may be garbage-collected mid-execution.
    app.state.worker_task = asyncio.create_task(worker_loop())

if __name__ == "__main__":
    # Direct script execution: serve the app on all interfaces, port 7861.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7861)