smartdigitalnetworks's picture
Update app.py
7b4a4fa verified
raw
history blame
27.9 kB
import subprocess
import sys
# Install local mediagallery package at runtime (for HF Spaces)
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "./mediagallery"])
import gradio as gr
import spaces
from gradio_mediagallery import MediaGallery
from PIL import Image
from moviepy.editor import VideoFileClip, AudioFileClip
import os
from openai import OpenAI
import subprocess
from pathlib import Path
import uuid
import tempfile
import shlex
import shutil
# Supported models configuration
MODELS = {
"zai-org/GLM-4.7-Flash": {
"base_url": "https://router.huggingface.co/v1",
"env_key": "HF_TOKEN",
"model_name": "zai-org/GLM-4.7-Flash:novita",
},
"moonshotai/Kimi-K2-Instruct": {
"base_url": "https://router.huggingface.co/v1",
"env_key": "HF_TOKEN",
"model_name": "moonshotai/Kimi-K2-Instruct-0905:groq",
},
}
# Initialize client with first available model
client = OpenAI(
base_url=next(iter(MODELS.values()))["base_url"],
api_key=os.environ[next(iter(MODELS.values()))["env_key"]],
)
allowed_medias = [
".png",
".jpg",
".webp",
".jpeg",
".tiff",
".bmp",
".gif",
".svg",
".mp3",
".wav",
".ogg",
".mp4",
".avi",
".mov",
".mkv",
".flv",
".wmv",
".webm",
".mpg",
".mpeg",
".m4v",
".3gp",
".3g2",
".3gpp",
]
class FileWrapper:
"""Wrapper to provide .name attribute for MediaGallery output tuples."""
def __init__(self, path):
self.name = path if isinstance(path, str) else str(path)
def normalize_files(files):
"""Convert MediaGallery output or gr.File output to list of file-like objects."""
if not files:
return []
result = []
for item in files:
if isinstance(item, tuple):
# MediaGallery returns (path, caption) tuples
path = item[0]
result.append(FileWrapper(path))
elif hasattr(item, "name"):
# gr.File returns objects with .name attribute
result.append(item)
elif isinstance(item, str):
# Direct file path
result.append(FileWrapper(item))
else:
result.append(FileWrapper(str(item)))
return result
def get_files_infos(files):
files = normalize_files(files)
results = []
for file in files:
file_path = Path(file.name)
info = {}
info["size"] = os.path.getsize(file_path)
# Sanitize filename by replacing spaces with underscores
info["name"] = file_path.name.replace(" ", "_")
file_extension = file_path.suffix
if file_extension in (".mp4", ".avi", ".mkv", ".mov"):
info["type"] = "video"
video = VideoFileClip(file.name)
info["duration"] = video.duration
info["dimensions"] = "{}x{}".format(video.size[0], video.size[1])
if video.audio:
info["type"] = "video/audio"
info["audio_channels"] = video.audio.nchannels
video.close()
elif file_extension in (".mp3", ".wav"):
info["type"] = "audio"
audio = AudioFileClip(file.name)
info["duration"] = audio.duration
info["audio_channels"] = audio.nchannels
audio.close()
elif file_extension in (
".png",
".jpg",
".jpeg",
".tiff",
".bmp",
".gif",
".svg",
):
info["type"] = "image"
img = Image.open(file.name)
info["dimensions"] = "{}x{}".format(img.size[0], img.size[1])
results.append(info)
return results
def get_completion(
prompt,
files_info,
top_p,
temperature,
model_choice,
conversation_history=None,
previous_error=None,
previous_command=None,
on_stream=None,
):
# Create table header
files_info_string = "| Type | Name | Dimensions | Duration | Audio Channels |\n"
files_info_string += "|------|------|------------|-----------|--------|\n"
# Add each file as a table row
for file_info in files_info:
dimensions = file_info.get("dimensions", "-")
duration = (
f"{file_info.get('duration', '-')}s" if "duration" in file_info else "-"
)
audio = (
f"{file_info.get('audio_channels', '-')} channels"
if "audio_channels" in file_info
else "-"
)
files_info_string += f"| {file_info['type']} | {file_info['name']} | {dimensions} | {duration} | {audio} |\n"
# Build the user message with optional error feedback
user_content = f"""## AVAILABLE ASSETS
{files_info_string}
## TASK
{prompt}
## REQUIREMENTS
- Output format: MP4 video saved as "output.mp4"
- Generate a single, complete FFmpeg command
- Command must work with the exact filenames listed above
Think briefly about the approach, then output the FFmpeg command in a ```bash code block."""
# Add error feedback if this is a retry
if previous_error and previous_command:
user_content += f"""
IMPORTANT: This is a retry attempt. The previous command failed with the following error:
PREVIOUS COMMAND (FAILED):
{previous_command}
ERROR MESSAGE:
{previous_error}
Please analyze the error and generate a corrected command that addresses the specific issue.
COMMON ERROR FIXES:
- If you see "do not match the corresponding output link" Images have different dimensions, use scale+pad approach
- If you see "Padded dimensions cannot be smaller than input dimensions" Fix pad calculation or use standard resolution (1920x1080 or 1080x1920)
- If you see "Failed to configure input pad" Check scale and pad syntax, ensure proper filter chain
- If you see "Invalid argument" in filters Simplify filter_complex syntax and check parentheses
- If you see "No option name near" with showwaves Use 'x' for size: s=1920x200 (NOT s=1920:200)
FORMAT DETECTION KEYWORDS:
- "vertical", "portrait", "9:16", "TikTok", "Instagram Stories", "phone" Use 1080x1920
- "horizontal", "landscape", "16:9", "YouTube", "TV" Use 1920x1080 (default)
- "square", "1:1", "Instagram post" Use 1080x1080"""
user_content += "\n\nYOUR RESPONSE:"
# Initialize conversation with system message and first user message
if conversation_history is None:
messages = [
{
"role": "system",
"content": """You are an expert FFmpeg engineer. Generate precise, working FFmpeg commands.
## OUTPUT FORMAT
1. Brief analysis (2-3 sentences max)
2. Single FFmpeg command in a ```bash code block
3. Output file must be "output.mp4"
## CORE RULES
- ONE command only, no chaining (no && or ;)
- Use exact filenames from the asset list
- Keep commands as simple as possible
- Always use: -c:v libx264 -pix_fmt yuv420p -movflags +faststart
## SLIDESHOW PATTERN (for multiple images)
When combining images with different dimensions:
```bash
ffmpeg -loop 1 -t 3 -i img1.jpg -loop 1 -t 3 -i img2.jpg -filter_complex "[0]scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1[v0];[1]scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1[v1];[v0][v1]concat=n=2:v=1:a=0" -c:v libx264 -pix_fmt yuv420p output.mp4
```
- Default: 1920x1080, 3 seconds per image
- Vertical/portrait/TikTok: use 1080x1920
- Always scale+pad to normalize dimensions
## AUDIO WAVEFORM
For full-width waveform visualization (waveform width = video width):
```bash
ffmpeg -i audio.mp3 -i bg.png -filter_complex "[0:a]showwaves=s=1920x200:mode=line:colors=white[wave];[1]scale=1920:1080[bg];[bg][wave]overlay=0:(H-h)/2" -c:v libx264 -c:a aac output.mp4
```
CRITICAL:
- showwaves size uses 'x' separator: s=WIDTHxHEIGHT (NOT s=WIDTH:HEIGHT)
- For full-width: set waveform width = video width (e.g., s=1920x200 for 1920px wide video)
- overlay=0:(H-h)/2 positions at x=0 (full width) and centers vertically
## WITH BACKGROUND MUSIC
Add audio to video/slideshow:
```bash
ffmpeg ... -i music.mp3 -map "[vout]" -map N:a -shortest -c:a aac output.mp4
```
Where N is the audio input index.""",
},
{
"role": "user",
"content": user_content,
},
]
else:
# Use existing conversation history
messages = conversation_history[:]
# If there's a previous error, add it as a separate message exchange
if previous_error and previous_command:
# Add the failed command as assistant response
messages.append(
{
"role": "assistant",
"content": f"I'll execute this FFmpeg command:\n\n```bash\n{previous_command}\n```",
}
)
# Add the error as user feedback
messages.append(
{
"role": "user",
"content": f"""The command failed with the following error:
ERROR MESSAGE:
{previous_error}
Please analyze the error and generate a corrected command that addresses the specific issue.
COMMON ERROR FIXES:
- If you see "do not match the corresponding output link" Images have different dimensions, use scale+pad approach
- If you see "Padded dimensions cannot be smaller than input dimensions" Fix pad calculation or use standard resolution (1920x1080 or 1080x1920)
- If you see "Failed to configure input pad" Check scale and pad syntax, ensure proper filter chain
- If you see "Invalid argument" in filters Simplify filter_complex syntax and check parentheses
- If you see "No option name near" with showwaves Use 'x' for size: s=1920x200 (NOT s=1920:200)
FORMAT DETECTION KEYWORDS:
- "vertical", "portrait", "9:16", "TikTok", "Instagram Stories", "phone" Use 1080x1920
- "horizontal", "landscape", "16:9", "YouTube", "TV" Use 1920x1080 (default)
- "square", "1:1", "Instagram post" Use 1080x1080
Please provide a corrected FFmpeg command.""",
}
)
else:
# Add new user request to existing conversation
messages.append(
{
"role": "user",
"content": user_content,
}
)
try:
# Print the complete prompt
print("\n=== COMPLETE PROMPT ===")
for msg in messages:
print(f"\n[{msg['role'].upper()}]:")
print(msg["content"])
print("=====================\n")
if model_choice not in MODELS:
raise ValueError(f"Model {model_choice} is not supported")
model_config = MODELS[model_choice]
client.base_url = model_config["base_url"]
client.api_key = os.environ[model_config["env_key"]]
model = model_config.get("model_name", model_choice)
if on_stream:
# Streaming mode
stream = client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature,
top_p=top_p,
max_tokens=2048,
stream=True,
)
content = ""
for chunk in stream:
if chunk.choices[0].delta.content:
content += chunk.choices[0].delta.content
on_stream(content)
else:
# Non-streaming mode
completion = client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature,
top_p=top_p,
max_tokens=2048,
)
content = completion.choices[0].message.content
print(f"\n=== RAW API RESPONSE ===\n{content}\n========================\n")
# Extract command from code block if present
import re
command = None
# Try multiple code block patterns
code_patterns = [
r"```(?:bash|sh|shell)?\n(.*?)\n```", # Standard code blocks
r"```\n(.*?)\n```", # Plain code blocks
r"`([^`]*ffmpeg[^`]*)`", # Inline code with ffmpeg
]
for pattern in code_patterns:
matches = re.findall(pattern, content, re.DOTALL | re.IGNORECASE)
for match in matches:
if "ffmpeg" in match.lower():
command = match.strip()
break
if command:
break
# If no code block found, try to find ffmpeg lines directly
if not command:
ffmpeg_lines = [
line.strip()
for line in content.split("\n")
if line.strip().lower().startswith("ffmpeg")
]
if ffmpeg_lines:
command = ffmpeg_lines[0]
# Last resort: look for any line containing ffmpeg
if not command:
for line in content.split("\n"):
line = line.strip()
if "ffmpeg" in line.lower() and len(line) > 10:
command = line
break
if not command:
print(f"ERROR: No ffmpeg command found in response")
command = content.replace("\n", " ").strip()
print(f"=== EXTRACTED COMMAND ===\n{command}\n========================\n")
# remove output.mp4 with the actual output file path
command = command.replace("output.mp4", "")
# Add the assistant's response to conversation history
messages.append({"role": "assistant", "content": content})
return command, messages
except Exception as e:
raise Exception("API Error")
@spaces.GPU(duration=120)
def execute_ffmpeg_command(args, temp_dir, output_file_path):
"""Execute FFmpeg command with GPU acceleration"""
final_command = args + ["-y", output_file_path]
print(f"\n=== EXECUTING FFMPEG COMMAND ===\nffmpeg {' '.join(final_command[1:])}\n")
subprocess.run(final_command, cwd=temp_dir)
return output_file_path
def compose_video(
prompt: str,
files: list = None,
top_p: float = 0.95,
temperature: float = 0.1,
model_choice: str = "zai-org/GLM-4.7-Flash",
) -> str:
"""
Compose videos from existing media assets using natural language instructions.
This tool is NOT for AI video generation. Instead, it uses AI to generate FFmpeg
commands that combine, edit, and transform your uploaded images, videos, and audio
files based on natural language descriptions.
Args:
prompt (str): Natural language instructions for video composition (e.g., "Create a slideshow with background music")
files (list, optional): List of media files (images, videos, audio) to use
top_p (float): Top-p sampling parameter for AI model (0.0-1.0, default: 0.95)
temperature (float): Temperature parameter for AI model creativity (0.0-5.0, default: 0.1)
model_choice (str): AI model to use for command generation (default: "zai-org/GLM-4.7-Flash")
Returns:
str: Path to the generated video file
Example:
compose_video("Create a 10-second slideshow from the images with fade transitions", files=[img1, img2, img3])
"""
return update(files or [], prompt, top_p, temperature, model_choice)
def update(
files,
prompt,
top_p=1,
temperature=1,
model_choice="zai-org/GLM-4.7-Flash",
):
if prompt == "":
raise gr.Error("Please enter a prompt.")
# Normalize files from MediaGallery or gr.File format
files = normalize_files(files)
files_info = get_files_infos(files)
# disable this if you're running the app locally or on your own server
for file_info in files_info:
if file_info["type"] == "video":
if file_info["duration"] > 120:
raise gr.Error(
"Please make sure all videos are less than 2 minute long."
)
if file_info["size"] > 100000000:
raise gr.Error("Please make sure all files are less than 100MB in size.")
attempts = 0
command_attempts = []
previous_error = None
previous_command = None
conversation_history = None
# Mutable container for streaming text and yield function
stream_state = {"text": "", "should_yield": False}
def get_last_lines(text, n=5):
"""Return last n lines of text"""
lines = text.strip().split('\n')
return '\n'.join(lines[-n:]) if lines else ""
def on_stream(text):
stream_state["text"] = text
stream_state["should_yield"] = True
while attempts < 2:
print("ATTEMPT", attempts + 1)
try:
# Show generating status
attempt_label = f" (retry {attempts})" if attempts > 0 else ""
yield None, f" Generating command{attempt_label}...\n"
# We need to stream the response - using a thread to allow yielding
import threading
result_holder = {"command": None, "history": None, "error": None}
def run_completion():
try:
cmd, hist = get_completion(
prompt,
files_info,
top_p,
temperature,
model_choice,
conversation_history,
previous_error,
previous_command,
on_stream=on_stream,
)
result_holder["command"] = cmd
result_holder["history"] = hist
except Exception as e:
result_holder["error"] = e
thread = threading.Thread(target=run_completion)
thread.start()
# Yield updates while streaming (show last 5 lines)
while thread.is_alive():
if stream_state["should_yield"]:
yield None, get_last_lines(stream_state["text"])
stream_state["should_yield"] = False
thread.join(timeout=0.1)
# Final yield of complete text (last 5 lines)
if stream_state["text"]:
yield None, get_last_lines(stream_state["text"])
if result_holder["error"]:
raise result_holder["error"]
command_string = result_holder["command"]
conversation_history = result_holder["history"]
print(
f"""///PROMPT {prompt} \n\n/// START OF COMMAND ///:\n\n{command_string}\n\n/// END OF COMMAND ///\n\n"""
)
# split command string into list of arguments
args = shlex.split(command_string)
if args[0] != "ffmpeg":
raise Exception("Command does not start with ffmpeg")
temp_dir = tempfile.mkdtemp()
# copy files to temp dir with sanitized names
for file in files:
file_path = Path(file.name)
sanitized_name = file_path.name.replace(" ", "_")
shutil.copy(file_path, Path(temp_dir) / sanitized_name)
# test if ffmpeg command is valid dry run
ffmpeg_dry_run = subprocess.run(
args + ["-f", "null", "-"],
stderr=subprocess.PIPE,
text=True,
cwd=temp_dir,
)
# Extract command for display
command_for_display = f"ffmpeg {' '.join(args[1:])} -y output.mp4"
if ffmpeg_dry_run.returncode == 0:
print("Command is valid.")
# Add successful command to attempts
command_attempts.append(
{
"command": command_for_display,
"status": " Valid",
"attempt": attempts + 1,
}
)
else:
print("Command is not valid. Error output:")
print(ffmpeg_dry_run.stderr)
# Add failed command to attempts with error
command_attempts.append(
{
"command": command_for_display,
"status": " Invalid",
"error": ffmpeg_dry_run.stderr,
"attempt": attempts + 1,
}
)
# Store error details for next retry
previous_error = ffmpeg_dry_run.stderr
previous_command = command_for_display
yield None, f" Command invalid, retrying..."
raise Exception(
f"FFMPEG command validation failed: {ffmpeg_dry_run.stderr}"
)
output_file_name = f"output_{uuid.uuid4()}.mp4"
output_file_path = str((Path(temp_dir) / output_file_name).resolve())
# Show status before executing
yield None, get_last_lines(stream_state["text"] + "\n\n Executing FFmpeg...")
execute_ffmpeg_command(args, temp_dir, output_file_path)
# Generate command display with all attempts
final_output = get_last_lines(stream_state["text"]) + "\n\n Done!"
yield output_file_path, final_output
return
except Exception as e:
attempts += 1
if attempts >= 2:
print("FROM UPDATE", e)
# Show model output with error (last 5 lines)
error_output = get_last_lines(stream_state["text"]) + f"\n\n Failed: {str(e)}"
yield None, error_output
return
def generate_command_display(command_attempts):
"""Generate a markdown display of all command attempts"""
if not command_attempts:
return "### No commands generated"
display = "### Generated Commands\n\n"
for attempt in command_attempts:
display += f"**Attempt {attempt['attempt']}** {attempt['status']}\n"
display += f"```bash\n{attempt['command']}\n```\n"
if attempt["status"] == " Invalid" and "error" in attempt:
display += f"<details>\n<summary> Error Details</summary>\n\n```\n{attempt['error']}\n```\n</details>\n\n"
else:
display += "\n"
return display
# Create MCP-compatible interface
mcp_interface = gr.Interface(
fn=compose_video,
inputs=[
gr.Textbox(
value="Create a slideshow with background music",
label="Video Composition Guidance",
),
gr.File(file_count="multiple", label="Media Files", file_types=allowed_medias),
gr.Slider(0.0, 1.0, value=0.95, label="Intensity"),
gr.Slider(0.0, 5.0, value=0.1, label="Probability"),
gr.Radio(
choices=list(MODELS.keys()), value=list(MODELS.keys())[0], label="Model"
),
],
outputs=gr.Video(label="Generated Video"),
title="Generative Video Editor MCP Tool",
description="Compose videos from media assets using natural language",
)
with gr.Blocks(css="footer {display:none !important}") as demo:
gr.Markdown(
"""
# 🎬 Generative Video Editor
Generate and execute editing commands that combine, edit and transform uploaded audio, image and video files based on your natural language descriptions.
""",
elem_id="header",
)
with gr.Row():
with gr.Column():
user_files = MediaGallery(
file_types=allowed_medias,
label="Media Assets",
columns=3,
interactive=True,
)
user_prompt = gr.Textbox(
placeholder="eg: Remove the 3 first seconds of the video",
label="Guidance",
lines=3,
)
btn = gr.Button("Generate Edits")
with gr.Accordion("Models", open=False):
model_choice = gr.Radio(
choices=list(MODELS.keys()),
value=list(MODELS.keys())[0],
label="Model",
)
top_p = gr.Slider(
minimum=-0,
maximum=1.0,
value=0.95,
step=0.05,
interactive=True,
visible=False,
label="Intensity",
)
temperature = gr.Slider(
minimum=-0,
maximum=5.0,
value=0.1,
step=0.1,
interactive=True,
visible=False,
label="Probability",
)
with gr.Column():
generated_command = gr.Textbox(
label="Model Output",
lines=5,
max_lines=5,
interactive=False,
)
generated_video = gr.Video(
interactive=False, label="Generated Video", include_audio=True
)
btn.click(
fn=update,
inputs=[user_files, user_prompt, top_p, temperature, model_choice],
outputs=[generated_command, generated_video],
)
with gr.Row():
gr.Examples(
examples=[
[
["./examples/welcomeiamagentfive.mp4"],
"Add a text watermark 'Agent 5' to the upper right corner of the video with white text and semi-transparent background.",
0.95,
0.1,
list(MODELS.keys())[0],
],
[
["./examples/welcomeiamagentfive.mp4"],
"Cut the video to extract only the middle 5 seconds (starting at 00:02 and ending at 00:07).",
0.95,
0.1,
list(MODELS.keys())[0],
],
[
["./examples/welcomeiamagentfive.mp4"],
"Convert the video to black and white (grayscale) while maintaining the original audio..",
0.95,
0.1,
list(MODELS.keys())[0],
],
[
["./examples/ai_talk.wav", "./examples/bg-image.png"],
"Use the image as the background with a full-width waveform visualization for the audio.",
0.95,
0.1,
list(MODELS.keys())[0],
],
[
[
"./examples/cat1.jpeg",
"./examples/cat2.jpeg",
"./examples/cat3.jpeg",
"./examples/cat4.jpeg",
"./examples/cat5.jpeg",
"./examples/cat6.jpeg",
"./examples/heat-wave.mp3",
],
"Create a slide show of the cat images with the audio as background music. Make the video duration match the audio duration.",
0.95,
0.1,
list(MODELS.keys())[0],
],
],
inputs=[user_files, user_prompt, top_p, temperature, model_choice],
outputs=[generated_command, generated_video],
fn=update,
run_on_click=True,
cache_examples=False,
)
with gr.Row():
gr.HTML("<br><br><br><br><br><br><br><br><br><br><br><br><br><br>")
# Launch MCP interface for tool access
mcp_interface.queue(default_concurrency_limit=20)
# Launch main demo
demo.queue(default_concurrency_limit=20)
demo.launch(show_api=True, ssr_mode=False, mcp_server=True)