det_3d_batch_sampler.py

# Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Dict, List, Optional, Union
import os
import ast
from pathlib import Path
import numpy as np
import pickle
import tarfile

from ....utils import logging
from ....utils.download import download
from ....utils.cache import CACHE_DIR
from .base_batch_sampler import BaseBatchSampler


class Det3DBatchSampler(BaseBatchSampler):
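    """Batch sampler that yields nuScenes-style 3D detection annotation records.

    The sampler accepts local paths or URLs to a tar archive, extracts it,
    loads the ``nuscenes_infos_val.pkl`` annotation file it contains, and
    yields the annotation dicts in batches; batch_size is expected to be 1
    (a warning is emitted otherwise).
    """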

    # XXX: auto download for url
    def _download_from_url(self, in_path: str) -> str:
        file_name = Path(in_path).name
        save_path = Path(CACHE_DIR) / "predict_input" / file_name
        download(in_path, save_path, overwrite=True)
        return save_path.as_posix()

    @property
    def batch_size(self) -> int:
        """Gets the batch size."""
        return self._batch_size

    @batch_size.setter
    def batch_size(self, batch_size: int) -> None:
        """Sets the batch size.

        Args:
            batch_size (int): The batch size to set.
        """
        if batch_size != 1:
            logging.warning(
                "Inference for 3D models only supports batch_size equal to 1."
            )
        self._batch_size = batch_size

    def load_annotations(self, ann_file: str, data_root_dir: str) -> List[Dict]:
        """Load annotations from ann_file.

        Args:
            ann_file (str): Path of the annotation file.
            data_root_dir (str): Path of the data root directory.

        Returns:
            list[dict]: List of annotations sorted by timestamps.
        """
        with open(ann_file, "rb") as f:
            data = pickle.load(f)
        data_infos = list(sorted(data["infos"], key=lambda e: e["timestamp"]))
        # prepend the data root directory to lidar, camera, and sweep file paths
        for item in data_infos:
            # lidar data
            item["lidar_path"] = os.path.join(data_root_dir, item["lidar_path"])
            # camera data
            for cam_data_item in item["cams"].values():
                cam_data_item["data_path"] = os.path.join(
                    data_root_dir, cam_data_item["data_path"]
                )
            # sweep data
            for sweep_item in item["sweeps"]:
                sweep_item["data_path"] = os.path.join(
                    data_root_dir, sweep_item["data_path"]
                )
        return data_infos

    def sample(self, inputs: Union[List[str], str]):
        """Yield batches of annotation dicts loaded from the given archive path(s) or URL(s)."""
        if not isinstance(inputs, list):
            inputs = [inputs]

        sample_set = []
        for input in inputs:
            if isinstance(input, str):
                ann_path = (
                    self._download_from_url(input)
                    if input.startswith("http")
                    else input
                )
            else:
                logging.warning(
                    f"Unsupported input data type! Only `str` is supported, so this input has been ignored: {input}."
                )
                continue
            # extract the tar archive next to the downloaded/local file
            tar_root_dir = os.path.dirname(ann_path)
            self.extract_tar(ann_path, tar_root_dir)
            # the archive is expected to extract to `<archive name without extension>/`
            # and to contain the annotation pickle `nuscenes_infos_val.pkl`
            data_root_dir, _ = os.path.splitext(ann_path)
            ann_pkl_path = os.path.join(data_root_dir, "nuscenes_infos_val.pkl")
            self.data_infos = self.load_annotations(ann_pkl_path, data_root_dir)
            sample_set.extend(self.data_infos)

        batch = []
        for sample in sample_set:
            batch.append(sample)
            if len(batch) == self.batch_size:
                yield batch
                batch = []
        if len(batch) > 0:
            yield batch

    def _rand_batch(self, data_size: int) -> List[Any]:
        raise NotImplementedError(
            "rand batch is not supported for 3D detection annotation data"
        )

    def extract_tar(self, tar_path, extract_path="."):
        """Extract a tar archive to the given directory."""
        try:
            with tarfile.open(tar_path, "r") as tar:
                tar.extractall(path=extract_path)
            print(f"File extracted to {extract_path}")
        except Exception as e:
            print(f"Error occurred while extracting tar file: {e}")
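

# ---------------------------------------------------------------------------
# Minimal usage sketch (assumptions: this file sits inside its package so the
# relative imports resolve, and `BaseBatchSampler` can be instantiated without
# arguments). The archive name below is hypothetical; `sample()` expects it to
# extract into a sibling directory `<archive name without extension>/` that
# contains `nuscenes_infos_val.pkl` plus the lidar/camera/sweep files the
# pickle references.
#
#     sampler = Det3DBatchSampler()
#     sampler.batch_size = 1
#     for batch in sampler.sample("./data/nuscenes_demo.tar"):
#         ...  # each batch is a list of annotation dicts (batch_size == 1)
# ---------------------------------------------------------------------------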