import os
from collections import Counter
from datetime import datetime
from typing import Dict

try:
    from .content_extractor import ContentExtractor
    from .table_comparator import TableComparator
    from .paragraph_comparator import ParagraphComparator
except ImportError:
    # Fallback for running the module as a plain script (no parent package).
    from content_extractor import ContentExtractor
    from table_comparator import TableComparator
    from paragraph_comparator import ParagraphComparator


class OCRResultComparator:
    """Top-level comparator for two OCR result files.

    Extracts tables and paragraphs from both files, delegates the actual
    comparison to ``ParagraphComparator`` and ``TableComparator``, and
    aggregates every reported difference into one result dictionary.
    """

    def __init__(self):
        """Create the helper components and set the default tuning values."""
        self.content_extractor = ContentExtractor()
        self.table_comparator = TableComparator()
        self.paragraph_comparator = ParagraphComparator()

        # Initialised empty; compare_files() does not write to this attribute.
        self.differences = []

        # Tuning values. Of these, only ``table_comparison_mode`` is read by
        # compare_files(); the others are stored on the instance only.
        self.paragraph_match_threshold = 80
        self.content_similarity_threshold = 95
        self.max_paragraph_window = 6
        self.table_comparison_mode = 'standard'
        self.header_similarity_threshold = 90

    def compare_files(self, file1_path: str, file2_path: str) -> Dict:
        """Compare two OCR result files and return the collected differences.

        Progress and a final summary are printed to stdout.

        Args:
            file1_path: Path of the first file; read as UTF-8 text.
            file2_path: Path of the second file; read as UTF-8 text.

        Returns:
            A dict with the keys ``differences`` (paragraph differences
            followed by table differences), ``statistics``, ``file1_tables``,
            ``file2_tables``, ``file1_paragraphs``, ``file2_paragraphs``,
            ``file1_path``, ``file2_path`` and ``timestamp``
            (``'%Y-%m-%d %H:%M:%S'`` of the local time the result was built).

        Raises:
            OSError: If either file cannot be opened.
            UnicodeDecodeError: If either file is not valid UTF-8.
        """
        print("\n📖 读取文件...")
        content1 = self._read_text(file1_path)
        content2 = self._read_text(file2_path)
        print("✅ 文件读取完成")
        print(f" 文件1大小: {len(content1)} 字符")
        print(f" 文件2大小: {len(content2)} 字符")

        # Extract tables.
        print("\n📊 提取表格...")
        tables1 = self.content_extractor.extract_table_data(content1)
        tables2 = self.content_extractor.extract_table_data(content2)
        print(f" 文件1表格数: {len(tables1)}")
        print(f" 文件2表格数: {len(tables2)}")

        # Extract paragraphs.
        print("\n📝 提取段落...")
        paragraphs1 = self.content_extractor.extract_paragraphs(content1)
        paragraphs2 = self.content_extractor.extract_paragraphs(content2)
        print(f" 文件1段落数: {len(paragraphs1)}")
        print(f" 文件2段落数: {len(paragraphs2)}")

        # Compare paragraphs.
        print("\n🔍 开始段落对比...")
        paragraph_differences = self.paragraph_comparator.compare_paragraphs(
            paragraphs1, paragraphs2
        )
        print(f"✅ 段落对比完成,发现 {len(paragraph_differences)} 个差异")

        # Paragraph differences come first, table differences are appended.
        all_differences = []
        all_differences.extend(paragraph_differences)

        # Match tables across the two files, then compare each matched pair.
        print("\n🔍 开始表格智能匹配...")
        all_differences.extend(self._compare_tables(tables1, tables2))

        print("\n✅ 对比完成")

        stats = self._build_statistics(all_differences)

        # All differences are exposed under the single key 'differences'.
        result = {
            'differences': all_differences,
            'statistics': stats,
            'file1_tables': len(tables1),
            'file2_tables': len(tables2),
            'file1_paragraphs': len(paragraphs1),
            'file2_paragraphs': len(paragraphs2),
            'file1_path': file1_path,
            'file2_path': file2_path,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        }

        self._print_summary(result['statistics'])
        return result

    @staticmethod
    def _read_text(path: str) -> str:
        """Return the full content of *path*, decoded as UTF-8."""
        with open(path, 'r', encoding='utf-8') as f:
            return f.read()

    def _compare_tables(self, tables1, tables2) -> list:
        """Return the table-level differences between the two table lists."""
        if not tables1 and not tables2:
            return []

        if not tables2:
            return [{
                'type': 'table_structure',
                'position': '表格结构',
                'file1_value': f'包含{len(tables1)}个表格',
                'file2_value': '无表格',
                'description': '文件1包含表格但文件2无表格',
                'severity': 'high'
            }]

        if not tables1:
            return [{
                'type': 'table_structure',
                'position': '表格结构',
                'file1_value': '无表格',
                'file2_value': f'包含{len(tables2)}个表格',
                'description': '文件2包含表格但文件1无表格',
                'severity': 'high'
            }]

        # Both files contain tables: find the matching table pairs.
        table_matches = self.table_comparator.find_matching_tables(tables1, tables2)

        if not table_matches:
            # A single structural difference is reported; tables are not
            # additionally listed one by one as unmatched in this case.
            print(" ⚠️ 未找到匹配的表格")
            return [{
                'type': 'table_structure',
                'position': '表格匹配',
                'file1_value': f'{len(tables1)}个表格',
                'file2_value': f'{len(tables2)}个表格',
                'description': '未找到可匹配的表格',
                'severity': 'high'
            }]

        differences = []

        # Compare every matched pair of tables.
        for idx1, idx2, similarity in table_matches:
            print(f"\n 📋 对比匹配的表格: 表格{idx1+1} vs 表格{idx2+1}")

            if self.table_comparison_mode == 'flow_list':
                table_diffs = self.table_comparator.compare_table_flow_list(
                    tables1[idx1], tables2[idx2]
                )
            else:
                table_diffs = self.table_comparator.compare_tables(
                    tables1[idx1], tables2[idx2]
                )

            # Tag each difference with the pair of tables it belongs to.
            for diff in table_diffs:
                diff['table_pair'] = f'表格{idx1+1}↔表格{idx2+1}'
                diff['table_similarity'] = similarity

            differences.extend(table_diffs)
            print(f" 发现 {len(table_diffs)} 个差异")

        # Report tables that were not part of any matched pair.
        matched_tables1 = {m[0] for m in table_matches}
        matched_tables2 = {m[1] for m in table_matches}

        for i in range(len(tables1)):
            if i not in matched_tables1:
                differences.append({
                    'type': 'table_unmatched',
                    'position': f'文件1表格{i+1}',
                    'file1_value': f'表格{i+1} (无匹配)',
                    'file2_value': '',
                    'description': f'文件1的表格{i+1}在文件2中无匹配表格',
                    'severity': 'medium'
                })

        for j in range(len(tables2)):
            if j not in matched_tables2:
                differences.append({
                    'type': 'table_unmatched',
                    'position': f'文件2表格{j+1}',
                    'file1_value': '',
                    'file2_value': f'表格{j+1} (无匹配)',
                    'description': f'文件2的表格{j+1}在文件1中无匹配表格',
                    'severity': 'medium'
                })

        return differences

    @staticmethod
    def _build_statistics(differences: list) -> Dict:
        """Count *differences* by type and by severity."""
        # Two passes in total instead of one full pass per statistic.
        # Every difference must carry a 'type' key; 'severity' is optional.
        type_counts = Counter(d['type'] for d in differences)
        severity_counts = Counter(d.get('severity') for d in differences)

        return {
            'total_differences': len(differences),
            # Every type whose name starts with 'table' counts as a table difference.
            'table_differences': sum(
                count for diff_type, count in type_counts.items()
                if diff_type.startswith('table')
            ),
            'paragraph_differences': type_counts['paragraph'],
            'amount_differences': type_counts['table_amount'],
            'datetime_differences': type_counts['table_datetime'],
            'text_differences': type_counts['table_text'],
            'table_pre_header': type_counts['table_pre_header'],
            'table_header_mismatch': type_counts['table_header_mismatch'],
            'table_header_critical': type_counts['table_header_critical'],
            'table_header_position': type_counts['table_header_position'],
            'table_row_missing': type_counts['table_row_missing'],
            # 'critical' and 'high' are reported together as high severity.
            'high_severity': severity_counts['critical'] + severity_counts['high'],
            'medium_severity': severity_counts['medium'],
            'low_severity': severity_counts['low']
        }

    @staticmethod
    def _print_summary(stats: Dict) -> None:
        """Print the comparison summary for *stats* to stdout."""
        print("\n" + "=" * 60)
        print("📊 对比结果汇总")
        print("=" * 60)
        print(f"总差异数: {stats['total_differences']}")
        print(f" - 段落差异: {stats['paragraph_differences']}")
        print(f" - 表格差异: {stats['table_differences']}")
        print(f" - 金额: {stats['amount_differences']}")
        print(f" - 日期: {stats['datetime_differences']}")
        print(f" - 文本: {stats['text_differences']}")
        print("\n严重级别分布:")
        print(f" 🔴 高: {stats['high_severity']}")
        print(f" 🟡 中: {stats['medium_severity']}")
        print(f" 🟢 低: {stats['low_severity']}")
        print("=" * 60)