| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- from llmops.config import DATA_CLASSIFY_ENGINE_PARAM_MAPPER, RULES_ENGINE_BASE_URL
- import requests
- from llmops.agents.data_manager import DataManager
- import os
- from typing import Dict, List, Any
- class DataClassifyAgent:
- """
- 数据分类打标Agent
- """
- def __init__(self):
- # 数据打标引擎入参映射
- self.DATA_CLASSIFY_ENGINE_PARAM_MAPPER = DATA_CLASSIFY_ENGINE_PARAM_MAPPER
- # 保存分类文件的表头顺序
- self.fields_order = ["txId","txDate","txTime","txAmount","txBalance","txDirection","txSummary","txCounterparty","createdAt", "businessType"]
- def invoke_data_classify(self, industry:str, data_set: list[dict], file_name: str) -> list[dict]:
- """
- 调用分类打标接口
- :param industry: 行业
- :param data_set: 数据集
- :param file_name: 文件名称
- :return:
- """
- try:
- url = f"{RULES_ENGINE_BASE_URL}/api/rules/executeKnowledge"
- headers = {
- "Accept": "*/*",
- "Accept-Encoding": "gzip, deflate, br",
- "Connection": "keep-alive",
- "Content-Type": "application/json",
- "User-Agent": "PostmanRuntime-ApipostRuntime/1.1.0"
- }
- json_data = {
- "id": self.DATA_CLASSIFY_ENGINE_PARAM_MAPPER.get(industry),
- "input": {
- "transactions": data_set
- }
- }
- # 调用分类打标服务
- response = requests.post(url, headers=headers, json=json_data, timeout=30)
- if response.status_code == 200:
- # 取结果
- data_set_classified = response.json()
- if isinstance(data_set_classified, dict):
- # 取出打标数据集
- ds = data_set_classified.get('transactions', [])
- print(f"✅ 成功分类打标数量: {len(ds)}")
- # 将分类好的数据写入数据目录中
- self.save_classified_data(ds, file_name)
- return ds
- else:
- print(f"⚠️ 分类打标格式异常: {data_set_classified}")
- return []
- else:
- print(f"❌ 数据分类打标失败,状态码: {response.status_code}")
- print(f"响应内容: {response.text}")
- return []
- except Exception as e:
- print(f"❌ 调用数据分类打标时发生错误: {str(e)}")
- import traceback
- traceback.print_exc()
- return []
- def save_classified_data(self, json_data, file_name: str):
- """
- 将生成的分类数据写入CSV文件,将文件保存到 data_files目录下
- :param json_data: 分类后的数据,结构 [{}]
- :param file_name: 文件名,规则是 原名_label.csv
- :return:
- """
- # 新文件名
- new_file_name = file_name.split(".")[0] + "_label.csv"
- full_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "..", "data_files", new_file_name)
- print(f"分类文件:{full_path}, 是否存在:{os.path.exists(full_path)}")
- succ = DataManager.write_json_to_csv(json_data=json_data, csv_file_path=full_path, field_order=self.fields_order)
- print(f"将分类数据写入文件:{full_path} {'成功' if succ else '失败'}")
- async def data_classify(industry: str, data_set: List[Dict[str, Any]], file_name: str) -> List[Dict]:
- """
- 对数据进行分类
- Args:
- industry: 行业
- data_set: 待处理数据
- file_name: 数据文件名称
- Returns:
- 分类好的数据
- """
- import asyncio
- import time
- agent = DataClassifyAgent()
- print(f"📝 开始对文件:{file_name} 数据进行分类打标...")
- data_set_classified = []
- try:
- start_time = time.time()
- # 进行分类打标
- data_set_classified = agent.invoke_data_classify(industry=industry, data_set=data_set, file_name=file_name)
- elapsed_time = time.time() - start_time
- print(f"分类打标用时:{elapsed_time:.2f} 秒")
- process_size = len(data_set_classified)
- if process_size == len(data_set):
- print(f"\n📝 分类打标处理成功, 处理记录条数:{process_size}")
- elif process_size == 0:
- print(f"\n📝 分类打标处理失败, 处理记录条数:{process_size}")
- else:
- print(f"\n📝 分类打标处理部分数据, 处理记录条数:{process_size}")
- return data_set_classified
- except Exception as e:
- elapsed_time = time.time() - start_time if 'start_time' in locals() else 0
- import traceback
- traceback.print_exc()
- print(f" 错误详情: {str(e)}")
- return data_set_classified
|