metric_calculation_agent.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598
  1. """
  2. 指标计算Agent (Metric Calculation Agent)
  3. ====================================
  4. 此Agent负责根据意图识别结果执行具体的指标计算任务。
  5. 核心功能:
  6. 1. 配置文件加载:读取和解析JSON格式的指标计算配置文件
  7. 2. API调用管理:根据配置文件调用相应的计算API
  8. 3. 结果处理:处理API返回的数据,提取关键指标
  9. 4. 错误处理:处理API调用失败、网络异常等错误情况
  10. 5. 结果验证:验证计算结果的合理性和完整性
  11. 工作流程:
  12. 1. 接收意图识别结果和用户参数
  13. 2. 加载对应的指标计算配置文件
  14. 3. 构造API请求参数
  15. 4. 调用远程计算服务
  16. 5. 解析和验证返回结果
  17. 6. 返回结构化的计算结果
  18. 技术实现:
  19. - 支持动态加载JSON配置文件
  20. - 使用requests库进行HTTP API调用
  21. - 集成LangChain用于复杂计算逻辑(可选)
  22. - 完善的错误处理和超时机制
  23. - 支持多种计算方法(标准、高级、自定义)
  24. 配置文件结构:
  25. - api_config: API端点和认证信息
  26. - param_mapping: 参数映射规则
  27. - input_schema: 输入数据验证规则
  28. - output_schema: 输出数据结构定义
  29. - calculation_logic: 计算逻辑描述
  30. 作者: Big Agent Team
  31. 版本: 1.0.0
  32. 创建时间: 2024-12-18
  33. """
import asyncio
import functools
import json
import os
import re
from datetime import datetime
from typing import Any, Dict, List, Optional

import requests
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
  42. class MetricCalculationAgent:
  43. """远程指标计算Agent"""
  44. def __init__(self, api_key: str, base_url: str = "https://api.deepseek.com"):
  45. """
  46. 初始化指标计算Agent
  47. Args:
  48. api_key: DeepSeek API密钥
  49. base_url: DeepSeek API基础URL
  50. """
  51. self.llm = ChatOpenAI(
  52. model="deepseek-chat",
  53. api_key=api_key,
  54. base_url=base_url,
  55. temperature=0.1
  56. )
  57. # 加载配置文件
  58. self.configs = self._load_configs()
  59. # 加载数据文件映射
  60. self.data_files = self._load_data_files()
  61. # 初始化API调用跟踪
  62. self.api_calls = []
  63. def _load_data_files(self) -> Dict[str, str]:
  64. """加载数据文件映射"""
  65. data_files = {}
  66. data_dir = "data_files"
  67. if os.path.exists(data_dir):
  68. for file in os.listdir(data_dir):
  69. if file.endswith('.json'):
  70. try:
  71. # 提取文件名中的关键词,用于匹配配置文件
  72. key = file.replace('原始数据-流水分析-', '').replace('.json', '')
  73. data_files[key] = os.path.join(data_dir, file)
  74. except Exception as e:
  75. print(f"处理数据文件 {file} 失败: {e}")
  76. return data_files
  77. def _select_data_file(self, config_name: str) -> Optional[str]:
  78. """
  79. 根据配置文件名选择对应的数据文件
  80. Args:
  81. config_name: 配置文件名
  82. Returns:
  83. 数据文件路径,如果找不到则返回None
  84. """
  85. # 配置文件名模式:指标计算-{category}-{metric}.json
  86. # 数据文件名模式:原始数据-流水分析-{category}.json
  87. # 从配置文件名中提取类别信息
  88. match = re.search(r'指标计算-(.+?)-', config_name)
  89. if match:
  90. category = match.group(1)
  91. # 优先选择原始数据文件
  92. # 1. 首先查找完全匹配的原始数据文件
  93. if category in self.data_files:
  94. file_path = self.data_files[category]
  95. if '原始数据' in file_path:
  96. return file_path
  97. # 2. 如果没有完全匹配,查找包含类别的原始数据文件
  98. for key, file_path in self.data_files.items():
  99. if category in key and '原始数据' in file_path:
  100. return file_path
  101. # 如果找不到匹配的文件,返回默认的农业原始数据文件(如果存在)
  102. if '农业' in self.data_files:
  103. return self.data_files['农业']
  104. return None
  105. def _load_table_data(self, data_file_path: str) -> List[Dict]:
  106. """加载数据文件中的表格数据"""
  107. try:
  108. with open(data_file_path, 'r', encoding='utf-8') as f:
  109. data = json.load(f)
  110. return data if isinstance(data, list) else []
  111. except Exception as e:
  112. print(f"加载数据文件 {data_file_path} 失败: {e}")
  113. return []
  114. def _load_configs(self) -> Dict[str, Dict]:
  115. """加载所有配置文件"""
  116. configs = {}
  117. json_dir = "json_files"
  118. if os.path.exists(json_dir):
  119. for file in os.listdir(json_dir):
  120. if file.endswith('.json'):
  121. try:
  122. with open(os.path.join(json_dir, file), 'r', encoding='utf-8') as f:
  123. config = json.load(f)
  124. key = file.replace('.json', '')
  125. configs[key] = config
  126. except Exception as e:
  127. print(f"加载配置文件 {file} 失败: {e}")
  128. return configs
  129. async def calculate_metrics(self, intent_result: Dict[str, Any]) -> Dict[str, Any]:
  130. """
  131. 根据意图识别结果进行指标计算
  132. Args:
  133. intent_result: 意图识别结果
  134. Returns:
  135. 指标计算结果
  136. """
  137. try:
  138. results = []
  139. target_configs = intent_result.get("target_configs", [])
  140. if not target_configs:
  141. return {
  142. "success": False,
  143. "message": "没有找到需要调用的配置文件",
  144. "results": []
  145. }
  146. for config_name in target_configs:
  147. if config_name in self.configs:
  148. config = self.configs[config_name]
  149. result = await self._call_metric_api(config, intent_result, config_name)
  150. results.append({
  151. "config_name": config_name,
  152. "result": result
  153. })
  154. else:
  155. results.append({
  156. "config_name": config_name,
  157. "error": f"配置文件 {config_name} 不存在"
  158. })
  159. return {
  160. "success": True,
  161. "results": results,
  162. "total_configs": len(target_configs),
  163. "successful_calculations": len([r for r in results if "result" in r])
  164. }
  165. except Exception as e:
  166. print(f"指标计算失败: {e}")
  167. return {
  168. "success": False,
  169. "message": f"指标计算过程中发生错误: {str(e)}",
  170. "results": []
  171. }
  172. async def _call_metric_api(self, config: Dict[str, Any], intent_result: Dict[str, Any], config_name: str) -> Dict[str, Any]:
  173. """
  174. 调用具体的指标计算API
  175. Args:
  176. config: 配置文件
  177. intent_result: 意图识别结果
  178. Returns:
  179. API调用结果
  180. """
  181. try:
  182. # 记录API调用开始
  183. start_time = datetime.now()
  184. # 使用真实API服务的配置
  185. method = "POST"
  186. url = "http://10.192.72.11:6300/api/data_analyst/full" # 真实API服务地址
  187. headers = {
  188. "Accept": "*/*",
  189. "Accept-Encoding": "gzip, deflate, br",
  190. "Connection": "keep-alive",
  191. "Content-Type": "application/json",
  192. "User-Agent": "PostmanRuntime-ApipostRuntime/1.1.0"
  193. }
  194. timeout = 600 # 10分钟超时,给慢API更多时间
  195. # 添加重试机制,最多重试3次
  196. max_retries = 3
  197. retry_delay = 5 # 每次重试间隔5秒
  198. for attempt in range(max_retries):
  199. try:
  200. print(f"🔄 API调用尝试 {attempt + 1}/{max_retries} (配置: {config_name})")
  201. # 准备请求数据
  202. request_data = self._prepare_request_data(config, intent_result, config_name)
  203. # 根据HTTP方法调用API
  204. if method.upper() == "GET":
  205. params = request_data.get("params", {})
  206. response = requests.get(url, headers=headers, params=params, timeout=timeout)
  207. elif method.upper() == "POST":
  208. json_data = request_data.get("json", {})
  209. response = requests.post(url, headers=headers, json=json_data, timeout=timeout)
  210. else:
  211. return {
  212. "success": False,
  213. "message": f"不支持的HTTP方法: {method}"
  214. }
  215. # 处理响应
  216. if response.status_code == 200:
  217. try:
  218. response_data = response.json()
  219. # 检查API响应结构并提取结果
  220. extracted_result = None
  221. if isinstance(response_data, dict):
  222. # 检查是否有code字段和data.result结构
  223. if response_data.get("code") == 0 and "data" in response_data:
  224. data = response_data["data"]
  225. if "result" in data:
  226. # 从result字段中提取JSON
  227. extracted_result = self._extract_json_from_result(data["result"])
  228. # 记录API调用结果 - 简化版
  229. end_time = datetime.now()
  230. call_id = f"api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  231. api_call_info = {
  232. "call_id": call_id,
  233. "response": {
  234. "status_code": response.status_code,
  235. "data": response_data,
  236. "extracted_result": extracted_result
  237. }
  238. }
  239. self.api_calls.append(api_call_info)
  240. # 保存API结果到文件 - 成功状态
  241. # 使用运行ID创建独立的文件夹
  242. run_id = os.environ.get('FLOW_RUN_ID', 'default')
  243. api_results_dir = f"api_results_{run_id}"
  244. os.makedirs(api_results_dir, exist_ok=True)
  245. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  246. filename = f"{timestamp}_{call_id}_success.json"
  247. filepath = os.path.join(api_results_dir, filename)
  248. try:
  249. with open(filepath, 'w', encoding='utf-8') as f:
  250. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  251. print(f"[API_RESULT] 保存API结果文件: {filepath}")
  252. except Exception as e:
  253. print(f"[ERROR] 保存API结果文件失败: {filepath}, 错误: {str(e)}")
  254. return {
  255. "success": True,
  256. "data": response_data,
  257. "extracted_result": extracted_result,
  258. "status_code": response.status_code
  259. }
  260. except json.JSONDecodeError:
  261. # 记录API调用结果(JSON解析失败)- 简化版
  262. end_time = datetime.now()
  263. call_id = f"api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  264. api_call_info = {
  265. "call_id": call_id,
  266. "response": {
  267. "status_code": response.status_code,
  268. "data": response.text,
  269. "error": "JSON解析失败"
  270. }
  271. }
  272. self.api_calls.append(api_call_info)
  273. # 保存API结果到文件 - 失败状态
  274. # 使用运行ID创建独立的文件夹
  275. run_id = os.environ.get('FLOW_RUN_ID', 'default')
  276. api_results_dir = f"api_results_{run_id}"
  277. os.makedirs(api_results_dir, exist_ok=True)
  278. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  279. filename = f"{timestamp}_{call_id}_fail.json"
  280. filepath = os.path.join(api_results_dir, filename)
  281. try:
  282. with open(filepath, 'w', encoding='utf-8') as f:
  283. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  284. print(f"[API_RESULT] 保存API结果文件: {filepath}")
  285. except Exception as e:
  286. print(f"[ERROR] 保存API结果文件失败: {filepath}, 错误: {str(e)}")
  287. return {
  288. "success": True,
  289. "data": response.text,
  290. "extracted_result": None,
  291. "status_code": response.status_code
  292. }
  293. else:
  294. # 记录API调用结果(HTTP错误)- 简化版
  295. end_time = datetime.now()
  296. call_id = f"api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  297. api_call_info = {
  298. "call_id": call_id,
  299. "response": {
  300. "status_code": response.status_code,
  301. "error": response.text
  302. }
  303. }
  304. self.api_calls.append(api_call_info)
  305. # 保存API结果到文件 - 失败状态
  306. # 使用运行ID创建独立的文件夹
  307. run_id = os.environ.get('FLOW_RUN_ID', 'default')
  308. api_results_dir = f"api_results_{run_id}"
  309. os.makedirs(api_results_dir, exist_ok=True)
  310. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  311. filename = f"{timestamp}_{call_id}_fail.json"
  312. filepath = os.path.join(api_results_dir, filename)
  313. try:
  314. with open(filepath, 'w', encoding='utf-8') as f:
  315. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  316. print(f"[API_RESULT] 保存API结果文件: {filepath}")
  317. except Exception as e:
  318. print(f"[ERROR] 保存API结果文件失败: {filepath}, 错误: {str(e)}")
  319. return {
  320. "success": False,
  321. "message": f"API调用失败,状态码: {response.status_code}",
  322. "response": response.text
  323. }
  324. # 如果执行到这里,说明本次尝试成功,跳出重试循环
  325. break
  326. except requests.exceptions.Timeout:
  327. # 记录API调用结果(超时)- 简化版
  328. end_time = datetime.now()
  329. call_id = f"api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  330. api_call_info = {
  331. "call_id": call_id,
  332. "response": {
  333. "error": "API调用超时"
  334. }
  335. }
  336. self.api_calls.append(api_call_info)
  337. # 保存API结果到文件
  338. # 使用运行ID创建独立的文件夹
  339. run_id = os.environ.get('FLOW_RUN_ID', 'default')
  340. api_results_dir = f"api_results_{run_id}"
  341. os.makedirs(api_results_dir, exist_ok=True)
  342. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  343. filename = f"{timestamp}_{call_id}_fail.json"
  344. filepath = os.path.join(api_results_dir, filename)
  345. try:
  346. with open(filepath, 'w', encoding='utf-8') as f:
  347. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  348. print(f"[API_RESULT] 保存API结果文件: {filepath}")
  349. except Exception as e:
  350. print(f"[ERROR] 保存API结果文件失败: {filepath}, 错误: {str(e)}")
  351. # 如果不是最后一次尝试,等待后重试
  352. if attempt < max_retries - 1:
  353. print(f"⏳ API调用超时,{retry_delay}秒后重试...")
  354. import time
  355. time.sleep(retry_delay)
  356. continue
  357. else:
  358. return {
  359. "success": False,
  360. "message": "API调用超时"
  361. }
  362. except requests.exceptions.RequestException as e:
  363. # 记录API调用结果(请求异常)- 简化版
  364. end_time = datetime.now()
  365. call_id = f"api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  366. api_call_info = {
  367. "call_id": call_id,
  368. "response": {
  369. "error": str(e)
  370. }
  371. }
  372. self.api_calls.append(api_call_info)
  373. # 保存API结果到文件
  374. # 使用运行ID创建独立的文件夹
  375. run_id = os.environ.get('FLOW_RUN_ID', 'default')
  376. api_results_dir = f"api_results_{run_id}"
  377. os.makedirs(api_results_dir, exist_ok=True)
  378. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  379. filename = f"{timestamp}_{call_id}_fail.json"
  380. filepath = os.path.join(api_results_dir, filename)
  381. try:
  382. with open(filepath, 'w', encoding='utf-8') as f:
  383. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  384. print(f"[API_RESULT] 保存API结果文件: {filepath}")
  385. except Exception as e:
  386. print(f"[ERROR] 保存API结果文件失败: {filepath}, 错误: {str(e)}")
  387. # 如果不是最后一次尝试,等待后重试
  388. if attempt < max_retries - 1:
  389. print(f"❌ API调用异常: {str(e)},{retry_delay}秒后重试...")
  390. import time
  391. time.sleep(retry_delay)
  392. continue
  393. else:
  394. return {
  395. "success": False,
  396. "message": f"API调用异常: {str(e)}"
  397. }
  398. except Exception as e:
  399. # 记录API调用结果(其他异常)- 简化版
  400. end_time = datetime.now()
  401. call_id = f"api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  402. api_call_info = {
  403. "call_id": call_id,
  404. "response": {
  405. "error": str(e)
  406. }
  407. }
  408. self.api_calls.append(api_call_info)
  409. # 保存API结果到文件
  410. # 使用运行ID创建独立的文件夹
  411. run_id = os.environ.get('FLOW_RUN_ID', 'default')
  412. api_results_dir = f"api_results_{run_id}"
  413. os.makedirs(api_results_dir, exist_ok=True)
  414. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  415. filename = f"{timestamp}_{call_id}_fail.json"
  416. filepath = os.path.join(api_results_dir, filename)
  417. try:
  418. with open(filepath, 'w', encoding='utf-8') as f:
  419. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  420. print(f"[API_RESULT] 保存API结果文件: {filepath}")
  421. except Exception as e:
  422. print(f"[ERROR] 保存API结果文件失败: {filepath}, 错误: {str(e)}")
  423. # 如果不是最后一次尝试,等待后重试
  424. if attempt < max_retries - 1:
  425. print(f"❌ 其他异常: {str(e)},{retry_delay}秒后重试...")
  426. import time
  427. time.sleep(retry_delay)
  428. continue
  429. else:
  430. return {
  431. "success": False,
  432. "message": f"API调用异常: {str(e)}"
  433. }
  434. except Exception as e:
  435. # 处理所有未捕获的异常
  436. print(f"❌ API调用过程中发生未预期的错误: {str(e)}")
  437. return {
  438. "success": False,
  439. "message": f"API调用过程中发生未预期的错误: {str(e)}"
  440. }
  441. def _prepare_request_data(self, config: Dict[str, Any], intent_result: Dict[str, Any], config_name: str) -> Dict[str, Any]:
  442. """
  443. 准备API请求数据
  444. Args:
  445. config: 配置文件
  446. intent_result: 意图识别结果
  447. config_name: 配置文件名
  448. Returns:
  449. 请求数据
  450. """
  451. # 从配置文件中获取question和prompt
  452. question = config.get("question", "")
  453. prompt = config.get("prompt", "")
  454. # 选择对应的数据文件
  455. data_file_path = self._select_data_file(config_name)
  456. table_data = []
  457. if data_file_path:
  458. table_data = self._load_table_data(data_file_path)
  459. else:
  460. print(f"警告:找不到配置文件 {config_name} 对应的数据文件")
  461. # 构造documents数组
  462. documents = []
  463. if table_data:
  464. # 使用数据文件名作为标题
  465. title = f"数据表-{config_name}"
  466. if data_file_path:
  467. title = os.path.basename(data_file_path).replace('.json', '')
  468. documents.append({
  469. "id": 1,
  470. "title": title,
  471. "text": "",
  472. "table": table_data
  473. })
  474. # 构造API请求体
  475. request_data = {
  476. "disable_planning": False,
  477. "question": question,
  478. "prompt": prompt,
  479. "documents": documents
  480. }
  481. return {"json": request_data}
  482. def _extract_json_from_result(self, result_text: str) -> Dict[str, Any]:
  483. """
  484. 从API结果文本中提取JSON内容
  485. Args:
  486. result_text: API返回的result字段内容
  487. Returns:
  488. 提取的JSON对象
  489. """
  490. import re
  491. import json
  492. try:
  493. # 查找```json和```之间的内容
  494. json_match = re.search(r'```json\s*(.*?)\s*```', result_text, re.DOTALL)
  495. if json_match:
  496. json_str = json_match.group(1).strip()
  497. return json.loads(json_str)
  498. # 如果没有```json标记,查找大括号包围的内容
  499. brace_match = re.search(r'\{.*\}', result_text, re.DOTALL)
  500. if brace_match:
  501. json_str = brace_match.group(0).strip()
  502. return json.loads(json_str)
  503. # 如果都找不到,尝试直接解析整个文本
  504. return json.loads(result_text.strip())
  505. except json.JSONDecodeError as e:
  506. print(f"JSON解析失败: {e}")
  507. return {"error": f"无法解析JSON结果: {str(e)}", "raw_result": result_text}
  508. def get_available_configs(self) -> List[str]:
  509. """获取所有可用的配置文件名"""
  510. return list(self.configs.keys())
  511. def get_config_details(self, config_name: str) -> Optional[Dict]:
  512. """获取指定配置文件的详细信息"""
  513. return self.configs.get(config_name)