| import os |
| import sys |
| import shutil |
| import threading |
| import time |
| from pathlib import Path |
| from typing import Any, Optional |
|
|
| import gradio as gr |
| import numpy as np |
| import torch |
| from PIL import Image |
|
|
| |
# Hugging Face Hub auth fallback: when neither standard token variable holds a
# value, reuse the secret exposed as "near" (or "NEAR") as HF_TOKEN.
if not (os.environ.get("HF_TOKEN") or os.environ.get("HUGGING_FACE_HUB_TOKEN")):
    _hub_tok = os.environ.get("near") or os.environ.get("NEAR") or ""
    _hub_tok = _hub_tok.strip()
    if _hub_tok:
        os.environ["HF_TOKEN"] = _hub_tok
        print("[HyShape] HF_TOKEN unset; using Space secret 'near' as HF_TOKEN.", flush=True)
|
|
| |
# Read the requested ZeroGPU ceiling (seconds); a non-integer value falls back to 90.
try:
    _raw_zerogpu_cap = int(os.environ.get("NEAR_ZEROGPU_HF_CEILING_S", "90"))
except ValueError:
    _raw_zerogpu_cap = 90

# Clamp the ceiling into the range [15, 120].
_ZEROGPU_ENV_CAP_S = max(15, min(_raw_zerogpu_cap, 120))

# Lower any related duration variables that exceed the ceiling. Unset or
# non-integer values are left untouched.
for _env_key in ("NEAR_ZEROGPU_MAX_SECONDS", "NEAR_ZEROGPU_DURATION_CAP"):
    if _env_key not in os.environ:
        continue
    try:
        _requested_s = int(os.environ[_env_key])
    except ValueError:
        continue
    if _requested_s > _ZEROGPU_ENV_CAP_S:
        os.environ[_env_key] = str(_ZEROGPU_ENV_CAP_S)

print(
    f"[HyShape] ZeroGPU cap set to {_ZEROGPU_ENV_CAP_S}s. Callbacks use plain spaces.GPU.",
    flush=True,
)
|
|
# `spaces` is optional: when it cannot be imported the name is bound to None
# and the GPU decorator below degrades to a no-op.
try:
    import spaces
except ImportError:
    spaces = None

# Put ./hy3dshape at the front of the import path (presumably so the
# `hy3dshape` imports used later in this file resolve from there).
sys.path.insert(0, "./hy3dshape")

# Backend defaults; each applies only when the variable is not already set.
for _env_name, _env_default in (
    ("ATTN_BACKEND", "xformers"),
    ("SPCONV_ALGO", "native"),
    ("TORCH_CUDA_ARCH_LIST", "7.5;8.0;8.6;8.9;9.0"),
):
    os.environ.setdefault(_env_name, _env_default)

# Decorator applied to GPU callbacks: the real `spaces.GPU` when available,
# otherwise an identity decorator that returns the function unchanged.
if spaces is None:
    def GPU(f):
        return f
else:
    GPU = spaces.GPU
|
|
|
|
def _truthy_env(name: str, default: str) -> bool:
    """Interpret the environment variable *name* as a boolean flag.

    Args:
        name: Environment variable to read.
        default: Text used when the variable is not set at all.

    Returns:
        True when the trimmed, lower-cased value is one of
        ``"1"``, ``"true"``, ``"yes"`` or ``"on"``; False otherwise
        (including when the variable is set to an empty string).
    """
    raw = os.environ.get(name, default)
    return raw.strip().lower() in {"1", "true", "yes", "on"}
|
|
|
|
| |
# Whether to start the background CPU geometry preload at import time
# (controlled by NEAR_HYSHAPE_GEOMETRY_CPU_PRELOAD_AT_START, default "1").
_CPU_PRELOAD_AT_START = _truthy_env("NEAR_HYSHAPE_GEOMETRY_CPU_PRELOAD_AT_START", "1")
print(
    "[HyShape] background CPU geometry preload at start: "
    + ("enabled" if _CPU_PRELOAD_AT_START else "disabled")
    + " (NEAR_HYSHAPE_GEOMETRY_CPU_PRELOAD_AT_START, default 1).",
    flush=True,
)
|
|
# Filesystem layout, resolved relative to the directory containing this file.
APP_DIR = Path(__file__).resolve().parent
# Per-session scratch directories live under this folder; created on import.
CACHE_DIR = APP_DIR.joinpath("tmp_gradio_hyshape")
CACHE_DIR.mkdir(exist_ok=True)
DEFAULT_IMAGE = APP_DIR.joinpath("assets", "example_image", "T.png")
DEFAULT_PORT = 7860

# session id -> time.time() of the last touch; guarded by _SESSION_TOUCH_LOCK.
_SESSION_LAST_TOUCH: dict[str, float] = {}
_SESSION_TOUCH_LOCK = threading.Lock()

# Guards loading/moving of GEOMETRY_PIPELINE.
_MODEL_LOCK = threading.Lock()
# Guards lazy creation of _LIGHT_PREPROCESSOR.
_LIGHT_PREPROCESS_LOCK = threading.Lock()

# Lazily created singletons; None until first use.
_LIGHT_PREPROCESSOR: Optional[Any] = None
GEOMETRY_PIPELINE: Optional[Any] = None
|
|
|
|
def _path_is_git_lfs_pointer(path: Path) -> bool:
    """Report whether *path* looks like a Git LFS pointer stub.

    A path qualifies when it is a regular file of at most 512 bytes whose
    content begins with the Git LFS v1 spec line. Any ``OSError`` raised while
    inspecting the file is treated as "not a pointer".
    """
    lfs_signature = b"version https://git-lfs.github.com/spec/v1"
    try:
        looks_small_file = path.is_file() and path.stat().st_size <= 512
        if not looks_small_file:
            return False
        prefix = path.read_bytes()[:120]
    except OSError:
        return False
    return prefix.startswith(lfs_signature)
|
|
|
|
def _session_touch(session_id: str) -> None:
    """Record the current wall-clock time as the last activity of *session_id*."""
    with _SESSION_TOUCH_LOCK:
        touched_at = time.time()
        _SESSION_LAST_TOUCH[session_id] = touched_at
|
|
|
|
def _session_forget(session_id: str) -> None:
    """Drop the last-touch record for *session_id*; unknown ids are ignored."""
    with _SESSION_TOUCH_LOCK:
        if session_id in _SESSION_LAST_TOUCH:
            del _SESSION_LAST_TOUCH[session_id]
|
|
|
|
def ensure_session_dir(req: Optional[gr.Request]) -> Path:
    """Return the cache directory for the request's session, creating it if needed.

    The session id is read from ``req.session_hash``; a missing request or an
    empty hash maps to the common ``"shared"`` directory. The session's
    last-touch time is refreshed as a side effect.

    The hash arrives with the request, so it is treated as untrusted: only
    ASCII letters, digits, ``-`` and ``_`` are kept before it is joined onto
    ``CACHE_DIR``. Without this, a value such as ``"../.."`` or an absolute
    path would resolve outside the cache root, and callers pass the returned
    directory to ``shutil.rmtree``. Ids made only of the kept characters are
    unchanged.

    Args:
        req: The Gradio request for the current event, or None.

    Returns:
        Path of the (now existing) per-session directory under ``CACHE_DIR``.
    """
    raw_id = str(getattr(req, "session_hash", None) or "shared")
    # Strip path separators, dots and anything else that could alter the path.
    session_id = "".join(
        ch for ch in raw_id if (ch.isascii() and ch.isalnum()) or ch in "-_"
    ) or "shared"
    session_dir = CACHE_DIR / session_id
    session_dir.mkdir(parents=True, exist_ok=True)
    _session_touch(session_id)
    return session_dir
|
|
|
|
def clear_session_dir(req: Optional[gr.Request]) -> str:
    """Empty the caller's session directory and release cached CUDA memory.

    The directory is removed (errors ignored) and recreated empty. When CUDA
    is available, ``torch.cuda.empty_cache()`` is called as well.

    Returns:
        A fixed status message for the UI.
    """
    target = ensure_session_dir(req)
    shutil.rmtree(target, ignore_errors=True)
    target.mkdir(parents=True, exist_ok=True)

    cuda_present = torch.cuda.is_available()
    if cuda_present:
        torch.cuda.empty_cache()

    return "HyShape cache cleared."
|
|
|
|
def end_session(req: gr.Request) -> None:
    """Delete the session's cache directory and forget its last-touch record.

    The session id is read from ``req.session_hash`` (falling back to
    ``"shared"``). The hash arrives with the request, so it is reduced to
    ASCII letters, digits, ``-`` and ``_`` before being joined onto
    ``CACHE_DIR``. Otherwise a value containing ``..`` or an absolute path
    would make ``shutil.rmtree`` delete a directory outside the cache root.
    Ids made only of the kept characters are unchanged.

    Args:
        req: The Gradio request of the session being torn down.
    """
    raw_id = str(getattr(req, "session_hash", None) or "shared")
    # Strip path separators, dots and anything else that could alter the path.
    session_id = "".join(
        ch for ch in raw_id if (ch.isascii() and ch.isalnum()) or ch in "-_"
    ) or "shared"
    shutil.rmtree(CACHE_DIR / session_id, ignore_errors=True)
    _session_forget(session_id)
|
|
|
|
def _ensure_rgba(image: Image.Image) -> Image.Image:
    """Return *image* in RGBA mode.

    RGBA input is returned as-is (same object). RGB input gets a fully opaque
    alpha band appended. Every other mode goes through ``convert("RGBA")``.
    """
    mode = image.mode
    if mode == "RGBA":
        return image
    if mode != "RGB":
        return image.convert("RGBA")

    # RGB: keep the colour bands and add a constant 255 alpha band.
    red, green, blue = image.split()
    opaque = Image.new("L", image.size, 255)
    return Image.merge("RGBA", (red, green, blue, opaque))
|
|
|
|
def _flatten_rgba_on_matte(image: Image.Image, matte_rgb: tuple[float, float, float]) -> Image.Image:
    """Composite *image* over a solid background and return the RGB result.

    Args:
        image: Source image; converted to RGBA first if necessary.
        matte_rgb: Background colour; each channel is multiplied by 255 and
            rounded to an integer.

    Returns:
        An RGB image of the same size with the alpha channel flattened away.
    """
    foreground = _ensure_rgba(image)
    fill = tuple(int(round(value * 255)) for value in matte_rgb) + (255,)
    canvas = Image.new("RGBA", foreground.size, fill)
    flattened = Image.alpha_composite(canvas, foreground)
    return flattened.convert("RGB")
|
|
|
|
def _get_light_image_preprocessor():
    """Return the shared ``BackgroundRemover`` instance, creating it on first use.

    The instance is cached in the module-level ``_LIGHT_PREPROCESSOR``. Creation
    happens under ``_LIGHT_PREPROCESS_LOCK`` with a second None-check inside
    the lock, so concurrent first callers build it only once. The
    ``hy3dshape.rembg`` import is deferred until the instance is needed.
    """
    global _LIGHT_PREPROCESSOR
    if _LIGHT_PREPROCESSOR is None:
        with _LIGHT_PREPROCESS_LOCK:
            # Re-check: another thread may have finished while we waited.
            if _LIGHT_PREPROCESSOR is None:
                from hy3dshape.rembg import BackgroundRemover

                _LIGHT_PREPROCESSOR = BackgroundRemover()
                print("[HyShape] Background remover ready.", flush=True)
    return _LIGHT_PREPROCESSOR
|
|
|
|
def _preprocess_image_rgba_light(input_image: Image.Image) -> Image.Image:
    """Produce a 518x518 RGBA cut-out centred on the foreground object.

    Steps:
      1. If the input already carries a non-opaque alpha channel it is used
         as the cut-out. Otherwise the image is converted to RGB, shrunk so
         its longest side is at most 1024 px, and passed to the background
         remover.
      2. Pixels whose alpha exceeds ``0.8 * 255`` define the foreground
         bounding box. A square crop 1.2x the box's longer side, centred on
         the box, is taken.
      3. The crop is resized to 518x518 with Lanczos resampling.

    When no pixel passes the alpha threshold, the whole cut-out is resized
    without cropping.
    """
    target_size = (518, 518)

    image = _ensure_rgba(input_image)
    # "Has alpha" means at least one pixel is not fully opaque.
    has_alpha = image.mode == "RGBA" and not np.all(np.array(image)[:, :, 3] == 255)

    if has_alpha:
        cutout = image
    else:
        rgb = image.convert("RGB")
        longest_side = max(rgb.size)
        scale = min(1, 1024 / longest_side)
        if scale < 1:
            shrunk_size = (int(rgb.width * scale), int(rgb.height * scale))
            rgb = rgb.resize(shrunk_size, Image.Resampling.LANCZOS)
        remover = _get_light_image_preprocessor()
        cutout = remover(rgb)

    if cutout.mode != "RGBA":
        cutout = cutout.convert("RGBA")

    alpha = np.array(cutout)[:, :, 3]
    rows, cols = np.nonzero(alpha > 0.8 * 255)
    if rows.size == 0:
        # Nothing counts as foreground: skip cropping.
        return cutout.resize(target_size, Image.Resampling.LANCZOS).convert("RGBA")

    left, right = int(np.min(cols)), int(np.max(cols))
    top, bottom = int(np.min(rows)), int(np.max(rows))
    center_x = (left + right) / 2
    center_y = (top + bottom) / 2

    # Square side = longer bbox extent plus 20% margin, truncated to int.
    side = int(max(right - left, bottom - top) * 1.2)
    half = side // 2
    crop_box = (center_x - half, center_y - half, center_x + half, center_y + half)

    cropped = cutout.crop(crop_box)
    return cropped.resize(target_size, Image.Resampling.LANCZOS).convert("RGBA")
|
|
|
|
def preprocess_image_only(image_input: Optional[Image.Image]):
    """Run the lightweight preprocessing for an uploaded image.

    Returns:
        A 3-tuple ``(image, preview, status)``. With no input this is
        ``(None, None, "Upload an input image.")``; otherwise the same
        preprocessed RGBA image twice followed by a timing message.
    """
    if image_input is None:
        return None, None, "Upload an input image."

    t0 = time.time()
    processed = _preprocess_image_rgba_light(image_input)
    elapsed = time.time() - t0

    print(f"[HyShape] lightweight preprocess done in {elapsed:.1f}s", flush=True)
    status = f"Image preprocessed in {elapsed:.1f}s."
    return processed, processed, status
|
|
|
|
def _ensure_geometry_loaded_on_cpu_locked() -> Any:
    """Return the geometry pipeline, loading it on CPU on first call.

    Caller must hold ``_MODEL_LOCK``. Weights are loaded with ``device="cpu"``
    only (no ``.to(cuda)`` here). The model id comes from
    ``NEAR_HUNYUAN_PRETRAINED`` and defaults to ``tencent/Hunyuan3D-2.1``.
    The loaded pipeline is cached in the module-level ``GEOMETRY_PIPELINE``.
    """
    global GEOMETRY_PIPELINE
    if GEOMETRY_PIPELINE is None:
        # Deferred import: only needed the first time the pipeline is built.
        from hy3dshape.pipelines import Hunyuan3DDiTFlowMatchingPipeline

        model_id = os.environ.get("NEAR_HUNYUAN_PRETRAINED", "tencent/Hunyuan3D-2.1")
        t0 = time.time()
        print(f"[HyShape] Loading geometry on CPU from {model_id!r}...", flush=True)
        GEOMETRY_PIPELINE = Hunyuan3DDiTFlowMatchingPipeline.from_pretrained(model_id, device="cpu")
        load_seconds = time.time() - t0
        print(f"[HyShape] from_pretrained (CPU only) done in {load_seconds:.1f}s", flush=True)
    return GEOMETRY_PIPELINE
|
|
|
|
def preload_geometry_cpu_worker() -> None:
    """Thread target that loads the geometry pipeline on CPU.

    Runs in a daemon thread at Space startup; does not use ``@spaces.GPU``.
    Any exception is caught and logged so a failed preload never propagates
    out of the thread.
    """
    try:
        t0 = time.time()
        print("[HyShape] background: CPU geometry preload started", flush=True)
        with _MODEL_LOCK:
            _ensure_geometry_loaded_on_cpu_locked()
        took = time.time() - t0
        print(f"[HyShape] background: CPU geometry preload finished in {took:.1f}s", flush=True)
    except Exception as exc:
        # Best-effort preload: report and carry on.
        print(f"[HyShape] background: CPU geometry preload failed: {exc}", flush=True)
|
|
|
|
def start_geometry_cpu_preload_thread() -> None:
    """Start ``preload_geometry_cpu_worker`` in a named daemon thread."""
    worker = threading.Thread(
        name="hyshape-geometry-cpu-preload",
        target=preload_geometry_cpu_worker,
        daemon=True,
    )
    worker.start()
|
|
|
|
def ensure_geometry_on_cuda() -> Any:
    """Return the geometry pipeline, moved to CUDA when a GPU is available.

    Loads on CPU if needed, then moves to CUDA; intended to be called inside a
    ``@spaces.GPU`` callback. All work happens while holding ``_MODEL_LOCK``.
    When CUDA is unavailable the pipeline is returned still on CPU.
    """
    with _MODEL_LOCK:
        geometry = _ensure_geometry_loaded_on_cpu_locked()

        if not torch.cuda.is_available():
            print("[HyShape] CUDA unavailable in this callback; geometry stays on CPU.", flush=True)
            return geometry

        t0 = time.time()
        geometry.to("cuda")
        print(
            f"[HyShape] geometry on GPU (to() took {time.time() - t0:.1f}s)",
            flush=True,
        )
        return geometry
|
|
|
|
@GPU
@torch.inference_mode()
def generate_mesh(
    image_input: Optional[Image.Image],
    req: gr.Request,
    progress=gr.Progress(track_tqdm=True),
):
    """GPU callback: turn the input image into a GLB mesh.

    Sequence: validate the input, make sure it is a 518x518 preprocessed RGBA
    image (running the lightweight preprocess if its size differs), save the
    RGBA and white-matte RGB versions into the session directory, move the
    geometry pipeline to GPU, run it, and export the first result as
    ``hyshape_mesh.glb``.

    Args:
        image_input: Image from the UI, or None when nothing is uploaded.
        req: Gradio request; its ``session_hash`` selects the session directory.
        progress: Gradio progress reporter.

    Returns:
        ``(rgba_image, mesh_path_str, status_message)``.

    Raises:
        gr.Error: If *image_input* is None.
    """
    t_start = time.time()
    cuda_ok = torch.cuda.is_available()
    session_label = getattr(req, "session_hash", "shared")
    print(
        f"[HyShape] generate_mesh callback entered (cuda_available={cuda_ok}, session={session_label})",
        flush=True,
    )
    progress(0.05, desc="Entered GPU callback")

    if image_input is None:
        raise gr.Error("Please upload an input image.")

    out_dir = ensure_session_dir(req)

    # Anything that is not already 518x518 still needs preprocessing.
    rgba = _ensure_rgba(image_input)
    needs_preprocess = rgba.size != (518, 518)
    if needs_preprocess:
        rgba = _preprocess_image_rgba_light(rgba)

    rgba.save(out_dir / "input_preprocessed_rgba.png")

    # The pipeline receives an RGB image flattened onto a white matte.
    flattened = _flatten_rgba_on_matte(rgba, (1.0, 1.0, 1.0))
    flattened.save(out_dir / "input_processed.png")

    progress(0.2, desc="Moving geometry to GPU")
    pipeline = ensure_geometry_on_cuda()

    progress(0.5, desc="Generating geometry")
    t_mesh = time.time()
    results = pipeline(image=flattened)
    mesh = results[0]
    print(f"[HyShape] geometry generation done in {time.time() - t_mesh:.1f}s", flush=True)

    glb_path = out_dir / "hyshape_mesh.glb"
    mesh.export(glb_path)

    total_elapsed = time.time() - t_start
    print(f"[HyShape] generate_mesh total: {total_elapsed:.1f}s", flush=True)
    status = f"HyShape mesh ready in {total_elapsed:.1f}s."
    return rgba, str(glb_path), status
|
|
|
|
def build_app() -> gr.Blocks:
    """Build the Gradio Blocks UI and wire its events.

    Layout: a left column with the input image, the two buttons and (when any
    exist) example images; a right column with the status text, the
    preprocessed preview and the 3D mesh viewer.

    Returns:
        The assembled ``gr.Blocks`` instance (not yet queued or launched).
    """
    # Example PNGs shipped with the app; Git LFS pointer stubs are skipped.
    example_images = [
        [str(path)]
        for path in sorted((APP_DIR / "assets/example_image").glob("*.png"))
        if not _path_is_git_lfs_pointer(path)
    ]

    with gr.Blocks(title="HyShape ZeroGPU Probe", delete_cache=None) as demo:
        gr.Markdown(
            """
            ## HyShape ZeroGPU Probe
            This diagnostic app isolates the Hunyuan geometry path.

            - Upload an image or click an example.
            - The upload path only performs lightweight preprocessing.
            - `Generate Mesh` is the only place that requests ZeroGPU: it moves the CPU-loaded weights to GPU and runs inference.
            - By default a **background thread** loads Hunyuan on **CPU at container start** (no GPU lease). Disable with `NEAR_HYSHAPE_GEOMETRY_CPU_PRELOAD_AT_START=0`.
            """
        )

        with gr.Row(equal_height=False):
            # Left column: input and actions.
            with gr.Column(scale=1, min_width=360):
                image_input = gr.Image(
                    label="Input Image",
                    type="pil",
                    image_mode="RGBA",
                    # Pre-fill with the default image only if it exists on disk.
                    value=str(DEFAULT_IMAGE) if DEFAULT_IMAGE.exists() else None,
                    height=400,
                )
                mesh_button = gr.Button("Generate Mesh", variant="primary")
                clear_button = gr.Button("Clear Cache", variant="secondary")

                # Only add the examples gallery when at least one example was found.
                if example_images:
                    gr.Examples(
                        examples=example_images,
                        inputs=[image_input],
                        label="Example Images",
                    )

            # Right column: status and outputs.
            with gr.Column(scale=2, min_width=560):
                status_md = gr.Markdown("Ready.")
                processed_preview = gr.Image(
                    label="Preprocessed RGBA",
                    interactive=False,
                    height=320,
                )
                mesh_viewer = gr.Model3D(
                    label="Generated Mesh",
                    interactive=False,
                    height=520,
                )

        # Run end_session (session directory cleanup) on the unload event.
        demo.unload(end_session)

        # Upload: preprocess and write the result back into the input widget,
        # the preview and the status line.
        image_input.upload(
            preprocess_image_only,
            inputs=[image_input],
            outputs=[image_input, processed_preview, status_md],
        )

        # Generate: the GPU-decorated callback fills preview, viewer and status.
        mesh_button.click(
            generate_mesh,
            inputs=[image_input],
            outputs=[processed_preview, mesh_viewer, status_md],
        )

        # Clear: wipe the session directory, then blank the preview and viewer.
        clear_button.click(
            clear_session_dir,
            outputs=[status_md],
        ).then(
            lambda: (None, None),
            outputs=[processed_preview, mesh_viewer],
        )

    return demo
|
|
|
|
# Build the UI at import time so `demo` is available as a module-level name.
demo = build_app()
# Enable the request queue with max_size=2.
demo.queue(max_size=2)

# Optionally start loading the geometry weights on CPU in a background thread.
if _CPU_PRELOAD_AT_START:
    start_geometry_cpu_preload_thread()
|
|
|
|
if __name__ == "__main__":
    import argparse

    # Defaults come from the environment: host from GRADIO_SERVER_NAME, port
    # from PORT, then GRADIO_SERVER_PORT, then DEFAULT_PORT.
    default_host = os.environ.get("GRADIO_SERVER_NAME", "0.0.0.0")
    port_text = os.environ.get("PORT")
    if port_text is None:
        port_text = os.environ.get("GRADIO_SERVER_PORT", str(DEFAULT_PORT))
    default_port = int(port_text)

    cli = argparse.ArgumentParser()
    cli.add_argument("--host", type=str, default=default_host)
    cli.add_argument("--port", type=int, default=default_port)
    cli.add_argument("--share", action="store_true")
    options = cli.parse_args()

    demo.launch(
        server_name=options.host,
        server_port=options.port,
        share=options.share,
    )
|
|