| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
- from typing import List, Dict, Optional, Any, Union
- from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
- from pydantic import BaseModel, Field
- from langchain_openai import ChatOpenAI
- import json
- from llmops.agents.state import AgentState, MetricRequirement, convert_numpy_types
- from llmops.agents.datadev.llm import get_llm
class ActionItem(BaseModel):
    """动作项定义"""
    # One follow-up action proposed by the planner (the docstring reads
    # "action item definition").
    # NOTE(review): the docstring is kept verbatim because pydantic exposes a
    # model's docstring as its JSON-schema description, which may reach the LLM
    # when this model is nested in a structured-output schema. Confirm before
    # translating it.

    # Name of the action to perform.
    action: str = Field(description="动作名称")
    # Optional free-form arguments for the action; defaults to a fresh empty
    # dict per instance.
    parameters: Optional[Dict[str, Any]] = Field(default_factory=dict)
class ClarificationRequest(BaseModel):
    """澄清请求结构化格式"""
    # Structured form of a clarification request (the docstring reads
    # "structured clarification request format").
    # NOTE(review): the docstring is kept verbatim because pydantic exposes a
    # model's docstring as its JSON-schema description. Confirm before
    # translating it.

    # Questions that must be answered before planning can continue (required).
    questions: List[str] = Field(description="需要澄清的问题列表")
    # Names of the fields or pieces of information that are missing; defaults
    # to an empty list.
    missing_fields: List[str] = Field(default_factory=list, description="缺少的字段或信息")
class PlanningOutput(BaseModel):
    """规划决策输出 - 支持灵活格式"""
    # Structured result of one planning step (the docstring reads "planning
    # decision output - supports flexible formats"). This model is the schema
    # handed to `llm.with_structured_output` in `create_planning_agent`.
    # NOTE(review): the docstring and every `description=` string are kept
    # verbatim because pydantic puts them into the JSON schema, so they are
    # presumably part of what the LLM sees. Confirm before translating.

    # Decision keyword; the description lists generate_outline,
    # compute_metrics, finalize and clarify, but the type is a plain `str`, so
    # other values are not rejected by validation.
    decision: str = Field(
        description="决策类型: generate_outline, compute_metrics, finalize, clarify"
    )
    # Free-text explanation of why the decision was taken.
    reasoning: str = Field(description="详细推理过程")
    # Follow-up actions, each either a bare action name or an ActionItem.
    next_actions: List[Union[str, ActionItem]] = Field(
        default_factory=list,
        description="下一步动作列表"
    )
    # Key fix: explicitly pass the list of metric IDs that still need computing.
    metrics_to_compute: List[str] = Field(
        default_factory=list,
        description="待计算指标ID列表(如 ['total_income', 'avg_balance'])"
    )
    # Extra requirements or clarification details. The union accepts a dict, a
    # list, or a ClarificationRequest; `normalize_additional_requirements` is
    # applied to this value in `planning_node`.
    additional_requirements: Optional[
        Union[Dict[str, Any], List[Any], ClarificationRequest]
    ] = Field(default=None, description="额外需求或澄清信息")
def normalize_additional_requirements(req: Any) -> Optional[Dict[str, Any]]:
    """Normalize ``additional_requirements`` into a dict (or ``None``).

    Args:
        req: The raw value produced by the planner. It may be ``None``, a
            dict, a list, a pydantic-style model instance, or anything else.

    Returns:
        * ``None`` when ``req`` is ``None``.
        * ``req`` itself (same object) when it is already a dict.
        * ``{"questions": [...], "missing_fields": []}`` when ``req`` is a
          list; every item is stringified and treated as a question.
        * The dumped field dict when ``req`` is a model-like object exposing
          ``model_dump()`` (pydantic v2) or ``dict()`` (pydantic v1), so a
          structured clarification request keeps its ``questions`` key.
        * ``{"raw": str(req)}`` for any other value.
    """
    if req is None:
        return None
    if isinstance(req, dict):
        return req
    if isinstance(req, list):
        # The LLM sometimes returns a bare list; interpret it as questions.
        return {
            "questions": [str(item) for item in req],
            "missing_fields": [],
        }
    if not isinstance(req, type):
        # Model instances used to be flattened to {"raw": "<repr>"}, which
        # dropped their "questions" field. Dump them to a dict instead.
        for dumper_name in ("model_dump", "dict"):
            dumper = getattr(req, dumper_name, None)
            if callable(dumper):
                dumped = dumper()
                if isinstance(dumped, dict):
                    return dumped
    return {"raw": str(req)}
def create_planning_agent(llm, state: AgentState):
    """Build the planning chain: chat prompt piped into a structured-output LLM.

    The system prompt deliberately contains no literal JSON example, so the
    only template variable in the prompt text is ``{question}``; the history
    is injected through the ``messages`` placeholder.

    Args:
        llm: Chat model; must provide ``with_structured_output``.
        state: Current agent state. It is accepted for interface
            compatibility and is not read by this function.

    Returns:
        A runnable that takes ``question`` and ``messages`` and yields a
        ``PlanningOutput``.
    """
    system_prompt = """你是报告规划总控智能体,核心职责是精准分析当前状态并决定下一步行动。
### 决策选项(四选一)
1. generate_outline:大纲未生成或大纲无效
2. compute_metrics:大纲已生成但指标未完成(覆盖率<80%)
3. finalize:指标覆盖率≥80%,信息充足
4. clarify:用户需求模糊,缺少关键信息
### 决策规则(按顺序检查)
1. 检查 outline_draft 是否为空 → 空则选择 generate_outline
2. 检查 metrics_requirements 是否为空 → 空则选择 generate_outline
3. 计算指标覆盖率 = 已计算指标 / 总需求指标
- 覆盖率 < 0.8 → 选择 compute_metrics
- 覆盖率 ≥ 0.8 → 选择 finalize
4. 如果无法理解需求 → 选择 clarify
### 重要原则
- 大纲草稿已存在时,不要重复生成大纲
- 决策为 compute_metrics 时,必须提供具体的指标ID列表
- 确保 metrics_to_compute 是字符串数组格式
### 输出字段说明
- decision: 决策字符串
- reasoning: 决策原因说明
- next_actions: 动作列表(可选)
- metrics_to_compute: 待计算指标ID列表(决策为compute_metrics时必须提供)
- additional_requirements: 额外需求(可选)
必须输出有效的JSON格式!"""
    user_prompt = "报告需求:{question}\n\n请输出决策结果。"

    planning_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_prompt),
            MessagesPlaceholder("messages"),
            ("user", user_prompt),
        ]
    )
    # Constrain the model's reply to the PlanningOutput schema.
    structured_llm = llm.with_structured_output(PlanningOutput)
    return planning_prompt | structured_llm
async def planning_node(state: AgentState) -> AgentState:
    """Planning node: ask the planner for a decision and set the next route.

    The node summarizes the current progress (outline, metric coverage,
    failure counters) for the planner LLM, then translates the returned
    decision into state updates and a ``next_route`` value:

    * ``generate_outline`` -> ``outline_generator``
    * ``compute_metrics``  -> ``metrics_calculator`` (or ``report_compiler``
      when every selected metric has already failed ``max_retry`` times)
    * ``finalize``         -> ``report_compiler`` and ``is_complete = True``
    * ``clarify``          -> ``clarify_node``

    Any other decision string leaves ``next_route`` untouched.

    Fixes relative to the previous version:

    * ``plan_history`` and ``messages`` are copied before being extended, so
      the caller's input state is no longer mutated through the shallow copy.
    * The "all metrics exhausted" check is evaluated on the metrics selected
      in *this* pass. It used to compare a filter of the previous pass's IDs
      against the new ID list, which skipped metric computation whenever the
      state had no ``pending_metric_ids`` yet.
    * When the LLM names only unknown metric IDs, the fallback to all
      uncomputed metrics is now persisted, not just shown in the message.
    * Metrics that already failed ``max_retry`` times are no longer
      dispatched together with the retryable ones.

    Args:
        state: Current agent state. Reads ``metrics_requirements``,
            ``computed_metrics``, ``planning_step``, ``outline_version``,
            ``outline_draft``, ``question``, ``plan_history``, ``messages``
            and, optionally, ``failed_metric_attempts`` and
            ``pending_metric_ids``.

    Returns:
        The updated state, passed through ``convert_numpy_types``.
    """
    # A metric that has failed this many times is no longer retried.
    max_retry = 3

    llm = get_llm()
    planner = create_planning_agent(llm, state)

    required_metrics = state["metrics_requirements"]
    computed_ids = set(state["computed_metrics"].keys())
    required_count = len(required_metrics)
    computed_count = len(state["computed_metrics"])
    coverage = computed_count / required_count if required_count > 0 else 0

    # Failure counters and the pending list left behind by earlier passes;
    # both are optional in the state.
    failed_attempts = state.get("failed_metric_attempts") or {}
    previous_pending_ids = state.get("pending_metric_ids") or []
    retryable_previous_ids = [
        mid for mid in previous_pending_ids
        if failed_attempts.get(mid, 0) < max_retry
    ]

    # Progress summary handed to the planner as an extra system message.
    status_snapshot = "\n".join([
        "当前状态评估:",
        f"- 规划步骤: {state['planning_step']}",
        f"- 大纲版本: {state['outline_version']}",
        f"- 大纲草稿存在: {state['outline_draft'] is not None}",
        f"- 指标需求总数: {required_count}",
        f"- 已计算指标数: {computed_count}",
        f"- 指标覆盖率: {coverage:.2%}",
        f"- 待计算指标数: {len(previous_pending_ids)}",
        f"- 有效待计算指标数: {len(retryable_previous_ids)}",
        f"- 失败尝试记录: {failed_attempts}",
        f"建议下一步: {'计算指标' if coverage < 0.8 else '生成报告'}",
    ])

    result = await planner.ainvoke({
        "question": state["question"],
        "messages": [("system", status_snapshot)],
    })

    normalized_req = normalize_additional_requirements(result.additional_requirements)

    # Every required metric that has no computed value yet.
    uncomputed_metrics = [
        m for m in required_metrics
        if m.metric_id not in computed_ids
    ]

    # Prefer the IDs chosen by the LLM, restricted to metrics that really are
    # uncomputed. If it named none, or only unknown IDs, use all of them.
    selected_metrics = uncomputed_metrics
    if result.metrics_to_compute:
        requested_ids = set(result.metrics_to_compute)
        selected_metrics = [
            m for m in uncomputed_metrics
            if m.metric_id in requested_ids
        ] or uncomputed_metrics

    # state.copy() is shallow: rebuild the lists we extend so the input
    # state's own lists stay untouched.
    new_state = state.copy()
    new_state["plan_history"] = [
        *state["plan_history"],
        f"Step {state['planning_step']}: {result.decision}",
    ]
    new_state["planning_step"] = state["planning_step"] + 1
    new_state["additional_requirements"] = normalized_req
    messages = list(state["messages"])
    new_state["messages"] = messages

    # Persist the pending metrics (IDs and full objects). When nothing is
    # pending, the previous values are left as they were.
    if selected_metrics:
        new_state["pending_metric_ids"] = [m.metric_id for m in selected_metrics]
        new_state["metrics_to_compute"] = selected_metrics

    if result.decision == "generate_outline":
        messages.append(
            ("ai", f"📋 规划决策:生成大纲 (v{new_state['outline_version'] + 1})")
        )
        new_state["next_route"] = "outline_generator"

    elif result.decision == "compute_metrics":
        retryable_metrics = [
            m for m in selected_metrics
            if failed_attempts.get(m.metric_id, 0) < max_retry
        ]

        if selected_metrics and not retryable_metrics:
            # Everything still pending has failed too often: stop retrying
            # and compile the report with what is available.
            messages.append(
                ("ai", f"⚠️ 剩余 {len(selected_metrics)} 个指标已多次计算失败,将跳过这些指标直接生成报告")
            )
            new_state["next_route"] = "report_compiler"
            return convert_numpy_types(new_state)

        if retryable_metrics:
            # Dispatch only metrics that may still be retried.
            new_state["pending_metric_ids"] = [m.metric_id for m in retryable_metrics]
            new_state["metrics_to_compute"] = retryable_metrics

        messages.append(
            ("ai", f"🧮 规划决策:计算 {len(retryable_metrics)} 个指标 ({[m.metric_id for m in retryable_metrics]})")
        )
        new_state["next_route"] = "metrics_calculator"

    elif result.decision == "finalize":
        new_state["is_complete"] = True
        messages.append(
            ("ai", f"✅ 规划决策:信息充足,生成最终报告(覆盖率 {coverage:.2%})")
        )
        new_state["next_route"] = "report_compiler"

    elif result.decision == "clarify":
        questions = []
        if normalized_req and "questions" in normalized_req:
            questions = normalized_req["questions"]
        messages.append(
            ("ai", f"❓ 需要澄清:{';'.join(questions) if questions else '请提供更详细的需求'}")
        )
        new_state["next_route"] = "clarify_node"

    # Strip numpy types before handing the state back.
    return convert_numpy_types(new_state)
|