"""
Cloud API Adapters for NEXUS OS v2.1

Implements generation adapters for all cloud providers:
- DeepSeek (V4 Pro, V4 Flash)
- Qwen (3 Coder Next)
- Moonshot (Kimi K2.6)
- Zhipu (GLM 5.1)
- OpenAI (GPT-5, GPT-5.5)
- Anthropic (Claude 4.7, Claude 4.6)

Each adapter normalizes the provider-specific API to a common interface:
    generate(prompt, max_tokens, temperature, system) -> CloudResponse

API keys loaded from environment variables — NEVER committed to git.
"""

import os
import json
import time
import urllib.error
import urllib.request
from typing import Optional, Dict, Any, Tuple, List
from dataclasses import dataclass


@dataclass
class CloudResponse:
    """Normalized response from any cloud provider."""

    text: str
    model_used: str
    tokens_input: int = 0
    tokens_output: int = 0
    latency_ms: float = 0.0
    cost_cents: float = 0.0
    finish_reason: str = "stop"
    # Decoded provider JSON, kept for debugging; None when not supplied.
    raw_metadata: Optional[Dict[str, Any]] = None


def _first_dict(items: Any) -> Dict[str, Any]:
    """Return ``items[0]`` when it is a dict inside a non-empty list, else ``{}``."""
    if isinstance(items, list) and items and isinstance(items[0], dict):
        return items[0]
    return {}


class BaseCloudAdapter:
    """
    Base class for all cloud API adapters.

    Subclasses set the class attributes below and implement ``generate``.
    The shared HTTP round trip lives in ``_post_json``.
    """

    PROVIDER = "Cloud"    # label used in error messages
    ENV_VAR: Optional[str] = None  # env var consulted when no key is passed
    BASE_URL = ""         # endpoint that receives the POST
    DEFAULT_MODEL = ""    # model id sent when the caller does not pick one
    TIMEOUT_S = 120       # socket timeout handed to urlopen

    def __init__(self, api_key: Optional[str] = None, model: Optional[str] = None):
        """
        Args:
            api_key: Explicit key; when falsy, ``ENV_VAR`` (if set on the
                class) is read from the environment instead.
            model: Model id to request; defaults to ``DEFAULT_MODEL``.
        """
        if not api_key and self.ENV_VAR:
            api_key = os.environ.get(self.ENV_VAR)
        self.api_key = api_key
        self.model = model or self.DEFAULT_MODEL

    def generate(
        self,
        prompt: str,
        max_tokens: int = 2048,
        temperature: float = 0.7,
        system: Optional[str] = None,
    ) -> CloudResponse:
        raise NotImplementedError

    def is_available(self) -> bool:
        """Return True when an API key longer than 10 characters is configured."""
        return self.api_key is not None and len(self.api_key) > 10

    def _auth_headers(self) -> Dict[str, str]:
        """Return the authentication headers (bearer token by default)."""
        return {"Authorization": f"Bearer {self.api_key}"}

    def _post_json(self, body: Dict[str, Any]) -> Tuple[Dict[str, Any], float]:
        """
        POST ``body`` as JSON to ``BASE_URL``.

        Returns:
            ``(decoded_json, elapsed_ms)``.

        Raises:
            RuntimeError: no API key is configured, the provider answered
                with an HTTP error status, or the reply is not a JSON object.
            urllib.error.URLError / ValueError: connection failures and
                malformed JSON propagate unchanged.
        """
        if not self.api_key:
            # Fail before building headers: a None key would otherwise be
            # sent as "Bearer None" or rejected deep inside http.client.
            hint = f" Set the {self.ENV_VAR} environment variable." if self.ENV_VAR else ""
            raise RuntimeError(f"{self.PROVIDER} API key is not configured.{hint}")

        headers = {"Content-Type": "application/json"}
        headers.update(self._auth_headers())
        req = urllib.request.Request(
            self.BASE_URL,
            data=json.dumps(body).encode("utf-8"),
            headers=headers,
            method="POST",
        )

        # perf_counter is monotonic, so latency is immune to clock changes.
        t0 = time.perf_counter()
        try:
            with urllib.request.urlopen(req, timeout=self.TIMEOUT_S) as resp:
                data = json.loads(resp.read().decode("utf-8"))
        except urllib.error.HTTPError as e:
            # errors="replace": a non-UTF-8 error page must not mask the HTTP error.
            error_body = e.read().decode("utf-8", errors="replace")
            raise RuntimeError(
                f"{self.PROVIDER} API error {e.code}: {error_body}"
            ) from e
        elapsed = (time.perf_counter() - t0) * 1000

        if not isinstance(data, dict):
            raise RuntimeError(
                f"{self.PROVIDER} API returned unexpected payload of type "
                f"{type(data).__name__}"
            )
        return data, elapsed


class _ChatCompletionsAdapter(BaseCloudAdapter):
    """Shared implementation for providers using the chat-completions JSON shape."""

    # Extra top-level request fields a provider needs; never mutated.
    EXTRA_BODY: Dict[str, Any] = {}

    def generate(
        self,
        prompt: str,
        max_tokens: int = 2048,
        temperature: float = 0.7,
        system: Optional[str] = None,
    ) -> CloudResponse:
        """Send one user prompt (plus optional system prompt) and normalize the reply."""
        body = self._build_body(prompt, max_tokens, temperature, system)
        data, elapsed = self._post_json(body)
        return self._parse_response(data, elapsed)

    def _build_body(
        self,
        prompt: str,
        max_tokens: int,
        temperature: float,
        system: Optional[str],
    ) -> Dict[str, Any]:
        """Build the request JSON; the system message is omitted when falsy."""
        messages = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": prompt})
        body = {
            "model": self.model,
            "messages": messages,
            "max_tokens": max_tokens,
            "temperature": temperature,
        }
        body.update(self.EXTRA_BODY)
        return body

    def _parse_response(self, data: Dict[str, Any], elapsed: float) -> CloudResponse:
        """Map a chat-completions reply onto CloudResponse, tolerating missing parts."""
        choice = _first_dict(data.get("choices"))  # empty list -> {} instead of IndexError
        message = choice.get("message") or {}
        usage = data.get("usage") or {}
        return CloudResponse(
            text=message.get("content") or "",  # content may be null
            model_used=data.get("model", "unknown"),
            tokens_input=usage.get("prompt_tokens", 0),
            tokens_output=usage.get("completion_tokens", 0),
            latency_ms=elapsed,
            finish_reason=choice.get("finish_reason") or "stop",
            raw_metadata=data,
        )


class DeepSeekAdapter(_ChatCompletionsAdapter):
    """
    DeepSeek API adapter.

    API key: DEEPSEEK_API_KEY
    Base URL: https://api.deepseek.com/v1/chat/completions
    Models: deepseek-chat (V4 Pro), deepseek-reasoner (V4 Flash)
    """

    PROVIDER = "DeepSeek"
    ENV_VAR = "DEEPSEEK_API_KEY"
    BASE_URL = "https://api.deepseek.com/v1/chat/completions"
    DEFAULT_MODEL = "deepseek-chat"
    EXTRA_BODY = {"stream": False}


class QwenAdapter(BaseCloudAdapter):
    """
    Qwen (Alibaba Cloud) API adapter.

    API key: QWEN_API_KEY
    Base URL: https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation
    Models: qwen-coder-plus-latest, qwen-max, etc.
    """

    PROVIDER = "Qwen"
    ENV_VAR = "QWEN_API_KEY"
    BASE_URL = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"
    DEFAULT_MODEL = "qwen-coder-plus-latest"

    def generate(
        self,
        prompt: str,
        max_tokens: int = 2048,
        temperature: float = 0.7,
        system: Optional[str] = None,
    ) -> CloudResponse:
        """Send one user prompt (plus optional system prompt) and normalize the reply."""
        body = self._build_body(prompt, max_tokens, temperature, system)
        data, elapsed = self._post_json(body)
        return self._parse_response(data, elapsed)

    def _build_body(
        self,
        prompt: str,
        max_tokens: int,
        temperature: float,
        system: Optional[str],
    ) -> Dict[str, Any]:
        """Build the request JSON with messages under "input" and knobs under "parameters"."""
        messages = [{"role": "system", "content": system}] if system else []
        messages.append({"role": "user", "content": prompt})
        return {
            "model": self.model,
            "input": {"messages": messages},
            "parameters": {
                "max_tokens": max_tokens,
                "temperature": temperature,
            },
        }

    def _parse_response(self, data: Dict[str, Any], elapsed: float) -> CloudResponse:
        """
        Normalize the reply.

        Reads ``output.text`` first and falls back to
        ``output.choices[0].message.content`` so that a message-format
        reply does not come back as empty text.
        """
        output = data.get("output") or {}
        usage = data.get("usage") or {}
        text = output.get("text")
        finish = output.get("finish_reason")
        if not text:
            choice = _first_dict(output.get("choices"))
            text = (choice.get("message") or {}).get("content")
            finish = finish or choice.get("finish_reason")
        return CloudResponse(
            text=text or "",
            model_used=output.get("model_id", "unknown"),
            tokens_input=usage.get("input_tokens", 0),
            tokens_output=usage.get("output_tokens", 0),
            latency_ms=elapsed,
            finish_reason=finish or "stop",
            raw_metadata=data,
        )


class KimiAdapter(_ChatCompletionsAdapter):
    """
    Moonshot AI (Kimi) API adapter.

    API key: KIMI_API_KEY
    Base URL: https://api.moonshot.cn/v1/chat/completions
    Models: moonshot-v1-32k, moonshot-v1-128k
    """

    PROVIDER = "Kimi"
    ENV_VAR = "KIMI_API_KEY"
    BASE_URL = "https://api.moonshot.cn/v1/chat/completions"
    DEFAULT_MODEL = "moonshot-v1-128k"


class GLMAdapter(_ChatCompletionsAdapter):
    """
    Zhipu AI (GLM) API adapter.

    API key: GLM_API_KEY
    Base URL: https://open.bigmodel.cn/api/paas/v4/chat/completions
    Models: glm-4-plus, glm-4-flash
    """

    PROVIDER = "GLM"
    ENV_VAR = "GLM_API_KEY"
    BASE_URL = "https://open.bigmodel.cn/api/paas/v4/chat/completions"
    DEFAULT_MODEL = "glm-4-plus"


class OpenAIAdapter(_ChatCompletionsAdapter):
    """
    OpenAI API adapter (GPT-5, GPT-5.5).

    API key: OPENAI_API_KEY
    Base URL: https://api.openai.com/v1/chat/completions
    Models: gpt-5, gpt-5-turbo, gpt-5.5
    """

    # NOTE(review): the request still sends "max_tokens"; confirm against the
    # OpenAI API reference whether the configured model expects
    # "max_completion_tokens" instead.
    PROVIDER = "OpenAI"
    ENV_VAR = "OPENAI_API_KEY"
    BASE_URL = "https://api.openai.com/v1/chat/completions"
    DEFAULT_MODEL = "gpt-5"


class ClaudeAdapter(BaseCloudAdapter):
    """
    Anthropic Claude API adapter.

    API key: ANTHROPIC_API_KEY
    Base URL: https://api.anthropic.com/v1/messages
    Models: claude-sonnet-4-7, claude-opus-4-7
    """

    PROVIDER = "Claude"
    ENV_VAR = "ANTHROPIC_API_KEY"
    BASE_URL = "https://api.anthropic.com/v1/messages"
    DEFAULT_MODEL = "claude-sonnet-4-7-20251001"
    API_VERSION = "2023-06-01"

    def _auth_headers(self) -> Dict[str, str]:
        """Return the x-api-key and anthropic-version headers used instead of a bearer token."""
        return {
            "x-api-key": self.api_key,
            "anthropic-version": self.API_VERSION,
        }

    def generate(
        self,
        prompt: str,
        max_tokens: int = 2048,
        temperature: float = 0.7,
        system: Optional[str] = None,
    ) -> CloudResponse:
        """Send one user prompt (plus optional system prompt) and normalize the reply."""
        body = self._build_body(prompt, max_tokens, temperature, system)
        data, elapsed = self._post_json(body)
        return self._parse_response(data, elapsed)

    def _build_body(
        self,
        prompt: str,
        max_tokens: int,
        temperature: float,
        system: Optional[str],
    ) -> Dict[str, Any]:
        """Build the request JSON; "system" is a top-level field, set only when given."""
        body = {
            "model": self.model,
            "max_tokens": max_tokens,
            "temperature": temperature,
            "messages": [{"role": "user", "content": prompt}],
        }
        if system:
            body["system"] = system
        return body

    def _parse_response(self, data: Dict[str, Any], elapsed: float) -> CloudResponse:
        """Normalize the reply, joining the text of every content block that has one."""
        blocks = data.get("content") or []
        if not isinstance(blocks, list):
            blocks = []
        # Join all text-bearing blocks rather than only the first one; an
        # empty list yields "" instead of raising IndexError.
        text = "".join(
            block["text"]
            for block in blocks
            if isinstance(block, dict) and isinstance(block.get("text"), str)
        )
        usage = data.get("usage") or {}
        return CloudResponse(
            text=text,
            model_used=data.get("model", "unknown"),
            tokens_input=usage.get("input_tokens", 0),
            tokens_output=usage.get("output_tokens", 0),
            latency_ms=elapsed,
            finish_reason=data.get("stop_reason") or "stop",
            raw_metadata=data,
        )


class CloudAPIManager:
    """
    Unified manager for all cloud API adapters.

    Selects provider based on model_id or profile.
    """

    ADAPTERS = {
        "deepseek": DeepSeekAdapter,
        "qwen": QwenAdapter,
        "kimi": KimiAdapter,
        "glm": GLMAdapter,
        "openai": OpenAIAdapter,
        "claude": ClaudeAdapter,
    }

    def __init__(self):
        """Instantiate every adapter and keep those whose API key is configured."""
        self._adapters: Dict[str, BaseCloudAdapter] = {}
        for name, cls in self.ADAPTERS.items():
            adapter = cls()
            if adapter.is_available():
                self._adapters[name] = adapter

    def get_adapter(self, model_family: str) -> Optional[BaseCloudAdapter]:
        """
        Get adapter by model family name.

        Matching is a case-insensitive substring test in either direction
        against the registered provider names. Returns None when nothing
        matches or when ``model_family`` is empty or blank.
        """
        family_lower = (model_family or "").strip().lower()
        if not family_lower:
            # "" is a substring of every name and would otherwise select
            # whichever adapter happens to be registered first.
            return None
        for name, adapter in self._adapters.items():
            if name in family_lower or family_lower in name:
                return adapter
        return None

    def generate(
        self,
        model_family: str,
        prompt: str,
        max_tokens: int = 2048,
        temperature: float = 0.7,
        system: Optional[str] = None,
    ) -> CloudResponse:
        """
        Generate via the appropriate cloud adapter.

        Raises:
            RuntimeError: no configured adapter matches ``model_family``.
        """
        adapter = self.get_adapter(model_family)
        if adapter is None:
            available = list(self._adapters.keys())
            raise RuntimeError(
                f"No cloud adapter available for '{model_family}'. "
                f"Available: {available}. Set API key env var."
            )
        return adapter.generate(prompt, max_tokens, temperature, system)

    def list_available(self) -> List[str]:
        """List available cloud providers (API keys configured)."""
        return list(self._adapters.keys())

    def is_available(self, model_family: str) -> bool:
        """Return True when some configured adapter matches ``model_family``."""
        return self.get_adapter(model_family) is not None