# copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import json from ...base import BaseModel from ...base.utils.arg import CLIArgument from ...base.utils.subprocess import CompletedProcess from ....utils.misc import abspath from ....utils import logging from .config import DetConfig from .official_categories import official_categories class DetModel(BaseModel): """Object Detection Model""" def train( self, batch_size: int = None, learning_rate: float = None, epochs_iters: int = None, ips: str = None, device: str = "gpu", resume_path: str = None, dy2st: bool = False, amp: str = "OFF", num_workers: int = None, use_vdl: bool = True, save_dir: str = None, **kwargs, ) -> CompletedProcess: """train self Args: batch_size (int, optional): the train batch size value. Defaults to None. learning_rate (float, optional): the train learning rate value. Defaults to None. epochs_iters (int, optional): the train epochs value. Defaults to None. ips (str, optional): the ip addresses of nodes when using distribution. Defaults to None. device (str, optional): the running device. Defaults to 'gpu'. resume_path (str, optional): the checkpoint file path to resume training. Train from scratch if it is set to None. Defaults to None. dy2st (bool, optional): Enable dynamic to static. Defaults to False. amp (str, optional): the amp settings. Defaults to 'OFF'. num_workers (int, optional): the workers number. Defaults to None. use_vdl (bool, optional): enable VisualDL. Defaults to True. save_dir (str, optional): the directory path to save train output. Defaults to None. Returns: CompletedProcess: the result of training subprocess execution. """ config = self.config.copy() cli_args = [] if batch_size is not None: config.update_batch_size(batch_size, "train") if learning_rate is not None: config.update_learning_rate(learning_rate) if epochs_iters is not None: config.update_epochs(epochs_iters) config.update_cossch_epoch(epochs_iters) device_type, _ = self.runner.parse_device(device) config.update_device(device_type) if resume_path is not None: assert resume_path.endswith( ".pdparams" ), "resume_path should be endswith .pdparam" resume_dir = resume_path[0:-9] cli_args.append(CLIArgument("--resume", resume_dir)) if dy2st: cli_args.append(CLIArgument("--to_static")) if num_workers is not None: config.update_num_workers(num_workers) if save_dir is None: save_dir = abspath(config.get_train_save_dir()) else: save_dir = abspath(save_dir) config.update_save_dir(save_dir) if use_vdl: cli_args.append(CLIArgument("--use_vdl", use_vdl)) cli_args.append(CLIArgument("--vdl_log_dir", save_dir)) do_eval = kwargs.pop("do_eval", True) enable_ce = kwargs.pop("enable_ce", None) profile = kwargs.pop("profile", None) if profile is not None: cli_args.append(CLIArgument("--profiler_options", profile)) # Benchmarking mode settings benchmark = kwargs.pop("benchmark", None) if benchmark is not None: envs = benchmark.get("env", None) amp = benchmark.get("amp", None) do_eval = benchmark.get("do_eval", False) num_workers = benchmark.get("num_workers", None) config.update_log_ranks(device) config.update_shuffle(benchmark.get("shuffle", False)) config.update_shared_memory(benchmark.get("shared_memory", True)) config.update_print_mem_info(benchmark.get("print_mem_info", True)) if num_workers is not None: config.update_num_workers(num_workers) if amp == "O1": # TODO: ppdet only support ampO1 cli_args.append(CLIArgument("--amp")) if envs is not None: for env_name, env_value in envs.items(): os.environ[env_name] = str(env_value) # set seed to 0 for benchmark mode by enable_ce cli_args.append(CLIArgument("--enable_ce", True)) else: if amp != "OFF" and amp is not None: # TODO: consider amp is O1 or O2 in ppdet cli_args.append(CLIArgument("--amp")) if enable_ce is not None: cli_args.append(CLIArgument("--enable_ce", enable_ce)) # PDX related settings config.update({"uniform_output_enabled": True}) config.update({"pdx_model_name": self.name}) hpi_config_path = self.model_info.get("hpi_config_path", None) if hpi_config_path: hpi_config_path = hpi_config_path.as_posix() config.update({"hpi_config_path": hpi_config_path}) self._assert_empty_kwargs(kwargs) with self._create_new_config_file() as config_path: config.dump(config_path) return self.runner.train( config_path, cli_args, device, ips, save_dir, do_eval=do_eval ) def evaluate( self, weight_path: str, batch_size: int = None, ips: bool = None, device: bool = "gpu", amp: bool = "OFF", num_workers: int = None, **kwargs, ) -> CompletedProcess: """evaluate self using specified weight Args: weight_path (str): the path of model weight file to be evaluated. batch_size (int, optional): the batch size value in evaluating. Defaults to None. ips (str, optional): the ip addresses of nodes when using distribution. Defaults to None. device (str, optional): the running device. Defaults to 'gpu'. amp (str, optional): the AMP setting. Defaults to 'OFF'. num_workers (int, optional): the workers number in evaluating. Defaults to None. Returns: CompletedProcess: the result of evaluating subprocess execution. """ config = self.config.copy() cli_args = [] weight_path = abspath(weight_path) config.update_weights(weight_path) if batch_size is not None: config.update_batch_size(batch_size, "eval") device_type, device_ids = self.runner.parse_device(device) if len(device_ids) > 1: raise ValueError( f"multi-{device_type} evaluation is not supported. Please use a single {device_type}." ) config.update_device(device_type) if amp != "OFF": # TODO: consider amp is O1 or O2 in ppdet cli_args.append(CLIArgument("--amp")) if num_workers is not None: config.update_num_workers(num_workers) self._assert_empty_kwargs(kwargs) with self._create_new_config_file() as config_path: config.dump(config_path) cp = self.runner.evaluate(config_path, cli_args, device, ips) return cp def predict( self, input_path: str, weight_path: str, device: str = "gpu", save_dir: str = None, **kwargs, ) -> CompletedProcess: """predict using specified weight Args: weight_path (str): the path of model weight file used to predict. input_path (str): the path of image file to be predicted. device (str, optional): the running device. Defaults to 'gpu'. save_dir (str, optional): the directory path to save predict output. Defaults to None. Returns: CompletedProcess: the result of predicting subprocess execution. """ config = self.config.copy() cli_args = [] input_path = abspath(input_path) if os.path.isfile(input_path): cli_args.append(CLIArgument("--infer_img", input_path)) else: cli_args.append(CLIArgument("--infer_dir", input_path)) if "infer_list" in kwargs: infer_list = abspath(kwargs.get("infer_list")) cli_args.append(CLIArgument("--infer_list", infer_list)) if "visualize" in kwargs: cli_args.append(CLIArgument("--visualize", kwargs["visualize"])) if "save_results" in kwargs: cli_args.append(CLIArgument("--save_results", kwargs["save_results"])) if "save_threshold" in kwargs: cli_args.append(CLIArgument("--save_threshold", kwargs["save_threshold"])) if "rtn_im_file" in kwargs: cli_args.append(CLIArgument("--rtn_im_file", kwargs["rtn_im_file"])) weight_path = abspath(weight_path) config.update_weights(weight_path) device_type, _ = self.runner.parse_device(device) config.update_device(device_type) if save_dir is not None: save_dir = abspath(save_dir) cli_args.append(CLIArgument("--output_dir", save_dir)) self._assert_empty_kwargs(kwargs) with self._create_new_config_file() as config_path: config.dump(config_path) return self.runner.predict(config_path, cli_args, device) def export(self, weight_path: str, save_dir: str, **kwargs) -> CompletedProcess: """export the dynamic model to static model Args: weight_path (str): the model weight file path that used to export. save_dir (str): the directory path to save export output. Returns: CompletedProcess: the result of exporting subprocess execution. """ config = self.config.copy() cli_args = [] if not weight_path.startswith("http"): weight_path = abspath(weight_path) config.update_weights(weight_path) save_dir = abspath(save_dir) cli_args.append(CLIArgument("--output_dir", save_dir)) input_shape = kwargs.pop("input_shape", None) if input_shape is not None: cli_args.append( CLIArgument("-o", f"TestReader.inputs_def.image_shape={input_shape}") ) use_trt = kwargs.pop("use_trt", None) if use_trt is not None: cli_args.append(CLIArgument("-o", f"trt={bool(use_trt)}")) exclude_nms = kwargs.pop("exclude_nms", None) if exclude_nms is not None: cli_args.append(CLIArgument("-o", f"exclude_nms={bool(exclude_nms)}")) # PDX related settings config.update({"pdx_model_name": self.name}) hpi_config_path = self.model_info.get("hpi_config_path", None) if hpi_config_path: hpi_config_path = hpi_config_path.as_posix() config.update({"hpi_config_path": hpi_config_path}) if self.name in official_categories.keys(): anno_val_file = abspath(os.path.join(config.TestDataset['dataset_dir'], config.TestDataset['anno_path'])) if anno_val_file == None or (not os.path.isfile(anno_val_file)): categories = official_categories[self.name] temp_anno = {'images': [], 'annotations': [], 'categories': categories} with self._create_new_val_json_file() as anno_file: json.dump(temp_anno, open(anno_file, 'w')) config.update({"TestDataset": {"dataset_dir": '', "anno_path": anno_file}}) logging.warning(f"{self.name} does not have validate annotations, use {anno_file} default instead.") self._assert_empty_kwargs(kwargs) with self._create_new_config_file() as config_path: config.dump(config_path) return self.runner.export(config_path, cli_args, None) self._assert_empty_kwargs(kwargs) with self._create_new_config_file() as config_path: config.dump(config_path) return self.runner.export(config_path, cli_args, None) def infer( self, model_dir: str, input_path: str, device: str = "gpu", save_dir: str = None, **kwargs, ): """predict image using infernece model Args: model_dir (str): the directory path of inference model files that would use to predict. input_path (str): the path of image that would be predict. device (str, optional): the running device. Defaults to 'gpu'. save_dir (str, optional): the directory path to save output. Defaults to None. Returns: CompletedProcess: the result of infering subprocess execution. """ model_dir = abspath(model_dir) input_path = abspath(input_path) if save_dir is not None: save_dir = abspath(save_dir) cli_args = [] cli_args.append(CLIArgument("--model_dir", model_dir)) cli_args.append(CLIArgument("--image_file", input_path)) if save_dir is not None: cli_args.append(CLIArgument("--output_dir", save_dir)) device_type, _ = self.runner.parse_device(device) cli_args.append(CLIArgument("--device", device_type)) self._assert_empty_kwargs(kwargs) return self.runner.infer(cli_args, device) def compression( self, weight_path: str, batch_size: int = None, learning_rate: float = None, epochs_iters: int = None, device: str = None, use_vdl: bool = True, save_dir: str = None, **kwargs, ) -> CompletedProcess: """compression model Args: weight_path (str): the path to weight file of model. batch_size (int, optional): the batch size value of compression training. Defaults to None. learning_rate (float, optional): the learning rate value of compression training. Defaults to None. epochs_iters (int, optional): the epochs or iters of compression training. Defaults to None. device (str, optional): the device to run compression training. Defaults to 'gpu'. use_vdl (bool, optional): whether or not to use VisualDL. Defaults to True. save_dir (str, optional): the directory to save output. Defaults to None. Returns: CompletedProcess: the result of compression subprocess execution. """ weight_path = abspath(weight_path) if save_dir is None: save_dir = self.config["save_dir"] save_dir = abspath(save_dir) config = self.config.copy() cps_config = DetConfig( self.name, config_path=self.model_info["auto_compression_config_path"] ) train_cli_args = [] export_cli_args = [] cps_config.update_pretrained_weights(weight_path) if batch_size is not None: cps_config.update_batch_size(batch_size, "train") if learning_rate is not None: cps_config.update_learning_rate(learning_rate) if epochs_iters is not None: cps_config.update_epochs(epochs_iters) if device is not None: device_type, _ = self.runner.parse_device(device) config.update_device(device_type) if save_dir is not None: save_dir = abspath(config.get_train_save_dir()) else: save_dir = abspath(save_dir) cps_config.update_save_dir(save_dir) if use_vdl: train_cli_args.append(CLIArgument("--use_vdl", use_vdl)) train_cli_args.append(CLIArgument("--vdl_log_dir", save_dir)) export_cli_args.append( CLIArgument("--output_dir", os.path.join(save_dir, "export")) ) with self._create_new_config_file() as config_path: config.dump(config_path) # TODO: refactor me cps_config_path = config_path[0:-4] + "_compression" + config_path[-4:] cps_config.dump(cps_config_path) train_cli_args.append(CLIArgument("--slim_config", cps_config_path)) export_cli_args.append(CLIArgument("--slim_config", cps_config_path)) self._assert_empty_kwargs(kwargs) self.runner.compression( config_path, train_cli_args, export_cli_args, device, save_dir )