|
|
| from __future__ import annotations
|
| from typing import Any, Dict, List, Optional
|
| import os, json, requests
|
| from tenacity import retry, stop_after_attempt, wait_exponential
|
|
|
# gradio_client is an optional dependency. When it cannot be imported for any
# reason, GradioClient is bound to None so the rest of the module can test
# `GradioClient is not None` before choosing the Gradio transport.
try:
    from gradio_client import Client as GradioClient
except Exception:
    GradioClient = None
|
|
|
class BaseRemoteClient:
    """Shared plumbing for clients that talk to a remote model endpoint.

    Two transports are available: a Gradio client (used when ``use_gradio``
    is requested and ``GradioClient`` was importable) and plain JSON POSTs
    issued with ``requests``.
    """

    def __init__(
        self,
        base_url: str,
        use_gradio: bool = True,
        hf_token: Optional[str] = None,
        timeout: int = 180,
    ):
        """Record connection settings and, when enabled, build the Gradio client.

        Args:
            base_url: Root URL of the remote service; trailing slashes are
                stripped before it is stored.
            use_gradio: Ask for the Gradio transport. It is only honoured
                when ``GradioClient`` is not ``None``.
            hf_token: Bearer token. When falsy, the ``HF_TOKEN`` environment
                variable is consulted instead.
            timeout: Timeout (seconds) handed to ``requests.post``.
        """
        self.base_url = base_url.rstrip("/")
        self.use_gradio = use_gradio and GradioClient is not None
        self.hf_token = hf_token or os.getenv("HF_TOKEN")
        self.timeout = timeout
        self._client = None

        if not self.use_gradio:
            return

        # The Gradio client receives the token both as hf_token and as an
        # explicit Authorization header; without a token no headers are sent.
        auth_headers = None
        if self.hf_token:
            auth_headers = {"Authorization": f"Bearer {self.hf_token}"}
        self._client = GradioClient(
            self.base_url,
            hf_token=self.hf_token,
            headers=auth_headers,
        )

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=8))
    def _post_json(self, route: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``payload`` as JSON to ``base_url + route`` and return the JSON reply.

        The call is wrapped by the tenacity ``retry`` decorator above (at most
        three attempts with exponential waiting). ``raise_for_status`` is
        invoked on the response before the body is decoded.

        Args:
            route: Path appended verbatim to ``self.base_url``.
            payload: JSON-serialisable request body.

        Returns:
            The decoded JSON body of the response.
        """
        auth_headers = {}
        if self.hf_token:
            auth_headers = {"Authorization": f"Bearer {self.hf_token}"}

        response = requests.post(
            f"{self.base_url}{route}",
            json=payload,
            headers=auth_headers,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()
|
|
|
class InstructClient(BaseRemoteClient):
    """Client for a remote text-generation endpoint."""

    def generate(self, prompt: str, system: Optional[str] = None, **kwargs) -> str:
        """Generate text for ``prompt``.

        Args:
            prompt: The user prompt.
            system: Optional system prompt; only sent on the HTTP transport.
            **kwargs: Extra fields merged into the HTTP request body.

        Returns:
            The generated text. On the HTTP transport this is the ``"text"``
            field of the reply, or ``""`` when that field is absent.
        """
        gradio_ready = self.use_gradio and self._client
        if not gradio_ready:
            body = {"prompt": prompt, "system": system, **kwargs}
            reply = self._post_json("/generate", body)
            return reply.get("text", "")

        # Gradio transport: only the prompt is forwarded to "/predict";
        # `system` and the extra keyword arguments are not sent.
        prediction = self._client.predict(prompt, api_name="/predict")
        return str(prediction)
|
|
|
class VisionClient(BaseRemoteClient):
    """Client for a remote image-description endpoint."""

    def describe(self, image_paths: List[str], context: Optional[Dict[str, Any]] = None, **kwargs) -> List[str]:
        """Describe the given images.

        Args:
            image_paths: Paths of the images to describe.
            context: Optional context mapping. It is JSON-encoded for the
                Gradio transport and sent as-is in the HTTP body; ``None``
                is treated as an empty mapping.
            **kwargs: Extra fields merged into the HTTP request body (not
                used by the Gradio transport).

        Returns:
            A list of descriptions. On the HTTP transport this is the
            ``"descriptions"`` field of the reply, or ``[]`` when absent.
        """
        if self.use_gradio and self._client:
            out = self._client.predict(image_paths, json.dumps(context or {}), api_name="/predict")
            if isinstance(out, str):
                try:
                    parsed = json.loads(out)
                except ValueError:
                    # Not JSON at all: treat the text as one description.
                    return [out]
                # json.loads happily decodes scalars and objects ("42",
                # "null", "{...}"); only a JSON array satisfies the declared
                # return type, so anything else is kept as raw text.
                if isinstance(parsed, list):
                    return parsed
                return [out]
            return list(out)

        data = {"images": image_paths, "context": context or {}, **kwargs}
        res = self._post_json("/describe", data)
        return res.get("descriptions", [])
|
|
|
class ToolsClient(BaseRemoteClient):
    """Client for a remote chat endpoint that accepts tool definitions."""

    def chat(self, messages: List[Dict[str, str]], tools: Optional[List[Dict[str, Any]]] = None, **kwargs) -> Dict[str, Any]:
        """Send a chat request, optionally with tool definitions.

        Args:
            messages: Chat messages. JSON-encoded for the Gradio transport,
                sent as-is in the HTTP body.
            tools: Optional tool definitions; ``None`` is treated as ``[]``.
            **kwargs: Extra fields merged into the HTTP request body (not
                used by the Gradio transport).

        Returns:
            The reply as a dict. A textual Gradio reply that is not a JSON
            object is wrapped as ``{"text": <raw reply>}``.
        """
        if self.use_gradio and self._client:
            out = self._client.predict(json.dumps(messages), json.dumps(tools or []), api_name="/predict")
            if isinstance(out, str):
                try:
                    parsed = json.loads(out)
                except ValueError:
                    # Plain text reply.
                    return {"text": out}
                # A reply such as "42", "null" or "[...]" is valid JSON but
                # not an object; wrap it so callers always receive a dict.
                if isinstance(parsed, dict):
                    return parsed
                return {"text": out}
            return out

        data = {"messages": messages, "tools": tools or [], **kwargs}
        return self._post_json("/chat", data)
|
|
|
class ASRClient(BaseRemoteClient):
    """Client for a remote speech-to-text endpoint."""

    def transcribe(self, audio_path: str, **kwargs) -> Dict[str, Any]:
        """Transcribe the audio file at ``audio_path``.

        Args:
            audio_path: Path of the audio file to send.
            **kwargs: Extra form fields sent with the HTTP upload (not used
                by the Gradio transport).

        Returns:
            The transcription reply. A textual Gradio reply is wrapped as
            ``{"text": <reply>}``; the HTTP transport returns the decoded
            JSON body.

        Raises:
            OSError: If ``audio_path`` cannot be opened (HTTP transport).
            requests.HTTPError: If the HTTP response carries an error status.
        """
        if self.use_gradio and self._client:
            out = self._client.predict(audio_path, api_name="/predict")
            if isinstance(out, str):
                return {"text": out}
            return out

        headers = {"Authorization": f"Bearer {self.hf_token}"} if self.hf_token else {}
        # Keep the file open only for the duration of the upload so the
        # handle is released even when the request raises.
        with open(audio_path, "rb") as audio_file:
            r = requests.post(
                f"{self.base_url}/transcribe",
                files={"file": audio_file},
                data=kwargs,
                headers=headers,
                timeout=self.timeout,
            )
        r.raise_for_status()
        return r.json()
|
|
|