Spaces:
Running
Running
| import os | |
| import uuid | |
| import time | |
| import asyncio | |
| import threading | |
| import json | |
| import re | |
| from typing import Optional | |
| from fastapi import FastAPI, Request | |
| from fastapi.responses import JSONResponse | |
# Shared secret that clients must present in the Authorization header (checked by _auth).
# Read from the API_SECRET_KEY environment variable, with a built-in fallback.
# NOTE(review): the fallback is a fixed literal, so the API is effectively open to
# anyone who knows it whenever the env var is unset — confirm this is intended.
API_SECRET_KEY = os.getenv("API_SECRET_KEY", "2026-2026")
# Model identifiers reported by the model-listing and health-check handlers.
ZAI_MODELS = ["GLM-5.1", "GLM-5-Turbo", "GLM-5V-Turbo","GLM-5", "GLM-4.7", "GLM-4.6V", "GLM-4.5-Air" ]
class AsyncBrowserThread(threading.Thread):
    """Daemon thread that owns a headless Chrome instance and its asyncio loop.

    The thread launches Chrome through Playwright and then runs its own event
    loop forever. Other threads submit prompts through :meth:`process_request`,
    which schedules :meth:`_talk_to_zai` on that loop and blocks for the reply.
    """

    def __init__(self):
        super().__init__(daemon=True)
        self.loop = asyncio.new_event_loop()
        self.ready_event = threading.Event()
        self.browser = None
        self.playwright = None
        # Exception raised while starting the browser, if any. Lets callers
        # fail fast with the real cause instead of waiting for a timeout.
        self.startup_error = None

    def run(self):
        """Start the browser, signal readiness, then serve the loop forever."""
        asyncio.set_event_loop(self.loop)
        try:
            self.loop.run_until_complete(self._start_browser())
        except Exception as exc:
            # Record the failure and still release waiters; otherwise every
            # request would block for the full readiness timeout.
            self.startup_error = exc
            print(f"[ZAI-SERVER] Browser failed to start: {exc}")
            self.ready_event.set()
            return
        self.ready_event.set()
        print("[ZAI-SERVER] Browser is ready!")
        self.loop.run_forever()

    async def _start_browser(self):
        """Launch headless Chrome via Playwright and keep handles on ``self``."""
        # Imported lazily so the module can be imported without Playwright
        # being resolved until the thread actually starts.
        from playwright.async_api import async_playwright

        print("[ZAI-SERVER] Starting Chrome...")
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(
            headless=True,
            channel="chrome",
            args=[
                '--disable-blink-features=AutomationControlled',
                '--no-sandbox',
                '--disable-gpu',
                '--disable-dev-shm-usage',
                '--disable-setuid-sandbox',
                '--single-process',
                '--no-zygote',
            ],
        )
        print("[ZAI-SERVER] Chrome launched!")

    async def _talk_to_zai(self, prompt: str):
        """Send *prompt* to chat.z.ai in a fresh context and return the reply text.

        The reply is considered complete once the extracted text is non-empty
        and has been identical for five consecutive one-second polls.

        Raises:
            Exception: any Playwright/navigation error, re-raised after logging.
        """
        context = await self.browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
            viewport={'width': 1920, 'height': 1080},
        )
        page = None
        try:
            await context.add_init_script(
                "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
            )
            page = await context.new_page()
            page.set_default_timeout(120000)
            await page.goto("https://chat.z.ai/", wait_until="domcontentloaded")
            await asyncio.sleep(3)

            # Type the prompt into the chat input and submit it with Enter.
            await page.wait_for_selector("textarea#chat-input", timeout=60000)
            await page.fill("textarea#chat-input", prompt)
            await asyncio.sleep(0.5)
            await page.press("textarea#chat-input", "Enter")
            print(f"[ZAI-SERVER] Sent ({len(prompt)} chars)")

            # Wait for the response container to appear.
            await asyncio.sleep(2)
            await page.wait_for_selector("#response-content-container", timeout=120000)

            # Poll until the response text stops changing. The script clones
            # the prose node and removes its direct <div> children before
            # reading the text (its own comments label those as thinking blocks).
            last_text = ""
            unchanged_count = 0
            while unchanged_count < 5:
                current_text = await page.evaluate("""
                    () => {
                        const prose = document.querySelector(
                            '#response-content-container .markdown-prose'
                        );
                        if (!prose) return '';
                        const clone = prose.cloneNode(true);
                        // β Ψ΄ΩΩ ΩΩ direct div children (thinking blocks)
                        // Ψ§ΩΨ±Ψ― Ψ§ΩΨΩΩΩΩ ΩΩΩ ΩΩ <p> Ω Ψ΄ <div>
                        clone.querySelectorAll(':scope > div').forEach(el => el.remove());
                        return clone.innerText.trim();
                    }
                """)
                if current_text == last_text and current_text.strip():
                    unchanged_count += 1
                else:
                    last_text = current_text
                    unchanged_count = 0
                await asyncio.sleep(1.0)
            print(f"[ZAI-SERVER] Response: {len(last_text)} chars")
            return last_text.strip()
        except Exception as e:
            print(f"[ZAI-SERVER] Error: {e}")
            raise
        finally:
            # Always release the context, even if creating or closing the page
            # failed or the task was cancelled after a caller timeout.
            try:
                if page is not None:
                    await page.close()
            finally:
                await context.close()

    def process_request(self, prompt: str, timeout: float = 120, ready_timeout: float = 60):
        """Run one prompt on the browser thread and block until the reply arrives.

        Args:
            prompt: Text to submit to the chat page.
            timeout: Seconds to wait for the reply before giving up.
            ready_timeout: Seconds to wait for the browser to finish starting.

        Returns:
            The stripped response text.

        Raises:
            Exception: the browser is not ready in time or failed to start.
            TimeoutError: no reply within *timeout* (raised by ``future.result``).
        """
        if not self.ready_event.wait(timeout=ready_timeout):
            raise Exception("Error From Browser")
        if self.startup_error is not None:
            # The loop is not running in this case, so scheduling work would
            # only hang until the timeout.
            raise Exception(f"Error From Browser: {self.startup_error}")
        future = asyncio.run_coroutine_threadsafe(self._talk_to_zai(prompt), self.loop)
        try:
            return future.result(timeout=timeout)
        finally:
            # On timeout the coroutine would otherwise keep polling forever and
            # leak its page/context; cancelling lets its ``finally`` clean up.
            if not future.done():
                future.cancel()
# Module-level singleton used by the request handlers. Creating and starting it
# here means that importing this module immediately spawns the browser thread,
# which begins launching Chrome in the background.
browser_engine = AsyncBrowserThread()
browser_engine.start()
| # ==================================================================== | |
| # Smart Prompt Builder | |
| # ==================================================================== | |
def _content_to_text(content):
    """Flatten a message ``content`` value (str, list of parts, None, other) to text."""
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        pieces = []
        for item in content:
            if isinstance(item, dict):
                # Prefer the "text" key, then "content", else dump the part.
                value = item.get("text", item.get("content", str(item)))
            else:
                value = item
            pieces.append("" if value is None else str(value))
        return "\n".join(pieces)
    return str(content)


def format_prompt(messages, tools=None):
    """Collapse a chat/Responses-style message list into one plain-text prompt.

    System messages are gathered into a header section. Tool results and
    earlier tool calls are rendered as bracketed context lines. When *tools*
    is given and the conversation does not end on tool results, the tool-calling
    instructions from ``format_tools_instruction`` are inserted.

    Args:
        messages: Iterable of message dicts (``role``/``content`` and, for
            Responses-style items, ``type``/``output``/``arguments``).
        tools: Optional list of tool definitions.

    Returns:
        The assembled prompt string.
    """
    parts = []
    system_parts = []
    has_tool_results = False
    user_question = ""

    for msg in messages:
        role = msg.get("role", "")
        msg_type = msg.get("type", "")
        # Normalise up front: ``content`` may be None (assistant tool-call
        # turns) or a list of parts, and the joins below need strings.
        content = _content_to_text(msg.get("content", ""))

        if role == "system":
            system_parts.append(content)
        elif role == "tool":
            has_tool_results = True
            tool_name = msg.get("name", "tool")
            parts.append(f"[TOOL RESULT from '{tool_name}']:\n{content}")
        elif msg_type == "function_call_output":
            has_tool_results = True
            call_id = msg.get("call_id", "")
            output_content = _content_to_text(msg["output"]) if "output" in msg else content
            parts.append(f"[TOOL RESULT (call_id: {call_id})]:\n{output_content}")
        elif msg_type == "function_call":
            func_name = msg.get("name", "?")
            func_args = msg.get("arguments", "{}")
            parts.append(f"[PREVIOUS TOOL CALL: Called '{func_name}' with arguments: {func_args}]")
        elif role == "assistant":
            assistant_content = content
            tool_calls_in_msg = msg.get("tool_calls", [])
            if tool_calls_in_msg:
                tc_descriptions = []
                for tc in tool_calls_in_msg:
                    func = tc.get("function", {})
                    called_name = func.get("name", "?")
                    called_args = func.get("arguments", "{}")
                    tc_descriptions.append(f"Called '{called_name}' with: {called_args}")
                assistant_content += "\n[Previous tool calls: " + "; ".join(tc_descriptions) + "]"
            if assistant_content.strip():
                parts.append(f"[Assistant]: {assistant_content}")
        elif role == "user" or (msg_type == "message" and role != "system"):
            user_question = content
            parts.append(content)
            # A new user turn supersedes earlier tool results.
            has_tool_results = False
        elif content:
            parts.append(content)

    offer_tools = bool(tools) and not has_tool_results
    sections = []

    if system_parts:
        system_text = "\n\n".join(system_parts)
        if offer_tools:
            sections.append("=== YOUR ROLE ===\n" + system_text + "\n=== END OF ROLE ===\n\n")
        else:
            sections.append(
                "=== SYSTEM INSTRUCTIONS (FOLLOW STRICTLY) ===\n"
                + system_text
                + "\n=== END OF INSTRUCTIONS ===\n\n"
            )
    if offer_tools:
        sections.append(format_tools_instruction(tools, user_question))
    if has_tool_results:
        sections.append(
            "=== CONTEXT FROM TOOLS ===\n"
            "The following information was retrieved by the tools you requested.\n"
            "Use ONLY this information to answer the user's question.\n\n"
        )
    if parts:
        sections.append("\n".join(parts))
    if has_tool_results:
        sections.append(
            "\n\n=== INSTRUCTION ===\n"
            "Now answer the user's question based ONLY on the tool results above.\n"
        )
    return "".join(sections)
def format_tools_instruction(tools, user_question=""):
    """Build the prompt section that forces the model to answer with a tool call.

    The section lists every tool with its description and parameters, states
    the JSON reply format, and ends with an example that uses the first tool's
    name. ``user_question`` is accepted for interface compatibility and is not
    used in the text.

    Args:
        tools: List of tool definitions, either ``{"function": {...}}`` wrapped
            or flat dicts carrying ``name``/``description``/``parameters``.
        user_question: Unused.

    Returns:
        The instruction text.
    """
    chunks = [
        "\n=== MANDATORY TOOL USAGE ===\n",
        "You MUST use one of the tools below to answer this question.\n",
        "Do NOT answer directly. Do NOT say you don't have information.\n",
        "You MUST respond with ONLY a JSON object to call the tool.\n\n",
        "RESPONSE FORMAT - respond with ONLY this JSON, nothing else:\n",
        '{"tool_calls": [{"name": "TOOL_NAME", "arguments": {"param": "value"}}]}\n\n',
        "RULES:\n",
        "- Your ENTIRE response must be valid JSON only\n",
        "- No markdown, no code blocks, no explanation\n",
        "- No text before or after the JSON\n\n",
        "Available tools:\n\n",
    ]

    for tool in tools:
        # Wrapped definitions keep the spec under "function"; flat ones are the spec.
        spec = tool.get("function", tool)
        tool_name = spec.get("name", "unknown")
        summary = spec.get("description", "No description")
        schema = spec.get("parameters", {})
        chunks.append(f"Tool: {tool_name}\nDescription: {summary}\n")
        if schema.get("properties"):
            chunks.append("Parameters:\n")
            required = schema.get("required", [])
            for key, info in schema["properties"].items():
                kind = info.get("type", "string")
                detail = info.get("description", "")
                flag = "required" if key in required else "optional"
                chunks.append(f" - {key} ({kind}, {flag}): {detail}\n")
        chunks.append("\n")

    chunks.append("=== END OF TOOLS ===\n\n")

    sample = tools[0] if tools else {}
    sample_name = sample.get("function", sample).get("name", "tool")
    chunks.append('EXAMPLE: If the user asks a question, respond with:\n')
    chunks.append(
        '{"tool_calls": [{"name": "' + sample_name
        + '", "arguments": {"input": "the user question here"}}]}\n\n'
    )
    chunks.append("Now respond with the JSON to call the appropriate tool:\n\n")
    return "".join(chunks)
def _iter_json_objects(text):
    """Yield each JSON object found in *text*, scanning left to right."""
    decoder = json.JSONDecoder()
    pos = text.find("{")
    while pos != -1:
        try:
            obj, end = decoder.raw_decode(text, pos)
        except json.JSONDecodeError:
            # Not valid JSON from this brace; try the next one.
            pos = text.find("{", pos + 1)
            continue
        if isinstance(obj, dict):
            yield obj
        # Resume after the decoded object so nested objects are not re-read.
        pos = text.find("{", end)


def _normalize_tool_call(call):
    """Convert one raw call entry to OpenAI ``tool_calls`` form, or None if unusable."""
    if not isinstance(call, dict):
        return None
    # Accept the flat {"name", "arguments"} shape requested in the prompt and
    # the nested {"function": {"name", "arguments"}} OpenAI shape.
    nested = call.get("function")
    source = nested if isinstance(nested, dict) else call
    tool_name = source.get("name", "")
    if not isinstance(tool_name, str) or not tool_name.strip():
        return None
    arguments = source.get("arguments", {})
    if arguments is None:
        arguments_str = "{}"
    elif isinstance(arguments, str):
        # Already serialised by the model; pass through unchanged.
        arguments_str = arguments
    else:
        arguments_str = json.dumps(arguments, ensure_ascii=False)
    return {
        "id": f"call_{uuid.uuid4().hex[:24]}",
        "type": "function",
        "function": {"name": tool_name, "arguments": arguments_str},
    }


def parse_tool_calls(response_text):
    """Extract tool calls from a model reply.

    The reply is searched for the first JSON object that has a ``tool_calls``
    list with at least one usable entry. The object may be the whole reply,
    sit inside a Markdown code fence, or be surrounded by prose.

    Args:
        response_text: Raw text returned by the model.

    Returns:
        A list of ``{"id", "type": "function", "function": {"name",
        "arguments"}}`` dicts (``arguments`` is a JSON string), or ``None``
        when no tool call is found.
    """
    if not response_text:
        return None
    cleaned = response_text.strip()
    for parsed in _iter_json_objects(cleaned):
        raw_calls = parsed.get("tool_calls")
        if not isinstance(raw_calls, list):
            continue
        formatted_calls = []
        for call in raw_calls:
            normalized = _normalize_tool_call(call)
            if normalized is not None:
                formatted_calls.append(normalized)
        if formatted_calls:
            return formatted_calls
    return None
| # ==================================================================== | |
| # FastAPI App | |
| # ==================================================================== | |
| app = FastAPI(title="zai_api for n8n") | |
def _auth(request: Request) -> bool:
    """Return True when the request's Authorization header carries the API key.

    Accepts ``Bearer <key>`` (scheme matched case-insensitively) or the bare
    key on its own. The comparison is constant-time so response timing does
    not reveal how much of a guessed key matched.
    """
    import hmac  # local import: only this function needs it

    header = request.headers.get("authorization", "").strip()
    scheme, _, credentials = header.partition(" ")
    # Strip only a leading "Bearer" scheme; the previous substring replace
    # removed "Bearer " wherever it occurred in the header value.
    token = credentials.strip() if scheme.lower() == "bearer" else header
    return hmac.compare_digest(token.encode("utf-8"), API_SECRET_KEY.encode("utf-8"))
async def chat_completions(request: Request):
    """Handle an OpenAI-style chat-completions request.

    Builds a single prompt from ``messages`` (and ``tools`` if present), sends
    it through the shared browser engine, and returns a ``chat.completion``
    object. If tools were supplied and the reply parses as a tool call, the
    message carries ``tool_calls`` with ``finish_reason`` ``"tool_calls"``.

    Error responses use the ``{"error": {"message": ...}}`` shape:
    401 for a bad key, 400 for an invalid body, 500 for processing failures.
    """
    # NOTE(review): no route decorator is visible on this handler in this view;
    # confirm it is registered on `app`.
    # Authenticate first so unauthenticated bodies are never parsed.
    if not _auth(request):
        return JSONResponse(status_code=401, content={"error": {"message": "Invalid API Key"}})
    try:
        data = await request.json()
    except Exception:
        return JSONResponse(status_code=400, content={"error": {"message": "Invalid JSON payload"}})
    if not isinstance(data, dict):
        # Valid JSON that is not an object (e.g. a list) has no fields to read.
        return JSONResponse(status_code=400, content={"error": {"message": "Invalid JSON payload"}})

    messages = data.get("messages", [])
    if not messages or not isinstance(messages, list):
        return JSONResponse(status_code=400, content={"error": {"message": "messages field is required"}})

    try:
        tools = data.get("tools", None)
        prompt = format_prompt(messages, tools=tools)
        start_time = time.time()
        print(f"[ZAI-SERVER] Processing request ({len(prompt)} chars)")
        # process_request blocks for up to two minutes; run it in a worker
        # thread so the server's event loop keeps serving other requests.
        loop = asyncio.get_running_loop()
        response_text = await loop.run_in_executor(
            None, browser_engine.process_request, prompt
        )
        # Whitespace-split word counts stand in for token counts.
        p_tokens = len(prompt.split())
        c_tokens = len(response_text.split())
        tool_calls = parse_tool_calls(response_text) if tools else None

        if tool_calls:
            message = {"role": "assistant", "content": None, "tool_calls": tool_calls}
            finish_reason = "tool_calls"
        else:
            message = {"role": "assistant", "content": response_text}
            finish_reason = "stop"

        return {
            "id": f"chatcmpl-{uuid.uuid4().hex[:29]}",
            "object": "chat.completion",
            "created": int(start_time),
            "model": data.get("model", "GLM-5.1"),
            "choices": [{"index": 0, "message": message, "finish_reason": finish_reason}],
            "usage": {"prompt_tokens": p_tokens, "completion_tokens": c_tokens,
                      "total_tokens": p_tokens + c_tokens},
        }
    except Exception as e:
        # Some exceptions (e.g. timeouts) have an empty str(); fall back to the type name.
        return JSONResponse(
            status_code=500,
            content={"error": {"message": str(e) or type(e).__name__}},
        )
async def responses(request: Request):
    """Handle an OpenAI Responses-style request.

    ``input`` may be a string (one user message) or a list of input items.
    When it is missing or empty, ``messages`` is used instead. ``instructions``
    is prepended as a system message. Returns a ``response`` object whose
    ``output`` holds either ``function_call`` items or one assistant message.

    Error responses use the ``{"error": {"message": ...}}`` shape:
    401 for a bad key, 400 for an invalid body, 500 for processing failures.
    """
    # NOTE(review): no route decorator is visible on this handler in this view;
    # confirm it is registered on `app`.
    # Authenticate first so unauthenticated bodies are never parsed.
    if not _auth(request):
        return JSONResponse(status_code=401, content={"error": {"message": "Invalid API Key"}})
    try:
        data = await request.json()
    except Exception:
        return JSONResponse(status_code=400, content={"error": {"message": "Invalid JSON payload"}})
    if not isinstance(data, dict):
        return JSONResponse(status_code=400, content={"error": {"message": "Invalid JSON payload"}})

    input_data = data.get("input")
    if isinstance(input_data, str) and input_data.strip():
        messages = [{"role": "user", "content": input_data}]
    elif isinstance(input_data, list) and input_data:
        # Copy so adding the system message below never mutates the payload.
        messages = list(input_data)
    else:
        # Missing/empty input: fall back to a chat-style "messages" list.
        # Previously a missing input defaulted to "" and became an empty user
        # message, so this fallback and the 400 below were never reached.
        fallback = data.get("messages", [])
        messages = list(fallback) if isinstance(fallback, list) else []
    if not messages:
        return JSONResponse(status_code=400, content={"error": {"message": "input field is required"}})

    try:
        tools = data.get("tools", None)
        instructions = data.get("instructions", "")
        if instructions:
            messages = [{"role": "system", "content": instructions}] + messages
        prompt = format_prompt(messages, tools=tools)
        start_time = time.time()
        # process_request blocks for up to two minutes; run it in a worker
        # thread so the server's event loop keeps serving other requests.
        loop = asyncio.get_running_loop()
        response_text = await loop.run_in_executor(
            None, browser_engine.process_request, prompt
        )
        # Whitespace-split word counts stand in for token counts.
        p_tokens = len(prompt.split())
        c_tokens = len(response_text.split())
        tool_calls = parse_tool_calls(response_text) if tools else None

        if tool_calls:
            output_items = [
                {
                    "type": "function_call",
                    "id": tc["id"],
                    "call_id": tc["id"],
                    "name": tc["function"]["name"],
                    "arguments": tc["function"]["arguments"],
                    "status": "completed",
                }
                for tc in tool_calls
            ]
        else:
            output_items = [{
                "type": "message",
                "role": "assistant",
                "content": [{"type": "output_text", "text": response_text}],
            }]

        return {
            "id": f"resp-{uuid.uuid4().hex[:29]}",
            "object": "response",
            "created_at": int(start_time),
            "model": data.get("model", "GLM-5.1"),
            "status": "completed",
            "output": output_items,
            "usage": {"input_tokens": p_tokens, "output_tokens": c_tokens,
                      "total_tokens": p_tokens + c_tokens},
        }
    except Exception as e:
        # Some exceptions (e.g. timeouts) have an empty str(); fall back to the type name.
        return JSONResponse(
            status_code=500,
            content={"error": {"message": str(e) or type(e).__name__}},
        )
async def list_models():
    """Return the configured model ids as an OpenAI-style model list."""
    # NOTE(review): no route decorator is visible on this handler in this view;
    # confirm it is registered on `app`.
    entries = []
    for model_id in ZAI_MODELS:
        entries.append({"id": model_id, "object": "model", "owned_by": "zai"})
    return {"object": "list", "data": entries}
async def health_check():
    """Return a static status payload together with the configured model ids."""
    # NOTE(review): no route decorator is visible on this handler in this view;
    # confirm it is registered on `app`.
    payload = {}
    payload["status"] = "running"
    payload["message"] = "zai_api Server is active!"
    payload["models"] = ZAI_MODELS
    return payload
# Script entry point: serve the app with uvicorn on all interfaces, port 7860.
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the file is run directly.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)