import json
import os
from typing import Any, Dict, List, Tuple

import pandas as pd


class DataManager:
    """Load, validate, and pre-process transaction records.

    All methods are static; the class only serves as a namespace that groups
    the field-alias table with the helpers that use it.
    """

    # Canonical field name -> accepted aliases. Aliases are tried in list
    # order and the first one present in a record wins.
    FIELD_MAPPING = {
        'txId': ['txId', 'transaction_id', '交易ID', 'id'],
        'txDate': ['txDate', 'transaction_date', '交易日期', 'date'],
        'txTime': ['txTime', 'transaction_time', '交易时间', 'time'],
        'txAmount': ['txAmount', 'amount', '交易金额', '金额'],
        'txBalance': ['txBalance', 'balance', '余额', '交易后余额'],
        'txDirection': ['txDirection', 'direction', '交易方向', '收支'],
        'txSummary': ['txSummary', 'summary', '交易摘要', '摘要', '说明'],
        'txCounterparty': ['txCounterparty', 'counterparty', '交易对手', '对方账户'],
        'createdAt': ['createdAt', 'created_at', '创建时间']
    }

    # Canonical fields that every record must contain.
    REQUIRED_FIELDS = ['txId', 'txDate', 'txAmount', 'txDirection']

    @staticmethod
    def _standardize_record(record: Dict[str, Any], known_aliases: frozenset) -> Dict[str, Any]:
        """Return a copy of *record* keyed by canonical field names.

        Keys that are not an alias of any canonical field are carried over
        unchanged. Keys that are aliases but lost to a higher-priority alias
        of the same field are dropped, as before.
        """
        standardized: Dict[str, Any] = {}
        for std_field, possible_names in DataManager.FIELD_MAPPING.items():
            for name in possible_names:
                if name in record:
                    standardized[std_field] = record[name]
                    break

        # Preserve fields the mapping table does not know about.
        for key, value in record.items():
            if key not in known_aliases:
                standardized[key] = value
        return standardized

    @staticmethod
    def load_from_file(file_path: str) -> Tuple[List[Dict[str, Any]], pd.DataFrame]:
        """Load transaction records from a JSON file, resolving field aliases.

        Args:
            file_path: Path to a UTF-8 JSON file whose top-level value is a
                non-empty array of objects.

        Returns:
            A tuple ``(records, frame)``: ``records`` is the list of
            standardized dicts with their raw (unconverted) values, and
            ``frame`` is a DataFrame built from them and passed through
            ``_optimize_dataframe``.

        Raises:
            FileNotFoundError: If *file_path* does not exist.
            ValueError: If the JSON content is not an array, is an empty
                array, or contains an element that is not an object.
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"数据文件不存在: {file_path}")

        with open(file_path, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)

        if not isinstance(raw_data, list):
            raise ValueError("JSON文件内容必须是数组格式")

        if not raw_data:
            raise ValueError("数据文件为空")

        # Build the alias lookup once instead of once per key per record.
        known_aliases = frozenset(
            alias
            for aliases in DataManager.FIELD_MAPPING.values()
            for alias in aliases
        )

        standardized_data = []
        for position, record in enumerate(raw_data, start=1):
            # Non-object elements used to fail later with an opaque
            # TypeError/AttributeError; report them as a format error.
            if not isinstance(record, dict):
                raise ValueError(
                    f"第 {position} 条记录必须是对象格式,实际类型为 {type(record).__name__}"
                )
            standardized_data.append(
                DataManager._standardize_record(record, known_aliases)
            )

        df = DataManager._optimize_dataframe(pd.DataFrame(standardized_data))
        return standardized_data, df

    @staticmethod
    def _optimize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
        """Convert known columns of *df* to suitable dtypes.

        The frame is modified in place and also returned. Values that cannot
        be parsed as a date, timestamp, or number become missing (NaT/NaN)
        rather than raising.
        """
        # Date column: keep only the calendar date part.
        if 'txDate' in df.columns:
            df['txDate'] = pd.to_datetime(df['txDate'], errors='coerce').dt.date

        # Time column: stringify present values only, so a missing time stays
        # missing instead of becoming the literal text 'None' or 'nan'.
        if 'txTime' in df.columns:
            time_col = df['txTime']
            df['txTime'] = time_col.astype(str).where(time_col.notna())

        # Monetary columns.
        for col in ['txAmount', 'txBalance']:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce')

        # Creation timestamp.
        if 'createdAt' in df.columns:
            df['createdAt'] = pd.to_datetime(df['createdAt'], errors='coerce')

        # Direction is stored as a categorical column.
        if 'txDirection' in df.columns:
            df['txDirection'] = df['txDirection'].astype('category')

        return df

    @staticmethod
    def validate_data_schema(data: List[Dict[str, Any]]) -> Tuple[bool, List[str]]:
        """Check that every record contains all required fields.

        Args:
            data: Records keyed by canonical field names.

        Returns:
            ``(is_valid, errors)``. ``errors`` is empty when valid; otherwise
            it holds one message naming each required field that is absent
            from at least one record, in ``REQUIRED_FIELDS`` order.
        """
        if not data:
            return False, ["数据集为空"]

        errors = []

        # Inspect every record, not just the first: a later record lacking a
        # required field must also fail validation.
        missing_fields = [
            field
            for field in DataManager.REQUIRED_FIELDS
            if any(field not in record for record in data)
        ]

        if missing_fields:
            errors.append(f"缺少必需字段: {', '.join(missing_fields)}")

        return len(errors) == 0, errors

    @staticmethod
    def format_data_summary(data: List[Dict[str, Any]]) -> str:
        """Build a one-line summary of *data*.

        The summary contains the record count and, when the corresponding
        columns exist, the date range, the amount range, and the count of
        records per direction, joined by ``" | "``.
        """
        if not data:
            return "数据集为空"

        df = pd.DataFrame(data)
        summary = [f"记录总数: {len(data)}"]

        if 'txDate' in df.columns:
            # Drop missing dates first; comparing them with strings can raise.
            dates = df['txDate'].dropna()
            summary.append(f"日期范围: {dates.min()} 至 {dates.max()}")

        if 'txAmount' in df.columns:
            # Compare amounts numerically; raw records may carry them as
            # strings, where '100' would sort before '9'.
            amounts = pd.to_numeric(df['txAmount'], errors='coerce').dropna()
            summary.append(f"金额范围: {amounts.min()} 至 {amounts.max()}")

        if 'txDirection' in df.columns:
            direction_counts = df['txDirection'].value_counts().to_dict()
            summary.append(f"收支分布: {direction_counts}")

        return " | ".join(summary)