Эх сурвалжийг харах

support tostr, tojson in formula pipeline (#2842)

* support tostr, tojson in formula pipeline

* adjust formula pipeline to align with ocr

* add video and formula param in cli
liuhongen1234567 10 сар өмнө
parent
commit
c2d7a8ff62

+ 4 - 2
api_examples/pipelines/test_formula_recognition.py

@@ -39,5 +39,7 @@ output = pipeline.predict(
 # )
 
 for res in output:
-    # res.save_to_img("./output/")
-    res.save_results("./output")
+    print(res)
+    res.print()
+    res.save_to_img("./output")
+    res.save_to_json("./output")

+ 84 - 74
paddlex/inference/pipelines_new/formula_recognition/pipeline.py

@@ -17,10 +17,7 @@ from typing import Any, Dict, Optional
 import numpy as np
 import cv2
 from ..base import BasePipeline
-from ..components import CropByBoxes
-
-# from ..layout_parsing.utils import convert_points_to_boxes
-from ..components import convert_points_to_boxes
+from ..components import CropByBoxes, convert_points_to_boxes
 
 from .result import FormulaRecognitionResult
 from ...models_new.formula_recognition.result import (
@@ -50,7 +47,7 @@ class FormulaRecognitionPipeline(BasePipeline):
         use_hpip: bool = False,
         hpi_params: Optional[Dict[str, Any]] = None,
     ) -> None:
-        """Initializes the layout parsing pipeline.
+        """Initializes the formula recognition pipeline.
 
         Args:
             config (Dict): Configuration dictionary containing various settings.
@@ -64,24 +61,31 @@ class FormulaRecognitionPipeline(BasePipeline):
             device=device, pp_option=pp_option, use_hpip=use_hpip, hpi_params=hpi_params
         )
 
-        self.use_doc_preprocessor = False
-        if "use_doc_preprocessor" in config:
-            self.use_doc_preprocessor = config["use_doc_preprocessor"]
-
+        self.use_doc_preprocessor = config.get("use_doc_preprocessor", True)
         if self.use_doc_preprocessor:
-            doc_preprocessor_config = config["SubPipelines"]["DocPreprocessor"]
+            doc_preprocessor_config = config.get("SubPipelines", {}).get(
+                "DocPreprocessor",
+                {
+                    "pipeline_config_error": "config error for doc_preprocessor_pipeline!"
+                },
+            )
             self.doc_preprocessor_pipeline = self.create_pipeline(
                 doc_preprocessor_config
             )
 
-        self.use_layout_detection = True
-        if "use_layout_detection" in config:
-            self.use_layout_detection = config["use_layout_detection"]
+        self.use_layout_detection = config.get("use_layout_detection", True)
+
         if self.use_layout_detection:
-            layout_det_config = config["SubModules"]["LayoutDetection"]
+            layout_det_config = config.get("SubModules", {}).get(
+                "LayoutDetection",
+                {"model_config_error": "config error for layout_det_model!"},
+            )
             self.layout_det_model = self.create_model(layout_det_config)
 
-        formula_recognition_config = config["SubModules"]["FormulaRecognition"]
+        formula_recognition_config = config.get("SubModules", {}).get(
+            "FormulaRecognition",
+            {"model_config_error": "config error for formula_rec_model!"},
+        )
         self.formula_recognition_model = self.create_model(formula_recognition_config)
 
         self._crop_by_boxes = CropByBoxes()
@@ -89,26 +93,56 @@ class FormulaRecognitionPipeline(BasePipeline):
         self.batch_sampler = ImageBatchSampler(batch_size=1)
         self.img_reader = ReadImage(format="BGR")
 
-    def check_input_params_valid(
-        self, input_params: Dict, layout_det_res: DetResult
+    def get_model_settings(
+        self,
+        use_doc_orientation_classify: Optional[bool],
+        use_doc_unwarping: Optional[bool],
+        use_layout_detection: Optional[bool],
+    ) -> dict:
+        """
+        Get the model settings based on the provided parameters or default values.
+
+        Args:
+            use_doc_orientation_classify (Optional[bool]): Whether to use document orientation classification.
+            use_doc_unwarping (Optional[bool]): Whether to use document unwarping.
+            use_layout_detection (Optional[bool]): Whether to use layout detection.
+
+        Returns:
+            dict: A dictionary containing the model settings.
+        """
+        if use_doc_orientation_classify is None and use_doc_unwarping is None:
+            use_doc_preprocessor = self.use_doc_preprocessor
+        else:
+            use_doc_preprocessor = True
+
+        if use_layout_detection is None:
+            use_layout_detection = self.use_layout_detection
+
+        return dict(
+            use_doc_preprocessor=use_doc_preprocessor,
+            use_layout_detection=use_layout_detection,
+        )
+
+    def check_model_settings_valid(
+        self, model_settings: Dict, layout_det_res: DetResult
     ) -> bool:
         """
         Check if the input parameters are valid based on the initialized models.
 
         Args:
-            input_params (Dict): A dictionary containing input parameters.
+            model_settings (Dict): A dictionary containing input parameters.
             layout_det_res (DetResult): The layout detection result.
         Returns:
             bool: True if all required models are initialized according to input parameters, False otherwise.
         """
 
-        if input_params["use_doc_preprocessor"] and not self.use_doc_preprocessor:
+        if model_settings["use_doc_preprocessor"] and not self.use_doc_preprocessor:
             logging.error(
                 "Set use_doc_preprocessor, but the models for doc preprocessor are not initialized."
             )
             return False
 
-        if input_params["use_layout_detection"]:
+        if model_settings["use_layout_detection"]:
             if layout_det_res is not None:
                 logging.error(
                     "The layout detection model has already been initialized, please set use_layout_detection=False"
@@ -123,36 +157,6 @@ class FormulaRecognitionPipeline(BasePipeline):
 
         return True
 
-    def predict_doc_preprocessor_res(
-        self, image_array: np.ndarray, input_params: dict
-    ) -> tuple[DocPreprocessorResult, np.ndarray]:
-        """
-        Preprocess the document image based on input parameters.
-
-        Args:
-            image_array (np.ndarray): The input image array.
-            input_params (dict): Dictionary containing preprocessing parameters.
-
-        Returns:
-            tuple[DocPreprocessorResult, np.ndarray]: A tuple containing the preprocessing
-                                              result dictionary and the processed image array.
-        """
-        if input_params["use_doc_preprocessor"]:
-            use_doc_orientation_classify = input_params["use_doc_orientation_classify"]
-            use_doc_unwarping = input_params["use_doc_unwarping"]
-            doc_preprocessor_res = next(
-                self.doc_preprocessor_pipeline(
-                    image_array,
-                    use_doc_orientation_classify=use_doc_orientation_classify,
-                    use_doc_unwarping=use_doc_unwarping,
-                )
-            )
-            doc_preprocessor_image = doc_preprocessor_res["output_img"]
-        else:
-            doc_preprocessor_res = {}
-            doc_preprocessor_image = image_array
-        return doc_preprocessor_res, doc_preprocessor_image
-
     def predict_single_formula_recognition_res(
         self,
         image_array: np.ndarray,
@@ -179,7 +183,7 @@ class FormulaRecognitionPipeline(BasePipeline):
         use_doc_orientation_classify: bool = False,
         use_doc_unwarping: bool = False,
         layout_det_res: DetResult = None,
-        **kwargs
+        **kwargs,
     ) -> FormulaRecognitionResult:
         """
         This function predicts the layout parsing result for the given input.
@@ -197,34 +201,41 @@ class FormulaRecognitionPipeline(BasePipeline):
             formulaRecognitionResult: The predicted formula recognition result.
         """
 
-        input_params = {
-            "use_layout_detection": use_layout_detection,
-            "use_doc_preprocessor": self.use_doc_preprocessor,
-            "use_doc_orientation_classify": use_doc_orientation_classify,
-            "use_doc_unwarping": use_doc_unwarping,
-        }
-
-        if use_doc_orientation_classify or use_doc_unwarping:
-            input_params["use_doc_preprocessor"] = True
-        else:
-            input_params["use_doc_preprocessor"] = False
+        model_settings = self.get_model_settings(
+            use_doc_orientation_classify,
+            use_doc_unwarping,
+            use_layout_detection,
+        )
 
-        if not self.check_input_params_valid(input_params, layout_det_res):
-            yield None
+        if not self.check_model_settings_valid(model_settings, layout_det_res):
+            yield {"error": "the input params for model settings are invalid!"}
 
         for img_id, batch_data in enumerate(self.batch_sampler(input)):
+            if not isinstance(batch_data[0], str):
+                # TODO: add support input_pth for ndarray and pdf
+                input_path = f"{img_id}.jpg"
+            else:
+                input_path = batch_data[0]
+
             image_array = self.img_reader(batch_data)[0]
-            input_path = batch_data[0]
-            img_id += 1
 
-            doc_preprocessor_res, doc_preprocessor_image = (
-                self.predict_doc_preprocessor_res(image_array, input_params)
-            )
+            if model_settings["use_doc_preprocessor"]:
+                doc_preprocessor_res = next(
+                    self.doc_preprocessor_pipeline(
+                        image_array,
+                        use_doc_orientation_classify=use_doc_orientation_classify,
+                        use_doc_unwarping=use_doc_unwarping,
+                    )
+                )
+            else:
+                doc_preprocessor_res = {"output_img": image_array}
+
+            doc_preprocessor_image = doc_preprocessor_res["output_img"]
 
             formula_res_list = []
             formula_region_id = 1
 
-            if not input_params["use_layout_detection"] and layout_det_res is None:
+            if not model_settings["use_layout_detection"] and layout_det_res is None:
                 layout_det_res = {}
                 img_height, img_width = doc_preprocessor_image.shape[:2]
                 single_formula_rec_res = self.predict_single_formula_recognition_res(
@@ -234,7 +245,7 @@ class FormulaRecognitionPipeline(BasePipeline):
                 formula_res_list.append(single_formula_rec_res)
                 formula_region_id += 1
             else:
-                if input_params["use_layout_detection"]:
+                if model_settings["use_layout_detection"]:
                     layout_det_res = next(self.layout_det_model(doc_preprocessor_image))
                 for box_info in layout_det_res["boxes"]:
                     if box_info["label"].lower() in ["formula"]:
@@ -251,11 +262,10 @@ class FormulaRecognitionPipeline(BasePipeline):
                         formula_region_id += 1
 
             single_img_res = {
+                "input_path": input_path,
                 "layout_det_res": layout_det_res,
                 "doc_preprocessor_res": doc_preprocessor_res,
                 "formula_res_list": formula_res_list,
-                "input_params": input_params,
-                "img_id": img_id,
-                "img_name": input_path,
+                "model_settings": model_settings,
             }
             yield FormulaRecognitionResult(single_img_res)

+ 110 - 55
paddlex/inference/pipelines_new/formula_recognition/result.py

@@ -13,18 +13,20 @@
 # limitations under the License.
 
 import os, sys
-from typing import Tuple
+from typing import Tuple, List, Dict, Any
 import cv2
 import PIL
 import math
+import copy
 import random
 import tempfile
 import subprocess
 import numpy as np
 from pathlib import Path
+import PIL
 from PIL import Image, ImageDraw, ImageFont
 
-from ...common.result import BaseCVResult
+from ...common.result import BaseCVResult, JsonMixin, ImgMixin, StrMixin
 from ....utils import logging
 from ....utils.fonts import PINGFANG_FONT_FILE_PATH
 from ...models_new.formula_recognition.result import (
@@ -36,53 +38,71 @@ from ...models_new.formula_recognition.result import (
     create_font,
     crop_white_area,
     draw_box_txt_fine,
+    draw_formula_module,
 )
 
 
-class FormulaRecognitionResult(dict):
-    """Layout Parsing Result"""
-
-    def __init__(self, data) -> None:
-        """Initializes a new instance of the class with the specified data."""
-        super().__init__(data)
+class FormulaRecognitionResult(BaseCVResult):
+    """Formula Recognition Result"""
 
-    def save_to_img(self, save_path: str) -> None:
+    def _to_img(self) -> Dict[str, Image.Image]:
         """
-        Saves an image with overlaid formula recognition results.
-
-        This function attempts to save an image with recognized formulas highlighted
-        and annotated. It verifies the environment setup before proceeding and logs
-        a warning if the necessary rendering engine is not installed. The output image
-        consists of two halves: the left side shows the original image with bounding
-        boxes, and the right side shows the recognized formulas.
-
-        Args:
-            save_path (str): The directory path where the output image will be saved.
+        Converts the internal data to a PIL Image with detection and recognition results.
 
         Returns:
-            None
+            Dict[str, Image.Image]: An image with detection boxes, texts, and scores blended on it.
         """
+        image = Image.fromarray(self["doc_preprocessor_res"]["output_img"])
         try:
             env_valid()
         except subprocess.CalledProcessError as e:
             logging.warning(
                 "Please refer to 2.3 Formula Recognition Pipeline Visualization in Formula Recognition Pipeline Tutorial to install the LaTeX rendering engine at first."
             )
-            return None
-        if not os.path.exists(save_path):
-            os.makedirs(save_path)
-        img_id = self["img_id"]
-        img_name = self["img_name"]
+            return {f"formula_res_img": image}
+
         if len(self["layout_det_res"]) <= 0:
-            return
-        image = Image.fromarray(self["layout_det_res"]["input_img"])
+            image = np.array(image.convert("RGB"))
+            rec_formula = self["formula_res_list"][0]["rec_formula"]
+            xywh = crop_white_area(image)
+            if xywh is not None:
+                x, y, w, h = xywh
+                image = image[y : y + h, x : x + w]
+            image = Image.fromarray(image)
+            image_width, image_height = image.size
+            box = [
+                [0, 0],
+                [image_width, 0],
+                [image_width, image_height],
+                [0, image_height],
+            ]
+            try:
+                img_formula = draw_formula_module(
+                    image.size, box, rec_formula, is_debug=False
+                )
+                img_formula = Image.fromarray(img_formula)
+                render_width, render_height = img_formula.size
+                resize_height = render_height
+                resize_width = int(resize_height * image_width / image_height)
+                image = image.resize((resize_width, resize_height), Image.LANCZOS)
+
+                new_image_width = image.width + int(render_width) + 10
+                new_image = Image.new(
+                    "RGB", (new_image_width, render_height), (255, 255, 255)
+                )
+                new_image.paste(image, (0, 0))
+                new_image.paste(img_formula, (image.width + 10, 0))
+                return {f"formula_res_img": new_image}
+            except subprocess.CalledProcessError as e:
+                logging.warning("Syntax error detected in formula, rendering failed.")
+                return {f"formula_res_img": image}
+
         h, w = image.height, image.width
         img_left = image.copy()
         img_right = np.ones((h, w, 3), dtype=np.uint8) * 255
         random.seed(0)
         draw_left = ImageDraw.Draw(img_left)
 
-        formula_save_path = os.path.join(save_path, "formula_img_{}.jpg".format(img_id))
         formula_res_list = self["formula_res_list"]
         for tno in range(len(self["formula_res_list"])):
             formula_res = self["formula_res_list"][tno]
@@ -117,38 +137,73 @@ class FormulaRecognitionResult(dict):
         img_show = Image.new("RGB", (int(w * 2), h), (255, 255, 255))
         img_show.paste(img_left, (0, 0, w, h))
         img_show.paste(Image.fromarray(img_right), (w, 0, w * 2, h))
-        img_show.save(formula_save_path)
 
-    def save_results(self, save_path: str) -> None:
-        """Save the formula recognition results to the specified directory.
+        model_settings = self["model_settings"]
+        res_img_dict = {f"formula_res_img": img_show}
+        if model_settings["use_doc_preprocessor"]:
+            res_img_dict.update(**self["doc_preprocessor_res"].img)
+        return res_img_dict
+
+    def _to_str(self, *args, **kwargs) -> Dict[str, str]:
+        """Converts the instance's attributes to a dictionary and then to a string.
 
         Args:
-            save_path (str): The directory path to save the results.
+            *args: Additional positional arguments passed to the base class method.
+            **kwargs: Additional keyword arguments passed to the base class method.
+
+        Returns:
+            Dict[str, str]: A dictionary with the instance's attributes converted to strings.
         """
-        if not os.path.exists(save_path):
-            os.makedirs(save_path)
-        if not os.path.isdir(save_path):
-            return
-
-        img_id = self["img_id"]
-        layout_det_res = self["layout_det_res"]
-        if len(layout_det_res) > 0:
-            save_img_path = Path(save_path) / f"layout_det_result_img{img_id}.jpg"
-            layout_det_res.save_to_img(save_img_path)
-        self.save_to_img(save_path)
-        input_params = self["input_params"]
-        if input_params["use_doc_preprocessor"]:
-            save_img_path = Path(save_path) / f"doc_preprocessor_result_img{img_id}.jpg"
-            self["doc_preprocessor_res"].save_to_img(save_img_path)
+        data = {}
+        data["input_path"] = self["input_path"]
+        data["model_settings"] = self["model_settings"]
+        if self["model_settings"]["use_doc_preprocessor"]:
+            data["doc_preprocessor_res"] = self["doc_preprocessor_res"].str["res"]
+
+        data["formula_res_list"] = []
         for tno in range(len(self["formula_res_list"])):
-            formula_res = self["formula_res_list"][tno]
-            formula_region_id = formula_res["formula_region_id"]
-            save_img_path = (
-                Path(save_path)
-                / f"formula_res_img{img_id}_region{formula_region_id}.jpg"
-            )
-            formula_res.save_to_img(save_img_path)
-        return
+            rec_formula_dict = {
+                "rec_formula": self["formula_res_list"][tno]["rec_formula"],
+                "formula_region_id": self["formula_res_list"][tno]["formula_region_id"],
+            }
+            if "dt_polys" in self["formula_res_list"][tno]:
+                rec_formula_dict["dt_polys"] = (
+                    self["formula_res_list"][tno]["dt_polys"],
+                )
+            data["formula_res_list"].append(rec_formula_dict)
+
+        return StrMixin._to_str(data, *args, **kwargs)
+
+    def _to_json(self, *args, **kwargs) -> Dict[str, str]:
+        """
+        Converts the object's data to a JSON dictionary.
+
+        Args:
+            *args: Positional arguments passed to the JsonMixin._to_json method.
+            **kwargs: Keyword arguments passed to the JsonMixin._to_json method.
+
+        Returns:
+            Dict[str, str]: A dictionary containing the object's data in JSON format.
+        """
+        data = {}
+        data["input_path"] = self["input_path"]
+        data["model_settings"] = self["model_settings"]
+        if self["model_settings"]["use_doc_preprocessor"]:
+            data["doc_preprocessor_res"] = self["doc_preprocessor_res"].str["res"]
+
+        data["formula_res_list"] = []
+        for tno in range(len(self["formula_res_list"])):
+            rec_formula_dict = {
+                "rec_formula": self["formula_res_list"][tno]["rec_formula"],
+                "formula_region_id": self["formula_res_list"][tno]["formula_region_id"],
+            }
+            if "dt_polys" in self["formula_res_list"][tno]:
+                rec_formula_dict["dt_polys"] = (
+                    self["formula_res_list"][tno]["dt_polys"],
+                )
+            data["formula_res_list"].append(rec_formula_dict)
+
+        return JsonMixin._to_json(data, *args, **kwargs)
 
 
 def draw_box_formula_fine(

+ 8 - 0
paddlex/utils/pipeline_arguments.py

@@ -116,4 +116,12 @@ PIPELINE_ARGUMENTS = {
         },
     ],
     "ts_classification": None,
+    "formula_recognition": None,
+    "video_classification": [
+        {
+            "name": "--topk",
+            "type": int,
+            "help": "Sets the Top-K value for video classification.",
+        },
+    ],
 }