# Agent-Q3 / hq / compute_router.py
# madDegen's picture
# consolidate: HQ ComputeRouter multi-backend weighted routing
# 216bfa1 verified
"""
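Agent Q3 [HQ] — ComputeRouter

Routes chat-completion requests across four backends: Local Ollama,
HuggingFace, OpenRouter and RunPod. The default "round_robin" strategy is a
weighted random pick per request (not a strict rotation); a request that
fails on the chosen backend is retried once against the local backend.

Strategies: round_robin | local_first | hf_first | runpod_first | load_based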
"""
import asyncio
import logging
import os
import random
from typing import Literal

import httpx
# Routing strategy name, read once at import time ("round_robin" when unset).
STRATEGY = os.environ.get("COMPUTE_STRATEGY", "round_robin")

# Chat-completions endpoint and relative selection weight for each backend.
# The "hf" and "runpod" URLs come from the environment and are empty strings
# when HF_ENDPOINT / RUNPOD_ENDPOINT are not set.
BACKENDS = {
    "local": {
        "url": "http://localhost:11434/v1/chat/completions",
        "weight": 0.40,
    },
    "hf": {
        "url": os.environ.get("HF_ENDPOINT", ""),
        "weight": 0.25,
    },
    "openrouter": {
        "url": "https://openrouter.ai/api/v1/chat/completions",
        "weight": 0.25,
    },
    "runpod": {
        "url": os.environ.get("RUNPOD_ENDPOINT", ""),
        "weight": 0.10,
    },
}

# Model identifier sent for each logical target; both entries can be
# overridden through the environment.
MODEL_MAP = {
    "reasoner": os.environ.get("REASONER_MODEL", "gemma4:e4b-instruct-q4_K_M"),
    "coder": os.environ.get("CODER_MODEL", "qwen3.5:4b-instruct-q4_K_M"),
}
class ComputeRouter:
    """Choose a compute backend per request and forward chat completions to it.

    The strategy is taken from the module-level ``STRATEGY`` value when the
    router is constructed. ``_queue_depths`` counts the requests currently in
    flight per backend; it drives the ``load_based`` strategy and is reported
    by ``health()``.
    """

    # Strategies that always select one fixed backend.
    _PINNED = {
        "local_first": "local",
        "hf_first": "hf",
        "runpod_first": "runpod",
    }

    # Environment variable holding the bearer token for each remote backend.
    _AUTH_ENV = {
        "openrouter": "OPENROUTER_API_KEY",
        "hf": "HF_TOKEN",
        "runpod": "RUNPOD_API_KEY",
    }

    def __init__(self):
        self.strategy = STRATEGY
        self._queue_depths = {k: 0 for k in BACKENDS}

    def _pick_backend(self) -> str:
        """Return the name of the backend to use for the next request.

        Pinned strategies return their fixed backend, ``load_based`` returns
        the backend with the fewest in-flight requests (ties go to the first
        one in dict order), and any other strategy value falls through to a
        weighted random choice over ``BACKENDS``.
        """
        pinned = self._PINNED.get(self.strategy)
        if pinned is not None:
            return pinned
        if self.strategy == "load_based":
            return min(self._queue_depths, key=self._queue_depths.get)
        # "round_robin": weighted random pick, not a strict rotation.
        names = list(BACKENDS)
        weights = [BACKENDS[name]["weight"] for name in names]
        return random.choices(names, weights=weights, k=1)[0]

    async def route(self, messages: list, target: str = "reasoner") -> dict:
        """Send ``messages`` to the selected backend and return the decoded JSON.

        Args:
            messages: Chat messages, passed through unchanged as the
                ``"messages"`` field of the request payload.
            target: Key into ``MODEL_MAP``; unknown values use the
                ``"reasoner"`` model.

        Returns:
            The JSON body of the backend response. If the selected backend
            fails, the JSON body of a second request to the local backend.

        Raises:
            Any exception raised by the fallback request to the local backend
            propagates to the caller.
        """
        backend = self._pick_backend()
        url = BACKENDS[backend]["url"]
        # Resolve the local model once so the primary request and the fallback
        # agree, and an unknown target cannot raise KeyError in the handler.
        local_model = MODEL_MAP.get(target, MODEL_MAP["reasoner"])
        model = local_model
        headers = {"Content-Type": "application/json"}

        token_var = self._AUTH_ENV.get(backend)
        if token_var is not None:
            headers["Authorization"] = f"Bearer {os.getenv(token_var, '')}"
        if backend == "openrouter":
            # OpenRouter uses its own model identifiers.
            if target == "reasoner":
                model = "google/gemma-3-4b-it"
            else:
                model = "qwen/qwen-2.5-coder-7b-instruct"

        payload = {"model": model, "messages": messages}
        self._queue_depths[backend] += 1
        try:
            async with httpx.AsyncClient(timeout=60) as client:
                r = await client.post(url, json=payload, headers=headers)
                r.raise_for_status()
                return r.json()
        except Exception as e:
            # Record why the primary attempt failed instead of dropping it.
            logging.getLogger(__name__).warning(
                "backend %r failed (%s); falling back to local", backend, e
            )
            # Fallback to local. The response is returned as-is, without a
            # status check, and the in-flight counter stays on the backend
            # that was originally chosen.
            async with httpx.AsyncClient(timeout=60) as client:
                r = await client.post(
                    BACKENDS["local"]["url"],
                    json={"model": local_model, "messages": messages},
                )
                return r.json()
        finally:
            self._queue_depths[backend] -= 1

    def health(self) -> dict:
        """Return each backend's configured weight and in-flight request count."""
        return {
            name: {"weight": cfg["weight"], "queue": self._queue_depths[name]}
            for name, cfg in BACKENDS.items()
        }