predictor.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. # Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import Any, List, Optional, Sequence
  15. import numpy as np
  16. from ....modules.keypoint_detection.model_list import MODELS
  17. from ....utils import logging
  18. from ...common.batch_sampler import ImageBatchSampler
  19. from ..common import ToBatch
  20. from ..object_detection import DetPredictor
  21. from .processors import KptPostProcess, TopDownAffine
  22. from .result import KptResult
  23. class KptBatchSampler(ImageBatchSampler):
  24. # don't support to pass pdf file as input
  25. PDF_SUFFIX = []
  26. def sample(self, inputs):
  27. if not isinstance(inputs, list):
  28. inputs = [inputs]
  29. batch = []
  30. for input in inputs:
  31. if isinstance(input, (np.ndarray, dict)):
  32. batch.append(input)
  33. if len(batch) == self.batch_size:
  34. yield batch
  35. batch = []
  36. elif isinstance(input, str):
  37. file_path = (
  38. self._download_from_url(input)
  39. if input.startswith("http")
  40. else input
  41. )
  42. file_list = self._get_files_list(file_path)
  43. for file_path in file_list:
  44. batch.append(file_path)
  45. if len(batch) == self.batch_size:
  46. yield batch
  47. batch = []
  48. else:
  49. logging.warning(
  50. f"Not supported input data type! Only `numpy.ndarray` and `str` are supported! So has been ignored: {input}."
  51. )
  52. if len(batch) > 0:
  53. yield batch
  54. class KptPredictor(DetPredictor):
  55. entities = MODELS
  56. flip_perm = [ # The left-right joints exchange order list
  57. [1, 2],
  58. [3, 4],
  59. [5, 6],
  60. [7, 8],
  61. [9, 10],
  62. [11, 12],
  63. [13, 14],
  64. [15, 16],
  65. ]
  66. def __init__(
  67. self,
  68. *args,
  69. flip: bool = False,
  70. use_udp: Optional[bool] = None,
  71. **kwargs,
  72. ):
  73. """Keypoint Predictor
  74. Args:
  75. flip (bool): Whether to do flipping test. Default value is ``False``.
  76. use_udp (Optional[bool]): Whether to use unbiased data processing. Default value is ``None``.
  77. """
  78. self.flip = flip
  79. self.use_udp = use_udp
  80. super().__init__(*args, **kwargs)
  81. for op in self.pre_ops:
  82. if isinstance(op, TopDownAffine):
  83. self.input_size = op.input_size
  84. break
  85. if any([name in self.model_name for name in ["PP-TinyPose"]]):
  86. self.shift_heatmap = True
  87. else:
  88. self.shift_heatmap = False
  89. def _build_batch_sampler(self):
  90. return KptBatchSampler()
  91. def _get_result_class(self):
  92. return KptResult
  93. def _format_output(self, pred: Sequence[Any]) -> List[dict]:
  94. """Transform batch outputs into a list of single image output."""
  95. return [
  96. {
  97. "heatmap": res[0],
  98. "masks": res[1],
  99. }
  100. for res in zip(*pred)
  101. ]
  102. def flip_back(self, output_flipped, matched_parts):
  103. assert (
  104. output_flipped.ndim == 4
  105. ), "output_flipped should be [batch_size, num_joints, height, width]"
  106. output_flipped = output_flipped[:, :, :, ::-1]
  107. for pair in matched_parts:
  108. tmp = output_flipped[:, pair[0], :, :].copy()
  109. output_flipped[:, pair[0], :, :] = output_flipped[:, pair[1], :, :]
  110. output_flipped[:, pair[1], :, :] = tmp
  111. return output_flipped
  112. def process(self, batch_data: List[dict]):
  113. """
  114. Process a batch of data through the preprocessing, inference, and postprocessing.
  115. Args:
  116. batch_data (List[Union[str, np.ndarray], ...]): A batch of input data (e.g., image file paths).
  117. Returns:
  118. dict: A dictionary containing the input path, raw image, class IDs, scores, and label names
  119. for every instance of the batch. Keys include 'input_path', 'input_img', 'class_ids', 'scores', and 'label_names'.
  120. """
  121. datas = batch_data
  122. # preprocess
  123. for pre_op in self.pre_ops[:-1]:
  124. datas = pre_op(datas)
  125. # use `ToBatch` format batch inputs
  126. batch_inputs = self.pre_ops[-1]([data["img"] for data in datas])
  127. # do infer
  128. batch_preds = self.infer(batch_inputs)
  129. if self.flip:
  130. # flip w
  131. batch_inputs[0] = np.flip(batch_inputs[0], axis=3)
  132. preds_flipped = self.infer(batch_inputs)
  133. output_flipped = self.flip_back(preds_flipped[0], self.flip_perm)
  134. if self.shift_heatmap:
  135. output_flipped[:, :, :, 1:] = output_flipped.copy()[:, :, :, 0:-1]
  136. batch_preds[0] = (batch_preds[0] + output_flipped) * 0.5
  137. # process a batch of predictions into a list of single image result
  138. preds_list = self._format_output(batch_preds)
  139. # postprocess
  140. keypoints = self.post_op(preds_list, datas)
  141. return {
  142. "input_path": [data.get("img_path", None) for data in datas],
  143. "input_img": [data["ori_img"] for data in datas],
  144. "kpts": keypoints,
  145. }
  146. @DetPredictor.register("TopDownEvalAffine")
  147. def build_topdown_affine(self, trainsize, use_udp=False):
  148. return TopDownAffine(
  149. input_size=trainsize,
  150. use_udp=use_udp if self.use_udp is None else self.use_udp,
  151. )
  152. def build_to_batch(self):
  153. return ToBatch()
  154. def build_postprocess(self):
  155. return KptPostProcess(use_dark=True)