from typing import Dict, Any, List, Optional
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
import pandas as pd
import asyncio
import json
import textwrap
import traceback
import re
import types
from llmops.agents.state import AgentState, MetricRequirement, convert_numpy_types
from llmops.agents.datadev.llm import get_llm


class MetricComputationResult(BaseModel):
    """Outcome of computing a single metric."""

    metric_id: str
    # Computed value (already passed through convert_numpy_types); None on failure.
    value: Any
    # "success" or "failed", as produced by MetricsCalculator.compute_metric.
    status: str
    # The snippet that was executed (built-in template, LLM output, or fallback).
    computation_code: str = ""
    # Failure reason; only set when status == "failed".
    error: Optional[str] = None


class MetricsCalculator:
    """Metric computation agent: obtains pandas code for each metric and executes it.

    Code for a metric comes from, in order of preference:

    1. ``_BUILTIN_TEMPLATES`` when the metric id has a ready-made snippet;
    2. the LLM (``self.llm.ainvoke``), asked to return JSON with a
       ``computation_code`` field;
    3. keyword-based fallback snippets chosen from the metric name.
    """

    # Ready-made snippets for common metric ids. Each runs with ``df`` (a copy of
    # the input frame) and ``pd`` in scope and must assign ``result``.
    _BUILTIN_TEMPLATES: Dict[str, str] = {
        "total_income": "df_filtered = df[df['txDirection']=='收入']; result = df_filtered['txAmount'].sum()",
        "total_expense": "df_filtered = df[df['txDirection']=='支出']; result = abs(df_filtered['txAmount'].sum())",
        # abs() on the expense sum keeps net figures correct whether expenses are
        # stored as positive or negative amounts (same convention as total_expense).
        "net_income": "income = df[df['txDirection']=='收入']['txAmount'].sum(); expense = abs(df[df['txDirection']=='支出']['txAmount'].sum()); result = income - expense",
        "net_profit": "income = df[df['txDirection']=='收入']['txAmount'].sum(); expense = abs(df[df['txDirection']=='支出']['txAmount'].sum()); result = income - expense",
        "balance_trend": "df['txDate'] = pd.to_datetime(df['txDate']); df_daily = df.groupby(df['txDate'].dt.date)['txBalance'].last().reset_index(); result = {'dates': df_daily['txDate'].astype(str).tolist(), 'balances': df_daily['txBalance'].tolist()}",
        "top_income_sources": "df_income = df[df['txDirection']=='收入']; top_client = df_income.groupby('txCounterparty')['txAmount'].sum().idxmax(); result = {top_client: df_income.groupby('txCounterparty')['txAmount'].sum().max()}",
        "top_expense_categories": "df_expense = df[df['txDirection']=='支出']; top_cat = df_expense.groupby('txSummary')['txAmount'].sum().idxmax(); result = {top_cat: df_expense.groupby('txSummary')['txAmount'].sum().max()}",
        "income_sources_breakdown": "df_income = df[df['txDirection']=='收入']; result = df_income.groupby('txCounterparty')['txAmount'].sum().to_dict()",
        "expense_categories_breakdown": "df_expense = df[df['txDirection']=='支出']; result = df_expense.groupby('txSummary')['txAmount'].sum().to_dict()",
        # .copy() avoids assigning a column onto a filtered slice, and .astype(str)
        # turns Period keys into plain strings (e.g. "2024-01") so the result dict
        # stays JSON-serializable, like balance_trend's string dates.
        "monthly_income_trend": "df['txDate'] = pd.to_datetime(df['txDate']); df_income = df[df['txDirection']=='收入'].copy(); df_income['month'] = df_income['txDate'].dt.to_period('M').astype(str); result = df_income.groupby('month')['txAmount'].sum().to_dict()",
        "monthly_expense_trend": "df['txDate'] = pd.to_datetime(df['txDate']); df_expense = df[df['txDirection']=='支出'].copy(); df_expense['month'] = df_expense['txDate'].dt.to_period('M').astype(str); result = df_expense.groupby('month')['txAmount'].sum().to_dict()",
    }

    # Builtins visible to generated code. __import__, open, eval, exec, getattr,
    # etc. are deliberately absent. A fresh copy is handed to every execution so
    # one snippet cannot alter what the next one sees.
    _SAFE_BUILTINS: Dict[str, Any] = {
        "abs": abs, "sum": sum, "len": len, "str": str, "int": int,
        "float": float, "bool": bool, "list": list, "dict": dict, "set": set,
        "tuple": tuple, "min": min, "max": max, "round": round, "range": range,
        "sorted": sorted, "reversed": reversed, "enumerate": enumerate,
        "zip": zip, "any": any, "all": all, "isinstance": isinstance,
    }

    def __init__(self, llm):
        """Store the chat model used to generate code for non-templated metrics.

        Args:
            llm: Object with an async ``ainvoke(prompt)`` whose result exposes a
                ``content`` string (see ``_resolve_code``).
        """
        self.llm = llm

    def _extract_json_from_response(self, content: str) -> dict:
        """Extract a JSON object from an LLM reply, tolerating Markdown code fences.

        Candidates are tried in order: a ```json fenced block, a bare ``` fenced
        block, the whole reply, and the span from the first ``{`` to the last
        ``}``. A candidate that fails to parse falls through to the next one.

        Raises:
            ValueError: no candidate parses to a JSON object.
        """
        content = content.strip()
        candidates: List[str] = []

        json_match = re.search(r'```json\s*(\{.*?\})\s*```', content, re.DOTALL)
        if json_match:
            candidates.append(json_match.group(1))

        code_match = re.search(r'```\s*(\{.*?\})\s*```', content, re.DOTALL)
        if code_match:
            candidates.append(code_match.group(1))

        # Plain JSON reply.
        candidates.append(content)

        # JSON surrounded by explanatory text.
        first_brace = content.find('{')
        last_brace = content.rfind('}')
        if first_brace != -1 and last_brace > first_brace:
            candidates.append(content[first_brace:last_brace + 1])

        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if isinstance(parsed, dict):
                return parsed

        raise ValueError(f"无法从响应中提取有效JSON: {content[:200]}...")

    def _execute_code_safely(self, code: str, df: pd.DataFrame) -> Any:
        """Run a generated snippet inside a real function scope and return ``result``.

        The snippet becomes the body of ``_compute_metric(df)``:

        * running inside a function (rather than a bare ``exec``) lets list
          comprehensions and lambdas see variables the snippet defines;
        * ``df`` is a *parameter*, not a global, so snippets that rebind it
          (``df = df[...]``) work instead of raising UnboundLocalError;
        * the snippet is dedented and re-indented so multi-line code compiles.

        Args:
            code: Snippet that must assign a variable named ``result``.
            df: Input frame. The snippet receives a copy, so the caller's frame
                is not modified.

        Returns:
            ``result`` passed through ``convert_numpy_types``.

        Raises:
            ValueError: compilation or execution failed, or ``result`` was never
                assigned. The original exception is chained as ``__cause__``.

        NOTE(review): restricting ``__builtins__`` is not a sandbox. Attribute
        traversal on the exposed objects (``pd``, ``df``, literals) can still
        reach unrestricted functionality, so treat LLM-generated code as untrusted.
        """
        safe_globals = {"pd": pd, "__builtins__": dict(self._SAFE_BUILTINS)}
        try:
            body = textwrap.indent(textwrap.dedent(code).strip("\n"), "    ")
            func_code = (
                "def _compute_metric(df):\n"
                f"{body}\n"
                "    return result\n"
            )
            compiled = compile(func_code, "<metric_code>", "exec")
            local_ns: Dict[str, Any] = {}
            exec(compiled, safe_globals, local_ns)
            compute_func = local_ns["_compute_metric"]
            result = compute_func(df.copy())
            # Convert numpy scalars/arrays right away so the value is state-safe.
            return convert_numpy_types(result)
        except Exception as e:
            raise ValueError(f"代码执行失败: {str(e)}\n代码内容: {code}") from e

    def _build_prompt(self, metric: MetricRequirement, df: pd.DataFrame) -> str:
        """Build the LLM prompt asking for a JSON reply with ``computation_code``."""
        available_fields = df.columns.tolist()
        return f"""你是数据分析执行器。根据计算逻辑,生成并执行 Pandas/Python 代码。

数据信息:
- 字段: {available_fields}
- 行数: {len(df)}
- 指标名称: {metric.metric_name}
- 指标ID: {metric.metric_id}
- 计算逻辑: {metric.calculation_logic}

要求:
1. 生成可执行的 Pandas/Python 代码(只操作 df 变量)
2. 代码必须安全,禁止系统调用
3. **最终结果必须赋值给变量 `result`**(这是强制要求)
4. `result` 可以是:数值、列表、字典、DataFrame
5. 如果无法计算,将 error 字段设为具体原因

**关键约束**:
- 代码最后一行必须是 `result = ...`
- 确保 `result` 变量在执行后被定义
- 不要包含任何解释性文字,只输出JSON
- **重要**:避免在列表推导式中引用外部变量,所有变量必须在同一作用域定义

输出格式(必须返回纯JSON,不要Markdown代码块):
{{
    "metric_id": "{metric.metric_id}",
    "value": <计算结果>,
    "status": "success",
    "computation_code": "df_filtered = df[df['txDirection']=='收入']; result = df_filtered['txAmount'].sum()"
}}

如果无法计算:
{{
    "metric_id": "{metric.metric_id}",
    "value": null,
    "status": "error",
    "error": "具体错误原因",
    "computation_code": ""
}}

**必须确保代码执行后,`result` 变量被定义!**"""

    @staticmethod
    def _keyword_template(metric_name: str) -> Optional[str]:
        """Snippet picked from keywords in the metric name when the LLM gave no code.

        Returns None when no keyword combination matches.
        """
        if "收入" in metric_name and "分类" in metric_name:
            return "df_income = df[df['txDirection']=='收入']; result = df_income.groupby('txSummary')['txAmount'].sum().to_dict()"
        if "支出" in metric_name and "分类" in metric_name:
            return "df_expense = df[df['txDirection']=='支出']; result = df_expense.groupby('txSummary')['txAmount'].sum().to_dict()"
        if "收入" in metric_name and ("top" in metric_name or "主要" in metric_name):
            return "df_income = df[df['txDirection']=='收入']; result = df_income.groupby('txCounterparty')['txAmount'].sum().to_dict()"
        if "支出" in metric_name and ("top" in metric_name or "主要" in metric_name):
            return "df_expense = df[df['txDirection']=='支出']; result = df_expense.groupby('txSummary')['txAmount'].sum().to_dict()"
        return None

    @staticmethod
    def _last_resort_template(metric_name: str) -> str:
        """Simplest snippet, used when the LLM path failed entirely.

        NOTE(review): for names containing neither keyword this yields
        ``result = None``, which compute_metric reports as status "success" with
        value None. Confirm that this is intended.
        """
        if "收入" in metric_name:
            return "df_income = df[df['txDirection']=='收入']; result = df_income['txAmount'].sum()"
        if "支出" in metric_name:
            return "df_expense = df[df['txDirection']=='支出']; result = abs(df_expense['txAmount'].sum())"
        return "result = None"

    async def _resolve_code(self, metric: MetricRequirement, df: pd.DataFrame) -> str:
        """Choose the snippet for ``metric``: built-in template, LLM output, or fallback."""
        template = self._BUILTIN_TEMPLATES.get(metric.metric_id)
        if template is not None:
            print(f" 📦 使用内置模板: {metric.metric_id}")
            return template

        prompt = self._build_prompt(metric, df)
        try:
            response = await self.llm.ainvoke(prompt)
            print(f"\n 🤖 LLM 原始响应: {response.content[:200]}...")
            response_dict = self._extract_json_from_response(response.content)

            code = response_dict.get("computation_code", "")
            print(f" 💻 生成代码: {code}")

            if not isinstance(code, str) or not code.strip():
                # The LLM produced no usable code: try the keyword templates.
                code = self._keyword_template(metric.metric_name)
                if code is None:
                    raise ValueError("LLM 未生成计算代码,且无法匹配内置模板")
            return code
        except Exception as e:
            # Any failure in the LLM path (call, parsing, no match) falls back to
            # the simplest template.
            print(f" ⚠️ LLM调用失败,使用回退模板: {e}")
            return self._last_resort_template(metric.metric_name)

    async def compute_metric(
        self,
        metric: MetricRequirement,
        df: pd.DataFrame
    ) -> MetricComputationResult:
        """Compute a single metric.

        Execution errors are not raised. They are reported through a result with
        status "failed" and ``error`` set.
        """
        code = await self._resolve_code(metric, df)

        try:
            computed_value = self._execute_code_safely(code, df)
        except Exception as e:
            error_msg = f"指标 {metric.metric_id} 计算失败: {str(e)}"
            print(f" ❌ {error_msg}")
            return MetricComputationResult(
                metric_id=metric.metric_id,
                value=None,
                status="failed",
                error=error_msg,
                computation_code=code
            )

        print(f" ✅ 计算结果: {computed_value}")
        return MetricComputationResult(
            metric_id=metric.metric_id,
            value=computed_value,
            status="success",
            computation_code=code
        )

    async def compute_batch(
        self,
        metrics: List[MetricRequirement],
        df: pd.DataFrame,
        max_concurrent: int = 3
    ) -> List[MetricComputationResult]:
        """Compute ``metrics`` concurrently, at most ``max_concurrent`` at a time.

        Results are returned in the same order as ``metrics``. A
        ``max_concurrent`` below 1 is treated as 1, because a zero-permit
        semaphore would never let any task run.
        """
        semaphore = asyncio.Semaphore(max(1, max_concurrent))

        async def compute_with_semaphore(metric):
            async with semaphore:
                return await self.compute_metric(metric, df)

        return await asyncio.gather(*(compute_with_semaphore(m) for m in metrics))


def _copy_state_for_update(state: AgentState) -> AgentState:
    """Shallow-copy ``state`` and also copy the containers metrics_node writes into.

    This keeps the node from mutating the caller's ``messages`` and
    ``computed_metrics`` objects in place.
    """
    new_state = state.copy()
    new_state["messages"] = list(state["messages"])
    if state.get("computed_metrics") is not None:
        new_state["computed_metrics"] = dict(state["computed_metrics"])
    return new_state


def _required_coverage(
    computed_metrics: Dict[str, Any],
    required_metrics: List[MetricRequirement]
) -> float:
    """Fraction of distinct required metric ids present in ``computed_metrics``.

    Ids computed but not required are ignored. Returns 1.0 when nothing is required.
    """
    required_ids = {m.metric_id for m in required_metrics}
    if not required_ids:
        return 1.0
    return len(required_ids.intersection(computed_metrics)) / len(required_ids)


async def metrics_node(state: AgentState) -> AgentState:
    """Metric computation node: compute the next batch of pending metrics.

    Selection: metrics in ``metrics_requirements`` whose id is not yet in
    ``computed_metrics``, narrowed to ``pending_metric_ids`` when that list is
    non-empty. At most 5 metrics are computed per call.

    Returns:
        A new state (input containers are copied, not mutated) with updated
        ``computed_metrics``, ``pending_metric_ids``, ``failed_metric_attempts``,
        ``completeness_score`` and ``messages``. When every required metric is
        already computed, it instead sets ``is_complete=True`` and
        ``next_route="report_compiler"``. The result is passed through
        ``convert_numpy_types``.

    Raises:
        ValueError: no metric could be selected even though coverage is below
            1.0. This is a defensive guard.
    """
    if not state["data_set"]:
        new_state = _copy_state_for_update(state)
        new_state["messages"].append(("ai", "❌ 错误:数据集为空"))
        return convert_numpy_types(new_state)

    df = pd.DataFrame(state["data_set"])

    # Select the metrics still to compute.
    pending_ids = state.get("pending_metric_ids", [])
    computed_ids = set(state["computed_metrics"].keys())
    required_metrics = state["metrics_requirements"]

    uncomputed = [m for m in required_metrics if m.metric_id not in computed_ids]
    pending_metrics = uncomputed
    # NOTE(review): an empty pending_metric_ids means "no filter", so metrics
    # dropped from the list after failing are selected again on a later call.
    # failed_metric_attempts is recorded below but never consulted here. Confirm
    # that the router stops the retry loop.
    if pending_ids:
        wanted_ids = set(pending_ids)
        pending_metrics = [m for m in uncomputed if m.metric_id in wanted_ids]

    if not pending_metrics:
        if _required_coverage(state["computed_metrics"], required_metrics) >= 1.0:
            new_state = _copy_state_for_update(state)
            new_state["messages"].append(("ai", "✅ 所有指标已计算完成"))
            new_state["completeness_score"] = 1.0
            new_state["is_complete"] = True
            new_state["next_route"] = "report_compiler"
            return convert_numpy_types(new_state)

        # The pending_metric_ids filter excluded everything: fall back to all
        # uncomputed metrics.
        pending_metrics = uncomputed
        if not pending_metrics:
            raise ValueError("无法提取待计算指标")

    print(f"\n📊 待计算指标: {[m.metric_id for m in pending_metrics]}")

    # Cap the number of metrics computed per call.
    batch_size = 5
    metrics_to_compute = pending_metrics[:batch_size]
    print(f"🧮 本次计算 {len(metrics_to_compute)} 个指标")

    llm = get_llm()
    calculator = MetricsCalculator(llm)
    results = await calculator.compute_batch(metrics_to_compute, df)

    # Fold the results into a fresh state.
    new_state = _copy_state_for_update(state)
    success_count = 0
    succeeded_ids = set()
    for r in results:
        if r.status == "success":
            new_state["computed_metrics"][r.metric_id] = r.value
            succeeded_ids.add(r.metric_id)
            success_count += 1
            continue

        short_error = (r.error or "")[:100]
        print(f"⚠️ 指标 {r.metric_id} 失败: {short_error}...")
        new_state["messages"].append(
            ("ai", f"⚠️ 指标计算失败 {r.metric_id}: {short_error}...")
        )
        # Count failed attempts per metric, on a copy of the caller's dict.
        attempts = dict(new_state.get("failed_metric_attempts") or {})
        attempts[r.metric_id] = attempts.get(r.metric_id, 0) + 1
        new_state["failed_metric_attempts"] = attempts

    # Everything selected that did not succeed stays pending. This includes the
    # metrics beyond the batch and this batch's failures, wherever they sit.
    remaining_ids = [
        m.metric_id for m in pending_metrics if m.metric_id not in succeeded_ids
    ]
    new_state["pending_metric_ids"] = remaining_ids

    if success_count == 0 and len(metrics_to_compute) > 0:
        # Whole batch failed: drop these metrics from the pending list.
        new_state["messages"].append(
            ("ai", f"⚠️ {len(metrics_to_compute)}个指标全部计算失败,将跳过这些指标")
        )
        failed_ids = {m.metric_id for m in metrics_to_compute}
        new_state["pending_metric_ids"] = [
            mid for mid in remaining_ids if mid not in failed_ids
        ]

    print(f"✅ 成功 {success_count}/{len(results)},剩余 {len(new_state['pending_metric_ids'])} 个指标")

    coverage = _required_coverage(new_state["computed_metrics"], required_metrics)
    new_state["completeness_score"] = coverage
    new_state["messages"].append(
        ("ai", f"🧮 计算完成 {success_count}/{len(results)} 个指标,覆盖率 {coverage:.2%}")
    )

    return convert_numpy_types(new_state)