Spaces:
Runtime error
Runtime error
File size: 3,141 Bytes
edc3562 07656ee edc3562 2688686 edc3562 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 | """
Overseer Worker Dispatch Module
Add this to your overseer to dispatch tasks to worker agents.
"""
import os
import asyncio
import httpx
from typing import List, Optional
# === WORKER CONFIG ===
WORKERS = {
"friend_hf": os.environ.get(
"FRIEND_WORKER_URL", "https://hitanshu10-WORKER.hf.space/execute"
),
"local": os.environ.get(
"LOCAL_WORKER_URL", "https://struggle-trend-possible.ngrok-free.dev/execute"
),
"friend_laptop": os.environ.get(
"FRIEND_LAPTOP_URL",
"https://palpitant-silvicolous-jamey.ngrok-free.dev/execute",
),
}
ROUND_ROBIN = {"friend_hf": 0, "local": 0}
async def dispatch_to_worker(task: dict, worker_name: Optional[str] = None) -> dict:
"""
Send task to a worker and get result.
Args:
task: Dict with keys like 'instructions', 'context', 'worker_role', 'task_id'
worker_name: Specific worker to use, or None for round-robin
Returns:
Dict with 'status', 'worker_id', 'result'
"""
# Pick worker
if worker_name is None:
worker_name = "friend_hf" # Default to friend
# To use both in round-robin:
# worker_name = min(ROUND_ROBIN, key=ROUND_ROBIN.get)
# ROUND_ROBIN[worker_name] += 1
worker_url = WORKERS.get(worker_name)
if not worker_url:
return {"status": "error", "error": f"Unknown worker: {worker_name}"}
try:
async with httpx.AsyncClient(timeout=60.0) as client:
response = await client.post(worker_url, json=task)
if response.status_code == 200:
result = response.json()
return {
"status": "success",
"worker_id": worker_name,
"result": result.get("result", {}),
}
else:
return {
"status": "error",
"worker_id": worker_name,
"error": f"Worker returned {response.status_code}",
}
except Exception as e:
return {"status": "error", "worker_id": worker_name, "error": str(e)}
async def dispatch_to_all_workers(task: dict) -> List[dict]:
"""Send task to all workers, return all results."""
results = []
for worker_name, worker_url in WORKERS.items():
try:
async with httpx.AsyncClient(timeout=60.0) as client:
response = await client.post(worker_url, json=task)
if response.status_code == 200:
results.append(
{
"worker_id": worker_name,
"result": response.json().get("result", {}),
}
)
except Exception as e:
results.append({"worker_id": worker_name, "error": str(e)})
return results
def get_worker_urls() -> dict:
"""Get current worker URLs."""
return WORKERS.copy()
def set_worker_url(name: str, url: str):
"""Update a worker URL (e.g., after starting ngrok)."""
WORKERS[name] = url
print(f"Updated worker '{name}' URL to: {url}")
|