|
@@ -5,6 +5,7 @@
|
|
|
"""
|
|
"""
|
|
|
from typing import List, Dict, Any, Tuple, Optional
|
|
from typing import List, Dict, Any, Tuple, Optional
|
|
|
import bisect
|
|
import bisect
|
|
|
|
|
+import json
|
|
|
import cv2
|
|
import cv2
|
|
|
import numpy as np
|
|
import numpy as np
|
|
|
import os
|
|
import os
|
|
@@ -12,6 +13,7 @@ import re
|
|
|
from loguru import logger
|
|
from loguru import logger
|
|
|
|
|
|
|
|
from ocr_utils.coordinate_utils import CoordinateUtils
|
|
from ocr_utils.coordinate_utils import CoordinateUtils
|
|
|
|
|
+from ocr_utils.watermark import WatermarkProcessor
|
|
|
|
|
|
|
|
|
|
|
|
|
class TextFiller:
|
|
class TextFiller:
|
|
@@ -50,6 +52,61 @@ class TextFiller:
|
|
|
self.second_pass_prefer_whole_on_tie: bool = bool(
|
|
self.second_pass_prefer_whole_on_tie: bool = bool(
|
|
|
sp_cfg.get("prefer_whole_on_tie", True)
|
|
sp_cfg.get("prefer_whole_on_tie", True)
|
|
|
)
|
|
)
|
|
|
|
|
+ self.second_pass_reocr_mode: str = str(sp_cfg.get("reocr_mode", "default"))
|
|
|
|
|
+ self.second_pass_header_row: int = int(sp_cfg.get("header_row", 0))
|
|
|
|
|
+ self.second_pass_strip_aspect: float = float(
|
|
|
|
|
+ sp_cfg.get("strip_fallback_aspect_ratio", 1.8)
|
|
|
|
|
+ )
|
|
|
|
|
+ self.second_pass_whole_longer_extra: int = int(
|
|
|
|
|
+ sp_cfg.get("whole_longer_min_extra_chars", 2)
|
|
|
|
|
+ )
|
|
|
|
|
+ self.second_pass_row_peer_min_nonempty: int = int(
|
|
|
|
|
+ sp_cfg.get("row_peer_min_nonempty", 5)
|
|
|
|
|
+ )
|
|
|
|
|
+ cpp = sp_cfg.get("cell_preprocess") or {}
|
|
|
|
|
+ if not isinstance(cpp, dict):
|
|
|
|
|
+ cpp = {}
|
|
|
|
|
+ light = cpp.get("light") or {}
|
|
|
|
|
+ if not isinstance(light, dict):
|
|
|
|
|
+ light = {}
|
|
|
|
|
+ self.second_pass_light_upscale_min: int = int(
|
|
|
|
|
+ light.get("upscale_min_side", 64)
|
|
|
|
|
+ )
|
|
|
|
|
+ er = cpp.get("enhance_retry") or {}
|
|
|
|
|
+ if not isinstance(er, dict):
|
|
|
|
|
+ er = {}
|
|
|
|
|
+ self.second_pass_enhance_retry_enabled: bool = bool(er.get("enabled", True))
|
|
|
|
|
+ self.second_pass_enhance_score_below: float = float(
|
|
|
|
|
+ er.get("score_below", 0.90)
|
|
|
|
|
+ )
|
|
|
|
|
+ self.second_pass_enhance_min_chars: int = int(er.get("min_chars", 4))
|
|
|
|
|
+ self.second_pass_enhance_short_tall: bool = bool(
|
|
|
|
|
+ er.get("short_text_in_tall_cell", True)
|
|
|
|
|
+ )
|
|
|
|
|
+ contrast = er.get("contrast") or {}
|
|
|
|
|
+ if not isinstance(contrast, dict):
|
|
|
|
|
+ contrast = {}
|
|
|
|
|
+ self.second_pass_enhance_contrast: Dict[str, Any] = dict(contrast)
|
|
|
|
|
+ sharpen = er.get("sharpen") or {}
|
|
|
|
|
+ if not isinstance(sharpen, dict):
|
|
|
|
|
+ sharpen = {}
|
|
|
|
|
+ self.second_pass_enhance_sharpen: Dict[str, Any] = dict(sharpen)
|
|
|
|
|
+
|
|
|
|
|
+ wm_user = cpp.get("watermark") or {}
|
|
|
|
|
+ if not isinstance(wm_user, dict):
|
|
|
|
|
+ wm_user = {}
|
|
|
|
|
+ self._cell_wm_processor = WatermarkProcessor.from_user_config(
|
|
|
|
|
+ wm_user, scope="cell"
|
|
|
|
|
+ )
|
|
|
|
|
+ denoise = cpp.get("denoise") or {}
|
|
|
|
|
+ if not isinstance(denoise, dict):
|
|
|
|
|
+ denoise = {}
|
|
|
|
|
+ self._cell_denoise_enabled: bool = bool(denoise.get("enabled", True))
|
|
|
|
|
+ self._cell_denoise_method: str = str(denoise.get("method", "median"))
|
|
|
|
|
+ cell_contrast = cpp.get("contrast") or {}
|
|
|
|
|
+ if not isinstance(cell_contrast, dict):
|
|
|
|
|
+ cell_contrast = {}
|
|
|
|
|
+ self._cell_contrast_cfg: Dict[str, Any] = dict(cell_contrast)
|
|
|
|
|
|
|
|
@staticmethod
|
|
@staticmethod
|
|
|
def sanitize_debug_filename(text: str, max_length: int = 50) -> str:
|
|
def sanitize_debug_filename(text: str, max_length: int = 50) -> str:
|
|
@@ -232,7 +289,7 @@ class TextFiller:
|
|
|
|
|
|
|
|
def _recognize_whole_cell(self, cell_img: np.ndarray) -> Tuple[str, float]:
|
|
def _recognize_whole_cell(self, cell_img: np.ndarray) -> Tuple[str, float]:
|
|
|
try:
|
|
try:
|
|
|
- rec_res = self.ocr_engine.ocr(cell_img, det=False, rec=True)
|
|
|
|
|
|
|
+ rec_res = self.ocr_engine.ocr(cell_img, det=True, rec=True)
|
|
|
items = self._extract_ocr_batch_results(rec_res)
|
|
items = self._extract_ocr_batch_results(rec_res)
|
|
|
if not items:
|
|
if not items:
|
|
|
return "", 0.0
|
|
return "", 0.0
|
|
@@ -241,19 +298,22 @@ class TextFiller:
|
|
|
logger.warning(f"整格 OCR 失败: {e}")
|
|
logger.warning(f"整格 OCR 失败: {e}")
|
|
|
return "", 0.0
|
|
return "", 0.0
|
|
|
|
|
|
|
|
- def _recognize_cell_lines(self, cell_img: np.ndarray) -> List[Tuple[str, float]]:
|
|
|
|
|
- """det 分行后逐行识别,检测框按阅读顺序(上行下、左到右)排序。"""
|
|
|
|
|
- blocks: List[Tuple[str, float]] = []
|
|
|
|
|
|
|
+ def _recognize_cell_lines_detailed(
|
|
|
|
|
+ self, cell_img: np.ndarray
|
|
|
|
|
+ ) -> List[Dict[str, Any]]:
|
|
|
|
|
+ """det 分行后逐行识别,返回含 det_bbox 的行列表。"""
|
|
|
|
|
+ lines: List[Dict[str, Any]] = []
|
|
|
try:
|
|
try:
|
|
|
det_res = self.ocr_engine.ocr(cell_img, det=True, rec=False)
|
|
det_res = self.ocr_engine.ocr(cell_img, det=True, rec=False)
|
|
|
dt_boxes = []
|
|
dt_boxes = []
|
|
|
if det_res and len(det_res) > 0:
|
|
if det_res and len(det_res) > 0:
|
|
|
dt_boxes = det_res[0] if det_res[0] else []
|
|
dt_boxes = det_res[0] if det_res[0] else []
|
|
|
if not dt_boxes:
|
|
if not dt_boxes:
|
|
|
- return blocks
|
|
|
|
|
|
|
+ return lines
|
|
|
h, w = cell_img.shape[:2]
|
|
h, w = cell_img.shape[:2]
|
|
|
sorted_boxes = self.sort_det_boxes_reading_order(dt_boxes, h, w)
|
|
sorted_boxes = self.sort_det_boxes_reading_order(dt_boxes, h, w)
|
|
|
rec_img_list: List[np.ndarray] = []
|
|
rec_img_list: List[np.ndarray] = []
|
|
|
|
|
+ det_bboxes: List[List[int]] = []
|
|
|
for box in sorted_boxes:
|
|
for box in sorted_boxes:
|
|
|
xyxy = self._det_box_to_xyxy(box, w, h)
|
|
xyxy = self._det_box_to_xyxy(box, w, h)
|
|
|
if xyxy is None:
|
|
if xyxy is None:
|
|
@@ -262,17 +322,146 @@ class TextFiller:
|
|
|
cropped = cell_img[y1:y2, x1:x2]
|
|
cropped = cell_img[y1:y2, x1:x2]
|
|
|
if cropped.size > 0:
|
|
if cropped.size > 0:
|
|
|
rec_img_list.append(cropped)
|
|
rec_img_list.append(cropped)
|
|
|
|
|
+ det_bboxes.append([x1, y1, x2, y2])
|
|
|
if not rec_img_list:
|
|
if not rec_img_list:
|
|
|
- return blocks
|
|
|
|
|
|
|
+ return lines
|
|
|
rec_res = self.ocr_engine.ocr(rec_img_list, det=False, rec=True)
|
|
rec_res = self.ocr_engine.ocr(rec_img_list, det=False, rec=True)
|
|
|
rec_items = self._extract_ocr_batch_results(rec_res)
|
|
rec_items = self._extract_ocr_batch_results(rec_res)
|
|
|
- for rec_item in rec_items:
|
|
|
|
|
|
|
+ for idx, rec_item in enumerate(rec_items):
|
|
|
text, score = self._parse_single_rec_item(rec_item)
|
|
text, score = self._parse_single_rec_item(rec_item)
|
|
|
if text:
|
|
if text:
|
|
|
- blocks.append((text, score))
|
|
|
|
|
|
|
+ lines.append(
|
|
|
|
|
+ {
|
|
|
|
|
+ "index": len(lines),
|
|
|
|
|
+ "text": text,
|
|
|
|
|
+ "score": score,
|
|
|
|
|
+ "det_bbox": det_bboxes[idx] if idx < len(det_bboxes) else [],
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
logger.warning(f"分行 OCR 失败: {e}")
|
|
logger.warning(f"分行 OCR 失败: {e}")
|
|
|
- return blocks
|
|
|
|
|
|
|
+ return lines
|
|
|
|
|
+
|
|
|
|
|
+ def _recognize_cell_lines(self, cell_img: np.ndarray) -> List[Tuple[str, float]]:
|
|
|
|
|
+ return [
|
|
|
|
|
+ (ln["text"], ln["score"])
|
|
|
|
|
+ for ln in self._recognize_cell_lines_detailed(cell_img)
|
|
|
|
|
+ ]
|
|
|
|
|
+
|
|
|
|
|
+ def _needs_strip_line_fallback(
|
|
|
|
|
+ self,
|
|
|
|
|
+ cell_img: np.ndarray,
|
|
|
|
|
+ line_blocks: List[Tuple[str, float]],
|
|
|
|
|
+ ) -> bool:
|
|
|
|
|
+ if cell_img is None or cell_img.size == 0:
|
|
|
|
|
+ return False
|
|
|
|
|
+ h, w = cell_img.shape[:2]
|
|
|
|
|
+ if w <= 0:
|
|
|
|
|
+ return False
|
|
|
|
|
+ if h / w < self.second_pass_strip_aspect:
|
|
|
|
|
+ return False
|
|
|
|
|
+ return len(line_blocks) <= 1
|
|
|
|
|
+
|
|
|
|
|
+ def _recognize_strip_fallback(
|
|
|
|
|
+ self, cell_img: np.ndarray, n_strips: int = 4
|
|
|
|
|
+ ) -> Tuple[str, float, List[Dict[str, Any]]]:
|
|
|
|
|
+ """竖长格水平条带扫描 det+rec。"""
|
|
|
|
|
+ h, w = cell_img.shape[:2]
|
|
|
|
|
+ if h < 8 or w < 4:
|
|
|
|
|
+ return "", 0.0, []
|
|
|
|
|
+ n_strips = max(2, int(n_strips))
|
|
|
|
|
+ strip_h = max(1, h // n_strips)
|
|
|
|
|
+ all_lines: List[Dict[str, Any]] = []
|
|
|
|
|
+ for si in range(n_strips):
|
|
|
|
|
+ y1 = si * strip_h
|
|
|
|
|
+ y2 = h if si == n_strips - 1 else (si + 1) * strip_h
|
|
|
|
|
+ strip = cell_img[y1:y2, :]
|
|
|
|
|
+ if strip.size == 0:
|
|
|
|
|
+ continue
|
|
|
|
|
+ for ln in self._recognize_cell_lines_detailed(strip):
|
|
|
|
|
+ bb = ln.get("det_bbox") or []
|
|
|
|
|
+ if len(bb) >= 4:
|
|
|
|
|
+ ln = dict(ln)
|
|
|
|
|
+ ln["det_bbox"] = [bb[0], bb[1] + y1, bb[2], bb[3] + y1]
|
|
|
|
|
+ all_lines.append(ln)
|
|
|
|
|
+ blocks = [(ln["text"], ln["score"]) for ln in all_lines]
|
|
|
|
|
+ text, score = self.aggregate_line_ocr(
|
|
|
|
|
+ blocks,
|
|
|
|
|
+ line_min_score=self.second_pass_line_min_score,
|
|
|
|
|
+ drop_low_score_blocks=self.second_pass_drop_low,
|
|
|
|
|
+ )
|
|
|
|
|
+ return text, score, all_lines
|
|
|
|
|
+
|
|
|
|
|
+ def _upscale_cell_if_small(
|
|
|
|
|
+ self, cell_img: np.ndarray, min_side: Optional[int] = None
|
|
|
|
|
+ ) -> np.ndarray:
|
|
|
|
|
+ min_side = min_side if min_side is not None else self.second_pass_light_upscale_min
|
|
|
|
|
+ ch, cw = cell_img.shape[:2]
|
|
|
|
|
+ if ch >= min_side and cw >= min_side:
|
|
|
|
|
+ return cell_img
|
|
|
|
|
+ scale = max(min_side / max(ch, 1), min_side / max(cw, 1), 1.0)
|
|
|
|
|
+ if scale <= 1.0:
|
|
|
|
|
+ return cell_img
|
|
|
|
|
+ return cv2.resize(
|
|
|
|
|
+ cell_img, None, fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ def _denoise_cell(self, cell_img: np.ndarray) -> np.ndarray:
|
|
|
|
|
+ if not self._cell_denoise_enabled:
|
|
|
|
|
+ return cell_img
|
|
|
|
|
+ method = self._cell_denoise_method
|
|
|
|
|
+ if method == "median":
|
|
|
|
|
+ k = 3
|
|
|
|
|
+ if cell_img.ndim == 2:
|
|
|
|
|
+ return cv2.medianBlur(cell_img, k)
|
|
|
|
|
+ return cv2.medianBlur(cell_img, k)
|
|
|
|
|
+ return cell_img
|
|
|
|
|
+
|
|
|
|
|
+ def _apply_cell_contrast(
|
|
|
|
|
+ self, cell_img: np.ndarray, contrast_cfg: Dict[str, Any]
|
|
|
|
|
+ ) -> np.ndarray:
|
|
|
|
|
+ from ocr_utils.watermark.contrast import apply_contrast_enhancement_config
|
|
|
|
|
+
|
|
|
|
|
+ if not contrast_cfg.get("enabled", False):
|
|
|
|
|
+ return cell_img
|
|
|
|
|
+ if len(cell_img.shape) == 3:
|
|
|
|
|
+ gray = cv2.cvtColor(cell_img, cv2.COLOR_BGR2GRAY)
|
|
|
|
|
+ else:
|
|
|
|
|
+ gray = cell_img
|
|
|
|
|
+ gray = apply_contrast_enhancement_config(gray, contrast_cfg)
|
|
|
|
|
+ if self.second_pass_enhance_sharpen.get("enabled", False):
|
|
|
|
|
+ amount = float(self.second_pass_enhance_sharpen.get("amount", 0.3))
|
|
|
|
|
+ blurred = cv2.GaussianBlur(gray, (0, 0), 1.0)
|
|
|
|
|
+ gray = cv2.addWeighted(gray, 1.0 + amount, blurred, -amount, 0)
|
|
|
|
|
+ if cell_img.ndim == 3:
|
|
|
|
|
+ return cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)
|
|
|
|
|
+ return gray
|
|
|
|
|
+
|
|
|
|
|
+ def _preprocess_cell_for_ocr(
|
|
|
|
|
+ self, cell_img: np.ndarray, mode: str = "light"
|
|
|
|
|
+ ) -> Tuple[np.ndarray, List[str]]:
|
|
|
|
|
+ stages: List[str] = []
|
|
|
|
|
+ img = cell_img
|
|
|
|
|
+
|
|
|
|
|
+ if self._cell_wm_processor.enabled:
|
|
|
|
|
+ img, wm_stages = self._cell_wm_processor.process(img, force=True)
|
|
|
|
|
+ stages.extend(wm_stages)
|
|
|
|
|
+
|
|
|
|
|
+ if self._cell_denoise_enabled and "wm" in stages:
|
|
|
|
|
+ img = self._denoise_cell(img)
|
|
|
|
|
+ stages.append("denoise")
|
|
|
|
|
+
|
|
|
|
|
+ if mode == "enhance":
|
|
|
|
|
+ contrast_cfg = self.second_pass_enhance_contrast
|
|
|
|
|
+ if self._cell_contrast_cfg.get("enabled", False):
|
|
|
|
|
+ contrast_cfg = self._cell_contrast_cfg
|
|
|
|
|
+ if contrast_cfg.get("enabled", False) and "wm" in stages:
|
|
|
|
|
+ img = self._apply_cell_contrast(img, contrast_cfg)
|
|
|
|
|
+ stages.append("contrast")
|
|
|
|
|
+
|
|
|
|
|
+ img = self._upscale_cell_if_small(img)
|
|
|
|
|
+ stages.append("upscale")
|
|
|
|
|
+ return img, stages
|
|
|
|
|
|
|
|
def _pick_line_vs_whole(
|
|
def _pick_line_vs_whole(
|
|
|
self,
|
|
self,
|
|
@@ -280,23 +469,339 @@ class TextFiller:
|
|
|
line_score: float,
|
|
line_score: float,
|
|
|
whole_text: str,
|
|
whole_text: str,
|
|
|
whole_score: float,
|
|
whole_score: float,
|
|
|
|
|
+ strip_text: str = "",
|
|
|
|
|
+ strip_score: float = 0.0,
|
|
|
) -> Tuple[str, float, str]:
|
|
) -> Tuple[str, float, str]:
|
|
|
- """返回 (text, score, strategy) strategy in lines|whole|tie_whole|tie_lines."""
|
|
|
|
|
- if not self.second_pass_whole_fallback:
|
|
|
|
|
- return line_text, line_score, "lines"
|
|
|
|
|
- if not whole_text and line_text:
|
|
|
|
|
- return line_text, line_score, "lines"
|
|
|
|
|
- if whole_text and not line_text:
|
|
|
|
|
- return whole_text, whole_score, "whole"
|
|
|
|
|
- if not whole_text and not line_text:
|
|
|
|
|
|
|
+ """返回 (text, score, strategy)。"""
|
|
|
|
|
+ candidates: List[Tuple[str, float, str]] = []
|
|
|
|
|
+ if line_text:
|
|
|
|
|
+ candidates.append((line_text, line_score, "lines"))
|
|
|
|
|
+ if whole_text and self.second_pass_whole_fallback:
|
|
|
|
|
+ candidates.append((whole_text, whole_score, "whole"))
|
|
|
|
|
+ if strip_text:
|
|
|
|
|
+ candidates.append((strip_text, strip_score, "strip"))
|
|
|
|
|
+
|
|
|
|
|
+ if not candidates:
|
|
|
return "", 0.0, "empty"
|
|
return "", 0.0, "empty"
|
|
|
- if line_score > whole_score:
|
|
|
|
|
- return line_text, line_score, "lines"
|
|
|
|
|
- if line_score < whole_score:
|
|
|
|
|
- return whole_text, whole_score, "whole"
|
|
|
|
|
- if self.second_pass_prefer_whole_on_tie and whole_text:
|
|
|
|
|
- return whole_text, whole_score, "tie_whole"
|
|
|
|
|
- return line_text, line_score, "tie_lines"
|
|
|
|
|
|
|
+
|
|
|
|
|
+ if (
|
|
|
|
|
+ whole_text
|
|
|
|
|
+ and line_text
|
|
|
|
|
+ and line_score > whole_score
|
|
|
|
|
+ and len(whole_text) >= len(line_text) + self.second_pass_whole_longer_extra
|
|
|
|
|
+ and len(whole_text) > len(line_text)
|
|
|
|
|
+ ):
|
|
|
|
|
+ return whole_text, whole_score, "whole_longer"
|
|
|
|
|
+
|
|
|
|
|
+ if (
|
|
|
|
|
+ strip_text
|
|
|
|
|
+ and line_text
|
|
|
|
|
+ and line_score > strip_score
|
|
|
|
|
+ and len(strip_text) >= len(line_text) + self.second_pass_whole_longer_extra
|
|
|
|
|
+ and len(strip_text) > len(line_text)
|
|
|
|
|
+ ):
|
|
|
|
|
+ return strip_text, strip_score, "strip_longer"
|
|
|
|
|
+
|
|
|
|
|
+ best = max(candidates, key=lambda c: (c[1], len(c[0])))
|
|
|
|
|
+ if len(candidates) > 1:
|
|
|
|
|
+ top_score = best[1]
|
|
|
|
|
+ tied = [c for c in candidates if abs(c[1] - top_score) < 1e-6]
|
|
|
|
|
+ if len(tied) > 1 and self.second_pass_prefer_whole_on_tie:
|
|
|
|
|
+ for pref in ("whole", "strip", "lines"):
|
|
|
|
|
+ for c in tied:
|
|
|
|
|
+ if c[2] == pref or c[2].endswith(pref):
|
|
|
|
|
+ if pref == "whole" and c[2] == "whole":
|
|
|
|
|
+ return c[0], c[1], "tie_whole"
|
|
|
|
|
+ if pref == "strip" and "strip" in c[2]:
|
|
|
|
|
+ return c[0], c[1], "tie_strip"
|
|
|
|
|
+ return best[0], best[1], "tie_lines"
|
|
|
|
|
+ return best[0], best[1], best[2]
|
|
|
|
|
+
|
|
|
|
|
+ @staticmethod
|
|
|
|
|
+ def _pick_better_ocr_result(
|
|
|
|
|
+ pass1: Dict[str, Any], pass2: Dict[str, Any]
|
|
|
|
|
+ ) -> Dict[str, Any]:
|
|
|
|
|
+ """Pass2 增强重试后择优;拒绝异常分数或覆盖已接受的高分短文本。"""
|
|
|
|
|
+ t1 = (pass1.get("final_text") or "").strip()
|
|
|
|
|
+ t2 = (pass2.get("final_text") or "").strip()
|
|
|
|
|
+ s1 = float(pass1.get("final_score") or 0.0)
|
|
|
|
|
+ s2 = float(pass2.get("final_score") or 0.0)
|
|
|
|
|
+ if not t2:
|
|
|
|
|
+ return pass1
|
|
|
|
|
+ if not t1:
|
|
|
|
|
+ return pass2 if 0.0 <= s2 <= 1.0 else pass1
|
|
|
|
|
+ if s2 > 1.0 or s2 < 0.0:
|
|
|
|
|
+ return pass1
|
|
|
|
|
+ if pass1.get("accepted") and not pass2.get("accepted"):
|
|
|
|
|
+ return pass1
|
|
|
|
|
+ if s1 >= 0.95 and len(t2) > len(t1) + 2 and s2 < 0.5:
|
|
|
|
|
+ return pass1
|
|
|
|
|
+ if len(t2) > len(t1) + 1 and s1 >= 0.9 and s2 <= s1:
|
|
|
|
|
+ return pass1
|
|
|
|
|
+ if len(t2) > len(t1) + 1:
|
|
|
|
|
+ return pass2
|
|
|
|
|
+ if len(t1) > len(t2) + 1:
|
|
|
|
|
+ return pass1
|
|
|
|
|
+ if s2 > s1 + 0.02:
|
|
|
|
|
+ return pass2
|
|
|
|
|
+ if s1 > s2 + 0.02:
|
|
|
|
|
+ return pass1
|
|
|
|
|
+ return pass2 if len(t2) >= len(t1) else pass1
|
|
|
|
|
+
|
|
|
|
|
+ def _should_run_whole_fallback(
|
|
|
|
|
+ self,
|
|
|
|
|
+ line_text: str,
|
|
|
|
|
+ line_score: float,
|
|
|
|
|
+ cell_img: np.ndarray,
|
|
|
|
|
+ line_blocks: List[Tuple[str, float]],
|
|
|
|
|
+ base_conf_th: float,
|
|
|
|
|
+ ) -> bool:
|
|
|
|
|
+ if not self.second_pass_whole_fallback:
|
|
|
|
|
+ return False
|
|
|
|
|
+ if not line_text:
|
|
|
|
|
+ return True
|
|
|
|
|
+ if line_score < base_conf_th:
|
|
|
|
|
+ return True
|
|
|
|
|
+ if self._needs_strip_line_fallback(cell_img, line_blocks):
|
|
|
|
|
+ return True
|
|
|
|
|
+ if (
|
|
|
|
|
+ line_text
|
|
|
|
|
+ and line_score >= base_conf_th
|
|
|
|
|
+ and len(line_text) < self.second_pass_enhance_min_chars
|
|
|
|
|
+ ):
|
|
|
|
|
+ return True
|
|
|
|
|
+ return False
|
|
|
|
|
+
|
|
|
|
|
+ def _needs_enhance_retry(
|
|
|
|
|
+ self,
|
|
|
|
|
+ result: Dict[str, Any],
|
|
|
|
|
+ cell_img: np.ndarray,
|
|
|
|
|
+ dynamic_conf_th: float,
|
|
|
|
|
+ ) -> Tuple[bool, List[str]]:
|
|
|
|
|
+ if not self.second_pass_enhance_retry_enabled:
|
|
|
|
|
+ return False, []
|
|
|
|
|
+ reasons: List[str] = []
|
|
|
|
|
+ text = (result.get("final_text") or "").strip()
|
|
|
|
|
+ score = float(result.get("final_score") or 0.0)
|
|
|
|
|
+ if not result.get("accepted", False):
|
|
|
|
|
+ reasons.append("not_accepted")
|
|
|
|
|
+ if score < self.second_pass_enhance_score_below:
|
|
|
|
|
+ reasons.append("score_below_threshold")
|
|
|
|
|
+ if text and len(text) < self.second_pass_enhance_min_chars:
|
|
|
|
|
+ reasons.append("suspicious_short_text")
|
|
|
|
|
+ h, w = cell_img.shape[:2]
|
|
|
|
|
+ if (
|
|
|
|
|
+ self.second_pass_enhance_short_tall
|
|
|
|
|
+ and w > 0
|
|
|
|
|
+ and h / w >= self.second_pass_strip_aspect
|
|
|
|
|
+ and len(result.get("lines") or []) <= 1
|
|
|
|
|
+ and len(text) < self.second_pass_enhance_min_chars + 2
|
|
|
|
|
+ ):
|
|
|
|
|
+ reasons.append("tall_cell_single_line")
|
|
|
|
|
+ return bool(reasons), reasons
|
|
|
|
|
+
|
|
|
|
|
+ def _ocr_one_cell(
|
|
|
|
|
+ self, cell_img: np.ndarray, base_conf_th: float
|
|
|
|
|
+ ) -> Dict[str, Any]:
|
|
|
|
|
+ line_entries = self._recognize_cell_lines_detailed(cell_img)
|
|
|
|
|
+ line_blocks = [(ln["text"], ln["score"]) for ln in line_entries]
|
|
|
|
|
+ line_text, line_score = self.aggregate_line_ocr(
|
|
|
|
|
+ line_blocks,
|
|
|
|
|
+ line_min_score=self.second_pass_line_min_score,
|
|
|
|
|
+ drop_low_score_blocks=self.second_pass_drop_low,
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ whole_text, whole_score = "", 0.0
|
|
|
|
|
+ whole_skipped = "line_score_ok"
|
|
|
|
|
+ run_whole = self._should_run_whole_fallback(
|
|
|
|
|
+ line_text, line_score, cell_img, line_blocks, base_conf_th
|
|
|
|
|
+ )
|
|
|
|
|
+ if run_whole:
|
|
|
|
|
+ whole_text, whole_score = self._recognize_whole_cell(cell_img)
|
|
|
|
|
+ whole_skipped = None
|
|
|
|
|
+ elif line_text and line_score >= base_conf_th:
|
|
|
|
|
+ if len(line_text) < self.second_pass_enhance_min_chars:
|
|
|
|
|
+ whole_skipped = "short_text_high_score"
|
|
|
|
|
+ else:
|
|
|
|
|
+ whole_skipped = "line_score>=%.2f" % base_conf_th
|
|
|
|
|
+ else:
|
|
|
|
|
+ whole_skipped = "line_score>=%.2f" % base_conf_th
|
|
|
|
|
+
|
|
|
|
|
+ strip_text, strip_score, strip_lines = "", 0.0, []
|
|
|
|
|
+ if self._needs_strip_line_fallback(cell_img, line_blocks):
|
|
|
|
|
+ strip_text, strip_score, strip_lines = self._recognize_strip_fallback(
|
|
|
|
|
+ cell_img
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ final_text, final_score, strategy = self._pick_line_vs_whole(
|
|
|
|
|
+ line_text,
|
|
|
|
|
+ line_score,
|
|
|
|
|
+ whole_text,
|
|
|
|
|
+ whole_score,
|
|
|
|
|
+ strip_text,
|
|
|
|
|
+ strip_score,
|
|
|
|
|
+ )
|
|
|
|
|
+ dynamic_conf_th = self.calculate_dynamic_confidence_threshold(
|
|
|
|
|
+ final_text, base_conf_th
|
|
|
|
|
+ )
|
|
|
|
|
+ accepted = bool(final_text) and final_score >= dynamic_conf_th
|
|
|
|
|
+ return {
|
|
|
|
|
+ "lines": line_entries,
|
|
|
|
|
+ "line_aggregate": {"text": line_text, "score": line_score},
|
|
|
|
|
+ "whole": {
|
|
|
|
|
+ "text": whole_text,
|
|
|
|
|
+ "score": whole_score,
|
|
|
|
|
+ "skipped": whole_skipped,
|
|
|
|
|
+ },
|
|
|
|
|
+ "strip": {
|
|
|
|
|
+ "text": strip_text,
|
|
|
|
|
+ "score": strip_score,
|
|
|
|
|
+ "lines": strip_lines,
|
|
|
|
|
+ },
|
|
|
|
|
+ "final_text": final_text,
|
|
|
|
|
+ "final_score": final_score,
|
|
|
|
|
+ "strategy": strategy,
|
|
|
|
|
+ "dynamic_conf_threshold": dynamic_conf_th,
|
|
|
|
|
+ "accepted": accepted,
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ def _infer_header_row(
|
|
|
|
|
+ self,
|
|
|
|
|
+ merged_cells: Optional[List[Dict[str, Any]]],
|
|
|
|
|
+ texts: List[str],
|
|
|
|
|
+ scores: List[float],
|
|
|
|
|
+ ) -> int:
|
|
|
|
|
+ if self.second_pass_header_row >= 0:
|
|
|
|
|
+ return self.second_pass_header_row
|
|
|
|
|
+ if not merged_cells:
|
|
|
|
|
+ return 0
|
|
|
|
|
+ row_scores: Dict[int, List[float]] = {}
|
|
|
|
|
+ for i, cell in enumerate(merged_cells):
|
|
|
|
|
+ row = int(cell.get("row", 0))
|
|
|
|
|
+ t = (texts[i] if i < len(texts) else "").strip()
|
|
|
|
|
+ sc = float(scores[i] if i < len(scores) else 0.0)
|
|
|
|
|
+ if t:
|
|
|
|
|
+ row_scores.setdefault(row, []).append(sc)
|
|
|
|
|
+ if not row_scores:
|
|
|
|
|
+ return 0
|
|
|
|
|
+ best_row = 0
|
|
|
|
|
+ best_avg = -1.0
|
|
|
|
|
+ for row, scs in row_scores.items():
|
|
|
|
|
+ avg = sum(scs) / len(scs)
|
|
|
|
|
+ if avg > best_avg:
|
|
|
|
|
+ best_avg = avg
|
|
|
|
|
+ best_row = row
|
|
|
|
|
+ return best_row
|
|
|
|
|
+
|
|
|
|
|
+ def _should_second_pass_cell(
|
|
|
|
|
+ self,
|
|
|
|
|
+ i: int,
|
|
|
|
|
+ texts: List[str],
|
|
|
|
|
+ scores: List[float],
|
|
|
|
|
+ need_reocr_indices: List[int],
|
|
|
|
|
+ merged_cells: Optional[List[Dict[str, Any]]],
|
|
|
|
|
+ pdf_type: str,
|
|
|
|
|
+ force_all: bool,
|
|
|
|
|
+ header_row: int,
|
|
|
|
|
+ ) -> Tuple[bool, List[str]]:
|
|
|
|
|
+ reasons: List[str] = []
|
|
|
|
|
+ t = texts[i] if i < len(texts) else ""
|
|
|
|
|
+ sc = float(scores[i] if i < len(scores) else 0.0)
|
|
|
|
|
+ bbox_row = None
|
|
|
|
|
+ if merged_cells and i < len(merged_cells):
|
|
|
|
|
+ bbox_row = int(merged_cells[i].get("row", 0))
|
|
|
|
|
+
|
|
|
|
|
+ if force_all:
|
|
|
|
|
+ return True, ["force_all"]
|
|
|
|
|
+ if i in need_reocr_indices:
|
|
|
|
|
+ reasons.append("spanning_or_cross_cell")
|
|
|
|
|
+ if sc < 0.90:
|
|
|
|
|
+ reasons.append("low_first_pass_score")
|
|
|
|
|
+ if merged_cells and i < len(merged_cells):
|
|
|
|
|
+ bb = merged_cells[i].get("bbox") or []
|
|
|
|
|
+ if len(bb) >= 4:
|
|
|
|
|
+ w_box = bb[2] - bb[0]
|
|
|
|
|
+ h_box = bb[3] - bb[1]
|
|
|
|
|
+ if h_box > w_box * 2.5 and sc < 0.95:
|
|
|
|
|
+ reasons.append("tall_cell_low_score")
|
|
|
|
|
+
|
|
|
|
|
+ if self.second_pass_reocr_mode == "bank_statement" and merged_cells:
|
|
|
|
|
+ if bbox_row is not None and bbox_row > header_row and not (t or "").strip():
|
|
|
|
|
+ if "body_row_empty" not in reasons:
|
|
|
|
|
+ reasons.append("body_row_empty")
|
|
|
|
|
+ if bbox_row is not None and bbox_row > header_row:
|
|
|
|
|
+ same_row_nonempty = 0
|
|
|
|
|
+ for j, other in enumerate(merged_cells):
|
|
|
|
|
+ if int(other.get("row", -1)) != bbox_row:
|
|
|
|
|
+ continue
|
|
|
|
|
+ ot = (texts[j] if j < len(texts) else "").strip()
|
|
|
|
|
+ if ot:
|
|
|
|
|
+ same_row_nonempty += 1
|
|
|
|
|
+ if (
|
|
|
|
|
+ not (t or "").strip()
|
|
|
|
|
+ and same_row_nonempty >= self.second_pass_row_peer_min_nonempty
|
|
|
|
|
+ and "row_peer_nonempty" not in reasons
|
|
|
|
|
+ ):
|
|
|
|
|
+ reasons.append("row_peer_nonempty")
|
|
|
|
|
+
|
|
|
|
|
+ if not reasons:
|
|
|
|
|
+ if (not t or not t.strip()) and sc < 0.95 and pdf_type != "txt":
|
|
|
|
|
+ reasons.append("empty_low_score")
|
|
|
|
|
+
|
|
|
|
|
+ return bool(reasons), reasons
|
|
|
|
|
+
|
|
|
|
|
def _save_cell_ocr_debug(
    self,
    cell_ocr_dir: str,
    cell_idx: int,
    debug_img: np.ndarray,
    result: Dict[str, Any],
    *,
    first_pass_text: str = "",
    first_pass_score: float = 0.0,
    trigger_reasons: Optional[List[str]] = None,
    bbox: Optional[List[float]] = None,
    pass_label: str = "",
) -> None:
    """Write the debug image and a JSON report for one re-OCRed cell.

    Both files share the stem
    ``cell{idx:03d}[_{pass_label}]_{strategy}_{tag}`` inside
    ``cell_ocr_dir``; ``tag`` is the sanitised final text (``"empty"`` when
    there is none). Failures are logged as warnings and never raised.

    Args:
        cell_ocr_dir: Target directory (not created here).
        cell_idx: Cell index, used in the file name and the report.
        debug_img: Image written as PNG.
        result: Cell OCR result dict; missing entries are reported as
            ``None`` or an empty default.
        first_pass_text: Text from the first pass, for the report.
        first_pass_score: Score from the first pass, for the report.
        trigger_reasons: Why the cell was re-OCRed.
        bbox: Cell bounding box, for the report.
        pass_label: Optional label inserted into the file name.
    """
    tag = self.sanitize_debug_filename(result.get("final_text") or "empty")
    strategy = result.get("strategy") or "empty"
    stem = f"cell{cell_idx:03d}"
    if pass_label:
        stem += f"_{pass_label}"
    stem += f"_{strategy}_{tag}"

    png_path = os.path.join(cell_ocr_dir, f"{stem}.png")
    try:
        written = cv2.imwrite(png_path, debug_img)
    except Exception as e:
        logger.warning(f"保存单元格OCR图片失败 (cell {cell_idx}): {e}")
        return
    if not written:
        # imwrite signals failure through its return value instead of
        # raising; report it but still try to keep the JSON report.
        logger.warning(
            f"保存单元格OCR图片失败 (cell {cell_idx}): imwrite returned False ({png_path})"
        )

    payload = {
        "cell_idx": cell_idx,
        "bbox": bbox,
        "first_pass": {"text": first_pass_text, "score": first_pass_score},
        "trigger_reason": trigger_reasons or [],
        "lines": result.get("lines") or [],
        "line_aggregate": result.get("line_aggregate"),
        "whole": result.get("whole"),
        "strip": result.get("strip"),
        "final": {
            "text": result.get("final_text") or "",
            "score": result.get("final_score") or 0.0,
            "strategy": strategy,
            "accepted": result.get("accepted", False),
        },
        "dynamic_conf_threshold": result.get("dynamic_conf_threshold"),
        "pass1": result.get("pass1"),
        "pass2": result.get("pass2"),
        "enhance_retry": result.get("enhance_retry"),
        "preprocess_stages": result.get("preprocess_stages") or [],
    }

    def _to_jsonable(obj: Any) -> Any:
        # Scores and boxes may arrive as NumPy scalars/arrays, which the
        # json module cannot encode on its own.
        if isinstance(obj, np.generic):
            return obj.item()
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        raise TypeError(
            f"Object of type {type(obj).__name__} is not JSON serializable"
        )

    json_path = os.path.join(cell_ocr_dir, f"{stem}.json")
    try:
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(
                payload, f, ensure_ascii=False, indent=2, default=_to_jsonable
            )
    except Exception as e:
        logger.warning(f"保存单元格OCR JSON失败 (cell {cell_idx}): {e}")
|
|
|
|
|
|
|
|
@staticmethod
|
|
@staticmethod
|
|
|
def calculate_dynamic_confidence_threshold(text: str, base_threshold: float = 0.9) -> float:
|
|
def calculate_dynamic_confidence_threshold(text: str, base_threshold: float = 0.9) -> float:
|
|
@@ -683,34 +1188,6 @@ class TextFiller:
|
|
|
|
|
|
|
|
processed_ocr_indices.add(ocr_idx)
|
|
processed_ocr_indices.add(ocr_idx)
|
|
|
|
|
|
|
|
- # 已匹配到单元格但 OCR box 宽度明显超出单元格(漏检跨格的补充)
|
|
|
|
|
- # for cell_idx, cell_bbox in enumerate(bboxes):
|
|
|
|
|
- # if not matched_boxes_list[cell_idx]:
|
|
|
|
|
- # continue
|
|
|
|
|
- # cell_w = cell_bbox[2] - cell_bbox[0]
|
|
|
|
|
- # if cell_w <= 0:
|
|
|
|
|
- # continue
|
|
|
|
|
- # for box in matched_boxes_list[cell_idx]:
|
|
|
|
|
- # ocr_bbox = CoordinateUtils.poly_to_bbox(box.get("bbox", []))
|
|
|
|
|
- # if not ocr_bbox or len(ocr_bbox) < 4:
|
|
|
|
|
- # continue
|
|
|
|
|
- # ocr_w = ocr_bbox[2] - ocr_bbox[0]
|
|
|
|
|
- # if ocr_w <= cell_w * self.ocr_bbox_width_overflow_ratio:
|
|
|
|
|
- # continue
|
|
|
|
|
- # cx = (ocr_bbox[0] + ocr_bbox[2]) / 2
|
|
|
|
|
- # cy = (ocr_bbox[1] + ocr_bbox[3]) / 2
|
|
|
|
|
- # spanning = self.detect_ocr_box_spanning_cells(
|
|
|
|
|
- # ocr_bbox, bboxes, center_point=(cx, cy)
|
|
|
|
|
- # )
|
|
|
|
|
- # targets = spanning if len(spanning) >= 2 else [cell_idx]
|
|
|
|
|
- # for tidx in targets:
|
|
|
|
|
- # if tidx not in need_reocr_indices:
|
|
|
|
|
- # need_reocr_indices.append(tidx)
|
|
|
|
|
- # logger.debug(
|
|
|
|
|
- # f"OCR box 宽度({ocr_w:.0f})超出单元格{cell_idx}宽度({cell_w:.0f}),"
|
|
|
|
|
- # f"标记重识别: {targets}"
|
|
|
|
|
- # )
|
|
|
|
|
-
|
|
|
|
|
return texts, scores, matched_boxes_list, need_reocr_indices
|
|
return texts, scores, matched_boxes_list, need_reocr_indices
|
|
|
|
|
|
|
|
@staticmethod
|
|
@staticmethod
|
|
@@ -864,10 +1341,11 @@ class TextFiller:
|
|
|
force_all: bool = False,
|
|
force_all: bool = False,
|
|
|
output_dir: Optional[str] = None,
|
|
output_dir: Optional[str] = None,
|
|
|
debug_prefix: Optional[str] = None,
|
|
debug_prefix: Optional[str] = None,
|
|
|
|
|
+ merged_cells: Optional[List[Dict[str, Any]]] = None,
|
|
|
) -> List[str]:
|
|
) -> List[str]:
|
|
|
"""
|
|
"""
|
|
|
- 二次OCR:分行 det+rec(低分块丢弃、长度加权置信度)+ 整格 det=False 兜底择优。
|
|
|
|
|
- debug 图落盘至 output_dir/{debug_prefix}/cell{idx}_{text}.png
|
|
|
|
|
|
|
+ 二次OCR:分行 det+rec + 整格/条带兜底 + 低分笔画增强重试。
|
|
|
|
|
+ debug: output_dir/{debug_prefix}/cell{idx}_{strategy}_{tag}.png + 同名 .json
|
|
|
"""
|
|
"""
|
|
|
try:
|
|
try:
|
|
|
if not self.ocr_engine:
|
|
if not self.ocr_engine:
|
|
@@ -888,28 +1366,21 @@ class TextFiller:
|
|
|
|
|
|
|
|
h_img, w_img = table_image.shape[:2]
|
|
h_img, w_img = table_image.shape[:2]
|
|
|
margin = self.cell_crop_margin
|
|
margin = self.cell_crop_margin
|
|
|
- trigger_score_thresh = 0.90
|
|
|
|
|
-
|
|
|
|
|
- crop_list: List[np.ndarray] = []
|
|
|
|
|
- crop_indices: List[int] = []
|
|
|
|
|
-
|
|
|
|
|
- for i, t in enumerate(texts):
|
|
|
|
|
- bbox = bboxes[i]
|
|
|
|
|
- w_box = bbox[2] - bbox[0]
|
|
|
|
|
- h_box = bbox[3] - bbox[1]
|
|
|
|
|
-
|
|
|
|
|
- need_reocr = False
|
|
|
|
|
- if force_all:
|
|
|
|
|
- need_reocr = True
|
|
|
|
|
- elif i in need_reocr_indices:
|
|
|
|
|
- need_reocr = True
|
|
|
|
|
- elif (not t or not t.strip()) and scores[i] < 0.95:
|
|
|
|
|
- need_reocr = pdf_type != 'txt'
|
|
|
|
|
- elif scores[i] < trigger_score_thresh:
|
|
|
|
|
- need_reocr = True
|
|
|
|
|
- elif h_box > w_box * 2.5 and scores[i] < 0.95:
|
|
|
|
|
- need_reocr = True
|
|
|
|
|
|
|
+ header_row = self._infer_header_row(merged_cells, texts, scores)
|
|
|
|
|
+
|
|
|
|
|
+ jobs: List[Tuple[int, np.ndarray, List[str], List[float]]] = []
|
|
|
|
|
|
|
|
|
|
+ for i, _t in enumerate(texts):
|
|
|
|
|
+ need_reocr, trigger_reasons = self._should_second_pass_cell(
|
|
|
|
|
+ i,
|
|
|
|
|
+ texts,
|
|
|
|
|
+ scores,
|
|
|
|
|
+ need_reocr_indices,
|
|
|
|
|
+ merged_cells,
|
|
|
|
|
+ pdf_type,
|
|
|
|
|
+ force_all,
|
|
|
|
|
+ header_row,
|
|
|
|
|
+ )
|
|
|
if not need_reocr or i >= len(bboxes):
|
|
if not need_reocr or i >= len(bboxes):
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
@@ -921,61 +1392,92 @@ class TextFiller:
|
|
|
if x2 <= x1 or y2 <= y1:
|
|
if x2 <= x1 or y2 <= y1:
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
|
- cell_img = table_image[y1:y2, x1:x2]
|
|
|
|
|
- if cell_img.size == 0:
|
|
|
|
|
|
|
+ raw_crop = table_image[y1:y2, x1:x2]
|
|
|
|
|
+ if raw_crop.size == 0:
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
|
- ch, cw = cell_img.shape[:2]
|
|
|
|
|
- if ch < 64 or cw < 64:
|
|
|
|
|
- cell_img = cv2.resize(
|
|
|
|
|
- cell_img, None, fx=2.0, fy=2.0, interpolation=cv2.INTER_CUBIC
|
|
|
|
|
- )
|
|
|
|
|
- logger.debug(
|
|
|
|
|
- f"单元格 {i} 裁剪过小,放大至 {cell_img.shape[1]}x{cell_img.shape[0]} 像素"
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- crop_list.append(cell_img)
|
|
|
|
|
- crop_indices.append(i)
|
|
|
|
|
|
|
+ fp_text = texts[i] if i < len(texts) else ""
|
|
|
|
|
+ fp_score = float(scores[i] if i < len(scores) else 0.0)
|
|
|
|
|
+ jobs.append((i, raw_crop, trigger_reasons, [fp_text, fp_score, bboxes[i]]))
|
|
|
|
|
|
|
|
- if not crop_list:
|
|
|
|
|
|
|
+ if not jobs:
|
|
|
return texts
|
|
return texts
|
|
|
|
|
|
|
|
- logger.info(f"触发二次OCR: {len(crop_list)} 个单元格 (总数 {len(texts)})")
|
|
|
|
|
|
|
+ logger.info(
|
|
|
|
|
+ f"触发二次OCR: {len(jobs)} 个单元格 (总数 {len(texts)}, "
|
|
|
|
|
+ f"mode={self.second_pass_reocr_mode}, header_row={header_row})"
|
|
|
|
|
+ )
|
|
|
base_conf_th = self.ocr_conf_threshold
|
|
base_conf_th = self.ocr_conf_threshold
|
|
|
- line_min = self.second_pass_line_min_score
|
|
|
|
|
- drop_low = self.second_pass_drop_low
|
|
|
|
|
|
|
|
|
|
- for k, cell_img in enumerate(crop_list):
|
|
|
|
|
- cell_idx = crop_indices[k]
|
|
|
|
|
-
|
|
|
|
|
- line_blocks = self._recognize_cell_lines(cell_img)
|
|
|
|
|
- line_text, line_score = self.aggregate_line_ocr(
|
|
|
|
|
- line_blocks,
|
|
|
|
|
- line_min_score=line_min,
|
|
|
|
|
- drop_low_score_blocks=drop_low,
|
|
|
|
|
|
|
+ for cell_idx, raw_crop, trigger_reasons, meta in jobs:
|
|
|
|
|
+ fp_text, fp_score, cell_bbox = meta[0], float(meta[1]), meta[2]
|
|
|
|
|
+ cell_img, preprocess_stages = self._preprocess_cell_for_ocr(
|
|
|
|
|
+ raw_crop, mode="light"
|
|
|
)
|
|
)
|
|
|
|
|
+ pass1 = self._ocr_one_cell(cell_img, base_conf_th)
|
|
|
|
|
+ pass1["preprocess_stages"] = list(preprocess_stages)
|
|
|
|
|
+ pass1["pass1"] = {
|
|
|
|
|
+ "text": pass1.get("final_text"),
|
|
|
|
|
+ "score": pass1.get("final_score"),
|
|
|
|
|
+ "strategy": pass1.get("strategy"),
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- whole_text, whole_score = ("", 0.0)
|
|
|
|
|
- if self.second_pass_whole_fallback and line_score < base_conf_th:
|
|
|
|
|
- whole_text, whole_score = self._recognize_whole_cell(cell_img)
|
|
|
|
|
|
|
+ result = dict(pass1)
|
|
|
|
|
+ enhance_info: Dict[str, Any] = {"triggered": False, "reason": []}
|
|
|
|
|
+ dyn_th = float(pass1.get("dynamic_conf_threshold") or base_conf_th)
|
|
|
|
|
+ do_retry, retry_reasons = self._needs_enhance_retry(
|
|
|
|
|
+ pass1, cell_img, dyn_th
|
|
|
|
|
+ )
|
|
|
|
|
+ if do_retry:
|
|
|
|
|
+ enhance_info["triggered"] = True
|
|
|
|
|
+ enhance_info["reason"] = retry_reasons
|
|
|
|
|
+ enhanced_img, enhance_stages = self._preprocess_cell_for_ocr(
|
|
|
|
|
+ raw_crop, mode="enhance"
|
|
|
|
|
+ )
|
|
|
|
|
+ pass2 = self._ocr_one_cell(enhanced_img, base_conf_th)
|
|
|
|
|
+ pass2["preprocess_stages"] = list(enhance_stages)
|
|
|
|
|
+ pass2["pass2"] = {
|
|
|
|
|
+ "text": pass2.get("final_text"),
|
|
|
|
|
+ "score": pass2.get("final_score"),
|
|
|
|
|
+ "strategy": pass2.get("strategy"),
|
|
|
|
|
+ }
|
|
|
|
|
+ result = self._pick_better_ocr_result(pass1, pass2)
|
|
|
|
|
+ result["pass1"] = pass1.get("pass1")
|
|
|
|
|
+ result["pass2"] = pass2.get("pass2")
|
|
|
|
|
+ enhance_info["pass2"] = result.get("pass2")
|
|
|
|
|
+ result["enhance_retry"] = enhance_info
|
|
|
|
|
|
|
|
- final_text, final_score, strategy = self._pick_line_vs_whole(
|
|
|
|
|
- line_text, line_score, whole_text, whole_score
|
|
|
|
|
|
|
+ debug_img, _ = self._preprocess_cell_for_ocr(
|
|
|
|
|
+ raw_crop, mode="enhance" if enhance_info["triggered"] else "light"
|
|
|
)
|
|
)
|
|
|
|
|
+ if cell_ocr_dir:
|
|
|
|
|
+ self._save_cell_ocr_debug(
|
|
|
|
|
+ cell_ocr_dir,
|
|
|
|
|
+ cell_idx,
|
|
|
|
|
+ debug_img,
|
|
|
|
|
+ result,
|
|
|
|
|
+ first_pass_text=fp_text,
|
|
|
|
|
+ first_pass_score=fp_score,
|
|
|
|
|
+ trigger_reasons=trigger_reasons,
|
|
|
|
|
+ bbox=cell_bbox,
|
|
|
|
|
+ )
|
|
|
|
|
|
|
|
- if cell_ocr_dir and cell_img is not None:
|
|
|
|
|
- try:
|
|
|
|
|
- tag = self.sanitize_debug_filename(final_text or "empty")
|
|
|
|
|
- filename = f"cell{cell_idx:03d}_{strategy}_{tag}.png"
|
|
|
|
|
- cv2.imwrite(os.path.join(cell_ocr_dir, filename), cell_img)
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- logger.warning(f"保存单元格OCR图片失败 (cell {cell_idx}): {e}")
|
|
|
|
|
|
|
+ final_text = (result.get("final_text") or "").strip()
|
|
|
|
|
+ final_score = float(result.get("final_score") or 0.0)
|
|
|
|
|
+ strategy = result.get("strategy") or "empty"
|
|
|
|
|
|
|
|
if not final_text:
|
|
if not final_text:
|
|
|
|
|
+ logger.debug(
|
|
|
|
|
+ f"单元格 {cell_idx} 二次OCR({strategy}) 无文本, "
|
|
|
|
|
+ f"trigger={trigger_reasons}"
|
|
|
|
|
+ )
|
|
|
continue
|
|
continue
|
|
|
|
|
|
|
|
- dynamic_conf_th = self.calculate_dynamic_confidence_threshold(
|
|
|
|
|
- final_text, base_conf_th
|
|
|
|
|
|
|
+ dynamic_conf_th = float(
|
|
|
|
|
+ result.get("dynamic_conf_threshold")
|
|
|
|
|
+ or self.calculate_dynamic_confidence_threshold(
|
|
|
|
|
+ final_text, base_conf_th
|
|
|
|
|
+ )
|
|
|
)
|
|
)
|
|
|
if final_score >= dynamic_conf_th:
|
|
if final_score >= dynamic_conf_th:
|
|
|
texts[cell_idx] = final_text
|
|
texts[cell_idx] = final_text
|