import os
import ctypes
import site
# nvidia-npp-cu12 installs libnppicc.so.12 inside site-packages/nvidia/npp/lib/,
# which is not on LD_LIBRARY_PATH. Load it globally before torchcodec is imported
# so the dynamic linker can resolve it when torchcodec dlopen's its shared libs.
def _preload_npp():
    for _sp in site.getsitepackages():
        _p = os.path.join(_sp, "nvidia", "npp", "lib", "libnppicc.so.12")
        if os.path.exists(_p):
            ctypes.CDLL(_p, mode=ctypes.RTLD_GLOBAL)
            return
_preload_npp()
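# Illustrative alternative (only if you control the launch environment): the
# same library resolves without the ctypes preload when the NPP lib dir is on
# LD_LIBRARY_PATH *before* the process starts, e.g.
#   export LD_LIBRARY_PATH="/path/to/site-packages/nvidia/npp/lib:$LD_LIBRARY_PATH"
# Setting os.environ["LD_LIBRARY_PATH"] at runtime would not help, since the
# dynamic linker only reads it at process startup; hence the preload above.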
import queue
import uuid
import traceback
import threading
import gradio as gr
import torch
from transformers import AutoModelForCausalLM, AutoProcessor
import modelscope_studio.components.antd as antd
import modelscope_studio.components.antdx as antdx
import modelscope_studio.components.base as ms
import modelscope_studio.components.pro as pro
try:
    import spaces
    HAS_SPACES = True
except ImportError:
    HAS_SPACES = False
# ---------------------------------------------------------------------------
# Model
# ---------------------------------------------------------------------------
MODEL_ID = "OpenMOSS-Team/MOSS-VL-Instruct-0408"
print("Loading processor...")
processor = AutoProcessor.from_pretrained(MODEL_ID, trust_remote_code=True)
print("Loading model...")
# Prefer FlashAttention-2; fall back to SDPA when flash-attn is unavailable
# or unsupported on the current GPU.
try:
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_ID,
        trust_remote_code=True,
        torch_dtype=torch.bfloat16,
        device_map="auto",
        attn_implementation="flash_attention_2",
    )
except Exception:
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_ID,
        trust_remote_code=True,
        torch_dtype=torch.bfloat16,
        device_map="auto",
        attn_implementation="sdpa",
    )
model.eval()
print("Model ready.")
# ---------------------------------------------------------------------------
# Theme (Ant Design token — matches Qwen style but in MOSS green accent)
# ---------------------------------------------------------------------------
THEME = {
    "token": {
        "colorPrimary": "#4f7c6a",
    }
}
# ---------------------------------------------------------------------------
# Welcome screen config
# ---------------------------------------------------------------------------
def welcome_config():
    return {
        "title": "MOSS-VL",
        "description": "Multimodal vision-language model. Upload an image or video and ask anything.",
        "icon": "asserts/cleaned_small_logo.png",
        "elem_style": {
            "maxWidth": "960px",
            "margin": "40px auto 0",
            "width": "100%",
            "textAlign": "center",
        },
        "prompts": {
            "title": "What can I help with?",
            "elem_style": {
                "width": "100%",
                "display": "flex",
                "flexWrap": "wrap",
                "gap": "12px",
                "justifyContent": "center",
                "alignItems": "stretch",
            },
            "styles": {
                "title": {
                    "width": "100%",
                    "textAlign": "center",
                    "marginBottom": "6px",
                    "fontSize": "14px",
                },
                "item": {
                    "flex": "1 1 0",
                    "maxWidth": "420px",
                    "minWidth": "280px",
                },
            },
            "items": [
                {
                    "label": "🖼️ Image Perception",
                    "children": [
                        {
                            "label": "Image Caption",
                            "children": [
                                {"label": "", "description": "请详细描述这张图片的内容。"},
                                {"label": "", "description": "Describe this image in detail."},
                            ],
                        },
                        {
                            "label": "Multi-Image Caption",
                            "children": [
                                {"label": "", "description": "这几张图片分别是什么?请逐一详细说明。"},
                                {"label": "", "description": "What are these pictures? Please explain in detail one by one."},
                            ],
                        },
                    ],
                },
                {
                    "label": "📄 OCR / Document",
                    "children": [
                        {
                            "label": "OCR",
                            "children": [
                                {"label": "", "description": "提取图片中的所有文字。"},
                                {"label": "", "description": "Extract all text in the image."},
                            ],
                        },
                        {
                            "label": "Document Parsing",
                            "children": [
                                {"label": "", "description": "将文档转换为 Markdown 格式。"},
                                {"label": "", "description": "Convert this document to Markdown."},
                            ],
                        },
                    ],
                },
                {
                    "label": "🎬 Video Understanding",
                    "children": [
                        {
                            "label": "Video Caption",
                            "children": [
                                {"label": "", "description": "请描述这个视频的内容。"},
                                {"label": "", "description": "Describe this video."},
                            ],
                        },
                        {
                            "label": "Temporal Grounding",
                            "children": [
                                {"label": "", "description": "观看此视频并确定主要的叙事片段。对于每个不同的时间块,提供时间戳并描述发生了什么。"},
                                {"label": "", "description": "Watch this video and identify the main narrative segments. For each distinct time block, provide the timestamps and describe what happens."},
                            ],
                        },
                    ],
                },
            ],
        },
    }
def user_config():
    return {
        "actions": ["edit", "delete"],
    }

def bot_config(disabled_actions=None):
    actions = ["copy", "retry", "delete"]
    if disabled_actions:
        actions = [a for a in actions if a not in disabled_actions]
    return {
        "avatar": _logo_url,
        "header": "MOSS-VL",
        "actions": actions,
    }
def _file_path(f) -> str:
    """Extract the real filesystem path from either a plain string or a Gradio file dict."""
    if isinstance(f, str):
        return f
    if isinstance(f, dict):
        return f.get("path") or f.get("name") or ""
    return ""
# ---------------------------------------------------------------------------
# Inference (multi-turn — yields loading placeholder then final reply)
# ---------------------------------------------------------------------------
_VIDEO_EXTENSIONS = frozenset({".mp4", ".avi", ".mov", ".mkv", ".webm", ".flv", ".wmv", ".m4v"})
def _build_model_messages(history):
    """Convert pro.Chatbot history to the model's multi-turn message format.

    User turns become ``[{type: image, image: path}, {type: text, text: ...}]``.
    Assistant turns become plain strings. Loading placeholders are skipped.
    """
    model_messages = []
    for msg in history:
        if msg.get("loading"):
            continue
        role = msg["role"]
        if role == "user":
            content_parts = []
            for part in msg.get("content", []):
                if part["type"] == "file":
                    for f in (part.get("content") or []):
                        path = _file_path(f)
                        if path and os.path.exists(path):
                            ext = os.path.splitext(path)[1].lower()
                            if ext in _VIDEO_EXTENSIONS:
                                content_parts.append({"type": "video", "video": path})
                            else:
                                content_parts.append({"type": "image", "image": path})
                elif part["type"] == "text":
                    t = part.get("content", "")
                    if t.strip():
                        content_parts.append({"type": "text", "text": t})
            if content_parts:
                model_messages.append({"role": "user", "content": content_parts})
        elif role == "assistant":
            text_parts = []
            for part in msg.get("content", []):
                if isinstance(part, dict) and part.get("type") == "text":
                    text_parts.append(part.get("content", ""))
            text = "\n".join(text_parts).strip()
            if text:
                model_messages.append({"role": "assistant", "content": text})
    return model_messages
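# Illustrative round-trip (file path made up, and assumed to exist on disk;
# nonexistent paths are dropped), matching the shapes consumed and produced above:
#   history = [
#       {"role": "user", "content": [
#           {"type": "file", "content": ["/tmp/cat.png"]},
#           {"type": "text", "content": "Describe this image."},
#       ]},
#       {"role": "assistant", "content": [{"type": "text", "content": "A cat."}]},
#   ]
#   _build_model_messages(history)
#   # -> [
#   #      {"role": "user", "content": [
#   #          {"type": "image", "image": "/tmp/cat.png"},
#   #          {"type": "text", "text": "Describe this image."},
#   #      ]},
#   #      {"role": "assistant", "content": "A cat."},
#   #    ]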
# Media defaults matching the official inference reference
_IMAGE_MEDIA_DEFAULTS = {
    "min_pixels": 4096,
    "max_pixels": 16777216,
    "multi_image_max_pixels": 201326592,
    "patch_size": 16,
    "temporal_patch_size": 1,
    "merge_size": 2,
    "image_mean": [0.5, 0.5, 0.5],
    "image_std": [0.5, 0.5, 0.5],
}
_VIDEO_MEDIA_DEFAULTS = {
    "min_pixels": 4096,
    "max_pixels": 16777216,
    "video_max_pixels": 201326592,
    "patch_size": 16,
    "temporal_patch_size": 1,
    "merge_size": 2,
    "video_fps": 1.0,
    "min_frames": 1,
    "max_frames": 256,
    "num_extract_threads": 4,
    "image_mean": [0.5, 0.5, 0.5],
    "image_std": [0.5, 0.5, 0.5],
}
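# Note: 16777216 == 4096 ** 2 and 201326592 == 12 * 16777216; both values are
# copied verbatim from the reference inference config. How the remote
# processing code apportions the multi-image / video pixel budget across
# items is up to that code, not something this app controls.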
def _run_generate(messages, enable_thinking, max_new_tokens, temperature, top_p,
                  repetition_penalty, last_image_path=None, video_fps=1.0, max_frames=256):
    """
    messages: list of history dicts in pro.Chatbot format.
    The caller must have already appended an assistant bubble as the last item.
    Yields: (updated history list, new_last_image_path)
    """
    history = list(messages) if messages else []
    # The last item is the pre-created assistant bubble; search backwards from
    # it for the most recent user message.
    user_msg = None
    for msg in reversed(history[:-1]):
        if msg["role"] == "user":
            user_msg = msg
            break
    if user_msg is None:
        return
    text = ""
    new_image = None
    for part in user_msg.get("content", []):
        if part["type"] == "text":
            text = part["content"]
        elif part["type"] == "file":
            files = part["content"]
            if files:
                new_image = _file_path(files[0])
    if new_image and os.path.exists(new_image):
        last_image_path = new_image
    if not text.strip():
        history[-1]["loading"] = False
        history[-1]["content"] = [{"type": "text", "content": "⚠️ Please enter a prompt."}]
        yield history, last_image_path
        return
    # Yield the loading bubble immediately, before any heavy model work.
    yield history, last_image_path
    try:
        model_messages = _build_model_messages(history[:-1])
        # Detect media types to pick the correct defaults.
        has_image = any(
            p.get("type") == "image"
            for m in model_messages
            for p in (m["content"] if isinstance(m["content"], list) else [])
        )
        has_video = any(
            p.get("type") == "video"
            for m in model_messages
            for p in (m["content"] if isinstance(m["content"], list) else [])
        )
        media_kwargs = {}
        if has_image:
            media_kwargs.update(_IMAGE_MEDIA_DEFAULTS)
        if has_video:
            media_kwargs.update({**_VIDEO_MEDIA_DEFAULTS, "video_fps": float(video_fps), "max_frames": int(max_frames)})
        do_sample = temperature > 0.0
        query = {
            "messages": model_messages,
            "media_kwargs": media_kwargs,
            "generate_kwargs": {
                "max_new_tokens": int(max_new_tokens),
                "temperature": float(temperature),
                "top_k": 50,
                "top_p": float(top_p),
                "repetition_penalty": float(repetition_penalty),
                "do_sample": do_sample,
                "vision_chunked_length": 64,
            },
        }
        # Use the official offline_generate streaming API (queue-based).
        in_q: "queue.Queue[dict]" = queue.Queue()
        out_q: "queue.Queue[str]" = queue.Queue()
        worker = threading.Thread(
            target=model.offline_generate,
            args=(processor, in_q, out_q),
            kwargs={"vision_chunked_length": 64},
            daemon=True,
        )
        worker.start()
        in_q.put(dict(query))
        partial_text = ""
        try:
            while True:
                token = out_q.get(timeout=300)
                if token == "<|round_start|>":
                    continue
                if token == "<|round_end|>":
                    break
                if token.startswith("[ERROR] "):
                    raise RuntimeError(token)
                partial_text += token
                history[-1]["loading"] = False
                history[-1]["content"] = [{"type": "text", "content": partial_text + "▋"}]
                yield history, last_image_path
        finally:
            in_q.put({"stop_offline_generate": True})
            worker.join(timeout=30.0)
        if partial_text:
            # Drop the streaming cursor from the final message.
            history[-1]["content"] = [{"type": "text", "content": partial_text}]
    except torch.cuda.OutOfMemoryError:
        history[-1]["loading"] = False
        history[-1]["content"] = [{"type": "text", "content": "❌ Out of memory — try a smaller image or fewer Max New Tokens."}]
    except Exception:
        history[-1]["loading"] = False
        history[-1]["content"] = [{"type": "text", "content": f"❌ Error:\n```\n{traceback.format_exc()}\n```"}]
    yield history, last_image_path
if HAS_SPACES:
    # On HF Spaces, wrap generation in a ZeroGPU allocation window.
    @spaces.GPU(duration=120)
    def run_generate(messages, enable_thinking, max_new_tokens, temperature, top_p,
                     repetition_penalty, last_image_path=None, video_fps=1.0, max_frames=256):
        yield from _run_generate(messages, enable_thinking, max_new_tokens, temperature,
                                 top_p, repetition_penalty, last_image_path, video_fps, max_frames)
else:
    def run_generate(messages, enable_thinking, max_new_tokens, temperature, top_p,
                     repetition_penalty, last_image_path=None, video_fps=1.0, max_frames=256):
        yield from _run_generate(messages, enable_thinking, max_new_tokens, temperature,
                                 top_p, repetition_penalty, last_image_path, video_fps, max_frames)
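# Sketch of the queue protocol assumed above. offline_generate is custom code
# shipped with the model repo (via trust_remote_code), not a standard
# transformers API; the message shapes below are inferred from the consumer
# loop in _run_generate:
#
#   in_q.put({"messages": [...], "media_kwargs": {...}, "generate_kwargs": {...}})
#   # The worker thread then streams string tokens onto out_q:
#   #   "<|round_start|>", "The", " cat", ..., "<|round_end|>"
#   # and reports failures as a single "[ERROR] ..." token.
#   in_q.put({"stop_offline_generate": True})   # ask the worker to shut down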
# ---------------------------------------------------------------------------
# CSS
# ---------------------------------------------------------------------------
CSS = """
/* Use 100vh (absolute) so body.offsetHeight = viewport height.
iFrameResizer reads offsetHeight — this prevents it from expanding
the iframe beyond the viewport and making the outer page scroll. */
html {
height: 100vh !important;
overflow: hidden !important;
}
body {
height: 100vh !important;
overflow: hidden !important;
}
.gradio-container {
padding: 0 !important;
height: 100vh !important;
overflow: hidden !important;
}
.gradio-container > main.fillable {
padding: 0 !important;
height: 100vh !important;
overflow: hidden !important;
}
footer {
display: none !important;
}
/* Height locked via JS-set --app-height to avoid iframe 100vh feedback loop */
#chatbot {
height: var(--app-height, 780px);
max-height: var(--app-height, 780px);
}
/* Propagate fixed height through any wrapper divs down to the ant-col children */
#chatbot > *,
#chatbot .ant-row,
#chatbot .ant-col {
height: 100% !important;
}
/* Gradio injects extra wrapper divs between ant-col and chatbot-chat; propagate height */
#chatbot .ant-col > div {
height: 100% !important;
}
/* Sidebar col: full-height gray background, override antd gutter padding */
#chatbot .sidebar-col {
height: 100% !important;
background-color: var(--ms-gr-ant-color-bg-layout) !important;
padding-left: 0 !important;
padding-right: 0 !important;
}
#chatbot .chatbot-conversations {
height: 100%;
background-color: var(--ms-gr-ant-color-bg-layout);
padding-left: 4px;
padding-right: 4px;
overflow-y: auto;
}
#chatbot .chatbot-conversations .chatbot-conversations-list {
padding-left: 0;
padding-right: 0;
}
#chatbot .chatbot-chat {
padding: 32px;
padding-top: 64px;
padding-bottom: 24px;
height: 100%;
display: flex;
flex-direction: column;
overflow: hidden;
}
@media (max-width: 768px) {
#chatbot .chatbot-chat {
padding: 10px;
padding-bottom: 16px;
}
}
#chatbot .chatbot-chat .chatbot-chat-messages {
flex: 1;
min-height: 0;
overflow-y: auto;
}
#chatbot .chatbot-chat .chatbot-chat-messages > div {
height: 100% !important;
display: flex !important;
flex-direction: column !important;
}
/* Vertically center welcome content only (safe — won't break scroll when messages exist) */
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages {
display: flex;
flex-direction: column;
}
/* Equal-height top-level cards */
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-prompts-items {
display: flex !important;
align-items: stretch !important;
}
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-prompts-item {
display: flex !important;
flex-direction: column !important;
height: auto !important;
flex: 1 1 0 !important;
}
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-prompts-item > * {
flex: 1;
display: flex;
flex-direction: column;
height: 100%;
}
/* Sub-group rows within each card */
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-prompts-item .ant-prompts-items {
display: flex !important;
flex-direction: column !important;
align-items: stretch !important;
flex: 1;
height: 100%;
}
/* Sub-groups (level 2) */
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-prompts-item .ant-prompts-item {
flex: 1 1 0 !important;
display: flex !important;
flex-direction: column !important;
box-sizing: border-box !important;
}
/* Leaf prompt buttons (level 3): smaller font and compact height */
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-prompts-item .ant-prompts-item .ant-prompts-item {
flex: 1 1 0 !important;
height: auto !important;
display: flex !important;
align-items: center !important;
padding: 4px 8px !important;
box-sizing: border-box !important;
font-size: 11px !important;
line-height: 1.4 !important;
}
/* Sub-group label — smaller font */
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-prompts-item .ant-prompts-title {
font-size: 11px !important;
opacity: 0.65;
margin-bottom: 4px !important;
padding: 0 !important;
}
/* Make \n in description render as real line breaks */
.ant-prompts-item-description {
white-space: pre-wrap !important;
}
/* Welcome header: icon stacked above title */
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-welcome {
display: flex !important;
flex-direction: column !important;
align-items: center !important;
}
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-welcome-icon {
font-size: 80px !important;
margin-bottom: 8px !important;
margin-inline-end: 0 !important;
}
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-welcome-icon img {
width: 80px !important;
height: 80px !important;
}
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-welcome-title {
font-size: 36px !important;
}
/* Bot avatar: no circle crop, transparent-friendly */
#chatbot .ant-avatar {
border-radius: 0 !important;
background: transparent !important;
border: none !important;
box-shadow: none !important;
}
#chatbot .ant-avatar img {
border-radius: 0 !important;
object-fit: contain !important;
}
"""
# ---------------------------------------------------------------------------
# UI
# ---------------------------------------------------------------------------
_ROOT_PATH = os.environ.get("GRADIO_ROOT_PATH", "").rstrip("/")
_ASSETS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "asserts")
_LOGO_PATH = os.path.join(_ASSETS_DIR, "pure_logo.png")
_logo_url = "https://huggingface.co/spaces/OpenMOSS-Team/MOSS-VL/resolve/main/asserts/pure_logo.png"
# One-shot snapshot of window.innerHeight → --app-height.
# Reads once after iFrameResizer has set the initial iframe size, then
# NEVER updates. This breaks the feedback loop where iFrameResizer grows
# the iframe in response to content height and our JS keeps chasing it.
_SYNC_HEIGHT_JS = """
() => {
let attempts = 0;
const snapshot = () => {
const h = window.innerHeight;
// Only accept plausible values (iframe default is 150px).
if (h > 500) {
document.documentElement.style.setProperty('--app-height', h + 'px');
return; // one-shot: stop polling, never listen for resize
}
// Poll every 50ms up to 2 seconds; after that let CSS fallback (780px) take over.
if (attempts++ < 40) {
setTimeout(snapshot, 50);
}
};
snapshot();
}
"""
# Per-row height equalization for the 3-column welcome prompt grid.
# Structure assumed: 3 top-level column items, each with 4 leaf items (2 groups × 2 leaves).
# Columns are identified as prompts-items that are NOT nested inside another prompts-item.
# Then for each row index 0-3, we equalize min-height across the 3 columns.
_EQUALIZE_ROWS_JS = """
() => {
const fix = () => {
const all = [...document.querySelectorAll('[class*="prompts-item"]')];
if (all.length < 12) { setTimeout(fix, 400); return; }
// Top-level column items: not contained in any other prompts-item
const cols = all.filter(el => !el.parentElement.closest('[class*="prompts-item"]'));
if (cols.length !== 3) { setTimeout(fix, 400); return; }
// For each column collect leaf items (no nested prompts-item) in DOM order
const colLeaves = cols.map(col =>
[...col.querySelectorAll('[class*="prompts-item"]')]
.filter(el => !el.querySelector('[class*="prompts-item"]'))
);
if (!colLeaves.every(l => l.length === 4)) { setTimeout(fix, 400); return; }
// Check all items have rendered height
if (colLeaves.flat().some(el => el.getBoundingClientRect().height < 5)) {
setTimeout(fix, 400); return;
}
// Equalize row by row
for (let r = 0; r < 4; r++) {
const row = colLeaves.map(col => col[r]);
const maxH = Math.max(...row.map(el => el.getBoundingClientRect().height));
row.forEach(el => { el.style.minHeight = maxH + 'px'; });
}
};
setTimeout(fix, 1500);
}
"""
with gr.Blocks(css=CSS, fill_width=True, title="MOSS-VL Demo") as demo:
    # Generation settings (shared state)
    gen_settings = gr.State({
        "max_new_tokens": 512,
        "temperature": 0.0,
        "top_p": 1.0,
        "repetition_penalty": 1.0,
    })
    # Conversation state
    state = gr.State({
        "conversation_contexts": {},  # id -> {"history": [...]}
        "conversations": [],          # [{key, label}, ...]
        "conversation_id": "",
    })
    with ms.Application(), antdx.XProvider(theme=THEME):
        with antd.Row(gutter=[20, 20], wrap=False, elem_id="chatbot"):
            # ── LEFT SIDEBAR ──
            with antd.Col(
                md=dict(flex="0 0 260px", span=24, order=0),
                span=0,
                order=1,
                elem_style=dict(width=0),
                elem_classes="sidebar-col",
            ) as sidebar_col:
                with ms.Div(elem_classes="chatbot-conversations"):
                    with antd.Flex(vertical=True, gap="small", elem_style=dict(height="100%")):
                        # Logo
                        gr.HTML(
                            f'<div style="display:flex;align-items:center;justify-content:center;'
                            f'gap:8px;padding:8px;white-space:nowrap;">'
                            f'<img src="{_logo_url}" '
                            f'style="width:40px;height:40px;object-fit:contain;display:block;" />'
                            f'<span style="font-size:22px;font-weight:600;line-height:1;">MOSS-VL</span>'
                            f'</div>'
                        )
                        # New conversation button
                        with antd.Button(
                            value=None,
                            color="primary",
                            variant="filled",
                            block=True,
                        ) as add_conv_btn:
                            ms.Text("New Conversation")
                            with ms.Slot("icon"):
                                antd.Icon("PlusOutlined")
                        # Conversation list
                        with antdx.Conversations(
                            elem_classes="chatbot-conversations-list",
                        ) as conversations:
                            with ms.Slot("menu.items"):
                                with antd.Menu.Item(
                                    label="Delete", key="delete", danger=True
                                ) as conv_delete_item:
                                    with ms.Slot("icon"):
                                        antd.Icon("DeleteOutlined")
                        # Settings accordion at the bottom of the sidebar
                        with antd.Collapse(ghost=True):
                            with antd.Collapse.Item(
                                label="⚙ Generation Settings",
                                key="settings",
                            ):
                                max_new_tokens = gr.Slider(64, 8192, value=4096, step=64, label="Max New Tokens")
                                temperature = gr.Slider(0.0, 1.5, value=0.5, step=0.05, label="Temperature")
                                top_p = gr.Slider(0.1, 1.0, value=1.0, step=0.05, label="Top-p")
                                repetition_penalty = gr.Slider(1.0, 2.0, value=1.05, step=0.05, label="Repetition Penalty")
                            with antd.Collapse.Item(
                                label="🎬 Video Sampling",
                                key="video",
                            ):
                                video_fps = gr.Slider(0.1, 4.0, value=1.0, step=0.1, label="FPS")
                                max_frames = gr.Slider(8, 512, value=256, step=8, label="Max Frames")
            # ── MAIN CHAT AREA ──
            with antd.Col(flex=1, elem_style=dict(height="100%")):
                with antd.Flex(
                    vertical=True,
                    gap="small",
                    elem_classes="chatbot-chat",
                ):
                    # Chatbot
                    chatbot = pro.Chatbot(
                        elem_classes="chatbot-chat-messages",
                        height=0,
                        welcome_config=welcome_config(),
                        user_config=user_config(),
                        bot_config=bot_config(),
                    )
                    # Multimodal input (built-in upload button for attachments)
                    with pro.MultimodalInput(
                        placeholder="Message MOSS-VL…",
                        upload_config={
                            "accept": "image/*,video/*",
                            "multiple": False,
                        },
                    ) as chat_input:
                        with ms.Slot("prefix"):
                            with antd.Flex(gap=4, wrap=True):
                                with antd.Button(value=None, type="text") as clear_btn:
                                    with ms.Slot("icon"):
                                        antd.Icon("ClearOutlined")
    # ── EVENT HANDLERS ──
    def preprocess(state_value, clear_input=True):
        history = state_value["conversation_contexts"].get(
            state_value["conversation_id"], {}
        ).get("history", [])
        updates = {
            conversations: gr.update(
                active_key=state_value["conversation_id"],
                items=[{**c, "disabled": c["key"] != state_value["conversation_id"]}
                       for c in state_value["conversations"]],
            ),
            add_conv_btn: gr.update(disabled=True),
            clear_btn: gr.update(disabled=True),
            conv_delete_item: gr.update(disabled=True),
            chatbot: gr.update(
                value=history,
                bot_config=bot_config(disabled_actions=["retry", "edit", "delete"]),
                user_config={"actions": []},
            ),
            state: gr.update(value=state_value),
        }
        if clear_input:
            updates[chat_input] = gr.update(value=None, loading=True)
        else:
            updates[chat_input] = gr.update(loading=True)
        return updates

    def postprocess(state_value):
        history = state_value["conversation_contexts"].get(
            state_value["conversation_id"], {}
        ).get("history", [])
        return {
            chat_input: gr.update(loading=False),
            conv_delete_item: gr.update(disabled=False),
            clear_btn: gr.update(disabled=False),
            conversations: gr.update(items=state_value["conversations"]),
            add_conv_btn: gr.update(disabled=False),
            chatbot: gr.update(
                value=history,
                bot_config=bot_config(),
                user_config=user_config(),
            ),
            state: gr.update(value=state_value),
        }
    def add_user_message(input_value, state_value):
        text = input_value.get("text", "") if input_value else ""
        files = input_value.get("files", []) if input_value else []
        persistent_files = [_file_path(f) for f in files]
        if not state_value["conversation_id"]:
            conv_id = str(uuid.uuid4())
            state_value["conversation_id"] = conv_id
            state_value["conversations"].append({"label": text[:40] or "New Chat", "key": conv_id})
            state_value["conversation_contexts"][conv_id] = {"history": [], "last_image_path": None}
        ctx = state_value["conversation_contexts"][state_value["conversation_id"]]
        history = ctx["history"]
        history.append({
            "key": str(uuid.uuid4()),
            "role": "user",
            "content": [
                {"type": "file", "content": persistent_files},
                {"type": "text", "content": text},
            ],
        })
        history.append({
            "key": str(uuid.uuid4()),
            "role": "assistant",
            "header": "MOSS-VL",
            "loading": True,
            "content": [{"type": "text", "content": ""}],
        })
        return preprocess(state_value, clear_input=True)
    def generate_response(state_value, max_tok, temp, top_p_, rep_pen, v_fps, v_max_frames):
        conv_id = state_value.get("conversation_id", "")
        if not conv_id or conv_id not in state_value.get("conversation_contexts", {}):
            return
        ctx = state_value["conversation_contexts"][conv_id]
        history = ctx["history"]
        last_img = ctx.get("last_image_path")
        for updated_history, new_last_img in run_generate(
            history, False, max_tok, temp, top_p_, rep_pen, last_img, v_fps, v_max_frames
        ):
            ctx["history"] = updated_history
            ctx["last_image_path"] = new_last_img
            yield updated_history, state_value
    def apply_welcome_prompt(e: gr.EventData, input_value):
        if input_value is None:
            input_value = {}
        input_value["text"] = e._data["payload"][0]["value"]["description"]
        return gr.update(value=input_value)

    def new_chat(state_value):
        if not state_value["conversation_id"]:
            return gr.skip()
        state_value["conversation_id"] = ""
        return (
            gr.update(active_key=""),
            gr.update(value=None),
            gr.update(value=state_value),
        )

    def select_conversation(state_value, e: gr.EventData):
        key = e._data["payload"][0]
        if state_value["conversation_id"] == key or key not in state_value["conversation_contexts"]:
            return gr.skip()
        state_value["conversation_id"] = key
        history = state_value["conversation_contexts"][key]["history"]
        return (
            gr.update(active_key=key),
            gr.update(value=history),
            gr.update(value=state_value),
        )
    def conversation_menu(state_value, e: gr.EventData):
        conv_id = e._data["payload"][0]["key"]
        operation = e._data["payload"][1]["key"]
        if operation == "delete":
            del state_value["conversation_contexts"][conv_id]
            state_value["conversations"] = [
                c for c in state_value["conversations"] if c["key"] != conv_id
            ]
            if state_value["conversation_id"] == conv_id:
                state_value["conversation_id"] = ""
                return (
                    gr.update(items=state_value["conversations"], active_key=""),
                    gr.update(value=None),
                    gr.update(value=state_value),
                )
            else:
                return (
                    gr.update(items=state_value["conversations"]),
                    gr.skip(),
                    gr.update(value=state_value),
                )
        return gr.skip()

    def clear_history(state_value):
        if not state_value["conversation_id"]:
            return gr.skip()
        state_value["conversation_contexts"][state_value["conversation_id"]]["history"] = []
        return gr.update(value=None), gr.update(value=state_value)
    def prepare_retry(state_value, e: gr.EventData):
        index = e._data["payload"][0]["index"]
        ctx = state_value["conversation_contexts"][state_value["conversation_id"]]
        ctx["history"] = ctx["history"][:index]
        ctx["history"].append({
            "key": str(uuid.uuid4()),
            "role": "assistant",
            "header": "MOSS-VL",
            "loading": True,
            "content": [{"type": "text", "content": ""}],
        })
        return preprocess(state_value, clear_input=False)

    def delete_message(state_value, e: gr.EventData):
        index = e._data["payload"][0]["index"]
        history = state_value["conversation_contexts"][state_value["conversation_id"]]["history"]
        history.pop(index)
        return gr.update(value=state_value)
    def handle_edit(state_value, e: gr.EventData):
        payload = e._data["payload"][0]
        index = payload["index"]
        ctx = state_value["conversation_contexts"][state_value["conversation_id"]]
        # Extract the new text from the edited content
        new_content = payload.get("value", "")
        if isinstance(new_content, list):
            # content is a list of parts — extract the text
            new_text = " ".join(
                p.get("content", "") or p.get("text", "")
                for p in new_content
                if isinstance(p, dict) and p.get("type") == "text"
            )
        elif isinstance(new_content, str):
            new_text = new_content
        else:
            new_text = ""
        # Update the user message at index with the new text; keep files intact
        original_msg = ctx["history"][index]
        new_parts = []
        for part in original_msg.get("content", []):
            if part.get("type") == "file":
                new_parts.append(part)
            elif part.get("type") == "text":
                new_parts.append({"type": "text", "content": new_text})
        if not any(p.get("type") == "text" for p in new_parts):
            new_parts.append({"type": "text", "content": new_text})
        ctx["history"][index]["content"] = new_parts
        # Drop everything after the edited message (old assistant reply + later turns)
        ctx["history"] = ctx["history"][:index + 1]
        # Append a loading assistant bubble
        ctx["history"].append({
            "key": str(uuid.uuid4()),
            "role": "assistant",
            "header": "MOSS-VL",
            "loading": True,
            "content": [{"type": "text", "content": ""}],
        })
        return preprocess(state_value, clear_input=False)
    # Wire events
    ui_outputs = [
        chat_input, conv_delete_item, clear_btn,
        add_conv_btn, conversations, chatbot, state,
    ]
    stream_outputs = [chatbot, state]
    # Slider components fed into generate_response (named to avoid shadowing
    # the gen_settings State defined above).
    gen_inputs = [max_new_tokens, temperature, top_p, repetition_penalty, video_fps, max_frames]

    # Submit: add message → stream tokens → restore UI
    submit_step1 = chat_input.submit(
        fn=add_user_message,
        inputs=[chat_input, state],
        outputs=ui_outputs,
    )
    submit_step2 = submit_step1.then(
        fn=generate_response,
        inputs=[state] + gen_inputs,
        outputs=stream_outputs,
    )
    submit_step2.then(
        fn=postprocess,
        inputs=[state],
        outputs=ui_outputs,
    )
    chat_input.cancel(
        fn=postprocess,
        inputs=[state],
        outputs=ui_outputs,
        cancels=[submit_step1, submit_step2],
        queue=False,
    )
    chatbot.welcome_prompt_select(
        fn=apply_welcome_prompt,
        inputs=[chat_input],
        outputs=[chat_input],
    )
    add_conv_btn.click(
        fn=new_chat,
        inputs=[state],
        outputs=[conversations, chatbot, state],
    )
    conversations.active_change(
        fn=select_conversation,
        inputs=[state],
        outputs=[conversations, chatbot, state],
    )
    conversations.menu_click(
        fn=conversation_menu,
        inputs=[state],
        outputs=[conversations, chatbot, state],
    )
    clear_btn.click(
        fn=clear_history,
        inputs=[state],
        outputs=[chatbot, state],
    )
    chatbot.delete(
        fn=delete_message,
        inputs=[state],
        outputs=[state],
    )
    # Edit: update message → stream tokens → restore UI
    edit_step1 = chatbot.edit(
        fn=handle_edit,
        inputs=[state],
        outputs=ui_outputs,
    )
    edit_step2 = edit_step1.then(
        fn=generate_response,
        inputs=[state] + gen_inputs,
        outputs=stream_outputs,
    )
    edit_step2.then(
        fn=postprocess,
        inputs=[state],
        outputs=ui_outputs,
    )
    # Retry: prepare → stream tokens → restore UI
    retry_step1 = chatbot.retry(
        fn=prepare_retry,
        inputs=[state],
        outputs=ui_outputs,
    )
    retry_step2 = retry_step1.then(
        fn=generate_response,
        inputs=[state] + gen_inputs,
        outputs=stream_outputs,
    )
    retry_step2.then(
        fn=postprocess,
        inputs=[state],
        outputs=ui_outputs,
    )
    # Lock chatbot height to the actual viewport height (avoids the iframe 100vh loop)
    demo.load(fn=None, inputs=None, outputs=None, js=_SYNC_HEIGHT_JS)
    # Per-row height equalization for the welcome prompt grid
    demo.load(fn=None, inputs=None, outputs=None, js=_EQUALIZE_ROWS_JS)
demo.queue(default_concurrency_limit=1, max_size=20)
# Mount the asserts directory at /assets so the logo can be served without
# going through Gradio's cache validation (which rejects paths outside its temp dir)
from fastapi.staticfiles import StaticFiles
demo.app.mount("/assets", StaticFiles(directory=_ASSETS_DIR), name="assets")
if __name__ == "__main__":
    demo.launch(ssr_mode=False, root_path=_ROOT_PATH)