| |
| """ |
| Modern UI for Video Background Replacer (PRO) |
| - File-based progress polling to keep session alive |
| - Real-time progress updates during long-running operations |
| """ |
| import streamlit as st |
| import os |
| from pathlib import Path |
| from PIL import Image |
| import numpy as np |
| import logging |
| import time |
| import threading |
|
|
| from utils.progress_tracker import get_progress |
|
|
| logger = logging.getLogger("Advanced Video Background Replacer") |
| UI_BUILD = "ui-2025-10-04-17-00Z" |
|
|
| def tail_file(path: str, lines: int = 400) -> str: |
| if not os.path.exists(path): |
| return "(log file not found)" |
| try: |
| with open(path, "r", encoding="utf-8", errors="replace") as f: |
| content = f.readlines() |
| return "".join(content[-lines:]) |
| except Exception as e: |
| return f"(failed to read log: {e})" |
|
|
| def read_file_bytes(path: str) -> bytes: |
| try: |
| if not os.path.exists(path): |
| return b"" |
| with open(path, "rb") as f: |
| return f.read() |
| except Exception: |
| return b"" |
|
|
| def _render_background_settings(): |
| stock_images = { |
| "Sunset Beach": "stock_images/sunset_beach.jpg", |
| "Urban Office": "stock_images/urban_office.jpg", |
| "Studio Lighting": "stock_images/studio_light.jpg", |
| } |
| st.header("2. Background Settings") |
| bg_type = st.radio( |
| "Select Background Type:", |
| ["Image", "Color", "Stock", "AI Prompt"], |
| horizontal=True, |
| key="bg_type_radio" |
| ) |
| background = None |
|
|
| if bg_type == "Image": |
| bg_image = st.file_uploader( |
| "Upload Background Image", |
| type=["jpg", "png", "jpeg"], |
| key="bg_image_uploader" |
| ) |
| if bg_image is not None: |
| bg_image.seek(0) |
| background = Image.open(bg_image).convert("RGB") |
| st.image(background, caption="Selected Background", use_column_width=True) |
|
|
| elif bg_type == "Color": |
| selected_color = st.color_picker( |
| "Choose Background Color", |
| st.session_state.get('bg_color', "#00FF00"), |
| key="color_picker" |
| ) |
| background = selected_color |
| color_preview = np.full( |
| (100, 100, 3), |
| tuple(int(selected_color.lstrip('#')[i:i+2], 16) for i in (0, 2, 4)), |
| dtype=np.uint8 |
| ) |
| st.image(color_preview, caption="Selected Color", width=200) |
|
|
| elif bg_type == "Stock": |
| stock_choice = st.selectbox( |
| "Choose a professional stock background:", |
| list(stock_images.keys()), |
| key="stock_image_select" |
| ) |
| stock_img_path = stock_images[stock_choice] |
| try: |
| background = Image.open(stock_img_path).convert("RGB") |
| st.image(background, caption=stock_choice, use_column_width=True) |
| except FileNotFoundError: |
| st.warning(f"Stock image not found: {stock_img_path}. Upload your own image or choose another.") |
| background = None |
|
|
| elif bg_type == "AI Prompt": |
| prompt = st.text_input("Describe the background to generate (AI):", key="ai_bg_prompt") |
| ai_ready = False |
| if st.button("Generate Background", key="gen_bg_btn") and prompt: |
| background = Image.new("RGB", (512, 320), (64, 32, 96)) |
| st.session_state.generated_bg = background |
| st.success("AI-generated background (stub). Replace with your generator!") |
| ai_ready = True |
| elif "generated_bg" in st.session_state: |
| background = st.session_state.generated_bg |
| ai_ready = True |
| if ai_ready and background is not None: |
| st.image(background, caption="Generated Background", use_column_width=True) |
|
|
| return background, bg_type |
|
|
| def render_ui(process_video_func): |
| try: |
| with st.sidebar: |
| st.subheader("System Status") |
| st.caption(f"UI build: {UI_BUILD}") |
| st.markdown("**Log file:** `/tmp/app.log`") |
|
|
| if st.session_state.get('gpu_available', False): |
| try: |
| import torch |
| dev = torch.cuda.get_device_name(0) |
| except Exception: |
| dev = "Detected (name unavailable)" |
| st.success(f"GPU: {dev}") |
| else: |
| st.error("GPU: Not Available") |
|
|
| st.number_input("Tail last N lines", min_value=50, max_value=5000, step=50, key="log_tail_lines") |
| log_bytes = read_file_bytes("/tmp/app.log") |
| st.download_button( |
| "Download Logs", |
| data=log_bytes if log_bytes else b"Log file not available yet.", |
| file_name="app.log", |
| mime="text/plain", |
| use_container_width=True, |
| disabled=not bool(log_bytes) |
| ) |
| with st.expander("View Log Tail", expanded=True): |
| if st.button("Refresh log", use_container_width=True, key="refresh_log_btn"): |
| st.session_state['_last_log_refresh'] = time.time() |
| log_text = tail_file("/tmp/app.log", st.session_state.get('log_tail_lines', 400)) |
| st.code(log_text, language="text") |
|
|
| col1, col2 = st.columns([1, 1], gap="large") |
|
|
| with col1: |
| st.header("1. Upload Video") |
| uploaded_video = st.file_uploader( |
| "Upload Video", |
| type=["mp4", "mov", "avi", "mkv", "webm"], |
| key="video_uploader" |
| ) |
| st.markdown("### Video Preview") |
| video_preview_placeholder = st.empty() |
| if uploaded_video is not None: |
| try: |
| uploaded_video.seek(0) |
| video_bytes = uploaded_video.read() |
| st.session_state.video_bytes_cache = video_bytes |
| with video_preview_placeholder.container(): |
| st.video(video_bytes) |
| except Exception as e: |
| logger.error(f"[UI] Video preview error: {e}", exc_info=True) |
| video_preview_placeholder.error(f"Cannot display video: {e}") |
| else: |
| video_preview_placeholder.empty() |
|
|
| with col2: |
| background, bg_type = _render_background_settings() |
| st.header("3. Process Video") |
| |
| progress_container = st.container() |
| with progress_container: |
| progress_bar = st.progress(0) |
| status_text = st.empty() |
| stage_status = st.empty() |
| |
| can_process = ( |
| st.session_state.get('video_bytes_cache') is not None |
| and not st.session_state.get('processing', False) |
| and (background is not None) |
| ) |
|
|
| if st.button("Process Video", disabled=not can_process, use_container_width=True): |
| try: |
| logger.info("Process Video button clicked") |
| |
| import io |
| class _MemFile: |
| def __init__(self, name, data): |
| self.name = name |
| self._b = io.BytesIO(data) |
| def read(self): |
| self._b.seek(0) |
| return self._b.read() |
| def seek(self, pos): |
| self._b.seek(pos) |
|
|
| st.session_state.processing = True |
| mem_video = _MemFile("uploaded.mp4", st.session_state.video_bytes_cache) |
|
|
| |
| thread = threading.Thread( |
| target=process_video_func, |
| args=(mem_video, background, bg_type.lower()), |
| daemon=True |
| ) |
| thread.start() |
|
|
| |
| while thread.is_alive() or st.session_state.get('processing', False): |
| status = get_progress() |
| |
| if status.get('active') or not status.get('complete'): |
| |
| progress_bar.progress(status.get('progress', 0)) |
| status_text.info(f"**Status:** {status.get('message', 'Processing...')}") |
| stage_status.markdown(f"**Current Stage:** {status.get('stage', 'Unknown')}") |
| |
| |
| if status.get('error'): |
| status_text.error(f"**Error:** {status['error']}") |
| break |
| |
| |
| if status.get('complete'): |
| break |
| |
| time.sleep(1) |
| |
| |
| thread.join(timeout=5) |
| |
| |
| final_status = get_progress() |
| if final_status.get('error'): |
| progress_bar.progress(final_status.get('progress', 0)) |
| status_text.error(f"**Status:** Processing failed - {final_status['error']}") |
| st.error(f"Processing failed: {final_status['error']}") |
| elif st.session_state.get('processed_video_bytes'): |
| progress_bar.progress(100) |
| status_text.success("**Status:** Processing complete!") |
| st.success("Video processing complete!") |
| else: |
| status_text.error("**Status:** Processing failed. Check logs.") |
| st.error("Processing failed. Check logs for details.") |
| |
| except Exception as e: |
| st.session_state.processing = False |
| logger.error(f"[UI] Process video error: {e}", exc_info=True) |
| status_text.error(f"**Status:** Error - {str(e)}") |
| st.error(f"Processing error: {str(e)}. Check logs for details.") |
|
|
| if st.session_state.get('processed_video_bytes') is not None: |
| st.markdown("---") |
| st.markdown("### Processed Video") |
| try: |
| st.video(st.session_state.processed_video_bytes) |
| st.download_button( |
| label="Download Processed Video", |
| data=st.session_state.processed_video_bytes, |
| file_name="processed_video.mp4", |
| mime="video/mp4", |
| use_container_width=True |
| ) |
| except Exception as e: |
| logger.error(f"[UI] Display error: {e}", exc_info=True) |
| st.error(f"Display error: {e}") |
|
|
| if st.session_state.get('last_error'): |
| with st.expander("Last Error", expanded=True): |
| st.error(st.session_state.last_error) |
| if st.button("Clear Error"): |
| st.session_state.last_error = None |
| |
| except Exception as e: |
| logger.error(f"[UI] Render UI error: {e}", exc_info=True) |
| st.error(f"UI rendering error: {str(e)}. Check logs for details.") |