from typing import Dict, List, Tuple, Optional

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st
from plotly.subplots import make_subplots


def create_dataframe_diff_visualizer():
    """Render the side-by-side DataFrame comparison page.

    Keeps two frames in ``st.session_state`` (``original_df`` and
    ``edited_df``), shows the editable copy on the left and the original
    (with diff highlighting) on the right, and re-runs the script whenever
    the user edits, resets, or randomizes the data.
    """
    st.title("📊 DataFrame可视化比对工具")
    st.markdown("---")

    # Seed session state on first run so both panes have data to show.
    if 'original_df' not in st.session_state:
        st.session_state.original_df = create_sample_data()
    if 'edited_df' not in st.session_state:
        st.session_state.edited_df = st.session_state.original_df.copy()

    # Control panel.
    with st.expander("🎛️ 控制面板", expanded=True):
        col1, col2, col3, col4 = st.columns(4)

        with col1:
            if st.button("🔄 重置数据", type="secondary"):
                st.session_state.original_df = create_sample_data()
                st.session_state.edited_df = st.session_state.original_df.copy()
                st.rerun()

        with col2:
            if st.button("🎲 生成随机差异", type="secondary"):
                st.session_state.edited_df = create_random_differences(
                    st.session_state.original_df
                )
                st.rerun()

        with col3:
            # The checkbox is rendered, but its value is not consumed
            # anywhere in this function, so it is not bound to a name.
            st.checkbox("🔗 同步滚动", value=True)

        with col4:
            show_stats = st.checkbox("📈 显示统计", value=True)

    # Compute the diff once; every view below reads from it.
    diff_analysis = analyze_dataframe_differences(
        st.session_state.original_df,
        st.session_state.edited_df,
    )

    if show_stats:
        display_diff_statistics(diff_analysis)

    # Main comparison area, laid out as two columns.
    st.subheader("📝 数据比对")
    left_col, right_col = st.columns(2)

    with left_col:
        st.markdown("### 📝 可编辑版本 (左侧)")
        edited_df = st.data_editor(
            st.session_state.edited_df,
            height=500,
            use_container_width=True,
            num_rows="dynamic",
            key="left_editor",
            column_config=create_column_config(st.session_state.edited_df),
        )

        # Persist edits and re-run so the diff above reflects them.
        if not edited_df.equals(st.session_state.edited_df):
            st.session_state.edited_df = edited_df.copy()
            st.rerun()

    with right_col:
        st.markdown("### 📊 原始版本 (右侧)")
        display_dataframe_with_diff_highlighting(
            st.session_state.original_df,
            diff_analysis,
            "original",
        )

    # Detailed diff view.
    st.markdown("---")
    create_detailed_diff_view(diff_analysis)


def create_sample_data() -> pd.DataFrame:
    """Build the deterministic 20-row demo product table.

    Returns:
        A DataFrame with columns ``ID``, ``Name``, ``Category``, ``Price``,
        ``Stock``, ``Rating`` and ``Active``.
    """
    # A private RandomState(42) yields the same values as seeding the global
    # generator with 42, but does not reseed the global generator that
    # create_random_differences draws from.
    rng = np.random.RandomState(42)
    data = {
        'ID': range(1, 21),
        'Name': [f'Product_{i}' for i in range(1, 21)],
        'Category': rng.choice(['Electronics', 'Clothing', 'Food', 'Books'], 20),
        'Price': np.round(rng.uniform(10, 100, 20), 2),
        'Stock': rng.randint(0, 200, 20),
        'Rating': np.round(rng.uniform(1, 5, 20), 1),
        'Active': rng.choice([True, False], 20),
    }
    return pd.DataFrame(data)


def create_random_differences(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with a handful of random cell edits (demo aid).

    Between 5 and 14 cells are rewritten. The first column is never touched
    (it is the ID column in the sample data), and only the sample-data
    column names are recognised; any other column is left unchanged.

    Args:
        df: Source frame; it is not mutated.

    Returns:
        The modified copy. An empty frame, or one with fewer than two
        columns, is returned as an unmodified copy.
    """
    modified_df = df.copy()

    # Nothing to pick from: randint(0, 0) / randint(1, 1) would raise.
    if modified_df.empty or len(modified_df.columns) < 2:
        return modified_df

    num_changes = np.random.randint(5, 15)
    for _ in range(num_changes):
        row_pos = np.random.randint(0, len(modified_df))
        col_pos = np.random.randint(1, len(modified_df.columns))  # skip first column
        col_name = modified_df.columns[col_pos]

        if col_name == 'Name':
            new_value = f'Modified_{row_pos}'
        elif col_name == 'Category':
            new_value = np.random.choice(['Modified_Cat', 'New_Category'])
        elif col_name == 'Price':
            new_value = np.round(np.random.uniform(10, 150), 2)
        elif col_name == 'Stock':
            new_value = np.random.randint(0, 300)
        elif col_name == 'Rating':
            new_value = np.round(np.random.uniform(1, 5), 1)
        elif col_name == 'Active':
            new_value = not modified_df.iat[row_pos, col_pos]
        else:
            continue

        # Positional write: the indices above are positions, not labels, so
        # this stays correct when the frame's index is not 0..n-1.
        modified_df.iat[row_pos, col_pos] = new_value

    return modified_df


def analyze_dataframe_differences(df1: pd.DataFrame, df2: pd.DataFrame) -> Dict:
    """Compare two frames cell by cell and summarise what changed.

    Rows are matched by position and columns by name. Only the columns
    present in both frames and the first ``min(len(df1), len(df2))`` rows
    are compared cell-wise; extra rows and columns are reported separately.

    Args:
        df1: The original frame.
        df2: The edited frame.

    Returns:
        A dict with keys ``cell_differences`` (list of per-cell dicts, in
        row-major order following ``df1``'s column order), ``added_rows``,
        ``removed_rows``, ``column_differences`` and ``summary``.
    """
    if df1.shape != df2.shape:
        st.warning("⚠️ 两个DataFrame的形状不匹配!")

    # Keep the frames' own column order so the result is deterministic
    # (a plain set intersection has no stable order).
    df1_columns = set(df1.columns)
    df2_columns = set(df2.columns)
    common_columns = [col for col in df1.columns if col in df2_columns]

    differences = {
        'cell_differences': [],
        'added_rows': [],
        'removed_rows': [],
        'column_differences': {
            'added_columns': [col for col in df2.columns if col not in df1_columns],
            'removed_columns': [col for col in df1.columns if col not in df2_columns],
        },
        'summary': {
            'total_differences': 0,
            'modified_cells': 0,
            'modified_rows': set(),
            'modified_columns': set(),
        },
    }
    summary = differences['summary']

    # Resolve each shared column once instead of building a row Series per
    # cell; this also keeps each value in its column's own dtype.
    column_pairs = [(col, df1[col], df2[col]) for col in common_columns]

    min_rows = min(len(df1), len(df2))
    for row_idx in range(min_rows):
        for col, series1, series2 in column_pairs:
            # Deliberately broad: one uncomparable cell should be reported
            # to the user without aborting the whole comparison.
            try:
                val1 = series1.iloc[row_idx]
                val2 = series2.iloc[row_idx]

                # Two missing values count as equal.
                if pd.isna(val1) and pd.isna(val2):
                    continue

                if pd.isna(val1) or pd.isna(val2) or val1 != val2:
                    differences['cell_differences'].append({
                        'row': row_idx,
                        'column': col,
                        'original_value': val1,
                        'new_value': val2,
                        'change_type': determine_change_type(val1, val2),
                    })
                    summary['modified_cells'] += 1
                    summary['modified_rows'].add(row_idx)
                    summary['modified_columns'].add(col)
            except Exception as e:
                st.warning(f"比较时出错 (行{row_idx}, 列{col}): {e}")

    # Rows beyond the shorter frame are reported as removed or added.
    if len(df1) > len(df2):
        differences['removed_rows'] = list(range(len(df2), len(df1)))
    elif len(df2) > len(df1):
        differences['added_rows'] = list(range(len(df1), len(df2)))

    summary['total_differences'] = (
        summary['modified_cells']
        + len(differences['added_rows'])
        + len(differences['removed_rows'])
        + len(differences['column_differences']['added_columns'])
        + len(differences['column_differences']['removed_columns'])
    )

    return differences


def determine_change_type(val1, val2) -> str:
    """Classify a cell change as ``"added"``, ``"removed"`` or ``"modified"``.

    Args:
        val1: The original value.
        val2: The new value.

    Returns:
        ``"added"`` when the original is missing, ``"removed"`` when only the
        new value is missing, otherwise ``"modified"``.
    """
    if pd.isna(val1):
        return "added"
    if pd.isna(val2):
        return "removed"
    return "modified"


def create_column_config(df: pd.DataFrame) -> Dict:
    """Build the ``column_config`` mapping passed to ``st.data_editor``.

    Args:
        df: Frame whose columns are being configured.

    Returns:
        A dict mapping each column name to a checkbox column (boolean
        dtypes), a number column (integer and float dtypes) or a text
        column (everything else).
    """
    config = {}
    for col in df.columns:
        dtype = df[col].dtype

        # Dtype predicates rather than literal 'int64'/'float64'/'bool', so
        # other widths (e.g. int32, float32) get the right editor too.
        # Bool is tested first so it is never treated as a number.
        if pd.api.types.is_bool_dtype(dtype):
            config[col] = st.column_config.CheckboxColumn(
                col,
                help=f"布尔列: {col}",
            )
        elif pd.api.types.is_float_dtype(dtype):
            config[col] = st.column_config.NumberColumn(
                col,
                help=f"数值列: {col}",
                format="%.2f",
            )
        elif pd.api.types.is_integer_dtype(dtype):
            config[col] = st.column_config.NumberColumn(
                col,
                help=f"数值列: {col}",
                format="%d",
            )
        else:
            config[col] = st.column_config.TextColumn(
                col,
                help=f"文本列: {col}",
                max_chars=100,
            )

    return config


def display_dataframe_with_diff_highlighting(df: pd.DataFrame, diff_analysis: Dict, view_type: str):
    """Render *df* as a styled HTML table with differing cells highlighted.

    Args:
        df: Frame to display.
        diff_analysis: Result of ``analyze_dataframe_differences``.
        view_type: Passed through to ``create_styled_diff_table``.
    """
    # Build the styled HTML table.
    html_table = create_styled_diff_table(df, diff_analysis, view_type)

    # Custom CSS.
    # NOTE(review): this literal is empty in the visible source; it
    # presumably held a style block that was lost when markup was stripped
    # from the file. Restore it from version control.
    st.markdown(""" """, unsafe_allow_html=True)

    # Show the table.
    # NOTE(review): the original call was truncated after "st.markdown(f'";
    # any wrapper markup around the table could not be recovered, and
    # unsafe_allow_html=True is assumed because the table is raw HTML.
    # Confirm against version control.
    st.markdown(f'{html_table}', unsafe_allow_html=True)
| {col} | ' html += '
|---|
| ' if marker_class: cell_html += f'' cell_html += display_value cell_html += ' | ' html += cell_html html += '