team-10/env/Lib/site-packages/transformers/image_processing_utils_fast.py
2025-08-02 07:34:44 +02:00

731 lines
27 KiB
Python

# Copyright 2024 The HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections.abc import Iterable
from copy import deepcopy
from functools import lru_cache, partial
from typing import Any, Optional, TypedDict, Union
import numpy as np
from .image_processing_utils import BaseImageProcessor, BatchFeature, get_size_dict
from .image_transforms import (
convert_to_rgb,
get_resize_output_image_size,
get_size_with_aspect_ratio,
group_images_by_shape,
reorder_images,
)
from .image_utils import (
ChannelDimension,
ImageInput,
ImageType,
SizeDict,
get_image_size,
get_image_size_for_max_height_width,
get_image_type,
infer_channel_dimension_format,
make_flat_list_of_images,
validate_kwargs,
validate_preprocess_arguments,
)
from .processing_utils import Unpack
from .utils import (
TensorType,
auto_docstring,
is_torch_available,
is_torchvision_available,
is_torchvision_v2_available,
is_vision_available,
logging,
)
from .utils.import_utils import is_rocm_platform
if is_vision_available():
from .image_utils import PILImageResampling
if is_torch_available():
import torch
if is_torchvision_available():
from .image_utils import pil_torch_interpolation_mapping
if is_torchvision_v2_available():
from torchvision.transforms.v2 import functional as F
else:
from torchvision.transforms import functional as F
else:
pil_torch_interpolation_mapping = None
logger = logging.get_logger(__name__)
@lru_cache(maxsize=10)
def validate_fast_preprocess_arguments(
do_rescale: Optional[bool] = None,
rescale_factor: Optional[float] = None,
do_normalize: Optional[bool] = None,
image_mean: Optional[Union[float, list[float]]] = None,
image_std: Optional[Union[float, list[float]]] = None,
do_pad: Optional[bool] = None,
size_divisibility: Optional[int] = None,
do_center_crop: Optional[bool] = None,
crop_size: Optional[SizeDict] = None,
do_resize: Optional[bool] = None,
size: Optional[SizeDict] = None,
resample: Optional["PILImageResampling"] = None,
return_tensors: Optional[Union[str, TensorType]] = None,
data_format: Optional[ChannelDimension] = ChannelDimension.FIRST,
):
"""
Checks validity of typically used arguments in an `ImageProcessorFast` `preprocess` method.
Raises `ValueError` if arguments incompatibility is caught.
"""
validate_preprocess_arguments(
do_rescale=do_rescale,
rescale_factor=rescale_factor,
do_normalize=do_normalize,
image_mean=image_mean,
image_std=image_std,
do_pad=do_pad,
size_divisibility=size_divisibility,
do_center_crop=do_center_crop,
crop_size=crop_size,
do_resize=do_resize,
size=size,
resample=resample,
)
# Extra checks for ImageProcessorFast
if return_tensors is not None and return_tensors != "pt":
raise ValueError("Only returning PyTorch tensors is currently supported.")
if data_format != ChannelDimension.FIRST:
raise ValueError("Only channel first data format is currently supported.")
def safe_squeeze(tensor: "torch.Tensor", axis: Optional[int] = None) -> "torch.Tensor":
"""
Squeezes a tensor, but only if the axis specified has dim 1.
"""
if axis is None:
return tensor.squeeze()
try:
return tensor.squeeze(axis=axis)
except ValueError:
return tensor
def max_across_indices(values: Iterable[Any]) -> list[Any]:
"""
Return the maximum value across all indices of an iterable of values.
"""
return [max(values_i) for values_i in zip(*values)]
def get_max_height_width(images: list["torch.Tensor"]) -> tuple[int]:
"""
Get the maximum height and width across all images in a batch.
"""
_, max_height, max_width = max_across_indices([img.shape for img in images])
return (max_height, max_width)
def divide_to_patches(
image: Union[np.array, "torch.Tensor"], patch_size: int
) -> list[Union[np.array, "torch.Tensor"]]:
"""
Divides an image into patches of a specified size.
Args:
image (`Union[np.array, "torch.Tensor"]`):
The input image.
patch_size (`int`):
The size of each patch.
Returns:
list: A list of Union[np.array, "torch.Tensor"] representing the patches.
"""
patches = []
height, width = get_image_size(image, channel_dim=ChannelDimension.FIRST)
for i in range(0, height, patch_size):
for j in range(0, width, patch_size):
patch = image[:, i : i + patch_size, j : j + patch_size]
patches.append(patch)
return patches
class DefaultFastImageProcessorKwargs(TypedDict, total=False):
do_resize: Optional[bool]
size: Optional[dict[str, int]]
default_to_square: Optional[bool]
resample: Optional[Union["PILImageResampling", "F.InterpolationMode"]]
do_center_crop: Optional[bool]
crop_size: Optional[dict[str, int]]
do_rescale: Optional[bool]
rescale_factor: Optional[Union[int, float]]
do_normalize: Optional[bool]
image_mean: Optional[Union[float, list[float]]]
image_std: Optional[Union[float, list[float]]]
do_convert_rgb: Optional[bool]
return_tensors: Optional[Union[str, TensorType]]
data_format: Optional[ChannelDimension]
input_data_format: Optional[Union[str, ChannelDimension]]
device: Optional["torch.device"]
disable_grouping: Optional[bool]
@auto_docstring
class BaseImageProcessorFast(BaseImageProcessor):
resample = None
image_mean = None
image_std = None
size = None
default_to_square = True
crop_size = None
do_resize = None
do_center_crop = None
do_rescale = None
rescale_factor = 1 / 255
do_normalize = None
do_convert_rgb = None
return_tensors = None
data_format = ChannelDimension.FIRST
input_data_format = None
device = None
model_input_names = ["pixel_values"]
valid_kwargs = DefaultFastImageProcessorKwargs
unused_kwargs = None
def __init__(
self,
**kwargs: Unpack[DefaultFastImageProcessorKwargs],
) -> None:
super().__init__(**kwargs)
kwargs = self.filter_out_unused_kwargs(kwargs)
size = kwargs.pop("size", self.size)
self.size = (
get_size_dict(size=size, default_to_square=kwargs.pop("default_to_square", self.default_to_square))
if size is not None
else None
)
crop_size = kwargs.pop("crop_size", self.crop_size)
self.crop_size = get_size_dict(crop_size, param_name="crop_size") if crop_size is not None else None
for key in self.valid_kwargs.__annotations__.keys():
kwarg = kwargs.pop(key, None)
if kwarg is not None:
setattr(self, key, kwarg)
else:
setattr(self, key, deepcopy(getattr(self, key, None)))
# get valid kwargs names
self._valid_kwargs_names = list(self.valid_kwargs.__annotations__.keys())
def resize(
self,
image: "torch.Tensor",
size: SizeDict,
interpolation: "F.InterpolationMode" = None,
antialias: bool = True,
**kwargs,
) -> "torch.Tensor":
"""
Resize an image to `(size["height"], size["width"])`.
Args:
image (`torch.Tensor`):
Image to resize.
size (`SizeDict`):
Dictionary in the format `{"height": int, "width": int}` specifying the size of the output image.
interpolation (`InterpolationMode`, *optional*, defaults to `InterpolationMode.BILINEAR`):
`InterpolationMode` filter to use when resizing the image e.g. `InterpolationMode.BICUBIC`.
Returns:
`torch.Tensor`: The resized image.
"""
interpolation = interpolation if interpolation is not None else F.InterpolationMode.BILINEAR
if size.shortest_edge and size.longest_edge:
# Resize the image so that the shortest edge or the longest edge is of the given size
# while maintaining the aspect ratio of the original image.
new_size = get_size_with_aspect_ratio(
image.size()[-2:],
size.shortest_edge,
size.longest_edge,
)
elif size.shortest_edge:
new_size = get_resize_output_image_size(
image,
size=size.shortest_edge,
default_to_square=False,
input_data_format=ChannelDimension.FIRST,
)
elif size.max_height and size.max_width:
new_size = get_image_size_for_max_height_width(image.size()[-2:], size.max_height, size.max_width)
elif size.height and size.width:
new_size = (size.height, size.width)
else:
raise ValueError(
"Size must contain 'height' and 'width' keys, or 'max_height' and 'max_width', or 'shortest_edge' key. Got"
f" {size}."
)
# This is a workaround to avoid a bug in torch.compile when dealing with uint8 on AMD MI3XX GPUs
# Tracked in PyTorch issue: https://github.com/pytorch/pytorch/issues/155209
# TODO: remove this once the bug is fixed (detected with torch==2.7.0+git1fee196, torchvision==0.22.0+9eb57cd)
if torch.compiler.is_compiling() and is_rocm_platform():
return self.compile_friendly_resize(image, new_size, interpolation, antialias)
return F.resize(image, new_size, interpolation=interpolation, antialias=antialias)
@staticmethod
def compile_friendly_resize(
image: "torch.Tensor",
new_size: tuple[int, int],
interpolation: Optional["F.InterpolationMode"] = None,
antialias: bool = True,
) -> "torch.Tensor":
"""
A wrapper around `F.resize` so that it is compatible with torch.compile when the image is a uint8 tensor.
"""
if image.dtype == torch.uint8:
image = image.float() / 256
image = F.resize(image, new_size, interpolation=interpolation, antialias=antialias)
image = image * 256
image = torch.where(image > 255, 255, image)
image = torch.where(image < 0, 0, image)
image = image.round().to(torch.uint8)
else:
image = F.resize(image, new_size, interpolation=interpolation, antialias=antialias)
return image
def rescale(
self,
image: "torch.Tensor",
scale: float,
**kwargs,
) -> "torch.Tensor":
"""
Rescale an image by a scale factor. image = image * scale.
Args:
image (`torch.Tensor`):
Image to rescale.
scale (`float`):
The scaling factor to rescale pixel values by.
Returns:
`torch.Tensor`: The rescaled image.
"""
return image * scale
def normalize(
self,
image: "torch.Tensor",
mean: Union[float, Iterable[float]],
std: Union[float, Iterable[float]],
**kwargs,
) -> "torch.Tensor":
"""
Normalize an image. image = (image - image_mean) / image_std.
Args:
image (`torch.Tensor`):
Image to normalize.
mean (`torch.Tensor`, `float` or `Iterable[float]`):
Image mean to use for normalization.
std (`torch.Tensor`, `float` or `Iterable[float]`):
Image standard deviation to use for normalization.
Returns:
`torch.Tensor`: The normalized image.
"""
return F.normalize(image, mean, std)
@lru_cache(maxsize=10)
def _fuse_mean_std_and_rescale_factor(
self,
do_normalize: Optional[bool] = None,
image_mean: Optional[Union[float, list[float]]] = None,
image_std: Optional[Union[float, list[float]]] = None,
do_rescale: Optional[bool] = None,
rescale_factor: Optional[float] = None,
device: Optional["torch.device"] = None,
) -> tuple:
if do_rescale and do_normalize:
# Fused rescale and normalize
image_mean = torch.tensor(image_mean, device=device) * (1.0 / rescale_factor)
image_std = torch.tensor(image_std, device=device) * (1.0 / rescale_factor)
do_rescale = False
return image_mean, image_std, do_rescale
def rescale_and_normalize(
self,
images: "torch.Tensor",
do_rescale: bool,
rescale_factor: float,
do_normalize: bool,
image_mean: Union[float, list[float]],
image_std: Union[float, list[float]],
) -> "torch.Tensor":
"""
Rescale and normalize images.
"""
image_mean, image_std, do_rescale = self._fuse_mean_std_and_rescale_factor(
do_normalize=do_normalize,
image_mean=image_mean,
image_std=image_std,
do_rescale=do_rescale,
rescale_factor=rescale_factor,
device=images.device,
)
# if/elif as we use fused rescale and normalize if both are set to True
if do_normalize:
images = self.normalize(images.to(dtype=torch.float32), image_mean, image_std)
elif do_rescale:
images = self.rescale(images, rescale_factor)
return images
def center_crop(
self,
image: "torch.Tensor",
size: dict[str, int],
**kwargs,
) -> "torch.Tensor":
"""
Center crop an image to `(size["height"], size["width"])`. If the input size is smaller than `crop_size` along
any edge, the image is padded with 0's and then center cropped.
Args:
image (`"torch.Tensor"`):
Image to center crop.
size (`dict[str, int]`):
Size of the output image.
Returns:
`torch.Tensor`: The center cropped image.
"""
if size.height is None or size.width is None:
raise ValueError(f"The size dictionary must have keys 'height' and 'width'. Got {size.keys()}")
return F.center_crop(image, (size["height"], size["width"]))
def convert_to_rgb(
self,
image: ImageInput,
) -> ImageInput:
"""
Converts an image to RGB format. Only converts if the image is of type PIL.Image.Image, otherwise returns the image
as is.
Args:
image (ImageInput):
The image to convert.
Returns:
ImageInput: The converted image.
"""
return convert_to_rgb(image)
def filter_out_unused_kwargs(self, kwargs: dict):
"""
Filter out the unused kwargs from the kwargs dictionary.
"""
if self.unused_kwargs is None:
return kwargs
for kwarg_name in self.unused_kwargs:
if kwarg_name in kwargs:
logger.warning_once(f"This processor does not use the `{kwarg_name}` parameter. It will be ignored.")
kwargs.pop(kwarg_name)
return kwargs
def _prepare_images_structure(
self,
images: ImageInput,
expected_ndims: int = 3,
) -> ImageInput:
"""
Prepare the images structure for processing.
Args:
images (`ImageInput`):
The input images to process.
Returns:
`ImageInput`: The images with a valid nesting.
"""
return make_flat_list_of_images(images, expected_ndims=expected_ndims)
def _process_image(
self,
image: ImageInput,
do_convert_rgb: Optional[bool] = None,
input_data_format: Optional[Union[str, ChannelDimension]] = None,
device: Optional["torch.device"] = None,
) -> "torch.Tensor":
image_type = get_image_type(image)
if image_type not in [ImageType.PIL, ImageType.TORCH, ImageType.NUMPY]:
raise ValueError(f"Unsupported input image type {image_type}")
if do_convert_rgb:
image = self.convert_to_rgb(image)
if image_type == ImageType.PIL:
image = F.pil_to_tensor(image)
elif image_type == ImageType.NUMPY:
# not using F.to_tensor as it doesn't handle (C, H, W) numpy arrays
image = torch.from_numpy(image).contiguous()
# If the image is 2D, we need to unsqueeze it to add a channel dimension for processing
if image.ndim == 2:
image = image.unsqueeze(0)
# Infer the channel dimension format if not provided
if input_data_format is None:
input_data_format = infer_channel_dimension_format(image)
if input_data_format == ChannelDimension.LAST:
# We force the channel dimension to be first for torch tensors as this is what torchvision expects.
image = image.permute(2, 0, 1).contiguous()
# Now that we have torch tensors, we can move them to the right device
if device is not None:
image = image.to(device)
return image
def _prepare_image_like_inputs(
self,
images: ImageInput,
do_convert_rgb: Optional[bool] = None,
input_data_format: Optional[Union[str, ChannelDimension]] = None,
device: Optional["torch.device"] = None,
expected_ndims: int = 3,
) -> list["torch.Tensor"]:
"""
Prepare image-like inputs for processing.
Args:
images (`ImageInput`):
The image-like inputs to process.
do_convert_rgb (`bool`, *optional*):
Whether to convert the images to RGB.
input_data_format (`str` or `ChannelDimension`, *optional*):
The input data format of the images.
device (`torch.device`, *optional*):
The device to put the processed images on.
expected_ndims (`int`, *optional*):
The expected number of dimensions for the images. (can be 2 for segmentation maps etc.)
Returns:
List[`torch.Tensor`]: The processed images.
"""
# Get structured images (potentially nested)
images = self._prepare_images_structure(images, expected_ndims=expected_ndims)
process_image_partial = partial(
self._process_image, do_convert_rgb=do_convert_rgb, input_data_format=input_data_format, device=device
)
# Check if we have nested structure, assuming the nesting is consistent
has_nested_structure = len(images) > 0 and isinstance(images[0], (list, tuple))
if has_nested_structure:
processed_images = [[process_image_partial(img) for img in nested_list] for nested_list in images]
else:
processed_images = [process_image_partial(img) for img in images]
return processed_images
def _further_process_kwargs(
self,
size: Optional[SizeDict] = None,
crop_size: Optional[SizeDict] = None,
default_to_square: Optional[bool] = None,
image_mean: Optional[Union[float, list[float]]] = None,
image_std: Optional[Union[float, list[float]]] = None,
data_format: Optional[ChannelDimension] = None,
**kwargs,
) -> dict:
"""
Update kwargs that need further processing before being validated
Can be overridden by subclasses to customize the processing of kwargs.
"""
if kwargs is None:
kwargs = {}
if size is not None:
size = SizeDict(**get_size_dict(size=size, default_to_square=default_to_square))
if crop_size is not None:
crop_size = SizeDict(**get_size_dict(crop_size, param_name="crop_size"))
if isinstance(image_mean, list):
image_mean = tuple(image_mean)
if isinstance(image_std, list):
image_std = tuple(image_std)
if data_format is None:
data_format = ChannelDimension.FIRST
kwargs["size"] = size
kwargs["crop_size"] = crop_size
kwargs["default_to_square"] = default_to_square
kwargs["image_mean"] = image_mean
kwargs["image_std"] = image_std
kwargs["data_format"] = data_format
return kwargs
def _validate_preprocess_kwargs(
self,
do_rescale: Optional[bool] = None,
rescale_factor: Optional[float] = None,
do_normalize: Optional[bool] = None,
image_mean: Optional[Union[float, tuple[float]]] = None,
image_std: Optional[Union[float, tuple[float]]] = None,
do_resize: Optional[bool] = None,
size: Optional[SizeDict] = None,
do_center_crop: Optional[bool] = None,
crop_size: Optional[SizeDict] = None,
resample: Optional[Union["PILImageResampling", "F.InterpolationMode"]] = None,
return_tensors: Optional[Union[str, TensorType]] = None,
data_format: Optional[ChannelDimension] = None,
**kwargs,
):
"""
validate the kwargs for the preprocess method.
"""
validate_fast_preprocess_arguments(
do_rescale=do_rescale,
rescale_factor=rescale_factor,
do_normalize=do_normalize,
image_mean=image_mean,
image_std=image_std,
do_resize=do_resize,
size=size,
do_center_crop=do_center_crop,
crop_size=crop_size,
resample=resample,
return_tensors=return_tensors,
data_format=data_format,
)
def __call__(self, images: ImageInput, *args, **kwargs: Unpack[DefaultFastImageProcessorKwargs]) -> BatchFeature:
return self.preprocess(images, *args, **kwargs)
@auto_docstring
def preprocess(self, images: ImageInput, *args, **kwargs: Unpack[DefaultFastImageProcessorKwargs]) -> BatchFeature:
# args are not validated, but their order in the `preprocess` and `_preprocess` signatures must be the same
validate_kwargs(captured_kwargs=kwargs.keys(), valid_processor_keys=self._valid_kwargs_names)
# Set default kwargs from self. This ensures that if a kwarg is not provided
# by the user, it gets its default value from the instance, or is set to None.
for kwarg_name in self._valid_kwargs_names:
kwargs.setdefault(kwarg_name, getattr(self, kwarg_name, None))
# Extract parameters that are only used for preparing the input images
do_convert_rgb = kwargs.pop("do_convert_rgb")
input_data_format = kwargs.pop("input_data_format")
device = kwargs.pop("device")
# Update kwargs that need further processing before being validated
kwargs = self._further_process_kwargs(**kwargs)
# Validate kwargs
self._validate_preprocess_kwargs(**kwargs)
# torch resize uses interpolation instead of resample
resample = kwargs.pop("resample")
# Check if resample is an int before checking if it's an instance of PILImageResampling
# because if pillow < 9.1.0, resample is an int and PILImageResampling is a module.
# Checking PILImageResampling will fail with error `TypeError: isinstance() arg 2 must be a type or tuple of types`.
kwargs["interpolation"] = (
pil_torch_interpolation_mapping[resample] if isinstance(resample, (int, PILImageResampling)) else resample
)
# Pop kwargs that are not needed in _preprocess
kwargs.pop("default_to_square")
kwargs.pop("data_format")
return self._preprocess_image_like_inputs(
images, *args, do_convert_rgb=do_convert_rgb, input_data_format=input_data_format, device=device, **kwargs
)
def _preprocess_image_like_inputs(
self,
images: ImageInput,
*args,
do_convert_rgb: bool,
input_data_format: ChannelDimension,
device: Optional[Union[str, "torch.device"]] = None,
**kwargs: Unpack[DefaultFastImageProcessorKwargs],
) -> BatchFeature:
"""
Preprocess image-like inputs.
To be overriden by subclasses when image-like inputs other than images should be processed.
It can be used for segmentation maps, depth maps, etc.
"""
# Prepare input images
images = self._prepare_image_like_inputs(
images=images, do_convert_rgb=do_convert_rgb, input_data_format=input_data_format, device=device
)
return self._preprocess(images, *args, **kwargs)
def _preprocess(
self,
images: list["torch.Tensor"],
do_resize: bool,
size: SizeDict,
interpolation: Optional["F.InterpolationMode"],
do_center_crop: bool,
crop_size: SizeDict,
do_rescale: bool,
rescale_factor: float,
do_normalize: bool,
image_mean: Optional[Union[float, list[float]]],
image_std: Optional[Union[float, list[float]]],
disable_grouping: Optional[bool],
return_tensors: Optional[Union[str, TensorType]],
**kwargs,
) -> BatchFeature:
# Group images by size for batched resizing
grouped_images, grouped_images_index = group_images_by_shape(images, disable_grouping=disable_grouping)
resized_images_grouped = {}
for shape, stacked_images in grouped_images.items():
if do_resize:
stacked_images = self.resize(image=stacked_images, size=size, interpolation=interpolation)
resized_images_grouped[shape] = stacked_images
resized_images = reorder_images(resized_images_grouped, grouped_images_index)
# Group images by size for further processing
# Needed in case do_resize is False, or resize returns images with different sizes
grouped_images, grouped_images_index = group_images_by_shape(resized_images, disable_grouping=disable_grouping)
processed_images_grouped = {}
for shape, stacked_images in grouped_images.items():
if do_center_crop:
stacked_images = self.center_crop(stacked_images, crop_size)
# Fused rescale and normalize
stacked_images = self.rescale_and_normalize(
stacked_images, do_rescale, rescale_factor, do_normalize, image_mean, image_std
)
processed_images_grouped[shape] = stacked_images
processed_images = reorder_images(processed_images_grouped, grouped_images_index)
processed_images = torch.stack(processed_images, dim=0) if return_tensors else processed_images
return BatchFeature(data={"pixel_values": processed_images}, tensor_type=return_tensors)
def to_dict(self):
encoder_dict = super().to_dict()
encoder_dict.pop("_valid_processor_keys", None)
encoder_dict.pop("_valid_kwargs_names", None)
return encoder_dict