|
@@ -1,5 +1,5 @@
|
|
|
import os
|
|
import os
|
|
|
-from typing import Dict
|
|
|
|
|
|
|
+from typing import Dict, List, Tuple
|
|
|
from datetime import datetime
|
|
from datetime import datetime
|
|
|
|
|
|
|
|
try:
|
|
try:
|
|
@@ -42,37 +42,32 @@ class OCRResultComparator:
|
|
|
print(f" 文件1大小: {len(content1)} 字符")
|
|
print(f" 文件1大小: {len(content1)} 字符")
|
|
|
print(f" 文件2大小: {len(content2)} 字符")
|
|
print(f" 文件2大小: {len(content2)} 字符")
|
|
|
|
|
|
|
|
- # 提取表格
|
|
|
|
|
- print(f"\n📊 提取表格...")
|
|
|
|
|
- tables1 = self.content_extractor.extract_table_data(content1)
|
|
|
|
|
- tables2 = self.content_extractor.extract_table_data(content2)
|
|
|
|
|
- print(f" 文件1表格数: {len(tables1)}")
|
|
|
|
|
- print(f" 文件2表格数: {len(tables2)}")
|
|
|
|
|
-
|
|
|
|
|
- # 提取段落
|
|
|
|
|
- print(f"\n📝 提取段落...")
|
|
|
|
|
- paragraphs1 = self.content_extractor.extract_paragraphs(content1)
|
|
|
|
|
- paragraphs2 = self.content_extractor.extract_paragraphs(content2)
|
|
|
|
|
- print(f" 文件1段落数: {len(paragraphs1)}")
|
|
|
|
|
- print(f" 文件2段落数: {len(paragraphs2)}")
|
|
|
|
|
-
|
|
|
|
|
- # 比较段落
|
|
|
|
|
- print(f"\n🔍 开始段落对比...")
|
|
|
|
|
- paragraph_differences = self.paragraph_comparator.compare_paragraphs(
|
|
|
|
|
- paragraphs1, paragraphs2
|
|
|
|
|
- )
|
|
|
|
|
- print(f"✅ 段落对比完成,发现 {len(paragraph_differences)} 个差异")
|
|
|
|
|
|
|
+ # ✅ 提取结构化内容(包含位置信息)
|
|
|
|
|
+ print(f"\n📊 提取结构化内容...")
|
|
|
|
|
+ structured_content1 = self.content_extractor.extract_structured_content(content1)
|
|
|
|
|
+ structured_content2 = self.content_extractor.extract_structured_content(content2)
|
|
|
|
|
+
|
|
|
|
|
+ print(f" 文件1: {len(structured_content1['tables'])}个表格, {len(structured_content1['paragraph_blocks'])}个段落块")
|
|
|
|
|
+ print(f" 文件2: {len(structured_content2['tables'])}个表格, {len(structured_content2['paragraph_blocks'])}个段落块")
|
|
|
|
|
|
|
|
- # 初始化所有差异列表
|
|
|
|
|
|
|
+ # 初始化差异列表
|
|
|
all_differences = []
|
|
all_differences = []
|
|
|
- all_differences.extend(paragraph_differences)
|
|
|
|
|
|
|
|
|
|
# ✅ 智能表格匹配与比较
|
|
# ✅ 智能表格匹配与比较
|
|
|
print(f"\n🔍 开始表格智能匹配...")
|
|
print(f"\n🔍 开始表格智能匹配...")
|
|
|
|
|
|
|
|
|
|
+ tables1 = structured_content1['tables']
|
|
|
|
|
+ tables2 = structured_content2['tables']
|
|
|
|
|
+
|
|
|
|
|
+ # 记录匹配的表格对
|
|
|
|
|
+ table_matches = []
|
|
|
|
|
+
|
|
|
if tables1 and tables2:
|
|
if tables1 and tables2:
|
|
|
# 找到匹配的表格对
|
|
# 找到匹配的表格对
|
|
|
- table_matches = self.table_comparator.find_matching_tables(tables1, tables2)
|
|
|
|
|
|
|
+ table_matches = self.table_comparator.find_matching_tables(
|
|
|
|
|
+ [t['data'] for t in tables1],
|
|
|
|
|
+ [t['data'] for t in tables2]
|
|
|
|
|
+ )
|
|
|
|
|
|
|
|
if not table_matches:
|
|
if not table_matches:
|
|
|
print(f" ⚠️ 未找到匹配的表格")
|
|
print(f" ⚠️ 未找到匹配的表格")
|
|
@@ -91,11 +86,11 @@ class OCRResultComparator:
|
|
|
|
|
|
|
|
if self.table_comparison_mode == 'flow_list':
|
|
if self.table_comparison_mode == 'flow_list':
|
|
|
table_diffs = self.table_comparator.compare_table_flow_list(
|
|
table_diffs = self.table_comparator.compare_table_flow_list(
|
|
|
- tables1[idx1], tables2[idx2]
|
|
|
|
|
|
|
+ tables1[idx1]['data'], tables2[idx2]['data']
|
|
|
)
|
|
)
|
|
|
else:
|
|
else:
|
|
|
table_diffs = self.table_comparator.compare_tables(
|
|
table_diffs = self.table_comparator.compare_tables(
|
|
|
- tables1[idx1], tables2[idx2]
|
|
|
|
|
|
|
+ tables1[idx1]['data'], tables2[idx2]['data']
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
# 为每个差异添加表格标识
|
|
# 为每个差异添加表格标识
|
|
@@ -151,9 +146,48 @@ class OCRResultComparator:
|
|
|
'severity': 'high'
|
|
'severity': 'high'
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
|
|
+ # ✅ 根据表格匹配结果对齐段落块
|
|
|
|
|
+ print(f"\n🔍 开始段落对比(基于表格位置对齐)...")
|
|
|
|
|
+
|
|
|
|
|
+ paragraph_blocks1 = structured_content1['paragraph_blocks']
|
|
|
|
|
+ paragraph_blocks2 = structured_content2['paragraph_blocks']
|
|
|
|
|
+
|
|
|
|
|
+ # ✅ 构建段落块对应关系
|
|
|
|
|
+ aligned_blocks = self._align_paragraph_blocks(
|
|
|
|
|
+ paragraph_blocks1, paragraph_blocks2,
|
|
|
|
|
+ tables1, tables2, table_matches
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ for block_pair in aligned_blocks:
|
|
|
|
|
+ block1 = block_pair['block1']
|
|
|
|
|
+ block2 = block_pair['block2']
|
|
|
|
|
+ position_desc = block_pair['position']
|
|
|
|
|
+
|
|
|
|
|
+ paragraphs1 = block1['paragraphs'] if block1 else []
|
|
|
|
|
+ paragraphs2 = block2['paragraphs'] if block2 else []
|
|
|
|
|
+
|
|
|
|
|
+ if not paragraphs1 and not paragraphs2:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ print(f" 📦 {position_desc}: 文件1有{len(paragraphs1)}个段落, 文件2有{len(paragraphs2)}个段落")
|
|
|
|
|
+
|
|
|
|
|
+ # 每个段落块独立对比,指针重新初始化
|
|
|
|
|
+ block_diffs = self.paragraph_comparator.compare_paragraphs(
|
|
|
|
|
+ paragraphs1, paragraphs2
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # 为每个差异添加段落块标识
|
|
|
|
|
+ for diff in block_diffs:
|
|
|
|
|
+ diff['paragraph_block'] = position_desc
|
|
|
|
|
+
|
|
|
|
|
+ all_differences.extend(block_diffs)
|
|
|
|
|
+
|
|
|
|
|
+ total_paragraph_diffs = len([d for d in all_differences if d['type'] == 'paragraph'])
|
|
|
|
|
+ print(f"✅ 段落对比完成,共发现 {total_paragraph_diffs} 个差异")
|
|
|
|
|
+
|
|
|
print(f"\n✅ 对比完成")
|
|
print(f"\n✅ 对比完成")
|
|
|
|
|
|
|
|
- # ✅ 统计差异 - 细化分类(与原版本保持一致)
|
|
|
|
|
|
|
+ # 统计差异
|
|
|
stats = {
|
|
stats = {
|
|
|
'total_differences': len(all_differences),
|
|
'total_differences': len(all_differences),
|
|
|
'table_differences': len([d for d in all_differences if d['type'].startswith('table')]),
|
|
'table_differences': len([d for d in all_differences if d['type'].startswith('table')]),
|
|
@@ -171,17 +205,17 @@ class OCRResultComparator:
|
|
|
'low_severity': len([d for d in all_differences if d.get('severity') == 'low'])
|
|
'low_severity': len([d for d in all_differences if d.get('severity') == 'low'])
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- # ✅ 构建返回结果 - 与原版本结构保持完全一致
|
|
|
|
|
|
|
+ # ✅ 构建返回结果
|
|
|
result = {
|
|
result = {
|
|
|
- 'differences': all_differences, # ✅ 原版本使用 differences 而非 paragraph_differences
|
|
|
|
|
|
|
+ 'differences': all_differences,
|
|
|
'statistics': stats,
|
|
'statistics': stats,
|
|
|
'file1_tables': len(tables1),
|
|
'file1_tables': len(tables1),
|
|
|
'file2_tables': len(tables2),
|
|
'file2_tables': len(tables2),
|
|
|
- 'file1_paragraphs': len(paragraphs1),
|
|
|
|
|
- 'file2_paragraphs': len(paragraphs2),
|
|
|
|
|
|
|
+ 'file1_paragraphs': sum(len(b['paragraphs']) for b in paragraph_blocks1),
|
|
|
|
|
+ 'file2_paragraphs': sum(len(b['paragraphs']) for b in paragraph_blocks2),
|
|
|
'file1_path': file1_path,
|
|
'file1_path': file1_path,
|
|
|
'file2_path': file2_path,
|
|
'file2_path': file2_path,
|
|
|
- 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') # ✅ 添加时间戳
|
|
|
|
|
|
|
+ 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
print(f"\n" + "="*60)
|
|
print(f"\n" + "="*60)
|
|
@@ -199,4 +233,111 @@ class OCRResultComparator:
|
|
|
print(f" 🟢 低: {result['statistics']['low_severity']}")
|
|
print(f" 🟢 低: {result['statistics']['low_severity']}")
|
|
|
print(f"="*60)
|
|
print(f"="*60)
|
|
|
|
|
|
|
|
- return result
|
|
|
|
|
|
|
+ return result
|
|
|
|
|
+
|
|
|
|
|
+ def _align_paragraph_blocks(self, blocks1: List[Dict], blocks2: List[Dict],
|
|
|
|
|
+ tables1: List[Dict], tables2: List[Dict],
|
|
|
|
|
+ table_matches: List[Tuple[int, int, float]]) -> List[Dict]:
|
|
|
|
|
+ """
|
|
|
|
|
+ 根据表格位置对齐段落块
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ [
|
|
|
|
|
+ {'block1': dict, 'block2': dict, 'position': str},
|
|
|
|
|
+ ...
|
|
|
|
|
+ ]
|
|
|
|
|
+ """
|
|
|
|
|
+ aligned = []
|
|
|
|
|
+
|
|
|
|
|
+ # 如果没有表格,直接对比所有段落块
|
|
|
|
|
+ if not tables1 and not tables2:
|
|
|
|
|
+ max_blocks = max(len(blocks1), len(blocks2))
|
|
|
|
|
+ for i in range(max_blocks):
|
|
|
|
|
+ aligned.append({
|
|
|
|
|
+ 'block1': blocks1[i] if i < len(blocks1) else None,
|
|
|
|
|
+ 'block2': blocks2[i] if i < len(blocks2) else None,
|
|
|
|
|
+ 'position': f'段落块{i+1}'
|
|
|
|
|
+ })
|
|
|
|
|
+ return aligned
|
|
|
|
|
+
|
|
|
|
|
+ # 构建表格索引映射
|
|
|
|
|
+ table_map = {idx1: idx2 for idx1, idx2, _ in table_matches}
|
|
|
|
|
+
|
|
|
|
|
+ # ✅ 策略:根据表格位置划分段落块
|
|
|
|
|
+ # 1. 第一个表格前的段落块
|
|
|
|
|
+ # 2. 每对匹配表格之间的段落块
|
|
|
|
|
+ # 3. 最后一个表格后的段落块
|
|
|
|
|
+
|
|
|
|
|
+ # 第一个表格前的段落块
|
|
|
|
|
+ if blocks1 or blocks2:
|
|
|
|
|
+ first_table_idx1 = min(table_map.keys()) if table_map else len(blocks1)
|
|
|
|
|
+ first_table_idx2 = min(table_map.values()) if table_map else len(blocks2)
|
|
|
|
|
+
|
|
|
|
|
+ # 找到第一个表格前的所有段落块
|
|
|
|
|
+ pre_blocks1 = [b for b in blocks1 if b['end_pos'] <= (tables1[first_table_idx1]['start_pos'] if first_table_idx1 < len(tables1) else float('inf'))]
|
|
|
|
|
+ pre_blocks2 = [b for b in blocks2 if b['end_pos'] <= (tables2[first_table_idx2]['start_pos'] if first_table_idx2 < len(tables2) else float('inf'))]
|
|
|
|
|
+
|
|
|
|
|
+ if pre_blocks1 or pre_blocks2:
|
|
|
|
|
+ # 合并所有表格前的段落
|
|
|
|
|
+ merged_block1 = self._merge_paragraph_blocks(pre_blocks1) if pre_blocks1 else None
|
|
|
|
|
+ merged_block2 = self._merge_paragraph_blocks(pre_blocks2) if pre_blocks2 else None
|
|
|
|
|
+
|
|
|
|
|
+ aligned.append({
|
|
|
|
|
+ 'block1': merged_block1,
|
|
|
|
|
+ 'block2': merged_block2,
|
|
|
|
|
+ 'position': '文档开头(表格前)'
|
|
|
|
|
+ })
|
|
|
|
|
+
|
|
|
|
|
+ # 每对匹配表格之间的段落块
|
|
|
|
|
+ sorted_matches = sorted(table_matches, key=lambda x: x[0])
|
|
|
|
|
+
|
|
|
|
|
+ for i, (idx1, idx2, _) in enumerate(sorted_matches):
|
|
|
|
|
+ # 当前表格后、下一个表格前的段落块
|
|
|
|
|
+ table1_end = tables1[idx1]['end_pos']
|
|
|
|
|
+ table2_end = tables2[idx2]['end_pos']
|
|
|
|
|
+
|
|
|
|
|
+ # 下一个表格的开始位置
|
|
|
|
|
+ if i + 1 < len(sorted_matches):
|
|
|
|
|
+ next_idx1 = sorted_matches[i + 1][0]
|
|
|
|
|
+ next_idx2 = sorted_matches[i + 1][1]
|
|
|
|
|
+ next_table1_start = tables1[next_idx1]['start_pos']
|
|
|
|
|
+ next_table2_start = tables2[next_idx2]['start_pos']
|
|
|
|
|
+ else:
|
|
|
|
|
+ next_table1_start = float('inf')
|
|
|
|
|
+ next_table2_start = float('inf')
|
|
|
|
|
+
|
|
|
|
|
+ # 找到这个范围内的段落块
|
|
|
|
|
+ between_blocks1 = [b for b in blocks1
|
|
|
|
|
+ if b['start_pos'] >= table1_end and b['end_pos'] <= next_table1_start]
|
|
|
|
|
+ between_blocks2 = [b for b in blocks2
|
|
|
|
|
+ if b['start_pos'] >= table2_end and b['end_pos'] <= next_table2_start]
|
|
|
|
|
+
|
|
|
|
|
+ if between_blocks1 or between_blocks2:
|
|
|
|
|
+ merged_block1 = self._merge_paragraph_blocks(between_blocks1) if between_blocks1 else None
|
|
|
|
|
+ merged_block2 = self._merge_paragraph_blocks(between_blocks2) if between_blocks2 else None
|
|
|
|
|
+
|
|
|
|
|
+ aligned.append({
|
|
|
|
|
+ 'block1': merged_block1,
|
|
|
|
|
+ 'block2': merged_block2,
|
|
|
|
|
+ 'position': f'表格{idx1+1}↔表格{idx2+1} 之后'
|
|
|
|
|
+ })
|
|
|
|
|
+
|
|
|
|
|
+ return aligned
|
|
|
|
|
+
|
|
|
|
|
+ def _merge_paragraph_blocks(self, blocks: List[Dict]) -> Dict:
|
|
|
|
|
+ """合并多个段落块为一个"""
|
|
|
|
|
+ if not blocks:
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ if len(blocks) == 1:
|
|
|
|
|
+ return blocks[0]
|
|
|
|
|
+
|
|
|
|
|
+ all_paragraphs = []
|
|
|
|
|
+ for block in blocks:
|
|
|
|
|
+ all_paragraphs.extend(block['paragraphs'])
|
|
|
|
|
+
|
|
|
|
|
+ return {
|
|
|
|
|
+ 'start_pos': blocks[0]['start_pos'],
|
|
|
|
|
+ 'end_pos': blocks[-1]['end_pos'],
|
|
|
|
|
+ 'paragraphs': all_paragraphs
|
|
|
|
|
+ }
|