浏览代码

fix(优化二次OCR逻辑与参数): 更新二次OCR处理逻辑,增加对空单元格的判断,优化匹配框的使用,提升银行流水模式下的OCR准确性。同时调整相关测试用例以验证新逻辑的有效性。

zhch158_admin 7 小时之前
父节点
当前提交
a8ca9d4dc3

+ 7 - 3
ocr_tools/universal_doc_parser/models/adapters/mineru_wired_table.py

@@ -200,11 +200,14 @@ class MinerUWiredTableRecognizer:
             polys /= upscale
             # 使用 CoordinateUtils.poly_to_bbox() 替换 _poly_to_bbox()
             bboxes = [CoordinateUtils.poly_to_bbox(poly.tolist()) for poly in polys]
-            texts, scores, _, _ = self.text_filler.fill_text_by_center_point(bboxes, ocr_boxes)
+            texts, scores, matched_boxes_list, _ = self.text_filler.fill_text_by_center_point(bboxes, ocr_boxes)
 
             # 统一调用二次OCR封装(替换原有重复逻辑)
             if self.ocr_engine is not None and any(not t for t in texts):
-                texts = self.text_filler.second_pass_ocr_fill(table_image, bboxes, texts, scores)
+                texts = self.text_filler.second_pass_ocr_fill(
+                    table_image, bboxes, texts, scores,
+                    matched_boxes_list=matched_boxes_list,
+                )
 
             for idx, bbox in enumerate(bboxes):
                 lp = logic_points[idx] if len(logic_points) > idx else [0, 0, 0, 0]
@@ -464,7 +467,7 @@ class MinerUWiredTableRecognizer:
             bboxes_merged = [cell["bbox"] for cell in merged_cells]
             texts, scores, matched_boxes_list, need_reocr_indices = self.text_filler.fill_text_by_center_point(bboxes_merged, ocr_boxes or [])
             
-            # Step 4.5: 二次 OCR(银行流水:表体空单元必跑 + 低分/跨格;可选笔画增强重试)
+            # Step 4.5: 二次 OCR(银行流水:表头/表体空单元 + 低分/跨格;可选笔画增强重试)
             if hasattr(self, 'ocr_engine') and self.ocr_engine:
                 cell_ocr_dir = None
                 if debug_root is not None:
@@ -477,6 +480,7 @@ class MinerUWiredTableRecognizer:
                     output_dir=cell_ocr_dir,
                     debug_prefix=dbg.prefix or None,
                     merged_cells=merged_cells,
+                    matched_boxes_list=matched_boxes_list,
                 )
 
             for i, cell in enumerate(merged_cells):

+ 201 - 35
ocr_tools/universal_doc_parser/models/adapters/wired_table/text_filling.py

@@ -6,6 +6,7 @@
 from typing import List, Dict, Any, Tuple, Optional
 import bisect
 import json
+import math
 import cv2
 import numpy as np
 import os
@@ -55,8 +56,7 @@ class TextFiller:
         self.second_pass_header_row: int = 0
         self.second_pass_strip_aspect: float = 1.8
         self.second_pass_whole_longer_extra: int = 2
-        self.second_pass_row_peer_min_nonempty: int = 5
-        self.second_pass_suspicious_short_min_chars: int = 4
+        self.second_pass_suspicious_short_min_chars: int = 4 # 最少4个字符,否则会触发整格兜底
         cpp = sp_cfg.get("cell_preprocess") or {}
         if not isinstance(cpp, dict):
             cpp = {}
@@ -70,6 +70,10 @@ class TextFiller:
         if not isinstance(er, dict):
             er = {}
         self.second_pass_enhance_retry_enabled: bool = bool(er.get("enabled", False))
+        er_upscale = er.get("upscale_min_side")
+        self.second_pass_enhance_upscale_min: int = int(
+            er_upscale if er_upscale is not None else self.second_pass_light_upscale_min
+        )
         self.second_pass_enhance_score_below: float = float(
             er.get("score_below", 0.90)
         )
@@ -506,7 +510,12 @@ class TextFiller:
                 )
                 stages.append("contrast")
 
-        img = self._upscale_cell_if_small(img)
+        upscale_min = (
+            self.second_pass_enhance_upscale_min
+            if mode == "enhance"
+            else self.second_pass_light_upscale_min
+        )
+        img = self._upscale_cell_if_small(img, min_side=upscale_min)
         stages.append("upscale")
         return img, stages
 
@@ -749,6 +758,30 @@ class TextFiller:
                 best_row = row
         return best_row
 
+    @staticmethod
+    def _column_empty_ratio(
+        merged_cells: List[Dict[str, Any]],
+        matched_boxes_list: List[List[Dict[str, Any]]],
+        col: int,
+        header_row: int,
+    ) -> float:
+        """返回指定列中(表头除外)空值格比例。
+
+        基于已匹配的 OCR 框判空(而非 texts):
+        - matched_boxes_list[j] 非空 → OCR 框命中该格 → 列不空
+        - matched_boxes_list[j] 为空 → 无 OCR 框命中 → 列空
+
+        这比 texts 判空更可靠,因为 OCR 可能检测到框但未识别出文本。
+        """
+        col_cells = [
+            matched_boxes_list[j] if j < len(matched_boxes_list) else []
+            for j, c in enumerate(merged_cells)
+            if int(c.get("col", -1)) == col and int(c.get("row", 0)) > header_row
+        ]
+        if not col_cells:
+            return 0.0
+        return sum(1 for boxes in col_cells if not boxes) / len(col_cells)
+
     def _should_second_pass_cell(
         self,
         i: int,
@@ -756,6 +789,7 @@ class TextFiller:
         scores: List[float],
         need_reocr_indices: List[int],
         merged_cells: Optional[List[Dict[str, Any]]],
+        matched_boxes_list: Optional[List[List[Dict[str, Any]]]],
         pdf_type: str,
         force_all: bool,
         header_row: int,
@@ -764,8 +798,10 @@ class TextFiller:
         t = texts[i] if i < len(texts) else ""
         sc = float(scores[i] if i < len(scores) else 0.0)
         bbox_row = None
+        bbox_col = None
         if merged_cells and i < len(merged_cells):
             bbox_row = int(merged_cells[i].get("row", 0))
+            bbox_col = int(merged_cells[i].get("col", -1))
 
         if force_all:
             return True, ["force_all"]
@@ -781,31 +817,87 @@ class TextFiller:
                 if h_box > w_box * 2.5 and sc < 0.95:
                     reasons.append("tall_cell_low_score")
 
-        if self.second_pass_reocr_mode == "bank_statement" and merged_cells:
-            if bbox_row is not None and bbox_row > header_row and not (t or "").strip():
-                if "body_row_empty" not in reasons:
-                    reasons.append("body_row_empty")
-            if bbox_row is not None and bbox_row > header_row:
-                same_row_nonempty = 0
-                for j, other in enumerate(merged_cells):
-                    if int(other.get("row", -1)) != bbox_row:
-                        continue
-                    ot = (texts[j] if j < len(texts) else "").strip()
-                    if ot:
-                        same_row_nonempty += 1
-                if (
-                    not (t or "").strip()
-                    and same_row_nonempty >= self.second_pass_row_peer_min_nonempty
-                    and "row_peer_nonempty" not in reasons
-                ):
-                    reasons.append("row_peer_nonempty")
+        # ── bank_statement 空单元格逻辑:仅对无值的格生效 ──
+        cell_empty = not (t or "").strip()
+        if self.second_pass_reocr_mode == "bank_statement" and merged_cells and cell_empty:
+            if bbox_row is not None and bbox_row == header_row:
+                if "header_row_empty" not in reasons:
+                    reasons.append("header_row_empty")
+            elif bbox_row is not None and bbox_row > header_row:
+                if bbox_col is not None and bbox_col >= 0:
+                    col_empty_ratio = self._column_empty_ratio(
+                        merged_cells, matched_boxes_list or [], bbox_col, header_row
+                    )
+                    if col_empty_ratio < 0.5:
+                        # 该列表体大部分格子有值 → 本格为空可能是 OCR 遗漏
+                        if "body_row_empty_column_mostly_filled" not in reasons:
+                            reasons.append("body_row_empty_column_mostly_filled")
+                    else:
+                        # 该列表体本来就多为空 → 不触发二次 OCR
+                        return False, []
+                else:
+                    if "body_row_empty" not in reasons:
+                        reasons.append("body_row_empty")
 
         if not reasons:
-            if (not t or not t.strip()) and sc < 0.95 and pdf_type != "txt":
+            if cell_empty and sc < 0.95 and pdf_type != "txt":
                 reasons.append("empty_low_score")
 
         return bool(reasons), reasons
 
+    @staticmethod
+    def _is_ocr_vertically_incomplete(
+        cell_bbox: List[float],
+        matched_boxes: List[Dict[str, Any]],
+        *,
+        y_deviation_ratio: float = 1.0 / 3.0,
+        max_merged_height_ratio: float = 0.7,
+        margin_asymmetry_threshold: float = 0.3,
+    ) -> bool:
+        """已匹配 OCR box 纵向分布不完整 → 需二次 OCR。
+
+        银行流水单元格文字纵向居中。两种判定互补:
+        1. y_center 偏移:合并 bbox 的 y_center 偏离 cell y_center 超过阈值;
+        2. 空白不对称:文字紧贴单元格顶部/底部,上下空白比差异过大
+           (如仅识别到"支行"两个偏底部的字)。
+
+        Returns:
+            True 表示一验 OCR 结果可能纵向不完整
+        """
+        if not matched_boxes:
+            return False
+        all_ys: List[float] = []
+        for m in matched_boxes:
+            b = m.get("bbox") or []
+            if len(b) >= 4:
+                all_ys.extend([float(b[1]), float(b[3])])
+        if not all_ys:
+            return False
+
+        merged_y1, merged_y2 = min(all_ys), max(all_ys)
+        merged_yc = (merged_y1 + merged_y2) / 2.0
+        merged_h = merged_y2 - merged_y1
+
+        cell_y1, cell_y2 = cell_bbox[1], cell_bbox[3]
+        cell_yc = (cell_y1 + cell_y2) / 2.0
+        cell_h = max(cell_y2 - cell_y1, 1.0)
+
+        # 合并 bbox 已覆盖单元格大部 → 不触发
+        if merged_h >= cell_h * max_merged_height_ratio:
+            return False
+
+        # 条件一:y_center 偏移过大
+        deviation = abs(merged_yc - cell_yc)
+        if deviation > cell_h * y_deviation_ratio:
+            return True
+
+        # 条件二:上下空白不对称(文字贴在顶部或底部)
+        top_gap = merged_y1 - cell_y1
+        bottom_gap = cell_y2 - merged_y2
+        asymmetry = abs((top_gap / cell_h) - (bottom_gap / cell_h))
+        return asymmetry > margin_asymmetry_threshold
+
+
     def _save_cell_ocr_debug(
         self,
         cell_ocr_dir: str,
@@ -1002,15 +1094,60 @@ class TextFiller:
         return idx_part
 
     @staticmethod
+    def _is_bbox_slanted(
+        original_box: Dict[str, Any],
+        *,
+        angle_threshold: float = 10.0,
+    ) -> bool:
+        """判断 OCR 框是否斜向(如水印),基于原始多边形角度检测。
+
+        银行流水水印通常为斜向(30–45°),正常文字接近水平(0–2°)。
+        若上边与水平面的夹角超过 angle_threshold,视为斜框应丢弃。
+
+        Args:
+            original_box: OCR 原始 box 字典(包含 original_bbox 或 bbox 字段)
+            angle_threshold: 角度阈值(度),> 此值视为斜向
+
+        Returns:
+            True 表示该框是斜向的
+        """
+        poly = original_box.get("poly") or original_box.get("original_bbox") or []
+        if not poly:
+            logger.warning(f"OCR box 没有 poly 字段,视为水平框: original_box: {original_box}")
+            return False
+
+        # 多边形格式 [[x1,y1],[x2,y2],[x3,y3],[x4,y4]]
+        if isinstance(poly[0], (list, tuple)) and len(poly) > 1:
+            dx = float(poly[1][0] - poly[0][0])
+            dy = float(poly[1][1] - poly[0][1])
+        elif len(poly) >= 8:
+            # [x1,y1,x2,y2,x3,y3,x4,y4] 平面格式
+            dx = float(poly[2] - poly[0])
+            dy = float(poly[3] - poly[1])
+        else:
+            # [x1,y1,x2,y2] 轴对齐格式,无法判断 → 不算斜向
+            return False
+
+        if abs(dx) < 1e-6:
+            angle_deg = 90.0
+        else:
+            angle_rad = math.atan2(dy, dx)
+            angle_deg = abs(math.degrees(angle_rad))
+
+        return angle_deg > angle_threshold
+
+    @staticmethod
     def _resolve_cell_matched_boxes(
         matched: List[Tuple[str, float, float, float, float, Dict[str, Any]]],
         *,
         cell_idx: Optional[int] = None,
         y_tolerance: int = 5,
         inside_ratio: float = 0.7,
+        slanted_angle_threshold: float = 10.0,
     ) -> Tuple[List[Tuple[str, float, float, float, float, Dict[str, Any]]], bool]:
         """
         处理同格内嵌套 OCR 框:
+        - 丢弃斜向 OCR 框(水印,角度 > slanted_angle_threshold°);
         - 大框有字、小框在内:丢弃小框,保留大框文本;
         - 大框无字、小框在内:丢弃小框,整格 score 置 0(触发二次 OCR)。
         """
@@ -1021,6 +1158,14 @@ class TextFiller:
 
         entries: List[Dict[str, Any]] = []
         for text, y1, x1, overlap_ratio, score, original_box in matched:
+            # 过滤斜向水印框
+            if TextFiller._is_bbox_slanted(original_box, angle_threshold=slanted_angle_threshold):
+                logger.debug(
+                    f"cell={cell_idx} 丢弃斜向 OCR 框: "
+                    f"text={text!r}, {TextFiller._ocr_box_debug_tag(original_box)}"
+                )
+                continue
+
             bbox = TextFiller._bbox_from_ocr_original_box(original_box)
             entries.append(
                 {
@@ -1217,21 +1362,38 @@ class TextFiller:
                     ))
             
             if matched:
-                matched, force_zero_score = self._resolve_cell_matched_boxes(
-                    matched, cell_idx=idx
-                )
-                if matched:
-                    texts[idx] = "".join(
-                        [(t or "").strip() for t, _, _, _, _, _ in matched]
-                    )
-                    avg_score = sum(s for _, _, _, _, s, _ in matched) / len(matched)
-                    scores[idx] = 0.0 if force_zero_score else avg_score
-                    matched_boxes_list[idx] = [
-                        box for _, _, _, _, _, box in matched
-                    ]
-                else:
+                # 先检测纵向不完整 → 交二次 OCR 从头识别
+                raw_boxes: List[Dict[str, Any]] = []
+                for m in matched:
+                    ob = m[5]
+                    b = ob.get("bbox") or []
+                    if b and len(b) >= 4:
+                        if not isinstance(b[0], (int, float)):
+                            b = CoordinateUtils.poly_to_bbox(b)
+                        raw_boxes.append({"bbox": b})
+                if raw_boxes and self._is_ocr_vertically_incomplete(cell_bbox, raw_boxes):
                     texts[idx] = ""
                     scores[idx] = 0.0
+                    matched_boxes_list[idx] = []
+                    if idx not in need_reocr_indices:
+                        logger.debug(f"单元格[{idx}]检测到 OCR box 纵向不完整,需要二次 OCR: {ocr_item['text'][:20]}...")
+                        need_reocr_indices.append(idx)
+                else:
+                    matched, force_zero_score = self._resolve_cell_matched_boxes(
+                        matched, cell_idx=idx
+                    )
+                    if matched:
+                        texts[idx] = "".join(
+                            [(t or "").strip() for t, _, _, _, _, _ in matched]
+                        )
+                        avg_score = sum(s for _, _, _, _, s, _ in matched) / len(matched)
+                        scores[idx] = 0.0 if force_zero_score else avg_score
+                        matched_boxes_list[idx] = [
+                            box for _, _, _, _, _, box in matched
+                        ]
+                    else:
+                        texts[idx] = ""
+                        scores[idx] = 0.0
             else:
                 scores[idx] = 0.0 # 无匹配文本,置信度为0
 
@@ -1415,6 +1577,7 @@ class TextFiller:
         output_dir: Optional[str] = None,
         debug_prefix: Optional[str] = None,
         merged_cells: Optional[List[Dict[str, Any]]] = None,
+        matched_boxes_list: Optional[List[List[Dict[str, Any]]]] = None,
     ) -> List[str]:
         """
         二次OCR:分行 det+rec + 整格/条带兜底 + 低分笔画增强重试。
@@ -1450,6 +1613,7 @@ class TextFiller:
                     scores,
                     need_reocr_indices,
                     merged_cells,
+                    matched_boxes_list,
                     pdf_type,
                     force_all,
                     header_row,
@@ -1502,6 +1666,7 @@ class TextFiller:
                     pass1, cell_img, dyn_th
                 )
                 if do_retry:
+                    logger.debug(f"do_retry: 触发低分/难例再试: {retry_reasons}")
                     enhance_info["triggered"] = True
                     enhance_info["reason"] = retry_reasons
                     enhanced_img, enhance_stages = self._preprocess_cell_for_ocr(
@@ -1518,6 +1683,7 @@ class TextFiller:
                     result["pass1"] = pass1.get("pass1")
                     result["pass2"] = pass2.get("pass2")
                     enhance_info["pass2"] = result.get("pass2")
+                    logger.debug(f"do_retry: 二次OCR cell_idx={cell_idx} 结果: {result}")
                 result["enhance_retry"] = enhance_info
 
                 debug_img, _ = self._preprocess_cell_for_ocr(

+ 281 - 17
ocr_tools/universal_doc_parser/tests/test_second_pass_ocr_aggregate.py

@@ -225,36 +225,258 @@ class TestBankStatementReocrTrigger:
     def _filler(self) -> TextFiller:
         return TextFiller(
             ocr_engine=None,
-            config={
-                "second_pass_ocr": {
-                    "reocr_mode": "bank_statement",
-                    "header_row": 0,
-                    "row_peer_min_nonempty": 3,
-                }
-            },
+            config={"second_pass_ocr": {"reocr_mode": "bank_statement"}},
         )
 
-    def test_body_row_empty_triggers(self):
+    def test_header_empty_row_triggers(self):
+        """表头行空单元格触发 header_row_empty。"""
+        f = self._filler()
+        merged = [{"row": 0, "col": 0, "bbox": [0, 0, 10, 10]}]
+        ok, reasons = f._should_second_pass_cell(
+            0, [""], [0.99], [], merged, None, "ocr", False, 0
+        )
+        assert ok is True
+        assert "header_row_empty" in reasons
+        assert "body_row_empty" not in reasons
+
+    def test_body_row_empty_column_mostly_filled_triggers(self):
+        """列表体大部分有值、本格为空 → body_row_empty_column_mostly_filled。"""
         f = self._filler()
         merged = [
-            {"row": 0, "col": 0, "bbox": [0, 0, 10, 10]},
             {"row": 1, "col": 0, "bbox": [0, 10, 10, 20]},
+            {"row": 2, "col": 0, "bbox": [0, 20, 10, 30]},
+            {"row": 3, "col": 0, "bbox": [0, 30, 10, 40]},
         ]
-        texts = ["header", ""]
-        scores = [0.99, 0.0]
+        texts = ["有值", "有值", ""]
+        scores = [0.99, 0.99, 0.0]
+        matched = [[{"text": "有值"}], [{"text": "有值"}], []]
         ok, reasons = f._should_second_pass_cell(
-            1, texts, scores, [], merged, "ocr", False, 0
+            2, texts, scores, [], merged, matched, "ocr", False, 0
         )
         assert ok is True
-        assert "body_row_empty" in reasons
+        assert "body_row_empty_column_mostly_filled" in reasons
 
-    def test_header_empty_not_body_row_forced(self):
+    def test_column_mostly_empty_skips(self):
+        """该列表体大多为空 → 不触发二次 OCR。"""
         f = self._filler()
-        merged = [{"row": 0, "col": 0, "bbox": [0, 0, 10, 10]}]
+        merged = [
+            {"row": 1, "col": 0, "bbox": [0, 10, 10, 20]},
+            {"row": 2, "col": 0, "bbox": [0, 20, 10, 30]},
+            {"row": 3, "col": 0, "bbox": [0, 30, 10, 40]},
+        ]
+        texts = ["", "", ""]
+        scores = [0.0, 0.0, 0.0]
+        matched = [[], [], []]
+        ok, _ = f._should_second_pass_cell(
+            2, texts, scores, [], merged, matched, "ocr", False, 0
+        )
+        assert ok is False
+
+    def test_non_empty_cell_not_affected_by_bank_statement(self):
+        """银行流水模式下,有值格不进入空单元格逻辑。"""
+        f = self._filler()
+        merged = [
+            {"row": 1, "col": 0, "bbox": [0, 10, 10, 20]},
+            {"row": 1, "col": 1, "bbox": [10, 10, 20, 20]},
+        ]
+        texts = ["有值", ""]
+        scores = [0.99, 0.0]
         ok, reasons = f._should_second_pass_cell(
-            0, [""], [0.99], [], merged, "ocr", False, 0
+            0, texts, scores, [], merged, None, "ocr", False, 0
         )
-        assert "body_row_empty" not in reasons
+        assert ok is False  # 有值不触发
+
+
+class TestEnhanceRetryUpscale:
+    def test_enhance_retry_upscale_min_side_from_yaml(self):
+        f = TextFiller(
+            ocr_engine=None,
+            config={
+                "second_pass_ocr": {
+                    "cell_preprocess": {
+                        "upscale_min_side": 96,
+                        "enhance_retry": {
+                            "enabled": True,
+                            "upscale_min_side": 128,
+                        },
+                    }
+                }
+            },
+        )
+        assert f.second_pass_light_upscale_min == 96
+        assert f.second_pass_enhance_upscale_min == 128
+
+    def test_enhance_upscale_defaults_to_pass1(self):
+        f = TextFiller(
+            ocr_engine=None,
+            config={
+                "second_pass_ocr": {
+                    "cell_preprocess": {"upscale_min_side": 192},
+                }
+            },
+        )
+        assert f.second_pass_light_upscale_min == 192
+        assert f.second_pass_enhance_upscale_min == 192
+
+    def test_preprocess_uses_different_upscale_for_enhance(self):
+        import numpy as np
+
+        f = TextFiller(
+            ocr_engine=None,
+            config={
+                "second_pass_ocr": {
+                    "cell_preprocess": {
+                        "upscale_min_side": 64,
+                        "enhance_retry": {"enabled": True, "upscale_min_side": 128},
+                    }
+                }
+            },
+        )
+        small = np.zeros((40, 30, 3), dtype=np.uint8)
+        light, _ = f._preprocess_cell_for_ocr(small, mode="light")
+        enhance, _ = f._preprocess_cell_for_ocr(small, mode="enhance")
+        assert max(light.shape[:2]) >= 64
+        assert max(enhance.shape[:2]) >= 128
+
+
+class TestOcrVerticallyIncomplete:
+    """银行流水单元格 OCR 纵向完整性判断:
+    - y_center 偏移检测
+    - 空白不对称检测(文字贴在顶部/底部时触发)
+    """
+
+    def test_center_aligned_no_trigger(self):
+        """匹配到的 OCR 纵向居中 → 不标记不完整。"""
+        cell_bbox = [100, 50, 200, 90]  # yc=70, h=40
+        matched = [{"bbox": [110, 60, 190, 80]}]  # yc=70, h=20 → 居中
+        assert TextFiller._is_ocr_vertically_incomplete(cell_bbox, matched) is False
+
+    def test_ocr_on_bottom_triggers(self):
+        """匹配到的 OCR 偏底部 → 标记不完整(y_center 偏移 + 空白不对称均触发)。"""
+        cell_bbox = [100, 50, 200, 90]  # yc=70, h=40, threshold=40/3≈13.33
+        matched = [{"bbox": [110, 80, 190, 88]}]  # yc=(80+88)/2=84, 偏离|84-70|=14>13.33
+        assert TextFiller._is_ocr_vertically_incomplete(cell_bbox, matched) is True
+
+    def test_empty_matched_returns_false(self):
+        assert TextFiller._is_ocr_vertically_incomplete([0, 0, 10, 10], []) is False
+
+    def test_tall_matched_text_not_triggered(self):
+        """已匹配 OCR 合并高度超过 cell_h * 0.7 → 认为已够全,不触发。"""
+        cell_bbox = [100, 50, 200, 90]  # h=40
+        matched = [{"bbox": [110, 51, 190, 87]}]  # h=36 > 40*0.7=28 → 不触发
+        assert TextFiller._is_ocr_vertically_incomplete(cell_bbox, matched) is False
+
+    def test_margin_asymmetry_short_text_on_bottom_triggers(self):
+        """短文本偏底部:y_center 偏差不达标,但空白不对称触发(如"支行")。"""
+        # cell y=[144.59, 230.38], h=85.79, yc=187.48
+        # text y=[190, 226], h=36, yc=208
+        # y_center 偏差: |208-187.48|=20.52 < 85.79/3≈28.6 → 不触发
+        # 上方空白: 190-144.59=45.41 → 52.9%
+        # 下方空白: 230.38-226=4.38 → 5.1%
+        # 不对称度: |0.529-0.051|=0.478 > 0.3 → 触发
+        cell_bbox = [900, 144.59, 1100, 230.38]
+        matched = [{"bbox": [940.0, 190.0, 999.0, 226.0]}]
+        assert TextFiller._is_ocr_vertically_incomplete(cell_bbox, matched) is True
+
+    def test_margin_asymmetry_on_top_triggers(self):
+        """短文本偏顶部 → 空白不对称触发。"""
+        cell_bbox = [100, 50, 200, 90]  # h=40
+        matched = [{"bbox": [110, 50, 190, 58]}]  # 贴在顶部
+        assert TextFiller._is_ocr_vertically_incomplete(cell_bbox, matched) is True
+
+    def test_margin_centered_no_asymmetry(self):
+        """文字居中且空白对称 → 不触发(如"广东农信"完美居中)。"""
+        cell_bbox = [100, 50, 200, 90]  # h=40
+        # 上方空白≈14.5, 下方空白≈14.5 → 对称
+        matched = [{"bbox": [110, 64.5, 190, 75.5]}]
+        assert TextFiller._is_ocr_vertically_incomplete(cell_bbox, matched) is False
+
+    def test_margin_slightly_off_center_no_trigger(self):
+        """文字轻微偏离但不对称度在阈值内 → 不触发。"""
+        cell_bbox = [100, 50, 200, 90]  # h=40
+        # 上方空白=9, 下方空白=11 → 不对称度=|0.225-0.275|=0.05 < 0.3
+        matched = [{"bbox": [110, 59, 190, 79]}]
+        assert TextFiller._is_ocr_vertically_incomplete(cell_bbox, matched) is False
+
+
+class TestColumnEmptyRatio:
+    def test_mixed(self):
+        merged = [
+            {"row": 1, "col": 0}, {"row": 2, "col": 0}, {"row": 3, "col": 0},
+        ]
+        # 第 3 个无 OCR 框命中 → 算空
+        matched = [[{"text": "a"}], [{"text": "b"}], []]
+        r = TextFiller._column_empty_ratio(merged, matched, 0, 0)
+        assert abs(r - 1 / 3) < 0.01
+
+    def test_all_empty(self):
+        merged = [{"row": 1, "col": 0}, {"row": 2, "col": 0}]
+        matched = [[], []]
+        r = TextFiller._column_empty_ratio(merged, matched, 0, 0)
+        assert abs(r - 1.0) < 0.01
+
+    def test_header_excluded(self):
+        merged = [
+            {"row": 0, "col": 0},
+            {"row": 1, "col": 0},
+            {"row": 2, "col": 0},
+        ]
+        matched = [[], [{"text": "a"}], [{"text": "b"}]]
+        r = TextFiller._column_empty_ratio(merged, matched, 0, 0)
+        assert abs(r - 0.0) < 0.01
+
+    def test_ocr_box_hit_but_text_empty_not_counted_empty(self):
+        """OCR 检测到框但未识别出文字 → 不应计为空。"""
+        merged = [{"row": 1, "col": 0}, {"row": 2, "col": 0}]
+        matched = [
+            [{"text": "", "bbox": [0, 0, 10, 10], "confidence": 1.0}],
+            [{"text": "a", "bbox": [0, 20, 10, 30], "confidence": 0.99}],
+        ]
+        r = TextFiller._column_empty_ratio(merged, matched, 0, 0)
+        assert abs(r - 0.0) < 0.01  # 两格都有 OCR 框命中
+
+
+class TestIsBboxSlanted:
+    """OCR 斜框检测:水印等斜向文本框应被过滤。"""
+
+    def test_slanted_poly_list_format(self):
+        # 上边角度 ≈ 31°
+        box = {"original_bbox": [[0, 0], [50, 31], [50, 51], [0, 20]]}
+        assert TextFiller._is_bbox_slanted(box, angle_threshold=10.0) is True
+
+    def test_horizontal_poly_list_format(self):
+        # 几乎水平 (dy=2, dx=50 → angle≈2.3°)
+        box = {"original_bbox": [[0, 0], [50, 2], [50, 52], [0, 50]]}
+        assert TextFiller._is_bbox_slanted(box, angle_threshold=10.0) is False
+
+    def test_slanted_flat8_format(self):
+        """8值平面格式:[x1,y1,x2,y2,x3,y3,x4,y4],上边 ≈35°"""
+        box = {"original_bbox": [0, 0, 50, 35, 50, 55, 0, 20]}
+        assert TextFiller._is_bbox_slanted(box, angle_threshold=10.0) is True
+
+    def test_horizontal_flat8_format(self):
+        box = {"original_bbox": [0, 0, 50, 1, 50, 51, 0, 50]}
+        assert TextFiller._is_bbox_slanted(box, angle_threshold=10.0) is False
+
+    def test_axis_aligned_bbox_not_slanted(self):
+        """轴对齐 bbox [x1,y1,x2,y2] 无法判断角度 → 不算斜向"""
+        box = {"original_bbox": [0, 0, 50, 20]}
+        assert TextFiller._is_bbox_slanted(box, angle_threshold=10.0) is False
+
+    def test_empty_poly_not_slanted(self):
+        box: dict = {}
+        assert TextFiller._is_bbox_slanted(box, angle_threshold=10.0) is False
+
+    def test_fallback_to_bbox_key(self):
+        """没有 original_bbox 时回退到 bbox 字段"""
+        box = {"poly": [[0, 0], [50, 31], [50, 51], [0, 20]]}
+        assert TextFiller._is_bbox_slanted(box, angle_threshold=10.0) is True
+
+    def test_custom_threshold(self):
+        """阈值调大后同一框不再斜向"""
+        box = {"original_bbox": [[0, 0], [50, 15], [50, 35], [0, 20]]}  # ≈16.7°
+        assert TextFiller._is_bbox_slanted(box, angle_threshold=10.0) is True
+        assert TextFiller._is_bbox_slanted(box, angle_threshold=20.0) is False
 
 
 class TestResolveCellMatchedBoxes:
@@ -302,6 +524,48 @@ class TestResolveCellMatchedBoxes:
         assert len(resolved) == 1
         assert resolved[0][0] == "广东兴宁农村商业银"
 
+    def test_slanted_watermark_filtered(self):
+        """斜向水印框被丢弃,仅保留正常水平框。"""
+        # 斜框:上边 ≈ 31°,text="有限公司"
+        slanted_box = {
+            "bbox": [50, 20, 100, 52],
+            "original_bbox": [[50, 20], [100, 51], [100, 71], [50, 40]],
+            "text": "有限公司",
+            "confidence": 0.9,
+        }
+        entry_slanted = (
+            "有限公司",
+            slanted_box["bbox"][1],  # y1
+            slanted_box["bbox"][0],  # x1
+            1.0,
+            0.9,
+            slanted_box,
+        )
+        # 水平框
+        normal = self._matched_entry("200.00", [55.0, 30.0, 95.0, 45.0], 0.95)
+        matched = [entry_slanted, normal]
+        resolved, force_zero = TextFiller._resolve_cell_matched_boxes(
+            matched, cell_idx=10, slanted_angle_threshold=10.0
+        )
+        assert force_zero is False
+        assert len(resolved) == 1
+        assert resolved[0][0] == "200.00"
+
+    def test_all_slanted_returns_empty(self):
+        """全部斜框 → entries 为空 → 返回值也为空。"""
+        box = {
+            "bbox": [50, 20, 100, 52],
+            "original_bbox": [[50, 20], [100, 51], [100, 71], [50, 40]],
+            "text": "有限公司",
+            "confidence": 0.9,
+        }
+        entry = ("有限公司", box["bbox"][1], box["bbox"][0], 1.0, 0.9, box)
+        resolved, force_zero = TextFiller._resolve_cell_matched_boxes(
+            [entry], slanted_angle_threshold=10.0
+        )
+        assert resolved == []
+        assert force_zero is False
+
     def test_fill_by_center_point_empty_container(self):
         filler = TextFiller(ocr_engine=None, config={})
         cell = [900.0, 1580.0, 1100.0, 1680.0]