| |
|
|
| import os, zipfile, tempfile, subprocess, base64 |
| from pathlib import Path |
| from typing import List, Optional, Tuple |
| import gradio as gr |
|
|
def try_load_logo_b64() -> str:
    """Return the contents of ``bifrost_logo.png`` as base64 text.

    The logo is optional decoration, so a missing or unreadable file is not
    an error: an empty string is returned instead and callers can omit the
    image.

    Returns:
        The base64-encoded file bytes as text, or ``""`` when the file
        cannot be opened or read.
    """
    try:
        with open("bifrost_logo.png", "rb") as f:
            data = f.read()
    except OSError:
        # Missing file, permission problem, path is a directory, ...
        return ""
    return base64.b64encode(data).decode("utf-8")


# Loaded once at import time; empty when the logo file is unavailable.
LOGO_B64 = try_load_logo_b64()
|
|
def render_logo_html(px: int = 96) -> str:
    """Build the page header: optional logo, title and tagline, then a rule.

    Args:
        px: Height of the logo image in CSS pixels; width scales with it.

    Returns:
        An HTML fragment. The ``<img>`` tag is left out when ``LOGO_B64``
        is empty.
    """
    if LOGO_B64:
        logo_tag = (
            f'<img src="data:image/png;base64,{LOGO_B64}" '
            f'style="height:{px}px;width:auto;" />'
        )
    else:
        logo_tag = ""
    return f"""
<div style="display:flex;align-items:center;gap:16px;">
{logo_tag}
<div>
<div style="font-size:1.6rem;font-weight:800;">Valknut · Re-encode Images-to-Video</div>
<div style="opacity:0.8;">Valknut binds frames and sound, encoding into one unified whole</div>
</div>
</div>
<hr>
"""
|
|
| def _which(name: str) -> Optional[str]: |
| from shutil import which |
| return which(name) |
|
|
| FFMPEG = _which("ffmpeg") |
| FFPROBE = _which("ffprobe") |
| MISSING_MSG = "" if (FFMPEG and FFPROBE) else ( |
| "⚠️ FFmpeg/FFprobe not found. Add a 'packages.txt' with:\n" |
| "ffmpeg\nlibsm6\nlibxext6\nand restart the Space." |
| ) |
|
|
def render_progress(pct: float, label: str = "") -> str:
    """Render a simple HTML progress bar with a caption underneath.

    Args:
        pct: Completion percentage; values outside 0-100 are clamped.
        label: Text shown before the percentage in the caption.

    Returns:
        An HTML fragment: the bar, then a ``"<label> <pct>%"`` caption.
    """
    # Same clamp order as before: cap at 100 first, then floor at 0.
    capped = min(100.0, pct)
    pct = max(0.0, capped)

    bar_open = '<div style="width:100%;border:1px solid #ddd;border-radius:8px;overflow:hidden;height:18px;">'
    bar_fill = f'<div style="height:100%;width:{pct:.1f}%;background:#3b82f6;"></div></div>'
    caption = f'<div style="font-size:12px;opacity:.8;margin-top:4px;">{label} {pct:.1f}%</div>'
    return "\n".join((bar_open, bar_fill, caption))
|
|
def _upload_path(item) -> Path:
    """Return the filesystem path of an upload given as a path or as an object with ``.name``."""
    if isinstance(item, (str, os.PathLike)):
        return Path(item)
    return Path(item.name)


def prepare_frames_from_upload(
    files: "List[str | gr.File] | None", prefix: str = "enc"
) -> Tuple[Optional[str], Optional[str]]:
    """Stage uploaded frames in a fresh temporary ``frames`` directory.

    Two upload shapes are handled:

    * A single ``.zip``: it is extracted as-is, and the frame prefix is
      taken from the first top-level ``.jpg``/``.png`` found, i.e. everything
      before the last underscore of its stem (``shot_a_00001.png`` gives
      ``shot_a``). If no such image is found, ``prefix`` is used.
    * Individual images: every ``.jpg``/``.jpeg``/``.png`` is copied, in the
      given order, to ``<prefix>_<NNNNN><ext>`` with a 1-based, gap-free
      counter. Other file types and paths that do not exist are skipped.

    Args:
        files: Uploaded items, each either a path (``str``/``os.PathLike``)
            or an object whose ``.name`` attribute is the path.
        prefix: Frame name prefix for individually uploaded images, and the
            fallback prefix for ZIP uploads.

    Returns:
        ``(frames_dir, prefix)``, or ``(None, None)`` when nothing was
        uploaded.
    """
    import shutil  # function-scope import, as done elsewhere in this file

    if not files:
        return None, None

    work = Path(tempfile.mkdtemp(prefix="enc_"))
    frames_dir = work / "frames"
    frames_dir.mkdir(parents=True, exist_ok=True)

    first = _upload_path(files[0])
    if len(files) == 1 and first.suffix.lower() == ".zip":
        with zipfile.ZipFile(first, "r") as zf:
            zf.extractall(frames_dir)
        imgs = sorted(frames_dir.glob("*.jpg")) + sorted(frames_dir.glob("*.png"))
        # Split on the LAST underscore so prefixes that themselves contain
        # underscores survive intact.
        detected_prefix = imgs[0].stem.rsplit("_", 1)[0] if imgs else None
        return str(frames_dir), detected_prefix or prefix

    counter = 1
    for item in files:
        src = _upload_path(item)
        ext = src.suffix.lower()
        if ext not in (".jpg", ".jpeg", ".png"):
            continue
        if not src.is_file():
            # Skip without consuming a number, so the sequence has no gaps.
            continue
        if ext == ".jpeg":
            # build_ffmpeg_encode only globs *.jpg / *.png.
            ext = ".jpg"
        # Copy rather than move: works across filesystems and leaves the
        # uploaded file in place for a later run.
        shutil.copyfile(src, frames_dir / f"{prefix}_{counter:05d}{ext}")
        counter += 1
    return str(frames_dir), prefix
|
|
| def build_ffmpeg_encode(frames_dir: str, prefix: str, fps: float, fmt: str, |
| include_audio: bool, orig_video: str | None) -> list[str]: |
| jpgs = sorted(Path(frames_dir).glob(f"{prefix}_*.jpg")) |
| pngs = sorted(Path(frames_dir).glob(f"{prefix}_*.png")) |
| imgs = jpgs if jpgs else pngs |
| if not imgs: |
| return [] |
|
|
| first_frame = imgs[0].name |
| pattern = str(imgs[0].with_name(f"{prefix}_%05d{imgs[0].suffix}")) |
| start_num = int(Path(first_frame).stem.split("_")[-1]) |
|
|
| args = [FFMPEG, "-y", "-start_number", str(start_num), |
| "-framerate", f"{fps:.6f}", "-i", pattern] |
|
|
| if include_audio and orig_video: |
| args += ["-i", orig_video, "-map", "0:v:0", "-map", "1:a:0", "-shortest"] |
|
|
| if fmt == "h265": |
| vcodec = ["-c:v", "libx265"] |
| elif fmt == "vp9": |
| vcodec = ["-c:v", "libvpx-vp9"] |
| else: |
| vcodec = ["-c:v", "libx264"] |
|
|
| out_name = "output.mp4" if fmt in ("h264", "h265") else "output.webm" |
| return args + vcodec + ["-pix_fmt", "yuv420p", "-crf", "18", out_name] |
|
|
|
|
def step3_encode(
    uploaded_frames: List[gr.File] | None,
    uploaded_audio_video: gr.File | None,
    fps: float,
    fmt: str,
    include_audio: bool,
    prog_html: str,
):
    """Encode the uploaded frames into a video, streaming progress updates.

    This is a generator used as a Gradio event handler. Every yielded item
    is a ``(video_path_or_None, status_text, progress_html)`` tuple; the
    video path is only set on the final, successful item.

    Args:
        uploaded_frames: Frames upload (a ZIP or individual images), passed
            on to ``prepare_frames_from_upload``.
        uploaded_audio_video: Optional upload providing the audio track;
            either a path or an object whose ``.name`` is the path.
        fps: Input frame rate; a falsy value falls back to 30.
        fmt: Video format key understood by ``build_ffmpeg_encode``.
        include_audio: Whether to mux audio from ``uploaded_audio_video``.
        prog_html: Current progress HTML, echoed back unchanged when the
            run stops before encoding starts.
    """
    from collections import deque  # function-scope import, as elsewhere in this file

    if not (FFMPEG and FFPROBE):
        yield None, "FFmpeg/FFprobe missing. See note below.", prog_html
        return
    frames_dir, prefix = prepare_frames_from_upload(uploaded_frames, "enc")
    if not frames_dir or not prefix:
        yield None, "No frames provided. Upload a ZIP or images.", prog_html
        return

    orig_path = None
    if uploaded_audio_video:
        if isinstance(uploaded_audio_video, (str, os.PathLike)):
            orig_path = os.fspath(uploaded_audio_video)
        else:
            orig_path = uploaded_audio_video.name

    cmd = build_ffmpeg_encode(frames_dir, prefix, float(fps or 30.0), fmt, include_audio, orig_path)
    if not cmd:
        # build_ffmpeg_encode returns [] when it finds no frames to encode.
        yield None, "No JPG/PNG frames found in the upload.", prog_html
        return

    # Only one image type is encoded (JPGs when present, otherwise PNGs),
    # so count the same set for the progress denominator.
    frames_root = Path(frames_dir)
    total_frames = (
        sum(1 for _ in frames_root.glob(f"{prefix}_*.jpg"))
        or sum(1 for _ in frames_root.glob(f"{prefix}_*.png"))
    )

    # "-progress pipe:2" makes ffmpeg write key=value progress lines to stderr.
    cmd = [cmd[0], "-progress", "pipe:2"] + cmd[1:]
    try:
        proc = subprocess.Popen(
            cmd, stderr=subprocess.PIPE, stdout=subprocess.DEVNULL,
            text=True, errors="replace", bufsize=1, cwd=frames_dir
        )
    except OSError as exc:
        yield None, f"Encoding failed.\n\n{exc}", render_progress(0.0, "Failed")
        return

    current = 0
    errors = []
    tail = deque(maxlen=20)  # last log lines, used when no explicit error line was seen

    try:
        for raw in proc.stderr:
            line = raw.strip()
            if not line:
                continue
            key, sep, value = line.partition("=")

            if sep and key == "frame":
                # Matches both "frame=12" and "frame=   12 fps=25 ...".
                fields = value.split()
                if fields and fields[0].isdigit():
                    current = int(fields[0])
                if total_frames > 0:
                    pct = min(100.0, (current / total_frames) * 100.0)
                    yield None, f"Encoding… {current}/{total_frames} frames", render_progress(pct, f"Encoding {pct:.0f}%")
                else:
                    yield None, "Encoding…", render_progress(50.0, "Encoding…")
                continue

            if "Error" in line or "No such file" in line:
                errors.append(line)
            # Keep ordinary log lines, but not bare "key=value" progress fields.
            if not (sep and key.replace("_", "").isalnum()):
                tail.append(line)

        ret = proc.wait()
    finally:
        # Reached with ffmpeg still running only when the generator is
        # closed early or an exception escapes; do not leave it behind.
        if proc.poll() is None:
            proc.kill()
            proc.wait()
        proc.stderr.close()

    out_file = Path(frames_dir) / ("output.mp4" if fmt in ("h264", "h265") else "output.webm")

    if ret != 0 or not out_file.exists():
        err = "\n".join(errors) or "\n".join(tail) or "Unknown ffmpeg error."
        yield None, f"Encoding failed.\n\n{err}", render_progress(0.0, "Failed")
        return

    yield str(out_file), f"Video created: {out_file.name}", render_progress(100.0, "Encoding complete")
|
|
def build_ui():
    """Assemble the Gradio Blocks interface and wire up the encode button.

    Returns:
        The ``gr.Blocks`` app, not yet launched.
    """
    audio_source_types = [".mp4", ".mov", ".mkv", ".webm", ".mp3", ".wav"]

    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        # Header and short description.
        gr.HTML(render_logo_html(88))
        gr.Markdown("Re-encode a folder/ZIP of frames into a video. Optionally mix audio from a video/audio file.")

        # Inputs: frames, plus an optional audio source.
        frames_input = gr.Files(
            label="Upload frames (ZIP or images)",
            type="filepath",
        )
        audio_input = gr.File(
            label="Optional: video/audio for audio track",
            file_types=audio_source_types,
            type="filepath",
        )

        with gr.Row():
            fps_input = gr.Number(value=30.0, label="FPS")
            format_choice = gr.Dropdown(["h264", "h265", "vp9"], value="h264", label="Video format")
            audio_toggle = gr.Checkbox(True, label="Include audio if available")

        # Action button and outputs.
        encode_button = gr.Button("Create Video", variant="primary")
        progress_view = gr.HTML(render_progress(0.0, "Idle"))
        video_out = gr.Video(label="Output video")
        status_md = gr.Markdown("")

        # The progress HTML is passed in as well as out, so the handler can
        # echo it back unchanged.
        handler_inputs = [frames_input, audio_input, fps_input, format_choice, audio_toggle, progress_view]
        handler_outputs = [video_out, status_md, progress_view]
        encode_button.click(
            step3_encode,
            inputs=handler_inputs,
            outputs=handler_outputs,
        )

        # Show the warning only when ffmpeg/ffprobe could not be located.
        if MISSING_MSG:
            gr.Markdown(f"<span style='color:#b45309'>{MISSING_MSG}</span>")

    return demo
|
|
if __name__ == "__main__":
    # Build the interface, enable queuing, and start the server.
    app = build_ui()
    app.queue().launch()
|
|