""" Claude Web API Proxy - HuggingFace Spaces with ReAct """ from fastapi import FastAPI, Request from fastapi.responses import StreamingResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager import httpx import json import asyncio from datetime import datetime from typing import Optional, Dict import os import uuid import re import pickle from pathlib import Path import logging # 日志配置 logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') logger = logging.getLogger(__name__) # 重试函数 async def retry_async(fn, max_retries=3, delay=1.0): for i in range(max_retries): try: return await fn() except Exception as e: if i == max_retries - 1: raise logger.warning(f"Retry {i+1}/{max_retries}: {e}") await asyncio.sleep(delay * (i + 1)) # 文件持久化 CACHE_FILE = Path("/tmp/conversation_cache.pkl") conversation_cache: Dict[str, str] = {} # 请求记录(内存) request_log: list = [] def load_cache(): global conversation_cache if CACHE_FILE.exists(): try: with open(CACHE_FILE, 'rb') as f: conversation_cache = pickle.load(f) except: conversation_cache = {} def save_cache(): try: with open(CACHE_FILE, 'wb') as f: pickle.dump(conversation_cache, f) except: pass # 智能保活 last_request_time = datetime.now() async def keep_alive(): while True: idle_time = (datetime.now() - last_request_time).total_seconds() if idle_time < 3600: await asyncio.sleep(1800) else: await asyncio.sleep(6 * 3600) try: async with httpx.AsyncClient() as client: url = os.getenv("SPACE_URL", "") if url: await client.get(f"{url}/health") except: pass @asynccontextmanager async def lifespan(app: FastAPI): load_cache() logger.info("Application started") yield logger.info("Application shutdown") app = FastAPI(title="Claude Proxy", lifespan=lifespan) app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) @app.get("/clear_cache") async def clear_cache(): global conversation_cache conversation_cache = {} if CACHE_FILE.exists(): CACHE_FILE.unlink() return {"status": "cache cleared"} @app.get("/logs") async def get_logs(): """获取最近的日志""" try: import subprocess result = subprocess.run(['tail', '-n', '100', '/proc/1/fd/1'], capture_output=True, text=True, timeout=5) return {"logs": result.stdout} except: return {"error": "Cannot read logs"} @app.get("/reqlog") async def req_log(): """获取请求记录,用于排查哪些客户端发了请求""" return {"count": len(request_log), "requests": request_log} @app.get("/health") async def health(): return {"status": "ok", "time": datetime.now().isoformat()} @app.get("/") async def root(): return {"name": "Claude Proxy", "version": "2.0-hf-react"} async def get_org_id(key: str) -> Optional[str]: logger.info("[3] Space → Claude REQUEST: GET /api/organizations") headers = { 'Cookie': f'sessionKey={key[:20]}...', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'application/json', 'Accept-Language': 'en-US,en;q=0.9', 'Referer': 'https://claude.ai/chats', 'Origin': 'https://claude.ai', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin' } async with httpx.AsyncClient() as client: r = await client.get('https://claude.ai/api/organizations', headers={**headers, 'Cookie': f'sessionKey={key}'}, timeout=30.0) logger.info(f"[4] Claude → Space RESPONSE: status={r.status_code}, body={r.text[:200]}") if r.status_code == 200: data = r.json() return data[0]['uuid'] if data else None else: raise Exception(f"Claude.ai returned {r.status_code}") async def create_conv(key: str, org_id: str) -> Optional[str]: conv_id = str(uuid.uuid4()) headers = { 'Content-Type': 'application/json', 'Cookie': f'sessionKey={key}', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'application/json', 'Accept-Language': 'en-US,en;q=0.9', 'Referer': 'https://claude.ai/chats', 'Origin': 'https://claude.ai', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin' } async with httpx.AsyncClient() as client: r = await client.post(f'https://claude.ai/api/organizations/{org_id}/chat_conversations', headers=headers, json={ 'uuid': conv_id, 'name': '', 'include_conversation_preferences': True, 'is_temporary': False, 'enabled_imagine': False }, timeout=30.0) logger.info(f"create_conv response: status={r.status_code}, body={r.text[:500]}") if r.status_code in [200, 201]: return conv_id else: raise Exception(f"Failed to create conversation: {r.status_code}") def extract_prompt(messages): result = [] for m in messages: role = m['role'].title() content = m['content'] # 支持图片 if isinstance(content, list): parts = [] for part in content: if part.get('type') == 'text': parts.append(part['text']) elif part.get('type') == 'image_url': parts.append(f"[Image: {part['image_url']['url']}]") result.append(f"{role}: {' '.join(parts)}") else: result.append(f"{role}: {content}") return '\n\n'.join(result) def extract_attachments(messages): attachments = [] for m in messages: if isinstance(m['content'], list): for part in m['content']: if part.get('type') == 'image_url': attachments.append({ 'extracted_content': '', 'file_name': 'image.png', 'file_size': 0, 'file_type': 'image/png', 'url': part['image_url']['url'] }) return attachments def enhance_prompt(prompt: str, messages: list) -> str: has_system = any(m['role'] == 'system' for m in messages) if not has_system: return prompt return prompt + "\n\n[Environment: OpenClaw | Tools: web_search, artifacts, repl available]" def inject_react_prompt(prompt: str, tools: list) -> str: if not tools: return prompt tool_defs = '\n\n'.join([f"{t.get('name', 'unknown')}: {t.get('description', '')}" for t in tools]) return prompt + f""" [Tool Calling Protocol] You have access to these tools: {tool_defs} Use this format: Thought: [reasoning] Action: [tool_name] Action Input: [JSON] After result: Observation: [result] When done: Final Answer: [response] """ def parse_react_output(text: str) -> Optional[dict]: action_match = re.search(r'Action:\s*(\w+)', text) input_match = re.search(r'Action Input:\s*(\{[^}]*\})', text) if action_match and input_match: try: return {'type': 'tool_use', 'id': f'tool_{int(datetime.now().timestamp())}', 'name': action_match.group(1), 'input': json.loads(input_match.group(1))} except: return None return None @app.post("/v1/chat/completions") @app.post("/chat/completions") async def chat(request: Request): global last_request_time last_request_time = datetime.now() start_time = datetime.now() try: body = await request.json() auth = request.headers.get('Authorization', '') # 记录请求(用于排查客户端是否真正到达Space) client_ip = request.client.host if request.client else "unknown" user_agent = request.headers.get('User-Agent', 'unknown') x_forwarded_for = request.headers.get('X-Forwarded-For', 'none') request_log.append({ "time": datetime.now().isoformat(), "ip": client_ip, "x_forwarded_for": x_forwarded_for, "user_agent": user_agent[:80], "key_prefix": auth[:30], "body_preview": json.dumps(body, ensure_ascii=False)[:150] }) if len(request_log) > 30: request_log.pop(0) logger.info("="*50) logger.info(f"[1] OpenClaw → Space REQUEST") logger.info(f" IP: {client_ip}") logger.info(f" User-Agent: {user_agent}") logger.info(f" X-Forwarded-For: {x_forwarded_for}") logger.info(f" Body: {json.dumps(body, ensure_ascii=False)[:500]}") logger.info("="*50) if not auth.startswith('Bearer '): logger.info("[2] Space → OpenClaw RESPONSE: 401 Missing auth") return JSONResponse({"error": {"message": "Missing auth", "type": "auth_error"}}, 401) key = auth.replace('Bearer ', '') logger.info(f"[3] Calling get_org_id with sessionKey={key[:20]}...") org_id = await retry_async(lambda: get_org_id(key)) if not org_id: logger.error(f"[2] Space → OpenClaw RESPONSE: 401 Invalid key") return JSONResponse({"error": {"message": "Invalid key or network error", "type": "auth_error"}}, 401) conv_id = conversation_cache.get(key) if not conv_id: conv_id = await retry_async(lambda: create_conv(key, org_id)) if conv_id: conversation_cache[key] = conv_id save_cache() if not conv_id: logger.error("Failed to create conversation") return JSONResponse({"error": {"message": "Conv error", "type": "api_error"}}, 500) messages = body.get('messages', []) tools = body.get('tools', []) prompt = extract_prompt(messages) prompt = enhance_prompt(prompt, messages) if tools: prompt = inject_react_prompt(prompt, tools) attachments = extract_attachments(messages) model = body.get('model', 'claude-sonnet-4-6') payload = { 'prompt': prompt, 'timezone': body.get('timezone', 'Asia/Shanghai'), 'locale': body.get('locale', 'en-US'), 'model': model, 'rendering_mode': 'messages', 'attachments': attachments, 'files': [], 'tools': [ {"type": "web_search_v0", "name": "web_search"}, {"type": "artifacts_v0", "name": "artifacts"}, {"type": "repl_v0", "name": "repl"} ] } logger.info(f"Sending to Claude: attachments={len(attachments)}") logger.info(f"Payload to Claude: {json.dumps(payload, ensure_ascii=False)[:1000]}") stream = body.get('stream', True) # 非流式响应 if not stream: try: full_text = '' headers = { 'Content-Type': 'application/json', 'Cookie': f'sessionKey={key}', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'text/event-stream', 'Accept-Language': 'en-US,en;q=0.9', 'Referer': 'https://claude.ai/chats', 'Origin': 'https://claude.ai', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin' } async with httpx.AsyncClient() as client: async with client.stream('POST', f'https://claude.ai/api/organizations/{org_id}/chat_conversations/{conv_id}/completion', headers=headers, json=payload, timeout=120.0) as response: async for line in response.aiter_lines(): if line.startswith('data: '): try: data = json.loads(line[6:]) if data.get('type') == 'content_block_delta' and data.get('delta', {}).get('text'): full_text += data['delta']['text'] except: pass return JSONResponse({ "id": f"chatcmpl-{int(datetime.now().timestamp())}", "object": "chat.completion", "created": int(datetime.now().timestamp()), "model": model, "choices": [{ "index": 0, "message": {"role": "assistant", "content": full_text}, "finish_reason": "stop" }] }) except Exception as e: logger.error(f"Non-stream error: {e}") return JSONResponse({"error": {"message": str(e), "type": "api_error"}}, 500) # 流式响应 async def generate(): try: full_text = '' headers = { 'Content-Type': 'application/json', 'Cookie': f'sessionKey={key}', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'text/event-stream', 'Accept-Language': 'en-US,en;q=0.9', 'Referer': 'https://claude.ai/chats', 'Origin': 'https://claude.ai', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin' } async with httpx.AsyncClient() as client: async with client.stream('POST', f'https://claude.ai/api/organizations/{org_id}/chat_conversations/{conv_id}/completion', headers=headers, json=payload, timeout=120.0) as response: if response.status_code != 200: error_body = await response.aread() logger.error(f"Claude API error: status={response.status_code}, body={error_body.decode()[:500]}") yield f'data: {json.dumps({"error": f"Claude returned {response.status_code}"})}\n\n' return async for line in response.aiter_lines(): if line.startswith('data: '): try: data = json.loads(line[6:]) if data.get('type') == 'content_block_delta' and data.get('delta', {}).get('text'): text = data['delta']['text'] full_text += text # ReAct: 检测工具调用 react_result = parse_react_output(full_text) if react_result and react_result['type'] == 'tool_use': chunk = { "id": f"chatcmpl-{int(datetime.now().timestamp())}", "object": "chat.completion.chunk", "created": int(datetime.now().timestamp()), "model": model, "choices": [{ "index": 0, "delta": { "tool_calls": [{ "id": react_result['id'], "type": "function", "function": { "name": react_result['name'], "arguments": json.dumps(react_result['input']) } }] }, "finish_reason": "tool_calls" }] } yield f"data: {json.dumps(chunk)}\n\n" yield "data: [DONE]\n\n" return # 正常文本 chunk = { "id": f"chatcmpl-{int(datetime.now().timestamp())}", "object": "chat.completion.chunk", "created": int(datetime.now().timestamp()), "model": model, "choices": [{"index": 0, "delta": {"content": text}, "finish_reason": None}] } yield f"data: {json.dumps(chunk)}\n\n" except: pass yield "data: [DONE]\n\n" except Exception as e: logger.error(f"Stream error: {e}") yield f"data: {json.dumps({'error': str(e)})}\n\n" return StreamingResponse(generate(), media_type="text/event-stream") except Exception as e: duration = (datetime.now() - start_time).total_seconds() logger.error(f"Request failed: {e}, duration={duration}s") return JSONResponse({"error": {"message": str(e), "type": "api_error"}}, 500) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)