
feat: adjust paragraph and table-header similarity thresholds; add Markdown formatting handling and punctuation-difference checking

zhch158_admin · 1 month ago · commit 977fe61923
1 file changed, 304 insertions(+), 53 deletions(-)

compare_ocr_results.py  +304 -53

@@ -12,10 +12,11 @@ from fuzzywuzzy import fuzz
 class OCRResultComparator:
     def __init__(self):
         self.differences = []
-        self.similarity_threshold = 95
+        self.paragraph_match_threshold = 80  # Paragraph match threshold: >= 80 means the paragraphs match (a score below 100 still implies differences); < 80 means no match
+        self.content_similarity_threshold = 95  # For matched paragraphs, content similarity > 95 is treated as no difference
         self.max_paragraph_window = 6
         self.table_comparison_mode = 'standard'  # New: table comparison mode
-        self.header_similarity_threshold = 80  # Header similarity threshold
+        self.header_similarity_threshold = 90  # Header similarity threshold
     
     def normalize_text(self, text: str) -> str:
         """标准化文本:去除多余空格、回车等无效字符"""
@@ -244,62 +245,262 @@ class OCRResultComparator:
         
         return max(similarity_scores)
     
+    def strip_markdown_formatting(self, text: str) -> str:
+        """Strip Markdown formatting and keep only the plain-text content"""
+        if not text:
+            return ""
+        
+        # Remove heading markers (#, ##, ###, ...)
+        text = re.sub(r'^#+\s*', '', text, flags=re.MULTILINE)
+        
+        # Remove bold markers (**text** or __text__)
+        text = re.sub(r'\*\*(.+?)\*\*', r'\1', text)
+        text = re.sub(r'__(.+?)__', r'\1', text)
+        
+        # Remove italic markers (*text* or _text_)
+        text = re.sub(r'\*(.+?)\*', r'\1', text)
+        text = re.sub(r'_(.+?)_', r'\1', text)
+        
+        # Remove links: [text](url)
+        text = re.sub(r'\[(.+?)\]\(.+?\)', r'\1', text)
+        
+        # Remove image references: ![alt](url)
+        text = re.sub(r'!\[.*?\]\(.+?\)', '', text)
+        
+        # Remove inline code markers: `code`
+        text = re.sub(r'`(.+?)`', r'\1', text)
+        
+        # Remove HTML tags
+        text = re.sub(r'<[^>]+>', '', text)
+        
+        # Remove list markers (-, *, +, 1., 2., ...)
+        text = re.sub(r'^\s*[-*+]\s+', '', text, flags=re.MULTILINE)
+        text = re.sub(r'^\s*\d+\.\s+', '', text, flags=re.MULTILINE)
+        
+        # Remove blockquote markers (>)
+        text = re.sub(r'^\s*>\s+', '', text, flags=re.MULTILINE)
+        
+        # Normalize whitespace
+        text = re.sub(r'\s+', ' ', text.strip())
+        
+        return text
+
+    def normalize_text_for_comparison(self, text: str) -> str:
+        """
+        Normalize text for comparison: strip formatting, unify punctuation, normalize whitespace
+        
+        Args:
+            text: raw text
+        
+        Returns:
+            Normalized plain text
+        """
+        # Step 1: strip Markdown formatting
+        text = self.strip_markdown_formatting(text)
+        
+        # Step 2: unify punctuation (full-width to half-width)
+        text = self.normalize_punctuation(text)
+        
+        # Step 3: normalize whitespace
+        text = self.normalize_text(text)
+        
+        return text
+
+    def normalize_punctuation(self, text: str) -> str:
+        """
+        Unify punctuation - convert full-width (Chinese) punctuation to half-width (ASCII)
+        
+        Args:
+            text: raw text
+        
+        Returns:
+            Text with unified punctuation
+        """
+        if not text:
+            return ""
+        
+        # Map from full-width (Chinese) punctuation to half-width (ASCII)
+        punctuation_map = {
+            ':': ':',   # colon
+            ';': ';',   # semicolon
+            ',': ',',   # comma
+            '。': '.',   # full stop
+            '!': '!',   # exclamation mark
+            '?': '?',   # question mark
+            '(': '(',   # left parenthesis
+            ')': ')',   # right parenthesis
+            '【': '[',   # left square bracket
+            '】': ']',   # right square bracket
+            '《': '<',   # left title mark
+            '》': '>',   # right title mark
+            '“': '"',    # left double quote
+            '”': '"',    # right double quote
+            '‘': "'",    # left single quote
+            '’': "'",    # right single quote
+            '、': ',',   # enumeration comma
+            '—': '-',    # dash
+            '…': '...',  # ellipsis
+            '~': '~',   # tilde
+        }
+        
+        for cn_punct, en_punct in punctuation_map.items():
+            text = text.replace(cn_punct, en_punct)
+        
+        return text
+
+    def check_punctuation_differences(self, text1: str, text2: str) -> List[Dict]:
+        """
+        Check two texts for punctuation-only differences
+        
+        Args:
+            text1: first text
+            text2: second text
+        
+        Returns:
+            List of punctuation differences
+        """
+        differences = []
+        
+        # If the texts are identical after normalization, any difference is punctuation-only
+        normalized1 = self.normalize_punctuation(text1)
+        normalized2 = self.normalize_punctuation(text2)
+        
+        if normalized1 == normalized2 and text1 != text2:
+            # Locate the exact positions of the punctuation differences
+            min_len = min(len(text1), len(text2))
+            
+            for i in range(min_len):
+                if text1[i] != text2[i]:
+                    # Check whether this is a full-width vs half-width pair
+                    char1 = text1[i]
+                    char2 = text2[i]
+                    
+                    # Two characters correspond if normalize_punctuation maps them to the same result
+                    if self.normalize_punctuation(char1) == self.normalize_punctuation(char2):
+                        # Extract context (3 characters on each side)
+                        start = max(0, i - 3)
+                        end = min(len(text1), i + 4)
+                        context1 = text1[start:end]
+                        context2 = text2[start:end]
+                        
+                        differences.append({
+                            'position': i,
+                            'char1': char1,
+                            'char2': char2,
+                            'context1': context1,
+                            'context2': context2,
+                            'type': 'full_half_width'
+                        })
+        
+        return differences
+
     def compare_paragraphs_with_flexible_matching(self, paras1: List[str], paras2: List[str]) -> List[Dict]:
         """改进的段落匹配算法 - 更好地处理段落重组"""
+        """_summary_
+        paras1: 文件1的段落列表
+        paras2: 文件2的段落列表
+        paras1和paras2中的段落顺序有可能不一致,需要对窗口内的段落进行匹配,窗口的段落的顺序可以不一样
+        para1和para2中的段落可能存在合并或拆分的情况,需要考虑这种情况
+        """
         differences = []
+    
+        # ✅ Preprocess: strip formatting and unify punctuation (used for matching)
+        normalized_paras1 = [self.normalize_text_for_comparison(p) for p in paras1]
+        normalized_paras2 = [self.normalize_text_for_comparison(p) for p in paras2]
         
-        # Use the raw paragraphs directly (no preprocessing)
-        meaningful_paras1 = paras1
-        meaningful_paras2 = paras2
+        # Keep format-stripped text (punctuation untouched) for difference detection
+        original_paras1 = [self.strip_markdown_formatting(p) for p in paras1]
+        original_paras2 = [self.strip_markdown_formatting(p) for p in paras2]
 
         # Match using the preprocessed paragraphs
         used_paras1 = set()
         used_paras2 = set()
-        
-        best_match = {'similarity': 0.0}  # Initialize best_match
-        # Walk file 1 and file 2 forward together; on a match, file 2's window restarts right after the matched position
-        paras2_idx = 0
-        for window_size1 in range(1, min(self.max_paragraph_window, len(meaningful_paras1) + 1)):  # increased to 6 paragraphs
-            for i in range(len(meaningful_paras1) - window_size1 + 1):
+    
+        # Walk file 1 and file 2 forward together
+        start_index2 = 0
+        last_match_index2 = 0
+    
+        for window_size1 in range(1, min(self.max_paragraph_window, len(normalized_paras1) + 1)):
+            for i in range(len(normalized_paras1) - window_size1 + 1):
+                # Skip paragraphs that are already used
                 if any(idx in used_paras1 for idx in range(i, i + window_size1)):
                     continue
-                    
-                # Merge paragraphs from file 1
-                combined_para1 = "".join(meaningful_paras1[i:i+window_size1])
                 
-                # Find the best match in file 2
+                # Merge paragraphs from file 1 (normalized version, used for matching)
+                combined_normalized1 = "".join(normalized_paras1[i:i+window_size1])
+                
+                # Merge paragraphs from file 1 (format-stripped version, used for difference detection)
+                combined_original1 = "".join(original_paras1[i:i+window_size1])
+                
+                # Find the best match
                 best_match = self._find_best_match_in_paras2_improved(
-                    combined_para1, 
-                    meaningful_paras2[paras2_idx: min(paras2_idx + self.max_paragraph_window, len(meaningful_paras2))], 
-                    paras2_idx
+                    combined_normalized1, 
+                    normalized_paras2,
+                    start_index2,
+                    last_match_index2,
+                    used_paras2
                 )
                 
-                if best_match and best_match['similarity'] >= self.similarity_threshold:
-                    paras2_idx = best_match['indices'][-1] + 1  # Advance file 2's start index
+                if best_match and best_match['similarity'] >= self.paragraph_match_threshold:
+                    # Update the search position
+                    matched_indices = best_match['indices']
+                    last_match_index2 = matched_indices[-1]
+                    start_index2 = last_match_index2 + 1
+                
                     # Record the match
                     for idx in range(i, i + window_size1):
                         used_paras1.add(idx)
-                    for idx in best_match['indices']:
+                    for idx in matched_indices:
                         used_paras2.add(idx)
-                    
-                    # Only record a difference when the similarity is clearly below par
-                    if best_match['similarity'] < 95.0:  # raise the threshold to 95%
+                
+                    # ✅ Get the original text (punctuation not yet normalized)
+                    combined_original2 = "".join([original_paras2[idx] for idx in matched_indices])
+                
+                    # ✅ Check for punctuation differences
+                    punctuation_diffs = self.check_punctuation_differences(
+                        combined_original1, 
+                        combined_original2
+                    )
+                
+                    if punctuation_diffs:
+                        # Punctuation-only differences found
+                        diff_description = []
+                        for pdiff in punctuation_diffs:
+                            diff_description.append(
+                                f"位置{pdiff['position']}: '{pdiff['char1']}' vs '{pdiff['char2']}' "
+                                f"(上下文: ...{pdiff['context1']}... vs ...{pdiff['context2']}...)"
+                            )
+                        
+                        differences.append({
+                            'type': 'paragraph_punctuation',  # ✅ new difference type
+                            'position': f'段落{i+1}' + (f'-{i+window_size1}' if window_size1 > 1 else ''),
+                            'file1_value': combined_original1,
+                            'file2_value': combined_original2,
+                            'description': f'段落全角半角标点差异: {"; ".join(diff_description)}',
+                            'punctuation_differences': punctuation_diffs,
+                            'similarity': 100.0,  # content is identical
+                            'severity': 'low'
+                        })
+                
+                    elif best_match['similarity'] < self.content_similarity_threshold:
+                        # Content differs
                         severity = 'low' if best_match['similarity'] >= 90 else 'medium'
                         differences.append({
                             'type': 'paragraph',
                             'position': f'段落{i+1}' + (f'-{i+window_size1}' if window_size1 > 1 else ''),
-                            'file1_value': combined_para1,
-                            'file2_value': best_match['text'],
-                            'description': f'段落格式差异 (相似度: {best_match["similarity"]:.1f}%)',
+                            'file1_value': combined_original1,
+                            'file2_value': combined_original2,
+                            'description': f'段落内容差异 (相似度: {best_match["similarity"]:.1f}%)',
                             'similarity': best_match['similarity'],
                             'severity': severity
                         })
-                    
-            if paras2_idx >= len(meaningful_paras2):
-                break  # all of file 2 has been matched; stop
         
-        # Handle unmatched meaningful paragraphs
-        for i, para in enumerate(meaningful_paras1):
+        # Even when file 2 is fully matched, fall through here: unmatched
+        # paragraphs from file 1 still need to be reported below
+    
+        # Handle unmatched paragraphs
+        for i, para in enumerate(original_paras1):
             if i not in used_paras1:
                 differences.append({
                     'type': 'paragraph',
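
A quick sanity check of the normalization helpers added in this hunk (a sketch; assumes compare_ocr_results.py is importable, with made-up sample strings):

    from compare_ocr_results import OCRResultComparator

    comparator = OCRResultComparator()

    # Markdown stripping keeps only the text content; punctuation is untouched
    print(comparator.strip_markdown_formatting("## **账户:** [明细](http://example.com) `12345`"))
    # -> "账户: 明细 12345"

    # Full normalization additionally unifies full-width and half-width punctuation
    print(comparator.normalize_text_for_comparison("账户:招商银行。"))
    # -> "账户:招商银行."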
@@ -310,8 +511,8 @@ class OCRResultComparator:
                     'similarity': 0.0,
                     'severity': 'medium'
                 })
-        
-        for j, para in enumerate(meaningful_paras2):
+    
+        for j, para in enumerate(original_paras2):
             if j not in used_paras2:
                 differences.append({
                     'type': 'paragraph',
@@ -322,39 +523,89 @@ class OCRResultComparator:
                     'similarity': 0.0,
                     'severity': 'medium'
                 })
-        
+    
         return differences
 
+
     def _find_best_match_in_paras2_improved(self, target_text: str, paras2: List[str], 
-                                       paras2_idx: int) -> Dict:
-        """改进的段落匹配方法"""
+                                       start_index: int, last_match_index: int,
+                                       used_paras2: set) -> Dict:
+        """
+        改进的段落匹配方法 - 借鉴 _find_matching_bbox 的窗口查找逻辑
+    
+        Args:
+            target_text: 目标文本(已标准化)
+            paras2: 文件2的段落列表(已标准化)
+            start_index: 起始搜索索引(上次匹配后的下一个位置)
+            last_match_index: 上次匹配成功的索引
+            used_paras2: 已使用的段落索引集合
+    
+        Returns:
+            最佳匹配结果
+        """
+        # ✅ Backward look-behind window (similar to _find_matching_bbox)
+        search_start = last_match_index - 1
+        unused_count = 0
+        
+        # Walk backwards until max_paragraph_window unused paragraphs are covered
+        while search_start >= 0:
+            if search_start not in used_paras2:
+                unused_count += 1
+            if unused_count >= self.max_paragraph_window:
+                break
+            search_start -= 1
+        
+        if search_start < 0:
+            search_start = 0
+            # Skip already-used paragraphs at the beginning
+            while search_start < start_index and search_start in used_paras2:
+                search_start += 1
+    
+        # Search range: from search_start to start_index + window
+        search_end = min(start_index + self.max_paragraph_window, len(paras2))
+    
         best_match = None
-        
-        for window_size in range(1, len(paras2) + 1):
-            for j in range(len(paras2) - window_size + 1):
+    
+        # ✅ Try different window sizes
+        for window_size in range(1, self.max_paragraph_window + 1):
+            for j in range(search_start, search_end):
+                # ✅ Skip paragraphs that are already used
+                if any(idx in used_paras2 for idx in range(j, min(j + window_size, len(paras2)))):
+                    continue
+                
+                # Stay within bounds
+                if j + window_size > len(paras2):
+                    break
+                
+                # Merge the paragraphs
                 combined_para2 = "".join(paras2[j:j+window_size])
-                similarity = self.calculate_text_similarity(target_text, combined_para2)
-
-                if best_match and best_match['similarity'] == 100.0:
-                    break  # perfect match found; exit early
                 
+                # Compute similarity
+                if target_text == combined_para2:
+                    similarity = 100.0
+                else:
+                    similarity = self.calculate_text_similarity(target_text, combined_para2)
+                
+                # Update the best match
                 if not best_match or similarity > best_match['similarity']:
                     best_match = {
                         'text': combined_para2,
                         'similarity': similarity,
-                        'indices': list(range(j + paras2_idx, j + paras2_idx + window_size))
+                        'indices': list(range(j, j + window_size))
                     }
-            if best_match and best_match['similarity'] == 100.0:
-                break  # perfect match found; exit early
-        
-        # Return empty dict if no match found
+                    
+                    # ✅ Perfect match found; return early
+                    if similarity == 100.0:
+                        return best_match
+    
+        # No match found; return an empty result
         if best_match is None:
             return {
                 'text': '',
                 'similarity': 0.0,
                 'indices': []
             }
-        
+    
         return best_match
     
     def detect_column_type(self, column_values: List[str]) -> str:
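
End to end, paragraph pairs that differ only in full-width vs half-width punctuation now match at 100% and surface as the new low-severity paragraph_punctuation type instead of a content difference. A sketch with invented inputs, continuing the comparator from the sketches above:

    paras1 = ["账户:招商银行", "余额:100。"]
    paras2 = ["账户:招商银行", "余额:100."]
    for d in comparator.compare_paragraphs_with_flexible_matching(paras1, paras2):
        print(d['type'], d['severity'])
    # Expected: two 'paragraph_punctuation' entries, both with severity 'low'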
@@ -509,7 +760,7 @@ class OCRResultComparator:
         else:
             # Text comparison
             similarity = self.calculate_text_similarity(v1, v2)
-            if similarity < self.similarity_threshold:
+            if similarity < self.content_similarity_threshold:
                 result['match'] = False
                 result['difference'] = {
                     'type': 'table_text',  # ✅ text difference
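
The same 95% content threshold now also gates table text cells. A rough illustration with fuzzywuzzy (calculate_text_similarity takes the max of several fuzz scores, so treat the number as approximate):

    from fuzzywuzzy import fuzz

    # One substituted character in ten: ratio is about 90, under the 95%
    # threshold, so this cell pair would be flagged as a table_text difference
    print(fuzz.ratio("工商银行股份有限公司", "工商银行股分有限公司"))  # ~90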
@@ -1127,8 +1378,8 @@ if __name__ == "__main__":
     else:
        # Test: transaction-statement table comparison
         result = compare_ocr_results(
-            file1_path='/Users/zhch158/workspace/data/流水分析/A用户_单元格扫描流水/data_PPStructureV3_Results/A用户_单元格扫描流水_page_001.md',
-            file2_path='/Users/zhch158/workspace/data/流水分析/A用户_单元格扫描流水/mineru-vlm-2.5.3_Results/A用户_单元格扫描流水_page_001.md',
+            file1_path='/Users/zhch158/workspace/data/流水分析/对公_招商银行图/merged_results/对公_招商银行图_page_001.md',
+            file2_path='/Users/zhch158/workspace/data/流水分析/对公_招商银行图/data_DotsOCR_Results/对公_招商银行图_page_001.md',
             output_file=f'./output/flow_list_comparison_{time.strftime("%Y%m%d_%H%M%S")}',
             output_format='both',
             ignore_images=True,