data_manager.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. import pandas as pd
  2. import json
  3. from typing import List, Dict, Any, Tuple
  4. import os
  5. import csv
  6. import io
  7. class DataManager:
  8. """统一管理数据加载、验证和预处理"""
  9. # 字段映射(支持多种别名)
  10. FIELD_MAPPING = {
  11. 'txId': ['txId', 'transaction_id', '交易ID', 'id'],
  12. 'txDate': ['txDate', 'transaction_date', '交易日期', 'date'],
  13. 'txTime': ['txTime', 'transaction_time', '交易时间', 'time'],
  14. 'txAmount': ['txAmount', 'amount', '交易金额', '金额'],
  15. 'txBalance': ['txBalance', 'balance', '余额', '交易后余额'],
  16. 'txDirection': ['txDirection', 'direction', '交易方向', '收支'],
  17. 'txSummary': ['txSummary', 'summary', '交易摘要', '摘要', '说明'],
  18. 'txCounterparty': ['txCounterparty', 'counterparty', '交易对手', '对方账户'],
  19. 'createdAt': ['createdAt', 'created_at', '创建时间']
  20. }
  21. # 必需字段
  22. REQUIRED_FIELDS = ['txId', 'txDate', 'txAmount', 'txDirection']
  23. @staticmethod
  24. def load_from_file(file_path: str) -> Tuple[List[Dict[str, Any]], pd.DataFrame]:
  25. """从JSON文件加载数据,支持字段别名"""
  26. if not os.path.exists(file_path):
  27. raise FileNotFoundError(f"数据文件不存在: {file_path}")
  28. with open(file_path, 'r', encoding='utf-8') as f:
  29. raw_data = json.load(f)
  30. if not isinstance(raw_data, list):
  31. raise ValueError("JSON文件内容必须是数组格式")
  32. if not raw_data:
  33. raise ValueError("数据文件为空")
  34. # 标准化字段名
  35. standardized_data = []
  36. for record in raw_data:
  37. standardized_record = {}
  38. for std_field, possible_names in DataManager.FIELD_MAPPING.items():
  39. for name in possible_names:
  40. if name in record:
  41. standardized_record[std_field] = record[name]
  42. break
  43. # 保留原始数据中未映射的字段
  44. for key, value in record.items():
  45. if key not in [name for names in DataManager.FIELD_MAPPING.values() for name in names]:
  46. standardized_record[key] = value
  47. standardized_data.append(standardized_record)
  48. # 转换为DataFrame并优化
  49. df = pd.DataFrame(standardized_data)
  50. df = DataManager._optimize_dataframe(df)
  51. return standardized_data, df
  52. @staticmethod
  53. def _optimize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
  54. """优化DataFrame数据类型"""
  55. # 日期字段
  56. if 'txDate' in df.columns:
  57. df['txDate'] = pd.to_datetime(df['txDate'], errors='coerce').dt.date
  58. # 时间字段
  59. if 'txTime' in df.columns:
  60. df['txTime'] = df['txTime'].astype(str)
  61. # 金额字段
  62. for col in ['txAmount', 'txBalance']:
  63. if col in df.columns:
  64. df[col] = pd.to_numeric(df[col], errors='coerce')
  65. # 创建时间
  66. if 'createdAt' in df.columns:
  67. df['createdAt'] = pd.to_datetime(df['createdAt'], errors='coerce')
  68. # 分类字段
  69. if 'txDirection' in df.columns:
  70. df['txDirection'] = df['txDirection'].astype('category')
  71. return df
  72. @staticmethod
  73. def validate_data_schema(data: List[Dict[str, Any]]) -> Tuple[bool, List[str]]:
  74. """验证数据格式"""
  75. errors = []
  76. if not data:
  77. return False, ["数据集为空"]
  78. # 检查必需字段
  79. first_record = data[0]
  80. missing_fields = []
  81. for field in DataManager.REQUIRED_FIELDS:
  82. if field not in first_record:
  83. missing_fields.append(field)
  84. if missing_fields:
  85. errors.append(f"缺少必需字段: {', '.join(missing_fields)}")
  86. return len(errors) == 0, errors
  87. @staticmethod
  88. def format_data_summary(data: List[Dict[str, Any]]) -> str:
  89. """生成数据摘要"""
  90. if not data:
  91. return "数据集为空"
  92. df = pd.DataFrame(data)
  93. summary = []
  94. summary.append(f"记录总数: {len(data)}")
  95. if 'txDate' in df.columns:
  96. summary.append(f"日期范围: {df['txDate'].min()} 至 {df['txDate'].max()}")
  97. if 'txAmount' in df.columns:
  98. summary.append(f"金额范围: {df['txAmount'].min()} 至 {df['txAmount'].max()}")
  99. if 'txDirection' in df.columns:
  100. direction_counts = df['txDirection'].value_counts().to_dict()
  101. summary.append(f"收支分布: {direction_counts}")
  102. return " | ".join(summary)
  103. @staticmethod
  104. def load_data_from_csv_file(file_path: str) -> List[Dict[str, Any]]:
  105. """
  106. 从CSV文件中加载数据
  107. :param file_path: json文件绝对路径
  108. :return:
  109. """
  110. if not os.path.exists(file_path):
  111. raise FileNotFoundError(f"数据文件不存在: {file_path}")
  112. with open(file_path, 'r', encoding='utf-8') as f:
  113. reader = csv.DictReader(f)
  114. json_list = [row for row in reader]
  115. if not isinstance(json_list, list):
  116. raise ValueError("JSON文件内容必须是数组格式")
  117. if not json_list:
  118. raise ValueError("数据文件为空")
  119. return json_list
  120. def write_json_to_csv(json_data, csv_file_path, field_order=None) -> bool:
  121. """
  122. 将符合 [{}] 结构的 JSON 对象写入 CSV 文件,并允许指定字段顺序
  123. :param json_data: 符合 [{}] 结构的 JSON 对象,例如 [{
  124. "txId": "TX202301050001",
  125. "txDate": "2023-01-05",
  126. "txTime": "09:15",
  127. "txAmount": 3200,
  128. "txBalance": 3200,
  129. "txDirection": "收入",
  130. "txSummary": "水稻销售收入 (优质粳稻)",
  131. "txCounterparty": "金穗粮食贸易公司",
  132. "createdAt": "2025-11-30 05:57"
  133. }]
  134. :param csv_file_path: CSV 文件的路径
  135. :param field_order: 字段顺序列表,例如 ["txId", "txDate", "txTime"...]。如果未指定,则按字典键的顺序写入
  136. :return 是否写入成功 True:成功 False:失败
  137. """
  138. succ = True
  139. try:
  140. # 将 JSON 数据转换为 Python 的列表
  141. data = json.loads(json.dumps(json_data))
  142. # 检查数据是否为空
  143. if not data:
  144. print("JSON 数据为空,无法写入 CSV 文件")
  145. return False
  146. # 如果未指定字段顺序,则使用第一个字典的键作为字段顺序
  147. if field_order is None:
  148. field_order = list(data[0].keys())
  149. # 打开 CSV 文件并写入数据
  150. with open(csv_file_path, mode='w', newline='', encoding='utf-8') as csv_file:
  151. writer = csv.DictWriter(csv_file, fieldnames=field_order)
  152. # 写入列名
  153. writer.writeheader()
  154. # 写入数据
  155. writer.writerows(data)
  156. print(f"数据已成功写入 {csv_file_path}")
  157. except Exception as e:
  158. print(f"写入 CSV 文件时发生错误:{e}")
  159. succ = False
  160. return succ
  161. def write_json_to_csv(json_data, csv_file_path, field_order=None) -> bool:
  162. """
  163. 将符合 [{}] 结构的 JSON 对象写入 CSV 文件,并允许指定字段顺序
  164. :param json_data: 符合 [{}] 结构的 JSON 对象,例如 [{
  165. "txId": "TX202301050001",
  166. "txDate": "2023-01-05",
  167. "txTime": "09:15",
  168. "txAmount": 3200,
  169. "txBalance": 3200,
  170. "txDirection": "收入",
  171. "txSummary": "水稻销售收入 (优质粳稻)",
  172. "txCounterparty": "金穗粮食贸易公司",
  173. "createdAt": "2025-11-30 05:57"
  174. }]
  175. :param csv_file_path: CSV 文件的路径
  176. :param field_order: 字段顺序列表,例如 ["txId", "txDate", "txTime"...]。如果未指定,则按字典键的顺序写入
  177. :return 是否写入成功 True:成功 False:失败
  178. """
  179. succ = True
  180. try:
  181. # 将 JSON 数据转换为 Python 的列表
  182. data = json.loads(json.dumps(json_data))
  183. # 检查数据是否为空
  184. if not data:
  185. print("JSON 数据为空,无法写入 CSV 文件")
  186. return False
  187. # 如果未指定字段顺序,则使用第一个字典的键作为字段顺序
  188. if field_order is None:
  189. field_order = list(data[0].keys())
  190. # 打开 CSV 文件并写入数据
  191. with open(csv_file_path, mode='w', newline='', encoding='utf-8') as csv_file:
  192. writer = csv.DictWriter(csv_file, fieldnames=field_order)
  193. # 写入列名
  194. writer.writeheader()
  195. # 写入数据
  196. writer.writerows(data)
  197. print(f"数据已成功写入 {csv_file_path}")
  198. except Exception as e:
  199. print(f"写入 CSV 文件时发生错误:{e}")
  200. succ = False
  201. return succ
  202. @staticmethod
  203. def json_to_csv_string(json_data: List[Dict[str, Any]], fieldnames: List[str]):
  204. """
  205. 将 JSON 数据(格式为 [{}])转换为 CSV 格式的字符串,并指定字段顺序。
  206. :param json_data: JSON 数据,格式为 [{}]
  207. :param fieldnames: 字段顺序列表
  208. :return: CSV 格式的字符串
  209. """
  210. # 检查输入数据是否为空
  211. if not json_data:
  212. raise ValueError("JSON 数据为空")
  213. # 检查字段顺序是否为空
  214. if not fieldnames:
  215. raise ValueError("字段顺序不能为空")
  216. # 使用 StringIO 来生成 CSV 字符串
  217. output = io.StringIO()
  218. writer = csv.DictWriter(output, fieldnames=fieldnames)
  219. # 写入表头
  220. writer.writeheader()
  221. # 写入每行数据
  222. for item in json_data:
  223. writer.writerow(item)
  224. # 获取生成的 CSV 字符串
  225. csv_string = output.getvalue()
  226. output.close()
  227. return csv_string