| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- # copyright (c) 2024 PaddlePaddle Authors. All Rights Reserve.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- from ..base import BasePipeline
- from typing import Any, Dict, Optional
- import numpy as np
- import cv2
- from ..components import CropByBoxes
- from .utils import convert_points_to_boxes, get_sub_regions_ocr_res
- from .table_recognition_post_processing import get_table_recognition_res
- from .result import LayoutParsingResult
- ########## [TODO]后续需要更新路径
- from ...components.transforms import ReadImage
- class LayoutParsingPipeline(BasePipeline):
- """Layout Parsing Pipeline"""
- entities = "layout_parsing"
- def __init__(self,
- config,
- device=None,
- pp_option=None,
- use_hpip: bool = False,
- hpi_params: Optional[Dict[str, Any]] = None):
- super().__init__(device=device, pp_option=pp_option,
- use_hpip=use_hpip, hpi_params=hpi_params)
-
- self.inintial_predictor(config)
- self.img_reader = ReadImage(format="BGR")
- self._crop_by_boxes = CropByBoxes()
-
- def inintial_predictor(self, config):
- layout_det_config = config['SubModules']["LayoutDetection"]
- self.layout_det_model = self.create_model(layout_det_config)
- self.use_doc_preprocessor = False
- if 'use_doc_preprocessor' in config:
- self.use_doc_preprocessor = config['use_doc_preprocessor']
- if self.use_doc_preprocessor:
- doc_preprocessor_config = config['SubPipelines']['DocPreprocessor']
- self.doc_preprocessor_pipeline = self.create_pipeline(doc_preprocessor_config)
-
- self.use_common_ocr = False
- if "use_common_ocr" in config:
- self.use_common_ocr = config['use_common_ocr']
- if self.use_common_ocr:
- common_ocr_config = config['SubPipelines']['CommonOCR']
- self.common_ocr_pipeline = self.create_pipeline(common_ocr_config)
-
- self.use_seal_recognition = False
- if "use_seal_recognition" in config:
- self.use_seal_recognition = config['use_seal_recognition']
- if self.use_seal_recognition:
- seal_ocr_config = config['SubPipelines']['SealOCR']
- self.seal_ocr_pipeline = self.create_pipeline(seal_ocr_config)
-
- self.use_table_recognition = False
- if "use_table_recognition" in config:
- self.use_table_recognition = config['use_table_recognition']
- if self.use_table_recognition:
- table_structure_config = config['SubModules']['TableStructurePredictor']
- self.table_structure_model = self.create_model(table_structure_config)
- if not self.use_common_ocr:
- common_ocr_config = config['SubPipelines']['OCR']
- self.common_ocr_pipeline = self.create_pipeline(common_ocr_config)
- return
- def get_text_paragraphs_ocr_res(self, overall_ocr_res, layout_det_res):
- '''get ocr res of the text paragraphs'''
- object_boxes = []
- for box_info in layout_det_res['boxes']:
- if box_info['label'].lower() in ['image', 'formula', 'table', 'seal']:
- object_boxes.append(box_info['coordinate'])
- object_boxes = np.array(object_boxes)
- return get_sub_regions_ocr_res(overall_ocr_res, object_boxes, flag_within=False)
- def check_input_params(self, input_params):
- if input_params['use_doc_preprocessor'] and not self.use_doc_preprocessor:
- raise ValueError("The models for doc preprocessor are not initialized.")
- if input_params['use_common_ocr'] and not self.use_common_ocr:
- raise ValueError("The models for common OCR are not initialized.")
- if input_params['use_seal_recognition'] and not self.use_seal_recognition:
- raise ValueError("The models for seal recognition are not initialized.")
- if input_params['use_table_recognition'] and not self.use_table_recognition:
- raise ValueError("The models for table recognition are not initialized.")
- return
- def predict(self, input,
- use_doc_orientation_classify=True,
- use_doc_unwarping=True,
- use_common_ocr=True,
- use_seal_recognition=True,
- use_table_recognition=True,
- **kwargs):
- if not isinstance(input, list):
- input_list = [input]
- else:
- input_list = input
-
- input_params = {"use_doc_preprocessor":self.use_doc_preprocessor,
- "use_doc_orientation_classify":use_doc_orientation_classify,
- "use_doc_unwarping":use_doc_unwarping,
- "use_common_ocr":use_common_ocr,
- "use_seal_recognition":use_seal_recognition,
- "use_table_recognition":use_table_recognition}
-
- if use_doc_orientation_classify or use_doc_unwarping:
- input_params['use_doc_preprocessor'] = True
- self.check_input_params(input_params)
- img_id = 1
- for input in input_list:
- if isinstance(input, str):
- image_array = next(self.img_reader(input))[0]['img']
- else:
- image_array = input
- assert len(image_array.shape) == 3
- if input_params['use_doc_preprocessor']:
- doc_preprocessor_res = next(self.doc_preprocessor_pipeline(
- image_array,
- use_doc_orientation_classify=use_doc_orientation_classify,
- use_doc_unwarping=use_doc_unwarping))
- doc_preprocessor_image = doc_preprocessor_res['output_img']
- doc_preprocessor_res['img_id'] = img_id
- else:
- doc_preprocessor_res = {}
- doc_preprocessor_image = image_array
-
- ########## [TODO]RT-DETR 检测结果有重复
- layout_det_res = next(self.layout_det_model(doc_preprocessor_image))
- if input_params['use_common_ocr'] or input_params['use_table_recognition']:
- overall_ocr_res = next(self.common_ocr_pipeline(doc_preprocessor_image))
- overall_ocr_res['img_id'] = img_id
- dt_boxes = convert_points_to_boxes(overall_ocr_res['dt_polys'])
- overall_ocr_res['dt_boxes'] = dt_boxes
- else:
- overall_ocr_res = {}
-
- text_paragraphs_ocr_res = {}
- if input_params['use_common_ocr']:
- text_paragraphs_ocr_res = self.get_text_paragraphs_ocr_res(
- overall_ocr_res, layout_det_res)
- text_paragraphs_ocr_res['img_id'] = img_id
-
- table_res_list = []
- if input_params['use_table_recognition']:
- table_region_id = 1
- for box_info in layout_det_res['boxes']:
- if box_info['label'].lower() in ['table']:
- crop_img_info = self._crop_by_boxes(doc_preprocessor_image, [box_info])
- crop_img_info = crop_img_info[0]
- table_structure_pred = next(self.table_structure_model(
- crop_img_info['img']))
- table_recognition_res = get_table_recognition_res(
- crop_img_info, table_structure_pred, overall_ocr_res)
- table_recognition_res['table_region_id'] = table_region_id
- table_region_id += 1
- table_res_list.append(table_recognition_res)
-
- seal_res_list = []
- if input_params['use_seal_recognition']:
- seal_region_id = 1
- for box_info in layout_det_res['boxes']:
- if box_info['label'].lower() in ['seal']:
- crop_img_info = self._crop_by_boxes(doc_preprocessor_image, [box_info])
- crop_img_info = crop_img_info[0]
- seal_ocr_res = next(self.seal_ocr_pipeline(crop_img_info['img']))
- seal_ocr_res['seal_region_id'] = seal_region_id
- seal_region_id += 1
- seal_res_list.append(seal_ocr_res)
-
- single_img_res = {"layout_det_res":layout_det_res,
- "doc_preprocessor_res":doc_preprocessor_res,
- "text_paragraphs_ocr_res":text_paragraphs_ocr_res,
- "table_res_list":table_res_list,
- "seal_res_list":seal_res_list,
- "input_params":input_params}
- yield LayoutParsingResult(single_img_res)
|