| """UI logic for the "Processar vídeo nou" page.""" |
|
|
| from __future__ import annotations |
|
|
| import re |
| import shutil |
| import subprocess |
| from pathlib import Path |
|
|
| import streamlit as st |
|
|
|
|
def _get_video_duration(path: str) -> float:
    """Return the duration of the video at *path* in seconds.

    Three probes are tried in order and the first one that yields a usable
    (finite, strictly positive) value wins:

    1. ``ffprobe`` reading the container's ``format=duration`` entry.
    2. ``ffmpeg -i``, parsing the ``Duration: HH:MM:SS.ss`` line it prints.
    3. OpenCV (``cv2``), dividing the frame count by the frame rate.

    A missing tool, a failing command or unparseable output counts as a miss
    for that probe and the next one is tried.

    Args:
        path: Filesystem path of the video file to inspect.

    Returns:
        The duration in seconds, or ``0.0`` when no probe could determine it.
    """

    def _usable(seconds: float) -> bool:
        # NaN fails both comparisons, so this rejects NaN, +/-inf, zero and
        # negative values in one expression.
        return 0.0 < seconds < float("inf")

    # --- Probe 1: ffprobe prints just the number (or e.g. "N/A"). ---
    ffprobe_cmd = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        path,
    ]
    try:
        probe = subprocess.run(
            ffprobe_cmd,
            capture_output=True,
            text=True,
            errors="replace",
            check=True,
        )
        duration = float(probe.stdout.strip())
    except (subprocess.CalledProcessError, ValueError, OSError):
        # OSError covers a missing binary as well as one that cannot be
        # executed (e.g. PermissionError); ValueError covers "N/A"/empty output.
        duration = 0.0
    if _usable(duration):
        return duration

    # --- Probe 2: ffmpeg reports the duration while describing its input. ---
    if shutil.which("ffmpeg"):
        try:
            # No output file is given, so a non-zero exit status is expected
            # here; only the printed description matters.
            # errors="replace" keeps odd metadata bytes from raising
            # UnicodeDecodeError while decoding the captured streams.
            described = subprocess.run(
                ["ffmpeg", "-i", path],
                capture_output=True,
                text=True,
                errors="replace",
                check=False,
            )
        except OSError:
            described = None
        if described is not None:
            output = described.stderr or described.stdout or ""
            match = re.search(r"Duration:\s*(\d+):(\d+):(\d+\.\d+)", output)
            if match:
                hours, minutes, seconds = match.groups()
                duration = (int(hours) * 3600) + (int(minutes) * 60) + float(seconds)
                if _usable(duration):
                    return duration

    # --- Probe 3: OpenCV, imported lazily because it is an optional fallback. ---
    try:
        import cv2

        cap = cv2.VideoCapture(path)
        try:
            if cap.isOpened():
                fps = cap.get(cv2.CAP_PROP_FPS) or 0
                frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0
                if fps > 0 and frame_count > 0:
                    duration = float(frame_count / fps)
                    if _usable(duration):
                        return duration
        finally:
            # Release the capture on every path, including when a property
            # read raises.
            cap.release()
    except Exception:
        # Deliberately best-effort: cv2 may be absent (ImportError) or fail
        # internally; either way the duration is simply unknown.
        pass

    return 0.0
|
|
|
|
def _transcode_video(input_path: str, output_path: str, max_duration: int | None = None) -> None:
    """Re-encode *input_path* into an H.264/AAC MP4 at *output_path* using ffmpeg.

    The command overwrites an existing output file (``-y``), encodes video
    with ``libx264`` (``-preset veryfast -crf 23``) and audio with ``aac``,
    and passes ``-movflags +faststart``.

    Args:
        input_path: Path of the source video.
        output_path: Path the transcoded video is written to.
        max_duration: When given, passed to ffmpeg as ``-t`` to limit the
            output to that many seconds; ``None`` keeps the full length.

    Raises:
        RuntimeError: If ffmpeg cannot be started (e.g. it is not installed)
            or exits with a non-zero status. The message is ffmpeg's stderr
            when available, otherwise ``"ffmpeg failed"``.
    """
    cmd = ["ffmpeg", "-y", "-i", input_path]
    if max_duration is not None:
        cmd += ["-t", str(max_duration)]
    cmd += [
        "-c:v",
        "libx264",
        "-preset",
        "veryfast",
        "-crf",
        "23",
        "-c:a",
        "aac",
        "-movflags",
        "+faststart",
        output_path,
    ]
    try:
        # errors="replace" stops undecodable bytes in ffmpeg's log output from
        # surfacing as UnicodeDecodeError instead of the real failure.
        result = subprocess.run(cmd, capture_output=True, text=True, errors="replace")
    except OSError as exc:
        # Missing or non-executable binary: report it through the same
        # exception type as any other transcoding failure so callers need a
        # single handler.
        raise RuntimeError(f"ffmpeg could not be started: {exc}") from exc
    if result.returncode != 0:
        raise RuntimeError(result.stderr.strip() or "ffmpeg failed")
|
|
|
|
def render_process_video_page() -> None:
    """Render the "Processar vídeo nou" Streamlit page.

    Progress is kept in ``st.session_state`` under three keys:

    * ``video_uploaded`` -- ``None`` or a dict with ``original_name`` and
      ``status`` (``"validating"`` or ``"processed"``); once processed it
      also carries ``path`` and ``was_truncated``.
    * ``characters_detected`` -- ``None`` or the list of detected characters.
    * ``characters_saved`` -- whether the user has pressed "Desar".

    An uploaded MP4 is size-checked, written to a private temporary file,
    probed for its duration and transcoded into
    ``/tmp/data/videos/<stem>/<stem>.mp4`` (cut to ``MAX_DURATION_S`` seconds
    when it is longer). Character detection currently fills in hard-coded
    placeholder entries.
    """
    # Local import: only this page's upload path needs it.
    import tempfile

    st.header("Processar un nou clip de vídeo")

    # Seed the session keys on the first run of the page.
    for key, default in (
        ("video_uploaded", None),
        ("characters_detected", None),
        ("characters_saved", False),
    ):
        if key not in st.session_state:
            st.session_state[key] = default

    MAX_SIZE_MB = 20
    MAX_DURATION_S = 240
    max_minutes = MAX_DURATION_S // 60  # shown in the label and the warning

    uploaded_file = st.file_uploader(
        f"Puja un clip de vídeo (MP4, < {MAX_SIZE_MB}MB, < {max_minutes} minuts)",
        type=["mp4"],
        key="video_uploader",
    )

    if uploaded_file is not None:
        # A file with a different name starts a fresh workflow and discards
        # any characters detected for the previous video.
        current = st.session_state.video_uploaded
        if current is None or uploaded_file.name != current.get("original_name"):
            st.session_state.video_uploaded = {"original_name": uploaded_file.name, "status": "validating"}
            st.session_state.characters_detected = None
            st.session_state.characters_saved = False

        if st.session_state.video_uploaded["status"] == "validating":
            if uploaded_file.size > MAX_SIZE_MB * 1024 * 1024:
                st.error(f"El vídeo supera el límit de {MAX_SIZE_MB}MB.")
            else:
                with st.spinner("Processant el vídeo..."):
                    # Unique temp file per run: a fixed name in the working
                    # directory would be shared by concurrent sessions, which
                    # could overwrite or delete each other's upload.
                    tmp = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
                    temp_path = Path(tmp.name)
                    try:
                        with tmp:
                            tmp.write(uploaded_file.getbuffer())

                        duration = _get_video_duration(str(temp_path))
                        # "not (x > 0)" also rejects NaN, which a plain
                        # truthiness test would let through.
                        if not (duration > 0):
                            st.error("No s'ha pogut obtenir la durada del vídeo.")
                        else:
                            was_truncated = duration > MAX_DURATION_S

                            video_name = Path(uploaded_file.name).stem
                            video_dir = Path("/tmp/data/videos") / video_name
                            final_video_path = video_dir / f"{video_name}.mp4"

                            try:
                                video_dir.mkdir(parents=True, exist_ok=True)
                                _transcode_video(
                                    str(temp_path),
                                    str(final_video_path),
                                    MAX_DURATION_S if was_truncated else None,
                                )
                            except (RuntimeError, OSError) as exc:
                                # OSError: directory creation failed or ffmpeg
                                # could not be launched at all.
                                st.error(f"No s'ha pogut processar el vídeo: {exc}")
                            else:
                                st.session_state.video_uploaded.update(
                                    {
                                        "status": "processed",
                                        "path": str(final_video_path),
                                        "was_truncated": was_truncated,
                                    }
                                )
                                # Restart the script so the page is redrawn
                                # from the "processed" state.
                                st.rerun()
                    finally:
                        # Runs on every exit path, including the rerun above.
                        temp_path.unlink(missing_ok=True)

        video_state = st.session_state.video_uploaded
        if video_state and video_state["status"] == "processed":
            st.success(f"Vídeo '{video_state['original_name']}' pujat i processat correctament.")
            if video_state["was_truncated"]:
                st.warning(f"El vídeo s'ha truncat a {max_minutes} minuts.")

    st.markdown("---")
    detect_col, _ = st.columns([1, 3])
    with detect_col:
        # Only a successfully processed video can be analysed; a state dict
        # left in "validating" (failed validation) must keep the button off.
        video_state = st.session_state.video_uploaded
        video_ready = bool(video_state) and video_state.get("status") == "processed"
        if st.button("Detectar Personatges", disabled=not video_ready):
            with st.spinner("Detectant personatges..."):
                # Placeholder results until real detection is wired in.
                st.session_state.characters_detected = [
                    {
                        "id": "char1",
                        "image_path": "init_data/placeholder.png",
                        "description": "Dona amb cabell ros i ulleres",
                    },
                    {
                        "id": "char2",
                        "image_path": "init_data/placeholder.png",
                        "description": "Home amb barba i barret",
                    },
                ]
                st.session_state.characters_saved = False

    if st.session_state.characters_detected:
        st.subheader("Personatges detectats")
        for char in st.session_state.characters_detected:
            # One form per character; keys are derived from the character id.
            with st.form(key=f"form_{char['id']}"):
                image_col, details_col = st.columns(2)
                with image_col:
                    st.image(char["image_path"], width=150)
                with details_col:
                    st.caption(char["description"])
                    st.text_input("Nom del personatge", key=f"name_{char['id']}")
                    st.form_submit_button("Cercar")

        # Horizontal rule (was the malformed "---_**", which renders as text).
        st.markdown("---")

        save_col, generate_col, _ = st.columns([1, 1, 2])
        with save_col:
            if st.button("Desar", type="primary"):
                st.session_state.characters_saved = True
                st.success("Personatges desats correctament.")

        with generate_col:
            # The next step is offered only after the characters are saved.
            if st.session_state.characters_saved:
                st.button("Generar Audiodescripció")
|
|