# data_manager.py
import json
import os
from typing import Any, ClassVar, Dict, List, Tuple

import pandas as pd
  5. class DataManager:
  6. """统一管理数据加载、验证和预处理"""
  7. # 字段映射(支持多种别名)
  8. FIELD_MAPPING = {
  9. 'txId': ['txId', 'transaction_id', '交易ID', 'id'],
  10. 'txDate': ['txDate', 'transaction_date', '交易日期', 'date'],
  11. 'txTime': ['txTime', 'transaction_time', '交易时间', 'time'],
  12. 'txAmount': ['txAmount', 'amount', '交易金额', '金额'],
  13. 'txBalance': ['txBalance', 'balance', '余额', '交易后余额'],
  14. 'txDirection': ['txDirection', 'direction', '交易方向', '收支'],
  15. 'txSummary': ['txSummary', 'summary', '交易摘要', '摘要', '说明'],
  16. 'txCounterparty': ['txCounterparty', 'counterparty', '交易对手', '对方账户'],
  17. 'createdAt': ['createdAt', 'created_at', '创建时间']
  18. }
  19. # 必需字段
  20. REQUIRED_FIELDS = ['txId', 'txDate', 'txAmount', 'txDirection']
  21. @staticmethod
  22. def load_from_file(file_path: str) -> Tuple[List[Dict[str, Any]], pd.DataFrame]:
  23. """从JSON文件加载数据,支持字段别名"""
  24. if not os.path.exists(file_path):
  25. raise FileNotFoundError(f"数据文件不存在: {file_path}")
  26. with open(file_path, 'r', encoding='utf-8') as f:
  27. raw_data = json.load(f)
  28. if not isinstance(raw_data, list):
  29. raise ValueError("JSON文件内容必须是数组格式")
  30. if not raw_data:
  31. raise ValueError("数据文件为空")
  32. # 标准化字段名
  33. standardized_data = []
  34. for record in raw_data:
  35. standardized_record = {}
  36. for std_field, possible_names in DataManager.FIELD_MAPPING.items():
  37. for name in possible_names:
  38. if name in record:
  39. standardized_record[std_field] = record[name]
  40. break
  41. # 保留原始数据中未映射的字段
  42. for key, value in record.items():
  43. if key not in [name for names in DataManager.FIELD_MAPPING.values() for name in names]:
  44. standardized_record[key] = value
  45. standardized_data.append(standardized_record)
  46. # 转换为DataFrame并优化
  47. df = pd.DataFrame(standardized_data)
  48. df = DataManager._optimize_dataframe(df)
  49. return standardized_data, df
  50. @staticmethod
  51. def _optimize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
  52. """优化DataFrame数据类型"""
  53. # 日期字段
  54. if 'txDate' in df.columns:
  55. df['txDate'] = pd.to_datetime(df['txDate'], errors='coerce').dt.date
  56. # 时间字段
  57. if 'txTime' in df.columns:
  58. df['txTime'] = df['txTime'].astype(str)
  59. # 金额字段
  60. for col in ['txAmount', 'txBalance']:
  61. if col in df.columns:
  62. df[col] = pd.to_numeric(df[col], errors='coerce')
  63. # 创建时间
  64. if 'createdAt' in df.columns:
  65. df['createdAt'] = pd.to_datetime(df['createdAt'], errors='coerce')
  66. # 分类字段
  67. if 'txDirection' in df.columns:
  68. df['txDirection'] = df['txDirection'].astype('category')
  69. return df
  70. @staticmethod
  71. def validate_data_schema(data: List[Dict[str, Any]]) -> Tuple[bool, List[str]]:
  72. """验证数据格式"""
  73. errors = []
  74. if not data:
  75. return False, ["数据集为空"]
  76. # 检查必需字段
  77. first_record = data[0]
  78. missing_fields = []
  79. for field in DataManager.REQUIRED_FIELDS:
  80. if field not in first_record:
  81. missing_fields.append(field)
  82. if missing_fields:
  83. errors.append(f"缺少必需字段: {', '.join(missing_fields)}")
  84. return len(errors) == 0, errors
  85. @staticmethod
  86. def format_data_summary(data: List[Dict[str, Any]]) -> str:
  87. """生成数据摘要"""
  88. if not data:
  89. return "数据集为空"
  90. df = pd.DataFrame(data)
  91. summary = []
  92. summary.append(f"记录总数: {len(data)}")
  93. if 'txDate' in df.columns:
  94. summary.append(f"日期范围: {df['txDate'].min()} 至 {df['txDate'].max()}")
  95. if 'txAmount' in df.columns:
  96. summary.append(f"金额范围: {df['txAmount'].min()} 至 {df['txAmount'].max()}")
  97. if 'txDirection' in df.columns:
  98. direction_counts = df['txDirection'].value_counts().to_dict()
  99. summary.append(f"收支分布: {direction_counts}")
  100. return " | ".join(summary)