""" 表格单元格匹配器 负责将 HTML 表格单元格与 PaddleOCR bbox 进行匹配 """ from typing import List, Dict, Tuple, Optional from bs4 import BeautifulSoup import numpy as np try: from rapidfuzz import fuzz except ImportError: from fuzzywuzzy import fuzz try: from .text_matcher import TextMatcher from .bbox_extractor import BBoxExtractor except ImportError: from text_matcher import TextMatcher from bbox_extractor import BBoxExtractor class TableCellMatcher: """表格单元格匹配器""" def __init__(self, text_matcher: TextMatcher, x_tolerance: int = 3, y_tolerance: int = 10, inclination_threshold: float = 0.3): """ Args: text_matcher: 文本匹配器 x_tolerance: X轴容差(用于列边界判断) y_tolerance: Y轴容差(用于行分组) inclination_threshold: 倾斜校正阈值(度数) """ self.text_matcher = text_matcher self.x_tolerance = x_tolerance self.y_tolerance = y_tolerance self.inclination_threshold = inclination_threshold # 倾斜校正阈值(度数) def enhance_table_html_with_bbox(self, html: str, paddle_text_boxes: List[Dict], start_pointer: int, table_bbox: Optional[List[int]] = None) -> Tuple[str, List[Dict], int]: """ 为 HTML 表格添加 bbox 信息(优化版:使用行级动态规划) """ soup = BeautifulSoup(html, 'html.parser') cells = [] # 🔑 第一步:筛选表格区域内的 paddle boxes table_region_boxes, actual_table_bbox = self._filter_boxes_in_table_region( paddle_text_boxes[start_pointer:], table_bbox, html ) if not table_region_boxes: print(f"⚠️ 未在表格区域找到 paddle boxes") return str(soup), cells, start_pointer print(f"📊 表格区域: {len(table_region_boxes)} 个文本框") # 🔑 第二步:将表格区域的 boxes 按行分组 grouped_boxes = self._group_paddle_boxes_by_rows( table_region_boxes, y_tolerance=self.y_tolerance, auto_correct_skew=True, inclination_threshold=self.inclination_threshold ) # 🔑 第三步:在每组内按 x 坐标排序 for group in grouped_boxes: group['boxes'].sort(key=lambda x: x['bbox'][0]) grouped_boxes.sort(key=lambda g: g['y_center']) # 🔑 第四步:智能匹配 HTML 行与 paddle 行组 html_rows = soup.find_all('tr') row_mapping = self._match_html_rows_to_paddle_groups(html_rows, grouped_boxes) print(f" HTML行: {len(html_rows)} 行, 映射: {len([v for v in row_mapping.values() if v])} 个有效映射") # 🔑 第五步:遍历 HTML 表格,使用 DP 进行行内匹配 for row_idx, row in enumerate(html_rows): group_indices = row_mapping.get(row_idx, []) if not group_indices: continue # 合并多个组的 boxes current_boxes = [] for group_idx in group_indices: if group_idx < len(grouped_boxes): current_boxes.extend(grouped_boxes[group_idx]['boxes']) # 再次按 x 排序确保顺序 current_boxes.sort(key=lambda x: x['bbox'][0]) html_cells = row.find_all(['td', 'th']) if not html_cells: continue # 🎯 核心变更:使用行级 DP 替代原来的顺序匹配 # 输入:HTML 单元格列表, OCR Box 列表 # 输出:匹配结果列表 dp_results = self._match_cells_in_row_dp(html_cells, current_boxes) print(f" 行 {row_idx + 1}: {len(html_cells)} 列, 匹配到 {len(dp_results)} 个单元格") # 解析 DP 结果并填充 cells 列表 for res in dp_results: cell_idx = res['cell_idx'] match_info = res['match_info'] cell_element = html_cells[cell_idx] cell_text = cell_element.get_text(strip=True) matched_boxes = match_info['boxes'] matched_text = match_info['text'] score = match_info['score'] # 标记 box 为已使用 paddle_indices = [] for box in matched_boxes: box['used'] = True paddle_indices.append(box.get('paddle_bbox_index', -1)) # 计算合并后的 bbox (使用原始坐标 original_bbox 优先) merged_bbox = self._merge_boxes_bbox(matched_boxes) # 注入 HTML 属性 cell_element['data-bbox'] = f"[{merged_bbox[0]},{merged_bbox[1]},{merged_bbox[2]},{merged_bbox[3]}]" cell_element['data-score'] = f"{score:.4f}" cell_element['data-paddle-indices'] = str(paddle_indices) # 构建返回结构 (保持与原函数一致) cells.append({ 'type': 'table_cell', 'text': cell_text, 'matched_text': matched_text, 'bbox': merged_bbox, 'row': row_idx + 1, 'col': cell_idx + 1, 'score': score, 'paddle_bbox_indices': paddle_indices }) print(f" 列 {cell_idx + 1}: '{cell_text[:15]}...' 匹配 {len(matched_boxes)} 个box (分值: {score:.1f})") # 计算新的指针位置 (逻辑保持不变:基于 used 标记) used_count = sum(1 for box in table_region_boxes if box.get('used')) new_pointer = start_pointer + used_count print(f" 总计匹配: {len(cells)} 个单元格") return str(soup), cells, new_pointer def _merge_boxes_bbox(self, boxes: List[Dict]) -> List[int]: """辅助函数:合并多个 box 的坐标""" if not boxes: return [0, 0, 0, 0] # 优先使用 original_bbox,如果没有则使用 bbox def get_coords(b): return b.get('original_bbox', b['bbox']) x1 = min(get_coords(b)[0] for b in boxes) y1 = min(get_coords(b)[1] for b in boxes) x2 = max(get_coords(b)[2] for b in boxes) y2 = max(get_coords(b)[3] for b in boxes) return [x1, y1, x2, y2] def _match_cells_in_row_dp(self, html_cells: List, row_boxes: List[Dict]) -> List[Dict]: """ 使用动态规划进行行内单元格匹配 目标:找到一种分配方案,使得整行的匹配总分最高 """ n_cells = len(html_cells) n_boxes = len(row_boxes) # dp[i][j] 表示:前 i 个单元格 消耗了 前 j 个 boxes 的最大得分 dp = np.full((n_cells + 1, n_boxes + 1), -np.inf) dp[0][0] = 0 # path[i][j] = (prev_j, matched_info) 用于回溯 path = {} # 允许合并的最大 box 数量 MAX_MERGE = 5 for i in range(1, n_cells + 1): cell = html_cells[i-1] cell_text = cell.get_text(strip=True) # 如果单元格为空,允许继承状态(相当于跳过该单元格) if not cell_text: for j in range(n_boxes + 1): if dp[i-1][j] > -np.inf: dp[i][j] = dp[i-1][j] path[(i, j)] = (j, None) continue # 遍历当前 box 指针 j for j in range(n_boxes + 1): # 策略 A: 当前单元格不匹配任何 box (Cell Missing / OCR漏检) if dp[i-1][j] > dp[i][j]: dp[i][j] = dp[i-1][j] path[(i, j)] = (j, None) # 策略 B: 当前单元格匹配了 k 个 boxes (从 prev_j 到 j) # 限制搜索范围:最多往前看 MAX_MERGE 个 box search_limit = max(0, j - MAX_MERGE) # 允许中间跳过少量噪音 box (例如 prev_j 到 j 之间跨度大,但只取了部分) # 但为了简化,这里假设是连续取用 row_boxes[prev_j:j] for prev_j in range(j - 1, search_limit - 1, -1): if dp[i-1][prev_j] == -np.inf: continue candidate_boxes = row_boxes[prev_j:j] # 组合文本 (使用空格连接) merged_text = " ".join([b['text'] for b in candidate_boxes]) # 计算得分 score = self._compute_match_score(cell_text, merged_text) # 只有及格的匹配才考虑 if score > 50: new_score = dp[i-1][prev_j] + score if new_score > dp[i][j]: dp[i][j] = new_score path[(i, j)] = (prev_j, { 'text': merged_text, 'boxes': candidate_boxes, 'score': score }) # --- 回溯找最优解 --- best_j = np.argmax(dp[n_cells]) if dp[n_cells][best_j] == -np.inf: return [] results = [] curr_i, curr_j = n_cells, best_j while curr_i > 0: step_info = path.get((curr_i, curr_j)) if step_info: prev_j, match_info = step_info if match_info: results.append({ 'cell_idx': curr_i - 1, 'match_info': match_info }) curr_j = prev_j curr_i -= 1 return results[::-1] def _compute_match_score(self, cell_text: str, box_text: str) -> float: """ 纯粹的评分函数:计算单元格文本与候选 Box 文本的匹配得分 包含所有防御逻辑 """ # 1. 预处理 cell_norm = self.text_matcher.normalize_text(cell_text) box_norm = self.text_matcher.normalize_text(box_text) if not cell_norm or not box_norm: return 0.0 # --- ⚡️ 快速防御 --- len_cell = len(cell_norm) len_box = len(box_norm) # 长度差异过大直接 0 分 (除非是包含关系且特征明显) if len_box > len_cell * 3 + 5: if len_cell < 5: return 0.0 # --- 🔍 核心相似度计算 --- cell_proc = self._preprocess_text_for_matching(cell_text) box_proc = self._preprocess_text_for_matching(box_text) # A. Token Sort (解决乱序) score_sort = fuzz.token_sort_ratio(cell_proc, box_proc) # B. Partial (解决截断/包含) score_partial = fuzz.partial_ratio(cell_norm, box_norm) # C. Subsequence (解决噪音插入) score_subseq = 0.0 if len_cell > 5: score_subseq = self._calculate_subsequence_score(cell_norm, box_norm) # --- 🛡️ 深度防御逻辑 --- # 1. 短文本防御 if score_partial > 80: import re has_content = lambda t: bool(re.search(r'[a-zA-Z0-9\u4e00-\u9fa5]', t)) # 纯符号防御 if not has_content(cell_norm) and has_content(box_norm): if len_box > len_cell + 2: score_partial = 0.0 # 微小碎片防御 elif len_cell <= 2 and len_box > 8: score_partial = 0.0 # 覆盖率防御 else: coverage = len_cell / len_box if len_box > 0 else 0 if coverage < 0.3 and score_sort < 45: score_partial = 0.0 # 2. 子序列防御 if score_subseq > 80: if len_box > len_cell * 1.5: import re if re.match(r'^[\d\-\:\.\s]+$', cell_norm) and len_cell < 12: score_subseq = 0.0 # --- 📊 综合评分 --- final_score = max(score_sort, score_partial, score_subseq) # 精确匹配奖励 if cell_norm == box_norm: final_score = 100.0 elif cell_norm in box_norm: final_score = min(100, final_score + 5) return final_score def _filter_boxes_in_table_region(self, paddle_boxes: List[Dict], table_bbox: Optional[List[int]], html: str) -> Tuple[List[Dict], List[int]]: """ 筛选表格区域内的 paddle boxes 策略: 1. 如果有 table_bbox,使用边界框筛选(扩展边界) 2. 如果没有 table_bbox,通过内容匹配推断区域 Args: paddle_boxes: paddle OCR 结果 table_bbox: 表格边界框 [x1, y1, x2, y2] html: HTML 内容(用于内容验证) Returns: (筛选后的 boxes, 实际表格边界框) """ if not paddle_boxes: return [], [0, 0, 0, 0] # 🎯 策略 1: 使用提供的 table_bbox(扩展边界) if table_bbox and len(table_bbox) == 4: x1, y1, x2, y2 = table_bbox # 扩展边界(考虑边框外的文本) margin = 20 expanded_bbox = [ max(0, x1 - margin), max(0, y1 - margin), x2 + margin, y2 + margin ] filtered = [] for box in paddle_boxes: bbox = box['bbox'] box_center_x = (bbox[0] + bbox[2]) / 2 box_center_y = (bbox[1] + bbox[3]) / 2 # 中心点在扩展区域内 if (expanded_bbox[0] <= box_center_x <= expanded_bbox[2] and expanded_bbox[1] <= box_center_y <= expanded_bbox[3]): filtered.append(box) if filtered: # 计算实际边界框 actual_bbox = [ min(b['bbox'][0] for b in filtered), min(b['bbox'][1] for b in filtered), max(b['bbox'][2] for b in filtered), max(b['bbox'][3] for b in filtered) ] return filtered, actual_bbox # 🎯 策略 2: 通过内容匹配推断区域 print(" ℹ️ 无 table_bbox,使用内容匹配推断表格区域...") # 提取 HTML 中的所有文本 from bs4 import BeautifulSoup soup = BeautifulSoup(html, 'html.parser') html_texts = set() for cell in soup.find_all(['td', 'th']): text = cell.get_text(strip=True) if text: html_texts.add(self.text_matcher.normalize_text(text)) if not html_texts: return [], [0, 0, 0, 0] # 找出与 HTML 内容匹配的 boxes matched_boxes = [] for box in paddle_boxes: normalized_text = self.text_matcher.normalize_text(box['text']) # 检查是否匹配 if any(normalized_text in ht or ht in normalized_text for ht in html_texts): matched_boxes.append(box) if not matched_boxes: # 🔑 降级:如果精确匹配失败,使用模糊匹配 print(" ℹ️ 精确匹配失败,尝试模糊匹配...") for box in paddle_boxes: normalized_text = self.text_matcher.normalize_text(box['text']) for ht in html_texts: similarity = fuzz.partial_ratio(normalized_text, ht) if similarity >= 70: # 降低阈值 matched_boxes.append(box) break if matched_boxes: # 计算边界框 actual_bbox = [ min(b['bbox'][0] for b in matched_boxes), min(b['bbox'][1] for b in matched_boxes), max(b['bbox'][2] for b in matched_boxes), max(b['bbox'][3] for b in matched_boxes) ] # 🔑 扩展边界,包含可能遗漏的文本 margin = 30 expanded_bbox = [ max(0, actual_bbox[0] - margin), max(0, actual_bbox[1] - margin), actual_bbox[2] + margin, actual_bbox[3] + margin ] # 重新筛选(包含边界上的文本) final_filtered = [] for box in paddle_boxes: bbox = box['bbox'] box_center_x = (bbox[0] + bbox[2]) / 2 box_center_y = (bbox[1] + bbox[3]) / 2 if (expanded_bbox[0] <= box_center_x <= expanded_bbox[2] and expanded_bbox[1] <= box_center_y <= expanded_bbox[3]): final_filtered.append(box) return final_filtered, actual_bbox # 🔑 最后的降级:返回所有 boxes print(" ⚠️ 无法确定表格区域,使用所有 paddle boxes") if paddle_boxes: actual_bbox = [ min(b['bbox'][0] for b in paddle_boxes), min(b['bbox'][1] for b in paddle_boxes), max(b['bbox'][2] for b in paddle_boxes), max(b['bbox'][3] for b in paddle_boxes) ] return paddle_boxes, actual_bbox return [], [0, 0, 0, 0] def _group_paddle_boxes_by_rows(self, paddle_boxes: List[Dict], y_tolerance: int = 10, auto_correct_skew: bool = True, inclination_threshold: float = 0.3) -> List[Dict]: """ 将 paddle_text_boxes 按 y 坐标分组(聚类)- 增强版本 Args: paddle_boxes: Paddle OCR 文字框列表 y_tolerance: Y 坐标容忍度(像素) auto_correct_skew: 是否自动校正倾斜 Returns: 分组列表,每组包含 {'y_center': float, 'boxes': List[Dict]} """ if not paddle_boxes: return [] # 🎯 步骤 1: 检测并校正倾斜(使用 BBoxExtractor) if auto_correct_skew: rotation_angle = BBoxExtractor.calculate_skew_angle(paddle_boxes) if abs(rotation_angle) > inclination_threshold: max_x = max(box['bbox'][2] for box in paddle_boxes) max_y = max(box['bbox'][3] for box in paddle_boxes) image_size = (max_x, max_y) print(f" 🔧 校正倾斜角度: {rotation_angle:.2f}°") paddle_boxes = BBoxExtractor.correct_boxes_skew( paddle_boxes, -rotation_angle, image_size ) # 🎯 步骤 2: 按校正后的 y 坐标分组 boxes_with_y = [] for box in paddle_boxes: bbox = box['bbox'] y_center = (bbox[1] + bbox[3]) / 2 boxes_with_y.append({ 'y_center': y_center, 'box': box }) # 按 y 坐标排序 boxes_with_y.sort(key=lambda x: x['y_center']) groups = [] current_group = None for item in boxes_with_y: if current_group is None: # 开始新组 current_group = { 'y_center': item['y_center'], 'boxes': [item['box']] } else: if abs(item['y_center'] - current_group['y_center']) <= y_tolerance: current_group['boxes'].append(item['box']) # 更新组的中心 current_group['y_center'] = sum( (b['bbox'][1] + b['bbox'][3]) / 2 for b in current_group['boxes'] ) / len(current_group['boxes']) else: groups.append(current_group) current_group = { 'y_center': item['y_center'], 'boxes': [item['box']] } if current_group: groups.append(current_group) print(f" ✓ 分组完成: {len(groups)} 行") return groups def _match_html_rows_to_paddle_groups(self, html_rows: List, grouped_boxes: List[Dict]) -> Dict[int, List[int]]: """ 智能匹配 HTML 行与 paddle 分组(增强版 DP:支持跳过 HTML 行,防止链条断裂) """ if not html_rows or not grouped_boxes: return {} mapping = {} # 🎯 策略 1: 数量相等,简单 1:1 映射 if len(html_rows) == len(grouped_boxes): for i in range(len(html_rows)): mapping[i] = [i] return mapping # --- 准备数据 --- # 提取 HTML 文本 html_row_texts = [] for row in html_rows: cells = row.find_all(['td', 'th']) texts = [self.text_matcher.normalize_text(c.get_text(strip=True)) for c in cells] html_row_texts.append("".join(texts)) # 预计算所有组的文本 group_texts = [] for group in grouped_boxes: boxes = group['boxes'] texts = [self.text_matcher.normalize_text(b['text']) for b in boxes] group_texts.append("".join(texts)) n_html = len(html_row_texts) n_paddle = len(grouped_boxes) # ⚡️ 优化 3: 预计算合并文本 MAX_MERGE = 4 merged_cache = {} for j in range(n_paddle): current_t = "" for k in range(MAX_MERGE): if j + k < n_paddle: current_t += group_texts[j + k] merged_cache[(j, k + 1)] = current_t else: break # --- 动态规划 (DP) --- # dp[i][j] 表示:HTML 前 i 行 (0..i) 匹配到了 Paddle 的前 j 组 (0..j) 的最大得分 # 初始化为负无穷 dp = np.full((n_html, n_paddle), -np.inf) # 记录路径:path[i][j] = (prev_j, start_j) # prev_j: 上一行结束的 paddle index # start_j: 当前行开始的 paddle index (因为一行可能对应多个组) path = {} # 参数配置 SEARCH_WINDOW = 15 # 向前搜索窗口 SKIP_PADDLE_PENALTY = 0.1 # 跳过 Paddle 组的惩罚 SKIP_HTML_PENALTY = 0.3 # 关键:跳过 HTML 行的惩罚 # --- 1. 初始化第一行 --- # 选项 A: 匹配 Paddle 组 for end_j in range(min(n_paddle, SEARCH_WINDOW + MAX_MERGE)): for count in range(1, MAX_MERGE + 1): start_j = end_j - count + 1 if start_j < 0: continue current_text = merged_cache.get((start_j, count), "") similarity = self._calculate_similarity(html_row_texts[0], current_text) penalty = start_j * SKIP_PADDLE_PENALTY score = similarity - penalty # 只有得分尚可才作为有效状态 if score > 0.1: if score > dp[0][end_j]: dp[0][end_j] = score path[(0, end_j)] = (-1, start_j) # 选项 B: 第一行就跳过 (虽然少见,但为了完整性) # 如果第一行跳过,相当于没有消耗任何 paddle 组,状态难以用 dp[0][j] 表达 # 这里简化处理,假设第一行必须匹配点什么,或者由后续行修正 # --- 2. 状态转移 --- for i in range(1, n_html): html_text = html_row_texts[i] # 获取上一行所有有效位置 valid_prev_indices = [j for j in range(n_paddle) if dp[i-1][j] > -np.inf] # 剪枝 if len(valid_prev_indices) > 30: valid_prev_indices.sort(key=lambda j: dp[i-1][j], reverse=True) valid_prev_indices = valid_prev_indices[:30] # 🛡️ 关键修复:允许跳过当前 HTML 行 (继承上一行的状态) # 如果跳过当前行,Paddle 指针 j 不变 for prev_j in valid_prev_indices: score_skip = dp[i-1][prev_j] - SKIP_HTML_PENALTY if score_skip > dp[i][prev_j]: dp[i][prev_j] = score_skip # 记录路径:start_j = prev_j + 1 表示没有消耗新组 (空范围) path[(i, prev_j)] = (prev_j, prev_j + 1) # 如果是空行,直接跳过计算,仅保留继承的状态 if not html_text: continue # 正常匹配逻辑 for prev_j in valid_prev_indices: prev_score = dp[i-1][prev_j] max_gap = min(SEARCH_WINDOW, n_paddle - prev_j - 1) for gap in range(max_gap): start_j = prev_j + 1 + gap for count in range(1, MAX_MERGE + 1): end_j = start_j + count - 1 if end_j >= n_paddle: break current_text = merged_cache.get((start_j, count), "") # 长度预筛选 h_len = len(html_text) p_len = len(current_text) if h_len > 10 and p_len < h_len * 0.2: continue similarity = self._calculate_similarity(html_text, current_text) # 计算惩罚 # 1. 跳过惩罚 (gap) # 2. 长度惩罚 (防止过度合并) len_penalty = 0.0 if h_len > 0: ratio = p_len / h_len if ratio > 2.0: len_penalty = (ratio - 2.0) * 0.2 current_score = similarity - (gap * SKIP_PADDLE_PENALTY) - len_penalty # 只有正收益才转移 if current_score > 0.1: total_score = prev_score + current_score if total_score > dp[i][end_j]: dp[i][end_j] = total_score path[(i, end_j)] = (prev_j, start_j) # --- 3. 回溯找最优路径 --- # 找到最后一行得分最高的结束位置 best_end_j = -1 max_score = -np.inf # 优先找最后一行,如果最后一行没匹配上,往前找 found_end = False for i in range(n_html - 1, -1, -1): for j in range(n_paddle): if dp[i][j] > max_score: max_score = dp[i][j] best_end_j = j best_last_row = i if max_score > -np.inf: found_end = True break mapping = {} used_groups = set() if found_end: curr_i = best_last_row curr_j = best_end_j while curr_i >= 0: if (curr_i, curr_j) in path: prev_j, start_j = path[(curr_i, curr_j)] # 如果 start_j <= curr_j,说明消耗了 Paddle 组 # 如果 start_j > curr_j,说明是跳过 HTML 行 (空范围) if start_j <= curr_j: indices = list(range(start_j, curr_j + 1)) mapping[curr_i] = indices used_groups.update(indices) else: mapping[curr_i] = [] curr_j = prev_j curr_i -= 1 else: break # 填补未匹配的行 for i in range(n_html): if i not in mapping: mapping[i] = [] # --- 4. 后处理:未匹配组的归属 (Orphans) --- unused_groups = [i for i in range(len(grouped_boxes)) if i not in used_groups] if unused_groups: print(f" ℹ️ 发现 {len(unused_groups)} 个未匹配的 paddle 组: {unused_groups}") for unused_idx in unused_groups: unused_group = grouped_boxes[unused_idx] unused_y_min = min(b['bbox'][1] for b in unused_group['boxes']) unused_y_max = max(b['bbox'][3] for b in unused_group['boxes']) above_idx = None below_idx = None above_distance = float('inf') below_distance = float('inf') for i in range(unused_idx - 1, -1, -1): if i in used_groups: above_idx = i above_group = grouped_boxes[i] max_y_box = max(above_group['boxes'], key=lambda b: b['bbox'][3]) above_y_center = (max_y_box['bbox'][1] + max_y_box['bbox'][3]) / 2 above_distance = abs(unused_y_min - above_y_center) break for i in range(unused_idx + 1, len(grouped_boxes)): if i in used_groups: below_idx = i below_group = grouped_boxes[i] min_y_box = min(below_group['boxes'], key=lambda b: b['bbox'][1]) below_y_center = (min_y_box['bbox'][1] + min_y_box['bbox'][3]) / 2 below_distance = abs(below_y_center - unused_y_max) break closest_used_idx = None merge_direction = "" if above_idx is not None and below_idx is not None: if above_distance < below_distance: closest_used_idx = above_idx merge_direction = "上方" else: closest_used_idx = below_idx merge_direction = "下方" elif above_idx is not None: closest_used_idx = above_idx merge_direction = "上方" elif below_idx is not None: closest_used_idx = below_idx merge_direction = "下方" if closest_used_idx is not None: target_html_row = None for html_row_idx, group_indices in mapping.items(): if closest_used_idx in group_indices: target_html_row = html_row_idx break if target_html_row is not None: if unused_idx not in mapping[target_html_row]: mapping[target_html_row].append(unused_idx) mapping[target_html_row].sort() print(f" • 组 {unused_idx} 合并到 HTML 行 {target_html_row}({merge_direction}行)") used_groups.add(unused_idx) # 🔑 策略 4: 第三遍 - 按 y 坐标排序每行的组索引 for row_idx in mapping: if mapping[row_idx]: mapping[row_idx].sort(key=lambda idx: grouped_boxes[idx]['y_center']) return mapping def _calculate_similarity(self, text1: str, text2: str) -> float: """ 计算两个文本的相似度,结合字符覆盖率和序列相似度 (性能优化版) """ if not text1 or not text2: return 0.0 len1, len2 = len(text1), len(text2) # ⚡️ 优化 1: 长度快速检查 # 如果长度差异过大(例如一个 50 字符,一个 2 字符),直接认为不匹配 if len1 > 0 and len2 > 0: min_l, max_l = min(len1, len2), max(len1, len2) if max_l > 10 and min_l / max_l < 0.2: return 0.0 # 1. 字符覆盖率 (Character Overlap) from collections import Counter c1 = Counter(text1) c2 = Counter(text2) intersection = c1 & c2 overlap_count = sum(intersection.values()) coverage = overlap_count / len1 if len1 > 0 else 0 # ⚡️ 优化 2: 覆盖率低时跳过昂贵的 fuzz 计算 # 如果字符重叠率低于 30%,说明内容基本不相关,没必要算序列相似度 if coverage < 0.3: return coverage * 0.7 # 2. 序列相似度 (Sequence Similarity) # 使用 token_sort_ratio 来容忍一定的乱序 seq_score = fuzz.token_sort_ratio(text1, text2) / 100.0 return (coverage * 0.7) + (seq_score * 0.3) def _preprocess_text_for_matching(self, text: str) -> str: """ 预处理文本:在不同类型的字符(如中文和数字/英文)之间插入空格, 以便于 token_sort_ratio 更准确地进行分词和匹配。 """ if not text: return "" import re # 1. 在中文和非中文(数字/字母)之间插入空格 # 例如: "2024年" -> "2024 年", "ID号码123" -> "ID号码 123" text = re.sub(r'([\u4e00-\u9fa5])([a-zA-Z0-9])', r'\1 \2', text) text = re.sub(r'([a-zA-Z0-9])([\u4e00-\u9fa5])', r'\1 \2', text) return text def _calculate_subsequence_score(self, target: str, source: str) -> float: """ 计算子序列匹配得分 (解决 OCR 噪音插入问题) 例如: Target="12345", Source="12(date)34(time)5" -> Score close to 100 """ # 1. 仅保留字母和数字,忽略符号干扰 t_clean = "".join(c for c in target if c.isalnum()) s_clean = "".join(c for c in source if c.isalnum()) if not t_clean or not s_clean: return 0.0 # 2. 贪婪匹配子序列 t_idx, s_idx = 0, 0 matches = 0 while t_idx < len(t_clean) and s_idx < len(s_clean): if t_clean[t_idx] == s_clean[s_idx]: matches += 1 t_idx += 1 s_idx += 1 else: # 跳过 source 中的噪音字符 s_idx += 1 # 3. 计算得分 match_rate = matches / len(t_clean) # 如果匹配率太低,直接返回 if match_rate < 0.8: return match_rate * 100 # 4. 噪音惩罚 (防止 Target="1", Source="123456789" 这种误判) # 计算噪音长度 noise_len = len(s_clean) - matches # 允许一定比例的噪音 (例如日期时间插入,通常占总长度的 30%-50%) # 如果噪音长度超过目标长度的 60%,开始扣分 penalty = 0 if noise_len > len(t_clean) * 0.6: excess_noise = noise_len - (len(t_clean) * 0.6) penalty = excess_noise * 0.5 # 每多一个噪音字符扣 0.5 分 penalty = min(penalty, 20) # 最多扣 20 分 final_score = (match_rate * 100) - penalty return max(0, final_score)