| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| import logging |
| from typing import Dict, Optional, Union |
|
|
| import numpy as np |
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| from diffusers import ( |
| AutoencoderKL, |
| DDIMScheduler, |
| DiffusionPipeline, |
| LCMScheduler, |
| UNet2DConditionModel, |
| ) |
| from diffusers.utils import BaseOutput |
| from PIL import Image |
| from torch.utils.data import DataLoader, TensorDataset |
| from torchvision.transforms import InterpolationMode |
| from torchvision.transforms.functional import pil_to_tensor, resize |
| from tqdm.auto import tqdm |
| from transformers import CLIPTextModel, CLIPTokenizer |
|
|
| from .util.batchsize import find_batch_size |
| from .util.ensemble import ensemble_depth |
| from .util.image_util import ( |
| chw2hwc, |
| colorize_depth_maps, |
| get_tv_resample_method, |
| resize_max_res, |
| ) |
| from DA2.depth_anything_v2.dpt import DepthAnythingV2 |
|
|
class MarigoldDepthOutput(BaseOutput):
    """
    Result container returned by `MarigoldPipeline.__call__`.

    Args:
        depth_np (`np.ndarray`):
            Predicted depth map. The pipeline clips the values to [0, 1] before returning.
        depth_colored (`None` or `PIL.Image.Image`):
            Colorized depth map as a PIL image (built from a uint8 height-width-channel array), or `None` when
            the pipeline is called with `color_map=None`.
        uncertainty (`None` or `np.ndarray`):
            Uncalibrated uncertainty (MAD, median absolute deviation) coming from ensembling, or `None` when no
            ensembling step was run.
    """

    depth_np: np.ndarray
    depth_colored: Optional[Image.Image]
    uncertainty: Optional[np.ndarray]
|
|
|
|
class MarigoldPipeline(DiffusionPipeline):
    """
    Pipeline for monocular depth estimation based on Marigold: https://marigoldmonodepth.github.io.

    In this variant a Depth Anything V2 (`DepthAnythingV2`) module first produces a depth estimate for the input
    image. That estimate and the RGB image are both encoded with the VAE encoder, their latents are concatenated
    along the channel dimension, and the U-Net is evaluated exactly once (with a fixed timestep of 1). The U-Net
    output is decoded with the VAE decoder into the final depth map.

    This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods the
    library implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)

    Args:
        unet (`UNet2DConditionModel`):
            Conditional U-Net that maps the concatenated (DA2-depth latent, RGB latent) input to a depth latent.
        vae (`AutoencoderKL`):
            Variational Auto-Encoder (VAE) Model to encode and decode images and depth maps
            to and from latent representations.
        text_encoder (`CLIPTextModel`):
            Text-encoder, for empty text embedding.
        tokenizer (`CLIPTokenizer`):
            CLIP tokenizer.
        scale_invariant (`bool`, *optional*, defaults to `True`):
            A model property specifying whether the predicted depth maps are scale-invariant. It is forwarded to
            `ensemble_depth` when ensembling is requested. When used together with `shift_invariant=True`, the
            model is also called "affine-invariant".
        shift_invariant (`bool`, *optional*, defaults to `True`):
            A model property specifying whether the predicted depth maps are shift-invariant. It is forwarded to
            `ensemble_depth` when ensembling is requested. When used together with `scale_invariant=True`, the
            model is also called "affine-invariant".
        default_denoising_steps (`int`, *optional*):
            Stored on the pipeline and in its config. The code in this class does not read it during inference.
        default_processing_resolution (`int`, *optional*):
            Value used for `processing_res` when the pipeline is called without one. If this is `None`, the caller
            must pass `processing_res` explicitly.

    Note:
        No scheduler is registered by `__init__`. The Depth Anything V2 weights are loaded from a hard-coded,
        working-directory-relative checkpoint path inside `__init__`.
    """

    rgb_latent_scale_factor = 0.18215
    depth_latent_scale_factor = 0.18215

    def __init__(
        self,
        unet: UNet2DConditionModel,
        vae: AutoencoderKL,
        text_encoder: CLIPTextModel,
        tokenizer: CLIPTokenizer,
        scale_invariant: Optional[bool] = True,
        shift_invariant: Optional[bool] = True,
        default_denoising_steps: Optional[int] = None,
        default_processing_resolution: Optional[int] = None,
    ):
        """
        Register the diffusion components and load the Depth Anything V2 prior.

        Raises whatever `torch.load` / `load_state_dict` raise when the DA2 checkpoint is missing or does not
        match the `vitb` architecture configured below.
        """
        super().__init__()
        self.register_modules(
            unet=unet,
            vae=vae,
            text_encoder=text_encoder,
            tokenizer=tokenizer,
        )
        self.register_to_config(
            scale_invariant=scale_invariant,
            shift_invariant=shift_invariant,
            default_denoising_steps=default_denoising_steps,
            default_processing_resolution=default_processing_resolution,
        )

        self.scale_invariant = scale_invariant
        self.shift_invariant = shift_invariant
        self.default_denoising_steps = default_denoising_steps
        self.default_processing_resolution = default_processing_resolution

        # Lazily filled by `encode_empty_text` on the first inference call.
        self.empty_text_embed = None

        # Not referenced anywhere else in this class; kept so external code reading it keeps working.
        self._fft_masks = {}

        # Depth Anything V2 prior (ViT-B configuration).
        da2_config = {
            'encoder': 'vitb',
            'features': 128,
            'out_channels': [96, 192, 384, 768],
        }
        self.da2 = DepthAnythingV2(**da2_config)
        # NOTE(security): `torch.load` unpickles the file; only point this at trusted checkpoints
        # (consider `weights_only=True` if the installed torch version supports it).
        # NOTE: the path is relative to the current working directory.
        da2_state_dict = torch.load(
            f'./DA2/checkpoints/depth_anything_v2_{da2_config["encoder"]}.pth',
            map_location='cpu',
        )
        self.da2.load_state_dict(da2_state_dict)
        # NOTE(review): the module is placed on CPU here, while `single_infer` feeds it tensors that
        # live on `self.device` — confirm that `infer_batch` or the caller handles device placement.
        self.da2.to(device="cpu").eval()

    @torch.no_grad()
    def __call__(
        self,
        input_image: Union[Image.Image, torch.Tensor],
        denoising_steps: Optional[int] = None,
        ensemble_size: int = 1,
        processing_res: Optional[int] = None,
        match_input_res: bool = True,
        resample_method: str = "bilinear",
        batch_size: int = 0,
        color_map: Optional[str] = "Spectral",
        show_progress_bar: bool = True,
        ensemble_kwargs: Optional[Dict] = None,
    ) -> MarigoldDepthOutput:
        """
        Function invoked when calling the pipeline.

        Args:
            input_image (`PIL.Image.Image` or `torch.Tensor`):
                Input image. A PIL image is converted to RGB. A tensor must be 4-dimensional with 3 channels
                (expected `[1, 3, H, W]`) and is divided by 255 during normalization, so it is expected to hold
                values in [0, 255].
            denoising_steps (`int`, *optional*, defaults to `None`):
                Accepted for interface compatibility; this method does not use it (the U-Net is evaluated once).
            ensemble_size (`int`, *optional*, defaults to `1`):
                Only one prediction is computed regardless of this value. When it is greater than 1, that single
                prediction is additionally passed through `ensemble_depth`, which also yields the uncertainty.
            processing_res (`int`, *optional*, defaults to `None`):
                Maximum edge resolution used for processing. When set to `0`, the image is processed at its
                original resolution. `None` resolves to `default_processing_resolution`.
            match_input_res (`bool`, *optional*, defaults to `True`):
                Resize the depth prediction back to the input resolution.
            resample_method (`str`, *optional*, defaults to `bilinear`):
                Resampling method used to resize images and depth predictions, resolved through
                `get_tv_resample_method`.
            batch_size (`int`, *optional*, defaults to `0`):
                Batch size of the internal data loader. Values that are not positive fall back to 1.
            color_map (`str`, *optional*, defaults to `"Spectral"`):
                Colormap used to colorize the depth map. Pass `None` to skip colorized depth map generation.
            show_progress_bar (`bool`, *optional*, defaults to `True`):
                Display a progress bar over the inference batches.
            ensemble_kwargs (`dict`, *optional*, defaults to `None`):
                Extra keyword arguments for `ensemble_depth`. A `max_res` entry overrides the default of 50.

        Returns:
            `MarigoldDepthOutput`: Output class for Marigold monocular depth prediction pipeline, including:
            - **depth_np** (`np.ndarray`) Predicted depth map, clipped to [0, 1]
            - **depth_colored** (`PIL.Image.Image`) Colorized depth map, `None` if `color_map` is `None`
            - **uncertainty** (`None` or `np.ndarray`) Uncalibrated uncertainty (MAD, median absolute deviation)
                    coming from ensembling. `None` if `ensemble_size` is not greater than 1

        Raises:
            ValueError: If neither `processing_res` nor `default_processing_resolution` is set.
            TypeError: If `input_image` is neither a PIL image nor a tensor.
            AssertionError: If `processing_res` is negative, the input shape is wrong, or the normalized image
                leaves the [-1, 1] range.
        """
        # ----------------- Resolve arguments -----------------
        if processing_res is None:
            processing_res = self.default_processing_resolution
        if processing_res is None:
            # Previously this surfaced as an opaque `TypeError` from `None >= 0`.
            raise ValueError(
                "`processing_res` was not given and the pipeline has no "
                "`default_processing_resolution`; pass `processing_res` explicitly "
                "(0 keeps the original image resolution)."
            )
        assert processing_res >= 0

        interpolation: InterpolationMode = get_tv_resample_method(resample_method)

        # ----------------- Image preprocessing -----------------
        if isinstance(input_image, Image.Image):
            # `convert("RGB")` also covers gray-scale / RGBA inputs.
            rgb = pil_to_tensor(input_image.convert("RGB")).unsqueeze(0)
        elif isinstance(input_image, torch.Tensor):
            rgb = input_image
        else:
            raise TypeError(f"Unknown input type: {type(input_image) = }")

        input_size = rgb.shape
        assert (
            4 == rgb.dim() and 3 == input_size[-3]
        ), f"Wrong input shape {input_size}, expected [1, rgb, H, W]"

        # Limit the longer edge to the processing resolution (0 keeps the original size).
        if processing_res > 0:
            rgb = resize_max_res(
                rgb,
                max_edge_resolution=processing_res,
                resample_method=interpolation,
            )

        # Map [0, 255] to [-1, 1].
        rgb_norm: torch.Tensor = rgb / 255.0 * 2.0 - 1.0
        rgb_norm = rgb_norm.to(self.dtype)
        assert rgb_norm.min() >= -1.0 and rgb_norm.max() <= 1.0

        # ----------------- Inference -----------------
        # The image is deliberately NOT replicated `ensemble_size` times: exactly one sample is run.
        single_rgb_dataset = TensorDataset(rgb_norm.expand(1, -1, -1, -1))
        loader_batch_size = batch_size if batch_size > 0 else 1
        single_rgb_loader = DataLoader(
            single_rgb_dataset, batch_size=loader_batch_size, shuffle=False
        )

        if show_progress_bar:
            batches = tqdm(
                single_rgb_loader, desc=" " * 2 + "Inference batches", leave=False
            )
        else:
            batches = single_rgb_loader

        depth_pred_ls = []
        for (batched_img,) in batches:
            depth_pred_raw = self.single_infer(rgb_in=batched_img)
            depth_pred_ls.append(depth_pred_raw.detach())
        depth_preds = torch.concat(depth_pred_ls, dim=0)
        torch.cuda.empty_cache()  # release cached GPU memory held by intermediate tensors

        # ----------------- Optional ensembling -----------------
        if ensemble_size > 1:
            # Merge so that a caller-provided `max_res` overrides the default instead of
            # colliding with it as a duplicated keyword argument.
            merged_ensemble_kwargs = {"max_res": 50, **(ensemble_kwargs or {})}
            depth_pred, pred_uncert = ensemble_depth(
                depth_preds,
                scale_invariant=self.scale_invariant,
                shift_invariant=self.shift_invariant,
                **merged_ensemble_kwargs,
            )
        else:
            depth_pred = depth_preds
            pred_uncert = None

        # ----------------- Post-processing -----------------
        if match_input_res:
            depth_pred = resize(
                depth_pred,
                input_size[-2:],
                interpolation=interpolation,
                antialias=True,
            )

        # Convert to numpy and drop singleton dimensions.
        depth_pred = depth_pred.squeeze().cpu().numpy()
        if pred_uncert is not None:
            pred_uncert = pred_uncert.squeeze().cpu().numpy()

        # Clip the final prediction to [0, 1].
        depth_pred = depth_pred.clip(0, 1)

        # Optional colorization: scale to uint8, then rearrange with `chw2hwc` for PIL.
        if color_map is not None:
            depth_colored = colorize_depth_maps(
                depth_pred, 0, 1, cmap=color_map
            ).squeeze()
            depth_colored = (depth_colored * 255).astype(np.uint8)
            depth_colored_img = Image.fromarray(chw2hwc(depth_colored))
        else:
            depth_colored_img = None

        return MarigoldDepthOutput(
            depth_np=depth_pred,
            depth_colored=depth_colored_img,
            uncertainty=pred_uncert,
        )

    def _check_inference_step(self, n_step: int) -> None:
        """
        Check if the number of denoising steps is reasonable for the pipeline's scheduler.

        Args:
            n_step (`int`): denoising steps

        Raises:
            AssertionError: If `n_step` is smaller than 1.
            RuntimeError: If the pipeline has no scheduler or one that is neither a `DDIMScheduler`
                nor an `LCMScheduler`.
        """
        assert n_step >= 1

        # `__init__` does not register a scheduler, so look it up defensively; a missing scheduler
        # is reported through the "unsupported" error below instead of failing on attribute access.
        scheduler = getattr(self, "scheduler", None)

        if isinstance(scheduler, DDIMScheduler):
            if n_step < 10:
                logging.warning(
                    f"Too few denoising steps: {n_step}. Recommended to use the LCM checkpoint for few-step inference."
                )
        elif isinstance(scheduler, LCMScheduler):
            if not 1 <= n_step <= 4:
                logging.warning(
                    f"Non-optimal setting of denoising steps: {n_step}. Recommended setting is 1-4 steps."
                )
        else:
            raise RuntimeError(f"Unsupported scheduler type: {type(scheduler)}")

    def encode_empty_text(self):
        """
        Encode the empty prompt and cache the result in `self.empty_text_embed`.
        """
        prompt = ""
        text_inputs = self.tokenizer(
            prompt,
            padding="do_not_pad",
            max_length=self.tokenizer.model_max_length,
            truncation=True,
            return_tensors="pt",
        )
        text_input_ids = text_inputs.input_ids.to(self.text_encoder.device)
        self.empty_text_embed = self.text_encoder(text_input_ids)[0].to(self.dtype)

    @torch.no_grad()
    def single_infer(
        self,
        rgb_in: torch.Tensor,
    ) -> torch.Tensor:
        """
        Perform an individual depth prediction without ensembling.

        Args:
            rgb_in (`torch.Tensor`):
                Input RGB image batch. `__call__` passes images normalized to [-1, 1].

        Returns:
            `torch.Tensor`: Predicted depth map with values in [0, 1].

        Raises:
            RuntimeError: If the Depth Anything V2 module is not available.
        """
        if self.da2 is None:
            raise RuntimeError(
                "Depth Anything V2 module (`self.da2`) is not initialized; it is required for inference."
            )

        device = self.device
        rgb_in = rgb_in.to(device)

        # Depth estimate from Depth Anything V2, used as an additional conditioning input.
        # NOTE(review): it is encoded with the same VAE encoder as the RGB image below, so
        # `infer_batch` presumably returns an image-like tensor the VAE accepts — confirm.
        depth_da2 = self.da2.infer_batch(rgb_in).to(device)

        # The method is already decorated with `torch.no_grad()`, so no nested context is needed.
        rgb_latent = self.encode_rgb(rgb_in)
        depth_da2_latent = self.encode_rgb(depth_da2)

        # Empty-prompt embedding, computed once and repeated for every sample in the batch.
        if self.empty_text_embed is None:
            self.encode_empty_text()
        batch_empty_text_embed = self.empty_text_embed.repeat(
            (rgb_latent.shape[0], 1, 1)
        ).to(device)

        # Channel-wise concatenation; the order (DA2 depth first, RGB second) matters.
        unet_input = torch.cat([depth_da2_latent, rgb_latent], dim=1)

        # Single U-Net evaluation with a fixed timestep of 1.
        depth_latent = self.unet(
            unet_input, 1, encoder_hidden_states=batch_empty_text_embed
        ).sample

        depth = self.decode_depth(depth_latent)

        # Clip to [-1, 1], then shift and scale to [0, 1].
        depth = torch.clip(depth, -1.0, 1.0)
        depth = (depth + 1.0) / 2.0

        return depth

    def encode_rgb(self, rgb_in: torch.Tensor) -> torch.Tensor:
        """
        Encode RGB image into latent.

        Args:
            rgb_in (`torch.Tensor`):
                Input RGB image to be encoded.

        Returns:
            `torch.Tensor`: Image latent, i.e. the mean half of the VAE moments scaled by
            `rgb_latent_scale_factor` (the log-variance half is discarded, no sampling is done).
        """
        h = self.vae.encoder(rgb_in)
        moments = self.vae.quant_conv(h)
        mean, _ = torch.chunk(moments, 2, dim=1)
        return mean * self.rgb_latent_scale_factor

    def decode_depth(self, depth_latent: torch.Tensor) -> torch.Tensor:
        """
        Decode depth latent into depth map.

        Args:
            depth_latent (`torch.Tensor`):
                Depth latent to be decoded.

        Returns:
            `torch.Tensor`: Decoded depth map, obtained by averaging the decoder output over the
            channel dimension (kept as a singleton dimension).
        """
        # Undo the latent scaling before decoding.
        depth_latent = depth_latent / self.depth_latent_scale_factor
        z = self.vae.post_quant_conv(depth_latent)
        stacked = self.vae.decoder(z)
        # Collapse the decoder's output channels into a single depth channel.
        return stacked.mean(dim=1, keepdim=True)
|
|