import csv
import io
import json
import os
from typing import Any, Dict, List, Tuple

import pandas as pd


class DataManager:
    """Load, validate and pre-process transaction records.

    All methods are static; the class only serves as a namespace.
    """

    # Canonical field name -> accepted aliases, tried in the listed order.
    FIELD_MAPPING = {
        'txId': ['txId', 'transaction_id', '交易ID', 'id'],
        'txDate': ['txDate', 'transaction_date', '交易日期', 'date'],
        'txTime': ['txTime', 'transaction_time', '交易时间', 'time'],
        'txAmount': ['txAmount', 'amount', '交易金额', '金额'],
        'txBalance': ['txBalance', 'balance', '余额', '交易后余额'],
        'txDirection': ['txDirection', 'direction', '交易方向', '收支'],
        'txSummary': ['txSummary', 'summary', '交易摘要', '摘要', '说明'],
        'txCounterparty': ['txCounterparty', 'counterparty', '交易对手', '对方账户'],
        'createdAt': ['createdAt', 'created_at', '创建时间']
    }

    # Canonical fields that validate_data_schema() requires.
    REQUIRED_FIELDS = ['txId', 'txDate', 'txAmount', 'txDirection']

    @staticmethod
    def load_from_file(file_path: str) -> Tuple[List[Dict[str, Any]], pd.DataFrame]:
        """Load records from a JSON file and standardize their field names.

        Each record's keys are mapped to the canonical names of
        ``FIELD_MAPPING`` (first matching alias wins). Keys that are not a
        known alias are copied through unchanged.

        :param file_path: path of a JSON file whose top level is an array of objects
        :return: ``(standardized_records, dataframe)`` where the dataframe is
            built from the standardized records and passed through
            ``_optimize_dataframe``
        :raises FileNotFoundError: if ``file_path`` does not exist
        :raises ValueError: if the content is not a non-empty array of objects
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"数据文件不存在: {file_path}")

        # utf-8-sig decodes plain UTF-8 too, and drops a leading BOM that
        # json.load() would otherwise reject.
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            raw_data = json.load(f)

        if not isinstance(raw_data, list):
            raise ValueError("JSON文件内容必须是数组格式")
        if not raw_data:
            raise ValueError("数据文件为空")

        # Built once: every alias of every canonical field, for O(1) lookups.
        known_aliases = {
            alias
            for aliases in DataManager.FIELD_MAPPING.values()
            for alias in aliases
        }

        standardized_data = []
        for position, record in enumerate(raw_data, start=1):
            if not isinstance(record, dict):
                raise ValueError(f"第 {position} 条记录必须是对象格式")

            standardized_record = {}
            for std_field, possible_names in DataManager.FIELD_MAPPING.items():
                for name in possible_names:
                    if name in record:
                        standardized_record[std_field] = record[name]
                        break

            # Keep fields that are not an alias of any canonical field.
            for key, value in record.items():
                if key not in known_aliases:
                    standardized_record[key] = value

            standardized_data.append(standardized_record)

        df = DataManager._optimize_dataframe(pd.DataFrame(standardized_data))
        return standardized_data, df

    @staticmethod
    def _optimize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
        """Convert known columns of ``df`` to more specific dtypes.

        The columns are reassigned on ``df`` itself, which is also returned.
        Unparseable dates, timestamps and numbers are coerced to missing
        values (``errors='coerce'``).
        """
        if 'txDate' in df.columns:
            df['txDate'] = pd.to_datetime(df['txDate'], errors='coerce').dt.date

        if 'txTime' in df.columns:
            df['txTime'] = df['txTime'].astype(str)

        for col in ('txAmount', 'txBalance'):
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce')

        if 'createdAt' in df.columns:
            df['createdAt'] = pd.to_datetime(df['createdAt'], errors='coerce')

        if 'txDirection' in df.columns:
            df['txDirection'] = df['txDirection'].astype('category')

        return df

    @staticmethod
    def validate_data_schema(data: List[Dict[str, Any]]) -> Tuple[bool, List[str]]:
        """Check that the data set carries the required fields.

        Only the first record is inspected.

        :param data: list of records
        :return: ``(is_valid, error_messages)``
        """
        if not data:
            return False, ["数据集为空"]

        errors = []
        first_record = data[0]
        missing_fields = [
            field for field in DataManager.REQUIRED_FIELDS
            if field not in first_record
        ]
        if missing_fields:
            errors.append(f"缺少必需字段: {', '.join(missing_fields)}")

        return len(errors) == 0, errors

    @staticmethod
    def format_data_summary(data: List[Dict[str, Any]]) -> str:
        """Build a one-line summary of the data set.

        The summary contains the record count and, for each of the columns
        ``txDate``, ``txAmount`` and ``txDirection`` that is present, its
        min/max range or value counts. Parts are joined with ``" | "``.
        """
        if not data:
            return "数据集为空"

        df = pd.DataFrame(data)
        summary = [f"记录总数: {len(data)}"]

        if 'txDate' in df.columns:
            summary.append(f"日期范围: {df['txDate'].min()} 至 {df['txDate'].max()}")

        if 'txAmount' in df.columns:
            summary.append(f"金额范围: {df['txAmount'].min()} 至 {df['txAmount'].max()}")

        if 'txDirection' in df.columns:
            direction_counts = df['txDirection'].value_counts().to_dict()
            summary.append(f"收支分布: {direction_counts}")

        return " | ".join(summary)

    @staticmethod
    def load_data_from_csv_file(file_path: str) -> List[Dict[str, Any]]:
        """Load records from a CSV file, converting amount fields to numbers.

        ``txAmount`` and ``txBalance`` are converted to ``int`` when the value
        is integral, otherwise to ``float``. Cells that are empty, missing,
        not numeric, or not finite (``nan``/``inf``) are left as read.

        :param file_path: path of the CSV file (first row is the header)
        :return: list of row dictionaries
        :raises FileNotFoundError: if ``file_path`` does not exist
        :raises ValueError: if the file contains no data rows
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"数据文件不存在: {file_path}")

        # newline='' lets the csv module handle line endings itself;
        # utf-8-sig keeps a BOM from being glued onto the first header name.
        with open(file_path, 'r', encoding='utf-8-sig', newline='') as f:
            json_list = list(csv.DictReader(f))

        if not json_list:
            raise ValueError("数据文件为空")

        numeric_fields = ('txAmount', 'txBalance')

        for record in json_list:
            for field in numeric_fields:
                raw = record.get(field)
                if raw is None or raw == '':
                    continue
                try:
                    value = float(raw)
                    # int() raises ValueError for nan and OverflowError for inf.
                    whole = int(value)
                except (ValueError, TypeError, OverflowError):
                    # Conversion failed: keep the original cell content.
                    continue
                record[field] = whole if value == whole else value

        return json_list

    @staticmethod
    def write_json_to_csv(json_data, csv_file_path, field_order=None) -> bool:
        """Write a list of flat dictionaries to a CSV file.

        Example of ``json_data``::

            [{
                "txId": "TX202301050001",
                "txDate": "2023-01-05",
                "txTime": "09:15",
                "txAmount": 3200,
                "txBalance": 3200,
                "txDirection": "收入",
                "txSummary": "水稻销售收入 (优质粳稻)",
                "txCounterparty": "金穗粮食贸易公司",
                "createdAt": "2025-11-30 05:57"
            }]

        :param json_data: JSON-serializable list of dictionaries
        :param csv_file_path: path of the CSV file to create or overwrite
        :param field_order: column order, e.g. ``["txId", "txDate", "txTime"]``;
            defaults to the key order of the first record
        :return: True on success; False if the data is empty or any error
            occurred (the error is printed, not raised)
        """
        succ = True
        try:
            # Round-trip through JSON so rows hold plain JSON values; input
            # that cannot be serialized raises here and is reported below.
            data = json.loads(json.dumps(json_data))

            if not data:
                print("JSON 数据为空,无法写入 CSV 文件")
                return False

            if field_order is None:
                field_order = list(data[0].keys())

            with open(csv_file_path, mode='w', newline='', encoding='utf-8') as csv_file:
                writer = csv.DictWriter(csv_file, fieldnames=field_order)
                writer.writeheader()
                writer.writerows(data)

            print(f"数据已成功写入 {csv_file_path}")
        except Exception as e:
            # Deliberate catch-all: this method reports failure through its
            # boolean result instead of raising.
            print(f"写入 CSV 文件时发生错误:{e}")
            succ = False
        return succ

    @staticmethod
    def json_to_csv_string(json_data: List[Dict[str, Any]], fieldnames: List[str]) -> str:
        """Render a list of dictionaries as CSV text with the given column order.

        :param json_data: list of row dictionaries
        :param fieldnames: column names, in output order
        :return: CSV text including the header row
        :raises ValueError: if ``json_data`` or ``fieldnames`` is empty, or
            (from ``csv.DictWriter``) if a row has a key not in ``fieldnames``
        """
        if not json_data:
            raise ValueError("JSON 数据为空")
        if not fieldnames:
            raise ValueError("字段顺序不能为空")

        with io.StringIO() as output:
            writer = csv.DictWriter(output, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(json_data)
            return output.getvalue()