# det_3d_batch_sampler.py
  1. # Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import os
  15. import pickle
  16. import tarfile
  17. from pathlib import Path
  18. from typing import Any, Dict, List, Union
  19. from ....utils import logging
  20. from ....utils.cache import CACHE_DIR
  21. from ....utils.download import download
  22. from .base_batch_sampler import BaseBatchSampler
  23. class Det3DBatchSampler(BaseBatchSampler):
  24. def __init__(self, temp_dir) -> None:
  25. super().__init__()
  26. self.temp_dir = temp_dir
  27. # XXX: auto download for url
  28. def _download_from_url(self, in_path: str) -> str:
  29. file_name = Path(in_path).name
  30. save_path = Path(CACHE_DIR) / "predict_input" / file_name
  31. download(in_path, save_path, overwrite=True)
  32. return save_path.as_posix()
  33. @property
  34. def batch_size(self) -> int:
  35. """Gets the batch size."""
  36. return self._batch_size
  37. @batch_size.setter
  38. def batch_size(self, batch_size: int) -> None:
  39. """Sets the batch size.
  40. Args:
  41. batch_size (int): The batch size to set.
  42. """
  43. if batch_size != 1:
  44. logging.warning(
  45. "inference for 3D models only support batch_size equal to 1"
  46. )
  47. self._batch_size = batch_size
  48. def load_annotations(self, ann_file: str, data_root_dir: str) -> List[Dict]:
  49. """Load annotations from ann_file.
  50. Args:
  51. ann_file (str): Path of the annotation file.
  52. data_root_dir: (str): Path of the data root directory.
  53. Returns:
  54. list[dict]: List of annotations sorted by timestamps.
  55. """
  56. data = pickle.load(open(ann_file, "rb"))
  57. data_infos = list(sorted(data["infos"], key=lambda e: e["timestamp"]))
  58. # append root_dir to image and lidar filepaths
  59. for item in data_infos:
  60. # lidar data
  61. lidar_path = item["lidar_path"]
  62. new_lidar_path = os.path.join(data_root_dir, lidar_path)
  63. item["lidar_path"] = new_lidar_path
  64. # camera data
  65. cam_data = item["cams"]
  66. for cam_data_item_key in cam_data:
  67. cam_data_item = cam_data[cam_data_item_key]
  68. cam_data_item_path = cam_data_item["data_path"]
  69. new_cam_data_item_path = os.path.join(data_root_dir, cam_data_item_path)
  70. cam_data_item["data_path"] = new_cam_data_item_path
  71. # sweep data
  72. sweeps = item["sweeps"]
  73. for sweep_item in sweeps:
  74. sweep_item_path = sweep_item["data_path"]
  75. new_sweep_item_path = os.path.join(data_root_dir, sweep_item_path)
  76. sweep_item["data_path"] = new_sweep_item_path
  77. return data_infos
  78. def sample(self, inputs: Union[List[str], str]):
  79. if not isinstance(inputs, list):
  80. inputs = [inputs]
  81. sample_set = []
  82. for input in inputs:
  83. if isinstance(input, str):
  84. ann_path = (
  85. self._download_from_url(input)
  86. if input.startswith("http")
  87. else input
  88. )
  89. else:
  90. logging.warning(
  91. f"Not supported input data type! Only `str` is supported! So has been ignored: {input}."
  92. )
  93. # extract tar file to tempdir
  94. dataset_name = self.extract_tar(ann_path, self.temp_dir)
  95. data_root_dir = os.path.join(self.temp_dir, dataset_name)
  96. ann_pkl_path = os.path.join(data_root_dir, "nuscenes_infos_val.pkl")
  97. self.data_infos = self.load_annotations(ann_pkl_path, data_root_dir)
  98. sample_set.extend(self.data_infos)
  99. batch = []
  100. for sample in sample_set:
  101. batch.append(sample)
  102. if len(batch) == self.batch_size:
  103. yield batch
  104. batch = []
  105. if len(batch) > 0:
  106. yield batch
  107. def _rand_batch(self, data_size: int) -> List[Any]:
  108. raise NotImplementedError(
  109. "rand batch is not supported for 3D detection annotation data"
  110. )
  111. def extract_tar(self, tar_path, extract_path="."):
  112. try:
  113. memdirs = set()
  114. with tarfile.open(tar_path, "r") as tar:
  115. for member in tar.getmembers():
  116. memdir = member.name.split("/")[0]
  117. memdirs.add(memdir)
  118. tar.extract(member, path=extract_path)
  119. logging.info(f"file extract to {extract_path}")
  120. assert (
  121. len(memdirs) == 1
  122. ), "Only one base directory is allowed for 3d bev dataset!"
  123. return list(memdirs)[0]
  124. except Exception as e:
  125. logging.error(f"error occurred while extracting tar file")
  126. raise e