| """
|
| processing_prismatic.py
|
|
|
| HuggingFace-style preprocessor definitions for Prismatic VLMs, inheriting from `ProcessorMixin`. Default configuration
|
| specifies `siglip-224px+7b`.
|
| """
|
|
|
| from typing import Any, ClassVar, List, Optional, Tuple, Union
|
|
|
| import timm.data
|
| import torch
|
| import torchvision.transforms.functional as TVF
|
| from PIL import Image
|
| from torchvision.transforms import CenterCrop, Compose, Normalize, Resize, ToTensor
|
| from transformers import PreTrainedTokenizerBase
|
| from transformers.image_processing_utils import BatchFeature, ImageProcessingMixin
|
| from transformers.processing_utils import ProcessorMixin
|
| from transformers.tokenization_utils import PaddingStrategy, PreTokenizedInput, TextInput, TruncationStrategy
|
| from transformers.utils import TensorType
|
|
|
|
|
|
|
def letterbox_pad_transform(image: Image.Image, padding_fill_value: Tuple[int, int, int]) -> Image.Image:
    """
    Letterbox a PIL image toward a square canvas by padding its shorter dimension equally on both sides.

    Opposite edges always receive the same border width, computed as half of the gap to the longer side rounded
    down. An odd gap therefore leaves the output one pixel short of square along that axis.

    @param image: PIL image to pad.
    @param padding_fill_value: Constant fill value forwarded to the pad call.
    @return: The padded image produced by `TVF.pad` in "constant" mode.
    """
    width, height = image.size
    longest_side = max(width, height)

    # Per-side border widths; the longer dimension ends up with zero padding.
    pad_x = (longest_side - width) // 2
    pad_y = (longest_side - height) // 2

    # torchvision interprets a 4-tuple as (left, top, right, bottom).
    return TVF.pad(image, (pad_x, pad_y, pad_x, pad_y), fill=padding_fill_value, padding_mode="constant")
|
|
|
|
|
class PrismaticImageProcessor(ImageProcessingMixin):
    """Image preprocessor for Prismatic VLMs that replays a TIMM eval transform with functional torchvision ops."""

    model_input_names: ClassVar[List[str]] = ["pixel_values"]

    def __init__(
        self,
        use_fused_vision_backbone: bool = False,
        image_resize_strategy: str = "letterbox",
        input_sizes: Optional[List[Tuple[int, int, int]]] = None,
        interpolations: Optional[List[str]] = None,
        means: Optional[List[Tuple[float, float, float]]] = None,
        stds: Optional[List[Tuple[float, float, float]]] = None,
        **kwargs: Any,
    ) -> None:
        """
        Initialize a PrismaticImageProcessor as a wrapper around a torchvision transform; this transform will be
        created by TIMM, and edited to follow our custom `image_resize_strategy` logic.

        One entry of each list parameter is consumed per entry of `input_sizes`.

        @param use_fused_vision_backbone: Boolean indicating single or fused (dual) vision backbone
        @param image_resize_strategy: Prismatic image resize strategy in < resize-naive | resize-crop | letterbox >
        @param input_sizes: [TIMM :: `data_cfg`] Input image sizes, one 3-tuple per entry (default: [(3, 224, 224)])
        @param interpolations: [TIMM :: `data_cfg`] Interpolation names, one per input size (default: "bicubic")
        @param means: [TIMM :: `data_cfg`] Normalization means as float tuples (default: [(0.5, 0.5, 0.5)])
        @param stds: [TIMM :: `data_cfg`] Normalization stds as float tuples (default: [(0.5, 0.5, 0.5)])
        @param kwargs: Extra keyword arguments forwarded to `ImageProcessingMixin.__init__`

        @raises ValueError: If TIMM returns a transform with an unexpected structure/size, or if
                            `image_resize_strategy` is not one of the supported values.
        """
        self.use_fused_vision_backbone = use_fused_vision_backbone
        self.image_resize_strategy = image_resize_strategy

        # Handle `None` default values
        input_sizes = [(3, 224, 224)] if input_sizes is None else input_sizes
        means = [(0.5, 0.5, 0.5)] if means is None else means
        stds = [(0.5, 0.5, 0.5)] if stds is None else stds

        # `interpolations` is indexed per input size below, so a `None` default must be expanded as well;
        # otherwise constructing the processor with default arguments fails on `None[idx]`.
        interpolations = (["bicubic"] * len(input_sizes)) if interpolations is None else interpolations

        # Keep the resolved values as attributes
        self.input_sizes, self.interpolations, self.means, self.stds = input_sizes, interpolations, means, stds

        # Parameters for the functional torchvision calls issued in `apply_transform`, one entry per input size
        self.tvf_resize_params, self.tvf_crop_params, self.tvf_normalize_params = [], [], []
        self.tvf_do_letterbox, self.tvf_letterbox_fill = False, None

        for idx, input_size in enumerate(self.input_sizes):
            transform = timm.data.create_transform(
                input_size=input_size,
                interpolation=self.interpolations[idx],
                mean=self.means[idx],
                std=self.stds[idx],
                crop_pct=1.0,
                crop_mode="center",
                is_training=False,
            )

            # The rest of this method reads parameters positionally out of the TIMM transform, so insist on the
            # exact [Resize, CenterCrop, ToTensor, Normalize] layout with sizes matching `input_size`.
            if not (
                isinstance(transform, Compose)
                and (len(transform.transforms) == 4)
                and isinstance(transform.transforms[0], Resize)
                and isinstance(transform.transforms[1], CenterCrop)
                and isinstance(transform.transforms[2], ToTensor)
                and isinstance(transform.transforms[3], Normalize)
                and (transform.transforms[0].size == input_size[-1])
                and (transform.transforms[1].size == input_size[-2:])
            ):
                raise ValueError(f"Unexpected TIMM image transformation structure/sizes: `{transform}`")

            # Record the keyword arguments for the functional equivalents of Resize / CenterCrop / Normalize
            # (ToTensor takes no parameters, so index 2 is skipped).
            resize_t, crop_t, norm_t = transform.transforms[0], transform.transforms[1], transform.transforms[3]
            self.tvf_resize_params.append(
                {
                    "size": resize_t.size,
                    "interpolation": TVF.pil_modes_mapping[resize_t.interpolation],
                    "max_size": None,
                    "antialias": True,
                }
            )
            self.tvf_crop_params.append({"output_size": crop_t.size})
            self.tvf_normalize_params.append(
                {
                    "mean": norm_t.mean.float().numpy().tolist(),
                    "std": norm_t.std.float().numpy().tolist(),
                    "inplace": False,
                }
            )

            # Adjust the recorded parameters according to the requested resize strategy
            if self.image_resize_strategy == "resize-naive":
                # Explicit (size, size) pair instead of the single value TIMM configured for Resize
                self.tvf_resize_params[idx]["size"] = (resize_t.size, resize_t.size)
            elif self.image_resize_strategy == "letterbox":
                # Pad with the normalization mean rescaled to 0-255; the last input size's mean wins
                self.tvf_do_letterbox = True
                self.tvf_letterbox_fill = tuple(int(x * 255) for x in self.means[idx])
            elif self.image_resize_strategy == "resize-crop":
                # TIMM's Resize -> CenterCrop parameters are used unchanged
                pass
            else:
                raise ValueError(f"Image resize strategy `{self.image_resize_strategy}` is not supported!")

        # Dispatch remaining keyword arguments to the mixin
        super().__init__(**kwargs)

    def apply_transform(self, img: Image.Image) -> torch.Tensor:
        """
        Apply `functional` variant of TIMM's Transform = Compose([Resize -> CenterCrop -> ToTensor -> Normalize])

        @param img: PIL image to transform.

        @return: Per-input-size tensors joined with `torch.vstack`, i.e. concatenated along their first dimension.
        """
        if self.tvf_do_letterbox:
            img = letterbox_pad_transform(img, self.tvf_letterbox_fill)

        # The (optionally letterboxed) image is run through each configured pipeline independently
        imgs_t = []
        for idx in range(len(self.input_sizes)):
            img_idx = TVF.resize(img, **self.tvf_resize_params[idx])
            img_idx = TVF.center_crop(img_idx, **self.tvf_crop_params[idx])
            img_idx_t = TVF.to_tensor(img_idx)
            img_idx_t = TVF.normalize(img_idx_t, **self.tvf_normalize_params[idx])
            imgs_t.append(img_idx_t)

        # Join the per-pipeline results into a single tensor
        img_t = torch.vstack(imgs_t)

        return img_t

    def preprocess(
        self,
        images: Union[Image.Image, List[Image.Image]],
        return_tensors: Optional[Union[str, TensorType]] = None,
        **_: Any,
    ) -> BatchFeature:
        """
        Preprocess an image (or batch of images); note that unlike the `transformers :: BaseImageProcessor` we
        explicitly only handle PIL.Image.Image instances for simplicity.

        @param images: A (batch of) PIL.Image.Image instance(s) to preprocess.
        @param return_tensors: BatchFeature default Tensor format (e.g., "pt" for torch); if None, returns np.ndarray

        @return: Instance of `transformers :: BatchFeature` with a single key "pixel_values"
        """
        # Anything that is not a `list` is treated as a single image
        if not isinstance(images, list):
            images = [images]

        # Convert each image to RGB, transform it, and stack the results along a new leading batch dimension
        pixel_values = torch.stack([self.apply_transform(img.convert("RGB")) for img in images])

        # Hand BatchFeature a float numpy array and let it convert to `return_tensors`
        return BatchFeature(data={"pixel_values": pixel_values.float().numpy()}, tensor_type=return_tensors)

    def __call__(self, images: Union[Image.Image, List[Image.Image]], **kwargs) -> BatchFeature:
        """Alias for `preprocess`; all keyword arguments are forwarded unchanged."""
        return self.preprocess(images, **kwargs)
|
|
|
|
|
|
|
|
|
class PrismaticProcessor(ProcessorMixin):
    """Combined text/image processor that pairs a tokenizer with an image processor for Prismatic VLMs."""

    attributes: ClassVar[List[str]] = ["image_processor", "tokenizer"]
    image_processor_class: str = "AutoImageProcessor"
    tokenizer_class: str = "AutoTokenizer"

    def __init__(
        self,
        image_processor: Optional[ImageProcessingMixin] = None,
        tokenizer: Optional[PreTrainedTokenizerBase] = None,
    ) -> None:
        """
        Hand both components to `ProcessorMixin`, positionally in the order (image_processor, tokenizer).

        @param image_processor: Image processor used for the `images` argument of `__call__`.
        @param tokenizer: Tokenizer used for the `text` argument of `__call__` and for decoding.
        """
        super().__init__(image_processor, tokenizer)

    def __call__(
        self,
        text: Union[TextInput, PreTokenizedInput, List[TextInput], List[PreTokenizedInput]],
        images: Union[Image.Image, List[Image.Image]],
        padding: Union[bool, str, PaddingStrategy] = False,
        truncation: Optional[Union[bool, str, TruncationStrategy]] = None,
        max_length: Optional[int] = None,
        return_tensors: Optional[Union[str, TensorType]] = TensorType.PYTORCH,
    ) -> BatchFeature:
        """
        Encode a (batch of) text/image pair(s): images go through `self.image_processor`, text goes through
        `self.tokenizer`, and the two results are merged into one `BatchFeature`.

        @param text: The (batch) of text to encode; must be a string or list of strings.
        @param images: A (batch of) PIL.Image.Image instance(s) to preprocess.
        @param padding: Sequence padding strategy (if multiple specified) in < True = "longest" | "max_length" | False >
        @param truncation: Truncation strategy for the output sequences; requires `max_length` to be specified
        @param max_length: Maximum length (in tokens) to truncate
        @param return_tensors: Type of return tensors (usually "pt" or TensorType.PYTORCH)

        @return: BatchFeature holding the tokenizer outputs plus a "pixel_values" entry.

        @raises ValueError: If the leading dimensions of the pixel values and the input IDs differ.
        """
        image_batch = self.image_processor(images, return_tensors=return_tensors)["pixel_values"]
        encoded_text = self.tokenizer(
            text,
            return_tensors=return_tensors,
            padding=padding,
            truncation=truncation,
            max_length=max_length,
        )

        # The leading dimension of each output is taken as its batch size; both must agree.
        num_images = image_batch.shape[0]
        num_texts = encoded_text.input_ids.shape[0]
        if num_images != num_texts:
            raise ValueError("Batch is malformed; expected same number of images and text inputs!")

        # Tokenizer outputs first, then the image tensor under its own key.
        merged = {**encoded_text, "pixel_values": image_batch}
        return BatchFeature(data=merged)

    def batch_decode(
        self,
        sequences: Union[List[int], List[List[int]], torch.Tensor, Any],
        skip_special_tokens: bool = False,
        clean_up_tokenization_spaces: Optional[bool] = None,
        **kwargs: str,
    ) -> List[str]:
        """Delegate to `self.tokenizer.batch_decode`, passing every argument through by keyword."""
        decode_options = {
            "skip_special_tokens": skip_special_tokens,
            "clean_up_tokenization_spaces": clean_up_tokenization_spaces,
            **kwargs,
        }
        return self.tokenizer.batch_decode(sequences=sequences, **decode_options)

    def decode(
        self,
        token_ids: Union[int, List[int], torch.Tensor, Any],
        skip_special_tokens: bool = False,
        clean_up_tokenization_spaces: Optional[bool] = None,
        **kwargs: str,
    ) -> str:
        """Delegate to `self.tokenizer.decode`, passing every argument through by keyword."""
        decode_options = {
            "skip_special_tokens": skip_special_tokens,
            "clean_up_tokenization_spaces": clean_up_tokenization_spaces,
            **kwargs,
        }
        return self.tokenizer.decode(token_ids=token_ids, **decode_options)

    @property
    def model_input_names(self) -> List[str]:
        """Tokenizer input names followed by image-processor input names, de-duplicated in first-seen order."""
        combined_names = self.tokenizer.model_input_names + self.image_processor.model_input_names

        # `dict.fromkeys` drops repeats while keeping insertion order.
        return list(dict.fromkeys(combined_names))
|
|
|