# metrics_agent.py
  1. from typing import Dict, Any, List, Optional
  2. from langchain_openai import ChatOpenAI
  3. from pydantic import BaseModel, Field
  4. import pandas as pd
  5. import asyncio
  6. import json
  7. import traceback
  8. import re
  9. import types
  10. from llmops.agents.state import AgentState, MetricRequirement, convert_numpy_types
  11. from llmops.agents.datadev.llm import get_llm
class MetricComputationResult(BaseModel):
    """Result of computing a single metric.

    Produced by ``MetricsCalculator.compute_metric``; attributes are
    documented with ``#`` comments because pydantic treats bare strings
    after fields as statements, not docs.
    """
    # Identifier of the metric (matches MetricRequirement.metric_id).
    metric_id: str
    # Computed value: number, list, dict, or DataFrame-derived structure.
    value: Any
    # "success" on success; "failed" when execution raised. NOTE(review):
    # the LLM prompt also mentions "error" — confirm which consumers check.
    status: str
    # The pandas snippet that produced the value (empty when none was run).
    computation_code: str = ""
    # Human-readable failure reason when status is not "success".
    error: Optional[str] = None
  19. class MetricsCalculator:
  20. """指标计算智能体:安全执行指标计算"""
  21. def __init__(self, llm):
  22. self.llm = llm
  23. def _extract_json_from_response(self, content: str) -> dict:
  24. """从LLM响应中提取JSON,支持Markdown代码块格式"""
  25. content = content.strip()
  26. # 尝试匹配 ```json ... ``` 格式
  27. json_match = re.search(r'```json\s*(\{.*?\})\s*```', content, re.DOTALL)
  28. if json_match:
  29. return json.loads(json_match.group(1))
  30. # 尝试匹配 ``` ... ``` 格式
  31. code_match = re.search(r'```\s*(\{.*?\})\s*```', content, re.DOTALL)
  32. if code_match:
  33. return json.loads(code_match.group(1))
  34. # 尝试直接解析(兼容纯JSON格式)
  35. try:
  36. return json.loads(content)
  37. except json.JSONDecodeError:
  38. pass
  39. # 尝试提取第一个 { 和最后一个 } 之间的内容
  40. first_brace = content.find('{')
  41. last_brace = content.rfind('}')
  42. if first_brace != -1 and last_brace != -1 and last_brace > first_brace:
  43. try:
  44. return json.loads(content[first_brace:last_brace + 1])
  45. except json.JSONDecodeError:
  46. pass
  47. raise ValueError(f"无法从响应中提取有效JSON: {content[:200]}...")
  48. def _execute_code_safely(self, code: str, df: pd.DataFrame) -> Any:
  49. """
  50. 核心修复:使用types.FunctionType创建真正的函数作用域
  51. 解决列表推导式中变量不可见的问题
  52. """
  53. # 准备执行环境
  54. globals_dict = {
  55. "pd": pd,
  56. "df": df.copy(),
  57. "__builtins__": {
  58. "abs": abs, "sum": sum, "len": len, "str": str, "int": int, "float": float,
  59. "list": list, "dict": dict, "set": set, "tuple": tuple,
  60. "min": min, "max": max, "round": round, "range": range,
  61. "sorted": sorted, "enumerate": enumerate, "zip": zip,
  62. }
  63. }
  64. # 移除可能存在的危险内置函数
  65. safe_globals = globals_dict.copy()
  66. # 创建函数代码
  67. func_code = f"""
  68. def _compute_metric():
  69. # 用户生成的代码
  70. {code}
  71. # 确保result被定义
  72. return result
  73. """
  74. try:
  75. # 编译代码
  76. compiled = compile(func_code, "<string>", "exec")
  77. # 创建局部命名空间
  78. local_ns = {}
  79. # 执行编译后的代码
  80. exec(compiled, safe_globals, local_ns)
  81. # 获取函数并执行
  82. compute_func = local_ns["_compute_metric"]
  83. # 执行函数并返回结果
  84. result = compute_func()
  85. # 关键:立即转换numpy类型
  86. return convert_numpy_types(result)
  87. except Exception as e:
  88. raise ValueError(f"代码执行失败: {str(e)}\n代码内容: {code}")
  89. async def compute_metric(
  90. self,
  91. metric: MetricRequirement,
  92. df: pd.DataFrame
  93. ) -> MetricComputationResult:
  94. """动态计算单个指标(最终修复版)"""
  95. available_fields = df.columns.tolist()
  96. # 内置常用指标模板,提高成功率
  97. builtin_templates = {
  98. "total_income": "df_filtered = df[df['txDirection']=='收入']; result = df_filtered['txAmount'].sum()",
  99. "total_expense": "df_filtered = df[df['txDirection']=='支出']; result = abs(df_filtered['txAmount'].sum())",
  100. "net_income": "income = df[df['txDirection']=='收入']['txAmount'].sum(); expense = df[df['txDirection']=='支出']['txAmount'].sum(); result = income - expense",
  101. "net_profit": "income = df[df['txDirection']=='收入']['txAmount'].sum(); expense = df[df['txDirection']=='支出']['txAmount'].sum(); result = income - expense",
  102. "balance_trend": "df['txDate'] = pd.to_datetime(df['txDate']); df_daily = df.groupby(df['txDate'].dt.date)['txBalance'].last().reset_index(); result = {'dates': df_daily['txDate'].astype(str).tolist(), 'balances': df_daily['txBalance'].tolist()}",
  103. "top_income_sources": "df_income = df[df['txDirection']=='收入']; top_client = df_income.groupby('txCounterparty')['txAmount'].sum().idxmax(); result = {top_client: df_income.groupby('txCounterparty')['txAmount'].sum().max()}",
  104. "top_expense_categories": "df_expense = df[df['txDirection']=='支出']; top_cat = df_expense.groupby('txSummary')['txAmount'].sum().idxmax(); result = {top_cat: df_expense.groupby('txSummary')['txAmount'].sum().max()}",
  105. "income_sources_breakdown": "df_income = df[df['txDirection']=='收入']; result = df_income.groupby('txCounterparty')['txAmount'].sum().to_dict()",
  106. "expense_categories_breakdown": "df_expense = df[df['txDirection']=='支出']; result = df_expense.groupby('txSummary')['txAmount'].sum().to_dict()",
  107. "monthly_income_trend": "df['txDate'] = pd.to_datetime(df['txDate']); df_income = df[df['txDirection']=='收入']; df_income['month'] = df_income['txDate'].dt.to_period('M'); result = df_income.groupby('month')['txAmount'].sum().to_dict()",
  108. "monthly_expense_trend": "df['txDate'] = pd.to_datetime(df['txDate']); df_expense = df[df['txDirection']=='支出']; df_expense['month'] = df_expense['txDate'].dt.to_period('M'); result = df_expense.groupby('month')['txAmount'].sum().to_dict()",
  109. }
  110. # 如果是指标ID在模板中,直接使用模板
  111. if metric.metric_id in builtin_templates:
  112. code = builtin_templates[metric.metric_id]
  113. print(f" 📦 使用内置模板: {metric.metric_id}")
  114. else:
  115. # 否则调用LLM生成代码
  116. prompt = f"""你是数据分析执行器。根据计算逻辑,生成并执行 Pandas/Python 代码。
  117. 数据信息:
  118. - 字段: {available_fields}
  119. - 行数: {len(df)}
  120. - 指标名称: {metric.metric_name}
  121. - 指标ID: {metric.metric_id}
  122. - 计算逻辑: {metric.calculation_logic}
  123. 要求:
  124. 1. 生成可执行的 Pandas/Python 代码(只操作 df 变量)
  125. 2. 代码必须安全,禁止系统调用
  126. 3. **最终结果必须赋值给变量 `result`**(这是强制要求)
  127. 4. `result` 可以是:数值、列表、字典、DataFrame
  128. 5. 如果无法计算,将 error 字段设为具体原因
  129. **关键约束**:
  130. - 代码最后一行必须是 `result = ...`
  131. - 确保 `result` 变量在执行后被定义
  132. - 不要包含任何解释性文字,只输出JSON
  133. - **重要**:避免在列表推导式中引用外部变量,所有变量必须在同一作用域定义
  134. 输出格式(必须返回纯JSON,不要Markdown代码块):
  135. {{
  136. "metric_id": "{metric.metric_id}",
  137. "value": <计算结果>,
  138. "status": "success",
  139. "computation_code": "df_filtered = df[df['txDirection']=='收入']; result = df_filtered['txAmount'].sum()"
  140. }}
  141. 如果无法计算:
  142. {{
  143. "metric_id": "{metric.metric_id}",
  144. "value": null,
  145. "status": "error",
  146. "error": "具体错误原因",
  147. "computation_code": ""
  148. }}
  149. **必须确保代码执行后,`result` 变量被定义!**"""
  150. try:
  151. # 调用 LLM
  152. response = await self.llm.ainvoke(prompt)
  153. print(f"\n 🤖 LLM 原始响应: {response.content[:200]}...")
  154. # 使用增强的JSON解析
  155. response_dict = self._extract_json_from_response(response.content)
  156. # 提取代码
  157. code = response_dict.get("computation_code", "")
  158. print(f" 💻 生成代码: {code}")
  159. if not code:
  160. # LLM无法生成代码,尝试使用简化模板
  161. if "收入" in metric.metric_name and "分类" in metric.metric_name:
  162. code = "df_income = df[df['txDirection']=='收入']; result = df_income.groupby('txSummary')['txAmount'].sum().to_dict()"
  163. elif "支出" in metric.metric_name and "分类" in metric.metric_name:
  164. code = "df_expense = df[df['txDirection']=='支出']; result = df_expense.groupby('txSummary')['txAmount'].sum().to_dict()"
  165. elif "收入" in metric.metric_name and ("top" in metric.metric_name or "主要" in metric.metric_name):
  166. code = "df_income = df[df['txDirection']=='收入']; result = df_income.groupby('txCounterparty')['txAmount'].sum().to_dict()"
  167. elif "支出" in metric.metric_name and ("top" in metric.metric_name or "主要" in metric.metric_name):
  168. code = "df_expense = df[df['txDirection']=='支出']; result = df_expense.groupby('txSummary')['txAmount'].sum().to_dict()"
  169. else:
  170. raise ValueError("LLM 未生成计算代码,且无法匹配内置模板")
  171. except Exception as e:
  172. # 如果LLM调用失败,使用最简单模板
  173. print(f" ⚠️ LLM调用失败,使用回退模板: {e}")
  174. if "收入" in metric.metric_name:
  175. code = "df_income = df[df['txDirection']=='收入']; result = df_income['txAmount'].sum()"
  176. elif "支出" in metric.metric_name:
  177. code = "df_expense = df[df['txDirection']=='支出']; result = abs(df_expense['txAmount'].sum())"
  178. else:
  179. code = "result = None"
  180. try:
  181. # 使用重构的执行器
  182. computed_value = self._execute_code_safely(code, df)
  183. print(f" ✅ 计算结果: {computed_value}")
  184. return MetricComputationResult(
  185. metric_id=metric.metric_id,
  186. value=computed_value,
  187. status="success",
  188. computation_code=code
  189. )
  190. except Exception as e:
  191. # 捕获所有异常并记录详细信息
  192. error_msg = f"指标 {metric.metric_id} 计算失败: {str(e)}"
  193. print(f" ❌ {error_msg}")
  194. return MetricComputationResult(
  195. metric_id=metric.metric_id,
  196. value=None,
  197. status="failed",
  198. error=error_msg,
  199. computation_code=code
  200. )
  201. async def compute_batch(
  202. self,
  203. metrics: List[MetricRequirement],
  204. df: pd.DataFrame,
  205. max_concurrent: int = 3
  206. ) -> List[MetricComputationResult]:
  207. """批量计算指标(限制并发)"""
  208. semaphore = asyncio.Semaphore(max_concurrent)
  209. async def compute_with_semaphore(metric):
  210. async with semaphore:
  211. return await self.compute_metric(metric, df)
  212. tasks = [compute_with_semaphore(m) for m in metrics]
  213. return await asyncio.gather(*tasks)
  214. async def metrics_node(state: AgentState) -> AgentState:
  215. """指标计算节点:正确提取待计算指标并增强调试"""
  216. if not state["data_set"]:
  217. new_state = state.copy()
  218. new_state["messages"].append(("ai", "❌ 错误:数据集为空"))
  219. # 关键修复:返回前清理状态
  220. return convert_numpy_types(new_state)
  221. df = pd.DataFrame(state["data_set"])
  222. # 提取待计算指标
  223. pending_ids = state.get("pending_metric_ids", [])
  224. computed_ids = set(state["computed_metrics"].keys())
  225. required_metrics = state["metrics_requirements"]
  226. pending_metrics = [
  227. m for m in required_metrics
  228. if m.metric_id not in computed_ids
  229. ]
  230. if pending_ids:
  231. valid_ids = [m.metric_id for m in pending_metrics]
  232. pending_metrics = [
  233. m for m in pending_metrics
  234. if m.metric_id in pending_ids and m.metric_id in valid_ids
  235. ]
  236. if not pending_metrics:
  237. coverage = len(computed_ids) / len(required_metrics) if required_metrics else 1
  238. if coverage >= 1.0:
  239. new_state = state.copy()
  240. new_state["messages"].append(("ai", "✅ 所有指标已计算完成"))
  241. new_state["completeness_score"] = 1.0
  242. new_state["is_complete"] = True
  243. new_state["next_route"] = "report_compiler"
  244. # 关键修复:返回前清理状态
  245. return convert_numpy_types(new_state)
  246. # 重新计算
  247. pending_metrics = [
  248. m for m in required_metrics
  249. if m.metric_id not in computed_ids
  250. ]
  251. if not pending_metrics:
  252. raise ValueError("无法提取待计算指标")
  253. print(f"\n📊 待计算指标: {[m.metric_id for m in pending_metrics]}")
  254. # 限制单次计算数量
  255. batch_size = 5
  256. metrics_to_compute = pending_metrics[:batch_size]
  257. print(f"🧮 本次计算 {len(metrics_to_compute)} 个指标")
  258. # 批量计算
  259. llm = get_llm()
  260. calculator = MetricsCalculator(llm)
  261. results = await calculator.compute_batch(metrics_to_compute, df)
  262. # 更新状态
  263. new_state = state.copy()
  264. success_count = 0
  265. for r in results:
  266. if r.status == "success":
  267. new_state["computed_metrics"][r.metric_id] = r.value
  268. success_count += 1
  269. else:
  270. print(f"⚠️ 指标 {r.metric_id} 失败: {r.error[:100]}...")
  271. new_state["messages"].append(
  272. ("ai", f"⚠️ 指标计算失败 {r.metric_id}: {r.error[:100]}...")
  273. )
  274. # 记录失败次数
  275. if "failed_metric_attempts" not in new_state:
  276. new_state["failed_metric_attempts"] = {}
  277. new_state["failed_metric_attempts"][r.metric_id] = new_state.get("failed_metric_attempts", {}).get(
  278. r.metric_id, 0) + 1
  279. # 更新待计算指标列表
  280. remaining_ids = [m.metric_id for m in pending_metrics[success_count:]]
  281. new_state["pending_metric_ids"] = remaining_ids
  282. # 新增:记录失败指标,避免无限重试
  283. if success_count == 0 and len(metrics_to_compute) > 0:
  284. # 如果全部失败,标记为跳过
  285. new_state["messages"].append(
  286. ("ai", f"⚠️ {len(metrics_to_compute)}个指标全部计算失败,将跳过这些指标")
  287. )
  288. # 从待计算列表中移除这些失败的指标
  289. failed_ids = [m.metric_id for m in metrics_to_compute]
  290. new_state["pending_metric_ids"] = [mid for mid in remaining_ids if mid not in failed_ids]
  291. print(f"✅ 成功 {success_count}/{len(results)},剩余 {len(new_state['pending_metric_ids'])} 个指标")
  292. # 重新评估覆盖率
  293. coverage = len(new_state["computed_metrics"]) / len(required_metrics)
  294. new_state["completeness_score"] = coverage
  295. new_state["messages"].append(
  296. ("ai", f"🧮 计算完成 {success_count}/{len(results)} 个指标,覆盖率 {coverage:.2%}")
  297. )
  298. # 关键修复:返回前清理状态
  299. return convert_numpy_types(new_state)