rules_engine_metric_calculation_agent.py 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787
  1. """
  2. 规则引擎指标计算Agent (Rules Engine Metric Calculation Agent)
  3. ===========================================================
  4. 此Agent负责根据意图识别结果执行规则引擎模式的指标计算任务。
  5. 核心功能:
  6. 1. 配置文件加载:读取和解析规则引擎指标计算配置文件
  7. 2. API调用管理:根据配置文件调用规则引擎API
  8. 3. 结果处理:处理API返回的数据,提取关键指标
  9. 4. 错误处理:处理API调用失败、网络异常等错误情况
  10. 5. 结果验证:验证计算结果的合理性和完整性
  11. 工作流程:
  12. 1. 接收意图识别结果和用户参数
  13. 2. 加载对应的规则引擎指标计算配置文件
  14. 3. 构造API请求参数(id和input)
  15. 4. 调用远程规则引擎服务
  16. 5. 解析和验证返回结果
  17. 6. 返回结构化的计算结果
  18. 技术实现:
  19. - 支持动态加载JSON配置文件
  20. - 使用requests库进行HTTP API调用
  21. - 集成LangChain用于复杂计算逻辑(可选)
  22. - 完善的错误处理和超时机制
  23. - 支持多种计算方法(标准、高级、自定义)
  24. 配置文件结构:
  25. - id: 规则引擎执行ID
  26. - input: 数据文件路径
  27. - description: 规则描述
  28. API接口:
  29. POST http://localhost:8081/api/rules/executeKnowledge
  30. 请求体:
  31. {
  32. "id": "知识ID",
  33. "input": {
  34. "动态字段名": [...] // 根据知识的inputField字段动态确定,如"transactions"或"resultTag"
  35. }
  36. }
  37. 作者: Big Agent Team
  38. 版本: 1.0.0
  39. 创建时间: 2024-12-19
  40. """
  41. import os
  42. import json
  43. import requests
  44. from datetime import datetime
  45. from typing import Dict, List, Any, Optional
  46. from langchain_openai import ChatOpenAI
  47. from langchain_core.prompts import ChatPromptTemplate
  48. import re
  49. from llmops.config import RULES_ENGINE_BASE_URL
  50. class RulesEngineMetricCalculationAgent:
  51. """规则引擎指标计算Agent"""
  52. def __init__(self, api_key: str, base_url: str = "https://api.deepseek.com"):
  53. # 获取可用的知识元数据
  54. self.available_knowledge = self._load_available_knowledge()
  55. # 加载数据文件映射
  56. self.data_files = self._load_data_files()
  57. # 初始化API调用跟踪
  58. self.api_calls = []
  59. # 加载配置文件
  60. self.configs = {}
  61. def _load_data_files(self) -> Dict[str, str]:
  62. """加载数据文件映射"""
  63. data_files = {}
  64. data_dir = "data_files"
  65. if os.path.exists(data_dir):
  66. for file in os.listdir(data_dir):
  67. if file.endswith('.json'):
  68. try:
  69. # 提取文件名,用于匹配配置文件
  70. key = file.replace('.json', '')
  71. data_files[key] = os.path.join(data_dir, file)
  72. except Exception as e:
  73. print(f"处理数据文件 {file} 失败: {e}")
  74. return data_files
  75. def _select_data_file(self, input_filename: str) -> Optional[str]:
  76. """
  77. 根据输入文件名选择对应的数据文件
  78. Args:
  79. input_filename: 配置文件中的input字段值
  80. Returns:
  81. 数据文件路径,如果找不到则返回None
  82. """
  83. # input字段直接指定数据文件名
  84. if input_filename in self.data_files:
  85. return self.data_files[input_filename]
  86. # 如果找不到精确匹配,尝试模糊匹配
  87. for key, file_path in self.data_files.items():
  88. if input_filename in key or key in input_filename:
  89. return file_path
  90. return None
  91. def _load_table_data(self, data_file_path: str) -> List[Dict[str, Any]]:
  92. """加载数据文件中的JSON数据"""
  93. try:
  94. with open(data_file_path, 'r', encoding='utf-8') as f:
  95. data = json.load(f)
  96. if isinstance(data, list):
  97. return data
  98. elif isinstance(data, dict):
  99. # 如果是字典,尝试提取其中的数组数据
  100. for key, value in data.items():
  101. if isinstance(value, list):
  102. return value
  103. return []
  104. else:
  105. return []
  106. except Exception as e:
  107. print(f"加载数据文件 {data_file_path} 失败: {e}")
  108. return []
  109. def _load_available_knowledge(self) -> List[Dict[str, Any]]:
  110. """
  111. 从规则引擎获取可用的知识元数据
  112. Returns:
  113. 知识元数据列表,包含id、description和inputField
  114. """
  115. try:
  116. url = f"{RULES_ENGINE_BASE_URL}/api/rules/getKnowledgeMeta"
  117. headers = {
  118. "Accept": "*/*",
  119. "Accept-Encoding": "gzip, deflate, br",
  120. "Connection": "keep-alive",
  121. "Content-Type": "application/json",
  122. "User-Agent": "PostmanRuntime-ApipostRuntime/1.1.0"
  123. }
  124. response = requests.post(url, headers=headers, json={}, timeout=30)
  125. if response.status_code == 200:
  126. knowledge_meta = response.json()
  127. if isinstance(knowledge_meta, list):
  128. print(f"✅ 成功获取 {len(knowledge_meta)} 个知识元数据")
  129. return knowledge_meta
  130. else:
  131. print(f"⚠️ 知识元数据格式异常: {knowledge_meta}")
  132. return []
  133. else:
  134. print(f"❌ 获取知识元数据失败,状态码: {response.status_code}")
  135. print(f"响应内容: {response.text}")
  136. return []
  137. except Exception as e:
  138. print(f"❌ 获取知识元数据时发生错误: {str(e)}")
  139. return []
  140. def _get_input_field_for_knowledge(self, knowledge_id: str) -> str:
  141. """
  142. 根据知识ID获取对应的inputField字段名
  143. Args:
  144. knowledge_id: 知识ID
  145. Returns:
  146. inputField字段名,默认为"transactions"
  147. """
  148. for knowledge in self.available_knowledge:
  149. if knowledge.get("id") == knowledge_id:
  150. input_field = knowledge.get("inputField", "transactions")
  151. print(f"🔗 知识 {knowledge_id} 使用输入字段: {input_field}")
  152. return input_field
  153. print(f"⚠️ 未找到知识 {knowledge_id} 的输入字段,使用默认值: transactions")
  154. return "transactions" # 默认值
  155. async def calculate_metrics(self, intent_result: Dict[str, Any], classified_data: List[Dict[str, Any]] = None) -> Dict[str, Any]:
  156. """
  157. 根据意图识别结果进行规则引擎指标计算
  158. Args:
  159. intent_result: 意图识别结果
  160. classified_data: 分类后的数据,如果提供则优先使用,否则从intent_result或文件加载
  161. Returns:
  162. 指标计算结果
  163. """
  164. try:
  165. results = []
  166. target_configs = intent_result.get("target_configs", [])
  167. if not target_configs:
  168. return {
  169. "success": False,
  170. "message": "没有找到需要调用的配置文件",
  171. "results": []
  172. }
  173. for config_name in target_configs:
  174. if config_name in self.configs:
  175. # 使用传统的JSON配置文件方式
  176. config = self.configs[config_name]
  177. result = await self._call_rules_engine_api(config, intent_result, config_name)
  178. results.append({
  179. "config_name": config_name,
  180. "result": result
  181. })
  182. elif config_name.startswith("metric-"):
  183. # 直接使用知识ID调用API,无需配置文件
  184. result = await self._call_rules_engine_api_by_knowledge_id(config_name, intent_result, classified_data)
  185. results.append({
  186. "config_name": config_name,
  187. "result": result
  188. })
  189. else:
  190. results.append({
  191. "config_name": config_name,
  192. "error": f"配置文件 {config_name} 不存在,且不是有效的知识ID"
  193. })
  194. return {
  195. "success": True,
  196. "results": results,
  197. "total_configs": len(target_configs),
  198. "successful_calculations": len([r for r in results if "result" in r])
  199. }
  200. except Exception as e:
  201. print(f"规则引擎指标计算失败: {e}")
  202. return {
  203. "success": False,
  204. "message": f"规则引擎指标计算过程中发生错误: {str(e)}",
  205. "results": []
  206. }
  207. async def _call_rules_engine_api(self, config: Dict[str, Any], intent_result: Dict[str, Any], config_name: str) -> Dict[str, Any]:
  208. """
  209. 调用规则引擎API
  210. Args:
  211. config: 配置文件
  212. intent_result: 意图识别结果
  213. Returns:
  214. API调用结果
  215. """
  216. try:
  217. # 记录API调用开始
  218. start_time = datetime.now()
  219. # 规则引擎API配置
  220. method = "POST"
  221. url = f"{RULES_ENGINE_BASE_URL}/api/rules/executeKnowledge"
  222. headers = {
  223. "Accept": "*/*",
  224. "Accept-Encoding": "gzip, deflate, br",
  225. "Connection": "keep-alive",
  226. "Content-Type": "application/json",
  227. "User-Agent": "PostmanRuntime-ApipostRuntime/1.1.0"
  228. }
  229. timeout = 180 # 3分钟超时
  230. # 准备请求数据
  231. request_data = self._prepare_rules_engine_request_data(config, intent_result, config_name)
  232. # 调用API
  233. json_data = request_data.get("json", {})
  234. response = requests.post(url, headers=headers, json=json_data, timeout=timeout)
  235. # 处理响应
  236. if response.status_code == 200:
  237. try:
  238. response_data = response.json()
  239. # 记录API调用结果 - 简化版
  240. end_time = datetime.now()
  241. call_id = f"rules_api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  242. api_call_info = {
  243. "call_id": call_id,
  244. "response": response_data
  245. }
  246. self.api_calls.append(api_call_info)
  247. # 保存API结果到文件 - 成功状态
  248. # 使用运行ID创建独立的文件夹
  249. run_id = os.environ.get('FLOW_RUN_ID', 'default')
  250. api_results_dir = f"api_results_{run_id}"
  251. os.makedirs(api_results_dir, exist_ok=True)
  252. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  253. filename = f"{timestamp}_{call_id}_success.json"
  254. filepath = os.path.join(api_results_dir, filename)
  255. try:
  256. with open(filepath, 'w', encoding='utf-8') as f:
  257. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  258. print(f"[RULES_API_RESULT] 保存规则引擎API结果文件: {filepath}")
  259. except Exception as e:
  260. print(f"[ERROR] 保存规则引擎API结果文件失败: {filepath}, 错误: {str(e)}")
  261. return {
  262. "success": True,
  263. "data": response_data,
  264. "status_code": response.status_code
  265. }
  266. except json.JSONDecodeError:
  267. # 记录API调用结果(JSON解析失败)- 简化版
  268. end_time = datetime.now()
  269. call_id = f"rules_api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  270. api_call_info = {
  271. "call_id": call_id,
  272. "response": {
  273. "status_code": response.status_code,
  274. "data": response.text,
  275. "error": "JSON解析失败"
  276. }
  277. }
  278. self.api_calls.append(api_call_info)
  279. # 保存API结果到文件 - 失败状态
  280. # 使用运行ID创建独立的文件夹
  281. run_id = os.environ.get('FLOW_RUN_ID', 'default')
  282. api_results_dir = f"api_results_{run_id}"
  283. os.makedirs(api_results_dir, exist_ok=True)
  284. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  285. filename = f"{timestamp}_{call_id}_fail.json"
  286. filepath = os.path.join(api_results_dir, filename)
  287. try:
  288. with open(filepath, 'w', encoding='utf-8') as f:
  289. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  290. print(f"[RULES_API_RESULT] 保存规则引擎API结果文件: {filepath}")
  291. except Exception as e:
  292. print(f"[ERROR] 保存规则引擎API结果文件失败: {filepath}, 错误: {str(e)}")
  293. return {
  294. "success": True,
  295. "data": response.text,
  296. "status_code": response.status_code
  297. }
  298. else:
  299. # 记录API调用结果(HTTP错误)- 简化版
  300. end_time = datetime.now()
  301. call_id = f"rules_api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  302. api_call_info = {
  303. "call_id": call_id,
  304. "response": {
  305. "status_code": response.status_code,
  306. "error": response.text
  307. }
  308. }
  309. self.api_calls.append(api_call_info)
  310. # 保存API结果到文件 - 失败状态
  311. # 使用运行ID创建独立的文件夹
  312. run_id = os.environ.get('FLOW_RUN_ID', 'default')
  313. api_results_dir = f"api_results_{run_id}"
  314. os.makedirs(api_results_dir, exist_ok=True)
  315. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  316. filename = f"{timestamp}_{call_id}_fail.json"
  317. filepath = os.path.join(api_results_dir, filename)
  318. try:
  319. with open(filepath, 'w', encoding='utf-8') as f:
  320. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  321. print(f"[RULES_API_RESULT] 保存规则引擎API结果文件: {filepath}")
  322. except Exception as e:
  323. print(f"[ERROR] 保存规则引擎API结果文件失败: {filepath}, 错误: {str(e)}")
  324. return {
  325. "success": False,
  326. "message": f"规则引擎API调用失败,状态码: {response.status_code}",
  327. "response": response.text
  328. }
  329. except requests.exceptions.Timeout:
  330. # 记录API调用结果(超时)- 简化版
  331. end_time = datetime.now()
  332. call_id = f"rules_api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  333. api_call_info = {
  334. "call_id": call_id,
  335. "response": {
  336. "error": "API调用超时"
  337. }
  338. }
  339. self.api_calls.append(api_call_info)
  340. # 保存API结果到文件 - 失败状态
  341. # 使用运行ID创建独立的文件夹
  342. run_id = os.environ.get('FLOW_RUN_ID', 'default')
  343. api_results_dir = f"api_results_{run_id}"
  344. os.makedirs(api_results_dir, exist_ok=True)
  345. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  346. filename = f"{timestamp}_{call_id}_fail.json"
  347. filepath = os.path.join(api_results_dir, filename)
  348. try:
  349. with open(filepath, 'w', encoding='utf-8') as f:
  350. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  351. print(f"[RULES_API_RESULT] 保存规则引擎API结果文件: {filepath}")
  352. except Exception as e:
  353. print(f"[ERROR] 保存规则引擎API结果文件失败: {filepath}, 错误: {str(e)}")
  354. return {
  355. "success": False,
  356. "message": "规则引擎API调用超时"
  357. }
  358. except requests.exceptions.RequestException as e:
  359. # 记录API调用结果(请求异常)- 简化版
  360. end_time = datetime.now()
  361. call_id = f"rules_api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  362. api_call_info = {
  363. "call_id": call_id,
  364. "response": {
  365. "error": str(e)
  366. }
  367. }
  368. self.api_calls.append(api_call_info)
  369. # 保存API结果到文件 - 失败状态
  370. # 使用运行ID创建独立的文件夹
  371. run_id = os.environ.get('FLOW_RUN_ID', 'default')
  372. api_results_dir = f"api_results_{run_id}"
  373. os.makedirs(api_results_dir, exist_ok=True)
  374. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  375. filename = f"{timestamp}_{call_id}_fail.json"
  376. filepath = os.path.join(api_results_dir, filename)
  377. try:
  378. with open(filepath, 'w', encoding='utf-8') as f:
  379. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  380. print(f"[RULES_API_RESULT] 保存规则引擎API结果文件: {filepath}")
  381. except Exception as e:
  382. print(f"[ERROR] 保存规则引擎API结果文件失败: {filepath}, 错误: {str(e)}")
  383. return {
  384. "success": False,
  385. "message": f"规则引擎API调用异常: {str(e)}"
  386. }
  387. except Exception as e:
  388. # 记录API调用结果(其他异常)- 简化版
  389. end_time = datetime.now()
  390. call_id = f"rules_api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  391. api_call_info = {
  392. "call_id": call_id,
  393. "response": {
  394. "error": str(e)
  395. }
  396. }
  397. self.api_calls.append(api_call_info)
  398. # 保存API结果到文件 - 失败状态
  399. # 使用运行ID创建独立的文件夹
  400. run_id = os.environ.get('FLOW_RUN_ID', 'default')
  401. api_results_dir = f"api_results_{run_id}"
  402. os.makedirs(api_results_dir, exist_ok=True)
  403. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  404. filename = f"{timestamp}_{call_id}_fail.json"
  405. filepath = os.path.join(api_results_dir, filename)
  406. try:
  407. with open(filepath, 'w', encoding='utf-8') as f:
  408. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  409. print(f"[RULES_API_RESULT] 保存规则引擎API结果文件: {filepath}")
  410. except Exception as e:
  411. print(f"[ERROR] 保存规则引擎API结果文件失败: {filepath}, 错误: {str(e)}")
  412. return {
  413. "success": False,
  414. "message": f"处理规则引擎API调用时发生错误: {str(e)}"
  415. }
  416. def _prepare_rules_engine_request_data(self, config: Dict[str, Any], intent_result: Dict[str, Any], config_name: str) -> Dict[str, Any]:
  417. """
  418. 准备规则引擎API请求数据
  419. Args:
  420. config: 配置文件
  421. intent_result: 意图识别结果
  422. config_name: 配置文件名
  423. Returns:
  424. 请求数据
  425. """
  426. # 从配置文件中获取id和input
  427. request_id = config.get("id", "")
  428. input_filename = config.get("input", "")
  429. # 加载对应的数据文件
  430. input_data = {}
  431. if input_filename:
  432. data_file_path = self._select_data_file(input_filename)
  433. if data_file_path:
  434. input_data = self._load_table_data(data_file_path)
  435. else:
  436. print(f"警告:找不到配置文件 {config_name} 对应的数据文件: {input_filename}")
  437. # 构造API请求体
  438. request_data = {
  439. "id": request_id,
  440. "input": input_data
  441. }
  442. return {"json": request_data}
  443. async def _call_rules_engine_api_by_knowledge_id(self, knowledge_id: str, intent_result: Dict[str, Any], classified_data: List[Dict[str, Any]] = None) -> Dict[str, Any]:
  444. """
  445. 直接通过知识ID调用规则引擎API
  446. Args:
  447. knowledge_id: 知识ID,如 "metric-分析账户数量"
  448. intent_result: 意图识别结果(用于获取数据文件信息)
  449. classified_data: 分类后的数据,如果提供则优先使用,否则从intent_result或文件加载
  450. Returns:
  451. API调用结果
  452. """
  453. # 记录API调用开始
  454. start_time = datetime.now()
  455. # 规则引擎API配置
  456. method = "POST"
  457. url = f"{RULES_ENGINE_BASE_URL}/api/rules/executeKnowledge"
  458. headers = {
  459. "Accept": "*/*",
  460. "Accept-Encoding": "gzip, deflate, br",
  461. "Connection": "keep-alive",
  462. "Content-Type": "application/json",
  463. "User-Agent": "PostmanRuntime-ApipostRuntime/1.1.0"
  464. }
  465. timeout = 180 # 3分钟超时
  466. try:
  467. # 根据知识ID获取正确的输入字段名
  468. input_field_name = self._get_input_field_for_knowledge(knowledge_id)
  469. # 构造请求数据 - 优先使用传入的分类后数据
  470. input_data = {}
  471. # 检查是否有传入的分类后数据
  472. if classified_data and len(classified_data) > 0:
  473. # 使用传入的分类后数据
  474. print(f" 使用传入的分类后数据,记录数: {len(classified_data)}")
  475. input_data = {input_field_name: classified_data}
  476. else:
  477. # 回退到文件加载方式(向后兼容)
  478. input_filename = intent_result.get("data_file", "加工数据-流水分析-农业打标.json")
  479. print(f" 未找到分类后数据,回退到文件加载: {input_filename}")
  480. if input_filename:
  481. data_file_path = self._select_data_file(input_filename)
  482. if data_file_path:
  483. raw_data = self._load_table_data(data_file_path)
  484. # 使用正确的字段名包装数据
  485. input_data = {input_field_name: raw_data}
  486. else:
  487. print(f"警告:找不到数据文件: {input_filename}")
  488. input_data = {input_field_name: []}
  489. # 构造API请求体
  490. request_data = {
  491. "id": knowledge_id, # 直接使用知识ID
  492. "input": input_data
  493. }
  494. # 调用API
  495. json_data = request_data
  496. response = requests.post(url, headers=headers, json=json_data, timeout=timeout)
  497. # 处理响应(复用现有的响应处理逻辑)
  498. if response.status_code == 200:
  499. try:
  500. response_data = response.json()
  501. # 记录API调用结果 - 简化版
  502. end_time = datetime.now()
  503. call_id = f"rules_api_{knowledge_id}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  504. api_call_info = {
  505. "call_id": call_id,
  506. "response": response_data
  507. }
  508. self.api_calls.append(api_call_info)
  509. # 保存API结果到文件 - 成功状态
  510. # 使用运行ID创建独立的文件夹
  511. run_id = os.environ.get('FLOW_RUN_ID', 'default')
  512. api_results_dir = f"api_results_{run_id}"
  513. os.makedirs(api_results_dir, exist_ok=True)
  514. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  515. filename = f"{timestamp}_{call_id}_success.json"
  516. filepath = os.path.join(api_results_dir, filename)
  517. try:
  518. with open(filepath, 'w', encoding='utf-8') as f:
  519. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  520. print(f"[RULES_API_RESULT] 保存规则引擎API结果文件: {filepath}")
  521. except Exception as e:
  522. print(f"[ERROR] 保存规则引擎API结果文件失败: {filepath}, 错误: {str(e)}")
  523. return {
  524. "success": True,
  525. "data": response_data,
  526. "status_code": response.status_code
  527. }
  528. except json.JSONDecodeError:
  529. # 记录JSON解析失败的API调用 - 简化版
  530. end_time = datetime.now()
  531. call_id = f"rules_api_{knowledge_id}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  532. api_call_info = {
  533. "call_id": call_id,
  534. "response": {
  535. "status_code": response.status_code,
  536. "data": response.text,
  537. "error": "JSON解析失败"
  538. }
  539. }
  540. self.api_calls.append(api_call_info)
  541. # 保存API结果到文件 - 失败状态
  542. # 使用运行ID创建独立的文件夹
  543. run_id = os.environ.get('FLOW_RUN_ID', 'default')
  544. api_results_dir = f"api_results_{run_id}"
  545. os.makedirs(api_results_dir, exist_ok=True)
  546. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  547. filename = f"{timestamp}_{call_id}_fail.json"
  548. filepath = os.path.join(api_results_dir, filename)
  549. try:
  550. with open(filepath, 'w', encoding='utf-8') as f:
  551. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  552. print(f"[RULES_API_RESULT] 保存规则引擎API结果文件: {filepath}")
  553. except Exception as e:
  554. print(f"[ERROR] 保存规则引擎API结果文件失败: {filepath}, 错误: {str(e)}")
  555. return {
  556. "success": True,
  557. "data": response.text,
  558. "status_code": response.status_code
  559. }
  560. else:
  561. # 记录失败的API调用 - 简化版
  562. end_time = datetime.now()
  563. call_id = f"rules_api_{knowledge_id}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  564. api_call_info = {
  565. "call_id": call_id,
  566. "response": {
  567. "status_code": response.status_code,
  568. "error": response.text
  569. }
  570. }
  571. self.api_calls.append(api_call_info)
  572. # 保存API结果到文件 - 失败状态
  573. # 使用运行ID创建独立的文件夹
  574. run_id = os.environ.get('FLOW_RUN_ID', 'default')
  575. api_results_dir = f"api_results_{run_id}"
  576. os.makedirs(api_results_dir, exist_ok=True)
  577. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  578. filename = f"{timestamp}_{call_id}_fail.json"
  579. filepath = os.path.join(api_results_dir, filename)
  580. try:
  581. with open(filepath, 'w', encoding='utf-8') as f:
  582. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  583. print(f"[RULES_API_RESULT] 保存规则引擎API结果文件: {filepath}")
  584. except Exception as e:
  585. print(f"[ERROR] 保存规则引擎API结果文件失败: {filepath}, 错误: {str(e)}")
  586. return {
  587. "success": False,
  588. "message": f"规则引擎API调用失败,状态码: {response.status_code}",
  589. "response": response.text
  590. }
  591. except requests.exceptions.Timeout:
  592. # 记录API调用结果(超时)- 简化版
  593. end_time = datetime.now()
  594. call_id = f"rules_api_{knowledge_id}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  595. api_call_info = {
  596. "call_id": call_id,
  597. "response": {
  598. "error": "API调用超时"
  599. }
  600. }
  601. self.api_calls.append(api_call_info)
  602. # 保存API结果到文件 - 失败状态
  603. # 使用运行ID创建独立的文件夹
  604. run_id = os.environ.get('FLOW_RUN_ID', 'default')
  605. api_results_dir = f"api_results_{run_id}"
  606. os.makedirs(api_results_dir, exist_ok=True)
  607. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  608. filename = f"{timestamp}_{call_id}_fail.json"
  609. filepath = os.path.join(api_results_dir, filename)
  610. try:
  611. with open(filepath, 'w', encoding='utf-8') as f:
  612. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  613. print(f"[RULES_API_RESULT] 保存规则引擎API结果文件: {filepath}")
  614. except Exception as e:
  615. print(f"[ERROR] 保存规则引擎API结果文件失败: {filepath}, 错误: {str(e)}")
  616. return {
  617. "success": False,
  618. "message": "规则引擎API调用超时"
  619. }
  620. except requests.exceptions.RequestException as e:
  621. # 记录API调用结果(请求异常)- 简化版
  622. end_time = datetime.now()
  623. call_id = f"rules_api_{knowledge_id}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  624. api_call_info = {
  625. "call_id": call_id,
  626. "response": {
  627. "error": str(e)
  628. }
  629. }
  630. self.api_calls.append(api_call_info)
  631. # 保存API结果到文件 - 失败状态
  632. # 使用运行ID创建独立的文件夹
  633. run_id = os.environ.get('FLOW_RUN_ID', 'default')
  634. api_results_dir = f"api_results_{run_id}"
  635. os.makedirs(api_results_dir, exist_ok=True)
  636. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  637. filename = f"{timestamp}_{call_id}_fail.json"
  638. filepath = os.path.join(api_results_dir, filename)
  639. try:
  640. with open(filepath, 'w', encoding='utf-8') as f:
  641. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  642. print(f"[RULES_API_RESULT] 保存规则引擎API结果文件: {filepath}")
  643. except Exception as e:
  644. print(f"[ERROR] 保存规则引擎API结果文件失败: {filepath}, 错误: {str(e)}")
  645. return {
  646. "success": False,
  647. "message": f"规则引擎API调用异常: {str(e)}"
  648. }
  649. except Exception as e:
  650. # 记录API调用结果(其他异常)- 简化版
  651. end_time = datetime.now()
  652. call_id = f"rules_api_{knowledge_id}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  653. api_call_info = {
  654. "call_id": call_id,
  655. "response": {
  656. "error": str(e)
  657. }
  658. }
  659. self.api_calls.append(api_call_info)
  660. # 保存API结果到文件 - 失败状态
  661. # 使用运行ID创建独立的文件夹
  662. run_id = os.environ.get('FLOW_RUN_ID', 'default')
  663. api_results_dir = f"api_results_{run_id}"
  664. os.makedirs(api_results_dir, exist_ok=True)
  665. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  666. filename = f"{timestamp}_{call_id}_fail.json"
  667. filepath = os.path.join(api_results_dir, filename)
  668. try:
  669. with open(filepath, 'w', encoding='utf-8') as f:
  670. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  671. print(f"[RULES_API_RESULT] 保存规则引擎API结果文件: {filepath}")
  672. except Exception as e:
  673. print(f"[ERROR] 保存规则引擎API结果文件失败: {filepath}, 错误: {str(e)}")
  674. return {
  675. "success": False,
  676. "message": f"处理规则引擎API调用时发生错误: {str(e)}"
  677. }