| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381 |
- from typing import List, Dict, Optional, Any, Union
- from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
- from pydantic import BaseModel, Field
- from langchain_openai import ChatOpenAI
- import json
- import os
- from datetime import datetime
- # 数据模型定义
- class ActionItem(BaseModel):
- """动作项定义"""
- action: str = Field(description="动作名称")
- parameters: Optional[Dict[str, Any]] = Field(default_factory=dict, description="动作参数")
- class ClarificationRequest(BaseModel):
- """澄清请求结构化格式"""
- questions: List[str] = Field(description="需要澄清的问题列表")
- missing_fields: List[str] = Field(default_factory=list, description="缺少的字段或信息")
- class PlanningDecision(BaseModel):
- """规划决策输出"""
- decision: str = Field(
- description="决策类型: data_classify, generate_outline, compute_metrics, finalize_report, clarify_requirements"
- )
- reasoning: str = Field(description="详细推理过程")
- next_actions: List[Union[str, ActionItem]] = Field(
- default_factory=list,
- description="下一步动作列表"
- )
- metrics_to_compute: List[str] = Field(
- default_factory=list,
- description="待计算指标ID列表(如 ['total_income', 'avg_balance'])"
- )
- priority_metrics: List[str] = Field(
- default_factory=list,
- description="优先级高的指标ID"
- )
- additional_requirements: Optional[
- Union[Dict[str, Any], List[Any], ClarificationRequest]
- ] = Field(default=None, description="额外需求或澄清信息")
- def normalize_requirements(req: Any) -> Optional[Dict[str, Any]]:
- """
- 规范化 additional_requirements
- 将列表转换为字典格式
- """
- if req is None:
- return None
- if isinstance(req, dict):
- return req
- if isinstance(req, list):
- # 如果LLM错误地返回了列表,转换为字典格式
- return {
- "questions": [str(item) for item in req],
- "missing_fields": []
- }
- return {"raw": str(req)}
- class PlanningAgent:
- """规划智能体:负责状态分析和决策制定"""
- def __init__(self, api_key: str, base_url: str = "https://api.deepseek.com"):
- """
- 初始化规划Agent
- Args:
- api_key: DeepSeek API密钥
- base_url: DeepSeek API基础URL
- """
- self.llm = ChatOpenAI(
- model="deepseek-chat",
- api_key=api_key,
- base_url=base_url,
- temperature=0.1
- )
- # 初始化API调用跟踪
- self.api_calls = []
- def create_planning_prompt(self) -> ChatPromptTemplate:
- """创建规划提示模板"""
- return ChatPromptTemplate.from_messages([
- ("system", """
- 你是报告规划总控智能体,核心职责是精准分析当前状态并决定下一步行动。
- ### 决策选项(三选一)
- 1. data_classify: 数据未分类打标或分类打标数据集数量为0
- 2. generate_outline:大纲未生成或大纲无效
- 3. compute_metrics:大纲已生成但指标未完成
- ### 决策规则(按顺序检查)
- 1. 检查 data_set_classified 是否为空 或 数量为0时 → 选择 data_classify
- 2. 检查 outline_draft 是否为空 → 空则选择 generate_outline
- 3. 检查 metrics_requirements 是否为空 → 空则选择 generate_outline
- 4. 检查是否有待计算指标 → 有则选择 compute_metrics
- 5. 所有指标都已计算完成 → 选择 finalize_report
- 6. 如果无法理解需求 → 选择 clarify_requirements
- ### 重要原则
- - 数据已分类打标后,不要重复分类打标
- - 大纲草稿已存在时,不要重复生成大纲
- - 决策为 compute_metrics 时,必须从状态信息中的"有效待计算指标ID列表"中选择
- - 确保 metrics_to_compute 是字符串数组格式
- - 确保指标ID与大纲中的global_metrics.metric_id完全一致
- - 从状态信息中的"有效待计算指标ID列表"中提取metric_id作为metrics_to_compute的值
- - 计算失败的指标可以重试最多3次
- - 绝对不要自己生成新的指标ID,必须严格使用状态信息中提供的已有指标ID
- - 如果状态信息中没有可用的指标ID,不要生成compute_metrics决策
- ### 输出字段说明
- - decision: 决策字符串
- - reasoning: 决策原因说明
- - metrics_to_compute: 待计算指标ID列表,必须从状态信息中的"有效待计算指标ID列表"中选择。选择所有可用指标,除非指标数量过多(>10个)需要分批计算
- - priority_metrics: 优先级指标列表(前2-3个最重要的指标),从metrics_to_compute中选择
- 必须输出有效的JSON格式!"""),
- MessagesPlaceholder("messages"),
- ("user", "报告需求:{question}\n\n请输出决策结果。")
- ])
- async def make_decision(self, question: str, industry: str, current_state: Dict[str, Any]) -> PlanningDecision:
- """
- 根据当前状态做出规划决策
- Args:
- question: 用户查询
- industry: 行业
- current_state: 当前状态信息
- Returns:
- 规划决策结果
- """
- planner = self.create_planning_prompt() | self.llm
- # 构建状态评估上下文
- status_info = self._build_status_context(current_state)
- # 记录大模型输入
- print("========================================")
- print("[AGENT] PlanningAgent (规划Agent)")
- print("[MODEL_INPUT] PlanningAgent:")
- print(f"[CONTEXT] 基于当前状态做出规划决策")
- print(f"Question: {question}")
- print(f"Status info: {status_info}")
- print("========================================")
- # 执行规划
- start_time = datetime.now()
- response = await planner.ainvoke({
- "question": question,
- "industry": industry,
- "messages": [("system", status_info)]
- })
- end_time = datetime.now()
- # 解析JSON响应
- try:
- # 从响应中提取JSON内容
- content = response.content if hasattr(response, 'content') else str(response)
- # 尝试找到JSON部分
- json_start = content.find('{')
- json_end = content.rfind('}') + 1
- if json_start >= 0 and json_end > json_start:
- json_str = content[json_start:json_end]
- decision_data = json.loads(json_str)
- # 预处理 additional_requirements 字段
- if "additional_requirements" in decision_data:
- req = decision_data["additional_requirements"]
- if isinstance(req, str):
- # 如果是字符串,尝试将其转换为合适的格式
- if req.strip():
- # 将字符串包装为字典格式
- decision_data["additional_requirements"] = {"raw_content": req}
- else:
- # 空字符串设为 None
- decision_data["additional_requirements"] = None
- elif isinstance(req, list):
- # 如果是列表,转换为字典格式
- decision_data["additional_requirements"] = {
- "questions": [str(item) for item in req],
- "missing_fields": []
- }
- # 如果已经是 dict 或其他允许的类型,保持不变
- decision = PlanningDecision(**decision_data)
- # 验证决策的合理性
- if decision.decision == "compute_metrics":
- if not decision.metrics_to_compute:
- raise ValueError("AI决策缺少具体的指标ID")
- # 如果AI生成的指标ID明显是错误的(比如metric_001),使用默认逻辑
- if any(mid.startswith("metric_") and mid.replace("metric_", "").isdigit()
- for mid in decision.metrics_to_compute):
- raise ValueError("AI生成的指标ID格式不正确")
- else:
- raise ValueError("No JSON found in response")
- except Exception as e:
- print(f"解析规划决策响应失败: {e},使用默认决策")
- # 返回默认决策
- decision = self._get_default_decision(current_state)
- # 记录API调用结果
- content = response.content if hasattr(response, 'content') else str(response)
- call_id = f"api_mll_规划决策_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
- api_call_info = {
- "call_id": call_id,
- "timestamp": end_time.isoformat(),
- "agent": "PlanningAgent",
- "model": "deepseek-chat",
- "request": {
- "question": question,
- "status_info": status_info,
- "start_time": start_time.isoformat()
- },
- "response": {
- "content": content,
- "decision": decision.dict() if hasattr(decision, 'dict') else decision,
- "end_time": end_time.isoformat(),
- "duration": (end_time - start_time).total_seconds()
- },
- "success": True
- }
- self.api_calls.append(api_call_info)
- # 保存API结果到文件
- api_results_dir = "api_results"
- os.makedirs(api_results_dir, exist_ok=True)
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- filename = f"{timestamp}_{call_id}.json"
- filepath = os.path.join(api_results_dir, filename)
- try:
- with open(filepath, 'w', encoding='utf-8') as f:
- json.dump(api_call_info, f, ensure_ascii=False, indent=2)
- print(f"[API_RESULT] 保存API结果文件: {filepath}")
- except Exception as e:
- print(f"[ERROR] 保存API结果文件失败: {filepath}, 错误: {str(e)}")
- # 记录大模型输出
- print(
- f"[MODEL_OUTPUT] PlanningAgent: {json.dumps(decision.dict() if hasattr(decision, 'dict') else decision, ensure_ascii=False)}")
- print("========================================")
- return decision
- def _build_status_context(self, state: Dict[str, Any]) -> str:
- """构建状态评估上下文"""
- required_count = len(state.get("metrics_requirements", []))
- computed_count = len(state.get("computed_metrics", {}))
- coverage = computed_count / required_count if required_count > 0 else 0
- # 计算失败统计
- failed_attempts = state.get("failed_metric_attempts", {})
- pending_ids = state.get("pending_metric_ids", [])
- # 过滤掉失败次数过多的指标
- max_retry = 3
- filtered_pending_ids = [
- mid for mid in pending_ids
- if failed_attempts.get(mid, 0) < max_retry
- ]
- # 获取可用的指标ID
- available_metric_ids = []
- outline_draft = state.get('outline_draft')
- if outline_draft and outline_draft.global_metrics:
- available_metric_ids = [m.metric_id for m in outline_draft.global_metrics if m.metric_id]
- return f"""
- 当前状态评估:
- - 规划步骤: {state.get('planning_step', 0)}
- - 数据分类打标数量: {len(state.get('data_set_classified', 0))}
- - 大纲版本: {state.get('outline_version', 0)}
- - 大纲草稿存在: {state.get('outline_draft') is not None}
- - 指标需求总数: {required_count}
- - 已计算指标数: {computed_count}
- - 指标覆盖率: {coverage:.2%}
- - 待计算指标数: {len(pending_ids)}
- - 有效待计算指标ID列表: {filtered_pending_ids}
- - 可用指标ID列表: {available_metric_ids}
- - 失败尝试记录: {failed_attempts}
- """
- def analyze_current_state(state: Dict[str, Any]) -> Dict[str, Any]:
- """
- 分析当前状态,返回关键信息
- Args:
- state: 当前状态
- Returns:
- 状态分析结果
- """
- required_metrics = state.get("metrics_requirements", [])
- computed_metrics = state.get("computed_metrics", {})
- # 计算覆盖率
- required_count = len(required_metrics)
- computed_count = len(computed_metrics)
- coverage = computed_count / required_count if required_count > 0 else 0
- # 找出未计算的指标
- computed_ids = set(computed_metrics.keys())
- pending_metrics = [
- m for m in required_metrics
- if m.metric_id not in computed_ids
- ]
- # 检查失败次数
- failed_attempts = state.get("failed_metric_attempts", {})
- max_retry = 3
- valid_pending_metrics = [
- m for m in pending_metrics
- if failed_attempts.get(m.metric_id, 0) < max_retry
- ]
- return {
- "has_outline": state.get("outline_draft") is not None,
- "required_count": required_count,
- "computed_count": computed_count,
- "coverage": coverage,
- "pending_metrics": pending_metrics,
- "valid_pending_metrics": valid_pending_metrics,
- "pending_ids": [m.metric_id for m in pending_metrics],
- "valid_pending_ids": [m.metric_id for m in valid_pending_metrics],
- "planning_step": state.get("planning_step", 0),
- "outline_version": state.get("outline_version", 0)
- }
- async def plan_next_action(question: str, industry: str, current_state: Dict[str, Any],
- api_key: str) -> PlanningDecision:
- """
- 规划下一步行动的主函数
- Args:
- question: 用户查询
- current_state: 当前状态
- api_key: API密钥
- Returns:
- 规划决策结果
- """
- agent = PlanningAgent(api_key)
- try:
- decision = await agent.make_decision(question, industry, current_state)
- print(f"\n🧠 规划决策:{decision.decision}")
- print(f" 推理:{decision.reasoning[:100]}...")
- if decision.metrics_to_compute:
- print(f" 待计算指标:{decision.metrics_to_compute}")
- return decision
- except Exception as e:
- print(f"⚠️ 规划决策出错: {e},使用默认决策")
- # 直接返回最基本的默认决策,避免复杂的默认决策逻辑
- return PlanningDecision(
- decision="finalize_report",
- reasoning="规划决策失败,使用默认的报告生成决策",
- next_actions=["生成最终报告"],
- metrics_to_compute=[],
- priority_metrics=[]
- )
|