|
@@ -147,25 +147,6 @@ class GridRecovery:
|
|
|
return valid_lines
|
|
return valid_lines
|
|
|
|
|
|
|
|
@staticmethod
|
|
@staticmethod
|
|
|
- def _filter_lines_by_bboxes(lines, bboxes, is_horizontal, tolerance=5.0):
|
|
|
|
|
- """过滤线条,只保留与bboxes边界对齐的线条"""
|
|
|
|
|
- if not bboxes:
|
|
|
|
|
- return lines
|
|
|
|
|
-
|
|
|
|
|
- if is_horizontal:
|
|
|
|
|
- bbox_coords = {bbox[1] for bbox in bboxes} | {bbox[3] for bbox in bboxes}
|
|
|
|
|
- else:
|
|
|
|
|
- bbox_coords = {bbox[0] for bbox in bboxes} | {bbox[2] for bbox in bboxes}
|
|
|
|
|
-
|
|
|
|
|
- filtered_lines = []
|
|
|
|
|
- for line in lines:
|
|
|
|
|
- line_coord = (line[1] + line[3]) / 2 if is_horizontal else (line[0] + line[2]) / 2
|
|
|
|
|
- if any(abs(line_coord - coord) < tolerance for coord in bbox_coords):
|
|
|
|
|
- filtered_lines.append(line)
|
|
|
|
|
-
|
|
|
|
|
- return filtered_lines
|
|
|
|
|
-
|
|
|
|
|
- @staticmethod
|
|
|
|
|
def _save_debug_image(debug_dir, debug_prefix, step_name, img, is_lines=False, lines=None):
|
|
def _save_debug_image(debug_dir, debug_prefix, step_name, img, is_lines=False, lines=None):
|
|
|
"""保存调试图片"""
|
|
"""保存调试图片"""
|
|
|
if not debug_dir:
|
|
if not debug_dir:
|
|
@@ -198,8 +179,6 @@ class GridRecovery:
|
|
|
debug_dir: Optional[str] = None,
|
|
debug_dir: Optional[str] = None,
|
|
|
debug_prefix: str = "",
|
|
debug_prefix: str = "",
|
|
|
crop_padding: int = 10, # 新增:裁剪时的padding值(原图坐标系)
|
|
crop_padding: int = 10, # 新增:裁剪时的padding值(原图坐标系)
|
|
|
- ocr_bboxes: Optional[List[Dict]] = None, # 🆕 整页OCR结果
|
|
|
|
|
- enable_ocr_edge_compensation: bool = True, # 🆕 是否启用OCR边缘补偿
|
|
|
|
|
) -> List[List[float]]:
|
|
) -> List[List[float]]:
|
|
|
"""
|
|
"""
|
|
|
基于矢量重构的连通域分析 (Advanced Vector-based Recovery)
|
|
基于矢量重构的连通域分析 (Advanced Vector-based Recovery)
|
|
@@ -210,7 +189,6 @@ class GridRecovery:
|
|
|
3. 线段归并/连接 (adjust_lines)
|
|
3. 线段归并/连接 (adjust_lines)
|
|
|
4. 几何延长线段 (Custom final_adjust_lines with larger threshold)
|
|
4. 几何延长线段 (Custom final_adjust_lines with larger threshold)
|
|
|
5. 重绘Mask并进行连通域分析
|
|
5. 重绘Mask并进行连通域分析
|
|
|
- 6. 🆕 OCR补偿未封闭的边缘单元格
|
|
|
|
|
|
|
|
|
|
Args:
|
|
Args:
|
|
|
hpred_up: 横线预测mask(上采样后)
|
|
hpred_up: 横线预测mask(上采样后)
|
|
@@ -221,14 +199,11 @@ class GridRecovery:
|
|
|
debug_dir: 调试输出目录 (Optional)
|
|
debug_dir: 调试输出目录 (Optional)
|
|
|
debug_prefix: 调试文件名前缀 (Optional)
|
|
debug_prefix: 调试文件名前缀 (Optional)
|
|
|
crop_padding: 裁剪时的padding值(原图坐标系,默认10px)
|
|
crop_padding: 裁剪时的padding值(原图坐标系,默认10px)
|
|
|
- ocr_bboxes: 🆕 整页OCR结果 [{'bbox': [x1,y1,x2,y2], 'text': str, 'confidence': float}, ...]
|
|
|
|
|
- enable_ocr_edge_compensation: 🆕 是否启用OCR边缘补偿(默认True)
|
|
|
|
|
|
|
|
|
|
注意:
|
|
注意:
|
|
|
- hpred_up/vpred_up 是上采样后的mask,坐标系已经放大了 upscale 倍
|
|
- hpred_up/vpred_up 是上采样后的mask,坐标系已经放大了 upscale 倍
|
|
|
- crop_padding 是原图坐标系的值,需要乘以 upscale 转换到mask坐标系
|
|
- crop_padding 是原图坐标系的值,需要乘以 upscale 转换到mask坐标系
|
|
|
- edge_margin 用于过滤贴近图像边缘的线条(padding区域的噪声)
|
|
- edge_margin 用于过滤贴近图像边缘的线条(padding区域的噪声)
|
|
|
- - ocr_bboxes坐标应为原图坐标系,补偿算法会自动处理坐标转换
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
Returns:
|
|
|
单元格bbox列表 [[x1, y1, x2, y2], ...] (原图坐标系)
|
|
单元格bbox列表 [[x1, y1, x2, y2], ...] (原图坐标系)
|
|
@@ -480,49 +455,6 @@ class GridRecovery:
|
|
|
else:
|
|
else:
|
|
|
logger.info(f"矢量重构分析提取到 {len(bboxes)} 个单元格 (Dynamic Alpha: {dynamic_alpha}, upscale={upscale:.3f})")
|
|
logger.info(f"矢量重构分析提取到 {len(bboxes)} 个单元格 (Dynamic Alpha: {dynamic_alpha}, upscale={upscale:.3f})")
|
|
|
|
|
|
|
|
- # 🆕 Step 6: OCR补偿未封闭的边缘单元格
|
|
|
|
|
- if enable_ocr_edge_compensation and ocr_bboxes and orig_h is not None and orig_w is not None:
|
|
|
|
|
- logger.info("━━━━━━━━ 🔍 OCR边缘补偿 ━━━━━━━━")
|
|
|
|
|
-
|
|
|
|
|
- # 转换线条坐标到原图坐标系 (从mask坐标系转换)
|
|
|
|
|
- rowboxes_orig = [
|
|
|
|
|
- [line[0] / scale_w, line[1] / scale_h, line[2] / scale_w, line[3] / scale_h]
|
|
|
|
|
- for line in rowboxes
|
|
|
|
|
- ]
|
|
|
|
|
- colboxes_orig = [
|
|
|
|
|
- [line[0] / scale_w, line[1] / scale_h, line[2] / scale_w, line[3] / scale_h]
|
|
|
|
|
- for line in colboxes
|
|
|
|
|
- ]
|
|
|
|
|
-
|
|
|
|
|
- # 过滤线条:只保留与existing_bboxes边界对齐的线条
|
|
|
|
|
- rowboxes_filtered = GridRecovery._filter_lines_by_bboxes(rowboxes_orig, bboxes, is_horizontal=True)
|
|
|
|
|
- colboxes_filtered = GridRecovery._filter_lines_by_bboxes(colboxes_orig, bboxes, is_horizontal=False)
|
|
|
|
|
-
|
|
|
|
|
- logger.debug(
|
|
|
|
|
- f"🔍 线条过滤: 横线 {len(rowboxes_orig)}→{len(rowboxes_filtered)}, "
|
|
|
|
|
- f"竖线 {len(colboxes_orig)}→{len(colboxes_filtered)}"
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- # 调用OCR补偿算法 (所有坐标均为原图坐标系)
|
|
|
|
|
- compensated_bboxes = GridRecovery._compensate_unclosed_cells(
|
|
|
|
|
- existing_bboxes=bboxes, # 已有bbox (原图坐标系)
|
|
|
|
|
- ocr_bboxes=ocr_bboxes, # OCR结果 (原图坐标系)
|
|
|
|
|
- rowboxes=rowboxes_filtered, # 🆕 使用过滤后的水平线
|
|
|
|
|
- colboxes=colboxes_filtered, # 🆕 使用过滤后的垂直线
|
|
|
|
|
- img_h=orig_h,
|
|
|
|
|
- img_w=orig_w,
|
|
|
|
|
- debug_dir=debug_dir,
|
|
|
|
|
- debug_prefix=debug_prefix
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- if compensated_bboxes:
|
|
|
|
|
- logger.info(f"✅ OCR补偿成功: +{len(compensated_bboxes)}个边缘单元格")
|
|
|
|
|
- bboxes.extend(compensated_bboxes)
|
|
|
|
|
- # 重新排序
|
|
|
|
|
- bboxes.sort(key=lambda b: (int(b[1] / 10), b[0]))
|
|
|
|
|
- else:
|
|
|
|
|
- logger.info("ℹ️ OCR补偿: 无需补偿边缘单元格")
|
|
|
|
|
-
|
|
|
|
|
return bboxes
|
|
return bboxes
|
|
|
|
|
|
|
|
@staticmethod
|
|
@staticmethod
|
|
@@ -757,458 +689,3 @@ class GridRecovery:
|
|
|
new_cells.append(new_cell)
|
|
new_cells.append(new_cell)
|
|
|
|
|
|
|
|
return new_cells
|
|
return new_cells
|
|
|
-
|
|
|
|
|
- @staticmethod
|
|
|
|
|
- def _compensate_unclosed_cells(
|
|
|
|
|
- existing_bboxes: List[List[float]],
|
|
|
|
|
- ocr_bboxes: List[Dict],
|
|
|
|
|
- rowboxes: List[List[float]],
|
|
|
|
|
- colboxes: List[List[float]],
|
|
|
|
|
- img_h: float,
|
|
|
|
|
- img_w: float,
|
|
|
|
|
- min_confidence: float = 0.7,
|
|
|
|
|
- debug_dir: Optional[str] = None,
|
|
|
|
|
- debug_prefix: str = ""
|
|
|
|
|
- ) -> List[List[float]]:
|
|
|
|
|
- """
|
|
|
|
|
- 基于网格矩阵补偿未封闭的边缘单元格
|
|
|
|
|
-
|
|
|
|
|
- 新算法思路:
|
|
|
|
|
- 1. 从rowboxes/colboxes构建网格矩阵
|
|
|
|
|
- 2. 将existing_bboxes映射到网格单元
|
|
|
|
|
- 3. 检测空的边缘单元格(与已有单元格相邻)
|
|
|
|
|
- 4. 用OCR填充这些空单元格
|
|
|
|
|
-
|
|
|
|
|
- Args:
|
|
|
|
|
- existing_bboxes: 连通域检测到的bbox列表 (原图坐标系)
|
|
|
|
|
- ocr_bboxes: 整页OCR结果
|
|
|
|
|
- rowboxes: 水平线列表 (原图坐标系)
|
|
|
|
|
- colboxes: 垂直线列表 (原图坐标系)
|
|
|
|
|
- img_h, img_w: 原图尺寸
|
|
|
|
|
- min_confidence: OCR最小置信度阈值
|
|
|
|
|
- debug_dir, debug_prefix: Debug可视化参数
|
|
|
|
|
-
|
|
|
|
|
- Returns:
|
|
|
|
|
- 补偿的bbox列表 (原图坐标系)
|
|
|
|
|
- """
|
|
|
|
|
- if not ocr_bboxes or not rowboxes or not colboxes:
|
|
|
|
|
- logger.debug("📊 OCR补偿: 缺少必要数据")
|
|
|
|
|
- return []
|
|
|
|
|
-
|
|
|
|
|
- logger.info(f"🔧 OCR补偿参数: img_size=({img_w:.0f}×{img_h:.0f})")
|
|
|
|
|
-
|
|
|
|
|
- # Step 1: 过滤OCR
|
|
|
|
|
- valid_ocr = [
|
|
|
|
|
- ocr for ocr in ocr_bboxes
|
|
|
|
|
- if ocr.get('confidence', 1.0) >= min_confidence
|
|
|
|
|
- and len(ocr.get('text', '').strip()) > 0
|
|
|
|
|
- ]
|
|
|
|
|
-
|
|
|
|
|
- if not valid_ocr:
|
|
|
|
|
- logger.debug(f"📊 OCR补偿: 过滤后无有效OCR")
|
|
|
|
|
- return []
|
|
|
|
|
-
|
|
|
|
|
- # Step 2: 构建网格(使用线条中点作为分割线)
|
|
|
|
|
- row_dividers = sorted(set((line[1] + line[3]) / 2 for line in rowboxes))
|
|
|
|
|
- col_dividers = sorted(set((line[0] + line[2]) / 2 for line in colboxes))
|
|
|
|
|
-
|
|
|
|
|
- # 添加图像边界
|
|
|
|
|
- if not row_dividers or row_dividers[0] > 5:
|
|
|
|
|
- row_dividers.insert(0, 0.0)
|
|
|
|
|
- if not row_dividers or row_dividers[-1] < img_h - 5:
|
|
|
|
|
- row_dividers.append(img_h)
|
|
|
|
|
- if not col_dividers or col_dividers[0] > 5:
|
|
|
|
|
- col_dividers.insert(0, 0.0)
|
|
|
|
|
- if not col_dividers or col_dividers[-1] < img_w - 5:
|
|
|
|
|
- col_dividers.append(img_w)
|
|
|
|
|
-
|
|
|
|
|
- logger.debug(f"📊 网格: {len(row_dividers)-1}行 × {len(col_dividers)-1}列")
|
|
|
|
|
-
|
|
|
|
|
- # Step 3: 将existing_bboxes映射到网格单元(支持跨行跨列)
|
|
|
|
|
- grid = {} # {(row, col): True} - 标记已占用的单元格
|
|
|
|
|
-
|
|
|
|
|
- def find_overlapping_cells(bbox: List[float]) -> List[tuple]:
|
|
|
|
|
- """找到bbox覆盖的所有网格单元[(row, col), ...]"""
|
|
|
|
|
- x1, y1, x2, y2 = bbox
|
|
|
|
|
- cells = []
|
|
|
|
|
-
|
|
|
|
|
- for i in range(len(row_dividers) - 1):
|
|
|
|
|
- # 检查垂直方向重叠
|
|
|
|
|
- grid_y1, grid_y2 = row_dividers[i], row_dividers[i + 1]
|
|
|
|
|
- if max(y1, grid_y1) < min(y2, grid_y2): # 有重叠
|
|
|
|
|
- for j in range(len(col_dividers) - 1):
|
|
|
|
|
- # 检查水平方向重叠
|
|
|
|
|
- grid_x1, grid_x2 = col_dividers[j], col_dividers[j + 1]
|
|
|
|
|
- if max(x1, grid_x1) < min(x2, grid_x2): # 有重叠
|
|
|
|
|
- cells.append((i, j))
|
|
|
|
|
-
|
|
|
|
|
- return cells
|
|
|
|
|
-
|
|
|
|
|
- # 标记所有existing_bbox占用的网格单元
|
|
|
|
|
- for bbox in existing_bboxes:
|
|
|
|
|
- cells = find_overlapping_cells(bbox)
|
|
|
|
|
- for cell in cells:
|
|
|
|
|
- grid[cell] = True
|
|
|
|
|
-
|
|
|
|
|
- logger.debug(f"📊 已占用: {len(grid)}个网格单元 (共{(len(row_dividers)-1)*(len(col_dividers)-1)}个)")
|
|
|
|
|
-
|
|
|
|
|
- # Step 4: 迭代补偿 - 多轮查找有相邻单元格的OCR
|
|
|
|
|
- # 第一轮补偿的OCR成为"已占用",让后续OCR能找到相邻单元格
|
|
|
|
|
- ocr_to_empty_cells = {} # {ocr_index: {'ocr', 'empty_cells'}}
|
|
|
|
|
- remaining_ocr_indices = set(range(len(valid_ocr))) # 剩余未处理的OCR索引
|
|
|
|
|
- iteration = 0
|
|
|
|
|
- max_iterations = 10 # 防止无限循环
|
|
|
|
|
-
|
|
|
|
|
- while remaining_ocr_indices and iteration < max_iterations:
|
|
|
|
|
- iteration += 1
|
|
|
|
|
- newly_added = {} # 本轮新增的OCR
|
|
|
|
|
-
|
|
|
|
|
- for idx in list(remaining_ocr_indices):
|
|
|
|
|
- ocr = valid_ocr[idx]
|
|
|
|
|
- ocr_bbox = ocr['bbox']
|
|
|
|
|
- ocr_text = ocr.get('text', '')[:30]
|
|
|
|
|
-
|
|
|
|
|
- # 🆕 使用OCR bbox的中心点查找所在单元格,避免跨多行/列的错误映射
|
|
|
|
|
- ocr_center_x = (ocr_bbox[0] + ocr_bbox[2]) / 2
|
|
|
|
|
- ocr_center_y = (ocr_bbox[1] + ocr_bbox[3]) / 2
|
|
|
|
|
-
|
|
|
|
|
- # 找到中心点所在的行和列
|
|
|
|
|
- center_row = None
|
|
|
|
|
- center_col = None
|
|
|
|
|
- for i in range(len(row_dividers) - 1):
|
|
|
|
|
- if row_dividers[i] <= ocr_center_y < row_dividers[i + 1]:
|
|
|
|
|
- center_row = i
|
|
|
|
|
- break
|
|
|
|
|
- for j in range(len(col_dividers) - 1):
|
|
|
|
|
- if col_dividers[j] <= ocr_center_x < col_dividers[j + 1]:
|
|
|
|
|
- center_col = j
|
|
|
|
|
- break
|
|
|
|
|
-
|
|
|
|
|
- if center_row is None or center_col is None:
|
|
|
|
|
- logger.debug(
|
|
|
|
|
- f"⏭️ 跳过OCR '{ocr_text}': 中心点({ocr_center_x:.1f},{ocr_center_y:.1f})不在网格内"
|
|
|
|
|
- )
|
|
|
|
|
- remaining_ocr_indices.remove(idx)
|
|
|
|
|
- continue
|
|
|
|
|
-
|
|
|
|
|
- # 检查中心点所在单元格是否为空
|
|
|
|
|
- center_cell = (center_row, center_col)
|
|
|
|
|
- if center_cell in grid:
|
|
|
|
|
- logger.debug(
|
|
|
|
|
- f"⏭️ 跳过OCR '{ocr_text}': 单元格[{center_row},{center_col}]已被占用"
|
|
|
|
|
- )
|
|
|
|
|
- remaining_ocr_indices.remove(idx)
|
|
|
|
|
- continue
|
|
|
|
|
-
|
|
|
|
|
- # 只使用中心点所在的单元格作为初始empty_cells
|
|
|
|
|
- empty_cells = [center_cell]
|
|
|
|
|
-
|
|
|
|
|
- # 检查是否是边缘单元格(至少一个空单元格与已占用单元格相邻)
|
|
|
|
|
- has_neighbor = False
|
|
|
|
|
- for row, col in empty_cells:
|
|
|
|
|
- for dr, dc in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
|
|
|
|
|
- neighbor = (row + dr, col + dc)
|
|
|
|
|
- if neighbor in grid:
|
|
|
|
|
- has_neighbor = True
|
|
|
|
|
- break
|
|
|
|
|
- if has_neighbor:
|
|
|
|
|
- break
|
|
|
|
|
-
|
|
|
|
|
- if not has_neighbor:
|
|
|
|
|
- # 本轮没有相邻单元格,留到下一轮
|
|
|
|
|
- continue
|
|
|
|
|
-
|
|
|
|
|
- # 找到有相邻单元格的OCR,添加到本轮结果
|
|
|
|
|
- newly_added[idx] = {
|
|
|
|
|
- 'ocr': ocr,
|
|
|
|
|
- 'empty_cells': empty_cells
|
|
|
|
|
- }
|
|
|
|
|
- remaining_ocr_indices.remove(idx)
|
|
|
|
|
-
|
|
|
|
|
- if not newly_added:
|
|
|
|
|
- # 本轮没有新增OCR,终止迭代
|
|
|
|
|
- logger.debug(f"📊 迭代终止: 第{iteration}轮无新增OCR")
|
|
|
|
|
- break
|
|
|
|
|
-
|
|
|
|
|
- # 将本轮新增的OCR添加到总结果
|
|
|
|
|
- ocr_to_empty_cells.update(newly_added)
|
|
|
|
|
-
|
|
|
|
|
- # 🆕 立即将本轮新增的OCR标记到grid,作为下一轮的"已占用单元格"
|
|
|
|
|
- for idx, ocr_data in newly_added.items():
|
|
|
|
|
- for cell in ocr_data['empty_cells']:
|
|
|
|
|
- grid[cell] = True
|
|
|
|
|
-
|
|
|
|
|
- logger.debug(
|
|
|
|
|
- f"📊 第{iteration}轮: 新增{len(newly_added)}个OCR, "
|
|
|
|
|
- f"剩余{len(remaining_ocr_indices)}个待处理"
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- if remaining_ocr_indices:
|
|
|
|
|
- logger.debug(
|
|
|
|
|
- f"⏭️ {len(remaining_ocr_indices)}个OCR无法补偿(无相邻单元格或超出迭代次数)"
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- logger.debug(f"📊 Step 4完成: {len(ocr_to_empty_cells)}个OCR需要补偿(共{iteration}轮迭代)")
|
|
|
|
|
-
|
|
|
|
|
- # Step 5: grid已在迭代过程中更新,跳过
|
|
|
|
|
- # (不需要再次标记,因为每轮迭代都已经更新了grid)
|
|
|
|
|
-
|
|
|
|
|
- # Step 6: 去除边缘整行或整列的空网格(确定表格实际内容边界)
|
|
|
|
|
- occupied_rows = set(r for r, c in grid.keys())
|
|
|
|
|
- occupied_cols = set(c for r, c in grid.keys())
|
|
|
|
|
-
|
|
|
|
|
- if not occupied_rows or not occupied_cols:
|
|
|
|
|
- logger.warning("⚠️ 没有占用的单元格,无法确定表格边界")
|
|
|
|
|
- return []
|
|
|
|
|
-
|
|
|
|
|
- # 确定表格实际内容范围
|
|
|
|
|
- content_min_row = min(occupied_rows)
|
|
|
|
|
- content_max_row = max(occupied_rows)
|
|
|
|
|
- content_min_col = min(occupied_cols)
|
|
|
|
|
- content_max_col = max(occupied_cols)
|
|
|
|
|
-
|
|
|
|
|
- logger.debug(
|
|
|
|
|
- f"📊 Step 6完成: 表格内容边界 = "
|
|
|
|
|
- f"row[{content_min_row}-{content_max_row}] × col[{content_min_col}-{content_max_col}]"
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- # 🆕 不恢复grid状态,保持OCR单元格的临时标记
|
|
|
|
|
- # 这样在扩展时,每个OCR都能看到其他OCR的占用,避免重复扩展
|
|
|
|
|
-
|
|
|
|
|
- # Step 7: 对所有标记的OCR区域统一扩展(只能向表格内部扩展)
|
|
|
|
|
- # 🆕 辅助函数:检查侧边相邻列/行的已占用单元格边界
|
|
|
|
|
- def get_side_boundary_for_vertical_expansion(current_min_col, current_max_col, direction='up'):
|
|
|
|
|
- """向上/下扩展时,检查左右两侧相邻列的单元格边界"""
|
|
|
|
|
- boundary_rows = []
|
|
|
|
|
-
|
|
|
|
|
- # 检查左侧相邻列(current_min_col - 1)
|
|
|
|
|
- if current_min_col > 0:
|
|
|
|
|
- left_col = current_min_col - 1
|
|
|
|
|
- occupied_rows_in_left = [r for r, c in grid.keys() if c == left_col]
|
|
|
|
|
- if occupied_rows_in_left:
|
|
|
|
|
- if direction == 'up':
|
|
|
|
|
- boundary_rows.append(min(occupied_rows_in_left))
|
|
|
|
|
- else: # down
|
|
|
|
|
- boundary_rows.append(max(occupied_rows_in_left))
|
|
|
|
|
-
|
|
|
|
|
- # 检查右侧相邻列(current_max_col + 1)
|
|
|
|
|
- if current_max_col < len(col_dividers) - 2:
|
|
|
|
|
- right_col = current_max_col + 1
|
|
|
|
|
- occupied_rows_in_right = [r for r, c in grid.keys() if c == right_col]
|
|
|
|
|
- if occupied_rows_in_right:
|
|
|
|
|
- if direction == 'up':
|
|
|
|
|
- boundary_rows.append(min(occupied_rows_in_right))
|
|
|
|
|
- else: # down
|
|
|
|
|
- boundary_rows.append(max(occupied_rows_in_right))
|
|
|
|
|
-
|
|
|
|
|
- return boundary_rows
|
|
|
|
|
-
|
|
|
|
|
- def get_side_boundary_for_horizontal_expansion(current_min_row, current_max_row, direction='left'):
|
|
|
|
|
- """向左/右扩展时,检查上下两侧相邻行的单元格边界"""
|
|
|
|
|
- boundary_cols = []
|
|
|
|
|
-
|
|
|
|
|
- # 检查上侧相邻行(current_min_row - 1)
|
|
|
|
|
- if current_min_row > 0:
|
|
|
|
|
- top_row = current_min_row - 1
|
|
|
|
|
- occupied_cols_in_top = [c for r, c in grid.keys() if r == top_row]
|
|
|
|
|
- if occupied_cols_in_top:
|
|
|
|
|
- if direction == 'left':
|
|
|
|
|
- boundary_cols.append(min(occupied_cols_in_top))
|
|
|
|
|
- else: # right
|
|
|
|
|
- boundary_cols.append(max(occupied_cols_in_top))
|
|
|
|
|
-
|
|
|
|
|
- # 检查下侧相邻行(current_max_row + 1)
|
|
|
|
|
- if current_max_row < len(row_dividers) - 2:
|
|
|
|
|
- bottom_row = current_max_row + 1
|
|
|
|
|
- occupied_cols_in_bottom = [c for r, c in grid.keys() if r == bottom_row]
|
|
|
|
|
- if occupied_cols_in_bottom:
|
|
|
|
|
- if direction == 'left':
|
|
|
|
|
- boundary_cols.append(min(occupied_cols_in_bottom))
|
|
|
|
|
- else: # right
|
|
|
|
|
- boundary_cols.append(max(occupied_cols_in_bottom))
|
|
|
|
|
-
|
|
|
|
|
- return boundary_cols
|
|
|
|
|
-
|
|
|
|
|
- # 🆕 逐个处理每个OCR,扩展完立即更新grid状态
|
|
|
|
|
- # 这样后续OCR能看到前面OCR已经扩展占据的单元格,避免重复扩展
|
|
|
|
|
- for idx, ocr_data in ocr_to_empty_cells.items():
|
|
|
|
|
- empty_cells = ocr_data['empty_cells']
|
|
|
|
|
- ocr = ocr_data['ocr']
|
|
|
|
|
- ocr_text = ocr.get('text', '')[:30]
|
|
|
|
|
-
|
|
|
|
|
- # 向上下左右扩展连续的空单元格(只能在表格内容边界内扩展)
|
|
|
|
|
- expanded = set(empty_cells)
|
|
|
|
|
- changed = True
|
|
|
|
|
- while changed:
|
|
|
|
|
- changed = False
|
|
|
|
|
- current_min_row = min(r for r, c in expanded)
|
|
|
|
|
- current_max_row = max(r for r, c in expanded)
|
|
|
|
|
- current_min_col = min(c for r, c in expanded)
|
|
|
|
|
- current_max_col = max(c for r, c in expanded)
|
|
|
|
|
-
|
|
|
|
|
- # 🆕 向上扩展(不能超过表格内容上边界 content_min_row)
|
|
|
|
|
- if current_min_row > content_min_row:
|
|
|
|
|
- row_above = current_min_row - 1
|
|
|
|
|
- # 检查该行是否都是空的
|
|
|
|
|
- if all((row_above, col) not in grid for col in range(current_min_col, current_max_col + 1)):
|
|
|
|
|
- # 检查左右侧相邻列的单元格最小行(上边界)
|
|
|
|
|
- side_boundaries = get_side_boundary_for_vertical_expansion(
|
|
|
|
|
- current_min_col, current_max_col, 'up'
|
|
|
|
|
- )
|
|
|
|
|
- can_expand = True
|
|
|
|
|
- if side_boundaries:
|
|
|
|
|
- # 左右侧单元格的最小行,不能扩展超过它
|
|
|
|
|
- min_side_row = min(side_boundaries)
|
|
|
|
|
- if row_above < min_side_row:
|
|
|
|
|
- can_expand = False
|
|
|
|
|
-
|
|
|
|
|
- if can_expand:
|
|
|
|
|
- for col in range(current_min_col, current_max_col + 1):
|
|
|
|
|
- expanded.add((row_above, col))
|
|
|
|
|
- changed = True
|
|
|
|
|
-
|
|
|
|
|
- # 🆕 向下扩展(不能超过表格内容下边界 content_max_row)
|
|
|
|
|
- if current_max_row < content_max_row:
|
|
|
|
|
- row_below = current_max_row + 1
|
|
|
|
|
- if all((row_below, col) not in grid for col in range(current_min_col, current_max_col + 1)):
|
|
|
|
|
- side_boundaries = get_side_boundary_for_vertical_expansion(
|
|
|
|
|
- current_min_col, current_max_col, 'down'
|
|
|
|
|
- )
|
|
|
|
|
- can_expand = True
|
|
|
|
|
- if side_boundaries:
|
|
|
|
|
- max_side_row = max(side_boundaries)
|
|
|
|
|
- if row_below > max_side_row:
|
|
|
|
|
- can_expand = False
|
|
|
|
|
-
|
|
|
|
|
- if can_expand:
|
|
|
|
|
- for col in range(current_min_col, current_max_col + 1):
|
|
|
|
|
- expanded.add((row_below, col))
|
|
|
|
|
- changed = True
|
|
|
|
|
-
|
|
|
|
|
- # 🆕 向左扩展(不能超过表格内容左边界 content_min_col)
|
|
|
|
|
- if current_min_col > content_min_col:
|
|
|
|
|
- col_left = current_min_col - 1
|
|
|
|
|
- if all((row, col_left) not in grid for row in range(current_min_row, current_max_row + 1)):
|
|
|
|
|
- side_boundaries = get_side_boundary_for_horizontal_expansion(
|
|
|
|
|
- current_min_row, current_max_row, 'left'
|
|
|
|
|
- )
|
|
|
|
|
- can_expand = True
|
|
|
|
|
- if side_boundaries:
|
|
|
|
|
- min_side_col = min(side_boundaries)
|
|
|
|
|
- if col_left < min_side_col:
|
|
|
|
|
- can_expand = False
|
|
|
|
|
-
|
|
|
|
|
- if can_expand:
|
|
|
|
|
- for row in range(current_min_row, current_max_row + 1):
|
|
|
|
|
- expanded.add((row, col_left))
|
|
|
|
|
- changed = True
|
|
|
|
|
-
|
|
|
|
|
- # 🆕 向右扩展(不能超过表格内容右边界 content_max_col)
|
|
|
|
|
- if current_max_col < content_max_col:
|
|
|
|
|
- col_right = current_max_col + 1
|
|
|
|
|
- if all((row, col_right) not in grid for row in range(current_min_row, current_max_row + 1)):
|
|
|
|
|
- side_boundaries = get_side_boundary_for_horizontal_expansion(
|
|
|
|
|
- current_min_row, current_max_row, 'right'
|
|
|
|
|
- )
|
|
|
|
|
- can_expand = True
|
|
|
|
|
- if side_boundaries:
|
|
|
|
|
- max_side_col = max(side_boundaries)
|
|
|
|
|
- if col_right > max_side_col:
|
|
|
|
|
- can_expand = False
|
|
|
|
|
-
|
|
|
|
|
- if can_expand:
|
|
|
|
|
- for row in range(current_min_row, current_max_row + 1):
|
|
|
|
|
- expanded.add((row, col_right))
|
|
|
|
|
- changed = True
|
|
|
|
|
-
|
|
|
|
|
- # 🆕 扩展完成后,立即将扩展后的单元格标记到grid中
|
|
|
|
|
- # 这样后续OCR扩展时能看到这个OCR占据的区域,避免重复扩展
|
|
|
|
|
- for cell in expanded:
|
|
|
|
|
- grid[cell] = True
|
|
|
|
|
-
|
|
|
|
|
- # 更新扩展后的空单元格
|
|
|
|
|
- ocr_to_empty_cells[idx]['expanded_cells'] = list(expanded)
|
|
|
|
|
- logger.debug(f" OCR '{ocr_text}' 扩展完成: {list(expanded)}")
|
|
|
|
|
-
|
|
|
|
|
- logger.debug(f"📊 Step 7完成: 所有OCR区域已扩展")
|
|
|
|
|
-
|
|
|
|
|
- # Step 8: 生成补偿bbox
|
|
|
|
|
- compensated_bboxes = []
|
|
|
|
|
-
|
|
|
|
|
- for idx, ocr_data in ocr_to_empty_cells.items():
|
|
|
|
|
- empty_cells = ocr_data['expanded_cells']
|
|
|
|
|
- ocr = ocr_data['ocr']
|
|
|
|
|
- ocr_text = ocr.get('text', '')[:30]
|
|
|
|
|
-
|
|
|
|
|
- # 找到所有空单元格的边界范围
|
|
|
|
|
- min_row = min(r for r, c in empty_cells)
|
|
|
|
|
- max_row = max(r for r, c in empty_cells)
|
|
|
|
|
- min_col = min(c for r, c in empty_cells)
|
|
|
|
|
- max_col = max(c for r, c in empty_cells)
|
|
|
|
|
-
|
|
|
|
|
- # 使用网格边界作为bbox(精确对齐)
|
|
|
|
|
- # 显式转换为Python float,避免numpy.float32导致JSON序列化错误
|
|
|
|
|
- y1 = float(row_dividers[min_row])
|
|
|
|
|
- y2 = float(row_dividers[max_row + 1])
|
|
|
|
|
- x1 = float(col_dividers[min_col])
|
|
|
|
|
- x2 = float(col_dividers[max_col + 1])
|
|
|
|
|
-
|
|
|
|
|
- compensated_bbox = [x1, y1, x2, y2]
|
|
|
|
|
- compensated_bboxes.append(compensated_bbox)
|
|
|
|
|
-
|
|
|
|
|
- # 标记这些单元格为已占用
|
|
|
|
|
- for row, col in empty_cells:
|
|
|
|
|
- grid[(row, col)] = True
|
|
|
|
|
-
|
|
|
|
|
- logger.info(
|
|
|
|
|
- f"✅ 补偿单元格[{min_row}-{max_row},{min_col}-{max_col}]: '{ocr_text}' | "
|
|
|
|
|
- f"bbox=[{x1:.1f},{y1:.1f},{x2:.1f},{y2:.1f}] | "
|
|
|
|
|
- f"占据{len(empty_cells)}个网格单元"
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- # Step 5: Debug可视化(增强版:颜色区分原有/补偿单元格)
|
|
|
|
|
- if debug_dir and compensated_bboxes:
|
|
|
|
|
- try:
|
|
|
|
|
- from pathlib import Path
|
|
|
|
|
- vis_img = np.ones((int(img_h), int(img_w), 3), dtype=np.uint8) * 255
|
|
|
|
|
-
|
|
|
|
|
- # 绘制网格线(浅灰色虚线)
|
|
|
|
|
- for y in row_dividers:
|
|
|
|
|
- cv2.line(vis_img, (0, int(y)), (int(img_w), int(y)), (220, 220, 220), 1, cv2.LINE_AA)
|
|
|
|
|
- for x in col_dividers:
|
|
|
|
|
- cv2.line(vis_img, (int(x), 0), (int(x), int(img_h)), (220, 220, 220), 1, cv2.LINE_AA)
|
|
|
|
|
-
|
|
|
|
|
- # 绘制现有bbox(绿色 - 原有单元格)
|
|
|
|
|
- for bbox in existing_bboxes:
|
|
|
|
|
- x1, y1, x2, y2 = [int(v) for v in bbox]
|
|
|
|
|
- cv2.rectangle(vis_img, (x1, y1), (x2, y2), (0, 200, 0), 2)
|
|
|
|
|
-
|
|
|
|
|
- # 绘制补偿bbox(橙色 - 补偿单元格,加粗)
|
|
|
|
|
- for bbox in compensated_bboxes:
|
|
|
|
|
- x1, y1, x2, y2 = [int(v) for v in bbox]
|
|
|
|
|
- cv2.rectangle(vis_img, (x1, y1), (x2, y2), (0, 165, 255), 3) # 橙色,线宽3
|
|
|
|
|
-
|
|
|
|
|
- # 添加图例和统计信息
|
|
|
|
|
- legend_y = 30
|
|
|
|
|
- cv2.putText(vis_img, f"OCR Compensation: +{len(compensated_bboxes)} cells", (10, legend_y),
|
|
|
|
|
- cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 2)
|
|
|
|
|
- legend_y += 35
|
|
|
|
|
- cv2.putText(vis_img, f"Green: Original ({len(existing_bboxes)})", (10, legend_y),
|
|
|
|
|
- cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 200, 0), 2)
|
|
|
|
|
- legend_y += 30
|
|
|
|
|
- cv2.putText(vis_img, f"Orange: Compensated ({len(compensated_bboxes)})", (10, legend_y),
|
|
|
|
|
- cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 165, 255), 2)
|
|
|
|
|
- legend_y += 30
|
|
|
|
|
- cv2.putText(vis_img, f"Gray: Grid lines ({len(row_dividers)-1}x{len(col_dividers)-1})", (10, legend_y),
|
|
|
|
|
- cv2.FONT_HERSHEY_SIMPLEX, 0.6, (150, 150, 150), 2)
|
|
|
|
|
-
|
|
|
|
|
- out_path = Path(debug_dir) / f"{debug_prefix}step06_ocr_compensation.png"
|
|
|
|
|
- cv2.imwrite(str(out_path), vis_img)
|
|
|
|
|
- logger.info(f"💾 OCR补偿可视化已保存: {out_path}")
|
|
|
|
|
- logger.info(f" 📊 单元格统计: 原有={len(existing_bboxes)}, 补偿={len(compensated_bboxes)}, "
|
|
|
|
|
- f"总计={len(existing_bboxes) + len(compensated_bboxes)}")
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- logger.warning(f"⚠️ Debug可视化失败: {e}")
|
|
|
|
|
-
|
|
|
|
|
- logger.info(f"🎉 OCR补偿完成: +{len(compensated_bboxes)}个边缘单元格")
|
|
|
|
|
- return compensated_bboxes
|
|
|