YashashMathur commited on
Commit
edc3562
·
verified ·
1 Parent(s): 23c3a1b

Add worker client with local ngrok URL

Browse files
Files changed (1) hide show
  1. overseer_worker_client.py +93 -0
overseer_worker_client.py ADDED
@@ -0,0 +1,93 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Overseer Worker Dispatch Module
3
+ Add this to your overseer to dispatch tasks to worker agents.
4
+ """
5
+
6
+ import os
7
+ import asyncio
8
+ import httpx
9
+ from typing import List, Optional
10
+
11
+ # === WORKER CONFIG ===
12
+ WORKERS = {
13
+ "friend_hf": os.environ.get(
14
+ "FRIEND_WORKER_URL", "https://friend-space.hf.space/execute"
15
+ ),
16
+ "local": os.environ.get(
17
+ "LOCAL_WORKER_URL", "https://struggle-trend-possible.ngrok-free.dev/execute"
18
+ ),
19
+ }
20
+
21
+ ROUND_ROBIN = {"friend_hf": 0, "local": 0}
22
+
23
+
24
+ async def dispatch_to_worker(task: dict, worker_name: Optional[str] = None) -> dict:
25
+ """
26
+ Send task to a worker and get result.
27
+
28
+ Args:
29
+ task: Dict with keys like 'instructions', 'context', 'worker_role', 'task_id'
30
+ worker_name: Specific worker to use, or None for round-robin
31
+
32
+ Returns:
33
+ Dict with 'status', 'worker_id', 'result'
34
+ """
35
+ # Pick worker
36
+ if worker_name is None:
37
+ worker_name = "friend_hf" # Default to friend
38
+ # To use both in round-robin:
39
+ # worker_name = min(ROUND_ROBIN, key=ROUND_ROBIN.get)
40
+ # ROUND_ROBIN[worker_name] += 1
41
+
42
+ worker_url = WORKERS.get(worker_name)
43
+ if not worker_url:
44
+ return {"status": "error", "error": f"Unknown worker: {worker_name}"}
45
+
46
+ try:
47
+ async with httpx.AsyncClient(timeout=60.0) as client:
48
+ response = await client.post(worker_url, json=task)
49
+ if response.status_code == 200:
50
+ result = response.json()
51
+ return {
52
+ "status": "success",
53
+ "worker_id": worker_name,
54
+ "result": result.get("result", {}),
55
+ }
56
+ else:
57
+ return {
58
+ "status": "error",
59
+ "worker_id": worker_name,
60
+ "error": f"Worker returned {response.status_code}",
61
+ }
62
+ except Exception as e:
63
+ return {"status": "error", "worker_id": worker_name, "error": str(e)}
64
+
65
+
66
+ async def dispatch_to_all_workers(task: dict) -> List[dict]:
67
+ """Send task to all workers, return all results."""
68
+ results = []
69
+ for worker_name, worker_url in WORKERS.items():
70
+ try:
71
+ async with httpx.AsyncClient(timeout=60.0) as client:
72
+ response = await client.post(worker_url, json=task)
73
+ if response.status_code == 200:
74
+ results.append(
75
+ {
76
+ "worker_id": worker_name,
77
+ "result": response.json().get("result", {}),
78
+ }
79
+ )
80
+ except Exception as e:
81
+ results.append({"worker_id": worker_name, "error": str(e)})
82
+ return results
83
+
84
+
85
+ def get_worker_urls() -> dict:
86
+ """Get current worker URLs."""
87
+ return WORKERS.copy()
88
+
89
+
90
+ def set_worker_url(name: str, url: str):
91
+ """Update a worker URL (e.g., after starting ngrok)."""
92
+ WORKERS[name] = url
93
+ print(f"Updated worker '{name}' URL to: {url}")