import re
from typing import Dict, List, Tuple, Optional

try:
    from .data_type_detector import DataTypeDetector
    from .similarity_calculator import SimilarityCalculator
    from .text_processor import TextProcessor
except ImportError:
    from data_type_detector import DataTypeDetector
    from similarity_calculator import SimilarityCalculator
    from text_processor import TextProcessor


class TableComparator:
    """Match and compare tables extracted from two documents.

    A table is a list of rows and a row is a list of cell values (normally
    strings).  The class offers three groups of operations:

    * pairing tables of two documents by structural/header similarity
      (``find_matching_tables``),
    * locating the header row of a table (``detect_table_header_row``),
    * reporting differences, either cell by cell (``compare_tables``) or
      header-aware for transaction-list tables (``compare_table_flow_list``).

    Differences are reported as dictionaries; progress is printed to stdout.
    """

    # Cells containing this marker are skipped by the comparisons.
    _IMAGE_PLACEHOLDER = "[图片内容-忽略]"

    # Header keywords grouped by semantic category.  Shared by header-row
    # detection and by the category part of the header similarity score.
    _HEADER_KEYWORD_CATEGORIES = {
        'date': ('日期', 'date', '时间', 'time'),
        'type': ('类型', 'type', '业务', 'business'),
        'number': ('号', 'no', '编号', 'id', '票据', 'bill'),
        'description': ('摘要', 'description', '说明', 'remark'),
        'amount': ('金额', 'amount', '借方', 'debit', '贷方', 'credit'),
        'balance': ('余额', 'balance'),
        'counterparty': ('对手', 'counterparty', '账户', 'account', '户名', 'name'),
    }
    _HEADER_KEYWORDS = tuple(
        keyword
        for keywords in _HEADER_KEYWORD_CATEGORIES.values()
        for keyword in keywords
    )

    # Patterns are compiled once instead of on every call.  The bracket and
    # comma classes list both the ASCII and the full-width (U+FF08, U+FF09,
    # U+FF0C) forms, so headers such as "金额（元）" are handled as well.
    _PAREN_CONTENT_RE = re.compile(r'[(\uff08].*?[)\uff09]')
    _WHITESPACE_RE = re.compile(r'\s+')
    _NON_WORD_RE = re.compile(r'[^\w\u4e00-\u9fff]')
    _DIGIT_RE = re.compile(r'\d')
    _DATE_RE = re.compile(r'\d{4}[-/年]\d{1,2}[-/月]\d{1,2}')
    _AMOUNT_RE = re.compile(r'-?\d+[,\uff0c]?\d*\.?\d+')

    def __init__(self):
        self.detector = DataTypeDetector()
        self.calculator = SimilarityCalculator()
        self.text_processor = TextProcessor()
        # Thresholds are on the 0-100 scale used by the similarity calculator.
        self.header_similarity_threshold = 90
        self.content_similarity_threshold = 95
        self.max_paragraph_window = 6

    @staticmethod
    def _cell_text(cell) -> str:
        """Return *cell* as text, mapping ``None`` to the empty string.

        Plain ``str(None)`` would yield ``"None"``, which is non-empty and
        even contains the header keyword ``"no"``.
        """
        return '' if cell is None else str(cell)

    def find_matching_tables(self, tables1: List[List[List[str]]],
                             tables2: List[List[List[str]]]) -> List[Tuple[int, int, float]]:
        """Pair every table of the first document with its best counterpart.

        Args:
            tables1: Tables of the first document.
            tables2: Tables of the second document.

        Returns:
            A list of ``(table1_index, table2_index, similarity_score)``
            tuples.  Only pairs scoring above 50 are reported; empty tables
            are skipped.

        NOTE(review): matching is greedy per table of ``tables1``; several of
        them may be mapped to the same table of ``tables2``.
        """
        matches = []

        for i, table1 in enumerate(tables1):
            if not table1:
                continue

            best_match = None
            best_score = 0

            for j, table2 in enumerate(tables2):
                if not table2:
                    continue

                score = self._calculate_table_similarity(table1, table2)
                if score > best_score:
                    best_score = score
                    best_match = j

            # Require more than 50% similarity before accepting a pair.
            if best_match is not None and best_score > 50:
                matches.append((i, best_match, best_score))
                print(f" 📊 表格匹配: 文件1表格{i+1} ↔ 文件2表格{best_match+1} (相似度: {best_score:.1f}%)")

        return matches

    def _get_max_columns(self, table: List[List[str]]) -> int:
        """Return the length of the longest row, or 0 for an empty table."""
        if not table:
            return 0
        return max(len(row) for row in table)

    def _calculate_table_similarity(self, table1: List[List[str]],
                                    table2: List[List[str]]) -> float:
        """Score how likely two tables are the same table (0-100).

        The score is a weighted sum of row-count similarity (15%),
        column-count similarity (15%), header similarity (50%) and content
        feature similarity (20%).  Returns 0.0 if either table is empty.
        """
        if not table1 or not table2:
            return 0.0

        # 1. Row-count similarity (weight 15%).
        row_count1 = len(table1)
        row_count2 = len(table2)
        row_similarity = 100 * (1 - abs(row_count1 - row_count2) / max(row_count1, row_count2))

        # 2. Column-count similarity (weight 15%), based on the widest row.
        col_count1 = self._get_max_columns(table1)
        col_count2 = self._get_max_columns(table2)
        max_cols = max(col_count1, col_count2)
        min_cols = min(col_count1, col_count2)

        if max_cols == 0:
            col_similarity = 0
        else:
            col_diff = abs(col_count1 - col_count2)
            if col_diff == 0:
                col_similarity = 100
            elif col_diff <= 2:
                # A difference of one or two columns is tolerated: 90 or 80.
                col_similarity = 100 - (col_diff * 10)
            else:
                # Larger differences fall back to the plain ratio.
                col_similarity = 100 * (min_cols / max_cols)

        print(f" 行数对比: {row_count1} vs {row_count2}, 相似度: {row_similarity:.1f}%")
        print(f" 列数对比: {col_count1} vs {col_count2}, 相似度: {col_similarity:.1f}%")

        # 3. Header similarity (weight 50%): locate the header rows first.
        header_row_idx1 = self.detect_table_header_row(table1)
        header_row_idx2 = self.detect_table_header_row(table2)
        print(f" 表头位置: 文件1第{header_row_idx1+1}行, 文件2第{header_row_idx2+1}行")

        header_similarity = 0
        if header_row_idx1 < len(table1) and header_row_idx2 < len(table2):
            header1 = table1[header_row_idx1]
            header2 = table2[header_row_idx2]
            if header1 and header2:
                header_similarity = self._calculate_header_similarity_smart(header1, header2)

        print(f" 表头相似度: {header_similarity:.1f}%")

        # 4. Content feature similarity (weight 20%).
        content_similarity = self._calculate_content_features_similarity(table1, table2)
        print(f" 内容特征相似度: {content_similarity:.1f}%")

        total_similarity = (
            row_similarity * 0.15 +
            col_similarity * 0.15 +
            header_similarity * 0.50 +
            content_similarity * 0.20
        )
        return total_similarity

    def _categorize_headers(self, headers: List[str]) -> set:
        """Return the keyword categories that occur in any of *headers*."""
        categories = set()
        for header in headers:
            header_lower = header.lower()
            for category, keywords in self._HEADER_KEYWORD_CATEGORIES.items():
                if any(keyword in header_lower for keyword in keywords):
                    categories.add(category)
        return categories

    def _calculate_header_similarity_smart(self, header1: List[str],
                                           header2: List[str]) -> float:
        """Score the similarity of two header rows (0-100).

        Three measures are combined so that the score tolerates differing
        column counts and column order:

        * exact matches between normalised header texts (40%),
        * fuzzy matches: headers of the shorter row whose best counterpart
          scores above 70 (40%),
        * overlap of the keyword categories found on each side (20%).

        Returns 0.0 if either header row is empty.
        """
        if not header1 or not header2:
            return 0.0

        norm_headers1 = [self.normalize_header_text(h) for h in header1]
        norm_headers2 = [self.normalize_header_text(h) for h in header2]

        # Both rows are non-empty here, so max_len is at least 1.
        max_len = max(len(norm_headers1), len(norm_headers2))

        # Method 1: exact matches of normalised texts.
        common_headers = set(norm_headers1) & set(norm_headers2)
        exact_match_ratio = len(common_headers) / max_len

        # Method 2: fuzzy matches, driven by the shorter header row.
        if len(norm_headers1) <= len(norm_headers2):
            base_headers, compare_headers = norm_headers1, norm_headers2
        else:
            base_headers, compare_headers = norm_headers2, norm_headers1

        fuzzy_matches = 0
        for base_h in base_headers:
            best_similarity = 0
            for comp_h in compare_headers:
                similarity = self.calculator.calculate_text_similarity(base_h, comp_h)
                if similarity > best_similarity:
                    best_similarity = similarity
                if best_similarity == 100:
                    # A perfect counterpart cannot be improved upon.
                    break
            if best_similarity > 70:
                fuzzy_matches += 1

        fuzzy_match_ratio = fuzzy_matches / max_len

        # Method 3: overlap of keyword categories.
        categories1 = self._categorize_headers(norm_headers1)
        categories2 = self._categorize_headers(norm_headers2)
        common_categories = categories1 & categories2
        all_categories = categories1 | categories2
        category_match_ratio = len(common_categories) / len(all_categories) if all_categories else 0

        final_similarity = (
            exact_match_ratio * 0.4 +
            fuzzy_match_ratio * 0.4 +
            category_match_ratio * 0.2
        ) * 100

        print(f" 精确匹配: {exact_match_ratio:.1%}, 模糊匹配: {fuzzy_match_ratio:.1%}, 语义匹配: {category_match_ratio:.1%}")
        return final_similarity

    def _calculate_content_features_similarity(self, table1: List[List[str]],
                                               table2: List[List[str]]) -> float:
        """Compare the share of numeric, date and empty cells (0-100)."""
        features1 = self._extract_table_features(table1)
        features2 = self._extract_table_features(table2)

        similarity_scores = [
            100 * (1 - abs(features1[key] - features2[key]))
            for key in ('numeric_ratio', 'date_ratio', 'empty_ratio')
            if key in features1 and key in features2
        ]
        return sum(similarity_scores) / len(similarity_scores) if similarity_scores else 0

    def _extract_table_features(self, table: List[List[str]]) -> Dict:
        """Count numeric, date and empty cells of *table*.

        Returns:
            A dict with ``numeric_ratio``, ``date_ratio`` and ``empty_ratio``
            (each relative to the total number of cells, 0 for a table
            without cells) plus ``total_cells``.
        """
        total_cells = 0
        numeric_cells = 0
        date_cells = 0
        empty_cells = 0

        for row in table:
            for cell in row:
                total_cells += 1
                text = self._cell_text(cell)
                if not text.strip():
                    empty_cells += 1
                    continue
                # A cell may count as both numeric and date.
                if self.detector.is_numeric(text):
                    numeric_cells += 1
                if self.detector.extract_datetime(text):
                    date_cells += 1

        return {
            'numeric_ratio': numeric_cells / total_cells if total_cells > 0 else 0,
            'date_ratio': date_cells / total_cells if total_cells > 0 else 0,
            'empty_ratio': empty_cells / total_cells if total_cells > 0 else 0,
            'total_cells': total_cells
        }

    def normalize_header_text(self, text: str) -> str:
        """Normalise a header cell for comparison.

        Bracketed content (ASCII or full-width brackets) is dropped, then
        whitespace and every character that is neither a word character nor
        a CJK ideograph is removed, and the result is lower-cased.  ``None``
        is treated as an empty header.
        """
        text = self._cell_text(text)
        text = self._PAREN_CONTENT_RE.sub('', text)
        text = self._WHITESPACE_RE.sub('', text)
        text = self._NON_WORD_RE.sub('', text)
        return text.lower().strip()

    def detect_table_header_row(self, table: List[List[str]]) -> int:
        """Return the index of the row that most likely is the header.

        Only the first five rows are considered.  Rows with fewer than three
        non-empty cells are ignored, which keeps merged-cell metadata rows
        from being picked.  Each remaining row is scored as::

            0.7 * (share of non-empty cells containing a header keyword)
          + 0.3 * min(non_empty_cells / 5, 1)
          + 0.2 if the following row looks like a data row

        Returns 0 for an empty table or when the best score is below 0.3.
        """
        if not table:
            return 0

        best_header_row = 0
        best_score = 0

        for row_idx, row in enumerate(table[:5]):
            if not row:
                continue

            cell_texts = [self._cell_text(cell).strip().lower() for cell in row]
            non_empty_texts = [text for text in cell_texts if text]
            non_empty_cells = len(non_empty_texts)

            # Skip empty or nearly empty rows.
            if non_empty_cells < 3:
                continue

            # Each cell counts at most once, however many keywords it holds.
            keyword_count = sum(
                1 for text in non_empty_texts
                if any(keyword in text for keyword in self._HEADER_KEYWORDS)
            )

            keyword_ratio = keyword_count / non_empty_cells
            column_bonus = min(non_empty_cells / 5, 1.0)
            score = keyword_ratio * 0.7 + column_bonus * 0.3

            # A header is expected to be followed by data.
            if row_idx + 1 < len(table) and self._is_data_row(table[row_idx + 1]):
                score += 0.2

            if score > best_score:
                best_score = score
                best_header_row = row_idx

        if best_score < 0.3:
            print(f" ⚠️ 未检测到明确表头,默认使用第1行 (得分: {best_score:.2f})")
            return 0

        print(f" 📍 检测到表头在第 {best_header_row + 1} 行 (得分: {best_score:.2f})")
        return best_header_row

    def _is_data_row(self, row: List[str]) -> bool:
        """Return True if *row* looks like a data row rather than a header.

        Every non-empty cell scores one point for each pattern it matches
        (any digit, a date, an amount); the row qualifies when the points
        divided by the number of non-empty cells reach 0.3.

        NOTE(review): because a cell can score up to three points, this is a
        weighted score, not the strict share of cells with data - confirm
        that the weighting is intended.
        """
        if not row:
            return False

        data_pattern_count = 0
        non_empty_count = 0

        for cell in row:
            cell_text = self._cell_text(cell).strip()
            if not cell_text:
                continue

            non_empty_count += 1
            if self._DIGIT_RE.search(cell_text):
                data_pattern_count += 1
            if self._DATE_RE.search(cell_text):
                data_pattern_count += 1
            if self._AMOUNT_RE.search(cell_text):
                data_pattern_count += 1

        if non_empty_count == 0:
            return False

        return data_pattern_count / non_empty_count >= 0.3

    def compare_table_headers(self, headers1: List[str], headers2: List[str]) -> Dict:
        """Compare two header rows column by column.

        Returns:
            A dict with ``match`` (bool), ``differences`` (list of dicts),
            ``column_mapping`` (index -> index for matching columns) and
            ``similarity_scores`` (one entry per column).  If the column
            counts differ, a single ``critical`` difference is returned and
            no per-column comparison is made.
        """
        result = {
            'match': True,
            'differences': [],
            'column_mapping': {},
            'similarity_scores': []
        }

        if len(headers1) != len(headers2):
            result['match'] = False
            result['differences'].append({
                'type': 'table_header_critical',
                'description': f'表头列数不一致: {len(headers1)} vs {len(headers2)}',
                'severity': 'critical'
            })
            return result

        for i, (h1, h2) in enumerate(zip(headers1, headers2)):
            norm_h1 = self.normalize_header_text(h1)
            norm_h2 = self.normalize_header_text(h2)
            similarity = self.calculator.calculate_text_similarity(norm_h1, norm_h2)

            result['similarity_scores'].append({
                'column_index': i,
                'header1': h1,
                'header2': h2,
                'similarity': similarity
            })

            if similarity < self.header_similarity_threshold:
                result['match'] = False
                result['differences'].append({
                    'type': 'table_header_mismatch',
                    'column_index': i,
                    'header1': h1,
                    'header2': h2,
                    'similarity': similarity,
                    'description': f'第{i+1}列表头不匹配: "{h1}" vs "{h2}" (相似度: {similarity:.1f}%)',
                    # The less similar the headers, the more severe the
                    # finding (same direction as in compare_cell_value).
                    'severity': 'high' if similarity < 50 else 'medium'
                })
            else:
                result['column_mapping'][i] = i

        return result

    def compare_cell_value(self, value1: str, value2: str,
                           column_type: str, column_name: str = '') -> Dict:
        """Compare two cell values according to the type of their column.

        Args:
            value1: Cell value from the first table.
            value2: Cell value from the second table.
            column_type: ``'text_number'``, ``'numeric'``, ``'datetime'``;
                any other value is compared as free text.
            column_name: Currently unused; kept for interface compatibility.

        Returns:
            ``{'match': bool, 'difference': dict | None}``.  Values that are
            equal after text normalisation always match.
        """
        result = {
            'match': True,
            'difference': None
        }

        v1 = self.text_processor.normalize_text(value1)
        v2 = self.text_processor.normalize_text(value2)

        if v1 == v2:
            return result

        if column_type == 'text_number':
            norm_v1 = self.detector.normalize_text_number(v1)
            norm_v2 = self.detector.normalize_text_number(v2)

            result['match'] = False
            if norm_v1 == norm_v2:
                # Same digits, different formatting: reported, low severity.
                result['difference'] = {
                    'type': 'table_text',
                    'value1': value1,
                    'value2': value2,
                    'description': f'文本型数字格式差异: "{value1}" vs "{value2}" (内容相同,空格不同)',
                    'severity': 'low'
                }
            else:
                result['difference'] = {
                    'type': 'table_text',
                    'value1': value1,
                    'value2': value2,
                    'description': f'文本型数字不一致: {value1} vs {value2}',
                    'severity': 'high'
                }
            return result

        if column_type == 'numeric':
            if self.detector.is_numeric(v1) and self.detector.is_numeric(v2):
                num1 = self.detector.parse_number(v1)
                num2 = self.detector.parse_number(v2)
                # Amounts within 0.01 of each other are considered equal.
                if abs(num1 - num2) > 0.01:
                    result['match'] = False
                    result['difference'] = {
                        'type': 'table_amount',
                        'value1': value1,
                        'value2': value2,
                        'diff_amount': abs(num1 - num2),
                        'description': f'金额不一致: {value1} vs {value2}',
                        'severity': 'high'
                    }
            else:
                # At least one value is not numeric for the detector.
                result['match'] = False
                result['difference'] = {
                    'type': 'table_text',
                    'value1': value1,
                    'value2': value2,
                    'description': f'长数字字符串不一致: {value1} vs {value2}',
                    'severity': 'medium'
                }
        elif column_type == 'datetime':
            datetime1 = self.detector.extract_datetime(v1)
            datetime2 = self.detector.extract_datetime(v2)
            if datetime1 != datetime2:
                result['match'] = False
                result['difference'] = {
                    'type': 'table_datetime',
                    'value1': value1,
                    'value2': value2,
                    'description': f'日期时间不一致: {value1} vs {value2}',
                    'severity': 'medium'
                }
        else:
            similarity = self.calculator.calculate_text_similarity(v1, v2)
            if similarity < self.content_similarity_threshold:
                result['match'] = False
                result['difference'] = {
                    'type': 'table_text',
                    'value1': value1,
                    'value2': value2,
                    'similarity': similarity,
                    'description': f'文本不一致: {value1} vs {value2} (相似度: {similarity:.1f}%)',
                    # Nearly identical text is a minor finding.
                    'severity': 'low' if similarity > 80 else 'medium'
                }

        return result

    def compare_tables(self, table1: List[List[str]], table2: List[List[str]]) -> List[Dict]:
        """Compare two tables position by position.

        Missing rows and cells are treated as empty strings and cells
        containing the image placeholder are skipped.  Two numeric cells
        differ only if their values are more than 0.001 apart; all other
        unequal cells are reported as text differences.

        Returns:
            One dict per differing cell, with 0-based ``row_index`` and
            ``col_index`` and a 1-based human-readable ``position``.
        """
        differences = []
        max_rows = max(len(table1), len(table2))

        for i in range(max_rows):
            row1 = table1[i] if i < len(table1) else []
            row2 = table2[i] if i < len(table2) else []
            max_cols = max(len(row1), len(row2))

            for j in range(max_cols):
                cell1 = self._cell_text(row1[j]) if j < len(row1) else ""
                cell2 = self._cell_text(row2[j]) if j < len(row2) else ""

                if self._IMAGE_PLACEHOLDER in cell1 or self._IMAGE_PLACEHOLDER in cell2:
                    continue

                if cell1 == cell2:
                    continue

                if self.detector.is_numeric(cell1) and self.detector.is_numeric(cell2):
                    num1 = self.detector.parse_number(cell1)
                    num2 = self.detector.parse_number(cell2)
                    if abs(num1 - num2) > 0.001:
                        differences.append({
                            'type': 'table_amount',
                            'position': f'行{i+1}列{j+1}',
                            'file1_value': cell1,
                            'file2_value': cell2,
                            'description': f'金额不一致: {cell1} vs {cell2}',
                            'severity': 'high',
                            'row_index': i,
                            'col_index': j
                        })
                else:
                    differences.append({
                        'type': 'table_text',
                        'position': f'行{i+1}列{j+1}',
                        'file1_value': cell1,
                        'file2_value': cell2,
                        'description': f'文本不一致: {cell1} vs {cell2}',
                        'severity': 'medium',
                        'row_index': i,
                        'col_index': j
                    })

        return differences

    def compare_table_flow_list(self, table1: List[List[str]],
                                table2: List[List[str]]) -> List[Dict]:
        """Compare two transaction-list tables in a header-aware way.

        Steps: locate both header rows, compare the rows above them, compare
        the headers, detect and reconcile column types, then compare the
        data rows cell by cell.

        Returns:
            All differences found.  If either table is empty a single
            ``table_empty`` entry is returned; if the header comparison
            yields a ``critical`` difference the comparison stops there.
        """
        differences = []

        if not table1 or not table2:
            return [{
                'type': 'table_empty',
                'description': '表格为空',
                'severity': 'critical'
            }]

        print(f"\n📋 开始流水表格对比...")

        # Locate the header rows.
        header_row_idx1 = self.detect_table_header_row(table1)
        header_row_idx2 = self.detect_table_header_row(table2)

        if header_row_idx1 != header_row_idx2:
            differences.append({
                'type': 'table_header_position',
                'position': '表头位置',
                'file1_value': f'第{header_row_idx1 + 1}行',
                'file2_value': f'第{header_row_idx2 + 1}行',
                'description': f'表头位置不一致: 文件1在第{header_row_idx1 + 1}行,文件2在第{header_row_idx2 + 1}行',
                'severity': 'high'
            })

        # Compare whatever precedes the header rows.
        if header_row_idx1 > 0 or header_row_idx2 > 0:
            print(f"\n📝 对比表头前的内容...")
            pre_header_table1 = table1[:header_row_idx1]
            pre_header_table2 = table2[:header_row_idx2]

            if pre_header_table1 or pre_header_table2:
                pre_header_diffs = self.compare_tables(pre_header_table1, pre_header_table2)
                for diff in pre_header_diffs:
                    # Re-label the generic findings as pre-header findings.
                    diff['type'] = 'table_pre_header'
                    diff['position'] = f"表头前{diff['position']}"
                    diff['severity'] = 'medium'
                differences.extend(pre_header_diffs)

        # Compare the headers themselves.
        headers1 = table1[header_row_idx1]
        headers2 = table2[header_row_idx2]

        print(f"\n📋 对比表头...")
        header_result = self.compare_table_headers(headers1, headers2)

        if not header_result['match']:
            print(f"\n⚠️ 表头文字存在差异")
            for diff in header_result['differences']:
                differences.append({
                    'type': diff.get('type', 'table_header_mismatch'),
                    'position': '表头',
                    'file1_value': diff.get('header1', ''),
                    'file2_value': diff.get('header2', ''),
                    'description': diff['description'],
                    'severity': diff.get('severity', 'high'),
                })
                # Differing column counts make a row comparison meaningless.
                if diff.get('severity') == 'critical':
                    return differences

        # Detect the column types on both sides and reconcile them.
        column_types1 = self._detect_column_types(table1, header_row_idx1, headers1)
        column_types2 = self._detect_column_types(table2, header_row_idx2, headers2)

        mismatched_columns = self._check_column_type_mismatch(
            column_types1, column_types2, headers1, headers2, differences
        )
        column_types = self._merge_column_types(column_types1, column_types2, mismatched_columns)

        # Compare the data rows.
        data_diffs = self._compare_data_rows(
            table1, table2, header_row_idx1, header_row_idx2,
            headers1, column_types, mismatched_columns, header_result['match']
        )
        differences.extend(data_diffs)

        print(f"\n✅ 流水表格对比完成,发现 {len(differences)} 个差异")
        return differences

    def _detect_column_types(self, table: List[List[str]], header_row_idx: int,
                             headers: List[str]) -> List[str]:
        """Detect the type of each header column from the rows below the header.

        Rows too short to contain a column contribute no value to it.
        """
        column_types = []
        for col_idx in range(len(headers)):
            col_values = [
                row[col_idx]
                for row in table[header_row_idx + 1:]
                if col_idx < len(row)
            ]
            column_types.append(self.detector.detect_column_type(col_values))
        return column_types

    def _check_column_type_mismatch(self, column_types1: List[str], column_types2: List[str],
                                    headers1: List[str], headers2: List[str],
                                    differences: List[Dict]) -> List[int]:
        """Report columns whose detected types differ between the tables.

        One ``high`` difference per mismatching column is appended to
        *differences* (which is modified in place); if more than half of the
        shared columns mismatch, a ``critical`` summary entry is appended
        too.

        Returns:
            The indices of the mismatching columns.
        """
        mismatched_columns = []
        total_columns = min(len(column_types1), len(column_types2))

        for col_idx in range(total_columns):
            if column_types1[col_idx] != column_types2[col_idx]:
                mismatched_columns.append(col_idx)
                differences.append({
                    'type': 'table_column_type_mismatch',
                    'position': f'第{col_idx + 1}列',
                    'file1_value': f'{headers1[col_idx]} ({column_types1[col_idx]})',
                    'file2_value': f'{headers2[col_idx]} ({column_types2[col_idx]})',
                    'description': f'列类型不一致: {column_types1[col_idx]} vs {column_types2[col_idx]}',
                    'severity': 'high',
                    'column_index': col_idx
                })

        mismatch_ratio = len(mismatched_columns) / total_columns if total_columns > 0 else 0

        if mismatch_ratio > 0.5:
            differences.append({
                'type': 'table_header_critical',
                'position': '表格列类型',
                'file1_value': f'{len(mismatched_columns)}列类型不一致',
                'file2_value': f'共{total_columns}列',
                'description': f'列类型差异过大: {len(mismatched_columns)}/{total_columns}列不匹配 ({mismatch_ratio:.1%})',
                'severity': 'critical'
            })

        return mismatched_columns

    def _merge_column_types(self, column_types1: List[str], column_types2: List[str],
                            mismatched_columns: List[int]) -> List[str]:
        """Merge two per-column type lists into the one used for comparison.

        Columns present on one side only keep that side's type.  For
        mismatching columns ``'text'`` takes precedence, then
        ``'text_number'``, otherwise the first table's type is used.
        """
        column_types = []

        for col_idx in range(max(len(column_types1), len(column_types2))):
            if col_idx >= len(column_types1):
                column_types.append(column_types2[col_idx])
            elif col_idx >= len(column_types2):
                column_types.append(column_types1[col_idx])
            elif col_idx in mismatched_columns:
                type1 = column_types1[col_idx]
                type2 = column_types2[col_idx]
                if type1 == 'text' or type2 == 'text':
                    column_types.append('text')
                elif type1 == 'text_number' or type2 == 'text_number':
                    column_types.append('text_number')
                else:
                    column_types.append(type1)
            else:
                column_types.append(column_types1[col_idx])

        return column_types

    def _compare_data_rows(self, table1: List[List[str]], table2: List[List[str]],
                           header_row_idx1: int, header_row_idx2: int,
                           headers1: List[str], column_types: List[str],
                           mismatched_columns: List[int],
                           header_match: bool) -> List[Dict]:
        """Compare the rows below the headers, row by row and cell by cell.

        Rows are paired by their offset from the header.  A row missing (or
        empty) on one side yields a ``table_row_missing`` entry; otherwise
        each cell pair is checked with ``compare_cell_value``.  Reported row
        numbers are 1-based and relative to the first table's header
        position.  ``header_match`` is currently unused.
        """
        differences = []

        data_rows1 = table1[header_row_idx1 + 1:]
        data_rows2 = table2[header_row_idx2 + 1:]
        max_rows = max(len(data_rows1), len(data_rows2))

        for row_idx in range(max_rows):
            row1 = data_rows1[row_idx] if row_idx < len(data_rows1) else []
            row2 = data_rows2[row_idx] if row_idx < len(data_rows2) else []

            # +1 for the header row, +1 for 1-based numbering.
            actual_row_num = header_row_idx1 + row_idx + 2

            if not row1:
                differences.append({
                    'type': 'table_row_missing',
                    'position': f'第{actual_row_num}行',
                    'file1_value': '',
                    'file2_value': ', '.join(self._cell_text(cell) for cell in row2),
                    'description': f'文件1缺少第{actual_row_num}行',
                    'severity': 'high',
                    'row_index': actual_row_num
                })
                continue

            if not row2:
                differences.append({
                    'type': 'table_row_missing',
                    'position': f'第{actual_row_num}行',
                    'file1_value': ', '.join(self._cell_text(cell) for cell in row1),
                    'file2_value': '',
                    'description': f'文件2缺少第{actual_row_num}行',
                    'severity': 'high',
                    'row_index': actual_row_num
                })
                continue

            max_cols = max(len(row1), len(row2))

            for col_idx in range(max_cols):
                cell1 = self._cell_text(row1[col_idx]) if col_idx < len(row1) else ''
                cell2 = self._cell_text(row2[col_idx]) if col_idx < len(row2) else ''

                if self._IMAGE_PLACEHOLDER in cell1 or self._IMAGE_PLACEHOLDER in cell2:
                    continue

                # Columns beyond the header are compared as plain text.
                column_type = column_types[col_idx] if col_idx < len(column_types) else 'text'
                column_name = headers1[col_idx] if col_idx < len(headers1) else f'列{col_idx + 1}'

                compare_result = self.compare_cell_value(cell1, cell2, column_type, column_name)
                if compare_result['match']:
                    continue

                diff_info = compare_result['difference']
                type_conflict = col_idx in mismatched_columns
                type_mismatch_note = " [列类型冲突]" if type_conflict else ""

                # A difference in a column with conflicting types is always
                # reported as high; otherwise the cell's own severity is kept.
                base_severity = diff_info.get('severity', 'medium')
                final_severity = 'high' if type_conflict else base_severity

                differences.append({
                    'type': diff_info['type'],
                    'position': f'第{actual_row_num}行第{col_idx + 1}列',
                    'file1_value': diff_info['value1'],
                    'file2_value': diff_info['value2'],
                    'description': diff_info['description'] + type_mismatch_note,
                    'severity': final_severity,
                    'row_index': actual_row_num,
                    'col_index': col_idx,
                    'column_name': column_name,
                    'column_type': column_type,
                    'column_type_mismatch': type_conflict,
                })

        return differences