# app.py — Kimi K2.6 chat Space.
# (Removed non-Python residue from the Hugging Face web UI that was pasted in:
# "shrinusn77's picture / Update app.py / d95198a verified".)
import json
import os
import re
import time
from pathlib import Path
from typing import List, Tuple

import gradio as gr
from huggingface_hub import InferenceClient
# ==================== CONFIGURATION ====================
# HF access token injected via Space "Repository secrets"; required by the
# Novita provider client created in create_client().
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    # Fail fast at import time — without a token every API call would 401.
    raise ValueError("HF_TOKEN environment variable is required! Add it in Space Settings β†’ Repository secrets.")
# Kimi-K2.6 served via the Novita inference provider on HF
# Note: provider="novita" is set in InferenceClient, so NO :novita suffix here
MODEL_NAME = "moonshotai/Kimi-K2.6"
# On-disk JSON used as lightweight persistent memory.
# NOTE(review): Space filesystems are ephemeral — this is wiped on restart.
HISTORY_FILE = "chat_history.json"
MAX_HISTORY_LENGTH = 20  # Keep last 20 messages
# Token limits for response lengths (maps the UI radio choice to max_tokens)
RESPONSE_LENGTHS = {
    "Short": 512,
    "Medium": 1024,
    "Long": 4096,
}
MAX_FILE_SIZE_MB = 5  # upload size cap enforced by process_uploaded_file()
# ==================== INFERENCE CLIENT ====================
def create_client():
    """Build the HF Inference Client, or return None when it cannot be made.

    A None client is tolerated downstream: chat_engine replies with a
    setup hint instead of letting the Space crash at import time.
    """
    try:
        inference_client = InferenceClient(
            provider="novita",
            api_key=HF_TOKEN,
        )
    except Exception as e:
        # Constructor failures are logged, not raised — degrade gracefully.
        print(f"⚠️ Client initialization warning: {e}")
        return None
    print(f"βœ… Connected to {MODEL_NAME} via Novita")
    return inference_client


# Module-level singleton shared by every chat request.
client = create_client()
# ==================== MEMORY MANAGEMENT ====================
def load_history() -> List[dict]:
    """Load conversation history from the JSON file on disk.

    Returns:
        The most recent MAX_HISTORY_LENGTH entries (the file is trimmed
        and rewritten when it has grown past that limit), or [] when the
        file is missing, unreadable, or does not contain a list.
    """
    if not os.path.exists(HISTORY_FILE):
        return []
    try:
        with open(HISTORY_FILE, "r", encoding="utf-8") as f:
            history = json.load(f)
    except (OSError, json.JSONDecodeError) as e:
        # Narrowed from a bare `except Exception`: only I/O and parse
        # failures are expected here; anything else should surface.
        print(f"Error loading history: {e}")
        return []
    if not isinstance(history, list):
        # Corrupt/unexpected payload (e.g. a dict) would break the
        # `.append` done by callers — treat it as no history.
        return []
    if len(history) > MAX_HISTORY_LENGTH:
        history = history[-MAX_HISTORY_LENGTH:]
        save_history(history)
    return history
def save_history(history: List[dict]):
    """Persist conversation history to the JSON file.

    Keeps only the newest MAX_HISTORY_LENGTH entries. Failures are
    logged, never raised — losing memory must not break the chat.
    """
    try:
        if len(history) > MAX_HISTORY_LENGTH:
            history = history[-MAX_HISTORY_LENGTH:]
        with open(HISTORY_FILE, "w", encoding="utf-8") as f:
            json.dump(history, f, indent=2, ensure_ascii=False)
    except (OSError, TypeError) as e:
        # Narrowed from `except Exception`: OSError covers disk/permission
        # problems, TypeError covers non-serializable entries.
        print(f"Error saving history: {e}")
def clear_memory() -> str:
    """Delete the persistent history file and report success.

    Uses EAFP (try/remove) instead of the original exists-then-remove
    pair, which raced if the file disappeared between the two calls.
    """
    try:
        os.remove(HISTORY_FILE)
    except FileNotFoundError:
        pass  # nothing to clear — still a success from the user's view
    return "βœ… Memory cleared successfully!"
# ==================== FILE PROCESSING ====================
def extract_text_from_pdf(file_path: str) -> str:
    """Extract the plain text of every page of a PDF with PyPDF2.

    Returns the concatenated page text, or a "❌"-prefixed message when
    PyPDF2 is unavailable or the file cannot be read.
    """
    try:
        import PyPDF2
    except ImportError:
        return "❌ PyPDF2 not installed. PDF reading is unavailable."
    try:
        chunks = []
        with open(file_path, "rb") as handle:
            for page in PyPDF2.PdfReader(handle).pages:
                chunks.append(page.extract_text() + "\n")
        return "".join(chunks).strip()
    except Exception as e:
        return f"❌ Error reading PDF: {str(e)}"
def process_uploaded_file(file) -> Tuple[str, str]:
    """Process an uploaded file and return (content_string, status_message).

    Args:
        file: Either a filepath string — which is what this app's
            ``gr.File(type="filepath")`` actually delivers — or a
            file-like/tempfile object exposing the path via ``.name``.

    Returns:
        ``(formatted_content, status)``: the file wrapped in a markdown
        code block ready to prepend to the prompt (or "" on failure),
        plus a user-facing βœ…/❌ status line.
    """
    if file is None:
        return "", ""
    try:
        # BUG FIX: gr.File(type="filepath") passes a plain str; the old
        # unconditional `file.name` raised AttributeError on every upload.
        # Keep `.name` support for file-object style callers.
        file_path = file if isinstance(file, str) else file.name
        file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
        if file_size_mb > MAX_FILE_SIZE_MB:
            return "", f"❌ File too large ({file_size_mb:.1f} MB). Max: {MAX_FILE_SIZE_MB} MB"
        file_ext = Path(file_path).suffix.lower()
        file_name = Path(file_path).name
        if file_ext == ".pdf":
            content = extract_text_from_pdf(file_path)
            # Surface PDF extraction failures as a status message instead
            # of silently feeding the error text to the model as "content".
            if content.startswith("❌"):
                return "", content
        elif file_ext in [".txt", ".py", ".js", ".ts", ".html", ".css",
                          ".json", ".md", ".java", ".cpp", ".c", ".rs", ".go"]:
            with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
                content = f.read()
        else:
            return "", f"❌ Unsupported file type: {file_ext}"
        # Truncate very long files so the prompt stays within context.
        if len(content) > 50_000:
            content = content[:50_000] + "\n\n[... Content truncated due to length ...]"
        formatted = f"πŸ“Ž **File: {file_name}**\n```\n{content}\n```\n\n"
        return formatted, f"βœ… File loaded: {file_name} ({file_size_mb:.2f} MB)"
    except Exception as e:
        return "", f"❌ Error processing file: {str(e)}"
# ==================== CHAT ENGINE ====================
def build_messages(history: List[dict], full_message: str) -> List[dict]:
    """
    Convert Gradio history format into the OpenAI-compatible messages list
    that the Novita/Kimi API expects.

    Args:
        history: Gradio "messages"-format list of {role, content} dicts.
        full_message: the current user turn (file content + typed text).

    Returns:
        [system] + cleaned history + the current user message.
    """
    system_prompt = (
        "You are Kimi, a highly intelligent and helpful AI assistant created by Moonshot AI. "
        "You excel at reasoning, coding, mathematics, and analysis. "
        "Provide clear, structured, and accurate responses."
    )
    messages = [{"role": "system", "content": system_prompt}]
    # Add existing conversation history
    for turn in history:
        if turn.get("role") in ("user", "assistant") and turn.get("content"):
            content = turn["content"]
            if turn["role"] == "assistant":
                # BUG FIX: the original comment promised to strip reasoning
                # from assistant history (Kimi docs: do NOT resend it) but
                # never did. chat_engine wraps reasoning in a <details>
                # block followed by a "---" divider — remove both.
                content = re.sub(
                    r"(?s)<details>.*?</details>\s*(?:---\s*)?", "", content
                ).strip()
                if not content:
                    continue  # nothing left but reasoning — skip the turn
            messages.append({"role": turn["role"], "content": content})
    # Add current user message (content-parts form, accepted by the API)
    messages.append({
        "role": "user",
        "content": [{"type": "text", "text": full_message}]
    })
    return messages
def parse_kimi_response(completion) -> Tuple[str, str]:
    """Split an API completion into ``(reasoning, answer)``.

    Both strings come back stripped; ``reasoning`` is "" when the
    provider did not return one. Any parsing failure is reported in the
    answer slot rather than raised, so callers never see an exception.
    """
    try:
        message = completion.choices[0].message
        reasoning = (getattr(message, "reasoning", "") or "").strip()
        answer = (message.content or "").strip()
    except Exception as e:
        return "", f"⚠️ Error parsing response: {str(e)}"
    return reasoning, answer
def _error_hint(error_str: str) -> str:
    """Map a raw provider/API error string to a friendly, actionable hint.

    Extracted from chat_engine so the happy path reads straight through;
    strings are unchanged from the original if/elif chain.
    """
    if "401" in error_str or "Unauthorized" in error_str:
        return (
            "πŸ”‘ **401 Unauthorized** β€” Your HF token is invalid or missing.\n\n"
            "Fix: Go to Space Settings β†’ Secrets β†’ add `HF_TOKEN` with a valid token."
        )
    if "403" in error_str or "Forbidden" in error_str:
        return (
            "🚫 **403 Forbidden** β€” Your token doesn't have access to this model or provider.\n\n"
            "Fix: Make sure your HF token has `read` permission and you have accepted "
            "the model's license on Hugging Face."
        )
    if "429" in error_str or "rate limit" in error_str.lower():
        return (
            "⏳ **429 Rate Limited** β€” Too many requests. Please wait 30 seconds and try again.\n\n"
            "This is a Novita provider limit, not a code error."
        )
    if "503" in error_str or "loading" in error_str.lower():
        return (
            "⏳ **503 Model Loading** β€” The model is warming up on the server.\n\n"
            "Wait 30–60 seconds and resend your message."
        )
    if "model" in error_str.lower() and "not found" in error_str.lower():
        return (
            "❓ **Model Not Found** β€” The model ID or provider tag may have changed.\n\n"
            f"Current model: `{MODEL_NAME}`\n"
            "Check the HF model page for the latest provider tag."
        )
    return f"⚠️ **Unexpected Error:**\n```\n{error_str}\n```\n\nPlease try again in a moment."


def chat_engine(
    message: str,
    history: List[dict],
    response_length: str,
    file_content: str = "",
):
    """Run one chat turn against Kimi and return the updated Gradio history.

    Args:
        message: the user's typed text (ignored when blank/None).
        history: Gradio "messages"-format list of {role, content} dicts;
            mutated in place and returned, as Gradio expects.
        response_length: "Short"/"Medium"/"Long" — mapped to max_tokens
            via RESPONSE_LENGTHS (defaults to 1024 on unknown values).
        file_content: pre-formatted uploaded-file text to prepend.

    Returns:
        The updated history list; errors are appended as assistant turns
        rather than raised.
    """
    # Guard against both None (cleared textbox) and whitespace-only input.
    if not message or not message.strip():
        return history
    if client is None:
        err = (
            "⚠️ Model client not initialized.\n\n"
            "Please check that:\n"
            "1. `HF_TOKEN` is set in your Space Secrets.\n"
            "2. The token has **read** access and Inference API is enabled."
        )
        history.append({"role": "user", "content": message})
        history.append({"role": "assistant", "content": err})
        return history
    try:
        # file_content already ends with a blank line when present.
        full_message = (file_content + message) if file_content else message
        messages = build_messages(history, full_message)
        max_tokens = RESPONSE_LENGTHS.get(response_length, 1024)
        # ── API call ────────────────────────────────────────────────────────
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=messages,
            max_tokens=max_tokens,
            temperature=1.0,
            top_p=0.95,
            stream=False,
        )
        # ───────────────────────────────────────────────────────────────────
        reasoning, answer = parse_kimi_response(completion)
        # Format the bot reply: show reasoning in a collapsible block if present
        if reasoning:
            bot_reply = (
                f"<details>\n"
                f"<summary>🧠 <b>Kimi's Reasoning (click to expand)</b></summary>\n\n"
                f"{reasoning}\n\n"
                f"</details>\n\n"
                f"---\n\n{answer}"
            )
        else:
            bot_reply = answer
        # Save to persistent file (clean format, no reasoning blocks)
        persistent = load_history()
        persistent.append({"user": message, "bot": answer})
        save_history(persistent)
        # Update Gradio history
        history.append({"role": "user", "content": message})
        history.append({"role": "assistant", "content": bot_reply})
        return history
    except Exception as e:
        history.append({"role": "user", "content": message})
        history.append({"role": "assistant", "content": _error_hint(str(e))})
        return history
# ==================== GRADIO INTERFACE ====================
def create_interface():
    """Build and return the Gradio Blocks interface.

    Layout: header HTML β†’ chat window β†’ input row β†’ settings accordion
    (response length, file upload, clear buttons) β†’ tips footer. Event
    wiring at the bottom connects the widgets to the handlers above.
    """
    # Custom dark/violet theme CSS injected into the page. This is a
    # runtime string — selectors must match Gradio's generated classes.
    css = """
    /* ── Google Font ── */
    @import url('https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600;700&display=swap');
    /* ── Root / Body ── */
    body, .gradio-container {
    font-family: 'Inter', sans-serif !important;
    background: #0d0f14 !important;
    color: #e2e8f0 !important;
    }
    /* ── Header block ── */
    .kimi-header {
    background: linear-gradient(135deg, #1a1f2e 0%, #0f1623 50%, #1a1035 100%);
    border: 1px solid rgba(139, 92, 246, 0.25);
    border-radius: 16px;
    padding: 28px 36px;
    margin-bottom: 20px;
    text-align: center;
    box-shadow: 0 4px 32px rgba(139, 92, 246, 0.12);
    }
    .kimi-header h1 {
    font-size: 2rem !important;
    font-weight: 700 !important;
    background: linear-gradient(90deg, #a78bfa, #7c3aed, #c4b5fd) !important;
    -webkit-background-clip: text !important;
    -webkit-text-fill-color: transparent !important;
    margin: 0 0 6px 0 !important;
    }
    .kimi-header p {
    color: #94a3b8 !important;
    font-size: 0.9rem !important;
    margin: 0 !important;
    }
    .kimi-header .badge {
    display: inline-block;
    background: rgba(124, 58, 237, 0.18);
    border: 1px solid rgba(139, 92, 246, 0.35);
    color: #c4b5fd;
    border-radius: 999px;
    padding: 3px 12px;
    font-size: 0.78rem;
    margin-top: 10px;
    }
    /* ── Chat bubble overrides ── */
    .message-wrap {
    padding: 6px 0 !important;
    }
    .user .message-bubble-border, .user .message {
    background: linear-gradient(135deg, #3b1d8a, #5b21b6) !important;
    border: none !important;
    color: #f5f3ff !important;
    border-radius: 18px 18px 4px 18px !important;
    }
    .bot .message-bubble-border, .bot .message {
    background: #1e2333 !important;
    border: 1px solid rgba(139, 92, 246, 0.2) !important;
    color: #e2e8f0 !important;
    border-radius: 18px 18px 18px 4px !important;
    }
    /* ── Input textbox ── */
    .input-wrap textarea {
    background: #141824 !important;
    border: 1px solid rgba(139, 92, 246, 0.3) !important;
    border-radius: 12px !important;
    color: #e2e8f0 !important;
    font-size: 0.95rem !important;
    padding: 12px 16px !important;
    resize: none !important;
    transition: border-color 0.2s;
    }
    .input-wrap textarea:focus {
    border-color: rgba(139, 92, 246, 0.7) !important;
    outline: none !important;
    box-shadow: 0 0 0 3px rgba(139, 92, 246, 0.12) !important;
    }
    /* ── Send button ── */
    #send-btn {
    background: linear-gradient(135deg, #7c3aed, #5b21b6) !important;
    border: none !important;
    border-radius: 12px !important;
    color: #fff !important;
    font-weight: 600 !important;
    font-size: 0.95rem !important;
    letter-spacing: 0.02em !important;
    transition: all 0.2s ease !important;
    box-shadow: 0 4px 14px rgba(124, 58, 237, 0.4) !important;
    }
    #send-btn:hover {
    background: linear-gradient(135deg, #6d28d9, #4c1d95) !important;
    box-shadow: 0 6px 20px rgba(124, 58, 237, 0.55) !important;
    transform: translateY(-1px) !important;
    }
    /* ── Settings accordion ── */
    .gr-accordion {
    background: #141824 !important;
    border: 1px solid rgba(139, 92, 246, 0.2) !important;
    border-radius: 12px !important;
    margin-top: 12px !important;
    }
    .gr-accordion .label-wrap {
    color: #a78bfa !important;
    font-weight: 600 !important;
    }
    /* ── Radio buttons ── */
    .gr-radio label {
    background: #1e2333 !important;
    border: 1px solid rgba(139, 92, 246, 0.2) !important;
    border-radius: 8px !important;
    color: #cbd5e1 !important;
    padding: 6px 14px !important;
    transition: all 0.15s;
    }
    .gr-radio label:hover {
    border-color: rgba(139, 92, 246, 0.5) !important;
    color: #e2e8f0 !important;
    }
    .gr-radio input:checked + label {
    background: rgba(124, 58, 237, 0.25) !important;
    border-color: #7c3aed !important;
    color: #c4b5fd !important;
    }
    /* ── Action buttons ── */
    #clear-file-btn {
    background: #1e2333 !important;
    border: 1px solid rgba(139, 92, 246, 0.25) !important;
    color: #94a3b8 !important;
    border-radius: 8px !important;
    font-size: 0.82rem !important;
    transition: all 0.15s;
    }
    #clear-file-btn:hover {
    border-color: rgba(139, 92, 246, 0.5) !important;
    color: #c4b5fd !important;
    }
    #clear-chat-btn {
    background: rgba(239, 68, 68, 0.1) !important;
    border: 1px solid rgba(239, 68, 68, 0.3) !important;
    color: #f87171 !important;
    border-radius: 8px !important;
    font-size: 0.82rem !important;
    transition: all 0.15s;
    }
    #clear-chat-btn:hover {
    background: rgba(239, 68, 68, 0.2) !important;
    border-color: rgba(239, 68, 68, 0.5) !important;
    }
    /* ── Status messages ── */
    .file-status p { color: #4ade80 !important; font-size: 0.82rem !important; }
    /* ── Footer tips ── */
    .tips-block {
    background: #141824;
    border: 1px solid rgba(139, 92, 246, 0.15);
    border-radius: 12px;
    padding: 14px 20px;
    margin-top: 12px;
    }
    .tips-block p, .tips-block li { color: #64748b !important; font-size: 0.82rem !important; }
    /* ── Chatbot container ── */
    .chatbot-wrap {
    border: 1px solid rgba(139, 92, 246, 0.2) !important;
    border-radius: 14px !important;
    background: #0f1219 !important;
    overflow: hidden !important;
    }
    """
    with gr.Blocks(
        title="Kimi K2.6 Β· AI Reasoning Chatbot",
        css=css,
        theme=gr.themes.Base(
            primary_hue="violet",
            neutral_hue="slate",
            font=gr.themes.GoogleFont("Inter"),
        ).set(
            # Theme overrides mirror the CSS palette above.
            body_background_fill="#0d0f14",
            body_text_color="#e2e8f0",
            block_background_fill="#141824",
            block_border_color="rgba(139,92,246,0.2)",
            input_background_fill="#141824",
            button_primary_background_fill="linear-gradient(135deg,#7c3aed,#5b21b6)",
            button_primary_text_color="#ffffff",
        ),
    ) as demo:
        # ── Header ─────────────────────────────────────────────────────────
        gr.HTML("""
        <div class="kimi-header">
        <h1>πŸŒ™ Kimi K2.6</h1>
        <p>Moonshot AI's 1-Trillion-Parameter Reasoning Model</p>
        <span class="badge">⚑ Powered by Novita Inference API · 256K Context Window</span>
        </div>
        """)
        # ── State ──────────────────────────────────────────────────────────
        # Holds the formatted content of the last uploaded file, fed into
        # chat_engine as its file_content argument.
        file_content_state = gr.State("")
        # ── Chat window ────────────────────────────────────────────────────
        chatbot = gr.Chatbot(
            label="",
            height=520,
            show_label=False,
            type="messages",
            render_markdown=True,
            # NOTE(review): bubble_full_width is deprecated in newer Gradio
            # releases — confirm against the pinned Gradio version.
            bubble_full_width=False,
            elem_classes=["chatbot-wrap"],
            avatar_images=(
                None,
                "https://huggingface.co/moonshotai/Kimi-K2.6/resolve/main/figures/kimi-logo.png",
            ),
        )
        # ── Input row ──────────────────────────────────────────────────────
        with gr.Row(equal_height=True):
            msg = gr.Textbox(
                label="",
                placeholder="✦ Ask Kimi anything β€” coding, math, reasoning, analysis...",
                lines=2,
                max_lines=6,
                scale=8,
                show_label=False,
                elem_classes=["input-wrap"],
                container=False,
            )
            send_btn = gr.Button(
                "Send πŸš€",
                variant="primary",
                scale=1,
                min_width=100,
                elem_id="send-btn",
            )
        # ── Settings accordion ─────────────────────────────────────────────
        with gr.Accordion("βš™οΈ Settings & File Upload", open=False):
            with gr.Row():
                response_length = gr.Radio(
                    choices=["Short", "Medium", "Long"],
                    value="Medium",
                    label="πŸ“ Response Length",
                    info="Short Β· 512 tok Medium Β· 1024 tok Long Β· 4096 tok",
                    scale=2,
                )
            gr.HTML("<hr style='border-color:rgba(139,92,246,0.15);margin:12px 0;'>")
            file_upload = gr.File(
                label="πŸ“Ž Upload File for Analysis (PDF Β· Code Β· Text β€” max 5 MB)",
                file_types=[".txt", ".pdf", ".py", ".js", ".ts",
                            ".html", ".css", ".json", ".md", ".java",
                            ".cpp", ".c", ".rs", ".go"],
                # "filepath" means handlers receive a str path, not a file object.
                type="filepath",
            )
            file_status = gr.Markdown("", elem_classes=["file-status"])
            with gr.Row():
                clear_file_btn = gr.Button(
                    "πŸ—‘οΈ Clear File",
                    size="sm",
                    variant="secondary",
                    elem_id="clear-file-btn",
                )
                clear_btn = gr.Button(
                    "🧹 Clear Chat",
                    size="sm",
                    variant="stop",
                    elem_id="clear-chat-btn",
                )
            clear_status = gr.Markdown("")
        # ── Tips footer ────────────────────────────────────────────────────
        gr.HTML("""
        <div class="tips-block">
        <b style="color:#7c3aed;">πŸ’‘ Tips</b>
        <ul style="margin:6px 0 0 18px;padding:0;">
        <li>Upload a PDF or code file, then ask Kimi to summarize, review, or debug it.</li>
        <li>Use <b>Long</b> mode for complex coding or multi-step math problems.</li>
        <li>Kimi's internal <b>Reasoning</b> block (if shown) reveals step-by-step thinking.</li>
        <li>Press <b>Enter</b> or click <b>Send</b> to submit your message.</li>
        </ul>
        </div>
        """)
        # ── Event handler functions ────────────────────────────────────────
        def handle_file_upload(file):
            # Delegates to process_uploaded_file; returns (content, status).
            content, status = process_uploaded_file(file)
            return content, status
        def clear_file():
            # Resets the file widget, the stored content, and the status line.
            return None, "", ""
        def clear_conversation():
            # Wipes both the on-disk memory and the on-screen chat.
            result = clear_memory()
            return [], result
        # ── Wire up events ─────────────────────────────────────────────────
        file_upload.change(
            fn=handle_file_upload,
            inputs=[file_upload],
            outputs=[file_content_state, file_status],
        )
        clear_file_btn.click(
            fn=clear_file,
            outputs=[file_upload, file_content_state, file_status],
        )
        send_btn.click(
            fn=chat_engine,
            inputs=[msg, chatbot, response_length, file_content_state],
            outputs=[chatbot],
            api_name="chat",
        )
        # NOTE(review): this submit handler reuses api_name="chat" — Gradio
        # may warn about the duplicate endpoint name; confirm intended.
        msg.submit(
            fn=chat_engine,
            inputs=[msg, chatbot, response_length, file_content_state],
            outputs=[chatbot],
            api_name="chat",
        )
        clear_btn.click(
            fn=clear_conversation,
            outputs=[chatbot, clear_status],
            api_name="clear_memory",
        )
    return demo
# ==================== LAUNCH ====================
if __name__ == "__main__":
demo = create_interface()
demo.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
)