import subprocess import sys # Install local mediagallery package at runtime (for HF Spaces) subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "./mediagallery"]) import gradio as gr import spaces from gradio_mediagallery import MediaGallery from PIL import Image from moviepy.editor import VideoFileClip, AudioFileClip import os from openai import OpenAI import subprocess from pathlib import Path import uuid import tempfile import shlex import shutil # Supported models configuration MODELS = { "zai-org/GLM-4.7-Flash": { "base_url": "https://router.huggingface.co/v1", "env_key": "HF_TOKEN", "model_name": "zai-org/GLM-4.7-Flash:novita", }, } # Initialize client with first available model client = OpenAI( base_url=next(iter(MODELS.values()))["base_url"], api_key=os.environ[next(iter(MODELS.values()))["env_key"]], ) allowed_medias = [ ".png", ".jpg", ".webp", ".jpeg", ".tiff", ".bmp", ".gif", ".svg", ".mp3", ".wav", ".ogg", ".mp4", ".avi", ".mov", ".mkv", ".flv", ".wmv", ".webm", ".mpg", ".mpeg", ".m4v", ".3gp", ".3g2", ".3gpp", ] class FileWrapper: """Wrapper to provide .name attribute for MediaGallery output tuples.""" def __init__(self, path): self.name = path if isinstance(path, str) else str(path) def normalize_files(files): """Convert MediaGallery output or gr.File output to list of file-like objects.""" if not files: return [] result = [] for item in files: if isinstance(item, tuple): # MediaGallery returns (path, caption) tuples path = item[0] result.append(FileWrapper(path)) elif hasattr(item, "name"): # gr.File returns objects with .name attribute result.append(item) elif isinstance(item, str): # Direct file path result.append(FileWrapper(item)) else: result.append(FileWrapper(str(item))) return result def get_files_infos(files): files = normalize_files(files) results = [] for file in files: file_path = Path(file.name) info = {} info["size"] = os.path.getsize(file_path) # Sanitize filename by replacing spaces with underscores info["name"] = file_path.name.replace(" ", "_") file_extension = file_path.suffix if file_extension in (".mp4", ".avi", ".mkv", ".mov"): info["type"] = "video" video = VideoFileClip(file.name) info["duration"] = video.duration info["dimensions"] = "{}x{}".format(video.size[0], video.size[1]) if video.audio: info["type"] = "video/audio" info["audio_channels"] = video.audio.nchannels video.close() elif file_extension in (".mp3", ".wav"): info["type"] = "audio" audio = AudioFileClip(file.name) info["duration"] = audio.duration info["audio_channels"] = audio.nchannels audio.close() elif file_extension in ( ".png", ".jpg", ".jpeg", ".tiff", ".bmp", ".gif", ".svg", ): info["type"] = "image" img = Image.open(file.name) info["dimensions"] = "{}x{}".format(img.size[0], img.size[1]) results.append(info) return results def get_completion( prompt, files_info, top_p, temperature, model_choice, conversation_history=None, previous_error=None, previous_command=None, on_stream=None, ): # Create table header files_info_string = "| Type | Name | Dimensions | Duration | Audio Channels |\n" files_info_string += "|------|------|------------|-----------|--------|\n" # Add each file as a table row for file_info in files_info: dimensions = file_info.get("dimensions", "-") duration = ( f"{file_info.get('duration', '-')}s" if "duration" in file_info else "-" ) audio = ( f"{file_info.get('audio_channels', '-')} channels" if "audio_channels" in file_info else "-" ) files_info_string += f"| {file_info['type']} | {file_info['name']} | {dimensions} | {duration} | {audio} |\n" # Build the user message with optional error feedback user_content = f"""## AVAILABLE ASSETS {files_info_string} ## TASK {prompt} ## REQUIREMENTS - Output format: MP4 video saved as "output.mp4" - Generate a single, complete FFmpeg command - Command must work with the exact filenames listed above Think briefly about the approach, then output the FFmpeg command in a ```bash code block.""" # Add error feedback if this is a retry if previous_error and previous_command: user_content += f""" IMPORTANT: This is a retry attempt. The previous command failed with the following error: PREVIOUS COMMAND (FAILED): {previous_command} ERROR MESSAGE: {previous_error} Please analyze the error and generate a corrected command that addresses the specific issue. COMMON ERROR FIXES: - If you see "do not match the corresponding output link" → Images have different dimensions, use scale+pad approach - If you see "Padded dimensions cannot be smaller than input dimensions" → Fix pad calculation or use standard resolution (1920x1080 or 1080x1920) - If you see "Failed to configure input pad" → Check scale and pad syntax, ensure proper filter chain - If you see "Invalid argument" in filters → Simplify filter_complex syntax and check parentheses - If you see "No option name near" with showwaves → Use 'x' for size: s=1920x200 (NOT s=1920:200) FORMAT DETECTION KEYWORDS: - "vertical", "portrait", "9:16", "TikTok", "Instagram Stories", "phone" → Use 1080x1920 - "horizontal", "landscape", "16:9", "YouTube", "TV" → Use 1920x1080 (default) - "square", "1:1", "Instagram post" → Use 1080x1080""" user_content += "\n\nYOUR RESPONSE:" # Initialize conversation with system message and first user message if conversation_history is None: messages = [ { "role": "system", "content": """You are an expert FFmpeg engineer. Generate precise, working FFmpeg commands. ## OUTPUT FORMAT 1. Brief analysis (2-3 sentences max) 2. Single FFmpeg command in a ```bash code block 3. Output file must be "output.mp4" ## CORE RULES - ONE command only, no chaining (no && or ;) - Use exact filenames from the asset list - Keep commands as simple as possible - Always use: -c:v libx264 -pix_fmt yuv420p -movflags +faststart ## SLIDESHOW PATTERN (for multiple images) When combining images with different dimensions: ```bash ffmpeg -loop 1 -t 3 -i img1.jpg -loop 1 -t 3 -i img2.jpg -filter_complex "[0]scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1[v0];[1]scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1[v1];[v0][v1]concat=n=2:v=1:a=0" -c:v libx264 -pix_fmt yuv420p output.mp4 ``` - Default: 1920x1080, 3 seconds per image - Vertical/portrait/TikTok: use 1080x1920 - Always scale+pad to normalize dimensions ## AUDIO WAVEFORM For full-width waveform visualization (waveform width = video width): ```bash ffmpeg -i audio.mp3 -i bg.png -filter_complex "[0:a]showwaves=s=1920x200:mode=line:colors=white[wave];[1]scale=1920:1080[bg];[bg][wave]overlay=0:(H-h)/2" -c:v libx264 -c:a aac output.mp4 ``` CRITICAL: - showwaves size uses 'x' separator: s=WIDTHxHEIGHT (NOT s=WIDTH:HEIGHT) - For full-width: set waveform width = video width (e.g., s=1920x200 for 1920px wide video) - overlay=0:(H-h)/2 positions at x=0 (full width) and centers vertically ## WITH BACKGROUND MUSIC Add audio to video/slideshow: ```bash ffmpeg ... -i music.mp3 -map "[vout]" -map N:a -shortest -c:a aac output.mp4 ``` Where N is the audio input index.""", }, { "role": "user", "content": user_content, }, ] else: # Use existing conversation history messages = conversation_history[:] # If there's a previous error, add it as a separate message exchange if previous_error and previous_command: # Add the failed command as assistant response messages.append( { "role": "assistant", "content": f"I'll execute this FFmpeg command:\n\n```bash\n{previous_command}\n```", } ) # Add the error as user feedback messages.append( { "role": "user", "content": f"""The command failed with the following error: ERROR MESSAGE: {previous_error} Please analyze the error and generate a corrected command that addresses the specific issue. COMMON ERROR FIXES: - If you see "do not match the corresponding output link" → Images have different dimensions, use scale+pad approach - If you see "Padded dimensions cannot be smaller than input dimensions" → Fix pad calculation or use standard resolution (1920x1080 or 1080x1920) - If you see "Failed to configure input pad" → Check scale and pad syntax, ensure proper filter chain - If you see "Invalid argument" in filters → Simplify filter_complex syntax and check parentheses - If you see "No option name near" with showwaves → Use 'x' for size: s=1920x200 (NOT s=1920:200) FORMAT DETECTION KEYWORDS: - "vertical", "portrait", "9:16", "TikTok", "Instagram Stories", "phone" → Use 1080x1920 - "horizontal", "landscape", "16:9", "YouTube", "TV" → Use 1920x1080 (default) - "square", "1:1", "Instagram post" → Use 1080x1080 Please provide a corrected FFmpeg command.""", } ) else: # Add new user request to existing conversation messages.append( { "role": "user", "content": user_content, } ) try: # Print the complete prompt print("\n=== COMPLETE PROMPT ===") for msg in messages: print(f"\n[{msg['role'].upper()}]:") print(msg["content"]) print("=====================\n") if model_choice not in MODELS: raise ValueError(f"Model {model_choice} is not supported") model_config = MODELS[model_choice] client.base_url = model_config["base_url"] client.api_key = os.environ[model_config["env_key"]] model = model_config.get("model_name", model_choice) if on_stream: # Streaming mode stream = client.chat.completions.create( model=model, messages=messages, temperature=temperature, top_p=top_p, max_tokens=2048, stream=True, ) content = "" for chunk in stream: if chunk.choices[0].delta.content: content += chunk.choices[0].delta.content on_stream(content) else: # Non-streaming mode completion = client.chat.completions.create( model=model, messages=messages, temperature=temperature, top_p=top_p, max_tokens=2048, ) content = completion.choices[0].message.content print(f"\n=== RAW API RESPONSE ===\n{content}\n========================\n") # Extract command from code block if present import re command = None # Try multiple code block patterns code_patterns = [ r"```(?:bash|sh|shell)?\n(.*?)\n```", # Standard code blocks r"```\n(.*?)\n```", # Plain code blocks r"`([^`]*ffmpeg[^`]*)`", # Inline code with ffmpeg ] for pattern in code_patterns: matches = re.findall(pattern, content, re.DOTALL | re.IGNORECASE) for match in matches: if "ffmpeg" in match.lower(): command = match.strip() break if command: break # If no code block found, try to find ffmpeg lines directly if not command: ffmpeg_lines = [ line.strip() for line in content.split("\n") if line.strip().lower().startswith("ffmpeg") ] if ffmpeg_lines: command = ffmpeg_lines[0] # Last resort: look for any line containing ffmpeg if not command: for line in content.split("\n"): line = line.strip() if "ffmpeg" in line.lower() and len(line) > 10: command = line break if not command: print(f"ERROR: No ffmpeg command found in response") command = content.replace("\n", " ").strip() print(f"=== EXTRACTED COMMAND ===\n{command}\n========================\n") # remove output.mp4 with the actual output file path command = command.replace("output.mp4", "") # Add the assistant's response to conversation history messages.append({"role": "assistant", "content": content}) return command, messages except Exception as e: raise Exception("API Error") @spaces.GPU(duration=120) def execute_ffmpeg_command(args, temp_dir, output_file_path): """Execute FFmpeg command with GPU acceleration""" final_command = args + ["-y", output_file_path] print(f"\n=== EXECUTING FFMPEG COMMAND ===\nffmpeg {' '.join(final_command[1:])}\n") subprocess.run(final_command, cwd=temp_dir) return output_file_path def compose_video( prompt: str, files: list = None, top_p: float = 0.95, temperature: float = 0.1, model_choice: str = "zai-org/GLM-4.7-Flash", ) -> str: """ Compose videos from existing media assets using natural language instructions. This tool is NOT for AI video generation. Instead, it uses AI to generate FFmpeg commands that combine, edit, and transform your uploaded images, videos, and audio files based on natural language descriptions. Args: prompt (str): Natural language instructions for video composition (e.g., "Create a slideshow with background music") files (list, optional): List of media files (images, videos, audio) to use top_p (float): Top-p sampling parameter for AI model (0.0-1.0, default: 0.95) temperature (float): Temperature parameter for AI model creativity (0.0-5.0, default: 0.1) model_choice (str): AI model to use for command generation (default: "zai-org/GLM-4.7-Flash") Returns: str: Path to the generated video file Example: compose_video("Create a 10-second slideshow from the images with fade transitions", files=[img1, img2, img3]) """ return update(files or [], prompt, top_p, temperature, model_choice) def update( files, prompt, top_p=1, temperature=1, model_choice="zai-org/GLM-4.7-Flash", ): if prompt == "": raise gr.Error("Please enter a prompt.") # Normalize files from MediaGallery or gr.File format files = normalize_files(files) files_info = get_files_infos(files) # disable this if you're running the app locally or on your own server for file_info in files_info: if file_info["type"] == "video": if file_info["duration"] > 120: raise gr.Error( "Please make sure all videos are less than 2 minute long." ) if file_info["size"] > 100000000: raise gr.Error("Please make sure all files are less than 100MB in size.") attempts = 0 command_attempts = [] previous_error = None previous_command = None conversation_history = None # Mutable container for streaming text and yield function stream_state = {"text": "", "should_yield": False} def get_last_lines(text, n=5): """Return last n lines of text""" lines = text.strip().split('\n') return '\n'.join(lines[-n:]) if lines else "" def on_stream(text): stream_state["text"] = text stream_state["should_yield"] = True while attempts < 2: print("ATTEMPT", attempts + 1) try: # Show generating status attempt_label = f" (retry {attempts})" if attempts > 0 else "" yield None, f"🤖 Generating command{attempt_label}...\n" # We need to stream the response - using a thread to allow yielding import threading result_holder = {"command": None, "history": None, "error": None} def run_completion(): try: cmd, hist = get_completion( prompt, files_info, top_p, temperature, model_choice, conversation_history, previous_error, previous_command, on_stream=on_stream, ) result_holder["command"] = cmd result_holder["history"] = hist except Exception as e: result_holder["error"] = e thread = threading.Thread(target=run_completion) thread.start() # Yield updates while streaming (show last 5 lines) while thread.is_alive(): if stream_state["should_yield"]: yield None, get_last_lines(stream_state["text"]) stream_state["should_yield"] = False thread.join(timeout=0.1) # Final yield of complete text (last 5 lines) if stream_state["text"]: yield None, get_last_lines(stream_state["text"]) if result_holder["error"]: raise result_holder["error"] command_string = result_holder["command"] conversation_history = result_holder["history"] print( f"""///PROMPT {prompt} \n\n/// START OF COMMAND ///:\n\n{command_string}\n\n/// END OF COMMAND ///\n\n""" ) # split command string into list of arguments args = shlex.split(command_string) if args[0] != "ffmpeg": raise Exception("Command does not start with ffmpeg") temp_dir = tempfile.mkdtemp() # copy files to temp dir with sanitized names for file in files: file_path = Path(file.name) sanitized_name = file_path.name.replace(" ", "_") shutil.copy(file_path, Path(temp_dir) / sanitized_name) # test if ffmpeg command is valid dry run ffmpeg_dry_run = subprocess.run( args + ["-f", "null", "-"], stderr=subprocess.PIPE, text=True, cwd=temp_dir, ) # Extract command for display command_for_display = f"ffmpeg {' '.join(args[1:])} -y output.mp4" if ffmpeg_dry_run.returncode == 0: print("Command is valid.") # Add successful command to attempts command_attempts.append( { "command": command_for_display, "status": "✅ Valid", "attempt": attempts + 1, } ) else: print("Command is not valid. Error output:") print(ffmpeg_dry_run.stderr) # Add failed command to attempts with error command_attempts.append( { "command": command_for_display, "status": "❌ Invalid", "error": ffmpeg_dry_run.stderr, "attempt": attempts + 1, } ) # Store error details for next retry previous_error = ffmpeg_dry_run.stderr previous_command = command_for_display yield None, f"❌ Command invalid, retrying..." raise Exception( f"FFMPEG command validation failed: {ffmpeg_dry_run.stderr}" ) output_file_name = f"output_{uuid.uuid4()}.mp4" output_file_path = str((Path(temp_dir) / output_file_name).resolve()) # Show status before executing yield None, get_last_lines(stream_state["text"] + "\n\n⚙️ Executing FFmpeg...") execute_ffmpeg_command(args, temp_dir, output_file_path) # Generate command display with all attempts final_output = get_last_lines(stream_state["text"]) + "\n\n✅ Done!" yield output_file_path, final_output return except Exception as e: attempts += 1 if attempts >= 2: print("FROM UPDATE", e) # Show model output with error (last 5 lines) error_output = get_last_lines(stream_state["text"]) + f"\n\n❌ Failed: {str(e)}" yield None, error_output return def generate_command_display(command_attempts): """Generate a markdown display of all command attempts""" if not command_attempts: return "### No commands generated" display = "### Generated Commands\n\n" for attempt in command_attempts: display += f"**Attempt {attempt['attempt']}** {attempt['status']}\n" display += f"```bash\n{attempt['command']}\n```\n" if attempt["status"] == "❌ Invalid" and "error" in attempt: display += f"
\n🔍 Error Details\n\n```\n{attempt['error']}\n```\n
\n\n" else: display += "\n" return display # Create MCP-compatible interface mcp_interface = gr.Interface( fn=compose_video, inputs=[ gr.Textbox( value="Create a slideshow with background music", label="Video Composition Instructions", ), gr.File(file_count="multiple", label="Media Files", file_types=allowed_medias), gr.Slider(0.0, 1.0, value=0.95, label="Top-p"), gr.Slider(0.0, 5.0, value=0.1, label="Temperature"), gr.Radio( choices=list(MODELS.keys()), value=list(MODELS.keys())[0], label="Model" ), ], outputs=gr.Video(label="Generated Video"), title="AI Video Composer MCP Tool", description="Compose videos from media assets using natural language", ) with gr.Blocks(css="footer {display:none !important}") as demo: gr.Markdown( """ # 🎬 Generative Video Editor Generate and execute editing commands that combine, edit and transform uploaded audio, image and video files based on your natural language descriptions. """, elem_id="header", ) with gr.Row(): with gr.Column(): user_prompt = gr.Textbox( placeholder="eg: Remove the 3 first seconds of the video", label="Instructions", lines=3, ) user_files = MediaGallery( file_types=allowed_medias, label="Media Assets", columns=3, interactive=True, ) btn = gr.Button("Run") with gr.Accordion("Parameters", open=False): model_choice = gr.Radio( choices=list(MODELS.keys()), value=list(MODELS.keys())[0], label="Model", ) top_p = gr.Slider( minimum=-0, maximum=1.0, value=0.95, step=0.05, interactive=True, label="Top-p (nucleus sampling)", ) temperature = gr.Slider( minimum=-0, maximum=5.0, value=0.1, step=0.1, interactive=True, label="Temperature", ) with gr.Column(): generated_command = gr.Textbox( label="Model Output", lines=5, max_lines=5, interactive=False, ) generated_video = gr.Video( interactive=False, label="Generated Video", include_audio=True ) btn.click( fn=update, inputs=[user_files, user_prompt, top_p, temperature, model_choice], outputs=[generated_video, generated_command], ) with gr.Row(): gr.Examples( examples=[ [ ["./examples/five.mp4"], "Add a text watermark 'Agent 5' to the upper right corner of the video with white text and semi-transparent background.", 0.95, 0.1, list(MODELS.keys())[0], ], [ ["./examples/five.mp4"], "Cut the video to extract only the middle 5 seconds (starting at 00:02 and ending at 00:07).", 0.95, 0.1, list(MODELS.keys())[0], ], [ ["./examples/five.mp4"], "Convert the video to black and white (grayscale) while maintaining the original audio..", 0.95, 0.1, list(MODELS.keys())[0], ], [ ["./examples/ai_talk.wav", "./examples/bg-image.png"], "Use the image as the background with a full-width waveform visualization for the audio. Make the waveform 250 pixels tall.", 0.95, 0.1, list(MODELS.keys())[0], ], [ [ "./examples/cat1.jpeg", "./examples/cat2.jpeg", "./examples/cat3.jpeg", "./examples/cat4.jpeg", "./examples/cat5.jpeg", "./examples/cat6.jpeg", "./examples/heat-wave.mp3", ], "Create a slide show of the cat images with the audio as background music. Make the video duration match the audio duration.", 0.95, 0.1, list(MODELS.keys())[0], ], ], inputs=[user_files, user_prompt, top_p, temperature, model_choice], outputs=[generated_video, generated_command], fn=update, run_on_click=False, cache_examples=False, ) with gr.Row(): gr.HTML("













") # Launch MCP interface for tool access mcp_interface.queue(default_concurrency_limit=200) # Launch main demo demo.queue(default_concurrency_limit=200) demo.launch(show_api=False, ssr_mode=False, mcp_server=True)