| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561 |
- # Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import base64
- import math
- from collections import UserDict
- from io import BytesIO
- from typing import Any, Dict, List, Optional, Tuple, Union
- import numpy as np
- import paddle
- import PIL.Image
- import requests
- from packaging import version
- from PIL import Image
- from ...common.tokenizer.tokenizer_utils_base import ExplicitEnum
def is_paddle_tensor(tensor):
    """Return the result of `paddle.is_tensor` for `tensor`, i.e. whether it is a Paddle tensor."""
    result = paddle.is_tensor(tensor)
    return result
def to_numpy(obj):
    """
    Convert a Paddle tensor or a python list/tuple to a Numpy array.

    Mappings (`dict` / `UserDict`) are converted value by value and returned as a plain `dict`. Any other object
    (including an existing Numpy array) is returned unchanged.
    """
    if isinstance(obj, (dict, UserDict)):
        converted = {}
        for key, value in obj.items():
            converted[key] = to_numpy(value)
        return converted
    if isinstance(obj, (list, tuple)):
        return np.array(obj)
    if is_paddle_tensor(obj):
        # Detach from the graph and move to host memory before handing the data to Numpy.
        return obj.detach().cpu().numpy()
    return obj
# Use the `PIL.Image.Resampling` namespace when the installed Pillow is at least 9.1.0; otherwise fall back to the
# `PIL.Image` module itself so attribute access on `PILImageResampling` keeps working.
PILImageResampling = (
    PIL.Image.Resampling
    if version.parse(version.parse(PIL.__version__).base_version) >= version.parse("9.1.0")
    else PIL.Image
)
# Accepted image inputs: a single PIL image, Numpy array or Paddle tensor, or a list of any one of those types.
# PIL and Paddle types are written as strings (forward references) so they are not evaluated at import time.
ImageInput = Union[
    "PIL.Image.Image",
    np.ndarray,
    "paddle.Tensor",
    List["PIL.Image.Image"],
    List[np.ndarray],
    List["paddle.Tensor"],
]  # noqa
# Text inputs are plain python strings.
TextInput = str
class ChannelDimension(ExplicitEnum):
    """
    Position of the channel axis in an image array.
    """

    # Channel axis precedes the spatial axes: (..., num_channels, height, width).
    FIRST = "channels_first"
    # Channel axis follows the spatial axes: (..., height, width, num_channels).
    LAST = "channels_last"
class TensorType(ExplicitEnum):
    """
    Possible values for the `return_tensors` argument in [`PretrainedTokenizerBase.__call__`]. Useful for
    tab-completion in an IDE.
    """

    # Values are converted with `paddle.to_tensor` (see `BatchFeature.convert_to_tensors`).
    PADDLE = "pd"
    # Values are converted with `np.asarray` (see `BatchFeature.convert_to_tensors`).
    NUMPY = "np"
def is_valid_image(img):
    """Return whether `img` is a single supported image object: a PIL image, a Numpy array or a Paddle tensor."""
    if isinstance(img, PIL.Image.Image):
        return True
    if isinstance(img, np.ndarray):
        return True
    return is_paddle_tensor(img)
def valid_images(imgs):
    """
    Check that `imgs` contains only supported image objects.

    Lists and tuples are checked recursively, so arbitrarily nested containers are accepted as long as every leaf is
    a valid image. Anything else is treated as a single image or a batched tensor of images.
    """
    if isinstance(imgs, (list, tuple)):
        # A container is valid only when every (possibly nested) entry is valid.
        return all(valid_images(entry) for entry in imgs)
    # Not a list or tuple: a single image or a batched tensor of images.
    return bool(is_valid_image(imgs))
def is_batched(img):
    """
    Return whether `img` is a non-empty list or tuple whose first element is a valid image.

    Only the first element is inspected. An empty list/tuple, or any object that is not a list/tuple, is reported as
    not batched.
    """
    if not isinstance(img, (list, tuple)):
        return False
    # An empty container has no first element to inspect; treat it as "not a batch" instead of raising IndexError.
    if not img:
        return False
    return is_valid_image(img[0])
def make_list_of_images(images, expected_ndims: int = 3) -> List[ImageInput]:
    """
    Normalize the input into a list of images.

    A list/tuple whose first element is a valid image is returned as is. A single image is wrapped into a list of
    length 1, and an array/tensor with one extra leading dimension is split along that dimension.

    Args:
        images (`ImageInput`):
            Image or images to turn into a list of images.
        expected_ndims (`int`, *optional*, defaults to 3):
            Expected number of dimensions for a single input image. If the input image has a different number of
            dimensions, an error is raised.

    Raises:
        ValueError: If the input is not a supported image type, or its number of dimensions is neither
            `expected_ndims` nor `expected_ndims + 1`.
    """
    # Already a list/tuple of images: nothing to do.
    if is_batched(images):
        return images

    # PIL images are never batched.
    if isinstance(images, PIL.Image.Image):
        return [images]

    if not is_valid_image(images):
        raise ValueError(
            "Invalid image type. Expected either PIL.Image.Image, numpy.ndarray, paddle.Tensor "
            f"but got {type(images)}."
        )

    ndim = images.ndim
    if ndim == expected_ndims + 1:
        # Batch of images: split along the leading dimension.
        return list(images)
    if ndim == expected_ndims:
        # Single image.
        return [images]
    raise ValueError(
        f"Invalid image shape. Expected either {expected_ndims + 1} or {expected_ndims} dimensions, but got"
        f" {images.ndim} dimensions."
    )
def to_numpy_array(img) -> np.ndarray:
    """
    Convert a single supported image (PIL image, Numpy array or Paddle tensor) to a Numpy array.

    Raises:
        ValueError: If `img` is not a supported image type.
    """
    if is_valid_image(img):
        # PIL images go through `np.array`; arrays and tensors are handled by `to_numpy`.
        return np.array(img) if isinstance(img, PIL.Image.Image) else to_numpy(img)
    raise ValueError(f"Invalid image type: {type(img)}")
def infer_channel_dimension_format(image: np.ndarray) -> ChannelDimension:
    """
    Guess where the channel axis of `image` is located.

    An axis is considered a channel axis when its size is 1 or 3. The candidate "first" position is checked before
    the "last" one, so an ambiguous image is reported as channels-first.

    Args:
        image (`np.ndarray`):
            The image to infer the channel dimension of. Must have 3 dimensions, or 4 with a leading batch dimension.

    Returns:
        The channel dimension of the image.

    Raises:
        ValueError: If the image is neither 3-D nor 4-D, or if neither candidate axis has size 1 or 3.
    """
    ndim = image.ndim
    if ndim not in (3, 4):
        raise ValueError(f"Unsupported number of image dimensions: {image.ndim}")

    # For 4-D input the leading axis is skipped, shifting both candidates by one.
    first_dim = ndim - 3
    last_dim = ndim - 1

    channel_sizes = (1, 3)
    if image.shape[first_dim] in channel_sizes:
        return ChannelDimension.FIRST
    if image.shape[last_dim] in channel_sizes:
        return ChannelDimension.LAST
    raise ValueError("Unable to infer channel dimension format")
def get_channel_dimension_axis(image: np.ndarray) -> int:
    """
    Return the index of the channel axis of `image`.

    The layout is inferred with `infer_channel_dimension_format`.

    Args:
        image (`np.ndarray`):
            The image to get the channel dimension axis of.

    Returns:
        The channel dimension axis of the image.
    """
    layout = infer_channel_dimension_format(image)
    if layout == ChannelDimension.LAST:
        return image.ndim - 1
    if layout == ChannelDimension.FIRST:
        return image.ndim - 3
    raise ValueError(f"Unsupported data format: {layout}")
def get_image_size(
    image: np.ndarray, channel_dim: ChannelDimension = None
) -> Tuple[int, int]:
    """
    Read the spatial size of `image` as (height, width).

    Args:
        image (`np.ndarray`):
            The image to get the dimensions of.
        channel_dim (`ChannelDimension`, *optional*):
            Which dimension the channel dimension is in. If `None`, will infer the channel dimension from the image.

    Returns:
        A tuple of the image's height and width.

    Raises:
        ValueError: If `channel_dim` is not a supported format.
    """
    if channel_dim is None:
        channel_dim = infer_channel_dimension_format(image)

    if channel_dim == ChannelDimension.LAST:
        # (..., height, width, channels)
        return image.shape[-3], image.shape[-2]
    if channel_dim == ChannelDimension.FIRST:
        # (..., channels, height, width)
        return image.shape[-2], image.shape[-1]
    raise ValueError(f"Unsupported data format: {channel_dim}")
def convert_to_rgb(image: ImageInput) -> ImageInput:
    """
    Convert a PIL image to RGB mode. Any input that is not a `PIL.Image.Image` is returned untouched.

    Args:
        image (Image):
            The image to convert.
    """
    if isinstance(image, PIL.Image.Image):
        return image.convert("RGB")
    return image
def to_channel_dimension_format(
    image: np.ndarray,
    channel_dim: Union[ChannelDimension, str],
    input_channel_dim: Optional[Union[ChannelDimension, str]] = None,
) -> np.ndarray:
    """
    Converts `image` to the channel dimension format specified by `channel_dim`.

    Works for single images (3 dimensions) as well as batches with one leading dimension (4 dimensions).

    Args:
        image (`numpy.ndarray`):
            The image to have its channel dimension set.
        channel_dim (`ChannelDimension` or `str`):
            The channel dimension format to use.
        input_channel_dim (`ChannelDimension` or `str`, *optional*):
            The channel dimension format of the input image. If `None`, it is inferred from the image.

    Returns:
        `np.ndarray`: The image with the channel dimension set to `channel_dim`. The input array itself is returned
        when it is already in the requested format.

    Raises:
        ValueError: If `image` is not a `np.ndarray`, or if a channel dimension value is not supported.
    """
    if not isinstance(image, np.ndarray):
        raise ValueError(f"Input image must be of type np.ndarray, got {type(image)}")

    if input_channel_dim is None:
        input_channel_dim = infer_channel_dimension_format(image)
    else:
        # Normalize string values so the comparison below is always between two enum members.
        input_channel_dim = ChannelDimension(input_channel_dim)

    target_channel_dim = ChannelDimension(channel_dim)
    if input_channel_dim == target_channel_dim:
        return image

    # Addressing axes from the end keeps any leading batch axis in place; for a 3-D image these moves are the same
    # permutations as `transpose((2, 0, 1))` and `transpose((1, 2, 0))`.
    if target_channel_dim == ChannelDimension.FIRST:
        return np.moveaxis(image, -1, -3)
    if target_channel_dim == ChannelDimension.LAST:
        return np.moveaxis(image, -3, -1)
    raise ValueError("Unsupported channel dimension format: {}".format(channel_dim))
class BatchFeature(UserDict):
    r"""
    Holds the feature extractor specific `__call__` methods.

    This class is derived from a python dictionary and can be used as a dictionary. Entries can also be read as
    attributes (`batch.input_values` is equivalent to `batch["input_values"]`).

    Args:
        data (`dict`):
            Dictionary of lists/arrays/tensors returned by the __call__/pad methods ('input_values', 'attention_mask',
            etc.).
        tensor_type (`Union[None, str, TensorType]`, *optional*):
            You can give a tensor_type here to convert the lists of integers in Paddle/Numpy Tensors at
            initialization.
    """

    def __init__(
        self,
        data: Optional[Dict[str, Any]] = None,
        tensor_type: Union[None, str, TensorType] = None,
    ):
        super().__init__(data)
        self.convert_to_tensors(tensor_type=tensor_type)

    def __getitem__(self, item: str):
        """
        If the key is a string, returns the value of the dict associated to `key` ('input_values', 'attention_mask',
        etc.).

        Raises:
            KeyError: If `item` is not a string, or is not present in the data.
        """
        if isinstance(item, str):
            return self.data[item]
        raise KeyError(
            "Indexing with integers is not available when using Python based feature extractors"
        )

    def __getattr__(self, item: str):
        """Expose the stored entries as attributes. Only called when normal attribute lookup fails."""
        # `data` itself is missing on an instance that has not been initialized yet (e.g. one created through
        # `__new__`); looking it up through `self.data` below would recurse forever.
        if item == "data":
            raise AttributeError(item)
        try:
            return self.data[item]
        except KeyError:
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{item}'"
            ) from None

    def __getstate__(self):
        """Return the state to serialize: only the underlying `data` mapping."""
        return {"data": self.data}

    def __setstate__(self, state):
        """Restore `data` from a state produced by `__getstate__`."""
        if "data" in state:
            self.data = state["data"]

    def keys(self):
        """Return the keys view of the underlying dict."""
        return self.data.keys()

    def values(self):
        """Return the values view of the underlying dict."""
        return self.data.values()

    def items(self):
        """Return the items view of the underlying dict."""
        return self.data.items()

    def convert_to_tensors(self, tensor_type: Optional[Union[str, TensorType]] = None):
        """
        Convert the inner content to tensors.

        Values that already are tensors of the requested framework are left untouched. The conversion happens in
        place and the instance itself is returned.

        Args:
            tensor_type (`str` or [`TensorType`], *optional*):
                The type of tensors to use. If `str`, should be one of the values of the enum [`TensorType`]. If
                `None`, no modification is done.

        Raises:
            ValueError: If a value cannot be converted to a tensor. The original error is attached as the cause.
        """
        if tensor_type is None:
            return self

        # Convert to TensorType
        if not isinstance(tensor_type, TensorType):
            tensor_type = TensorType(tensor_type)

        # Get a function reference for the correct framework
        if tensor_type == TensorType.PADDLE:
            as_tensor = paddle.to_tensor
            is_tensor = paddle.is_tensor
        else:
            as_tensor = np.asarray

            def is_tensor(x):
                return isinstance(x, np.ndarray)

        # Do the tensor conversion in batch. Only existing keys are reassigned, so the size of the underlying dict
        # does not change while it is being iterated.
        for key, value in self.items():
            try:
                if not is_tensor(value):
                    tensor = as_tensor(value)
                    self[key] = tensor
            except Exception as err:
                # `Exception` rather than a bare `except` so KeyboardInterrupt/SystemExit are not swallowed.
                if key == "overflowing_tokens":
                    raise ValueError(
                        "Unable to create tensor returning overflowing tokens of different lengths. "
                        "Please see if a fast version of this tokenizer is available to have this feature available."
                    ) from err
                raise ValueError(
                    "Unable to create tensor, you should probably activate truncation and/or padding "
                    "with 'padding=True' 'truncation=True' to have batched tensors with the same length."
                ) from err

        return self
class PaddingStrategy(ExplicitEnum):
    """
    Possible values for the `padding` argument in [`PretrainedTokenizerBase.__call__`]. Useful for tab-completion in an
    IDE.
    """

    # NOTE(review): member semantics are not visible in this file; the comments below are inferred from the names
    # and should be confirmed against the tokenizer implementation.
    # Presumably: pad to the longest sequence in the batch.
    LONGEST = "longest"
    # Presumably: pad to a caller-supplied maximum length.
    MAX_LENGTH = "max_length"
    # Presumably: leave sequences unpadded.
    DO_NOT_PAD = "do_not_pad"
def extract_vision_info(
    conversations: Union[List[dict], List[List[dict]]]
) -> List[dict]:
    """
    Collect the image entries from one conversation or a batch of conversations.

    A conversation is a list of message dicts. Only messages whose `"content"` is a list are inspected; a content
    entry is collected when it has an `"image"` or `"image_url"` key, or when its `"type"` is `"image"` or
    `"image_url"`.

    Args:
        conversations (`List[dict]` or `List[List[dict]]`):
            A single conversation, or a list of conversations.

    Returns:
        `List[dict]`: The matching content entries, in the order they appear. Empty when there are none (including
        when `conversations` itself is empty).
    """
    if not conversations:
        return []

    # A single conversation is detected by its first element being a message dict; wrap it into a batch of one.
    if isinstance(conversations[0], dict):
        conversations = [conversations]

    image_types = ("image", "image_url")
    vision_infos = []
    for conversation in conversations:
        for message in conversation:
            content = message["content"]
            if not isinstance(content, list):
                continue
            for ele in content:
                # `.get` so that entries without a "type" key (e.g. `{"text": ...}`) are skipped instead of
                # raising KeyError.
                if (
                    "image" in ele
                    or "image_url" in ele
                    or ele.get("type") in image_types
                ):
                    vision_infos.append(ele)
    return vision_infos
def process_vision_info(
    conversations: Union[List[dict], List[List[dict]]],
    *,
    size_factor: int = 28,
    min_pixels: int = 4 * 28 * 28,
    max_pixels: int = 16384 * 28 * 28,
    max_ratio: float = 200,
) -> Optional[List[Image.Image]]:
    """
    Load and resize every image referenced in one conversation or a batch of conversations.

    Image entries are located with `extract_vision_info` and loaded with `fetch_image`.

    Args:
        conversations (`List[dict]` or `List[List[dict]]`):
            A single conversation, or a list of conversations.
        size_factor (`int`, *optional*, defaults to 28):
            Forwarded to `fetch_image`: both sides of the resized image are multiples of this value.
        min_pixels (`int`, *optional*, defaults to `4 * 28 * 28`):
            Forwarded to `fetch_image`: lower bound on the pixel count of the resized image.
        max_pixels (`int`, *optional*, defaults to `16384 * 28 * 28`):
            Forwarded to `fetch_image`: upper bound on the pixel count of the resized image.
        max_ratio (`float`, *optional*, defaults to 200):
            Forwarded to `fetch_image`: largest accepted aspect ratio.

    Returns:
        `List[PIL.Image.Image]` with one RGB image per image entry, or `None` when the conversations contain no
        image.

    Raises:
        ValueError: If an entry is typed as an image but carries neither an `"image"` nor an `"image_url"` key.
    """
    # NOTE(review): `fetch_image` has no defaults for its resize arguments, so they must be supplied here. The
    # defaults above follow the constants of the Qwen2-VL reference preprocessing — confirm that they match the
    # model this module serves.
    vision_infos = extract_vision_info(conversations)
    image_inputs = []
    for vision_info in vision_infos:
        if "image" in vision_info or "image_url" in vision_info:
            image_inputs.append(
                fetch_image(
                    vision_info,
                    size_factor=size_factor,
                    min_pixels=min_pixels,
                    max_pixels=max_pixels,
                    max_ratio=max_ratio,
                )
            )
        else:
            raise ValueError("image, image_url should in content.")
    if len(image_inputs) == 0:
        image_inputs = None
    return image_inputs
def fetch_image(
    ele: Dict[str, Union[str, Image.Image]],
    size_factor: int,
    min_pixels: int,
    max_pixels: int,
    max_ratio: float,
) -> Image.Image:
    """
    Load the image described by `ele`, convert it to RGB and resize it with `smart_resize`.

    Args:
        ele (`dict`, or a bare image reference):
            The image entry. The image is read from its `"image"` key, falling back to `"image_url"`. A non-dict
            value is treated as the image itself. Supported image values are a `PIL.Image.Image`, a `np.ndarray`,
            an `http://` / `https://` URL, a `file://` path, a base64 `data:image` URI, or a local file path.
            Optional keys: `"resized_height"` and `"resized_width"` (explicit target size, used only when both are
            present), `"min_pixels"` and `"max_pixels"` (per-image overrides, used only when no explicit target size
            is given).
        size_factor (`int`):
            Both sides of the resized image are multiples of this value.
        min_pixels (`int`):
            Lower bound on the pixel count of the resized image.
        max_pixels (`int`):
            Upper bound on the pixel count of the resized image.
        max_ratio (`float`):
            Largest accepted aspect ratio.

    Returns:
        `PIL.Image.Image`: The resized RGB image.

    Raises:
        ValueError: If the image reference is not of a supported kind, or `smart_resize` rejects the size.
        requests.HTTPError: If downloading the image returns an error status.
    """
    if not isinstance(ele, dict):
        ele = {"image": ele}

    image = ele["image"] if "image" in ele else ele["image_url"]

    image_obj = None
    if isinstance(image, Image.Image):
        image_obj = image
    elif isinstance(image, np.ndarray):
        image_obj = Image.fromarray(image)
    elif isinstance(image, str):
        if image.startswith(("http://", "https://")):
            response = requests.get(image)
            # Fail on 4xx/5xx here rather than letting PIL choke on an error page.
            response.raise_for_status()
            # `content` has any transfer/content encoding (e.g. gzip) already decoded, unlike the raw stream.
            image_obj = Image.open(BytesIO(response.content))
        elif image.startswith("file://"):
            image_obj = Image.open(image[7:])
        elif image.startswith("data:image"):
            # Only base64 payloads are supported; anything else falls through to the error below.
            _, separator, payload = image.partition(";")
            if separator and payload.startswith("base64,"):
                image_obj = Image.open(BytesIO(base64.b64decode(payload[7:])))
        else:
            # Anything else is treated as a local file path.
            image_obj = Image.open(image)

    if image_obj is None:
        raise ValueError(
            f"Unrecognized image input, support local path, http url, base64 and PIL.Image, got {image}"
        )
    image = image_obj.convert("RGB")

    # resize
    if "resized_height" in ele and "resized_width" in ele:
        resized_height, resized_width = smart_resize(
            ele["resized_height"],
            ele["resized_width"],
            factor=size_factor,
            min_pixels=min_pixels,
            max_pixels=max_pixels,
            max_ratio=max_ratio,
        )
    else:
        width, height = image.size  # PIL reports (width, height)
        min_pixels = ele.get("min_pixels", min_pixels)
        max_pixels = ele.get("max_pixels", max_pixels)
        resized_height, resized_width = smart_resize(
            height,
            width,
            factor=size_factor,
            min_pixels=min_pixels,
            max_pixels=max_pixels,
            max_ratio=max_ratio,
        )
    image = image.resize((resized_width, resized_height))
    return image
def round_by_factor(number: int, factor: int) -> int:
    """Round `number` to the nearest multiple of `factor`, using Python's built-in `round` for the quotient."""
    multiples = round(number / factor)
    return multiples * factor
def ceil_by_factor(number: int, factor: int) -> int:
    """Round `number` up to the nearest multiple of `factor` (unchanged when it already is a multiple)."""
    multiples = math.ceil(number / factor)
    return multiples * factor
def floor_by_factor(number: int, factor: int) -> int:
    """Round `number` down to the nearest multiple of `factor` (unchanged when it already is a multiple)."""
    multiples = math.floor(number / factor)
    return multiples * factor
def smart_resize(
    height: int,
    width: int,
    factor: int,
    min_pixels: int,
    max_pixels: int,
    max_ratio: float,
) -> Tuple[int, int]:
    """
    Rescales the image so that the following conditions are met:
    1. Both dimensions (height and width) are divisible by 'factor'.
    2. The total number of pixels is within the range ['min_pixels', 'max_pixels'].
    3. The aspect ratio of the image is maintained as closely as possible.

    Each side is at least `factor`. For very elongated images this lower bound takes precedence over condition 2, so
    the result can exceed `max_pixels` rather than collapse a side to zero.

    Args:
        height (`int`): Original height, must be positive.
        width (`int`): Original width, must be positive.
        factor (`int`): Value both returned sides are multiples of.
        min_pixels (`int`): Lower bound on `height * width` of the result.
        max_pixels (`int`): Upper bound on `height * width` of the result.
        max_ratio (`float`): Largest accepted ratio between the longer and the shorter side.

    Returns:
        `Tuple[int, int]`: The new `(height, width)`.

    Raises:
        ValueError: If a dimension is not positive, or the aspect ratio exceeds `max_ratio`.
    """
    if height <= 0 or width <= 0:
        # Guard the divisions below; a zero side would otherwise surface as ZeroDivisionError.
        raise ValueError(
            f"height and width must be positive, got height={height}, width={width}"
        )
    if max(height, width) / min(height, width) > max_ratio:
        raise ValueError(
            f"absolute aspect ratio must be smaller than {max_ratio}, got {max(height, width) / min(height, width)}"
        )
    h_bar = max(factor, round_by_factor(height, factor))
    w_bar = max(factor, round_by_factor(width, factor))
    if h_bar * w_bar > max_pixels:
        # Shrink both sides by the same ratio, rounding down so the pixel budget is respected. Flooring can reach
        # zero for a short side, hence the clamp to one `factor`.
        beta = math.sqrt((height * width) / max_pixels)
        h_bar = max(factor, floor_by_factor(height / beta, factor))
        w_bar = max(factor, floor_by_factor(width / beta, factor))
    elif h_bar * w_bar < min_pixels:
        # Grow both sides by the same ratio, rounding up so the minimum is reached.
        beta = math.sqrt(min_pixels / (height * width))
        h_bar = ceil_by_factor(height * beta, factor)
        w_bar = ceil_by_factor(width * beta, factor)
    return h_bar, w_bar
def make_batched_images(images) -> "List[ImageInput]":
    """
    Accepts images in list or nested list format, and makes a flat list of images for preprocessing.

    Only the first element (and, for nested input, the first element of the first inner list) is inspected to decide
    which format the input has.

    Args:
        images (`Union[List[List[ImageInput]], List[ImageInput], ImageInput]`):
            The input image.

    Returns:
        list: A flat list of images. A nested list is flattened, a flat list/tuple is returned as is, and a single
        image is wrapped into a list of length 1.

    Raises:
        ValueError: If the input is empty, starts with an empty inner list, or does not contain valid images.
    """
    if isinstance(images, (list, tuple)):
        # Empty containers fall through to the ValueError below instead of raising IndexError on `images[0]`.
        if images:
            first = images[0]
            if isinstance(first, (list, tuple)):
                if first and is_valid_image(first[0]):
                    return [img for img_list in images for img in img_list]
            elif is_valid_image(first):
                return images
    elif is_valid_image(images):
        return [images]

    raise ValueError(f"Could not make batched images from {images}")
|