|
@@ -3,6 +3,7 @@ import html
|
|
|
import copy
|
|
import copy
|
|
|
from typing import Any, Dict, List, Tuple, Optional, cast
|
|
from typing import Any, Dict, List, Tuple, Optional, cast
|
|
|
import ast
|
|
import ast
|
|
|
|
|
+from dataclasses import dataclass
|
|
|
|
|
|
|
|
import cv2
|
|
import cv2
|
|
|
import numpy as np
|
|
import numpy as np
|
|
@@ -28,6 +29,17 @@ class MinerUWiredTableRecognizer:
|
|
|
- recognize_v4(): 改进流程,使用自定义HTML生成和文本填充(支持data-bbox属性)
|
|
- recognize_v4(): 改进流程,使用自定义HTML生成和文本填充(支持data-bbox属性)
|
|
|
"""
|
|
"""
|
|
|
|
|
|
|
|
|
|
+ @dataclass
|
|
|
|
|
+ class DebugOptions:
|
|
|
|
|
+ enabled: bool = False
|
|
|
|
|
+ output_dir: Optional[str] = None
|
|
|
|
|
+ save_table_lines: bool = False
|
|
|
|
|
+ save_connected_components: bool = False
|
|
|
|
|
+ save_grid_structure: bool = False
|
|
|
|
|
+ save_text_overlay: bool = False
|
|
|
|
|
+ image_format: str = "png"
|
|
|
|
|
+ prefix: str = ""
|
|
|
|
|
+
|
|
|
def __init__(self, config: Dict[str, Any], ocr_engine: Any):
|
|
def __init__(self, config: Dict[str, Any], ocr_engine: Any):
|
|
|
self.config = config or {}
|
|
self.config = config or {}
|
|
|
self.upscale_ratio: float = self.config.get("upscale_ratio", 10 / 3)
|
|
self.upscale_ratio: float = self.config.get("upscale_ratio", 10 / 3)
|
|
@@ -41,6 +53,48 @@ class MinerUWiredTableRecognizer:
|
|
|
self.table_model = UnetTableModel(ocr_engine)
|
|
self.table_model = UnetTableModel(ocr_engine)
|
|
|
self.ocr_engine = ocr_engine
|
|
self.ocr_engine = ocr_engine
|
|
|
|
|
|
|
|
|
|
+ # 统一的调试选项,默认关闭写盘
|
|
|
|
|
+ self.debug_options = self._merge_debug_options(self.config.get("debug_options"))
|
|
|
|
|
+
|
|
|
|
|
+ # ======== Debug 选项与保存工具 ========
|
|
|
|
|
+ def _merge_debug_options(self, override: Optional[Dict[str, Any]] = None) -> "MinerUWiredTableRecognizer.DebugOptions":
|
|
|
|
|
+ base_dir = None
|
|
|
|
|
+ # 兼容旧配置键
|
|
|
|
|
+ if isinstance(self.config.get("debug_output_dir"), str):
|
|
|
|
|
+ base_dir = self.config.get("debug_output_dir")
|
|
|
|
|
+ opts = MinerUWiredTableRecognizer.DebugOptions(
|
|
|
|
|
+ enabled=bool(self.config.get("debug_enabled", False)),
|
|
|
|
|
+ output_dir=base_dir,
|
|
|
|
|
+ save_table_lines=bool(self.config.get("save_table_lines", False)),
|
|
|
|
|
+ save_connected_components=bool(self.config.get("save_connected_components", False)),
|
|
|
|
|
+ save_grid_structure=bool(self.config.get("save_grid_structure", False)),
|
|
|
|
|
+ save_text_overlay=bool(self.config.get("save_text_overlay", False)),
|
|
|
|
|
+ image_format=str(self.config.get("debug_image_format", "png")),
|
|
|
|
|
+ prefix=str(self.config.get("debug_prefix", "")),
|
|
|
|
|
+ )
|
|
|
|
|
+ if override and isinstance(override, dict):
|
|
|
|
|
+ # 覆盖层允许临时启用或指定目录
|
|
|
|
|
+ for k, v in override.items():
|
|
|
|
|
+ if hasattr(opts, k):
|
|
|
|
|
+ setattr(opts, k, v)
|
|
|
|
|
+ return opts
|
|
|
|
|
+
|
|
|
|
|
+ def _debug_is_on(self, flag: str, opts: Optional["MinerUWiredTableRecognizer.DebugOptions"] = None) -> bool:
|
|
|
|
|
+ o = opts or self.debug_options
|
|
|
|
|
+ if not o or not o.enabled:
|
|
|
|
|
+ return False
|
|
|
|
|
+ if not o.output_dir:
|
|
|
|
|
+ return False
|
|
|
|
|
+ return bool(getattr(o, flag, False))
|
|
|
|
|
+
|
|
|
|
|
+ def _debug_path(self, name: str, opts: Optional["MinerUWiredTableRecognizer.DebugOptions"] = None) -> Optional[str]:
|
|
|
|
|
+ o = opts or self.debug_options
|
|
|
|
|
+ if not o or not o.output_dir:
|
|
|
|
|
+ return None
|
|
|
|
|
+ prefix = (o.prefix + "_") if o.prefix else ""
|
|
|
|
|
+ ext = o.image_format or "png"
|
|
|
|
|
+ return f"{o.output_dir}/{prefix}{name}.{ext}"
|
|
|
|
|
+
|
|
|
# ========== 坐标格式转换工具 ==========
|
|
# ========== 坐标格式转换工具 ==========
|
|
|
|
|
|
|
|
@staticmethod
|
|
@staticmethod
|
|
@@ -281,7 +335,7 @@ class MinerUWiredTableRecognizer:
|
|
|
self,
|
|
self,
|
|
|
bboxes: List[List[float]],
|
|
bboxes: List[List[float]],
|
|
|
ocr_boxes: List[Dict[str, Any]],
|
|
ocr_boxes: List[Dict[str, Any]],
|
|
|
- ) -> List[str]:
|
|
|
|
|
|
|
+ ) -> Tuple[List[str], List[float]]:
|
|
|
"""
|
|
"""
|
|
|
使用中心点落格策略填充文本。
|
|
使用中心点落格策略填充文本。
|
|
|
|
|
|
|
@@ -295,11 +349,13 @@ class MinerUWiredTableRecognizer:
|
|
|
|
|
|
|
|
Returns:
|
|
Returns:
|
|
|
每个单元格的文本列表
|
|
每个单元格的文本列表
|
|
|
|
|
+ 每个单元格的置信度列表
|
|
|
"""
|
|
"""
|
|
|
texts: List[str] = ["" for _ in bboxes]
|
|
texts: List[str] = ["" for _ in bboxes]
|
|
|
|
|
+ scores: List[float] = [0.0 for _ in bboxes]
|
|
|
|
|
|
|
|
if not ocr_boxes:
|
|
if not ocr_boxes:
|
|
|
- return texts
|
|
|
|
|
|
|
+ return texts, scores
|
|
|
|
|
|
|
|
# 预处理OCR结果:计算中心点
|
|
# 预处理OCR结果:计算中心点
|
|
|
ocr_items: List[Dict[str, Any]] = []
|
|
ocr_items: List[Dict[str, Any]] = []
|
|
@@ -314,33 +370,29 @@ class MinerUWiredTableRecognizer:
|
|
|
"center_y": cy,
|
|
"center_y": cy,
|
|
|
"y1": box[1],
|
|
"y1": box[1],
|
|
|
"text": item.get("text", ""),
|
|
"text": item.get("text", ""),
|
|
|
- "confidence": item.get("confidence", 0.0),
|
|
|
|
|
|
|
+ "confidence": float(item.get("confidence", item.get("score", 1.0))),
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
# 为每个单元格匹配OCR文本
|
|
# 为每个单元格匹配OCR文本
|
|
|
for idx, bbox in enumerate(bboxes):
|
|
for idx, bbox in enumerate(bboxes):
|
|
|
x1, y1, x2, y2 = bbox
|
|
x1, y1, x2, y2 = bbox
|
|
|
- matched: List[Tuple[str, float]] = []
|
|
|
|
|
|
|
+ matched: List[Tuple[str, float, float]] = [] # (text, y1, score)
|
|
|
|
|
|
|
|
for ocr in ocr_items:
|
|
for ocr in ocr_items:
|
|
|
if x1 <= ocr["center_x"] <= x2 and y1 <= ocr["center_y"] <= y2:
|
|
if x1 <= ocr["center_x"] <= x2 and y1 <= ocr["center_y"] <= y2:
|
|
|
- matched.append((ocr["text"], ocr["y1"]))
|
|
|
|
|
|
|
+ matched.append((ocr["text"], ocr["y1"], ocr["confidence"]))
|
|
|
|
|
|
|
|
if matched:
|
|
if matched:
|
|
|
# 按y坐标排序,确保多行文本顺序正确
|
|
# 按y坐标排序,确保多行文本顺序正确
|
|
|
matched.sort(key=lambda x: x[1])
|
|
matched.sort(key=lambda x: x[1])
|
|
|
- texts[idx] = " ".join([t for t, _ in matched])
|
|
|
|
|
|
|
+ texts[idx] = "".join([t for t, _, _ in matched])
|
|
|
|
|
+ # 计算平均置信度
|
|
|
|
|
+ avg_score = sum([s for _, _, s in matched]) / len(matched)
|
|
|
|
|
+ scores[idx] = avg_score
|
|
|
|
|
+ else:
|
|
|
|
|
+ scores[idx] = 0.0 # 无匹配文本,置信度为0
|
|
|
|
|
|
|
|
- return texts
|
|
|
|
|
-
|
|
|
|
|
- def _match_text_by_center(
|
|
|
|
|
- self,
|
|
|
|
|
- cells_bbox: List[List[float]],
|
|
|
|
|
- ocr_boxes: List[Dict[str, Any]],
|
|
|
|
|
- ) -> List[str]:
|
|
|
|
|
- """使用中心点落格分配文本,行内按 y 排序后拼接。(旧版兼容)"""
|
|
|
|
|
- return self._fill_text_by_center_point(cells_bbox, ocr_boxes)
|
|
|
|
|
-
|
|
|
|
|
|
|
+ return texts, scores
|
|
|
|
|
|
|
|
def recognize_legacy(
|
|
def recognize_legacy(
|
|
|
self,
|
|
self,
|
|
@@ -401,41 +453,12 @@ class MinerUWiredTableRecognizer:
|
|
|
# 缩回裁剪坐标
|
|
# 缩回裁剪坐标
|
|
|
polys /= upscale
|
|
polys /= upscale
|
|
|
bboxes = [self._poly_to_bbox(poly) for poly in polys]
|
|
bboxes = [self._poly_to_bbox(poly) for poly in polys]
|
|
|
- texts = self._match_text_by_center(bboxes, ocr_boxes or [])
|
|
|
|
|
- # 对空文本单元格触发单元格级 OCR 补充
|
|
|
|
|
|
|
+ texts, scores = self._fill_text_by_center_point(bboxes, ocr_boxes)
|
|
|
|
|
+
|
|
|
|
|
+ # 统一调用二次OCR封装(替换原有重复逻辑)
|
|
|
if self.ocr_engine is not None and any(not t for t in texts):
|
|
if self.ocr_engine is not None and any(not t for t in texts):
|
|
|
- crop_list = []
|
|
|
|
|
- crop_info = []
|
|
|
|
|
- h, w = table_image.shape[:2]
|
|
|
|
|
- margin = self.cell_crop_margin
|
|
|
|
|
- for idx, bbox in enumerate(bboxes):
|
|
|
|
|
- if texts[idx]:
|
|
|
|
|
- continue
|
|
|
|
|
- x1, y1, x2, y2 = bbox
|
|
|
|
|
- x1i, y1i, x2i, y2i = map(int, [x1, y1, x2, y2])
|
|
|
|
|
-
|
|
|
|
|
- # 增加裁剪边距防止文字被截断(特别是边界字符如"司")
|
|
|
|
|
- x1i = max(0, x1i - margin)
|
|
|
|
|
- y1i = max(0, y1i - margin)
|
|
|
|
|
- x2i = min(w, x2i + margin)
|
|
|
|
|
- y2i = min(h, y2i + margin)
|
|
|
|
|
-
|
|
|
|
|
- if x2i <= x1i or y2i <= y1i:
|
|
|
|
|
- continue
|
|
|
|
|
- crop = table_image[y1i:y2i, x1i:x2i]
|
|
|
|
|
- if crop.size == 0:
|
|
|
|
|
- continue
|
|
|
|
|
- crop_list.append(crop)
|
|
|
|
|
- crop_info.append(idx)
|
|
|
|
|
- if crop_list:
|
|
|
|
|
- try:
|
|
|
|
|
- ocr_res = self.ocr_engine.ocr(crop_list, det=False)
|
|
|
|
|
- if ocr_res and isinstance(ocr_res, list) and len(ocr_res) == 1:
|
|
|
|
|
- for loc, (text, score) in zip(crop_info, ocr_res[0]):
|
|
|
|
|
- if score >= self.ocr_conf_threshold and text:
|
|
|
|
|
- texts[loc] = text
|
|
|
|
|
- except Exception:
|
|
|
|
|
- pass
|
|
|
|
|
|
|
+ texts = self._second_pass_ocr_fill(table_image, bboxes, texts, scores)
|
|
|
|
|
+
|
|
|
for idx, bbox in enumerate(bboxes):
|
|
for idx, bbox in enumerate(bboxes):
|
|
|
lp = logic_points[idx] if len(logic_points) > idx else [0, 0, 0, 0]
|
|
lp = logic_points[idx] if len(logic_points) > idx else [0, 0, 0, 0]
|
|
|
cells.append({
|
|
cells.append({
|
|
@@ -449,12 +472,11 @@ class MinerUWiredTableRecognizer:
|
|
|
|
|
|
|
|
# 通过BeautifulSoup增强HTML,添加data-bbox和data-score属性(保留原始HTML结构)
|
|
# 通过BeautifulSoup增强HTML,添加data-bbox和data-score属性(保留原始HTML结构)
|
|
|
html_enhanced = self._enhance_html_with_cell_data(html_code, cells)
|
|
html_enhanced = self._enhance_html_with_cell_data(html_code, cells)
|
|
|
-
|
|
|
|
|
return {
|
|
return {
|
|
|
"html": html_enhanced or html_code or "",
|
|
"html": html_enhanced or html_code or "",
|
|
|
"cells": cells,
|
|
"cells": cells,
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
def _enhance_html_with_cell_data(self, html_code: str, cells: List[Dict[str, Any]]) -> str:
|
|
def _enhance_html_with_cell_data(self, html_code: str, cells: List[Dict[str, Any]]) -> str:
|
|
|
"""通过BeautifulSoup增强HTML,为每个td添加data-bbox和data-score属性
|
|
"""通过BeautifulSoup增强HTML,为每个td添加data-bbox和data-score属性
|
|
|
|
|
|
|
@@ -527,7 +549,6 @@ class MinerUWiredTableRecognizer:
|
|
|
hpred_up: np.ndarray,
|
|
hpred_up: np.ndarray,
|
|
|
vpred_up: np.ndarray,
|
|
vpred_up: np.ndarray,
|
|
|
upscale: float = 1.0,
|
|
upscale: float = 1.0,
|
|
|
- debug_output_dir: Optional[str] = None
|
|
|
|
|
) -> List[List[float]]:
|
|
) -> List[List[float]]:
|
|
|
"""
|
|
"""
|
|
|
基于连通域分析从表格线 Mask 提取单元格
|
|
基于连通域分析从表格线 Mask 提取单元格
|
|
@@ -573,6 +594,15 @@ class MinerUWiredTableRecognizer:
|
|
|
# 过滤掉长条形的非单元格区域(例如边缘的细长空白)
|
|
# 过滤掉长条形的非单元格区域(例如边缘的细长空白)
|
|
|
if w_cell > w * 0.95 or h_cell > h * 0.95:
|
|
if w_cell > w * 0.95 or h_cell > h * 0.95:
|
|
|
continue
|
|
continue
|
|
|
|
|
+
|
|
|
|
|
+ # 转换到原图尺度
|
|
|
|
|
+ orig_h = h_cell / upscale
|
|
|
|
|
+ orig_w = w_cell / upscale
|
|
|
|
|
+
|
|
|
|
|
+ # 过滤极小高度/宽度的单元格 (可能是边缘噪声或线条残留)
|
|
|
|
|
+ # 阈值设为 5 像素,通常文本行不会小于这个高度
|
|
|
|
|
+ if orig_h < 5 or orig_w < 5:
|
|
|
|
|
+ continue
|
|
|
|
|
|
|
|
# 还原到原图坐标
|
|
# 还原到原图坐标
|
|
|
# 注意:连通域提取的是内部空白,实际单元格边界应该包含线条的一半宽度
|
|
# 注意:连通域提取的是内部空白,实际单元格边界应该包含线条的一半宽度
|
|
@@ -589,18 +619,6 @@ class MinerUWiredTableRecognizer:
|
|
|
bboxes.sort(key=lambda b: (int(b[1] / 10), b[0]))
|
|
bboxes.sort(key=lambda b: (int(b[1] / 10), b[0]))
|
|
|
|
|
|
|
|
logger.info(f"连通域分析提取到 {len(bboxes)} 个单元格")
|
|
logger.info(f"连通域分析提取到 {len(bboxes)} 个单元格")
|
|
|
-
|
|
|
|
|
- # 调试可视化
|
|
|
|
|
- if debug_output_dir:
|
|
|
|
|
- vis = np.zeros((h, w, 3), dtype=np.uint8)
|
|
|
|
|
- vis[grid_mask > 0] = [0, 0, 255] # 红色线条
|
|
|
|
|
-
|
|
|
|
|
- # 绘制提取出的框
|
|
|
|
|
- for i, box in enumerate(bboxes):
|
|
|
|
|
- x1, y1, x2, y2 = [int(c * upscale) for c in box]
|
|
|
|
|
- cv2.rectangle(vis, (x1, y1), (x2, y2), (0, 255, 0), 2)
|
|
|
|
|
-
|
|
|
|
|
- cv2.imwrite(f"{debug_output_dir}/connected_components.png", vis)
|
|
|
|
|
|
|
|
|
|
return bboxes
|
|
return bboxes
|
|
|
|
|
|
|
@@ -643,7 +661,111 @@ class MinerUWiredTableRecognizer:
|
|
|
cv2.imwrite(output_path, vis_img)
|
|
cv2.imwrite(output_path, vis_img)
|
|
|
logger.info(f"检测线可视化: {output_path}")
|
|
logger.info(f"检测线可视化: {output_path}")
|
|
|
|
|
|
|
|
|
|
+ def _visualize_connected_components(
|
|
|
|
|
+ self,
|
|
|
|
|
+ hpred_up: np.ndarray,
|
|
|
|
|
+ vpred_up: np.ndarray,
|
|
|
|
|
+ bboxes: List[List[float]],
|
|
|
|
|
+ upscale: float,
|
|
|
|
|
+ output_path: str
|
|
|
|
|
+ ) -> None:
|
|
|
|
|
+ """
|
|
|
|
|
+ 复刻连通域风格:红色网格线背景 + 绿色单元格框。
|
|
|
|
|
+ 使用上采样尺度的 mask 与坐标,保证线条清晰。
|
|
|
|
|
+ """
|
|
|
|
|
+ h, w = hpred_up.shape[:2]
|
|
|
|
|
+
|
|
|
|
|
+ # 与连通域提取相同的预处理,以获得直观的网格线背景
|
|
|
|
|
+ _, h_bin = cv2.threshold(hpred_up, 127, 255, cv2.THRESH_BINARY)
|
|
|
|
|
+ _, v_bin = cv2.threshold(vpred_up, 127, 255, cv2.THRESH_BINARY)
|
|
|
|
|
+ kernel_h = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 1))
|
|
|
|
|
+ kernel_v = cv2.getStructuringElement(cv2.MORPH_RECT, (1, 5))
|
|
|
|
|
+ h_bin = cv2.dilate(h_bin, kernel_h, iterations=1)
|
|
|
|
|
+ v_bin = cv2.dilate(v_bin, kernel_v, iterations=1)
|
|
|
|
|
+ grid_mask = cv2.bitwise_or(h_bin, v_bin)
|
|
|
|
|
+
|
|
|
|
|
+ vis = np.zeros((h, w, 3), dtype=np.uint8)
|
|
|
|
|
+ vis[grid_mask > 0] = [0, 0, 255] # 红色线条
|
|
|
|
|
+
|
|
|
|
|
+ # 在上采样坐标系上绘制单元格框
|
|
|
|
|
+ for box in bboxes:
|
|
|
|
|
+ x1, y1, x2, y2 = [int(c * upscale) for c in box]
|
|
|
|
|
+ cv2.rectangle(vis, (x1, y1), (x2, y2), (0, 255, 0), 2)
|
|
|
|
|
+
|
|
|
|
|
+ cv2.imwrite(output_path, vis)
|
|
|
|
|
+ logger.info(f"连通域可视化: {output_path}")
|
|
|
|
|
|
|
|
|
|
+ def _compress_grid(self, cells: List[Dict]) -> List[Dict]:
|
|
|
|
|
+ """
|
|
|
|
|
+ 压缩网格索引,移除空行和空列
|
|
|
|
|
+ """
|
|
|
|
|
+ if not cells:
|
|
|
|
|
+ return []
|
|
|
|
|
+
|
|
|
|
|
+ # 1. 计算当前最大行列
|
|
|
|
|
+ max_row = 0
|
|
|
|
|
+ max_col = 0
|
|
|
|
|
+ for cell in cells:
|
|
|
|
|
+ max_row = max(max_row, cell["row"] + cell.get("rowspan", 1))
|
|
|
|
|
+ max_col = max(max_col, cell["col"] + cell.get("colspan", 1))
|
|
|
|
|
+
|
|
|
|
|
+ # 2. 标记占用情况
|
|
|
|
|
+ # 修改:仅标记有单元格起始的行/列为占用
|
|
|
|
|
+ # 这样可以自动移除那些仅由 rowspan/colspan 跨越的"间隙行/列"
|
|
|
|
|
+ # 例如:表头行之间的微小垂直间隙,如果没有单元格从该间隙开始,则该行应被折叠
|
|
|
|
|
+ row_occupied = [False] * max_row
|
|
|
|
|
+ col_occupied = [False] * max_col
|
|
|
|
|
+
|
|
|
|
|
+ for cell in cells:
|
|
|
|
|
+ if cell["row"] < max_row:
|
|
|
|
|
+ row_occupied[cell["row"]] = True
|
|
|
|
|
+ if cell["col"] < max_col:
|
|
|
|
|
+ col_occupied[cell["col"]] = True
|
|
|
|
|
+
|
|
|
|
|
+ # 3. 构建映射表
|
|
|
|
|
+ row_map = [0] * (max_row + 1)
|
|
|
|
|
+ current_row = 0
|
|
|
|
|
+ for r in range(max_row):
|
|
|
|
|
+ if row_occupied[r]:
|
|
|
|
|
+ current_row += 1
|
|
|
|
|
+ row_map[r + 1] = current_row
|
|
|
|
|
+
|
|
|
|
|
+ col_map = [0] * (max_col + 1)
|
|
|
|
|
+ current_col = 0
|
|
|
|
|
+ for c in range(max_col):
|
|
|
|
|
+ if col_occupied[c]:
|
|
|
|
|
+ current_col += 1
|
|
|
|
|
+ col_map[c + 1] = current_col
|
|
|
|
|
+
|
|
|
|
|
+ # 4. 更新单元格索引
|
|
|
|
|
+ new_cells = []
|
|
|
|
|
+ for cell in cells:
|
|
|
|
|
+ new_cell = cell.copy()
|
|
|
|
|
+
|
|
|
|
|
+ old_r1 = cell["row"]
|
|
|
|
|
+ old_r2 = old_r1 + cell.get("rowspan", 1)
|
|
|
|
|
+ new_r1 = row_map[old_r1]
|
|
|
|
|
+ new_r2 = row_map[old_r2]
|
|
|
|
|
+
|
|
|
|
|
+ old_c1 = cell["col"]
|
|
|
|
|
+ old_c2 = old_c1 + cell.get("colspan", 1)
|
|
|
|
|
+ new_c1 = col_map[old_c1]
|
|
|
|
|
+ new_c2 = col_map[old_c2]
|
|
|
|
|
+
|
|
|
|
|
+ # 如果压缩后 span 变为 0 (理论上不应该,因为只要有 cell 占用,occupied 就是 True),
|
|
|
|
|
+ # 但为了安全起见,确保至少为 1
|
|
|
|
|
+ new_span_r = max(1, new_r2 - new_r1)
|
|
|
|
|
+ new_span_c = max(1, new_c2 - new_c1)
|
|
|
|
|
+
|
|
|
|
|
+ new_cell["row"] = new_r1
|
|
|
|
|
+ new_cell["col"] = new_c1
|
|
|
|
|
+ new_cell["rowspan"] = new_span_r
|
|
|
|
|
+ new_cell["colspan"] = new_span_c
|
|
|
|
|
+
|
|
|
|
|
+ new_cells.append(new_cell)
|
|
|
|
|
+
|
|
|
|
|
+ return new_cells
|
|
|
|
|
+
|
|
|
def _recover_grid_structure(self, bboxes: List[List[float]]) -> List[Dict]:
|
|
def _recover_grid_structure(self, bboxes: List[List[float]]) -> List[Dict]:
|
|
|
"""
|
|
"""
|
|
|
从散乱的单元格 bbox 恢复表格的行列结构 (row, col, rowspan, colspan)
|
|
从散乱的单元格 bbox 恢复表格的行列结构 (row, col, rowspan, colspan)
|
|
@@ -726,6 +848,9 @@ class MinerUWiredTableRecognizer:
|
|
|
# 按行列排序
|
|
# 按行列排序
|
|
|
structured_cells.sort(key=lambda c: (c["row"], c["col"]))
|
|
structured_cells.sort(key=lambda c: (c["row"], c["col"]))
|
|
|
|
|
|
|
|
|
|
+ # 压缩网格,移除空行空列
|
|
|
|
|
+ structured_cells = self._compress_grid(structured_cells)
|
|
|
|
|
+
|
|
|
return structured_cells
|
|
return structured_cells
|
|
|
|
|
|
|
|
def _build_html_from_merged_cells(self, merged_cells: List[Dict]) -> str:
|
|
def _build_html_from_merged_cells(self, merged_cells: List[Dict]) -> str:
|
|
@@ -778,7 +903,7 @@ class MinerUWiredTableRecognizer:
|
|
|
if rowspan > 1:
|
|
if rowspan > 1:
|
|
|
attrs.append(f'rowspan="{rowspan}"')
|
|
attrs.append(f'rowspan="{rowspan}"')
|
|
|
|
|
|
|
|
- html_parts.append(f'<td {" ".join(attrs)}>{text}</td>')
|
|
|
|
|
|
|
+ html_parts.append(f'<td {"".join(attrs)}>{text}</td>')
|
|
|
|
|
|
|
|
# 标记占用
|
|
# 标记占用
|
|
|
for i in range(rowspan):
|
|
for i in range(rowspan):
|
|
@@ -836,7 +961,7 @@ class MinerUWiredTableRecognizer:
|
|
|
self,
|
|
self,
|
|
|
table_image: np.ndarray,
|
|
table_image: np.ndarray,
|
|
|
ocr_boxes: List[Dict[str, Any]],
|
|
ocr_boxes: List[Dict[str, Any]],
|
|
|
- debug_output_dir: Optional[str] = None,
|
|
|
|
|
|
|
+ debug_options: Optional[Dict[str, Any]] = None,
|
|
|
) -> Dict[str, Any]:
|
|
) -> Dict[str, Any]:
|
|
|
"""
|
|
"""
|
|
|
V4版本:直接从表格线计算单元格,绕过 MinerU 的 cal_region_boxes
|
|
V4版本:直接从表格线计算单元格,绕过 MinerU 的 cal_region_boxes
|
|
@@ -862,117 +987,60 @@ class MinerUWiredTableRecognizer:
|
|
|
hpred_up = cv2.resize(hpred, (w_up, h_up), interpolation=cv2.INTER_NEAREST)
|
|
hpred_up = cv2.resize(hpred, (w_up, h_up), interpolation=cv2.INTER_NEAREST)
|
|
|
vpred_up = cv2.resize(vpred, (w_up, h_up), interpolation=cv2.INTER_NEAREST)
|
|
vpred_up = cv2.resize(vpred, (w_up, h_up), interpolation=cv2.INTER_NEAREST)
|
|
|
|
|
|
|
|
|
|
+ # 调试选项合并
|
|
|
|
|
+ dbg = self._merge_debug_options(debug_options or {})
|
|
|
|
|
+
|
|
|
# Step 1.5: 可视化表格线(调试用)- 需要缩放回原图
|
|
# Step 1.5: 可视化表格线(调试用)- 需要缩放回原图
|
|
|
- if debug_output_dir:
|
|
|
|
|
|
|
+ if self._debug_is_on("save_table_lines", dbg):
|
|
|
hpred_orig = cv2.resize(hpred_up, (w, h), interpolation=cv2.INTER_NEAREST)
|
|
hpred_orig = cv2.resize(hpred_up, (w, h), interpolation=cv2.INTER_NEAREST)
|
|
|
vpred_orig = cv2.resize(vpred_up, (w, h), interpolation=cv2.INTER_NEAREST)
|
|
vpred_orig = cv2.resize(vpred_up, (w, h), interpolation=cv2.INTER_NEAREST)
|
|
|
|
|
+ out_path = self._debug_path("unet_table_lines", dbg)
|
|
|
self._visualize_table_lines(
|
|
self._visualize_table_lines(
|
|
|
table_image,
|
|
table_image,
|
|
|
hpred_orig,
|
|
hpred_orig,
|
|
|
vpred_orig,
|
|
vpred_orig,
|
|
|
- output_path=f"{debug_output_dir}/unet_table_lines.png"
|
|
|
|
|
|
|
+ output_path=out_path if out_path else ""
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
# Step 2: 使用连通域法提取单元格 (替换了原来的投影法)
|
|
# Step 2: 使用连通域法提取单元格 (替换了原来的投影法)
|
|
|
- bboxes = self._compute_cells_from_lines(hpred_up, vpred_up, upscale, debug_output_dir)
|
|
|
|
|
-
|
|
|
|
|
|
|
+ bboxes = self._compute_cells_from_lines(hpred_up, vpred_up, upscale)
|
|
|
if not bboxes:
|
|
if not bboxes:
|
|
|
raise RuntimeError("未能提取出单元格")
|
|
raise RuntimeError("未能提取出单元格")
|
|
|
|
|
|
|
|
|
|
+ # Step 2.5: 可视化连通域(线条+框,直观版)
|
|
|
|
|
+ if self._debug_is_on("save_connected_components", dbg):
|
|
|
|
|
+ out_path = self._debug_path("connected_components", dbg)
|
|
|
|
|
+ if out_path:
|
|
|
|
|
+ self._visualize_connected_components(
|
|
|
|
|
+ hpred_up,
|
|
|
|
|
+ vpred_up,
|
|
|
|
|
+ bboxes,
|
|
|
|
|
+ upscale,
|
|
|
|
|
+ output_path=out_path
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
# Step 3: 重建网格结构 (计算 row, col, rowspan, colspan)
|
|
# Step 3: 重建网格结构 (计算 row, col, rowspan, colspan)
|
|
|
# 这一步替代了原来的 _merge_cells_without_separator
|
|
# 这一步替代了原来的 _merge_cells_without_separator
|
|
|
merged_cells = self._recover_grid_structure(bboxes)
|
|
merged_cells = self._recover_grid_structure(bboxes)
|
|
|
|
|
|
|
|
# Step 3.5: 可视化逻辑结构 (新增)
|
|
# Step 3.5: 可视化逻辑结构 (新增)
|
|
|
- if debug_output_dir:
|
|
|
|
|
- self._visualize_grid_structure(
|
|
|
|
|
- table_image, merged_cells,
|
|
|
|
|
- output_path=f"{debug_output_dir}/grid_structure.png"
|
|
|
|
|
- )
|
|
|
|
|
|
|
+ if self._debug_is_on("save_grid_structure", dbg):
|
|
|
|
|
+ out_path = self._debug_path("grid_structure", dbg)
|
|
|
|
|
+ if out_path:
|
|
|
|
|
+ self._visualize_grid_structure(
|
|
|
|
|
+ table_image, merged_cells,
|
|
|
|
|
+ output_path=out_path
|
|
|
|
|
+ )
|
|
|
|
|
|
|
|
# Step 4: 统一计算文本填充
|
|
# Step 4: 统一计算文本填充
|
|
|
bboxes_merged = [cell["bbox"] for cell in merged_cells]
|
|
bboxes_merged = [cell["bbox"] for cell in merged_cells]
|
|
|
- texts = self._fill_text_by_center_point(bboxes_merged, ocr_boxes or [])
|
|
|
|
|
|
|
+ texts, scores = self._fill_text_by_center_point(bboxes_merged, ocr_boxes or [])
|
|
|
|
|
|
|
|
- # Step 4.5: 对空单元格尝试二次 OCR (新增)
|
|
|
|
|
- # 针对漏检问题(特别是竖排小字),进行切片放大识别
|
|
|
|
|
- if hasattr(self, 'ocr_engine') and self.ocr_engine and any(not t for t in texts):
|
|
|
|
|
- crop_list = []
|
|
|
|
|
- crop_indices = []
|
|
|
|
|
- h_img, w_img = table_image.shape[:2]
|
|
|
|
|
- margin = self.cell_crop_margin
|
|
|
|
|
-
|
|
|
|
|
- for i, text in enumerate(texts):
|
|
|
|
|
- if text.strip():
|
|
|
|
|
- continue
|
|
|
|
|
-
|
|
|
|
|
- bbox = bboxes_merged[i]
|
|
|
|
|
- x1, y1, x2, y2 = map(int, bbox)
|
|
|
|
|
-
|
|
|
|
|
- # 边界保护 + 少量外扩
|
|
|
|
|
- x1 = max(0, x1 - margin)
|
|
|
|
|
- y1 = max(0, y1 - margin)
|
|
|
|
|
- x2 = min(w_img, x2 + margin)
|
|
|
|
|
- y2 = min(h_img, y2 + margin)
|
|
|
|
|
-
|
|
|
|
|
- if x2 <= x1 or y2 <= y1:
|
|
|
|
|
- continue
|
|
|
|
|
-
|
|
|
|
|
- cell_img = table_image[y1:y2, x1:x2]
|
|
|
|
|
- if cell_img.size == 0:
|
|
|
|
|
- continue
|
|
|
|
|
-
|
|
|
|
|
- # --- 关键改进:放大与旋转 ---
|
|
|
|
|
- cell_h, cell_w = cell_img.shape[:2]
|
|
|
|
|
-
|
|
|
|
|
- # 1. 放大图像:对于表格中的小字,放大能显著提高识别率
|
|
|
|
|
- # 建议放大 2 倍,如果原图特别小可以更大
|
|
|
|
|
- scale = 2.0
|
|
|
|
|
- if cell_h < 64 or cell_w < 64: # 只有较小的图才放大,避免大图过大
|
|
|
|
|
- cell_img = cv2.resize(cell_img, None, fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC)
|
|
|
|
|
-
|
|
|
|
|
- # 2. 处理竖排文本:如果高宽比很大(>2),很可能是竖排表头(如"优先股")
|
|
|
|
|
- # 通用 OCR 模型通常只支持横排,旋转 90 度变成横排
|
|
|
|
|
- if cell_h > cell_w * 2:
|
|
|
|
|
- cell_img = cv2.rotate(cell_img, cv2.ROTATE_90_COUNTERCLOCKWISE)
|
|
|
|
|
- # -------------------------
|
|
|
|
|
-
|
|
|
|
|
- crop_list.append(cell_img)
|
|
|
|
|
- crop_indices.append(i)
|
|
|
|
|
-
|
|
|
|
|
- if crop_list:
|
|
|
|
|
- try:
|
|
|
|
|
- # 批量识别,det=False 表示直接识别内容(假设裁剪图就是文本行)
|
|
|
|
|
- ocr_res = self.ocr_engine.ocr(crop_list, det=False)
|
|
|
|
|
-
|
|
|
|
|
- # 解析结果 (兼容 PaddleOCR 返回格式)
|
|
|
|
|
- # ocr_res 结构通常为 [(text, score), (text, score), ...] 对应每张图
|
|
|
|
|
- # 但有时可能包裹在列表中,需做兼容处理
|
|
|
|
|
- results = ocr_res
|
|
|
|
|
- if isinstance(ocr_res, list) and len(ocr_res) == 1 and isinstance(ocr_res[0], list) and len(ocr_res[0]) == len(crop_list):
|
|
|
|
|
- # 兼容 legacy 代码中遇到的 [[(t,s), (t,s)...]] 情况
|
|
|
|
|
- results = ocr_res[0]
|
|
|
|
|
-
|
|
|
|
|
- if len(results) == len(crop_list):
|
|
|
|
|
- for idx, res in enumerate(results):
|
|
|
|
|
- # res 可能是 (text, score) 或 [(text, score)] 或 None
|
|
|
|
|
- if not res: continue
|
|
|
|
|
-
|
|
|
|
|
- text = ""
|
|
|
|
|
- score = 0.0
|
|
|
|
|
-
|
|
|
|
|
- if isinstance(res, tuple):
|
|
|
|
|
- text, score = res
|
|
|
|
|
- elif isinstance(res, list) and len(res) > 0:
|
|
|
|
|
- text, score = res[0]
|
|
|
|
|
-
|
|
|
|
|
- if score >= self.ocr_conf_threshold and text:
|
|
|
|
|
- texts[crop_indices[idx]] = text
|
|
|
|
|
-
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- logger.warning(f"二次OCR失败: {e}")
|
|
|
|
|
|
|
+ # Step 4.5: 二次 OCR 修正
|
|
|
|
|
+ # 针对漏检(空文本)、低置信度、竖排文本进行二次识别
|
|
|
|
|
+ if hasattr(self, 'ocr_engine') and self.ocr_engine:
|
|
|
|
|
+ texts = self._second_pass_ocr_fill(table_image, bboxes_merged, texts, scores)
|
|
|
|
|
|
|
|
- # 将文本填入 merged_cells
|
|
|
|
|
for i, cell in enumerate(merged_cells):
|
|
for i, cell in enumerate(merged_cells):
|
|
|
cell["text"] = texts[i] if i < len(texts) else ""
|
|
cell["text"] = texts[i] if i < len(texts) else ""
|
|
|
|
|
|
|
@@ -980,11 +1048,13 @@ class MinerUWiredTableRecognizer:
|
|
|
html_filled = self._build_html_from_merged_cells(merged_cells)
|
|
html_filled = self._build_html_from_merged_cells(merged_cells)
|
|
|
|
|
|
|
|
# Step 6: 可视化文本填充(调试用)
|
|
# Step 6: 可视化文本填充(调试用)
|
|
|
- if debug_output_dir:
|
|
|
|
|
- self._visualize_with_text(
|
|
|
|
|
- table_image, bboxes_merged, texts,
|
|
|
|
|
- output_path=f"{debug_output_dir}/text_filled_v4.png"
|
|
|
|
|
- )
|
|
|
|
|
|
|
+ if self._debug_is_on("save_text_overlay", dbg):
|
|
|
|
|
+ out_path = self._debug_path("text_filled_v4", dbg)
|
|
|
|
|
+ if out_path:
|
|
|
|
|
+ self._visualize_with_text(
|
|
|
|
|
+ table_image, bboxes_merged, texts,
|
|
|
|
|
+ output_path=out_path
|
|
|
|
|
+ )
|
|
|
|
|
|
|
|
# Step 7: 组装 cells 输出
|
|
# Step 7: 组装 cells 输出
|
|
|
cells = []
|
|
cells = []
|
|
@@ -999,7 +1069,7 @@ class MinerUWiredTableRecognizer:
|
|
|
"matched_text": cell["text"],
|
|
"matched_text": cell["text"],
|
|
|
"score": 100.0,
|
|
"score": 100.0,
|
|
|
})
|
|
})
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
return {
|
|
return {
|
|
|
"html": html_filled,
|
|
"html": html_filled,
|
|
|
"cells": cells,
|
|
"cells": cells,
|
|
@@ -1148,9 +1218,147 @@ class MinerUWiredTableRecognizer:
|
|
|
"""
|
|
"""
|
|
|
if self.use_custom_postprocess:
|
|
if self.use_custom_postprocess:
|
|
|
try:
|
|
try:
|
|
|
- return self.recognize_v4(table_image, ocr_boxes, debug_output_dir="./output")
|
|
|
|
|
|
|
+ return self.recognize_v4(table_image, ocr_boxes, debug_options=self.debug_options.__dict__)
|
|
|
except Exception:
|
|
except Exception:
|
|
|
# 回退
|
|
# 回退
|
|
|
return self.recognize_legacy(table_image, ocr_boxes)
|
|
return self.recognize_legacy(table_image, ocr_boxes)
|
|
|
else:
|
|
else:
|
|
|
- return self.recognize_legacy(table_image, ocr_boxes)
|
|
|
|
|
|
|
+ return self.recognize_legacy(table_image, ocr_boxes)
|
|
|
|
|
+
|
|
|
|
|
+ def _second_pass_ocr_fill(
|
|
|
|
|
+ self,
|
|
|
|
|
+ table_image: np.ndarray,
|
|
|
|
|
+ bboxes: List[List[float]],
|
|
|
|
|
+ texts: List[str],
|
|
|
|
|
+ scores: Optional[List[float]] = None,
|
|
|
|
|
+ ) -> List[str]:
|
|
|
|
|
+ """
|
|
|
|
|
+ 二次OCR统一封装:
|
|
|
|
|
+ - 对空文本单元格裁剪图块并少量外扩
|
|
|
|
|
+ - 对低置信度文本进行重识别
|
|
|
|
|
+ - 对竖排单元格(高宽比大)进行旋转后识别
|
|
|
|
|
+ """
|
|
|
|
|
+ try:
|
|
|
|
|
+ if not hasattr(self, "ocr_engine") or self.ocr_engine is None:
|
|
|
|
|
+ return texts
|
|
|
|
|
+
|
|
|
|
|
+ # 如果没有传入 scores,则默认全为 1.0(仅处理空文本)
|
|
|
|
|
+ if scores is None:
|
|
|
|
|
+ scores = [1.0 if t else 0.0 for t in texts]
|
|
|
|
|
+
|
|
|
|
|
+ h_img, w_img = table_image.shape[:2]
|
|
|
|
|
+ margin = getattr(self, "cell_crop_margin", 2)
|
|
|
|
|
+
|
|
|
|
|
+ # 触发二次OCR的阈值
|
|
|
|
|
+ trigger_score_thresh = 0.90
|
|
|
|
|
+
|
|
|
|
|
+ crop_list: List[np.ndarray] = []
|
|
|
|
|
+ crop_indices: List[int] = []
|
|
|
|
|
+
|
|
|
|
|
+ # 收集需要二次OCR的裁剪块
|
|
|
|
|
+ for i, t in enumerate(texts):
|
|
|
|
|
+ bbox = bboxes[i]
|
|
|
|
|
+ w_box = bbox[2] - bbox[0]
|
|
|
|
|
+ h_box = bbox[3] - bbox[1]
|
|
|
|
|
+
|
|
|
|
|
+ # 判断是否需要二次OCR
|
|
|
|
|
+ need_reocr = False
|
|
|
|
|
+
|
|
|
|
|
+ # 1. 文本为空
|
|
|
|
|
+ if not t or not t.strip():
|
|
|
|
|
+ need_reocr = True
|
|
|
|
|
+ # 2. 置信度过低
|
|
|
|
|
+ elif scores[i] < trigger_score_thresh:
|
|
|
|
|
+ need_reocr = True
|
|
|
|
|
+ # 3. 竖排单元格 (高宽比 > 2.5) 且置信度不是极高
|
|
|
|
|
+ # 竖排文本全图OCR容易出错,旋转后识别更准
|
|
|
|
|
+ elif h_box > w_box * 2.5 and scores[i] < 0.98:
|
|
|
|
|
+ need_reocr = True
|
|
|
|
|
+
|
|
|
|
|
+ if not need_reocr:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ if i >= len(bboxes):
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ x1, y1, x2, y2 = map(int, bboxes[i])
|
|
|
|
|
+ x1 = max(0, x1 - margin)
|
|
|
|
|
+ y1 = max(0, y1 - margin)
|
|
|
|
|
+ x2 = min(w_img, x2 + margin)
|
|
|
|
|
+ y2 = min(h_img, y2 + margin)
|
|
|
|
|
+ if x2 <= x1 or y2 <= y1:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ cell_img = table_image[y1:y2, x1:x2]
|
|
|
|
|
+ if cell_img.size == 0:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ ch, cw = cell_img.shape[:2]
|
|
|
|
|
+ # 小图放大
|
|
|
|
|
+ if ch < 64 or cw < 64:
|
|
|
|
|
+ cell_img = cv2.resize(cell_img, None, fx=2.0, fy=2.0, interpolation=cv2.INTER_CUBIC)
|
|
|
|
|
+ ch, cw = cell_img.shape[:2]
|
|
|
|
|
+
|
|
|
|
|
+ # 竖排文本旋转为横排
|
|
|
|
|
+ # 这里的阈值设为 2.0,涵盖大部分竖排表头
|
|
|
|
|
+ if ch > cw * 2.0:
|
|
|
|
|
+ cell_img = cv2.rotate(cell_img, cv2.ROTATE_90_COUNTERCLOCKWISE)
|
|
|
|
|
+
|
|
|
|
|
+ crop_list.append(cell_img)
|
|
|
|
|
+ crop_indices.append(i)
|
|
|
|
|
+
|
|
|
|
|
+ if not crop_list:
|
|
|
|
|
+ return texts
|
|
|
|
|
+
|
|
|
|
|
+ logger.info(f"触发二次OCR: {len(crop_list)} 个单元格 (总数 {len(texts)})")
|
|
|
|
|
+
|
|
|
|
|
+ # 批量OCR(det=False)
|
|
|
|
|
+ ocr_res = self.ocr_engine.ocr(crop_list, det=False)
|
|
|
|
|
+ results = ocr_res
|
|
|
|
|
+
|
|
|
|
|
+ # 兼容 [[(text,score),...]] 的嵌套返回
|
|
|
|
|
+ if isinstance(results, list) and len(results) == 1 and isinstance(results[0], list) and len(results[0]) == len(crop_list):
|
|
|
|
|
+ results = results[0]
|
|
|
|
|
+
|
|
|
|
|
+ # 解析为 (text, score)
|
|
|
|
|
+ def _parse_item(res_item) -> Tuple[str, float]:
|
|
|
|
|
+ if res_item is None:
|
|
|
|
|
+ return "", 0.0
|
|
|
|
|
+ # 直接 (text, score)
|
|
|
|
|
+ if isinstance(res_item, tuple) and len(res_item) >= 2:
|
|
|
|
|
+ return str(res_item[0] or ""), float(res_item[1] or 0.0)
|
|
|
|
|
+ # 列表形式,取第一个
|
|
|
|
|
+ if isinstance(res_item, list) and len(res_item) > 0:
|
|
|
|
|
+ first = res_item[0]
|
|
|
|
|
+ if isinstance(first, tuple) and len(first) >= 2:
|
|
|
|
|
+ return str(first[0] or ""), float(first[1] or 0.0)
|
|
|
|
|
+ if isinstance(first, list) and len(first) >= 2:
|
|
|
|
|
+ return str(first[0] or ""), float(first[1] or 0.0)
|
|
|
|
|
+ if isinstance(first, dict):
|
|
|
|
|
+ txt = str(first.get("text") or first.get("label") or "")
|
|
|
|
|
+ sc = float(first.get("score") or first.get("confidence") or 0.0)
|
|
|
|
|
+ return txt, sc
|
|
|
|
|
+ # 字典形式
|
|
|
|
|
+ if isinstance(res_item, dict):
|
|
|
|
|
+ txt = str(res_item.get("text") or res_item.get("label") or "")
|
|
|
|
|
+ sc = float(res_item.get("score") or res_item.get("confidence") or 0.0)
|
|
|
|
|
+ return txt, sc
|
|
|
|
|
+ return "", 0.0
|
|
|
|
|
+
|
|
|
|
|
+ # 对齐长度,避免越界
|
|
|
|
|
+ n = min(len(results) if isinstance(results, list) else 0, len(crop_list), len(crop_indices))
|
|
|
|
|
+ # 结果采纳阈值:二次识别的结果如果置信度太低,可能不如不填(或者保留原值?)
|
|
|
|
|
+ # 这里策略是:只要有结果且置信度尚可,就覆盖。
|
|
|
|
|
+ # 注意:如果原文本不为空但置信度低,二次识别结果置信度更低,是否覆盖?
|
|
|
|
|
+ # 目前逻辑是只要 > conf_th 就覆盖。
|
|
|
|
|
+ conf_th = float(getattr(self, "ocr_conf_threshold", 0.5))
|
|
|
|
|
+
|
|
|
|
|
+ for k in range(n):
|
|
|
|
|
+ text_k, score_k = _parse_item(results[k])
|
|
|
|
|
+ if text_k and score_k >= conf_th:
|
|
|
|
|
+ texts[crop_indices[k]] = text_k
|
|
|
|
|
+
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.warning(f"二次OCR失败: {e}")
|
|
|
|
|
+
|
|
|
|
|
+ return texts
|