JiaQiang 1 тиждень тому
батько
коміт
bb2a88db45
1 змінених файлів з 455 додано та 0 видалено
  1. 455 0
      llmops/agents/planning_agent.py

+ 455 - 0
llmops/agents/planning_agent.py

@@ -0,0 +1,455 @@
+"""
+规划Agent (Planning Agent)
+=========================
+
+此Agent负责分析当前状态并做出智能决策,决定下一步行动。
+
+核心功能:
+1. 状态评估:分析大纲、指标计算进度和完整性
+2. 决策制定:决定生成大纲、计算指标、完成报告或澄清需求
+3. 优先级排序:确定最关键的任务和指标
+4. 流程控制:管理整个报告生成工作流的执行顺序
+
+决策逻辑:
+- 大纲为空 → 生成大纲
+- 指标覆盖率 < 80% → 计算指标
+- 指标覆盖率 ≥ 80% → 生成报告
+- 需求模糊 → 澄清需求
+
+技术实现:
+- 使用LangChain和结构化输出
+- 支持异步处理
+- 智能状态评估
+- 灵活的决策机制
+
+作者: Big Agent Team
+版本: 1.0.0
+创建时间: 2024-12-20
+"""
+
+from typing import List, Dict, Optional, Any, Union
+from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
+from pydantic import BaseModel, Field
+from langchain_openai import ChatOpenAI
+import json
+import os
+from datetime import datetime
+
+
+# 数据模型定义
+class ActionItem(BaseModel):
+    """动作项定义"""
+    action: str = Field(description="动作名称")
+    parameters: Optional[Dict[str, Any]] = Field(default_factory=dict, description="动作参数")
+
+
+class ClarificationRequest(BaseModel):
+    """澄清请求结构化格式"""
+    questions: List[str] = Field(description="需要澄清的问题列表")
+    missing_fields: List[str] = Field(default_factory=list, description="缺少的字段或信息")
+
+
+class PlanningDecision(BaseModel):
+    """规划决策输出"""
+    decision: str = Field(
+        description="决策类型: generate_outline, compute_metrics, finalize_report, clarify_requirements"
+    )
+    reasoning: str = Field(description="详细推理过程")
+    next_actions: List[Union[str, ActionItem]] = Field(
+        default_factory=list,
+        description="下一步动作列表"
+    )
+    metrics_to_compute: List[str] = Field(
+        default_factory=list,
+        description="待计算指标ID列表(如 ['total_income', 'avg_balance'])"
+    )
+    priority_metrics: List[str] = Field(
+        default_factory=list,
+        description="优先级高的指标ID"
+    )
+    additional_requirements: Optional[
+        Union[Dict[str, Any], List[Any], ClarificationRequest]
+    ] = Field(default=None, description="额外需求或澄清信息")
+
+
+def normalize_requirements(req: Any) -> Optional[Dict[str, Any]]:
+    """
+    规范化 additional_requirements
+    将列表转换为字典格式
+    """
+    if req is None:
+        return None
+
+    if isinstance(req, dict):
+        return req
+
+    if isinstance(req, list):
+        # 如果LLM错误地返回了列表,转换为字典格式
+        return {
+            "questions": [str(item) for item in req],
+            "missing_fields": []
+        }
+
+    return {"raw": str(req)}
+
+
+class PlanningAgent:
+    """规划智能体:负责状态分析和决策制定"""
+
+    def __init__(self, api_key: str, base_url: str = "https://api.deepseek.com"):
+        """
+        初始化规划Agent
+
+        Args:
+            api_key: DeepSeek API密钥
+            base_url: DeepSeek API基础URL
+        """
+        self.llm = ChatOpenAI(
+            model="deepseek-chat",
+            api_key=api_key,
+            base_url=base_url,
+            temperature=0.1
+        )
+
+        # 初始化API调用跟踪
+        self.api_calls = []
+
+    def create_planning_prompt(self) -> ChatPromptTemplate:
+        """创建规划提示模板"""
+        return ChatPromptTemplate.from_messages([
+            ("system", """你是报告规划总控智能体,核心职责是精准分析当前状态并决定下一步行动。
+
+### 决策选项(四选一)
+1. generate_outline:大纲未生成或大纲无效
+2. compute_metrics:大纲已生成但指标未完成(覆盖率<80%)
+3. finalize_report:指标覆盖率≥80%,信息充足
+4. clarify_requirements:用户需求模糊,缺少关键信息
+
+### 决策规则(按顺序检查)
+1. 检查 outline_draft 是否为空 → 空则选择 generate_outline
+2. 检查 metrics_requirements 是否为空 → 空则选择 generate_outline
+3. 检查是否有待计算指标 → 有则选择 compute_metrics
+4. 所有指标都已计算完成 → 选择 finalize_report
+5. 如果无法理解需求 → 选择 clarify_requirements
+
+### 重要原则
+- 大纲草稿已存在时,不要重复生成大纲
+- 决策为 compute_metrics 时,必须从状态信息中的"有效待计算指标ID列表"中选择
+- 确保 metrics_to_compute 是字符串数组格式
+- 确保指标ID与大纲中的global_metrics.metric_id完全一致
+- 从状态信息中的"有效待计算指标ID列表"中提取metric_id作为metrics_to_compute的值
+- 计算失败的指标可以重试最多3次
+- 绝对不要自己生成新的指标ID,必须严格使用状态信息中提供的已有指标ID
+- 如果状态信息中没有可用的指标ID,不要生成compute_metrics决策
+
+### 输出字段说明
+- decision: 决策字符串
+- reasoning: 决策原因说明
+- next_actions: 动作列表(可选)
+- metrics_to_compute: 待计算指标ID列表,必须从状态信息中的可用指标ID中选择(决策为compute_metrics时必须提供)
+- priority_metrics: 优先级指标列表(前2-3个最重要的指标)
+- additional_requirements: 额外需求(可选)
+
+必须输出有效的JSON格式!"""),
+
+            MessagesPlaceholder("messages"),
+
+            ("user", "报告需求:{question}\n\n请输出决策结果。")
+        ])
+
+    async def make_decision(self, question: str, industry: str, current_state: Dict[str, Any]) -> PlanningDecision:
+        """
+        根据当前状态做出规划决策
+
+        Args:
+            question: 用户查询
+            industry: 行业
+            current_state: 当前状态信息
+
+        Returns:
+            规划决策结果
+        """
+        planner = self.create_planning_prompt() | self.llm
+
+        # 构建状态评估上下文
+        status_info = self._build_status_context(current_state)
+
+        # 记录大模型输入
+        print("========================================")
+        print("[AGENT] PlanningAgent (规划Agent)")
+        print("[MODEL_INPUT] PlanningAgent:")
+        print(f"[CONTEXT] 基于当前状态做出规划决策")
+        print(f"Question: {question}")
+        print(f"Status info: {status_info}")
+        print("========================================")
+
+        # 执行规划
+        start_time = datetime.now()
+        response = await planner.ainvoke({
+            "question": question,
+            "industry": industry,
+            "messages": [("system", status_info)]
+        })
+        end_time = datetime.now()
+
+        # 解析JSON响应
+        try:
+            # 从响应中提取JSON内容
+            content = response.content if hasattr(response, 'content') else str(response)
+            # 尝试找到JSON部分
+            json_start = content.find('{')
+            json_end = content.rfind('}') + 1
+            if json_start >= 0 and json_end > json_start:
+                json_str = content[json_start:json_end]
+                decision_data = json.loads(json_str)
+
+                # 预处理 additional_requirements 字段
+                if "additional_requirements" in decision_data:
+                    req = decision_data["additional_requirements"]
+                    if isinstance(req, str):
+                        # 如果是字符串,尝试将其转换为合适的格式
+                        if req.strip():
+                            # 将字符串包装为字典格式
+                            decision_data["additional_requirements"] = {"raw_content": req}
+                        else:
+                            # 空字符串设为 None
+                            decision_data["additional_requirements"] = None
+                    elif isinstance(req, list):
+                        # 如果是列表,转换为字典格式
+                        decision_data["additional_requirements"] = {
+                            "questions": [str(item) for item in req],
+                            "missing_fields": []
+                        }
+                    # 如果已经是 dict 或其他允许的类型,保持不变
+
+                decision = PlanningDecision(**decision_data)
+
+                # 验证决策的合理性
+                if decision.decision == "compute_metrics":
+                    if not decision.metrics_to_compute:
+                        raise ValueError("AI决策缺少具体的指标ID")
+                    # 如果AI生成的指标ID明显是错误的(比如metric_001),使用默认逻辑
+                    if any(mid.startswith("metric_") and mid.replace("metric_", "").isdigit()
+                          for mid in decision.metrics_to_compute):
+                        raise ValueError("AI生成的指标ID格式不正确")
+
+            else:
+                raise ValueError("No JSON found in response")
+        except Exception as e:
+            print(f"解析规划决策响应失败: {e},使用默认决策")
+            # 返回默认决策
+            decision = self._get_default_decision(current_state)
+
+        # 记录API调用结果
+        content = response.content if hasattr(response, 'content') else str(response)
+        call_id = f"api_mll_规划决策_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
+        api_call_info = {
+            "call_id": call_id,
+            "timestamp": end_time.isoformat(),
+            "agent": "PlanningAgent",
+            "model": "deepseek-chat",
+            "request": {
+                "question": question,
+                "status_info": status_info,
+                "start_time": start_time.isoformat()
+            },
+            "response": {
+                "content": content,
+                "decision": decision.dict() if hasattr(decision, 'dict') else decision,
+                "end_time": end_time.isoformat(),
+                "duration": (end_time - start_time).total_seconds()
+            },
+            "success": True
+        }
+        self.api_calls.append(api_call_info)
+
+        # 保存API结果到文件
+        api_results_dir = "api_results"
+        os.makedirs(api_results_dir, exist_ok=True)
+        filename = f"{call_id}.json"
+        filepath = os.path.join(api_results_dir, filename)
+
+        try:
+            with open(filepath, 'w', encoding='utf-8') as f:
+                json.dump(api_call_info, f, ensure_ascii=False, indent=2)
+            print(f"[API_RESULT] 保存API结果文件: {filepath}")
+        except Exception as e:
+            print(f"[ERROR] 保存API结果文件失败: {filepath}, 错误: {str(e)}")
+
+        # 记录大模型输出
+        print(f"[MODEL_OUTPUT] PlanningAgent: {json.dumps(decision.dict() if hasattr(decision, 'dict') else decision, ensure_ascii=False)}")
+        print("========================================")
+
+        return decision
+
+    def _build_status_context(self, state: Dict[str, Any]) -> str:
+        """构建状态评估上下文"""
+        required_count = len(state.get("metrics_requirements", []))
+        computed_count = len(state.get("computed_metrics", {}))
+        coverage = computed_count / required_count if required_count > 0 else 0
+
+        # 计算失败统计
+        failed_attempts = state.get("failed_metric_attempts", {})
+        pending_ids = state.get("pending_metric_ids", [])
+
+        # 过滤掉失败次数过多的指标
+        max_retry = 3
+        filtered_pending_ids = [
+            mid for mid in pending_ids
+            if failed_attempts.get(mid, 0) < max_retry
+        ]
+
+        # 获取可用的指标ID
+        available_metric_ids = []
+        if state.get('outline_draft') and state.get('outline_draft').get('global_metrics'):
+            available_metric_ids = [m.get('metric_id', '') for m in state['outline_draft']['global_metrics']]
+            available_metric_ids = [mid for mid in available_metric_ids if mid]  # 过滤空值
+
+        return f"""当前状态评估:
+- 规划步骤: {state.get('planning_step', 0)}
+- 大纲版本: {state.get('outline_version', 0)}
+- 大纲草稿存在: {state.get('outline_draft') is not None}
+- 指标需求总数: {required_count}
+- 已计算指标数: {computed_count}
+- 指标覆盖率: {coverage:.2%}
+- 待计算指标数: {len(pending_ids)}
+- 有效待计算指标ID列表: {filtered_pending_ids}
+- 可用指标ID列表: {available_metric_ids}
+- 失败尝试记录: {failed_attempts}
+"""
+
+
+def analyze_current_state(state: Dict[str, Any]) -> Dict[str, Any]:
+    """
+    分析当前状态,返回关键信息
+
+    Args:
+        state: 当前状态
+
+    Returns:
+        状态分析结果
+    """
+    required_metrics = state.get("metrics_requirements", [])
+    computed_metrics = state.get("computed_metrics", {})
+
+    # 计算覆盖率
+    required_count = len(required_metrics)
+    computed_count = len(computed_metrics)
+    coverage = computed_count / required_count if required_count > 0 else 0
+
+    # 找出未计算的指标
+    computed_ids = set(computed_metrics.keys())
+    pending_metrics = [
+        m for m in required_metrics
+        if m.metric_id not in computed_ids
+    ]
+
+    # 检查失败次数
+    failed_attempts = state.get("failed_metric_attempts", {})
+    max_retry = 3
+    valid_pending_metrics = [
+        m for m in pending_metrics
+        if failed_attempts.get(m.metric_id, 0) < max_retry
+    ]
+
+    return {
+        "has_outline": state.get("outline_draft") is not None,
+        "required_count": required_count,
+        "computed_count": computed_count,
+        "coverage": coverage,
+        "pending_metrics": pending_metrics,
+        "valid_pending_metrics": valid_pending_metrics,
+        "pending_ids": [m.metric_id for m in pending_metrics],
+        "valid_pending_ids": [m.metric_id for m in valid_pending_metrics],
+        "planning_step": state.get("planning_step", 0),
+        "outline_version": state.get("outline_version", 0)
+    }
+
+
+async def plan_next_action(question: str, industry: str, current_state: Dict[str, Any], api_key: str) -> PlanningDecision:
+    """
+    规划下一步行动的主函数
+
+    Args:
+        question: 用户查询
+        current_state: 当前状态
+        api_key: API密钥
+
+    Returns:
+        规划决策结果
+    """
+    agent = PlanningAgent(api_key)
+
+    try:
+        decision = await agent.make_decision(question, industry, current_state)
+
+        print(f"\n🧠 规划决策:{decision.decision}")
+        print(f"   推理:{decision.reasoning[:100]}...")
+
+        if decision.metrics_to_compute:
+            print(f"   待计算指标:{decision.metrics_to_compute}")
+
+        return decision
+
+    except Exception as e:
+        print(f"⚠️ 规划决策出错: {e},使用默认决策")
+
+        # 直接返回最基本的默认决策,避免复杂的默认决策逻辑
+        return PlanningDecision(
+            decision="finalize_report",
+            reasoning="规划决策失败,使用默认的报告生成决策",
+            next_actions=["生成最终报告"],
+            metrics_to_compute=[],
+            priority_metrics=[]
+        )
+
+    def _get_default_decision(self, current_state: Dict[str, Any]) -> PlanningDecision:
+        """
+        基于状态分析的默认决策逻辑
+
+        Args:
+            current_state: 当前状态信息
+
+        Returns:
+            默认规划决策
+        """
+        state_analysis = analyze_current_state(current_state)
+
+        if not state_analysis["has_outline"]:
+            default_decision = PlanningDecision(
+                decision="generate_outline",
+                reasoning="大纲不存在,需要先生成大纲",
+                next_actions=["生成报告大纲"],
+                metrics_to_compute=[],
+                priority_metrics=[]
+            )
+        elif state_analysis["coverage"] < 0.8 and state_analysis["valid_pending_metrics"]:
+            # 计算指标 - 使用实际的指标ID
+            metrics_to_compute = state_analysis["valid_pending_ids"][:5]  # 最多计算5个
+            default_decision = PlanningDecision(
+                decision="compute_metrics",
+                reasoning=f"指标覆盖率{state_analysis['coverage']:.1%},需要计算更多指标",
+                next_actions=[f"计算指标: {', '.join(metrics_to_compute)}"],
+                metrics_to_compute=metrics_to_compute,
+                priority_metrics=metrics_to_compute[:2]  # 前2个为优先级
+            )
+        elif state_analysis["valid_pending_ids"]:
+            # 还有指标但都失败了,生成报告
+            default_decision = PlanningDecision(
+                decision="finalize_report",
+                reasoning="部分指标计算失败,但已有足够信息生成报告",
+                next_actions=["生成最终报告"],
+                metrics_to_compute=[],
+                priority_metrics=[]
+            )
+        else:
+            # 信息充足,生成报告
+            default_decision = PlanningDecision(
+                decision="finalize_report",
+                reasoning="所有必要指标已计算完成",
+                next_actions=["生成最终报告"],
+                metrics_to_compute=[],
+                priority_metrics=[]
+            )
+
+        return default_decision