| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137 |
- import sys
- import time
- import re
- import difflib
- import json
- import argparse
- from typing import Dict, List, Tuple
- import markdown
- from bs4 import BeautifulSoup
- from fuzzywuzzy import fuzz
- class OCRResultComparator:
- def __init__(self):
- self.differences = []
- self.similarity_threshold = 95
- self.max_paragraph_window = 6
- self.table_comparison_mode = 'standard' # 新增:表格比较模式
- self.header_similarity_threshold = 80 # 表头相似度阈值
-
- def normalize_text(self, text: str) -> str:
- """标准化文本:去除多余空格、回车等无效字符"""
- if not text:
- return ""
- # 去除多余的空白字符
- text = re.sub(r'\s+', ' ', text.strip())
- # 去除标点符号周围的空格
- text = re.sub(r'\s*([,。:;!?、])\s*', r'\1', text)
- return text
-
- def is_image_reference(self, text: str) -> bool:
- """判断是否为图片引用或描述"""
- image_keywords = [
- '图', '图片', '图像', 'image', 'figure', 'fig',
- '照片', '截图', '示意图', '流程图', '结构图'
- ]
- # 检查是否包含图片相关关键词
- for keyword in image_keywords:
- if keyword in text.lower():
- return True
-
- # 检查是否为Markdown图片语法
- if re.search(r'!\[.*?\]\(.*?\)', text):
- return True
-
- # 检查是否为HTML图片标签
- if re.search(r'<img[^>]*>', text, re.IGNORECASE):
- return True
-
- return False
-
- def extract_table_data(self, md_content: str) -> List[List[List[str]]]:
- """从Markdown中提取表格数据"""
- tables = []
-
- # 使用BeautifulSoup解析HTML表格
- soup = BeautifulSoup(md_content, 'html.parser')
- html_tables = soup.find_all('table')
-
- for table in html_tables:
- table_data = []
- rows = table.find_all('tr')
-
- for row in rows:
- cells = row.find_all(['td', 'th'])
- row_data = []
- for cell in cells:
- cell_text = self.normalize_text(cell.get_text())
- # 跳过图片内容
- if not self.is_image_reference(cell_text):
- row_data.append(cell_text)
- else:
- row_data.append("[图片内容-忽略]")
-
- if row_data: # 只添加非空行
- table_data.append(row_data)
-
- if table_data:
- tables.append(table_data)
-
- return tables
-
- def merge_split_paragraphs(self, lines: List[str]) -> List[str]:
- # 合并连续的非空行作为一个段落,且过滤图片内容
- merged_lines = []
- current_paragraph = ""
- for i, line in enumerate(lines):
- # 跳过空行
- if not line:
- if current_paragraph:
- merged_lines.append(current_paragraph)
- current_paragraph = ""
- continue
- # 跳过图片内容
- if self.is_image_reference(line):
- continue
- # 检查是否是标题(以数字、中文数字或特殊标记开头)
- is_title = (
- line.startswith(('一、', '二、', '三、', '四、', '五、', '六、', '七、', '八、', '九、', '十、')) or
- line.startswith(('1.', '2.', '3.', '4.', '5.', '6.', '7.', '8.', '9.')) or
- line.startswith('#')
- )
- # 如果是标题,结束当前段落
- if is_title:
- if current_paragraph:
- merged_lines.append(current_paragraph)
- current_paragraph = ""
- merged_lines.append(line)
- else:
- # 检查是否应该与前一行合并 # 如果当前段落不为空,且当前段落最后一个字符非空白字符
- if current_paragraph and not current_paragraph.endswith((' ', '\t')):
- current_paragraph += line
- else:
- current_paragraph = line
-
- # 处理最后一个段落
- if current_paragraph:
- merged_lines.append(current_paragraph)
-
- return merged_lines
- def extract_paragraphs(self, md_content: str) -> List[str]:
- """提取段落文本"""
- # 移除表格 - 修复正则表达式
- # 使用 IGNORECASE 和 DOTALL 标志
- content = re.sub(r'<table[^>]*>.*?</table>', '', md_content, flags=re.DOTALL | re.IGNORECASE)
-
- # 移除其他 HTML 标签
- content = re.sub(r'<[^>]+>', '', content)
-
- # 移除 Markdown 注释
- content = re.sub(r'<!--.*?-->', '', content, flags=re.DOTALL)
-
- # 分割段落
- paragraphs = []
- lines = content.split('\n')
- merged_lines = self.merge_split_paragraphs(lines)
-
- for line in merged_lines:
- normalized = self.normalize_text(line)
- if normalized:
- paragraphs.append(normalized)
- else:
- print(f"跳过的内容无效或图片段落: {line[0:30] if line else ''}...")
-
- return paragraphs
-
- def compare_tables(self, table1: List[List[str]], table2: List[List[str]]) -> List[Dict]:
- """比较表格数据"""
- differences = []
-
- # 确定最大行数
- max_rows = max(len(table1), len(table2))
-
- for i in range(max_rows):
- row1 = table1[i] if i < len(table1) else []
- row2 = table2[i] if i < len(table2) else []
-
- # 确定最大列数
- max_cols = max(len(row1), len(row2))
-
- for j in range(max_cols):
- cell1 = row1[j] if j < len(row1) else ""
- cell2 = row2[j] if j < len(row2) else ""
-
- # 跳过图片内容比较
- if "[图片内容-忽略]" in cell1 or "[图片内容-忽略]" in cell2:
- continue
-
- if cell1 != cell2:
- # 特别处理数字金额
- if self.is_numeric(cell1) and self.is_numeric(cell2):
- num1 = self.parse_number(cell1)
- num2 = self.parse_number(cell2)
- if abs(num1 - num2) > 0.001: # 允许小数精度误差
- differences.append({
- 'type': 'table_amount',
- 'position': f'行{i+1}列{j+1}',
- 'file1_value': cell1,
- 'file2_value': cell2,
- 'description': f'金额不一致: {cell1} vs {cell2}',
- 'row_index': i,
- 'col_index': j
- })
- else:
- differences.append({
- 'type': 'table_text',
- 'position': f'行{i+1}列{j+1}',
- 'file1_value': cell1,
- 'file2_value': cell2,
- 'description': f'文本不一致: {cell1} vs {cell2}',
- 'row_index': i,
- 'col_index': j
- })
-
- return differences
-
- def is_numeric(self, text: str) -> bool:
- """判断文本是否为数字"""
- if not text:
- return False
- # 移除千分位分隔符和负号
- clean_text = re.sub(r'[,,-]', '', text)
- try:
- float(clean_text)
- return True
- except ValueError:
- return False
-
- def parse_number(self, text: str) -> float:
- """解析数字"""
- if not text:
- return 0.0
- clean_text = re.sub(r'[,,]', '', text)
- try:
- return float(clean_text)
- except ValueError:
- return 0.0
-
- def calculate_text_similarity(self, text1: str, text2: str) -> float:
- """改进的相似度计算"""
- if not text1 and not text2:
- return 100.0
- if not text1 or not text2:
- return 0.0
-
- # 如果标准化后完全相同,返回100%
- if text1 == text2:
- return 100.0
-
- # 使用多种相似度算法
- similarity_scores = [
- fuzz.ratio(text1, text2),
- # fuzz.partial_ratio(text1, text2),
- # fuzz.token_sort_ratio(text1, text2),
- # fuzz.token_set_ratio(text1, text2)
- ]
-
- # 对于包含关系,给予更高的权重
- # if text1 in text2 or text2 in text1:
- # max_score = max(similarity_scores)
- # # 提升包含关系的相似度
- # return min(100.0, max_score + 10)
-
- return max(similarity_scores)
-
- def compare_paragraphs_with_flexible_matching(self, paras1: List[str], paras2: List[str]) -> List[Dict]:
- """改进的段落匹配算法 - 更好地处理段落重组"""
- differences = []
-
- # 直接调用进行预处理
- meaningful_paras1 = paras1
- meaningful_paras2 = paras2
- # 使用预处理后的段落进行匹配
- used_paras1 = set()
- used_paras2 = set()
-
- best_match = {'similarity': 0.0} # 初始化best_match
- # 文件1和文件2同时向下遍历,当有匹配项时,文件2的窗口从匹配项的下一个位置开始
- paras2_idx = 0
- for window_size1 in range(1, min(self.max_paragraph_window, len(meaningful_paras1) + 1)): # 增加到6个段落
- for i in range(len(meaningful_paras1) - window_size1 + 1):
- if any(idx in used_paras1 for idx in range(i, i + window_size1)):
- continue
-
- # 合并文件1中的段落
- combined_para1 = "".join(meaningful_paras1[i:i+window_size1])
-
- # 在文件2中寻找最佳匹配
- best_match = self._find_best_match_in_paras2_improved(
- combined_para1,
- meaningful_paras2[paras2_idx: min(paras2_idx + self.max_paragraph_window, len(meaningful_paras2))],
- paras2_idx
- )
-
- if best_match and best_match['similarity'] >= self.similarity_threshold:
- paras2_idx = best_match['indices'][-1] + 1 # 更新文件2的起始索引
- # 记录匹配
- for idx in range(i, i + window_size1):
- used_paras1.add(idx)
- for idx in best_match['indices']:
- used_paras2.add(idx)
-
- # 只有当相似度明显不同时才记录差异
- if best_match['similarity'] < 95.0: # 提高阈值到95%
- severity = 'low' if best_match['similarity'] >= 90 else 'medium'
- differences.append({
- 'type': 'paragraph',
- 'position': f'段落{i+1}' + (f'-{i+window_size1}' if window_size1 > 1 else ''),
- 'file1_value': combined_para1,
- 'file2_value': best_match['text'],
- 'description': f'段落格式差异 (相似度: {best_match["similarity"]:.1f}%)',
- 'similarity': best_match['similarity'],
- 'severity': severity
- })
-
- if paras2_idx >= len(meaningful_paras2):
- break # 文件2已全部匹配完,退出
-
- # 处理未匹配的有意义段落
- for i, para in enumerate(meaningful_paras1):
- if i not in used_paras1:
- differences.append({
- 'type': 'paragraph',
- 'position': f'段落{i+1}',
- 'file1_value': para,
- 'file2_value': "",
- 'description': '文件1中独有的段落',
- 'similarity': 0.0,
- 'severity': 'medium'
- })
-
- for j, para in enumerate(meaningful_paras2):
- if j not in used_paras2:
- differences.append({
- 'type': 'paragraph',
- 'position': f'段落{j+1}',
- 'file1_value': "",
- 'file2_value': para,
- 'description': '文件2中独有的段落',
- 'similarity': 0.0,
- 'severity': 'medium'
- })
-
- return differences
- def _find_best_match_in_paras2_improved(self, target_text: str, paras2: List[str],
- paras2_idx: int) -> Dict:
- """改进的段落匹配方法"""
- best_match = None
-
- for window_size in range(1, len(paras2) + 1):
- for j in range(len(paras2) - window_size + 1):
- combined_para2 = "".join(paras2[j:j+window_size])
- similarity = self.calculate_text_similarity(target_text, combined_para2)
- if best_match and best_match['similarity'] == 100.0:
- break # 找到完美匹配,提前退出
-
- if not best_match or similarity > best_match['similarity']:
- best_match = {
- 'text': combined_para2,
- 'similarity': similarity,
- 'indices': list(range(j + paras2_idx, j + paras2_idx + window_size))
- }
- if best_match and best_match['similarity'] == 100.0:
- break # 找到完美匹配,提前退出
-
- # Return empty dict if no match found
- if best_match is None:
- return {
- 'text': '',
- 'similarity': 0.0,
- 'indices': []
- }
-
- return best_match
-
- def detect_column_type(self, column_values: List[str]) -> str:
- """检测列的数据类型"""
- if not column_values:
- return 'text'
-
- # 过滤空值
- non_empty_values = [v for v in column_values if v and v.strip()]
- if not non_empty_values:
- return 'text'
-
- # 检测是否为日期时间
- datetime_patterns = [
- r'\d{4}[-/]\d{1,2}[-/]\d{1,2}', # YYYY-MM-DD
- r'\d{4}[-/]\d{1,2}[-/]\d{1,2}\s*\d{1,2}:\d{1,2}:\d{1,2}', # YYYY-MM-DD HH:MM:SS
- r'\d{4}年\d{1,2}月\d{1,2}日', # 中文日期
- ]
-
- datetime_count = 0
- for value in non_empty_values[:5]: # 检查前5个值
- for pattern in datetime_patterns:
- if re.search(pattern, value):
- datetime_count += 1
- break
-
- if datetime_count >= len(non_empty_values[:5]) * 0.6:
- return 'datetime'
-
- # 检测是否为数字/金额
- numeric_count = 0
- for value in non_empty_values[:5]:
- if self.is_numeric(value):
- numeric_count += 1
-
- if numeric_count >= len(non_empty_values[:5]) * 0.6:
- return 'numeric'
-
- # 默认为文本
- return 'text'
-
- def normalize_header_text(self, text: str) -> str:
- """标准化表头文本"""
- # 移除括号及其内容
- text = re.sub(r'[((].*?[))]', '', text)
- # 统一空格
- text = re.sub(r'\s+', '', text)
- # 移除特殊字符
- text = re.sub(r'[^\w\u4e00-\u9fff]', '', text)
- return text.lower().strip()
-
- def compare_table_headers(self, headers1: List[str], headers2: List[str]) -> Dict:
- """比较表格表头"""
- result = {
- 'match': True,
- 'differences': [],
- 'column_mapping': {}, # 列映射关系
- 'similarity_scores': []
- }
-
- if len(headers1) != len(headers2):
- result['match'] = False
- result['differences'].append({
- 'type': 'table_header_critical',
- 'description': f'表头列数不一致: {len(headers1)} vs {len(headers2)}',
- 'severity': 'critical'
- })
- return result
-
- # 逐列比较表头
- for i, (h1, h2) in enumerate(zip(headers1, headers2)):
- norm_h1 = self.normalize_header_text(h1)
- norm_h2 = self.normalize_header_text(h2)
-
- similarity = self.calculate_text_similarity(norm_h1, norm_h2)
- result['similarity_scores'].append({
- 'column_index': i,
- 'header1': h1,
- 'header2': h2,
- 'similarity': similarity
- })
-
- if similarity < self.header_similarity_threshold:
- result['match'] = False
- result['differences'].append({
- 'type': 'table_header_mismatch',
- 'column_index': i,
- 'header1': h1,
- 'header2': h2,
- 'similarity': similarity,
- 'description': f'第{i+1}列表头不匹配: "{h1}" vs "{h2}" (相似度: {similarity:.1f}%)',
- 'severity': 'medium' if similarity < 50 else 'high'
- })
- else:
- result['column_mapping'][i] = i # 建立列映射
-
- return result
-
- def compare_cell_value(self, value1: str, value2: str, column_type: str,
- column_name: str = '') -> Dict:
- """比较单元格值 - 统一错误类型"""
- result = {
- 'match': True,
- 'difference': None
- }
-
- # 标准化值
- v1 = self.normalize_text(value1)
- v2 = self.normalize_text(value2)
-
- if v1 == v2:
- return result
-
- # 根据列类型采用不同的比较策略
- if column_type == 'numeric':
- # 数字/金额比较
- if self.is_numeric(v1) and self.is_numeric(v2):
- num1 = self.parse_number(v1)
- num2 = self.parse_number(v2)
- if abs(num1 - num2) > 0.01: # 允许0.01的误差
- result['match'] = False
- result['difference'] = {
- 'type': 'table_amount', # ✅ 统一类型
- 'value1': value1,
- 'value2': value2,
- 'diff_amount': abs(num1 - num2),
- 'description': f'金额不一致: {value1} vs {value2}'
- }
- else:
- result['match'] = False
- result['difference'] = {
- 'type': 'table_amount', # ✅ 格式错误也算金额差异
- 'value1': value1,
- 'value2': value2,
- 'description': f'数字格式错误: {value1} vs {value2}'
- }
-
- elif column_type == 'datetime':
- # 日期时间比较
- datetime1 = self.extract_datetime(v1)
- datetime2 = self.extract_datetime(v2)
-
- if datetime1 != datetime2:
- result['match'] = False
- result['difference'] = {
- 'type': 'table_datetime', # ✅ 日期时间类型
- 'value1': value1,
- 'value2': value2,
- 'description': f'日期时间不一致: {value1} vs {value2}'
- }
-
- else:
- # 文本比较
- similarity = self.calculate_text_similarity(v1, v2)
- if similarity < self.similarity_threshold:
- result['match'] = False
- result['difference'] = {
- 'type': 'table_text', # ✅ 文本差异
- 'value1': value1,
- 'value2': value2,
- 'similarity': similarity,
- 'description': f'文本不一致: {value1} vs {value2} (相似度: {similarity:.1f}%)'
- }
-
- return result
-
- def extract_datetime(self, text: str) -> str:
- """提取并标准化日期时间"""
- # 尝试匹配各种日期时间格式
- patterns = [
- (r'(\d{4})[-/](\d{1,2})[-/](\d{1,2})\s*(\d{1,2}):(\d{1,2}):(\d{1,2})',
- lambda m: f"{m.group(1)}-{m.group(2).zfill(2)}-{m.group(3).zfill(2)} {m.group(4).zfill(2)}:{m.group(5).zfill(2)}:{m.group(6).zfill(2)}"),
- (r'(\d{4})[-/](\d{1,2})[-/](\d{1,2})',
- lambda m: f"{m.group(1)}-{m.group(2).zfill(2)}-{m.group(3).zfill(2)}"),
- (r'(\d{4})年(\d{1,2})月(\d{1,2})日',
- lambda m: f"{m.group(1)}-{m.group(2).zfill(2)}-{m.group(3).zfill(2)}"),
- ]
-
- for pattern, formatter in patterns:
- match = re.search(pattern, text)
- if match:
- return formatter(match)
-
- return text
-
- def detect_table_header_row(self, table: List[List[str]]) -> int:
- """
- 智能检测表格的表头行索引
-
- 策略:
- 1. 查找包含典型表头关键词的行(如:序号、编号、时间、日期、金额等)
- 2. 检查该行后续行是否为数据行(包含数字、日期等)
- 3. 返回表头行的索引,如果找不到则返回0
- """
- # 常见表头关键词
- header_keywords = [
- # 通用表头
- '序号', '编号', '时间', '日期', '名称', '类型', '金额', '数量', '单价',
- '备注', '说明', '状态', '类别', '方式', '账号', '单号', '订单',
- # 流水表格特定
- '交易单号', '交易时间', '交易类型', '收/支', '支出', '收入',
- '交易方式', '交易对方', '商户单号', '付款方式', '收款方',
- # 英文表头
- 'no', 'id', 'time', 'date', 'name', 'type', 'amount', 'status'
- ]
-
- for row_idx, row in enumerate(table):
- if not row:
- continue
-
- # 计算该行包含表头关键词的单元格数量
- keyword_count = 0
- for cell in row:
- cell_lower = cell.lower().strip()
- for keyword in header_keywords:
- if keyword in cell_lower:
- keyword_count += 1
- break
-
- # 如果超过一半的单元格包含表头关键词,认为是表头行
- if keyword_count >= len(row) * 0.4 and keyword_count >= 2:
- # 验证:检查下一行是否像数据行
- if row_idx + 1 < len(table):
- next_row = table[row_idx + 1]
- if self.is_data_row(next_row):
- print(f" 📍 检测到表头在第 {row_idx + 1} 行")
- return row_idx
-
- # 如果没有找到明确的表头行,返回0(默认第一行)
- print(f" ⚠️ 未检测到明确表头,默认使用第1行")
- return 0
-
- def is_data_row(self, row: List[str]) -> bool:
- """判断是否为数据行(包含数字、日期等)"""
- data_pattern_count = 0
-
- for cell in row:
- if not cell:
- continue
-
- # 检查是否包含数字
- if re.search(r'\d', cell):
- data_pattern_count += 1
-
- # 检查是否为日期时间格式
- if re.search(r'\d{4}[-/年]\d{1,2}[-/月]\d{1,2}', cell):
- data_pattern_count += 1
-
- # 如果超过一半的单元格包含数据特征,认为是数据行
- return data_pattern_count >= len(row) * 0.5
-
- def compare_table_flow_list(self, table1: List[List[str]], table2: List[List[str]]) -> List[Dict]:
- """专门的流水列表表格比较算法 - 支持表头不在第一行"""
- differences = []
-
- if not table1 or not table2:
- return [{
- 'type': 'table_empty',
- 'description': '表格为空',
- 'severity': 'critical'
- }]
-
- print(f"\n📋 开始流水表格对比...")
-
- # 第一步:智能检测表头位置
- header_row_idx1 = self.detect_table_header_row(table1)
- header_row_idx2 = self.detect_table_header_row(table2)
-
- if header_row_idx1 != header_row_idx2:
- differences.append({
- 'type': 'table_header_position',
- 'position': '表头位置',
- 'file1_value': f'第{header_row_idx1 + 1}行',
- 'file2_value': f'第{header_row_idx2 + 1}行',
- 'description': f'表头位置不一致: 文件1在第{header_row_idx1 + 1}行,文件2在第{header_row_idx2 + 1}行',
- 'severity': 'high'
- })
-
- # 第二步:比对表头前的内容(按单元格比对)
- if header_row_idx1 > 0 or header_row_idx2 > 0:
- print(f"\n📝 对比表头前的内容...")
-
- # 提取表头前的内容作为单独的"表格"
- pre_header_table1 = table1[:header_row_idx1] if header_row_idx1 > 0 else []
- pre_header_table2 = table2[:header_row_idx2] if header_row_idx2 > 0 else []
-
- if pre_header_table1 or pre_header_table2:
- # 复用compare_tables方法进行比对
- pre_header_diffs = self.compare_tables(pre_header_table1, pre_header_table2)
-
- # 修改:统一类型为 table_pre_header
- for diff in pre_header_diffs:
- diff['type'] = 'table_pre_header'
- diff['position'] = f"表头前{diff['position']}"
- diff['severity'] = 'medium'
- print(f" ⚠️ {diff['position']}: {diff['description']}")
-
- differences.extend(pre_header_diffs)
-
- # 第三步:比较表头
- headers1 = table1[header_row_idx1]
- headers2 = table2[header_row_idx2]
-
- print(f"\n📋 对比表头...")
- print(f" 文件1表头 (第{header_row_idx1 + 1}行): {headers1}")
- print(f" 文件2表头 (第{header_row_idx2 + 1}行): {headers2}")
-
- header_result = self.compare_table_headers(headers1, headers2)
-
- # ✅ 新增:检查列数是否一致
- column_count_match = len(headers1) == len(headers2)
- if not header_result['match']:
- print(f"\n⚠️ 表头文字存在差异")
- for diff in header_result['differences']:
- print(f" - {diff['description']}")
- differences.append({
- 'type': diff.get('type', 'table_header_mismatch'), # ✅ 改为 mismatch 而非 critical
- 'position': '表头',
- 'file1_value': diff.get('header1', ''),
- 'file2_value': diff.get('header2', ''),
- 'description': diff['description'],
- 'severity': diff.get('severity', 'high'),
- })
- if diff.get('severity', 'high') == 'critical':
- return differences
- else:
- print(f"✅ 表头匹配成功")
-
- # 第四步:检测列类型
- column_types1 = []
- column_types2 = []
-
- # 检测文件1的列类型
- for col_idx in range(len(headers1)):
- col_values1 = [
- row[col_idx]
- for row in table1[header_row_idx1 + 1:]
- if col_idx < len(row)
- ]
- col_type = self.detect_column_type(col_values1)
- column_types1.append(col_type)
- print(f" 文件1列 {col_idx + 1} ({headers1[col_idx]}): {col_type}")
-
- # 检测文件2的列类型
- for col_idx in range(len(headers2)):
- col_values2 = [
- row[col_idx]
- for row in table2[header_row_idx2 + 1:]
- if col_idx < len(row)
- ]
- col_type = self.detect_column_type(col_values2)
- column_types2.append(col_type)
- print(f" 文件2列 {col_idx + 1} ({headers2[col_idx]}): {col_type}")
-
- # ✅ 新增:检查列类型是否一致
- column_types_match = column_types1 == column_types2
-
- if not column_types_match:
- print(f"\n⚠️ 列类型存在差异,不再比较单元格内容...")
- for col_idx in range(min(len(column_types1), len(column_types2))):
- if column_types1[col_idx] != column_types2[col_idx]:
- differences.append({
- 'type': 'table_header_critical',
- 'position': f'第{col_idx + 1}列',
- 'file1_value': f'{headers1[col_idx]} ({column_types1[col_idx]})',
- 'file2_value': f'{headers2[col_idx]} ({column_types2[col_idx]})',
- 'description': f'列类型不一致: {column_types1[col_idx]} vs {column_types2[col_idx]}',
- 'severity': 'critical',
- 'column_index': col_idx
- })
- return differences
-
- # ✅ 使用两个文件中更准确的列类型(优先使用数据更多的文件)
- column_types = column_types1 # 默认使用文件1的列类型
-
- # 第五步:逐行比较数据
- data_rows1 = table1[header_row_idx1 + 1:]
- data_rows2 = table2[header_row_idx2 + 1:]
-
- max_rows = max(len(data_rows1), len(data_rows2))
-
- print(f"\n📊 开始逐行对比数据 (共{max_rows}行)...")
-
- for row_idx in range(max_rows):
- row1 = data_rows1[row_idx] if row_idx < len(data_rows1) else []
- row2 = data_rows2[row_idx] if row_idx < len(data_rows2) else []
-
- # 实际行号(加上表头行索引)
- actual_row_num = header_row_idx1 + row_idx + 2
-
- if not row1:
- differences.append({
- 'type': 'table_row_missing',
- 'position': f'第{actual_row_num}行',
- 'file1_value': '',
- 'file2_value': ', '.join(row2),
- 'description': f'文件1缺少第{actual_row_num}行',
- 'severity': 'high',
- 'row_index': actual_row_num
- })
- continue
-
- if not row2:
- # ✅ 修改:整行缺失按单元格输出
- differences.append({
- 'type': 'table_row_missing',
- 'position': f'第{actual_row_num}行',
- 'file1_value': ', '.join(row1),
- 'file2_value': '',
- 'description': f'文件2缺少第{actual_row_num}行',
- 'severity': 'high',
- 'row_index': actual_row_num
- })
- continue
-
- # 逐列比较,每个单元格差异独立输出
- max_cols = max(len(row1), len(row2))
-
- for col_idx in range(max_cols):
- cell1 = row1[col_idx] if col_idx < len(row1) else ''
- cell2 = row2[col_idx] if col_idx < len(row2) else ''
-
- # 跳过图片内容
- if "[图片内容-忽略]" in cell1 or "[图片内容-忽略]" in cell2:
- continue
-
- # ✅ 使用对应的列类型
- column_type = column_types[col_idx] if col_idx < len(column_types) else 'text'
-
- # ✅ 获取列名(如果表头不匹配,显示两个表头)
- if header_result['match']:
- column_name = headers1[col_idx] if col_idx < len(headers1) else f'列{col_idx + 1}'
- else:
- col_name1 = headers1[col_idx] if col_idx < len(headers1) else f'列{col_idx + 1}'
- col_name2 = headers2[col_idx] if col_idx < len(headers2) else f'列{col_idx + 1}'
- column_name = f"{col_name1}/{col_name2}"
-
- compare_result = self.compare_cell_value(cell1, cell2, column_type, column_name)
-
- if not compare_result['match']:
- # ✅ 直接将单元格差异添加到differences列表
- diff_info = compare_result['difference']
-
- differences.append({
- 'type': diff_info['type'], # 使用原始类型(table_amount, table_text等)
- 'position': f'第{actual_row_num}行第{col_idx + 1}列',
- 'file1_value': diff_info['value1'],
- 'file2_value': diff_info['value2'],
- 'description': diff_info['description'],
- 'severity': 'medium',
- 'row_index': actual_row_num,
- 'col_index': col_idx,
- 'column_name': column_name,
- 'column_type': column_type,
- # 保留额外信息
- **{k: v for k, v in diff_info.items() if k not in ['type', 'value1', 'value2', 'description']}
- })
-
- print(f" ⚠️ 第{actual_row_num}行第{col_idx + 1}列({column_name}): {diff_info['description']}")
-
- print(f"\n✅ 流水表格对比完成,发现 {len(differences)} 个差异")
-
- return differences
-
- def compare_tables_with_mode(self, table1: List[List[str]], table2: List[List[str]],
- mode: str = 'standard') -> List[Dict]:
- """根据模式选择表格比较算法"""
- if mode == 'flow_list':
- return self.compare_table_flow_list(table1, table2)
- else:
- return self.compare_tables(table1, table2)
-
- def compare_files(self, file1_path: str, file2_path: str) -> Dict:
- """改进的文件比较方法 - 支持不同的表格比较模式"""
- # 读取文件
- with open(file1_path, 'r', encoding='utf-8') as f:
- content1 = f.read()
-
- with open(file2_path, 'r', encoding='utf-8') as f:
- content2 = f.read()
-
- # 提取表格和段落
- tables1 = self.extract_table_data(content1)
- tables2 = self.extract_table_data(content2)
-
- paras1 = self.extract_paragraphs(content1)
- paras2 = self.extract_paragraphs(content2)
-
- # 比较结果
- all_differences = []
-
- # 比较表格 - 使用指定的比较模式
- if tables1 and tables2:
- table_diffs = self.compare_tables_with_mode(
- tables1[0], tables2[0],
- mode=self.table_comparison_mode
- )
- all_differences.extend(table_diffs)
- elif tables1 and not tables2:
- all_differences.append({
- 'type': 'table_structure',
- 'position': '表格结构',
- 'file1_value': f'包含{len(tables1)}个表格',
- 'file2_value': '无表格',
- 'description': '文件1包含表格但文件2无表格',
- 'severity': 'high'
- })
- elif not tables1 and tables2:
- all_differences.append({
- 'type': 'table_structure',
- 'position': '表格结构',
- 'file1_value': '无表格',
- 'file2_value': f'包含{len(tables2)}个表格',
- 'description': '文件2包含表格但文件1无表格',
- 'severity': 'high'
- })
-
- # 使用增强的段落比较
- para_diffs = self.compare_paragraphs_with_flexible_matching(paras1, paras2)
- all_differences.extend(para_diffs)
-
- # ✅ 改进统计信息 - 细化分类
- stats = {
- 'total_differences': len(all_differences),
- 'table_differences': len([d for d in all_differences if d['type'].startswith('table')]),
- 'paragraph_differences': len([d for d in all_differences if d['type'] == 'paragraph']),
- 'amount_differences': len([d for d in all_differences if d['type'] == 'table_amount']),
- 'datetime_differences': len([d for d in all_differences if d['type'] == 'table_datetime']),
- 'text_differences': len([d for d in all_differences if d['type'] == 'table_text']),
- 'table_pre_header': len([d for d in all_differences if d['type'] == 'table_pre_header']),
- 'table_header_mismatch': len([d for d in all_differences if d['type'] == 'table_header_mismatch']), # ✅ 新增
- 'table_header_critical': len([d for d in all_differences if d['type'] == 'table_header_critical']), # ✅ 新增
- 'table_header_position': len([d for d in all_differences if d['type'] == 'table_header_position']),
- 'table_row_missing': len([d for d in all_differences if d['type'] == 'table_row_missing']),
- 'high_severity': len([d for d in all_differences if d.get('severity') == 'critical' or d.get('severity') == 'high']),
- 'medium_severity': len([d for d in all_differences if d.get('severity') == 'medium']),
- 'low_severity': len([d for d in all_differences if d.get('severity') == 'low'])
- }
-
- result = {
- 'differences': all_differences,
- 'statistics': stats,
- 'file1_tables': len(tables1),
- 'file2_tables': len(tables2),
- 'file1_paragraphs': len(paras1),
- 'file2_paragraphs': len(paras2),
- 'file1_path': file1_path,
- 'file2_path': file2_path,
- }
-
- return result
- def generate_json_report(self, comparison_result: Dict, output_file: str):
- """生成JSON格式的比较报告"""
- # report_data = {
- # 'comparison_summary': {
- # 'timestamp': re.sub(r'[^\w\-_\.]', '_', str(comparison_result.get('timestamp', ''))),
- # 'file1': comparison_result['file1_path'],
- # 'file2': comparison_result['file2_path'],
- # 'statistics': comparison_result['statistics'],
- # 'file_info': {
- # 'file1_tables': comparison_result['file1_tables'],
- # 'file2_tables': comparison_result['file2_tables'],
- # 'file1_paragraphs': comparison_result['file1_paragraphs'],
- # 'file2_paragraphs': comparison_result['file2_paragraphs']
- # }
- # },
- # 'differences': comparison_result['differences']
- # }
-
- with open(output_file, 'w', encoding='utf-8') as f:
- json.dump(comparison_result, f, ensure_ascii=False, indent=2)
-
- def generate_markdown_report(self, comparison_result: Dict, output_file: str):
- """生成Markdown格式的比较报告 - 修复类型映射"""
- with open(output_file, 'w', encoding='utf-8') as f:
- f.write("# OCR结果对比报告\n\n")
-
- # 基本信息
- f.write("## 基本信息\n\n")
- f.write(f"- **文件1**: `{comparison_result['file1_path']}`\n")
- f.write(f"- **文件2**: `{comparison_result['file2_path']}`\n")
- f.write(f"- **比较时间**: {comparison_result.get('timestamp', 'N/A')}\n\n")
-
- # 统计信息
- stats = comparison_result['statistics']
- f.write("## 统计信息\n\n")
- f.write(f"- 总差异数量: **{stats['total_differences']}**\n")
- f.write(f"- 表格差异: **{stats['table_differences']}**\n")
- f.write(f"- 其中表格金额差异: **{stats['amount_differences']}**\n")
- f.write(f"- 段落差异: **{stats['paragraph_differences']}**\n")
- f.write(f"- 高严重度: **{stats['high_severity']}**\n") # ✅ 新增
- f.write(f"- 中严重度: **{stats['medium_severity']}**\n") # ✅ 新增
- f.write(f"- 低严重度: **{stats['low_severity']}**\n") # ✅ 新增
- f.write(f"- 文件1表格数: {comparison_result['file1_tables']}\n")
- f.write(f"- 文件2表格数: {comparison_result['file2_tables']}\n")
- f.write(f"- 文件1段落数: {comparison_result['file1_paragraphs']}\n")
- f.write(f"- 文件2段落数: {comparison_result['file2_paragraphs']}\n\n")
-
- # 差异摘要
- if stats['total_differences'] == 0:
- f.write("## 结论\n\n")
- f.write("🎉 **完美匹配!没有发现任何差异。**\n\n")
- else:
- f.write("## 差异摘要\n\n")
-
- # ✅ 更新类型映射
- type_name_map = {
- 'table_amount': '💰 表格金额差异',
- 'table_text': '📝 表格文本差异',
- 'table_pre_header': '📋 表头前内容差异',
- 'table_header_position': '📍 表头位置差异',
- 'table_header_critical': '❌ 表头严重错误',
- 'table_row_missing': '🚫 表格行缺失',
- 'table_row_data': '📊 表格数据差异',
- 'table_structure': '🏗️ 表格结构差异',
- 'paragraph': '📄 段落差异'
- }
-
- # 按类型分组显示差异
- diff_by_type = {}
- for diff in comparison_result['differences']:
- diff_type = diff['type']
- if diff_type not in diff_by_type:
- diff_by_type[diff_type] = []
- diff_by_type[diff_type].append(diff)
-
- for diff_type, diffs in diff_by_type.items():
- type_name = type_name_map.get(diff_type, f'❓ {diff_type}')
-
- f.write(f"### {type_name} ({len(diffs)}个)\n\n")
-
- for i, diff in enumerate(diffs, 1):
- f.write(f"**{i}. {diff['position']}**\n")
- f.write(f"- 文件1: `{diff['file1_value']}`\n")
- f.write(f"- 文件2: `{diff['file2_value']}`\n")
- f.write(f"- 说明: {diff['description']}\n")
- if 'severity' in diff:
- severity_icon = {'critical': '🔴', 'high': '🟠', 'medium': '🟡', 'low': '🟢'}
- f.write(f"- 严重度: {severity_icon.get(diff['severity'], '⚪')} {diff['severity']}\n")
- f.write("\n")
-
- # 详细差异列表
- if comparison_result['differences']:
- f.write("## 详细差异列表\n\n")
- f.write("| 序号 | 类型 | 位置 | 文件1内容 | 文件2内容 | 描述 | 严重度 |\n")
- f.write("| --- | --- | --- | --- | --- | --- | --- |\n")
-
- for i, diff in enumerate(comparison_result['differences'], 1):
- severity = diff.get('severity', 'N/A')
- f.write(f"| {i} | {diff['type']} | {diff['position']} | ")
- f.write(f"`{diff['file1_value'][:50]}{'...' if len(diff['file1_value']) > 50 else ''}` | ")
- f.write(f"`{diff['file2_value'][:50]}{'...' if len(diff['file2_value']) > 50 else ''}` | ")
- f.write(f"{diff['description']} | {severity} |\n")
- def compare_ocr_results(file1_path: str, file2_path: str, output_file: str = "comparison_report",
- output_format: str = "markdown", ignore_images: bool = True,
- table_mode: str = 'standard', similarity_algorithm: str = 'ratio') -> Dict:
- """
- 比较两个OCR结果文件
-
- Args:
- file1_path: 第一个OCR结果文件路径
- file2_path: 第二个OCR结果文件路径
- output_file: 输出文件名(不含扩展名)
- output_format: 输出格式 ('json', 'markdown', 'both')
- ignore_images: 是否忽略图片内容
- table_mode: 表格比较模式 ('standard', 'flow_list')
- similarity_algorithm: 相似度算法 ('ratio', 'partial_ratio', 'token_sort_ratio', 'token_set_ratio')
- """
- comparator = OCRResultComparator()
- comparator.table_comparison_mode = table_mode
-
- # 根据参数选择相似度算法
- if similarity_algorithm == 'partial_ratio':
- comparator.calculate_text_similarity = lambda t1, t2: fuzz.partial_ratio(t1, t2)
- elif similarity_algorithm == 'token_sort_ratio':
- comparator.calculate_text_similarity = lambda t1, t2: fuzz.token_sort_ratio(t1, t2)
- elif similarity_algorithm == 'token_set_ratio':
- comparator.calculate_text_similarity = lambda t1, t2: fuzz.token_set_ratio(t1, t2)
-
- print("🔍 开始对比OCR结果...")
- print(f"📄 文件1: {file1_path}")
- print(f"📄 文件2: {file2_path}")
- print(f"📊 表格模式: {table_mode}")
- print(f"🔧 相似度算法: {similarity_algorithm}")
-
- try:
- # 执行比较
- result = comparator.compare_files(file1_path, file2_path)
-
- # 添加时间戳
- import datetime
- result['timestamp'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
-
- # 生成报告
- if output_format in ['json', 'both']:
- json_file = f"{output_file}.json"
- comparator.generate_json_report(result, json_file)
- print(f"📄 JSON报告已保存至: {json_file}")
-
- if output_format in ['markdown', 'both']:
- md_file = f"{output_file}.md"
- comparator.generate_markdown_report(result, md_file)
- print(f"📝 Markdown报告已保存至: {md_file}")
-
- # 打印简要结果
- print(f"\n📊 对比完成!")
- print(f" 总差异数: {result['statistics']['total_differences']}")
- print(f" 表格差异: {result['statistics']['table_differences']}")
- print(f" 其中表格金额差异: {result['statistics']['amount_differences']}")
- print(f" 段落差异: {result['statistics']['paragraph_differences']}")
-
- # 打印前几个重要差异
- if result['differences']:
- print(f"\n🔍 前3个重要差异:")
- for i, diff in enumerate(result['differences'][:3], 1):
- print(f" {i}. {diff['position']}: {diff['description']}")
- print(f" 文件1: '{diff['file1_value'][:50]}{'...' if len(diff['file1_value']) > 50 else ''}'")
- print(f" 文件2: '{diff['file2_value'][:50]}{'...' if len(diff['file2_value']) > 50 else ''}'")
- else:
- print(f"\n🎉 恭喜!两个文件内容完全一致!")
-
- # 添加处理统计信息(模仿 ocr_by_vlm.py 的风格)
- print("\n📊 对比处理统计")
- print(f" 文件1路径: {result['file1_path']}")
- print(f" 文件2路径: {result['file2_path']}")
- print(f" 输出文件: {output_file}")
- print(f" 输出格式: {output_format}")
- print(f" 忽略图片: {ignore_images}")
- print(f" 处理时间: {result['timestamp']}")
- print(f" 文件1表格数: {result['file1_tables']}")
- print(f" 文件2表格数: {result['file2_tables']}")
- print(f" 文件1段落数: {result['file1_paragraphs']}")
- print(f" 文件2段落数: {result['file2_paragraphs']}")
-
- return result
-
- except Exception as e:
- import traceback
- traceback.print_exc()
- raise Exception(f"OCR对比任务失败: {e}")
- if __name__ == "__main__":
- parser = argparse.ArgumentParser(description='OCR结果对比工具')
- parser.add_argument('file1', nargs='?', help='第一个OCR结果文件路径')
- parser.add_argument('file2', nargs='?', help='第二个OCR结果文件路径')
- parser.add_argument('-o', '--output', default='comparison_report', help='输出文件名')
- parser.add_argument('-f', '--format', choices=['json', 'markdown', 'both'],
- default='markdown', help='输出格式')
- parser.add_argument('--ignore-images', action='store_true', help='忽略图片内容')
- parser.add_argument('--table-mode', choices=['standard', 'flow_list'],
- default='standard', help='表格比较模式')
- parser.add_argument('--similarity-algorithm',
- choices=['ratio', 'partial_ratio', 'token_sort_ratio', 'token_set_ratio'],
- default='ratio', help='相似度算法')
-
- args = parser.parse_args()
- if args.file1 and args.file2:
- result = compare_ocr_results(
- file1_path=args.file1,
- file2_path=args.file2,
- output_file=args.output,
- output_format=args.format,
- ignore_images=args.ignore_images,
- table_mode=args.table_mode,
- similarity_algorithm=args.similarity_algorithm
- )
- else:
- # 测试流水表格对比
- result = compare_ocr_results(
- file1_path='/Users/zhch158/workspace/data/流水分析/A用户_单元格扫描流水/data_PPStructureV3_Results/A用户_单元格扫描流水_page_001.md',
- file2_path='/Users/zhch158/workspace/data/流水分析/A用户_单元格扫描流水/mineru-vlm-2.5.3_Results/A用户_单元格扫描流水_page_001.md',
- output_file=f'./output/flow_list_comparison_{time.strftime("%Y%m%d_%H%M%S")}',
- output_format='both',
- ignore_images=True,
- table_mode='flow_list', # 使用流水表格模式
- similarity_algorithm='ratio'
- )
|