| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- import pandas as pd
- import json
- from typing import List, Dict, Any, Tuple
- import os
class DataManager:
    """Centralize loading, validation and preprocessing of transaction records.

    Input records may name their fields with any of several aliases
    (camelCase, snake_case or Chinese names). They are normalized to the
    canonical names that are the keys of ``FIELD_MAPPING``.
    """

    # Canonical field name -> accepted aliases, in priority order: the first
    # alias present in a record supplies the value.
    FIELD_MAPPING = {
        'txId': ['txId', 'transaction_id', '交易ID', 'id'],
        'txDate': ['txDate', 'transaction_date', '交易日期', 'date'],
        'txTime': ['txTime', 'transaction_time', '交易时间', 'time'],
        'txAmount': ['txAmount', 'amount', '交易金额', '金额'],
        'txBalance': ['txBalance', 'balance', '余额', '交易后余额'],
        'txDirection': ['txDirection', 'direction', '交易方向', '收支'],
        'txSummary': ['txSummary', 'summary', '交易摘要', '摘要', '说明'],
        'txCounterparty': ['txCounterparty', 'counterparty', '交易对手', '对方账户'],
        'createdAt': ['createdAt', 'created_at', '创建时间']
    }

    # Canonical fields that every record must carry (see validate_data_schema).
    REQUIRED_FIELDS = ['txId', 'txDate', 'txAmount', 'txDirection']

    @staticmethod
    def load_from_file(file_path: str) -> Tuple[List[Dict[str, Any]], pd.DataFrame]:
        """Load transaction records from a JSON file, resolving field aliases.

        Args:
            file_path: Path to a UTF-8 JSON file (a leading BOM is tolerated)
                whose top-level value is a non-empty array of objects.

        Returns:
            A tuple ``(records, df)``. ``records`` is the list of standardized
            dicts with their values untouched. ``df`` is a DataFrame built from
            those dicts, with dtypes normalized by ``_optimize_dataframe``.

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
            ValueError: If the content is not valid JSON
                (``json.JSONDecodeError`` is a ``ValueError``), is not an
                array, is empty, or contains an element that is not an object.
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"数据文件不存在: {file_path}")

        # 'utf-8-sig' strips a leading UTF-8 BOM if present (plain 'utf-8'
        # makes json.load fail on such files); BOM-less files decode the same.
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            raw_data = json.load(f)

        if not isinstance(raw_data, list):
            raise ValueError("JSON文件内容必须是数组格式")
        if not raw_data:
            raise ValueError("数据文件为空")

        # Every alias of every canonical field; built once instead of being
        # re-materialized as a list for each key of each record.
        known_aliases = {
            alias
            for aliases in DataManager.FIELD_MAPPING.values()
            for alias in aliases
        }

        standardized_data = []
        for index, record in enumerate(raw_data):
            if not isinstance(record, dict):
                # Without this guard a string element would be probed with
                # substring semantics by `name in record` below.
                raise ValueError(f"第{index + 1}条记录必须是对象格式")
            standardized_record = {}
            for std_field, possible_names in DataManager.FIELD_MAPPING.items():
                for name in possible_names:
                    if name in record:
                        standardized_record[std_field] = record[name]
                        break
            # Carry over fields that are not an alias of any canonical field.
            for key, value in record.items():
                if key not in known_aliases:
                    standardized_record[key] = value
            standardized_data.append(standardized_record)

        df = pd.DataFrame(standardized_data)
        df = DataManager._optimize_dataframe(df)
        return standardized_data, df

    @staticmethod
    def _optimize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
        """Normalize the dtypes of a standardized transaction DataFrame.

        Assigns converted columns back into ``df`` (so the argument is
        modified) and returns it. Only columns that exist are touched.
        Unparseable dates become NaT and unparseable amounts become NaN.
        """
        if 'txDate' in df.columns:
            # Keep only the calendar date (datetime.date values).
            df['txDate'] = pd.to_datetime(df['txDate'], errors='coerce').dt.date
        if 'txTime' in df.columns:
            # Stringify present values but keep missing ones missing: a bare
            # astype(str) would turn them into the literal text 'nan'/'None'.
            times = df['txTime']
            df['txTime'] = times.astype(str).where(times.notna(), None)
        for col in ['txAmount', 'txBalance']:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce')
        if 'createdAt' in df.columns:
            df['createdAt'] = pd.to_datetime(df['createdAt'], errors='coerce')
        if 'txDirection' in df.columns:
            df['txDirection'] = df['txDirection'].astype('category')
        return df

    @staticmethod
    def validate_data_schema(data: List[Dict[str, Any]]) -> Tuple[bool, List[str]]:
        """Check that every record is an object carrying all REQUIRED_FIELDS.

        Args:
            data: Standardized records, e.g. as returned by ``load_from_file``.

        Returns:
            ``(is_valid, errors)``. ``errors`` holds human-readable messages,
            and ``is_valid`` is True exactly when ``errors`` is empty.
        """
        if not data:
            return False, ["数据集为空"]

        errors = []
        records = [r for r in data if isinstance(r, dict)]

        # A required field is reported if ANY record lacks it (previously
        # only the first record was inspected).
        missing_fields = [
            field for field in DataManager.REQUIRED_FIELDS
            if any(field not in r for r in records)
        ]
        if missing_fields:
            errors.append(f"缺少必需字段: {', '.join(missing_fields)}")

        non_object_count = len(data) - len(records)
        if non_object_count:
            errors.append(f"{non_object_count}条记录不是对象格式")

        return len(errors) == 0, errors

    @staticmethod
    def format_data_summary(data: List[Dict[str, Any]]) -> str:
        """Build a one-line, ' | '-separated summary of the records.

        The summary always includes the record count. It adds the date range
        and amount range when the column exists and has at least one parseable
        value, and the per-value counts of ``txDirection`` when that column
        exists.
        """
        if not data:
            return "数据集为空"
        df = pd.DataFrame(data)
        summary = [f"记录总数: {len(data)}"]

        if 'txDate' in df.columns:
            # Parse before comparing: raw JSON dates are usually strings, whose
            # min/max would be lexicographic (and raise on mixed types).
            dates = pd.to_datetime(df['txDate'], errors='coerce').dropna()
            if not dates.empty:
                summary.append(f"日期范围: {dates.min().date()} 至 {dates.max().date()}")

        if 'txAmount' in df.columns:
            # Same reasoning: '100' < '9' as strings, so convert to numbers.
            amounts = pd.to_numeric(df['txAmount'], errors='coerce').dropna()
            if not amounts.empty:
                summary.append(f"金额范围: {amounts.min()} 至 {amounts.max()}")

        if 'txDirection' in df.columns:
            direction_counts = df['txDirection'].value_counts().to_dict()
            summary.append(f"收支分布: {direction_counts}")

        return " | ".join(summary)
|