JiaQiang · 1 week ago
Parent commit: d4b9658338
1 changed file with 369 additions and 0 deletions
llmops/agents/metrics_agent.py

@@ -0,0 +1,369 @@
+from typing import Dict, Any, List, Optional
+from langchain_openai import ChatOpenAI
+from pydantic import BaseModel, Field
+import pandas as pd
+import asyncio
+import json
+import re
+
+from llmops.agents.state import AgentState, MetricRequirement, convert_numpy_types
+from llmops.agents.datadev.llm import get_llm
+
+
+class MetricComputationResult(BaseModel):
+    """Result of a single metric computation."""
+    metric_id: str
+    value: Any
+    status: str
+    computation_code: str = ""
+    error: Optional[str] = None
+
+
+class MetricsCalculator:
+    """Metrics computation agent: executes metric calculations safely."""
+
+    def __init__(self, llm):
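+        # llm is expected to be a LangChain chat model exposing async .ainvoke(),
+        # e.g. the model returned by get_llm()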
+        self.llm = llm
+
+    def _extract_json_from_response(self, content: str) -> dict:
+        """Extract JSON from an LLM response, including Markdown code-block formats."""
+        content = content.strip()
+
+        # Try a ```json ... ``` fenced block first
+        json_match = re.search(r'```json\s*(\{.*?\})\s*```', content, re.DOTALL)
+        if json_match:
+            return json.loads(json_match.group(1))
+
+        # Then a bare ``` ... ``` fenced block
+        code_match = re.search(r'```\s*(\{.*?\})\s*```', content, re.DOTALL)
+        if code_match:
+            return json.loads(code_match.group(1))
+
+        # Then try parsing the content directly (plain JSON)
+        try:
+            return json.loads(content)
+        except json.JSONDecodeError:
+            pass
+
+        # Finally, try the substring between the first '{' and the last '}'
+        first_brace = content.find('{')
+        last_brace = content.rfind('}')
+        if first_brace != -1 and last_brace != -1 and last_brace > first_brace:
+            try:
+                return json.loads(content[first_brace:last_brace + 1])
+            except json.JSONDecodeError:
+                pass
+
+        raise ValueError(f"Could not extract valid JSON from the response: {content[:200]}...")
+
+    def _execute_code_safely(self, code: str, df: pd.DataFrame) -> Any:
+        """
+        Execute the generated code inside a real function scope.
+        Wrapping the code in a function (rather than running a bare exec)
+        makes the variables it defines visible inside list comprehensions.
+        """
+        # Build the execution environment
+        globals_dict = {
+            "pd": pd,
+            "df": df.copy(),
+            "__builtins__": {
+                "abs": abs, "sum": sum, "len": len, "str": str, "int": int, "float": float,
+                "list": list, "dict": dict, "set": set, "tuple": tuple,
+                "min": min, "max": max, "round": round, "range": range,
+                "sorted": sorted, "enumerate": enumerate, "zip": zip,
+            }
+        }
+
+        # The whitelisted __builtins__ above already exclude dangerous functions
+        # (no open, __import__, eval, exec); work on a copy so the template dict
+        # is not mutated between calls
+        safe_globals = globals_dict.copy()
+
+        # Wrap the generated code in a function body, indenting every line so that
+        # multi-line snippets compile correctly and list comprehensions can still
+        # see the variables defined above them
+        indented_code = "\n".join("    " + line for line in code.splitlines())
+        func_code = (
+            "def _compute_metric():\n"
+            f"{indented_code}\n"
+            "    # The generated code is expected to have assigned `result`\n"
+            "    return result\n"
+        )
+
+        try:
+            # Compile the wrapped code
+            compiled = compile(func_code, "<string>", "exec")
+
+            # Local namespace that will receive the function definition
+            local_ns = {}
+
+            # Execute the compiled definition
+            exec(compiled, safe_globals, local_ns)
+
+            # Retrieve the function
+            compute_func = local_ns["_compute_metric"]
+
+            # Run it to compute the metric
+            result = compute_func()
+
+            # Important: convert numpy types to native Python types immediately
+            return convert_numpy_types(result)
+
+        except Exception as e:
+            raise ValueError(f"Code execution failed: {str(e)}\nCode: {code}")
+
+    async def compute_metric(
+            self,
+            metric: MetricRequirement,
+            df: pd.DataFrame
+    ) -> MetricComputationResult:
+        """Dynamically compute a single metric."""
+
+        available_fields = df.columns.tolist()
+
+        # Built-in templates for common metrics to improve the success rate
+        builtin_templates = {
+            "total_income": "df_filtered = df[df['txDirection']=='收入']; result = df_filtered['txAmount'].sum()",
+            "total_expense": "df_filtered = df[df['txDirection']=='支出']; result = abs(df_filtered['txAmount'].sum())",
+            "net_income": "income = df[df['txDirection']=='收入']['txAmount'].sum(); expense = df[df['txDirection']=='支出']['txAmount'].sum(); result = income - expense",
+            "net_profit": "income = df[df['txDirection']=='收入']['txAmount'].sum(); expense = df[df['txDirection']=='支出']['txAmount'].sum(); result = income - expense",
+            "balance_trend": "df['txDate'] = pd.to_datetime(df['txDate']); df_daily = df.groupby(df['txDate'].dt.date)['txBalance'].last().reset_index(); result = {'dates': df_daily['txDate'].astype(str).tolist(), 'balances': df_daily['txBalance'].tolist()}",
+            "top_income_sources": "df_income = df[df['txDirection']=='收入']; top_client = df_income.groupby('txCounterparty')['txAmount'].sum().idxmax(); result = {top_client: df_income.groupby('txCounterparty')['txAmount'].sum().max()}",
+            "top_expense_categories": "df_expense = df[df['txDirection']=='支出']; top_cat = df_expense.groupby('txSummary')['txAmount'].sum().idxmax(); result = {top_cat: df_expense.groupby('txSummary')['txAmount'].sum().max()}",
+            "income_sources_breakdown": "df_income = df[df['txDirection']=='收入']; result = df_income.groupby('txCounterparty')['txAmount'].sum().to_dict()",
+            "expense_categories_breakdown": "df_expense = df[df['txDirection']=='支出']; result = df_expense.groupby('txSummary')['txAmount'].sum().to_dict()",
+            "monthly_income_trend": "df['txDate'] = pd.to_datetime(df['txDate']); df_income = df[df['txDirection']=='收入']; df_income['month'] = df_income['txDate'].dt.to_period('M'); result = df_income.groupby('month')['txAmount'].sum().to_dict()",
+            "monthly_expense_trend": "df['txDate'] = pd.to_datetime(df['txDate']); df_expense = df[df['txDirection']=='支出']; df_expense['month'] = df_expense['txDate'].dt.to_period('M'); result = df_expense.groupby('month')['txAmount'].sum().to_dict()",
+        }
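+        # NOTE: these templates assume the transaction fields they reference
+        # (txDirection with '收入'/'支出' values, txAmount, txDate, txBalance,
+        # txCounterparty, txSummary); metrics without a template fall through
+        # to the LLM-generated code path below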
+
+        # If the metric ID has a built-in template, use it directly
+        if metric.metric_id in builtin_templates:
+            code = builtin_templates[metric.metric_id]
+            print(f"   📦 Using built-in template: {metric.metric_id}")
+        else:
+            # Otherwise ask the LLM to generate the code
+            prompt = f"""You are a data-analysis executor. Based on the calculation logic, generate the Pandas/Python code to run.
+
+Data information:
+- Fields: {available_fields}
+- Row count: {len(df)}
+- Metric name: {metric.metric_name}
+- Metric ID: {metric.metric_id}
+- Calculation logic: {metric.calculation_logic}
+
+Requirements:
+1. Generate executable Pandas/Python code (operate only on the df variable)
+2. The code must be safe; system calls are forbidden
+3. **The final result must be assigned to the variable `result`** (this is mandatory)
+4. `result` may be a number, list, dict, or DataFrame
+5. If the metric cannot be computed, set the error field to the specific reason
+
+**Key constraints**:
+- The last line of the code must be `result = ...`
+- Make sure `result` is defined after execution
+- Do not include any explanatory text; output JSON only
+- **Important**: avoid referencing outer variables inside list comprehensions; define all variables in the same scope
+
+Output format (return pure JSON, no Markdown code block):
+{{
+  "metric_id": "{metric.metric_id}",
+  "value": <computed result>,
+  "status": "success",
+  "computation_code": "df_filtered = df[df['txDirection']=='收入']; result = df_filtered['txAmount'].sum()"
+}}
+
+If the metric cannot be computed:
+{{
+  "metric_id": "{metric.metric_id}",
+  "value": null,
+  "status": "error",
+  "error": "the specific error reason",
+  "computation_code": ""
+}}
+
+**You must ensure that the `result` variable is defined after the code runs!**"""
+
+            try:
+                # Call the LLM
+                response = await self.llm.ainvoke(prompt)
+
+                print(f"\n   🤖 Raw LLM response: {response.content[:200]}...")
+
+                # Parse with the tolerant JSON extractor
+                response_dict = self._extract_json_from_response(response.content)
+
+                # Extract the generated code
+                code = response_dict.get("computation_code", "")
+                print(f"   💻 Generated code: {code}")
+
+                if not code:
+                    # The LLM produced no code; try a simplified template
+                    if "收入" in metric.metric_name and "分类" in metric.metric_name:
+                        code = "df_income = df[df['txDirection']=='收入']; result = df_income.groupby('txSummary')['txAmount'].sum().to_dict()"
+                    elif "支出" in metric.metric_name and "分类" in metric.metric_name:
+                        code = "df_expense = df[df['txDirection']=='支出']; result = df_expense.groupby('txSummary')['txAmount'].sum().to_dict()"
+                    elif "收入" in metric.metric_name and ("top" in metric.metric_name or "主要" in metric.metric_name):
+                        code = "df_income = df[df['txDirection']=='收入']; result = df_income.groupby('txCounterparty')['txAmount'].sum().to_dict()"
+                    elif "支出" in metric.metric_name and ("top" in metric.metric_name or "主要" in metric.metric_name):
+                        code = "df_expense = df[df['txDirection']=='支出']; result = df_expense.groupby('txSummary')['txAmount'].sum().to_dict()"
+                    else:
+                        raise ValueError("The LLM generated no computation code and no built-in template matches")
+
+            except Exception as e:
+                # If the LLM call fails, fall back to the simplest templates
+                print(f"   ⚠️ LLM call failed, using fallback template: {e}")
+                if "收入" in metric.metric_name:
+                    code = "df_income = df[df['txDirection']=='收入']; result = df_income['txAmount'].sum()"
+                elif "支出" in metric.metric_name:
+                    code = "df_expense = df[df['txDirection']=='支出']; result = abs(df_expense['txAmount'].sum())"
+                else:
+                    code = "result = None"
+
+        try:
+            # Run the code through the sandboxed executor
+            computed_value = self._execute_code_safely(code, df)
+            print(f"   ✅ Computed value: {computed_value}")
+
+            return MetricComputationResult(
+                metric_id=metric.metric_id,
+                value=computed_value,
+                status="success",
+                computation_code=code
+            )
+
+        except Exception as e:
+            # Catch all exceptions and record the details
+            error_msg = f"Metric {metric.metric_id} computation failed: {str(e)}"
+            print(f"   ❌ {error_msg}")
+
+            return MetricComputationResult(
+                metric_id=metric.metric_id,
+                value=None,
+                status="failed",
+                error=error_msg,
+                computation_code=code
+            )
+
+    async def compute_batch(
+            self,
+            metrics: List[MetricRequirement],
+            df: pd.DataFrame,
+            max_concurrent: int = 3
+    ) -> List[MetricComputationResult]:
+        """Compute metrics in batch with bounded concurrency."""
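+        # The semaphore bounds the number of concurrent LLM calls; each task runs
+        # the generated code against its own copy of df (see _execute_code_safely),
+        # so the shared DataFrame is not mutated across tasks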
+        semaphore = asyncio.Semaphore(max_concurrent)
+
+        async def compute_with_semaphore(metric):
+            async with semaphore:
+                return await self.compute_metric(metric, df)
+
+        tasks = [compute_with_semaphore(m) for m in metrics]
+        return await asyncio.gather(*tasks)
+
+
+async def metrics_node(state: AgentState) -> AgentState:
+    """Metrics computation node: extract the pending metrics and compute them."""
+
+    if not state["data_set"]:
+        new_state = state.copy()
+        new_state["messages"].append(("ai", "❌ Error: the data set is empty"))
+        # Convert numpy types before returning the state
+        return convert_numpy_types(new_state)
+
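+    # state["data_set"] is expected to be a list of row records (dicts) that can
+    # be loaded directly into a DataFrame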
+    df = pd.DataFrame(state["data_set"])
+
+    # Extract the metrics that still need to be computed
+    pending_ids = state.get("pending_metric_ids", [])
+    computed_ids = set(state["computed_metrics"].keys())
+    required_metrics = state["metrics_requirements"]
+
+    pending_metrics = [
+        m for m in required_metrics
+        if m.metric_id not in computed_ids
+    ]
+
+    if pending_ids:
+        pending_metrics = [
+            m for m in pending_metrics
+            if m.metric_id in pending_ids
+        ]
+
+    if not pending_metrics:
+        coverage = len(computed_ids) / len(required_metrics) if required_metrics else 1
+
+        if coverage >= 1.0:
+            new_state = state.copy()
+            new_state["messages"].append(("ai", "✅ All metrics have been computed"))
+            new_state["completeness_score"] = 1.0
+            new_state["is_complete"] = True
+            new_state["next_route"] = "report_compiler"
+            # Convert numpy types before returning the state
+            return convert_numpy_types(new_state)
+
+        # Recompute the pending list from scratch
+        pending_metrics = [
+            m for m in required_metrics
+            if m.metric_id not in computed_ids
+        ]
+
+        if not pending_metrics:
+            raise ValueError("Unable to extract pending metrics")
+
+    print(f"\n📊 Pending metrics: {[m.metric_id for m in pending_metrics]}")
+
+    # Limit how many metrics are computed in a single pass
+    batch_size = 5
+    metrics_to_compute = pending_metrics[:batch_size]
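+    # Metrics beyond the batch limit stay in pending_metric_ids (updated below),
+    # so a later pass through this node can pick them up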
+
+    print(f"🧮 Computing {len(metrics_to_compute)} metrics in this pass")
+
+    # Batch computation
+    llm = get_llm()
+    calculator = MetricsCalculator(llm)
+
+    results = await calculator.compute_batch(metrics_to_compute, df)
+
+    # Update the state
+    new_state = state.copy()
+    success_count = 0
+
+    for r in results:
+        if r.status == "success":
+            new_state["computed_metrics"][r.metric_id] = r.value
+            success_count += 1
+        else:
+            print(f"⚠️ Metric {r.metric_id} failed: {r.error[:100]}...")
+            new_state["messages"].append(
+                ("ai", f"⚠️ Metric computation failed for {r.metric_id}: {r.error[:100]}...")
+            )
+            # Track the number of failed attempts
+            if "failed_metric_attempts" not in new_state:
+                new_state["failed_metric_attempts"] = {}
+            new_state["failed_metric_attempts"][r.metric_id] = new_state.get("failed_metric_attempts", {}).get(
+                r.metric_id, 0) + 1
+
+    # Update the list of metrics still pending (keep anything not yet computed)
+    remaining_ids = [m.metric_id for m in pending_metrics
+                     if m.metric_id not in new_state["computed_metrics"]]
+    new_state["pending_metric_ids"] = remaining_ids
+
+    # Record failed metrics so they are not retried indefinitely
+    if success_count == 0 and len(metrics_to_compute) > 0:
+        # If every metric in this batch failed, mark them as skipped
+        new_state["messages"].append(
+            ("ai", f"⚠️ All {len(metrics_to_compute)} metrics in this batch failed to compute; they will be skipped")
+        )
+        # Remove the failed metrics from the pending list
+        failed_ids = [m.metric_id for m in metrics_to_compute]
+        new_state["pending_metric_ids"] = [mid for mid in remaining_ids if mid not in failed_ids]
+
+    print(f"✅ Succeeded {success_count}/{len(results)}, {len(new_state['pending_metric_ids'])} metrics remaining")
+
+    # Re-evaluate coverage
+    coverage = len(new_state["computed_metrics"]) / len(required_metrics) if required_metrics else 1.0
+    new_state["completeness_score"] = coverage
+
+    new_state["messages"].append(
+        ("ai", f"🧮 Computed {success_count}/{len(results)} metrics, coverage {coverage:.2%}")
+    )
+
+    # Convert numpy types before returning the state
+    return convert_numpy_types(new_state)