import sys
import time
import re
import difflib
import json
import argparse
from typing import Dict, List, Tuple

import markdown
from bs4 import BeautifulSoup
from fuzzywuzzy import fuzz


class OCRResultComparator:
    """Compare two OCR-result Markdown files, diffing their tables and paragraphs.

    Supports a 'standard' cell-by-cell table diff and a 'flow_list' mode that
    detects the header row, infers column types, and compares rows field by field.
    Image references are detected and excluded from comparison.
    """

    def __init__(self):
        self.differences = []
        self.similarity_threshold = 95          # % similarity below which text counts as different
        self.max_paragraph_window = 6           # max number of adjacent paragraphs merged per match attempt
        self.table_comparison_mode = 'standard' # table comparison mode: 'standard' or 'flow_list'
        self.header_similarity_threshold = 80   # % similarity required for table headers to match

    def normalize_text(self, text: str) -> str:
        """Normalize text: collapse runs of whitespace and strip spaces around CJK punctuation."""
        if not text:
            return ""
        # Collapse all whitespace (spaces, tabs, newlines) to single spaces.
        text = re.sub(r'\s+', ' ', text.strip())
        # Remove spaces surrounding full-width punctuation.
        text = re.sub(r'\s*([,。:;!?、])\s*', r'\1', text)
        return text

    def is_image_reference(self, text: str) -> bool:
        """Return True if *text* looks like an image reference or description."""
        image_keywords = [
            '图', '图片', '图像', 'image', 'figure', 'fig',
            '照片', '截图', '示意图', '流程图', '结构图'
        ]
        # Keyword-based detection (case-insensitive for the Latin keywords).
        for keyword in image_keywords:
            if keyword in text.lower():
                return True
        # Markdown image syntax: ![alt](url)
        if re.search(r'!\[.*?\]\(.*?\)', text):
            return True
        # HTML <img> tag.
        # FIX: the original pattern had its angle-bracket/tag portion stripped
        # (r']*>') and matched arbitrary text; restored to match an <img> tag.
        if re.search(r'<img[^>]*>', text, re.IGNORECASE):
            return True
        return False

    def extract_table_data(self, md_content: str) -> List[List[List[str]]]:
        """Extract HTML tables embedded in the Markdown as nested lists of cell strings."""
        tables = []
        soup = BeautifulSoup(md_content, 'html.parser')
        html_tables = soup.find_all('table')
        for table in html_tables:
            table_data = []
            rows = table.find_all('tr')
            for row in rows:
                cells = row.find_all(['td', 'th'])
                row_data = []
                for cell in cells:
                    cell_text = self.normalize_text(cell.get_text())
                    # Replace image content with a sentinel so cell positions are kept
                    # but the content is excluded from comparison later.
                    if not self.is_image_reference(cell_text):
                        row_data.append(cell_text)
                    else:
                        row_data.append("[图片内容-忽略]")
                if row_data:  # keep only non-empty rows
                    table_data.append(row_data)
            if table_data:
                tables.append(table_data)
        return tables

    def merge_split_paragraphs(self, lines: List[str]) -> List[str]:
        """Merge consecutive non-empty lines into paragraphs, dropping image lines.

        Title lines (numbered headings or Markdown '#' headings) terminate the
        current paragraph and are emitted as standalone entries.
        """
        merged_lines = []
        current_paragraph = ""
        for i, line in enumerate(lines):
            # Blank line ends the current paragraph.
            if not line:
                if current_paragraph:
                    merged_lines.append(current_paragraph)
                    current_paragraph = ""
                continue
            # Skip image content entirely.
            if self.is_image_reference(line):
                continue
            # Heading detection: Chinese ordinals, "1."-style numbering, or Markdown '#'.
            is_title = (
                line.startswith(('一、', '二、', '三、', '四、', '五、',
                                 '六、', '七、', '八、', '九、', '十、')) or
                line.startswith(('1.', '2.', '3.', '4.', '5.',
                                 '6.', '7.', '8.', '9.')) or
                line.startswith('#')
            )
            if is_title:
                # Flush the paragraph in progress, then emit the title on its own.
                if current_paragraph:
                    merged_lines.append(current_paragraph)
                    current_paragraph = ""
                merged_lines.append(line)
            else:
                # Continue the paragraph unless it ends in trailing whitespace.
                if current_paragraph and not current_paragraph.endswith((' ', '\t')):
                    current_paragraph += line
                else:
                    current_paragraph = line
        # Flush the final paragraph.
        if current_paragraph:
            merged_lines.append(current_paragraph)
        return merged_lines

    def extract_paragraphs(self, md_content: str) -> List[str]:
        """Extract normalized paragraph text, excluding tables, HTML tags and comments."""
        # Remove HTML tables wholesale before paragraph extraction.
        # FIX: the original pattern was garbled (r']*>.*?'); restored to match
        # a full <table>...</table> element.
        content = re.sub(r'<table[^>]*>.*?</table>', '', md_content,
                         flags=re.DOTALL | re.IGNORECASE)
        # Strip remaining HTML tags.
        content = re.sub(r'<[^>]+>', '', content)
        # Strip HTML/Markdown comments.
        # FIX: the original pattern was empty (r''); restored to <!-- ... -->.
        content = re.sub(r'<!--.*?-->', '', content, flags=re.DOTALL)

        paragraphs = []
        lines = content.split('\n')
        merged_lines = self.merge_split_paragraphs(lines)
        for line in merged_lines:
            normalized = self.normalize_text(line)
            if normalized:
                paragraphs.append(normalized)
            else:
                print(f"跳过的内容无效或图片段落: {line[0:30] if line else ''}...")
        return paragraphs

    def compare_tables(self, table1: List[List[str]], table2: List[List[str]]) -> List[Dict]:
        """Cell-by-cell comparison of two tables; returns a list of difference records."""
        differences = []
        max_rows = max(len(table1), len(table2))
        for i in range(max_rows):
            row1 = table1[i] if i < len(table1) else []
            row2 = table2[i] if i < len(table2) else []
            max_cols = max(len(row1), len(row2))
            for j in range(max_cols):
                cell1 = row1[j] if j < len(row1) else ""
                cell2 = row2[j] if j < len(row2) else ""
                # Skip image placeholder cells.
                if "[图片内容-忽略]" in cell1 or "[图片内容-忽略]" in cell2:
                    continue
                if cell1 != cell2:
                    # Numeric cells are compared as amounts with a small tolerance.
                    if self.is_numeric(cell1) and self.is_numeric(cell2):
                        num1 = self.parse_number(cell1)
                        num2 = self.parse_number(cell2)
                        if abs(num1 - num2) > 0.001:  # allow float rounding error
                            differences.append({
                                'type': 'table_amount',
                                'position': f'行{i+1}列{j+1}',
                                'file1_value': cell1,
                                'file2_value': cell2,
                                'description': f'金额不一致: {cell1} vs {cell2}',
                                'row_index': i,
                                'col_index': j
                            })
                    else:
                        differences.append({
                            'type': 'table_text',
                            'position': f'行{i+1}列{j+1}',
                            'file1_value': cell1,
                            'file2_value': cell2,
                            'description': f'文本不一致: {cell1} vs {cell2}',
                            'row_index': i,
                            'col_index': j
                        })
        return differences

    def is_numeric(self, text: str) -> bool:
        """Return True if *text* parses as a number after removing separators/sign."""
        if not text:
            return False
        # Strip thousands separators (ASCII and full-width commas) and minus signs.
        clean_text = re.sub(r'[,,-]', '', text)
        try:
            float(clean_text)
            return True
        except ValueError:
            return False

    def parse_number(self, text: str) -> float:
        """Parse a number, ignoring thousands separators; returns 0.0 on failure."""
        if not text:
            return 0.0
        clean_text = re.sub(r'[,,]', '', text)
        try:
            return float(clean_text)
        except ValueError:
            return 0.0

    def calculate_text_similarity(self, text1: str, text2: str) -> float:
        """Similarity in [0, 100]; both-empty is 100, one-empty is 0, else fuzz.ratio."""
        if not text1 and not text2:
            return 100.0
        if not text1 or not text2:
            return 0.0
        if text1 == text2:
            return 100.0
        # Only fuzz.ratio is used; partial/token variants can be selected globally
        # via compare_ocr_results(similarity_algorithm=...).
        return float(fuzz.ratio(text1, text2))

    def compare_paragraphs_with_flexible_matching(self, paras1: List[str],
                                                  paras2: List[str]) -> List[Dict]:
        """Paragraph matching that tolerates paragraph splitting/merging between files.

        Sliding windows of up to max_paragraph_window paragraphs from file 1 are
        matched against windows in file 2; file 2's search start advances past
        each confirmed match. Unmatched paragraphs are reported per file.
        """
        differences = []
        meaningful_paras1 = paras1
        meaningful_paras2 = paras2
        used_paras1 = set()
        used_paras2 = set()
        best_match = {'similarity': 0.0}

        # Both files are scanned forward; on a match, file 2's window restarts
        # just after the matched span.
        paras2_idx = 0
        for window_size1 in range(1, min(self.max_paragraph_window, len(meaningful_paras1) + 1)):
            for i in range(len(meaningful_paras1) - window_size1 + 1):
                if any(idx in used_paras1 for idx in range(i, i + window_size1)):
                    continue
                # Merge file 1's window into one candidate paragraph.
                combined_para1 = "".join(meaningful_paras1[i:i + window_size1])
                # Search for the best match within file 2's look-ahead window.
                best_match = self._find_best_match_in_paras2_improved(
                    combined_para1,
                    meaningful_paras2[paras2_idx:
                                      min(paras2_idx + self.max_paragraph_window,
                                          len(meaningful_paras2))],
                    paras2_idx
                )
                if best_match and best_match['similarity'] >= self.similarity_threshold:
                    paras2_idx = best_match['indices'][-1] + 1  # advance file 2's start
                    for idx in range(i, i + window_size1):
                        used_paras1.add(idx)
                    for idx in best_match['indices']:
                        used_paras2.add(idx)
                    # Record only non-perfect matches (below 95%).
                    if best_match['similarity'] < 95.0:
                        severity = 'low' if best_match['similarity'] >= 90 else 'medium'
                        differences.append({
                            'type': 'paragraph',
                            'position': f'段落{i+1}' + (f'-{i+window_size1}' if window_size1 > 1 else ''),
                            'file1_value': combined_para1,
                            'file2_value': best_match['text'],
                            'description': f'段落格式差异 (相似度: {best_match["similarity"]:.1f}%)',
                            'similarity': best_match['similarity'],
                            'severity': severity
                        })
                if paras2_idx >= len(meaningful_paras2):
                    break  # file 2 fully consumed

        # Paragraphs present only in file 1.
        for i, para in enumerate(meaningful_paras1):
            if i not in used_paras1:
                differences.append({
                    'type': 'paragraph',
                    'position': f'段落{i+1}',
                    'file1_value': para,
                    'file2_value': "",
                    'description': '文件1中独有的段落',
                    'similarity': 0.0,
                    'severity': 'medium'
                })
        # Paragraphs present only in file 2.
        for j, para in enumerate(meaningful_paras2):
            if j not in used_paras2:
                differences.append({
                    'type': 'paragraph',
                    'position': f'段落{j+1}',
                    'file1_value': "",
                    'file2_value': para,
                    'description': '文件2中独有的段落',
                    'similarity': 0.0,
                    'severity': 'medium'
                })
        return differences

    def _find_best_match_in_paras2_improved(self, target_text: str, paras2: List[str],
                                            paras2_idx: int) -> Dict:
        """Find the window of *paras2* (joined) most similar to *target_text*.

        Returns a dict with 'text', 'similarity' and absolute 'indices'
        (offset by *paras2_idx*); empty-match dict if *paras2* is empty.
        """
        best_match = None
        for window_size in range(1, len(paras2) + 1):
            for j in range(len(paras2) - window_size + 1):
                combined_para2 = "".join(paras2[j:j + window_size])
                similarity = self.calculate_text_similarity(target_text, combined_para2)
                if best_match and best_match['similarity'] == 100.0:
                    break  # perfect match already found
                if not best_match or similarity > best_match['similarity']:
                    best_match = {
                        'text': combined_para2,
                        'similarity': similarity,
                        'indices': list(range(j + paras2_idx, j + paras2_idx + window_size))
                    }
            if best_match and best_match['similarity'] == 100.0:
                break  # perfect match, stop enlarging the window
        if best_match is None:
            return {'text': '', 'similarity': 0.0, 'indices': []}
        return best_match

    def detect_column_type(self, column_values: List[str]) -> str:
        """Classify a column as 'datetime', 'numeric' or 'text' from sample values."""
        if not column_values:
            return 'text'
        non_empty_values = [v for v in column_values if v and v.strip()]
        if not non_empty_values:
            return 'text'
        # Datetime detection over the first five values.
        datetime_patterns = [
            r'\d{4}[-/]\d{1,2}[-/]\d{1,2}',                              # YYYY-MM-DD
            r'\d{4}[-/]\d{1,2}[-/]\d{1,2}\s*\d{1,2}:\d{1,2}:\d{1,2}',    # YYYY-MM-DD HH:MM:SS
            r'\d{4}年\d{1,2}月\d{1,2}日',                                 # Chinese date
        ]
        datetime_count = 0
        for value in non_empty_values[:5]:
            for pattern in datetime_patterns:
                if re.search(pattern, value):
                    datetime_count += 1
                    break
        if datetime_count >= len(non_empty_values[:5]) * 0.6:
            return 'datetime'
        # Numeric detection over the first five values.
        numeric_count = 0
        for value in non_empty_values[:5]:
            if self.is_numeric(value):
                numeric_count += 1
        if numeric_count >= len(non_empty_values[:5]) * 0.6:
            return 'numeric'
        return 'text'

    def normalize_header_text(self, text: str) -> str:
        """Normalize a header cell: drop parenthesized parts, spaces and symbols."""
        # Remove parentheses (ASCII or full-width) and their content.
        text = re.sub(r'[((].*?[))]', '', text)
        # Remove all whitespace.
        text = re.sub(r'\s+', '', text)
        # Keep only word characters and CJK ideographs.
        text = re.sub(r'[^\w\u4e00-\u9fff]', '', text)
        return text.lower().strip()

    def compare_table_headers(self, headers1: List[str], headers2: List[str]) -> Dict:
        """Compare two header rows column by column using fuzzy similarity."""
        result = {
            'match': True,
            'differences': [],
            'column_mapping': {},      # index mapping between matched columns
            'similarity_scores': []
        }
        # A column-count mismatch is immediately critical.
        if len(headers1) != len(headers2):
            result['match'] = False
            result['differences'].append({
                'type': 'header_count',
                'description': f'表头列数不一致: {len(headers1)} vs {len(headers2)}',
                'severity': 'critical'
            })
            return result
        for i, (h1, h2) in enumerate(zip(headers1, headers2)):
            norm_h1 = self.normalize_header_text(h1)
            norm_h2 = self.normalize_header_text(h2)
            similarity = self.calculate_text_similarity(norm_h1, norm_h2)
            result['similarity_scores'].append({
                'column_index': i,
                'header1': h1,
                'header2': h2,
                'similarity': similarity
            })
            if similarity < self.header_similarity_threshold:
                result['match'] = False
                result['differences'].append({
                    'type': 'header_mismatch',
                    'column_index': i,
                    'header1': h1,
                    'header2': h2,
                    'similarity': similarity,
                    'description': f'第{i+1}列表头不匹配: "{h1}" vs "{h2}" (相似度: {similarity:.1f}%)',
                    'severity': 'critical'
                })
            else:
                result['column_mapping'][i] = i  # record the column mapping
        return result

    def compare_cell_value(self, value1: str, value2: str, column_type: str,
                           column_name: str = '') -> Dict:
        """Compare a single cell pair according to the column's detected type.

        Returns {'match': bool, 'difference': dict | None}; difference types are
        unified as table_amount / table_datetime / table_text.
        """
        result = {'match': True, 'difference': None}
        v1 = self.normalize_text(value1)
        v2 = self.normalize_text(value2)
        if v1 == v2:
            return result
        if column_type == 'numeric':
            if self.is_numeric(v1) and self.is_numeric(v2):
                num1 = self.parse_number(v1)
                num2 = self.parse_number(v2)
                if abs(num1 - num2) > 0.01:  # tolerate 0.01 rounding difference
                    result['match'] = False
                    result['difference'] = {
                        'type': 'table_amount',
                        'value1': value1,
                        'value2': value2,
                        'diff_amount': abs(num1 - num2),
                        'description': f'金额不一致: {value1} vs {value2}'
                    }
            else:
                # Non-parseable content in a numeric column also counts as an
                # amount-level difference.
                result['match'] = False
                result['difference'] = {
                    'type': 'table_amount',
                    'value1': value1,
                    'value2': value2,
                    'description': f'数字格式错误: {value1} vs {value2}'
                }
        elif column_type == 'datetime':
            datetime1 = self.extract_datetime(v1)
            datetime2 = self.extract_datetime(v2)
            if datetime1 != datetime2:
                result['match'] = False
                result['difference'] = {
                    'type': 'table_datetime',
                    'value1': value1,
                    'value2': value2,
                    'description': f'日期时间不一致: {value1} vs {value2}'
                }
        else:
            # Free-text column: fuzzy comparison against the similarity threshold.
            similarity = self.calculate_text_similarity(v1, v2)
            if similarity < self.similarity_threshold:
                result['match'] = False
                result['difference'] = {
                    'type': 'table_text',
                    'value1': value1,
                    'value2': value2,
                    'similarity': similarity,
                    'description': f'文本不一致: {value1} vs {value2} (相似度: {similarity:.1f}%)'
                }
        return result

    def extract_datetime(self, text: str) -> str:
        """Extract and canonicalize the first date(/time) found; fall back to *text*."""
        patterns = [
            (r'(\d{4})[-/](\d{1,2})[-/](\d{1,2})\s*(\d{1,2}):(\d{1,2}):(\d{1,2})',
             lambda m: f"{m.group(1)}-{m.group(2).zfill(2)}-{m.group(3).zfill(2)} "
                       f"{m.group(4).zfill(2)}:{m.group(5).zfill(2)}:{m.group(6).zfill(2)}"),
            (r'(\d{4})[-/](\d{1,2})[-/](\d{1,2})',
             lambda m: f"{m.group(1)}-{m.group(2).zfill(2)}-{m.group(3).zfill(2)}"),
            (r'(\d{4})年(\d{1,2})月(\d{1,2})日',
             lambda m: f"{m.group(1)}-{m.group(2).zfill(2)}-{m.group(3).zfill(2)}"),
        ]
        for pattern, formatter in patterns:
            match = re.search(pattern, text)
            if match:
                return formatter(match)
        return text

    def detect_table_header_row(self, table: List[List[str]]) -> int:
        """Heuristically locate the header row of *table*.

        A row qualifies when >= 40% of its cells (and at least 2) contain a
        known header keyword AND the following row looks like data. Falls back
        to row 0 when no candidate is found.
        """
        header_keywords = [
            # generic headers
            '序号', '编号', '时间', '日期', '名称', '类型', '金额', '数量',
            '单价', '备注', '说明', '状态', '类别', '方式', '账号', '单号', '订单',
            # transaction-flow specific
            '交易单号', '交易时间', '交易类型', '收/支', '支出', '收入',
            '交易方式', '交易对方', '商户单号', '付款方式', '收款方',
            # English headers
            'no', 'id', 'time', 'date', 'name', 'type', 'amount', 'status'
        ]
        for row_idx, row in enumerate(table):
            if not row:
                continue
            keyword_count = 0
            for cell in row:
                cell_lower = cell.lower().strip()
                for keyword in header_keywords:
                    if keyword in cell_lower:
                        keyword_count += 1
                        break
            if keyword_count >= len(row) * 0.4 and keyword_count >= 2:
                # Confirm by checking that the next row looks like data.
                if row_idx + 1 < len(table):
                    next_row = table[row_idx + 1]
                    if self.is_data_row(next_row):
                        print(f" 📍 检测到表头在第 {row_idx + 1} 行")
                        return row_idx
        print(f" ⚠️ 未检测到明确表头,默认使用第1行")
        return 0

    def is_data_row(self, row: List[str]) -> bool:
        """Return True when at least half of the row's cells carry digits/dates."""
        data_pattern_count = 0
        for cell in row:
            if not cell:
                continue
            if re.search(r'\d', cell):
                data_pattern_count += 1
            if re.search(r'\d{4}[-/年]\d{1,2}[-/月]\d{1,2}', cell):
                data_pattern_count += 1
        return data_pattern_count >= len(row) * 0.5

    def compare_table_flow_list(self, table1: List[List[str]],
                                table2: List[List[str]]) -> List[Dict]:
        """Flow-list table comparison supporting headers not on the first row.

        Steps: detect header positions, diff any pre-header cells, compare
        headers (critical on mismatch), infer column types, then compare data
        rows cell by cell.
        """
        differences = []
        if not table1 or not table2:
            return [{
                'type': 'table_empty',
                'description': '表格为空',
                'severity': 'critical'
            }]

        print(f"\n📋 开始流水表格对比...")

        # Step 1: detect where each table's header row sits.
        header_row_idx1 = self.detect_table_header_row(table1)
        header_row_idx2 = self.detect_table_header_row(table2)
        if header_row_idx1 != header_row_idx2:
            differences.append({
                'type': 'table_header_position',
                'position': '表头位置',
                'file1_value': f'第{header_row_idx1 + 1}行',
                'file2_value': f'第{header_row_idx2 + 1}行',
                'description': f'表头位置不一致: 文件1在第{header_row_idx1 + 1}行,文件2在第{header_row_idx2 + 1}行',
                'severity': 'high'
            })

        # Step 2: diff any content that precedes the header (cell by cell).
        if header_row_idx1 > 0 or header_row_idx2 > 0:
            print(f"\n📝 对比表头前的内容...")
            pre_header_table1 = table1[:header_row_idx1] if header_row_idx1 > 0 else []
            pre_header_table2 = table2[:header_row_idx2] if header_row_idx2 > 0 else []
            if pre_header_table1 or pre_header_table2:
                # Reuse the standard table diff, then retag the results.
                pre_header_diffs = self.compare_tables(pre_header_table1, pre_header_table2)
                for diff in pre_header_diffs:
                    diff['type'] = 'table_pre_header'
                    diff['position'] = f"表头前{diff['position']}"
                    diff['severity'] = 'medium'
                    print(f" ⚠️ {diff['position']}: {diff['description']}")
                differences.extend(pre_header_diffs)

        # Step 3: compare the header rows.
        headers1 = table1[header_row_idx1]
        headers2 = table2[header_row_idx2]
        print(f"\n📋 对比表头...")
        print(f" 文件1表头 (第{header_row_idx1 + 1}行): {headers1}")
        print(f" 文件2表头 (第{header_row_idx2 + 1}行): {headers2}")
        header_result = self.compare_table_headers(headers1, headers2)
        if not header_result['match']:
            print(f"\n❌ 表头不匹配,严重错误!")
            for diff in header_result['differences']:
                print(f" - {diff['description']}")
                # FIX: record one critical difference per header mismatch;
                # the original appended only once after the loop, keeping
                # just the last mismatch's description.
                differences.append({
                    'type': 'table_header_critical',
                    'position': '表头',
                    'file1_value': ', '.join(headers1),
                    'file2_value': ', '.join(headers2),
                    'description': diff['description'],
                    'severity': 'critical'
                })
            return differences
        print(f"✅ 表头匹配成功")

        # Step 4: infer each column's type from file 1's data rows.
        column_types = []
        for col_idx in range(len(headers1)):
            col_values1 = [
                row[col_idx]
                for row in table1[header_row_idx1 + 1:]
                if col_idx < len(row)
            ]
            col_type = self.detect_column_type(col_values1)
            column_types.append(col_type)
            print(f" 列 {col_idx + 1} ({headers1[col_idx]}): {col_type}")

        # Step 5: compare data rows one by one.
        data_rows1 = table1[header_row_idx1 + 1:]
        data_rows2 = table2[header_row_idx2 + 1:]
        max_rows = max(len(data_rows1), len(data_rows2))
        print(f"\n📊 开始逐行对比数据 (共{max_rows}行)...")
        for row_idx in range(max_rows):
            row1 = data_rows1[row_idx] if row_idx < len(data_rows1) else []
            row2 = data_rows2[row_idx] if row_idx < len(data_rows2) else []
            # Absolute row number (header offset + 1-based).
            actual_row_num = header_row_idx1 + row_idx + 2
            if not row1:
                differences.append({
                    'type': 'table_row_missing',
                    'position': f'第{actual_row_num}行',
                    'file1_value': '',
                    'file2_value': ', '.join(row2),
                    'description': f'文件1缺少第{actual_row_num}行',
                    'severity': 'high',
                    'row_index': actual_row_num
                })
                continue
            if not row2:
                differences.append({
                    'type': 'table_row_missing',
                    'position': f'第{actual_row_num}行',
                    'file1_value': ', '.join(row1),
                    'file2_value': '',
                    'description': f'文件2缺少第{actual_row_num}行',
                    'severity': 'high',
                    'row_index': actual_row_num
                })
                continue
            # Per-cell comparison; each differing cell is reported independently.
            max_cols = max(len(row1), len(row2))
            for col_idx in range(max_cols):
                cell1 = row1[col_idx] if col_idx < len(row1) else ''
                cell2 = row2[col_idx] if col_idx < len(row2) else ''
                # Skip image placeholder cells.
                if "[图片内容-忽略]" in cell1 or "[图片内容-忽略]" in cell2:
                    continue
                column_type = column_types[col_idx] if col_idx < len(column_types) else 'text'
                column_name = headers1[col_idx] if col_idx < len(headers1) else f'列{col_idx + 1}'
                compare_result = self.compare_cell_value(cell1, cell2, column_type, column_name)
                if not compare_result['match']:
                    diff_info = compare_result['difference']
                    differences.append({
                        'type': diff_info['type'],  # table_amount / table_datetime / table_text
                        'position': f'第{actual_row_num}行第{col_idx + 1}列',
                        'file1_value': diff_info['value1'],
                        'file2_value': diff_info['value2'],
                        'description': diff_info['description'],
                        'severity': 'medium',
                        'row_index': actual_row_num,
                        'col_index': col_idx,
                        'column_name': column_name,
                        'column_type': column_type,
                        # Carry through any extra keys (e.g. diff_amount, similarity).
                        **{k: v for k, v in diff_info.items()
                           if k not in ['type', 'value1', 'value2', 'description']}
                    })
                    print(f" ⚠️ 第{actual_row_num}行第{col_idx + 1}列({column_name}): {diff_info['description']}")

        print(f"\n✅ 流水表格对比完成,发现 {len(differences)} 个差异")
        return differences

    def compare_tables_with_mode(self, table1: List[List[str]], table2: List[List[str]],
                                 mode: str = 'standard') -> List[Dict]:
        """Dispatch to the table-comparison algorithm selected by *mode*."""
        if mode == 'flow_list':
            return self.compare_table_flow_list(table1, table2)
        return self.compare_tables(table1, table2)

    def compare_files(self, file1_path: str, file2_path: str) -> Dict:
        """Compare two Markdown files end to end; returns diffs plus statistics."""
        with open(file1_path, 'r', encoding='utf-8') as f:
            content1 = f.read()
        with open(file2_path, 'r', encoding='utf-8') as f:
            content2 = f.read()

        tables1 = self.extract_table_data(content1)
        tables2 = self.extract_table_data(content2)
        paras1 = self.extract_paragraphs(content1)
        paras2 = self.extract_paragraphs(content2)

        all_differences = []

        # Table comparison (NOTE: only the first table of each file is compared).
        if tables1 and tables2:
            table_diffs = self.compare_tables_with_mode(
                tables1[0], tables2[0], mode=self.table_comparison_mode
            )
            all_differences.extend(table_diffs)
        elif tables1 and not tables2:
            all_differences.append({
                'type': 'table_structure',
                'position': '表格结构',
                'file1_value': f'包含{len(tables1)}个表格',
                'file2_value': '无表格',
                'description': '文件1包含表格但文件2无表格',
                'severity': 'high'
            })
        elif not tables1 and tables2:
            all_differences.append({
                'type': 'table_structure',
                'position': '表格结构',
                'file1_value': '无表格',
                'file2_value': f'包含{len(tables2)}个表格',
                'description': '文件2包含表格但文件1无表格',
                'severity': 'high'
            })

        # Paragraph comparison with flexible window matching.
        para_diffs = self.compare_paragraphs_with_flexible_matching(paras1, paras2)
        all_differences.extend(para_diffs)

        # Fine-grained statistics by difference type and severity.
        stats = {
            'total_differences': len(all_differences),
            'table_differences': len([d for d in all_differences if d['type'].startswith('table')]),
            'paragraph_differences': len([d for d in all_differences if d['type'] == 'paragraph']),
            'amount_differences': len([d for d in all_differences if d['type'] == 'table_amount']),
            'datetime_differences': len([d for d in all_differences if d['type'] == 'table_datetime']),
            'text_differences': len([d for d in all_differences if d['type'] == 'table_text']),
            'table_pre_header': len([d for d in all_differences if d['type'] == 'table_pre_header']),
            'table_header_critical': len([d for d in all_differences if d['type'] == 'table_header_critical']),
            'table_header_position': len([d for d in all_differences if d['type'] == 'table_header_position']),
            'table_row_missing': len([d for d in all_differences if d['type'] == 'table_row_missing']),
            'high_severity': len([d for d in all_differences
                                  if d.get('severity') == 'critical' or d.get('severity') == 'high']),
            'medium_severity': len([d for d in all_differences if d.get('severity') == 'medium']),
            'low_severity': len([d for d in all_differences if d.get('severity') == 'low'])
        }

        return {
            'differences': all_differences,
            'statistics': stats,
            'file1_tables': len(tables1),
            'file2_tables': len(tables2),
            'file1_paragraphs': len(paras1),
            'file2_paragraphs': len(paras2),
            'file1_path': file1_path,
            'file2_path': file2_path,
        }

    def generate_json_report(self, comparison_result: Dict, output_file: str):
        """Dump the full comparison result as UTF-8 JSON."""
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(comparison_result, f, ensure_ascii=False, indent=2)

    def generate_markdown_report(self, comparison_result: Dict, output_file: str):
        """Write a human-readable Markdown report of the comparison result."""
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write("# OCR结果对比报告\n\n")

            # Basic info.
            f.write("## 基本信息\n\n")
            f.write(f"- **文件1**: `{comparison_result['file1_path']}`\n")
            f.write(f"- **文件2**: `{comparison_result['file2_path']}`\n")
            f.write(f"- **比较时间**: {comparison_result.get('timestamp', 'N/A')}\n\n")

            # Statistics.
            stats = comparison_result['statistics']
            f.write("## 统计信息\n\n")
            f.write(f"- 总差异数量: **{stats['total_differences']}**\n")
            f.write(f"- 表格差异: **{stats['table_differences']}**\n")
            f.write(f"- 其中表格金额差异: **{stats['amount_differences']}**\n")
            f.write(f"- 段落差异: **{stats['paragraph_differences']}**\n")
            f.write(f"- 高严重度: **{stats['high_severity']}**\n")
            f.write(f"- 中严重度: **{stats['medium_severity']}**\n")
            f.write(f"- 低严重度: **{stats['low_severity']}**\n")
            f.write(f"- 文件1表格数: {comparison_result['file1_tables']}\n")
            f.write(f"- 文件2表格数: {comparison_result['file2_tables']}\n")
            f.write(f"- 文件1段落数: {comparison_result['file1_paragraphs']}\n")
            f.write(f"- 文件2段落数: {comparison_result['file2_paragraphs']}\n\n")

            if stats['total_differences'] == 0:
                f.write("## 结论\n\n")
                f.write("🎉 **完美匹配!没有发现任何差异。**\n\n")
            else:
                f.write("## 差异摘要\n\n")
                # Display-name mapping for every known difference type.
                type_name_map = {
                    'table_amount': '💰 表格金额差异',
                    'table_text': '📝 表格文本差异',
                    'table_pre_header': '📋 表头前内容差异',
                    'table_header_position': '📍 表头位置差异',
                    'table_header_critical': '❌ 表头严重错误',
                    'table_row_missing': '🚫 表格行缺失',
                    'table_row_data': '📊 表格数据差异',
                    'table_structure': '🏗️ 表格结构差异',
                    'paragraph': '📄 段落差异'
                }
                # Group differences by type.
                diff_by_type = {}
                for diff in comparison_result['differences']:
                    diff_by_type.setdefault(diff['type'], []).append(diff)
                for diff_type, diffs in diff_by_type.items():
                    type_name = type_name_map.get(diff_type, f'❓ {diff_type}')
                    f.write(f"### {type_name} ({len(diffs)}个)\n\n")
                    for i, diff in enumerate(diffs, 1):
                        # FIX: use .get with defaults — some diffs (e.g. the
                        # 'table_empty' record) lack position/value keys and
                        # previously raised KeyError here.
                        f.write(f"**{i}. {diff.get('position', '')}**\n")
                        f.write(f"- 文件1: `{diff.get('file1_value', '')}`\n")
                        f.write(f"- 文件2: `{diff.get('file2_value', '')}`\n")
                        f.write(f"- 说明: {diff['description']}\n")
                        if 'severity' in diff:
                            severity_icon = {'critical': '🔴', 'high': '🟠',
                                             'medium': '🟡', 'low': '🟢'}
                            f.write(f"- 严重度: {severity_icon.get(diff['severity'], '⚪')} {diff['severity']}\n")
                        f.write("\n")

            # Full difference table.
            if comparison_result['differences']:
                f.write("## 详细差异列表\n\n")
                f.write("| 序号 | 类型 | 位置 | 文件1内容 | 文件2内容 | 描述 | 严重度 |\n")
                f.write("| --- | --- | --- | --- | --- | --- | --- |\n")
                for i, diff in enumerate(comparison_result['differences'], 1):
                    severity = diff.get('severity', 'N/A')
                    # FIX: same .get hardening as above for sparse diff records.
                    value1 = diff.get('file1_value', '')
                    value2 = diff.get('file2_value', '')
                    f.write(f"| {i} | {diff['type']} | {diff.get('position', '')} | ")
                    f.write(f"`{value1[:50]}{'...' if len(value1) > 50 else ''}` | ")
                    f.write(f"`{value2[:50]}{'...' if len(value2) > 50 else ''}` | ")
                    f.write(f"{diff['description']} | {severity} |\n")


def compare_ocr_results(file1_path: str, file2_path: str,
                        output_file: str = "comparison_report",
                        output_format: str = "markdown",
                        ignore_images: bool = True,
                        table_mode: str = 'standard',
                        similarity_algorithm: str = 'ratio'):
    """
    Compare two OCR result files and emit report(s).

    Args:
        file1_path: path of the first OCR result file
        file2_path: path of the second OCR result file
        output_file: output file name (without extension)
        output_format: 'json', 'markdown', or 'both'
        ignore_images: whether image content is ignored (informational only)
        table_mode: table comparison mode ('standard' or 'flow_list')
        similarity_algorithm: 'ratio', 'partial_ratio', 'token_sort_ratio',
            or 'token_set_ratio'

    Returns:
        The comparison result dict (differences, statistics, file info).

    Raises:
        Exception: wraps any underlying failure, chained to the original cause.
    """
    comparator = OCRResultComparator()
    comparator.table_comparison_mode = table_mode

    # Swap in the selected fuzzy algorithm ('ratio' keeps the default method,
    # which also handles empty strings explicitly).
    if similarity_algorithm == 'partial_ratio':
        comparator.calculate_text_similarity = lambda t1, t2: fuzz.partial_ratio(t1, t2)
    elif similarity_algorithm == 'token_sort_ratio':
        comparator.calculate_text_similarity = lambda t1, t2: fuzz.token_sort_ratio(t1, t2)
    elif similarity_algorithm == 'token_set_ratio':
        comparator.calculate_text_similarity = lambda t1, t2: fuzz.token_set_ratio(t1, t2)

    print("🔍 开始对比OCR结果...")
    print(f"📄 文件1: {file1_path}")
    print(f"📄 文件2: {file2_path}")
    print(f"📊 表格模式: {table_mode}")
    print(f"🔧 相似度算法: {similarity_algorithm}")

    try:
        result = comparator.compare_files(file1_path, file2_path)

        # Stamp the result with the comparison time.
        import datetime
        result['timestamp'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        if output_format in ['json', 'both']:
            json_file = f"{output_file}.json"
            comparator.generate_json_report(result, json_file)
            print(f"📄 JSON报告已保存至: {json_file}")
        if output_format in ['markdown', 'both']:
            md_file = f"{output_file}.md"
            comparator.generate_markdown_report(result, md_file)
            print(f"📝 Markdown报告已保存至: {md_file}")

        # Console summary.
        print(f"\n📊 对比完成!")
        print(f" 总差异数: {result['statistics']['total_differences']}")
        print(f" 表格差异: {result['statistics']['table_differences']}")
        print(f" 其中表格金额差异: {result['statistics']['amount_differences']}")
        print(f" 段落差异: {result['statistics']['paragraph_differences']}")

        # Preview the first few notable differences.
        if result['differences']:
            print(f"\n🔍 前3个重要差异:")
            for i, diff in enumerate(result['differences'][:3], 1):
                print(f" {i}. {diff.get('position', '')}: {diff['description']}")
                value1 = diff.get('file1_value', '')
                value2 = diff.get('file2_value', '')
                print(f" 文件1: '{value1[:50]}{'...' if len(value1) > 50 else ''}'")
                print(f" 文件2: '{value2[:50]}{'...' if len(value2) > 50 else ''}'")
        else:
            print(f"\n🎉 恭喜!两个文件内容完全一致!")

        # Processing statistics (mirrors the style of ocr_by_vlm.py).
        print("\n📊 对比处理统计")
        print(f" 文件1路径: {result['file1_path']}")
        print(f" 文件2路径: {result['file2_path']}")
        print(f" 输出文件: {output_file}")
        print(f" 输出格式: {output_format}")
        print(f" 忽略图片: {ignore_images}")
        print(f" 处理时间: {result['timestamp']}")
        print(f" 文件1表格数: {result['file1_tables']}")
        print(f" 文件2表格数: {result['file2_tables']}")
        print(f" 文件1段落数: {result['file1_paragraphs']}")
        print(f" 文件2段落数: {result['file2_paragraphs']}")
        return result
    except Exception as e:
        import traceback
        traceback.print_exc()
        # FIX: chain the original exception so the root cause is preserved.
        raise Exception(f"OCR对比任务失败: {e}") from e


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='OCR结果对比工具')
    parser.add_argument('file1', nargs='?', help='第一个OCR结果文件路径')
    parser.add_argument('file2', nargs='?', help='第二个OCR结果文件路径')
    parser.add_argument('-o', '--output', default='comparison_report', help='输出文件名')
    parser.add_argument('-f', '--format', choices=['json', 'markdown', 'both'],
                        default='markdown', help='输出格式')
    parser.add_argument('--ignore-images', action='store_true', help='忽略图片内容')
    parser.add_argument('--table-mode', choices=['standard', 'flow_list'],
                        default='standard', help='表格比较模式')
    parser.add_argument('--similarity-algorithm',
                        choices=['ratio', 'partial_ratio', 'token_sort_ratio', 'token_set_ratio'],
                        default='ratio', help='相似度算法')
    args = parser.parse_args()

    if args.file1 and args.file2:
        result = compare_ocr_results(
            file1_path=args.file1,
            file2_path=args.file2,
            output_file=args.output,
            output_format=args.format,
            ignore_images=args.ignore_images,
            table_mode=args.table_mode,
            similarity_algorithm=args.similarity_algorithm
        )
    else:
        # Ad-hoc test run against a flow-list table pair.
        result = compare_ocr_results(
            file1_path='/Users/zhch158/workspace/data/流水分析/A用户_单元格扫描流水/data_PPStructureV3_Results/A用户_单元格扫描流水_page_001.md',
            file2_path='/Users/zhch158/workspace/data/流水分析/A用户_单元格扫描流水/mineru-vlm-2.5.3_Results/A用户_单元格扫描流水_page_001.md',
            output_file=f'./output/flow_list_comparison_{time.strftime("%Y%m%d_%H%M%S")}',
            output_format='both',
            ignore_images=True,
            table_mode='flow_list',  # use flow-list table mode
            similarity_algorithm='ratio'
        )