File size: 3,141 Bytes
edc3562
 
 
 
 
 
 
 
 
 
 
 
 
07656ee
edc3562
 
 
 
2688686
 
 
 
edc3562
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
"""
Overseer Worker Dispatch Module
Add this to your overseer to dispatch tasks to worker agents.
"""

import os
import asyncio
import httpx
from typing import List, Optional

# === WORKER CONFIG ===
WORKERS = {
    "friend_hf": os.environ.get(
        "FRIEND_WORKER_URL", "https://hitanshu10-WORKER.hf.space/execute"
    ),
    "local": os.environ.get(
        "LOCAL_WORKER_URL", "https://struggle-trend-possible.ngrok-free.dev/execute"
    ),
    "friend_laptop": os.environ.get(
        "FRIEND_LAPTOP_URL",
        "https://palpitant-silvicolous-jamey.ngrok-free.dev/execute",
    ),
}

ROUND_ROBIN = {"friend_hf": 0, "local": 0}


async def dispatch_to_worker(task: dict, worker_name: Optional[str] = None) -> dict:
    """
    Send task to a worker and get result.

    Args:
        task: Dict with keys like 'instructions', 'context', 'worker_role', 'task_id'
        worker_name: Specific worker to use, or None for round-robin

    Returns:
        Dict with 'status', 'worker_id', 'result'
    """
    # Pick worker
    if worker_name is None:
        worker_name = "friend_hf"  # Default to friend
        # To use both in round-robin:
        # worker_name = min(ROUND_ROBIN, key=ROUND_ROBIN.get)
        # ROUND_ROBIN[worker_name] += 1

    worker_url = WORKERS.get(worker_name)
    if not worker_url:
        return {"status": "error", "error": f"Unknown worker: {worker_name}"}

    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            response = await client.post(worker_url, json=task)
            if response.status_code == 200:
                result = response.json()
                return {
                    "status": "success",
                    "worker_id": worker_name,
                    "result": result.get("result", {}),
                }
            else:
                return {
                    "status": "error",
                    "worker_id": worker_name,
                    "error": f"Worker returned {response.status_code}",
                }
    except Exception as e:
        return {"status": "error", "worker_id": worker_name, "error": str(e)}


async def dispatch_to_all_workers(task: dict) -> List[dict]:
    """Send task to all workers, return all results."""
    results = []
    for worker_name, worker_url in WORKERS.items():
        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                response = await client.post(worker_url, json=task)
                if response.status_code == 200:
                    results.append(
                        {
                            "worker_id": worker_name,
                            "result": response.json().get("result", {}),
                        }
                    )
        except Exception as e:
            results.append({"worker_id": worker_name, "error": str(e)})
    return results


def get_worker_urls() -> dict:
    """Get current worker URLs."""
    return WORKERS.copy()


def set_worker_url(name: str, url: str):
    """Update a worker URL (e.g., after starting ngrok)."""
    WORKERS[name] = url
    print(f"Updated worker '{name}' URL to: {url}")