import time
import re
import difflib
import json
import argparse
import datetime
from typing import Dict, List
from bs4 import BeautifulSoup
from fuzzywuzzy import fuzz
class OCRResultComparator:
def __init__(self):
self.differences = []
        self.similarity_threshold = 85  # similarity threshold: scores >= 85% count as a match
        self.max_paragraph_window = 6  # maximum number of paragraphs to merge into one window
def normalize_text(self, text: str) -> str:
"""标准化文本:去除多余空格、回车等无效字符"""
if not text:
return ""
# 去除多余的空白字符
text = re.sub(r'\s+', ' ', text.strip())
# 去除标点符号周围的空格
text = re.sub(r'\s*([,。:;!?、])\s*', r'\1', text)
return text
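    # Illustrative behaviour (hypothetical input, not from a real run):
    #   normalize_text("  合计 :  1,000  元 ")  ->  "合计:1,000 元"
    # Runs of whitespace collapse to one space; spaces around the listed
    # full-width punctuation marks are removed entirely.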
def is_image_reference(self, text: str) -> bool:
"""判断是否为图片引用或描述"""
image_keywords = [
'图', '图片', '图像', 'image', 'figure', 'fig',
'照片', '截图', '示意图', '流程图', '结构图'
]
        # Check for image-related keywords (kept in Chinese: they match Chinese OCR output)
for keyword in image_keywords:
if keyword in text.lower():
return True
        # Check for Markdown image syntax
if re.search(r'!\[.*?\]\(.*?\)', text):
return True
        # Check for an HTML image tag
        if re.search(r'<img[^>]*>', text, re.IGNORECASE):
            return True
return False
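    # Illustrative checks (hypothetical inputs):
    #   is_image_reference("![logo](logo.png)")  -> True  (Markdown image syntax)
    #   is_image_reference('<img src="a.png">')  -> True  (HTML image tag)
    #   is_image_reference("流程图如下")           -> True  (keyword match)
    # The keyword test is deliberately broad: any text that merely mentions a
    # figure (e.g. "见上图") is also treated as an image reference.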
def extract_table_data(self, md_content: str) -> List[List[List[str]]]:
"""从Markdown中提取表格数据"""
tables = []
        # Parse embedded HTML tables with BeautifulSoup
soup = BeautifulSoup(md_content, 'html.parser')
html_tables = soup.find_all('table')
for table in html_tables:
table_data = []
rows = table.find_all('tr')
for row in rows:
cells = row.find_all(['td', 'th'])
row_data = []
for cell in cells:
cell_text = self.normalize_text(cell.get_text())
                    # Skip image content
                    if not self.is_image_reference(cell_text):
                        row_data.append(cell_text)
                    else:
                        row_data.append("[image content - ignored]")
                if row_data:  # keep non-empty rows only
table_data.append(row_data)
if table_data:
tables.append(table_data)
return tables
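    # The return value nests as tables -> rows -> cells. A sketch for a
    # one-row HTML table (hypothetical input):
    #   extract_table_data('<table><tr><td>营业收入</td><td>1,000</td></tr></table>')
    #   -> [[['营业收入', '1,000']]]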
def merge_split_paragraphs(self, lines: List[str]) -> List[str]:
        """Merge consecutive non-empty lines into one paragraph, filtering out image content."""
merged_lines = []
current_paragraph = ""
        for line in lines:
            # Skip empty lines
if not line:
if current_paragraph:
merged_lines.append(current_paragraph)
current_paragraph = ""
continue
            # Skip image content
if self.is_image_reference(line):
continue
            # Heading check: starts with a Chinese ordinal, a numbered-list marker, or '#'
is_title = (
line.startswith(('一、', '二、', '三、', '四、', '五、', '六、', '七、', '八、', '九、', '十、')) or
line.startswith(('1.', '2.', '3.', '4.', '5.', '6.', '7.', '8.', '9.')) or
line.startswith('#')
)
            # A heading ends the current paragraph
if is_title:
if current_paragraph:
merged_lines.append(current_paragraph)
current_paragraph = ""
merged_lines.append(line)
else:
                # Merge into the previous line when the current paragraph is
                # non-empty and its last character is not whitespace
                if current_paragraph and not current_paragraph.endswith((' ', '\t')):
current_paragraph += line
else:
current_paragraph = line
        # Flush the trailing paragraph
if current_paragraph:
merged_lines.append(current_paragraph)
return merged_lines
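    # Sketch of the merge behaviour (hypothetical lines):
    #   ["一、概述", "本公司主营", "印刷业务。", "", "下一段"]
    #   -> ["一、概述", "本公司主营印刷业务。", "下一段"]
    # Headings stay on their own line; consecutive body lines are concatenated
    # without a separator, which suits Chinese text where intra-paragraph line
    # breaks carry no meaning.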
def extract_paragraphs(self, md_content: str) -> List[str]:
"""提取段落文本"""
# 移除表格
content = re.sub(r'
', '', md_content, flags=re.DOTALL)
# 移除HTML标签
content = re.sub(r'<[^>]+>', '', content)
# 移除Markdown注释
content = re.sub(r'', '', content, flags=re.DOTALL)
# 分割段落
paragraphs = []
lines = content.split('\n')
merged_lines = self.merge_split_paragraphs(lines)
for line in merged_lines:
normalized = self.normalize_text(line)
if normalized:
paragraphs.append(normalized)
else:
print(f"跳过的内容无效或图片段落: {line[0:30]}...")
return paragraphs
def compare_tables(self, table1: List[List[str]], table2: List[List[str]]) -> List[Dict]:
"""比较表格数据"""
differences = []
        # Use the larger row count
max_rows = max(len(table1), len(table2))
for i in range(max_rows):
row1 = table1[i] if i < len(table1) else []
row2 = table2[i] if i < len(table2) else []
            # Use the larger column count
max_cols = max(len(row1), len(row2))
for j in range(max_cols):
cell1 = row1[j] if j < len(row1) else ""
cell2 = row2[j] if j < len(row2) else ""
                # Skip image-content cells
                if "[image content - ignored]" in cell1 or "[image content - ignored]" in cell2:
continue
                if cell1 != cell2:
                    # Treat numeric amounts specially
                    if self.is_numeric(cell1) and self.is_numeric(cell2):
                        num1 = self.parse_number(cell1)
                        num2 = self.parse_number(cell2)
                        if abs(num1 - num2) > 0.001:  # tolerate floating-point error
                            differences.append({
                                'type': 'table_amount',
                                'position': f'Row {i+1}, Col {j+1}',
                                'file1_value': cell1,
                                'file2_value': cell2,
                                'description': f'Amount mismatch: {cell1} vs {cell2}',
                                'row_index': i,
                                'col_index': j
                            })
                    else:
                        differences.append({
                            'type': 'table_text',
                            'position': f'Row {i+1}, Col {j+1}',
                            'file1_value': cell1,
                            'file2_value': cell2,
                            'description': f'Text mismatch: {cell1} vs {cell2}',
                            'row_index': i,
                            'col_index': j
                        })
return differences
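    # A single mismatching numeric cell yields one 'table_amount' entry, e.g.
    # (illustrative): compare_tables([['1,000.00']], [['1,200.00']]) returns
    # one difference with position 'Row 1, Col 1' and an amount-mismatch
    # description; equal cells and image placeholders are skipped.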
def is_numeric(self, text: str) -> bool:
"""判断文本是否为数字"""
if not text:
return False
        # Strip thousands separators (ASCII and full-width commas) and minus signs
        clean_text = re.sub(r'[,,-]', '', text)
try:
float(clean_text)
return True
except ValueError:
return False
def parse_number(self, text: str) -> float:
"""解析数字"""
if not text:
return 0.0
clean_text = re.sub(r'[,,]', '', text)
try:
return float(clean_text)
except ValueError:
return 0.0
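    # Both helpers strip thousands separators first (illustrative):
    #   is_numeric("1,234.56")   -> True
    #   parse_number("1,234.56") -> 1234.56
    # Note that is_numeric also drops '-', so a string like "2023-12" is
    # (perhaps surprisingly) accepted as numeric, while parse_number, which
    # keeps '-', falls back to 0.0 for it.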
def normalize_text_for_comparison(self, text: str) -> str:
"""增强的文本标准化 - 用于语义比较"""
if not text:
return ""
        # Strip Markdown formatting
        text = re.sub(r'#{1,6}\s*', '', text)  # heading markers
        text = re.sub(r'\*\*(.+?)\*\*', r'\1', text)  # bold
        text = re.sub(r'\*(.+?)\*', r'\1', text)  # italics
        text = re.sub(r'`(.+?)`', r'\1', text)  # inline code
        text = re.sub(r'<!--.*?-->', '', text, flags=re.DOTALL)  # HTML-style comments
        # Fold full-width punctuation to ASCII equivalents
punctuation_map = {
',': ',', '。': '.', ':': ':', ';': ';',
'!': '!', '?': '?', '(': '(', ')': ')',
'【': '[', '】': ']', '《': '<', '》': '>',
'"': '"', '"': '"', ''': "'", ''': "'",
'、': ',', '…': '...'
}
for chinese_punct, english_punct in punctuation_map.items():
text = text.replace(chinese_punct, english_punct)
        # Collapse redundant whitespace
text = re.sub(r'\s+', ' ', text.strip())
        # Remove spaces around ASCII punctuation
text = re.sub(r'\s*([,.():;!?])\s*', r'\1', text)
return text
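    # Sketch (hypothetical input): Markdown markup is stripped and full-width
    # punctuation folded to ASCII before comparison:
    #   normalize_text_for_comparison("## **营业收入** :1,000元。")
    #   -> "营业收入:1,000元."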
def calculate_text_similarity(self, text1: str, text2: str) -> float:
"""改进的相似度计算"""
if not text1 and not text2:
return 100.0
if not text1 or not text2:
return 0.0
        # Identical after normalization: return 100%
if text1 == text2:
return 100.0
        # Combine several fuzzy-matching scores
similarity_scores = [
fuzz.ratio(text1, text2),
fuzz.partial_ratio(text1, text2),
fuzz.token_sort_ratio(text1, text2),
fuzz.token_set_ratio(text1, text2)
]
        # Give containment (one text inside the other) extra weight
        if text1 in text2 or text2 in text1:
            max_score = max(similarity_scores)
            # Boost the containment score, capped at 100
            return min(100.0, max_score + 10)
return max(similarity_scores)
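    # Illustrative scores: identical strings return 100.0 immediately; for
    # containment such as ("营业收入", "营业收入合计"), the best fuzzy score is
    # raised by 10 points and capped at 100.0, so substrings match strongly.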
def compare_paragraphs_with_flexible_matching(self, paras1: List[str], paras2: List[str]) -> List[Dict]:
"""改进的段落匹配算法 - 更好地处理段落重组"""
differences = []
        # Pre-normalize every paragraph with normalize_text_for_comparison
meaningful_paras1 = [self.normalize_text_for_comparison(p) for p in paras1]
meaningful_paras2 = [self.normalize_text_for_comparison(p) for p in paras2]
        # Match on the pre-normalized paragraphs
        used_paras1 = set()
        used_paras2 = set()
        best_match = {'similarity': 0.0}  # initialize best_match
        # Walk both files forward together: after a match, the file-2 window
        # restarts just past the matched paragraphs
        paras2_idx = 0
        for window_size1 in range(1, min(self.max_paragraph_window, len(meaningful_paras1)) + 1):  # windows of up to max_paragraph_window paragraphs
for i in range(len(meaningful_paras1) - window_size1 + 1):
if any(idx in used_paras1 for idx in range(i, i + window_size1)):
continue
                # Merge the file-1 window into one string
combined_para1 = "".join(meaningful_paras1[i:i+window_size1])
                # Find the best match in file 2
best_match = self._find_best_match_in_paras2_improved(
combined_para1,
meaningful_paras2[paras2_idx: min(paras2_idx + self.max_paragraph_window, len(meaningful_paras2))],
paras2_idx
)
if best_match and best_match['similarity'] >= self.similarity_threshold:
                    paras2_idx = best_match['indices'][-1] + 1  # advance the file-2 start index
                    # Record the match
for idx in range(i, i + window_size1):
used_paras1.add(idx)
for idx in best_match['indices']:
used_paras2.add(idx)
                    # Record a difference only when similarity is clearly imperfect
                    if best_match['similarity'] < 95.0:  # threshold raised to 95%
                        severity = 'low' if best_match['similarity'] >= 90 else 'medium'
                        differences.append({
                            'type': 'paragraph',
                            'position': f'Paragraph {i+1}' + (f'-{i+window_size1}' if window_size1 > 1 else ''),
                            'file1_value': combined_para1,
                            'file2_value': best_match['text'],
                            'description': f'Paragraph formatting difference (similarity: {best_match["similarity"]:.1f}%)',
                            'similarity': best_match['similarity'],
                            'severity': severity
                        })
            if paras2_idx >= len(meaningful_paras2):
                break  # all of file 2 matched; stop early
        # Report paragraphs left unmatched
for i, para in enumerate(meaningful_paras1):
if i not in used_paras1:
                differences.append({
                    'type': 'paragraph',
                    'position': f'Paragraph {i+1}',
                    'file1_value': para,
                    'file2_value': "",
                    'description': 'Paragraph unique to file 1',
                    'similarity': 0.0,
                    'severity': 'medium'
                })
for j, para in enumerate(meaningful_paras2):
if j not in used_paras2:
                differences.append({
                    'type': 'paragraph',
                    'position': f'Paragraph {j+1}',
                    'file1_value': "",
                    'file2_value': para,
                    'description': 'Paragraph unique to file 2',
                    'similarity': 0.0,
                    'severity': 'medium'
                })
return differences
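    # Matching is greedy and order-preserving: file-1 windows are compared
    # against a sliding window into file 2, and paras2_idx only ever moves
    # forward, so the algorithm assumes both files keep roughly the same
    # paragraph order.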
def _find_best_match_in_paras2_improved(self, target_text: str, paras2: List[str],
paras2_idx: int) -> Dict:
"""改进的段落匹配方法"""
best_match = None
for window_size in range(1, len(paras2) + 1):
for j in range(len(paras2) - window_size + 1):
combined_para2 = "".join(paras2[j:j+window_size])
similarity = self.calculate_text_similarity(target_text, combined_para2)
                if best_match and best_match['similarity'] == 100.0:
                    break  # perfect match found; exit early
if not best_match or similarity > best_match['similarity']:
best_match = {
'text': combined_para2,
'similarity': similarity,
'indices': list(range(j + paras2_idx, j + paras2_idx + window_size))
}
            if best_match and best_match['similarity'] == 100.0:
                break  # perfect match found; exit early
# Return empty dict if no match found
if best_match is None:
return {
'text': '',
'similarity': 0.0,
'indices': []
}
return best_match
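    # Note: calculate_text_similarity boosts containment, so a size-1 window
    # that is a substring of the target can already score 100.0 and trigger
    # the early exit before larger windows are tried.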
def compare_files(self, file1_path: str, file2_path: str) -> Dict:
"""改进的文件比较方法"""
# 读取文件
with open(file1_path, 'r', encoding='utf-8') as f:
content1 = f.read()
with open(file2_path, 'r', encoding='utf-8') as f:
content2 = f.read()
        # Extract tables and paragraphs
tables1 = self.extract_table_data(content1)
tables2 = self.extract_table_data(content2)
paras1 = self.extract_paragraphs(content1)
paras2 = self.extract_paragraphs(content2)
        # Collect results
        all_differences = []
        # Compare tables (original logic preserved)
if tables1 and tables2:
table_diffs = self.compare_tables(tables1[0], tables2[0])
all_differences.extend(table_diffs)
        elif tables1 and not tables2:
            all_differences.append({
                'type': 'table_structure',
                'position': 'table structure',
                'file1_value': f'{len(tables1)} table(s)',
                'file2_value': 'no tables',
                'description': 'File 1 contains tables but file 2 has none',
                'severity': 'high'
            })
        elif not tables1 and tables2:
            all_differences.append({
                'type': 'table_structure',
                'position': 'table structure',
                'file1_value': 'no tables',
                'file2_value': f'{len(tables2)} table(s)',
                'description': 'File 2 contains tables but file 1 has none',
                'severity': 'high'
            })
        # Use the enhanced paragraph comparison
para_diffs = self.compare_paragraphs_with_flexible_matching(paras1, paras2)
all_differences.extend(para_diffs)
        # # Generate a unified diff report
        # unified_diff_data = self.generate_unified_diff_report(
        #     paras1, paras2, file1_path, file2_path,
        #     "./output/pre_validation/unified_diff_comparison"
        # )
        # Summary statistics
stats = {
'total_differences': len(all_differences),
'table_differences': len([d for d in all_differences if d['type'].startswith('table')]),
'paragraph_differences': len([d for d in all_differences if d['type'] == 'paragraph']),
'amount_differences': len([d for d in all_differences if d['type'] == 'table_amount']),
'high_severity': len([d for d in all_differences if d.get('severity') == 'high']),
'medium_severity': len([d for d in all_differences if d.get('severity') == 'medium']),
'low_severity': len([d for d in all_differences if d.get('severity') == 'low'])
}
        # Assemble the result (unified diff data could be attached here)
result = {
'differences': all_differences,
'statistics': stats,
'file1_tables': len(tables1),
'file2_tables': len(tables2),
'file1_paragraphs': len(paras1),
'file2_paragraphs': len(paras2),
'file1_path': file1_path,
'file2_path': file2_path,
            # 'unified_diff': unified_diff_data  # attach the unified diff data
}
return result
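    # Typical use (hypothetical paths), a minimal sketch:
    #   comparator = OCRResultComparator()
    #   result = comparator.compare_files('page_001_a.md', 'page_001_b.md')
    #   print(result['statistics']['total_differences'])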
def generate_unified_diff(self, paras1: List[str], paras2: List[str], file1_path: str, file2_path: str) -> Dict:
"""
生成类似git diff的统一差异格式,并返回结构化数据
"""
        # Pre-normalize each paragraph with normalize_text_for_comparison
file1_lines = [self.normalize_text_for_comparison(p) for p in paras1]
file2_lines = [self.normalize_text_for_comparison(p) for p in paras2]
        # Produce the diff with difflib.unified_diff
        diff = difflib.unified_diff(
            file1_lines,
            file2_lines,
            fromfile=file1_path,
            tofile=file2_path,
            lineterm=''  # avoid appending extra characters to each line
        )
        # Materialize the diff generator
        diff_output = list(diff)
        # Parse the diff output into structured data
structured_diff = self._parse_unified_diff(diff_output, file1_lines, file2_lines, file1_path, file2_path)
return structured_diff
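    # Sketch: two one-paragraph "files" that differ produce one deletion and
    # one addition (illustrative):
    #   generate_unified_diff(['甲段'], ['乙段'], 'a.md', 'b.md')
    #   -> statistics include {'deletions': 1, 'additions': 1, ...}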
def _parse_unified_diff(self, diff_lines: List[str], file1_lines: List[str], file2_lines: List[str],
file1_path: str, file2_path: str) -> Dict:
"""解析unified diff输出并生成结构化数据"""
differences = []
current_hunk = None
file1_line_num = 0
file2_line_num = 0
        for line in diff_lines:
            if line.startswith('---') or line.startswith('+++'):
                continue
            elif line.startswith('@@'):
                # Parse the hunk header, e.g.: @@ -1,5 +1,4 @@
                match = re.match(r'@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@', line)
if match:
file1_start = int(match.group(1))
file1_count = int(match.group(2)) if match.group(2) else 1
file2_start = int(match.group(3))
file2_count = int(match.group(4)) if match.group(4) else 1
current_hunk = {
'file1_start': file1_start,
'file1_count': file1_count,
'file2_start': file2_start,
'file2_count': file2_count
}
                    file1_line_num = file1_start - 1  # convert to 0-based index
                    file2_line_num = file2_start - 1
            elif line.startswith(' '):
                # Unchanged line
                file1_line_num += 1
                file2_line_num += 1
            elif line.startswith('-'):
                # Line present only in file 1
                content = line[1:]  # strip the '-' prefix
                differences.append({
                    'type': 'paragraph',
                    'position': f'Paragraph {file1_line_num + 1}',
                    'file1_value': content,
                    'file2_value': "",
                    'description': 'Paragraph unique to file 1',
                    'similarity': 0.0,
                    'severity': 'medium',
                    'line_number': file1_line_num + 1,
                    'change_type': 'deletion'
                })
file1_line_num += 1
            elif line.startswith('+'):
                # Line present only in file 2
                content = line[1:]  # strip the '+' prefix
                differences.append({
                    'type': 'paragraph',
                    'position': f'Paragraph {file2_line_num + 1}',
                    'file1_value': "",
                    'file2_value': content,
                    'description': 'Paragraph unique to file 2',
                    'similarity': 0.0,
                    'severity': 'medium',
                    'line_number': file2_line_num + 1,
                    'change_type': 'addition'
                })
file2_line_num += 1
        # Compute summary statistics
        stats = {
            'total_differences': len(differences),
            'table_differences': 0,  # a unified diff carries no table differences
            'paragraph_differences': len(differences),
            'amount_differences': 0,
'high_severity': len([d for d in differences if d.get('severity') == 'high']),
'medium_severity': len([d for d in differences if d.get('severity') == 'medium']),
'low_severity': len([d for d in differences if d.get('severity') == 'low']),
'deletions': len([d for d in differences if d.get('change_type') == 'deletion']),
'additions': len([d for d in differences if d.get('change_type') == 'addition'])
}
return {
'differences': differences,
'statistics': stats,
'file1_tables': 0,
'file2_tables': 0,
'file1_paragraphs': len(file1_lines),
'file2_paragraphs': len(file2_lines),
'file1_path': file1_path,
'file2_path': file2_path,
'diff_type': 'unified_diff'
}
def generate_unified_diff_report(self, paras1: List[str], paras2: List[str], file1_path: str, file2_path: str, output_file: str):
"""生成unified diff的JSON和Markdown报告"""
# 生成结构化diff数据
diff_data = self.generate_unified_diff(paras1, paras2, file1_path, file2_path)
# 添加时间戳
import datetime
diff_data['timestamp'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 生成JSON报告
json_file = f"{output_file}_unified_diff.json"
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(diff_data, f, ensure_ascii=False, indent=2)
        # Write the Markdown report
        md_file = f"{output_file}_unified_diff.md"
        self._generate_unified_diff_markdown(diff_data, md_file)
        print(f"📄 Unified diff JSON report: {json_file}")
        print(f"📝 Unified diff Markdown report: {md_file}")
return diff_data
def _generate_unified_diff_markdown(self, diff_data: Dict, output_file: str):
"""生成unified diff的Markdown报告"""
with open(output_file, 'w', encoding='utf-8') as f:
f.write("# OCR结果Unified Diff对比报告\n\n")
# 基本信息
f.write("## 基本信息\n\n")
f.write(f"- **文件1**: `{diff_data['file1_path']}`\n")
f.write(f"- **文件2**: `{diff_data['file2_path']}`\n")
f.write(f"- **比较时间**: {diff_data.get('timestamp', 'N/A')}\n")
f.write(f"- **对比方式**: Unified Diff\n\n")
# 统计信息
stats = diff_data['statistics']
f.write("## 统计信息\n\n")
f.write(f"- 总差异数量: **{stats['total_differences']}**\n")
f.write(f"- 删除行数: **{stats['deletions']}**\n")
f.write(f"- 添加行数: **{stats['additions']}**\n")
f.write(f"- 文件1段落数: {diff_data['file1_paragraphs']}\n")
f.write(f"- 文件2段落数: {diff_data['file2_paragraphs']}\n\n")
# 差异详情
if diff_data['differences']:
f.write("## 差异详情\n\n")
# 按变更类型分组
deletions = [d for d in diff_data['differences'] if d['change_type'] == 'deletion']
additions = [d for d in diff_data['differences'] if d['change_type'] == 'addition']
if deletions:
f.write(f"### 🗑️ 删除内容 ({len(deletions)}项)\n\n")
for i, diff in enumerate(deletions, 1):
f.write(f"**{i}. 第{diff['line_number']}行**\n")
f.write(f"```\n{diff['file1_value']}\n```\n\n")
if additions:
f.write(f"### ➕ 新增内容 ({len(additions)}项)\n\n")
for i, diff in enumerate(additions, 1):
f.write(f"**{i}. 第{diff['line_number']}行**\n")
f.write(f"```\n{diff['file2_value']}\n```\n\n")
# 详细差异表格
f.write("## 详细差异列表\n\n")
f.write("| 序号 | 类型 | 行号 | 变更类型 | 内容 | 描述 |\n")
f.write("| --- | --- | --- | --- | --- | --- |\n")
for i, diff in enumerate(diff_data['differences'], 1):
change_icon = "🗑️" if diff['change_type'] == 'deletion' else "➕"
content = diff['file1_value'] if diff['change_type'] == 'deletion' else diff['file2_value']
f.write(f"| {i} | {change_icon} | {diff['line_number']} | {diff['change_type']} | ")
f.write(f"`{content[:50]}{'...' if len(content) > 50 else ''}` | ")
f.write(f"{diff['description']} |\n")
else:
f.write("## 结论\n\n")
f.write("🎉 **完美匹配!没有发现任何差异。**\n\n")
def generate_json_report(self, comparison_result: Dict, output_file: str):
"""生成JSON格式的比较报告"""
# report_data = {
# 'comparison_summary': {
# 'timestamp': re.sub(r'[^\w\-_\.]', '_', str(comparison_result.get('timestamp', ''))),
# 'file1': comparison_result['file1_path'],
# 'file2': comparison_result['file2_path'],
# 'statistics': comparison_result['statistics'],
# 'file_info': {
# 'file1_tables': comparison_result['file1_tables'],
# 'file2_tables': comparison_result['file2_tables'],
# 'file1_paragraphs': comparison_result['file1_paragraphs'],
# 'file2_paragraphs': comparison_result['file2_paragraphs']
# }
# },
# 'differences': comparison_result['differences']
# }
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(comparison_result, f, ensure_ascii=False, indent=2)
def generate_markdown_report(self, comparison_result: Dict, output_file: str):
"""生成Markdown格式的比较报告"""
with open(output_file, 'w', encoding='utf-8') as f:
f.write("# OCR结果对比报告\n\n")
# 基本信息
f.write("## 基本信息\n\n")
f.write(f"- **文件1**: `{comparison_result['file1_path']}`\n")
f.write(f"- **文件2**: `{comparison_result['file2_path']}`\n")
f.write(f"- **比较时间**: {comparison_result.get('timestamp', 'N/A')}\n\n")
# 统计信息
stats = comparison_result['statistics']
f.write("## 统计信息\n\n")
f.write(f"- 总差异数量: **{stats['total_differences']}**\n")
f.write(f"- 表格差异: **{stats['table_differences']}**\n")
f.write(f"- 金额差异: **{stats['amount_differences']}**\n")
f.write(f"- 段落差异: **{stats['paragraph_differences']}**\n")
f.write(f"- 文件1表格数: {comparison_result['file1_tables']}\n")
f.write(f"- 文件2表格数: {comparison_result['file2_tables']}\n")
f.write(f"- 文件1段落数: {comparison_result['file1_paragraphs']}\n")
f.write(f"- 文件2段落数: {comparison_result['file2_paragraphs']}\n\n")
# 差异摘要
if stats['total_differences'] == 0:
f.write("## 结论\n\n")
f.write("🎉 **完美匹配!没有发现任何差异。**\n\n")
else:
f.write("## 差异摘要\n\n")
# 按类型分组显示差异
diff_by_type = {}
for diff in comparison_result['differences']:
diff_type = diff['type']
if diff_type not in diff_by_type:
diff_by_type[diff_type] = []
diff_by_type[diff_type].append(diff)
for diff_type, diffs in diff_by_type.items():
type_name = {
'table_amount': '💰 表格金额差异',
'table_text': '📝 表格文本差异',
'paragraph': '📄 段落差异',
'table_structure': '🏗️ 表格结构差异'
}.get(diff_type, f'❓ {diff_type}')
f.write(f"### {type_name} ({len(diffs)}个)\n\n")
for i, diff in enumerate(diffs, 1):
f.write(f"**{i}. {diff['position']}**\n")
f.write(f"- 文件1: `{diff['file1_value']}`\n")
f.write(f"- 文件2: `{diff['file2_value']}`\n")
f.write(f"- 说明: {diff['description']}\n\n")
# 详细差异列表
if comparison_result['differences']:
f.write("## 详细差异列表\n\n")
f.write("| 序号 | 类型 | 位置 | 文件1内容 | 文件2内容 | 描述 |\n")
f.write("| --- | --- | --- | --- | --- | --- |\n")
for i, diff in enumerate(comparison_result['differences'], 1):
f.write(f"| {i} | {diff['type']} | {diff['position']} | ")
f.write(f"`{diff['file1_value'][:50]}{'...' if len(diff['file1_value']) > 50 else ''}` | ")
f.write(f"`{diff['file2_value'][:50]}{'...' if len(diff['file2_value']) > 50 else ''}` | ")
f.write(f"{diff['description']} |\n")
def compare_ocr_results(file1_path: str, file2_path: str, output_file: str = "comparison_report",
output_format: str = "markdown", ignore_images: bool = True):
"""
比较两个OCR结果文件
Args:
file1_path: 第一个OCR结果文件路径
file2_path: 第二个OCR结果文件路径
output_file: 输出文件名(不含扩展名),默认为"comparison_report"
output_format: 输出格式,选项: 'json', 'markdown', 'both',默认为'markdown'
ignore_images: 是否忽略图片内容,默认为True
Returns:
Dict: 比较结果字典
"""
comparator = OCRResultComparator()
print("🔍 开始对比OCR结果...")
print(f"📄 文件1: {file1_path}")
print(f"📄 文件2: {file2_path}")
print(f"📁 输出格式: {output_format}")
print(f"🖼️ 图片处理: {'忽略' if ignore_images else '对比'}")
try:
        # Run the comparison
        result = comparator.compare_files(file1_path, file2_path)
        # Attach a timestamp
        result['timestamp'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        # Write reports
        if output_format in ['json', 'both']:
            json_file = f"{output_file}.json"
            comparator.generate_json_report(result, json_file)
            print(f"📄 JSON report saved to: {json_file}")
        if output_format in ['markdown', 'both']:
            md_file = f"{output_file}.md"
            comparator.generate_markdown_report(result, md_file)
            print(f"📝 Markdown report saved to: {md_file}")
        # Print a brief summary
        print("\n📊 Comparison complete!")
        print(f"   Total differences: {result['statistics']['total_differences']}")
        print(f"   Table differences: {result['statistics']['table_differences']}")
        print(f"   Amount differences: {result['statistics']['amount_differences']}")
        print(f"   Paragraph differences: {result['statistics']['paragraph_differences']}")
        # Print the first few notable differences
        if result['differences']:
            print("\n🔍 First 3 notable differences:")
            for i, diff in enumerate(result['differences'][:3], 1):
                print(f"   {i}. {diff['position']}: {diff['description']}")
                print(f"      File 1: '{diff['file1_value'][:50]}{'...' if len(diff['file1_value']) > 50 else ''}'")
                print(f"      File 2: '{diff['file2_value'][:50]}{'...' if len(diff['file2_value']) > 50 else ''}'")
        else:
            print("\n🎉 The two files match exactly!")
        # Processing statistics (mirrors the style of ocr_by_vlm.py)
        print("\n📊 Comparison statistics")
        print(f"   File 1 path: {result['file1_path']}")
        print(f"   File 2 path: {result['file2_path']}")
        print(f"   Output file: {output_file}")
        print(f"   Output format: {output_format}")
        print(f"   Ignore images: {ignore_images}")
        print(f"   Processed at: {result['timestamp']}")
        print(f"   File 1 tables: {result['file1_tables']}")
        print(f"   File 2 tables: {result['file2_tables']}")
        print(f"   File 1 paragraphs: {result['file1_paragraphs']}")
        print(f"   File 2 paragraphs: {result['file2_paragraphs']}")
return result
    except Exception as e:
        import traceback
        traceback.print_exc()
        raise Exception(f"OCR comparison task failed: {e}") from e
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='OCR result comparison tool')
    parser.add_argument('file1', nargs='?', help='path to the first OCR result file')
    parser.add_argument('file2', nargs='?', help='path to the second OCR result file')
    parser.add_argument('-o', '--output', default='comparison_report',
                        help='output file name (without extension)')
    parser.add_argument('-f', '--format', choices=['json', 'markdown', 'both'],
                        default='markdown', help='output format: json, markdown, or both')
    parser.add_argument('--ignore-images', action='store_true', default=True,
                        help='ignore image content (enabled by default)')
args = parser.parse_args()
if args.file1 and args.file2:
result = compare_ocr_results(
file1_path=args.file1,
file2_path=args.file2,
output_file=args.output,
output_format=args.format,
ignore_images=args.ignore_images
)
else:
        # Fall back to default test parameters when no CLI arguments are given
result = compare_ocr_results(
file1_path='/Users/zhch158/workspace/data/至远彩色印刷工业有限公司/data_DotsOCR_Results/2023年度报告母公司_page_001.md',
file2_path='./output/pre_validation/2023年度报告母公司_page_001.md',
# output_file=f'./output/comparison_result_{time.strftime("%Y%m%d_%H%M%S")}',
output_file=f'./output/pre_validation/2023年度报告母公司_page_001_comparison_result',
output_format='both',
ignore_images=True
)
print("\n🎉 OCR对比完成!")