""" Detecção e crop automático de vídeo baseado em análise de movimento. Extrai a região ativa do vídeo removendo bordas pretas/brancas estáticas. Adaptado de recurve-videos-export/detect_crop.py para uso no gemini worker. """ import cv2 import numpy as np import os import subprocess import time import shutil from concurrent.futures import ThreadPoolExecutor # EasyOCR reader loaded lazily _easyocr_reader = None def get_easyocr_reader(): global _easyocr_reader if _easyocr_reader is None: import easyocr # Disable easyocr verbose output to keep logs clean _easyocr_reader = easyocr.Reader(['en'], verbose=False) return _easyocr_reader def detect_and_crop_text(video_path, output_video_path): """ Second crop pass: Detect text using easyocr and crop if needed. Returns True if cropped, False otherwise. """ if not os.path.exists(video_path): return False cap = cv2.VideoCapture(video_path) if not cap.isOpened(): return False total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) try: reader = get_easyocr_reader() except Exception as e: print(f"⚠️ EasyOCR indisponível: {e}") cap.release() return False fps = cap.get(cv2.CAP_PROP_FPS) if fps <= 0: fps = 30 duration = total_frames / fps num_samples = max(1, int(duration)) # 1 frame per second indices = np.linspace(0, total_frames - 1, num_samples, dtype=int) frames = [] for i in indices: cap.set(cv2.CAP_PROP_POS_FRAMES, i) ret, frame = cap.read() if ret: frames.append(frame) cap.release() if not frames: return False all_boxes = [] print(f"🔍 Analisando até {num_samples} frames (1 fps) em paralelo para encontrar texto hardcoded e legendas dinâmicas...") def process_frame(f): return reader.readtext(f) # Usa multithreading para processar vários frames ao mesmo tempo, # garantindo uso máximo da CPU durante a inferência PyTorch. max_workers = min(4, (os.cpu_count() or 1)) stable_frames = 0 last_union = None half_frames = max(3, int(num_samples * 0.5)) with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = [executor.submit(process_frame, f) for f in frames] for future in futures: results = future.result() frame_xs = [] frame_ys = [] for (bbox, text, prob) in results: if prob > 0.4 and len(text.strip()) > 2: xs = [pt[0] for pt in bbox] ys = [pt[1] for pt in bbox] min_x = int(min(xs)) min_y = int(min(ys)) max_x = int(max(xs)) max_y = int(max(ys)) frame_xs.extend([min_x, max_x]) frame_ys.extend([min_y, max_y]) all_boxes.append((min_x, min_y, max_x, max_y)) print(f"📖 Texto encontrado: '{text}' | Box: ({min_x},{min_y}) até ({max_x},{max_y})") frame_union = None if frame_xs and frame_ys: frame_union = (min(frame_xs), min(frame_ys), max(frame_xs), max(frame_ys)) if frame_union is not None and last_union is not None: dx = abs(frame_union[0] - last_union[0]) + abs(frame_union[2] - last_union[2]) dy = abs(frame_union[1] - last_union[1]) + abs(frame_union[3] - last_union[3]) # Diferença de até 40 pixels total nas bordas é considerada tolerância/estável if dx < 40 and dy < 40: stable_frames += 1 else: stable_frames = 0 else: stable_frames = 0 last_union = frame_union if stable_frames >= half_frames: print(f"⚡ Texto estático detectado iterativamente. Otimizando e abortando a leitura dos frames restantes!") for f in futures: f.cancel() break if not all_boxes: print("✅ Nenhum aviso de texto significativo detectado.") return False margin = int(min(w, h) * 0.02) y_coords = [0, h] for (min_x, min_y, max_x, max_y) in all_boxes: y_coords.extend([max(0, int(min_y) - margin), min(h, int(max_y) + margin)]) y_coords = sorted(list(set(y_coords))) max_area = 0 best_rect = (0, 0, w, h) for i in range(len(y_coords)): for j in range(i + 1, len(y_coords)): y1, y2 = y_coords[i], y_coords[j] blocked_intervals = [] for b in all_boxes: b_min_x, b_min_y = max(0, int(b[0]) - margin), max(0, int(b[1]) - margin) b_max_x, b_max_y = min(w, int(b[2]) + margin), min(h, int(b[3]) + margin) # Intersects this horizontal strip? if b_min_y < y2 and b_max_y > y1: blocked_intervals.append((b_min_x, b_max_x)) blocked_intervals.sort() current_x = 0 for bx1, bx2 in blocked_intervals: if bx1 > current_x: area = (bx1 - current_x) * (y2 - y1) if area > max_area: max_area = area best_rect = (current_x, y1, bx1, y2) current_x = max(current_x, bx2) if w > current_x: area = (w - current_x) * (y2 - y1) if area > max_area: max_area = area best_rect = (current_x, y1, w, y2) crop_x, crop_y, crop_max_x, crop_max_y = best_rect crop_w = crop_max_x - crop_x crop_h = crop_max_y - crop_y if (crop_w * crop_h) < (w * h * 0.5): print(f"❌ Região útil muito pequena (sobraria {crop_w * crop_h / (w * h):.0%}). Texto possivelmente no meio. Abortando crop inteligente.") return "aborted_area_too_small" if crop_w >= w * 0.95 and crop_h >= h * 0.95: print("✅ Região de texto é irrelevante, mantendo vídeo intacto.") return "skipped" if crop_w % 2 != 0: crop_w -= 1 if crop_h % 2 != 0: crop_h -= 1 print(f"✂️ Text Crop (Inteligente 2D): {crop_w}x{crop_h} @ ({crop_x},{crop_y})") has_nvenc = check_nvenc_support() filter_chain = f"crop={crop_w}:{crop_h}:{crop_x}:{crop_y}" cmd_nvenc = [ "ffmpeg", "-y", "-loglevel", "error", "-i", video_path, "-vf", filter_chain, "-c:a", "copy", "-c:v", "h264_nvenc", "-preset", "fast", "-cq", "20", output_video_path ] cmd_cpu = [ "ffmpeg", "-y", "-loglevel", "error", "-i", video_path, "-vf", filter_chain, "-c:a", "copy", "-c:v", "libx264", "-preset", "ultrafast", "-crf", "23", output_video_path ] print(f"🔄 Iniciando ffmpeg text crop...") t0 = time.time() crop_success = False if has_nvenc: try: subprocess.run(cmd_nvenc, check=True, capture_output=True) print(f"✅ Text crop concluído (NVENC) em {time.time() - t0:.1f}s") crop_success = True except subprocess.CalledProcessError: print(f"⚠️ NVENC indisponível para texto, usando CPU fallback...") if not crop_success: t_cpu = time.time() try: subprocess.run(cmd_cpu, check=True) print(f"✅ Text crop concluído (CPU) em {time.time() - t_cpu:.1f}s") crop_success = True except subprocess.CalledProcessError as e: print(f"❌ Text crop falhou (CPU): {e}") return "error" return "success" def check_nvenc_support(): """Checks if h264_nvenc encoder is available in ffmpeg.""" try: result = subprocess.run(['ffmpeg', '-encoders'], capture_output=True, text=True, check=True) return 'h264_nvenc' in result.stdout except (subprocess.CalledProcessError, FileNotFoundError): return False def get_crop_detect_coords(video_path, limit=24, skip=5, duration=5): """ Uses ffmpeg cropdetect filter to find the content area (removing black bars). Returns (w, h, x, y) or None if detection fails. """ try: # Pula os primeiros segundos (skip) para evitar intros pretas, # analisa por 'duration' segundos. cmd = [ "ffmpeg", "-ss", str(skip), "-i", video_path, "-t", str(duration), "-vf", f"cropdetect={limit}:16:0", "-f", "null", "-" ] print(f"🎬 Executando ffmpeg cropdetect...") result = subprocess.run(cmd, capture_output=True, text=True, check=False) # O output do cropdetect sai no stderr output = result.stderr # Procurar pela última linha com 'crop=' import re matches = re.findall(r"crop=(\d+):(\d+):(\d+):(\d+)", output) if matches: # Pegar a última ocorrência para garantir que a detecção estabilizou w, h, x, y = map(int, matches[-1]) return w, h, x, y return None except Exception as e: print(f"⚠️ Erro ao executar cropdetect: {e}") return None def get_content_density_crop(frames, color_var_threshold=8, complexity_threshold=10, min_density=0.15): """ Analyzes row-by-row color variance and complexity to find the 'congruent line of colors'. Isolates colorful video frames from monochromatic text overlays. Returns (y_min, y_max). """ if not frames: return None num_frames = len(frames) h, w = frames[0].shape[:2] all_y_min = [] all_y_max = [] for frame in frames: if len(frame.shape) != 3: continue # 1. Color Variance Check (Crucial for 'Várias cores de forma congruente') # In monochrome text (white/black/gray), R, G, B are identical or very close. # Across a real video frame, colors vary significantly along the row. b, g, r = cv2.split(frame.astype(np.int16)) rg = r - g gb = g - b br = b - r # Variância de cor na linha color_variance = np.std(rg, axis=1) + np.std(gb, axis=1) + np.std(br, axis=1) # 2. Complexity Density (Variation across the row) # Identifica linhas que são complexas (movimento/textura) em vez de texto isolado gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY).astype(np.int16) diff = np.abs(gray[:, 1:] - gray[:, :-1]) row_complexity = np.sum(diff > 15, axis=1) / w # Unimos critérios: Deve ter variância de cor OU ser muito complexo # (Para suportar vídeos P&B, mantemos uma margem de complexidade alta) is_content = (color_variance > color_var_threshold) | (row_complexity > 0.40) # Linhas que superam os critérios de conteúdo congruente content_rows = np.where(is_content)[0] if len(content_rows) > 0: # Encontrar o maior bloco contínuo (pula texto isolado) diffs = np.diff(content_rows) # O split ocorre onde a diferença não é 1 (quebra na continuidade) splits = np.where(diffs != 1)[0] + 1 blocks = np.split(content_rows, splits) # Escolher o maior bloco contínuo em termos de número de linhas main_block = max(blocks, key=len) all_y_min.append(main_block[0]) all_y_max.append(main_block[-1]) if not all_y_min or not all_y_max: return None # Usamos o percentil 50 (mediana) para as fronteiras para estabilidade y_min = int(np.percentile(all_y_min, 50)) y_max = int(np.percentile(all_y_max, 50)) # Adicionamos uma margem de segurança de 2px para não cortar o frame real y_min = max(0, y_min - 2) y_max = min(h, y_max + 2) return y_min, y_max def detect_and_crop_video(video_path, output_video_path, text_cut=True): """ Detecta a região com movimento no vídeo e gera um vídeo cropado. Retorna True se o crop foi realizado, False caso contrário. """ if not os.path.exists(video_path): print(f"Error: Video file not found at {video_path}") return False cap = cv2.VideoCapture(video_path) if not cap.isOpened(): print("Error: Could not open video.") return False total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # Sample frames to detect motion num_samples = 15 indices = np.linspace(0, total_frames - 1, num_samples, dtype=int) frames_gray = [] frames_bgr = [] for i in indices: cap.set(cv2.CAP_PROP_POS_FRAMES, i) ret, frame = cap.read() if ret: frames_bgr.append(frame) gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) frames_gray.append(gray) cap.release() if len(frames_gray) < 2: print(f"❌ Erro: Não foi possível ler frames suficientes ({len(frames_gray)}/{num_samples}) para análise.") return False # --------------------------------------------------------- # Passo 1: Tentar detectar bordas via FFmpeg cropdetect # --------------------------------------------------------- crop_coords = get_crop_detect_coords(video_path) use_motion_fallback = True if crop_coords: cw, ch, cx, cy = crop_coords original_area = w * h crop_area = cw * ch reduction = (1 - crop_area / original_area) * 100 # Se houve uma redução significativa (>10%), confiamos no cropdetect if reduction > 10: print(f"✅ Cropdetect sugeriu: {cw}x{ch} @ ({cx},{cy}) | Redução: {reduction:.1f}%") x_min, y_min, x_max, y_max = cx, cy, cx + cw, cy + ch use_motion_fallback = False else: print(f"⏩ Cropdetect sugeriu redução irrelevante ({reduction:.1f}%). Usando motion fallback...") # --------------------------------------------------------- # Passo 2: Fallback para detecção de movimento (OpenCV) # --------------------------------------------------------- if use_motion_fallback: print(f"🔍 Analisando movimento em {len(frames_gray)} frames amostrados...") # Calculate accumulated difference accum_diff = np.zeros((h, w), dtype=np.float32) for i in range(len(frames_gray) - 1): diff = cv2.absdiff(frames_gray[i], frames_gray[i+1]) accum_diff = cv2.add(accum_diff, diff.astype(np.float32)) accum_diff = cv2.normalize(accum_diff, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8) _, thresh = cv2.threshold(accum_diff, 20, 255, cv2.THRESH_BINARY) kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 15)) thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel) thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel) contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if not contours: print("❌ Aviso: Nenhum movimento detectado nos frames selecionados.") return False print(f"📊 Encontrados {len(contours)} contornos de movimento iniciais.") x_min, y_min = w, h x_max, y_max = 0, 0 found_any = False for c in contours: if cv2.contourArea(c) > 500: found_any = True x, y, cw, ch = cv2.boundingRect(c) x_min = min(x_min, x) y_min = min(y_min, y) x_max = max(x_max, x + cw) y_max = max(y_max, y + ch) if not found_any: print("❌ Aviso: Nenhum movimento significativo (>500px area) detectado.") return False print(f"✅ Movimento consolidado na região: {x_min},{y_min} até {x_max},{y_max}") # --------------------------------------------------------- # Passo 3: Refinamento por Densidade de Conteúdo (Garante linha divisória congruente) # --------------------------------------------------------- density_coords = get_content_density_crop(frames_bgr) if density_coords: dy_min, dy_max = density_coords print(f"🎨 Refinamento de densidade sugeriu: Y de {dy_min} até {dy_max}") # Aplicamos o refinamento se ele for mais restritivo (interno) ou se o movimento falhou # Para evitar cortar o vídeo original por erro, conferimos se a área é razoável y_min = max(y_min, dy_min) y_max = min(y_max, dy_max) print(f"✨ Região refinada final: Y de {y_min} até {y_max}") # Inset Logic (2px) inset = 2 x_min = min(x_min + inset, w) y_min = min(y_min + inset, h) x_max = max(x_max - inset, x_min) y_max = max(y_max - inset, y_min) final_w = x_max - x_min final_h = y_max - y_min # Ensure crop dimensions are even if final_w % 2 != 0: final_w -= 1 if final_h % 2 != 0: final_h -= 1 reduction_pct = (1 - (final_w * final_h) / (w * h)) * 100 print(f"✂️ Motion Crop: {final_w}x{final_h} @ ({x_min},{y_min}) | Redução de área: {reduction_pct:.1f}%") # Check for NVENC support has_nvenc = check_nvenc_support() # Define filter crop_filter = f"crop={final_w}:{final_h}:{x_min}:{y_min}" tmp_output_path = output_video_path + ".tmp.mp4" cpu_cmd = [ "ffmpeg", "-y", "-loglevel", "error", "-i", video_path, "-vf", crop_filter, "-c:a", "copy", "-c:v", "libx264", "-preset", "ultrafast", "-crf", "23", tmp_output_path ] # Execute print(f"🔄 Iniciando ffmpeg crop...") t_ffmpeg = time.time() crop_success = False if has_nvenc: nvenc_cmd = [ "ffmpeg", "-y", "-loglevel", "error", "-i", video_path, "-vf", crop_filter, "-c:a", "copy", "-c:v", "h264_nvenc", "-preset", "fast", "-cq", "20", tmp_output_path ] try: subprocess.run(nvenc_cmd, check=True, capture_output=True) print(f"✅ Video crop concluído (NVENC) em {time.time() - t_ffmpeg:.1f}s") crop_success = True except subprocess.CalledProcessError: print(f"⚠️ NVENC indisponível, usando CPU fallback...") # CPU fallback if not crop_success: t_cpu = time.time() try: subprocess.run(cpu_cmd, check=True) print(f"✅ Video crop concluído (CPU) em {time.time() - t_cpu:.1f}s") crop_success = True except subprocess.CalledProcessError as e: print(f"❌ Video crop falhou (CPU): {e}") return False if crop_success: if text_cut: # Pass 2: Text crop print("🔄 Iniciando verificação de texto para segundo crop...") text_crop_status = detect_and_crop_text(tmp_output_path, output_video_path) if text_crop_status == "success": if os.path.exists(tmp_output_path): os.remove(tmp_output_path) return "success" elif text_crop_status == "aborted_area_too_small": if os.path.exists(tmp_output_path): os.remove(tmp_output_path) return "aborted_area_too_small" else: # skipped or error in text crop, keep the motion crop shutil.move(tmp_output_path, output_video_path) return "success" else: print("⏩ Pulando verificação OCR por configuração do usuário (text_cut=False).") shutil.move(tmp_output_path, output_video_path) return "success" return "error"