8 İşlemeler f5cd9b03e6 ... 740c3f723d

Yazar SHA1 Mesaj Tarih
  JiaQiang 740c3f723d Merge remote-tracking branch 'origin/master' 1 hafta önce
  JiaQiang 3974dfd443 配置类 1 hafta önce
  JiaQiang 3f83f56356 deepseek api_key环境设置 1 hafta önce
  JiaQiang b58d5ebd8f 图状态 1 hafta önce
  JiaQiang b3b426aa3e 指标计算智能体 1 hafta önce
  JiaQiang d4b9658338 指标计算智能体 1 hafta önce
  JiaQiang 996fbc71d6 大纲生成智能体 1 hafta önce
  JiaQiang bb2a88db45 规划智能体 1 hafta önce

+ 1 - 0
.env

@@ -0,0 +1 @@
+DEEPSEEK_API_KEY=sk-438668d443224063adbb1d295fe44a9f

+ 670 - 0
llmops/agents/metric_calculation_agent.py

@@ -0,0 +1,670 @@
+"""
+指标计算Agent (Metric Calculation Agent)
+====================================
+
+此Agent负责根据意图识别结果执行具体的指标计算任务。
+
+核心功能:
+1. 配置文件加载:读取和解析JSON格式的指标计算配置文件
+2. API调用管理:根据配置文件调用相应的计算API
+3. 结果处理:处理API返回的数据,提取关键指标
+4. 错误处理:处理API调用失败、网络异常等错误情况
+5. 结果验证:验证计算结果的合理性和完整性
+
+工作流程:
+1. 接收意图识别结果和用户参数
+2. 加载对应的指标计算配置文件
+3. 构造API请求参数
+4. 调用远程计算服务
+5. 解析和验证返回结果
+6. 返回结构化的计算结果
+
+技术实现:
+- 支持动态加载JSON配置文件
+- 使用requests库进行HTTP API调用
+- 集成LangChain用于复杂计算逻辑(可选)
+- 完善的错误处理和超时机制
+- 支持多种计算方法(标准、高级、自定义)
+
+配置文件结构:
+- api_config: API端点和认证信息
+- param_mapping: 参数映射规则
+- input_schema: 输入数据验证规则
+- output_schema: 输出数据结构定义
+- calculation_logic: 计算逻辑描述
+
+作者: Big Agent Team
+版本: 1.0.0
+创建时间: 2024-12-18
+"""
+
+import os
+import json
+import requests
+from datetime import datetime
+from typing import Dict, List, Any, Optional
+from langchain_openai import ChatOpenAI
+from langchain_core.prompts import ChatPromptTemplate
+import re
+
+
+class MetricCalculationAgent:
+    """远程指标计算Agent"""
+
+    def __init__(self, api_key: str, base_url: str = "https://api.deepseek.com"):
+        """
+        Initialise the metric-calculation agent.
+
+        Creates the DeepSeek chat client, loads the JSON configurations and
+        the data-file index from disk, and starts with an empty API call log.
+
+        Args:
+            api_key: DeepSeek API key.
+            base_url: Base URL of the DeepSeek API.
+        """
+        self.llm = ChatOpenAI(
+            model="deepseek-chat",
+            api_key=api_key,
+            base_url=base_url,
+            temperature=0.1
+        )
+
+        # Load every configuration file (see _load_configs)
+        self.configs = self._load_configs()
+
+        # Build the key -> path index of data files (see _load_data_files)
+        self.data_files = self._load_data_files()
+
+        # Log of API calls made by this agent; appended to in _call_metric_api
+        self.api_calls = []
+
+    def _load_data_files(self) -> Dict[str, str]:
+        """
+        Index the JSON files found in the ``data_files`` directory.
+
+        Returns:
+            Mapping from a short key (the file name with the text
+            ``原始数据-流水分析-`` and ``.json`` removed) to the file's path.
+            Empty when the directory does not exist.
+        """
+        data_files = {}
+        data_dir = "data_files"
+
+        if os.path.exists(data_dir):
+            for file in os.listdir(data_dir):
+                if file.endswith('.json'):
+                    try:
+                        # Reduce the file name to the keyword that is later matched against config names
+                        key = file.replace('原始数据-流水分析-', '').replace('.json', '')
+                        data_files[key] = os.path.join(data_dir, file)
+                    except Exception as e:
+                        print(f"处理数据文件 {file} 失败: {e}")
+
+        return data_files
+
+    def _select_data_file(self, config_name: str) -> Optional[str]:
+        """
+        Pick the data file that belongs to a configuration name.
+
+        Args:
+            config_name: Configuration file name.
+
+        Returns:
+            Path of the matching data file, or None when nothing matches and
+            there is no '农业' entry to fall back on.
+        """
+        # Config file name pattern: 指标计算-{category}-{metric}.json
+        # Data file name pattern:   原始数据-流水分析-{category}.json
+
+        # Extract the category from the config name
+        match = re.search(r'指标计算-(.+?)-', config_name)
+        if match:
+            category = match.group(1)
+
+            # Prefer raw-data files
+            # 1. An exact key match whose path contains '原始数据'
+            if category in self.data_files:
+                file_path = self.data_files[category]
+                if '原始数据' in file_path:
+                    return file_path
+
+            # 2. Otherwise the first key that contains the category and whose path contains '原始数据'
+            for key, file_path in self.data_files.items():
+                if category in key and '原始数据' in file_path:
+                    return file_path
+
+        # No match: fall back to the '农业' entry when it exists
+        if '农业' in self.data_files:
+            return self.data_files['农业']
+
+        return None
+
+    def _load_table_data(self, data_file_path: str) -> List[Dict]:
+        """
+        Load the table rows stored in a JSON data file.
+
+        Args:
+            data_file_path: Path of the JSON file to read (UTF-8).
+
+        Returns:
+            The parsed content when it is a list; an empty list when the
+            content is any other type or when reading/parsing fails.
+        """
+        try:
+            with open(data_file_path, 'r', encoding='utf-8') as f:
+                data = json.load(f)
+                # Only a top-level JSON array is accepted as table data
+                return data if isinstance(data, list) else []
+        except Exception as e:
+            # Best effort: report the failure and continue with no data
+            print(f"加载数据文件 {data_file_path} 失败: {e}")
+            return []
+
+    def _load_configs(self) -> Dict[str, Dict]:
+        """
+        Load every JSON configuration file from the ``json_files`` directory.
+
+        Returns:
+            Mapping from the file name with ``.json`` removed to the parsed
+            configuration. Files that fail to load are reported and skipped;
+            the mapping is empty when the directory does not exist.
+        """
+        configs = {}
+        json_dir = "json_files"
+
+        if os.path.exists(json_dir):
+            for file in os.listdir(json_dir):
+                if file.endswith('.json'):
+                    try:
+                        with open(os.path.join(json_dir, file), 'r', encoding='utf-8') as f:
+                            config = json.load(f)
+                            # The config is addressed by its file name without the extension
+                            key = file.replace('.json', '')
+                            configs[key] = config
+                    except Exception as e:
+                        print(f"加载配置文件 {file} 失败: {e}")
+
+        return configs
+
+    async def calculate_metrics(self, intent_result: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        Run the metric calculation for every config named in the intent result.
+
+        Args:
+            intent_result: Intent-recognition result; its ``target_configs``
+                list names the configurations to run.
+
+        Returns:
+            ``{"success": True, "results", "total_configs",
+            "successful_calculations"}`` when at least one config was
+            requested. Each entry of ``results`` holds ``config_name`` plus
+            either ``result`` (the outcome of the API call) or ``error``
+            (the config is unknown). When no config was requested, or an
+            exception occurs, ``{"success": False, "message", "results": []}``.
+        """
+        try:
+            results = []
+            target_configs = intent_result.get("target_configs", [])
+
+            if not target_configs:
+                return {
+                    "success": False,
+                    "message": "没有找到需要调用的配置文件",
+                    "results": []
+                }
+
+            for config_name in target_configs:
+                if config_name in self.configs:
+                    config = self.configs[config_name]
+                    result = await self._call_metric_api(config, intent_result, config_name)
+                    results.append({
+                        "config_name": config_name,
+                        "result": result
+                    })
+                else:
+                    results.append({
+                        "config_name": config_name,
+                        "error": f"配置文件 {config_name} 不存在"
+                    })
+
+            # Count only calls that actually succeeded: an entry carries a
+            # "result" key even when the API call itself reported a failure.
+            successful_calculations = sum(
+                1
+                for entry in results
+                if isinstance(entry.get("result"), dict) and entry["result"].get("success")
+            )
+
+            return {
+                "success": True,
+                "results": results,
+                "total_configs": len(target_configs),
+                "successful_calculations": successful_calculations
+            }
+
+        except Exception as e:
+            # Top-level boundary: report the failure instead of raising to the caller
+            print(f"指标计算失败: {e}")
+            return {
+                "success": False,
+                "message": f"指标计算过程中发生错误: {str(e)}",
+                "results": []
+            }
+
+    async def _call_metric_api(self, config: Dict[str, Any], intent_result: Dict[str, Any], config_name: str) -> Dict[str, Any]:
+        """
+        Call the remote metric-calculation API for one configuration.
+
+        The request body comes from ``_prepare_request_data`` and is POSTed
+        to a fixed endpoint. Every attempt that reaches the recording step
+        is appended to ``self.api_calls`` and written to
+        ``api_results/<call_id>.json``. Timeouts, request errors and any
+        other exception raised while handling an attempt are retried (three
+        attempts in total, five seconds apart). A non-200 HTTP status is
+        returned immediately without a retry.
+
+        Args:
+            config: Parsed configuration of the metric.
+            intent_result: Intent-recognition result, forwarded to
+                ``_prepare_request_data``.
+            config_name: Configuration name; selects the data file and
+                labels the recorded call.
+
+        Returns:
+            On HTTP 200: ``{"success": True, "data", "extracted_result",
+            "status_code"}``. Otherwise ``{"success": False, "message"}``
+            (plus ``"response"`` for a non-200 status).
+        """
+        # Function-scope import: keeps the block self-contained
+        import asyncio
+
+        try:
+            # Start of the whole call; durations of all attempts are measured from here
+            start_time = datetime.now()
+
+            # Settings of the live API service
+            method = "POST"
+            url = "http://10.192.72.11:6300/api/data_analyst/full"  # live API endpoint
+            headers = {
+                "Accept": "*/*",
+                "Accept-Encoding": "gzip, deflate, br",
+                "Connection": "keep-alive",
+                "Content-Type": "application/json",
+                "User-Agent": "PostmanRuntime-ApipostRuntime/1.1.0"
+            }
+            timeout = 600  # 10 minutes: the upstream service is slow
+
+            max_retries = 3
+            retry_delay = 5  # seconds between attempts
+
+            for attempt in range(max_retries):
+                # Bound before anything can raise, so the failure path below
+                # can always build its record (previously an early failure
+                # left json_data unbound and broke the retry logic).
+                json_data = None
+                params = None
+
+                try:
+                    print(f"🔄 API调用尝试 {attempt + 1}/{max_retries} (配置: {config_name})")
+
+                    request_data = self._prepare_request_data(config, intent_result, config_name)
+
+                    if method.upper() == "GET":
+                        params = request_data.get("params", {})
+                        response = requests.get(url, headers=headers, params=params, timeout=timeout)
+                    elif method.upper() == "POST":
+                        json_data = request_data.get("json", {})
+                        response = requests.post(url, headers=headers, json=json_data, timeout=timeout)
+                    else:
+                        return {
+                            "success": False,
+                            "message": f"不支持的HTTP方法: {method}"
+                        }
+
+                    request_info = {
+                        "method": method,
+                        "url": url,
+                        "headers": headers,
+                        "json_data": json_data,
+                        "params": params
+                    }
+
+                    if response.status_code != 200:
+                        # HTTP error: recorded and returned, never retried
+                        self._record_api_call(
+                            config_name, request_info, start_time,
+                            {"status_code": response.status_code, "error": response.text},
+                            False
+                        )
+                        return {
+                            "success": False,
+                            "message": f"API调用失败,状态码: {response.status_code}",
+                            "response": response.text
+                        }
+
+                    try:
+                        response_data = response.json()
+                    except ValueError:
+                        # ValueError covers json.JSONDecodeError as well as the
+                        # decode errors raised by requests/simplejson.
+                        self._record_api_call(
+                            config_name, request_info, start_time,
+                            {
+                                "status_code": response.status_code,
+                                "data": response.text,
+                                "error": "JSON解析失败"
+                            },
+                            False
+                        )
+                        # NOTE(review): the record above says success=False but
+                        # the caller is told success=True; kept as-is because
+                        # callers may rely on it - confirm the intended contract.
+                        return {
+                            "success": True,
+                            "data": response.text,
+                            "extracted_result": None,
+                            "status_code": response.status_code
+                        }
+
+                    # Expected shape: {"code": 0, "data": {"result": "<text containing JSON>"}}
+                    extracted_result = None
+                    if isinstance(response_data, dict):
+                        if response_data.get("code") == 0 and "data" in response_data:
+                            data = response_data["data"]
+                            # Guard against a non-dict payload, which used to raise
+                            # and trigger pointless retries of a successful call.
+                            if isinstance(data, dict) and "result" in data:
+                                extracted_result = self._extract_json_from_result(data["result"])
+
+                    self._record_api_call(
+                        config_name, request_info, start_time,
+                        {
+                            "status_code": response.status_code,
+                            "data": response_data,
+                            "extracted_result": extracted_result
+                        },
+                        True
+                    )
+                    return {
+                        "success": True,
+                        "data": response_data,
+                        "extracted_result": extracted_result,
+                        "status_code": response.status_code
+                    }
+
+                except requests.exceptions.Timeout:
+                    error_text = "API调用超时"
+                    retry_notice = f"⏳ API调用超时,{retry_delay}秒后重试..."
+                    failure_message = "API调用超时"
+                except requests.exceptions.RequestException as e:
+                    error_text = str(e)
+                    retry_notice = f"❌ API调用异常: {str(e)},{retry_delay}秒后重试..."
+                    failure_message = f"API调用异常: {str(e)}"
+                except Exception as e:
+                    error_text = str(e)
+                    retry_notice = f"❌ 其他异常: {str(e)},{retry_delay}秒后重试..."
+                    failure_message = f"API调用异常: {str(e)}"
+
+                # Only reached after an exception: every non-failing path returned above.
+                self._record_api_call(
+                    config_name,
+                    {
+                        "method": method,
+                        "url": url,
+                        "headers": headers,
+                        "json_data": json_data,
+                        "params": params
+                    },
+                    start_time,
+                    {"error": error_text},
+                    False
+                )
+
+                if attempt < max_retries - 1:
+                    print(retry_notice)
+                    # Yield to the event loop instead of blocking it with time.sleep
+                    await asyncio.sleep(retry_delay)
+                    continue
+
+                return {
+                    "success": False,
+                    "message": failure_message
+                }
+        except Exception as e:
+            # Anything not handled above (e.g. a failure while recording)
+            print(f"❌ API调用过程中发生未预期的错误: {str(e)}")
+            return {
+                "success": False,
+                "message": f"API调用过程中发生未预期的错误: {str(e)}"
+            }
+
+    def _record_api_call(self, config_name: str, request_info: Dict[str, Any], start_time: datetime,
+                         response_fields: Dict[str, Any], success: bool) -> Dict[str, Any]:
+        """
+        Append one call record to ``self.api_calls`` and save it as JSON.
+
+        Args:
+            config_name: Configuration the call belongs to.
+            request_info: ``method``, ``url``, ``headers``, ``json_data`` and
+                ``params`` of the request.
+            start_time: When the overall call started.
+            response_fields: Leading fields of the ``response`` section;
+                ``end_time`` and ``duration`` are added here.
+            success: Whether the attempt is recorded as successful.
+
+        Returns:
+            The record that was stored.
+        """
+        end_time = datetime.now()
+        duration = (end_time - start_time).total_seconds()
+        call_id = f"api_{config_name}_{duration:.2f}"
+
+        api_call_info = {
+            "call_id": call_id,
+            "timestamp": end_time.isoformat(),
+            "agent": "MetricCalculationAgent",
+            "api_endpoint": request_info["url"],
+            "config_name": config_name,
+            "request": {**request_info, "start_time": start_time.isoformat()},
+            "response": {
+                **response_fields,
+                "end_time": end_time.isoformat(),
+                "duration": duration
+            },
+            "success": success
+        }
+        self.api_calls.append(api_call_info)
+
+        # Persist the record next to the others; a write failure is reported, not raised
+        api_results_dir = "api_results"
+        os.makedirs(api_results_dir, exist_ok=True)
+        filepath = os.path.join(api_results_dir, f"{call_id}.json")
+
+        try:
+            with open(filepath, 'w', encoding='utf-8') as f:
+                json.dump(api_call_info, f, ensure_ascii=False, indent=2)
+            print(f"[API_RESULT] 保存API结果文件: {filepath}")
+        except Exception as e:
+            print(f"[ERROR] 保存API结果文件失败: {filepath}, 错误: {str(e)}")
+
+        return api_call_info
+
+    def _prepare_request_data(self, config: Dict[str, Any], intent_result: Dict[str, Any], config_name: str) -> Dict[str, Any]:
+        """
+        Build the request payload for the metric API.
+
+        Args:
+            config: Configuration; its ``question`` and ``prompt`` entries
+                are copied into the request.
+            intent_result: Intent-recognition result (not read by this method).
+            config_name: Configuration name, used to select the data file.
+
+        Returns:
+            ``{"json": request_body}`` where the body holds
+            ``disable_planning``, ``question``, ``prompt`` and ``documents``.
+        """
+        # Take question and prompt from the configuration
+        question = config.get("question", "")
+        prompt = config.get("prompt", "")
+
+        # Pick the data file that belongs to this configuration
+        data_file_path = self._select_data_file(config_name)
+        table_data = []
+
+        if data_file_path:
+            table_data = self._load_table_data(data_file_path)
+        else:
+            print(f"警告:找不到配置文件 {config_name} 对应的数据文件")
+
+        # Build the documents array; it stays empty when there is no table data
+        documents = []
+        if table_data:
+            # Title defaults to the config name but the data file name wins when known
+            title = f"数据表-{config_name}"
+            if data_file_path:
+                title = os.path.basename(data_file_path).replace('.json', '')
+
+            documents.append({
+                "id": 1,
+                "title": title,
+                "text": "",
+                "table": table_data
+            })
+
+        # Assemble the API request body
+        request_data = {
+            "disable_planning": False,
+            "question": question,
+            "prompt": prompt,
+            "documents": documents
+        }
+
+        return {"json": request_data}
+
+    def _extract_json_from_result(self, result_text: str) -> Dict[str, Any]:
+        """
+        Extract the JSON payload embedded in the API's ``result`` text.
+
+        Three candidates are tried in order and the first one that parses
+        wins: the content of a ```json fenced block, the span from the first
+        ``{`` to the last ``}``, and finally the whole text.
+
+        Args:
+            result_text: Content of the ``result`` field. A value that is
+                already a dict or list is returned unchanged.
+
+        Returns:
+            The parsed JSON value, or ``{"error", "raw_result"}`` when no
+            candidate parses or the input is neither text nor parsed JSON.
+        """
+        # Already-parsed payloads need no extraction
+        if isinstance(result_text, (dict, list)):
+            return result_text
+        if not isinstance(result_text, str):
+            message = f"期望字符串,实际为 {type(result_text).__name__}"
+            print(f"JSON解析失败: {message}")
+            return {"error": f"无法解析JSON结果: {message}", "raw_result": result_text}
+
+        candidates = []
+
+        # 1. Content between ```json and ```
+        json_match = re.search(r'```json\s*(.*?)\s*```', result_text, re.DOTALL)
+        if json_match:
+            candidates.append(json_match.group(1).strip())
+
+        # 2. Outermost brace-delimited span
+        brace_match = re.search(r'\{.*\}', result_text, re.DOTALL)
+        if brace_match:
+            candidates.append(brace_match.group(0).strip())
+
+        # 3. The whole text
+        candidates.append(result_text.strip())
+
+        # Keep the first error so the reported message matches the primary strategy
+        first_error = None
+        for candidate in candidates:
+            try:
+                return json.loads(candidate)
+            except json.JSONDecodeError as e:
+                if first_error is None:
+                    first_error = e
+
+        print(f"JSON解析失败: {first_error}")
+        return {"error": f"无法解析JSON结果: {str(first_error)}", "raw_result": result_text}
+
+    def get_available_configs(self) -> List[str]:
+        """Return the names of all loaded configurations."""
+        return list(self.configs.keys())
+
+    def get_config_details(self, config_name: str) -> Optional[Dict]:
+        """Return the loaded configuration named ``config_name``, or None when it is unknown."""
+        return self.configs.get(config_name)

+ 644 - 151
llmops/agents/outline_agent.py

@@ -1,94 +1,375 @@
+"""
+报告大纲生成Agent (Report Outline Generation Agent)
+===============================================
+
+此Agent负责根据用户需求和数据样本,生成专业的报告大纲结构。
+
+核心功能:
+1. 分析用户需求:理解报告目标和关键指标
+2. 数据结构分析:识别可用字段和数据特征
+3. 大纲生成:创建结构化的报告章节和指标需求
+4. 智能推断:自动推断所需字段和计算逻辑
+
+工作流程:
+1. 接收用户查询和数据样本
+2. 分析数据结构和可用字段
+3. 生成报告标题和章节结构
+4. 定义全局指标需求
+5. 返回结构化的大纲对象
+
+技术实现:
+- 使用LangChain和结构化输出
+- 支持异步处理
+- 自动字段推断和补全
+- 错误处理和默认值提供
+
+作者: Big Agent Team
+版本: 1.0.0
+创建时间: 2024-12-20
+"""
+
 from typing import List, Dict, Any
 from langchain_openai import ChatOpenAI
 from langchain_core.prompts import ChatPromptTemplate
-import json  # 确保导入json
+import json
+import os
 import uuid
+import requests
+from datetime import datetime
+
+from pydantic import BaseModel, Field
+
+
+# 数据模型定义(与现有项目兼容)
+class MetricRequirement(BaseModel):
+    """指标需求定义"""
+    # One metric the report depends on: its id, display name, a description of how
+    # it is calculated, the input fields it needs and the ids of metrics it depends on.
+    # NOTE(review): the class docstring is left untranslated on purpose; pydantic may
+    # surface it as the schema description, so confirm before editing it.
+    metric_id: str = Field(description="指标唯一标识,如 'total_income_jan'")
+    metric_name: str = Field(description="指标中文名称")
+    calculation_logic: str = Field(description="计算逻辑描述")
+    required_fields: List[str] = Field(description="所需字段")
+    dependencies: List[str] = Field(default_factory=list, description="依赖的其他指标ID")
+
+
+class ReportSection(BaseModel):
+    """报告大纲章节"""
+    # One section of the report outline: id, title, a description of the required
+    # content and the list of metric ids the section needs.
+    # NOTE(review): the class docstring is left untranslated on purpose; pydantic may
+    # surface it as the schema description, so confirm before editing it.
+    section_id: str = Field(description="章节ID")
+    title: str = Field(description="章节标题")
+    description: str = Field(description="章节内容要求")
+    metrics_needed: List[str] = Field(description="所需指标ID列表")
 
-from llmops.agents.state import AgentState, ReportOutline, ReportSection, MetricRequirement, convert_numpy_types
-from llmops.agents.datadev.llm import get_llm
 
+class ReportOutline(BaseModel):
+    """完整报告大纲"""
+    # The complete outline: report title, ordered sections and the global list of
+    # metric definitions.
+    # NOTE(review): the class docstring is left untranslated on purpose; pydantic may
+    # surface it as the schema description, so confirm before editing it.
+    report_title: str = Field(description="报告标题")
+    sections: List[ReportSection] = Field(description="章节列表")
+    global_metrics: List[MetricRequirement] = Field(description="全局指标列表")
 
-class OutlineGenerator:
+
+class OutlineGeneratorAgent:
     """大纲生成智能体:将报告需求转化为结构化大纲"""
 
-    def __init__(self, llm):
-        self.llm = llm.with_structured_output(ReportOutline)
+    def __init__(self, api_key: str, base_url: str = "https://api.deepseek.com"):
+        """
+        Initialise the outline generation agent.
+
+        Besides building the chat model client, construction calls
+        ``_load_available_knowledge()``, which issues an HTTP request to the
+        rule engine, so creating an instance performs network I/O.
+
+        Args:
+            api_key: DeepSeek API key.
+            base_url: Base URL of the DeepSeek API.
+        """
+        self.llm = ChatOpenAI(
+            model="deepseek-chat",
+            api_key=api_key,
+            base_url=base_url,
+            temperature=0.1
+        )
 
-    def create_prompt(self, question: str, sample_data: List[Dict]) -> str:
+        # Record of every model call made by this agent (appended to in generate_outline).
+        self.api_calls = []
+
+        # Knowledge metadata fetched once at construction; an empty list when the fetch fails.
+        self.available_knowledge = self._load_available_knowledge()
+
+
+
+    def _convert_new_format_to_outline(self, new_format_data: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        Convert the chapter-style JSON produced by the LLM into ReportOutline data.
+
+        Metric names are read per section from the flat ``metrics_needed`` list (the
+        shape requested by the prompt built in ``create_prompt``) and from the nested
+        ``metrics`` mapping keyed by ``calculation_metrics`` / ``statistical_metrics`` /
+        ``analysis_metrics``. Each distinct name is resolved once through
+        ``_match_metric_to_knowledge``. When no metric is found at all, up to five
+        metrics classified as 基础统计指标 are taken from ``_load_available_metrics``.
+
+        Args:
+            new_format_data: Parsed JSON object; ``chapter_title`` and ``sections`` are read.
+
+        Returns:
+            Dict with ``report_title``, ``sections`` and ``global_metrics`` keys, suitable
+            for ``ReportOutline(**result)``.
+        """
+        metric_types = ("calculation_metrics", "statistical_metrics", "analysis_metrics")
+
+        sections = []
+        descriptions = {}  # metric name -> first non-empty description seen
+        for section_data in new_format_data.get("sections", []):
+            metrics_needed = []
+
+            # Flat list of names, as the prompt's JSON schema specifies. It used to be
+            # ignored, so answers that followed the prompt lost all their metrics.
+            flat_names = section_data.get("metrics_needed")
+            if isinstance(flat_names, list):
+                for name in flat_names:
+                    if isinstance(name, str) and name and name not in metrics_needed:
+                        metrics_needed.append(name)
+
+            # Nested per-type metric objects; metric_name doubles as the provisional id.
+            nested = section_data.get("metrics")
+            if isinstance(nested, dict):
+                for metric_type in metric_types:
+                    for metric in nested.get(metric_type) or []:
+                        name = metric.get("metric_name", "")
+                        if not name:
+                            continue  # an empty name would yield a metric with an empty id
+                        if name not in metrics_needed:
+                            metrics_needed.append(name)
+                        description = metric.get("metric_description", "")
+                        if description and name not in descriptions:
+                            descriptions[name] = description
+
+            sections.append({
+                "section_id": section_data.get("section_id", ""),
+                "title": section_data.get("section_title", ""),
+                "description": section_data.get("section_description", ""),
+                "metrics_needed": metrics_needed
+            })
+
+        # Build global_metrics, preferring a matched knowledge id as the metric id.
+        global_metrics = []
+        used_knowledge_ids = set()
+        handled_names = set()
+        for section in sections:
+            for metric_name in section["metrics_needed"]:
+                if metric_name in handled_names:
+                    # The same metric referenced by several sections needs one definition only.
+                    continue
+                handled_names.add(metric_name)
+
+                metric_description = descriptions.get(metric_name, "")
+                knowledge_id = self._match_metric_to_knowledge(metric_name, metric_description)
+
+                if knowledge_id and knowledge_id not in used_knowledge_ids:
+                    global_metrics.append({
+                        "metric_id": knowledge_id,
+                        "metric_name": metric_name,
+                        "calculation_logic": f"使用规则引擎计算{metric_name}: {metric_description}",
+                        "required_fields": ["transactions"],  # the rule engine consumes transactions data
+                        "dependencies": []
+                    })
+                    used_knowledge_ids.add(knowledge_id)
+                elif not any(m.get("metric_id") == metric_name for m in global_metrics):
+                    # No usable knowledge id: fall back to a basic definition keyed by name.
+                    print(f"⚠️ 指标 '{metric_name}' 未找到匹配的知识ID,使用默认配置")
+                    global_metrics.append({
+                        "metric_id": metric_name,
+                        "metric_name": metric_name,
+                        "calculation_logic": f"计算{metric_name}: {metric_description}",
+                        "required_fields": ["txAmount", "txDirection"],
+                        "dependencies": []
+                    })
+
+        # The LLM supplied no metrics at all: fill in basic ones from the knowledge base.
+        if not global_metrics:
+            print("⚠️ LLM未提供指标,使用默认基础指标")
+            available_metrics = self._load_available_metrics()
+
+            # Take the first five metrics classified as basic statistics.
+            base_metrics = [m for m in available_metrics if m.get('type') == '基础统计指标'][:5]
+
+            for metric in base_metrics:
+                metric_name = metric['name']
+                knowledge_id = f"metric-{metric_name}"
+                if sections:
+                    # Attach the default metrics to the first section.
+                    sections[0]["metrics_needed"].append(knowledge_id)
+                global_metrics.append({
+                    "metric_id": knowledge_id,
+                    "metric_name": metric_name,
+                    "calculation_logic": f"使用规则引擎计算{metric_name}: {metric.get('description', '')}",
+                    "required_fields": ["transactions"],
+                    "dependencies": []
+                })
+
+        print(f"📊 最终生成 {len(global_metrics)} 个指标")
+
+        return {
+            "report_title": new_format_data.get("chapter_title", "流水分析报告"),
+            "sections": sections,
+            "global_metrics": global_metrics
+        }
+
+    def create_prompt(self) -> str:
-        """创建大纲生成提示"""
+        """
+        Build the outline-generation prompt.
+
+        The available metrics are loaded through ``_load_available_metrics`` and
+        rendered as a tab-separated table that is embedded in the prompt text.
+
+        Returns:
+            The prompt text; business background is appended by the caller.
+        """
 
-        available_fields = list(sample_data[0].keys()) if sample_data else []
-        sample_str = json.dumps(sample_data[:2], ensure_ascii=False, indent=2)
-
-        # 关键修复:提供详细的字段说明和示例
-        return f"""你是银行流水报告大纲专家。根据用户需求和样本数据,生成专业、可执行的报告大纲。
-
-需求分析:
-{question}
-
-可用字段:
-{', '.join(available_fields)}
-
-样本数据:
-{sample_str}
-
-输出要求(必须生成有效的JSON):
-1. report_title: 报告标题(字符串)
-2. sections: 章节列表,每个章节必须包含:
-   - section_id: 章节唯一ID(如"sec_1", "sec_2")
-   - title: 章节标题
-   - description: 章节描述
-   - metrics_needed: 所需指标ID列表(字符串数组,可为空)
-3. global_metrics: 全局指标列表,每个指标必须包含:
-   - metric_id: 指标唯一ID(如"total_income", "avg_balance")
-   - metric_name: 指标名称
-   - calculation_logic: 计算逻辑描述
-   - required_fields: 所需字段列表
-   - dependencies: 依赖的其他指标ID(可为空)
-
-重要提示:
-- 必须生成section_id,格式为"sec_1", "sec_2"等
-- 必须生成metric_id,格式为字母+下划线+描述
-- metrics_needed必须是字符串数组
-- 确保所有字段都存在,不能缺失
-
-输出示例:
+        # Fetch the currently available metrics.
+        available_metrics = self._load_available_metrics()
+
+        # Render the metrics as a tab-separated table, header row first.
+        metric_rows = [
+            f"{metric['name']}\t{metric.get('type', '计算型指标')}\t{metric.get('description', '')}\n"
+            for metric in available_metrics
+        ]
+        metrics_list_text = "指标名称\t指标类型\t指标描述\n" + "".join(metric_rows)
+
+        # Assemble the base prompt.
+        base_prompt = f"""[角色定义]
+你的角色是: 流水分析报告的大纲生成模块。
+你的目标是:
+基于输入的流水分析业务背景信息,
+生成一份可交付、结构清晰、可被程序解析的流水分析报告大纲,
+并以结构化 JSON 的形式,明确每个章节及其下属分析主题所需的分析指标与分析项要求,
+以指导后续分析能力的调用。
+
+[职责边界]
+你只能完成以下事项:
+1.确定流水分析报告应包含的章节结构
+2.明确每个章节下需要覆盖的分析主题
+3.为每个分析主题列出所需的计算指标、统计指标或分析指标
+
+你不得做以下任何事情:
+1.不得计算任何指标
+2.不得对流水数据进行分析
+3.不得判断交易是否异常或存在风险
+4.不得生成任何分析结论、判断性描述或报告正文
+5.不得决定分析执行顺序或分析方法
+
+你输出的内容仅是"分析需求清单",而不是"分析结果"。
+
+[可用指标总览]
+系统当前支持 {len(available_metrics)} 个指标。
+
+[重要要求]
+请根据用户需求和可用指标列表,从上述指标中选择最相关的指标。优先选择基础统计指标和时间分析指标,确保报告的完整性和实用性。
+
+[强制要求]
+生成大纲时,请:
+1. 从可用指标中选择合适的指标组合
+2. 确保选择的指标能够满足用户分析需求
+3. 在metrics_needed数组中列出选定的指标名称
+4. 在global_metrics数组中包含对应指标的详细定义
+
+[可选择的指标列表]
+{metrics_list_text}
+
+[重要兼容性要求]
+虽然你必须使用上述JSON结构输出,但为了确保与现有系统的兼容性,请在输出中额外包含以下字段:
+- 在根级别添加 "report_title": "流水分析报告"
+- 在根级别添加 "global_metrics": [] (空数组或根据实际需求填充指标定义)
+- 确保输出能被现有系统正确解析和使用
+
+[输出格式要求]
+你必须且只能以 JSON 字符串 形式输出分析大纲,不得输出任何解释性自然语言。
+JSON 必须严格遵循以下结构约定:
 {{
-  "report_title": "2024年第三季度分析报告",
+  "chapter_id": "string",
+  "chapter_title": "string",
+  "chapter_type": "string",
   "sections": [
     {{
-      "section_id": "sec_1",
-      "title": "收入概览",
-      "description": "分析收入总额",
-      "metrics_needed": ["total_income", "avg_income"]
+      "section_id": "string",
+      "section_title": "string",
+      "section_description": "string",
+      "metrics_needed": ["string"]
     }}
   ],
-  "global_metrics": [
-    {{
-      "metric_id": "total_income",
-      "metric_name": "总收入",
-      "calculation_logic": "sum of all income transactions",
-      "required_fields": ["txAmount", "txDirection"],
-      "dependencies": []
-    }}
-  ]
+  "global_metrics": []
 }}"""
 
-    async def generate(self, state: AgentState) -> ReportOutline:
+        return base_prompt
+
+
+    async def generate_outline(self, question: str, industry: str, sample_data: List[Dict[str, Any]]) -> ReportOutline:
-        """异步生成大纲(修复版:自动补全缺失字段)"""
+        """
+        Ask the LLM for a report outline and return it as a ReportOutline.
+
+        The call is timed, recorded in ``self.api_calls`` and written to a JSON
+        file under ``api_results``. The response text is parsed from its first
+        ``{`` to its last ``}``, converted with ``_convert_new_format_to_outline``
+        and passed through ``_post_process_outline``.
+
+        NOTE(review): ``question`` and ``sample_data`` are only logged and stored
+        in the call record; the text sent to the model contains the base prompt
+        plus ``industry`` — confirm that this is intended.
+
+        Args:
+            question: User query (logged and recorded).
+            industry: Industry name inserted into the business background.
+            sample_data: Data sample; only its length is used.
+
+        Returns:
+            The post-processed report outline.
+
+        Raises:
+            ValueError: If the model response cannot be parsed into an outline.
+        """
-        prompt = self.create_prompt(
-            question=state["question"],
-            sample_data=state["data_set"][:2]
-        )
+        prompt = self.create_prompt()
+
+        # Append the business background to the end of the prompt.
+        full_prompt = f"""{prompt}
+
+【业务背景信息】
+行业:{industry}
+产品类型:经营贷
+客群类型:小微企业"""
 
         messages = [
             ("system", "你是一名专业的报告大纲生成专家,必须输出完整、有效的JSON格式,包含所有必需字段。"),
-            ("user", prompt)
+            ("user", full_prompt)
         ]
 
-        outline = await self.llm.ainvoke(messages)
-
-        # 关键修复:后处理,补全缺失的section_id和metric_id
+        # Log the model input.
+        print("========================================")
+        print("[AGENT] OutlineGeneratorAgent (大纲生成Agent)")
+        print(f"[KNOWLEDGE_BASE] 已加载 {len(self.available_knowledge)} 个知识元数据")
+        if self.available_knowledge:
+            sample_knowledge = self.available_knowledge[:3]  # show the first three as a sample
+            print(f"[KNOWLEDGE_SAMPLE] 示例知识: {[k.get('id', '') for k in sample_knowledge]}")
+        print("[MODEL_INPUT] OutlineGeneratorAgent:")
+        print(f"[CONTEXT] 基于用户需求和数据样本生成报告大纲")
+        print(f"Question: {question}")
+        print(f"Sample data count: {len(sample_data)}")
+        print("========================================")
+
+        # Run the model call and time it.
+        start_time = datetime.now()
+        response = await self.llm.ainvoke(messages)
+        end_time = datetime.now()
+
+        # Extract the text before parsing so the call record can always include it.
+        content = response.content if hasattr(response, 'content') else str(response)
+
+        # Parse the JSON response.
+        outline = None
+        parse_error = None
+        try:
+            json_start = content.find('{')
+            json_end = content.rfind('}') + 1
+            if json_start >= 0 and json_end > json_start:
+                json_str = content[json_start:json_end]
+                outline_data = json.loads(json_str)
+
+                # Convert the chapter-style JSON into ReportOutline data.
+                converted_data = self._convert_new_format_to_outline(outline_data)
+                outline = ReportOutline(**converted_data)
+            else:
+                raise ValueError("No JSON found in response")
+        except Exception as e:
+            # Remember the failure; it is raised once the call has been recorded.
+            parse_error = e
+            print(f"解析大纲响应失败: {e}")
+
+        # Record the API call.
+        call_id = f"api_mll_大纲生成_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
+        api_call_info = {
+            "call_id": call_id,
+            "timestamp": end_time.isoformat(),
+            "agent": "OutlineGeneratorAgent",
+            "model": "deepseek-chat",
+            "request": {
+                "question": question,
+                "sample_data_count": len(sample_data),
+                "prompt": prompt,
+                "start_time": start_time.isoformat()
+            },
+            "response": {
+                "content": content,
+                "end_time": end_time.isoformat(),
+                "duration": (end_time - start_time).total_seconds()
+            },
+            # Was hard-coded to True even when the response could not be parsed.
+            "success": parse_error is None
+        }
+        self.api_calls.append(api_call_info)
+
+        # Save the call record to a file.
+        api_results_dir = "api_results"
+        os.makedirs(api_results_dir, exist_ok=True)
+        filename = f"{call_id}.json"
+        filepath = os.path.join(api_results_dir, filename)
+
+        try:
+            with open(filepath, 'w', encoding='utf-8') as f:
+                json.dump(api_call_info, f, ensure_ascii=False, indent=2)
+            print(f"[API_RESULT] 保存API结果文件: {filepath}")
+        except Exception as e:
+            print(f"[ERROR] 保存API结果文件失败: {filepath}, 错误: {str(e)}")
+
+        if outline is None:
+            # Execution used to continue with `outline` unbound and fail with
+            # UnboundLocalError below; fail explicitly so the caller can retry.
+            raise ValueError(f"大纲响应解析失败: {parse_error}") from parse_error
+
+        # Log the model output.
+        print(f"[MODEL_OUTPUT] OutlineGeneratorAgent: {json.dumps(outline.dict() if hasattr(outline, 'dict') else outline, ensure_ascii=False)}")
+        print("========================================")
+
+        # Post-process: fill in missing section_id and metric_id values.
         outline = self._post_process_outline(outline)
 
         return outline
@@ -143,92 +424,304 @@ class OutlineGenerator:
 
         return list(set(fields))
 
+    def _load_available_knowledge(self) -> List[Dict[str, Any]]:
+        """
+        从规则引擎获取可用的知识元数据
 
-async def outline_node(state: AgentState) -> AgentState:
-    """大纲生成节点:设置成功标志,防止重复生成"""
-
-    llm = get_llm()
-    generator = OutlineGenerator(llm)
-
-    try:
-        # 异步生成大纲
-        outline = await generator.generate(state)
-
-        # 更新状态
-        new_state = state.copy()
-        new_state["outline_draft"] = outline
-        new_state["outline_version"] += 1
-
-        # 防护:设置成功标志
-        new_state["outline_ready"] = True  # 明确标志:大纲已就绪
-
-        new_state["metrics_requirements"] = outline.global_metrics
-        new_state["metrics_pending"] = outline.global_metrics.copy()  # 待计算指标
-        new_state["messages"].append(
-            ("ai", f"✅ 大纲生成完成 v{new_state['outline_version']}:{outline.report_title}")
-        )
-
-        print(f"\n📝 大纲已生成:{outline.report_title}")
-        print(f"   章节数:{len(outline.sections)}")
-        print(f"   指标数:{len(outline.global_metrics)}")
-
-        # 新增:详细打印大纲内容
-        print("\n" + "=" * 70)
-        print("📋 详细大纲内容")
-        print("=" * 70)
-        print(json.dumps(outline.dict(), ensure_ascii=False, indent=2))
-        print("=" * 70)
-
-        # 关键修复:返回前清理状态
-        return convert_numpy_types(new_state)
-
-    except Exception as e:
-        print(f"⚠️ 大纲生成出错: {e},使用默认结构")
-
-        # 创建默认大纲
-        default_outline = ReportOutline(
-            report_title="默认交易分析报告",
-            sections=[
-                ReportSection(
-                    section_id="sec_1",
-                    title="交易概览",
-                    description="基础交易情况分析",
-                    metrics_needed=["total_transactions", "total_income", "total_expense"]
-                )
-            ],
-            global_metrics=[
-                MetricRequirement(
-                    metric_id="total_transactions",
-                    metric_name="总交易笔数",
-                    calculation_logic="count all transactions",
-                    required_fields=["txId"],
-                    dependencies=[]
-                ),
-                MetricRequirement(
-                    metric_id="total_income",
-                    metric_name="总收入",
-                    calculation_logic="sum of income transactions",
-                    required_fields=["txAmount", "txDirection"],
-                    dependencies=[]
-                )
-            ]
-        )
+        Returns:
+            知识元数据列表,包含id和description
+        """
+        try:
+            url = "http://localhost:8081/api/rules/getKnowledgeMeta"
+            headers = {
+                "Accept": "*/*",
+                "Accept-Encoding": "gzip, deflate, br",
+                "Connection": "keep-alive",
+                "Content-Type": "application/json",
+                "User-Agent": "PostmanRuntime-ApipostRuntime/1.1.0"
+            }
+
+            response = requests.post(url, headers=headers, json={}, timeout=30)
+
+            if response.status_code == 200:
+                knowledge_meta = response.json()
+                if isinstance(knowledge_meta, list):
+                    print(f"✅ 成功获取 {len(knowledge_meta)} 个知识元数据")
+                    return knowledge_meta
+                else:
+                    print(f"⚠️ 知识元数据格式异常: {knowledge_meta}")
+                    return []
+            else:
+                print(f"❌ 获取知识元数据失败,状态码: {response.status_code}")
+                print(f"响应内容: {response.text}")
+                return []
+
+        except Exception as e:
+            print(f"❌ 获取知识元数据时发生错误: {str(e)}")
+            return []
+
+    def _load_available_metrics(self) -> List[Dict[str, str]]:
+        """
+        Derive the list of available metrics from the knowledge metadata.
+
+        Only entries whose id starts with ``metric-`` are kept; the metric name is
+        the id without that leading prefix.
 
-        new_state = state.copy()
-        new_state["outline_draft"] = default_outline
-        new_state["outline_version"] += 1
-        new_state["outline_ready"] = True  # 即使默认也标记为就绪
-        new_state["metrics_requirements"] = default_outline.global_metrics
-        new_state["messages"].append(
-            ("ai", f"⚠️ 使用默认大纲 v{new_state['outline_version']}")
-        )
+        Returns:
+            List of dicts with ``name``, ``description`` and ``type`` keys.
+        """
+        knowledge_list = self._load_available_knowledge()
+
+        prefix = "metric-"
+        metrics = []
+        for knowledge in knowledge_list:
+            # `or ""` also covers entries whose id/description is explicitly null.
+            knowledge_id = knowledge.get("id") or ""
+            description = knowledge.get("description") or ""
+
+            if not knowledge_id.startswith(prefix):
+                continue
+
+            # Strip the leading prefix only; str.replace would also remove any later
+            # "metric-" inside the id, and the id could not be rebuilt from the name.
+            metric_name = knowledge_id[len(prefix):]
+
+            metrics.append({
+                "name": metric_name,
+                "description": self._extract_metric_description(description),
+                "type": self._classify_metric_type(metric_name, description)
+            })
+
+        print(f"✅ 从知识库中提取了 {len(metrics)} 个可用指标")
+        return metrics
+
+    def _extract_metric_description(self, full_description: str) -> str:
+        """Return a short metric description: prefix text removed, capped at 50 characters."""
+        # Drop the "因子概述:" marker wherever it occurs, then trim surrounding whitespace.
+        cleaned = full_description.replace("因子概述:", "").strip()
+
+        # Longer texts are cut to their first 50 characters and marked with an ellipsis.
+        return cleaned if len(cleaned) <= 50 else cleaned[:50] + "..."
+
+    def _classify_metric_type(self, metric_name: str, description: str) -> str:
+        """Classify a metric by the keywords found in its name; the first matching rule wins."""
+        # Ordered (keywords, label) rules; `description` is accepted but not consulted.
+        rules = (
+            (("收入", "支出", "金额", "交易笔数"), "基础统计指标"),
+            (("时间范围", "时间跨度"), "时间分析指标"),
+            (("比例", "占比", "构成"), "结构分析指标"),
+            (("排名", "TOP", "前三"), "专项分析指标"),
+            (("账户", "数量"), "账户分析指标"),
+        )
+        for keywords, label in rules:
+            for keyword in keywords:
+                if keyword in metric_name:
+                    return label
+        return "其他指标"
+
+    def _match_metric_to_knowledge(self, metric_name: str, metric_description: str) -> str:
+        """
+        根据指标名称和描述匹配最合适的知识ID
 
-        # 新增:详细打印默认大纲内容
-        print("\n" + "=" * 70)
-        print("📋 默认大纲内容")
-        print("=" * 70)
-        print(json.dumps(default_outline.dict(), ensure_ascii=False, indent=2))
-        print("=" * 70)
+        Args:
+            metric_name: 指标名称
+            metric_description: 指标描述
 
-        # 关键修复:返回前清理状态
-        return convert_numpy_types(new_state)
+        Returns:
+            匹配的知识ID,如果没有找到则返回空字符串
+        """
+        if not self.available_knowledge:
+            return ""
+
+        # 精确匹配:直接用指标名称匹配知识ID
+        for knowledge in self.available_knowledge:
+            knowledge_id = knowledge.get("id", "")
+            # 去掉前缀匹配,如 "metric-分析账户数量" 匹配 "分析账户数量"
+            if knowledge_id.startswith("metric-") and knowledge_id.replace("metric-", "") == metric_name:
+                print(f"🔗 精确匹配指标 '{metric_name}' -> 知识ID: {knowledge_id}")
+                return knowledge_id
+
+        # 扩展匹配:匹配更多的农业相关指标
+        if "农业" in metric_name:
+            if "总经营收入" in metric_name:
+                # 匹配农业总经营收入
+                for knowledge in self.available_knowledge:
+                    if knowledge.get("id") == "metric-农业总经营收入":
+                        print(f"🔗 扩展匹配指标 '{metric_name}' -> 知识ID: metric-农业总经营收入")
+                        return "metric-农业总经营收入"
+            if "总经营支出" in metric_name:
+                # 匹配农业总经营支出
+                for knowledge in self.available_knowledge:
+                    if knowledge.get("id") == "metric-农业总经营支出":
+                        print(f"🔗 扩展匹配指标 '{metric_name}' -> 知识ID: metric-农业总经营支出")
+                        return "metric-农业总经营支出"
+            if "交易对手收入排名TOP3" in metric_name or "收入排名" in metric_name:
+                # 匹配农业交易对手收入TOP3
+                for knowledge in self.available_knowledge:
+                    if knowledge.get("id") == "metric-农业交易对手经营收入top3":
+                        print(f"🔗 扩展匹配指标 '{metric_name}' -> 知识ID: metric-农业交易对手经营收入top3")
+                        return "metric-农业交易对手经营收入top3"
+            if "交易对手支出排名TOP3" in metric_name or "支出排名" in metric_name:
+                # 匹配农业交易对手支出TOP3
+                for knowledge in self.available_knowledge:
+                    if knowledge.get("id") == "metric-农业交易对手经营支出top3":
+                        print(f"🔗 扩展匹配指标 '{metric_name}' -> 知识ID: metric-农业交易对手经营支出top3")
+                        return "metric-农业交易对手经营支出top3"
+
+        # 如果精确匹配失败,使用关键词匹配
+        keywords = [metric_name]
+        if metric_description:
+            # 从描述中提取关键信息
+            desc_lower = metric_description.lower()
+            if "收入" in metric_name or "收入" in desc_lower:
+                keywords.extend(["收入", "总收入", "经营收入"])
+            if "支出" in metric_name or "支出" in desc_lower:
+                keywords.extend(["支出", "总支出", "经营支出"])
+            if "排名" in metric_name or "top" in desc_lower:
+                keywords.append("排名")
+            if "比例" in metric_name or "占比" in desc_lower:
+                keywords.append("比例")
+            if "时间范围" in metric_name:
+                keywords.append("时间范围")
+            if "账户" in metric_name:
+                keywords.append("账户")
+
+        best_match = None
+        best_score = 0
+
+        for knowledge in self.available_knowledge:
+            knowledge_id = knowledge.get("id", "")
+            knowledge_desc = knowledge.get("description", "").lower()
+
+            # 计算匹配分数
+            score = 0
+            for keyword in keywords:
+                if keyword.lower() in knowledge_desc:
+                    score += 1
+
+            # 行业匹配加分
+            if "黑色金属" in knowledge_desc and "黑色金属" in metric_name:
+                score += 2
+            if "农业" in knowledge_desc and "农业" in metric_name:
+                score += 2
+
+            # 直接名称匹配加分
+            if metric_name.lower() in knowledge_desc:
+                score += 3
+
+            if score > best_score:
+                best_score = score
+                best_match = knowledge_id
+
+        if best_match and best_score > 0:
+            print(f"🔗 关键词匹配指标 '{metric_name}' -> 知识ID: {best_match} (匹配分数: {best_score})")
+            return best_match
+
+        print(f"❌ 指标 '{metric_name}' 未找到匹配的知识ID")
+        return ""
+
+
+async def generate_report_outline(question: str, industry: str, sample_data: List[Dict[str, Any]], api_key: str, max_retries: int = 3, retry_delay: float = 2.0) -> ReportOutline:
+    """
+    Generate a report outline, retrying on failure and falling back to a default.
+
+    Each attempt calls ``OutlineGeneratorAgent.generate_outline``. After a failed
+    attempt the function waits ``retry_delay`` seconds, then grows the delay by a
+    factor of 1.5 up to 10 seconds. When every attempt fails, a one-section default
+    outline is built from up to three available metrics, preferring those
+    classified as 基础统计指标.
+
+    Args:
+        question: User query.
+        industry: Industry name.
+        sample_data: Data sample.
+        api_key: API key used to construct the agent.
+        max_retries: Maximum number of attempts (default 3).
+        retry_delay: Initial wait between attempts in seconds (default 2).
+
+    Returns:
+        The generated outline, or the default outline when all attempts fail.
+    """
+    import asyncio
+    import time
+
+    agent = OutlineGeneratorAgent(api_key)
+
+    print(f"📝 开始生成报告大纲(最多重试 {max_retries} 次)...")
+
+    for attempt in range(max_retries):
+        print(f"   尝试 {attempt + 1}/{max_retries}...")
+        # Taken before the try block so the failure branch can always compute a duration.
+        start_time = time.time()
+        try:
+            outline = await agent.generate_outline(question, industry, sample_data)
+
+            elapsed_time = time.time() - start_time
+            # Was print(".2f"): the f-string carrying the duration had been lost.
+            print(f"   ✅ 大纲生成成功,耗时 {elapsed_time:.2f} 秒")
+            print("\n📝 大纲生成成功:")
+            print(f"   标题:{outline.report_title}")
+            print(f"   章节数:{len(outline.sections)}")
+            print(f"   指标数:{len(outline.global_metrics)}")
+
+            return outline
+
+        except Exception as e:
+            elapsed_time = time.time() - start_time
+            print(f"   ❌ 大纲生成失败,耗时 {elapsed_time:.2f} 秒")
+            print(f"   错误详情: {str(e)}")
+
+            # Wait before the next attempt unless this was the last one.
+            if attempt < max_retries - 1:
+                print(f"   ⏳ {retry_delay} 秒后进行第 {attempt + 2} 次重试...")
+                await asyncio.sleep(retry_delay)
+                # Back off between attempts, capped at 10 seconds.
+                retry_delay = min(retry_delay * 1.5, 10.0)
+            else:
+                print(f"   ❌ 已达到最大重试次数 ({max_retries}),使用默认结构")
+
+    # Every attempt failed: fall back to a default outline.
+    print("⚠️ 所有重试均失败,使用默认大纲结构")
+
+    # This is a module-level function, so the metrics come from the agent instance
+    # (the original referenced an undefined `self` here and raised NameError).
+    available_metrics = agent._load_available_metrics()
+
+    # Basic statistics metrics first, then any others, three in total at most.
+    base_metrics = [m for m in available_metrics if m.get('type') == '基础统计指标']
+    other_metrics = [m for m in available_metrics if m.get('type') != '基础统计指标']
+    chosen_metrics = (base_metrics + other_metrics)[:3]
+
+    default_metric_ids = []
+    default_global_metrics = []
+    for metric in chosen_metrics:
+        metric_name = metric['name']
+        knowledge_id = f"metric-{metric_name}"
+        default_metric_ids.append(knowledge_id)
+        default_global_metrics.append(MetricRequirement(
+            metric_id=knowledge_id,
+            metric_name=metric_name,
+            calculation_logic=f"使用规则引擎计算{metric_name}: {metric.get('description', '')}",
+            required_fields=["transactions"],
+            dependencies=[]
+        ))
+
+    # Default outline built from the metrics that actually exist.
+    default_outline = ReportOutline(
+        report_title="默认交易分析报告",
+        sections=[
+            ReportSection(
+                section_id="sec_1",
+                title="交易概览",
+                description="基础交易情况分析",
+                metrics_needed=default_metric_ids
+            )
+        ],
+        global_metrics=default_global_metrics
+    )
+    return default_outline

+ 359 - 124
llmops/agents/planning_agent.py

@@ -1,17 +1,46 @@
+"""
+规划Agent (Planning Agent)
+=========================
+
+此Agent负责分析当前状态并做出智能决策,决定下一步行动。
+
+核心功能:
+1. 状态评估:分析大纲、指标计算进度和完整性
+2. 决策制定:决定生成大纲、计算指标、完成报告或澄清需求
+3. 优先级排序:确定最关键的任务和指标
+4. 流程控制:管理整个报告生成工作流的执行顺序
+
+决策逻辑:
+- 大纲为空 → 生成大纲
+- 指标覆盖率 < 80% → 计算指标
+- 指标覆盖率 ≥ 80% → 生成报告
+- 需求模糊 → 澄清需求
+
+技术实现:
+- 使用LangChain和结构化输出
+- 支持异步处理
+- 智能状态评估
+- 灵活的决策机制
+
+作者: Big Agent Team
+版本: 1.0.0
+创建时间: 2024-12-20
+"""
+
 from typing import List, Dict, Optional, Any, Union
 from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
 from pydantic import BaseModel, Field
 from langchain_openai import ChatOpenAI
 import json
-
-from llmops.agents.state import AgentState, MetricRequirement, convert_numpy_types
-from llmops.agents.datadev.llm import get_llm
+import os
+from datetime import datetime
 
 
+# 数据模型定义
 class ActionItem(BaseModel):
     """动作项定义"""
+    # One follow-up action: its name plus an optional parameter mapping that
+    # defaults to an empty dict.
     action: str = Field(description="动作名称")
-    parameters: Optional[Dict[str, Any]] = Field(default_factory=dict)
+    parameters: Optional[Dict[str, Any]] = Field(default_factory=dict, description="动作参数")
 
 
 class ClarificationRequest(BaseModel):
@@ -20,27 +49,30 @@ class ClarificationRequest(BaseModel):
     missing_fields: List[str] = Field(default_factory=list, description="缺少的字段或信息")
 
 
-class PlanningOutput(BaseModel):
-    """规划决策输出 - 支持灵活格式"""
+class PlanningDecision(BaseModel):
+    """规划决策输出"""
+    # Structured planning result. `decision` is a plain string; its description lists
+    # generate_outline, compute_metrics, finalize_report and clarify_requirements, but
+    # the type does not enforce that set.
     decision: str = Field(
-        description="决策类型: generate_outline, compute_metrics, finalize, clarify"
+        description="决策类型: generate_outline, compute_metrics, finalize_report, clarify_requirements"
     )
     reasoning: str = Field(description="详细推理过程")
     next_actions: List[Union[str, ActionItem]] = Field(
         default_factory=list,
         description="下一步动作列表"
     )
-    # 关键修复:明确传递待计算指标ID列表
     metrics_to_compute: List[str] = Field(
         default_factory=list,
         description="待计算指标ID列表(如 ['total_income', 'avg_balance'])"
     )
+    # Ids of the metrics to treat as high priority; empty by default.
+    priority_metrics: List[str] = Field(
+        default_factory=list,
+        description="优先级高的指标ID"
+    )
     additional_requirements: Optional[
         Union[Dict[str, Any], List[Any], ClarificationRequest]
     ] = Field(default=None, description="额外需求或澄清信息")
 
 
-def normalize_additional_requirements(req: Any) -> Optional[Dict[str, Any]]:
+def normalize_requirements(req: Any) -> Optional[Dict[str, Any]]:
     """
     规范化 additional_requirements
     将列表转换为字典格式
@@ -61,160 +93,363 @@ def normalize_additional_requirements(req: Any) -> Optional[Dict[str, Any]]:
     return {"raw": str(req)}
 
 
-def create_planning_agent(llm, state: AgentState):
-    """创建规划智能体(修复版:移除JSON示例,避免变量冲突)"""
-    prompt = ChatPromptTemplate.from_messages([
-        ("system", """你是报告规划总控智能体,核心职责是精准分析当前状态并决定下一步行动。
+class PlanningAgent:
+    """规划智能体:负责状态分析和决策制定"""
+
+    def __init__(self, api_key: str, base_url: str = "https://api.deepseek.com"):
+        """
+        Set up the planning agent's chat model and its call log.
+
+        Args:
+            api_key: DeepSeek API key handed to the chat client.
+            base_url: Base URL of the DeepSeek endpoint.
+        """
+        # Log of LLM calls; make_decision appends one record per call.
+        self.api_calls = []
+
+        llm_settings = {
+            "model": "deepseek-chat",
+            "api_key": api_key,
+            "base_url": base_url,
+            "temperature": 0.1,
+        }
+        self.llm = ChatOpenAI(**llm_settings)
+
+    def create_planning_prompt(self) -> ChatPromptTemplate:
+        """创建规划提示模板"""
+        return ChatPromptTemplate.from_messages([
+            ("system", """你是报告规划总控智能体,核心职责是精准分析当前状态并决定下一步行动。
 
 ### 决策选项(四选一)
 1. generate_outline:大纲未生成或大纲无效
 2. compute_metrics:大纲已生成但指标未完成(覆盖率<80%)
-3. finalize:指标覆盖率≥80%,信息充足
-4. clarify:用户需求模糊,缺少关键信息
+3. finalize_report:指标覆盖率≥80%,信息充足
+4. clarify_requirements:用户需求模糊,缺少关键信息
 
 ### 决策规则(按顺序检查)
 1. 检查 outline_draft 是否为空 → 空则选择 generate_outline
 2. 检查 metrics_requirements 是否为空 → 空则选择 generate_outline
-3. 计算指标覆盖率 = 已计算指标 / 总需求指标
-   - 覆盖率 < 0.8 → 选择 compute_metrics
-   - 覆盖率 ≥ 0.8 → 选择 finalize
-4. 如果无法理解需求 → 选择 clarify
+3. 检查是否有待计算指标 → 有则选择 compute_metrics
+4. 所有指标都已计算完成 → 选择 finalize_report
+5. 如果无法理解需求 → 选择 clarify_requirements
 
 ### 重要原则
 - 大纲草稿已存在时,不要重复生成大纲
-- 决策为 compute_metrics 时,必须提供具体的指标ID列表
+- 决策为 compute_metrics 时,必须从状态信息中的"有效待计算指标ID列表"中选择
 - 确保 metrics_to_compute 是字符串数组格式
+- 确保指标ID与大纲中的global_metrics.metric_id完全一致
+- 从状态信息中的"有效待计算指标ID列表"中提取metric_id作为metrics_to_compute的值
+- 计算失败的指标可以重试最多3次
+- 绝对不要自己生成新的指标ID,必须严格使用状态信息中提供的已有指标ID
+- 如果状态信息中没有可用的指标ID,不要生成compute_metrics决策
 
 ### 输出字段说明
 - decision: 决策字符串
 - reasoning: 决策原因说明
 - next_actions: 动作列表(可选)
-- metrics_to_compute: 待计算指标ID列表(决策为compute_metrics时必须提供)
+- metrics_to_compute: 待计算指标ID列表,必须从状态信息中的可用指标ID中选择(决策为compute_metrics时必须提供)
+- priority_metrics: 优先级指标列表(前2-3个最重要的指标)
 - additional_requirements: 额外需求(可选)
 
 必须输出有效的JSON格式!"""),
 
-    MessagesPlaceholder("messages"),
+            MessagesPlaceholder("messages"),
+
+            ("user", "报告需求:{question}\n\n请输出决策结果。")
+        ])
+
+    async def make_decision(self, question: str, industry: str, current_state: Dict[str, Any]) -> PlanningDecision:
+        """
+        Ask the LLM for the next planning decision, with a rule-based fallback.
+
+        The planning prompt is piped into the chat model, the reply is parsed
+        into a PlanningDecision, and a record of the call is appended to
+        self.api_calls and written to api_results/<call_id>.json.
+
+        Args:
+            question: User request the report is based on.
+            industry: Industry label, passed along with the prompt inputs.
+            current_state: Workflow state used to build the status context.
+
+        Returns:
+            The validated LLM decision, or the result of
+            _get_default_decision when the reply cannot be parsed or validated.
+        """
+        planner = self.create_planning_prompt() | self.llm
+
+        # Summary of the workflow state; sent to the model as a system message.
+        status_info = self._build_status_context(current_state)
+
+        # Log the model input.
+        print("========================================")
+        print("[AGENT] PlanningAgent (规划Agent)")
+        print("[MODEL_INPUT] PlanningAgent:")
+        print("[CONTEXT] 基于当前状态做出规划决策")
+        print(f"Question: {question}")
+        print(f"Status info: {status_info}")
+        print("========================================")
+
+        # Run the planner and time the call.
+        start_time = datetime.now()
+        response = await planner.ainvoke({
+            "question": question,
+            "industry": industry,
+            "messages": [("system", status_info)]
+        })
+        end_time = datetime.now()
+        duration = (end_time - start_time).total_seconds()
+        content = response.content if hasattr(response, 'content') else str(response)
+
+        # Parse the reply; any parse/validation failure falls back to the default decision.
+        used_default = False
+        try:
+            decision = self._parse_decision(content)
+        except Exception as e:
+            used_default = True
+            print(f"解析规划决策响应失败: {e},使用默认决策")
+            decision = self._get_default_decision(current_state)
+
+        # Prefer pydantic v2's model_dump(); .dict() is the v1 spelling.
+        if hasattr(decision, 'model_dump'):
+            decision_payload = decision.model_dump()
+        elif hasattr(decision, 'dict'):
+            decision_payload = decision.dict()
+        else:
+            decision_payload = decision
+
+        # The duration alone is not unique (two calls can both take 1.52s and
+        # overwrite each other's file), so a microsecond timestamp is appended.
+        call_id = f"api_mll_规划决策_{duration:.2f}_{end_time.strftime('%Y%m%d%H%M%S%f')}"
+        api_call_info = {
+            "call_id": call_id,
+            "timestamp": end_time.isoformat(),
+            "agent": "PlanningAgent",
+            "model": "deepseek-chat",
+            "request": {
+                "question": question,
+                "status_info": status_info,
+                "start_time": start_time.isoformat()
+            },
+            "response": {
+                "content": content,
+                "decision": decision_payload,
+                "end_time": end_time.isoformat(),
+                "duration": duration
+            },
+            "success": True,
+            # True when the reply was unusable and the default decision was used.
+            "used_default_decision": used_default
+        }
+        self.api_calls.append(api_call_info)
+
+        # Persist the call record; a failed write is reported but does not abort planning.
+        api_results_dir = "api_results"
+        os.makedirs(api_results_dir, exist_ok=True)
+        filepath = os.path.join(api_results_dir, f"{call_id}.json")
+
+        try:
+            with open(filepath, 'w', encoding='utf-8') as f:
+                json.dump(api_call_info, f, ensure_ascii=False, indent=2)
+            print(f"[API_RESULT] 保存API结果文件: {filepath}")
+        except (OSError, TypeError, ValueError) as e:
+            print(f"[ERROR] 保存API结果文件失败: {filepath}, 错误: {str(e)}")
+
+        # Log the model output.
+        print(f"[MODEL_OUTPUT] PlanningAgent: {json.dumps(decision_payload, ensure_ascii=False)}")
+        print("========================================")
+
+        return decision
+
+    def _parse_decision(self, content: str) -> PlanningDecision:
+        """
+        Extract and validate a PlanningDecision from raw model output.
+
+        Args:
+            content: Text of the model reply.
+
+        Returns:
+            The decision built from the first "{" to last "}" span of content.
+
+        Raises:
+            ValueError: If no JSON object is found, the JSON is malformed, or a
+                compute_metrics decision has no metric IDs or only placeholder
+                IDs of the form metric_<digits>. Errors raised while building
+                PlanningDecision propagate unchanged.
+        """
+        json_start = content.find('{')
+        json_end = content.rfind('}') + 1
+        if json_start < 0 or json_end <= json_start:
+            raise ValueError("No JSON found in response")
+
+        decision_data = json.loads(content[json_start:json_end])
+
+        # The model sometimes returns additional_requirements as a bare string
+        # or a list; coerce both into the dict/None forms the model accepts.
+        if "additional_requirements" in decision_data:
+            req = decision_data["additional_requirements"]
+            if isinstance(req, str):
+                decision_data["additional_requirements"] = (
+                    {"raw_content": req} if req.strip() else None
+                )
+            elif isinstance(req, list):
+                decision_data["additional_requirements"] = {
+                    "questions": [str(item) for item in req],
+                    "missing_fields": []
+                }
+
+        decision = PlanningDecision(**decision_data)
+
+        # Sanity-check compute_metrics decisions.
+        if decision.decision == "compute_metrics":
+            if not decision.metrics_to_compute:
+                raise ValueError("AI决策缺少具体的指标ID")
+            # IDs such as metric_001 are treated as invented by the model.
+            if any(mid.startswith("metric_") and mid.replace("metric_", "").isdigit()
+                   for mid in decision.metrics_to_compute):
+                raise ValueError("AI生成的指标ID格式不正确")
+
+        return decision
+
+    def _build_status_context(self, state: Dict[str, Any]) -> str:
+        """
+        Render the workflow state as the status text sent to the planner.
+
+        Args:
+            state: Workflow state. outline_draft and its global_metrics
+                entries may be dicts or attribute-style objects.
+
+        Returns:
+            A multi-line summary: step, outline version, metric counts,
+            coverage, retry-eligible pending IDs, outline metric IDs and the
+            failure record.
+        """
+        def _read(obj: Any, key: str, default: Any = None) -> Any:
+            # Accept both dict-shaped and attribute-shaped (model) values.
+            if isinstance(obj, dict):
+                return obj.get(key, default)
+            return getattr(obj, key, default)
+
+        required_count = len(state.get("metrics_requirements", []))
+        computed_count = len(state.get("computed_metrics", {}))
+        coverage = computed_count / required_count if required_count > 0 else 0
+
+        # Failure counts per metric ID and the IDs still waiting to be computed.
+        failed_attempts = state.get("failed_metric_attempts", {})
+        pending_ids = state.get("pending_metric_ids", [])
+
+        # Drop metrics that have already failed max_retry times.
+        max_retry = 3
+        filtered_pending_ids = [
+            mid for mid in pending_ids
+            if failed_attempts.get(mid, 0) < max_retry
+        ]
+
+        # Collect the non-empty metric IDs declared in the outline, if any.
+        outline = state.get('outline_draft')
+        global_metrics = _read(outline, 'global_metrics') if outline else None
+        available_metric_ids = [
+            mid for mid in (_read(m, 'metric_id', '') for m in global_metrics or [])
+            if mid
+        ]
+
+        return f"""当前状态评估:
+- 规划步骤: {state.get('planning_step', 0)}
+- 大纲版本: {state.get('outline_version', 0)}
+- 大纲草稿存在: {state.get('outline_draft') is not None}
+- 指标需求总数: {required_count}
+- 已计算指标数: {computed_count}
+- 指标覆盖率: {coverage:.2%}
+- 待计算指标数: {len(pending_ids)}
+- 有效待计算指标ID列表: {filtered_pending_ids}
+- 可用指标ID列表: {available_metric_ids}
+- 失败尝试记录: {failed_attempts}
+"""
 
-    ("user", "报告需求:{question}\n\n请输出决策结果。")
-    ])
 
-    return prompt | llm.with_structured_output(PlanningOutput)
+def analyze_current_state(state: Dict[str, Any]) -> Dict[str, Any]:
+    """
+    分析当前状态,返回关键信息
 
+    Args:
+        state: 当前状态
 
-async def planning_node(state: AgentState) -> AgentState:
-    """规划节点:正确识别待计算指标并传递"""
-    llm = get_llm()
-    planner = create_planning_agent(llm, state)
+    Returns:
+        状态分析结果
+    """
+    required_metrics = state.get("metrics_requirements", [])
+    computed_metrics = state.get("computed_metrics", {})
 
-    # 构建完整的状态评估上下文
-    required_count = len(state["metrics_requirements"])
-    computed_count = len(state["computed_metrics"])
+    # 计算覆盖率
+    required_count = len(required_metrics)
+    computed_count = len(computed_metrics)
     coverage = computed_count / required_count if required_count > 0 else 0
 
-    # 新增:跟踪失败次数,避免无限循环
-    failed_attempts = state.get("failed_metric_attempts", {})
-    pending_ids = state.get("pending_metric_ids", [])
+    # 找出未计算的指标
+    computed_ids = set(computed_metrics.keys())
+    pending_metrics = [
+        m for m in required_metrics
+        if m.metric_id not in computed_ids
+    ]
 
-    # 过滤掉失败次数过多的指标
+    # 检查失败次数
+    failed_attempts = state.get("failed_metric_attempts", {})
     max_retry = 3
-    filtered_pending_ids = [
-        mid for mid in pending_ids
-        if failed_attempts.get(mid, 0) < max_retry
+    valid_pending_metrics = [
+        m for m in pending_metrics
+        if failed_attempts.get(m.metric_id, 0) < max_retry
     ]
 
-    status_snapshot = f"""当前状态评估:
-- 规划步骤: {state['planning_step']}
-- 大纲版本: {state['outline_version']}
-- 大纲草稿存在: {state['outline_draft'] is not None}
-- 指标需求总数: {required_count}
-- 已计算指标数: {computed_count}
-- 指标覆盖率: {coverage:.2%}
-- 待计算指标数: {len(pending_ids)}
-- 有效待计算指标数: {len(filtered_pending_ids)}
-- 失败尝试记录: {failed_attempts}
+    return {
+        "has_outline": state.get("outline_draft") is not None,
+        "required_count": required_count,
+        "computed_count": computed_count,
+        "coverage": coverage,
+        "pending_metrics": pending_metrics,
+        "valid_pending_metrics": valid_pending_metrics,
+        "pending_ids": [m.metric_id for m in pending_metrics],
+        "valid_pending_ids": [m.metric_id for m in valid_pending_metrics],
+        "planning_step": state.get("planning_step", 0),
+        "outline_version": state.get("outline_version", 0)
+    }
+
+
+async def plan_next_action(question: str, industry: str, current_state: Dict[str, Any], api_key: str) -> PlanningDecision:
+    """
+    规划下一步行动的主函数
 
-建议下一步: {"计算指标" if coverage < 0.8 else "生成报告"}"""
+    Args:
+        question: 用户查询
+        current_state: 当前状态
+        api_key: API密钥
 
-    # 执行规划
-    result = await planner.ainvoke({
-        "question": state["question"],
-        "messages": [("system", status_snapshot)]
-    })
+    Returns:
+        规划决策结果
+    """
+    agent = PlanningAgent(api_key)
 
-    # 规范化结果
-    normalized_req = normalize_additional_requirements(result.additional_requirements)
+    try:
+        decision = await agent.make_decision(question, industry, current_state)
 
-    # 找出所有未计算的指标
-    computed_ids = set(state["computed_metrics"].keys())
-    required_metrics = state["metrics_requirements"]
+        print(f"\n🧠 规划决策:{decision.decision}")
+        print(f"   推理:{decision.reasoning[:100]}...")
 
-    pending_metrics = [
-        m for m in required_metrics
-        if m.metric_id not in computed_ids
-    ]
+        if decision.metrics_to_compute:
+            print(f"   待计算指标:{decision.metrics_to_compute}")
 
-    # 关键:使用 LLM 返回的指标ID,如果没有则使用全部待计算指标
-    if result.metrics_to_compute:
-        pending_ids = result.metrics_to_compute
-        valid_ids = [m.metric_id for m in pending_metrics]
-        pending_metrics = [m for m in pending_metrics if m.metric_id in pending_ids and m.metric_id in valid_ids]
+        return decision
 
-    # 更新状态
-    new_state = state.copy()
-    new_state["plan_history"].append(
-        f"Step {new_state['planning_step']}: {result.decision}"
-    )
-    new_state["planning_step"] += 1
-    new_state["additional_requirements"] = normalized_req
-
-    # 关键:保存待计算指标ID列表
-    if pending_metrics:
-        pending_ids = [m.metric_id for m in pending_metrics]
-        new_state["pending_metric_ids"] = pending_ids
-        new_state["metrics_to_compute"] = pending_metrics  # 保存完整对象
-
-    # 设置路由标志
-    if result.decision == "generate_outline":
-        new_state["messages"].append(
-            ("ai", f"📋 规划决策:生成大纲 (v{new_state['outline_version'] + 1})")
-        )
-        new_state["next_route"] = "outline_generator"
-    elif result.decision == "compute_metrics":
-        # 修复:确保显示正确的数量
-        if not pending_metrics:
-            # 如果没有待计算指标但有需求,则计算所有未完成的
-            computed_ids = set(state["computed_metrics"].keys())
-            pending_metrics = [m for m in required_metrics if m.metric_id not in computed_ids]
-
-        # 新增:如果有效待计算指标为空但还有指标未计算,说明都失败了太多次
-        if not filtered_pending_ids and pending_ids:
-            new_state["messages"].append(
-                ("ai", f"⚠️ 剩余 {len(pending_ids)} 个指标已多次计算失败,将跳过这些指标直接生成报告")
-            )
-            new_state["next_route"] = "report_compiler"
-            # 关键修复:返回前清理状态
-            return convert_numpy_types(new_state)
+    except Exception as e:
+        print(f"⚠️ 规划决策出错: {e},使用默认决策")
 
-        new_state["messages"].append(
-            ("ai", f"🧮 规划决策:计算 {len(pending_metrics)} 个指标 ({[m.metric_id for m in pending_metrics]})")
-        )
-        new_state["next_route"] = "metrics_calculator"
-    elif result.decision == "finalize":
-        new_state["is_complete"] = True
-        new_state["messages"].append(
-            ("ai", f"✅ 规划决策:信息充足,生成最终报告(覆盖率 {coverage:.2%})")
-        )
-        new_state["next_route"] = "report_compiler"
-    elif result.decision == "clarify":
-        questions = []
-        if normalized_req and "questions" in normalized_req:
-            questions = normalized_req["questions"]
-
-        new_state["messages"].append(
-            ("ai", f"❓ 需要澄清:{';'.join(questions) if questions else '请提供更详细的需求'}")
+        # 直接返回最基本的默认决策,避免复杂的默认决策逻辑
+        return PlanningDecision(
+            decision="finalize_report",
+            reasoning="规划决策失败,使用默认的报告生成决策",
+            next_actions=["生成最终报告"],
+            metrics_to_compute=[],
+            priority_metrics=[]
         )
-        new_state["next_route"] = "clarify_node"
 
-    # 关键修复:返回前清理状态
-    return convert_numpy_types(new_state)
+def _get_default_decision(self, current_state: Dict[str, Any]) -> PlanningDecision:
+    """
+    Rule-based fallback decision derived from analyze_current_state.
+
+    Args:
+        current_state: Current workflow state.
+
+    Returns:
+        generate_outline when no outline exists; compute_metrics (at most five
+        retry-eligible IDs) when coverage is below 80%; otherwise
+        finalize_report.
+    """
+    state_analysis = analyze_current_state(current_state)
+
+    if not state_analysis["has_outline"]:
+        default_decision = PlanningDecision(
+            decision="generate_outline",
+            reasoning="大纲不存在,需要先生成大纲",
+            next_actions=["生成报告大纲"],
+            metrics_to_compute=[],
+            priority_metrics=[]
+        )
+    elif state_analysis["coverage"] < 0.8 and state_analysis["valid_pending_metrics"]:
+        # Compute metrics, using real metric IDs from the state.
+        metrics_to_compute = state_analysis["valid_pending_ids"][:5]  # at most 5 per round
+        default_decision = PlanningDecision(
+            decision="compute_metrics",
+            reasoning=f"指标覆盖率{state_analysis['coverage']:.1%},需要计算更多指标",
+            next_actions=[f"计算指标: {', '.join(metrics_to_compute)}"],
+            metrics_to_compute=metrics_to_compute,
+            priority_metrics=metrics_to_compute[:2]  # first two are the priority metrics
+        )
+    elif state_analysis["pending_ids"]:
+        # Metrics are still uncomputed (e.g. they exhausted their retries) but
+        # we finalize anyway. Testing pending_ids rather than valid_pending_ids
+        # keeps this reasoning from being replaced by "all metrics computed"
+        # when every remaining metric has failed too often.
+        default_decision = PlanningDecision(
+            decision="finalize_report",
+            reasoning="部分指标计算失败,但已有足够信息生成报告",
+            next_actions=["生成最终报告"],
+            metrics_to_compute=[],
+            priority_metrics=[]
+        )
+    else:
+        # Nothing left to compute; generate the report.
+        default_decision = PlanningDecision(
+            decision="finalize_report",
+            reasoning="所有必要指标已计算完成",
+            next_actions=["生成最终报告"],
+            metrics_to_compute=[],
+            priority_metrics=[]
+        )
+
+    return default_decision
+
+
+# This definition follows the module-level plan_next_action, where a 4-space
+# indent nested it after that function's returns, so PlanningAgent had no such
+# method and make_decision's fallback raised AttributeError. It is defined at
+# module level and bound to the class so self._get_default_decision resolves.
+PlanningAgent._get_default_decision = _get_default_decision

+ 88 - 0
llmops/config.py

@@ -0,0 +1,88 @@
+"""
+Big Agent 配置文件
+================
+
+此配置文件包含了Big Agent系统的所有核心配置信息,包括:
+- API密钥配置(DeepSeek等)
+- 工作流参数设置
+- 路径配置
+- 环境变量加载
+
+主要功能:
+1. 从环境变量或.env文件加载API密钥
+2. 定义系统工作流的基本参数
+3. 配置项目文件路径
+4. 验证配置完整性
+
+作者: Big Agent Team
+版本: 1.0.0
+更新时间: 2024-12-18
+"""
+
+import os
+from dotenv import load_dotenv
+
+# ============================================================================
+# Environment variable loading
+# ============================================================================
+# Load variables from a .env file so secrets are not hard-coded in the source
+load_dotenv()
+
+# ============================================================================
+# DeepSeek API configuration
+# ============================================================================
+# API key - read from the environment (which load_dotenv may have populated)
+DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY")
+
+# API base URL - falls back to https://api.deepseek.com when unset
+DEEPSEEK_BASE_URL = os.getenv("DEEPSEEK_BASE_URL", "https://api.deepseek.com")
+
+# ============================================================================
+# Project path configuration
+# ============================================================================
+# Data directories used by the project's modules (relative paths)
+PATHS = {
+    "json_configs": "json_files",      # JSON config directory - metric calculation configs
+    "knowledge_base": "knowledge_base", # Knowledge base directory - generated knowledge documents
+    "logs": "logs",                   # Log directory - runtime logs
+    "api_results": "api_results"      # API result directory - results of API calls
+}
+
+# ============================================================================
+# Directory initialisation
+# ============================================================================
+# Create every configured directory at import time so later file writes do not
+# fail on a missing path
+for path in PATHS.values():
+    os.makedirs(path, exist_ok=True)
+
+# ============================================================================
+# 配置验证函数
+# ============================================================================
+def validate_config():
+    """
+    Check the module-level DeepSeek settings and collect any problems.
+
+    Two checks are made: DEEPSEEK_API_KEY must be set (non-empty) and
+    DEEPSEEK_BASE_URL must begin with "https://".
+
+    Returns:
+        list: Error messages in check order; empty when both checks pass.
+    """
+    key_missing = not DEEPSEEK_API_KEY
+    url_is_https = DEEPSEEK_BASE_URL.startswith("https://")
+
+    # Each entry pairs "did this check fail" with the message to report.
+    checks = (
+        (key_missing, "DEEPSEEK_API_KEY环境变量未设置"),
+        (not url_is_https, "DEEPSEEK_BASE_URL必须是HTTPS URL"),
+    )
+    return [message for failed, message in checks if failed]
+
+# ============================================================================
+# Configuration status flag
+# ============================================================================
+# Evaluated once at import: True when validate_config() reports no errors
+CONFIG_VALID = not validate_config()

+ 328 - 0
llmops/workflow_state.py

@@ -0,0 +1,328 @@
+"""
+整合的工作流状态定义
+===================
+
+此文件定义了整合了多个Agent的工作流状态,兼容现有的Big Agent状态管理和新增的报告生成Agent状态。
+
+状态层次:
+1. 输入层:用户查询和数据
+2. 意图层:意图识别结果
+3. 规划层:规划决策和大纲生成
+4. 计算层:指标计算结果
+5. 结果层:最终报告生成
+6. 对话层:消息历史和错误处理
+
+兼容性:
+- 兼容现有的Big Agent WorkflowState
+- 整合来自other_agents的AgentState
+- 支持扩展新的Agent状态需求
+
+作者: Big Agent Team
+版本: 1.0.0
+创建时间: 2024-12-20
+"""
+
+from typing import TypedDict, List, Dict, Any, Optional
+from datetime import datetime
+from langchain_core.messages import BaseMessage
+from pydantic import BaseModel, Field
+
+
+# ============= 数据模型 =============
+
+class MetricRequirement(BaseModel):
+    """Definition of a single metric that a report requires."""
+    metric_id: str = Field(description="指标唯一标识,如 'total_income_jan'")
+    metric_name: str = Field(description="指标中文名称")
+    calculation_logic: str = Field(description="计算逻辑描述")
+    required_fields: List[str] = Field(description="所需字段")
+    dependencies: List[str] = Field(default_factory=list, description="依赖的其他指标ID")
+
+
+class ReportSection(BaseModel):
+    """One section of a report outline, with the metric IDs it needs."""
+    section_id: str = Field(description="章节ID")
+    title: str = Field(description="章节标题")
+    description: str = Field(description="章节内容要求")
+    metrics_needed: List[str] = Field(description="所需指标ID列表")
+
+
+class ReportOutline(BaseModel):
+    """Complete report outline: title, sections and the global metric list."""
+    report_title: str = Field(description="报告标题")
+    sections: List[ReportSection] = Field(description="章节列表")
+    global_metrics: List[MetricRequirement] = Field(description="全局指标列表")
+
+
+# ============= 序列化工具函数 =============
+
+def convert_numpy_types(obj: Any) -> Any:
+    """
+    Recursively replace numpy values with native Python equivalents.
+
+    Dict keys are stringified; lists, tuples and sets are rebuilt with
+    converted members. Objects exposing ``dtype`` (numpy scalars and arrays)
+    are converted via ``tolist()``, so multi-element arrays become nested
+    lists rather than raising from ``item()``.
+
+    Args:
+        obj: Any value, possibly containing numpy scalars or arrays.
+
+    Returns:
+        The same structure with numpy values converted; other objects are
+        returned unchanged.
+    """
+    if isinstance(obj, dict):
+        return {str(k): convert_numpy_types(v) for k, v in obj.items()}
+    if isinstance(obj, list):
+        return [convert_numpy_types(item) for item in obj]
+    if isinstance(obj, tuple):
+        return tuple(convert_numpy_types(item) for item in obj)
+    if isinstance(obj, set):
+        return {convert_numpy_types(item) for item in obj}
+    if hasattr(obj, 'dtype'):
+        if hasattr(obj, 'tolist'):
+            # Works for scalars (native scalar) and arrays (nested lists).
+            return convert_numpy_types(obj.tolist())
+        if hasattr(obj, 'item'):
+            # Scalar-like object without tolist(); keep the original path.
+            return convert_numpy_types(obj.item())
+    return obj
+
+
+# ============= 整合的工作流状态定义 =============
+
+class IntegratedWorkflowState(TypedDict):
+    """Integrated workflow state shared by the agents in this workflow."""
+
+    # === Basic input layer (Big Agent compatible) ===
+    user_input: str
+    question: str  # alias of user_input, used by the report-generation agent
+
+    industry: str  # industry label
+
+    # === Data layer ===
+    data_set: List[Dict[str, Any]]  # data format of the report-generation agent
+    transactions_df: Optional[Any]  # optional data-frame form of the data
+
+    # === Intent recognition layer (original Big Agent) ===
+    intent_result: Optional[Dict[str, Any]]
+
+    # === Planning and outline layer ===
+    planning_step: int
+    plan_history: List[str]
+    outline_draft: Optional[ReportOutline]
+    outline_version: int
+    outline_ready: bool
+
+    # === Metric calculation layer ===
+    metrics_requirements: List[MetricRequirement]  # report-generation agent format
+    computed_metrics: Dict[str, Any]  # calculation results
+    metrics_cache: Dict[str, Any]  # cache
+    pending_metric_ids: List[str]  # metric IDs still to compute
+    failed_metric_attempts: Dict[str, int]  # failure count per metric ID
+    calculation_results: Optional[Dict[str, Any]]  # results in Big Agent format
+
+    # === Result layer ===
+    report_draft: Dict[str, Any]  # report draft
+    knowledge_result: Optional[Dict[str, Any]]  # Big Agent knowledge result
+    is_complete: bool
+    completeness_score: float
+    answer: Optional[str]  # final answer
+
+    # === Conversation and message layer ===
+    messages: List[Dict[str, Any]]  # Big Agent message format
+    current_node: str
+    session_id: str
+    next_route: str
+
+    # === Error handling layer ===
+    errors: List[str]
+    last_decision: str
+
+    # === Time tracking layer ===
+    start_time: str
+    end_time: Optional[str]
+    api_result: Dict[str, Any]  # stores API call results
+
+    # === Calculation mode layer ===
+    # Declared because create_initial_integrated_state populates both keys.
+    use_rules_engine_only: bool
+    use_traditional_engine_only: bool
+
+
+# ============= 状态创建和初始化函数 =============
+
+def create_initial_integrated_state(question: str, industry: str, data: List[Dict[str, Any]], session_id: Optional[str] = None) -> IntegratedWorkflowState:
+    """
+    Build the initial integrated workflow state.
+
+    Args:
+        question: User query; stored as both user_input and question.
+        industry: Industry label stored in the state.
+        data: Data set; passed through convert_numpy_types before storing.
+        session_id: Session ID; when falsy, "session_<unix seconds>" is used.
+
+    Returns:
+        A freshly initialised state whose next_route is "planning_node".
+    """
+    # Read the clock once so start_time, the first message timestamp and the
+    # generated session ID all refer to the same instant.
+    now = datetime.now()
+    current_time = now.isoformat()
+    session = session_id or f"session_{int(now.timestamp())}"
+
+    return {
+        # Basic input
+        "user_input": question,
+        "question": question,
+        "industry": industry,
+
+        # Data layer
+        "data_set": convert_numpy_types(data),
+        "transactions_df": None,
+
+        # Intent recognition layer
+        "intent_result": None,
+
+        # Planning and outline layer
+        "planning_step": 0,
+        "plan_history": [],
+        "outline_draft": None,
+        "outline_version": 0,
+        "outline_ready": False,
+
+        # Metric calculation layer
+        "metrics_requirements": [],
+        "computed_metrics": {},
+        "metrics_cache": {},
+        "pending_metric_ids": [],
+        "failed_metric_attempts": {},
+        "calculation_results": None,
+
+        # Result layer
+        "report_draft": {},
+        "knowledge_result": None,
+        "is_complete": False,
+        "completeness_score": 0.0,
+        "answer": None,
+
+        # Conversation and message layer
+        "messages": [{
+            "role": "user",
+            "content": question,
+            "timestamp": current_time
+        }],
+        "current_node": "start",
+        "session_id": session,
+        "next_route": "planning_node",
+
+        # Error handling layer
+        "errors": [],
+        "last_decision": "init",
+
+        # Time tracking layer
+        "start_time": current_time,
+        "end_time": None,
+        "api_result": {},  # stores API call results
+
+        # Calculation mode layer
+        "use_rules_engine_only": False,
+        "use_traditional_engine_only": False
+    }
+
+
+def is_state_ready_for_calculation(state: IntegratedWorkflowState) -> bool:
+    """
+    Tell whether metric calculation can start for this state.
+
+    Args:
+        state: Current workflow state.
+
+    Returns:
+        True only when an outline draft exists, at least one metric
+        requirement is recorded, and at least one metric ID is pending.
+    """
+    # Guard clauses, checked in the same order as the conditions listed above.
+    if state.get("outline_draft") is None:
+        return False
+
+    requirement_total = len(state.get("metrics_requirements", []))
+    if requirement_total <= 0:
+        return False
+
+    pending_total = len(state.get("pending_metric_ids", []))
+    return pending_total > 0
+
+
+def get_calculation_progress(state: IntegratedWorkflowState) -> Dict[str, Any]:
+    """
+    Summarise how far metric calculation has progressed.
+
+    Args:
+        state: Current workflow state.
+
+    Returns:
+        Dict with required_count, computed_count, pending_count,
+        coverage_rate (computed / required, 0 when nothing is required) and
+        is_complete (at least one metric required and >= 80% computed).
+    """
+    required = len(state.get("metrics_requirements", []))
+    computed = len(state.get("computed_metrics", {}))
+    pending = len(state.get("pending_metric_ids", []))
+
+    return {
+        "required_count": required,
+        "computed_count": computed,
+        "pending_count": pending,
+        "coverage_rate": computed / required if required > 0 else 0,
+        # 80% coverage counts as complete. With nothing required the coverage
+        # is reported as 0, so the state must not be flagged complete either.
+        "is_complete": required > 0 and computed >= required * 0.8
+    }
+
+
+def update_state_with_outline_generation(state: IntegratedWorkflowState, outline: ReportOutline) -> IntegratedWorkflowState:
+    """
+    Return a new state that records a freshly generated outline.
+
+    Args:
+        state: Current workflow state; left unmodified.
+        outline: Generated outline; its global_metrics become the metric
+            requirements and the pending metric IDs.
+
+    Returns:
+        Copy of state with the outline stored, outline_version incremented,
+        outline_ready set, and an assistant message appended.
+    """
+    new_state = state.copy()
+    new_state["outline_draft"] = outline
+    new_state["outline_version"] += 1
+    new_state["outline_ready"] = True
+    new_state["metrics_requirements"] = outline.global_metrics
+    new_state["pending_metric_ids"] = [m.metric_id for m in outline.global_metrics]
+
+    # state.copy() is shallow, so appending in place would also change the
+    # caller's message list; build a new list instead.
+    new_state["messages"] = [
+        *state["messages"],
+        {
+            "role": "assistant",
+            "content": f"✅ 大纲生成完成 v{new_state['outline_version']}:{outline.report_title}",
+            "timestamp": datetime.now().isoformat()
+        },
+    ]
+
+    return new_state
+
+
+def update_state_with_planning_decision(state: IntegratedWorkflowState, decision: Dict[str, Any]) -> IntegratedWorkflowState:
+    """
+    Return a new state that records a planning decision.
+
+    Args:
+        state: Current workflow state; left unmodified.
+        decision: Planning decision dict; the keys decision, next_route and
+            metrics_to_compute are read.
+
+    Returns:
+        Copy of state with planning_step incremented, last_decision and
+        next_route updated, pending_metric_ids replaced when the decision
+        lists metrics, and a plan_history entry appended.
+    """
+    new_state = state.copy()
+    new_state["planning_step"] += 1
+    new_state["last_decision"] = decision.get("decision", "unknown")
+    new_state["next_route"] = decision.get("next_route", "planning_node")
+
+    # Replace the pending list when the decision names metrics to compute;
+    # copy it so the state does not alias the decision's list.
+    if decision.get("metrics_to_compute"):
+        new_state["pending_metric_ids"] = list(decision["metrics_to_compute"])
+
+    # state.copy() is shallow, so appending in place would also change the
+    # caller's plan_history; build a new list instead.
+    new_state["plan_history"] = [
+        *state["plan_history"],
+        f"Step {new_state['planning_step']}: {decision.get('decision', 'unknown')}",
+    ]
+
+    return new_state
+
+
+def finalize_state_with_report(state: IntegratedWorkflowState, final_report: Dict[str, Any]) -> IntegratedWorkflowState:
+    """
+    Return a copy of the state marked complete with the final report.
+
+    Args:
+        state: Current workflow state.
+        final_report: Final report; stored as both report_draft and answer.
+
+    Returns:
+        Copy of state with the report stored, is_complete set, end_time
+        stamped, and completeness_score set to the metric coverage rate.
+    """
+    finished = state.copy()
+    finished.update({
+        "report_draft": final_report,
+        "is_complete": True,
+        "answer": final_report,
+        "end_time": datetime.now().isoformat(),
+    })
+
+    # Completeness is the coverage rate reported by get_calculation_progress.
+    finished["completeness_score"] = get_calculation_progress(finished)["coverage_rate"]
+
+    return finished