metric_calculation_agent.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670
  1. """
  2. 指标计算Agent (Metric Calculation Agent)
  3. ====================================
  4. 此Agent负责根据意图识别结果执行具体的指标计算任务。
  5. 核心功能:
  6. 1. 配置文件加载:读取和解析JSON格式的指标计算配置文件
  7. 2. API调用管理:根据配置文件调用相应的计算API
  8. 3. 结果处理:处理API返回的数据,提取关键指标
  9. 4. 错误处理:处理API调用失败、网络异常等错误情况
  10. 5. 结果验证:验证计算结果的合理性和完整性
  11. 工作流程:
  12. 1. 接收意图识别结果和用户参数
  13. 2. 加载对应的指标计算配置文件
  14. 3. 构造API请求参数
  15. 4. 调用远程计算服务
  16. 5. 解析和验证返回结果
  17. 6. 返回结构化的计算结果
  18. 技术实现:
  19. - 支持动态加载JSON配置文件
  20. - 使用requests库进行HTTP API调用
  21. - 集成LangChain用于复杂计算逻辑(可选)
  22. - 完善的错误处理和超时机制
  23. - 支持多种计算方法(标准、高级、自定义)
  24. 配置文件结构:
  25. - api_config: API端点和认证信息
  26. - param_mapping: 参数映射规则
  27. - input_schema: 输入数据验证规则
  28. - output_schema: 输出数据结构定义
  29. - calculation_logic: 计算逻辑描述
  30. 作者: Big Agent Team
  31. 版本: 1.0.0
  32. 创建时间: 2024-12-18
  33. """
  34. import os
  35. import json
  36. import requests
  37. from datetime import datetime
  38. from typing import Dict, List, Any, Optional
  39. from langchain_openai import ChatOpenAI
  40. from langchain_core.prompts import ChatPromptTemplate
  41. import re
  42. class MetricCalculationAgent:
  43. """远程指标计算Agent"""
  44. def __init__(self, api_key: str, base_url: str = "https://api.deepseek.com"):
  45. """
  46. 初始化指标计算Agent
  47. Args:
  48. api_key: DeepSeek API密钥
  49. base_url: DeepSeek API基础URL
  50. """
  51. self.llm = ChatOpenAI(
  52. model="deepseek-chat",
  53. api_key=api_key,
  54. base_url=base_url,
  55. temperature=0.1
  56. )
  57. # 加载配置文件
  58. self.configs = self._load_configs()
  59. # 加载数据文件映射
  60. self.data_files = self._load_data_files()
  61. # 初始化API调用跟踪
  62. self.api_calls = []
  63. def _load_data_files(self) -> Dict[str, str]:
  64. """加载数据文件映射"""
  65. data_files = {}
  66. data_dir = "data_files"
  67. if os.path.exists(data_dir):
  68. for file in os.listdir(data_dir):
  69. if file.endswith('.json'):
  70. try:
  71. # 提取文件名中的关键词,用于匹配配置文件
  72. key = file.replace('原始数据-流水分析-', '').replace('.json', '')
  73. data_files[key] = os.path.join(data_dir, file)
  74. except Exception as e:
  75. print(f"处理数据文件 {file} 失败: {e}")
  76. return data_files
  77. def _select_data_file(self, config_name: str) -> Optional[str]:
  78. """
  79. 根据配置文件名选择对应的数据文件
  80. Args:
  81. config_name: 配置文件名
  82. Returns:
  83. 数据文件路径,如果找不到则返回None
  84. """
  85. # 配置文件名模式:指标计算-{category}-{metric}.json
  86. # 数据文件名模式:原始数据-流水分析-{category}.json
  87. # 从配置文件名中提取类别信息
  88. match = re.search(r'指标计算-(.+?)-', config_name)
  89. if match:
  90. category = match.group(1)
  91. # 优先选择原始数据文件
  92. # 1. 首先查找完全匹配的原始数据文件
  93. if category in self.data_files:
  94. file_path = self.data_files[category]
  95. if '原始数据' in file_path:
  96. return file_path
  97. # 2. 如果没有完全匹配,查找包含类别的原始数据文件
  98. for key, file_path in self.data_files.items():
  99. if category in key and '原始数据' in file_path:
  100. return file_path
  101. # 如果找不到匹配的文件,返回默认的农业原始数据文件(如果存在)
  102. if '农业' in self.data_files:
  103. return self.data_files['农业']
  104. return None
  105. def _load_table_data(self, data_file_path: str) -> List[Dict]:
  106. """加载数据文件中的表格数据"""
  107. try:
  108. with open(data_file_path, 'r', encoding='utf-8') as f:
  109. data = json.load(f)
  110. return data if isinstance(data, list) else []
  111. except Exception as e:
  112. print(f"加载数据文件 {data_file_path} 失败: {e}")
  113. return []
  114. def _load_configs(self) -> Dict[str, Dict]:
  115. """加载所有配置文件"""
  116. configs = {}
  117. json_dir = "json_files"
  118. if os.path.exists(json_dir):
  119. for file in os.listdir(json_dir):
  120. if file.endswith('.json'):
  121. try:
  122. with open(os.path.join(json_dir, file), 'r', encoding='utf-8') as f:
  123. config = json.load(f)
  124. key = file.replace('.json', '')
  125. configs[key] = config
  126. except Exception as e:
  127. print(f"加载配置文件 {file} 失败: {e}")
  128. return configs
  129. async def calculate_metrics(self, intent_result: Dict[str, Any]) -> Dict[str, Any]:
  130. """
  131. 根据意图识别结果进行指标计算
  132. Args:
  133. intent_result: 意图识别结果
  134. Returns:
  135. 指标计算结果
  136. """
  137. try:
  138. results = []
  139. target_configs = intent_result.get("target_configs", [])
  140. if not target_configs:
  141. return {
  142. "success": False,
  143. "message": "没有找到需要调用的配置文件",
  144. "results": []
  145. }
  146. for config_name in target_configs:
  147. if config_name in self.configs:
  148. config = self.configs[config_name]
  149. result = await self._call_metric_api(config, intent_result, config_name)
  150. results.append({
  151. "config_name": config_name,
  152. "result": result
  153. })
  154. else:
  155. results.append({
  156. "config_name": config_name,
  157. "error": f"配置文件 {config_name} 不存在"
  158. })
  159. return {
  160. "success": True,
  161. "results": results,
  162. "total_configs": len(target_configs),
  163. "successful_calculations": len([r for r in results if "result" in r])
  164. }
  165. except Exception as e:
  166. print(f"指标计算失败: {e}")
  167. return {
  168. "success": False,
  169. "message": f"指标计算过程中发生错误: {str(e)}",
  170. "results": []
  171. }
  172. async def _call_metric_api(self, config: Dict[str, Any], intent_result: Dict[str, Any], config_name: str) -> Dict[str, Any]:
  173. """
  174. 调用具体的指标计算API
  175. Args:
  176. config: 配置文件
  177. intent_result: 意图识别结果
  178. Returns:
  179. API调用结果
  180. """
  181. try:
  182. # 记录API调用开始
  183. start_time = datetime.now()
  184. # 使用真实API服务的配置
  185. method = "POST"
  186. url = "http://10.192.72.11:6300/api/data_analyst/full" # 真实API服务地址
  187. headers = {
  188. "Accept": "*/*",
  189. "Accept-Encoding": "gzip, deflate, br",
  190. "Connection": "keep-alive",
  191. "Content-Type": "application/json",
  192. "User-Agent": "PostmanRuntime-ApipostRuntime/1.1.0"
  193. }
  194. timeout = 600 # 10分钟超时,给慢API更多时间
  195. # 添加重试机制,最多重试3次
  196. max_retries = 3
  197. retry_delay = 5 # 每次重试间隔5秒
  198. for attempt in range(max_retries):
  199. try:
  200. print(f"🔄 API调用尝试 {attempt + 1}/{max_retries} (配置: {config_name})")
  201. # 准备请求数据
  202. request_data = self._prepare_request_data(config, intent_result, config_name)
  203. # 根据HTTP方法调用API
  204. if method.upper() == "GET":
  205. params = request_data.get("params", {})
  206. response = requests.get(url, headers=headers, params=params, timeout=timeout)
  207. elif method.upper() == "POST":
  208. json_data = request_data.get("json", {})
  209. response = requests.post(url, headers=headers, json=json_data, timeout=timeout)
  210. else:
  211. return {
  212. "success": False,
  213. "message": f"不支持的HTTP方法: {method}"
  214. }
  215. # 处理响应
  216. if response.status_code == 200:
  217. try:
  218. response_data = response.json()
  219. # 检查API响应结构并提取结果
  220. extracted_result = None
  221. if isinstance(response_data, dict):
  222. # 检查是否有code字段和data.result结构
  223. if response_data.get("code") == 0 and "data" in response_data:
  224. data = response_data["data"]
  225. if "result" in data:
  226. # 从result字段中提取JSON
  227. extracted_result = self._extract_json_from_result(data["result"])
  228. # 记录API调用结果
  229. end_time = datetime.now()
  230. call_id = f"api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  231. api_call_info = {
  232. "call_id": call_id,
  233. "timestamp": end_time.isoformat(),
  234. "agent": "MetricCalculationAgent",
  235. "api_endpoint": url,
  236. "config_name": config_name,
  237. "request": {
  238. "method": method,
  239. "url": url,
  240. "headers": headers,
  241. "json_data": json_data if method.upper() == "POST" else None,
  242. "params": params if method.upper() == "GET" else None,
  243. "start_time": start_time.isoformat()
  244. },
  245. "response": {
  246. "status_code": response.status_code,
  247. "data": response_data,
  248. "extracted_result": extracted_result,
  249. "end_time": end_time.isoformat(),
  250. "duration": (end_time - start_time).total_seconds()
  251. },
  252. "success": True
  253. }
  254. self.api_calls.append(api_call_info)
  255. # 保存API结果到文件
  256. api_results_dir = "api_results"
  257. os.makedirs(api_results_dir, exist_ok=True)
  258. filename = f"{call_id}.json"
  259. filepath = os.path.join(api_results_dir, filename)
  260. try:
  261. with open(filepath, 'w', encoding='utf-8') as f:
  262. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  263. print(f"[API_RESULT] 保存API结果文件: {filepath}")
  264. except Exception as e:
  265. print(f"[ERROR] 保存API结果文件失败: {filepath}, 错误: {str(e)}")
  266. return {
  267. "success": True,
  268. "data": response_data,
  269. "extracted_result": extracted_result,
  270. "status_code": response.status_code
  271. }
  272. except json.JSONDecodeError:
  273. # 记录API调用结果(JSON解析失败)
  274. end_time = datetime.now()
  275. call_id = f"api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  276. api_call_info = {
  277. "call_id": call_id,
  278. "timestamp": end_time.isoformat(),
  279. "agent": "MetricCalculationAgent",
  280. "api_endpoint": url,
  281. "config_name": config_name,
  282. "request": {
  283. "method": method,
  284. "url": url,
  285. "headers": headers,
  286. "json_data": json_data if method.upper() == "POST" else None,
  287. "params": params if method.upper() == "GET" else None,
  288. "start_time": start_time.isoformat()
  289. },
  290. "response": {
  291. "status_code": response.status_code,
  292. "data": response.text,
  293. "error": "JSON解析失败",
  294. "end_time": end_time.isoformat(),
  295. "duration": (end_time - start_time).total_seconds()
  296. },
  297. "success": False
  298. }
  299. self.api_calls.append(api_call_info)
  300. # 保存API结果到文件
  301. api_results_dir = "api_results"
  302. os.makedirs(api_results_dir, exist_ok=True)
  303. filename = f"{call_id}.json"
  304. filepath = os.path.join(api_results_dir, filename)
  305. try:
  306. with open(filepath, 'w', encoding='utf-8') as f:
  307. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  308. print(f"[API_RESULT] 保存API结果文件: {filepath}")
  309. except Exception as e:
  310. print(f"[ERROR] 保存API结果文件失败: {filepath}, 错误: {str(e)}")
  311. return {
  312. "success": True,
  313. "data": response.text,
  314. "extracted_result": None,
  315. "status_code": response.status_code
  316. }
  317. else:
  318. # 记录API调用结果(HTTP错误)
  319. end_time = datetime.now()
  320. call_id = f"api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  321. api_call_info = {
  322. "call_id": call_id,
  323. "timestamp": end_time.isoformat(),
  324. "agent": "MetricCalculationAgent",
  325. "api_endpoint": url,
  326. "config_name": config_name,
  327. "request": {
  328. "method": method,
  329. "url": url,
  330. "headers": headers,
  331. "json_data": json_data if method.upper() == "POST" else None,
  332. "params": params if method.upper() == "GET" else None,
  333. "start_time": start_time.isoformat()
  334. },
  335. "response": {
  336. "status_code": response.status_code,
  337. "error": response.text,
  338. "end_time": end_time.isoformat(),
  339. "duration": (end_time - start_time).total_seconds()
  340. },
  341. "success": False
  342. }
  343. self.api_calls.append(api_call_info)
  344. # 保存API结果到文件
  345. api_results_dir = "api_results"
  346. os.makedirs(api_results_dir, exist_ok=True)
  347. filename = f"{call_id}.json"
  348. filepath = os.path.join(api_results_dir, filename)
  349. try:
  350. with open(filepath, 'w', encoding='utf-8') as f:
  351. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  352. print(f"[API_RESULT] 保存API结果文件: {filepath}")
  353. except Exception as e:
  354. print(f"[ERROR] 保存API结果文件失败: {filepath}, 错误: {str(e)}")
  355. return {
  356. "success": False,
  357. "message": f"API调用失败,状态码: {response.status_code}",
  358. "response": response.text
  359. }
  360. # 如果执行到这里,说明本次尝试成功,跳出重试循环
  361. break
  362. except requests.exceptions.Timeout:
  363. # 记录API调用结果(超时)
  364. end_time = datetime.now()
  365. call_id = f"api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  366. api_call_info = {
  367. "call_id": call_id,
  368. "timestamp": end_time.isoformat(),
  369. "agent": "MetricCalculationAgent",
  370. "api_endpoint": url,
  371. "config_name": config_name,
  372. "request": {
  373. "method": method,
  374. "url": url,
  375. "headers": headers,
  376. "json_data": json_data if method.upper() == "POST" else None,
  377. "params": params if method.upper() == "GET" else None,
  378. "start_time": start_time.isoformat()
  379. },
  380. "response": {
  381. "error": "API调用超时",
  382. "end_time": end_time.isoformat(),
  383. "duration": (end_time - start_time).total_seconds()
  384. },
  385. "success": False
  386. }
  387. self.api_calls.append(api_call_info)
  388. # 保存API结果到文件
  389. api_results_dir = "api_results"
  390. os.makedirs(api_results_dir, exist_ok=True)
  391. filename = f"{call_id}.json"
  392. filepath = os.path.join(api_results_dir, filename)
  393. try:
  394. with open(filepath, 'w', encoding='utf-8') as f:
  395. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  396. print(f"[API_RESULT] 保存API结果文件: {filepath}")
  397. except Exception as e:
  398. print(f"[ERROR] 保存API结果文件失败: {filepath}, 错误: {str(e)}")
  399. # 如果不是最后一次尝试,等待后重试
  400. if attempt < max_retries - 1:
  401. print(f"⏳ API调用超时,{retry_delay}秒后重试...")
  402. import time
  403. time.sleep(retry_delay)
  404. continue
  405. else:
  406. return {
  407. "success": False,
  408. "message": "API调用超时"
  409. }
  410. except requests.exceptions.RequestException as e:
  411. # 记录API调用结果(请求异常)
  412. end_time = datetime.now()
  413. call_id = f"api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  414. api_call_info = {
  415. "call_id": call_id,
  416. "timestamp": end_time.isoformat(),
  417. "agent": "MetricCalculationAgent",
  418. "api_endpoint": url,
  419. "config_name": config_name,
  420. "request": {
  421. "method": method,
  422. "url": url,
  423. "headers": headers,
  424. "json_data": json_data if method.upper() == "POST" else None,
  425. "params": params if method.upper() == "GET" else None,
  426. "start_time": start_time.isoformat()
  427. },
  428. "response": {
  429. "error": str(e),
  430. "end_time": end_time.isoformat(),
  431. "duration": (end_time - start_time).total_seconds()
  432. },
  433. "success": False
  434. }
  435. self.api_calls.append(api_call_info)
  436. # 保存API结果到文件
  437. api_results_dir = "api_results"
  438. os.makedirs(api_results_dir, exist_ok=True)
  439. filename = f"{call_id}.json"
  440. filepath = os.path.join(api_results_dir, filename)
  441. try:
  442. with open(filepath, 'w', encoding='utf-8') as f:
  443. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  444. print(f"[API_RESULT] 保存API结果文件: {filepath}")
  445. except Exception as e:
  446. print(f"[ERROR] 保存API结果文件失败: {filepath}, 错误: {str(e)}")
  447. # 如果不是最后一次尝试,等待后重试
  448. if attempt < max_retries - 1:
  449. print(f"❌ API调用异常: {str(e)},{retry_delay}秒后重试...")
  450. import time
  451. time.sleep(retry_delay)
  452. continue
  453. else:
  454. return {
  455. "success": False,
  456. "message": f"API调用异常: {str(e)}"
  457. }
  458. except Exception as e:
  459. # 记录API调用结果(其他异常)
  460. end_time = datetime.now()
  461. call_id = f"api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  462. api_call_info = {
  463. "call_id": call_id,
  464. "timestamp": end_time.isoformat(),
  465. "agent": "MetricCalculationAgent",
  466. "api_endpoint": url,
  467. "config_name": config_name,
  468. "request": {
  469. "method": method,
  470. "url": url,
  471. "headers": headers,
  472. "json_data": json_data if method.upper() == "POST" else None,
  473. "params": params if method.upper() == "GET" else None,
  474. "start_time": start_time.isoformat()
  475. },
  476. "response": {
  477. "error": str(e),
  478. "end_time": end_time.isoformat(),
  479. "duration": (end_time - start_time).total_seconds()
  480. },
  481. "success": False
  482. }
  483. self.api_calls.append(api_call_info)
  484. # 保存API结果到文件
  485. api_results_dir = "api_results"
  486. os.makedirs(api_results_dir, exist_ok=True)
  487. filename = f"{call_id}.json"
  488. filepath = os.path.join(api_results_dir, filename)
  489. try:
  490. with open(filepath, 'w', encoding='utf-8') as f:
  491. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  492. print(f"[API_RESULT] 保存API结果文件: {filepath}")
  493. except Exception as e:
  494. print(f"[ERROR] 保存API结果文件失败: {filepath}, 错误: {str(e)}")
  495. # 如果不是最后一次尝试,等待后重试
  496. if attempt < max_retries - 1:
  497. print(f"❌ 其他异常: {str(e)},{retry_delay}秒后重试...")
  498. import time
  499. time.sleep(retry_delay)
  500. continue
  501. else:
  502. return {
  503. "success": False,
  504. "message": f"API调用异常: {str(e)}"
  505. }
  506. except Exception as e:
  507. # 处理所有未捕获的异常
  508. print(f"❌ API调用过程中发生未预期的错误: {str(e)}")
  509. return {
  510. "success": False,
  511. "message": f"API调用过程中发生未预期的错误: {str(e)}"
  512. }
  513. def _prepare_request_data(self, config: Dict[str, Any], intent_result: Dict[str, Any], config_name: str) -> Dict[str, Any]:
  514. """
  515. 准备API请求数据
  516. Args:
  517. config: 配置文件
  518. intent_result: 意图识别结果
  519. config_name: 配置文件名
  520. Returns:
  521. 请求数据
  522. """
  523. # 从配置文件中获取question和prompt
  524. question = config.get("question", "")
  525. prompt = config.get("prompt", "")
  526. # 选择对应的数据文件
  527. data_file_path = self._select_data_file(config_name)
  528. table_data = []
  529. if data_file_path:
  530. table_data = self._load_table_data(data_file_path)
  531. else:
  532. print(f"警告:找不到配置文件 {config_name} 对应的数据文件")
  533. # 构造documents数组
  534. documents = []
  535. if table_data:
  536. # 使用数据文件名作为标题
  537. title = f"数据表-{config_name}"
  538. if data_file_path:
  539. title = os.path.basename(data_file_path).replace('.json', '')
  540. documents.append({
  541. "id": 1,
  542. "title": title,
  543. "text": "",
  544. "table": table_data
  545. })
  546. # 构造API请求体
  547. request_data = {
  548. "disable_planning": False,
  549. "question": question,
  550. "prompt": prompt,
  551. "documents": documents
  552. }
  553. return {"json": request_data}
  554. def _extract_json_from_result(self, result_text: str) -> Dict[str, Any]:
  555. """
  556. 从API结果文本中提取JSON内容
  557. Args:
  558. result_text: API返回的result字段内容
  559. Returns:
  560. 提取的JSON对象
  561. """
  562. import re
  563. import json
  564. try:
  565. # 查找```json和```之间的内容
  566. json_match = re.search(r'```json\s*(.*?)\s*```', result_text, re.DOTALL)
  567. if json_match:
  568. json_str = json_match.group(1).strip()
  569. return json.loads(json_str)
  570. # 如果没有```json标记,查找大括号包围的内容
  571. brace_match = re.search(r'\{.*\}', result_text, re.DOTALL)
  572. if brace_match:
  573. json_str = brace_match.group(0).strip()
  574. return json.loads(json_str)
  575. # 如果都找不到,尝试直接解析整个文本
  576. return json.loads(result_text.strip())
  577. except json.JSONDecodeError as e:
  578. print(f"JSON解析失败: {e}")
  579. return {"error": f"无法解析JSON结果: {str(e)}", "raw_result": result_text}
  580. def get_available_configs(self) -> List[str]:
  581. """获取所有可用的配置文件名"""
  582. return list(self.configs.keys())
  583. def get_config_details(self, config_name: str) -> Optional[Dict]:
  584. """获取指定配置文件的详细信息"""
  585. return self.configs.get(config_name)