||
import difflib
import html
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, Set, Tuple

import numpy as np
import pandas as pd
import streamlit as st
class ChangeType(Enum):
    """Kinds of change a row or a cell can have between two DataFrames.

    The member values are the lowercase strings offered in the UI filter,
    and iteration yields the members in the order declared here.
    """

    # Same content on both sides.
    UNCHANGED = "unchanged"
    # Present on both sides, but the content differs.
    MODIFIED = "modified"
    # Only the right-hand side has a value.
    ADDED = "added"
    # Only the left-hand side has a value.
    REMOVED = "removed"
@dataclass
class CellDiff:
    """Difference record for one column of a matched pair of rows.

    The value fields are annotated with ``typing.Any``; the builtin
    function ``any`` is not a type and must not be used as an annotation.
    """

    # Zero-based positional row index on the left side, or None if absent.
    row_left: Optional[int]
    # Zero-based positional row index on the right side, or None if absent.
    row_right: Optional[int]
    # Name of the column this record describes.
    column: str
    # How the cell changed from the left side to the right side.
    change_type: ChangeType
    # Cell value taken from the left-hand row.
    old_value: Any = None
    # Cell value taken from the right-hand row.
    new_value: Any = None
    # Similarity score for the cell; defaults to 0.0 when not supplied.
    similarity: float = 0.0
@dataclass
class RowDiff:
    """Difference record for one row (or one matched pair of rows).

    ``cell_diffs`` keeps its ``None`` default for backward compatibility,
    so its annotation is ``Optional[...]`` rather than a bare ``List``.
    """

    # Zero-based positional row index on the left side; None for added rows.
    row_left: Optional[int]
    # Zero-based positional row index on the right side; None for removed rows.
    row_right: Optional[int]
    # Overall classification of the row.
    change_type: ChangeType
    # Row similarity score; added/removed rows are created with 0.0.
    similarity: float = 0.0
    # Per-column details; only populated for rows matched on both sides.
    cell_diffs: Optional[List[CellDiff]] = None
class DataFrameDiffAlgorithm:
    """Row-matching diff between two DataFrames, similar to VSCode's diff.

    Rows are compared by position (``iloc``), never by index label. Each
    left row is greedily paired, top to bottom, with the most similar
    right row that is still unmatched and reaches the similarity threshold.
    """

    def __init__(self, similarity_threshold: float = 0.7):
        """Store the minimum similarity two rows need in order to be paired.

        Args:
            similarity_threshold: Score (as produced by
                ``_compute_row_similarity``) below which rows never match.
        """
        self.similarity_threshold = similarity_threshold

    def compute_diff(self, df_left: pd.DataFrame, df_right: pd.DataFrame) -> List[RowDiff]:
        """Compute the row-level and cell-level differences of two frames.

        Args:
            df_left: The "before" frame.
            df_right: The "after" frame.

        Returns:
            One ``RowDiff`` per left row (matched or removed), in left row
            order, followed by one ``RowDiff`` per unmatched right row.
        """
        # Ordered union: a set-based union would make the column order, and
        # therefore the order of the cell diffs, vary from run to run.
        all_columns = self._union_columns(df_left, df_right)
        df_left_aligned = self._align_columns(df_left, all_columns)
        df_right_aligned = self._align_columns(df_right, all_columns)

        similarity_matrix = self._compute_similarity_matrix(df_left_aligned, df_right_aligned)
        row_mappings = self._match_rows_top_down(similarity_matrix)

        return self._generate_diff_result(df_left_aligned, df_right_aligned, row_mappings, all_columns)

    @staticmethod
    def _union_columns(df_left: pd.DataFrame, df_right: pd.DataFrame) -> List[str]:
        """Return left columns followed by right-only columns, without duplicates."""
        return list(dict.fromkeys([*df_left.columns, *df_right.columns]))

    def _align_columns(self, df: pd.DataFrame, all_columns: List[str]) -> pd.DataFrame:
        """Return a copy of ``df`` with exactly ``all_columns``, in that order.

        Columns missing from ``df`` are added and filled with ``None``.
        """
        aligned_df = df.copy()
        for col in all_columns:
            if col not in aligned_df.columns:
                aligned_df[col] = None
        return aligned_df[all_columns]

    def _compute_similarity_matrix(self, df_left: pd.DataFrame, df_right: pd.DataFrame) -> np.ndarray:
        """Build the matrix whose entry ``[i, j]`` scores left row i against right row j."""
        # Extract every row once instead of calling ``iloc`` inside the
        # nested loop for each (i, j) pair.
        left_rows = [df_left.iloc[i] for i in range(len(df_left))]
        right_rows = [df_right.iloc[j] for j in range(len(df_right))]

        matrix = np.zeros((len(left_rows), len(right_rows)))
        for i, left_row in enumerate(left_rows):
            for j, right_row in enumerate(right_rows):
                matrix[i, j] = self._compute_row_similarity(left_row, right_row)
        return matrix

    def _compute_row_similarity(self, row1: pd.Series, row2: pd.Series) -> float:
        """Score how alike two rows are.

        Per column: both missing or equal string forms count 1; exactly one
        missing counts 0; otherwise half of the ``difflib`` ratio of the
        string forms is counted. The sum is divided by the column count.

        Returns:
            The score, or 1.0 when ``row1`` has no columns.
        """
        total_cols = len(row1)
        if total_cols == 0:
            return 1.0

        matches = 0
        for col in row1.index:
            val1, val2 = row1[col], row2[col]
            missing1, missing2 = pd.isna(val1), pd.isna(val2)

            if missing1 and missing2:
                matches += 1
            elif missing1 or missing2:
                continue
            else:
                str1, str2 = str(val1), str(val2)
                if str1 == str2:
                    matches += 1
                else:
                    # A partial textual match earns only partial credit.
                    matches += difflib.SequenceMatcher(None, str1, str2).ratio() * 0.5

        return matches / total_cols

    def _match_rows_top_down(self, similarity_matrix: np.ndarray) -> Dict[int, Optional[int]]:
        """Greedily pair left rows with right rows, walking the left side top-down.

        Args:
            similarity_matrix: Scores indexed as ``[left_row, right_row]``.

        Returns:
            A mapping from every left row index to its matched right row
            index, or to ``None`` when no unmatched right row reaches the
            threshold. Each right row is used at most once.
        """
        left_rows, right_rows = similarity_matrix.shape
        matched_right = set()
        row_mappings = {}

        for left_idx in range(left_rows):
            best_right_idx = None
            best_similarity = 0.0

            for right_idx in range(right_rows):
                if right_idx in matched_right:
                    continue
                similarity = similarity_matrix[left_idx, right_idx]
                # Strict ">" keeps the topmost candidate when scores tie.
                if similarity > best_similarity and similarity >= self.similarity_threshold:
                    best_similarity = similarity
                    best_right_idx = right_idx

            # ``None`` marks a left row that has no counterpart (removed).
            row_mappings[left_idx] = best_right_idx
            if best_right_idx is not None:
                matched_right.add(best_right_idx)

        return row_mappings

    def _generate_diff_result(self, df_left: pd.DataFrame, df_right: pd.DataFrame,
                              row_mappings: Dict[int, Optional[int]], all_columns: List[str]) -> List[RowDiff]:
        """Turn the row mapping into ``RowDiff`` records.

        Unmapped left rows become REMOVED, mapped pairs become UNCHANGED or
        MODIFIED depending on their cell diffs, and right rows that no left
        row claimed are appended as ADDED.
        """
        result = []
        matched_right_rows = set(row_mappings.values()) - {None}

        for left_idx, right_idx in row_mappings.items():
            if right_idx is None:
                result.append(RowDiff(
                    row_left=left_idx,
                    row_right=None,
                    change_type=ChangeType.REMOVED,
                    similarity=0.0
                ))
                continue

            left_row = df_left.iloc[left_idx]
            right_row = df_right.iloc[right_idx]
            cell_diffs = self._compare_cells(left_row, right_row, left_idx, right_idx, all_columns)

            # A single changed cell is enough to flag the whole row.
            if any(cell.change_type != ChangeType.UNCHANGED for cell in cell_diffs):
                change_type = ChangeType.MODIFIED
            else:
                change_type = ChangeType.UNCHANGED

            result.append(RowDiff(
                row_left=left_idx,
                row_right=right_idx,
                change_type=change_type,
                similarity=self._compute_row_similarity(left_row, right_row),
                cell_diffs=cell_diffs
            ))

        for right_idx in range(len(df_right)):
            if right_idx not in matched_right_rows:
                result.append(RowDiff(
                    row_left=None,
                    row_right=right_idx,
                    change_type=ChangeType.ADDED,
                    similarity=0.0
                ))

        return result

    def _compare_cells(self, row_left: pd.Series, row_right: pd.Series,
                       left_idx: int, right_idx: int, columns: List[str]) -> List[CellDiff]:
        """Compare two matched rows column by column.

        Returns:
            One ``CellDiff`` per entry of ``columns``, in the same order.
            A value missing on exactly one side yields ADDED or REMOVED;
            otherwise the string forms decide UNCHANGED versus MODIFIED.
        """
        cell_diffs = []

        for col in columns:
            val_left = row_left[col] if col in row_left.index else None
            val_right = row_right[col] if col in row_right.index else None
            left_missing, right_missing = pd.isna(val_left), pd.isna(val_right)

            if left_missing and right_missing:
                change_type = ChangeType.UNCHANGED
            elif left_missing:
                change_type = ChangeType.ADDED
            elif right_missing:
                change_type = ChangeType.REMOVED
            elif str(val_left) == str(val_right):
                change_type = ChangeType.UNCHANGED
            else:
                change_type = ChangeType.MODIFIED

            cell_diffs.append(CellDiff(
                row_left=left_idx,
                row_right=right_idx,
                column=col,
                change_type=change_type,
                old_value=val_left,
                new_value=val_right
            ))

        return cell_diffs
class VSCodeStyleDataFrameDiff:
    """Streamlit page that presents a VSCode-style diff of two DataFrames.

    The reference frame is kept in ``st.session_state.df_original`` and the
    user-editable copy in ``st.session_state.df_edited``. Diffs are computed
    with the original as the left side and the edited copy as the right side.
    """

    def __init__(self):
        """Create the diff engine and inject the page CSS."""
        self.diff_algorithm = DataFrameDiffAlgorithm()
        self._inject_css()

    def _inject_css(self):
        """Inject the VSCode-like stylesheet used by the diff tables and editor."""
        # NOTE(review): the ``tr:nth-child(1|2|5)`` rules below colour fixed
        # row positions regardless of the computed diff; they look like demo
        # leftovers and are kept unchanged here - confirm before removing.
        st.markdown("""
        <style>
        /* VSCode风格的差异显示 */
        .vscode-diff-container {
            font-family: 'Monaco', 'Menlo', 'Ubuntu Mono', monospace;
            font-size: 12px;
            border: 1px solid #3c3c3c;
            border-radius: 6px;
            overflow: hidden;
        }

        .diff-header {
            background-color: #2d2d30;
            color: #cccccc;
            padding: 8px 12px;
            font-weight: bold;
            border-bottom: 1px solid #3c3c3c;
        }

        .diff-content {
            height: 500px;
            overflow: auto;
            background-color: #1e1e1e;
        }

        .diff-table {
            width: 100%;
            border-collapse: collapse;
            color: #cccccc;
        }

        .diff-table th {
            background-color: #2d2d30;
            border: 1px solid #3c3c3c;
            padding: 6px 8px;
            text-align: left;
            position: sticky;
            top: 0;
            z-index: 10;
        }

        .diff-table td {
            border: 1px solid #3c3c3c;
            padding: 4px 8px;
            white-space: nowrap;
            position: relative;
        }

        /* 差异颜色 - VSCode风格 */
        .diff-added {
            background-color: rgba(22, 160, 133, 0.2) !important;
            border-left: 3px solid #16a085 !important;
        }

        .diff-removed {
            background-color: rgba(231, 76, 60, 0.2) !important;
            border-left: 3px solid #e74c3c !important;
        }

        .diff-modified {
            background-color: rgba(241, 196, 15, 0.2) !important;
            border-left: 3px solid #f1c40f !important;
        }

        .diff-unchanged {
            background-color: transparent;
        }

        /* 行号样式 */
        .line-number {
            background-color: #2d2d30;
            color: #858585;
            text-align: right;
            padding: 4px 8px;
            border-right: 1px solid #3c3c3c;
            user-select: none;
            min-width: 40px;
        }

        /* 悬停效果 */
        .diff-table tbody tr:hover {
            background-color: rgba(255, 255, 255, 0.05);
        }

        /* 尝试通过CSS选择器为data_editor添加样式 */
        div[data-testid="stDataFrame"] {
            height: 500px;
        }

        /* 针对特定行的data_editor样式 - 通过行索引 */
        div[data-testid="stDataFrame"] tbody tr:nth-child(1) {
            background-color: rgba(22, 160, 133, 0.1);
        }

        div[data-testid="stDataFrame"] tbody tr:nth-child(2) {
            background-color: rgba(241, 196, 15, 0.1);
        }

        div[data-testid="stDataFrame"] tbody tr:nth-child(5) {
            background-color: rgba(231, 76, 60, 0.1);
        }

        /* 为data_editor添加差异指示器 */
        .data-editor-wrapper {
            position: relative;
        }

        .diff-indicator-overlay {
            position: absolute;
            top: 0;
            left: 0;
            width: 4px;
            height: 100%;
            pointer-events: none;
            z-index: 1000;
        }

        .indicator-added { background-color: #16a085; }
        .indicator-removed { background-color: #e74c3c; }
        .indicator-modified { background-color: #f1c40f; }

        /* 滚动条样式 */
        .diff-content::-webkit-scrollbar,
        div[data-testid="stDataFrame"] ::-webkit-scrollbar {
            width: 12px;
            height: 12px;
        }

        .diff-content::-webkit-scrollbar-track,
        div[data-testid="stDataFrame"] ::-webkit-scrollbar-track {
            background: #2d2d30;
        }

        .diff-content::-webkit-scrollbar-thumb,
        div[data-testid="stDataFrame"] ::-webkit-scrollbar-thumb {
            background: #555555;
            border-radius: 6px;
        }

        .diff-content::-webkit-scrollbar-thumb:hover,
        div[data-testid="stDataFrame"] ::-webkit-scrollbar-thumb:hover {
            background: #777777;
        }
        </style>
        """, unsafe_allow_html=True)

    def _create_editable_diff_view(self, row_diffs: List[RowDiff]):
        """Render the side-by-side view: editable copy next to the original.

        Args:
            row_diffs: Diff of ``df_original`` (left) against ``df_edited`` (right).
        """
        st.subheader("🔍 并排对比")

        left_col, right_col = st.columns(2)

        with left_col:
            st.markdown("### ✏️ 可编辑版本 (左侧)")
            # Approach 1: show the edited frame with a marker column added.
            self._create_marked_data_editor(st.session_state.df_edited, row_diffs)

        with right_col:
            st.markdown("### 📝 原始版本 (右侧)")
            # The original frame is the *left* side of the diff. Passing
            # 'right' here rendered the edited frame a second time under the
            # "original version" heading, so the original was never shown.
            self._create_diff_table(row_diffs, 'left')

    def _create_marked_data_editor(self, df: pd.DataFrame, row_diffs: List[RowDiff]):
        """Show ``df`` in a ``st.data_editor`` with a read-only diff marker column.

        Edits are written back to ``st.session_state.df_edited`` (without the
        marker column) and trigger a rerun so the diff is recomputed.
        """
        marked_df = self._create_marked_dataframe(df, row_diffs)
        column_config = self._create_marked_column_config(marked_df, row_diffs)

        edited_df = st.data_editor(
            marked_df,
            height=500,
            use_container_width=True,
            num_rows="dynamic",
            column_config=column_config,
            key="marked_diff_editor",
            hide_index=False
        )

        # Strip the marker column to get back to the plain data layout.
        cleaned_df = self._clean_marked_dataframe(edited_df)

        if not cleaned_df.equals(st.session_state.df_edited):
            st.session_state.df_edited = cleaned_df.copy()
            st.rerun()

    def _create_marked_dataframe(self, df: pd.DataFrame, row_diffs: List[RowDiff]) -> pd.DataFrame:
        """Return a copy of ``df`` with a leading column labelling each row's change.

        Rows are addressed by the right-side position of each diff; diffs
        without a right-side row (or pointing past the end) are ignored.
        """
        labels = {
            ChangeType.ADDED: '🟢 新增',
            ChangeType.REMOVED: '🔴 删除',
            ChangeType.MODIFIED: '🟡 修改',
        }

        marked_df = df.copy()
        marked_df.insert(0, '🎨 差异类型', '')

        for row_diff in row_diffs:
            position = row_diff.row_right
            if position is not None and position < len(marked_df):
                marked_df.iloc[position, 0] = labels.get(row_diff.change_type, '⚪ 未变')

        return marked_df

    def _create_marked_column_config(self, marked_df: pd.DataFrame, row_diffs: List[RowDiff]) -> Dict:
        """Build the ``column_config`` mapping for the marked editor.

        The marker column is a disabled select box; every other column gets a
        number, checkbox or text column depending on its dtype.
        ``row_diffs`` is accepted for interface compatibility and not used.
        """
        config = {}

        config['🎨 差异类型'] = st.column_config.SelectboxColumn(
            '🎨 差异类型',
            help="行的差异类型",
            options=['⚪ 未变', '🟡 修改', '🟢 新增', '🔴 删除'],
            disabled=True,  # read-only
            width="small"
        )

        # Skip the marker column at position 0.
        for col in marked_df.columns[1:]:
            dtype = marked_df[col].dtype
            if dtype in ['int64', 'float64']:
                config[col] = st.column_config.NumberColumn(
                    col,
                    help=f"数值列: {col}",
                    format="%.2f" if dtype == 'float64' else "%d"
                )
            elif dtype == 'bool':
                config[col] = st.column_config.CheckboxColumn(
                    col,
                    help=f"布尔列: {col}"
                )
            else:
                config[col] = st.column_config.TextColumn(
                    col,
                    help=f"文本列: {col}",
                    max_chars=100
                )

        return config

    def _clean_marked_dataframe(self, marked_df: pd.DataFrame) -> pd.DataFrame:
        """Return ``marked_df`` without the diff marker column."""
        return marked_df.drop(columns=['🎨 差异类型'])

    def create_diff_view(self):
        """Render the whole page: controls, statistics, main view and details."""
        st.title("📊 VSCode风格 DataFrame 差异对比")
        st.markdown("---")

        # Seed the session with sample data on first run.
        if 'df_original' not in st.session_state:
            st.session_state.df_original = self._create_sample_data()

        if 'df_edited' not in st.session_state:
            st.session_state.df_edited = st.session_state.df_original.copy()

        self._create_control_panel()

        row_diffs = self.diff_algorithm.compute_diff(
            st.session_state.df_original,
            st.session_state.df_edited
        )

        self._display_diff_statistics(row_diffs)
        self._create_main_diff_view(row_diffs)
        self._create_detailed_diff_view(row_diffs)

    def _create_sample_data(self) -> pd.DataFrame:
        """Return the five-row demo frame used as the initial data."""
        return pd.DataFrame({
            'ID': [1, 2, 3, 4, 5],
            'Name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
            'Age': [25, 30, 35, 40, 45],
            'City': ['New York', 'London', 'Paris', 'Tokyo', 'Sydney'],
            'Salary': [50000, 60000, 70000, 80000, 90000]
        })

    def _create_control_panel(self):
        """Render the control panel: reset, random diff, threshold, sync toggle."""
        with st.expander("🎛️ 控制面板", expanded=True):
            col1, col2, col3, col4 = st.columns(4)

            with col1:
                if st.button("🔄 重置数据"):
                    st.session_state.df_original = self._create_sample_data()
                    st.session_state.df_edited = st.session_state.df_original.copy()
                    st.rerun()

            with col2:
                if st.button("🎲 生成随机差异"):
                    st.session_state.df_edited = self._create_random_diff()
                    st.rerun()

            with col3:
                self.diff_algorithm.similarity_threshold = st.slider(
                    "相似度阈值",
                    min_value=0.1,
                    max_value=1.0,
                    value=0.7,
                    step=0.1
                )

            with col4:
                # The checkbox is displayed but nothing reads its value yet,
                # so the result is intentionally not bound to a name.
                st.checkbox("🔗 同步滚动", value=True)

    def _create_random_diff(self) -> pd.DataFrame:
        """Return a copy of the original frame with a fixed set of demo edits.

        Three cells are changed, the row labelled 4 is dropped and one new
        row is appended.
        """
        df = st.session_state.df_original.copy()

        df.loc[1, 'Name'] = 'Robert'
        df.loc[2, 'Age'] = 36
        df.loc[3, 'City'] = 'Berlin'

        df = df.drop(index=4)

        new_row = pd.DataFrame({
            'ID': [6], 'Name': ['Frank'], 'Age': [28],
            'City': ['Madrid'], 'Salary': [55000]
        })
        return pd.concat([df, new_row], ignore_index=True)

    def _display_diff_statistics(self, row_diffs: List[RowDiff]):
        """Show one metric per change type plus the total number of diff rows."""
        stats = self._compute_diff_stats(row_diffs)

        col1, col2, col3, col4, col5 = st.columns(5)

        with col1:
            st.metric("总行数", stats['total_rows'])
        with col2:
            st.metric("🟢 新增", stats['added_rows'], delta=stats['added_rows'])
        with col3:
            st.metric("🔴 删除", stats['removed_rows'], delta=-stats['removed_rows'])
        with col4:
            st.metric("🟡 修改", stats['modified_rows'])
        with col5:
            st.metric("⚪ 未变", stats['unchanged_rows'])

    def _compute_diff_stats(self, row_diffs: List[RowDiff]) -> Dict:
        """Count the diff rows per change type.

        Returns:
            A dict with the keys ``total_rows``, ``added_rows``,
            ``removed_rows``, ``modified_rows`` and ``unchanged_rows``.
        """
        stats = {
            'total_rows': len(row_diffs),
            'added_rows': 0,
            'removed_rows': 0,
            'modified_rows': 0,
            'unchanged_rows': 0
        }
        key_for = {
            ChangeType.ADDED: 'added_rows',
            ChangeType.REMOVED: 'removed_rows',
            ChangeType.MODIFIED: 'modified_rows',
        }

        for row_diff in row_diffs:
            # Anything that is not added/removed/modified counts as unchanged.
            stats[key_for.get(row_diff.change_type, 'unchanged_rows')] += 1

        return stats

    def _create_main_diff_view(self, row_diffs: List[RowDiff]):
        """Render the main comparison area (currently the editable view)."""
        self._create_editable_diff_view(row_diffs)

    def _create_diff_table(self, row_diffs: List[RowDiff], side: str):
        """Render one side of the diff as a highlighted HTML table.

        Args:
            row_diffs: Diff records to visualise.
            side: ``'left'`` shows ``df_original``; any other value shows
                ``df_edited``.
        """
        df = st.session_state.df_original if side == 'left' else st.session_state.df_edited

        html_table = self._build_diff_html_table(row_diffs, side, df)

        st.markdown(f"""
        <div class="vscode-diff-container">
        <div class="diff-header">
        {"原始版本" if side == 'left' else "编辑版本"}
        ({len(df)} 行)
        </div>
        <div class="diff-content">
        {html_table}
        </div>
        </div>
        """, unsafe_allow_html=True)

    def _build_diff_html_table(self, row_diffs: List[RowDiff], side: str, df: pd.DataFrame) -> str:
        """Build the HTML ``<table>`` for one side of the diff.

        Column names and cell values are HTML-escaped because the result is
        rendered with ``unsafe_allow_html=True`` and the data is user-editable.

        Args:
            row_diffs: Diff records; each contributes at most one table row.
            side: ``'left'`` uses ``row_left`` positions, otherwise ``row_right``.
            df: Frame the row positions refer to.

        Returns:
            The table markup, or a short placeholder paragraph if ``df`` is empty.
        """
        if df.empty:
            return "<p>数据为空</p>"

        parts = [
            '<table class="diff-table">',
            '<thead><tr><th class="line-number">#</th>',
        ]
        parts.extend(f'<th>{html.escape(str(col))}</th>' for col in df.columns)
        parts.append('</tr></thead><tbody>')

        for row_diff in row_diffs:
            row_idx = row_diff.row_left if side == 'left' else row_diff.row_right

            # Skip diffs with no row on this side or pointing past the frame.
            if row_idx is None or row_idx >= len(df):
                continue

            row_class = self._get_row_css_class(row_diff.change_type)

            # Modified rows are highlighted per cell; the first diff recorded
            # for a column wins.
            cell_types = {}
            if row_diff.change_type == ChangeType.MODIFIED and row_diff.cell_diffs:
                for cell_diff in row_diff.cell_diffs:
                    cell_types.setdefault(cell_diff.column, cell_diff.change_type)

            row = df.iloc[row_idx]
            parts.append(f'<tr class="{row_class}">')
            parts.append(f'<td class="line-number">{row_idx + 1}</td>')

            for col in df.columns:
                cell_value = row[col]
                if col in cell_types:
                    cell_class = self._get_cell_css_class(cell_types[col])
                else:
                    cell_class = row_class

                display_value = "" if pd.isna(cell_value) else html.escape(str(cell_value))
                parts.append(f'<td class="{cell_class}">{display_value}</td>')

            parts.append('</tr>')

        parts.append('</tbody></table>')
        return "".join(parts)

    def _get_row_css_class(self, change_type: ChangeType) -> str:
        """Map a change type to its CSS class (``diff-unchanged`` as fallback)."""
        mapping = {
            ChangeType.ADDED: "diff-added",
            ChangeType.REMOVED: "diff-removed",
            ChangeType.MODIFIED: "diff-modified",
            ChangeType.UNCHANGED: "diff-unchanged"
        }
        return mapping.get(change_type, "diff-unchanged")

    def _get_cell_css_class(self, change_type: ChangeType) -> str:
        """Map a change type to the CSS class of a cell (same classes as rows)."""
        return self._get_row_css_class(change_type)

    def _create_detailed_diff_view(self, row_diffs: List[RowDiff]):
        """Render the filterable list with one expander per selected diff."""
        st.markdown("---")
        st.subheader("📋 详细差异列表")

        change_types = st.multiselect(
            "显示的变更类型",
            [ct.value for ct in ChangeType],
            default=[ChangeType.ADDED.value, ChangeType.REMOVED.value, ChangeType.MODIFIED.value]
        )

        filtered_diffs = [rd for rd in row_diffs if rd.change_type.value in change_types]

        if not filtered_diffs:
            st.info("没有符合条件的差异")
            return

        for number, row_diff in enumerate(filtered_diffs, start=1):
            with st.expander(f"差异 {number}: {row_diff.change_type.value.upper()}", expanded=False):
                self._display_row_diff_details(row_diff)

    def _display_row_diff_details(self, row_diff: RowDiff):
        """Show the position info of one diff and a table of its changed cells."""
        col1, col2 = st.columns(2)

        with col1:
            st.write("**位置信息:**")
            st.write(f"- 左侧行: {row_diff.row_left}")
            st.write(f"- 右侧行: {row_diff.row_right}")
            st.write(f"- 变更类型: {row_diff.change_type.value}")
            st.write(f"- 相似度: {row_diff.similarity:.2%}")

        with col2:
            if row_diff.cell_diffs:
                st.write("**单元格差异:**")
                cell_diff_data = [
                    {
                        '列': cell_diff.column,
                        '变更类型': cell_diff.change_type.value,
                        '原值': str(cell_diff.old_value),
                        '新值': str(cell_diff.new_value)
                    }
                    for cell_diff in row_diff.cell_diffs
                    if cell_diff.change_type != ChangeType.UNCHANGED
                ]

                if cell_diff_data:
                    st.dataframe(pd.DataFrame(cell_diff_data), use_container_width=True)
def main():
    """Entry point: build the diff viewer and render its page."""
    VSCodeStyleDataFrameDiff().create_diff_view()
# Render the page only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|