| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- # Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import numpy as np
- import cv2
- import os
- from typing import Generic, List, Optional, Any, Dict
- import pickle
- from ...utils.benchmark import benchmark
- class _EasyDict(dict):
- def __getattr__(self, key: str):
- if key in self:
- return self[key]
- return super().__getattr__(self, key)
- def __setattr__(self, key: str, value: Generic):
- self[key] = value
class SampleMeta(_EasyDict):
    """Attribute-style container for the metadata attached to a sample.

    Keyword arguments given to the constructor are stored as dictionary
    items, one per keyword.
    """

    # Declared metadata field names.
    # yapf: disable
    __slots__ = [
        "camera_intrinsic",
        # bgr or rgb
        "image_format",
        # pillow or cv2
        "image_reader",
        # chw or hwc
        "channel_order",
        # Unique ID of the sample
        "id",
        "time_lag",
        "ref_from_curr"
    ]
    # yapf: enable

    def __init__(self, **kwargs):
        """Store every keyword argument as an item, in the order given."""
        for field_name in kwargs:
            # Attribute assignment on this class is item assignment, so
            # write the item directly.
            self[field_name] = kwargs[field_name]
class Sample(_EasyDict):
    """Data structure containing sample data information.

    Args:
        path: Value stored unchanged under ``path``.
        modality: One of ``_VALID_MODALITIES``. String values are matched
            case-insensitively and stored lower-cased.

    Raises:
        ValueError: If ``modality`` is not one of ``_VALID_MODALITIES``.
    """

    _VALID_MODALITIES = ["image", "lidar", "radar", "multimodal", "multiview"]

    def __init__(self, path: str, modality: str):
        # Normalize before validating so the check agrees with the
        # lower-cased value that is stored; non-string input is left as-is
        # and rejected by the membership test below.
        normalized = modality.lower() if isinstance(modality, str) else modality
        if normalized not in self._VALID_MODALITIES:
            raise ValueError(
                "Only modality {} is supported, but got {}".format(
                    self._VALID_MODALITIES, modality
                )
            )

        self.meta = SampleMeta()
        self.path = path
        self.data = None
        self.modality = normalized
        self.bboxes_2d = None
        self.bboxes_3d = None
        self.labels = None
        self.sweeps = []
        self.attrs = None
@benchmark.timeit_with_options(name=None, is_read_operation=True)
class ReadNuscenesData:
    """Build ``Sample`` records from nuScenes-style info dictionaries.

    Args:
        dataset_root: Prefix joined onto the lidar and camera paths found in
            each info dictionary.
        load_interval: Stored on the instance; not read by any method of
            this class.
        noise_sensor_type: ``"lidar"`` or ``"camera"``; selects which sensor
            the frame-drop substitution is applied to.
        drop_frames: Whether to apply the frame-drop substitution.
        drop_set: Sequence whose first two items are the drop ratio and the
            drop type used to index the frame-drop table.
        modality: Modality handed to ``Sample``. Camera data is attached for
            ``"multiview"`` and ``"multimodal"``.
        extrinsics_noise: Whether camera extrinsics are read from the noise
            table instead of the info dictionary.
        extrinsics_noise_type: Prefix of the noise-table keys
            ``"<type>_noise_sensor2lidar_rotation"`` and
            ``"<type>_noise_sensor2lidar_translation"``.

    The noise table ``self.noise_data`` is indexed as
    ``noise_data[file_name]["noise"]["drop_frames"][ratio][type]`` (with
    keys ``"stuck"`` and ``"replace"``),
    ``noise_data[file_name]["noise"]["extrinsics_noise"][...]`` and
    ``noise_data[file_name]["mmdet_info"]`` (with keys ``"sweeps"`` and
    ``"timestamp"``).
    """

    # Order in which cameras are read from ``info["cams"]``.
    _CAM_ORDERS = (
        "CAM_FRONT_LEFT",
        "CAM_FRONT",
        "CAM_FRONT_RIGHT",
        "CAM_BACK_RIGHT",
        "CAM_BACK",
        "CAM_BACK_LEFT",
    )

    # Bookkeeping lists added to every sample by ``add_new_fields``.
    _EXTRA_FIELDS = (
        "img_fields",
        "bbox3d_fields",
        "pts_mask_fields",
        "pts_seg_fields",
        "bbox_fields",
        "mask_fields",
        "seg_fields",
    )

    def __init__(
        self,
        dataset_root="",
        load_interval=1,
        noise_sensor_type="camera",
        drop_frames=False,
        drop_set=(0, "discrete"),
        modality="multimodal",
        extrinsics_noise=False,
        extrinsics_noise_type="single",
    ):
        self.load_interval = load_interval
        # NOTE(review): nothing visible in this class populates noise_data,
        # yet the drop_frames / extrinsics_noise paths index it. It has to
        # be assigned before those options are used - confirm with callers.
        self.noise_data = None
        self.noise_sensor_type = noise_sensor_type
        self.drop_frames = drop_frames
        self.drop_ratio = drop_set[0]
        self.drop_type = drop_set[1]
        self.modality = modality
        self.extrinsics_noise = extrinsics_noise
        self.extrinsics_noise_type = extrinsics_noise_type
        self.dataset_root = dataset_root

    def _stuck_replacement(self, file_name):
        """Return the file replacing ``file_name`` if its frame is stuck, else ``""``."""
        drop_info = self.noise_data[file_name]["noise"]["drop_frames"][
            self.drop_ratio
        ][self.drop_type]
        if drop_info["stuck"]:
            return drop_info["replace"]
        return ""

    @staticmethod
    def _swap_basename(path, file_name, new_name):
        """Return ``path`` with its trailing ``file_name`` replaced by ``new_name``."""
        # Only the trailing component is swapped, so a directory that
        # happens to contain the file name is left untouched.
        return path[: len(path) - len(file_name)] + new_name

    @staticmethod
    def _lidar_to_image(sensor2lidar_rotation, sensor2lidar_translation, intrinsic):
        """Compose the 4x4 lidar-to-image matrix for one camera."""
        lidar2cam_r = np.linalg.inv(sensor2lidar_rotation)
        lidar2cam_t = sensor2lidar_translation @ lidar2cam_r.T
        # The extrinsic matrix is assembled in transposed form (translation
        # in the last row) and transposed back when it is multiplied below.
        lidar2cam_rt = np.eye(4)
        lidar2cam_rt[:3, :3] = lidar2cam_r.T
        lidar2cam_rt[3, :3] = -lidar2cam_t
        # Embed the intrinsic matrix in the top-left corner of a 4x4 identity.
        viewpad = np.eye(4)
        viewpad[: intrinsic.shape[0], : intrinsic.shape[1]] = intrinsic
        return viewpad @ lidar2cam_rt.T

    def _apply_lidar_frame_drop(self, sample):
        """Point ``sample`` at the replacement lidar frame when its own is stuck."""
        pts_filename = sample.pts_filename
        file_name = os.path.basename(pts_filename)
        replace_file = self._stuck_replacement(file_name)
        if replace_file == "":
            return
        sample.pts_filename = self._swap_basename(
            pts_filename, file_name, replace_file
        )
        # Sweeps and timestamp must describe the frame that is actually read.
        replaced_info = self.noise_data[replace_file]["mmdet_info"]
        sample.sweeps = replaced_info["sweeps"]
        sample.timestamp = replaced_info["timestamp"] / 1e6

    def _camera_extrinsics(self, cam_info, file_name):
        """Return ``(rotation, translation)`` of the camera relative to the lidar."""
        if self.extrinsics_noise:
            noisy = self.noise_data[file_name]["noise"]["extrinsics_noise"]
            prefix = self.extrinsics_noise_type
            return (
                noisy[f"{prefix}_noise_sensor2lidar_rotation"],
                noisy[f"{prefix}_noise_sensor2lidar_translation"],
            )
        return cam_info["sensor2lidar_rotation"], cam_info["sensor2lidar_translation"]

    def _attach_camera_data(self, sample, info):
        """Add image paths, lidar-to-image matrices and camera infos to ``sample``."""
        image_paths = []
        lidar2img_rts = []
        caminfos = []
        drop_camera_frames = self.noise_sensor_type == "camera" and self.drop_frames
        for cam_type in self._CAM_ORDERS:
            cam_info = info["cams"][cam_type]
            cam_data_path = os.path.join(self.dataset_root, cam_info["data_path"])
            # The original file name keys the noise table, even when the
            # image path itself is substituted below.
            file_name = os.path.basename(cam_data_path)
            if drop_camera_frames:
                replace_file = self._stuck_replacement(file_name)
                if replace_file != "":
                    cam_data_path = self._swap_basename(
                        cam_data_path, file_name, replace_file
                    )
            image_paths.append(cam_data_path)

            # obtain lidar to image transformation matrix
            rotation, translation = self._camera_extrinsics(cam_info, file_name)
            lidar2img_rts.append(
                self._lidar_to_image(rotation, translation, cam_info["cam_intrinsic"])
            )
            caminfos.append(
                {
                    "sensor2lidar_translation": translation,
                    "sensor2lidar_rotation": rotation,
                    "cam_intrinsic": cam_info["cam_intrinsic"],
                }
            )
        sample.update(
            img_filename=image_paths, lidar2img=lidar2img_rts, caminfo=caminfos
        )

    def get_data_info(self, info):
        """Get data info.

        Args:
            info (dict): Record providing ``"token"``, ``"lidar_path"``,
                ``"sweeps"`` and ``"timestamp"``. For the ``"multiview"``
                and ``"multimodal"`` modalities it must also provide
                ``"cams"``, mapping each camera name to a dict with
                ``"data_path"``, ``"sensor2lidar_rotation"``,
                ``"sensor2lidar_translation"`` and ``"cam_intrinsic"``.

        Returns:
            Sample: Data information with the following entries:

                - sample_idx: The info token (also stored as ``meta.id``).
                - pts_filename (str): Point cloud path under the dataset root.
                - sweeps: Sweeps taken from ``info``.
                - timestamp: The info timestamp divided by 1e6.
                - img_filename (list[str], camera modalities only): Image
                  path of each camera.
                - lidar2img (list[np.ndarray], camera modalities only):
                  Transformations from lidar to the different cameras.
                - caminfo (list[dict], camera modalities only): Extrinsics
                  and intrinsics used for each camera.
        """
        sample = Sample(path=None, modality=self.modality)
        sample.sample_idx = info["token"]
        sample.meta.id = info["token"]
        sample.pts_filename = os.path.join(self.dataset_root, info["lidar_path"])
        sample.sweeps = info["sweeps"]
        sample.timestamp = info["timestamp"] / 1e6

        if self.noise_sensor_type == "lidar" and self.drop_frames:
            self._apply_lidar_frame_drop(sample)

        if self.modality in ("multiview", "multimodal"):
            self._attach_camera_data(sample, info)
        return sample

    def prepare_test_data(self, info):
        """Build the sample for ``info`` and add the empty bookkeeping fields."""
        return self.add_new_fields(self.get_data_info(info))

    def add_new_fields(self, sample):
        """Set each bookkeeping field of ``sample`` to a new empty list and return it."""
        for field_name in self._EXTRA_FIELDS:
            sample[field_name] = []
        return sample

    def __call__(self, batch_data):
        """Return one prepared sample per info dictionary in ``batch_data``."""
        return [self.prepare_test_data(data_info) for data_info in batch_data]
|