import os
import ctypes
import site


# nvidia-npp-cu12 installs libnppicc.so.12 inside site-packages/nvidia/npp/lib/,
# which is not on LD_LIBRARY_PATH. Load it globally before torchcodec is imported
# so the dynamic linker can resolve it when torchcodec dlopen's its shared libs.
def _preload_npp():
    """Load ``libnppicc.so.12`` with ``RTLD_GLOBAL`` from a site-packages dir.

    Every directory reported by ``site`` (global and per-user) is probed for
    ``nvidia/npp/lib/libnppicc.so.12``; the first copy that loads wins.
    Preloading is best-effort: a missing or unloadable library is reported
    on stdout instead of aborting the application import.

    Returns:
        bool: True if a library was loaded, False otherwise.
    """
    search_roots = []
    # ``site.getsitepackages`` is missing in some legacy virtualenv builds.
    get_site_packages = getattr(site, "getsitepackages", None)
    if get_site_packages is not None:
        search_roots.extend(get_site_packages())
    get_user_site = getattr(site, "getusersitepackages", None)
    if get_user_site is not None:
        search_roots.append(get_user_site())

    for root in search_roots:
        lib_path = os.path.join(root, "nvidia", "npp", "lib", "libnppicc.so.12")
        if not os.path.exists(lib_path):
            continue
        try:
            ctypes.CDLL(lib_path, mode=ctypes.RTLD_GLOBAL)
        except OSError as exc:
            # A broken copy must not stop us from trying the next candidate.
            print(f"Warning: could not preload {lib_path}: {exc}")
            continue
        return True
    return False


# Must run before torch / torchcodec are imported below.
_preload_npp()

import queue
import uuid
import traceback
import threading

import gradio as gr
import torch
from transformers import AutoModelForCausalLM, AutoProcessor

import modelscope_studio.components.antd as antd
import modelscope_studio.components.antdx as antdx
import modelscope_studio.components.base as ms
import modelscope_studio.components.pro as pro

try:
    import spaces
    HAS_SPACES = True
except ImportError:
    HAS_SPACES = False

# ---------------------------------------------------------------------------
# Model
# ---------------------------------------------------------------------------
MODEL_ID = "OpenMOSS-Team/MOSS-VL-Instruct-0408"

print("Loading processor...")
processor = AutoProcessor.from_pretrained(MODEL_ID, trust_remote_code=True)

print("Loading model...")
# Arguments shared by both load attempts; only the attention backend differs.
_MODEL_LOAD_KWARGS = {
    "trust_remote_code": True,
    "torch_dtype": torch.bfloat16,
    "device_map": "auto",
}
try:
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_ID,
        attn_implementation="flash_attention_2",
        **_MODEL_LOAD_KWARGS,
    )
except Exception as exc:
    # Top-level boundary: any failure of the first attempt triggers the
    # fallback, but the reason is reported instead of being dropped.
    print(f"flash_attention_2 load failed ({exc!r}); falling back to sdpa.")
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_ID,
        attn_implementation="sdpa",
        **_MODEL_LOAD_KWARGS,
    )
model.eval()
print("Model ready.")

# ---------------------------------------------------------------------------
# Theme (Ant Design token — matches Qwen style but in MOSS green accent)
# ---------------------------------------------------------------------------
THEME = {
    "token": {
        "colorPrimary": "#4f7c6a",
    }
}

# ---------------------------------------------------------------------------
# Welcome screen config
# ---------------------------------------------------------------------------

def _prompt_pair(label, zh, en):
    """Build one welcome sub-group holding a Chinese and an English prompt."""
    return {
        "label": label,
        "children": [
            {"label": "", "description": zh},
            {"label": "", "description": en},
        ],
    }


def welcome_config():
    """Return the welcome-screen configuration handed to ``pro.Chatbot``.

    The prompt tree has three top-level cards, each with two sub-groups of
    two leaf prompts. ``_EQUALIZE_ROWS_JS`` relies on exactly this 3 x 4 leaf
    layout.
    """
    return {
        "title": "MOSS-VL",
        "description": "Multimodal vision-language model. Upload an image or video and ask anything.",
        "icon": "asserts/cleaned_small_logo.png",
        "elem_style": {
            "maxWidth": "960px",
            "margin": "40px auto 0",
            "width": "100%",
            "textAlign": "center",
        },
        "prompts": {
            "title": "What can I help with?",
            "elem_style": {
                "width": "100%",
                "display": "flex",
                "flexWrap": "wrap",
                "gap": "12px",
                "justifyContent": "center",
                "alignItems": "stretch",
            },
            "styles": {
                "title": {
                    "width": "100%",
                    "textAlign": "center",
                    "marginBottom": "6px",
                    "fontSize": "14px",
                },
                "item": {
                    "flex": "1 1 0",
                    "maxWidth": "420px",
                    "minWidth": "280px",
                },
            },
            "items": [
                {
                    "label": "🖼️ Image Perception",
                    "children": [
                        _prompt_pair(
                            "Image Caption",
                            "请详细描述这张图片的内容。",
                            "Describe this image in detail.",
                        ),
                        _prompt_pair(
                            "Multi-Image Caption",
                            "这几张图片分别是什么?请逐一详细说明。",
                            "What are these pictures? Please explain in detail one by one.",
                        ),
                    ],
                },
                {
                    "label": "📄 OCR / Document",
                    "children": [
                        _prompt_pair(
                            "OCR",
                            "提取图片中的所有文字。",
                            "Extract all text in the image.",
                        ),
                        _prompt_pair(
                            "Document Parsing",
                            "将文档转换为 Markdown 格式。",
                            "Convert this document to Markdown.",
                        ),
                    ],
                },
                {
                    "label": "🎬 Video Understanding",
                    "children": [
                        _prompt_pair(
                            "Video Caption",
                            "请描述这个视频的内容。",
                            "Describe this video.",
                        ),
                        _prompt_pair(
                            "Temporal Grounding",
                            "观看此视频并确定主要的叙事片段。对于每个不同的时间块,提供时间戳并描述发生了什么。",
                            "Watch this video and identify the main narrative segments. For each distinct time block, provide the timestamps and describe what happens.",
                        ),
                    ],
                },
            ],
        },
    }


def user_config():
    """Return the per-message action set for user bubbles."""
    return {
        "actions": ["edit", "delete"],
    }


def bot_config(disabled_actions=None):
    """Return avatar, header and action set for assistant bubbles.

    Args:
        disabled_actions: optional iterable of action names to remove from
            the default ``copy`` / ``retry`` / ``delete`` set.
    """
    actions = ["copy", "retry", "delete"]
    if disabled_actions:
        actions = [a for a in actions if a not in disabled_actions]
    return {
        # _logo_url is a module global defined in the UI section below; it is
        # resolved at call time, after the module has finished loading it.
        "avatar": _logo_url,
        "header": "MOSS-VL",
        "actions": actions,
    }


def _file_path(f) -> str:
    """Extract real filesystem path from either a plain string or a Gradio file dict."""
    if isinstance(f, str):
        return f
    if isinstance(f, dict):
        return f.get("path") or f.get("name") or ""
    return ""


def _loading_bubble():
    """Return a fresh assistant placeholder message in its loading state."""
    return {
        "key": str(uuid.uuid4()),
        "role": "assistant",
        "header": "MOSS-VL",
        "loading": True,
        "content": [{"type": "text", "content": ""}],
    }


# ---------------------------------------------------------------------------
# Inference (multi-turn — yields loading placeholder then final reply)
# ---------------------------------------------------------------------------
_VIDEO_EXTENSIONS = frozenset({".mp4", ".avi", ".mov", ".mkv", ".webm", ".flv", ".wmv", ".m4v"})

# Seconds to wait for the next streamed token before giving up.
_TOKEN_TIMEOUT_S = 300


def _build_model_messages(history):
    """Convert pro.Chatbot history to the model's multi-turn message format.

    User turns become a list of parts: ``{type: image, image: path}`` or
    ``{type: video, video: path}`` for each existing attached file (chosen by
    file extension) and ``{type: text, text: ...}`` for non-blank text.
    Assistant turns become plain strings. Loading placeholders, empty turns
    and malformed (non-dict) parts are skipped.
    """
    model_messages = []
    for msg in history:
        if msg.get("loading"):
            continue
        role = msg["role"]
        parts = [p for p in (msg.get("content") or []) if isinstance(p, dict)]
        if role == "user":
            content_parts = []
            for part in parts:
                kind = part.get("type")
                if kind == "file":
                    for f in (part.get("content") or []):
                        path = _file_path(f)
                        if not (path and os.path.exists(path)):
                            continue
                        ext = os.path.splitext(path)[1].lower()
                        if ext in _VIDEO_EXTENSIONS:
                            content_parts.append({"type": "video", "video": path})
                        else:
                            content_parts.append({"type": "image", "image": path})
                elif kind == "text":
                    t = part.get("content") or ""
                    if t.strip():
                        content_parts.append({"type": "text", "text": t})
            if content_parts:
                model_messages.append({"role": "user", "content": content_parts})
        elif role == "assistant":
            text = "\n".join(
                part.get("content") or "" for part in parts if part.get("type") == "text"
            ).strip()
            if text:
                model_messages.append({"role": "assistant", "content": text})
    return model_messages


# Media defaults matching the official inference reference
_IMAGE_MEDIA_DEFAULTS = {
    "min_pixels": 4096,
    "max_pixels": 16777216,
    "multi_image_max_pixels": 201326592,
    "patch_size": 16,
    "temporal_patch_size": 1,
    "merge_size": 2,
    "image_mean": [0.5, 0.5, 0.5],
    "image_std": [0.5, 0.5, 0.5],
}

_VIDEO_MEDIA_DEFAULTS = {
    "min_pixels": 4096,
    "max_pixels": 16777216,
    "video_max_pixels": 201326592,
    "patch_size": 16,
    "temporal_patch_size": 1,
    "merge_size": 2,
    "video_fps": 1.0,
    "min_frames": 1,
    "max_frames": 256,
    "num_extract_threads": 4,
    "image_mean": [0.5, 0.5, 0.5],
    "image_std": [0.5, 0.5, 0.5],
}


def _run_generate(messages, enable_thinking, max_new_tokens, temperature, top_p,
                  repetition_penalty, last_image_path=None, video_fps=1.0, max_frames=256):
    """Stream one assistant reply for the latest user turn.

    Args:
        messages: list of history dicts in pro.Chatbot format. The caller must
            have already appended an assistant bubble as the last item; that
            bubble is updated in place as tokens arrive.
        enable_thinking: accepted for interface compatibility; not used here.
        max_new_tokens, temperature, top_p, repetition_penalty: sampling
            settings forwarded in ``generate_kwargs``. Sampling is enabled
            only when ``temperature`` is greater than zero.
        last_image_path: path of the most recent attachment seen so far.
        video_fps, max_frames: overrides applied to the video media defaults.

    Yields:
        (updated history list, new_last_image_path)
    """
    history = list(messages) if messages else []

    # Last item is the pre-created assistant bubble; user message is second-to-last
    user_msg = next((m for m in reversed(history[:-1]) if m["role"] == "user"), None)
    if user_msg is None:
        return

    text = ""
    new_image = None
    for part in (user_msg.get("content") or []):
        if not isinstance(part, dict):
            continue
        kind = part.get("type")
        if kind == "text":
            text = part.get("content") or ""
        elif kind == "file":
            files = part.get("content")
            if files:
                new_image = _file_path(files[0])
    if new_image and os.path.exists(new_image):
        last_image_path = new_image

    bubble = history[-1]
    if not text.strip():
        bubble["loading"] = False
        bubble["content"] = [{"type": "text", "content": "⚠️ Please enter a prompt."}]
        yield history, last_image_path
        return

    # Yield loading bubble immediately before heavy model work
    yield history, last_image_path

    try:
        model_messages = _build_model_messages(history[:-1])

        # Detect media types (single pass) to pick correct defaults
        media_types = {
            p.get("type")
            for m in model_messages
            if isinstance(m["content"], list)
            for p in m["content"]
        }
        media_kwargs = {}
        if "image" in media_types:
            media_kwargs.update(_IMAGE_MEDIA_DEFAULTS)
        if "video" in media_types:
            media_kwargs.update({
                **_VIDEO_MEDIA_DEFAULTS,
                "video_fps": float(video_fps),
                "max_frames": int(max_frames),
            })

        do_sample = temperature > 0.0
        query = {
            "messages": model_messages,
            "media_kwargs": media_kwargs,
            "generate_kwargs": {
                "max_new_tokens": int(max_new_tokens),
                "temperature": float(temperature),
                "top_k": 50,
                "top_p": float(top_p),
                "repetition_penalty": float(repetition_penalty),
                "do_sample": do_sample,
                "vision_chunked_length": 64,
            },
        }

        # Use the official offline_generate streaming API (queue-based)
        in_q: "queue.Queue[dict]" = queue.Queue()
        out_q: "queue.Queue[str]" = queue.Queue()
        worker = threading.Thread(
            target=model.offline_generate,
            args=(processor, in_q, out_q),
            kwargs={"vision_chunked_length": 64},
            daemon=True,
        )
        worker.start()
        in_q.put(dict(query))

        partial_text = ""
        try:
            while True:
                token = out_q.get(timeout=_TOKEN_TIMEOUT_S)
                if token == "<|round_start|>":
                    continue
                if token == "<|round_end|>":
                    break
                if token.startswith("[ERROR] "):
                    raise RuntimeError(token)
                partial_text += token
                bubble["loading"] = False
                # Trailing block glyph acts as a typing cursor while streaming.
                bubble["content"] = [{"type": "text", "content": partial_text + "▋"}]
                yield history, last_image_path
        finally:
            # Runs on normal end, on error and when the generator is closed
            # (cancel): always ask the worker to stop.
            in_q.put({"stop_offline_generate": True})
            worker.join(timeout=30.0)

        # Always leave the loading state, even if no token was produced;
        # otherwise an empty reply would spin forever.
        bubble["loading"] = False
        bubble["content"] = [{"type": "text", "content": partial_text}]
    except torch.cuda.OutOfMemoryError:
        bubble["loading"] = False
        bubble["content"] = [{"type": "text", "content": "❌ Out of memory — try a smaller image or fewer Max New Tokens."}]
    except queue.Empty:
        bubble["loading"] = False
        bubble["content"] = [{"type": "text", "content": f"❌ Generation timed out after {_TOKEN_TIMEOUT_S} seconds without a new token."}]
    except Exception:
        bubble["loading"] = False
        bubble["content"] = [{"type": "text", "content": f"❌ Error:\n```\n{traceback.format_exc()}\n```"}]

    yield history, last_image_path


def run_generate(messages, enable_thinking, max_new_tokens, temperature, top_p,
                 repetition_penalty, last_image_path=None, video_fps=1.0, max_frames=256):
    """Public streaming entry point; see ``_run_generate`` for the contract."""
    yield from _run_generate(messages, enable_thinking, max_new_tokens, temperature, top_p,
                             repetition_penalty, last_image_path, video_fps, max_frames)


if HAS_SPACES:
    # On Hugging Face Spaces, request a GPU for the duration of each call.
    run_generate = spaces.GPU(duration=120)(run_generate)


# ---------------------------------------------------------------------------
# CSS
# ---------------------------------------------------------------------------
CSS = """
/* Use 100vh (absolute) so body.offsetHeight = viewport height.
   iFrameResizer reads offsetHeight — this prevents it from expanding
   the iframe beyond the viewport and making the outer page scroll. */
html {
    height: 100vh !important;
    overflow: hidden !important;
}
body {
    height: 100vh !important;
    overflow: hidden !important;
}
.gradio-container {
    padding: 0 !important;
    height: 100vh !important;
    overflow: hidden !important;
}
.gradio-container > main.fillable {
    padding: 0 !important;
    height: 100vh !important;
    overflow: hidden !important;
}
footer {
    display: none !important;
}

/* Height locked via JS-set --app-height to avoid iframe 100vh feedback loop */
#chatbot {
    height: var(--app-height, 780px);
    max-height: var(--app-height, 780px);
}

/* Propagate fixed height through any wrapper divs down to the ant-col children */
#chatbot > *,
#chatbot .ant-row,
#chatbot .ant-col {
    height: 100% !important;
}

/* Gradio injects extra wrapper divs between ant-col and chatbot-chat; propagate height */
#chatbot .ant-col > div {
    height: 100% !important;
}

/* Sidebar col: full-height gray background, override antd gutter padding */
#chatbot .sidebar-col {
    height: 100% !important;
    background-color: var(--ms-gr-ant-color-bg-layout) !important;
    padding-left: 0 !important;
    padding-right: 0 !important;
}

#chatbot .chatbot-conversations {
    height: 100%;
    background-color: var(--ms-gr-ant-color-bg-layout);
    padding-left: 4px;
    padding-right: 4px;
    overflow-y: auto;
}

#chatbot .chatbot-conversations .chatbot-conversations-list {
    padding-left: 0;
    padding-right: 0;
}

#chatbot .chatbot-chat {
    padding: 32px;
    padding-top: 64px;
    padding-bottom: 24px;
    height: 100%;
    display: flex;
    flex-direction: column;
    overflow: hidden;
}

@media (max-width: 768px) {
    #chatbot .chatbot-chat {
        padding: 10px;
        padding-bottom: 16px;
    }
}

#chatbot .chatbot-chat .chatbot-chat-messages {
    flex: 1;
    min-height: 0;
    overflow-y: auto;
}

#chatbot .chatbot-chat .chatbot-chat-messages > div {
    height: 100% !important;
    display: flex !important;
    flex-direction: column !important;
}

/* Vertically center welcome content only (safe — won't break scroll when messages exist) */
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages {
    display: flex;
    flex-direction: column;
}

/* Equal-height top-level cards */
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-prompts-items {
    display: flex !important;
    align-items: stretch !important;
}
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-prompts-item {
    display: flex !important;
    flex-direction: column !important;
    height: auto !important;
    flex: 1 1 0 !important;
}
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-prompts-item > * {
    flex: 1;
    display: flex;
    flex-direction: column;
    height: 100%;
}

/* Sub-group rows within each card */
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-prompts-item .ant-prompts-items {
    display: flex !important;
    flex-direction: column !important;
    align-items: stretch !important;
    flex: 1;
    height: 100%;
}

/* Sub-groups (level 2) */
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-prompts-item .ant-prompts-item {
    flex: 1 1 0 !important;
    display: flex !important;
    flex-direction: column !important;
    box-sizing: border-box !important;
}

/* Leaf prompt buttons (level 3): smaller font and compact height */
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-prompts-item .ant-prompts-item .ant-prompts-item {
    flex: 1 1 0 !important;
    height: auto !important;
    display: flex !important;
    align-items: center !important;
    padding: 4px 8px !important;
    box-sizing: border-box !important;
    font-size: 11px !important;
    line-height: 1.4 !important;
}

/* Sub-group label — smaller font */
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-prompts-item .ant-prompts-title {
    font-size: 11px !important;
    opacity: 0.65;
    margin-bottom: 4px !important;
    padding: 0 !important;
}

/* Make \n in description render as real line breaks */
.ant-prompts-item-description {
    white-space: pre-wrap !important;
}

/* Welcome header: icon stacked above title */
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-welcome {
    display: flex !important;
    flex-direction: column !important;
    align-items: center !important;
}
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-welcome-icon {
    font-size: 80px !important;
    margin-bottom: 8px !important;
    margin-inline-end: 0 !important;
}
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-welcome-icon img {
    width: 80px !important;
    height: 80px !important;
}
#chatbot .chatbot-chat-messages .ms-gr-pro-chatbot-messages .ant-welcome-title {
    font-size: 36px !important;
}

/* Bot avatar: no circle crop, transparent-friendly */
#chatbot .ant-avatar {
    border-radius: 0 !important;
    background: transparent !important;
    border: none !important;
    box-shadow: none !important;
}
#chatbot .ant-avatar img {
    border-radius: 0 !important;
    object-fit: contain !important;
}
"""

# ---------------------------------------------------------------------------
# UI
# ---------------------------------------------------------------------------
_ROOT_PATH = os.environ.get("GRADIO_ROOT_PATH", "").rstrip("/")
_ASSETS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "asserts")
_LOGO_PATH = os.path.join(_ASSETS_DIR, "pure_logo.png")
_logo_url = "https://huggingface.co/spaces/OpenMOSS-Team/MOSS-VL/resolve/main/asserts/pure_logo.png"

# One-shot snapshot of window.innerHeight → --app-height.
# Reads once after iFrameResizer has set the initial iframe size, then
# NEVER updates. This breaks the feedback loop where iFrameResizer grows
# the iframe in response to content height and our JS keeps chasing it.
_SYNC_HEIGHT_JS = """
() => {
    let attempts = 0;
    const snapshot = () => {
        const h = window.innerHeight;
        // Only accept plausible values (iframe default is 150px).
        if (h > 500) {
            document.documentElement.style.setProperty('--app-height', h + 'px');
            return;  // one-shot: stop polling, never listen for resize
        }
        // Poll every 50ms up to 2 seconds; after that let CSS fallback (780px) take over.
        if (attempts++ < 40) {
            setTimeout(snapshot, 50);
        }
    };
    snapshot();
}
"""

# Per-row height equalization for the 3-column welcome prompt grid.
# Structure assumed: 3 top-level column items, each with 4 leaf items (2 groups × 2 leaves).
# Columns are identified as prompts-items that are NOT nested inside another prompts-item.
# Then for each row index 0-3, we equalize min-height across the 3 columns.
_EQUALIZE_ROWS_JS = """
() => {
    const fix = () => {
        const all = [...document.querySelectorAll('[class*="prompts-item"]')];
        if (all.length < 12) { setTimeout(fix, 400); return; }

        // Top-level column items: not contained in any other prompts-item
        const cols = all.filter(el => !el.parentElement.closest('[class*="prompts-item"]'));
        if (cols.length !== 3) { setTimeout(fix, 400); return; }

        // For each column collect leaf items (no nested prompts-item) in DOM order
        const colLeaves = cols.map(col =>
            [...col.querySelectorAll('[class*="prompts-item"]')]
                .filter(el => !el.querySelector('[class*="prompts-item"]'))
        );
        if (!colLeaves.every(l => l.length === 4)) { setTimeout(fix, 400); return; }

        // Check all items have rendered height
        if (colLeaves.flat().some(el => el.getBoundingClientRect().height < 5)) {
            setTimeout(fix, 400);
            return;
        }

        // Equalize row by row
        for (let r = 0; r < 4; r++) {
            const row = colLeaves.map(col => col[r]);
            const maxH = Math.max(...row.map(el => el.getBoundingClientRect().height));
            row.forEach(el => { el.style.minHeight = maxH + 'px'; });
        }
    };
    setTimeout(fix, 1500);
}
"""

with gr.Blocks(css=CSS, fill_width=True, title="MOSS-VL Demo") as demo:
    # Conversation state
    state = gr.State({
        "conversation_contexts": {},  # id -> {"history": [...]}
        "conversations": [],          # [{key, label}, ...]
        "conversation_id": "",
    })

    with ms.Application(), antdx.XProvider(theme=THEME):
        with antd.Row(gutter=[20, 20], wrap=False, elem_id="chatbot"):
            # ── LEFT SIDEBAR ──
            with antd.Col(
                md=dict(flex="0 0 260px", span=24, order=0),
                span=0,
                order=1,
                elem_style=dict(width=0),
                elem_classes="sidebar-col",
            ) as sidebar_col:
                with ms.Div(elem_classes="chatbot-conversations"):
                    with antd.Flex(vertical=True, gap="small", elem_style=dict(height="100%")):
                        # Logo: brand image next to the product name.
                        # NOTE(review): inline styles are a best-effort
                        # reconstruction — confirm against the deployed look.
                        gr.HTML(
                            '<div style="display:flex;align-items:center;gap:8px;padding:12px 8px;">'
                            f'<img src="{_logo_url}" alt="MOSS-VL logo" style="height:28px;" />'
                            '<span style="font-size:18px;font-weight:600;">MOSS-VL</span>'
                            '</div>'
                        )

                        # New conversation button
                        with antd.Button(
                            value=None,
                            color="primary",
                            variant="filled",
                            block=True,
                        ) as add_conv_btn:
                            ms.Text("New Conversation")
                            with ms.Slot("icon"):
                                antd.Icon("PlusOutlined")

                        # Conversation list
                        with antdx.Conversations(
                            elem_classes="chatbot-conversations-list",
                        ) as conversations:
                            with ms.Slot("menu.items"):
                                with antd.Menu.Item(
                                    label="Delete", key="delete", danger=True
                                ) as conv_delete_item:
                                    with ms.Slot("icon"):
                                        antd.Icon("DeleteOutlined")

                        # Settings accordion at bottom of sidebar
                        with antd.Collapse(ghost=True):
                            with antd.Collapse.Item(
                                label="⚙ Generation Settings",
                                key="settings",
                            ):
                                max_new_tokens = gr.Slider(64, 8192, value=4096, step=64, label="Max New Tokens")
                                temperature = gr.Slider(0.0, 1.5, value=0.5, step=0.05, label="Temperature")
                                top_p = gr.Slider(0.1, 1.0, value=1.0, step=0.05, label="Top-p")
                                repetition_penalty = gr.Slider(1.0, 2.0, value=1.05, step=0.05, label="Repetition Penalty")
                            with antd.Collapse.Item(
                                label="🎬 Video Sampling",
                                key="video",
                            ):
                                video_fps = gr.Slider(0.1, 4.0, value=1.0, step=0.1, label="FPS")
                                max_frames = gr.Slider(8, 512, value=256, step=8, label="Max Frames")

            # ── MAIN CHAT AREA ──
            with antd.Col(flex=1, elem_style=dict(height="100%")):
                with antd.Flex(
                    vertical=True,
                    gap="small",
                    elem_classes="chatbot-chat",
                ):
                    # Chatbot
                    chatbot = pro.Chatbot(
                        elem_classes="chatbot-chat-messages",
                        height=0,
                        welcome_config=welcome_config(),
                        user_config=user_config(),
                        bot_config=bot_config(),
                    )

                    # Multimodal input (built-in + button for attachments)
                    with pro.MultimodalInput(
                        placeholder="Message MOSS-VL…",
                        upload_config={
                            "accept": "image/*,video/*",
                            "multiple": False,
                        },
                    ) as chat_input:
                        with ms.Slot("prefix"):
                            with antd.Flex(gap=4, wrap=True):
                                with antd.Button(value=None, type="text") as clear_btn:
                                    with ms.Slot("icon"):
                                        antd.Icon("ClearOutlined")

    # ── EVENT HANDLERS ──
    def _current_history(state_value):
        """Return the active conversation's history, or [] if there is none."""
        return state_value["conversation_contexts"].get(
            state_value["conversation_id"], {}
        ).get("history", [])

    def preprocess(state_value, clear_input=True):
        """Lock the UI while a reply is generated.

        Disables conversation switching, the new/clear/delete controls and
        per-message actions, and puts the input box into its loading state.
        """
        active_id = state_value["conversation_id"]
        updates = {
            conversations: gr.update(
                active_key=active_id,
                items=[
                    {**c, "disabled": c["key"] != active_id}
                    for c in state_value["conversations"]
                ],
            ),
            add_conv_btn: gr.update(disabled=True),
            clear_btn: gr.update(disabled=True),
            conv_delete_item: gr.update(disabled=True),
            chatbot: gr.update(
                value=_current_history(state_value),
                bot_config=bot_config(disabled_actions=["retry", "edit", "delete"]),
                user_config={"actions": []},
            ),
            state: gr.update(value=state_value),
        }
        if clear_input:
            updates[chat_input] = gr.update(value=None, loading=True)
        else:
            updates[chat_input] = gr.update(loading=True)
        return updates

    def postprocess(state_value):
        """Undo ``preprocess``: re-enable every control after generation."""
        return {
            chat_input: gr.update(loading=False),
            conv_delete_item: gr.update(disabled=False),
            clear_btn: gr.update(disabled=False),
            conversations: gr.update(items=state_value["conversations"]),
            add_conv_btn: gr.update(disabled=False),
            chatbot: gr.update(
                value=_current_history(state_value),
                bot_config=bot_config(),
                user_config=user_config(),
            ),
            state: gr.update(value=state_value),
        }

    def add_user_message(input_value, state_value):
        """Append the submitted user turn plus a loading assistant bubble.

        Creates a new conversation (labelled with the first 40 characters of
        the prompt) when none is active.
        """
        input_value = input_value or {}
        text = input_value.get("text") or ""
        files = input_value.get("files") or []
        persistent_files = [_file_path(f) for f in files]

        if not state_value["conversation_id"]:
            conv_id = str(uuid.uuid4())
            state_value["conversation_id"] = conv_id
            state_value["conversations"].append({"label": text[:40] or "New Chat", "key": conv_id})
            state_value["conversation_contexts"][conv_id] = {"history": [], "last_image_path": None}

        ctx = state_value["conversation_contexts"][state_value["conversation_id"]]
        history = ctx["history"]
        history.append({
            "key": str(uuid.uuid4()),
            "role": "user",
            "content": [
                {"type": "file", "content": persistent_files},
                {"type": "text", "content": text},
            ],
        })
        history.append(_loading_bubble())
        return preprocess(state_value, clear_input=True)

    def generate_response(state_value, max_tok, temp, top_p_, rep_pen, v_fps, v_max_frames):
        """Stream the reply for the active conversation into chatbot + state."""
        conv_id = state_value.get("conversation_id", "")
        if not conv_id or conv_id not in state_value.get("conversation_contexts", {}):
            return
        ctx = state_value["conversation_contexts"][conv_id]
        history = ctx["history"]
        last_img = ctx.get("last_image_path")
        for updated_history, new_last_img in run_generate(
            history, False, max_tok, temp, top_p_, rep_pen, last_img, v_fps, v_max_frames
        ):
            ctx["history"] = updated_history
            ctx["last_image_path"] = new_last_img
            yield updated_history, state_value

    def apply_welcome_prompt(e: gr.EventData, input_value):
        """Copy the clicked welcome prompt's description into the input box."""
        if input_value is None:
            input_value = {}
        input_value["text"] = e._data["payload"][0]["value"]["description"]
        return gr.update(value=input_value)

    def new_chat(state_value):
        """Deselect the active conversation so the next submit starts a new one."""
        if not state_value["conversation_id"]:
            return gr.skip()
        state_value["conversation_id"] = ""
        return (
            gr.update(active_key=""),
            gr.update(value=None),
            gr.update(value=state_value),
        )

    def select_conversation(state_value, e: gr.EventData):
        """Switch to the conversation whose key arrives in the event payload."""
        key = e._data["payload"][0]
        if state_value["conversation_id"] == key or key not in state_value["conversation_contexts"]:
            return gr.skip()
        state_value["conversation_id"] = key
        history = state_value["conversation_contexts"][key]["history"]
        return (
            gr.update(active_key=key),
            gr.update(value=history),
            gr.update(value=state_value),
        )

    def conversation_menu(state_value, e: gr.EventData):
        """Handle a conversation context-menu click (only ``delete`` exists)."""
        conv_id = e._data["payload"][0]["key"]
        operation = e._data["payload"][1]["key"]
        if operation != "delete":
            return gr.skip()

        # pop() instead of del: a stale menu click must not raise KeyError.
        state_value["conversation_contexts"].pop(conv_id, None)
        state_value["conversations"] = [
            c for c in state_value["conversations"] if c["key"] != conv_id
        ]
        if state_value["conversation_id"] == conv_id:
            # The open conversation was deleted: also clear the chat pane.
            state_value["conversation_id"] = ""
            return (
                gr.update(items=state_value["conversations"], active_key=""),
                gr.update(value=None),
                gr.update(value=state_value),
            )
        return (
            gr.update(items=state_value["conversations"]),
            gr.skip(),
            gr.update(value=state_value),
        )

    def clear_history(state_value):
        """Drop every message of the active conversation."""
        if not state_value["conversation_id"]:
            return gr.skip()
        ctx = state_value["conversation_contexts"][state_value["conversation_id"]]
        ctx["history"] = []
        # The remembered attachment belongs to the messages just removed.
        ctx["last_image_path"] = None
        return gr.update(value=None), gr.update(value=state_value)

    def prepare_retry(state_value, e: gr.EventData):
        """Cut history back to just before the retried reply and re-arm loading."""
        index = e._data["payload"][0]["index"]
        ctx = state_value["conversation_contexts"][state_value["conversation_id"]]
        ctx["history"] = ctx["history"][:index]
        ctx["history"].append(_loading_bubble())
        return preprocess(state_value, clear_input=False)

    def delete_message(state_value, e: gr.EventData):
        """Remove the message at the index given in the event payload."""
        index = e._data["payload"][0]["index"]
        history = state_value["conversation_contexts"][state_value["conversation_id"]]["history"]
        # Ignore an out-of-range index instead of raising IndexError.
        if 0 <= index < len(history):
            history.pop(index)
        return gr.update(value=state_value)

    def handle_edit(state_value, e: gr.EventData):
        """Apply an edited user message and regenerate from that point."""
        payload = e._data["payload"][0]
        index = payload["index"]
        ctx = state_value["conversation_contexts"][state_value["conversation_id"]]

        # Extract new text from the edited content
        new_content = payload.get("value", "")
        if isinstance(new_content, list):
            # content is a list of parts — extract text
            new_text = " ".join(
                p.get("content", "") or p.get("text", "")
                for p in new_content
                if isinstance(p, dict) and p.get("type") == "text"
            )
        elif isinstance(new_content, str):
            new_text = new_content
        else:
            new_text = ""

        # Update the user message at index with the new text, keep files intact
        original_msg = ctx["history"][index]
        new_parts = []
        for part in original_msg.get("content", []):
            if part.get("type") == "file":
                new_parts.append(part)
            elif part.get("type") == "text":
                new_parts.append({"type": "text", "content": new_text})
        if not any(p.get("type") == "text" for p in new_parts):
            new_parts.append({"type": "text", "content": new_text})
        ctx["history"][index]["content"] = new_parts

        # Drop everything after the edited message (old assistant reply + later turns)
        ctx["history"] = ctx["history"][:index + 1]

        # Append loading assistant bubble
        ctx["history"].append(_loading_bubble())
        return preprocess(state_value, clear_input=False)

    # Wire events
    ui_outputs = [
        chat_input, conv_delete_item, clear_btn, add_conv_btn,
        conversations, chatbot, state,
    ]
    stream_outputs = [chatbot, state]
    # Generation settings are read straight from the sidebar sliders.
    gen_settings = [max_new_tokens, temperature, top_p, repetition_penalty, video_fps, max_frames]

    def _stream_then_restore(first_step):
        """Chain ``generate_response`` then ``postprocess`` after *first_step*.

        Returns the streaming step so callers can make it cancellable.
        """
        stream_step = first_step.then(
            fn=generate_response,
            inputs=[state] + gen_settings,
            outputs=stream_outputs,
        )
        stream_step.then(
            fn=postprocess,
            inputs=[state],
            outputs=ui_outputs,
        )
        return stream_step

    # Submit: add message → stream tokens → restore UI
    submit_step1 = chat_input.submit(
        fn=add_user_message,
        inputs=[chat_input, state],
        outputs=ui_outputs,
    )
    submit_step2 = _stream_then_restore(submit_step1)

    # Edit: update message → stream tokens → restore UI
    edit_step1 = chatbot.edit(
        fn=handle_edit,
        inputs=[state],
        outputs=ui_outputs,
    )
    edit_step2 = _stream_then_restore(edit_step1)

    # Retry: prepare → stream tokens → restore UI
    retry_step1 = chatbot.retry(
        fn=prepare_retry,
        inputs=[state],
        outputs=ui_outputs,
    )
    retry_step2 = _stream_then_restore(retry_step1)

    # Stop button: cancel whichever generation chain is running (submit,
    # edit or retry) and restore the UI.
    chat_input.cancel(
        fn=postprocess,
        inputs=[state],
        outputs=ui_outputs,
        cancels=[
            submit_step1, submit_step2,
            edit_step1, edit_step2,
            retry_step1, retry_step2,
        ],
        queue=False,
    )

    chatbot.welcome_prompt_select(
        fn=apply_welcome_prompt,
        inputs=[chat_input],
        outputs=[chat_input],
    )
    add_conv_btn.click(
        fn=new_chat,
        inputs=[state],
        outputs=[conversations, chatbot, state],
    )
    conversations.active_change(
        fn=select_conversation,
        inputs=[state],
        outputs=[conversations, chatbot, state],
    )
    conversations.menu_click(
        fn=conversation_menu,
        inputs=[state],
        outputs=[conversations, chatbot, state],
    )
    clear_btn.click(
        fn=clear_history,
        inputs=[state],
        outputs=[chatbot, state],
    )
    chatbot.delete(
        fn=delete_message,
        inputs=[state],
        outputs=[state],
    )

    # Lock chatbot height to actual viewport height (avoids iframe 100vh loop)
    demo.load(fn=None, inputs=None, outputs=None, js=_SYNC_HEIGHT_JS)
    # Per-row height equalization for the welcome prompt grid
    demo.load(fn=None, inputs=None, outputs=None, js=_EQUALIZE_ROWS_JS)

demo.queue(default_concurrency_limit=1, max_size=20)

# Mount asserts directory as /assets so logo can be served without going
# through gradio's cache validation (which rejects paths not in temp dir)
from fastapi.staticfiles import StaticFiles

demo.app.mount("/assets", StaticFiles(directory=_ASSETS_DIR), name="assets")

if __name__ == "__main__":
    demo.launch(ssr_mode=False, root_path=_ROOT_PATH)