|
|
@@ -0,0 +1,849 @@
|
|
|
+import re
|
|
|
+import sys
|
|
|
+from typing import Dict, List, Tuple, Optional
|
|
|
+from pathlib import Path
|
|
|
+
|
|
|
+# Add the ocr_platform root directory to the Python path
|
|
|
+# Use resolve() so the path is absolute, avoiding the IndexError a relative path can cause
|
|
|
+_file_path = Path(__file__).resolve()
|
|
|
+ocr_platform_root = _file_path.parents[1] # table_comparator.py -> ocr_comparator -> ocr_platform
|
|
|
+if str(ocr_platform_root) not in sys.path:
|
|
|
+ sys.path.insert(0, str(ocr_platform_root))
|
|
|
+
|
|
|
+try:
|
|
|
+ from .data_type_detector import DataTypeDetector
|
|
|
+ from .similarity_calculator import SimilarityCalculator
|
|
|
+ from .text_processor import TextProcessor
|
|
|
+except ImportError:
|
|
|
+ from data_type_detector import DataTypeDetector
|
|
|
+ from similarity_calculator import SimilarityCalculator
|
|
|
+ from text_processor import TextProcessor
|
|
|
+
|
|
|
+
|
|
|
+class TableComparator:
|
|
|
+    """Table data comparison."""
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ self.detector = DataTypeDetector()
|
|
|
+ self.calculator = SimilarityCalculator()
|
|
|
+ self.text_processor = TextProcessor()
|
|
|
+ self.header_similarity_threshold = 90
|
|
|
+ self.content_similarity_threshold = 95
|
|
|
+ self.max_paragraph_window = 6
|
|
|
+
|
|
|
+ def find_matching_tables(self, tables1: List[List[List[str]]],
|
|
|
+ tables2: List[List[List[str]]]) -> List[Tuple[int, int, float]]:
|
|
|
+ """
|
|
|
+        Intelligently match the tables extracted from two files.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ List[Tuple[int, int, float]]: (table1_index, table2_index, similarity_score)
|
|
|
+ """
|
|
|
+ matches = []
|
|
|
+
|
|
|
+ for i, table1 in enumerate(tables1):
|
|
|
+ if not table1:
|
|
|
+ continue
|
|
|
+
|
|
|
+ best_match = None
|
|
|
+ best_score = 0
|
|
|
+
|
|
|
+ for j, table2 in enumerate(tables2):
|
|
|
+ if not table2:
|
|
|
+ continue
|
|
|
+
|
|
|
+                # Compute the similarity between the two tables
|
|
|
+ score = self._calculate_table_similarity(table1, table2)
|
|
|
+
|
|
|
+ if score > best_score:
|
|
|
+ best_score = score
|
|
|
+ best_match = j
|
|
|
+
|
|
|
+            if best_match is not None and best_score > 50:  # require at least 50% similarity
|
|
|
+ matches.append((i, best_match, best_score))
|
|
|
+ print(f" 📊 表格匹配: 文件1表格{i+1} ↔ 文件2表格{best_match+1} (相似度: {best_score:.1f}%)")
|
|
|
+
|
|
|
+ return matches
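+
+    # Illustrative usage sketch (not part of the module): tables_a and tables_b
+    # are hypothetical OCR outputs, each a list of tables, and each table a
+    # list of rows of cell strings.
+    #
+    #   comparator = TableComparator()
+    #   tables_a = [[["日期", "金额"], ["2024-01-01", "100.00"]]]
+    #   tables_b = [[["日期", "金额"], ["2024-01-01", "100.00"]]]
+    #   for i, j, score in comparator.find_matching_tables(tables_a, tables_b):
+    #       print(f"table {i + 1} ~ table {j + 1}: {score:.1f}%")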
|
|
|
+
|
|
|
+ def _get_max_columns(self, table: List[List[str]]) -> int:
|
|
|
+        """Return the maximum number of columns in the table."""
|
|
|
+ if not table:
|
|
|
+ return 0
|
|
|
+ return max(len(row) for row in table)
|
|
|
+
|
|
|
+ def _calculate_table_similarity(self, table1: List[List[str]],
|
|
|
+ table2: List[List[str]]) -> float:
|
|
|
+        """Compute the similarity between two tables."""
|
|
|
+ if not table1 or not table2:
|
|
|
+ return 0.0
|
|
|
+
|
|
|
+        # 1. Row-count similarity (weight: 15%)
|
|
|
+ row_count1 = len(table1)
|
|
|
+ row_count2 = len(table2)
|
|
|
+ row_similarity = 100 * (1 - abs(row_count1 - row_count2) / max(row_count1, row_count2))
|
|
|
+
|
|
|
+        # 2. Column-count similarity (weight: 15%) - based on the maximum column count
|
|
|
+ col_count1 = self._get_max_columns(table1)
|
|
|
+ col_count2 = self._get_max_columns(table2)
|
|
|
+
|
|
|
+ max_cols = max(col_count1, col_count2)
|
|
|
+ min_cols = min(col_count1, col_count2)
|
|
|
+
|
|
|
+ if max_cols == 0:
|
|
|
+ col_similarity = 0
|
|
|
+ else:
|
|
|
+            # A small column-count difference (1-2 columns) still earns a high score
|
|
|
+ col_diff = abs(col_count1 - col_count2)
|
|
|
+ if col_diff == 0:
|
|
|
+ col_similarity = 100
|
|
|
+ elif col_diff <= 2:
|
|
|
+                # A difference of 1 or 2 columns scores 90 or 80
|
|
|
+ col_similarity = 100 - (col_diff * 10)
|
|
|
+ else:
|
|
|
+                # Larger differences fall back to a proportional score
|
|
|
+ col_similarity = 100 * (min_cols / max_cols)
|
|
|
+
|
|
|
+ print(f" 行数对比: {row_count1} vs {row_count2}, 相似度: {row_similarity:.1f}%")
|
|
|
+ print(f" 列数对比: {col_count1} vs {col_count2}, 相似度: {col_similarity:.1f}%")
|
|
|
+
|
|
|
+        # 3. Header similarity (weight: 50%) - detect the header row positions first
|
|
|
+ header_row_idx1 = self.detect_table_header_row(table1)
|
|
|
+ header_row_idx2 = self.detect_table_header_row(table2)
|
|
|
+
|
|
|
+ print(f" 表头位置: 文件1第{header_row_idx1+1}行, 文件2第{header_row_idx2+1}行")
|
|
|
+
|
|
|
+ header_similarity = 0
|
|
|
+ if header_row_idx1 < len(table1) and header_row_idx2 < len(table2):
|
|
|
+ header1 = table1[header_row_idx1]
|
|
|
+ header2 = table2[header_row_idx2]
|
|
|
+
|
|
|
+ if header1 and header2:
|
|
|
+                # Smart header matching
|
|
|
+ header_similarity = self._calculate_header_similarity_smart(header1, header2)
|
|
|
+
|
|
|
+ print(f" 表头相似度: {header_similarity:.1f}%")
|
|
|
+
|
|
|
+        # 4. Content-feature similarity (weight: 20%)
|
|
|
+ content_similarity = self._calculate_content_features_similarity(table1, table2)
|
|
|
+
|
|
|
+ print(f" 内容特征相似度: {content_similarity:.1f}%")
|
|
|
+
|
|
|
+        # Weighted combination of the four scores
|
|
|
+ total_similarity = (
|
|
|
+            row_similarity * 0.15 +      # rows 15%
+            col_similarity * 0.15 +      # columns 15%
+            header_similarity * 0.50 +   # header 50% (most important)
+            content_similarity * 0.20    # content 20%
|
|
|
+ )
|
|
|
+
|
|
|
+ return total_similarity
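+
+    # Worked example of the weighting (made-up scores): row similarity 90,
+    # column similarity 100, header similarity 80, and content similarity 70
+    # give 90*0.15 + 100*0.15 + 80*0.50 + 70*0.20 = 82.5, comfortably above the
+    # 50% matching threshold used in find_matching_tables.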
|
|
|
+
|
|
|
+ def _calculate_header_similarity_smart(self, header1: List[str],
|
|
|
+ header2: List[str]) -> float:
|
|
|
+        """
+        Smart header similarity calculation.
+
+        Handles the following cases:
+        1. Column counts differ but the header content is similar
+        2. PaddleOCR may merge a multi-row header into one row
+        3. Header order may differ
+        """
|
|
|
+ if not header1 or not header2:
|
|
|
+ return 0.0
|
|
|
+
|
|
|
+        # Normalize the headers
|
|
|
+ norm_headers1 = [self.normalize_header_text(h) for h in header1]
|
|
|
+ norm_headers2 = [self.normalize_header_text(h) for h in header2]
|
|
|
+
|
|
|
+        # Method 1: exact matching (highest priority)
|
|
|
+ common_headers = set(norm_headers1) & set(norm_headers2)
|
|
|
+ max_len = max(len(norm_headers1), len(norm_headers2))
|
|
|
+
|
|
|
+ if max_len == 0:
|
|
|
+ return 0.0
|
|
|
+
|
|
|
+ exact_match_ratio = len(common_headers) / max_len
|
|
|
+
|
|
|
+        # Method 2: fuzzy matching (handles differing column counts)
|
|
|
+ fuzzy_matches = 0
|
|
|
+
|
|
|
+        # Use the shorter header list as the baseline
|
|
|
+ if len(norm_headers1) <= len(norm_headers2):
|
|
|
+ base_headers = norm_headers1
|
|
|
+ compare_headers = norm_headers2
|
|
|
+ else:
|
|
|
+ base_headers = norm_headers2
|
|
|
+ compare_headers = norm_headers1
|
|
|
+
|
|
|
+ for base_h in base_headers:
|
|
|
+ best_similarity = 0
|
|
|
+ for comp_h in compare_headers:
|
|
|
+ similarity = self.calculator.calculate_text_similarity(base_h, comp_h)
|
|
|
+ if similarity > best_similarity:
|
|
|
+ best_similarity = similarity
|
|
|
+ if best_similarity == 100:
|
|
|
+ break
|
|
|
+
|
|
|
+            # Treat a similarity above 70% as a match
|
|
|
+ if best_similarity > 70:
|
|
|
+ fuzzy_matches += 1
|
|
|
+
|
|
|
+ fuzzy_match_ratio = fuzzy_matches / max_len if max_len > 0 else 0
|
|
|
+
|
|
|
+        # Method 3: keyword matching (recognize common header categories)
|
|
|
+ key_headers = {
|
|
|
+ 'date': ['日期', 'date', '时间', 'time'],
|
|
|
+ 'type': ['类型', 'type', '业务', 'business'],
|
|
|
+ 'number': ['号', 'no', '编号', 'id', '票据', 'bill'],
|
|
|
+ 'description': ['摘要', 'description', '说明', 'remark'],
|
|
|
+ 'amount': ['金额', 'amount', '借方', 'debit', '贷方', 'credit'],
|
|
|
+ 'balance': ['余额', 'balance'],
|
|
|
+ 'counterparty': ['对手', 'counterparty', '账户', 'account', '户名', 'name']
|
|
|
+ }
|
|
|
+
|
|
|
+ def categorize_header(h: str) -> set:
|
|
|
+ categories = set()
|
|
|
+ h_lower = h.lower()
|
|
|
+ for category, keywords in key_headers.items():
|
|
|
+ for keyword in keywords:
|
|
|
+ if keyword in h_lower:
|
|
|
+ categories.add(category)
|
|
|
+ return categories
|
|
|
+
|
|
|
+ categories1 = set()
|
|
|
+ for h in norm_headers1:
|
|
|
+ categories1.update(categorize_header(h))
|
|
|
+
|
|
|
+ categories2 = set()
|
|
|
+ for h in norm_headers2:
|
|
|
+ categories2.update(categorize_header(h))
|
|
|
+
|
|
|
+ common_categories = categories1 & categories2
|
|
|
+ all_categories = categories1 | categories2
|
|
|
+
|
|
|
+ category_match_ratio = len(common_categories) / len(all_categories) if all_categories else 0
|
|
|
+
|
|
|
+        # Combine the three methods with weights
|
|
|
+ final_similarity = (
|
|
|
+            exact_match_ratio * 0.4 +      # exact matching 40%
+            fuzzy_match_ratio * 0.4 +      # fuzzy matching 40%
+            category_match_ratio * 0.2     # category (semantic) matching 20%
|
|
|
+ ) * 100
|
|
|
+
|
|
|
+ print(f" 精确匹配: {exact_match_ratio:.1%}, 模糊匹配: {fuzzy_match_ratio:.1%}, 语义匹配: {category_match_ratio:.1%}")
|
|
|
+
|
|
|
+ return final_similarity
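+
+    # Worked example (made-up headers): header1 = ["日期", "金额", "余额"],
+    # header2 = ["交易日期", "金额", "余额", "摘要"]. Exact matches {"金额", "余额"}
+    # give 2/4 = 0.5; assuming calculate_text_similarity rates "日期" vs
+    # "交易日期" above 70, fuzzy matching gives 3/4 = 0.75; the keyword
+    # categories are {date, amount, balance} vs {date, amount, balance,
+    # description}, so category matching gives 3/4 = 0.75. Final score:
+    # (0.5*0.4 + 0.75*0.4 + 0.75*0.2) * 100 = 65.0.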
|
|
|
+
|
|
|
+ def _calculate_content_features_similarity(self, table1: List[List[str]],
|
|
|
+ table2: List[List[str]]) -> float:
|
|
|
+        """Compute content-feature similarity between two tables."""
+        # Collect statistics on numbers, dates, and empty cells
|
|
|
+ features1 = self._extract_table_features(table1)
|
|
|
+ features2 = self._extract_table_features(table2)
|
|
|
+
|
|
|
+        # Compare the feature ratios
|
|
|
+ similarity_scores = []
|
|
|
+
|
|
|
+ for key in ['numeric_ratio', 'date_ratio', 'empty_ratio']:
|
|
|
+ if key in features1 and key in features2:
|
|
|
+ diff = abs(features1[key] - features2[key])
|
|
|
+ similarity_scores.append(100 * (1 - diff))
|
|
|
+
|
|
|
+ return sum(similarity_scores) / len(similarity_scores) if similarity_scores else 0
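+
+    # Example (made-up ratios): numeric_ratio 0.60 vs 0.50 scores
+    # 100 * (1 - 0.10) = 90 for that feature; the method returns the mean of
+    # the numeric-, date-, and empty-ratio scores.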
|
|
|
+
|
|
|
+ def _extract_table_features(self, table: List[List[str]]) -> Dict:
|
|
|
+        """Extract feature statistics from a table."""
|
|
|
+ total_cells = 0
|
|
|
+ numeric_cells = 0
|
|
|
+ date_cells = 0
|
|
|
+ empty_cells = 0
|
|
|
+
|
|
|
+ for row in table:
|
|
|
+ for cell in row:
|
|
|
+ total_cells += 1
|
|
|
+
|
|
|
+ if not cell or cell.strip() == '':
|
|
|
+ empty_cells += 1
|
|
|
+ continue
|
|
|
+
|
|
|
+ if self.detector.is_numeric(cell):
|
|
|
+ numeric_cells += 1
|
|
|
+
|
|
|
+ if self.detector.extract_datetime(cell):
|
|
|
+ date_cells += 1
|
|
|
+
|
|
|
+ return {
|
|
|
+ 'numeric_ratio': numeric_cells / total_cells if total_cells > 0 else 0,
|
|
|
+ 'date_ratio': date_cells / total_cells if total_cells > 0 else 0,
|
|
|
+ 'empty_ratio': empty_cells / total_cells if total_cells > 0 else 0,
|
|
|
+ 'total_cells': total_cells
|
|
|
+ }
|
|
|
+
|
|
|
+ def normalize_header_text(self, text: str) -> str:
|
|
|
+        """Normalize header text."""
+        # Remove parenthesized content (ASCII and full-width parentheses)
+        text = re.sub(r'[((].*?[))]', '', text)
+        # Remove whitespace
+        text = re.sub(r'\s+', '', text)
+        # Keep only word characters and CJK characters
|
|
|
+ text = re.sub(r'[^\w\u4e00-\u9fff]', '', text)
|
|
|
+ return text.lower().strip()
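+
+    # Examples: normalize_header_text("交易日期 (Trade Date)") -> "交易日期" and
+    # normalize_header_text("Amount ($)") -> "amount"; parenthesized text,
+    # whitespace, and punctuation are stripped and ASCII letters lower-cased.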
|
|
|
+
|
|
|
+ def detect_table_header_row(self, table: List[List[str]]) -> int:
|
|
|
+        """
+        Detect the header row index of a table.
+
+        Detection strategy:
+        1. Find the row with the most header-keyword hits
+        2. Confirm that the following row is a data row (or a category row)
+        3. Special handling for multi-level headers such as balance sheets
+        """
|
|
|
+ if not table:
|
|
|
+ return 0
|
|
|
+
|
|
|
+ header_keywords = [
|
|
|
+ '日期', 'date', '时间', 'time',
|
|
|
+ '类型', 'type', '业务', 'business',
|
|
|
+ '号', 'no', '编号', 'id', '票据', 'bill',
|
|
|
+ '摘要', 'description', '说明', 'remark',
|
|
|
+ '金额', 'amount', '借方', 'debit', '贷方', 'credit',
|
|
|
+ '余额', 'balance',
|
|
|
+ '对手', 'counterparty', '账户', 'account', '户名', 'name',
|
|
|
+            # Balance-sheet keywords
|
|
|
+ # '资产', 'asset', '负债', 'liability', '期末', 'period', '期初'
|
|
|
+ '期末', 'period', '期初'
|
|
|
+ ]
|
|
|
+
|
|
|
+ best_header_row = 0
|
|
|
+ best_score = 0
|
|
|
+
|
|
|
+ for row_idx, row in enumerate(table[:10]):
|
|
|
+ if not row:
|
|
|
+ continue
|
|
|
+
|
|
|
+            # Keyword-match score for this row
|
|
|
+ keyword_count = 0
|
|
|
+ non_empty_cells = 0
|
|
|
+
|
|
|
+ for cell in row:
|
|
|
+ cell_text = str(cell).strip()
|
|
|
+ if cell_text:
|
|
|
+ non_empty_cells += 1
|
|
|
+ cell_lower = cell_text.lower()
|
|
|
+
|
|
|
+ for keyword in header_keywords:
|
|
|
+ if keyword in cell_lower:
|
|
|
+ keyword_count += 1
|
|
|
+ break
|
|
|
+
|
|
|
+ if non_empty_cells < 3:
|
|
|
+ continue
|
|
|
+
|
|
|
+ keyword_ratio = keyword_count / non_empty_cells if non_empty_cells > 0 else 0
|
|
|
+ column_bonus = min(non_empty_cells / 5, 1.0)
|
|
|
+ score = keyword_ratio * 0.7 + column_bonus * 0.3
|
|
|
+
|
|
|
+            # Look at the next row: a data row or a category row just below raises the score
|
|
|
+ if row_idx + 1 < len(table):
|
|
|
+ next_row = table[row_idx + 1]
|
|
|
+                # Bonus if the next row is a data row
|
|
|
+ if self._is_data_row(next_row):
|
|
|
+ score += 0.2
|
|
|
+                # Small bonus if the next row is a category row (e.g. a "流动资产:" label)
|
|
|
+ elif self._is_category_row(next_row):
|
|
|
+ score += 0.1
|
|
|
+
|
|
|
+ if score > best_score:
|
|
|
+ best_score = score
|
|
|
+ best_header_row = row_idx
|
|
|
+
|
|
|
+ if best_score < 0.3:
|
|
|
+ print(f" ⚠️ 未检测到明确表头,默认使用第1行 (得分: {best_score:.2f})")
|
|
|
+ return 0
|
|
|
+
|
|
|
+ print(f" 📍 检测到表头在第 {best_header_row + 1} 行 (得分: {best_score:.2f})")
|
|
|
+ return best_header_row
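+
+    # Scoring sketch: a row like ["日期", "摘要", "借方", "贷方", "余额"] hits a
+    # keyword in all 5 non-empty cells, so keyword_ratio = 1.0 and
+    # column_bonus = min(5/5, 1.0) = 1.0, giving score = 0.7 + 0.3 = 1.0 before
+    # any next-row bonus; a title row such as ["银行流水明细", "", ""] is skipped
+    # because it has fewer than 3 non-empty cells.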
|
|
|
+
|
|
|
+ def _is_category_row(self, row: List[str]) -> bool:
|
|
|
+        """
+        Return True if the row is a category row (e.g. a "流动资产:" / "current assets:" label).
+        """
|
|
|
+ if not row:
|
|
|
+ return False
|
|
|
+
|
|
|
+ category_patterns = [
|
|
|
+ # r'流动[资产负债]',
|
|
|
+ # r'非流动[资产负债]',
|
|
|
+            r'.*:$',  # ends with a full-width colon
|
|
|
+ ]
|
|
|
+
|
|
|
+ for cell in row:
|
|
|
+ cell_text = str(cell).strip()
|
|
|
+ if not cell_text:
|
|
|
+ continue
|
|
|
+
|
|
|
+ for pattern in category_patterns:
|
|
|
+ if re.search(pattern, cell_text):
|
|
|
+ return True
|
|
|
+
|
|
|
+ return False
|
|
|
+
|
|
|
+ def _is_data_row(self, row: List[str]) -> bool:
|
|
|
+ """
|
|
|
+        Return True if the row looks like a data row (improved version).
+
+        A "-" cell means the amount is zero or empty and counts as a valid data cell.
|
|
|
+ """
|
|
|
+ if not row:
|
|
|
+ return False
|
|
|
+
|
|
|
+ data_pattern_count = 0
|
|
|
+ non_empty_count = 0
|
|
|
+
|
|
|
+ for cell in row:
|
|
|
+ cell_text = str(cell).strip()
|
|
|
+ if not cell_text:
|
|
|
+ continue
|
|
|
+
|
|
|
+ non_empty_count += 1
|
|
|
+
|
|
|
+            # A "-" cell is also valid data (meaning zero or empty)
|
|
|
+ if cell_text == '-' or cell_text == '—' or cell_text == '--':
|
|
|
+ data_pattern_count += 1
|
|
|
+ continue
|
|
|
+
|
|
|
+            # Contains a digit
|
|
|
+ if re.search(r'\d', cell_text):
|
|
|
+ data_pattern_count += 1
|
|
|
+
|
|
|
+            # Contains a date pattern
|
|
|
+ if re.search(r'\d{4}[-/年]\d{1,2}[-/月]\d{1,2}', cell_text):
|
|
|
+ data_pattern_count += 1
|
|
|
+
|
|
|
+            # Contains an amount pattern
|
|
|
+ if re.search(r'-?\d+[,,]?\d*\.?\d+', cell_text):
|
|
|
+ data_pattern_count += 1
|
|
|
+
|
|
|
+ if non_empty_count == 0:
|
|
|
+ return False
|
|
|
+
|
|
|
+        # At least 30% of the non-empty cells must show data-like features
|
|
|
+ return data_pattern_count / non_empty_count >= 0.3
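+
+    # Example: ["2024-01-01", "工资", "5,000.00", "-"] has four non-empty cells,
+    # three of which match a data pattern (date, amount, "-"), so the 0.3
+    # threshold is cleared; a header row like ["日期", "摘要", "借方"] contains no
+    # digits or "-" cells and is rejected.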
|
|
|
+
|
|
|
+ def compare_table_headers(self, headers1: List[str], headers2: List[str]) -> Dict:
|
|
|
+        """Compare two header rows."""
|
|
|
+ result = {
|
|
|
+ 'match': True,
|
|
|
+ 'differences': [],
|
|
|
+ 'column_mapping': {},
|
|
|
+ 'similarity_scores': []
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(headers1) != len(headers2):
|
|
|
+ result['match'] = False
|
|
|
+ result['differences'].append({
|
|
|
+ 'type': 'table_header_critical',
|
|
|
+ 'description': f'表头列数不一致: {len(headers1)} vs {len(headers2)}',
|
|
|
+ 'severity': 'critical'
|
|
|
+ })
|
|
|
+ return result
|
|
|
+
|
|
|
+ for i, (h1, h2) in enumerate(zip(headers1, headers2)):
|
|
|
+ norm_h1 = self.normalize_header_text(h1)
|
|
|
+ norm_h2 = self.normalize_header_text(h2)
|
|
|
+
|
|
|
+ similarity = self.calculator.calculate_text_similarity(norm_h1, norm_h2)
|
|
|
+ result['similarity_scores'].append({
|
|
|
+ 'column_index': i,
|
|
|
+ 'header1': h1,
|
|
|
+ 'header2': h2,
|
|
|
+ 'similarity': similarity
|
|
|
+ })
|
|
|
+
|
|
|
+ if similarity < self.header_similarity_threshold:
|
|
|
+ result['match'] = False
|
|
|
+ result['differences'].append({
|
|
|
+ 'type': 'table_header_mismatch',
|
|
|
+ 'column_index': i,
|
|
|
+ 'header1': h1,
|
|
|
+ 'header2': h2,
|
|
|
+ 'similarity': similarity,
|
|
|
+ 'description': f'第{i+1}列表头不匹配: "{h1}" vs "{h2}" (相似度: {similarity:.1f}%)',
|
|
|
+ 'severity': 'medium' if similarity < 50 else 'high'
|
|
|
+ })
|
|
|
+ else:
|
|
|
+ result['column_mapping'][i] = i
|
|
|
+
|
|
|
+ return result
|
|
|
+
|
|
|
+ def compare_cell_value(self, value1: str, value2: str, column_type: str,
|
|
|
+ column_name: str = '') -> Dict:
|
|
|
+        """Compare a pair of cell values according to the column type."""
|
|
|
+ result = {
|
|
|
+ 'match': True,
|
|
|
+ 'difference': None
|
|
|
+ }
|
|
|
+
|
|
|
+ v1 = self.text_processor.normalize_text(value1)
|
|
|
+ v2 = self.text_processor.normalize_text(value2)
|
|
|
+
|
|
|
+ if v1 == v2:
|
|
|
+ return result
|
|
|
+
|
|
|
+ if column_type == 'text_number':
|
|
|
+ norm_v1 = self.detector.normalize_text_number(v1)
|
|
|
+ norm_v2 = self.detector.normalize_text_number(v2)
|
|
|
+
|
|
|
+ if norm_v1 == norm_v2:
|
|
|
+ result['match'] = False
|
|
|
+ result['difference'] = {
|
|
|
+ 'type': 'table_text',
|
|
|
+ 'value1': value1,
|
|
|
+ 'value2': value2,
|
|
|
+ 'description': f'文本型数字格式差异: "{value1}" vs "{value2}" (内容相同,空格不同)',
|
|
|
+ 'severity': 'low'
|
|
|
+ }
|
|
|
+ else:
|
|
|
+ result['match'] = False
|
|
|
+ result['difference'] = {
|
|
|
+ 'type': 'table_text',
|
|
|
+ 'value1': value1,
|
|
|
+ 'value2': value2,
|
|
|
+ 'description': f'文本型数字不一致: {value1} vs {value2}',
|
|
|
+ 'severity': 'high'
|
|
|
+ }
|
|
|
+ return result
|
|
|
+
|
|
|
+ if column_type == 'numeric':
|
|
|
+ if self.detector.is_numeric(v1) and self.detector.is_numeric(v2):
|
|
|
+ num1 = self.detector.parse_number(v1)
|
|
|
+ num2 = self.detector.parse_number(v2)
|
|
|
+ if abs(num1 - num2) > 0.01:
|
|
|
+ result['match'] = False
|
|
|
+ result['difference'] = {
|
|
|
+ 'type': 'table_amount',
|
|
|
+ 'value1': value1,
|
|
|
+ 'value2': value2,
|
|
|
+ 'diff_amount': abs(num1 - num2),
|
|
|
+ 'description': f'金额不一致: {value1} vs {value2}',
|
|
|
+                        'severity': 'high'  # amount differences are high severity
|
|
|
+ }
|
|
|
+ else:
|
|
|
+ result['match'] = False
|
|
|
+ result['difference'] = {
|
|
|
+ 'type': 'table_text',
|
|
|
+ 'value1': value1,
|
|
|
+ 'value2': value2,
|
|
|
+ 'description': f'长数字字符串不一致: {value1} vs {value2}',
|
|
|
+                    'severity': 'medium'  # numeric-string differences are medium severity
|
|
|
+ }
|
|
|
+ elif column_type == 'datetime':
|
|
|
+ datetime1 = self.detector.extract_datetime(v1)
|
|
|
+ datetime2 = self.detector.extract_datetime(v2)
|
|
|
+
|
|
|
+ if datetime1 != datetime2:
|
|
|
+ result['match'] = False
|
|
|
+ result['difference'] = {
|
|
|
+ 'type': 'table_datetime',
|
|
|
+ 'value1': value1,
|
|
|
+ 'value2': value2,
|
|
|
+ 'description': f'日期时间不一致: {value1} vs {value2}',
|
|
|
+                    'severity': 'medium'  # date differences are medium severity
|
|
|
+ }
|
|
|
+ else:
|
|
|
+ similarity = self.calculator.calculate_text_similarity(v1, v2)
|
|
|
+ if similarity < self.content_similarity_threshold:
|
|
|
+ result['match'] = False
|
|
|
+ result['difference'] = {
|
|
|
+ 'type': 'table_text',
|
|
|
+ 'value1': value1,
|
|
|
+ 'value2': value2,
|
|
|
+ 'similarity': similarity,
|
|
|
+ 'description': f'文本不一致: {value1} vs {value2} (相似度: {similarity:.1f}%)',
|
|
|
+                    'severity': 'low' if similarity > 80 else 'medium'  # severity scales with similarity
|
|
|
+ }
|
|
|
+
|
|
|
+ return result
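+
+    # Illustrative calls (behaviour of the numeric branch depends on
+    # DataTypeDetector.parse_number): compare_cell_value("1,000.00", "1000.00",
+    # "numeric") should report a match if both sides parse to 1000.0, while
+    # compare_cell_value("1,000.00", "1,200.00", "numeric") yields a
+    # 'table_amount' difference with severity 'high'.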
|
|
|
+
|
|
|
+ def compare_tables(self, table1: List[List[str]], table2: List[List[str]]) -> List[Dict]:
|
|
|
+        """Standard positional (cell-by-cell) table comparison."""
|
|
|
+ differences = []
|
|
|
+ max_rows = max(len(table1), len(table2))
|
|
|
+
|
|
|
+ for i in range(max_rows):
|
|
|
+ row1 = table1[i] if i < len(table1) else []
|
|
|
+ row2 = table2[i] if i < len(table2) else []
|
|
|
+
|
|
|
+ max_cols = max(len(row1), len(row2))
|
|
|
+
|
|
|
+ for j in range(max_cols):
|
|
|
+ cell1 = row1[j] if j < len(row1) else ""
|
|
|
+ cell2 = row2[j] if j < len(row2) else ""
|
|
|
+
|
|
|
+ if "[图片内容-忽略]" in cell1 or "[图片内容-忽略]" in cell2:
|
|
|
+ continue
|
|
|
+
|
|
|
+ if cell1 != cell2:
|
|
|
+ if self.detector.is_numeric(cell1) and self.detector.is_numeric(cell2):
|
|
|
+ num1 = self.detector.parse_number(cell1)
|
|
|
+ num2 = self.detector.parse_number(cell2)
|
|
|
+ if abs(num1 - num2) > 0.001:
|
|
|
+ differences.append({
|
|
|
+ 'type': 'table_amount',
|
|
|
+ 'position': f'行{i+1}列{j+1}',
|
|
|
+ 'file1_value': cell1,
|
|
|
+ 'file2_value': cell2,
|
|
|
+ 'description': f'金额不一致: {cell1} vs {cell2}',
|
|
|
+                                'severity': 'high',  # amount differences are high severity
|
|
|
+ 'row_index': i,
|
|
|
+ 'col_index': j
|
|
|
+ })
|
|
|
+ else:
|
|
|
+ differences.append({
|
|
|
+ 'type': 'table_text',
|
|
|
+ 'position': f'行{i+1}列{j+1}',
|
|
|
+ 'file1_value': cell1,
|
|
|
+ 'file2_value': cell2,
|
|
|
+ 'description': f'文本不一致: {cell1} vs {cell2}',
|
|
|
+                                'severity': 'medium',  # text differences are medium severity
|
|
|
+ 'row_index': i,
|
|
|
+ 'col_index': j
|
|
|
+ })
|
|
|
+
|
|
|
+ return differences
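+
+    # Note: compare_tables is strictly positional (row i, column j in both
+    # tables). Example: comparing [["项目", "120"]] with [["项目", "150"]] yields
+    # one 'table_amount' difference at 行1列2 (assuming DataTypeDetector treats
+    # "120" and "150" as numeric); an inserted row, by contrast, shifts every
+    # later row and produces a cascade of differences.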
|
|
|
+
|
|
|
+ def compare_table_flow_list(self, table1: List[List[str]], table2: List[List[str]]) -> List[Dict]:
|
|
|
+        """Comparison algorithm for transaction-flow (statement-style) tables."""
|
|
|
+ differences = []
|
|
|
+
|
|
|
+ if not table1 or not table2:
|
|
|
+ return [{
|
|
|
+ 'type': 'table_empty',
|
|
|
+ 'description': '表格为空',
|
|
|
+ 'severity': 'critical'
|
|
|
+ }]
|
|
|
+
|
|
|
+ print(f"\n📋 开始流水表格对比...")
|
|
|
+
|
|
|
+        # Detect the header row positions
|
|
|
+ header_row_idx1 = self.detect_table_header_row(table1)
|
|
|
+ header_row_idx2 = self.detect_table_header_row(table2)
|
|
|
+
|
|
|
+ if header_row_idx1 != header_row_idx2:
|
|
|
+ differences.append({
|
|
|
+ 'type': 'table_header_position',
|
|
|
+ 'position': '表头位置',
|
|
|
+ 'file1_value': f'第{header_row_idx1 + 1}行',
|
|
|
+ 'file2_value': f'第{header_row_idx2 + 1}行',
|
|
|
+ 'description': f'表头位置不一致: 文件1在第{header_row_idx1 + 1}行,文件2在第{header_row_idx2 + 1}行',
|
|
|
+ 'severity': 'high'
|
|
|
+ })
|
|
|
+
|
|
|
+        # Compare any content above the header rows
|
|
|
+ if header_row_idx1 > 0 or header_row_idx2 > 0:
|
|
|
+ print(f"\n📝 对比表头前的内容...")
|
|
|
+ pre_header_table1 = table1[:header_row_idx1] if header_row_idx1 > 0 else []
|
|
|
+ pre_header_table2 = table2[:header_row_idx2] if header_row_idx2 > 0 else []
|
|
|
+
|
|
|
+ if pre_header_table1 or pre_header_table2:
|
|
|
+ pre_header_diffs = self.compare_tables(pre_header_table1, pre_header_table2)
|
|
|
+ for diff in pre_header_diffs:
|
|
|
+ diff['type'] = 'table_pre_header'
|
|
|
+ diff['position'] = f"表头前{diff['position']}"
|
|
|
+ diff['severity'] = 'medium'
|
|
|
+ differences.extend(pre_header_diffs)
|
|
|
+
|
|
|
+        # Compare the header rows
|
|
|
+ headers1 = table1[header_row_idx1]
|
|
|
+ headers2 = table2[header_row_idx2]
|
|
|
+
|
|
|
+ print(f"\n📋 对比表头...")
|
|
|
+ header_result = self.compare_table_headers(headers1, headers2)
|
|
|
+
|
|
|
+ if not header_result['match']:
|
|
|
+ print(f"\n⚠️ 表头文字存在差异")
|
|
|
+ for diff in header_result['differences']:
|
|
|
+ differences.append({
|
|
|
+ 'type': diff.get('type', 'table_header_mismatch'),
|
|
|
+ 'position': '表头',
|
|
|
+ 'file1_value': diff.get('header1', ''),
|
|
|
+ 'file2_value': diff.get('header2', ''),
|
|
|
+ 'description': diff['description'],
|
|
|
+ 'severity': diff.get('severity', 'high'),
|
|
|
+ })
|
|
|
+ if diff.get('severity') == 'critical':
|
|
|
+ return differences
|
|
|
+
|
|
|
+        # Detect column types before comparing the data rows
|
|
|
+ column_types1 = self._detect_column_types(table1, header_row_idx1, headers1)
|
|
|
+ column_types2 = self._detect_column_types(table2, header_row_idx2, headers2)
|
|
|
+
|
|
|
+        # Handle column-type mismatches
|
|
|
+ mismatched_columns = self._check_column_type_mismatch(
|
|
|
+ column_types1, column_types2, headers1, headers2, differences
|
|
|
+ )
|
|
|
+
|
|
|
+        # Merge the column types of the two tables
|
|
|
+ column_types = self._merge_column_types(column_types1, column_types2, mismatched_columns)
|
|
|
+
|
|
|
+        # Compare the data rows one by one
|
|
|
+ data_diffs = self._compare_data_rows(
|
|
|
+ table1, table2, header_row_idx1, header_row_idx2,
|
|
|
+ headers1, column_types, mismatched_columns, header_result['match']
|
|
|
+ )
|
|
|
+ differences.extend(data_diffs)
|
|
|
+
|
|
|
+ print(f"\n✅ 流水表格对比完成,发现 {len(differences)} 个差异")
|
|
|
+ return differences
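+
+    # Flow recap for compare_table_flow_list: detect header rows -> compare the
+    # pre-header rows -> compare headers -> detect and merge column types ->
+    # compare data rows. A usage sketch with hypothetical parsed tables
+    # table_a and table_b:
+    #
+    #   comparator = TableComparator()
+    #   diffs = comparator.compare_table_flow_list(table_a, table_b)
+    #   critical = [d for d in diffs if d['severity'] == 'critical']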
|
|
|
+
|
|
|
+ def _detect_column_types(self, table: List[List[str]], header_row_idx: int,
|
|
|
+ headers: List[str]) -> List[str]:
|
|
|
+        """Detect the data type of each column."""
|
|
|
+ column_types = []
|
|
|
+ for col_idx in range(len(headers)):
|
|
|
+ col_values = [
|
|
|
+ row[col_idx]
|
|
|
+ for row in table[header_row_idx + 1:]
|
|
|
+ if col_idx < len(row)
|
|
|
+ ]
|
|
|
+ col_type = self.detector.detect_column_type(col_values)
|
|
|
+ column_types.append(col_type)
|
|
|
+ return column_types
|
|
|
+
|
|
|
+ def _check_column_type_mismatch(self, column_types1: List[str], column_types2: List[str],
|
|
|
+ headers1: List[str], headers2: List[str],
|
|
|
+ differences: List[Dict]) -> List[int]:
|
|
|
+        """Flag columns whose detected types differ between the two tables."""
|
|
|
+ mismatched_columns = []
|
|
|
+ for col_idx in range(min(len(column_types1), len(column_types2))):
|
|
|
+ if column_types1[col_idx] != column_types2[col_idx]:
|
|
|
+ mismatched_columns.append(col_idx)
|
|
|
+ differences.append({
|
|
|
+ 'type': 'table_column_type_mismatch',
|
|
|
+ 'position': f'第{col_idx + 1}列',
|
|
|
+ 'file1_value': f'{headers1[col_idx]} ({column_types1[col_idx]})',
|
|
|
+ 'file2_value': f'{headers2[col_idx]} ({column_types2[col_idx]})',
|
|
|
+ 'description': f'列类型不一致: {column_types1[col_idx]} vs {column_types2[col_idx]}',
|
|
|
+ 'severity': 'high',
|
|
|
+ 'column_index': col_idx
|
|
|
+ })
|
|
|
+
|
|
|
+ total_columns = min(len(column_types1), len(column_types2))
|
|
|
+ mismatch_ratio = len(mismatched_columns) / total_columns if total_columns > 0 else 0
|
|
|
+
|
|
|
+ if mismatch_ratio > 0.5:
|
|
|
+ differences.append({
|
|
|
+ 'type': 'table_header_critical',
|
|
|
+ 'position': '表格列类型',
|
|
|
+ 'file1_value': f'{len(mismatched_columns)}列类型不一致',
|
|
|
+ 'file2_value': f'共{total_columns}列',
|
|
|
+ 'description': f'列类型差异过大: {len(mismatched_columns)}/{total_columns}列不匹配 ({mismatch_ratio:.1%})',
|
|
|
+ 'severity': 'critical'
|
|
|
+ })
|
|
|
+
|
|
|
+ return mismatched_columns
|
|
|
+
|
|
|
+ def _merge_column_types(self, column_types1: List[str], column_types2: List[str],
|
|
|
+ mismatched_columns: List[int]) -> List[str]:
|
|
|
+        """Merge the column types detected for the two tables."""
|
|
|
+ column_types = []
|
|
|
+ for col_idx in range(max(len(column_types1), len(column_types2))):
|
|
|
+ if col_idx >= len(column_types1):
|
|
|
+ column_types.append(column_types2[col_idx])
|
|
|
+ elif col_idx >= len(column_types2):
|
|
|
+ column_types.append(column_types1[col_idx])
|
|
|
+ elif col_idx in mismatched_columns:
|
|
|
+ type1 = column_types1[col_idx]
|
|
|
+ type2 = column_types2[col_idx]
|
|
|
+
|
|
|
+ if type1 == 'text' or type2 == 'text':
|
|
|
+ column_types.append('text')
|
|
|
+ elif type1 == 'text_number' or type2 == 'text_number':
|
|
|
+ column_types.append('text_number')
|
|
|
+ else:
|
|
|
+ column_types.append(type1)
|
|
|
+ else:
|
|
|
+ column_types.append(column_types1[col_idx])
|
|
|
+
|
|
|
+ return column_types
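+
+    # Example: column_types1 = ['datetime', 'numeric', 'text'],
+    # column_types2 = ['datetime', 'text', 'text'], mismatched_columns = [1]
+    # merge to ['datetime', 'text', 'text']: the looser 'text' type wins so
+    # that the conflicting column falls back to plain text comparison.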
|
|
|
+
|
|
|
+ def _compare_data_rows(self, table1: List[List[str]], table2: List[List[str]],
|
|
|
+ header_row_idx1: int, header_row_idx2: int,
|
|
|
+ headers1: List[str], column_types: List[str],
|
|
|
+ mismatched_columns: List[int], header_match: bool) -> List[Dict]:
|
|
|
+        """Compare the data rows one by one."""
|
|
|
+ differences = []
|
|
|
+ data_rows1 = table1[header_row_idx1 + 1:]
|
|
|
+ data_rows2 = table2[header_row_idx2 + 1:]
|
|
|
+ max_rows = max(len(data_rows1), len(data_rows2))
|
|
|
+
|
|
|
+ for row_idx in range(max_rows):
|
|
|
+ row1 = data_rows1[row_idx] if row_idx < len(data_rows1) else []
|
|
|
+ row2 = data_rows2[row_idx] if row_idx < len(data_rows2) else []
|
|
|
+ actual_row_num = header_row_idx1 + row_idx + 2
|
|
|
+
|
|
|
+ if not row1:
|
|
|
+ differences.append({
|
|
|
+ 'type': 'table_row_missing',
|
|
|
+ 'position': f'第{actual_row_num}行',
|
|
|
+ 'file1_value': '',
|
|
|
+ 'file2_value': ', '.join(row2),
|
|
|
+ 'description': f'文件1缺少第{actual_row_num}行',
|
|
|
+ 'severity': 'high',
|
|
|
+ 'row_index': actual_row_num
|
|
|
+ })
|
|
|
+ continue
|
|
|
+
|
|
|
+ if not row2:
|
|
|
+ differences.append({
|
|
|
+ 'type': 'table_row_missing',
|
|
|
+ 'position': f'第{actual_row_num}行',
|
|
|
+ 'file1_value': ', '.join(row1),
|
|
|
+ 'file2_value': '',
|
|
|
+ 'description': f'文件2缺少第{actual_row_num}行',
|
|
|
+ 'severity': 'high',
|
|
|
+ 'row_index': actual_row_num
|
|
|
+ })
|
|
|
+ continue
|
|
|
+
|
|
|
+            # Compare column by column
|
|
|
+ max_cols = max(len(row1), len(row2))
|
|
|
+ for col_idx in range(max_cols):
|
|
|
+ cell1 = row1[col_idx] if col_idx < len(row1) else ''
|
|
|
+ cell2 = row2[col_idx] if col_idx < len(row2) else ''
|
|
|
+
|
|
|
+ if "[图片内容-忽略]" in cell1 or "[图片内容-忽略]" in cell2:
|
|
|
+ continue
|
|
|
+
|
|
|
+ column_type = column_types[col_idx] if col_idx < len(column_types) else 'text'
|
|
|
+ column_name = headers1[col_idx] if col_idx < len(headers1) else f'列{col_idx + 1}'
|
|
|
+
|
|
|
+ compare_result = self.compare_cell_value(cell1, cell2, column_type, column_name)
|
|
|
+
|
|
|
+ if not compare_result['match']:
|
|
|
+ diff_info = compare_result['difference']
|
|
|
+ type_mismatch_note = ""
|
|
|
+ if col_idx in mismatched_columns:
|
|
|
+ type_mismatch_note = " [列类型冲突]"
|
|
|
+
|
|
|
+                    # Determine the final severity, starting from the severity in diff_info
|
|
|
+ base_severity = diff_info.get('severity', 'medium')
|
|
|
+
|
|
|
+                    # Raise the severity to high when the column types conflict
|
|
|
+ final_severity = 'high' if col_idx in mismatched_columns else base_severity
|
|
|
+
|
|
|
+ differences.append({
|
|
|
+ 'type': diff_info['type'],
|
|
|
+ 'position': f'第{actual_row_num}行第{col_idx + 1}列',
|
|
|
+ 'file1_value': diff_info['value1'],
|
|
|
+ 'file2_value': diff_info['value2'],
|
|
|
+ 'description': diff_info['description'] + type_mismatch_note,
|
|
|
+                        'severity': final_severity,  # use the computed severity
|
|
|
+ 'row_index': actual_row_num,
|
|
|
+ 'col_index': col_idx,
|
|
|
+ 'column_name': column_name,
|
|
|
+ 'column_type': column_type,
|
|
|
+ 'column_type_mismatch': col_idx in mismatched_columns,
|
|
|
+ })
|
|
|
+
|
|
|
+ return differences
|