import html
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st
from plotly.subplots import make_subplots

# Numeric code written into the heatmap matrix per change type (0 = no difference).
_CHANGE_TYPE_CODES = {"modified": 1, "added": 2, "removed": 3}

# (cell CSS class, marker CSS class) per change type, used by the HTML diff table.
_CHANGE_TYPE_CLASSES = {
    "modified": ("cell-modified", "marker-modified"),
    "added": ("cell-added", "marker-added"),
    "removed": ("cell-removed", "marker-removed"),
}

# NOTE(review): the original stylesheet and tag markup were lost from this file
# (only the class names referenced in code survived). The CSS below and the
# "diff-table" / "diff-table-container" / "diff-marker" class names are a
# reconstruction — adjust colours and sizes to taste.
_DIFF_TABLE_CSS = """
<style>
.diff-table-container { max-height: 500px; overflow: auto; border: 1px solid #ddd; border-radius: 4px; }
.diff-table { width: 100%; border-collapse: collapse; font-size: 14px; }
.diff-table th { position: sticky; top: 0; background-color: #f0f2f6; padding: 8px; border: 1px solid #ddd; text-align: left; }
.diff-table td { position: relative; padding: 8px; border: 1px solid #ddd; }
.row-highlight { background-color: rgba(255, 193, 7, 0.08); }
.cell-modified { background-color: #fff3cd; }
.cell-added { background-color: #d4edda; }
.cell-removed { background-color: #f8d7da; }
.diff-marker { position: absolute; top: 0; left: 0; width: 4px; height: 100%; }
.marker-modified { background-color: #ffc107; }
.marker-added { background-color: #28a745; }
.marker-removed { background-color: #dc3545; }
</style>
"""


def create_dataframe_diff_visualizer():
    """Render the full comparison page.

    Shows a control panel, optional difference statistics, an editable copy of
    the data next to the highlighted original, and a detailed difference view.
    The two DataFrames live in ``st.session_state`` under ``original_df`` and
    ``edited_df``.
    """
    st.title("📊 DataFrame可视化比对工具")
    st.markdown("---")

    # Initialise the session data on first run.
    if 'original_df' not in st.session_state:
        st.session_state.original_df = create_sample_data()
    if 'edited_df' not in st.session_state:
        st.session_state.edited_df = st.session_state.original_df.copy()

    # Control panel.
    with st.expander("🎛️ 控制面板", expanded=True):
        col1, col2, col3, col4 = st.columns(4)

        with col1:
            if st.button("🔄 重置数据", type="secondary"):
                st.session_state.original_df = create_sample_data()
                st.session_state.edited_df = st.session_state.original_df.copy()
                st.rerun()

        with col2:
            if st.button("🎲 生成随机差异", type="secondary"):
                st.session_state.edited_df = create_random_differences(
                    st.session_state.original_df
                )
                st.rerun()

        with col3:
            # The checkbox is displayed, but nothing in this file reads its value.
            sync_mode = st.checkbox("🔗 同步滚动", value=True)

        with col4:
            show_stats = st.checkbox("📈 显示统计", value=True)

    # Compare the original against the current edited copy.
    diff_analysis = analyze_dataframe_differences(
        st.session_state.original_df,
        st.session_state.edited_df,
    )

    if show_stats:
        display_diff_statistics(diff_analysis)

    # Main comparison area: editor on the left, highlighted original on the right.
    st.subheader("📝 数据比对")
    left_col, right_col = st.columns(2)

    with left_col:
        st.markdown("### 📝 可编辑版本 (左侧)")
        edited_df = st.data_editor(
            st.session_state.edited_df,
            height=500,
            use_container_width=True,
            num_rows="dynamic",
            key="left_editor",
            column_config=create_column_config(st.session_state.edited_df),
        )

        # Persist edits and rerun so the analysis above reflects them.
        # NOTE(review): feeding the editor's output back in as its input may
        # reset widget state between edits — confirm against the Streamlit
        # version in use.
        if not edited_df.equals(st.session_state.edited_df):
            st.session_state.edited_df = edited_df.copy()
            st.rerun()

    with right_col:
        st.markdown("### 📊 原始版本 (右侧)")
        display_dataframe_with_diff_highlighting(
            st.session_state.original_df,
            diff_analysis,
            "original",
        )

    # Detailed difference view.
    st.markdown("---")
    create_detailed_diff_view(diff_analysis)


def create_sample_data() -> pd.DataFrame:
    """Return a fixed 20-row sample product table.

    A private ``RandomState(42)`` produces the same values as seeding the
    global generator with 42, without resetting the global NumPy RNG that
    ``create_random_differences`` draws from.
    """
    rng = np.random.RandomState(42)
    data = {
        'ID': range(1, 21),
        'Name': [f'Product_{i}' for i in range(1, 21)],
        'Category': rng.choice(['Electronics', 'Clothing', 'Food', 'Books'], 20),
        'Price': np.round(rng.uniform(10, 100, 20), 2),
        'Stock': rng.randint(0, 200, 20),
        'Rating': np.round(rng.uniform(1, 5, 20), 1),
        'Active': rng.choice([True, False], 20),
    }
    return pd.DataFrame(data)


def create_random_differences(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with 5-14 randomly chosen cells modified (demo helper).

    The first column is never touched. Only the columns named ``Name``,
    ``Category``, ``Price``, ``Stock``, ``Rating`` and ``Active`` are changed;
    a pick landing on any other column is a no-op. A frame with no rows or
    fewer than two columns is returned as an unmodified copy.
    """
    modified_df = df.copy()
    n_rows, n_cols = modified_df.shape
    if n_rows == 0 or n_cols < 2:
        # Nothing to pick from; np.random.randint would raise on an empty range.
        return modified_df

    num_changes = np.random.randint(5, 15)
    for _ in range(num_changes):
        row_idx = np.random.randint(0, n_rows)
        col_idx = np.random.randint(1, n_cols)  # skip the first (ID) column
        col_name = modified_df.columns[col_idx]

        # Positional access keeps this correct even if the index is not 0..n-1.
        if col_name == 'Name':
            modified_df.iat[row_idx, col_idx] = f'Modified_{row_idx}'
        elif col_name == 'Category':
            modified_df.iat[row_idx, col_idx] = np.random.choice(['Modified_Cat', 'New_Category'])
        elif col_name == 'Price':
            modified_df.iat[row_idx, col_idx] = np.round(np.random.uniform(10, 150), 2)
        elif col_name == 'Stock':
            modified_df.iat[row_idx, col_idx] = np.random.randint(0, 300)
        elif col_name == 'Rating':
            modified_df.iat[row_idx, col_idx] = np.round(np.random.uniform(1, 5), 1)
        elif col_name == 'Active':
            modified_df.iat[row_idx, col_idx] = not modified_df.iat[row_idx, col_idx]

    return modified_df


def analyze_dataframe_differences(df1: pd.DataFrame, df2: pd.DataFrame) -> Dict:
    """Compare two DataFrames cell by cell, by row position.

    Args:
        df1: The original frame.
        df2: The edited frame.

    Returns:
        A dict with keys:
        ``cell_differences`` - list of dicts (``row``, ``column``,
        ``column_index`` = position of the column in ``df1``,
        ``original_value``, ``new_value``, ``change_type``), in row-major order
        following ``df1``'s column order;
        ``added_rows`` / ``removed_rows`` - positional indices of rows present
        in only one frame;
        ``column_differences`` - ``added_columns`` / ``removed_columns``;
        ``summary`` - ``total_differences``, ``modified_cells``,
        ``modified_rows`` (set), ``modified_columns`` (set).

    A Streamlit warning is shown when the shapes differ or a pair of cells
    cannot be compared.
    """
    if df1.shape != df2.shape:
        st.warning("⚠️ 两个DataFrame的形状不匹配!")

    df1_names = set(df1.columns)
    df2_positions = {col: pos for pos, col in enumerate(df2.columns)}

    # Shared columns in df1's order, so the output order is deterministic.
    shared_columns = [
        (pos1, df2_positions[col], col)
        for pos1, col in enumerate(df1.columns)
        if col in df2_positions
    ]

    differences = {
        'cell_differences': [],
        'added_rows': [],
        'removed_rows': [],
        'column_differences': {
            'added_columns': [col for col in df2.columns if col not in df1_names],
            'removed_columns': [col for col in df1.columns if col not in df2_positions],
        },
        'summary': {
            'total_differences': 0,
            'modified_cells': 0,
            'modified_rows': set(),
            'modified_columns': set(),
        },
    }
    summary = differences['summary']

    # Only the rows present in both frames can be compared cell by cell.
    min_rows = min(len(df1), len(df2))
    for row_idx in range(min_rows):
        for pos1, pos2, col in shared_columns:
            try:
                val1 = df1.iat[row_idx, pos1]
                val2 = df2.iat[row_idx, pos2]

                missing1 = pd.isna(val1)
                missing2 = pd.isna(val2)
                # Two missing values count as equal (NaN != NaN otherwise).
                if missing1 and missing2:
                    continue

                if missing1 or missing2 or val1 != val2:
                    differences['cell_differences'].append({
                        'row': row_idx,
                        'column': col,
                        'column_index': pos1,
                        'original_value': val1,
                        'new_value': val2,
                        'change_type': determine_change_type(val1, val2),
                    })
                    summary['modified_cells'] += 1
                    summary['modified_rows'].add(row_idx)
                    summary['modified_columns'].add(col)
            except (TypeError, ValueError) as e:
                # e.g. array-like cell values whose truth value is ambiguous.
                st.warning(f"比较时出错 (行{row_idx}, 列{col}): {e}")

    # Rows beyond the shorter frame are reported as added or removed.
    if len(df1) > len(df2):
        differences['removed_rows'] = list(range(len(df2), len(df1)))
    elif len(df2) > len(df1):
        differences['added_rows'] = list(range(len(df1), len(df2)))

    summary['total_differences'] = (
        summary['modified_cells']
        + len(differences['added_rows'])
        + len(differences['removed_rows'])
        + len(differences['column_differences']['added_columns'])
        + len(differences['column_differences']['removed_columns'])
    )

    return differences


def determine_change_type(val1, val2) -> str:
    """Classify a cell change as ``"added"``, ``"removed"`` or ``"modified"``.

    ``"added"`` when the original value is missing, ``"removed"`` when the new
    value is missing, otherwise ``"modified"``.
    """
    if pd.isna(val1):
        return "added"
    if pd.isna(val2):
        return "removed"
    return "modified"


def create_column_config(df: pd.DataFrame) -> Dict:
    """Build the ``column_config`` mapping for ``st.data_editor``.

    Boolean columns get a checkbox, integer and float columns a number column
    (``%d`` / ``%.2f``), everything else a text column limited to 100 chars.
    Dtypes are matched by family, so e.g. ``int32`` is treated like ``int64``.
    """
    config = {}
    for col in df.columns:
        dtype = df[col].dtype
        # Check bool first: pandas treats bool as a numeric dtype elsewhere.
        if pd.api.types.is_bool_dtype(dtype):
            config[col] = st.column_config.CheckboxColumn(
                col,
                help=f"布尔列: {col}",
            )
        elif pd.api.types.is_integer_dtype(dtype):
            config[col] = st.column_config.NumberColumn(
                col,
                help=f"数值列: {col}",
                format="%d",
            )
        elif pd.api.types.is_float_dtype(dtype):
            config[col] = st.column_config.NumberColumn(
                col,
                help=f"数值列: {col}",
                format="%.2f",
            )
        else:
            config[col] = st.column_config.TextColumn(
                col,
                help=f"文本列: {col}",
                max_chars=100,
            )
    return config


def display_dataframe_with_diff_highlighting(df: pd.DataFrame, diff_analysis: Dict, view_type: str):
    """Render ``df`` as an HTML table with differing cells highlighted.

    Args:
        df: Frame to display.
        diff_analysis: Result of ``analyze_dataframe_differences``.
        view_type: Passed through to ``create_styled_diff_table``.
    """
    html_table = create_styled_diff_table(df, diff_analysis, view_type)

    # Stylesheet for the table (see the note on _DIFF_TABLE_CSS).
    st.markdown(_DIFF_TABLE_CSS, unsafe_allow_html=True)

    # The table itself, inside a scrollable container.
    st.markdown(
        f'<div class="diff-table-container">{html_table}</div>',
        unsafe_allow_html=True,
    )


def create_styled_diff_table(df: pd.DataFrame, diff_analysis: Dict, view_type: str) -> str:
    """Return an HTML ``<table>`` for ``df`` with per-cell diff classes.

    Rows listed in ``diff_analysis['summary']['modified_rows']`` get the
    ``row-highlight`` class; cells listed in ``cell_differences`` get a
    ``cell-*`` class plus a ``marker-*`` element. Column names and cell values
    are HTML-escaped because the result is rendered with
    ``unsafe_allow_html=True``. Missing values render as empty cells.

    ``view_type`` is accepted for interface compatibility and is not used.
    """
    # (row position, column name) -> difference record.
    diff_map = {
        (diff['row'], diff['column']): diff
        for diff in diff_analysis['cell_differences']
    }
    modified_rows = diff_analysis['summary']['modified_rows']

    parts = ['<table class="diff-table">', '<thead><tr>']
    parts.extend(f'<th>{html.escape(str(col))}</th>' for col in df.columns)
    parts.append('</tr></thead><tbody>')

    for row_idx in range(len(df)):
        row_class = "row-highlight" if row_idx in modified_rows else ""
        parts.append(f'<tr class="{row_class}">')

        for col_pos, col in enumerate(df.columns):
            value = df.iat[row_idx, col_pos]

            cell_class = ""
            marker_class = ""
            diff_info = diff_map.get((row_idx, col))
            if diff_info is not None:
                cell_class, marker_class = _CHANGE_TYPE_CLASSES.get(
                    diff_info['change_type'], ("", "")
                )

            display_value = "" if pd.isna(value) else html.escape(str(value))

            parts.append(f'<td class="{cell_class}">')
            if marker_class:
                parts.append(f'<span class="diff-marker {marker_class}"></span>')
            parts.append(display_value)
            parts.append('</td>')

        parts.append('</tr>')

    parts.append('</tbody></table>')
    return ''.join(parts)


def display_diff_statistics(diff_analysis: Dict):
    """Show the summary counters of ``diff_analysis`` as five metrics."""
    st.subheader("📈 差异统计")
    summary = diff_analysis['summary']

    col1, col2, col3, col4, col5 = st.columns(5)

    with col1:
        st.metric(
            "总差异数",
            summary['total_differences'],
            help="所有类型的差异总数",
        )

    with col2:
        st.metric(
            "修改的单元格",
            summary['modified_cells'],
            help="被修改的单元格数量",
        )

    with col3:
        st.metric(
            "影响的行数",
            len(summary['modified_rows']),
            help="包含差异的行数",
        )

    with col4:
        st.metric(
            "影响的列数",
            len(summary['modified_columns']),
            help="包含差异的列数",
        )

    with col5:
        added_rows = len(diff_analysis['added_rows'])
        removed_rows = len(diff_analysis['removed_rows'])
        row_diff = added_rows - removed_rows
        st.metric(
            "行数变化",
            f"+{added_rows}/-{removed_rows}",
            # Hide the delta arrow when the net row count is unchanged.
            delta=row_diff if row_diff != 0 else None,
        )


def create_detailed_diff_view(diff_analysis: Dict):
    """Show a selector over the difference categories present and the chosen one's details."""
    st.subheader("🔍 详细差异分析")

    if diff_analysis['summary']['total_differences'] == 0:
        st.success("✅ 没有发现任何差异!")
        return

    column_diffs = diff_analysis['column_differences']

    # Offer only the categories that actually contain differences.
    diff_types = []
    if diff_analysis['cell_differences']:
        diff_types.append("单元格差异")
    if diff_analysis['added_rows']:
        diff_types.append("新增行")
    if diff_analysis['removed_rows']:
        diff_types.append("删除行")
    if column_diffs['added_columns']:
        diff_types.append("新增列")
    if column_diffs['removed_columns']:
        diff_types.append("删除列")

    selected_diff_type = st.selectbox("选择要查看的差异类型", diff_types)

    if selected_diff_type == "单元格差异":
        display_cell_differences(diff_analysis['cell_differences'])
    elif selected_diff_type == "新增行":
        st.info(f"新增了 {len(diff_analysis['added_rows'])} 行: {diff_analysis['added_rows']}")
    elif selected_diff_type == "删除行":
        st.warning(f"删除了 {len(diff_analysis['removed_rows'])} 行: {diff_analysis['removed_rows']}")
    elif selected_diff_type == "新增列":
        st.info(f"新增了列: {column_diffs['added_columns']}")
    elif selected_diff_type == "删除列":
        st.warning(f"删除了列: {column_diffs['removed_columns']}")


def display_cell_differences(cell_differences: List[Dict]):
    """List the cell differences in a table and offer them as a CSV download."""
    if not cell_differences:
        return

    st.write(f"共发现 {len(cell_differences)} 个单元格差异:")

    # One display row per difference.
    diff_df = pd.DataFrame([
        {
            '位置': f"行{diff['row']}, 列{diff['column']}",
            '列名': diff['column'],
            '原始值': diff['original_value'],
            '新值': diff['new_value'],
            '变更类型': diff['change_type'],
        }
        for diff in cell_differences
    ])

    st.dataframe(
        diff_df,
        use_container_width=True,
        height=300,
        column_config={
            '位置': st.column_config.TextColumn('位置', help='差异的具体位置'),
            '列名': st.column_config.TextColumn('列名'),
            '原始值': st.column_config.TextColumn('原始值'),
            '新值': st.column_config.TextColumn('新值'),
            '变更类型': st.column_config.TextColumn('变更类型'),
        },
    )

    # Export the difference report.
    if st.button("📥 导出差异报告"):
        csv_data = diff_df.to_csv(index=False)
        st.download_button(
            label="下载CSV格式差异报告",
            data=csv_data,
            file_name="dataframe_diff_report.csv",
            mime="text/csv",
        )


def create_plotly_diff_heatmap(
    diff_analysis: Dict,
    df_shape: Tuple[int, int],
    columns: Optional[List[str]] = None,
):
    """Build a heatmap marking each differing cell by change type.

    Args:
        diff_analysis: Result of ``analyze_dataframe_differences``.
        df_shape: ``(rows, columns)`` of the frame the heatmap represents.
        columns: Optional column names in frame order. When given they locate
            each difference's column and label the x axis. Otherwise the
            ``column_index`` stored in each difference record is used.

    Returns:
        A ``plotly.graph_objects.Figure``, or ``None`` when there are no cell
        differences.
    """
    cell_differences = diff_analysis['cell_differences']
    if not cell_differences:
        return None

    n_rows, n_cols = df_shape
    name_to_index = (
        {name: idx for idx, name in enumerate(columns)} if columns is not None else {}
    )

    # 0 = unchanged; other codes come from _CHANGE_TYPE_CODES.
    diff_matrix = np.zeros(df_shape)
    for diff in cell_differences:
        row = diff['row']
        col = name_to_index.get(diff['column'], diff.get('column_index'))
        if col is None:
            # Record carries no column position: fall back to the last column.
            col = n_cols - 1
        if not (0 <= row < n_rows and 0 <= col < n_cols):
            continue

        code = _CHANGE_TYPE_CODES.get(diff['change_type'])
        if code is not None:
            diff_matrix[row, col] = code

    heatmap_kwargs = dict(
        z=diff_matrix,
        # Pin the range so each code always maps to the same colour, whichever
        # change types happen to be present.
        zmin=0,
        zmax=3,
        colorscale=[[0, 'white'], [0.33, 'yellow'], [0.66, 'green'], [1, 'red']],
        showscale=True,
        colorbar=dict(
            title="差异类型",
            tickmode="array",
            tickvals=[0, 1, 2, 3],
            ticktext=["无差异", "修改", "新增", "删除"],
        ),
    )
    if columns is not None and len(columns) == n_cols:
        heatmap_kwargs['x'] = [str(name) for name in columns]

    fig = go.Figure(data=go.Heatmap(**heatmap_kwargs))

    fig.update_layout(
        title="DataFrame差异热力图",
        xaxis_title="列",
        yaxis_title="行",
        height=400,
    )

    return fig


# Entry point
def main():
    """Render the comparison page and, on request, the difference heatmap."""
    create_dataframe_diff_visualizer()

    # Optional heatmap view.
    if st.checkbox("🔥 显示差异热力图"):
        if 'original_df' in st.session_state and 'edited_df' in st.session_state:
            original_df = st.session_state.original_df
            diff_analysis = analyze_dataframe_differences(
                original_df,
                st.session_state.edited_df,
            )
            heatmap_fig = create_plotly_diff_heatmap(
                diff_analysis,
                original_df.shape,
                columns=list(original_df.columns),
            )
            if heatmap_fig:
                st.plotly_chart(heatmap_fig, use_container_width=True)


if __name__ == "__main__":
    main()