# det_3d_batch_sampler.py
  1. # Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import Any, Dict, List, Optional, Union
  15. import os
  16. import ast
  17. from pathlib import Path
  18. import numpy as np
  19. import pickle
  20. import tarfile
  21. from ....utils import logging
  22. from ....utils.download import download
  23. from ....utils.cache import CACHE_DIR
  24. from .base_batch_sampler import BaseBatchSampler
  25. class Det3DBatchSampler(BaseBatchSampler):
  26. def __init__(self, temp_dir) -> None:
  27. super().__init__()
  28. self.temp_dir = temp_dir
  29. # XXX: auto download for url
  30. def _download_from_url(self, in_path: str) -> str:
  31. file_name = Path(in_path).name
  32. save_path = Path(CACHE_DIR) / "predict_input" / file_name
  33. download(in_path, save_path, overwrite=True)
  34. return save_path.as_posix()
  35. @property
  36. def batch_size(self) -> int:
  37. """Gets the batch size."""
  38. return self._batch_size
  39. @batch_size.setter
  40. def batch_size(self, batch_size: int) -> None:
  41. """Sets the batch size.
  42. Args:
  43. batch_size (int): The batch size to set.
  44. """
  45. if batch_size != 1:
  46. logging.warning(
  47. "inference for 3D models only support batch_size equal to 1"
  48. )
  49. self._batch_size = batch_size
  50. def load_annotations(self, ann_file: str, data_root_dir: str) -> List[Dict]:
  51. """Load annotations from ann_file.
  52. Args:
  53. ann_file (str): Path of the annotation file.
  54. data_root_dir: (str): Path of the data root directory.
  55. Returns:
  56. list[dict]: List of annotations sorted by timestamps.
  57. """
  58. data = pickle.load(open(ann_file, "rb"))
  59. data_infos = list(sorted(data["infos"], key=lambda e: e["timestamp"]))
  60. # append root_dir to image and lidar filepaths
  61. for item in data_infos:
  62. # lidar data
  63. lidar_path = item["lidar_path"]
  64. new_lidar_path = os.path.join(data_root_dir, lidar_path)
  65. item["lidar_path"] = new_lidar_path
  66. # camera data
  67. cam_data = item["cams"]
  68. for cam_data_item_key in cam_data:
  69. cam_data_item = cam_data[cam_data_item_key]
  70. cam_data_item_path = cam_data_item["data_path"]
  71. new_cam_data_item_path = os.path.join(data_root_dir, cam_data_item_path)
  72. cam_data_item["data_path"] = new_cam_data_item_path
  73. # sweep data
  74. sweeps = item["sweeps"]
  75. for sweep_item in sweeps:
  76. sweep_item_path = sweep_item["data_path"]
  77. new_sweep_item_path = os.path.join(data_root_dir, sweep_item_path)
  78. sweep_item["data_path"] = new_sweep_item_path
  79. return data_infos
  80. def sample(self, inputs: Union[List[str], str]):
  81. if not isinstance(inputs, list):
  82. inputs = [inputs]
  83. sample_set = []
  84. for input in inputs:
  85. if isinstance(input, str):
  86. ann_path = (
  87. self._download_from_url(input)
  88. if input.startswith("http")
  89. else input
  90. )
  91. else:
  92. logging.warning(
  93. f"Not supported input data type! Only `str` is supported! So has been ignored: {input}."
  94. )
  95. # extract tar file to tempdir
  96. self.extract_tar(ann_path, self.temp_dir)
  97. dataset_name = os.path.basename(ann_path).split(".")[0]
  98. data_root_dir = os.path.join(self.temp_dir, dataset_name)
  99. ann_pkl_path = os.path.join(data_root_dir, "nuscenes_infos_val.pkl")
  100. self.data_infos = self.load_annotations(ann_pkl_path, data_root_dir)
  101. sample_set.extend(self.data_infos)
  102. batch = []
  103. for sample in sample_set:
  104. batch.append(sample)
  105. if len(batch) == self.batch_size:
  106. yield batch
  107. batch = []
  108. if len(batch) > 0:
  109. yield batch
  110. def _rand_batch(self, data_size: int) -> List[Any]:
  111. raise NotImplementedError(
  112. "rand batch is not supported for 3D detection annotation data"
  113. )
  114. def extract_tar(self, tar_path, extract_path="."):
  115. try:
  116. with tarfile.open(tar_path, "r") as tar:
  117. for member in tar.getmembers():
  118. tar.extract(member, path=extract_path)
  119. print(f"file extract to {extract_path}")
  120. except Exception as e:
  121. print(f"error occurred while extracting tar file: {e}")