engine / page_modules /process_video.py
VeuReu's picture
Upload 6 files
d345661 verified
raw
history blame
8.12 kB
"""UI logic for the "Processar vídeo nou" page."""
from __future__ import annotations
import re
import shutil
import subprocess
from pathlib import Path
import streamlit as st
def _get_video_duration(path: str) -> float:
"""Return video duration in seconds using ffprobe, ffmpeg or OpenCV as fallback."""
cmd = [
"ffprobe",
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
path,
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
return float(result.stdout.strip())
except (subprocess.CalledProcessError, ValueError, FileNotFoundError):
pass
if shutil.which("ffmpeg"):
try:
ffmpeg_cmd = ["ffmpeg", "-i", path]
result = subprocess.run(ffmpeg_cmd, capture_output=True, text=True, check=False)
output = result.stderr or result.stdout or ""
match = re.search(r"Duration:\s*(\d+):(\d+):(\d+\.\d+)", output)
if match:
hours, minutes, seconds = match.groups()
total_seconds = (int(hours) * 3600) + (int(minutes) * 60) + float(seconds)
return float(total_seconds)
except FileNotFoundError:
pass
# Últim recurs: intentar amb OpenCV si està disponible
try:
import cv2
cap = cv2.VideoCapture(path)
if cap.isOpened():
fps = cap.get(cv2.CAP_PROP_FPS) or 0
frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0
cap.release()
if fps > 0 and frame_count > 0:
return float(frame_count / fps)
else:
cap.release()
except Exception:
pass
return 0.0
def _transcode_video(input_path: str, output_path: str, max_duration: int | None = None) -> None:
cmd = ["ffmpeg", "-y", "-i", input_path]
if max_duration is not None:
cmd += ["-t", str(max_duration)]
cmd += [
"-c:v",
"libx264",
"-preset",
"veryfast",
"-crf",
"23",
"-c:a",
"aac",
"-movflags",
"+faststart",
output_path,
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(result.stderr.strip() or "ffmpeg failed")
def render_process_video_page() -> None:
    """Render the "Processar vídeo nou" Streamlit page.

    The page tracks its progress in ``st.session_state`` and offers three
    steps:

    1. Upload an MP4 clip. Files over 20 MB are rejected. An accepted clip
       is transcoded (and cut to its first 4 minutes when longer) into
       ``/tmp/data/videos/<name>/<name>.mp4``.
    2. Detect characters. This currently stores two hard-coded placeholder
       characters.
    3. Name the detected characters and save them. Once saved, a
       "Generar Audiodescripció" button is shown (no action is attached to
       it in this function).

    Session-state keys written:
        ``video_uploaded``: ``None`` or a dict with ``original_name`` and
            ``status`` and, once processed, ``path`` and ``was_truncated``.
        ``characters_detected``: ``None`` or a list of character dicts.
        ``characters_saved``: bool.
    """
    import tempfile  # Only needed here, for the per-upload scratch file.

    st.header("Processar un nou clip de vídeo")

    # Initialise the page state if it does not exist yet.
    if "video_uploaded" not in st.session_state:
        st.session_state.video_uploaded = None
    if "characters_detected" not in st.session_state:
        st.session_state.characters_detected = None
    if "characters_saved" not in st.session_state:
        st.session_state.characters_saved = False

    # --- 1. Video upload ---
    MAX_SIZE_MB = 20
    MAX_DURATION_S = 240  # 4 minutes

    uploaded_file = st.file_uploader(
        "Puja un clip de vídeo (MP4, < 20MB, < 4 minuts)",
        type=["mp4"],
        key="video_uploader",
    )

    if uploaded_file is not None:
        # Reset the state when a new file (identified by name) is uploaded.
        current = st.session_state.video_uploaded
        if current is None or uploaded_file.name != current.get("original_name"):
            st.session_state.video_uploaded = {"original_name": uploaded_file.name, "status": "validating"}
            st.session_state.characters_detected = None
            st.session_state.characters_saved = False

        if st.session_state.video_uploaded["status"] == "validating":
            is_valid = True
            if uploaded_file.size > MAX_SIZE_MB * 1024 * 1024:
                st.error(f"El vídeo supera el límit de {MAX_SIZE_MB}MB.")
                is_valid = False

            if is_valid:
                with st.spinner("Processant el vídeo..."):
                    # A unique scratch file per upload: a fixed name in the
                    # working directory would be shared (and clobbered) by
                    # concurrent sessions.
                    scratch = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
                    temp_path = Path(scratch.name)
                    was_truncated = False
                    final_video_path = None
                    try:
                        with scratch:
                            scratch.write(uploaded_file.getbuffer())

                        duration = _get_video_duration(str(temp_path))
                        if not duration:
                            st.error("No s'ha pogut obtenir la durada del vídeo.")
                            is_valid = False

                        if is_valid:
                            if duration > MAX_DURATION_S:
                                was_truncated = True
                            video_name = Path(uploaded_file.name).stem
                            video_dir = Path("/tmp/data/videos") / video_name
                            video_dir.mkdir(parents=True, exist_ok=True)
                            final_video_path = video_dir / f"{video_name}.mp4"
                            try:
                                _transcode_video(
                                    str(temp_path),
                                    str(final_video_path),
                                    MAX_DURATION_S if was_truncated else None,
                                )
                            except (RuntimeError, OSError) as exc:
                                # OSError covers ffmpeg being absent or not
                                # executable; report it instead of crashing.
                                st.error(f"No s'ha pogut processar el vídeo: {exc}")
                                is_valid = False

                        if is_valid and final_video_path is not None:
                            st.session_state.video_uploaded.update(
                                {
                                    "status": "processed",
                                    "path": str(final_video_path),
                                    "was_truncated": was_truncated,
                                }
                            )
                            # st.rerun() interrupts the script; the finally
                            # clause below still removes the scratch file.
                            st.rerun()
                    finally:
                        temp_path.unlink(missing_ok=True)

    video_state = st.session_state.video_uploaded
    if video_state and video_state["status"] == "processed":
        st.success(f"Vídeo '{video_state['original_name']}' pujat i processat correctament.")
        if video_state["was_truncated"]:
            st.warning("El vídeo s'ha truncat a 4 minuts.")

    # --- 2. Character detection (placeholder data for now) ---
    st.markdown("---")
    detect_col, _ = st.columns([1, 3])
    with detect_col:
        detect_button_disabled = st.session_state.video_uploaded is None
        if st.button("Detectar Personatges", disabled=detect_button_disabled):
            with st.spinner("Detectant personatges..."):
                st.session_state.characters_detected = [
                    {
                        "id": "char1",
                        "image_path": "init_data/placeholder.png",
                        "description": "Dona amb cabell ros i ulleres",
                    },
                    {
                        "id": "char2",
                        "image_path": "init_data/placeholder.png",
                        "description": "Home amb barba i barret",
                    },
                ]
                st.session_state.characters_saved = False

    # --- 3. Naming and saving the detected characters ---
    if st.session_state.characters_detected:
        st.subheader("Personatges detectats")
        for char in st.session_state.characters_detected:
            with st.form(key=f"form_{char['id']}"):
                image_col, info_col = st.columns(2)
                with image_col:
                    st.image(char["image_path"], width=150)
                with info_col:
                    st.caption(char["description"])
                    st.text_input("Nom del personatge", key=f"name_{char['id']}")
                    st.form_submit_button("Cercar")

        # Horizontal rule separating the character forms from the actions.
        st.markdown("---")

        save_col, generate_col, _ = st.columns([1, 1, 2])
        with save_col:
            if st.button("Desar", type="primary"):
                st.session_state.characters_saved = True
                st.success("Personatges desats correctament.")
        with generate_col:
            if st.session_state.characters_saved:
                st.button("Generar Audiodescripció")