from __future__ import annotations

import os
import sys
from pathlib import Path

try:
    import spaces
except ImportError:
    class _SpacesCompat:
        """Fallback shim so the app also runs where the `spaces` package is absent."""

        @staticmethod
        def GPU(*decorator_args, **decorator_kwargs):
            # Bare usage: `@spaces.GPU` passes the function in directly.
            if decorator_args and callable(decorator_args[0]) and len(decorator_args) == 1 and not decorator_kwargs:
                return decorator_args[0]

            # Parameterized usage: `@spaces.GPU(duration=...)` returns a decorator.
            def decorator(fn):
                return fn

            return decorator

    spaces = _SpacesCompat()
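# Both decorator forms supported by the shim (and by the real `spaces` package):
#   @spaces.GPU                    # bare form
#   def f(): ...
#   @spaces.GPU(duration=120)      # parameterized form
#   def g(): ...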
import gradio as gr
import torch

try:
    from huggingface_hub import snapshot_download
except Exception:
    snapshot_download = None

CURRENT_FILE = Path(__file__).resolve()
PROJECT_ROOT = CURRENT_FILE.parents[1]
for candidate in (CURRENT_FILE.parent, CURRENT_FILE.parents[1]):
    if (candidate / "infer").exists() and (candidate / "models").exists():
        PROJECT_ROOT = candidate
        break
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from demo.real_world_pipeline import (  # noqa: E402
    DEFAULT_BBOX_MODEL,
    DEFAULT_MODEL_REPO_ID,
    DEFAULT_REAL_CONFIG_PATH,
    DEFAULT_RUN_NAME,
    DEFAULT_WORK_DIR,
    run_real_world_pipeline,
)

DEFAULT_EXAMPLE_DIR = Path(
    os.environ.get(
        "SYNLAYERS_EXAMPLE_DIR",
        str(PROJECT_ROOT / "demo" / "examples"),
    )
)

HF_HOME = Path(os.environ.get("HF_HOME", str(Path.home() / ".cache" / "huggingface")))
HF_HOME.mkdir(parents=True, exist_ok=True)
os.environ["HF_HOME"] = str(HF_HOME)
# Opt into hf_transfer-accelerated downloads (requires the hf_transfer package).
os.environ.setdefault("HF_HUB_ENABLE_HF_TRANSFER", "1")
def read_int_env(name: str, default: int) -> int:
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError:
        return default


def clamp(value: int, low: int, high: int) -> int:
    return max(low, min(value, high))
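# Examples:
#   read_int_env("SYNLAYERS_ZERO_GPU_DURATION", 330) -> 330 when unset or non-integer
#   clamp(500, 60, 360) -> 360; clamp(30, 60, 360) -> 60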
ZERO_GPU_SIZE = (
    os.environ.get("SYNLAYERS_ZERO_GPU_SIZE", "large").strip() or "large"
).lower()

# ZeroGPU enforces a hard upper limit on the requested duration, so clamp the
# configured value into a conservative range rather than passing it through.
ZERO_GPU_DURATION = clamp(
    read_int_env("SYNLAYERS_ZERO_GPU_DURATION", 330),
    60,
    360,
)

MODEL_PREFETCH_STATUS = {
    "enabled": os.environ.get("SYNLAYERS_DISABLE_PREFETCH", "0") != "1",
    "bbox_model": str(DEFAULT_BBOX_MODEL),
    "main_model": str(os.environ.get("SYNLAYERS_MODEL_REPO") or DEFAULT_MODEL_REPO_ID),
    "bbox_done": False,
    "main_done": False,
    "error": "",
}
def is_hf_repo_id(path_or_repo: str | Path | None) -> bool:
    if path_or_repo is None:
        return False
    value = str(path_or_repo)
    if not value:
        return False
    # Explicit local paths are never repo ids.
    if value.startswith("/") or value.startswith("./") or value.startswith("../"):
        return False
    # An HF repo id usually looks like "namespace/repo" and has no local counterpart.
    return "/" in value and not Path(value).exists()
def prefetch_one_model(repo_id_or_path: str | Path | None, label: str) -> bool:
    if snapshot_download is None:
        MODEL_PREFETCH_STATUS["error"] += (
            f"\n- Cannot prefetch {label}: huggingface_hub.snapshot_download is unavailable."
        )
        return False
    if not is_hf_repo_id(repo_id_or_path):
        # Local paths need no prefetch; treat them as already available.
        return True
    repo_id = str(repo_id_or_path)
    try:
        snapshot_download(
            repo_id=repo_id,
            local_files_only=False,
            resume_download=True,
            allow_patterns=[
                "config.json",
                "generation_config.json",
                "preprocessor_config.json",
                "processor_config.json",
                "tokenizer.json",
                "tokenizer_config.json",
                "special_tokens_map.json",
                "merges.txt",
                "vocab.json",
                "*.py",
                "*.json",
                "*.safetensors",
                "*.safetensors.index.json",
                "*.bin",
                "*.pt",
            ],
            # Note: ignore_patterns take precedence over allow_patterns, so
            # "*.txt" must not appear here or merges.txt would never download.
            ignore_patterns=[
                ".git/*",
                "*.md",
                "*.png",
                "*.jpg",
                "*.jpeg",
                "*.webp",
                "*.mp4",
                "*.zip",
                "*.tar",
                "*.tar.gz",
            ],
        )
        return True
    except Exception as exc:
        MODEL_PREFETCH_STATUS["error"] += f"\n- Failed to prefetch {label} `{repo_id}`: {exc}"
        return False
def prefetch_model_assets() -> None:
    """Download model files before the ZeroGPU function is called.

    This does not instantiate the models. It only ensures the files are
    already in the Hugging Face cache, so download time is not counted
    inside @spaces.GPU. If model construction in run_real_world_pipeline()
    is still slow, the next step is to refactor demo/real_world_pipeline.py
    to cache model objects globally (see the commented sketch after this
    function).
    """
    if not MODEL_PREFETCH_STATUS["enabled"]:
        return
    bbox_ok = prefetch_one_model(DEFAULT_BBOX_MODEL, "bbox model")
    main_model = os.environ.get("SYNLAYERS_MODEL_REPO") or DEFAULT_MODEL_REPO_ID
    main_ok = prefetch_one_model(main_model, "main model")
    MODEL_PREFETCH_STATUS["bbox_done"] = bool(bbox_ok)
    MODEL_PREFETCH_STATUS["main_done"] = bool(main_ok)
# Run prefetch during Space startup, outside the ZeroGPU-decorated function.
prefetch_model_assets()
def list_example_images(limit: int = 6) -> list[list[str]]:
    if not DEFAULT_EXAMPLE_DIR.exists():
        return []
    candidates = []
    for ext in ("*.png", "*.jpg", "*.jpeg", "*.webp"):
        candidates.extend(DEFAULT_EXAMPLE_DIR.glob(ext))
    candidates = sorted(candidates)[:limit]
    return [[str(path)] for path in candidates]


def build_gallery(result: dict) -> list[tuple[str, str]]:
    gallery: list[tuple[str, str]] = []
    if result.get("whole_image_rgba"):
        gallery.append((result["whole_image_rgba"], "Whole RGBA"))
    if result.get("background_rgba"):
        gallery.append((result["background_rgba"], "Background RGBA"))
    for idx, path in enumerate(result.get("layer_images", [])):
        gallery.append((path, f"Layer {idx}"))
    return gallery
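# Example return value, as (filepath, caption) pairs accepted by gr.Gallery:
#   [("whole.png", "Whole RGBA"), ("bg.png", "Background RGBA"), ("l0.png", "Layer 0")]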
def get_gpu_name() -> str:
    if not torch.cuda.is_available():
        return "None"
    try:
        return torch.cuda.get_device_name(torch.cuda.current_device())
    except Exception as exc:
        return f"Unavailable ({exc})"


def is_zero_gpu_space() -> bool:
    accelerator = os.environ.get("ACCELERATOR", "").lower()
    return (
        os.environ.get("ZEROGPU_V2", "").lower() == "true"
        or os.environ.get("ZERO_GPU_PATCH_TORCH_DEVICE") == "1"
        or accelerator == "zerogpu"
        or accelerator.startswith("zero")
    )
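# Example: with ACCELERATOR="zerogpu" set in the environment, is_zero_gpu_space()
# returns True even when torch.cuda.is_available() is False at import time.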
def get_runtime_status_markdown() -> str:
    accelerator = os.environ.get("ACCELERATOR", "unknown")
    space_id = os.environ.get("SPACE_ID", "local")
    model_repo = os.environ.get("SYNLAYERS_MODEL_REPO") or DEFAULT_MODEL_REPO_ID
    zero_gpu_enabled = is_zero_gpu_space()
    lines = [
        "## Runtime Status",
        f"- `SPACE_ID`: `{space_id}`",
        f"- `ACCELERATOR`: `{accelerator}`",
        f"- `HF_HOME`: `{os.environ.get('HF_HOME', '')}`",
        f"- `SYNLAYERS_MODEL_REPO`: `{model_repo}`",
        "",
        "## Model Asset Prefetch",
        f"- `Prefetch enabled`: `{MODEL_PREFETCH_STATUS['enabled']}`",
        f"- `BBox model`: `{MODEL_PREFETCH_STATUS['bbox_model']}`",
        f"- `Main model`: `{MODEL_PREFETCH_STATUS['main_model']}`",
        f"- `BBox model files prefetched`: `{MODEL_PREFETCH_STATUS['bbox_done']}`",
        f"- `Main model files prefetched`: `{MODEL_PREFETCH_STATUS['main_done']}`",
    ]
    if MODEL_PREFETCH_STATUS["error"]:
        lines.extend(
            [
                "",
                "### Prefetch Warnings",
                MODEL_PREFETCH_STATUS["error"],
            ]
        )
    lines.append("")
    if zero_gpu_enabled:
        lines.extend(
            [
                "## ZeroGPU",
                "- `ZeroGPU mode`: `True`",
                f"- `Requested GPU size`: `{ZERO_GPU_SIZE}`",
                f"- `Requested max duration`: `{ZERO_GPU_DURATION}` seconds",
                f"- `CUDA probe outside @spaces.GPU`: `{torch.cuda.is_available()}`",
                "",
                "This Space is configured for Hugging Face ZeroGPU.",
                "A shared GPU is requested on demand when you click `Run Full Pipeline`.",
                "Model files are prefetched during Space startup, before the ZeroGPU function is called.",
                "If the first request still times out, the remaining bottleneck is model construction inside `run_real_world_pipeline()`.",
            ]
        )
    else:
        cuda_available = torch.cuda.is_available()
        lines.extend(
            [
                "## CUDA",
                f"- `CUDA available`: `{cuda_available}`",
                f"- `GPU device`: `{get_gpu_name()}`",
                "",
            ]
        )
        if accelerator == "none" or not cuda_available:
            lines.extend(
                [
                    "This Space is not currently running on a usable CUDA GPU.",
                    "The GPU type must be chosen by the Space owner in Hugging Face `Settings -> Hardware`.",
                    "Visitors cannot switch GPUs from inside the Gradio app.",
                ]
            )
        else:
            lines.append("The CUDA runtime is available, so the full SynLayers pipeline can run here.")
    return "\n".join(lines)
def run_demo_inference(
    image_path: str,
    sample_name: str,
    max_new_tokens: int,
    seed_value: float,
) -> dict:
    # gr.Number may yield None when cleared; treat that like a negative seed.
    seed = int(seed_value) if seed_value is not None and seed_value >= 0 else None
    return run_real_world_pipeline(
        image_path=image_path,
        sample_name=sample_name or None,
        work_dir=DEFAULT_WORK_DIR,
        bbox_model=DEFAULT_BBOX_MODEL,
        config_path=DEFAULT_REAL_CONFIG_PATH,
        max_new_tokens=int(max_new_tokens),
        seed=seed,
        run_name=DEFAULT_RUN_NAME,
    )
def run_demo(
    image_path: str,
    sample_name: str,
    max_new_tokens: int,
    seed_value: float,
):
    if not image_path:
        raise gr.Error("Please upload an input image first.")
    try:
        result = run_demo_inference(
            image_path=image_path,
            sample_name=sample_name,
            max_new_tokens=max_new_tokens,
            seed_value=seed_value,
        )
    except Exception as exc:
        raise gr.Error(str(exc)) from exc
    return (
        result["bbox_visualization"],
        result["merged_image"],
        result["bbox_record"].get("whole_caption", ""),
        result["bbox_record"],
        result["metadata"],
        build_gallery(result),
        result["archive_path"],
        result["case_dir"],
    )
with gr.Blocks(title="SynLayers Real-World Demo") as demo:
    gr.Markdown(
        """
# SynLayers Real-World Decomposition

Upload a single image and run the full pipeline in one step:

1. VLM whole-image captioning and bounding-box detection
2. SynLayers real-image layer decomposition

This Space runs on either a dedicated GPU Space or Hugging Face ZeroGPU.
The first request may still take time while Python modules and model objects are initialized.
Model files are prefetched during Space startup to avoid downloading large weights inside the ZeroGPU function.
"""
    )
    runtime_status = gr.Markdown(get_runtime_status_markdown())
    refresh_status_button = gr.Button("Refresh Runtime Status")

    with gr.Row():
        with gr.Column(scale=1):
            image_input = gr.Image(type="filepath", label="Input Image")
            sample_name_input = gr.Textbox(
                label="Optional Sample Name",
                placeholder="Leave empty to use the uploaded filename",
            )
            max_new_tokens_input = gr.Slider(
                minimum=128,
                maximum=2048,
                value=1024,
                step=64,
                label="VLM Max New Tokens",
            )
            seed_input = gr.Number(
                value=42,
                precision=0,
                label="Seed (-1 keeps config default)",
            )
            run_button = gr.Button("Run Full Pipeline", variant="primary")
        with gr.Column(scale=1):
            bbox_vis_output = gr.Image(type="filepath", label="Detected Bounding Boxes")
            merged_output = gr.Image(type="filepath", label="Merged Decomposition")
            caption_output = gr.Textbox(label="Whole Caption", lines=6)

    with gr.Row():
        bbox_json_output = gr.JSON(label="BBox JSON")
        meta_json_output = gr.JSON(label="Inference Metadata")

    layer_gallery = gr.Gallery(label="Predicted Layers", columns=4, height="auto")

    with gr.Row():
        archive_output = gr.File(label="Download Result Bundle")
        case_dir_output = gr.Textbox(label="Saved Case Directory")

    examples = list_example_images()
    if examples:
        gr.Examples(examples=examples, inputs=[image_input], label="Example Images")
    refresh_status_button.click(
        fn=get_runtime_status_markdown,
        outputs=runtime_status,
    )
    run_button.click(
        fn=run_demo,
        inputs=[
            image_input,
            sample_name_input,
            max_new_tokens_input,
            seed_input,
        ],
        outputs=[
            bbox_vis_output,
            merged_output,
            caption_output,
            bbox_json_output,
            meta_json_output,
            layer_gallery,
            archive_output,
            case_dir_output,
        ],
    )
if __name__ == "__main__":
    demo.queue().launch(
        server_name="0.0.0.0",
        server_port=int(os.environ.get("PORT", "7860")),
    )
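# Local usage (assuming this file is app.py at the expected spot in the repo,
# so that `demo.real_world_pipeline` resolves):
#   PORT=7860 python app.py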