subapi / detect_crop_video.py
habulaj's picture
Update detect_crop_video.py
417fcb1 verified
"""
Detecção e crop automático de vídeo baseado em análise de movimento.
Extrai a região ativa do vídeo removendo bordas pretas/brancas estáticas.
Adaptado de recurve-videos-export/detect_crop.py para uso no gemini worker.
"""
import cv2
import numpy as np
import os
import subprocess
import time
import shutil
from concurrent.futures import ThreadPoolExecutor
# EasyOCR reader loaded lazily
_easyocr_reader = None
def get_easyocr_reader():
global _easyocr_reader
if _easyocr_reader is None:
import easyocr
# Disable easyocr verbose output to keep logs clean
_easyocr_reader = easyocr.Reader(['en'], verbose=False)
return _easyocr_reader
def detect_and_crop_text(video_path, output_video_path):
"""
Second crop pass: Detect text using easyocr and crop if needed.
Returns True if cropped, False otherwise.
"""
if not os.path.exists(video_path):
return False
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
return False
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
try:
reader = get_easyocr_reader()
except Exception as e:
print(f"⚠️ EasyOCR indisponível: {e}")
cap.release()
return False
fps = cap.get(cv2.CAP_PROP_FPS)
if fps <= 0: fps = 30
duration = total_frames / fps
num_samples = max(1, int(duration)) # 1 frame per second
indices = np.linspace(0, total_frames - 1, num_samples, dtype=int)
frames = []
for i in indices:
cap.set(cv2.CAP_PROP_POS_FRAMES, i)
ret, frame = cap.read()
if ret:
frames.append(frame)
cap.release()
if not frames:
return False
all_boxes = []
print(f"🔍 Analisando até {num_samples} frames (1 fps) em paralelo para encontrar texto hardcoded e legendas dinâmicas...")
def process_frame(f):
return reader.readtext(f)
# Usa multithreading para processar vários frames ao mesmo tempo,
# garantindo uso máximo da CPU durante a inferência PyTorch.
max_workers = min(4, (os.cpu_count() or 1))
stable_frames = 0
last_union = None
half_frames = max(3, int(num_samples * 0.5))
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(process_frame, f) for f in frames]
for future in futures:
results = future.result()
frame_xs = []
frame_ys = []
for (bbox, text, prob) in results:
if prob > 0.4 and len(text.strip()) > 2:
xs = [pt[0] for pt in bbox]
ys = [pt[1] for pt in bbox]
min_x = int(min(xs))
min_y = int(min(ys))
max_x = int(max(xs))
max_y = int(max(ys))
frame_xs.extend([min_x, max_x])
frame_ys.extend([min_y, max_y])
all_boxes.append((min_x, min_y, max_x, max_y))
print(f"📖 Texto encontrado: '{text}' | Box: ({min_x},{min_y}) até ({max_x},{max_y})")
frame_union = None
if frame_xs and frame_ys:
frame_union = (min(frame_xs), min(frame_ys), max(frame_xs), max(frame_ys))
if frame_union is not None and last_union is not None:
dx = abs(frame_union[0] - last_union[0]) + abs(frame_union[2] - last_union[2])
dy = abs(frame_union[1] - last_union[1]) + abs(frame_union[3] - last_union[3])
# Diferença de até 40 pixels total nas bordas é considerada tolerância/estável
if dx < 40 and dy < 40:
stable_frames += 1
else:
stable_frames = 0
else:
stable_frames = 0
last_union = frame_union
if stable_frames >= half_frames:
print(f"⚡ Texto estático detectado iterativamente. Otimizando e abortando a leitura dos frames restantes!")
for f in futures:
f.cancel()
break
if not all_boxes:
print("✅ Nenhum aviso de texto significativo detectado.")
return False
margin = int(min(w, h) * 0.02)
y_coords = [0, h]
for (min_x, min_y, max_x, max_y) in all_boxes:
y_coords.extend([max(0, int(min_y) - margin), min(h, int(max_y) + margin)])
y_coords = sorted(list(set(y_coords)))
max_area = 0
best_rect = (0, 0, w, h)
for i in range(len(y_coords)):
for j in range(i + 1, len(y_coords)):
y1, y2 = y_coords[i], y_coords[j]
blocked_intervals = []
for b in all_boxes:
b_min_x, b_min_y = max(0, int(b[0]) - margin), max(0, int(b[1]) - margin)
b_max_x, b_max_y = min(w, int(b[2]) + margin), min(h, int(b[3]) + margin)
# Intersects this horizontal strip?
if b_min_y < y2 and b_max_y > y1:
blocked_intervals.append((b_min_x, b_max_x))
blocked_intervals.sort()
current_x = 0
for bx1, bx2 in blocked_intervals:
if bx1 > current_x:
area = (bx1 - current_x) * (y2 - y1)
if area > max_area:
max_area = area
best_rect = (current_x, y1, bx1, y2)
current_x = max(current_x, bx2)
if w > current_x:
area = (w - current_x) * (y2 - y1)
if area > max_area:
max_area = area
best_rect = (current_x, y1, w, y2)
crop_x, crop_y, crop_max_x, crop_max_y = best_rect
crop_w = crop_max_x - crop_x
crop_h = crop_max_y - crop_y
if (crop_w * crop_h) < (w * h * 0.5):
print(f"❌ Região útil muito pequena (sobraria {crop_w * crop_h / (w * h):.0%}). Texto possivelmente no meio. Abortando crop inteligente.")
return "aborted_area_too_small"
if crop_w >= w * 0.95 and crop_h >= h * 0.95:
print("✅ Região de texto é irrelevante, mantendo vídeo intacto.")
return "skipped"
if crop_w % 2 != 0: crop_w -= 1
if crop_h % 2 != 0: crop_h -= 1
print(f"✂️ Text Crop (Inteligente 2D): {crop_w}x{crop_h} @ ({crop_x},{crop_y})")
has_nvenc = check_nvenc_support()
filter_chain = f"crop={crop_w}:{crop_h}:{crop_x}:{crop_y}"
cmd_nvenc = [
"ffmpeg", "-y", "-loglevel", "error",
"-i", video_path, "-vf", filter_chain,
"-c:a", "copy", "-c:v", "h264_nvenc",
"-preset", "fast", "-cq", "20",
output_video_path
]
cmd_cpu = [
"ffmpeg", "-y", "-loglevel", "error",
"-i", video_path, "-vf", filter_chain,
"-c:a", "copy", "-c:v", "libx264",
"-preset", "ultrafast", "-crf", "23",
output_video_path
]
print(f"🔄 Iniciando ffmpeg text crop...")
t0 = time.time()
crop_success = False
if has_nvenc:
try:
subprocess.run(cmd_nvenc, check=True, capture_output=True)
print(f"✅ Text crop concluído (NVENC) em {time.time() - t0:.1f}s")
crop_success = True
except subprocess.CalledProcessError:
print(f"⚠️ NVENC indisponível para texto, usando CPU fallback...")
if not crop_success:
t_cpu = time.time()
try:
subprocess.run(cmd_cpu, check=True)
print(f"✅ Text crop concluído (CPU) em {time.time() - t_cpu:.1f}s")
crop_success = True
except subprocess.CalledProcessError as e:
print(f"❌ Text crop falhou (CPU): {e}")
return "error"
return "success"
def check_nvenc_support():
"""Checks if h264_nvenc encoder is available in ffmpeg."""
try:
result = subprocess.run(['ffmpeg', '-encoders'], capture_output=True, text=True, check=True)
return 'h264_nvenc' in result.stdout
except (subprocess.CalledProcessError, FileNotFoundError):
return False
def get_crop_detect_coords(video_path, limit=24, skip=5, duration=5):
"""
Uses ffmpeg cropdetect filter to find the content area (removing black bars).
Returns (w, h, x, y) or None if detection fails.
"""
try:
# Pula os primeiros segundos (skip) para evitar intros pretas,
# analisa por 'duration' segundos.
cmd = [
"ffmpeg", "-ss", str(skip), "-i", video_path,
"-t", str(duration), "-vf", f"cropdetect={limit}:16:0",
"-f", "null", "-"
]
print(f"🎬 Executando ffmpeg cropdetect...")
result = subprocess.run(cmd, capture_output=True, text=True, check=False)
# O output do cropdetect sai no stderr
output = result.stderr
# Procurar pela última linha com 'crop='
import re
matches = re.findall(r"crop=(\d+):(\d+):(\d+):(\d+)", output)
if matches:
# Pegar a última ocorrência para garantir que a detecção estabilizou
w, h, x, y = map(int, matches[-1])
return w, h, x, y
return None
except Exception as e:
print(f"⚠️ Erro ao executar cropdetect: {e}")
return None
def get_content_density_crop(frames, color_var_threshold=8, complexity_threshold=10, min_density=0.15):
"""
Analyzes row-by-row color variance and complexity to find the 'congruent line of colors'.
Isolates colorful video frames from monochromatic text overlays.
Returns (y_min, y_max).
"""
if not frames:
return None
num_frames = len(frames)
h, w = frames[0].shape[:2]
all_y_min = []
all_y_max = []
for frame in frames:
if len(frame.shape) != 3:
continue
# 1. Color Variance Check (Crucial for 'Várias cores de forma congruente')
# In monochrome text (white/black/gray), R, G, B are identical or very close.
# Across a real video frame, colors vary significantly along the row.
b, g, r = cv2.split(frame.astype(np.int16))
rg = r - g
gb = g - b
br = b - r
# Variância de cor na linha
color_variance = np.std(rg, axis=1) + np.std(gb, axis=1) + np.std(br, axis=1)
# 2. Complexity Density (Variation across the row)
# Identifica linhas que são complexas (movimento/textura) em vez de texto isolado
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY).astype(np.int16)
diff = np.abs(gray[:, 1:] - gray[:, :-1])
row_complexity = np.sum(diff > 15, axis=1) / w
# Unimos critérios: Deve ter variância de cor OU ser muito complexo
# (Para suportar vídeos P&B, mantemos uma margem de complexidade alta)
is_content = (color_variance > color_var_threshold) | (row_complexity > 0.40)
# Linhas que superam os critérios de conteúdo congruente
content_rows = np.where(is_content)[0]
if len(content_rows) > 0:
# Encontrar o maior bloco contínuo (pula texto isolado)
diffs = np.diff(content_rows)
# O split ocorre onde a diferença não é 1 (quebra na continuidade)
splits = np.where(diffs != 1)[0] + 1
blocks = np.split(content_rows, splits)
# Escolher o maior bloco contínuo em termos de número de linhas
main_block = max(blocks, key=len)
all_y_min.append(main_block[0])
all_y_max.append(main_block[-1])
if not all_y_min or not all_y_max:
return None
# Usamos o percentil 50 (mediana) para as fronteiras para estabilidade
y_min = int(np.percentile(all_y_min, 50))
y_max = int(np.percentile(all_y_max, 50))
# Adicionamos uma margem de segurança de 2px para não cortar o frame real
y_min = max(0, y_min - 2)
y_max = min(h, y_max + 2)
return y_min, y_max
def detect_and_crop_video(video_path, output_video_path, text_cut=True):
"""
Detecta a região com movimento no vídeo e gera um vídeo cropado.
Retorna True se o crop foi realizado, False caso contrário.
"""
if not os.path.exists(video_path):
print(f"Error: Video file not found at {video_path}")
return False
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print("Error: Could not open video.")
return False
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Sample frames to detect motion
num_samples = 15
indices = np.linspace(0, total_frames - 1, num_samples, dtype=int)
frames_gray = []
frames_bgr = []
for i in indices:
cap.set(cv2.CAP_PROP_POS_FRAMES, i)
ret, frame = cap.read()
if ret:
frames_bgr.append(frame)
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
frames_gray.append(gray)
cap.release()
if len(frames_gray) < 2:
print(f"❌ Erro: Não foi possível ler frames suficientes ({len(frames_gray)}/{num_samples}) para análise.")
return False
# ---------------------------------------------------------
# Passo 1: Tentar detectar bordas via FFmpeg cropdetect
# ---------------------------------------------------------
crop_coords = get_crop_detect_coords(video_path)
use_motion_fallback = True
if crop_coords:
cw, ch, cx, cy = crop_coords
original_area = w * h
crop_area = cw * ch
reduction = (1 - crop_area / original_area) * 100
# Se houve uma redução significativa (>10%), confiamos no cropdetect
if reduction > 10:
print(f"✅ Cropdetect sugeriu: {cw}x{ch} @ ({cx},{cy}) | Redução: {reduction:.1f}%")
x_min, y_min, x_max, y_max = cx, cy, cx + cw, cy + ch
use_motion_fallback = False
else:
print(f"⏩ Cropdetect sugeriu redução irrelevante ({reduction:.1f}%). Usando motion fallback...")
# ---------------------------------------------------------
# Passo 2: Fallback para detecção de movimento (OpenCV)
# ---------------------------------------------------------
if use_motion_fallback:
print(f"🔍 Analisando movimento em {len(frames_gray)} frames amostrados...")
# Calculate accumulated difference
accum_diff = np.zeros((h, w), dtype=np.float32)
for i in range(len(frames_gray) - 1):
diff = cv2.absdiff(frames_gray[i], frames_gray[i+1])
accum_diff = cv2.add(accum_diff, diff.astype(np.float32))
accum_diff = cv2.normalize(accum_diff, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)
_, thresh = cv2.threshold(accum_diff, 20, 255, cv2.THRESH_BINARY)
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 15))
thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)
contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if not contours:
print("❌ Aviso: Nenhum movimento detectado nos frames selecionados.")
return False
print(f"📊 Encontrados {len(contours)} contornos de movimento iniciais.")
x_min, y_min = w, h
x_max, y_max = 0, 0
found_any = False
for c in contours:
if cv2.contourArea(c) > 500:
found_any = True
x, y, cw, ch = cv2.boundingRect(c)
x_min = min(x_min, x)
y_min = min(y_min, y)
x_max = max(x_max, x + cw)
y_max = max(y_max, y + ch)
if not found_any:
print("❌ Aviso: Nenhum movimento significativo (>500px area) detectado.")
return False
print(f"✅ Movimento consolidado na região: {x_min},{y_min} até {x_max},{y_max}")
# ---------------------------------------------------------
# Passo 3: Refinamento por Densidade de Conteúdo (Garante linha divisória congruente)
# ---------------------------------------------------------
density_coords = get_content_density_crop(frames_bgr)
if density_coords:
dy_min, dy_max = density_coords
print(f"🎨 Refinamento de densidade sugeriu: Y de {dy_min} até {dy_max}")
# Aplicamos o refinamento se ele for mais restritivo (interno) ou se o movimento falhou
# Para evitar cortar o vídeo original por erro, conferimos se a área é razoável
y_min = max(y_min, dy_min)
y_max = min(y_max, dy_max)
print(f"✨ Região refinada final: Y de {y_min} até {y_max}")
# Inset Logic (2px)
inset = 2
x_min = min(x_min + inset, w)
y_min = min(y_min + inset, h)
x_max = max(x_max - inset, x_min)
y_max = max(y_max - inset, y_min)
final_w = x_max - x_min
final_h = y_max - y_min
# Ensure crop dimensions are even
if final_w % 2 != 0: final_w -= 1
if final_h % 2 != 0: final_h -= 1
reduction_pct = (1 - (final_w * final_h) / (w * h)) * 100
print(f"✂️ Motion Crop: {final_w}x{final_h} @ ({x_min},{y_min}) | Redução de área: {reduction_pct:.1f}%")
# Check for NVENC support
has_nvenc = check_nvenc_support()
# Define filter
crop_filter = f"crop={final_w}:{final_h}:{x_min}:{y_min}"
tmp_output_path = output_video_path + ".tmp.mp4"
cpu_cmd = [
"ffmpeg",
"-y", "-loglevel", "error",
"-i", video_path,
"-vf", crop_filter,
"-c:a", "copy",
"-c:v", "libx264",
"-preset", "ultrafast",
"-crf", "23",
tmp_output_path
]
# Execute
print(f"🔄 Iniciando ffmpeg crop...")
t_ffmpeg = time.time()
crop_success = False
if has_nvenc:
nvenc_cmd = [
"ffmpeg",
"-y", "-loglevel", "error",
"-i", video_path,
"-vf", crop_filter,
"-c:a", "copy",
"-c:v", "h264_nvenc",
"-preset", "fast",
"-cq", "20",
tmp_output_path
]
try:
subprocess.run(nvenc_cmd, check=True, capture_output=True)
print(f"✅ Video crop concluído (NVENC) em {time.time() - t_ffmpeg:.1f}s")
crop_success = True
except subprocess.CalledProcessError:
print(f"⚠️ NVENC indisponível, usando CPU fallback...")
# CPU fallback
if not crop_success:
t_cpu = time.time()
try:
subprocess.run(cpu_cmd, check=True)
print(f"✅ Video crop concluído (CPU) em {time.time() - t_cpu:.1f}s")
crop_success = True
except subprocess.CalledProcessError as e:
print(f"❌ Video crop falhou (CPU): {e}")
return False
if crop_success:
if text_cut:
# Pass 2: Text crop
print("🔄 Iniciando verificação de texto para segundo crop...")
text_crop_status = detect_and_crop_text(tmp_output_path, output_video_path)
if text_crop_status == "success":
if os.path.exists(tmp_output_path):
os.remove(tmp_output_path)
return "success"
elif text_crop_status == "aborted_area_too_small":
if os.path.exists(tmp_output_path):
os.remove(tmp_output_path)
return "aborted_area_too_small"
else:
# skipped or error in text crop, keep the motion crop
shutil.move(tmp_output_path, output_video_path)
return "success"
else:
print("⏩ Pulando verificação OCR por configuração do usuário (text_cut=False).")
shutil.move(tmp_output_path, output_video_path)
return "success"
return "error"