| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
import os
from datetime import datetime
from typing import Dict, List, Optional, Tuple

try:
    from .content_extractor import ContentExtractor
    from .table_comparator import TableComparator
    from .paragraph_comparator import ParagraphComparator
except ImportError:
    # Fallback used when the relative (package) imports above fail.
    from content_extractor import ContentExtractor
    from table_comparator import TableComparator
    from paragraph_comparator import ParagraphComparator


class OCRResultComparator:
    """Compare two OCR result files and report table and paragraph differences.

    The class orchestrates three collaborators: a content extractor (turns
    raw text into tables and paragraph blocks), a table comparator and a
    paragraph comparator.  It matches tables between the two files, aligns
    the paragraph blocks around the matched tables, and aggregates all
    differences together with summary statistics.
    """

    def __init__(self):
        """Create the collaborators and set the default tuning values."""
        self.content_extractor = ContentExtractor()
        self.table_comparator = TableComparator()
        self.paragraph_comparator = ParagraphComparator()

        self.differences = []
        # Tuning values.  Within this class only ``table_comparison_mode`` is
        # read (see ``_compare_tables``); the others are merely stored here.
        self.paragraph_match_threshold = 80
        self.content_similarity_threshold = 95
        self.max_paragraph_window = 6
        self.table_comparison_mode = 'standard'
        self.header_similarity_threshold = 90

    def compare_files(self, file1_path: str, file2_path: str) -> Dict:
        """Compare two OCR result files and return every difference found.

        Progress and a final summary are printed to stdout.

        Args:
            file1_path: Path of the first UTF-8 text file.
            file2_path: Path of the second UTF-8 text file.

        Returns:
            A dict with the keys ``differences`` (list of difference dicts),
            ``statistics`` (counts per type and severity), ``file1_tables``,
            ``file2_tables``, ``file1_paragraphs``, ``file2_paragraphs``,
            ``file1_path``, ``file2_path`` and ``timestamp``.

        Raises:
            Any error raised while opening or decoding either file is
            propagated unchanged.
        """
        print("\n📖 读取文件...")

        content1 = self._read_text(file1_path)
        content2 = self._read_text(file2_path)

        print("✅ 文件读取完成")
        print(f" 文件1大小: {len(content1)} 字符")
        print(f" 文件2大小: {len(content2)} 字符")

        # Extract structured content (tables and paragraph blocks with
        # their positions).
        print("\n📊 提取结构化内容...")
        structured_content1 = self.content_extractor.extract_structured_content(content1)
        structured_content2 = self.content_extractor.extract_structured_content(content2)

        tables1 = structured_content1['tables']
        tables2 = structured_content2['tables']
        paragraph_blocks1 = structured_content1['paragraph_blocks']
        paragraph_blocks2 = structured_content2['paragraph_blocks']

        print(f" 文件1: {len(tables1)}个表格, {len(paragraph_blocks1)}个段落块")
        print(f" 文件2: {len(tables2)}个表格, {len(paragraph_blocks2)}个段落块")

        # Table matching and comparison.
        print("\n🔍 开始表格智能匹配...")
        all_differences, table_matches = self._compare_tables(tables1, tables2)

        # Paragraph comparison, aligned on the matched tables.
        print("\n🔍 开始段落对比(基于表格位置对齐)...")
        aligned_blocks = self._align_paragraph_blocks(
            paragraph_blocks1, paragraph_blocks2,
            tables1, tables2, table_matches
        )
        all_differences.extend(self._compare_aligned_blocks(aligned_blocks))

        total_paragraph_diffs = sum(
            1 for d in all_differences if d['type'] == 'paragraph'
        )
        print(f"✅ 段落对比完成,共发现 {total_paragraph_diffs} 个差异")

        print("\n✅ 对比完成")

        stats = self._build_statistics(all_differences)

        result = {
            'differences': all_differences,
            'statistics': stats,
            'file1_tables': len(tables1),
            'file2_tables': len(tables2),
            'file1_paragraphs': sum(len(b['paragraphs']) for b in paragraph_blocks1),
            'file2_paragraphs': sum(len(b['paragraphs']) for b in paragraph_blocks2),
            'file1_path': file1_path,
            'file2_path': file2_path,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }

        self._print_summary(stats)

        return result

    @staticmethod
    def _read_text(path: str) -> str:
        """Return the whole content of *path*, decoded as UTF-8."""
        with open(path, 'r', encoding='utf-8') as f:
            return f.read()

    @staticmethod
    def _make_diff(diff_type: str, position: str, file1_value: str,
                   file2_value: str, description: str, severity: str) -> Dict:
        """Build one difference record with the key layout used by this class."""
        return {
            'type': diff_type,
            'position': position,
            'file1_value': file1_value,
            'file2_value': file2_value,
            'description': description,
            'severity': severity
        }

    def _compare_tables(self, tables1: List[Dict],
                        tables2: List[Dict]) -> Tuple[List[Dict], List]:
        """Match the tables of both files and compare every matched pair.

        Returns:
            ``(differences, table_matches)`` where ``table_matches`` is the
            list of ``(idx1, idx2, similarity)`` tuples produced by the table
            comparator (empty when either file has no tables or nothing
            matched).
        """
        differences = []
        table_matches = []

        if tables1 and tables2:
            # ``or []`` guards against a matcher that returns None instead
            # of an empty list.
            table_matches = self.table_comparator.find_matching_tables(
                [t['data'] for t in tables1],
                [t['data'] for t in tables2]
            ) or []

            if not table_matches:
                print(" ⚠️ 未找到匹配的表格")
                differences.append(self._make_diff(
                    'table_structure', '表格匹配',
                    f'{len(tables1)}个表格', f'{len(tables2)}个表格',
                    '未找到可匹配的表格', 'high'
                ))
                return differences, table_matches

            # The comparison mode is fixed for the whole run, so pick the
            # comparison routine once.
            if self.table_comparison_mode == 'flow_list':
                compare = self.table_comparator.compare_table_flow_list
            else:
                compare = self.table_comparator.compare_tables

            for idx1, idx2, similarity in table_matches:
                print(f"\n 📋 对比匹配的表格: 表格{idx1+1} vs 表格{idx2+1}")

                table_diffs = compare(tables1[idx1]['data'], tables2[idx2]['data'])

                # Tag each difference with the table pair it belongs to.
                for diff in table_diffs:
                    diff['table_pair'] = f'表格{idx1+1}↔表格{idx2+1}'
                    diff['table_similarity'] = similarity

                differences.extend(table_diffs)
                print(f" 发现 {len(table_diffs)} 个差异")

            # Report tables that did not take part in any match.
            matched_tables1 = {m[0] for m in table_matches}
            matched_tables2 = {m[1] for m in table_matches}

            for i in range(len(tables1)):
                if i not in matched_tables1:
                    differences.append(self._make_diff(
                        'table_unmatched', f'文件1表格{i+1}',
                        f'表格{i+1} (无匹配)', '',
                        f'文件1的表格{i+1}在文件2中无匹配表格', 'medium'
                    ))

            for j in range(len(tables2)):
                if j not in matched_tables2:
                    differences.append(self._make_diff(
                        'table_unmatched', f'文件2表格{j+1}',
                        '', f'表格{j+1} (无匹配)',
                        f'文件2的表格{j+1}在文件1中无匹配表格', 'medium'
                    ))

        elif tables1 and not tables2:
            differences.append(self._make_diff(
                'table_structure', '表格结构',
                f'包含{len(tables1)}个表格', '无表格',
                '文件1包含表格但文件2无表格', 'high'
            ))
        elif not tables1 and tables2:
            differences.append(self._make_diff(
                'table_structure', '表格结构',
                '无表格', f'包含{len(tables2)}个表格',
                '文件2包含表格但文件1无表格', 'high'
            ))

        return differences, table_matches

    def _compare_aligned_blocks(self, aligned_blocks: List[Dict]) -> List[Dict]:
        """Compare each aligned pair of paragraph blocks independently."""
        differences = []

        for block_pair in aligned_blocks:
            block1 = block_pair['block1']
            block2 = block_pair['block2']
            position_desc = block_pair['position']

            paragraphs1 = block1['paragraphs'] if block1 else []
            paragraphs2 = block2['paragraphs'] if block2 else []

            if not paragraphs1 and not paragraphs2:
                continue

            print(f" 📦 {position_desc}: 文件1有{len(paragraphs1)}个段落, 文件2有{len(paragraphs2)}个段落")

            # Every block pair is compared with a fresh call, so no state
            # from a previous pair leaks into this one.
            block_diffs = self.paragraph_comparator.compare_paragraphs(
                paragraphs1, paragraphs2
            )

            # Tag each difference with the block it was found in.
            for diff in block_diffs:
                diff['paragraph_block'] = position_desc

            differences.extend(block_diffs)

        return differences

    @staticmethod
    def _build_statistics(differences: List[Dict]) -> Dict:
        """Count the differences per type and per severity in a single pass."""
        type_counts = {}
        severity_counts = {}
        for diff in differences:
            diff_type = diff['type']
            type_counts[diff_type] = type_counts.get(diff_type, 0) + 1
            severity = diff.get('severity')
            severity_counts[severity] = severity_counts.get(severity, 0) + 1

        def by_type(name):
            return type_counts.get(name, 0)

        return {
            'total_differences': len(differences),
            'table_differences': sum(
                count for name, count in type_counts.items()
                if name.startswith('table')
            ),
            'paragraph_differences': by_type('paragraph'),
            'amount_differences': by_type('table_amount'),
            'datetime_differences': by_type('table_datetime'),
            'text_differences': by_type('table_text'),
            'table_pre_header': by_type('table_pre_header'),
            'table_header_mismatch': by_type('table_header_mismatch'),
            'table_header_critical': by_type('table_header_critical'),
            'table_header_position': by_type('table_header_position'),
            'table_row_missing': by_type('table_row_missing'),
            # "critical" is reported together with "high".
            'high_severity': (severity_counts.get('critical', 0)
                              + severity_counts.get('high', 0)),
            'medium_severity': severity_counts.get('medium', 0),
            'low_severity': severity_counts.get('low', 0)
        }

    @staticmethod
    def _print_summary(stats: Dict) -> None:
        """Print the final summary of difference counts to stdout."""
        separator = "=" * 60
        print("\n" + separator)
        print("📊 对比结果汇总")
        print(separator)
        print(f"总差异数: {stats['total_differences']}")
        print(f" - 段落差异: {stats['paragraph_differences']}")
        print(f" - 表格差异: {stats['table_differences']}")
        print(f" - 金额: {stats['amount_differences']}")
        print(f" - 日期: {stats['datetime_differences']}")
        print(f" - 文本: {stats['text_differences']}")
        print("\n严重级别分布:")
        print(f" 🔴 高: {stats['high_severity']}")
        print(f" 🟡 中: {stats['medium_severity']}")
        print(f" 🟢 低: {stats['low_severity']}")
        print(separator)

    def _align_paragraph_blocks(self, blocks1: List[Dict], blocks2: List[Dict],
                                tables1: List[Dict], tables2: List[Dict],
                                table_matches: List[Tuple[int, int, float]]) -> List[Dict]:
        """Pair up paragraph blocks of both files using the matched tables.

        Strategy:
            1. Without any table in either file, blocks are paired by index.
            2. Otherwise all blocks ending before the first matched table are
               merged into one leading pair.
            3. For every matched table pair (ordered by the file-1 index),
               the blocks lying between that table and the next matched
               table (or the end of the document) are merged into one pair.

        When tables exist but none of them matched, every block of each file
        is merged into the single leading pair so no paragraph is left out
        of the comparison.

        Returns:
            A list of ``{'block1': dict | None, 'block2': dict | None,
            'position': str}`` entries.
        """
        aligned = []

        # No tables at all: pair the blocks positionally.
        if not tables1 and not tables2:
            for i in range(max(len(blocks1), len(blocks2))):
                aligned.append({
                    'block1': blocks1[i] if i < len(blocks1) else None,
                    'block2': blocks2[i] if i < len(blocks2) else None,
                    'position': f'段落块{i+1}'
                })
            return aligned

        # Map file-1 table index -> file-2 table index.
        table_map = {idx1: idx2 for idx1, idx2, _ in table_matches}

        # Blocks in front of the first matched table.
        if blocks1 or blocks2:
            if table_map:
                cutoff1 = tables1[min(table_map)]['start_pos']
                cutoff2 = tables2[min(table_map.values())]['start_pos']
            else:
                # No matched table to anchor on: treat the whole document as
                # the leading section instead of cutting at an unrelated
                # table position.
                cutoff1 = cutoff2 = float('inf')

            pre_blocks1 = [b for b in blocks1 if b['end_pos'] <= cutoff1]
            pre_blocks2 = [b for b in blocks2 if b['end_pos'] <= cutoff2]

            if pre_blocks1 or pre_blocks2:
                aligned.append({
                    'block1': self._merge_paragraph_blocks(pre_blocks1) if pre_blocks1 else None,
                    'block2': self._merge_paragraph_blocks(pre_blocks2) if pre_blocks2 else None,
                    'position': '文档开头(表格前)'
                })

        # Blocks after each matched table, up to the next matched table.
        sorted_matches = sorted(table_matches, key=lambda m: m[0])

        for i, (idx1, idx2, _) in enumerate(sorted_matches):
            table1_end = tables1[idx1]['end_pos']
            table2_end = tables2[idx2]['end_pos']

            if i + 1 < len(sorted_matches):
                next_table1_start = tables1[sorted_matches[i + 1][0]]['start_pos']
                next_table2_start = tables2[sorted_matches[i + 1][1]]['start_pos']
            else:
                # Last matched table: the range extends to the document end.
                next_table1_start = float('inf')
                next_table2_start = float('inf')

            between_blocks1 = [b for b in blocks1
                               if b['start_pos'] >= table1_end and b['end_pos'] <= next_table1_start]
            between_blocks2 = [b for b in blocks2
                               if b['start_pos'] >= table2_end and b['end_pos'] <= next_table2_start]

            if between_blocks1 or between_blocks2:
                aligned.append({
                    'block1': self._merge_paragraph_blocks(between_blocks1) if between_blocks1 else None,
                    'block2': self._merge_paragraph_blocks(between_blocks2) if between_blocks2 else None,
                    'position': f'表格{idx1+1}↔表格{idx2+1} 之后'
                })

        return aligned

    def _merge_paragraph_blocks(self, blocks: List[Dict]) -> Optional[Dict]:
        """Merge several paragraph blocks into one.

        Args:
            blocks: Paragraph blocks, each with ``start_pos``, ``end_pos``
                and ``paragraphs``.  The merged span takes ``start_pos`` from
                the first block and ``end_pos`` from the last one, so the
                list is expected to be in document order.

        Returns:
            ``None`` for an empty list, the block itself (same object) when
            there is exactly one, otherwise a new dict holding the
            concatenated paragraphs.
        """
        if not blocks:
            return None

        if len(blocks) == 1:
            return blocks[0]

        all_paragraphs = []
        for block in blocks:
            all_paragraphs.extend(block['paragraphs'])

        return {
            'start_pos': blocks[0]['start_pos'],
            'end_pos': blocks[-1]['end_pos'],
            'paragraphs': all_paragraphs
        }
|