import os
from typing import Dict, List, Optional, Tuple
from datetime import datetime

try:
    from .content_extractor import ContentExtractor
    from .table_comparator import TableComparator
    from .paragraph_comparator import ParagraphComparator
except ImportError:
    # Fallback for running the module as a plain script (no parent package).
    from content_extractor import ContentExtractor
    from table_comparator import TableComparator
    from paragraph_comparator import ParagraphComparator


class OCRResultComparator:
    """Main entry point for comparing two OCR result files.

    The comparison runs in two phases: tables are matched and compared
    first, then the paragraph blocks around the matched tables are aligned
    and compared region by region.
    """

    def __init__(self):
        """Create the helper components and set the default tuning values."""
        self.content_extractor = ContentExtractor()
        self.table_comparator = TableComparator()
        self.paragraph_comparator = ParagraphComparator()

        self.differences = []
        # NOTE(review): the thresholds/window below are not read by any method
        # of this class; presumably they are consumed by callers or copied
        # onto the comparators elsewhere -- verify before relying on them.
        self.paragraph_match_threshold = 80
        self.content_similarity_threshold = 95
        self.max_paragraph_window = 6
        # 'flow_list' selects compare_table_flow_list(); any other value
        # selects compare_tables() (see _compare_tables).
        self.table_comparison_mode = 'standard'
        self.header_similarity_threshold = 90

    def compare_files(self, file1_path: str, file2_path: str) -> Dict:
        """Compare two OCR result files and return a difference report.

        Args:
            file1_path: Path of the first file, read as UTF-8 text.
            file2_path: Path of the second file, read as UTF-8 text.

        Returns:
            A dict with the keys ``differences`` (list of difference dicts),
            ``statistics`` (counts by type and severity), ``file1_tables``,
            ``file2_tables``, ``file1_paragraphs``, ``file2_paragraphs``,
            ``file1_path``, ``file2_path`` and ``timestamp``.

        Raises:
            OSError: If either file cannot be opened.
            UnicodeDecodeError: If either file is not valid UTF-8.
        """
        print("\n📖 读取文件...")

        with open(file1_path, 'r', encoding='utf-8') as f:
            content1 = f.read()
        with open(file2_path, 'r', encoding='utf-8') as f:
            content2 = f.read()

        print("✅ 文件读取完成")
        print(f" 文件1大小: {len(content1)} 字符")
        print(f" 文件2大小: {len(content2)} 字符")

        # Extract structured content (tables and paragraph blocks).
        print("\n📊 提取结构化内容...")
        structured_content1 = self.content_extractor.extract_structured_content(content1)
        structured_content2 = self.content_extractor.extract_structured_content(content2)

        tables1 = structured_content1['tables']
        tables2 = structured_content2['tables']
        paragraph_blocks1 = structured_content1['paragraph_blocks']
        paragraph_blocks2 = structured_content2['paragraph_blocks']

        print(f" 文件1: {len(tables1)}个表格, {len(paragraph_blocks1)}个段落块")
        print(f" 文件2: {len(tables2)}个表格, {len(paragraph_blocks2)}个段落块")

        # Phase 1: match tables and compare each matched pair.
        print("\n🔍 开始表格智能匹配...")
        all_differences, table_matches = self._compare_tables(tables1, tables2)

        # Phase 2: align paragraph blocks around the matched tables and
        # compare each aligned region independently.
        print("\n🔍 开始段落对比(基于表格位置对齐)...")
        aligned_blocks = self._align_paragraph_blocks(
            paragraph_blocks1, paragraph_blocks2,
            tables1, tables2, table_matches
        )
        all_differences.extend(self._compare_aligned_blocks(aligned_blocks))

        total_paragraph_diffs = sum(
            1 for d in all_differences if d['type'] == 'paragraph'
        )
        print(f"✅ 段落对比完成,共发现 {total_paragraph_diffs} 个差异")
        print("\n✅ 对比完成")

        stats = self._build_statistics(all_differences)

        result = {
            'differences': all_differences,
            'statistics': stats,
            'file1_tables': len(tables1),
            'file2_tables': len(tables2),
            'file1_paragraphs': sum(len(b['paragraphs']) for b in paragraph_blocks1),
            'file2_paragraphs': sum(len(b['paragraphs']) for b in paragraph_blocks2),
            'file1_path': file1_path,
            'file2_path': file2_path,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }

        self._print_summary(stats)
        return result

    def _compare_tables(self, tables1: List[Dict], tables2: List[Dict]
                        ) -> Tuple[List[Dict], List[Tuple[int, int, float]]]:
        """Match the tables of both files and compare every matched pair.

        Returns:
            ``(differences, table_matches)`` where ``table_matches`` holds
            the ``(index_in_file1, index_in_file2, similarity)`` tuples
            produced by the table comparator (empty when nothing matched or
            when either file has no tables).
        """
        differences = []
        table_matches = []

        if tables1 and tables2:
            table_matches = self.table_comparator.find_matching_tables(
                [t['data'] for t in tables1],
                [t['data'] for t in tables2]
            )

            if not table_matches:
                print(" ⚠️ 未找到匹配的表格")
                differences.append({
                    'type': 'table_structure',
                    'position': '表格匹配',
                    'file1_value': f'{len(tables1)}个表格',
                    'file2_value': f'{len(tables2)}个表格',
                    'description': '未找到可匹配的表格',
                    'severity': 'high'
                })
            else:
                for idx1, idx2, similarity in table_matches:
                    print(f"\n 📋 对比匹配的表格: 表格{idx1+1} vs 表格{idx2+1}")

                    if self.table_comparison_mode == 'flow_list':
                        table_diffs = self.table_comparator.compare_table_flow_list(
                            tables1[idx1]['data'], tables2[idx2]['data']
                        )
                    else:
                        table_diffs = self.table_comparator.compare_tables(
                            tables1[idx1]['data'], tables2[idx2]['data']
                        )

                    # Tag every difference with the table pair it came from.
                    for diff in table_diffs:
                        diff['table_pair'] = f'表格{idx1+1}↔表格{idx2+1}'
                        diff['table_similarity'] = similarity

                    differences.extend(table_diffs)
                    print(f" 发现 {len(table_diffs)} 个差异")

                # Report tables that took part in no match.
                matched_tables1 = {m[0] for m in table_matches}
                matched_tables2 = {m[1] for m in table_matches}

                for i in range(len(tables1)):
                    if i not in matched_tables1:
                        differences.append({
                            'type': 'table_unmatched',
                            'position': f'文件1表格{i+1}',
                            'file1_value': f'表格{i+1} (无匹配)',
                            'file2_value': '',
                            'description': f'文件1的表格{i+1}在文件2中无匹配表格',
                            'severity': 'medium'
                        })

                for j in range(len(tables2)):
                    if j not in matched_tables2:
                        differences.append({
                            'type': 'table_unmatched',
                            'position': f'文件2表格{j+1}',
                            'file1_value': '',
                            'file2_value': f'表格{j+1} (无匹配)',
                            'description': f'文件2的表格{j+1}在文件1中无匹配表格',
                            'severity': 'medium'
                        })
        elif tables1:
            differences.append({
                'type': 'table_structure',
                'position': '表格结构',
                'file1_value': f'包含{len(tables1)}个表格',
                'file2_value': '无表格',
                'description': '文件1包含表格但文件2无表格',
                'severity': 'high'
            })
        elif tables2:
            differences.append({
                'type': 'table_structure',
                'position': '表格结构',
                'file1_value': '无表格',
                'file2_value': f'包含{len(tables2)}个表格',
                'description': '文件2包含表格但文件1无表格',
                'severity': 'high'
            })

        return differences, table_matches

    def _compare_aligned_blocks(self, aligned_blocks: List[Dict]) -> List[Dict]:
        """Compare the paragraphs of every aligned block pair.

        Each pair is handed to the paragraph comparator on its own, and
        every resulting difference is tagged with the pair's position label.
        """
        differences = []

        for block_pair in aligned_blocks:
            block1 = block_pair['block1']
            block2 = block_pair['block2']
            position_desc = block_pair['position']

            paragraphs1 = block1['paragraphs'] if block1 else []
            paragraphs2 = block2['paragraphs'] if block2 else []

            if not paragraphs1 and not paragraphs2:
                continue

            print(f" 📦 {position_desc}: 文件1有{len(paragraphs1)}个段落, 文件2有{len(paragraphs2)}个段落")

            block_diffs = self.paragraph_comparator.compare_paragraphs(
                paragraphs1, paragraphs2
            )
            for diff in block_diffs:
                diff['paragraph_block'] = position_desc

            differences.extend(block_diffs)

        return differences

    @staticmethod
    def _build_statistics(differences: List[Dict]) -> Dict:
        """Count the differences by type and by severity."""

        def count_type(diff_type: str) -> int:
            return sum(1 for d in differences if d['type'] == diff_type)

        def count_severity(*levels: str) -> int:
            return sum(1 for d in differences if d.get('severity') in levels)

        return {
            'total_differences': len(differences),
            'table_differences': sum(
                1 for d in differences if d['type'].startswith('table')
            ),
            'paragraph_differences': count_type('paragraph'),
            'amount_differences': count_type('table_amount'),
            'datetime_differences': count_type('table_datetime'),
            'text_differences': count_type('table_text'),
            'table_pre_header': count_type('table_pre_header'),
            'table_header_mismatch': count_type('table_header_mismatch'),
            'table_header_critical': count_type('table_header_critical'),
            'table_header_position': count_type('table_header_position'),
            'table_row_missing': count_type('table_row_missing'),
            # 'critical' is folded into the high-severity bucket.
            'high_severity': count_severity('critical', 'high'),
            'medium_severity': count_severity('medium'),
            'low_severity': count_severity('low')
        }

    @staticmethod
    def _print_summary(stats: Dict) -> None:
        """Print the human-readable summary of the comparison statistics."""
        print("\n" + "=" * 60)
        print("📊 对比结果汇总")
        print("=" * 60)
        print(f"总差异数: {stats['total_differences']}")
        print(f" - 段落差异: {stats['paragraph_differences']}")
        print(f" - 表格差异: {stats['table_differences']}")
        print(f" - 金额: {stats['amount_differences']}")
        print(f" - 日期: {stats['datetime_differences']}")
        print(f" - 文本: {stats['text_differences']}")
        print("\n严重级别分布:")
        print(f" 🔴 高: {stats['high_severity']}")
        print(f" 🟡 中: {stats['medium_severity']}")
        print(f" 🟢 低: {stats['low_severity']}")
        print("=" * 60)

    def _align_paragraph_blocks(self, blocks1: List[Dict], blocks2: List[Dict],
                                tables1: List[Dict], tables2: List[Dict],
                                table_matches: List[Tuple[int, int, float]]) -> List[Dict]:
        """Pair up paragraph blocks of both files using table positions.

        Without any tables, blocks are paired by index. Otherwise the blocks
        are grouped into regions -- everything before the first matched
        table, then everything after each matched table up to the next
        matched one -- and the blocks of each region are merged into one
        block per file.

        Args:
            blocks1: Paragraph blocks of file 1 (``start_pos``, ``end_pos``,
                ``paragraphs``).
            blocks2: Paragraph blocks of file 2.
            tables1: Tables of file 1 (``start_pos``, ``end_pos``).
            tables2: Tables of file 2.
            table_matches: ``(index_in_file1, index_in_file2, similarity)``
                tuples for the matched tables.

        Returns:
            A list of ``{'block1': dict | None, 'block2': dict | None,
            'position': str}`` entries.
        """
        aligned = []

        # No tables at all: pair the blocks positionally.
        if not tables1 and not tables2:
            for i in range(max(len(blocks1), len(blocks2))):
                aligned.append({
                    'block1': blocks1[i] if i < len(blocks1) else None,
                    'block2': blocks2[i] if i < len(blocks2) else None,
                    'position': f'段落块{i+1}'
                })
            return aligned

        table_map = {idx1: idx2 for idx1, idx2, _ in table_matches}

        # Region 1: blocks that end before the first matched table.
        if blocks1 or blocks2:
            # With no matched table there is no boundary, so every block
            # belongs to this region. (Previously the paragraph-block count
            # was used as a table index here, which could pick an unrelated
            # table as the boundary and drop the blocks after it.)
            limit1 = self._first_table_start(tables1, table_map.keys())
            limit2 = self._first_table_start(tables2, table_map.values())

            pre_blocks1 = [b for b in blocks1 if b['end_pos'] <= limit1]
            pre_blocks2 = [b for b in blocks2 if b['end_pos'] <= limit2]

            if pre_blocks1 or pre_blocks2:
                aligned.append({
                    'block1': self._merge_paragraph_blocks(pre_blocks1),
                    'block2': self._merge_paragraph_blocks(pre_blocks2),
                    'position': '文档开头(表格前)'
                })

        # Region 2..n: blocks after each matched table, up to the next one.
        # NOTE(review): matches are walked in file-1 order; if matches cross
        # (file-2 indices not increasing), the file-2 window can be empty.
        sorted_matches = sorted(table_matches, key=lambda x: x[0])
        unbounded = float('inf')

        for i, (idx1, idx2, _) in enumerate(sorted_matches):
            table1_end = tables1[idx1]['end_pos']
            table2_end = tables2[idx2]['end_pos']

            if i + 1 < len(sorted_matches):
                next_idx1, next_idx2 = sorted_matches[i + 1][0], sorted_matches[i + 1][1]
                next_table1_start = tables1[next_idx1]['start_pos']
                next_table2_start = tables2[next_idx2]['start_pos']
            else:
                # After the last matched table the region runs to the end.
                next_table1_start = unbounded
                next_table2_start = unbounded

            between_blocks1 = [
                b for b in blocks1
                if b['start_pos'] >= table1_end and b['end_pos'] <= next_table1_start
            ]
            between_blocks2 = [
                b for b in blocks2
                if b['start_pos'] >= table2_end and b['end_pos'] <= next_table2_start
            ]

            if between_blocks1 or between_blocks2:
                aligned.append({
                    'block1': self._merge_paragraph_blocks(between_blocks1),
                    'block2': self._merge_paragraph_blocks(between_blocks2),
                    'position': f'表格{idx1+1}↔表格{idx2+1} 之后'
                })

        return aligned

    @staticmethod
    def _first_table_start(tables: List[Dict], matched_indices) -> float:
        """Return ``start_pos`` of the lowest-indexed matched table.

        Returns ``inf`` when there is no matched index, or when the lowest
        index does not exist in ``tables``.
        """
        indices = list(matched_indices)
        if not indices:
            return float('inf')
        first = min(indices)
        if first < len(tables):
            return tables[first]['start_pos']
        return float('inf')

    def _merge_paragraph_blocks(self, blocks: List[Dict]) -> Optional[Dict]:
        """Merge several paragraph blocks into a single block.

        Returns ``None`` for an empty list and the block itself (not a copy)
        for a single-element list. Otherwise the result spans from the first
        block's ``start_pos`` to the last block's ``end_pos`` and holds all
        paragraphs in order.
        """
        if not blocks:
            return None

        if len(blocks) == 1:
            return blocks[0]

        all_paragraphs = [p for block in blocks for p in block['paragraphs']]

        return {
            'start_pos': blocks[0]['start_pos'],
            'end_pos': blocks[-1]['end_pos'],
            'paragraphs': all_paragraphs
        }