Selaa lähdekoodia

feat: 添加全局动态规划算法以优化HTML表格行与PaddleOCR文本行的匹配

zhch158_admin 3 päivää sitten
vanhempi
commit
f4ac7d0dc7
1 muutettua tiedostoa jossa 333 lisäystä ja 0 poistoa
  1. 333 0
      merger/Tablecells匹配-动态规划.md

+ 333 - 0
merger/Tablecells匹配-动态规划.md

@@ -0,0 +1,333 @@
+基于您提供的 table_cell_matcher.py 代码,`_match_html_rows_to_paddle_groups` 方法实现了一个**全局动态规划 (Global Dynamic Programming)** 算法。
+
+这个算法的核心目标是解决两个序列(HTML 表格行序列 vs PaddleOCR 文本行序列)的**非线性对齐**问题。
+
+以下是该算法的详细原理解析:
+
+### 1. 核心思想:为什么用动态规划?
+
+传统的“贪婪算法”是逐行匹配,如果第 1 行匹配错了,第 2 行就会基于错误的位置继续找,导致“一步错,步步错”。
+
+**动态规划 (DP)** 的思想是:**不急着做决定**。它会计算所有可能的匹配路径的得分,最后回溯找出一条**总分最高**的路径。即使中间某一行匹配分值较低,只要它能让整体结构最合理,DP 就会选择它。
+
+### 2. 算法状态定义
+
+在代码中,`dp` 矩阵定义如下:
+
+*   **维度**:`dp[n_html][n_paddle]`
+*   **含义**:`dp[i][j]` 表示 **“HTML 的前 `i` 行”** 成功匹配到了 **“Paddle 的前 `j` 组”** 时,所能获得的**最大累计得分**。
+*   **值**:
+    *   `-inf` (负无穷):表示此状态不可达(例如 HTML 第 5 行不可能匹配到 Paddle 第 1 组,因为顺序不对)。
+    *   `float`:表示当前的累计相似度分数。
+
+### 3. 算法执行流程
+
+#### 第一步:预计算 (Pre-computation)
+为了性能,代码预先计算了 Paddle 组的合并文本。
+*   `merged_cache[(j, k)]`:存储从 Paddle 第 `j` 组开始,合并 `k` 个组后的文本。
+*   这避免了在多重循环中反复进行字符串拼接。
+
+#### 第二步:初始化 (Initialization)
+处理 HTML 的第 0 行 (`i=0`)。
+*   尝试将 HTML 第 0 行匹配 Paddle 的第 `0` 到 `SEARCH_WINDOW` 组。
+*   允许合并 `1` 到 `MAX_MERGE` (4) 个 Paddle 组。
+*   **得分计算**:`相似度 - 跳过惩罚`。
+*   如果得分有效,填入 `dp[0][end_j]`。
+
+#### 第三步:状态转移 (State Transition) - 核心循环
+这是算法最复杂的部分,遍历每一行 HTML (`i` 从 1 到 N)。对于当前行,有两种选择:
+
+**选择 A:跳过当前 HTML 行 (The "Skip" Strategy)**
+*   **场景**:HTML 有这一行,但 OCR 漏识别了,或者 OCR 顺序错乱导致当前位置找不到对应的 OCR 组。
+*   **逻辑**:直接继承上一行的最佳状态,但扣除 `SKIP_HTML_PENALTY`。
+*   **代码**:
+    ```python
+    score_skip = dp[i-1][prev_j] - SKIP_HTML_PENALTY
+    if score_skip > dp[i][prev_j]:
+        dp[i][prev_j] = score_skip
+        path[(i, prev_j)] = (prev_j, prev_j + 1) # 标记未消耗新组
+    ```
+*   **作用**:防止“链条断裂”。即使这一行匹配失败,状态也能传递给下一行。
+
+**选择 B:匹配 Paddle 组 (The "Match" Strategy)**
+*   **场景**:正常匹配。
+*   **逻辑**:
+    1.  找到上一行所有有效的结束位置 `prev_j`。
+    2.  **Gap (跳过)**:允许跳过中间的一些 Paddle 组(可能是噪音或页眉),即 `start_j = prev_j + 1 + gap`。
+    3.  **Merge (合并)**:尝试将 `start_j` 开始的 `1` 到 `4` 个 Paddle 组视为一行。
+    4.  **计算得分**:
+        $$Score = Similarity(HTML\_Text, Paddle\_Text) - Gap\_Penalty - Length\_Penalty$$
+    5.  **转移方程**:
+        $$dp[i][end\_j] = \max(dp[i][end\_j], \ dp[i-1][prev\_j] + Score)$$
+*   **代码**:
+    ```python
+    total_score = prev_score + current_score
+    if total_score > dp[i][end_j]:
+        dp[i][end_j] = total_score
+        path[(i, end_j)] = (prev_j, start_j) # 记录路径
+    ```
+
+#### 第四步:回溯 (Backtracking)
+当填满 `dp` 表后,我们需要找出最优路径:
+1.  **找终点**:在最后一行 HTML (`n_html-1`) 中找到得分最高的 `j`。如果最后一行没匹配上,就往前找倒数第二行,以此类推。
+2.  **倒推**:利用 `path` 字典,从终点一步步回到起点。
+    *   `path[(i, j)]` 存储了 `(prev_j, start_j)`。
+    *   如果 `start_j <= j`:说明 HTML 行 `i` 对应了 Paddle 组 `[start_j, ..., j]`。
+    *   如果 `start_j > j`:说明 HTML 行 `i` 被跳过了(对应空列表)。
+
+#### 第五步:孤儿挽救 (Orphan Rescue)
+DP 找的是“主干路径”,可能会漏掉一些没能进入最优路径的 Paddle 组(Orphans)。
+*   **逻辑**:遍历所有未使用的 Paddle 组,看它在物理位置上(Y坐标)离哪个已匹配的组最近(上方或下方)。
+*   **归属**:将其强行合并到最近的 HTML 行中。这解决了 OCR 过度切分导致的小碎片丢失问题。
+
+### 4. 关键参数解析
+
+*   **`MAX_MERGE = 4`**:
+    *   允许一行 HTML 对应最多 4 行 OCR 文本。解决 OCR 将长表格行切分成多行的问题。
+*   **`SEARCH_WINDOW = 15`**:
+    *   允许 OCR 结果相对于 HTML 出现较大的位移或中间插入了大量噪音(如印章干扰)。
+*   **`SKIP_HTML_PENALTY = 0.3`**:
+    *   这是您之前遇到问题的关键修复。允许算法以 0.3 分的代价放弃一行 HTML,从而保全后续的匹配。
+*   **`SKIP_PADDLE_PENALTY = 0.1`**:
+    *   允许跳过 OCR 中的噪音行,但跳过越多扣分越多,倾向于连续匹配。
+
+### 5. 总结
+
+这个算法将表格行匹配问题转化为了一个**寻找最大权值路径**的图论问题。
+
+*   **节点**:`(HTML行索引, Paddle组索引)`
+*   **边**:匹配操作(得分)或跳过操作(罚分)。
+*   **目标**:找到从第 0 行到第 N 行的总分最高路线。
+
+这种方法极其鲁棒,能够同时处理:OCR 分行、OCR 漏行、OCR 噪音、HTML 空行以及轻微的顺序错乱问题。
+
+---
+
+# 行级动态规划 (Row-Level DP) 
+
+1.  **贪婪算法 (Greedy)**:它是一个接一个匹配的。如果第一个单元格错误地“吞掉”了本属于第二个单元格的 Box(或者像之前的案例,短日期匹配到了长 ID),后面的单元格就无路可走了。
+2.  **逻辑耦合 (Coupling)**:匹配流程控制(循环、指针移动)与 相似度评分逻辑(各种防御、加分、扣分)深度纠缠在一起,导致代码臃肿且难以维护。
+
+### 🚀 更好的思路:行级动态规划 (Row-Level DP)
+
+既然你已经在行与行的匹配中使用了 DP,**在行内的单元格匹配中,DP 同样是终极解决方案。**
+
+**核心思想**:
+不要问“当前单元格最匹配哪个 Box”,而要问“**如何将这一行的 M 个 Box 分配给 N 个单元格,使得整体匹配得分最高**”。
+
+这样,如果“短日期”匹配“长ID”会导致“长ID”对应的单元格没东西匹配(总分变低),DP 就会自动放弃这个错误匹配,选择全局最优解。
+
+### 🛠️ 重构方案
+
+我们将代码拆分为两个清晰的部分:
+1.  **纯粹的评分函数 (`_compute_match_score`)**:只负责计算“文本 A”和“文本 B”的匹配度,包含所有的防御逻辑(长度、类型、子序列等)。
+2.  **DP 匹配器 (`_match_cells_in_row_dp`)**:只负责路径规划,决定哪个 Box 归哪个 Cell。
+
+#### 1. 提取评分逻辑 (解耦与瘦身)
+
+把所有复杂的 `if/else` 防御逻辑移到这里。
+
+```python
+    def _compute_match_score(self, cell_text: str, box_text: str) -> float:
+        """
+        纯粹的评分函数:计算单元格文本与候选 Box 文本的匹配得分
+        包含所有防御逻辑、加权逻辑
+        """
+        # 1. 预处理
+        cell_norm = self.text_matcher.normalize_text(cell_text)
+        box_norm = self.text_matcher.normalize_text(box_text)
+        
+        if not cell_norm or not box_norm:
+            return 0.0
+            
+        # --- ⚡️ 快速防御 (Fast Fail) ---
+        len_cell = len(cell_norm)
+        len_box = len(box_norm)
+        
+        # 长度差异过大直接 0 分 (除非是包含关系且特征明显)
+        if len_box > len_cell * 3 + 5:
+            # 除非 cell 很长 (长ID),否则不许短 cell 匹配超长 box
+            if len_cell < 5: 
+                return 0.0
+
+        # --- 🔍 核心相似度计算 ---
+        cell_proc = self._preprocess_text_for_matching(cell_text)
+        box_proc = self._preprocess_text_for_matching(box_text)
+        
+        # A. Token Sort (解决乱序)
+        score_sort = fuzz.token_sort_ratio(cell_proc, box_proc)
+        
+        # B. Partial (解决截断/包含)
+        score_partial = fuzz.partial_ratio(cell_norm, box_norm)
+        
+        # C. Subsequence (解决噪音插入)
+        score_subseq = 0.0
+        if len_cell > 5:
+            score_subseq = self._calculate_subsequence_score(cell_norm, box_norm)
+
+        # --- 🛡️ 深度防御逻辑 (Deep Defense) ---
+        
+        # 1. 短文本防御 (防止 "1" 匹配 "1000")
+        if score_partial > 80:
+            import re
+            has_content = lambda t: bool(re.search(r'[a-zA-Z0-9\u4e00-\u9fa5]', t))
+            
+            # 纯符号防御
+            if not has_content(cell_norm) and has_content(box_norm):
+                if len_box > len_cell + 2: score_partial = 0.0
+            
+            # 微小碎片防御
+            elif len_cell <= 2 and len_box > 8:
+                score_partial = 0.0
+                
+            # 覆盖率防御 (防止 "2024" 匹配 "ID2024...")
+            else:
+                coverage = len_cell / len_box
+                if coverage < 0.3 and score_sort < 45:
+                    score_partial = 0.0
+
+        # 2. 子序列防御 (防止短数字匹配长ID)
+        if score_subseq > 80:
+            if len_box > len_cell * 1.5:
+                import re
+                # 如果是数字类型且较短
+                if re.match(r'^[\d\-\:\.\s]+$', cell_norm) and len_cell < 12:
+                    score_subseq = 0.0
+
+        # --- 📊 综合评分 ---
+        final_score = max(score_sort, score_partial, score_subseq)
+        
+        # 精确匹配奖励
+        if cell_norm == box_norm:
+            final_score = 100.0
+        # 包含关系奖励
+        elif cell_norm in box_norm:
+            final_score = min(100, final_score + 5)
+            
+        return final_score
+```
+
+#### 2. 实现行内 DP (解决贪婪问题)
+
+用 DP 替代 `_match_cell_sequential`。
+
+```python
+    def _match_cells_in_row_dp(self, html_cells: List, row_boxes: List[Dict]) -> List[Dict]:
+        """
+        使用动态规划进行行内单元格匹配
+        目标:找到一种分配方案,使得整行的匹配总分最高
+        """
+        n_cells = len(html_cells)
+        n_boxes = len(row_boxes)
+        
+        # dp[i][j] 表示:前 i 个单元格 消耗了 前 j 个 boxes 的最大得分
+        dp = np.full((n_cells + 1, n_boxes + 1), -np.inf)
+        dp[0][0] = 0
+        
+        # path[i][j] = (prev_j, matched_info) 用于回溯
+        path = {}
+        
+        # 允许合并的最大 box 数量
+        MAX_MERGE = 5 
+        
+        for i in range(1, n_cells + 1):
+            cell = html_cells[i-1]
+            cell_text = cell.get_text(strip=True)
+            
+            # 优化:如果单元格为空,直接继承状态,不消耗 box
+            if not cell_text:
+                for j in range(n_boxes + 1):
+                    if dp[i-1][j] > -np.inf:
+                        dp[i][j] = dp[i-1][j]
+                        path[(i, j)] = (j, None) # None 表示空匹配
+                continue
+
+            # 遍历当前 box 指针 j
+            for j in range(n_boxes + 1):
+                # 1. 策略:跳过当前单元格 (Cell Missing)
+                # 继承 dp[i-1][j]
+                if dp[i-1][j] > dp[i][j]:
+                    dp[i][j] = dp[i-1][j]
+                    path[(i, j)] = (j, None)
+
+                # 2. 策略:匹配 k 个 boxes (从 j-k 到 j)
+                # 我们尝试用 boxes[prev_j : j] 来匹配 cell[i]
+                # prev_j 是上一个单元格结束的位置
+                
+                # 限制搜索范围,提高性能
+                # 假设中间跳过的噪音 box 不超过 3 个
+                search_start = max(0, j - MAX_MERGE - 3)
+                
+                for prev_j in range(j - 1, search_start - 1, -1):
+                    if dp[i-1][prev_j] == -np.inf:
+                        continue
+                        
+                    # 取出中间的 boxes (prev_j 到 j)
+                    # 注意:这里我们允许中间有一些 box 被跳过(视为噪音),
+                    # 但为了简化 DP,我们通常假设连续取用,或者只取末尾的 k 个
+                    
+                    # 简化版:尝试合并 boxes[prev_j:j] 中的所有内容
+                    candidate_boxes = row_boxes[prev_j:j]
+                    
+                    # 组合文本
+                    merged_text = "".join([b['text'] for b in candidate_boxes])
+                    # 或者用空格连接,视情况而定
+                    merged_text_spaced = " ".join([b['text'] for b in candidate_boxes])
+                    
+                    # 计算得分
+                    score = self._compute_match_score(cell_text, merged_text_spaced)
+                    
+                    if score > 60: # 只有及格的匹配才考虑
+                        new_score = dp[i-1][prev_j] + score
+                        if new_score > dp[i][j]:
+                            dp[i][j] = new_score
+                            path[(i, j)] = (prev_j, {
+                                'text': merged_text_spaced,
+                                'boxes': candidate_boxes,
+                                'score': score
+                            })
+
+        # --- 回溯找最优解 ---
+        # 找到最后一行得分最高的 j
+        best_j = np.argmax(dp[n_cells])
+        if dp[n_cells][best_j] == -np.inf:
+            return [] # 匹配完全失败
+            
+        results = []
+        curr_i, curr_j = n_cells, best_j
+        
+        while curr_i > 0:
+            prev_j, match_info = path.get((curr_i, curr_j), (curr_j, None))
+            
+            if match_info:
+                # 构造结果
+                cell_idx = curr_i - 1
+                # 这里需要把结果存下来,最后反转
+                results.append({
+                    'cell_idx': cell_idx,
+                    'match_info': match_info
+                })
+            
+            curr_i -= 1
+            curr_j = prev_j
+            
+        return results[::-1]
+```
+
+### 💡 为什么这个方案更好?
+
+1.  **全局最优**:
+    *   **场景**:Cell 1=`2024`, Cell 2=`ID2024...`。
+    *   **贪婪**:Cell 1 看到 `ID2024...`,部分匹配 100分,抢走。Cell 2 没得吃,0分。总分 100。
+    *   **DP**:
+        *   方案 A (Cell 1 抢走): Cell 1 得 100,Cell 2 得 0。总分 100。
+        *   方案 B (Cell 1 跳过): Cell 1 匹配后面的短日期 (100分),Cell 2 匹配长ID (100分)。总分 200。
+    *   **结果**:DP 自动选择方案 B。
+
+2.  **代码清晰**:
+    *   `_compute_match_score` 只有评分逻辑,没有循环和指针。
+    *   `_match_cells_in_row_dp` 只有路径规划,没有复杂的字符串处理。
+
+3.  **天然处理合并**:
+    *   DP 的内层循环 `candidate_boxes = row_boxes[prev_j:j]` 天然支持将多个 Box 合并给一个 Cell,不需要单独写 `merged_bboxes` 逻辑。
+