# nexus-os-space / nexus_os_v2/cloud_api_adapters.py
# specimba's picture
# Copy nexus_os_v2/cloud_api_adapters.py from dataset for module imports
# 3412619 verified
"""
Cloud API Adapters for NEXUS OS v2.1
Implements generation adapters for all cloud providers:
- DeepSeek (V4 Pro, V4 Flash)
- Qwen (3 Coder Next)
- Moonshot (Kimi K2.6)
- Zhipu (GLM 5.1)
- OpenAI (GPT-5, GPT-5.5)
- Anthropic (Claude 4.7, Claude 4.6)
Each adapter normalizes the provider-specific API to a common interface:
generate(prompt, max_tokens, temperature, system) -> CloudResponse
API keys are loaded from environment variables and must NEVER be committed to git.
"""
import os
import json
import time
from typing import Optional, Dict, Any, Tuple, List
from dataclasses import dataclass
@dataclass
class CloudResponse:
    """Normalized response from any cloud provider.

    Only ``text`` and ``model_used`` are required; every other field keeps
    its default when the provider does not report it.
    """

    # Generated completion text.
    text: str
    # Model identifier as reported by the provider ("unknown" if absent).
    model_used: str
    # Prompt / completion token counts taken from the provider's usage block.
    tokens_input: int = 0
    tokens_output: int = 0
    # Wall-clock duration of the request in milliseconds.
    latency_ms: float = 0.0
    # Cost of the call in cents; the adapters in this module leave it at 0.0.
    cost_cents: float = 0.0
    # Provider-reported reason the generation stopped.
    finish_reason: str = "stop"
    # Full decoded JSON body of the provider response, or None when not set.
    # Annotated Optional because the default is None, not an empty dict.
    raw_metadata: Optional[Dict[str, Any]] = None
class BaseCloudAdapter:
    """Shared interface for the provider-specific cloud adapters.

    The base class only stores the API key; subclasses supply ``generate``.
    """

    def __init__(self, api_key: Optional[str] = None):
        """Remember the API key (may be None when nothing is configured)."""
        self.api_key = api_key

    def generate(
        self,
        prompt: str,
        max_tokens: int = 2048,
        temperature: float = 0.7,
        system: Optional[str] = None,
    ) -> CloudResponse:
        """Produce a completion for ``prompt``; must be overridden.

        Raises:
            NotImplementedError: always, in the base class.
        """
        raise NotImplementedError

    def is_available(self) -> bool:
        """Return True when a key is set and is longer than 10 characters."""
        key = self.api_key
        if key is None:
            return False
        return len(key) > 10
class DeepSeekAdapter(BaseCloudAdapter):
    """
    DeepSeek API adapter.

    API key: DEEPSEEK_API_KEY
    Base URL: https://api.deepseek.com/v1/chat/completions
    Models: deepseek-chat (V4 Pro), deepseek-reasoner (V4 Flash)

    ``generate`` always requests the ``deepseek-chat`` model.
    """

    BASE_URL = "https://api.deepseek.com/v1/chat/completions"

    def __init__(self, api_key: Optional[str] = None):
        """Use ``api_key`` if given, else the DEEPSEEK_API_KEY env variable."""
        super().__init__(api_key or os.environ.get("DEEPSEEK_API_KEY"))

    def generate(
        self,
        prompt: str,
        max_tokens: int = 2048,
        temperature: float = 0.7,
        system: Optional[str] = None,
    ) -> CloudResponse:
        """Send one non-streaming chat completion request to DeepSeek.

        Args:
            prompt: Content of the user message.
            max_tokens: Forwarded as the request's ``max_tokens``.
            temperature: Forwarded as the request's ``temperature``.
            system: Optional system message placed before the user message.

        Returns:
            A CloudResponse built from the first choice and the usage block,
            with the decoded JSON body kept in ``raw_metadata``.

        Raises:
            RuntimeError: if no API key is configured, or the API answers
                with an HTTP error status. Other network failures (for
                example ``urllib.error.URLError``) propagate unchanged.
        """
        import urllib.request
        import urllib.error

        # Fail fast instead of sending a literal "Bearer None" header.
        if not self.api_key:
            raise RuntimeError(
                "DeepSeek API key not configured. "
                "Set DEEPSEEK_API_KEY or pass api_key."
            )

        messages = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": prompt})

        payload = json.dumps({
            "model": "deepseek-chat",
            "messages": messages,
            "max_tokens": max_tokens,
            "temperature": temperature,
            "stream": False,
        }).encode("utf-8")

        req = urllib.request.Request(
            self.BASE_URL,
            data=payload,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {self.api_key}",
            },
            method="POST",
        )

        # perf_counter is monotonic, so the latency cannot go negative or
        # jump when the system clock is adjusted mid-request.
        t0 = time.perf_counter()
        try:
            with urllib.request.urlopen(req, timeout=120) as resp:
                raw = resp.read()
        except urllib.error.HTTPError as e:
            # errors="replace": a non-UTF-8 error page must not mask the
            # HTTP failure with a UnicodeDecodeError.
            error_body = e.read().decode("utf-8", errors="replace")
            raise RuntimeError(
                f"DeepSeek API error {e.code}: {error_body}"
            ) from e

        data = json.loads(raw.decode("utf-8"))
        elapsed = (time.perf_counter() - t0) * 1000

        # `or` fallbacks also cover keys that are present but empty/null,
        # e.g. "choices": [] or "content": null.
        choices = data.get("choices") or [{}]
        choice = choices[0]
        usage = data.get("usage") or {}
        message = choice.get("message") or {}

        return CloudResponse(
            text=message.get("content") or "",
            model_used=data.get("model", "unknown"),
            tokens_input=usage.get("prompt_tokens", 0),
            tokens_output=usage.get("completion_tokens", 0),
            latency_ms=elapsed,
            finish_reason=choice.get("finish_reason", "stop"),
            raw_metadata=data,
        )
class QwenAdapter(BaseCloudAdapter):
    """
    Qwen (Alibaba Cloud) API adapter.

    API key: QWEN_API_KEY
    Base URL: https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation
    Models: qwen-coder-plus-latest, qwen-max, etc.

    ``generate`` always requests the ``qwen-coder-plus-latest`` model.
    """

    BASE_URL = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"

    def __init__(self, api_key: Optional[str] = None):
        """Use ``api_key`` if given, else the QWEN_API_KEY env variable."""
        super().__init__(api_key or os.environ.get("QWEN_API_KEY"))

    def generate(
        self,
        prompt: str,
        max_tokens: int = 2048,
        temperature: float = 0.7,
        system: Optional[str] = None,
    ) -> CloudResponse:
        """Send one text-generation request to the DashScope endpoint.

        Args:
            prompt: Content of the user message.
            max_tokens: Forwarded as ``parameters.max_tokens``.
            temperature: Forwarded as ``parameters.temperature``.
            system: Optional system message placed before the user message.

        Returns:
            A CloudResponse built from the response's ``output`` and
            ``usage`` objects, with the decoded JSON body kept in
            ``raw_metadata``.

        Raises:
            RuntimeError: if no API key is configured, or the API answers
                with an HTTP error status. Other network failures (for
                example ``urllib.error.URLError``) propagate unchanged.
        """
        import urllib.request
        import urllib.error

        # Fail fast instead of sending a literal "Bearer None" header.
        if not self.api_key:
            raise RuntimeError(
                "Qwen API key not configured. "
                "Set QWEN_API_KEY or pass api_key."
            )

        messages = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": prompt})

        payload = json.dumps({
            "model": "qwen-coder-plus-latest",
            "input": {"messages": messages},
            "parameters": {
                "max_tokens": max_tokens,
                "temperature": temperature,
            },
        }).encode("utf-8")

        req = urllib.request.Request(
            self.BASE_URL,
            data=payload,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {self.api_key}",
            },
            method="POST",
        )

        # perf_counter is monotonic, so the latency cannot go negative or
        # jump when the system clock is adjusted mid-request.
        t0 = time.perf_counter()
        try:
            with urllib.request.urlopen(req, timeout=120) as resp:
                raw = resp.read()
        except urllib.error.HTTPError as e:
            # errors="replace": a non-UTF-8 error page must not mask the
            # HTTP failure with a UnicodeDecodeError.
            error_body = e.read().decode("utf-8", errors="replace")
            raise RuntimeError(
                f"Qwen API error {e.code}: {error_body}"
            ) from e

        data = json.loads(raw.decode("utf-8"))
        elapsed = (time.perf_counter() - t0) * 1000

        output = data.get("output") or {}
        usage = data.get("usage") or {}

        # The reply carries either a flat `output.text` or, in message
        # format, `output.choices[0].message.content`; accept both so a
        # message-format reply does not come back as empty text.
        choices = output.get("choices") or [{}]
        first_choice = choices[0]
        message = first_choice.get("message") or {}
        text = output.get("text") or message.get("content") or ""
        finish_reason = (
            output.get("finish_reason")
            or first_choice.get("finish_reason")
            or "stop"
        )

        return CloudResponse(
            text=text,
            model_used=output.get("model_id", "unknown"),
            tokens_input=usage.get("input_tokens", 0),
            tokens_output=usage.get("output_tokens", 0),
            latency_ms=elapsed,
            finish_reason=finish_reason,
            raw_metadata=data,
        )
class KimiAdapter(BaseCloudAdapter):
    """
    Moonshot AI (Kimi) API adapter.

    API key: KIMI_API_KEY
    Base URL: https://api.moonshot.cn/v1/chat/completions
    Models: moonshot-v1-32k, moonshot-v1-128k

    ``generate`` always requests the ``moonshot-v1-128k`` model.
    """

    BASE_URL = "https://api.moonshot.cn/v1/chat/completions"

    def __init__(self, api_key: Optional[str] = None):
        """Use ``api_key`` if given, else the KIMI_API_KEY env variable."""
        super().__init__(api_key or os.environ.get("KIMI_API_KEY"))

    def generate(
        self,
        prompt: str,
        max_tokens: int = 2048,
        temperature: float = 0.7,
        system: Optional[str] = None,
    ) -> CloudResponse:
        """Send one chat completion request to the Moonshot endpoint.

        Args:
            prompt: Content of the user message.
            max_tokens: Forwarded as the request's ``max_tokens``.
            temperature: Forwarded as the request's ``temperature``.
            system: Optional system message placed before the user message.

        Returns:
            A CloudResponse built from the first choice and the usage block,
            with the decoded JSON body kept in ``raw_metadata``.

        Raises:
            RuntimeError: if no API key is configured, or the API answers
                with an HTTP error status. Other network failures (for
                example ``urllib.error.URLError``) propagate unchanged.
        """
        import urllib.request
        import urllib.error

        # Fail fast instead of sending a literal "Bearer None" header.
        if not self.api_key:
            raise RuntimeError(
                "Kimi API key not configured. "
                "Set KIMI_API_KEY or pass api_key."
            )

        messages = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": prompt})

        payload = json.dumps({
            "model": "moonshot-v1-128k",
            "messages": messages,
            "max_tokens": max_tokens,
            "temperature": temperature,
        }).encode("utf-8")

        req = urllib.request.Request(
            self.BASE_URL,
            data=payload,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {self.api_key}",
            },
            method="POST",
        )

        # perf_counter is monotonic, so the latency cannot go negative or
        # jump when the system clock is adjusted mid-request.
        t0 = time.perf_counter()
        try:
            with urllib.request.urlopen(req, timeout=120) as resp:
                raw = resp.read()
        except urllib.error.HTTPError as e:
            # errors="replace": a non-UTF-8 error page must not mask the
            # HTTP failure with a UnicodeDecodeError.
            error_body = e.read().decode("utf-8", errors="replace")
            raise RuntimeError(
                f"Kimi API error {e.code}: {error_body}"
            ) from e

        data = json.loads(raw.decode("utf-8"))
        elapsed = (time.perf_counter() - t0) * 1000

        # `or` fallbacks also cover keys that are present but empty/null,
        # e.g. "choices": [] or "content": null.
        choices = data.get("choices") or [{}]
        choice = choices[0]
        usage = data.get("usage") or {}
        message = choice.get("message") or {}

        return CloudResponse(
            text=message.get("content") or "",
            model_used=data.get("model", "unknown"),
            tokens_input=usage.get("prompt_tokens", 0),
            tokens_output=usage.get("completion_tokens", 0),
            latency_ms=elapsed,
            finish_reason=choice.get("finish_reason", "stop"),
            raw_metadata=data,
        )
class GLMAdapter(BaseCloudAdapter):
    """
    Zhipu AI (GLM) API adapter.

    API key: GLM_API_KEY
    Base URL: https://open.bigmodel.cn/api/paas/v4/chat/completions
    Models: glm-4-plus, glm-4-flash

    ``generate`` always requests the ``glm-4-plus`` model.
    """

    BASE_URL = "https://open.bigmodel.cn/api/paas/v4/chat/completions"

    def __init__(self, api_key: Optional[str] = None):
        """Use ``api_key`` if given, else the GLM_API_KEY env variable."""
        super().__init__(api_key or os.environ.get("GLM_API_KEY"))

    def generate(
        self,
        prompt: str,
        max_tokens: int = 2048,
        temperature: float = 0.7,
        system: Optional[str] = None,
    ) -> CloudResponse:
        """Send one chat completion request to the Zhipu endpoint.

        Args:
            prompt: Content of the user message.
            max_tokens: Forwarded as the request's ``max_tokens``.
            temperature: Forwarded as the request's ``temperature``.
            system: Optional system message placed before the user message.

        Returns:
            A CloudResponse built from the first choice and the usage block,
            with the decoded JSON body kept in ``raw_metadata``.

        Raises:
            RuntimeError: if no API key is configured, or the API answers
                with an HTTP error status. Other network failures (for
                example ``urllib.error.URLError``) propagate unchanged.
        """
        import urllib.request
        import urllib.error

        # Fail fast instead of sending a literal "Bearer None" header.
        if not self.api_key:
            raise RuntimeError(
                "GLM API key not configured. "
                "Set GLM_API_KEY or pass api_key."
            )

        messages = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": prompt})

        payload = json.dumps({
            "model": "glm-4-plus",
            "messages": messages,
            "max_tokens": max_tokens,
            "temperature": temperature,
        }).encode("utf-8")

        req = urllib.request.Request(
            self.BASE_URL,
            data=payload,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {self.api_key}",
            },
            method="POST",
        )

        # perf_counter is monotonic, so the latency cannot go negative or
        # jump when the system clock is adjusted mid-request.
        t0 = time.perf_counter()
        try:
            with urllib.request.urlopen(req, timeout=120) as resp:
                raw = resp.read()
        except urllib.error.HTTPError as e:
            # errors="replace": a non-UTF-8 error page must not mask the
            # HTTP failure with a UnicodeDecodeError.
            error_body = e.read().decode("utf-8", errors="replace")
            raise RuntimeError(
                f"GLM API error {e.code}: {error_body}"
            ) from e

        data = json.loads(raw.decode("utf-8"))
        elapsed = (time.perf_counter() - t0) * 1000

        # `or` fallbacks also cover keys that are present but empty/null,
        # e.g. "choices": [] or "content": null.
        choices = data.get("choices") or [{}]
        choice = choices[0]
        usage = data.get("usage") or {}
        message = choice.get("message") or {}

        return CloudResponse(
            text=message.get("content") or "",
            model_used=data.get("model", "unknown"),
            tokens_input=usage.get("prompt_tokens", 0),
            tokens_output=usage.get("completion_tokens", 0),
            latency_ms=elapsed,
            finish_reason=choice.get("finish_reason", "stop"),
            raw_metadata=data,
        )
class OpenAIAdapter(BaseCloudAdapter):
    """
    OpenAI API adapter (GPT-5, GPT-5.5).

    API key: OPENAI_API_KEY
    Base URL: https://api.openai.com/v1/chat/completions
    Models: gpt-5, gpt-5-turbo, gpt-5.5

    ``generate`` always requests the ``gpt-5`` model.
    """

    BASE_URL = "https://api.openai.com/v1/chat/completions"

    def __init__(self, api_key: Optional[str] = None):
        """Use ``api_key`` if given, else the OPENAI_API_KEY env variable."""
        super().__init__(api_key or os.environ.get("OPENAI_API_KEY"))

    def generate(
        self,
        prompt: str,
        max_tokens: int = 2048,
        temperature: float = 0.7,
        system: Optional[str] = None,
    ) -> CloudResponse:
        """Send one chat completion request to OpenAI.

        Args:
            prompt: Content of the user message.
            max_tokens: Upper bound on generated tokens; sent to the API as
                ``max_completion_tokens``.
            temperature: Forwarded as the request's ``temperature``.
            system: Optional system message placed before the user message.

        Returns:
            A CloudResponse built from the first choice and the usage block,
            with the decoded JSON body kept in ``raw_metadata``.

        Raises:
            RuntimeError: if no API key is configured, or the API answers
                with an HTTP error status. Other network failures (for
                example ``urllib.error.URLError``) propagate unchanged.
        """
        import urllib.request
        import urllib.error

        # Fail fast instead of sending a literal "Bearer None" header.
        if not self.api_key:
            raise RuntimeError(
                "OpenAI API key not configured. "
                "Set OPENAI_API_KEY or pass api_key."
            )

        messages = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": prompt})

        payload = json.dumps({
            "model": "gpt-5",
            "messages": messages,
            # The chat completions API deprecates `max_tokens` in favor of
            # `max_completion_tokens`; reasoning-class models reject the
            # old name.
            "max_completion_tokens": max_tokens,
            # NOTE(review): some reasoning-class models reportedly accept
            # only the default temperature - confirm for the model above.
            "temperature": temperature,
        }).encode("utf-8")

        req = urllib.request.Request(
            self.BASE_URL,
            data=payload,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {self.api_key}",
            },
            method="POST",
        )

        # perf_counter is monotonic, so the latency cannot go negative or
        # jump when the system clock is adjusted mid-request.
        t0 = time.perf_counter()
        try:
            with urllib.request.urlopen(req, timeout=120) as resp:
                raw = resp.read()
        except urllib.error.HTTPError as e:
            # errors="replace": a non-UTF-8 error page must not mask the
            # HTTP failure with a UnicodeDecodeError.
            error_body = e.read().decode("utf-8", errors="replace")
            raise RuntimeError(
                f"OpenAI API error {e.code}: {error_body}"
            ) from e

        data = json.loads(raw.decode("utf-8"))
        elapsed = (time.perf_counter() - t0) * 1000

        # `or` fallbacks also cover keys that are present but empty/null,
        # e.g. "choices": [] or "content": null.
        choices = data.get("choices") or [{}]
        choice = choices[0]
        usage = data.get("usage") or {}
        message = choice.get("message") or {}

        return CloudResponse(
            text=message.get("content") or "",
            model_used=data.get("model", "unknown"),
            tokens_input=usage.get("prompt_tokens", 0),
            tokens_output=usage.get("completion_tokens", 0),
            latency_ms=elapsed,
            finish_reason=choice.get("finish_reason", "stop"),
            raw_metadata=data,
        )
class ClaudeAdapter(BaseCloudAdapter):
    """
    Anthropic Claude API adapter.

    API key: ANTHROPIC_API_KEY
    Base URL: https://api.anthropic.com/v1/messages
    Models: claude-sonnet-4-7, claude-opus-4-7

    ``generate`` always requests ``claude-sonnet-4-7-20251001``.
    """

    BASE_URL = "https://api.anthropic.com/v1/messages"

    def __init__(self, api_key: Optional[str] = None):
        """Use ``api_key`` if given, else the ANTHROPIC_API_KEY env variable."""
        super().__init__(api_key or os.environ.get("ANTHROPIC_API_KEY"))

    def generate(
        self,
        prompt: str,
        max_tokens: int = 2048,
        temperature: float = 0.7,
        system: Optional[str] = None,
    ) -> CloudResponse:
        """Send one request to the Anthropic Messages endpoint.

        Args:
            prompt: Content of the single user message.
            max_tokens: Forwarded as the request's ``max_tokens``.
            temperature: Forwarded as the request's ``temperature``.
            system: Optional system prompt, sent as the top-level ``system``
                field rather than as a message.

        Returns:
            A CloudResponse whose text is the concatenation of all text
            blocks in the reply, with the decoded JSON body kept in
            ``raw_metadata``.

        Raises:
            RuntimeError: if no API key is configured, or the API answers
                with an HTTP error status. Other network failures (for
                example ``urllib.error.URLError``) propagate unchanged.
        """
        import urllib.request
        import urllib.error

        # A None header value makes urllib fail with an opaque TypeError,
        # so reject a missing key up front with a clear message.
        if not self.api_key:
            raise RuntimeError(
                "Claude API key not configured. "
                "Set ANTHROPIC_API_KEY or pass api_key."
            )

        body = {
            "model": "claude-sonnet-4-7-20251001",
            "max_tokens": max_tokens,
            "temperature": temperature,
            "messages": [{"role": "user", "content": prompt}],
        }
        if system:
            body["system"] = system
        payload = json.dumps(body).encode("utf-8")

        req = urllib.request.Request(
            self.BASE_URL,
            data=payload,
            headers={
                "Content-Type": "application/json",
                "x-api-key": self.api_key,
                "anthropic-version": "2023-06-01",
            },
            method="POST",
        )

        # perf_counter is monotonic, so the latency cannot go negative or
        # jump when the system clock is adjusted mid-request.
        t0 = time.perf_counter()
        try:
            with urllib.request.urlopen(req, timeout=120) as resp:
                raw = resp.read()
        except urllib.error.HTTPError as e:
            # errors="replace": a non-UTF-8 error page must not mask the
            # HTTP failure with a UnicodeDecodeError.
            error_body = e.read().decode("utf-8", errors="replace")
            raise RuntimeError(
                f"Claude API error {e.code}: {error_body}"
            ) from e

        data = json.loads(raw.decode("utf-8"))
        elapsed = (time.perf_counter() - t0) * 1000

        # `content` is a list of blocks. Join every text block instead of
        # reading only index 0, which fails on an empty list and loses the
        # answer when the first block is not a text block.
        blocks = data.get("content") or []
        text = "".join(
            block.get("text") or ""
            for block in blocks
            if isinstance(block, dict) and block.get("type", "text") == "text"
        )
        usage = data.get("usage") or {}

        return CloudResponse(
            text=text,
            model_used=data.get("model", "unknown"),
            tokens_input=usage.get("input_tokens", 0),
            tokens_output=usage.get("output_tokens", 0),
            latency_ms=elapsed,
            finish_reason=data.get("stop_reason", "stop"),
            raw_metadata=data,
        )
class CloudAPIManager:
    """
    Unified manager for all cloud API adapters.

    Selects provider based on model_id or profile.
    Only adapters whose ``is_available()`` check passes at construction
    time are registered.
    """

    # Provider name -> adapter class; iteration order decides which adapter
    # wins when a family string matches more than one name.
    ADAPTERS = {
        "deepseek": DeepSeekAdapter,
        "qwen": QwenAdapter,
        "kimi": KimiAdapter,
        "glm": GLMAdapter,
        "openai": OpenAIAdapter,
        "claude": ClaudeAdapter,
    }

    def __init__(self):
        """Instantiate every adapter and keep the ones that are available."""
        self._adapters: Dict[str, BaseCloudAdapter] = {}
        for name, cls in self.ADAPTERS.items():
            adapter = cls()
            if adapter.is_available():
                self._adapters[name] = adapter

    def get_adapter(self, model_family: str) -> Optional[BaseCloudAdapter]:
        """Get adapter by model family name.

        Matching is case-insensitive and is a two-way substring test, so
        both "deepseek-chat" and "deep" resolve to the "deepseek" adapter.
        The first registered match wins.

        Returns:
            The matching adapter, or None when nothing matches or
            ``model_family`` is empty, blank or None.
        """
        family_lower = (model_family or "").strip().lower()
        # The empty string is a substring of every provider name, so without
        # this guard "" would silently select the first registered adapter.
        if not family_lower:
            return None
        for name, adapter in self._adapters.items():
            if name in family_lower or family_lower in name:
                return adapter
        return None

    def generate(
        self,
        model_family: str,
        prompt: str,
        max_tokens: int = 2048,
        temperature: float = 0.7,
        system: Optional[str] = None,
    ) -> CloudResponse:
        """Generate via the appropriate cloud adapter.

        Raises:
            RuntimeError: if no registered adapter matches ``model_family``;
                errors raised by the adapter itself propagate unchanged.
        """
        adapter = self.get_adapter(model_family)
        if not adapter:
            available = list(self._adapters.keys())
            raise RuntimeError(
                f"No cloud adapter available for '{model_family}'. "
                f"Available: {available}. Set API key env var."
            )
        return adapter.generate(prompt, max_tokens, temperature, system)

    def list_available(self) -> List[str]:
        """List available cloud providers (API keys configured)."""
        return list(self._adapters.keys())

    def is_available(self, model_family: str) -> bool:
        """Return True when ``get_adapter`` finds an adapter for the family."""
        return self.get_adapter(model_family) is not None