# Author: OzzyGT (HF Staff) — initial commit 88c28fc
from fractions import Fraction
from typing import List, Union

import av
import numpy as np
import torch
from diffusers.modular_pipelines import (
    ComponentSpec,
    InputParam,
    ModularPipelineBlocks,
    OutputParam,
    PipelineState,
)
from matplotlib import colormaps
from PIL import Image
from transformers import DepthProForDepthEstimation, DepthProImageProcessor
# Matplotlib "turbo" colormap, looked up once at import time and reused for
# every colorized depth visualization (avoids a registry lookup per frame).
TURBO_CMAP = colormaps["turbo"]
def save_video(frames: List[Image.Image], fps: float, output_path: str) -> None:
    """Save a list of PIL Image frames as an H.264 MP4 video.

    Args:
        frames: Non-empty list of frames; all are assumed to share the size of
            the first frame (libx264 requires a fixed stream resolution).
        fps: Target frame rate. Non-integer rates (e.g. NTSC 29.97) are
            preserved via a rational approximation instead of being truncated.
        output_path: Destination file path; the container format is inferred
            from the extension by PyAV.

    Raises:
        ValueError: If ``frames`` is empty.
    """
    if not frames:
        raise ValueError("`frames` must contain at least one image.")
    # int(fps) would silently turn 29.97 into 29 and skew playback timing;
    # PyAV accepts a Fraction as the stream rate.
    rate = Fraction(fps).limit_denominator(1_000_000)
    container = av.open(output_path, mode="w")
    try:
        stream = container.add_stream("libx264", rate=rate)
        stream.pix_fmt = "yuv420p"
        stream.width = frames[0].width
        stream.height = frames[0].height
        for frame in frames:
            for packet in stream.encode(av.VideoFrame.from_image(frame)):
                container.mux(packet)
        # Flush frames still buffered inside the encoder.
        for packet in stream.encode():
            container.mux(packet)
    finally:
        # Close even if encoding fails, so the file handle is not leaked.
        container.close()
class DepthProEstimatorBlock(ModularPipelineBlocks):
    """Modular pipeline block that estimates depth with Apple's DepthPro model.

    Two mutually exclusive modes, selected from the pipeline state:

    * video mode -- when ``video_path`` is set (it takes precedence over
      ``image``), every decoded frame is depth-estimated and returned as
      ``depth_frames`` along with the source ``fps``;
    * image mode -- otherwise ``image`` is used and the block produces a single
      ``depth_image`` plus the raw ``predicted_depth`` tensor and the estimated
      ``field_of_view`` / ``focal_length``.

    Outputs belonging to the inactive mode are explicitly set to ``None``.
    """

    # Minimum versions of the libraries this block imports at runtime.
    _requirements = {
        "transformers": ">=5.1.0",
        "torch": ">=2.9.0",
        "torchvision": ">=0.16.0",
        "av": ">=12.0.0",
        "matplotlib": ">=3.7.0",
    }

    @property
    def expected_components(self) -> List[ComponentSpec]:
        """Model and processor specs resolved by the modular pipeline loader."""
        return [
            ComponentSpec(
                name="depth_estimator",
                type_hint=DepthProForDepthEstimation,
                pretrained_model_name_or_path="apple/DepthPro-hf",
            ),
            ComponentSpec(
                name="depth_estimator_processor",
                type_hint=DepthProImageProcessor,
                pretrained_model_name_or_path="apple/DepthPro-hf",
            ),
        ]

    @property
    def inputs(self) -> List[InputParam]:
        """Pipeline-state inputs; `image` and `video_path` are alternatives."""
        return [
            InputParam(
                "image",
                type_hint=Union[Image.Image, List[Image.Image]],
                required=False,
                description="Image(s) to estimate depth for",
            ),
            InputParam(
                "video_path",
                type_hint=str,
                required=False,
                description="Path to input video file. When provided, image is ignored.",
            ),
            InputParam(
                "output_type",
                type_hint=str,
                default="depth_image",
                description="Output type: 'depth_image', 'depth_tensor', or 'depth_and_fov'",
            ),
            InputParam(
                "colormap",
                type_hint=str,
                default="grayscale",
                description="Depth visualization format: 'grayscale' or 'turbo' (colormapped)",
            ),
        ]

    @property
    def intermediate_outputs(self) -> List[OutputParam]:
        """Declared outputs; which ones are populated depends on the mode."""
        return [
            OutputParam(
                "depth_image",
                type_hint=Image.Image,
                description="Normalized depth map as a grayscale PIL image (single image mode)",
            ),
            OutputParam(
                "predicted_depth",
                type_hint=torch.Tensor,
                description="Raw metric depth tensor (H x W) (single image mode)",
            ),
            OutputParam(
                "field_of_view",
                type_hint=float,
                description="Estimated horizontal field of view (single image mode)",
            ),
            OutputParam(
                "focal_length",
                type_hint=float,
                description="Estimated focal length (single image mode)",
            ),
            OutputParam(
                "depth_frames",
                type_hint=list,
                description="List of per-frame depth PIL images (video mode)",
            ),
            OutputParam(
                "fps",
                type_hint=float,
                description="Source video frame rate (video mode)",
            ),
        ]

    def _estimate_depth(self, image: Image.Image, processor, model) -> dict:
        """Run DepthPro on one image and return the post-processed result dict.

        The dict contains at least ``predicted_depth`` (an H x W tensor) and,
        depending on the processor, ``field_of_view`` / ``focal_length``.
        (The original annotation said ``np.ndarray``, but callers index the
        result with string keys.)
        """
        inputs = processor(images=[image], return_tensors="pt").to(model.device)
        outputs = model(**inputs)
        # Resize/rescale the raw model output back to the input resolution.
        post_processed = processor.post_process_depth_estimation(
            outputs, target_sizes=[(image.height, image.width)]
        )
        return post_processed[0]

    def _normalize_depth(self, depth: np.ndarray) -> np.ndarray:
        """Map metric depth to [0, 1] inverse depth (near -> 1, far -> 0).

        Depth is clamped to [0.1, 250] before inversion; the 1e-8 epsilon
        avoids division by zero on constant-depth inputs.
        """
        inverse_depth = 1.0 / np.clip(depth, 0.1, 250.0)
        inv_min = inverse_depth.min()
        inv_max = inverse_depth.max()
        return (inverse_depth - inv_min) / (inv_max - inv_min + 1e-8)

    def _apply_colormap(self, normalized: np.ndarray, mode: str) -> np.ndarray:
        """Return a uint8 image array: 3-channel RGB for 'turbo', else 1-channel."""
        if mode == "turbo":
            return (TURBO_CMAP(normalized)[..., :3] * 255).astype(np.uint8)
        return (normalized * 255.0).astype(np.uint8)

    def _depth_to_pil(self, depth_tensor: torch.Tensor, colormap: str) -> Image.Image:
        """Normalize a raw depth tensor and render it as a PIL image.

        Shared by image and video modes (previously duplicated in both paths).
        """
        depth_np = depth_tensor.float().cpu().numpy()
        normalized = self._normalize_depth(depth_np)
        colored = self._apply_colormap(normalized, colormap)
        # 'turbo' produces an RGB array, grayscale a single channel.
        return Image.fromarray(colored, mode="RGB" if colormap == "turbo" else "L")

    def _process_video(self, video_path, processor, model, colormap):
        """Depth-estimate every frame of a video; returns (depth_frames, fps)."""
        input_container = av.open(video_path)
        try:
            video_stream = input_container.streams.video[0]
            # NOTE(review): average_rate can be None for some containers;
            # __call__ guards the float() cast accordingly.
            fps = video_stream.average_rate
            depth_frames = []
            for frame in input_container.decode(video=0):
                pil_image = frame.to_image().convert("RGB")
                result = self._estimate_depth(pil_image, processor, model)
                depth_frames.append(self._depth_to_pil(result["predicted_depth"], colormap))
        finally:
            # Close the container even if decoding or inference raises.
            input_container.close()
        return depth_frames, fps

    @torch.no_grad()
    def __call__(self, components, state: PipelineState):
        """Execute the block and return the ``(components, state)`` pair
        expected by the modular pipeline runner.

        Raises:
            ValueError: If neither ``image`` nor ``video_path`` is provided
                (both are declared optional, so this is reachable).
        """
        block_state = self.get_block_state(state)
        processor = components.depth_estimator_processor
        model = components.depth_estimator

        video_path = getattr(block_state, "video_path", None)
        if video_path:
            depth_frames, fps = self._process_video(
                video_path, processor, model, block_state.colormap
            )
            block_state.depth_frames = depth_frames
            block_state.fps = float(fps) if fps is not None else None
            # Single-image outputs are not produced in video mode.
            block_state.depth_image = None
            block_state.predicted_depth = None
            block_state.field_of_view = None
            block_state.focal_length = None
        else:
            image = block_state.image
            if image is None:
                raise ValueError(
                    "DepthProEstimatorBlock requires either `image` or `video_path`."
                )
            if not isinstance(image, list):
                image = [image]
            # NOTE(review): only the first image of a list is processed here.
            result = self._estimate_depth(image[0], processor, model)
            predicted_depth = result["predicted_depth"]
            block_state.predicted_depth = predicted_depth
            block_state.field_of_view = result.get("field_of_view")
            block_state.focal_length = result.get("focal_length")
            block_state.depth_image = self._depth_to_pil(
                predicted_depth, block_state.colormap
            )
            # Video outputs are not produced in single-image mode.
            block_state.depth_frames = None
            block_state.fps = None

        self.set_block_state(state, block_state)
        return components, state