| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890 |
- """
- 规则引擎指标计算Agent (Rules Engine Metric Calculation Agent)
- ===========================================================
- 此Agent负责根据意图识别结果执行规则引擎模式的指标计算任务。
- 核心功能:
- 1. 配置文件加载:读取和解析规则引擎指标计算配置文件
- 2. API调用管理:根据配置文件调用规则引擎API
- 3. 结果处理:处理API返回的数据,提取关键指标
- 4. 错误处理:处理API调用失败、网络异常等错误情况
- 5. 结果验证:验证计算结果的合理性和完整性
- 工作流程:
- 1. 接收意图识别结果和用户参数
- 2. 加载对应的规则引擎指标计算配置文件
- 3. 构造API请求参数(id和input)
- 4. 调用远程规则引擎服务
- 5. 解析和验证返回结果
- 6. 返回结构化的计算结果
- 技术实现:
- - 支持动态加载JSON配置文件
- - 使用requests库进行HTTP API调用
- - 集成LangChain用于复杂计算逻辑(可选)
- - 完善的错误处理和超时机制
- - 支持多种计算方法(标准、高级、自定义)
- 配置文件结构:
- - id: 规则引擎执行ID
- - input: 数据文件路径
- - description: 规则描述
- API接口:
- POST http://localhost:8081/api/rules/executeKnowledge
- 请求体:
- {
- "id": "知识ID",
- "input": {
- "动态字段名": [...] // 根据知识的inputField字段动态确定,如"transactions"或"resultTag"
- }
- }
- 作者: Big Agent Team
- 版本: 1.0.0
- 创建时间: 2024-12-19
- """
- import os
- import json
- import requests
- from datetime import datetime
- from typing import Dict, List, Any, Optional
- from langchain_openai import ChatOpenAI
- from langchain_core.prompts import ChatPromptTemplate
- import re
- from llmops.config import RULES_ENGINE_BASE_URL
- class RulesEngineMetricCalculationAgent:
- """规则引擎指标计算Agent"""
- def __init__(self, api_key: str, base_url: str = "https://api.deepseek.com"):
- # 获取可用的知识元数据
- self.available_knowledge = self._load_available_knowledge()
- # 加载数据文件映射
- self.data_files = self._load_data_files()
- # 初始化API调用跟踪
- self.api_calls = []
- # 加载配置文件
- self.configs = {}
- def _load_data_files(self) -> Dict[str, str]:
- """加载数据文件映射"""
- data_files = {}
- data_dir = "data_files"
- if os.path.exists(data_dir):
- for file in os.listdir(data_dir):
- if file.endswith('.json'):
- try:
- # 提取文件名,用于匹配配置文件
- key = file.replace('.json', '')
- data_files[key] = os.path.join(data_dir, file)
- except Exception as e:
- print(f"处理数据文件 {file} 失败: {e}")
- return data_files
- def _select_data_file(self, input_filename: str) -> Optional[str]:
- """
- 根据输入文件名选择对应的数据文件
- Args:
- input_filename: 配置文件中的input字段值
- Returns:
- 数据文件路径,如果找不到则返回None
- """
- # input字段直接指定数据文件名
- if input_filename in self.data_files:
- return self.data_files[input_filename]
- # 如果找不到精确匹配,尝试模糊匹配
- for key, file_path in self.data_files.items():
- if input_filename in key or key in input_filename:
- return file_path
- return None
- def _load_table_data(self, data_file_path: str) -> List[Dict[str, Any]]:
- """加载数据文件中的JSON数据"""
- try:
- with open(data_file_path, 'r', encoding='utf-8') as f:
- data = json.load(f)
- if isinstance(data, list):
- return data
- elif isinstance(data, dict):
- # 如果是字典,尝试提取其中的数组数据
- for key, value in data.items():
- if isinstance(value, list):
- return value
- return []
- else:
- return []
- except Exception as e:
- print(f"加载数据文件 {data_file_path} 失败: {e}")
- return []
- def _load_available_knowledge(self) -> List[Dict[str, Any]]:
- """
- 从规则引擎获取可用的知识元数据
- Returns:
- 知识元数据列表,包含id、description和inputField
- """
- try:
- url = f"{RULES_ENGINE_BASE_URL}/api/rules/getKnowledgeMeta"
- headers = {
- "Accept": "*/*",
- "Accept-Encoding": "gzip, deflate, br",
- "Connection": "keep-alive",
- "Content-Type": "application/json",
- "User-Agent": "PostmanRuntime-ApipostRuntime/1.1.0"
- }
- response = requests.post(url, headers=headers, json={}, timeout=30)
- if response.status_code == 200:
- knowledge_meta = response.json()
- if isinstance(knowledge_meta, list):
- print(f"✅ 成功获取 {len(knowledge_meta)} 个知识元数据")
- return knowledge_meta
- else:
- print(f"⚠️ 知识元数据格式异常: {knowledge_meta}")
- return []
- else:
- print(f"❌ 获取知识元数据失败,状态码: {response.status_code}")
- print(f"响应内容: {response.text}")
- return []
- except Exception as e:
- print(f"❌ 获取知识元数据时发生错误: {str(e)}")
- return []
- def _get_input_field_for_knowledge(self, knowledge_id: str) -> str:
- """
- 根据知识ID获取对应的inputField字段名
- Args:
- knowledge_id: 知识ID
- Returns:
- inputField字段名,默认为"transactions"
- """
- for knowledge in self.available_knowledge:
- if knowledge.get("id") == knowledge_id:
- input_field = knowledge.get("inputField", "transactions")
- print(f"🔗 知识 {knowledge_id} 使用输入字段: {input_field}")
- return input_field
- print(f"⚠️ 未找到知识 {knowledge_id} 的输入字段,使用默认值: transactions")
- return "transactions" # 默认值
- async def calculate_metrics(self, intent_result: Dict[str, Any], classified_data: List[Dict[str, Any]] = None) -> Dict[str, Any]:
- """
- 根据意图识别结果进行规则引擎指标计算
- Args:
- intent_result: 意图识别结果
- classified_data: 分类后的数据,如果提供则优先使用,否则从intent_result或文件加载
- Returns:
- 指标计算结果
- """
- try:
- results = []
- target_configs = intent_result.get("target_configs", [])
- if not target_configs:
- return {
- "success": False,
- "message": "没有找到需要调用的配置文件",
- "results": []
- }
- for config_name in target_configs:
- if config_name in self.configs:
- # 使用传统的JSON配置文件方式
- config = self.configs[config_name]
- result = await self._call_rules_engine_api(config, intent_result, config_name)
- results.append({
- "config_name": config_name,
- "result": result
- })
- elif config_name.startswith("metric-"):
- # 直接使用知识ID调用API,无需配置文件
- result = await self._call_rules_engine_api_by_knowledge_id(config_name, intent_result, classified_data)
- results.append({
- "config_name": config_name,
- "result": result
- })
- else:
- results.append({
- "config_name": config_name,
- "error": f"配置文件 {config_name} 不存在,且不是有效的知识ID"
- })
- return {
- "success": True,
- "results": results,
- "total_configs": len(target_configs),
- "successful_calculations": len([r for r in results if "result" in r])
- }
- except Exception as e:
- print(f"规则引擎指标计算失败: {e}")
- return {
- "success": False,
- "message": f"规则引擎指标计算过程中发生错误: {str(e)}",
- "results": []
- }
- async def _call_rules_engine_api(self, config: Dict[str, Any], intent_result: Dict[str, Any], config_name: str) -> Dict[str, Any]:
- """
- 调用规则引擎API
- Args:
- config: 配置文件
- intent_result: 意图识别结果
- Returns:
- API调用结果
- """
- try:
- # 记录API调用开始
- start_time = datetime.now()
- # 规则引擎API配置
- method = "POST"
- url = f"{RULES_ENGINE_BASE_URL}/api/rules/executeKnowledge"
- headers = {
- "Accept": "*/*",
- "Accept-Encoding": "gzip, deflate, br",
- "Connection": "keep-alive",
- "Content-Type": "application/json",
- "User-Agent": "PostmanRuntime-ApipostRuntime/1.1.0"
- }
- timeout = 180 # 3分钟超时
- # 准备请求数据
- request_data = self._prepare_rules_engine_request_data(config, intent_result, config_name)
- # 调用API
- json_data = request_data.get("json", {})
- response = requests.post(url, headers=headers, json=json_data, timeout=timeout)
- # 处理响应
- if response.status_code == 200:
- try:
- response_data = response.json()
- # 记录API调用结果
- end_time = datetime.now()
- call_id = f"rules_api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
- api_call_info = {
- "call_id": call_id,
- "timestamp": end_time.isoformat(),
- "agent": "RulesEngineMetricCalculationAgent",
- "api_endpoint": url,
- "config_name": config_name,
- "request": {
- "method": method,
- "url": url,
- "headers": headers,
- "json_data": json_data,
- "start_time": start_time.isoformat()
- },
- "response": {
- "status_code": response.status_code,
- "data": response_data,
- "end_time": end_time.isoformat(),
- "duration": (end_time - start_time).total_seconds()
- },
- "success": True
- }
- self.api_calls.append(api_call_info)
- # 保存API结果到文件
- api_results_dir = "api_results"
- os.makedirs(api_results_dir, exist_ok=True)
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- filename = f"{timestamp}_{call_id}.json"
- filepath = os.path.join(api_results_dir, filename)
- try:
- with open(filepath, 'w', encoding='utf-8') as f:
- json.dump(api_call_info, f, ensure_ascii=False, indent=2)
- print(f"[RULES_API_RESULT] 保存规则引擎API结果文件: {filepath}")
- except Exception as e:
- print(f"[ERROR] 保存规则引擎API结果文件失败: {filepath}, 错误: {str(e)}")
- return {
- "success": True,
- "data": response_data,
- "status_code": response.status_code
- }
- except json.JSONDecodeError:
- # 记录API调用结果(JSON解析失败)
- end_time = datetime.now()
- call_id = f"rules_api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
- api_call_info = {
- "call_id": call_id,
- "timestamp": end_time.isoformat(),
- "agent": "RulesEngineMetricCalculationAgent",
- "api_endpoint": url,
- "config_name": config_name,
- "request": {
- "method": method,
- "url": url,
- "headers": headers,
- "json_data": json_data,
- "start_time": start_time.isoformat()
- },
- "response": {
- "status_code": response.status_code,
- "data": response.text,
- "error": "JSON解析失败",
- "end_time": end_time.isoformat(),
- "duration": (end_time - start_time).total_seconds()
- },
- "success": False
- }
- self.api_calls.append(api_call_info)
- # 保存API结果到文件
- api_results_dir = "api_results"
- os.makedirs(api_results_dir, exist_ok=True)
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- filename = f"{timestamp}_{call_id}.json"
- filepath = os.path.join(api_results_dir, filename)
- try:
- with open(filepath, 'w', encoding='utf-8') as f:
- json.dump(api_call_info, f, ensure_ascii=False, indent=2)
- print(f"[RULES_API_RESULT] 保存规则引擎API结果文件: {filepath}")
- except Exception as e:
- print(f"[ERROR] 保存规则引擎API结果文件失败: {filepath}, 错误: {str(e)}")
- return {
- "success": True,
- "data": response.text,
- "status_code": response.status_code
- }
- else:
- # 记录API调用结果(HTTP错误)
- end_time = datetime.now()
- call_id = f"rules_api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
- api_call_info = {
- "call_id": call_id,
- "timestamp": end_time.isoformat(),
- "agent": "RulesEngineMetricCalculationAgent",
- "api_endpoint": url,
- "config_name": config_name,
- "request": {
- "method": method,
- "url": url,
- "headers": headers,
- "json_data": json_data,
- "start_time": start_time.isoformat()
- },
- "response": {
- "status_code": response.status_code,
- "error": response.text,
- "end_time": end_time.isoformat(),
- "duration": (end_time - start_time).total_seconds()
- },
- "success": False
- }
- self.api_calls.append(api_call_info)
- # 保存API结果到文件
- api_results_dir = "api_results"
- os.makedirs(api_results_dir, exist_ok=True)
- filename = f"{call_id}.json"
- filepath = os.path.join(api_results_dir, filename)
- try:
- with open(filepath, 'w', encoding='utf-8') as f:
- json.dump(api_call_info, f, ensure_ascii=False, indent=2)
- print(f"[RULES_API_RESULT] 保存规则引擎API结果文件: {filepath}")
- except Exception as e:
- print(f"[ERROR] 保存规则引擎API结果文件失败: {filepath}, 错误: {str(e)}")
- return {
- "success": False,
- "message": f"规则引擎API调用失败,状态码: {response.status_code}",
- "response": response.text
- }
- except requests.exceptions.Timeout:
- # 记录API调用结果(超时)
- end_time = datetime.now()
- call_id = f"rules_api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
- api_call_info = {
- "call_id": call_id,
- "timestamp": end_time.isoformat(),
- "agent": "RulesEngineMetricCalculationAgent",
- "api_endpoint": url,
- "config_name": config_name,
- "request": {
- "method": method,
- "url": url,
- "headers": headers,
- "json_data": json_data if 'json_data' in locals() else None,
- "start_time": start_time.isoformat()
- },
- "response": {
- "error": "API调用超时",
- "end_time": end_time.isoformat(),
- "duration": (end_time - start_time).total_seconds()
- },
- "success": False
- }
- self.api_calls.append(api_call_info)
- return {
- "success": False,
- "message": "规则引擎API调用超时"
- }
- except requests.exceptions.RequestException as e:
- # 记录API调用结果(请求异常)
- end_time = datetime.now()
- call_id = f"rules_api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
- api_call_info = {
- "call_id": call_id,
- "timestamp": end_time.isoformat(),
- "agent": "RulesEngineMetricCalculationAgent",
- "api_endpoint": url,
- "config_name": config_name,
- "request": {
- "method": method,
- "url": url,
- "headers": headers,
- "json_data": json_data if 'json_data' in locals() else None,
- "start_time": start_time.isoformat()
- },
- "response": {
- "error": str(e),
- "end_time": end_time.isoformat(),
- "duration": (end_time - start_time).total_seconds()
- },
- "success": False
- }
- self.api_calls.append(api_call_info)
- return {
- "success": False,
- "message": f"规则引擎API调用异常: {str(e)}"
- }
- except Exception as e:
- # 记录API调用结果(其他异常)
- end_time = datetime.now()
- call_id = f"rules_api_{config_name}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
- api_call_info = {
- "call_id": call_id,
- "timestamp": end_time.isoformat(),
- "agent": "RulesEngineMetricCalculationAgent",
- "api_endpoint": url,
- "config_name": config_name,
- "request": {
- "method": method,
- "url": url,
- "headers": headers,
- "json_data": json_data if 'json_data' in locals() else None,
- "start_time": start_time.isoformat()
- },
- "response": {
- "error": str(e),
- "end_time": end_time.isoformat(),
- "duration": (end_time - start_time).total_seconds()
- },
- "success": False
- }
- self.api_calls.append(api_call_info)
- return {
- "success": False,
- "message": f"处理规则引擎API调用时发生错误: {str(e)}"
- }
- def _prepare_rules_engine_request_data(self, config: Dict[str, Any], intent_result: Dict[str, Any], config_name: str) -> Dict[str, Any]:
- """
- 准备规则引擎API请求数据
- Args:
- config: 配置文件
- intent_result: 意图识别结果
- config_name: 配置文件名
- Returns:
- 请求数据
- """
- # 从配置文件中获取id和input
- request_id = config.get("id", "")
- input_filename = config.get("input", "")
- # 加载对应的数据文件
- input_data = {}
- if input_filename:
- data_file_path = self._select_data_file(input_filename)
- if data_file_path:
- input_data = self._load_table_data(data_file_path)
- else:
- print(f"警告:找不到配置文件 {config_name} 对应的数据文件: {input_filename}")
- # 构造API请求体
- request_data = {
- "id": request_id,
- "input": input_data
- }
- return {"json": request_data}
- async def _call_rules_engine_api_by_knowledge_id(self, knowledge_id: str, intent_result: Dict[str, Any], classified_data: List[Dict[str, Any]] = None) -> Dict[str, Any]:
- """
- 直接通过知识ID调用规则引擎API
- Args:
- knowledge_id: 知识ID,如 "metric-分析账户数量"
- intent_result: 意图识别结果(用于获取数据文件信息)
- classified_data: 分类后的数据,如果提供则优先使用,否则从intent_result或文件加载
- Returns:
- API调用结果
- """
- # 记录API调用开始
- start_time = datetime.now()
- # 规则引擎API配置
- method = "POST"
- url = f"{RULES_ENGINE_BASE_URL}/api/rules/executeKnowledge"
- headers = {
- "Accept": "*/*",
- "Accept-Encoding": "gzip, deflate, br",
- "Connection": "keep-alive",
- "Content-Type": "application/json",
- "User-Agent": "PostmanRuntime-ApipostRuntime/1.1.0"
- }
- timeout = 180 # 3分钟超时
- try:
- # 根据知识ID获取正确的输入字段名
- input_field_name = self._get_input_field_for_knowledge(knowledge_id)
- # 构造请求数据 - 优先使用传入的分类后数据
- input_data = {}
- # 检查是否有传入的分类后数据
- if classified_data and len(classified_data) > 0:
- # 使用传入的分类后数据
- print(f" 使用传入的分类后数据,记录数: {len(classified_data)}")
- input_data = {input_field_name: classified_data}
- else:
- # 回退到文件加载方式(向后兼容)
- input_filename = intent_result.get("data_file", "加工数据-流水分析-农业打标.json")
- print(f" 未找到分类后数据,回退到文件加载: {input_filename}")
- if input_filename:
- data_file_path = self._select_data_file(input_filename)
- if data_file_path:
- raw_data = self._load_table_data(data_file_path)
- # 使用正确的字段名包装数据
- input_data = {input_field_name: raw_data}
- else:
- print(f"警告:找不到数据文件: {input_filename}")
- input_data = {input_field_name: []}
- # 构造API请求体
- request_data = {
- "id": knowledge_id, # 直接使用知识ID
- "input": input_data
- }
- # 调用API
- json_data = request_data
- response = requests.post(url, headers=headers, json=json_data, timeout=timeout)
- # 处理响应(复用现有的响应处理逻辑)
- if response.status_code == 200:
- try:
- response_data = response.json()
- # 记录API调用结果
- end_time = datetime.now()
- call_id = f"rules_api_{knowledge_id}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
- api_call_info = {
- "call_id": call_id,
- "timestamp": end_time.isoformat(),
- "agent": "RulesEngineMetricCalculationAgent",
- "api_endpoint": url,
- "config_name": knowledge_id,
- "request": {
- "method": method,
- "url": url,
- "headers": headers,
- "json_data": json_data,
- "start_time": start_time.isoformat()
- },
- "response": {
- "status_code": response.status_code,
- "data": response_data,
- "end_time": end_time.isoformat(),
- "duration": (end_time - start_time).total_seconds()
- },
- "success": True
- }
- self.api_calls.append(api_call_info)
- # 保存API结果到文件
- api_results_dir = "api_results"
- os.makedirs(api_results_dir, exist_ok=True)
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- filename = f"{timestamp}_{call_id}.json"
- filepath = os.path.join(api_results_dir, filename)
- try:
- with open(filepath, 'w', encoding='utf-8') as f:
- json.dump(api_call_info, f, ensure_ascii=False, indent=2)
- print(f"[RULES_API_RESULT] 保存规则引擎API结果文件: {filepath}")
- except Exception as e:
- print(f"[ERROR] 保存规则引擎API结果文件失败: {filepath}, 错误: {str(e)}")
- return {
- "success": True,
- "data": response_data,
- "status_code": response.status_code
- }
- except json.JSONDecodeError:
- # 记录JSON解析失败的API调用
- end_time = datetime.now()
- call_id = f"rules_api_{knowledge_id}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
- api_call_info = {
- "call_id": call_id,
- "timestamp": end_time.isoformat(),
- "agent": "RulesEngineMetricCalculationAgent",
- "api_endpoint": url,
- "config_name": knowledge_id,
- "request": {
- "method": method,
- "url": url,
- "headers": headers,
- "json_data": json_data,
- "start_time": start_time.isoformat()
- },
- "response": {
- "status_code": response.status_code,
- "data": response.text,
- "error": "JSON解析失败",
- "end_time": end_time.isoformat(),
- "duration": (end_time - start_time).total_seconds()
- },
- "success": False
- }
- self.api_calls.append(api_call_info)
- # 保存API结果到文件
- api_results_dir = "api_results"
- os.makedirs(api_results_dir, exist_ok=True)
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- filename = f"{timestamp}_{call_id}.json"
- filepath = os.path.join(api_results_dir, filename)
- try:
- with open(filepath, 'w', encoding='utf-8') as f:
- json.dump(api_call_info, f, ensure_ascii=False, indent=2)
- print(f"[RULES_API_RESULT] 保存规则引擎API结果文件: {filepath}")
- except Exception as e:
- print(f"[ERROR] 保存规则引擎API结果文件失败: {filepath}, 错误: {str(e)}")
- return {
- "success": True,
- "data": response.text,
- "status_code": response.status_code
- }
- else:
- # 记录失败的API调用
- end_time = datetime.now()
- call_id = f"rules_api_{knowledge_id}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
- api_call_info = {
- "call_id": call_id,
- "timestamp": end_time.isoformat(),
- "agent": "RulesEngineMetricCalculationAgent",
- "api_endpoint": url,
- "config_name": knowledge_id,
- "request": {
- "method": method,
- "url": url,
- "headers": headers,
- "json_data": json_data,
- "start_time": start_time.isoformat()
- },
- "response": {
- "status_code": response.status_code,
- "error": response.text,
- "end_time": end_time.isoformat(),
- "duration": (end_time - start_time).total_seconds()
- },
- "success": False
- }
- self.api_calls.append(api_call_info)
- # 保存API结果到文件
- api_results_dir = "api_results"
- os.makedirs(api_results_dir, exist_ok=True)
- filename = f"{call_id}.json"
- filepath = os.path.join(api_results_dir, filename)
- try:
- with open(filepath, 'w', encoding='utf-8') as f:
- json.dump(api_call_info, f, ensure_ascii=False, indent=2)
- print(f"[RULES_API_RESULT] 保存规则引擎API结果文件: {filepath}")
- except Exception as e:
- print(f"[ERROR] 保存规则引擎API结果文件失败: {filepath}, 错误: {str(e)}")
- return {
- "success": False,
- "message": f"规则引擎API调用失败,状态码: {response.status_code}",
- "response": response.text
- }
- except requests.exceptions.Timeout:
- # 记录API调用结果(超时)
- end_time = datetime.now()
- call_id = f"rules_api_{knowledge_id}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
- api_call_info = {
- "call_id": call_id,
- "timestamp": end_time.isoformat(),
- "agent": "RulesEngineMetricCalculationAgent",
- "api_endpoint": url,
- "config_name": knowledge_id,
- "request": {
- "method": method,
- "url": url,
- "headers": headers,
- "json_data": json_data if 'json_data' in locals() else None,
- "start_time": start_time.isoformat()
- },
- "response": {
- "error": "API调用超时",
- "end_time": end_time.isoformat(),
- "duration": (end_time - start_time).total_seconds()
- },
- "success": False
- }
- self.api_calls.append(api_call_info)
- # 保存API结果到文件
- api_results_dir = "api_results"
- os.makedirs(api_results_dir, exist_ok=True)
- filename = f"{call_id}.json"
- filepath = os.path.join(api_results_dir, filename)
- try:
- with open(filepath, 'w', encoding='utf-8') as f:
- json.dump(api_call_info, f, ensure_ascii=False, indent=2)
- print(f"[RULES_API_RESULT] 保存规则引擎API结果文件: {filepath}")
- except Exception as e:
- print(f"[ERROR] 保存规则引擎API结果文件失败: {filepath}, 错误: {str(e)}")
- return {
- "success": False,
- "message": "规则引擎API调用超时"
- }
- except requests.exceptions.RequestException as e:
- # 记录API调用结果(请求异常)
- end_time = datetime.now()
- call_id = f"rules_api_{knowledge_id}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
- api_call_info = {
- "call_id": call_id,
- "timestamp": end_time.isoformat(),
- "agent": "RulesEngineMetricCalculationAgent",
- "api_endpoint": url,
- "config_name": knowledge_id,
- "request": {
- "method": method,
- "url": url,
- "headers": headers,
- "json_data": json_data if 'json_data' in locals() else None,
- "start_time": start_time.isoformat()
- },
- "response": {
- "error": str(e),
- "end_time": end_time.isoformat(),
- "duration": (end_time - start_time).total_seconds()
- },
- "success": False
- }
- self.api_calls.append(api_call_info)
- # 保存API结果到文件
- api_results_dir = "api_results"
- os.makedirs(api_results_dir, exist_ok=True)
- filename = f"{call_id}.json"
- filepath = os.path.join(api_results_dir, filename)
- try:
- with open(filepath, 'w', encoding='utf-8') as f:
- json.dump(api_call_info, f, ensure_ascii=False, indent=2)
- print(f"[RULES_API_RESULT] 保存规则引擎API结果文件: {filepath}")
- except Exception as e:
- print(f"[ERROR] 保存规则引擎API结果文件失败: {filepath}, 错误: {str(e)}")
- return {
- "success": False,
- "message": f"规则引擎API调用异常: {str(e)}"
- }
- except Exception as e:
- # 记录API调用结果(其他异常)
- end_time = datetime.now()
- call_id = f"rules_api_{knowledge_id}_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
- api_call_info = {
- "call_id": call_id,
- "timestamp": end_time.isoformat(),
- "agent": "RulesEngineMetricCalculationAgent",
- "api_endpoint": url,
- "config_name": knowledge_id,
- "request": {
- "method": method,
- "url": url,
- "headers": headers,
- "json_data": json_data if 'json_data' in locals() else None,
- "start_time": start_time.isoformat()
- },
- "response": {
- "error": str(e),
- "end_time": end_time.isoformat(),
- "duration": (end_time - start_time).total_seconds()
- },
- "success": False
- }
- self.api_calls.append(api_call_info)
- # 保存API结果到文件
- api_results_dir = "api_results"
- os.makedirs(api_results_dir, exist_ok=True)
- filename = f"{call_id}.json"
- filepath = os.path.join(api_results_dir, filename)
- try:
- with open(filepath, 'w', encoding='utf-8') as f:
- json.dump(api_call_info, f, ensure_ascii=False, indent=2)
- print(f"[RULES_API_RESULT] 保存规则引擎API结果文件: {filepath}")
- except Exception as e:
- print(f"[ERROR] 保存规则引擎API结果文件失败: {filepath}, 错误: {str(e)}")
- return {
- "success": False,
- "message": f"处理规则引擎API调用时发生错误: {str(e)}"
- }
|