瀏覽代碼

feat: 增强OCR结果比较功能,新增数字和日期时间解析,改进列类型检测与比较逻辑

zhch158_admin 1 月之前
父節點
當前提交
f894ad1f3b
共有 1 個文件被更改,包括 310 次插入和 165 次删除
  1. 310 165
      compare_ocr_results.py

+ 310 - 165
compare_ocr_results.py

@@ -196,27 +196,253 @@ class OCRResultComparator:
         
         return differences
     
+    def parse_number(self, text: str) -> float:
+        """Parse a numeric or monetary cell into a float.
+
+        Currency symbols, thousands separators (ASCII and full-width commas)
+        and whitespace are stripped before parsing. A leading minus sign
+        (ASCII ``-``, Unicode minus U+2212 or full-width U+FF0D) or a value
+        wrapped in parentheses, e.g. ``(123.45)``, is treated as negative.
+
+        Args:
+            text: Raw cell text; may be empty or ``None``.
+
+        Returns:
+            The parsed value, or ``0.0`` when ``text`` is empty, cannot be
+            parsed, or parses to a non-finite value (``nan`` / ``inf``).
+        """
+        import math  # local import so the module's import block stays untouched
+
+        if not text:
+            return 0.0
+
+        # The full-width yen sign (U+FFE5) and full-width comma (U+FF0C) are
+        # written as escapes so they survive re-encoding or normalisation of
+        # this source file.
+        clean_text = re.sub(r'[¥\uFFE5$€£,\uFF0C\s]', '', text)
+
+        # Leading sign: ASCII hyphen, Unicode minus, or full-width hyphen.
+        is_negative = False
+        if clean_text.startswith(('-', '−', '\uFF0D')):
+            is_negative = True
+            clean_text = clean_text[1:]
+
+        # Accounting notation: (123.45) -> -123.45
+        if clean_text.startswith('(') and clean_text.endswith(')'):
+            is_negative = True
+            clean_text = clean_text[1:-1]
+
+        try:
+            number = float(clean_text)
+        except ValueError:
+            return 0.0
+
+        # float() accepts "nan"/"inf"; nan never exceeds a numeric tolerance,
+        # so letting it through would hide a mismatch from callers.
+        if not math.isfinite(number):
+            return 0.0
+        return -number if is_negative else number
+
+    def extract_datetime(self, text: str) -> str:
+        """Extract the first recognised date/time from ``text`` and normalise it.
+
+        Recognised forms, tried in this order over the whole string:
+
+        1. ``YYYY-MM-DD HH:MM[:SS]`` (``-`` or ``/`` as date separator)
+        2. ``YYYY-MM-DD`` (``-`` or ``/`` as date separator)
+        3. ``YYYY年MM月DD日``
+
+        Every component is zero-padded to two digits. A time without seconds
+        is kept as ``HH:MM`` instead of being dropped, so two values that
+        differ only in their time no longer normalise to the same string.
+
+        Args:
+            text: Cell text that may contain a date; may be empty or ``None``.
+
+        Returns:
+            The normalised ``YYYY-MM-DD[ HH:MM[:SS]]`` string, or ``text``
+            unchanged when nothing matches (or ``text`` is empty/``None``).
+        """
+        if not text:
+            return text
+
+        def _date(m) -> str:
+            # Groups 1-3 are year, month, day in every pattern below.
+            return f"{m.group(1)}-{m.group(2).zfill(2)}-{m.group(3).zfill(2)}"
+
+        def _date_time(m) -> str:
+            # Group 6 (seconds) is optional and is None when absent.
+            clock = ":".join(
+                part.zfill(2) for part in m.group(4, 5, 6) if part is not None
+            )
+            return f"{_date(m)} {clock}"
+
+        patterns = [
+            (r'(\d{4})[-/](\d{1,2})[-/](\d{1,2})\s*(\d{1,2}):(\d{1,2})(?::(\d{1,2}))?',
+             _date_time),
+            (r'(\d{4})[-/](\d{1,2})[-/](\d{1,2})', _date),
+            (r'(\d{4})年(\d{1,2})月(\d{1,2})日', _date),
+        ]
+
+        for pattern, formatter in patterns:
+            match = re.search(pattern, text)
+            if match:
+                return formatter(match)
+
+        return text
+
     def is_numeric(self, text: str) -> bool:
-        """判断文本是否为数字"""
+        """Return True when ``text`` is a short numeric value (amount, count).
+
+        The accepted forms mirror what ``parse_number`` can parse: currency
+        symbols, thousands separators and whitespace are ignored, and a
+        negative value may be written with one leading minus sign or wrapped
+        in parentheses. A hyphen anywhere else (``"1-2"``) makes the text
+        non-numeric, because ``parse_number`` would turn it into 0.0.
+
+        Cleaned text longer than 15 characters is treated as a text-type
+        number (account number, serial number) and is not numeric.
+
+        Args:
+            text: Raw cell text; may be empty or ``None``.
+
+        Returns:
+            True for a finite number of at most 15 cleaned characters.
+        """
+        import math  # local import so the module's import block stays untouched
+
         if not text:
             return False
-        # 移除千分位分隔符和负号
-        clean_text = re.sub(r'[,,-]', '', text)
+
+        # Strip the same decoration parse_number strips.
+        clean_text = re.sub(r'[¥$€£,\s]', '', text)
+
+        # A sign only counts at the front; parentheses are accounting negatives.
+        if clean_text.startswith(('-', '−')):
+            clean_text = clean_text[1:]
+        if clean_text.startswith('(') and clean_text.endswith(')'):
+            clean_text = clean_text[1:-1]
+
+        # Long digit strings are identifiers, not values.
+        if len(clean_text) > 15:
+            return False
+
         try:
-            float(clean_text)
-            return True
+            # float() also accepts "nan"/"inf"; those are not usable amounts.
+            return math.isfinite(float(clean_text))
         except ValueError:
             return False
     
-    def parse_number(self, text: str) -> float:
-        """解析数字"""
+    def is_text_number(self, text: str) -> bool:
+        """
+        Return True when ``text`` looks like a number used as an identifier
+        (e.g. account number, order number, serial number) rather than a value.
+
+        A text qualifies when either:
+        1. it is all digits and longer than 15 characters once whitespace and
+           hyphens are removed, or
+        2. it consists only of digits, whitespace and hyphens and is longer
+           than 10 characters once whitespace and hyphens are removed.
+        """
         if not text:
-            return 0.0
-        clean_text = re.sub(r'[,,]', '', text)
-        try:
-            return float(clean_text)
-        except ValueError:
-            return 0.0
+            return False
+        
+        # Remove whitespace and hyphens before measuring the length.
+        clean_text = re.sub(r'[\s-]', '', text)
+        
+        # Pure digits longer than 15 characters.
+        if clean_text.isdigit() and len(clean_text) > 15:
+            return True
+        
+        # A digit sequence that may contain spaces/hyphens, longer than 10.
+        # NOTE(review): inputs passing the check above appear to pass this one
+        # as well, so the effective threshold looks like 10 -- confirm intended.
+        if re.match(r'^[\d\s-]+$', text) and len(clean_text) > 10:
+            return True
+        
+        return False
+
+    def detect_column_type(self, column_values: List[str]) -> str:
+        """Classify a table column by sampling its first non-empty cells.
+
+        Up to five non-empty cells are inspected. A type is chosen when at
+        least 60% of that sample matches, tested in this order: text-type
+        number, datetime, numeric. Anything else is plain text.
+
+        Args:
+            column_values: Cell texts of one column.
+
+        Returns:
+            One of ``'text_number'``, ``'datetime'``, ``'numeric'`` or
+            ``'text'``. ``'text_number'`` is the value ``compare_cell_value``
+            tests for to compare identifier-like digit strings.
+        """
+        if not column_values:
+            return 'text'
+
+        # Blank cells and cells holding only a placeholder ("/" or "-",
+        # possibly padded with whitespace) carry no type information.
+        non_empty_values = [
+            v for v in column_values
+            if v and v.strip() and v.strip() not in ('/', '-')
+        ]
+        if not non_empty_values:
+            return 'text'
+
+        sample = non_empty_values[:5]
+        threshold = len(sample) * 0.6
+
+        # Identifier-like digit strings (account / order numbers) come first
+        # so they are not mistaken for amounts.
+        text_number_count = sum(
+            1 for value in sample if self.is_text_number(value)
+        )
+        if text_number_count >= threshold:
+            return 'text_number'
+
+        datetime_patterns = [
+            r'\d{4}[-/]\d{1,2}[-/]\d{1,2}',  # YYYY-MM-DD
+            r'\d{4}[-/]\d{1,2}[-/]\d{1,2}\s*\d{1,2}:\d{1,2}:\d{1,2}',  # YYYY-MM-DD HH:MM:SS
+            r'\d{4}年\d{1,2}月\d{1,2}日',  # Chinese date
+        ]
+        datetime_count = sum(
+            1 for value in sample
+            if any(re.search(pattern, value) for pattern in datetime_patterns)
+        )
+        if datetime_count >= threshold:
+            return 'datetime'
+
+        # Short numbers / amounts.
+        numeric_count = sum(
+            1 for value in sample
+            if self.is_numeric(value) and not self.is_text_number(value)
+        )
+        if numeric_count >= threshold:
+            return 'numeric'
+
+        return 'text'
+    
+    def normalize_text_number(self, text: str) -> str:
+        """
+        Canonicalise a text-type number by deleting its separators.
+
+        Args:
+            text: Raw cell text; may be empty or None.
+
+        Returns:
+            ``text`` with whitespace (including the full-width space U+3000)
+            and hyphens removed, or "" when ``text`` is falsy.
+        """
+        separators = r'[\s\-\u3000]'
+        return re.sub(separators, '', text) if text else ""
+
+    def compare_cell_value(self, value1: str, value2: str, column_type: str, 
+                      column_name: str = '') -> Dict:
+        """Compare two table cells with a strategy chosen by ``column_type``.
+
+        Both values go through ``self.normalize_text`` first; if the
+        normalised values are equal the cells match. Otherwise:
+
+        * ``'text_number'``: always reported as a difference. Severity is
+          ``'low'`` when the values are equal after ``normalize_text_number``
+          (formatting only) and ``'high'`` when the content differs.
+        * ``'numeric'``: a difference when both values pass ``is_numeric``
+          and their parsed numbers differ by more than 0.01, or when either
+          value fails ``is_numeric``.
+        * ``'datetime'``: a difference when ``extract_datetime`` yields
+          different strings.
+        * any other type: a difference when ``calculate_text_similarity`` is
+          below ``self.content_similarity_threshold``.
+
+        Args:
+            value1: Cell text from the first file.
+            value2: Cell text from the second file.
+            column_type: Column type label selecting the strategy.
+            column_name: Accepted for interface compatibility; not used here.
+
+        Returns:
+            ``{'match': bool, 'difference': dict or None}``; the difference
+            dict carries the original (un-normalised) values.
+        """
+        v1 = self.normalize_text(value1)
+        v2 = self.normalize_text(value2)
+        if v1 == v2:
+            return {'match': True, 'difference': None}
+
+        # Stays None when the chosen strategy judges the cells equivalent.
+        difference = None
+
+        if column_type == 'text_number':
+            same_content = (
+                self.normalize_text_number(v1) == self.normalize_text_number(v2)
+            )
+            if same_content:
+                # Same digits, only the spacing/hyphenation differs.
+                description = f'文本型数字格式差异: "{value1}" vs "{value2}" (内容相同,空格不同)'
+                severity = 'low'
+            else:
+                description = f'文本型数字不一致: {value1} vs {value2}'
+                severity = 'high'
+            difference = {
+                'type': 'table_text',
+                'value1': value1,
+                'value2': value2,
+                'description': description,
+                'severity': severity
+            }
+        elif column_type == 'numeric':
+            if not (self.is_numeric(v1) and self.is_numeric(v2)):
+                # The column was detected as numeric but this cell is not a
+                # short number, so report it as a text difference.
+                difference = {
+                    'type': 'table_text',
+                    'value1': value1,
+                    'value2': value2,
+                    'description': f'长数字字符串不一致: {value1} vs {value2}'
+                }
+            else:
+                num1 = self.parse_number(v1)
+                num2 = self.parse_number(v2)
+                gap = abs(num1 - num2)
+                if gap > 0.01:  # tolerate differences up to 0.01
+                    difference = {
+                        'type': 'table_amount',
+                        'value1': value1,
+                        'value2': value2,
+                        'diff_amount': gap,
+                        'description': f'金额不一致: {value1} vs {value2}'
+                    }
+        elif column_type == 'datetime':
+            if self.extract_datetime(v1) != self.extract_datetime(v2):
+                difference = {
+                    'type': 'table_datetime',
+                    'value1': value1,
+                    'value2': value2,
+                    'description': f'日期时间不一致: {value1} vs {value2}'
+                }
+        else:
+            similarity = self.calculate_text_similarity(v1, v2)
+            if similarity < self.content_similarity_threshold:
+                difference = {
+                    'type': 'table_text',
+                    'value1': value1,
+                    'value2': value2,
+                    'similarity': similarity,
+                    'description': f'文本不一致: {value1} vs {value2} (相似度: {similarity:.1f}%)'
+                }
+
+        return {'match': difference is None, 'difference': difference}
     
     def calculate_text_similarity(self, text1: str, text2: str) -> float:
         """改进的相似度计算"""
@@ -608,45 +834,6 @@ class OCRResultComparator:
     
         return best_match
     
-    def detect_column_type(self, column_values: List[str]) -> str:
-        """检测列的数据类型"""
-        if not column_values:
-            return 'text'
-        
-        # 过滤空值
-        non_empty_values = [v for v in column_values if v and v.strip()]
-        if not non_empty_values:
-            return 'text'
-        
-        # 检测是否为日期时间
-        datetime_patterns = [
-            r'\d{4}[-/]\d{1,2}[-/]\d{1,2}',  # YYYY-MM-DD
-            r'\d{4}[-/]\d{1,2}[-/]\d{1,2}\s*\d{1,2}:\d{1,2}:\d{1,2}',  # YYYY-MM-DD HH:MM:SS
-            r'\d{4}年\d{1,2}月\d{1,2}日',  # 中文日期
-        ]
-        
-        datetime_count = 0
-        for value in non_empty_values[:5]:  # 检查前5个值
-            for pattern in datetime_patterns:
-                if re.search(pattern, value):
-                    datetime_count += 1
-                    break
-        
-        if datetime_count >= len(non_empty_values[:5]) * 0.6:
-            return 'datetime'
-        
-        # 检测是否为数字/金额
-        numeric_count = 0
-        for value in non_empty_values[:5]:
-            if self.is_numeric(value):
-                numeric_count += 1
-        
-        if numeric_count >= len(non_empty_values[:5]) * 0.6:
-            return 'numeric'
-        
-        # 默认为文本
-        return 'text'
-    
     def normalize_header_text(self, text: str) -> str:
         """标准化表头文本"""
         # 移除括号及其内容
@@ -704,93 +891,6 @@ class OCRResultComparator:
         
         return result
     
-    def compare_cell_value(self, value1: str, value2: str, column_type: str, 
-                          column_name: str = '') -> Dict:
-        """比较单元格值 - 统一错误类型"""
-        result = {
-            'match': True,
-            'difference': None
-        }
-        
-        # 标准化值
-        v1 = self.normalize_text(value1)
-        v2 = self.normalize_text(value2)
-        
-        if v1 == v2:
-            return result
-        
-        # 根据列类型采用不同的比较策略
-        if column_type == 'numeric':
-            # 数字/金额比较
-            if self.is_numeric(v1) and self.is_numeric(v2):
-                num1 = self.parse_number(v1)
-                num2 = self.parse_number(v2)
-                if abs(num1 - num2) > 0.01:  # 允许0.01的误差
-                    result['match'] = False
-                    result['difference'] = {
-                        'type': 'table_amount',  # ✅ 统一类型
-                        'value1': value1,
-                        'value2': value2,
-                        'diff_amount': abs(num1 - num2),
-                        'description': f'金额不一致: {value1} vs {value2}'
-                    }
-            else:
-                result['match'] = False
-                result['difference'] = {
-                    'type': 'table_amount',  # ✅ 格式错误也算金额差异
-                    'value1': value1,
-                    'value2': value2,
-                    'description': f'数字格式错误: {value1} vs {value2}'
-                }
-        
-        elif column_type == 'datetime':
-            # 日期时间比较
-            datetime1 = self.extract_datetime(v1)
-            datetime2 = self.extract_datetime(v2)
-            
-            if datetime1 != datetime2:
-                result['match'] = False
-                result['difference'] = {
-                    'type': 'table_datetime',  # ✅ 日期时间类型
-                    'value1': value1,
-                    'value2': value2,
-                    'description': f'日期时间不一致: {value1} vs {value2}'
-                }
-        
-        else:
-            # 文本比较
-            similarity = self.calculate_text_similarity(v1, v2)
-            if similarity < self.content_similarity_threshold:
-                result['match'] = False
-                result['difference'] = {
-                    'type': 'table_text',  # ✅ 文本差异
-                    'value1': value1,
-                    'value2': value2,
-                    'similarity': similarity,
-                    'description': f'文本不一致: {value1} vs {value2} (相似度: {similarity:.1f}%)'
-                }
-        
-        return result
-    
-    def extract_datetime(self, text: str) -> str:
-        """提取并标准化日期时间"""
-        # 尝试匹配各种日期时间格式
-        patterns = [
-            (r'(\d{4})[-/](\d{1,2})[-/](\d{1,2})\s*(\d{1,2}):(\d{1,2}):(\d{1,2})', 
-             lambda m: f"{m.group(1)}-{m.group(2).zfill(2)}-{m.group(3).zfill(2)} {m.group(4).zfill(2)}:{m.group(5).zfill(2)}:{m.group(6).zfill(2)}"),
-            (r'(\d{4})[-/](\d{1,2})[-/](\d{1,2})', 
-             lambda m: f"{m.group(1)}-{m.group(2).zfill(2)}-{m.group(3).zfill(2)}"),
-            (r'(\d{4})年(\d{1,2})月(\d{1,2})日', 
-             lambda m: f"{m.group(1)}-{m.group(2).zfill(2)}-{m.group(3).zfill(2)}"),
-        ]
-        
-        for pattern, formatter in patterns:
-            match = re.search(pattern, text)
-            if match:
-                return formatter(match)
-        
-        return text
-    
     def detect_table_header_row(self, table: List[List[str]]) -> int:
         """
         智能检测表格的表头行索引
@@ -960,27 +1060,67 @@ class OCRResultComparator:
             column_types2.append(col_type)
             print(f"   文件2列 {col_idx + 1} ({headers2[col_idx]}): {col_type}")
         
-        # ✅ 新增:检查列类型是否一致
-        column_types_match = column_types1 == column_types2
-        
-        if not column_types_match:
-            print(f"\n⚠️  列类型存在差异,不再比较单元格内容...")
-            for col_idx in range(min(len(column_types1), len(column_types2))):
-                if column_types1[col_idx] != column_types2[col_idx]:
-                    differences.append({
-                        'type': 'table_header_critical',
-                        'position': f'第{col_idx + 1}列',
-                        'file1_value': f'{headers1[col_idx]} ({column_types1[col_idx]})',
-                        'file2_value': f'{headers2[col_idx]} ({column_types2[col_idx]})',
-                        'description': f'列类型不一致: {column_types1[col_idx]} vs {column_types2[col_idx]}',
-                        'severity': 'critical',
-                        'column_index': col_idx
-                    })
-            return differences
+        # ✅ 改进:统计列类型差异,只有超过阈值才停止比较
+        mismatched_columns = []
+        for col_idx in range(min(len(column_types1), len(column_types2))):
+            if column_types1[col_idx] != column_types2[col_idx]:
+                mismatched_columns.append(col_idx)
+                differences.append({
+                    'type': 'table_column_type_mismatch',  # ✅ 新类型,区别于 critical
+                    'position': f'第{col_idx + 1}列',
+                    'file1_value': f'{headers1[col_idx]} ({column_types1[col_idx]})',
+                    'file2_value': f'{headers2[col_idx]} ({column_types2[col_idx]})',
+                    'description': f'列类型不一致: {column_types1[col_idx]} vs {column_types2[col_idx]}',
+                    'severity': 'high',
+                    'column_index': col_idx
+                })
         
-        # ✅ 使用两个文件中更准确的列类型(优先使用数据更多的文件)
-        column_types = column_types1  # 默认使用文件1的列类型
+        # ✅ 计算列类型差异比例
+        total_columns = min(len(column_types1), len(column_types2))
+        mismatch_ratio = len(mismatched_columns) / total_columns if total_columns > 0 else 0
         
+        # ✅ 只有当差异比例超过50%时才停止比较
+        if mismatch_ratio > 0.5:
+            print(f"\n⚠️  列类型差异过大 ({len(mismatched_columns)}/{total_columns} = {mismatch_ratio:.1%}),不再比较单元格内容...")
+            # 添加一个汇总差异
+            differences.append({
+                'type': 'table_header_critical',
+                'position': '表格列类型',
+                'file1_value': f'{len(mismatched_columns)}列类型不一致',
+                'file2_value': f'共{total_columns}列',
+                'description': f'列类型差异过大: {len(mismatched_columns)}/{total_columns}列不匹配 ({mismatch_ratio:.1%})',
+                'severity': 'critical'
+            })
+            return differences
+        elif mismatched_columns:
+            print(f"\n⚠️  检测到 {len(mismatched_columns)} 列类型差异,但仍继续比较单元格...")
+            print(f"   不匹配的列: {[col_idx + 1 for col_idx in mismatched_columns]}")
+    
+        # ✅ 为每列选择更合适的类型(优先使用数据更丰富的文件)
+        column_types = []
+        for col_idx in range(max(len(column_types1), len(column_types2))):
+            if col_idx >= len(column_types1):
+                column_types.append(column_types2[col_idx])
+            elif col_idx >= len(column_types2):
+                column_types.append(column_types1[col_idx])
+            elif col_idx in mismatched_columns:
+                # ✅ 对于类型不一致的列,选择更通用的类型
+                type1 = column_types1[col_idx]
+                type2 = column_types2[col_idx]
+                
+                # 类型优先级: text > text_number > numeric/datetime
+                if type1 == 'text' or type2 == 'text':
+                    column_types.append('text')
+                elif type1 == 'text_number' or type2 == 'text_number':
+                    column_types.append('text_number')
+                else:
+                    # 默认使用文件1的类型
+                    column_types.append(type1)
+                
+                print(f"   📝 第{col_idx + 1}列类型冲突,使用通用类型: {column_types[-1]}")
+            else:
+                column_types.append(column_types1[col_idx])
+    
         # 第五步:逐行比较数据
         data_rows1 = table1[header_row_idx1 + 1:]
         data_rows2 = table2[header_row_idx2 + 1:]
@@ -1032,16 +1172,21 @@ class OCRResultComparator:
                 if "[图片内容-忽略]" in cell1 or "[图片内容-忽略]" in cell2:
                     continue
                 
-                # ✅ 使用对应的列类型
+                # ✅ 使用合并后的列类型
                 column_type = column_types[col_idx] if col_idx < len(column_types) else 'text'
                 
-                # ✅ 获取列名(如果表头不匹配,显示两个表头)
+                # ✅ 获取列名
                 if header_result['match']:
                     column_name = headers1[col_idx] if col_idx < len(headers1) else f'列{col_idx + 1}'
                 else:
                     col_name1 = headers1[col_idx] if col_idx < len(headers1) else f'列{col_idx + 1}'
                     col_name2 = headers2[col_idx] if col_idx < len(headers2) else f'列{col_idx + 1}'
                     column_name = f"{col_name1}/{col_name2}"
+            
+                # ✅ 如果该列类型不匹配,在描述中标注
+                type_mismatch_note = ""
+                if col_idx in mismatched_columns:
+                    type_mismatch_note = f" [列类型冲突: {column_types1[col_idx]} vs {column_types2[col_idx]}]"
                 
                 compare_result = self.compare_cell_value(cell1, cell2, column_type, column_name)
                 
@@ -1054,18 +1199,18 @@ class OCRResultComparator:
                         'position': f'第{actual_row_num}行第{col_idx + 1}列',
                         'file1_value': diff_info['value1'],
                         'file2_value': diff_info['value2'],
-                        'description': diff_info['description'],
-                        'severity': 'medium',
+                        'description': diff_info['description'] + type_mismatch_note,  # ✅ 添加类型冲突标注
+                        'severity': 'high' if col_idx in mismatched_columns else 'medium',  # ✅ 类型冲突的单元格提高严重度
                         'row_index': actual_row_num,
                         'col_index': col_idx,
                         'column_name': column_name,
                         'column_type': column_type,
-                        # 保留额外信息
+                        'column_type_mismatch': col_idx in mismatched_columns,  # ✅ 新增字段
                         **{k: v for k, v in diff_info.items() if k not in ['type', 'value1', 'value2', 'description']}
                     })
                     
-                    print(f"   ⚠️  第{actual_row_num}行第{col_idx + 1}列({column_name}): {diff_info['description']}")
-        
+                    print(f"   ⚠️  第{actual_row_num}行第{col_idx + 1}列({column_name}): {diff_info['description']}{type_mismatch_note}")
+    
         print(f"\n✅ 流水表格对比完成,发现 {len(differences)} 个差异")
         
         return differences
@@ -1378,8 +1523,8 @@ if __name__ == "__main__":
     else:
         # 测试流水表格对比
         result = compare_ocr_results(
-            file1_path='/Users/zhch158/workspace/data/流水分析/对公_招商银行图/merged_results/对公_招商银行图_page_001.md',
-            file2_path='/Users/zhch158/workspace/data/流水分析/对公_招商银行图/data_DotsOCR_Results/对公_招商银行图_page_001.md',
+            file1_path='/Users/zhch158/workspace/data/流水分析/A用户_单元格扫描流水/merged_results/A用户_单元格扫描流水_page_005.md',
+            file2_path='/Users/zhch158/workspace/data/流水分析/A用户_单元格扫描流水/data_DotsOCR_Results/A用户_单元格扫描流水_page_005.md',
             output_file=f'./output/flow_list_comparison_{time.strftime("%Y%m%d_%H%M%S")}',
             output_format='both',
             ignore_images=True,