planning_agent.py 17 KB


  1. """
  2. 规划Agent (Planning Agent)
  3. =========================
  4. 此Agent负责分析当前状态并做出智能决策,决定下一步行动。
  5. 核心功能:
  6. 1. 状态评估:分析大纲、指标计算进度和完整性
  7. 2. 决策制定:决定生成大纲、计算指标、完成报告或澄清需求
  8. 3. 优先级排序:确定最关键的任务和指标
  9. 4. 流程控制:管理整个报告生成工作流的执行顺序
  10. 决策逻辑:
  11. - 大纲为空 → 生成大纲
  12. - 指标覆盖率 < 80% → 计算指标
  13. - 指标覆盖率 ≥ 80% → 生成报告
  14. - 需求模糊 → 澄清需求
  15. 技术实现:
  16. - 使用LangChain和结构化输出
  17. - 支持异步处理
  18. - 智能状态评估
  19. - 灵活的决策机制
  20. 作者: Big Agent Team
  21. 版本: 1.0.0
  22. 创建时间: 2024-12-20
  23. """
  24. from typing import List, Dict, Optional, Any, Union
  25. from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
  26. from pydantic import BaseModel, Field
  27. from langchain_openai import ChatOpenAI
  28. import json
  29. import os
  30. from datetime import datetime
  31. # 数据模型定义
  32. class ActionItem(BaseModel):
  33. """动作项定义"""
  34. action: str = Field(description="动作名称")
  35. parameters: Optional[Dict[str, Any]] = Field(default_factory=dict, description="动作参数")
  36. class ClarificationRequest(BaseModel):
  37. """澄清请求结构化格式"""
  38. questions: List[str] = Field(description="需要澄清的问题列表")
  39. missing_fields: List[str] = Field(default_factory=list, description="缺少的字段或信息")
  40. class PlanningDecision(BaseModel):
  41. """规划决策输出"""
  42. decision: str = Field(
  43. description="决策类型: generate_outline, compute_metrics, finalize_report, clarify_requirements"
  44. )
  45. reasoning: str = Field(description="详细推理过程")
  46. next_actions: List[Union[str, ActionItem]] = Field(
  47. default_factory=list,
  48. description="下一步动作列表"
  49. )
  50. metrics_to_compute: List[str] = Field(
  51. default_factory=list,
  52. description="待计算指标ID列表(如 ['total_income', 'avg_balance'])"
  53. )
  54. priority_metrics: List[str] = Field(
  55. default_factory=list,
  56. description="优先级高的指标ID"
  57. )
  58. additional_requirements: Optional[
  59. Union[Dict[str, Any], List[Any], ClarificationRequest]
  60. ] = Field(default=None, description="额外需求或澄清信息")
  61. def normalize_requirements(req: Any) -> Optional[Dict[str, Any]]:
  62. """
  63. 规范化 additional_requirements
  64. 将列表转换为字典格式
  65. """
  66. if req is None:
  67. return None
  68. if isinstance(req, dict):
  69. return req
  70. if isinstance(req, list):
  71. # 如果LLM错误地返回了列表,转换为字典格式
  72. return {
  73. "questions": [str(item) for item in req],
  74. "missing_fields": []
  75. }
  76. return {"raw": str(req)}
  77. class PlanningAgent:
  78. """规划智能体:负责状态分析和决策制定"""
  79. def __init__(self, api_key: str, base_url: str = "https://api.deepseek.com"):
  80. """
  81. 初始化规划Agent
  82. Args:
  83. api_key: DeepSeek API密钥
  84. base_url: DeepSeek API基础URL
  85. """
  86. self.llm = ChatOpenAI(
  87. model="deepseek-chat",
  88. api_key=api_key,
  89. base_url=base_url,
  90. temperature=0.1
  91. )
  92. # 初始化API调用跟踪
  93. self.api_calls = []
  94. def create_planning_prompt(self) -> ChatPromptTemplate:
  95. """创建规划提示模板"""
  96. return ChatPromptTemplate.from_messages([
  97. ("system", """你是报告规划总控智能体,核心职责是精准分析当前状态并决定下一步行动。
  98. ### 决策选项(四选一)
  99. 1. generate_outline:大纲未生成或大纲无效
  100. 2. compute_metrics:大纲已生成但指标未完成(覆盖率<80%)
  101. 3. finalize_report:指标覆盖率≥80%,信息充足
  102. 4. clarify_requirements:用户需求模糊,缺少关键信息
  103. ### 决策规则(按顺序检查)
  104. 1. 检查 outline_draft 是否为空 → 空则选择 generate_outline
  105. 2. 检查 metrics_requirements 是否为空 → 空则选择 generate_outline
  106. 3. 检查是否有待计算指标 → 有则选择 compute_metrics
  107. 4. 所有指标都已计算完成 → 选择 finalize_report
  108. 5. 如果无法理解需求 → 选择 clarify_requirements
  109. ### 重要原则
  110. - 大纲草稿已存在时,不要重复生成大纲
  111. - 决策为 compute_metrics 时,必须从状态信息中的"有效待计算指标ID列表"中选择
  112. - 确保 metrics_to_compute 是字符串数组格式
  113. - 确保指标ID与大纲中的global_metrics.metric_id完全一致
  114. - 从状态信息中的"有效待计算指标ID列表"中提取metric_id作为metrics_to_compute的值
  115. - 计算失败的指标可以重试最多3次
  116. - 绝对不要自己生成新的指标ID,必须严格使用状态信息中提供的已有指标ID
  117. - 如果状态信息中没有可用的指标ID,不要生成compute_metrics决策
  118. ### 输出字段说明
  119. - decision: 决策字符串
  120. - reasoning: 决策原因说明
  121. - next_actions: 动作列表(可选)
  122. - metrics_to_compute: 待计算指标ID列表,必须从状态信息中的可用指标ID中选择(决策为compute_metrics时必须提供)
  123. - priority_metrics: 优先级指标列表(前2-3个最重要的指标)
  124. - additional_requirements: 额外需求(可选)
  125. 必须输出有效的JSON格式!"""),
  126. MessagesPlaceholder("messages"),
  127. ("user", "报告需求:{question}\n\n请输出决策结果。")
  128. ])
  129. async def make_decision(self, question: str, industry: str, current_state: Dict[str, Any]) -> PlanningDecision:
  130. """
  131. 根据当前状态做出规划决策
  132. Args:
  133. question: 用户查询
  134. industry: 行业
  135. current_state: 当前状态信息
  136. Returns:
  137. 规划决策结果
  138. """
  139. planner = self.create_planning_prompt() | self.llm
  140. # 构建状态评估上下文
  141. status_info = self._build_status_context(current_state)
  142. # 记录大模型输入
  143. print("========================================")
  144. print("[AGENT] PlanningAgent (规划Agent)")
  145. print("[MODEL_INPUT] PlanningAgent:")
  146. print(f"[CONTEXT] 基于当前状态做出规划决策")
  147. print(f"Question: {question}")
  148. print(f"Status info: {status_info}")
  149. print("========================================")
  150. # 执行规划
  151. start_time = datetime.now()
  152. response = await planner.ainvoke({
  153. "question": question,
  154. "industry": industry,
  155. "messages": [("system", status_info)]
  156. })
  157. end_time = datetime.now()
  158. # 解析JSON响应
  159. try:
  160. # 从响应中提取JSON内容
  161. content = response.content if hasattr(response, 'content') else str(response)
  162. # 尝试找到JSON部分
  163. json_start = content.find('{')
  164. json_end = content.rfind('}') + 1
  165. if json_start >= 0 and json_end > json_start:
  166. json_str = content[json_start:json_end]
  167. decision_data = json.loads(json_str)
  168. # 预处理 additional_requirements 字段
  169. if "additional_requirements" in decision_data:
  170. req = decision_data["additional_requirements"]
  171. if isinstance(req, str):
  172. # 如果是字符串,尝试将其转换为合适的格式
  173. if req.strip():
  174. # 将字符串包装为字典格式
  175. decision_data["additional_requirements"] = {"raw_content": req}
  176. else:
  177. # 空字符串设为 None
  178. decision_data["additional_requirements"] = None
  179. elif isinstance(req, list):
  180. # 如果是列表,转换为字典格式
  181. decision_data["additional_requirements"] = {
  182. "questions": [str(item) for item in req],
  183. "missing_fields": []
  184. }
  185. # 如果已经是 dict 或其他允许的类型,保持不变
  186. decision = PlanningDecision(**decision_data)
  187. # 验证决策的合理性
  188. if decision.decision == "compute_metrics":
  189. if not decision.metrics_to_compute:
  190. raise ValueError("AI决策缺少具体的指标ID")
  191. # 如果AI生成的指标ID明显是错误的(比如metric_001),使用默认逻辑
  192. if any(mid.startswith("metric_") and mid.replace("metric_", "").isdigit()
  193. for mid in decision.metrics_to_compute):
  194. raise ValueError("AI生成的指标ID格式不正确")
  195. else:
  196. raise ValueError("No JSON found in response")
  197. except Exception as e:
  198. print(f"解析规划决策响应失败: {e},使用默认决策")
  199. # 返回默认决策
  200. decision = self._get_default_decision(current_state)
  201. # 记录API调用结果
  202. content = response.content if hasattr(response, 'content') else str(response)
  203. call_id = f"api_mll_规划决策_{'{:.2f}'.format((end_time - start_time).total_seconds())}"
  204. api_call_info = {
  205. "call_id": call_id,
  206. "timestamp": end_time.isoformat(),
  207. "agent": "PlanningAgent",
  208. "model": "deepseek-chat",
  209. "request": {
  210. "question": question,
  211. "status_info": status_info,
  212. "start_time": start_time.isoformat()
  213. },
  214. "response": {
  215. "content": content,
  216. "decision": decision.dict() if hasattr(decision, 'dict') else decision,
  217. "end_time": end_time.isoformat(),
  218. "duration": (end_time - start_time).total_seconds()
  219. },
  220. "success": True
  221. }
  222. self.api_calls.append(api_call_info)
  223. # 保存API结果到文件
  224. api_results_dir = "api_results"
  225. os.makedirs(api_results_dir, exist_ok=True)
  226. filename = f"{call_id}.json"
  227. filepath = os.path.join(api_results_dir, filename)
  228. try:
  229. with open(filepath, 'w', encoding='utf-8') as f:
  230. json.dump(api_call_info, f, ensure_ascii=False, indent=2)
  231. print(f"[API_RESULT] 保存API结果文件: {filepath}")
  232. except Exception as e:
  233. print(f"[ERROR] 保存API结果文件失败: {filepath}, 错误: {str(e)}")
  234. # 记录大模型输出
  235. print(f"[MODEL_OUTPUT] PlanningAgent: {json.dumps(decision.dict() if hasattr(decision, 'dict') else decision, ensure_ascii=False)}")
  236. print("========================================")
  237. return decision
  238. def _build_status_context(self, state: Dict[str, Any]) -> str:
  239. """构建状态评估上下文"""
  240. required_count = len(state.get("metrics_requirements", []))
  241. computed_count = len(state.get("computed_metrics", {}))
  242. coverage = computed_count / required_count if required_count > 0 else 0
  243. # 计算失败统计
  244. failed_attempts = state.get("failed_metric_attempts", {})
  245. pending_ids = state.get("pending_metric_ids", [])
  246. # 过滤掉失败次数过多的指标
  247. max_retry = 3
  248. filtered_pending_ids = [
  249. mid for mid in pending_ids
  250. if failed_attempts.get(mid, 0) < max_retry
  251. ]
  252. # 获取可用的指标ID
  253. available_metric_ids = []
  254. if state.get('outline_draft') and state.get('outline_draft').get('global_metrics'):
  255. available_metric_ids = [m.get('metric_id', '') for m in state['outline_draft']['global_metrics']]
  256. available_metric_ids = [mid for mid in available_metric_ids if mid] # 过滤空值
  257. return f"""当前状态评估:
  258. - 规划步骤: {state.get('planning_step', 0)}
  259. - 大纲版本: {state.get('outline_version', 0)}
  260. - 大纲草稿存在: {state.get('outline_draft') is not None}
  261. - 指标需求总数: {required_count}
  262. - 已计算指标数: {computed_count}
  263. - 指标覆盖率: {coverage:.2%}
  264. - 待计算指标数: {len(pending_ids)}
  265. - 有效待计算指标ID列表: {filtered_pending_ids}
  266. - 可用指标ID列表: {available_metric_ids}
  267. - 失败尝试记录: {failed_attempts}
  268. """
  269. def analyze_current_state(state: Dict[str, Any]) -> Dict[str, Any]:
  270. """
  271. 分析当前状态,返回关键信息
  272. Args:
  273. state: 当前状态
  274. Returns:
  275. 状态分析结果
  276. """
  277. required_metrics = state.get("metrics_requirements", [])
  278. computed_metrics = state.get("computed_metrics", {})
  279. # 计算覆盖率
  280. required_count = len(required_metrics)
  281. computed_count = len(computed_metrics)
  282. coverage = computed_count / required_count if required_count > 0 else 0
  283. # 找出未计算的指标
  284. computed_ids = set(computed_metrics.keys())
  285. pending_metrics = [
  286. m for m in required_metrics
  287. if m.metric_id not in computed_ids
  288. ]
  289. # 检查失败次数
  290. failed_attempts = state.get("failed_metric_attempts", {})
  291. max_retry = 3
  292. valid_pending_metrics = [
  293. m for m in pending_metrics
  294. if failed_attempts.get(m.metric_id, 0) < max_retry
  295. ]
  296. return {
  297. "has_outline": state.get("outline_draft") is not None,
  298. "required_count": required_count,
  299. "computed_count": computed_count,
  300. "coverage": coverage,
  301. "pending_metrics": pending_metrics,
  302. "valid_pending_metrics": valid_pending_metrics,
  303. "pending_ids": [m.metric_id for m in pending_metrics],
  304. "valid_pending_ids": [m.metric_id for m in valid_pending_metrics],
  305. "planning_step": state.get("planning_step", 0),
  306. "outline_version": state.get("outline_version", 0)
  307. }
  308. async def plan_next_action(question: str, industry: str, current_state: Dict[str, Any], api_key: str) -> PlanningDecision:
  309. """
  310. 规划下一步行动的主函数
  311. Args:
  312. question: 用户查询
  313. current_state: 当前状态
  314. api_key: API密钥
  315. Returns:
  316. 规划决策结果
  317. """
  318. agent = PlanningAgent(api_key)
  319. try:
  320. decision = await agent.make_decision(question, industry, current_state)
  321. print(f"\n🧠 规划决策:{decision.decision}")
  322. print(f" 推理:{decision.reasoning[:100]}...")
  323. if decision.metrics_to_compute:
  324. print(f" 待计算指标:{decision.metrics_to_compute}")
  325. return decision
  326. except Exception as e:
  327. print(f"⚠️ 规划决策出错: {e},使用默认决策")
  328. # 直接返回最基本的默认决策,避免复杂的默认决策逻辑
  329. return PlanningDecision(
  330. decision="finalize_report",
  331. reasoning="规划决策失败,使用默认的报告生成决策",
  332. next_actions=["生成最终报告"],
  333. metrics_to_compute=[],
  334. priority_metrics=[]
  335. )
  336. def _get_default_decision(self, current_state: Dict[str, Any]) -> PlanningDecision:
  337. """
  338. 基于状态分析的默认决策逻辑
  339. Args:
  340. current_state: 当前状态信息
  341. Returns:
  342. 默认规划决策
  343. """
  344. state_analysis = analyze_current_state(current_state)
  345. if not state_analysis["has_outline"]:
  346. default_decision = PlanningDecision(
  347. decision="generate_outline",
  348. reasoning="大纲不存在,需要先生成大纲",
  349. next_actions=["生成报告大纲"],
  350. metrics_to_compute=[],
  351. priority_metrics=[]
  352. )
  353. elif state_analysis["coverage"] < 0.8 and state_analysis["valid_pending_metrics"]:
  354. # 计算指标 - 使用实际的指标ID
  355. metrics_to_compute = state_analysis["valid_pending_ids"][:5] # 最多计算5个
  356. default_decision = PlanningDecision(
  357. decision="compute_metrics",
  358. reasoning=f"指标覆盖率{state_analysis['coverage']:.1%},需要计算更多指标",
  359. next_actions=[f"计算指标: {', '.join(metrics_to_compute)}"],
  360. metrics_to_compute=metrics_to_compute,
  361. priority_metrics=metrics_to_compute[:2] # 前2个为优先级
  362. )
  363. elif state_analysis["valid_pending_ids"]:
  364. # 还有指标但都失败了,生成报告
  365. default_decision = PlanningDecision(
  366. decision="finalize_report",
  367. reasoning="部分指标计算失败,但已有足够信息生成报告",
  368. next_actions=["生成最终报告"],
  369. metrics_to_compute=[],
  370. priority_metrics=[]
  371. )
  372. else:
  373. # 信息充足,生成报告
  374. default_decision = PlanningDecision(
  375. decision="finalize_report",
  376. reasoning="所有必要指标已计算完成",
  377. next_actions=["生成最终报告"],
  378. metrics_to_compute=[],
  379. priority_metrics=[]
  380. )
  381. return default_decision