adaptive_row_splitter.py 9.2 KB


  1. """
  2. 自适应行分割器
  3. 基于 Y 轴投影密度分析,自动检测行间空白带
  4. 适用于行高不固定的银行流水等表格
  5. """
  6. import numpy as np
  7. from typing import List, Dict, Tuple, Optional
  8. from dataclasses import dataclass, field
  9. @dataclass
  10. class RowRegion:
  11. """行区域"""
  12. y_top: int
  13. y_bottom: int
  14. boxes: List[Dict] = field(default_factory=list)
  15. row_index: int = 0
  16. @property
  17. def height(self) -> int:
  18. return self.y_bottom - self.y_top
  19. @property
  20. def y_center(self) -> float:
  21. return (self.y_top + self.y_bottom) / 2
  22. def to_dict(self) -> Dict:
  23. return {
  24. 'y_top': self.y_top,
  25. 'y_bottom': self.y_bottom,
  26. 'row_index': self.row_index,
  27. 'box_count': len(self.boxes),
  28. 'height': self.height
  29. }
  30. class AdaptiveRowSplitter:
  31. """
  32. 自适应行分割器
  33. 核心思路:
  34. 1. 构建 Y 轴投影密度图(每个 Y 坐标有多少文本覆盖)
  35. 2. 找到密度为 0 或极低的"空白带"
  36. 3. 空白带的中线即为行分割线
  37. 优势:
  38. - 不假设固定行高
  39. - 自动适应多行文本单元格
  40. - 对 OCR 结果的微小误差有容错性
  41. """
  42. def __init__(self,
  43. min_gap_height: int = 6, # 最小行间隙高度 (像素)
  44. density_threshold: float = 0.05, # 空白判定阈值 (归一化后)
  45. smooth_window: int = 3, # 平滑窗口大小
  46. merge_close_gaps: bool = True, # 是否合并过近的空白带
  47. min_row_height: int = 15): # 最小行高
  48. self.min_gap_height = min_gap_height
  49. self.density_threshold = density_threshold
  50. self.smooth_window = smooth_window
  51. self.merge_close_gaps = merge_close_gaps
  52. self.min_row_height = min_row_height
  53. def split_rows(self,
  54. ocr_boxes: List[Dict],
  55. table_region: Tuple[int, int, int, int],
  56. debug: bool = False) -> Tuple[List[RowRegion], Optional[Dict]]:
  57. """
  58. 自适应行分割
  59. Args:
  60. ocr_boxes: OCR 识别结果 [{text, bbox: [x1,y1,x2,y2]}]
  61. table_region: 表格区域 (x1, y1, x2, y2)
  62. debug: 是否返回调试信息
  63. Returns:
  64. (行区域列表, 调试信息)
  65. """
  66. x1, y1, x2, y2 = table_region
  67. height = y2 - y1
  68. if height <= 0:
  69. return [], None
  70. # 1. 筛选表格区域内的 boxes
  71. table_boxes = [
  72. b for b in ocr_boxes
  73. if self._box_in_region(b['bbox'], table_region)
  74. ]
  75. if not table_boxes:
  76. return [], None
  77. # 2. 构建 Y 轴投影密度图
  78. density = self._build_y_projection(table_boxes, y1, y2)
  79. # 3. 平滑处理 (消除噪音)
  80. density_smooth = self._smooth(density, self.smooth_window)
  81. # 4. 找到空白带 (密度低于阈值的连续区域)
  82. gaps = self._find_gaps(density_smooth, self.density_threshold, self.min_gap_height)
  83. # 5. 可选:合并过近的空白带
  84. if self.merge_close_gaps:
  85. gaps = self._merge_close_gaps(gaps, self.min_row_height)
  86. # 6. 根据空白带分割行
  87. rows = self._split_by_gaps(table_boxes, gaps, y1, y2)
  88. # 7. 设置行索引
  89. for i, row in enumerate(rows):
  90. row.row_index = i
  91. # 调试信息
  92. debug_info = None
  93. if debug:
  94. debug_info = {
  95. 'density': density.tolist(),
  96. 'density_smooth': density_smooth.tolist(),
  97. 'gaps': gaps,
  98. 'table_region': table_region,
  99. 'box_count': len(table_boxes)
  100. }
  101. return rows, debug_info
  102. def _build_y_projection(self,
  103. boxes: List[Dict],
  104. y_start: int,
  105. y_end: int) -> np.ndarray:
  106. """
  107. 构建 Y 轴投影密度图
  108. 对于每个 Y 坐标,计算有多少个 box 的纵向范围覆盖了该位置
  109. """
  110. height = y_end - y_start
  111. density = np.zeros(height, dtype=np.float32)
  112. for box in boxes:
  113. # 计算 box 在相对坐标系中的 Y 范围
  114. box_y1 = max(0, int(box['bbox'][1] - y_start))
  115. box_y2 = min(height, int(box['bbox'][3] - y_start))
  116. if box_y2 > box_y1:
  117. # 该 box 覆盖的 Y 范围内,密度 +1
  118. density[box_y1:box_y2] += 1
  119. # 归一化到 [0, 1]
  120. max_val = density.max()
  121. if max_val > 0:
  122. density = density / max_val
  123. return density
  124. def _smooth(self, arr: np.ndarray, window: int) -> np.ndarray:
  125. """移动平均平滑"""
  126. if window <= 1 or len(arr) < window:
  127. return arr.copy()
  128. kernel = np.ones(window) / window
  129. # 使用 'same' 模式保持长度不变
  130. smoothed = np.convolve(arr, kernel, mode='same')
  131. return smoothed
  132. def _find_gaps(self,
  133. density: np.ndarray,
  134. threshold: float,
  135. min_height: int) -> List[Tuple[int, int]]:
  136. """
  137. 找到空白带 (密度低于阈值的连续区域)
  138. Returns:
  139. 空白带列表 [(y_start, y_end), ...] (相对坐标)
  140. """
  141. gaps = []
  142. in_gap = False
  143. gap_start = 0
  144. for i, d in enumerate(density):
  145. if d < threshold:
  146. if not in_gap:
  147. in_gap = True
  148. gap_start = i
  149. else:
  150. if in_gap:
  151. in_gap = False
  152. gap_end = i
  153. # 只保留足够高的空白带
  154. if gap_end - gap_start >= min_height:
  155. gaps.append((gap_start, gap_end))
  156. # 处理末尾的空白带
  157. if in_gap:
  158. gap_end = len(density)
  159. if gap_end - gap_start >= min_height:
  160. gaps.append((gap_start, gap_end))
  161. return gaps
  162. def _merge_close_gaps(self,
  163. gaps: List[Tuple[int, int]],
  164. min_distance: int) -> List[Tuple[int, int]]:
  165. """
  166. 合并过近的空白带
  167. 如果两个空白带之间的距离小于 min_distance,说明中间的内容太少,
  168. 应该将它们合并为一个更大的空白带
  169. """
  170. if len(gaps) <= 1:
  171. return gaps
  172. merged = [gaps[0]]
  173. for gap in gaps[1:]:
  174. last_gap = merged[-1]
  175. # 计算两个空白带之间的距离
  176. distance = gap[0] - last_gap[1]
  177. if distance < min_distance:
  178. # 合并:扩展上一个空白带
  179. merged[-1] = (last_gap[0], gap[1])
  180. else:
  181. merged.append(gap)
  182. return merged
  183. def _split_by_gaps(self,
  184. boxes: List[Dict],
  185. gaps: List[Tuple[int, int]],
  186. y_offset: int,
  187. y_end: int) -> List[RowRegion]:
  188. """
  189. 根据空白带分割行
  190. """
  191. if not gaps:
  192. # 没有空白带,整个区域作为一行
  193. return [RowRegion(y_top=y_offset, y_bottom=y_end, boxes=boxes)]
  194. # 计算分割线 (空白带的中点)
  195. separators = [y_offset] # 第一行从表格顶部开始
  196. for gap_start, gap_end in gaps:
  197. # 空白带的中点作为分割线
  198. separator = y_offset + (gap_start + gap_end) // 2
  199. separators.append(separator)
  200. separators.append(y_end) # 最后一行到表格底部结束
  201. # 根据分割线划分 boxes
  202. rows = []
  203. for i in range(len(separators) - 1):
  204. row_top = separators[i]
  205. row_bottom = separators[i + 1]
  206. # 找到属于这一行的 boxes (中心点在行范围内)
  207. row_boxes = [
  208. b for b in boxes
  209. if self._box_in_row(b['bbox'], row_top, row_bottom)
  210. ]
  211. # 按 x 坐标排序
  212. row_boxes.sort(key=lambda b: b['bbox'][0])
  213. if row_boxes: # 只添加非空行
  214. rows.append(RowRegion(
  215. y_top=row_top,
  216. y_bottom=row_bottom,
  217. boxes=row_boxes
  218. ))
  219. return rows
  220. def _box_in_region(self, bbox: List[int], region: Tuple[int, int, int, int]) -> bool:
  221. """判断 box 是否在区域内 (基于中心点)"""
  222. x1, y1, x2, y2 = region
  223. bx1, by1, bx2, by2 = bbox
  224. cx, cy = (bx1 + bx2) / 2, (by1 + by2) / 2
  225. return x1 <= cx <= x2 and y1 <= cy <= y2
  226. def _box_in_row(self, bbox: List[int], row_top: int, row_bottom: int) -> bool:
  227. """判断 box 是否属于某行 (基于中心点)"""
  228. by1, by2 = bbox[1], bbox[3]
  229. cy = (by1 + by2) / 2
  230. return row_top <= cy < row_bottom