import sys import time import re import difflib import json import argparse from typing import Dict, List, Tuple import markdown from bs4 import BeautifulSoup from fuzzywuzzy import fuzz class OCRResultComparator: def __init__(self): self.differences = [] self.similarity_threshold = 85 # 相似度阈值,超过85%认为是匹配的 self.max_paragraph_window = 6 # 最大合并段落数 def normalize_text(self, text: str) -> str: """标准化文本:去除多余空格、回车等无效字符""" if not text: return "" # 去除多余的空白字符 text = re.sub(r'\s+', ' ', text.strip()) # 去除标点符号周围的空格 text = re.sub(r'\s*([,。:;!?、])\s*', r'\1', text) return text def is_image_reference(self, text: str) -> bool: """判断是否为图片引用或描述""" image_keywords = [ '图', '图片', '图像', 'image', 'figure', 'fig', '照片', '截图', '示意图', '流程图', '结构图' ] # 检查是否包含图片相关关键词 for keyword in image_keywords: if keyword in text.lower(): return True # 检查是否为Markdown图片语法 if re.search(r'!\[.*?\]\(.*?\)', text): return True # 检查是否为HTML图片标签 if re.search(r']*>', text, re.IGNORECASE): return True return False def extract_table_data(self, md_content: str) -> List[List[List[str]]]: """从Markdown中提取表格数据""" tables = [] # 使用BeautifulSoup解析HTML表格 soup = BeautifulSoup(md_content, 'html.parser') html_tables = soup.find_all('table') for table in html_tables: table_data = [] rows = table.find_all('tr') for row in rows: cells = row.find_all(['td', 'th']) row_data = [] for cell in cells: cell_text = self.normalize_text(cell.get_text()) # 跳过图片内容 if not self.is_image_reference(cell_text): row_data.append(cell_text) else: row_data.append("[图片内容-忽略]") if row_data: # 只添加非空行 table_data.append(row_data) if table_data: tables.append(table_data) return tables def merge_split_paragraphs(self, lines: List[str]) -> List[str]: # 合并连续的非空行作为一个段落,且过滤图片内容 merged_lines = [] current_paragraph = "" for i, line in enumerate(lines): # 跳过空行 if not line: if current_paragraph: merged_lines.append(current_paragraph) current_paragraph = "" continue # 跳过图片内容 if self.is_image_reference(line): continue # 检查是否是标题(以数字、中文数字或特殊标记开头) is_title = ( line.startswith(('一、', '二、', '三、', '四、', '五、', '六、', '七、', '八、', '九、', '十、')) or line.startswith(('1.', '2.', '3.', '4.', '5.', '6.', '7.', '8.', '9.')) or line.startswith('#') ) # 如果是标题,结束当前段落 if is_title: if current_paragraph: merged_lines.append(current_paragraph) current_paragraph = "" merged_lines.append(line) else: # 检查是否应该与前一行合并 # 如果当前段落不为空,且当前段落最后一个字符非空白字符 if current_paragraph and not current_paragraph.endswith((' ', '\t')): current_paragraph += line else: current_paragraph = line # 处理最后一个段落 if current_paragraph: merged_lines.append(current_paragraph) return merged_lines def extract_paragraphs(self, md_content: str) -> List[str]: """提取段落文本""" # 移除表格 content = re.sub(r'.*?
', '', md_content, flags=re.DOTALL) # 移除HTML标签 content = re.sub(r'<[^>]+>', '', content) # 移除Markdown注释 content = re.sub(r'', '', content, flags=re.DOTALL) # 分割段落 paragraphs = [] lines = content.split('\n') merged_lines = self.merge_split_paragraphs(lines) for line in merged_lines: normalized = self.normalize_text(line) if normalized: paragraphs.append(normalized) else: print(f"跳过的内容无效或图片段落: {line[0:30]}...") return paragraphs def compare_tables(self, table1: List[List[str]], table2: List[List[str]]) -> List[Dict]: """比较表格数据""" differences = [] # 确定最大行数 max_rows = max(len(table1), len(table2)) for i in range(max_rows): row1 = table1[i] if i < len(table1) else [] row2 = table2[i] if i < len(table2) else [] # 确定最大列数 max_cols = max(len(row1), len(row2)) for j in range(max_cols): cell1 = row1[j] if j < len(row1) else "" cell2 = row2[j] if j < len(row2) else "" # 跳过图片内容比较 if "[图片内容-忽略]" in cell1 or "[图片内容-忽略]" in cell2: continue if cell1 != cell2: # 特别处理数字金额 if self.is_numeric(cell1) and self.is_numeric(cell2): num1 = self.parse_number(cell1) num2 = self.parse_number(cell2) if abs(num1 - num2) > 0.001: # 允许小数精度误差 differences.append({ 'type': 'table_amount', 'position': f'行{i+1}列{j+1}', 'file1_value': cell1, 'file2_value': cell2, 'description': f'金额不一致: {cell1} vs {cell2}', 'row_index': i, 'col_index': j }) else: differences.append({ 'type': 'table_text', 'position': f'行{i+1}列{j+1}', 'file1_value': cell1, 'file2_value': cell2, 'description': f'文本不一致: {cell1} vs {cell2}', 'row_index': i, 'col_index': j }) return differences def is_numeric(self, text: str) -> bool: """判断文本是否为数字""" if not text: return False # 移除千分位分隔符和负号 clean_text = re.sub(r'[,,-]', '', text) try: float(clean_text) return True except ValueError: return False def parse_number(self, text: str) -> float: """解析数字""" if not text: return 0.0 clean_text = re.sub(r'[,,]', '', text) try: return float(clean_text) except ValueError: return 0.0 def normalize_text_for_comparison(self, text: str) -> str: """增强的文本标准化 - 用于语义比较""" if not text: return "" # 移除Markdown格式标记 text = re.sub(r'#{1,6}\s*', '', text) # 移除标题标记 text = re.sub(r'\*\*(.+?)\*\*', r'\1', text) # 移除粗体标记 text = re.sub(r'\*(.+?)\*', r'\1', text) # 移除斜体标记 text = re.sub(r'`(.+?)`', r'\1', text) # 移除代码标记 text = re.sub(r'', '', text, flags=re.DOTALL) # 移除注释 # 统一标点符号 punctuation_map = { ',': ',', '。': '.', ':': ':', ';': ';', '!': '!', '?': '?', '(': '(', ')': ')', '【': '[', '】': ']', '《': '<', '》': '>', '"': '"', '"': '"', ''': "'", ''': "'", '、': ',', '…': '...' } for chinese_punct, english_punct in punctuation_map.items(): text = text.replace(chinese_punct, english_punct) # 移除多余的空白字符 text = re.sub(r'\s+', ' ', text.strip()) # 移除标点符号周围的空格 text = re.sub(r'\s*([,.():;!?])\s*', r'\1', text) return text def calculate_text_similarity(self, text1: str, text2: str) -> float: """改进的相似度计算""" if not text1 and not text2: return 100.0 if not text1 or not text2: return 0.0 # 如果标准化后完全相同,返回100% if text1 == text2: return 100.0 # 使用多种相似度算法 similarity_scores = [ fuzz.ratio(text1, text2), fuzz.partial_ratio(text1, text2), fuzz.token_sort_ratio(text1, text2), fuzz.token_set_ratio(text1, text2) ] # 对于包含关系,给予更高的权重 if text1 in text2 or text2 in text1: max_score = max(similarity_scores) # 提升包含关系的相似度 return min(100.0, max_score + 10) return max(similarity_scores) def compare_paragraphs_with_flexible_matching(self, paras1: List[str], paras2: List[str]) -> List[Dict]: """改进的段落匹配算法 - 更好地处理段落重组""" differences = [] # 直接调用normalize_text_for_comparison进行预处理 meaningful_paras1 = [self.normalize_text_for_comparison(p) for p in paras1] meaningful_paras2 = [self.normalize_text_for_comparison(p) for p in paras2] # 使用预处理后的段落进行匹配 used_paras1 = set() used_paras2 = set() best_match = {'similarity': 0.0} # 初始化best_match # 文件1和文件2同时向下遍历,当有匹配项时,文件2的窗口从匹配项的下一个位置开始 paras2_idx = 0 for window_size1 in range(1, min(self.max_paragraph_window, len(meaningful_paras1) + 1)): # 增加到6个段落 for i in range(len(meaningful_paras1) - window_size1 + 1): if any(idx in used_paras1 for idx in range(i, i + window_size1)): continue # 合并文件1中的段落 combined_para1 = "".join(meaningful_paras1[i:i+window_size1]) # 在文件2中寻找最佳匹配 best_match = self._find_best_match_in_paras2_improved( combined_para1, meaningful_paras2[paras2_idx: min(paras2_idx + self.max_paragraph_window, len(meaningful_paras2))], paras2_idx ) if best_match and best_match['similarity'] >= self.similarity_threshold: paras2_idx = best_match['indices'][-1] + 1 # 更新文件2的起始索引 # 记录匹配 for idx in range(i, i + window_size1): used_paras1.add(idx) for idx in best_match['indices']: used_paras2.add(idx) # 只有当相似度明显不同时才记录差异 if best_match['similarity'] < 95.0: # 提高阈值到95% severity = 'low' if best_match['similarity'] >= 90 else 'medium' differences.append({ 'type': 'paragraph', 'position': f'段落{i+1}' + (f'-{i+window_size1}' if window_size1 > 1 else ''), 'file1_value': combined_para1, 'file2_value': best_match['text'], 'description': f'段落格式差异 (相似度: {best_match["similarity"]:.1f}%)', 'similarity': best_match['similarity'], 'severity': severity }) if paras2_idx >= len(meaningful_paras2): break # 文件2已全部匹配完,退出 # 处理未匹配的有意义段落 for i, para in enumerate(meaningful_paras1): if i not in used_paras1: differences.append({ 'type': 'paragraph', 'position': f'段落{i+1}', 'file1_value': para, 'file2_value': "", 'description': '文件1中独有的段落', 'similarity': 0.0, 'severity': 'medium' }) for j, para in enumerate(meaningful_paras2): if j not in used_paras2: differences.append({ 'type': 'paragraph', 'position': f'段落{j+1}', 'file1_value': "", 'file2_value': para, 'description': '文件2中独有的段落', 'similarity': 0.0, 'severity': 'medium' }) return differences def _find_best_match_in_paras2_improved(self, target_text: str, paras2: List[str], paras2_idx: int) -> Dict: """改进的段落匹配方法""" best_match = None for window_size in range(1, len(paras2) + 1): for j in range(len(paras2) - window_size + 1): combined_para2 = "".join(paras2[j:j+window_size]) similarity = self.calculate_text_similarity(target_text, combined_para2) if best_match and best_match['similarity'] == 100.0: break # 找到完美匹配,提前退出 if not best_match or similarity > best_match['similarity']: best_match = { 'text': combined_para2, 'similarity': similarity, 'indices': list(range(j + paras2_idx, j + paras2_idx + window_size)) } if best_match and best_match['similarity'] == 100.0: break # 找到完美匹配,提前退出 # Return empty dict if no match found if best_match is None: return { 'text': '', 'similarity': 0.0, 'indices': [] } return best_match def compare_files(self, file1_path: str, file2_path: str) -> Dict: """改进的文件比较方法""" # 读取文件 with open(file1_path, 'r', encoding='utf-8') as f: content1 = f.read() with open(file2_path, 'r', encoding='utf-8') as f: content2 = f.read() # 提取表格和段落 tables1 = self.extract_table_data(content1) tables2 = self.extract_table_data(content2) paras1 = self.extract_paragraphs(content1) paras2 = self.extract_paragraphs(content2) # 比较结果 all_differences = [] # 比较表格 (保持原有逻辑) if tables1 and tables2: table_diffs = self.compare_tables(tables1[0], tables2[0]) all_differences.extend(table_diffs) elif tables1 and not tables2: all_differences.append({ 'type': 'table_structure', 'position': '表格结构', 'file1_value': f'包含{len(tables1)}个表格', 'file2_value': '无表格', 'description': '文件1包含表格但文件2无表格', 'severity': 'high' }) elif not tables1 and tables2: all_differences.append({ 'type': 'table_structure', 'position': '表格结构', 'file1_value': '无表格', 'file2_value': f'包含{len(tables2)}个表格', 'description': '文件2包含表格但文件1无表格', 'severity': 'high' }) # 使用增强的段落比较 para_diffs = self.compare_paragraphs_with_flexible_matching(paras1, paras2) all_differences.extend(para_diffs) # # 生成unified diff报告 # unified_diff_data = self.generate_unified_diff_report( # paras1, paras2, file1_path, file2_path, # "./output/pre_validation/unified_diff_comparison" # ) # 统计信息 stats = { 'total_differences': len(all_differences), 'table_differences': len([d for d in all_differences if d['type'].startswith('table')]), 'paragraph_differences': len([d for d in all_differences if d['type'] == 'paragraph']), 'amount_differences': len([d for d in all_differences if d['type'] == 'table_amount']), 'high_severity': len([d for d in all_differences if d.get('severity') == 'high']), 'medium_severity': len([d for d in all_differences if d.get('severity') == 'medium']), 'low_severity': len([d for d in all_differences if d.get('severity') == 'low']) } # 在返回结果中添加unified diff数据 result = { 'differences': all_differences, 'statistics': stats, 'file1_tables': len(tables1), 'file2_tables': len(tables2), 'file1_paragraphs': len(paras1), 'file2_paragraphs': len(paras2), 'file1_path': file1_path, 'file2_path': file2_path, # 'unified_diff': unified_diff_data # 添加unified diff数据 } return result def generate_unified_diff(self, paras1: List[str], paras2: List[str], file1_path: str, file2_path: str) -> Dict: """ 生成类似git diff的统一差异格式,并返回结构化数据 """ # 直接调用normalize_text_for_comparison进行预处理 file1_lines = [self.normalize_text_for_comparison(p) for p in paras1] file2_lines = [self.normalize_text_for_comparison(p) for p in paras2] # 使用unified_diff生成差异 diff = difflib.unified_diff( file1_lines, file2_lines, fromfile=file1_path, tofile=file2_path, lineterm='' # 确保每行末尾不添加额外字符 ) # 将差异生成器转换为列表 diff_output = list(diff) # 解析diff输出并生成结构化数据 structured_diff = self._parse_unified_diff(diff_output, file1_lines, file2_lines, file1_path, file2_path) return structured_diff def _parse_unified_diff(self, diff_lines: List[str], file1_lines: List[str], file2_lines: List[str], file1_path: str, file2_path: str) -> Dict: """解析unified diff输出并生成结构化数据""" differences = [] current_hunk = None file1_line_num = 0 file2_line_num = 0 for line in diff_lines: if line.startswith('---') or line.startswith('+++'): continue elif line.startswith('@@'): # 解析hunk头部,例如: @@ -1,5 +1,4 @@ import re match = re.match(r'@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@', line) if match: file1_start = int(match.group(1)) file1_count = int(match.group(2)) if match.group(2) else 1 file2_start = int(match.group(3)) file2_count = int(match.group(4)) if match.group(4) else 1 current_hunk = { 'file1_start': file1_start, 'file1_count': file1_count, 'file2_start': file2_start, 'file2_count': file2_count } file1_line_num = file1_start - 1 # 转为0基索引 file2_line_num = file2_start - 1 elif line.startswith(' '): # 未改变的行 file1_line_num += 1 file2_line_num += 1 elif line.startswith('-'): # 文件1中删除的行 content = line[1:] # 去掉'-'前缀 differences.append({ 'type': 'paragraph', 'position': f'段落{file1_line_num + 1}', 'file1_value': content, 'file2_value': "", 'description': '文件1中独有的段落', 'similarity': 0.0, 'severity': 'medium', 'line_number': file1_line_num + 1, 'change_type': 'deletion' }) file1_line_num += 1 elif line.startswith('+'): # 文件2中添加的行 content = line[1:] # 去掉'+'前缀 differences.append({ 'type': 'paragraph', 'position': f'段落{file2_line_num + 1}', 'file1_value': "", 'file2_value': content, 'description': '文件2中独有的段落', 'similarity': 0.0, 'severity': 'medium', 'line_number': file2_line_num + 1, 'change_type': 'addition' }) file2_line_num += 1 # 计算统计信息 stats = { 'total_differences': len(differences), 'table_differences': 0, # diff不包含表格差异 'paragraph_differences': len(differences), 'amount_differences': 0, 'high_severity': len([d for d in differences if d.get('severity') == 'high']), 'medium_severity': len([d for d in differences if d.get('severity') == 'medium']), 'low_severity': len([d for d in differences if d.get('severity') == 'low']), 'deletions': len([d for d in differences if d.get('change_type') == 'deletion']), 'additions': len([d for d in differences if d.get('change_type') == 'addition']) } return { 'differences': differences, 'statistics': stats, 'file1_tables': 0, 'file2_tables': 0, 'file1_paragraphs': len(file1_lines), 'file2_paragraphs': len(file2_lines), 'file1_path': file1_path, 'file2_path': file2_path, 'diff_type': 'unified_diff' } def generate_unified_diff_report(self, paras1: List[str], paras2: List[str], file1_path: str, file2_path: str, output_file: str): """生成unified diff的JSON和Markdown报告""" # 生成结构化diff数据 diff_data = self.generate_unified_diff(paras1, paras2, file1_path, file2_path) # 添加时间戳 import datetime diff_data['timestamp'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') # 生成JSON报告 json_file = f"{output_file}_unified_diff.json" with open(json_file, 'w', encoding='utf-8') as f: json.dump(diff_data, f, ensure_ascii=False, indent=2) # 生成Markdown报告 md_file = f"{output_file}_unified_diff.md" self._generate_unified_diff_markdown(diff_data, md_file) print(f"📄 Unified Diff JSON报告: {json_file}") print(f"📝 Unified Diff Markdown报告: {md_file}") return diff_data def _generate_unified_diff_markdown(self, diff_data: Dict, output_file: str): """生成unified diff的Markdown报告""" with open(output_file, 'w', encoding='utf-8') as f: f.write("# OCR结果Unified Diff对比报告\n\n") # 基本信息 f.write("## 基本信息\n\n") f.write(f"- **文件1**: `{diff_data['file1_path']}`\n") f.write(f"- **文件2**: `{diff_data['file2_path']}`\n") f.write(f"- **比较时间**: {diff_data.get('timestamp', 'N/A')}\n") f.write(f"- **对比方式**: Unified Diff\n\n") # 统计信息 stats = diff_data['statistics'] f.write("## 统计信息\n\n") f.write(f"- 总差异数量: **{stats['total_differences']}**\n") f.write(f"- 删除行数: **{stats['deletions']}**\n") f.write(f"- 添加行数: **{stats['additions']}**\n") f.write(f"- 文件1段落数: {diff_data['file1_paragraphs']}\n") f.write(f"- 文件2段落数: {diff_data['file2_paragraphs']}\n\n") # 差异详情 if diff_data['differences']: f.write("## 差异详情\n\n") # 按变更类型分组 deletions = [d for d in diff_data['differences'] if d['change_type'] == 'deletion'] additions = [d for d in diff_data['differences'] if d['change_type'] == 'addition'] if deletions: f.write(f"### 🗑️ 删除内容 ({len(deletions)}项)\n\n") for i, diff in enumerate(deletions, 1): f.write(f"**{i}. 第{diff['line_number']}行**\n") f.write(f"```\n{diff['file1_value']}\n```\n\n") if additions: f.write(f"### ➕ 新增内容 ({len(additions)}项)\n\n") for i, diff in enumerate(additions, 1): f.write(f"**{i}. 第{diff['line_number']}行**\n") f.write(f"```\n{diff['file2_value']}\n```\n\n") # 详细差异表格 f.write("## 详细差异列表\n\n") f.write("| 序号 | 类型 | 行号 | 变更类型 | 内容 | 描述 |\n") f.write("| --- | --- | --- | --- | --- | --- |\n") for i, diff in enumerate(diff_data['differences'], 1): change_icon = "🗑️" if diff['change_type'] == 'deletion' else "➕" content = diff['file1_value'] if diff['change_type'] == 'deletion' else diff['file2_value'] f.write(f"| {i} | {change_icon} | {diff['line_number']} | {diff['change_type']} | ") f.write(f"`{content[:50]}{'...' if len(content) > 50 else ''}` | ") f.write(f"{diff['description']} |\n") else: f.write("## 结论\n\n") f.write("🎉 **完美匹配!没有发现任何差异。**\n\n") def generate_json_report(self, comparison_result: Dict, output_file: str): """生成JSON格式的比较报告""" # report_data = { # 'comparison_summary': { # 'timestamp': re.sub(r'[^\w\-_\.]', '_', str(comparison_result.get('timestamp', ''))), # 'file1': comparison_result['file1_path'], # 'file2': comparison_result['file2_path'], # 'statistics': comparison_result['statistics'], # 'file_info': { # 'file1_tables': comparison_result['file1_tables'], # 'file2_tables': comparison_result['file2_tables'], # 'file1_paragraphs': comparison_result['file1_paragraphs'], # 'file2_paragraphs': comparison_result['file2_paragraphs'] # } # }, # 'differences': comparison_result['differences'] # } with open(output_file, 'w', encoding='utf-8') as f: json.dump(comparison_result, f, ensure_ascii=False, indent=2) def generate_markdown_report(self, comparison_result: Dict, output_file: str): """生成Markdown格式的比较报告""" with open(output_file, 'w', encoding='utf-8') as f: f.write("# OCR结果对比报告\n\n") # 基本信息 f.write("## 基本信息\n\n") f.write(f"- **文件1**: `{comparison_result['file1_path']}`\n") f.write(f"- **文件2**: `{comparison_result['file2_path']}`\n") f.write(f"- **比较时间**: {comparison_result.get('timestamp', 'N/A')}\n\n") # 统计信息 stats = comparison_result['statistics'] f.write("## 统计信息\n\n") f.write(f"- 总差异数量: **{stats['total_differences']}**\n") f.write(f"- 表格差异: **{stats['table_differences']}**\n") f.write(f"- 金额差异: **{stats['amount_differences']}**\n") f.write(f"- 段落差异: **{stats['paragraph_differences']}**\n") f.write(f"- 文件1表格数: {comparison_result['file1_tables']}\n") f.write(f"- 文件2表格数: {comparison_result['file2_tables']}\n") f.write(f"- 文件1段落数: {comparison_result['file1_paragraphs']}\n") f.write(f"- 文件2段落数: {comparison_result['file2_paragraphs']}\n\n") # 差异摘要 if stats['total_differences'] == 0: f.write("## 结论\n\n") f.write("🎉 **完美匹配!没有发现任何差异。**\n\n") else: f.write("## 差异摘要\n\n") # 按类型分组显示差异 diff_by_type = {} for diff in comparison_result['differences']: diff_type = diff['type'] if diff_type not in diff_by_type: diff_by_type[diff_type] = [] diff_by_type[diff_type].append(diff) for diff_type, diffs in diff_by_type.items(): type_name = { 'table_amount': '💰 表格金额差异', 'table_text': '📝 表格文本差异', 'paragraph': '📄 段落差异', 'table_structure': '🏗️ 表格结构差异' }.get(diff_type, f'❓ {diff_type}') f.write(f"### {type_name} ({len(diffs)}个)\n\n") for i, diff in enumerate(diffs, 1): f.write(f"**{i}. {diff['position']}**\n") f.write(f"- 文件1: `{diff['file1_value']}`\n") f.write(f"- 文件2: `{diff['file2_value']}`\n") f.write(f"- 说明: {diff['description']}\n\n") # 详细差异列表 if comparison_result['differences']: f.write("## 详细差异列表\n\n") f.write("| 序号 | 类型 | 位置 | 文件1内容 | 文件2内容 | 描述 |\n") f.write("| --- | --- | --- | --- | --- | --- |\n") for i, diff in enumerate(comparison_result['differences'], 1): f.write(f"| {i} | {diff['type']} | {diff['position']} | ") f.write(f"`{diff['file1_value'][:50]}{'...' if len(diff['file1_value']) > 50 else ''}` | ") f.write(f"`{diff['file2_value'][:50]}{'...' if len(diff['file2_value']) > 50 else ''}` | ") f.write(f"{diff['description']} |\n") def compare_ocr_results(file1_path: str, file2_path: str, output_file: str = "comparison_report", output_format: str = "markdown", ignore_images: bool = True): """ 比较两个OCR结果文件 Args: file1_path: 第一个OCR结果文件路径 file2_path: 第二个OCR结果文件路径 output_file: 输出文件名(不含扩展名),默认为"comparison_report" output_format: 输出格式,选项: 'json', 'markdown', 'both',默认为'markdown' ignore_images: 是否忽略图片内容,默认为True Returns: Dict: 比较结果字典 """ comparator = OCRResultComparator() print("🔍 开始对比OCR结果...") print(f"📄 文件1: {file1_path}") print(f"📄 文件2: {file2_path}") print(f"📁 输出格式: {output_format}") print(f"🖼️ 图片处理: {'忽略' if ignore_images else '对比'}") try: # 执行比较 result = comparator.compare_files(file1_path, file2_path) # 添加时间戳 import datetime result['timestamp'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') # 生成报告 if output_format in ['json', 'both']: json_file = f"{output_file}.json" comparator.generate_json_report(result, json_file) print(f"📄 JSON报告已保存至: {json_file}") if output_format in ['markdown', 'both']: md_file = f"{output_file}.md" comparator.generate_markdown_report(result, md_file) print(f"📝 Markdown报告已保存至: {md_file}") # 打印简要结果 print(f"\n📊 对比完成!") print(f" 总差异数: {result['statistics']['total_differences']}") print(f" 表格差异: {result['statistics']['table_differences']}") print(f" 金额差异: {result['statistics']['amount_differences']}") print(f" 段落差异: {result['statistics']['paragraph_differences']}") # 打印前几个重要差异 if result['differences']: print(f"\n🔍 前3个重要差异:") for i, diff in enumerate(result['differences'][:3], 1): print(f" {i}. {diff['position']}: {diff['description']}") print(f" 文件1: '{diff['file1_value'][:50]}{'...' if len(diff['file1_value']) > 50 else ''}'") print(f" 文件2: '{diff['file2_value'][:50]}{'...' if len(diff['file2_value']) > 50 else ''}'") else: print(f"\n🎉 恭喜!两个文件内容完全一致!") # 添加处理统计信息(模仿 ocr_by_vlm.py 的风格) print("\n📊 对比处理统计") print(f" 文件1路径: {result['file1_path']}") print(f" 文件2路径: {result['file2_path']}") print(f" 输出文件: {output_file}") print(f" 输出格式: {output_format}") print(f" 忽略图片: {ignore_images}") print(f" 处理时间: {result['timestamp']}") print(f" 文件1表格数: {result['file1_tables']}") print(f" 文件2表格数: {result['file2_tables']}") print(f" 文件1段落数: {result['file1_paragraphs']}") print(f" 文件2段落数: {result['file2_paragraphs']}") return result except Exception as e: import traceback traceback.print_exc() raise Exception(f"OCR对比任务失败: {e}") if __name__ == "__main__": parser = argparse.ArgumentParser(description='OCR结果对比工具') parser.add_argument('file1', nargs= '?', help='第一个OCR结果文件路径') parser.add_argument('file2', nargs= '?', help='第二个OCR结果文件路径') parser.add_argument('-o', '--output', default='comparison_report', help='输出文件名(不含扩展名)') parser.add_argument('-f', '--format', choices=['json', 'markdown', 'both'], default='markdown', help='输出格式: json, markdown, 或 both') parser.add_argument('--ignore-images', action='store_true', help='忽略图片内容(默认已启用)') args = parser.parse_args() if args.file1 and args.file2: result = compare_ocr_results( file1_path=args.file1, file2_path=args.file2, output_file=args.output, output_format=args.format, ignore_images=args.ignore_images ) else: # 如果sys.argv没有被传入参数,则提供默认参数用于测试 result = compare_ocr_results( file1_path='/Users/zhch158/workspace/data/至远彩色印刷工业有限公司/data_DotsOCR_Results/2023年度报告母公司_page_001.md', file2_path='./output/pre_validation/2023年度报告母公司_page_001.md', # output_file=f'./output/comparison_result_{time.strftime("%Y%m%d_%H%M%S")}', output_file=f'./output/pre_validation/2023年度报告母公司_page_001_comparison_result', output_format='both', ignore_images=True ) print("\n🎉 OCR对比完成!")