|
|
@@ -12,12 +12,14 @@
|
|
|
# See the License for the specific language governing permissions and
|
|
|
# limitations under the License.
|
|
|
|
|
|
+from typing import Any, Dict, List, Optional, Union
|
|
|
import os
|
|
|
import ast
|
|
|
from pathlib import Path
|
|
|
import numpy as np
|
|
|
import pickle
|
|
|
-from typing import Any, Dict, List, Optional, Union
|
|
|
+import tarfile
|
|
|
+
|
|
|
|
|
|
from ....utils import logging
|
|
|
from ....utils.download import download
|
|
|
@@ -52,17 +54,37 @@ class Det3DBatchSampler(BaseBatchSampler):
|
|
|
)
|
|
|
self._batch_size = batch_size
|
|
|
|
|
|
- def load_annotations(self, ann_file: str) -> List[Dict]:
|
|
|
+ def load_annotations(self, ann_file: str, data_root_dir: str) -> List[Dict]:
|
|
|
"""Load annotations from ann_file.
|
|
|
|
|
|
Args:
|
|
|
ann_file (str): Path of the annotation file.
|
|
|
+ data_root_dir: (str): Path of the data root directory.
|
|
|
|
|
|
Returns:
|
|
|
list[dict]: List of annotations sorted by timestamps.
|
|
|
"""
|
|
|
data = pickle.load(open(ann_file, "rb"))
|
|
|
data_infos = list(sorted(data["infos"], key=lambda e: e["timestamp"]))
|
|
|
+ # append root_dir to image and lidar filepaths
|
|
|
+ for item in data_infos:
|
|
|
+ # lidar data
|
|
|
+ lidar_path = item["lidar_path"]
|
|
|
+ new_lidar_path = os.path.join(data_root_dir, lidar_path)
|
|
|
+ item["lidar_path"] = new_lidar_path
|
|
|
+ # camera data
|
|
|
+ cam_data = item["cams"]
|
|
|
+ for cam_data_item_key in cam_data:
|
|
|
+ cam_data_item = cam_data[cam_data_item_key]
|
|
|
+ cam_data_item_path = cam_data_item["data_path"]
|
|
|
+ new_cam_data_item_path = os.path.join(data_root_dir, cam_data_item_path)
|
|
|
+ cam_data_item["data_path"] = new_cam_data_item_path
|
|
|
+ # sweep data
|
|
|
+ sweeps = item["sweeps"]
|
|
|
+ for sweep_item in sweeps:
|
|
|
+ sweep_item_path = sweep_item["data_path"]
|
|
|
+ new_sweep_item_path = os.path.join(data_root_dir, sweep_item_path)
|
|
|
+ sweep_item["data_path"] = new_sweep_item_path
|
|
|
return data_infos
|
|
|
|
|
|
def sample(self, inputs: Union[List[str], str]):
|
|
|
@@ -81,7 +103,12 @@ class Det3DBatchSampler(BaseBatchSampler):
|
|
|
logging.warning(
|
|
|
f"Not supported input data type! Only `str` is supported! So has been ignored: {input}."
|
|
|
)
|
|
|
- self.data_infos = self.load_annotations(ann_path)
|
|
|
+ # extract tar file
|
|
|
+ tar_root_dir = os.path.dirname(ann_path)
|
|
|
+ self.extract_tar(ann_path, tar_root_dir)
|
|
|
+ data_root_dir, _ = os.path.splitext(ann_path)
|
|
|
+ ann_pkl_path = os.path.join(data_root_dir, "nuscenes_infos_val.pkl")
|
|
|
+ self.data_infos = self.load_annotations(ann_pkl_path, data_root_dir)
|
|
|
sample_set.extend(self.data_infos)
|
|
|
|
|
|
batch = []
|
|
|
@@ -98,3 +125,11 @@ class Det3DBatchSampler(BaseBatchSampler):
|
|
|
raise NotImplementedError(
|
|
|
"rand batch is not supported for 3D detection annotation data"
|
|
|
)
|
|
|
+
|
|
|
+ def extract_tar(self, tar_path, extract_path="."):
|
|
|
+ try:
|
|
|
+ with tarfile.open(tar_path, "r") as tar:
|
|
|
+ tar.extractall(path=extract_path)
|
|
|
+ print(f"file extract to {extract_path}")
|
|
|
+ except Exception as e:
|
|
|
+ print(f"error occurred while extracting tar file: {e}")
|