Spaces:
Running on Zero
Running on Zero
| import os | |
| import ctypes | |
| import site | |
| # nvidia-npp-cu12 installs libnppicc.so.12 inside site-packages/nvidia/npp/lib/, | |
| # which is not on LD_LIBRARY_PATH. Load it globally before torchcodec is imported | |
| # so the dynamic linker can resolve it when torchcodec dlopen's its shared libs. | |
def _preload_npp():
    """Best-effort preload of ``libnppicc.so.12`` with ``RTLD_GLOBAL``.

    Searches ``nvidia/npp/lib/`` under every known site-packages directory
    (the global ones and, when available, the per-user one) and loads the
    first copy that opens successfully. A copy that exists but cannot be
    loaded is reported and skipped instead of aborting start-up. Returns
    ``None`` whether or not a library was loaded.
    """
    search_roots = []
    # Both helpers are looked up defensively: some environments ship a
    # trimmed ``site`` module that lacks one of them.
    global_dirs = getattr(site, "getsitepackages", None)
    if global_dirs is not None:
        search_roots.extend(global_dirs())
    user_dir = getattr(site, "getusersitepackages", None)
    if user_dir is not None:
        search_roots.append(user_dir())

    for root in search_roots:
        candidate = os.path.join(root, "nvidia", "npp", "lib", "libnppicc.so.12")
        if not os.path.exists(candidate):
            continue
        try:
            ctypes.CDLL(candidate, mode=ctypes.RTLD_GLOBAL)
        except OSError as exc:
            print(f"Warning: could not preload {candidate}: {exc}")
            continue
        return


_preload_npp()
| import queue | |
| import uuid | |
| import traceback | |
| import threading | |
| import gradio as gr | |
| import torch | |
| from transformers import AutoModelForCausalLM, AutoProcessor | |
| import modelscope_studio.components.antd as antd | |
| import modelscope_studio.components.antdx as antdx | |
| import modelscope_studio.components.base as ms | |
| import modelscope_studio.components.pro as pro | |
# `spaces` is an optional dependency: record whether the import succeeded so
# the rest of the module can branch on HAS_SPACES instead of re-importing.
try:
    import spaces
    HAS_SPACES = True
except ImportError:
    HAS_SPACES = False
| # --------------------------------------------------------------------------- | |
| # Model | |
| # --------------------------------------------------------------------------- | |
MODEL_ID = "OpenMOSS-Team/MOSS-VL-Instruct-0408"

print("Loading processor...")
processor = AutoProcessor.from_pretrained(MODEL_ID, trust_remote_code=True)

print("Loading model...")
# Arguments shared by the preferred load and the fallback load.
_MODEL_LOAD_KWARGS = {
    "trust_remote_code": True,
    "torch_dtype": torch.bfloat16,
    "device_map": "auto",
}
try:
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_ID,
        attn_implementation="flash_attention_2",
        **_MODEL_LOAD_KWARGS,
    )
except Exception as exc:
    # Deliberately broad: any failure of the first attempt triggers one retry
    # with SDPA attention. The reason is printed so a non-attention failure
    # (e.g. download or memory error) is not silently hidden.
    print(f"Loading with flash_attention_2 failed ({exc!r}); retrying with sdpa.")
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_ID,
        attn_implementation="sdpa",
        **_MODEL_LOAD_KWARGS,
    )
model.eval()
print("Model ready.")
| # --------------------------------------------------------------------------- | |
| # Theme (Ant Design token — matches Qwen style but in MOSS green accent) | |
| # --------------------------------------------------------------------------- | |
# Ant Design theme tokens; only the primary colour is overridden.
THEME = dict(token=dict(colorPrimary="#4f7c6a"))
| # --------------------------------------------------------------------------- | |
| # Welcome screen config | |
| # --------------------------------------------------------------------------- | |
def welcome_config():
    """Return the configuration dict for the chatbot's welcome screen.

    The dict carries the title, description, icon and layout styles, plus
    three prompt cards. Each card contains two labelled sub-groups, and each
    sub-group offers one Chinese and one English example prompt.
    """

    def leaf(prompt):
        # A clickable example: empty label, prompt text as description.
        return {"label": "", "description": prompt}

    def examples(label, zh, en):
        # A sub-group with its Chinese and English example prompts.
        return {"label": label, "children": [leaf(zh), leaf(en)]}

    def card(label, *groups):
        # A top-level card holding the given sub-groups.
        return {"label": label, "children": list(groups)}

    cards = [
        card(
            "🖼️ Image Perception",
            examples(
                "Image Caption",
                "请详细描述这张图片的内容。",
                "Describe this image in detail.",
            ),
            examples(
                "Multi-Image Caption",
                "这几张图片分别是什么?请逐一详细说明。",
                "What are these pictures? Please explain in detail one by one.",
            ),
        ),
        card(
            "📄 OCR / Document",
            examples(
                "OCR",
                "提取图片中的所有文字。",
                "Extract all text in the image.",
            ),
            examples(
                "Document Parsing",
                "将文档转换为 Markdown 格式。",
                "Convert this document to Markdown.",
            ),
        ),
        card(
            "🎬 Video Understanding",
            examples(
                "Video Caption",
                "请描述这个视频的内容。",
                "Describe this video.",
            ),
            examples(
                "Temporal Grounding",
                "观看此视频并确定主要的叙事片段。对于每个不同的时间块,提供时间戳并描述发生了什么。",
                "Watch this video and identify the main narrative segments. For each distinct time block, provide the timestamps and describe what happens.",
            ),
        ),
    ]

    prompts = {
        "title": "What can I help with?",
        "elem_style": {
            "width": "100%",
            "display": "flex",
            "flexWrap": "wrap",
            "gap": "12px",
            "justifyContent": "center",
            "alignItems": "stretch",
        },
        "styles": {
            "title": {
                "width": "100%",
                "textAlign": "center",
                "marginBottom": "6px",
                "fontSize": "14px",
            },
            "item": {
                "flex": "1 1 0",
                "maxWidth": "420px",
                "minWidth": "280px",
            },
        },
        "items": cards,
    }

    return {
        "title": "MOSS-VL",
        "description": "Multimodal vision-language model. Upload an image or video and ask anything.",
        "icon": "asserts/cleaned_small_logo.png",
        "elem_style": {
            "maxWidth": "960px",
            "margin": "40px auto 0",
            "width": "100%",
            "textAlign": "center",
        },
        "prompts": prompts,
    }
def user_config():
    """Return the config for user bubbles: the edit and delete actions."""
    return dict(actions=["edit", "delete"])
def bot_config(disabled_actions=None):
    """Return the config for assistant bubbles.

    The available actions are copy, retry and delete, in that order; any
    action named in ``disabled_actions`` is left out. The avatar is read
    from the module-level ``_logo_url`` at call time.
    """
    blocked = disabled_actions or ()
    enabled = [name for name in ("copy", "retry", "delete") if name not in blocked]
    return dict(avatar=_logo_url, header="MOSS-VL", actions=enabled)
def _file_path(f) -> str:
    """Return the filesystem path carried by ``f``.

    ``f`` may be a plain string (returned as-is) or a dict, in which case
    the first truthy value of ``"path"`` then ``"name"`` is used. Anything
    else, or a dict with neither key set, yields ``""``.
    """
    if isinstance(f, dict):
        for key in ("path", "name"):
            value = f.get(key)
            if value:
                return value
        return ""
    return f if isinstance(f, str) else ""
| # --------------------------------------------------------------------------- | |
| # Inference (multi-turn — yields loading placeholder then final reply) | |
| # --------------------------------------------------------------------------- | |
# Lower-case file suffixes (leading dot included) treated as video attachments.
_VIDEO_EXTENSIONS = frozenset(
    (".mp4", ".avi", ".mov", ".mkv", ".webm", ".flv", ".wmv", ".m4v")
)
def _build_model_messages(history):
    """Convert pro.Chatbot history to the model's multi-turn message format.

    User turns become a list of parts: one ``{"type": "video", "video": path}``
    or ``{"type": "image", "image": path}`` entry per attached file that
    exists on disk (video when the suffix is in ``_VIDEO_EXTENSIONS``),
    and ``{"type": "text", "text": ...}`` for non-blank text, in the order the
    parts appear in the message. Assistant turns become a single string made
    of their text parts joined by newlines.

    Messages flagged ``loading`` and turns that end up with no content are
    skipped. Malformed parts (non-dict entries, missing or ``None`` content)
    are ignored rather than raising, matching the guard the assistant branch
    already applied.

    Args:
        history: Iterable of message dicts with at least a ``"role"`` key.

    Returns:
        A list of ``{"role": ..., "content": ...}`` dicts.
    """

    def _parts(msg):
        # Normalise content to a list of dict parts; a bare string is taken
        # as a single text part.
        content = msg.get("content") or []
        if isinstance(content, str):
            return [{"type": "text", "content": content}]
        if isinstance(content, dict):
            return [content]
        return [p for p in content if isinstance(p, dict)]

    model_messages = []
    for msg in history:
        if msg.get("loading"):
            continue
        role = msg["role"]
        if role == "user":
            content_parts = []
            for part in _parts(msg):
                kind = part.get("type")
                if kind == "file":
                    for f in part.get("content") or []:
                        path = _file_path(f)
                        if not (path and os.path.exists(path)):
                            continue
                        ext = os.path.splitext(path)[1].lower()
                        media = "video" if ext in _VIDEO_EXTENSIONS else "image"
                        content_parts.append({"type": media, media: path})
                elif kind == "text":
                    t = part.get("content")
                    if isinstance(t, str) and t.strip():
                        content_parts.append({"type": "text", "text": t})
            if content_parts:
                model_messages.append({"role": "user", "content": content_parts})
        elif role == "assistant":
            text = "\n".join(
                part.get("content") or ""
                for part in _parts(msg)
                if part.get("type") == "text"
            ).strip()
            if text:
                model_messages.append({"role": "assistant", "content": text})
    return model_messages
# Media defaults matching the official inference reference.
# Applied when the conversation contains at least one image.
_IMAGE_MEDIA_DEFAULTS = dict(
    min_pixels=4096,
    max_pixels=16_777_216,
    multi_image_max_pixels=201_326_592,
    patch_size=16,
    temporal_patch_size=1,
    merge_size=2,
    image_mean=[0.5, 0.5, 0.5],
    image_std=[0.5, 0.5, 0.5],
)

# Applied when the conversation contains at least one video.
_VIDEO_MEDIA_DEFAULTS = dict(
    min_pixels=4096,
    max_pixels=16_777_216,
    video_max_pixels=201_326_592,
    patch_size=16,
    temporal_patch_size=1,
    merge_size=2,
    video_fps=1.0,
    min_frames=1,
    max_frames=256,
    num_extract_threads=4,
    image_mean=[0.5, 0.5, 0.5],
    image_std=[0.5, 0.5, 0.5],
)
def _run_generate(messages, enable_thinking, max_new_tokens, temperature, top_p, repetition_penalty, last_image_path=None, video_fps=1.0, max_frames=256):
    """Stream the model's reply into the trailing assistant bubble.

    Args:
        messages: History in pro.Chatbot format. The caller must already have
            appended an assistant bubble as the last item; the most recent
            user message before it is the one being answered.
        enable_thinking: Accepted for interface compatibility; not used here.
        max_new_tokens: Forwarded in ``generate_kwargs`` (cast to int).
        temperature: Forwarded in ``generate_kwargs``; sampling is enabled
            only when it is greater than zero.
        top_p: Forwarded in ``generate_kwargs`` (cast to float).
        repetition_penalty: Forwarded in ``generate_kwargs`` (cast to float).
        last_image_path: Previously remembered attachment path. Replaced by
            the user message's first attached file when that file exists.
        video_fps: Overrides the video default when the history has a video.
        max_frames: Overrides the video default when the history has a video.

    Yields:
        ``(history, last_image_path)`` tuples: first the untouched loading
        bubble, then one per streamed token (text followed by a cursor), and
        finally the finished reply or an error message. Nothing is yielded
        when no user message precedes the last item.
    """
    history = list(messages) if messages else []

    # Last item is the pre-created assistant bubble; find the user turn before it.
    user_msg = next((m for m in reversed(history[:-1]) if m["role"] == "user"), None)
    if user_msg is None:
        return

    text = ""
    new_image = None
    for part in user_msg.get("content") or []:
        if part["type"] == "text":
            # A missing/None text payload is treated as an empty prompt
            # instead of crashing on ``.strip()`` below.
            text = part.get("content") or ""
        elif part["type"] == "file":
            files = part.get("content") or []
            if files:
                new_image = _file_path(files[0])
    if new_image and os.path.exists(new_image):
        last_image_path = new_image

    bubble = history[-1]
    if not text.strip():
        bubble["loading"] = False
        bubble["content"] = [{"type": "text", "content": "⚠️ Please enter a prompt."}]
        yield history, last_image_path
        return

    # Yield loading bubble immediately before heavy model work.
    yield history, last_image_path

    try:
        model_messages = _build_model_messages(history[:-1])

        # Detect media types to pick the matching defaults.
        media_types = {
            p.get("type")
            for m in model_messages
            if isinstance(m["content"], list)
            for p in m["content"]
        }
        media_kwargs = {}
        if "image" in media_types:
            media_kwargs.update(_IMAGE_MEDIA_DEFAULTS)
        if "video" in media_types:
            media_kwargs.update({**_VIDEO_MEDIA_DEFAULTS, "video_fps": float(video_fps), "max_frames": int(max_frames)})

        do_sample = temperature > 0.0
        query = {
            "messages": model_messages,
            "media_kwargs": media_kwargs,
            "generate_kwargs": {
                "max_new_tokens": int(max_new_tokens),
                "temperature": float(temperature),
                "top_k": 50,
                "top_p": float(top_p),
                "repetition_penalty": float(repetition_penalty),
                "do_sample": do_sample,
                "vision_chunked_length": 64,
            },
        }

        # Use the official offline_generate streaming API (queue-based): the
        # worker thread reads requests from in_q and writes tokens to out_q.
        in_q: "queue.Queue[dict]" = queue.Queue()
        out_q: "queue.Queue[str]" = queue.Queue()
        worker = threading.Thread(
            target=model.offline_generate,
            args=(processor, in_q, out_q),
            kwargs={"vision_chunked_length": 64},
            daemon=True,
        )
        worker.start()
        in_q.put(dict(query))

        partial_text = ""
        try:
            while True:
                # queue.Empty after 300 s is reported by the generic handler below.
                token = out_q.get(timeout=300)
                if token == "<|round_start|>":
                    continue
                if token == "<|round_end|>":
                    break
                if token.startswith("[ERROR] "):
                    raise RuntimeError(token)
                partial_text += token
                bubble["loading"] = False
                bubble["content"] = [{"type": "text", "content": partial_text + "▋"}]
                yield history, last_image_path
        finally:
            # Always ask the worker to stop, even when the consumer closes
            # this generator mid-stream, and drop the trailing cursor.
            in_q.put({"stop_offline_generate": True})
            worker.join(timeout=30.0)
            if partial_text:
                bubble["content"] = [{"type": "text", "content": partial_text}]

        # Round finished normally. Clear the spinner even if no token was
        # produced; otherwise the bubble would stay in the loading state.
        bubble["loading"] = False
    except torch.cuda.OutOfMemoryError:
        bubble["loading"] = False
        bubble["content"] = [{"type": "text", "content": "❌ Out of memory — try a smaller image or fewer Max New Tokens."}]
    except Exception:
        bubble["loading"] = False
        bubble["content"] = [{"type": "text", "content": f"❌ Error:\n```\n{traceback.format_exc()}\n```"}]

    yield history, last_image_path
# NOTE(review): this used to be an ``if HAS_SPACES: ... else: ...`` pair whose
# two branches defined byte-identical functions, so the condition had no
# effect. Presumably the HAS_SPACES branch was meant to carry a GPU decorator
# from the ``spaces`` package — confirm before adding one back here.
def run_generate(messages, enable_thinking, max_new_tokens, temperature, top_p, repetition_penalty, last_image_path=None, video_fps=1.0, max_frames=256):
    """Public streaming entry point; delegates every argument to ``_run_generate``.

    Yields whatever ``_run_generate`` yields: ``(history, last_image_path)``
    tuples.
    """
    yield from _run_generate(
        messages,
        enable_thinking,
        max_new_tokens,
        temperature,
        top_p,
        repetition_penalty,
        last_image_path,
        video_fps,
        max_frames,
    )
| # --------------------------------------------------------------------------- | |
| # CSS | |
| # --------------------------------------------------------------------------- | |
# Stylesheet passed to gr.Blocks(css=...). It pins the page to the viewport
# height, sizes the #chatbot row from the --app-height custom property
# (fallback 780px), and styles the sidebar, chat pane, welcome prompt cards
# and avatar.
# NOTE: this is a regular (non-raw) string, so the "\n" inside the
# "Make \n in description ..." comment is emitted as a real newline; that only
# splits a CSS comment across two lines.
CSS = """
/* Use 100vh (absolute) so body.offsetHeight = viewport height.
iFrameResizer reads offsetHeight — this prevents it from expanding
the iframe beyond the viewport and making the outer page scroll. */
html {
height: 100vh !important;
overflow: hidden !important;
}
body {
height: 100vh !important;
overflow: hidden !important;
}
.gradio-container {
padding: 0 !important;
height: 100vh !important;
overflow: hidden !important;
}
.gradio-container > main.fillable {
padding: 0 !important;
height: 100vh !important;
overflow: hidden !important;
}
footer {
display: none !important;
}
/* Height locked via JS-set --app-height to avoid iframe 100vh feedback loop */
#chatbot {
height: var(--app-height, 780px);
max-height: var(--app-height, 780px);
}
/* Propagate fixed height through any wrapper divs down to the ant-col children */
#chatbot > *,
#chatbot .ant-row,
#chatbot .ant-col {
height: 100% !important;
}
/* Gradio injects extra wrapper divs between ant-col and chatbot-chat; propagate height */
#chatbot .ant-col > div {
height: 100% !important;
}
/* Sidebar col: full-height gray background, override antd gutter padding */
#chatbot .sidebar-col {
height: 100% !important;
background-color: var(--ms-gr-ant-color-bg-layout) !important;
padding-left: 0 !important;
padding-right: 0 !important;
}
#chatbot .chatbot-conversations {
height: 100%;
background-color: var(--ms-gr-ant-color-bg-layout);
padding-left: 4px;
padding-right: 4px;
overflow-y: auto;
}
#chatbot .chatbot-conversations .chatbot-conversations-list {
padding-left: 0;
padding-right: 0;
}
#chatbot .chatbot-chat {
padding: 32px;
padding-top: 64px;
padding-bottom: 24px;
height: 100%;
display: flex;
flex-direction: column;
overflow: hidden;
}
@media (max-width: 768px) {
#chatbot .chatbot-chat {
padding: 10px;
padding-bottom: 16px;
}
}
#chatbot .chatbot-chat .chatbot-chat-messages {
flex: 1;
min-height: 0;
overflow-y: auto;
}
#chatbot .chatbot-chat .chatbot-chat-messages > div {
height: 100% !important;
display: flex !important;
flex-direction: column !important;
}
/* Vertically center welcome content only (safe — won't break scroll when messages exist) */
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages {
display: flex;
flex-direction: column;
}
/* Equal-height top-level cards */
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-prompts-items {
display: flex !important;
align-items: stretch !important;
}
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-prompts-item {
display: flex !important;
flex-direction: column !important;
height: auto !important;
flex: 1 1 0 !important;
}
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-prompts-item > * {
flex: 1;
display: flex;
flex-direction: column;
height: 100%;
}
/* Sub-group rows within each card */
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-prompts-item .ant-prompts-items {
display: flex !important;
flex-direction: column !important;
align-items: stretch !important;
flex: 1;
height: 100%;
}
/* Sub-groups (level 2) */
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-prompts-item .ant-prompts-item {
flex: 1 1 0 !important;
display: flex !important;
flex-direction: column !important;
box-sizing: border-box !important;
}
/* Leaf prompt buttons (level 3): smaller font and compact height */
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-prompts-item .ant-prompts-item .ant-prompts-item {
flex: 1 1 0 !important;
height: auto !important;
display: flex !important;
align-items: center !important;
padding: 4px 8px !important;
box-sizing: border-box !important;
font-size: 11px !important;
line-height: 1.4 !important;
}
/* Sub-group label — smaller font */
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-prompts-item .ant-prompts-title {
font-size: 11px !important;
opacity: 0.65;
margin-bottom: 4px !important;
padding: 0 !important;
}
/* Make \n in description render as real line breaks */
.ant-prompts-item-description {
white-space: pre-wrap !important;
}
/* Welcome header: icon stacked above title */
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-welcome {
display: flex !important;
flex-direction: column !important;
align-items: center !important;
}
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-welcome-icon {
font-size: 80px !important;
margin-bottom: 8px !important;
margin-inline-end: 0 !important;
}
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-welcome-icon img {
width: 80px !important;
height: 80px !important;
}
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-welcome-title {
font-size: 36px !important;
}
/* Bot avatar: no circle crop, transparent-friendly */
#chatbot .ant-avatar {
border-radius: 0 !important;
background: transparent !important;
border: none !important;
box-shadow: none !important;
}
#chatbot .ant-avatar img {
border-radius: 0 !important;
object-fit: contain !important;
}
"""
| # --------------------------------------------------------------------------- | |
| # UI | |
| # --------------------------------------------------------------------------- | |
# Root path prefix from the environment, without any trailing slash.
_ROOT_PATH = os.getenv("GRADIO_ROOT_PATH", "").rstrip("/")
# Local asset folder next to this file (the folder is named "asserts").
_ASSETS_DIR = os.path.join(os.path.split(os.path.abspath(__file__))[0], "asserts")
_LOGO_PATH = os.path.join(_ASSETS_DIR, "pure_logo.png")
# Remote logo URL used for the sidebar image and the bot avatar.
_logo_url = (
    "https://huggingface.co/spaces/OpenMOSS-Team/MOSS-VL"
    "/resolve/main/asserts/pure_logo.png"
)
# One-shot snapshot of window.innerHeight → --app-height.
# Reads once after iFrameResizer has set the initial iframe size, then
# NEVER updates. This breaks the feedback loop where iFrameResizer grows
# the iframe in response to content height and our JS keeps chasing it.
#
# The snippet is a JavaScript arrow function kept as a Python string. It
# retries every 50 ms (at most 41 attempts) until innerHeight exceeds 500 px;
# if that never happens the CSS fallback for --app-height applies.
# NOTE(review): where this string is attached is not visible in this part of
# the file — confirm it is still wired to a load event.
_SYNC_HEIGHT_JS = """
() => {
let attempts = 0;
const snapshot = () => {
const h = window.innerHeight;
// Only accept plausible values (iframe default is 150px).
if (h > 500) {
document.documentElement.style.setProperty('--app-height', h + 'px');
return; // one-shot: stop polling, never listen for resize
}
// Poll every 50ms up to 2 seconds; after that let CSS fallback (780px) take over.
if (attempts++ < 40) {
setTimeout(snapshot, 50);
}
};
snapshot();
}
"""
# Per-row height equalization for the 3-column welcome prompt grid.
# Structure assumed: 3 top-level column items, each with 4 leaf items (2 groups × 2 leaves).
# Columns are identified as prompts-items that are NOT nested inside another prompts-item.
# Then for each row index 0-3, we equalize min-height across the 3 columns.
#
# The snippet is a JavaScript arrow function kept as a Python string. Its
# first attempt runs after 1500 ms; whenever the expected structure is not yet
# present (or a leaf is under 5 px tall) it retries every 400 ms with no retry
# limit.
# NOTE(review): the class matcher '[class*="prompts-item"]' is a substring
# match, so it would also hit classes such as "...prompts-items" or
# "...prompts-item-description" — confirm the column/leaf counts still hold.
_EQUALIZE_ROWS_JS = """
() => {
const fix = () => {
const all = [...document.querySelectorAll('[class*="prompts-item"]')];
if (all.length < 12) { setTimeout(fix, 400); return; }
// Top-level column items: not contained in any other prompts-item
const cols = all.filter(el => !el.parentElement.closest('[class*="prompts-item"]'));
if (cols.length !== 3) { setTimeout(fix, 400); return; }
// For each column collect leaf items (no nested prompts-item) in DOM order
const colLeaves = cols.map(col =>
[...col.querySelectorAll('[class*="prompts-item"]')]
.filter(el => !el.querySelector('[class*="prompts-item"]'))
);
if (!colLeaves.every(l => l.length === 4)) { setTimeout(fix, 400); return; }
// Check all items have rendered height
if (colLeaves.flat().some(el => el.getBoundingClientRect().height < 5)) {
setTimeout(fix, 400); return;
}
// Equalize row by row
for (let r = 0; r < 4; r++) {
const row = colLeaves.map(col => col[r]);
const maxH = Math.max(...row.map(el => el.getBoundingClientRect().height));
row.forEach(el => { el.style.minHeight = maxH + 'px'; });
}
};
setTimeout(fix, 1500);
}
"""
| with gr.Blocks(css=CSS, fill_width=True, title="MOSS-VL Demo") as demo: | |
| # Generation settings (shared state) | |
| gen_settings = gr.State({ | |
| "max_new_tokens": 512, | |
| "temperature": 0.0, | |
| "top_p": 1.0, | |
| "repetition_penalty": 1.0, | |
| }) | |
| # Conversation state | |
| state = gr.State({ | |
| "conversation_contexts": {}, # id -> {"history": [...]} | |
| "conversations": [], # [{key, label}, ...] | |
| "conversation_id": "", | |
| }) | |
| with ms.Application(), antdx.XProvider(theme=THEME): | |
| with antd.Row(gutter=[20, 20], wrap=False, elem_id="chatbot"): | |
| # ── LEFT SIDEBAR ── | |
| with antd.Col( | |
| md=dict(flex="0 0 260px", span=24, order=0), | |
| span=0, | |
| order=1, | |
| elem_style=dict(width=0), | |
| elem_classes="sidebar-col", | |
| ) as sidebar_col: | |
| with ms.Div(elem_classes="chatbot-conversations"): | |
| with antd.Flex(vertical=True, gap="small", elem_style=dict(height="100%")): | |
| # Logo | |
| gr.HTML( | |
| f'<div style="display:flex;align-items:center;justify-content:center;' | |
| f'gap:8px;padding:8px;white-space:nowrap;">' | |
| f'<img src="{_logo_url}" ' | |
| f'style="width:40px;height:40px;object-fit:contain;display:block;" />' | |
| f'<span style="font-size:22px;font-weight:600;line-height:1;">MOSS-VL</span>' | |
| f'</div>' | |
| ) | |
| # New conversation button | |
| with antd.Button( | |
| value=None, | |
| color="primary", | |
| variant="filled", | |
| block=True, | |
| ) as add_conv_btn: | |
| ms.Text("New Conversation") | |
| with ms.Slot("icon"): | |
| antd.Icon("PlusOutlined") | |
| # Conversation list | |
| with antdx.Conversations( | |
| elem_classes="chatbot-conversations-list", | |
| ) as conversations: | |
| with ms.Slot("menu.items"): | |
| with antd.Menu.Item( | |
| label="Delete", key="delete", danger=True | |
| ) as conv_delete_item: | |
| with ms.Slot("icon"): | |
| antd.Icon("DeleteOutlined") | |
| # Settings accordion at bottom of sidebar | |
| with antd.Collapse(ghost=True): | |
| with antd.Collapse.Item( | |
| label="⚙ Generation Settings", | |
| key="settings", | |
| ): | |
| max_new_tokens = gr.Slider(64, 8192, value=4096, step=64, label="Max New Tokens") | |
| temperature = gr.Slider(0.0, 1.5, value=0.5, step=0.05, label="Temperature") | |
| top_p = gr.Slider(0.1, 1.0, value=1.0, step=0.05, label="Top-p") | |
| repetition_penalty = gr.Slider(1.0, 2.0, value=1.05, step=0.05, label="Repetition Penalty") | |
| with antd.Collapse.Item( | |
| label="🎬 Video Sampling", | |
| key="video", | |
| ): | |
| video_fps = gr.Slider(0.1, 4.0, value=1.0, step=0.1, label="FPS") | |
| max_frames = gr.Slider(8, 512, value=256, step=8, label="Max Frames") | |
| # ── MAIN CHAT AREA ── | |
| with antd.Col(flex=1, elem_style=dict(height="100%")): | |
| with antd.Flex( | |
| vertical=True, | |
| gap="small", | |
| elem_classes="chatbot-chat", | |
| ): | |
| # Chatbot | |
| chatbot = pro.Chatbot( | |
| elem_classes="chatbot-chat-messages", | |
| height=0, | |
| welcome_config=welcome_config(), | |
| user_config=user_config(), | |
| bot_config=bot_config(), | |
| ) | |
| # Multimodal input (built-in + button for attachments) | |
| with pro.MultimodalInput( | |
| placeholder="Message MOSS-VL…", | |
| upload_config={ | |
| "accept": "image/*,video/*", | |
| "multiple": False, | |
| }, | |
| ) as chat_input: | |
| with ms.Slot("prefix"): | |
| with antd.Flex(gap=4, wrap=True): | |
| with antd.Button(value=None, type="text") as clear_btn: | |
| with ms.Slot("icon"): | |
| antd.Icon("ClearOutlined") | |
| # ── EVENT HANDLERS ── | |
def preprocess(state_value, clear_input=True):
    """Return the component updates that lock the UI while a reply streams.

    Every conversation except the active one is disabled, the new/clear/delete
    controls are disabled, the chatbot shows the active history with its
    message actions restricted, and the input is put into the loading state
    (and emptied when ``clear_input`` is true).
    """
    active_id = state_value["conversation_id"]
    context = state_value["conversation_contexts"].get(active_id, {})
    locked_items = [
        {**conv, "disabled": conv["key"] != active_id}
        for conv in state_value["conversations"]
    ]
    if clear_input:
        input_update = gr.update(value=None, loading=True)
    else:
        input_update = gr.update(loading=True)

    return {
        conversations: gr.update(active_key=active_id, items=locked_items),
        add_conv_btn: gr.update(disabled=True),
        clear_btn: gr.update(disabled=True),
        conv_delete_item: gr.update(disabled=True),
        chatbot: gr.update(
            value=context.get("history", []),
            bot_config=bot_config(disabled_actions=["retry", "edit", "delete"]),
            user_config={"actions": []},
        ),
        state: gr.update(value=state_value),
        chat_input: input_update,
    }
def postprocess(state_value):
    """Return the component updates that unlock the UI after generation.

    Re-enables the input and the new/clear/delete controls, restores the
    plain conversation list, and refreshes the chatbot with the active
    history and the default bot/user configs.
    """
    active_id = state_value["conversation_id"]
    context = state_value["conversation_contexts"].get(active_id, {})
    chatbot_update = gr.update(
        value=context.get("history", []),
        bot_config=bot_config(),
        user_config=user_config(),
    )
    return {
        chat_input: gr.update(loading=False),
        conv_delete_item: gr.update(disabled=False),
        clear_btn: gr.update(disabled=False),
        conversations: gr.update(items=state_value["conversations"]),
        add_conv_btn: gr.update(disabled=False),
        chatbot: chatbot_update,
        state: gr.update(value=state_value),
    }
def add_user_message(input_value, state_value):
    """Append the submitted user turn plus a loading assistant bubble.

    A new conversation (labelled with the first 40 characters of the text,
    or "New Chat") is created when none is active. Returns the UI-locking
    updates from ``preprocess`` with the input cleared.
    """
    if input_value:
        text = input_value.get("text", "")
        files = input_value.get("files", [])
    else:
        text, files = "", []
    persistent_files = list(map(_file_path, files))

    if not state_value["conversation_id"]:
        conv_id = str(uuid.uuid4())
        state_value["conversation_id"] = conv_id
        state_value["conversations"].append(
            {"label": text[:40] or "New Chat", "key": conv_id}
        )
        state_value["conversation_contexts"][conv_id] = {
            "history": [],
            "last_image_path": None,
        }

    ctx = state_value["conversation_contexts"][state_value["conversation_id"]]
    user_turn = {
        "key": str(uuid.uuid4()),
        "role": "user",
        "content": [
            {"type": "file", "content": persistent_files},
            {"type": "text", "content": text},
        ],
    }
    # Placeholder the generator later fills with streamed text.
    loading_bubble = {
        "key": str(uuid.uuid4()),
        "role": "assistant",
        "header": "MOSS-VL",
        "loading": True,
        "content": [{"type": "text", "content": ""}],
    }
    ctx["history"].extend([user_turn, loading_bubble])
    return preprocess(state_value, clear_input=True)
def generate_response(state_value, max_tok, temp, top_p_, rep_pen, v_fps, v_max_frames):
    """Stream the reply for the active conversation.

    Yields ``(history, state_value)`` after storing each streamed history and
    remembered image path back into the conversation context. Yields nothing
    when there is no active conversation or its context is missing.
    """
    conv_id = state_value.get("conversation_id", "")
    if not conv_id:
        return
    contexts = state_value.get("conversation_contexts", {})
    if conv_id not in contexts:
        return

    ctx = state_value["conversation_contexts"][conv_id]
    stream = run_generate(
        ctx["history"],
        False,
        max_tok,
        temp,
        top_p_,
        rep_pen,
        ctx.get("last_image_path"),
        v_fps,
        v_max_frames,
    )
    for updated_history, new_last_img in stream:
        ctx["history"] = updated_history
        ctx["last_image_path"] = new_last_img
        yield updated_history, state_value
def apply_welcome_prompt(e: gr.EventData, input_value):
    """Copy the clicked welcome prompt's description into the input text.

    The prompt text is read from ``e._data["payload"][0]["value"]["description"]``;
    an empty input value is replaced by a fresh dict.
    """
    prompt_text = e._data["payload"][0]["value"]["description"]
    current = {} if input_value is None else input_value
    current["text"] = prompt_text
    return gr.update(value=current)
def new_chat(state_value):
    """Deselect the active conversation so the next message starts a new one.

    Returns ``gr.skip()`` when no conversation is active; otherwise returns
    three updates that clear the active key, empty the chatbot, and store the
    new state.
    """
    if state_value["conversation_id"]:
        state_value["conversation_id"] = ""
        return (
            gr.update(active_key=""),
            gr.update(value=None),
            gr.update(value=state_value),
        )
    return gr.skip()
def select_conversation(state_value, e: gr.EventData):
    """Switch the active conversation to the one named in the event payload.

    Returns ``gr.skip()`` when that conversation is already active or has no
    stored context; otherwise returns updates for the active key, the chatbot
    history, and the state.
    """
    key = e._data["payload"][0]
    already_active = state_value["conversation_id"] == key
    if already_active or key not in state_value["conversation_contexts"]:
        return gr.skip()

    state_value["conversation_id"] = key
    selected = state_value["conversation_contexts"][key]
    return (
        gr.update(active_key=key),
        gr.update(value=selected["history"]),
        gr.update(value=state_value),
    )
def conversation_menu(state_value, e: gr.EventData):
    """Handle a conversation context-menu click; only "delete" is acted on.

    Deleting removes the conversation's context and list entry. If it was the
    active conversation, the active key and chatbot are cleared as well;
    otherwise the chatbot is left untouched. Any other operation returns
    ``gr.skip()``.
    """
    payload = e._data["payload"]
    conv_id = payload[0]["key"]
    operation = payload[1]["key"]
    if operation != "delete":
        return gr.skip()

    state_value["conversation_contexts"].pop(conv_id)
    state_value["conversations"] = [
        conv for conv in state_value["conversations"] if conv["key"] != conv_id
    ]

    if state_value["conversation_id"] != conv_id:
        return (
            gr.update(items=state_value["conversations"]),
            gr.skip(),
            gr.update(value=state_value),
        )

    state_value["conversation_id"] = ""
    return (
        gr.update(items=state_value["conversations"], active_key=""),
        gr.update(value=None),
        gr.update(value=state_value),
    )
def clear_history(state_value):
    """Empty the active conversation's history.

    Returns ``gr.skip()`` when no conversation is active; otherwise returns
    an update that clears the chatbot and one that stores the state.
    """
    active_id = state_value["conversation_id"]
    if not active_id:
        return gr.skip()
    state_value["conversation_contexts"][active_id]["history"] = []
    return gr.update(value=None), gr.update(value=state_value)
def prepare_retry(state_value, e: gr.EventData):
    """Drop the retried message and everything after it, then queue a new reply.

    History is truncated to the entries before the payload's ``index`` and a
    fresh loading assistant bubble is appended. Returns the UI-locking updates
    from ``preprocess`` without clearing the input.
    """
    cut = e._data["payload"][0]["index"]
    ctx = state_value["conversation_contexts"][state_value["conversation_id"]]
    loading_bubble = {
        "key": str(uuid.uuid4()),
        "role": "assistant",
        "header": "MOSS-VL",
        "loading": True,
        "content": [{"type": "text", "content": ""}],
    }
    ctx["history"] = [*ctx["history"][:cut], loading_bubble]
    return preprocess(state_value, clear_input=False)
def delete_message(state_value, e: gr.EventData):
    """Remove the message at the payload's ``index`` from the active history.

    Returns an update carrying the modified state.
    """
    position = e._data["payload"][0]["index"]
    active_id = state_value["conversation_id"]
    del state_value["conversation_contexts"][active_id]["history"][position]
    return gr.update(value=state_value)
def handle_edit(state_value, e: gr.EventData):
    """Apply an edit to a user message and queue a fresh reply for it.

    The new text comes from the payload's ``value``: used directly when it is
    a string, or the space-joined text parts when it is a list. File parts of
    the edited message are kept, its text parts are replaced, later messages
    are dropped, and a loading assistant bubble is appended. Returns the
    UI-locking updates from ``preprocess`` without clearing the input.
    """
    payload = e._data["payload"][0]
    index = payload["index"]
    ctx = state_value["conversation_contexts"][state_value["conversation_id"]]

    # Extract the new text from the edited content.
    edited = payload.get("value", "")
    if isinstance(edited, str):
        new_text = edited
    elif isinstance(edited, list):
        new_text = " ".join(
            piece.get("content", "") or piece.get("text", "")
            for piece in edited
            if isinstance(piece, dict) and piece.get("type") == "text"
        )
    else:
        new_text = ""

    # Rebuild the message: keep file parts, swap every text part for the new text.
    target = ctx["history"][index]
    rebuilt = [
        part if part.get("type") == "file" else {"type": "text", "content": new_text}
        for part in target.get("content", [])
        if part.get("type") in ("file", "text")
    ]
    if all(part.get("type") != "text" for part in rebuilt):
        rebuilt.append({"type": "text", "content": new_text})
    target["content"] = rebuilt

    # Drop the old reply and any later turns, then add the loading bubble.
    loading_bubble = {
        "key": str(uuid.uuid4()),
        "role": "assistant",
        "header": "MOSS-VL",
        "loading": True,
        "content": [{"type": "text", "content": ""}],
    }
    ctx["history"] = ctx["history"][:index + 1] + [loading_bubble]
    return preprocess(state_value, clear_input=False)
| # Wire events | |
| # Components written by the handlers that run before and after generation | |
| # (passed below as `outputs` for add_user_message, handle_edit, prepare_retry and postprocess). | |
| ui_outputs = [ | |
| chat_input, conv_delete_item, clear_btn, | |
| add_conv_btn, conversations, chatbot, state, | |
| ] | |
| # Components written by generate_response. | |
| stream_outputs = [chatbot, state] | |
| # Generation controls; appended after `state` as the inputs of generate_response. | |
| gen_settings = [max_new_tokens, temperature, top_p, repetition_penalty, video_fps, max_frames] | |
| # Submit: add message → stream tokens → restore UI | |
| # Step 1: add_user_message receives the input box value and the state. | |
| submit_step1 = chat_input.submit( | |
| fn=add_user_message, | |
| inputs=[chat_input, state], | |
| outputs=ui_outputs, | |
| ) | |
| # Step 2: chained with .then(), so it starts after step 1 has run. | |
| submit_step2 = submit_step1.then( | |
| fn=generate_response, | |
| inputs=[state] + gen_settings, | |
| outputs=stream_outputs, | |
| ) | |
| # Step 3: postprocess writes the same component set as step 1. | |
| submit_step2.then( | |
| fn=postprocess, | |
| inputs=[state], | |
| outputs=ui_outputs, | |
| ) | |
| # Cancel: stops the two submit steps and runs postprocess without queueing. | |
| # NOTE(review): only the submit chain is listed in `cancels`; the edit and retry | |
| # chains are not — confirm whether cancelling those is handled elsewhere. | |
| chat_input.cancel( | |
| fn=postprocess, | |
| inputs=[state], | |
| outputs=ui_outputs, | |
| cancels=[submit_step1, submit_step2], | |
| queue=False, | |
| ) | |
| # Welcome prompt selection: apply_welcome_prompt reads and rewrites the input box. | |
| chatbot.welcome_prompt_select( | |
| fn=apply_welcome_prompt, | |
| inputs=[chat_input], | |
| outputs=[chat_input], | |
| ) | |
| # Conversation list handlers: each takes the state and writes the | |
| # conversation list, the chatbot and the state. | |
| add_conv_btn.click( | |
| fn=new_chat, | |
| inputs=[state], | |
| outputs=[conversations, chatbot, state], | |
| ) | |
| conversations.active_change( | |
| fn=select_conversation, | |
| inputs=[state], | |
| outputs=[conversations, chatbot, state], | |
| ) | |
| conversations.menu_click( | |
| fn=conversation_menu, | |
| inputs=[state], | |
| outputs=[conversations, chatbot, state], | |
| ) | |
| # Clear button: clear_history writes the chatbot and the state. | |
| clear_btn.click( | |
| fn=clear_history, | |
| inputs=[state], | |
| outputs=[chatbot, state], | |
| ) | |
| # Message deletion: only the state is an output here; the chatbot component is not. | |
| chatbot.delete( | |
| fn=delete_message, | |
| inputs=[state], | |
| outputs=[state], | |
| ) | |
| # Edit: update message → stream tokens → restore UI | |
| # Same three-step shape as the submit chain, starting from handle_edit. | |
| edit_step1 = chatbot.edit( | |
| fn=handle_edit, | |
| inputs=[state], | |
| outputs=ui_outputs, | |
| ) | |
| edit_step2 = edit_step1.then( | |
| fn=generate_response, | |
| inputs=[state] + gen_settings, | |
| outputs=stream_outputs, | |
| ) | |
| edit_step2.then( | |
| fn=postprocess, | |
| inputs=[state], | |
| outputs=ui_outputs, | |
| ) | |
| # Retry: prepare → stream tokens → restore UI | |
| # Same three-step shape as the submit chain, starting from prepare_retry. | |
| retry_step1 = chatbot.retry( | |
| fn=prepare_retry, | |
| inputs=[state], | |
| outputs=ui_outputs, | |
| ) | |
| retry_step2 = retry_step1.then( | |
| fn=generate_response, | |
| inputs=[state] + gen_settings, | |
| outputs=stream_outputs, | |
| ) | |
| retry_step2.then( | |
| fn=postprocess, | |
| inputs=[state], | |
| outputs=ui_outputs, | |
| ) | |
| # Lock chatbot height to actual viewport height (avoids iframe 100vh loop) | |
| # No Python callback (fn=None): only the JavaScript snippet is attached to the load event. | |
| demo.load(fn=None, inputs=None, outputs=None, js=_SYNC_HEIGHT_JS) | |
| # Per-row height equalization for the welcome prompt grid | |
| demo.load(fn=None, inputs=None, outputs=None, js=_EQUALIZE_ROWS_JS) | |
| # Enable the request queue with a default concurrency limit of 1 and a maximum size of 20. | |
| demo.queue(default_concurrency_limit=1, max_size=20) | |
| # Mount the assets directory as /assets so the logo can be served without going | |
| # through gradio's cache validation (which rejects paths not in temp dir) | |
| from fastapi.staticfiles import StaticFiles | |
| demo.app.mount("/assets", StaticFiles(directory=_ASSETS_DIR), name="assets") | |
| # Launch only when executed as a script; server-side rendering is disabled. | |
| if __name__ == "__main__": | |
| demo.launch(ssr_mode=False, root_path=_ROOT_PATH) | |