# NOTE: The three lines that originally appeared here ("Spaces: / Running / Running")
# are Hugging Face Spaces page-status badges captured by the scraper, not code.
# Kept as a comment so the module remains valid Python.
import json
import os
import re
import time
from pathlib import Path
from typing import List, Tuple

import gradio as gr
from huggingface_hub import InferenceClient
# ==================== CONFIGURATION ====================
# The HF token is injected through the Space's "Repository secrets" settings and
# is required for every Inference API call — fail fast at import time if absent.
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    raise ValueError("HF_TOKEN environment variable is required! Add it in Space Settings β Repository secrets.")
# Kimi-K2.6 served via the Novita inference provider on HF
# Note: provider="novita" is set in InferenceClient, so NO :novita suffix here
MODEL_NAME = "moonshotai/Kimi-K2.6"
# Conversation turns are persisted to this JSON file in the Space's working directory.
HISTORY_FILE = "chat_history.json"
MAX_HISTORY_LENGTH = 20  # Keep last 20 messages
# Token limits for response lengths: maps the UI "Response Length" radio choice
# to the max_tokens value sent to the chat-completions API.
RESPONSE_LENGTHS = {
    "Short": 512,
    "Medium": 1024,
    "Long": 4096,
}
# Uploads larger than this are rejected before any content is read.
MAX_FILE_SIZE_MB = 5
# ==================== INFERENCE CLIENT ====================
def create_client():
    """Build the Hugging Face InferenceClient routed through Novita.

    Returns the client instance on success, or None when construction
    fails, so the rest of the app can degrade gracefully at startup
    instead of crashing the Space.
    """
    try:
        inference = InferenceClient(provider="novita", api_key=HF_TOKEN)
        print(f"β Connected to {MODEL_NAME} via Novita")
        return inference
    except Exception as exc:
        print(f"β οΈ Client initialization warning: {exc}")
        return None
# Module-level singleton used by chat_engine; may be None (checked there).
client = create_client()
# ==================== MEMORY MANAGEMENT ====================
def load_history(path=None, max_len=None) -> List[dict]:
    """Load conversation history from a JSON file.

    Args:
        path: History file to read; defaults to the module-level HISTORY_FILE.
        max_len: Maximum number of entries to keep; defaults to MAX_HISTORY_LENGTH.

    Returns:
        The (possibly trimmed) history list, or [] when the file is missing,
        unreadable, or does not contain a JSON list.
    """
    if path is None:
        path = HISTORY_FILE
    if not os.path.exists(path):
        return []
    try:
        with open(path, "r", encoding="utf-8") as f:
            history = json.load(f)
        # Robustness fix: a corrupted or foreign file may hold a non-list JSON
        # value; treat that as "no history" instead of propagating it.
        if not isinstance(history, list):
            return []
        limit = MAX_HISTORY_LENGTH if max_len is None else max_len
        if len(history) > limit:
            history = history[-limit:]
            # Persist the trimmed version right away (writes the module-default
            # history file via save_history).
            save_history(history)
        return history
    except Exception as e:
        print(f"Error loading history: {e}")
        return []
def save_history(history: List[dict], path=None, max_len=None):
    """Persist conversation history to a JSON file.

    Trims to the newest `max_len` entries before writing. Errors are logged
    rather than raised so a disk problem never breaks the chat flow.

    Args:
        history: List of history entries to write.
        path: Target file; defaults to the module-level HISTORY_FILE.
        max_len: Maximum entries kept; defaults to MAX_HISTORY_LENGTH.
    """
    try:
        limit = MAX_HISTORY_LENGTH if max_len is None else max_len
        if len(history) > limit:
            history = history[-limit:]
        target = HISTORY_FILE if path is None else path
        with open(target, "w", encoding="utf-8") as f:
            # ensure_ascii=False keeps non-Latin chat content human-readable.
            json.dump(history, f, indent=2, ensure_ascii=False)
    except Exception as e:
        print(f"Error saving history: {e}")
def clear_memory(path=None) -> str:
    """Delete the persisted history file and return a status string.

    Args:
        path: File to remove; defaults to the module-level HISTORY_FILE.

    A missing file is treated as success. The original exists()-then-remove()
    pair had a TOCTOU race; EAFP removal closes it.
    """
    target = HISTORY_FILE if path is None else path
    try:
        os.remove(target)
    except FileNotFoundError:
        pass  # already gone — same outcome as a successful delete
    return "β Memory cleared successfully!"
# ==================== FILE PROCESSING ====================
def extract_text_from_pdf(file_path: str) -> str:
    """Extract plain text from a PDF using PyPDF2.

    Args:
        file_path: Path of the PDF file to read.

    Returns:
        The concatenated page text, or a user-facing "β ..." error string
        when PyPDF2 is missing or the file cannot be parsed.
    """
    try:
        import PyPDF2  # local import: PDF support is optional at runtime
        pages = []
        with open(file_path, "rb") as f:
            reader = PyPDF2.PdfReader(f)
            for page in reader.pages:
                # Bug fix: PyPDF2's extract_text() may return None for pages
                # with no extractable text; the original `text += None + "\n"`
                # would raise TypeError. Coerce to "" first.
                pages.append(page.extract_text() or "")
        return "\n".join(pages).strip()
    except ImportError:
        return "β PyPDF2 not installed. PDF reading is unavailable."
    except Exception as e:
        return f"β Error reading PDF: {str(e)}"
def process_uploaded_file(file, max_size_mb=None) -> Tuple[str, str]:
    """Process an uploaded file and return (content_string, status_message).

    Args:
        file: Filepath string, a file-like object with a `.name` attribute,
            or None (no upload).
        max_size_mb: Size cap in megabytes; defaults to MAX_FILE_SIZE_MB.

    Returns:
        (formatted_content, status): formatted markdown content plus a
        user-facing status string; both empty / error strings on failure.
    """
    if file is None:
        return "", ""
    try:
        # Bug fix: gr.File(type="filepath") delivers a plain str, which has no
        # `.name` attribute — the original `file.name` raised AttributeError on
        # every upload (silently swallowed below). Accept both shapes.
        file_path = file if isinstance(file, str) else file.name
        limit = MAX_FILE_SIZE_MB if max_size_mb is None else max_size_mb
        file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
        if file_size_mb > limit:
            return "", f"β File too large ({file_size_mb:.1f} MB). Max: {limit} MB"
        file_ext = Path(file_path).suffix.lower()
        file_name = Path(file_path).name
        if file_ext == ".pdf":
            content = extract_text_from_pdf(file_path)
        elif file_ext in (".txt", ".py", ".js", ".ts", ".html", ".css",
                          ".json", ".md", ".java", ".cpp", ".c", ".rs", ".go"):
            with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
                content = f.read()
        else:
            return "", f"β Unsupported file type: {file_ext}"
        # Truncate very long files so one upload can't blow the prompt budget.
        if len(content) > 50_000:
            content = content[:50_000] + "\n\n[... Content truncated due to length ...]"
        formatted = f"π **File: {file_name}**\n```\n{content}\n```\n\n"
        return formatted, f"β File loaded: {file_name} ({file_size_mb:.2f} MB)"
    except Exception as e:
        return "", f"β Error processing file: {str(e)}"
# ==================== CHAT ENGINE ====================
def build_messages(history: List[dict], full_message: str) -> List[dict]:
    """
    Convert Gradio history format into the OpenAI-compatible messages list
    that the Novita/Kimi API expects.

    Assistant turns are cleaned of the UI-only <details> reasoning block
    (Kimi docs: reasoning must NOT be echoed back in multi-turn history).

    Args:
        history: Gradio messages-format history ({role, content} dicts).
        full_message: Current user input (file content already prepended).

    Returns:
        Messages list starting with the system prompt and ending with the
        new user turn.
    """
    system_prompt = (
        "You are Kimi, a highly intelligent and helpful AI assistant created by Moonshot AI. "
        "You excel at reasoning, coding, mathematics, and analysis. "
        "Provide clear, structured, and accurate responses."
    )
    messages = [{"role": "system", "content": system_prompt}]
    # Add existing conversation history
    for turn in history:
        if turn.get("role") in ("user", "assistant") and turn.get("content"):
            content = turn["content"]
            if turn["role"] == "assistant":
                # Bug fix: the original comment promised to strip reasoning
                # from assistant history but the code never did. Remove the
                # collapsible <details> block (and the "---" divider) that
                # chat_engine prepends for display purposes.
                cleaned = re.sub(
                    r"(?s)<details>.*?</details>\s*(?:---\s*)?", "", content
                ).strip()
                content = cleaned or content  # keep original if nothing is left
            messages.append({"role": turn["role"], "content": content})
    # Add current user message
    messages.append({
        "role": "user",
        "content": [{"type": "text", "text": full_message}]
    })
    return messages
def parse_kimi_response(completion) -> Tuple[str, str]:
    """Split an API completion into (reasoning_text, final_answer_text).

    Either element may be an empty string — reasoning is absent when the
    model did not return it. On any parsing failure the reasoning is empty
    and the answer carries a warning message instead of raising.
    """
    try:
        choice_message = completion.choices[0].message
        thinking = (getattr(choice_message, "reasoning", "") or "").strip()
        answer = (choice_message.content or "").strip()
        return thinking, answer
    except Exception as e:
        return "", f"β οΈ Error parsing response: {str(e)}"
def chat_engine(
    message: str,
    history: List[dict],
    response_length: str,
    file_content: str = "",
):
    """
    Main chat function. Returns updated Gradio history.
    history is a list of {role, content} dicts (Gradio messages format).

    Args:
        message: The user's new input text.
        history: Current chat history; mutated in place and returned.
        response_length: "Short" | "Medium" | "Long" — mapped to max_tokens
            via RESPONSE_LENGTHS (unknown values fall back to 1024).
        file_content: Pre-formatted uploaded-file text, prepended to the
            message before building the API prompt.
    """
    # Ignore empty / whitespace-only submissions.
    if not message.strip():
        return history
    # Client failed to initialize at startup — surface setup guidance in-chat
    # instead of raising.
    if client is None:
        err = (
            "β οΈ Model client not initialized.\n\n"
            "Please check that:\n"
            "1. `HF_TOKEN` is set in your Space Secrets.\n"
            "2. The token has **read** access and Inference API is enabled."
        )
        history.append({"role": "user", "content": message})
        history.append({"role": "assistant", "content": err})
        return history
    try:
        # Prepend any uploaded-file content so the model sees it as context.
        full_message = (file_content + message) if file_content else message
        messages = build_messages(history, full_message)
        max_tokens = RESPONSE_LENGTHS.get(response_length, 1024)
        # ββ API call ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=messages,
            max_tokens=max_tokens,
            temperature=1.0,
            top_p=0.95,
            stream=False,  # single-shot response; no token streaming to the UI
        )
        # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
        reasoning, answer = parse_kimi_response(completion)
        # Format the bot reply: show reasoning in a collapsible block if present
        if reasoning:
            bot_reply = (
                f"<details>\n"
                f"<summary>π§ <b>Kimi's Reasoning (click to expand)</b></summary>\n\n"
                f"{reasoning}\n\n"
                f"</details>\n\n"
                f"---\n\n{answer}"
            )
        else:
            bot_reply = answer
        # Save to persistent file (clean format, no reasoning blocks).
        # NOTE(review): the persisted {"user", "bot"} schema differs from the
        # Gradio {role, content} schema and is never read back into the chat.
        persistent = load_history()
        persistent.append({"user": message, "bot": answer})
        save_history(persistent)
        # Update Gradio history (the UI copy keeps the reasoning block).
        history.append({"role": "user", "content": message})
        history.append({"role": "assistant", "content": bot_reply})
        return history
    except Exception as e:
        error_str = str(e)
        # ββ Friendly error hints: map common HTTP failures to actionable text
        if "401" in error_str or "Unauthorized" in error_str:
            hint = (
                "π **401 Unauthorized** β Your HF token is invalid or missing.\n\n"
                "Fix: Go to Space Settings β Secrets β add `HF_TOKEN` with a valid token."
            )
        elif "403" in error_str or "Forbidden" in error_str:
            hint = (
                "π« **403 Forbidden** β Your token doesn't have access to this model or provider.\n\n"
                "Fix: Make sure your HF token has `read` permission and you have accepted "
                "the model's license on Hugging Face."
            )
        elif "429" in error_str or "rate limit" in error_str.lower():
            hint = (
                "β³ **429 Rate Limited** β Too many requests. Please wait 30 seconds and try again.\n\n"
                "This is a Novita provider limit, not a code error."
            )
        elif "503" in error_str or "loading" in error_str.lower():
            hint = (
                "β³ **503 Model Loading** β The model is warming up on the server.\n\n"
                "Wait 30β60 seconds and resend your message."
            )
        elif "model" in error_str.lower() and "not found" in error_str.lower():
            hint = (
                "β **Model Not Found** β The model ID or provider tag may have changed.\n\n"
                f"Current model: `{MODEL_NAME}`\n"
                "Check the HF model page for the latest provider tag."
            )
        else:
            hint = f"β οΈ **Unexpected Error:**\n```\n{error_str}\n```\n\nPlease try again in a moment."
        history.append({"role": "user", "content": message})
        history.append({"role": "assistant", "content": hint})
        return history
# ==================== GRADIO INTERFACE ====================
def create_interface():
    """Build and return the Gradio Blocks interface.

    Assembles the dark-violet themed chat UI: header, chatbot window, input
    row, settings/file-upload accordion, tips footer, and all event wiring
    to chat_engine / process_uploaded_file / clear_memory.
    """
    # Custom CSS layered on top of the Base theme configured below.
    css = """
    /* ββ Google Font ββ */
    @import url('https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600;700&display=swap');
    /* ββ Root / Body ββ */
    body, .gradio-container {
        font-family: 'Inter', sans-serif !important;
        background: #0d0f14 !important;
        color: #e2e8f0 !important;
    }
    /* ββ Header block ββ */
    .kimi-header {
        background: linear-gradient(135deg, #1a1f2e 0%, #0f1623 50%, #1a1035 100%);
        border: 1px solid rgba(139, 92, 246, 0.25);
        border-radius: 16px;
        padding: 28px 36px;
        margin-bottom: 20px;
        text-align: center;
        box-shadow: 0 4px 32px rgba(139, 92, 246, 0.12);
    }
    .kimi-header h1 {
        font-size: 2rem !important;
        font-weight: 700 !important;
        background: linear-gradient(90deg, #a78bfa, #7c3aed, #c4b5fd) !important;
        -webkit-background-clip: text !important;
        -webkit-text-fill-color: transparent !important;
        margin: 0 0 6px 0 !important;
    }
    .kimi-header p {
        color: #94a3b8 !important;
        font-size: 0.9rem !important;
        margin: 0 !important;
    }
    .kimi-header .badge {
        display: inline-block;
        background: rgba(124, 58, 237, 0.18);
        border: 1px solid rgba(139, 92, 246, 0.35);
        color: #c4b5fd;
        border-radius: 999px;
        padding: 3px 12px;
        font-size: 0.78rem;
        margin-top: 10px;
    }
    /* ββ Chat bubble overrides ββ */
    .message-wrap {
        padding: 6px 0 !important;
    }
    .user .message-bubble-border, .user .message {
        background: linear-gradient(135deg, #3b1d8a, #5b21b6) !important;
        border: none !important;
        color: #f5f3ff !important;
        border-radius: 18px 18px 4px 18px !important;
    }
    .bot .message-bubble-border, .bot .message {
        background: #1e2333 !important;
        border: 1px solid rgba(139, 92, 246, 0.2) !important;
        color: #e2e8f0 !important;
        border-radius: 18px 18px 18px 4px !important;
    }
    /* ββ Input textbox ββ */
    .input-wrap textarea {
        background: #141824 !important;
        border: 1px solid rgba(139, 92, 246, 0.3) !important;
        border-radius: 12px !important;
        color: #e2e8f0 !important;
        font-size: 0.95rem !important;
        padding: 12px 16px !important;
        resize: none !important;
        transition: border-color 0.2s;
    }
    .input-wrap textarea:focus {
        border-color: rgba(139, 92, 246, 0.7) !important;
        outline: none !important;
        box-shadow: 0 0 0 3px rgba(139, 92, 246, 0.12) !important;
    }
    /* ββ Send button ββ */
    #send-btn {
        background: linear-gradient(135deg, #7c3aed, #5b21b6) !important;
        border: none !important;
        border-radius: 12px !important;
        color: #fff !important;
        font-weight: 600 !important;
        font-size: 0.95rem !important;
        letter-spacing: 0.02em !important;
        transition: all 0.2s ease !important;
        box-shadow: 0 4px 14px rgba(124, 58, 237, 0.4) !important;
    }
    #send-btn:hover {
        background: linear-gradient(135deg, #6d28d9, #4c1d95) !important;
        box-shadow: 0 6px 20px rgba(124, 58, 237, 0.55) !important;
        transform: translateY(-1px) !important;
    }
    /* ββ Settings accordion ββ */
    .gr-accordion {
        background: #141824 !important;
        border: 1px solid rgba(139, 92, 246, 0.2) !important;
        border-radius: 12px !important;
        margin-top: 12px !important;
    }
    .gr-accordion .label-wrap {
        color: #a78bfa !important;
        font-weight: 600 !important;
    }
    /* ββ Radio buttons ββ */
    .gr-radio label {
        background: #1e2333 !important;
        border: 1px solid rgba(139, 92, 246, 0.2) !important;
        border-radius: 8px !important;
        color: #cbd5e1 !important;
        padding: 6px 14px !important;
        transition: all 0.15s;
    }
    .gr-radio label:hover {
        border-color: rgba(139, 92, 246, 0.5) !important;
        color: #e2e8f0 !important;
    }
    .gr-radio input:checked + label {
        background: rgba(124, 58, 237, 0.25) !important;
        border-color: #7c3aed !important;
        color: #c4b5fd !important;
    }
    /* ββ Action buttons ββ */
    #clear-file-btn {
        background: #1e2333 !important;
        border: 1px solid rgba(139, 92, 246, 0.25) !important;
        color: #94a3b8 !important;
        border-radius: 8px !important;
        font-size: 0.82rem !important;
        transition: all 0.15s;
    }
    #clear-file-btn:hover {
        border-color: rgba(139, 92, 246, 0.5) !important;
        color: #c4b5fd !important;
    }
    #clear-chat-btn {
        background: rgba(239, 68, 68, 0.1) !important;
        border: 1px solid rgba(239, 68, 68, 0.3) !important;
        color: #f87171 !important;
        border-radius: 8px !important;
        font-size: 0.82rem !important;
        transition: all 0.15s;
    }
    #clear-chat-btn:hover {
        background: rgba(239, 68, 68, 0.2) !important;
        border-color: rgba(239, 68, 68, 0.5) !important;
    }
    /* ββ Status messages ββ */
    .file-status p { color: #4ade80 !important; font-size: 0.82rem !important; }
    /* ββ Footer tips ββ */
    .tips-block {
        background: #141824;
        border: 1px solid rgba(139, 92, 246, 0.15);
        border-radius: 12px;
        padding: 14px 20px;
        margin-top: 12px;
    }
    .tips-block p, .tips-block li { color: #64748b !important; font-size: 0.82rem !important; }
    /* ββ Chatbot container ββ */
    .chatbot-wrap {
        border: 1px solid rgba(139, 92, 246, 0.2) !important;
        border-radius: 14px !important;
        background: #0f1219 !important;
        overflow: hidden !important;
    }
    """
    with gr.Blocks(
        title="Kimi K2.6 Β· AI Reasoning Chatbot",
        css=css,
        theme=gr.themes.Base(
            primary_hue="violet",
            neutral_hue="slate",
            font=gr.themes.GoogleFont("Inter"),
        ).set(
            body_background_fill="#0d0f14",
            body_text_color="#e2e8f0",
            block_background_fill="#141824",
            block_border_color="rgba(139,92,246,0.2)",
            input_background_fill="#141824",
            button_primary_background_fill="linear-gradient(135deg,#7c3aed,#5b21b6)",
            button_primary_text_color="#ffffff",
        ),
    ) as demo:
        # ββ Header βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
        gr.HTML("""
        <div class="kimi-header">
            <h1>π Kimi K2.6</h1>
            <p>Moonshot AI's 1-Trillion-Parameter Reasoning Model</p>
            <span class="badge">β‘ Powered by Novita Inference API Β· 256K Context Window</span>
        </div>
        """)
        # ββ State ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
        # Holds the formatted text of the most recent upload between events.
        file_content_state = gr.State("")
        # ββ Chat window ββββββββββββββββββββββββββββββββββββββββββββββββββββ
        chatbot = gr.Chatbot(
            label="",
            height=520,
            show_label=False,
            type="messages",
            render_markdown=True,
            # NOTE(review): bubble_full_width is deprecated in newer Gradio 4.x
            # releases — confirm against the installed version.
            bubble_full_width=False,
            elem_classes=["chatbot-wrap"],
            avatar_images=(
                None,
                "https://huggingface.co/moonshotai/Kimi-K2.6/resolve/main/figures/kimi-logo.png",
            ),
        )
        # ββ Input row ββββββββββββββββββββββββββββββββββββββββββββββββββββββ
        with gr.Row(equal_height=True):
            msg = gr.Textbox(
                label="",
                placeholder="β¦ Ask Kimi anything β coding, math, reasoning, analysis...",
                lines=2,
                max_lines=6,
                scale=8,
                show_label=False,
                elem_classes=["input-wrap"],
                container=False,
            )
            send_btn = gr.Button(
                "Send π",
                variant="primary",
                scale=1,
                min_width=100,
                elem_id="send-btn",
            )
        # ββ Settings accordion βββββββββββββββββββββββββββββββββββββββββββββ
        with gr.Accordion("βοΈ Settings & File Upload", open=False):
            with gr.Row():
                response_length = gr.Radio(
                    choices=["Short", "Medium", "Long"],
                    value="Medium",
                    label="π Response Length",
                    info="Short Β· 512 tok Medium Β· 1024 tok Long Β· 4096 tok",
                    scale=2,
                )
            gr.HTML("<hr style='border-color:rgba(139,92,246,0.15);margin:12px 0;'>")
            file_upload = gr.File(
                label="π Upload File for Analysis (PDF Β· Code Β· Text β max 5 MB)",
                file_types=[".txt", ".pdf", ".py", ".js", ".ts",
                            ".html", ".css", ".json", ".md", ".java",
                            ".cpp", ".c", ".rs", ".go"],
                # NOTE(review): type="filepath" delivers a plain str to the
                # change handler — process_uploaded_file must accept a str.
                type="filepath",
            )
            file_status = gr.Markdown("", elem_classes=["file-status"])
            with gr.Row():
                clear_file_btn = gr.Button(
                    "ποΈ Clear File",
                    size="sm",
                    variant="secondary",
                    elem_id="clear-file-btn",
                )
                clear_btn = gr.Button(
                    "π§Ή Clear Chat",
                    size="sm",
                    variant="stop",
                    elem_id="clear-chat-btn",
                )
            clear_status = gr.Markdown("")
        # ββ Tips footer ββββββββββββββββββββββββββββββββββββββββββββββββββββ
        gr.HTML("""
        <div class="tips-block">
            <b style="color:#7c3aed;">π‘ Tips</b>
            <ul style="margin:6px 0 0 18px;padding:0;">
                <li>Upload a PDF or code file, then ask Kimi to summarize, review, or debug it.</li>
                <li>Use <b>Long</b> mode for complex coding or multi-step math problems.</li>
                <li>Kimi's internal <b>Reasoning</b> block (if shown) reveals step-by-step thinking.</li>
                <li>Press <b>Enter</b> or click <b>Send</b> to submit your message.</li>
            </ul>
        </div>
        """)
        # ββ Event handler functions ββββββββββββββββββββββββββββββββββββββββ
        def handle_file_upload(file):
            # Thin wrapper so the change event returns (state, status) pair.
            content, status = process_uploaded_file(file)
            return content, status
        def clear_file():
            # Reset upload widget, stored content, and status message.
            return None, "", ""
        def clear_conversation():
            # Wipe both the on-disk history file and the visible chat.
            result = clear_memory()
            return [], result
        # ββ Wire up events βββββββββββββββββββββββββββββββββββββββββββββββββ
        file_upload.change(
            fn=handle_file_upload,
            inputs=[file_upload],
            outputs=[file_content_state, file_status],
        )
        clear_file_btn.click(
            fn=clear_file,
            outputs=[file_upload, file_content_state, file_status],
        )
        # NOTE(review): neither send path clears the msg textbox after submit,
        # and both share api_name="chat" (Gradio may warn on the duplicate).
        send_btn.click(
            fn=chat_engine,
            inputs=[msg, chatbot, response_length, file_content_state],
            outputs=[chatbot],
            api_name="chat",
        )
        msg.submit(
            fn=chat_engine,
            inputs=[msg, chatbot, response_length, file_content_state],
            outputs=[chatbot],
            api_name="chat",
        )
        clear_btn.click(
            fn=clear_conversation,
            outputs=[chatbot, clear_status],
            api_name="clear_memory",
        )
    return demo
# ==================== LAUNCH ====================
if __name__ == "__main__":
    # Build the UI and serve it where the Spaces runtime expects it.
    demo = create_interface()
    demo.launch(
        server_name="0.0.0.0",  # bind all interfaces so the Space proxy can reach the app
        server_port=7860,       # standard Gradio / HF Spaces port
        share=False,            # no tunnel link needed when hosted on a Space
    )