Spaces:
Runtime error
Runtime error
Ashira Pitchayapakayakul
feat: migrate $HOME/.claude/* to $HOME/.surrogate/* (clean separation from Claude Code)
e36381e | """Claude Max plan OAuth client. | |
| Handles: | |
| - Read OAuth token from macOS keychain (`Claude Code-credentials`) | |
| - Auto-refresh before expiry (lazy, on API call) | |
| - Call Anthropic `/v1/messages` with OAuth Bearer | |
| - Parse `anthropic-ratelimit-*` headers β quota state | |
| - Cache quota state (5-min TTL) to avoid probing too often | |
| Quota model (verified 2026-04-19): | |
| Max plan uses UNIFIED pool β Opus + Sonnet share quota. | |
| Haiku has separate pool (confirmed via live probe). | |
| 5-hour window + 7-day window, both monitored. | |
| Headers (from live response): | |
| anthropic-ratelimit-unified-5h-status: allowed|rate_limited | |
| anthropic-ratelimit-unified-5h-reset: <unix-ts> | |
| anthropic-ratelimit-unified-5h-utilization: 0.0-1.0 | |
| anthropic-ratelimit-unified-7d-status | |
| anthropic-ratelimit-unified-7d-reset | |
| anthropic-ratelimit-unified-7d-utilization | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
| import subprocess | |
| import time | |
| import urllib.error | |
| import urllib.request | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import Any, Optional | |
| KEYCHAIN_SERVICE = "Claude Code-credentials" | |
| OAUTH_REFRESH_URL = "https://claude.ai/v1/oauth/token" | |
| OAUTH_CLIENT_ID = "9d1c250a-e61b-44d9-88ed-5944d1962f5e" | |
| ANTHROPIC_API = "https://api.anthropic.com/v1/messages" | |
| ANTHROPIC_BETA = "oauth-2025-04-20" | |
| ANTHROPIC_VERSION = "2023-06-01" | |
| QUOTA_CACHE_PATH = Path.home() / ".surrogate" / "yolo" / "max-quota.json" | |
| QUOTA_CACHE_TTL = 300 # 5 minutes | |
| # --- Model IDs (from live probe 2026-04-19) --- | |
| MODEL_OPUS = "claude-opus-4-20250514" | |
| MODEL_SONNET = "claude-sonnet-4-20250514" | |
| MODEL_HAIKU = "claude-haiku-4-5-20251001" | |
| class QuotaState: | |
| """Rate-limit state parsed from response headers.""" | |
| model: str | |
| status: str = "unknown" # allowed | rate_limited | unknown | |
| reset_at: int = 0 # unix timestamp when window resets | |
| utilization_5h: float = 0.0 | |
| utilization_7d: float = 0.0 | |
| last_checked: float = 0.0 # unix seconds | |
| last_error: str = "" | |
| def available(self) -> bool: | |
| return self.status == "allowed" | |
| def seconds_until_reset(self) -> int: | |
| return max(0, int(self.reset_at - time.time())) | |
| class MaxResponse: | |
| """Successful response from Max plan.""" | |
| content: str | |
| model_requested: str | |
| model_served: str | |
| input_tokens: int | |
| output_tokens: int | |
| quota: QuotaState = field(default_factory=lambda: QuotaState(model="")) | |
| class MaxUnavailable(Exception): | |
| """Raised when Max plan cannot serve the request (429 or auth).""" | |
| def __init__(self, model: str, reset_at: int = 0, msg: str = ""): | |
| self.model = model | |
| self.reset_at = reset_at | |
| self.msg = msg | |
| super().__init__(f"Max {model} unavailable: {msg} (reset in {max(0, reset_at - int(time.time()))}s)") | |
| class MaxAuthError(Exception): | |
| """Raised when OAuth token refresh fails permanently β needs relogin.""" | |
| # ---------------------------------------------------------------------- | |
| # Keychain I/O | |
| # ---------------------------------------------------------------------- | |
| def read_token() -> dict: | |
| """Read full credential blob from keychain.""" | |
| try: | |
| raw = subprocess.check_output( | |
| ["security", "find-generic-password", "-s", KEYCHAIN_SERVICE, "-w"], | |
| stderr=subprocess.DEVNULL, | |
| ).decode().strip() | |
| return json.loads(raw) | |
| except subprocess.CalledProcessError: | |
| raise MaxAuthError(f"Keychain entry '{KEYCHAIN_SERVICE}' not found β run `claude` to login") | |
| except json.JSONDecodeError as e: | |
| raise MaxAuthError(f"Invalid JSON in keychain: {e}") | |
| def write_token(cred: dict) -> None: | |
| """Atomically replace keychain entry.""" | |
| body = json.dumps(cred) | |
| subprocess.run( | |
| ["security", "delete-generic-password", "-s", KEYCHAIN_SERVICE], | |
| stderr=subprocess.DEVNULL, | |
| ) | |
| subprocess.run( | |
| ["security", "add-generic-password", | |
| "-s", KEYCHAIN_SERVICE, | |
| "-a", os.environ.get("USER", "Ashira"), | |
| "-w", body, | |
| "-U"], | |
| check=True, | |
| stderr=subprocess.DEVNULL, | |
| ) | |
| # ---------------------------------------------------------------------- | |
| # OAuth refresh | |
| # ---------------------------------------------------------------------- | |
| def refresh_if_needed(cred: dict, buffer_seconds: int = 120) -> dict: | |
| """Refresh access token if expiring in <buffer_seconds. Writes back to keychain.""" | |
| oa = cred["claudeAiOauth"] | |
| expires_at = oa["expiresAt"] / 1000 | |
| if time.time() + buffer_seconds < expires_at: | |
| return cred # still fresh | |
| # Refresh | |
| req = urllib.request.Request( | |
| OAUTH_REFRESH_URL, | |
| data=json.dumps({ | |
| "grant_type": "refresh_token", | |
| "refresh_token": oa["refreshToken"], | |
| "client_id": OAUTH_CLIENT_ID, | |
| }).encode(), | |
| headers={"content-type": "application/json"}, | |
| method="POST", | |
| ) | |
| try: | |
| with urllib.request.urlopen(req, timeout=15) as r: | |
| new = json.loads(r.read()) | |
| except urllib.error.HTTPError as e: | |
| raise MaxAuthError( | |
| f"OAuth refresh failed ({e.code}): {e.read().decode()[:200]}. " | |
| "Run `claude` in a new terminal to re-login." | |
| ) | |
| oa["accessToken"] = new["access_token"] | |
| oa["refreshToken"] = new["refresh_token"] | |
| oa["expiresAt"] = int((time.time() + new["expires_in"]) * 1000) | |
| write_token(cred) | |
| return cred | |
| # ---------------------------------------------------------------------- | |
| # Quota cache | |
| # ---------------------------------------------------------------------- | |
| def load_quota_cache() -> dict[str, QuotaState]: | |
| """Load cached quota state (per model).""" | |
| if not QUOTA_CACHE_PATH.exists(): | |
| return {} | |
| try: | |
| raw = json.loads(QUOTA_CACHE_PATH.read_text()) | |
| return {k: QuotaState(**v) for k, v in raw.items()} | |
| except (json.JSONDecodeError, TypeError): | |
| return {} | |
| def save_quota_cache(cache: dict[str, QuotaState]) -> None: | |
| QUOTA_CACHE_PATH.parent.mkdir(parents=True, exist_ok=True) | |
| data = {k: v.__dict__ for k, v in cache.items()} | |
| QUOTA_CACHE_PATH.write_text(json.dumps(data, indent=2)) | |
| def parse_quota_headers(model: str, headers: dict[str, str]) -> QuotaState: | |
| """Parse anthropic-ratelimit-* headers into QuotaState.""" | |
| h = {k.lower(): v for k, v in headers.items()} | |
| def fget(key: str, default: float = 0.0) -> float: | |
| try: | |
| return float(h.get(key, default)) | |
| except (ValueError, TypeError): | |
| return default | |
| def iget(key: str, default: int = 0) -> int: | |
| try: | |
| return int(float(h.get(key, default))) | |
| except (ValueError, TypeError): | |
| return default | |
| status = h.get("anthropic-ratelimit-unified-5h-status", "unknown") | |
| reset_5h = iget("anthropic-ratelimit-unified-5h-reset") | |
| reset_7d = iget("anthropic-ratelimit-unified-7d-reset") | |
| return QuotaState( | |
| model=model, | |
| status=status, | |
| reset_at=max(reset_5h, reset_7d) if reset_5h and reset_7d else reset_5h or reset_7d, | |
| utilization_5h=fget("anthropic-ratelimit-unified-5h-utilization"), | |
| utilization_7d=fget("anthropic-ratelimit-unified-7d-utilization"), | |
| last_checked=time.time(), | |
| ) | |
| # ---------------------------------------------------------------------- | |
| # Call Anthropic via Max OAuth | |
| # ---------------------------------------------------------------------- | |
| def call_max( | |
| model: str, | |
| messages: list[dict], | |
| max_tokens: int = 4096, | |
| system: Optional[str] = None, | |
| timeout: int = 180, | |
| ) -> MaxResponse: | |
| """Make a Max-plan OAuth call. Raises MaxUnavailable on 429.""" | |
| cred = refresh_if_needed(read_token()) | |
| token = cred["claudeAiOauth"]["accessToken"] | |
| body: dict[str, Any] = { | |
| "model": model, | |
| "max_tokens": max_tokens, | |
| "messages": messages, | |
| } | |
| if system: | |
| body["system"] = system | |
| req = urllib.request.Request( | |
| ANTHROPIC_API, | |
| data=json.dumps(body).encode(), | |
| headers={ | |
| "Authorization": f"Bearer {token}", | |
| "anthropic-version": ANTHROPIC_VERSION, | |
| "anthropic-beta": ANTHROPIC_BETA, | |
| "content-type": "application/json", | |
| }, | |
| method="POST", | |
| ) | |
| try: | |
| with urllib.request.urlopen(req, timeout=timeout) as r: | |
| data = json.loads(r.read()) | |
| quota = parse_quota_headers(model, dict(r.getheaders())) | |
| _update_cache(quota) | |
| return MaxResponse( | |
| content=data["content"][0]["text"], | |
| model_requested=model, | |
| model_served=data.get("model", model), | |
| input_tokens=data["usage"]["input_tokens"], | |
| output_tokens=data["usage"]["output_tokens"], | |
| quota=quota, | |
| ) | |
| except urllib.error.HTTPError as e: | |
| err_body = e.read().decode() | |
| headers = dict(e.headers) | |
| quota = parse_quota_headers(model, headers) | |
| # Override: 429 always means rate_limited regardless of header contents | |
| quota.status = "rate_limited" if e.code == 429 else "error" | |
| quota.last_error = f"HTTP {e.code}: {err_body[:200]}" | |
| # If 429 but no reset header, set a safe cooldown (5 min) so pick_max_model skips | |
| if e.code == 429 and quota.reset_at <= time.time(): | |
| quota.reset_at = int(time.time() + 300) | |
| _update_cache(quota) | |
| if e.code == 429: | |
| raise MaxUnavailable(model, quota.reset_at, err_body) | |
| if e.code == 401: | |
| raise MaxAuthError(f"Max auth failed ({e.code}) β relogin needed") | |
| raise MaxUnavailable(model, 0, f"HTTP {e.code}: {err_body[:200]}") | |
| def _update_cache(quota: QuotaState) -> None: | |
| cache = load_quota_cache() | |
| cache[quota.model] = quota | |
| save_quota_cache(cache) | |
| # ---------------------------------------------------------------------- | |
| # Tier selection | |
| # ---------------------------------------------------------------------- | |
| MAX_TIER_ORDER = [MODEL_OPUS, MODEL_SONNET, MODEL_HAIKU] | |
| def pick_max_model(prefer: str = MODEL_OPUS) -> Optional[str]: | |
| """Pick best available Max-plan model. | |
| Strategy: | |
| 1. If cache status=allowed AND fresh (< TTL) β use it immediately | |
| 2. If cache stale (> TTL) β eligible to re-probe (real probe will confirm) | |
| 3. If cache rate_limited: | |
| - If reset_at > 0 AND reset_at still in future β NOT eligible (honor cooldown) | |
| - Only eligible when reset_at passed + cache went stale | |
| 4. Walk Opus β Sonnet β Haiku; use first eligible | |
| Returns model name or None if all rate-limited within cooldown. | |
| """ | |
| cache = load_quota_cache() | |
| now = time.time() | |
| def eligible(model: str) -> bool: | |
| q = cache.get(model) | |
| if not q: | |
| return True # unknown β worth one probe | |
| # Fresh + allowed | |
| if q.status == "allowed" and now - q.last_checked <= QUOTA_CACHE_TTL: | |
| return True | |
| # Rate-limited + still within cooldown window β skip | |
| if q.status == "rate_limited" and q.reset_at > now: | |
| return False | |
| # Stale (either status) + no active cooldown β re-probe OK | |
| if now - q.last_checked > QUOTA_CACHE_TTL: | |
| return True | |
| # Rate-limited but reset_at is 0 or in past β try again cautiously | |
| if q.status == "rate_limited" and q.reset_at <= now: | |
| return now - q.last_checked > 30 # wait 30s between retries | |
| return False | |
| order = [prefer] + [m for m in MAX_TIER_ORDER if m != prefer] | |
| for model in order: | |
| if eligible(model): | |
| return model | |
| return None | |
| def probe_and_refresh_cache() -> dict[str, QuotaState]: | |
| """Send minimal probes to each tier to refresh cache. Called every 5 min.""" | |
| out: dict[str, QuotaState] = {} | |
| for model in MAX_TIER_ORDER: | |
| try: | |
| resp = call_max(model, [{"role": "user", "content": "."}], max_tokens=5) | |
| out[model] = resp.quota | |
| except MaxUnavailable as e: | |
| # already cached in _update_cache | |
| cache = load_quota_cache() | |
| out[model] = cache.get(model, QuotaState(model=model, status="rate_limited", | |
| reset_at=e.reset_at)) | |
| except MaxAuthError: | |
| raise | |
| return out | |
| if __name__ == "__main__": | |
| # CLI self-test | |
| import sys | |
| if len(sys.argv) > 1 and sys.argv[1] == "probe": | |
| for model, q in probe_and_refresh_cache().items(): | |
| print(f"{model}: {q.status} util5h={q.utilization_5h:.2f} " | |
| f"reset_in={q.seconds_until_reset}s") | |
| elif len(sys.argv) > 1 and sys.argv[1] == "pick": | |
| print(pick_max_model() or "NONE_AVAILABLE") | |
| else: | |
| # quick call | |
| m = pick_max_model() or MODEL_HAIKU | |
| r = call_max(m, [{"role": "user", "content": sys.argv[1] if len(sys.argv) > 1 else "hi"}], max_tokens=50) | |
| print(f"[{r.model_served}] {r.content[:200]}") | |