"""
MatAnyone adapter — Using Official API (File-Based)
(Enhanced logging, explicit error handling, and stage progress)

...
"""
| from __future__ import annotations |
| import os |
| import time |
| import logging |
| import tempfile |
| import importlib.metadata |
| from pathlib import Path |
| from typing import Optional, Callable, Tuple |
|
|
# Configure the root logger at INFO level when this module is imported.
# NOTE(review): logging.basicConfig does nothing if the root logger already
# has handlers, so a host application that configured logging first wins.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by every function and class in this file.
log = logging.getLogger(__name__)
|
|
| |
def _env_flag(name: str, default: str = "0") -> bool:
    """Interpret the environment variable *name* as a boolean switch.

    The value (or *default* when the variable is unset) is stripped of
    surrounding whitespace and compared case-insensitively against the
    accepted "true" spellings: ``1``, ``true``, ``yes`` and ``on``.
    Anything else counts as ``False``.
    """
    raw_value = os.getenv(name, default)
    normalized = raw_value.strip().lower()
    truthy_spellings = ("1", "true", "yes", "on")
    return normalized in truthy_spellings
|
|
# Progress-callback settings, read once from the environment at import time.
# MATANY_PROGRESS (default "1") switches progress callbacks on or off.
_PROGRESS_CB_ENABLED = _env_flag("MATANY_PROGRESS", "1")
# Minimum number of seconds between two identical progress messages
# (default 0.25). float() raises ValueError during import if
# MATANY_PROGRESS_MIN_SEC is set to something non-numeric.
_PROGRESS_MIN_INTERVAL = float(os.getenv("MATANY_PROGRESS_MIN_SEC", "0.25"))
# Mutable throttle state, updated by _emit_progress and shared by all callers:
_progress_last = 0.0  # time.time() of the last delivered message
_progress_last_msg = None  # text of the last delivered message
_progress_disabled = False  # set to True once a callback has raised
|
|
def _emit_progress(cb, pct: float, msg: str):
    """Forward a progress update to *cb*, throttling repeated messages.

    Nothing happens when *cb* is falsy, when progress reporting is switched
    off via ``_PROGRESS_CB_ENABLED``, or after an earlier callback failure
    has disabled reporting. A message identical to the previous one is
    dropped if it arrives within ``_PROGRESS_MIN_INTERVAL`` seconds.

    The callback is first tried as ``cb(pct, msg)``; if that raises
    ``TypeError`` it is retried as ``cb(msg)``. Any exception that escapes
    the callback is logged as a warning and permanently disables further
    progress reporting for this process; it is never propagated.
    """
    global _progress_last, _progress_last_msg, _progress_disabled

    if not cb:
        return
    if not _PROGRESS_CB_ENABLED or _progress_disabled:
        return

    stamp = time.time()
    arrived_too_soon = (stamp - _progress_last) < _PROGRESS_MIN_INTERVAL
    if arrived_too_soon and msg == _progress_last_msg:
        return

    try:
        try:
            cb(pct, msg)
        except TypeError:
            # Callback does not take a percentage; fall back to message only.
            cb(msg)
    except Exception as err:
        _progress_disabled = True
        log.warning("[progress-cb] disabled due to exception: %s", err)
    else:
        # Record the delivery only after the callback returned normally.
        _progress_last = stamp
        _progress_last_msg = msg
|
|
class MatAnyError(RuntimeError):
    """Error raised by this adapter when MatAnyone setup or processing fails."""
|
|
def _cuda_snapshot(device: Optional[str]) -> str:
    """Return a one-line summary of CUDA memory usage for logging.

    Args:
        device: Device string. A value of the form ``"cuda:<n>"`` selects
            device index ``n``; anything else (including ``None`` or a
            non-integer suffix) falls back to index 0.

    Returns:
        ``"CUDA: N/A"`` when CUDA is unavailable, otherwise the device index,
        device name, and allocated/reserved memory in GB. This function never
        raises: any failure (for example ``torch`` not being installed) is
        reported as ``"CUDA snapshot error: ..."``.
    """
    try:
        import torch

        cuda = torch.cuda
        if not cuda.is_available():
            return "CUDA: N/A"

        index = 0
        has_explicit_index = bool(device) and device.startswith("cuda:")
        if has_explicit_index:
            try:
                index = int(device.split(":")[1])
            except (ValueError, IndexError):
                index = 0

        bytes_per_gb = 1024 ** 3
        device_name = cuda.get_device_name(index)
        allocated_gb = cuda.memory_allocated(index) / bytes_per_gb
        reserved_gb = cuda.memory_reserved(index) / bytes_per_gb
        return f"device={index}, name={device_name}, alloc={allocated_gb:.2f}GB, reserved={reserved_gb:.2f}GB"
    except Exception as err:
        return f"CUDA snapshot error: {err!r}"
|
|
def _safe_empty_cache():
    """Best-effort release of PyTorch's cached CUDA memory.

    When CUDA is available, logs a memory snapshot for ``cuda:0``, calls
    ``torch.cuda.empty_cache()``, and logs a second snapshot. Does nothing
    when CUDA is unavailable.

    This function never raises: it is called from a ``finally`` clause, where
    a new exception would mask the one already propagating. Failures
    (including ``torch`` not being installed) are recorded at DEBUG level
    instead of being silently discarded.
    """
    try:
        import torch

        if not torch.cuda.is_available():
            return
        log.info(f"[MATANY] CUDA memory before empty_cache: {_cuda_snapshot('cuda:0')}")
        torch.cuda.empty_cache()
        log.info(f"[MATANY] CUDA memory after empty_cache: {_cuda_snapshot('cuda:0')}")
    except Exception as e:
        # Deliberately non-fatal, but leave a trace for debugging.
        log.debug("[MATANY] empty_cache skipped: %r", e)
|
|
class MatAnyoneSession:
    """
    Simple wrapper around MatAnyone's official API.
    Uses file-based input/output as designed by the MatAnyone authors.

    Attributes:
        device: Device string given by the caller, or ``"cuda"``/``"cpu"``
            chosen from ``torch.cuda.is_available()`` when none is given.
        precision: Lower-cased precision hint given by the caller.
        processor: ``matanyone.InferenceCore`` instance that does the matting.

    NOTE(review): ``device`` and ``precision`` are only stored on the
    instance; nothing in this class forwards them to ``InferenceCore``.
    Confirm whether that is intended.
    """

    def __init__(self, device: Optional[str] = None, precision: str = "auto"):
        """Create the MatAnyone inference core.

        Args:
            device: Device string; auto-detected when ``None`` or empty.
            precision: Precision hint, stored lower-cased.

        Raises:
            MatAnyError: If ``matanyone`` cannot be imported or
                ``InferenceCore`` fails to initialize. The original
                exception is attached as ``__cause__``.
        """
        log.info(f"[MatAnyoneSession.__init__] device={device}, precision={precision}")
        self.device = device or ("cuda" if self._cuda_available() else "cpu")
        self.precision = precision.lower()
        # The version lookup is purely informational; failure is not an error.
        try:
            version = importlib.metadata.version("matanyone")
            log.info(f"[MATANY] MatAnyone version: {version}")
        except Exception:
            log.info("[MATANY] MatAnyone version unknown")
        try:
            # Imported lazily so this module can be imported without matanyone.
            from matanyone import InferenceCore
            self.processor = InferenceCore("PeiqingYang/MatAnyone")
            log.info("[MATANY] MatAnyone InferenceCore initialized successfully")
        except Exception as e:
            log.error(f"[MatAnyoneSession.__init__] Failed to initialize MatAnyone: {e}", exc_info=True)
            raise MatAnyError(f"Failed to initialize MatAnyone: {e}") from e

    def _cuda_available(self) -> bool:
        """Return True if ``torch`` imports and reports CUDA; never raises."""
        try:
            import torch
            return torch.cuda.is_available()
        except Exception:
            return False

    def process_stream(
        self,
        video_path: Path,
        seed_mask_path: Optional[Path] = None,
        out_dir: Optional[Path] = None,
        progress_cb: Optional[Callable] = None,
    ) -> Tuple[Path, Path]:
        """Run MatAnyone video matting on a file and return the output paths.

        Args:
            video_path: Input video file; must exist.
            seed_mask_path: Optional seed mask file; must exist when given.
            out_dir: Output directory, created if missing. Defaults to
                ``<video directory>/matanyone_output``.
            progress_cb: Optional progress callback, invoked through
                ``_emit_progress``.

        Returns:
            ``(alpha_path, foreground_path)`` in that order. Note this is the
            reverse of the order returned by ``process_video``.

        Raises:
            MatAnyError: If an input file is missing, an expected output was
                not created, or the underlying processing call fails.
        """
        log.info(f"[MatAnyoneSession.process_stream] Start: video={video_path}, mask={seed_mask_path}, out_dir={out_dir}")
        video_path = Path(video_path)
        if not video_path.exists():
            log.error(f"[MatAnyoneSession.process_stream] Video file not found: {video_path}")
            raise MatAnyError(f"Video file not found: {video_path}")
        if seed_mask_path and not Path(seed_mask_path).exists():
            log.error(f"[MatAnyoneSession.process_stream] Seed mask not found: {seed_mask_path}")
            raise MatAnyError(f"Seed mask not found: {seed_mask_path}")
        out_dir = Path(out_dir) if out_dir else video_path.parent / "matanyone_output"
        out_dir.mkdir(parents=True, exist_ok=True)
        log.info(f"[MATANY] Processing video: {video_path}")
        log.info(f"[MATANY] Using mask: {seed_mask_path}")
        log.info(f"[MATANY] Output directory: {out_dir}")
        _emit_progress(progress_cb, 0.0, "Initializing MatAnyone processing...")
        try:
            # perf_counter is monotonic, so the elapsed time cannot be skewed
            # by wall-clock adjustments during a long run.
            start_time = time.perf_counter()
            _emit_progress(progress_cb, 0.1, "Running MatAnyone video matting...")
            foreground_path, alpha_path = self.processor.process_video(
                input_path=str(video_path),
                mask_path=str(seed_mask_path) if seed_mask_path else None,
                output_path=str(out_dir),
            )
            processing_time = time.perf_counter() - start_time
            log.info(f"[MATANY] Processing completed in {processing_time:.1f}s")
            log.info(f"[MATANY] Foreground output: {foreground_path}")
            log.info(f"[MATANY] Alpha output: {alpha_path}")
            fg_path = Path(foreground_path) if foreground_path else None
            al_path = Path(alpha_path) if alpha_path else None
            if not fg_path or not fg_path.exists():
                log.error(f"[MatAnyoneSession.process_stream] Foreground output not created: {fg_path}")
                raise MatAnyError(f"Foreground output not created: {fg_path}")
            if not al_path or not al_path.exists():
                log.error(f"[MatAnyoneSession.process_stream] Alpha output not created: {al_path}")
                raise MatAnyError(f"Alpha output not created: {al_path}")
            _emit_progress(progress_cb, 1.0, "MatAnyone processing complete")
            log.info("[MatAnyoneSession.process_stream] Success, returning paths.")
            return al_path, fg_path
        except MatAnyError:
            # Already logged with a precise message above; re-raise untouched
            # instead of logging it again and wrapping it a second time.
            raise
        except Exception as e:
            log.error(f"[MatAnyoneSession.process_stream] Processing failed: {e}", exc_info=True)
            raise MatAnyError(f"MatAnyone processing failed: {e}") from e
        finally:
            # Release cached GPU memory whether processing succeeded or not.
            _safe_empty_cache()
|
|
class MatAnyoneModel:
    """Wrapper class for MatAnyone to match app_hf.py interface"""

    def __init__(self, device="cuda"):
        """Store *device* and try to load the session immediately.

        Construction never raises on a load failure; check ``self.loaded``.
        """
        log.info(f"[MatAnyoneModel.__init__] device={device}")
        self.device = device
        self.session = None
        self.loaded = False
        self._load_model()

    def _load_model(self):
        """Create the ``MatAnyoneSession``; on failure log it and leave ``loaded`` False."""
        try:
            self.session = MatAnyoneSession(device=self.device, precision="auto")
            self.loaded = True
            log.info("[MatAnyoneModel._load_model] Loaded successfully")
        except Exception as e:
            log.error(f"[MatAnyoneModel._load_model] Error loading: {e}", exc_info=True)
            self.loaded = False

    def replace_background(self, video_path, masks, background_path):
        """Run MatAnyone matting on a video and return the foreground path.

        Args:
            video_path: Path to the input video.
            masks: Path (``str`` or ``Path``) to a seed mask. Any other type
                is ignored and processing runs without a mask.
            background_path: Accepted for interface compatibility but not
                used; no compositing is performed by this method.

        Returns:
            String path to the foreground output. Outputs are written to a
            freshly created temporary directory that is NOT deleted on
            success, so the returned path stays valid; the caller is
            responsible for removing it when done.

        Raises:
            MatAnyError: If the model is not loaded or processing fails.
        """
        import shutil  # local import: only needed for failure cleanup

        log.info("[MatAnyoneModel.replace_background] Start")
        if not self.loaded:
            log.error("[MatAnyoneModel.replace_background] Model not loaded")
            raise MatAnyError("MatAnyoneModel not loaded")
        output_dir = None
        try:
            video_path = Path(video_path)
            mask_path = Path(masks) if isinstance(masks, (str, Path)) else None
            # mkdtemp rather than TemporaryDirectory: the latter removes the
            # directory when its `with` block exits, which would delete the
            # output before the caller could read the returned path.
            output_dir = Path(tempfile.mkdtemp(prefix="matanyone_"))
            _alpha_path, fg_path = self.session.process_stream(
                video_path=video_path,
                seed_mask_path=mask_path,
                out_dir=output_dir,
                progress_cb=None,
            )
            log.info(f"[MatAnyoneModel.replace_background] Success, returning fg_path: {fg_path}")
            return str(fg_path)
        except Exception as e:
            # Nothing usable was produced; do not leak the temp directory.
            if output_dir is not None:
                shutil.rmtree(output_dir, ignore_errors=True)
            log.error(f"[MatAnyoneModel.replace_background] Error: {e}", exc_info=True)
            raise MatAnyError(f"Background replacement failed: {e}") from e
|
|
def create_matanyone_session(device=None):
    """Log the requested *device* and return a new ``MatAnyoneSession`` for it.

    Any ``MatAnyError`` raised while the session initializes propagates to
    the caller.
    """
    log.info(f"[create_matanyone_session] device={device}")
    new_session = MatAnyoneSession(device=device)
    return new_session
|
|
def run_matanyone_on_files(video_path, mask_path, output_dir, device="cuda", progress_callback=None):
    """Run MatAnyone on files in one call, creating a fresh session.

    Args:
        video_path: Input video path.
        mask_path: Seed mask path; a falsy value means "no mask".
        output_dir: Directory for the outputs.
        device: Device string passed to ``MatAnyoneSession``.
        progress_callback: Optional progress callback passed to
            ``process_stream``.

    Returns:
        ``(alpha_path, foreground_path)`` as strings on success, or
        ``(None, None)`` on any failure. Failures are logged, not raised.
    """
    log.info(f"[run_matanyone_on_files] Start: video={video_path}, mask={mask_path}, out={output_dir}, device={device}")
    try:
        matting = MatAnyoneSession(device=device)
        stream_kwargs = {
            "video_path": Path(video_path),
            "seed_mask_path": None if not mask_path else Path(mask_path),
            "out_dir": Path(output_dir),
            "progress_cb": progress_callback,
        }
        alpha_out, fg_out = matting.process_stream(**stream_kwargs)
        log.info(f"[run_matanyone_on_files] Success, returning (alpha, fg): {alpha_out}, {fg_out}")
        outcome = (str(alpha_out), str(fg_out))
    except Exception as exc:
        log.error(f"[run_matanyone_on_files] MatAnyone processing failed: {exc}", exc_info=True)
        outcome = (None, None)
    return outcome
|
|