| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369 |
- from typing import Dict, Any, List, Optional
- from langchain_openai import ChatOpenAI
- from pydantic import BaseModel, Field
- import pandas as pd
- import asyncio
- import json
- import traceback
- import re
- import types
- from llmops.agents.state import AgentState, MetricRequirement, convert_numpy_types
- from llmops.agents.datadev.llm import get_llm
class MetricComputationResult(BaseModel):
    """Outcome of computing a single metric."""

    # Identifier of the metric this result belongs to.
    metric_id: str
    # Computed value; None when the computation did not produce one.
    value: Any
    # Outcome label; the code in this file sets "success" or "failed".
    status: str
    # Source of the snippet that was executed (empty when none was run).
    computation_code: str = Field(default="")
    # Human-readable failure description, if any.
    error: Optional[str] = Field(default=None)
class MetricsCalculator:
    """Metric-computation agent: obtains a Pandas snippet per metric and runs it.

    A snippet is a piece of Python source that reads the DataFrame ``df`` and
    assigns its answer to a variable named ``result``.  Snippets come from a
    table of built-in templates, from the LLM, or from keyword-based fallbacks.

    SECURITY NOTE: snippets produced by the LLM are executed with ``exec``.
    Restricting ``__builtins__`` limits accidents but is NOT a security
    boundary (object introspection can escape it).  Run this component only
    where the model output is trusted or the process itself is sandboxed.
    """

    # Ready-made snippets keyed by metric id; used instead of asking the LLM.
    # Net income/profit subtract the absolute expense total so the result is
    # the same whether expense amounts are stored as positive or negative
    # numbers (consistent with the ``total_expense`` template).
    _BUILTIN_TEMPLATES = {
        "total_income": "df_filtered = df[df['txDirection']=='收入']; result = df_filtered['txAmount'].sum()",
        "total_expense": "df_filtered = df[df['txDirection']=='支出']; result = abs(df_filtered['txAmount'].sum())",
        "net_income": "income = df[df['txDirection']=='收入']['txAmount'].sum(); expense = df[df['txDirection']=='支出']['txAmount'].sum(); result = income - abs(expense)",
        "net_profit": "income = df[df['txDirection']=='收入']['txAmount'].sum(); expense = df[df['txDirection']=='支出']['txAmount'].sum(); result = income - abs(expense)",
        "balance_trend": "df['txDate'] = pd.to_datetime(df['txDate']); df_daily = df.groupby(df['txDate'].dt.date)['txBalance'].last().reset_index(); result = {'dates': df_daily['txDate'].astype(str).tolist(), 'balances': df_daily['txBalance'].tolist()}",
        "top_income_sources": "df_income = df[df['txDirection']=='收入']; top_client = df_income.groupby('txCounterparty')['txAmount'].sum().idxmax(); result = {top_client: df_income.groupby('txCounterparty')['txAmount'].sum().max()}",
        "top_expense_categories": "df_expense = df[df['txDirection']=='支出']; top_cat = df_expense.groupby('txSummary')['txAmount'].sum().idxmax(); result = {top_cat: df_expense.groupby('txSummary')['txAmount'].sum().max()}",
        "income_sources_breakdown": "df_income = df[df['txDirection']=='收入']; result = df_income.groupby('txCounterparty')['txAmount'].sum().to_dict()",
        "expense_categories_breakdown": "df_expense = df[df['txDirection']=='支出']; result = df_expense.groupby('txSummary')['txAmount'].sum().to_dict()",
        # ``.copy()`` so that adding the ``month`` column does not write into
        # a slice of ``df`` (avoids pandas' SettingWithCopy warning).
        "monthly_income_trend": "df['txDate'] = pd.to_datetime(df['txDate']); df_income = df[df['txDirection']=='收入'].copy(); df_income['month'] = df_income['txDate'].dt.to_period('M'); result = df_income.groupby('month')['txAmount'].sum().to_dict()",
        "monthly_expense_trend": "df['txDate'] = pd.to_datetime(df['txDate']); df_expense = df[df['txDirection']=='支出'].copy(); df_expense['month'] = df_expense['txDate'].dt.to_period('M'); result = df_expense.groupby('month')['txAmount'].sum().to_dict()",
    }

    # The only builtins visible to an executed snippet.
    _ALLOWED_BUILTINS = {
        "abs": abs, "sum": sum, "len": len, "str": str, "int": int, "float": float,
        "list": list, "dict": dict, "set": set, "tuple": tuple,
        "min": min, "max": max, "round": round, "range": range,
        "sorted": sorted, "enumerate": enumerate, "zip": zip,
    }

    def __init__(self, llm):
        """Store the chat model used to generate snippets.

        Args:
            llm: Object exposing ``await llm.ainvoke(prompt)`` whose return
                value has a ``content`` attribute.
        """
        self.llm = llm

    def _extract_json_from_response(self, content: str) -> dict:
        """Extract a JSON object from an LLM reply.

        Candidates are tried in this order: a ```json fenced block, any fenced
        block, the whole reply, and finally the text between the first ``{``
        and the last ``}``.  A candidate that does not parse, or that parses to
        something other than a JSON object, is skipped rather than aborting
        the search.

        Args:
            content: Raw reply text.

        Returns:
            The first candidate that parses to a ``dict``.

        Raises:
            ValueError: If no candidate yields a JSON object.
        """
        content = content.strip()

        candidates = []
        for fence_pattern in (r'```json\s*(\{.*?\})\s*```', r'```\s*(\{.*?\})\s*```'):
            fenced = re.search(fence_pattern, content, re.DOTALL)
            if fenced:
                candidates.append(fenced.group(1))
        candidates.append(content)
        first_brace = content.find('{')
        last_brace = content.rfind('}')
        if first_brace != -1 and last_brace > first_brace:
            candidates.append(content[first_brace:last_brace + 1])

        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            # Callers use ``.get`` on the result, so only objects qualify.
            if isinstance(parsed, dict):
                return parsed

        raise ValueError(f"无法从响应中提取有效JSON: {content[:200]}...")

    @staticmethod
    def _build_metric_function_source(code: str) -> str:
        """Wrap a snippet in the source of ``_compute_metric(df)``.

        Wrapping the snippet in a function gives it one real local scope, so
        comprehensions inside it can see the snippet's own variables.  ``df``
        is a parameter, which lets the snippet rebind it (``df = df[...]``)
        without hitting ``UnboundLocalError``.  Every line of the snippet is
        re-indented so multi-line snippets compile.

        Args:
            code: Snippet source; expected to assign ``result``.

        Returns:
            Source text defining ``_compute_metric(df)``, which returns the
            snippet's ``result`` variable.
        """
        import textwrap

        body = textwrap.indent(textwrap.dedent(code).strip("\n"), "    ")
        return f"def _compute_metric(df):\n{body}\n    return result\n"

    def _execute_code_safely(self, code: str, df: pd.DataFrame) -> Any:
        """Run a snippet against a copy of ``df`` and return its ``result``.

        The snippet sees ``pd``, ``df`` and the whitelisted builtins only.
        See the class docstring: this is a guard rail, not a sandbox.

        Args:
            code: Snippet source that assigns ``result``.
            df: Input data; the snippet works on a copy.

        Returns:
            The snippet's ``result`` after ``convert_numpy_types``.

        Raises:
            ValueError: On any failure while building, compiling or running
                the snippet (including ``result`` never being assigned); the
                original exception is chained as ``__cause__``.
        """
        sandbox_globals = {
            "pd": pd,
            # Fresh dict per run so a snippet cannot alter the whitelist.
            "__builtins__": dict(self._ALLOWED_BUILTINS),
        }
        try:
            source = self._build_metric_function_source(code)
            namespace: Dict[str, Any] = {}
            exec(compile(source, "<string>", "exec"), sandbox_globals, namespace)
            value = namespace["_compute_metric"](df.copy())
            # Convert numpy scalars/arrays right away so the value can be
            # stored in the agent state.
            return convert_numpy_types(value)
        except Exception as e:
            raise ValueError(f"代码执行失败: {str(e)}\n代码内容: {code}") from e

    @staticmethod
    def _build_prompt(metric: "MetricRequirement", df: pd.DataFrame) -> str:
        """Compose the code-generation prompt for ``metric`` over ``df``."""
        available_fields = df.columns.tolist()
        return f"""你是数据分析执行器。根据计算逻辑,生成并执行 Pandas/Python 代码。
数据信息:
- 字段: {available_fields}
- 行数: {len(df)}
- 指标名称: {metric.metric_name}
- 指标ID: {metric.metric_id}
- 计算逻辑: {metric.calculation_logic}
要求:
1. 生成可执行的 Pandas/Python 代码(只操作 df 变量)
2. 代码必须安全,禁止系统调用
3. **最终结果必须赋值给变量 `result`**(这是强制要求)
4. `result` 可以是:数值、列表、字典、DataFrame
5. 如果无法计算,将 error 字段设为具体原因
**关键约束**:
- 代码最后一行必须是 `result = ...`
- 确保 `result` 变量在执行后被定义
- 不要包含任何解释性文字,只输出JSON
- **重要**:避免在列表推导式中引用外部变量,所有变量必须在同一作用域定义
输出格式(必须返回纯JSON,不要Markdown代码块):
{{
"metric_id": "{metric.metric_id}",
"value": <计算结果>,
"status": "success",
"computation_code": "df_filtered = df[df['txDirection']=='收入']; result = df_filtered['txAmount'].sum()"
}}
如果无法计算:
{{
"metric_id": "{metric.metric_id}",
"value": null,
"status": "error",
"error": "具体错误原因",
"computation_code": ""
}}
**必须确保代码执行后,`result` 变量被定义!**"""

    @staticmethod
    def _template_for_metric_name(metric_name: str) -> str:
        """Pick a breakdown snippet from keywords in the metric name.

        Used when the LLM answered but returned no code.

        Raises:
            ValueError: If the name matches none of the keyword combinations.
        """
        is_income = "收入" in metric_name
        is_expense = "支出" in metric_name
        by_category = "分类" in metric_name
        is_top = "top" in metric_name or "主要" in metric_name

        if is_income and by_category:
            return "df_income = df[df['txDirection']=='收入']; result = df_income.groupby('txSummary')['txAmount'].sum().to_dict()"
        if is_expense and by_category:
            return "df_expense = df[df['txDirection']=='支出']; result = df_expense.groupby('txSummary')['txAmount'].sum().to_dict()"
        if is_income and is_top:
            return "df_income = df[df['txDirection']=='收入']; result = df_income.groupby('txCounterparty')['txAmount'].sum().to_dict()"
        if is_expense and is_top:
            return "df_expense = df[df['txDirection']=='支出']; result = df_expense.groupby('txSummary')['txAmount'].sum().to_dict()"
        raise ValueError("LLM 未生成计算代码,且无法匹配内置模板")

    async def _generate_code(self, metric: "MetricRequirement", df: pd.DataFrame) -> str:
        """Ask the LLM for a snippet, degrading to keyword fallbacks.

        Any failure (LLM call, JSON extraction, no usable code) is logged and
        answered with the simplest income/expense total, or ``result = None``
        when the metric name gives no hint.  This method is best-effort and
        does not raise for those failures.
        """
        prompt = self._build_prompt(metric, df)
        try:
            response = await self.llm.ainvoke(prompt)
            print(f"\n 🤖 LLM 原始响应: {response.content[:200]}...")
            response_dict = self._extract_json_from_response(response.content)
            code = response_dict.get("computation_code", "")
            print(f" 💻 生成代码: {code}")
            if code:
                return code
            # The LLM produced no code: try a keyword-matched template.
            return self._template_for_metric_name(metric.metric_name)
        except Exception as e:
            # Deliberate best-effort boundary: fall back instead of failing.
            print(f" ⚠️ LLM调用失败,使用回退模板: {e}")
            if "收入" in metric.metric_name:
                return "df_income = df[df['txDirection']=='收入']; result = df_income['txAmount'].sum()"
            if "支出" in metric.metric_name:
                return "df_expense = df[df['txDirection']=='支出']; result = abs(df_expense['txAmount'].sum())"
            # NOTE(review): this yields a "success" result whose value is
            # None; confirm that downstream consumers expect that.
            return "result = None"

    async def compute_metric(
        self,
        metric: "MetricRequirement",
        df: pd.DataFrame
    ) -> "MetricComputationResult":
        """Compute one metric.

        Uses the built-in template when ``metric.metric_id`` has one,
        otherwise asks the LLM (with fallbacks) for a snippet, then executes
        the snippet against ``df``.

        Args:
            metric: Requirement providing ``metric_id``, ``metric_name`` and
                ``calculation_logic``.
            df: Data the snippet operates on.

        Returns:
            A result with ``status="success"`` and the computed value, or
            ``status="failed"`` with ``error`` set when execution raised.
            The snippet that was run is always in ``computation_code``.
        """
        template = self._BUILTIN_TEMPLATES.get(metric.metric_id)
        if template is not None:
            code = template
            print(f" 📦 使用内置模板: {metric.metric_id}")
        else:
            code = await self._generate_code(metric, df)

        try:
            computed_value = self._execute_code_safely(code, df)
        except Exception as e:
            error_msg = f"指标 {metric.metric_id} 计算失败: {str(e)}"
            print(f" ❌ {error_msg}")
            return MetricComputationResult(
                metric_id=metric.metric_id,
                value=None,
                status="failed",
                error=error_msg,
                computation_code=code
            )

        print(f" ✅ 计算结果: {computed_value}")
        return MetricComputationResult(
            metric_id=metric.metric_id,
            value=computed_value,
            status="success",
            computation_code=code
        )

    async def compute_batch(
        self,
        metrics: List["MetricRequirement"],
        df: pd.DataFrame,
        max_concurrent: int = 3
    ) -> List["MetricComputationResult"]:
        """Compute several metrics with bounded concurrency.

        Args:
            metrics: Requirements to compute.
            df: Data shared by all computations.
            max_concurrent: Maximum computations in flight; values below 1
                are treated as 1 so the batch cannot stall on a zero-permit
                semaphore.

        Returns:
            One result per input metric, in input order.
        """
        semaphore = asyncio.Semaphore(max(1, max_concurrent))

        async def _bounded(metric):
            async with semaphore:
                return await self.compute_metric(metric, df)

        return list(await asyncio.gather(*(_bounded(m) for m in metrics)))
async def metrics_node(state: AgentState) -> AgentState:
    """Graph node that computes the next batch of outstanding metrics.

    Reads ``data_set``, ``metrics_requirements``, ``computed_metrics`` and the
    optional ``pending_metric_ids`` from ``state``; computes at most five
    outstanding metrics through ``MetricsCalculator``; and returns a shallow
    copy of the state with ``computed_metrics``, ``pending_metric_ids``,
    ``failed_metric_attempts``, ``completeness_score`` and ``messages``
    updated.  When nothing is outstanding it also sets ``is_complete`` and
    routes to ``report_compiler``.

    Coverage is the fraction of *required* metrics that have a computed
    value, so entries in ``computed_metrics`` that are not required do not
    inflate it.

    NOTE: ``state.copy()`` is shallow, so the ``messages`` list and the
    ``computed_metrics`` / ``failed_metric_attempts`` dicts are updated in
    place and remain shared with the incoming ``state``.

    Args:
        state: Current agent state.

    Returns:
        The updated state, passed through ``convert_numpy_types``.
    """
    if not state["data_set"]:
        new_state = state.copy()
        new_state["messages"].append(("ai", "❌ 错误:数据集为空"))
        return convert_numpy_types(new_state)

    df = pd.DataFrame(state["data_set"])

    requested_ids = set(state.get("pending_metric_ids", []) or [])
    computed_ids = set(state["computed_metrics"].keys())
    required_metrics = state["metrics_requirements"]

    # Everything that is required but has no computed value yet.
    uncomputed_metrics = [
        m for m in required_metrics
        if m.metric_id not in computed_ids
    ]

    if not uncomputed_metrics:
        new_state = state.copy()
        new_state["messages"].append(("ai", "✅ 所有指标已计算完成"))
        new_state["completeness_score"] = 1.0
        new_state["is_complete"] = True
        new_state["next_route"] = "report_compiler"
        return convert_numpy_types(new_state)

    # Honour the explicit pending list when it still names outstanding
    # metrics; otherwise (empty or stale list) work on everything outstanding.
    pending_metrics = [
        m for m in uncomputed_metrics
        if m.metric_id in requested_ids
    ] or uncomputed_metrics

    print(f"\n📊 待计算指标: {[m.metric_id for m in pending_metrics]}")

    # Cap the amount of work done per invocation of the node.
    batch_size = 5
    metrics_to_compute = pending_metrics[:batch_size]
    print(f"🧮 本次计算 {len(metrics_to_compute)} 个指标")

    llm = get_llm()
    calculator = MetricsCalculator(llm)
    results = await calculator.compute_batch(metrics_to_compute, df)

    new_state = state.copy()
    success_count = 0
    succeeded_ids = set()
    for r in results:
        if r.status == "success":
            new_state["computed_metrics"][r.metric_id] = r.value
            succeeded_ids.add(r.metric_id)
            success_count += 1
            continue

        # ``error`` is optional on the result model; never slice None.
        detail = (r.error or "")[:100]
        print(f"⚠️ 指标 {r.metric_id} 失败: {detail}...")
        new_state["messages"].append(
            ("ai", f"⚠️ 指标计算失败 {r.metric_id}: {detail}...")
        )
        # Count failures per metric.
        attempts = new_state.setdefault("failed_metric_attempts", {})
        attempts[r.metric_id] = attempts.get(r.metric_id, 0) + 1

    # Keep exactly the pending metrics that did not succeed in this round.
    # Successes can be anywhere in the batch, so a positional slice by
    # success count would keep computed metrics and drop failed ones.
    remaining_ids = [
        m.metric_id for m in pending_metrics
        if m.metric_id not in succeeded_ids
    ]
    new_state["pending_metric_ids"] = remaining_ids

    if success_count == 0 and metrics_to_compute:
        # The whole batch failed: take it off the pending list so the next
        # round is not forced onto the same metrics.
        new_state["messages"].append(
            ("ai", f"⚠️ {len(metrics_to_compute)}个指标全部计算失败,将跳过这些指标")
        )
        failed_ids = {m.metric_id for m in metrics_to_compute}
        new_state["pending_metric_ids"] = [mid for mid in remaining_ids if mid not in failed_ids]

    print(f"✅ 成功 {success_count}/{len(results)},剩余 {len(new_state['pending_metric_ids'])} 个指标")

    # ``required_metrics`` is non-empty here (otherwise the node returned
    # above), so the division is safe.
    covered = sum(1 for m in required_metrics if m.metric_id in new_state["computed_metrics"])
    coverage = covered / len(required_metrics)
    new_state["completeness_score"] = coverage
    new_state["messages"].append(
        ("ai", f"🧮 计算完成 {success_count}/{len(results)} 个指标,覆盖率 {coverage:.2%}")
    )

    return convert_numpy_types(new_state)
|