# Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import enum import json from pathlib import Path import numpy as np import pandas as pd import yaml from PIL import Image from ....utils.deps import class_requires_deps, is_dep_available from .tablepyxl import document_to_xl if is_dep_available("opencv-contrib-python"): import cv2 __all__ = [ "WriterType", "ImageWriter", "TextWriter", "JsonWriter", "CSVWriter", "HtmlWriter", "XlsxWriter", "YAMLWriter", "VideoWriter", "MarkdownWriter", ] class WriterType(enum.Enum): """WriterType""" IMAGE = 1 VIDEO = 2 TEXT = 3 JSON = 4 HTML = 5 XLSX = 6 CSV = 7 YAML = 8 MARKDOWN = 9 TXT = 10 class _BaseWriter(object): """_BaseWriter""" def __init__(self, backend, **bk_args): super().__init__() if len(bk_args) == 0: bk_args = self.get_default_backend_args() self.bk_type = backend self.bk_args = bk_args self._backend = self.get_backend() def write(self, out_path, obj): """write""" raise NotImplementedError def get_backend(self, bk_args=None): """get backend""" if bk_args is None: bk_args = self.bk_args return self._init_backend(self.bk_type, bk_args) def set_backend(self, backend, **bk_args): self.bk_type = backend self.bk_args = bk_args self._backend = self.get_backend() def _init_backend(self, bk_type, bk_args): """init backend""" raise NotImplementedError def get_type(self): """get type""" raise NotImplementedError def get_default_backend_args(self): """get default backend arguments""" return {} class ImageWriter(_BaseWriter): """ImageWriter""" def __init__(self, backend="opencv", **bk_args): super().__init__(backend=backend, **bk_args) def write(self, out_path, obj): """write""" return self._backend.write_obj(str(out_path), obj) def _init_backend(self, bk_type, bk_args): """init backend""" if bk_type == "opencv": return OpenCVImageWriterBackend(**bk_args) elif bk_type == "pil" or bk_type == "pillow": return PILImageWriterBackend(**bk_args) else: raise ValueError("Unsupported backend type") def get_type(self): """get type""" return WriterType.IMAGE class VideoWriter(_BaseWriter): """VideoWriter""" def __init__(self, backend="opencv", **bk_args): super().__init__(backend=backend, **bk_args) def write(self, out_path, obj): """write""" return self._backend.write_obj(str(out_path), obj) def _init_backend(self, bk_type, bk_args): """init backend""" if bk_type == "opencv": return OpenCVVideoWriterBackend(**bk_args) else: raise ValueError("Unsupported backend type") def get_type(self): """get type""" return WriterType.VIDEO class TextWriter(_BaseWriter): """TextWriter""" def __init__(self, backend="python", **bk_args): super().__init__(backend=backend, **bk_args) def write(self, out_path, obj): """write""" return self._backend.write_obj(str(out_path), obj) def _init_backend(self, bk_type, bk_args): """init backend""" if bk_type == "python": return TextWriterBackend(**bk_args) else: raise ValueError("Unsupported backend type") def get_type(self): """get type""" return WriterType.TEXT class JsonWriter(_BaseWriter): def __init__(self, backend="json", **bk_args): super().__init__(backend=backend, **bk_args) def write(self, out_path, obj, **bk_args): return self._backend.write_obj(str(out_path), obj, **bk_args) def _init_backend(self, bk_type, bk_args): if bk_type == "json": return JsonWriterBackend(**bk_args) elif bk_type == "ujson": return UJsonWriterBackend(**bk_args) else: raise ValueError("Unsupported backend type") def get_type(self): """get type""" return WriterType.JSON class HtmlWriter(_BaseWriter): def __init__(self, backend="html", **bk_args): super().__init__(backend=backend, **bk_args) def write(self, out_path, obj, **bk_args): return self._backend.write_obj(str(out_path), obj, **bk_args) def _init_backend(self, bk_type, bk_args): if bk_type == "html": return HtmlWriterBackend(**bk_args) else: raise ValueError("Unsupported backend type") def get_type(self): """get type""" return WriterType.HTML class XlsxWriter(_BaseWriter): def __init__(self, backend="xlsx", **bk_args): super().__init__(backend=backend, **bk_args) def write(self, out_path, obj, **bk_args): return self._backend.write_obj(str(out_path), obj, **bk_args) def _init_backend(self, bk_type, bk_args): if bk_type == "xlsx": return XlsxWriterBackend(**bk_args) else: raise ValueError("Unsupported backend type") def get_type(self): """get type""" return WriterType.XLSX class YAMLWriter(_BaseWriter): def __init__(self, backend="PyYAML", **bk_args): super().__init__(backend=backend, **bk_args) def write(self, out_path, obj, **bk_args): return self._backend.write_obj(str(out_path), obj, **bk_args) def _init_backend(self, bk_type, bk_args): if bk_type == "PyYAML": return YAMLWriterBackend(**bk_args) else: raise ValueError("Unsupported backend type") def get_type(self): """get type""" return WriterType.YAML class MarkdownWriter(_BaseWriter): """MarkdownWriter""" def __init__(self, backend="markdown", **bk_args): super().__init__(backend=backend, **bk_args) def write(self, out_path, obj): """write""" return self._backend.write_obj(str(out_path), obj) def _init_backend(self, bk_type, bk_args): """init backend""" if bk_type == "markdown": return MarkdownWriterBackend(**bk_args) else: raise ValueError("Unsupported backend type") def get_type(self): """get type""" return WriterType.MARKDOWN class _BaseWriterBackend(object): """_BaseWriterBackend""" def write_obj(self, out_path, obj, **bk_args): """write object""" Path(out_path).parent.mkdir(parents=True, exist_ok=True) return self._write_obj(out_path, obj, **bk_args) def _write_obj(self, out_path, obj, **bk_args): """write object""" raise NotImplementedError class TextWriterBackend(_BaseWriterBackend): """TextWriterBackend""" def __init__(self, mode="w", encoding="utf-8"): super().__init__() self.mode = mode self.encoding = encoding def _write_obj(self, out_path, obj): """write text object""" with open(out_path, mode=self.mode, encoding=self.encoding) as f: f.write(obj) class HtmlWriterBackend(_BaseWriterBackend): def __init__(self, mode="w", encoding="utf-8"): super().__init__() self.mode = mode self.encoding = encoding def _write_obj(self, out_path, obj, **bk_args): with open(out_path, mode=self.mode, encoding=self.encoding) as f: f.write(obj) class XlsxWriterBackend(_BaseWriterBackend): def _write_obj(self, out_path, obj, **bk_args): document_to_xl(obj, out_path) class _ImageWriterBackend(_BaseWriterBackend): """_ImageWriterBackend""" @class_requires_deps("opencv-contrib-python") class OpenCVImageWriterBackend(_ImageWriterBackend): """OpenCVImageWriterBackend""" def _write_obj(self, out_path, obj): """write image object by OpenCV""" if isinstance(obj, Image.Image): # Assuming the channel order is RGB. arr = np.asarray(obj)[:, :, ::-1] elif isinstance(obj, np.ndarray): arr = obj else: raise TypeError("Unsupported object type") return cv2.imwrite(out_path, arr) class PILImageWriterBackend(_ImageWriterBackend): """PILImageWriterBackend""" def __init__(self, format_=None): super().__init__() self.format = format_ def _write_obj(self, out_path, obj): """write image object by PIL""" if isinstance(obj, Image.Image): img = obj elif isinstance(obj, np.ndarray): img = Image.fromarray(obj) else: raise TypeError("Unsupported object type") if len(img.getbands()) == 4: self.format = "PNG" return img.save(out_path, format=self.format) class _VideoWriterBackend(_BaseWriterBackend): """_VideoWriterBackend""" @class_requires_deps("opencv-contrib-python") class OpenCVVideoWriterBackend(_VideoWriterBackend): """OpenCVImageWriterBackend""" def _write_obj(self, out_path, obj): """write video object by OpenCV""" obj, fps = obj if isinstance(obj, np.ndarray): vr = obj width, height = vr[0].shape[1], vr[0].shape[0] fourcc = cv2.VideoWriter_fourcc(*"mp4v") # Alternatively, use 'XVID' out = cv2.VideoWriter(out_path, fourcc, fps, (width, height)) for frame in vr: out.write(frame) out.release() else: raise TypeError("Unsupported object type") class _BaseJsonWriterBackend(object): def __init__(self, indent=4, ensure_ascii=False): super().__init__() self.indent = indent self.ensure_ascii = ensure_ascii def write_obj(self, out_path, obj, **bk_args): Path(out_path).parent.mkdir(parents=True, exist_ok=True) return self._write_obj(out_path, obj, **bk_args) def _write_obj(self, out_path, obj): raise NotImplementedError class JsonWriterBackend(_BaseJsonWriterBackend): def _write_obj(self, out_path, obj, **bk_args): with open(out_path, "w", encoding="utf-8") as f: json.dump(obj, f, **bk_args) class UJsonWriterBackend(_BaseJsonWriterBackend): # TODO def _write_obj(self, out_path, obj, **bk_args): raise NotImplementedError class YAMLWriterBackend(_BaseWriterBackend): def __init__(self, mode="w", encoding="utf-8"): super().__init__() self.mode = mode self.encoding = encoding def _write_obj(self, out_path, obj, **bk_args): """write text object""" with open(out_path, mode=self.mode, encoding=self.encoding) as f: yaml.dump(obj, f, **bk_args) class CSVWriter(_BaseWriter): """CSVWriter""" def __init__(self, backend="pandas", **bk_args): super().__init__(backend=backend, **bk_args) def write(self, out_path, obj): """write""" return self._backend.write_obj(str(out_path), obj) def _init_backend(self, bk_type, bk_args): """init backend""" if bk_type == "pandas": return PandasCSVWriterBackend(**bk_args) else: raise ValueError("Unsupported backend type") def get_type(self): """get type""" return WriterType.CSV class _CSVWriterBackend(_BaseWriterBackend): """_CSVWriterBackend""" class PandasCSVWriterBackend(_CSVWriterBackend): """PILImageWriterBackend""" def __init__(self): super().__init__() def _write_obj(self, out_path, obj): """write image object by PIL""" if isinstance(obj, pd.DataFrame): ts = obj else: raise TypeError("Unsupported object type") return ts.to_csv(out_path) class MarkdownWriterBackend(_BaseWriterBackend): """MarkdownWriterBackend""" def __init__(self): super().__init__() def _write_obj(self, out_path, obj): """write markdown obj""" with open(out_path, mode="w", encoding="utf-8", errors="replace") as f: f.write(obj)