|
@@ -4,6 +4,7 @@
|
|
|
提供从表格线提取单元格和恢复网格结构的功能。
|
|
提供从表格线提取单元格和恢复网格结构的功能。
|
|
|
"""
|
|
"""
|
|
|
from typing import List, Dict, Optional
|
|
from typing import List, Dict, Optional
|
|
|
|
|
+from pathlib import Path
|
|
|
import cv2
|
|
import cv2
|
|
|
import numpy as np
|
|
import numpy as np
|
|
|
from loguru import logger
|
|
from loguru import logger
|
|
@@ -22,6 +23,8 @@ class GridRecovery:
|
|
|
debug_dir: Optional[str] = None,
|
|
debug_dir: Optional[str] = None,
|
|
|
debug_prefix: str = "",
|
|
debug_prefix: str = "",
|
|
|
crop_padding: int = 10, # 新增:裁剪时的padding值(原图坐标系)
|
|
crop_padding: int = 10, # 新增:裁剪时的padding值(原图坐标系)
|
|
|
|
|
+ ocr_bboxes: Optional[List[Dict]] = None, # 🆕 整页OCR结果
|
|
|
|
|
+ enable_ocr_edge_compensation: bool = True, # 🆕 是否启用OCR边缘补偿
|
|
|
) -> List[List[float]]:
|
|
) -> List[List[float]]:
|
|
|
"""
|
|
"""
|
|
|
基于矢量重构的连通域分析 (Advanced Vector-based Recovery)
|
|
基于矢量重构的连通域分析 (Advanced Vector-based Recovery)
|
|
@@ -32,6 +35,7 @@ class GridRecovery:
|
|
|
3. 线段归并/连接 (adjust_lines)
|
|
3. 线段归并/连接 (adjust_lines)
|
|
|
4. 几何延长线段 (Custom final_adjust_lines with larger threshold)
|
|
4. 几何延长线段 (Custom final_adjust_lines with larger threshold)
|
|
|
5. 重绘Mask并进行连通域分析
|
|
5. 重绘Mask并进行连通域分析
|
|
|
|
|
+ 6. 🆕 OCR补偿未封闭的边缘单元格
|
|
|
|
|
|
|
|
Args:
|
|
Args:
|
|
|
hpred_up: 横线预测mask(上采样后)
|
|
hpred_up: 横线预测mask(上采样后)
|
|
@@ -42,14 +46,17 @@ class GridRecovery:
|
|
|
debug_dir: 调试输出目录 (Optional)
|
|
debug_dir: 调试输出目录 (Optional)
|
|
|
debug_prefix: 调试文件名前缀 (Optional)
|
|
debug_prefix: 调试文件名前缀 (Optional)
|
|
|
crop_padding: 裁剪时的padding值(原图坐标系,默认10px)
|
|
crop_padding: 裁剪时的padding值(原图坐标系,默认10px)
|
|
|
|
|
+ ocr_bboxes: 🆕 整页OCR结果 [{'bbox': [x1,y1,x2,y2], 'text': str, 'confidence': float}, ...]
|
|
|
|
|
+ enable_ocr_edge_compensation: 🆕 是否启用OCR边缘补偿(默认True)
|
|
|
|
|
|
|
|
注意:
|
|
注意:
|
|
|
- hpred_up/vpred_up 是上采样后的mask,坐标系已经放大了 upscale 倍
|
|
- hpred_up/vpred_up 是上采样后的mask,坐标系已经放大了 upscale 倍
|
|
|
- crop_padding 是原图坐标系的值,需要乘以 upscale 转换到mask坐标系
|
|
- crop_padding 是原图坐标系的值,需要乘以 upscale 转换到mask坐标系
|
|
|
- edge_margin 用于过滤贴近图像边缘的线条(padding区域的噪声)
|
|
- edge_margin 用于过滤贴近图像边缘的线条(padding区域的噪声)
|
|
|
|
|
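+ - ocr_bboxes must already be in original-image coordinates; only the detected row/column lines are rescaled from mask coordinates back to original-image coordinates before compensation (the OCR boxes themselves are passed through unchanged)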
|
|
|
|
|
|
|
|
Returns:
|
|
Returns:
|
|
|
- 单元格bbox列表 [[x1, y1, x2, y2], ...]
|
|
|
|
|
|
|
+ 单元格bbox列表 [[x1, y1, x2, y2], ...] (原图坐标系)
|
|
|
"""
|
|
"""
|
|
|
import numpy as np
|
|
import numpy as np
|
|
|
import cv2
|
|
import cv2
|
|
@@ -489,6 +496,40 @@ class GridRecovery:
|
|
|
else:
|
|
else:
|
|
|
logger.info(f"矢量重构分析提取到 {len(bboxes)} 个单元格 (Dynamic Alpha: {dynamic_alpha}, upscale={upscale:.3f})")
|
|
logger.info(f"矢量重构分析提取到 {len(bboxes)} 个单元格 (Dynamic Alpha: {dynamic_alpha}, upscale={upscale:.3f})")
|
|
|
|
|
|
|
|
|
|
+ # 🆕 Step 6: OCR补偿未封闭的边缘单元格
|
|
|
|
|
+ if enable_ocr_edge_compensation and ocr_bboxes and orig_h is not None and orig_w is not None:
|
|
|
|
|
+ logger.info("━━━━━━━━ 🔍 OCR边缘补偿 ━━━━━━━━")
|
|
|
|
|
+
|
|
|
|
|
+ # 转换线条坐标到原图坐标系 (从mask坐标系转换)
|
|
|
|
|
+ rowboxes_orig = [
|
|
|
|
|
+ [line[0] / scale_w, line[1] / scale_h, line[2] / scale_w, line[3] / scale_h]
|
|
|
|
|
+ for line in rowboxes
|
|
|
|
|
+ ]
|
|
|
|
|
+ colboxes_orig = [
|
|
|
|
|
+ [line[0] / scale_w, line[1] / scale_h, line[2] / scale_w, line[3] / scale_h]
|
|
|
|
|
+ for line in colboxes
|
|
|
|
|
+ ]
|
|
|
|
|
+
|
|
|
|
|
+ # 调用OCR补偿算法 (所有坐标均为原图坐标系)
|
|
|
|
|
+ compensated_bboxes = GridRecovery._compensate_unclosed_cells(
|
|
|
|
|
+ existing_bboxes=bboxes, # 已有bbox (原图坐标系)
|
|
|
|
|
+ ocr_bboxes=ocr_bboxes, # OCR结果 (原图坐标系)
|
|
|
|
|
+ rowboxes=rowboxes_orig, # 水平线 (原图坐标系)
|
|
|
|
|
+ colboxes=colboxes_orig, # 垂直线 (原图坐标系)
|
|
|
|
|
+ img_h=orig_h,
|
|
|
|
|
+ img_w=orig_w,
|
|
|
|
|
+ debug_dir=debug_dir,
|
|
|
|
|
+ debug_prefix=debug_prefix
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ if compensated_bboxes:
|
|
|
|
|
+ logger.info(f"✅ OCR补偿成功: +{len(compensated_bboxes)}个边缘单元格")
|
|
|
|
|
+ bboxes.extend(compensated_bboxes)
|
|
|
|
|
+ # 重新排序
|
|
|
|
|
+ bboxes.sort(key=lambda b: (int(b[1] / 10), b[0]))
|
|
|
|
|
+ else:
|
|
|
|
|
+ logger.info("ℹ️ OCR补偿: 无需补偿边缘单元格")
|
|
|
|
|
+
|
|
|
return bboxes
|
|
return bboxes
|
|
|
|
|
|
|
|
@staticmethod
|
|
@staticmethod
|
|
@@ -531,11 +572,7 @@ class GridRecovery:
|
|
|
return grid_lines
|
|
return grid_lines
|
|
|
|
|
|
|
|
@staticmethod
|
|
@staticmethod
|
|
|
- def recover_grid_structure(
|
|
|
|
|
- bboxes: List[List[float]],
|
|
|
|
|
- ocr_bboxes: Optional[List[Dict]] = None,
|
|
|
|
|
- enable_ocr_compensation: bool = True
|
|
|
|
|
- ) -> List[Dict]:
|
|
|
|
|
|
|
+ def recover_grid_structure(bboxes: List[List[float]]) -> List[Dict]:
|
|
|
"""
|
|
"""
|
|
|
从散乱的单元格 bbox 恢复表格的行列结构 (row, col, rowspan, colspan)
|
|
从散乱的单元格 bbox 恢复表格的行列结构 (row, col, rowspan, colspan)
|
|
|
重构版:基于投影网格线 (Projected Grid Lines) 的算法
|
|
重构版:基于投影网格线 (Projected Grid Lines) 的算法
|
|
@@ -543,8 +580,6 @@ class GridRecovery:
|
|
|
|
|
|
|
|
Args:
|
|
Args:
|
|
|
bboxes: 单元格bbox列表
|
|
bboxes: 单元格bbox列表
|
|
|
- ocr_bboxes: 整页OCR结果 [{'bbox': [x1,y1,x2,y2], 'text': '...'}, ...](可选)
|
|
|
|
|
- enable_ocr_compensation: 是否启用OCR补偿缺失单元格
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
Returns:
|
|
|
结构化单元格列表,包含 row, col, rowspan, colspan
|
|
结构化单元格列表,包含 row, col, rowspan, colspan
|
|
@@ -567,15 +602,6 @@ class GridRecovery:
|
|
|
x_coords.append(b[2])
|
|
x_coords.append(b[2])
|
|
|
col_dividers = GridRecovery.find_grid_lines(x_coords, tolerance=5, min_support=2)
|
|
col_dividers = GridRecovery.find_grid_lines(x_coords, tolerance=5, min_support=2)
|
|
|
|
|
|
|
|
- # 2.5. OCR补偿缺失单元格(在分配row/col之前)
|
|
|
|
|
- if enable_ocr_compensation and ocr_bboxes:
|
|
|
|
|
- compensated_bboxes = GridRecovery._compensate_with_ocr(
|
|
|
|
|
- bboxes, ocr_bboxes, row_dividers, col_dividers
|
|
|
|
|
- )
|
|
|
|
|
- if compensated_bboxes:
|
|
|
|
|
- logger.info(f"🔧 OCR补偿: +{len(compensated_bboxes)} 个缺失单元格")
|
|
|
|
|
- bboxes = bboxes + compensated_bboxes
|
|
|
|
|
-
|
|
|
|
|
# 3. 构建网格结构
|
|
# 3. 构建网格结构
|
|
|
structured_cells = []
|
|
structured_cells = []
|
|
|
|
|
|
|
@@ -739,208 +765,351 @@ class GridRecovery:
|
|
|
return new_cells
|
|
return new_cells
|
|
|
|
|
|
|
|
@staticmethod
|
|
@staticmethod
|
|
|
- def _compensate_with_ocr(
|
|
|
|
|
|
|
+ def _compensate_unclosed_cells(
|
|
|
existing_bboxes: List[List[float]],
|
|
existing_bboxes: List[List[float]],
|
|
|
ocr_bboxes: List[Dict],
|
|
ocr_bboxes: List[Dict],
|
|
|
- row_dividers: List[float],
|
|
|
|
|
- col_dividers: List[float],
|
|
|
|
|
- min_overlap_ratio: float = 0.3
|
|
|
|
|
|
|
+ rowboxes: List[List[float]],
|
|
|
|
|
+ colboxes: List[List[float]],
|
|
|
|
|
+ img_h: float,
|
|
|
|
|
+ img_w: float,
|
|
|
|
|
+ min_confidence: float = 0.7,
|
|
|
|
|
+ debug_dir: Optional[str] = None,
|
|
|
|
|
+ debug_prefix: str = ""
|
|
|
) -> List[List[float]]:
|
|
) -> List[List[float]]:
|
|
|
"""
|
|
"""
|
|
|
- 利用整页OCR信息补偿缺失的单元格
|
|
|
|
|
|
|
+ 基于网格矩阵补偿未封闭的边缘单元格
|
|
|
|
|
|
|
|
- 策略:
|
|
|
|
|
- 1. 计算所有理论单元格位置(基于网格线)
|
|
|
|
|
- 2. 检查哪些理论位置有OCR内容但没有检测到单元格
|
|
|
|
|
- 3. 根据OCR bbox跨越的网格数量自动判断是否为合并单元格
|
|
|
|
|
- 4. 只补偿有相邻单元格的位置(避免孤立补偿)
|
|
|
|
|
|
|
+ 新算法思路:
|
|
|
|
|
+ 1. 从rowboxes/colboxes构建网格矩阵
|
|
|
|
|
+ 2. 将existing_bboxes映射到网格单元
|
|
|
|
|
+ 3. 检测空的边缘单元格(与已有单元格相邻)
|
|
|
|
|
+ 4. 用OCR填充这些空单元格
|
|
|
|
|
|
|
|
Args:
|
|
Args:
|
|
|
- existing_bboxes: 已检测到的单元格bbox
|
|
|
|
|
- ocr_bboxes: 整页OCR结果 [{'bbox': [x1,y1,x2,y2], 'text': '...'}, ...]
|
|
|
|
|
- row_dividers: 行分割线
|
|
|
|
|
- col_dividers: 列分割线
|
|
|
|
|
- min_overlap_ratio: OCR bbox与理论单元格的最小重叠率
|
|
|
|
|
-
|
|
|
|
|
|
|
+ existing_bboxes: 连通域检测到的bbox列表 (原图坐标系)
|
|
|
|
|
+ ocr_bboxes: 整页OCR结果
|
|
|
|
|
+ rowboxes: 水平线列表 (原图坐标系)
|
|
|
|
|
+ colboxes: 垂直线列表 (原图坐标系)
|
|
|
|
|
+ img_h, img_w: 原图尺寸
|
|
|
|
|
+ min_confidence: OCR最小置信度阈值
|
|
|
|
|
+ debug_dir, debug_prefix: Debug可视化参数
|
|
|
|
|
+
|
|
|
Returns:
|
|
Returns:
|
|
|
- 补偿的bbox列表
|
|
|
|
|
|
|
+ 补偿的bbox列表 (原图坐标系)
|
|
|
"""
|
|
"""
|
|
|
- if not ocr_bboxes or len(row_dividers) < 2 or len(col_dividers) < 2:
|
|
|
|
|
|
|
+ if not ocr_bboxes or not rowboxes or not colboxes:
|
|
|
|
|
+ logger.debug("📊 OCR补偿: 缺少必要数据")
|
|
|
return []
|
|
return []
|
|
|
|
|
|
|
|
- # 1. 构建已存在单元格的覆盖区域(快速查找)
|
|
|
|
|
- existing_coverage = set()
|
|
|
|
|
- for bbox in existing_bboxes:
|
|
|
|
|
- # 计算该bbox覆盖的理论网格区域
|
|
|
|
|
- covered_rows = []
|
|
|
|
|
- covered_cols = []
|
|
|
|
|
|
|
+ logger.info(f"🔧 OCR补偿参数: img_size=({img_w:.0f}×{img_h:.0f})")
|
|
|
|
|
+
|
|
|
|
|
+ # Step 1: 过滤OCR
|
|
|
|
|
+ valid_ocr = [
|
|
|
|
|
+ ocr for ocr in ocr_bboxes
|
|
|
|
|
+ if ocr.get('confidence', 1.0) >= min_confidence
|
|
|
|
|
+ and len(ocr.get('text', '').strip()) > 0
|
|
|
|
|
+ ]
|
|
|
|
|
+
|
|
|
|
|
+ if not valid_ocr:
|
|
|
|
|
+ logger.debug(f"📊 OCR补偿: 过滤后无有效OCR")
|
|
|
|
|
+ return []
|
|
|
|
|
+
|
|
|
|
|
+ # Step 2: 构建网格(使用线条中点作为分割线)
|
|
|
|
|
+ row_dividers = sorted(set((line[1] + line[3]) / 2 for line in rowboxes))
|
|
|
|
|
+ col_dividers = sorted(set((line[0] + line[2]) / 2 for line in colboxes))
|
|
|
|
|
+
|
|
|
|
|
+ # 添加图像边界
|
|
|
|
|
+ if not row_dividers or row_dividers[0] > 5:
|
|
|
|
|
+ row_dividers.insert(0, 0.0)
|
|
|
|
|
+ if not row_dividers or row_dividers[-1] < img_h - 5:
|
|
|
|
|
+ row_dividers.append(img_h)
|
|
|
|
|
+ if not col_dividers or col_dividers[0] > 5:
|
|
|
|
|
+ col_dividers.insert(0, 0.0)
|
|
|
|
|
+ if not col_dividers or col_dividers[-1] < img_w - 5:
|
|
|
|
|
+ col_dividers.append(img_w)
|
|
|
|
|
+
|
|
|
|
|
+ logger.debug(f"📊 网格: {len(row_dividers)-1}行 × {len(col_dividers)-1}列")
|
|
|
|
|
+
|
|
|
|
|
+ # Step 3: 将existing_bboxes映射到网格单元(支持跨行跨列)
|
|
|
|
|
+ grid = {} # {(row, col): True} - 标记已占用的单元格
|
|
|
|
|
+
|
|
|
|
|
+ def find_overlapping_cells(bbox: List[float]) -> List[tuple]:
|
|
|
|
|
+ """找到bbox覆盖的所有网格单元[(row, col), ...]"""
|
|
|
|
|
+ x1, y1, x2, y2 = bbox
|
|
|
|
|
+ cells = []
|
|
|
|
|
|
|
|
for i in range(len(row_dividers) - 1):
|
|
for i in range(len(row_dividers) - 1):
|
|
|
- if GridRecovery._has_overlap_1d(bbox[1], bbox[3], row_dividers[i], row_dividers[i+1]):
|
|
|
|
|
- covered_rows.append(i)
|
|
|
|
|
|
|
+ # 检查垂直方向重叠
|
|
|
|
|
+ grid_y1, grid_y2 = row_dividers[i], row_dividers[i + 1]
|
|
|
|
|
+ if max(y1, grid_y1) < min(y2, grid_y2): # 有重叠
|
|
|
|
|
+ for j in range(len(col_dividers) - 1):
|
|
|
|
|
+ # 检查水平方向重叠
|
|
|
|
|
+ grid_x1, grid_x2 = col_dividers[j], col_dividers[j + 1]
|
|
|
|
|
+ if max(x1, grid_x1) < min(x2, grid_x2): # 有重叠
|
|
|
|
|
+ cells.append((i, j))
|
|
|
|
|
|
|
|
- for j in range(len(col_dividers) - 1):
|
|
|
|
|
- if GridRecovery._has_overlap_1d(bbox[0], bbox[2], col_dividers[j], col_dividers[j+1]):
|
|
|
|
|
- covered_cols.append(j)
|
|
|
|
|
-
|
|
|
|
|
- # 标记覆盖的所有理论单元格
|
|
|
|
|
- for r in covered_rows:
|
|
|
|
|
- for c in covered_cols:
|
|
|
|
|
- existing_coverage.add((r, c))
|
|
|
|
|
|
|
+ return cells
|
|
|
|
|
|
|
|
- logger.debug(f"📊 理论网格: {len(row_dividers)-1}行 × {len(col_dividers)-1}列, 已覆盖: {len(existing_coverage)} 个单元格")
|
|
|
|
|
|
|
+ # 标记所有existing_bbox占用的网格单元
|
|
|
|
|
+ for bbox in existing_bboxes:
|
|
|
|
|
+ cells = find_overlapping_cells(bbox)
|
|
|
|
|
+ for cell in cells:
|
|
|
|
|
+ grid[cell] = True
|
|
|
|
|
|
|
|
- # 2. 遍历OCR结果,查找缺失的单元格
|
|
|
|
|
- compensated_bboxes = []
|
|
|
|
|
- ocr_processed = set() # 避免重复补偿
|
|
|
|
|
|
|
+ logger.debug(f"📊 已占用: {len(grid)}个网格单元 (共{(len(row_dividers)-1)*(len(col_dividers)-1)}个)")
|
|
|
|
|
+
|
|
|
|
|
+ # Step 4: 第一遍 - 为所有OCR找到其覆盖的空单元格(不扩展)
|
|
|
|
|
+ ocr_to_empty_cells = {} # {ocr_index: [empty_cells]}
|
|
|
|
|
|
|
|
- for ocr in ocr_bboxes:
|
|
|
|
|
|
|
+ for idx, ocr in enumerate(valid_ocr):
|
|
|
ocr_bbox = ocr['bbox']
|
|
ocr_bbox = ocr['bbox']
|
|
|
- ocr_text = ocr.get('text', '')
|
|
|
|
|
|
|
+ ocr_text = ocr.get('text', '')[:30]
|
|
|
|
|
|
|
|
- # 计算OCR bbox覆盖的理论网格区域
|
|
|
|
|
- covered_rows = []
|
|
|
|
|
- covered_cols = []
|
|
|
|
|
|
|
+ # 找到OCR覆盖的所有网格单元
|
|
|
|
|
+ overlapping_cells = find_overlapping_cells(ocr_bbox)
|
|
|
|
|
|
|
|
- for i in range(len(row_dividers) - 1):
|
|
|
|
|
- theoretical_bbox = [col_dividers[0], row_dividers[i], col_dividers[-1], row_dividers[i+1]]
|
|
|
|
|
- overlap = GridRecovery._compute_overlap_ratio(ocr_bbox, theoretical_bbox)
|
|
|
|
|
- if overlap > min_overlap_ratio * 0.5: # 行方向用更宽松的阈值
|
|
|
|
|
- covered_rows.append(i)
|
|
|
|
|
-
|
|
|
|
|
- for j in range(len(col_dividers) - 1):
|
|
|
|
|
- theoretical_bbox = [col_dividers[j], row_dividers[0], col_dividers[j+1], row_dividers[-1]]
|
|
|
|
|
- overlap = GridRecovery._compute_overlap_ratio(ocr_bbox, theoretical_bbox)
|
|
|
|
|
- if overlap > min_overlap_ratio * 0.5: # 列方向用更宽松的阈值
|
|
|
|
|
- covered_cols.append(j)
|
|
|
|
|
-
|
|
|
|
|
- if not covered_rows or not covered_cols:
|
|
|
|
|
|
|
+ if not overlapping_cells:
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
|
- # 找出缺失的单元格(逐个检查,而不是要求全部缺失)
|
|
|
|
|
- missing_cells = []
|
|
|
|
|
- for r in covered_rows:
|
|
|
|
|
- for c in covered_cols:
|
|
|
|
|
- if (r, c) not in existing_coverage:
|
|
|
|
|
- missing_cells.append((r, c))
|
|
|
|
|
-
|
|
|
|
|
- if not missing_cells:
|
|
|
|
|
- continue # 该OCR覆盖的区域没有缺失单元格
|
|
|
|
|
-
|
|
|
|
|
- # 检查缺失单元格是否有相邻单元格(避免孤立补偿)
|
|
|
|
|
- valid_missing_cells = []
|
|
|
|
|
- for r, c in missing_cells:
|
|
|
|
|
- # 检查上下左右是否有相邻单元格(已存在或待补偿)
|
|
|
|
|
- if ((r-1, c) in existing_coverage or
|
|
|
|
|
- (r+1, c) in existing_coverage or
|
|
|
|
|
- (r, c-1) in existing_coverage or
|
|
|
|
|
- (r, c+1) in existing_coverage or
|
|
|
|
|
- (r-1, c) in missing_cells or
|
|
|
|
|
- (r+1, c) in missing_cells or
|
|
|
|
|
- (r, c-1) in missing_cells or
|
|
|
|
|
- (r, c+1) in missing_cells):
|
|
|
|
|
- valid_missing_cells.append((r, c))
|
|
|
|
|
-
|
|
|
|
|
- if not valid_missing_cells:
|
|
|
|
|
- logger.debug(f"⚠️ 跳过孤立OCR: '{ocr_text[:20]}' at (R{covered_rows}C{covered_cols})")
|
|
|
|
|
|
|
+ # 找出未被占用的单元格
|
|
|
|
|
+ empty_cells = [cell for cell in overlapping_cells if cell not in grid]
|
|
|
|
|
+
|
|
|
|
|
+ if not empty_cells:
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
|
- # 新策略:通过观察相邻单元格的分布推断合并尺寸
|
|
|
|
|
- # 为每个缺失单元格分析其应该占据的网格范围
|
|
|
|
|
- for r, c in valid_missing_cells:
|
|
|
|
|
- # 避免重复补偿
|
|
|
|
|
- if (r, c) in ocr_processed:
|
|
|
|
|
- continue
|
|
|
|
|
-
|
|
|
|
|
- # 分析该位置的行列跨度
|
|
|
|
|
- # 新策略:不仅检查当前OCR内的缺失单元格,还检查整个网格的覆盖情况
|
|
|
|
|
-
|
|
|
|
|
- # 1. 向下探测:检查同列(c)中有多少连续的缺失单元格(不限于当前OCR)
|
|
|
|
|
- rowspan_candidate = 1
|
|
|
|
|
- r_check = r + 1
|
|
|
|
|
- while r_check < len(row_dividers) - 1:
|
|
|
|
|
- # 关键改变:检查existing_coverage而不只是valid_missing_cells
|
|
|
|
|
- if (r_check, c) not in existing_coverage and (r_check, c) not in ocr_processed:
|
|
|
|
|
- rowspan_candidate += 1
|
|
|
|
|
- r_check += 1
|
|
|
|
|
- else:
|
|
|
|
|
|
|
+ # 检查是否是边缘单元格(至少一个空单元格与已占用单元格相邻)
|
|
|
|
|
+ has_neighbor = False
|
|
|
|
|
+ for row, col in empty_cells:
|
|
|
|
|
+ for dr, dc in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
|
|
|
|
|
+ neighbor = (row + dr, col + dc)
|
|
|
|
|
+ if neighbor in grid:
|
|
|
|
|
+ has_neighbor = True
|
|
|
break
|
|
break
|
|
|
|
|
+ if has_neighbor:
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
|
|
+ if not has_neighbor:
|
|
|
|
|
+ logger.debug(f"⏭️ 跳过OCR '{ocr_text}': 无相邻单元格")
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # 记录这个OCR的初始空单元格
|
|
|
|
|
+ ocr_to_empty_cells[idx] = {
|
|
|
|
|
+ 'ocr': ocr,
|
|
|
|
|
+ 'empty_cells': empty_cells
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ logger.debug(f"📊 第一遍完成: {len(ocr_to_empty_cells)}个OCR需要补偿")
|
|
|
|
|
+
|
|
|
|
|
+ # Step 5: 第二遍 - 对所有标记的OCR区域统一扩展
|
|
|
|
|
+ # 🆕 辅助函数:检查侧边相邻列/行的已占用单元格边界
|
|
|
|
|
+ def get_side_boundary_for_vertical_expansion(current_min_col, current_max_col, direction='up'):
|
|
|
|
|
+ """向上/下扩展时,检查左右两侧相邻列的单元格边界"""
|
|
|
|
|
+ boundary_rows = []
|
|
|
|
|
+
|
|
|
|
|
+ # 检查左侧相邻列(current_min_col - 1)
|
|
|
|
|
+ if current_min_col > 0:
|
|
|
|
|
+ left_col = current_min_col - 1
|
|
|
|
|
+ occupied_rows_in_left = [r for r, c in grid.keys() if c == left_col]
|
|
|
|
|
+ if occupied_rows_in_left:
|
|
|
|
|
+ if direction == 'up':
|
|
|
|
|
+ boundary_rows.append(min(occupied_rows_in_left))
|
|
|
|
|
+ else: # down
|
|
|
|
|
+ boundary_rows.append(max(occupied_rows_in_left))
|
|
|
|
|
+
|
|
|
|
|
+ # 检查右侧相邻列(current_max_col + 1)
|
|
|
|
|
+ if current_max_col < len(col_dividers) - 2:
|
|
|
|
|
+ right_col = current_max_col + 1
|
|
|
|
|
+ occupied_rows_in_right = [r for r, c in grid.keys() if c == right_col]
|
|
|
|
|
+ if occupied_rows_in_right:
|
|
|
|
|
+ if direction == 'up':
|
|
|
|
|
+ boundary_rows.append(min(occupied_rows_in_right))
|
|
|
|
|
+ else: # down
|
|
|
|
|
+ boundary_rows.append(max(occupied_rows_in_right))
|
|
|
|
|
+
|
|
|
|
|
+ return boundary_rows
|
|
|
|
|
+
|
|
|
|
|
+ def get_side_boundary_for_horizontal_expansion(current_min_row, current_max_row, direction='left'):
|
|
|
|
|
+ """向左/右扩展时,检查上下两侧相邻行的单元格边界"""
|
|
|
|
|
+ boundary_cols = []
|
|
|
|
|
+
|
|
|
|
|
+ # 检查上侧相邻行(current_min_row - 1)
|
|
|
|
|
+ if current_min_row > 0:
|
|
|
|
|
+ top_row = current_min_row - 1
|
|
|
|
|
+ occupied_cols_in_top = [c for r, c in grid.keys() if r == top_row]
|
|
|
|
|
+ if occupied_cols_in_top:
|
|
|
|
|
+ if direction == 'left':
|
|
|
|
|
+ boundary_cols.append(min(occupied_cols_in_top))
|
|
|
|
|
+ else: # right
|
|
|
|
|
+ boundary_cols.append(max(occupied_cols_in_top))
|
|
|
|
|
+
|
|
|
|
|
+ # 检查下侧相邻行(current_max_row + 1)
|
|
|
|
|
+ if current_max_row < len(row_dividers) - 2:
|
|
|
|
|
+ bottom_row = current_max_row + 1
|
|
|
|
|
+ occupied_cols_in_bottom = [c for r, c in grid.keys() if r == bottom_row]
|
|
|
|
|
+ if occupied_cols_in_bottom:
|
|
|
|
|
+ if direction == 'left':
|
|
|
|
|
+ boundary_cols.append(min(occupied_cols_in_bottom))
|
|
|
|
|
+ else: # right
|
|
|
|
|
+ boundary_cols.append(max(occupied_cols_in_bottom))
|
|
|
|
|
+
|
|
|
|
|
+ return boundary_cols
|
|
|
|
|
+
|
|
|
|
|
+ # 对每个OCR区域进行扩展
|
|
|
|
|
+ for idx, ocr_data in ocr_to_empty_cells.items():
|
|
|
|
|
+ empty_cells = ocr_data['empty_cells']
|
|
|
|
|
+ ocr = ocr_data['ocr']
|
|
|
|
|
+ ocr_text = ocr.get('text', '')[:30]
|
|
|
|
|
+
|
|
|
|
|
+ # 向上下左右扩展连续的空单元格(必须与侧边已有单元格对齐)
|
|
|
|
|
+ expanded = set(empty_cells)
|
|
|
|
|
+ changed = True
|
|
|
|
|
+ while changed:
|
|
|
|
|
+ changed = False
|
|
|
|
|
+ current_min_row = min(r for r, c in expanded)
|
|
|
|
|
+ current_max_row = max(r for r, c in expanded)
|
|
|
|
|
+ current_min_col = min(c for r, c in expanded)
|
|
|
|
|
+ current_max_col = max(c for r, c in expanded)
|
|
|
|
|
|
|
|
- # 2. 向右探测:检查同行(r)中有多少连续的缺失单元格(不限于当前OCR)
|
|
|
|
|
- colspan_candidate = 1
|
|
|
|
|
- c_check = c + 1
|
|
|
|
|
- while c_check < len(col_dividers) - 1:
|
|
|
|
|
- # 关键改变:检查existing_coverage而不只是valid_missing_cells
|
|
|
|
|
- if (r, c_check) not in existing_coverage and (r, c_check) not in ocr_processed:
|
|
|
|
|
- colspan_candidate += 1
|
|
|
|
|
- c_check += 1
|
|
|
|
|
- else:
|
|
|
|
|
- break
|
|
|
|
|
|
|
+ # 🆕 尝试向上扩展(整行都是空的,且不超过左右侧单元格的上边界)
|
|
|
|
|
+ if current_min_row > 0:
|
|
|
|
|
+ row_above = current_min_row - 1
|
|
|
|
|
+ # 检查该行是否都是空的
|
|
|
|
|
+ if all((row_above, col) not in grid for col in range(current_min_col, current_max_col + 1)):
|
|
|
|
|
+ # 🆕 检查左右侧相邻列的单元格最小行(上边界)
|
|
|
|
|
+ side_boundaries = get_side_boundary_for_vertical_expansion(
|
|
|
|
|
+ current_min_col, current_max_col, 'up'
|
|
|
|
|
+ )
|
|
|
|
|
+ can_expand = True
|
|
|
|
|
+ if side_boundaries:
|
|
|
|
|
+ # 左右侧单元格的最小行,不能扩展超过它
|
|
|
|
|
+ min_side_row = min(side_boundaries)
|
|
|
|
|
+ if row_above < min_side_row:
|
|
|
|
|
+ can_expand = False
|
|
|
|
|
+
|
|
|
|
|
+ if can_expand:
|
|
|
|
|
+ for col in range(current_min_col, current_max_col + 1):
|
|
|
|
|
+ expanded.add((row_above, col))
|
|
|
|
|
+ changed = True
|
|
|
|
|
|
|
|
- # 3. 验证:检查推断出的矩形区域内的所有单元格是否都缺失
|
|
|
|
|
- is_valid_merge = True
|
|
|
|
|
- cells_to_process = []
|
|
|
|
|
- if rowspan_candidate > 1 or colspan_candidate > 1:
|
|
|
|
|
- for rr in range(r, r + rowspan_candidate):
|
|
|
|
|
- for cc in range(c, c + colspan_candidate):
|
|
|
|
|
- if (rr, cc) in existing_coverage or (rr, cc) in ocr_processed:
|
|
|
|
|
- # 区域内有单元格已存在或已处理
|
|
|
|
|
- is_valid_merge = False
|
|
|
|
|
- break
|
|
|
|
|
- cells_to_process.append((rr, cc))
|
|
|
|
|
- if not is_valid_merge:
|
|
|
|
|
- break
|
|
|
|
|
|
|
+ # 🆕 尝试向下扩展(整行都是空的,且不超过左右侧单元格的下边界)
|
|
|
|
|
+ if current_max_row < len(row_dividers) - 2:
|
|
|
|
|
+ row_below = current_max_row + 1
|
|
|
|
|
+ if all((row_below, col) not in grid for col in range(current_min_col, current_max_col + 1)):
|
|
|
|
|
+ side_boundaries = get_side_boundary_for_vertical_expansion(
|
|
|
|
|
+ current_min_col, current_max_col, 'down'
|
|
|
|
|
+ )
|
|
|
|
|
+ can_expand = True
|
|
|
|
|
+ if side_boundaries:
|
|
|
|
|
+ max_side_row = max(side_boundaries)
|
|
|
|
|
+ if row_below > max_side_row:
|
|
|
|
|
+ can_expand = False
|
|
|
|
|
+
|
|
|
|
|
+ if can_expand:
|
|
|
|
|
+ for col in range(current_min_col, current_max_col + 1):
|
|
|
|
|
+ expanded.add((row_below, col))
|
|
|
|
|
+ changed = True
|
|
|
|
|
|
|
|
- # 4. 如果不是有效的合并,降级为1×1
|
|
|
|
|
- if not is_valid_merge or not cells_to_process:
|
|
|
|
|
- rowspan_candidate = 1
|
|
|
|
|
- colspan_candidate = 1
|
|
|
|
|
- cells_to_process = [(r, c)]
|
|
|
|
|
|
|
+ # 🆕 尝试向左扩展(整列都是空的,且不超过上下侧单元格的左边界)
|
|
|
|
|
+ if current_min_col > 0:
|
|
|
|
|
+ col_left = current_min_col - 1
|
|
|
|
|
+ if all((row, col_left) not in grid for row in range(current_min_row, current_max_row + 1)):
|
|
|
|
|
+ side_boundaries = get_side_boundary_for_horizontal_expansion(
|
|
|
|
|
+ current_min_row, current_max_row, 'left'
|
|
|
|
|
+ )
|
|
|
|
|
+ can_expand = True
|
|
|
|
|
+ if side_boundaries:
|
|
|
|
|
+ min_side_col = min(side_boundaries)
|
|
|
|
|
+ if col_left < min_side_col:
|
|
|
|
|
+ can_expand = False
|
|
|
|
|
+
|
|
|
|
|
+ if can_expand:
|
|
|
|
|
+ for row in range(current_min_row, current_max_row + 1):
|
|
|
|
|
+ expanded.add((row, col_left))
|
|
|
|
|
+ changed = True
|
|
|
|
|
+
|
|
|
|
|
+ # 🆕 尝试向右扩展(整列都是空的,且不超过上下侧单元格的右边界)
|
|
|
|
|
+ if current_max_col < len(col_dividers) - 2:
|
|
|
|
|
+ col_right = current_max_col + 1
|
|
|
|
|
+ if all((row, col_right) not in grid for row in range(current_min_row, current_max_row + 1)):
|
|
|
|
|
+ side_boundaries = get_side_boundary_for_horizontal_expansion(
|
|
|
|
|
+ current_min_row, current_max_row, 'right'
|
|
|
|
|
+ )
|
|
|
|
|
+ can_expand = True
|
|
|
|
|
+ if side_boundaries:
|
|
|
|
|
+ max_side_col = max(side_boundaries)
|
|
|
|
|
+ if col_right > max_side_col:
|
|
|
|
|
+ can_expand = False
|
|
|
|
|
+
|
|
|
|
|
+ if can_expand:
|
|
|
|
|
+ for row in range(current_min_row, current_max_row + 1):
|
|
|
|
|
+ expanded.add((row, col_right))
|
|
|
|
|
+ changed = True
|
|
|
|
|
+
|
|
|
|
|
+ # 更新扩展后的空单元格
|
|
|
|
|
+ ocr_to_empty_cells[idx]['expanded_cells'] = list(expanded)
|
|
|
|
|
+
|
|
|
|
|
+ logger.debug(f"📊 第二遍完成: 所有OCR区域已扩展")
|
|
|
|
|
+
|
|
|
|
|
+ # Step 6: 第三遍 - 生成补偿bbox
|
|
|
|
|
+ compensated_bboxes = []
|
|
|
|
|
+
|
|
|
|
|
+ for idx, ocr_data in ocr_to_empty_cells.items():
|
|
|
|
|
+ empty_cells = ocr_data['expanded_cells']
|
|
|
|
|
+ ocr = ocr_data['ocr']
|
|
|
|
|
+ ocr_text = ocr.get('text', '')[:30]
|
|
|
|
|
+
|
|
|
|
|
+ # 找到所有空单元格的边界范围
|
|
|
|
|
+ min_row = min(r for r, c in empty_cells)
|
|
|
|
|
+ max_row = max(r for r, c in empty_cells)
|
|
|
|
|
+ min_col = min(c for r, c in empty_cells)
|
|
|
|
|
+ max_col = max(c for r, c in empty_cells)
|
|
|
|
|
+
|
|
|
|
|
+ # 使用网格边界作为bbox(精确对齐)
|
|
|
|
|
+ y1 = row_dividers[min_row]
|
|
|
|
|
+ y2 = row_dividers[max_row + 1]
|
|
|
|
|
+ x1 = col_dividers[min_col]
|
|
|
|
|
+ x2 = col_dividers[max_col + 1]
|
|
|
|
|
+
|
|
|
|
|
+ compensated_bbox = [x1, y1, x2, y2]
|
|
|
|
|
+ compensated_bboxes.append(compensated_bbox)
|
|
|
|
|
+
|
|
|
|
|
+ # 标记这些单元格为已占用
|
|
|
|
|
+ for row, col in empty_cells:
|
|
|
|
|
+ grid[(row, col)] = True
|
|
|
|
|
+
|
|
|
|
|
+ logger.info(
|
|
|
|
|
+ f"✅ 补偿单元格[{min_row}-{max_row},{min_col}-{max_col}]: '{ocr_text}' | "
|
|
|
|
|
+ f"bbox=[{x1:.1f},{y1:.1f},{x2:.1f},{y2:.1f}] | "
|
|
|
|
|
+ f"占据{len(empty_cells)}个网格单元"
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # Step 7: Debug visualization
|
|
|
|
|
+ if debug_dir and compensated_bboxes:
|
|
|
|
|
+ try:
|
|
|
|
|
+ from pathlib import Path
|
|
|
|
|
+ vis_img = np.ones((int(img_h), int(img_w), 3), dtype=np.uint8) * 255
|
|
|
|
|
|
|
|
- # 生成补偿bbox
|
|
|
|
|
- compensated_bbox = [
|
|
|
|
|
- col_dividers[c],
|
|
|
|
|
- row_dividers[r],
|
|
|
|
|
- col_dividers[c + colspan_candidate],
|
|
|
|
|
- row_dividers[r + rowspan_candidate]
|
|
|
|
|
- ]
|
|
|
|
|
|
|
+ # Draw grid divider lines (light gray, solid 1px anti-aliased lines; not dashed)
|
|
|
|
|
+ for y in row_dividers:
|
|
|
|
|
+ cv2.line(vis_img, (0, int(y)), (int(img_w), int(y)), (200, 200, 200), 1, cv2.LINE_AA)
|
|
|
|
|
+ for x in col_dividers:
|
|
|
|
|
+ cv2.line(vis_img, (int(x), 0), (int(x), int(img_h)), (200, 200, 200), 1, cv2.LINE_AA)
|
|
|
|
|
|
|
|
- # 标记已处理的区域
|
|
|
|
|
- region_key = (r, r + rowspan_candidate - 1, c, c + colspan_candidate - 1)
|
|
|
|
|
- if region_key in ocr_processed:
|
|
|
|
|
- continue
|
|
|
|
|
-
|
|
|
|
|
- # 标记所有涉及的单元格为已处理和已覆盖
|
|
|
|
|
- for rr, cc in cells_to_process:
|
|
|
|
|
- ocr_processed.add((rr, cc))
|
|
|
|
|
- existing_coverage.add((rr, cc))
|
|
|
|
|
|
|
+ # 绘制现有bbox(蓝色)
|
|
|
|
|
+ for bbox in existing_bboxes:
|
|
|
|
|
+ x1, y1, x2, y2 = [int(v) for v in bbox]
|
|
|
|
|
+ cv2.rectangle(vis_img, (x1, y1), (x2, y2), (255, 0, 0), 2)
|
|
|
|
|
|
|
|
- compensated_bboxes.append(compensated_bbox)
|
|
|
|
|
|
|
+ # 绘制补偿bbox(绿色)
|
|
|
|
|
+ for bbox in compensated_bboxes:
|
|
|
|
|
+ x1, y1, x2, y2 = [int(v) for v in bbox]
|
|
|
|
|
+ cv2.rectangle(vis_img, (x1, y1), (x2, y2), (0, 255, 0), 2)
|
|
|
|
|
|
|
|
- merge_info = f"({rowspan_candidate}×{colspan_candidate}合并)" if rowspan_candidate > 1 or colspan_candidate > 1 else "(1×1)"
|
|
|
|
|
- logger.info(
|
|
|
|
|
- f"✨ 补偿单元格: '{ocr_text[:30]}' at R{r}C{c} {merge_info}"
|
|
|
|
|
- )
|
|
|
|
|
|
|
+ out_path = Path(debug_dir) / f"{debug_prefix}step06_ocr_compensation.png"
|
|
|
|
|
+ cv2.imwrite(str(out_path), vis_img)
|
|
|
|
|
+ logger.debug(f"💾 Debug图: {out_path}")
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.warning(f"⚠️ Debug可视化失败: {e}")
|
|
|
|
|
|
|
|
|
|
+ logger.info(f"🎉 OCR补偿完成: +{len(compensated_bboxes)}个边缘单元格")
|
|
|
return compensated_bboxes
|
|
return compensated_bboxes
|
|
|
-
|
|
|
|
|
- @staticmethod
|
|
|
|
|
- def _has_overlap_1d(a1: float, a2: float, b1: float, b2: float) -> bool:
|
|
|
|
|
- """判断两个1维区间是否有重叠"""
|
|
|
|
|
- return max(a1, b1) < min(a2, b2)
|
|
|
|
|
-
|
|
|
|
|
- @staticmethod
|
|
|
|
|
- def _compute_overlap_ratio(bbox1: List[float], bbox2: List[float]) -> float:
|
|
|
|
|
- """计算bbox1与bbox2的重叠率(相对于bbox1的面积)"""
|
|
|
|
|
- x1 = max(bbox1[0], bbox2[0])
|
|
|
|
|
- y1 = max(bbox1[1], bbox2[1])
|
|
|
|
|
- x2 = min(bbox1[2], bbox2[2])
|
|
|
|
|
- y2 = min(bbox1[3], bbox2[3])
|
|
|
|
|
-
|
|
|
|
|
- if x2 <= x1 or y2 <= y1:
|
|
|
|
|
- return 0.0
|
|
|
|
|
-
|
|
|
|
|
- overlap_area = (x2 - x1) * (y2 - y1)
|
|
|
|
|
- bbox1_area = (bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1])
|
|
|
|
|
-
|
|
|
|
|
- return overlap_area / bbox1_area if bbox1_area > 0 else 0.0
|
|
|