| |
| """ |
| BackgroundFX Pro β Main Application Entry Point |
| Refactored modular architecture β orchestrates specialised components |
| """ |
| from __future__ import annotations |
|
|
| |
| import os |
|
|
| |
| os.environ.setdefault("OMP_NUM_THREADS", "2") |
| os.environ.setdefault("TOKENIZERS_PARALLELISM", "false") |
| os.environ.setdefault("PYTORCH_CUDA_ALLOC_CONF", "max_split_size_mb:128") |
| os.environ.setdefault("LOG_LEVEL", "info") |
| os.environ.setdefault("APP_ENV", "production") |
|
|
| |
| from pathlib import Path |
| _base_cache = Path.home() / ".cache" |
| os.environ.setdefault("HF_HOME", str(_base_cache / "huggingface")) |
| os.environ.setdefault("TRANSFORMERS_CACHE", str(_base_cache / "huggingface" / "hub")) |
| os.environ.setdefault("TORCH_HOME", str(_base_cache / "torch")) |
|
|
| |
| def _ensure_cloudinary_url(): |
| if os.getenv("CLOUDINARY_URL"): |
| return |
| key = os.getenv("CLOUDINARY_API_KEY") |
| sec = os.getenv("CLOUDINARY_API_SECRET") |
| name = os.getenv("CLOUDINARY_CLOUD_NAME") |
| if key and sec and name: |
| os.environ["CLOUDINARY_URL"] = f"cloudinary://{key}:{sec}@{name}" |
|
|
| _ensure_cloudinary_url() |
|
|
| |
| try: |
| import early_env |
| except Exception: |
| pass |
|
|
| import logging |
| import threading |
| import traceback |
| import sys |
| import time |
| from typing import Optional, Tuple, Dict, Any, Callable |
|
|
| |
| logging.basicConfig( |
| level=logging.INFO, |
| format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", |
| ) |
| logger = logging.getLogger("core.app") |
|
|
| |
| PROJECT_FILE = Path(__file__).resolve() |
| CORE_DIR = PROJECT_FILE.parent |
| ROOT = CORE_DIR.parent |
| if str(ROOT) not in sys.path: |
| sys.path.insert(0, str(ROOT)) |
|
|
| |
| loaders_dir = ROOT / "models" / "loaders" |
| loaders_dir.mkdir(parents=True, exist_ok=True) |
|
|
| |
| try: |
| import gradio_client.utils as gc_utils |
| _orig_get_type = gc_utils.get_type |
| def _patched_get_type(schema): |
| if not isinstance(schema, dict): |
| if isinstance(schema, bool): return "boolean" |
| if isinstance(schema, str): return "string" |
| if isinstance(schema, (int, float)): return "number" |
| return "string" |
| return _orig_get_type(schema) |
| gc_utils.get_type = _patched_get_type |
| logger.info("Gradio schema patch applied") |
| except Exception as e: |
| logger.warning(f"Gradio patch failed: {e}") |
|
|
| |
| try: |
| from config.app_config import get_config |
| except ImportError: |
| |
| class DummyConfig: |
| def to_dict(self): |
| return {} |
| get_config = lambda: DummyConfig() |
|
|
| from utils.hardware.device_manager import DeviceManager |
| from utils.system.memory_manager import MemoryManager |
|
|
| |
| try: |
| from models.loaders.model_loader import ModelLoader |
| logger.info("Using split loader architecture") |
| except ImportError: |
| logger.warning("Split loaders not found, using legacy loader") |
| |
| from models.model_loader import ModelLoader |
|
|
| from processing.video.video_processor import CoreVideoProcessor |
| from processing.audio.audio_processor import AudioProcessor |
| from utils.monitoring.progress_tracker import ProgressTracker |
| from utils.cv_processing import validate_video_file |
|
|
| |
| TWO_STAGE_AVAILABLE = False |
| TWO_STAGE_IMPORT_ORIGIN = "" |
| TWO_STAGE_IMPORT_ERROR = "" |
| CHROMA_PRESETS: Dict[str, Dict[str, Any]] = {"standard": {}} |
| TwoStageProcessor = None |
|
|
| |
| two_stage_paths = [ |
| "processors.two_stage", |
| "processing.two_stage.two_stage_processor", |
| "processing.two_stage", |
| ] |
| for import_path in two_stage_paths: |
| try: |
| exec(f"from {import_path} import TwoStageProcessor, CHROMA_PRESETS") |
| TWO_STAGE_AVAILABLE = True |
| TWO_STAGE_IMPORT_ORIGIN = import_path |
| logger.info(f"Two-stage import OK ({import_path})") |
| break |
| except Exception as e: |
| TWO_STAGE_IMPORT_ERROR = str(e) |
| continue |
|
|
| if not TWO_STAGE_AVAILABLE: |
| logger.warning(f"Two-stage import FAILED from all paths: {TWO_STAGE_IMPORT_ERROR}") |
|
|
| |
| |
| try: |
| from tools.startup_selfcheck import schedule_startup_selfcheck |
| except Exception: |
| schedule_startup_selfcheck = None |
|
|
| |
| class ModelLoadingError(Exception): |
| pass |
| class VideoProcessingError(Exception): |
| pass |
|
|
| |
| |
| |
| class VideoProcessor: |
| """ |
| Main orchestrator β coordinates all specialised components. |
| """ |
| def __init__(self): |
| self.config = get_config() |
| self._patch_config_defaults(self.config) |
| self.device_manager = DeviceManager() |
| self.memory_manager = MemoryManager(self.device_manager.get_optimal_device()) |
| self.model_loader = ModelLoader(self.device_manager, self.memory_manager) |
| self.audio_processor = AudioProcessor() |
| self.core_processor: Optional[CoreVideoProcessor] = None |
| self.two_stage_processor: Optional[Any] = None |
| self.models_loaded = False |
| self.loading_lock = threading.Lock() |
| self.cancel_event = threading.Event() |
| self.progress_tracker: Optional[ProgressTracker] = None |
| logger.info(f"VideoProcessor on device: {self.device_manager.get_optimal_device()}") |
|
|
| |
| @staticmethod |
| def _patch_config_defaults(cfg: Any) -> None: |
| defaults = { |
| |
| "use_nvenc": False, |
| "prefer_mp4": True, |
| "video_codec": "mp4v", |
| "audio_copy": True, |
| "ffmpeg_path": "ffmpeg", |
| |
| "max_model_size": 0, |
| "max_model_size_bytes": 0, |
| |
| "output_dir": str((Path(__file__).resolve().parent.parent) / "outputs"), |
| |
| "matanyone_enabled": True, |
| "use_matanyone": True, |
| } |
| for k, v in defaults.items(): |
| if not hasattr(cfg, k): |
| setattr(cfg, k, v) |
| Path(cfg.output_dir).mkdir(parents=True, exist_ok=True) |
|
|
| |
| def _init_progress(self, video_path: str, cb: Optional[Callable] = None): |
| try: |
| import cv2 |
| cap = cv2.VideoCapture(video_path) |
| total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
| cap.release() |
| if total <= 0: |
| total = 100 |
| self.progress_tracker = ProgressTracker(total, cb) |
| except Exception as e: |
| logger.warning(f"Progress init failed: {e}") |
| self.progress_tracker = ProgressTracker(100, cb) |
|
|
| |
| def load_models(self, progress_callback: Optional[Callable] = None) -> str: |
| with self.loading_lock: |
| if self.models_loaded: |
| return "Models already loaded and validated" |
| try: |
| self.cancel_event.clear() |
| if progress_callback: |
| progress_callback(0.0, f"Loading on {self.device_manager.get_optimal_device()}") |
| sam2_loaded, mat_loaded = self.model_loader.load_all_models( |
| progress_callback=progress_callback, cancel_event=self.cancel_event |
| ) |
| if self.cancel_event.is_set(): |
| return "Model loading cancelled" |
|
|
| |
| sam2_predictor = sam2_loaded.model if sam2_loaded else None |
| mat_model = mat_loaded.model if mat_loaded else None |
|
|
| |
| self.core_processor = CoreVideoProcessor(config=self.config, models=self.model_loader) |
|
|
| |
| self.two_stage_processor = None |
| if TWO_STAGE_AVAILABLE and TwoStageProcessor and (sam2_predictor or mat_model): |
| try: |
| self.two_stage_processor = TwoStageProcessor( |
| sam2_predictor=sam2_predictor, matanyone_model=mat_model |
| ) |
| logger.info("Two-stage processor initialised") |
| except Exception as e: |
| logger.warning(f"Two-stage init failed: {e}") |
| self.two_stage_processor = None |
|
|
| self.models_loaded = True |
| msg = self.model_loader.get_load_summary() |
|
|
| |
| if self.two_stage_processor: |
| msg += "\nβ
Two-stage processor ready" |
| else: |
| msg += "\nβ οΈ Two-stage processor not available" |
|
|
| if mat_model: |
| msg += "\nβ
MatAnyone refinement active" |
| else: |
| msg += "\nβ οΈ MatAnyone not loaded (edges may be rough)" |
|
|
| logger.info(msg) |
| return msg |
| except (AttributeError, ModelLoadingError) as e: |
| self.models_loaded = False |
| err = f"Model loading failed: {e}" |
| logger.error(err) |
| return err |
| except Exception as e: |
| self.models_loaded = False |
| err = f"Unexpected error during model loading: {e}" |
| logger.error(f"{err}\n{traceback.format_exc()}") |
| return err |
|
|
| |
| def process_video( |
| self, |
| video_path: str, |
| background_choice: str, |
| custom_background_path: Optional[str] = None, |
| progress_callback: Optional[Callable] = None, |
| use_two_stage: bool = False, |
| chroma_preset: str = "standard", |
| key_color_mode: str = "auto", |
| preview_mask: bool = False, |
| preview_greenscreen: bool = False, |
| ) -> Tuple[Optional[str], Optional[str], str]: |
| |
| logger.info("=" * 60) |
| logger.info("BACKGROUND PATH DEBUGGING") |
| logger.info(f"background_choice: {background_choice}") |
| logger.info(f"custom_background_path type: {type(custom_background_path)}") |
| logger.info(f"custom_background_path value: {custom_background_path}") |
|
|
| |
| if isinstance(custom_background_path, dict): |
| original = custom_background_path |
| custom_background_path = custom_background_path.get('name') or custom_background_path.get('path') |
| logger.info(f"Extracted path from dict: {original} -> {custom_background_path}") |
|
|
| |
| try: |
| from PIL import Image |
| if isinstance(custom_background_path, Image.Image): |
| import tempfile |
| with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp: |
| custom_background_path.save(tmp.name) |
| custom_background_path = tmp.name |
| logger.info(f"Saved PIL Image to: {custom_background_path}") |
| except ImportError: |
| pass |
|
|
| |
| if background_choice == "custom" or custom_background_path: |
| if custom_background_path: |
| if Path(custom_background_path).exists(): |
| logger.info(f"β
Background file exists: {custom_background_path}") |
| else: |
| logger.warning(f"β οΈ Background file does not exist: {custom_background_path}") |
| |
| import glob |
| patterns = [ |
| "/tmp/gradio*/**/*.jpg", |
| "/tmp/gradio*/**/*.jpeg", |
| "/tmp/gradio*/**/*.png", |
| "/tmp/**/*.jpg", |
| "/tmp/**/*.jpeg", |
| "/tmp/**/*.png", |
| ] |
| for pattern in patterns: |
| files = glob.glob(pattern, recursive=True) |
| if files: |
| |
| newest = max(files, key=os.path.getmtime) |
| logger.info(f"Found potential background: {newest}") |
| |
| if (time.time() - os.path.getmtime(newest)) < 300: |
| custom_background_path = newest |
| logger.info(f"β
Using recent temp file: {custom_background_path}") |
| break |
| else: |
| logger.error("β Custom background mode but path is None!") |
|
|
| logger.info(f"Final custom_background_path: {custom_background_path}") |
| logger.info("=" * 60) |
|
|
| if not self.models_loaded or not self.core_processor: |
| return None, None, "Models not loaded. Please click 'Load Models' first." |
| if self.cancel_event.is_set(): |
| return None, None, "Processing cancelled" |
|
|
| self._init_progress(video_path, progress_callback) |
| ok, why = validate_video_file(video_path) |
| if not ok: |
| return None, None, f"Invalid video: {why}" |
|
|
| try: |
| |
| mode = "two-stage" if use_two_stage else "single-stage" |
| matanyone_status = "enabled" if self.model_loader.get_matanyone() else "disabled" |
| logger.info(f"Processing video in {mode} mode, MatAnyone: {matanyone_status}") |
|
|
| |
| self._reset_matanyone_session() |
|
|
| if use_two_stage: |
| if not TWO_STAGE_AVAILABLE or self.two_stage_processor is None: |
| return None, None, "Two-stage processing not available" |
| final, green, msg = self._process_two_stage( |
| video_path, |
| background_choice, |
| custom_background_path, |
| progress_callback, |
| chroma_preset, |
| key_color_mode, |
| ) |
| return final, green, msg |
| else: |
| final, green, msg = self._process_single_stage( |
| video_path, |
| background_choice, |
| custom_background_path, |
| progress_callback, |
| preview_mask, |
| preview_greenscreen, |
| ) |
| return final, green, msg |
| except VideoProcessingError as e: |
| logger.error(f"Processing failed: {e}") |
| return None, None, f"Processing failed: {e}" |
| except Exception as e: |
| logger.error(f"Unexpected processing error: {e}\n{traceback.format_exc()}") |
| return None, None, f"Unexpected error: {e}" |
|
|
| |
| def _reset_matanyone_session(self): |
| """ |
| Ensure a fresh MatAnyone memory per video. The MatAnyone loader we use returns a |
| callable *stateful adapter*. If present, reset() clears its InferenceCore memory. |
| """ |
| try: |
| mat = self.model_loader.get_matanyone() |
| except Exception: |
| mat = None |
| if mat is not None and hasattr(mat, "reset") and callable(mat.reset): |
| try: |
| mat.reset() |
| logger.info("MatAnyone session reset for new video") |
| except Exception as e: |
| logger.warning(f"MatAnyone session reset failed (continuing): {e}") |
|
|
| |
| def _process_single_stage( |
| self, |
| video_path: str, |
| background_choice: str, |
| custom_background_path: Optional[str], |
| progress_callback: Optional[Callable], |
| preview_mask: bool, |
| preview_greenscreen: bool, |
| ) -> Tuple[Optional[str], Optional[str], str]: |
| |
| logger.info(f"[Single-stage] background_choice: {background_choice}") |
| logger.info(f"[Single-stage] custom_background_path: {custom_background_path}") |
|
|
| ts = int(time.time()) |
| out_dir = Path(self.config.output_dir) / "single_stage" |
| out_dir.mkdir(parents=True, exist_ok=True) |
| out_path = str(out_dir / f"processed_{ts}.mp4") |
|
|
| |
| result = self.core_processor.process_video( |
| input_path=video_path, |
| output_path=out_path, |
| bg_config={ |
| "background_choice": background_choice, |
| "custom_path": custom_background_path, |
| }, |
| progress_callback=progress_callback, |
| ) |
|
|
| if not result: |
| return None, None, "Video processing failed" |
|
|
| |
| if not (preview_mask or preview_greenscreen): |
| try: |
| final_path = self.audio_processor.add_audio_to_video( |
| original_video=video_path, processed_video=out_path |
| ) |
| except Exception as e: |
| logger.warning(f"Audio mux failed, returning video without audio: {e}") |
| final_path = out_path |
| else: |
| final_path = out_path |
|
|
| |
| try: |
| mat_loaded = bool(self.model_loader.get_matanyone()) |
| except Exception: |
| mat_loaded = False |
| matanyone_status = "β" if mat_loaded else "β" |
| msg = ( |
| "Processing completed.\n" |
| f"Frames: {result.get('frames', 'unknown')}\n" |
| f"Background: {background_choice}\n" |
| f"Mode: Single-stage\n" |
| f"MatAnyone: {matanyone_status}\n" |
| f"Device: {self.device_manager.get_optimal_device()}" |
| ) |
| return final_path, None, msg |
|
|
| |
| def _process_two_stage( |
| self, |
| video_path: str, |
| background_choice: str, |
| custom_background_path: Optional[str], |
| progress_callback: Optional[Callable], |
| chroma_preset: str, |
| key_color_mode: str, |
| ) -> Tuple[Optional[str], Optional[str], str]: |
| if self.two_stage_processor is None: |
| return None, None, "Two-stage processor not available" |
|
|
| |
| logger.info(f"[Two-stage] background_choice: {background_choice}") |
| logger.info(f"[Two-stage] custom_background_path: {custom_background_path}") |
|
|
| import cv2 |
| cap = cv2.VideoCapture(video_path) |
| if not cap.isOpened(): |
| return None, None, "Could not open input video" |
| w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 1280 |
| h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 720 |
| cap.release() |
|
|
| |
| try: |
| background = self.core_processor.prepare_background( |
| background_choice, custom_background_path, w, h |
| ) |
| except Exception as e: |
| logger.error(f"Background preparation failed: {e}") |
| return None, None, f"Failed to prepare background: {e}" |
| if background is None: |
| return None, None, "Failed to prepare background" |
|
|
| ts = int(time.time()) |
| out_dir = Path(self.config.output_dir) / "two_stage" |
| out_dir.mkdir(parents=True, exist_ok=True) |
| final_out = str(out_dir / f"final_{ts}.mp4") |
|
|
| chroma_cfg = CHROMA_PRESETS.get(chroma_preset, CHROMA_PRESETS.get("standard", {})) |
| logger.info(f"Two-stage with preset: {chroma_preset} | key_color: {key_color_mode}") |
|
|
| |
| final_path, green_path, stage2_msg = self.two_stage_processor.process_full_pipeline( |
| video_path, |
| background, |
| final_out, |
| key_color_mode=key_color_mode, |
| chroma_settings=chroma_cfg, |
| progress_callback=progress_callback, |
| ) |
| if final_path is None: |
| return None, None, stage2_msg |
|
|
| |
| try: |
| final_with_audio = self.audio_processor.add_audio_to_video( |
| original_video=video_path, processed_video=final_path |
| ) |
| except Exception as e: |
| logger.warning(f"Audio mux failed: {e}") |
| final_with_audio = final_path |
|
|
| try: |
| mat_loaded = bool(self.model_loader.get_matanyone()) |
| except Exception: |
| mat_loaded = False |
| matanyone_status = "β" if mat_loaded else "β" |
| msg = ( |
| "Two-stage processing completed.\n" |
| f"Background: {background_choice}\n" |
| f"Chroma Preset: {chroma_preset}\n" |
| f"MatAnyone: {matanyone_status}\n" |
| f"Device: {self.device_manager.get_optimal_device()}" |
| ) |
| return final_with_audio, green_path, msg |
|
|
| |
| def get_status(self) -> Dict[str, Any]: |
| status = { |
| "models_loaded": self.models_loaded, |
| "two_stage_available": bool(TWO_STAGE_AVAILABLE and self.two_stage_processor), |
| "two_stage_origin": TWO_STAGE_IMPORT_ORIGIN or "", |
| "device": str(self.device_manager.get_optimal_device()), |
| "core_processor_loaded": self.core_processor is not None, |
| "config": self._safe_config_dict(), |
| "memory_usage": self._safe_memory_usage(), |
| } |
|
|
| try: |
| status["sam2_loaded"] = self.model_loader.get_sam2() is not None |
| status["matanyone_loaded"] = self.model_loader.get_matanyone() is not None |
| status["model_info"] = self.model_loader.get_model_info() |
| except Exception: |
| status["sam2_loaded"] = False |
| status["matanyone_loaded"] = False |
|
|
| if self.progress_tracker: |
| status["progress"] = self.progress_tracker.get_all_progress() |
|
|
| return status |
|
|
| def _safe_config_dict(self) -> Dict[str, Any]: |
| try: |
| return self.config.to_dict() |
| except Exception: |
| keys = ["use_nvenc", "prefer_mp4", "video_codec", "audio_copy", |
| "ffmpeg_path", "max_model_size", "max_model_size_bytes", |
| "output_dir", "matanyone_enabled"] |
| return {k: getattr(self.config, k, None) for k in keys} |
|
|
| def _safe_memory_usage(self) -> Dict[str, Any]: |
| try: |
| return self.memory_manager.get_memory_usage() |
| except Exception: |
| return {} |
|
|
| def cancel_processing(self): |
| self.cancel_event.set() |
| logger.info("Cancellation requested") |
|
|
| def cleanup_resources(self): |
| try: |
| self.memory_manager.cleanup_aggressive() |
| except Exception: |
| pass |
| try: |
| self.model_loader.cleanup() |
| except Exception: |
| pass |
| logger.info("Resources cleaned up") |
|
|
| |
| processor = VideoProcessor() |
|
|
| def load_models_with_validation(progress_callback: Optional[Callable] = None) -> str: |
| return processor.load_models(progress_callback) |
|
|
| def process_video_fixed( |
| video_path: str, |
| background_choice: str, |
| custom_background_path: Optional[str], |
| progress_callback: Optional[Callable] = None, |
| use_two_stage: bool = False, |
| chroma_preset: str = "standard", |
| key_color_mode: str = "auto", |
| preview_mask: bool = False, |
| preview_greenscreen: bool = False, |
| ) -> Tuple[Optional[str], Optional[str], str]: |
| return processor.process_video( |
| video_path, |
| background_choice, |
| custom_background_path, |
| progress_callback, |
| use_two_stage, |
| chroma_preset, |
| key_color_mode, |
| preview_mask, |
| preview_greenscreen, |
| ) |
|
|
| def get_model_status() -> Dict[str, Any]: |
| return processor.get_status() |
|
|
| def get_cache_status() -> Dict[str, Any]: |
| return processor.get_status() |
|
|
| PROCESS_CANCELLED = processor.cancel_event |
|
|
| |
| def main(): |
| try: |
| logger.info("Starting BackgroundFX Pro") |
| logger.info(f"Device: {processor.device_manager.get_optimal_device()}") |
| logger.info(f"Two-stage available: {TWO_STAGE_AVAILABLE}") |
|
|
| |
| if schedule_startup_selfcheck is not None: |
| try: |
| schedule_startup_selfcheck(mode=os.getenv("SELF_CHECK_MODE", "async")) |
| except Exception as e: |
| logger.error(f"Startup self-check skipped: {e}", exc_info=True) |
|
|
| |
| try: |
| from models.loaders.model_loader import ModelLoader |
| logger.info("Using split loader architecture") |
| except Exception: |
| logger.info("Using legacy loader") |
|
|
| |
| |
| try: |
| |
| from ui import ui_components |
|
|
| |
| if hasattr(ui_components, 'create_interface'): |
| create_interface = ui_components.create_interface |
| else: |
| logger.error("create_interface not found in ui_components") |
| logger.error(f"Available attributes: {dir(ui_components)}") |
| raise ImportError("create_interface function not found") |
|
|
| except ImportError as e: |
| logger.error(f"Failed to import UI components: {e}") |
| import traceback |
| traceback.print_exc() |
|
|
| |
| try: |
| logger.info("Trying alternate import method...") |
| import importlib |
| ui_components = importlib.import_module('ui.ui_components') |
| create_interface = getattr(ui_components, 'create_interface') |
| logger.info("Alternate import successful") |
| except Exception as e2: |
| logger.error(f"Alternate import also failed: {e2}") |
| logger.info("System initialized but UI unavailable. Exiting.") |
| return |
|
|
| |
| try: |
| demo = create_interface() |
| demo.queue().launch( |
| server_name="0.0.0.0", |
| server_port=7860, |
| show_error=True, |
| debug=False, |
| ) |
| except Exception as e: |
| logger.error(f"Failed to launch Gradio interface: {e}") |
| import traceback |
| traceback.print_exc() |
|
|
| except Exception as e: |
| logger.error(f"Fatal error in main: {e}") |
| import traceback |
| traceback.print_exc() |
| finally: |
| processor.cleanup_resources() |
|
|
| if __name__ == "__main__": |
| main() |