| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310 |
- import pandas as pd
- import json
- from typing import List, Dict, Any, Tuple
- import os
- import csv
- import io
class DataManager:
    """Load, validate and preprocess transaction records.

    Every method is a ``@staticmethod``; the class is used as a namespace.
    """

    # Canonical field name -> accepted source names. The canonical name is
    # always listed first; when several aliases are present in one record,
    # the first one found in list order wins.
    FIELD_MAPPING = {
        'txId': ['txId', 'transaction_id', '交易ID', 'id'],
        'txDate': ['txDate', 'transaction_date', '交易日期', 'date'],
        'txTime': ['txTime', 'transaction_time', '交易时间', 'time'],
        'txAmount': ['txAmount', 'amount', '交易金额', '金额'],
        'txBalance': ['txBalance', 'balance', '余额', '交易后余额'],
        'txDirection': ['txDirection', 'direction', '交易方向', '收支'],
        'txSummary': ['txSummary', 'summary', '交易摘要', '摘要', '说明'],
        'txCounterparty': ['txCounterparty', 'counterparty', '交易对手', '对方账户'],
        'createdAt': ['createdAt', 'created_at', '创建时间']
    }

    # Fields every record must contain to pass validate_data_schema().
    REQUIRED_FIELDS = ['txId', 'txDate', 'txAmount', 'txDirection']

    @staticmethod
    def load_from_file(file_path: str) -> Tuple[List[Dict[str, Any]], pd.DataFrame]:
        """Load a JSON array of records, renaming aliased keys to canonical names.

        :param file_path: path to a JSON file whose top level is an array of objects
        :return: (standardized records, type-optimized DataFrame built from them)
        :raises FileNotFoundError: if the path does not exist
        :raises ValueError: if the content is not a non-empty array of objects
            (``json.JSONDecodeError`` is a ``ValueError`` subclass)
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"数据文件不存在: {file_path}")
        # 'utf-8-sig' also accepts files saved with a UTF-8 BOM, which plain
        # 'utf-8' + json.load rejects; BOM-less files decode identically.
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            raw_data = json.load(f)
        if not isinstance(raw_data, list):
            raise ValueError("JSON文件内容必须是数组格式")
        if not raw_data:
            raise ValueError("数据文件为空")

        # Build the set of every known alias once, instead of re-flattening
        # the mapping for every key of every record.
        known_aliases = {
            name for names in DataManager.FIELD_MAPPING.values() for name in names
        }
        standardized_data = []
        for index, record in enumerate(raw_data, start=1):
            if not isinstance(record, dict):
                raise ValueError(f"第{index}条记录必须是对象格式")
            standardized_data.append(
                DataManager._standardize_record(record, known_aliases)
            )

        df = DataManager._optimize_dataframe(pd.DataFrame(standardized_data))
        return standardized_data, df

    @staticmethod
    def _standardize_record(record: Dict[str, Any], known_aliases: set) -> Dict[str, Any]:
        """Return a copy of one record with aliases renamed; unmapped keys are kept."""
        standardized = {}
        for std_field, possible_names in DataManager.FIELD_MAPPING.items():
            for name in possible_names:
                if name in record:
                    standardized[std_field] = record[name]
                    break
        # Keep fields the mapping does not know about, after the mapped ones.
        for key, value in record.items():
            if key not in known_aliases:
                standardized[key] = value
        return standardized

    @staticmethod
    def _optimize_dataframe(df: pd.DataFrame) -> pd.DataFrame:
        """Convert known columns to appropriate dtypes (modifies and returns df).

        Unparseable dates/datetimes become NaT and unparseable amounts NaN
        (``errors='coerce'``).
        """
        if 'txDate' in df.columns:
            df['txDate'] = pd.to_datetime(df['txDate'], errors='coerce').dt.date
        if 'txTime' in df.columns:
            times = df['txTime']
            # astype(str) alone would turn missing values into the literal
            # strings 'nan' / 'None'; mask() restores them as missing.
            df['txTime'] = times.astype(str).mask(times.isna())
        for col in ['txAmount', 'txBalance']:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce')
        if 'createdAt' in df.columns:
            df['createdAt'] = pd.to_datetime(df['createdAt'], errors='coerce')
        if 'txDirection' in df.columns:
            df['txDirection'] = df['txDirection'].astype('category')
        return df

    @staticmethod
    def validate_data_schema(data: List[Dict[str, Any]]) -> Tuple[bool, List[str]]:
        """Check that every record is a dict containing all REQUIRED_FIELDS.

        :param data: list of records
        :return: (is_valid, error messages); error list is empty when valid
        """
        if not data:
            return False, ["数据集为空"]
        errors = []

        records = [record for record in data if isinstance(record, dict)]
        bad_count = len(data) - len(records)
        if bad_count:
            errors.append(f"存在{bad_count}条非对象格式的记录")

        # Check all records, not only the first one; report each missing
        # field once, in REQUIRED_FIELDS order.
        missing_fields = [
            field for field in DataManager.REQUIRED_FIELDS
            if any(field not in record for record in records)
        ]
        if missing_fields:
            errors.append(f"缺少必需字段: {', '.join(missing_fields)}")
        return len(errors) == 0, errors

    @staticmethod
    def format_data_summary(data: List[Dict[str, Any]]) -> str:
        """Return a one-line summary: record count, date range, amount range, direction counts."""
        if not data:
            return "数据集为空"
        df = pd.DataFrame(data)
        summary = [f"记录总数: {len(data)}"]
        if 'txDate' in df.columns:
            summary.append(f"日期范围: {df['txDate'].min()} 至 {df['txDate'].max()}")
        if 'txAmount' in df.columns:
            # Compare numerically: string amounts would otherwise be ordered
            # lexicographically (e.g. "900" > "3200").
            amounts = pd.to_numeric(df['txAmount'], errors='coerce')
            summary.append(f"金额范围: {amounts.min()} 至 {amounts.max()}")
        if 'txDirection' in df.columns:
            direction_counts = df['txDirection'].value_counts().to_dict()
            summary.append(f"收支分布: {direction_counts}")
        return " | ".join(summary)

    @staticmethod
    def load_data_from_csv_file(file_path: str) -> List[Dict[str, Any]]:
        """Load a CSV file as a list of row dicts, converting amount columns to numbers.

        ``txAmount``/``txBalance`` are converted to int when integral, else
        float; empty, non-numeric, NaN or infinite values keep their original
        string. Field aliases are NOT applied here (unlike load_from_file).

        :param file_path: path to the CSV file (first row is the header)
        :return: list of row dicts
        :raises FileNotFoundError: if the path does not exist
        :raises ValueError: if the file has no data rows
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"数据文件不存在: {file_path}")
        # newline='' is required by the csv module for quoted fields that
        # contain line breaks; 'utf-8-sig' strips an Excel-style BOM that
        # would otherwise end up inside the first header name.
        with open(file_path, 'r', encoding='utf-8-sig', newline='') as f:
            json_list = list(csv.DictReader(f))
        if not json_list:
            raise ValueError("数据文件为空")

        numeric_fields = ('txAmount', 'txBalance')
        for record in json_list:
            for field in numeric_fields:
                raw = record.get(field)
                if raw is None or raw == '':
                    continue
                try:
                    value = float(raw)
                    record[field] = int(value) if value == int(value) else value
                except (ValueError, TypeError, OverflowError):
                    # Not numeric, NaN (ValueError) or infinite (OverflowError):
                    # keep the original string.
                    pass
        return json_list

    @staticmethod
    def write_json_to_csv(json_data, csv_file_path, field_order=None) -> bool:
        """Write a list of dicts (``[{}]``) to a CSV file with an optional column order.

        Example record::

            {"txId": "TX202301050001", "txDate": "2023-01-05", "txTime": "09:15",
             "txAmount": 3200, "txBalance": 3200, "txDirection": "收入",
             "txSummary": "水稻销售收入 (优质粳稻)", "txCounterparty": "金穗粮食贸易公司",
             "createdAt": "2025-11-30 05:57"}

        :param json_data: iterable of dicts
        :param csv_file_path: destination path (overwritten, UTF-8)
        :param field_order: column list, e.g. ["txId", "txDate", "txTime", ...].
            If omitted, columns are the keys of all records in first-seen order.
            When given, a record key not in this list makes the write fail.
        :return: True on success, False on failure (the reason is printed)
        """
        try:
            # Materialize without a JSON round-trip so values such as dates,
            # which csv can stringify, do not make the write fail.
            data = list(json_data) if json_data else []
            if not data:
                print("JSON 数据为空,无法写入 CSV 文件")
                return False
            # Validate before opening, so a bad input does not truncate the file.
            if not all(isinstance(row, dict) for row in data):
                print("写入 CSV 文件时发生错误:数据必须是 [{}] 结构")
                return False
            if field_order is None:
                # Union of keys across all records, so rows with extra keys
                # do not make DictWriter raise.
                field_order = list(dict.fromkeys(key for row in data for key in row))
            with open(csv_file_path, mode='w', newline='', encoding='utf-8') as csv_file:
                writer = csv.DictWriter(csv_file, fieldnames=field_order)
                writer.writeheader()
                writer.writerows(data)
            print(f"数据已成功写入 {csv_file_path}")
        except Exception as e:
            # Boundary of a bool-returning API: report and signal failure.
            print(f"写入 CSV 文件时发生错误:{e}")
            return False
        return True

    @staticmethod
    def json_to_csv_string(json_data: List[Dict[str, Any]], fieldnames: List[str]) -> str:
        """Render a list of dicts as a CSV string with the given column order.

        :param json_data: list of dicts
        :param fieldnames: column order
        :return: CSV text, header first (csv default "\\r\\n" line endings)
        :raises ValueError: if either argument is empty, or a record has a key
            not in ``fieldnames`` (DictWriter's default ``extrasaction='raise'``)
        """
        if not json_data:
            raise ValueError("JSON 数据为空")
        if not fieldnames:
            raise ValueError("字段顺序不能为空")
        with io.StringIO() as output:
            writer = csv.DictWriter(output, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(json_data)
            return output.getvalue()
|