# Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import UserDict
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np
import paddle
import PIL.Image
from packaging import version

from ...common.tokenizer.tokenizer_utils_base import ExplicitEnum


def is_paddle_tensor(tensor):
    """Return whether `tensor` is a Paddle tensor (delegates to `paddle.is_tensor`)."""
    return paddle.is_tensor(tensor)


def to_numpy(obj):
    """
    Convert a Paddle tensor, a python list/tuple, or a dictionary of those to Numpy.

    Dictionaries (`dict` / `UserDict`) are converted value by value and returned as a plain `dict`.
    Any other object (including an existing Numpy array) is returned unchanged.
    """
    if isinstance(obj, (dict, UserDict)):
        return {k: to_numpy(v) for k, v in obj.items()}
    elif isinstance(obj, (list, tuple)):
        return np.array(obj)
    elif is_paddle_tensor(obj):
        return obj.detach().cpu().numpy()
    else:
        return obj


# Pillow >= 9.1.0 exposes the resampling filters under `PIL.Image.Resampling`; for older
# versions fall back to the `PIL.Image` module itself.
if version.parse(version.parse(PIL.__version__).base_version) >= version.parse("9.1.0"):
    PILImageResampling = PIL.Image.Resampling
else:
    PILImageResampling = PIL.Image

ImageInput = Union[
    "PIL.Image.Image",
    np.ndarray,
    "paddle.Tensor",
    List["PIL.Image.Image"],
    List[np.ndarray],
    List["paddle.Tensor"],
]  # noqa

TextInput = str


class ChannelDimension(ExplicitEnum):
    """Position of the channel axis in an image array."""

    FIRST = "channels_first"
    LAST = "channels_last"


class TensorType(ExplicitEnum):
    """
    Possible values for the `return_tensors` argument in [`PretrainedTokenizerBase.__call__`]. Useful for
    tab-completion in an IDE.
    """

    PADDLE = "pd"
    NUMPY = "np"


def is_valid_image(img):
    """Return whether `img` is a PIL image, a Numpy array or a Paddle tensor."""
    return isinstance(img, (PIL.Image.Image, np.ndarray)) or is_paddle_tensor(img)


def valid_images(imgs):
    """
    Return whether `imgs` is a valid image or an (arbitrarily nested) list/tuple of valid images.

    An empty list/tuple is considered valid.
    """
    # If we have a list of images, make sure every image is valid
    if isinstance(imgs, (list, tuple)):
        return all(valid_images(img) for img in imgs)
    # If not a list or tuple, we have been given a single image or batched tensor of images
    return is_valid_image(imgs)


def is_batched(img):
    """
    Return whether `img` is a list/tuple whose first element is a valid image.

    An empty list/tuple is not a batch, so `False` is returned instead of failing on `img[0]`.
    """
    if isinstance(img, (list, tuple)):
        return len(img) > 0 and is_valid_image(img[0])
    return False


def make_list_of_images(images, expected_ndims: int = 3) -> List[ImageInput]:
    """
    Ensure that the input is a list of images. If the input is a single image, it is converted to a list of length 1.
    If the input is a batch of images, it is converted to a list of images.

    Args:
        images (`ImageInput`):
            Image or batch of images to turn into a list of images.
        expected_ndims (`int`, *optional*, defaults to 3):
            Expected number of dimensions for a single input image. If the input image has a different number of
            dimensions, an error is raised.

    Raises:
        ValueError: If `images` is not a supported image type, or if an array/tensor input has neither
            `expected_ndims` nor `expected_ndims + 1` dimensions.
    """
    # A list/tuple starting with a valid image is returned as is
    if is_batched(images):
        return images

    # Either the input is a single image, in which case we create a list of length 1
    if isinstance(images, PIL.Image.Image):
        # PIL images are never batched
        return [images]

    if is_valid_image(images):
        if images.ndim == expected_ndims + 1:
            # Batch of images
            images = list(images)
        elif images.ndim == expected_ndims:
            # Single image
            images = [images]
        else:
            raise ValueError(
                f"Invalid image shape. Expected either {expected_ndims + 1} or {expected_ndims} dimensions, but got"
                f" {images.ndim} dimensions."
            )
        return images
    raise ValueError(
        "Invalid image type. Expected either PIL.Image.Image, numpy.ndarray, paddle.Tensor "
        f"but got {type(images)}."
    )


def to_numpy_array(img) -> np.ndarray:
    """
    Convert a single valid image (PIL image, Numpy array or Paddle tensor) to a Numpy array.

    Raises:
        ValueError: If `img` is not a valid image type.
    """
    if not is_valid_image(img):
        raise ValueError(f"Invalid image type: {type(img)}")

    if isinstance(img, PIL.Image.Image):
        return np.array(img)
    return to_numpy(img)


def infer_channel_dimension_format(image: np.ndarray) -> ChannelDimension:
    """
    Infers the channel dimension format of `image`.

    The first candidate axis is checked before the last one, and an axis is taken to be the channel axis
    when its size is 1 or 3.

    Args:
        image (`np.ndarray`):
            The image to infer the channel dimension of. Must have 3 or 4 dimensions.

    Returns:
        The channel dimension of the image.

    Raises:
        ValueError: If the image does not have 3 or 4 dimensions, or if neither candidate axis has size 1 or 3.
    """
    if image.ndim == 3:
        first_dim, last_dim = 0, 2
    elif image.ndim == 4:
        # For 4 dimensions, axis 0 is skipped and the candidates are axes 1 and 3
        first_dim, last_dim = 1, 3
    else:
        raise ValueError(f"Unsupported number of image dimensions: {image.ndim}")

    if image.shape[first_dim] in (1, 3):
        return ChannelDimension.FIRST
    elif image.shape[last_dim] in (1, 3):
        return ChannelDimension.LAST
    raise ValueError("Unable to infer channel dimension format")


def get_channel_dimension_axis(image: np.ndarray) -> int:
    """
    Returns the channel dimension axis of the image.

    Args:
        image (`np.ndarray`):
            The image to get the channel dimension axis of.

    Returns:
        The channel dimension axis of the image.
    """
    channel_dim = infer_channel_dimension_format(image)
    if channel_dim == ChannelDimension.FIRST:
        return image.ndim - 3
    elif channel_dim == ChannelDimension.LAST:
        return image.ndim - 1
    raise ValueError(f"Unsupported data format: {channel_dim}")


def get_image_size(image: np.ndarray, channel_dim: Optional[ChannelDimension] = None) -> Tuple[int, int]:
    """
    Returns the (height, width) dimensions of the image.

    Args:
        image (`np.ndarray`):
            The image to get the dimensions of.
        channel_dim (`ChannelDimension`, *optional*):
            Which dimension the channel dimension is in. If `None`, will infer the channel dimension from the image.

    Returns:
        A tuple of the image's height and width.

    Raises:
        ValueError: If `channel_dim` is neither `ChannelDimension.FIRST` nor `ChannelDimension.LAST`.
    """
    if channel_dim is None:
        channel_dim = infer_channel_dimension_format(image)

    if channel_dim == ChannelDimension.FIRST:
        return image.shape[-2], image.shape[-1]
    elif channel_dim == ChannelDimension.LAST:
        return image.shape[-3], image.shape[-2]
    else:
        raise ValueError(f"Unsupported data format: {channel_dim}")


def convert_to_rgb(image: ImageInput) -> ImageInput:
    """
    Converts an image to RGB format. Only converts if the image is of type PIL.Image.Image, otherwise returns the image
    as is.

    Args:
        image (Image):
            The image to convert.
    """
    if not isinstance(image, PIL.Image.Image):
        return image

    image = image.convert("RGB")
    return image


def to_channel_dimension_format(
    image: np.ndarray,
    channel_dim: Union[ChannelDimension, str],
    input_channel_dim: Optional[Union[ChannelDimension, str]] = None,
) -> np.ndarray:
    """
    Converts `image` to the channel dimension format specified by `channel_dim`.

    Args:
        image (`numpy.ndarray`):
            The image to have its channel dimension set. Either a single 3-dimensional image or a 4-dimensional
            batch, using the same axis layout that `infer_channel_dimension_format` checks.
        channel_dim (`ChannelDimension`):
            The channel dimension format to use.
        input_channel_dim (`ChannelDimension`, *optional*):
            The channel dimension format of the input image. If `None`, it is inferred from the image.

    Returns:
        `np.ndarray`: The image with the channel dimension set to `channel_dim`. The input object itself is
        returned when it is already in the requested format.

    Raises:
        ValueError: If `image` is not a Numpy array.
    """
    if not isinstance(image, np.ndarray):
        raise ValueError(f"Input image must be of type np.ndarray, got {type(image)}")

    if input_channel_dim is None:
        input_channel_dim = infer_channel_dimension_format(image)

    target_channel_dim = ChannelDimension(channel_dim)
    if input_channel_dim == target_channel_dim:
        return image

    # 4-dimensional inputs keep axis 0 in place and only permute the remaining three axes,
    # matching the axes used by `infer_channel_dimension_format` for 4 dimensions.
    is_batch = image.ndim == 4
    if target_channel_dim == ChannelDimension.FIRST:
        axes = (0, 3, 1, 2) if is_batch else (2, 0, 1)
    elif target_channel_dim == ChannelDimension.LAST:
        axes = (0, 2, 3, 1) if is_batch else (1, 2, 0)
    else:
        raise ValueError("Unsupported channel dimension format: {}".format(channel_dim))

    return image.transpose(axes)


class BatchFeature(UserDict):
    r"""
    Holds the feature extractor specific `__call__` methods.
    This class is derived from a python dictionary and can be used as a dictionary.

    Args:
        data (`dict`):
            Dictionary of lists/arrays/tensors returned by the __call__/pad methods ('input_values', 'attention_mask',
            etc.).
        tensor_type (`Union[None, str, TensorType]`, *optional*):
            You can give a tensor_type here to convert the lists of integers in Paddle/Numpy Tensors at
            initialization.
    """

    def __init__(
        self,
        data: Optional[Dict[str, Any]] = None,
        tensor_type: Union[None, str, TensorType] = None,
    ):
        super().__init__(data)
        self.convert_to_tensors(tensor_type=tensor_type)

    def __getitem__(self, item: str):
        """
        If the key is a string, returns the value of the dict associated to `key` ('input_values', 'attention_mask',
        etc.).

        Raises:
            KeyError: If `item` is not a string, or is not a key of the underlying dict.
        """
        if isinstance(item, str):
            return self.data[item]
        else:
            raise KeyError("Indexing with integers is not available when using Python based feature extractors")

    def __getattr__(self, item: str):
        """
        Expose the stored entries as attributes (`batch.input_values`).

        Raises:
            AttributeError: If `item` is not a key of the underlying dict.
        """
        # `__getattr__` only runs when normal lookup failed. If `data` itself is missing (instance
        # not initialised yet), reading `self.data` below would re-enter this method forever.
        if item == "data":
            raise AttributeError(item)
        try:
            return self.data[item]
        except KeyError:
            raise AttributeError(item)

    def __getstate__(self):
        """Return the state to serialize: only the underlying dict."""
        return {"data": self.data}

    def __setstate__(self, state):
        """Restore the underlying dict from `state` when it contains a `"data"` entry."""
        if "data" in state:
            self.data = state["data"]

    def keys(self):
        """Return the keys view of the underlying dict."""
        return self.data.keys()

    def values(self):
        """Return the values view of the underlying dict."""
        return self.data.values()

    def items(self):
        """Return the items view of the underlying dict."""
        return self.data.items()

    def convert_to_tensors(self, tensor_type: Optional[Union[str, TensorType]] = None):
        """
        Convert the inner content to tensors.

        Args:
            tensor_type (`str` or [`TensorType`], *optional*):
                The type of tensors to use. If `str`, should be one of the values of the enum [`TensorType`]. If
                `None`, no modification is done.

        Returns:
            This instance, with its values converted in place.

        Raises:
            ValueError: If a value cannot be converted to the requested tensor type.
        """
        if tensor_type is None:
            return self

        # Convert to TensorType
        if not isinstance(tensor_type, TensorType):
            tensor_type = TensorType(tensor_type)

        # Get a function reference for the correct framework
        if tensor_type == TensorType.PADDLE:
            as_tensor = paddle.to_tensor
            is_tensor = paddle.is_tensor
        else:
            as_tensor = np.asarray

            def is_tensor(x):
                return isinstance(x, np.ndarray)

        # Do the tensor conversion in batch. Only existing keys are re-assigned, so the size of the
        # underlying dict does not change while iterating over it.
        for key, value in self.items():
            try:
                if not is_tensor(value):
                    tensor = as_tensor(value)
                    self[key] = tensor
            except Exception as err:
                # `Exception` (not a bare `except`) so that KeyboardInterrupt/SystemExit propagate
                # untouched; the original failure is chained as the cause.
                if key == "overflowing_tokens":
                    raise ValueError(
                        "Unable to create tensor returning overflowing tokens of different lengths. "
                        "Please see if a fast version of this tokenizer is available to have this feature available."
                    ) from err
                raise ValueError(
                    "Unable to create tensor, you should probably activate truncation and/or padding "
                    "with 'padding=True' 'truncation=True' to have batched tensors with the same length."
                ) from err

        return self