bbox_extractor.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. """
  2. bbox 提取模块
  3. 负责从 PaddleOCR 结果中提取文字框信息
  4. """
  5. from typing import List, Dict
  6. import numpy as np
  7. from pathlib import Path
  8. class BBoxExtractor:
  9. """bbox 提取器"""
  10. @staticmethod
  11. def extract_paddle_text_boxes(paddle_data: Dict) -> List[Dict]:
  12. """
  13. 提取 PaddleOCR 的文字框信息
  14. Args:
  15. paddle_data: PaddleOCR 输出的数据
  16. Returns:
  17. 文字框列表(坐标已转换为 angle=0 时的坐标)
  18. """
  19. text_boxes = []
  20. if 'overall_ocr_res' not in paddle_data:
  21. return text_boxes
  22. ocr_res = paddle_data['overall_ocr_res']
  23. rec_texts = ocr_res.get('rec_texts', [])
  24. rec_polys = ocr_res.get('rec_polys', [])
  25. rec_scores = ocr_res.get('rec_scores', [])
  26. # 🎯 获取旋转角度
  27. rotation_angle = BBoxExtractor._get_rotation_angle(paddle_data)
  28. # 🎯 如果有旋转,需要获取原始图像尺寸
  29. orig_image_size = None
  30. if rotation_angle != 0:
  31. orig_image_size = BBoxExtractor._get_original_image_size(paddle_data)
  32. print(f"🔄 检测到旋转角度: {rotation_angle}°")
  33. print(f"📐 原始图像尺寸: {orig_image_size[0]} x {orig_image_size[1]}")
  34. for i, (text, poly, score) in enumerate(zip(rec_texts, rec_polys, rec_scores)):
  35. if text and text.strip():
  36. # 🎯 如果有旋转角度,转换坐标
  37. if rotation_angle != 0 and orig_image_size:
  38. poly = BBoxExtractor._inverse_rotate_coordinates(
  39. poly, rotation_angle, orig_image_size
  40. )
  41. # 计算 bbox (x_min, y_min, x_max, y_max)
  42. bbox = BBoxExtractor._poly_to_bbox(poly)
  43. text_boxes.append({
  44. 'text': text,
  45. 'bbox': bbox,
  46. 'poly': poly,
  47. 'score': score,
  48. 'paddle_bbox_index': i,
  49. 'used': False
  50. })
  51. return text_boxes
  52. @staticmethod
  53. def _get_rotation_angle(paddle_data: Dict) -> float:
  54. """获取旋转角度"""
  55. if 'doc_preprocessor_res' not in paddle_data:
  56. return 0.0
  57. doc_res = paddle_data['doc_preprocessor_res']
  58. if isinstance(doc_res, dict) and 'angle' in doc_res:
  59. return float(doc_res['angle'])
  60. return 0.0
  61. @staticmethod
  62. def _get_original_image_size(paddle_data: Dict) -> tuple:
  63. """
  64. 获取原始图像尺寸(从图片文件读取)
  65. Args:
  66. paddle_data: PaddleOCR 数据
  67. Returns:
  68. (width, height) 元组
  69. """
  70. from PIL import Image
  71. # 🎯 从 input_path 读取图像
  72. input_path = paddle_data.get('input_path')
  73. if input_path and Path(input_path).exists():
  74. try:
  75. with Image.open(input_path) as img:
  76. # 返回原始图像尺寸
  77. return img.size # (width, height)
  78. except Exception as e:
  79. print(f"⚠️ 无法读取图像文件 {input_path}: {e}")
  80. # 🎯 降级方案:从 layout_det_res 推断
  81. if 'layout_det_res' in paddle_data:
  82. layout_res = paddle_data['layout_det_res']
  83. if 'boxes' in layout_res and layout_res['boxes']:
  84. max_x = 0
  85. max_y = 0
  86. for box in layout_res['boxes']:
  87. coord = box.get('coordinate', [])
  88. if len(coord) >= 4:
  89. max_x = max(max_x, coord[2])
  90. max_y = max(max_y, coord[3])
  91. if max_x > 0 and max_y > 0:
  92. return (int(max_x) + 50, int(max_y) + 50)
  93. # 🎯 最后降级:从 overall_ocr_res 推断
  94. if 'overall_ocr_res' in paddle_data:
  95. ocr_res = paddle_data['overall_ocr_res']
  96. rec_polys = ocr_res.get('rec_polys', [])
  97. if rec_polys:
  98. max_x = 0
  99. max_y = 0
  100. for poly in rec_polys:
  101. for point in poly:
  102. max_x = max(max_x, point[0])
  103. max_y = max(max_y, point[1])
  104. if max_x > 0 and max_y > 0:
  105. return (int(max_x) + 50, int(max_y) + 50)
  106. # 🎯 默认 A4 尺寸
  107. print("⚠️ 无法确定原始图像尺寸,使用默认值")
  108. return (2480, 3508)
  109. @staticmethod
  110. def _inverse_rotate_coordinates(poly: List[List[float]],
  111. angle: float,
  112. orig_image_size: tuple) -> List[List[float]]:
  113. """
  114. 反向旋转坐标
  115. 参考 ocr_validator_utils.rotate_image_and_coordinates 的逆操作
  116. PaddleOCR 在旋转后的图像上识别,坐标是旋转后的
  117. 我们需要将坐标转换回原始图像(未旋转)
  118. Args:
  119. poly: 旋转后图像上的多边形坐标 [[x',y'], ...]
  120. angle: 旋转角度(度数,PaddleX 使用的角度)
  121. orig_image_size: 原始图像尺寸 (width, height)
  122. Returns:
  123. 原始图像上的多边形坐标 [[x,y], ...]
  124. """
  125. orig_width, orig_height = orig_image_size
  126. # 🎯 根据旋转角度计算旋转后的图像尺寸
  127. if angle == 90:
  128. rotated_width, rotated_height = orig_height, orig_width
  129. elif angle == 270:
  130. rotated_width, rotated_height = orig_height, orig_width
  131. else:
  132. rotated_width, rotated_height = orig_width, orig_height
  133. inverse_poly = []
  134. for point in poly:
  135. x_rot, y_rot = point[0], point[1] # 旋转后的坐标
  136. # 🎯 反向旋转(参考 rotate_image_and_coordinates 的逆操作)
  137. if angle == 90:
  138. # 正向: rotated = image.rotate(90, expand=True)
  139. # x_rot = y_orig
  140. # y_rot = rotated_width - x_orig = orig_height - x_orig
  141. # 反向: x_orig = rotated_width - y_rot = orig_height - y_rot
  142. # y_orig = x_rot
  143. x_orig = rotated_width - y_rot
  144. y_orig = x_rot
  145. elif angle == 270:
  146. # 正向: rotated = image.rotate(-90, expand=True)
  147. # x_rot = rotated_width - y_orig = orig_height - y_orig
  148. # y_rot = x_orig
  149. # 反向: y_orig = rotated_width - x_rot = orig_height - x_rot
  150. # x_orig = y_rot
  151. x_orig = y_rot
  152. y_orig = rotated_width - x_rot
  153. elif angle == 180:
  154. # 正向: rotated = image.rotate(180)
  155. # x_rot = orig_width - x_orig
  156. # y_rot = orig_height - y_orig
  157. # 反向: x_orig = orig_width - x_rot
  158. # y_orig = orig_height - y_rot
  159. x_orig = orig_width - x_rot
  160. y_orig = orig_height - y_rot
  161. else:
  162. # 其他角度或0度,不转换
  163. x_orig = x_rot
  164. y_orig = y_rot
  165. inverse_poly.append([x_orig, y_orig])
  166. return inverse_poly
  167. @staticmethod
  168. def _poly_to_bbox(poly: List[List[float]]) -> List[float]:
  169. """将多边形转换为 bbox [x_min, y_min, x_max, y_max]"""
  170. xs = [p[0] for p in poly]
  171. ys = [p[1] for p in poly]
  172. return [min(xs), min(ys), max(xs), max(ys)]
  173. @staticmethod
  174. def extract_table_cells_with_bbox(merged_data: List[Dict]) -> List[Dict]:
  175. """
  176. 提取所有表格单元格及其 bbox 信息
  177. Args:
  178. merged_data: 合并后的数据
  179. Returns:
  180. 单元格列表
  181. """
  182. import json
  183. from bs4 import BeautifulSoup
  184. cells = []
  185. for item in merged_data:
  186. if item['type'] != 'table':
  187. continue
  188. html = item.get('table_body_with_bbox', item.get('table_body', ''))
  189. soup = BeautifulSoup(html, 'html.parser')
  190. for row_idx, row in enumerate(soup.find_all('tr')):
  191. for col_idx, cell in enumerate(row.find_all(['td', 'th'])):
  192. cell_text = cell.get_text(strip=True)
  193. bbox_str = cell.get('data-bbox', '')
  194. if bbox_str:
  195. try:
  196. bbox = json.loads(bbox_str)
  197. cells.append({
  198. 'text': cell_text,
  199. 'bbox': bbox,
  200. 'row': row_idx,
  201. 'col': col_idx,
  202. 'score': float(cell.get('data-score', 0)),
  203. 'paddle_index': int(cell.get('data-paddle-index', -1))
  204. })
  205. except (json.JSONDecodeError, ValueError):
  206. pass
  207. return cells