data_manager.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. import pandas as pd
  2. import json
  3. from typing import List, Dict, Any, Tuple
  4. import os
  5. import csv
  6. import io
  7. class DataManager:
  8. """统一管理数据加载、验证和预处理"""
  9. # 字段映射(支持多种别名)
  10. FIELD_MAPPING = {
  11. 'txId': ['txId', 'transaction_id', '交易ID', 'id'],
  12. 'txDate': ['txDate', 'transaction_date', '交易日期', 'date'],
  13. 'txTime': ['txTime', 'transaction_time', '交易时间', 'time'],
  14. 'txAmount': ['txAmount', 'amount', '交易金额', '金额'],
  15. 'txBalance': ['txBalance', 'balance', '余额', '交易后余额'],
  16. 'txDirection': ['txDirection', 'direction', '交易方向', '收支'],
  17. 'txSummary': ['txSummary', 'summary', '交易摘要', '摘要', '说明'],
  18. 'txCounterparty': ['txCounterparty', 'counterparty', '交易对手', '对方账户'],
  19. 'createdAt': ['createdAt', 'created_at', '创建时间']
  20. }
  21. # 必需字段
  22. REQUIRED_FIELDS = ['txId', 'txDate', 'txAmount', 'txDirection']
  23. @staticmethod
  24. def load_from_file(file_path: str) -> Tuple[List[Dict[str, Any]], pd.DataFrame]:
  25. """从JSON文件加载数据,支持字段别名"""
  26. if not os.path.exists(file_path):
  27. raise FileNotFoundError(f"数据文件不存在: {file_path}")
  28. with open(file_path, 'r', encoding='utf-8') as f:
  29. raw_data = json.load(f)
  30. if not isinstance(raw_data, list):
  31. raise ValueError("JSON文件内容必须是数组格式")
  32. if not raw_data:
  33. raise ValueError("数据文件为空")
  34. # 标准化字段名
  35. standardized_data = []
  36. for record in raw_data:
  37. standardized_record = {}
  38. for std_field, possible_names in DataManager.FIELD_MAPPING.items():
  39. for name in possible_names:
  40. if name in record:
  41. standardized_record[std_field] = record[name]
  42. break
  43. # 保留原始数据中未映射的字段
  44. for key, value in record.items():
  45. if key not in [name for names in DataManager.FIELD_MAPPING.values() for name in names]:
  46. standardized_record[key] = value
  47. standardized_data.append(standardized_record)
  48. # 转换为DataFrame并优化
  49. df = pd.DataFrame(standardized_data)
  50. df = DataManager._optimize_dataframe(df)
  51. return standardized_data, df
  52. @staticmethod
  53. def _optimize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
  54. """优化DataFrame数据类型"""
  55. # 日期字段
  56. if 'txDate' in df.columns:
  57. df['txDate'] = pd.to_datetime(df['txDate'], errors='coerce').dt.date
  58. # 时间字段
  59. if 'txTime' in df.columns:
  60. df['txTime'] = df['txTime'].astype(str)
  61. # 金额字段
  62. for col in ['txAmount', 'txBalance']:
  63. if col in df.columns:
  64. df[col] = pd.to_numeric(df[col], errors='coerce')
  65. # 创建时间
  66. if 'createdAt' in df.columns:
  67. df['createdAt'] = pd.to_datetime(df['createdAt'], errors='coerce')
  68. # 分类字段
  69. if 'txDirection' in df.columns:
  70. df['txDirection'] = df['txDirection'].astype('category')
  71. return df
  72. @staticmethod
  73. def validate_data_schema(data: List[Dict[str, Any]]) -> Tuple[bool, List[str]]:
  74. """验证数据格式"""
  75. errors = []
  76. if not data:
  77. return False, ["数据集为空"]
  78. # 检查必需字段
  79. first_record = data[0]
  80. missing_fields = []
  81. for field in DataManager.REQUIRED_FIELDS:
  82. if field not in first_record:
  83. missing_fields.append(field)
  84. if missing_fields:
  85. errors.append(f"缺少必需字段: {', '.join(missing_fields)}")
  86. return len(errors) == 0, errors
  87. @staticmethod
  88. def format_data_summary(data: List[Dict[str, Any]]) -> str:
  89. """生成数据摘要"""
  90. if not data:
  91. return "数据集为空"
  92. df = pd.DataFrame(data)
  93. summary = []
  94. summary.append(f"记录总数: {len(data)}")
  95. if 'txDate' in df.columns:
  96. summary.append(f"日期范围: {df['txDate'].min()} 至 {df['txDate'].max()}")
  97. if 'txAmount' in df.columns:
  98. summary.append(f"金额范围: {df['txAmount'].min()} 至 {df['txAmount'].max()}")
  99. if 'txDirection' in df.columns:
  100. direction_counts = df['txDirection'].value_counts().to_dict()
  101. summary.append(f"收支分布: {direction_counts}")
  102. return " | ".join(summary)
  103. @staticmethod
  104. def load_data_from_csv_file(file_path: str) -> List[Dict[str, Any]]:
  105. """
  106. 从CSV文件中加载数据,自动转换数字类型
  107. :param file_path: CSV文件绝对路径
  108. :return: 处理后的数据列表
  109. """
  110. if not os.path.exists(file_path):
  111. raise FileNotFoundError(f"数据文件不存在: {file_path}")
  112. with open(file_path, 'r', encoding='utf-8') as f:
  113. reader = csv.DictReader(f)
  114. json_list = [row for row in reader]
  115. if not isinstance(json_list, list):
  116. raise ValueError("CSV文件内容必须是数组格式")
  117. if not json_list:
  118. raise ValueError("数据文件为空")
  119. # 定义需要转换为数字的字段
  120. numeric_fields = ['txAmount', 'txBalance']
  121. # 对每条记录进行数字类型转换
  122. for record in json_list:
  123. for field in numeric_fields:
  124. if field in record and record[field] is not None and record[field] != '':
  125. try:
  126. # 尝试转换为float,如果是整数则转换为int
  127. value = float(record[field])
  128. if value == int(value):
  129. record[field] = int(value)
  130. else:
  131. record[field] = value
  132. except (ValueError, TypeError):
  133. # 如果转换失败,保持原字符串格式
  134. pass
  135. return json_list
  136. def write_json_to_csv(json_data, csv_file_path, field_order=None) -> bool:
  137. """
  138. 将符合 [{}] 结构的 JSON 对象写入 CSV 文件,并允许指定字段顺序
  139. :param json_data: 符合 [{}] 结构的 JSON 对象,例如 [{
  140. "txId": "TX202301050001",
  141. "txDate": "2023-01-05",
  142. "txTime": "09:15",
  143. "txAmount": 3200,
  144. "txBalance": 3200,
  145. "txDirection": "收入",
  146. "txSummary": "水稻销售收入 (优质粳稻)",
  147. "txCounterparty": "金穗粮食贸易公司",
  148. "createdAt": "2025-11-30 05:57"
  149. }]
  150. :param csv_file_path: CSV 文件的路径
  151. :param field_order: 字段顺序列表,例如 ["txId", "txDate", "txTime"...]。如果未指定,则按字典键的顺序写入
  152. :return 是否写入成功 True:成功 False:失败
  153. """
  154. succ = True
  155. try:
  156. # 将 JSON 数据转换为 Python 的列表
  157. data = json.loads(json.dumps(json_data))
  158. # 检查数据是否为空
  159. if not data:
  160. print("JSON 数据为空,无法写入 CSV 文件")
  161. return False
  162. # 如果未指定字段顺序,则使用第一个字典的键作为字段顺序
  163. if field_order is None:
  164. field_order = list(data[0].keys())
  165. # 打开 CSV 文件并写入数据
  166. with open(csv_file_path, mode='w', newline='', encoding='utf-8') as csv_file:
  167. writer = csv.DictWriter(csv_file, fieldnames=field_order)
  168. # 写入列名
  169. writer.writeheader()
  170. # 写入数据
  171. writer.writerows(data)
  172. print(f"数据已成功写入 {csv_file_path}")
  173. except Exception as e:
  174. print(f"写入 CSV 文件时发生错误:{e}")
  175. succ = False
  176. return succ
  177. def write_json_to_csv(json_data, csv_file_path, field_order=None) -> bool:
  178. """
  179. 将符合 [{}] 结构的 JSON 对象写入 CSV 文件,并允许指定字段顺序
  180. :param json_data: 符合 [{}] 结构的 JSON 对象,例如 [{
  181. "txId": "TX202301050001",
  182. "txDate": "2023-01-05",
  183. "txTime": "09:15",
  184. "txAmount": 3200,
  185. "txBalance": 3200,
  186. "txDirection": "收入",
  187. "txSummary": "水稻销售收入 (优质粳稻)",
  188. "txCounterparty": "金穗粮食贸易公司",
  189. "createdAt": "2025-11-30 05:57"
  190. }]
  191. :param csv_file_path: CSV 文件的路径
  192. :param field_order: 字段顺序列表,例如 ["txId", "txDate", "txTime"...]。如果未指定,则按字典键的顺序写入
  193. :return 是否写入成功 True:成功 False:失败
  194. """
  195. succ = True
  196. try:
  197. # 将 JSON 数据转换为 Python 的列表
  198. data = json.loads(json.dumps(json_data))
  199. # 检查数据是否为空
  200. if not data:
  201. print("JSON 数据为空,无法写入 CSV 文件")
  202. return False
  203. # 如果未指定字段顺序,则使用第一个字典的键作为字段顺序
  204. if field_order is None:
  205. field_order = list(data[0].keys())
  206. # 打开 CSV 文件并写入数据
  207. with open(csv_file_path, mode='w', newline='', encoding='utf-8') as csv_file:
  208. writer = csv.DictWriter(csv_file, fieldnames=field_order)
  209. # 写入列名
  210. writer.writeheader()
  211. # 写入数据
  212. writer.writerows(data)
  213. print(f"数据已成功写入 {csv_file_path}")
  214. except Exception as e:
  215. print(f"写入 CSV 文件时发生错误:{e}")
  216. succ = False
  217. return succ
  218. @staticmethod
  219. def json_to_csv_string(json_data: List[Dict[str, Any]], fieldnames: List[str]):
  220. """
  221. 将 JSON 数据(格式为 [{}])转换为 CSV 格式的字符串,并指定字段顺序。
  222. :param json_data: JSON 数据,格式为 [{}]
  223. :param fieldnames: 字段顺序列表
  224. :return: CSV 格式的字符串
  225. """
  226. # 检查输入数据是否为空
  227. if not json_data:
  228. raise ValueError("JSON 数据为空")
  229. # 检查字段顺序是否为空
  230. if not fieldnames:
  231. raise ValueError("字段顺序不能为空")
  232. # 使用 StringIO 来生成 CSV 字符串
  233. output = io.StringIO()
  234. writer = csv.DictWriter(output, fieldnames=fieldnames)
  235. # 写入表头
  236. writer.writeheader()
  237. # 写入每行数据
  238. for item in json_data:
  239. writer.writerow(item)
  240. # 获取生成的 CSV 字符串
  241. csv_string = output.getvalue()
  242. output.close()
  243. return csv_string