| """ |
| Detecção e crop automático de vídeo baseado em análise de movimento. |
| Extrai a região ativa do vídeo removendo bordas pretas/brancas estáticas. |
| Adaptado de recurve-videos-export/detect_crop.py para uso no gemini worker. |
| """ |
| import cv2 |
| import numpy as np |
| import os |
| import subprocess |
| import time |
| import shutil |
| from concurrent.futures import ThreadPoolExecutor |
|
|
| |
# Process-wide cache for the EasyOCR reader; filled in on first request.
_easyocr_reader = None


def get_easyocr_reader():
    """Return the shared EasyOCR reader, building it on the first call.

    ``easyocr`` is imported inside the function, so importing this module
    does not by itself require the package; the import (and any error it
    raises) only happens when a reader is first requested.  The reader is
    created for English (``['en']``) with ``verbose=False`` and the same
    object is handed back on every later call.
    """
    global _easyocr_reader
    if _easyocr_reader is not None:
        return _easyocr_reader

    import easyocr

    _easyocr_reader = easyocr.Reader(['en'], verbose=False)
    return _easyocr_reader
|
|
def _largest_text_free_rect(boxes, w, h, margin):
    """Return the largest rectangle of a ``w`` x ``h`` frame that avoids all boxes.

    Each ``(min_x, min_y, max_x, max_y)`` box is grown by ``margin`` pixels and
    clamped to the frame.  Candidate rectangles are horizontal bands whose top
    and bottom edges lie on the frame border or on a padded box edge; within a
    band, the widest horizontal gap between the boxes that intersect it is
    taken.  The result is ``(x1, y1, x2, y2)``; the whole frame is returned
    when no candidate with positive area exists.
    """
    # Pad/clamp once (instead of once per band) and drop duplicates: static
    # text produces the same box on every sampled frame.
    padded = {
        (
            min(w, max(0, int(x1) - margin)),
            min(h, max(0, int(y1) - margin)),
            max(0, min(w, int(x2) + margin)),
            max(0, min(h, int(y2) + margin)),
        )
        for (x1, y1, x2, y2) in boxes
    }
    y_cuts = sorted({0, h} | {p[1] for p in padded} | {p[3] for p in padded})

    best_area = 0
    best_rect = (0, 0, w, h)

    for i, top in enumerate(y_cuts):
        for bottom in y_cuts[i + 1:]:
            band_h = bottom - top
            # A band can never hold more than w * band_h pixels, so skip it
            # when even the full width could not beat the current best.
            if w * band_h <= best_area:
                continue

            blocked = sorted(
                (bx1, bx2)
                for (bx1, by1, bx2, by2) in padded
                if by1 < bottom and by2 > top
            )

            # Sweep left to right, measuring every gap between blocked spans.
            cursor = 0
            for bx1, bx2 in blocked:
                if bx1 > cursor:
                    area = (bx1 - cursor) * band_h
                    if area > best_area:
                        best_area = area
                        best_rect = (cursor, top, bx1, bottom)
                cursor = max(cursor, bx2)

            if w > cursor:
                area = (w - cursor) * band_h
                if area > best_area:
                    best_area = area
                    best_rect = (cursor, top, w, bottom)

    return best_rect


def detect_and_crop_text(video_path, output_video_path):
    """Second crop pass: locate burned-in text with EasyOCR and crop it away.

    Roughly one frame per second of video is sampled and passed to EasyOCR on
    a small thread pool.  Detections with confidence above 0.4 and more than
    two non-blank characters are collected; the largest rectangle that avoids
    all of them (plus a 2% margin) becomes the crop, which is then encoded
    with ffmpeg (NVENC when available, libx264 otherwise).

    Args:
        video_path: Path of the video to analyse.
        output_video_path: Path the cropped video is written to.

    Returns:
        ``False`` when the input is missing or cannot be opened, EasyOCR is
        unavailable, no frame could be read, or no text was detected;
        ``"aborted_area_too_small"`` when the text-free rectangle covers less
        than half of the frame;
        ``"skipped"`` when the rectangle keeps at least 95% of both
        dimensions, so cropping is pointless;
        ``"success"`` when ffmpeg wrote the cropped video;
        ``"error"`` when ffmpeg failed or could not be started.
    """
    if not os.path.exists(video_path):
        return False

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            return False

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        try:
            reader = get_easyocr_reader()
        except Exception as e:
            print(f"⚠️ EasyOCR indisponível: {e}")
            return False

        fps = cap.get(cv2.CAP_PROP_FPS)
        # "not fps > 0" also catches NaN, which "fps <= 0" would let through.
        if not fps > 0:
            fps = 30
        duration = total_frames / fps
        num_samples = max(1, int(duration))
        indices = np.linspace(0, total_frames - 1, num_samples, dtype=int)

        frames = []
        for i in indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(i))
            ret, frame = cap.read()
            if ret:
                frames.append(frame)
    finally:
        # Release the capture on every exit path, including exceptions.
        cap.release()

    if not frames:
        return False

    all_boxes = []
    print(f"🔍 Analisando até {num_samples} frames (1 fps) em paralelo para encontrar texto hardcoded e legendas dinâmicas...")

    max_workers = min(4, (os.cpu_count() or 1))

    # Early-stop bookkeeping: count consecutive frames whose overall text
    # bounding box barely moved relative to the previous frame.
    stable_frames = 0
    last_union = None
    half_frames = max(3, int(num_samples * 0.5))

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(reader.readtext, f) for f in frames]

        # Results are consumed in submission (i.e. chronological) order.
        for future in futures:
            results = future.result()

            frame_xs = []
            frame_ys = []
            for (bbox, text, prob) in results:
                if prob > 0.4 and len(text.strip()) > 2:
                    xs = [pt[0] for pt in bbox]
                    ys = [pt[1] for pt in bbox]
                    min_x = int(min(xs))
                    min_y = int(min(ys))
                    max_x = int(max(xs))
                    max_y = int(max(ys))

                    frame_xs.extend([min_x, max_x])
                    frame_ys.extend([min_y, max_y])

                    all_boxes.append((min_x, min_y, max_x, max_y))
                    print(f"📖 Texto encontrado: '{text}' | Box: ({min_x},{min_y}) até ({max_x},{max_y})")

            frame_union = None
            if frame_xs and frame_ys:
                frame_union = (min(frame_xs), min(frame_ys), max(frame_xs), max(frame_ys))

            if frame_union is not None and last_union is not None:
                dx = abs(frame_union[0] - last_union[0]) + abs(frame_union[2] - last_union[2])
                dy = abs(frame_union[1] - last_union[1]) + abs(frame_union[3] - last_union[3])

                if dx < 40 and dy < 40:
                    stable_frames += 1
                else:
                    stable_frames = 0
            else:
                stable_frames = 0

            last_union = frame_union

            if stable_frames >= half_frames:
                print("⚡ Texto estático detectado iterativamente. Otimizando e abortando a leitura dos frames restantes!")
                # Only futures that have not started yet can be cancelled;
                # leaving the ``with`` block still waits for the running ones.
                for pending in futures:
                    pending.cancel()
                break

    if not all_boxes:
        print("✅ Nenhum aviso de texto significativo detectado.")
        return False

    margin = int(min(w, h) * 0.02)
    crop_x, crop_y, crop_max_x, crop_max_y = _largest_text_free_rect(all_boxes, w, h, margin)
    crop_w = crop_max_x - crop_x
    crop_h = crop_max_y - crop_y

    if (crop_w * crop_h) < (w * h * 0.5):
        print(f"❌ Região útil muito pequena (sobraria {crop_w * crop_h / (w * h):.0%}). Texto possivelmente no meio. Abortando crop inteligente.")
        return "aborted_area_too_small"

    if crop_w >= w * 0.95 and crop_h >= h * 0.95:
        print("✅ Região de texto é irrelevante, mantendo vídeo intacto.")
        return "skipped"

    # Round odd dimensions down to the next even value before encoding.
    if crop_w % 2 != 0:
        crop_w -= 1
    if crop_h % 2 != 0:
        crop_h -= 1

    print(f"✂️ Text Crop (Inteligente 2D): {crop_w}x{crop_h} @ ({crop_x},{crop_y})")

    has_nvenc = check_nvenc_support()
    filter_chain = f"crop={crop_w}:{crop_h}:{crop_x}:{crop_y}"

    ffmpeg_head = [
        "ffmpeg", "-y", "-loglevel", "error",
        "-i", video_path, "-vf", filter_chain,
        "-c:a", "copy",
    ]
    cmd_nvenc = ffmpeg_head + [
        "-c:v", "h264_nvenc",
        "-preset", "fast", "-cq", "20",
        output_video_path,
    ]
    cmd_cpu = ffmpeg_head + [
        "-c:v", "libx264",
        "-preset", "ultrafast", "-crf", "23",
        output_video_path,
    ]

    print("🔄 Iniciando ffmpeg text crop...")
    t0 = time.time()

    crop_success = False
    if has_nvenc:
        try:
            subprocess.run(cmd_nvenc, check=True, capture_output=True)
            print(f"✅ Text crop concluído (NVENC) em {time.time() - t0:.1f}s")
            crop_success = True
        except (subprocess.CalledProcessError, OSError):
            # The encoder can be listed by ffmpeg yet unusable at runtime.
            print("⚠️ NVENC indisponível para texto, usando CPU fallback...")

    if not crop_success:
        t_cpu = time.time()
        try:
            subprocess.run(cmd_cpu, check=True)
            print(f"✅ Text crop concluído (CPU) em {time.time() - t_cpu:.1f}s")
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing or non-executable ffmpeg binary.
            print(f"❌ Text crop falhou (CPU): {e}")
            return "error"

    return "success"
|
|
|
|
def check_nvenc_support():
    """Report whether the local ffmpeg lists the ``h264_nvenc`` encoder.

    Runs ``ffmpeg -encoders`` and searches its standard output for the encoder
    name.  Returns ``False`` when ffmpeg exits with a non-zero status or the
    executable cannot be found.
    """
    probe_cmd = ['ffmpeg', '-encoders']
    try:
        probe = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
    except (subprocess.CalledProcessError, FileNotFoundError):
        return False
    else:
        return 'h264_nvenc' in probe.stdout
|
|
|
|
def get_crop_detect_coords(video_path, limit=24, skip=5, duration=5):
    """Ask ffmpeg's ``cropdetect`` filter for the content area of a video.

    ffmpeg is run over ``duration`` seconds starting ``skip`` seconds into the
    file, and its diagnostic output (stderr) is scanned for ``crop=W:H:X:Y``
    suggestions.  The last suggestion found is the one returned.

    Args:
        video_path: Path of the video to analyse.
        limit: Value passed as the first ``cropdetect`` argument.
        skip: Seconds to seek into the input before analysing (``-ss``).
        duration: Seconds of video to analyse (``-t``).

    Returns:
        ``(w, h, x, y)`` as ints, or ``None`` when ffmpeg could not be run or
        its output contains no crop suggestion.
    """
    import re

    cmd = [
        "ffmpeg", "-ss", str(skip), "-i", video_path,
        "-t", str(duration), "-vf", f"cropdetect={limit}:16:0",
        "-f", "null", "-"
    ]
    print("🎬 Executando ffmpeg cropdetect...")

    try:
        # errors="replace": ffmpeg echoes container metadata on stderr, which
        # is not guaranteed to be valid text in the locale encoding.  Strict
        # decoding would raise and discard an otherwise usable result.
        result = subprocess.run(
            cmd, capture_output=True, text=True, errors="replace", check=False
        )
    except (OSError, subprocess.SubprocessError, ValueError) as e:
        print(f"⚠️ Erro ao executar cropdetect: {e}")
        return None

    matches = re.findall(r"crop=(\d+):(\d+):(\d+):(\d+)", result.stderr)
    if not matches:
        return None

    w, h, x, y = map(int, matches[-1])
    return w, h, x, y
|
|
|
|
def get_content_density_crop(frames, color_var_threshold=8, complexity_threshold=10, min_density=0.15):
    """Estimate the vertical extent of the picture content across frames.

    For every 3-dimensional frame, each row is scored two ways: the summed
    standard deviation of the R-G, G-B and B-R channel differences, and the
    fraction of horizontally adjacent grayscale pixels that differ by more
    than 15.  A row counts as content when the first score exceeds
    ``color_var_threshold`` or the second exceeds 0.40.  The longest run of
    consecutive content rows gives that frame's top and bottom; the medians
    over all frames, widened by 2 rows and clamped to the frame height, are
    returned.

    Args:
        frames: Sequence of image arrays; frames that are not 3-dimensional
            are skipped.  The height and width are taken from the first one.
        color_var_threshold: Minimum colour-difference spread for a row to
            count as content.
        complexity_threshold: Accepted for interface compatibility; not used
            by the current implementation.
        min_density: Accepted for interface compatibility; not used by the
            current implementation.

    Returns:
        ``(y_min, y_max)`` as ints, or ``None`` when ``frames`` is empty or no
        frame contains a content row.
    """
    if not frames:
        return None

    h, w = frames[0].shape[:2]
    tops = []
    bottoms = []

    for frame in frames:
        if frame.ndim != 3:
            continue

        # Signed 16-bit so the channel differences cannot wrap around.
        blue, green, red = cv2.split(frame.astype(np.int16))
        chroma_spread = (
            np.std(red - green, axis=1)
            + np.std(green - blue, axis=1)
            + np.std(blue - red, axis=1)
        )

        luma = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY).astype(np.int16)
        horizontal_step = np.abs(luma[:, 1:] - luma[:, :-1])
        edge_ratio = np.sum(horizontal_step > 15, axis=1) / w

        row_mask = (chroma_spread > color_var_threshold) | (edge_ratio > 0.40)
        content_rows = np.where(row_mask)[0]
        if len(content_rows) == 0:
            continue

        # Cut the row indices wherever they stop being consecutive and keep
        # the longest resulting run (the first one on ties).
        break_points = np.where(np.diff(content_rows) != 1)[0] + 1
        runs = np.split(content_rows, break_points)
        longest_run = max(runs, key=len)

        tops.append(longest_run[0])
        bottoms.append(longest_run[-1])

    if not tops or not bottoms:
        return None

    median_top = int(np.percentile(tops, 50))
    median_bottom = int(np.percentile(bottoms, 50))

    y_min = max(0, median_top - 2)
    y_max = min(h, median_bottom + 2)
    return y_min, y_max
|
|
|
|
def detect_and_crop_video(video_path, output_video_path, text_cut=True):
    """Crop a video to its active region, optionally followed by a text crop.

    The region is chosen in up to three steps:

    1. ffmpeg ``cropdetect`` (via ``get_crop_detect_coords``) is used when it
       would remove more than 10% of the frame area.
    2. Otherwise the differences between 15 evenly spaced frames are
       accumulated and the bounding box of all motion contours larger than
       500 px is taken.
    3. The vertical range is then intersected with the one suggested by
       ``get_content_density_crop``.

    The region is inset by 2 px, rounded down to even dimensions and encoded
    with ffmpeg (NVENC when available, libx264 otherwise) to
    ``output_video_path + ".tmp.mp4"``.  With ``text_cut`` enabled the
    temporary file is passed to ``detect_and_crop_text``; otherwise it is moved
    to ``output_video_path``.

    Args:
        video_path: Path of the source video.
        output_video_path: Path of the final cropped video.
        text_cut: Run the OCR-based second crop pass when true.

    Returns:
        ``False`` when the input is missing or unreadable, fewer than two
        frames could be sampled, no (significant) motion was found, the
        resulting region is empty, or ffmpeg failed;
        ``"success"`` when ``output_video_path`` was written, with or without
        the text crop applied;
        ``"aborted_area_too_small"`` when the text pass found too little
        text-free area; the temporary file is removed and no output is
        written in that case.
    """
    if not os.path.exists(video_path):
        print(f"Error: Video file not found at {video_path}")
        return False

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print("Error: Could not open video.")
            return False

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        num_samples = 15
        indices = np.linspace(0, total_frames - 1, num_samples, dtype=int)

        frames_gray = []
        frames_bgr = []
        for i in indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(i))
            ret, frame = cap.read()
            if ret:
                frames_bgr.append(frame)
                frames_gray.append(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY))
    finally:
        # Release the capture on every exit path, including exceptions.
        cap.release()

    if len(frames_gray) < 2:
        print(f"❌ Erro: Não foi possível ler frames suficientes ({len(frames_gray)}/{num_samples}) para análise.")
        return False

    # The area ratios below divide by w * h.
    if w <= 0 or h <= 0:
        print(f"❌ Erro: Dimensões de vídeo inválidas ({w}x{h}).")
        return False

    # --- Step 1: ffmpeg cropdetect -------------------------------------
    crop_coords = get_crop_detect_coords(video_path)

    use_motion_fallback = True
    if crop_coords:
        cw, ch, cx, cy = crop_coords
        reduction = (1 - (cw * ch) / (w * h)) * 100

        if reduction > 10:
            print(f"✅ Cropdetect sugeriu: {cw}x{ch} @ ({cx},{cy}) | Redução: {reduction:.1f}%")
            x_min, y_min, x_max, y_max = cx, cy, cx + cw, cy + ch
            use_motion_fallback = False
        else:
            print(f"⏩ Cropdetect sugeriu redução irrelevante ({reduction:.1f}%). Usando motion fallback...")

    # --- Step 2: motion analysis fallback ------------------------------
    if use_motion_fallback:
        print(f"🔍 Analisando movimento em {len(frames_gray)} frames amostrados...")

        accum_diff = np.zeros((h, w), dtype=np.float32)
        for prev_gray, next_gray in zip(frames_gray, frames_gray[1:]):
            diff = cv2.absdiff(prev_gray, next_gray)
            accum_diff = cv2.add(accum_diff, diff.astype(np.float32))

        accum_diff = cv2.normalize(accum_diff, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)
        _, thresh = cv2.threshold(accum_diff, 20, 255, cv2.THRESH_BINARY)

        # Close small holes first, then open to remove isolated specks.
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 15))
        thresh = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
        thresh = cv2.morphologyEx(thresh, cv2.MORPH_OPEN, kernel)

        contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        if not contours:
            print("❌ Aviso: Nenhum movimento detectado nos frames selecionados.")
            return False

        print(f"📊 Encontrados {len(contours)} contornos de movimento iniciais.")

        x_min, y_min = w, h
        x_max, y_max = 0, 0

        found_any = False
        for c in contours:
            if cv2.contourArea(c) > 500:
                found_any = True
                x, y, cw, ch = cv2.boundingRect(c)
                x_min = min(x_min, x)
                y_min = min(y_min, y)
                x_max = max(x_max, x + cw)
                y_max = max(y_max, y + ch)

        if not found_any:
            print("❌ Aviso: Nenhum movimento significativo (>500px area) detectado.")
            return False

        print(f"✅ Movimento consolidado na região: {x_min},{y_min} até {x_max},{y_max}")

    # --- Step 3: colour-density refinement of the vertical range -------
    density_coords = get_content_density_crop(frames_bgr)
    if density_coords:
        dy_min, dy_max = density_coords
        print(f"🎨 Refinamento de densidade sugeriu: Y de {dy_min} até {dy_max}")

        y_min = max(y_min, dy_min)
        y_max = min(y_max, dy_max)
        print(f"✨ Região refinada final: Y de {y_min} até {y_max}")

    # Pull every edge inwards by a couple of pixels.
    inset = 2
    x_min = min(x_min + inset, w)
    y_min = min(y_min + inset, h)
    x_max = max(x_max - inset, x_min)
    y_max = max(y_max - inset, y_min)

    final_w = x_max - x_min
    final_h = y_max - y_min

    # Round odd dimensions down to the next even value before encoding.
    if final_w % 2 != 0:
        final_w -= 1
    if final_h % 2 != 0:
        final_h -= 1

    # The steps above can disagree and leave an empty region (for example a
    # density range that does not overlap the detected box).  ffmpeg would
    # only fail on it, so stop here with the same result.
    if final_w <= 0 or final_h <= 0:
        print(f"❌ Região de crop inválida ({final_w}x{final_h}). Abortando crop.")
        return False

    reduction_pct = (1 - (final_w * final_h) / (w * h)) * 100
    print(f"✂️ Motion Crop: {final_w}x{final_h} @ ({x_min},{y_min}) | Redução de área: {reduction_pct:.1f}%")

    has_nvenc = check_nvenc_support()

    crop_filter = f"crop={final_w}:{final_h}:{x_min}:{y_min}"
    tmp_output_path = output_video_path + ".tmp.mp4"

    ffmpeg_head = [
        "ffmpeg",
        "-y", "-loglevel", "error",
        "-i", video_path,
        "-vf", crop_filter,
        "-c:a", "copy",
    ]
    cpu_cmd = ffmpeg_head + [
        "-c:v", "libx264",
        "-preset", "ultrafast",
        "-crf", "23",
        tmp_output_path,
    ]
    nvenc_cmd = ffmpeg_head + [
        "-c:v", "h264_nvenc",
        "-preset", "fast",
        "-cq", "20",
        tmp_output_path,
    ]

    print("🔄 Iniciando ffmpeg crop...")
    t_ffmpeg = time.time()

    crop_success = False
    if has_nvenc:
        try:
            subprocess.run(nvenc_cmd, check=True, capture_output=True)
            print(f"✅ Video crop concluído (NVENC) em {time.time() - t_ffmpeg:.1f}s")
            crop_success = True
        except (subprocess.CalledProcessError, OSError):
            # The encoder can be listed by ffmpeg yet unusable at runtime.
            print("⚠️ NVENC indisponível, usando CPU fallback...")

    if not crop_success:
        t_cpu = time.time()
        try:
            subprocess.run(cpu_cmd, check=True)
            print(f"✅ Video crop concluído (CPU) em {time.time() - t_cpu:.1f}s")
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing or non-executable ffmpeg binary.
            print(f"❌ Video crop falhou (CPU): {e}")
            # Do not leave a partially written temporary file behind.
            if os.path.exists(tmp_output_path):
                os.remove(tmp_output_path)
            return False

    if not text_cut:
        print("⏩ Pulando verificação OCR por configuração do usuário (text_cut=False).")
        shutil.move(tmp_output_path, output_video_path)
        return "success"

    print("🔄 Iniciando verificação de texto para segundo crop...")
    text_crop_status = detect_and_crop_text(tmp_output_path, output_video_path)

    if text_crop_status in ("success", "aborted_area_too_small"):
        # Either the text pass wrote the final file itself, or the result is
        # being rejected altogether; the intermediate file is not needed.
        if os.path.exists(tmp_output_path):
            os.remove(tmp_output_path)
        return text_crop_status

    # No text crop was applied (nothing found, skipped or failed): the
    # first-pass crop becomes the final output.
    shutil.move(tmp_output_path, output_video_path)
    return "success"