class ChangeType(Enum):
    """Kind of change detected for a row or for a single cell."""

    UNCHANGED = "unchanged"
    MODIFIED = "modified"
    ADDED = "added"
    REMOVED = "removed"


@dataclass
class CellDiff:
    """Comparison result for one column of a matched pair of rows."""

    # Positional (iloc) index of the row on each side of the comparison.
    row_left: Optional[int]
    row_right: Optional[int]
    column: str
    change_type: ChangeType
    # Cell values can be of any type, hence ``object``.
    old_value: object = None
    new_value: object = None
    # Not set by DataFrameDiffAlgorithm._compare_cells; keeps its default there.
    similarity: float = 0.0


@dataclass
class RowDiff:
    """Comparison result for one row: matched, removed (left only) or added (right only)."""

    # Positional (iloc) index on each side; None when the row is absent there.
    row_left: Optional[int]
    row_right: Optional[int]
    change_type: ChangeType
    similarity: float = 0.0
    # Only filled in for matched rows; stays None for added/removed rows.
    cell_diffs: Optional[List[CellDiff]] = None


class DataFrameDiffAlgorithm:
    """VSCode-like diff algorithm for two DataFrames.

    Rows of the left frame are visited top to bottom and each one is greedily
    paired with the most similar still-unmatched row of the right frame,
    provided the similarity reaches ``similarity_threshold``.
    """

    def __init__(self, similarity_threshold: float = 0.7):
        # Minimum row similarity (0..1) for two rows to count as "the same row".
        self.similarity_threshold = similarity_threshold

    def compute_diff(self, df_left: pd.DataFrame, df_right: pd.DataFrame) -> List[RowDiff]:
        """Compute the row/cell differences between two DataFrames.

        Args:
            df_left: The "old" side of the comparison.
            df_right: The "new" side of the comparison.

        Returns:
            One ``RowDiff`` per left row (matched or removed), in left order,
            followed by one ``RowDiff`` per unmatched right row (added).
        """
        # Ordered union: left columns first, then columns only present on the
        # right. A plain set union would make the column order (and therefore
        # the order of the reported cell diffs) depend on hash randomisation.
        all_columns = list(dict.fromkeys([*df_left.columns, *df_right.columns]))

        df_left_aligned = self._align_columns(df_left, all_columns)
        df_right_aligned = self._align_columns(df_right, all_columns)

        similarity_matrix = self._compute_similarity_matrix(df_left_aligned, df_right_aligned)
        row_mappings = self._match_rows_top_down(similarity_matrix)

        return self._generate_diff_result(
            df_left_aligned, df_right_aligned, row_mappings, all_columns
        )

    def _align_columns(self, df: pd.DataFrame, all_columns: List[str]) -> pd.DataFrame:
        """Return a copy of ``df`` that has exactly ``all_columns``, in that order.

        Columns missing from ``df`` are added and filled with ``None``.
        """
        aligned_df = df.copy()
        for col in all_columns:
            if col not in aligned_df.columns:
                aligned_df[col] = None
        return aligned_df[all_columns]

    def _compute_similarity_matrix(self, df_left: pd.DataFrame, df_right: pd.DataFrame) -> np.ndarray:
        """Return a ``(len(df_left), len(df_right))`` matrix of row similarities."""
        # Extract every row once instead of calling ``.iloc`` inside the
        # double loop.
        left_rows = [df_left.iloc[i] for i in range(len(df_left))]
        right_rows = [df_right.iloc[j] for j in range(len(df_right))]

        matrix = np.zeros((len(left_rows), len(right_rows)))
        for i, left_row in enumerate(left_rows):
            for j, right_row in enumerate(right_rows):
                matrix[i, j] = self._compute_row_similarity(left_row, right_row)
        return matrix

    def _compute_row_similarity(self, row1: pd.Series, row2: pd.Series) -> float:
        """Return the similarity of two rows as a score between 0 and 1.

        Each column contributes 1 when both values are missing or their string
        forms are equal, half of the ``difflib`` ratio when they differ, and 0
        when exactly one of them is missing. The sum is divided by the number
        of columns; two empty rows are considered identical.
        """
        total_cols = len(row1)
        if total_cols == 0:
            return 1.0

        score = 0.0
        for col in row1.index:
            val1, val2 = row1[col], row2[col]
            missing1, missing2 = pd.isna(val1), pd.isna(val2)

            if missing1 and missing2:
                score += 1
                continue
            if missing1 or missing2:
                # Value present on one side only: no credit.
                continue

            str1, str2 = str(val1), str(val2)
            if str1 == str2:
                score += 1
            else:
                # A partial textual match only earns partial credit.
                score += difflib.SequenceMatcher(None, str1, str2).ratio() * 0.5

        return score / total_cols

    def _match_rows_top_down(self, similarity_matrix: np.ndarray) -> Dict[int, Optional[int]]:
        """Greedily pair left rows with right rows, walking the left side top-down.

        Returns:
            A mapping ``left index -> right index``; the value is ``None``
            when no unmatched right row reaches the similarity threshold.
        """
        left_rows, right_rows = similarity_matrix.shape
        matched_right = set()
        row_mappings: Dict[int, Optional[int]] = {}

        for left_idx in range(left_rows):
            best_right_idx = None
            best_similarity = 0.0

            for right_idx in range(right_rows):
                if right_idx in matched_right:
                    continue
                similarity = similarity_matrix[left_idx, right_idx]
                # Strict ">" keeps the first (top-most) candidate on ties.
                if similarity >= self.similarity_threshold and similarity > best_similarity:
                    best_similarity = similarity
                    best_right_idx = right_idx

            # ``None`` marks a left row without a counterpart (removed).
            row_mappings[left_idx] = best_right_idx
            if best_right_idx is not None:
                matched_right.add(best_right_idx)

        return row_mappings

    def _generate_diff_result(self, df_left: pd.DataFrame, df_right: pd.DataFrame,
                              row_mappings: Dict[int, Optional[int]],
                              all_columns: List[str]) -> List[RowDiff]:
        """Turn the row mapping into a list of ``RowDiff`` records.

        Matched and removed rows come first (in mapping order); right rows
        that were never matched are appended afterwards as added rows.
        """
        result = []
        matched_right_rows = {idx for idx in row_mappings.values() if idx is not None}

        for left_idx, right_idx in row_mappings.items():
            if right_idx is None:
                result.append(RowDiff(
                    row_left=left_idx,
                    row_right=None,
                    change_type=ChangeType.REMOVED,
                    similarity=0.0,
                ))
                continue

            left_row = df_left.iloc[left_idx]
            right_row = df_right.iloc[right_idx]
            cell_diffs = self._compare_cells(left_row, right_row, left_idx, right_idx, all_columns)

            # A matched row is MODIFIED as soon as a single cell differs.
            if any(cell.change_type != ChangeType.UNCHANGED for cell in cell_diffs):
                change_type = ChangeType.MODIFIED
            else:
                change_type = ChangeType.UNCHANGED

            result.append(RowDiff(
                row_left=left_idx,
                row_right=right_idx,
                change_type=change_type,
                similarity=self._compute_row_similarity(left_row, right_row),
                cell_diffs=cell_diffs,
            ))

        # Right rows nobody claimed are additions.
        for right_idx in range(len(df_right)):
            if right_idx not in matched_right_rows:
                result.append(RowDiff(
                    row_left=None,
                    row_right=right_idx,
                    change_type=ChangeType.ADDED,
                    similarity=0.0,
                ))

        return result

    def _compare_cells(self, row_left: pd.Series, row_right: pd.Series,
                       left_idx: int, right_idx: int, columns: List[str]) -> List[CellDiff]:
        """Compare two matched rows column by column.

        Returns:
            One ``CellDiff`` per entry of ``columns``, in the same order.
            Values are compared through their string form; a value that is
            missing on one side only is reported as ADDED or REMOVED.
        """
        cell_diffs = []

        for col in columns:
            val_left = row_left[col] if col in row_left.index else None
            val_right = row_right[col] if col in row_right.index else None
            left_missing, right_missing = pd.isna(val_left), pd.isna(val_right)

            if left_missing and right_missing:
                change_type = ChangeType.UNCHANGED
            elif left_missing:
                change_type = ChangeType.ADDED
            elif right_missing:
                change_type = ChangeType.REMOVED
            elif str(val_left) == str(val_right):
                change_type = ChangeType.UNCHANGED
            else:
                change_type = ChangeType.MODIFIED

            cell_diffs.append(CellDiff(
                row_left=left_idx,
                row_right=right_idx,
                column=col,
                change_type=change_type,
                old_value=val_left,
                new_value=val_right,
            ))

        return cell_diffs
class VSCodeStyleDataFrameDiff:
    """Streamlit view that presents a DataFrame diff in a VSCode-like layout.

    The original frame lives in ``st.session_state.df_original`` and the
    user-editable copy in ``st.session_state.df_edited``; the diff is always
    computed with the original on the left and the edited copy on the right.
    """

    # Name of the read-only helper column prepended to the editable frame.
    _MARKER_COLUMN = '🎨 差异类型'
    _LABEL_UNCHANGED = '⚪ 未变'
    _LABEL_MODIFIED = '🟡 修改'
    _LABEL_ADDED = '🟢 新增'
    _LABEL_REMOVED = '🔴 删除'

    def __init__(self):
        self.diff_algorithm = DataFrameDiffAlgorithm()
        self._inject_css()

    def _inject_css(self):
        """Inject the VSCode-style CSS (including the data_editor styles)."""
        # NOTE(review): the stylesheet body is empty in this copy of the file;
        # confirm against the original source.
        st.markdown(""" """, unsafe_allow_html=True)

    def _create_editable_diff_view(self, row_diffs: List["RowDiff"]):
        """Render the side-by-side comparison: editable copy left, original right."""
        st.subheader("🔍 并排对比")

        left_col, right_col = st.columns(2)

        with left_col:
            st.markdown("### ✏️ 可编辑版本 (左侧)")
            # Option 1: edit a copy of the frame that carries a marker column.
            self._create_marked_data_editor(st.session_state.df_edited, row_diffs)

        with right_col:
            st.markdown("### 📝 原始版本 (右侧)")
            # _create_diff_table renders df_original for side 'left' and
            # df_edited otherwise, so the panel titled "original" must ask for
            # the left side of the diff (it used to show the edited frame).
            self._create_diff_table(row_diffs, 'left')

    def _create_marked_data_editor(self, df: pd.DataFrame, row_diffs: List["RowDiff"]):
        """Show ``df`` in a data_editor with a marker column and store user edits.

        When the edited data (without the marker column) differs from
        ``st.session_state.df_edited``, the session state is updated and the
        script is rerun.
        """
        marked_df = self._create_marked_dataframe(df, row_diffs)
        column_config = self._create_marked_column_config(marked_df, row_diffs)

        edited_df = st.data_editor(
            marked_df,
            height=500,
            use_container_width=True,
            num_rows="dynamic",
            column_config=column_config,
            key="marked_diff_editor",
            hide_index=False
        )

        # Strip the marker column so the stored frame keeps the data schema.
        cleaned_df = self._clean_marked_dataframe(edited_df)

        if not cleaned_df.equals(st.session_state.df_edited):
            st.session_state.df_edited = cleaned_df.copy()
            st.rerun()

    @classmethod
    def _label_for(cls, change_type: "ChangeType") -> str:
        """Return the marker text shown for ``change_type``."""
        if change_type == ChangeType.ADDED:
            return cls._LABEL_ADDED
        if change_type == ChangeType.REMOVED:
            return cls._LABEL_REMOVED
        if change_type == ChangeType.MODIFIED:
            return cls._LABEL_MODIFIED
        return cls._LABEL_UNCHANGED

    def _create_marked_dataframe(self, df: pd.DataFrame, row_diffs: List["RowDiff"]) -> pd.DataFrame:
        """Return a copy of ``df`` with the diff marker as its first column.

        Rows are addressed through ``row_diff.row_right``; diffs without a
        right-hand row, or pointing past the end of ``df``, are skipped.
        """
        marked_df = df.copy()
        marked_df.insert(0, self._MARKER_COLUMN, '')

        row_count = len(marked_df)
        for row_diff in row_diffs:
            position = row_diff.row_right
            if position is None or position >= row_count:
                continue
            marked_df.iloc[position, 0] = self._label_for(row_diff.change_type)

        return marked_df

    def _create_marked_column_config(self, marked_df: pd.DataFrame, row_diffs: List["RowDiff"]) -> Dict:
        """Build the ``column_config`` mapping for the marked frame.

        The marker column becomes a disabled select box; every other column
        gets a number, checkbox or text editor depending on its dtype.
        """
        dtypes = pd.api.types
        config = {
            self._MARKER_COLUMN: st.column_config.SelectboxColumn(
                self._MARKER_COLUMN,
                help="行的差异类型",
                options=[
                    self._LABEL_UNCHANGED,
                    self._LABEL_MODIFIED,
                    self._LABEL_ADDED,
                    self._LABEL_REMOVED,
                ],
                disabled=True,  # read-only
                width="small"
            )
        }

        # Skip the marker column at position 0.
        for col in marked_df.columns[1:]:
            dtype = marked_df[col].dtype
            # dtype predicates instead of a fixed list of dtype names, so that
            # int32/float32/nullable columns are recognised as well.
            if dtypes.is_float_dtype(dtype):
                config[col] = st.column_config.NumberColumn(
                    col, help=f"数值列: {col}", format="%.2f"
                )
            elif dtypes.is_integer_dtype(dtype):
                config[col] = st.column_config.NumberColumn(
                    col, help=f"数值列: {col}", format="%d"
                )
            elif dtypes.is_bool_dtype(dtype):
                config[col] = st.column_config.CheckboxColumn(
                    col, help=f"布尔列: {col}"
                )
            else:
                config[col] = st.column_config.TextColumn(
                    col, help=f"文本列: {col}", max_chars=100
                )

        return config

    def _clean_marked_dataframe(self, marked_df: pd.DataFrame) -> pd.DataFrame:
        """Return ``marked_df`` without the marker column.

        A frame that does not carry the marker column is returned unchanged
        (as a new object) instead of raising.
        """
        return marked_df.drop(columns=[self._MARKER_COLUMN], errors="ignore")

    def create_diff_view(self):
        """Render the whole page: controls, statistics, comparison and details."""
        st.title("📊 VSCode风格 DataFrame 差异对比")
        st.markdown("---")

        # First run: seed the session with the sample data.
        if 'df_original' not in st.session_state:
            st.session_state.df_original = self._create_sample_data()
        if 'df_edited' not in st.session_state:
            st.session_state.df_edited = st.session_state.df_original.copy()

        self._create_control_panel()

        row_diffs = self.diff_algorithm.compute_diff(
            st.session_state.df_original,
            st.session_state.df_edited
        )

        self._display_diff_statistics(row_diffs)
        self._create_main_diff_view(row_diffs)
        self._create_detailed_diff_view(row_diffs)

    def _create_sample_data(self) -> pd.DataFrame:
        """Return the five-row demo frame used as the initial data."""
        return pd.DataFrame({
            'ID': [1, 2, 3, 4, 5],
            'Name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
            'Age': [25, 30, 35, 40, 45],
            'City': ['New York', 'London', 'Paris', 'Tokyo', 'Sydney'],
            'Salary': [50000, 60000, 70000, 80000, 90000]
        })

    def _create_control_panel(self):
        """Render the control panel (reset, random diff, threshold, sync toggle)."""
        with st.expander("🎛️ 控制面板", expanded=True):
            col1, col2, col3, col4 = st.columns(4)

            with col1:
                if st.button("🔄 重置数据"):
                    st.session_state.df_original = self._create_sample_data()
                    st.session_state.df_edited = st.session_state.df_original.copy()
                    st.rerun()

            with col2:
                if st.button("🎲 生成随机差异"):
                    st.session_state.df_edited = self._create_random_diff()
                    st.rerun()

            with col3:
                self.diff_algorithm.similarity_threshold = st.slider(
                    "相似度阈值",
                    min_value=0.1,
                    max_value=1.0,
                    value=0.7,
                    step=0.1
                )

            with col4:
                # The checkbox is rendered, but its value is not used by this
                # method.
                st.checkbox("🔗 同步滚动", value=True)

    def _create_random_diff(self) -> pd.DataFrame:
        """Return a demo variant of the original frame.

        Three cells are modified, the row labelled 4 is dropped and one new
        row is appended. The labels are hard-coded for the sample data.
        """
        df = st.session_state.df_original.copy()

        # Modify a few values.
        df.loc[1, 'Name'] = 'Robert'
        df.loc[2, 'Age'] = 36
        df.loc[3, 'City'] = 'Berlin'

        # Remove one row.
        df = df.drop(index=4)

        # Append one row.
        new_row = pd.DataFrame({
            'ID': [6],
            'Name': ['Frank'],
            'Age': [28],
            'City': ['Madrid'],
            'Salary': [55000]
        })
        return pd.concat([df, new_row], ignore_index=True)

    def _display_diff_statistics(self, row_diffs: List["RowDiff"]):
        """Render one metric per diff counter."""
        stats = self._compute_diff_stats(row_diffs)

        col1, col2, col3, col4, col5 = st.columns(5)

        with col1:
            st.metric("总行数", stats['total_rows'])
        with col2:
            st.metric("🟢 新增", stats['added_rows'], delta=stats['added_rows'])
        with col3:
            st.metric("🔴 删除", stats['removed_rows'], delta=-stats['removed_rows'])
        with col4:
            st.metric("🟡 修改", stats['modified_rows'])
        with col5:
            st.metric("⚪ 未变", stats['unchanged_rows'])

    def _compute_diff_stats(self, row_diffs: List["RowDiff"]) -> Dict:
        """Count the row diffs per change type.

        Returns:
            A dict with the keys ``total_rows``, ``added_rows``,
            ``removed_rows``, ``modified_rows`` and ``unchanged_rows``;
            ``total_rows`` is the number of diff records.
        """
        stats = {
            'total_rows': len(row_diffs),
            'added_rows': 0,
            'removed_rows': 0,
            'modified_rows': 0,
            'unchanged_rows': 0
        }

        for row_diff in row_diffs:
            if row_diff.change_type == ChangeType.ADDED:
                key = 'added_rows'
            elif row_diff.change_type == ChangeType.REMOVED:
                key = 'removed_rows'
            elif row_diff.change_type == ChangeType.MODIFIED:
                key = 'modified_rows'
            else:
                key = 'unchanged_rows'
            stats[key] += 1

        return stats

    def _create_main_diff_view(self, row_diffs: List["RowDiff"]):
        """Render the main comparison area."""
        self._create_editable_diff_view(row_diffs)
数据为空
" html = '| # | ' for col in df.columns: html += f'{col} | ' html += '
|---|---|
| {row_idx + 1} | ' # 构建单元格 for col in df.columns: cell_class = row_class cell_value = df.iloc[row_idx][col] # 如果是修改的行,检查单元格级别的差异 if row_diff.change_type == ChangeType.MODIFIED and row_diff.cell_diffs: cell_diff = next((cd for cd in row_diff.cell_diffs if cd.column == col), None) if cell_diff: cell_class = self._get_cell_css_class(cell_diff.change_type) display_value = str(cell_value) if not pd.isna(cell_value) else "" html += f'{display_value} | ' html += '