# Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import io
import sys
import abc
import shlex
import locale
import asyncio

from .utils.arg import CLIArgument
from .utils.subprocess import run_cmd as _run_cmd, CompletedProcess
from ...utils import logging
from ...utils.misc import abspath
from ...utils.device import parse_device
from ...utils.flags import DRY_RUN
from ...utils.errors import raise_unsupported_api_error, CalledProcessError

__all__ = ["BaseRunner", "InferOnlyRunner"]


class BaseRunner(metaclass=abc.ABCMeta):
    """
    Abstract base class of Runner.

    Runner is responsible for executing training/inference/compression commands.
    """

    def __init__(self, runner_root_path):
        """
        Initialize the instance.

        Args:
            runner_root_path (str): Path of the directory where the scripts reside.
        """
        super().__init__()
        self.runner_root_path = abspath(runner_root_path)
        # Path to python interpreter
        self.python = sys.executable

    def prepare(self):
        """
        Make preparations for the execution of commands.

        For example, download prerequisites and install dependencies.
        """
        # By default we do nothing
        pass

    @abc.abstractmethod
    def train(self, config_path, cli_args, device, ips, save_dir, do_eval=True):
        """
        Execute model training command.

        Args:
            config_path (str): Path of the configuration file.
            cli_args (list[base.utils.arg.CLIArgument]): List of command-line
                arguments.
            device (str): A string that describes the device(s) to use, e.g.,
                'cpu', 'xpu:0', 'gpu:1,2'.
            ips (str|None): Paddle cluster node ips, e.g.,
                '192.168.0.16,192.168.0.17'.
            save_dir (str): Directory to save log files.
            do_eval (bool, optional): Whether to perform model evaluation during
                training. Default: True.

        Returns:
            paddlex.repo_apis.base.utils.subprocess.CompletedProcess
        """
        raise NotImplementedError
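
    # Illustrative invocation (hypothetical values; how the arguments are turned
    # into an actual command is defined by each concrete subclass):
    #
    #   runner.train(
    #       config_path="path/to/config.yaml",
    #       cli_args=[...],  # a list of CLIArgument objects
    #       device="gpu:0,1",
    #       ips=None,
    #       save_dir="output",
    #   )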

    @abc.abstractmethod
    def evaluate(self, config_path, cli_args, device, ips):
        """
        Execute model evaluation command.

        Args:
            config_path (str): Path of the configuration file.
            cli_args (list[base.utils.arg.CLIArgument]): List of command-line
                arguments.
            device (str): A string that describes the device(s) to use, e.g.,
                'cpu', 'xpu:0', 'gpu:1,2'.
            ips (str|None): Paddle cluster node ips, e.g.,
                '192.168.0.16,192.168.0.17'.

        Returns:
            paddlex.repo_apis.base.utils.subprocess.CompletedProcess
        """
        raise NotImplementedError

    @abc.abstractmethod
    def predict(self, config_path, cli_args, device):
        """
        Execute prediction command.

        Args:
            config_path (str): Path of the configuration file.
            cli_args (list[base.utils.arg.CLIArgument]): List of command-line
                arguments.
            device (str): A string that describes the device(s) to use, e.g.,
                'cpu', 'xpu:0', 'gpu:1,2'.

        Returns:
            paddlex.repo_apis.base.utils.subprocess.CompletedProcess
        """
        raise NotImplementedError

    @abc.abstractmethod
    def export(self, config_path, cli_args, device):
        """
        Execute model export command.

        Args:
            config_path (str): Path of the configuration file.
            cli_args (list[base.utils.arg.CLIArgument]): List of command-line
                arguments.
            device (str): A string that describes the device(s) to use, e.g.,
                'cpu', 'xpu:0', 'gpu:1,2'.

        Returns:
            paddlex.repo_apis.base.utils.subprocess.CompletedProcess
        """
        raise NotImplementedError

    @abc.abstractmethod
    def infer(self, config_path, cli_args, device):
        """
        Execute model inference command.

        Args:
            config_path (str): Path of the configuration file.
            cli_args (list[base.utils.arg.CLIArgument]): List of command-line
                arguments.
            device (str): A string that describes the device(s) to use, e.g.,
                'cpu', 'xpu:0', 'gpu:1,2'.

        Returns:
            paddlex.repo_apis.base.utils.subprocess.CompletedProcess
        """
        raise NotImplementedError

    @abc.abstractmethod
    def compression(
        self, config_path, train_cli_args, export_cli_args, device, train_save_dir
    ):
        """
        Execute model compression (quantization aware training and model export)
        commands.

        Args:
            config_path (str): Path of the configuration file.
            train_cli_args (list[base.utils.arg.CLIArgument]): List of
                command-line arguments used for model training.
            export_cli_args (list[base.utils.arg.CLIArgument]): List of
                command-line arguments used for model export.
            device (str): A string that describes the device(s) to use, e.g.,
                'cpu', 'xpu:0', 'gpu:1,2'.
            train_save_dir (str): Directory to store model snapshots.

        Returns:
            tuple[paddlex.repo_apis.base.utils.subprocess.CompletedProcess]
        """
        raise NotImplementedError

    def distributed(self, device, ips=None, log_dir=None):
        """
        Build the command prefix (and, if needed, the environment) used to
        launch a job on the given device(s).

        When more than one device is requested, the command is prefixed with
        `paddle.distributed.launch`; when a single device is requested, the
        corresponding `*_VISIBLE_DEVICES` variable is set in a copy of the
        current environment.

        Args:
            device (str|None): A string that describes the device(s) to use,
                e.g., 'cpu', 'xpu:0', 'gpu:1,2'.
            ips (str|None, optional): Paddle cluster node ips. Default: None.
            log_dir (str|None, optional): Directory to store distributed
                training logs. Default: None.

        Returns:
            tuple: The command prefix (a list starting with the Python
                interpreter) and either a modified copy of the environment
                variables or None if the current environment can be used as is.
        """
        args = [self.python]
        if device is None:
            return args, None
        device, dev_ids = parse_device(device)
        if dev_ids is None or len(dev_ids) == 0:
            return args, None
        else:
            num_devices = len(dev_ids)
            dev_ids = ",".join([str(n) for n in dev_ids])
        if num_devices > 1:
            args.extend(["-m", "paddle.distributed.launch"])
            args.extend(["--devices", dev_ids])
            if ips is not None:
                args.extend(["--ips", ips])
            if log_dir is None:
                log_dir = os.getcwd()
            args.extend(["--log_dir", self._get_dist_train_log_dir(log_dir)])
        elif num_devices == 1:
            new_env = os.environ.copy()
            if device == "xpu":
                new_env["XPU_VISIBLE_DEVICES"] = dev_ids
            elif device == "npu":
                new_env["ASCEND_RT_VISIBLE_DEVICES"] = dev_ids
            elif device == "mlu":
                new_env["MLU_VISIBLE_DEVICES"] = dev_ids
            elif device == "gcu":
                new_env["TOPS_VISIBLE_DEVICES"] = dev_ids
            else:
                new_env["CUDA_VISIBLE_DEVICES"] = dev_ids
            return args, new_env
        return args, None
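
    # For illustration (assuming `parse_device` splits "gpu:0,1" into
    # ("gpu", [0, 1])): distributed("gpu:0,1", log_dir="output") returns
    #   [sys.executable, "-m", "paddle.distributed.launch", "--devices", "0,1",
    #    "--log_dir", "output/distributed_train_logs"]
    # together with None, while distributed("gpu:0") returns [sys.executable]
    # together with a copy of os.environ in which CUDA_VISIBLE_DEVICES is "0".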

    def run_cmd(
        self,
        cmd,
        env=None,
        switch_wdir=True,
        silent=False,
        echo=True,
        capture_output=False,
        log_path=None,
    ):
        """
        Run a command in a subprocess.

        Elements of `cmd` may be plain strings or `CLIArgument` objects, which
        are expanded to their underlying argument lists. If `switch_wdir` is
        truthy, the command is run with `runner_root_path` (or, if `switch_wdir`
        is a string, the given path relative to it) as the working directory.
        When `capture_output` is True, stdout and stderr are echoed to the
        terminal, captured, and optionally also written to `log_path`.
        """

        def _trans_args(cmd):
            out = []
            for ele in cmd:
                if isinstance(ele, CLIArgument):
                    out.extend(ele.lst)
                else:
                    out.append(ele)
            return out

        cmd = _trans_args(cmd)

        if DRY_RUN:
            # TODO: Accommodate Windows system
            logging.info(" ".join(shlex.quote(x) for x in cmd))
            # Mock return
            return CompletedProcess(cmd, returncode=0, stdout=str(cmd), stderr=None)

        if switch_wdir:
            if isinstance(switch_wdir, str):
                # In this case `switch_wdir` specifies a relative path
                cwd = os.path.join(self.runner_root_path, switch_wdir)
            else:
                cwd = self.runner_root_path
        else:
            cwd = None

        if not capture_output:
            if log_path is not None:
                logging.warning(
                    "`log_path` will be ignored when `capture_output` is False."
                )

            cp = _run_cmd(
                cmd,
                env=env,
                cwd=cwd,
                silent=silent,
                echo=echo,
                pipe_stdout=False,
                pipe_stderr=False,
                blocking=True,
            )
            cp = CompletedProcess(
                cp.args, cp.returncode, stdout=cp.stdout, stderr=cp.stderr
            )
        else:
            # Refer to
            # https://stackoverflow.com/questions/17190221/subprocess-popen-cloning-stdout-and-stderr-both-to-terminal-and-variables/25960956

            async def _read_display_and_record_from_stream(
                in_stream, out_stream, files
            ):
                # According to
                # https://docs.python.org/3/library/subprocess.html#frequently-used-arguments
                _ENCODING = locale.getpreferredencoding(False)
                chars = []
                out_stream_is_buffered = hasattr(out_stream, "buffer")
                while True:
                    flush = False
                    char = await in_stream.read(1)
                    if char == b"":
                        break
                    if out_stream_is_buffered:
                        out_stream.buffer.write(char)
                    chars.append(char)
                    if char == b"\n":
                        flush = True
                    elif char == b"\r":
                        # NOTE: In order to get tqdm progress bars to produce normal outputs
                        # we treat '\r' as an ending character of line
                        flush = True
                    if flush:
                        line = b"".join(chars)
                        line = line.decode(_ENCODING)
                        if not out_stream_is_buffered:
                            # We use line buffering
                            out_stream.write(line)
                        else:
                            out_stream.buffer.flush()
                        for f in files:
                            f.write(line)
                        chars.clear()

            async def _tee_proc_call(proc_call, out_files, err_files):
                proc = await proc_call
                await asyncio.gather(
                    _read_display_and_record_from_stream(
                        proc.stdout, sys.stdout, out_files
                    ),
                    _read_display_and_record_from_stream(
                        proc.stderr, sys.stderr, err_files
                    ),
                )
                # NOTE: https://docs.python.org/3/library/subprocess.html#subprocess.Popen.wait
                retcode = await proc.wait()
                return retcode

            # Non-blocking call with stdout and stderr piped
            with io.StringIO() as stdout_buf, io.StringIO() as stderr_buf:
                proc_call = _run_cmd(
                    cmd,
                    env=env,
                    cwd=cwd,
                    echo=echo,
                    silent=silent,
                    pipe_stdout=True,
                    pipe_stderr=True,
                    blocking=False,
                    async_run=True,
                )
                out_files = [stdout_buf]
                err_files = [stderr_buf]
                if log_path is not None:
                    log_dir = os.path.dirname(log_path)
                    os.makedirs(log_dir, exist_ok=True)
                    log_file = open(log_path, "w", encoding="utf-8")
                    logging.info(f"\nLog path: {os.path.abspath(log_path)} \n")
                    out_files.append(log_file)
                    err_files.append(log_file)
                try:
                    retcode = asyncio.run(
                        _tee_proc_call(proc_call, out_files, err_files)
                    )
                finally:
                    if log_path is not None:
                        log_file.close()
                cp = CompletedProcess(
                    cmd, retcode, stdout_buf.getvalue(), stderr_buf.getvalue()
                )

        if cp.returncode != 0:
            raise CalledProcessError(
                cp.returncode, cp.args, output=cp.stdout, stderr=cp.stderr
            )
        return cp

    def _get_dist_train_log_dir(self, log_dir):
        """Return the directory used to store distributed training logs."""
        return os.path.join(log_dir, "distributed_train_logs")

    def _get_train_log_path(self, log_dir):
        """Return the path of the training log file under `log_dir`."""
        return os.path.join(log_dir, "train.log")
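

# A minimal sketch of how a concrete subclass might wire `distributed()` and
# `run_cmd()` together. The class name, the "tools/train.py" script path, and
# the "--config" flag are illustrative assumptions only; real runners live in
# the suite-specific packages and may build their commands differently. The
# remaining abstract methods are omitted for brevity, so this sketch cannot be
# instantiated as-is.
class _ExampleTrainRunner(BaseRunner):
    """Illustrative runner; not part of the public API."""

    def train(self, config_path, cli_args, device, ips, save_dir, do_eval=True):
        # Build the interpreter/launcher prefix and, if needed, a modified env
        args, env = self.distributed(device, ips, log_dir=save_dir)
        # `run_cmd` expands `CLIArgument` objects, so they can be passed as-is
        cmd = [*args, "tools/train.py", "--config", config_path, *cli_args]
        return self.run_cmd(
            cmd,
            env=env,
            switch_wdir=True,
            capture_output=True,
            log_path=self._get_train_log_path(save_dir),
        )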


class InferOnlyRunner(BaseRunner):
    """Runner that only supports model inference."""

    def train(self, *args, **kwargs):
        """train"""
        raise_unsupported_api_error(self.__class__, "train")

    def evaluate(self, *args, **kwargs):
        """evaluate"""
        raise_unsupported_api_error(self.__class__, "evaluate")

    def predict(self, *args, **kwargs):
        """predict"""
        raise_unsupported_api_error(self.__class__, "predict")

    def export(self, *args, **kwargs):
        """export"""
        raise_unsupported_api_error(self.__class__, "export")

    def compression(self, *args, **kwargs):
        """compression"""
        raise_unsupported_api_error(self.__class__, "compression")
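

# Note: `infer` is not overridden above, so it remains abstract; concrete
# infer-only runners subclass `InferOnlyRunner` and implement `infer` (and,
# if needed, override `prepare`).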