| """ |
| Complete utils/__init__.py with all required functions |
| Provides direct implementations to avoid import recursion |
| """ |
|
|
| import cv2 |
| import numpy as np |
| from PIL import Image |
| import torch |
| import logging |
| from typing import Optional, Tuple, Dict, Any, List |
| import tempfile |
| import os |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
| PROFESSIONAL_BACKGROUNDS = { |
| "office": {"color": (240, 248, 255), "gradient": True}, |
| "studio": {"color": (32, 32, 32), "gradient": False}, |
| "nature": {"color": (34, 139, 34), "gradient": True}, |
| "abstract": {"color": (75, 0, 130), "gradient": True}, |
| "white": {"color": (255, 255, 255), "gradient": False}, |
| "black": {"color": (0, 0, 0), "gradient": False} |
| } |
|
|
| def validate_video_file(video_path: str) -> bool: |
| """Validate if video file is readable""" |
| try: |
| if not os.path.exists(video_path): |
| return False |
| |
| cap = cv2.VideoCapture(video_path) |
| if not cap.isOpened(): |
| return False |
| |
| ret, frame = cap.read() |
| cap.release() |
| return ret and frame is not None |
| |
| except Exception as e: |
| logger.error(f"Video validation failed: {e}") |
| return False |
|
|
| def segment_person_hq(frame: np.ndarray, use_sam2: bool = True) -> Optional[np.ndarray]: |
| """High-quality person segmentation using SAM2 or fallback methods""" |
| try: |
| if use_sam2: |
| |
| try: |
| from sam2.sam2_image_predictor import SAM2ImagePredictor |
| from sam2.build_sam import build_sam2 |
| from huggingface_hub import hf_hub_download |
| |
| |
| sam_checkpoint = hf_hub_download("facebook/sam2-hiera-base-plus", "sam2_hiera_b+.pt") |
| sam_model = build_sam2(model_name='sam2_hiera_base_plus_t', ckpt_path=sam_checkpoint) |
| predictor = SAM2ImagePredictor(sam_model) |
| |
| |
| predictor.set_image(frame) |
| |
| |
| h, w = frame.shape[:2] |
| center_point = np.array([[w//2, h//2]]) |
| center_label = np.array([1]) |
| |
| masks, scores, _ = predictor.predict( |
| point_coords=center_point, |
| point_labels=center_label, |
| multimask_output=False |
| ) |
| |
| return masks[0] if len(masks) > 0 else None |
| |
| except Exception as e: |
| logger.warning(f"SAM2 segmentation failed: {e}, falling back to simple method") |
| |
| |
| return _simple_person_segmentation(frame) |
| |
| except Exception as e: |
| logger.error(f"Person segmentation failed: {e}") |
| return None |
|
|
| def _simple_person_segmentation(frame: np.ndarray) -> np.ndarray: |
| """Simple person segmentation using color-based methods""" |
| |
| hsv = cv2.cvtColor(frame, cv2.COLOR_RGB2HSV) |
| |
| |
| |
| lower_green = np.array([40, 40, 40]) |
| upper_green = np.array([80, 255, 255]) |
| green_mask = cv2.inRange(hsv, lower_green, upper_green) |
| |
| |
| lower_white = np.array([0, 0, 200]) |
| upper_white = np.array([180, 30, 255]) |
| white_mask = cv2.inRange(hsv, lower_white, upper_white) |
| |
| |
| bg_mask = cv2.bitwise_or(green_mask, white_mask) |
| |
| |
| person_mask = cv2.bitwise_not(bg_mask) |
| |
| |
| kernel = np.ones((5, 5), np.uint8) |
| person_mask = cv2.morphologyEx(person_mask, cv2.MORPH_CLOSE, kernel) |
| person_mask = cv2.morphologyEx(person_mask, cv2.MORPH_OPEN, kernel) |
| |
| |
| return person_mask.astype(np.float32) / 255.0 |
|
|
| def refine_mask_hq(mask: np.ndarray, frame: np.ndarray, use_matanyone: bool = True) -> np.ndarray: |
| """High-quality mask refinement using MatAnyone or fallback methods""" |
| try: |
| if use_matanyone: |
| try: |
| from matanyone import InferenceCore |
| |
| |
| device = "cuda" if torch.cuda.is_available() else "cpu" |
| processor = InferenceCore(model_name="PeiqingYang/MatAnyone-v1.0", device=device) |
| |
| |
| frame_pil = Image.fromarray(frame.astype(np.uint8)) |
| mask_pil = Image.fromarray((mask * 255).astype(np.uint8)) |
| |
| |
| refined_mask = processor.infer(frame_pil, mask_pil) |
| |
| |
| return np.array(refined_mask).astype(np.float32) / 255.0 |
| |
| except Exception as e: |
| logger.warning(f"MatAnyone refinement failed: {e}, using simple refinement") |
| |
| |
| return _simple_mask_refinement(mask, frame) |
| |
| except Exception as e: |
| logger.error(f"Mask refinement failed: {e}") |
| return mask |
|
|
| def _simple_mask_refinement(mask: np.ndarray, frame: np.ndarray) -> np.ndarray: |
| """Simple mask refinement using OpenCV operations""" |
| |
| mask_uint8 = (mask * 255).astype(np.uint8) |
| |
| |
| mask_blurred = cv2.GaussianBlur(mask_uint8, (5, 5), 0) |
| |
| |
| mask_refined = cv2.bilateralFilter(mask_blurred, 9, 75, 75) |
| |
| |
| return mask_refined.astype(np.float32) / 255.0 |
|
|
| def replace_background_hq(frame: np.ndarray, mask: np.ndarray, background: np.ndarray) -> np.ndarray: |
| """High-quality background replacement with proper compositing""" |
| try: |
| |
| h, w = frame.shape[:2] |
| background_resized = cv2.resize(background, (w, h)) |
| |
| |
| if len(mask.shape) == 2: |
| mask_3d = np.stack([mask] * 3, axis=-1) |
| else: |
| mask_3d = mask |
| |
| |
| mask_feathered = _apply_feathering(mask_3d) |
| |
| |
| result = frame * mask_feathered + background_resized * (1 - mask_feathered) |
| |
| return result.astype(np.uint8) |
| |
| except Exception as e: |
| logger.error(f"Background replacement failed: {e}") |
| return frame |
|
|
| def _apply_feathering(mask: np.ndarray, feather_amount: int = 3) -> np.ndarray: |
| """Apply feathering to mask edges for smoother blending""" |
| if len(mask.shape) == 3: |
| |
| mask_single = mask[:, :, 0] |
| else: |
| mask_single = mask |
| |
| |
| mask_feathered = cv2.GaussianBlur(mask_single, (feather_amount*2+1, feather_amount*2+1), 0) |
| |
| |
| if len(mask.shape) == 3: |
| mask_feathered = np.stack([mask_feathered] * 3, axis=-1) |
| |
| return mask_feathered |
|
|
| def create_professional_background(bg_type: str, width: int, height: int) -> np.ndarray: |
| """Create professional background of specified type and size""" |
| try: |
| if bg_type not in PROFESSIONAL_BACKGROUNDS: |
| bg_type = "office" |
| |
| config = PROFESSIONAL_BACKGROUNDS[bg_type] |
| color = config["color"] |
| use_gradient = config["gradient"] |
| |
| if use_gradient: |
| |
| background = _create_gradient_background(color, width, height) |
| else: |
| |
| background = np.full((height, width, 3), color, dtype=np.uint8) |
| |
| return background |
| |
| except Exception as e: |
| logger.error(f"Background creation failed: {e}") |
| |
| return np.full((height, width, 3), (255, 255, 255), dtype=np.uint8) |
|
|
| def _create_gradient_background(base_color: Tuple[int, int, int], width: int, height: int) -> np.ndarray: |
| """Create a gradient background from base color""" |
| |
| r, g, b = base_color |
| |
| |
| dark_color = (int(r * 0.7), int(g * 0.7), int(b * 0.7)) |
| |
| |
| background = np.zeros((height, width, 3), dtype=np.uint8) |
| |
| for y in range(height): |
| |
| blend = y / height |
| |
| |
| current_r = int(dark_color[0] * (1 - blend) + r * blend) |
| current_g = int(dark_color[1] * (1 - blend) + g * blend) |
| current_b = int(dark_color[2] * (1 - blend) + b * blend) |
| |
| background[y, :] = [current_r, current_g, current_b] |
| |
| return background |
|
|
| |
| __all__ = [ |
| "segment_person_hq", |
| "refine_mask_hq", |
| "replace_background_hq", |
| "create_professional_background", |
| "PROFESSIONAL_BACKGROUNDS", |
| "validate_video_file" |
| ] |
|
|